Summary: Building AI Solutions with Azure ML

Posted by Marco Santoni on Wed 19 August 2020

While studying for the Azure Data Scientist Associate certification, I took notes from the Building AI Solutions with Azure ML course. In this single page, you'll find the entire content of the course (as of 18th August, 2020). This page is a small support for those preparing to earn the certification.

Intro

Azure ML Workspace

Workspaces are Azure resources. They include:

  • compute
  • notebooks
  • pipelines
  • data
  • experiments
  • models

created alongside

  • storage account: files used by the workspace, plus data
  • application insights
  • key vault
  • vm
  • container registry

permission: RBAC

Editions:

  • basic (no graphical designer)
  • enterprise

Tools

Azure ML Studio

  • designer (no-code ML model development)
  • automated ML

Azure ML SDK

Azure ML CLI Extensions

Compute Instances

  • choose a VM
  • store notebooks independently of VMs

VS Code - Azure ML Extension

Experiments

Azure ML tracks the runs of experiments.

...
run = experiment.start_logging()
...
run.complete()
  • logging metrics. run.log('name', value). You can review them via RunDetails(run).show()
  • experiment output file. Example: trained models. run.upload_file(..).

Script as an experiment. In the script, you can get the run context: run = Run.get_context(). To run it, you define:

  • RunConfiguration: python environment
  • ScriptRunConfig: associates RunConfiguration with script

Train a ML model

Estimators

Estimator: encapsulates a run configuration and a script configuration in a single object. The training script saves the trained model as a pickle file in the outputs folder.

estimator = Estimator(
  source_directory='experiment',
  entry_script='training.py',
  compute_target='local',
  conda_packages=['scikit-learn']
)
experiment = Experiment(workspace, name='train_experiment')
run = experiment.submit(config=estimator)

Framework-specific estimators simplify configurations

from azureml.train.sklearn import SKLearn

estimator = SKLearn(
  source_directory='experiment',
  entry_script='training.py',
  compute_target='local'
)

Script parameters

Use argparse to read the parameters in a script (eg regularization rate). To pass a parameter to an Estimator:

estimator = SKLearn(
  source_directory='experiment',
  entry_script='training.py',
  script_params={'--reg_rate': 0.1},
  compute_target='local'
)
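On the script side, the value passed through script_params can be read with argparse. This is a minimal sketch, assuming the flag is named --reg_rate as in the estimator above; the helper name parse_training_args and the default value are illustrative and not from the course.

```python
import argparse


def parse_training_args(argv=None):
    """Read hyperparameters passed to the training script on the command line."""
    parser = argparse.ArgumentParser()
    # The flag name must match the key used in script_params.
    parser.add_argument('--reg_rate', type=float, dest='reg_rate', default=0.01)
    return parser.parse_args(argv)


# An explicit list is used here for illustration;
# pass None to read sys.argv in a real training script.
args = parse_training_args(['--reg_rate', '0.1'])
print('Regularization rate:', args.reg_rate)
```

Declaring type=float means the script receives a number rather than a string, so it can be passed straight to the model constructor.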

Registering models

Once the experiment Run has completed, you can retrieve its outputs (eg trained model).

run.download_file(name='outputs/model.pkl', output_file_path='model.pkl')

Registering a model lets you track multiple versions of it.

model = Model.register(
  workspace=ws,
  model_name='classification_model',
  model_path='model.pkl', #local path
  description='a classification model',
  tags={'dept': 'sales'},
  model_framework=Model.Framework.SCIKITLEARN,
  model_framework_version='0.20.3'
)

or register from run:

run.register_model(
  ...
  model_path='outputs/model.pkl'
  ...
  )

Datastores

Abstractions of cloud data sources encapsulating the information required to connect.

You can register a data store

  • via ML Studio
  • via SDK
ws = Workspace.from_config()
blob = Datastore.register_azure_blob_container(
  workspace=ws,
  datastore_name='blob_data',
  container_name='data_container',
  account_name='az_acct',
  account_key='123456'
)

In the SDK, you can list data stores.

Use datastores

Most common: Azure blob and file

blob_ds.upload(
  src_dir='/files',
  target_path='/data/files',
  overwrite=True
)
blob_ds.download(
  target_path='downloads',
  prefix='/data'
)

You pass a data reference to the script to use a datastore. Data access models

  • download: contents downloaded to the compute context of experiment
  • upload: files generated by experiment are uploaded after run
  • mount: path of datastore mounted as remote storage (only on remote compute target)

Pass reference as script parameter:

data_ref = blob_ds.path('data/files').as_download(path_on_compute='training_data')
estimator = SKLearn(
  source_directory='experiment_folder',
  entry_script='training_script.py',
  compute_target='local',
  script_params={'--data_folder': data_ref}
)

Retrieve it in script and use it like local folder:

parser = argparse.ArgumentParser()
parser.add_argument('--data_folder', type=str, dest='data_folder')
args = parser.parse_args()
data_files = os.listdir(args.data_folder)

Datasets

Datasets are versioned packaged data objects consumed in experiments and pipelines. Types

  • tabular: read as table
  • file: list of file paths

You can create dataset via Azure ML Studio or via SDK. File paths can have wildcards (/files/*.csv).

Once a dataset is created, you can register it in the workspace (available later too).

Tabular:

from azureml.core import Dataset

blob_ds = ws.get_default_datastore()
csv_paths = [
  (blob_ds, 'data/files/current_data.csv'),
  (blob_ds, 'data/files/archive/*.csv')
]
tab_ds = Dataset.Tabular.from_delimited_files(path=csv_paths)
tab_ds = tab_ds.register(workspace=ws, name='csv_table')

File:

blob_ds = ws.get_default_datastore()
file_ds = Dataset.File.from_files(path=(blob_ds, 'data/files/images/*.jpg'))
file_ds = file_ds.register(workspace=ws, name='img_files')

Retrieve a dataset

ws = Workspace.from_config()

# Get a dataset from workspace datasets collection
ds1 = ws.datasets['csv_table']

# Get a dataset by name from the datasets class
ds2 = Dataset.get_by_name(ws, 'img_files')

Datasets can be versioned. Create a new version by registering with the same name and setting the create_new_version property:

file_ds = file_ds.register(workspace=ws, name='img_files', create_new_version=True)

Retrieve specific version:

img_ds = Dataset.get_by_name(workspace=ws, name='img_files', version=2)

Compute Contexts

The runtime context for each experiment consists of

  • environment for the script, which includes all packages
  • compute target on which the environment will be deployed

Intro to Environments

Python runs in virtual environments (eg Conda, pip). Azure creates a Docker container and creates the environment. You create environments by

  • Conda or pip yaml file and load it:
env = Environment.from_conda_specification(name='training_env', file_path='./conda.yml')
  • from existing Conda environment:
env = Environment.from_conda_environment(name='training_env',
                            conda_environment_name='py_env')
  • specifying packages:
env = Environment('training_env')
deps = CondaDependencies.create(conda_packages=['pandas', 'numpy'],
                              pip_packages=['azureml-defaults'])
env.python.conda_dependencies = deps

Once created, you can register the environment in the workspace.

env.register(workspace=ws)

Retrieve and assign it to a ScriptRunConfig or an Estimator

tr_env = Environment.get(workspace=ws, name='training_env')
estimator = Estimator(
  source_directory='experiment_folder',
  entry_script='training_script.py',
  compute_target='local',
  environment_definition=tr_env
  )

Compute targets

Compute targets are physical or virtual computers on which experiments are run. Types of compute:

  • local compute: your workstation or a virtual machine
  • compute clusters: multi-node clusters of VMs that automatically scale up or down
  • inference clusters: to deploy models, they use containers to initiate computing
  • attached compute: attach a VM or Databricks cluster that you already use

You can create a compute target via AML studio or via SDK. A managed compute target is one managed by AML. Via SDK

ws = Workspace.from_config()
compute_name = 'aml-cluster'
compute_config = AmlCompute.provisioning_configuration(
  vm_size='STANDARD_DS12_V2',
  min_nodes=0,
  max_nodes=4,
  vm_priority='dedicated'
  )
aml_cluster = ComputeTarget.create(ws, compute_name, compute_config)
aml_cluster.wait_for_completion()

An unmanaged compute target is defined and managed outside AML. You can attach it via SDK:

ws = Workspace.from_config()
compute_name = 'db-cluster'
db_workspace_name = 'db_workspace'
db_resource_group = 'db_resource_group'
db_access_token = 'aocsinaocnasoivn'
db_config = DatabricksCompute.attach_configuration(
  resource_group=db_resource_group,
  workspace_name=db_workspace_name,
  access_token=db_access_token
  )
db_cluster = ComputeTarget.create(ws, compute_name, db_config)
db_cluster.wait_for_completion()

You can check whether a compute target already exists:

compute_name = 'aml-cluster'
try:
  aml_cluster = ComputeTarget(workspace=ws, name=compute_name)
except ComputeTargetException:
  # create it
  ...

You can use a compute target in an experiment run by specifying it as a parameter

compute_name = 'aml-cluster'
training_env = Environment.get(workspace=ws, name='training_env')
estimator = Estimator(
  source_directory='experiment_folder',
  entry_script='training_script.py',
  environment_definition=training_env,
  compute_target=compute_name
  )
# or specify a ComputeTarget object
training_cluster = ComputeTarget(workspace=ws, name=compute_name)
estimator = Estimator(
  source_directory='experiment_folder',
  entry_script='training_script.py',
  environment_definition=training_env,
  compute_target=training_cluster
  )

Orchestrating with Pipelines

A pipeline is a workflow of ML tasks in which each task is implemented as a step (either sequential or parallel). You can combine different compute targets. Common types of step:

  • PythonScriptStep
  • EstimatorStep: runs an estimator
  • DataTransferStep: uses ADF
  • DatabricksStep
  • AdlaStep: runs a U-SQL job in Azure Data Lake Analytics

Define steps:

step1 = PythonScriptStep(
  name='prepare data',
  source_directory='scripts',
  script_name='data_prep.py',
  compute_target='aml-cluster',
  runconfig=run_config
  )

step2 = EstimatorStep(
  name='train model',
  estimator=sk_estimator,
  compute_target='aml-cluster'
  )

Assign steps to pipeline:

train_pipeline = Pipeline(
  workspace=ws,
  steps=[step1,step2]
  )
# create experiment and run pipeline
experiment = Experiment(workspace=ws, name='training-pipeline')
pipeline_run = experiment.submit(train_pipeline)

Pass data between steps

The PipelineData object is a special kind of DataReference that

  • references a location in a datastore
  • creates a data dependency between pipeline steps

To pass it

  • define a PipelineData object that references a location in a data store
  • specify the object as input or output for the steps that use it
  • pass the PipelineData object as a script parameter in steps that run scripts

Example

raw_ds = Dataset.get_by_name(ws, 'raw_dataset')
# Define object to pass data between steps
data_store = ws.get_default_datastore()
prepped_data = PipelineData('prepped', datastore=data_store)

step1 = PythonScriptStep(
  name='prepare data',
  source_directory='scripts',
  script_name='data_prep.py',
  compute_target='aml-cluster',
  runconfig=run_config,
  # specify dataset
  inputs = [raw_ds.as_named_input('raw_data')],
  # specify PipelineData as output
  outputs = [prepped_data],
  # script reference
  arguments = ['--folder', prepped_data]
  )

step2 = EstimatorStep(
  name='train model',
  estimator=sk_estimator,
  compute_target='aml-cluster',
  # specify PipelineData
  inputs = [prepped_data],
  # pass reference to estimator script
  estimator_entry_script_arguments = ['--folder', prepped_data]
  )

Inside the script, you can get reference to PipelineData object from the argument, and use it like a local folder.

parser = argparse.ArgumentParser()
parser.add_argument('--folder', type=str, dest='folder')
args = parser.parse_args()
output_folder = args.folder

# ...

# save data to PipelineData location
os.makedirs(output_folder, exist_ok=True)
output_path = os.path.join(output_folder, 'prepped_data.csv')
df.to_csv(output_path)

Reuse steps

By default, the step output from a previous pipeline run is reused without rerunning the step (if script, source directory and other params have not changed). You can control this:

step1 = PythonScriptStep(
  #...
  allow_reuse=False
  )

You can force the steps to run regardless of individual configuration:

pipeline_run = experiment.submit(train_pipeline, regenerate_outputs=True)

Publish pipelines

You can publish a pipeline to create a REST endpoint through which the pipeline can be run on demand.

published_pipeline = pipeline.publish(
  name='training_pipeline',
  description='Model training pipeline',
  version='1.0'
  )

You can view it in ML Studio and get the endpoint:

published_pipeline.endpoint

You start a published pipeline by making an HTTP request to its endpoint. You pass the authorisation header (with token) and a JSON payload specifying the experiment name. The pipeline runs asynchronously, and you get the run ID in the response.
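The request described above can be sketched with the standard library alone. This is a hedged illustration, not the course's code: the endpoint URL and token are placeholders, the helper name build_pipeline_request is made up, and the 'Bearer' scheme is an assumption about how the token is carried in the header. The request is only built here, not sent.

```python
import json
import urllib.request


def build_pipeline_request(endpoint, token, experiment_name):
    """Build (but do not send) the POST request that starts a published pipeline."""
    payload = json.dumps({'ExperimentName': experiment_name}).encode('utf-8')
    headers = {
        'Authorization': 'Bearer ' + token,  # assumed bearer-token scheme
        'Content-Type': 'application/json',
    }
    return urllib.request.Request(endpoint, data=payload, headers=headers, method='POST')


# Placeholder endpoint and token, for illustration only.
pipeline_request = build_pipeline_request(
    'https://example.invalid/pipelines/123',
    'my-token',
    'run_training_pipeline',
)
print(pipeline_request.get_method(), pipeline_request.full_url)
# Sending it would be: urllib.request.urlopen(pipeline_request)
```

Because the pipeline runs asynchronously, the call returns as soon as the run is queued; the run ID in the response is what you would use to check progress later.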

Pipeline parameters

Create a PipelineParameter object for each parameter. Example:

reg_param = PipelineParameter(name='reg_rate', default_value=0.01)
# ...
step2 = EstimatorStep(
  # ...
  estimator_entry_script_arguments=[
    '--folder', prepped,
    '--reg', reg_param
  ]
)

After you publish a parametrised pipeline, you can pass parameter values in the JSON payload of the REST interface. Example

requests.post(
  endpoint,
  headers=auth_header,
  json={
    'ExperimentName': 'run_training_pipeline',
    'ParameterAssignments': {
      'reg_rate': 0.1
    }
  }
  )

Schedule pipelines

Define a ScheduleRecurrence and use it to create a Schedule.

daily = ScheduleRecurrence(
  frequency='Day',
  interval=1
  )
pipeline_schedule = Schedule.create(
  ws,
  name='Daily Training',
  description='train model every day',
  pipeline_id=published_pipeline.id,
  experiment_name='Training_Pipeline',
  recurrence=daily
  )

To schedule a pipeline to run whenever data changes, you must create a Schedule that monitors a specific path on a datastore:

training_datastore = Datastore(workspace=ws, name='blob_data')
pipeline_schedule = Schedule.create(
  # ...
  datastore=training_datastore,
  path_on_datastore='data/training'
  )

Deploy ML Models

You can deploy a model as a container to several compute targets:

  • Azure ML compute instance
  • Azure container instance
  • Azure function
  • Azure Kubernetes service
  • IoT module

Steps

  1. register the model
  2. inference configuration
  3. deployment configuration
  4. deploy model

Register the model

After training, you must register the model to Azure ML workspace.

classification_model = Model.register(
  workspace=ws,
  model_name='classification_model',
  model_path='model.pkl',
  description='A classification model'
  )

Or you can use the reference to the run:

run.register_model(
  model_name='classification_model',
  model_path='outputs/model.pkl',
  description='A classification model'
  )

Inference configuration

The model will be deployed as a service consisting of

  • a script to load the model and return predictions for submitted data
  • an environment in which the script will be run

Create the entry script (or scoring script) as a Python file including 2 functions

  • init() called when service is initialised (load model from registry)
  • run(raw_data) called when new data is submitted to the service (generate predictions)

Example

def init():
  global model
  model_path = Model.get_model_path('classification_model')
  model = joblib.load(model_path)

def run(raw_data):
  data = np.array(json.loads(raw_data)['data'])
  predictions = model.predict(data)
  # return predictions in any JSON-serializable format
  return predictions.tolist()
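Before deploying, the init()/run(raw_data) contract can be exercised locally. The sketch below is an assumption-laden stand-in: StubModel is a hypothetical replacement for the registered model, and plain lists are used instead of NumPy arrays so that it runs without any dependencies.

```python
import json


class StubModel:
    """Hypothetical stand-in for a trained model: labels a row 1 if its values sum above 1."""

    def predict(self, rows):
        return [1 if sum(row) > 1 else 0 for row in rows]


def init():
    global model
    # A real service would load the registered model here instead.
    model = StubModel()


def run(raw_data):
    # The service receives the request body as a JSON string.
    data = json.loads(raw_data)['data']
    predictions = model.predict(data)
    # Return a JSON-serializable list, one prediction per submitted row.
    return list(predictions)


init()
sample = json.dumps({'data': [[0.1, 0.2, 0.3], [0.9, 0.8, 0.7]]})
print(run(sample))
```

Calling init() once and then run() with a JSON string mirrors the order in which the service calls the two functions.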

You can configure the environment using Conda. You can use a CondaDependencies class to create a default environment (including azureml-defaults and other commonly-used) and add any other required packages. You then serialize the environment to a string and save it.

myenv = CondaDependencies()
myenv.add_conda_package('scikit-learn')

env_file = 'service_files/env.yml'
with open(env_file, 'w') as f:
  f.write(myenv.serialize_to_string())

After creating the script and the environment, you combine them in an InferenceConfig:

classifier_inference_config = InferenceConfig(
  runtime='python',
  source_directory='service_files',
  entry_script='score.py',
  conda_file='env.yml'
  )

Deployment configuration

Now that you have the entry script and the environment, you configure the compute service. If you deploy to an AKS cluster, you create it

cluster_name = 'aks-cluster'
compute_config = AksCompute.provisioning_configuration(location='eastus')
production_cluster = ComputeTarget.create(ws, cluster_name, compute_config)
production_cluster.wait_for_completion()

You define the deployment configuration

classifier_deploy_config = AksWebservice.deploy_configuration(
  cpu_cores=1,
  memory_gb=1
)

Deploy the model

model = ws.models['classification_model']
service = Model.deploy(
  workspace=ws,
  name='classification-service',
  models=[model],
  inference_config=classifier_inference_config,
  deployment_config=classifier_deploy_config,
  deployment_target=production_cluster
  )
service.wait_for_deployment()

Consuming a real-time inferencing service

For testing, you can use the AML SDK to call a web service through the run method of a Webservice object. Typically, you send data to the run method as JSON like

{
  'data':[
    [0.1, 0.2, 3.4],
    [0.9, 8.2, 2.5],
    ...
  ]
}

The response is a JSON with a prediction for each case

response = service.run(input_data=json_data)
predictions = json.loads(response)

In production, you use a REST endpoint. You find the endpoint of a deployed service in Azure ML studio, or by retrieving the scoring_uri property of a Webservice object:

endpoint = service.scoring_uri

There are 2 kinds of authentication:

  • key: requests are authenticated by specifying the key associated with the service
  • token: requests are authenticated by providing a JSON Web Token (JWT)

By default, authentication is disabled for Azure Container Instances services and set to key-based authentication for AKS services.

To make an authenticated call to the REST endpoint, you include the key or the token in the request header.
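A minimal sketch of such a call, built with the standard library and not sent. The URL and key are placeholders, build_scoring_request is an illustrative helper name, and carrying the key or token as a 'Bearer' value in the Authorization header is an assumption about the header format.

```python
import json
import urllib.request


def build_scoring_request(scoring_uri, key_or_token, rows):
    """Build (but do not send) an authenticated scoring request."""
    body = json.dumps({'data': rows}).encode('utf-8')
    headers = {
        'Content-Type': 'application/json',
        'Authorization': 'Bearer ' + key_or_token,  # assumed header format
    }
    return urllib.request.Request(scoring_uri, data=body, headers=headers, method='POST')


scoring_request = build_scoring_request(
    'https://example.invalid/score',  # placeholder for service.scoring_uri
    'placeholder-key',
    [[0.1, 0.2, 3.4], [0.9, 8.2, 2.5]],
)
print(scoring_request.get_header('Authorization'))
```

The body has the same shape as the JSON passed to service.run during testing, so the scoring script does not need to distinguish between the two callers.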

Troubleshooting service deployment

You can

  • check the service state (should be healthy): service.state
  • review service logs: service.get_logs()
  • deploy to local container

Batch inference pipelines

Pipeline to read input data, load a registered model, predict labels, and write results.

  1. Register a model
  2. Create a scoring script. The run(mini_batch) method makes the inference on each batch.
  3. Create a pipeline with ParallelRunStep
  4. Run the pipeline and retrieve the step output
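The scoring script from step 2 could look roughly like the sketch below. It assumes the mini-batch arrives as a list of file paths and that each file holds one comma-separated row of numeric features; StubModel is a hypothetical stand-in for the registered model, and the temporary files exist only to try the script locally.

```python
import os
import tempfile


class StubModel:
    """Hypothetical stand-in for a registered model."""

    def predict(self, rows):
        # Label a row 1 when its values sum above 1, else 0.
        return [1 if sum(row) > 1 else 0 for row in rows]


def init():
    global model
    # A real script would load the registered model here.
    model = StubModel()


def run(mini_batch):
    """Score every file in the mini-batch and return one result line per file."""
    results = []
    for file_path in mini_batch:
        with open(file_path) as f:
            # Assumes one comma-separated row of numeric features per file.
            row = [float(value) for value in f.read().strip().split(',')]
        prediction = model.predict([row])[0]
        results.append('{}: {}'.format(os.path.basename(file_path), prediction))
    return results


# Local check with two temporary files standing in for a mini-batch.
init()
with tempfile.TemporaryDirectory() as folder:
    paths = []
    for name, content in [('a.csv', '0.1,0.2,0.3'), ('b.csv', '0.9,0.8,0.7')]:
        path = os.path.join(folder, name)
        with open(path, 'w') as f:
            f.write(content)
        paths.append(path)
    batch_results = run(paths)
print(batch_results)
```

Returning one entry per input file keeps the output easy to collate when several nodes process different mini-batches in parallel.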

Azure ML provides a pipeline step that performs parallel batch inference. Using the ParallelRunStep class, you can read batches of files from a File dataset and write the output to a PipelineData reference. You can set the output_action to "append_row" (ensuring all instances of the step collate their results into a single output file named parallel_run_step.txt).

batch_data_set = ws.datasets['batch-data']

# output location
default_ds = ws.get_default_datastore()
output_dir = PipelineData(
  name='inferences',
  datastore=default_ds,
  output_path_on_compute='results'
)

parallel_run_config = ParallelRunConfig(
  source_directory='batch_scripts',
  entry_script='batch_scoring_script.py',
  mini_batch_size="5",
  error_threshold=10,
  output_action="append_row",
  environment=batch_env,
  compute_target=aml_cluster,
  node_count=4
  )

parallelrun_step = ParallelRunStep(
  name="batch-score",
  parallel_run_config=parallel_run_config,
  inputs=[batch_data_set.as_named_input('batch_data')],
  output=output_dir,
  arguments=[],
  allow_reuse=True
  )

pipeline = Pipeline(
  workspace=ws,
  steps=[parallelrun_step]
  )

Run the pipeline and retrieve output.

pipeline_run = Experiment(ws, 'batch_prediction_pipeline').submit(pipeline)
pipeline_run.wait_for_completion()

prediction_run = next(pipeline_run.get_children())
prediction_output = prediction_run.get_output_data('inferences')
prediction_output.download(local_path='results')

Publishing a batch inference pipeline

You can publish it as a REST service.

published_pipeline = pipeline_run.publish_pipeline(
  name='Batch_Prediction_Pipeline',
  description='Batch Pipeline',
  version='1.0'
  )

rest_endpoint = published_pipeline.endpoint

Once published, you can use the endpoint to initiate a batch inferencing job.

You can also schedule the published pipeline to have it run automatically.

weekly = ScheduleRecurrence(frequency='Week', interval=1)
pipeline_schedule = Schedule.create(
  ws,
  name='Weekly Predictions',
  description='batch inferencing',
  pipeline_id=published_pipeline.id,
  experiment_name='Batch_Prediction',
  recurrence=weekly
  )

Tuning hyperparameters

Accomplished by training multiple models, using same algorithm and training data but different hyperparameter values. Then, evaluate for each the performance metric (eg accuracy), and the best-performing model is selected.

In Azure ML, you create an experiment that consists of a hyperdrive run, which initiates a child run for each hyperparameter combination. Each child run uses a training script with parametrised hyperparameter values to train a model, and logs the target performance metric achieved by the trained model.

Define a search space

Depends on the type of hyperparameter:

  • discrete. Either make a choice or sample from a discrete distribution:
      • choice from an explicit Python list: choice([10, 20, 30])
      • choice from a range: choice(range(1,10))
      • values from a discrete distribution: qnormal, quniform, qlognormal, qloguniform
  • continuous. Use any of these distributions: normal, uniform, lognormal, loguniform

Define a search space by creating a dictionary with parameter expressions for each hyperparameter.

from azureml.train.hyperdrive import choice, normal

param_space = {
  '--batch_size': choice(16, 32, 64),
  '--learning_rate': normal(10, 3)
}

Configuring sampling

The values used in a tuning run depend on the type of sampling used.

Grid sampling. Every possible combination when hyperparameters are discrete.

param_space = {
  '--batch_size': choice(16, 32, 64),
  '--learning_rate': choice(10, 20)
}

param_sampling = GridParameterSampling(param_space)
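To see what "every possible combination" means for the search space above, the grid can be enumerated in plain Python. This is only an illustration of the idea with ordinary lists standing in for the choice expressions; it does not use the hyperdrive classes.

```python
from itertools import product

# Plain-Python stand-in for the discrete search space above.
param_space = {
    '--batch_size': [16, 32, 64],
    '--learning_rate': [10, 20],
}

names = list(param_space)
# One dictionary per combination of values, in the order product() yields them.
combinations = [dict(zip(names, values)) for values in product(*param_space.values())]

for combo in combinations:
    print(combo)
print(len(combinations), 'combinations')
```

Three batch sizes times two learning rates gives six child runs, which is why grid sampling grows quickly as hyperparameters are added.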

Random sampling. Randomly select a value for each hyperparameter.

param_space = {
  '--batch_size': choice(16, 32, 64),
  '--learning_rate': normal(10, 3)
}

param_sampling = RandomParameterSampling(param_space)

Bayesian sampling. Based on Bayesian optimisation algorithm that tries to select parameter combinations that will result in improved performance from the previous selection.

param_space = {
  '--batch_size': choice(16, 32, 64),
  '--learning_rate': uniform(0.05, 0.1)
}

param_sampling = BayesianParameterSampling(param_space)

Bayesian sampling can only be used with choice, uniform, and quniform parameter expressions, and it can't be combined with an early termination policy.

Configuring an early termination

Typically, you set a maximum number of iterations, but this could still result in a large number of runs that don't result in a better model than a combination that has already been tried.

To help prevent wasted time, you can set an early termination policy that abandons runs that are unlikely to produce a better result than previously completed runs. The policy is evaluated at an evaluation interval you specify, based on each time the target performance metric is logged. You can also set a delay evaluation parameter to avoid evaluating the policy until a minimum number of iterations have been completed.

Note. Early termination is particularly useful for deep learning scenarios where a deep neural network is trained iteratively over a number of epochs. The training script can report the target metric after each epoch, and if the run is significantly underperforming previous runs after the same number of intervals, it can be abandoned.

Bandit policy. Stop a run if the target performance metric underperforms the best run so far by a specified margin.

early_termination_policy = BanditPolicy(
  slack_amount=0.2, # abandon runs when metric is 0.2 or more worse than best run after the same number of intervals
  evaluation_interval=1,
  delay_evaluation=5
  )

You can also use a slack factor, which compares the metric as a ratio rather than an absolute value.
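The decision a bandit policy makes can be sketched as a small function. This is a simplified illustration under stated assumptions: the metric is being maximized, the slack-factor threshold is taken as best / (1 + slack_factor), and the strict comparison at the boundary is a choice made here rather than documented behaviour. The function name bandit_should_stop is made up.

```python
def bandit_should_stop(metric, best_metric, slack_amount=None, slack_factor=None):
    """Return True if a run should be abandoned (assumes the metric is maximized)."""
    if slack_amount is not None:
        # Absolute margin below the best run.
        threshold = best_metric - slack_amount
    elif slack_factor is not None:
        # Margin expressed as a ratio of the best run.
        threshold = best_metric / (1 + slack_factor)
    else:
        raise ValueError('Provide slack_amount or slack_factor')
    return metric < threshold


# The best run so far reports 0.8 accuracy at this interval.
print(bandit_should_stop(0.55, 0.8, slack_amount=0.2))  # 0.55 is below 0.8 - 0.2
print(bandit_should_stop(0.65, 0.8, slack_amount=0.2))  # 0.65 is within the margin
print(bandit_should_stop(0.70, 0.8, slack_factor=0.1))  # 0.70 is below 0.8 / 1.1
```

A slack factor scales with the metric, which can be more convenient than an absolute amount when the metric's range is not known in advance.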

Median stopping policy. Abandons runs where the target performance metric is worse than the median of the running averages for all runs.

early_termination_policy = MedianStoppingPolicy(
  evaluation_interval=1,
  delay_evaluation=5
  )
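The median stopping rule can be illustrated in plain Python. This sketch assumes a maximized metric and compares a run's best value so far with the median of every run's running average; the run names and metric histories are invented for the example.

```python
from statistics import mean, median


def median_should_stop(run_name, histories):
    """Return True if the run's best metric so far is below the median
    of the running averages of all runs (assumes the metric is maximized)."""
    running_averages = [mean(history) for history in histories.values()]
    return max(histories[run_name]) < median(running_averages)


# Hypothetical metric histories, one value per evaluation interval.
histories = {
    'run_a': [0.60, 0.70],
    'run_b': [0.50, 0.60],
    'run_c': [0.70, 0.80],
    'run_d': [0.40, 0.50],
}

for name in ('run_a', 'run_c', 'run_d'):
    print(name, median_should_stop(name, histories))
```

Here only run_d falls below the median of the averages, so it is the only run this rule would abandon.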

Truncation selection policy. Cancels the lowest-performing X% of runs at each evaluation interval, based on the truncation_percentage value you specify for X.

early_termination_policy = TruncationSelectionPolicy(
  truncation_percentage=10,
  evaluation_interval=1,
  delay_evaluation=5
  )
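Truncation selection can be sketched the same way. The function below is an illustration under assumptions: the metric is maximized, the number of cancelled runs is rounded down, and the run names and metrics are invented.

```python
def runs_to_cancel(metrics, truncation_percentage):
    """Return the names of the lowest-performing runs (assumes the metric is maximized)."""
    count = int(len(metrics) * truncation_percentage / 100)
    ranked = sorted(metrics, key=metrics.get)  # worst first
    return ranked[:count]


# Ten hypothetical runs with steadily increasing accuracy.
metrics = {'run_{}'.format(i): 0.50 + 0.04 * i for i in range(10)}
print(runs_to_cancel(metrics, 10))
```

With ten runs and a truncation percentage of 10, one run (the worst) is cancelled at the interval.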

Running a hyperparameter tuning experiment

In Azure ML, you tune hyperparameters by running a hyperdrive experiment. You need to create a training script just the way you would do for any other training experiment, except that you must:

  • include an argument for each hyperparameter
  • log the target performance metric.

This example script trains a logistic regression using a --regularization argument (regularization rate), and logs the accuracy.

parser = argparse.ArgumentParser()
parser.add_argument('--regularization', type=float, dest='reg_rate', default=0.01)
args = parser.parse_args()
reg = args.reg_rate

# get experiment run context
run = Run.get_context()

data = run.input_datasets['training_data'].to_pandas_dataframe()
X = data[['feature1', 'feature2', 'feature3', 'feature4']].values
y = data['label'].values
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3)

model = LogisticRegression(C=1/reg, solver='liblinear').fit(X_train, y_train)

# calculate and log accuracy
y_hat = model.predict(X_test)
acc = np.average(y_hat == y_test)
run.log('Accuracy', float(acc))

# save trained model
os.makedirs('outputs', exist_ok=True)
joblib.dump(value=model, filename='outputs/model.pkl')

run.complete()

To prepare the hyperdrive experiment, you use a HyperDriveConfig object to configure the experiment run.

hyperdrive = HyperDriveConfig(
  estimator=sklearn_estimator,
  hyperparameter_sampling=param_sampling,
  policy=None,
  primary_metric_name='Accuracy',
  primary_metric_goal=PrimaryMetricGoal.MAXIMIZE,
  max_total_runs=6,
  max_concurrent_runs=4
  )

experiment = Experiment(workspace=ws, name='hyperdrive_training')
hyperdrive_run = experiment.submit(config=hyperdrive)

You can monitor the hyperdrive experiment in Azure ML studio. The experiment will initiate a child run for each hyperparameter combination to be tried.

Automate model selection

Visual interface for automated ML in Azure ML Studio for Enterprise edition only.

You can use automated ML to train models for the tasks below. Azure ML supports common algorithms for these tasks:

  • classification
      • logistic regression
      • light gradient boosting machine
      • decision tree
      • random forest
      • naive Bayes
      • linear SVM
      • XGBoost
      • DNN classifier
      • others
  • regression
      • linear regression
      • light gradient boosting machine
      • decision tree
      • random forest
      • elastic net
      • LARS Lasso
      • XGBoost
      • others
  • time series forecasting
      • linear regression
      • light gradient boosting machine
      • decision tree
      • random forest
      • elastic net
      • LARS Lasso
      • XGBoost
      • others

By default, automated machine learning will randomly select from the full range of algorithms for the specified task. You can choose to block individual algorithms from being selected.

Preprocessing and featurization

Automated ML (AutoML) can apply preprocessing transformations to your data.

  • scaling and normalization applied to numeric data automatically
  • optional featurization
      • missing value imputation
      • categorical encoding
      • dropping high cardinality features (eg IDs)
      • feature engineering (eg date parts from DateTime)

Running AutoML experiment

You can use the Azure ML Studio UI or the SDK (using the AutoMLConfig class).

automl_run_config = RunConfiguration(framework='python')
automl_config = AutoMLConfig(
  name='auto ml experiment',
  task='classification',
  primary_metric='AUC_weighted',
  compute_target=aml_compute,
  training_data=train_dataset,
  validation_data=test_dataset,
  label_column_name='label',
  featurization='auto',
  iterations=12,
  max_concurrent_iterations=4
  )

With Azure ML Studio, you can create or select an Azure ML dataset to be used as input for your AutoML experiment. When using the SDK, you can submit data in the following ways:

  • specify a dataset or dataframe of training data that includes features and label to be predicted
  • optionally, specify a second validation data dataset or dataframe. If this is not provided, Azure ML will apply cross-validation.

Alternatively:

  • specify a dataset, dataframe, or numpy array of X values containing features with a corresponding y array of label values

One of the most important settings you specify is primary_metric (ie the target performance metric). Azure ML supports a set of named metrics for each type of task.

get_primary_metrics('classification')

You can submit an AutoML experiment like any other SDK-based experiment:

automl_experiment = Experiment(ws, 'automl_experiment')
automl_run = automl_experiment.submit(automl_config)

You can easily identify the best run in Azure ML studio, and download or deploy the model it generated. Via SDK:

best_run, fitted_model = automl_run.get_output()
best_run_metrics = best_run.get_metrics()
for metric_name in best_run_metrics:
  metric = best_run_metrics[metric_name]
  print(metric_name, metric)

AutoML uses scikit-learn pipelines. You can view the steps in the fitted model you obtained from the best run.

for step in fitted_model.named_steps:
  print(step)

Explain ML models

Model explainers use statistical techniques to calculate feature importance. Explainers work by evaluating a test data set of feature cases and the labels the model predicts for them.

Global feature importance quantifies the relative importance of each feature in the test dataset as a whole: which feature in the dataset influences prediction?

Local feature importance measures the influence of each feature value on a specific individual prediction. For example: will Sam default?

Prediction=0: Samuel won't default on the loan repayment

Features:

  • loan amount; support for 0: 0.9; support for 1: -0.9
  • income; support for 0: 0.6
  • age; support for 0: -0.2
  • marital status; support for 0: 0.1

Because this is a classification model, each feature gets a local importance value for each possible class, indicating the amount of support for that class based on the feature value.

The most important feature for a prediction of class 1 is loan amount. There could be multiple reasons why local importance for an individual prediction varies from global importance for the overall dataset. For example, Sam might have a lower income than average, but the loan amount in this case might be unusually small.

For a multi-class classification model, a local importance value for each possible class is calculated for every feature, with the total across all classes always being 0.

For a regression model, the local importance values simply indicate the level of influence each feature has on the predicted scalar label.
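The per-class totals in Sam's example can be checked with a few lines of arithmetic. The class 0 values come from the example above; the class 1 values for income, age and marital status are an assumption here, derived from the rule that each feature's importance sums to 0 across classes.

```python
import math

# Local importance values in support of class 0, from the example above.
support_class_0 = {
    'loan amount': 0.9,
    'income': 0.6,
    'age': -0.2,
    'marital status': 0.1,
}
# Assumption: with two classes, support for class 1 mirrors class 0,
# so that each feature's values sum to 0 across classes.
support_class_1 = {name: -value for name, value in support_class_0.items()}

total_0 = sum(support_class_0.values())
total_1 = sum(support_class_1.values())
print(round(total_0, 2), round(total_1, 2))
```

The overall support is positive for class 0 and negative for class 1, which agrees with the prediction that Sam won't default.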

Using explainers

You can use Azure ML SDK to create explainers for models even if they were not trained using an Azure ML experiment.

You install the azureml-interpret package. Types of explainer include:

  • MimicExplainer creates a global surrogate model that approximates your trained model and can be used to generate explanations. This explainable model must have the same kind of architecture as your trained model (eg linear or tree-based)
  • TabularExplainer acts as a wrapper around various SHAP explainer algorithms, automatically choosing the one that is most appropriate for your model architecture
  • PFIExplainer (Permutation Feature Importance) analyzes feature importance by shuffling feature values and measuring the impact on prediction performance

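Example for a hypothetical model named loan_model:

from interpret.ext.blackbox import MimicExplainer, TabularExplainer, PFIExplainer
from interpret.ext.glassbox import DecisionTreeExplainableModel

mim_explainer = MimicExplainer(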
  model=loan_model,
  initialization_examples=X_test,
  explainable_model=DecisionTreeExplainableModel,
  features=['loan_amount', 'income', 'age', 'marital_status'],
  classes=['reject', 'approve']
  )

tab_explainer = TabularExplainer(
  model=loan_model,
  initialization_examples=X_test,
  features=['loan_amount', 'income', 'age', 'marital_status'],
  classes=['reject', 'approve']
  )

pfi_explainer = PFIExplainer(
  model=loan_model,
  features=['loan_amount', 'income', 'age', 'marital_status'],
  classes=['reject', 'approve']
  )

To retrieve global feature importance, call the explain_global() method of your explainer, and then use the get_feature_importance_dict() method to get a dictionary of the feature importance values.

global_mim_explanation = mim_explainer.explain_global(X_train)
global_mim_feature_importance = global_mim_explanation.get_feature_importance_dict()

# same call as for MimicExplainer
global_tab_explanation = tab_explainer.explain_global(X_train)
global_tab_feature_importance = global_tab_explanation.get_feature_importance_dict()

# PFIExplainer also requires the actual labels
global_pfi_explanation = pfi_explainer.explain_global(X_train, y_train)
global_pfi_feature_importance = global_pfi_explanation.get_feature_importance_dict()

To retrieve local feature importance from a MimicExplainer or a TabularExplainer, you must call the explain_local() method, specifying the subset of cases you want to explain. Then you use get_ranked_local_names() and get_ranked_local_values() to retrieve the ranked feature names and their importance values.

# same for tab_explainer too
local_mim_explanation = mim_explainer.explain_local(X_test[0:5])
local_mim_features = local_mim_explanation.get_ranked_local_names()
local_mim_importance = local_mim_explanation.get_ranked_local_values()

PFIExplainer does not support local feature importance explanations.

Creating explanations

You can create an explainer and upload the explanation it generates to the run for later analysis.

To create an explanation for the experiment script, you'll need to ensure that the azureml-interpret and azureml-contrib-interpret packages are installed in the run environment. Then you can use these to create an explanation from your trained model and upload it to the run outputs.

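from azureml.core.run import Run
from azureml.contrib.interpret.explanation.explanation_client import ExplanationClient
from interpret.ext.blackbox import TabularExplainer

run = Run.get_context()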

# code to train model goes here

# get explanation
explainer = TabularExplainer(model, X_train, features=features, classes=labels)
explanation = explainer.explain_global(X_test)

# get an explanation client and upload the explanation
explain_client = ExplanationClient.from_run(run)
explain_client.upload_model_explanation(explanation, comment='Tabular Explanation')

run.complete()

You can view the explanation you created for your model in the Explanations tab for the run in Azure ML Studio.

Visualizing explanations

Model explanations in Azure ML Studio include multiple visualizations that you can use to explore feature importance. Visualizations:

  • global feature importance
  • summary importance: shows the distribution of individual importance values for each feature across the test dataset
  • local feature importance by selecting an individual data point

Monitor models

You can use Application Insights to capture and review telemetry from models published with Azure ML. You must have an Application Insights resource associated with your Azure ML workspace.

When you create an Azure ML workspace, you can select an Application Insights resource. If you do not select an existing resource, a new one is created in the same resource group as your workspace.

When deploying a new real-time service, you can enable Application Insights in the deployment configuration for the service.

dep_config = AciWebservice.deploy_configuration(
  cpu_cores=1,
  memory_gb=1,
  enable_app_insights=True
  )

If you want to enable Application Insights for a service that is already deployed, you can modify the deployment configuration for AKS-based services in the Azure portal.

Capture and view telemetry

Application Insights automatically captures any information written to the standard output and error logs, and provides a query capability to view data in these logs.

You can write any value to the standard output in the scoring script by using a print:

def run(raw_data):
  data = json.loads(raw_data)['data']
  predictions = model.predict(data)
  print('Data: ' + str(data) + ' - Predictions: ' + str(predictions))
  return predictions.tolist()

Azure ML creates a custom dimension in the data model for the output you write.

You can use the Log Analytics query interface for Application Insights in the Azure portal. It supports a SQL-like query syntax.
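Before deploying, it can help to check locally what line the scoring function writes to standard output. The sketch below assumes a hypothetical `StubModel` in place of a trained model and returns a plain list instead of calling `tolist()`; it only demonstrates the print-based logging, not Application Insights itself.

```python
import contextlib
import io
import json

class StubModel:
    """Hypothetical stand-in for a trained model: predicts 1 if a row sums above 1."""
    def predict(self, data):
        return [1 if sum(row) > 1 else 0 for row in data]

model = StubModel()

def run(raw_data):
    data = json.loads(raw_data)['data']
    predictions = model.predict(data)
    # Anything printed here goes to standard output, which is what gets captured.
    print('Data: ' + str(data) + ' - Predictions: ' + str(predictions))
    return predictions

# Capture standard output locally to inspect the line that would be logged.
buffer = io.StringIO()
with contextlib.redirect_stdout(buffer):
    result = run(json.dumps({'data': [[0.2, 0.3], [0.9, 0.8]]}))
logged_line = buffer.getvalue().strip()
```

Keeping the logged line in a predictable format makes it easier to find later when querying the logs.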

Monitor data drift

Over time there may be trends that change the profile of the data, making your model less accurate. This change in data profiles between training and inferencing is known as data drift.
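As a rough illustration of what a change in data profile means, the sketch below compares a training sample with two later samples of the same feature. The score used here (shift in mean, in units of the training standard deviation) is a deliberately simple stand-in and is not the metric Azure ML calculates; the data, `drift_score`, and `THRESHOLD` are hypothetical.

```python
import random
import statistics

def drift_score(baseline, target):
    """Shift in mean between two samples, in units of the baseline's standard deviation."""
    spread = statistics.stdev(baseline)
    return abs(statistics.mean(target) - statistics.mean(baseline)) / spread

rng = random.Random(0)
# Hypothetical 'age' feature: training data versus two later batches.
train_age = [rng.gauss(40, 10) for _ in range(500)]
similar_age = [rng.gauss(40, 10) for _ in range(500)]
older_age = [rng.gauss(50, 10) for _ in range(500)]

THRESHOLD = 0.3  # hypothetical cut-off for this toy score
drifted_similar = drift_score(train_age, similar_age) > THRESHOLD
drifted_older = drift_score(train_age, older_age) > THRESHOLD
```

The batch drawn from the same distribution stays under the threshold, while the batch whose mean has moved by about one standard deviation is flagged.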

Azure ML supports data drift monitoring through the use of datasets. You can compare two registered datasets to detect data drift, or you can capture new feature data submitted to a deployed model service and compare it to the dataset with which the model was trained.

You register 2 datasets:

  • a baseline dataset: original training data
  • a target dataset that will be compared to the baseline on time intervals. This dataset requires a column for each feature you want to compare, and a timestamp column
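The timestamp column is what lets the target data be split into time intervals. A minimal sketch of that idea, using only the standard library and hypothetical rows, groups target values by ISO week and compares each week's mean with the baseline mean. Azure ML's own comparison is more involved; this only shows why the timestamp is needed.

```python
import datetime as dt
import statistics
from collections import defaultdict

# Hypothetical baseline feature values (original training data).
baseline_bmi = [22.0, 24.0, 23.0, 25.0, 21.0, 23.0]

# Hypothetical target rows: each has a timestamp and the same feature.
target_rows = [
    {"timestamp": dt.datetime(2020, 8, 3), "bmi": 23.0},
    {"timestamp": dt.datetime(2020, 8, 5), "bmi": 22.5},
    {"timestamp": dt.datetime(2020, 8, 10), "bmi": 27.0},
    {"timestamp": dt.datetime(2020, 8, 12), "bmi": 28.0},
]

# Bucket target values by ISO (year, week) using the timestamp column.
by_week = defaultdict(list)
for row in target_rows:
    iso = row["timestamp"].isocalendar()
    by_week[(iso[0], iso[1])].append(row["bmi"])

# Compare each week's mean with the baseline mean.
baseline_mean = statistics.mean(baseline_bmi)
weekly_shift = {
    week: statistics.mean(values) - baseline_mean
    for week, values in sorted(by_week.items())
}
```

Here the first week stays close to the baseline while the second week shows a clear upward shift.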

You define a dataset monitor to detect data drift and trigger alerts if the rate of drift exceeds a specified threshold. You can create dataset monitors using Azure ML Studio or by using the DataDriftDetector class.

from azureml.datadrift import DataDriftDetector

monitor = DataDriftDetector.create_from_datasets(
  workspace=ws,
  name='dataset-drift-monitor',
  baseline_dataset=train_ds,
  target_dataset=new_data_ds,
  compute_target='aml-cluster',
  frequency='Week',
  feature_list=['age', 'height', 'bmi'],
  latency=24
  )

You can backfill the monitor to immediately compare the baseline to existing data in the target dataset.

import datetime as dt

backfill = monitor.backfill(dt.datetime.now() - dt.timedelta(weeks=6), dt.datetime.now())

If you have deployed a model as a real-time web service, you can capture new inferencing data as it is submitted, and compare it to the original training data. This approach has the benefit of automatically collecting new target data as the deployed model is used.

You include the training dataset in the model registration to provide a baseline.

model = Model.register(
  workspace=ws,
  model_path='./model/model.pkl',
  model_name='mymodel',
  datasets=[(Dataset.Scenario.TRAINING, train_ds)]
  )

You enable data collection for services in which the model is used. You use the ModelDataCollector class in each service's scoring script, writing code to capture data and predictions and write them to the data collector (which will store them in Azure blob storage).

import json
import joblib
from azureml.core.model import Model
from azureml.monitoring import ModelDataCollector

def init():
  global model, data_collect, predict_collect
  model_name = 'mymodel'
  model = joblib.load(Model.get_model_path(model_name))

  # enable collection of data and predictions
  data_collect = ModelDataCollector(
    model_name,
    designation='inputs',
    feature_names=['age', 'height', 'bmi']
    )
  predict_collect = ModelDataCollector(
    model_name,
    designation='predictions',
    feature_names=['prediction']
    )

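def run(raw_data):
  # parse the JSON payload first, then read its 'data' field
  data = json.loads(raw_data)['data']
  predictions = model.predict(data)

  # log the inputs and the predictions
  data_collect.collect(data)
  predict_collect.collect(predictions)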

  return predictions.tolist()

With the data collection code in place in the scoring script, you can enable data collection in the deployment configuration.

dep_config = AksWebservice.deploy_configuration(collect_model_data=True)

You can configure data drift monitoring for the deployed model by using the DataDriftDetector class.

model = ws.models['mymodel']
datadrift = DataDriftDetector.create_from_model(
  ws,
  model.name,
  model.version,
  services=['my-svc'],
  frequency='Week'
  )

Scheduling alerts

You can specify a threshold for the rate of data drift and an operator email for notifications.

Monitoring works by running a comparison at a scheduled frequency (day, week, or month), and calculating data drift metrics for the features. For dataset monitors, you can specify a latency indicating the number of hours to allow for new data to be collected and added to the target dataset. For deployed-model data drift monitors, you can specify a schedule_start time value to indicate when the data drift run should start (if omitted, the run will start at the current time).

Data drift is measured using a calculated magnitude of change in the statistical distributions of feature values over time. You can configure a threshold for data drift magnitude.

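from azureml.datadrift import AlertConfiguration, DataDriftDetector

# AlertConfiguration takes a list of email addresses
alert_email = AlertConfiguration(['data_scientist@contoso.com'])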
monitor = DataDriftDetector.create_from_datasets(
  ws,
  'dataset-drift-detector',
  baseline_data_set,
  target_data_set,
  compute_target=cpu_cluster,
  frequency='Week',
  latency=2,
  drift_threshold=0.3,
  alert_config=alert_email
  )