From f94c78a2830d5c629de73b257fddb045953d5e08 Mon Sep 17 00:00:00 2001
From: Daniel Orner
Date: Thu, 2 Jan 2020 10:19:49 -0500
Subject: [PATCH] Add schema backends
---
CHANGELOG.md | 6 +
Gemfile.lock | 10 +-
README.md | 44 +++-
deimos-ruby.gemspec | 4 +-
docs/CONFIGURATION.md | 1 +
docs/DATABASE_BACKEND.md | 2 +-
lib/deimos.rb | 27 ++-
lib/deimos/active_record_consumer.rb | 21 +-
lib/deimos/active_record_producer.rb | 4 +-
lib/deimos/avro_data_coder.rb | 89 ---------
lib/deimos/avro_data_decoder.rb | 36 ----
lib/deimos/avro_data_encoder.rb | 51 -----
lib/deimos/backends/base.rb | 32 +++
lib/deimos/backends/db.rb | 2 +-
lib/deimos/backends/kafka.rb | 2 +-
lib/deimos/backends/kafka_async.rb | 2 +-
lib/deimos/base_consumer.rb | 14 +-
lib/deimos/batch_consumer.rb | 1 -
lib/deimos/config/configuration.rb | 4 +
lib/deimos/consumer.rb | 2 -
lib/deimos/message.rb | 14 +-
lib/deimos/monkey_patches/schema_store.rb | 19 --
lib/deimos/producer.rb | 22 +-
lib/deimos/publish_backend.rb | 30 ---
lib/deimos/schema_backends/avro_base.rb | 108 ++++++++++
lib/deimos/schema_backends/avro_local.rb | 30 +++
.../avro_schema_coercer.rb} | 90 ++++-----
.../schema_backends/avro_schema_registry.rb | 34 ++++
lib/deimos/schema_backends/avro_validation.rb | 21 ++
lib/deimos/schema_backends/base.rb | 130 ++++++++++++
lib/deimos/schema_backends/mock.rb | 42 ++++
lib/deimos/test_helpers.rb | 189 +++---------------
spec/avro_data_decoder_spec.rb | 18 --
spec/avro_data_encoder_spec.rb | 37 ----
spec/batch_consumer_spec.rb | 8 -
spec/consumer_spec.rb | 1 -
spec/deimos_spec.rb | 4 -
spec/producer_spec.rb | 52 ++---
spec/publish_backend_spec.rb | 2 +-
spec/schema_backends/avro_base_shared.rb | 174 ++++++++++++++++
spec/schema_backends/avro_local_spec.rb | 32 +++
.../avro_schema_registry_spec.rb | 32 +++
spec/schema_backends/avro_validation_spec.rb | 24 +++
spec/schema_backends/base_spec.rb | 29 +++
spec/spec_helper.rb | 6 +
spec/updateable_schema_store_spec.rb | 36 ----
46 files changed, 894 insertions(+), 644 deletions(-)
delete mode 100644 lib/deimos/avro_data_coder.rb
delete mode 100644 lib/deimos/avro_data_decoder.rb
delete mode 100644 lib/deimos/avro_data_encoder.rb
create mode 100644 lib/deimos/backends/base.rb
delete mode 100644 lib/deimos/monkey_patches/schema_store.rb
delete mode 100644 lib/deimos/publish_backend.rb
create mode 100644 lib/deimos/schema_backends/avro_base.rb
create mode 100644 lib/deimos/schema_backends/avro_local.rb
rename lib/deimos/{schema_coercer.rb => schema_backends/avro_schema_coercer.rb} (80%)
create mode 100644 lib/deimos/schema_backends/avro_schema_registry.rb
create mode 100644 lib/deimos/schema_backends/avro_validation.rb
create mode 100644 lib/deimos/schema_backends/base.rb
create mode 100644 lib/deimos/schema_backends/mock.rb
delete mode 100644 spec/avro_data_decoder_spec.rb
delete mode 100644 spec/avro_data_encoder_spec.rb
create mode 100644 spec/schema_backends/avro_base_shared.rb
create mode 100644 spec/schema_backends/avro_local_spec.rb
create mode 100644 spec/schema_backends/avro_schema_registry_spec.rb
create mode 100644 spec/schema_backends/avro_validation_spec.rb
create mode 100644 spec/schema_backends/base_spec.rb
delete mode 100644 spec/updateable_schema_store_spec.rb
diff --git a/CHANGELOG.md b/CHANGELOG.md
index f47836c1..2812df6a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## UNRELEASED
+- Added schema backends, which should simplify Avro encoding and make it
+ more flexible for unit tests and local development.
+- BREAKING CHANGE: Deimos no longer comes with `avro_turf` as a dependency.
+ You will need to include it if you are Avro-encoding or decoding your
+ messages.
+
# [1.4.0-beta7] - 2019-12-16
- Clone loggers when assigning to multiple levels.
diff --git a/Gemfile.lock b/Gemfile.lock
index 7164adcd..48849a8c 100644
--- a/Gemfile.lock
+++ b/Gemfile.lock
@@ -2,8 +2,6 @@ PATH
remote: .
specs:
deimos-ruby (1.4.0.pre.beta7)
- avro-patches (~> 0.3)
- avro_turf (~> 0.8)
phobos (~> 1.8.2.pre.beta2)
ruby-kafka (~> 0.7)
@@ -55,10 +53,8 @@ GEM
tzinfo (~> 1.1)
arel (9.0.0)
ast (2.4.0)
- avro (1.8.2)
+ avro (1.9.1)
multi_json
- avro-patches (0.4.1)
- avro (= 1.8.2)
avro_turf (0.11.0)
avro (>= 1.7.7, < 1.10)
excon (~> 0.45)
@@ -74,7 +70,7 @@ GEM
digest-crc (0.4.1)
dogstatsd-ruby (4.5.0)
erubi (1.9.0)
- excon (0.71.0)
+ excon (0.71.1)
exponential-backoff (0.0.4)
ffi (1.11.3)
formatador (0.2.5)
@@ -228,6 +224,8 @@ PLATFORMS
DEPENDENCIES
activerecord (~> 5.2)
activerecord-import
+ avro (~> 1.9)
+ avro_turf (~> 0.8)
bundler (~> 1)
ddtrace (~> 0.11)
deimos-ruby!
diff --git a/README.md b/README.md
index 87474662..bd9022e0 100644
--- a/README.md
+++ b/README.md
@@ -6,7 +6,7 @@
-A Ruby framework for marrying Kafka, Avro, and/or ActiveRecord and provide
+A Ruby framework for marrying Kafka, a schema definition like Avro, and/or ActiveRecord and providing
a useful toolbox of goodies for Ruby-based Kafka development.
Built on Phobos and hence Ruby-Kafka.
@@ -14,6 +14,7 @@ Built on Phobos and hence Ruby-Kafka.
* [Installation](#installation)
* [Versioning](#versioning)
* [Configuration](#configuration)
+ * [Schemas](#schemas)
* [Producers](#producers)
* [Auto-added Fields](#auto-added-fields)
* [Coerced Values](#coerced-values)
@@ -60,6 +61,27 @@ gem 'deimos-ruby', '~> 1.1'
For a full configuration reference, please see [the configuration docs ](docs/CONFIGURATION.md).
+# Schemas
+
+Deimos was originally written to support only Avro encoding via a schema registry.
+This has since been expanded to a plugin architecture allowing messages to be
+encoded and decoded via any schema specification you wish.
+
+Currently we have the following possible schema backends:
+* Avro Local (use pure Avro)
+* Avro Schema Registry (use the Confluent Schema Registry)
+* Avro Validation (validate against an Avro schema but leave payloads unencoded - this is useful
+ for unit testing and development)
+* Mock (no actual encoding/decoding).
+
+Note that to use Avro-encoding, you must include the [avro_turf](https://github.com/dasch/avro_turf) gem in your
+Gemfile.
+
+Other possible schema backends could include [Protobuf](https://developers.google.com/protocol-buffers), [JSONSchema](https://json-schema.org/), etc. Feel free to
+contribute!
+
+To create a new schema backend, please see the existing examples [here](lib/deimos/schema_backends).
+
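As a sketch of how the backend is selected in an application (the schema path and registry URL below are illustrative values, not requirements), the existing `Deimos.configure` block is used:

```ruby
# Illustrative configuration -- values are examples only.
Deimos.configure do |config|
  config.schema.backend = :avro_schema_registry # or :avro_local, :avro_validation, :mock
  config.schema.registry_url = 'http://localhost:8081'
  config.schema.path = 'app/schemas'
end
```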
# Producers
Producers will look like this:
@@ -137,7 +159,7 @@ produced by Phobos and RubyKafka):
* topic
* exception_object
* payloads - the unencoded payloads
-* `encode_messages` - sent when messages are being Avro-encoded.
+* `encode_messages` - sent when messages are being schema-encoded.
* producer - the class that produced the message
* topic
* payloads - the unencoded payloads
@@ -165,8 +187,8 @@ Similarly:
### Kafka Message Keys
Topics representing events rather than domain data don't need keys. However,
-best practice for domain messages is to Avro-encode message keys
-with a separate Avro schema.
+best practice for domain messages is to schema-encode message keys
+with a separate schema.
This is enforced by requiring producers to define a `key_config` directive. If
any message comes in with a key, the producer will error out if `key_config` is
@@ -179,7 +201,7 @@ There are three possible configurations to use:
all your messages in a topic need to have a key, or they all need to have
no key. This is a good choice for events that aren't keyed - you can still
set a partition key.
-* `key_config plain: true` - this indicates that you are not using an Avro-encoded
+* `key_config plain: true` - this indicates that you are not using an encoded
key. Use this for legacy topics - new topics should not use this setting.
* `key_config schema: 'MyKeySchema-key'` - this tells the producer to look for
an existing key schema named `MyKeySchema-key` in the schema registry and to
@@ -234,8 +256,8 @@ like this:
```
If you publish a payload `{ "test_id" => "123", "some_int" => 123 }`, this
-will be turned into a key that looks like `{ "test_id" => "123"}` and encoded
-via Avro before being sent to Kafka.
+will be turned into a key that looks like `{ "test_id" => "123"}` and schema-encoded
+before being sent to Kafka.
If you are using `plain` or `schema` as your config, you will need to have a
special `payload_key` key to your payload hash. This will be extracted and
@@ -261,7 +283,7 @@ class MyConsumer < Deimos::Consumer
def consume(payload, metadata)
# Same method as Phobos consumers.
- # payload is an Avro-decoded hash.
+ # payload is a schema-decoded hash.
# metadata is a hash that contains information like :key and :topic.
# In general, your key should be included in the payload itself. However,
# if you need to access it separately from the payload, you can use
@@ -311,7 +333,7 @@ this sample:
class MyBatchConsumer < Deimos::BatchConsumer
def consume_batch(payloads, metadata)
- # payloads is an array of Avro-decoded hashes.
+ # payloads is an array of schema-decoded hashes.
# metadata is a hash that contains information like :keys and :topic.
# Keys are automatically decoded and available as an array with
# the same cardinality as the payloads. If you need to iterate
@@ -604,7 +626,7 @@ Also see [deimos.rb](lib/deimos.rb) under `Configure metrics` to see how the met
Deimos also includes some tracing for kafka consumers. It ships with
DataDog support, but you can add custom tracing providers as well.
-Trace spans are used for when incoming messages are avro decoded, and a
+Trace spans are used when incoming messages are schema-decoded, with a
separate span for message consume logic.
### Configuring Tracing Providers
@@ -749,7 +771,7 @@ be
}
```
-Both payload and key will be Avro-decoded as necessary according to the
+Both payload and key will be schema-decoded as necessary according to the
key config.
You can also just pass an existing producer or consumer class into the method,
diff --git a/deimos-ruby.gemspec b/deimos-ruby.gemspec
index 11962c8a..208043cf 100644
--- a/deimos-ruby.gemspec
+++ b/deimos-ruby.gemspec
@@ -18,13 +18,13 @@ Gem::Specification.new do |spec|
spec.test_files = spec.files.grep(%r{^(test|spec|features)/})
spec.require_paths = ['lib']
- spec.add_runtime_dependency('avro-patches', '~> 0.3')
- spec.add_runtime_dependency('avro_turf', '~> 0.8')
spec.add_runtime_dependency('phobos', '~> 1.8.2.pre.beta2')
spec.add_runtime_dependency('ruby-kafka', '~> 0.7')
spec.add_development_dependency('activerecord', '~> 5.2')
spec.add_development_dependency('activerecord-import')
+ spec.add_development_dependency('avro', '~> 1.9')
+ spec.add_development_dependency('avro_turf', '~> 0.8')
spec.add_development_dependency('bundler', '~> 1')
spec.add_development_dependency('ddtrace', '~> 0.11')
spec.add_development_dependency('dogstatsd-ruby', '~> 4.2')
diff --git a/docs/CONFIGURATION.md b/docs/CONFIGURATION.md
index d55905a9..93e1aa58 100644
--- a/docs/CONFIGURATION.md
+++ b/docs/CONFIGURATION.md
@@ -144,6 +144,7 @@ producers.backend|`:kafka_async`|Currently can be set to `:db`, `:kafka`, or `:k
Config name|Default|Description
-----------|-------|-----------
+schema.backend|`:mock`|Backend representing the schema encoder/decoder. You can see a full list [here](../lib/deimos/schema_backends).
schema.registry_url|`http://localhost:8081`|URL of the Confluent schema registry.
schema.path|nil|Local path to find your schemas.
diff --git a/docs/DATABASE_BACKEND.md b/docs/DATABASE_BACKEND.md
index ae150432..b642fd2b 100644
--- a/docs/DATABASE_BACKEND.md
+++ b/docs/DATABASE_BACKEND.md
@@ -77,7 +77,7 @@ The database backend consists of three tables:
* `kafka_messages` - this keeps track of the messages that were "published",
including the payload, topic, key and partition key. These messages
- are *raw data* - all processing, including Avro encoding, must happen
+ are *raw data* - all processing, including schema-encoding, must happen
upstream before they are inserted.
* `kafka_topic_info` - this table is essentially a lock table used to ensure
that only one producer thread is ever "working" on a topic at a time.
diff --git a/lib/deimos.rb b/lib/deimos.rb
index 4d38bab8..13d99c94 100644
--- a/lib/deimos.rb
+++ b/lib/deimos.rb
@@ -1,12 +1,8 @@
# frozen_string_literal: true
-require 'avro-patches'
-require 'avro_turf'
require 'phobos'
require 'deimos/version'
require 'deimos/config/configuration'
-require 'deimos/avro_data_encoder'
-require 'deimos/avro_data_decoder'
require 'deimos/producer'
require 'deimos/active_record_producer'
require 'deimos/active_record_consumer'
@@ -15,16 +11,18 @@
require 'deimos/instrumentation'
require 'deimos/utils/lag_reporter'
-require 'deimos/publish_backend'
+require 'deimos/backends/base'
require 'deimos/backends/kafka'
require 'deimos/backends/kafka_async'
+require 'deimos/schema_backends/base'
+
require 'deimos/monkey_patches/ruby_kafka_heartbeat'
-require 'deimos/monkey_patches/schema_store'
require 'deimos/monkey_patches/phobos_producer'
require 'deimos/monkey_patches/phobos_cli'
require 'deimos/railtie' if defined?(Rails)
+
if defined?(ActiveRecord)
require 'deimos/kafka_source'
require 'deimos/kafka_topic_info'
@@ -33,6 +31,7 @@
require 'deimos/utils/executor.rb'
require 'deimos/utils/db_producer.rb'
end
+
require 'deimos/utils/inline_consumer'
require 'yaml'
require 'erb'
@@ -40,6 +39,22 @@
# Parent module.
module Deimos
class << self
+ # @return [Class < Deimos::SchemaBackends::Base]
+ def schema_backend_class
+ backend = Deimos.config.schema.backend.to_s
+
+ require "deimos/schema_backends/#{backend}"
+
+ "Deimos::SchemaBackends::#{backend.classify}".constantize
+ end
+
+ # @param schema [String|Symbol]
+ # @param namespace [String]
+ # @return [Deimos::SchemaBackends::Base]
+ def schema_backend(schema:, namespace:)
+ schema_backend_class.new(schema: schema, namespace: namespace)
+ end
+
# Start the DB producers to send Kafka messages.
# @param thread_count [Integer] the number of threads to start.
def start_db_backend!(thread_count: 1)
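A usage sketch of the two new factory methods (the schema and namespace mirror the `MySchema`/`com.my-namespace` fixtures used in the specs):

```ruby
# Resolves the configured backend class based on config.schema.backend...
Deimos.schema_backend_class # => e.g. Deimos::SchemaBackends::AvroLocal

# ...or builds an instance bound to a particular schema.
backend = Deimos.schema_backend(schema: 'MySchema', namespace: 'com.my-namespace')
encoded = backend.encode({ 'test_id' => '123', 'some_int' => 123 }, topic: 'my-topic')
decoded = backend.decode(encoded)
```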
diff --git a/lib/deimos/active_record_consumer.rb b/lib/deimos/active_record_consumer.rb
index e2d218e5..413a06e4 100644
--- a/lib/deimos/active_record_consumer.rb
+++ b/lib/deimos/active_record_consumer.rb
@@ -68,31 +68,30 @@ def destroy_record(record)
def record_attributes(payload)
klass = self.class.config[:record_class]
attributes = {}
- schema = self.class.decoder.avro_schema
- schema.fields.each do |field|
+ self.class.decoder.schema_fields.each do |field|
column = klass.columns.find { |c| c.name == field.name }
next if column.nil?
next if %w(updated_at created_at).include?(field.name)
- attributes[field.name] = _coerce_field(field, column, payload[field.name])
+ attributes[field.name] = _coerce_field(column, payload[field.name])
end
attributes
end
private
- # @param field [Avro::Schema]
# @param column [ActiveRecord::ConnectionAdapters::Column]
# @param val [Object]
- def _coerce_field(field, column, val)
+ def _coerce_field(column, val)
return nil if val.nil?
- field_type = field.type.type.to_sym
- if field_type == :union
- union_types = field.type.schemas.map { |s| s.type.to_sym }
- field_type = union_types.find { |t| t != :null }
- end
- if column.type == :datetime && %i(int long).include?(field_type)
+ is_integer = begin
+ val.is_a?(Integer) || (val.is_a?(String) && Integer(val))
+ rescue StandardError
+ false
+ end
+
+ if column.type == :datetime && is_integer
return Time.zone.strptime(val.to_s, '%s')
end
diff --git a/lib/deimos/active_record_producer.rb b/lib/deimos/active_record_producer.rb
index b1af4f11..c149bc5b 100644
--- a/lib/deimos/active_record_producer.rb
+++ b/lib/deimos/active_record_producer.rb
@@ -53,10 +53,10 @@ def send_events(records, force_send: false)
# is not set.
# @return [Hash]
def generate_payload(attributes, _record)
- schema = self.encoder.avro_schema
+ fields = self.encoder.schema_fields
payload = attributes.stringify_keys
payload.delete_if do |k, _|
- k.to_sym != :payload_key && !schema.fields.find { |f| f.name == k }
+ k.to_sym != :payload_key && !fields.map(&:name).include?(k)
end
end
end
diff --git a/lib/deimos/avro_data_coder.rb b/lib/deimos/avro_data_coder.rb
deleted file mode 100644
index 91460f91..00000000
--- a/lib/deimos/avro_data_coder.rb
+++ /dev/null
@@ -1,89 +0,0 @@
-# frozen_string_literal: true
-
-module Deimos
- # Base class for the encoder / decoder classes.
- class AvroDataCoder
- attr_accessor :schema, :namespace, :config, :schema_store
-
- # @param schema [String]
- # @param namespace [String]
- # @param schema_store [AvroTurf::SchemaStore]
- def initialize(schema:, namespace:, schema_store: nil)
- @schema = schema
- @namespace = namespace
- @schema_store = schema_store ||
- AvroTurf::SchemaStore.new(path: Deimos.config.schema.path)
- end
-
- # @param schema [String]
- # @return [Avro::Schema]
- def avro_schema(schema=nil)
- schema ||= @schema
- @schema_store.find(schema, @namespace)
- end
-
- private
-
- # @return [AvroTurf]
- def avro_turf
- @avro_turf ||= AvroTurf.new(
- schemas_path: Deimos.config.schema.path,
- schema_store: @schema_store
- )
- @avro_turf
- end
-
- # @return [AvroTurf::Messaging]
- def avro_turf_messaging
- @avro_turf_messaging ||= AvroTurf::Messaging.new(
- schema_store: @schema_store,
- registry_url: Deimos.config.schema.registry_url,
- schemas_path: Deimos.config.schema.path,
- namespace: @namespace
- )
- end
-
- # Generate a key schema from the given value schema and key ID. This
- # is used when encoding or decoding keys from an existing value schema.
- # @param key_id [Symbol]
- # @return [Hash]
- def _generate_key_schema(key_id)
- return @key_schema if @key_schema
-
- value_schema = @schema_store.find(@schema, @namespace)
- key_field = value_schema.fields.find { |f| f.name == key_id.to_s }
- name = _key_schema_name(@schema)
- @key_schema = {
- 'type' => 'record',
- 'name' => name,
- 'namespace' => @namespace,
- 'doc' => "Key for #{@namespace}.#{@schema}",
- 'fields' => [
- {
- 'name' => key_id,
- 'type' => key_field.type.type_sym.to_s
- }
- ]
- }
- @schema_store.add_schema(@key_schema)
- @key_schema
- end
-
- # @param value_schema [Hash]
- # @return [String]
- def _field_name_from_schema(value_schema)
- raise "Schema #{@schema} not found!" if value_schema.nil?
- if value_schema['fields'].nil? || value_schema['fields'].empty?
- raise "Schema #{@schema} has no fields!"
- end
-
- value_schema['fields'][0]['name']
- end
-
- # @param schema [String]
- # @return [String]
- def _key_schema_name(schema)
- "#{schema.gsub('-value', '')}_key"
- end
- end
-end
diff --git a/lib/deimos/avro_data_decoder.rb b/lib/deimos/avro_data_decoder.rb
deleted file mode 100644
index 98ca6c62..00000000
--- a/lib/deimos/avro_data_decoder.rb
+++ /dev/null
@@ -1,36 +0,0 @@
-# frozen_string_literal: true
-
-require 'avro_turf/messaging'
-require 'deimos/avro_data_coder'
-
-module Deimos
- # Service Object to decode avro messages
- class AvroDataDecoder < AvroDataCoder
- # Decode some data.
- # @param payload [Hash|String]
- # @param schema [String]
- # @return [Hash]
- def decode(payload, schema: nil)
- schema ||= @schema
- avro_turf_messaging.decode(payload, schema_name: schema)
- end
-
- # Decode against a local schema.
- # @param payload [Hash]
- # @param schema [String]
- # @return [Hash]
- def decode_local(payload, schema: nil)
- schema ||= @schema
- avro_turf.decode(payload, schema_name: schema, namespace: @namespace)
- end
-
- # @param payload [String] the encoded key.
- # @param key_id [String|Symbol]
- # @return [Object] the decoded key (int/long/string).
- def decode_key(payload, key_id)
- key_schema = _generate_key_schema(key_id)
- field_name = _field_name_from_schema(key_schema)
- decode(payload, schema: key_schema['name'])[field_name]
- end
- end
-end
diff --git a/lib/deimos/avro_data_encoder.rb b/lib/deimos/avro_data_encoder.rb
deleted file mode 100644
index 57c01606..00000000
--- a/lib/deimos/avro_data_encoder.rb
+++ /dev/null
@@ -1,51 +0,0 @@
-# frozen_string_literal: true
-
-require 'avro_turf/messaging'
-require 'deimos/avro_data_coder'
-
-module Deimos
- # Service Object to decode Avro messages.
- class AvroDataEncoder < AvroDataCoder
- # @param payload [Hash]
- # @param schema [String]
- # @return [String]
- def encode_local(payload, schema: nil)
- schema ||= @schema
- Avro::SchemaValidator.validate!(avro_schema(schema), payload,
- recursive: true,
- fail_on_extra_fields: true)
- avro_turf.encode(payload, schema_name: schema, namespace: @namespace)
- rescue Avro::IO::AvroTypeError
- # throw a more detailed error
- value_schema = @schema_store.find(schema, @namespace)
- Avro::SchemaValidator.validate!(value_schema, payload)
- end
-
- # @param payload [Hash]
- # @param schema [String]
- # @param topic [String]
- # @return [String]
- def encode(payload, schema: nil, topic: nil)
- schema ||= @schema
- Avro::SchemaValidator.validate!(avro_schema(schema), payload,
- recursive: true,
- fail_on_extra_fields: true)
- avro_turf_messaging.encode(payload, schema_name: schema, subject: topic)
- rescue Avro::IO::AvroTypeError
- # throw a more detailed error
- schema = @schema_store.find(@schema, @namespace)
- Avro::SchemaValidator.validate!(schema, payload)
- end
-
- # @param key_id [Symbol|String]
- # @param key [Object]
- # @param topic [String]
- # @return [String] the encoded key.
- def encode_key(key_id, key, topic=nil)
- key_schema = _generate_key_schema(key_id)
- field_name = _field_name_from_schema(key_schema)
- payload = { field_name => key }
- encode(payload, schema: key_schema['name'], topic: topic)
- end
- end
-end
diff --git a/lib/deimos/backends/base.rb b/lib/deimos/backends/base.rb
new file mode 100644
index 00000000..e9c23ad2
--- /dev/null
+++ b/lib/deimos/backends/base.rb
@@ -0,0 +1,32 @@
+# frozen_string_literal: true
+
+module Deimos
+ module Backends
+ # Abstract class for all publish backends.
+ class Base
+ class << self
+ # @param producer_class [Class < Deimos::Producer]
+ # @param messages [Array]
+ def publish(producer_class:, messages:)
+ Deimos.config.logger.info(
+ message: 'Publishing messages',
+ topic: producer_class.topic,
+ payloads: messages.map do |message|
+ {
+ payload: message.payload,
+ key: message.key
+ }
+ end
+ )
+ execute(producer_class: producer_class, messages: messages)
+ end
+
+ # @param producer_class [Class < Deimos::Producer]
+ # @param messages [Array]
+ def execute(producer_class:, messages:)
+ raise NotImplementedError
+ end
+ end
+ end
+ end
+end
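Custom publish backends only need to override `execute`; the shared logging stays in `publish`. A minimal, hypothetical example (not part of this patch):

```ruby
module Deimos
  module Backends
    # Hypothetical backend that just prints messages, to illustrate the contract.
    class Stdout < Base
      class << self
        # :nodoc:
        def execute(producer_class:, messages:)
          messages.each do |message|
            puts "#{producer_class.topic}: #{message.key.inspect} #{message.encoded_payload.inspect}"
          end
        end
      end
    end
  end
end
```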
diff --git a/lib/deimos/backends/db.rb b/lib/deimos/backends/db.rb
index 42fc2dcf..70a92fbc 100644
--- a/lib/deimos/backends/db.rb
+++ b/lib/deimos/backends/db.rb
@@ -6,7 +6,7 @@ module Deimos
module Backends
# Backend which saves messages to the database instead of immediately
# sending them.
- class Db < Deimos::PublishBackend
+ class Db < Base
class << self
# :nodoc:
def execute(producer_class:, messages:)
diff --git a/lib/deimos/backends/kafka.rb b/lib/deimos/backends/kafka.rb
index e5aefaa9..6c5b8b56 100644
--- a/lib/deimos/backends/kafka.rb
+++ b/lib/deimos/backends/kafka.rb
@@ -3,7 +3,7 @@
module Deimos
module Backends
# Default backend to produce to Kafka.
- class Kafka < Deimos::PublishBackend
+ class Kafka < Base
include Phobos::Producer
# Shut down the producer if necessary.
diff --git a/lib/deimos/backends/kafka_async.rb b/lib/deimos/backends/kafka_async.rb
index f43847c4..0b8eb701 100644
--- a/lib/deimos/backends/kafka_async.rb
+++ b/lib/deimos/backends/kafka_async.rb
@@ -3,7 +3,7 @@
module Deimos
module Backends
# Backend which produces to Kafka via an async producer.
- class KafkaAsync < Deimos::PublishBackend
+ class KafkaAsync < Base
include Phobos::Producer
# Shut down the producer cleanly.
diff --git a/lib/deimos/base_consumer.rb b/lib/deimos/base_consumer.rb
index 7ae90d82..59ad8767 100644
--- a/lib/deimos/base_consumer.rb
+++ b/lib/deimos/base_consumer.rb
@@ -6,20 +6,20 @@ class BaseConsumer
include SharedConfig
class << self
- # @return [AvroDataEncoder]
+ # @return [Deimos::SchemaBackends::Base]
def decoder
- @decoder ||= AvroDataDecoder.new(schema: config[:schema],
- namespace: config[:namespace])
+ @decoder ||= Deimos.schema_backend(schema: config[:schema],
+ namespace: config[:namespace])
end
- # @return [AvroDataEncoder]
+ # @return [Deimos::SchemaBackends::Base]
def key_decoder
- @key_decoder ||= AvroDataDecoder.new(schema: config[:key_schema],
- namespace: config[:namespace])
+ @key_decoder ||= Deimos.schema_backend(schema: config[:key_schema],
+ namespace: config[:namespace])
end
end
- # Helper method to decode an Avro-encoded key.
+ # Helper method to decode an encoded key.
# @param key [String]
# @return [Object] the decoded key.
def decode_key(key)
diff --git a/lib/deimos/batch_consumer.rb b/lib/deimos/batch_consumer.rb
index 148038e2..e2dbc26b 100644
--- a/lib/deimos/batch_consumer.rb
+++ b/lib/deimos/batch_consumer.rb
@@ -1,6 +1,5 @@
# frozen_string_literal: true
-require 'deimos/avro_data_decoder'
require 'deimos/base_consumer'
require 'phobos/batch_handler'
diff --git a/lib/deimos/config/configuration.rb b/lib/deimos/config/configuration.rb
index a92f9932..b18c557d 100644
--- a/lib/deimos/config/configuration.rb
+++ b/lib/deimos/config/configuration.rb
@@ -251,6 +251,10 @@ def self.configure_producer_or_consumer(kafka_config)
end
setting :schema do
+
+ # Backend class to use when encoding/decoding messages.
+ setting :backend, :mock
+
# URL of the Confluent schema registry.
# @return [String]
setting :registry_url, 'http://localhost:8081'
diff --git a/lib/deimos/consumer.rb b/lib/deimos/consumer.rb
index 4327b887..c84496fa 100644
--- a/lib/deimos/consumer.rb
+++ b/lib/deimos/consumer.rb
@@ -1,11 +1,9 @@
# frozen_string_literal: true
-require 'deimos/avro_data_decoder'
require 'deimos/base_consumer'
require 'deimos/shared_config'
require 'phobos/handler'
require 'active_support/all'
-require 'ddtrace'
# Class to consume messages coming from the pipeline topic
# Note: According to the docs, instances of your handler will be created
diff --git a/lib/deimos/message.rb b/lib/deimos/message.rb
index 65247e2d..a07b7179 100644
--- a/lib/deimos/message.rb
+++ b/lib/deimos/message.rb
@@ -18,23 +18,23 @@ def initialize(payload, producer, topic: nil, key: nil, partition_key: nil)
# Add message_id and timestamp default values if they are in the
# schema and don't already have values.
- # @param schema [Avro::Schema]
- def add_fields(schema)
+ # @param fields [Array] existing fields in the schema.
+ def add_fields(fields)
return if @payload.except(:payload_key, :partition_key).blank?
- if schema.fields.any? { |f| f.name == 'message_id' }
+ if fields.include?('message_id')
@payload['message_id'] ||= SecureRandom.uuid
end
- if schema.fields.any? { |f| f.name == 'timestamp' }
+ if fields.include?('timestamp')
@payload['timestamp'] ||= Time.now.in_time_zone.to_s
end
end
- # @param schema [Avro::Schema]
- def coerce_fields(schema)
+ # @param encoder [Deimos::SchemaBackends::Base]
+ def coerce_fields(encoder)
return if payload.nil?
- @payload = SchemaCoercer.new(schema).coerce(@payload)
+ @payload = encoder.coerce(@payload)
end
# @return [Hash]
diff --git a/lib/deimos/monkey_patches/schema_store.rb b/lib/deimos/monkey_patches/schema_store.rb
deleted file mode 100644
index ca37fc58..00000000
--- a/lib/deimos/monkey_patches/schema_store.rb
+++ /dev/null
@@ -1,19 +0,0 @@
-# frozen_string_literal: true
-
-require 'avro_turf/schema_store'
-
-# Allows us to add in-memory schemas to the schema store in
-# addition to the ones stored in the file system.
-class AvroTurf::SchemaStore
- attr_accessor :schemas
-
- # @param schema_hash [Hash]
- def add_schema(schema_hash)
- name = schema_hash['name']
- namespace = schema_hash['namespace']
- full_name = Avro::Name.make_fullname(name, namespace)
- return if @schemas.key?(full_name)
-
- Avro::Schema.real_parse(schema_hash, @schemas)
- end
-end
diff --git a/lib/deimos/producer.rb b/lib/deimos/producer.rb
index e1213d92..854ebd79 100644
--- a/lib/deimos/producer.rb
+++ b/lib/deimos/producer.rb
@@ -1,9 +1,7 @@
# frozen_string_literal: true
-require 'deimos/avro_data_encoder'
require 'deimos/message'
require 'deimos/shared_config'
-require 'deimos/schema_coercer'
require 'phobos/producer'
require 'active_support/notifications'
@@ -143,23 +141,23 @@ def produce_batch(backend, batch)
backend.publish(producer_class: self, messages: batch)
end
- # @return [AvroDataEncoder]
+ # @return [Deimos::SchemaBackends::Base]
def encoder
- @encoder ||= AvroDataEncoder.new(schema: config[:schema],
- namespace: config[:namespace])
+ @encoder ||= Deimos.schema_backend(schema: config[:schema],
+ namespace: config[:namespace])
end
- # @return [AvroDataEncoder]
+ # @return [Deimos::SchemaBackends::Base]
def key_encoder
- @key_encoder ||= AvroDataEncoder.new(schema: config[:key_schema],
- namespace: config[:namespace])
+ @key_encoder ||= Deimos.schema_backend(schema: config[:key_schema],
+ namespace: config[:namespace])
end
# Override this in active record producers to add
# non-schema fields to check for updates
# @return [Array] fields to check for updates
def watched_attributes
- self.encoder.avro_schema.fields.map(&:name)
+ self.encoder.schema_fields.map(&:name)
end
private
@@ -169,13 +167,13 @@ def _process_message(message)
# this violates the Law of Demeter but it has to happen in a very
# specific order and requires a bunch of methods on the producer
# to work correctly.
- message.add_fields(encoder.avro_schema)
+ message.add_fields(encoder.schema_fields.map(&:name))
message.partition_key = self.partition_key(message.payload)
message.key = _retrieve_key(message.payload)
# need to do this before _coerce_fields because that might result
# in an empty payload which is an *error* whereas this is intended.
message.payload = nil if message.payload.blank?
- message.coerce_fields(encoder.avro_schema)
+ message.coerce_fields(encoder)
message.encoded_key = _encode_key(message.key)
message.topic = self.topic
message.encoded_payload = if message.payload.nil?
@@ -200,7 +198,7 @@ def _encode_key(key)
end
if config[:key_field]
- encoder.encode_key(config[:key_field], key, "#{config[:topic]}-key")
+ encoder.encode_key(config[:key_field], key, topic: "#{config[:topic]}-key")
elsif config[:key_schema]
key_encoder.encode(key, topic: "#{config[:topic]}-key")
else
diff --git a/lib/deimos/publish_backend.rb b/lib/deimos/publish_backend.rb
deleted file mode 100644
index 4ea8893a..00000000
--- a/lib/deimos/publish_backend.rb
+++ /dev/null
@@ -1,30 +0,0 @@
-# frozen_string_literal: true
-
-module Deimos
- # Abstract class for all publish backends.
- class PublishBackend
- class << self
- # @param producer_class [Class < Deimos::Producer]
- # @param messages [Array]
- def publish(producer_class:, messages:)
- Deimos.config.logger.info(
- message: 'Publishing messages',
- topic: producer_class.topic,
- payloads: messages.map do |message|
- {
- payload: message.payload,
- key: message.key
- }
- end
- )
- execute(producer_class: producer_class, messages: messages)
- end
-
- # @param producer_class [Class < Deimos::Producer]
- # @param messages [Array]
- def execute(producer_class:, messages:)
- raise NotImplementedError
- end
- end
- end
-end
diff --git a/lib/deimos/schema_backends/avro_base.rb b/lib/deimos/schema_backends/avro_base.rb
new file mode 100644
index 00000000..b48703bb
--- /dev/null
+++ b/lib/deimos/schema_backends/avro_base.rb
@@ -0,0 +1,108 @@
+# frozen_string_literal: true
+
+require_relative 'base'
+require 'avro'
+require 'avro_turf'
+require 'avro_turf/mutable_schema_store'
+require_relative 'avro_schema_coercer'
+
+module Deimos
+ module SchemaBackends
+ # Encode / decode using Avro, either locally or via schema registry.
+ class AvroBase < Base
+ attr_accessor :schema_store
+
+ # @override
+ def initialize(schema:, namespace:)
+ super(schema: schema, namespace: namespace)
+ @schema_store = AvroTurf::MutableSchemaStore.new(path: Deimos.config.schema.path)
+ end
+
+ # @override
+ def encode_key(key_id, key, topic: nil)
+ @key_schema ||= _generate_key_schema(key_id)
+ field_name = _field_name_from_schema(@key_schema)
+ payload = { field_name => key }
+ encode(payload, schema: @key_schema['name'], topic: topic)
+ end
+
+ # @override
+ def decode_key(payload, key_id)
+ @key_schema ||= _generate_key_schema(key_id)
+ field_name = _field_name_from_schema(@key_schema)
+ decode(payload, schema: @key_schema['name'])[field_name]
+ end
+
+ # @override
+ def coerce_field(field, value)
+ AvroSchemaCoercer.new(avro_schema).coerce_type(field.type, value)
+ end
+
+ # @override
+ def schema_fields
+ avro_schema.fields.map { |field| SchemaField.new(field.name, field.type) }
+ end
+
+ # @override
+ def validate(payload, schema:)
+ Avro::SchemaValidator.validate!(avro_schema(schema), payload,
+ recursive: true,
+ fail_on_extra_fields: true)
+ end
+
+ # @override
+ def self.mock_backend
+ :avro_validation
+ end
+
+ private
+
+ # @param schema [String]
+ # @return [Avro::Schema]
+ def avro_schema(schema=nil)
+ schema ||= @schema
+ @schema_store.find(schema, @namespace)
+ end
+
+ # Generate a key schema from the given value schema and key ID. This
+ # is used when encoding or decoding keys from an existing value schema.
+ # @param key_id [Symbol]
+ # @return [Hash]
+ def _generate_key_schema(key_id)
+ key_field = avro_schema.fields.find { |f| f.name == key_id.to_s }
+ name = _key_schema_name(@schema)
+ key_schema = {
+ 'type' => 'record',
+ 'name' => name,
+ 'namespace' => @namespace,
+ 'doc' => "Key for #{@namespace}.#{@schema}",
+ 'fields' => [
+ {
+ 'name' => key_id,
+ 'type' => key_field.type.type_sym.to_s
+ }
+ ]
+ }
+ @schema_store.add_schema(key_schema)
+ key_schema
+ end
+
+ # @param value_schema [Hash]
+ # @return [String]
+ def _field_name_from_schema(value_schema)
+ raise "Schema #{@schema} not found!" if value_schema.nil?
+ if value_schema['fields'].nil? || value_schema['fields'].empty?
+ raise "Schema #{@schema} has no fields!"
+ end
+
+ value_schema['fields'][0]['name']
+ end
+
+ # @param schema [String]
+ # @return [String]
+ def _key_schema_name(schema)
+ "#{schema.gsub('-value', '')}_key"
+ end
+ end
+ end
+end
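For reference, `_generate_key_schema` produces the same shape the old `AvroDataCoder` did (this mirrors the expectation in the now-deleted `avro_data_encoder_spec.rb`): for a value schema `MySchema` in `com.my-namespace` with a string `test_id` field, the schema added to the store is:

```ruby
{
  'type' => 'record',
  'name' => 'MySchema_key',
  'namespace' => 'com.my-namespace',
  'doc' => 'Key for com.my-namespace.MySchema',
  'fields' => [
    { 'name' => 'test_id', 'type' => 'string' }
  ]
}
```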
diff --git a/lib/deimos/schema_backends/avro_local.rb b/lib/deimos/schema_backends/avro_local.rb
new file mode 100644
index 00000000..596421f4
--- /dev/null
+++ b/lib/deimos/schema_backends/avro_local.rb
@@ -0,0 +1,30 @@
+# frozen_string_literal: true
+
+require_relative 'avro_base'
+
+module Deimos
+ module SchemaBackends
+ # Encode / decode using local Avro encoding.
+ class AvroLocal < AvroBase
+ # @override
+ def decode_payload(payload, schema:)
+ avro_turf.decode(payload, schema_name: schema, namespace: @namespace)
+ end
+
+ # @override
+ def encode_payload(payload, schema: nil, topic: nil)
+ avro_turf.encode(payload, schema_name: schema, namespace: @namespace)
+ end
+
+ private
+
+ # @return [AvroTurf]
+ def avro_turf
+ @avro_turf ||= AvroTurf.new(
+ schemas_path: Deimos.config.schema.path,
+ schema_store: @schema_store
+ )
+ end
+ end
+ end
+end
diff --git a/lib/deimos/schema_coercer.rb b/lib/deimos/schema_backends/avro_schema_coercer.rb
similarity index 80%
rename from lib/deimos/schema_coercer.rb
rename to lib/deimos/schema_backends/avro_schema_coercer.rb
index a616c1aa..523a9bb3 100644
--- a/lib/deimos/schema_coercer.rb
+++ b/lib/deimos/schema_backends/avro_schema_coercer.rb
@@ -1,66 +1,20 @@
# frozen_string_literal: true
+require 'active_support/time'
+
module Deimos
# Class to coerce values in a payload to match a schema.
- class SchemaCoercer
+ class AvroSchemaCoercer
# @param schema [Avro::Schema]
def initialize(schema)
@schema = schema
end
- # @param payload [Hash]
- # @return [HashWithIndifferentAccess]
- def coerce(payload)
- result = {}
- @schema.fields.each do |field|
- name = field.name
- next unless payload.key?(name)
-
- val = payload[name]
- result[name] = _coerce_type(field.type, val)
- end
- result.with_indifferent_access
- end
-
- private
-
- # @param val [String]
- # @return [Boolean]
- def _is_integer_string?(val)
- return false unless val.is_a?(String)
-
- begin
- true if Integer(val)
- rescue StandardError
- false
- end
- end
-
- # @param val [String]
- # @return [Boolean]
- def _is_float_string?(val)
- return false unless val.is_a?(String)
-
- begin
- true if Float(val)
- rescue StandardError
- false
- end
- end
-
- # @param val [Object]
- # @return [Boolean]
- def _is_to_s_defined?(val)
- return false if val.nil?
-
- Object.instance_method(:to_s).bind(val).call != val.to_s
- end
-
# @param type [Symbol]
# @param val [Object]
# @return [Object]
- def _coerce_type(type, val)
- int_classes = [Time, DateTime, ActiveSupport::TimeWithZone]
+ def coerce_type(type, val)
+ int_classes = [Time, ActiveSupport::TimeWithZone]
field_type = type.type.to_sym
if field_type == :union
union_types = type.schemas.map { |s| s.type.to_sym }
@@ -104,5 +58,39 @@ def _coerce_type(type, val)
val
end
end
+
+ private
+
+ # @param val [String]
+ # @return [Boolean]
+ def _is_integer_string?(val)
+ return false unless val.is_a?(String)
+
+ begin
+ true if Integer(val)
+ rescue StandardError
+ false
+ end
+ end
+
+ # @param val [String]
+ # @return [Boolean]
+ def _is_float_string?(val)
+ return false unless val.is_a?(String)
+
+ begin
+ true if Float(val)
+ rescue StandardError
+ false
+ end
+ end
+
+ # @param val [Object]
+ # @return [Boolean]
+ def _is_to_s_defined?(val)
+ return false if val.nil?
+
+ Object.instance_method(:to_s).bind(val).call != val.to_s
+ end
end
end
diff --git a/lib/deimos/schema_backends/avro_schema_registry.rb b/lib/deimos/schema_backends/avro_schema_registry.rb
new file mode 100644
index 00000000..1ae4b839
--- /dev/null
+++ b/lib/deimos/schema_backends/avro_schema_registry.rb
@@ -0,0 +1,34 @@
+# frozen_string_literal: true
+
+require_relative 'avro_base'
+require_relative 'avro_validation'
+require 'avro_turf/messaging'
+
+module Deimos
+ module SchemaBackends
+ # Encode / decode using the Avro schema registry.
+ class AvroSchemaRegistry < AvroBase
+ # @override
+ def decode_payload(payload, schema:)
+ avro_turf_messaging.decode(payload, schema_name: schema)
+ end
+
+ # @override
+ def encode_payload(payload, schema: nil, topic: nil)
+ avro_turf_messaging.encode(payload, schema_name: schema, subject: topic)
+ end
+
+ private
+
+ # @return [AvroTurf::Messaging]
+ def avro_turf_messaging
+ @avro_turf_messaging ||= AvroTurf::Messaging.new(
+ schema_store: @schema_store,
+ registry_url: Deimos.config.schema.registry_url,
+ schemas_path: Deimos.config.schema.path,
+ namespace: @namespace
+ )
+ end
+ end
+ end
+end
diff --git a/lib/deimos/schema_backends/avro_validation.rb b/lib/deimos/schema_backends/avro_validation.rb
new file mode 100644
index 00000000..2842d23f
--- /dev/null
+++ b/lib/deimos/schema_backends/avro_validation.rb
@@ -0,0 +1,21 @@
+# frozen_string_literal: true
+
+require_relative 'avro_base'
+
+module Deimos
+ module SchemaBackends
+ # Leave Ruby hashes as is but validate them against the schema.
+ # Useful for unit tests.
+ class AvroValidation < AvroBase
+ # @override
+ def decode_payload(payload, schema: nil)
+ payload.with_indifferent_access
+ end
+
+ # @override
+ def encode_payload(payload, schema: nil, topic: nil)
+ payload.with_indifferent_access
+ end
+ end
+ end
+end
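Because payloads stay as plain hashes but still pass through `validate` in `Base#encode`, this backend fits the unit-test and development use case called out in the README. A sketch, assuming `avro_turf` is installed and `MySchema` exists under `Deimos.config.schema.path`:

```ruby
backend = Deimos::SchemaBackends::AvroValidation.new(schema: 'MySchema',
                                                     namespace: 'com.my-namespace')

# Valid payloads come back unchanged (with indifferent access) -- no Avro binary involved.
backend.encode({ 'test_id' => '123', 'some_int' => 123 })

# Payloads that don't match the schema raise Avro::SchemaValidator::ValidationError,
# since validation runs with fail_on_extra_fields: true.
backend.encode({ 'not_a_field' => 'x' })
```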
diff --git a/lib/deimos/schema_backends/base.rb b/lib/deimos/schema_backends/base.rb
new file mode 100644
index 00000000..a2b4da75
--- /dev/null
+++ b/lib/deimos/schema_backends/base.rb
@@ -0,0 +1,130 @@
+# frozen_string_literal: true
+
+module Deimos
+ # Represents a field in the schema.
+ class SchemaField
+ attr_accessor :name, :type
+
+ # @param name [String]
+ # @param type [Object]
+ def initialize(name, type)
+ @name = name
+ @type = type
+ end
+ end
+
+ module SchemaBackends
+ # Base class for encoding / decoding.
+ class Base
+ attr_accessor :schema, :namespace, :key_schema
+
+ # @param schema [String|Symbol]
+ # @param namespace [String]
+ def initialize(schema:, namespace: nil)
+ @schema = schema
+ @namespace = namespace
+ end
+
+ # Encode a payload with a schema. Public method.
+ # @param payload [Hash]
+ # @param schema [Symbol|String]
+ # @param topic [String]
+ # @return [String]
+ def encode(payload, schema: nil, topic: nil)
+ validate(payload, schema: schema || @schema)
+ encode_payload(payload, schema: schema || @schema, topic: topic)
+ end
+
+ # Decode a payload with a schema. Public method.
+ # @param payload [String]
+ # @param schema [Symbol|String]
+ # @return [Hash]
+ def decode(payload, schema: nil)
+ decode_payload(payload, schema: schema || @schema)
+ end
+
+ # Given a hash, coerce its types to the schema, using the subclass's `coerce_field`.
+ # @param payload [Hash]
+ # @return [Hash]
+ def coerce(payload)
+ result = {}
+ self.schema_fields.each do |field|
+ name = field.name
+ next unless payload.key?(name)
+
+ val = payload[name]
+ result[name] = coerce_field(field, val)
+ end
+ result.with_indifferent_access
+ end
+
+ # Indicate a class which should act as a mocked version of this backend.
+ # This class should perform all validations but not actually do any
+ # encoding.
+ # Note that the "mock" version (e.g. avro_validation) should return
+ # its own symbol when this is called, since it may be called multiple
+ # times depending on the order of RSpec helpers.
+ # @return [Symbol]
+ def self.mock_backend
+ :mock
+ end
+
+ # Encode a payload. To be defined by subclass.
+ # @param payload [Hash]
+ # @param schema [Symbol|String]
+ # @param topic [String]
+ # @return [String]
+ def encode_payload(_payload, schema:, topic: nil)
+ raise NotImplementedError
+ end
+
+ # Decode a payload. To be defined by subclass.
+ # @param payload [String]
+ # @param schema [String|Symbol]
+ # @return [Hash]
+ def decode_payload(_payload, schema:)
+ raise NotImplementedError
+ end
+
+ # Validate that a payload matches the schema. To be defined by subclass.
+ # @param payload [Hash]
+ # @param schema [String|Symbol]
+ def validate(_payload, schema:)
+ raise NotImplementedError
+ end
+
+ # List of field names belonging to the schema. To be defined by subclass.
+ # @return [Array]
+ def schema_fields
+ raise NotImplementedError
+ end
+
+ # Given a value and a field definition (as defined by whatever the
+ # underlying schema library is), coerce the given value to
+ # the given field type.
+ # @param field [SchemaField]
+ # @param value [Object]
+ # @return [Object]
+ def coerce_field(_field, _value)
+ raise NotImplementedError
+ end
+
+ # Encode a message key. To be defined by subclass.
+ # @param key [String] the value to use as the key.
+ # @param key_id [Symbol|String] the field name of the key.
+ # @param topic [String]
+ # @return [String]
+ def encode_key(_key, _key_id, topic: nil)
+ raise NotImplementedError
+ end
+
+ # Decode a message key. To be defined by subclass.
+ # @param payload [Hash] the message itself.
+ # @param key_id [Symbol|String] the field in the message to decode.
+ # @return [String]
+ def decode_key(_payload, _key_id)
+ raise NotImplementedError
+ end
+ end
+ end
+end
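The abstract methods above are the full contract a new backend must satisfy. As an illustrative sketch only (not part of this patch), a JSON pass-through backend might look like the following; given how `Deimos.schema_backend_class` resolves backends, a `config.schema.backend = :plain_json` setting would expect the file at `lib/deimos/schema_backends/plain_json.rb`:

```ruby
# frozen_string_literal: true

require 'json'
require_relative 'base'

module Deimos
  module SchemaBackends
    # Hypothetical backend: round-trips payloads as JSON and skips validation.
    class PlainJson < Base
      def encode_payload(payload, schema:, topic: nil)
        JSON.generate(payload)
      end

      def decode_payload(payload, schema:)
        JSON.parse(payload)
      end

      def validate(_payload, schema:); end

      def schema_fields
        [] # nothing to introspect without a schema definition
      end

      def coerce_field(_field, value)
        value
      end

      def encode_key(key_id, key, topic: nil)
        JSON.generate(key_id => key)
      end

      def decode_key(payload, key_id)
        payload[key_id]
      end
    end
  end
end
```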
diff --git a/lib/deimos/schema_backends/mock.rb b/lib/deimos/schema_backends/mock.rb
new file mode 100644
index 00000000..63cc34c3
--- /dev/null
+++ b/lib/deimos/schema_backends/mock.rb
@@ -0,0 +1,42 @@
+# frozen_string_literal: true
+
+module Deimos
+ module SchemaBackends
+ # Mock implementation of a schema backend that does no encoding or validation.
+ class Mock < Base
+ # @override
+ def decode_payload(payload, schema:)
+ payload.is_a?(String) ? 'payload-decoded' : payload.map { |k, v| [k, "decoded-#{v}"] }
+ end
+
+ # @override
+ def encode_payload(payload, schema:, topic: nil)
+ payload.is_a?(String) ? 'payload-encoded' : payload.map { |k, v| [k, "encoded-#{v}"] }
+ end
+
+ # @override
+ def validate(_payload, schema:)
+ end
+
+ # @override
+ def schema_fields
+ []
+ end
+
+ # @override
+ def coerce_field(_type, value)
+ value
+ end
+
+ # @override
+ def encode_key(key_id, key, topic: nil)
+ { key_id => key }
+ end
+
+ # @override
+ def decode_key(payload, key_id)
+ payload[key_id]
+ end
+ end
+ end
+end
diff --git a/lib/deimos/test_helpers.rb b/lib/deimos/test_helpers.rb
index 8c710ecd..57433ef0 100644
--- a/lib/deimos/test_helpers.rb
+++ b/lib/deimos/test_helpers.rb
@@ -2,7 +2,6 @@
require 'active_support/concern'
require 'active_support/core_ext'
-require 'avro_turf'
require 'deimos/tracing/mock'
require 'deimos/metrics/mock'
@@ -21,68 +20,19 @@ def sent_messages
end
included do
- # @param encoder_schema [String]
- # @param namespace [String]
- # @return [Deimos::AvroDataEncoder]
- def create_encoder(encoder_schema, namespace)
- encoder = Deimos::AvroDataEncoder.new(schema: encoder_schema,
- namespace: namespace)
-
- # we added and_wrap_original to RSpec 2 but the regular block
- # syntax wasn't working for some reason - block wasn't being passed
- # to the method
- block = proc do |m, *args|
- m.call(*args)
- args[0]
- end
- allow(encoder).to receive(:encode_local).and_wrap_original(&block)
- allow(encoder).to receive(:encode) do |payload, schema: nil, topic: nil|
- encoder.encode_local(payload, schema: schema)
- end
-
- block = proc do |m, *args|
- m.call(*args)&.values&.first
- end
- allow(encoder).to receive(:encode_key).and_wrap_original(&block)
- encoder
- end
-
- # @param decoder_schema [String]
- # @param namespace [String]
- # @return [Deimos::AvroDataDecoder]
- def create_decoder(decoder_schema, namespace)
- decoder = Deimos::AvroDataDecoder.new(schema: decoder_schema,
- namespace: namespace)
- allow(decoder).to receive(:decode_local) { |payload| payload }
- allow(decoder).to receive(:decode) do |payload, schema: nil|
- schema ||= decoder.schema
- if schema && decoder.namespace
- # Validate against local schema.
- encoder = Deimos::AvroDataEncoder.new(schema: schema,
- namespace: decoder.namespace)
- encoder.schema_store = decoder.schema_store
- payload = payload.respond_to?(:stringify_keys) ? payload.stringify_keys : payload
- encoder.encode_local(payload)
- end
- payload
- end
- allow(decoder).to receive(:decode_key) do |payload, _key_id|
- payload.values.first
- end
- decoder
- end
RSpec.configure do |config|
config.before(:suite) do
- Deimos.configure do |fr_config|
- fr_config.logger = Logger.new(STDOUT)
- fr_config.consumers.reraise_errors = true
- fr_config.kafka.seed_brokers ||= ['test_broker']
+ Deimos.configure do |d_config|
+ d_config.logger = Logger.new(STDOUT)
+ d_config.consumers.reraise_errors = true
+ d_config.kafka.seed_brokers ||= ['test_broker']
+ d_config.schema.backend = Deimos.schema_backend_class.mock_backend
end
end
-
end
+
before(:each) do
client = double('client').as_null_object
allow(client).to receive(:time) do |*_args, &block|
@@ -98,55 +48,21 @@ def stub_producers_and_consumers!
allow(Deimos::Producer).to receive(:produce_batch) do |_, batch|
Deimos::TestHelpers.sent_messages.concat(batch.map(&:to_h))
end
-
- Deimos::Producer.descendants.each do |klass|
- next if klass == Deimos::ActiveRecordProducer # "abstract" class
-
- stub_producer(klass)
- end
-
- Deimos::Consumer.descendants.each do |klass|
- # TODO: remove this when ActiveRecordConsumer uses batching
- next if klass == Deimos::ActiveRecordConsumer # "abstract" class
-
- stub_consumer(klass)
- end
-
- Deimos::BatchConsumer.descendants.each do |klass|
- next if klass == Deimos::ActiveRecordConsumer # "abstract" class
-
- stub_batch_consumer(klass)
- end
end
- # Stub a given producer class.
- # @param klass [Class < Deimos::Producer]
- def stub_producer(klass)
- allow(klass).to receive(:encoder) do
- create_encoder(klass.config[:schema], klass.config[:namespace])
- end
- allow(klass).to receive(:key_encoder) do
- create_encoder(klass.config[:key_schema], klass.config[:namespace])
- end
+ # :nodoc:
+ def stub_producer(_klass)
+ warn('Stubbing producers is no longer necessary and this method will be removed in 3.0')
end
- # Stub a given consumer class.
- # @param klass [Class < Deimos::Consumer]
- def stub_consumer(klass)
- _stub_base_consumer(klass)
- klass.class_eval do
- alias_method(:old_consume, :consume) unless self.instance_methods.include?(:old_consume)
- end
- allow_any_instance_of(klass).to receive(:consume) do |instance, payload, metadata|
- metadata[:key] = klass.new.decode_key(metadata[:key]) if klass.config[:key_configured]
- instance.old_consume(payload, metadata)
- end
+ # :nodoc:
+ def stub_consumer(_klass)
+ warn('Stubbing consumers is no longer necessary and this method will be removed in 3.0')
end
- # Stub a given batch consumer class.
- # @param klass [Class < Deimos::BatchConsumer]
- def stub_batch_consumer(klass)
- _stub_base_consumer(klass)
+ # :nodoc:
+ def stub_batch_consumer(_klass)
+ warn('Stubbing batch consumers is no longer necessary and this method will be removed in 3.0')
end
# get the difference of 2 hashes.
@@ -248,7 +164,7 @@ def was_message_sent?(message, topic, key=nil)
end
# Test that a given handler will consume a given payload correctly, i.e.
- # that the Avro schema is correct. If
+ # that the schema is correct. If
# a block is given, that block will be executed when `consume` is called.
# Otherwise it will just confirm that `consume` is called at all.
# @param handler_class_or_topic [Class|String] Class which inherits from
@@ -300,33 +216,18 @@ def test_consume_message(handler_class_or_topic,
).send(:process_message, payload)
end
- # Check to see that a given message will fail due to Avro errors.
+ # Check to see that a given message will fail due to validation errors.
# @param handler_class [Class]
# @param payload [Hash]
def test_consume_invalid_message(handler_class, payload)
- handler = handler_class.new
- allow(handler_class).to receive(:new).and_return(handler)
- listener = double('listener',
- handler_class: handler_class,
- encoding: nil)
- message = double('message',
- key: _key_from_consumer(handler_class),
- partition_key: nil,
- partition: 1,
- offset: 1,
- value: payload)
-
expect {
- Phobos::Actions::ProcessMessage.new(
- listener: listener,
- message: message,
- listener_metadata: { topic: 'my-topic' }
- ).send(:process_message, payload)
- }.to raise_error(Avro::SchemaValidator::ValidationError)
+ handler_class.decoder.validate(payload,
+ schema: handler_class.decoder.schema)
+ }.to raise_error
end
# Test that a given handler will consume a given batch payload correctly,
- # i.e. that the Avro schema is correct. If
+ # i.e. that the schema is correct. If
# a block is given, that block will be executed when `consume` is called.
# Otherwise it will just confirm that `consume` is called at all.
# @param handler_class_or_topic [Class|String] Class which inherits from
@@ -384,7 +285,7 @@ def test_consume_batch(handler_class_or_topic,
action.send(:execute)
end
- # Check to see that a given message will fail due to Avro errors.
+ # Check to see that a given message will fail due to validation errors.
# @param handler_class [Class]
# @param payloads [Array]
def test_consume_batch_invalid_message(handler_class, payloads)
@@ -418,33 +319,21 @@ def test_consume_batch_invalid_message(handler_class, payloads)
allow(action).to receive(:handle_error) { |e| raise e }
expect { action.send(:execute) }.
- to raise_error(Avro::SchemaValidator::ValidationError)
- end
-
- # @param schema1 [String|Hash] a file path, JSON string, or
- # hash representing a schema.
- # @param schema2 [String|Hash] a file path, JSON string, or
- # hash representing a schema.
- # @return [Boolean] true if the schemas are compatible, false otherwise.
- def self.schemas_compatible?(schema1, schema2)
- json1, json2 = [schema1, schema2].map do |schema|
- if schema.is_a?(String)
- schema = File.read(schema) unless schema.strip.starts_with?('{') # file path
- MultiJson.load(schema)
- else
- schema
- end
- end
- avro_schema1 = Avro::Schema.real_parse(json1, {})
- avro_schema2 = Avro::Schema.real_parse(json2, {})
- Avro::SchemaCompatibility.mutual_read?(avro_schema1, avro_schema2)
+ to raise_error
end
private
def _key_from_consumer(consumer)
- if consumer.config[:key_field] || consumer.config[:key_schema]
- { 'test' => 1 }
+ if consumer.config[:key_field]
+ { consumer.config[:key_field] => 1 }
+ elsif consumer.config[:key_schema]
+ backend = consumer.decoder
+ old_schema = backend.schema
+ backend.schema = consumer.config[:key_schema]
+ key = backend.schema_fields.map { |field| [field.name, 1] }.to_h
+ backend.schema = old_schema
+ key
elsif consumer.config[:no_keys]
nil
else
@@ -461,19 +350,5 @@ def _get_handler_class_from_topic(topic)
handler.handler.constantize
end
-
- # Stub shared methods between consumers/batch consumers
- # @param [Class < Deimos::BaseConsumer] klass Consumer class to stub
- def _stub_base_consumer(klass)
- allow(klass).to receive(:decoder) do
- create_decoder(klass.config[:schema], klass.config[:namespace])
- end
-
- if klass.config[:key_schema] # rubocop:disable Style/GuardClause
- allow(klass).to receive(:key_decoder) do
- create_decoder(klass.config[:key_schema], klass.config[:namespace])
- end
- end
- end
end
end
diff --git a/spec/avro_data_decoder_spec.rb b/spec/avro_data_decoder_spec.rb
deleted file mode 100644
index 9aac804e..00000000
--- a/spec/avro_data_decoder_spec.rb
+++ /dev/null
@@ -1,18 +0,0 @@
-# frozen_string_literal: true
-
-describe Deimos::AvroDataDecoder do
-
- let(:decoder) do
- decoder = described_class.new(schema: 'MySchema',
- namespace: 'com.my-namespace')
- allow(decoder).to(receive(:decode)) { |payload| payload }
- decoder
- end
-
- it 'should decode a key' do
- # reset stub from TestHelpers
- allow(described_class).to receive(:new).and_call_original
- expect(decoder.decode_key({ 'test_id' => '123' }, 'test_id')).to eq('123')
- end
-
-end
diff --git a/spec/avro_data_encoder_spec.rb b/spec/avro_data_encoder_spec.rb
deleted file mode 100644
index ec330ee6..00000000
--- a/spec/avro_data_encoder_spec.rb
+++ /dev/null
@@ -1,37 +0,0 @@
-# frozen_string_literal: true
-
-require 'avro_turf/messaging'
-
-describe Deimos::AvroDataEncoder do
-
- let(:encoder) do
- encoder = described_class.new(schema: 'MySchema',
- namespace: 'com.my-namespace')
- allow(encoder).to(receive(:encode)) { |payload| payload }
- encoder
- end
-
- specify 'generate_key_schema' do
- expect_any_instance_of(AvroTurf::SchemaStore).
- to receive(:add_schema).with(
- 'type' => 'record',
- 'name' => 'MySchema_key',
- 'namespace' => 'com.my-namespace',
- 'doc' => 'Key for com.my-namespace.MySchema',
- 'fields' => [
- {
- 'name' => 'test_id',
- 'type' => 'string'
- }
- ]
- )
- encoder.send(:_generate_key_schema, 'test_id')
- end
-
- it 'should encode a key' do
- # reset stub from TestHelpers
- allow(described_class).to receive(:new).and_call_original
- expect(encoder.encode_key('test_id', '123')).to eq('test_id' => '123')
- end
-
-end
diff --git a/spec/batch_consumer_spec.rb b/spec/batch_consumer_spec.rb
index fba86e02..4e3f2657 100644
--- a/spec/batch_consumer_spec.rb
+++ b/spec/batch_consumer_spec.rb
@@ -80,11 +80,6 @@ def consume_batch(_payloads, _metadata)
end
it 'should decode payloads for all messages in the batch' do
- expect_any_instance_of(Deimos::AvroDataDecoder).
- to receive(:decode).with(batch[0])
- expect_any_instance_of(Deimos::AvroDataDecoder).
- to receive(:decode).with(batch[1])
-
test_consume_batch('my_batch_consume_topic', batch) do |received, _metadata|
# Mock decoder simply returns the payload
expect(received).to eq(batch)
@@ -110,7 +105,6 @@ def consume_batch(_payloads, _metadata)
key_config plain: true
end
stub_const('ConsumerTest::MyBatchConsumer', consumer_class)
- stub_batch_consumer(consumer_class)
test_consume_batch('my_batch_consume_topic', batch, keys: [1, 2]) do |_received, metadata|
expect(metadata[:keys]).to eq([1, 2])
@@ -132,7 +126,6 @@ def consume_batch(_payloads, _metadata)
end
end
stub_const('ConsumerTest::MyBatchConsumer', consumer_class)
- stub_batch_consumer(consumer_class)
allow(Deimos.config.metrics).to receive(:histogram)
end
@@ -215,7 +208,6 @@ def consume_batch(_payloads, _metadata)
end
end
stub_const('ConsumerTest::MyBatchConsumer', consumer_class)
- stub_batch_consumer(consumer_class)
allow(Deimos.config.metrics).to receive(:histogram)
end
diff --git a/spec/consumer_spec.rb b/spec/consumer_spec.rb
index c9eab7aa..357257d6 100644
--- a/spec/consumer_spec.rb
+++ b/spec/consumer_spec.rb
@@ -150,7 +150,6 @@ def consume(_payload, _metadata)
end
end
stub_const('ConsumerTest::MyConsumer', consumer_class)
- stub_consumer(consumer_class)
end
it 'should consume a message' do
diff --git a/spec/deimos_spec.rb b/spec/deimos_spec.rb
index 11834e66..af942fd2 100644
--- a/spec/deimos_spec.rb
+++ b/spec/deimos_spec.rb
@@ -67,10 +67,6 @@
end
describe '#start_db_backend!' do
- before(:each) do
- allow(described_class).to receive(:run_db_backend)
- end
-
it 'should start if backend is db and thread_count is > 0' do
signal_handler = instance_double(Deimos::Utils::SignalHandler)
allow(signal_handler).to receive(:run!)
diff --git a/spec/producer_spec.rb b/spec/producer_spec.rb
index 36e980a1..ad3b3524 100644
--- a/spec/producer_spec.rb
+++ b/spec/producer_spec.rb
@@ -62,8 +62,9 @@ def self.partition_key(payload)
subscriber = Deimos.subscribe('produce') do |event|
expect(event.payload[:payloads]).to eq([{ 'invalid' => 'key' }])
end
+ expect(MyProducer.encoder).to receive(:validate).and_raise('OH NOES')
expect { MyProducer.publish('invalid' => 'key', :payload_key => 'key') }.
- to raise_error(Avro::SchemaValidator::ValidationError)
+ to raise_error('OH NOES')
Deimos.unsubscribe(subscriber)
end
@@ -198,25 +199,16 @@ def self.partition_key(payload)
end
it 'should encode the key' do
- encoder = instance_double(Deimos::AvroDataEncoder)
- allow(Deimos::Message).to receive(:new).and_wrap_original do |m, hash, producer|
- message = m.call(hash, producer)
- allow(message).to receive(:add_fields)
- allow(message).to receive(:coerce_fields)
- message
- end
- allow(MyProducer).to receive(:encoder).and_return(encoder).at_least(:once)
- allow(encoder).to receive(:avro_schema)
- expect(encoder).to receive(:encode_key).with('test_id', 'foo', 'my-topic-key')
- expect(encoder).to receive(:encode_key).with('test_id', 'bar', 'my-topic-key')
- expect(encoder).to receive(:encode).with({
- 'test_id' => 'foo',
- 'some_int' => 123
- }, { topic: 'my-topic-value' })
- expect(encoder).to receive(:encode).with({
- 'test_id' => 'bar',
- 'some_int' => 124
- }, { topic: 'my-topic-value' })
+ expect(MyProducer.encoder).to receive(:encode_key).with('test_id', 'foo', topic: 'my-topic-key')
+ expect(MyProducer.encoder).to receive(:encode_key).with('test_id', 'bar', topic: 'my-topic-key')
+ expect(MyProducer.encoder).to receive(:encode).with({
+ 'test_id' => 'foo',
+ 'some_int' => 123
+ }, { topic: 'my-topic-value' })
+ expect(MyProducer.encoder).to receive(:encode).with({
+ 'test_id' => 'bar',
+ 'some_int' => 124
+ }, { topic: 'my-topic-value' })
MyProducer.publish_list(
[{ 'test_id' => 'foo', 'some_int' => 123 },
@@ -225,13 +217,7 @@ def self.partition_key(payload)
end
it 'should not encode with plaintext key' do
- key_encoder = Deimos::AvroDataEncoder.new(
- schema: 'MySchema',
- namespace: 'com.my-namespace'
- )
- allow(key_encoder).to receive(:encode)
- allow(MyNonEncodedProducer).to receive(:encoder).and_return(key_encoder)
- expect(key_encoder).not_to receive(:encode_key)
+ expect(MyNonEncodedProducer.key_encoder).not_to receive(:encode_key)
MyNonEncodedProducer.publish_list(
[{ 'test_id' => 'foo', 'some_int' => 123, :payload_key => 'foo_key' },
@@ -240,14 +226,10 @@ def self.partition_key(payload)
end
it 'should encode with a schema' do
-
- encoder = instance_double(Deimos::AvroDataEncoder)
- expect(MySchemaProducer).to receive(:key_encoder).and_return(encoder).
- at_least(:once)
- expect(encoder).to receive(:encode).with({ 'test_id' => 'foo_key' },
- { topic: 'my-topic2-key' })
- expect(encoder).to receive(:encode).with({ 'test_id' => 'bar_key' },
- { topic: 'my-topic2-key' })
+ expect(MySchemaProducer.key_encoder).to receive(:encode).with({ 'test_id' => 'foo_key' },
+ { topic: 'my-topic2-key' })
+ expect(MySchemaProducer.key_encoder).to receive(:encode).with({ 'test_id' => 'bar_key' },
+ { topic: 'my-topic2-key' })
MySchemaProducer.publish_list(
[{ 'test_id' => 'foo', 'some_int' => 123,
diff --git a/spec/publish_backend_spec.rb b/spec/publish_backend_spec.rb
index dba8c0be..e7988120 100644
--- a/spec/publish_backend_spec.rb
+++ b/spec/publish_backend_spec.rb
@@ -1,6 +1,6 @@
# frozen_string_literal: true
-RSpec.describe Deimos::PublishBackend do
+RSpec.describe Deimos::Backends::Base do
include_context 'with publish_backend'
it 'should call execute' do
expect(described_class).to receive(:execute).
diff --git a/spec/schema_backends/avro_base_shared.rb b/spec/schema_backends/avro_base_shared.rb
new file mode 100644
index 00000000..73c06b53
--- /dev/null
+++ b/spec/schema_backends/avro_base_shared.rb
@@ -0,0 +1,174 @@
+# frozen_string_literal: true
+
+require 'deimos/schema_backends/avro_base'
+
+RSpec.shared_examples_for('an Avro backend') do
+ let(:backend) { described_class.new(schema: 'MySchema', namespace: 'com.my-namespace') }
+
+ let(:full_schema) do
+ {
+ 'type' => 'record',
+ 'name' => 'schema1',
+ 'namespace' => 'com.my-namespace',
+ 'fields' => [
+ {
+ 'name' => 'int-field',
+ 'type' => 'int'
+ },
+ {
+ 'name' => 'long-field',
+ 'type' => 'long'
+ },
+ {
+ 'name' => 'float-field',
+ 'type' => 'float'
+ },
+ {
+ 'name' => 'double-field',
+ 'type' => 'double'
+ },
+ {
+ 'name' => 'string-field',
+ 'type' => 'string'
+ },
+ {
+ 'name' => 'boolean-field',
+ 'type' => 'boolean'
+ },
+ {
+ 'name' => 'union-field',
+ 'type' => %w(null string)
+ },
+ {
+ 'name' => 'union-int-field',
+ 'type' => %w(null int)
+ }
+ ]
+ }
+ end
+
+ specify('#encode_key') do
+ expect(backend).to receive(:encode).
+ with({ 'test_id' => 1 }, { schema: 'MySchema_key', topic: 'topic' }).and_return('itsme')
+ expect(backend.encode_key('test_id', 1, topic: 'topic')).to eq('itsme')
+ expect(backend.schema_store.find('MySchema_key', 'com.my-namespace').to_avro).
+ to eq(
+ 'doc' => 'Key for com.my-namespace.MySchema',
+ 'fields' => [
+ { 'name' => 'test_id', 'type' => 'string' }
+ ],
+ 'name' => 'MySchema_key',
+ 'namespace' => 'com.my-namespace',
+ 'type' => 'record'
+ )
+ end
+
+ specify('#decode_key') do
+ expect(backend).to receive(:decode).
+ with('payload', schema: 'MySchema_key').
+ and_return('test_id' => 1)
+ expect(backend.decode_key('payload', 'test_id')).to eq(1)
+ end
+
+  describe '#validate' do
+    it 'should pass a valid payload' do
+ expect {
+ backend.validate({ 'test_id' => 'hi', 'some_int' => 4 }, { schema: 'MySchema' })
+ }.not_to raise_error
+ end
+
+    it 'should fail an invalid payload' do
+ expect {
+ backend.validate({ 'test_id2' => 'hi', 'some_int' => 4 }, { schema: 'MySchema' })
+ }.to raise_error(Avro::SchemaValidator::ValidationError)
+ end
+
+ end
+
+ describe '#coerce' do
+ let(:payload) do
+ {
+ 'int-field' => 1,
+ 'long-field' => 11_111_111_111_111_111_111,
+ 'float-field' => 1.0,
+ 'double-field' => 2.0,
+ 'string-field' => 'hi mom',
+ 'boolean-field' => true,
+ 'union-field' => nil,
+ 'union-int-field' => nil
+ }
+ end
+
+ before(:each) do
+ backend.schema_store.add_schema(full_schema)
+ backend.schema = 'schema1'
+ end
+
+ it 'should leave numbers as is' do
+ result = backend.coerce(payload)
+ expect(result['int-field']).to eq(1)
+ expect(result['long-field']).to eq(11_111_111_111_111_111_111)
+ expect(result['float-field']).to eq(1.0)
+ expect(result['double-field']).to eq(2.0)
+ expect(result['boolean-field']).to eq(true)
+ expect(result['union-field']).to eq(nil)
+ end
+
+ it 'should coerce strings to numbers' do
+ result = backend.coerce(payload.merge(
+ 'int-field' => '1',
+ 'long-field' => '123',
+ 'float-field' => '1.1',
+ 'double-field' => '2.1'
+ ))
+ expect(result['int-field']).to eq(1)
+ expect(result['long-field']).to eq(123)
+ expect(result['float-field']).to eq(1.1)
+ expect(result['double-field']).to eq(2.1)
+ end
+
+ it 'should coerce Time to number' do
+ result = backend.coerce(payload.merge('int-field' => Time.find_zone('UTC').local(2019, 5, 5)))
+ expect(result['int-field']).to eq(1_557_014_400)
+ end
+
+ it 'should coerce symbols to string' do
+ result = backend.coerce(payload.merge('string-field' => :itsme))
+ expect(result['string-field']).to eq('itsme')
+ end
+
+    it 'should convert string-like objects to strings' do
+ stringy = Class.new do
+ # :nodoc:
+ def initialize(str)
+ @st = str
+ end
+
+ # :nodoc:
+ def to_s
+ @st
+ end
+
+ # :nodoc:
+ def to_str
+ @st
+ end
+ end
+ stub_const('Stringy', stringy)
+ result = backend.coerce(payload.merge('string-field' => Stringy.new('itsyou')))
+ expect(result['string-field']).to eq('itsyou')
+ end
+
+    it 'should coerce a nil boolean to false' do
+ result = backend.coerce(payload.merge('boolean-field' => nil))
+ expect(result['boolean-field']).to eq(false)
+ end
+
+    it 'should coerce values in union fields' do
+ result = backend.coerce(payload.merge('union-field' => :itsme))
+ expect(result['union-field']).to eq('itsme')
+ end
+
+ end
+
+end
diff --git a/spec/schema_backends/avro_local_spec.rb b/spec/schema_backends/avro_local_spec.rb
new file mode 100644
index 00000000..33f512b6
--- /dev/null
+++ b/spec/schema_backends/avro_local_spec.rb
@@ -0,0 +1,32 @@
+# frozen_string_literal: true
+
+require_relative 'avro_base_shared'
+require 'deimos/schema_backends/avro_local'
+
+RSpec.describe Deimos::SchemaBackends::AvroLocal do
+ let(:payload) do
+ {
+ 'test_id' => 'some string',
+ 'some_int' => 3
+ }
+ end
+ let(:backend) { described_class.new(schema: 'MySchema', namespace: 'com.my-namespace') }
+
+ it_should_behave_like 'an Avro backend'
+
+ it 'should encode and decode correctly' do
+ avro_turf = instance_double(AvroTurf)
+ expect(avro_turf).to receive(:encode).
+ with(payload, schema_name: 'MySchema', namespace: 'com.my-namespace').
+ and_return('encoded-payload')
+ expect(avro_turf).to receive(:decode).
+ with('encoded-payload', schema_name: 'MySchema', namespace: 'com.my-namespace').
+ and_return(payload)
+ allow(backend).to receive(:avro_turf).and_return(avro_turf)
+ results = backend.encode(payload)
+ expect(results).to eq('encoded-payload')
+ results = backend.decode(results)
+ expect(results).to eq(payload)
+ end
+
+end
diff --git a/spec/schema_backends/avro_schema_registry_spec.rb b/spec/schema_backends/avro_schema_registry_spec.rb
new file mode 100644
index 00000000..0aa4ae17
--- /dev/null
+++ b/spec/schema_backends/avro_schema_registry_spec.rb
@@ -0,0 +1,32 @@
+# frozen_string_literal: true
+
+require_relative 'avro_base_shared'
+require 'deimos/schema_backends/avro_schema_registry'
+
+RSpec.describe Deimos::SchemaBackends::AvroSchemaRegistry do
+ let(:payload) do
+ {
+ 'test_id' => 'some string',
+ 'some_int' => 3
+ }
+ end
+ let(:backend) { described_class.new(schema: 'MySchema', namespace: 'com.my-namespace') }
+
+ it_should_behave_like 'an Avro backend'
+
+ it 'should encode and decode correctly' do
+ avro_turf = instance_double(AvroTurf::Messaging)
+ expect(avro_turf).to receive(:encode).
+ with(payload, schema_name: 'MySchema', subject: 'topic').
+ and_return('encoded-payload')
+ expect(avro_turf).to receive(:decode).
+ with('encoded-payload', schema_name: 'MySchema').
+ and_return(payload)
+ allow(backend).to receive(:avro_turf_messaging).and_return(avro_turf)
+ results = backend.encode(payload, topic: 'topic')
+ expect(results).to eq('encoded-payload')
+ results = backend.decode(results)
+ expect(results).to eq(payload)
+ end
+
+end
diff --git a/spec/schema_backends/avro_validation_spec.rb b/spec/schema_backends/avro_validation_spec.rb
new file mode 100644
index 00000000..586ab602
--- /dev/null
+++ b/spec/schema_backends/avro_validation_spec.rb
@@ -0,0 +1,24 @@
+# frozen_string_literal: true
+
+require_relative 'avro_base_shared'
+require 'deimos/schema_backends/avro_validation'
+
+RSpec.describe Deimos::SchemaBackends::AvroValidation do
+ let(:payload) do
+ {
+ 'test_id' => 'some string',
+ 'some_int' => 3
+ }
+ end
+ let(:backend) { described_class.new(schema: 'MySchema', namespace: 'com.my-namespace') }
+
+ it_should_behave_like 'an Avro backend'
+
+ it 'should encode and decode correctly' do
+ results = backend.encode(payload)
+ expect(results).to eq(payload)
+ results = backend.decode(results)
+ expect(results).to eq(payload)
+ end
+
+end
diff --git a/spec/schema_backends/base_spec.rb b/spec/schema_backends/base_spec.rb
new file mode 100644
index 00000000..dcebe519
--- /dev/null
+++ b/spec/schema_backends/base_spec.rb
@@ -0,0 +1,29 @@
+# frozen_string_literal: true
+
+describe Deimos::SchemaBackends::Base do
+ let(:backend) { described_class.new(schema: 'schema', namespace: 'namespace') }
+ let(:payload) { { foo: 1 } }
+
+ it 'should validate on encode' do
+ expect(backend).to receive(:validate).with(payload, schema: 'schema')
+ expect(backend).to receive(:encode_payload).with(payload, schema: 'schema', topic: 'topic')
+ backend.encode(payload, topic: 'topic')
+ end
+
+ it 'should validate and encode a passed schema' do
+ expect(backend).to receive(:validate).with(payload, schema: 'schema2')
+ expect(backend).to receive(:encode_payload).with(payload, schema: 'schema2', topic: 'topic')
+ backend.encode(payload, schema: 'schema2', topic: 'topic')
+ end
+
+ it 'should decode a schema' do
+ expect(backend).to receive(:decode_payload).with(payload, schema: 'schema')
+ backend.decode(payload)
+ end
+
+ it 'should decode a passed schema' do
+ expect(backend).to receive(:decode_payload).with(payload, schema: 'schema2')
+ backend.decode(payload, schema: 'schema2')
+ end
+
+end
diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb
index 09a080ab..c3c5c0b0 100644
--- a/spec/spec_helper.rb
+++ b/spec/spec_helper.rb
@@ -146,6 +146,11 @@ def setup_db(options)
setup_db(DbConfigs::DB_OPTIONS.last)
end
+ config.mock_with(:rspec) do |mocks|
+ mocks.yield_receiver_to_any_instance_implementation_blocks = true
+ mocks.verify_partial_doubles = true
+ end
+
config.before(:each) do |ex|
Deimos.config.reset!
Deimos.configure do |deimos_config|
@@ -156,6 +161,7 @@ def setup_db(options)
deimos_config.kafka.seed_brokers = ENV['KAFKA_SEED_BROKER'] || 'localhost:9092'
deimos_config.logger = Logger.new('/dev/null')
deimos_config.logger.level = Logger::INFO
+ deimos_config.schema.backend = :avro_validation
end
stub_producers_and_consumers! unless ex.metadata[:integration]
end
diff --git a/spec/updateable_schema_store_spec.rb b/spec/updateable_schema_store_spec.rb
deleted file mode 100644
index 877cbdec..00000000
--- a/spec/updateable_schema_store_spec.rb
+++ /dev/null
@@ -1,36 +0,0 @@
-# frozen_string_literal: true
-
-describe AvroTurf::SchemaStore do
-
- it 'should add an in-memory schema' do
- schema_store = described_class.new(path: Deimos.config.schema.path)
- schema_store.load_schemas!
- found_schema = schema_store.find('MySchema', 'com.my-namespace').as_json
- expect(found_schema['name']).to eq('MySchema')
- expect(found_schema['namespace']).to eq('com.my-namespace')
- expect(found_schema['fields'].size).to eq(2)
- expect(found_schema['fields'][0]['type']['type_sym']).to eq('string')
- expect(found_schema['fields'][0]['name']).to eq('test_id')
- new_schema = {
- 'namespace' => 'com.my-namespace',
- 'name' => 'MyNewSchema',
- 'type' => 'record',
- 'doc' => 'Test schema',
- 'fields' => [
- {
- 'name' => 'my_id',
- 'type' => 'int',
- 'doc' => 'test int'
- }
- ]
- }
- schema_store.add_schema(new_schema)
- found_schema = schema_store.find('MyNewSchema', 'com.my-namespace').
- as_json
- expect(found_schema['name']).to eq('MyNewSchema')
- expect(found_schema['namespace']).to eq('com.my-namespace')
- expect(found_schema['fields'].size).to eq(1)
- expect(found_schema['fields'][0]['type']['type_sym']).to eq('int')
- expect(found_schema['fields'][0]['name']).to eq('my_id')
- end
-end