In Monitoring Pipeline, Unable to Parse Numerical Data Collected from an Online Endpoint
sraza-onshape opened this issue · 1 comments
sraza-onshape commented
Operating System
MacOS
Version Information
Python Version: 3.9
azure-ai-ml package version: 1.11.1
azureml-ai-monitoring: 0.1.0b4
Steps to reproduce
- Register a model in Azure ML - mine is for the task of regression, with a single feature and single target variable
- Deploy an online scoring endpoint to get inference from that model. In the scoring script, use the follow API for the collector objects:
import os
import logging
import json
import joblib
from azureml.ai.monitoring import Collector
import numpy
import pandas as pd
def init():
"""
This function is called when the container is initialized/started, typically after create/update of the deployment.
You can write the logic here to perform init operations like caching the model in memory
"""
global model, inputs_collector, outputs_collector, inputs_outputs_collector
# AZUREML_MODEL_DIR is an environment variable created during deployment.
# It is the path to the model folder (./azureml-models/$MODEL_NAME/$VERSION)
model_path = os.path.join(
os.getenv("AZUREML_MODEL_DIR"), "model.pkl" # TODO[Zain]: make this more robust to different types of model file names
)
# deserialize the model file back into a sklearn model
model = joblib.load(model_path)
logging.info("Init complete")
# instantiate collectors with appropriate names, make sure align with deployment spec
inputs_collector = Collector(name='model_inputs', on_error=lambda e: logging.info("error:{}".format(e)))
outputs_collector = Collector(name="model_outputs", on_error=lambda e: logging.info("error:{}".format(e)))
inputs_outputs_collector = Collector(
name="model_inputs_outputs",
on_error=lambda e: logging.info("error:{}".format(e))
) # note: this is used to enable Feature Attribution Drift
def run(raw_data, feature_name='intermediary_V'):
"""
This function is called for every invocation of the endpoint to perform the actual scoring/prediction.
In the example we extract the data from the json input and call the scikit-learn model's predict()
method and return the result back
Example of raw_data: {
"data": {
"intermediary_V": [5.9, 6.7]
}
}
"""
logging.info("Request received")
pdf_data = json.loads(raw_data)["data"]
input_df = pd.DataFrame(pdf_data)
# collect inputs data, store correlation_context
context = inputs_collector.collect(input_df)
# perform scoring with pandas Dataframe, return value is also pandas Dataframe
scoring_data = input_df[feature_name].values.reshape(-1, 1)
output_df = pd.DataFrame(data={
"target_W_pred": model.predict(scoring_data)
})
# collect outputs data, pass in correlation_context so inputs and outputs data can be correlated later
outputs_collector.collect(output_df, context)
# create a dataframe with inputs/outputs joined - this creates a URI folder (not mltable)
# input_output_df = input_df.merge(output_df, context)
input_output_df = input_df.join(output_df)
# collect both the inputs and output
inputs_outputs_collector.collect(input_output_df, context)
logging.info("Request processed")
return output_df.to_dict()
- Send a test prediction to the endpoint - use code like this to consume it:
import json
import os
import ssl
from typing import Dict
import urllib
import urllib.request as url_request
import numpy as np
AZUREML_ONLINE_ENDPOINT_API_KEY = ... # from the Azure ML UI
AZUREML_ONLINE_ENDPOINT_URL = ... # created in the previous step
def invoke_online_scoring_endpoint(
data: Dict[str, Dict[str, list]],
api_key: str = AZUREML_ONLINE_ENDPOINT_API_KEY,
url: str = AZUREML_ONLINE_ENDPOINT_URL,
) -> Dict[str, np.ndarray]:
"""
Example Input:
The example below assumes JSON formatting which may be updated
depending on the format your endpoint expects.
More information can be found here:
https://docs.microsoft.com/azure/machine-learning/how-to-deploy-advanced-entry-script:
data = {
"data": {
"intermediary_V": [5.9, 6.7]
}
}
"""
### HELPER(S)
def allowSelfSignedHttps(allowed):
# bypass the server certificate verification on client side
if allowed and not os.environ.get('PYTHONHTTPSVERIFY', '') and getattr(ssl, '_create_unverified_context', None):
ssl._create_default_https_context = ssl._create_unverified_context
### DRIVER
allowSelfSignedHttps(True) # this line is needed if you use self-signed certificate in your scoring service.
# Formulat the request
body = str.encode(json.dumps(data))
if not api_key:
raise Exception("An Azure ML primary or secret key should be provided to invoke the endpoint")
# The azureml-model-deployment header will force the request to go to a specific deployment.
# Remove this header to have the request observe the endpoint traffic rules
headers = {'Content-Type':'application/json', 'Authorization':('Bearer '+ api_key), 'azureml-model-deployment': 'blue' }
req = url_request.Request(url, body, headers)
# Make the request
try:
response = url_request.urlopen(req)
result_str = response.read().decode("utf-8")
# formulate the response into a Pandas-like dict (w/o actually creating a dataframe)
result_dict = json.loads(result_str)
result_dict_with_array = {}
for key, col_values in result_dict.items():
values_list = list(col_values.values())
result_dict_with_array[key] = np.array(values_list)
return result_dict_with_array
except urllib.error.HTTPError as error:
print("The request failed with status code: " + str(error.code))
# Print the headers - they include the requert ID and the timestamp, which are useful for debugging the failure
print(error.info())
print(error.read().decode("utf8", 'ignore'))
return {'error': 'Request to Azure ML was unsuccessful. Please see error logs above.'}
if __name__ == "__main__":
data = {
"data": {
"intermediary_V": [5.9, 6.7]
}
}
res = invoke_online_scoring_endpoint(data)
print(type(res), res)
- Define a monitoring pipeline for the endpoint created. Use the following code based on the Azure docs, adapted for an endpoint that does 1D regression:
from azure.ai.ml import Input, MLClient
from azure.ai.ml.constants import (
MonitorFeatureType,
MonitorMetricName,
MonitorDatasetContext,
)
from azure.ai.ml.entities import (
AlertNotification,
FeatureAttributionDriftSignal,
FeatureAttributionDriftMetricThreshold,
DataDriftSignal,
DataQualitySignal,
DataDriftMetricThreshold,
DataQualityMetricThreshold,
NumericalDriftMetrics,
CategoricalDriftMetrics,
DataQualityMetricsNumerical,
DataQualityMetricsCategorical,
MonitorFeatureFilter,
MonitorInputData,
MonitoringTarget,
MonitorDefinition,
MonitorSchedule,
RecurrencePattern,
RecurrenceTrigger,
ServerlessSparkCompute,
ReferenceData,
)
ml_client = MLClient(
DefaultAzureCredential(),
subscription_id=...,
resource_group=...,
workspace=...
)
spark_compute = ServerlessSparkCompute(
instance_type="standard_e4s_v3",
runtime_version="3.2"
)
monitoring_target = MonitoringTarget(
ml_task="regression",
# in general - this follows a pattern of azurem
endpoint_deployment_id=...,
)
# training data to be used as baseline dataset
reference_data_training = ReferenceData(
input_data=Input(
type="mltable",
path=...,
),
data_context=MonitorDatasetContext.TRAINING,
target_column_name="target_W",
)
# create an advanced data drift signal
features = MonitorFeatureFilter(top_n_feature_importance=1)
metric_thresholds = DataDriftMetricThreshold(
numerical=NumericalDriftMetrics(
jensen_shannon_distance=0.01
),
categorical=CategoricalDriftMetrics(
pearsons_chi_squared_test=0.02
)
)
advanced_data_drift = DataDriftSignal(
reference_data=reference_data_training,
features=features,
metric_thresholds=metric_thresholds,
feature_type_override=feature_dtype_spec,
)
# put all monitoring signals in a dictionary
monitoring_signals = {
'data_drift_advanced':advanced_data_drift,
}
# create alert notification object
alert_notification = AlertNotification(
emails=['abc@def.com']
)
# Finally monitor definition
monitor_definition = MonitorDefinition(
compute=spark_compute,
monitoring_target=monitoring_target,
monitoring_signals=monitoring_signals,
alert_notification=alert_notification
)
recurrence_trigger = RecurrenceTrigger(
frequency="day",
interval=1,
schedule=RecurrencePattern(hours=3, minutes=15)
)
model_monitor_v5 = MonitorSchedule(
name=...,
trigger=recurrence_trigger,
create_monitor=monitor_definition
)
poller = ml_client.schedules.begin_create_or_update(model_monitor_v5)
created_monitor = poller.result()
- Once monitor is successfully deployed, manually trigger it to see if it can parse the collected production data
Expected behavior
The pipeline (for the monitor) should run OK, with all the child nodes executing successfully.
Actual behavior
There is an error in one of the preprocessor nodes that fails the pipeline.
The error message is this:
UserError: field intermediary_V: Can not merge type <class 'pyspark.sql.types.DoubleType'> and <class 'pyspark.sql.types.StringType'>
Addition information
Just for reference - the error itself arises in this code executed by the preprocessor component, which is automatically added by the system (in this case). It's in this line utilizes pyspark
: data_as_df = spark.createDataFrame(infer_pdf)
Full script is below:
# /model_data_collector_preprocessor/run.py
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""Entry script for Model Data Collector Data Window Component."""
import argparse
import pandas as pd
from pyspark.sql import DataFrame
from dateutil import parser
import mltable
from mltable import MLTable
import tempfile
from fsspec import AbstractFileSystem
from azureml.fsspec import AzureMachineLearningFileSystem
from datetime import datetime
from pyspark.sql.functions import lit, col
from shared_utilities.event_utils import post_warning_event
from shared_utilities.io_utils import (
init_spark,
try_read_mltable_in_spark,
save_spark_df_as_mltable,
)
from shared_utilities.constants import (
MDC_CHAT_HISTORY_COLUMN,
MDC_CORRELATION_ID_COLUMN,
MDC_DATA_COLUMN,
MDC_DATAREF_COLUMN,
SCHEMA_INFER_ROW_COUNT
)
from typing import Tuple
import os
from urllib.parse import urlparse
from azureml.core.run import Run
from azure.ai.ml import MLClient
def _convert_to_azureml_long_form(url_str: str, datastore: str, sub_id=None, rg_name=None, ws_name=None) -> str:
"""Convert path to AzureML path."""
url = urlparse(url_str)
if url.scheme in ["https", "http"]:
idx = url.path.find('/', 1)
path = url.path[idx+1:]
elif url.scheme in ["wasbs", "wasb", "abfss", "abfs"]:
path = url.path[1:]
elif url.scheme == "azureml" and url.hostname == "datastores": # azrueml short form
idx = url.path.find('/paths/')
path = url.path[idx+7:]
else:
return url_str # azureml long form, azureml asset, file or other scheme, return original path directly
sub_id = sub_id or os.environ.get("AZUREML_ARM_SUBSCRIPTION", None)
rg_name = rg_name or os.environ.get("AZUREML_ARM_RESOURCEGROUP", None)
ws_name = ws_name or os.environ.get("AZUREML_ARM_WORKSPACE_NAME", None)
return f"azureml://subscriptions/{sub_id}/resourcegroups/{rg_name}/workspaces/{ws_name}/datastores" \
f"/{datastore}/paths/{path}"
def _get_datastore_from_input_path(input_path: str, ml_client=None) -> str:
"""Get datastore name from input path."""
url = urlparse(input_path)
if url.scheme == "azureml":
if ':' in url.path: # azureml asset path
return _get_datastore_from_asset_path(input_path, ml_client)
else: # azureml long or short form
return _get_datastore_from_azureml_path(input_path)
elif url.scheme == "file" or os.path.isdir(input_path):
return None # local path for testing, datastore is not needed
else:
raise ValueError("Only azureml path(long, short or asset) is supported as input path of the MDC preprocessor.")
def _get_workspace_info() -> Tuple[str, str, str]:
"""Get workspace info from Run context and environment variables."""
ws = Run.get_context().experiment.workspace
sub_id = ws.subscription_id or os.environ.get("AZUREML_ARM_SUBSCRIPTION")
rg_name = ws.resource_group or os.environ.get("AZUREML_ARM_RESOURCEGROUP")
ws_name = ws.name or os.environ.get("AZUREML_ARM_WORKSPACE_NAME")
return sub_id, rg_name, ws_name
def _get_datastore_from_azureml_path(azureml_path: str) -> str:
start_idx = azureml_path.find('/datastores/')
end_idx = azureml_path.find('/paths/')
return azureml_path[start_idx+12:end_idx]
def _get_datastore_from_asset_path(asset_path: str, ml_client=None) -> str:
if not ml_client:
sub_id, rg_name, ws_name = _get_workspace_info()
ml_client = MLClient(subscription_id=sub_id, resource_group=rg_name, workspace_name=ws_name)
# todo: validation
asset_sections = asset_path.split(':')
asset_name = asset_sections[1]
asset_version = asset_sections[2]
data_asset = ml_client.data.get(asset_name, asset_version)
return data_asset.datastore or _get_datastore_from_input_path(data_asset.path)
def _raw_mdc_uri_folder_to_mltable(
start_datetime: datetime, end_datetime: datetime, input_data: str
):
"""Create mltable definition - extract, filter and convert columns."""
# Extract partition format
table = mltable.from_json_lines_files(
paths=[{"pattern": f"{input_data}**/*.jsonl"}]
)
# Uppercase HH for hour info
partitionFormat = "{PartitionDate:yyyy/MM/dd/HH}/{fileName}.jsonl"
table = table.extract_columns_from_partition_format(partitionFormat)
# Filter on partitionFormat based on user data window
filterStr = f"PartitionDate >= datetime({start_datetime.year}, {start_datetime.month}, {start_datetime.day}, " \
f"{start_datetime.hour}) and PartitionDate <= datetime({end_datetime.year}, {end_datetime.month}, " \
f"{end_datetime.day}, {end_datetime.hour})"
table = table.filter(filterStr)
# Data column is a list of objects, convert it into string because spark.read_json cannot read object
table = table.convert_column_types({"data": mltable.DataType.to_string()})
return table
def _convert_mltable_to_spark_df(table: MLTable, preprocessed_input_data: str, fs: AbstractFileSystem) -> DataFrame:
"""Convert MLTable to Spark DataFrame."""
with tempfile.TemporaryDirectory() as mltable_temp_path:
# Save MLTable to temp location
table.save(mltable_temp_path)
# Save preprocessed_data MLTable to temp location
des_path = preprocessed_input_data + "temp"
fs = fs or AzureMachineLearningFileSystem(des_path) # for testing
print("MLTable path:", des_path)
# TODO: Evaluate if we need to overwrite
# Richard: why we need to upload the mltable folder to azureml://?
fs.upload(
lpath=mltable_temp_path,
rpath=des_path,
**{"overwrite": "MERGE_WITH_OVERWRITE"},
recursive=True,
)
# Read mltable from preprocessed_data
return try_read_mltable_in_spark(des_path, "preprocessed_data")
def _get_data_columns(df: DataFrame) -> list:
columns = []
if MDC_DATA_COLUMN in df.columns:
columns.append(MDC_DATA_COLUMN)
if MDC_DATAREF_COLUMN in df.columns:
columns.append(MDC_DATAREF_COLUMN)
return columns
def _extract_data_and_correlation_id(df: DataFrame, extract_correlation_id: bool, datastore: str = None) -> DataFrame:
"""
Extract data and correlation id from the MDC logs.
If data column exists, return the json contents in it,
otherwise, return the dataref content which is a url to the json file.
"""
def convert_object_to_str(dataframe: pd.DataFrame) -> pd.DataFrame:
columns = dataframe.columns
for column in columns:
if dataframe[column].dtype == "object":
dataframe[column] = dataframe[column].astype(str)
return dataframe
def read_data(row) -> str:
data = getattr(row, MDC_DATA_COLUMN, None)
if data:
return data
dataref = getattr(row, MDC_DATAREF_COLUMN, None)
# convert https path to azureml long form path which can be recognized by azureml filesystem
# and read by pd.read_json()
data_url = _convert_to_azureml_long_form(dataref, datastore)
return data_url
# TODO: Move this to tracking stream if both data and dataref are NULL
def row_to_pdf(row) -> pd.DataFrame:
df = pd.read_json(read_data(row))
df = convert_object_to_str(df)
return df
data_columns = _get_data_columns(df)
data_rows = df.select(data_columns).rdd.take(SCHEMA_INFER_ROW_COUNT) # TODO: make it an argument user can define
spark = init_spark()
infer_pdf = pd.concat([row_to_pdf(row) for row in data_rows], ignore_index=True)
data_as_df = spark.createDataFrame(infer_pdf)
# data_as_df.show()
# data_as_df.printSchema()
# The temporary workaround to remove the chat_history column if it exists.
# We are removing the column because the pyspark DF is unable to parse it.
# This version of the MDC is applied only to GSQ.
if MDC_CHAT_HISTORY_COLUMN in data_as_df.columns:
data_as_df = data_as_df.drop(col(MDC_CHAT_HISTORY_COLUMN))
def extract_data_and_correlation_id(entry, correlationid):
result = pd.read_json(entry)
result = convert_object_to_str(result)
result[MDC_CORRELATION_ID_COLUMN] = ""
if MDC_CHAT_HISTORY_COLUMN in result.columns:
result.drop(columns=[MDC_CHAT_HISTORY_COLUMN], inplace=True)
for index, row in result.iterrows():
result.loc[index, MDC_CORRELATION_ID_COLUMN] = (
correlationid + "_" + str(index)
)
return result
def transform_df_function_with_correlation_id(iterator):
for df in iterator:
yield pd.concat(
extract_data_and_correlation_id(
read_data(row),
getattr(row, MDC_CORRELATION_ID_COLUMN),
)
for row in df.itertuples()
)
def transform_df_function_without_correlation_id(iterator):
for df in iterator:
pdf = pd.concat(
convert_object_to_str(pd.read_json(read_data(row))) for row in df.itertuples()
)
if MDC_CHAT_HISTORY_COLUMN in pdf.columns:
pdf.drop(columns=[MDC_CHAT_HISTORY_COLUMN], inplace=True)
yield pdf
if extract_correlation_id:
# Add empty column to get the correlationId in the schema
data_as_df = data_as_df.withColumn(MDC_CORRELATION_ID_COLUMN, lit(""))
data_columns.append(MDC_CORRELATION_ID_COLUMN)
transformed_df = df.select(data_columns).mapInPandas(
transform_df_function_with_correlation_id, schema=data_as_df.schema
)
else:
# TODO: if neither data and dataref move to tracking stream (or throw ModelMonitoringException?)
transformed_df = df.select(data_columns).mapInPandas(
transform_df_function_without_correlation_id, schema=data_as_df.schema
)
return transformed_df
def _raw_mdc_uri_folder_to_preprocessed_spark_df(
data_window_start: datetime, data_window_end: datetime,
input_data: str, preprocessed_input_data: str, extract_correlation_id: bool,
fs: AbstractFileSystem = None) -> DataFrame:
"""Read raw MDC data, preprocess, and return in a Spark DataFrame."""
# Parse the dates
start_datetime = parser.parse(data_window_start)
end_datetime = parser.parse(data_window_end)
table = _raw_mdc_uri_folder_to_mltable(start_datetime, end_datetime, input_data)
# print("MLTable:", table)
df = _convert_mltable_to_spark_df(table, preprocessed_input_data, fs)
# print("df after converting mltable to spark df:")
# df.show()
# df.printSchema()
if not df:
print("Skipping the Model Data Collector preprocessor.")
post_warning_event(
"Although data was found, the window for this current run contains no data. "
+ "Please visit aka.ms/mlmonitoringhelp for more information."
)
return
datastore = _get_datastore_from_input_path(input_data)
# print("Datastore:", datastore)
transformed_df = _extract_data_and_correlation_id(df, extract_correlation_id, datastore)
# transformed_df.show()
# transformed_df.printSchema()
return transformed_df
def mdc_preprocessor(
data_window_start: str,
data_window_end: str,
input_data: str,
preprocessed_input_data: str,
extract_correlation_id: bool,
fs: AbstractFileSystem = None,
):
"""Extract data based on window size provided and preprocess it into MLTable.
Args:
data_window_start: The start date of the data window.
data_window_end: The end date of the data window.
input_data: The data asset on which the date filter is applied.
preprocessed_data: The mltable path pointing to location where the outputted mltable will be written to.
extract_correlation_id: The boolean to extract correlation Id from the MDC logs.
"""
transformed_df = _raw_mdc_uri_folder_to_preprocessed_spark_df(data_window_start, data_window_end, input_data,
preprocessed_input_data, extract_correlation_id, fs)
save_spark_df_as_mltable(transformed_df, preprocessed_input_data)
def run():
"""Compute data window and preprocess data from MDC."""
# Parse arguments
parser = argparse.ArgumentParser()
parser.add_argument("--data_window_start", type=str)
parser.add_argument("--data_window_end", type=str)
parser.add_argument("--input_data", type=str)
parser.add_argument("--extract_correlation_id", type=str)
parser.add_argument("--preprocessed_input_data", type=str)
args = parser.parse_args()
mdc_preprocessor(
args.data_window_start,
args.data_window_end,
args.input_data,
args.preprocessed_input_data,
eval(args.extract_correlation_id.capitalize()),
)
if __name__ == "__main__":
run()
sraza-onshape commented
Closing for now, as I've relocated the issue in the repo for the SDK: Azure/azure-sdk-for-python#33452