Skip to content

Commit

Permalink
Spend less time waiting for LocalTaskJob's subprocess to finish (
Browse files Browse the repository at this point in the history
apache#11373)

* Spend less time waiting for LocalTaskJob's subprocess to finish

This is about a 20% speed-up for short-running tasks!

This change doesn't affect the "duration" reported in the TI table, but
it does affect the time before the slot is freed up in the executor -
which in turn affects overall task/DAG throughput.

(All these tests are with the same BashOperator tasks, just running `echo 1`.)

**Before**

```
Task airflow.executors.celery_executor.execute_command[5e0bb50c-de6b-4c78-980d-f8d535bbd2aa] succeeded in 6.597011625010055s: None
Task airflow.executors.celery_executor.execute_command[0a39ec21-2b69-414c-a11b-05466204bcb3] succeeded in 6.604327297012787s: None

```

**After**

```
Task airflow.executors.celery_executor.execute_command[57077539-e7ea-452c-af03-6393278a2c34] succeeded in 1.7728257849812508s: None
Task airflow.executors.celery_executor.execute_command[9aa4a0c5-e310-49ba-a1aa-b0760adfce08] succeeded in 1.7124666879535653s: None
```

**After, including change from apache#11372**

```
Task airflow.executors.celery_executor.execute_command[35822fc6-932d-4a8a-b1d5-43a8b35c52a5] succeeded in 0.5421732050017454s: None
Task airflow.executors.celery_executor.execute_command[2ba46c47-c868-4c3a-80f8-40adaf03b720] succeeded in 0.5469810889917426s: None
```
  • Loading branch information
ashb authored Oct 13, 2020
1 parent b8cecf5 commit 623d5cd
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 12 deletions.
6 changes: 3 additions & 3 deletions airflow/executors/dask_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def _process_future(self, future: Future) -> None:
self.futures.pop(future)

def sync(self) -> None:
if not self.futures:
if self.futures is None:
raise AirflowException(NOT_STARTED_MESSAGE)
# make a copy so futures can be popped during iteration
for future in self.futures.copy():
Expand All @@ -109,14 +109,14 @@ def sync(self) -> None:
def end(self) -> None:
if not self.client:
raise AirflowException(NOT_STARTED_MESSAGE)
if not self.futures:
if self.futures is None:
raise AirflowException(NOT_STARTED_MESSAGE)
self.client.cancel(list(self.futures.keys()))
for future in as_completed(self.futures.copy()):
self._process_future(future)

def terminate(self):
if not self.futures:
if self.futures is None:
raise AirflowException(NOT_STARTED_MESSAGE)
self.client.cancel(self.futures.keys())
self.end()
19 changes: 15 additions & 4 deletions airflow/jobs/local_task_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,22 @@ def signal_handler(signum, frame):
try:
self.task_runner.start()

heartbeat_time_limit = conf.getint('scheduler',
'scheduler_zombie_task_threshold')
heartbeat_time_limit = conf.getint('scheduler', 'scheduler_zombie_task_threshold')

while True:
# Monitor the task to see if it's done
return_code = self.task_runner.return_code()
# Monitor the task to see if it's done. Wait in a syscall
# (`os.wait`) for as long as possible so we notice the
# subprocess finishing as quick as we can
max_wait_time = max(
0, # Make sure this value is never negative,
min(
(heartbeat_time_limit -
(timezone.utcnow() - self.latest_heartbeat).total_seconds() * 0.75),
self.heartrate,
)
)

return_code = self.task_runner.return_code(timeout=max_wait_time)
if return_code is not None:
self.log.info("Task exited with return code %s", return_code)
return
Expand Down
5 changes: 4 additions & 1 deletion airflow/task/task_runner/standard_task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,10 @@ def terminate(self):
if self.process is None:
return

if self.process.is_running():
# Reap the child process - it may already be finished
_ = self.return_code(timeout=0)

if self.process and self.process.is_running():
rcs = reap_process_group(self.process.pid, self.log)
self._rc = rcs.get(self.process.pid)

Expand Down
2 changes: 1 addition & 1 deletion tests/dags/test_heartbeat_failed_fast.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,5 @@
dag = DAG(dag_id='test_heartbeat_failed_fast', default_args=args)
task = BashOperator(
task_id='test_heartbeat_failed_fast_op',
bash_command='sleep 5',
bash_command='sleep 7',
dag=dag)
4 changes: 1 addition & 3 deletions tests/jobs/test_local_task_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,12 +132,10 @@ def test_localtaskjob_heartbeat(self, mock_pid):
mock_pid.return_value = 2
self.assertRaises(AirflowException, job1.heartbeat_callback)

@patch('os.getpid')
def test_heartbeat_failed_fast(self, mock_getpid):
def test_heartbeat_failed_fast(self):
"""
Test that task heartbeat will sleep when it fails fast
"""
mock_getpid.return_value = 1
self.mock_base_job_sleep.side_effect = time.sleep

with create_session() as session:
Expand Down

0 comments on commit 623d5cd

Please sign in to comment.