Skip to content

Commit

Permalink
Add schema backends
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniel Orner committed Jan 13, 2020
1 parent 66cc0bc commit b28c10a
Show file tree
Hide file tree
Showing 46 changed files with 898 additions and 647 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## UNRELEASED

- Added schema backends, which should simplify Avro encoding and make it
more flexible for unit tests and local development.
- BREAKING CHANGE: Deimos no longer comes with `avro_turf` as a dependency.
You will need to include it if you are Avro-encoding or decoding your
messages.

- Add `:test` producer backend which replaces the existing TestHelpers
functionality of writing messages to an in-memory hash.

Expand Down
10 changes: 4 additions & 6 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ PATH
remote: .
specs:
deimos-ruby (1.4.0.pre.beta7)
avro-patches (~> 0.3)
avro_turf (~> 0.8)
phobos (~> 1.8.2.pre.beta2)
ruby-kafka (~> 0.7)

Expand Down Expand Up @@ -55,10 +53,8 @@ GEM
tzinfo (~> 1.1)
arel (9.0.0)
ast (2.4.0)
avro (1.8.2)
avro (1.9.1)
multi_json
avro-patches (0.4.1)
avro (= 1.8.2)
avro_turf (0.11.0)
avro (>= 1.7.7, < 1.10)
excon (~> 0.45)
Expand All @@ -74,7 +70,7 @@ GEM
digest-crc (0.4.1)
dogstatsd-ruby (4.5.0)
erubi (1.9.0)
excon (0.71.0)
excon (0.71.1)
exponential-backoff (0.0.4)
ffi (1.11.3)
formatador (0.2.5)
Expand Down Expand Up @@ -228,6 +224,8 @@ PLATFORMS
DEPENDENCIES
activerecord (~> 5.2)
activerecord-import
avro (~> 1.9)
avro_turf (~> 0.8)
bundler (~> 1)
ddtrace (~> 0.11)
deimos-ruby!
Expand Down
44 changes: 33 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@
<img src="https://img.shields.io/codeclimate/maintainability/flipp-oss/deimos.svg"/>
</p>

A Ruby framework for marrying Kafka, Avro, and/or ActiveRecord and provide
A Ruby framework for marrying Kafka, a schema definition like Avro, and/or ActiveRecord and provide
a useful toolbox of goodies for Ruby-based Kafka development.
Built on Phobos and hence Ruby-Kafka.

<!--ts-->
* [Installation](#installation)
* [Versioning](#versioning)
* [Configuration](#configuration)
* [Schemas](#schemas)
* [Producers](#producers)
* [Auto-added Fields](#auto-added-fields)
* [Coerced Values](#coerced-values)
Expand Down Expand Up @@ -60,6 +61,27 @@ gem 'deimos-ruby', '~> 1.1'

For a full configuration reference, please see [the configuration docs ](docs/CONFIGURATION.md).

# Schemas

Deimos was originally written only supporting Avro encoding via a schema registry.
This has since been expanded to a plugin architecture allowing messages to be
encoded and decoded via any schema specification you wish.

Currently we have the following possible schema backends:
* Avro Local (use pure Avro)
* Avro Schema Registry (use the Confluent Schema Registry)
* Avro Validation (validate using an Avro schema but leave decoded - this is useful
for unit testing and development)
* Mock (no actual encoding/decoding).

Note that to use Avro-encoding, you must include the [avro_turf](https://github.com/dasch/avro_turf) gem in your
Gemfile.

Other possible schemas could include [Protobuf](https://developers.google.com/protocol-buffers), [JSONSchema](https://json-schema.org/), etc. Feel free to
contribute!

To create a new schema backend, please see the existing examples [here](lib/deimos/schema_backends).

# Producers

Producers will look like this:
Expand Down Expand Up @@ -137,7 +159,7 @@ produced by Phobos and RubyKafka):
* topic
* exception_object
* payloads - the unencoded payloads
* `encode_messages` - sent when messages are being Avro-encoded.
* `encode_messages` - sent when messages are being schema-encoded.
* producer - the class that produced the message
* topic
* payloads - the unencoded payloads
Expand Down Expand Up @@ -165,8 +187,8 @@ Similarly:
### Kafka Message Keys

Topics representing events rather than domain data don't need keys. However,
best practice for domain messages is to Avro-encode message keys
with a separate Avro schema.
best practice for domain messages is to schema-encode message keys
with a separate schema.

This enforced by requiring producers to define a `key_config` directive. If
any message comes in with a key, the producer will error out if `key_config` is
Expand All @@ -179,7 +201,7 @@ There are three possible configurations to use:
all your messages in a topic need to have a key, or they all need to have
no key. This is a good choice for events that aren't keyed - you can still
set a partition key.
* `key_config plain: true` - this indicates that you are not using an Avro-encoded
* `key_config plain: true` - this indicates that you are not using an encoded
key. Use this for legacy topics - new topics should not use this setting.
* `key_config schema: 'MyKeySchema-key'` - this tells the producer to look for
an existing key schema named `MyKeySchema-key` in the schema registry and to
Expand Down Expand Up @@ -234,8 +256,8 @@ like this:
```

If you publish a payload `{ "test_id" => "123", "some_int" => 123 }`, this
will be turned into a key that looks like `{ "test_id" => "123"}` and encoded
via Avro before being sent to Kafka.
will be turned into a key that looks like `{ "test_id" => "123"}` and schema-encoded
before being sent to Kafka.

If you are using `plain` or `schema` as your config, you will need to have a
special `payload_key` key to your payload hash. This will be extracted and
Expand All @@ -261,7 +283,7 @@ class MyConsumer < Deimos::Consumer

def consume(payload, metadata)
# Same method as Phobos consumers.
# payload is an Avro-decoded hash.
# payload is an schema-decoded hash.
# metadata is a hash that contains information like :key and :topic.
# In general, your key should be included in the payload itself. However,
# if you need to access it separately from the payload, you can use
Expand Down Expand Up @@ -311,7 +333,7 @@ this sample:
class MyBatchConsumer < Deimos::BatchConsumer

def consume_batch(payloads, metadata)
# payloads is an array of Avro-decoded hashes.
# payloads is an array of schema-decoded hashes.
# metadata is a hash that contains information like :keys and :topic.
# Keys are automatically decoded and available as an array with
# the same cardinality as the payloads. If you need to iterate
Expand Down Expand Up @@ -604,7 +626,7 @@ Also see [deimos.rb](lib/deimos.rb) under `Configure metrics` to see how the met
Deimos also includes some tracing for kafka consumers. It ships with
DataDog support, but you can add custom tracing providers as well.

Trace spans are used for when incoming messages are avro decoded, and a
Trace spans are used for when incoming messages are schema-decoded, and a
separate span for message consume logic.

### Configuring Tracing Providers
Expand Down Expand Up @@ -749,7 +771,7 @@ be
}
```

Both payload and key will be Avro-decoded as necessary according to the
Both payload and key will be schema-decoded as necessary according to the
key config.

You can also just pass an existing producer or consumer class into the method,
Expand Down
4 changes: 2 additions & 2 deletions deimos-ruby.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ Gem::Specification.new do |spec|
spec.test_files = spec.files.grep(%r{^(test|spec|features)/})
spec.require_paths = ['lib']

spec.add_runtime_dependency('avro-patches', '~> 0.3')
spec.add_runtime_dependency('avro_turf', '~> 0.8')
spec.add_runtime_dependency('phobos', '~> 1.8.2.pre.beta2')
spec.add_runtime_dependency('ruby-kafka', '~> 0.7')

spec.add_development_dependency('activerecord', '~> 5.2')
spec.add_development_dependency('activerecord-import')
spec.add_development_dependency('avro', '~> 1.9')
spec.add_development_dependency('avro_turf', '~> 0.8')
spec.add_development_dependency('bundler', '~> 1')
spec.add_development_dependency('ddtrace', '~> 0.11')
spec.add_development_dependency('dogstatsd-ruby', '~> 4.2')
Expand Down
1 change: 1 addition & 0 deletions docs/CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ producers.backend|`:kafka_async`|Currently can be set to `:db`, `:kafka`, or `:k

Config name|Default|Description
-----------|-------|-----------
schema.backend|`:mock`|Backend representing the schema encoder/decoder. You can see a full list [here](../lib/deimos/schema_backends).
schema.registry_url|`http://localhost:8081`|URL of the Confluent schema registry.
schema.path|nil|Local path to find your schemas.

Expand Down
2 changes: 1 addition & 1 deletion docs/DATABASE_BACKEND.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ The database backend consists of three tables:

* `kafka_messages` - this keeps track of the messages that were "published",
including the payload, topic, key and partition key. These messages
are *raw data* - all processing, including Avro encoding, must happen
are *raw data* - all processing, including schema-encoding, must happen
upstream before they are inserted.
* `kafka_topic_info` - this table is essentially a lock table used to ensure
that only one producer thread is ever "working" on a topic at a time.
Expand Down
27 changes: 21 additions & 6 deletions lib/deimos.rb
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
# frozen_string_literal: true

require 'avro-patches'
require 'avro_turf'
require 'phobos'
require 'deimos/version'
require 'deimos/config/configuration'
require 'deimos/avro_data_encoder'
require 'deimos/avro_data_decoder'
require 'deimos/producer'
require 'deimos/active_record_producer'
require 'deimos/active_record_consumer'
Expand All @@ -15,17 +11,19 @@
require 'deimos/instrumentation'
require 'deimos/utils/lag_reporter'

require 'deimos/publish_backend'
require 'deimos/backends/base'
require 'deimos/backends/kafka'
require 'deimos/backends/kafka_async'
require 'deimos/backends/test'

require 'deimos/schema_backends/base'

require 'deimos/monkey_patches/ruby_kafka_heartbeat'
require 'deimos/monkey_patches/schema_store'
require 'deimos/monkey_patches/phobos_producer'
require 'deimos/monkey_patches/phobos_cli'

require 'deimos/railtie' if defined?(Rails)

if defined?(ActiveRecord)
require 'deimos/kafka_source'
require 'deimos/kafka_topic_info'
Expand All @@ -34,13 +32,30 @@
require 'deimos/utils/executor.rb'
require 'deimos/utils/db_producer.rb'
end

require 'deimos/utils/inline_consumer'
require 'yaml'
require 'erb'

# Parent module.
module Deimos
class << self
# @return [Class < Deimos::SchemaBackends::Base]
def schema_backend_class
backend = Deimos.config.schema.backend.to_s

require "deimos/schema_backends/#{backend}"

"Deimos::SchemaBackends::#{backend.classify}".constantize
end

# @param schema [String|Symbol]
# @param namespace [String]
# @return [Deimos::SchemaBackends::Base]
def schema_backend(schema:, namespace:)
schema_backend_class.new(schema: schema, namespace: namespace)
end

# Start the DB producers to send Kafka messages.
# @param thread_count [Integer] the number of threads to start.
def start_db_backend!(thread_count: 1)
Expand Down
21 changes: 10 additions & 11 deletions lib/deimos/active_record_consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,31 +68,30 @@ def destroy_record(record)
def record_attributes(payload)
klass = self.class.config[:record_class]
attributes = {}
schema = self.class.decoder.avro_schema
schema.fields.each do |field|
self.class.decoder.schema_fields.each do |field|
column = klass.columns.find { |c| c.name == field.name }
next if column.nil?
next if %w(updated_at created_at).include?(field.name)

attributes[field.name] = _coerce_field(field, column, payload[field.name])
attributes[field.name] = _coerce_field(column, payload[field.name])
end
attributes
end

private

# @param field [Avro::Schema]
# @param column [ActiveRecord::ConnectionAdapters::Column]
# @param val [Object]
def _coerce_field(field, column, val)
def _coerce_field(column, val)
return nil if val.nil?

field_type = field.type.type.to_sym
if field_type == :union
union_types = field.type.schemas.map { |s| s.type.to_sym }
field_type = union_types.find { |t| t != :null }
end
if column.type == :datetime && %i(int long).include?(field_type)
is_integer = begin
val.is_a?(Integer) || (val.is_a?(String) && Integer(val))
rescue StandardError
false
end

if column.type == :datetime && is_integer
return Time.zone.strptime(val.to_s, '%s')
end

Expand Down
4 changes: 2 additions & 2 deletions lib/deimos/active_record_producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ def send_events(records, force_send: false)
# is not set.
# @return [Hash]
def generate_payload(attributes, _record)
schema = self.encoder.avro_schema
fields = self.encoder.schema_fields
payload = attributes.stringify_keys
payload.delete_if do |k, _|
k.to_sym != :payload_key && !schema.fields.find { |f| f.name == k }
k.to_sym != :payload_key && !fields.map(&:name).include?(k)
end
end
end
Expand Down
Loading

0 comments on commit b28c10a

Please sign in to comment.