Skip to content

Commit c0e712d

Browse files
committed
Support bulk enqueue with differing class and options
Allow bulk enqueue of multiple different job classes and differing job options in a single `.bulk_enqueue` block. Each job can now differ by job class, queue, priority, run at and tags (in addition to args and kwargs).
1 parent 6aac827 commit c0e712d

File tree

2 files changed

+228
-95
lines changed

2 files changed

+228
-95
lines changed

lib/que/job.rb

Lines changed: 78 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -29,21 +29,18 @@ class Job
2929

3030
SQL[:bulk_insert_jobs] =
3131
%{
32-
WITH args_and_kwargs as (
33-
SELECT * from json_to_recordset(coalesce($5, '[{args:{},kwargs:{}}]')::json) as x(args jsonb, kwargs jsonb)
34-
)
3532
INSERT INTO public.que_jobs
3633
(queue, priority, run_at, job_class, args, kwargs, data, job_schema_version)
3734
SELECT
38-
coalesce($1, 'default')::text,
39-
coalesce($2, 100)::smallint,
40-
coalesce($3, now())::timestamptz,
41-
$4::text,
42-
args_and_kwargs.args,
43-
args_and_kwargs.kwargs,
44-
coalesce($6, '{}')::jsonb,
35+
coalesce(queue, 'default')::text,
36+
coalesce(priority, 100)::smallint,
37+
coalesce(run_at, now())::timestamptz,
38+
job_class,
39+
coalesce(args, '[]')::jsonb,
40+
coalesce(kwargs, '{}')::jsonb,
41+
coalesce(data, '{}')::jsonb,
4542
#{Que.job_schema_version}
46-
FROM args_and_kwargs
43+
FROM json_populate_recordset(null::que_jobs, $1)
4744
RETURNING *
4845
}
4946

@@ -82,6 +79,9 @@ def enqueue(*args)
8279

8380
job_options = kwargs.delete(:job_options) || {}
8481

82+
job_class = job_options[:job_class] || name ||
83+
raise(Error, "Can't enqueue an anonymous subclass of Que::Job")
84+
8585
if job_options[:tags]
8686
if job_options[:tags].length > MAXIMUM_TAGS_COUNT
8787
raise Que::Error, "Can't enqueue a job with more than #{MAXIMUM_TAGS_COUNT} tags! (passed #{job_options[:tags].length})"
@@ -94,28 +94,40 @@ def enqueue(*args)
9494
end
9595
end
9696

97-
attrs = {
98-
queue: job_options[:queue] || resolve_que_setting(:queue) || Que.default_queue,
99-
priority: job_options[:priority] || resolve_que_setting(:priority),
100-
run_at: job_options[:run_at] || resolve_que_setting(:run_at),
101-
args: args,
102-
kwargs: kwargs,
103-
data: job_options[:tags] ? { tags: job_options[:tags] } : {},
104-
job_class: \
105-
job_options[:job_class] || name ||
106-
raise(Error, "Can't enqueue an anonymous subclass of Que::Job"),
107-
}
108-
10997
if Thread.current[:que_jobs_to_bulk_insert]
98+
# Don't resolve class settings during `.enqueue`, only resolve them
99+
# during `._bulk_enqueue_insert` so they can be overwritten by specifying
100+
# them in `.bulk_enqueue`.
101+
attrs = {
102+
queue: job_options[:queue],
103+
priority: job_options[:priority],
104+
run_at: job_options[:run_at],
105+
job_class: job_class == 'Que::Job' ? nil : job_class,
106+
args: args,
107+
kwargs: kwargs,
108+
data: job_options[:tags] && { tags: job_options[:tags] },
109+
klass: self,
110+
}
111+
110112
if self.name == 'ActiveJob::QueueAdapters::QueAdapter::JobWrapper'
111113
raise Que::Error, "Que.bulk_enqueue does not support ActiveJob."
112114
end
113115

114-
raise Que::Error, "When using .bulk_enqueue, job_options must be passed to that method rather than .enqueue" unless job_options == {}
115-
116116
Thread.current[:que_jobs_to_bulk_insert][:jobs_attrs] << attrs
117-
new({})
118-
elsif attrs[:run_at].nil? && resolve_que_setting(:run_synchronously)
117+
return new({})
118+
end
119+
120+
attrs = {
121+
queue: job_options[:queue] || resolve_que_setting(:queue) || Que.default_queue,
122+
priority: job_options[:priority] || resolve_que_setting(:priority),
123+
run_at: job_options[:run_at] || resolve_que_setting(:run_at),
124+
job_class: job_class,
125+
args: args,
126+
kwargs: kwargs,
127+
data: job_options[:tags] ? { tags: job_options[:tags] } : {},
128+
}
129+
130+
if attrs[:run_at].nil? && resolve_que_setting(:run_synchronously)
119131
attrs.merge!(
120132
args: Que.deserialize_json(Que.serialize_json(attrs[:args])),
121133
kwargs: Que.deserialize_json(Que.serialize_json(attrs[:kwargs])),
@@ -144,16 +156,13 @@ def bulk_enqueue(job_options: {}, notify: false)
144156
jobs_attrs = Thread.current[:que_jobs_to_bulk_insert][:jobs_attrs]
145157
job_options = Thread.current[:que_jobs_to_bulk_insert][:job_options]
146158
return [] if jobs_attrs.empty?
147-
raise Que::Error, "When using .bulk_enqueue, all jobs enqueued must be of the same job class" unless jobs_attrs.map { |attrs| attrs[:job_class] }.uniq.one?
148-
args_and_kwargs_array = jobs_attrs.map { |attrs| attrs.slice(:args, :kwargs) }
149-
klass = job_options[:job_class] ? Que::Job : Que.constantize(jobs_attrs.first[:job_class])
150-
klass._bulk_enqueue_insert(args_and_kwargs_array, job_options: job_options, notify: notify)
159+
_bulk_enqueue_insert(jobs_attrs, job_options: job_options, notify: notify)
151160
ensure
152161
Thread.current[:que_jobs_to_bulk_insert] = nil
153162
end
154163

155-
def _bulk_enqueue_insert(args_and_kwargs_array, job_options: {}, notify:)
156-
raise 'Unexpected bulk args format' if !args_and_kwargs_array.is_a?(Array) || !args_and_kwargs_array.all? { |a| a.is_a?(Hash) }
164+
def _bulk_enqueue_insert(jobs_attrs, job_options: {}, notify: false)
165+
raise 'Unexpected bulk args format' if !jobs_attrs.is_a?(Array) || !jobs_attrs.all? { |a| a.is_a?(Hash) }
157166

158167
if job_options[:tags]
159168
if job_options[:tags].length > MAXIMUM_TAGS_COUNT
@@ -167,49 +176,43 @@ def _bulk_enqueue_insert(args_and_kwargs_array, job_options: {}, notify:)
167176
end
168177
end
169178

170-
args_and_kwargs_array = args_and_kwargs_array.map do |args_and_kwargs|
171-
args_and_kwargs.merge(
172-
args: args_and_kwargs.fetch(:args, []),
173-
kwargs: args_and_kwargs.fetch(:kwargs, {}),
174-
)
175-
end
176-
177-
attrs = {
178-
queue: job_options[:queue] || resolve_que_setting(:queue) || Que.default_queue,
179-
priority: job_options[:priority] || resolve_que_setting(:priority),
180-
run_at: job_options[:run_at] || resolve_que_setting(:run_at),
181-
args_and_kwargs_array: args_and_kwargs_array,
182-
data: job_options[:tags] ? { tags: job_options[:tags] } : {},
183-
job_class: \
184-
job_options[:job_class] || name ||
185-
raise(Error, "Can't enqueue an anonymous subclass of Que::Job"),
186-
}
187-
188-
if attrs[:run_at].nil? && resolve_que_setting(:run_synchronously)
189-
args_and_kwargs_array = Que.deserialize_json(Que.serialize_json(attrs.delete(:args_and_kwargs_array)))
190-
args_and_kwargs_array.map do |args_and_kwargs|
191-
_run_attrs(
192-
attrs.merge(
193-
args: args_and_kwargs.fetch(:args),
194-
kwargs: args_and_kwargs.fetch(:kwargs),
195-
),
179+
jobs_attrs = jobs_attrs.map do |attrs|
180+
klass = attrs[:klass] || self
181+
182+
attrs = {
183+
queue: attrs[:queue] || job_options[:queue] || klass.resolve_que_setting(:queue) || Que.default_queue,
184+
priority: attrs[:priority] || job_options[:priority] || klass.resolve_que_setting(:priority),
185+
run_at: attrs[:run_at] || job_options[:run_at] || klass.resolve_que_setting(:run_at),
186+
job_class: attrs[:job_class] || job_options[:job_class] || klass.name,
187+
args: attrs[:args] || [],
188+
kwargs: attrs[:kwargs] || {},
189+
data: attrs[:data] || (job_options[:tags] ? { tags: job_options[:tags] } : {}),
190+
klass: klass
191+
}
192+
193+
if attrs[:run_at].nil? && klass.resolve_que_setting(:run_synchronously)
194+
klass._run_attrs(
195+
attrs.except(:klass).merge(
196+
args: Que.deserialize_json(Que.serialize_json(attrs[:args])),
197+
kwargs: Que.deserialize_json(Que.serialize_json(attrs[:kwargs])),
198+
data: Que.deserialize_json(Que.serialize_json(attrs[:data])),
199+
)
196200
)
201+
nil
202+
else
203+
attrs
197204
end
198-
else
199-
attrs.merge!(
200-
args_and_kwargs_array: Que.serialize_json(attrs[:args_and_kwargs_array]),
201-
data: Que.serialize_json(attrs[:data]),
202-
)
203-
values_array =
204-
Que.transaction do
205-
Que.execute('SET LOCAL que.skip_notify TO true') unless notify
206-
Que.execute(
207-
:bulk_insert_jobs,
208-
attrs.values_at(:queue, :priority, :run_at, :job_class, :args_and_kwargs_array, :data),
209-
)
210-
end
211-
values_array.map(&method(:new))
212-
end
205+
end.compact
206+
207+
values_array =
208+
Que.transaction do
209+
Que.execute('SET LOCAL que.skip_notify TO true') unless notify
210+
Que.execute(
211+
:bulk_insert_jobs,
212+
[Que.serialize_json(jobs_attrs.map { |attrs| attrs.except(:klass) })]
213+
)
214+
end
215+
values_array.zip(jobs_attrs).map { |values, attrs| attrs.fetch(:klass).new(values) }
213216
end
214217

215218
def run(*args)
@@ -237,7 +240,7 @@ def resolve_que_setting(setting, *args)
237240
end
238241
end
239242

240-
private
243+
protected
241244

242245
def _run_attrs(attrs)
243246
attrs[:error_count] = 0

0 commit comments

Comments
 (0)