Skip to content

Commit

Permalink
Updating to use avro_turf and configuration settings for Environment …
Browse files Browse the repository at this point in the history
…variables
  • Loading branch information
matthewrampey committed Dec 8, 2023
1 parent 332e639 commit c3bc5b3
Show file tree
Hide file tree
Showing 17 changed files with 85 additions and 169 deletions.
1 change: 1 addition & 0 deletions field_struct_avro_schema.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ Gem::Specification.new do |spec|
spec.add_dependency 'activemodel'
spec.add_dependency 'avro', '~> 1.11.0'
spec.add_dependency 'avro-builder'
spec.add_dependency 'avro_turf'
spec.add_dependency 'excon'

spec.add_development_dependency 'bundler'
Expand Down
2 changes: 2 additions & 0 deletions lib/field_struct/avro_schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

require 'avro_acima'
require 'avro/builder'
require 'avro_turf'
require 'avro_turf/confluent_schema_registry'
require 'excon'
require 'field_struct'

Expand Down
38 changes: 38 additions & 0 deletions lib/field_struct/avro_schema/configuration.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# frozen_string_literal: true

require 'uri'

module FieldStruct
module AvroSchema
class Configuration
attr_accessor :user_name, :password, :schema_registry_url, :automatic_schema_registration

def initialize
@user_name = env_or_default('KAFKA_SCHEMA_REGISTRY_USERNAME', nil)
@password = env_or_default('KAFKA_SCHEMA_REGISTRY_PASSWORD', nil)
@schema_registry_url = env_or_default('KAFKA_SCHEMA_REGISTRY_URL', 'http://localhost:8081')
@automatic_schema_registration = env_or_default('KAFKA_AUTO_REGISTER_SCHEMAS', 'false') == 'true'
end

def schema_registry_base_url
uri = URI(schema_registry_url)
uri.path = ''
uri.query = nil
uri.to_s
end

def schema_registry_path_prefix
uri = URI(schema_registry_url)
uri.path
end

private

def env_or_default(name, default_value)
value = ENV.fetch(name, nil)
value = default_value if value.blank?
value
end
end
end
end
20 changes: 16 additions & 4 deletions lib/field_struct/avro_schema/kafka.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# frozen_string_literal: true

require_relative 'kafka/in_memory_cache'
require_relative 'kafka/schema_registry'
require_relative 'kafka/cached_schema_registry'
require_relative 'kafka/schema_store'

Expand All @@ -13,6 +12,7 @@
require_relative 'kafka/coders/json_encoder'
require_relative 'kafka/coders/string_decoder'
require_relative 'kafka/coders/string_encoder'
require_relative 'configuration'

module FieldStruct
module AvroSchema
Expand All @@ -34,6 +34,10 @@ def logger=(value)
@logger = value
end

def configuration
@configuration ||= Configuration.new
end

def events
@events ||= {}
end
Expand All @@ -43,7 +47,12 @@ def schema_registry
end

def base_schema_registry
@base_schema_registry ||= SchemaRegistry.new registry_url, logger: logger
@base_schema_registry ||=
AvroTurf::ConfluentSchemaRegistry.new configuration.schema_registry_base_url,
user: configuration.user_name,
password: configuration.password,
path_prefix: configuration.schema_registry_path_prefix,
logger: logger
end

def registry_url
Expand Down Expand Up @@ -99,11 +108,14 @@ def register_event_schemas
def register_event_schema(klass)
return nil unless klass.publishable?

id = if ENV.fetch('KAFKA_AUTO_REGISTER', 'false') == 'true'
id = if configuration.automatic_schema_registration
schema_registry.register build_subject_name(klass), klass.schema
else
schema_registry.check build_subject_name(klass), klass.schema
data = schema_registry.check build_subject_name(klass), klass.schema
data.fetch('id')
end
raise StandardError, "Schema Not Found -- Schema Name: #{klass.default_schema_record_name}" if id.blank?

klass.schema_id id
klass
rescue StandardError => e
Expand Down
8 changes: 7 additions & 1 deletion lib/field_struct/avro_schema/kafka/cached_schema_registry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,16 @@ def mock!
def to_s
format '#<%s upstream=%s cache=%s>',
self.class.name,
@upstream.inspect,
upstream_inspect,
@cache.inspect
end

def upstream_inspect
format '#<%s url=%s>',
@upstream.class.name,
@upstream.instance_variable_get(:@url).inspect
end

alias inspect to_s
end
end
Expand Down
143 changes: 0 additions & 143 deletions lib/field_struct/avro_schema/kafka/schema_registry.rb

This file was deleted.

5 changes: 2 additions & 3 deletions spec/kafka_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,14 @@
it { expect(subject).to be_a FieldStruct::AvroSchema::Kafka::CachedSchemaRegistry }
it do
expect(subject.inspect).to eq '#<FieldStruct::AvroSchema::Kafka::CachedSchemaRegistry ' \
'upstream=#<FieldStruct::AvroSchema::Kafka::SchemaRegistry ' \
'upstream=#<AvroTurf::ConfluentSchemaRegistry ' \
'url=nil> cache=#<FieldStruct::AvroSchema::Kafka::InMemoryCache ' \
'schemas_by_id=0 ids_by_schema=0 schema_by_subject_version=0>>'
end
end
describe '.base_schema_registry' do
subject { described_class.base_schema_registry }
it { expect(subject).to be_a FieldStruct::AvroSchema::Kafka::SchemaRegistry }
it { expect(subject.inspect).to eq '#<FieldStruct::AvroSchema::Kafka::SchemaRegistry url=nil>' }
it { expect(subject).to be_a AvroTurf::ConfluentSchemaRegistry }
end
describe '.registry_url' do
subject { described_class.registry_url }
Expand Down
4 changes: 2 additions & 2 deletions spec/structs/company_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -503,11 +503,11 @@
it('to_hash') { compare instance.to_hash, company_attrs.deep_stringify_keys }
end
end
context 'registration', :vcr, :registers, :env_change do
let(:env) { { 'KAFKA_AUTO_REGISTER' => 'true' } }
context 'registration', :vcr, :registers do
let(:registration) { kafka.register_event_schema described_class }
it('Kafka has event registered') { expect(kafka.events[described_class.name]).to eq described_class }
it 'registers with schema_registry' do
expect(kafka.configuration).to receive(:automatic_schema_registration).and_return(true)
expect { registration }.to_not raise_error
expect(described_class.schema_id).to eq exp_schema_id
end
Expand Down
4 changes: 2 additions & 2 deletions spec/structs/custom_record_name_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -189,11 +189,11 @@
it('to_hash') { compare instance.to_hash, person_attrs.deep_stringify_keys }
end
end
context 'registration', :env_change do
let(:env) { { 'KAFKA_AUTO_REGISTER' => 'true' } }
context 'registration' do
let(:registration) { kafka.register_event_schema described_class }
it('Kafka has event registered') { expect(kafka.events[described_class.name]).to eq described_class }
it 'registers with schema_registry', :vcr, :registers do
expect(kafka.configuration).to receive(:automatic_schema_registration).and_return(true)
expect { registration }.to_not raise_error
expect(described_class.schema_id).to eq exp_schema_id
end
Expand Down
4 changes: 2 additions & 2 deletions spec/structs/developer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,11 @@
it('to_hash') { compare instance.to_hash, developer_attrs.deep_stringify_keys }
end
end
context 'registration', :env_change do
let(:env) { { 'KAFKA_AUTO_REGISTER' => 'true' } }
context 'registration' do
let(:registration) { kafka.register_event_schema described_class }
it('Kafka has event registered') { expect(kafka.events[described_class.name]).to eq described_class }
it 'registers with schema_registry', :vcr, :registers do
expect(kafka.configuration).to receive(:automatic_schema_registration).and_return(true)
expect { registration }.to_not raise_error
expect(described_class.schema_id).to eq exp_schema_id
end
Expand Down
4 changes: 2 additions & 2 deletions spec/structs/employee_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,11 @@
it('to_hash') { compare instance.to_hash, employee_attrs.deep_stringify_keys }
end
end
context 'registration', :env_change do
let(:env) { { 'KAFKA_AUTO_REGISTER' => 'true' } }
context 'registration' do
let(:registration) { kafka.register_event_schema described_class }
it('Kafka has event registered') { expect(kafka.events[described_class.name]).to eq described_class }
it 'registers with schema_registry', :vcr, :registers do
expect(kafka.configuration).to receive(:automatic_schema_registration).and_return(true)
expect { registration }.to_not raise_error
expect(described_class.schema_id).to eq exp_schema_id
end
Expand Down
4 changes: 2 additions & 2 deletions spec/structs/friend_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -247,11 +247,11 @@
it('to_hash') { compare instance.to_hash, friend_attrs.deep_stringify_keys }
end
end
context 'registration', :env_change do
let(:env) { { 'KAFKA_AUTO_REGISTER' => 'true' } }
context 'registration' do
let(:registration) { kafka.register_event_schema described_class }
it('Kafka has event registered') { expect(kafka.events[described_class.name]).to eq described_class }
it 'registers with schema_registry', :vcr, :registers do
expect(kafka.configuration).to receive(:automatic_schema_registration).and_return(true)
expect { registration }.to_not raise_error
expect(described_class.schema_id).to eq exp_schema_id
end
Expand Down
4 changes: 2 additions & 2 deletions spec/structs/person_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -189,11 +189,11 @@
it('to_hash') { compare instance.to_hash, person_attrs.deep_stringify_keys }
end
end
context 'registration', :env_change do
let(:env) { { 'KAFKA_AUTO_REGISTER' => 'true' } }
context 'registration' do
let(:registration) { kafka.register_event_schema described_class }
it('Kafka has event registered') { expect(kafka.events[described_class.name]).to eq described_class }
it 'registers with schema_registry', :vcr, :registers do
expect(kafka.configuration).to receive(:automatic_schema_registration).and_return(true)
expect { registration }.to_not raise_error
expect(described_class.schema_id).to eq exp_schema_id
end
Expand Down
4 changes: 2 additions & 2 deletions spec/structs/runner_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -262,11 +262,11 @@
it('to_hash') { compare instance.to_hash, runner_attrs.deep_stringify_keys }
end
end
context 'registration', :env_change do
let(:env) { { 'KAFKA_AUTO_REGISTER' => 'true' } }
context 'registration' do
let(:registration) { kafka.register_event_schema described_class }
it('Kafka has event registered') { expect(kafka.events[described_class.name]).to eq described_class }
it 'registers with schema_registry', :vcr, :registers do
expect(kafka.configuration).to receive(:automatic_schema_registration).and_return(true)
expect { registration }.to_not raise_error
expect(described_class.schema_id).to eq exp_schema_id
end
Expand Down
Loading

0 comments on commit c3bc5b3

Please sign in to comment.