diff --git a/.chloggen/otel-arrow-v024.yaml b/.chloggen/otel-arrow-v024.yaml new file mode 100644 index 000000000000..f873aa9691a2 --- /dev/null +++ b/.chloggen/otel-arrow-v024.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: OTel-Arrow + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Update to OTel-Arrow v0.24.0 + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [26491] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/otelarrowexporter/go.mod b/exporter/otelarrowexporter/go.mod index 529971751603..86fa96bdeb25 100644 --- a/exporter/otelarrowexporter/go.mod +++ b/exporter/otelarrowexporter/go.mod @@ -3,9 +3,9 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/exporter/otelar go 1.21.0 require ( - github.com/apache/arrow/go/v14 v14.0.2 + github.com/apache/arrow/go/v16 v16.1.0 github.com/open-telemetry/otel-arrow v0.24.0 - github.com/open-telemetry/otel-arrow/collector v0.23.0 + github.com/open-telemetry/otel-arrow/collector v0.24.0 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector v0.102.2-0.20240611143128-7dfb57b9ad1c go.opentelemetry.io/collector/component v0.102.2-0.20240611143128-7dfb57b9ad1c @@ -36,7 +36,6 @@ require ( require ( github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect - github.com/apache/arrow/go/v16 v16.1.0 // indirect github.com/axiomhq/hyperloglog v0.0.0-20230201085229-3ddf4bad03dc // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect diff --git a/exporter/otelarrowexporter/go.sum b/exporter/otelarrowexporter/go.sum index 576b63750d98..62bc951ca84f 100644 --- a/exporter/otelarrowexporter/go.sum +++ b/exporter/otelarrowexporter/go.sum @@ -3,8 +3,6 @@ github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym github.com/HdrHistogram/hdrhistogram-go v1.1.2 h1:5IcZpTvzydCQeHzK4Ef/D5rrSqwxob0t8PQPMybUNFM= github.com/HdrHistogram/hdrhistogram-go v1.1.2/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo= github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw= -github.com/apache/arrow/go/v14 v14.0.2 h1:N8OkaJEOfI3mEZt07BIkvo4sC6XDbL+48MBPWO5IONw= -github.com/apache/arrow/go/v14 v14.0.2/go.mod h1:u3fgh3EdgN/YQ8cVQRguVW3R+seMybFg8QBQ5LU+eBY= github.com/apache/arrow/go/v16 v16.1.0 h1:dwgfOya6s03CzH9JrjCBx6bkVb4yPD4ma3haj9p7FXI= github.com/apache/arrow/go/v16 v16.1.0/go.mod h1:9wnc9mn6vEDTRIm4+27pEjQpRKuTvBaessPoEXQzxWA= github.com/axiomhq/hyperloglog v0.0.0-20230201085229-3ddf4bad03dc h1:Keo7wQ7UODUaHcEi7ltENhbAK2VgZjfat6mLy03tQzo= @@ -88,8 +86,8 @@ github.com/mostynb/go-grpc-compression v1.2.3/go.mod h1:AghIxF3P57umzqM9yz795+y1 github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/open-telemetry/otel-arrow v0.24.0 h1:hNUEbwHW/1gEOUiN+HoI+ITiXe2vSBaPWlE9FRwJwDE= github.com/open-telemetry/otel-arrow v0.24.0/go.mod h1:uzoHixEh6CUBZkP+vkRvyiHYUnYsAOUwCcfByQkSMM0= -github.com/open-telemetry/otel-arrow/collector v0.23.0 h1:ztmq1ipJBhm4xWjHDbmKOtgP3Nl/ZDoLX+3ThhzFs6k= -github.com/open-telemetry/otel-arrow/collector v0.23.0/go.mod h1:SLgLEhhcfR9MjG1taK8RPuwiuIoAPW7IpCjFBobwIUM= +github.com/open-telemetry/otel-arrow/collector v0.24.0 h1:NYTcgtwG0lQnoGcEomTTtueZxzk03xt+XEXN4L5kqHA= +github.com/open-telemetry/otel-arrow/collector v0.24.0/go.mod h1:+jJ3Vfhh685hXSw2Z1P1wl/rTqEKlSaJ4FocZI+xs+0= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= diff --git a/exporter/otelarrowexporter/internal/arrow/bestofn.go b/exporter/otelarrowexporter/internal/arrow/bestofn.go index ae4bce633643..443713cda815 100644 --- a/exporter/otelarrowexporter/internal/arrow/bestofn.go +++ b/exporter/otelarrowexporter/internal/arrow/bestofn.go @@ -8,6 +8,10 @@ import ( "math/rand" "runtime" "sort" + "time" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) // bestOfNPrioritizer is a prioritizer that selects a less-loaded stream to write. @@ -42,7 +46,7 @@ type streamSorter struct { var _ streamPrioritizer = &bestOfNPrioritizer{} -func newBestOfNPrioritizer(dc doneCancel, numChoices, numStreams int, lf loadFunc) (*bestOfNPrioritizer, []*streamWorkState) { +func newBestOfNPrioritizer(dc doneCancel, numChoices, numStreams int, lf loadFunc, maxLifetime time.Duration) (*bestOfNPrioritizer, []*streamWorkState) { var state []*streamWorkState // Limit numChoices to the number of streams. @@ -50,8 +54,9 @@ func newBestOfNPrioritizer(dc doneCancel, numChoices, numStreams int, lf loadFun for i := 0; i < numStreams; i++ { ws := &streamWorkState{ - waiters: map[int64]chan<- error{}, - toWrite: make(chan writeItem, 1), + maxStreamLifetime: addJitter(maxLifetime), + waiters: map[int64]chan<- error{}, + toWrite: make(chan writeItem, 1), } state = append(state, ws) @@ -112,7 +117,7 @@ func (lp *bestOfNPrioritizer) sendAndWait(ctx context.Context, errCh <-chan erro case <-lp.done: return ErrStreamRestarting case <-ctx.Done(): - return context.Canceled + return status.Errorf(codes.Canceled, "stream wait: %v", ctx.Err()) case lp.input <- wri: return waitForWrite(ctx, errCh, lp.done) } diff --git a/exporter/otelarrowexporter/internal/arrow/exporter.go b/exporter/otelarrowexporter/internal/arrow/exporter.go index 18b3259d3b4a..8c08bf588a51 100644 --- a/exporter/otelarrowexporter/internal/arrow/exporter.go +++ b/exporter/otelarrowexporter/internal/arrow/exporter.go @@ -20,7 +20,9 @@ import ( "go.opentelemetry.io/collector/pdata/ptrace" "go.uber.org/zap" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/status" ) // Exporter is 1:1 with exporter, isolates arrow-specific @@ -32,9 +34,7 @@ type Exporter struct { // prioritizerName the name of a balancer policy. prioritizerName PrioritizerName - // maxStreamLifetime is a limit on duration for streams. A - // slight "jitter" is applied relative to this value on a - // per-stream basis. + // maxStreamLifetime is a limit on duration for streams. maxStreamLifetime time.Duration // disableDowngrade prevents downgrade from occurring, supports @@ -156,7 +156,7 @@ func (e *Exporter) Start(ctx context.Context) error { downCtx, downDc := newDoneCancel(ctx) var sws []*streamWorkState - e.ready, sws = newStreamPrioritizer(downDc, e.prioritizerName, e.numStreams) + e.ready, sws = newStreamPrioritizer(downDc, e.prioritizerName, e.numStreams, e.maxStreamLifetime) for _, ws := range sws { e.startArrowStream(downCtx, ws) @@ -236,7 +236,6 @@ func (e *Exporter) runArrowStream(ctx context.Context, dc doneCancel, state *str producer := e.newProducer() stream := newStream(producer, e.ready, e.telemetry, e.netReporter, state) - stream.maxStreamLifetime = addJitter(e.maxStreamLifetime) defer func() { if err := producer.Close(); err != nil { @@ -258,6 +257,14 @@ func (e *Exporter) runArrowStream(ctx context.Context, dc doneCancel, state *str // // consumer should fall back to standard OTLP, (true, nil) func (e *Exporter) SendAndWait(ctx context.Context, data any) (bool, error) { + // If the incoming context is already canceled, return the + // same error condition a unary gRPC or HTTP exporter would do. + select { + case <-ctx.Done(): + return false, status.Errorf(codes.Canceled, "context done before send: %v", ctx.Err()) + default: + } + errCh := make(chan error, 1) // Note that if the OTLP exporter's gRPC Headers field was @@ -343,7 +350,7 @@ func waitForWrite(ctx context.Context, errCh <-chan error, down <-chan struct{}) select { case <-ctx.Done(): // This caller's context timed out. - return ctx.Err() + return status.Errorf(codes.Canceled, "send wait: %v", ctx.Err()) case <-down: return ErrStreamRestarting case err := <-errCh: diff --git a/exporter/otelarrowexporter/internal/arrow/exporter_test.go b/exporter/otelarrowexporter/internal/arrow/exporter_test.go index 276e5f3fa437..40de3857c5af 100644 --- a/exporter/otelarrowexporter/internal/arrow/exporter_test.go +++ b/exporter/otelarrowexporter/internal/arrow/exporter_test.go @@ -6,7 +6,6 @@ package arrow import ( "context" "encoding/json" - "errors" "fmt" "sync" "sync/atomic" @@ -31,7 +30,9 @@ import ( "go.uber.org/zap/zaptest" "golang.org/x/net/http2/hpack" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" ) var AllPrioritizers = []PrioritizerName{LeastLoadedPrioritizer, LeastLoadedTwoPrioritizer} @@ -278,7 +279,18 @@ func TestArrowExporterTimeout(t *testing.T) { sent, err := tc.exporter.SendAndWait(ctx, twoTraces) require.True(t, sent) require.Error(t, err) - require.True(t, errors.Is(err, context.Canceled)) + + stat, is := status.FromError(err) + require.True(t, is, "is a gRPC status") + require.Equal(t, codes.Canceled, stat.Code()) + + // Repeat the request, will get immediate timeout. + sent, err = tc.exporter.SendAndWait(ctx, twoTraces) + require.False(t, sent) + stat, is = status.FromError(err) + require.True(t, is, "is a gRPC status error: %v", err) + require.Equal(t, "context done before send: context canceled", stat.Message()) + require.Equal(t, codes.Canceled, stat.Code()) require.NoError(t, tc.exporter.Shutdown(ctx)) }) @@ -406,7 +418,10 @@ func TestArrowExporterConnectTimeout(t *testing.T) { }() _, err := tc.exporter.SendAndWait(ctx, twoTraces) require.Error(t, err) - require.True(t, errors.Is(err, context.Canceled)) + + stat, is := status.FromError(err) + require.True(t, is, "is a gRPC status error: %v", err) + require.Equal(t, codes.Canceled, stat.Code()) require.NoError(t, tc.exporter.Shutdown(bg)) }) @@ -489,7 +504,10 @@ func TestArrowExporterStreamRace(t *testing.T) { // This blocks until the cancelation. _, err := tc.exporter.SendAndWait(callctx, twoTraces) require.Error(t, err) - require.True(t, errors.Is(err, context.Canceled)) + + stat, is := status.FromError(err) + require.True(t, is, "is a gRPC status error: %v", err) + require.Equal(t, codes.Canceled, stat.Code()) }() } diff --git a/exporter/otelarrowexporter/internal/arrow/prioritizer.go b/exporter/otelarrowexporter/internal/arrow/prioritizer.go index 84220338348f..551b9f781fc3 100644 --- a/exporter/otelarrowexporter/internal/arrow/prioritizer.go +++ b/exporter/otelarrowexporter/internal/arrow/prioritizer.go @@ -8,6 +8,7 @@ import ( "fmt" "strconv" "strings" + "time" "go.opentelemetry.io/collector/component" "google.golang.org/grpc/codes" @@ -50,7 +51,7 @@ type streamWriter interface { sendAndWait(context.Context, <-chan error, writeItem) error } -func newStreamPrioritizer(dc doneCancel, name PrioritizerName, numStreams int) (streamPrioritizer, []*streamWorkState) { +func newStreamPrioritizer(dc doneCancel, name PrioritizerName, numStreams int, maxLifetime time.Duration) (streamPrioritizer, []*streamWorkState) { if name == unsetPrioritizer { name = DefaultPrioritizer } @@ -58,10 +59,10 @@ func newStreamPrioritizer(dc doneCancel, name PrioritizerName, numStreams int) ( // error was checked and reported in Validate n, err := strconv.Atoi(string(name[len(llPrefix):])) if err == nil { - return newBestOfNPrioritizer(dc, n, numStreams, pendingRequests) + return newBestOfNPrioritizer(dc, n, numStreams, pendingRequests, maxLifetime) } } - return newBestOfNPrioritizer(dc, numStreams, numStreams, pendingRequests) + return newBestOfNPrioritizer(dc, numStreams, numStreams, pendingRequests, maxLifetime) } // pendingRequests is the load function used by leastloadedN. diff --git a/exporter/otelarrowexporter/internal/arrow/stream.go b/exporter/otelarrowexporter/internal/arrow/stream.go index 7070d8c6ea42..ecedd88ae105 100644 --- a/exporter/otelarrowexporter/internal/arrow/stream.go +++ b/exporter/otelarrowexporter/internal/arrow/stream.go @@ -7,7 +7,6 @@ import ( "bytes" "context" "errors" - "fmt" "io" "sync" "time" @@ -16,7 +15,6 @@ import ( "github.com/open-telemetry/otel-arrow/collector/netstats" arrowRecord "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" @@ -34,12 +32,6 @@ import ( // Stream is 1:1 with gRPC stream. type Stream struct { - // maxStreamLifetime is the max timeout before stream - // should be closed on the client side. This ensures a - // graceful shutdown before max_connection_age is reached - // on the server side. - maxStreamLifetime time.Duration - // producer is exclusive to the holder of the stream. producer arrowRecord.ProducerAPI @@ -77,6 +69,11 @@ type streamWorkState struct { // prioritizer and a stream. toWrite chan writeItem + // maxStreamLifetime is a limit on duration for streams. A + // slight "jitter" is applied relative to this value on a + // per-stream basis. + maxStreamLifetime time.Duration + // lock protects waiters lock sync.Mutex @@ -130,9 +127,9 @@ func (s *Stream) setBatchChannel(batchID int64, errCh chan<- error) { s.workState.waiters[batchID] = errCh } -// logStreamError decides how to log an error. `which` indicates the -// stream direction, will be "reader" or "writer". -func (s *Stream) logStreamError(which string, err error) { +// logStreamError decides how to log an error. `where` indicates the +// error location, will be "reader" or "writer". +func (s *Stream) logStreamError(where string, err error) { var code codes.Code var msg string // gRPC tends to supply status-wrapped errors, so we always @@ -151,9 +148,9 @@ func (s *Stream) logStreamError(which string, err error) { msg = err.Error() } if code == codes.Canceled { - s.telemetry.Logger.Debug("arrow stream shutdown", zap.String("which", which), zap.String("message", msg)) + s.telemetry.Logger.Debug("arrow stream shutdown", zap.String("message", msg), zap.String("where", where)) } else { - s.telemetry.Logger.Error("arrow stream error", zap.String("which", which), zap.String("message", msg), zap.Int("code", int(code))) + s.telemetry.Logger.Error("arrow stream error", zap.Int("code", int(code)), zap.String("message", msg), zap.String("where", where)) } } @@ -254,8 +251,8 @@ func (s *Stream) write(ctx context.Context) (retErr error) { hdrsEnc := hpack.NewEncoder(&hdrsBuf) var timerCh <-chan time.Time - if s.maxStreamLifetime != 0 { - timer := time.NewTimer(s.maxStreamLifetime) + if s.workState.maxStreamLifetime != 0 { + timer := time.NewTimer(s.workState.maxStreamLifetime) timerCh = timer.C defer timer.Stop() } @@ -269,7 +266,7 @@ func (s *Stream) write(ctx context.Context) (retErr error) { return nil case wri = <-s.workState.toWrite: case <-ctx.Done(): - return ctx.Err() + return status.Errorf(codes.Canceled, "stream input: %v", ctx.Err()) } err := s.encodeAndSend(wri, &hdrsBuf, hdrsEnc) @@ -314,8 +311,8 @@ func (s *Stream) encodeAndSend(wri writeItem, hdrsBuf *bytes.Buffer, hdrsEnc *hp if err != nil { // This is some kind of internal error. We will restart the // stream and mark this record as a permanent one. - err = fmt.Errorf("encode: %w", err) - wri.errCh <- consumererror.NewPermanent(err) + err = status.Errorf(codes.Internal, "encode: %v", err) + wri.errCh <- err return err } @@ -331,8 +328,8 @@ func (s *Stream) encodeAndSend(wri writeItem, hdrsBuf *bytes.Buffer, hdrsEnc *hp // This case is like the encode-failure case // above, we will restart the stream but consider // this a permenent error. - err = fmt.Errorf("hpack: %w", err) - wri.errCh <- consumererror.NewPermanent(err) + err = status.Errorf(codes.Internal, "hpack: %v", err) + wri.errCh <- err return err } } @@ -346,13 +343,10 @@ func (s *Stream) encodeAndSend(wri writeItem, hdrsBuf *bytes.Buffer, hdrsEnc *hp // unreliable for arrow transport, so we instrument it // directly here. Only the primary direction of transport // is instrumented this way. - if wri.uncompSize != 0 { - var sized netstats.SizesStruct - sized.Method = s.method - sized.Length = int64(wri.uncompSize) - s.netReporter.CountSend(ctx, sized) - s.netReporter.SetSpanSizeAttributes(ctx, sized) - } + var sized netstats.SizesStruct + sized.Method = s.method + sized.Length = int64(wri.uncompSize) + s.netReporter.CountSend(ctx, sized) if err := s.client.Send(batch); err != nil { // The error will be sent to errCh during cleanup for this stream. @@ -380,24 +374,24 @@ func (s *Stream) read(_ context.Context) error { } if err = s.processBatchStatus(resp); err != nil { - return fmt.Errorf("process: %w", err) + return err } } } // getSenderChannel takes the stream lock and removes the corresonding // sender channel. -func (sws *streamWorkState) getSenderChannel(status *arrowpb.BatchStatus) (chan<- error, error) { +func (sws *streamWorkState) getSenderChannel(bstat *arrowpb.BatchStatus) (chan<- error, error) { sws.lock.Lock() defer sws.lock.Unlock() - ch, ok := sws.waiters[status.BatchId] + ch, ok := sws.waiters[bstat.BatchId] if !ok { // Will break the stream. - return nil, fmt.Errorf("unrecognized batch ID: %d", status.BatchId) + return nil, status.Errorf(codes.Internal, "unrecognized batch ID: %d", bstat.BatchId) } - delete(sws.waiters, status.BatchId) + delete(sws.waiters, bstat.BatchId) return ch, nil } @@ -458,7 +452,7 @@ func (s *Stream) encode(records any) (_ *arrowpb.BatchArrowRecords, retErr error zap.Reflect("recovered", err), zap.Stack("stacktrace"), ) - retErr = fmt.Errorf("panic in otel-arrow-adapter: %v", err) + retErr = status.Errorf(codes.Internal, "panic in otel-arrow-adapter: %v", err) } }() var batch *arrowpb.BatchArrowRecords @@ -471,7 +465,7 @@ func (s *Stream) encode(records any) (_ *arrowpb.BatchArrowRecords, retErr error case pmetric.Metrics: batch, err = s.producer.BatchArrowRecordsFromMetrics(data) default: - return nil, fmt.Errorf("unsupported OTLP type: %T", records) + return nil, status.Errorf(codes.Unimplemented, "unsupported OTel-Arrow signal type: %T", records) } return batch, err } diff --git a/exporter/otelarrowexporter/internal/arrow/stream_test.go b/exporter/otelarrowexporter/internal/arrow/stream_test.go index e916667c455c..416f80864b17 100644 --- a/exporter/otelarrowexporter/internal/arrow/stream_test.go +++ b/exporter/otelarrowexporter/internal/arrow/stream_test.go @@ -15,9 +15,10 @@ import ( "github.com/open-telemetry/otel-arrow/collector/netstats" arrowRecordMock "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record/mock" "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/consumer/consumererror" "go.uber.org/mock/gomock" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) var oneBatch = &arrowpb.BatchArrowRecords{ @@ -44,7 +45,7 @@ func newStreamTestCase(t *testing.T, pname PrioritizerName) *streamTestCase { producer := arrowRecordMock.NewMockProducerAPI(ctrl) bg, dc := newDoneCancel(context.Background()) - prio, state := newStreamPrioritizer(dc, pname, 1) + prio, state := newStreamPrioritizer(dc, pname, 1, 10*time.Second) ctc := newCommonTestCase(t, NotNoisy) cts := ctc.newMockStream(bg) @@ -53,7 +54,6 @@ func newStreamTestCase(t *testing.T, pname PrioritizerName) *streamTestCase { ctc.requestMetadataCall.AnyTimes().Return(nil, nil) stream := newStream(producer, prio, ctc.telset, netstats.Noop{}, state[0]) - stream.maxStreamLifetime = 10 * time.Second fromTracesCall := producer.EXPECT().BatchArrowRecordsFromTraces(gomock.Any()).Times(0) fromMetricsCall := producer.EXPECT().BatchArrowRecordsFromMetrics(gomock.Any()).Times(0) @@ -143,7 +143,6 @@ func TestStreamNoMaxLifetime(t *testing.T) { t.Run(string(pname), func(t *testing.T) { tc := newStreamTestCase(t, pname) - tc.stream.maxStreamLifetime = 0 tc.fromTracesCall.Times(1).Return(oneBatch, nil) tc.closeSendCall.Times(0) @@ -182,8 +181,12 @@ func TestStreamEncodeError(t *testing.T) { // sender should get a permanent testErr err := tc.mustSendAndWait() require.Error(t, err) - require.True(t, errors.Is(err, testErr)) - require.True(t, consumererror.IsPermanent(err)) + + stat, is := status.FromError(err) + require.True(t, is, "is a gRPC status error: %v", err) + require.Equal(t, codes.Internal, stat.Code()) + + require.Contains(t, stat.Message(), testErr.Error()) }) } } diff --git a/exporter/otelarrowexporter/otelarrow.go b/exporter/otelarrowexporter/otelarrow.go index e3f0a76fe6b3..67beb5dee75e 100644 --- a/exporter/otelarrowexporter/otelarrow.go +++ b/exporter/otelarrowexporter/otelarrow.go @@ -10,7 +10,7 @@ import ( "runtime" "time" - arrowPkg "github.com/apache/arrow/go/v14/arrow" + arrowPkg "github.com/apache/arrow/go/v16/arrow" "github.com/open-telemetry/otel-arrow/collector/compression/zstd" "github.com/open-telemetry/otel-arrow/collector/netstats" arrowRecord "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record" diff --git a/exporter/otelarrowexporter/otelarrow_test.go b/exporter/otelarrowexporter/otelarrow_test.go index 558f45b43b8b..15b813550b82 100644 --- a/exporter/otelarrowexporter/otelarrow_test.go +++ b/exporter/otelarrowexporter/otelarrow_test.go @@ -935,10 +935,6 @@ func testSendArrowTraces(t *testing.T, clientWaitForReady, streamServiceAvailabl require.NoError(t, err) require.NotNil(t, exp) - defer func() { - assert.NoError(t, exp.Shutdown(context.Background())) - }() - type isUserCall struct{} host := newHostWithExtensions( @@ -956,6 +952,15 @@ func testSendArrowTraces(t *testing.T, clientWaitForReady, streamServiceAvailabl assert.NoError(t, exp.Start(context.Background(), host)) rcv, _ := otelArrowTracesReceiverOnGRPCServer(ln, false) + + defer func() { + // Shutdown before GracefulStop, because otherwise we + // wait for a full stream lifetime instead of closing + // after requests are served. + assert.NoError(t, exp.Shutdown(context.Background())) + rcv.srv.GracefulStop() + }() + if streamServiceAvailable { rcv.startStreamMockArrowTraces(t, okStatusFor) } @@ -988,8 +993,6 @@ func testSendArrowTraces(t *testing.T, clientWaitForReady, streamServiceAvailabl md := rcv.getMetadata() require.EqualValues(t, []string{"arrow"}, md.Get("callerid")) require.EqualValues(t, expectedHeader, md.Get("header")) - - rcv.srv.GracefulStop() } func okStatusFor(id int64) *arrowpb.BatchStatus { @@ -1102,16 +1105,17 @@ func TestSendArrowFailedTraces(t *testing.T) { require.NoError(t, err) require.NotNil(t, exp) - defer func() { - assert.NoError(t, exp.Shutdown(context.Background())) - }() - host := componenttest.NewNopHost() assert.NoError(t, exp.Start(context.Background(), host)) rcv, _ := otelArrowTracesReceiverOnGRPCServer(ln, false) rcv.startStreamMockArrowTraces(t, failedStatusFor) + defer func() { + assert.NoError(t, exp.Shutdown(context.Background())) + rcv.srv.GracefulStop() + }() + // Delay the server start, slightly. go func() { time.Sleep(100 * time.Millisecond) @@ -1133,8 +1137,6 @@ func TestSendArrowFailedTraces(t *testing.T) { assert.EqualValues(t, int32(2), rcv.totalItems.Load()) assert.EqualValues(t, int32(1), rcv.requestCount.Load()) assert.EqualValues(t, td, rcv.getLastRequest()) - - rcv.srv.GracefulStop() } func TestUserDialOptions(t *testing.T) { diff --git a/receiver/otelarrowreceiver/README.md b/receiver/otelarrowreceiver/README.md index f0fee43036c4..043a2a8100a5 100644 --- a/receiver/otelarrowreceiver/README.md +++ b/receiver/otelarrowreceiver/README.md @@ -87,6 +87,13 @@ When the limit is reached, the receiver will return RESOURCE_EXHAUSTED error codes to the receiver, which are [conditionally retryable, see exporter retry configuration](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md). +- `admission_limit_mib` (default: 64): limits the number of requests that are received by the stream based on request size information available. This should not be confused with `memory_limit_mib` which limits allocations made by the consumer when translating arrow records into pdata objects. i.e. request size is used to control how much traffic we admit, but does not control how much memory is used during request processing. + +- `waiter_limit` (default: 1000): limits the number of requests waiting on admission once `admission_limit_mib` is reached. This is another dimension of memory limiting that ensures waiters are not holding onto a significant amount of memory while waiting to be processed. + +`admission_limit_mib` and `waiter_limit` are arguments supplied to [admission.BoundedQueue](https://github.com/open-telemetry/otel-arrow/tree/main/collector/admission). This custom semaphore is meant to be used within receivers to help limit memory within the collector pipeline. + + ### Compression Configuration In the `arrow` configuration block, `zstd` sub-section applies to all diff --git a/receiver/otelarrowreceiver/config.go b/receiver/otelarrowreceiver/config.go index 38b8e2a1aa2c..8361c0ba7bc8 100644 --- a/receiver/otelarrowreceiver/config.go +++ b/receiver/otelarrowreceiver/config.go @@ -24,6 +24,16 @@ type ArrowConfig struct { // passing through, they will see ResourceExhausted errors. MemoryLimitMiB uint64 `mapstructure:"memory_limit_mib"` + // AdmissionLimitMiB limits the number of requests that are received by the stream based on + // request size information available. Request size is used to control how much traffic we admit + // for processing, but does not control how much memory is used during request processing. + AdmissionLimitMiB uint64 `mapstructure:"admission_limit_mib"` + + // WaiterLimit is the limit on the number of waiters waiting to be processed and consumed. + // This is a dimension of memory limiting to ensure waiters are not consuming an + // unexpectedly large amount of memory in the arrow receiver. + WaiterLimit int64 `mapstructure:"waiter_limit"` + // Zstd settings apply to OTel-Arrow use of gRPC specifically. Zstd zstd.DecoderConfig `mapstructure:"zstd"` } diff --git a/receiver/otelarrowreceiver/config_test.go b/receiver/otelarrowreceiver/config_test.go index f756e1b6b1a5..fc25f24abd89 100644 --- a/receiver/otelarrowreceiver/config_test.go +++ b/receiver/otelarrowreceiver/config_test.go @@ -77,7 +77,9 @@ func TestUnmarshalConfig(t *testing.T) { }, }, Arrow: ArrowConfig{ - MemoryLimitMiB: 123, + MemoryLimitMiB: 123, + AdmissionLimitMiB: 80, + WaiterLimit: 100, }, }, }, cfg) @@ -101,7 +103,9 @@ func TestUnmarshalConfigUnix(t *testing.T) { ReadBufferSize: 512 * 1024, }, Arrow: ArrowConfig{ - MemoryLimitMiB: defaultMemoryLimitMiB, + MemoryLimitMiB: defaultMemoryLimitMiB, + AdmissionLimitMiB: defaultAdmissionLimitMiB, + WaiterLimit: defaultWaiterLimit, }, }, }, cfg) diff --git a/receiver/otelarrowreceiver/factory.go b/receiver/otelarrowreceiver/factory.go index e59c8fdc5d77..48279d76cc3a 100644 --- a/receiver/otelarrowreceiver/factory.go +++ b/receiver/otelarrowreceiver/factory.go @@ -19,7 +19,9 @@ import ( const ( defaultGRPCEndpoint = "0.0.0.0:4317" - defaultMemoryLimitMiB = 128 + defaultMemoryLimitMiB = 128 + defaultAdmissionLimitMiB = defaultMemoryLimitMiB / 2 + defaultWaiterLimit = 1000 ) // NewFactory creates a new OTel-Arrow receiver factory. @@ -45,7 +47,9 @@ func createDefaultConfig() component.Config { ReadBufferSize: 512 * 1024, }, Arrow: ArrowConfig{ - MemoryLimitMiB: defaultMemoryLimitMiB, + MemoryLimitMiB: defaultMemoryLimitMiB, + AdmissionLimitMiB: defaultAdmissionLimitMiB, + WaiterLimit: defaultWaiterLimit, }, }, } @@ -108,7 +112,7 @@ func createLog( return r, nil } -// This is the map of already created OTLP receivers for particular configurations. +// This is the map of already created OTel-Arrow receivers for particular configurations. // We maintain this map because the Factory is asked trace and metric receivers separately // when it gets CreateTracesReceiver() and CreateMetricsReceiver() but they must not // create separate objects, they must use one otelArrowReceiver object per configuration. diff --git a/receiver/otelarrowreceiver/go.mod b/receiver/otelarrowreceiver/go.mod index a3023565ad41..fc0d932e293b 100644 --- a/receiver/otelarrowreceiver/go.mod +++ b/receiver/otelarrowreceiver/go.mod @@ -4,7 +4,7 @@ go 1.21.0 require ( github.com/open-telemetry/otel-arrow v0.24.0 - github.com/open-telemetry/otel-arrow/collector v0.23.0 + github.com/open-telemetry/otel-arrow/collector v0.24.0 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector v0.102.2-0.20240611143128-7dfb57b9ad1c go.opentelemetry.io/collector/component v0.102.2-0.20240611143128-7dfb57b9ad1c @@ -28,13 +28,16 @@ require ( go.uber.org/zap v1.27.0 golang.org/x/net v0.26.0 google.golang.org/grpc v1.64.0 + google.golang.org/protobuf v1.34.2 ) require ( github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect github.com/apache/arrow/go/v16 v16.1.0 // indirect github.com/axiomhq/hyperloglog v0.0.0-20230201085229-3ddf4bad03dc // indirect + github.com/bahlo/generic-list-go v0.2.0 // indirect github.com/beorn7/perks v1.0.1 // indirect + github.com/buger/jsonparser v1.1.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc // indirect @@ -55,6 +58,7 @@ require ( github.com/knadh/koanf/maps v0.1.1 // indirect github.com/knadh/koanf/providers/confmap v0.1.0 // indirect github.com/knadh/koanf/v2 v2.1.1 // indirect + github.com/mailru/easyjson v0.7.7 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect @@ -66,6 +70,7 @@ require ( github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.54.0 // indirect github.com/prometheus/procfs v0.15.0 // indirect + github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect github.com/x448/float16 v0.8.4 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect go.opentelemetry.io/collector/config/configcompression v1.9.1-0.20240611143128-7dfb57b9ad1c // indirect @@ -85,6 +90,5 @@ require ( golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240520151616-dc85e6b867a5 // indirect - google.golang.org/protobuf v1.34.2 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/receiver/otelarrowreceiver/go.sum b/receiver/otelarrowreceiver/go.sum index db2d6d9f1446..1e920d341328 100644 --- a/receiver/otelarrowreceiver/go.sum +++ b/receiver/otelarrowreceiver/go.sum @@ -7,10 +7,14 @@ github.com/apache/arrow/go/v16 v16.1.0 h1:dwgfOya6s03CzH9JrjCBx6bkVb4yPD4ma3haj9 github.com/apache/arrow/go/v16 v16.1.0/go.mod h1:9wnc9mn6vEDTRIm4+27pEjQpRKuTvBaessPoEXQzxWA= github.com/axiomhq/hyperloglog v0.0.0-20230201085229-3ddf4bad03dc h1:Keo7wQ7UODUaHcEi7ltENhbAK2VgZjfat6mLy03tQzo= github.com/axiomhq/hyperloglog v0.0.0-20230201085229-3ddf4bad03dc/go.mod h1:k08r+Yj1PRAmuayFiRK6MYuR5Ve4IuZtTfxErMIh0+c= +github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk= +github.com/bahlo/generic-list-go v0.2.0/go.mod h1:2KvAjgMlE5NNynlg/5iLrrCCZ2+5xWbdbCW3pNTGyYg= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/brianvoe/gofakeit/v6 v6.17.0 h1:obbQTJeHfktJtiZzq0Q1bEpsNUs+yHrYlPVWt7BtmJ4= github.com/brianvoe/gofakeit/v6 v6.17.0/go.mod h1:Ow6qC71xtwm79anlwKRlWZW6zVq9D2XHE4QSSMP/rU8= +github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs= +github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= @@ -49,6 +53,7 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY= github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= +github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes= @@ -70,6 +75,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= +github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= @@ -84,8 +91,8 @@ github.com/mostynb/go-grpc-compression v1.2.3/go.mod h1:AghIxF3P57umzqM9yz795+y1 github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/open-telemetry/otel-arrow v0.24.0 h1:hNUEbwHW/1gEOUiN+HoI+ITiXe2vSBaPWlE9FRwJwDE= github.com/open-telemetry/otel-arrow v0.24.0/go.mod h1:uzoHixEh6CUBZkP+vkRvyiHYUnYsAOUwCcfByQkSMM0= -github.com/open-telemetry/otel-arrow/collector v0.23.0 h1:ztmq1ipJBhm4xWjHDbmKOtgP3Nl/ZDoLX+3ThhzFs6k= -github.com/open-telemetry/otel-arrow/collector v0.23.0/go.mod h1:SLgLEhhcfR9MjG1taK8RPuwiuIoAPW7IpCjFBobwIUM= +github.com/open-telemetry/otel-arrow/collector v0.24.0 h1:NYTcgtwG0lQnoGcEomTTtueZxzk03xt+XEXN4L5kqHA= +github.com/open-telemetry/otel-arrow/collector v0.24.0/go.mod h1:+jJ3Vfhh685hXSw2Z1P1wl/rTqEKlSaJ4FocZI+xs+0= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -105,6 +112,8 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/fJgbpc= +github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= diff --git a/receiver/otelarrowreceiver/internal/arrow/arrow.go b/receiver/otelarrowreceiver/internal/arrow/arrow.go index c3808c749f41..1aee357d9df1 100644 --- a/receiver/otelarrowreceiver/internal/arrow/arrow.go +++ b/receiver/otelarrowreceiver/internal/arrow/arrow.go @@ -8,15 +8,20 @@ import ( "errors" "fmt" "io" + "net" + "runtime" + "strconv" "strings" + "sync" + "sync/atomic" arrowpb "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1" + "github.com/open-telemetry/otel-arrow/collector/admission" "github.com/open-telemetry/otel-arrow/collector/netstats" arrowRecord "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record" "go.opentelemetry.io/collector/client" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configgrpc" - "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/extension/auth" @@ -27,6 +32,7 @@ import ( "go.opentelemetry.io/collector/receiver/receiverhelper" "go.opentelemetry.io/otel" otelcodes "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/trace" "go.uber.org/multierr" @@ -36,20 +42,20 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" - - md "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/metadata" + "google.golang.org/protobuf/proto" ) const ( streamFormat = "arrow" hpackMaxDynamicSize = 4096 + scopeName = "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver" ) var ( ErrNoMetricsConsumer = fmt.Errorf("no metrics consumer") ErrNoLogsConsumer = fmt.Errorf("no logs consumer") ErrNoTracesConsumer = fmt.Errorf("no traces consumer") - ErrUnrecognizedPayload = fmt.Errorf("unrecognized OTLP payload") + ErrUnrecognizedPayload = consumererror.NewPermanent(fmt.Errorf("unrecognized OTel-Arrow payload")) ) type Consumers interface { @@ -65,14 +71,23 @@ type Receiver struct { arrowpb.UnsafeArrowLogsServiceServer arrowpb.UnsafeArrowMetricsServiceServer - telemetry component.TelemetrySettings - tracer trace.Tracer - obsrecv *receiverhelper.ObsReport - gsettings configgrpc.ServerConfig - authServer auth.Server - newConsumer func() arrowRecord.ConsumerAPI - netReporter netstats.Interface - telemetryBuilder *md.TelemetryBuilder + telemetry component.TelemetrySettings + tracer trace.Tracer + obsrecv *receiverhelper.ObsReport + gsettings configgrpc.ServerConfig + authServer auth.Server + newConsumer func() arrowRecord.ConsumerAPI + netReporter netstats.Interface + recvInFlightBytes metric.Int64UpDownCounter + recvInFlightItems metric.Int64UpDownCounter + recvInFlightRequests metric.Int64UpDownCounter + boundedQueue *admission.BoundedQueue +} + +// receiverStream holds the inFlightWG for a single stream. +type receiverStream struct { + *Receiver + inFlightWG sync.WaitGroup } // New creates a new Receiver reference. @@ -83,23 +98,48 @@ func New( gsettings configgrpc.ServerConfig, authServer auth.Server, newConsumer func() arrowRecord.ConsumerAPI, + bq *admission.BoundedQueue, netReporter netstats.Interface, ) (*Receiver, error) { - telemetryBuilder, err := md.NewTelemetryBuilder(set.TelemetrySettings) - if err != nil { - return nil, err + tracer := set.TelemetrySettings.TracerProvider.Tracer("otel-arrow-receiver") + var errors, err error + recv := &Receiver{ + Consumers: cs, + obsrecv: obsrecv, + telemetry: set.TelemetrySettings, + tracer: tracer, + authServer: authServer, + newConsumer: newConsumer, + gsettings: gsettings, + netReporter: netReporter, + boundedQueue: bq, + } + + meter := recv.telemetry.MeterProvider.Meter(scopeName) + recv.recvInFlightBytes, err = meter.Int64UpDownCounter( + "otel_arrow_receiver_in_flight_bytes", + metric.WithDescription("Number of bytes in flight"), + metric.WithUnit("By"), + ) + errors = multierr.Append(errors, err) + + recv.recvInFlightItems, err = meter.Int64UpDownCounter( + "otel_arrow_receiver_in_flight_items", + metric.WithDescription("Number of items in flight"), + ) + errors = multierr.Append(errors, err) + + recv.recvInFlightRequests, err = meter.Int64UpDownCounter( + "otel_arrow_receiver_in_flight_requests", + metric.WithDescription("Number of requests in flight"), + ) + errors = multierr.Append(errors, err) + + if errors != nil { + return nil, errors } - return &Receiver{ - Consumers: cs, - obsrecv: obsrecv, - telemetry: set.TelemetrySettings, - tracer: md.Tracer(set.TelemetrySettings), - authServer: authServer, - newConsumer: newConsumer, - gsettings: gsettings, - netReporter: netReporter, - telemetryBuilder: telemetryBuilder, - }, nil + + return recv, nil } // headerReceiver contains the state necessary to decode per-request metadata @@ -250,7 +290,7 @@ func (h *headerReceiver) newContext(ctx context.Context, hdrs map[string][]strin } // logStreamError decides how to log an error. -func (r *Receiver) logStreamError(err error) { +func (r *Receiver) logStreamError(err error, where string) { var code codes.Code var msg string // gRPC tends to supply status-wrapped errors, so we always @@ -270,9 +310,9 @@ func (r *Receiver) logStreamError(err error) { } if code == codes.Canceled { - r.telemetry.Logger.Debug("arrow stream shutdown", zap.String("message", msg)) + r.telemetry.Logger.Debug("arrow stream shutdown", zap.String("message", msg), zap.String("where", where)) } else { - r.telemetry.Logger.Error("arrow stream error", zap.String("message", msg), zap.Int("code", int(code))) + r.telemetry.Logger.Error("arrow stream error", zap.Int("code", int(code)), zap.String("message", msg), zap.String("where", where)) } } @@ -304,247 +344,555 @@ type anyStreamServer interface { grpc.ServerStream } +type batchResp struct { + id int64 + err error +} + +func (r *Receiver) recoverErr(retErr *error) { + if err := recover(); err != nil { + // When this happens, the stacktrace is + // important and lost if we don't capture it + // here. + r.telemetry.Logger.Error("panic detail in otel-arrow-adapter", + zap.Reflect("recovered", err), + zap.Stack("stacktrace"), + ) + *retErr = status.Errorf(codes.Internal, "panic in otel-arrow-adapter: %v", err) + } +} + func (r *Receiver) anyStream(serverStream anyStreamServer, method string) (retErr error) { streamCtx := serverStream.Context() ac := r.newConsumer() - hrcv := newHeaderReceiver(serverStream.Context(), r.authServer, r.gsettings.IncludeMetadata) defer func() { - if err := recover(); err != nil { - // When this happens, the stacktrace is - // important and lost if we don't capture it - // here. - r.telemetry.Logger.Debug("panic detail in otel-arrow-adapter", - zap.Reflect("recovered", err), - zap.Stack("stacktrace"), - ) - retErr = fmt.Errorf("panic in otel-arrow-adapter: %v", err) - } if err := ac.Close(); err != nil { r.telemetry.Logger.Error("arrow stream close", zap.Error(err)) } }() + defer r.recoverErr(&retErr) + + // doneCancel allows an error in the sender/receiver to + // interrupt the corresponding thread. + doneCtx, doneCancel := context.WithCancel(streamCtx) + defer doneCancel() + + // streamErrCh returns up to two errors from the sender and + // receiver threads started below. + streamErrCh := make(chan error, 2) + pendingCh := make(chan batchResp, runtime.NumCPU()) + + // wg is used to ensure this thread returns after both + // sender and recevier threads return. + var sendWG sync.WaitGroup + var recvWG sync.WaitGroup + sendWG.Add(1) + recvWG.Add(1) + + rstream := &receiverStream{ + Receiver: r, + } - for { - // Receive a batch corresponding with one ptrace.Traces, pmetric.Metrics, - // or plog.Logs item. - req, err := serverStream.Recv() - if err != nil { - // This includes the case where a client called CloseSend(), in - // which case we see an EOF error here. - r.logStreamError(err) - - if errors.Is(err, io.EOF) { - return status.Error(codes.Canceled, "client stream shutdown") - } else if errors.Is(err, context.Canceled) { - return status.Error(codes.Canceled, "server stream shutdown") - } - return err + go func() { + var err error + defer recvWG.Done() + defer r.recoverErr(&err) + err = rstream.srvReceiveLoop(doneCtx, serverStream, pendingCh, method, ac) + streamErrCh <- err + }() + + go func() { + var err error + defer sendWG.Done() + defer r.recoverErr(&err) + err = rstream.srvSendLoop(doneCtx, serverStream, &recvWG, pendingCh) + streamErrCh <- err + }() + + // Wait for sender/receiver threads to return before returning. + defer recvWG.Wait() + defer sendWG.Wait() + + select { + case <-doneCtx.Done(): + return status.Error(codes.Canceled, "server stream shutdown") + case retErr = <-streamErrCh: + doneCancel() + return + } +} + +func (r *receiverStream) newInFlightData(ctx context.Context, method string, batchID int64, pendingCh chan<- batchResp) (context.Context, *inFlightData) { + ctx, span := r.tracer.Start(ctx, "otel_arrow_stream_inflight") + + r.inFlightWG.Add(1) + r.recvInFlightRequests.Add(ctx, 1) + id := &inFlightData{ + receiverStream: r, + method: method, + batchID: batchID, + pendingCh: pendingCh, + span: span, + } + id.refs.Add(1) + return ctx, id +} + +// inFlightData is responsible for storing the resources held by one request. +type inFlightData struct { + // Receiver is the owner of the resources held by this object. + *receiverStream + + method string + batchID int64 + pendingCh chan<- batchResp + span trace.Span + + // refs counts the number of goroutines holding this object. + // initially the recvOne() body, on success the + // consumeAndRespond() function. + refs atomic.Int32 + + numAcquired int64 // how many bytes held in the semaphore + numItems int // how many items + uncompSize int64 // uncompressed data size +} + +func (id *inFlightData) recvDone(ctx context.Context, recvErrPtr *error) { + retErr := *recvErrPtr + + if retErr != nil { + // logStreamError because this response will break the stream. + id.logStreamError(retErr, "recv") + id.span.SetStatus(otelcodes.Error, retErr.Error()) + } + + id.anyDone(ctx) +} + +func (id *inFlightData) consumeDone(ctx context.Context, consumeErrPtr *error) { + retErr := *consumeErrPtr + + if retErr != nil { + // debug-level because the error was external from the pipeline. + id.telemetry.Logger.Debug("otel-arrow consume", zap.Error(retErr)) + id.span.SetStatus(otelcodes.Error, retErr.Error()) + } + + id.replyToCaller(retErr) + id.anyDone(ctx) +} + +func (id *inFlightData) replyToCaller(callerErr error) { + id.pendingCh <- batchResp{ + id: id.batchID, + err: callerErr, + } +} + +func (id *inFlightData) anyDone(ctx context.Context) { + // check if there are still refs, in which case leave the in-flight + // counts where they are. + if id.refs.Add(-1) != 0 { + return + } + + id.span.End() + + if id.numAcquired != 0 { + if err := id.boundedQueue.Release(id.numAcquired); err != nil { + id.telemetry.Logger.Error("release error", zap.Error(err)) } + } - // Check for optional headers and set the incoming context. - thisCtx, authHdrs, err := hrcv.combineHeaders(streamCtx, req.GetHeaders()) - if err != nil { - // Failing to parse the incoming headers breaks the stream. - r.telemetry.Logger.Error("arrow metadata error", zap.Error(err)) - return err + if id.uncompSize != 0 { + id.recvInFlightBytes.Add(ctx, -id.uncompSize) + } + if id.numItems != 0 { + id.recvInFlightItems.Add(ctx, int64(-id.numItems)) + } + + // The netstats code knows that uncompressed size is + // unreliable for arrow transport, so we instrument it + // directly here. Only the primary direction of transport + // is instrumented this way. + var sized netstats.SizesStruct + sized.Method = id.method + sized.Length = id.uncompSize + id.netReporter.CountReceive(ctx, sized) + + id.recvInFlightRequests.Add(ctx, -1) + id.inFlightWG.Done() +} + +// recvOne begins processing a single Arrow batch. +// +// If an error is encountered before Arrow data is successfully consumed, +// the stream will break and the error will be returned immediately. +// +// If the error is due to authorization, the stream remains unbroken +// and the request fails. +// +// If not enough resources are available, the stream will block (if +// waiting permitted) or break (insufficient waiters). +// +// Assuming success, a new goroutine is created to handle consuming the +// data. +// +// This handles constructing an inFlightData object, which itself +// tracks everything that needs to be used by instrumention when the +// batch finishes. +func (r *receiverStream) recvOne(streamCtx context.Context, serverStream anyStreamServer, hrcv *headerReceiver, pendingCh chan<- batchResp, method string, ac arrowRecord.ConsumerAPI) (retErr error) { + + // Receive a batch corresponding with one ptrace.Traces, pmetric.Metrics, + // or plog.Logs item. + req, err := serverStream.Recv() + + // inflightCtx is carried through into consumeAndProcess on the success path. + inflightCtx, flight := r.newInFlightData(streamCtx, method, req.GetBatchId(), pendingCh) + defer flight.recvDone(inflightCtx, &retErr) + + if err != nil { + if errors.Is(err, io.EOF) { + return status.Error(codes.Canceled, "client stream shutdown") + } else if errors.Is(err, context.Canceled) { + return status.Error(codes.Canceled, "server stream shutdown") } + // Note: err is directly from gRPC, should already have status. + return err + } + // Check for optional headers and set the incoming context. + inflightCtx, authHdrs, err := hrcv.combineHeaders(inflightCtx, req.GetHeaders()) + if err != nil { + // Failing to parse the incoming headers breaks the stream. + return status.Errorf(codes.Internal, "arrow metadata error: %v", err) + } + + // start this span after hrcv.combineHeaders returns extracted context. This will allow this span + // to be a part of the data path trace instead of only being included as a child of the stream inflight trace. + inflightCtx, span := r.tracer.Start(inflightCtx, "otel_arrow_stream_recv") + defer span.End() + + // Authorize the request, if configured, prior to acquiring resources. + if r.authServer != nil { var authErr error - if r.authServer != nil { - var newCtx context.Context - if newCtx, err = r.authServer.Authenticate(thisCtx, authHdrs); err != nil { - authErr = err - } else { - thisCtx = newCtx - } + inflightCtx, authErr = r.authServer.Authenticate(inflightCtx, authHdrs) + if authErr != nil { + flight.replyToCaller(status.Error(codes.Unauthenticated, authErr.Error())) + return nil + } + } + + var prevAcquiredBytes int64 + uncompSizeHeaderStr, uncompSizeHeaderFound := authHdrs["otlp-pdata-size"] + if !uncompSizeHeaderFound || len(uncompSizeHeaderStr) == 0 { + // This is a compressed size so make sure to acquire the difference when request is decompressed. + prevAcquiredBytes = int64(proto.Size(req)) + } else { + prevAcquiredBytes, err = strconv.ParseInt(uncompSizeHeaderStr[0], 10, 64) + if err != nil { + return status.Errorf(codes.Internal, "failed to convert string to request size: %v", err) } + } - if err := r.processAndConsume(thisCtx, method, ac, req, serverStream, authErr); err != nil { - return err + // Use the bounded queue to memory limit based on incoming + // uncompressed request size and waiters. Acquire will fail + // immediately if there are too many waiters, or will + // otherwise block until timeout or enough memory becomes + // available. + err = r.boundedQueue.Acquire(inflightCtx, prevAcquiredBytes) + if err != nil { + return status.Errorf(codes.ResourceExhausted, "otel-arrow bounded queue: %v", err) + } + flight.numAcquired = prevAcquiredBytes + + data, numItems, uncompSize, err := r.consumeBatch(ac, req) + + if err != nil { + if errors.Is(err, arrowRecord.ErrConsumerMemoryLimit) { + return status.Errorf(codes.ResourceExhausted, "otel-arrow decode: %v", err) } + return status.Errorf(codes.Internal, "otel-arrow decode: %v", err) + } + + flight.uncompSize = uncompSize + flight.numItems = numItems + + r.recvInFlightBytes.Add(inflightCtx, uncompSize) + r.recvInFlightItems.Add(inflightCtx, int64(numItems)) + + numAcquired, err := r.acquireAdditionalBytes(inflightCtx, prevAcquiredBytes, uncompSize, hrcv.connInfo.Addr, uncompSizeHeaderFound) + + flight.numAcquired = numAcquired + if err != nil { + return status.Errorf(codes.ResourceExhausted, "otel-arrow bounded queue re-acquire: %v", err) } + + // Recognize that the request is still in-flight via consumeAndRespond() + flight.refs.Add(1) + + // consumeAndRespond consumes the data and returns control to the sender loop. + go r.consumeAndRespond(inflightCtx, data, flight) + + return nil } -func (r *Receiver) processAndConsume(ctx context.Context, method string, arrowConsumer arrowRecord.ConsumerAPI, req *arrowpb.BatchArrowRecords, serverStream anyStreamServer, authErr error) (retErr error) { +// consumeAndRespond finishes the span started in recvOne and logs the +// result after invoking the pipeline to consume the data. +func (r *Receiver) consumeAndRespond(ctx context.Context, data any, flight *inFlightData) { var err error + defer flight.consumeDone(ctx, &err) - ctx, span := r.tracer.Start(ctx, "otel_arrow_stream_recv") - defer span.End() + // recoverErr is a special function because it recovers panics, so we + // keep it in a separate defer than the processing above, which will + // run after the panic is recovered into an ordinary error. + defer r.recoverErr(&err) - r.telemetryBuilder.OtelArrowReceiverInFlightRequests.Add(ctx, 1) - defer func() { - r.telemetryBuilder.OtelArrowReceiverInFlightRequests.Add(ctx, -1) - // Set span status if an error is returned. - if retErr != nil { - span := trace.SpanFromContext(ctx) - span.SetStatus(otelcodes.Error, retErr.Error()) - } - }() + err = r.consumeData(ctx, data, flight) +} - // Process records: an error in this code path does - // not necessarily break the stream. - if authErr != nil { - err = authErr - } else { - err = r.processRecords(ctx, method, arrowConsumer, req) +// srvReceiveLoop repeatedly receives one batch of data. +func (r *receiverStream) srvReceiveLoop(ctx context.Context, serverStream anyStreamServer, pendingCh chan<- batchResp, method string, ac arrowRecord.ConsumerAPI) (retErr error) { + hrcv := newHeaderReceiver(ctx, r.authServer, r.gsettings.IncludeMetadata) + for { + select { + case <-ctx.Done(): + return status.Error(codes.Canceled, "server stream shutdown") + default: + if err := r.recvOne(ctx, serverStream, hrcv, pendingCh, method, ac); err != nil { + return err + } + } } +} +// srvReceiveLoop repeatedly sends one batch data response. +func (r *receiverStream) sendOne(serverStream anyStreamServer, resp batchResp) error { // Note: Statuses can be batched, but we do not take // advantage of this feature. - status := &arrowpb.BatchStatus{ - BatchId: req.GetBatchId(), + bs := &arrowpb.BatchStatus{ + BatchId: resp.id, } - if err == nil { - status.StatusCode = arrowpb.StatusCode_OK + if resp.err == nil { + bs.StatusCode = arrowpb.StatusCode_OK } else { - status.StatusMessage = err.Error() - switch { - case errors.Is(err, arrowRecord.ErrConsumerMemoryLimit): - r.telemetry.Logger.Error("arrow resource exhausted", zap.Error(err)) - status.StatusCode = arrowpb.StatusCode_RESOURCE_EXHAUSTED - case consumererror.IsPermanent(err): - r.telemetry.Logger.Error("arrow data error", zap.Error(err)) - status.StatusCode = arrowpb.StatusCode_INVALID_ARGUMENT - default: - r.telemetry.Logger.Debug("arrow consumer error", zap.Error(err)) - status.StatusCode = arrowpb.StatusCode_UNAVAILABLE + // Generally, code in the receiver should use + // status.Errorf(codes.XXX, ...) so that we take the + // first branch. + if gsc, ok := status.FromError(resp.err); ok { + bs.StatusCode = arrowpb.StatusCode(gsc.Code()) + bs.StatusMessage = gsc.Message() + } else { + // Ideally, we don't take this branch because all code uses + // gRPC status constructors and we've taken the branch above. + // + // This is a fallback for several broad categories of error. + bs.StatusMessage = resp.err.Error() + + switch { + case consumererror.IsPermanent(resp.err): + // Some kind of pipeline error, somewhere downstream. + r.telemetry.Logger.Error("arrow data error", zap.Error(resp.err)) + bs.StatusCode = arrowpb.StatusCode_INVALID_ARGUMENT + default: + // Probably a pipeline error, retryable. + r.telemetry.Logger.Debug("arrow consumer error", zap.Error(resp.err)) + bs.StatusCode = arrowpb.StatusCode_UNAVAILABLE + } } } - err = serverStream.Send(status) - if err != nil { - r.logStreamError(err) + if err := serverStream.Send(bs); err != nil { + // logStreamError because this response will break the stream. + r.logStreamError(err, "send") return err } + return nil } -// processRecords returns an error and a boolean indicating whether -// the error (true) was from processing the data (i.e., invalid -// argument) or (false) from the consuming pipeline. The boolean is -// not used when success (nil error) is returned. -func (r *Receiver) processRecords(ctx context.Context, method string, arrowConsumer arrowRecord.ConsumerAPI, records *arrowpb.BatchArrowRecords) error { +func (r *receiverStream) flushSender(serverStream anyStreamServer, recvWG *sync.WaitGroup, pendingCh <-chan batchResp) error { + // wait to ensure no more items are accepted + recvWG.Wait() + + // wait for all responses to be sent + r.inFlightWG.Wait() + + for { + select { + case resp := <-pendingCh: + if err := r.sendOne(serverStream, resp); err != nil { + return err + } + default: + // Currently nothing left in pendingCh. + return nil + } + } +} + +func (r *receiverStream) srvSendLoop(ctx context.Context, serverStream anyStreamServer, recvWG *sync.WaitGroup, pendingCh <-chan batchResp) error { + for { + select { + case <-ctx.Done(): + return r.flushSender(serverStream, recvWG, pendingCh) + case resp := <-pendingCh: + if err := r.sendOne(serverStream, resp); err != nil { + return err + } + } + } +} + +// consumeBatch applies the batch to the Arrow Consumer, returns a +// slice of pdata objects of the corresponding data type as `any`. +// along with the number of items and true uncompressed size. +func (r *Receiver) consumeBatch(arrowConsumer arrowRecord.ConsumerAPI, records *arrowpb.BatchArrowRecords) (retData any, numItems int, uncompSize int64, retErr error) { + payloads := records.GetArrowPayloads() if len(payloads) == 0 { - return nil - } - var uncompSize int64 - if r.telemetry.MetricsLevel > configtelemetry.LevelNormal { - defer func() { - // The netstats code knows that uncompressed size is - // unreliable for arrow transport, so we instrument it - // directly here. Only the primary direction of transport - // is instrumented this way. - var sized netstats.SizesStruct - sized.Method = method - if r.telemetry.MetricsLevel > configtelemetry.LevelNormal { - sized.Length = uncompSize - } - r.netReporter.CountReceive(ctx, sized) - r.netReporter.SetSpanSizeAttributes(ctx, sized) - }() + return nil, 0, 0, nil } switch payloads[0].Type { case arrowpb.ArrowPayloadType_UNIVARIATE_METRICS: if r.Metrics() == nil { - return status.Error(codes.Unimplemented, "metrics service not available") + return nil, 0, 0, status.Error(codes.Unimplemented, "metrics service not available") } var sizer pmetric.ProtoMarshaler - var numPts int - - ctx = r.obsrecv.StartMetricsOp(ctx) data, err := arrowConsumer.MetricsFrom(records) - if err != nil { - err = consumererror.NewPermanent(err) - } else { + if err == nil { for _, metrics := range data { - items := metrics.DataPointCount() - sz := int64(sizer.MetricsSize(metrics)) - - r.telemetryBuilder.OtelArrowReceiverInFlightBytes.Add(ctx, sz) - r.telemetryBuilder.OtelArrowReceiverInFlightItems.Add(ctx, int64(items)) - - numPts += items - uncompSize += sz - err = multierr.Append(err, - r.Metrics().ConsumeMetrics(ctx, metrics), - ) + numItems += metrics.DataPointCount() + uncompSize += int64(sizer.MetricsSize(metrics)) } - // entire request has been processed, decrement counter. - r.telemetryBuilder.OtelArrowReceiverInFlightBytes.Add(ctx, -uncompSize) - r.telemetryBuilder.OtelArrowReceiverInFlightItems.Add(ctx, int64(-numPts)) } - r.obsrecv.EndMetricsOp(ctx, streamFormat, numPts, err) - return err + retData = data + retErr = err case arrowpb.ArrowPayloadType_LOGS: if r.Logs() == nil { - return status.Error(codes.Unimplemented, "logs service not available") + return nil, 0, 0, status.Error(codes.Unimplemented, "logs service not available") } var sizer plog.ProtoMarshaler - var numLogs int - ctx = r.obsrecv.StartLogsOp(ctx) data, err := arrowConsumer.LogsFrom(records) - if err != nil { - err = consumererror.NewPermanent(err) - } else { + if err == nil { for _, logs := range data { - items := logs.LogRecordCount() - sz := int64(sizer.LogsSize(logs)) - - r.telemetryBuilder.OtelArrowReceiverInFlightBytes.Add(ctx, sz) - r.telemetryBuilder.OtelArrowReceiverInFlightItems.Add(ctx, int64(items)) - numLogs += items - uncompSize += sz - err = multierr.Append(err, - r.Logs().ConsumeLogs(ctx, logs), - ) + numItems += logs.LogRecordCount() + uncompSize += int64(sizer.LogsSize(logs)) } - // entire request has been processed, decrement counter. - r.telemetryBuilder.OtelArrowReceiverInFlightBytes.Add(ctx, -uncompSize) - r.telemetryBuilder.OtelArrowReceiverInFlightItems.Add(ctx, int64(-numLogs)) } - r.obsrecv.EndLogsOp(ctx, streamFormat, numLogs, err) - return err + retData = data + retErr = err case arrowpb.ArrowPayloadType_SPANS: if r.Traces() == nil { - return status.Error(codes.Unimplemented, "traces service not available") + return nil, 0, 0, status.Error(codes.Unimplemented, "traces service not available") } var sizer ptrace.ProtoMarshaler - var numSpans int - ctx = r.obsrecv.StartTracesOp(ctx) data, err := arrowConsumer.TracesFrom(records) - if err != nil { - err = consumererror.NewPermanent(err) - } else { + if err == nil { for _, traces := range data { - items := traces.SpanCount() - sz := int64(sizer.TracesSize(traces)) + numItems += traces.SpanCount() + uncompSize += int64(sizer.TracesSize(traces)) + } + } + retData = data + retErr = err - r.telemetryBuilder.OtelArrowReceiverInFlightBytes.Add(ctx, sz) - r.telemetryBuilder.OtelArrowReceiverInFlightItems.Add(ctx, int64(items)) + default: + retErr = ErrUnrecognizedPayload + } - numSpans += items - uncompSize += sz - err = multierr.Append(err, - r.Traces().ConsumeTraces(ctx, traces), - ) - } + return retData, numItems, uncompSize, retErr +} + +// consumeData invokes the next pipeline consumer for a received batch of data. +// it uses the standard OTel collector instrumentation (receiverhelper.ObsReport). +// +// if any errors are permanent, returns a permanent error. +func (r *Receiver) consumeData(ctx context.Context, data any, flight *inFlightData) (retErr error) { + oneOp := func(err error) { + retErr = multierr.Append(retErr, err) + } + var final func(context.Context, string, int, error) - // entire request has been processed, decrement counter. - r.telemetryBuilder.OtelArrowReceiverInFlightBytes.Add(ctx, -uncompSize) - r.telemetryBuilder.OtelArrowReceiverInFlightItems.Add(ctx, int64(-numSpans)) + switch items := data.(type) { + case []pmetric.Metrics: + ctx = r.obsrecv.StartMetricsOp(ctx) + for _, metrics := range items { + oneOp(r.Metrics().ConsumeMetrics(ctx, metrics)) } - r.obsrecv.EndTracesOp(ctx, streamFormat, numSpans, err) - return err + final = r.obsrecv.EndMetricsOp + + case []plog.Logs: + ctx = r.obsrecv.StartLogsOp(ctx) + for _, logs := range items { + oneOp(r.Logs().ConsumeLogs(ctx, logs)) + } + final = r.obsrecv.EndLogsOp + + case []ptrace.Traces: + ctx = r.obsrecv.StartTracesOp(ctx) + for _, traces := range items { + oneOp(r.Traces().ConsumeTraces(ctx, traces)) + } + final = r.obsrecv.EndTracesOp default: - return ErrUnrecognizedPayload + retErr = ErrUnrecognizedPayload + } + if final != nil { + final(ctx, streamFormat, flight.numItems, retErr) + } + return retErr +} + +func (r *Receiver) acquireAdditionalBytes(ctx context.Context, prevAcquired, uncompSize int64, addr net.Addr, uncompSizeHeaderFound bool) (int64, error) { + diff := uncompSize - prevAcquired + + if diff == 0 { + return uncompSize, nil + } + + if uncompSizeHeaderFound { + var clientAddr string + if addr != nil { + clientAddr = addr.String() + } + // a mismatch between header set by exporter and the uncompSize just calculated. + r.telemetry.Logger.Debug("mismatch between uncompressed size in receiver and otlp-pdata-size header", + zap.String("client-address", clientAddr), + zap.Int("uncompsize", int(uncompSize)), + zap.Int("otlp-pdata-size", int(prevAcquired)), + ) + } else if diff < 0 { + // proto.Size() on compressed request was greater than pdata uncompressed size. + r.telemetry.Logger.Debug("uncompressed size is less than compressed size", + zap.Int("uncompressed", int(uncompSize)), + zap.Int("compressed", int(prevAcquired)), + ) + } + + if diff < 0 { + // If the difference is negative, release the overage. + if err := r.boundedQueue.Release(-diff); err != nil { + return 0, err + } + } else { + // Release previously acquired bytes to prevent deadlock and + // reacquire the uncompressed size we just calculated. + if err := r.boundedQueue.Release(prevAcquired); err != nil { + return 0, err + } + if err := r.boundedQueue.Acquire(ctx, uncompSize); err != nil { + return 0, err + } } + return uncompSize, nil } diff --git a/receiver/otelarrowreceiver/internal/arrow/arrow_test.go b/receiver/otelarrowreceiver/internal/arrow/arrow_test.go index 62f40f976296..a11362789b01 100644 --- a/receiver/otelarrowreceiver/internal/arrow/arrow_test.go +++ b/receiver/otelarrowreceiver/internal/arrow/arrow_test.go @@ -9,12 +9,15 @@ import ( "encoding/json" "fmt" "io" + "strconv" "strings" "sync" "testing" + "time" arrowpb "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1" arrowCollectorMock "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1/mock" + "github.com/open-telemetry/otel-arrow/collector/admission" "github.com/open-telemetry/otel-arrow/collector/netstats" "github.com/open-telemetry/otel-arrow/collector/testdata" arrowRecord "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record" @@ -46,6 +49,10 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/arrow/mock" ) +func defaultBQ() *admission.BoundedQueue { + return admission.NewBoundedQueue(int64(100000), int64(10)) +} + type compareJSONTraces struct{ ptrace.Traces } type compareJSONMetrics struct{ pmetric.Metrics } type compareJSONLogs struct{ plog.Logs } @@ -102,7 +109,7 @@ func (healthyTestChannel) onConsume() error { type unhealthyTestChannel struct{} func (unhealthyTestChannel) onConsume() error { - return fmt.Errorf("consumer unhealthy") + return status.Errorf(codes.Unavailable, "consumer unhealthy") } type recvResult struct { @@ -270,22 +277,6 @@ func statusUnavailableFor(batchID int64, msg string) *arrowpb.BatchStatus { } } -func statusInvalidFor(batchID int64, msg string) *arrowpb.BatchStatus { - return &arrowpb.BatchStatus{ - BatchId: batchID, - StatusCode: arrowpb.StatusCode_INVALID_ARGUMENT, - StatusMessage: msg, - } -} - -func statusExhaustedFor(batchID int64, msg string) *arrowpb.BatchStatus { - return &arrowpb.BatchStatus{ - BatchId: batchID, - StatusCode: arrowpb.StatusCode_RESOURCE_EXHAUSTED, - StatusMessage: msg, - } -} - func (ctc *commonTestCase) newRealConsumer() arrowRecord.ConsumerAPI { mock := arrowRecordMock.NewMockConsumerAPI(ctc.ctrl) cons := arrowRecord.NewConsumer() @@ -320,7 +311,7 @@ func (ctc *commonTestCase) newOOMConsumer() arrowRecord.ConsumerAPI { return mock } -func (ctc *commonTestCase) start(newConsumer func() arrowRecord.ConsumerAPI, opts ...func(*configgrpc.ServerConfig, *auth.Server)) { +func (ctc *commonTestCase) start(newConsumer func() arrowRecord.ConsumerAPI, bq *admission.BoundedQueue, opts ...func(*configgrpc.ServerConfig, *auth.Server)) { var authServer auth.Server var gsettings configgrpc.ServerConfig for _, gf := range opts { @@ -344,6 +335,7 @@ func (ctc *commonTestCase) start(newConsumer func() arrowRecord.ConsumerAPI, opt gsettings, authServer, newConsumer, + bq, netstats.Noop{}, ) require.NoError(ctc.T, err) @@ -353,13 +345,129 @@ func (ctc *commonTestCase) start(newConsumer func() arrowRecord.ConsumerAPI, opt } func requireCanceledStatus(t *testing.T, err error) { + requireStatus(t, codes.Canceled, err) +} + +func requireUnavailableStatus(t *testing.T, err error) { + requireStatus(t, codes.Unavailable, err) +} + +func requireInternalStatus(t *testing.T, err error) { + requireStatus(t, codes.Internal, err) +} + +func requireExhaustedStatus(t *testing.T, err error) { + requireStatus(t, codes.ResourceExhausted, err) +} + +func requireStatus(t *testing.T, code codes.Code, err error) { require.Error(t, err) status, ok := status.FromError(err) require.True(t, ok, "is status-wrapped %v", err) - require.Equal(t, codes.Canceled, status.Code()) + require.Equal(t, code, status.Code()) +} + +func TestBoundedQueueWithPdataHeaders(t *testing.T) { + var sizer ptrace.ProtoMarshaler + stdTesting := otelAssert.NewStdUnitTest(t) + pdataSizeTenTraces := sizer.TracesSize(testdata.GenerateTraces(10)) + defaultBoundedQueueLimit := int64(100000) + tests := []struct { + name string + numTraces int + includePdataHeader bool + pdataSize string + rejected bool + }{ + { + name: "no header compressed greater than uncompressed", + numTraces: 10, + }, + { + name: "no header compressed less than uncompressed", + numTraces: 100, + }, + { + name: "pdata header less than uncompressedSize", + numTraces: 10, + pdataSize: strconv.Itoa(pdataSizeTenTraces / 2), + includePdataHeader: true, + }, + { + name: "pdata header equal uncompressedSize", + numTraces: 10, + pdataSize: strconv.Itoa(pdataSizeTenTraces), + includePdataHeader: true, + }, + { + name: "pdata header greater than uncompressedSize", + numTraces: 10, + pdataSize: strconv.Itoa(pdataSizeTenTraces * 2), + includePdataHeader: true, + }, + { + name: "no header compressed accepted uncompressed rejected", + numTraces: 100, + rejected: true, + }, + { + name: "pdata header accepted uncompressed rejected", + numTraces: 100, + rejected: true, + pdataSize: strconv.Itoa(pdataSizeTenTraces), + includePdataHeader: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tc := healthyTestChannel{} + ctc := newCommonTestCase(t, tc) + + td := testdata.GenerateTraces(tt.numTraces) + batch, err := ctc.testProducer.BatchArrowRecordsFromTraces(td) + require.NoError(t, err) + if tt.includePdataHeader { + var hpb bytes.Buffer + hpe := hpack.NewEncoder(&hpb) + err = hpe.WriteField(hpack.HeaderField{ + Name: "otlp-pdata-size", + Value: tt.pdataSize, + }) + assert.NoError(t, err) + batch.Headers = make([]byte, hpb.Len()) + copy(batch.Headers, hpb.Bytes()) + } + + var bq *admission.BoundedQueue + if tt.rejected { + ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(0) + bq = admission.NewBoundedQueue(int64(sizer.TracesSize(td)-100), 10) + } else { + ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(1).Return(nil) + bq = admission.NewBoundedQueue(defaultBoundedQueueLimit, 10) + } + + ctc.start(ctc.newRealConsumer, bq) + ctc.putBatch(batch, nil) + + if tt.rejected { + requireExhaustedStatus(t, ctc.wait()) + } else { + data := <-ctc.consume + actualTD := data.Data.(ptrace.Traces) + otelAssert.Equiv(stdTesting, []json.Marshaler{ + compareJSONTraces{td}, + }, []json.Marshaler{ + compareJSONTraces{actualTD}, + }) + requireCanceledStatus(t, ctc.cancelAndWait()) + } + }) + } } func TestReceiverTraces(t *testing.T) { + stdTesting := otelAssert.NewStdUnitTest(t) tc := healthyTestChannel{} ctc := newCommonTestCase(t, tc) @@ -369,10 +477,14 @@ func TestReceiverTraces(t *testing.T) { ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(1).Return(nil) - ctc.start(ctc.newRealConsumer) + ctc.start(ctc.newRealConsumer, defaultBQ()) ctc.putBatch(batch, nil) - assert.EqualValues(t, td, (<-ctc.consume).Data) + otelAssert.Equiv(stdTesting, []json.Marshaler{ + compareJSONTraces{td}, + }, []json.Marshaler{ + compareJSONTraces{(<-ctc.consume).Data.(ptrace.Traces)}, + }) err = ctc.cancelAndWait() requireCanceledStatus(t, err) @@ -388,7 +500,7 @@ func TestReceiverLogs(t *testing.T) { ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(1).Return(nil) - ctc.start(ctc.newRealConsumer) + ctc.start(ctc.newRealConsumer, defaultBQ()) ctc.putBatch(batch, nil) assert.EqualValues(t, []json.Marshaler{compareJSONLogs{ld}}, []json.Marshaler{compareJSONLogs{(<-ctc.consume).Data.(plog.Logs)}}) @@ -408,7 +520,7 @@ func TestReceiverMetrics(t *testing.T) { ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(1).Return(nil) - ctc.start(ctc.newRealConsumer) + ctc.start(ctc.newRealConsumer, defaultBQ()) ctc.putBatch(batch, nil) otelAssert.Equiv(stdTesting, []json.Marshaler{ @@ -425,7 +537,7 @@ func TestReceiverRecvError(t *testing.T) { tc := healthyTestChannel{} ctc := newCommonTestCase(t, tc) - ctc.start(ctc.newRealConsumer) + ctc.start(ctc.newRealConsumer, defaultBQ()) ctc.putBatch(nil, fmt.Errorf("test recv error")) @@ -442,16 +554,27 @@ func TestReceiverSendError(t *testing.T) { batch, err := ctc.testProducer.BatchArrowRecordsFromLogs(ld) require.NoError(t, err) - ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(1).Return(fmt.Errorf("test send error")) + ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(1).Return(status.Errorf(codes.Unavailable, "test send error")) - ctc.start(ctc.newRealConsumer) + ctc.start(ctc.newRealConsumer, defaultBQ()) ctc.putBatch(batch, nil) assert.EqualValues(t, ld, (<-ctc.consume).Data) + start := time.Now() + for time.Since(start) < 10*time.Second { + if ctc.ctrl.Satisfied() { + break + } + time.Sleep(time.Second) + } + + // Release the receiver -- the sender has seen an error by + // now and should return the stream. (Oddly, gRPC has no way + // to signal the receive call to fail using context.) + close(ctc.receive) err = ctc.wait() - require.Error(t, err) - require.Contains(t, err.Error(), "test send error") + requireUnavailableStatus(t, err) } func TestReceiverConsumeError(t *testing.T) { @@ -485,7 +608,7 @@ func TestReceiverConsumeError(t *testing.T) { ctc.stream.EXPECT().Send(statusUnavailableFor(batch.BatchId, "consumer unhealthy")).Times(1).Return(nil) - ctc.start(ctc.newRealConsumer) + ctc.start(ctc.newRealConsumer, defaultBQ()) ctc.putBatch(batch, nil) @@ -523,7 +646,7 @@ func TestReceiverInvalidData(t *testing.T) { } for _, item := range data { - tc := unhealthyTestChannel{} + tc := healthyTestChannel{} ctc := newCommonTestCase(t, tc) var batch *arrowpb.BatchArrowRecords @@ -542,13 +665,12 @@ func TestReceiverInvalidData(t *testing.T) { batch = copyBatch(batch) - ctc.stream.EXPECT().Send(statusInvalidFor(batch.BatchId, "Permanent error: test invalid error")).Times(1).Return(nil) - - ctc.start(ctc.newErrorConsumer) + // newErrorConsumer determines the internal error in decoding above + ctc.start(ctc.newErrorConsumer, defaultBQ()) ctc.putBatch(batch, nil) - err = ctc.cancelAndWait() - requireCanceledStatus(t, err) + err = ctc.wait() + requireInternalStatus(t, err) } } @@ -579,13 +701,13 @@ func TestReceiverMemoryLimit(t *testing.T) { batch = copyBatch(batch) - ctc.stream.EXPECT().Send(statusExhaustedFor(batch.BatchId, "Permanent error: test oom error "+arrowRecord.ErrConsumerMemoryLimit.Error())).Times(1).Return(nil) + // The Recv() returns an error, there are no Send() calls. - ctc.start(ctc.newOOMConsumer) + ctc.start(ctc.newOOMConsumer, defaultBQ()) ctc.putBatch(batch, nil) - err = ctc.cancelAndWait() - requireCanceledStatus(t, err) + err = ctc.wait() + requireExhaustedStatus(t, err) } } @@ -618,6 +740,7 @@ func copyBatch(in *arrowpb.BatchArrowRecords) *arrowpb.BatchArrowRecords { func TestReceiverEOF(t *testing.T) { tc := healthyTestChannel{} ctc := newCommonTestCase(t, tc) + stdTesting := otelAssert.NewStdUnitTest(t) // send a sequence of data then simulate closing the connection. const times = 10 @@ -627,7 +750,7 @@ func TestReceiverEOF(t *testing.T) { ctc.stream.EXPECT().Send(gomock.Any()).Times(times).Return(nil) - ctc.start(ctc.newRealConsumer) + ctc.start(ctc.newRealConsumer, defaultBQ()) go func() { for i := 0; i < times; i++ { @@ -658,7 +781,15 @@ func TestReceiverEOF(t *testing.T) { actualData = append(actualData, (<-ctc.consume).Data.(ptrace.Traces)) } - assert.EqualValues(t, expectData, actualData) + assert.Equal(t, len(expectData), len(actualData)) + + for i := 0; i < len(expectData); i++ { + otelAssert.Equiv(stdTesting, []json.Marshaler{ + compareJSONTraces{expectData[i]}, + }, []json.Marshaler{ + compareJSONTraces{actualData[i]}, + }) + } wg.Wait() } @@ -684,7 +815,7 @@ func testReceiverHeaders(t *testing.T, includeMeta bool) { ctc.stream.EXPECT().Send(gomock.Any()).Times(len(expectData)).Return(nil) - ctc.start(ctc.newRealConsumer, func(gsettings *configgrpc.ServerConfig, _ *auth.Server) { + ctc.start(ctc.newRealConsumer, defaultBQ(), func(gsettings *configgrpc.ServerConfig, _ *auth.Server) { gsettings.IncludeMetadata = includeMeta }) @@ -756,7 +887,7 @@ func TestReceiverCancel(t *testing.T) { ctc := newCommonTestCase(t, tc) ctc.cancel() - ctc.start(ctc.newRealConsumer) + ctc.start(ctc.newRealConsumer, defaultBQ()) err := ctc.wait() requireCanceledStatus(t, err) @@ -1046,7 +1177,7 @@ func testReceiverAuthHeaders(t *testing.T, includeMeta bool, dataAuth bool) { }) var authCall *gomock.Call - ctc.start(ctc.newRealConsumer, func(gsettings *configgrpc.ServerConfig, authPtr *auth.Server) { + ctc.start(ctc.newRealConsumer, defaultBQ(), func(gsettings *configgrpc.ServerConfig, authPtr *auth.Server) { gsettings.IncludeMetadata = includeMeta as := mock.NewMockServer(ctc.ctrl) diff --git a/receiver/otelarrowreceiver/otelarrow.go b/receiver/otelarrowreceiver/otelarrow.go index 84a4ebb6487e..9a1ef1cbd4f7 100644 --- a/receiver/otelarrowreceiver/otelarrow.go +++ b/receiver/otelarrowreceiver/otelarrow.go @@ -9,6 +9,7 @@ import ( "sync" arrowpb "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1" + "github.com/open-telemetry/otel-arrow/collector/admission" "github.com/open-telemetry/otel-arrow/collector/compression/zstd" "github.com/open-telemetry/otel-arrow/collector/netstats" arrowRecord "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record" @@ -94,25 +95,26 @@ func (r *otelArrowReceiver) startGRPCServer(cfg configgrpc.ServerConfig, _ compo return nil } -func (r *otelArrowReceiver) startProtocolServers(host component.Host) error { +func (r *otelArrowReceiver) startProtocolServers(ctx context.Context, host component.Host) error { var err error var serverOpts []grpc.ServerOption if r.netReporter != nil { serverOpts = append(serverOpts, grpc.StatsHandler(r.netReporter.Handler())) } - r.serverGRPC, err = r.cfg.GRPC.ToServer(context.Background(), host, r.settings.TelemetrySettings, serverOpts...) + r.serverGRPC, err = r.cfg.GRPC.ToServer(ctx, host, r.settings.TelemetrySettings, serverOpts...) if err != nil { return err } var authServer auth.Server if r.cfg.GRPC.Auth != nil { - authServer, err = r.cfg.GRPC.Auth.GetServerAuthenticatorContext(context.Background(), host.GetExtensions()) + authServer, err = r.cfg.GRPC.Auth.GetServerAuthenticatorContext(ctx, host.GetExtensions()) if err != nil { return err } } + bq := admission.NewBoundedQueue(int64(r.cfg.Arrow.AdmissionLimitMiB<<20), r.cfg.Arrow.WaiterLimit) r.arrowReceiver, err = arrow.New(arrow.Consumers(r), r.settings, r.obsrepGRPC, r.cfg.GRPC, authServer, func() arrowRecord.ConsumerAPI { var opts []arrowRecord.Option @@ -124,7 +126,7 @@ func (r *otelArrowReceiver) startProtocolServers(host component.Host) error { opts = append(opts, arrowRecord.WithMeterProvider(r.settings.TelemetrySettings.MeterProvider, r.settings.TelemetrySettings.MetricsLevel)) } return arrowRecord.NewConsumer(opts...) - }, r.netReporter) + }, bq, r.netReporter) if err != nil { return err @@ -158,8 +160,8 @@ func (r *otelArrowReceiver) startProtocolServers(host component.Host) error { // Start runs the trace receiver on the gRPC server. Currently // it also enables the metrics receiver too. -func (r *otelArrowReceiver) Start(_ context.Context, host component.Host) error { - return r.startProtocolServers(host) +func (r *otelArrowReceiver) Start(ctx context.Context, host component.Host) error { + return r.startProtocolServers(ctx, host) } // Shutdown is a method to turn off receiving. diff --git a/receiver/otelarrowreceiver/otelarrow_test.go b/receiver/otelarrowreceiver/otelarrow_test.go index 38e2c5c643a3..6d2130ca4123 100644 --- a/receiver/otelarrowreceiver/otelarrow_test.go +++ b/receiver/otelarrowreceiver/otelarrow_test.go @@ -9,6 +9,7 @@ import ( "errors" "fmt" "net" + "strconv" "sync" "testing" "time" @@ -65,13 +66,13 @@ func TestGRPCNewPortAlreadyUsed(t *testing.T) { require.Error(t, r.Start(context.Background(), componenttest.NewNopHost())) } -// TestOTLPReceiverGRPCTracesIngestTest checks that the gRPC trace receiver +// TestOTelArrowReceiverGRPCTracesIngestTest checks that the gRPC trace receiver // is returning the proper response (return and metrics) when the next consumer // in the pipeline reports error. The test changes the responses returned by the // next trace consumer, checks if data was passed down the pipeline and if // proper metrics were recorded. It also uses all endpoints supported by the // trace receiver. -func TestOTLPReceiverGRPCTracesIngestTest(t *testing.T) { +func TestOTelArrowReceiverGRPCTracesIngestTest(t *testing.T) { type ingestionStateTest struct { okToIngest bool expectedCode codes.Code @@ -236,7 +237,7 @@ func TestShutdown(t *testing.T) { nextSink := new(consumertest.TracesSink) - // Create OTLP receiver + // Create OTelArrow receiver factory := NewFactory() cfg := factory.CreateDefaultConfig().(*Config) cfg.GRPC.NetAddr.Endpoint = endpointGrpc @@ -380,15 +381,25 @@ func (esc *errOrSinkConsumer) Reset() { type tracesSinkWithMetadata struct { consumertest.TracesSink - MDs []client.Metadata + + lock sync.Mutex + mds []client.Metadata } func (ts *tracesSinkWithMetadata) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { info := client.FromContext(ctx) - ts.MDs = append(ts.MDs, info.Metadata) + ts.lock.Lock() + defer ts.lock.Unlock() + ts.mds = append(ts.mds, info.Metadata) return ts.TracesSink.ConsumeTraces(ctx, td) } +func (ts *tracesSinkWithMetadata) Metadatas() []client.Metadata { + ts.lock.Lock() + defer ts.lock.Unlock() + return ts.mds +} + type anyStreamClient interface { Send(*arrowpb.BatchArrowRecords) error Recv() (*arrowpb.BatchStatus, error) @@ -470,12 +481,12 @@ func TestGRPCArrowReceiver(t *testing.T) { assert.Equal(t, expectTraces, sink.AllTraces()) - assert.Equal(t, len(expectMDs), len(sink.MDs)) + assert.Equal(t, len(expectMDs), len(sink.Metadatas())) // gRPC adds its own metadata keys, so we check for only the // expected ones below: for idx := range expectMDs { for key, vals := range expectMDs[idx] { - require.Equal(t, vals, sink.MDs[idx].Get(key), "for key %s", key) + require.Equal(t, vals, sink.Metadatas()[idx].Get(key), "for key %s", key) } } } @@ -565,8 +576,8 @@ func TestGRPCArrowReceiverAuth(t *testing.T) { // The stream has to be successful to get this far. The // authenticator fails every data item: require.Equal(t, batch.BatchId, resp.BatchId) - require.Equal(t, arrowpb.StatusCode_UNAVAILABLE, resp.StatusCode) - require.Equal(t, errorString, resp.StatusMessage) + require.Equal(t, arrowpb.StatusCode_UNAUTHENTICATED, resp.StatusCode) + require.Contains(t, resp.StatusMessage, errorString) } assert.NoError(t, cc.Close()) @@ -574,3 +585,92 @@ func TestGRPCArrowReceiverAuth(t *testing.T) { assert.Equal(t, 0, len(sink.AllTraces())) } + +func TestConcurrentArrowReceiver(t *testing.T) { + addr := testutil.GetAvailableLocalAddress(t) + sink := new(tracesSinkWithMetadata) + + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + cfg.GRPC.NetAddr.Endpoint = addr + cfg.GRPC.IncludeMetadata = true + id := component.NewID(component.MustNewType("arrow")) + tt := componenttest.NewNopTelemetrySettings() + ocr := newReceiver(t, factory, tt, cfg, id, sink, nil) + + require.NotNil(t, ocr) + require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost())) + + cc, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + const itemsPerStream = 10 + const numStreams = 5 + + var wg sync.WaitGroup + wg.Add(numStreams) + + for j := 0; j < numStreams; j++ { + go func() { + defer wg.Done() + + client := arrowpb.NewArrowTracesServiceClient(cc) + stream, err := client.ArrowTraces(ctx, grpc.WaitForReady(true)) + require.NoError(t, err) + producer := arrowRecord.NewProducer() + + var headerBuf bytes.Buffer + hpd := hpack.NewEncoder(&headerBuf) + + // Repeatedly send traces via arrow. Set the expected traces + // metadata to receive. + for i := 0; i < itemsPerStream; i++ { + td := testdata.GenerateTraces(2) + + headerBuf.Reset() + err := hpd.WriteField(hpack.HeaderField{ + Name: "seq", + Value: fmt.Sprint(i), + }) + require.NoError(t, err) + + batch, err := producer.BatchArrowRecordsFromTraces(td) + require.NoError(t, err) + + batch.Headers = headerBuf.Bytes() + + err = stream.Send(batch) + + require.NoError(t, err) + + resp, err := stream.Recv() + require.NoError(t, err) + require.Equal(t, batch.BatchId, resp.BatchId) + require.Equal(t, arrowpb.StatusCode_OK, resp.StatusCode) + } + }() + } + wg.Wait() + + assert.NoError(t, cc.Close()) + require.NoError(t, ocr.Shutdown(context.Background())) + + counts := make([]int, itemsPerStream) + + // Two spans per stream/item. + require.Equal(t, itemsPerStream*numStreams*2, sink.SpanCount()) + require.Equal(t, itemsPerStream*numStreams, len(sink.Metadatas())) + + for _, md := range sink.Metadatas() { + val, err := strconv.Atoi(md.Get("seq")[0]) + require.NoError(t, err) + counts[val]++ + } + + for i := 0; i < itemsPerStream; i++ { + require.Equal(t, numStreams, counts[i]) + } +} diff --git a/receiver/otelarrowreceiver/testdata/config.yaml b/receiver/otelarrowreceiver/testdata/config.yaml index 0db443736428..726263f82f9f 100644 --- a/receiver/otelarrowreceiver/testdata/config.yaml +++ b/receiver/otelarrowreceiver/testdata/config.yaml @@ -27,3 +27,5 @@ protocols: permit_without_stream: true arrow: memory_limit_mib: 123 + admission_limit_mib: 80 + waiter_limit: 100