Skip to content

Commit c36dcb6

Browse files
authored
Merge pull request #3137 from rockwotj/main
schema_registry_decode: support avro for decoding
2 parents e6a224d + baafe33 commit c36dcb6

11 files changed

+731
-61
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,11 @@ All notable changes to this project will be documented in this file.
1010
- New `mysql_cdc` input supporting change data capture (CDC) from MySQL. (@rockwotj, @le-vlad)
1111
- Field `instance_id` added to `kafka`, `kafka_franz`, `ockam_kafka`, `redpanda`, `redpanda_common`, and `redpanda_migrator` inputs. (@rockwotj)
1212
- Fields `rebalance_timeout`, `session_timeout` and `heartbeat_interval` added to the `kafka_franz`, `redpanda`, `redpanda_common`, `redpanda_migrator` and `ockam_kafka` inputs. (@rockwotj)
13+
- Field `avro.preserve_logical_types` for processor `schema_registry_decode` was added to preserve logical types instead of decoding them as their primitive representation. (@rockwotj)
14+
15+
### Changed
16+
17+
- Field `avro_raw_json` was deprecated in favor of `avro.raw_unions` for processor `schema_registry_decode`. (@rockwotj)
1318

1419
## 4.45.1 - 2025-01-17
1520

docs/modules/components/pages/processors/schema_registry_decode.adoc

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ Common::
3636
# Common config fields, showing default values
3737
label: ""
3838
schema_registry_decode:
39+
avro:
40+
raw_unions: false # No default (optional)
41+
preserve_logical_types: false
3942
url: "" # No default (required)
4043
```
4144
@@ -48,7 +51,9 @@ Advanced::
4851
# All config fields, showing default values
4952
label: ""
5053
schema_registry_decode:
51-
avro_raw_json: false
54+
avro:
55+
raw_unions: false # No default (optional)
56+
preserve_logical_types: false
5257
url: "" # No default (required)
5358
oauth:
5459
enabled: false
@@ -91,8 +96,8 @@ This processor creates documents formatted as https://avro.apache.org/docs/curre
9196
For example, the union schema `["null","string","Foo"]`, where `Foo` is a record name, would encode:
9297
9398
- `null` as `null`;
94-
- the string `"a"` as `\{"string": "a"}`; and
95-
- a `Foo` instance as `\{"Foo": {...}}`, where `{...}` indicates the JSON encoding of a `Foo` instance.
99+
- the string `"a"` as `{"string": "a"}`; and
100+
- a `Foo` instance as `{"Foo": {...}}`, where `{...}` indicates the JSON encoding of a `Foo` instance.
96101
97102
However, it is possible to instead create documents in https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodecForStandardJSONFull[standard/raw JSON format^] by setting the field <<avro-raw_unions, `avro.raw_unions`>> to `true` (the older `avro_raw_json` field is deprecated).
98103
@@ -103,9 +108,35 @@ This processor decodes protobuf messages to JSON documents, you can read more ab
103108
104109
== Fields
105110
106-
=== `avro_raw_json`
111+
=== `avro`
107112
108-
Whether Avro messages should be decoded into normal JSON ("json that meets the expectations of regular internet json") rather than https://avro.apache.org/docs/current/specification/_print/#json-encoding[Avro JSON^]. If `true` the schema returned from the subject should be decoded as https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodecForStandardJSONFull[standard json^] instead of as https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodec[avro json^]. There is a https://github.com/linkedin/goavro/blob/5ec5a5ee7ec82e16e6e2b438d610e1cab2588393/union.go#L224-L249[comment in goavro^], the https://github.com/linkedin/goavro[underlining library used for avro serialization^], that explains in more detail the difference between the standard json and avro json.
113+
Configuration for how to decode schemas that are of type AVRO.
114+
115+
116+
*Type*: `object`
117+
118+
119+
=== `avro.raw_unions`
120+
121+
Whether Avro messages should be decoded into normal JSON ("json that meets the expectations of regular internet json") rather than https://avro.apache.org/docs/current/specification/_print/#json-encoding[JSON as specified in the Avro Spec^].
122+
123+
For example, if there is a union schema `["null", "string", "Foo"]` where `Foo` is a record name, then with `raw_unions` set to `false` (the default) you get:
124+
- `null` as `null`;
125+
- the string `"a"` as `{"string": "a"}`; and
126+
- a `Foo` instance as `{"Foo": {...}}`, where `{...}` indicates the JSON encoding of a `Foo` instance.
127+
128+
When `raw_unions` is set to `true`, the above union schema is instead decoded as follows:
129+
- `null` as `null`;
130+
- the string `"a"` as `"a"`; and
131+
- a `Foo` instance as `{...}`, where `{...}` indicates the JSON encoding of a `Foo` instance.
132+
133+
134+
*Type*: `bool`
135+
136+
137+
=== `avro.preserve_logical_types`
138+
139+
Whether logical types should be preserved or transformed back into their primitive type. By default, decimals are decoded as raw bytes and timestamps are decoded as plain integers. Setting this field to `true` keeps decimal types as numbers in Bloblang and timestamps as time values.
109140
110141
111142
*Type*: `bool`

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ require (
7474
github.com/golang-jwt/jwt/v5 v5.2.1
7575
github.com/googleapis/go-sql-spanner v1.8.0
7676
github.com/gosimple/slug v1.14.0
77+
github.com/hamba/avro/v2 v2.22.2-0.20240625062549-66aad10411d9
7778
github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c
7879
github.com/jackc/pgx/v4 v4.18.3
7980
github.com/jackc/pgx/v5 v5.6.0
@@ -173,7 +174,6 @@ require (
173174
github.com/containerd/platforms v0.2.1 // indirect
174175
github.com/envoyproxy/go-control-plane v0.13.0 // indirect
175176
github.com/envoyproxy/protoc-gen-validate v1.1.0 // indirect
176-
github.com/hamba/avro/v2 v2.22.2-0.20240625062549-66aad10411d9 // indirect
177177
github.com/json-iterator/go v1.1.12 // indirect
178178
github.com/jzelinskie/stringz v0.0.3 // indirect
179179
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect

internal/impl/confluent/processor_schema_registry_decode.go

Lines changed: 55 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ This processor creates documents formatted as https://avro.apache.org/docs/curre
5252
For example, the union schema ` + "`[\"null\",\"string\",\"Foo\"]`, where `Foo`" + ` is a record name, would encode:
5353
5454
- ` + "`null` as `null`" + `;
55-
- the string ` + "`\"a\"` as `\\{\"string\": \"a\"}`" + `; and
56-
- a ` + "`Foo` instance as `\\{\"Foo\": {...}}`, where `{...}` indicates the JSON encoding of a `Foo`" + ` instance.
55+
- the string ` + "`\"a\"` as `{\"string\": \"a\"}`" + `; and
56+
- a ` + "`Foo` instance as `{\"Foo\": {...}}`, where `{...}` indicates the JSON encoding of a `Foo`" + ` instance.
5757
5858
However, it is possible to instead create documents in https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodecForStandardJSONFull[standard/raw JSON format^] by setting the field ` + "<<avro_raw_json, `avro_raw_json`>> to `true`" + `.
5959
@@ -63,7 +63,25 @@ This processor decodes protobuf messages to JSON documents, you can read more ab
6363
`).
6464
Field(service.NewBoolField("avro_raw_json").
6565
Description("Whether Avro messages should be decoded into normal JSON (\"json that meets the expectations of regular internet json\") rather than https://avro.apache.org/docs/current/specification/_print/#json-encoding[Avro JSON^]. If `true` the schema returned from the subject should be decoded as https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodecForStandardJSONFull[standard json^] instead of as https://pkg.go.dev/github.com/linkedin/goavro/v2#NewCodec[avro json^]. There is a https://github.com/linkedin/goavro/blob/5ec5a5ee7ec82e16e6e2b438d610e1cab2588393/union.go#L224-L249[comment in goavro^], the https://github.com/linkedin/goavro[underlining library used for avro serialization^], that explains in more detail the difference between the standard json and avro json.").
66-
Advanced().Default(false)).
66+
Advanced().Default(false).Deprecated()).
67+
Fields(
68+
service.NewObjectField(
69+
"avro",
70+
service.NewBoolField("raw_unions").Description(`Whether avro messages should be decoded into normal JSON ("json that meets the expectations of regular internet json") rather than https://avro.apache.org/docs/current/specification/_print/#json-encoding[JSON as specified in the Avro Spec^].
71+
72+
For example, if there is a union schema `+"`"+`["null", "string", "Foo"]`+"`"+` where `+"`Foo`"+` is a record name, with raw_unions as false (the default) you get:
73+
- `+"`null` as `null`"+`;
74+
- the string `+"`\"a\"` as `{\"string\": \"a\"}`"+`; and
75+
- a `+"`Foo` instance as `{\"Foo\": {...}}`, where `{...}` indicates the JSON encoding of a `Foo`"+` instance.
76+
77+
When raw_unions is set to true then the above union schema is decoded as the following:
78+
- `+"`null` as `null`"+`;
79+
- the string `+"`\"a\"` as `\"a\"`"+`; and
80+
- a `+"`Foo` instance as `{...}`, where `{...}` indicates the JSON encoding of a `Foo`"+` instance.
81+
`).Optional(),
82+
service.NewBoolField("preserve_logical_types").Description(`Whether logical types should be preserved or transformed back into their primitive type. By default, decimals are decoded as raw bytes and timestamps are decoded as plain integers. Setting this field to true keeps decimal types as numbers in bloblang and timestamps as time values.`).Default(false),
83+
).Description("Configuration for how to decode schemas that are of type AVRO."),
84+
).
6785
Field(service.NewURLField("url").Description("The base URL of the schema registry service."))
6886

6987
for _, f := range service.NewHTTPRequestAuthSignerFields() {
@@ -86,9 +104,16 @@ func init() {
86104

87105
//------------------------------------------------------------------------------
88106

107+
// decodingConfig groups the decoding options parsed from the processor
// config and carried on schemaRegistryDecoder.
type decodingConfig struct {
108+
avro struct { // options applied when decoding Avro schemas
109+
useHamba bool // read from `avro.preserve_logical_types`; when true getDecoder uses getHambaAvroDecoder instead of getGoAvroDecoder
110+
rawUnions bool // read from the deprecated `avro_raw_json`, overridden by `avro.raw_unions` when that field is present
111+
}
112+
}
113+
89114
type schemaRegistryDecoder struct {
90-
avroRawJSON bool
91-
client *sr.Client
115+
cfg decodingConfig
116+
client *sr.Client
92117

93118
schemas map[int]*cachedSchemaDecoder
94119
cacheMut sync.RWMutex
@@ -112,26 +137,38 @@ func newSchemaRegistryDecoderFromConfig(conf *service.ParsedConfig, mgr *service
112137
if err != nil {
113138
return nil, err
114139
}
115-
avroRawJSON, err := conf.FieldBool("avro_raw_json")
140+
var cfg decodingConfig
141+
cfg.avro.rawUnions, err = conf.FieldBool("avro_raw_json")
142+
if err != nil {
143+
return nil, err
144+
}
145+
146+
cfg.avro.useHamba, err = conf.FieldBool("avro", "preserve_logical_types")
116147
if err != nil {
117148
return nil, err
118149
}
119-
return newSchemaRegistryDecoder(urlStr, authSigner, tlsConf, avroRawJSON, mgr)
150+
if conf.Contains("avro", "raw_unions") {
151+
cfg.avro.rawUnions, err = conf.FieldBool("avro", "raw_unions")
152+
if err != nil {
153+
return nil, err
154+
}
155+
}
156+
return newSchemaRegistryDecoder(urlStr, authSigner, tlsConf, cfg, mgr)
120157
}
121158

122159
func newSchemaRegistryDecoder(
123160
urlStr string,
124161
reqSigner func(f fs.FS, req *http.Request) error,
125162
tlsConf *tls.Config,
126-
avroRawJSON bool,
163+
cfg decodingConfig,
127164
mgr *service.Resources,
128165
) (*schemaRegistryDecoder, error) {
129166
s := &schemaRegistryDecoder{
130-
avroRawJSON: avroRawJSON,
131-
schemas: map[int]*cachedSchemaDecoder{},
132-
shutSig: shutdown.NewSignaller(),
133-
logger: mgr.Logger(),
134-
mgr: mgr,
167+
cfg: cfg,
168+
schemas: map[int]*cachedSchemaDecoder{},
169+
shutSig: shutdown.NewSignaller(),
170+
logger: mgr.Logger(),
171+
mgr: mgr,
135172
}
136173
var err error
137174
if s.client, err = sr.NewClient(urlStr, reqSigner, tlsConf, mgr); err != nil {
@@ -265,7 +302,11 @@ func (s *schemaRegistryDecoder) getDecoder(id int) (schemaDecoder, error) {
265302
case franz_sr.TypeJSON:
266303
decoder, err = s.getJSONDecoder(ctx, resPayload)
267304
default:
268-
decoder, err = s.getAvroDecoder(ctx, resPayload)
305+
if s.cfg.avro.useHamba {
306+
decoder, err = s.getHambaAvroDecoder(ctx, resPayload)
307+
} else {
308+
decoder, err = s.getGoAvroDecoder(ctx, resPayload)
309+
}
269310
}
270311
if err != nil {
271312
return nil, err

0 commit comments

Comments
 (0)