From e06e81e5ac8aabe8d7aed01363b1ff2cc0c8f720 Mon Sep 17 00:00:00 2001 From: Alex Zhukov Date: Sat, 14 Oct 2017 20:03:36 -0700 Subject: [PATCH] data corruption test case --- src/main/java/rtalk/RTalk.java | 23 +++++++++++++++++++---- src/test/java/rtalk/RTalkTest.java | 26 ++++++++++++++++++++++++++ 2 files changed, 45 insertions(+), 4 deletions(-) diff --git a/src/main/java/rtalk/RTalk.java b/src/main/java/rtalk/RTalk.java index d05733f..88b1fe5 100644 --- a/src/main/java/rtalk/RTalk.java +++ b/src/main/java/rtalk/RTalk.java @@ -1,6 +1,7 @@ package rtalk; import static java.util.UUID.randomUUID; +import static java.util.logging.Level.SEVERE; import static java.util.stream.Collectors.toMap; import java.io.PrintWriter; @@ -11,12 +12,15 @@ import java.util.UUID; import java.util.function.Consumer; import java.util.function.Function; +import java.util.logging.Logger; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.Transaction; public class RTalk { + //java util logging is used here to introduce no external dependencies + private final static Logger log = Logger.getLogger(RTalk.class.getName()); public static final String KICKED = "KICKED"; public static final String DELETED = "DELETED"; @@ -212,7 +216,8 @@ protected Response on(Response response) { } private static final String fTube = "tube"; - private static final String fState = "state"; + //VisibleForTesting + static final String fState = "state"; private static final String fPriority = "pri"; private static final String fReserves = "reserves"; private static final String fCtime = "ctime"; @@ -225,7 +230,8 @@ protected Response on(Response response) { private static final String fBuryReason = "error"; - private String kJob(String id) { + //VisibleForTesting + String kJob(String id) { return tube + "_" + id; } @@ -393,10 +399,19 @@ public synchronized Response reserve(long blockTimeoutMsec) { Optional firstJob_ = ids .stream() .map(id -> _getJob(r, id)) - .filter(j -> j != null && !Job.BURIED.equals(j.state)) + .filter(j -> { + boolean ok = j != null && !Job.BURIED.equals(j.state); + if (j == null) { + log.log(SEVERE, "data corruption detected on tube " + tube + " job == null in ready queue " + + kReadyQueue); + } else if (Job.BURIED.equals(j.state)) { + log.log(SEVERE, "data corruption detected on tube " + tube + " job " + j.id + + " is BURIED but in ready queue " + kReadyQueue); + } + return ok; + }) .findFirst(); if (!firstJob_.isPresent() && (readyQueueSize != 0 || toLong(r.zcard(kReadyQueue)) != 0)) { - System.err.println("data corruption detected on tube " + tube); ids = r.zrange(kReadyQueue, 0, -1); firstJob_ = ids .stream() diff --git a/src/test/java/rtalk/RTalkTest.java b/src/test/java/rtalk/RTalkTest.java index 7b622c4..79508f1 100644 --- a/src/test/java/rtalk/RTalkTest.java +++ b/src/test/java/rtalk/RTalkTest.java @@ -14,6 +14,7 @@ import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Logger; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.junit.Before; @@ -232,4 +233,29 @@ public void testTouchKeepsJobReserved() throws Exception { Job statsJob2 = rt.statsJob(put.id); assertEquals(Job.RESERVED, statsJob2.state); } + + @Test + public void testDataCorruption() throws Exception { + RTalk rt = new RTalk(jedisPool); + Response a = rt.put(0, 0, 1000, "a"); + Response b = rt.put(1, 0, 1000, "b"); + + //emulate data corruption + jedisPool.getResource().del(rt.kJob(a.id)); + Response reserve = rt.reserve(); + assertEquals(b.id, reserve.id); + } + + @Test + public void testDataCorruptionBuried() throws Exception { + RTalk rt = new RTalk(jedisPool); + Response a = rt.put(0, 0, 1000, "a"); + Response b = rt.put(1, 0, 1000, "b"); + + //emulate data corruption + jedisPool.getResource().hset(rt.kJob(a.id), RTalk.fState, Job.BURIED); + + Response reserve = rt.reserve(); + assertEquals(b.id, reserve.id); + } }