Skip to content

Commit 799153b

Browse files
author
wanghuaiyuan
committed
Add metrics to test accurately to the millisecond level
1 parent 27fece9 commit 799153b

File tree

5 files changed

+69
-9
lines changed

5 files changed

+69
-9
lines changed

store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1649,6 +1649,22 @@ public void setTimerPrecisionMs(int timerPrecisionMs) {
16491649
this.timerPrecisionMs = candidates[candidates.length - 1];
16501650
}
16511651

1652+
// visible for test
1653+
public void setTimerPrecision(int timerPrecisionMs) {
1654+
if (enableTimerMessageOnRocksDB) {
1655+
this.timerPrecisionMs = timerPrecisionMs;
1656+
return;
1657+
}
1658+
int[] candidates = {100, 200, 500, 1000};
1659+
for (int i = 1; i < candidates.length; i++) {
1660+
if (timerPrecisionMs < candidates[i]) {
1661+
this.timerPrecisionMs = candidates[i - 1];
1662+
return;
1663+
}
1664+
}
1665+
this.timerPrecisionMs = candidates[candidates.length - 1];
1666+
}
1667+
16521668
public int getTimerRollWindowSlot() {
16531669
return timerRollWindowSlot;
16541670
}

store/src/main/java/org/apache/rocketmq/store/timer/rocksdb/TimerMessageKVStore.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ public interface TimerMessageKVStore {
7676

7777
/**
7878
* Get the checkpoint of the timer message kv store.
79+
* Key : columnFamily byte[]; Value : checkpoint long.
7980
* @param columnFamily the column family of the timer message kv store.
8081
* @return the checkpoint of the timer message kv store.
8182
*/
@@ -86,5 +87,5 @@ public interface TimerMessageKVStore {
8687
* @param key the key of the metric.
8788
* @param update the value of the metric.
8889
*/
89-
void syncMetric(long key, long update);
90+
void syncMetric(long key, int update);
9091
}

store/src/main/java/org/apache/rocketmq/store/timer/rocksdb/TimerMessageRocksDBStorage.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ public int getMetricSize(long lowerTime, long upperTime) {
238238

239239
try (ReadOptions readOptions = new ReadOptions()
240240
.setIterateLowerBound(new Slice(ByteBuffer.allocate(Long.BYTES).putLong(lowerTime).array()))
241-
.setIterateLowerBound(new Slice(ByteBuffer.allocate(Long.BYTES).putLong(upperTime).array()));
241+
.setIterateUpperBound(new Slice(ByteBuffer.allocate(Long.BYTES).putLong(upperTime).array()));
242242
RocksIterator iterator = db.newIterator(metricColumnFamilyHandle, readOptions)) {
243243
iterator.seek(ByteBuffer.allocate(Long.BYTES).putLong(lowerTime).array());
244244
while (iterator.isValid()) {
@@ -260,9 +260,14 @@ public long getCheckpoint(byte[] columnFamily) {
260260
}
261261

262262
@Override
263-
public void syncMetric(long key, long update) {
263+
public void syncMetric(long key, int update) {
264264
try {
265-
db.put(metricColumnFamilyHandle, ByteBuffer.allocate(8).putLong(key).array(), ByteBuffer.allocate(4).putInt((int) update).array());
265+
byte[] keyBytes = db.get(metricColumnFamilyHandle, ByteBuffer.allocate(8).putLong(key).array());
266+
if (keyBytes != null) {
267+
ByteBuffer oldValue = ByteBuffer.wrap(keyBytes);
268+
update = oldValue.getInt() + update;
269+
}
270+
db.put(metricColumnFamilyHandle, ByteBuffer.allocate(8).putLong(key).array(), ByteBuffer.allocate(4).putInt(update).array());
266271
} catch (RocksDBException e) {
267272
throw new RuntimeException("Sync metric to RocksDB error", e);
268273
}

store/src/main/java/org/apache/rocketmq/store/timer/rocksdb/TimerMessageRocksDBStore.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ private void calcTimerDistribution() {
227227
int slotNumber = precisionMs;
228228
int rocksdbNumber = 0;
229229
for (int i = 0; i < this.slotSize; i++) {
230-
timerMetrics.updateDistPair(i, timerMessageKVStore.getMetricSize(rocksdbNumber, rocksdbNumber + slotNumber - 1));
230+
timerMetrics.updateDistPair(i, timerMessageKVStore.getMetricSize(rocksdbNumber, rocksdbNumber + slotNumber));
231231
rocksdbNumber += slotNumber;
232232
}
233233
}
@@ -436,12 +436,16 @@ public void run() {
436436
try {
437437
List<TimerMessageRecord> timerMessageRecord = dequeuePutQueue.poll(100L * precisionMs / 1000, TimeUnit.MILLISECONDS);
438438
int flag = 0;
439+
long delayTime = -1;
439440
if (null == timerMessageRecord || timerMessageRecord.isEmpty()) {
440441
continue;
441442
}
442443
for (TimerMessageRecord record : timerMessageRecord) {
443444
MessageExt msg = record.getMessageExt();
444445
MessageExtBrokerInner messageExtBrokerInner = convert(msg, record.isRoll());
446+
if (delayTime == -1) {
447+
delayTime = Long.parseLong(record.getMessageExt().getProperty(TIMER_OUT_MS));
448+
}
445449
flag = record.getMessageExt().getProperty(MessageConst.PROPERTY_TIMER_DEL_FLAG) == null ?
446450
0 : Integer.parseInt(record.getMessageExt().getProperty(MessageConst.PROPERTY_TIMER_DEL_FLAG));
447451
boolean processed = false;
@@ -470,6 +474,7 @@ public void run() {
470474
addMetric(msg, -1);
471475
addMetric((int) (Long.parseLong(msg.getProperty(TIMER_OUT_MS)) / precisionMs % slotSize), -1);
472476
}
477+
timerMessageKVStore.syncMetric(delayTime % metricsIntervalMs, -timerMessageRecord.size());
473478
timerMessageKVStore.deleteAssignRecords(getColumnFamily(flag), timerMessageRecord, timerMessageRecord.get(0).getReadOffset());
474479
} catch (InterruptedException e) {
475480
TimerMessageRocksDBStore.log.error("Error occurred in " + getServiceName(), e);
@@ -802,4 +807,12 @@ public long getEnqueueBehindMillis() {
802807
public long getDequeueBehindMillis() {
803808
return System.currentTimeMillis() - timerGetMessageServices.get(0).checkpoint;
804809
}
810+
811+
public TimerMessageKVStore getTimerMessageKVStore() {
812+
return timerMessageKVStore;
813+
}
814+
815+
public long getMetricsIntervalMs() {
816+
return metricsIntervalMs;
817+
}
805818
}

store/src/test/java/org/apache/rocketmq/store/timer/rocksdb/TimerMessageRocksDBStoreTest.java

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@
6666
public class TimerMessageRocksDBStoreTest {
6767
MessageStore messageStore;
6868
MessageStoreConfig storeConfig;
69-
int precisionMs = 500;
69+
int precisionMs = 1;
7070
AtomicInteger counter = new AtomicInteger(0);
7171
private SocketAddress bornHost;
7272
private SocketAddress storeHost;
@@ -80,6 +80,8 @@ public void setUp() throws Exception {
8080
storeConfig.setEnableTimerMessageOnRocksDB(true);
8181
storeConfig.setStorePathRootDir(baseDir);
8282
storeConfig.setStorePathCommitLog(baseDir + File.separator + "commitlog");
83+
storeConfig.setTimerPrecision(1);
84+
precisionMs = storeConfig.getTimerPrecisionMs();
8385
messageStore = new DefaultMessageStore(storeConfig, new BrokerStatsManager("TimerTest",
8486
false), new MyMessageArrivingListener(), new BrokerConfig(), new ConcurrentHashMap<>());
8587

@@ -158,9 +160,7 @@ private PutMessageResult transformTimerMessage(TimerMessageRocksDBStore timerMes
158160
}
159161

160162
int timerPrecisionMs = storeConfig.getTimerPrecisionMs();
161-
if (deliverMs % timerPrecisionMs == 0) {
162-
deliverMs -= timerPrecisionMs;
163-
} else {
163+
if (deliverMs % timerPrecisionMs != 0) {
164164
deliverMs = deliverMs / timerPrecisionMs * timerPrecisionMs;
165165
}
166166

@@ -333,6 +333,31 @@ public void testExtractUniqueKey() {
333333
assertEquals("123456", TimerMessageStore.extractUniqueKey(deleteKey));
334334
}
335335

336+
@Test
337+
public void testGetTimerMetrics() {
338+
String topic = "TimerRocksdbTest_testGetTimerMetrics";
339+
TimerMessageRocksDBStore timerMessageStore = createTimerMessageRocksDBStore(null);
340+
timerMessageStore.load();
341+
timerMessageStore.start();
342+
storeConfig.setTimerStopDequeue(true);
343+
long delayMs = System.currentTimeMillis() + 3000;
344+
345+
for (int i = 0; i < 10; i++) {
346+
MessageExtBrokerInner inner = buildMessage(delayMs, topic, false);
347+
transformTimerMessage(timerMessageStore, inner);
348+
assertEquals(PutMessageStatus.PUT_OK, messageStore.putMessage(inner).getPutMessageStatus());
349+
}
350+
await().atMost(3000, TimeUnit.MILLISECONDS).until(() -> timerMessageStore.getTimerMessageKVStore().
351+
getMetricSize(delayMs / precisionMs * precisionMs % timerMessageStore.getMetricsIntervalMs(),
352+
delayMs / precisionMs * precisionMs % timerMessageStore.getMetricsIntervalMs() + precisionMs) == 10);
353+
354+
storeConfig.setTimerStopDequeue(false);
355+
await().atMost(3000, TimeUnit.MILLISECONDS).until(() -> timerMessageStore.getTimerMessageKVStore().
356+
getMetricSize(delayMs / precisionMs * precisionMs % timerMessageStore.getMetricsIntervalMs(),
357+
delayMs / precisionMs * precisionMs % timerMessageStore.getMetricsIntervalMs() + precisionMs) == 0);
358+
timerMessageStore.shutdown();
359+
}
360+
336361
private class MyMessageArrivingListener implements MessageArrivingListener {
337362
@Override
338363
public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime,

0 commit comments

Comments
 (0)