Fleet is a high-level API for distributed training in PaddlePaddle. The name evokes a large fleet of ships working together to finish a large-scale job. The design of Fleet balances ease of use against algorithmic extensibility. First, a user can move from single-machine Paddle Fluid code to distributed code within ten lines of code. Second, different distributed algorithms can be defined easily by passing a distributed strategy to the Fleet API.
Note: all the examples here are meant to be reproduced with the develop branch of Paddle.
The quick-start examples below show how to use collective training with the Fleet API. Multi-GPU training is frequently used for modern AI models that demand high-performance computing from a deep learning framework. Sample code can be found in the examples/quick-start folder.
Suppose we define a simple neural network in examples/quick-start/nets.py:
import paddle.fluid as fluid

def mlp(input_x, input_y, hid_dim=128, label_dim=2):
    fc_1 = fluid.layers.fc(input=input_x, size=hid_dim, act='tanh')
    fc_2 = fluid.layers.fc(input=fc_1, size=hid_dim, act='tanh')
    prediction = fluid.layers.fc(input=[fc_2], size=label_dim, act='softmax')
    cost = fluid.layers.cross_entropy(input=prediction, label=input_y)
    avg_cost = fluid.layers.mean(x=cost)
    return avg_cost
Simple local training can be defined as in examples/quick-start/local_trainer.py:
import paddle.fluid as fluid
from nets import mlp
from utils import gen_data

input_x = fluid.layers.data(name="x", shape=[32], dtype='float32')
input_y = fluid.layers.data(name="y", shape=[1], dtype='int64')

cost = mlp(input_x, input_y)
optimizer = fluid.optimizer.SGD(learning_rate=0.01)
# Append the backward pass and the SGD update ops to the program.
optimizer.minimize(cost)
place = fluid.CUDAPlace(0)

exe = fluid.Executor(place)
# Initialize the parameters before the first training step.
exe.run(fluid.default_startup_program())
step = 1001
for i in range(step):
    cost_val = exe.run(feed=gen_data(), fetch_list=[cost.name])
    print("step%d cost=%f" % (i, cost_val[0]))
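The trainer imports `gen_data` from `utils`, which is not shown here. A minimal sketch of such a helper follows. The batch size, the random inputs, and the function body are assumptions made for illustration and are not necessarily what examples/quick-start/utils.py contains; the only constraints taken from the text are the feed names "x" and "y", the feature shape `[32]`, and the two-class label.

```python
import numpy as np

BATCH_SIZE = 128  # assumed batch size, chosen only for illustration


def gen_data(batch_size=BATCH_SIZE):
    """Return a feed dict whose keys match the data layer names "x" and "y"."""
    return {
        # 32 float features per sample, matching shape=[32] of input_x
        "x": np.random.random(size=(batch_size, 32)).astype("float32"),
        # one integer class label in {0, 1} per sample, matching label_dim=2
        "y": np.random.randint(2, size=(batch_size, 1)).astype("int64"),
    }
```

Because the data is random, the printed cost is only useful for checking that the training loop runs, not for judging model quality.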
Parameter server training is suitable for parallel training on large-scale data. Building on the single-machine model definition, the following example trains with parameter servers. The source code of this example is in examples/quick-start/distributed_train.py:
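import paddle.fluid as fluid
from nets import mlp
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.incubate.fleet.base import role_maker
from utils import gen_data

input_x = fluid.layers.data(name="x", shape=[32], dtype='float32')
input_y = fluid.layers.data(name="y", shape=[1], dtype='int64')

cost = mlp(input_x, input_y)
optimizer = fluid.optimizer.SGD(learning_rate=0.01)

# Fleet must be initialized with a role before any other Fleet call.
role = role_maker.PaddleCloudRoleMaker()
fleet.init(role)

# Wrap the single-machine optimizer, then build the training program.
optimizer = fleet.distributed_optimizer(optimizer)
optimizer.minimize(cost)

if fleet.is_server():
    # Server nodes initialize their parameters and then serve them.
    fleet.init_server()
    fleet.run_server()
elif fleet.is_worker():
    place = fluid.CPUPlace()
    exe = fluid.Executor(place)
    fleet.init_worker()
    exe.run(fluid.default_startup_program())
    step = 1001
    for i in range(step):
        cost_val = exe.run(
            feed=gen_data(),
            fetch_list=[cost.name])
        print("worker_index: %d, step%d cost = %f" %
              (fleet.worker_index(), i, cost_val[0]))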
The command for parameter server training with multiple processes on CPU is as follows:
python -m paddle.distributed.launch_ps distributed_train.py
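To make the division of labor between servers and workers concrete, here is a toy, single-process simulation of synchronous parameter server training on a one-parameter model. It is only a sketch of the general idea: the `ParameterServer` class, the `pull`/`push`/`apply` methods, and the data are all hypothetical and say nothing about how Fleet implements communication.

```python
class ParameterServer:
    """Holds the shared parameter and applies the averaged gradient."""

    def __init__(self, init_value, learning_rate):
        self.value = init_value
        self.learning_rate = learning_rate
        self._pending = []

    def pull(self):
        # Workers read the current parameter value.
        return self.value

    def push(self, grad):
        # Workers send the gradient computed on their own data shard.
        self._pending.append(grad)

    def apply(self):
        # Synchronous update: average every gradient pushed in this step.
        avg = sum(self._pending) / len(self._pending)
        self.value -= self.learning_rate * avg
        self._pending = []


def worker_gradient(w, shard):
    """Gradient of the mean squared error of y = w * x on one shard."""
    return sum(2.0 * (w * x - y) * x for x, y in shard) / len(shard)


def train_with_parameter_server(steps=200):
    # Data generated from y = 3x and split across two workers.
    shards = [[(1.0, 3.0), (2.0, 6.0)], [(3.0, 9.0), (4.0, 12.0)]]
    server = ParameterServer(init_value=0.0, learning_rate=0.01)
    for _ in range(steps):
        for shard in shards:
            w = server.pull()
            server.push(worker_gradient(w, shard))
        server.apply()
    return server.value
```

Calling `train_with_parameter_server()` drives the shared parameter toward 3, the slope that generated the data. In the real example the same split shows up as the `fleet.is_server()` and `fleet.is_worker()` branches.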
If you want to run distributed training on high-performance chips, for example distributed GPU training, the Fleet API lets you do so by adding fewer than 10 lines of code. The source code of this example is in examples/quick-start/collective_train.py:
import paddle.fluid as fluid
from nets import mlp
from paddle.fluid.incubate.fleet.collective import fleet
from paddle.fluid.incubate.fleet.base import role_maker
from utils import gen_data

input_x = fluid.layers.data(name="x", shape=[32], dtype='float32')
input_y = fluid.layers.data(name="y", shape=[1], dtype='int64')

cost = mlp(input_x, input_y)
optimizer = fluid.optimizer.SGD(learning_rate=0.01)

# Fleet must be initialized with a role before any other Fleet call.
role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)

# Wrap the single-machine optimizer, then build the training program.
optimizer = fleet.distributed_optimizer(optimizer)
optimizer.minimize(cost)
place = fluid.CUDAPlace(0)

exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
step = 1001
for i in range(step):
    cost_val = exe.run(
        feed=gen_data(),
        fetch_list=[cost.name])
    print("worker_index: %d, step%d cost = %f" %
          (fleet.worker_index(), i, cost_val[0]))
The command for distributed training with multiple processes on multiple GPU cards is as follows:
python -m paddle.distributed.launch collective_train.py
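In collective training there is no separate server: every process keeps its own copy of the parameters and the processes exchange gradients with each other. The toy simulation below sketches that idea with an all-reduce that averages gradients. The function names and the data are hypothetical, and the code runs in a single process, so it illustrates the arithmetic rather than Fleet's actual communication.

```python
def allreduce_mean(values):
    """Every rank ends up with the mean of the values from all ranks."""
    mean = sum(values) / len(values)
    return [mean for _ in values]


def shard_gradient(w, shard):
    """Gradient of the mean squared error of y = w * x on one shard."""
    return sum(2.0 * (w * x - y) * x for x, y in shard) / len(shard)


def train_collective(steps=200, learning_rate=0.01):
    # Data generated from y = 3x; each rank owns one shard.
    shards = [[(1.0, 3.0), (2.0, 6.0)], [(3.0, 9.0), (4.0, 12.0)]]
    replicas = [0.0 for _ in shards]  # one parameter copy per rank
    for _ in range(steps):
        # Each rank computes a gradient on its own shard.
        grads = [shard_gradient(w, s) for w, s in zip(replicas, shards)]
        # After the all-reduce, all ranks hold the same averaged gradient.
        synced = allreduce_mean(grads)
        # Identical updates keep the replicas identical.
        replicas = [w - learning_rate * g for w, g in zip(replicas, synced)]
    return replicas
```

Because every replica starts from the same value and applies the same averaged gradient, the copies never diverge, which is why no parameter server is needed.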
The Fleet API aims to help users run distributed training of deep learning models through an easy-to-use interface. It also lets PaddlePaddle developers plug in new distributed algorithms.
init(role_maker=None)
It is responsible for Fleet initialization and must be called before any other Fleet interface. It defines the multi-machine environment configuration.
is_worker()
It checks whether the current node is a worker node in parameter server training. It returns True if the current node is a worker node and False otherwise.
is_server()
It checks whether the current node is a server node in parameter server training. It returns True if the current node is a server node and False otherwise.
init_server(model_dir=None)
Fleet loads the model parameters stored in model_dir to initialize the parameter server in parameter server training.
run_server()
It starts the server service in parameter server training.
init_worker()
It starts the worker service in parameter server training.
stop_worker()
It terminates the worker service after parameter server training is done.
distributed_optimizer(optimizer, strategy=None)
It decorates a single-machine optimizer for distributed training. The user passes in a regular optimizer and, optionally, a distributed training strategy, and the function returns a distributed optimizer.
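The word "decorate" refers to the wrapper pattern: the returned object exposes the same `minimize` entry point as the original optimizer but adds distributed behavior around it. The sketch below illustrates only that pattern. `PlainSGD`, `DistributedOptimizer`, the dictionary-based strategy, and the numeric `minimize` signature are hypothetical stand-ins and do not reflect Fleet's classes or internals.

```python
class PlainSGD:
    """Stand-in for a single-machine optimizer (not a Paddle class)."""

    def __init__(self, learning_rate):
        self.learning_rate = learning_rate

    def minimize(self, param, grad):
        return param - self.learning_rate * grad


class DistributedOptimizer:
    """Wraps an optimizer and combines worker gradients before delegating."""

    def __init__(self, inner, strategy=None):
        self.inner = inner
        # Hypothetical strategy format: how to combine worker gradients.
        self.strategy = strategy or {"reduce": "mean"}

    def minimize(self, param, worker_grads):
        total = sum(worker_grads)
        if self.strategy["reduce"] == "mean":
            grad = total / len(worker_grads)
        else:
            grad = total
        # The update rule itself still comes from the wrapped optimizer.
        return self.inner.minimize(param, grad)


def distributed_optimizer(optimizer, strategy=None):
    """Return a wrapper that is used in place of the original optimizer."""
    return DistributedOptimizer(optimizer, strategy)
```

The point of the pattern is that user code keeps calling `minimize` as before, and the choice of distributed algorithm moves into the `strategy` argument.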
- MPISymetricRoleMaker
description: MPISymetricRoleMaker assumes that each node starts two processes, one for the worker service and one for the parameter server service. It requires an MPI environment on the user's cluster.
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.incubate.fleet.base import role_maker

role = role_maker.MPISymetricRoleMaker()
fleet.init(role)
Command for running the example:
mpirun -np 2 python trainer.py
- PaddleCloudRoleMaker
description: PaddleCloudRoleMaker is a high-level encapsulation that supports launching scripts with paddle.distributed.launch or paddle.distributed.launch_ps.
Parameter server training example:
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.incubate.fleet.base import role_maker

role = role_maker.PaddleCloudRoleMaker()
fleet.init(role)
Command for running the example:
python -m paddle.distributed.launch_ps --worker_num 2 --server_num 2 trainer.py
Collective training example:
from paddle.fluid.incubate.fleet.collective import fleet
from paddle.fluid.incubate.fleet.base import role_maker

role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
Command for running the example:
python -m paddle.distributed.launch trainer.py
- UserDefinedRoleMaker
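description: UserDefinedRoleMaker lets the user specify the node role information together with the IP and port information.
import os

from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.incubate.fleet.base import role_maker

# IS_WORKER must be set in the environment to "1" (worker) or "0" (server).
# Other constructor arguments (node id, worker count, server endpoints) are omitted here.
role = role_maker.UserDefinedRoleMaker(
    role=role_maker.Role.WORKER if bool(int(os.getenv("IS_WORKER")))
    else role_maker.Role.SERVER,
)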
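The role makers above differ mainly in where the role information comes from: MPI, the launcher, or the user. As a sketch of the user-defined case, the helper below collects role settings from environment-style variables. Only `IS_WORKER` appears in the example above; `CURRENT_ID`, `WORKER_NUM`, `SERVER_ENDPOINTS`, the defaults, and the returned dictionary are hypothetical names chosen for illustration, and the function does not call Paddle at all.

```python
import os


def role_from_env(env=None):
    """Collect role settings from environment-style variables.

    IS_WORKER follows the example in the text; every other variable
    name here is a hypothetical placeholder.
    """
    env = os.environ if env is None else env
    is_worker = bool(int(env.get("IS_WORKER", "1")))
    # Comma-separated "ip:port" entries, one per parameter server.
    endpoints = [e for e in env.get("SERVER_ENDPOINTS", "").split(",") if e]
    return {
        "role": "WORKER" if is_worker else "SERVER",
        "current_id": int(env.get("CURRENT_ID", "0")),
        "worker_num": int(env.get("WORKER_NUM", "1")),
        "server_endpoints": endpoints,
    }
```

Passing a plain dictionary instead of reading `os.environ` directly makes the parsing easy to check without starting any processes.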
Parameter Server Training
- Sync_mode
Collective Training
- LocalSGD
- ReduceGrad
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.incubate.fleet.collective import fleet
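Of the collective strategies listed above, LocalSGD is the one that changes the communication pattern most visibly: in the general Local SGD technique, each worker takes several optimizer steps on its own data and the workers only periodically average their parameters, instead of synchronizing gradients at every step. The toy simulation below sketches that general technique in a single process. The function names, data, and hyperparameters are hypothetical, and Fleet's LocalSGD strategy may differ in its details.

```python
def shard_gradient(w, shard):
    """Gradient of the mean squared error of y = w * x on one shard."""
    return sum(2.0 * (w * x - y) * x for x, y in shard) / len(shard)


def train_local_sgd(rounds=50, local_steps=4, learning_rate=0.01):
    # Data generated from y = 3x; each worker owns one shard.
    shards = [[(1.0, 3.0), (2.0, 6.0)], [(3.0, 9.0), (4.0, 12.0)]]
    replicas = [0.0 for _ in shards]  # one parameter copy per worker
    for _ in range(rounds):
        # Local phase: several SGD steps per worker with no communication.
        for _ in range(local_steps):
            replicas = [
                w - learning_rate * shard_gradient(w, s)
                for w, s in zip(replicas, shards)
            ]
        # Communication phase: average the parameters once per round.
        mean = sum(replicas) / len(replicas)
        replicas = [mean for _ in replicas]
    return replicas
```

With `local_steps=4`, the workers communicate once for every four updates, which is the trade-off this family of methods makes: less communication in exchange for replicas that drift apart between averaging points.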