Skip to content

Commit

Permalink
Rubocop
Browse files Browse the repository at this point in the history
  • Loading branch information
isaacseymour committed Jan 12, 2018
1 parent e352e87 commit c1593b9
Show file tree
Hide file tree
Showing 11 changed files with 93 additions and 72 deletions.
2 changes: 2 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# frozen_string_literal: true

source 'https://rubygems.org'

# Specify your gem's dependencies in es_index.gemspec
Expand Down
2 changes: 2 additions & 0 deletions Rakefile
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# frozen_string_literal: true

require 'bundler/gem_tasks'
require 'rspec/core/rake_task'

Expand Down
1 change: 1 addition & 0 deletions bin/console
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#!/usr/bin/env ruby
# frozen_string_literal: true

require 'bundler/setup'
require 'es_index'
Expand Down
2 changes: 2 additions & 0 deletions es_index.gemspec
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@

# frozen_string_literal: true

lib = File.expand_path('../lib', __FILE__)
$LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib)
require 'es_index/version'
Expand Down
2 changes: 2 additions & 0 deletions lib/es_index.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# frozen_string_literal: true

require 'es_index/version'
require 'es_index/config'
require 'es_index/indexer'
Expand Down
2 changes: 2 additions & 0 deletions lib/es_index/config.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# frozen_string_literal: true

module EsIndex
class Config
attr_reader :client, :data_source
Expand Down
48 changes: 26 additions & 22 deletions lib/es_index/index_manager.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# frozen_string_literal: true

module EsIndex
class IndexManager
def initialize(config)
Expand All @@ -7,12 +9,12 @@ def initialize(config)
def create_index(unique_name)
full_name = [config.read_alias, unique_name].join('_')

config.client.indices.create(
client.indices.create(
index: full_name,
body: config.index_definition
)

config.client.indices.put_alias(index: full_name, name: config.write_alias)
client.indices.put_alias(index: full_name, name: config.write_alias)
end

def populate_index(unique_name = nil, batch_size: 3000)
Expand All @@ -27,32 +29,33 @@ def populate_index(unique_name = nil, batch_size: 3000)
end
end

# rubocop:disable Metrics/AbcSize
def switch_read_index(new_name)
new_index = [config.read_alias, new_name].join('_')

old_index =
if config.client.indices.exists_alias?(name: config.read_alias)
config.client.indices.get_alias(name: config.read_alias).keys.first
if client.indices.exists_alias?(name: config.read_alias)
client.indices.get_alias(name: config.read_alias).keys.first
end

remove_action =
({ remove: { index: old_index, alias: config.read_alias } } if old_index)

config.client.indices.update_aliases(body: {
actions: [
remove_action,
{ add: { index: new_index, alias: config.read_alias } }
].compact
})
client.indices.update_aliases(body: {
actions: [
remove_action,
{ add: { index: new_index, alias: config.read_alias } }
].compact
})
end

def stop_dual_writes
logger.info('Stopping dual writes - making index read and write aliases the same')
current_index = config.client.indices.get_alias(name: config.read_alias).keys.first
current_index = client.indices.get_alias(name: config.read_alias).keys.first

logger.info("Currently used index is #{current_index}")

other_write_indices = config.client.indices.get_alias(name: config.write_alias).keys
other_write_indices = client.indices.get_alias(name: config.write_alias).keys
.reject { |name| name == current_index }

if other_write_indices.none?
Expand All @@ -65,34 +68,35 @@ def stop_dual_writes
actions = other_write_indices.map do |index|
{ remove: { index: index, alias: config.write_alias } }
end
config.client.indices.update_aliases(body: { actions: actions })
client.indices.update_aliases(body: { actions: actions })
end

def cleanup_old_indices
logger.info('Cleaning up old indices in Elasticsearch')
current_index = config.client.indices.get_alias(name: config.read_alias).keys.first
current_index = client.indices.get_alias(name: config.read_alias).keys.first

logger.info("Currently used index is #{current_index}")

indices_to_delete = config.client
.cat
.indices(format: :json)
.map { |index| index['index'] }
.select { |name| name.start_with?(config.read_alias) }
.reject { |name| name == current_index }
indices_to_delete = client
.cat
.indices(format: :json)
.map { |index| index['index'] }
.select { |name| name.start_with?(config.read_alias) }
.reject { |name| name == current_index }

if indices_to_delete.none?
logger.info('Nothing to do: no old indices')
return
end
logger.info("Deleting #{indices_to_delete.count} old indices: #{indices_to_delete.join(', ')}")
config.client.indices.delete(index: indices_to_delete)
client.indices.delete(index: indices_to_delete)
end
# rubocop:enable Metrics/AbcSize

private

attr_reader :config
delegate :logger, to: :config
delegate :client, :logger, to: :config

def indexer
@indexer ||= Indexer.new(config)
Expand Down
98 changes: 49 additions & 49 deletions lib/es_index/indexer.rb
Original file line number Diff line number Diff line change
@@ -1,52 +1,33 @@
# frozen_string_literal: true

module EsIndex
class Indexer
def initialize(config)
@config = config
end

def index_batch(batch, index_name: nil)
index_name ||= config.write_alias
indices = Array(index_name || write_indices)
logger.info("ES: Indexing #{config.type} record")

version = current_version
commands = batch.map do |record|
{
index: {
_index: index_name,
_type: config.type,
_id: record.id,
_version: version,
_version_type: :external,
data: config.index_data(record)
}
}
end

result = config.client.bulk(body: commands)
return result unless result['errors']
result['items'].map { |item| item['error'] }.compact
execute_bulk(
indices.flat_map do |index|
batch.map do |record|
index_command(index: index, version: version, record: record)
end
end
)
end

def index_record(record)
version = current_version
indices = config.client.indices.get_alias(name: config.write_alias).keys

commands = indices.map do |index|
{
index: {
_index: index,
_type: config.type,
_version: version,
_version_type: :external,
_id: record.id,
data: config.index_data(record)
}
}
end

result = config.client.bulk(body: commands)
return [] unless result['errors']
result['items'].map { |item| item['error'] }.compact
execute_bulk(
write_indices.map do |index|
index_command(index: index, version: version, record: record)
end
)
end

def delete_by_id(id)
Expand All @@ -65,24 +46,20 @@ def delete_by_ids(ids)
logger.info('ES: Deleting batch records')

indices = config.client.indices.get_alias(name: config.write_alias).keys
commands = []

indices.each do |index|
commands += ids.map do |id|
{
delete: {
_index: index,
_type: config.type,
_id: id
execute_bulk(
indices.flat_map do |index|
ids.map do |id|
{
delete: {
_index: index,
_type: config.type,
_id: id
}
}
}
end
end
end

result = config.client.bulk(body: commands)

return [] unless result['errors']
result['items'].map { |item| item['error'] }.compact
)
end

def delete_by_query(query)
Expand All @@ -99,5 +76,28 @@ def delete_by_query(query)
def current_version
config.data_source.connection.select_one('SELECT txid_current()').fetch('txid_current')
end

def write_indices
config.client.indices.get_alias(name: config.write_alias).keys
end

def index_command(index:, version:, record:)
{
index: {
_index: index,
_type: config.type,
_id: record.id,
_version: version,
_version_type: :external,
data: config.index_data(record)
}
}
end

def execute_bulk(commands)
result = config.client.bulk(body: commands)
return result unless result['errors']
result['items'].map { |item| item['error'] }.compact
end
end
end
4 changes: 3 additions & 1 deletion lib/es_index/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# frozen_string_literal: true

module EsIndex
VERSION = '0.1.0'.freeze
VERSION = '0.1.0'
end
2 changes: 2 additions & 0 deletions spec/es_index_spec.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# frozen_string_literal: true

RSpec.describe EsIndex do
it 'has a version number' do
expect(EsIndex::VERSION).not_to be nil
Expand Down
2 changes: 2 additions & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# frozen_string_literal: true

require 'bundler/setup'
require 'es_index'

Expand Down

0 comments on commit c1593b9

Please sign in to comment.