Skip to content

Commit

Permalink
Flag to return messages received so far when maxWait is reached.
Browse files Browse the repository at this point in the history
The current behavior, to panic on deadline is problematic, since
there are a number of scenarios where you simply do not know how
many messages there are to consume or when they are produced. For
example, a load test may produce messages in bursts with jitter
for a burn-in test, to try to provoke i data races in the test
subject.

This commit introduces a flag ExpectTimeout which will return message
received so far.

The commit also adds a test to verify that we panic if Close() is
called while we are consuming. Given the nature of the problem
xk6-kafka is trying to solve, this sounds like an errar condition.
  • Loading branch information
bittrance committed Dec 3, 2024
1 parent 27bfd01 commit 7b8efa7
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 18 deletions.
6 changes: 6 additions & 0 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type ReaderConfig struct {
type ConsumeConfig struct {
Limit int64 `json:"limit"`
NanoPrecision bool `json:"nanoPrecision"`
ExpectTimeout bool `json:"expectTimeout"`
}

type Duration struct {
Expand Down Expand Up @@ -331,6 +332,11 @@ func (k *Kafka) consume(
ctxWithTimeout, cancel := context.WithTimeout(ctx, maxWait)
msg, err := reader.ReadMessage(ctxWithTimeout)
cancel()
if errors.Is(ctxWithTimeout.Err(), context.DeadlineExceeded) && consumeConfig.ExpectTimeout {
k.reportReaderStats(reader.Stats())

return messages
}
if errors.Is(err, io.EOF) {
k.reportReaderStats(reader.Stats())

Expand Down
79 changes: 61 additions & 18 deletions reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,32 +21,75 @@ func TestConsumerMaxWaitExceeded(t *testing.T) {
defer writer.Close()

// Create a reader to consume messages.
assert.NotPanics(t, func() {
reader := test.module.Kafka.reader(&ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: test.topicName,
MaxWait: Duration{time.Second * 3},
})
assert.NotNil(t, reader)
defer reader.Close()
reader := test.module.Kafka.reader(&ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: test.topicName,
MaxWait: Duration{250 * time.Millisecond},
})
assert.NotNil(t, reader)
defer reader.Close()

// Switch to VU code.
require.NoError(t, test.moveToVUCode())
// Switch to VU code.
require.NoError(t, test.moveToVUCode())

// Consume a message in the VU function.
assert.Panics(t, func() {
messages := test.module.Kafka.consume(reader, &ConsumeConfig{Limit: 1})
assert.Empty(t, messages)
})
test.module.Kafka.produce(writer, &ProduceConfig{
Messages: []Message{
{
Value: test.module.Kafka.serialize(&Container{
Data: "value1",
SchemaType: String,
}),
},
},
})

// Check if no message was consumed.
// Allow receiving messages consumed before MaxWait
messages := test.module.Kafka.consume(reader, &ConsumeConfig{Limit: 2, ExpectTimeout: true})
assert.Equal(t, 1, len(messages))

// Check that message was consumed.
metricsValues := test.getCounterMetricsValues()
assert.Equal(t, 1.0, metricsValues[test.module.metrics.ReaderDials.Name])
assert.Equal(t, 0.0, metricsValues[test.module.metrics.ReaderErrors.Name])
assert.Equal(t, 0.0, metricsValues[test.module.metrics.ReaderBytes.Name])
assert.Equal(t, 0.0, metricsValues[test.module.metrics.ReaderMessages.Name])
assert.Equal(t, 6.0, metricsValues[test.module.metrics.ReaderBytes.Name])
assert.Equal(t, 1.0, metricsValues[test.module.metrics.ReaderMessages.Name])
assert.Equal(t, 0.0, metricsValues[test.module.metrics.ReaderRebalances.Name])

// Fail on deadline in the default case
assert.Panics(t, func() {
test.module.Kafka.consume(reader, &ConsumeConfig{Limit: 2})
})
}

// TestConsumerPanicsOnClose tests the consume function when the reader is being
// closed. Closing the reader while reading is considered unexpected and should
// panic.
func TestConsumerPanicsOnClose(t *testing.T) {
test := getTestModuleInstance(t)
test.createTopic()

// Create a reader to consume messages.
reader := test.module.Kafka.reader(&ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: test.topicName,
MaxWait: Duration{time.Second * 3},
})
defer reader.Close()

go func() {
// Wait for a bit so consumption starts.
time.Sleep(250 * time.Millisecond)
// Close reader to cause consume to panic.
reader.Close()
}()

// Switch to VU code.
require.NoError(t, test.moveToVUCode())

// Consume a message in the VU function.
assert.Panics(t, func() {
test.module.Kafka.consume(reader, &ConsumeConfig{Limit: 1})
})
}

// TestConsume tests the consume function.
Expand Down

0 comments on commit 7b8efa7

Please sign in to comment.