Skip to content

Commit 631f6dd

Browse files
authored
feat: preserve unrecognized logical types and properties (#469)
1 parent 4966106 commit 631f6dd

File tree

7 files changed

+823
-45
lines changed

7 files changed

+823
-45
lines changed

gen/testdata/golden.avsc

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,15 +106,13 @@
106106
{
107107
"name": "mapOfStrings",
108108
"type": {
109-
"name": "aMapOfStrings",
110109
"type": "map",
111110
"values": "string"
112111
}
113112
},
114113
{
115114
"name": "mapOfRecords",
116115
"type": {
117-
"name": "aMapOfRecords",
118116
"type": "map",
119117
"values": {
120118
"name": "RecordInMap",
@@ -175,7 +173,6 @@
175173
{
176174
"name": "aRecordArray",
177175
"type": {
178-
"name": "someRecordArray",
179176
"type": "array",
180177
"items": {
181178
"name": "recordInArray",

ocf/ocf_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1074,6 +1074,7 @@ func TestWithSchemaMarshaler(t *testing.T) {
10741074
"name": "meta",
10751075
"type": {
10761076
"type": "array",
1077+
"logicalType": "map",
10771078
"items": {
10781079
"type": "record",
10791080
"name": "FooMetadataEntry",

ocf/testdata/full-schema.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@
3535
"field-id": 5
3636
}
3737
]
38-
}
38+
},
39+
"logicalType": "map"
3940
},
4041
"field-id": 3
4142
}

schema.go

Lines changed: 60 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,21 @@ func (nullDefaultType) MarshalJSON() ([]byte, error) {
2525
var nullDefault nullDefaultType = struct{}{}
2626

2727
var (
28-
schemaReserved = []string{
29-
"doc", "fields", "items", "name", "namespace", "size", "symbols",
30-
"values", "type", "aliases", "logicalType", "precision", "scale",
31-
}
32-
fieldReserved = []string{"default", "doc", "name", "order", "type", "aliases"}
28+
// Note: order matches the order of properties as they are named in the spec.
29+
// https://avro.apache.org/docs/1.12.0/specification
30+
recordReserved = []string{"type", "name", "namespace", "doc", "aliases", "fields"}
31+
fieldReserved = []string{"name", "doc", "type", "order", "aliases", "default"}
32+
enumReserved = []string{"type", "name", "namespace", "aliases", "doc", "symbols", "default"}
33+
arrayReserved = []string{"type", "items"}
34+
mapReserved = []string{"type", "values"}
35+
fixedReserved = []string{"type", "name", "namespace", "aliases", "size"}
36+
fixedWithLogicalTypeReserved = []string{"type", "name", "namespace", "aliases", "size", "logicalType"}
37+
fixedWithDecimalTypeReserved = []string{
38+
"type", "name", "namespace", "aliases", "size", "logicalType", "precision", "scale",
39+
}
40+
primitiveReserved = []string{"type"}
41+
primitiveWithLogicalTypeReserved = []string{"type", "logicalType"}
42+
primitiveWithDecimalTypeReserved = []string{"type", "logicalType", "precision", "scale"}
3343
)
3444

3545
// Type is a schema type.
@@ -482,9 +492,16 @@ func NewPrimitiveSchema(t Type, l LogicalSchema, opts ...SchemaOption) *Primitiv
482492
for _, opt := range opts {
483493
opt(&cfg)
484494
}
485-
495+
reservedProps := primitiveReserved
496+
if l != nil {
497+
if l.Type() == Decimal {
498+
reservedProps = primitiveWithDecimalTypeReserved
499+
} else {
500+
reservedProps = primitiveWithLogicalTypeReserved
501+
}
502+
}
486503
return &PrimitiveSchema{
487-
properties: newProperties(cfg.props, schemaReserved),
504+
properties: newProperties(cfg.props, reservedProps),
488505
cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp},
489506
typ: t,
490507
logical: l,
@@ -574,7 +591,7 @@ func NewRecordSchema(name, namespace string, fields []*Field, opts ...SchemaOpti
574591

575592
return &RecordSchema{
576593
name: n,
577-
properties: newProperties(cfg.props, schemaReserved),
594+
properties: newProperties(cfg.props, recordReserved),
578595
cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp},
579596
fields: fields,
580597
doc: cfg.doc,
@@ -919,7 +936,7 @@ func NewEnumSchema(name, namespace string, symbols []string, opts ...SchemaOptio
919936

920937
return &EnumSchema{
921938
name: n,
922-
properties: newProperties(cfg.props, schemaReserved),
939+
properties: newProperties(cfg.props, enumReserved),
923940
cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp},
924941
symbols: symbols,
925942
def: def,
@@ -1072,7 +1089,7 @@ func NewArraySchema(items Schema, opts ...SchemaOption) *ArraySchema {
10721089
}
10731090

10741091
return &ArraySchema{
1075-
properties: newProperties(cfg.props, schemaReserved),
1092+
properties: newProperties(cfg.props, arrayReserved),
10761093
cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp},
10771094
items: items,
10781095
}
@@ -1142,7 +1159,7 @@ func NewMapSchema(values Schema, opts ...SchemaOption) *MapSchema {
11421159
}
11431160

11441161
return &MapSchema{
1145-
properties: newProperties(cfg.props, schemaReserved),
1162+
properties: newProperties(cfg.props, mapReserved),
11461163
cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp},
11471164
values: values,
11481165
}
@@ -1323,9 +1340,17 @@ func NewFixedSchema(
13231340
return nil, err
13241341
}
13251342

1343+
reservedProps := fixedReserved
1344+
if logical != nil {
1345+
if logical.Type() == Decimal {
1346+
reservedProps = fixedWithDecimalTypeReserved
1347+
} else {
1348+
reservedProps = fixedWithLogicalTypeReserved
1349+
}
1350+
}
13261351
return &FixedSchema{
13271352
name: n,
1328-
properties: newProperties(cfg.props, schemaReserved),
1353+
properties: newProperties(cfg.props, reservedProps),
13291354
cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp},
13301355
size: size,
13311356
logical: logical,
@@ -1406,9 +1431,22 @@ func (s *FixedSchema) CacheFingerprint() [32]byte {
14061431

14071432
// NullSchema is an Avro null type schema.
14081433
type NullSchema struct {
1434+
properties
14091435
fingerprinter
14101436
}
14111437

1438+
// NewNullSchema creates a new NullSchema.
1439+
func NewNullSchema(opts ...SchemaOption) *NullSchema {
1440+
var cfg schemaConfig
1441+
for _, opt := range opts {
1442+
opt(&cfg)
1443+
}
1444+
1445+
return &NullSchema{
1446+
properties: newProperties(cfg.props, primitiveReserved),
1447+
}
1448+
}
1449+
14121450
// Type returns the type of the schema.
14131451
func (s *NullSchema) Type() Type {
14141452
return Null
@@ -1421,7 +1459,16 @@ func (s *NullSchema) String() string {
14211459

14221460
// MarshalJSON marshals the schema to json.
14231461
func (s *NullSchema) MarshalJSON() ([]byte, error) {
1424-
return []byte(`"null"`), nil
1462+
if len(s.props) == 0 {
1463+
return []byte(`"null"`), nil
1464+
}
1465+
buf := new(bytes.Buffer)
1466+
buf.WriteString(`{"type":"null"`)
1467+
if err := s.marshalPropertiesToJSON(buf); err != nil {
1468+
return nil, err
1469+
}
1470+
buf.WriteString("}")
1471+
return buf.Bytes(), nil
14251472
}
14261473

14271474
// Fingerprint returns the SHA256 fingerprint of the schema.

schema_json_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ func TestSchema_JSON(t *testing.T) {
2323
input: `{"type":"null"}`,
2424
json: `"null"`,
2525
},
26+
{
27+
input: `{"type":"null","other":"foo"}`,
28+
json: `{"type":"null","other":"foo"}`,
29+
},
2630
{
2731
input: `"boolean"`,
2832
json: `"boolean"`,

schema_parse.go

Lines changed: 69 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,12 @@ func parsePrimitiveType(namespace, s string, cache *SchemaCache) (Schema, error)
121121

122122
func parseComplexType(namespace string, m map[string]any, seen seenCache, cache *SchemaCache) (Schema, error) {
123123
if val, ok := m["type"].([]any); ok {
124+
// Note: According to the spec, an object whose "type" property is an array is not allowed:
125+
// https://avro.apache.org/docs/1.12.0/specification/#schema-declaration
126+
// The "type" property in an object must be a string. A union type will be a slice,
127+
// but NOT an object with a "type" property that is a slice.
128+
// Might be advisable to remove this call (tradeoff between better conformance
129+
// with the spec vs. possible backwards-compatibility issue).
124130
return parseUnion(namespace, val, seen, cache)
125131
}
126132

@@ -131,10 +137,7 @@ func parseComplexType(namespace string, m map[string]any, seen seenCache, cache
131137
typ := Type(str)
132138

133139
switch typ {
134-
case Null:
135-
return &NullSchema{}, nil
136-
137-
case String, Bytes, Int, Long, Float, Double, Boolean:
140+
case String, Bytes, Int, Long, Float, Double, Boolean, Null:
138141
return parsePrimitive(typ, m)
139142

140143
case Record, Error:
@@ -158,14 +161,15 @@ func parseComplexType(namespace string, m map[string]any, seen seenCache, cache
158161
}
159162

160163
type primitiveSchema struct {
161-
LogicalType string `mapstructure:"logicalType"`
162-
Precision int `mapstructure:"precision"`
163-
Scale int `mapstructure:"scale"`
164-
Props map[string]any `mapstructure:",remain"`
164+
Type string `mapstructure:"type"`
165+
Props map[string]any `mapstructure:",remain"`
165166
}
166167

167168
func parsePrimitive(typ Type, m map[string]any) (Schema, error) {
168-
if m == nil {
169+
if len(m) == 0 {
170+
if typ == Null {
171+
return &NullSchema{}, nil
172+
}
169173
return NewPrimitiveSchema(typ, nil), nil
170174
}
171175

@@ -178,14 +182,20 @@ func parsePrimitive(typ Type, m map[string]any) (Schema, error) {
178182
}
179183

180184
var logical LogicalSchema
181-
if p.LogicalType != "" {
182-
logical = parsePrimitiveLogicalType(typ, p.LogicalType, p.Precision, p.Scale)
185+
if logicalType := logicalTypeProperty(p.Props); logicalType != "" {
186+
logical = parsePrimitiveLogicalType(typ, logicalType, p.Props)
187+
if logical != nil {
188+
delete(p.Props, "logicalType")
189+
}
183190
}
184191

192+
if typ == Null {
193+
return NewNullSchema(WithProps(p.Props)), nil
194+
}
185195
return NewPrimitiveSchema(typ, logical, WithProps(p.Props)), nil
186196
}
187197

188-
func parsePrimitiveLogicalType(typ Type, lt string, prec, scale int) LogicalSchema {
198+
func parsePrimitiveLogicalType(typ Type, lt string, props map[string]any) LogicalSchema {
189199
ltyp := LogicalType(lt)
190200
if (typ == String && ltyp == UUID) ||
191201
(typ == Int && ltyp == Date) ||
@@ -199,10 +209,10 @@ func parsePrimitiveLogicalType(typ Type, lt string, prec, scale int) LogicalSche
199209
}
200210

201211
if typ == Bytes && ltyp == Decimal {
202-
return parseDecimalLogicalType(-1, prec, scale)
212+
return parseDecimalLogicalType(-1, props)
203213
}
204214

205-
return nil
215+
return nil // otherwise, not a recognized logical type
206216
}
207217

208218
type recordSchema struct {
@@ -368,6 +378,7 @@ func parseEnum(namespace string, m map[string]any, seen seenCache, cache *Schema
368378
}
369379

370380
type arraySchema struct {
381+
Type string `mapstructure:"type"`
371382
Items any `mapstructure:"items"`
372383
Props map[string]any `mapstructure:",remain"`
373384
}
@@ -393,6 +404,7 @@ func parseArray(namespace string, m map[string]any, seen seenCache, cache *Schem
393404
}
394405

395406
type mapSchema struct {
407+
Type string `mapstructure:"type"`
396408
Values any `mapstructure:"values"`
397409
Props map[string]any `mapstructure:",remain"`
398410
}
@@ -431,15 +443,12 @@ func parseUnion(namespace string, v []any, seen seenCache, cache *SchemaCache) (
431443
}
432444

433445
type fixedSchema struct {
434-
Name string `mapstructure:"name"`
435-
Namespace string `mapstructure:"namespace"`
436-
Aliases []string `mapstructure:"aliases"`
437-
Type string `mapstructure:"type"`
438-
Size int `mapstructure:"size"`
439-
LogicalType string `mapstructure:"logicalType"`
440-
Precision int `mapstructure:"precision"`
441-
Scale int `mapstructure:"scale"`
442-
Props map[string]any `mapstructure:",remain"`
446+
Name string `mapstructure:"name"`
447+
Namespace string `mapstructure:"namespace"`
448+
Aliases []string `mapstructure:"aliases"`
449+
Type string `mapstructure:"type"`
450+
Size int `mapstructure:"size"`
451+
Props map[string]any `mapstructure:",remain"`
443452
}
444453

445454
func parseFixed(namespace string, m map[string]any, seen seenCache, cache *SchemaCache) (Schema, error) {
@@ -463,8 +472,11 @@ func parseFixed(namespace string, m map[string]any, seen seenCache, cache *Schem
463472
}
464473

465474
var logical LogicalSchema
466-
if f.LogicalType != "" {
467-
logical = parseFixedLogicalType(f.Size, f.LogicalType, f.Precision, f.Scale)
475+
if logicalType := logicalTypeProperty(f.Props); logicalType != "" {
476+
logical = parseFixedLogicalType(f.Size, logicalType, f.Props)
477+
if logical != nil {
478+
delete(f.Props, "logicalType")
479+
}
468480
}
469481

470482
fixed, err := NewFixedSchema(f.Name, f.Namespace, f.Size, logical, WithAliases(f.Aliases), WithProps(f.Props))
@@ -485,19 +497,41 @@ func parseFixed(namespace string, m map[string]any, seen seenCache, cache *Schem
485497
return fixed, nil
486498
}
487499

488-
func parseFixedLogicalType(size int, lt string, prec, scale int) LogicalSchema {
500+
func parseFixedLogicalType(size int, lt string, props map[string]any) LogicalSchema {
489501
ltyp := LogicalType(lt)
490502
switch {
491503
case ltyp == Duration && size == 12:
492504
return NewPrimitiveLogicalSchema(Duration)
493505
case ltyp == Decimal:
494-
return parseDecimalLogicalType(size, prec, scale)
506+
return parseDecimalLogicalType(size, props)
495507
}
496508

497509
return nil
498510
}
499511

500-
func parseDecimalLogicalType(size, prec, scale int) LogicalSchema {
512+
type decimalSchema struct {
513+
Precision int `mapstructure:"precision"`
514+
Scale int `mapstructure:"scale"`
515+
}
516+
517+
func parseDecimalLogicalType(size int, props map[string]any) LogicalSchema {
518+
var (
519+
d decimalSchema
520+
meta mapstructure.Metadata
521+
)
522+
if err := decodeMap(props, &d, &meta); err != nil {
523+
return nil
524+
}
525+
decType := newDecimalLogicalType(size, d.Precision, d.Scale)
526+
if decType != nil {
527+
// Remove the properties that we consumed
528+
delete(props, "precision")
529+
delete(props, "scale")
530+
}
531+
return decType
532+
}
533+
534+
func newDecimalLogicalType(size, prec, scale int) LogicalSchema {
501535
if prec <= 0 {
502536
return nil
503537
}
@@ -594,3 +628,10 @@ func (c seenCache) Add(name string) error {
594628
c[name] = struct{}{}
595629
return nil
596630
}
631+
632+
func logicalTypeProperty(props map[string]any) string {
633+
if lt, ok := props["logicalType"].(string); ok {
634+
return lt
635+
}
636+
return ""
637+
}

0 commit comments

Comments
 (0)