How to set dataset scheduling based on trigger time in Azure ML?


I'm using Azure Machine Learning (Azure ML) to manage my machine learning workflows, and I want to set up dataset scheduling based on trigger time. The dataset I'm working with has a different format than the trigger time. For example, my dataset has the format "path_on_datastore/2023/01/01/some_data.tsv", while the trigger time format is different.

I have discovered that schedules support the expression "${{creation_context.trigger_time}}" as a PipelineParameter (see https://learn.microsoft.com/en-us/azure/machine-learning/how-to-schedule-pipeline-job?view=azureml-api-2&tabs=cliv2#expressions-supported-in-schedule), but the format it provides doesn't match the format of my dataset path. I tried to use components to do the conversion, but components only support outputting a dataset. Is there a way to customize the format, or to adapt the trigger time format to match my dataset format?

There is 1 best solution below

You can use the PythonScriptStep class in Azure Machine Learning (SDK v1) to run a Python script that builds the formatted dataset path from the trigger time. For example, the Python script file (script.py):

import datetime

# The script runs when the schedule fires, so the current time on the
# compute target is used here as a stand-in for the trigger time
current_time = datetime.datetime.now()

# Format the current time to match the dataset path format;
# %m and %d are zero-padded, giving e.g. path_on_datastore/2023/01/01/some_data.tsv
dataset_path = current_time.strftime("path_on_datastore/%Y/%m/%d/some_data.tsv")

# Use the dataset path in your further processing or operations
print(dataset_path)
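
If the trigger time is handed to the script as a string instead of being read from the clock, the same path can be built by parsing that string. The following is only a sketch: it assumes the value arrives as an ISO 8601 timestamp such as "2023-01-01T08:00:00Z" (the exact format Azure ML emits is not confirmed here), and the function name build_dataset_path is hypothetical. Timestamps with unusual fractional-second precision may need extra handling on older Python versions.

```python
from datetime import datetime


def build_dataset_path(trigger_time: str) -> str:
    """Turn an ISO 8601 timestamp into a zero-padded, date-partitioned path.

    Assumption: trigger_time looks like "2023-01-01T08:00:00Z".
    """
    # datetime.fromisoformat rejects a trailing "Z" before Python 3.11,
    # so replace it with an explicit UTC offset first.
    parsed = datetime.fromisoformat(trigger_time.replace("Z", "+00:00"))
    # %m and %d are zero-padded, matching the 2023/01/01 layout of the dataset.
    return parsed.strftime("path_on_datastore/%Y/%m/%d/some_data.tsv")


print(build_dataset_path("2023-01-01T08:00:00Z"))
# prints path_on_datastore/2023/01/01/some_data.tsv
```

Keeping the conversion in a small pure function makes it easy to check locally before wiring it into a pipeline step.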

With the script you can create a pipeline:

from azureml.core import Workspace, Experiment
from azureml.pipeline.core import Pipeline, ScheduleRecurrence
from azureml.pipeline.steps import PythonScriptStep

# Load the workspace from the local config.json
workspace = Workspace.from_config()

# Step that runs script.py on the named compute target
script_step = PythonScriptStep(
    name="Get Dataset Path",
    script_name="script.py",
    compute_target="targetCompute",
    inputs=[],
    outputs=[],
    source_directory="./",
    allow_reuse=False,  # rerun on every trigger so the date is recomputed
)

Then you can schedule the pipeline:

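from azureml.pipeline.core.schedule import Schedule

# Daily execution at 8:00 AM, starting on 2023-06-01
daily_schedule = ScheduleRecurrence(frequency="Day", interval=1, start_time="2023-06-01T08:00:00", hours=[8], minutes=[0])

# A schedule is attached to a published pipeline, so publish the pipeline first
# (the pipeline and schedule names below are placeholders)
pipeline = Pipeline(workspace=workspace, steps=[script_step])
published_pipeline = pipeline.publish(name="dataset_scheduling_pipeline", description="Builds the dataset path")

# Create the schedule; each trigger submits a run to the named experiment
pipeline_schedule = Schedule.create(
    workspace,
    name="daily_dataset_schedule",
    description="Daily pipeline schedule",
    pipeline_id=published_pipeline.id,
    experiment_name="dataset_scheduling_experiment",
    recurrence=daily_schedule,
)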

To disable or update the schedule:

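from azureml.pipeline.core.schedule import Schedule

# Schedule.get looks a schedule up by its ID, not by its name
schedule_id = "your_schedule_id"
schedule = Schedule.get(workspace, schedule_id)

# Disable the schedule
schedule.disable()

# Update the schedule, for example with a new description
schedule.update(description="Updated daily pipeline schedule")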

The example above shows how to use the PythonScriptStep class, with the current time from datetime standing in for the trigger time. For more information, refer to the Azure ML documentation on scheduling pipelines. Note: adjust the Python script and the datastore paths as necessary.