Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Builds: check celery and redis before marking a build as stale #8269

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 115 additions & 29 deletions readthedocs/projects/tasks/utils.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import base64
import datetime
import json
import os
import shutil


from celery.worker.request import Request
import redis
import structlog

from celery.worker.request import Request
from django.conf import settings
from django.db.models import Q
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
Expand Down Expand Up @@ -78,42 +80,126 @@ def clean_project_resources(project, version=None):
)


@app.task()
@app.task(queue="web")
def finish_inactive_builds():
"""
Finish inactive builds.

A build is considered inactive if it's not in ``FINISHED`` state and it has been
"running" for more time than the allowed one (``Project.container_time_limit``
or ``DOCKER_LIMITS['time']`` plus 20% of it).
A build is considered inactive if all of the following are true:

- it's not in ``FINISHED`` state
- it was created +15 minutes ago
- there is no task queued for it on Redis
- celery is not currently executing it

These inactive builds will be marked as ``success`` and ``FINISHED`` with an
``error`` to be communicated to the user.
These inactive builds will be marked as ``success=False`` and ``FINISHED`` with an
``error`` to communicate this problem to the user.
"""
# TODO similar to the celery task time limit, we can't infer this from
# Docker settings anymore, because Docker settings are determined on the
# build servers dynamically.
# time_limit = int(DOCKER_LIMITS['time'] * 1.2)
# Set time as maximum celery task time limit + 5m
time_limit = 7200 + 300
time_limit = 15 * 60 # 15 minutes
delta = datetime.timedelta(seconds=time_limit)
query = (
~Q(state=BUILD_STATE_FINISHED) & Q(date__lte=timezone.now() - delta)
)

builds_finished = 0
stale_build_pks = []
builds = Build.objects.filter(query)[:50]
for build in builds:

if build.project.container_time_limit:
custom_delta = datetime.timedelta(
seconds=int(build.project.container_time_limit),
)
if build.date + custom_delta > timezone.now():
# Do not mark as FINISHED builds with a custom time limit that wasn't
# expired yet (they are still building the project version)
continue
redis_client = redis.Redis.from_url(settings.BROKER_URL)

log.info("Builds not yet finished to check for stale.", count=builds.count())
for build in builds:
build_stale = True

# 1. check if it's being executed by celery
#
# Ask Celery for all the tasks its workers are running and filter
# them by `update_docs_task` only. For those tasks we check whether the
# `build_id` keyword argument matches the build object we are
# checking. If it does, we mark the build as NOT stale.
tasks_running = app.control.inspect().active().items()
log.debug(
"Celery active tasks running.",
tasks=tasks_running,
)
for queue, tasks in tasks_running:
for task in tasks:
try:
if (
task.get("name")
!= "readthedocs.projects.tasks.builds.update_docs_task"
):
log.debug(
"Skipping not build task. task=%s",
task.get("name"),
)
continue

task_build_pk = task.get("kwargs", {}).get("build_id")
log.info(
"Task running.",
task=task.get("name"),
build_id=task_build_pk,
)

if task_build_pk == build.pk:
# The build is not stale, it's being executed
log.info(
"Build not stale. Found as an active task on Celery.",
build_id=build.pk,
)
build_stale = False
break
except Exception:
log.exception(
"There was an error accesssing the Celery task inspect."
)

if not build_stale:
# Continue with the following build that matches the filter
break

# 2. check if it's queued on redis
#
# Ask Redis for all the queues starting with `build*` (build:default,
# build:large). Then we check whether the `build_id` of any queued task
# matches the build object we are checking. If it does, we mark the
# build as NOT stale.
for queue in redis_client.keys("build*"):
log.debug("Redis queue with queued tasks.", queue=queue)
for task in redis_client.lrange(queue, 0, -1):
try:
task = json.loads(task)
body = json.loads(base64.b64decode(task["body"]))
# ``body`` is a 3-element list
# [
# [int],
# {'record': bool, 'force': bool, 'commit': None, 'build_id': int},
# {'callbacks': None, 'errbacks': None, 'chain': None, 'chord': None},
# ]
log.debug("Redis queue task.", queue=queue, body=body)

build_pk = body[1]["build_id"]
if build_pk == build.pk:
# The build is not stale, it's queued to be executed
log.info(
"Build not stale. Found as a queued task in Redis.",
build_id=build.pk,
)
build_stale = False
break
except Exception:
log.exception(
"There was an error accesssing the Celery task from Redis."
)

if not build_stale:
# Continue with the following build that matches the filter
break

if build_stale:
stale_build_pks.append(build.pk)

for build_pk in stale_build_pks:
build = Build.objects.get(pk=build_pk)
build.success = False
build.state = BUILD_STATE_FINISHED
build.error = _(
Expand All @@ -122,11 +208,11 @@ def finish_inactive_builds():
'request with and reference this build id ({}).'.format(build.pk),
)
build.save()
builds_finished += 1

log.info(
log.warning(
'Builds marked as "Terminated due inactivity".',
count=builds_finished,
count=len(stale_build_pks),
build_ids=stale_build_pks,
)


Expand Down
69 changes: 1 addition & 68 deletions readthedocs/rtd_tests/tests/test_project.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
import datetime
import json
from unittest import mock
from unittest.mock import patch

import pytest
from django.contrib.auth.models import User
from django.forms.models import model_to_dict
from django.test import TestCase
from django.utils import timezone
from django_dynamic_fixture import get
from rest_framework.reverse import reverse

Expand All @@ -19,12 +16,11 @@
LATEST,
TAG,
)
from readthedocs.builds.models import Build, Version
from readthedocs.builds.models import Version
from readthedocs.oauth.services import GitHubService, GitLabService
from readthedocs.projects.constants import GITHUB_BRAND, GITLAB_BRAND
from readthedocs.projects.exceptions import ProjectConfigurationError
from readthedocs.projects.models import Project
from readthedocs.projects.tasks.utils import finish_inactive_builds
from readthedocs.rtd_tests.mocks.paths import fake_paths_by_regex


Expand Down Expand Up @@ -549,66 +545,3 @@ def test_user_can_change_project_with_same_lang(self):
)
self.assertEqual(resp.status_code, 200)
self.assertNotContains(resp, 'There is already a')


class TestFinishInactiveBuildsTask(TestCase):
    """
    Exercise ``finish_inactive_builds`` against three unfinished builds.

    The fixtures cover a build that just started, a build created one hour
    ago on a project without a custom limit, and a build created one hour
    ago on a project whose ``container_time_limit`` is two hours.
    """

    fixtures = ['eric', 'test_data']

    def _make_build(self, project, state, age=None):
        """
        Create a build for the stable version of ``project`` in ``state``.

        When ``age`` (a ``timedelta``) is given, the build's ``date`` is
        moved that far into the past and the build is saved again.
        """
        build = Build.objects.create(
            project=project,
            version=project.get_stable_version(),
            state=state,
        )
        if age is not None:
            build.date = timezone.now() - age
            build.save()
        return build

    def _assert_left_alone(self, build, expected_state):
        """Reload ``build`` and assert the task did not modify it."""
        build.refresh_from_db()
        self.assertTrue(build.success)
        self.assertEqual(build.error, '')
        self.assertEqual(build.state, expected_state)

    def setUp(self):
        self.client.login(username='eric', password='test')
        self.pip = Project.objects.get(slug='pip')

        # ``taggit`` is the project with a custom time limit (2 hours)
        self.taggit = Project.objects.get(slug='taggit')
        self.taggit.container_time_limit = 7200
        self.taggit.save()

        one_hour = datetime.timedelta(hours=1)

        # Just started, default time limit
        self.build_1 = self._make_build(self.pip, BUILD_STATE_CLONING)

        # Started an hour ago, default time limit
        self.build_2 = self._make_build(
            self.pip,
            BUILD_STATE_TRIGGERED,
            age=one_hour,
        )

        # Started an hour ago, custom time limit (2 hours)
        self.build_3 = self._make_build(
            self.taggit,
            BUILD_STATE_TRIGGERED,
            age=one_hour,
        )

    @pytest.mark.xfail(reason='Fails while we work out Docker time limits', strict=True)
    def test_finish_inactive_builds_task(self):
        finish_inactive_builds()

        # The build that just started is legitimate and must stay untouched
        self._assert_left_alone(self.build_1, BUILD_STATE_CLONING)

        # The hour-old build with the default limit must have been finished
        self.build_2.refresh_from_db()
        self.assertFalse(self.build_2.success)
        self.assertNotEqual(self.build_2.error, '')
        self.assertEqual(self.build_2.state, BUILD_STATE_FINISHED)

        # The hour-old build is still within its custom two hour limit
        self._assert_left_alone(self.build_3, BUILD_STATE_TRIGGERED)
4 changes: 2 additions & 2 deletions readthedocs/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,9 +394,9 @@ def TEMPLATES(self):

CELERY_DEFAULT_QUEUE = 'celery'
CELERYBEAT_SCHEDULE = {
'quarter-finish-inactive-builds': {
'every-5-minutes-finish-inactive-builds': {
'task': 'readthedocs.projects.tasks.utils.finish_inactive_builds',
'schedule': crontab(minute='*/15'),
'schedule': crontab(minute='*/5'),
'options': {'queue': 'web'},
},
'every-three-hour-clear-persistent-messages': {
Expand Down