Skip to content

Commit

Permalink
Updates to OTel-Arrow v0.24.0 deps (#33518)
Browse files Browse the repository at this point in the history
**Description:** 
This is a copy of the otel-arrow repository exporter and receiver
components at

open-telemetry/otel-arrow@dd6e224

Updates both go.mods to OTel-Arrow v0.24.0 dependencies, the current
release.

Applies repo-specific `make gci` logic.

**Link to tracking Issue:** #26491
  • Loading branch information
jmacd authored Jun 13, 2024
1 parent 7519485 commit 71bb4b0
Show file tree
Hide file tree
Showing 22 changed files with 1,014 additions and 339 deletions.
27 changes: 27 additions & 0 deletions .chloggen/otel-arrow-v024.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: OTel-Arrow

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Update to OTel-Arrow v0.24.0

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [26491]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
5 changes: 2 additions & 3 deletions exporter/otelarrowexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/exporter/otelar
go 1.21.0

require (
github.com/apache/arrow/go/v14 v14.0.2
github.com/apache/arrow/go/v16 v16.1.0
github.com/open-telemetry/otel-arrow v0.24.0
github.com/open-telemetry/otel-arrow/collector v0.23.0
github.com/open-telemetry/otel-arrow/collector v0.24.0
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector v0.102.2-0.20240611143128-7dfb57b9ad1c
go.opentelemetry.io/collector/component v0.102.2-0.20240611143128-7dfb57b9ad1c
Expand Down Expand Up @@ -36,7 +36,6 @@ require (

require (
github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect
github.com/apache/arrow/go/v16 v16.1.0 // indirect
github.com/axiomhq/hyperloglog v0.0.0-20230201085229-3ddf4bad03dc // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
Expand Down
6 changes: 2 additions & 4 deletions exporter/otelarrowexporter/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 9 additions & 4 deletions exporter/otelarrowexporter/internal/arrow/bestofn.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ import (
"math/rand"
"runtime"
"sort"
"time"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// bestOfNPrioritizer is a prioritizer that selects a less-loaded stream to write.
Expand Down Expand Up @@ -42,16 +46,17 @@ type streamSorter struct {

var _ streamPrioritizer = &bestOfNPrioritizer{}

func newBestOfNPrioritizer(dc doneCancel, numChoices, numStreams int, lf loadFunc) (*bestOfNPrioritizer, []*streamWorkState) {
func newBestOfNPrioritizer(dc doneCancel, numChoices, numStreams int, lf loadFunc, maxLifetime time.Duration) (*bestOfNPrioritizer, []*streamWorkState) {
var state []*streamWorkState

// Limit numChoices to the number of streams.
numChoices = min(numStreams, numChoices)

for i := 0; i < numStreams; i++ {
ws := &streamWorkState{
waiters: map[int64]chan<- error{},
toWrite: make(chan writeItem, 1),
maxStreamLifetime: addJitter(maxLifetime),
waiters: map[int64]chan<- error{},
toWrite: make(chan writeItem, 1),
}

state = append(state, ws)
Expand Down Expand Up @@ -112,7 +117,7 @@ func (lp *bestOfNPrioritizer) sendAndWait(ctx context.Context, errCh <-chan erro
case <-lp.done:
return ErrStreamRestarting
case <-ctx.Done():
return context.Canceled
return status.Errorf(codes.Canceled, "stream wait: %v", ctx.Err())
case lp.input <- wri:
return waitForWrite(ctx, errCh, lp.done)
}
Expand Down
19 changes: 13 additions & 6 deletions exporter/otelarrowexporter/internal/arrow/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/status"
)

// Exporter is 1:1 with exporter, isolates arrow-specific
Expand All @@ -32,9 +34,7 @@ type Exporter struct {
// prioritizerName the name of a balancer policy.
prioritizerName PrioritizerName

// maxStreamLifetime is a limit on duration for streams. A
// slight "jitter" is applied relative to this value on a
// per-stream basis.
// maxStreamLifetime is a limit on duration for streams.
maxStreamLifetime time.Duration

// disableDowngrade prevents downgrade from occurring, supports
Expand Down Expand Up @@ -156,7 +156,7 @@ func (e *Exporter) Start(ctx context.Context) error {
downCtx, downDc := newDoneCancel(ctx)

var sws []*streamWorkState
e.ready, sws = newStreamPrioritizer(downDc, e.prioritizerName, e.numStreams)
e.ready, sws = newStreamPrioritizer(downDc, e.prioritizerName, e.numStreams, e.maxStreamLifetime)

for _, ws := range sws {
e.startArrowStream(downCtx, ws)
Expand Down Expand Up @@ -236,7 +236,6 @@ func (e *Exporter) runArrowStream(ctx context.Context, dc doneCancel, state *str
producer := e.newProducer()

stream := newStream(producer, e.ready, e.telemetry, e.netReporter, state)
stream.maxStreamLifetime = addJitter(e.maxStreamLifetime)

defer func() {
if err := producer.Close(); err != nil {
Expand All @@ -258,6 +257,14 @@ func (e *Exporter) runArrowStream(ctx context.Context, dc doneCancel, state *str
//
// consumer should fall back to standard OTLP, (true, nil)
func (e *Exporter) SendAndWait(ctx context.Context, data any) (bool, error) {
// If the incoming context is already canceled, return the
// same error condition a unary gRPC or HTTP exporter would do.
select {
case <-ctx.Done():
return false, status.Errorf(codes.Canceled, "context done before send: %v", ctx.Err())
default:
}

errCh := make(chan error, 1)

// Note that if the OTLP exporter's gRPC Headers field was
Expand Down Expand Up @@ -343,7 +350,7 @@ func waitForWrite(ctx context.Context, errCh <-chan error, down <-chan struct{})
select {
case <-ctx.Done():
// This caller's context timed out.
return ctx.Err()
return status.Errorf(codes.Canceled, "send wait: %v", ctx.Err())
case <-down:
return ErrStreamRestarting
case err := <-errCh:
Expand Down
26 changes: 22 additions & 4 deletions exporter/otelarrowexporter/internal/arrow/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package arrow
import (
"context"
"encoding/json"
"errors"
"fmt"
"sync"
"sync/atomic"
Expand All @@ -31,7 +30,9 @@ import (
"go.uber.org/zap/zaptest"
"golang.org/x/net/http2/hpack"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)

var AllPrioritizers = []PrioritizerName{LeastLoadedPrioritizer, LeastLoadedTwoPrioritizer}
Expand Down Expand Up @@ -278,7 +279,18 @@ func TestArrowExporterTimeout(t *testing.T) {
sent, err := tc.exporter.SendAndWait(ctx, twoTraces)
require.True(t, sent)
require.Error(t, err)
require.True(t, errors.Is(err, context.Canceled))

stat, is := status.FromError(err)
require.True(t, is, "is a gRPC status")
require.Equal(t, codes.Canceled, stat.Code())

// Repeat the request, will get immediate timeout.
sent, err = tc.exporter.SendAndWait(ctx, twoTraces)
require.False(t, sent)
stat, is = status.FromError(err)
require.True(t, is, "is a gRPC status error: %v", err)
require.Equal(t, "context done before send: context canceled", stat.Message())
require.Equal(t, codes.Canceled, stat.Code())

require.NoError(t, tc.exporter.Shutdown(ctx))
})
Expand Down Expand Up @@ -406,7 +418,10 @@ func TestArrowExporterConnectTimeout(t *testing.T) {
}()
_, err := tc.exporter.SendAndWait(ctx, twoTraces)
require.Error(t, err)
require.True(t, errors.Is(err, context.Canceled))

stat, is := status.FromError(err)
require.True(t, is, "is a gRPC status error: %v", err)
require.Equal(t, codes.Canceled, stat.Code())

require.NoError(t, tc.exporter.Shutdown(bg))
})
Expand Down Expand Up @@ -489,7 +504,10 @@ func TestArrowExporterStreamRace(t *testing.T) {
// This blocks until the cancelation.
_, err := tc.exporter.SendAndWait(callctx, twoTraces)
require.Error(t, err)
require.True(t, errors.Is(err, context.Canceled))

stat, is := status.FromError(err)
require.True(t, is, "is a gRPC status error: %v", err)
require.Equal(t, codes.Canceled, stat.Code())
}()
}

Expand Down
7 changes: 4 additions & 3 deletions exporter/otelarrowexporter/internal/arrow/prioritizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"strconv"
"strings"
"time"

"go.opentelemetry.io/collector/component"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -50,18 +51,18 @@ type streamWriter interface {
sendAndWait(context.Context, <-chan error, writeItem) error
}

func newStreamPrioritizer(dc doneCancel, name PrioritizerName, numStreams int) (streamPrioritizer, []*streamWorkState) {
func newStreamPrioritizer(dc doneCancel, name PrioritizerName, numStreams int, maxLifetime time.Duration) (streamPrioritizer, []*streamWorkState) {
if name == unsetPrioritizer {
name = DefaultPrioritizer
}
if strings.HasPrefix(string(name), llPrefix) {
// error was checked and reported in Validate
n, err := strconv.Atoi(string(name[len(llPrefix):]))
if err == nil {
return newBestOfNPrioritizer(dc, n, numStreams, pendingRequests)
return newBestOfNPrioritizer(dc, n, numStreams, pendingRequests, maxLifetime)
}
}
return newBestOfNPrioritizer(dc, numStreams, numStreams, pendingRequests)
return newBestOfNPrioritizer(dc, numStreams, numStreams, pendingRequests, maxLifetime)
}

// pendingRequests is the load function used by leastloadedN.
Expand Down
Loading

0 comments on commit 71bb4b0

Please sign in to comment.