diff --git a/api-docs/docs/README.md b/api-docs/docs/README.md index bb43d14..9199401 100644 --- a/api-docs/docs/README.md +++ b/api-docs/docs/README.md @@ -92,4 +92,4 @@ const jks = LoadJKS({ #### Defined in -[index.d.ts:543](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L543) +[index.d.ts:550](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L550) diff --git a/api-docs/docs/classes/Connection.md b/api-docs/docs/classes/Connection.md index baf5708..77a1596 100644 --- a/api-docs/docs/classes/Connection.md +++ b/api-docs/docs/classes/Connection.md @@ -44,7 +44,7 @@ connection.close(); #### Defined in -[index.d.ts:400](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L400) +[index.d.ts:407](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L407) ## Methods @@ -64,7 +64,7 @@ connection.close(); #### Defined in -[index.d.ts:426](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L426) +[index.d.ts:433](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L433) --- @@ -89,7 +89,7 @@ Create a new topic. #### Defined in -[index.d.ts:407](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L407) +[index.d.ts:414](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L414) --- @@ -114,7 +114,7 @@ Delete a topic. #### Defined in -[index.d.ts:414](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L414) +[index.d.ts:421](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L421) --- @@ -133,4 +133,4 @@ List topics. #### Defined in -[index.d.ts:420](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L420) +[index.d.ts:427](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L427) diff --git a/api-docs/docs/classes/Reader.md b/api-docs/docs/classes/Reader.md index c252c0a..3bae5c6 100644 --- a/api-docs/docs/classes/Reader.md +++ b/api-docs/docs/classes/Reader.md @@ -43,7 +43,7 @@ reader.close(); #### Defined in -[index.d.ts:359](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L359) +[index.d.ts:366](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L366) ## Methods @@ -63,7 +63,7 @@ reader.close(); #### Defined in -[index.d.ts:372](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L372) +[index.d.ts:379](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L379) --- @@ -88,4 +88,4 @@ Read messages from Kafka. #### Defined in -[index.d.ts:366](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L366) +[index.d.ts:373](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L373) diff --git a/api-docs/docs/classes/SchemaRegistry.md b/api-docs/docs/classes/SchemaRegistry.md index a13f255..2c9b016 100644 --- a/api-docs/docs/classes/SchemaRegistry.md +++ b/api-docs/docs/classes/SchemaRegistry.md @@ -79,7 +79,7 @@ writer.produce({ #### Defined in -[index.d.ts:488](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L488) +[index.d.ts:495](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L495) ## Methods @@ -104,7 +104,7 @@ Create or update a schema on Schema Registry. #### Defined in -[index.d.ts:502](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L502) +[index.d.ts:509](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L509) --- @@ -129,7 +129,7 @@ Deserializes the given data and schema into its original form. #### Defined in -[index.d.ts:523](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L523) +[index.d.ts:530](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L530) --- @@ -154,7 +154,7 @@ Get a schema from Schema Registry by version and subject. #### Defined in -[index.d.ts:495](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L495) +[index.d.ts:502](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L502) --- @@ -179,7 +179,7 @@ Returns the subject name for the given SubjectNameConfig. #### Defined in -[index.d.ts:509](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L509) +[index.d.ts:516](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L516) --- @@ -204,4 +204,4 @@ Serializes the given data and schema into a byte array. #### Defined in -[index.d.ts:516](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L516) +[index.d.ts:523](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L523) diff --git a/api-docs/docs/classes/Writer.md b/api-docs/docs/classes/Writer.md index a3eb511..0497af2 100644 --- a/api-docs/docs/classes/Writer.md +++ b/api-docs/docs/classes/Writer.md @@ -51,7 +51,7 @@ writer.close(); #### Defined in -[index.d.ts:317](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L317) +[index.d.ts:324](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L324) ## Methods @@ -71,7 +71,7 @@ writer.close(); #### Defined in -[index.d.ts:330](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L330) +[index.d.ts:337](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L337) --- @@ -96,4 +96,4 @@ Write messages to Kafka. #### Defined in -[index.d.ts:324](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L324) +[index.d.ts:331](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L331) diff --git a/api-docs/docs/interfaces/ConfigEntry.md b/api-docs/docs/interfaces/ConfigEntry.md index 285ce45..8b7d73d 100644 --- a/api-docs/docs/interfaces/ConfigEntry.md +++ b/api-docs/docs/interfaces/ConfigEntry.md @@ -15,7 +15,7 @@ #### Defined in -[index.d.ts:223](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L223) +[index.d.ts:230](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L230) --- @@ -25,4 +25,4 @@ #### Defined in -[index.d.ts:224](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L224) +[index.d.ts:231](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L231) diff --git a/api-docs/docs/interfaces/ConnectionConfig.md b/api-docs/docs/interfaces/ConnectionConfig.md index 4d39157..dde9c69 100644 --- a/api-docs/docs/interfaces/ConnectionConfig.md +++ b/api-docs/docs/interfaces/ConnectionConfig.md @@ -16,7 +16,7 @@ #### Defined in -[index.d.ts:210](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L210) +[index.d.ts:217](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L217) --- @@ -26,7 +26,7 @@ #### Defined in -[index.d.ts:211](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L211) +[index.d.ts:218](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L218) --- @@ -36,4 +36,4 @@ #### Defined in -[index.d.ts:212](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L212) +[index.d.ts:219](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L219) diff --git a/api-docs/docs/interfaces/ConsumeConfig.md b/api-docs/docs/interfaces/ConsumeConfig.md index aff1fa0..94e5b5b 100644 --- a/api-docs/docs/interfaces/ConsumeConfig.md +++ b/api-docs/docs/interfaces/ConsumeConfig.md @@ -1,21 +1,39 @@ # Interface: ConsumeConfig +Configuration for Consume method. + ## Table of contents ### Properties +- [expectTimeout](ConsumeConfig.md#expecttimeout) - [limit](ConsumeConfig.md#limit) - [nanoPrecision](ConsumeConfig.md#nanoprecision) ## Properties +### expectTimeout + +• **expectTimeout**: `boolean` + +If true, return whatever messages have been collected when maxWait is +passed. + +#### Defined in + +[index.d.ts:212](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L212) + +--- + ### limit • **limit**: `number` +collect this many messages before returning. + #### Defined in -[index.d.ts:204](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L204) +[index.d.ts:205](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L205) --- @@ -23,6 +41,8 @@ • **nanoPrecision**: `boolean` +If true, returned message RFC3339 timestamps carry nanosecond precision. + #### Defined in -[index.d.ts:205](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L205) +[index.d.ts:207](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L207) diff --git a/api-docs/docs/interfaces/Container.md b/api-docs/docs/interfaces/Container.md index 21910a8..3535bf6 100644 --- a/api-docs/docs/interfaces/Container.md +++ b/api-docs/docs/interfaces/Container.md @@ -16,7 +16,7 @@ #### Defined in -[index.d.ts:263](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L263) +[index.d.ts:270](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L270) --- @@ -26,7 +26,7 @@ #### Defined in -[index.d.ts:264](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L264) +[index.d.ts:271](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L271) --- @@ -36,4 +36,4 @@ #### Defined in -[index.d.ts:265](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L265) +[index.d.ts:272](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L272) diff --git a/api-docs/docs/interfaces/JKS.md b/api-docs/docs/interfaces/JKS.md index 9eb122d..54ffcb1 100644 --- a/api-docs/docs/interfaces/JKS.md +++ b/api-docs/docs/interfaces/JKS.md @@ -16,7 +16,7 @@ #### Defined in -[index.d.ts:278](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L278) +[index.d.ts:285](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L285) --- @@ -26,7 +26,7 @@ #### Defined in -[index.d.ts:279](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L279) +[index.d.ts:286](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L286) --- @@ -36,4 +36,4 @@ #### Defined in -[index.d.ts:280](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L280) +[index.d.ts:287](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L287) diff --git a/api-docs/docs/interfaces/JKSConfig.md b/api-docs/docs/interfaces/JKSConfig.md index ea9dbdf..6098f9c 100644 --- a/api-docs/docs/interfaces/JKSConfig.md +++ b/api-docs/docs/interfaces/JKSConfig.md @@ -19,7 +19,7 @@ #### Defined in -[index.d.ts:271](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L271) +[index.d.ts:278](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L278) --- @@ -29,7 +29,7 @@ #### Defined in -[index.d.ts:272](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L272) +[index.d.ts:279](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L279) --- @@ -39,7 +39,7 @@ #### Defined in -[index.d.ts:273](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L273) +[index.d.ts:280](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L280) --- @@ -49,7 +49,7 @@ #### Defined in -[index.d.ts:270](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L270) +[index.d.ts:277](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L277) --- @@ -59,7 +59,7 @@ #### Defined in -[index.d.ts:269](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L269) +[index.d.ts:276](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L276) --- @@ -69,4 +69,4 @@ #### Defined in -[index.d.ts:274](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L274) +[index.d.ts:281](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L281) diff --git a/api-docs/docs/interfaces/Reference.md b/api-docs/docs/interfaces/Reference.md index 8d25db2..b7eae28 100644 --- a/api-docs/docs/interfaces/Reference.md +++ b/api-docs/docs/interfaces/Reference.md @@ -16,7 +16,7 @@ #### Defined in -[index.d.ts:239](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L239) +[index.d.ts:246](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L246) --- @@ -26,7 +26,7 @@ #### Defined in -[index.d.ts:240](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L240) +[index.d.ts:247](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L247) --- @@ -36,4 +36,4 @@ #### Defined in -[index.d.ts:241](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L241) +[index.d.ts:248](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L248) diff --git a/api-docs/docs/interfaces/ReplicaAssignment.md b/api-docs/docs/interfaces/ReplicaAssignment.md index 6a2a8d9..d2813fd 100644 --- a/api-docs/docs/interfaces/ReplicaAssignment.md +++ b/api-docs/docs/interfaces/ReplicaAssignment.md @@ -15,7 +15,7 @@ #### Defined in -[index.d.ts:217](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L217) +[index.d.ts:224](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L224) --- @@ -25,4 +25,4 @@ #### Defined in -[index.d.ts:218](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L218) +[index.d.ts:225](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L225) diff --git a/api-docs/docs/interfaces/Schema.md b/api-docs/docs/interfaces/Schema.md index b7c529f..b09498d 100644 --- a/api-docs/docs/interfaces/Schema.md +++ b/api-docs/docs/interfaces/Schema.md @@ -20,7 +20,7 @@ #### Defined in -[index.d.ts:246](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L246) +[index.d.ts:253](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L253) --- @@ -30,7 +30,7 @@ #### Defined in -[index.d.ts:247](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L247) +[index.d.ts:254](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L254) --- @@ -40,7 +40,7 @@ #### Defined in -[index.d.ts:251](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L251) +[index.d.ts:258](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L258) --- @@ -50,7 +50,7 @@ #### Defined in -[index.d.ts:248](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L248) +[index.d.ts:255](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L255) --- @@ -60,7 +60,7 @@ #### Defined in -[index.d.ts:249](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L249) +[index.d.ts:256](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L256) --- @@ -70,7 +70,7 @@ #### Defined in -[index.d.ts:252](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L252) +[index.d.ts:259](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L259) --- @@ -80,4 +80,4 @@ #### Defined in -[index.d.ts:250](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L250) +[index.d.ts:257](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L257) diff --git a/api-docs/docs/interfaces/SubjectNameConfig.md b/api-docs/docs/interfaces/SubjectNameConfig.md index 9b636f1..024cbaa 100644 --- a/api-docs/docs/interfaces/SubjectNameConfig.md +++ b/api-docs/docs/interfaces/SubjectNameConfig.md @@ -17,7 +17,7 @@ #### Defined in -[index.d.ts:258](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L258) +[index.d.ts:265](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L265) --- @@ -27,7 +27,7 @@ #### Defined in -[index.d.ts:256](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L256) +[index.d.ts:263](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L263) --- @@ -37,7 +37,7 @@ #### Defined in -[index.d.ts:259](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L259) +[index.d.ts:266](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L266) --- @@ -47,4 +47,4 @@ #### Defined in -[index.d.ts:257](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L257) +[index.d.ts:264](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L264) diff --git a/api-docs/docs/interfaces/TopicConfig.md b/api-docs/docs/interfaces/TopicConfig.md index e1665ca..3ea6294 100644 --- a/api-docs/docs/interfaces/TopicConfig.md +++ b/api-docs/docs/interfaces/TopicConfig.md @@ -18,7 +18,7 @@ #### Defined in -[index.d.ts:233](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L233) +[index.d.ts:240](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L240) --- @@ -28,7 +28,7 @@ #### Defined in -[index.d.ts:230](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L230) +[index.d.ts:237](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L237) --- @@ -38,7 +38,7 @@ #### Defined in -[index.d.ts:232](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L232) +[index.d.ts:239](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L239) --- @@ -48,7 +48,7 @@ #### Defined in -[index.d.ts:231](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L231) +[index.d.ts:238](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L238) --- @@ -58,4 +58,4 @@ #### Defined in -[index.d.ts:229](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L229) +[index.d.ts:236](https://github.com/mostafa/xk6-kafka/blob/main/api-docs/index.d.ts#L236) diff --git a/api-docs/index.d.ts b/api-docs/index.d.ts index 5b69d44..aaf4da7 100644 --- a/api-docs/index.d.ts +++ b/api-docs/index.d.ts @@ -199,10 +199,17 @@ export interface ReaderConfig { tls: TLSConfig; } -/* Configuration for Consume method. */ +/** Configuration for Consume method. */ export interface ConsumeConfig { + /** collect this many messages before returning. */ limit: number; + /** If true, returned message RFC3339 timestamps carry nanosecond precision. */ nanoPrecision: boolean; + /** + * If true, return whatever messages have been collected when maxWait is + * passed. + * */ + expectTimeout: boolean; } /* Configuration for creating a Connector instance for working with topics. */ diff --git a/reader.go b/reader.go index c885dde..b6ce70a 100644 --- a/reader.go +++ b/reader.go @@ -77,6 +77,7 @@ type ReaderConfig struct { type ConsumeConfig struct { Limit int64 `json:"limit"` NanoPrecision bool `json:"nanoPrecision"` + ExpectTimeout bool `json:"expectTimeout"` } type Duration struct { @@ -331,6 +332,11 @@ func (k *Kafka) consume( ctxWithTimeout, cancel := context.WithTimeout(ctx, maxWait) msg, err := reader.ReadMessage(ctxWithTimeout) cancel() + if errors.Is(ctxWithTimeout.Err(), context.DeadlineExceeded) && consumeConfig.ExpectTimeout { + k.reportReaderStats(reader.Stats()) + + return messages + } if errors.Is(err, io.EOF) { k.reportReaderStats(reader.Stats()) diff --git a/reader_test.go b/reader_test.go index ed924b9..871b0d1 100644 --- a/reader_test.go +++ b/reader_test.go @@ -21,32 +21,75 @@ func TestConsumerMaxWaitExceeded(t *testing.T) { defer writer.Close() // Create a reader to consume messages. - assert.NotPanics(t, func() { - reader := test.module.Kafka.reader(&ReaderConfig{ - Brokers: []string{"localhost:9092"}, - Topic: test.topicName, - MaxWait: Duration{time.Second * 3}, - }) - assert.NotNil(t, reader) - defer reader.Close() + reader := test.module.Kafka.reader(&ReaderConfig{ + Brokers: []string{"localhost:9092"}, + Topic: test.topicName, + MaxWait: Duration{250 * time.Millisecond}, + }) + assert.NotNil(t, reader) + defer reader.Close() - // Switch to VU code. - require.NoError(t, test.moveToVUCode()) + // Switch to VU code. + require.NoError(t, test.moveToVUCode()) - // Consume a message in the VU function. - assert.Panics(t, func() { - messages := test.module.Kafka.consume(reader, &ConsumeConfig{Limit: 1}) - assert.Empty(t, messages) - }) + test.module.Kafka.produce(writer, &ProduceConfig{ + Messages: []Message{ + { + Value: test.module.Kafka.serialize(&Container{ + Data: "value1", + SchemaType: String, + }), + }, + }, }) - // Check if no message was consumed. + // Allow receiving messages consumed before MaxWait + messages := test.module.Kafka.consume(reader, &ConsumeConfig{Limit: 2, ExpectTimeout: true}) + assert.Equal(t, 1, len(messages)) + + // Check that message was consumed. metricsValues := test.getCounterMetricsValues() assert.Equal(t, 1.0, metricsValues[test.module.metrics.ReaderDials.Name]) assert.Equal(t, 0.0, metricsValues[test.module.metrics.ReaderErrors.Name]) - assert.Equal(t, 0.0, metricsValues[test.module.metrics.ReaderBytes.Name]) - assert.Equal(t, 0.0, metricsValues[test.module.metrics.ReaderMessages.Name]) + assert.Equal(t, 6.0, metricsValues[test.module.metrics.ReaderBytes.Name]) + assert.Equal(t, 1.0, metricsValues[test.module.metrics.ReaderMessages.Name]) assert.Equal(t, 0.0, metricsValues[test.module.metrics.ReaderRebalances.Name]) + + // Fail on deadline in the default case + assert.Panics(t, func() { + test.module.Kafka.consume(reader, &ConsumeConfig{Limit: 2}) + }) +} + +// TestConsumerPanicsOnClose tests the consume function when the reader is being +// closed. Closing the reader while reading is considered unexpected and should +// panic. +func TestConsumerPanicsOnClose(t *testing.T) { + test := getTestModuleInstance(t) + test.createTopic() + + // Create a reader to consume messages. + reader := test.module.Kafka.reader(&ReaderConfig{ + Brokers: []string{"localhost:9092"}, + Topic: test.topicName, + MaxWait: Duration{time.Second * 3}, + }) + defer reader.Close() + + go func() { + // Wait for a bit so consumption starts. + time.Sleep(250 * time.Millisecond) + // Close reader to cause consume to panic. + reader.Close() + }() + + // Switch to VU code. + require.NoError(t, test.moveToVUCode()) + + // Consume a message in the VU function. + assert.Panics(t, func() { + test.module.Kafka.consume(reader, &ConsumeConfig{Limit: 1}) + }) } // TestConsume tests the consume function.