# PeriFlow Python SDK
PeriFlow Python SDK for training machine learning models on FriendliAI PeriFlow. PeriFlow SDK supports both local mode and cloud mode: when you run your training script on your local machine, the SDK runs in local mode; when you run your training script through PeriFlow, the SDK runs in cloud mode. If you want to use PeriFlow, please contact us.
## Contents

- [Installation](#installation)
- [Using PeriFlow SDK](#using-periflow-sdk)
- [Examples](#examples)
## Installation
PeriFlow SDK is published to PyPI and can be installed as follows:

```sh
pip install periflow_sdk
```
You can also install it from source by cloning this repository:

```sh
git clone https://github.com/friendliai/periflow-python-sdk.git
cd periflow-python-sdk
pip install .
```
## Using PeriFlow SDK
You can import and use the SDK as follows:

```python
import periflow_sdk as pf
```
PeriFlow SDK provides several functions to make your training code work with PeriFlow.
### `pf.init(total_train_steps: int, local_log_name: Optional[str] = None) -> None`

Initializes PeriFlow. All other PeriFlow SDK functions should be called after initialization.

- `total_train_steps`: The total number of training steps of the training job.
- `local_log_name`: The local file name that `pf.metric` writes into.

Note: `local_log_name` is only meaningful in local mode.
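For instance, a 1,000-step job that also logs metrics locally could be initialized as follows (the step count and file name here are illustrative, not required values):

```python
import periflow_sdk as pf

# Hypothetical values: 1,000 total training steps, and a log file
# that pf.metric writes to when running in local mode.
pf.init(total_train_steps=1000,
        local_log_name="./periflow_metrics.log")
```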
### `pf.start_step() -> None`

Marks the beginning of a single training step.
### `pf.end_step() -> None`

Marks the end of the training step.
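In a plain training loop, the two calls bracket each training step. A minimal sketch (the loop structure and variable names are illustrative):

```python
for step in range(total_train_steps):
    pf.start_step()
    # ... forward pass, backward pass, optimizer update ...
    pf.end_step()
```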
Note: we also provide `pf.train_step`, a context manager that wraps `start_step` and `end_step`. It can be used as follows:

```python
with pf.train_step():
    # your training step code
```
### `pf.upload_checkpoint() -> None`

Triggers uploading the checkpoint of the current step.

Note: This function does nothing in local mode.
Note: In distributed training (e.g., torch DDP), all ranks should call this function, even the ranks that have not actually saved the checkpoint. For example, not

```python
import torch
...
if torch.distributed.get_rank() == 0:
    torch.save(state_dict, CKPT_PATH)
    pf.upload_checkpoint()
```

but

```python
import torch
...
if torch.distributed.get_rank() == 0:
    torch.save(state_dict, CKPT_PATH)
pf.upload_checkpoint()
```
### `pf.metric(msg: Dict[str, JSONValue]) -> None`

An optional function that logs a key-value metric dict. In cloud mode, the metric is sent to PeriFlow; in local mode, it is written to the local file system (the file given by `local_log_name` in `pf.init`).
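For example, you might log the loss once per step inside the training loop (the keys and the `step`/`loss` variables here are illustrative; any JSON-serializable dict matching the signature above works):

```python
# `step` and `loss` are assumed to come from your training loop.
pf.metric({
    "iteration": step,
    "loss": float(loss),
})
```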
## Examples
We provide simple examples in which PeriFlow SDK is applied. Each example contains a template, `pf-template.yml`, which is needed when launching a training job with PeriFlow:

```sh
pf job run -f (cifar|huggingface|pth-lightning)/pf-template.yml -d (cifar|huggingface|pth-lightning)
```
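Here `(cifar|huggingface|pth-lightning)` stands for whichever example directory you choose. For instance, launching the CIFAR example would look like:

```sh
pf job run -f cifar/pf-template.yml -d cifar
```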
### Vanilla PyTorch Example
The CIFAR example is a training script that uses vanilla PyTorch and Torch Distributed Data Parallel (DDP). To apply PeriFlow SDK, we first initialize PeriFlow as follows:

```python
pf.init(total_train_steps=total_steps)
```
Then we wrap the call to `train_step` with `pf.train_step`:

```python
with pf.train_step():
    loss, learning_rate = train_step(inputs=inputs,
                                     labels=labels,
                                     model=net,
                                     loss_function=loss_function,
                                     optimizer=optimizer,
                                     lr_scheduler=lr_scheduler)
    if not args.use_cpu:
        torch.cuda.synchronize()
end_time = time.time()
```
Finally, we call `pf.upload_checkpoint` after saving the checkpoint. Following the note above, `pf.upload_checkpoint()` is called by every rank, while only rank 0 writes the checkpoint file:

```python
if args.save and step % args.save_interval == 0:
    if torch_ddp.get_rank() == 0:
        torch.save({"latest_step": step,
                    "model": net.state_dict(),
                    "optimizer": optimizer.state_dict(),
                    "lr_scheduler": lr_scheduler.state_dict()},
                   os.path.join(args.save, "checkpoint.pt"))
    pf.upload_checkpoint()
```
### HuggingFace Trainer Example
The HuggingFace example is a training script that uses the HuggingFace `Trainer` class. Instead of inheriting from `Trainer` and writing a custom class, we inject a `PeriFlowCallback` as follows:
```python
class PeriFlowCallback(TrainerCallback):
    def on_step_begin(self, args, state, control, **kwargs):
        pf.start_step()

    def on_step_end(self, args, state, control, **kwargs):
        pf.end_step()

    def on_save(self, args, state, control, **kwargs):
        pf.upload_checkpoint()
```
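If you also want to forward the Trainer's own logs (loss, learning rate, and so on) to PeriFlow, one possible extension is to hook the standard `on_log` callback as well. This subclass is a sketch, not part of the example script:

```python
class PeriFlowMetricCallback(PeriFlowCallback):
    def on_log(self, args, state, control, logs=None, **kwargs):
        # Forward the Trainer's log dict as a PeriFlow metric record
        # (written to the local log file in local mode).
        if logs is not None:
            pf.metric({"iteration": state.global_step, **logs})
```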
Then, we create the `Trainer` with our custom callback:

```python
pf.init(total_train_steps=training_args.max_steps)
callback = PeriFlowCallback()

# Initialize our Trainer
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset if training_args.do_train else None,
    eval_dataset=eval_dataset if training_args.do_eval else None,
    compute_metrics=compute_metrics,
    tokenizer=tokenizer,
    data_collator=data_collator,
    callbacks=[callback],
)
```
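From here, training runs as usual; the `Trainer` fires `on_step_begin`/`on_step_end` on every optimizer step and `on_save` on every checkpoint save:

```python
# Standard HuggingFace entry point; the PeriFlow hooks fire automatically.
trainer.train()
```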
### PyTorch Lightning Trainer Example

The PyTorch Lightning example is a training script that uses the PyTorch Lightning `Trainer` class. Similar to the HuggingFace example, we can create a custom callback as follows:
```python
class PeriFlowCallback(Callback):
    def on_train_batch_start(self,
                             trainer: pl.Trainer,
                             pl_module: pl.LightningModule,
                             batch: Any,
                             batch_idx: int,
                             unused: int = 0) -> None:
        pf.start_step()

    def on_train_batch_end(self,
                           trainer: pl.Trainer,
                           pl_module: pl.LightningModule,
                           outputs: STEP_OUTPUT,
                           batch: Any,
                           batch_idx: int,
                           unused: int = 0) -> None:
        loss = float(outputs['loss'])
        pf.metric({
            "iteration": trainer.global_step,
            "loss": loss,
        })
        pf.end_step()
```
However, because PyTorch Lightning does not provide an `on_checkpoint_save` callback, we write a simple `PeriFlowTrainer`:
```python
class PeriFlowTrainer(Trainer):
    def save_checkpoint(self,
                        filepath: Union[str, Path],
                        weights_only: bool = False,
                        storage_options: Optional[Any] = None) -> None:
        super().save_checkpoint(filepath, weights_only=weights_only, storage_options=storage_options)
        pf.upload_checkpoint()
```
With `PeriFlowCallback` and `PeriFlowTrainer`, we can start the training:
```python
periflow_callback = PeriFlowCallback()
trainer = PeriFlowTrainer(
    max_epochs=args.num_epochs,
    callbacks=[periflow_callback, checkpoint_callback],
    enable_checkpointing=isinstance(checkpoint_callback, ModelCheckpoint),
)

model = LitAutoEncoder()
datamodule = MyDataModule()
pf.init(total_train_steps=args.num_epochs * datamodule.num_steps_per_epoch)

trainer.fit(model=model,
            datamodule=datamodule,
            ckpt_path=ckpt_path)
```