Airflow: using @task and @task_group along with operators other than a Python callable

I am currently working on creating Airflow DAGs that perform a set of tasks on every file in a given directory. Below is one version of the DAG; it involves multiple steps.

import os
from datetime import datetime

from airflow import DAG
from airflow.decorators import task, task_group
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator

# SFTP_INBOUND, TEST, and SOURCE are path constants defined elsewhere in my
# project, and LocalFileOperator is one of our custom operators.

def write_log_file(source_file: str, record_content: str) -> str:
    with open(source_file, "a") as log_file:
        log_file.write(f"{record_content}\n")
    return f"logged {record_content} in file {os.path.basename(source_file)} successfully"

# DAG definition
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
}

with DAG(
    'my_inbound_files_dag',
    default_args=default_args,
    description='TEST DAG for processing inbound files',
    schedule_interval='@daily',  
    catchup=False,
) as dag:
    
    # purge old files from the inbound directory (custom operator)
    purge_processing = LocalFileOperator(
        task_id="purge_processing",
        operation="purge",
        source=SFTP_INBOUND,
        days_threshold=550,
    )

    @task_group(group_id="process_inbound_files_group")
    def processing_inbound_files(inbound_file_path, **kwargs):
        @task
        def write_log_file(source_file: str):
            print(source_file)
            # wrap write_log_file in a PythonOperator and run it by hand
            write_log_file_op = PythonOperator(
                task_id="write_log_file",
                python_callable=write_log_file,
                op_kwargs={
                    "source_file": TEST,
                    "record_content": f"{source_file}\n",
                },
            )
            write_log_file_op.execute(context=kwargs)
        
        @task
        def write_log_file_next(source_file: str):
            print(source_file)
            # same pattern for the second log entry
            write_log_file_next = PythonOperator(
                task_id="write_log_file_next",
                python_callable=write_log_file,
                op_kwargs={
                    "source_file": TEST,
                    "record_content": f"{source_file}2\n",
                },
            )
            write_log_file_next.execute(context=kwargs)
        
        write_log_file(inbound_file_path) >> write_log_file_next(inbound_file_path)

    # map the task group over every file currently in SOURCE
    # (note: os.listdir runs at DAG parse time)
    process_inbound_files_group_op = processing_inbound_files.expand(
        inbound_file_path=[os.path.join(SOURCE, file_name) for file_name in os.listdir(SOURCE)]
    )
    
    exit_job = DummyOperator(task_id="exit_job")

    purge_processing >> process_inbound_files_group_op >> exit_job

I am not sure what is happening: in the Airflow UI all the tasks complete successfully, but the log file is never actually written. I tried to debug it but got nowhere. Any assistance on this matter would be really helpful.
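For comparison, what I originally expected to write was a plain TaskFlow task that does the file write directly in the task body, with no nested PythonOperator and no manual .execute() call. A minimal sketch of that idea (the name write_log_file_taskflow is just illustrative; TEST is the same constant as above):

@task
def write_log_file_taskflow(source_file: str) -> str:
    # append the file name to the shared log file directly in the task body
    with open(TEST, "a") as log_file:
        log_file.write(f"{source_file}\n")
    return f"logged {source_file}"

My understanding is that calling a @task-decorated function only builds an XComArg for the DAG rather than running anything immediately, so I am wondering whether instantiating and executing an operator inside another task is supported at all, or whether that is why the work silently never happens.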
