Azure/azureml-examples

In Monitoring Pipeline, Unable to Parse Numerical Data Collected from an Online Endpoint

sraza-onshape opened this issue · 1 comments

Operating System

MacOS

Version Information

Python Version: 3.9
azure-ai-ml package version:  1.11.1
azureml-ai-monitoring: 0.1.0b4

Steps to reproduce

  1. Register a model in Azure ML - mine is for the task of regression, with a single feature and single target variable
  2. Deploy an online scoring endpoint to get inference from that model. In the scoring script, use the follow API for the collector objects:
import os
import logging
import json
import joblib

from azureml.ai.monitoring import Collector
import numpy
import pandas as pd


def init():
    """
    This function is called when the container is initialized/started, typically after create/update of the deployment.
    You can write the logic here to perform init operations like caching the model in memory
    """
    global model, inputs_collector, outputs_collector, inputs_outputs_collector
    # AZUREML_MODEL_DIR is an environment variable created during deployment.
    # It is the path to the model folder (./azureml-models/$MODEL_NAME/$VERSION)
    model_path = os.path.join(
        os.getenv("AZUREML_MODEL_DIR"), "model.pkl"  # TODO[Zain]: make this more robust to different types of model file names 
    )
    # deserialize the model file back into a sklearn model
    model = joblib.load(model_path)
    logging.info("Init complete")

    # instantiate collectors with appropriate names, make sure align with deployment spec
    inputs_collector = Collector(name='model_inputs', on_error=lambda e: logging.info("error:{}".format(e)))
    outputs_collector = Collector(name="model_outputs", on_error=lambda e: logging.info("error:{}".format(e)))
    inputs_outputs_collector = Collector(
        name="model_inputs_outputs",
        on_error=lambda e: logging.info("error:{}".format(e))
    )  # note: this is used to enable Feature Attribution Drift


def run(raw_data, feature_name='intermediary_V'):
    """
    This function is called for every invocation of the endpoint to perform the actual scoring/prediction.
    In the example we extract the data from the json input and call the scikit-learn model's predict()
    method and return the result back

    Example of raw_data: {
        "data": {
            "intermediary_V": [5.9, 6.7]
        }
    }
    """
    logging.info("Request received")
    pdf_data = json.loads(raw_data)["data"]
    input_df = pd.DataFrame(pdf_data)

    # collect inputs data, store correlation_context
    context = inputs_collector.collect(input_df)

    # perform scoring with pandas Dataframe, return value is also pandas Dataframe
    scoring_data = input_df[feature_name].values.reshape(-1, 1)
    output_df = pd.DataFrame(data={
        "target_W_pred": model.predict(scoring_data)
    })

    # collect outputs data, pass in correlation_context so inputs and outputs data can be correlated later
    outputs_collector.collect(output_df, context)

    # create a dataframe with inputs/outputs joined - this creates a URI folder (not mltable)
    # input_output_df = input_df.merge(output_df, context)
    input_output_df = input_df.join(output_df)

    # collect both the inputs and output
    inputs_outputs_collector.collect(input_output_df, context)

    logging.info("Request processed")
    return output_df.to_dict()
  1. Send a test prediction to the endpoint - use code like this to consume it:
import json
import os
import ssl
from typing import Dict
import urllib
import urllib.request as url_request

import numpy as np

AZUREML_ONLINE_ENDPOINT_API_KEY = ...  # from the Azure ML UI
AZUREML_ONLINE_ENDPOINT_URL = ...  # created in the previous step 


def invoke_online_scoring_endpoint(
        data: Dict[str, Dict[str, list]],
        api_key: str = AZUREML_ONLINE_ENDPOINT_API_KEY,
        url: str = AZUREML_ONLINE_ENDPOINT_URL,
    ) -> Dict[str, np.ndarray]:
    """
    Example Input:
        The example below assumes JSON formatting which may be updated
        depending on the format your endpoint expects.
        More information can be found here:
        https://docs.microsoft.com/azure/machine-learning/how-to-deploy-advanced-entry-script:
        data = {
            "data": {
                "intermediary_V": [5.9, 6.7]
            }
        }
    """
    ### HELPER(S)
    def allowSelfSignedHttps(allowed):
        # bypass the server certificate verification on client side
        if allowed and not os.environ.get('PYTHONHTTPSVERIFY', '') and getattr(ssl, '_create_unverified_context', None):
            ssl._create_default_https_context = ssl._create_unverified_context
    ### DRIVER
    allowSelfSignedHttps(True) # this line is needed if you use self-signed certificate in your scoring service.

    # Formulat the request
    body = str.encode(json.dumps(data))
    if not api_key:
        raise Exception("An Azure ML primary or secret key should be provided to invoke the endpoint")
    # The azureml-model-deployment header will force the request to go to a specific deployment.
    # Remove this header to have the request observe the endpoint traffic rules
    headers = {'Content-Type':'application/json', 'Authorization':('Bearer '+ api_key), 'azureml-model-deployment': 'blue' }
    req = url_request.Request(url, body, headers)

    # Make the request
    try:
        response = url_request.urlopen(req)
        result_str = response.read().decode("utf-8")
        # formulate the response into a Pandas-like dict (w/o actually creating a dataframe)
        result_dict = json.loads(result_str)
        result_dict_with_array = {}
        for key, col_values in result_dict.items():
            values_list = list(col_values.values())
            result_dict_with_array[key] = np.array(values_list)
        return result_dict_with_array
    except urllib.error.HTTPError as error:
        print("The request failed with status code: " + str(error.code))
        # Print the headers - they include the requert ID and the timestamp, which are useful for debugging the failure
        print(error.info())
        print(error.read().decode("utf8", 'ignore'))
        return {'error': 'Request to Azure ML was unsuccessful. Please see error logs above.'}


if __name__ == "__main__":
    data = {
            "data": {
                "intermediary_V": [5.9, 6.7]
            }
        }
    res = invoke_online_scoring_endpoint(data)
    print(type(res), res)
  1. Define a monitoring pipeline for the endpoint created. Use the following code based on the Azure docs, adapted for an endpoint that does 1D regression:
from azure.ai.ml import Input, MLClient
from azure.ai.ml.constants import (
    MonitorFeatureType,
    MonitorMetricName,
    MonitorDatasetContext,
)
from azure.ai.ml.entities import (
    AlertNotification,
    FeatureAttributionDriftSignal,
    FeatureAttributionDriftMetricThreshold,
    DataDriftSignal,
    DataQualitySignal,
    DataDriftMetricThreshold,
    DataQualityMetricThreshold,
    NumericalDriftMetrics,
    CategoricalDriftMetrics,
    DataQualityMetricsNumerical,
    DataQualityMetricsCategorical,
    MonitorFeatureFilter,
    MonitorInputData,
    MonitoringTarget,
    MonitorDefinition,
    MonitorSchedule,
    RecurrencePattern,
    RecurrenceTrigger,
    ServerlessSparkCompute,
    ReferenceData,
)

ml_client = MLClient(
    DefaultAzureCredential(), 
    subscription_id=...,
    resource_group=...,
    workspace=...
)

spark_compute = ServerlessSparkCompute(
    instance_type="standard_e4s_v3",
    runtime_version="3.2"
)

monitoring_target = MonitoringTarget(
    ml_task="regression",
    # in general - this follows a pattern of azurem
    endpoint_deployment_id=...,
)

# training data to be used as baseline dataset
reference_data_training = ReferenceData(
    input_data=Input(
        type="mltable",
        path=...,
    ),
    data_context=MonitorDatasetContext.TRAINING,
    target_column_name="target_W",
)

# create an advanced data drift signal
features = MonitorFeatureFilter(top_n_feature_importance=1)
metric_thresholds = DataDriftMetricThreshold(
    numerical=NumericalDriftMetrics(
        jensen_shannon_distance=0.01
    ),
    categorical=CategoricalDriftMetrics(
        pearsons_chi_squared_test=0.02
    )
)

advanced_data_drift = DataDriftSignal(
    reference_data=reference_data_training,
    features=features,
    metric_thresholds=metric_thresholds,
    feature_type_override=feature_dtype_spec,
)

# put all monitoring signals in a dictionary
monitoring_signals = {
    'data_drift_advanced':advanced_data_drift,
}

# create alert notification object
alert_notification = AlertNotification(
    emails=['abc@def.com']
)

# Finally monitor definition
monitor_definition = MonitorDefinition(
    compute=spark_compute,
    monitoring_target=monitoring_target,
    monitoring_signals=monitoring_signals,
    alert_notification=alert_notification
)

recurrence_trigger = RecurrenceTrigger(
    frequency="day",
    interval=1,
    schedule=RecurrencePattern(hours=3, minutes=15)
)

model_monitor_v5 = MonitorSchedule(
    name=...,
    trigger=recurrence_trigger,
    create_monitor=monitor_definition
)

poller = ml_client.schedules.begin_create_or_update(model_monitor_v5)
created_monitor = poller.result()
  1. Once monitor is successfully deployed, manually trigger it to see if it can parse the collected production data

Expected behavior

The pipeline (for the monitor) should run OK, with all the child nodes executing successfully.

Actual behavior

There is an error in one of the preprocessor nodes that fails the pipeline.

The error message is this:

UserError: field intermediary_V: Can not merge type <class 'pyspark.sql.types.DoubleType'> and <class 'pyspark.sql.types.StringType'>

Addition information

Just for reference - the error itself arises in this code executed by the preprocessor component, which is automatically added by the system (in this case). It's in this line utilizes pyspark: data_as_df = spark.createDataFrame(infer_pdf)

Full script is below:

# /model_data_collector_preprocessor/run.py 
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

"""Entry script for Model Data Collector Data Window Component."""

import argparse

import pandas as pd
from pyspark.sql import DataFrame
from dateutil import parser
import mltable
from mltable import MLTable
import tempfile

from fsspec import AbstractFileSystem
from azureml.fsspec import AzureMachineLearningFileSystem
from datetime import datetime
from pyspark.sql.functions import lit, col
from shared_utilities.event_utils import post_warning_event
from shared_utilities.io_utils import (
    init_spark,
    try_read_mltable_in_spark,
    save_spark_df_as_mltable,
)

from shared_utilities.constants import (
    MDC_CHAT_HISTORY_COLUMN,
    MDC_CORRELATION_ID_COLUMN,
    MDC_DATA_COLUMN,
    MDC_DATAREF_COLUMN,
    SCHEMA_INFER_ROW_COUNT
)

from typing import Tuple
import os
from urllib.parse import urlparse
from azureml.core.run import Run
from azure.ai.ml import MLClient


def _convert_to_azureml_long_form(url_str: str, datastore: str, sub_id=None, rg_name=None, ws_name=None) -> str:
    """Convert path to AzureML path."""
    url = urlparse(url_str)
    if url.scheme in ["https", "http"]:
        idx = url.path.find('/', 1)
        path = url.path[idx+1:]
    elif url.scheme in ["wasbs", "wasb", "abfss", "abfs"]:
        path = url.path[1:]
    elif url.scheme == "azureml" and url.hostname == "datastores":  # azrueml short form
        idx = url.path.find('/paths/')
        path = url.path[idx+7:]
    else:
        return url_str  # azureml long form, azureml asset, file or other scheme, return original path directly

    sub_id = sub_id or os.environ.get("AZUREML_ARM_SUBSCRIPTION", None)
    rg_name = rg_name or os.environ.get("AZUREML_ARM_RESOURCEGROUP", None)
    ws_name = ws_name or os.environ.get("AZUREML_ARM_WORKSPACE_NAME", None)

    return f"azureml://subscriptions/{sub_id}/resourcegroups/{rg_name}/workspaces/{ws_name}/datastores" \
           f"/{datastore}/paths/{path}"


def _get_datastore_from_input_path(input_path: str, ml_client=None) -> str:
    """Get datastore name from input path."""
    url = urlparse(input_path)
    if url.scheme == "azureml":
        if ':' in url.path:  # azureml asset path
            return _get_datastore_from_asset_path(input_path, ml_client)
        else:  # azureml long or short form
            return _get_datastore_from_azureml_path(input_path)
    elif url.scheme == "file" or os.path.isdir(input_path):
        return None  # local path for testing, datastore is not needed
    else:
        raise ValueError("Only azureml path(long, short or asset) is supported as input path of the MDC preprocessor.")


def _get_workspace_info() -> Tuple[str, str, str]:
    """Get workspace info from Run context and environment variables."""
    ws = Run.get_context().experiment.workspace
    sub_id = ws.subscription_id or os.environ.get("AZUREML_ARM_SUBSCRIPTION")
    rg_name = ws.resource_group or os.environ.get("AZUREML_ARM_RESOURCEGROUP")
    ws_name = ws.name or os.environ.get("AZUREML_ARM_WORKSPACE_NAME")
    return sub_id, rg_name, ws_name


def _get_datastore_from_azureml_path(azureml_path: str) -> str:
    start_idx = azureml_path.find('/datastores/')
    end_idx = azureml_path.find('/paths/')
    return azureml_path[start_idx+12:end_idx]


def _get_datastore_from_asset_path(asset_path: str, ml_client=None) -> str:
    if not ml_client:
        sub_id, rg_name, ws_name = _get_workspace_info()
        ml_client = MLClient(subscription_id=sub_id, resource_group=rg_name, workspace_name=ws_name)

    # todo: validation
    asset_sections = asset_path.split(':')
    asset_name = asset_sections[1]
    asset_version = asset_sections[2]

    data_asset = ml_client.data.get(asset_name, asset_version)
    return data_asset.datastore or _get_datastore_from_input_path(data_asset.path)


def _raw_mdc_uri_folder_to_mltable(
    start_datetime: datetime, end_datetime: datetime, input_data: str
):
    """Create mltable definition - extract, filter and convert columns."""
    # Extract partition format
    table = mltable.from_json_lines_files(
        paths=[{"pattern": f"{input_data}**/*.jsonl"}]
    )
    # Uppercase HH for hour info
    partitionFormat = "{PartitionDate:yyyy/MM/dd/HH}/{fileName}.jsonl"
    table = table.extract_columns_from_partition_format(partitionFormat)

    # Filter on partitionFormat based on user data window
    filterStr = f"PartitionDate >= datetime({start_datetime.year}, {start_datetime.month}, {start_datetime.day}, " \
                f"{start_datetime.hour}) and PartitionDate <= datetime({end_datetime.year}, {end_datetime.month}, " \
                f"{end_datetime.day}, {end_datetime.hour})"
    table = table.filter(filterStr)

    # Data column is a list of objects, convert it into string because spark.read_json cannot read object
    table = table.convert_column_types({"data": mltable.DataType.to_string()})
    return table


def _convert_mltable_to_spark_df(table: MLTable, preprocessed_input_data: str, fs: AbstractFileSystem) -> DataFrame:
    """Convert MLTable to Spark DataFrame."""
    with tempfile.TemporaryDirectory() as mltable_temp_path:
        # Save MLTable to temp location
        table.save(mltable_temp_path)

        # Save preprocessed_data MLTable to temp location
        des_path = preprocessed_input_data + "temp"
        fs = fs or AzureMachineLearningFileSystem(des_path)  # for testing
        print("MLTable path:", des_path)
        # TODO: Evaluate if we need to overwrite
        # Richard: why we need to upload the mltable folder to azureml://?
        fs.upload(
            lpath=mltable_temp_path,
            rpath=des_path,
            **{"overwrite": "MERGE_WITH_OVERWRITE"},
            recursive=True,
        )

    # Read mltable from preprocessed_data
    return try_read_mltable_in_spark(des_path, "preprocessed_data")


def _get_data_columns(df: DataFrame) -> list:
    columns = []
    if MDC_DATA_COLUMN in df.columns:
        columns.append(MDC_DATA_COLUMN)
    if MDC_DATAREF_COLUMN in df.columns:
        columns.append(MDC_DATAREF_COLUMN)

    return columns


def _extract_data_and_correlation_id(df: DataFrame, extract_correlation_id: bool, datastore: str = None) -> DataFrame:
    """
    Extract data and correlation id from the MDC logs.

    If data column exists, return the json contents in it,
    otherwise, return the dataref content which is a url to the json file.
    """

    def convert_object_to_str(dataframe: pd.DataFrame) -> pd.DataFrame:
        columns = dataframe.columns
        for column in columns:
            if dataframe[column].dtype == "object":
                dataframe[column] = dataframe[column].astype(str)

        return dataframe

    def read_data(row) -> str:
        data = getattr(row, MDC_DATA_COLUMN, None)
        if data:
            return data

        dataref = getattr(row, MDC_DATAREF_COLUMN, None)
        # convert https path to azureml long form path which can be recognized by azureml filesystem
        # and read by pd.read_json()
        data_url = _convert_to_azureml_long_form(dataref, datastore)
        return data_url
        # TODO: Move this to tracking stream if both data and dataref are NULL

    def row_to_pdf(row) -> pd.DataFrame:
        df = pd.read_json(read_data(row))
        df = convert_object_to_str(df)
        return df

    data_columns = _get_data_columns(df)
    data_rows = df.select(data_columns).rdd.take(SCHEMA_INFER_ROW_COUNT)  # TODO: make it an argument user can define

    spark = init_spark()
    infer_pdf = pd.concat([row_to_pdf(row) for row in data_rows], ignore_index=True)
    data_as_df = spark.createDataFrame(infer_pdf)
    # data_as_df.show()
    # data_as_df.printSchema()

    # The temporary workaround to remove the chat_history column if it exists.
    # We are removing the column because the pyspark DF is unable to parse it.
    # This version of the MDC is applied only to GSQ.
    if MDC_CHAT_HISTORY_COLUMN in data_as_df.columns:
        data_as_df = data_as_df.drop(col(MDC_CHAT_HISTORY_COLUMN))

    def extract_data_and_correlation_id(entry, correlationid):
        result = pd.read_json(entry)
        result = convert_object_to_str(result)
        result[MDC_CORRELATION_ID_COLUMN] = ""
        if MDC_CHAT_HISTORY_COLUMN in result.columns:
            result.drop(columns=[MDC_CHAT_HISTORY_COLUMN], inplace=True)
        for index, row in result.iterrows():
            result.loc[index, MDC_CORRELATION_ID_COLUMN] = (
                correlationid + "_" + str(index)
            )
        return result

    def transform_df_function_with_correlation_id(iterator):
        for df in iterator:
            yield pd.concat(
                extract_data_and_correlation_id(
                    read_data(row),
                    getattr(row, MDC_CORRELATION_ID_COLUMN),
                )
                for row in df.itertuples()
            )

    def transform_df_function_without_correlation_id(iterator):
        for df in iterator:
            pdf = pd.concat(
                convert_object_to_str(pd.read_json(read_data(row))) for row in df.itertuples()
            )
            if MDC_CHAT_HISTORY_COLUMN in pdf.columns:
                pdf.drop(columns=[MDC_CHAT_HISTORY_COLUMN], inplace=True)
            yield pdf

    if extract_correlation_id:
        # Add empty column to get the correlationId in the schema
        data_as_df = data_as_df.withColumn(MDC_CORRELATION_ID_COLUMN, lit(""))
        data_columns.append(MDC_CORRELATION_ID_COLUMN)
        transformed_df = df.select(data_columns).mapInPandas(
            transform_df_function_with_correlation_id, schema=data_as_df.schema
        )
    else:
        # TODO: if neither data and dataref move to tracking stream (or throw ModelMonitoringException?)
        transformed_df = df.select(data_columns).mapInPandas(
            transform_df_function_without_correlation_id, schema=data_as_df.schema
        )
    return transformed_df


def _raw_mdc_uri_folder_to_preprocessed_spark_df(
        data_window_start: datetime, data_window_end: datetime,
        input_data: str, preprocessed_input_data: str, extract_correlation_id: bool,
        fs: AbstractFileSystem = None) -> DataFrame:
    """Read raw MDC data, preprocess, and return in a Spark DataFrame."""
    # Parse the dates
    start_datetime = parser.parse(data_window_start)
    end_datetime = parser.parse(data_window_end)

    table = _raw_mdc_uri_folder_to_mltable(start_datetime, end_datetime, input_data)
    # print("MLTable:", table)

    df = _convert_mltable_to_spark_df(table, preprocessed_input_data, fs)
    # print("df after converting mltable to spark df:")
    # df.show()
    # df.printSchema()

    if not df:
        print("Skipping the Model Data Collector preprocessor.")
        post_warning_event(
            "Although data was found, the window for this current run contains no data. "
            + "Please visit aka.ms/mlmonitoringhelp for more information."
        )
        return

    datastore = _get_datastore_from_input_path(input_data)
    # print("Datastore:", datastore)
    transformed_df = _extract_data_and_correlation_id(df, extract_correlation_id, datastore)
    # transformed_df.show()
    # transformed_df.printSchema()

    return transformed_df


def mdc_preprocessor(
    data_window_start: str,
    data_window_end: str,
    input_data: str,
    preprocessed_input_data: str,
    extract_correlation_id: bool,
    fs: AbstractFileSystem = None,
):
    """Extract data based on window size provided and preprocess it into MLTable.

    Args:
        data_window_start: The start date of the data window.
        data_window_end: The end date of the data window.
        input_data: The data asset on which the date filter is applied.
        preprocessed_data: The mltable path pointing to location where the outputted mltable will be written to.
        extract_correlation_id: The boolean to extract correlation Id from the MDC logs.
    """
    transformed_df = _raw_mdc_uri_folder_to_preprocessed_spark_df(data_window_start, data_window_end, input_data,
                                                                  preprocessed_input_data, extract_correlation_id, fs)

    save_spark_df_as_mltable(transformed_df, preprocessed_input_data)


def run():
    """Compute data window and preprocess data from MDC."""
    # Parse arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--data_window_start", type=str)
    parser.add_argument("--data_window_end", type=str)
    parser.add_argument("--input_data", type=str)
    parser.add_argument("--extract_correlation_id", type=str)
    parser.add_argument("--preprocessed_input_data", type=str)
    args = parser.parse_args()

    mdc_preprocessor(
        args.data_window_start,
        args.data_window_end,
        args.input_data,
        args.preprocessed_input_data,
        eval(args.extract_correlation_id.capitalize()),
    )


if __name__ == "__main__":
    run()

Closing for now, as I've relocated the issue in the repo for the SDK: Azure/azure-sdk-for-python#33452