diff --git a/reader.go b/reader.go index c885dde..b6ce70a 100644 --- a/reader.go +++ b/reader.go @@ -77,6 +77,7 @@ type ReaderConfig struct { type ConsumeConfig struct { Limit int64 `json:"limit"` NanoPrecision bool `json:"nanoPrecision"` + ExpectTimeout bool `json:"expectTimeout"` } type Duration struct { @@ -331,6 +332,11 @@ func (k *Kafka) consume( ctxWithTimeout, cancel := context.WithTimeout(ctx, maxWait) msg, err := reader.ReadMessage(ctxWithTimeout) cancel() + if errors.Is(ctxWithTimeout.Err(), context.DeadlineExceeded) && consumeConfig.ExpectTimeout { + k.reportReaderStats(reader.Stats()) + + return messages + } if errors.Is(err, io.EOF) { k.reportReaderStats(reader.Stats()) diff --git a/reader_test.go b/reader_test.go index ed924b9..871b0d1 100644 --- a/reader_test.go +++ b/reader_test.go @@ -21,32 +21,75 @@ func TestConsumerMaxWaitExceeded(t *testing.T) { defer writer.Close() // Create a reader to consume messages. - assert.NotPanics(t, func() { - reader := test.module.Kafka.reader(&ReaderConfig{ - Brokers: []string{"localhost:9092"}, - Topic: test.topicName, - MaxWait: Duration{time.Second * 3}, - }) - assert.NotNil(t, reader) - defer reader.Close() + reader := test.module.Kafka.reader(&ReaderConfig{ + Brokers: []string{"localhost:9092"}, + Topic: test.topicName, + MaxWait: Duration{250 * time.Millisecond}, + }) + assert.NotNil(t, reader) + defer reader.Close() - // Switch to VU code. - require.NoError(t, test.moveToVUCode()) + // Switch to VU code. + require.NoError(t, test.moveToVUCode()) - // Consume a message in the VU function. - assert.Panics(t, func() { - messages := test.module.Kafka.consume(reader, &ConsumeConfig{Limit: 1}) - assert.Empty(t, messages) - }) + test.module.Kafka.produce(writer, &ProduceConfig{ + Messages: []Message{ + { + Value: test.module.Kafka.serialize(&Container{ + Data: "value1", + SchemaType: String, + }), + }, + }, }) - // Check if no message was consumed. + // Allow receiving messages consumed before MaxWait + messages := test.module.Kafka.consume(reader, &ConsumeConfig{Limit: 2, ExpectTimeout: true}) + assert.Equal(t, 1, len(messages)) + + // Check that message was consumed. metricsValues := test.getCounterMetricsValues() assert.Equal(t, 1.0, metricsValues[test.module.metrics.ReaderDials.Name]) assert.Equal(t, 0.0, metricsValues[test.module.metrics.ReaderErrors.Name]) - assert.Equal(t, 0.0, metricsValues[test.module.metrics.ReaderBytes.Name]) - assert.Equal(t, 0.0, metricsValues[test.module.metrics.ReaderMessages.Name]) + assert.Equal(t, 6.0, metricsValues[test.module.metrics.ReaderBytes.Name]) + assert.Equal(t, 1.0, metricsValues[test.module.metrics.ReaderMessages.Name]) assert.Equal(t, 0.0, metricsValues[test.module.metrics.ReaderRebalances.Name]) + + // Fail on deadline in the default case + assert.Panics(t, func() { + test.module.Kafka.consume(reader, &ConsumeConfig{Limit: 2}) + }) +} + +// TestConsumerPanicsOnClose tests the consume function when the reader is being +// closed. Closing the reader while reading is considered unexpected and should +// panic. +func TestConsumerPanicsOnClose(t *testing.T) { + test := getTestModuleInstance(t) + test.createTopic() + + // Create a reader to consume messages. + reader := test.module.Kafka.reader(&ReaderConfig{ + Brokers: []string{"localhost:9092"}, + Topic: test.topicName, + MaxWait: Duration{time.Second * 3}, + }) + defer reader.Close() + + go func() { + // Wait for a bit so consumption starts. + time.Sleep(250 * time.Millisecond) + // Close reader to cause consume to panic. + reader.Close() + }() + + // Switch to VU code. + require.NoError(t, test.moveToVUCode()) + + // Consume a message in the VU function. + assert.Panics(t, func() { + test.module.Kafka.consume(reader, &ConsumeConfig{Limit: 1}) + }) } // TestConsume tests the consume function.