Skip to content

Commit

Permalink
Fix Remove entries permanently from stream once consumed
Browse files Browse the repository at this point in the history
Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
  • Loading branch information
pditommaso committed Sep 17, 2024
1 parent 268c76e commit adfad9d
Showing 1 changed file with 5 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,12 @@ class RedisMessageStream implements MessageStream<String> {
try (Jedis jedis = pool.getResource()) {
final entry = claimMessage(jedis,streamId) ?: readMessage(jedis, streamId)
if( entry && consumer.accept(entry.getFields().get(DATA_FIELD)) ) {
// acknowledge the job after processing
final tx = jedis.multi()
// acknowledge the entry has been processed so that it cannot be claimed anymore
tx.xack(streamId, CONSUMER_GROUP_NAME, entry.getID())
// this remove permanently the entry from the stream
jedis.xack(streamId, CONSUMER_GROUP_NAME, entry.getID())
tx.xdel(streamId, entry.getID())
tx.exec()
return true
}
else
Expand Down

0 comments on commit adfad9d

Please sign in to comment.