Skip to content

Commit c2a126a

Browse files
authored
Merge pull request #33 from dabapps/queue-depth-utils
Queue Depth Utils
2 parents aeef06c + 475c3e0 commit c2a126a

File tree

4 files changed

+134
-1
lines changed

4 files changed

+134
-1
lines changed

README.md

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,13 +182,34 @@ Jobs have a `state` field which can have one of the following values:
182182

183183
### API
184184

185+
#### Model methods
186+
187+
##### Job.get_queue_depths
188+
If you need to programatically get the depth of any queue you can run the following:
189+
```python
190+
from django_dbq.models import Job
191+
192+
...
193+
194+
Job.objects.create(name='do_work', workspace={})
195+
Job.objects.create(name='do_other_work', queue_name='other_queue', workspace={})
196+
197+
queue_depths = Job.get_queue_depths()
198+
print(queue_depths) # {"default": 1, "other_queue": 1}
199+
```
200+
201+
**Important:** When checking queue depths, do not assume that the key for your queue will always be available. Queue depths of zero won't be included
202+
in the dict returned by this method.
203+
185204
#### Management commands
186205

206+
##### manage.py delete_old_jobs
187207
There is a management command, `manage.py delete_old_jobs`, which deletes any
188208
jobs from the database which are in state `COMPLETE` or `FAILED` and were
189209
created more than 24 hours ago. This could be run, for example, as a cron task,
190210
to ensure the jobs table remains at a reasonable size.
191211

212+
##### manage.py create_job
192213
For debugging/development purposes, a simple management command is supplied to create jobs:
193214

194215
manage.py create_job <job_name> --queue_name 'my_queue_name' --workspace '{"key": "value"}'
@@ -197,13 +218,20 @@ The `workspace` flag is optional. If supplied, it must be a valid JSON string.
197218

198219
`queue_name` is optional and defaults to `default`
199220

221+
##### manage.py worker
200222
To start a worker:
201223

202224
manage.py worker [queue_name] [--rate_limit]
203225

204226
- `queue_name` is optional, and will default to `default`
205227
- The `--rate_limit` flag is optional, and will default to `1`. It is the minimum number of seconds that must have elapsed before a subsequent job can be run.
206228

229+
##### manage.py queue_depth
230+
If you'd like to check your queue depth from the command line, you can run `manage.py queue_depth [queue_name [queue_name ...]]` and any
231+
jobs in the "NEW" or "READY" states will be returned.
232+
233+
**Important:** If you misspell or provide a queue name which does not have any jobs, a depth of 0 will always be returned.
234+
207235
## Testing
208236

209237
It may be necessary to supply a DATABASE_PORT environment variable.
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
from django.core.management.base import BaseCommand
2+
from django_dbq.models import Job
3+
4+
5+
class Command(BaseCommand):
6+
7+
help = "Print the current depth of the given queue"
8+
9+
def add_arguments(self, parser):
10+
parser.add_argument("queue_name", nargs="*", default=["default"], type=str)
11+
12+
def handle(self, *args, **options):
13+
queue_names = options["queue_name"]
14+
queue_depths = Job.get_queue_depths()
15+
16+
queue_depths_string = " ".join(
17+
[
18+
"{queue_name}={queue_depth}".format(
19+
queue_name=queue_name, queue_depth=queue_depths.get(queue_name, 0),
20+
)
21+
for queue_name in queue_names
22+
]
23+
)
24+
25+
self.stdout.write(
26+
"event=queue_depths {queue_depths}".format(queue_depths=queue_depths_string)
27+
)

django_dbq/models.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
get_creation_hook_name,
77
)
88
from jsonfield import JSONField
9-
from django.db.models import UUIDField
9+
from django.db.models import UUIDField, Count
1010
import datetime
1111
import logging
1212
import uuid
@@ -137,3 +137,17 @@ def run_creation_hook(self):
137137
logger.info("Running creation hook %s for new job", creation_hook_name)
138138
creation_hook_function = import_string(creation_hook_name)
139139
creation_hook_function(self)
140+
141+
@staticmethod
142+
def get_queue_depths():
143+
annotation_dicts = (
144+
Job.objects.filter(state__in=(Job.STATES.READY, Job.STATES.NEW))
145+
.values("queue_name")
146+
.order_by("queue_name")
147+
.annotate(Count("queue_name"))
148+
)
149+
150+
return {
151+
annotation_dict["queue_name"]: annotation_dict["queue_name__count"]
152+
for annotation_dict in annotation_dicts
153+
}

django_dbq/tests.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,70 @@ def test_worker_with_queue_name(self):
8181
self.assertTrue("test_queue" in output)
8282

8383

84+
@override_settings(JOBS={"testjob": {"tasks": ["a"]}})
85+
class JobModelMethodTestCase(TestCase):
86+
def test_get_queue_depths(self):
87+
Job.objects.create(name="testjob", queue_name="default")
88+
Job.objects.create(name="testjob", queue_name="testworker")
89+
Job.objects.create(name="testjob", queue_name="testworker")
90+
Job.objects.create(
91+
name="testjob", queue_name="testworker", state=Job.STATES.FAILED
92+
)
93+
Job.objects.create(
94+
name="testjob", queue_name="testworker", state=Job.STATES.COMPLETE
95+
)
96+
97+
queue_depths = Job.get_queue_depths()
98+
self.assertDictEqual(queue_depths, {"default": 1, "testworker": 2})
99+
100+
101+
@override_settings(JOBS={"testjob": {"tasks": ["a"]}})
102+
class QueueDepthTestCase(TestCase):
103+
def test_queue_depth(self):
104+
105+
Job.objects.create(name="testjob", state=Job.STATES.FAILED)
106+
Job.objects.create(name="testjob", state=Job.STATES.NEW)
107+
Job.objects.create(name="testjob", state=Job.STATES.FAILED)
108+
Job.objects.create(name="testjob", state=Job.STATES.COMPLETE)
109+
Job.objects.create(name="testjob", state=Job.STATES.READY)
110+
Job.objects.create(
111+
name="testjob", queue_name="testqueue", state=Job.STATES.READY
112+
)
113+
Job.objects.create(
114+
name="testjob", queue_name="testqueue", state=Job.STATES.READY
115+
)
116+
117+
stdout = StringIO()
118+
call_command("queue_depth", stdout=stdout)
119+
output = stdout.getvalue()
120+
self.assertEqual(output.strip(), "event=queue_depths default=2")
121+
122+
def test_queue_depth_multiple_queues(self):
123+
124+
Job.objects.create(name="testjob", state=Job.STATES.FAILED)
125+
Job.objects.create(name="testjob", state=Job.STATES.NEW)
126+
Job.objects.create(name="testjob", state=Job.STATES.FAILED)
127+
Job.objects.create(name="testjob", state=Job.STATES.COMPLETE)
128+
Job.objects.create(name="testjob", state=Job.STATES.READY)
129+
Job.objects.create(
130+
name="testjob", queue_name="testqueue", state=Job.STATES.READY
131+
)
132+
Job.objects.create(
133+
name="testjob", queue_name="testqueue", state=Job.STATES.READY
134+
)
135+
136+
stdout = StringIO()
137+
call_command("queue_depth", queue_name=("default", "testqueue",), stdout=stdout)
138+
output = stdout.getvalue()
139+
self.assertEqual(output.strip(), "event=queue_depths default=2 testqueue=2")
140+
141+
def test_queue_depth_for_queue_with_zero_jobs(self):
142+
stdout = StringIO()
143+
call_command("queue_depth", queue_name=("otherqueue",), stdout=stdout)
144+
output = stdout.getvalue()
145+
self.assertEqual(output.strip(), "event=queue_depths otherqueue=0")
146+
147+
84148
@freezegun.freeze_time()
85149
@mock.patch("django_dbq.management.commands.worker.sleep")
86150
@mock.patch("django_dbq.management.commands.worker.process_job")

0 commit comments

Comments
 (0)