Skip to content

Commit

Permalink
When maxWait is reached, consume return messages received so far.
Browse files Browse the repository at this point in the history
The previous behavior, to panic on deadline is problematic, since
there are a number of scenarios where you simply do not know how
many messages there are to consume or when they are produced. For
example, a load test may produce messages in bursts with jitter
for a burn-in test, to try to provoke i data races in the test
subject.

The commit also adds a test to verify that we panic if Close() is
called while we are consuming. Given the nature of the problem
xk6-kafka is trying to solve, this sounds like an errar condition.
  • Loading branch information
bittrance committed Dec 2, 2024
1 parent 6165af2 commit e413d37
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 17 deletions.
5 changes: 5 additions & 0 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,11 @@ func (k *Kafka) consume(
ctxWithTimeout, cancel := context.WithTimeout(ctx, maxWait)
msg, err := reader.ReadMessage(ctxWithTimeout)
cancel()
if errors.Is(ctxWithTimeout.Err(), context.DeadlineExceeded) {
k.reportReaderStats(reader.Stats())

return messages
}
if errors.Is(err, io.EOF) {
k.reportReaderStats(reader.Stats())

Expand Down
72 changes: 55 additions & 17 deletions reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,34 +21,72 @@ func TestConsumerMaxWaitExceeded(t *testing.T) {
defer writer.Close()

// Create a reader to consume messages.
assert.NotPanics(t, func() {
reader := test.module.Kafka.reader(&ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: test.topicName,
MaxWait: Duration{time.Second * 3},
})
assert.NotNil(t, reader)
defer reader.Close()
reader := test.module.Kafka.reader(&ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: test.topicName,
MaxWait: Duration{250 * time.Millisecond},
})
assert.NotNil(t, reader)
defer reader.Close()

// Switch to VU code.
require.NoError(t, test.moveToVUCode())
// Switch to VU code.
require.NoError(t, test.moveToVUCode())

// Consume a message in the VU function.
assert.Panics(t, func() {
messages := test.module.Kafka.consume(reader, &ConsumeConfig{Limit: 1})
assert.Empty(t, messages)
})
test.module.Kafka.produce(writer, &ProduceConfig{
Messages: []Message{
{
Value: test.module.Kafka.serialize(&Container{
Data: "value1",
SchemaType: String,
}),
},
},
})

// Consume a message in the VU function.
messages := test.module.Kafka.consume(reader, &ConsumeConfig{Limit: 1})
assert.Equal(t, 1, len(messages))

// Check if no message was consumed.
metricsValues := test.getCounterMetricsValues()
assert.Equal(t, 1.0, metricsValues[test.module.metrics.ReaderDials.Name])
assert.Equal(t, 0.0, metricsValues[test.module.metrics.ReaderErrors.Name])
assert.Equal(t, 0.0, metricsValues[test.module.metrics.ReaderBytes.Name])
assert.Equal(t, 0.0, metricsValues[test.module.metrics.ReaderMessages.Name])
assert.Equal(t, 6.0, metricsValues[test.module.metrics.ReaderBytes.Name])
assert.Equal(t, 1.0, metricsValues[test.module.metrics.ReaderMessages.Name])
assert.Equal(t, 0.0, metricsValues[test.module.metrics.ReaderRebalances.Name])
}

// TestConsumerPanicsOnClose tests the consume function when the reader is being
// closed. Closing the reader while reading is considered unexpected and should
// panic.
func TestConsumerPanicsOnClose(t *testing.T) {
test := getTestModuleInstance(t)
test.createTopic()

// Create a reader to consume messages.
reader := test.module.Kafka.reader(&ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: test.topicName,
MaxWait: Duration{time.Second * 3},
})
defer reader.Close()

go func() {
// Wait for a bit so consumption starts.
time.Sleep(250 * time.Millisecond)
// Close reader to cause consume to panic.
reader.Close()
}()

// Switch to VU code.
require.NoError(t, test.moveToVUCode())

// Consume a message in the VU function.
assert.Panics(t, func() {
test.module.Kafka.consume(reader, &ConsumeConfig{Limit: 1})
})
}

// TestConsume tests the consume function.
// nolint: funlen
func TestConsume(t *testing.T) {
Expand Down

0 comments on commit e413d37

Please sign in to comment.