-
Notifications
You must be signed in to change notification settings - Fork 72
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Allow producing messages without specifying a Key (#30)
* Allow producing messages without specifying a Key According to [1], Keys are optional. With this change, by not setting a key for a message it will be considered null. Example: ~~~ let messages = [ { value: JSON.stringify({ firstname: "firstname-" + index, lastname: "lastname-" + index, }), // Only value is set. The key field is not set. }, ]; let error = produceWithConfiguration( producer, messages, configuration, null, //!< Null for key schema, not used valueSchema ); ~~~ [1] https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets * Adding test scripts * Explicitly set 'nil' for .Key in message if key is not set * Revert "Explicitly set 'nil' for .Key in message if key is not set" This reverts commit fc959d4. * scripts: show consumer messages as debugs If --verbose is provided, messages received by the consumer as displayed to make clear that only 'value' is present. Example from test_avro_with_schema_registry_no_leys.js: DEBU[0000] Message: {"value":{"firstname":"firstname-2","lastname":"lastname-2"}} source=console
- Loading branch information
1 parent
f2f51c7
commit c7bd315
Showing
3 changed files
with
213 additions
and
13 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
/* | ||
This is a k6 test script that imports the xk6-kafka and | ||
tests Kafka by sending 200 Avro messages per iteration | ||
without any associated key. | ||
*/ | ||
|
||
import { check } from "k6"; | ||
import { writer, produce, reader, consume, createTopic } from "k6/x/kafka"; // import kafka extension | ||
|
||
const bootstrapServers = ["localhost:9092"]; | ||
const kafkaTopic = "xk6_kafka_avro_topic"; | ||
|
||
const producer = writer(bootstrapServers, kafkaTopic); | ||
const consumer = reader(bootstrapServers, kafkaTopic); | ||
|
||
const valueSchema = JSON.stringify({ | ||
type: "record", | ||
name: "Value", | ||
namespace: "dev.mostafa.xk6.kafka", | ||
fields: [ | ||
{ | ||
name: "name", | ||
type: "string", | ||
}, | ||
{ | ||
name: "version", | ||
type: "string", | ||
}, | ||
{ | ||
name: "author", | ||
type: "string", | ||
}, | ||
{ | ||
name: "description", | ||
type: "string", | ||
}, | ||
{ | ||
name: "url", | ||
type: "string", | ||
}, | ||
{ | ||
name: "index", | ||
type: "int", | ||
}, | ||
], | ||
}); | ||
|
||
createTopic(bootstrapServers[0], kafkaTopic); | ||
|
||
export default function () { | ||
for (let index = 0; index < 100; index++) { | ||
let messages = [ | ||
{ | ||
value: JSON.stringify({ | ||
name: "xk6-kafka", | ||
version: "0.2.1", | ||
author: "Mostafa Moradian", | ||
description: | ||
"k6 extension to load test Apache Kafka with support for Avro messages", | ||
url: "https://mostafa.dev", | ||
index: index, | ||
}), | ||
}, | ||
{ | ||
value: JSON.stringify({ | ||
name: "xk6-kafka", | ||
version: "0.2.1", | ||
author: "Mostafa Moradian", | ||
description: | ||
"k6 extension to load test Apache Kafka with support for Avro messages", | ||
url: "https://mostafa.dev", | ||
index: index, | ||
}), | ||
}, | ||
]; | ||
let error = produce(producer, messages, null, valueSchema); | ||
check(error, { | ||
"is sent": (err) => err == undefined, | ||
}); | ||
} | ||
|
||
// Read 10 messages only | ||
let rx_messages = consume(consumer, 10, null, valueSchema); | ||
check(rx_messages, { | ||
"10 messages returned": (msgs) => msgs.length == 10, | ||
}); | ||
|
||
for (let index = 0; index < rx_messages.length; index++) { | ||
console.debug('Received Message: ' + JSON.stringify(rx_messages[index])); | ||
} | ||
} | ||
|
||
export function teardown(data) { | ||
producer.close(); | ||
consumer.close(); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
/* | ||
This is a k6 test script that imports the xk6-kafka and | ||
tests Kafka with a 100 Avro messages per iteration. | ||
*/ | ||
|
||
import { check } from "k6"; | ||
import { | ||
writer, | ||
reader, | ||
consumeWithConfiguration, | ||
produceWithConfiguration, | ||
createTopic, | ||
} from "k6/x/kafka"; // import kafka extension | ||
|
||
const bootstrapServers = ["localhost:9092"]; | ||
const topic = "com.example.person"; | ||
|
||
const producer = writer(bootstrapServers, topic, null); | ||
const consumer = reader(bootstrapServers, topic, null, "", null, null); | ||
|
||
const valueSchema = `{ | ||
"name": "ValueSchema", | ||
"type": "record", | ||
"namespace": "com.example", | ||
"fields": [ | ||
{ | ||
"name": "firstname", | ||
"type": "string" | ||
}, | ||
{ | ||
"name": "lastname", | ||
"type": "string" | ||
} | ||
] | ||
}`; | ||
|
||
var configuration = JSON.stringify({ | ||
consumer: { | ||
keyDeserializer: "", | ||
valueDeserializer: | ||
"io.confluent.kafka.serializers.KafkaAvroDeserializer", | ||
}, | ||
producer: { | ||
keySerializer: "", | ||
valueSerializer: "io.confluent.kafka.serializers.KafkaAvroSerializer", | ||
}, | ||
schemaRegistry: { | ||
url: "http://localhost:8081", | ||
}, | ||
}); | ||
|
||
createTopic(bootstrapServers[0], topic); | ||
|
||
export default function () { | ||
for (let index = 0; index < 100; index++) { | ||
let messages = [ | ||
{ | ||
value: JSON.stringify({ | ||
firstname: "firstname-" + index, | ||
lastname: "lastname-" + index, | ||
}), | ||
}, | ||
]; | ||
let error = produceWithConfiguration( | ||
producer, | ||
messages, | ||
configuration, | ||
null, | ||
valueSchema | ||
); | ||
check(error, { | ||
"is sent": (err) => err == undefined, | ||
}); | ||
} | ||
|
||
let rx_messages = consumeWithConfiguration( | ||
consumer, | ||
20, | ||
configuration, | ||
null, | ||
valueSchema | ||
); | ||
check(rx_messages, { | ||
"20 message returned": (msgs) => msgs.length == 20, | ||
}); | ||
|
||
for (let index = 0; index < rx_messages.length; index++) { | ||
console.debug('Received Message: ' + JSON.stringify(rx_messages[index])); | ||
} | ||
} | ||
|
||
export function teardown(data) { | ||
producer.close(); | ||
consumer.close(); | ||
} |