Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

When maxWait is reached, consume returns messages received so far #1

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions kafka_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package kafka

import (
"context"
"fmt"
"testing"
"time"

"github.com/grafana/sobek"
kafkago "github.com/segmentio/kafka-go"
Expand All @@ -20,6 +22,7 @@ const (

// struct to keep all the things tests need in one place.
type kafkaTest struct {
topicName string
rt *sobek.Runtime
module *Module
vu *modulestest.VU
Expand Down Expand Up @@ -52,8 +55,10 @@ func getTestModuleInstance(tb testing.TB) *kafkaTest {
require.True(tb, ok)

require.NoError(tb, runtime.Set("kafka", moduleInstance.Exports().Default))
topicName := fmt.Sprintf("%s-%d", tb.Name(), time.Now().UnixMilli())

return &kafkaTest{
topicName: topicName,
rt: runtime,
module: moduleInstance,
vu: mockVU,
Expand Down Expand Up @@ -99,37 +104,37 @@ func (k *kafkaTest) getCounterMetricsValues() map[string]float64 {
}

// newWriter creates a Kafka writer for the reader tests.
func (k *kafkaTest) newWriter(topicName string) *kafkago.Writer {
func (k *kafkaTest) newWriter() *kafkago.Writer {
// Create a writer to produce messages.
return k.module.Kafka.writer(&WriterConfig{
Brokers: []string{"localhost:9092"},
Topic: topicName,
Topic: k.topicName,
})
}

// newReader creates a Kafka reader for the reader tests.
func (k *kafkaTest) newReader(topicName string) *kafkago.Reader {
func (k *kafkaTest) newReader() *kafkago.Reader {
// Create a reader to consume messages.
return k.module.Kafka.reader(&ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: topicName,
Topic: k.topicName,
})
}

// createTopic creates a topic.
func (k *kafkaTest) createTopic(topicName string) {
func (k *kafkaTest) createTopic() {
// Create a connection to Kafka.
connection := k.module.Kafka.getKafkaControllerConnection(&ConnectionConfig{
Address: "localhost:9092",
})
defer connection.Close()

// Create a topic.
k.module.Kafka.createTopic(connection, &kafkago.TopicConfig{Topic: topicName})
k.module.Kafka.createTopic(connection, &kafkago.TopicConfig{Topic: k.topicName})
}

// topicExists checks if a topic exists.
func (k *kafkaTest) topicExists(topicName string) bool {
func (k *kafkaTest) topicExists() bool {
// Create a connection to Kafka.
connection := k.module.Kafka.getKafkaControllerConnection(&ConnectionConfig{
Address: "localhost:9092",
Expand All @@ -139,7 +144,7 @@ func (k *kafkaTest) topicExists(topicName string) bool {
// Create a topic.
topics := k.module.Kafka.listTopics(connection)
for _, topic := range topics {
if topic == topicName {
if topic == k.topicName {
return true
}
}
Expand Down
6 changes: 6 additions & 0 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type ReaderConfig struct {
type ConsumeConfig struct {
Limit int64 `json:"limit"`
NanoPrecision bool `json:"nanoPrecision"`
ExpectTimeout bool `json:"expectTimeout"`
}

type Duration struct {
Expand Down Expand Up @@ -331,6 +332,11 @@ func (k *Kafka) consume(
ctxWithTimeout, cancel := context.WithTimeout(ctx, maxWait)
msg, err := reader.ReadMessage(ctxWithTimeout)
cancel()
if errors.Is(ctxWithTimeout.Err(), context.DeadlineExceeded) && consumeConfig.ExpectTimeout {
k.reportReaderStats(reader.Stats())

return messages
}
if errors.Is(err, io.EOF) {
k.reportReaderStats(reader.Stats())

Expand Down
158 changes: 100 additions & 58 deletions reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,53 +16,97 @@ import (
// The reader should not hang
func TestConsumerMaxWaitExceeded(t *testing.T) {
test := getTestModuleInstance(t)
writer := test.newWriter("test-topic")
test.createTopic()
writer := test.newWriter()
defer writer.Close()

// Create a reader to consume messages.
assert.NotPanics(t, func() {
reader := test.module.Kafka.reader(&ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "test-topic",
MaxWait: Duration{time.Second * 3},
})
assert.NotNil(t, reader)
defer reader.Close()
reader := test.module.Kafka.reader(&ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: test.topicName,
MaxWait: Duration{250 * time.Millisecond},
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do not need 3 seconds to get down to the consume() call even on noisy GitHub :)

})
assert.NotNil(t, reader)
defer reader.Close()

// Switch to VU code.
require.NoError(t, test.moveToVUCode())
// Switch to VU code.
require.NoError(t, test.moveToVUCode())

// Consume a message in the VU function.
assert.Panics(t, func() {
messages := test.module.Kafka.consume(reader, &ConsumeConfig{Limit: 1})
assert.Empty(t, messages)
})
test.module.Kafka.produce(writer, &ProduceConfig{
Messages: []Message{
{
Value: test.module.Kafka.serialize(&Container{
Data: "value1",
SchemaType: String,
}),
},
},
})

// Check if no message was consumed.
// Allow receiving messages consumed before MaxWait
messages := test.module.Kafka.consume(reader, &ConsumeConfig{Limit: 2, ExpectTimeout: true})
assert.Equal(t, 1, len(messages))

// Check that message was consumed.
metricsValues := test.getCounterMetricsValues()
assert.Equal(t, 1.0, metricsValues[test.module.metrics.ReaderDials.Name])
assert.Equal(t, 0.0, metricsValues[test.module.metrics.ReaderErrors.Name])
assert.Equal(t, 0.0, metricsValues[test.module.metrics.ReaderBytes.Name])
assert.Equal(t, 0.0, metricsValues[test.module.metrics.ReaderMessages.Name])
assert.Equal(t, 6.0, metricsValues[test.module.metrics.ReaderBytes.Name])
assert.Equal(t, 1.0, metricsValues[test.module.metrics.ReaderMessages.Name])
assert.Equal(t, 0.0, metricsValues[test.module.metrics.ReaderRebalances.Name])

// Fail on deadline in the default case
assert.Panics(t, func() {
test.module.Kafka.consume(reader, &ConsumeConfig{Limit: 2})
})
}

// TestConsumerPanicsOnClose tests the consume function when the reader is being
// closed. Closing the reader while reading is considered unexpected and should
// panic.
func TestConsumerPanicsOnClose(t *testing.T) {
test := getTestModuleInstance(t)
test.createTopic()

// Create a reader to consume messages.
reader := test.module.Kafka.reader(&ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: test.topicName,
MaxWait: Duration{time.Second * 3},
})
defer reader.Close()

go func() {
// Wait for a bit so consumption starts.
time.Sleep(250 * time.Millisecond)
// Close reader to cause consume to panic.
reader.Close()
}()

// Switch to VU code.
require.NoError(t, test.moveToVUCode())

// Consume a message in the VU function.
assert.Panics(t, func() {
test.module.Kafka.consume(reader, &ConsumeConfig{Limit: 1})
})
}

// TestConsume tests the consume function.
// nolint: funlen
func TestConsume(t *testing.T) {
test := getTestModuleInstance(t)
test.createTopic("test-topic")
writer := test.newWriter("test-topic")
test.createTopic()
writer := test.newWriter()
defer writer.Close()

assert.True(t, test.topicExists("test-topic"))
assert.True(t, test.topicExists())

// Create a reader to consume messages.
assert.NotPanics(t, func() {
reader := test.module.Kafka.reader(&ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "test-topic",
Topic: test.topicName,
})
assert.NotNil(t, reader)
defer reader.Close()
Expand Down Expand Up @@ -139,19 +183,19 @@ func TestConsume(t *testing.T) {
// TestConsumeWithoutKey tests the consume function without a key.
func TestConsumeWithoutKey(t *testing.T) {
test := getTestModuleInstance(t)
writer := test.newWriter("test-topic")
test.createTopic()
writer := test.newWriter()
defer writer.Close()

reader := test.module.Kafka.reader(&ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: test.topicName,
})
assert.NotNil(t, reader)
defer reader.Close()

// Create a reader to consume messages.
assert.NotPanics(t, func() {
reader := test.module.Kafka.reader(&ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "test-topic",
Offset: 1,
})
assert.NotNil(t, reader)
defer reader.Close()

// Switch to VU code.
require.NoError(t, test.moveToVUCode())

Expand All @@ -164,7 +208,6 @@ func TestConsumeWithoutKey(t *testing.T) {
Data: "value1",
SchemaType: String,
}),
Offset: 1,
},
},
})
Expand Down Expand Up @@ -198,18 +241,19 @@ func TestConsumeWithoutKey(t *testing.T) {
// TestConsumerContextCancelled tests the consume function and fails on a cancelled context.
func TestConsumerContextCancelled(t *testing.T) {
test := getTestModuleInstance(t)
writer := test.newWriter("test-topic")
test.createTopic()
writer := test.newWriter()
defer writer.Close()

reader := test.module.Kafka.reader(&ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: test.topicName,
})
assert.NotNil(t, reader)
defer reader.Close()

// Create a reader to consume messages.
assert.NotPanics(t, func() {
reader := test.module.Kafka.reader(&ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "test-topic",
})
assert.NotNil(t, reader)
defer reader.Close()

// Switch to VU code.
require.NoError(t, test.moveToVUCode())

Expand All @@ -232,8 +276,7 @@ func TestConsumerContextCancelled(t *testing.T) {

// Consume a message in the VU function.
assert.Panics(t, func() {
messages := test.module.Kafka.consume(reader, &ConsumeConfig{Limit: 1})
assert.Empty(t, messages)
test.module.Kafka.consume(reader, &ConsumeConfig{Limit: 1})
})
})

Expand All @@ -249,19 +292,19 @@ func TestConsumerContextCancelled(t *testing.T) {
// TestConsumeJSON tests the consume function with a JSON value.
func TestConsumeJSON(t *testing.T) {
test := getTestModuleInstance(t)
writer := test.newWriter("test-topic")
test.createTopic()
writer := test.newWriter()
defer writer.Close()

reader := test.module.Kafka.reader(&ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: test.topicName,
})
assert.NotNil(t, reader)
defer reader.Close()

// Create a reader to consume messages.
assert.NotPanics(t, func() {
reader := test.module.Kafka.reader(&ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "test-topic",
Offset: 3,
})
assert.NotNil(t, reader)
defer reader.Close()

// Switch to VU code.
require.NoError(t, test.moveToVUCode())

Expand All @@ -273,8 +316,7 @@ func TestConsumeJSON(t *testing.T) {
test.module.Kafka.produce(writer, &ProduceConfig{
Messages: []Message{
{
Value: serialized,
Offset: 3,
Value: serialized,
},
},
})
Expand Down Expand Up @@ -309,8 +351,8 @@ func TestReaderClass(t *testing.T) {
test := getTestModuleInstance(t)

require.NoError(t, test.moveToVUCode())
test.createTopic("test-reader-class")
writer := test.newWriter("test-reader-class")
test.createTopic()
writer := test.newWriter()
defer writer.Close()

test.module.Kafka.produce(writer, &ProduceConfig{
Expand All @@ -334,7 +376,7 @@ func TestReaderClass(t *testing.T) {
test.module.vu.Runtime().ToValue(
map[string]interface{}{
"brokers": []string{"localhost:9092"},
"topic": "test-reader-class",
"topic": test.topicName,
"maxWait": "3s",
},
),
Expand All @@ -344,7 +386,7 @@ func TestReaderClass(t *testing.T) {
this := reader.Get("This").Export().(*kafkago.Reader)
assert.NotNil(t, this)
assert.Equal(t, this.Config().Brokers, []string{"localhost:9092"})
assert.Equal(t, this.Config().Topic, "test-reader-class")
assert.Equal(t, this.Config().Topic, test.topicName)
assert.Equal(t, this.Config().MaxWait, time.Second*3)

consume := reader.Get("consume").Export().(func(sobek.FunctionCall) sobek.Value)
Expand Down
6 changes: 3 additions & 3 deletions schema_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestGetLatestSchemaFails(t *testing.T) {
srClient := test.module.Kafka.schemaRegistryClient(&srConfig)
assert.Panics(t, func() {
schema := test.module.Kafka.getSchema(srClient, &Schema{
Subject: "test-subject",
Subject: "no-such-subject",
Version: 0,
})
assert.Equal(t, schema, nil)
Expand All @@ -125,7 +125,7 @@ func TestGetSchemaFails(t *testing.T) {
srClient := test.module.Kafka.schemaRegistryClient(&srConfig)
assert.Panics(t, func() {
schema := test.module.Kafka.getSchema(srClient, &Schema{
Subject: "test-subject",
Subject: "no-such-subject",
Version: 0,
})
assert.Equal(t, schema, nil)
Expand All @@ -147,7 +147,7 @@ func TestCreateSchemaFails(t *testing.T) {
srClient := test.module.Kafka.schemaRegistryClient(&srConfig)
assert.Panics(t, func() {
schema := test.module.Kafka.getSchema(srClient, &Schema{
Subject: "test-subject",
Subject: "no-such-subject",
Version: 0,
})
assert.Equal(t, schema, nil)
Expand Down
Loading