From c7bd315262121e4f21403b1fb77702ab3f3df125 Mon Sep 17 00:00:00 2001
From: eduardowitter
Date: Thu, 9 Dec 2021 14:19:13 -0300
Subject: [PATCH] Allow producing messages without specifying a Key (#30)

* Allow producing messages without specifying a Key

According to [1], keys are optional. With this change, a message whose
key is not set is produced with a null key.

Example:

~~~
let messages = [
    {
        value: JSON.stringify({
            firstname: "firstname-" + index,
            lastname: "lastname-" + index,
        }),
        // Only value is set. The key field is not set.
    },
];

let error = produceWithConfiguration(
    producer,
    messages,
    configuration,
    null, // null for the key schema; not used
    valueSchema
);
~~~

[1] https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets

* Adding test scripts

* Explicitly set 'nil' for .Key in message if key is not set

* Revert "Explicitly set 'nil' for .Key in message if key is not set"

This reverts commit fc959d45054fe58198fa8b6398e7edeed6e64231.

* scripts: show consumer messages as debugs

If --verbose is provided, messages received by the consumer are
displayed to make it clear that only 'value' is present. Example from
test_avro_with_schema_registry_no_key.js:

DEBU[0000] Received Message: {"value":{"firstname":"firstname-2","lastname":"lastname-2"}}  source=console
---
 producer.go                                       | 34 ++++---
 scripts/test_avro_no_key.js                       | 97 +++++++++++++++++++
 .../test_avro_with_schema_registry_no_key.js      | 95 ++++++++++++++++++
 3 files changed, 213 insertions(+), 13 deletions(-)
 create mode 100644 scripts/test_avro_no_key.js
 create mode 100644 scripts/test_avro_with_schema_registry_no_key.js

diff --git a/producer.go b/producer.go
index 294ad75..de7cdb6 100644
--- a/producer.go
+++ b/producer.go
@@ -90,30 +90,38 @@ func ProduceInternal(
 	kafkaMessages := make([]kafkago.Message, len(messages))
 	for i, message := range messages {
-		key := []byte(message["key"])
-		if keySchema != "" {
-			key = ToAvro(message["key"], keySchema)
+
+		kafkaMessages[i] = kafkago.Message{}
+
+		// If a key was provided, add it to the message. Keys are optional.
+		if _, hasKey := message["key"]; hasKey {
+			key := []byte(message["key"])
+			if keySchema != "" {
+				key = ToAvro(message["key"], keySchema)
+			}
+			keyData, err := addMagicByteAndSchemaIdPrefix(configuration, key, writer.Stats().Topic, "key", keySchema)
+			if err != nil {
+				ReportError(err, "Creation of key bytes failed.")
+				return err
+			}
+
+			kafkaMessages[i].Key = keyData
 		}
+		// Then add the message value
 		value := []byte(message["value"])
 		if valueSchema != "" {
 			value = ToAvro(message["value"], valueSchema)
 		}
-		keyData, err := addMagicByteAndSchemaIdPrefix(configuration, key, writer.Stats().Topic, "key", keySchema)
-		if err != nil {
-			ReportError(err, "Creation of key bytes failed.")
-			return err
-		}
 		valueData, err := addMagicByteAndSchemaIdPrefix(configuration, value, writer.Stats().Topic, "value", valueSchema)
 		if err != nil {
-			ReportError(err, "Creation of key bytes failed.")
+			ReportError(err, "Creation of message bytes failed.")
 			return err
 		}
-		kafkaMessages[i] = kafkago.Message{
-			Key:   keyData,
-			Value: valueData,
-		}
+
+		kafkaMessages[i].Value = valueData
+	}
 
 	err = writer.WriteMessages(ctx, kafkaMessages...)
diff --git a/scripts/test_avro_no_key.js b/scripts/test_avro_no_key.js
new file mode 100644
index 0000000..d8bb4f2
--- /dev/null
+++ b/scripts/test_avro_no_key.js
@@ -0,0 +1,97 @@
+/*
+
+This is a k6 test script that imports the xk6-kafka extension and
+tests Kafka by sending 200 Avro messages per iteration
+without any associated key.
+*/
+
+import { check } from "k6";
+import { writer, produce, reader, consume, createTopic } from "k6/x/kafka"; // import kafka extension
+
+const bootstrapServers = ["localhost:9092"];
+const kafkaTopic = "xk6_kafka_avro_topic";
+
+const producer = writer(bootstrapServers, kafkaTopic);
+const consumer = reader(bootstrapServers, kafkaTopic);
+
+const valueSchema = JSON.stringify({
+    type: "record",
+    name: "Value",
+    namespace: "dev.mostafa.xk6.kafka",
+    fields: [
+        {
+            name: "name",
+            type: "string",
+        },
+        {
+            name: "version",
+            type: "string",
+        },
+        {
+            name: "author",
+            type: "string",
+        },
+        {
+            name: "description",
+            type: "string",
+        },
+        {
+            name: "url",
+            type: "string",
+        },
+        {
+            name: "index",
+            type: "int",
+        },
+    ],
+});
+
+createTopic(bootstrapServers[0], kafkaTopic);
+
+export default function () {
+    for (let index = 0; index < 100; index++) {
+        let messages = [
+            {
+                value: JSON.stringify({
+                    name: "xk6-kafka",
+                    version: "0.2.1",
+                    author: "Mostafa Moradian",
+                    description:
+                        "k6 extension to load test Apache Kafka with support for Avro messages",
+                    url: "https://mostafa.dev",
+                    index: index,
+                }),
+            },
+            {
+                value: JSON.stringify({
+                    name: "xk6-kafka",
+                    version: "0.2.1",
+                    author: "Mostafa Moradian",
+                    description:
+                        "k6 extension to load test Apache Kafka with support for Avro messages",
+                    url: "https://mostafa.dev",
+                    index: index,
+                }),
+            },
+        ];
+        let error = produce(producer, messages, null, valueSchema);
+        check(error, {
+            "is sent": (err) => err == undefined,
+        });
+    }
+
+    // Read 10 messages only
+    let rx_messages = consume(consumer, 10, null, valueSchema);
+    check(rx_messages, {
+        "10 messages returned": (msgs) => msgs.length == 10,
+    });
+
+    for (let index = 0; index < rx_messages.length; index++) {
+        console.debug("Received Message: " + JSON.stringify(rx_messages[index]));
+    }
+}
+
+export function teardown(data) {
+    producer.close();
+    consumer.close();
+}
diff --git a/scripts/test_avro_with_schema_registry_no_key.js b/scripts/test_avro_with_schema_registry_no_key.js
new file mode 100644
index 0000000..8df9c59
--- /dev/null
+++ b/scripts/test_avro_with_schema_registry_no_key.js
@@ -0,0 +1,95 @@
+/*
+This is a k6 test script that imports the xk6-kafka extension and
+tests Kafka, via Schema Registry, with 100 keyless Avro messages per iteration.
+*/
+
+import { check } from "k6";
+import {
+    writer,
+    reader,
+    consumeWithConfiguration,
+    produceWithConfiguration,
+    createTopic,
+} from "k6/x/kafka"; // import kafka extension
+
+const bootstrapServers = ["localhost:9092"];
+const topic = "com.example.person";
+
+const producer = writer(bootstrapServers, topic, null);
+const consumer = reader(bootstrapServers, topic, null, "", null, null);
+
+const valueSchema = `{
+    "name": "ValueSchema",
+    "type": "record",
+    "namespace": "com.example",
+    "fields": [
+        {
+            "name": "firstname",
+            "type": "string"
+        },
+        {
+            "name": "lastname",
+            "type": "string"
+        }
+    ]
+}`;
+
+const configuration = JSON.stringify({
+    consumer: {
+        keyDeserializer: "",
+        valueDeserializer:
+            "io.confluent.kafka.serializers.KafkaAvroDeserializer",
+    },
+    producer: {
+        keySerializer: "",
+        valueSerializer: "io.confluent.kafka.serializers.KafkaAvroSerializer",
+    },
+    schemaRegistry: {
+        url: "http://localhost:8081",
+    },
+});
+
+createTopic(bootstrapServers[0], topic);
+
+export default function () {
+    for (let index = 0; index < 100; index++) {
+        let messages = [
+            {
+                value: JSON.stringify({
+                    firstname: "firstname-" + index,
+                    lastname: "lastname-" + index,
+                }),
+            },
+        ];
+        let error = produceWithConfiguration(
+            producer,
+            messages,
+            configuration,
+            null,
+            valueSchema
+        );
+        check(error, {
+            "is sent": (err) => err == undefined,
+        });
+    }
+
+    let rx_messages = consumeWithConfiguration(
+        consumer,
+        20,
+        configuration,
+        null,
+        valueSchema
+    );
+    check(rx_messages, {
+        "20 messages returned": (msgs) => msgs.length == 20,
+    });
+
+    for (let index = 0; index < rx_messages.length; index++) {
+        console.debug("Received Message: " + JSON.stringify(rx_messages[index]));
+    }
+}
+
+export function teardown(data) {
+    producer.close();
+    consumer.close();
+}
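
Because the reworked loop in producer.go checks each message individually for a "key" entry, keyed and keyless messages can be mixed in a single batch, a case neither test script exercises. Below is a minimal sketch of that case; it assumes produce() also accepts null for the value schema (the scripts above only pass null for the key schema), and the topic name and payloads are illustrative, not part of the patch:

~~~
import { check } from "k6";
import { writer, produce, createTopic } from "k6/x/kafka"; // import kafka extension

const bootstrapServers = ["localhost:9092"];
// Illustrative topic name; not part of the patch.
const kafkaTopic = "xk6_kafka_mixed_key_topic";

const producer = writer(bootstrapServers, kafkaTopic);

createTopic(bootstrapServers[0], kafkaTopic);

export default function () {
    let messages = [
        {
            // Keyed message: "key" is present, so .Key is populated as before.
            key: JSON.stringify({ id: 1 }),
            value: JSON.stringify({ kind: "keyed" }),
        },
        {
            // Keyless message: "key" is omitted, so .Key stays unset and the
            // broker receives a null key.
            value: JSON.stringify({ kind: "keyless" }),
        },
    ];
    // Assumed: null for both key and value schemas sends payloads as plain bytes.
    let error = produce(producer, messages, null, null);
    check(error, {
        "is sent": (err) => err == undefined,
    });
}

export function teardown(data) {
    producer.close();
}
~~~

Note that a null key carries a semantic cost: ordering guarantees that rely on key-based partitioning do not apply to keyless messages, since the broker has no key to group them by.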