Airflow custom Kube Operator template fields not working for image_pull_Secrets

79 Views Asked by At

In my KubernetesPodOperator I want to assign dynamic values to namespace, servic_account_name and image_pull secrets. I am using jijna template. Below is the code for my custom KubernetesPodOperator. namespace, servic_account_name are correctly resolved at run time where as image_pull_secrets is not. When I check pod description image_pull_secrets is not rendered
Pod description with image_pull_secrets. I can see the rendered values in UI attached both images . Pod description. Airflow UI Rendered template

class CustomKubeOperator(KubernetesPodOperator):
    """ Creates Custom Kube Operator"""
    template_fields = ("image",
                        "namespace",
                        "cluster_context",
                        "image_pull_secrets",
                        "service_account_name",
                        "on_finish_action",)
    print("====== template_fields ========")
    print(template_fields)

    def __init__(self, dag, kpo_config_dict, **kwargs):
        super().__init__(
            dag=dag,
            get_logs=True,
            log_events_on_failure=True,
            **kpo_config_dict,  # Pass all parameters from kpo_k8s_config
            **kwargs

        )
        self.dag = dag
        self.kwargs = kwargs
        self.kpo_config_dict = kpo_config_dict
        self.logger = logging.getLogger(__name__)

    def execute(self, context):
        # Add logging to check the XCom values
        image_pull_secrets_cm= "{{ task_instance.xcom_pull(key='image_pull_secrets', task_ids='job_start') }}"
        service_account_cm = "{{ task_instance.xcom_pull(key='service_account', task_ids='job_start') }}"

        self.logger.info("XCom image_pull_secrets: %s",
                      context['task_instance'].xcom_pull(key='image_pull_secrets_cm', task_ids='job_start'))
        self.logger.info("XCom service_account: %s",
                      context['task_instance'].xcom_pull(key='service_account', task_ids='job_start'))

        self.logger.info("XCom image_pull_secrets_cm: %s",image_pull_secrets_cm)
        self.logger.info("XCom image_pull_secrets_cm: %s", service_account_cm)

        super().execute(context)
 registry_url = "{{ task_instance.xcom_pull(key='registry_url',task_ids='job_start') }}"
        common_images_namespace = "{{ task_instance.xcom_pull(key='common_images_namespace', " \
                                     "task_ids='job_start') }} "
       
        image_pull_secrets_cm = "{{ task_instance.xcom_pull(key='image_pull_secrets_cm', task_ids='job_start') }}"
        service_account = "{{ task_instance.xcom_pull(key='service_account', task_ids='job_start') }}"
        in_cluster = "{{ task_instance.xcom_pull(key='is_airflow_in_cluster', task_ids='job_start') }}"
        namespace = "{{ task_instance.xcom_pull(key='namespace', task_ids='job_start') }}"
kpo_k8s_config = K8sPodOperatorConfig(
                task_id=task_id,
                pod_name=pod_name,
                image_pull_secrets=image_pull_secrets_cm,
                image_pull_policy=image_pull_policy,
                on_finish_action=on_finish_action,
                security_context=security_context,
                service_account_name=service_account,
                image=registry_url + '/' + common_paas_images_namespace + '/' + image_name,
                arguments=command_arguments,
                volumes=configmap_volumes,
                volume_mounts=configmap_volume_mounts
            )
        operator_config_dict = kpo_k8s_config.to_dict()
        custom_kube_opr = CustomKubeOperator(dag=dag,
                                             kpo_config_dict=operator_config_dict,
                                             **kwargs)

I tried to print the values, I am able to see template fields also in the UI

0

There are 0 best solutions below