Skip to content

Commit 3f77f26

Browse files
committed
Add Que::Job#bulk_enqueue method for enqueuing jobs in bulk
1 parent 6939583 commit 3f77f26

File tree

3 files changed

+445
-1
lines changed

3 files changed

+445
-1
lines changed

lib/que.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ class << self
6969

7070
# Copy some commonly-used methods here, for convenience.
7171
def_delegators :pool, :execute, :checkout, :in_transaction?
72-
def_delegators Job, :enqueue, :run_synchronously, :run_synchronously=
72+
def_delegators Job, :enqueue, :bulk_enqueue, :run_synchronously, :run_synchronously=
7373
def_delegators Migrations, :db_version, :migrate!
7474

7575
# Global configuration logic.

lib/que/job.bulk_enqueue_spec.rb

Lines changed: 383 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,383 @@
1+
# frozen_string_literal: true
2+
3+
require 'spec_helper'
4+
5+
describe Que::Job, '.bulk_enqueue' do
6+
def assert_enqueue(
7+
expected_queue: 'default',
8+
expected_priority: 100,
9+
expected_run_at: Time.now,
10+
expected_job_class: Que::Job,
11+
expected_result_class: nil,
12+
expected_args:,
13+
expected_kwargs:,
14+
expected_tags: nil,
15+
expected_count:,
16+
&enqueue_block
17+
)
18+
19+
assert_equal 0, jobs_dataset.count
20+
21+
results = enqueue_block.call
22+
23+
assert_equal expected_count, jobs_dataset.count
24+
25+
results.each_with_index do |result, i|
26+
assert_kind_of Que::Job, result
27+
assert_instance_of (expected_result_class || expected_job_class), result
28+
29+
assert_equal expected_priority, result.que_attrs[:priority]
30+
assert_equal expected_args[i], result.que_attrs[:args]
31+
assert_equal expected_kwargs[i], result.que_attrs[:kwargs]
32+
33+
if expected_tags.nil?
34+
assert_equal({}, result.que_attrs[:data])
35+
else
36+
assert_equal expected_tags, result.que_attrs[:data][:tags]
37+
end
38+
end
39+
40+
job = jobs_dataset.each_with_index do |job, i|
41+
assert_equal expected_queue, job[:queue]
42+
assert_equal expected_priority, job[:priority]
43+
assert_in_delta job[:run_at], expected_run_at, QueSpec::TIME_SKEW
44+
assert_equal expected_job_class.to_s, job[:job_class]
45+
assert_equal expected_args[i], job[:args]
46+
assert_equal expected_kwargs[i], job[:kwargs]
47+
end
48+
49+
jobs_dataset.delete
50+
end
51+
52+
it "should be able to queue multiple jobs" do
53+
assert_enqueue(
54+
expected_count: 3,
55+
expected_args: Array.new(3) { [] },
56+
expected_kwargs: Array.new(3) { {} },
57+
) do
58+
Que.bulk_enqueue(
59+
Array.new(3) { { args: [], kwargs: {} } }
60+
)
61+
end
62+
end
63+
64+
it "should be able to queue multiple jobs with arguments" do
65+
assert_enqueue(
66+
expected_count: 3,
67+
expected_args: [[1, 'two'], ['3', 4], [5, 6]],
68+
expected_kwargs: [{ seven: '8' }, { '9': 10 }, { eleven: 'twelve' }],
69+
) do
70+
Que.bulk_enqueue(
71+
[
72+
{ args: [1, 'two'], kwargs: { seven: '8' } },
73+
{ args: ['3', 4], kwargs: { '9' => 10 } },
74+
{ args: [5, 6], kwargs: { 'eleven' => 'twelve' } },
75+
]
76+
)
77+
end
78+
end
79+
80+
# it "should be able to queue a job with complex arguments" do
81+
# assert_enqueue(
82+
# expected_args: [
83+
# 1,
84+
# 'two',
85+
# ],
86+
# expected_kwargs: {
87+
# string: "string",
88+
# integer: 5,
89+
# array: [1, "two", {three: 3}],
90+
# hash: {one: 1, two: 'two', three: [3]},
91+
# },
92+
# ) do
93+
# Que.enqueue(
94+
# 1,
95+
# 'two',
96+
# string: "string",
97+
# integer: 5,
98+
# array: [1, "two", {three: 3}],
99+
# hash: {one: 1, two: 'two', three: [3]},
100+
# )
101+
# end
102+
# end
103+
#
104+
# it "should be able to handle a namespaced job class" do
105+
# assert_enqueue(
106+
# expected_args: [1],
107+
# expected_job_class: NamespacedJobNamespace::NamespacedJob,
108+
# ) { NamespacedJobNamespace::NamespacedJob.enqueue(1) }
109+
# end
110+
#
111+
# it "should error appropriately on an anonymous job subclass" do
112+
# klass = Class.new(Que::Job)
113+
#
114+
# error = assert_raises(Que::Error) { klass.enqueue(1) }
115+
#
116+
# assert_equal \
117+
# "Can't enqueue an anonymous subclass of Que::Job",
118+
# error.message
119+
# end
120+
#
121+
# it "should be able to queue a job with a specific queue name" do
122+
# assert_enqueue(
123+
# expected_args: [1],
124+
# expected_queue: 'special_queue_name',
125+
# ) { Que.enqueue(1, job_options: { queue: 'special_queue_name' }) }
126+
# end
127+
#
128+
# it "should be able to queue a job with a specific time to run" do
129+
# assert_enqueue(
130+
# expected_args: [1],
131+
# expected_run_at: Time.now + 60,
132+
# ) { Que.enqueue(1, job_options: { run_at: Time.now + 60 }) }
133+
# end
134+
#
135+
# it "should be able to queue a job with a specific priority" do
136+
# assert_enqueue(
137+
# expected_args: [1],
138+
# expected_priority: 4,
139+
# ) { Que.enqueue(1, job_options: { priority: 4 }) }
140+
# end
141+
#
142+
# it "should be able to queue a job with options in addition to args and kwargs" do
143+
# assert_enqueue(
144+
# expected_args: [1],
145+
# expected_kwargs: { string: "string" },
146+
# expected_run_at: Time.now + 60,
147+
# expected_priority: 4,
148+
# ) { Que.enqueue(1, string: "string", job_options: { run_at: Time.now + 60, priority: 4 }) }
149+
# end
150+
#
151+
# it "should no longer fall back to using job options specified at the top level if not specified in job_options" do
152+
# assert_enqueue(
153+
# expected_args: [1],
154+
# expected_kwargs: { string: "string", run_at: Time.utc(2050).to_s, priority: 10 },
155+
# expected_run_at: Time.now,
156+
# expected_priority: 15,
157+
# ) { Que.enqueue(1, string: "string", run_at: Time.utc(2050), priority: 10, job_options: { priority: 15 }) }
158+
# end
159+
#
160+
# describe "when enqueuing a job with tags" do
161+
# it "should be able to specify tags on a case-by-case basis" do
162+
# assert_enqueue(
163+
# expected_args: [1],
164+
# expected_kwargs: { string: "string" },
165+
# expected_tags: ["tag_1", "tag_2"],
166+
# ) { Que.enqueue(1, string: "string", job_options: { tags: ["tag_1", "tag_2"] }) }
167+
# end
168+
#
169+
# it "should no longer fall back to using tags specified at the top level if not specified in job_options" do
170+
# assert_enqueue(
171+
# expected_args: [1],
172+
# expected_kwargs: { string: "string", tags: ["tag_1", "tag_2"] },
173+
# expected_tags: nil,
174+
# ) { Que.enqueue(1, string: "string", tags: ["tag_1", "tag_2"]) }
175+
# end
176+
#
177+
# it "should raise an error if passing too many tags" do
178+
# error =
179+
# assert_raises(Que::Error) do
180+
# Que::Job.enqueue 1, string: "string", job_options: { tags: %w[a b c d e f] }
181+
# end
182+
#
183+
# assert_equal \
184+
# "Can't enqueue a job with more than 5 tags! (passed 6)",
185+
# error.message
186+
# end
187+
#
188+
# it "should raise an error if any of the tags are too long" do
189+
# error =
190+
# assert_raises(Que::Error) do
191+
# Que::Job.enqueue 1, string: "string", job_options: { tags: ["a" * 101] }
192+
# end
193+
#
194+
# assert_equal \
195+
# "Can't enqueue a job with a tag longer than 100 characters! (\"#{"a" * 101}\")",
196+
# error.message
197+
# end
198+
# end
199+
#
200+
# it "should respect a job class defined as a string" do
201+
# class MyJobClass < Que::Job; end
202+
#
203+
# assert_enqueue(
204+
# expected_args: ['argument'],
205+
# expected_kwargs: { other_arg: "other_arg" },
206+
# expected_job_class: MyJobClass,
207+
# expected_result_class: Que::Job
208+
# ) { Que.enqueue('argument', other_arg: "other_arg", job_options: { job_class: 'MyJobClass' }) }
209+
# end
210+
#
211+
# describe "when there's a hierarchy of job classes" do
212+
# class PriorityDefaultJob < Que::Job
213+
# self.priority = 3
214+
# end
215+
#
216+
# class PrioritySubclassJob < PriorityDefaultJob
217+
# end
218+
#
219+
# class RunAtDefaultJob < Que::Job
220+
# self.run_at = -> { Time.now + 30 }
221+
# end
222+
#
223+
# class RunAtSubclassJob < RunAtDefaultJob
224+
# end
225+
#
226+
# class QueueDefaultJob < Que::Job
227+
# self.queue = :queue_1
228+
# end
229+
#
230+
# class QueueSubclassJob < QueueDefaultJob
231+
# end
232+
#
233+
# describe "priority" do
234+
# it "should respect a default priority in a job class" do
235+
# assert_enqueue(
236+
# expected_args: [1],
237+
# expected_priority: 3,
238+
# expected_job_class: PriorityDefaultJob
239+
# ) { PriorityDefaultJob.enqueue(1) }
240+
#
241+
# assert_enqueue(
242+
# expected_args: [1],
243+
# expected_priority: 4,
244+
# expected_job_class: PriorityDefaultJob
245+
# ) { PriorityDefaultJob.enqueue(1, job_options: { priority: 4 }) }
246+
# end
247+
#
248+
# it "should respect an inherited priority in a job class" do
249+
# assert_enqueue(
250+
# expected_args: [1],
251+
# expected_priority: 3,
252+
# expected_job_class: PrioritySubclassJob
253+
# ) { PrioritySubclassJob.enqueue(1) }
254+
#
255+
# assert_enqueue(
256+
# expected_args: [1],
257+
# expected_priority: 4,
258+
# expected_job_class: PrioritySubclassJob
259+
# ) { PrioritySubclassJob.enqueue(1, job_options: { priority: 4 }) }
260+
# end
261+
#
262+
# it "should respect an overridden priority in a job class" do
263+
# begin
264+
# PrioritySubclassJob.priority = 60
265+
#
266+
# assert_enqueue(
267+
# expected_args: [1],
268+
# expected_priority: 60,
269+
# expected_job_class: PrioritySubclassJob
270+
# ) { PrioritySubclassJob.enqueue(1) }
271+
#
272+
# assert_enqueue(
273+
# expected_args: [1],
274+
# expected_priority: 4,
275+
# expected_job_class: PrioritySubclassJob
276+
# ) { PrioritySubclassJob.enqueue(1, job_options: { priority: 4 }) }
277+
# ensure
278+
# PrioritySubclassJob.remove_instance_variable(:@priority)
279+
# end
280+
# end
281+
# end
282+
#
283+
# describe "run_at" do
284+
# it "should respect a default run_at in a job class" do
285+
# assert_enqueue(
286+
# expected_args: [1],
287+
# expected_run_at: Time.now + 30,
288+
# expected_job_class: RunAtDefaultJob
289+
# ) { RunAtDefaultJob.enqueue(1) }
290+
#
291+
# assert_enqueue(
292+
# expected_args: [1],
293+
# expected_run_at: Time.now + 60,
294+
# expected_job_class: RunAtDefaultJob
295+
# ) { RunAtDefaultJob.enqueue(1, job_options: { run_at: Time.now + 60 }) }
296+
# end
297+
#
298+
# it "should respect an inherited run_at in a job class" do
299+
# assert_enqueue(
300+
# expected_args: [1],
301+
# expected_run_at: Time.now + 30,
302+
# expected_job_class: RunAtSubclassJob
303+
# ) { RunAtSubclassJob.enqueue(1) }
304+
#
305+
# assert_enqueue(
306+
# expected_args: [1],
307+
# expected_run_at: Time.now + 60,
308+
# expected_job_class: RunAtSubclassJob
309+
# ) { RunAtSubclassJob.enqueue(1, job_options: { run_at: Time.now + 60 }) }
310+
# end
311+
#
312+
# it "should respect an overridden run_at in a job class" do
313+
# begin
314+
# RunAtSubclassJob.run_at = -> {Time.now + 90}
315+
#
316+
# assert_enqueue(
317+
# expected_args: [1],
318+
# expected_run_at: Time.now + 90,
319+
# expected_job_class: RunAtSubclassJob
320+
# ) { RunAtSubclassJob.enqueue(1) }
321+
#
322+
# assert_enqueue(
323+
# expected_args: [1],
324+
# expected_run_at: Time.now + 60,
325+
# expected_job_class: RunAtSubclassJob
326+
# ) { RunAtSubclassJob.enqueue(1, job_options: { run_at: Time.now + 60 }) }
327+
# ensure
328+
# RunAtSubclassJob.remove_instance_variable(:@run_at)
329+
# end
330+
# end
331+
# end
332+
#
333+
# describe "queue" do
334+
# it "should respect a default queue in a job class" do
335+
# assert_enqueue(
336+
# expected_args: [1],
337+
# expected_queue: 'queue_1',
338+
# expected_job_class: QueueDefaultJob
339+
# ) { QueueDefaultJob.enqueue(1) }
340+
#
341+
# assert_enqueue(
342+
# expected_args: [1],
343+
# expected_queue: 'queue_3',
344+
# expected_job_class: QueueDefaultJob
345+
# ) { QueueDefaultJob.enqueue(1, job_options: { queue: 'queue_3' }) }
346+
# end
347+
#
348+
# it "should respect an inherited queue in a job class" do
349+
# assert_enqueue(
350+
# expected_args: [1],
351+
# expected_queue: 'queue_1',
352+
# expected_job_class: QueueSubclassJob
353+
# ) { QueueSubclassJob.enqueue(1) }
354+
#
355+
# assert_enqueue(
356+
# expected_args: [1],
357+
# expected_queue: 'queue_3',
358+
# expected_job_class: QueueSubclassJob
359+
# ) { QueueSubclassJob.enqueue(1, job_options: { queue: 'queue_3' }) }
360+
# end
361+
#
362+
# it "should respect an overridden queue in a job class" do
363+
# begin
364+
# QueueSubclassJob.queue = :queue_2
365+
#
366+
# assert_enqueue(
367+
# expected_args: [1],
368+
# expected_queue: 'queue_2',
369+
# expected_job_class: QueueSubclassJob
370+
# ) { QueueSubclassJob.enqueue(1) }
371+
#
372+
# assert_enqueue(
373+
# expected_args: [1],
374+
# expected_queue: 'queue_3',
375+
# expected_job_class: QueueSubclassJob
376+
# ) { QueueSubclassJob.enqueue(1, job_options: { queue: 'queue_3' }) }
377+
# ensure
378+
# QueueSubclassJob.remove_instance_variable(:@queue)
379+
# end
380+
# end
381+
# end
382+
# end
383+
end

0 commit comments

Comments
 (0)