Skip to content

Commit af8d14c

Browse files
committed
Instrument adapter queries for external programs.
When executing queries via the adapter, we may not recieve the instrumentation we expect. The ActiveRecord adapter uses a raw connection which bypasses the ActiveRecord instrumentation layer and instead talks directly to PG, making it impossible to know what queries are being executed. This change cribs heavily from the implementation of the `log` method on the `AbstractAdapter` class in ActiveRecord to make it easy to add instrumentation to an adapter. By default the instrumenter is set to `nil`, and instrumentation will be a no-op; otherwise the query will be instrumented via the `instrument` method and passed the relevant payload to handle. The structure of this is deliberately similar to that of the event emitted by ActiveRecord's `sql.active_record` notification.
1 parent cfb84d4 commit af8d14c

File tree

3 files changed

+71
-4
lines changed

3 files changed

+71
-4
lines changed

lib/que/adapters/active_record.rb

+5
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,11 @@ class ActiveRecord < Base
1313
::PG::UnableToSend,
1414
].freeze
1515

16+
def initialize(_thing = nil)
17+
super
18+
@instrumenter = ActiveSupport::Notifications.instrumenter
19+
end
20+
1621
def checkout
1722
checkout_activerecord_adapter { |conn| yield conn.raw_connection }
1823
rescue *AR_UNAVAILABLE_CONNECTION_ERRORS => e

lib/que/adapters/base.rb

+34-2
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,11 @@ module Adapters
1313
class UnavailableConnection < StandardError; end
1414

1515
class Base
16+
attr_reader :instrumenter
17+
1618
def initialize(_thing = nil)
1719
@prepared_statements = {}
20+
@instrumenter = nil
1821
end
1922

2023
# The only method that adapters really need to implement. Should lock a
@@ -62,7 +65,14 @@ def in_transaction?
6265

6366
def execute_sql(sql, params)
6467
args = params.empty? ? [sql] : [sql, params]
65-
checkout { |conn| conn.async_exec(*args) }
68+
69+
checkout do |conn|
70+
log(sql, conn, params, async: true) do |notification_payload|
71+
conn.async_exec(*args).tap do |result|
72+
notification_payload[:row_count] = result.count
73+
end
74+
end
75+
end
6676
end
6777

6878
def execute_prepared(name, params)
@@ -81,7 +91,11 @@ def execute_prepared(name, params)
8191
prepared_just_now = statements[name] = true
8292
end
8393

84-
conn.exec_prepared("que_#{name}", params)
94+
log(SQL[name], conn, params, async: false) do |notification_payload|
95+
conn.exec_prepared("que_#{name}", params).tap do |result|
96+
notification_payload[:row_count] = result.count
97+
end
98+
end
8599
rescue ::PG::InvalidSqlStatementName => e
86100
# Reconnections on ActiveRecord can cause the same connection
87101
# objects to refer to new backends, so recover as well as we can.
@@ -97,6 +111,24 @@ def execute_prepared(name, params)
97111
end
98112
end
99113

114+
# rubocop:disable Metrics/ParameterLists
115+
def log(sql, conn, binds = [], type_casted_binds = [], name = "SQL", statement_name = nil, async: false, &block)
116+
return yield({}) if instrumenter.nil?
117+
118+
instrumenter.instrument(
119+
"que.execute",
120+
sql: sql,
121+
name: name,
122+
binds: binds,
123+
type_casted_binds: type_casted_binds,
124+
async: async,
125+
statement_name: statement_name,
126+
connection: conn,
127+
&block
128+
)
129+
end
130+
# rubocop:enable Metrics/ParameterLists
131+
100132
CAST_PROCS = {
101133
# booleans
102134
16 => ->(value) {

spec/lib/que/job_spec.rb

+32-2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,11 @@
55
RSpec.describe Que::Job do
66
describe ".enqueue" do
77
let(:run_at) { postgres_now }
8+
let(:job_class) do
9+
Class.new(described_class).tap do |klass|
10+
stub_const("FooBarJob", klass)
11+
end
12+
end
813

914
it "adds job to que_jobs table" do
1015
expect { described_class.enqueue(:hello, run_at: run_at) }.
@@ -41,15 +46,14 @@
4146
que_job_id: an_instance_of(Integer),
4247
queue: "default",
4348
priority: 100,
44-
job_class: a_string_including("Class"),
49+
job_class: "FooBarJob",
4550
retryable: true,
4651
run_at: run_at,
4752
args: [500, "gbp", "testing"],
4853
custom_log_1: 500,
4954
custom_log_2: "test-log",
5055
)
5156

52-
job_class = Class.new(described_class)
5357
job_class.custom_log_context ->(attrs) {
5458
{
5559
custom_log_1: attrs[:args][0],
@@ -59,6 +63,32 @@
5963
job_class.enqueue(500, :gbp, :testing, run_at: run_at)
6064
end
6165

66+
it "instruments queries using ActiveSupport::Notifications" do
67+
events = []
68+
69+
ActiveSupport::Notifications.subscribe("que.execute") do |event|
70+
events << event
71+
end
72+
73+
job_class.enqueue("foobar")
74+
75+
expect(events).to include(
76+
having_attributes(
77+
name: "que.execute",
78+
payload: hash_including(
79+
sql: /INSERT INTO que_jobs/,
80+
name: "SQL",
81+
binds: [nil, nil, nil, "FooBarJob", true, "[\"foobar\"]"],
82+
type_casted_binds: [],
83+
async: false,
84+
statement_name: nil,
85+
connection: instance_of(PG::Connection),
86+
row_count: 1,
87+
),
88+
),
89+
)
90+
end
91+
6292
context "with a custom adapter specified" do
6393
let(:custom_adapter) { Que.adapter.dup }
6494
let(:job_with_adapter) { Class.new(described_class) }

0 commit comments

Comments
 (0)