Setup: Airflow 2.0.1 with Kubernetes 1.18 and Python 3.8, Kubernetes Client: 18.17.x
Pod template file:
apiVersion: v1
kind: Pod
metadata:
name: workerPod
spec:
containers:
- args: []
command: []
env:
- name: <Key>
value: "<value>"
envFrom: []
name: base
image: "<image_name>"
imagePullSecrets: [name: "<image_pull_secrets>"]
imagePullPolicy: "Always"
ports: []
volumeMounts:
- mountPath: "<path>"
name: "<name>"
The default config set in airflow.cfg is as follows:
[kubernetes]
pod_template_file = <path to template file>
worker_container_repository = <base-default-image>
worker_container_tag = <tag>
namespace = airflow
delete_worker_pods = True
delete_worker_pods_on_failure = False
worker_pods_creation_batch_size = 1
multi_namespace_mode = False
in_cluster = True
kube_client_request_args =
delete_option_kwargs =
enable_tcp_keepalive = False
tcp_keep_idle = 120
tcp_keep_intvl = 30
tcp_keep_cnt = 6
dags_in_image = True
dags_volume_mount_point = <volume-mount-point>
image_pull_secrets = <default-pull-secrets>
The problem is that, while certain keys are being read correctly from the pod_template_file, for instance, I can see all the env variables
be set correctly as well as imagePullPolicy
being read correctly as well (validated by overriding value of imagePullPolicy: "Always"
from imagePullPolicy: "IfNotPresent"
), but the key for imagePullSecrets
is not being read correctly. I can validate this, as I get a Base credentials not provided
error when the image is being pulled from the ecr repo. I have validated that the credentials are correct and I can create a pod when trying to do so explicitly.
Even when trying to set the imagePullSecrets
in the airflow.cfg
directly, I still end up getting the same error.
I have also tried to create the pod override using the V1 api explicitly as follows:
start_task = PythonOperator(
task_id=<start_task_id>, python_callable=<start_task_callabel>, op_args=[<args>], dag=dag,
executor_config={
"pod_template_file": "<path_to_template>",
"pod_override": k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name="base",
image="<image_override>",
image_pull_policy="<pull_policy>"
),
],
image_pull_secrets=[k8s.V1LocalObjectReference('<image_pull_secrets>')],
)
),
},
)
In this case I can get the docker image to be used downloaded correctly without any authentication errors. But unfortunately, the pod throws an error: AttributeError: 'V1Container' object has no attribute '_startup_probe'
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 8, in <module>
sys.exit(main())
File "/usr/local/lib/python3.8/dist-packages/airflow/__main__.py", line 40, in main
args.func(args)
File "/usr/local/lib/python3.8/dist-packages/airflow/cli/cli_parser.py", line 48, in command
return func(*args, **kwargs)
File "/usr/local/lib/python3.8/dist-packages/airflow/utils/cli.py", line 89, in wrapper
return f(*args, **kwargs)
File "/usr/local/lib/python3.8/dist-packages/airflow/cli/commands/task_command.py", line 234, in task_run
_run_task_by_selected_method(args, dag, ti)
File "/usr/local/lib/python3.8/dist-packages/airflow/cli/commands/task_command.py", line 64, in _run_task_by_selected_method
_run_task_by_local_task_job(args, ti)
File "/usr/local/lib/python3.8/dist-packages/airflow/cli/commands/task_command.py", line 120, in _run_task_by_local_task_job
run_job.run()
File "/usr/local/lib/python3.8/dist-packages/airflow/jobs/base_job.py", line 237, in run
self._execute()
File "/usr/local/lib/python3.8/dist-packages/airflow/jobs/local_task_job.py", line 84, in _execute
if not self.task_instance.check_and_change_state_before_execution(
File "/usr/local/lib/python3.8/dist-packages/airflow/utils/session.py", line 65, in wrapper
return func(*args, session=session, **kwargs)
File "/usr/local/lib/python3.8/dist-packages/airflow/models/taskinstance.py", line 1029, in check_and_change_state_before_execution
session.commit()
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 1046, in commit
self.transaction.commit()
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 504, in commit
self._prepare_impl()
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 483, in _prepare_impl
self.session.flush()
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 2540, in flush
self._flush(objects)
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 2682, in _flush
transaction.rollback(_capture_exception=True)
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/util/langhelpers.py", line 68, in __exit__
compat.raise_(
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/util/compat.py", line 182, in raise_
raise exception
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/session.py", line 2642, in _flush
flush_context.execute()
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/unitofwork.py", line 422, in execute
rec.execute(self)
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/unitofwork.py", line 586, in execute
persistence.save_obj(
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/persistence.py", line 230, in save_obj
_emit_update_statements(
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/persistence.py", line 885, in _emit_update_statements
for (
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/orm/persistence.py", line 626, in _collect_update_commands
state.manager[propkey].impl.is_equal(
File "/usr/local/lib/python3.8/dist-packages/sqlalchemy/sql/sqltypes.py", line 1738, in compare_values
return x == y
File "/usr/local/lib/python3.8/dist-packages/kubernetes/client/models/v1_pod.py", line 221, in __eq__
return self.to_dict() == other.to_dict()
File "/usr/local/lib/python3.8/dist-packages/kubernetes/client/models/v1_pod.py", line 196, in to_dict
result[attr] = value.to_dict()
File "/usr/local/lib/python3.8/dist-packages/kubernetes/client/models/v1_pod_spec.py", line 1004, in to_dict
result[attr] = list(map(
File "/usr/local/lib/python3.8/dist-packages/kubernetes/client/models/v1_pod_spec.py", line 1005, in <lambda>
lambda x: x.to_dict() if hasattr(x, "to_dict") else x,
File "/usr/local/lib/python3.8/dist-packages/kubernetes/client/models/v1_container.py", line 660, in to_dict
value = getattr(self, attr)
File "/usr/local/lib/python3.8/dist-packages/kubernetes/client/models/v1_container.py", line 458, in startup_probe
return self._startup_probe
AttributeError: 'V1Container' object has no attribute '_startup_probe'
I had a similar issue, which started occuring when we updated our airflow version. the problem was that we were installing an older version of kubernetes which was incompatible with the latest airflow, while creating the custom kubernetes container(in the dockerfile). Using the latest version of kubernetes in the dockerfile fixed the issue.