Skip to content

Commit

Permalink
fix: add skip_rows column in sync run
Browse files Browse the repository at this point in the history
  • Loading branch information
afthabvp committed Apr 22, 2024
1 parent fa5535a commit 61662c0
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
class AddSkipRowsToSyncRuns < ActiveRecord::Migration[7.1]
def up
add_column :sync_runs, :skip_rows, :integer, default: 0
end

def down
remove_column :sync_runs, :skip_rows
end
end
3 changes: 2 additions & 1 deletion server/db/schema.rb

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 15 additions & 12 deletions server/lib/reverse_etl/extractors/incremental_delta.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ class IncrementalDelta < Base
# TODO: Make it as class method
def read(sync_run_id, activity)
total_query_rows = 0
skip_rows = 0

sync_run = SyncRun.find(sync_run_id)

return log_sync_run_error(sync_run) unless sync_run.may_query?
Expand All @@ -21,9 +23,9 @@ def read(sync_run_id, activity)
current_offset, last_cursor_field_value|

total_query_rows += records.count
process_records(records, sync_run, model)
skip_rows += process_records(records, sync_run, model)
heartbeat(activity)
sync_run.update(current_offset:, total_query_rows:)
sync_run.update(current_offset:, total_query_rows:, skip_rows:)
sync_run.sync.update(current_cursor_field: last_cursor_field_value)
end
# change state querying to queued
Expand All @@ -33,20 +35,19 @@ def read(sync_run_id, activity)
private

def process_records(records, sync_run, model)
Parallel.each(records, in_threads: THREAD_COUNT) do |message|
process_record(message, sync_run, model)
end
Parallel.map(records, in_threads: THREAD_COUNT) do |message|
record = message.record
fingerprint = generate_fingerprint(record.data)
sync_record = process_record(record, sync_run, model)
update_or_create_sync_record(sync_record, record, sync_run, fingerprint)
end.sum
end

def process_record(message, sync_run, model)
record = message.record
fingerprint = generate_fingerprint(record.data)
def process_record(record, sync_run, model)
primary_key = record.data.with_indifferent_access[model.primary_key]

raise StandardError, "Primary key cannot be blank" if primary_key.blank?

sync_record = find_or_initialize_sync_record(sync_run, primary_key)
update_or_create_sync_record(sync_record, record, sync_run, fingerprint)
find_or_initialize_sync_record(sync_run, primary_key)
rescue StandardError => e
Temporal.logger.error(error_message: e.message,
sync_run_id: sync_run.id,
Expand All @@ -73,15 +74,17 @@ def action(sync_record)
end

def update_or_create_sync_record(sync_record, record, sync_run, fingerprint)
return unless new_record?(sync_record, fingerprint)
return 1 unless new_record?(sync_record, fingerprint)

sync_record.assign_attributes(
sync_run_id: sync_run.id,
action: action(sync_record),
fingerprint:,
record: record.data
)

sync_record.save!
0
end
end
end
Expand Down
30 changes: 26 additions & 4 deletions server/spec/lib/reverse_etl/extractors/incremental_delta_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -139,20 +139,42 @@

context "when the primary key is blank" do
it "does not call update_or_create_sync_record" do
message = double("Message", record: double("Record", data: { "TestPrimaryKey" => nil }))
message = double("Record", data: { "TestPrimaryKey" => nil })
expect(subject).not_to receive(:find_or_initialize_sync_record)
expect(subject).not_to receive(:update_or_create_sync_record)
subject.send(:process_record, message, sync_run, model)
end
end

context "when the primary key is not blank" do
it "calls find_or_initialize_sync_record and update_or_create_sync_record" do
message = double("Message", record: double("Record", data: { "TestPrimaryKey" => 1 }))
message = double("Record", data: { "TestPrimaryKey" => 1 })
expect(subject).to receive(:find_or_initialize_sync_record)
expect(subject).to receive(:update_or_create_sync_record)

subject.send(:process_record, message, sync_run, model)
end
end
end

describe "#process_records" do
# record2 and record3 are duplicate
let(:records) { [record1, record2, record3] }

let(:records_without_dup) { [record1, record2] }

context "process_records records" do
it "process_records records with duplicates" do
skip_rows = subject.send(:process_records, records, sync_run1, sync_run1.model)

expect(skip_rows).to eq(1)
end
end

context "process_records records" do
it "process_records records without duplicate" do
skip_rows = subject.send(:process_records, records_without_dup, sync_run1, sync_run1.model)

expect(skip_rows).to eq(0)
end
end
end
end

0 comments on commit 61662c0

Please sign in to comment.