Skip to content

Commit

Permalink
feat: Update Consumer#poll to raise an exception when message is an e…
Browse files Browse the repository at this point in the history
…rror

The message object returned by `LibRdKafka.consumer_poll` can be a
proper message or an event or error. In the case of an error we will now
raise a ConsumerException by default. The previous behaviour can be
maintained by passing `raise_on_error: false`.

Also increase timeouts in integration tests for Github CI.
  • Loading branch information
ukdave committed Dec 18, 2024
1 parent a322616 commit 92d35fe
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 6 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# Changelog

## [Unreleased]
### Changed
- Update `Kafka::Consumer#poll` and `Kafka::Consumer#each` to automatically raise a `Kafka::ConsumerException` if the
message is an error. Pass `raise_on_error: false` to maintain the previous behaviour.

## v0.5.0 - 2024-03-18
### Added
Expand Down
53 changes: 53 additions & 0 deletions spec/integration/consumer_errors_spec.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
require "../spec_helper"

describe "Consumer error handling" do
describe "#poll" do
it "raises errors by default" do
consumer = Kafka::Consumer.new({"bootstrap.servers" => "127.0.0.1:9094", "group.id" => "foo_group", "broker.address.family" => "v4"})
consumer.subscribe("non-existent-topic")

expect_raises(Kafka::ConsumerException, "librdkafka error - Broker: Unknown topic or partition") do
consumer.poll(timeout_ms: 10_000)
end
ensure
consumer.try(&.close)
end

it "doesn't raise errors when raise_on_error is false" do
consumer = Kafka::Consumer.new({"bootstrap.servers" => "127.0.0.1:9094", "group.id" => "foo_group", "broker.address.family" => "v4"})
consumer.subscribe("non-existent-topic")

message = consumer.poll(timeout_ms: 10_000, raise_on_error: false)
message.not_nil!.err.not_nil!.message.should eq "Broker: Unknown topic or partition"
ensure
consumer.try(&.close)
end
end

describe "#each" do
it "raises errors by default" do
consumer = Kafka::Consumer.new({"bootstrap.servers" => "127.0.0.1:9094", "group.id" => "foo_group", "broker.address.family" => "v4"})
consumer.subscribe("non-existent-topic")

expect_raises(Kafka::ConsumerException, "librdkafka error - Broker: Unknown topic or partition") do
consumer.each(timeout: 1000) do |_message|
break
end
end
ensure
consumer.try(&.close)
end

it "doesn't raise errors when raise_on_error is false" do
consumer = Kafka::Consumer.new({"bootstrap.servers" => "127.0.0.1:9094", "group.id" => "foo_group", "broker.address.family" => "v4"})
consumer.subscribe("non-existent-topic")

consumer.each(timeout: 1000, raise_on_error: false) do |message|
message.err.not_nil!.message.should eq "Broker: Unknown topic or partition"
break
end
ensure
consumer.try(&.close)
end
end
end
6 changes: 3 additions & 3 deletions spec/integration/produce_consume_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ describe "Producing & Consuming" do
iterations += 1
print "."
producer.produce(topic: "foo", payload: {"foo" => "bar"}.to_json.to_slice)
producer.flush
producer.flush(timeout: 10_000)

message = consumer.poll(1000)
break if !message.nil? || iterations >= 10
message = consumer.poll(timeout_ms: 10_000, raise_on_error: false)
break if !message.nil? && message.err.nil? || iterations >= 10
end

raise "message is nil" if message.nil?
Expand Down
10 changes: 7 additions & 3 deletions src/kafka/consumer.cr
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,26 @@ module Kafka
# Poll the consumer for messages or events.
#
# Calls the `rd_kafka_consumer_poll` C function.
def poll(timeout_ms : Int32) : Message?
def poll(timeout_ms : Int32, raise_on_error : Bool = true) : Message?
message_ptr = LibRdKafka.consumer_poll(@handle, timeout_ms)
return if message_ptr.null?

message = Message.new(message_ptr.value)
LibRdKafka.message_destroy(message_ptr)
if raise_on_error && (err = message.err)
raise ConsumerException.new(err.message)
end

message
end

# Loops indefinitely calling `#poll` at the given interval `timeout`.
#
# At the beginning of each loop, `Fiber.yield` is called allow other Fibers to run.
def each(timeout = 250)
def each(timeout = 250, raise_on_error = true, &)
loop do
Fiber.yield
resp = poll(timeout)
resp = poll(timeout, raise_on_error)
next if resp.nil?
yield resp
break unless @running
Expand Down

0 comments on commit 92d35fe

Please sign in to comment.