Airflow Scheduler Race Condition With Kubernetes Executor

by SLV Team

Hey everyone! Let's dive into a tricky issue some of us have been facing with Apache Airflow, especially when using the Kubernetes executor. We're talking about a race condition in the scheduler that can lead to some unexpected behavior. This article will break down the problem, how to reproduce it, and what we can do about it. So, buckle up, and let's get started!

The Problem: TaskInstance Duplication

The core issue revolves around task instances being created multiple times when more than one scheduler replica is running. The setup: Airflow 3.0.4 (also reproduced on 3.0.6 and 3.1.1), deployed on EKS with Helm chart v1.18.0, backed by AWS RDS Postgres. A task gets its TaskInstance created twice, each by a different scheduler, before the first one even starts running. This leads to two worker pods being created, and one of them inevitably fails with an `invalid_state` error.
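To reproduce this, the Helm deployment needs at least two scheduler replicas. A minimal values fragment for the official Airflow chart (key names per chart v1.18.0; the row-locking env var is Airflow's standard `[scheduler] use_row_level_locking` setting, shown only to make the default explicit, not something this setup changed):

```yaml
# values.yaml fragment, official apache-airflow Helm chart
scheduler:
  replicas: 2          # two replicas are what expose the race

env:
  - name: AIRFLOW__SCHEDULER__USE_ROW_LEVEL_LOCKING
    value: "True"      # Airflow default; required for HA schedulers
```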

This race occurs because both schedulers are trying to schedule the same task at once. When the first scheduler creates a TaskInstance, its change may not yet be visible (or locked) from the second scheduler's database session. The second scheduler, still seeing the task as unscheduled, creates a TaskInstance of its own. The result is duplicate TaskInstances with different `try_number` values.
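Airflow's HA scheduler design normally prevents this with database row-level locking (the `[scheduler] use_row_level_locking` option), so the reported behavior suggests that guard is not covering this path. The underlying pattern is a guarded, atomic state transition: a scheduler may only move a task forward if no other scheduler has already done so. A minimal sketch of that pattern using SQLite (the table and column names here are a toy stand-in, not Airflow's real schema):

```python
import sqlite3

# Toy "task_instance" table; the real Airflow schema is far richer.
db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE task_instance (id TEXT PRIMARY KEY, state TEXT)")
db.execute("INSERT INTO task_instance VALUES ('spam_7.task_2', 'scheduled')")
db.commit()

def try_claim(conn, ti_id):
    """Atomically move a TI from 'scheduled' to 'queued'.

    The UPDATE's WHERE clause re-checks the state inside the database,
    so only one caller can win; the losing scheduler matches zero rows
    and must back off instead of creating a duplicate.
    """
    cur = conn.execute(
        "UPDATE task_instance SET state = 'queued' "
        "WHERE id = ? AND state = 'scheduled'",
        (ti_id,),
    )
    conn.commit()
    return cur.rowcount == 1  # True only for the winning scheduler

print(try_claim(db, "spam_7.task_2"))  # first scheduler wins -> True
print(try_claim(db, "spam_7.task_2"))  # second scheduler loses -> False
```

If both schedulers instead do a read-then-write (SELECT, decide, INSERT), the window between the read and the write is exactly where this bug lives.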

This duplication is particularly problematic because it violates Airflow's expected sequencing: a task's retry TaskInstance should only be created after the previous instance has failed. When multiple schedulers are involved, that sequencing gets disrupted, leading to the `invalid_state` error. The error arises when a worker pod attempts to mark a task instance as running, but the instance is already in a state from which it cannot transition to running, such as `success`.
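Conceptually, the API server enforces a small state machine: only certain predecessor states may transition to `running`. A sketch of that guard (the transition table below is illustrative, not Airflow's actual rule set, which lives in the task-execution API server):

```python
# States from which a TaskInstance may legally move to "running".
# Illustrative only; Airflow's real transition rules are more nuanced.
RUNNABLE_FROM = {"queued", "restarting"}

def start_ti(current_state):
    """Mimic the guard behind PATCH /task-instances/{id}/run."""
    if current_state not in RUNNABLE_FROM:
        # This is the shape of the error the second worker pod sees.
        return {
            "reason": "invalid_state",
            "message": "TI was not in a state where it could be marked as running",
            "previous_state": current_state,
        }
    return {"state": "running"}

print(start_ti("queued"))   # first pod: allowed to run
print(start_ti("success"))  # second pod: rejected with invalid_state
```

The second worker pod hits the rejection branch because the first duplicate already drove the TaskInstance to `success`.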

The impact of this race condition extends beyond just failed tasks. It can lead to wasted resources, as two worker pods are created instead of one. This is particularly concerning in resource-constrained environments. Additionally, the duplicate task instances can complicate monitoring and debugging, making it harder to track the true state of your DAG runs. To further illustrate, consider a scenario where a task interacts with an external system. If the task runs twice due to this issue, it can lead to unintended side effects, such as duplicate entries in a database or multiple API calls.
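Until the bug is fixed upstream, the practical mitigation for tasks with external side effects is idempotency: make a duplicate execution a no-op. One common pattern is to key writes on the logical task run so the second attempt is silently skipped. A sketch with SQLite (the `side_effects` table and helper are hypothetical, not part of Airflow):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    # A unique key on the logical task run turns a duplicate write
    # into a no-op instead of a second row or second API call.
    "CREATE TABLE side_effects ("
    "  dag_id TEXT, run_id TEXT, task_id TEXT, payload TEXT,"
    "  PRIMARY KEY (dag_id, run_id, task_id))"
)

def record_side_effect(dag_id, run_id, task_id, payload):
    """Insert once per logical task run; duplicates are ignored."""
    cur = conn.execute(
        "INSERT OR IGNORE INTO side_effects VALUES (?, ?, ?, ?)",
        (dag_id, run_id, task_id, payload),
    )
    conn.commit()
    return cur.rowcount == 1  # False if this run already wrote

first = record_side_effect("spam_tasks_multiple_groups_dag",
                           "scheduled__2025-10-25T02:30:00+00:00",
                           "spam_7.task_2", "result")
dup = record_side_effect("spam_tasks_multiple_groups_dag",
                         "scheduled__2025-10-25T02:30:00+00:00",
                         "spam_7.task_2", "result")
print(first, dup)  # True False: one row despite two executions
```

The same idea applies to external APIs: pass an idempotency key derived from `(dag_id, run_id, task_id)` so the remote system deduplicates for you.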

Diving into the Error Logs

Let's take a peek at an example error log from a failed pod to understand what's happening under the hood:

> {"timestamp":"2025-10-25T02:49:26.475102Z","level":"info","event":"Executing workload","workload":"ExecuteTask(token='aaabbbbcccc', ti=TaskInstance(id=UUID('01994b35-8445-7dd3-bcd8-ecc39e0e448b'), task_id='spam_7.task_2', dag_id='spam_tasks_multiple_groups_dag', run_id='scheduled__2025-10-25T02:30:00+00:00', try_number=1, map_index=-1, pool_slots=1, queue='default', priority_weight=6, executor_config=None, parent_context_carrier={}, context_carrier={}, queued_dttm=None), dag_rel_path=PurePosixPath('yeng-test/spam-dags.py'), bundle_info=BundleInfo(name='airflow-pipes', version='xx'), log_path='dag_id=spam_tasks_multiple_groups_dag/run_id=manual__2025-10-31T12:08:41+00:00/task_id=spam_7.task_2/attempt=1.log', type='ExecuteTask')","logger":"__main__"}
> {"timestamp":"2025-10-25T02:49:27.001212Z","level":"info","event":"Connecting to server:","server":"http://airflow-api-server:8080/execution/","logger":"__main__"}
> {"timestamp":"2025-10-25T02:49:27.068125Z","level":"info","event":"Secrets backends loaded for worker","count":1,"backend_classes":["EnvironmentVariablesBackend"],"logger":"supervisor"}
> {"timestamp":"2025-10-25T02:49:27.095790Z","level":"warning","event":"Server error","detail":{"detail":{"reason":"invalid_state","message":"TI was not in a state where it could be marked as running","previous_state":"success"}},"logger":"airflow.sdk.api.client"}
> {"timestamp":"2025-10-25T02:49:27.140185Z","level":"info","event":"Process exited","pid":14,"exit_code":-9,"signal_sent":"SIGKILL","logger":"supervisor"}
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.10/runpy.py", line 196, in _run_module_as_main
>     return _run_code(code, main_globals, None,
>   File "/usr/local/lib/python3.10/runpy.py", line 86, in _run_code
>     exec(code, run_globals)
>   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/execution_time/execute_workload.py", line 125, in <module>
>     main()
>   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/execution_time/execute_workload.py", line 121, in main
>     execute_workload(workload)
>   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/execution_time/execute_workload.py", line 66, in execute_workload
>     supervise(
>   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/execution_time/supervisor.py", line 1829, in supervise
>     process = ActivitySubprocess.start(
>   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/execution_time/supervisor.py", line 933, in start
>     proc._on_child_started(ti=what, dag_rel_path=dag_rel_path, bundle_info=bundle_info)
>   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/execution_time/supervisor.py", line 944, in _on_child_started
>     ti_context = self.client.task_instances.start(ti.id, self.pid, start_date)
>   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/api/client.py", line 152, in start
>     resp = self.client.patch(f"task-instances/{id}/run", content=body.model_dump_json())
>   File "/home/airflow/.local/lib/python3.10/site-packages/httpx/_client.py", line 1218, in patch
>     return self.request(
>   File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 338, in wrapped_f
>     return copy(f, *args, **kw)
>   File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 477, in __call__
>     do = self.iter(retry_state=retry_state)
>   File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 378, in iter
>     result = action(retry_state)
>   File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 400, in <lambda>
>     self._add_action_func(lambda rs: rs.outcome.result())
>   File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 451, in result
>     return self.__get_result()
>   File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
>     raise self._exception
>   File "/home/airflow/.local/lib/python3.10/site-packages/tenacity/__init__.py", line 480, in __call__
>     result = fn(*args, **kwargs)
>   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/api/client.py", line 735, in request
>     return super().request(*args, **kwargs)
>   File "/home/airflow/.local/lib/python3.10/site-packages/httpx/_client.py", line 825, in request
>     return self.send(request, auth=auth, follow_redirects=follow_redirects)
>   File "/home/airflow/.local/lib/python3.10/site-packages/httpx/_client.py", line 914, in send
>     response = self._send_handling_auth(
>   File "/home/airflow/.local/lib/python3.10/site-packages/httpx/_client.py", line 942, in _send_handling_auth
>     response = self._send_handling_redirects(
>   File "/home/airflow/.local/lib/python3.10/site-packages/httpx/_client.py", line 999, in _send_handling_redirects
>     raise exc
>   File "/home/airflow/.local/lib/python3.10/site-packages/httpx/_client.py", line 982, in _send_handling_redirects
>     hook(response)
>   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/api/client.py", line 117, in raise_on_4xx_5xx
>     return get_json_error(response) or response.raise_for_status()
>   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/sdk/api/client.py", line 113, in get_json_error
>     raise err
> airflow.sdk.api.client.ServerResponseError: Server returned error

The key part here is the `invalid_state` response from the API server: by the time this worker pod tried to mark the TaskInstance as running, its `previous_state` was already `success`, because the duplicate instance had finished first. The transition was refused, and the supervisor killed the child process with SIGKILL.