diff --git a/field_struct_avro_schema.gemspec b/field_struct_avro_schema.gemspec index e4fbbaa..5026624 100644 --- a/field_struct_avro_schema.gemspec +++ b/field_struct_avro_schema.gemspec @@ -39,6 +39,7 @@ Gem::Specification.new do |spec| spec.add_dependency 'activemodel' spec.add_dependency 'avro', '~> 1.11.0' spec.add_dependency 'avro-builder' + spec.add_dependency 'avro_turf' spec.add_dependency 'excon' spec.add_development_dependency 'bundler' diff --git a/lib/field_struct/avro_schema.rb b/lib/field_struct/avro_schema.rb index 33f6b5f..5b6c1b2 100644 --- a/lib/field_struct/avro_schema.rb +++ b/lib/field_struct/avro_schema.rb @@ -7,6 +7,8 @@ require 'avro_acima' require 'avro/builder' +require 'avro_turf' +require 'avro_turf/confluent_schema_registry' require 'excon' require 'field_struct' diff --git a/lib/field_struct/avro_schema/configuration.rb b/lib/field_struct/avro_schema/configuration.rb new file mode 100644 index 0000000..0ae2794 --- /dev/null +++ b/lib/field_struct/avro_schema/configuration.rb @@ -0,0 +1,38 @@ +# frozen_string_literal: true + +require 'uri' + +module FieldStruct + module AvroSchema + class Configuration + attr_accessor :user_name, :password, :schema_registry_url, :automatic_schema_registration + + def initialize + @user_name = env_or_default('KAFKA_SCHEMA_REGISTRY_USERNAME', nil) + @password = env_or_default('KAFKA_SCHEMA_REGISTRY_PASSWORD', nil) + @schema_registry_url = env_or_default('KAFKA_SCHEMA_REGISTRY_URL', 'http://localhost:8081') + @automatic_schema_registration = env_or_default('KAFKA_AUTO_REGISTER_SCHEMAS', 'false') == 'true' + end + + def schema_registry_base_url + uri = URI(schema_registry_url) + uri.path = '' + uri.query = nil + uri.to_s + end + + def schema_registry_path_prefix + uri = URI(schema_registry_url) + uri.path + end + + private + + def env_or_default(name, default_value) + value = ENV.fetch(name, nil) + value = default_value if value.blank? + value + end + end + end +end diff --git a/lib/field_struct/avro_schema/kafka.rb b/lib/field_struct/avro_schema/kafka.rb index 8d415f4..11d753c 100644 --- a/lib/field_struct/avro_schema/kafka.rb +++ b/lib/field_struct/avro_schema/kafka.rb @@ -1,7 +1,6 @@ # frozen_string_literal: true require_relative 'kafka/in_memory_cache' -require_relative 'kafka/schema_registry' require_relative 'kafka/cached_schema_registry' require_relative 'kafka/schema_store' @@ -13,6 +12,7 @@ require_relative 'kafka/coders/json_encoder' require_relative 'kafka/coders/string_decoder' require_relative 'kafka/coders/string_encoder' +require_relative 'configuration' module FieldStruct module AvroSchema @@ -34,6 +34,10 @@ def logger=(value) @logger = value end + def configuration + @configuration ||= Configuration.new + end + def events @events ||= {} end @@ -43,7 +47,12 @@ def schema_registry end def base_schema_registry - @base_schema_registry ||= SchemaRegistry.new registry_url, logger: logger + @base_schema_registry ||= + AvroTurf::ConfluentSchemaRegistry.new configuration.schema_registry_base_url, + user: configuration.user_name, + password: configuration.password, + path_prefix: configuration.schema_registry_path_prefix, + logger: logger end def registry_url @@ -99,11 +108,14 @@ def register_event_schemas def register_event_schema(klass) return nil unless klass.publishable? - id = if ENV.fetch('KAFKA_AUTO_REGISTER', 'false') == 'true' + id = if configuration.automatic_schema_registration schema_registry.register build_subject_name(klass), klass.schema else - schema_registry.check build_subject_name(klass), klass.schema + data = schema_registry.check build_subject_name(klass), klass.schema + data.fetch('id') end + raise StandardError, "Schema Not Found -- Schema Name: #{klass.default_schema_record_name}" if id.blank? + klass.schema_id id klass rescue StandardError => e diff --git a/lib/field_struct/avro_schema/kafka/cached_schema_registry.rb b/lib/field_struct/avro_schema/kafka/cached_schema_registry.rb index b82fe6c..b0f087d 100644 --- a/lib/field_struct/avro_schema/kafka/cached_schema_registry.rb +++ b/lib/field_struct/avro_schema/kafka/cached_schema_registry.rb @@ -71,10 +71,16 @@ def mock! def to_s format '#<%s upstream=%s cache=%s>', self.class.name, - @upstream.inspect, + upstream_inspect, @cache.inspect end + def upstream_inspect + format '#<%s url=%s>', + @upstream.class.name, + @upstream.instance_variable_get(:@url).inspect + end + alias inspect to_s end end diff --git a/lib/field_struct/avro_schema/kafka/schema_registry.rb b/lib/field_struct/avro_schema/kafka/schema_registry.rb deleted file mode 100644 index 9297bb0..0000000 --- a/lib/field_struct/avro_schema/kafka/schema_registry.rb +++ /dev/null @@ -1,143 +0,0 @@ -# frozen_string_literal: true - -# -# Shamelessly taken from: -# https://github.com/dasch/avro_turf/blob/master/lib/avro_turf/confluent_schema_registry.rb -# - -module FieldStruct - module AvroSchema - module Kafka - class SchemaRegistry - CONTENT_TYPE = 'application/vnd.schemaregistry.v1+json' - - # rubocop:disable Metrics/ParameterLists - def initialize( - url, - logger: AvroSchema.logger, - proxy: nil, - client_cert: nil, - client_key: nil, - client_key_pass: nil, - client_cert_data: nil, - client_key_data: nil - ) - @logger = logger - headers = { - 'Content-Type' => CONTENT_TYPE - } - headers[:proxy] = proxy if proxy&.present? - @connection = Excon.new( - url, - headers: headers, - client_cert: client_cert, - client_key: client_key, - client_key_pass: client_key_pass, - client_cert_data: client_cert_data, - client_key_data: client_key_data - ) - end - # rubocop:enable Metrics/ParameterLists - - def fetch(id) - @logger.info "Fetching schema with id #{id}" - data = get "/schemas/ids/#{id}" - data.fetch 'schema' - end - - def register(subject, schema) - data = post "/subjects/#{subject}/versions", body: { schema: schema.to_s }.to_json - id = data.fetch('id') - @logger.info "Registered schema for subject [#{subject}] id = #{id}" - id - end - - # List all subjects - def subjects - get '/subjects' - end - - # List all versions for a subject - def subject_versions(subject) - get "/subjects/#{subject}/versions" - end - - # Get a specific version for a subject - def subject_version(subject, version = 'latest') - get "/subjects/#{subject}/versions/#{version}" - end - - # Check if a schema exists. Returns nil if not found. - def check(subject, schema) - response = post "/subjects/#{subject}", - expects: [200, 404], - body: { schema: schema.to_s } - data = response.to_json - data unless data.key?('error_code') - end - - # Check if a schema is compatible with the stored version. - # Returns: - # - true if compatible - # - nil if the subject or version does not exist - # - false if incompatible - # http://docs.confluent.io/3.1.2/schema-registry/docs/api.html#compatibility - def compatible?(subject, schema, version = 'latest') - response = post "/compatibility/subjects/#{subject}/versions/#{version}", - expects: [200, 404], - body: { schema: schema.to_s } - data = response.to_json - data.fetch('is_compatible', false) unless data.key?('error_code') - end - - # Get global config - def global_config - get '/config' - end - - # Update global config - def update_global_config(config) - put '/config', { body: config.to_json } - end - - # Get config for subject - def subject_config(subject) - get "/config/#{subject}" - end - - # Update config for subject - def update_subject_config(subject, config) - put "/config/#{subject}", { body: config.to_json } - end - - def to_s - format '#<%s url=%s>', - self.class.name, - @url.inspect - end - - alias inspect to_s - - private - - def get(path, **options) - request path, method: :get, **options - end - - def put(path, **options) - request path, method: :put, **options - end - - def post(path, **options) - request path, method: :post, **options - end - - def request(path, **options) - options = { expects: 200, path: path }.merge! options - response = @connection.request options - JSON.parse response.body - end - end - end - end -end diff --git a/spec/kafka_spec.rb b/spec/kafka_spec.rb index db1c6b9..7adb695 100644 --- a/spec/kafka_spec.rb +++ b/spec/kafka_spec.rb @@ -45,15 +45,14 @@ it { expect(subject).to be_a FieldStruct::AvroSchema::Kafka::CachedSchemaRegistry } it do expect(subject.inspect).to eq '# cache=#>' end end describe '.base_schema_registry' do subject { described_class.base_schema_registry } - it { expect(subject).to be_a FieldStruct::AvroSchema::Kafka::SchemaRegistry } - it { expect(subject.inspect).to eq '#' } + it { expect(subject).to be_a AvroTurf::ConfluentSchemaRegistry } end describe '.registry_url' do subject { described_class.registry_url } diff --git a/spec/structs/company_spec.rb b/spec/structs/company_spec.rb index de21f06..ad84fed 100644 --- a/spec/structs/company_spec.rb +++ b/spec/structs/company_spec.rb @@ -503,11 +503,11 @@ it('to_hash') { compare instance.to_hash, company_attrs.deep_stringify_keys } end end - context 'registration', :vcr, :registers, :env_change do - let(:env) { { 'KAFKA_AUTO_REGISTER' => 'true' } } + context 'registration', :vcr, :registers do let(:registration) { kafka.register_event_schema described_class } it('Kafka has event registered') { expect(kafka.events[described_class.name]).to eq described_class } it 'registers with schema_registry' do + expect(kafka.configuration).to receive(:automatic_schema_registration).and_return(true) expect { registration }.to_not raise_error expect(described_class.schema_id).to eq exp_schema_id end diff --git a/spec/structs/custom_record_name_spec.rb b/spec/structs/custom_record_name_spec.rb index be39db4..f563d19 100644 --- a/spec/structs/custom_record_name_spec.rb +++ b/spec/structs/custom_record_name_spec.rb @@ -189,11 +189,11 @@ it('to_hash') { compare instance.to_hash, person_attrs.deep_stringify_keys } end end - context 'registration', :env_change do - let(:env) { { 'KAFKA_AUTO_REGISTER' => 'true' } } + context 'registration' do let(:registration) { kafka.register_event_schema described_class } it('Kafka has event registered') { expect(kafka.events[described_class.name]).to eq described_class } it 'registers with schema_registry', :vcr, :registers do + expect(kafka.configuration).to receive(:automatic_schema_registration).and_return(true) expect { registration }.to_not raise_error expect(described_class.schema_id).to eq exp_schema_id end diff --git a/spec/structs/developer_spec.rb b/spec/structs/developer_spec.rb index 902788f..8fefb18 100644 --- a/spec/structs/developer_spec.rb +++ b/spec/structs/developer_spec.rb @@ -217,11 +217,11 @@ it('to_hash') { compare instance.to_hash, developer_attrs.deep_stringify_keys } end end - context 'registration', :env_change do - let(:env) { { 'KAFKA_AUTO_REGISTER' => 'true' } } + context 'registration' do let(:registration) { kafka.register_event_schema described_class } it('Kafka has event registered') { expect(kafka.events[described_class.name]).to eq described_class } it 'registers with schema_registry', :vcr, :registers do + expect(kafka.configuration).to receive(:automatic_schema_registration).and_return(true) expect { registration }.to_not raise_error expect(described_class.schema_id).to eq exp_schema_id end diff --git a/spec/structs/employee_spec.rb b/spec/structs/employee_spec.rb index 5c965a7..f685e5a 100644 --- a/spec/structs/employee_spec.rb +++ b/spec/structs/employee_spec.rb @@ -205,11 +205,11 @@ it('to_hash') { compare instance.to_hash, employee_attrs.deep_stringify_keys } end end - context 'registration', :env_change do - let(:env) { { 'KAFKA_AUTO_REGISTER' => 'true' } } + context 'registration' do let(:registration) { kafka.register_event_schema described_class } it('Kafka has event registered') { expect(kafka.events[described_class.name]).to eq described_class } it 'registers with schema_registry', :vcr, :registers do + expect(kafka.configuration).to receive(:automatic_schema_registration).and_return(true) expect { registration }.to_not raise_error expect(described_class.schema_id).to eq exp_schema_id end diff --git a/spec/structs/friend_spec.rb b/spec/structs/friend_spec.rb index ae6e3f0..aefde60 100644 --- a/spec/structs/friend_spec.rb +++ b/spec/structs/friend_spec.rb @@ -247,11 +247,11 @@ it('to_hash') { compare instance.to_hash, friend_attrs.deep_stringify_keys } end end - context 'registration', :env_change do - let(:env) { { 'KAFKA_AUTO_REGISTER' => 'true' } } + context 'registration' do let(:registration) { kafka.register_event_schema described_class } it('Kafka has event registered') { expect(kafka.events[described_class.name]).to eq described_class } it 'registers with schema_registry', :vcr, :registers do + expect(kafka.configuration).to receive(:automatic_schema_registration).and_return(true) expect { registration }.to_not raise_error expect(described_class.schema_id).to eq exp_schema_id end diff --git a/spec/structs/person_spec.rb b/spec/structs/person_spec.rb index 7da2ba1..fc298e2 100644 --- a/spec/structs/person_spec.rb +++ b/spec/structs/person_spec.rb @@ -189,11 +189,11 @@ it('to_hash') { compare instance.to_hash, person_attrs.deep_stringify_keys } end end - context 'registration', :env_change do - let(:env) { { 'KAFKA_AUTO_REGISTER' => 'true' } } + context 'registration' do let(:registration) { kafka.register_event_schema described_class } it('Kafka has event registered') { expect(kafka.events[described_class.name]).to eq described_class } it 'registers with schema_registry', :vcr, :registers do + expect(kafka.configuration).to receive(:automatic_schema_registration).and_return(true) expect { registration }.to_not raise_error expect(described_class.schema_id).to eq exp_schema_id end diff --git a/spec/structs/runner_spec.rb b/spec/structs/runner_spec.rb index c133a1f..2c0d47d 100644 --- a/spec/structs/runner_spec.rb +++ b/spec/structs/runner_spec.rb @@ -262,11 +262,11 @@ it('to_hash') { compare instance.to_hash, runner_attrs.deep_stringify_keys } end end - context 'registration', :env_change do - let(:env) { { 'KAFKA_AUTO_REGISTER' => 'true' } } + context 'registration' do let(:registration) { kafka.register_event_schema described_class } it('Kafka has event registered') { expect(kafka.events[described_class.name]).to eq described_class } it 'registers with schema_registry', :vcr, :registers do + expect(kafka.configuration).to receive(:automatic_schema_registration).and_return(true) expect { registration }.to_not raise_error expect(described_class.schema_id).to eq exp_schema_id end diff --git a/spec/structs/team_spec.rb b/spec/structs/team_spec.rb index 31f40d2..6074d87 100644 --- a/spec/structs/team_spec.rb +++ b/spec/structs/team_spec.rb @@ -387,11 +387,11 @@ it('to_hash') { compare instance.to_hash, team_attrs.deep_stringify_keys } end end - context 'registration', :env_change do - let(:env) { { 'KAFKA_AUTO_REGISTER' => 'true' } } + context 'registration' do let(:registration) { kafka.register_event_schema described_class } it('Kafka has event registered') { expect(kafka.events[described_class.name]).to eq described_class } it 'registers with schema_registry', :vcr, :registers do + expect(kafka.configuration).to receive(:automatic_schema_registration).and_return(true) expect { registration }.to_not raise_error expect(described_class.schema_id).to eq exp_schema_id end diff --git a/spec/structs/user_spec.rb b/spec/structs/user_spec.rb index f5fc884..f99d64f 100644 --- a/spec/structs/user_spec.rb +++ b/spec/structs/user_spec.rb @@ -322,11 +322,11 @@ it('#schema_id') { expect(instance.schema_id).to be_nil } it('attributes') { compare instance.attributes, user_attrs.stringify_keys } end - context 'registration', :env_change do - let(:env) { { 'KAFKA_AUTO_REGISTER' => 'true' } } + context 'registration' do let(:registration) { kafka.register_event_schema described_class } it('Kafka has event registered') { expect(kafka.events[described_class.name]).to eq described_class } it 'registers with schema_registry', :vcr, :registers do + expect(kafka.configuration).to receive(:automatic_schema_registration).and_return(true) expect { registration }.to_not raise_error expect(described_class.schema_id).to eq exp_schema_id end diff --git a/spec/support/env.rb b/spec/support/env.rb index 870546c..a816a35 100644 --- a/spec/support/env.rb +++ b/spec/support/env.rb @@ -64,4 +64,5 @@ def change_argv_set(*entries) RSpec.configure do |config| config.include EnvHelpers config.around(:example, env_change: true) { |example| change_env_set(env) { example.run } } + config.around(:example, env_set: true) { |example| change_env(env, val) { example.run } } end