I send a task with the .send_task method and use the same Celery object to try to get the result. The task is reported as successful on the Airflow Dashboard even though it fails with an error on the Celery side.
Here is what my DAG looks like:
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator, get_current_context
from celery import Celery

# Celery app pointing at the same broker/result backend the worker uses
RBROKER = "db+sqlite:///db.sqlite3"
CELERY_BROKER_URL = "pyamqp://guest:guest@hostname//"
celery_obj = Celery('worker', backend=RBROKER, broker=CELERY_BROKER_URL)

with DAG(
    'redcap_etl',
    default_args=default_args,
    schedule_interval='@once',
) as dag:

    def run_celery(**kwargs):
        context = get_current_context()
        print(f"context value: {context}")
        # Parameters are expected to come from the dag_run conf
        redcap_study = kwargs['dag_run'].conf.get('redcap_study')
        print(f"redcap_study: {redcap_study}")
        script = kwargs['dag_run'].conf.get('script')
        print(f"script: {script}")
        row_id = kwargs['dag_run'].conf.get('row_id')
        print(f"row_id: {row_id}")
        # Send the job to the Celery worker
        result = celery_obj.send_task("run_redcap_etl", args=[redcap_study, row_id, script])
        print(f"result1 from celery -------: {result}")

    start_redcap_etl_task = BashOperator(
        task_id='start_redcap_etl_task',
        bash_command="echo Start redcap etl Task!!"
    )

    run_redcap_etl_task = PythonOperator(
        task_id='run_redcap_etl_task',
        provide_context=True,
        python_callable=run_celery
    )

    end_rdcap_etl_task = BashOperator(
        task_id='end_rdcap_etl_task',
        bash_command="echo end redcap_etl Task!"
    )

    # Define the task dependencies
    start_redcap_etl_task >> run_redcap_etl_task >> end_rdcap_etl_task
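To spell out the part I'm asking about: the only interaction with Celery inside run_celery is the snippet below (trimmed from the DAG above; the comments describe what I observe in the logs).

# send_task only enqueues the message and immediately returns an AsyncResult;
# printing it shows the task id (it matches the id in the Celery log below).
result = celery_obj.send_task("run_redcap_etl", args=[redcap_study, row_id, script])
print(f"result1 from celery -------: {result}")
# Nothing here waits for the worker, so the callable returns None and Airflow
# marks run_redcap_etl_task as SUCCESS regardless of what happens on the worker.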
Here is the Celery log from a run that fails on purpose:
[2024-01-25 21:58:12,961: INFO/MainProcess] celery@ip-10-70-21-144 ready.
[2024-01-25 21:58:57,088: INFO/MainProcess] Task run_redcap_etl[5f4ea609-0da5-4a41-88eb-fb875e35fed9] received
[2024-01-25 21:58:57,089: WARNING/ForkPoolWorker-2] Executing redcap_etl for redcap_study - None script - None and row id is - None
[2024-01-25 21:58:57,126: WARNING/ForkPoolWorker-2] task failed -------
[2024-01-25 21:58:57,126: ERROR/ForkPoolWorker-2] Task run_redcap_etl[5f4ea609-0da5-4a41-88eb-fb875e35fed9] raised unexpected: TypeError('expected str, bytes or os.PathLike object, not NoneType')
Traceback (most recent call last):
File "/home/ubuntu/.local/lib/python3.8/site-packages/celery/app/trace.py", line 477, in trace_task
R = retval = fun(*args, **kwargs)
File "/home/ubuntu/.local/lib/python3.8/site-packages/celery/app/trace.py", line 760, in __protected_call__
return self.run(*args, **kwargs)
File "/home/ubuntu/deployment/airflow/deployments/stage/worker.py", line 63, in run_redcap_etl
result = subprocess.run(["sh", script_path_redcap_etl, script])
File "/usr/lib/python3.8/subprocess.py", line 493, in run
with Popen(*popenargs, **kwargs) as process:
File "/usr/lib/python3.8/subprocess.py", line 858, in __init__
self._execute_child(args, executable, preexec_fn, close_fds,
File "/usr/lib/python3.8/subprocess.py", line 1639, in _execute_child
self.pid = _posixsubprocess.fork_exec(
TypeError: expected str, bytes or os.PathLike object, not NoneType
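For reference, the worker-side task looks roughly like this (a simplified sketch reconstructed from the traceback above; apart from the task name run_redcap_etl, the argument order, and the subprocess.run call, details such as the broker/backend URLs and the value of script_path_redcap_etl are assumptions):

import subprocess
from celery import Celery

# Assumed to match the broker/backend configured in the DAG
app = Celery('worker', broker="pyamqp://guest:guest@hostname//", backend="db+sqlite:///db.sqlite3")

script_path_redcap_etl = "/path/to/redcap_etl.sh"  # placeholder path

@app.task(name="run_redcap_etl")
def run_redcap_etl(redcap_study, row_id, script):
    print(f"Executing redcap_etl for redcap_study - {redcap_study} script - {script} and row id is - {row_id}")
    # script is None here because dag_run.conf came back empty, so subprocess
    # raises TypeError('expected str, bytes or os.PathLike object, not NoneType')
    return subprocess.run(["sh", script_path_redcap_etl, script]).returncode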
Here is the log from the Airflow Dashboard:
*** Found local files:
*** * /opt/airflow/logs/dag_id=redcap_etl/run_id=manual__2024-01-25T21:58:52.670290+00:00/task_id=run_redcap_etl_task/attempt=1.log
[2024-01-25, 21:58:56 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: redcap_etl.run_redcap_etl_task manual__2024-01-25T21:58:52.670290+00:00 [queued]>
[2024-01-25, 21:58:56 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: redcap_etl.run_redcap_etl_task manual__2024-01-25T21:58:52.670290+00:00 [queued]>
[2024-01-25, 21:58:56 UTC] {taskinstance.py:1308} INFO - Starting attempt 1 of 2
[2024-01-25, 21:58:56 UTC] {taskinstance.py:1327} INFO - Executing <Task(PythonOperator): run_redcap_etl_task> on 2024-01-25 21:58:52.670290+00:00
[2024-01-25, 21:58:56 UTC] {standard_task_runner.py:57} INFO - Started process 7185 to run task
[2024-01-25, 21:58:56 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'redcap_etl', 'run_redcap_etl_task', 'manual__2024-01-25T21:58:52.670290+00:00', '--job-id', '30', '--raw', '--subdir', 'DAGS_FOLDER/redcap_etl.py', '--cfg-path', '/tmp/tmp2a2wt8nn']
[2024-01-25, 21:58:56 UTC] {standard_task_runner.py:85} INFO - Job 30: Subtask run_redcap_etl_task
[2024-01-25, 21:58:56 UTC] {task_command.py:410} INFO - Running <TaskInstance: redcap_etl.run_redcap_etl_task manual__2024-01-25T21:58:52.670290+00:00 [running]> on host a3c6392055a0
[2024-01-25, 21:58:56 UTC] {taskinstance.py:1547} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='redcap_etl' AIRFLOW_CTX_TASK_ID='run_redcap_etl_task' AIRFLOW_CTX_EXECUTION_DATE='2024-01-25T21:58:52.670290+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-01-25T21:58:52.670290+00:00'
[2024-01-25, 21:58:56 UTC] {logging_mixin.py:149} INFO - context value: {'conf': <***.configuration.AirflowConfigParser object at 0x7f93a834b250>, 'dag': <DAG: redcap_etl>, 'dag_run': <DagRun redcap_etl @ 2024-01-25 21:58:52.670290+00:00: manual__2024-01-25T21:58:52.670290+00:00, state:running, queued_at: 2024-01-25 21:58:52.690464+00:00. externally triggered: True>, 'data_interval_end': DateTime(2024, 1, 25, 21, 58, 52, 670290, tzinfo=Timezone('UTC')), 'data_interval_start': DateTime(2024, 1, 25, 21, 58, 52, 670290, tzinfo=Timezone('UTC')), 'ds': '2024-01-25', 'ds_nodash': '20240125', 'execution_date': DateTime(2024, 1, 25, 21, 58, 52, 670290, tzinfo=Timezone('UTC')), 'expanded_ti_count': None, 'inlets': [], 'logical_date': DateTime(2024, 1, 25, 21, 58, 52, 670290, tzinfo=Timezone('UTC')), 'macros': <module '***.macros' from '/home/***/.local/lib/python3.7/site-packages/***/macros/__init__.py'>, 'next_ds': '2024-01-25', 'next_ds_nodash': '20240125', 'next_execution_date': DateTime(2024, 1, 25, 21, 58, 52, 670290, tzinfo=Timezone('UTC')), 'outlets': [], 'params': {}, 'prev_data_interval_start_success': DateTime(2024, 1, 25, 21, 54, 38, 787991, tzinfo=Timezone('UTC')), 'prev_data_interval_end_success': DateTime(2024, 1, 25, 21, 54, 38, 787991, tzinfo=Timezone('UTC')), 'prev_ds': '2024-01-25', 'prev_ds_nodash': '20240125', 'prev_execution_date': DateTime(2024, 1, 25, 21, 58, 52, 670290, tzinfo=Timezone('UTC')), 'prev_execution_date_success': DateTime(2024, 1, 25, 21, 54, 38, 787991, tzinfo=Timezone('UTC')), 'prev_start_date_success': DateTime(2024, 1, 25, 21, 54, 39, 173563, tzinfo=Timezone('UTC')), 'run_id': 'manual__2024-01-25T21:58:52.670290+00:00', 'task': <Task(PythonOperator): run_redcap_etl_task>, 'task_instance': <TaskInstance: redcap_etl.run_redcap_etl_task manual__2024-01-25T21:58:52.670290+00:00 [running]>, 'task_instance_key_str': 'redcap_etl__run_redcap_etl_task__20240125', 'test_mode': False, 'ti': <TaskInstance: redcap_etl.run_redcap_etl_task manual__2024-01-25T21:58:52.670290+00:00 [running]>, 'tomorrow_ds': '2024-01-26', 'tomorrow_ds_nodash': '20240126', 'triggering_dataset_events': <Proxy at 0x7f939e3f90f0 with factory <function TaskInstance.get_template_context.<locals>.get_triggering_events at 0x7f939e3d7e60>>, 'ts': '2024-01-25T21:58:52.670290+00:00', 'ts_nodash': '20240125T215852', 'ts_nodash_with_tz': '20240125T215852.670290+0000', 'var': {'json': None, 'value': None}, 'conn': None, 'yesterday_ds': '2024-01-24', 'yesterday_ds_nodash': '20240124', 'templates_dict': None}
[2024-01-25, 21:58:56 UTC] {logging_mixin.py:149} INFO - redcap_study: None
[2024-01-25, 21:58:56 UTC] {logging_mixin.py:149} INFO - script: None
[2024-01-25, 21:58:56 UTC] {logging_mixin.py:149} INFO - row_id: None
[2024-01-25, 21:58:57 UTC] {logging_mixin.py:149} INFO - result1 from celery -------: 5f4ea609-0da5-4a41-88eb-fb875e35fed9
[2024-01-25, 21:58:57 UTC] {python.py:183} INFO - Done. Returned value was: None
[2024-01-25, 21:58:57 UTC] {taskinstance.py:1350} INFO - Marking task as SUCCESS. dag_id=redcap_etl, task_id=run_redcap_etl_task, execution_date=20240125T215852, start_date=20240125T215856, end_date=20240125T215857
[2024-01-25, 21:58:57 UTC] {local_task_job_runner.py:225} INFO - Task exited with return code 0
[2024-01-25, 21:58:57 UTC] {taskinstance.py:2653} INFO - 1 downstream tasks scheduled from follow-on schedule check
Here is a screenshot of the task and the DAG marked successful for that run.
I have tried result.get() to retrieve the error and status code from Celery, but it does not give anything back (a sketch of what I tried is below). Any help would be appreciated. :)
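This is roughly what the result.get() attempt looked like inside run_celery (a minimal sketch; the timeout value is just an example):

result = celery_obj.send_task("run_redcap_etl", args=[redcap_study, row_id, script])
try:
    # Block until the worker finishes; with propagate=True (the default) the
    # worker's exception should be re-raised here and fail the Airflow task.
    value = result.get(timeout=300, propagate=True)
    print(f"celery return value: {value}, state: {result.state}")
except Exception as exc:
    print(f"celery task failed: {exc}")
    raise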
