Skip to content

Commit

Permalink
Introduce minimal WireSchema
Browse files Browse the repository at this point in the history
This decouples human-readable Schema that is in JSON
format from WireSchema that is serialized in binary
format for efficiency. WireSchema contains only the
information that is necessary for wire compatibility
checks.

Resolves #24
(Note that we still need struct names for checks).

Here are byte size comparisons between JSON and WIRE
for the Otel/TEF schema with "Metrics" root:

```
JSON:  2976, zstd:  760
WIRE:   115, zstd:  113
```
  • Loading branch information
tigrannajaryan committed Feb 4, 2025
1 parent 31ca1ff commit 0015987
Show file tree
Hide file tree
Showing 36 changed files with 1,343 additions and 609 deletions.
6 changes: 3 additions & 3 deletions go/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type Client struct {
grpcClient stef_proto.STEFDestinationClient
stream stef_proto.STEFDestination_StreamClient
callbacks ClientCallbacks
clientSchema *schema.Schema
clientSchema *schema.WireSchema
logger types.Logger

// Running state
Expand Down Expand Up @@ -88,7 +88,7 @@ type ClientSettings struct {
Logger types.Logger
// gRPC stream to send data over.
GrpcClient stef_proto.STEFDestinationClient
ClientSchema *schema.Schema
ClientSchema *schema.WireSchema
Callbacks ClientCallbacks
}

Expand Down Expand Up @@ -154,7 +154,7 @@ func (c *Client) Connect(ctx context.Context) (pkg.ChunkWriter, pkg.WriterOption
}

// Unmarshal server schema.
var serverSchema schema.Schema
var serverSchema schema.WireSchema
buf := bytes.NewBuffer(capabilities.Capabilities.Schema)
err = serverSchema.Deserialize(buf)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions go/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ type StreamServer struct {
stef_proto.UnimplementedSTEFDestinationServer

logger types.Logger
serverSchema *schema.Schema
serverSchema *schema.WireSchema
maxDictBytes uint64
onStream func(reader GrpcReader, ackFunc func(sequenceId uint64) error) error
}
Expand All @@ -133,7 +133,7 @@ var _ stef_proto.STEFDestinationServer = (*StreamServer)(nil)

type ServerSettings struct {
Logger types.Logger
ServerSchema *schema.Schema
ServerSchema *schema.WireSchema
MaxDictBytes uint64
OnStream func(reader GrpcReader, ackFunc func(sequenceId uint64) error) error
}
Expand Down
4 changes: 2 additions & 2 deletions go/otel/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestGrpcWriteRead(t *testing.T) {

recordsReceivedAndVerified := make(chan struct{})
serverSettings := stefgrpc.ServerSettings{
ServerSchema: schema,
ServerSchema: &schema,
OnStream: func(source stefgrpc.GrpcReader, ackFunc func(sequenceId uint64) error) error {
reader, err := oteltef.NewMetricsReader(source)
require.NoError(t, err)
Expand Down Expand Up @@ -95,7 +95,7 @@ func TestGrpcWriteRead(t *testing.T) {

clientSettings := stefgrpc.ClientSettings{
GrpcClient: grpcClient,
ClientSchema: schema,
ClientSchema: &schema,
Callbacks: callbacks,
}
client := stefgrpc.NewClient(clientSettings)
Expand Down
17 changes: 7 additions & 10 deletions go/otel/manual_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func TestAnyValue(t *testing.T) {
}
}

func writeReadRecord(t *testing.T, withSchema *schema.Schema) *oteltef.Metrics {
func writeReadRecord(t *testing.T, withSchema *schema.WireSchema) *oteltef.Metrics {
buf := &countingChunkWriter{}
writer, err := oteltef.NewMetricsWriter(buf, pkg.WriterOptions{Schema: withSchema})
require.NoError(t, err)
Expand Down Expand Up @@ -321,23 +321,21 @@ func TestWriteOverrideSchema(t *testing.T) {
assert.EqualValues(t, 4.5, readRecord.Point().Value().Float64())

// Write/read using full, unmodified schema
readRecord = writeReadRecord(t, schem)
readRecord = writeReadRecord(t, &schem)
assert.EqualValues(t, "abc", readRecord.Metric().Name())
assert.EqualValues(t, "scope", readRecord.Scope().Name())
assert.EqualValues(t, 123, readRecord.Point().Timestamp())
assert.EqualValues(t, oteltef.PointValueTypeFloat64, readRecord.Point().Value().Type())
assert.EqualValues(t, 4.5, readRecord.Point().Value().Float64())

// Remove "Monotonic" field (field #8) from "Metric" struct in the schema.
metric := schem.Structs["Metric"]
metric.Fields = metric.Fields[:7]
schem.StructFieldCount["Metric"] = 7

// Remove "Float64" field (field #2) from "PointValue" oneof struct in the schema.
pointValueOneOf := schem.Structs["PointValue"]
pointValueOneOf.Fields = pointValueOneOf.Fields[:1]
schem.StructFieldCount["PointValue"] = 1

// Write/read using reduced schema
readRecord = writeReadRecord(t, schem)
readRecord = writeReadRecord(t, &schem)
assert.EqualValues(t, "abc", readRecord.Metric().Name())
assert.EqualValues(t, "scope", readRecord.Scope().Name())
assert.EqualValues(t, 123, readRecord.Point().Timestamp())
Expand All @@ -350,11 +348,10 @@ func TestWriteOverrideSchema(t *testing.T) {
assert.EqualValues(t, 0.0, readRecord.Point().Value().Float64())

// Remove the entire "Point" field (field #6) from "Record" struct in the schema.
mainStruct := schem.Structs["Metrics"]
mainStruct.Fields = mainStruct.Fields[:5]
schem.StructFieldCount["Metrics"] = 5

// Write/read using reduced schema
readRecord = writeReadRecord(t, schem)
readRecord = writeReadRecord(t, &schem)
assert.EqualValues(t, "abc", readRecord.Metric().Name())
assert.EqualValues(t, "scope", readRecord.Scope().Name())
// All Point fields are default values because Point field was not encoded by Writer.
Expand Down
12 changes: 6 additions & 6 deletions go/otel/oteltef/anyvalue.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions go/otel/oteltef/dicts.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions go/otel/oteltef/envelope.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions go/otel/oteltef/event.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions go/otel/oteltef/exemplar.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions go/otel/oteltef/exemplarvalue.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions go/otel/oteltef/histogramvalue.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions go/otel/oteltef/link.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 0015987

Please sign in to comment.