From 001598704f647493254e7e48da9297d5f96f551d Mon Sep 17 00:00:00 2001 From: Tigran Najaryan Date: Mon, 3 Feb 2025 17:38:10 -0500 Subject: [PATCH] Introduce minimal WireSchema This decouples human-readable Schema that is in JSON format from WireSchema that is serialized in binary format for efficiency. WireSchema contains only the information that is necessary for wire compatibility checks. Resolves https://github.com/splunk/stef/issues/24 (Note that we still need struct names for checks). Here are byte size comparisons between JSON and WIRE for the Otel/TEF schema with "Metrics" root: ``` JSON: 2976, zstd: 760 WIRE: 115, zstd: 113 ``` --- go/grpc/client.go | 6 +- go/grpc/server.go | 4 +- go/otel/grpc_test.go | 4 +- go/otel/manual_test.go | 17 +- go/otel/oteltef/anyvalue.go | 12 +- go/otel/oteltef/dicts.go | 6 +- go/otel/oteltef/envelope.go | 12 +- go/otel/oteltef/event.go | 12 +- go/otel/oteltef/exemplar.go | 12 +- go/otel/oteltef/exemplarvalue.go | 12 +- go/otel/oteltef/histogramvalue.go | 12 +- go/otel/oteltef/link.go | 12 +- go/otel/oteltef/metric.go | 12 +- go/otel/oteltef/metrics.go | 12 +- go/otel/oteltef/metricswriter.go | 12 +- go/otel/oteltef/point.go | 12 +- go/otel/oteltef/pointvalue.go | 12 +- go/otel/oteltef/resource.go | 12 +- go/otel/oteltef/scope.go | 12 +- go/otel/oteltef/span.go | 12 +- go/otel/oteltef/spans.go | 12 +- go/otel/oteltef/spanstatus.go | 12 +- go/otel/oteltef/spanswriter.go | 12 +- go/pkg/basereader.go | 6 +- go/pkg/schema/schema.go | 458 +---------------- go/pkg/schema/schema_test.go | 577 +++++++++++++++++++++- go/pkg/schema/testdata/oteltef.wire.json | 467 +++++++++++++++++ go/pkg/schema/wireschema.go | 142 ++++++ go/pkg/writeropts.go | 2 +- otelcol/cmd/stefmockserver/main.go | 2 +- otelcol/internal/stefexporter/exporter.go | 2 +- stefgen/generator/writer.go | 3 +- stefgen/templates/dicts.go.tmpl | 6 +- stefgen/templates/oneof.go.tmpl | 12 +- stefgen/templates/struct.go.tmpl | 12 +- stefgen/templates/writer.go.tmpl | 10 +- 36 files changed, 1343 insertions(+), 609 deletions(-) create mode 100755 go/pkg/schema/testdata/oteltef.wire.json create mode 100644 go/pkg/schema/wireschema.go diff --git a/go/grpc/client.go b/go/grpc/client.go index eefad13..b6b07b6 100644 --- a/go/grpc/client.go +++ b/go/grpc/client.go @@ -28,7 +28,7 @@ type Client struct { grpcClient stef_proto.STEFDestinationClient stream stef_proto.STEFDestination_StreamClient callbacks ClientCallbacks - clientSchema *schema.Schema + clientSchema *schema.WireSchema logger types.Logger // Running state @@ -88,7 +88,7 @@ type ClientSettings struct { Logger types.Logger // gRPC stream to send data over. GrpcClient stef_proto.STEFDestinationClient - ClientSchema *schema.Schema + ClientSchema *schema.WireSchema Callbacks ClientCallbacks } @@ -154,7 +154,7 @@ func (c *Client) Connect(ctx context.Context) (pkg.ChunkWriter, pkg.WriterOption } // Unmarshal server schema. - var serverSchema schema.Schema + var serverSchema schema.WireSchema buf := bytes.NewBuffer(capabilities.Capabilities.Schema) err = serverSchema.Deserialize(buf) if err != nil { diff --git a/go/grpc/server.go b/go/grpc/server.go index c252d7a..6c20566 100644 --- a/go/grpc/server.go +++ b/go/grpc/server.go @@ -124,7 +124,7 @@ type StreamServer struct { stef_proto.UnimplementedSTEFDestinationServer logger types.Logger - serverSchema *schema.Schema + serverSchema *schema.WireSchema maxDictBytes uint64 onStream func(reader GrpcReader, ackFunc func(sequenceId uint64) error) error } @@ -133,7 +133,7 @@ var _ stef_proto.STEFDestinationServer = (*StreamServer)(nil) type ServerSettings struct { Logger types.Logger - ServerSchema *schema.Schema + ServerSchema *schema.WireSchema MaxDictBytes uint64 OnStream func(reader GrpcReader, ackFunc func(sequenceId uint64) error) error } diff --git a/go/otel/grpc_test.go b/go/otel/grpc_test.go index e377ecc..936bf71 100644 --- a/go/otel/grpc_test.go +++ b/go/otel/grpc_test.go @@ -41,7 +41,7 @@ func TestGrpcWriteRead(t *testing.T) { recordsReceivedAndVerified := make(chan struct{}) serverSettings := stefgrpc.ServerSettings{ - ServerSchema: schema, + ServerSchema: &schema, OnStream: func(source stefgrpc.GrpcReader, ackFunc func(sequenceId uint64) error) error { reader, err := oteltef.NewMetricsReader(source) require.NoError(t, err) @@ -95,7 +95,7 @@ func TestGrpcWriteRead(t *testing.T) { clientSettings := stefgrpc.ClientSettings{ GrpcClient: grpcClient, - ClientSchema: schema, + ClientSchema: &schema, Callbacks: callbacks, } client := stefgrpc.NewClient(clientSettings) diff --git a/go/otel/manual_test.go b/go/otel/manual_test.go index 8445a04..5ecda88 100644 --- a/go/otel/manual_test.go +++ b/go/otel/manual_test.go @@ -284,7 +284,7 @@ func TestAnyValue(t *testing.T) { } } -func writeReadRecord(t *testing.T, withSchema *schema.Schema) *oteltef.Metrics { +func writeReadRecord(t *testing.T, withSchema *schema.WireSchema) *oteltef.Metrics { buf := &countingChunkWriter{} writer, err := oteltef.NewMetricsWriter(buf, pkg.WriterOptions{Schema: withSchema}) require.NoError(t, err) @@ -321,7 +321,7 @@ func TestWriteOverrideSchema(t *testing.T) { assert.EqualValues(t, 4.5, readRecord.Point().Value().Float64()) // Write/read using full, unmodified schema - readRecord = writeReadRecord(t, schem) + readRecord = writeReadRecord(t, &schem) assert.EqualValues(t, "abc", readRecord.Metric().Name()) assert.EqualValues(t, "scope", readRecord.Scope().Name()) assert.EqualValues(t, 123, readRecord.Point().Timestamp()) @@ -329,15 +329,13 @@ func TestWriteOverrideSchema(t *testing.T) { assert.EqualValues(t, 4.5, readRecord.Point().Value().Float64()) // Remove "Monotonic" field (field #8) from "Metric" struct in the schema. - metric := schem.Structs["Metric"] - metric.Fields = metric.Fields[:7] + schem.StructFieldCount["Metric"] = 7 // Remove "Float64" field (field #2) from "PointValue" oneof struct in the schema. - pointValueOneOf := schem.Structs["PointValue"] - pointValueOneOf.Fields = pointValueOneOf.Fields[:1] + schem.StructFieldCount["PointValue"] = 1 // Write/read using reduced schema - readRecord = writeReadRecord(t, schem) + readRecord = writeReadRecord(t, &schem) assert.EqualValues(t, "abc", readRecord.Metric().Name()) assert.EqualValues(t, "scope", readRecord.Scope().Name()) assert.EqualValues(t, 123, readRecord.Point().Timestamp()) @@ -350,11 +348,10 @@ func TestWriteOverrideSchema(t *testing.T) { assert.EqualValues(t, 0.0, readRecord.Point().Value().Float64()) // Remove the entire "Point" field (field #6) from "Record" struct in the schema. - mainStruct := schem.Structs["Metrics"] - mainStruct.Fields = mainStruct.Fields[:5] + schem.StructFieldCount["Metrics"] = 5 // Write/read using reduced schema - readRecord = writeReadRecord(t, schem) + readRecord = writeReadRecord(t, &schem) assert.EqualValues(t, "abc", readRecord.Metric().Name()) assert.EqualValues(t, "scope", readRecord.Scope().Name()) // All Point fields are default values because Point field was not encoded by Writer. diff --git a/go/otel/oteltef/anyvalue.go b/go/otel/oteltef/anyvalue.go index f9badd5..ffa4f40 100644 --- a/go/otel/oteltef/anyvalue.go +++ b/go/otel/oteltef/anyvalue.go @@ -341,13 +341,13 @@ func (e *AnyValueEncoder) Init(state *WriterState, columns *pkg.WriteColumnSet) e.limiter = &state.limiter if state.OverrideSchema != nil { - overrideSchema, ok := state.OverrideSchema.Structs["AnyValue"] - if !ok || overrideSchema == nil { + fieldCount, ok := state.OverrideSchema.FieldCount("AnyValue") + if !ok { return fmt.Errorf("cannot find oneof in override schema: %s", "AnyValue") } // Number of fields in the target schema. - e.fieldCount = uint(len(overrideSchema.Fields)) + e.fieldCount = fieldCount } else { // Keep all fields when encoding. e.fieldCount = 7 @@ -523,13 +523,13 @@ func (d *AnyValueDecoder) Init(state *ReaderState, columns *pkg.ReadColumnSet) e state.AnyValueDecoder = d if state.OverrideSchema != nil { - overrideSchema, ok := state.OverrideSchema.Structs["AnyValue"] - if !ok || overrideSchema == nil { + fieldCount, ok := state.OverrideSchema.FieldCount("AnyValue") + if !ok { return fmt.Errorf("cannot find oneof in override schema: %s", "AnyValue") } // Number of fields in the target schema. - d.fieldCount = uint(len(overrideSchema.Fields)) + d.fieldCount = fieldCount } else { // Keep all fields when encoding. d.fieldCount = 7 diff --git a/go/otel/oteltef/dicts.go b/go/otel/oteltef/dicts.go index 2029284..e65eafb 100644 --- a/go/otel/oteltef/dicts.go +++ b/go/otel/oteltef/dicts.go @@ -12,7 +12,7 @@ type WriterState struct { // OverrideSchema is set if encoding should perform a translation into the target // schema. The specified schema must be compatible with endoders' schema. - OverrideSchema *schema.Schema + OverrideSchema *schema.WireSchema // Dictionaries AnyValueString encoders.StringEncoderDict @@ -104,7 +104,7 @@ func (d *WriterState) Reset() { type ReaderState struct { // OverrideSchema is set if decoding should perform a translation from specfied // schema. OverrideSchema must be compatible with decoders' schema. - OverrideSchema *schema.Schema + OverrideSchema *schema.WireSchema // Dictionaries AnyValueString encoders.StringDecoderDict @@ -151,7 +151,7 @@ type ReaderState struct { SpansDecoder *SpansDecoder } -func (d *ReaderState) Init(overrideSchema *schema.Schema) { +func (d *ReaderState) Init(overrideSchema *schema.WireSchema) { d.OverrideSchema = overrideSchema d.AnyValueString.Init() d.AttributeKey.Init() diff --git a/go/otel/oteltef/envelope.go b/go/otel/oteltef/envelope.go index 8573822..a1b55b7 100644 --- a/go/otel/oteltef/envelope.go +++ b/go/otel/oteltef/envelope.go @@ -150,13 +150,13 @@ func (e *EnvelopeEncoder) Init(state *WriterState, columns *pkg.WriteColumnSet) e.limiter = &state.limiter if state.OverrideSchema != nil { - overrideSchema, ok := state.OverrideSchema.Structs["Envelope"] - if !ok || overrideSchema == nil { + fieldCount, ok := state.OverrideSchema.FieldCount("Envelope") + if !ok { return fmt.Errorf("cannot find struct in override schema: %s", "Envelope") } // Number of fields in the target schema. - e.fieldCount = uint(len(overrideSchema.Fields)) + e.fieldCount = fieldCount // Set that many 1 bits in the keepFieldMask. All fields with higher number // will be skipped when encoding. @@ -246,13 +246,13 @@ func (d *EnvelopeDecoder) Init(state *ReaderState, columns *pkg.ReadColumnSet) e state.EnvelopeDecoder = d if state.OverrideSchema != nil { - overrideSchema, ok := state.OverrideSchema.Structs["Envelope"] - if !ok || overrideSchema == nil { + fieldCount, ok := state.OverrideSchema.FieldCount("Envelope") + if !ok { return fmt.Errorf("cannot find struct in override schema: %s", "Envelope") } // Number of fields in the target schema. - d.fieldCount = uint(len(overrideSchema.Fields)) + d.fieldCount = fieldCount } else { // Keep all fields when encoding. d.fieldCount = 1 diff --git a/go/otel/oteltef/event.go b/go/otel/oteltef/event.go index ab52472..1ed1cba 100644 --- a/go/otel/oteltef/event.go +++ b/go/otel/oteltef/event.go @@ -264,13 +264,13 @@ func (e *EventEncoder) Init(state *WriterState, columns *pkg.WriteColumnSet) err e.limiter = &state.limiter if state.OverrideSchema != nil { - overrideSchema, ok := state.OverrideSchema.Structs["Event"] - if !ok || overrideSchema == nil { + fieldCount, ok := state.OverrideSchema.FieldCount("Event") + if !ok { return fmt.Errorf("cannot find struct in override schema: %s", "Event") } // Number of fields in the target schema. - e.fieldCount = uint(len(overrideSchema.Fields)) + e.fieldCount = fieldCount // Set that many 1 bits in the keepFieldMask. All fields with higher number // will be skipped when encoding. @@ -414,13 +414,13 @@ func (d *EventDecoder) Init(state *ReaderState, columns *pkg.ReadColumnSet) erro state.EventDecoder = d if state.OverrideSchema != nil { - overrideSchema, ok := state.OverrideSchema.Structs["Event"] - if !ok || overrideSchema == nil { + fieldCount, ok := state.OverrideSchema.FieldCount("Event") + if !ok { return fmt.Errorf("cannot find struct in override schema: %s", "Event") } // Number of fields in the target schema. - d.fieldCount = uint(len(overrideSchema.Fields)) + d.fieldCount = fieldCount } else { // Keep all fields when encoding. d.fieldCount = 4 diff --git a/go/otel/oteltef/exemplar.go b/go/otel/oteltef/exemplar.go index 1cf3837..4c72b7c 100644 --- a/go/otel/oteltef/exemplar.go +++ b/go/otel/oteltef/exemplar.go @@ -293,13 +293,13 @@ func (e *ExemplarEncoder) Init(state *WriterState, columns *pkg.WriteColumnSet) e.limiter = &state.limiter if state.OverrideSchema != nil { - overrideSchema, ok := state.OverrideSchema.Structs["Exemplar"] - if !ok || overrideSchema == nil { + fieldCount, ok := state.OverrideSchema.FieldCount("Exemplar") + if !ok { return fmt.Errorf("cannot find struct in override schema: %s", "Exemplar") } // Number of fields in the target schema. - e.fieldCount = uint(len(overrideSchema.Fields)) + e.fieldCount = fieldCount // Set that many 1 bits in the keepFieldMask. All fields with higher number // will be skipped when encoding. @@ -461,13 +461,13 @@ func (d *ExemplarDecoder) Init(state *ReaderState, columns *pkg.ReadColumnSet) e state.ExemplarDecoder = d if state.OverrideSchema != nil { - overrideSchema, ok := state.OverrideSchema.Structs["Exemplar"] - if !ok || overrideSchema == nil { + fieldCount, ok := state.OverrideSchema.FieldCount("Exemplar") + if !ok { return fmt.Errorf("cannot find struct in override schema: %s", "Exemplar") } // Number of fields in the target schema. - d.fieldCount = uint(len(overrideSchema.Fields)) + d.fieldCount = fieldCount } else { // Keep all fields when encoding. d.fieldCount = 5 diff --git a/go/otel/oteltef/exemplarvalue.go b/go/otel/oteltef/exemplarvalue.go index dbe4a37..a3dbdcc 100644 --- a/go/otel/oteltef/exemplarvalue.go +++ b/go/otel/oteltef/exemplarvalue.go @@ -203,13 +203,13 @@ func (e *ExemplarValueEncoder) Init(state *WriterState, columns *pkg.WriteColumn e.limiter = &state.limiter if state.OverrideSchema != nil { - overrideSchema, ok := state.OverrideSchema.Structs["ExemplarValue"] - if !ok || overrideSchema == nil { + fieldCount, ok := state.OverrideSchema.FieldCount("ExemplarValue") + if !ok { return fmt.Errorf("cannot find oneof in override schema: %s", "ExemplarValue") } // Number of fields in the target schema. - e.fieldCount = uint(len(overrideSchema.Fields)) + e.fieldCount = fieldCount } else { // Keep all fields when encoding. e.fieldCount = 2 @@ -305,13 +305,13 @@ func (d *ExemplarValueDecoder) Init(state *ReaderState, columns *pkg.ReadColumnS state.ExemplarValueDecoder = d if state.OverrideSchema != nil { - overrideSchema, ok := state.OverrideSchema.Structs["ExemplarValue"] - if !ok || overrideSchema == nil { + fieldCount, ok := state.OverrideSchema.FieldCount("ExemplarValue") + if !ok { return fmt.Errorf("cannot find oneof in override schema: %s", "ExemplarValue") } // Number of fields in the target schema. - d.fieldCount = uint(len(overrideSchema.Fields)) + d.fieldCount = fieldCount } else { // Keep all fields when encoding. d.fieldCount = 2 diff --git a/go/otel/oteltef/histogramvalue.go b/go/otel/oteltef/histogramvalue.go index ba6e357..fa5cec4 100644 --- a/go/otel/oteltef/histogramvalue.go +++ b/go/otel/oteltef/histogramvalue.go @@ -370,13 +370,13 @@ func (e *HistogramValueEncoder) Init(state *WriterState, columns *pkg.WriteColum e.limiter = &state.limiter if state.OverrideSchema != nil { - overrideSchema, ok := state.OverrideSchema.Structs["HistogramValue"] - if !ok || overrideSchema == nil { + fieldCount, ok := state.OverrideSchema.FieldCount("HistogramValue") + if !ok { return fmt.Errorf("cannot find struct in override schema: %s", "HistogramValue") } // Number of fields in the target schema. - e.fieldCount = uint(len(overrideSchema.Fields)) + e.fieldCount = fieldCount // Set that many 1 bits in the keepFieldMask. All fields with higher number // will be skipped when encoding. @@ -544,13 +544,13 @@ func (d *HistogramValueDecoder) Init(state *ReaderState, columns *pkg.ReadColumn state.HistogramValueDecoder = d if state.OverrideSchema != nil { - overrideSchema, ok := state.OverrideSchema.Structs["HistogramValue"] - if !ok || overrideSchema == nil { + fieldCount, ok := state.OverrideSchema.FieldCount("HistogramValue") + if !ok { return fmt.Errorf("cannot find struct in override schema: %s", "HistogramValue") } // Number of fields in the target schema. - d.fieldCount = uint(len(overrideSchema.Fields)) + d.fieldCount = fieldCount } else { // Keep all fields when encoding. d.fieldCount = 5 diff --git a/go/otel/oteltef/link.go b/go/otel/oteltef/link.go index 73b4852..00d7c88 100644 --- a/go/otel/oteltef/link.go +++ b/go/otel/oteltef/link.go @@ -340,13 +340,13 @@ func (e *LinkEncoder) Init(state *WriterState, columns *pkg.WriteColumnSet) erro e.limiter = &state.limiter if state.OverrideSchema != nil { - overrideSchema, ok := state.OverrideSchema.Structs["Link"] - if !ok || overrideSchema == nil { + fieldCount, ok := state.OverrideSchema.FieldCount("Link") + if !ok { return fmt.Errorf("cannot find struct in override schema: %s", "Link") } // Number of fields in the target schema. - e.fieldCount = uint(len(overrideSchema.Fields)) + e.fieldCount = fieldCount // Set that many 1 bits in the keepFieldMask. All fields with higher number // will be skipped when encoding. @@ -526,13 +526,13 @@ func (d *LinkDecoder) Init(state *ReaderState, columns *pkg.ReadColumnSet) error state.LinkDecoder = d if state.OverrideSchema != nil { - overrideSchema, ok := state.OverrideSchema.Structs["Link"] - if !ok || overrideSchema == nil { + fieldCount, ok := state.OverrideSchema.FieldCount("Link") + if !ok { return fmt.Errorf("cannot find struct in override schema: %s", "Link") } // Number of fields in the target schema. - d.fieldCount = uint(len(overrideSchema.Fields)) + d.fieldCount = fieldCount } else { // Keep all fields when encoding. d.fieldCount = 6 diff --git a/go/otel/oteltef/metric.go b/go/otel/oteltef/metric.go index 199b2ad..742943e 100644 --- a/go/otel/oteltef/metric.go +++ b/go/otel/oteltef/metric.go @@ -434,13 +434,13 @@ func (e *MetricEncoder) Init(state *WriterState, columns *pkg.WriteColumnSet) er e.dict = &state.Metric if state.OverrideSchema != nil { - overrideSchema, ok := state.OverrideSchema.Structs["Metric"] - if !ok || overrideSchema == nil { + fieldCount, ok := state.OverrideSchema.FieldCount("Metric") + if !ok { return fmt.Errorf("cannot find struct in override schema: %s", "Metric") } // Number of fields in the target schema. - e.fieldCount = uint(len(overrideSchema.Fields)) + e.fieldCount = fieldCount // Set that many 1 bits in the keepFieldMask. All fields with higher number // will be skipped when encoding. @@ -686,13 +686,13 @@ func (d *MetricDecoder) Init(state *ReaderState, columns *pkg.ReadColumnSet) err state.MetricDecoder = d if state.OverrideSchema != nil { - overrideSchema, ok := state.OverrideSchema.Structs["Metric"] - if !ok || overrideSchema == nil { + fieldCount, ok := state.OverrideSchema.FieldCount("Metric") + if !ok { return fmt.Errorf("cannot find struct in override schema: %s", "Metric") } // Number of fields in the target schema. - d.fieldCount = uint(len(overrideSchema.Fields)) + d.fieldCount = fieldCount } else { // Keep all fields when encoding. d.fieldCount = 8 diff --git a/go/otel/oteltef/metrics.go b/go/otel/oteltef/metrics.go index e68edcc..2788bc5 100644 --- a/go/otel/oteltef/metrics.go +++ b/go/otel/oteltef/metrics.go @@ -298,13 +298,13 @@ func (e *MetricsEncoder) Init(state *WriterState, columns *pkg.WriteColumnSet) e e.limiter = &state.limiter if state.OverrideSchema != nil { - overrideSchema, ok := state.OverrideSchema.Structs["Metrics"] - if !ok || overrideSchema == nil { + fieldCount, ok := state.OverrideSchema.FieldCount("Metrics") + if !ok { return fmt.Errorf("cannot find struct in override schema: %s", "Metrics") } // Number of fields in the target schema. - e.fieldCount = uint(len(overrideSchema.Fields)) + e.fieldCount = fieldCount // Set that many 1 bits in the keepFieldMask. All fields with higher number // will be skipped when encoding. @@ -484,13 +484,13 @@ func (d *MetricsDecoder) Init(state *ReaderState, columns *pkg.ReadColumnSet) er state.MetricsDecoder = d if state.OverrideSchema != nil { - overrideSchema, ok := state.OverrideSchema.Structs["Metrics"] - if !ok || overrideSchema == nil { + fieldCount, ok := state.OverrideSchema.FieldCount("Metrics") + if !ok { return fmt.Errorf("cannot find struct in override schema: %s", "Metrics") } // Number of fields in the target schema. - d.fieldCount = uint(len(overrideSchema.Fields)) + d.fieldCount = fieldCount } else { // Keep all fields when encoding. d.fieldCount = 6 diff --git a/go/otel/oteltef/metricswriter.go b/go/otel/oteltef/metricswriter.go index d13f5c2..80405ee 100644 --- a/go/otel/oteltef/metricswriter.go +++ b/go/otel/oteltef/metricswriter.go @@ -206,12 +206,12 @@ func (w *MetricsWriter) Flush() error { return w.restartFrame(w.opts.FrameRestartFlags) } -const wireSchemaMetrics = "\aMetrics\v\x02\bAnyValue\a$\x0eAnyValueString\x03\x00\x02\n\v\bAnyValue\f\fKeyValueList\x05\x00\bEnvelope\x01\f\x12EnvelopeAttributes\x00\bExemplar\x05\x01\v\rExemplarValue%\x04Span%\x05Trace\f\nAttributes\x02\rExemplarValue\x02\x00\x02\x00\x0eHistogramValue\x05\x00\x12\x12\x12\n\x00\x04\x06Metric\x06Metric\b$\nMetricName$\x11MetricDescription$\nMetricUnit\x01\f\nAttributes\n\x02\x01\x03\x01\aMetrics\x06\v\bEnvelope\v\x06Metric\v\bResource\v\x05Scope\f\nAttributes\v\x05Point\x00\x05Point\x04\x01\x01\v\nPointValue\n\v\bExemplar\x02\nPointValue\x03\x00\x02\v\x0eHistogramValue\x04\bResource\bResource\x03$\tSchemaURL\f\nAttributes\x01\x04\x05Scope\x05Scope\x05$\tScopeName$\fScopeVersion$\tSchemaURL\f\nAttributes\x01\x03\nAttributes$\fAttributeKey\v\bAnyValue\fKeyValueList\x04\v\bAnyValue\x12EnvelopeAttributes\x04\x05" +const wireSchemaMetrics = "\v\bAnyValue\a\bEnvelope\x01\bExemplar\x05\rExemplarValue\x02\x0eHistogramValue\x05\x06Metric\b\aMetrics\x06\x05Point\x04\nPointValue\x03\bResource\x03\x05Scope\x05" -func MetricsWireSchema() (*schema.Schema, error) { - var d schema.Schema - if err := d.Deserialize(bytes.NewBuffer([]byte(wireSchemaMetrics))); err != nil { - return nil, err +func MetricsWireSchema() (schema.WireSchema, error) { + var w schema.WireSchema + if err := w.Deserialize(bytes.NewBuffer([]byte(wireSchemaMetrics))); err != nil { + return w, err } - return &d, nil + return w, nil } diff --git a/go/otel/oteltef/point.go b/go/otel/oteltef/point.go index c792d1a..cbdaa23 100644 --- a/go/otel/oteltef/point.go +++ b/go/otel/oteltef/point.go @@ -255,13 +255,13 @@ func (e *PointEncoder) Init(state *WriterState, columns *pkg.WriteColumnSet) err e.limiter = &state.limiter if state.OverrideSchema != nil { - overrideSchema, ok := state.OverrideSchema.Structs["Point"] - if !ok || overrideSchema == nil { + fieldCount, ok := state.OverrideSchema.FieldCount("Point") + if !ok { return fmt.Errorf("cannot find struct in override schema: %s", "Point") } // Number of fields in the target schema. - e.fieldCount = uint(len(overrideSchema.Fields)) + e.fieldCount = fieldCount // Set that many 1 bits in the keepFieldMask. All fields with higher number // will be skipped when encoding. @@ -405,13 +405,13 @@ func (d *PointDecoder) Init(state *ReaderState, columns *pkg.ReadColumnSet) erro state.PointDecoder = d if state.OverrideSchema != nil { - overrideSchema, ok := state.OverrideSchema.Structs["Point"] - if !ok || overrideSchema == nil { + fieldCount, ok := state.OverrideSchema.FieldCount("Point") + if !ok { return fmt.Errorf("cannot find struct in override schema: %s", "Point") } // Number of fields in the target schema. - d.fieldCount = uint(len(overrideSchema.Fields)) + d.fieldCount = fieldCount } else { // Keep all fields when encoding. d.fieldCount = 4 diff --git a/go/otel/oteltef/pointvalue.go b/go/otel/oteltef/pointvalue.go index 8097619..90e608a 100644 --- a/go/otel/oteltef/pointvalue.go +++ b/go/otel/oteltef/pointvalue.go @@ -227,13 +227,13 @@ func (e *PointValueEncoder) Init(state *WriterState, columns *pkg.WriteColumnSet e.limiter = &state.limiter if state.OverrideSchema != nil { - overrideSchema, ok := state.OverrideSchema.Structs["PointValue"] - if !ok || overrideSchema == nil { + fieldCount, ok := state.OverrideSchema.FieldCount("PointValue") + if !ok { return fmt.Errorf("cannot find oneof in override schema: %s", "PointValue") } // Number of fields in the target schema. - e.fieldCount = uint(len(overrideSchema.Fields)) + e.fieldCount = fieldCount } else { // Keep all fields when encoding. e.fieldCount = 3 @@ -345,13 +345,13 @@ func (d *PointValueDecoder) Init(state *ReaderState, columns *pkg.ReadColumnSet) state.PointValueDecoder = d if state.OverrideSchema != nil { - overrideSchema, ok := state.OverrideSchema.Structs["PointValue"] - if !ok || overrideSchema == nil { + fieldCount, ok := state.OverrideSchema.FieldCount("PointValue") + if !ok { return fmt.Errorf("cannot find oneof in override schema: %s", "PointValue") } // Number of fields in the target schema. - d.fieldCount = uint(len(overrideSchema.Fields)) + d.fieldCount = fieldCount } else { // Keep all fields when encoding. d.fieldCount = 3 diff --git a/go/otel/oteltef/resource.go b/go/otel/oteltef/resource.go index ca9cd5c..107fe2a 100644 --- a/go/otel/oteltef/resource.go +++ b/go/otel/oteltef/resource.go @@ -253,13 +253,13 @@ func (e *ResourceEncoder) Init(state *WriterState, columns *pkg.WriteColumnSet) e.dict = &state.Resource if state.OverrideSchema != nil { - overrideSchema, ok := state.OverrideSchema.Structs["Resource"] - if !ok || overrideSchema == nil { + fieldCount, ok := state.OverrideSchema.FieldCount("Resource") + if !ok { return fmt.Errorf("cannot find struct in override schema: %s", "Resource") } // Number of fields in the target schema. - e.fieldCount = uint(len(overrideSchema.Fields)) + e.fieldCount = fieldCount // Set that many 1 bits in the keepFieldMask. All fields with higher number // will be skipped when encoding. @@ -415,13 +415,13 @@ func (d *ResourceDecoder) Init(state *ReaderState, columns *pkg.ReadColumnSet) e state.ResourceDecoder = d if state.OverrideSchema != nil { - overrideSchema, ok := state.OverrideSchema.Structs["Resource"] - if !ok || overrideSchema == nil { + fieldCount, ok := state.OverrideSchema.FieldCount("Resource") + if !ok { return fmt.Errorf("cannot find struct in override schema: %s", "Resource") } // Number of fields in the target schema. - d.fieldCount = uint(len(overrideSchema.Fields)) + d.fieldCount = fieldCount } else { // Keep all fields when encoding. d.fieldCount = 3 diff --git a/go/otel/oteltef/scope.go b/go/otel/oteltef/scope.go index 5d5aa14..c8f44bf 100644 --- a/go/otel/oteltef/scope.go +++ b/go/otel/oteltef/scope.go @@ -329,13 +329,13 @@ func (e *ScopeEncoder) Init(state *WriterState, columns *pkg.WriteColumnSet) err e.dict = &state.Scope if state.OverrideSchema != nil { - overrideSchema, ok := state.OverrideSchema.Structs["Scope"] - if !ok || overrideSchema == nil { + fieldCount, ok := state.OverrideSchema.FieldCount("Scope") + if !ok { return fmt.Errorf("cannot find struct in override schema: %s", "Scope") } // Number of fields in the target schema. - e.fieldCount = uint(len(overrideSchema.Fields)) + e.fieldCount = fieldCount // Set that many 1 bits in the keepFieldMask. All fields with higher number // will be skipped when encoding. @@ -527,13 +527,13 @@ func (d *ScopeDecoder) Init(state *ReaderState, columns *pkg.ReadColumnSet) erro state.ScopeDecoder = d if state.OverrideSchema != nil { - overrideSchema, ok := state.OverrideSchema.Structs["Scope"] - if !ok || overrideSchema == nil { + fieldCount, ok := state.OverrideSchema.FieldCount("Scope") + if !ok { return fmt.Errorf("cannot find struct in override schema: %s", "Scope") } // Number of fields in the target schema. - d.fieldCount = uint(len(overrideSchema.Fields)) + d.fieldCount = fieldCount } else { // Keep all fields when encoding. d.fieldCount = 5 diff --git a/go/otel/oteltef/span.go b/go/otel/oteltef/span.go index 9a1658b..282c9a1 100644 --- a/go/otel/oteltef/span.go +++ b/go/otel/oteltef/span.go @@ -617,13 +617,13 @@ func (e *SpanEncoder) Init(state *WriterState, columns *pkg.WriteColumnSet) erro e.limiter = &state.limiter if state.OverrideSchema != nil { - overrideSchema, ok := state.OverrideSchema.Structs["Span"] - if !ok || overrideSchema == nil { + fieldCount, ok := state.OverrideSchema.FieldCount("Span") + if !ok { return fmt.Errorf("cannot find struct in override schema: %s", "Span") } // Number of fields in the target schema. - e.fieldCount = uint(len(overrideSchema.Fields)) + e.fieldCount = fieldCount // Set that many 1 bits in the keepFieldMask. All fields with higher number // will be skipped when encoding. @@ -947,13 +947,13 @@ func (d *SpanDecoder) Init(state *ReaderState, columns *pkg.ReadColumnSet) error state.SpanDecoder = d if state.OverrideSchema != nil { - overrideSchema, ok := state.OverrideSchema.Structs["Span"] - if !ok || overrideSchema == nil { + fieldCount, ok := state.OverrideSchema.FieldCount("Span") + if !ok { return fmt.Errorf("cannot find struct in override schema: %s", "Span") } // Number of fields in the target schema. - d.fieldCount = uint(len(overrideSchema.Fields)) + d.fieldCount = fieldCount } else { // Keep all fields when encoding. d.fieldCount = 14 diff --git a/go/otel/oteltef/spans.go b/go/otel/oteltef/spans.go index 236074b..59f0789 100644 --- a/go/otel/oteltef/spans.go +++ b/go/otel/oteltef/spans.go @@ -239,13 +239,13 @@ func (e *SpansEncoder) Init(state *WriterState, columns *pkg.WriteColumnSet) err e.limiter = &state.limiter if state.OverrideSchema != nil { - overrideSchema, ok := state.OverrideSchema.Structs["Spans"] - if !ok || overrideSchema == nil { + fieldCount, ok := state.OverrideSchema.FieldCount("Spans") + if !ok { return fmt.Errorf("cannot find struct in override schema: %s", "Spans") } // Number of fields in the target schema. - e.fieldCount = uint(len(overrideSchema.Fields)) + e.fieldCount = fieldCount // Set that many 1 bits in the keepFieldMask. All fields with higher number // will be skipped when encoding. @@ -389,13 +389,13 @@ func (d *SpansDecoder) Init(state *ReaderState, columns *pkg.ReadColumnSet) erro state.SpansDecoder = d if state.OverrideSchema != nil { - overrideSchema, ok := state.OverrideSchema.Structs["Spans"] - if !ok || overrideSchema == nil { + fieldCount, ok := state.OverrideSchema.FieldCount("Spans") + if !ok { return fmt.Errorf("cannot find struct in override schema: %s", "Spans") } // Number of fields in the target schema. - d.fieldCount = uint(len(overrideSchema.Fields)) + d.fieldCount = fieldCount } else { // Keep all fields when encoding. d.fieldCount = 4 diff --git a/go/otel/oteltef/spanstatus.go b/go/otel/oteltef/spanstatus.go index e10b1bb..2e8517e 100644 --- a/go/otel/oteltef/spanstatus.go +++ b/go/otel/oteltef/spanstatus.go @@ -197,13 +197,13 @@ func (e *SpanStatusEncoder) Init(state *WriterState, columns *pkg.WriteColumnSet e.limiter = &state.limiter if state.OverrideSchema != nil { - overrideSchema, ok := state.OverrideSchema.Structs["SpanStatus"] - if !ok || overrideSchema == nil { + fieldCount, ok := state.OverrideSchema.FieldCount("SpanStatus") + if !ok { return fmt.Errorf("cannot find struct in override schema: %s", "SpanStatus") } // Number of fields in the target schema. - e.fieldCount = uint(len(overrideSchema.Fields)) + e.fieldCount = fieldCount // Set that many 1 bits in the keepFieldMask. All fields with higher number // will be skipped when encoding. @@ -311,13 +311,13 @@ func (d *SpanStatusDecoder) Init(state *ReaderState, columns *pkg.ReadColumnSet) state.SpanStatusDecoder = d if state.OverrideSchema != nil { - overrideSchema, ok := state.OverrideSchema.Structs["SpanStatus"] - if !ok || overrideSchema == nil { + fieldCount, ok := state.OverrideSchema.FieldCount("SpanStatus") + if !ok { return fmt.Errorf("cannot find struct in override schema: %s", "SpanStatus") } // Number of fields in the target schema. - d.fieldCount = uint(len(overrideSchema.Fields)) + d.fieldCount = fieldCount } else { // Keep all fields when encoding. d.fieldCount = 2 diff --git a/go/otel/oteltef/spanswriter.go b/go/otel/oteltef/spanswriter.go index 0bfd5f7..caecaf5 100644 --- a/go/otel/oteltef/spanswriter.go +++ b/go/otel/oteltef/spanswriter.go @@ -206,12 +206,12 @@ func (w *SpansWriter) Flush() error { return w.restartFrame(w.opts.FrameRestartFlags) } -const wireSchemaSpans = "\x05Spans\t\x02\bAnyValue\a$\x0eAnyValueString\x03\x00\x02\n\v\bAnyValue\f\fKeyValueList\x05\x00\bEnvelope\x01\f\x12EnvelopeAttributes\x00\x05Event\x04$\rSpanEventName\x01\f\nAttributes\x01\x00\x04Link\x06\x05\x05\x04\x01\f\nAttributes\x01\x04\bResource\bResource\x03$\tSchemaURL\f\nAttributes\x01\x04\x05Scope\x05Scope\x05$\tScopeName$\fScopeVersion$\tSchemaURL\f\nAttributes\x01\x00\x04Span\x0e\x05\x05\x04\x05\x01$\bSpanName\x01\x01\x01\f\nAttributes\x01\n\v\x05Event\n\v\x04Link\v\nSpanStatus\x00\nSpanStatus\x02\x04\x01\x01\x05Spans\x04\v\bEnvelope\v\bResource\v\x05Scope\v\x04Span\x03\x12EnvelopeAttributes\x04\x05\nAttributes$\fAttributeKey\v\bAnyValue\fKeyValueList\x04\v\bAnyValue" +const wireSchemaSpans = "\t\bAnyValue\a\bEnvelope\x01\x05Event\x04\x04Link\x06\bResource\x03\x05Scope\x05\x04Span\x0e\nSpanStatus\x02\x05Spans\x04" -func SpansWireSchema() (*schema.Schema, error) { - var d schema.Schema - if err := d.Deserialize(bytes.NewBuffer([]byte(wireSchemaSpans))); err != nil { - return nil, err +func SpansWireSchema() (schema.WireSchema, error) { + var w schema.WireSchema + if err := w.Deserialize(bytes.NewBuffer([]byte(wireSchemaSpans))); err != nil { + return w, err } - return &d, nil + return w, nil } diff --git a/go/pkg/basereader.go b/go/pkg/basereader.go index 194d994..8a9719e 100644 --- a/go/pkg/basereader.go +++ b/go/pkg/basereader.go @@ -15,7 +15,7 @@ type BaseReader struct { FixedHeader FixedHeader VarHeader VarHeader - Schema *schema.Schema + Schema *schema.WireSchema ReadBufs ReadBufs @@ -86,7 +86,7 @@ func (r *BaseReader) ReadFixedHeader() error { return nil } -func (r *BaseReader) ReadVarHeader(ownSchema *schema.Schema) error { +func (r *BaseReader) ReadVarHeader(ownSchema schema.WireSchema) error { if _, err := r.FrameDecoder.Next(); err != nil { return err } @@ -106,7 +106,7 @@ func (r *BaseReader) ReadVarHeader(ownSchema *schema.Schema) error { if len(r.VarHeader.SchemaWireBytes) != 0 { buf = bytes.NewBuffer(r.VarHeader.SchemaWireBytes) - r.Schema = &schema.Schema{} + r.Schema = &schema.WireSchema{} err = r.Schema.Deserialize(buf) if err != nil { return err diff --git a/go/pkg/schema/schema.go b/go/pkg/schema/schema.go index 8ea943a..2149730 100644 --- a/go/pkg/schema/schema.go +++ b/go/pkg/schema/schema.go @@ -1,16 +1,10 @@ package schema import ( - "bytes" - "encoding/binary" - "errors" "fmt" - "sort" - - "github.com/splunk/stef/go/pkg/internal" ) -// Schema is a STEF schema description. +// Schema is a STEF schema description, serializable in JSON format. type Schema struct { PackageName string `json:"package,omitempty"` Structs map[string]*Struct `json:"structs"` @@ -18,16 +12,6 @@ type Schema struct { MainStruct string `json:"main"` } -const ( - MaxStructOrMultimapCount = 256 - MaxStructFieldCount = 256 -) - -var ( - errStructOrMultimapCountLimit = errors.New("struct or multimap count limit exceeded") - errStructFieldCountLimit = errors.New("struct field count limit exceeded") -) - type Compatibility int const ( @@ -195,20 +179,6 @@ func isCompatibleFieldType( return true } -// Minify removes data that is not necessary for wire format identification (such as field names). -// Typically, Minify is used before the schema is serialized and sent over network -// to avoid unnecessary overhead. -func (d *Schema) Minify() { - d.PackageName = "" - for _, struc := range d.Structs { - struc.minify() - } - - for _, m := range d.Multimaps { - m.minify() - } -} - // PrunedForRoot produces a pruned copy of the schema that includes the specified root // struct and parts of schema reachable from that root. Unreachable parts of the schema // are excluded. @@ -225,111 +195,6 @@ func (d *Schema) PrunedForRoot(rootStructName string) (*Schema, error) { return &out, nil } -/* -Binary serialization format: - -Schema { - MainStruct: String - StructCount: U64 - *Struct: Struct - MultimapCount: U64 - *Multimap: Multimap -} -*/ - -// Serialize the schema to binary format. -func (d *Schema) Serialize(dst *bytes.Buffer) error { - if err := internal.WriteString(d.MainStruct, dst); err != nil { - return nil - } - - if err := internal.WriteUvarint(uint64(len(d.Structs)), dst); err != nil { - return err - } - - // Sort for deterministic serialization. - var structs []string - for name := range d.Structs { - structs = append(structs, name) - } - sort.Strings(structs) - - for _, name := range structs { - str := d.Structs[name] - if err := str.serialize(name, dst); err != nil { - return nil - } - } - - if err := internal.WriteUvarint(uint64(len(d.Multimaps)), dst); err != nil { - return err - } - - var multimaps []string - for name := range d.Multimaps { - multimaps = append(multimaps, name) - } - sort.Strings(structs) - - for _, name := range multimaps { - mm := d.Multimaps[name] - if err := mm.serialize(name, dst); err != nil { - return nil - } - } - - return nil -} - -// Deserialize the schema from binary format. -func (d *Schema) Deserialize(src *bytes.Buffer) error { - var err error - d.MainStruct, err = internal.ReadString(src) - if err != nil { - return err - } - - count, err := binary.ReadUvarint(src) - if err != nil { - return err - } - - if count > MaxStructOrMultimapCount { - return errStructOrMultimapCountLimit - } - - d.Structs = make(map[string]*Struct, count) - for i := 0; i < int(count); i++ { - var str Struct - if err := str.deserialize(src); err != nil { - return err - } - d.Structs[str.Name] = &str - str.Name = "" - } - - count, err = binary.ReadUvarint(src) - if err != nil { - return err - } - - if count > MaxStructOrMultimapCount { - return errStructOrMultimapCountLimit - } - - d.Multimaps = make(map[string]*Multimap, count) - for i := 0; i < int(count); i++ { - var mm Multimap - if err := mm.deserialize(src); err != nil { - return err - } - d.Multimaps[mm.Name] = &mm - mm.Name = "" - } - - return nil -} - func (d *Schema) copyPrunedFieldType(fieldType *FieldType, dst *Schema) error { if fieldType.Struct != "" { if err := d.copyPrunedStruct(fieldType.Struct, dst); err != nil { @@ -359,6 +224,7 @@ func (d *Schema) copyPrunedStruct(strucName string, dst *Schema) error { } dstStruc := &Struct{ + Name: strucName, OneOf: srcStruc.OneOf, DictName: srcStruc.DictName, IsRoot: srcStruc.IsRoot, @@ -388,10 +254,11 @@ func (d *Schema) copyPrunedMultiMap(multiMapName string, dst *Schema) error { } dstMultimap := &Multimap{ + Name: multiMapName, Key: srcMultiMap.Key, Value: srcMultiMap.Value, } - dst.Multimaps[srcMultiMap.Name] = dstMultimap + dst.Multimaps[multiMapName] = dstMultimap if err := d.copyPrunedFieldType(&dstMultimap.Key.Type, dst); err != nil { return err @@ -404,6 +271,16 @@ func (d *Schema) copyPrunedMultiMap(multiMapName string, dst *Schema) error { return nil } +func (d *Schema) ToWire() WireSchema { + w := WireSchema{ + StructFieldCount: make(map[string]uint), + } + for k, v := range d.Structs { + w.StructFieldCount[k] = uint(len(v.Fields)) + } + return w +} + type Struct struct { Name string `json:"name,omitempty"` OneOf bool `json:"oneof,omitempty"` @@ -412,138 +289,12 @@ type Struct struct { Fields []StructField `json:"fields"` } -func (s *Struct) minify() { - // Name is not needed to identify wire format. It is already - // recording as the containing map element key. - s.Name = "" - - for i := range s.Fields { - s.Fields[i].minify() - } -} - -/* -Binary serialization format: - -Struct { - Flag: 8 - Name: String - /DictName: String/ - FieldCount: U64 - *Field: FieldType -} -*/ - -type structFlag byte - -const ( - structFlagIsRoot structFlag = 1 << iota - structFlagOneOf - structFlagHasDict -) - -func (s *Struct) serialize(name string, dst *bytes.Buffer) error { - var flags structFlag - if s.IsRoot { - flags |= structFlagIsRoot - } - if s.OneOf { - flags |= structFlagOneOf - } - if s.DictName != "" { - flags |= structFlagHasDict - } - if err := dst.WriteByte(byte(flags)); err != nil { - return err - } - if err := internal.WriteString(name, dst); err != nil { - return err - } - if s.DictName != "" { - if err := internal.WriteString(s.DictName, dst); err != nil { - return err - } - } - - if err := internal.WriteUvarint(uint64(len(s.Fields)), dst); err != nil { - return err - } - - for _, field := range s.Fields { - if err := field.serialize(dst); err != nil { - return err - } - } - - return nil -} - -func (s *Struct) deserialize(buf *bytes.Buffer) error { - f, err := buf.ReadByte() - if err != nil { - return err - } - flags := structFlag(f) - - if flags&structFlagIsRoot != 0 { - s.IsRoot = true - } - if flags&structFlagOneOf != 0 { - s.OneOf = true - } - - s.Name, err = internal.ReadString(buf) - if err != nil { - return err - } - - if flags&structFlagHasDict != 0 { - s.DictName, err = internal.ReadString(buf) - if err != nil { - return err - } - } - - count, err := binary.ReadUvarint(buf) - if err != nil { - return err - } - - if count > MaxStructFieldCount { - return errStructFieldCountLimit - } - - s.Fields = make([]StructField, count) - for i := range s.Fields { - if err := s.Fields[i].deserialize(buf); err != nil { - return err - } - } - - return nil -} - type StructField struct { FieldType Name string `json:"name,omitempty"` Optional bool `json:"optional,omitempty"` } -func (f *StructField) minify() { - // Name is not needed to identify wire format. - f.Name = "" -} - -func (f *StructField) serialize(buf *bytes.Buffer) error { - return f.FieldType.serialize(buf, f.Optional) -} - -func (f *StructField) deserialize(buf *bytes.Buffer) error { - var err error - f.Optional, err = f.FieldType.deserialize(buf) - return err -} - type PrimitiveFieldType int const ( @@ -563,193 +314,12 @@ type FieldType struct { DictName string `json:"dict,omitempty"` } -type TypeDescr byte - -const ( - TypeDescrArray TypeDescr = TypeDescr(iota + 10) - TypeDescrStruct - TypeDescrMultimap - TypeDescrTypeMask = 0b001111 - TypeDescrOptional = 0b010000 - TypeDescrHasDict = 0b100000 -) - -/* -Binary serialization format: - -FieldType { - TypeDescr: 8 - /ElemType: FieldType/ - /StructName: String/ - /MultimapName: String/ - /DictName: String/ -} -*/ - -func (f *FieldType) serialize(buf *bytes.Buffer, optional bool) error { - var typeDescr TypeDescr - - // Bits 0-3 - type: 0-5 = primitive type - // 6-9 = reserved - // 10 = array - // 11 = struct - // 12 = multimap - // 13-15 = reserved - // Bit 4 - optional - // Bit 5 - has dict. - // But 6-7 - reserved. - - if f.Primitive != nil { - typeDescr = TypeDescr(*f.Primitive) - } else if f.Array != nil { - typeDescr = TypeDescrArray - } else if f.Struct != "" { - typeDescr = TypeDescrStruct - } else if f.MultiMap != "" { - typeDescr = TypeDescrMultimap - } else { - panic("unknown field type") - } - - if optional { - typeDescr |= TypeDescrOptional - } - - if f.DictName != "" { - typeDescr |= TypeDescrHasDict - } - - if err := buf.WriteByte(byte(typeDescr)); err != nil { - return err - } - - if f.Array != nil { - return f.Array.serialize(buf, false) - } else if f.Struct != "" { - if err := internal.WriteString(f.Struct, buf); err != nil { - return err - } - } else if f.MultiMap != "" { - if err := internal.WriteString(f.MultiMap, buf); err != nil { - return err - } - } - - if f.DictName != "" { - if err := internal.WriteString(f.DictName, buf); err != nil { - return err - } - } - - return nil -} - -func (f *FieldType) deserialize(buf *bytes.Buffer) (optional bool, err error) { - td, err := buf.ReadByte() - if err != nil { - return false, err - } - typeDescr := TypeDescr(td) - - optional = typeDescr&TypeDescrOptional != 0 - typ := typeDescr & TypeDescrTypeMask - switch typ { - case TypeDescrArray: - var elemType FieldType - _, err = elemType.deserialize(buf) - if err != nil { - return false, err - } - f.Array = &elemType - - case TypeDescrStruct: - f.Struct, err = internal.ReadString(buf) - if err != nil { - return false, err - } - - case TypeDescrMultimap: - f.MultiMap, err = internal.ReadString(buf) - if err != nil { - return false, err - } - - default: - if byte(typ) <= byte(PrimitiveTypeBytes) { - p := PrimitiveFieldType(typ) - f.Primitive = &p - } else { - return false, errors.New("unknown type") - } - } - - if typeDescr&TypeDescrHasDict != 0 { - f.DictName, err = internal.ReadString(buf) - if err != nil { - return false, err - } - } - - return optional, nil -} - type MultimapField struct { Type FieldType `json:"type"` } -func (f *MultimapField) minify() { -} - -func (f *MultimapField) serialize(buf *bytes.Buffer) error { - return f.Type.serialize(buf, false) -} - -func (f *MultimapField) deserialize(buf *bytes.Buffer) error { - _, err := f.Type.deserialize(buf) - return err -} - type Multimap struct { Name string `json:"name,omitempty"` Key MultimapField `json:"key"` Value MultimapField `json:"value"` } - -func (m *Multimap) minify() { - m.Name = "" - m.Key.minify() - m.Value.minify() -} - -/* -Binary serialization format: - -Multimap { - Name: String - Key: FieldType - Value: FieldType -} -*/ - -func (m *Multimap) serialize(name string, buf *bytes.Buffer) error { - if err := internal.WriteString(name, buf); err != nil { - return err - } - if err := m.Key.serialize(buf); err != nil { - return err - } - return m.Value.serialize(buf) -} - -func (m *Multimap) deserialize(buf *bytes.Buffer) error { - var err error - m.Name, err = internal.ReadString(buf) - if err != nil { - return err - } - - if err := m.Key.deserialize(buf); err != nil { - return err - } - return m.Value.deserialize(buf) -} diff --git a/go/pkg/schema/schema_test.go b/go/pkg/schema/schema_test.go index d466c66..9d1bf13 100644 --- a/go/pkg/schema/schema_test.go +++ b/go/pkg/schema/schema_test.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/json" "fmt" + "math/rand" "os" "reflect" "testing" @@ -31,7 +32,6 @@ func TestSerializeSchema(t *testing.T) { prunedSchema, err := schema.PrunedForRoot("Metrics") require.NoError(t, err) - prunedSchema.Minify() minifiedJson, err := json.Marshal(prunedSchema) require.NoError(t, err) @@ -39,23 +39,24 @@ func TestSerializeSchema(t *testing.T) { fmt.Printf("JSON: %5d, zstd: %4d\n", len(minifiedJson), len(compressedJson)) + wireSchema := prunedSchema.ToWire() var wireBytes bytes.Buffer - err = prunedSchema.Serialize(&wireBytes) + err = wireSchema.Serialize(&wireBytes) require.NoError(t, err) compressedBin := compressZstd(wireBytes.Bytes()) - fmt.Printf("BIN: %5d, zstd: %4d\n", wireBytes.Len(), len(compressedBin)) + fmt.Printf("WIRE: %5d, zstd: %4d\n", wireBytes.Len(), len(compressedBin)) - var readSchema Schema + var readSchema WireSchema err = readSchema.Deserialize(&wireBytes) require.NoError(t, err) - diff := cmp.Diff(prunedSchema, &readSchema) + diff := cmp.Diff(wireSchema, readSchema) if diff != "" { assert.Fail(t, diff) } - assert.True(t, reflect.DeepEqual(prunedSchema, &readSchema)) + assert.True(t, reflect.DeepEqual(wireSchema, readSchema)) } func FuzzDeserialize(f *testing.F) { @@ -72,10 +73,9 @@ func FuzzDeserialize(f *testing.F) { prunedSchema, err := schema.PrunedForRoot(root) require.NoError(f, err) - prunedSchema.Minify() - + wireSchema := prunedSchema.ToWire() var wireBytes bytes.Buffer - err = prunedSchema.Serialize(&wireBytes) + err = wireSchema.Serialize(&wireBytes) require.NoError(f, err) f.Add(wireBytes.Bytes()) @@ -83,8 +83,565 @@ func FuzzDeserialize(f *testing.F) { f.Fuzz( func(t *testing.T, data []byte) { - var readSchema Schema + var readSchema WireSchema _ = readSchema.Deserialize(bytes.NewBuffer(data)) }, ) } + +func TestSchemaSelfCompatible(t *testing.T) { + p := PrimitiveTypeString + schemas := []*Schema{ + { + PackageName: "pkg", + Structs: map[string]*Struct{ + "Root": {Name: "Root"}, + }, + MainStruct: "Root", + }, + { + PackageName: "pkg", + Structs: map[string]*Struct{ + "Root": { + Name: "Root", + Fields: []StructField{ + { + FieldType: FieldType{MultiMap: "Multi"}, + Name: "F1", + }, + }, + }, + }, + Multimaps: map[string]*Multimap{ + "Multi": { + Name: "Multi", + Key: MultimapField{Type: FieldType{Primitive: &p}}, + Value: MultimapField{Type: FieldType{Primitive: &p}}, + }, + }, + MainStruct: "Root", + }, + } + + for _, schema := range schemas { + wireSchema := schema.ToWire() + compat, err := wireSchema.Compatible(&wireSchema) + require.NoError(t, err) + assert.EqualValues(t, CompatibilityExact, compat) + } +} + +func TestSchemaSuperset(t *testing.T) { + primitiveTypeInt64 := PrimitiveTypeInt64 + primitiveTypeString := PrimitiveTypeString + + tests := []struct { + old *Schema + new *Schema + }{ + { + old: &Schema{ + PackageName: "abc", + Structs: map[string]*Struct{ + "Root": { + Name: "Root", + Fields: []StructField{ + { + FieldType: FieldType{ + Primitive: &primitiveTypeInt64, + }, + Name: "F1", + }, + }, + }, + }, + Multimaps: nil, + MainStruct: "Root", + }, + new: &Schema{ + PackageName: "def", + Structs: map[string]*Struct{ + "Root": { + Name: "Root", + Fields: []StructField{ + { + FieldType: FieldType{ + Primitive: &primitiveTypeInt64, + }, + Name: "F1", + }, + { + FieldType: FieldType{ + Primitive: &primitiveTypeInt64, + }, + Name: "F2", + }, + }, + }, + }, + Multimaps: nil, + MainStruct: "Root", + }, + }, + { + old: &Schema{ + PackageName: "abc", + Structs: map[string]*Struct{ + "Root": { + Name: "Root", + Fields: []StructField{ + { + FieldType: FieldType{ + Primitive: &primitiveTypeInt64, + }, + Name: "F1", + }, + { + FieldType: FieldType{ + Struct: "A", + }, + Name: "F2", + }, + }, + }, + "A": { + Name: "A", + Fields: []StructField{ + { + FieldType: FieldType{ + Primitive: &primitiveTypeInt64, + }, + Name: "F1", + }, + { + FieldType: FieldType{ + Struct: "B", + }, + Name: "F2", + Optional: true, + }, + { + FieldType: FieldType{ + MultiMap: "M", + }, + Name: "F3", + }, + }, + }, + "B": { + Name: "B", + Fields: []StructField{ + { + FieldType: FieldType{ + Primitive: &primitiveTypeInt64, + }, + Name: "F1", + }, + { + FieldType: FieldType{ + Struct: "A", + }, + Name: "F2", + }, + }, + }, + }, + Multimaps: map[string]*Multimap{ + "M": { + Name: "M", + Key: MultimapField{Type: FieldType{Primitive: &primitiveTypeInt64}}, + Value: MultimapField{Type: FieldType{Primitive: &primitiveTypeString}}, + }, + }, + MainStruct: "Root", + }, + new: &Schema{ + PackageName: "def", + Structs: map[string]*Struct{ + "Root": { + Name: "Root", + Fields: []StructField{ + { + FieldType: FieldType{ + Primitive: &primitiveTypeInt64, + }, + Name: "F1", + }, + { + FieldType: FieldType{ + Struct: "A", + }, + Name: "F2", + }, + { + FieldType: FieldType{ + Struct: "D", + }, + Name: "F3", + }, + }, + }, + "A": { + Name: "A", + Fields: []StructField{ + { + FieldType: FieldType{ + Primitive: &primitiveTypeInt64, + }, + Name: "F1", + }, + { + FieldType: FieldType{ + Struct: "B", + }, + Name: "F2", + Optional: true, + }, + { + FieldType: FieldType{ + MultiMap: "M", + }, + Name: "F3", + }, + }, + }, + "B": { + Name: "B", + Fields: []StructField{ + { + FieldType: FieldType{ + Primitive: &primitiveTypeInt64, + }, + Name: "F1", + }, + { + FieldType: FieldType{ + Struct: "A", + }, + Name: "F2", + }, + { + FieldType: FieldType{ + Primitive: &primitiveTypeInt64, + }, + Name: "F3", + }, + }, + }, + "D": { + Name: "D", + OneOf: true, + DictName: "", + Fields: []StructField{ + { + FieldType: FieldType{Primitive: &primitiveTypeInt64}, + Name: "F1", + }, + }, + }, + }, + Multimaps: map[string]*Multimap{ + "M": { + Name: "M", + Key: MultimapField{Type: FieldType{Primitive: &primitiveTypeInt64}}, + Value: MultimapField{Type: FieldType{Primitive: &primitiveTypeString}}, + }, + }, + MainStruct: "Root2", + }, + }, + } + + for _, test := range tests { + oldSchema := test.old.ToWire() + newSchema := test.new.ToWire() + + compat, err := newSchema.Compatible(&oldSchema) + require.NoError(t, err) + assert.EqualValues(t, CompatibilitySuperset, compat) + } +} + +func TestSchemaIncompatible(t *testing.T) { + primitiveTypeInt64 := PrimitiveTypeInt64 + + tests := []struct { + old *Schema + new *Schema + err string + }{ + { + old: &Schema{ + PackageName: "abc", + Structs: map[string]*Struct{ + "Root": { + Name: "Root", + Fields: []StructField{ + { + FieldType: FieldType{ + Primitive: &primitiveTypeInt64, + }, + Name: "F1", + }, + { + FieldType: FieldType{ + Primitive: &primitiveTypeInt64, + }, + Name: "F2", + }, + }, + }, + }, + Multimaps: nil, + MainStruct: "Root", + }, + new: &Schema{ + PackageName: "def", + Structs: map[string]*Struct{ + "Root": { + Name: "Root", + Fields: []StructField{ + { + FieldType: FieldType{ + Primitive: &primitiveTypeInt64, + }, + Name: "F1", + }, + }, + }, + }, + MainStruct: "Root", + }, + err: "struct Root has fewer fields in new schema (1 vs 2)", + }, + } + + for _, test := range tests { + oldSchema := test.old.ToWire() + newSchema := test.new.ToWire() + + compat, err := newSchema.Compatible(&oldSchema) + require.Error(t, err) + assert.EqualValues(t, test.err, err.Error()) + assert.EqualValues(t, CompatibilityIncompatible, compat) + } +} + +func expandSchema(t *testing.T, r *rand.Rand, orig *Schema) (cpy *Schema) { + cpy, err := orig.PrunedForRoot(orig.MainStruct) + require.NoError(t, err) + for { + for _, str := range cpy.Structs { + if expandStruct(t, r, cpy, str) { + return cpy + } + } + } +} + +func expandStruct(t *testing.T, r *rand.Rand, schema *Schema, str *Struct) bool { + if r.Intn(10) == 0 { + field := StructField{ + FieldType: FieldType{}, + Name: fmt.Sprintf("Field#%d", len(str.Fields)+1), + } + + p := PrimitiveTypeString + switch r.Intn(4) { + case 0: + field.FieldType.Primitive = &p + if r.Intn(10) == 0 { + field.DictName = "Dict" + field.Name + } + + case 1: + f := FieldType{Primitive: &p} + field.FieldType.Array = &f + case 2: + multimapIdx := r.Intn(len(schema.Multimaps)) + i := 0 + for multimapName := range schema.Multimaps { + if i == multimapIdx { + field.FieldType.MultiMap = multimapName + break + } + i++ + } + case 3: + if r.Intn(2) == 0 { + // Add new struct + struc := Struct{ + Name: fmt.Sprintf("Struct#%d", len(schema.Structs)), + Fields: []StructField{}, + } + schema.Structs[struc.Name] = &struc + field.FieldType.Struct = struc.Name + } else { + structIdx := r.Intn(len(schema.Structs)) + i := 0 + for structName := range schema.Structs { + if i == structIdx { + field.FieldType.Struct = structName + break + } + i++ + } + } + } + + str.Fields = append(str.Fields, field) + return true + } + + for _, field := range str.Fields { + if field.Struct != "" { + if r.Intn(10) == 0 { + childStruct := schema.Structs[field.Struct] + changed := expandStruct(t, r, schema, childStruct) + if changed { + return true + } + } + } + } + + return false +} + +func shrinkSchema(t *testing.T, r *rand.Rand, orig *Schema) (cpy *Schema) { + cpy, err := orig.PrunedForRoot(orig.MainStruct) + require.NoError(t, err) + for { + for _, str := range cpy.Structs { + if shrinkStruct(t, r, cpy, str) { + return cpy + } + } + } +} + +func shrinkStruct(t *testing.T, r *rand.Rand, schema *Schema, str *Struct) bool { + if r.Intn(10) == 0 && len(str.Fields) > 0 { + str.Fields = str.Fields[0 : len(str.Fields)-1] + return true + } + + for _, field := range str.Fields { + if field.Struct != "" { + if r.Intn(3) == 0 { + childStruct := schema.Structs[field.Struct] + changed := shrinkStruct(t, r, schema, childStruct) + if changed { + return true + } + } + } + } + + return false +} + +func TestSchemaExpand(t *testing.T) { + schemaJson, err := os.ReadFile("testdata/oteltef.wire.json") + require.NoError(t, err) + + orig := &Schema{} + err = json.Unmarshal(schemaJson, &orig) + require.NoError(t, err) + orig, err = orig.PrunedForRoot(orig.MainStruct) + require.NoError(t, err) + + r := rand.New(rand.NewSource(42)) + + // Expand one field at a time and check compatibility. + for i := 0; i < 200; i++ { + expanded := expandSchema(t, r, orig) + expandedWire := expanded.ToWire() + require.NoError(t, err) + + // Exact compatible with itself + compat, err := expandedWire.Compatible(&expandedWire) + require.NoError(t, err, i) + assert.EqualValues(t, CompatibilityExact, compat, i) + + // Expanding is compatible / superset + origWire := orig.ToWire() + require.NoError(t, err, i) + compat, err = expandedWire.Compatible(&origWire) + require.NoError(t, err, i) + assert.EqualValues(t, CompatibilitySuperset, compat, i) + + // Opposite direction is incompatible + compat, err = origWire.Compatible(&expandedWire) + require.Error(t, err, i) + assert.EqualValues(t, CompatibilityIncompatible, compat, i) + + // Also check that serialization works correctly. + + // Serialize + var buf bytes.Buffer + err = expandedWire.Serialize(&buf) + require.NoError(t, err) + + // Deserialize + var cpy WireSchema + err = cpy.Deserialize(&buf) + require.NoError(t, err) + + // Compare deserialized schema + require.EqualValues(t, expandedWire, cpy) + + orig = expanded + } +} + +func TestSchemaShrink(t *testing.T) { + schemaJson, err := os.ReadFile("testdata/oteltef.wire.json") + require.NoError(t, err) + + orig := &Schema{} + err = json.Unmarshal(schemaJson, &orig) + require.NoError(t, err) + + r := rand.New(rand.NewSource(42)) + + // Expand the schema, make it much bigger, so there is room for shrinking. + for i := 0; i < 200; i++ { + orig = expandSchema(t, r, orig) + } + + // Now shrink one field at a time and check compatibility. + for i := 0; i < 100; i++ { + shrinked := shrinkSchema(t, r, orig) + shrinkedWire := shrinked.ToWire() + require.NoError(t, err) + + // Shrinking is incompatible + origWire := orig.ToWire() + compat, err := shrinkedWire.Compatible(&origWire) + require.Error(t, err, i) + assert.EqualValues(t, CompatibilityIncompatible, compat, i) + + // Opposite direction is compatible/superset + compat, err = origWire.Compatible(&shrinkedWire) + require.NoError(t, err, i) + assert.EqualValues(t, CompatibilitySuperset, compat, i) + + // Also check that serialization works correctly. + + // Serialize + var buf bytes.Buffer + err = shrinkedWire.Serialize(&buf) + require.NoError(t, err) + + // Deserialize + var cpy WireSchema + err = cpy.Deserialize(&buf) + require.NoError(t, err) + + // Compare deserialized schema + require.EqualValues(t, shrinkedWire, cpy) + + orig = shrinked + } +} diff --git a/go/pkg/schema/testdata/oteltef.wire.json b/go/pkg/schema/testdata/oteltef.wire.json new file mode 100755 index 0000000..d9cd4ad --- /dev/null +++ b/go/pkg/schema/testdata/oteltef.wire.json @@ -0,0 +1,467 @@ +{ + "package": "oteltef", + "structs": { + "AnyValue": { + "name": "AnyValue", + "oneof": true, + "fields": [ + { + "primitive": 4, + "dict": "AnyValueString", + "name": "String" + }, + { + "primitive": 3, + "name": "Bool" + }, + { + "primitive": 0, + "name": "Int64" + }, + { + "primitive": 2, + "name": "Float64" + }, + { + "array": { + "struct": "AnyValue" + }, + "name": "Array", + "recursive": true + }, + { + "multimap": "KeyValueList", + "name": "KVList", + "recursive": true + }, + { + "primitive": 5, + "name": "Bytes" + } + ] + }, + "Envelope": { + "name": "Envelope", + "fields": [ + { + "multimap": "EnvelopeAttributes", + "name": "Attributes" + } + ] + }, + "Exemplar": { + "name": "Exemplar", + "fields": [ + { + "primitive": 1, + "name": "Timestamp" + }, + { + "struct": "ExemplarValue", + "name": "Value" + }, + { + "primitive": 5, + "dict": "Span", + "name": "SpanID" + }, + { + "primitive": 5, + "dict": "Trace", + "name": "TraceID" + }, + { + "multimap": "Attributes", + "name": "FilteredAttributes" + } + ] + }, + "ExemplarValue": { + "name": "ExemplarValue", + "oneof": true, + "fields": [ + { + "primitive": 0, + "name": "Int64" + }, + { + "primitive": 2, + "name": "Float64" + } + ] + }, + "HistogramValue": { + "name": "HistogramValue", + "fields": [ + { + "primitive": 0, + "name": "Count" + }, + { + "primitive": 2, + "name": "Sum", + "optional": true + }, + { + "primitive": 2, + "name": "Min", + "optional": true + }, + { + "primitive": 2, + "name": "Max", + "optional": true + }, + { + "array": { + "primitive": 0 + }, + "name": "BucketCounts" + } + ] + }, + "Metric": { + "name": "Metric", + "dict": "Metric", + "fields": [ + { + "primitive": 4, + "dict": "MetricName", + "name": "Name" + }, + { + "primitive": 4, + "dict": "MetricDescription", + "name": "Description" + }, + { + "primitive": 4, + "dict": "MetricUnit", + "name": "Unit" + }, + { + "primitive": 1, + "name": "Type" + }, + { + "multimap": "Attributes", + "name": "Metadata" + }, + { + "array": { + "primitive": 2 + }, + "name": "HistogramBounds" + }, + { + "primitive": 1, + "name": "AggregationTemporality" + }, + { + "primitive": 3, + "name": "Monotonic" + } + ] + }, + "Point": { + "name": "Point", + "fields": [ + { + "primitive": 1, + "name": "StartTimestamp" + }, + { + "primitive": 1, + "name": "Timestamp" + }, + { + "struct": "PointValue", + "name": "Value" + }, + { + "array": { + "struct": "Exemplar" + }, + "name": "Exemplars" + } + ] + }, + "PointValue": { + "name": "PointValue", + "oneof": true, + "fields": [ + { + "primitive": 0, + "name": "Int64" + }, + { + "primitive": 2, + "name": "Float64" + }, + { + "struct": "HistogramValue", + "name": "Histogram" + } + ] + }, + "Metrics": { + "name": "Metrics", + "root": true, + "fields": [ + { + "struct": "Envelope", + "name": "Envelope" + }, + { + "struct": "Metric", + "name": "Metric" + }, + { + "struct": "Resource", + "name": "Resource" + }, + { + "struct": "Scope", + "name": "Scope" + }, + { + "multimap": "Attributes", + "name": "Attributes" + }, + { + "struct": "Point", + "name": "Point" + } + ] + }, + "Resource": { + "name": "Resource", + "dict": "Resource", + "fields": [ + { + "primitive": 4, + "dict": "SchemaURL", + "name": "SchemaURL" + }, + { + "multimap": "Attributes", + "name": "Attributes" + } + ] + }, + "Scope": { + "name": "Scope", + "dict": "Scope", + "fields": [ + { + "primitive": 4, + "dict": "ScopeName", + "name": "Name" + }, + { + "primitive": 4, + "dict": "ScopeVersion", + "name": "Version" + }, + { + "primitive": 4, + "dict": "SchemaURL", + "name": "SchemaURL" + }, + { + "multimap": "Attributes", + "name": "Attributes" + } + ] + }, + + "Span": { + "name": "Span", + "fields": [ + { + "primitive": 5, + "name": "TraceID" + }, + { + "primitive": 5, + "name": "SpanID" + }, + { + "primitive": 4, + "name": "TraceState" + }, + { + "primitive": 5, + "name": "ParentSpanID" + }, + { + "primitive": 1, + "name": "Flags" + }, + { + "primitive": 4, + "dict": "SpanName", + "name": "Name" + }, + { + "primitive": 1, + "name": "Kind" + }, + { + "primitive": 1, + "name": "StartTimeUnixNano" + }, + { + "primitive": 1, + "name": "EndTimeUnixNano" + }, + { + "multimap": "Attributes", + "name": "Attributes" + }, + { + "array": { + "struct": "Event" + }, + "name": "Events" + }, + { + "array": { + "struct": "Link" + }, + "name": "Links" + }, + { + "struct": "SpanStatus", + "name": "Status" + } + ] + }, + + "Link": { + "name": "Link", + "fields": [ + { + "primitive": 5, + "name": "TraceID" + }, + { + "primitive": 5, + "name": "SpanID" + }, + { + "primitive": 4, + "name": "TraceState" + }, + { + "primitive": 1, + "name": "Flags" + }, + { + "multimap": "Attributes", + "name": "Attributes" + } + ] + }, + + "Event": { + "name": "Event", + "fields": [ + { + "primitive": 4, + "dict": "SpanEventName", + "name": "Name" + }, + { + "primitive": 1, + "name": "TimeUnixNano" + }, + { + "multimap": "Attributes", + "name": "Attributes" + } + ] + }, + + "SpanStatus": { + "name": "SpanStatus", + "fields": [ + { + "primitive": 4, + "name": "Message" + }, + { + "primitive": 1, + "name": "Code" + } + ] + }, + + "Spans": { + "name": "Spans", + "root": true, + "fields": [ + { + "struct": "Envelope", + "name": "Envelope" + }, + { + "struct": "Resource", + "name": "Resource" + }, + { + "struct": "Scope", + "name": "Scope" + }, + { + "struct": "Span", + "name": "Span" + } + ] + } + }, + "multimaps": { + "Attributes": { + "name": "Attributes", + "key": { + "type": { + "primitive": 4, + "dict": "AttributeKey" + } + }, + "value": { + "type": { + "struct": "AnyValue" + } + } + }, + "EnvelopeAttributes": { + "name": "EnvelopeAttributes", + "key": { + "type": { + "primitive": 4 + } + }, + "value": { + "type": { + "primitive": 5 + } + } + }, + "KeyValueList": { + "name": "KeyValueList", + "key": { + "type": { + "primitive": 4 + } + }, + "value": { + "type": { + "struct": "AnyValue" + }, + "recursive": true + } + } + }, + "main": "Metrics" +} \ No newline at end of file diff --git a/go/pkg/schema/wireschema.go b/go/pkg/schema/wireschema.go new file mode 100644 index 0000000..1543432 --- /dev/null +++ b/go/pkg/schema/wireschema.go @@ -0,0 +1,142 @@ +package schema + +import ( + "bytes" + "encoding/binary" + "errors" + "fmt" + "sort" + + "github.com/splunk/stef/go/pkg/internal" +) + +// WireSchema is caries only the parts of the schema which are necessary to be +// communicated between readers and writers which need to work with evolving +// versions of the same schema. +// +// WireSchema allows readers and writers to perform compatibility checks +// of their schema version with the schema version that a peer they communicate +// with uses. +// +// The only valid way to evolve a STEF schema is by adding new fields at the end +// of the existing structs. This means that in order to correctly read/write an +// evolved schema the only necessary information is the number of the fields in +// in each struct. This is precisely the information that is recorded in WireSchema. +// +// The full schema information can be recorded in a schema.Schema, however that +// full information is not necessary for wire compatibility checks. +type WireSchema struct { + // Number of fields in each struct (by struct name) + StructFieldCount map[string]uint +} + +func (w *WireSchema) FieldCount(structName string) (uint, bool) { + count, ok := w.StructFieldCount[structName] + return count, ok +} + +const ( + MaxStructOrMultimapCount = 1024 +) + +var ( + errStructCountLimit = errors.New("struct count limit exceeded") +) + +/* +Serialization format: + +WireSchema { + StructCount: U64 + *Struct: WireStruct +} +WireStruct { + StructName: String + FieldCount: U64 +} +String { + LengthInBytes: U64 + *Bytes: 8 +} +*/ + +// Serialize the schema to binary format. +func (w *WireSchema) Serialize(dst *bytes.Buffer) error { + if err := internal.WriteUvarint(uint64(len(w.StructFieldCount)), dst); err != nil { + return err + } + + // Sort for deterministic serialization. + var structs []string + for name := range w.StructFieldCount { + structs = append(structs, name) + } + sort.Strings(structs) + + for _, structName := range structs { + fieldCount := w.StructFieldCount[structName] + + if err := internal.WriteString(structName, dst); err != nil { + return err + } + if err := internal.WriteUvarint(uint64(fieldCount), dst); err != nil { + return err + } + } + return nil +} + +// Deserialize the schema from binary format. +func (w *WireSchema) Deserialize(src *bytes.Buffer) error { + count, err := binary.ReadUvarint(src) + if err != nil { + return err + } + + if count > MaxStructOrMultimapCount { + return errStructCountLimit + } + + w.StructFieldCount = make(map[string]uint, count) + for i := 0; i < int(count); i++ { + structName, err := internal.ReadString(src) + if err != nil { + return err + } + fieldCount, err := binary.ReadUvarint(src) + if err != nil { + return err + } + + w.StructFieldCount[structName] = uint(fieldCount) + } + return nil +} + +// Compatible checks backward compatibility of this schema with oldSchema. +// If the schemas are incompatible returns CompatibilityIncompatible and an error. +func (w *WireSchema) Compatible(oldSchema *WireSchema) (Compatibility, error) { + exactCompat := true + for structName, fieldCount := range oldSchema.StructFieldCount { + newCount, exists := w.StructFieldCount[structName] + if !exists { + return CompatibilityIncompatible, + fmt.Errorf("struct %s does not exist in new schema", structName) + } + if newCount < fieldCount { + return CompatibilityIncompatible, + fmt.Errorf( + "struct %s has fewer fields in new schema (%d vs %d)", structName, + newCount, fieldCount, + ) + } else if newCount > fieldCount { + exactCompat = false + } + } + + if exactCompat { + return CompatibilityExact, nil + } + + return CompatibilitySuperset, nil +} diff --git a/go/pkg/writeropts.go b/go/pkg/writeropts.go index 5c0a7cb..9237754 100644 --- a/go/pkg/writeropts.go +++ b/go/pkg/writeropts.go @@ -69,7 +69,7 @@ type WriterOptions struct { // The schema must be compatible with Writer's native schema otherwise // an error will be returned when attempting to create the Writer. // In nil the Writer will write in its native schema. - Schema *schema.Schema + Schema *schema.WireSchema // UserData is optional user-defined data that will be stored in the header. UserData map[string]string diff --git a/otelcol/cmd/stefmockserver/main.go b/otelcol/cmd/stefmockserver/main.go index 3b09b78..19c1385 100644 --- a/otelcol/cmd/stefmockserver/main.go +++ b/otelcol/cmd/stefmockserver/main.go @@ -86,7 +86,7 @@ func main() { settings := tefgrpc.ServerSettings{ Logger: nil, - ServerSchema: schema, + ServerSchema: &schema, MaxDictBytes: 0, OnStream: onStream, } diff --git a/otelcol/internal/stefexporter/exporter.go b/otelcol/internal/stefexporter/exporter.go index 96495c5..feefbda 100644 --- a/otelcol/internal/stefexporter/exporter.go +++ b/otelcol/internal/stefexporter/exporter.go @@ -211,7 +211,7 @@ func (s *stefExporter) startGrpcClient(compression pkg.Compression) error { settings := tef_grpc.ClientSettings{ Logger: &wrapLogger{s.logger}, GrpcClient: grpcClient, - ClientSchema: schema, + ClientSchema: &schema, Callbacks: tef_grpc.ClientCallbacks{ OnAck: s.onGrpcAck, }, diff --git a/stefgen/generator/writer.go b/stefgen/generator/writer.go index 44171d5..5546c9f 100644 --- a/stefgen/generator/writer.go +++ b/stefgen/generator/writer.go @@ -17,10 +17,11 @@ func (g *Generator) oWriters() error { } func (g *Generator) oWriter(str *genStructDef) error { - wireSchema, err := g.schema.PrunedForRoot(str.Name) + prunedSchema, err := g.schema.PrunedForRoot(str.Name) if err != nil { return err } + wireSchema := prunedSchema.ToWire() fileName := strings.ToLower(str.Name) + "writer" diff --git a/stefgen/templates/dicts.go.tmpl b/stefgen/templates/dicts.go.tmpl index b695711..deb0387 100644 --- a/stefgen/templates/dicts.go.tmpl +++ b/stefgen/templates/dicts.go.tmpl @@ -11,7 +11,7 @@ type WriterState struct { // OverrideSchema is set if encoding should perform a translation into the target // schema. The specified schema must be compatible with endoders' schema. - OverrideSchema *schema.Schema + OverrideSchema *schema.WireSchema // Dictionaries {{range .Dicts -}} @@ -44,7 +44,7 @@ func (d *WriterState) Reset() { type ReaderState struct { // OverrideSchema is set if decoding should perform a translation from specfied // schema. OverrideSchema must be compatible with decoders' schema. - OverrideSchema *schema.Schema + OverrideSchema *schema.WireSchema // Dictionaries {{range .Dicts -}} @@ -57,7 +57,7 @@ type ReaderState struct { {{end}} } -func (d* ReaderState) Init(overrideSchema *schema.Schema) { +func (d* ReaderState) Init(overrideSchema *schema.WireSchema) { d.OverrideSchema = overrideSchema {{range .Dicts -}} d.{{.DictName}}.Init() diff --git a/stefgen/templates/oneof.go.tmpl b/stefgen/templates/oneof.go.tmpl index 7597c7f..cbe95f9 100644 --- a/stefgen/templates/oneof.go.tmpl +++ b/stefgen/templates/oneof.go.tmpl @@ -253,13 +253,13 @@ func (e *{{ .StructName }}Encoder) Init(state* WriterState, columns *pkg.WriteCo {{end}} if state.OverrideSchema != nil { - overrideSchema, ok := state.OverrideSchema.Structs[{{printf "%q" .StructName}}] - if !ok || overrideSchema == nil { + fieldCount, ok := state.OverrideSchema.FieldCount({{printf "%q" .StructName}}) + if !ok { return fmt.Errorf("cannot find oneof in override schema: %s", {{printf "%q" .StructName}}) } // Number of fields in the target schema. - e.fieldCount = uint(len(overrideSchema.Fields)) + e.fieldCount = fieldCount } else { // Keep all fields when encoding. e.fieldCount = {{len .Fields}} @@ -358,13 +358,13 @@ func (d *{{ .StructName }}Decoder) Init(state* ReaderState, columns *pkg.ReadCol state.{{ .StructName }}Decoder = d if state.OverrideSchema != nil { - overrideSchema, ok := state.OverrideSchema.Structs[{{printf "%q" .StructName}}] - if !ok || overrideSchema == nil { + fieldCount, ok := state.OverrideSchema.FieldCount({{printf "%q" .StructName}}) + if !ok { return fmt.Errorf("cannot find oneof in override schema: %s", {{printf "%q" .StructName}}) } // Number of fields in the target schema. - d.fieldCount = uint(len(overrideSchema.Fields)) + d.fieldCount = fieldCount } else { // Keep all fields when encoding. d.fieldCount = {{len .Fields}} diff --git a/stefgen/templates/struct.go.tmpl b/stefgen/templates/struct.go.tmpl index b09865d..97e4e30 100644 --- a/stefgen/templates/struct.go.tmpl +++ b/stefgen/templates/struct.go.tmpl @@ -282,13 +282,13 @@ func (e *{{ .StructName }}Encoder) Init(state* WriterState, columns *pkg.WriteCo {{- end}} if state.OverrideSchema != nil { - overrideSchema, ok := state.OverrideSchema.Structs[{{printf "%q" .StructName}}] - if !ok || overrideSchema == nil { + fieldCount, ok := state.OverrideSchema.FieldCount({{printf "%q" .StructName}}) + if !ok { return fmt.Errorf("cannot find struct in override schema: %s", {{printf "%q" .StructName}}) } // Number of fields in the target schema. - e.fieldCount = uint(len(overrideSchema.Fields)) + e.fieldCount = fieldCount // Set that many 1 bits in the keepFieldMask. All fields with higher number // will be skipped when encoding. @@ -442,13 +442,13 @@ func (d *{{ .StructName }}Decoder) Init(state* ReaderState, columns *pkg.ReadCol state.{{ .StructName }}Decoder = d if state.OverrideSchema != nil { - overrideSchema, ok := state.OverrideSchema.Structs[{{printf "%q" .StructName}}] - if !ok || overrideSchema == nil { + fieldCount, ok := state.OverrideSchema.FieldCount({{printf "%q" .StructName}}) + if !ok { return fmt.Errorf("cannot find struct in override schema: %s", {{printf "%q" .StructName}}) } // Number of fields in the target schema. - d.fieldCount = uint(len(overrideSchema.Fields)) + d.fieldCount = fieldCount } else { // Keep all fields when encoding. d.fieldCount = {{len .Fields}} diff --git a/stefgen/templates/writer.go.tmpl b/stefgen/templates/writer.go.tmpl index 3e75dbe..52dcb12 100644 --- a/stefgen/templates/writer.go.tmpl +++ b/stefgen/templates/writer.go.tmpl @@ -207,10 +207,10 @@ func (w *{{.StructName}}Writer) Flush() error { const wireSchema{{.StructName}} = {{printf "%q" .Schema}} -func {{.StructName}}WireSchema() (*schema.Schema, error) { - var d schema.Schema - if err := d.Deserialize(bytes.NewBuffer([]byte(wireSchema{{.StructName}}))); err != nil { - return nil, err +func {{.StructName}}WireSchema() (schema.WireSchema, error) { + var w schema.WireSchema + if err := w.Deserialize(bytes.NewBuffer([]byte(wireSchema{{.StructName}}))); err != nil { + return w, err } - return &d,nil + return w,nil } \ No newline at end of file