diff --git a/CHANGELOG.md b/CHANGELOG.md index 8b22c6f7..597adf95 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## UNRELEASED +- Added schema backends, which should simplify Avro encoding and make it + more flexible for unit tests and local development. +- BREAKING CHANGE: Deimos no longer comes with `avro_turf` as a dependency. + You will need to include it if you are Avro-encoding or decoding your + messages. + - Add `:test` producer backend which replaces the existing TestHelpers functionality of writing messages to an in-memory hash. diff --git a/Gemfile.lock b/Gemfile.lock index 7164adcd..48849a8c 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -2,8 +2,6 @@ PATH remote: . specs: deimos-ruby (1.4.0.pre.beta7) - avro-patches (~> 0.3) - avro_turf (~> 0.8) phobos (~> 1.8.2.pre.beta2) ruby-kafka (~> 0.7) @@ -55,10 +53,8 @@ GEM tzinfo (~> 1.1) arel (9.0.0) ast (2.4.0) - avro (1.8.2) + avro (1.9.1) multi_json - avro-patches (0.4.1) - avro (= 1.8.2) avro_turf (0.11.0) avro (>= 1.7.7, < 1.10) excon (~> 0.45) @@ -74,7 +70,7 @@ GEM digest-crc (0.4.1) dogstatsd-ruby (4.5.0) erubi (1.9.0) - excon (0.71.0) + excon (0.71.1) exponential-backoff (0.0.4) ffi (1.11.3) formatador (0.2.5) @@ -228,6 +224,8 @@ PLATFORMS DEPENDENCIES activerecord (~> 5.2) activerecord-import + avro (~> 1.9) + avro_turf (~> 0.8) bundler (~> 1) ddtrace (~> 0.11) deimos-ruby! diff --git a/README.md b/README.md index 87474662..bd9022e0 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@

-A Ruby framework for marrying Kafka, Avro, and/or ActiveRecord and provide +A Ruby framework for marrying Kafka, a schema definition like Avro, and/or ActiveRecord and provide a useful toolbox of goodies for Ruby-based Kafka development. Built on Phobos and hence Ruby-Kafka. @@ -14,6 +14,7 @@ Built on Phobos and hence Ruby-Kafka. * [Installation](#installation) * [Versioning](#versioning) * [Configuration](#configuration) + * [Schemas](#schemas) * [Producers](#producers) * [Auto-added Fields](#auto-added-fields) * [Coerced Values](#coerced-values) @@ -60,6 +61,27 @@ gem 'deimos-ruby', '~> 1.1' For a full configuration reference, please see [the configuration docs ](docs/CONFIGURATION.md). +# Schemas + +Deimos was originally written only supporting Avro encoding via a schema registry. +This has since been expanded to a plugin architecture allowing messages to be +encoded and decoded via any schema specification you wish. + +Currently we have the following possible schema backends: +* Avro Local (use pure Avro) +* Avro Schema Registry (use the Confluent Schema Registry) +* Avro Validation (validate using an Avro schema but leave decoded - this is useful + for unit testing and development) +* Mock (no actual encoding/decoding). + +Note that to use Avro-encoding, you must include the [avro_turf](https://github.com/dasch/avro_turf) gem in your +Gemfile. + +Other possible schemas could include [Protobuf](https://developers.google.com/protocol-buffers), [JSONSchema](https://json-schema.org/), etc. Feel free to +contribute! + +To create a new schema backend, please see the existing examples [here](lib/deimos/schema_backends). + # Producers Producers will look like this: @@ -137,7 +159,7 @@ produced by Phobos and RubyKafka): * topic * exception_object * payloads - the unencoded payloads -* `encode_messages` - sent when messages are being Avro-encoded. +* `encode_messages` - sent when messages are being schema-encoded. * producer - the class that produced the message * topic * payloads - the unencoded payloads @@ -165,8 +187,8 @@ Similarly: ### Kafka Message Keys Topics representing events rather than domain data don't need keys. However, -best practice for domain messages is to Avro-encode message keys -with a separate Avro schema. +best practice for domain messages is to schema-encode message keys +with a separate schema. This enforced by requiring producers to define a `key_config` directive. If any message comes in with a key, the producer will error out if `key_config` is @@ -179,7 +201,7 @@ There are three possible configurations to use: all your messages in a topic need to have a key, or they all need to have no key. This is a good choice for events that aren't keyed - you can still set a partition key. -* `key_config plain: true` - this indicates that you are not using an Avro-encoded +* `key_config plain: true` - this indicates that you are not using an encoded key. Use this for legacy topics - new topics should not use this setting. * `key_config schema: 'MyKeySchema-key'` - this tells the producer to look for an existing key schema named `MyKeySchema-key` in the schema registry and to @@ -234,8 +256,8 @@ like this: ``` If you publish a payload `{ "test_id" => "123", "some_int" => 123 }`, this -will be turned into a key that looks like `{ "test_id" => "123"}` and encoded -via Avro before being sent to Kafka. +will be turned into a key that looks like `{ "test_id" => "123"}` and schema-encoded +before being sent to Kafka. If you are using `plain` or `schema` as your config, you will need to have a special `payload_key` key to your payload hash. This will be extracted and @@ -261,7 +283,7 @@ class MyConsumer < Deimos::Consumer def consume(payload, metadata) # Same method as Phobos consumers. - # payload is an Avro-decoded hash. + # payload is an schema-decoded hash. # metadata is a hash that contains information like :key and :topic. # In general, your key should be included in the payload itself. However, # if you need to access it separately from the payload, you can use @@ -311,7 +333,7 @@ this sample: class MyBatchConsumer < Deimos::BatchConsumer def consume_batch(payloads, metadata) - # payloads is an array of Avro-decoded hashes. + # payloads is an array of schema-decoded hashes. # metadata is a hash that contains information like :keys and :topic. # Keys are automatically decoded and available as an array with # the same cardinality as the payloads. If you need to iterate @@ -604,7 +626,7 @@ Also see [deimos.rb](lib/deimos.rb) under `Configure metrics` to see how the met Deimos also includes some tracing for kafka consumers. It ships with DataDog support, but you can add custom tracing providers as well. -Trace spans are used for when incoming messages are avro decoded, and a +Trace spans are used for when incoming messages are schema-decoded, and a separate span for message consume logic. ### Configuring Tracing Providers @@ -749,7 +771,7 @@ be } ``` -Both payload and key will be Avro-decoded as necessary according to the +Both payload and key will be schema-decoded as necessary according to the key config. You can also just pass an existing producer or consumer class into the method, diff --git a/deimos-ruby.gemspec b/deimos-ruby.gemspec index 11962c8a..208043cf 100644 --- a/deimos-ruby.gemspec +++ b/deimos-ruby.gemspec @@ -18,13 +18,13 @@ Gem::Specification.new do |spec| spec.test_files = spec.files.grep(%r{^(test|spec|features)/}) spec.require_paths = ['lib'] - spec.add_runtime_dependency('avro-patches', '~> 0.3') - spec.add_runtime_dependency('avro_turf', '~> 0.8') spec.add_runtime_dependency('phobos', '~> 1.8.2.pre.beta2') spec.add_runtime_dependency('ruby-kafka', '~> 0.7') spec.add_development_dependency('activerecord', '~> 5.2') spec.add_development_dependency('activerecord-import') + spec.add_development_dependency('avro', '~> 1.9') + spec.add_development_dependency('avro_turf', '~> 0.8') spec.add_development_dependency('bundler', '~> 1') spec.add_development_dependency('ddtrace', '~> 0.11') spec.add_development_dependency('dogstatsd-ruby', '~> 4.2') diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md index d55905a9..93e1aa58 100644 --- a/docs/CONFIGURATION.md +++ b/docs/CONFIGURATION.md @@ -144,6 +144,7 @@ producers.backend|`:kafka_async`|Currently can be set to `:db`, `:kafka`, or `:k Config name|Default|Description -----------|-------|----------- +schema.backend|`:mock`|Backend representing the schema encoder/decoder. You can see a full list [here](../lib/deimos/schema_backends). schema.registry_url|`http://localhost:8081`|URL of the Confluent schema registry. schema.path|nil|Local path to find your schemas. diff --git a/docs/DATABASE_BACKEND.md b/docs/DATABASE_BACKEND.md index ae150432..b642fd2b 100644 --- a/docs/DATABASE_BACKEND.md +++ b/docs/DATABASE_BACKEND.md @@ -77,7 +77,7 @@ The database backend consists of three tables: * `kafka_messages` - this keeps track of the messages that were "published", including the payload, topic, key and partition key. These messages - are *raw data* - all processing, including Avro encoding, must happen + are *raw data* - all processing, including schema-encoding, must happen upstream before they are inserted. * `kafka_topic_info` - this table is essentially a lock table used to ensure that only one producer thread is ever "working" on a topic at a time. diff --git a/lib/deimos.rb b/lib/deimos.rb index 192b711c..38105428 100644 --- a/lib/deimos.rb +++ b/lib/deimos.rb @@ -1,12 +1,8 @@ # frozen_string_literal: true -require 'avro-patches' -require 'avro_turf' require 'phobos' require 'deimos/version' require 'deimos/config/configuration' -require 'deimos/avro_data_encoder' -require 'deimos/avro_data_decoder' require 'deimos/producer' require 'deimos/active_record_producer' require 'deimos/active_record_consumer' @@ -15,17 +11,19 @@ require 'deimos/instrumentation' require 'deimos/utils/lag_reporter' -require 'deimos/publish_backend' +require 'deimos/backends/base' require 'deimos/backends/kafka' require 'deimos/backends/kafka_async' require 'deimos/backends/test' +require 'deimos/schema_backends/base' + require 'deimos/monkey_patches/ruby_kafka_heartbeat' -require 'deimos/monkey_patches/schema_store' require 'deimos/monkey_patches/phobos_producer' require 'deimos/monkey_patches/phobos_cli' require 'deimos/railtie' if defined?(Rails) + if defined?(ActiveRecord) require 'deimos/kafka_source' require 'deimos/kafka_topic_info' @@ -34,6 +32,7 @@ require 'deimos/utils/executor.rb' require 'deimos/utils/db_producer.rb' end + require 'deimos/utils/inline_consumer' require 'yaml' require 'erb' @@ -41,6 +40,22 @@ # Parent module. module Deimos class << self + # @return [Class < Deimos::SchemaBackends::Base] + def schema_backend_class + backend = Deimos.config.schema.backend.to_s + + require "deimos/schema_backends/#{backend}" + + "Deimos::SchemaBackends::#{backend.classify}".constantize + end + + # @param schema [String|Symbol] + # @param namespace [String] + # @return [Deimos::SchemaBackends::Base] + def schema_backend(schema:, namespace:) + schema_backend_class.new(schema: schema, namespace: namespace) + end + # Start the DB producers to send Kafka messages. # @param thread_count [Integer] the number of threads to start. def start_db_backend!(thread_count: 1) diff --git a/lib/deimos/active_record_consumer.rb b/lib/deimos/active_record_consumer.rb index e2d218e5..413a06e4 100644 --- a/lib/deimos/active_record_consumer.rb +++ b/lib/deimos/active_record_consumer.rb @@ -68,31 +68,30 @@ def destroy_record(record) def record_attributes(payload) klass = self.class.config[:record_class] attributes = {} - schema = self.class.decoder.avro_schema - schema.fields.each do |field| + self.class.decoder.schema_fields.each do |field| column = klass.columns.find { |c| c.name == field.name } next if column.nil? next if %w(updated_at created_at).include?(field.name) - attributes[field.name] = _coerce_field(field, column, payload[field.name]) + attributes[field.name] = _coerce_field(column, payload[field.name]) end attributes end private - # @param field [Avro::Schema] # @param column [ActiveRecord::ConnectionAdapters::Column] # @param val [Object] - def _coerce_field(field, column, val) + def _coerce_field(column, val) return nil if val.nil? - field_type = field.type.type.to_sym - if field_type == :union - union_types = field.type.schemas.map { |s| s.type.to_sym } - field_type = union_types.find { |t| t != :null } - end - if column.type == :datetime && %i(int long).include?(field_type) + is_integer = begin + val.is_a?(Integer) || (val.is_a?(String) && Integer(val)) + rescue StandardError + false + end + + if column.type == :datetime && is_integer return Time.zone.strptime(val.to_s, '%s') end diff --git a/lib/deimos/active_record_producer.rb b/lib/deimos/active_record_producer.rb index b1af4f11..c149bc5b 100644 --- a/lib/deimos/active_record_producer.rb +++ b/lib/deimos/active_record_producer.rb @@ -53,10 +53,10 @@ def send_events(records, force_send: false) # is not set. # @return [Hash] def generate_payload(attributes, _record) - schema = self.encoder.avro_schema + fields = self.encoder.schema_fields payload = attributes.stringify_keys payload.delete_if do |k, _| - k.to_sym != :payload_key && !schema.fields.find { |f| f.name == k } + k.to_sym != :payload_key && !fields.map(&:name).include?(k) end end end diff --git a/lib/deimos/avro_data_coder.rb b/lib/deimos/avro_data_coder.rb deleted file mode 100644 index 91460f91..00000000 --- a/lib/deimos/avro_data_coder.rb +++ /dev/null @@ -1,89 +0,0 @@ -# frozen_string_literal: true - -module Deimos - # Base class for the encoder / decoder classes. - class AvroDataCoder - attr_accessor :schema, :namespace, :config, :schema_store - - # @param schema [String] - # @param namespace [String] - # @param schema_store [AvroTurf::SchemaStore] - def initialize(schema:, namespace:, schema_store: nil) - @schema = schema - @namespace = namespace - @schema_store = schema_store || - AvroTurf::SchemaStore.new(path: Deimos.config.schema.path) - end - - # @param schema [String] - # @return [Avro::Schema] - def avro_schema(schema=nil) - schema ||= @schema - @schema_store.find(schema, @namespace) - end - - private - - # @return [AvroTurf] - def avro_turf - @avro_turf ||= AvroTurf.new( - schemas_path: Deimos.config.schema.path, - schema_store: @schema_store - ) - @avro_turf - end - - # @return [AvroTurf::Messaging] - def avro_turf_messaging - @avro_turf_messaging ||= AvroTurf::Messaging.new( - schema_store: @schema_store, - registry_url: Deimos.config.schema.registry_url, - schemas_path: Deimos.config.schema.path, - namespace: @namespace - ) - end - - # Generate a key schema from the given value schema and key ID. This - # is used when encoding or decoding keys from an existing value schema. - # @param key_id [Symbol] - # @return [Hash] - def _generate_key_schema(key_id) - return @key_schema if @key_schema - - value_schema = @schema_store.find(@schema, @namespace) - key_field = value_schema.fields.find { |f| f.name == key_id.to_s } - name = _key_schema_name(@schema) - @key_schema = { - 'type' => 'record', - 'name' => name, - 'namespace' => @namespace, - 'doc' => "Key for #{@namespace}.#{@schema}", - 'fields' => [ - { - 'name' => key_id, - 'type' => key_field.type.type_sym.to_s - } - ] - } - @schema_store.add_schema(@key_schema) - @key_schema - end - - # @param value_schema [Hash] - # @return [String] - def _field_name_from_schema(value_schema) - raise "Schema #{@schema} not found!" if value_schema.nil? - if value_schema['fields'].nil? || value_schema['fields'].empty? - raise "Schema #{@schema} has no fields!" - end - - value_schema['fields'][0]['name'] - end - - # @param schema [String] - # @return [String] - def _key_schema_name(schema) - "#{schema.gsub('-value', '')}_key" - end - end -end diff --git a/lib/deimos/avro_data_decoder.rb b/lib/deimos/avro_data_decoder.rb deleted file mode 100644 index 98ca6c62..00000000 --- a/lib/deimos/avro_data_decoder.rb +++ /dev/null @@ -1,36 +0,0 @@ -# frozen_string_literal: true - -require 'avro_turf/messaging' -require 'deimos/avro_data_coder' - -module Deimos - # Service Object to decode avro messages - class AvroDataDecoder < AvroDataCoder - # Decode some data. - # @param payload [Hash|String] - # @param schema [String] - # @return [Hash] - def decode(payload, schema: nil) - schema ||= @schema - avro_turf_messaging.decode(payload, schema_name: schema) - end - - # Decode against a local schema. - # @param payload [Hash] - # @param schema [String] - # @return [Hash] - def decode_local(payload, schema: nil) - schema ||= @schema - avro_turf.decode(payload, schema_name: schema, namespace: @namespace) - end - - # @param payload [String] the encoded key. - # @param key_id [String|Symbol] - # @return [Object] the decoded key (int/long/string). - def decode_key(payload, key_id) - key_schema = _generate_key_schema(key_id) - field_name = _field_name_from_schema(key_schema) - decode(payload, schema: key_schema['name'])[field_name] - end - end -end diff --git a/lib/deimos/avro_data_encoder.rb b/lib/deimos/avro_data_encoder.rb deleted file mode 100644 index 57c01606..00000000 --- a/lib/deimos/avro_data_encoder.rb +++ /dev/null @@ -1,51 +0,0 @@ -# frozen_string_literal: true - -require 'avro_turf/messaging' -require 'deimos/avro_data_coder' - -module Deimos - # Service Object to decode Avro messages. - class AvroDataEncoder < AvroDataCoder - # @param payload [Hash] - # @param schema [String] - # @return [String] - def encode_local(payload, schema: nil) - schema ||= @schema - Avro::SchemaValidator.validate!(avro_schema(schema), payload, - recursive: true, - fail_on_extra_fields: true) - avro_turf.encode(payload, schema_name: schema, namespace: @namespace) - rescue Avro::IO::AvroTypeError - # throw a more detailed error - value_schema = @schema_store.find(schema, @namespace) - Avro::SchemaValidator.validate!(value_schema, payload) - end - - # @param payload [Hash] - # @param schema [String] - # @param topic [String] - # @return [String] - def encode(payload, schema: nil, topic: nil) - schema ||= @schema - Avro::SchemaValidator.validate!(avro_schema(schema), payload, - recursive: true, - fail_on_extra_fields: true) - avro_turf_messaging.encode(payload, schema_name: schema, subject: topic) - rescue Avro::IO::AvroTypeError - # throw a more detailed error - schema = @schema_store.find(@schema, @namespace) - Avro::SchemaValidator.validate!(schema, payload) - end - - # @param key_id [Symbol|String] - # @param key [Object] - # @param topic [String] - # @return [String] the encoded key. - def encode_key(key_id, key, topic=nil) - key_schema = _generate_key_schema(key_id) - field_name = _field_name_from_schema(key_schema) - payload = { field_name => key } - encode(payload, schema: key_schema['name'], topic: topic) - end - end -end diff --git a/lib/deimos/backends/base.rb b/lib/deimos/backends/base.rb new file mode 100644 index 00000000..e9c23ad2 --- /dev/null +++ b/lib/deimos/backends/base.rb @@ -0,0 +1,32 @@ +# frozen_string_literal: true + +module Deimos + module Backends + # Abstract class for all publish backends. + class Base + class << self + # @param producer_class [Class < Deimos::Producer] + # @param messages [Array] + def publish(producer_class:, messages:) + Deimos.config.logger.info( + message: 'Publishing messages', + topic: producer_class.topic, + payloads: messages.map do |message| + { + payload: message.payload, + key: message.key + } + end + ) + execute(producer_class: producer_class, messages: messages) + end + + # @param producer_class [Class < Deimos::Producer] + # @param messages [Array] + def execute(producer_class:, messages:) + raise NotImplementedError + end + end + end + end +end diff --git a/lib/deimos/backends/db.rb b/lib/deimos/backends/db.rb index 42fc2dcf..70a92fbc 100644 --- a/lib/deimos/backends/db.rb +++ b/lib/deimos/backends/db.rb @@ -6,7 +6,7 @@ module Deimos module Backends # Backend which saves messages to the database instead of immediately # sending them. - class Db < Deimos::PublishBackend + class Db < Base class << self # :nodoc: def execute(producer_class:, messages:) diff --git a/lib/deimos/backends/kafka.rb b/lib/deimos/backends/kafka.rb index e5aefaa9..6c5b8b56 100644 --- a/lib/deimos/backends/kafka.rb +++ b/lib/deimos/backends/kafka.rb @@ -3,7 +3,7 @@ module Deimos module Backends # Default backend to produce to Kafka. - class Kafka < Deimos::PublishBackend + class Kafka < Base include Phobos::Producer # Shut down the producer if necessary. diff --git a/lib/deimos/backends/kafka_async.rb b/lib/deimos/backends/kafka_async.rb index f43847c4..0b8eb701 100644 --- a/lib/deimos/backends/kafka_async.rb +++ b/lib/deimos/backends/kafka_async.rb @@ -3,7 +3,7 @@ module Deimos module Backends # Backend which produces to Kafka via an async producer. - class KafkaAsync < Deimos::PublishBackend + class KafkaAsync < Base include Phobos::Producer # Shut down the producer cleanly. diff --git a/lib/deimos/base_consumer.rb b/lib/deimos/base_consumer.rb index 7ae90d82..59ad8767 100644 --- a/lib/deimos/base_consumer.rb +++ b/lib/deimos/base_consumer.rb @@ -6,20 +6,20 @@ class BaseConsumer include SharedConfig class << self - # @return [AvroDataEncoder] + # @return [Deimos::SchemaBackends::Base] def decoder - @decoder ||= AvroDataDecoder.new(schema: config[:schema], - namespace: config[:namespace]) + @decoder ||= Deimos.schema_backend(schema: config[:schema], + namespace: config[:namespace]) end - # @return [AvroDataEncoder] + # @return [Deimos::SchemaBackends::Base] def key_decoder - @key_decoder ||= AvroDataDecoder.new(schema: config[:key_schema], - namespace: config[:namespace]) + @key_decoder ||= Deimos.schema_backend(schema: config[:key_schema], + namespace: config[:namespace]) end end - # Helper method to decode an Avro-encoded key. + # Helper method to decode an encoded key. # @param key [String] # @return [Object] the decoded key. def decode_key(key) diff --git a/lib/deimos/batch_consumer.rb b/lib/deimos/batch_consumer.rb index 148038e2..e2dbc26b 100644 --- a/lib/deimos/batch_consumer.rb +++ b/lib/deimos/batch_consumer.rb @@ -1,6 +1,5 @@ # frozen_string_literal: true -require 'deimos/avro_data_decoder' require 'deimos/base_consumer' require 'phobos/batch_handler' diff --git a/lib/deimos/config/configuration.rb b/lib/deimos/config/configuration.rb index a92f9932..b18c557d 100644 --- a/lib/deimos/config/configuration.rb +++ b/lib/deimos/config/configuration.rb @@ -251,6 +251,10 @@ def self.configure_producer_or_consumer(kafka_config) end setting :schema do + + # Backend class to use when encoding/decoding messages. + setting :backend, :mock + # URL of the Confluent schema registry. # @return [String] setting :registry_url, 'http://localhost:8081' diff --git a/lib/deimos/consumer.rb b/lib/deimos/consumer.rb index 4327b887..c84496fa 100644 --- a/lib/deimos/consumer.rb +++ b/lib/deimos/consumer.rb @@ -1,11 +1,9 @@ # frozen_string_literal: true -require 'deimos/avro_data_decoder' require 'deimos/base_consumer' require 'deimos/shared_config' require 'phobos/handler' require 'active_support/all' -require 'ddtrace' # Class to consume messages coming from the pipeline topic # Note: According to the docs, instances of your handler will be created diff --git a/lib/deimos/message.rb b/lib/deimos/message.rb index 65247e2d..a07b7179 100644 --- a/lib/deimos/message.rb +++ b/lib/deimos/message.rb @@ -18,23 +18,23 @@ def initialize(payload, producer, topic: nil, key: nil, partition_key: nil) # Add message_id and timestamp default values if they are in the # schema and don't already have values. - # @param schema [Avro::Schema] - def add_fields(schema) + # @param fields [Array] existing fields in the schema. + def add_fields(fields) return if @payload.except(:payload_key, :partition_key).blank? - if schema.fields.any? { |f| f.name == 'message_id' } + if fields.include?('message_id') @payload['message_id'] ||= SecureRandom.uuid end - if schema.fields.any? { |f| f.name == 'timestamp' } + if fields.include?('timestamp') @payload['timestamp'] ||= Time.now.in_time_zone.to_s end end - # @param schema [Avro::Schema] - def coerce_fields(schema) + # @param encoder [Deimos::SchemaBackends::Base] + def coerce_fields(encoder) return if payload.nil? - @payload = SchemaCoercer.new(schema).coerce(@payload) + @payload = encoder.coerce(@payload) end # @return [Hash] diff --git a/lib/deimos/monkey_patches/schema_store.rb b/lib/deimos/monkey_patches/schema_store.rb deleted file mode 100644 index ca37fc58..00000000 --- a/lib/deimos/monkey_patches/schema_store.rb +++ /dev/null @@ -1,19 +0,0 @@ -# frozen_string_literal: true - -require 'avro_turf/schema_store' - -# Allows us to add in-memory schemas to the schema store in -# addition to the ones stored in the file system. -class AvroTurf::SchemaStore - attr_accessor :schemas - - # @param schema_hash [Hash] - def add_schema(schema_hash) - name = schema_hash['name'] - namespace = schema_hash['namespace'] - full_name = Avro::Name.make_fullname(name, namespace) - return if @schemas.key?(full_name) - - Avro::Schema.real_parse(schema_hash, @schemas) - end -end diff --git a/lib/deimos/producer.rb b/lib/deimos/producer.rb index e1213d92..854ebd79 100644 --- a/lib/deimos/producer.rb +++ b/lib/deimos/producer.rb @@ -1,9 +1,7 @@ # frozen_string_literal: true -require 'deimos/avro_data_encoder' require 'deimos/message' require 'deimos/shared_config' -require 'deimos/schema_coercer' require 'phobos/producer' require 'active_support/notifications' @@ -143,23 +141,23 @@ def produce_batch(backend, batch) backend.publish(producer_class: self, messages: batch) end - # @return [AvroDataEncoder] + # @return [Deimos::SchemaBackends::Base] def encoder - @encoder ||= AvroDataEncoder.new(schema: config[:schema], - namespace: config[:namespace]) + @encoder ||= Deimos.schema_backend(schema: config[:schema], + namespace: config[:namespace]) end - # @return [AvroDataEncoder] + # @return [Deimos::SchemaBackends::Base] def key_encoder - @key_encoder ||= AvroDataEncoder.new(schema: config[:key_schema], - namespace: config[:namespace]) + @key_encoder ||= Deimos.schema_backend(schema: config[:key_schema], + namespace: config[:namespace]) end # Override this in active record producers to add # non-schema fields to check for updates # @return [Array] fields to check for updates def watched_attributes - self.encoder.avro_schema.fields.map(&:name) + self.encoder.schema_fields.map(&:name) end private @@ -169,13 +167,13 @@ def _process_message(message) # this violates the Law of Demeter but it has to happen in a very # specific order and requires a bunch of methods on the producer # to work correctly. - message.add_fields(encoder.avro_schema) + message.add_fields(encoder.schema_fields.map(&:name)) message.partition_key = self.partition_key(message.payload) message.key = _retrieve_key(message.payload) # need to do this before _coerce_fields because that might result # in an empty payload which is an *error* whereas this is intended. message.payload = nil if message.payload.blank? - message.coerce_fields(encoder.avro_schema) + message.coerce_fields(encoder) message.encoded_key = _encode_key(message.key) message.topic = self.topic message.encoded_payload = if message.payload.nil? @@ -200,7 +198,7 @@ def _encode_key(key) end if config[:key_field] - encoder.encode_key(config[:key_field], key, "#{config[:topic]}-key") + encoder.encode_key(config[:key_field], key, topic: "#{config[:topic]}-key") elsif config[:key_schema] key_encoder.encode(key, topic: "#{config[:topic]}-key") else diff --git a/lib/deimos/publish_backend.rb b/lib/deimos/publish_backend.rb deleted file mode 100644 index 4ea8893a..00000000 --- a/lib/deimos/publish_backend.rb +++ /dev/null @@ -1,30 +0,0 @@ -# frozen_string_literal: true - -module Deimos - # Abstract class for all publish backends. - class PublishBackend - class << self - # @param producer_class [Class < Deimos::Producer] - # @param messages [Array] - def publish(producer_class:, messages:) - Deimos.config.logger.info( - message: 'Publishing messages', - topic: producer_class.topic, - payloads: messages.map do |message| - { - payload: message.payload, - key: message.key - } - end - ) - execute(producer_class: producer_class, messages: messages) - end - - # @param producer_class [Class < Deimos::Producer] - # @param messages [Array] - def execute(producer_class:, messages:) - raise NotImplementedError - end - end - end -end diff --git a/lib/deimos/schema_backends/avro_base.rb b/lib/deimos/schema_backends/avro_base.rb new file mode 100644 index 00000000..b48703bb --- /dev/null +++ b/lib/deimos/schema_backends/avro_base.rb @@ -0,0 +1,108 @@ +# frozen_string_literal: true + +require_relative 'base' +require 'avro' +require 'avro_turf' +require 'avro_turf/mutable_schema_store' +require_relative 'avro_schema_coercer' + +module Deimos + module SchemaBackends + # Encode / decode using Avro, either locally or via schema registry. + class AvroBase < Base + attr_accessor :schema_store + + # @override + def initialize(schema:, namespace:) + super(schema: schema, namespace: namespace) + @schema_store = AvroTurf::MutableSchemaStore.new(path: Deimos.config.schema.path) + end + + # @override + def encode_key(key_id, key, topic: nil) + @key_schema ||= _generate_key_schema(key_id) + field_name = _field_name_from_schema(@key_schema) + payload = { field_name => key } + encode(payload, schema: @key_schema['name'], topic: topic) + end + + # @override + def decode_key(payload, key_id) + @key_schema ||= _generate_key_schema(key_id) + field_name = _field_name_from_schema(@key_schema) + decode(payload, schema: @key_schema['name'])[field_name] + end + + # @override + def coerce_field(field, value) + AvroSchemaCoercer.new(avro_schema).coerce_type(field.type, value) + end + + # @override + def schema_fields + avro_schema.fields.map { |field| SchemaField.new(field.name, field.type) } + end + + # @override + def validate(payload, schema:) + Avro::SchemaValidator.validate!(avro_schema(schema), payload, + recursive: true, + fail_on_extra_fields: true) + end + + # @override + def self.mock_backend + :avro_validation + end + + private + + # @param schema [String] + # @return [Avro::Schema] + def avro_schema(schema=nil) + schema ||= @schema + @schema_store.find(schema, @namespace) + end + + # Generate a key schema from the given value schema and key ID. This + # is used when encoding or decoding keys from an existing value schema. + # @param key_id [Symbol] + # @return [Hash] + def _generate_key_schema(key_id) + key_field = avro_schema.fields.find { |f| f.name == key_id.to_s } + name = _key_schema_name(@schema) + key_schema = { + 'type' => 'record', + 'name' => name, + 'namespace' => @namespace, + 'doc' => "Key for #{@namespace}.#{@schema}", + 'fields' => [ + { + 'name' => key_id, + 'type' => key_field.type.type_sym.to_s + } + ] + } + @schema_store.add_schema(key_schema) + key_schema + end + + # @param value_schema [Hash] + # @return [String] + def _field_name_from_schema(value_schema) + raise "Schema #{@schema} not found!" if value_schema.nil? + if value_schema['fields'].nil? || value_schema['fields'].empty? + raise "Schema #{@schema} has no fields!" + end + + value_schema['fields'][0]['name'] + end + + # @param schema [String] + # @return [String] + def _key_schema_name(schema) + "#{schema.gsub('-value', '')}_key" + end + end + end +end diff --git a/lib/deimos/schema_backends/avro_local.rb b/lib/deimos/schema_backends/avro_local.rb new file mode 100644 index 00000000..596421f4 --- /dev/null +++ b/lib/deimos/schema_backends/avro_local.rb @@ -0,0 +1,30 @@ +# frozen_string_literal: true + +require_relative 'avro_base' + +module Deimos + module SchemaBackends + # Encode / decode using local Avro encoding. + class AvroLocal < AvroBase + # @override + def decode_payload(payload, schema:) + avro_turf.decode(payload, schema_name: schema, namespace: @namespace) + end + + # @override + def encode_payload(payload, schema: nil, topic: nil) + avro_turf.encode(payload, schema_name: schema, namespace: @namespace) + end + + private + + # @return [AvroTurf] + def avro_turf + @avro_turf ||= AvroTurf.new( + schemas_path: Deimos.config.schema.path, + schema_store: @schema_store + ) + end + end + end +end diff --git a/lib/deimos/schema_coercer.rb b/lib/deimos/schema_backends/avro_schema_coercer.rb similarity index 80% rename from lib/deimos/schema_coercer.rb rename to lib/deimos/schema_backends/avro_schema_coercer.rb index a616c1aa..523a9bb3 100644 --- a/lib/deimos/schema_coercer.rb +++ b/lib/deimos/schema_backends/avro_schema_coercer.rb @@ -1,66 +1,20 @@ # frozen_string_literal: true +require 'active_support/time' + module Deimos # Class to coerce values in a payload to match a schema. - class SchemaCoercer + class AvroSchemaCoercer # @param schema [Avro::Schema] def initialize(schema) @schema = schema end - # @param payload [Hash] - # @return [HashWithIndifferentAccess] - def coerce(payload) - result = {} - @schema.fields.each do |field| - name = field.name - next unless payload.key?(name) - - val = payload[name] - result[name] = _coerce_type(field.type, val) - end - result.with_indifferent_access - end - - private - - # @param val [String] - # @return [Boolean] - def _is_integer_string?(val) - return false unless val.is_a?(String) - - begin - true if Integer(val) - rescue StandardError - false - end - end - - # @param val [String] - # @return [Boolean] - def _is_float_string?(val) - return false unless val.is_a?(String) - - begin - true if Float(val) - rescue StandardError - false - end - end - - # @param val [Object] - # @return [Boolean] - def _is_to_s_defined?(val) - return false if val.nil? - - Object.instance_method(:to_s).bind(val).call != val.to_s - end - # @param type [Symbol] # @param val [Object] # @return [Object] - def _coerce_type(type, val) - int_classes = [Time, DateTime, ActiveSupport::TimeWithZone] + def coerce_type(type, val) + int_classes = [Time, ActiveSupport::TimeWithZone] field_type = type.type.to_sym if field_type == :union union_types = type.schemas.map { |s| s.type.to_sym } @@ -104,5 +58,39 @@ def _coerce_type(type, val) val end end + + private + + # @param val [String] + # @return [Boolean] + def _is_integer_string?(val) + return false unless val.is_a?(String) + + begin + true if Integer(val) + rescue StandardError + false + end + end + + # @param val [String] + # @return [Boolean] + def _is_float_string?(val) + return false unless val.is_a?(String) + + begin + true if Float(val) + rescue StandardError + false + end + end + + # @param val [Object] + # @return [Boolean] + def _is_to_s_defined?(val) + return false if val.nil? + + Object.instance_method(:to_s).bind(val).call != val.to_s + end end end diff --git a/lib/deimos/schema_backends/avro_schema_registry.rb b/lib/deimos/schema_backends/avro_schema_registry.rb new file mode 100644 index 00000000..1ae4b839 --- /dev/null +++ b/lib/deimos/schema_backends/avro_schema_registry.rb @@ -0,0 +1,34 @@ +# frozen_string_literal: true + +require_relative 'avro_base' +require_relative 'avro_validation' +require 'avro_turf/messaging' + +module Deimos + module SchemaBackends + # Encode / decode using the Avro schema registry. + class AvroSchemaRegistry < AvroBase + # @override + def decode_payload(payload, schema:) + avro_turf_messaging.decode(payload, schema_name: schema) + end + + # @override + def encode_payload(payload, schema: nil, topic: nil) + avro_turf_messaging.encode(payload, schema_name: schema, subject: topic) + end + + private + + # @return [AvroTurf::Messaging] + def avro_turf_messaging + @avro_turf_messaging ||= AvroTurf::Messaging.new( + schema_store: @schema_store, + registry_url: Deimos.config.schema.registry_url, + schemas_path: Deimos.config.schema.path, + namespace: @namespace + ) + end + end + end +end diff --git a/lib/deimos/schema_backends/avro_validation.rb b/lib/deimos/schema_backends/avro_validation.rb new file mode 100644 index 00000000..2842d23f --- /dev/null +++ b/lib/deimos/schema_backends/avro_validation.rb @@ -0,0 +1,21 @@ +# frozen_string_literal: true + +require_relative 'avro_base' + +module Deimos + module SchemaBackends + # Leave Ruby hashes as is but validate them against the schema. + # Useful for unit tests. + class AvroValidation < AvroBase + # @override + def decode_payload(payload, schema: nil) + payload.with_indifferent_access + end + + # @override + def encode_payload(payload, schema: nil, topic: nil) + payload.with_indifferent_access + end + end + end +end diff --git a/lib/deimos/schema_backends/base.rb b/lib/deimos/schema_backends/base.rb new file mode 100644 index 00000000..a2b4da75 --- /dev/null +++ b/lib/deimos/schema_backends/base.rb @@ -0,0 +1,130 @@ +# frozen_string_literal: true + +module Deimos + # Represents a field in the schema. + class SchemaField + attr_accessor :name, :type + + # @param name [String] + # @param type [Object] + def initialize(name, type) + @name = name + @type = type + end + end + + module SchemaBackends + # Base class for encoding / decoding. + class Base + attr_accessor :schema, :namespace, :key_schema + + # @param schema [String|Symbol] + # @param namespace [String] + def initialize(schema:, namespace: nil) + @schema = schema + @namespace = namespace + end + + # Encode a payload with a schema. Public method. + # @param payload [Hash] + # @param schema [Symbol|String] + # @param topic [String] + # @return [String] + def encode(payload, schema: nil, topic: nil) + validate(payload, schema: schema || @schema) + encode_payload(payload, schema: schema || @schema, topic: topic) + end + + # Decode a payload with a schema. Public method. + # @param payload [String] + # @param schema [Symbol|String] + # @return [Hash] + def decode(payload, schema: nil) + decode_payload(payload, schema: schema || @schema) + end + + # Given a hash, coerce its types to our schema. To be defined by subclass. + # @param payload [Hash] + # @return [Hash] + def coerce(payload) + result = {} + self.schema_fields.each do |field| + name = field.name + next unless payload.key?(name) + + val = payload[name] + result[name] = coerce_field(field, val) + end + result.with_indifferent_access + end + + # Indicate a class which should act as a mocked version of this backend. + # This class should perform all validations but not actually do any + # encoding. + # Note that the "mock" version (e.g. avro_validation) should return + # its own symbol when this is called, since it may be called multiple + # times depending on the order of RSpec helpers. + # @return [Symbol] + def self.mock_backend + :mock + end + + # Encode a payload. To be defined by subclass. + # @param payload [Hash] + # @param schema [Symbol|String] + # @param topic [String] + # @return [String] + def encode_payload(_payload, schema:, topic: nil) + raise NotImplementedError + end + + # Decode a payload. To be defined by subclass. + # @param payload [String] + # @param schema [String|Symbol] + # @return [Hash] + def decode_payload(_payload, schema:) + raise NotImplementedError + end + + # Validate that a payload matches the schema. To be defined by subclass. + # @param payload [Hash] + # @param schema [String|Symbol] + def validate(_payload, schema:) + raise NotImplementedError + end + + # List of field names belonging to the schema. To be defined by subclass. + # @return [Array] + def schema_fields + raise NotImplementedError + end + + # Given a value and a field definition (as defined by whatever the + # underlying schema library is), coerce the given value to + # the given field type. + # @param field [SchemaField] + # @param value [Object] + # @return [Object] + def coerce_field(_field, _value) + raise NotImplementedError + end + + # Encode a message key. To be defined by subclass. + # @param key [String] the value to use as the key. + # @param key_id [Symbol|String] the field name of the key. + # @param topic [String] + # @return [String] + def encode_key(_key, _key_id, topic: nil) + raise NotImplementedError + end + + # Decode a message key. To be defined by subclass. + # @param payload [Hash] the message itself. + # @param key_id [Symbol|String] the field in the message to decode. + # @return [String] + def decode_key(_payload, _key_id) + raise NotImplementedError + end + end + end +end diff --git a/lib/deimos/schema_backends/mock.rb b/lib/deimos/schema_backends/mock.rb new file mode 100644 index 00000000..63cc34c3 --- /dev/null +++ b/lib/deimos/schema_backends/mock.rb @@ -0,0 +1,42 @@ +# frozen_string_literal: true + +module Deimos + module SchemaBackends + # Mock implementation of a schema backend that does no encoding or validation. + class Mock < Base + # @override + def decode_payload(payload, schema:) + payload.is_a?(String) ? 'payload-decoded' : payload.map { |k, v| [k, "decoded-#{v}"] } + end + + # @override + def encode_payload(payload, schema:, topic: nil) + payload.is_a?(String) ? 'payload-encoded' : payload.map { |k, v| [k, "encoded-#{v}"] } + end + + # @override + def validate(_payload, schema:) + end + + # @override + def schema_fields + [] + end + + # @override + def coerce_field(_type, value) + value + end + + # @override + def encode_key(key_id, key) + { key_id => key } + end + + # @override + def decode_key(payload, key_id) + payload[key_id] + end + end + end +end diff --git a/lib/deimos/test_helpers.rb b/lib/deimos/test_helpers.rb index 9d7e8554..491a8c6c 100644 --- a/lib/deimos/test_helpers.rb +++ b/lib/deimos/test_helpers.rb @@ -2,7 +2,6 @@ require 'active_support/concern' require 'active_support/core_ext' -require 'avro_turf' require 'deimos/tracing/mock' require 'deimos/metrics/mock' @@ -22,129 +21,47 @@ def sent_messages end included do - # @param encoder_schema [String] - # @param namespace [String] - # @return [Deimos::AvroDataEncoder] - def create_encoder(encoder_schema, namespace) - encoder = Deimos::AvroDataEncoder.new(schema: encoder_schema, - namespace: namespace) - - # we added and_wrap_original to RSpec 2 but the regular block - # syntax wasn't working for some reason - block wasn't being passed - # to the method - block = proc do |m, *args| - m.call(*args) - args[0] - end - allow(encoder).to receive(:encode_local).and_wrap_original(&block) - allow(encoder).to receive(:encode) do |payload, schema: nil, topic: nil| - encoder.encode_local(payload, schema: schema) - end - - block = proc do |m, *args| - m.call(*args)&.values&.first - end - allow(encoder).to receive(:encode_key).and_wrap_original(&block) - encoder - end - - # @param decoder_schema [String] - # @param namespace [String] - # @return [Deimos::AvroDataDecoder] - def create_decoder(decoder_schema, namespace) - decoder = Deimos::AvroDataDecoder.new(schema: decoder_schema, - namespace: namespace) - allow(decoder).to receive(:decode_local) { |payload| payload } - allow(decoder).to receive(:decode) do |payload, schema: nil| - schema ||= decoder.schema - if schema && decoder.namespace - # Validate against local schema. - encoder = Deimos::AvroDataEncoder.new(schema: schema, - namespace: decoder.namespace) - encoder.schema_store = decoder.schema_store - payload = payload.respond_to?(:stringify_keys) ? payload.stringify_keys : payload - encoder.encode_local(payload) - end - payload - end - allow(decoder).to receive(:decode_key) do |payload, _key_id| - payload.values.first - end - decoder - end RSpec.configure do |config| config.before(:suite) do - Deimos.configure do |fr_config| - fr_config.logger = Logger.new(STDOUT) - fr_config.consumers.reraise_errors = true - fr_config.kafka.seed_brokers ||= ['test_broker'] + Deimos.configure do |d_config| + d_config.logger = Logger.new(STDOUT) + d_config.consumers.reraise_errors = true + d_config.kafka.seed_brokers ||= ['test_broker'] + d_config.schema.backend = Deimos.schema_backend_class.mock_backend + d_config.producers.backend = :test end end - end + before(:each) do client = double('client').as_null_object allow(client).to receive(:time) do |*_args, &block| block.call end + Deimos::Backends::Test.sent_messages.clear end end - # Stub all already-loaded producers and consumers for unit testing purposes. + # :nodoc: def stub_producers_and_consumers! - Deimos.configure { |c| c.producers.backend = :test } - Deimos::Backends::Test.sent_messages.clear - - Deimos::Producer.descendants.each do |klass| - next if klass == Deimos::ActiveRecordProducer # "abstract" class - - stub_producer(klass) - end - - Deimos::Consumer.descendants.each do |klass| - # TODO: remove this when ActiveRecordConsumer uses batching - next if klass == Deimos::ActiveRecordConsumer # "abstract" class - - stub_consumer(klass) - end - - Deimos::BatchConsumer.descendants.each do |klass| - next if klass == Deimos::ActiveRecordConsumer # "abstract" class - - stub_batch_consumer(klass) - end + warn('stub_producers_and_consumers! is no longer necessary and this method will be removed in 3.0') end - # Stub a given producer class. - # @param klass [Class < Deimos::Producer] - def stub_producer(klass) - allow(klass).to receive(:encoder) do - create_encoder(klass.config[:schema], klass.config[:namespace]) - end - allow(klass).to receive(:key_encoder) do - create_encoder(klass.config[:key_schema], klass.config[:namespace]) - end + # :nodoc: + def stub_producer(_klass) + warn('Stubbing producers is no longer necessary and this method will be removed in 3.0') end - # Stub a given consumer class. - # @param klass [Class < Deimos::Consumer] - def stub_consumer(klass) - _stub_base_consumer(klass) - klass.class_eval do - alias_method(:old_consume, :consume) unless self.instance_methods.include?(:old_consume) - end - allow_any_instance_of(klass).to receive(:consume) do |instance, payload, metadata| - metadata[:key] = klass.new.decode_key(metadata[:key]) if klass.config[:key_configured] - instance.old_consume(payload, metadata) - end + # :nodoc: + def stub_consumer(_klass) + warn('Stubbing consumers is no longer necessary and this method will be removed in 3.0') end - # Stub a given batch consumer class. - # @param klass [Class < Deimos::BatchConsumer] - def stub_batch_consumer(klass) - _stub_base_consumer(klass) + # :nodoc: + def stub_batch_consumer(_klass) + warn('Stubbing batch consumers is no longer necessary and this method will be removed in 3.0') end # get the difference of 2 hashes. @@ -246,7 +163,7 @@ def was_message_sent?(message, topic, key=nil) end # Test that a given handler will consume a given payload correctly, i.e. - # that the Avro schema is correct. If + # that the schema is correct. If # a block is given, that block will be executed when `consume` is called. # Otherwise it will just confirm that `consume` is called at all. # @param handler_class_or_topic [Class|String] Class which inherits from @@ -298,33 +215,18 @@ def test_consume_message(handler_class_or_topic, ).send(:process_message, payload) end - # Check to see that a given message will fail due to Avro errors. + # Check to see that a given message will fail due to validation errors. # @param handler_class [Class] # @param payload [Hash] def test_consume_invalid_message(handler_class, payload) - handler = handler_class.new - allow(handler_class).to receive(:new).and_return(handler) - listener = double('listener', - handler_class: handler_class, - encoding: nil) - message = double('message', - key: _key_from_consumer(handler_class), - partition_key: nil, - partition: 1, - offset: 1, - value: payload) - expect { - Phobos::Actions::ProcessMessage.new( - listener: listener, - message: message, - listener_metadata: { topic: 'my-topic' } - ).send(:process_message, payload) - }.to raise_error(Avro::SchemaValidator::ValidationError) + handler_class.decoder.validate(payload, + schema: handler_class.decoder.schema) + }.to raise_error end # Test that a given handler will consume a given batch payload correctly, - # i.e. that the Avro schema is correct. If + # i.e. that the schema is correct. If # a block is given, that block will be executed when `consume` is called. # Otherwise it will just confirm that `consume` is called at all. # @param handler_class_or_topic [Class|String] Class which inherits from @@ -382,7 +284,7 @@ def test_consume_batch(handler_class_or_topic, action.send(:execute) end - # Check to see that a given message will fail due to Avro errors. + # Check to see that a given message will fail due to validation errors. # @param handler_class [Class] # @param payloads [Array] def test_consume_batch_invalid_message(handler_class, payloads) @@ -416,33 +318,21 @@ def test_consume_batch_invalid_message(handler_class, payloads) allow(action).to receive(:handle_error) { |e| raise e } expect { action.send(:execute) }. - to raise_error(Avro::SchemaValidator::ValidationError) - end - - # @param schema1 [String|Hash] a file path, JSON string, or - # hash representing a schema. - # @param schema2 [String|Hash] a file path, JSON string, or - # hash representing a schema. - # @return [Boolean] true if the schemas are compatible, false otherwise. - def self.schemas_compatible?(schema1, schema2) - json1, json2 = [schema1, schema2].map do |schema| - if schema.is_a?(String) - schema = File.read(schema) unless schema.strip.starts_with?('{') # file path - MultiJson.load(schema) - else - schema - end - end - avro_schema1 = Avro::Schema.real_parse(json1, {}) - avro_schema2 = Avro::Schema.real_parse(json2, {}) - Avro::SchemaCompatibility.mutual_read?(avro_schema1, avro_schema2) + to raise_error end private def _key_from_consumer(consumer) - if consumer.config[:key_field] || consumer.config[:key_schema] - { 'test' => 1 } + if consumer.config[:key_field] + { consumer.config[:key_field] => 1 } + elsif consumer.config[:key_schema] + backend = consumer.decoder + old_schema = backend.schema + backend.schema = consumer.config[:key_schema] + key = backend.schema_fields.map { |field| [field.name, 1] }.to_h + backend.schema = old_schema + key elsif consumer.config[:no_keys] nil else @@ -459,19 +349,5 @@ def _get_handler_class_from_topic(topic) handler.handler.constantize end - - # Stub shared methods between consumers/batch consumers - # @param [Class < Deimos::BaseConsumer] klass Consumer class to stub - def _stub_base_consumer(klass) - allow(klass).to receive(:decoder) do - create_decoder(klass.config[:schema], klass.config[:namespace]) - end - - if klass.config[:key_schema] # rubocop:disable Style/GuardClause - allow(klass).to receive(:key_decoder) do - create_decoder(klass.config[:key_schema], klass.config[:namespace]) - end - end - end end end diff --git a/spec/avro_data_decoder_spec.rb b/spec/avro_data_decoder_spec.rb deleted file mode 100644 index 9aac804e..00000000 --- a/spec/avro_data_decoder_spec.rb +++ /dev/null @@ -1,18 +0,0 @@ -# frozen_string_literal: true - -describe Deimos::AvroDataDecoder do - - let(:decoder) do - decoder = described_class.new(schema: 'MySchema', - namespace: 'com.my-namespace') - allow(decoder).to(receive(:decode)) { |payload| payload } - decoder - end - - it 'should decode a key' do - # reset stub from TestHelpers - allow(described_class).to receive(:new).and_call_original - expect(decoder.decode_key({ 'test_id' => '123' }, 'test_id')).to eq('123') - end - -end diff --git a/spec/avro_data_encoder_spec.rb b/spec/avro_data_encoder_spec.rb deleted file mode 100644 index ec330ee6..00000000 --- a/spec/avro_data_encoder_spec.rb +++ /dev/null @@ -1,37 +0,0 @@ -# frozen_string_literal: true - -require 'avro_turf/messaging' - -describe Deimos::AvroDataEncoder do - - let(:encoder) do - encoder = described_class.new(schema: 'MySchema', - namespace: 'com.my-namespace') - allow(encoder).to(receive(:encode)) { |payload| payload } - encoder - end - - specify 'generate_key_schema' do - expect_any_instance_of(AvroTurf::SchemaStore). - to receive(:add_schema).with( - 'type' => 'record', - 'name' => 'MySchema_key', - 'namespace' => 'com.my-namespace', - 'doc' => 'Key for com.my-namespace.MySchema', - 'fields' => [ - { - 'name' => 'test_id', - 'type' => 'string' - } - ] - ) - encoder.send(:_generate_key_schema, 'test_id') - end - - it 'should encode a key' do - # reset stub from TestHelpers - allow(described_class).to receive(:new).and_call_original - expect(encoder.encode_key('test_id', '123')).to eq('test_id' => '123') - end - -end diff --git a/spec/batch_consumer_spec.rb b/spec/batch_consumer_spec.rb index fba86e02..4e3f2657 100644 --- a/spec/batch_consumer_spec.rb +++ b/spec/batch_consumer_spec.rb @@ -80,11 +80,6 @@ def consume_batch(_payloads, _metadata) end it 'should decode payloads for all messages in the batch' do - expect_any_instance_of(Deimos::AvroDataDecoder). - to receive(:decode).with(batch[0]) - expect_any_instance_of(Deimos::AvroDataDecoder). - to receive(:decode).with(batch[1]) - test_consume_batch('my_batch_consume_topic', batch) do |received, _metadata| # Mock decoder simply returns the payload expect(received).to eq(batch) @@ -110,7 +105,6 @@ def consume_batch(_payloads, _metadata) key_config plain: true end stub_const('ConsumerTest::MyBatchConsumer', consumer_class) - stub_batch_consumer(consumer_class) test_consume_batch('my_batch_consume_topic', batch, keys: [1, 2]) do |_received, metadata| expect(metadata[:keys]).to eq([1, 2]) @@ -132,7 +126,6 @@ def consume_batch(_payloads, _metadata) end end stub_const('ConsumerTest::MyBatchConsumer', consumer_class) - stub_batch_consumer(consumer_class) allow(Deimos.config.metrics).to receive(:histogram) end @@ -215,7 +208,6 @@ def consume_batch(_payloads, _metadata) end end stub_const('ConsumerTest::MyBatchConsumer', consumer_class) - stub_batch_consumer(consumer_class) allow(Deimos.config.metrics).to receive(:histogram) end diff --git a/spec/consumer_spec.rb b/spec/consumer_spec.rb index c9eab7aa..357257d6 100644 --- a/spec/consumer_spec.rb +++ b/spec/consumer_spec.rb @@ -150,7 +150,6 @@ def consume(_payload, _metadata) end end stub_const('ConsumerTest::MyConsumer', consumer_class) - stub_consumer(consumer_class) end it 'should consume a message' do diff --git a/spec/deimos_spec.rb b/spec/deimos_spec.rb index 11834e66..af942fd2 100644 --- a/spec/deimos_spec.rb +++ b/spec/deimos_spec.rb @@ -67,10 +67,6 @@ end describe '#start_db_backend!' do - before(:each) do - allow(described_class).to receive(:run_db_backend) - end - it 'should start if backend is db and thread_count is > 0' do signal_handler = instance_double(Deimos::Utils::SignalHandler) allow(signal_handler).to receive(:run!) diff --git a/spec/producer_spec.rb b/spec/producer_spec.rb index 06843793..49d82b50 100644 --- a/spec/producer_spec.rb +++ b/spec/producer_spec.rb @@ -62,8 +62,9 @@ def self.partition_key(payload) subscriber = Deimos.subscribe('produce') do |event| expect(event.payload[:payloads]).to eq([{ 'invalid' => 'key' }]) end + expect(MyProducer.encoder).to receive(:validate).and_raise('OH NOES') expect { MyProducer.publish('invalid' => 'key', :payload_key => 'key') }. - to raise_error(Avro::SchemaValidator::ValidationError) + to raise_error('OH NOES') Deimos.unsubscribe(subscriber) end @@ -198,25 +199,16 @@ def self.partition_key(payload) end it 'should encode the key' do - encoder = instance_double(Deimos::AvroDataEncoder) - allow(Deimos::Message).to receive(:new).and_wrap_original do |m, hash, producer| - message = m.call(hash, producer) - allow(message).to receive(:add_fields) - allow(message).to receive(:coerce_fields) - message - end - allow(MyProducer).to receive(:encoder).and_return(encoder).at_least(:once) - allow(encoder).to receive(:avro_schema) - expect(encoder).to receive(:encode_key).with('test_id', 'foo', 'my-topic-key') - expect(encoder).to receive(:encode_key).with('test_id', 'bar', 'my-topic-key') - expect(encoder).to receive(:encode).with({ - 'test_id' => 'foo', - 'some_int' => 123 - }, { topic: 'my-topic-value' }) - expect(encoder).to receive(:encode).with({ - 'test_id' => 'bar', - 'some_int' => 124 - }, { topic: 'my-topic-value' }) + expect(MyProducer.encoder).to receive(:encode_key).with('test_id', 'foo', topic: 'my-topic-key') + expect(MyProducer.encoder).to receive(:encode_key).with('test_id', 'bar', topic: 'my-topic-key') + expect(MyProducer.encoder).to receive(:encode).with({ + 'test_id' => 'foo', + 'some_int' => 123 + }, { topic: 'my-topic-value' }) + expect(MyProducer.encoder).to receive(:encode).with({ + 'test_id' => 'bar', + 'some_int' => 124 + }, { topic: 'my-topic-value' }) MyProducer.publish_list( [{ 'test_id' => 'foo', 'some_int' => 123 }, @@ -225,13 +217,7 @@ def self.partition_key(payload) end it 'should not encode with plaintext key' do - key_encoder = Deimos::AvroDataEncoder.new( - schema: 'MySchema', - namespace: 'com.my-namespace' - ) - allow(key_encoder).to receive(:encode) - allow(MyNonEncodedProducer).to receive(:encoder).and_return(key_encoder) - expect(key_encoder).not_to receive(:encode_key) + expect(MyNonEncodedProducer.key_encoder).not_to receive(:encode_key) MyNonEncodedProducer.publish_list( [{ 'test_id' => 'foo', 'some_int' => 123, :payload_key => 'foo_key' }, @@ -240,14 +226,10 @@ def self.partition_key(payload) end it 'should encode with a schema' do - - encoder = instance_double(Deimos::AvroDataEncoder) - expect(MySchemaProducer).to receive(:key_encoder).and_return(encoder). - at_least(:once) - expect(encoder).to receive(:encode).with({ 'test_id' => 'foo_key' }, - { topic: 'my-topic2-key' }) - expect(encoder).to receive(:encode).with({ 'test_id' => 'bar_key' }, - { topic: 'my-topic2-key' }) + expect(MySchemaProducer.key_encoder).to receive(:encode).with({ 'test_id' => 'foo_key' }, + { topic: 'my-topic2-key' }) + expect(MySchemaProducer.key_encoder).to receive(:encode).with({ 'test_id' => 'bar_key' }, + { topic: 'my-topic2-key' }) MySchemaProducer.publish_list( [{ 'test_id' => 'foo', 'some_int' => 123, diff --git a/spec/publish_backend_spec.rb b/spec/publish_backend_spec.rb index dba8c0be..e7988120 100644 --- a/spec/publish_backend_spec.rb +++ b/spec/publish_backend_spec.rb @@ -1,6 +1,6 @@ # frozen_string_literal: true -RSpec.describe Deimos::PublishBackend do +RSpec.describe Deimos::Backends::Base do include_context 'with publish_backend' it 'should call execute' do expect(described_class).to receive(:execute). diff --git a/spec/schema_backends/avro_base_shared.rb b/spec/schema_backends/avro_base_shared.rb new file mode 100644 index 00000000..73c06b53 --- /dev/null +++ b/spec/schema_backends/avro_base_shared.rb @@ -0,0 +1,174 @@ +# frozen_string_literal: true + +require 'deimos/schema_backends/avro_base' + +RSpec.shared_examples_for('an Avro backend') do + let(:backend) { described_class.new(schema: 'MySchema', namespace: 'com.my-namespace') } + + let(:full_schema) do + { + 'type' => 'record', + 'name' => 'schema1', + 'namespace' => 'com.my-namespace', + 'fields' => [ + { + 'name' => 'int-field', + 'type' => 'int' + }, + { + 'name' => 'long-field', + 'type' => 'long' + }, + { + 'name' => 'float-field', + 'type' => 'float' + }, + { + 'name' => 'double-field', + 'type' => 'double' + }, + { + 'name' => 'string-field', + 'type' => 'string' + }, + { + 'name' => 'boolean-field', + 'type' => 'boolean' + }, + { + 'name' => 'union-field', + 'type' => %w(null string) + }, + { + 'name' => 'union-int-field', + 'type' => %w(null int) + } + ] + } + end + + specify('#encode_key') do + expect(backend).to receive(:encode). + with({ 'test_id' => 1 }, { schema: 'MySchema_key', topic: 'topic' }).and_return('itsme') + expect(backend.encode_key('test_id', 1, topic: 'topic')).to eq('itsme') + expect(backend.schema_store.find('MySchema_key', 'com.my-namespace').to_avro). + to eq( + 'doc' => 'Key for com.my-namespace.MySchema', + 'fields' => [ + { 'name' => 'test_id', 'type' => 'string' } + ], + 'name' => 'MySchema_key', + 'namespace' => 'com.my-namespace', + 'type' => 'record' + ) + end + + specify('#decode_key') do + expect(backend).to receive(:decode). + with('payload', schema: 'MySchema_key'). + and_return('test_id' => 1) + expect(backend.decode_key('payload', 'test_id')).to eq(1) + end + + describe('#validate') do + it 'should pass valid schemas' do + expect { + backend.validate({ 'test_id' => 'hi', 'some_int' => 4 }, { schema: 'MySchema' }) + }.not_to raise_error + end + + it 'should fail invalid schemas' do + expect { + backend.validate({ 'test_id2' => 'hi', 'some_int' => 4 }, { schema: 'MySchema' }) + }.to raise_error(Avro::SchemaValidator::ValidationError) + end + + end + + describe '#coerce' do + let(:payload) do + { + 'int-field' => 1, + 'long-field' => 11_111_111_111_111_111_111, + 'float-field' => 1.0, + 'double-field' => 2.0, + 'string-field' => 'hi mom', + 'boolean-field' => true, + 'union-field' => nil, + 'union-int-field' => nil + } + end + + before(:each) do + backend.schema_store.add_schema(full_schema) + backend.schema = 'schema1' + end + + it 'should leave numbers as is' do + result = backend.coerce(payload) + expect(result['int-field']).to eq(1) + expect(result['long-field']).to eq(11_111_111_111_111_111_111) + expect(result['float-field']).to eq(1.0) + expect(result['double-field']).to eq(2.0) + expect(result['boolean-field']).to eq(true) + expect(result['union-field']).to eq(nil) + end + + it 'should coerce strings to numbers' do + result = backend.coerce(payload.merge( + 'int-field' => '1', + 'long-field' => '123', + 'float-field' => '1.1', + 'double-field' => '2.1' + )) + expect(result['int-field']).to eq(1) + expect(result['long-field']).to eq(123) + expect(result['float-field']).to eq(1.1) + expect(result['double-field']).to eq(2.1) + end + + it 'should coerce Time to number' do + result = backend.coerce(payload.merge('int-field' => Time.find_zone('UTC').local(2019, 5, 5))) + expect(result['int-field']).to eq(1_557_014_400) + end + + it 'should coerce symbols to string' do + result = backend.coerce(payload.merge('string-field' => :itsme)) + expect(result['string-field']).to eq('itsme') + end + + it 'should convert string-like things to string' do + stringy = Class.new do + # :nodoc: + def initialize(str) + @st = str + end + + # :nodoc: + def to_s + @st + end + + # :nodoc: + def to_str + @st + end + end + stub_const('Stringy', stringy) + result = backend.coerce(payload.merge('string-field' => Stringy.new('itsyou'))) + expect(result['string-field']).to eq('itsyou') + end + + it 'should convert null to false' do + result = backend.coerce(payload.merge('boolean-field' => nil)) + expect(result['boolean-field']).to eq(false) + end + + it 'should convert unions' do + result = backend.coerce(payload.merge('union-field' => :itsme)) + expect(result['union-field']).to eq('itsme') + end + + end + +end diff --git a/spec/schema_backends/avro_local_spec.rb b/spec/schema_backends/avro_local_spec.rb new file mode 100644 index 00000000..33f512b6 --- /dev/null +++ b/spec/schema_backends/avro_local_spec.rb @@ -0,0 +1,32 @@ +# frozen_string_literal: true + +require_relative 'avro_base_shared' +require 'deimos/schema_backends/avro_local' + +RSpec.describe Deimos::SchemaBackends::AvroLocal do + let(:payload) do + { + 'test_id' => 'some string', + 'some_int' => 3 + } + end + let(:backend) { described_class.new(schema: 'MySchema', namespace: 'com.my-namespace') } + + it_should_behave_like 'an Avro backend' + + it 'should encode and decode correctly' do + avro_turf = instance_double(AvroTurf) + expect(avro_turf).to receive(:encode). + with(payload, schema_name: 'MySchema', namespace: 'com.my-namespace'). + and_return('encoded-payload') + expect(avro_turf).to receive(:decode). + with('encoded-payload', schema_name: 'MySchema', namespace: 'com.my-namespace'). + and_return(payload) + allow(backend).to receive(:avro_turf).and_return(avro_turf) + results = backend.encode(payload) + expect(results).to eq('encoded-payload') + results = backend.decode(results) + expect(results).to eq(payload) + end + +end diff --git a/spec/schema_backends/avro_schema_registry_spec.rb b/spec/schema_backends/avro_schema_registry_spec.rb new file mode 100644 index 00000000..0aa4ae17 --- /dev/null +++ b/spec/schema_backends/avro_schema_registry_spec.rb @@ -0,0 +1,32 @@ +# frozen_string_literal: true + +require_relative 'avro_base_shared' +require 'deimos/schema_backends/avro_schema_registry' + +RSpec.describe Deimos::SchemaBackends::AvroSchemaRegistry do + let(:payload) do + { + 'test_id' => 'some string', + 'some_int' => 3 + } + end + let(:backend) { described_class.new(schema: 'MySchema', namespace: 'com.my-namespace') } + + it_should_behave_like 'an Avro backend' + + it 'should encode and decode correctly' do + avro_turf = instance_double(AvroTurf::Messaging) + expect(avro_turf).to receive(:encode). + with(payload, schema_name: 'MySchema', subject: 'topic'). + and_return('encoded-payload') + expect(avro_turf).to receive(:decode). + with('encoded-payload', schema_name: 'MySchema'). + and_return(payload) + allow(backend).to receive(:avro_turf_messaging).and_return(avro_turf) + results = backend.encode(payload, topic: 'topic') + expect(results).to eq('encoded-payload') + results = backend.decode(results) + expect(results).to eq(payload) + end + +end diff --git a/spec/schema_backends/avro_validation_spec.rb b/spec/schema_backends/avro_validation_spec.rb new file mode 100644 index 00000000..586ab602 --- /dev/null +++ b/spec/schema_backends/avro_validation_spec.rb @@ -0,0 +1,24 @@ +# frozen_string_literal: true + +require_relative 'avro_base_shared' +require 'deimos/schema_backends/avro_validation' + +RSpec.describe Deimos::SchemaBackends::AvroValidation do + let(:payload) do + { + 'test_id' => 'some string', + 'some_int' => 3 + } + end + let(:backend) { described_class.new(schema: 'MySchema', namespace: 'com.my-namespace') } + + it_should_behave_like 'an Avro backend' + + it 'should encode and decode correctly' do + results = backend.encode(payload) + expect(results).to eq(payload) + results = backend.decode(results) + expect(results).to eq(payload) + end + +end diff --git a/spec/schema_backends/base_spec.rb b/spec/schema_backends/base_spec.rb new file mode 100644 index 00000000..dcebe519 --- /dev/null +++ b/spec/schema_backends/base_spec.rb @@ -0,0 +1,29 @@ +# frozen_string_literal: true + +describe Deimos::SchemaBackends::Base do + let(:backend) { described_class.new(schema: 'schema', namespace: 'namespace') } + let(:payload) { { foo: 1 } } + + it 'should validate on encode' do + expect(backend).to receive(:validate).with(payload, schema: 'schema') + expect(backend).to receive(:encode_payload).with(payload, schema: 'schema', topic: 'topic') + backend.encode(payload, topic: 'topic') + end + + it 'should validate and encode a passed schema' do + expect(backend).to receive(:validate).with(payload, schema: 'schema2') + expect(backend).to receive(:encode_payload).with(payload, schema: 'schema2', topic: 'topic') + backend.encode(payload, schema: 'schema2', topic: 'topic') + end + + it 'should decode a schema' do + expect(backend).to receive(:decode_payload).with(payload, schema: 'schema') + backend.decode(payload) + end + + it 'should decode a passed schema' do + expect(backend).to receive(:decode_payload).with(payload, schema: 'schema2') + backend.decode(payload, schema: 'schema2') + end + +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 09a080ab..c3c5c0b0 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -146,6 +146,11 @@ def setup_db(options) setup_db(DbConfigs::DB_OPTIONS.last) end + config.mock_with(:rspec) do |mocks| + mocks.yield_receiver_to_any_instance_implementation_blocks = true + mocks.verify_partial_doubles = true + end + config.before(:each) do |ex| Deimos.config.reset! Deimos.configure do |deimos_config| @@ -156,6 +161,7 @@ def setup_db(options) deimos_config.kafka.seed_brokers = ENV['KAFKA_SEED_BROKER'] || 'localhost:9092' deimos_config.logger = Logger.new('/dev/null') deimos_config.logger.level = Logger::INFO + deimos_config.schema.backend = :avro_validation end stub_producers_and_consumers! unless ex.metadata[:integration] end diff --git a/spec/updateable_schema_store_spec.rb b/spec/updateable_schema_store_spec.rb deleted file mode 100644 index 877cbdec..00000000 --- a/spec/updateable_schema_store_spec.rb +++ /dev/null @@ -1,36 +0,0 @@ -# frozen_string_literal: true - -describe AvroTurf::SchemaStore do - - it 'should add an in-memory schema' do - schema_store = described_class.new(path: Deimos.config.schema.path) - schema_store.load_schemas! - found_schema = schema_store.find('MySchema', 'com.my-namespace').as_json - expect(found_schema['name']).to eq('MySchema') - expect(found_schema['namespace']).to eq('com.my-namespace') - expect(found_schema['fields'].size).to eq(2) - expect(found_schema['fields'][0]['type']['type_sym']).to eq('string') - expect(found_schema['fields'][0]['name']).to eq('test_id') - new_schema = { - 'namespace' => 'com.my-namespace', - 'name' => 'MyNewSchema', - 'type' => 'record', - 'doc' => 'Test schema', - 'fields' => [ - { - 'name' => 'my_id', - 'type' => 'int', - 'doc' => 'test int' - } - ] - } - schema_store.add_schema(new_schema) - found_schema = schema_store.find('MyNewSchema', 'com.my-namespace'). - as_json - expect(found_schema['name']).to eq('MyNewSchema') - expect(found_schema['namespace']).to eq('com.my-namespace') - expect(found_schema['fields'].size).to eq(1) - expect(found_schema['fields'][0]['type']['type_sym']).to eq('int') - expect(found_schema['fields'][0]['name']).to eq('my_id') - end -end