I am using Amazon MWAA (Managed Workflows for Apache Airflow). My first DAG refreshes Snowflake external tables. The whole DAG takes about 2 minutes to complete (it has 9 tasks that run one after another, each refreshing one table), and about 20 seconds of that is spent in the queued state. I now want the tasks to run concurrently, so I run 3 tasks at a time, but the total run time is the same. I want to understand why the tasks wait in the queue and how I can reduce the queue time. The code and screenshots are below. As the screenshot shows, the queue time is much longer than the execution time.

[Screenshot: task queue time vs. execution time]
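For each task instance, Airflow records when it was queued (`queued_dttm`), when it started (`start_date`) and when it ended (`end_date`). The queued time is the gap between the first two, and the execution time is the gap between the last two. The minimal sketch below only does that arithmetic on plain `datetime` objects. The function name `split_durations` is hypothetical and the timestamps are made-up values, not readings from the screenshot.

```python
from datetime import datetime, timedelta
from typing import Tuple


def split_durations(
    queued_at: datetime, started_at: datetime, ended_at: datetime
) -> Tuple[timedelta, timedelta]:
    """Return (time spent queued, time spent running) for one task instance.

    The arguments correspond to what Airflow stores as queued_dttm,
    start_date and end_date on a task instance.
    """
    if not queued_at <= started_at <= ended_at:
        raise ValueError("expected queued_at <= started_at <= ended_at")
    return started_at - queued_at, ended_at - started_at


if __name__ == "__main__":
    # Made-up timestamps for illustration only.
    queued = datetime(2023, 10, 13, 20, 30, 0)
    started = queued + timedelta(seconds=20)
    ended = started + timedelta(seconds=5)
    waited, ran = split_durations(queued, started, ended)
    print(f"queued {waited.total_seconds():.0f}s, ran {ran.total_seconds():.0f}s")
```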
My environment settings:

[Screenshot: MWAA environment settings]
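Several settings cap how many of this DAG's tasks can actually run at the same time: the DAG's `concurrency` (`max_active_tasks` in Airflow 2.2+), Airflow's `core.parallelism` (applied per scheduler in Airflow 2.x), the worker slots (number of workers times tasks per worker, which MWAA ties to the environment class and `celery.worker_autoscale`), and the pool's slot count. The sketch below is a rough model that assumes these are the only limits and that all tasks share one pool. It ignores other DAGs competing for the same slots and the delay while MWAA scales workers up. The function and parameter names are hypothetical.

```python
def effective_task_slots(
    dag_max_active_tasks: int,
    core_parallelism: int,
    schedulers: int,
    workers: int,
    tasks_per_worker: int,
    pool_slots: int,
) -> int:
    """Rough upper bound on how many of one DAG's tasks can run simultaneously."""
    limits = (dag_max_active_tasks, core_parallelism, schedulers,
              workers, tasks_per_worker, pool_slots)
    if any(v < 1 for v in limits):
        raise ValueError("all limits must be positive")
    return min(
        dag_max_active_tasks,            # DAG-level concurrency / max_active_tasks
        core_parallelism * schedulers,   # parallelism applies per scheduler
        workers * tasks_per_worker,      # Celery worker slots currently available
        pool_slots,                      # slots in the pool the tasks use
    )


if __name__ == "__main__":
    # Hypothetical values; read the real ones from the environment's configuration.
    print(effective_task_slots(3, 32, 2, 1, 5, 128))
```

In this model, raising `concurrency` only helps while it is the smallest of the limits.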
```python
from airflow import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from datetime import datetime, timedelta
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from commons.dag_failure_notification import send_failure_email

default_args = {
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    "on_failure_callback": send_failure_email,
}

DAG_ID = "test_External_Table_refresh"

with DAG(
    dag_id=DAG_ID,
    default_args=default_args,
    schedule_interval="30 20 * * *",
    catchup=False,
    start_date=datetime(2023, 10, 13),
    # Upper limit on how many of this DAG's tasks may run at once; it does not by
    # itself make tasks parallel (renamed max_active_tasks in Airflow 2.2+).
    concurrency=3,
) as dag:
    # Define SnowflakeOperator task for refreshing FLIGHT_BOOKING table
    flight_booking_task = SnowflakeOperator(
        task_id="refresh_flight_booking_external_table",
        sql="ALTER EXTERNAL TABLE RAW_DYNAMODB.RAW.FLIGHT_BOOKING REFRESH;",
        snowflake_conn_id="Snowflake_dev",
        dag=dag,
    )

    # Define SnowflakeOperator task for refreshing USERS table
    users_task = SnowflakeOperator(
        task_id="refresh_users_external_table",
        sql="ALTER EXTERNAL TABLE RAW_DYNAMODB.RAW.USERS REFRESH;",
        snowflake_conn_id="Snowflake_dev",
        dag=dag,
    )

    # Define SnowflakeOperator task for refreshing PAYMENTS table
    payments_task = SnowflakeOperator(
        task_id="refresh_payments_external_table",
        sql="ALTER EXTERNAL TABLE RAW_DYNAMODB.RAW.PAYMENTS REFRESH;",
        snowflake_conn_id="Snowflake_dev",
        dag=dag,
    )

    # Define SnowflakeOperator task for refreshing CANCELLATIONS table
    cancellations_task = SnowflakeOperator(
        task_id="refresh_cancellations_external_table",
        sql="ALTER EXTERNAL TABLE RAW_DYNAMODB.RAW.CANCELLATIONS REFRESH;",
        snowflake_conn_id="Snowflake_dev",
        dag=dag,
    )

    # Define SnowflakeOperator task for refreshing REFUNDS table
    refunds_task = SnowflakeOperator(
        task_id="refresh_refunds_external_table",
        sql="ALTER EXTERNAL TABLE RAW_DYNAMODB.RAW.REFUNDS REFRESH;",
        snowflake_conn_id="Snowflake_dev",
        dag=dag,
    )

    # Define SnowflakeOperator task for refreshing WALLET table
    wallet_task = SnowflakeOperator(
        task_id="refresh_wallet_external_table",
        sql="ALTER EXTERNAL TABLE RAW_DYNAMODB.RAW.WALLET REFRESH;",
        snowflake_conn_id="Snowflake_dev",
        dag=dag,
    )

    # Define SnowflakeOperator task for refreshing WALLET_TRANSACTIONS table
    wallet_transactions_task = SnowflakeOperator(
        task_id="refresh_wallet_transactions_external_table",
        sql="ALTER EXTERNAL TABLE RAW_DYNAMODB.RAW.WALLET_TRANSACTIONS REFRESH;",
        snowflake_conn_id="Snowflake_dev",
        dag=dag,
    )

    # Define TriggerDagRunOperator task
    trigger_dbt_deps = TriggerDagRunOperator(
        task_id="trigger_dbt_deps", trigger_dag_id="test_DBT_deps", dag=dag
    )

    # Set up dependencies: users and flight_booking run independently, while
    # payments -> cancellations -> refunds -> wallet -> wallet_transactions
    # run strictly one after another before the trigger.
    [users_task, flight_booking_task] >> trigger_dbt_deps
    (
        payments_task
        >> cancellations_task
        >> refunds_task
        >> wallet_task
        >> wallet_transactions_task
        >> trigger_dbt_deps
    )
```
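One possible reason `concurrency=3` does not shorten the run: the dependencies above still force the five payments-to-wallet_transactions refreshes and then `trigger_dbt_deps` to run one after another, so the DAG cannot finish faster than that chain however many slots are free. The minimal sketch below computes that critical path. It uses shortened task names and assumes a uniform, hypothetical cost per task (queued time plus run time); `critical_path` and `UPSTREAM` are illustrative names, not Airflow APIs.

```python
from functools import lru_cache
from typing import Dict, List

# Upstream dependencies mirroring the DAG above (task -> tasks it waits for).
UPSTREAM: Dict[str, List[str]] = {
    "users": [],
    "flight_booking": [],
    "payments": [],
    "cancellations": ["payments"],
    "refunds": ["cancellations"],
    "wallet": ["refunds"],
    "wallet_transactions": ["wallet"],
    "trigger_dbt_deps": ["users", "flight_booking", "wallet_transactions"],
}


def critical_path(upstream: Dict[str, List[str]], cost: Dict[str, float]) -> float:
    """Length of the longest dependency chain: the run time with unlimited slots.

    Assumes the graph is acyclic, as any Airflow DAG is.
    """

    @lru_cache(maxsize=None)
    def finish(task: str) -> float:
        # A task can start only after its slowest upstream task has finished.
        start = max((finish(u) for u in upstream[task]), default=0.0)
        return start + cost[task]

    return max((finish(t) for t in upstream), default=0.0)


if __name__ == "__main__":
    unit = {task: 1.0 for task in UPSTREAM}  # hypothetical: one unit per task
    print(critical_path(UPSTREAM, unit))
```

With a uniform cost the chain is six tasks long, so no `concurrency` setting can bring this graph below six task-lengths; `users` and `flight_booking` only fill otherwise idle slots.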
What I have tried:

- Removing the dependencies between the tasks.
- Other changes to the code.
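With the dependencies removed, the refreshes are limited only by the number of free slots, and each task still pays its own queued time. The small greedy list-scheduling simulation below assumes seven independent refresh tasks feeding `trigger_dbt_deps` and a hypothetical uniform cost of one unit (queue plus run) per task. It shows how the total scales with the number of slots. The `makespan` function and the task names are illustrative, not Airflow APIs.

```python
import heapq
from typing import Dict, List, Tuple


def makespan(upstream: Dict[str, List[str]], cost: Dict[str, float], slots: int) -> float:
    """Simulate greedy scheduling: whenever a slot is free, start any ready task."""
    if slots < 1:
        raise ValueError("slots must be at least 1")
    waiting = {t: set(ups) for t, ups in upstream.items() if ups}
    ready = sorted(t for t, ups in upstream.items() if not ups)
    running: List[Tuple[float, str]] = []  # min-heap of (finish_time, task)
    now = 0.0
    while ready or running:
        # Fill every free slot with a ready task.
        while ready and len(running) < slots:
            task = ready.pop(0)
            heapq.heappush(running, (now + cost[task], task))
        # Advance the clock to the next completion and release its dependents.
        now, finished = heapq.heappop(running)
        for task in list(waiting):
            waiting[task].discard(finished)
            if not waiting[task]:
                del waiting[task]
                ready.append(task)
    if waiting:
        raise ValueError("cycle or unknown task in dependencies")
    return now


if __name__ == "__main__":
    tables = ["flight_booking", "users", "payments", "cancellations",
              "refunds", "wallet", "wallet_transactions"]
    upstream = {t: [] for t in tables}
    upstream["trigger_dbt_deps"] = list(tables)
    cost = {t: 1.0 for t in upstream}  # hypothetical: one unit of queue + run each
    for n in (1, 3, 7):
        print(n, makespan(upstream, cost, n))
```

In this model, every wave of tasks pays the queued time again, so once slots are the bottleneck a shorter per-task queue delay reduces the total as directly as adding slots does.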