From c1593b9991ae9b8cc65e9fd7e6cf12bbb46e0f6f Mon Sep 17 00:00:00 2001 From: Isaac Seymour Date: Fri, 12 Jan 2018 12:31:13 +0000 Subject: [PATCH] Rubocop --- Gemfile | 2 + Rakefile | 2 + bin/console | 1 + es_index.gemspec | 2 + lib/es_index.rb | 2 + lib/es_index/config.rb | 2 + lib/es_index/index_manager.rb | 48 +++++++++-------- lib/es_index/indexer.rb | 98 +++++++++++++++++------------------ lib/es_index/version.rb | 4 +- spec/es_index_spec.rb | 2 + spec/spec_helper.rb | 2 + 11 files changed, 93 insertions(+), 72 deletions(-) diff --git a/Gemfile b/Gemfile index 9347a6b..7f0cf34 100644 --- a/Gemfile +++ b/Gemfile @@ -1,3 +1,5 @@ +# frozen_string_literal: true + source 'https://rubygems.org' # Specify your gem's dependencies in es_index.gemspec diff --git a/Rakefile b/Rakefile index 4c774a2..82bb534 100644 --- a/Rakefile +++ b/Rakefile @@ -1,3 +1,5 @@ +# frozen_string_literal: true + require 'bundler/gem_tasks' require 'rspec/core/rake_task' diff --git a/bin/console b/bin/console index 6b2d112..680b653 100755 --- a/bin/console +++ b/bin/console @@ -1,4 +1,5 @@ #!/usr/bin/env ruby +# frozen_string_literal: true require 'bundler/setup' require 'es_index' diff --git a/es_index.gemspec b/es_index.gemspec index e41a79a..9a90a09 100644 --- a/es_index.gemspec +++ b/es_index.gemspec @@ -1,4 +1,6 @@ +# frozen_string_literal: true + lib = File.expand_path('../lib', __FILE__) $LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib) require 'es_index/version' diff --git a/lib/es_index.rb b/lib/es_index.rb index 9cf07f4..6679837 100644 --- a/lib/es_index.rb +++ b/lib/es_index.rb @@ -1,3 +1,5 @@ +# frozen_string_literal: true + require 'es_index/version' require 'es_index/config' require 'es_index/indexer' diff --git a/lib/es_index/config.rb b/lib/es_index/config.rb index 09c10b9..b22c3ff 100644 --- a/lib/es_index/config.rb +++ b/lib/es_index/config.rb @@ -1,3 +1,5 @@ +# frozen_string_literal: true + module EsIndex class Config attr_reader :client, :data_source diff --git a/lib/es_index/index_manager.rb b/lib/es_index/index_manager.rb index 73588dd..907a129 100644 --- a/lib/es_index/index_manager.rb +++ b/lib/es_index/index_manager.rb @@ -1,3 +1,5 @@ +# frozen_string_literal: true + module EsIndex class IndexManager def initialize(config) @@ -7,12 +9,12 @@ def initialize(config) def create_index(unique_name) full_name = [config.read_alias, unique_name].join('_') - config.client.indices.create( + client.indices.create( index: full_name, body: config.index_definition ) - config.client.indices.put_alias(index: full_name, name: config.write_alias) + client.indices.put_alias(index: full_name, name: config.write_alias) end def populate_index(unique_name = nil, batch_size: 3000) @@ -27,32 +29,33 @@ def populate_index(unique_name = nil, batch_size: 3000) end end + # rubocop:disable Metrics/AbcSize def switch_read_index(new_name) new_index = [config.read_alias, new_name].join('_') old_index = - if config.client.indices.exists_alias?(name: config.read_alias) - config.client.indices.get_alias(name: config.read_alias).keys.first + if client.indices.exists_alias?(name: config.read_alias) + client.indices.get_alias(name: config.read_alias).keys.first end remove_action = ({ remove: { index: old_index, alias: config.read_alias } } if old_index) - config.client.indices.update_aliases(body: { - actions: [ - remove_action, - { add: { index: new_index, alias: config.read_alias } } - ].compact - }) + client.indices.update_aliases(body: { + actions: [ + remove_action, + { add: { index: new_index, alias: config.read_alias } } + ].compact + }) end def stop_dual_writes logger.info('Stopping dual writes - making index read and write aliases the same') - current_index = config.client.indices.get_alias(name: config.read_alias).keys.first + current_index = client.indices.get_alias(name: config.read_alias).keys.first logger.info("Currently used index is #{current_index}") - other_write_indices = config.client.indices.get_alias(name: config.write_alias).keys + other_write_indices = client.indices.get_alias(name: config.write_alias).keys .reject { |name| name == current_index } if other_write_indices.none? @@ -65,34 +68,35 @@ def stop_dual_writes actions = other_write_indices.map do |index| { remove: { index: index, alias: config.write_alias } } end - config.client.indices.update_aliases(body: { actions: actions }) + client.indices.update_aliases(body: { actions: actions }) end def cleanup_old_indices logger.info('Cleaning up old indices in Elasticsearch') - current_index = config.client.indices.get_alias(name: config.read_alias).keys.first + current_index = client.indices.get_alias(name: config.read_alias).keys.first logger.info("Currently used index is #{current_index}") - indices_to_delete = config.client - .cat - .indices(format: :json) - .map { |index| index['index'] } - .select { |name| name.start_with?(config.read_alias) } - .reject { |name| name == current_index } + indices_to_delete = client + .cat + .indices(format: :json) + .map { |index| index['index'] } + .select { |name| name.start_with?(config.read_alias) } + .reject { |name| name == current_index } if indices_to_delete.none? logger.info('Nothing to do: no old indices') return end logger.info("Deleting #{indices_to_delete.count} old indices: #{indices_to_delete.join(', ')}") - config.client.indices.delete(index: indices_to_delete) + client.indices.delete(index: indices_to_delete) end + # rubocop:enable Metrics/AbcSize private attr_reader :config - delegate :logger, to: :config + delegate :client, :logger, to: :config def indexer @indexer ||= Indexer.new(config) diff --git a/lib/es_index/indexer.rb b/lib/es_index/indexer.rb index 2dc2929..bd9a2d7 100644 --- a/lib/es_index/indexer.rb +++ b/lib/es_index/indexer.rb @@ -1,3 +1,5 @@ +# frozen_string_literal: true + module EsIndex class Indexer def initialize(config) @@ -5,48 +7,27 @@ def initialize(config) end def index_batch(batch, index_name: nil) - index_name ||= config.write_alias + indices = Array(index_name || write_indices) logger.info("ES: Indexing #{config.type} record") version = current_version - commands = batch.map do |record| - { - index: { - _index: index_name, - _type: config.type, - _id: record.id, - _version: version, - _version_type: :external, - data: config.index_data(record) - } - } - end - - result = config.client.bulk(body: commands) - return result unless result['errors'] - result['items'].map { |item| item['error'] }.compact + execute_bulk( + indices.flat_map do |index| + batch.map do |record| + index_command(index: index, version: version, record: record) + end + end + ) end def index_record(record) version = current_version - indices = config.client.indices.get_alias(name: config.write_alias).keys - - commands = indices.map do |index| - { - index: { - _index: index, - _type: config.type, - _version: version, - _version_type: :external, - _id: record.id, - data: config.index_data(record) - } - } - end - result = config.client.bulk(body: commands) - return [] unless result['errors'] - result['items'].map { |item| item['error'] }.compact + execute_bulk( + write_indices.map do |index| + index_command(index: index, version: version, record: record) + end + ) end def delete_by_id(id) @@ -65,24 +46,20 @@ def delete_by_ids(ids) logger.info('ES: Deleting batch records') indices = config.client.indices.get_alias(name: config.write_alias).keys - commands = [] - indices.each do |index| - commands += ids.map do |id| - { - delete: { - _index: index, - _type: config.type, - _id: id + execute_bulk( + indices.flat_map do |index| + ids.map do |id| + { + delete: { + _index: index, + _type: config.type, + _id: id + } } - } + end end - end - - result = config.client.bulk(body: commands) - - return [] unless result['errors'] - result['items'].map { |item| item['error'] }.compact + ) end def delete_by_query(query) @@ -99,5 +76,28 @@ def delete_by_query(query) def current_version config.data_source.connection.select_one('SELECT txid_current()').fetch('txid_current') end + + def write_indices + config.client.indices.get_alias(name: config.write_alias).keys + end + + def index_command(index:, version:, record:) + { + index: { + _index: index, + _type: config.type, + _id: record.id, + _version: version, + _version_type: :external, + data: config.index_data(record) + } + } + end + + def execute_bulk(commands) + result = config.client.bulk(body: commands) + return result unless result['errors'] + result['items'].map { |item| item['error'] }.compact + end end end diff --git a/lib/es_index/version.rb b/lib/es_index/version.rb index 2d22bf7..c9958a1 100644 --- a/lib/es_index/version.rb +++ b/lib/es_index/version.rb @@ -1,3 +1,5 @@ +# frozen_string_literal: true + module EsIndex - VERSION = '0.1.0'.freeze + VERSION = '0.1.0' end diff --git a/spec/es_index_spec.rb b/spec/es_index_spec.rb index 946dd8b..5223ce0 100644 --- a/spec/es_index_spec.rb +++ b/spec/es_index_spec.rb @@ -1,3 +1,5 @@ +# frozen_string_literal: true + RSpec.describe EsIndex do it 'has a version number' do expect(EsIndex::VERSION).not_to be nil diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 3b53799..b135232 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -1,3 +1,5 @@ +# frozen_string_literal: true + require 'bundler/setup' require 'es_index'