Skip to content

Commit

Permalink
Refactor SolrBetterJsonWriter for easier management of record batches
Browse files Browse the repository at this point in the history
  • Loading branch information
thatbudakguy committed Feb 27, 2024
1 parent 9ce95d2 commit c3bcde5
Showing 1 changed file with 48 additions and 22 deletions.
70 changes: 48 additions & 22 deletions lib/traject/writers/solr_better_json_writer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,19 @@ def put(context)
end

def drain_queue
batch = Traject::Util.drain_queue(@batched_queue)
@thread_pool.maybe_in_thread_pool(batch) { |batch_arg| send_batch(batch_arg) }
contexts = Traject::Util.drain_queue(@batched_queue)
@thread_pool.maybe_in_thread_pool(contexts) { |batch_arg| send_batch(batch_arg) }
end

# Send the given batch of contexts. If something goes wrong, send
# them one at a time.
# @param [Array<Traject::Indexer::Context>] an array of contexts
def send_batch(batch)
def send_batch(contexts)
batch = Batch.new(contexts)
return if batch.empty?

json_package = generate_json(batch)
begin
resp = @http_client.post @solr_update_url, json_package, 'Content-type' => 'application/json'
resp = @http_client.post @solr_update_url, batch.generate_json, 'Content-type' => 'application/json'
rescue StandardError => exception # rubocop:disable Naming/RescuedExceptionsVariableName https://github.com/rubocop/rubocop/issues/11809
end

Expand All @@ -58,9 +58,9 @@ def send_batch(batch)

@retry_count += 1

batch.each do |c|
batch.each do |context|
sleep rand(0..max_sleep_seconds)
if send_single(c)
if send_single(context)
@retry_count = [0, @retry_count - 0.1].min
else
@retry_count += 0.1
Expand All @@ -73,10 +73,11 @@ def send_batch(batch)

# Send a single context to Solr, logging an error if need be
# @param [Traject::Indexer::Context] c The context whose document you want to send
def send_single(c)
json_package = generate_json([c])
def send_single(context)
batch = Batch.new([context])

begin
resp = @http_client.post @solr_update_url, json_package, 'Content-type' => 'application/json'
resp = @http_client.post @solr_update_url, batch.generate_json, 'Content-type' => 'application/json'
# Catch Timeouts and network errors as skipped records, but otherwise
# allow unexpected errors to propagate up.
rescue *skippable_exceptions => exception # rubocop:disable Naming/RescuedExceptionsVariableName https://github.com/rubocop/rubocop/issues/11809
Expand All @@ -89,9 +90,9 @@ def send_single(c)
else
"Solr error response: #{resp.status}: #{resp.body}"
end
logger.error "Could not add record #{c.record_inspect}: #{msg}"
logger.error "Could not add record #{context.record_inspect}: #{msg}"
logger.debug("\t" + exception.backtrace.join("\n\t")) if exception
logger.debug(c.source_record.to_s) if c.source_record
logger.debug(context.source_record.to_s) if context.source_record

@skipped_record_incrementer.increment
if @max_skipped and skipped_record_count > @max_skipped
Expand All @@ -109,18 +110,43 @@ def max_sleep_seconds
Float(2**@retry_count)
end

def generate_json(batch)
arr = []
# Collection of Traject contexts to be sent to solr
class Batch
def initialize(contexts)
@contexts = contexts
end

batch.each do |c|
if c.skip?
id = Array(c.output_hash['id']).first
arr << "delete: #{JSON.generate(id)}" if id
else
arr << "add: #{JSON.generate(doc: c.output_hash)}"
end
def empty?
@contexts.empty?
end

def each(&)
@contexts.each(&)
end

'{' + arr.join(",\n") + '}'
# Array of [action, data] pairs, where action is :add or :delete
# and data is either the doc id or the full doc hash
def actions
@contexts.map do |c|
if c.skip?
id = Array(c.output_hash['id']).first
[:delete, id] if id
else
[:add, c.output_hash]
end
end.compact
end

# Make a JSON string for sending to solr /update API
def generate_json
actions.map do |action, data|
case action
when :delete
"delete: #{JSON.generate(data)}"
when :add
"add: #{JSON.generate(doc: data)}"
end
end.join(",\n").prepend('{').concat('}')
end
end
end

0 comments on commit c3bcde5

Please sign in to comment.