Skip to content

Commit 7db3319

Browse files
committed
coordinator: user state manager with Async interface
1 parent 92414c9 commit 7db3319

File tree

12 files changed

+79
-72
lines changed

12 files changed

+79
-72
lines changed

coordinator/app/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ dependencies {
2222
implementation project(':jvm-libs:linea:core:metrics')
2323
implementation project(':jvm-libs:linea:metrics:micrometer')
2424
implementation project(':jvm-libs:linea:core:domain-models')
25+
implementation project(':jvm-libs:linea:clients:linea-state-manager')
2526
implementation project(':coordinator:utilities')
2627
implementation project(':coordinator:core')
2728
implementation project(':coordinator:clients:shomei-client')

coordinator/app/src/main/kotlin/net/consensys/zkevm/coordinator/app/L1DependentApp.kt

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package net.consensys.zkevm.coordinator.app
22

3+
import build.linea.clients.StateManagerClientV1
4+
import build.linea.clients.StateManagerV1JsonRpcClient
35
import io.vertx.core.Vertx
46
import kotlinx.datetime.Clock
57
import net.consensys.linea.BlockNumberAndHash
@@ -319,6 +321,20 @@ class L1DependentApp(
319321
)
320322
}
321323

324+
private fun createStateManagerClientNew(
325+
stateManagerConfig: StateManagerClientConfig,
326+
logger: Logger
327+
): StateManagerClientV1 {
328+
return StateManagerV1JsonRpcClient(
329+
rpcClientFactory = httpJsonRpcClientFactory,
330+
endpoints = stateManagerConfig.endpoints.map { it.toURI() },
331+
maxInflightRequestsPerClient = stateManagerConfig.requestLimitPerEndpoint,
332+
requestRetry = stateManagerConfig.requestRetryConfig,
333+
zkStateManagerVersion = stateManagerConfig.version,
334+
logger = logger
335+
)
336+
}
337+
322338
private val lastFinalizedBlock = lastFinalizedBlock().get()
323339
private val lastProcessedBlockNumber = resumeConflationFrom(
324340
aggregationsRepository,
@@ -422,8 +438,16 @@ class L1DependentApp(
422438
private val conflationService: ConflationService =
423439
ConflationServiceImpl(calculator = conflationCalculator, metricsFacade = metricsFacade)
424440

425-
private val zkStateClient: Type2StateManagerClient =
426-
createStateManagerClient(configs.stateManager, LogManager.getLogger("clients.StateManagerShomeiClient"))
441+
// private val zkStateClient: Type2StateManagerClient =
442+
// createStateManagerClient(configs.stateManager, LogManager.getLogger("clients.StateManagerShomeiClient"))
443+
private val zkStateClient: StateManagerClientV1 = StateManagerV1JsonRpcClient(
444+
rpcClientFactory = httpJsonRpcClientFactory,
445+
endpoints = configs.stateManager.endpoints.map { it.toURI() },
446+
maxInflightRequestsPerClient = configs.stateManager.requestLimitPerEndpoint,
447+
requestRetry = configs.stateManager.requestRetryConfig,
448+
zkStateManagerVersion = configs.stateManager.version,
449+
logger = LogManager.getLogger("clients.StateManagerShomeiClient")
450+
)
427451

428452
private val lineaSmartContractClientForDataSubmission: LineaRollupSmartContractClient = run {
429453
// The below gas provider will act as the primary gas provider if L1
Lines changed: 14 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,21 @@
11
package net.consensys.zkevm.ethereum.coordination.blob
22

3-
import com.github.michaelbull.result.Err
4-
import com.github.michaelbull.result.Ok
5-
import net.consensys.zkevm.coordinator.clients.GetZkEVMStateMerkleProofResponse
6-
import net.consensys.zkevm.coordinator.clients.Type2StateManagerClient
3+
import build.linea.clients.GetStateMerkleProofRequest
4+
import build.linea.clients.StateManagerClientV1
5+
import build.linea.domain.BlockInterval
76
import tech.pegasys.teku.infrastructure.async.SafeFuture
8-
import tech.pegasys.teku.infrastructure.unsigned.UInt64
9-
10-
class BlobZkStateProviderImpl(private val zkStateClient: Type2StateManagerClient) : BlobZkStateProvider {
11-
private fun rollupGetZkEVMStateMerkleProof(startBlockNumber: ULong, endBlockNumber: ULong):
12-
SafeFuture<GetZkEVMStateMerkleProofResponse> {
13-
return zkStateClient.rollupGetZkEVMStateMerkleProof(
14-
UInt64.valueOf(startBlockNumber.toLong()),
15-
UInt64.valueOf(endBlockNumber.toLong())
16-
).thenCompose {
17-
when (it) {
18-
is Ok -> SafeFuture.completedFuture(it.value)
19-
is Err -> {
20-
SafeFuture.failedFuture(it.error.asException())
21-
}
22-
}
23-
}
24-
}
257

8+
class BlobZkStateProviderImpl(
9+
private val zkStateClient: StateManagerClientV1
10+
) : BlobZkStateProvider {
2611
override fun getBlobZKState(blockRange: ULongRange): SafeFuture<BlobZkState> {
27-
return rollupGetZkEVMStateMerkleProof(blockRange.first, blockRange.last).thenApply {
28-
BlobZkState(
29-
parentStateRootHash = it.zkParentStateRootHash.toArray(),
30-
finalStateRootHash = it.zkEndStateRootHash.toArray()
31-
)
32-
}
12+
return zkStateClient
13+
.makeRequest(GetStateMerkleProofRequest(BlockInterval(blockRange.first, blockRange.last)))
14+
.thenApply {
15+
BlobZkState(
16+
parentStateRootHash = it.zkParentStateRootHash,
17+
finalStateRootHash = it.zkEndStateRootHash
18+
)
19+
}
3320
}
3421
}

coordinator/clients/prover-client/file-based-client/src/main/kotlin/net/consensys/zkevm/coordinator/clients/prover/FileBasedExecutionProverClientV2.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ internal class ExecutionProofRequestDataDecorator(
6161
getBlockStateRootHash(request.blocks.first().blockNumber.toULong() - 1UL)
6262
) { blocksAndBridgeLogs, previousKeccakStateRootHash ->
6363
BatchExecutionProofRequestDto(
64-
zkParentStateRootHash = request.type2StateData.zkParentStateRootHash.toHexString(),
64+
zkParentStateRootHash = request.type2StateData.zkParentStateRootHash.encodeHex(),
6565
keccakParentStateRootHash = previousKeccakStateRootHash,
6666
conflatedExecutionTracesFile = request.tracesResponse.tracesFileName,
6767
tracesEngineVersion = request.tracesResponse.tracesEngineVersion,

coordinator/clients/prover-client/file-based-client/src/test/kotlin/net/consensys/zkevm/coordinator/clients/prover/ExecutionProofRequestDataDecoratorTest.kt

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
package net.consensys.zkevm.coordinator.clients.prover
22

3+
import build.linea.clients.GetZkEVMStateMerkleProofResponse
34
import com.fasterxml.jackson.databind.node.ArrayNode
5+
import net.consensys.ByteArrayExt
46
import net.consensys.encodeHex
57
import net.consensys.zkevm.coordinator.clients.BatchExecutionProofRequestV1
68
import net.consensys.zkevm.coordinator.clients.GenerateTracesResponse
7-
import net.consensys.zkevm.coordinator.clients.GetZkEVMStateMerkleProofResponse
89
import net.consensys.zkevm.coordinator.clients.L2MessageServiceLogsClient
910
import net.consensys.zkevm.domain.RlpBridgeLogsData
1011
import net.consensys.zkevm.encoding.ExecutionPayloadV1Encoder
11-
import org.apache.tuweni.bytes.Bytes32
1212
import org.assertj.core.api.Assertions.assertThat
1313
import org.junit.jupiter.api.BeforeEach
1414
import org.junit.jupiter.api.Test
@@ -52,8 +52,8 @@ class ExecutionProofRequestDataDecoratorTest {
5252
val executionPayload2 = executionPayloadV1(blockNumber = 124, gasLimit = 20_000_000UL)
5353
val type2StateResponse = GetZkEVMStateMerkleProofResponse(
5454
zkStateMerkleProof = ArrayNode(null),
55-
zkParentStateRootHash = Bytes32.random(),
56-
zkEndStateRootHash = Bytes32.random(),
55+
zkParentStateRootHash = ByteArrayExt.random32(),
56+
zkEndStateRootHash = ByteArrayExt.random32(),
5757
zkStateManagerVersion = "2.0.0"
5858
)
5959
val generateTracesResponse = GenerateTracesResponse(
@@ -82,7 +82,7 @@ class ExecutionProofRequestDataDecoratorTest {
8282
val requestDto = requestDatDecorator.invoke(request).get()
8383

8484
assertThat(requestDto.keccakParentStateRootHash).isEqualTo(stateRoot)
85-
assertThat(requestDto.zkParentStateRootHash).isEqualTo(type2StateResponse.zkParentStateRootHash.toHexString())
85+
assertThat(requestDto.zkParentStateRootHash).isEqualTo(type2StateResponse.zkParentStateRootHash.encodeHex())
8686
assertThat(requestDto.conflatedExecutionTracesFile).isEqualTo("123-114-conflated-traces.json")
8787
assertThat(requestDto.tracesEngineVersion).isEqualTo("1.0.0")
8888
assertThat(requestDto.type2StateManagerVersion).isEqualTo("2.0.0")

coordinator/clients/type2-state-manager-client/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ dependencies {
77
implementation project(':jvm-libs:generic:json-rpc')
88
implementation project(':jvm-libs:linea:metrics:micrometer')
99
implementation project(':jvm-libs:generic:extensions:futures')
10+
implementation project(':jvm-libs:linea:clients:linea-state-manager')
1011

1112
implementation "tech.pegasys.teku.internal:bytes:${libs.versions.teku.get()}"
1213
implementation "io.vertx:vertx-core"

coordinator/core/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ dependencies {
99
api project(':jvm-libs:linea:core:domain-models')
1010
api project(':jvm-libs:linea:core:metrics')
1111
api project(':jvm-libs:linea:core:long-running-service')
12+
api project(':jvm-libs:linea:clients:linea-state-manager')
1213
api project(':jvm-libs:linea:core:traces')
1314
api project(':jvm-libs:generic:errors')
1415
api project(':jvm-libs:generic:extensions:kotlin')

coordinator/core/src/main/kotlin/net/consensys/zkevm/coordinator/clients/BatchExecutionProverRequestResponse.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import tech.pegasys.teku.ethereum.executionclient.schema.ExecutionPayloadV1
77
data class BatchExecutionProofRequestV1(
88
val blocks: List<ExecutionPayloadV1>,
99
val tracesResponse: GenerateTracesResponse,
10-
val type2StateData: GetZkEVMStateMerkleProofResponse
10+
val type2StateData: build.linea.clients.GetZkEVMStateMerkleProofResponse
1111
) : BlockInterval {
1212
override val startBlockNumber: ULong
1313
get() = blocks.first().blockNumber.toULong()

coordinator/core/src/main/kotlin/net/consensys/zkevm/ethereum/coordination/conflation/TracesConflationCoordinator.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package net.consensys.zkevm.ethereum.coordination.conflation
22

3+
import build.linea.clients.GetZkEVMStateMerkleProofResponse
34
import net.consensys.linea.BlockNumberAndHash
45
import net.consensys.zkevm.coordinator.clients.GenerateTracesResponse
5-
import net.consensys.zkevm.coordinator.clients.GetZkEVMStateMerkleProofResponse
66
import tech.pegasys.teku.infrastructure.async.SafeFuture
77

88
data class BlocksTracesConflated(

coordinator/core/src/main/kotlin/net/consensys/zkevm/ethereum/coordination/conflation/TracesConflationCoordinatorImpl.kt

Lines changed: 6 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
package net.consensys.zkevm.ethereum.coordination.conflation
22

3+
import build.linea.clients.GetStateMerkleProofRequest
4+
import build.linea.clients.GetZkEVMStateMerkleProofResponse
5+
import build.linea.clients.StateManagerClientV1
6+
import build.linea.domain.BlockInterval
37
import com.github.michaelbull.result.Err
48
import com.github.michaelbull.result.Ok
59
import com.github.michaelbull.result.Result
@@ -9,19 +13,15 @@ import com.github.michaelbull.result.mapBoth
913
import net.consensys.linea.BlockNumberAndHash
1014
import net.consensys.linea.errors.ErrorResponse
1115
import net.consensys.zkevm.coordinator.clients.GenerateTracesResponse
12-
import net.consensys.zkevm.coordinator.clients.GetZkEVMStateMerkleProofResponse
1316
import net.consensys.zkevm.coordinator.clients.TracesConflationClientV1
1417
import net.consensys.zkevm.coordinator.clients.TracesServiceErrorType
15-
import net.consensys.zkevm.coordinator.clients.Type2StateManagerClient
16-
import net.consensys.zkevm.coordinator.clients.Type2StateManagerErrorType
1718
import org.apache.logging.log4j.LogManager
1819
import org.apache.logging.log4j.Logger
1920
import tech.pegasys.teku.infrastructure.async.SafeFuture
20-
import tech.pegasys.teku.infrastructure.unsigned.UInt64
2121

2222
class TracesConflationCoordinatorImpl(
2323
private val tracesConflationClient: TracesConflationClientV1,
24-
private val zkStateClient: Type2StateManagerClient
24+
private val zkStateClient: StateManagerClientV1
2525
) : TracesConflationCoordinator {
2626
private val log: Logger = LogManager.getLogger(this::class.java)
2727
private fun requestConflatedTraces(
@@ -45,22 +45,7 @@ class TracesConflationCoordinatorImpl(
4545
startBlockNumber: ULong,
4646
endBlockNumber: ULong
4747
): SafeFuture<GetZkEVMStateMerkleProofResponse> {
48-
return zkStateClient
49-
.rollupGetZkEVMStateMerkleProof(
50-
UInt64.valueOf(startBlockNumber.toLong()),
51-
UInt64.valueOf(endBlockNumber.toLong())
52-
)
53-
.thenCompose { result:
54-
Result<GetZkEVMStateMerkleProofResponse, ErrorResponse<Type2StateManagerErrorType>>
55-
->
56-
result.mapBoth(
57-
{ SafeFuture.completedFuture(it) },
58-
{
59-
log.error("Type2State manager returned error={}", it)
60-
SafeFuture.failedFuture(it.asException("State manager error: ${it.message}"))
61-
}
62-
)
63-
}
48+
return zkStateClient.makeRequest(GetStateMerkleProofRequest(BlockInterval(startBlockNumber, endBlockNumber)))
6449
}
6550

6651
override fun conflateExecutionTraces(

jvm-libs/linea/clients/linea-state-manager/src/main/kotlin/build/linea/clients/StateManagerClientV1.kt

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,12 @@ enum class StateManagerErrorType : ClientError {
1313
BLOCK_MISSING_IN_CHAIN
1414
}
1515

16-
sealed interface StateManagerRequest
17-
sealed class GetChainHeadRequest() : StateManagerRequest
18-
data class GetStateMerkleProofRequest(
19-
val blockInterval: BlockInterval
20-
) : StateManagerRequest, BlockInterval by blockInterval
16+
sealed interface StateManagerRequest<ResponseType : StateManagerResponse> : ClientRequest<ResponseType>
17+
sealed class GetChainHeadRequest() : StateManagerRequest<GetChainHeadResponse>
18+
19+
data class GetStateMerkleProofRequest(val blockInterval: BlockInterval) :
20+
StateManagerRequest<GetZkEVMStateMerkleProofResponse>,
21+
BlockInterval by blockInterval
2122

2223
sealed interface StateManagerResponse
2324

@@ -57,14 +58,9 @@ data class GetZkEVMStateMerkleProofResponse(
5758
}
5859
}
5960

60-
// Type alias dedicated for each method
61-
typealias StateManagerClientToGetStateMerkleProofV0 =
62-
AsyncClient<GetStateMerkleProofRequest, GetZkEVMStateMerkleProofResponse>
63-
64-
typealias StateManagerClientToGetChainHeadV1 =
65-
AsyncClient<GetChainHeadRequest, ULong>
61+
data class GetChainHeadResponse(val headBlockNumber: ULong) : StateManagerResponse
6662

67-
interface StateManagerClientV1 {
63+
interface StateManagerClientV1 : AsyncClient<StateManagerRequest<*>> {
6864
/**
6965
* Get the head block number of the chain.
7066
* @return GetZkEVMStateMerkleProofResponse
@@ -84,4 +80,15 @@ interface StateManagerClientV1 {
8480
): SafeFuture<Result<GetZkEVMStateMerkleProofResponse, ErrorResponse<StateManagerErrorType>>>
8581

8682
fun rollupGetHeadBlockNumber(): SafeFuture<ULong>
83+
84+
override fun <Response> makeRequest(request: ClientRequest<Response>): SafeFuture<Response> {
85+
@Suppress("UNCHECKED_CAST")
86+
return when (request) {
87+
is GetStateMerkleProofRequest -> rollupGetStateMerkleProof(request.blockInterval) as SafeFuture<Response>
88+
is GetChainHeadRequest -> rollupGetHeadBlockNumber()
89+
.thenApply { GetChainHeadResponse(it) } as SafeFuture<Response>
90+
91+
else -> throw IllegalArgumentException("Unknown request type: $request")
92+
}
93+
}
8794
}

jvm-libs/linea/core/client-interface/src/main/kotlin/build/linea/clients/Client.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,9 @@ interface Client<Request, Response> {
2222
fun makeRequest(request: Request): Response
2323
}
2424

25-
interface AsyncClient<Request, Response> {
26-
fun makeRequest(request: Request): SafeFuture<Response>
25+
interface ClientRequest<Response>
26+
interface AsyncClient<ReqSupperType> {
27+
fun <Response> makeRequest(request: ClientRequest<Response>): SafeFuture<Response>
2728
}
2829

2930
fun <T, E : ClientError> SafeFuture<Result<T, ErrorResponse<E>>>.unwrapResultMonad(): SafeFuture<T> {

0 commit comments

Comments
 (0)