Skip to content

Commit

Permalink
Merge pull request #1366 from sul-dlss/report-via-traject-writer
Browse files Browse the repository at this point in the history
Reporting indexing events to SDR based on solr responses
  • Loading branch information
jcoyne authored Mar 1, 2024
2 parents c826aad + f48aac7 commit c5e47a4
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 50 deletions.
28 changes: 28 additions & 0 deletions lib/sdr_events.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,34 @@ def report_indexing_errored(druid, target:, message:, context: nil)
create_event(druid:, target:, type: 'indexing_errored', data: { message:, context: }.compact)
end

# Take a SolrBetterJsonWriter::Batch and report successful adds/deletes
def report_indexing_batch_success(batch, target:)
  batch.actions.each do |action, druid, _data|
    # Actions without a druid are non-SDR content; nothing to report
    next if druid.nil?

    report_indexing_deleted(druid, target:) if action == :delete
    report_indexing_success(druid, target:) if action == :add
  end
end

# Take a SolrBetterJsonWriter::Batch and report failed adds/deletes
def report_indexing_batch_errored(batch, target:, exception:)
  batch.actions.each do |action, druid, _data|
    # Actions without a druid are non-SDR content; nothing to report
    next if druid.nil?

    message = { delete: 'delete failed', add: 'add failed' }[action]
    report_indexing_errored(druid, target:, message:, context: exception) if message
  end
end

private

# Generic event creation; prefer more specific methods
Expand Down
4 changes: 1 addition & 3 deletions lib/traject/config/geo_config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ def geoserver_url(record)
# delete records from the index
context.output_hash['id'] = ["stanford-#{druid}"]

SdrEvents.report_indexing_deleted(druid, target: settings['purl_fetcher.target'])
context.skip!("Delete: #{druid}")
end

Expand Down Expand Up @@ -475,12 +474,11 @@ def geoserver_url(record)
end
end

each_record do |record, context|
each_record do |_record, context|
t0 = context.clipboard[:benchmark_start_time]
t1 = Time.now

logger.debug('geo_config.rb') { "Processed #{context.output_hash['id']} (#{t1 - t0}s)" }
SdrEvents.report_indexing_success(record.druid, target: settings['purl_fetcher.target'])
end

# rubocop:disable Metrics/MethodLength
Expand Down
75 changes: 53 additions & 22 deletions lib/traject/writers/solr_better_json_writer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,19 @@ def put(context)
end

def drain_queue
batch = Traject::Util.drain_queue(@batched_queue)
@thread_pool.maybe_in_thread_pool(batch) { |batch_arg| send_batch(batch_arg) }
contexts = Traject::Util.drain_queue(@batched_queue)
@thread_pool.maybe_in_thread_pool(contexts) { |batch_arg| send_batch(batch_arg) }
end

# Send the given batch of contexts. If something goes wrong, send
# them one at a time.
# @param [Array<Traject::Indexer::Context>] contexts an array of contexts
def send_batch(batch)
def send_batch(contexts)
batch = Batch.new(contexts)
return if batch.empty?

json_package = generate_json(batch)
begin
resp = @http_client.post @solr_update_url, json_package, 'Content-type' => 'application/json'
resp = @http_client.post @solr_update_url, batch.generate_json, 'Content-type' => 'application/json'
rescue StandardError => exception # rubocop:disable Naming/RescuedExceptionsVariableName https://github.com/rubocop/rubocop/issues/11809
end

Expand All @@ -58,25 +58,27 @@ def send_batch(batch)

@retry_count += 1

batch.each do |c|
batch.each do |context|
sleep rand(0..max_sleep_seconds)
if send_single(c)
if send_single(context)
@retry_count = [0, @retry_count - 0.1].min
else
@retry_count += 0.1
end
end
else
@retry_count = 0
SdrEvents.report_indexing_batch_success(batch, target: @settings['purl_fetcher.target'])
end
end

# Send a single context to Solr, logging an error if need be
# @param [Traject::Indexer::Context] c The context whose document you want to send
def send_single(c)
json_package = generate_json([c])
def send_single(context)
batch = Batch.new([context])

begin
resp = @http_client.post @solr_update_url, json_package, 'Content-type' => 'application/json'
resp = @http_client.post @solr_update_url, batch.generate_json, 'Content-type' => 'application/json'
# Catch Timeouts and network errors as skipped records, but otherwise
# allow unexpected errors to propagate up.
rescue *skippable_exceptions => exception # rubocop:disable Naming/RescuedExceptionsVariableName https://github.com/rubocop/rubocop/issues/11809
Expand All @@ -89,17 +91,20 @@ def send_single(c)
else
"Solr error response: #{resp.status}: #{resp.body}"
end
logger.error "Could not add record #{c.record_inspect}: #{msg}"
logger.error "Could not add record #{context.record_inspect}: #{msg}"
logger.debug("\t" + exception.backtrace.join("\n\t")) if exception
logger.debug(c.source_record.to_s) if c.source_record
logger.debug(context.source_record.to_s) if context.source_record

@skipped_record_incrementer.increment
if @max_skipped and skipped_record_count > @max_skipped
raise MaxSkippedRecordsExceeded,
"#{self.class.name}: Exceeded maximum number of skipped records (#{@max_skipped}): aborting"
end

SdrEvents.report_indexing_batch_errored(batch, target: @settings['purl_fetcher.target'], exception: msg)
return false
else
SdrEvents.report_indexing_batch_success(batch, target: @settings['purl_fetcher.target'])
end

true
Expand All @@ -109,18 +114,44 @@ def max_sleep_seconds
Float(2**@retry_count)
end

def generate_json(batch)
arr = []
# Collection of Traject contexts to be sent to solr
class Batch
def initialize(contexts)
@contexts = contexts
end

batch.each do |c|
if c.skip?
id = Array(c.output_hash['id']).first
arr << "delete: #{JSON.generate(id)}" if id
else
arr << "add: #{JSON.generate(doc: c.output_hash)}"
end
def empty?
@contexts.empty?
end

def each(&)
@contexts.each(&)
end

'{' + arr.join(",\n") + '}'
# Array of [action, druid, data] triples, where action is :add or :delete
# and data is either the doc id or the full doc hash. Druid is empty for
# non-SDR content.
def actions
@actions ||= @contexts.map do |context|
if context.skip?
id = Array(context.output_hash['id']).first
[:delete, context.source_record&.druid, id] if id
else
[:add, context.source_record&.druid, context.output_hash]
end
end.compact
end

# Make a JSON string for sending to solr /update API
def generate_json
actions.map do |action, _druid, data|
case action
when :delete
"\"delete\":#{JSON.generate(data)}"
when :add
"\"add\":#{JSON.generate(doc: data)}"
end
end.join(",\n").prepend('{').concat('}')
end
end
end
16 changes: 0 additions & 16 deletions spec/integration/geo_config_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -312,22 +312,6 @@ def stub_mods_request(druid, body)
)
end

context 'when indexing is successful' do
it 'creates an indexing success event' do
expect(result).to be_a Hash
expect(SdrEvents).to have_received(:report_indexing_success).with(druid, target: 'Earthworks')
end
end

context 'when the item was deleted' do
let(:record) { { id: "druid:#{druid}", delete: true } }

it 'creates an indexing delete event' do
expect(result).to be_nil
expect(SdrEvents).to have_received(:report_indexing_deleted).with(druid, target: 'Earthworks')
end
end

context 'when the item has no public XML' do
before { allow(record).to receive(:public_xml).and_return(nil) }

Expand Down
109 changes: 100 additions & 9 deletions spec/lib/traject/writers/solr_better_json_writer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,16 @@
require 'spec_helper'

describe Traject::SolrBetterJsonWriter do
subject(:writer) { described_class.new('solr_json_writer.http_client' => http_client, 'solr.update_url' => 'http://localhost/solr', 'solr_better_json_writer.debounce_timeout' => 1) }
subject(:writer) do
described_class.new(
'solr_json_writer.http_client' => http_client,
'solr.update_url' => 'http://localhost/solr',
'solr_better_json_writer.debounce_timeout' => 1,
'purl_fetcher.target' => 'Searchworks',
'solr_writer.max_skipped' => 100
)
end

let(:http_client) { double(post: double(status: 200)) }
let(:doc) { Traject::Indexer::Context.new.tap { |doc| doc.output_hash['id'] = [1] } }
let(:skipped_doc) do
Expand Down Expand Up @@ -39,26 +48,108 @@
describe '#send_batch' do
it 'adds documents to solr' do
writer.send_batch([doc])
expect(http_client).to have_received(:post).with('http://localhost/solr', '{add: {"doc":{"id":[1]}}}',
'Content-type' => 'application/json')
expect(http_client).to have_received(:post).with(
'http://localhost/solr',
'{"add":{"doc":{"id":[1]}}}',
'Content-type' => 'application/json'
)
end

it 'deletes documents from solr' do
writer.send_batch([skipped_doc])
expect(http_client).to have_received(:post).with('http://localhost/solr', '{delete: 2}',
'Content-type' => 'application/json')
expect(http_client).to have_received(:post).with(
'http://localhost/solr',
'{"delete":2}',
'Content-type' => 'application/json'
)
end

it 'skips writing documents that have no id' do
writer.send_batch([bad_doc])
expect(http_client).to have_received(:post).with('http://localhost/solr', '{}',
'Content-type' => 'application/json')
expect(http_client).to have_received(:post).with(
'http://localhost/solr',
'{}',
'Content-type' => 'application/json'
)
end

it 'sends the request as a batch' do
writer.send_batch([doc, skipped_doc])
expect(http_client).to have_received(:post).with('http://localhost/solr',
"{add: {\"doc\":{\"id\":[1]}},\ndelete: 2}", 'Content-type' => 'application/json')
expect(http_client).to have_received(:post).with(
'http://localhost/solr',
"{\"add\":{\"doc\":{\"id\":[1]}},\n\"delete\":2}",
'Content-type' => 'application/json'
)
end
end

describe 'event reporting' do
let(:sdr_docs) do
[
Traject::Indexer::Context.new.tap do |doc|
doc.output_hash['id'] = [1]
doc.source_record = PublicXmlRecord.new('bb112zx3193')
end,
Traject::Indexer::Context.new.tap do |doc|
doc.output_hash['id'] = [2]
doc.source_record = PublicXmlRecord.new('py305sy7961')
end
]
end

before do
allow(Settings.sdr_events).to receive(:enabled).and_return(true)
allow(SdrEvents).to receive_messages(
report_indexing_deleted: true,
report_indexing_success: true,
report_indexing_errored: true
)
end

context 'when SDR events are disabled' do
before do
allow(Settings.sdr_events).to receive(:enabled).and_return(false)
allow(Dor::Event::Client).to receive(:create)
end

it 'does not report any events' do
writer.send_batch(sdr_docs)
expect(Dor::Event::Client).not_to have_received(:create)
end
end

context 'when all docs index successfully' do
it 'reports docs that are successfully added' do
writer.send_batch(sdr_docs)
expect(SdrEvents).to have_received(:report_indexing_success).with('bb112zx3193', target: 'Searchworks')
expect(SdrEvents).to have_received(:report_indexing_success).with('py305sy7961', target: 'Searchworks')
end

it 'reports docs that are successfully deleted' do
skipped_doc.source_record = PublicXmlRecord.new('bj057dg6517')
writer.send_batch([skipped_doc])
expect(SdrEvents).to have_received(:report_indexing_deleted).with('bj057dg6517', target: 'Searchworks')
end
end

context 'when some docs fail to index' do
let(:http_client) { double(post: double(status: 400, body: 'Malformed data')) }

it 'reports docs that fail to index' do
writer.send_batch(sdr_docs)
expect(SdrEvents).to have_received(:report_indexing_errored).with(
'bb112zx3193',
target: 'Searchworks',
message: 'add failed',
context: 'Solr error response: 400: Malformed data'
)
expect(SdrEvents).to have_received(:report_indexing_errored).with(
'py305sy7961',
target: 'Searchworks',
message: 'add failed',
context: 'Solr error response: 400: Malformed data'
)
end
end
end
end

0 comments on commit c5e47a4

Please sign in to comment.