diff --git a/WORKSPACE b/WORKSPACE index da58ae67634..0f06aa21123 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -54,7 +54,7 @@ maven_install( "commons-validator:commons-validator:1.7", "org.apache.commons:commons-lang3:3.12.0", "org.hamcrest:hamcrest-core:1.3", - "io.openmessaging.storage:dledger:0.3.1", + "io.openmessaging.storage:dledger:0.3.2", "net.java.dev.jna:jna:4.2.2", "ch.qos.logback:logback-classic:1.2.10", "ch.qos.logback:logback-core:1.2.10", diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerControllerStateMachine.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerControllerStateMachine.java index 70c65c00f40..f67967e9600 100644 --- a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerControllerStateMachine.java +++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerControllerStateMachine.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.controller.impl; import io.openmessaging.storage.dledger.entry.DLedgerEntry; +import io.openmessaging.storage.dledger.exception.DLedgerException; import io.openmessaging.storage.dledger.snapshot.SnapshotReader; import io.openmessaging.storage.dledger.snapshot.SnapshotWriter; import io.openmessaging.storage.dledger.statemachine.CommittedEntryIterator; @@ -28,8 +29,6 @@ import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; -import java.util.concurrent.CompletableFuture; - /** * The state machine implementation of the dledger controller */ @@ -46,6 +45,11 @@ public DLedgerControllerStateMachine(final ReplicasInfoManager replicasInfoManag this.dLedgerId = generateDLedgerId(dLedgerGroupId, dLedgerSelfId); } + @Override + public String generateDLedgerId(String dLedgerGroupId, String dLedgerSelfId) { + return new StringBuilder(20).append(dLedgerGroupId).append("#").append(dLedgerSelfId).toString(); + } + @Override public void onApply(CommittedEntryIterator iterator) { int applyingSize = 0; @@ -66,7 +70,8 @@ public void onApply(CommittedEntryIterator iterator) { } @Override - public void onSnapshotSave(SnapshotWriter writer, CompletableFuture future) { + public boolean onSnapshotSave(SnapshotWriter writer) { + return true; } @Override @@ -76,6 +81,12 @@ public boolean onSnapshotLoad(SnapshotReader reader) { @Override public void onShutdown() { + log.info("StateMachine {} onShutdown", this.dLedgerId); + } + + @Override + public void onError(DLedgerException exception) { + log.error("Encountered an error on StateMachine {}, dLedger may stop working since some error occurs, you should figure out the cause and repair or remove this node.", this.dLedgerId, exception); } @Override diff --git a/pom.xml b/pom.xml index 56830f15a3e..4343c3a5748 100644 --- a/pom.xml +++ b/pom.xml @@ -123,7 +123,7 @@ 1.8.0 0.33.0 1.8.1 - 0.3.1.2 + 0.3.2 6.0.53 1.0-beta-4 1.4.2