_InactiveRpcError when implementing dsl.ParallelFor, dsl.If and dsl.Else in a Vertex AI Kubeflow pipeline

I am trying to build a Vertex AI pipeline that imports a config, converts it to a list of dictionaries, and iterates over that list in a dsl.ParallelFor loop inside a dsl.If condition. The pipeline compiles, but it throws an _InactiveRpcError.

I have a list of dictionaries, each serialized as a JSON string:

['{"file_name": "gs://xxxxx/raw/sales/ *.csv", "table_name": "raw_sales", "query": "sales_query", "sep": ",", "flag": "true", "layer": "raw"}', '{"file_name": "gs://xxxxx/data/raw/clean_sku_master/sku_master.csv", "table_name": "raw_sku_master", "query": "sku_master_query", "sep": ",", "flag": "true", "layer": "raw"}', '{"file_name": "gs://xxxxx/raw/sales/ ABRIL.csv", "table_name": "udm_sales", "query": "sales_query", "sep": ",", "flag": "true", "layer": "udm"}', '{"file_name": "gs://xxxxx/data/raw/clean_sku_master/sku_master.csv", "table_name": "udm_sku_master", "query": "sku_master_query", "sep": ",", "flag": "true", "layer": "udm"}']
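For reference, here is a minimal sketch of how these strings relate to the original dictionaries. It assumes result_list is a plain Python list of dicts with the keys shown above (only two entries are reproduced). Each loop item is a JSON string, and the value items.layer is expected to resolve to is the "layer" key of the parsed string:

```python
import json

# Hypothetical stand-in for result_list: two of the config entries shown above.
result_list = [
    {"file_name": "gs://xxxxx/data/raw/clean_sku_master/sku_master.csv",
     "table_name": "raw_sku_master", "query": "sku_master_query",
     "sep": ",", "flag": "true", "layer": "raw"},
    {"file_name": "gs://xxxxx/data/raw/clean_sku_master/sku_master.csv",
     "table_name": "udm_sku_master", "query": "sku_master_query",
     "sep": ",", "flag": "true", "layer": "udm"},
]

# Same conversion as in the pipeline: one JSON string per dictionary.
data_with_json = [json.dumps(d) for d in result_list]

# Each loop item is a string; its "layer" field only exists after parsing.
layers = [json.loads(item)["layer"] for item in data_with_json]
print(layers)  # ['raw', 'udm']
```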

I pass this list into a dsl.ParallelFor loop nested under a dsl.If condition:

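import json

from kfp import dsl
from kfp.dsl import component, pipeline

# PIPELINE_ROOT, result_list and create_big_query_dataset are defined
# elsewhere in my code (not shown here).

@component(base_image='python:3.10.13')
def zero_return() -> str:
    # As written, this always returns 'no_items_present', so at runtime
    # the dsl.Else() branch is the one that would be taken.
    return 'no_items_present'

@dsl.component(base_image='python:3.9')
def print_comp(text: str):
    print(text)

@pipeline(name='ingestion-trial_1', pipeline_root=PIPELINE_ROOT + "ingestion-trial_1")
def raw_data_load_pipeline(project_id: str, location: str):
    # Serialize each config dict to a JSON string for the loop.
    data_with_json = [json.dumps(d) for d in result_list]

    zero_return_task = zero_return()
    with dsl.If(zero_return_task.output != 'no_items_present'):
        # Process the config entries one at a time.
        with dsl.ParallelFor(data_with_json, parallelism=1) as items:
            # items.layer is the loop-item field that triggers the error.
            # project_id and location are hardcoded here, not taken from
            # the pipeline parameters.
            create_bigquery_dataset = create_big_query_dataset(
                dataset_id=items.layer,
                project_id="softy-dev",
                location='us-central1')

    with dsl.Else():
        print_comp(text='Draw!')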

The problem is the items.layer parameter. If I pass a static value instead, the pipeline runs. If I remove the dsl.If condition and keep only the ParallelFor loop, it also runs. But when I access the loop item's field inside the dsl.If block, it throws this error:

_InactiveRpcError: _InactiveRpcError of RPC that terminated with:
    status = StatusCode.INVALID_ARGUMENT
    details = "Component input parameter 'loop-item-param-2' is not defined."
    debug_error_string = "UNKNOWN:Error received from peer ipv4:64.233.182.95:443 {created_time:"2024-02-27T13:36:38.8521191+00:00", grpc_status:3, grpc_message:"Component input parameter loop-item-param-2 is not defined."}"

How can I access the loop item's parameter inside a dsl.If condition?
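One workaround that may be worth trying (an untested assumption, not a confirmed fix) is to avoid accessing a field of the loop variable altogether. Instead, pass the whole loop item to a component as a string and parse it inside that component. The sketch below is the plain-Python body such a component could have. The name get_layer is hypothetical; in the pipeline it would be decorated with @dsl.component and called with items instead of items.layer.

```python
def get_layer(item: str) -> str:
    """Return the "layer" field of one JSON-encoded config entry.

    Hypothetical helper: in the pipeline this would be wrapped with
    @dsl.component and called as get_layer(item=items) inside the
    ParallelFor, so the component receives the whole loop item.
    """
    # Lightweight KFP components only ship the function source,
    # so the import has to live inside the function body.
    import json

    return json.loads(item)["layer"]


# Usage with one of the strings from the list above.
entry = ('{"file_name": "gs://xxxxx/data/raw/clean_sku_master/sku_master.csv", '
         '"table_name": "udm_sku_master", "query": "sku_master_query", '
         '"sep": ",", "flag": "true", "layer": "udm"}')
print(get_layer(entry))  # udm
```

Under this assumption, dataset_id for create_big_query_dataset would then come from the get_layer task's .output rather than from items.layer.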
