In my KubernetesPodOperator I want to assign dynamic values to namespace, servic_account_name and image_pull secrets. I am using jijna template. Below is the code for my custom KubernetesPodOperator. namespace, servic_account_name are correctly resolved at run time where as image_pull_secrets is not. When I check pod description image_pull_secrets is not rendered
Pod description with image_pull_secrets. I can see the rendered values in UI attached both images . Pod description. Airflow UI Rendered template
class CustomKubeOperator(KubernetesPodOperator):
""" Creates Custom Kube Operator"""
template_fields = ("image",
"namespace",
"cluster_context",
"image_pull_secrets",
"service_account_name",
"on_finish_action",)
print("====== template_fields ========")
print(template_fields)
def __init__(self, dag, kpo_config_dict, **kwargs):
super().__init__(
dag=dag,
get_logs=True,
log_events_on_failure=True,
**kpo_config_dict, # Pass all parameters from kpo_k8s_config
**kwargs
)
self.dag = dag
self.kwargs = kwargs
self.kpo_config_dict = kpo_config_dict
self.logger = logging.getLogger(__name__)
def execute(self, context):
# Add logging to check the XCom values
image_pull_secrets_cm= "{{ task_instance.xcom_pull(key='image_pull_secrets', task_ids='job_start') }}"
service_account_cm = "{{ task_instance.xcom_pull(key='service_account', task_ids='job_start') }}"
self.logger.info("XCom image_pull_secrets: %s",
context['task_instance'].xcom_pull(key='image_pull_secrets_cm', task_ids='job_start'))
self.logger.info("XCom service_account: %s",
context['task_instance'].xcom_pull(key='service_account', task_ids='job_start'))
self.logger.info("XCom image_pull_secrets_cm: %s",image_pull_secrets_cm)
self.logger.info("XCom image_pull_secrets_cm: %s", service_account_cm)
super().execute(context)
registry_url = "{{ task_instance.xcom_pull(key='registry_url',task_ids='job_start') }}"
common_images_namespace = "{{ task_instance.xcom_pull(key='common_images_namespace', " \
"task_ids='job_start') }} "
image_pull_secrets_cm = "{{ task_instance.xcom_pull(key='image_pull_secrets_cm', task_ids='job_start') }}"
service_account = "{{ task_instance.xcom_pull(key='service_account', task_ids='job_start') }}"
in_cluster = "{{ task_instance.xcom_pull(key='is_airflow_in_cluster', task_ids='job_start') }}"
namespace = "{{ task_instance.xcom_pull(key='namespace', task_ids='job_start') }}"
kpo_k8s_config = K8sPodOperatorConfig(
task_id=task_id,
pod_name=pod_name,
image_pull_secrets=image_pull_secrets_cm,
image_pull_policy=image_pull_policy,
on_finish_action=on_finish_action,
security_context=security_context,
service_account_name=service_account,
image=registry_url + '/' + common_paas_images_namespace + '/' + image_name,
arguments=command_arguments,
volumes=configmap_volumes,
volume_mounts=configmap_volume_mounts
)
operator_config_dict = kpo_k8s_config.to_dict()
custom_kube_opr = CustomKubeOperator(dag=dag,
kpo_config_dict=operator_config_dict,
**kwargs)
I tried to print the values, I am able to see template fields also in the UI