From ddca4f7c422d7eebce49c097ad6383595a95c25e Mon Sep 17 00:00:00 2001 From: Godfrey Chan Date: Wed, 20 Aug 2025 23:22:18 -0700 Subject: [PATCH] Fix `Delayed::Job` probe crash on deserialization errors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, the `Delayed::Job` probe eagerly deserializes the `payload_object` in order to name the trace. This moves the work ahead of where it would normally happen, and in case of an error during deserialization (e.g. syntax error in the YAML payload, or a deleted database record as in the customer issue), the error will be raised outside of the block where it's normally be caught. This fix defers the naming until a spot where we can guarantee DJ's successful deserialization, preventing the crashes while capturing the deserialization (database load) time within the trace. If an error occurs prior to reaching that step, the trace will have a default "unknown" label. It is possible that this would affect some traces that we previously were able to name successfully – specifically, if an error occurs in a lifecycle hook. If this turns out to be an issue, we can try and opportunistically check for `@payload_object` on the `job` instance during the `:error` and `:failure` hook, but this is reaching too deep into the implementation internals for my taste. And honestly it seems fair enough to categorize anything that never made it to the actual `#perform` as generic "unknown+error" traces. Fixes #491 --- lib/skylight/probes/delayed_job.rb | 65 +++++++++++++--------- spec/integration/delayed_job_spec.rb | 82 ++++++++++++++++++++++++++++ 2 files changed, 122 insertions(+), 25 deletions(-) diff --git a/lib/skylight/probes/delayed_job.rb b/lib/skylight/probes/delayed_job.rb index ae7a7886..7a846f74 100644 --- a/lib/skylight/probes/delayed_job.rb +++ b/lib/skylight/probes/delayed_job.rb @@ -8,21 +8,36 @@ module DelayedJob begin require "delayed/plugin" + UNKNOWN = "" + class Plugin < ::Delayed::Plugin callbacks do |lifecycle| lifecycle.around(:perform) { |worker, job, &block| sk_instrument(worker, job, &block) } - lifecycle.after(:error) { |_worker, _job| Skylight.trace&.segment = "error" } + lifecycle.after(:failure) { |_worker, _job| Skylight.trace&.segment = "error" } end class << self include Skylight::Util::Logging + # This is called quite early in Delayed::Worker + # + # Typically, the `:perform` lifecycle hook is called before the + # `payload_object` has been deserialized, so we can't name the + # trace yet. + # + # If we call `payload_object` here, we would move the work of + # loading the object ahead of where it naturally happens, which + # means the database load time won't be instrumented. On the other + # hand, should the deserialization fail, we would have moved the + # timing of the error as well. Crucially – it would have moved it + # outside of the spot where these errors are normally caught and + # reported by the worker. + # + # See https://github.com/skylightio/skylight-ruby/issues/491 def sk_instrument(_worker, job) - endpoint = Skylight::Probes::DelayedJob.handler_name(job) - Skylight.trace( - endpoint, + UNKNOWN, "app.delayed_job.worker", "Delayed::Worker#run", component: :worker, @@ -41,15 +56,6 @@ def sk_instrument(_worker, job) $stderr.puts "[SKYLIGHT] The delayed_job probe was requested, but Delayed::Plugin was not defined." end - UNKNOWN = "" - - def self.handler_name(job) - payload_object = - job.respond_to?(:payload_object_without_sk) ? job.payload_object_without_sk : job.payload_object - - payload_object_name(payload_object) - end - def self.payload_object_name(payload_object) if payload_object.is_a?(::Delayed::PerformableMethod) payload_object.display_name @@ -77,18 +83,27 @@ def self.payload_object_source_meta(payload_object) class InstrumentationProxy < SimpleDelegator def perform - source_meta = Skylight::Probes::DelayedJob.payload_object_source_meta(__getobj__) - - opts = { - category: "app.delayed_job.job", - title: format_source(*source_meta), - meta: { - source_location_hint: source_meta - }, - internal: true - } - - Skylight.instrument(opts) { __getobj__.perform } + if (trace = Skylight.instrumenter&.current_trace) + if trace.endpoint == UNKNOWN + # At this point, deserialization was, by definition, successful. + # So it'd be safe to set the endpoint name based on the payload + # object here. + trace.endpoint = Skylight::Probes::DelayedJob.payload_object_name(__getobj__) + end + + source_meta = Skylight::Probes::DelayedJob.payload_object_source_meta(__getobj__) + + opts = { + category: "app.delayed_job.job", + title: format_source(*source_meta), + meta: { + source_location_hint: source_meta + }, + internal: true + } + + Skylight.instrument(opts) { __getobj__.perform } + end end # Used by Delayed::Backend::Base to determine Job#name diff --git a/spec/integration/delayed_job_spec.rb b/spec/integration/delayed_job_spec.rb index ada4917d..7455a1d4 100644 --- a/spec/integration/delayed_job_spec.rb +++ b/spec/integration/delayed_job_spec.rb @@ -276,6 +276,88 @@ def enqueue_job(*args) ) end end + + context "with ActiveRecord model" do + let(:users_migration) do + base = ActiveRecord::Migration + base = base::Current if defined?(base::Current) + + Class.new(base) do + def self.up + create_table :users, force: true do |table| + table.string :name, null: false + table.timestamps null: true + end + end + + def self.down + drop_table :users + end + end + end + + around do |example| + with_sqlite(migration: users_migration) do + example.call + end + end + + before do + stub_const("SkDelayedRecord", Class.new(ActiveRecord::Base) do + self.table_name = "users" + + def good_method + Skylight.instrument(category: "app.zomg") do + SpecHelper.clock.skip 1 + end + end + + def self.good_method + new.good_method + end + end) + end + + # overrides enqueue_job on the outer context + def enqueue_job(_method_name, *, class_method: false) + if class_method + SkDelayedRecord.delay(queue: "queue-name").good_method + else + SkDelayedRecord.create!(name: "test-record").tap do |record| + record.delay(queue: "queue-name").good_method + end + end + end + + specify "instance method" do + enqueue_and_process_job(:good_method) + + server.wait resource: "/report" + report = server.reports[0].to_simple_report + expect(report.endpoint.name).to eq("SkDelayedRecord#good_methodqueue-name") + end + + specify "class method" do + enqueue_and_process_job(:good_method, class_method: true) + + server.wait resource: "/report" + report = server.reports[0].to_simple_report + expect(report.endpoint.name).to eq("SkDelayedRecord.good_methodqueue-name") + end + + specify "instance method on a deleted record" do + SkDelayedRecord.create!(name: "test-record").tap do |record| + record.delay(queue: "queue-name").good_method + record.destroy! + end + + expect { worker.work_off }.not_to raise_error + + server.wait resource: "/report" + report = server.reports[0].to_simple_report + expect(report.endpoint.name).to eq("error") + end + end end def enqueue_job(method_name, *, class_method: false)