Understanding PipelineML and pipeline_ml
akruszewski opened this issue · 11 comments
Hi @Galileo-Galilei. As I mentioned in another issue, I'm currently working on integrating my training and inference pipelines with `PipelineML`. Unfortunately I'm confused about handling inputs and outputs; I can't wrap my head around it.
Context
My training pipeline is built from three other pipelines: `de_pipeline` (data engineering), `fe_pipeline` (feature engineering) and `md_pipeline` (training, a.k.a. modeling).
My inference pipeline is built from the same pipelines but with a `predict` argument which changes their behavior (they use previously saved models for the imputer and the prediction).
In my current implementation it looks like this:
```python
de_pipeline_predict = pipeline(
    de.create_pipeline(predict=True),  # type: ignore
    inputs={"remote_raw": "remote_new", "imputer": "imputer"},
    namespace="new",
)
fe_pipeline_predict = pipeline(
    fe.create_pipeline(predict=True),  # type: ignore
    namespace="new",
)
# `new_preds` output would be mapped to `new.new_preds` because of
# namespace usage, so we map `new_preds` to `new_preds` to retain the
# name and keep the catalog clean.
md_pipeline_predict = pipeline(
    md.create_pipeline(predict=True),  # type: ignore
    inputs={"lgbm": "lgbm"},
    outputs={"new_preds": "new_preds"},
    namespace="new",
)
```
My pipelines also take parameters as inputs, obtained from the Kedro configuration (by that I mean `conf/base/parameters.yaml`).
When I'm trying to glue them together with:
```python
train_pipeline = de_pipeline + fe_pipeline + md_pipeline
predict_pipeline = de_pipeline_predict + fe_pipeline_predict + md_pipeline_predict
training = pipeline_ml(
    training=train_pipeline,
    inference=predict_pipeline,
)
```
and running my training pipeline I'm getting:
```
kedro_mlflow.pipeline.pipeline_ml.KedroMlflowPipelineMLInputsError:
The following inputs are free for the inference pipeline:
    - lgbm
    - remote_new
    - imputer
    - params:data_engineering
    - params:target.
Only one free input is allowed.
Please make sure that 'inference' pipeline inputs are 'training' pipeline outputs,
except one.
```
I understand the issue here, but I don't know how to proceed ("un-freeing" the inputs, which should be obtained (automatically?) using Kedro features). I would be glad for any tips.
TL;DR: make sure that `training_pipeline.outputs()` and `predict_pipeline.inputs()` are identical (except for one value in the second set for the data to predict, which seems to be `remote_new` in your project). Note: the namespace may have modified them and caused some interference; it might be a bug.
A bit of context: what is the `pipeline_ml` function for?
Unlike all the other functions of the plugin, this function is not intended for production: it packages a whole pipeline to make it easy to serve (as an API or in batch) in one command line. The goal is to share a pipeline very easily and to facilitate testing and reuse between data scientists.
For production purposes, you need a tool to schedule / monitor the pipelines (like Airflow, for instance).
What is its goal?
Under the hood, the `PipelineML` class (the type of the output of `pipeline_ml`) is nothing but a kedro `Pipeline` (the "training" pipeline) which has access to another pipeline (the "predict" pipeline) (you can see it in the code here).
When the `KedroPipelineHook` detects that the pipeline you are trying to run (with `kedro run`) is not a standard `Pipeline` but a `PipelineML`, it tries to log it in mlflow with the `KedroPipelineModel` class, which is actually the class that contains all the code logic.
This class is inspired by the mlflow example on how to create a custom mlflow model. Basically, it is a class that implements a `predict` method, which will be called automatically when serving the model, and a `load_context` method, which manages the inputs your `predict` method needs. Indeed, any logged model that respects this format can benefit from mlflow capabilities, including model serving.
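For reference, here is a minimal sketch of that custom-model pattern (illustrative only; the real `KedroPipelineModel` is more involved):

```python
import pickle

import mlflow.pyfunc

class MyModel(mlflow.pyfunc.PythonModel):
    def load_context(self, context):
        # context.artifacts maps artifact names to local file paths
        with open(context.artifacts["model"], "rb") as f:
            self.model = pickle.load(f)

    def predict(self, context, model_input):
        # called automatically when the model is served
        return self.model.predict(model_input)
```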
How does it work?
If you have read the `KedroPipelineModel` code described above, you can see that the `KedroPipelineModel` needs 2 elements to be logged in mlflow (see the sketch below):
- the predict function (here, your kedro pipeline used for prediction)
- the data required for the predict function to work (here your lightgbm model, your imputer and a couple of parameters)
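For illustration, logging a model with such artifacts could look roughly like this (hypothetical names and paths; not the plugin's exact call):

```python
import mlflow.pyfunc

mlflow.pyfunc.log_model(
    artifact_path="model",
    python_model=kedro_pipeline_model,  # a KedroPipelineModel instance (placeholder)
    artifacts={
        # artifact name -> local path of the persisted object
        "lgbm": "data/06_models/lgbm.pkl",
        "imputer": "data/06_models/imputer.pkl",
    },
)
```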
You can fill these arguments by hand, but it can be a bit tedious, and this is where `pipeline_ml` comes into play: your predict pipeline is a higher-order function, i.e. a function generated by the training pipeline. All the arguments your predict pipeline needs are generated by the training pipeline, and `pipeline_ml` binds them together: it should have all the information needed for logging. When the training pipeline is run, the associated "predict" pipeline is logged.
To do so, it applies the following process:
- `pipeline_ml` checks that `training_pipeline.outputs() == predict_pipeline.inputs() - {input_name}`. This means that all `predict_pipeline.inputs()` are outputs of a terminal node in the `training_pipeline` (see the sketch after this list).
- when the `PipelineML` is logged, a `DataCatalog` with the corresponding items as keys is stored.
- the artifacts must be persisted locally (there is no need to declare them as `MlflowDataSet`, though; the plugin uploads them regardless, as long as they are on disk) and they will be uploaded to mlflow with the model to ensure they can be accessed.
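A minimal sketch of that first check (hypothetical helper, not the plugin's actual code):

```python
from kedro.pipeline import Pipeline

def check_consistency(training: Pipeline, inference: Pipeline, input_name: str) -> None:
    # Everything the inference pipeline needs must be produced by training,
    # except the single dataset to predict on (input_name).
    free_inputs = inference.inputs() - training.all_outputs()
    if free_inputs != {input_name}:
        raise ValueError(f"Unexpected free inputs: {free_inputs}")
```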
How to fix your bug
The error comes from the fact that `training_pipeline.outputs()` and `predict_pipeline.inputs()` do not have the same entries.
It may either come from the fact that you do not use the same names for entries in the model (fix that!), or from the namespace introducing a prefix that changes the entry names. In the second case, it may be a bug to investigate (I confess I have not tried `pipeline_ml` with namespaces, which are quite recent in Kedro). Your pipeline should look like this (I'm coding on the fly, the code is not tested):
```python
from kedro.pipeline import Pipeline, node  # imports added for completeness

full_pipeline = Pipeline(
    [
        node(
            func=preprocess,
            inputs=dict(data="raw_data"),
            outputs="preprocessed_data",
            tags=["training", "inference"],
        ),
        node(
            func=fit_imputer,
            inputs=dict(data="preprocessed_data"),
            outputs="imputer",
            tags=["training"],
        ),
        node(
            func=transform_imputer,
            inputs=dict(data="preprocessed_data", imputer="imputer"),
            outputs="imputed_data",
            tags=["training", "inference"],
        ),
        node(
            func=train_model,
            inputs=dict(data="imputed_data"),
            outputs="model",
            tags=["training"],
        ),
        node(
            func=predict,
            inputs=dict(data="imputed_data", model="model"),
            outputs="predictions",
            tags=["inference"],
        ),
    ]
)

### create the pipelines like you do. Use kedro viz to see what's going on!
# print the pipeline and check that train_pipeline.outputs() == {"imputer", "model"}
train_pipeline = full_pipeline.only_nodes_with_tags("training")
# print the pipeline and check that predict_pipeline.inputs() == {"raw_data", "imputer", "model"}
predict_pipeline = full_pipeline.only_nodes_with_tags("inference")
training = pipeline_ml(
    training=train_pipeline,
    inference=predict_pipeline,
    input_name="raw_data",
)
```
If you still struggle with the function and need additional help, can you please share:
- the result of `training_pipeline.outputs()`, `predict_pipeline.inputs()` and `training.input_name` (you must pass this argument)
- the actual values of `training_pipeline` and `predict_pipeline` (i.e. the list of nodes). If you can show me code in a GitHub repo it will make things easier to debug.
Additional infos
I will publish a detailed example in the docs this weekend; if you can wait until then, it should make the explanation clearer.
Hi @Galileo-Galilei, thanks for your great work!
My understanding of `pipeline_ml` might be wrong, but I believe the requirement that `training_pipeline.outputs()` and `predict_pipeline.inputs()` are identical might be a bit too restrictive. Consider the following pipeline, which matches text strings to predefined labels:
```python
from kedro.pipeline import Pipeline, node  # imports added for completeness

def create_pipeline(**kwargs):
    return Pipeline(
        [
            node(
                name="Split Data",
                func=split_data,
                inputs=["text_samples", "parameters"],
                outputs=["X_train", "X_test", "y_train", "y_test"],
                tags=["training"],
            ),
            node(
                name="Fit MultiLabelBinarizer",
                func=fit_label_binarizer,
                inputs="y_train",
                outputs="mlb",
                tags=["training"],
            ),
            node(
                name="Transform Labels",
                func=transform_labels,
                inputs=["mlb", "y_train", "y_test"],
                outputs=["Y_train", "Y_test"],
                tags=["training"],
            ),
            node(
                name="Train Model",
                func=train_model,
                inputs=["X_train", "Y_train"],
                outputs="classifier",
                tags=["training"],
            ),
            node(
                name="Evaluate Model",
                func=evaluate_model,
                inputs=["classifier", "X_test", "Y_test"],
                outputs=None,
                tags=["evaluation"],
            ),
            node(
                name="Make Prediction",
                func=make_prediction,
                inputs=["classifier", "mlb", "features"],
                outputs=None,
                tags=["inference"],
            ),
        ]
    )
```
Nodes:
```python
import logging
from typing import Dict, List

import numpy as np
import pandas as pd
from sklearn.feature_extraction.text import CountVectorizer, TfidfTransformer
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.multiclass import OneVsRestClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.svm import LinearSVC

def split_data(text_samples: pd.DataFrame, parameters: Dict) -> List:
    # extract features
    X = text_samples["features"].values
    # extract labels
    y = text_samples["labels"].values
    # split dataset into train and test data
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=parameters["test_size"], random_state=parameters["random_state"]
    )
    return [X_train, X_test, y_train, y_test]

def fit_label_binarizer(y_train: np.ndarray) -> MultiLabelBinarizer:
    # multi label binarizer to transform data labels
    mlb = MultiLabelBinarizer()
    # fit the mlb on the train label data
    mlb.fit(y_train)
    return mlb

def transform_labels(mlb: MultiLabelBinarizer, y_train: np.ndarray, y_test: np.ndarray) -> List:
    # transform train label data
    Y_train = mlb.transform(y_train)
    # transform test label data
    Y_test = mlb.transform(y_test)
    return [Y_train, Y_test]

def train_model(X_train: np.ndarray, Y_train: np.ndarray) -> Pipeline:
    # scikit-learn classifier pipeline
    classifier = Pipeline(
        [
            ("vectorizer", CountVectorizer()),
            ("tfidf", TfidfTransformer()),
            ("clf", OneVsRestClassifier(LinearSVC())),
        ]
    )
    # fit the classifier on the train data
    classifier.fit(X_train, Y_train)
    return classifier

def evaluate_model(classifier: Pipeline, X_test: np.ndarray, Y_test: np.ndarray):
    # make prediction with test data
    predicted = classifier.predict(X_test)
    # accuracy score of the trained classifier
    accu = accuracy_score(Y_test, predicted)
    # log accuracy
    logger = logging.getLogger(__name__)
    logger.info("Model has an accuracy of %.3f", accu)

def make_prediction(classifier: Pipeline, mlb: MultiLabelBinarizer, features: np.ndarray) -> List:
    # model inference on features
    predicted = classifier.predict(features)
    # inverse transform prediction matrix back to string labels
    all_labels = mlb.inverse_transform(predicted)
    # map input features to their predicted labels
    predictions = []
    for item, labels in zip(features, all_labels):
        predictions.append({"value": item, "label": labels})
    # return predictions as list of dicts
    return predictions
```
The inference part does not only need access to the trained model but to the fitted MultiLabelBinarizer as well. Is this supported by `pipeline_ml`?
Hello @laurids-reichardt, glad to see that you're playing around with the plugin!
Regarding your question, access to other data (encoder, binarizer, ...) than the ml model is not only "possible" but exactly what `pipeline_ml` is designed for, because it is a very common pattern in ml applications. It even enables more complex pipelines (your prediction pipeline is composed of a single node, but you could split the `train_model` scikit-learn pipeline into several kedro nodes, one for the vectorizer, one for tf-idf and one for your classifier, and it should still be able to handle this more complex use case; see the sketch below).
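For instance, such a split could look roughly like this (illustrative sketch; the node functions here are hypothetical stubs):

```python
from kedro.pipeline import Pipeline, node
from sklearn.feature_extraction.text import CountVectorizer

# Hypothetical node functions, for the sake of the sketch:
def fit_vectorizer(X_train):
    return CountVectorizer().fit(X_train)

def fit_classifier(vectorizer, X_train, Y_train):
    ...  # fit tf-idf + OneVsRest classifier on vectorizer.transform(X_train)

def make_prediction(classifier, vectorizer, mlb, features):
    ...  # predict and inverse-transform labels

# Each fitted object becomes its own dataset, reused at inference time:
split_pipeline = Pipeline(
    [
        node(fit_vectorizer, inputs="X_train", outputs="vectorizer", tags=["training"]),
        node(fit_classifier, inputs=["vectorizer", "X_train", "Y_train"], outputs="classifier", tags=["training"]),
        node(make_prediction, inputs=["classifier", "vectorizer", "mlb", "features"], outputs="predictions", tags=["inference"]),
    ]
)
```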
A few remarks on `pipeline_ml`:
- You are right that the condition `training_pipeline.outputs() == predict_pipeline.inputs() - {input_name}` is too restrictive: actually, we only need `predict_pipeline.inputs()` to be included in `training_pipeline.all_outputs()` (the training pipeline may have other outputs, like metrics for instance). Nevertheless, I think it has no impact on what you are trying to do.
- Mlflow models only accept one entry in their predict function, which should be a `pandas.DataFrame`. This is not a requirement of the plugin but of core mlflow, and I cannot change this (even if I find it too restrictive). Moreover, you cannot pass parameters at runtime in the API, which is very restrictive too (e.g. you need to decide when you log the API whether the labels are "inverse encoded" or not; you cannot leave it as a parameter for the user).
The good news is that I think your code should almost work "as is". Can you check the following items:
- Are both `classifier` and `mlb` persisted in the `catalog.yml` (as `pickle.PickleDataSet` for instance; see the sketch after this list)? It is necessary to keep track of the objects for further reuse. They should appear in the mlflow ui in the `model/artifacts` folder.
- You will need to turn the `features` argument of `make_prediction` into a `pandas.DataFrame` to be able to serve your pipeline as an API (see above).
- Is your pipeline properly declared as a `pipeline_ml` with `input_name="features"` in your `create_pipelines` function?
- Your `make_prediction` does not return anything in the pipeline: it must return an object (`predictions`?) which is not persisted in the `catalog.yml` and is returned as a `MemoryDataSet` at the end of the pipeline.
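A minimal `catalog.yml` sketch for the first item (file paths are placeholders):

```yaml
classifier:
  type: pickle.PickleDataSet
  filepath: data/06_models/classifier.pkl

mlb:
  type: pickle.PickleDataSet
  filepath: data/06_models/mlb.pkl
```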
In case these elements are not enough to solve your problem, what is the error message you get? Can you share a sample of the data you use to make the problem reproducible?
EDIT: It is a bug. `pipeline_ml` considers only "terminal" outputs (i.e. outputs which are not inputs of other nodes), and since your mlb is reused in your training pipeline (in the transform labels node), it crashes. This behaviour is very strange; I was sure there was a test for it. I'll try to commit a fix ASAP. To make your pipeline work in its current state, just return the binarizer as a terminal output (for example, make `transform_labels` return it unmodified).
EDIT2: It should be fixed. Do all the above modifications and install the `hotfix-pipeline-ml` version of kedro-mlflow with the command below. I've tried it with the example code you gave above, and everything seems to be fine. I'll add some tests in the coming weeks and merge it to develop.
```
pip install --upgrade git+https://github.com/Galileo-Galilei/kedro-mlflow.git@hotfix-pipeline-ml
```
Yeah, thanks for the quick answer! The hotfix works. For reference, the url is: `git+https://github.com/Galileo-Galilei/kedro-mlflow.git@hotfix-pipeline-ml`
Indeed, wrong copy pasting, sorry.
Does It "works" only means that it is properly stored in mlflow or did you set the API up and tried it out?
Here's my current implementation: https://github.com/laurids-reichardt/kedro-examples/blob/kedro-mlflow-hotfix2/text-classification/src/text_classification/pipelines/pipeline.py
`kedro run --pipeline=kedro_mlflow` runs without issues and logs the mlb and classifier as artifacts inside the mlruns folder.
However I get the following error while trying to make some predictions:
```
❯ mlflow models predict -i ./predict_input.csv -m ./mlruns/1/887715ed7000444fae966f99376ea870/artifacts/text_classification --no-conda
2020/07/19 23:46:55 INFO mlflow.models.cli: Selected backend for flavor 'python_function'
Traceback (most recent call last):
  File "/Users/laurids/Workspace/kedro-example/.direnv/python-3.7.7/bin/mlflow", line 8, in <module>
    sys.exit(cli())
  File "/Users/laurids/Workspace/kedro-example/.direnv/python-3.7.7/lib/python3.7/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/Users/laurids/Workspace/kedro-example/.direnv/python-3.7.7/lib/python3.7/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/Users/laurids/Workspace/kedro-example/.direnv/python-3.7.7/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/Users/laurids/Workspace/kedro-example/.direnv/python-3.7.7/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/Users/laurids/Workspace/kedro-example/.direnv/python-3.7.7/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/Users/laurids/Workspace/kedro-example/.direnv/python-3.7.7/lib/python3.7/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/Users/laurids/Workspace/kedro-example/.direnv/python-3.7.7/lib/python3.7/site-packages/mlflow/models/cli.py", line 93, in predict
    json_format=json_format)
  File "/Users/laurids/Workspace/kedro-example/.direnv/python-3.7.7/lib/python3.7/site-packages/mlflow/pyfunc/backend.py", line 65, in predict
    json_format)
  File "/Users/laurids/Workspace/kedro-example/.direnv/python-3.7.7/lib/python3.7/site-packages/mlflow/pyfunc/scoring_server/__init__.py", line 222, in _predict
    pyfunc_model = load_model(model_uri)
  File "/Users/laurids/Workspace/kedro-example/.direnv/python-3.7.7/lib/python3.7/site-packages/mlflow/pyfunc/__init__.py", line 466, in load_model
    model_impl = importlib.import_module(conf[MAIN])._load_pyfunc(data_path)
  File "/Users/laurids/Workspace/kedro-example/.direnv/python-3.7.7/lib/python3.7/site-packages/mlflow/pyfunc/model.py", line 209, in _load_pyfunc
    python_model = cloudpickle.load(f)
ModuleNotFoundError: No module named 'text_classification'
```
My current guess would be that the issue stems from the fact that I use pyenv+venv to resolve my dependencies instead of conda. I'll investigate further and report back.
It sounds like your own package is not installed as a python package. What does `pip show text_classification` return? In your active environment and at the root of your kedro project, can you try:
```
cd src
pip install -e .
```
You're right, `pip install -e ./src` did the trick. I should read up on Python modules. 😃
Now it works without issues. Thanks for your great support!
```
❯ model_id=5372a2d2198b443382922a67c3dc3a40; mlflow models predict -i ./predict_input.csv -m ./mlruns/1/${model_id}/artifacts/text_classification --no-conda -t 'csv'
2020/07/20 11:49:40 INFO mlflow.models.cli: Selected backend for flavor 'python_function'
/Users/laurids/Workspace/kedro-example/.direnv/python-3.7.7/lib/python3.7/site-packages/fsspec/implementations/local.py:33: FutureWarning: The default value of auto_mkdir=True has been deprecated and will be changed to auto_mkdir=False by default in a future release.
  FutureWarning,
{"predictions": [{"value": "The weather in london is rainy as always.", "label": ["new york"]}, {"value": "England just lost the football game.", "label": ["london"]}, {"value": "New York is called the Big Apple.", "label": ["new york"]}]}%
```
I converted the scikit-learn classifier pipeline to a kedro pipeline as well: https://github.com/laurids-reichardt/kedro-examples/blob/master/text-classification/docs/kedro-pipeline.svg
I have just added unit tests, updated the changelog and merged this fix to develop. It will be released to pypi soon. I'll leave the issue open because, apart from this bugfix, it is the best documentation of the `pipeline_ml` function so far.
Short digression on modules
Regarding the module, I always recommend installing your kedro project as a python package. If you don't, you can perform relative imports between your scripts, but they depend on your working directory. This may lead to annoying issues because:
- the working directory may be set by your IDE (typically, VSCode sets it to the location of the first script you launch the interactive window from). It is very error prone because it changes across your different VSCode sessions (whether you launch a notebook at the root of the kedro project or a script inside the `src/pipelines/` folder, it will define your working directory for the whole session...)
- it even depends on the launcher you use (the VSCode interactive window, the python of your active environment in the console and Jupyter may have different behaviours)
- even Kedro itself (in the first versions; I haven't checked if they changed this) was messing with your working directory when calling some commands, and it has led to bugs in the past (when a command failed and did not restore the path)
For all these reasons, I find it much more stable to install your project as a python package with pip.
The `-e` flag means that you install the package in editable mode: it is not installed in the directory of your virtual env (`C:\Users\YOU\Anaconda3\envs\YOUR_ENV`) but points towards your local project (try `pip freeze` and look for your package to see what I mean). This means that any change in your code is immediately taken into account without reinstalling the package.
A better way to specify the environment in kedro-mlflow
Note that with all this in mind, you can specify the `conda_env` arg of `MlflowPipelineHook` to ensure that your model will be reproducible for anyone who can `pip install` your module (from github for instance):
```python
from your_package import __version__ as pkg_version  # <-- add this
from kedro.framework.context import KedroContext
from kedro_mlflow.framework.hooks import MlflowNodeHook, MlflowPipelineHook
from pk.pipeline import create_pipelines

class ProjectContext(KedroContext):
    """Users can override the remaining methods from the parent class here,
    or create new ones (e.g. as required by plugins)
    """

    project_name = "YOUR PROJECT NAME"
    project_version = "0.16.X"
    hooks = (
        MlflowNodeHook(flatten_dict_params=False),
        MlflowPipelineHook(
            model_name="YOUR_PYTHON_PACKAGE",
            conda_env={
                "python": "YOUR_PYTHON_VERSION",  # e.g. "3.7.7"
                "dependencies": [f"your_package=={pkg_version}"],
            },
        ),  # <-- and this
    )
```
This issue is closed since:
- Detailed documentation is now available on readthedocs
- An example with code is available in the kedro-mlflow-tutorial repo
Feel free to reopen if needed.