Skip to content

Commit

Permalink
Merge pull request #6 from oozzal/add-message-topic
Browse files Browse the repository at this point in the history
add topic method to Kafka::Message
  • Loading branch information
stufro authored Mar 18, 2024
2 parents 7c1edfe + a22770d commit afb007b
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 5 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
/lib/
/shard.lock
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ consumer = Kafka::Consumer.new({"bootstrap.servers" => "localhost:9092", "broker
consumer.subscribe("topic_name")
consumer.each do |message|
# message is an instance of Kafka::Message
puts message.payload
puts "#{String.new(message.topic)} -> #{String.new(message.payload)}
end
consumer.close
```
Expand Down
1 change: 1 addition & 0 deletions spec/integration/produce_consume_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ describe "Producing & Consuming" do
raise "message is nil" if message.nil?

String.new(message.payload).should eq({"foo" => "bar"}.to_json)
String.new(message.topic).should eq("foo")
ensure
consumer.try(&.close)
producer.try(&.flush)
Expand Down
11 changes: 7 additions & 4 deletions src/kafka/message.cr
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@ module Kafka
@err : RdKafka::Error?
@offset : Int64?
@timestamp : Int64?
@partition = LibRdKafka::PARTITION_UNASSIGNED
@rkt : LibRdKafka::Topic
getter err, offset, key, payload, partition, timestamp

def initialize(@payload : Bytes, @key : Bytes)
@partition = LibRdKafka::PARTITION_UNASSIGNED
end

def initialize(msg : LibRdKafka::Message)
if msg.err != LibRdKafka::OK
@err = RdKafka::Error.new(msg.err)
Expand All @@ -23,6 +21,11 @@ module Kafka
@partition = msg.partition
@offset = msg.offset
@timestamp = msg.timestamp
@rkt = msg.rkt
end

def topic
LibRdKafka.topic_name(@rkt)
end
end
end

0 comments on commit afb007b

Please sign in to comment.