Skip to content
This repository was archived by the owner on Aug 10, 2024. It is now read-only.

Commit 214137f

Browse files
committed
feat: improve celery stability
1 parent d2abb22 commit 214137f

File tree

5 files changed

+19
-11
lines changed

5 files changed

+19
-11
lines changed

app/views/views.py

+6-4
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import logging
33

44
from celery.exceptions import TimeoutError
5-
from celery.result import AsyncResult
5+
# from celery.result import AsyncResult
66
from celery.states import FAILURE, PENDING, SUCCESS
77
from django.contrib.auth.decorators import login_required
88
from django.core import serializers
@@ -26,6 +26,7 @@
2626
from app.worker.tasks import receiptor
2727
from app.worker.tasks.exporter import exporter
2828
from app.worker.tasks.importers import historical_data_importer
29+
from reboot.celery import app
2930

3031
logger = logging.getLogger(__name__)
3132

@@ -121,8 +122,9 @@ def poll_state(request: HttpRequest):
121122
request=request,
122123
err_msg="The task_id query parameter of the request was omitted.")
123124

124-
task = AsyncResult(task_id)
125+
task = app.AsyncResult(task_id)
125126
res = JsonResponse(_poll_state(PENDING, 0, 200))
127+
print(f"!!! task id={task_id},state={task.state},successful={task.successful()},ready={task.ready()},failed={task.failed()}")
126128
if task.state == FAILURE or task.failed():
127129
res = JsonResponse(_poll_state(FAILURE, 0, 400))
128130
elif task.state == PROGRESS:
@@ -146,8 +148,8 @@ def download_file(request: HttpRequest):
146148
while (attempts < ATTEMPT_LIMIT):
147149
try:
148150
attempts += 1
149-
task = AsyncResult(task_id)
150-
result = task.get(timeout=0.5 * attempts)
151+
task = app.AsyncResult(task_id)
152+
result = task.get(timeout=1.0 * attempts)
151153
print(f"{task} {task_name} success #{attempts}: {result}")
152154
break
153155
except TimeoutError:

app/worker/tasks/__init__.py

+4-3
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
'''
22
Module for tasks to be sent on task queue
33
'''
4-
from celery import task
5-
64
from app.worker.app_celery import AppTask
5+
# from celery import task
6+
from reboot.celery import app
7+
78
from .create_receipt import Receiptor
89

910

10-
@task(bind=True, base=AppTask)
11+
@app.task(bind=True, base=AppTask)
1112
def receiptor(self, queryset, total_count):
1213
receiptor = Receiptor(queryset, total_count)
1314
return receiptor()

app/worker/tasks/exporter.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
11
import csv
22

3-
from celery import task
3+
# from celery import task
44
from celery.utils.log import get_task_logger
55
from django.core import serializers
66
from django.db.models.query import QuerySet
77
from django.http import HttpResponse
88

99
from app.constants.field_names import CURRENT_FIELDS
1010
from app.worker.app_celery import AppTask, update_percent
11+
from reboot.celery import app
1112

1213

13-
@task(bind=True, base=AppTask)
14+
@app.task(bind=True, base=AppTask)
1415
def exporter(self, file_name, qs: QuerySet = None, total_count: int = 0):
1516
rows = serializers.deserialize('json', qs)
1617
csv_exporter = CsvExporter(file_name, rows, total_count)
+4-2
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
"""
22
Module for csv file importers to be sent to queue
33
"""
4-
from celery import task
4+
# from celery import task
55

66
from app.worker.app_celery import AppTask
7+
from reboot.celery import app
8+
79
from .historical_data_importer import HistoricalDataImporter
810

911

10-
@task(bind=True, base=AppTask)
12+
@app.task(bind=True, base=AppTask)
1113
def historical_data_importer(self, csvpath):
1214
importer = HistoricalDataImporter(csvpath)
1315
importer()

reboot/celeryconfig.py

+2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
result_backend = config("REDIS_URL")
1414
task_serializer = 'pickle'
1515
result_serializer = 'pickle'
16+
task_track_started = True
17+
task_ignore_result = False
1618

1719
# Use PROD settings if valid CLOUDAMQP_URl, else dev
1820
if config('CLOUDAMQP_URL', default=False):

0 commit comments

Comments
 (0)