Skip to content

Commit

Permalink
Merge pull request #38 from Nosto/acknowledging-dequeued-message-to-r…
Browse files Browse the repository at this point in the history
…eturn-whether-message-existed-or-not

MultitenantQueue.delete to return boolean value indicating whether deleted message did exist in queue or not
  • Loading branch information
jari-kujansuu authored Nov 9, 2023
2 parents 5a210eb + f4e8e73 commit fd0f931
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 11 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ plugins {
}

group 'com.nosto'
version '2.1.6'
version '2.1.7'

java {
sourceCompatibility "11"
Expand Down
7 changes: 5 additions & 2 deletions src/main/java/com/nosto/redis/queue/AbstractScript.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
* Java wrapper for executing a Redis script.
*/
abstract class AbstractScript {
private final Long TRUE = 1L;

static byte[] bytes(String string) {
return string.getBytes(StandardCharsets.UTF_8);
}
Expand Down Expand Up @@ -128,9 +130,10 @@ EnqueueResult enqueue(Instant now, Duration invisiblePeriod, String queue, Tenan
* @param queue The name of the queue.
* @param tenant The tenant to whom the message belongs.
* @param key The de-duplication key of the message to be acked.
* @return true if acked message did exist in queue and false otherwise
*/
void ack(String queue, String tenant, String key) {
call(Function.ACK, slot(tenant), bytes(queue), bytes(tenant), bytes(key));
boolean ack(String queue, String tenant, String key) {
return TRUE.equals(call(Function.ACK, slot(tenant), bytes(queue), bytes(tenant), bytes(key)));
}

/**
Expand Down
10 changes: 8 additions & 2 deletions src/main/java/com/nosto/redis/queue/MultitenantQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,15 @@ public List<TenantMessage> dequeue(Duration messageInvisibilityPeriod, int maxim
*
* @param tenant The tenant associated with the message.
* @param messageKey The key that identifies the message.
* @return true if deleted message did exist in queue and false otherwise
* If processing of dequeued message takes more time than defined invisibility period
* then same message can be dequeued again after invisibility period has passed. This
* can result to situation where multiple workers read same message and try to delete it
* after processing. If deleted message does not exist it indicates that it was dequeued
* and deleted already before meaning that invisibility period might be too short.
*/
public void delete(String tenant, String messageKey) {
redisScript.ack(queueName, tenant, messageKey);
public boolean delete(String tenant, String messageKey) {
return redisScript.ack(queueName, tenant, messageKey);
}

/**
Expand Down
3 changes: 2 additions & 1 deletion src/main/resources/queue.lua
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,12 @@ function public.ack(slot, queue, tenant, key)
local invisible_key = private.invisible_key(slot, queue, tenant)
local payload_key = private.payload_key(slot, queue, tenant)
redis.call("zrem", private.visible_key(slot, queue, tenant), key)
redis.call("hdel", payload_key, key)
local deleted_messages = tonumber(redis.call("hdel", payload_key, key))
redis.call("zrem", invisible_key, key)
if redis.call("hlen", payload_key) == 0 then
redis.call("zrem", private.schedule_key(slot, queue), tenant)
end
return deleted_messages == 1
end

function public.queuestats(slot, queue)
Expand Down
5 changes: 4 additions & 1 deletion src/test/java/com/nosto/redis/queue/LowLevelScriptTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,11 @@ public void ack() {
"q1",
new TenantMessage("t1", "foo", "bar".getBytes(StandardCharsets.UTF_8)));
dequeueAndAssert(Instant.EPOCH.plusSeconds(1), "q1", Duration.ZERO, "bar");
script.ack("q1", "t1", "foo");
// Acking message should return true indicating that message did exist in queue.
assertTrue(script.ack("q1", "t1", "foo"));
dequeueAndAssert(Instant.EPOCH.plusSeconds(2), "q1", Duration.ZERO);
// Acking same message again should return false indicating that message did not exist in queue anymore.
assertFalse(script.ack("q1", "t1", "foo"));
}

@Test
Expand Down
10 changes: 6 additions & 4 deletions src/test/java/com/nosto/redis/queue/MultitenantQueueTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@
*/
package com.nosto.redis.queue;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
Expand All @@ -21,6 +18,8 @@

import org.junit.Test;

import static org.junit.Assert.*;

public class MultitenantQueueTest extends AbstractScriptTest {

@Test
Expand All @@ -35,7 +34,10 @@ public void delete() {
assertEquals(1, qStats.size());
assertEquals(new TenantStatistics("t1", 0, 2), qStats.get("t1"));

queue.delete("t1", "k1");
// Deleting message should return true indicating that message did exist in queue.
assertTrue(queue.delete("t1", "k1"));
// Deleting same message again should return false indicating that message did not exist in queue anymore.
assertFalse(queue.delete("t1", "k1"));

qStats = queue.getStatistics()
.getTenantStatistics();
Expand Down

0 comments on commit fd0f931

Please sign in to comment.