Reporting indexing events to SDR based on solr responses #1366
Changes from all commits
geo_config.rb:

```diff
@@ -170,7 +170,6 @@ def geoserver_url(record)
     # delete records form the index
     context.output_hash['id'] = ["stanford-#{druid}"]

-    SdrEvents.report_indexing_deleted(druid, target: settings['purl_fetcher.target'])
    context.skip!("Delete: #{druid}")
  end

```
```diff
@@ -465,12 +464,11 @@ def geoserver_url(record)
    end
  end

-each_record do |record, context|
+each_record do |_record, context|
   t0 = context.clipboard[:benchmark_start_time]
   t1 = Time.now

   logger.debug('geo_config.rb') { "Processed #{context.output_hash['id']} (#{t1 - t0}s)" }
-  SdrEvents.report_indexing_success(record.druid, target: settings['purl_fetcher.target'])
end

# rubocop:disable Metrics/MethodLength
```

> Review comment on the removed `SdrEvents.report_indexing_success` call: moved to writer
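The config above relies on a convention: a record slated for deletion is marked skipped, but its `output_hash` keeps the Solr id so a downstream writer can still issue the delete. A minimal sketch of that convention, using a hypothetical `FakeContext` stand-in (not Traject's real context class):

```ruby
# FakeContext is a made-up stand-in modeling only the two behaviors the
# config uses: stashing output fields and marking the record as skipped.
class FakeContext
  attr_reader :output_hash, :skip_reason

  def initialize
    @output_hash = {}
    @skipped = false
  end

  # Mark this record so the writer will not add it to the index
  def skip!(reason)
    @skipped = true
    @skip_reason = reason
  end

  def skip?
    @skipped
  end
end

druid = 'ab123cd4567' # made-up druid for illustration
context = FakeContext.new
context.output_hash['id'] = ["stanford-#{druid}"]
context.skip!("Delete: #{druid}")

context.skip?             # => true
context.output_hash['id'] # => ["stanford-ab123cd4567"]
```

A skipped context still carries its id, which is exactly what lets the writer translate it into a Solr delete command later in the diff.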
```diff
@@ -31,19 +31,19 @@ def put(context)
  end

  def drain_queue
-    batch = Traject::Util.drain_queue(@batched_queue)
-    @thread_pool.maybe_in_thread_pool(batch) { |batch_arg| send_batch(batch_arg) }
+    contexts = Traject::Util.drain_queue(@batched_queue)
+    @thread_pool.maybe_in_thread_pool(contexts) { |batch_arg| send_batch(batch_arg) }
  end

  # Send the given batch of contexts. If something goes wrong, send
  # them one at a time.
  # @param [Array<Traject::Indexer::Context>] an array of contexts
-  def send_batch(batch)
+  def send_batch(contexts)
+    batch = Batch.new(contexts)
    return if batch.empty?

-    json_package = generate_json(batch)
    begin
-      resp = @http_client.post @solr_update_url, json_package, 'Content-type' => 'application/json'
+      resp = @http_client.post @solr_update_url, batch.generate_json, 'Content-type' => 'application/json'
    rescue StandardError => exception # rubocop:disable Naming/RescuedExceptionsVariableName https://github.com/rubocop/rubocop/issues/11809
    end

@@ -58,25 +58,27 @@ def send_batch(batch)

      @retry_count += 1

-      batch.each do |c|
+      batch.each do |context|
        sleep rand(0..max_sleep_seconds)
-        if send_single(c)
+        if send_single(context)
          @retry_count = [0, @retry_count - 0.1].min
        else
          @retry_count += 0.1
        end
      end
    else
      @retry_count = 0
+      SdrEvents.report_indexing_batch_success(batch, target: @settings['purl_fetcher.target'])
    end
  end

  # Send a single context to Solr, logging an error if need be
  # @param [Traject::Indexer::Context] c The context whose document you want to send
-  def send_single(c)
-    json_package = generate_json([c])
+  def send_single(context)
+    batch = Batch.new([context])
+
    begin
-      resp = @http_client.post @solr_update_url, json_package, 'Content-type' => 'application/json'
+      resp = @http_client.post @solr_update_url, batch.generate_json, 'Content-type' => 'application/json'
      # Catch Timeouts and network errors as skipped records, but otherwise
      # allow unexpected errors to propagate up.
    rescue *skippable_exceptions => exception # rubocop:disable Naming/RescuedExceptionsVariableName https://github.com/rubocop/rubocop/issues/11809

@@ -89,17 +91,20 @@ def send_single(c)
      else
        "Solr error response: #{resp.status}: #{resp.body}"
      end
-      logger.error "Could not add record #{c.record_inspect}: #{msg}"
+      logger.error "Could not add record #{context.record_inspect}: #{msg}"
      logger.debug("\t" + exception.backtrace.join("\n\t")) if exception
-      logger.debug(c.source_record.to_s) if c.source_record
+      logger.debug(context.source_record.to_s) if context.source_record

      @skipped_record_incrementer.increment
      if @max_skipped and skipped_record_count > @max_skipped
        raise MaxSkippedRecordsExceeded,
              "#{self.class.name}: Exceeded maximum number of skipped records (#{@max_skipped}): aborting"
      end

+      SdrEvents.report_indexing_batch_errored(batch, target: @settings['purl_fetcher.target'], exception: msg)
      return false
+    else
+      SdrEvents.report_indexing_batch_success(batch, target: @settings['purl_fetcher.target'])
    end

    true
```
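The retry bookkeeping in `send_batch` is easy to miss in the diff: a failed batch bumps `@retry_count` by one, each individually retried record then nudges it by a tenth, and the cap on the randomized sleep doubles with every whole step. A sketch under those rules (the `BackoffTracker` wrapper is invented for illustration, and the success path here clamps the count at zero):

```ruby
# Made-up wrapper isolating the writer's backoff arithmetic. Failed
# batches add 1, per-record outcomes add or subtract 0.1, and the sleep
# ceiling grows as 2**retry_count.
class BackoffTracker
  attr_reader :retry_count

  def initialize
    @retry_count = 0
  end

  def batch_failed
    @retry_count += 1
  end

  # Clamped at zero so repeated successes cannot go negative
  def single_succeeded
    @retry_count = [0.0, @retry_count - 0.1].max
  end

  def single_failed
    @retry_count += 0.1
  end

  # Upper bound for the randomized sleep between single-record retries
  def max_sleep_seconds
    Float(2**@retry_count)
  end
end

tracker = BackoffTracker.new
tracker.max_sleep_seconds # => 1.0
tracker.batch_failed
tracker.batch_failed
tracker.max_sleep_seconds # => 4.0
```

The fractional steps mean a run of successful single-record retries gradually shortens the sleeps again instead of resetting them all at once.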
```diff
@@ -109,18 +114,44 @@ def max_sleep_seconds
    Float(2**@retry_count)
  end

-  def generate_json(batch)
-    arr = []
-
-    batch.each do |c|
-      if c.skip?
-        id = Array(c.output_hash['id']).first
-        arr << "delete: #{JSON.generate(id)}" if id
-      else
-        arr << "add: #{JSON.generate(doc: c.output_hash)}"
-      end
-    end
-
-    '{' + arr.join(",\n") + '}'
+  # Collection of Traject contexts to be sent to solr
+  class Batch
+    def initialize(contexts)
+      @contexts = contexts
+    end
+
+    def empty?
+      @contexts.empty?
+    end
+
+    def each(&)
+      @contexts.each(&)
+    end
+
+    # Array of [action, druid, data] triples, where action is :add or :delete
+    # and data is either the doc id or the full doc hash. Druid is empty for
+    # non-SDR content.
+    def actions
+      @actions ||= @contexts.map do |context|
+        if context.skip?
+          id = Array(context.output_hash['id']).first
+          [:delete, context.source_record&.druid, id] if id
+        else
+          [:add, context.source_record&.druid, context.output_hash]
+        end
+      end.compact
+    end
+
+    # Make a JSON string for sending to solr /update API
+    def generate_json
+      actions.map do |action, _druid, data|
+        case action
+        when :delete
+          "\"delete\":#{JSON.generate(data)}"
+        when :add
+          "\"add\":#{JSON.generate(doc: data)}"
+        end
+      end.join(",\n").prepend('{').concat('}')
+    end
+  end
 end
```

Review thread on the new `Batch` class (lines +117 to +118):

> **Reviewer:** I'm having trouble understanding what the responsibility of this class is. It seems like it does several things, which makes me wonder if it should be several classes.

> **Author:** I initially extracted it because I needed an underlying data structure that mapped desired actions to documents. That data structure could then be used to generate the JSON to solr as well as to report about the status of the actions. I could instead extract the reporting-related stuff to methods directly on the writer that take a […]

> **Reviewer:** Yeah, it feels to me like the reporting stuff should be separate if possible.

> **Author:** Moved it to the […]
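To see what the new class emits end-to-end, here is a self-contained sketch: the `Batch` body is copied from the diff, while `Context` and `SourceRecord` are hypothetical stand-ins modeling only the methods `Batch` touches (the real ones come from the traject indexer and the purl-fetcher records), and the druids are made up.

```ruby
require 'json'

# Hypothetical stand-ins for Traject::Indexer::Context and an SDR source
# record; only the attributes Batch reads are modeled.
Context = Struct.new(:output_hash, :source_record, :skipped, keyword_init: true) do
  def skip?
    skipped
  end
end

SourceRecord = Struct.new(:druid)

# The Batch class from the diff: maps contexts to [action, druid, data]
# triples and renders them as a Solr JSON update body.
class Batch
  def initialize(contexts)
    @contexts = contexts
  end

  def empty?
    @contexts.empty?
  end

  def each(&block)
    @contexts.each(&block)
  end

  def actions
    @actions ||= @contexts.map do |context|
      if context.skip?
        id = Array(context.output_hash['id']).first
        [:delete, context.source_record&.druid, id] if id
      else
        [:add, context.source_record&.druid, context.output_hash]
      end
    end.compact
  end

  def generate_json
    actions.map do |action, _druid, data|
      case action
      when :delete
        "\"delete\":#{JSON.generate(data)}"
      when :add
        "\"add\":#{JSON.generate(doc: data)}"
      end
    end.join(",\n").prepend('{').concat('}')
  end
end

add = Context.new(output_hash: { 'id' => ['stanford-ab123cd4567'] },
                  source_record: SourceRecord.new('ab123cd4567'), skipped: false)
del = Context.new(output_hash: { 'id' => ['stanford-xy987zw6543'] },
                  source_record: SourceRecord.new('xy987zw6543'), skipped: true)

payload = JSON.parse(Batch.new([add, del]).generate_json)
# payload['add']    => {"doc"=>{"id"=>["stanford-ab123cd4567"]}}
# payload['delete'] => "stanford-xy987zw6543"
```

Joining raw strings rather than building a Hash matters because Solr's JSON update syntax allows the `add` and `delete` keys to repeat within one object, which a Ruby Hash could not represent.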