From e6d951167dc5556d368d17727f03a153f3d2cab5 Mon Sep 17 00:00:00 2001 From: David Bull Date: Wed, 18 Dec 2024 12:36:53 +0000 Subject: [PATCH] feat: Update Consumer#poll to raise an exception when message is an error The message object returned by `LibRdKafka.consumer_poll` can be a proper message or an event or error. In the case of an error we will now raise a ConsumerException by default. The previous behaviour can be maintained by passing `raise_on_error: false`. --- CHANGELOG.md | 3 ++ spec/integration/consumer_errors_spec.cr | 53 ++++++++++++++++++++++++ spec/integration/produce_consume_spec.cr | 4 +- src/kafka/consumer.cr | 10 +++-- 4 files changed, 65 insertions(+), 5 deletions(-) create mode 100644 spec/integration/consumer_errors_spec.cr diff --git a/CHANGELOG.md b/CHANGELOG.md index c301709..a1c65e1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,9 @@ # Changelog ## [Unreleased] +### Changed +- Update `Kafka::Consumer#poll` and `Kafka::Consumer#each` to automatically raise a `Kafka::ConsumerException` if the + message is an error. Pass `raise_on_error: false` to maintain the previous behaviour. ## v0.5.0 - 2024-03-18 ### Added diff --git a/spec/integration/consumer_errors_spec.cr b/spec/integration/consumer_errors_spec.cr new file mode 100644 index 0000000..2a5bf51 --- /dev/null +++ b/spec/integration/consumer_errors_spec.cr @@ -0,0 +1,53 @@ +require "../spec_helper" + +describe "Consumer error handling" do + describe "#poll" do + it "raises errors by default" do + consumer = Kafka::Consumer.new({"bootstrap.servers" => "127.0.0.1:9094", "group.id" => "foo_group", "broker.address.family" => "v4"}) + consumer.subscribe("non-existent-topic") + + expect_raises(Kafka::ConsumerException, "librdkafka error - Broker: Unknown topic or partition") do + consumer.poll(timeout_ms: 10_000) + end + ensure + consumer.try(&.close) + end + + it "doesn't raise errors when raise_on_error is false" do + consumer = Kafka::Consumer.new({"bootstrap.servers" => "127.0.0.1:9094", "group.id" => "foo_group", "broker.address.family" => "v4"}) + consumer.subscribe("non-existent-topic") + + message = consumer.poll(timeout_ms: 10_000, raise_on_error: false) + message.not_nil!.err.not_nil!.message.should eq "Broker: Unknown topic or partition" + ensure + consumer.try(&.close) + end + end + + describe "#each" do + it "raises errors by default" do + consumer = Kafka::Consumer.new({"bootstrap.servers" => "127.0.0.1:9094", "group.id" => "foo_group", "broker.address.family" => "v4"}) + consumer.subscribe("non-existent-topic") + + expect_raises(Kafka::ConsumerException, "librdkafka error - Broker: Unknown topic or partition") do + consumer.each(timeout: 1000) do |_message| + break + end + end + ensure + consumer.try(&.close) + end + + it "doesn't raise errors when raise_on_error is false" do + consumer = Kafka::Consumer.new({"bootstrap.servers" => "127.0.0.1:9094", "group.id" => "foo_group", "broker.address.family" => "v4"}) + consumer.subscribe("non-existent-topic") + + consumer.each(timeout: 1000, raise_on_error: false) do |message| + message.err.not_nil!.message.should eq "Broker: Unknown topic or partition" + break + end + ensure + consumer.try(&.close) + end + end +end diff --git a/spec/integration/produce_consume_spec.cr b/spec/integration/produce_consume_spec.cr index a109a23..d363fca 100644 --- a/spec/integration/produce_consume_spec.cr +++ b/spec/integration/produce_consume_spec.cr @@ -12,9 +12,9 @@ describe "Producing & Consuming" do iterations += 1 print "." producer.produce(topic: "foo", payload: {"foo" => "bar"}.to_json.to_slice) - producer.flush + producer.flush(timeout: 10_000) - message = consumer.poll(1000) + message = consumer.poll(timeout_ms: 10_000) break if !message.nil? || iterations >= 10 end diff --git a/src/kafka/consumer.cr b/src/kafka/consumer.cr index 2fddc68..77cd032 100644 --- a/src/kafka/consumer.cr +++ b/src/kafka/consumer.cr @@ -40,22 +40,26 @@ module Kafka # Poll the consumer for messages or events. # # Calls the `rd_kafka_consumer_poll` C function. - def poll(timeout_ms : Int32) : Message? + def poll(timeout_ms : Int32, raise_on_error : Bool = true) : Message? message_ptr = LibRdKafka.consumer_poll(@handle, timeout_ms) return if message_ptr.null? message = Message.new(message_ptr.value) LibRdKafka.message_destroy(message_ptr) + if raise_on_error && (err = message.err) + raise ConsumerException.new(err.message) + end + message end # Loops indefinitely calling `#poll` at the given interval `timeout`. # # At the beginning of each loop, `Fiber.yield` is called allow other Fibers to run. - def each(timeout = 250) + def each(timeout = 250, raise_on_error = true, &) loop do Fiber.yield - resp = poll(timeout) + resp = poll(timeout, raise_on_error) next if resp.nil? yield resp break unless @running