2020-08-05 10:49:43 +08:00
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
2022-11-29 18:50:04 +08:00
import copy
import json
2020-08-05 10:49:43 +08:00
import logging
2022-11-29 18:50:04 +08:00
import multiprocessing
2020-08-05 10:49:43 +08:00
import os
2022-11-29 18:50:04 +08:00
import shutil
2020-08-05 10:49:43 +08:00
import signal
2022-11-29 18:50:04 +08:00
import socket
import struct
2020-08-05 10:49:43 +08:00
import subprocess
2022-11-29 18:50:04 +08:00
import sys
2020-10-13 09:52:59 +08:00
import tempfile
2022-11-29 18:50:04 +08:00
import time
2020-08-05 10:49:43 +08:00
from contextlib import closing
2021-10-21 14:07:13 +08:00
import paddle . utils . cpp_extension . extension_utils as utils
2023-03-25 15:29:52 +08:00
from paddle import framework
2024-07-03 10:41:41 +08:00
from paddle . utils import strtobool
2022-06-05 10:58:58 +08:00
2020-08-05 10:49:43 +08:00
logger = logging . getLogger ( " root " )
logger . propagate = False
2022-10-23 20:01:27 +08:00
class DistributeMode:
    """Launch modes supported by fleetrun; each one targets a different
    training architecture.
    """

    COLLECTIVE = 0  # collective (all-reduce style) training
    PS = 1  # parameter-server training
    PS_HETER = 2  # parameter server with heterogeneous devices
2022-10-23 20:01:27 +08:00
class DeviceMode:
    """Device types a training job can run on."""

    UNKNOWN = -1
    CPU = 0
    GPU = 1
    KUNLUN = 2
    # NOTE: XPU deliberately shares the value 2 with KUNLUN (alias).
    XPU = 2
2020-11-26 10:35:26 +08:00
2022-11-08 11:29:41 +08:00
class Cluster:
    """Topology of one distributed job: the list of pods (one per node) plus
    job-level metadata (job-server endpoint, hdfs configuration, stage flag).

    Equality compares the pod lists and the job stage flag only; ``job_server``
    and ``hdfs`` are intentionally ignored by ``__eq__``.
    """

    def __init__(self, hdfs):
        self.job_server = None
        self.pods = []
        # BUGFIX: the ``hdfs`` argument used to be silently discarded
        # (``self.hdfs = None``); store it so callers that pass a config
        # actually see it.  The default caller passes hdfs=None, so
        # behavior is unchanged for existing call sites.
        self.hdfs = hdfs
        self.job_stage_flag = None

    def __str__(self):
        return f"job_server:{self.job_server} pods:{[str(pod) for pod in self.pods]} job_stage_flag:{self.job_stage_flag} hdfs:{self.hdfs}"

    def __eq__(self, cluster):
        if len(self.pods) != len(cluster.pods):
            return False

        for a, b in zip(self.pods, cluster.pods):
            if a != b:
                return False

        if self.job_stage_flag != cluster.job_stage_flag:
            return False

        return True

    def __ne__(self, cluster):
        return not self.__eq__(cluster)

    def update_pods(self, cluster):
        # Shallow copy: the Pod objects themselves are shared with ``cluster``.
        self.pods = copy.copy(cluster.pods)

    def trainers_nranks(self):
        """Total number of trainer processes across all pods."""
        return len(self.trainers_endpoints())

    def pods_nranks(self):
        """Number of pods (nodes) in the job."""
        return len(self.pods)

    def trainers_endpoints(self):
        """Endpoints of every trainer, in pod order then trainer order."""
        r = []
        for pod in self.pods:
            for t in pod.trainers:
                r.append(t.endpoint)
        return r

    def world_device_ids(self):
        """Per-trainer accelerator ids, stringified, in global trainer order."""
        r = []
        for pod in self.pods:
            for t in pod.trainers:
                str_accelerators = [str(acc) for acc in t.accelerators]
                r.append(str_accelerators)
        return r

    def pods_endpoints(self):
        """``addr:port`` endpoint of every pod; both fields must be set."""
        r = []
        for pod in self.pods:
            ep = f"{pod.addr}:{pod.port}"
            assert pod.port is not None and pod.addr is not None, (
                f"{ep} not a valid endpoint"
            )
            r.append(ep)
        return r

    def get_pod_by_id(self, pod_id):
        """Return the pod whose id matches ``pod_id`` (string compare), else None."""
        for pod in self.pods:
            if str(pod_id) == str(pod.id):
                return pod
        return None
2022-11-08 11:29:41 +08:00
class JobServer:
    """Holds the endpoint of the job server a pod registers with."""

    def __init__(self):
        self.endpoint = None

    def __str__(self):
        return str(self.endpoint)

    def __eq__(self, j):
        return self.endpoint == j.endpoint

    def __ne__(self, j):
        return self.endpoint != j.endpoint
2022-11-08 11:29:41 +08:00
class Trainer:
    """One trainer process: the accelerator ids assigned to it, its network
    endpoint and its global rank within the job.
    """

    def __init__(self):
        self.accelerators = []
        self.endpoint = None
        self.rank = None
        # Pipeline stage this trainer belongs to (heter parameter server).
        self.stage = None

    def __str__(self):
        return f"accelerator:{self.accelerators} endpoint:{self.endpoint} rank:{self.rank}"

    def __eq__(self, t):
        # Guard clauses: cheap scalar comparisons first, then element-wise
        # comparison of the accelerator lists.
        if len(self.accelerators) != len(t.accelerators):
            return False
        if self.endpoint != t.endpoint or self.rank != t.rank:
            return False
        return all(a == b for a, b in zip(self.accelerators, t.accelerators))

    def __ne__(self, t):
        return not self == t

    def rank(self):
        # NOTE(review): this method is shadowed by the ``self.rank`` attribute
        # set in __init__, so it is unreachable on normal instances; kept only
        # to preserve the original class interface.
        return self.rank
2022-11-08 11:29:41 +08:00
class Pod:
    """One pod == one machine in the job: its address/port and rank, plus the
    trainer/server/worker processes scheduled onto it.

    Equality compares identity fields (rank/id/addr/port) and the trainers,
    servers and workers lists element-wise.
    """

    def __init__(self):
        self.rank = None
        self.id = None
        self.addr = None
        self.port = None
        self.trainers = []
        self.servers = []
        self.workers = []
        self.coordinators = []
        self.heter_workers = []
        # Accelerator (device) ids visible to this pod.
        self.accelerators = []
        self.device_mode = None

    def __str__(self):
        return (
            f"rank:{self.rank} id:{self.id} addr:{self.addr} port:{self.port} "
            f"visible_accelerator:{self.accelerators} "
            f"trainers:{[str(t) for t in self.trainers]} "
            f"servers:{[str(s) for s in self.servers]} "
            f"workers:{[str(w) for w in self.workers]} "
            f"heter_workers:{[str(h) for h in self.heter_workers]} "
            f"coordinators:{[str(c) for c in self.coordinators]}"
        )

    def __eq__(self, pod):
        if (
            self.rank != pod.rank
            or self.id != pod.id
            or self.addr != pod.addr
            or self.port != pod.port
        ):
            logger.debug(f"pod {self} != {pod}")
            return False

        if len(self.trainers) != len(pod.trainers):
            logger.debug(f"trainers {self.trainers} != {pod.trainers}")
            return False

        for i in range(len(self.trainers)):
            if self.trainers[i] != pod.trainers[i]:
                logger.debug(f"trainer {self.trainers[i]} != {pod.trainers[i]}")
                return False

        if len(self.servers) != len(pod.servers):
            logger.debug(f"servers {self.servers} != {pod.servers}")
            return False

        for i in range(len(self.servers)):
            if self.servers[i] != pod.servers[i]:
                logger.debug(f"servers {self.servers[i]} != {pod.servers[i]}")
                return False

        if len(self.workers) != len(pod.workers):
            logger.debug(f"workers {self.workers} != {pod.workers}")
            return False

        for i in range(len(self.workers)):
            if self.workers[i] != pod.workers[i]:
                logger.debug(f"workers {self.workers[i]} != {pod.workers[i]}")
                return False

        # NOTE(review): heter_workers and coordinators are not compared here;
        # confirm against callers before relying on __eq__ for those fields.
        return True

    def __ne__(self, pod):
        return not self == pod

    def parse_response(self, res_pods):
        # Placeholder: response parsing is not implemented for this pod type.
        pass

    def rank(self):
        # NOTE(review): shadowed by the ``self.rank`` attribute set in
        # __init__, so unreachable on normal instances; kept for interface
        # compatibility.
        return self.rank

    def get_visible_accelerators(self):
        """Return this pod's accelerator ids as a comma-separated string.

        Raises AssertionError when the pod has no visible accelerators.
        """
        assert self.accelerators, f"this pod {self} can't see any accelerators"
        # join() replaces the previous quadratic ``r += f"{g},"`` loop and the
        # manual trailing-comma strip; output is identical.
        return ",".join(str(g) for g in self.accelerators)
def get_logger(log_level=20, name="root"):
    """Return the logger ``name`` configured at ``log_level`` with a stream
    handler using the launch-utils log format.

    BUGFIX: the handler is now attached only when the logger has none yet.
    Previously every call appended a fresh StreamHandler, so calling
    get_logger() repeatedly duplicated every emitted log record.
    """
    logger = logging.getLogger(name)
    logger.setLevel(log_level)

    if not logger.handlers:
        log_handler = logging.StreamHandler()
        log_format = logging.Formatter(
            '%(levelname)s %(asctime)s %(filename)s:%(lineno)d] %(message)s'
        )
        log_handler.setFormatter(log_format)
        logger.addHandler(log_handler)
    return logger
2022-10-23 20:01:27 +08:00
def get_cluster(
    node_ips, node_ip, trainer_endpoints, device_mode, devices_per_proc
):
    """Build the Cluster/Pod/Trainer topology for a launch.

    Args:
        node_ips (list): IPs of every node in the job.
        node_ip (str): IP of the current node; must appear in ``node_ips``.
        trainer_endpoints (list): per-node lists of "ip:port" endpoints,
            indexed by node rank.
        device_mode: a ``DeviceMode`` value selecting GPU/XPU/... handling.
        devices_per_proc (list): device id (or list/tuple of ids) assigned
            to each trainer process on a node.

    Returns:
        tuple: ``(cluster, pod)`` where ``pod`` is the Pod of ``node_ip``.
    """
    assert type(trainer_endpoints) is list, "trainer_endpoints must be list"
    cluster = Cluster(hdfs=None)
    trainer_rank = 0
    for node_rank, ip in enumerate(node_ips):
        pod = Pod()
        pod.rank = node_rank
        pod.addr = ip
        pod.device_mode = device_mode
        cur_node_endpoints = trainer_endpoints[node_rank]
        # when use paddlecloud, endpoints may > devices_per_proc(user_defined)
        assert len(cur_node_endpoints) >= len(devices_per_proc), (
            "current trainer_endpoints size should be greater equal than accelerators size."
        )
        for i in range(len(devices_per_proc)):
            trainer = Trainer()
            if device_mode == DeviceMode.GPU:
                # a list/tuple entry means one trainer owns several devices
                if isinstance(devices_per_proc[i], (list, tuple)):
                    trainer.accelerators.extend(devices_per_proc[i])
                    pod.accelerators.extend(devices_per_proc[i])
                else:
                    trainer.accelerators.append(devices_per_proc[i])
                    pod.accelerators.append(devices_per_proc[i])
            elif device_mode == DeviceMode.XPU:
                # NOTE(review): XPU branch only fills trainer.accelerators,
                # not pod.accelerators — mirrors the original behavior.
                if isinstance(devices_per_proc[i], (list, tuple)):
                    trainer.accelerators.extend(devices_per_proc[i])
                else:
                    trainer.accelerators.append(devices_per_proc[i])
            trainer.endpoint = f"{cur_node_endpoints[i]}"
            trainer.rank = trainer_rank
            trainer_rank += 1

            pod.trainers.append(trainer)
        cluster.pods.append(pod)

    pod_rank = node_ips.index(node_ip)
    return cluster, cluster.pods[pod_rank]
def terminate_local_procs(procs):
    """Terminate every local trainer process, escalating if needed.

    Escalation order: SIGTERM to each process group (POSIX only), then
    ``terminate()`` on stragglers, then repeated SIGKILL until everything
    is dead or ~150s have elapsed, at which point the launcher gives up
    and exits.

    Args:
        procs (list[TrainerProc]): records of the launched processes.
    """
    # try to terminate process by group, this happened in multiprocess
    # scenario in user process
    if os.name != 'nt':
        for p in procs:
            if p.proc.poll() is None:
                # processes were started with os.setsid, so the pid is
                # also the process-group id
                os.killpg(os.getpgid(p.proc.pid), signal.SIGTERM)
                if p.log_fn:
                    p.log_fn.close()
                logger.info(f"terminate process group gid:{p.proc.pid}")
        time.sleep(1)

    for p in procs:
        if p.proc.poll() is None:
            p.proc.terminate()
            if p.log_fn:
                p.log_fn.close()
            logger.debug(f"terminate process id:{p.proc.pid}")

    # wait all process terminated
    time.sleep(3)
    for step in range(0, 50):
        alive = False
        for p in procs:
            if p.proc.poll() is None:  # not terminate
                os.kill(p.proc.pid, signal.SIGKILL)
                alive = True

        if not alive:
            logger.info("terminate all the procs")
            return

        time.sleep(3)

    logger.fatal("can't kill all process and exit")
    sys.exit(1)
def get_host_name_ip():
    """Return ``(hostname, ip)`` of the current machine, or ``None``.

    Returns ``None`` when hostname resolution fails (e.g. no DNS entry
    for the local hostname).
    """
    try:
        host_name = socket.gethostname()
        host_ip = socket.gethostbyname(host_name)
        return host_name, host_ip
    # All socket failures are OSError subclasses; a bare `except:` would
    # also swallow KeyboardInterrupt/SystemExit.
    except OSError:
        return None
def add_arguments(argname, type, default, help, argparser, **kwargs):
    """Add argparse's argument.

    Examples:
        .. code-block:: pycon

            >>> import argparse
            >>> from paddle.distributed.fleet.launch_utils import add_arguments
            >>> parser = argparse.ArgumentParser()
            >>> add_arguments("name", str, "Jonh", "User name.", parser)
            >>> args = parser.parse_args()
    """
    # argparse's type=bool treats any non-empty string as True, so map
    # bool to strtobool, which parses "true"/"false"/"1"/"0" correctly.
    type = strtobool if type == bool else type
    argparser.add_argument(
        "--" + argname,
        default=default,
        type=type,
        help=help + ' Default: %(default)s.',
        **kwargs,
    )
def find_free_ports(num):
    """Find ``num`` distinct free TCP ports on this host.

    Args:
        num (int): how many distinct free ports to collect.

    Returns:
        set[int] | None: the ports, or ``None`` after ~400 failed
        attempts to find a new distinct port.
    """

    def __free_port():
        with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
            # Note(wangxi): Close the connection with a TCP RST instead
            # of a TCP FIN, to avoid time_wait state.
            s.setsockopt(
                socket.SOL_SOCKET, socket.SO_LINGER, struct.pack('ii', 1, 0)
            )
            s.bind(('', 0))
            # port chosen by the kernel for the ephemeral bind
            return s.getsockname()[1]

    port_set = set()
    step = 0
    while True:
        port = __free_port()
        if port not in port_set:
            port_set.add(port)
            if len(port_set) >= num:
                return port_set

        step += 1
        if step > 400:
            print(
                "can't find available port and use the specified static port now!"
            )
            return None
def get_ports(num, offset):
    """Return ``num`` ports for trainers to listen on.

    When ``FLAGS_START_PORT`` is unset, probe the OS for free ports;
    otherwise return the contiguous range starting at
    ``FLAGS_START_PORT + offset``.

    Args:
        num (int): number of ports needed.
        offset (int): offset applied to the static start port.

    Returns:
        list[int] | range | None: the ports, or ``None`` if probing failed.
    """
    if os.environ.get('FLAGS_START_PORT') is None:
        ports = find_free_ports(num)
        if ports is not None:
            ports = list(ports)
    else:
        start_port = int(os.environ.get('FLAGS_START_PORT'))
        ports = range(start_port + offset, start_port + offset + num, 1)
    return ports
def pretty_print_envs(envs, header=None):
    """Render a dict of environment variables as an ASCII table.

    Args:
        envs (dict): environment variable name -> value.
        header (tuple | None): optional ``(left, right)`` column titles;
            defaults to ``("fleetrun Distributed Envs", "Value")``.

    Returns:
        str: the formatted table.
    """
    spacing = 2
    max_k = 40
    max_v = 45

    # widen the key column to fit the longest key
    for k, v in envs.items():
        max_k = max(max_k, len(k))

    h_format = "    " + "|{{:>{}s}}{}{{:^{}s}}|\n".format(
        max_k, " " * spacing, max_v
    )
    l_format = "    " + f"|{{:>{max_k}s}}{{}}{{:^{max_v}s}}|\n"
    length = max_k + max_v + spacing

    border = "    +" + "".join(["="] * length) + "+"
    line = "    +" + "".join(["-"] * length) + "+"

    draws = ""
    draws += border + "\n"
    if header:
        draws += h_format.format(header[0], header[1])
    else:
        draws += h_format.format("fleetrun Distributed Envs", "Value")

    draws += line + "\n"

    for k, v in envs.items():
        # truncate over-long values, keeping the (more informative) tail
        if isinstance(v, str) and len(v) >= max_v:
            str_v = "... " + v[-41:]
        else:
            str_v = v

        draws += l_format.format(k, " " * spacing, str(str_v))

    draws += border

    _str = f"\n{draws}\n"
    return _str
class TrainerProc:
    """Bookkeeping record for one launched local trainer process."""

    def __init__(self):
        self.proc = None  # subprocess.Popen handle
        self.log_fn = None  # open log file object, or None
        self.log_offset = None  # read offset into the log file
        self.rank = None  # global trainer rank
        self.local_rank = None  # rank within this node's pod
        self.cmd = None  # command list used to spawn the process
_run_with_coverage = False
def run_with_coverage ( * args ) :
global _run_with_coverage
2023-03-31 10:11:56 +08:00
assert len ( args ) < = 1 , f " len(args) { len ( args ) } should <= 1 "
2021-11-15 11:13:52 +08:00
if len ( args ) == 1 :
assert isinstance ( args [ 0 ] , bool )
_run_with_coverage = args [ 0 ]
return _run_with_coverage
2022-10-23 20:01:27 +08:00
def start_local_trainers(
    cluster, pod, training_script, training_script_args, log_dir=None, envs=None
):
    """Spawn one subprocess per trainer in ``pod``.

    Each trainer is launched as ``python -u <training_script> <args>``
    (optionally under ``coverage run``) with PADDLE_* distributed
    environment variables injected.

    Args:
        cluster: the Cluster describing the whole job.
        pod: the Pod describing this node's trainers.
        training_script (str): path of the user's training script.
        training_script_args (list): extra argv for the script.
        log_dir (str | None): if set, per-trainer logs go to
            ``<log_dir>/workerlog.<idx>``; otherwise output is inherited.
        envs (dict | None): base environment; defaults to ``os.environ``.

    Returns:
        list[TrainerProc]: records of the spawned processes.
    """
    if envs is None:
        current_env = copy.copy(os.environ.copy())
    else:
        current_env = copy.copy(envs)

    # paddle broadcast ncclUniqueId use socket, and
    # proxy maybe make trainers unreachable, so delete them.
    # if we set them to "", grpc will log error message "bad uri"
    # so just delete them.
    current_env.pop("http_proxy", None)
    current_env.pop("https_proxy", None)

    ids = cluster.world_device_ids()
    res = [':'.join(ele) for ele in ids]
    procs = []
    for idx, t in enumerate(pod.trainers):
        proc_env = {
            "PADDLE_TRAINER_ID": str(t.rank),
            "PADDLE_CURRENT_ENDPOINT": str(t.endpoint),
            "PADDLE_TRAINERS_NUM": str(cluster.trainers_nranks()),
            "PADDLE_TRAINER_ENDPOINTS": ",".join(cluster.trainers_endpoints()),
            "PADDLE_RANK_IN_NODE": str(idx),
            "PADDLE_LOCAL_DEVICE_IDS": ",".join(
                [str(acc) for acc in t.accelerators]
            ),
            "PADDLE_WORLD_DEVICE_IDS": ",".join(res),
        }

        # The following three environment variables are used for auto mapping
        if current_env.get("PADDLE_CLUSTER_TOPO_PATH", None) is not None:
            proc_env["PADDLE_CLUSTER_TOPO_PATH"] = current_env[
                "PADDLE_CLUSTER_TOPO_PATH"
            ]
        if current_env.get("PADDLE_RANK_MAPPING_PATH", None) is not None:
            proc_env["PADDLE_RANK_MAPPING_PATH"] = current_env[
                "PADDLE_RANK_MAPPING_PATH"
            ]
        if current_env.get("PADDLE_ENABLE_AUTO_MAPPING", None) is not None:
            proc_env["PADDLE_ENABLE_AUTO_MAPPING"] = current_env[
                "PADDLE_ENABLE_AUTO_MAPPING"
            ]

        if len(t.accelerators) > 0 and pod.device_mode == DeviceMode.GPU:
            proc_env["FLAGS_selected_gpus"] = "{}".format(
                ",".join([str(g) for g in t.accelerators])
            )

        if len(t.accelerators) > 0:
            proc_env["FLAGS_selected_accelerators"] = "{}".format(
                ",".join([str(g) for g in t.accelerators])
            )

        # to do: same code style in future
        if framework.core.is_compiled_with_xpu() and len(t.accelerators) > 0:
            proc_env["FLAGS_selected_xpus"] = "{}".format(
                ",".join([str(g) for g in t.accelerators])
            )

        current_env.update(proc_env)

        coverage_args = []
        if (
            run_with_coverage()
            or os.environ.get("WITH_COVERAGE", "OFF") == "ON"
        ):
            coverage_args = ["-m", "coverage", "run", "--branch", "-p"]
        cmd = [
            sys.executable,
            "-u",
            *coverage_args,
            training_script,
            *training_script_args,
        ]

        logger.debug(f"start trainer proc {cmd} env:{current_env}")

        if idx == 0:
            logger.info(
                "Local start {} processes. First process distributed "
                "environment info (Only For Debug): {}".format(
                    len(pod.trainers),
                    pretty_print_envs(proc_env, ("Distributed Envs", "Value")),
                )
            )
            logger.info(
                "Details about PADDLE_TRAINER_ENDPOINTS can be found in "
                f"{log_dir}/endpoints.log, and detail running logs may be found in "
                f"{log_dir}/workerlog.0"
            )
        fn = None
        # start the child in its own session so terminate_local_procs can
        # SIGTERM the whole process group (POSIX only)
        pre_fn = None if os.name == 'nt' else os.setsid
        if log_dir is not None:
            os.makedirs(log_dir, exist_ok=True)
            if os.path.exists(f"{log_dir}/endpoints.log"):
                os.remove(f"{log_dir}/endpoints.log")
            with open(f"{log_dir}/endpoints.log", "w") as f:
                f.write("PADDLE_TRAINER_ENDPOINTS: \n")
                f.write("\n".join(cluster.trainers_endpoints()))
            # NOTE(review): PADDLE_NEED_RANK_MAPPING is assumed to be set
            # whenever PADDLE_ENABLE_AUTO_MAPPING is — .lower() would fail
            # on None otherwise; mirrors the original behavior.
            if (
                current_env.get("PADDLE_ENABLE_AUTO_MAPPING") is not None
                and current_env.get("PADDLE_NEED_RANK_MAPPING").lower()
                == "true"
            ):
                fn = open(f"{log_dir}/prelaunchlog.{idx}", "a")
            else:
                fn = open(f"{log_dir}/workerlog.{idx}", "a")
            proc = subprocess.Popen(
                cmd, env=current_env, stdout=fn, stderr=fn, preexec_fn=pre_fn
            )
        else:
            proc = subprocess.Popen(cmd, env=current_env, preexec_fn=pre_fn)

        tp = TrainerProc()
        tp.proc = proc
        tp.rank = t.rank
        tp.local_rank = idx
        tp.log_fn = fn
        tp.log_offset = fn.tell() if fn else None
        tp.cmd = cmd

        procs.append(tp)

    return procs
def pull_worker_log(tp):
    """Stream any new output from a trainer's log file to stdout.

    Reads ``tp.log_fn.name`` starting at ``tp.log_offset`` and then advances
    the offset, so repeated calls only emit lines written since the last call.

    Args:
        tp: a TrainerProc-like object with ``log_fn`` (an open file object,
            may be None) and ``log_offset`` (int byte offset) attributes.
    """
    if tp.log_fn:
        with open(tp.log_fn.name, 'r') as fin:
            fin.seek(tp.log_offset, 0)
            for line in fin:
                try:
                    sys.stdout.write(line)
                except UnicodeEncodeError:
                    # stdout's encoding may not cover every character the
                    # trainer logged; point the user at the raw file instead
                    # of crashing the watcher loop.
                    sys.stdout.write(
                        'UnicodeEncodeError occurs at this line. '
                        f'Please refer to the original log file "{tp.log_fn.name}"\n'
                    )
            # Remember how far we have echoed so the next poll resumes here.
            tp.log_offset = fin.tell()
def watch_local_trainers(procs, nranks):
    """Poll local trainer processes once and relay rank-0 logs.

    Args:
        procs: list of TrainerProc objects to watch.
        nranks: total number of trainers (used only in error messages).

    Returns:
        True if at least one process is still running, False if all have
        exited cleanly, or None when interrupted/terminated.

    Side effects:
        Terminates all local processes (and exits with status 1) as soon as
        any of them has exited with a non-zero return code.
    """
    try:
        error = False
        error_rank = []
        # wait all process finish or one error
        alive = False
        for p in procs:
            # Only echo the log of local rank 0 to avoid interleaved output.
            if p.log_fn and p.local_rank == 0:
                pull_worker_log(p)

            ret = p.proc.poll()
            if ret is None:
                alive = True
            elif ret != 0:
                error = True
                error_rank.append(p.rank)

        if error:
            terminate_local_procs(procs)
            sys.exit(1)

    except KeyboardInterrupt:
        logger.warning("KeyboardInterrupt, exit")
        terminate_local_procs(procs)
        return
    except SystemExit:
        logger.error(
            f"ABORT!!! Out of all {nranks} trainers, the trainer process with rank={error_rank} was aborted. Please check its log."
        )
        terminate_local_procs(procs)
        # Re-raise so the SystemExit triggered above still ends the launcher.
        raise
    except:
        # NOTE(review): deliberately broad — any unexpected failure of the
        # watcher must still tear down the child processes.
        logger.error(
            f"ABORT!!! Out of all {nranks} trainers, the trainer process with rank={error_rank} was aborted. Please check its log."
        )
        terminate_local_procs(procs)
        return

    return alive
def get_gpus(gpus):
    """Resolve the gpu selection into a list of device ids.

    Args:
        gpus: comma-separated gpu ids (str) or None to use every visible gpu.

    Returns:
        A list of gpu ids; absolute ids (strings) when CUDA_VISIBLE_DEVICES
        is unset, otherwise indices (ints) relative to CUDA_VISIBLE_DEVICES.
    """
    if gpus is None:
        gpus_num = framework.core.get_cuda_device_count()
        res_gpus = [str(x) for x in range(0, gpus_num)]
    else:
        cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES")
        if cuda_visible_devices is None or cuda_visible_devices == "":
            res_gpus = [x.strip() for x in gpus.split(',')]
        else:
            # change gpus into relative values
            # e.g. CUDA_VISIBLE_DEVICES=4,5,6,7; args.gpus=4,5,6,7;
            # therefore gpus=0,1,2,3
            cuda_visible_devices_list = cuda_visible_devices.split(',')
            for x in gpus.split(','):
                assert x in cuda_visible_devices_list, (
                    "Can't find "
                    f"your gpus {x} in CUDA_VISIBLE_DEVICES[{cuda_visible_devices}]."
                )
            res_gpus = [
                cuda_visible_devices_list.index(x.strip())
                for x in gpus.split(',')
            ]
            logger.info(
                f"Change selected_gpus into relative values. --ips: {gpus} "
                f"will change into relative_ips: {res_gpus} according to your "
                f"CUDA_VISIBLE_DEVICES: {cuda_visible_devices_list}"
            )

    return res_gpus
def get_xpus(xpus):
    """Resolve the xpu selection into a list of device ids.

    Args:
        xpus: comma-separated xpu ids (str) or None to use every visible xpu.

    Returns:
        A list of xpu ids; absolute ids (strings) when XPU_VISIBLE_DEVICES
        is unset, otherwise indices (ints) relative to XPU_VISIBLE_DEVICES.
    """
    if xpus is None:
        xpus_num = framework.core.get_xpu_device_count()
        res_xpus = [str(x) for x in range(0, xpus_num)]
    else:
        xpu_visible_devices = os.getenv("XPU_VISIBLE_DEVICES")
        if xpu_visible_devices is None or xpu_visible_devices == "":
            res_xpus = [x.strip() for x in xpus.split(',')]
        else:
            # change xpus into relative values
            # e.g. XPU_VISIBLE_DEVICES=4,5,6,7; args.xpus=4,5,6,7;
            # therefore xpus=0,1,2,3
            xpu_visible_devices_list = xpu_visible_devices.split(',')
            for x in xpus.split(','):
                assert x in xpu_visible_devices_list, (
                    "Can't find "
                    f"your xpus {x} in XPU_VISIBLE_DEVICES[{xpu_visible_devices}]."
                )
            res_xpus = [
                xpu_visible_devices_list.index(x.strip())
                for x in xpus.split(',')
            ]
            logger.info(
                f"Change selected_xpus into relative values. --ips: {xpus} "
                f"will change into relative_ips: {res_xpus} according to your "
                f"XPU_VISIBLE_DEVICES: {xpu_visible_devices_list}"
            )

    return res_xpus
def get_device_mode(backend):
    """Map a communication backend name to a DeviceMode.

    Args:
        backend: one of 'heter', 'nccl', 'bkcl', 'gloo'.

    Returns:
        DeviceMode.GPU / DeviceMode.XPU / DeviceMode.CPU.

    Raises:
        RuntimeError: if the backend has no usable device on this machine.
    """
    if backend == 'heter':
        # Heter mode prefers GPU, then falls back to XPU when available.
        if (
            framework.core.is_compiled_with_cuda()
            and framework.core.get_cuda_device_count() > 0
        ):
            print("launch train in heter mode with GPU device.")
            return DeviceMode.GPU
        if (
            framework.core.is_compiled_with_xpu()
            and framework.core.get_xpu_device_count() > 0
        ):
            print("launch train in heter mode with XPU device.")
            return DeviceMode.XPU

    if backend == 'nccl' and framework.core.get_cuda_device_count() > 0:
        print("launch train in GPU mode!")
        return DeviceMode.GPU

    if backend == 'bkcl' and framework.core.get_xpu_device_count() > 0:
        print("launch train in XPU mode")
        return DeviceMode.XPU

    if backend == 'gloo':
        print("launch train in CPU mode")
        return DeviceMode.CPU

    raise RuntimeError("Don't supported devices")
def get_device_proc_info(args):
    """Determine the device mode and the device list for each local process.

    Args:
        args: parsed launch arguments; reads ``backend``, ``gpus``, ``xpus``
            and ``nproc_per_node`` (and optionally ``paddle_cpuonly``).

    Returns:
        (device_mode, devices_per_proc): devices_per_proc is a flat list of
        device ids (one device per process) or, when nproc_per_node is set,
        a list of equally-sized id chunks (one chunk per process).
    """
    # device_mode
    device_mode = get_device_mode(args.backend)

    # devices
    devices_per_proc = []
    if device_mode == DeviceMode.GPU:
        gpus = get_gpus(args.gpus)
        if args.nproc_per_node is not None:
            assert (len(gpus) % int(args.nproc_per_node)) == 0, (
                f"gpus' number:{len(gpus)} mod args.nproc_per_node:{args.nproc_per_node} must == 0"
            )

            n = int(len(gpus) / int(args.nproc_per_node))
            devices_per_proc = [gpus[i : i + n] for i in range(0, len(gpus), n)]
        else:
            devices_per_proc = gpus
    elif device_mode == DeviceMode.XPU:
        xpus = get_xpus(args.xpus)
        if args.nproc_per_node is not None:
            assert (len(xpus) % int(args.nproc_per_node)) == 0, (
                f"xpus' number:{len(xpus)} mod args.nproc_per_node:{args.nproc_per_node} must == 0"
            )

            n = int(len(xpus) / int(args.nproc_per_node))
            devices_per_proc = [xpus[i : i + n] for i in range(0, len(xpus), n)]
        else:
            devices_per_proc = xpus
    elif device_mode == DeviceMode.CPU:
        if hasattr(args, "paddle_cpuonly") and args.nproc_per_node is None:
            # NOTE (xiongkun03) set it to cpu core number
            args.nproc_per_node = multiprocessing.cpu_count()
        if args.nproc_per_node is None:
            devices_per_proc = [0]
        else:
            devices_per_proc = list(range(0, args.nproc_per_node))
    else:
        raise AssertionError(
            f"Can't support device_mode:{device_mode}, support only cpu|gpu|xpu now."
        )

    return (device_mode, devices_per_proc)
def direct_start(args):
    """Run the training script directly (no wrapper env) and wait for it.

    Used for ps-cpu mode on paddlecloud, where the environment variables are
    already provided by the platform.
    """
    # run ps-cpu mode on paddlecloud, using given envs
    cmd = [
        sys.executable,
        "-u",
        args.training_script,
        *args.training_script_args,
    ]
    proc = subprocess.Popen(cmd)
    proc.wait()
def get_custom_endpoints(origin_endpoints, offset=0):
    """Shift every port in a comma-separated endpoint list by ``offset``.

    origin_endpoint: ip:port
    user_define_endpoint: ip:(port+offset)

    Args:
        origin_endpoints: "ip:port,ip:port,..." string; must not be None.
        offset: integer added to each port (default 0).

    Returns:
        The endpoint list string with each port increased by ``offset``.
    """
    assert origin_endpoints is not None
    paddle_user_define_endpoints_list = []
    for ip_port in origin_endpoints.split(","):
        ip = ip_port.split(":")[0]
        port = ip_port.split(":")[1]
        new_port = int(port) + offset
        paddle_user_define_endpoints_list.append(":".join((ip, str(new_port))))
    paddle_user_define_endpoints = ",".join(paddle_user_define_endpoints_list)
    return paddle_user_define_endpoints
2022-10-23 20:01:27 +08:00
# def cloud_ps_heter_env_set(args):
#     environs = {}
#
#     paddle_trainer_endpoints = os.getenv("TRAINER_IP_PORT_LIST", "")
#     assert paddle_trainer_endpoints != None
#
#     paddle_pserver_endpoints = os.getenv("PSERVER_IP_PORT_LIST", "")
#     assert paddle_pserver_endpoints != None
#
#     # hard code for paddlecloud custom-framework
#     available_ports = os.getenv("TRAINER_PORTS", "").split(",")
#     assert len(
#         available_ports
#     ) >= 2, "set paddle_ports_num >= 2 in config.ini for paddlecloud job submit"
#
#     # hard code for paddlecloud custom-framework
#     trainers_num = len(paddle_pserver_endpoints.split(","))
#     assert trainers_num != 0
#     environs["PADDLE_TRAINERS_NUM"] = trainers_num
#     environs["TRAINERS_NUM"] = trainers_num
#
#     # hard code for paddlecloud custom-framework
#     environs["PADDLE_HETER_TRAINER_IP_PORT_LIST"] = paddle_trainer_endpoints
#     environs["PADDLE_PSERVERS_IP_PORT_LIST"] = paddle_pserver_endpoints
#     environs["PADDLE_TRAINER_ENDPOINTS"] = get_custom_endpoints(
#         paddle_pserver_endpoints, 1)
#     heter_worker_num = len(paddle_trainer_endpoints.split(","))
#     if (args.heter_worker_num != None) and (
#             heter_worker_num != args.heter_worker_num):
#         warnings.warn(
#             "Your fleetrun setting: heter_worker_num is {}, but we find {} device can be used, this setting has been changed.".
#             format(args.heter_worker_num, heter_worker_num))
#         args.heter_worker_num = heter_worker_num
#
#     for k, v in environs.items():
#         os.environ[k] = str(v)
#     logger.info("Set heter parameter server env: {}".format(
#         pretty_print_envs(environs)))
2022-10-23 20:01:27 +08:00
def get_mapped_cluster_without_rank_mapping(
    node_ips, node_ip, trainer_endpoints, device_mode, node_ranks
):
    """Build a Cluster/Pod topology with exactly one trainer per node.

    Args:
        node_ips: list of node ip addresses.
        node_ip: ip of the current node (must be in ``node_ips``).
        trainer_endpoints: per-node lists of "ip:port" endpoint strings.
        device_mode: must be DeviceMode.GPU.
        node_ranks: per-node lists of global trainer ranks (length 1 each).

    Returns:
        (cluster, pod) where ``pod`` is the entry for the current node.
    """
    assert type(trainer_endpoints) is list, "trainer_endpoints must be list"
    assert device_mode == DeviceMode.GPU, (
        "Only support get mapped cluster for gpu now."
    )
    cluster = Cluster(hdfs=None)
    for node_rank, ip in enumerate(node_ips):
        pod = Pod()
        pod.rank = node_rank
        pod.addr = ip
        pod.device_mode = device_mode
        cur_node_endpoints = trainer_endpoints[node_rank]

        # choose rank from global mapped ranks and set it to the trainer.
        ranks_per_node = node_ranks[node_rank]
        # This variant maps exactly one process to each node.
        assert len(ranks_per_node) == 1
        for i in range(len(ranks_per_node)):
            trainer = Trainer()
            trainer.endpoint = f"{cur_node_endpoints[i]}"
            trainer.rank = ranks_per_node[i]
            pod.trainers.append(trainer)
        cluster.pods.append(pod)

    pod_rank = node_ips.index(node_ip)
    return cluster, cluster.pods[pod_rank]
def get_mapped_cluster_from_args_without_rank_mapping(args, device_mode):
    """Parse the cluster-topology json and build the mapped cluster.

    Args:
        args: launch arguments; reads ``cluster_topo_path`` and ``host``.
        device_mode: must be DeviceMode.GPU.

    Returns:
        (cluster, pod) for the current node, via
        ``get_mapped_cluster_without_rank_mapping``.
    """
    assert device_mode == DeviceMode.GPU, (
        "Only support get mapped cluster for gpu now."
    )
    gpus_num = framework.core.get_cuda_device_count()

    # parse ip-ranks json file
    cluster_topo = None
    with open(args.cluster_topo_path, "r") as json_file:
        cluster_topo = json.load(json_file)

    node_ips = []
    node_ranks = []
    for idx, cur_cluster_topo in enumerate(cluster_topo["machines"]):
        node_ips.append(cur_cluster_topo['addr'])
        # one global rank per machine, in file order
        node_ranks.append([idx])

    if len(node_ips) == 1:
        node_ip = node_ips[0]
    else:
        if args.host:
            node_ip = args.host
        else:
            _, node_ip = get_host_name_ip()

    assert node_ip in node_ips, (
        f"Can't find your local ip {{{node_ip}}} in node_ips: {{{node_ips}}}"
    )
    node_rank = node_ips.index(node_ip)

    assert len(node_ranks) == len(node_ips), (
        "ranks length should be equal to ips length."
    )

    logger.debug(
        f"parsed from args: node_ips:{node_ips} node_ip:{node_ip} "
        f"node_rank:{node_rank} node_ranks:{node_ranks[node_rank]}"
    )

    # NOTE: there are different number of global mapped ranks on each node.
    free_ports = []
    trainer_endpoints = []
    for ip in node_ips:
        node_rank = node_ips.index(ip)
        if os.environ.get('PADDLE_PORT') is not None:
            start_port = int(os.getenv("PADDLE_PORT", ""))
            free_ports = list(
                range(start_port, start_port + len(node_ranks[node_rank]))
            )
        elif os.environ.get('FLAGS_START_PORT') is not None:
            start_port = int(os.environ.get('FLAGS_START_PORT'))
            free_ports = list(
                range(start_port, start_port + len(node_ranks[node_rank]))
            )
        else:
            free_ports = find_free_ports(len(node_ranks[node_rank]))
        trainer_endpoints.append([f"{ip}:{port}" for port in free_ports])

    return get_mapped_cluster_without_rank_mapping(
        node_ips, node_ip, trainer_endpoints, device_mode, node_ranks
    )
def get_mapped_cluster_with_rank_mapping(
    node_ips,
    node_ip,
    trainer_endpoints,
    device_mode,
    node_ranks,
    node_rank_mappings,
):
    """Build a Cluster/Pod topology using an explicit rank->device mapping.

    Args:
        node_ips: list of node ip addresses.
        node_ip: ip of the current node (must be in ``node_ips``).
        trainer_endpoints: per-node lists of "ip:port" endpoint strings.
        device_mode: must be DeviceMode.GPU.
        node_ranks: per-node lists of global trainer ranks.
        node_rank_mappings: per-node dicts whose "ranks" entry maps each
            global rank (as a string key) to a one-element device-id list.

    Returns:
        (cluster, pod) where ``pod`` is the entry for the current node.
    """
    assert type(trainer_endpoints) is list, "trainer_endpoints must be list"
    assert device_mode == DeviceMode.GPU, (
        "Only support get mapped cluster for gpu now."
    )

    def get_relative_gpu_id(gpu_id):
        # Translate a physical gpu id into its index within
        # CUDA_VISIBLE_DEVICES; pass through when the variable is unset.
        cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES")
        if cuda_visible_devices is None or cuda_visible_devices == "":
            return gpu_id
        else:
            cuda_visible_devices_list = cuda_visible_devices.split(',')
            relative_id = cuda_visible_devices_list.index(str(gpu_id))
            logger.info(
                f"Change gpu id from {gpu_id} to {relative_id} based on CUDA_VISIBLE_DEVICES {cuda_visible_devices_list}"
            )
            return relative_id

    cluster = Cluster(hdfs=None)
    for node_rank, ip in enumerate(node_ips):
        pod = Pod()
        pod.rank = node_rank
        pod.addr = ip
        pod.device_mode = device_mode
        cur_node_endpoints = trainer_endpoints[node_rank]

        # choose rank from global mapped ranks and set it to the trainer.
        ranks_per_node = node_ranks[node_rank]
        cur_node_rank_mapping = node_rank_mappings[node_rank]
        for i in range(len(ranks_per_node)):
            trainer = Trainer()
            local_device_ids = cur_node_rank_mapping["ranks"][
                str(ranks_per_node[i])
            ]
            assert len(local_device_ids) == 1, (
                "Only support one process to one device mapping"
            )
            trainer.accelerators.append(
                get_relative_gpu_id(local_device_ids[0])
            )
            trainer.endpoint = f"{cur_node_endpoints[i]}"
            trainer.rank = ranks_per_node[i]
            pod.trainers.append(trainer)
        cluster.pods.append(pod)

    pod_rank = node_ips.index(node_ip)
    return cluster, cluster.pods[pod_rank]
def get_mapped_cluster_from_args_with_rank_mapping(args, device_mode):
    """Parse the rank-mapping json and build the mapped cluster.

    Args:
        args: launch arguments; reads ``rank_mapping_path`` (falling back to
            the PADDLE_RANK_MAPPING_PATH env var) and ``host``.
        device_mode: must be DeviceMode.GPU.

    Returns:
        (cluster, pod) for the current node, via
        ``get_mapped_cluster_with_rank_mapping``.
    """
    assert device_mode == DeviceMode.GPU, (
        "Only support get mapped cluster for gpu now."
    )
    gpus_num = framework.core.get_cuda_device_count()

    # parse ip-ranks json file
    rank_mapping_path = args.rank_mapping_path or os.getenv(
        "PADDLE_RANK_MAPPING_PATH"
    )
    rank_mapping = None
    with open(rank_mapping_path, "r") as json_file:
        rank_mapping = json.load(json_file)
    # reset PADDLE_RANK_MAPPING_PATH env
    os.environ["PADDLE_RANK_MAPPING_PATH"] = ""

    node_ips = []
    node_ranks = []
    node_rank_mappings = []
    for cur_rank_mapping in rank_mapping:
        node_ips.append(cur_rank_mapping['addr'])
        cur_node_rank_list = [
            int(i) for i in list(cur_rank_mapping['ranks'].keys())
        ]
        cur_node_rank_list.sort()
        node_ranks.append(cur_node_rank_list)
        node_rank_mappings.append(cur_rank_mapping)

    if len(node_ips) == 1:
        node_ip = node_ips[0]
    else:
        if args.host:
            node_ip = args.host
        else:
            _, node_ip = get_host_name_ip()

    assert node_ip in node_ips, (
        f"Can't find your local ip {{{node_ip}}} in node_ips: {{{node_ips}}}"
    )
    node_rank = node_ips.index(node_ip)

    assert len(node_ranks[node_rank]) <= gpus_num, (
        "number of ranks mapped to one node should not exceed the available ones."
    )
    assert len(node_ranks) == len(node_ips), (
        "ranks length should be equal to ips length."
    )

    logger.debug(
        f"parsed from args: node_ips:{node_ips} node_ip:{node_ip} "
        f"node_rank:{node_rank} node_ranks:{node_ranks[node_rank]}"
    )

    # NOTE: there are different number of global mapped ranks on each node.
    free_ports = []
    trainer_endpoints = []
    for ip in node_ips:
        node_rank = node_ips.index(ip)
        if os.environ.get('PADDLE_PORT') is not None:
            start_port = int(os.getenv("PADDLE_PORT", ""))
            free_ports = list(
                range(start_port, start_port + len(node_ranks[node_rank]))
            )
        elif os.environ.get('FLAGS_START_PORT') is not None:
            start_port = int(os.environ.get('FLAGS_START_PORT'))
            free_ports = list(
                range(start_port, start_port + len(node_ranks[node_rank]))
            )
        else:
            free_ports = find_free_ports(len(node_ranks[node_rank]))
        trainer_endpoints.append([f"{ip}:{port}" for port in free_ports])

    return get_mapped_cluster_with_rank_mapping(
        node_ips,
        node_ip,
        trainer_endpoints,
        device_mode,
        node_ranks,
        node_rank_mappings,
    )
2021-10-28 11:03:28 +08:00
2022-11-08 11:29:41 +08:00
class ParameterServerLauncher :
2020-10-13 09:52:59 +08:00
def __init__ ( self , args , distribute_mode ) :
self . args = args
self . distribute_mode = distribute_mode
2022-07-26 18:43:23 +08:00
self . with_coordinator = False
2020-10-13 09:52:59 +08:00
self . server_num = 0
self . worker_num = 0
self . heter_worker_num = 0
2022-07-26 18:43:23 +08:00
self . coordinator_num = 0
2020-10-13 09:52:59 +08:00
self . server_endpoints = " "
self . server_endpoints_ips = [ ]
self . server_endpoints_port = [ ]
self . worker_endpoints = " "
self . worker_endpoints_ips = [ ]
self . worker_endpoints_port = [ ]
self . heter_worker_endpoints = " "
self . heter_worker_endpoints_ips = [ ]
self . heter_worker_endpoints_port = [ ]
2022-07-26 18:43:23 +08:00
self . coordinator_endpoints = " "
self . coordinator_endpoints_ips = [ ]
self . coordinator_endpoints_port = [ ]
2020-10-13 09:52:59 +08:00
self . is_local = True
self . current_node_ip = " "
[Heterps]Refactor Heter Pipeline Parameter Server (#36845)
* change username
* fix
* fix
* fix
* fix
* fix
* update
* update
* update unittests
* fix
* update
* fix
* update
* fix
* fix
* fix
* update
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update send_and_recv op. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix ut. test=develop
* fix unit. notest,test=coverage
* fix ut. notest, test=coverage
* update. notest,test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix. notest, test=coverage
* fix. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* add func. notest, test=coverage
* fix ut. notest, test=coverage
* fix. test=develop
* fix. test=develop
2021-11-11 13:24:41 +08:00
self . stage_trainer_num = [ ]
self . stage_heter_map = { }
self . stage_list = [ ]
self . stage_device_map = { }
self . stage_num = 0
2020-10-13 09:52:59 +08:00
self . get_role_endpoints ( args )
def get_role_endpoints ( self , args ) :
if args . server_num :
self . server_num = args . server_num
if args . servers :
assert len ( args . servers . split ( " , " ) ) == self . server_num , (
" The server_num and servers doesn ' t match. Expect servers endpoints num equal to server_num, but received servers endpoint num: {} and server_num {} " . format (
len ( args . servers . split ( " , " ) ) , self . server_num
)
)
self . server_endpoints = args . servers
else :
ports = get_ports ( self . server_num , 0 )
self . server_endpoints = " , " . join (
[ " 127.0.0.1: " + str ( x ) for x in ports ]
)
else :
assert args . servers != " " , (
" The setting of Parameter-Server must has server_num or servers. "
)
self . server_endpoints = args . servers
self . server_num = len ( self . server_endpoints . split ( " , " ) )
# get worker envs
if args . worker_num :
self . worker_num = args . worker_num
if args . workers :
assert len ( args . workers . split ( " , " ) ) == self . worker_num , (
" The worker_num and workers doesn ' t match. Expect workers endpoints num equal to worker_num, but received workers endpoint num: {} and worker_num {} " . format (
len ( args . workers . split ( " , " ) ) , self . worker_num
)
)
self . worker_endpoints = args . workers
else :
ports = get_ports ( self . worker_num , self . server_num )
self . worker_endpoints = " , " . join (
[ " 127.0.0.1: " + str ( x ) for x in ports ]
)
else :
assert args . workers != " " , (
" The setting of Parameter-Server must has worker_num or workers. "
)
worker_endpoints_ips = [
x . strip ( ) . split ( " : " ) [ 0 ] for x in args . workers . split ( " , " )
]
self . worker_num = len ( worker_endpoints_ips )
worker_endpoints_len = [
len ( x . strip ( ) . split ( " : " ) ) for x in args . workers . split ( " , " )
]
if 1 in worker_endpoints_len :
# if no port value in worker_endpoints, will set default port values.
start_port = 6170
worker_endpoints_port = range (
start_port + self . server_num ,
start_port + self . server_num + self . worker_num ,
1 ,
)
# create endpoints str
worker_endpoints = [ ]
for i in range ( self . worker_num ) :
worker_endpoints . append (
" : " . join (
(
worker_endpoints_ips [ i ] ,
str ( worker_endpoints_port [ i ] ) ,
)
)
)
self . worker_endpoints = " , " . join ( worker_endpoints )
else :
self . worker_endpoints = args . workers
# get coordinator envs
if args . coordinator_num :
self . with_coordinator = True
self . coordinator_num = args . coordinator_num
if args . coordinators :
assert (
len ( args . coordinators . split ( " , " ) ) == self . coordinator_num
) , (
" The coordinator_num and coordinators doesn ' t match. Expect coordinators endpoints num equal to coordinator_num, but received coordinator endpoint num: {} and coordinator_num {} " . format (
len ( args . coordinators . split ( " , " ) ) , self . coordinator_num
)
)
self . coordinator_endpoints = args . coordinators
else :
ports = get_ports ( self . coordinator_num , 1 )
self . coordinator_endpoints = " , " . join (
[ " 127.0.0.1: " + str ( x ) for x in ports ]
)
print ( " >>> use default coordinator addr(only one process) " )
# get heter worker envs
if self . distribute_mode == DistributeMode . PS_HETER :
assert args . heter_devices != " " , (
" The setting of Parameter-Server heter mode must has heter_devices. "
)
self . stage_device_map [ 1 ] = " cpu " # for cpu trainer
heter_devices_list = args . heter_devices . split ( " ; " )
for i in range ( len ( heter_devices_list ) ) :
self . stage_device_map [ i + 2 ] = heter_devices_list [ i ]
self . stage_heter_map [ 1 ] = self . worker_endpoints
if args . heter_worker_num :
self . stage_heter_trainer_num = args . heter_worker_num . split ( " ; " )
self . stage_heter_trainer_num = [
int ( trainer_num )
for trainer_num in self . stage_heter_trainer_num
]
if args . heter_workers :
assert len ( args . heter_workers . split ( " ; " ) ) == len (
self . stage_heter_trainer_num
) , (
" The stage_num and heter_workers doesn ' t match. Expect heter_workers endpoints stage num equal to heter_worker_num stage, but received heter_workers endpoint stage num: {} and heter_worker_num stage {} " . format (
len ( args . heter_workers . split ( " ; " ) ) ,
len ( self . stage_heter_trainer_num ) ,
)
)
heter_worker_endpoints_list = args . heter_workers . split ( " ; " )
self . heter_worker_endpoints = " "
for i in range ( len ( self . stage_heter_trainer_num ) ) :
if self . heter_worker_endpoints != " " :
self . heter_worker_endpoints + = " , "
heter_worker_endpoints = heter_worker_endpoints_list [
i
] . split ( " , " )
assert (
len ( heter_worker_endpoints )
== self . stage_heter_trainer_num [ i ]
) , (
f " The heter trainer num in stage { i } is not equal in args.heter_worker_num and args.heter_workers "
)
heter_worker_endpoints_ips = [
x . strip ( ) . split ( " : " ) [ 0 ]
for x in heter_worker_endpoints
]
heter_worker_endpoints_len = [
len ( x . strip ( ) . split ( " : " ) )
for x in heter_worker_endpoints
]
if 1 in heter_worker_endpoints_len :
# if no port value in heter_worker_endpoint, will set default port values.
heter_worker_endpoints_port = get_ports (
len ( heter_worker_endpoints_ips ) ,
self . worker_num
+ self . server_num
+ self . heter_worker_num ,
)
new_heter_worker_endpoints = [ ]
for j in range ( len ( heter_worker_endpoints_ips ) ) :
new_heter_worker_endpoints . append (
" : " . join (
(
heter_worker_endpoints_ips [ j ] ,
str ( heter_worker_endpoints_port [ j ] ) ,
)
)
)
ip_port_list = " , " . join ( new_heter_worker_endpoints )
else :
ip_port_list = " , " . join ( heter_worker_endpoints )
self . stage_heter_map [ i + 2 ] = ip_port_list
self . stage_list . extend (
[ i + 2 ] * len ( ip_port_list . split ( ' , ' ) )
)
self . heter_worker_num + = self . stage_heter_trainer_num [ i ]
self . heter_worker_endpoints + = ip_port_list
else :
for i in range ( len ( self . stage_heter_trainer_num ) ) :
heter_trainer_num = self . stage_heter_trainer_num [ i ]
ports = get_ports (
heter_trainer_num ,
self . server_num
+ self . worker_num
+ self . heter_worker_num ,
)
ip_port_list = " , " . join (
[ " 127.0.0.1: " + str ( x ) for x in ports ]
)
self . stage_heter_map [ i + 2 ] = ip_port_list
self . stage_list . extend (
[ i + 2 ] * len ( ip_port_list . split ( ' , ' ) )
)
self . heter_worker_num + = heter_trainer_num
if self . heter_worker_endpoints != " " :
self . heter_worker_endpoints + = " , "
self . heter_worker_endpoints + = ip_port_list
else :
assert args . heter_workers != " " , (
" The setting of Parameter-Server heter mode must has heter_worker_num or heter_workers. "
)
self . stage_heter_trainer_num = [ ]
heter_worker_endpoints_list = args . heter_workers . split ( " ; " )
self . heter_worker_endpoints = " "
for i in range ( len ( heter_worker_endpoints_list ) ) :
heter_worker_endpoints = heter_worker_endpoints_list [
i
] . split ( " , " )
self . stage_heter_trainer_num . append (
len ( heter_worker_endpoints )
)
heter_worker_endpoints_ips = [
x . strip ( ) . split ( " : " ) [ 0 ] for x in heter_worker_endpoints
]
heter_worker_endpoints_len = [
len ( x . strip ( ) . split ( " : " ) )
for x in heter_worker_endpoints
]
if 1 in heter_worker_endpoints_len :
# if no port value in heter_worker_endpoint, will set default port values.
heter_worker_endpoints_port = get_ports (
2022-10-23 20:01:27 +08:00
len ( heter_worker_endpoints_ips ) ,
self . worker_num
+ self . server_num
+ self . heter_worker_num ,
)
[Heterps]Refactor Heter Pipeline Parameter Server (#36845)
* change username
* fix
* fix
* fix
* fix
* fix
* update
* update
* update unittests
* fix
* update
* fix
* update
* fix
* fix
* fix
* update
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update send_and_recv op. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix ut. test=develop
* fix unit. notest,test=coverage
* fix ut. notest, test=coverage
* update. notest,test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix. notest, test=coverage
* fix. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* add func. notest, test=coverage
* fix ut. notest, test=coverage
* fix. test=develop
* fix. test=develop
2021-11-11 13:24:41 +08:00
new_heter_worker_endpoints = [ ]
for j in range ( len ( heter_worker_endpoints_ips ) ) :
2022-10-23 20:01:27 +08:00
new_heter_worker_endpoints . append (
" : " . join (
(
heter_worker_endpoints_ips [ j ] ,
str ( heter_worker_endpoints_port [ j ] ) ,
)
)
)
[Heterps]Refactor Heter Pipeline Parameter Server (#36845)
* change username
* fix
* fix
* fix
* fix
* fix
* update
* update
* update unittests
* fix
* update
* fix
* update
* fix
* fix
* fix
* update
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update send_and_recv op. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix ut. test=develop
* fix unit. notest,test=coverage
* fix ut. notest, test=coverage
* update. notest,test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix. notest, test=coverage
* fix. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* add func. notest, test=coverage
* fix ut. notest, test=coverage
* fix. test=develop
* fix. test=develop
2021-11-11 13:24:41 +08:00
ip_port_list = " , " . join ( new_heter_worker_endpoints )
else :
ip_port_list = " , " . join ( heter_worker_endpoints )
self . stage_heter_map [ i + 2 ] = ip_port_list
2022-10-23 20:01:27 +08:00
self . stage_list . extend (
[ i + 2 ] * len ( ip_port_list . split ( ' , ' ) )
)
[Heterps]Refactor Heter Pipeline Parameter Server (#36845)
* change username
* fix
* fix
* fix
* fix
* fix
* update
* update
* update unittests
* fix
* update
* fix
* update
* fix
* fix
* fix
* update
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update send_and_recv op. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix ut. test=develop
* fix unit. notest,test=coverage
* fix ut. notest, test=coverage
* update. notest,test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix. notest, test=coverage
* fix. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* add func. notest, test=coverage
* fix ut. notest, test=coverage
* fix. test=develop
* fix. test=develop
2021-11-11 13:24:41 +08:00
self . heter_worker_num + = self . stage_heter_trainer_num [ - 1 ]
if self . heter_worker_endpoints != " " :
self . heter_worker_endpoints + = " , "
self . heter_worker_endpoints + = ip_port_list
2022-10-23 20:01:27 +08:00
self . stage_trainer_num = [
2024-08-14 01:40:01 +08:00
self . worker_num ,
* self . stage_heter_trainer_num ,
]
[Heterps]Refactor Heter Pipeline Parameter Server (#36845)
* change username
* fix
* fix
* fix
* fix
* fix
* update
* update
* update unittests
* fix
* update
* fix
* update
* fix
* fix
* fix
* update
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update send_and_recv op. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix ut. test=develop
* fix unit. notest,test=coverage
* fix ut. notest, test=coverage
* update. notest,test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix. notest, test=coverage
* fix. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* add func. notest, test=coverage
* fix ut. notest, test=coverage
* fix. test=develop
* fix. test=develop
2021-11-11 13:24:41 +08:00
self . stage_num = len ( self . stage_trainer_num )
# get http_port
if args . http_port :
2021-11-22 15:01:30 +08:00
http_port = [ args . http_port ]
[Heterps]Refactor Heter Pipeline Parameter Server (#36845)
* change username
* fix
* fix
* fix
* fix
* fix
* update
* update
* update unittests
* fix
* update
* fix
* update
* fix
* fix
* fix
* update
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update send_and_recv op. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix ut. test=develop
* fix unit. notest,test=coverage
* fix ut. notest, test=coverage
* update. notest,test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix. notest, test=coverage
* fix. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* add func. notest, test=coverage
* fix ut. notest, test=coverage
* fix. test=develop
* fix. test=develop
2021-11-11 13:24:41 +08:00
else :
http_port = get_ports (
2022-10-23 20:01:27 +08:00
1 , self . server_num + self . worker_num + self . heter_worker_num
)
2021-11-22 15:01:30 +08:00
http_ip = self . server_endpoints . split ( " , " ) [ 0 ] . split ( " : " ) [ 0 ]
self . http_port = http_ip + " : " + str ( http_port [ 0 ] )
2020-10-13 09:52:59 +08:00
# check local or user define
self . server_endpoints_ips = [
x . strip ( ) . split ( " : " ) [ 0 ] for x in self . server_endpoints . split ( " , " )
]
self . worker_endpoints_ips = [
x . strip ( ) . split ( " : " ) [ 0 ] for x in self . worker_endpoints . split ( " , " )
]
2022-07-26 18:43:23 +08:00
2022-11-01 17:12:28 +08:00
if self . with_coordinator :
2022-07-26 18:43:23 +08:00
self . coordinator_endpoints_ips = [
x . strip ( ) . split ( " : " ) [ 0 ]
for x in self . coordinator_endpoints . split ( " , " )
]
self . coordinator_endpoints_port = [
x . strip ( ) . split ( " : " ) [ 1 ]
for x in self . coordinator_endpoints . split ( " , " )
]
2020-10-13 09:52:59 +08:00
self . server_endpoints_port = [
x . strip ( ) . split ( " : " ) [ 1 ] for x in self . server_endpoints . split ( " , " )
]
self . worker_endpoints_port = [
x . strip ( ) . split ( " : " ) [ 1 ] for x in self . worker_endpoints . split ( " , " )
]
[Heterps]Refactor Heter Pipeline Parameter Server (#36845)
* change username
* fix
* fix
* fix
* fix
* fix
* update
* update
* update unittests
* fix
* update
* fix
* update
* fix
* fix
* fix
* update
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update send_and_recv op. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix ut. test=develop
* fix unit. notest,test=coverage
* fix ut. notest, test=coverage
* update. notest,test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix. notest, test=coverage
* fix. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* add func. notest, test=coverage
* fix ut. notest, test=coverage
* fix. test=develop
* fix. test=develop
2021-11-11 13:24:41 +08:00
self . node_ips = [ ]
for ip in self . server_endpoints_ips :
if ip not in self . node_ips :
self . node_ips . append ( ip )
for ip in self . worker_endpoints_ips :
if ip not in self . node_ips :
self . node_ips . append ( ip )
2020-10-13 09:52:59 +08:00
if self . distribute_mode == DistributeMode . PS_HETER :
self . heter_worker_endpoints_ips = [
x . strip ( ) . split ( " : " ) [ 0 ]
for x in self . heter_worker_endpoints . split ( " , " )
]
self . heter_worker_endpoints_port = [
x . strip ( ) . split ( " : " ) [ 1 ]
for x in self . heter_worker_endpoints . split ( " , " )
]
[Heterps]Refactor Heter Pipeline Parameter Server (#36845)
* change username
* fix
* fix
* fix
* fix
* fix
* update
* update
* update unittests
* fix
* update
* fix
* update
* fix
* fix
* fix
* update
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update send_and_recv op. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix ut. test=develop
* fix unit. notest,test=coverage
* fix ut. notest, test=coverage
* update. notest,test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix. notest, test=coverage
* fix. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* add func. notest, test=coverage
* fix ut. notest, test=coverage
* fix. test=develop
* fix. test=develop
2021-11-11 13:24:41 +08:00
for ip in self . heter_worker_endpoints_ips :
if ip not in self . node_ips :
self . node_ips . append ( ip )
2020-10-13 09:52:59 +08:00
if len ( set ( self . node_ips ) ) == 1 :
self . is_local = True
self . current_node_ip = self . node_ips [ 0 ]
else :
self . is_local = False
pod_ip = os . getenv ( " POD_IP " , None )
2022-11-01 22:14:52 +08:00
if pod_ip is None :
2020-10-13 09:52:59 +08:00
_ , self . current_node_ip = get_host_name_ip ( )
else :
self . current_node_ip = pod_ip
2021-12-07 12:13:14 +08:00
if not self . distribute_mode == DistributeMode . PS_HETER :
2025-08-21 02:03:08 +08:00
assert self . current_node_ip in self . node_ips , (
f " Can ' t find your local ip {{ { self . current_node_ip } }} in args.servers and args.workers ips: {{ { self . node_ips } }} "
)
2021-12-07 12:13:14 +08:00
if self . current_node_ip in self . node_ips :
self . node_rank = self . node_ips . index ( self . current_node_ip )
logger . debug (
2024-04-01 10:20:33 +08:00
f " parsed from args: node_ips: { self . node_ips } current_node_ip: { self . current_node_ip } node_rank: { self . node_rank } "
2022-10-23 20:01:27 +08:00
)
2020-10-13 09:52:59 +08:00
def start_ps ( self ) :
2022-10-11 11:32:23 +08:00
if self . current_node_ip not in self . node_ips :
2021-12-07 12:13:14 +08:00
return
2020-10-13 09:52:59 +08:00
cluster = Cluster ( hdfs = None )
server_rank = 0
worker_rank = 0
heter_worker_rank = 0
2022-07-26 18:43:23 +08:00
coordinator_rank = 0
2020-10-13 09:52:59 +08:00
for node_rank , ip in enumerate ( self . node_ips ) :
pod = Pod ( )
pod . rank = node_rank
pod . addr = ip
for i in range ( len ( self . server_endpoints_ips ) ) :
if ip == self . server_endpoints_ips [ i ] :
server = Trainer ( )
2023-09-22 10:14:38 +08:00
server . endpoint = f " { ip } : { self . server_endpoints_port [ i ] } "
2020-10-13 09:52:59 +08:00
server . rank = server_rank
server_rank + = 1
pod . servers . append ( server )
for j in range ( len ( self . worker_endpoints_ips ) ) :
if ip == self . worker_endpoints_ips [ j ] :
worker = Trainer ( )
2023-09-22 10:14:38 +08:00
worker . endpoint = f " { ip } : { self . worker_endpoints_port [ j ] } "
2020-10-13 09:52:59 +08:00
worker . rank = worker_rank
[Heterps]Refactor Heter Pipeline Parameter Server (#36845)
* change username
* fix
* fix
* fix
* fix
* fix
* update
* update
* update unittests
* fix
* update
* fix
* update
* fix
* fix
* fix
* update
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update send_and_recv op. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix ut. test=develop
* fix unit. notest,test=coverage
* fix ut. notest, test=coverage
* update. notest,test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix. notest, test=coverage
* fix. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* add func. notest, test=coverage
* fix ut. notest, test=coverage
* fix. test=develop
* fix. test=develop
2021-11-11 13:24:41 +08:00
worker . stage = 1
2020-10-13 09:52:59 +08:00
worker_rank + = 1
pod . workers . append ( worker )
2022-07-26 18:43:23 +08:00
for m in range ( len ( self . coordinator_endpoints_ips ) ) :
if ip == self . coordinator_endpoints_ips [ m ] :
coordinator = Trainer ( )
2023-09-22 10:14:38 +08:00
coordinator . endpoint = (
f " { ip } : { self . coordinator_endpoints_port [ m ] } "
2022-10-23 20:01:27 +08:00
)
2022-07-26 18:43:23 +08:00
coordinator . rank = coordinator_rank
coordinator . stage = 1
coordinator_rank + = 1
pod . coordinators . append ( coordinator )
2020-10-13 09:52:59 +08:00
for k in range ( len ( self . heter_worker_endpoints_ips ) ) :
if ip == self . heter_worker_endpoints_ips [ k ] :
heter_worker = Trainer ( )
2024-04-01 10:20:33 +08:00
heter_worker . endpoint = (
f " { ip } : { self . heter_worker_endpoints_port [ k ] } "
2022-10-23 20:01:27 +08:00
)
2020-10-13 09:52:59 +08:00
heter_worker . rank = heter_worker_rank
[Heterps]Refactor Heter Pipeline Parameter Server (#36845)
* change username
* fix
* fix
* fix
* fix
* fix
* update
* update
* update unittests
* fix
* update
* fix
* update
* fix
* fix
* fix
* update
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update send_and_recv op. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix ut. test=develop
* fix unit. notest,test=coverage
* fix ut. notest, test=coverage
* update. notest,test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix. notest, test=coverage
* fix. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* add func. notest, test=coverage
* fix ut. notest, test=coverage
* fix. test=develop
* fix. test=develop
2021-11-11 13:24:41 +08:00
heter_worker . stage = self . stage_list [ k ]
2020-10-13 09:52:59 +08:00
heter_worker_rank + = 1
pod . heter_workers . append ( heter_worker )
cluster . pods . append ( pod )
pod = cluster . pods [ self . node_rank ]
self . gloo_rendezvous_dir = tempfile . mkdtemp ( )
2024-02-20 12:00:53 +08:00
# 3. subprocess start
2022-07-26 18:43:23 +08:00
self . procs = {
" worker " : [ ] ,
" coordinator " : [ ] ,
" server " : [ ] ,
2022-10-23 20:01:27 +08:00
" heter_worker " : [ ] ,
2022-07-26 18:43:23 +08:00
}
self . cmds = {
" worker " : [ ] ,
" coordinator " : [ ] ,
" server " : [ ] ,
2022-10-23 20:01:27 +08:00
" heter_worker " : [ ] ,
2022-07-26 18:43:23 +08:00
}
self . log_fns = {
" worker " : [ ] ,
" coordinator " : [ ] ,
" server " : [ ] ,
2022-10-23 20:01:27 +08:00
" heter_worker " : [ ] ,
2022-07-26 18:43:23 +08:00
}
2020-10-13 09:52:59 +08:00
self . start_pod_server ( self . args , pod )
self . start_pod_worker ( self . args , pod )
2022-07-26 18:43:23 +08:00
if self . with_coordinator :
self . start_pod_coordinator ( self . args , pod )
2020-10-14 12:51:30 +08:00
if self . distribute_mode == DistributeMode . PS_HETER :
self . start_pod_heter_worker ( self . args , pod )
2020-10-13 09:52:59 +08:00
logger . info (
2024-04-01 10:20:33 +08:00
f " Please check servers, workers, coordinator and heter_worker logs in { self . args . log_dir } /workerlog.*, { self . args . log_dir } /serverlog.* , { self . args . log_dir } /coordinatorlog.*, and { self . args . log_dir } /heterlog.* "
2022-10-23 20:01:27 +08:00
)
2020-10-13 09:52:59 +08:00
# 4. wait for finish training
if len ( self . procs [ " worker " ] ) > 0 :
# if node has worker procs
# only wait worker to finish here
for i , proc in enumerate ( self . procs [ " worker " ] ) :
self . procs [ " worker " ] [ i ] . proc . wait ( )
if len ( self . log_fns [ " worker " ] ) > 0 :
self . log_fns [ " worker " ] [ i ] . close ( )
logger . info (
" all workers exit, going to finish parameter server and heter_worker. "
)
if len ( self . procs [ " heter_worker " ] ) > 0 :
for i , proc in enumerate ( self . procs [ " heter_worker " ] ) :
self . log_fns [ " heter_worker " ] [ i ] . close ( )
self . procs [ " heter_worker " ] [ i ] . proc . terminate ( )
logger . info ( " all heter_worker are killed " )
if len ( self . procs [ " server " ] ) > 0 :
for i , proc in enumerate ( self . procs [ " server " ] ) :
self . log_fns [ " server " ] [ i ] . close ( )
self . procs [ " server " ] [ i ] . proc . terminate ( )
logger . info ( " all parameter server are killed " )
2022-07-26 18:43:23 +08:00
if len ( self . procs [ " coordinator " ] ) > 0 :
for i , proc in enumerate ( self . procs [ " coordinator " ] ) :
self . log_fns [ " coordinator " ] [ i ] . close ( )
self . procs [ " coordinator " ] [ i ] . proc . terminate ( )
logger . info ( " all coordinators are killed " )
2020-10-13 09:52:59 +08:00
else :
# if node has not worker procs
# blocking training process
if len ( self . procs [ " server " ] ) > 0 :
for i , proc in enumerate ( self . procs [ " server " ] ) :
self . procs [ " server " ] [ i ] . proc . wait ( )
if len ( self . procs [ " heter_worker " ] ) > 0 :
for i , proc in enumerate ( self . procs [ " heter_worker " ] ) :
self . procs [ " heter_worker " ] [ i ] . proc . wait ( )
if os . path . exists ( self . gloo_rendezvous_dir ) :
shutil . rmtree ( self . gloo_rendezvous_dir )
def start_pod_server ( self , args , pod ) :
default_env = os . environ . copy ( )
current_env = copy . copy ( default_env )
current_env . pop ( " http_proxy " , None )
current_env . pop ( " https_proxy " , None )
for idx , cur_server in enumerate ( pod . servers ) :
[Heterps]Refactor Heter Pipeline Parameter Server (#36845)
* change username
* fix
* fix
* fix
* fix
* fix
* update
* update
* update unittests
* fix
* update
* fix
* update
* fix
* fix
* fix
* update
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update send_and_recv op. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix ut. test=develop
* fix unit. notest,test=coverage
* fix ut. notest, test=coverage
* update. notest,test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix. notest, test=coverage
* fix. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* add func. notest, test=coverage
* fix ut. notest, test=coverage
* fix. test=develop
* fix. test=develop
2021-11-11 13:24:41 +08:00
if self . distribute_mode == DistributeMode . PS_HETER :
proc_env = {
" PADDLE_PSERVERS_IP_PORT_LIST " : self . server_endpoints ,
" PADDLE_TRAINER_ENDPOINTS " : self . worker_endpoints ,
2022-07-26 18:43:23 +08:00
" PADDLE_COORDINATOR_ENDPOINTS " : self . coordinator_endpoints ,
2022-10-23 20:01:27 +08:00
" PADDLE_ALL_HETER_TRAINER_IP_PORT_LIST " : self . heter_worker_endpoints ,
[Heterps]Refactor Heter Pipeline Parameter Server (#36845)
* change username
* fix
* fix
* fix
* fix
* fix
* update
* update
* update unittests
* fix
* update
* fix
* update
* fix
* fix
* fix
* update
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update send_and_recv op. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix ut. test=develop
* fix unit. notest,test=coverage
* fix ut. notest, test=coverage
* update. notest,test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix. notest, test=coverage
* fix. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* add func. notest, test=coverage
* fix ut. notest, test=coverage
* fix. test=develop
* fix. test=develop
2021-11-11 13:24:41 +08:00
" PADDLE_PORT " : cur_server . endpoint . split ( " : " ) [ 1 ] ,
" TRAINING_ROLE " : " PSERVER " ,
" PADDLE_TRAINERS_NUM " : str ( self . worker_num ) ,
" POD_IP " : cur_server . endpoint . split ( " : " ) [ 0 ] ,
2022-06-05 10:58:58 +08:00
" PADDLE_WITH_GLOO " : str ( os . getenv ( " PADDLE_WITH_GLOO " , " 0 " ) ) ,
[Heterps]Refactor Heter Pipeline Parameter Server (#36845)
* change username
* fix
* fix
* fix
* fix
* fix
* update
* update
* update unittests
* fix
* update
* fix
* update
* fix
* fix
* fix
* update
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update send_and_recv op. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix ut. test=develop
* fix unit. notest,test=coverage
* fix ut. notest, test=coverage
* update. notest,test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix. notest, test=coverage
* fix. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* add func. notest, test=coverage
* fix ut. notest, test=coverage
* fix. test=develop
* fix. test=develop
2021-11-11 13:24:41 +08:00
" PADDLE_GLOO_RENDEZVOUS " : " 3 " ,
" PADDLE_GLOO_FS_PATH " : self . gloo_rendezvous_dir ,
2022-10-23 20:01:27 +08:00
" PADDLE_GLOO_HTTP_ENDPOINT " : self . http_port ,
[Heterps]Refactor Heter Pipeline Parameter Server (#36845)
* change username
* fix
* fix
* fix
* fix
* fix
* update
* update
* update unittests
* fix
* update
* fix
* update
* fix
* fix
* fix
* update
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update send_and_recv op. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix ut. test=develop
* fix unit. notest,test=coverage
* fix ut. notest, test=coverage
* update. notest,test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix. notest, test=coverage
* fix. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* add func. notest, test=coverage
* fix ut. notest, test=coverage
* fix. test=develop
* fix. test=develop
2021-11-11 13:24:41 +08:00
}
else :
proc_env = {
" PADDLE_PSERVERS_IP_PORT_LIST " : self . server_endpoints ,
" PADDLE_TRAINER_ENDPOINTS " : self . worker_endpoints ,
2022-07-26 18:43:23 +08:00
" PADDLE_COORDINATOR_ENDPOINTS " : self . coordinator_endpoints ,
[Heterps]Refactor Heter Pipeline Parameter Server (#36845)
* change username
* fix
* fix
* fix
* fix
* fix
* update
* update
* update unittests
* fix
* update
* fix
* update
* fix
* fix
* fix
* update
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update send_and_recv op. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix ut. test=develop
* fix unit. notest,test=coverage
* fix ut. notest, test=coverage
* update. notest,test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix. notest, test=coverage
* fix. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* add func. notest, test=coverage
* fix ut. notest, test=coverage
* fix. test=develop
* fix. test=develop
2021-11-11 13:24:41 +08:00
" PADDLE_PORT " : cur_server . endpoint . split ( " : " ) [ 1 ] ,
" TRAINING_ROLE " : " PSERVER " ,
" PADDLE_TRAINERS_NUM " : str ( self . worker_num ) ,
" POD_IP " : cur_server . endpoint . split ( " : " ) [ 0 ] ,
2022-06-05 10:58:58 +08:00
" PADDLE_WITH_GLOO " : str ( os . getenv ( " PADDLE_WITH_GLOO " , " 0 " ) ) ,
[Heterps]Refactor Heter Pipeline Parameter Server (#36845)
* change username
* fix
* fix
* fix
* fix
* fix
* update
* update
* update unittests
* fix
* update
* fix
* update
* fix
* fix
* fix
* update
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update send_and_recv op. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix ut. test=develop
* fix unit. notest,test=coverage
* fix ut. notest, test=coverage
* update. notest,test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix. notest, test=coverage
* fix. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* add func. notest, test=coverage
* fix ut. notest, test=coverage
* fix. test=develop
* fix. test=develop
2021-11-11 13:24:41 +08:00
" PADDLE_GLOO_RENDEZVOUS " : " 3 " ,
" PADDLE_GLOO_FS_PATH " : self . gloo_rendezvous_dir ,
2022-10-23 20:01:27 +08:00
" PADDLE_GLOO_HTTP_ENDPOINT " : self . http_port ,
[Heterps]Refactor Heter Pipeline Parameter Server (#36845)
* change username
* fix
* fix
* fix
* fix
* fix
* update
* update
* update unittests
* fix
* update
* fix
* update
* fix
* fix
* fix
* update
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update send_and_recv op. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix ut. test=develop
* fix unit. notest,test=coverage
* fix ut. notest, test=coverage
* update. notest,test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix. notest, test=coverage
* fix. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* add func. notest, test=coverage
* fix ut. notest, test=coverage
* fix. test=develop
* fix. test=develop
2021-11-11 13:24:41 +08:00
}
2020-10-13 09:52:59 +08:00
current_env . update ( proc_env )
2022-10-23 20:01:27 +08:00
cmd = [
sys . executable ,
" -u " ,
args . training_script ,
2024-08-14 01:40:01 +08:00
* args . training_script_args ,
]
2020-10-13 09:52:59 +08:00
self . cmds [ " server " ] . append ( cmd )
if idx == 0 :
logger . info (
" Local server start {} processes. First process distributed "
" environment info (Only For Debug): {} " . format (
len ( pod . servers ) ,
2022-10-23 20:01:27 +08:00
pretty_print_envs (
proc_env , ( " Distributed Envs " , " Value " )
) ,
)
)
2020-10-13 09:52:59 +08:00
if args . log_dir is not None :
2023-04-07 10:27:27 +08:00
os . makedirs ( args . log_dir , exist_ok = True )
2024-12-12 01:59:20 +08:00
fn = open ( f " { args . log_dir } /serverlog. { idx } " , " w " )
2020-10-13 09:52:59 +08:00
self . log_fns [ " server " ] . append ( fn )
2022-10-23 20:01:27 +08:00
proc = subprocess . Popen (
cmd , env = current_env , stdout = fn , stderr = fn
)
2020-10-13 09:52:59 +08:00
else :
proc = subprocess . Popen ( cmd , env = current_env )
tp = TrainerProc ( )
tp . proc = proc
tp . rank = cur_server . rank
tp . local_rank = idx
tp . log_fn = fn
tp . log_offset = fn . tell ( ) if fn else None
tp . cmd = cmd
self . procs [ " server " ] . append ( tp )
def start_pod_worker ( self , args , pod ) :
default_env = os . environ . copy ( )
current_env = copy . copy ( default_env )
current_env . pop ( " http_proxy " , None )
current_env . pop ( " https_proxy " , None )
heter_device_num = 0
device_list = [ ]
2022-11-18 19:03:10 +08:00
if framework . core . is_compiled_with_cuda ( ) :
2020-10-13 09:52:59 +08:00
device_list = get_gpus ( args . gpus )
heter_device_num = len ( device_list )
2022-11-18 19:03:10 +08:00
elif framework . core . is_compiled_with_xpu ( ) :
heter_device_num = framework . core . get_xpu_device_count ( )
2020-10-13 09:52:59 +08:00
device_list = [ str ( x ) for x in range ( 0 , heter_device_num ) ]
for idx , cur_worker in enumerate ( pod . workers ) :
2022-10-23 20:01:27 +08:00
device_id = (
" 0 "
if heter_device_num == 0
else str ( device_list [ ( idx ) % heter_device_num ] )
)
[Heterps]Refactor Heter Pipeline Parameter Server (#36845)
* change username
* fix
* fix
* fix
* fix
* fix
* update
* update
* update unittests
* fix
* update
* fix
* update
* fix
* fix
* fix
* update
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update send_and_recv op. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix ut. test=develop
* fix unit. notest,test=coverage
* fix ut. notest, test=coverage
* update. notest,test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix. notest, test=coverage
* fix. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* add func. notest, test=coverage
* fix ut. notest, test=coverage
* fix. test=develop
* fix. test=develop
2021-11-11 13:24:41 +08:00
if self . distribute_mode == DistributeMode . PS_HETER :
proc_env = {
2022-10-23 20:01:27 +08:00
" PADDLE_PSERVERS_IP_PORT_LIST " : self . server_endpoints ,
" PADDLE_TRAINER_ENDPOINTS " : self . worker_endpoints ,
" PADDLE_TRAINERS_NUM " : str ( self . worker_num ) ,
" PADDLE_COORDINATOR_ENDPOINTS " : self . coordinator_endpoints ,
" PADDLE_STAGE_TRAINERS_NUM " : str ( self . stage_trainer_num ) ,
" STAGE_ID " : " 1 " ,
" STAGE_NUM " : str ( self . stage_num ) ,
" PADDLE_PREVIOUS_HETER_TRAINER_IP_PORT_LIST " : " " ,
" PADDLE_NEXT_HETER_TRAINER_IP_PORT_LIST " : self . stage_heter_map [
2
] ,
" PADDLE_ALL_HETER_TRAINER_IP_PORT_LIST " : self . heter_worker_endpoints ,
" HETER_DEVICE_TYPE " : self . stage_device_map [ 1 ] ,
" TRAINING_ROLE " : " TRAINER " ,
" POD_IP " : cur_worker . endpoint . split ( " : " ) [ 0 ] ,
" PADDLE_PORT " : cur_worker . endpoint . split ( " : " ) [ 1 ] ,
" PADDLE_TRAINER_ID " : str ( cur_worker . rank ) ,
" PADDLE_WITH_GLOO " : str ( os . getenv ( " PADDLE_WITH_GLOO " , " 0 " ) ) ,
" PADDLE_GLOO_RENDEZVOUS " : " 3 " ,
" PADDLE_GLOO_FS_PATH " : self . gloo_rendezvous_dir ,
" FLAGS_selected_gpus " : " 0 " ,
" FLAGS_selected_xpus " : " 0 " ,
" CUDA_VISIBLE_DEVICES " : device_id ,
" XPU_VISIBLE_DEVICES " : device_id ,
" PADDLE_GLOO_HTTP_ENDPOINT " : self . http_port ,
[Heterps]Refactor Heter Pipeline Parameter Server (#36845)
* change username
* fix
* fix
* fix
* fix
* fix
* update
* update
* update unittests
* fix
* update
* fix
* update
* fix
* fix
* fix
* update
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update send_and_recv op. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix ut. test=develop
* fix unit. notest,test=coverage
* fix ut. notest, test=coverage
* update. notest,test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix. notest, test=coverage
* fix. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* add func. notest, test=coverage
* fix ut. notest, test=coverage
* fix. test=develop
* fix. test=develop
2021-11-11 13:24:41 +08:00
}
else :
proc_env = {
" PADDLE_PSERVERS_IP_PORT_LIST " : self . server_endpoints ,
" PADDLE_TRAINER_ENDPOINTS " : self . worker_endpoints ,
" PADDLE_TRAINERS_NUM " : str ( self . worker_num ) ,
" TRAINING_ROLE " : " TRAINER " ,
2022-07-26 18:43:23 +08:00
" PADDLE_COORDINATOR_ENDPOINTS " : self . coordinator_endpoints ,
[Heterps]Refactor Heter Pipeline Parameter Server (#36845)
* change username
* fix
* fix
* fix
* fix
* fix
* update
* update
* update unittests
* fix
* update
* fix
* update
* fix
* fix
* fix
* update
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update send_and_recv op. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix ut. test=develop
* fix unit. notest,test=coverage
* fix ut. notest, test=coverage
* update. notest,test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix. notest, test=coverage
* fix. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* add func. notest, test=coverage
* fix ut. notest, test=coverage
* fix. test=develop
* fix. test=develop
2021-11-11 13:24:41 +08:00
" POD_IP " : cur_worker . endpoint . split ( " : " ) [ 0 ] ,
" PADDLE_PORT " : cur_worker . endpoint . split ( " : " ) [ 1 ] ,
" PADDLE_TRAINER_ID " : str ( cur_worker . rank ) ,
2022-06-05 10:58:58 +08:00
" PADDLE_WITH_GLOO " : str ( os . getenv ( " PADDLE_WITH_GLOO " , " 0 " ) ) ,
[Heterps]Refactor Heter Pipeline Parameter Server (#36845)
* change username
* fix
* fix
* fix
* fix
* fix
* update
* update
* update unittests
* fix
* update
* fix
* update
* fix
* fix
* fix
* update
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update send_and_recv op. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix ut. test=develop
* fix unit. notest,test=coverage
* fix ut. notest, test=coverage
* update. notest,test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix. notest, test=coverage
* fix. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* add func. notest, test=coverage
* fix ut. notest, test=coverage
* fix. test=develop
* fix. test=develop
2021-11-11 13:24:41 +08:00
" PADDLE_GLOO_RENDEZVOUS " : " 3 " ,
" PADDLE_GLOO_FS_PATH " : self . gloo_rendezvous_dir ,
" FLAGS_selected_gpus " : " 0 " ,
" FLAGS_selected_xpus " : " 0 " ,
" CUDA_VISIBLE_DEVICES " : device_id ,
" XPU_VISIBLE_DEVICES " : device_id ,
2022-10-23 20:01:27 +08:00
" PADDLE_GLOO_HTTP_ENDPOINT " : self . http_port ,
[Heterps]Refactor Heter Pipeline Parameter Server (#36845)
* change username
* fix
* fix
* fix
* fix
* fix
* update
* update
* update unittests
* fix
* update
* fix
* update
* fix
* fix
* fix
* update
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update send_and_recv op. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix ut. test=develop
* fix unit. notest,test=coverage
* fix ut. notest, test=coverage
* update. notest,test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix. notest, test=coverage
* fix. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* add func. notest, test=coverage
* fix ut. notest, test=coverage
* fix. test=develop
* fix. test=develop
2021-11-11 13:24:41 +08:00
}
2020-10-13 09:52:59 +08:00
[Heterps]Refactor Heter Pipeline Parameter Server (#36845)
* change username
* fix
* fix
* fix
* fix
* fix
* update
* update
* update unittests
* fix
* update
* fix
* update
* fix
* fix
* fix
* update
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update send_and_recv op. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix ut. test=develop
* fix unit. notest,test=coverage
* fix ut. notest, test=coverage
* update. notest,test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix. notest, test=coverage
* fix. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* add func. notest, test=coverage
* fix ut. notest, test=coverage
* fix. test=develop
* fix. test=develop
2021-11-11 13:24:41 +08:00
current_env . update ( proc_env )
2022-10-23 20:01:27 +08:00
cmd = [
sys . executable ,
" -u " ,
args . training_script ,
2024-08-14 01:40:01 +08:00
* args . training_script_args ,
]
2020-10-13 09:52:59 +08:00
self . cmds [ " worker " ] . append ( cmd )
if idx == 0 :
logger . info (
" Local worker start {} processes. First process distributed "
" environment info (Only For Debug): {} " . format (
len ( pod . workers ) ,
2022-10-23 20:01:27 +08:00
pretty_print_envs (
proc_env , ( " Distributed Envs " , " Value " )
) ,
)
)
2020-10-13 09:52:59 +08:00
if args . log_dir is not None :
2023-04-07 10:27:27 +08:00
os . makedirs ( args . log_dir , exist_ok = True )
2024-12-12 01:59:20 +08:00
fn = open ( f " { args . log_dir } /workerlog. { idx } " , " w " )
2020-10-13 09:52:59 +08:00
self . log_fns [ " worker " ] . append ( fn )
2022-10-23 20:01:27 +08:00
proc = subprocess . Popen (
cmd , env = current_env , stdout = fn , stderr = fn
)
2020-10-13 09:52:59 +08:00
else :
proc = subprocess . Popen ( cmd , env = current_env )
tp = TrainerProc ( )
tp . proc = proc
tp . rank = cur_worker . rank
tp . local_rank = idx
tp . log_fn = fn
tp . log_offset = fn . tell ( ) if fn else None
tp . cmd = cmd
self . procs [ " worker " ] . append ( tp )
2022-07-26 18:43:23 +08:00
def start_pod_coordinator ( self , args , pod ) :
print ( " >>> entering start_pod_coordinator " )
default_env = os . environ . copy ( )
current_env = copy . copy ( default_env )
current_env . pop ( " http_proxy " , None )
current_env . pop ( " https_proxy " , None )
for idx , cur_coordinator in enumerate ( pod . coordinators ) :
device_id = " 0 "
proc_env = {
" PADDLE_PSERVERS_IP_PORT_LIST " : self . server_endpoints ,
" PADDLE_TRAINER_ENDPOINTS " : self . worker_endpoints ,
" PADDLE_TRAINERS_NUM " : str ( self . worker_num ) ,
" PADDLE_COORDINATOR_ENDPOINTS " : self . coordinator_endpoints ,
" PADDLE_COORDINATOR_NUM " : str ( self . coordinator_num ) ,
" TRAINING_ROLE " : " COORDINATOR " ,
" POD_IP " : cur_coordinator . endpoint . split ( " : " ) [ 0 ] ,
" PADDLE_PORT " : cur_coordinator . endpoint . split ( " : " ) [ 1 ] ,
" PADDLE_TRAINER_ID " : str ( cur_coordinator . rank ) ,
" PADDLE_WITH_GLOO " : str ( os . getenv ( " PADDLE_WITH_GLOO " , " 0 " ) ) ,
" PADDLE_GLOO_RENDEZVOUS " : " 3 " ,
" PADDLE_GLOO_FS_PATH " : self . gloo_rendezvous_dir ,
" FLAGS_selected_gpus " : " 0 " ,
" FLAGS_selected_xpus " : " 0 " ,
" CUDA_VISIBLE_DEVICES " : device_id ,
" XPU_VISIBLE_DEVICES " : device_id ,
2022-10-23 20:01:27 +08:00
" PADDLE_GLOO_HTTP_ENDPOINT " : self . http_port ,
2022-07-26 18:43:23 +08:00
}
current_env . update ( proc_env )
2022-10-23 20:01:27 +08:00
cmd = [
sys . executable ,
" -u " ,
args . training_script ,
2024-08-14 01:40:01 +08:00
* args . training_script_args ,
]
2022-07-26 18:43:23 +08:00
self . cmds [ " coordinator " ] . append ( cmd )
if idx == 0 :
logger . info (
" Local coordinator start {} processes. First process distributed "
" environment info (Only For Debug): {} " . format (
len ( pod . coordinators ) ,
2022-10-23 20:01:27 +08:00
pretty_print_envs (
proc_env , ( " Distributed Envs " , " Value " )
) ,
)
)
2022-07-26 18:43:23 +08:00
if args . log_dir is not None :
2023-04-07 10:27:27 +08:00
os . makedirs ( args . log_dir , exist_ok = True )
2024-12-12 01:59:20 +08:00
fn = open ( f " { args . log_dir } /coordinator. { idx } " , " w " )
2022-07-26 18:43:23 +08:00
self . log_fns [ " coordinator " ] . append ( fn )
2022-10-23 20:01:27 +08:00
proc = subprocess . Popen (
cmd , env = current_env , stdout = fn , stderr = fn
)
2022-07-26 18:43:23 +08:00
else :
proc = subprocess . Popen ( cmd , env = current_env )
tp = TrainerProc ( )
tp . proc = proc
tp . rank = cur_coordinator . rank
tp . local_rank = idx
tp . log_fn = fn
tp . log_offset = fn . tell ( ) if fn else None
tp . cmd = cmd
self . procs [ " coordinator " ] . append ( tp )
2020-10-13 09:52:59 +08:00
def start_pod_heter_worker ( self , args , pod ) :
default_env = os . environ . copy ( )
current_env = copy . copy ( default_env )
current_env . pop ( " http_proxy " , None )
current_env . pop ( " https_proxy " , None )
heter_device_num = 0
device_list = [ ]
2022-11-18 19:03:10 +08:00
if framework . core . is_compiled_with_cuda ( ) :
2020-10-13 09:52:59 +08:00
device_list = get_gpus ( args . gpus )
heter_device_num = len ( device_list )
2022-11-18 19:03:10 +08:00
elif framework . core . is_compiled_with_xpu ( ) :
heter_device_num = framework . core . get_xpu_device_count ( )
2020-10-13 09:52:59 +08:00
device_list = [ str ( x ) for x in range ( 0 , heter_device_num ) ]
for idx , cur_heter_worker in enumerate ( pod . heter_workers ) :
2022-10-23 20:01:27 +08:00
device_id = (
" 0 "
if heter_device_num == 0
else str ( device_list [ ( idx ) % heter_device_num ] )
)
[Heterps]Refactor Heter Pipeline Parameter Server (#36845)
* change username
* fix
* fix
* fix
* fix
* fix
* update
* update
* update unittests
* fix
* update
* fix
* update
* fix
* fix
* fix
* update
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update send_and_recv op. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* update. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix. test=develop
* fix ut. test=develop
* fix unit. notest,test=coverage
* fix ut. notest, test=coverage
* update. notest,test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix. notest, test=coverage
* fix. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* fix ut. notest, test=coverage
* add func. notest, test=coverage
* fix ut. notest, test=coverage
* fix. test=develop
* fix. test=develop
2021-11-11 13:24:41 +08:00
stage_id = cur_heter_worker . stage
2020-10-13 09:52:59 +08:00
proc_env = {
2022-10-23 20:01:27 +08:00
" PADDLE_PSERVERS_IP_PORT_LIST " : self . server_endpoints ,
" PADDLE_TRAINER_ENDPOINTS " : self . worker_endpoints ,
2024-07-03 10:41:41 +08:00
" PADDLE_NEXT_HETER_TRAINER_IP_PORT_LIST " : (
self . stage_heter_map [ stage_id + 1 ]
if stage_id < = self . stage_num - 1
else " "
) ,
2022-10-23 20:01:27 +08:00
" PADDLE_PREVIOUS_HETER_TRAINER_IP_PORT_LIST " : self . stage_heter_map [
stage_id - 1
] ,
" PADDLE_ALL_HETER_TRAINER_IP_PORT_LIST " : self . heter_worker_endpoints ,
" HETER_DEVICE_TYPE " : self . stage_device_map [ stage_id ] ,
" STAGE_ID " : str ( stage_id ) ,
" STAGE_NUM " : str ( self . stage_num ) ,
" PADDLE_PORT " : cur_heter_worker . endpoint . split ( " : " ) [ 1 ] ,
" TRAINING_ROLE " : " HETER_TRAINER " ,
" PADDLE_TRAINERS_NUM " : str ( self . worker_num ) ,
" PADDLE_STAGE_TRAINERS_NUM " : str ( self . stage_trainer_num ) ,
" POD_IP " : cur_heter_worker . endpoint . split ( " : " ) [ 0 ] ,
" PADDLE_WITH_GLOO " : str ( os . getenv ( " PADDLE_WITH_GLOO " , " 0 " ) ) ,
" PADDLE_GLOO_RENDEZVOUS " : " 3 " ,
" PADDLE_GLOO_FS_PATH " : self . gloo_rendezvous_dir ,
" FLAGS_selected_gpus " : " 0 " ,
" FLAGS_selected_xpus " : " 0 " ,
" CUDA_VISIBLE_DEVICES " : device_id ,
" XPU_VISIBLE_DEVICES " : device_id ,
" PADDLE_GLOO_HTTP_ENDPOINT " : self . http_port ,
2020-10-13 09:52:59 +08:00
}
current_env . update ( proc_env )
2022-10-23 20:01:27 +08:00
cmd = [
sys . executable ,
" -u " ,
args . training_script ,
2024-08-14 01:40:01 +08:00
* args . training_script_args ,
]
2020-10-13 09:52:59 +08:00
self . cmds [ " heter_worker " ] . append ( cmd )
if idx == 0 :
logger . info (
" Local heter_worker start {} processes. First process distributed "
" environment info (Only For Debug): {} " . format (
len ( pod . heter_workers ) ,
2022-10-23 20:01:27 +08:00
pretty_print_envs (
proc_env , ( " Distributed Envs " , " Value " )
) ,
)
)
2020-10-13 09:52:59 +08:00
if args . log_dir is not None :
2023-04-07 10:27:27 +08:00
os . makedirs ( args . log_dir , exist_ok = True )
2024-12-12 01:59:20 +08:00
fn = open ( f " { args . log_dir } /heterlog. { idx } " , " w " )
2020-10-13 09:52:59 +08:00
self . log_fns [ " heter_worker " ] . append ( fn )
2022-10-23 20:01:27 +08:00
proc = subprocess . Popen (
cmd , env = current_env , stdout = fn , stderr = fn
)
2020-10-13 09:52:59 +08:00
else :
proc = subprocess . Popen ( cmd , env = current_env )
tp = TrainerProc ( )
tp . proc = proc
tp . rank = cur_heter_worker . rank
tp . local_rank = idx
tp . log_fn = fn
tp . log_offset = fn . tell ( ) if fn else None
tp . cmd = cmd
self . procs [ " heter_worker " ] . append ( tp )
2021-10-21 14:07:13 +08:00
def check_backend(backend):
    """Validate a collective-communication backend name.

    Raises:
        ValueError: if ``backend`` is not one of the supported names, or if
            the chosen backend requires a device support that this Paddle
            build was not compiled with.
    """
    if backend not in [
        'nccl',
        'gloo',
        'bkcl',
        'auto',
        'heter',
        'xccl',
        'flagcx',
    ]:
        raise ValueError(
            "paddle.distributed initialize error, "
            "backend argument can only be one of "
            # Fixed: 'flagcx' was accepted above but missing from this list.
            "'nccl', 'gloo', 'bkcl', 'auto', 'heter', 'xccl', 'flagcx' "
            f"but got {backend}"
        )

    if backend == 'nccl' and not framework.core.is_compiled_with_cuda():
        raise ValueError(
            "paddle.distributed initialize error, "
            "your paddle is not compiled with cuda but you assign 'nccl' as backend."
        )

    if backend == 'bkcl' and not framework.core.is_compiled_with_xpu():
        raise ValueError(
            "paddle.distributed initialize error, "
            "your paddle is not compiled with xpu but you assign 'bkcl' as backend."
        )

    if backend == 'flagcx' and not framework.core.is_compiled_with_flagcx():
        raise ValueError(
            "paddle.distributed initialize error, "
            "your paddle is not compiled with flagcx but you assign 'flagcx' as backend."
        )
2021-10-21 14:07:13 +08:00
def block_windows_and_macos(backend):
    """Raise ValueError when the 'gloo' backend is requested on an
    unsupported platform (macOS or Windows); other backends pass through."""
    if backend != 'gloo':
        return

    running_on_macos = utils.OS_NAME.startswith('darwin')
    if running_on_macos:  # macOS: gloo unsupported, block
        raise ValueError(
            "You are going to using gloo on macos, but currently is not supported"
        )
    if utils.IS_WINDOWS:  # Windows: gloo unsupported, block
        raise ValueError(
            "You are going to using gloo on windows, but currently is not supported"
        )
def get_backend_by_compile_flag():
    """Return the default communication backend matching how this Paddle
    build was compiled: 'nccl' for CUDA, 'bkcl' for XPU, else 'gloo'."""
    # Probe compile-time capabilities in priority order.
    probes = (
        (framework.core.is_compiled_with_cuda, 'nccl'),
        (framework.core.is_compiled_with_xpu, 'bkcl'),
    )
    for is_compiled, backend in probes:
        if is_compiled():
            return backend
    return 'gloo'  # CPU-only build