diff --git a/reader.go b/reader.go index c885dde..ccbc919 100644 --- a/reader.go +++ b/reader.go @@ -331,6 +331,11 @@ func (k *Kafka) consume( ctxWithTimeout, cancel := context.WithTimeout(ctx, maxWait) msg, err := reader.ReadMessage(ctxWithTimeout) cancel() + if errors.Is(ctxWithTimeout.Err(), context.DeadlineExceeded) { + k.reportReaderStats(reader.Stats()) + + return messages + } if errors.Is(err, io.EOF) { k.reportReaderStats(reader.Stats()) diff --git a/reader_test.go b/reader_test.go index ed924b9..0a60626 100644 --- a/reader_test.go +++ b/reader_test.go @@ -21,34 +21,72 @@ func TestConsumerMaxWaitExceeded(t *testing.T) { defer writer.Close() // Create a reader to consume messages. - assert.NotPanics(t, func() { - reader := test.module.Kafka.reader(&ReaderConfig{ - Brokers: []string{"localhost:9092"}, - Topic: test.topicName, - MaxWait: Duration{time.Second * 3}, - }) - assert.NotNil(t, reader) - defer reader.Close() + reader := test.module.Kafka.reader(&ReaderConfig{ + Brokers: []string{"localhost:9092"}, + Topic: test.topicName, + MaxWait: Duration{250 * time.Millisecond}, + }) + assert.NotNil(t, reader) + defer reader.Close() - // Switch to VU code. - require.NoError(t, test.moveToVUCode()) + // Switch to VU code. + require.NoError(t, test.moveToVUCode()) - // Consume a message in the VU function. - assert.Panics(t, func() { - messages := test.module.Kafka.consume(reader, &ConsumeConfig{Limit: 1}) - assert.Empty(t, messages) - }) + test.module.Kafka.produce(writer, &ProduceConfig{ + Messages: []Message{ + { + Value: test.module.Kafka.serialize(&Container{ + Data: "value1", + SchemaType: String, + }), + }, + }, }) + // Consume a message in the VU function. + messages := test.module.Kafka.consume(reader, &ConsumeConfig{Limit: 1}) + assert.Equal(t, 1, len(messages)) + // Check if no message was consumed. metricsValues := test.getCounterMetricsValues() assert.Equal(t, 1.0, metricsValues[test.module.metrics.ReaderDials.Name]) assert.Equal(t, 0.0, metricsValues[test.module.metrics.ReaderErrors.Name]) - assert.Equal(t, 0.0, metricsValues[test.module.metrics.ReaderBytes.Name]) - assert.Equal(t, 0.0, metricsValues[test.module.metrics.ReaderMessages.Name]) + assert.Equal(t, 6.0, metricsValues[test.module.metrics.ReaderBytes.Name]) + assert.Equal(t, 1.0, metricsValues[test.module.metrics.ReaderMessages.Name]) assert.Equal(t, 0.0, metricsValues[test.module.metrics.ReaderRebalances.Name]) } +// TestConsumerPanicsOnClose tests the consume function when the reader is being +// closed. Closing the reader while reading is considered unexpected and should +// panic. +func TestConsumerPanicsOnClose(t *testing.T) { + test := getTestModuleInstance(t) + test.createTopic() + + // Create a reader to consume messages. + reader := test.module.Kafka.reader(&ReaderConfig{ + Brokers: []string{"localhost:9092"}, + Topic: test.topicName, + MaxWait: Duration{time.Second * 3}, + }) + defer reader.Close() + + go func() { + // Wait for a bit so consumption starts. + time.Sleep(250 * time.Millisecond) + // Close reader to cause consume to panic. + reader.Close() + }() + + // Switch to VU code. + require.NoError(t, test.moveToVUCode()) + + // Consume a message in the VU function. + assert.Panics(t, func() { + test.module.Kafka.consume(reader, &ConsumeConfig{Limit: 1}) + }) +} + // TestConsume tests the consume function. // nolint: funlen func TestConsume(t *testing.T) {