Skip to content
3 changes: 1 addition & 2 deletions backend/api/handler/coze/loop/apis/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions backend/modules/observability/application/wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ var (
mq2.NewSpanWithAnnotationProducerImpl,
redis2.NewSpansRedisDaoImpl,
mysqldao.NewTrajectoryConfigDaoImpl,
obmetrics.NewConsumeMetric,
)
openApiSet = wire.NewSet(
NewOpenAPIApplication,
Expand Down Expand Up @@ -452,6 +453,7 @@ func InitTraceIngestionApplication(
mqFactory mq.IFactory,
persistentCmdable redis.PersistentCmdable,
idGenerator idgen.IIDGenerator,
meter metrics.Meter,
) (ITraceIngestionApplication, error) {
wire.Build(traceIngestionSet)
return nil, nil
Expand Down
7 changes: 4 additions & 3 deletions backend/modules/observability/application/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"os/signal"
"syscall"

"github.com/coze-dev/coze-loop/backend/infra/metrics"
"github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity/collector/component"
"github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity/collector/exporter"
"github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity/collector/processor"
Expand Down Expand Up @@ -109,6 +110,7 @@ func (cfg *Config) Validate() error {
type Settings struct {
Factories func() (Factories, error)
ConfigProvider ConfigProvider
ConsumeMetric metrics.Metric
}

type Collector struct {
Expand All @@ -135,8 +137,8 @@ func (col *Collector) WaitForReady() {
}

// 通常在异步线程中进行, 主线程需要等待初始化完成
func (col *Collector) Run(ctx context.Context) error {
if err := col.setupConfigurationComponents(ctx); err != nil {
func (col *Collector) Run(ctx context.Context, hook func() error) error {
if err := col.setupConfigurationComponentsWithHook(ctx, hook); err != nil {
return err
}
signal.Notify(col.signalsChannel, os.Interrupt, syscall.SIGTERM)
Expand All @@ -151,8 +153,8 @@ func (col *Collector) Run(ctx context.Context) error {
}

// 同步阻塞执行
func (col *Collector) RunInOne(ctx context.Context) error {
if err := col.setupConfigurationComponents(ctx); err != nil {
func (col *Collector) RunInOne(ctx context.Context, hook func() error) error {
if err := col.setupConfigurationComponentsWithHook(ctx, hook); err != nil {
return err
}
signal.Notify(col.signalsChannel, os.Interrupt, syscall.SIGTERM)
Expand All @@ -165,7 +167,7 @@ func (col *Collector) RunInOne(ctx context.Context) error {
return col.shutdown(ctx)
}

func (col *Collector) setupConfigurationComponents(ctx context.Context) error {
func (col *Collector) setupConfigurationComponentsWithHook(ctx context.Context, hook func() error) error {
factories, err := col.set.Factories()
if err != nil {
return err
Expand All @@ -183,6 +185,7 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error {
ProcessorBuilder: processor.NewBuilder(cfg.Processors, factories.Processors),
ExporterBuilder: exporter.NewBuilder(cfg.Exporters, factories.Exporters),
PipelineConfig: tenantCfg,
ConsumeMetric: col.set.ConsumeMetric,
})
if err != nil {
return err
Expand All @@ -195,6 +198,10 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error {
return fmt.Errorf("failed to start tenant %q, %v", tenantName, err)
}
}

if err = hook(); err != nil {
fmt.Printf("hook failed, %v\n", err)
}
return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,16 @@ func (t *Traces) SpansCount() int {
return ret
}

func (t *Traces) SpansCountByPSM() map[string]int {
result := make(map[string]int)
for _, trace := range t.TraceData {
for _, span := range trace.SpanList {
result[span.PSM]++
}
}
return result
}

type BaseConsumer interface{}

//go:generate mockgen -destination=mocks/consumer.go -package=mocks . Consumer
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package consumer

import "context"

type injectConsumer struct {
inner Consumer
}

func NewInjectConsumer(inner Consumer) Consumer {
return &injectConsumer{inner: inner}
}

func (c *injectConsumer) ConsumeTraces(ctx context.Context, tds Traces) error {
ctx = NewSpanStatsContext(ctx)
InjectSpanCounts(ctx, tds)
return c.inner.ConsumeTraces(ctx, tds)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package consumer

import (
"context"
"sync/atomic"
"time"

"github.com/coze-dev/coze-loop/backend/infra/metrics"
obmetrics "github.com/coze-dev/coze-loop/backend/modules/observability/infra/metrics"
"github.com/coze-dev/coze-loop/backend/pkg/logs"
)

type ObserveConsumer struct {
name string
inner Consumer
nextElapsed *atomic.Int64
metric metrics.Metric
}

func NewObserveConsumer(name string, inner Consumer, nextElapsed *atomic.Int64, metric metrics.Metric) Consumer {
return &ObserveConsumer{
name: name,
inner: inner,
nextElapsed: nextElapsed,
metric: metric,
}
}

func (t *ObserveConsumer) ConsumeTraces(ctx context.Context, tds Traces) error {
if t.nextElapsed != nil {
t.nextElapsed.Store(0)
}

start := time.Now()
err := t.inner.ConsumeTraces(ctx, tds)
total := time.Since(start)

var selfDuration time.Duration
if t.nextElapsed != nil {
selfDuration = total - time.Duration(t.nextElapsed.Load())
} else {
selfDuration = total
}

isErr := err != nil
if t.metric != nil {
logs.CtxInfo(ctx, "ObserveConsumer[%s] ConsumeTraces, self_duration=%s, is_err=%s, spans_count=%d", t.name, selfDuration, boolToStr(isErr), tds.SpansCount())
psmCounts := tds.SpansCountByPSM()
for psm, count := range psmCounts {
t.metric.Emit(
[]metrics.T{
{Name: obmetrics.ConsumeTagNode, Value: t.name},
{Name: obmetrics.ConsumeTagIsErr, Value: boolToStr(isErr)},
{Name: obmetrics.ConsumeTagPSM, Value: psm},
{Name: obmetrics.ConsumeTagTenant, Value: tds.Tenant},
},
metrics.Counter(1, metrics.WithSuffix(obmetrics.ConsumeSuffixThroughput)),
metrics.Timer(selfDuration.Microseconds(), metrics.WithSuffix(obmetrics.ConsumeSuffixLatency)),
metrics.Counter(int64(count), metrics.WithSuffix(obmetrics.ConsumeSuffixSpans)),
)
}
}

if err != nil {
logs.CtxWarn(ctx, "ObserveConsumer[%s] ConsumeTraces failed, self_duration=%s, err=%v", t.name, selfDuration, err)
}
return err
}

type stopwatchConsumer struct {
inner Consumer
elapsed *atomic.Int64
}

func NewStopwatchConsumer(inner Consumer, elapsed *atomic.Int64) Consumer {
return &stopwatchConsumer{
inner: inner,
elapsed: elapsed,
}
}

func (s *stopwatchConsumer) ConsumeTraces(ctx context.Context, tds Traces) error {
start := time.Now()
err := s.inner.ConsumeTraces(ctx, tds)
s.elapsed.Add(time.Since(start).Nanoseconds())
return err
}

func boolToStr(b bool) string {
if b {
return "true"
}
return "false"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package consumer

import (
"context"
"errors"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.uber.org/mock/gomock"

metricsmocks "github.com/coze-dev/coze-loop/backend/infra/metrics/mocks"
"github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity"
"github.com/coze-dev/coze-loop/backend/modules/observability/domain/trace/entity/loop_span"
)

func TestObserveConsumer_ConsumeTraces_Success(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

mockMetric := metricsmocks.NewMockMetric(ctrl)

inner := &mockConsumer{}
timed := NewObserveConsumer("test_node", inner, nil, mockMetric)

err := timed.ConsumeTraces(context.Background(), Traces{})
assert.NoError(t, err)
}

func TestObserveConsumer_ConsumeTraces_Error(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

mockMetric := metricsmocks.NewMockMetric(ctrl)

expectedErr := errors.New("consume failed")
inner := &errConsumer{err: expectedErr}
timed := NewObserveConsumer("test_node", inner, nil, mockMetric)

err := timed.ConsumeTraces(context.Background(), Traces{})
assert.ErrorIs(t, err, expectedErr)
}

func TestObserveConsumer_ConsumeTraces_NilMetric(t *testing.T) {
inner := &mockConsumer{}
timed := NewObserveConsumer("test_node", inner, nil, nil)

err := timed.ConsumeTraces(context.Background(), Traces{})
assert.NoError(t, err)
}

func TestObserveConsumer_SubtractsNextElapsed(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

mockMetric := metricsmocks.NewMockMetric(ctrl)

nextElapsed := &atomic.Int64{}
sleepDuration := 50 * time.Millisecond

inner := &sleepConsumer{
duration: sleepDuration,
afterSleep: func() {
nextElapsed.Store((100 * time.Millisecond).Nanoseconds())
},
}
timed := NewObserveConsumer("test_node", inner, nextElapsed, mockMetric)

err := timed.ConsumeTraces(context.Background(), Traces{})
assert.NoError(t, err)
}

func TestObserveConsumer_GroupByPSM(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

mockMetric := metricsmocks.NewMockMetric(ctrl)
mockMetric.EXPECT().Emit(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(2)

inner := &mockConsumer{}
timed := NewObserveConsumer("test_node", inner, nil, mockMetric)

traces := Traces{
Tenant: "test_tenant",
TraceData: []*entity.TraceData{
{
SpanList: loop_span.SpanList{
{PSM: "svc-a"},
{PSM: "svc-a"},
{PSM: "svc-b"},
},
},
},
}
err := timed.ConsumeTraces(context.Background(), traces)
assert.NoError(t, err)
}

func TestStopwatchConsumer_RecordsElapsed(t *testing.T) {
elapsed := &atomic.Int64{}
inner := &sleepConsumer{duration: 10 * time.Millisecond}
sw := NewStopwatchConsumer(inner, elapsed)

err := sw.ConsumeTraces(context.Background(), Traces{})
assert.NoError(t, err)
assert.Greater(t, elapsed.Load(), int64(0))
}

func TestStopwatchConsumer_AccumulatesElapsed(t *testing.T) {
elapsed := &atomic.Int64{}
inner := &sleepConsumer{duration: 5 * time.Millisecond}
sw := NewStopwatchConsumer(inner, elapsed)

_ = sw.ConsumeTraces(context.Background(), Traces{})
first := elapsed.Load()
_ = sw.ConsumeTraces(context.Background(), Traces{})
second := elapsed.Load()
assert.Greater(t, second, first)
}

type errConsumer struct {
err error
}

func (e *errConsumer) ConsumeTraces(ctx context.Context, tds Traces) error {
return e.err
}

type sleepConsumer struct {
duration time.Duration
afterSleep func()
}

func (s *sleepConsumer) ConsumeTraces(ctx context.Context, tds Traces) error {
time.Sleep(s.duration)
if s.afterSleep != nil {
s.afterSleep()
}
return nil
}
Loading
Loading