How to schedule repeated runs of a custom training job in Vertex AI


I have packaged my training code as a Python package and am able to run it as a custom training job on Vertex AI. Now I want to schedule this job to run, say, every 2 weeks and re-train the model. The Scheduling settings in the CustomJobSpec allow only 2 fields, "timeout" and "restartJobOnWorkerRestart", so this is not possible using the scheduling settings in the CustomJobSpec. One way to achieve this that I could think of was to create a Vertex AI pipeline with a single step using the "CustomPythonPackageTrainingJobRunOp" Google Cloud Pipeline Component and then scheduling the pipeline to run as I see fit. Are there better alternatives to achieve this?
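For reference, a minimal sketch of that single-step pipeline idea, assuming the google_cloud_pipeline_components package is installed; the parameter names here mirror the aiplatform.CustomPythonPackageTrainingJob class and the project, bucket, module, and image values are placeholders I made up, so check the component reference for your version:

from kfp.v2 import dsl
from google_cloud_pipeline_components import aiplatform as gcc_aip

PROJECT_ID = "your-project"   # placeholder
REGION = "us-central1"        # placeholder

@dsl.pipeline(name="scheduled-custom-training",
              pipeline_root="gs://your-bucket/pipeline_root")  # placeholder
def pipeline():
    # Single step: run the packaged training code as a Vertex AI custom training job.
    gcc_aip.CustomPythonPackageTrainingJobRunOp(
        project=PROJECT_ID,
        location=REGION,
        display_name="scheduled-python-package-training",
        python_package_gcs_uri="gs://your-bucket/trainer-0.1.tar.gz",  # placeholder
        python_module_name="trainer.task",                             # placeholder
        container_uri="us-docker.pkg.dev/vertex-ai/training/scikit-learn-cpu.0-23:latest",  # placeholder
    )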

Edit:

I was able to schedule the custom training job using Cloud Scheduler, but I also found the create_schedule_from_job_spec method of the AIPlatformClient very easy to use for scheduling a Vertex AI pipeline. The steps I took to schedule the custom job using Cloud Scheduler in GCP (following the Google docs) are as follows:

  1. Set target type to HTTP
  2. For the URL that specifies the custom job, I followed this link to construct the URL
  3. For the authentication, under Auth header, I selected "Add OAuth token"

You also need a "Cloud Scheduler service account" with the "Cloud Scheduler Service Agent" role granted to it in your project. Although the docs say this should have been set up automatically if you enabled the Cloud Scheduler API after March 19, 2019, this was not the case for me and I had to add the service account with the role manually.
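Besides the URL and the OAuth token, the Scheduler job also needs a request body containing the CustomJob spec to POST to the customJobs endpoint. Here is a minimal sketch of such a body for training code packaged as a Python package; the bucket, package, module, and executor image names are placeholders I made up:

import json

# Request body for the customJobs.create call (REST field names are camelCase).
custom_job_request = {
    "displayName": "scheduled-python-package-training",
    "jobSpec": {
        "workerPoolSpecs": [
            {
                "machineSpec": {"machineType": "n1-standard-4"},
                "replicaCount": 1,
                # Training code packaged as a Python package uses pythonPackageSpec
                # instead of containerSpec.
                "pythonPackageSpec": {
                    "executorImageUri": "us-docker.pkg.dev/vertex-ai/training/scikit-learn-cpu.0-23:latest",  # placeholder
                    "packageUris": ["gs://your-bucket/trainer-0.1.tar.gz"],  # placeholder
                    "pythonModule": "trainer.task",  # placeholder
                    "args": [],
                },
            }
        ]
    },
}

# Save it so it can be pasted into the Scheduler job's body field (or attached
# programmatically, as sketched in the answer below).
with open("custom_job_request.json", "w") as f:
    json.dump(custom_job_request, f, indent=2)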

1 Answer


As per your requirement, here are the possible ways to schedule the training job:

1. Cloud Composer

Cloud Composer is a managed Apache Airflow service that helps you create, schedule, monitor, and manage workflows.

You can follow the steps below to schedule your job every two weeks using Composer:

  • Create a Composer environment.
  • Write a DAG file and add your custom training Python code to it.
  • Since the custom training job is Python code, the PythonOperator can be used to run it as a task.
  • In the DAG file, provide the start time (i.e. when scheduling should begin) and define the schedule interval as two weeks, as shown:
with models.DAG(
        'composer_sample_bq_notify',
        schedule_interval=datetime.timedelta(weeks=2),
        default_args=default_dag_args) as dag:

Alternatively, you can also use the unix-cron string format (* * * * *) to do the scheduling.

I.e. in your case, to run on the 1st and 15th of every month (roughly every two weeks), the cron expression would be: 0 0 1,15 * *

You can pass the parameters required by the custom job to the PythonOperator using the op_args and op_kwargs arguments.

After the DAG file is written, you need to upload it to the dags/ folder in the Composer environment's bucket (a sketch of doing this programmatically follows the DAG below).

You can check the status of the scheduled DAG in the Airflow UI.

The scheduled DAG file would look like this:

sample_dag.py :

from __future__ import print_function

import datetime

from google.cloud import aiplatform

from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)


default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    'start_date': YESTERDAY,
}

def create_custom_job_sample(
    project: str,
    display_name: str,
    container_image_uri: str,
    location: str,
    api_endpoint: str,
):
    # The AI Platform services require regional API endpoints.
    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    # This client only needs to be created once, and can be reused for multiple requests.
    client = aiplatform.gapic.JobServiceClient(client_options=client_options)
    custom_job = {
        "display_name": display_name,
        "job_spec": {
            "worker_pool_specs": [
                {
                    "machine_spec": {
                        "machine_type": "n1-standard-4",
                        "accelerator_type": aiplatform.gapic.AcceleratorType.NVIDIA_TESLA_K80,
                        "accelerator_count": 1,
                    },
                    "replica_count": 1,
                    "container_spec": {
                        "image_uri": container_image_uri,
                        "command": [],
                        "args": [],
                    },
                }
            ]
        },
    }
    parent = f"projects/{project}/locations/{location}"
    response = client.create_custom_job(parent=parent, custom_job=custom_job)
    print("response:", response)


with models.DAG(
        'composer_sample_simple_greeting',
        schedule_interval=datetime.timedelta(weeks=2),
        default_args=default_dag_args) as dag:

    # Submit the Vertex AI custom job each time the DAG runs.
    hello_python = python_operator.PythonOperator(
        task_id='hello',
        python_callable=create_custom_job_sample,
        op_kwargs={
            "project": "your_project",
            "display_name": "name",
            "container_image_uri": "uri path",
            "location": "us-central1",
            "api_endpoint": "us-central1-aiplatform.googleapis.com",
        })

    # The goodbye_bash task simply logs that the job has been submitted.
    goodbye_bash = bash_operator.BashOperator(
        task_id='bye',
        bash_command='echo "job scheduled"')

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash
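If you prefer to upload the DAG file programmatically rather than through the Cloud Console or gsutil, here is a minimal sketch using the google-cloud-storage client; the bucket name is a placeholder:

from google.cloud import storage

# Upload the DAG file to the dags/ folder of the Composer environment's bucket.
storage_client = storage.Client()
bucket = storage_client.bucket("your-composer-environment-bucket")  # placeholder
bucket.blob("dags/sample_dag.py").upload_from_filename("sample_dag.py")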

2. Cloud Scheduler: To schedule a job using Cloud Scheduler, you will need the following configuration:

  • Target : HTTP
  • URL : The customJobs create endpoint (for example : https://us-central1-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/us-central1/customJobs)
  • Auth header : OAuth token for Google APIs hosted on *.googleapis.com
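A minimal sketch of creating this Scheduler job programmatically with the google-cloud-scheduler client library; the project, region, service account, and request-body file are placeholders (the body file is the CustomJob spec sketched in the question's edit above):

from google.cloud import scheduler_v1

PROJECT_ID = "your-project"  # placeholder
REGION = "us-central1"       # placeholder
SERVICE_ACCOUNT = "scheduler-sa@your-project.iam.gserviceaccount.com"  # placeholder

client = scheduler_v1.CloudSchedulerClient()
parent = f"projects/{PROJECT_ID}/locations/{REGION}"

# JSON body of the customJobs.create request (see the sketch in the question's edit).
with open("custom_job_request.json", "rb") as f:
    custom_job_body = f.read()

job = {
    "name": f"{parent}/jobs/retrain-custom-job",
    "schedule": "0 0 1,15 * *",  # 1st and 15th of every month
    "time_zone": "UTC",
    "http_target": {
        "http_method": scheduler_v1.HttpMethod.POST,
        "uri": (
            f"https://{REGION}-aiplatform.googleapis.com/v1/"
            f"projects/{PROJECT_ID}/locations/{REGION}/customJobs"
        ),
        "headers": {"Content-Type": "application/json"},
        "body": custom_job_body,
        "oauth_token": {"service_account_email": SERVICE_ACCOUNT},
    },
}

response = client.create_job(parent=parent, job=job)
print("Created scheduler job:", response.name)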

3. Scheduling a recurring pipeline run using the Kubeflow Pipelines SDK:

You can schedule a recurring pipeline run using Python and the Kubeflow Pipelines SDK.

from kfp.v2.google.client import AIPlatformClient

api_client = AIPlatformClient(project_id=PROJECT_ID,
                              region=REGION)

api_client.create_schedule_from_job_spec(
    job_spec_path=COMPILED_PIPELINE_PATH,
    schedule="0 0 1,15 * *",  # 1st and 15th of every month
    time_zone=TIME_ZONE,
    parameter_values=PIPELINE_PARAMETERS
)
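
For completeness, a sketch of producing COMPILED_PIPELINE_PATH with the KFP v2 compiler, assuming a pipeline function named pipeline (for example, the single-step pipeline sketched in the question); the output file name is a placeholder:

from kfp.v2 import compiler

COMPILED_PIPELINE_PATH = "training_pipeline.json"  # placeholder

# Compile the pipeline function into a job spec that can be scheduled.
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path=COMPILED_PIPELINE_PATH,
)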