diff --git a/server/lib/reverse_etl/extractors/incremental_delta.rb b/server/lib/reverse_etl/extractors/incremental_delta.rb index 991e8a692..f40897aa9 100644 --- a/server/lib/reverse_etl/extractors/incremental_delta.rb +++ b/server/lib/reverse_etl/extractors/incremental_delta.rb @@ -40,6 +40,8 @@ def process_record(message, sync_run, model) fingerprint = generate_fingerprint(record.data) primary_key = record.data.with_indifferent_access[model.primary_key] + raise StandardError, "Primary key cannot be blank" if primary_key.blank? + sync_record = find_or_initialize_sync_record(sync_run, primary_key) update_or_create_sync_record(sync_record, record, sync_run, fingerprint) rescue StandardError => e diff --git a/server/spec/lib/reverse_etl/extractors/incremental_delta_spec.rb b/server/spec/lib/reverse_etl/extractors/incremental_delta_spec.rb index d314d39dc..3136817bd 100644 --- a/server/spec/lib/reverse_etl/extractors/incremental_delta_spec.rb +++ b/server/spec/lib/reverse_etl/extractors/incremental_delta_spec.rb @@ -126,4 +126,29 @@ # TODO: test for partial recovery via currrent offset end + + describe "#process_record" do + let(:sync_run) do + create(:sync_run, sync:, workspace: sync.workspace, source:, destination:, model: sync.model, status: "started") + end + let(:model) { create(:model) } + + context "when the primary key is blank" do + it "does not call update_or_create_sync_record" do + message = double("Message", record: double("Record", data: { "TestPrimaryKey" => nil })) + expect(subject).not_to receive(:find_or_initialize_sync_record) + expect(subject).not_to receive(:update_or_create_sync_record) + subject.send(:process_record, message, sync_run, model) + end + end + + context "when the primary key is not blank" do + it "calls find_or_initialize_sync_record and update_or_create_sync_record" do + message = double("Message", record: double("Record", data: { "TestPrimaryKey" => 1 })) + expect(subject).to receive(:find_or_initialize_sync_record) + expect(subject).to receive(:update_or_create_sync_record) + subject.send(:process_record, message, sync_run, model) + end + end + end end