Skip to content

Commit

Permalink
refactor: improve the code structure and documentation of qrynexporter.
Browse files Browse the repository at this point in the history
Rename the batchV2 struct to traceWithTagsBatch to improve code readability.
Update the names of struct fields to make them more descriptive.
Rename the traces_v2.go file to trace_batch_processor.go.
Use a custom contextKey type in the pushTraceData function to resolve the SA1029 warning.
Optimize README.md to provide more detailed configuration instructions.
These changes are aimed at improving code quality, maintainability, and documentation clarity.
  • Loading branch information
Cluas committed Oct 21, 2024
1 parent fd0f14f commit 45fdef0
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 46 deletions.
14 changes: 14 additions & 0 deletions exporter/qrynexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,25 @@


# Configuration options:
- `dsn` (required): Data Source Name for Clickhouse.
- Example: `tcp://localhost:9000/?database=cloki`

- `clustered_clickhouse` (required):
- Type: boolean
- Description: Set to `true` if using a Clickhouse cluster; otherwise, set to `false`.

- `client_side_trace_processing` (required):
- Type: boolean
- Default: `true`
- Description: Enables client-side processing of trace data. This can improve performance but may increase client-side resource usage.

<<<<<<< HEAD
- `dsn` (required): Clickhouse's dsn.
- `clustered_clickhouse` (required): set to `true` if a Clickhouse cluster is used
- `client_side_trace_processing`: use an improved traces ingestion algorithm for clickhouse clusters.
  Data ingestion is less performant but more evenly distributed
=======
>>>>>>> e35202d (refactor: improve the code structure and documentation of qrynexporter.)
# Example:
## Simple Trace Data
Expand Down
4 changes: 4 additions & 0 deletions exporter/qrynexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ type Config struct {
configretry.BackOffConfig `mapstructure:"retry_on_failure"`
exporterhelper.QueueSettings `mapstructure:"sending_queue"`

<<<<<<< HEAD

Check failure on line 33 in exporter/qrynexporter/config.go

View workflow job for this annotation

GitHub Actions / lint-and-test

syntax error: unexpected <<, expected field name or embedded type
=======

Check failure on line 34 in exporter/qrynexporter/config.go

View workflow job for this annotation

GitHub Actions / lint-and-test

syntax error: unexpected ==, expected field name or embedded type
// ClientSideTraceProcessing is a boolean that indicates whether to process traces on the client side.
>>>>>>> e35202d (refactor: improve the code structure and documentation of qrynexporter.)
ClientSideTraceProcessing bool `mapstructure:"client_side_trace_processing"`

ClusteredClickhouse bool `mapstructure:"clustered_clickhouse"`
Expand Down
2 changes: 1 addition & 1 deletion exporter/qrynexporter/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ func (e *logsExporter) pushLogsData(ctx context.Context, ld plog.Logs) error {
}
}

if err := batchSamplesAndTimeSeries(context.WithValue(ctx, "cluster", e.cluster), e.db, samples, timeSeries); err != nil {
if err := batchSamplesAndTimeSeries(context.WithValue(ctx, clusterKey, e.cluster), e.db, samples, timeSeries); err != nil {
otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError, dataTypeLogs)))
e.logger.Error(fmt.Sprintf("failed to insert batch: [%s]", err.Error()))
return err
Expand Down
2 changes: 1 addition & 1 deletion exporter/qrynexporter/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ func (e *metricsExporter) pushMetricsData(ctx context.Context, md pmetric.Metric
}
}

if err := batchSamplesAndTimeSeries(context.WithValue(ctx, "cluster", e.cluster), e.db, samples, timeSeries); err != nil {
if err := batchSamplesAndTimeSeries(context.WithValue(ctx, clusterKey, e.cluster), e.db, samples, timeSeries); err != nil {
otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError, dataTypeMetrics)))
e.logger.Error(fmt.Sprintf("failed to insert batch: [%s]", err.Error()))
return err
Expand Down
10 changes: 5 additions & 5 deletions exporter/qrynexporter/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
)

var (
tracesInputSQL = func(clustered bool) string {
tracesInputSQL = func(_ bool) string {
return `INSERT INTO traces_input (
trace_id,
span_id,
Expand Down Expand Up @@ -110,8 +110,8 @@ func TracesTagsV2InputSQL(clustered bool) string {
//
// ) Engine=Null

// Trace represent trace model
type Trace struct {
// TraceInput represent trace model
type TraceInput struct {
TraceID string `ch:"trace_id"`
SpanID string `ch:"span_id"`
ParentID string `ch:"parent_id"`
Expand All @@ -124,7 +124,7 @@ type Trace struct {
Tags [][]string `ch:"tags"`
}

type TraceV2 struct {
type TempoTrace struct {
OID string `ch:"oid"`
TraceID []byte `ch:"trace_id"`
SpanID []byte `ch:"span_id"`
Expand All @@ -137,7 +137,7 @@ type TraceV2 struct {
Payload string `ch:"payload"`
}

type TraceTagsV2 struct {
type TempoTraceTag struct {
OID string `ch:"oid"`
Date time.Time `ch:"date"`
Key string `ch:"key"`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,71 +3,72 @@ package qrynexporter
import (
"encoding/hex"
"fmt"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"time"

"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)

type batchV2 struct {
type traceWithTagsBatch struct {
driver.Batch
subBatch driver.Batch
tagsBatch driver.Batch
}

func (b *batchV2) AppendStruct(data any) error {
_data, ok := data.(*Trace)
func (b *traceWithTagsBatch) AppendStruct(v any) error {
ti, ok := v.(*TraceInput)
if !ok {
return fmt.Errorf("invalid data type, expected *Trace, got %T", data)
return fmt.Errorf("invalid data type, expected *Trace, got %T", v)
}
binTraceId, err := unhexAndPad(_data.TraceID, 16)
binTraceId, err := unhexAndPad(ti.TraceID, 16)
if err != nil {
return err
}
binParentID, err := unhexAndPad(_data.ParentID, 8)
binParentID, err := unhexAndPad(ti.ParentID, 8)
if err != nil {
return err
}
binSpanID, err := unhexAndPad(_data.SpanID, 8)
binSpanID, err := unhexAndPad(ti.SpanID, 8)
if err != nil {
return err
}
trace := &TraceV2{
trace := &TempoTrace{
OID: "0",
TraceID: binTraceId,
SpanID: binSpanID,
ParentID: binParentID,
Name: _data.Name,
TimestampNs: _data.TimestampNs,
DurationNs: _data.DurationNs,
ServiceName: _data.ServiceName,
PayloadType: _data.PayloadType,
Payload: _data.Payload,
Name: ti.Name,
TimestampNs: ti.TimestampNs,
DurationNs: ti.DurationNs,
ServiceName: ti.ServiceName,
PayloadType: ti.PayloadType,
Payload: ti.Payload,
}
err = b.Batch.AppendStruct(trace)
if err != nil {
return err
}
for _, tag := range _data.Tags {
attr := &TraceTagsV2{
for _, tag := range ti.Tags {
attr := &TempoTraceTag{
OID: "0",
Date: time.Unix(0, trace.TimestampNs).Truncate(time.Hour * 24),
Key: tag[0],
Val: tag[1],
TraceID: binTraceId,
SpanID: binSpanID,
TimestampNs: _data.TimestampNs,
DurationNs: _data.DurationNs,
TimestampNs: ti.TimestampNs,
DurationNs: ti.DurationNs,
}
err = b.subBatch.AppendStruct(attr)
err = b.tagsBatch.AppendStruct(attr)
if err != nil {
return err
}
}
return nil
}

func (b *batchV2) Abort() error {
func (b *traceWithTagsBatch) Abort() error {
var errs [2]error
errs[0] = b.Batch.Abort()
errs[1] = b.subBatch.Abort()
errs[1] = b.tagsBatch.Abort()
for _, err := range errs {
if err != nil {
return err
Expand All @@ -76,10 +77,10 @@ func (b *batchV2) Abort() error {
return nil
}

func (b *batchV2) Send() error {
func (b *traceWithTagsBatch) Send() error {
var errs [2]error
errs[0] = b.Batch.Send()
errs[1] = b.subBatch.Send()
errs[1] = b.tagsBatch.Send()
for _, err := range errs {
if err != nil {
return err
Expand Down
39 changes: 25 additions & 14 deletions exporter/qrynexporter/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,11 @@ import (
"google.golang.org/protobuf/encoding/protojson"
)

type contextKey string

const (
spanLinkDataFormat = "%s|%s|%s|%s|%d"
spanLinkDataFormat = "%s|%s|%s|%s|%d"
clusterKey contextKey = "cluster"
)

var delegate = &protojson.MarshalOptions{
Expand All @@ -48,9 +51,9 @@ type tracesExporter struct {
logger *zap.Logger
meter metric.Meter

db clickhouse.Conn
cluster bool
v2 bool
db clickhouse.Conn
cluster bool
clientSide bool
}

// newTracesExporter returns a SpanWriter for the database
Expand All @@ -64,11 +67,19 @@ func newTracesExporter(logger *zap.Logger, cfg *Config, set *exporter.Settings)
return nil, err
}
exp := &tracesExporter{
<<<<<<< HEAD

Check failure on line 70 in exporter/qrynexporter/traces.go

View workflow job for this annotation

GitHub Actions / lint-and-test

syntax error: unexpected <<, expected expression
logger: logger,
meter: set.MeterProvider.Meter(typeStr),

Check failure on line 72 in exporter/qrynexporter/traces.go

View workflow job for this annotation

GitHub Actions / lint-and-test

syntax error: unexpected ) in composite literal; possibly missing comma or }
db: db,
cluster: cfg.ClusteredClickhouse,
v2: cfg.ClusteredClickhouse && cfg.ClientSideTraceProcessing,
=======
logger: logger,
meter: set.MeterProvider.Meter(typeStr),
db: db,
cluster: cfg.ClusteredClickhouse,
clientSide: cfg.ClientSideTraceProcessing,
>>>>>>> e35202d (refactor: improve the code structure and documentation of qrynexporter.)
}
if err := initMetrics(exp.meter); err != nil {

Check failure on line 84 in exporter/qrynexporter/traces.go

View workflow job for this annotation

GitHub Actions / lint-and-test

syntax error: non-declaration statement outside function body
exp.logger.Error(fmt.Sprintf("failed to init metrics: %s", err.Error()))
Expand Down Expand Up @@ -165,11 +176,11 @@ func extractScopeTags(il pcommon.InstrumentationScope, tags map[string]string) {
}

func (e *tracesExporter) exportResourceSapns(ctx context.Context, resourceSpans ptrace.ResourceSpansSlice) error {
isCluster := ctx.Value("cluster").(bool)
isCluster := ctx.Value(clusterKey).(bool)
var batch driver.Batch
var err error
if e.v2 {
batch, err = e.prepareBatchV2(ctx)
if e.clientSide {
batch, err = e.prepareBatchClientSide(ctx)
} else {
batch, err = e.db.PrepareBatch(ctx, tracesInputSQL(isCluster))
}
Expand All @@ -196,7 +207,7 @@ func (e *tracesExporter) exportResourceSapns(ctx context.Context, resourceSpans
return nil
}

func (e *tracesExporter) prepareBatchV2(ctx context.Context) (driver.Batch, error) {
func (e *tracesExporter) prepareBatchClientSide(ctx context.Context) (driver.Batch, error) {
batch, err := e.db.PrepareBatch(ctx, TracesV2InputSQL(e.cluster))
if err != nil {
return nil, err
Expand All @@ -206,15 +217,15 @@ func (e *tracesExporter) prepareBatchV2(ctx context.Context) (driver.Batch, erro
batch.Abort()
return nil, err
}
return &batchV2{
Batch: batch,
subBatch: subBatch,
return &traceWithTagsBatch{
Batch: batch,
tagsBatch: subBatch,
}, nil
}

// traceDataPusher implements OTEL exporterhelper.traceDataPusher
func (e *tracesExporter) pushTraceData(ctx context.Context, td ptrace.Traces) error {
_ctx := context.WithValue(ctx, "cluster", e.cluster)
_ctx := context.WithValue(ctx, clusterKey, e.cluster)
start := time.Now()
if err := e.exportResourceSapns(_ctx, td.ResourceSpans()); err != nil {
otelcolExporterQrynBatchInsertDurationMillis.Record(ctx, time.Now().UnixMilli()-start.UnixMilli(), metric.WithAttributeSet(*newOtelcolAttrSetBatch(errorCodeError, dataTypeTraces)))
Expand Down Expand Up @@ -387,7 +398,7 @@ func marshalSpanToJSON(span ptrace.Span, mergedAttributes pcommon.Map) ([]byte,
return delegate.Marshal(otlpSpan)
}

func convertTracesInput(span ptrace.Span, resource pcommon.Resource, serviceName string, tags map[string]string) (*Trace, error) {
func convertTracesInput(span ptrace.Span, resource pcommon.Resource, serviceName string, tags map[string]string) (*TraceInput, error) {
durationNano := uint64(span.EndTimestamp() - span.StartTimestamp())
tags = aggregateSpanTags(span, tags)
tags["service.name"] = serviceName
Expand All @@ -404,7 +415,7 @@ func convertTracesInput(span ptrace.Span, resource pcommon.Resource, serviceName
return nil, fmt.Errorf("failed to marshal span: %w", err)
}

trace := &Trace{
trace := &TraceInput{
TraceID: span.TraceID().String(),
SpanID: span.SpanID().String(),
ParentID: span.ParentSpanID().String(),
Expand Down

0 comments on commit 45fdef0

Please sign in to comment.