/delta-task

Python Delta task library

Primary language: Python. License: Apache License 2.0 (Apache-2.0).

delta

Delta is a library for writing privacy-preserving computing tasks that run on the Delta framework. It uses PyTorch as its backend. It lets users focus on implementing computing models and logic without worrying about data privacy, which the Delta framework handles for you.

install

First, install PyTorch.

using pip:

pip install -U delta-task

install from source code:

git clone this repo

pip install .

example

example.py

usage

Delta currently supports horizontal federated learning tasks.

how to write a Task

In delta, you can define a horizontal federated learning task by inheriting from the base class delta.task.HorizontalTask.

HorizontalTask is an abstract class. In addition to the constructor __init__, you have to implement the following four methods:

  • train(self, dataloader: Iterable)
  • get_params(self) -> List[torch.Tensor]
  • validate(self, dataloader: Iterable) -> Dict[str, float]
  • preprocess(self, x, y=None)

init

def __init__(self, name: str, dataset: str, max_rounds: int, validate_interval: int = 1, validate_frac: float = 0.1):

When you inherit from HorizontalTask, you must call the base class constructor with super().__init__.

params:

param | type | description
name | string | the task name
dataset | string | the dataset which the task uses
max_rounds | int | max execution rounds of task
validate_interval | int | validation every {validate_interval} rounds
validate_frac | float | validation dataset percentage, should be in [0, 1)
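
The subclassing pattern described above can be sketched without Delta installed. In the block below, HorizontalTask is a local stand-in that only mirrors the documented constructor signature and the four abstract methods; it is not the real delta.task.HorizontalTask. The range check on validate_frac, the ExampleTask class and its placeholder method bodies are assumptions made for illustration.

```python
from abc import ABC, abstractmethod
from typing import Dict, Iterable, List


class HorizontalTask(ABC):
    """Local stand-in for delta.task.HorizontalTask (NOT the real class)."""

    def __init__(self, name: str, dataset: str, max_rounds: int,
                 validate_interval: int = 1, validate_frac: float = 0.1):
        # Assumption: the documented range [0, 1) is enforced like this.
        if not 0 <= validate_frac < 1:
            raise ValueError("validate_frac should be in [0, 1)")
        self.name = name
        self.dataset = dataset
        self.max_rounds = max_rounds
        self.validate_interval = validate_interval
        self.validate_frac = validate_frac

    @abstractmethod
    def train(self, dataloader: Iterable): ...

    @abstractmethod
    def get_params(self) -> List: ...

    @abstractmethod
    def validate(self, dataloader: Iterable) -> Dict[str, float]: ...

    @abstractmethod
    def preprocess(self, x, y=None): ...


class ExampleTask(HorizontalTask):
    def __init__(self):
        # The base class constructor must be called by the subclass.
        super().__init__(name="example", dataset="example-dataset",
                         max_rounds=10, validate_interval=2, validate_frac=0.2)
        self.weights = [0.0]  # placeholder for a real torch model

    def train(self, dataloader):
        for _batch in dataloader:
            pass  # real training logic goes here

    def get_params(self):
        return self.weights

    def validate(self, dataloader):
        return {"loss": 0.0}  # placeholder metric

    def preprocess(self, x, y=None):
        return x if y is None else (x, y)


task = ExampleTask()
```

Because the four methods are abstract, a subclass that leaves any of them out cannot be instantiated; Python raises a TypeError at construction time.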

train

def train(self, dataloader: Iterable)

You can implement the model training logic in this function. A typical implementation looks like this:

def train(self, dataloader: Iterable):
    # iterate over the batches of the training dataset
    for batch in dataloader:
        x, y = batch
        # forward pass
        y_pred = self.model(x)
        # loss calculation
        loss = self.loss_func(y_pred, y)
        # backward pass and parameter update
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

params:

param | type | description
dataloader | Iterable | the dataloader of training dataset

get_params

def get_params(self) -> List[torch.Tensor]

Get the trainable model parameters. The Delta framework securely aggregates these parameters from different nodes to obtain the global model parameters.

A typical implementation looks like this:

def get_params(self) -> List[torch.Tensor]:
    return list(self.model.parameters())

returns:

type | description
List[torch.Tensor] | parameters list
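
To give some intuition for what aggregation produces, the sketch below computes an element-wise, unweighted mean of the parameter lists returned by several nodes. Plain Python lists stand in for tensors, and the cryptographic masking that makes the aggregation secure is left out entirely. The helper name average_params is hypothetical, and whether Delta weights nodes (for example by sample count) is not stated in this document, so treat the unweighted mean as an assumption.

```python
from typing import List


def average_params(node_params: List[List[List[float]]]) -> List[List[float]]:
    """Element-wise mean across nodes; each node supplies a list of flat parameter vectors."""
    n_nodes = len(node_params)
    averaged = []
    # zip(*node_params) groups the i-th parameter of every node together
    for same_param in zip(*node_params):
        # zip(*same_param) groups the j-th element of that parameter across nodes
        averaged.append([sum(values) / n_nodes for values in zip(*same_param)])
    return averaged


# two nodes, each holding two parameters (for example a weight vector and a bias)
node_a = [[1.0, 2.0], [10.0]]
node_b = [[3.0, 4.0], [30.0]]
global_params = average_params([node_a, node_b])
```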

validate

def validate(self, dataloader: Iterable) -> Dict[str, float]

You can implement the validation logic in this function. The validation result is a Dict[str, float] whose keys are the validation metric names and whose values are the corresponding metric values. The Delta framework securely aggregates the validation results of the nodes to obtain the global validation result.

A typical implementation looks like this:

def validate(self, dataloader: Iterable) -> Dict[str, float]:
    total_loss = 0
    count = 0
    ys = []
    y_s = []
    for batch in dataloader:
        x, y = batch
        y_pred = self.model(x)
        loss = self.loss_func(y_pred, y)
        total_loss += loss.item()
        count += 1

        y_ = torch.argmax(y_pred, dim=1)
        y_s.extend(y_.tolist())
        ys.extend(y.tolist())
    avg_loss = total_loss / count
    tp = len([1 for i in range(len(ys)) if ys[i] == y_s[i]])
    precision = tp / len(ys)

    return {"loss": avg_loss, "precision": precision}

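In the code above, the validate function computes the average loss and the fraction of correctly classified samples on the input validation dataset. The example reports that fraction under the key "precision".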

params:

param | type | description
dataloader | Iterable | the dataloader of validation dataset

preprocess

def preprocess(self, x, y=None)

The preprocess function preprocesses each data item before training and validation.

The input parameter x is the data item, and y is the corresponding data label if the dataset has labels.

The type of x depends on the dataset. It can be an np.ndarray, a torch.Tensor, a pd.DataFrame or an Image.Image. In the preprocess function, you should convert x to a torch.Tensor.

If y is present, it is a string holding the label name, and you should convert it to a torch.Tensor compatible with your loss function.

The preprocess function should return two torch.Tensor objects if both x and y exist, and a single torch.Tensor otherwise.
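
A minimal sketch of such a function is shown below, written as a plain function (the self parameter is omitted so it runs on its own). It assumes x is array-like (a nested list or an np.ndarray) and that labels should become integer class indices, as a loss such as cross entropy expects. The LABELS list is hypothetical; a pd.DataFrame or an Image.Image would need its own conversion step first.

```python
import torch

# Hypothetical class names; the position in the list is used as the class index.
LABELS = ["cat", "dog"]


def preprocess(x, y=None):
    # Convert the data item to a float tensor (assumes x is array-like).
    x_tensor = torch.as_tensor(x, dtype=torch.float32)
    if y is None:
        # Unlabeled dataset: return the data tensor only.
        return x_tensor
    # Map the label name to an integer class index stored in a long tensor.
    y_tensor = torch.tensor(LABELS.index(y), dtype=torch.long)
    return x_tensor, y_tensor


x_tensor, y_tensor = preprocess([[0.0, 1.0]], "dog")
```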

(optional) dataloader_config

def dataloader_config(self) -> Union[Dict[str, Any], Tuple[Dict[str, Any], Dict[str, Any]]]:

You can specify the dataloader arguments in this function.

This function can return one or two dictionaries, which are passed as kwargs to the dataloader constructor. If it returns a single dictionary, that dictionary is passed to both the training dataloader and the validation dataloader. If it returns two dictionaries, the first one is passed to the training dataloader and the second one to the validation dataloader.

The default implementation is as follows:

def dataloader_config(
    self,
) -> Union[Dict[str, Any], Tuple[Dict[str, Any], Dict[str, Any]]]:
    return {"shuffle": True, "batch_size": 64, "drop_last": True}
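
The one-or-two dictionary rule described above can be sketched as follows. The helper split_dataloader_config is hypothetical and only illustrates how a return value might be resolved into training and validation kwargs; it is not Delta's internal code. The second function shows an override that returns two dictionaries, with values chosen purely as an example.

```python
from typing import Any, Dict, Tuple, Union

Config = Union[Dict[str, Any], Tuple[Dict[str, Any], Dict[str, Any]]]


def split_dataloader_config(config: Config) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Return (train_kwargs, validation_kwargs) following the one-or-two dictionary rule."""
    if isinstance(config, dict):
        # One dictionary: used for both dataloaders (copied so they stay independent).
        return dict(config), dict(config)
    train_kwargs, val_kwargs = config
    return dict(train_kwargs), dict(val_kwargs)


def dataloader_config():
    # Shuffle and drop incomplete batches only while training.
    return (
        {"shuffle": True, "batch_size": 64, "drop_last": True},
        {"shuffle": False, "batch_size": 128, "drop_last": False},
    )


train_kwargs, val_kwargs = split_dataloader_config(dataloader_config())
```

Returning two dictionaries is useful when validation should see every sample in a fixed order while training still shuffles.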

(optional) algorithm

def algorithm(self) -> HorizontalAlgorithm:

You can specify the secure aggregation algorithm in this function.

The algorithms currently available are delta.algorithm.horizontal.FedAvg and delta.algorithm.horizontal.FaultTolerantFedAvg, both of which are subclasses of delta.algorithm.horizontal.HorizontalAlgorithm.

The difference between them is that FaultTolerantFedAvg can tolerate some nodes going offline during secure aggregation, while FedAvg cannot.

FedAvg and FaultTolerantFedAvg take the same parameters.

params:

param | type | description
merge_interval_iter | int | aggregate every {merge_interval_iter} iterations
merge_interval_epoch | int | aggregate every {merge_interval_epoch} rounds. This parameter is mutually exclusive with merge_interval_iter; only one of them can be greater than 0
min_clients | int | minimum number of nodes in a round
max_clients | int | maximum number of nodes in a round
wait_timeout | Optional[float] | timeout in seconds for task computation in a round; the default value is 60 seconds
connection_timeout | Optional[float] | timeout in seconds for each stage of the secure aggregation; the default value is 60 seconds
precision | int | precision of the result; the result is rounded to this precision
curve | CURVE_TYPE | elliptic curve used for asymmetric encryption in the secure aggregation; the default value is "secp256k1"
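
The mutual exclusion between merge_interval_iter and merge_interval_epoch can be checked before constructing an algorithm. The helper below is a hypothetical sketch of that rule only; it does not call FedAvg or FaultTolerantFedAvg, and the behavior when both values are 0 is not described in this document, so the sketch simply returns None in that case.

```python
from typing import Optional


def active_merge_interval(merge_interval_iter: int = 0,
                          merge_interval_epoch: int = 0) -> Optional[str]:
    """Hypothetical helper: report which merge interval is active."""
    if merge_interval_iter > 0 and merge_interval_epoch > 0:
        # Documented rule: only one of the two can be greater than 0.
        raise ValueError(
            "merge_interval_iter and merge_interval_epoch are mutually exclusive"
        )
    if merge_interval_iter > 0:
        return "iter"
    if merge_interval_epoch > 0:
        return "epoch"
    # Assumption: the document does not say what happens when both are 0.
    return None


mode = active_merge_interval(merge_interval_iter=20)
```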

dataset format

The Delta framework currently supports four dataset formats:

  1. Numpy array, with file extension .npy, .npz
  2. Torch tensor, with file extension .pt
  3. Table file, with file extension .csv, .tsv, .xls, .xlsx. Table file will be loaded by pandas
  4. Image file. Image file will be loaded by PIL

A dataset can be a single file or a directory. When creating a task, you can refer to the dataset by its name.

When the dataset is a single file, the first dimension of the data indexes the data samples, and the samples have no labels.

When the dataset is a directory, there are two acceptable directory structures. In the first, the data files are placed directly in the directory; each file represents one data sample, and the samples have no labels. In the second, the directory contains several sub directories, each of which represents one class of data. The sub directory name is the class name, and each data file is placed in the sub directory of its class.
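
The two directory layouts can be illustrated with a short sketch that walks a dataset path and pairs each file with its label. The function list_samples is hypothetical and is not Delta's loader; it only shows how the class name can be derived from the sub directory name, using a temporary directory with made-up file names.

```python
import tempfile
from pathlib import Path
from typing import List, Optional, Tuple


def list_samples(root: Path) -> List[Tuple[Path, Optional[str]]]:
    """Return (file, label) pairs; the label is the sub directory name, or None if unlabeled."""
    if root.is_file():
        # Single-file dataset: the samples live along the first dimension inside the file.
        return [(root, None)]
    samples: List[Tuple[Path, Optional[str]]] = []
    for entry in sorted(root.iterdir()):
        if entry.is_dir():
            # Second layout: the sub directory name is the class name.
            samples.extend((f, entry.name) for f in sorted(entry.iterdir()) if f.is_file())
        elif entry.is_file():
            # First layout: files directly in the directory carry no label.
            samples.append((entry, None))
    return samples


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    for cls, fname in [("cat", "1.png"), ("cat", "2.png"), ("dog", "3.png")]:
        (root / cls).mkdir(exist_ok=True)
        (root / cls / fname).touch()
    labels = [label for _, label in list_samples(root)]
```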