2022-02-08 17:22:06 +08:00
|
|
|
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
|
2022-06-05 10:58:58 +08:00
|
|
|
#
|
2022-02-08 17:22:06 +08:00
|
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
|
# you may not use this file except in compliance with the License.
|
|
|
|
|
# You may obtain a copy of the License at
|
2022-06-05 10:58:58 +08:00
|
|
|
#
|
2022-02-08 17:22:06 +08:00
|
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
2022-06-05 10:58:58 +08:00
|
|
|
#
|
2022-02-08 17:22:06 +08:00
|
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
|
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
|
# See the License for the specific language governing permissions and
|
|
|
|
|
# limitations under the License.
|
|
|
|
|
|
2022-11-28 11:43:47 +08:00
|
|
|
import paddle
|
2023-09-07 17:26:19 +08:00
|
|
|
from paddle import base
|
2022-10-24 15:26:50 +08:00
|
|
|
from paddle.distributed.fleet.base.private_helper_function import (
|
|
|
|
|
wait_server_ready,
|
|
|
|
|
)
|
2022-10-12 10:52:31 +08:00
|
|
|
from paddle.distributed.passes import new_pass
|
2022-02-08 17:22:06 +08:00
|
|
|
|
2022-11-29 18:50:04 +08:00
|
|
|
from .public import * # noqa: F403
|
|
|
|
|
|
2022-02-08 17:22:06 +08:00
|
|
|
|
2022-11-08 11:29:41 +08:00
|
|
|
class PsProgramBuilder:
    """Base builder that rewrites cloned programs for parameter-server training.

    All configuration is read from ``pass_ctx._attrs``. Subclasses are expected
    to override ``_build_trainer_programs`` (the base version raises
    ``NotImplementedError``) and may override ``_build_pserver_programs``,
    ``_build_trainer_desc`` and ``_build_programs``.
    """

    def __init__(self, pass_ctx):
        self.pass_ctx = pass_ctx
        self.attrs = self.pass_ctx._attrs
        self.loss = self.attrs['loss']
        self.origin_startup_program = self.attrs['origin_startup_program']
        # Note the plural key: this is distinct from 'origin_main_program'.
        self.main_program = self.attrs['origin_main_programs']

        self.cloned_main = self.attrs['cloned_main']
        self.cloned_startup = self.attrs['cloned_startup']

        self.use_ps_gpu = self.attrs['use_ps_gpu']
        self.use_heter_ps = self.attrs['is_heter_ps_mode']
        self.is_worker = self.attrs['is_worker']
        self.is_heter_worker = self.attrs['is_heter_worker']
        self.is_server = self.attrs['is_server']
        self.ps_mode = self.attrs['ps_mode']

        self.launch_barrier = self.attrs['launch_barrier']
        self.launch_barrier_flag = self.attrs['launch_barrier_flag']
        self.server_endpoints = self.attrs[
            'role_maker'
        ]._get_pserver_endpoints()

    def _build_trainer_desc(self):
        """Attach trainer/device-worker defaults to the cloned main program.

        Starts from the loss program's ``_fleet_opt`` (an empty dict when it is
        ``None``). Fills in ``"trainer"`` (default ``"MultiTrainer"``) and
        ``"device_worker"`` (default ``"Hogwild"``) only when they are absent,
        then stores the dict on ``self.cloned_main._fleet_opt``. An existing
        ``_fleet_opt`` dict is updated in place, so both programs share it.
        """
        opt_info = self.loss.block.program._fleet_opt
        opt_info = {} if opt_info is None else opt_info
        opt_info["trainer"] = opt_info.get("trainer", "MultiTrainer")
        opt_info["device_worker"] = opt_info.get("device_worker", "Hogwild")
        self.cloned_main._fleet_opt = opt_info

    def _optimize_programs(self):
        pass

    def _build_trainer_programs(self):
        raise NotImplementedError

    def _build_pserver_programs(self):
        """Apply ``add_lr_decay_table_pass`` when optimize ops are present.

        Does nothing if ``get_optimize_ops`` finds no ops in
        ``attrs['origin_main_program']``.
        """
        ops = get_optimize_ops(self.attrs['origin_main_program'])
        if not ops:
            return
        add_lr_decay_table_pass = new_pass(
            'add_lr_decay_table_pass', self.attrs
        )
        add_lr_decay_table_pass.apply([], [], self.pass_ctx)
        # The optimizer types in `ops` are not inspected further: this base
        # builder performs no other server-side rewriting.

    def _build_programs(self):
        """Build the programs for this process's role.

        Workers build trainer programs, make the cloned startup program the
        default, attach the trainer desc and point ``loss.block.program`` at
        the cloned main program. Servers build pserver programs, point
        ``loss.block.program`` at ``attrs['_main_server']`` and make
        ``attrs['_startup_server']`` the default startup program.
        """
        if self.attrs['is_worker']:
            self._build_trainer_programs()
            base.framework.switch_startup_program(self.cloned_startup)
            # Call the getter: interpolating the function itself only printed
            # a "<function ...>" repr.
            print(
                f"paddle.static.default_startup_program: {paddle.static.default_startup_program()}"
            )
            # Must run before loss.block.program is reassigned: it reads
            # _fleet_opt from the loss's current program.
            self._build_trainer_desc()
            self.loss.block.program = self.cloned_main
        elif self.attrs['is_server']:
            self._build_pserver_programs()
            self.loss.block.program = self.attrs['_main_server']
            base.framework.switch_startup_program(self.attrs['_startup_server'])
|
2022-02-08 17:22:06 +08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
class GeoPsProgramBuilder(PsProgramBuilder):  # CPU mode only
    """Program builder for the GEO distributed mode (CPU only).

    Raises ``ValueError`` from the constructor when ``ps_mode`` is not
    ``DistributedMode.GEO``.
    """

    def __init__(self, pass_ctx):
        super().__init__(pass_ctx)
        if self.ps_mode != DistributedMode.GEO:
            raise ValueError(
                f"ps mode: {self.ps_mode} not matched GeoPsProgramBuilder",
            )

    def _build_trainer_programs(self):
        """Append send ops to the cloned main program and publish it.

        Stores the cloned main program as ``attrs['origin_main_program']`` and,
        when launch barriers are enabled, waits for the servers to be ready.
        """
        new_pass("append_send_ops_pass", self.attrs).apply(
            [self.cloned_main], [None], self.pass_ctx
        )
        self.attrs['origin_main_program'] = self.cloned_main

        barrier_requested = self.launch_barrier and self.launch_barrier_flag
        if barrier_requested:
            wait_server_ready(self.server_endpoints)

    def _build_pserver_programs(self):
        """Apply ``add_listen_and_serv_pass`` to ``attrs['_main_server']``."""
        new_pass('add_listen_and_serv_pass', self.attrs).apply(
            [self.attrs['_main_server']], [None], self.pass_ctx
        )
|
2022-03-23 17:02:23 +08:00
|
|
|
|
2022-02-08 17:22:06 +08:00
|
|
|
|
2022-08-13 08:27:37 +08:00
|
|
|
class NuPsProgramBuilder(PsProgramBuilder):
    """Program builder that requires local sparse parameters.

    The constructor raises ``ValueError`` when ``attrs['local_sparse']`` is
    falsy.
    """

    def __init__(self, pass_ctx):
        super().__init__(pass_ctx)
        if not self.attrs['local_sparse']:
            raise ValueError("No local sparse params")

    def _build_trainer_programs(self):
        """Run the trainer-side pass pipeline on the cloned programs.

        Afterwards, publish the cloned programs as the ``origin_*`` programs
        and optionally wait for the servers.
        """
        attrs, ctx = self.attrs, self.pass_ctx

        new_pass("add_lr_decay_table_pass", attrs).apply([], [], ctx)
        new_pass("distributed_ops_pass", attrs).apply(
            [self.cloned_main], [None], ctx
        )
        new_pass("delete_optimizer_pass", attrs).apply(
            [self.cloned_main], [None], ctx
        )

        # fleet->PushDenseVarsAsync
        new_pass("append_send_ops_pass", attrs).apply(
            [self.cloned_main], [None], ctx
        )

        new_pass("delete_extra_optimizer_pass", attrs).apply(
            [attrs['origin_main_program']],
            [self.cloned_startup],
            ctx,
        )
        new_pass("fake_init_ops_pass", attrs).apply(
            [None], [self.cloned_startup], ctx
        )

        # communicator->Send
        # NOTE(review): append_send_ops_pass is applied a second time here with
        # identical arguments; confirm this double application is intended.
        new_pass("append_send_ops_pass", attrs).apply(
            [self.cloned_main], [None], ctx
        )

        attrs['origin_main_program'] = self.cloned_main
        attrs['origin_startup_program'] = self.cloned_startup

        if self.launch_barrier and self.launch_barrier_flag:
            wait_server_ready(self.server_endpoints)
|
|
|
|
|
|
|
|
|
|
|
2022-02-08 17:22:06 +08:00
|
|
|
class CpuSyncPsProgramBuilder(PsProgramBuilder):
    """Program builder for CPU parameter-server training in SYNC/ASYNC mode.

    The constructor raises ``ValueError`` for any other ``ps_mode``.
    """

    def __init__(self, pass_ctx):
        super().__init__(pass_ctx)
        if self.ps_mode not in (DistributedMode.SYNC, DistributedMode.ASYNC):
            # Name the concrete class: this check is also inherited by
            # CpuAsyncPsProgramBuilder.
            raise ValueError(
                f"ps mode: {self.ps_mode} not matched {type(self).__name__}"
            )

    def _build_trainer_programs(self):
        """Run the trainer-side pass pipeline on the cloned programs.

        Passes run in this order: lr-decay table, distributed ops, delete
        optimizer, append send ops, delete extra optimizer, fake init ops.
        The cloned programs are then published as
        ``attrs['origin_main_program']`` / ``attrs['origin_startup_program']``,
        and the method optionally waits for the servers.
        """
        add_lr_decay_table_pass = new_pass(
            "add_lr_decay_table_pass", self.attrs
        )
        add_lr_decay_table_pass.apply([], [], self.pass_ctx)

        distributed_ops_pass = new_pass("distributed_ops_pass", self.attrs)
        distributed_ops_pass.apply([self.cloned_main], [None], self.pass_ctx)

        delete_optimizer_pass = new_pass("delete_optimizer_pass", self.attrs)
        delete_optimizer_pass.apply([self.cloned_main], [None], self.pass_ctx)

        append_send_ops_pass = new_pass("append_send_ops_pass", self.attrs)
        append_send_ops_pass.apply([self.cloned_main], [None], self.pass_ctx)

        delete_extra_optimizer_pass = new_pass(
            "delete_extra_optimizer_pass", self.attrs
        )
        delete_extra_optimizer_pass.apply(
            [self.attrs['origin_main_program']],
            [self.cloned_startup],
            self.pass_ctx,
        )

        fake_init_ops_pass = new_pass("fake_init_ops_pass", self.attrs)
        fake_init_ops_pass.apply([None], [self.cloned_startup], self.pass_ctx)

        self.attrs['origin_main_program'] = self.cloned_main
        self.attrs['origin_startup_program'] = self.cloned_startup

        if self.launch_barrier and self.launch_barrier_flag:
            wait_server_ready(self.server_endpoints)
|
2022-02-08 17:22:06 +08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
class CpuAsyncPsProgramBuilder(CpuSyncPsProgramBuilder):
    """CPU async variant that emits a DistMultiTrainer/DownpourLite desc."""

    def __init__(self, pass_ctx):
        super().__init__(pass_ctx)

    def _build_trainer_desc(self):
        """Build ``_fleet_opt`` with per-program dense table configuration.

        Starts from the loss program's ``_fleet_opt`` (an empty dict when
        ``None``) and defaults ``trainer``/``device_worker`` to
        ``"DistMultiTrainer"``/``"DownpourLite"``. It then records the dense
        send contexts' table ids under ``program_configs`` (keyed by
        ``str(id(self.cloned_main))``) and their recv contexts under
        ``dense_table_config``. Finally it stores the dict on
        ``self.cloned_main._fleet_opt``.
        """
        opt_info = self.loss.block.program._fleet_opt
        opt_info = {} if opt_info is None else opt_info
        opt_info["trainer"] = opt_info.get("trainer", "DistMultiTrainer")
        opt_info["device_worker"] = opt_info.get(
            "device_worker", "DownpourLite"
        )
        pid = str(id(self.cloned_main))
        program_configs = {
            pid: {
                'pull_dense': [],
                'push_dense': [],
                'pull_sparse': [],
                'push_sparse': [],
            }
        }

        dense_table_config = {}
        send_ctx = get_the_one_send_context(self.attrs)
        recv_ctx = get_the_one_recv_context(self.attrs)
        # Only the context objects are needed; the mapping keys are unused.
        for ctx in send_ctx.values():
            # NOTE(review): contexts are matched against the loss's current
            # program, while the config key `pid` is the cloned main program's
            # id; confirm these refer to the intended programs.
            if ctx.program_id() != id(self.loss.block.program):
                continue
            if ctx.is_sparse():
                continue
            if not ctx.is_tensor_table():
                program_configs[pid]['pull_dense'].append(ctx.table_id())
                program_configs[pid]['push_dense'].append(ctx.table_id())
            dense_table_config[ctx.table_id()] = recv_ctx[ctx.table_id()]
        opt_info['program_configs'] = program_configs
        opt_info['dense_table_config'] = dense_table_config
        self.cloned_main._fleet_opt = opt_info
|
|
|
|
|
|
2022-02-08 17:22:06 +08:00
|
|
|
|
2022-02-11 13:08:06 +08:00
|
|
|
class GpuPsProgramBuilder(PsProgramBuilder):
    """Program builder for GPU parameter-server (``ps_gpu_pass``) training."""

    def __init__(self, pass_ctx):
        super().__init__(pass_ctx)

    def _build_trainer_programs(self):
        """Run the GPU trainer pass pipeline on the cloned programs.

        ``ps_transpile_pass`` is skipped when the user strategy has a truthy
        ``sharding`` attribute. The cloned programs are then published as the
        ``origin_*`` programs, and the method optionally waits for the servers.
        """
        attrs, ctx = self.attrs, self.pass_ctx
        main, startup = self.cloned_main, self.cloned_startup

        new_pass("add_lr_decay_table_pass", attrs).apply([], [], ctx)
        new_pass("distributed_ops_pass", attrs).apply([main], [None], ctx)
        new_pass("fake_init_ops_pass", attrs).apply([None], [startup], ctx)
        new_pass("ps_gpu_pass", attrs).apply([main], [None], ctx)

        sharding_enabled = getattr(
            attrs['user_defined_strategy'], "sharding", False
        )
        if not sharding_enabled:
            new_pass("ps_transpile_pass", attrs).apply(
                [main], [startup], ctx
            )

        attrs['origin_main_program'] = main
        attrs['origin_startup_program'] = startup

        if self.launch_barrier and self.launch_barrier_flag:
            wait_server_ready(self.server_endpoints)
|
2022-02-08 17:22:06 +08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
class HeterAsyncPsProgramBuilder(PsProgramBuilder):
    """Program builder for heterogeneous (heter worker / trainer) async PS."""

    def __init__(self, pass_ctx):
        super().__init__(pass_ctx)

    def _build_trainer_programs(self):
        """Run the heter trainer pass pipeline on the cloned programs.

        Heter workers split with ``split_heter_worker_ops_pass`` and other
        trainers with ``split_trainer_ops_pass``. Both then get
        ``set_heter_pipeline_opt_pass``, and the method optionally waits for
        the servers.
        """
        attrs, ctx = self.attrs, self.pass_ctx

        new_pass("add_lr_decay_table_pass", attrs).apply([], [], ctx)
        new_pass("distributed_ops_pass", attrs).apply(
            [self.cloned_main], [None], ctx
        )
        new_pass("delete_optimizer_pass", attrs).apply(
            [self.cloned_main], [None], ctx
        )
        new_pass("append_send_ops_pass", attrs).apply(
            [self.cloned_main], [None], ctx
        )
        new_pass("delete_extra_optimizer_pass", attrs).apply(
            [attrs['origin_main_program']],
            [self.cloned_startup],
            ctx,
        )
        new_pass("fake_init_ops_pass", attrs).apply(
            [None], [self.cloned_startup], ctx
        )

        split_pass_name = (
            "split_heter_worker_ops_pass"
            if self.is_heter_worker
            else "split_trainer_ops_pass"
        )
        new_pass(split_pass_name, attrs).apply(
            [self.cloned_main], [None], ctx
        )

        new_pass('set_heter_pipeline_opt_pass', attrs).apply(
            [self.cloned_main], [self.cloned_startup], ctx
        )

        if self.launch_barrier and self.launch_barrier_flag:
            wait_server_ready(self.server_endpoints)

    def _build_programs(self):
        """Build worker/heter-worker programs, or server programs.

        NOTE(review): for workers, ``set_heter_pipeline_opt_pass`` runs again
        here after ``_build_trainer_programs`` already applied it; confirm the
        second application is intended.
        """
        if self.attrs['is_worker'] or self.attrs['is_heter_worker']:
            self._build_trainer_programs()
            new_pass("set_heter_pipeline_opt_pass", self.attrs).apply(
                [self.cloned_main], [self.cloned_startup], self.pass_ctx
            )
            return

        if self.attrs['is_server']:
            self._build_pserver_programs()
            self.loss.block.program = self.attrs['_main_server']
            base.framework.switch_startup_program(self.attrs['_startup_server'])
|
2022-02-08 17:22:06 +08:00
|
|
|
|
|
|
|
|
|
2022-06-02 12:46:17 +08:00
|
|
|
class FlPsProgramBuilder(HeterAsyncPsProgramBuilder):
    """Program builder for federated-learning (FL) parameter-server training.

    After ``split_fl_ops_pass``, non-heter workers keep
    ``part_a_main_program`` (party "A") and heter workers keep
    ``part_b_main_program`` (party "B"). Intermediate and final programs are
    dumped with ``debug_program`` under ``ps_log_root_dir``.
    """

    def __init__(self, pass_ctx):
        super().__init__(pass_ctx)

    def _build_trainer_programs(self):
        """Run the FL trainer pipeline and select this party's program."""
        distributed_ops_pass = new_pass("distributed_ops_pass", self.attrs)
        distributed_ops_pass.apply([self.cloned_main], [None], self.pass_ctx)

        delete_optimizer_pass = new_pass("delete_optimizer_pass", self.attrs)
        delete_optimizer_pass.apply([self.cloned_main], [None], self.pass_ctx)

        append_send_ops_pass = new_pass("append_send_ops_pass", self.attrs)
        append_send_ops_pass.apply([self.cloned_main], [None], self.pass_ctx)

        delete_extra_optimizer_pass = new_pass(
            "delete_extra_optimizer_pass", self.attrs
        )
        delete_extra_optimizer_pass.apply(
            [self.attrs['origin_main_program']],
            [self.cloned_startup],
            self.pass_ctx,
        )

        # fake_init_ops_pass is left disabled in this builder.

        split_fl_ops_pass = new_pass("split_fl_ops_pass", self.attrs)
        split_fl_ops_pass.apply([self.cloned_main], [None], self.pass_ctx)

        # The split pass publishes both parties' programs in pass_ctx._attrs;
        # keep the one for this process's role.
        if not self.is_heter_worker:
            self.part_a_program = self.pass_ctx._attrs['part_a_main_program']
            self.cloned_main = self.part_a_program
            party = 'A'
        else:
            self.part_b_program = self.pass_ctx._attrs['part_b_main_program']
            self.cloned_main = self.part_b_program
            party = 'B'
        debug_program(
            ps_log_root_dir + f'8_fl_{party}_main_program.prototxt',
            self.cloned_main,
        )

        set_heter_pipeline_opt_pass = new_pass(
            'set_heter_pipeline_opt_pass', self.attrs
        )
        set_heter_pipeline_opt_pass.apply(
            [self.cloned_main], [self.cloned_startup], self.pass_ctx
        )

        self.attrs['origin_startup_program'] = self.cloned_startup
        self.attrs['origin_main_program'] = self.cloned_main

        debug_program(
            ps_log_root_dir + f'final_fl_{party}_main_program.prototxt',
            self.attrs['origin_main_program']._heter_pipeline_opt[
                'section_program'
            ],
        )

    def _build_pserver_programs(self):
        """Point the loss at the server main program from ``attrs``."""
        self.loss.block.program = self.attrs['_main_server']

    def _build_programs(self):
        """Build and install trainer programs, or server programs.

        Non-servers build trainer programs and make the cloned startup/main
        programs the defaults. Servers make ``_startup_server`` and
        ``_main_server`` the defaults.
        """
        if not self.is_server:
            self._build_trainer_programs()
            base.framework.switch_startup_program(self.cloned_startup)
            paddle.framework.switch_main_program(self.cloned_main)
            print(
                f"paddle.static.default_startup_program: {paddle.static.default_startup_program()._heter_pipeline_opt}"
            )
        else:
            self._build_pserver_programs()
            base.framework.switch_startup_program(self.attrs['_startup_server'])
            paddle.framework.switch_main_program(self.attrs['_main_server'])
|