Skip to content

Commit

Permalink
Add logging and adapt the algorithm after extense QA locally
Browse files Browse the repository at this point in the history
  • Loading branch information
humitos committed Oct 6, 2021
1 parent 00d6df2 commit 2f0e826
Showing 1 changed file with 78 additions and 22 deletions.
100 changes: 78 additions & 22 deletions readthedocs/projects/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -1822,7 +1822,7 @@ def clean_project_resources(project, version=None):
)


@app.task()
@app.task(queue='web')
def finish_inactive_builds():
"""
Finish inactive builds.
Expand All @@ -1847,38 +1847,93 @@ def finish_inactive_builds():
builds = Build.objects.filter(query)[:50]
redis_client = redis.Redis.from_url(settings.BROKER_URL)

log.info(
'Builds not yet finished to check for stale. count=%s',
builds.count()
)
for build in builds:
build_stale = True

# 1. check if it's being executed by celery
for queue, tasks in app.control.inspect().active().items():
#
# Ask Celery for all the tasks their workers are running and filter
# them by `update_docs_task` only. Over those tasks we check if the
# argument `build_pk` is the same than the build object we are
# checking. In case it matches, we mark the build as NOT stale.
tasks_running = app.control.inspect().active().items()
log.debug(
'Celery active tasks running. tasks=%s',
tasks_running,
)
for queue, tasks in tasks_running:
for task in tasks:
if task['kwargs']['build_pk'] == build.pk:
# The build is not stale, it's being executed
build_stale = False
break
try:
if task.get('name') != 'readthedocs.projects.tasks.update_docs_task':
log.debug(
'Skipping not build task. task=%s',
task.get('name'),
)
continue

task_build_pk = task.get('kwargs', {}).get('build_pk')
log.info(
'Task running. task=%s build_pk=%s',
task.get('name'),
task_build_pk,
)

if task_build_pk == build.pk:
# The build is not stale, it's being executed
log.info(
'Build not stale. Found as an active task on Celery. pk=%s',
build.pk,
)
build_stale = False
break
except Exception:
log.exception('There was an error accesssing the Celery task inspect.')

if not build_stale:
# Continue with the following build that matches the filter
break

# 2. check if it's queued on redis
for queue in redis_client.keys('build*'):
#
# Ask Redis for all the queues starting with `build*` (build:default,
# build:large) and `celery*` (the default queue with priority: celery3,
# celery5, celery7) that have queued tasks. Then, we check if
# `build_pk` queued matches with the build object we are checking. In
# that case we mark the build as NOT stale.
for queue in redis_client.keys('build*') + redis_client.keys('celery*'):
if queue.startswith(b'celery-task-meta'):
# Skip internal Celery queues that start with `celery*` but we
# are not interested in
continue

log.debug('Redis queue with queued tasks. queue=%s', queue)
for task in redis_client.lrange(queue, 0, -1):
task = json.loads(task)
body = json.loads(base64.b64decode(task['body']))
# ``body`` is a 3-element list
# [
# [int],
# {'record': bool, 'force': bool, 'commit': None, 'build_pk': int},
# {'callbacks': None, 'errbacks': None, 'chain': None, 'chord': None},
# ]

build_pk = body[1]['build_pk']
if build_pk == build.pk:
# The build is not stale, it's queued to be executed
build_stale = False
break
try:
task = json.loads(task)
body = json.loads(base64.b64decode(task['body']))
# ``body`` is a 3-element list
# [
# [int],
# {'record': bool, 'force': bool, 'commit': None, 'build_pk': int},
# {'callbacks': None, 'errbacks': None, 'chain': None, 'chord': None},
# ]
log.debug('Redis queue task. queue=%s body=%s', queue, body)

build_pk = body[1]['build_pk']
if build_pk == build.pk:
# The build is not stale, it's queued to be executed
log.info(
'Build not stale. Found as a queued task in Redis. pk=%s',
build.pk,
)
build_stale = False
break
except Exception:
log.exception('There was an error accesssing the Celery task from Redis.')

if not build_stale:
# Continue with the following build that matches the filter
Expand All @@ -1899,8 +1954,9 @@ def finish_inactive_builds():
build.save()

log.info(
'Builds marked as "Terminated due inactivity": %s',
'Builds marked as "Terminated due inactivity". count=%s pks=%s',
len(stale_build_pks),
stale_build_pks,
)


Expand Down

0 comments on commit 2f0e826

Please sign in to comment.