Commit 6787f1e

author
Daniel Orner
committed
Add schema backends
1 parent 0e00cb6 commit 6787f1e

46 files changed, +929 -642 lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
@@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## UNRELEASED
 
+- Added schema backends, which should simplify Avro encoding and make it
+more flexible for unit tests and local development.
+- BREAKING CHANGE: Deimos no longer comes with `avro_turf` as a dependency.
+You will need to include it if you are Avro-encoding or decoding your
+messages.
+
 # [1.4.0-beta7] - 2019-12-16
 - Clone loggers when assigning to multiple levels.
 

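Because of the breaking change noted above, an application that Avro-encodes or decodes its messages now has to declare the Avro gems itself. A minimal Gemfile sketch follows; the `avro_turf` constraint mirrors the one this commit moves into the gemspec's development dependencies, while the `deimos-ruby` constraint is only an assumption taken from the README's installation line, so adjust both to your project.

```ruby
# Gemfile (sketch; version constraints are assumptions, not requirements
# stated by the commit)
source 'https://rubygems.org'

gem 'deimos-ruby', '~> 1.1'

# No longer pulled in transitively by deimos-ruby after this commit.
gem 'avro_turf', '~> 0.8'
```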
Gemfile.lock

Lines changed: 4 additions & 6 deletions
@@ -2,8 +2,6 @@ PATH
   remote: .
   specs:
     deimos-ruby (1.4.0.pre.beta7)
-      avro-patches (~> 0.3)
-      avro_turf (~> 0.8)
       phobos (~> 1.8.2.pre.beta2)
       ruby-kafka (~> 0.7)
 
@@ -55,10 +53,8 @@ GEM
       tzinfo (~> 1.1)
     arel (9.0.0)
     ast (2.4.0)
-    avro (1.8.2)
+    avro (1.9.1)
       multi_json
-    avro-patches (0.4.1)
-      avro (= 1.8.2)
     avro_turf (0.11.0)
       avro (>= 1.7.7, < 1.10)
       excon (~> 0.45)
@@ -74,7 +70,7 @@ GEM
     digest-crc (0.4.1)
     dogstatsd-ruby (4.5.0)
     erubi (1.9.0)
-    excon (0.71.0)
+    excon (0.71.1)
     exponential-backoff (0.0.4)
     ffi (1.11.3)
     formatador (0.2.5)
@@ -228,6 +224,8 @@ PLATFORMS
 DEPENDENCIES
   activerecord (~> 5.2)
   activerecord-import
+  avro (~> 1.9)
+  avro_turf (~> 0.8)
   bundler (~> 1)
   ddtrace (~> 0.11)
   deimos-ruby!

README.md

Lines changed: 33 additions & 11 deletions
@@ -6,14 +6,15 @@
 <img src="https://img.shields.io/codeclimate/maintainability/flipp-oss/deimos.svg"/>
 </p>
 
-A Ruby framework for marrying Kafka, Avro, and/or ActiveRecord and provide
+A Ruby framework for marrying Kafka, a schema definition like Avro, and/or ActiveRecord and provide
 a useful toolbox of goodies for Ruby-based Kafka development.
 Built on Phobos and hence Ruby-Kafka.
 
 <!--ts-->
 * [Installation](#installation)
 * [Versioning](#versioning)
 * [Configuration](#configuration)
+* [Schemas](#schemas)
 * [Producers](#producers)
 * [Auto-added Fields](#auto-added-fields)
 * [Coerced Values](#coerced-values)
@@ -60,6 +61,27 @@ gem 'deimos-ruby', '~> 1.1'
 
 For a full configuration reference, please see [the configuration docs ](docs/CONFIGURATION.md).
 
+# Schemas
+
+Deimos was originally written only supporting Avro encoding via a schema registry.
+This has since been expanded to a plugin architecture allowing messages to be
+encoded and decoded via any schema specification you wish.
+
+Currently we have the following possible schema backends:
+* Avro Local (use pure Avro)
+* Avro Schema Registry (use the Confluent Schema Registry)
+* Avro Validation (validate using an Avro schema but leave decoded - this is useful
+for unit testing and development)
+* Mock (no actual encoding/decoding).
+
+Note that to use Avro-encoding, you must include the [avro_turf](https://github.com/dasch/avro_turf) gem in your
+Gemfile.
+
+Other possible schemas could include [Protobuf](https://developers.google.com/protocol-buffers), [JSONSchema](https://json-schema.org/), etc. Feel free to
+contribute!
+
+To create a new schema backend, please see the existing examples [here](lib/deimos/schema_backends).
+
 # Producers
 
 Producers will look like this:
@@ -137,7 +159,7 @@ produced by Phobos and RubyKafka):
 * topic
 * exception_object
 * payloads - the unencoded payloads
-* `encode_messages` - sent when messages are being Avro-encoded.
+* `encode_messages` - sent when messages are being schema-encoded.
 * producer - the class that produced the message
 * topic
 * payloads - the unencoded payloads
@@ -165,8 +187,8 @@ Similarly:
 ### Kafka Message Keys
 
 Topics representing events rather than domain data don't need keys. However,
-best practice for domain messages is to Avro-encode message keys
-with a separate Avro schema.
+best practice for domain messages is to schema-encode message keys
+with a separate schema.
 
 This enforced by requiring producers to define a `key_config` directive. If
 any message comes in with a key, the producer will error out if `key_config` is
@@ -179,7 +201,7 @@ There are three possible configurations to use:
 all your messages in a topic need to have a key, or they all need to have
 no key. This is a good choice for events that aren't keyed - you can still
 set a partition key.
-* `key_config plain: true` - this indicates that you are not using an Avro-encoded
+* `key_config plain: true` - this indicates that you are not using an encoded
 key. Use this for legacy topics - new topics should not use this setting.
 * `key_config schema: 'MyKeySchema-key'` - this tells the producer to look for
 an existing key schema named `MyKeySchema-key` in the schema registry and to
@@ -234,8 +256,8 @@ like this:
 ```
 
 If you publish a payload `{ "test_id" => "123", "some_int" => 123 }`, this
-will be turned into a key that looks like `{ "test_id" => "123"}` and encoded
-via Avro before being sent to Kafka.
+will be turned into a key that looks like `{ "test_id" => "123"}` and schema-encoded
+before being sent to Kafka.
 
 If you are using `plain` or `schema` as your config, you will need to have a
 special `payload_key` key to your payload hash. This will be extracted and
@@ -261,7 +283,7 @@ class MyConsumer < Deimos::Consumer
 
   def consume(payload, metadata)
     # Same method as Phobos consumers.
-    # payload is an Avro-decoded hash.
+    # payload is an schema-decoded hash.
     # metadata is a hash that contains information like :key and :topic.
     # In general, your key should be included in the payload itself. However,
     # if you need to access it separately from the payload, you can use
@@ -311,7 +333,7 @@ this sample:
 class MyBatchConsumer < Deimos::BatchConsumer
 
   def consume_batch(payloads, metadata)
-    # payloads is an array of Avro-decoded hashes.
+    # payloads is an array of schema-decoded hashes.
     # metadata is a hash that contains information like :keys and :topic.
     # Keys are automatically decoded and available as an array with
     # the same cardinality as the payloads. If you need to iterate
@@ -604,7 +626,7 @@ Also see [deimos.rb](lib/deimos.rb) under `Configure metrics` to see how the met
 Deimos also includes some tracing for kafka consumers. It ships with
 DataDog support, but you can add custom tracing providers as well.
 
-Trace spans are used for when incoming messages are avro decoded, and a
+Trace spans are used for when incoming messages are schema-decoded, and a
 separate span for message consume logic.
 
 ### Configuring Tracing Providers
@@ -749,7 +771,7 @@ be
 }
 ```
 
-Both payload and key will be Avro-decoded as necessary according to the
+Both payload and key will be schema-decoded as necessary according to the
 key config.
 
 You can also just pass an existing producer or consumer class into the method,

deimos-ruby.gemspec

Lines changed: 2 additions & 2 deletions
@@ -18,13 +18,13 @@ Gem::Specification.new do |spec|
   spec.test_files = spec.files.grep(%r{^(test|spec|features)/})
   spec.require_paths = ['lib']
 
-  spec.add_runtime_dependency('avro-patches', '~> 0.3')
-  spec.add_runtime_dependency('avro_turf', '~> 0.8')
   spec.add_runtime_dependency('phobos', '~> 1.8.2.pre.beta2')
   spec.add_runtime_dependency('ruby-kafka', '~> 0.7')
 
   spec.add_development_dependency('activerecord', '~> 5.2')
   spec.add_development_dependency('activerecord-import')
+  spec.add_development_dependency('avro', '~> 1.9')
+  spec.add_development_dependency('avro_turf', '~> 0.8')
   spec.add_development_dependency('bundler', '~> 1')
   spec.add_development_dependency('ddtrace', '~> 0.11')
   spec.add_development_dependency('dogstatsd-ruby', '~> 4.2')

docs/CONFIGURATION.md

Lines changed: 1 addition & 0 deletions
@@ -144,6 +144,7 @@ producers.backend|`:kafka_async`|Currently can be set to `:db`, `:kafka`, or `:k
 
 Config name|Default|Description
 -----------|-------|-----------
+schema.backend|`:mock`|Backend representing the schema encoder/decoder. You can see a full list [here](../lib/deimos/schema_backends).
 schema.registry_url|`http://localhost:8081`|URL of the Confluent schema registry.
 schema.path|nil|Local path to find your schemas.
 

docs/DATABASE_BACKEND.md

Lines changed: 1 addition & 1 deletion
@@ -77,7 +77,7 @@ The database backend consists of three tables:
 
 * `kafka_messages` - this keeps track of the messages that were "published",
 including the payload, topic, key and partition key. These messages
-are *raw data* - all processing, including Avro encoding, must happen
+are *raw data* - all processing, including schema-encoding, must happen
 upstream before they are inserted.
 * `kafka_topic_info` - this table is essentially a lock table used to ensure
 that only one producer thread is ever "working" on a topic at a time.

lib/deimos.rb

Lines changed: 21 additions & 6 deletions
@@ -1,12 +1,8 @@
 # frozen_string_literal: true
 
-require 'avro-patches'
-require 'avro_turf'
 require 'phobos'
 require 'deimos/version'
 require 'deimos/config/configuration'
-require 'deimos/avro_data_encoder'
-require 'deimos/avro_data_decoder'
 require 'deimos/producer'
 require 'deimos/active_record_producer'
 require 'deimos/active_record_consumer'
@@ -15,16 +11,18 @@
 require 'deimos/instrumentation'
 require 'deimos/utils/lag_reporter'
 
-require 'deimos/publish_backend'
+require 'deimos/backends/base'
 require 'deimos/backends/kafka'
 require 'deimos/backends/kafka_async'
 
+require 'deimos/schema_backends/base'
+
 require 'deimos/monkey_patches/ruby_kafka_heartbeat'
-require 'deimos/monkey_patches/schema_store'
 require 'deimos/monkey_patches/phobos_producer'
 require 'deimos/monkey_patches/phobos_cli'
 
 require 'deimos/railtie' if defined?(Rails)
+
 if defined?(ActiveRecord)
   require 'deimos/kafka_source'
   require 'deimos/kafka_topic_info'
@@ -33,13 +31,30 @@
   require 'deimos/utils/executor.rb'
   require 'deimos/utils/db_producer.rb'
 end
+
 require 'deimos/utils/inline_consumer'
 require 'yaml'
 require 'erb'
 
 # Parent module.
 module Deimos
   class << self
+    # @return [Class < Deimos::SchemaBackends::Base]
+    def schema_backend_class
+      backend = Deimos.config.schema.backend.to_s
+
+      require "deimos/schema_backends/#{backend}"
+
+      "Deimos::SchemaBackends::#{backend.classify}".constantize
+    end
+
+    # @param schema [String|Symbol]
+    # @param namespace [String]
+    # @return [Deimos::SchemaBackends::Base]
+    def schema_backend(schema:, namespace:)
+      schema_backend_class.new(schema: schema, namespace: namespace)
+    end
+
     # Start the DB producers to send Kafka messages.
     # @param thread_count [Integer] the number of threads to start.
     def start_db_backend!(thread_count: 1)

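The new `schema_backend_class` method turns the configured backend symbol into a class name and looks that class up. The sketch below illustrates the same resolution in plain Ruby, under stated assumptions: `SketchBackends`, `sketch_backend_class`, and `sketch_backend` are hypothetical stand-ins, and a simple camel-casing helper replaces the ActiveSupport `classify`/`constantize` calls that the commit actually uses. It also skips the dynamic `require` step.

```ruby
# Hypothetical stand-ins for Deimos::SchemaBackends; not the real classes.
module SketchBackends
  class Base
    attr_reader :schema, :namespace

    def initialize(schema:, namespace:)
      @schema = schema
      @namespace = namespace
    end
  end

  class Mock < Base; end
  class AvroLocal < Base; end
end

# Map a config value such as :avro_local to SketchBackends::AvroLocal.
# Plain-Ruby approximation of `classify` + `constantize`.
def sketch_backend_class(backend)
  class_name = backend.to_s.split('_').map(&:capitalize).join
  SketchBackends.const_get(class_name)
end

# Build a backend instance for one schema, mirroring `schema_backend`.
def sketch_backend(backend, schema:, namespace:)
  sketch_backend_class(backend).new(schema: schema, namespace: namespace)
end
```

One difference worth keeping in mind: ActiveSupport's `classify` also singularizes its input, so a backend name ending in a plural "s" would resolve to a different class name there than in this sketch.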
lib/deimos/active_record_consumer.rb

Lines changed: 3 additions & 9 deletions
@@ -68,8 +68,7 @@ def destroy_record(record)
     def record_attributes(payload)
       klass = self.class.config[:record_class]
       attributes = {}
-      schema = self.class.decoder.avro_schema
-      schema.fields.each do |field|
+      self.class.decoder.schema_fields.each do |field|
         column = klass.columns.find { |c| c.name == field.name }
         next if column.nil?
         next if %w(updated_at created_at).include?(field.name)
@@ -81,18 +80,13 @@ def record_attributes(payload)
 
     private
 
-    # @param field [Avro::Schema]
+    # @param field [Deimos::SchemaField]
     # @param column [ActiveRecord::ConnectionAdapters::Column]
     # @param val [Object]
     def _coerce_field(field, column, val)
       return nil if val.nil?
 
-      field_type = field.type.type.to_sym
-      if field_type == :union
-        union_types = field.type.schemas.map { |s| s.type.to_sym }
-        field_type = union_types.find { |t| t != :null }
-      end
-      if column.type == :datetime && %i(int long).include?(field_type)
+      if column.type == :datetime && self.class.decoder.numeric_field?(field)
         return Time.zone.strptime(val.to_s, '%s')
       end
 
9892
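The hunk above moves the "is this field numeric?" decision out of the consumer and into the schema backend. A rough plain-Ruby sketch of that coercion path follows. It is an illustration only: `SketchField`, `sketch_numeric_field?`, and `sketch_coerce_field` are hypothetical names, the union handling is modeled on the removed lines rather than on the backend's real `numeric_field?` (which this diff does not show), and `Time.at(...).utc` stands in for the ActiveSupport `Time.zone.strptime` call.

```ruby
# Hypothetical field description: a name plus one type or a union of types.
SketchField = Struct.new(:name, :types)

# Treat a field as numeric when its first non-null type is :int or :long,
# following the union unwrapping in the lines this commit removes.
def sketch_numeric_field?(field)
  non_null = Array(field.types).find { |t| t != :null }
  %i(int long).include?(non_null)
end

# Convert epoch seconds to a Time when the database column is a datetime
# and the schema field is numeric; otherwise return the value unchanged.
def sketch_coerce_field(field, column_type, val)
  return nil if val.nil?

  if column_type == :datetime && sketch_numeric_field?(field)
    return Time.at(val.to_i).utc
  end

  val
end
```

For example, `sketch_coerce_field(SketchField.new('created', [:null, :long]), :datetime, 1576454400)` yields a UTC `Time` on 2019-12-16, while a string field is passed through untouched.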

lib/deimos/active_record_producer.rb

Lines changed: 2 additions & 2 deletions
@@ -53,10 +53,10 @@ def send_events(records, force_send: false)
       # is not set.
       # @return [Hash]
       def generate_payload(attributes, _record)
-        schema = self.encoder.avro_schema
+        fields = self.encoder.schema_fields
         payload = attributes.stringify_keys
         payload.delete_if do |k, _|
-          k.to_sym != :payload_key && !schema.fields.find { |f| f.name == k }
+          k.to_sym != :payload_key && !fields.map(&:name).include?(k)
         end
       end
     end

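`generate_payload` now filters record attributes against the backend's `schema_fields` instead of an Avro schema object. The following is a small self-contained sketch of that filter, assuming a hypothetical `SketchSchemaField` struct in place of `Deimos::SchemaField` and `transform_keys(&:to_s)` in place of ActiveSupport's `stringify_keys`. It computes the field names once rather than on every iteration, which is a minor variation on the committed code.

```ruby
# Hypothetical stand-in for Deimos::SchemaField; only `name` is used here.
SketchSchemaField = Struct.new(:name, :type)

# Keep only attributes that match a schema field name, plus the special
# :payload_key entry, and return the resulting string-keyed hash.
def sketch_generate_payload(attributes, fields)
  field_names = fields.map(&:name)
  payload = attributes.transform_keys(&:to_s)
  payload.delete_if do |k, _|
    k.to_sym != :payload_key && !field_names.include?(k)
  end
end
```

Given fields named `test_id` and `some_int`, an attribute hash that also contains an unrelated `extra` key comes back without it, while `payload_key` is preserved.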