From 72ac17be84dc47041148ed02581a08f3ed0d6aac Mon Sep 17 00:00:00 2001 From: TangJie Date: Wed, 18 Dec 2024 12:19:59 +0800 Subject: [PATCH 1/5] =?UTF-8?q?=E4=BA=8B=E5=8A=A1=E6=B6=88=E6=81=AF?= =?UTF-8?q?=EF=BC=8C=20=E5=9B=9E=E6=9F=A5=E5=88=A4=E6=96=ADOP=5FHALF=5FQUE?= =?UTF-8?q?UE=20=E6=B6=88=E6=81=AF=E6=98=AF=E5=90=A6=E5=9C=A8HALF=5FQUEUE?= =?UTF-8?q?=E4=B8=AD=EF=BC=8C=E4=BC=98=E5=8C=96=E5=88=A4=E6=96=AD=E6=96=B9?= =?UTF-8?q?=E5=BC=8F=E3=80=82=20Transaction=20message,=20check=20back=20to?= =?UTF-8?q?=20determine=20if=20the=20OP-HALF-QUEUE=20message=20is=20in=20H?= =?UTF-8?q?ALF-QUEUE,=20optimize=20the=20judgment=20method?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../transaction/queue/TransactionalMessageServiceImpl.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java index 9fdfd0a7101..a2469a1e1cf 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java @@ -197,15 +197,15 @@ public void check(long transactionTimeout, int transactionCheckMax, long nextOpOffset = pullResult.getNextBeginOffset(); int putInQueueCount = 0; int escapeFailCnt = 0; + Long removedOpOffset; while (true) { if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) { log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT); break; } - if (removeMap.containsKey(i)) { + if ((removedOpOffset = removeMap.remove(i)) != null) { log.debug("Half offset {} has been committed/rolled back", i); - Long removedOpOffset = removeMap.remove(i); opMsgMap.get(removedOpOffset).remove(i); if (opMsgMap.get(removedOpOffset).size() == 0) { opMsgMap.remove(removedOpOffset); From a9731204800161522b3e7ae82cef4c6f6eef0c27 Mon Sep 17 00:00:00 2001 From: TangJie Date: Wed, 18 Dec 2024 18:41:23 +0800 Subject: [PATCH 2/5] =?UTF-8?q?=E4=BA=8B=E5=8A=A1=E6=B6=88=E6=81=AF?= =?UTF-8?q?=EF=BC=8C=20=E5=9B=9E=E6=9F=A5=E5=88=A4=E6=96=ADOP=5FHALF=5FQUE?= =?UTF-8?q?UE=20=E6=B6=88=E6=81=AF=E6=98=AF=E5=90=A6=E5=9C=A8HALF=5FQUEUE?= =?UTF-8?q?=E4=B8=AD=EF=BC=8C=E4=BC=98=E5=8C=96=E5=88=A4=E6=96=AD=E6=96=B9?= =?UTF-8?q?=E5=BC=8F=E3=80=82=20Transaction=20message,=20check=20back=20to?= =?UTF-8?q?=20determine=20if=20the=20OP-HALF-QUEUE=20message=20is=20in=20H?= =?UTF-8?q?ALF-QUEUE,=20optimize=20the=20judgment=20method?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../transaction/queue/TransactionalMessageServiceImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java index a2469a1e1cf..3d237949409 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java @@ -197,13 +197,13 @@ public void check(long transactionTimeout, int transactionCheckMax, long nextOpOffset = pullResult.getNextBeginOffset(); int putInQueueCount = 0; int escapeFailCnt = 0; - Long removedOpOffset; while (true) { if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) { log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT); break; } + Long removedOpOffset; if ((removedOpOffset = removeMap.remove(i)) != null) { log.debug("Half offset {} has been committed/rolled back", i); opMsgMap.get(removedOpOffset).remove(i); From 4066b9bb9710876da8ec2ab234f57b493f6c0c75 Mon Sep 17 00:00:00 2001 From: TangJie Date: Wed, 18 Dec 2024 19:00:54 +0800 Subject: [PATCH 3/5] =?UTF-8?q?=E4=BA=8B=E5=8A=A1=E6=B6=88=E6=81=AF?= =?UTF-8?q?=EF=BC=8C=20=E5=9B=9E=E6=9F=A5=E5=88=A4=E6=96=ADOP=5FHALF=5FQUE?= =?UTF-8?q?UE=20=E6=B6=88=E6=81=AF=E6=98=AF=E5=90=A6=E5=9C=A8HALF=5FQUEUE?= =?UTF-8?q?=E4=B8=AD=EF=BC=8C=E4=BC=98=E5=8C=96=E5=88=A4=E6=96=AD=E6=96=B9?= =?UTF-8?q?=E5=BC=8F=E3=80=82=20Transaction=20message,=20check=20back=20to?= =?UTF-8?q?=20determine=20if=20the=20OP-HALF-QUEUE=20message=20is=20in=20H?= =?UTF-8?q?ALF-QUEUE,=20optimize=20the=20judgment=20method?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../transaction/queue/TransactionalMessageServiceImpl.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java index 3d237949409..017803c624c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java @@ -456,8 +456,8 @@ private boolean checkPrepareQueueOffset(HashMap removeMap, List Date: Wed, 18 Dec 2024 19:05:17 +0800 Subject: [PATCH 4/5] =?UTF-8?q?=E4=BA=8B=E5=8A=A1=E6=B6=88=E6=81=AF?= =?UTF-8?q?=EF=BC=8C=20=E5=9B=9E=E6=9F=A5=E5=88=A4=E6=96=ADOP=5FHALF=5FQUE?= =?UTF-8?q?UE=20=E6=B6=88=E6=81=AF=E6=98=AF=E5=90=A6=E5=9C=A8HALF=5FQUEUE?= =?UTF-8?q?=E4=B8=AD=EF=BC=8C=E4=BC=98=E5=8C=96=E5=88=A4=E6=96=AD=E6=96=B9?= =?UTF-8?q?=E5=BC=8F=E3=80=82=20Transaction=20message,=20check=20back=20to?= =?UTF-8?q?=20determine=20if=20the=20OP-HALF-QUEUE=20message=20is=20in=20H?= =?UTF-8?q?ALF-QUEUE,=20optimize=20the=20judgment=20method?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../transaction/queue/TransactionalMessageServiceImpl.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java index 017803c624c..137422e58cf 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java @@ -203,7 +203,7 @@ public void check(long transactionTimeout, int transactionCheckMax, log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT); break; } - Long removedOpOffset; + Long removedOpOffset = null; if ((removedOpOffset = removeMap.remove(i)) != null) { log.debug("Half offset {} has been committed/rolled back", i); opMsgMap.get(removedOpOffset).remove(i); @@ -456,7 +456,7 @@ private boolean checkPrepareQueueOffset(HashMap removeMap, List Date: Wed, 18 Dec 2024 19:17:55 +0800 Subject: [PATCH 5/5] =?UTF-8?q?=E4=BA=8B=E5=8A=A1=E6=B6=88=E6=81=AF?= =?UTF-8?q?=EF=BC=8C=20=E5=9B=9E=E6=9F=A5=E5=88=A4=E6=96=ADOP=5FHALF=5FQUE?= =?UTF-8?q?UE=20=E6=B6=88=E6=81=AF=E6=98=AF=E5=90=A6=E5=9C=A8HALF=5FQUEUE?= =?UTF-8?q?=E4=B8=AD=EF=BC=8C=E4=BC=98=E5=8C=96=E5=88=A4=E6=96=AD=E6=96=B9?= =?UTF-8?q?=E5=BC=8F=E3=80=82=20Transaction=20message,=20check=20back=20to?= =?UTF-8?q?=20determine=20if=20the=20OP-HALF-QUEUE=20message=20is=20in=20H?= =?UTF-8?q?ALF-QUEUE,=20optimize=20the=20judgment=20method?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../transaction/queue/TransactionalMessageServiceImpl.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java index 137422e58cf..017803c624c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java @@ -203,7 +203,7 @@ public void check(long transactionTimeout, int transactionCheckMax, log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT); break; } - Long removedOpOffset = null; + Long removedOpOffset; if ((removedOpOffset = removeMap.remove(i)) != null) { log.debug("Half offset {} has been committed/rolled back", i); opMsgMap.get(removedOpOffset).remove(i); @@ -456,7 +456,7 @@ private boolean checkPrepareQueueOffset(HashMap removeMap, List