I am trying to build a Vertex AI pipeline that imports a config, converts it to a list of dictionaries, and iterates over that list in a ParallelFor loop placed under a condition. The pipeline compiles but throws an _InactiveRpcError.
I have a list of dictionaries, each serialized to a JSON string:
['{"file_name": "gs://xxxxx/raw/sales/ *.csv", "table_name": "raw_sales", "query": "sales_query", "sep": ",", "flag": "true", "layer": "raw"}', '{"file_name": "gs://xxxxx/data/raw/clean_sku_master/sku_master.csv", "table_name": "raw_sku_master", "query": "sku_master_query", "sep": ",", "flag": "true", "layer": "raw"}', '{"file_name": "gs://xxxxx/raw/sales/ ABRIL.csv", "table_name": "udm_sales", "query": "sales_query", "sep": ",", "flag": "true", "layer": "udm"}', '{"file_name": "gs://xxxxx/data/raw/clean_sku_master/sku_master.csv", "table_name": "udm_sku_master", "query": "sku_master_query", "sep": ",", "flag": "true", "layer": "udm"}']
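For clarity, the list above is produced by serializing each config dictionary with json.dumps, so every element is a string rather than a dict. A minimal sketch of that step, assuming a shortened, hypothetical result_list with only two keys per row (the real config has more):

```python
import json

# Hypothetical, shortened config rows; the real rows also carry
# file_name, query, sep and flag.
result_list = [
    {"table_name": "raw_sales", "layer": "raw"},
    {"table_name": "udm_sales", "layer": "udm"},
]

# Same comprehension as in the pipeline: each dict becomes a JSON string.
data_with_json = [json.dumps(d) for d in result_list]

# Reading a field back requires parsing the string first.
layers = [json.loads(s)["layer"] for s in data_with_json]
```

Here data_with_json holds str values, and layers is recovered only after json.loads is applied to each one.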
I pass this list into a ParallelFor loop nested under an If condition.
import json

from kfp import dsl
from kfp.dsl import component, pipeline

# result_list, PIPELINE_ROOT and create_big_query_dataset are defined elsewhere.

@component(base_image='python:3.10.13')
def zero_return() -> str:
    a = str('no_items_present')
    return a

@dsl.component(base_image='python:3.9')
def print_comp(text: str):
    print(text)

@pipeline(name='ingestion-trial_1', pipeline_root=PIPELINE_ROOT + "ingestion-trial_1")
def raw_data_load_pipeline(project_id: str, location: str):
    # Serialize each config dict to a JSON string for the loop
    data_with_json = [json.dumps(d) for d in result_list]
    zero_return_task = zero_return()
    with dsl.If(zero_return_task.output != 'no_items_present'):
        with dsl.ParallelFor(data_with_json, parallelism=1) as items:
            # items.layer is the parameter that triggers the error
            create_bigquery_dataset = create_big_query_dataset(dataset_id=items.layer,
                                                               project_id="softy-dev",
                                                               location='us-central1')
    with dsl.Else():
        print_comp(text='Draw!')
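The intended control flow can be sketched in plain Python, outside of KFP. This is only an emulation under the assumption that items.layer should resolve to the "layer" key of each JSON item; the helper name expected_dataset_ids is hypothetical and not part of any library:

```python
import json

def expected_dataset_ids(items_json, flag):
    """Plain-Python emulation of the intended pipeline logic (not KFP code)."""
    if flag != 'no_items_present':
        # ParallelFor branch: one dataset id per item, read from its "layer" key
        return [json.loads(item)["layer"] for item in items_json]
    # Else branch: no datasets are created
    return []

# Hypothetical, shortened items
sample_items = ['{"layer": "raw"}', '{"layer": "udm"}']
ids_when_present = expected_dataset_ids(sample_items, 'items_present')
ids_when_absent = expected_dataset_ids(sample_items, 'no_items_present')
```

Note that zero_return as posted always returns 'no_items_present', so in this emulation the posted pipeline would correspond to the second call and take the Else branch.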
The problem is with the items.layer parameter. When I pass a static value instead, the pipeline executes, and it also executes when I remove the If condition and keep only the ParallelFor loop. But when I access the parameter inside the If condition, it throws this error:
_InactiveRpcError: _InactiveRpcError of RPC that terminated with: status = StatusCode.INVALID_ARGUMENT details = "Component input parameter 'loop-item-param-2' is not defined." debug_error_string = "UNKNOWN:Error received from peer ipv4:64.233.182.95:443 {created_time:"2024-02-27T13:36:38.8521191+00:00", grpc_status:3, grpc_message:"Component input parameter loop-item-param-2 is not defined."}"
How can I access the loop item's parameter inside the If condition?