In this tutorial, we show how to create an Azure ML Pipeline that is started from a change-based trigger. As an example, we demonstrate a scenario in which new audio files (.mp3) are added to blob storage, triggering an ML pipeline that processes these files and outputs the result to a SQL Database. The full notebook can be found here.
- Install Azure ML SDK
pip install azureml-sdk
from azureml.core import Workspace
ws = Workspace.from_config()
print('Workspace name: ' + ws.name,
'Azure region: ' + ws.location,
'Subscription id: ' + ws.subscription_id,
'Resource group: ' + ws.resource_group, sep = '\n')
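If there is no config.json next to your notebook, you can also connect to the workspace explicitly. The snippet below is a minimal sketch; the placeholder values are assumptions you should replace with your own.
from azureml.core import Workspace
# Connect without a config.json by passing the workspace coordinates directly
ws = Workspace.get(name='<your-workspace-name>',
                   subscription_id='<your-subscription-id>',
                   resource_group='<your-resource-group>')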
In this example we will use an existing cluster; however, you can create a new one using the SDK, as shown in the sketch after the following snippet.
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException
cluster_name = "<your-cluster-name>"
try:
    compute_target = ComputeTarget(workspace=ws, name=cluster_name)
    print('Found existing compute target')
except ComputeTargetException:
    print('Compute target not found')
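If the cluster is not found, you can create it inside the except branch instead. Below is a minimal sketch; the VM size and node count are assumptions, so adjust them to your workload.
# Provision a new AmlCompute cluster when none exists
compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS3_V2',
                                                       max_nodes=2)
compute_target = ComputeTarget.create(ws, cluster_name, compute_config)
compute_target.wait_for_completion(show_output=True)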
We will create an environment using a Dockerfile. This environment contains an R base image as well as some dependencies required to run the following examples.
from azureml.core import Environment
env = Environment.from_dockerfile(name='r_env', dockerfile='Dockerfile')
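Optionally, you can build the environment image up front, which surfaces Dockerfile problems before the pipeline runs. A minimal sketch:
# Build the Docker image now instead of at first pipeline run
build = env.build(workspace=ws)
build.wait_for_completion(show_output=True)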
Now, with the dependencies in place, we will configure an Azure ML Batch Pipeline. For more details about how to create pipelines, we encourage you to take a look at this MS Learn module.
In this example we use pipeline parameters so we can submit the experiment with some default values (very useful for testing the pipeline). We use two inputs: one for the mp3 files and another to read an rds file from our Data Store (blob storage).
from azureml.core import Datastore
from azureml.data.datapath import DataPath, DataPathComputeBinding
from azureml.pipeline.core import PipelineParameter

# Get the Data Store
datastore = Datastore.get(workspace=ws, datastore_name="<your-datastore>")
# RDS Input Data parameter
input_rds_datapath = DataPath(datastore=datastore, path_on_datastore='r-pipeline-data/rds/accidents.Rd')
input_rds_data_pipeline_param = (PipelineParameter(name="input_rds_data", default_value=input_rds_datapath),
                                 DataPathComputeBinding(mode='mount'))
# MP3 Input Data parameter
input_mp3_datapath = DataPath(datastore=datastore, path_on_datastore='r-pipeline-data/mp3')
input_mp3_data_pipeline_param = (PipelineParameter(name="input_mp3_data", default_value=input_mp3_datapath),
                                 DataPathComputeBinding(mode='mount'))
Now we set up an intermediate output, which can be used, for example, as input to the next steps. We also use the Azure ML default storage to persist this data.
from azureml.data import OutputFileDatasetConfig
datastore_output = ws.get_default_datastore()
output_data = OutputFileDatasetConfig(name="output_data",
                                      destination=(datastore_output, "output_data/{run-id}/{output-name}")).as_upload()
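If you also want these intermediate files tracked as a dataset, the output configuration can register them once the producing step completes. A minimal sketch; the dataset name is an assumption.
# Optionally register the intermediate output as a dataset when the step finishes
output_data = output_data.register_on_complete(name='r_pipeline_intermediate_output')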
With the input and the intermediate output configured, we can set up the pipeline's steps. We will have two steps: train and persist. The first one is a very simple R script that receives both input data (mp3 and rds) and produces an output (for the next step). The second one is a Python script that receives (as input) the result from the first step and persists it into a SQL Database.
from azureml.core import ScriptRunConfig

source_directory = '.'
train_config = ScriptRunConfig(source_directory=source_directory,
                               command=['Rscript', 'R-model.R',
                                        '--input_rds_data', input_rds_data_pipeline_param,
                                        '--input_mp3_data', input_mp3_data_pipeline_param,
                                        '--output_data', output_data
                                        ],
                               compute_target=compute_target,
                               environment=env)
from azureml.pipeline.steps import CommandStep

# R Model
train = CommandStep(name='train',
                    runconfig=train_config,
                    allow_reuse=False,
                    inputs=[input_rds_data_pipeline_param, input_mp3_data_pipeline_param],
                    outputs=[output_data]
                    )
# Persist data (python script)
from azureml.core.runconfig import RunConfiguration
from azureml.core.conda_dependencies import CondaDependencies
from azureml.pipeline.steps import PythonScriptStep

run_config = RunConfiguration()
run_config.environment.python.conda_dependencies = CondaDependencies.create(conda_packages=['pandas', 'pyodbc'])
persist_data = PythonScriptStep(
    script_name="persist_data.py",
    arguments=["--raw_data", output_data.as_input('raw_data')],
    inputs=[output_data],
    compute_target=compute_target,
    source_directory=source_directory,
    runconfig=run_config
)
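For reference, persist_data.py could look roughly like the sketch below. The CSV file name inside the mounted output folder is an assumption; adapt it to whatever your R script actually writes, and see the Key Vault section further down for the SQL connection itself.
# persist_data.py -- minimal sketch of the persist step
import argparse
import os
import pandas as pd

parser = argparse.ArgumentParser()
parser.add_argument('--raw_data', type=str, help='Folder produced by the train step')
args = parser.parse_args()

# The intermediate output is mounted as a folder; inspect its contents
for file_name in os.listdir(args.raw_data):
    print('Found output file:', file_name)

df = pd.read_csv(os.path.join(args.raw_data, 'predictions.csv'))  # assumed file name
print(df.head())
# ...persist df into the SQL Database (see the Key Vault section below)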
Finally, we can submit the pipeline, passing the input parameters (for example, a sample audio file).
from azureml.pipeline.core import Pipeline
from azureml.core import Experiment
pipeline = Pipeline(workspace=ws, steps=[train, persist_data])
# Parameters to test our pipeline
test_rds_path = DataPath(datastore=datastore, path_on_datastore='./rds/accidents.Rd')
test_mp3_path = DataPath(datastore=datastore, path_on_datastore='./mp3/file_example_MP3_5MG_01.mp3')
experiment_name = 'R-batch-scoring'
pipeline_run = Experiment(ws, experiment_name).submit(pipeline,
                                                      pipeline_parameters={"input_rds_data": test_rds_path,
                                                                           "input_mp3_data": test_mp3_path
                                                                           })
And check the execution log:
from azureml.widgets import RunDetails
RunDetails(pipeline_run).show()
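Outside of a notebook (for example in a CI job) you can wait for the run synchronously instead:
# Block until the pipeline run finishes and stream the logs to stdout
pipeline_run.wait_for_completion(show_output=True)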
After the test we can publish the pipeline, so it will be available to be triggered from outside Azure ML (for example from an Azure Data Factory pipeline).
published_pipeline = pipeline.publish(name='pipeline-batch-score',
                                      description='R batch pipeline')
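Once published, the pipeline exposes a REST endpoint that external services can call with an Azure AD token. The sketch below uses interactive authentication for simplicity; from Azure Data Factory you would typically use a service principal or managed identity instead.
import requests
from azureml.core.authentication import InteractiveLoginAuthentication

# Get an Azure AD token and call the published pipeline's REST endpoint
auth = InteractiveLoginAuthentication()
headers = auth.get_authentication_header()

response = requests.post(published_pipeline.endpoint,
                         headers=headers,
                         json={"ExperimentName": experiment_name})
print(response.json())  # contains the Id of the submitted run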
We can also trigger our pipeline on a recurring basis or using a change-based schedule. In this case we trigger the pipeline, for example, when a new mp3 file is added to a specific blob container.
from azureml.pipeline.core import Schedule

datastore = Datastore.get(ws, datastore_name='<your-datastore>')
reactive_schedule = Schedule.create(ws,
                                    name="R-Schedule",
                                    description="Based on input file change.",
                                    pipeline_id=published_pipeline.id,
                                    experiment_name=experiment_name,
                                    datastore=datastore,
                                    polling_interval=1,
                                    data_path_parameter_name="input_mp3_data",
                                    path_on_datastore='r-pipeline-data/mp3/'
                                    )
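During development it can also be useful to list the schedules in the workspace and disable one, for example:
# List all schedules and disable the one we just created
for schedule in Schedule.list(ws):
    print(schedule.id, schedule.status)

reactive_schedule.disable(wait_for_provisioning=True)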
In this example we use Azure Key Vault to protect sensitive data (username, password, etc.). We can use the default Azure Key Vault integrated into the Azure ML Workspace for this task, so it is easy to get the secrets during the pipeline's execution. Please check this doc for more details about this process.
keyvault = ws.get_default_keyvault()
username = '<your-user-name>'
password = '<your-password>'
database = '<your-database>'
server = '<your-server>'
keyvault.set_secret(name="username", value = username)
keyvault.set_secret(name="password", value = username)
keyvault.set_secret(name="database", value = database)
keyvault.set_secret(name="server", value = server)