A minimalist data pipeline library built for rapid machine learning development using AWS
- Cache intermediate data transformations, allowing for rapid prototyping (the caching idea is sketched below)
- Compatible with most machine learning libraries out of the box (TensorFlow / Keras / Theano / Caffe / ...)
- Automatically create and manage an EC2 cluster, including dependencies and server setup
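The caching works by content addressing: a stage only re-runs when its inputs change. Pipetree's internals aren't reproduced here; the following is a minimal sketch of the idea, with `cached_stage` and `CACHE_DIR` as hypothetical names:

```python
import hashlib
import json
import pickle
from pathlib import Path

CACHE_DIR = Path("./.pipeline_cache")  # hypothetical cache location

def cached_stage(stage_name, run_fn, **inputs):
    """Run a pipeline stage, reusing the cached result when its inputs
    are unchanged. The cache key hashes the stage name plus its inputs,
    so tweaking a downstream stage never re-runs expensive upstream ones."""
    key = hashlib.sha256(
        json.dumps([stage_name, inputs], sort_keys=True, default=str).encode()
    ).hexdigest()
    cache_file = CACHE_DIR / key
    if cache_file.exists():
        return pickle.loads(cache_file.read_bytes())
    results = list(run_fn(**inputs))  # stage functions are generators (see below)
    CACHE_DIR.mkdir(exist_ok=True)
    cache_file.write_bytes(pickle.dumps(results))
    return results
```

A pipeline is declared as a configuration like the one below. Each top-level key names a pipeline item; items with `inputs` and `run` keys are computed stages, while the rest are raw data or parameters: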
```json
{
    "raw_images": {
        "type": "file_folder",
        "filepath": "./local_path_to_cat_images/"
    },
    "preprocess_parameters": {
        "whiten": true,
        "PCA": false
    },
    "preprocessed_images": {
        "inputs": ["raw_images", "preprocess_parameters"],
        "outputs": "file_folder",
        "run": "my_image_preprocess_function"
    },
    "model_parameters": {
        "number_hidden_neurons": 100,
        "epochs": 200
    },
    "test_parameters": {
        "type": "grid_search_parameters",
        "learning_rate": [0.1, 0.01, 0.001, 0.2]
    },
    "trained_model": {
        "inputs": ["preprocessed_images", "model_parameters", "test_parameters"],
        "outputs": "file_folder",
        "run": "my_train_model_function"
    }
}
```
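Because `test_parameters` has type `grid_search_parameters`, the `trained_model` stage is presumably run once per listed `learning_rate`. A rough sketch of that expansion (assumed semantics, not pipetree's internals):

```python
from itertools import product

def expand_grid(grid_params):
    """Yield one concrete parameter dict per combination of the listed values."""
    names = [k for k in grid_params if k != "type"]
    for values in product(*(grid_params[name] for name in names)):
        yield dict(zip(names, values))

# expand_grid({"type": "grid_search_parameters",
#              "learning_rate": [0.1, 0.01, 0.001, 0.2]})
# -> {"learning_rate": 0.1}, {"learning_rate": 0.01}, ...
```

Stage functions themselves are plain Python generators: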
```python
# Each function is passed keyword arguments named after its pipeline item inputs
def my_image_preprocess_function(raw_images, preprocess_parameters):
    if preprocess_parameters["PCA"]:
        pipetree.log("PCA enabled")
    # Every yielded value is stored as an artifact of the "preprocessed_images" item
    for image in raw_images:
        yield some_preprocess_function(image)
```
```python
def my_train_model_function(preprocessed_images, model_parameters, test_parameters):
    hidden_neurons = int(model_parameters["number_hidden_neurons"])
    learning_rate = float(test_parameters["learning_rate"])
    epochs = int(model_parameters["epochs"])
    for _ in range(epochs):
        # train_model_one_epoch is a user-defined training step
        (error, trained_model) = train_model_one_epoch(
            preprocessed_images, hidden_neurons, learning_rate)
        # Produce trained model objects to be stored as pipeline artifacts
        yield {
            "data": trained_model.serialize(),
            "metadata": {"error": float(error)}
        }
```
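Each yielded dict becomes one stored artifact, so a grid search over learning rates leaves behind a series of serialized models tagged with their error. Picking the winner is then a one-liner; here `artifacts` stands in for however the stored artifacts are read back out of the pipeline, which is left abstract:

```python
def best_model(artifacts):
    """Return the {"data": ..., "metadata": ...} artifact with the lowest error."""
    return min(artifacts, key=lambda a: a["metadata"]["error"])
```

Additional design goals: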
- Use built-in Spark datatypes
- Smart handling of file_folders: stream files to S3 as they are produced, and from S3 as they are consumed (sketched below)
- Allow mapping over the files in a file_folder for parallel preprocessing
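A minimal sketch of the streaming idea using boto3; `stream_folder_to_s3`, the bucket, and the prefix are hypothetical names, and the real behavior would live inside the library's file_folder handling:

```python
import os
import boto3

s3 = boto3.client("s3")

def stream_folder_to_s3(produced_files, bucket, prefix):
    """Upload each file as soon as a stage yields it, rather than waiting
    for the whole file_folder to finish. `produced_files` is assumed to be
    a generator of local file paths."""
    for path in produced_files:
        key = f"{prefix}/{os.path.basename(path)}"
        s3.upload_file(path, bucket, key)  # boto3's standard upload call
        yield f"s3://{bucket}/{key}"
```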