How to pass a URI between Vertex AI Pipeline components


I am trying to understand how to pass the URI of an object created in a custom pipeline component to a prebuilt component from google_cloud_pipeline_components.

Let's consider this example from the Google tutorial on using Vertex AI Pipelines (https://cloud.google.com/vertex-ai/docs/pipelines/build-pipeline):

import kfp
from google.cloud import aiplatform
from google_cloud_pipeline_components.v1.dataset import ImageDatasetCreateOp
from google_cloud_pipeline_components.v1.automl.training_job import AutoMLImageTrainingJobRunOp
from google_cloud_pipeline_components.v1.endpoint import EndpointCreateOp, ModelDeployOp

project_id = PROJECT_ID
pipeline_root_path = PIPELINE_ROOT

# Define the workflow of the pipeline.
@kfp.dsl.pipeline(
    name="automl-image-training-v2",
    pipeline_root=pipeline_root_path)
def pipeline(project_id: str):

    ds_op = ImageDatasetCreateOp(
        project=project_id,
        display_name="flowers",
        gcs_source="gs://cloud-samples-data/vision/automl_classification/flowers/all_data_v2.csv",
        import_schema_uri=aiplatform.schema.dataset.ioformat.image.single_label_classification,
    )

Now, let's say I want to add a custom component before ImageDatasetCreateOp that builds a CSV listing the contents of a GCS bucket, and then feed that CSV to ImageDatasetCreateOp via its gcs_source parameter:

# My new component for creating and saving a csv file
from kfp.dsl import component, OutputPath

# pandas and google-cloud-storage are not in the default base image,
# so they need to be installed into the component's container
@component(packages_to_install=['pandas', 'google-cloud-storage'])
def save_gcs_file_paths_to_csv(bucket_name: str, output_file: OutputPath('csv')):
    import pandas as pd
    from google.cloud import storage

    # Create a storage client
    storage_client = storage.Client()
    # Access the bucket
    bucket = storage_client.get_bucket(bucket_name)
    # List all files in the bucket
    files = [blob.name for blob in bucket.list_blobs()]
    # Create a DataFrame with one file path per row
    df = pd.DataFrame(files, columns=['File Path'])
    # Save the DataFrame to the component's output path
    df.to_csv(output_file, index=False)

# Pipeline part
project_id = PROJECT_ID
pipeline_root_path = PIPELINE_ROOT

# Define the workflow of the pipeline.
@kfp.dsl.pipeline(
    name="automl-image-training-v2",
    pipeline_root=pipeline_root_path)
def pipeline(project_id: str):

    # Creating the csv (note: storage.Client().get_bucket expects the
    # bare bucket name, not a gs:// URI)
    csv_loader = save_gcs_file_paths_to_csv(bucket_name='my-bucket')
    # Create a dataset
    ds_op = ImageDatasetCreateOp(
        project=project_id,
        display_name="flowers",
        gcs_source=csv_loader.outputs['output_file'],
        import_schema_uri=aiplatform.schema.dataset.ioformat.image.single_label_classification,
    )

Now, in ImageDatasetCreateOp I can't use gcs_source=csv_loader.outputs['output_file'], because gcs_source expects a string (a gs:// URI) while csv_loader.outputs['output_file'] is a pipeline artifact. If I understand correctly, I can only access the artifact's path inside a custom component, but ImageDatasetCreateOp is a prebuilt component from google_cloud_pipeline_components, so I can't change how it reads its inputs.
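
To illustrate what I mean by "only inside a component": if I wrote my own downstream component, I could receive the artifact and read its local path (this consume_csv component is purely illustrative, not part of my pipeline):

from kfp.dsl import component, InputPath

@component
def consume_csv(input_file: InputPath('csv')):
    # At runtime, input_file is the local path where the CSV artifact
    # has been made available inside this component's container
    print(input_file)

But I can't do anything like this with ImageDatasetCreateOp, since it wants a plain gs:// string for gcs_source.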

My question is: how should I structure my components so that I can pass the URI of a created object to downstream prebuilt components?
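
For completeness, the only workaround I've come up with is to sidestep the artifact system: upload the CSV to GCS inside the component itself and return its gs:// URI as a plain string output. This is just a sketch of the idea (the function name and the file_paths.csv blob name are my own), and I'm not sure it's the intended pattern:

from kfp.dsl import component

# pandas and google-cloud-storage must be installed in the component image
@component(packages_to_install=['pandas', 'google-cloud-storage'])
def save_gcs_file_paths_to_csv_uri(bucket_name: str) -> str:
    import pandas as pd
    from google.cloud import storage

    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)
    files = [blob.name for blob in bucket.list_blobs()]
    df = pd.DataFrame(files, columns=['File Path'])

    # Upload the CSV myself and return its URI as a plain string,
    # which a prebuilt component can consume directly
    blob = bucket.blob('file_paths.csv')
    blob.upload_from_string(df.to_csv(index=False), content_type='text/csv')
    return f'gs://{bucket_name}/file_paths.csv'

Then in the pipeline I could write csv_op = save_gcs_file_paths_to_csv_uri(bucket_name='my-bucket') and pass gcs_source=csv_op.output, since the output is now a str rather than an artifact. But this loses the artifact tracking that OutputPath provides, so I'd like to know the proper way.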
