Skip to content

Commit 0e804df

Browse files
committed
merge develop
1 parent b12664b commit 0e804df

File tree

9 files changed

+69
-65
lines changed

9 files changed

+69
-65
lines changed

broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConfigManager.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,7 @@
1616
*/
1717
package org.apache.rocketmq.broker.config.v1;
1818

19-
import com.alibaba.fastjson.JSON;
20-
import java.nio.charset.Charset;
21-
import java.nio.charset.StandardCharsets;
22-
import java.util.function.BiConsumer;
19+
import com.alibaba.fastjson2.JSON;
2320
import org.apache.commons.lang3.StringUtils;
2421
import org.apache.rocketmq.common.config.ConfigRocksDBStorage;
2522
import org.apache.rocketmq.common.constant.LoggerName;
@@ -33,6 +30,10 @@
3330
import org.rocksdb.Statistics;
3431
import org.rocksdb.WriteBatch;
3532

33+
import java.nio.charset.Charset;
34+
import java.nio.charset.StandardCharsets;
35+
import java.util.function.BiConsumer;
36+
3637
public class RocksDBConfigManager {
3738
protected static final Logger BROKER_LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
3839

broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManager.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,8 @@
1616
*/
1717
package org.apache.rocketmq.broker.config.v1;
1818

19-
import com.alibaba.fastjson.JSON;
20-
import com.alibaba.fastjson.serializer.SerializerFeature;
21-
import java.nio.file.Path;
22-
import java.nio.file.Paths;
23-
import java.util.Map.Entry;
24-
import java.util.concurrent.ConcurrentMap;
19+
import com.alibaba.fastjson2.JSON;
20+
import com.alibaba.fastjson2.JSONWriter;
2521
import org.apache.commons.lang3.StringUtils;
2622
import org.apache.rocketmq.broker.BrokerController;
2723
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
@@ -35,6 +31,11 @@
3531
import org.rocksdb.CompressionType;
3632
import org.rocksdb.WriteBatch;
3733

34+
import java.nio.file.Path;
35+
import java.nio.file.Paths;
36+
import java.util.Map.Entry;
37+
import java.util.concurrent.ConcurrentMap;
38+
3839
public class RocksDBConsumerOffsetManager extends ConsumerOffsetManager {
3940

4041
protected static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
@@ -184,7 +185,7 @@ private void putWriteBatch(final WriteBatch writeBatch, final String topicGroupN
184185
byte[] keyBytes = topicGroupName.getBytes(DataConverter.CHARSET_UTF8);
185186
RocksDBOffsetSerializeWrapper wrapper = new RocksDBOffsetSerializeWrapper();
186187
wrapper.setOffsetTable(offsetMap);
187-
byte[] valueBytes = JSON.toJSONBytes(wrapper, SerializerFeature.BrowserCompatible);
188+
byte[] valueBytes = JSON.toJSONBytes(wrapper, JSONWriter.Feature.BrowserCompatible);
188189
rocksDBConfigManager.writeBatchPutOperation(writeBatch, keyBytes, valueBytes);
189190
}
190191

broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManager.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,9 @@
1616
*/
1717
package org.apache.rocketmq.broker.config.v1;
1818

19-
import com.alibaba.fastjson.JSON;
20-
import com.alibaba.fastjson.JSONObject;
21-
import com.alibaba.fastjson.serializer.SerializerFeature;
22-
import java.nio.file.Path;
23-
import java.nio.file.Paths;
24-
import java.util.Map;
25-
import java.util.Set;
26-
import java.util.concurrent.ConcurrentHashMap;
27-
import java.util.concurrent.ConcurrentMap;
28-
import java.util.function.BiConsumer;
19+
import com.alibaba.fastjson2.JSON;
20+
import com.alibaba.fastjson2.JSONObject;
21+
import com.alibaba.fastjson2.JSONWriter;
2922
import org.apache.commons.lang3.StringUtils;
3023
import org.apache.rocketmq.broker.BrokerController;
3124
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
@@ -35,6 +28,14 @@
3528
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
3629
import org.rocksdb.CompressionType;
3730

31+
import java.nio.file.Path;
32+
import java.nio.file.Paths;
33+
import java.util.Map;
34+
import java.util.Set;
35+
import java.util.concurrent.ConcurrentHashMap;
36+
import java.util.concurrent.ConcurrentMap;
37+
import java.util.function.BiConsumer;
38+
3839
public class RocksDBSubscriptionGroupManager extends SubscriptionGroupManager {
3940

4041
protected transient RocksDBConfigManager rocksDBConfigManager;
@@ -162,7 +163,7 @@ public SubscriptionGroupConfig putSubscriptionGroupConfig(SubscriptionGroupConfi
162163

163164
try {
164165
byte[] keyBytes = groupName.getBytes(RocksDBConfigManager.CHARSET);
165-
byte[] valueBytes = JSON.toJSONBytes(subscriptionGroupConfig, SerializerFeature.BrowserCompatible);
166+
byte[] valueBytes = JSON.toJSONBytes(subscriptionGroupConfig, JSONWriter.Feature.BrowserCompatible);
166167
this.rocksDBConfigManager.put(keyBytes, valueBytes);
167168
} catch (Exception e) {
168169
log.error("kv put sub Failed, {}", subscriptionGroupConfig.toString());
@@ -177,7 +178,7 @@ protected SubscriptionGroupConfig putSubscriptionGroupConfigIfAbsent(Subscriptio
177178
if (oldConfig == null) {
178179
try {
179180
byte[] keyBytes = groupName.getBytes(RocksDBConfigManager.CHARSET);
180-
byte[] valueBytes = JSON.toJSONBytes(subscriptionGroupConfig, SerializerFeature.BrowserCompatible);
181+
byte[] valueBytes = JSON.toJSONBytes(subscriptionGroupConfig, JSONWriter.Feature.BrowserCompatible);
181182
this.rocksDBConfigManager.put(keyBytes, valueBytes);
182183
} catch (Exception e) {
183184
log.error("kv put sub Failed, {}", subscriptionGroupConfig.toString());

broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManager.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,8 @@
1616
*/
1717
package org.apache.rocketmq.broker.config.v1;
1818

19-
import com.alibaba.fastjson.JSON;
20-
import com.alibaba.fastjson.serializer.SerializerFeature;
21-
import java.nio.file.Path;
22-
import java.nio.file.Paths;
23-
import java.util.Map;
24-
import java.util.concurrent.ConcurrentMap;
19+
import com.alibaba.fastjson2.JSON;
20+
import com.alibaba.fastjson2.JSONWriter;
2521
import org.apache.commons.lang3.StringUtils;
2622
import org.apache.rocketmq.broker.BrokerController;
2723
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
@@ -32,6 +28,11 @@
3228
import org.apache.rocketmq.remoting.protocol.DataVersion;
3329
import org.rocksdb.CompressionType;
3430

31+
import java.nio.file.Path;
32+
import java.nio.file.Paths;
33+
import java.util.Map;
34+
import java.util.concurrent.ConcurrentMap;
35+
3536
public class RocksDBTopicConfigManager extends TopicConfigManager {
3637
private static final String VERSION_COLUMN_FAMILY = "topicVersion";
3738
private static final String TOPIC_COLUMN_FAMILY = "topic";
@@ -142,7 +143,7 @@ public TopicConfig putTopicConfig(TopicConfig topicConfig) {
142143
TopicConfig oldTopicConfig = this.topicConfigTable.put(topicName, topicConfig);
143144
try {
144145
byte[] keyBytes = topicName.getBytes(DataConverter.CHARSET_UTF8);
145-
byte[] valueBytes = JSON.toJSONBytes(topicConfig, SerializerFeature.BrowserCompatible);
146+
byte[] valueBytes = JSON.toJSONBytes(topicConfig, JSONWriter.Feature.BrowserCompatible);
146147
this.rocksDBConfigManager.put(keyBytes, valueBytes);
147148
} catch (Exception e) {
148149
log.error("kv put topic Failed, {}", topicConfig.toString(), e);

broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,26 +16,13 @@
1616
*/
1717
package org.apache.rocketmq.broker.processor;
1818

19-
import com.alibaba.fastjson.JSON;
19+
import com.alibaba.fastjson2.JSON;
2020
import com.github.benmanes.caffeine.cache.Cache;
2121
import io.netty.channel.Channel;
2222
import io.netty.channel.ChannelFutureListener;
2323
import io.netty.channel.ChannelHandlerContext;
2424
import io.netty.channel.FileRegion;
2525
import io.opentelemetry.api.common.Attributes;
26-
import java.nio.ByteBuffer;
27-
import java.nio.charset.StandardCharsets;
28-
import java.util.Iterator;
29-
import java.util.List;
30-
import java.util.Map;
31-
import java.util.Map.Entry;
32-
import java.util.Random;
33-
import java.util.concurrent.CompletableFuture;
34-
import java.util.concurrent.ConcurrentHashMap;
35-
import java.util.concurrent.ConcurrentSkipListSet;
36-
import java.util.concurrent.TimeUnit;
37-
import java.util.concurrent.atomic.AtomicBoolean;
38-
import java.util.concurrent.atomic.AtomicLong;
3926
import org.apache.rocketmq.broker.BrokerController;
4027
import org.apache.rocketmq.broker.filter.ConsumerFilterData;
4128
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
@@ -44,7 +31,6 @@
4431
import org.apache.rocketmq.broker.longpolling.PollingResult;
4532
import org.apache.rocketmq.broker.longpolling.PopLongPollingService;
4633
import org.apache.rocketmq.broker.longpolling.PopRequest;
47-
4834
import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer;
4935
import org.apache.rocketmq.broker.pop.PopConsumerContext;
5036
import org.apache.rocketmq.common.BrokerConfig;
@@ -90,6 +76,20 @@
9076
import org.apache.rocketmq.store.pop.BatchAckMsg;
9177
import org.apache.rocketmq.store.pop.PopCheckPoint;
9278

79+
import java.nio.ByteBuffer;
80+
import java.nio.charset.StandardCharsets;
81+
import java.util.Iterator;
82+
import java.util.List;
83+
import java.util.Map;
84+
import java.util.Map.Entry;
85+
import java.util.Random;
86+
import java.util.concurrent.CompletableFuture;
87+
import java.util.concurrent.ConcurrentHashMap;
88+
import java.util.concurrent.ConcurrentSkipListSet;
89+
import java.util.concurrent.TimeUnit;
90+
import java.util.concurrent.atomic.AtomicBoolean;
91+
import java.util.concurrent.atomic.AtomicLong;
92+
9393
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
9494
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_RETRY;
9595
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;

broker/src/test/java/org/apache/rocketmq/broker/RocksDBConfigManagerTest.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,14 @@
1717

1818
package org.apache.rocketmq.broker;
1919

20-
import com.alibaba.fastjson2.JSON;
20+
import org.apache.rocketmq.broker.config.v1.RocksDBConfigManager;
2121
import org.apache.rocketmq.common.config.ConfigRocksDBStorage;
2222
import org.apache.rocketmq.remoting.protocol.DataVersion;
2323
import org.junit.Before;
2424
import org.junit.Test;
2525

26-
import java.nio.charset.StandardCharsets;
27-
2826
import static org.junit.Assert.assertEquals;
2927
import static org.junit.Assert.assertTrue;
30-
import static org.mockito.ArgumentMatchers.eq;
3128
import static org.mockito.Mockito.spy;
3229
import static org.mockito.Mockito.times;
3330
import static org.mockito.Mockito.verify;
@@ -51,10 +48,8 @@ public void setUp() throws IllegalAccessException {
5148
public void testLoadDataVersion() throws Exception {
5249
DataVersion expected = new DataVersion();
5350
expected.nextVersion();
54-
String jsonData = JSON.toJSONString(expected);
55-
byte[] mockDataVersion = jsonData.getBytes(StandardCharsets.UTF_8);
5651

57-
when(rocksDBConfigManager.configRocksDBStorage.getKvDataVersion()).thenReturn(mockDataVersion);
52+
when(rocksDBConfigManager.getKvDataVersion()).thenReturn(expected);
5853

5954
boolean result = rocksDBConfigManager.loadDataVersion();
6055

@@ -67,9 +62,6 @@ public void testLoadDataVersion() throws Exception {
6762
public void testUpdateKvDataVersion() throws Exception {
6863
rocksDBConfigManager.updateKvDataVersion();
6964

70-
DataVersion expectedDataVersion = rocksDBConfigManager.getKvDataVersion();
71-
verify(rocksDBConfigManager.configRocksDBStorage, times(1)).updateKvDataVersion(
72-
eq(JSON.toJSONString(expectedDataVersion).getBytes(StandardCharsets.UTF_8))
73-
);
65+
verify(rocksDBConfigManager, times(1)).updateKvDataVersion();
7466
}
7567
}

broker/src/test/java/org/apache/rocketmq/broker/config/v1/RocksDBSubscriptionGroupManagerTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
import org.apache.commons.lang3.reflect.FieldUtils;
2020
import org.apache.rocketmq.broker.BrokerController;
21-
import org.apache.rocketmq.broker.RocksDBConfigManager;
21+
import org.apache.rocketmq.common.BrokerConfig;
2222
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
2323
import org.apache.rocketmq.store.config.MessageStoreConfig;
2424
import org.junit.Before;
@@ -33,6 +33,7 @@
3333
import static org.junit.Assert.assertTrue;
3434
import static org.junit.jupiter.api.Assertions.assertEquals;
3535
import static org.junit.jupiter.api.Assertions.assertNull;
36+
import static org.mockito.Mockito.mock;
3637
import static org.mockito.Mockito.when;
3738

3839
@RunWith(MockitoJUnitRunner.class)
@@ -55,6 +56,9 @@ public void init() throws IllegalAccessException {
5556
when(messageStoreConfig.getMemTableFlushIntervalMs()).thenReturn(1000L);
5657
when(messageStoreConfig.getRocksdbCompressionType()).thenReturn("LZ4_COMPRESSION");
5758
when(messageStoreConfig.getStorePathRootDir()).thenReturn("/");
59+
BrokerConfig brokerConfig = mock(BrokerConfig.class);
60+
when(brokerConfig.isUseSingleRocksDBForAllConfigs()).thenReturn(true);
61+
when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
5862
rocksDBSubscriptionGroupManager = new RocksDBSubscriptionGroupManager(brokerController);
5963
FieldUtils.writeDeclaredField(rocksDBSubscriptionGroupManager, "rocksDBConfigManager", rocksDBConfigManager, true);
6064
}

broker/src/test/java/org/apache/rocketmq/broker/config/v1/RocksDBTopicConfigManagerTest.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
import org.apache.commons.lang3.reflect.FieldUtils;
2020
import org.apache.rocketmq.broker.BrokerController;
21-
import org.apache.rocketmq.broker.RocksDBConfigManager;
21+
import org.apache.rocketmq.common.BrokerConfig;
2222
import org.apache.rocketmq.common.TopicConfig;
2323
import org.apache.rocketmq.store.config.MessageStoreConfig;
2424
import org.junit.Before;
@@ -34,7 +34,7 @@
3434
import static org.junit.Assert.assertNull;
3535
import static org.junit.jupiter.api.Assertions.assertEquals;
3636
import static org.mockito.ArgumentMatchers.any;
37-
import static org.mockito.ArgumentMatchers.anyInt;
37+
import static org.mockito.Mockito.mock;
3838
import static org.mockito.Mockito.times;
3939
import static org.mockito.Mockito.verify;
4040
import static org.mockito.Mockito.when;
@@ -59,6 +59,9 @@ public void init() throws Exception {
5959
when(messageStoreConfig.getMemTableFlushIntervalMs()).thenReturn(1000L);
6060
when(messageStoreConfig.getRocksdbCompressionType()).thenReturn("LZ4_COMPRESSION");
6161
when(messageStoreConfig.getStorePathRootDir()).thenReturn("/");
62+
BrokerConfig brokerConfig = mock(BrokerConfig.class);
63+
when(brokerConfig.isUseSingleRocksDBForAllConfigs()).thenReturn(true);
64+
when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
6265
rocksDBTopicConfigManager = new RocksDBTopicConfigManager(brokerController);
6366
FieldUtils.writeDeclaredField(rocksDBTopicConfigManager, "rocksDBConfigManager", rocksDBConfigManager, true);
6467
}
@@ -89,6 +92,6 @@ public void testPutTopicConfig() throws Exception {
8992
newTopicConfig.setWriteQueueNums(10);
9093

9194
assertNull(rocksDBTopicConfigManager.putTopicConfig(newTopicConfig));
92-
verify(rocksDBConfigManager, times(1)).put(any(byte[].class), anyInt(), any(byte[].class));
95+
verify(rocksDBConfigManager, times(1)).put(any(byte[].class), any(byte[].class));
9396
}
9497
}

controller/src/test/java/org/apache/rocketmq/controller/impl/RaftBrokerHeartBeatManagerTest.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,9 @@
1616
*/
1717
package org.apache.rocketmq.controller.impl;
1818

19-
import com.alibaba.fastjson.JSON;
19+
import com.alibaba.fastjson2.JSON;
2020
import io.netty.channel.Channel;
2121
import io.netty.channel.DefaultChannelPromise;
22-
import java.util.Collections;
23-
import java.util.concurrent.CompletableFuture;
24-
import java.util.concurrent.CountDownLatch;
25-
import java.util.concurrent.TimeUnit;
2622
import org.apache.rocketmq.common.ControllerConfig;
2723
import org.apache.rocketmq.controller.impl.heartbeat.BrokerIdentityInfo;
2824
import org.apache.rocketmq.controller.impl.heartbeat.RaftBrokerHeartBeatManager;
@@ -34,6 +30,11 @@
3430
import org.mockito.Mock;
3531
import org.mockito.junit.MockitoJUnitRunner;
3632

33+
import java.util.Collections;
34+
import java.util.concurrent.CompletableFuture;
35+
import java.util.concurrent.CountDownLatch;
36+
import java.util.concurrent.TimeUnit;
37+
3738
import static org.junit.Assert.assertTrue;
3839
import static org.mockito.ArgumentMatchers.any;
3940
import static org.mockito.Mockito.when;

0 commit comments

Comments
 (0)