Skip to content

Commit

Permalink
Reduce SQL calls when incrementing/decrementing run counters (#881)
Browse files Browse the repository at this point in the history
* Reduce SQL calls when incrementing run counters

SQL logs before changes (using `#increment!`):

```sql
Bulkrax::ImporterRun Load (8.9ms)  SELECT  "bulkrax_importer_runs".* FROM "bulkrax_importer_runs" WHERE "bulkrax_importer_runs"."id" = $1 LIMIT $2  [["id", 1], ["LIMIT", 1]]
Bulkrax::ImporterRun Update All (2.5ms)  UPDATE "bulkrax_importer_runs" SET "processed_records" = COALESCE("processed_records", 0) + 1 WHERE "bulkrax_importer_runs"."id" = $1  [["id", 1]]
```

SQL logs after changes (using `#increment_counter`):

```sql
Bulkrax::ImporterRun Update All (6.5ms)  UPDATE "bulkrax_importer_runs" SET "processed_records" = COALESCE("processed_records", 0) + 1 WHERE "bulkrax_importer_runs"."id" = $1  [["id", 1]]
```

The `SELECT` statement serves no purpose since the `UPDATE` statement is
atomic

* replace #decrement! with #decrement_counter

* put rubocop disable comments back

* WIP: update counter-related specs

* fix more specs

* make some counter specs more specific

* remove references to "children" ImporterRun columns

These columns (`processed_children` and `failed_children`) were renamed
in the RenameChildrenCountersToRelationships migration; they no longer
exist

* prefer avoiding raw SQL

* rubocop
  • Loading branch information
bkiahstroud authored Oct 31, 2023
1 parent f997321 commit 1c39f9f
Show file tree
Hide file tree
Showing 14 changed files with 269 additions and 92 deletions.
4 changes: 2 additions & 2 deletions app/jobs/bulkrax/create_relationships_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def perform(parent_identifier:, importer_run_id:) # rubocop:disable Metrics/AbcS

if errors.present?
# rubocop:disable Rails/SkipsModelValidations
importer_run.increment!(:failed_relationships, number_of_failures)
ImporterRun.update_counters(importer_run_id, failed_relationships: number_of_failures)
# rubocop:enable Rails/SkipsModelValidations

parent_entry&.set_status_info(errors.last, importer_run)
Expand All @@ -108,7 +108,7 @@ def perform(parent_identifier:, importer_run_id:) # rubocop:disable Metrics/AbcS
return false # stop current job from continuing to run after rescheduling
else
# rubocop:disable Rails/SkipsModelValidations
Bulkrax::ImporterRun.find(importer_run_id).increment!(:processed_relationships, number_of_successes)
ImporterRun.update_counters(importer_run_id, processed_relationships: number_of_successes)
# rubocop:enable Rails/SkipsModelValidations
end
end
Expand Down
8 changes: 4 additions & 4 deletions app/jobs/bulkrax/delete_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@ module Bulkrax
class DeleteJob < ApplicationJob
queue_as :import

# rubocop:disable Rails/SkipsModelValidations
def perform(entry, importer_run)
obj = entry.factory.find
obj&.delete
ImporterRun.find(importer_run.id).increment!(:deleted_records)
ImporterRun.find(importer_run.id).decrement!(:enqueued_records)
# rubocop:disable Rails/SkipsModelValidations
ImporterRun.increment_counter(:deleted_records, importer_run.id)
ImporterRun.decrement_counter(:enqueued_records, importer_run.id)
# rubocop:enable Rails/SkipsModelValidations
entry.save!
entry.importer.current_run = ImporterRun.find(importer_run.id)
entry.importer.record_status
entry.set_status_info("Deleted", ImporterRun.find(importer_run.id))
end
# rubocop:enable Rails/SkipsModelValidations
end
end
12 changes: 6 additions & 6 deletions app/jobs/bulkrax/export_work_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@ def perform(*args)
entry.save
rescue StandardError
# rubocop:disable Rails/SkipsModelValidations
exporter_run.increment!(:failed_records)
exporter_run.decrement!(:enqueued_records) unless exporter_run.enqueued_records <= 0
ExporterRun.increment_counter(:failed_records, args[1])
ExporterRun.decrement_counter(:enqueued_records, args[1]) unless exporter_run.reload.enqueued_records <= 0
raise
else
if entry.failed?
exporter_run.increment!(:failed_records)
exporter_run.decrement!(:enqueued_records) unless exporter_run.enqueued_records <= 0
ExporterRun.increment_counter(:failed_records, args[1])
ExporterRun.decrement_counter(:enqueued_records, args[1]) unless exporter_run.reload.enqueued_records <= 0
raise entry.reload.current_status.error_class.constantize
else
exporter_run.increment!(:processed_records)
exporter_run.decrement!(:enqueued_records) unless exporter_run.enqueued_records <= 0
ExporterRun.increment_counter(:processed_records, args[1])
ExporterRun.decrement_counter(:enqueued_records, args[1]) unless exporter_run.reload.enqueued_records <= 0
end
# rubocop:enable Rails/SkipsModelValidations
end
Expand Down
12 changes: 6 additions & 6 deletions app/jobs/bulkrax/import_collection_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ def perform(*args)
begin
entry.build
entry.save!
ImporterRun.find(args[1]).increment!(:processed_records)
ImporterRun.find(args[1]).increment!(:processed_collections)
ImporterRun.find(args[1]).decrement!(:enqueued_records) unless ImporterRun.find(args[1]).enqueued_records <= 0 # rubocop:disable Style/IdenticalConditionalBranches
ImporterRun.increment_counter(:processed_records, args[1])
ImporterRun.increment_counter(:processed_collections, args[1])
ImporterRun.decrement_counter(:enqueued_records, args[1]) unless ImporterRun.find(args[1]).enqueued_records <= 0 # rubocop:disable Style/IdenticalConditionalBranches
rescue => e
ImporterRun.find(args[1]).increment!(:failed_records)
ImporterRun.find(args[1]).increment!(:failed_collections)
ImporterRun.find(args[1]).decrement!(:enqueued_records) unless ImporterRun.find(args[1]).enqueued_records <= 0 # rubocop:disable Style/IdenticalConditionalBranches
ImporterRun.increment_counter(:failed_records, args[1])
ImporterRun.increment_counter(:failed_collections, args[1])
ImporterRun.decrement_counter(:enqueued_records, args[1]) unless ImporterRun.find(args[1]).enqueued_records <= 0 # rubocop:disable Style/IdenticalConditionalBranches
raise e
end
entry.importer.current_run = ImporterRun.find(args[1])
Expand Down
12 changes: 6 additions & 6 deletions app/jobs/bulkrax/import_file_set_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ def perform(entry_id, importer_run_id)
entry.build
if entry.succeeded?
# rubocop:disable Rails/SkipsModelValidations
ImporterRun.find(importer_run_id).increment!(:processed_records)
ImporterRun.find(importer_run_id).increment!(:processed_file_sets)
ImporterRun.increment_counter(:processed_records, importer_run_id)
ImporterRun.increment_counter(:processed_file_sets, importer_run_id)
else
ImporterRun.find(importer_run_id).increment!(:failed_records)
ImporterRun.find(importer_run_id).increment!(:failed_file_sets)
ImporterRun.increment_counter(:failed_records, importer_run_id)
ImporterRun.increment_counter(:failed_file_sets, importer_run_id)
# rubocop:enable Rails/SkipsModelValidations
end
ImporterRun.find(importer_run_id).decrement!(:enqueued_records) unless ImporterRun.find(importer_run_id).enqueued_records <= 0 # rubocop:disable Rails/SkipsModelValidations
ImporterRun.decrement_counter(:enqueued_records, importer_run_id) unless ImporterRun.find(importer_run_id).enqueued_records <= 0 # rubocop:disable Rails/SkipsModelValidations
entry.save!
entry.importer.current_run = ImporterRun.find(importer_run_id)
entry.importer.record_status
Expand All @@ -40,7 +40,7 @@ def perform(entry_id, importer_run_id)
if entry.import_attempts < 5
ImportFileSetJob.set(wait: (entry.import_attempts + 1).minutes).perform_later(entry_id, importer_run_id)
else
ImporterRun.find(importer_run_id).decrement!(:enqueued_records) # rubocop:disable Rails/SkipsModelValidations
ImporterRun.decrement_counter(:enqueued_records, importer_run_id) # rubocop:disable Rails/SkipsModelValidations
entry.set_status_info(e)
end
end
Expand Down
10 changes: 5 additions & 5 deletions app/jobs/bulkrax/import_work_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@ def perform(entry_id, run_id, time_to_live = 3, *)
entry = Entry.find(entry_id)
entry.build
if entry.status == "Complete"
ImporterRun.find(run_id).increment!(:processed_records)
ImporterRun.find(run_id).increment!(:processed_works)
ImporterRun.increment_counter(:processed_records, run_id)
ImporterRun.increment_counter(:processed_works, run_id)
else
# do not retry here because whatever parse error kept you from creating a work will likely
# keep preventing you from doing so.
ImporterRun.find(run_id).increment!(:failed_records)
ImporterRun.find(run_id).increment!(:failed_works)
ImporterRun.increment_counter(:failed_records, run_id)
ImporterRun.increment_counter(:failed_works, run_id)
end
# Regardless of completion or not, we want to decrement the enqueued records.
ImporterRun.find(run_id).decrement!(:enqueued_records) unless ImporterRun.find(run_id).enqueued_records <= 0
ImporterRun.decrement_counter(:enqueued_records, run_id) unless ImporterRun.find(run_id).enqueued_records <= 0

entry.save!
entry.importer.current_run = ImporterRun.find(run_id)
Expand Down
4 changes: 2 additions & 2 deletions app/parsers/bulkrax/application_parser.rb
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,8 @@ def invalid_record(message)
current_run.invalid_records ||= ""
current_run.invalid_records += message
current_run.save
ImporterRun.find(current_run.id).increment!(:failed_records)
ImporterRun.find(current_run.id).decrement!(:enqueued_records) unless ImporterRun.find(current_run.id).enqueued_records <= 0 # rubocop:disable Style/IdenticalConditionalBranches
ImporterRun.increment_counter(:failed_records, current_run.id)
ImporterRun.decrement_counter(:enqueued_records, current_run.id) unless ImporterRun.find(current_run.id).enqueued_records <= 0 # rubocop:disable Style/IdenticalConditionalBranches
end
# rubocop:enable Rails/SkipsModelValidations

Expand Down
8 changes: 4 additions & 4 deletions spec/factories/bulkrax_exporter_runs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

FactoryBot.define do
factory :bulkrax_exporter_run, class: 'Bulkrax::ExporterRun' do
exporter { nil }
exporter { FactoryBot.build(:bulkrax_exporter) }
total_work_entries { 1 }
enqueued_records { 1 }
processed_records { 1 }
deleted_records { 1 }
failed_records { 1 }
processed_records { 0 }
deleted_records { 0 }
failed_records { 0 }
end
end
6 changes: 3 additions & 3 deletions spec/factories/bulkrax_importer_runs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
importer { FactoryBot.build(:bulkrax_importer) }
total_work_entries { 1 }
enqueued_records { 1 }
processed_records { 1 }
deleted_records { 1 }
failed_records { 1 }
processed_records { 0 }
deleted_records { 0 }
failed_records { 0 }
end
end
24 changes: 14 additions & 10 deletions spec/jobs/bulkrax/delete_work_job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
module Bulkrax
RSpec.describe DeleteWorkJob, type: :job do
subject(:delete_work_job) { described_class.new }
let(:entry) { FactoryBot.build(:bulkrax_entry) }
let(:importer_run) { FactoryBot.build(:bulkrax_importer_run) }
let(:entry) { create(:bulkrax_entry) }
let(:importer_run) { create(:bulkrax_importer_run) }

describe 'successful job object removed' do
before do
Expand All @@ -18,12 +18,14 @@ module Bulkrax
end

it 'increments :deleted_records' do
entry.save
importer_run.save
expect(importer_run.enqueued_records).to eq(1)
expect(importer_run.deleted_records).to eq(0)

delete_work_job.perform(entry, importer_run)
importer_run.reload
expect(importer_run.enqueued_records).to equal(0)
expect(importer_run.deleted_records).to equal(2)

expect(importer_run.enqueued_records).to eq(0)
expect(importer_run.deleted_records).to eq(1)
end
end

Expand All @@ -35,12 +37,14 @@ module Bulkrax
end

it 'increments :deleted_records' do
entry.save
importer_run.save
expect(importer_run.enqueued_records).to eq(1)
expect(importer_run.deleted_records).to eq(0)

delete_work_job.perform(entry, importer_run)
importer_run.reload
expect(importer_run.enqueued_records).to equal(0)
expect(importer_run.deleted_records).to equal(2)

expect(importer_run.enqueued_records).to eq(0)
expect(importer_run.deleted_records).to eq(1)
end
end
end
Expand Down
38 changes: 29 additions & 9 deletions spec/jobs/bulkrax/export_work_job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,43 @@
module Bulkrax
RSpec.describe ExportWorkJob, type: :job do
subject(:export_work_job) { described_class.new }
let(:entry) { FactoryBot.build(:bulkrax_entry) }
let(:exporter_run) { FactoryBot.build(:bulkrax_exporter_run) }
let(:exporter) { create(:bulkrax_exporter, :all) }
let(:entry) { create(:bulkrax_entry, importerexporter: exporter) }
let(:exporter_run) { create(:bulkrax_exporter_run, exporter: exporter) }

before do
allow(Bulkrax::Entry).to receive(:find).with(1).and_return(entry)
allow(Bulkrax::ExporterRun).to receive(:find).with(1).and_return(exporter_run)
allow(entry).to receive(:build)
end

describe 'successful job' do
before do
allow(entry).to receive(:save).and_return(true)
it 'increments :processed_records' do
expect(exporter_run.processed_records).to eq(0)

export_work_job.perform(entry.id, exporter_run.id)
exporter_run.reload

expect(exporter_run.processed_records).to eq(1)
end
it 'increments :processed_records and decrements enqueued record' do
expect(exporter_run).to receive(:increment!).with(:processed_records)
expect(exporter_run).to receive(:decrement!).with(:enqueued_records)
export_work_job.perform(1, 1)

it 'decrements :enqueued_records' do
expect(exporter_run.enqueued_records).to eq(1)

export_work_job.perform(entry.id, exporter_run.id)
exporter_run.reload

expect(exporter_run.enqueued_records).to eq(0)
end

it "doesn't change unrelated counters" do
expect(exporter_run.failed_records).to eq(0)
expect(exporter_run.deleted_records).to eq(0)

export_work_job.perform(1, exporter_run.id)
exporter_run.reload

expect(exporter_run.failed_records).to eq(0)
expect(exporter_run.deleted_records).to eq(0)
end
end
end
Expand Down
Loading

0 comments on commit 1c39f9f

Please sign in to comment.