/ml-observability-workshop

Creating an end-to-end observability platform

Primary LanguageJupyter Notebook

ML Observability workshop

Creating an end-to-end observability platform

Plan

  • Introduction to MLOps and how it helps with the entire ML project lifecycle
  • Discuss the ride duration prediction use case, discuss the overall architecture, show the notebook with the solution, show the script with training it (do not implement it live - to save time, and refer to Lightweight MLOps Zoomcamp for details)
  • Implement a simple Flask application for serving the model
  • Log the predictions to a Kinesis stream and save them in a data lake (s3)
  • Set up a batch job for pulling the data from S3 and analyzing it
  • Analyze the data with Evidently, generate a simple visual report (Data Drift / Data Quality)
  • Create a Prefect pipeline
  • Automate data checks as part of the prediction pipeline. Design a custom test suite for data quality, data drift and prediction drift
  • Model quality checks. Generate the report on model performance once you receive the ground truth
  • Setting up slack/email alerts
  • Wrapping up: summarizing key takeaways and reviewing recommended practices

Prerequisites

Preparation

  • Install Python 3.9 (e.g. with Anaconda or Miniconda)
  • Install pipenv (pip install -U pipenv)
  • Install the dependencies:
(cd train && pipenv install)
(cd serve && pipenv install)
(cd monitor && pipenv install)

MLOps

Introduction to MLOps and how it helps with the entire ML project lifecycle

Use case

Discuss the ride duration prediction use case, discuss the overall architecture, show the notebook with the solution, show the script with training it (do not implement it live - to save time, and refer to Lightweight MLOps Zoomcamp for details)

cd train

Start jupyter

pipenv run jupyter notebook

If you have Anaconda, you can skip installing the packages and run the notebook:

jupyter notebook

Next, execute the code to get the model

Serving the model

Implement a simple Flask application for serving the model

cd serve

Run the serve_starter.py file:

pipenv run python serve_starter.py

Send a request:

REQUEST='{
    "ride_id": "XYZ10",
    "ride": {
        "PULocationID": 100,
        "DOLocationID": 102,
        "trip_distance": 30
    }
}'
URL="http://localhost:9696/predict"

curl -X POST \
    -d "${REQUEST}" \
    -H "Content-Type: application/json" \
    ${URL}

Response:

{
  "prediction": {
    "duration": 20.77956787473484
  }
}

Logging the predictions

Log the predictions to a Kinesis stream and save them in a data lake (s3)

Now let's modify our serve_starter.py to add logging. We will log the prediction to a kinesis stream, but you can use any other way of logging.

Create a kinesis stream, e.g. duration_prediction_serve_logger.

Add logging:

import json

PREDICTIONS_STREAM_NAME = 'duration_prediction_serve_logger'
kinesis_client = boto3.client('kinesis')

# in the serve function

prediction_event = {
    'ride_id': ride_id,
    'ride': ride,
    'features': features,
    'prediction': result 
}

print(f'logging {prediction_event} to {PREDICTIONS_STREAM_NAME}...')

kinesis_client.put_record(
    StreamName=PREDICTIONS_STREAM_NAME,
    Data=json.dumps(prediction_event) + "\n",
    PartitionKey=str(ride_id)
)

(Note + "\n" - it's important)

Send a request:

REQUEST='{
    "ride_id": "XYZ10",
    "ride": {
        "PULocationID": 100,
        "DOLocationID": 102,
        "trip_distance": 30
    }
}'
URL="http://localhost:9696/predict"

curl -X POST \
    -d "${REQUEST}" \
    -H "Content-Type: application/json" \
    ${URL}

We can check the logs

KINESIS_STREAM_OUTPUT='duration_prediction_serve_logger'
SHARD='shardId-000000000000'

SHARD_ITERATOR=$(aws kinesis \
    get-shard-iterator \
        --shard-id ${SHARD} \
        --shard-iterator-type TRIM_HORIZON \
        --stream-name ${KINESIS_STREAM_OUTPUT} \
        --query 'ShardIterator' \
)

RESULT=$(aws kinesis get-records --shard-iterator $SHARD_ITERATOR)

echo ${RESULT} | jq -r '.Records[0].Data' | base64 --decode

Getting data from S3

Set up a batch job for pulling the data from S3 and analyzing it

  • Create an s3 bucket "duration-prediction-serve-logs"
  • Enable firehose
  • No data transformation (explore yourself)
  • No data converstion (explore yourself)
  • Destination: "s3://duration-prediction-serve-logs"
  • Look at the files in the bucket

We can't wait for long, so we simulated the traffic and put the data in the monitor/data folder. To generate it, run the prepare-files.ipynb notebook.

Data drift report

  • Analyze the data with Evidently, generate a simple visual report (Data Drift / Data Quality).

Our virtual enviorment already has evidently installed. But if you're using your own environment, run

pip install evidently

Let's use it to generate a simple visual report

First, load the reference data (data we used for training)

df_reference = pd.read_parquet('data/2022/01/2022-01-full.parquet')

Evidently is quite slow when analyzing large datasets, so we should take a sample:

df_reference = pd.read_parquet('data/2022/01/2022-01-full.parquet')

Next, we load the "production" data. First, we load the trips:

year = 2023
month = 1
day = 2

trips_file = f'data/{year:04d}/{month:02d}/{year:04d}-{month:02d}-{day:02d}.parquet'
df_trips = pd.read_parquet(trips_file)

Second, load the logs:

logs_file = f'data/{year:04d}/{month:02d}/{year:04d}-{month:02d}-{day:02d}-predictions.jsonl'

df_logs = pd.read_json(logs_file, lines=True)

df_predictions = pd.DataFrame()
df_predictions['ride_id'] = df_logs['ride_id']
df_predictions['prediction'] = df_logs['prediction'].apply(lambda p: p['prediction']['duration'])

And merge them:

df = df_trips.merge(df_predictions, on='ride_id')

Now let's see if there's any drift. Import evidently:

from evidently.report import Report
from evidently.metric_preset import DataDriftPreset

Build a simple drift report:

report = Report(metrics=[
    DataDriftPreset(columns=['PULocationID', 'DOLocationID', 'trip_distance']), 
])

report.run(reference_data=df_reference_sample, current_data=df_trips)

report.show(mode='inline')

In this preset report, it uses Jensen-Shannon distance to measure the descrepancies between reference and production. While it says that drift is detected, we should be careful about it and check other months.

We can tune it:

report = Report(metrics=[
    DataDriftPreset(
        columns=['PULocationID', 'DOLocationID', 'trip_distance'],
        cat_stattest='psi',
        cat_stattest_threshold=0.2
        num_stattest='ks',
        num_stattest_threshold=0.2,
    ), 
])

report.run(reference_data=df_reference_sample, current_data=df_trips)
report.show(mode='inline')

Save the report as HTML:

report.save_html(f'reports/report-{year:04d}-{month:02d}-{day:02d}.html')

We can also extract information from this report and use it for e.g. sending an alert:

report_metrics = report.as_dict()['metrics']
report_metrics = {d['metric']: d['result'] for d in report_metrics}
drift_report = report_metrics['DataDriftTable']

if drift_report['dataset_drift']:
    # send alert
    print('drift detected!')

We won't implement the logic for sending alerts here, but you can find a lot of examples online. Or use ChatGPT to help you.

You can generate these reports in your automatic pipelines and then send them e.g. over email.

Let's create this pipeline.

Creating a pipeline with Prefect

Now we'll use Prefect to orchestrate the report generation.

We will take the code we created and put it into a Python script. See pipeline_sample.py for details.

Run prefect server:

pipenv run prefect config set PREFECT_UI_API_URL=http://127.0.0.1:4200/api
pipenv run prefect server start

Run the pipeline:

pipenv run prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api
pipenv run python pipeline_sample.py

Data checks

  • Automate data checks as part of the prediction pipeline. Design a custom test suite for data quality, data drift and prediction drift.

Now we want to add data quality checks. We will start with simple integrity checks: data types, missing values and so on.

They are also done via reports:

from evidently.metrics import DatasetSummaryMetric, DatasetMissingValuesMetric

data_integrity_report = Report(metrics=[
    DatasetSummaryMetric(),
    DatasetMissingValuesMetric()
])

data_integrity_report.run(reference_data=df_reference_sample, current_data=df_trips)
data_integrity_report.show(mode='inline')

In addition to reports, we can add tests. You can think of these tests as unit/integration tests for software. They pass or fail, and if they fail, we get an alert - something is wrong with the data, so we need to look at it.

from evidently.test_suite import TestSuite
from evidently.test_preset import DataStabilityTestPreset

data_stability = TestSuite(tests=[
    DataStabilityTestPreset(),
])

data_stability.run(reference_data=df_reference_sample, current_data=df_trips)
data_stability.show(mode='inline')

Let's tune the test:

data_stability = TestSuite(tests=[
    TestNumberOfRows(gte=1000, lte=20000),
    TestNumberOfColumns(),
    TestColumnsType(),
    TestAllColumnsShareOfMissingValues(),
    TestNumColumnsOutOfRangeValues(),
    TestCatColumnsOutOfListValues(
        columns=['PULocationID', 'DOLocationID', 'trip_distance']
    ),
    TestNumColumnsMeanInNSigmas(),
])

data_stability.run(reference_data=df_reference_sample, current_data=df_trips)
data_stability.show(mode='inline')

We can add this to our pipeline too:

test_results = data_stability.as_dict()['tests']

failed_tests = []

for test in test_results:
    status = test['status']
    if status == 'FAIL':
        failed_tests.append(test)

if len(failed_tests) > 0:
    print('tests failed:')
    print(failed_tests)

Examples:

Model quality checks

We also have labels that come with delay - every time the ride ended, we can compare the predictions with the actual duration and make some conclusions. If our model performance drifts, we can notice it and react (e.g. by retraining the model)

First, we need to prepare the data a bit:

df_reference_sample = df_reference_sample.rename(columns={'duration': 'target'})
df = df.rename(columns={'duration': 'target'})

Now let's run the report:

regression_performance_report = Report(metrics=[
    RegressionPreset(columns=['PULocationID', 'DOLocationID', 'trip_distance']),
])

regression_performance_report.run(reference_data=df_reference_sample, current_data=df)
regression_performance_report.show(mode='inline')

Note: for classification you can use this report:

from evidently.metric_preset import ClassificationPreset

Alerting

  • Setting up slack/email alerts

Summary

  • Wrapping up: summarizing key takeaways and reviewing recommended practices

More information