Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions kq/pusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ import (
type (
PushOption func(options *pushOptions)

// KeyValue represents a key-value pair for batch sending
KeyValue struct {
Key string
Value string
}

Pusher struct {
topic string
producer kafkaWriter
Expand Down Expand Up @@ -178,3 +184,57 @@ func WithSyncPush() PushOption {
options.syncPush = true
}
}

// BatchPush sends multiple messages to the Kafka topic.
// It generates timestamp-based keys for each message automatically.
func (p *Pusher) BatchPush(ctx context.Context, msgs []string) error {
if len(msgs) == 0 {
return nil
}

keyValues := make([]KeyValue, len(msgs))
baseTime := time.Now().UnixNano()
for i, msg := range msgs {
keyValues[i] = KeyValue{
Key: strconv.FormatInt(baseTime+int64(i), 10),
Value: msg,
}
}
return p.BatchPushWithKeys(ctx, keyValues)
}

// BatchPushWithKeys sends multiple key-value pairs to the Kafka topic.
func (p *Pusher) BatchPushWithKeys(ctx context.Context, keyValues []KeyValue) error {
if len(keyValues) == 0 {
return nil
}

msgs := make([]kafka.Message, len(keyValues))
for i, kv := range keyValues {
msg := kafka.Message{
Key: []byte(kv.Key),
Value: []byte(kv.Value),
}

// wrap message into message carrier for tracing
mc := internal.NewMessageCarrier(internal.NewMessage(&msg))
// inject trace context into message
otel.GetTextMapPropagator().Inject(ctx, mc)

msgs[i] = msg
}

// Handle sync vs async mode
if p.executor != nil {
// Async mode: use executor for batching
for _, msg := range msgs {
if err := p.executor.Add(msg, len(msg.Value)); err != nil {
return err
}
}
return nil
} else {
// Sync mode: send directly
return p.producer.WriteMessages(ctx, msgs...)
}
}
97 changes: 97 additions & 0 deletions kq/pusher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,3 +139,100 @@ func TestPusher_PushWithKey_Error(t *testing.T) {
assert.Equal(t, expectedError, err)
mockWriter.AssertExpectations(t)
}

func TestPusher_BatchPush(t *testing.T) {
mockWriter := new(mockKafkaWriter)
pusher := &Pusher{
producer: mockWriter,
topic: "test-topic",
}

ctx := context.Background()
msgs := []string{"message1", "message2", "message3"}

mockWriter.On("WriteMessages", ctx, mock.MatchedBy(func(msgs []kafka.Message) bool {
return len(msgs) == 3 &&
string(msgs[0].Value) == "message1" &&
string(msgs[1].Value) == "message2" &&
string(msgs[2].Value) == "message3"
})).Return(nil)

err := pusher.BatchPush(ctx, msgs)
assert.NoError(t, err)
mockWriter.AssertExpectations(t)
}

func TestPusher_BatchPushWithKeys(t *testing.T) {
mockWriter := new(mockKafkaWriter)
pusher := &Pusher{
producer: mockWriter,
topic: "test-topic",
}

ctx := context.Background()
keyValues := []KeyValue{
{Key: "key1", Value: "value1"},
{Key: "key2", Value: "value2"},
}

mockWriter.On("WriteMessages", ctx, mock.MatchedBy(func(msgs []kafka.Message) bool {
return len(msgs) == 2 &&
string(msgs[0].Key) == "key1" && string(msgs[0].Value) == "value1" &&
string(msgs[1].Key) == "key2" && string(msgs[1].Value) == "value2"
})).Return(nil)

err := pusher.BatchPushWithKeys(ctx, keyValues)
assert.NoError(t, err)
mockWriter.AssertExpectations(t)
}

func TestPusher_BatchPush_EmptyMessages(t *testing.T) {
mockWriter := new(mockKafkaWriter)
pusher := &Pusher{
producer: mockWriter,
topic: "test-topic",
}

ctx := context.Background()
var msgs []string

err := pusher.BatchPush(ctx, msgs)
assert.NoError(t, err)
// Should not call WriteMessages for empty slice
mockWriter.AssertNotCalled(t, "WriteMessages")
}

func TestPusher_BatchPushWithKeys_EmptyKeyValues(t *testing.T) {
mockWriter := new(mockKafkaWriter)
pusher := &Pusher{
producer: mockWriter,
topic: "test-topic",
}

ctx := context.Background()
var keyValues []KeyValue

err := pusher.BatchPushWithKeys(ctx, keyValues)
assert.NoError(t, err)
// Should not call WriteMessages for empty slice
mockWriter.AssertNotCalled(t, "WriteMessages")
}

func TestPusher_BatchPush_Error(t *testing.T) {
mockWriter := new(mockKafkaWriter)
pusher := &Pusher{
producer: mockWriter,
topic: "test-topic",
}

ctx := context.Background()
msgs := []string{"message1", "message2"}

expectedError := errors.New("batch write error")
mockWriter.On("WriteMessages", ctx, mock.AnythingOfType("[]kafka.Message")).Return(expectedError)

err := pusher.BatchPush(ctx, msgs)
assert.Error(t, err)
assert.Equal(t, expectedError, err)
mockWriter.AssertExpectations(t)
}