Skip to content

Commit

Permalink
Fix restart ws head (emeraldpay#191)
Browse files Browse the repository at this point in the history
* Fix restart ws head
  • Loading branch information
KirillPamPam authored Apr 3, 2023
1 parent 97fa616 commit ceea848
Show file tree
Hide file tree
Showing 10 changed files with 208 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ open class SchedulersConfig {
return makeScheduler("head-scheduler", 5, monitoringConfig)
}

@Bean
open fun wsConnectionResubscribeScheduler(monitoringConfig: MonitoringConfig): Scheduler {
return makeScheduler("ws-connection-resubscribe-scheduler", 2, monitoringConfig)
}

@Bean
open fun grpcChannelExecutor(monitoringConfig: MonitoringConfig): Executor {
return makePool("grpc-client-channel", 10, monitoringConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ open class ConfiguredUpstreams(
@Qualifier("grpcChannelExecutor")
private val channelExecutor: Executor,
private val chainsConfig: ChainsConfig,
private val grpcTracing: GrpcTracing
private val grpcTracing: GrpcTracing,
private val wsConnectionResubscribeScheduler: Scheduler
) : ApplicationRunner {

private val log = LoggerFactory.getLogger(ConfiguredUpstreams::class.java)
Expand Down Expand Up @@ -400,7 +401,9 @@ open class ConfiguredUpstreams(
val httpFactory = buildHttpFactory(conn, urls)
log.info("Using ${chain.chainName} upstream, at ${urls.joinToString()}")
val connectorFactory =
EthereumConnectorFactory(conn.resolveMode(), wsFactoryApi, httpFactory, forkChoice, blockValidator)
EthereumConnectorFactory(
conn.resolveMode(), wsFactoryApi, httpFactory, forkChoice, blockValidator, wsConnectionResubscribeScheduler
)
if (!connectorFactory.isValid()) {
log.warn("Upstream configuration is invalid (probably no http endpoint)")
return null
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import io.emeraldpay.dshackle.data.BlockContainer
import io.emeraldpay.dshackle.reader.JsonRpcReader
import io.emeraldpay.dshackle.upstream.BlockValidator
import io.emeraldpay.dshackle.upstream.Lifecycle
import io.emeraldpay.dshackle.upstream.WebsocketConnectionStatesHandler
import io.emeraldpay.dshackle.upstream.forkchoice.ForkChoice
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse
Expand All @@ -32,6 +31,8 @@ import io.emeraldpay.etherjar.rpc.json.TransactionRefJson
import reactor.core.Disposable
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.Sinks
import reactor.core.scheduler.Scheduler
import reactor.core.scheduler.Schedulers
import reactor.retry.Repeat
import java.time.Duration
Expand All @@ -41,12 +42,21 @@ class EthereumWsHead(
forkChoice: ForkChoice,
blockValidator: BlockValidator,
private val api: JsonRpcReader,
wsSubscriptions: WsSubscriptions,
private val skipEnhance: Boolean
private val wsSubscriptions: WsSubscriptions,
private val skipEnhance: Boolean,
private val wsConnectionResubscribeScheduler: Scheduler
) : DefaultEthereumHead(upstreamId, forkChoice, blockValidator), Lifecycle {

private var connectionId: String? = null
private var subscribed = false
private var connected = false

private var subscription: Disposable? = null
private val wsConnectionStatesHandler = WebsocketConnectionStatesHandler(wsSubscriptions, this::onNoHeadUpdates)
private val noHeadUpdatesSink = Sinks.many().multicast().directBestEffort<Boolean>()

init {
registerHeadResubscribeFlux()
}

override fun isRunning(): Boolean {
return subscription != null
Expand All @@ -55,6 +65,7 @@ class EthereumWsHead(
override fun start() {
super.start()
this.subscription?.dispose()
this.subscribed = true
val heads = Flux.merge(
// get the current block, not just wait for the next update
getLatestBlock(api),
Expand All @@ -64,12 +75,11 @@ class EthereumWsHead(
}

override fun onNoHeadUpdates() {
log.warn("Restart ws head, upstreamId: $upstreamId")
start()
noHeadUpdatesSink.tryEmitNext(true)
}

fun listenNewHeads(): Flux<BlockContainer> {
return wsConnectionStatesHandler.subscribe("newHeads")
return subscribe()
.map {
Global.objectMapper.readValue(it, BlockJson::class.java) as BlockJson<TransactionRefJson>
}
Expand All @@ -88,6 +98,11 @@ class EthereumWsHead(
Mono.just(BlockContainer.from(block))
}
}
.timeout(Duration.ofSeconds(60), Mono.error(RuntimeException("No response from subscribe to newHeads")))
.onErrorResume {
subscribed = false
Mono.empty()
}
}

fun enhanceRealBlock(block: BlockJson<TransactionRefJson>): Mono<BlockContainer> {
Expand Down Expand Up @@ -118,5 +133,45 @@ class EthereumWsHead(
super.stop()
subscription?.dispose()
subscription = null
noHeadUpdatesSink.tryEmitComplete()
}

private fun subscribe(): Flux<ByteArray> {
return try {
wsSubscriptions.subscribe("newHeads")
.also {
connectionId = it.connectionId
if (!connected) {
connected = true
}
}.data
} catch (e: Exception) {
Flux.error(e)
}
}

private fun registerHeadResubscribeFlux() {
val connectionStates = wsSubscriptions.connectionInfoFlux()
.map {
if (it.connectionId == connectionId && it.connectionState == WsConnection.ConnectionState.DISCONNECTED) {
subscribed = false
connected = false
connectionId = null
} else if (it.connectionState == WsConnection.ConnectionState.CONNECTED) {
connected = true
return@map true
}
return@map false
}

Flux.merge(
noHeadUpdatesSink.asFlux(),
connectionStates,
).subscribeOn(wsConnectionResubscribeScheduler)
.filter { it && !subscribed && connected }
.subscribe {
log.warn("Restart ws head, upstreamId: $upstreamId")
start()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ import io.emeraldpay.dshackle.upstream.ethereum.connectors.EthereumConnectorFact
import io.emeraldpay.dshackle.upstream.ethereum.connectors.EthereumConnectorFactory.ConnectorMode.RPC_REQUESTS_WITH_WS_HEAD
import io.emeraldpay.dshackle.upstream.ethereum.connectors.EthereumConnectorFactory.ConnectorMode.WS_ONLY
import io.emeraldpay.dshackle.upstream.forkchoice.ForkChoice
import reactor.core.scheduler.Scheduler

open class EthereumConnectorFactory(
private val connectorType: ConnectorMode,
private val wsFactory: EthereumWsConnectionPoolFactory?,
private val httpFactory: HttpFactory?,
private val forkChoice: ForkChoice,
private val blockValidator: BlockValidator,
private val wsConnectionResubscribeScheduler: Scheduler
) : ConnectorFactory {

override fun isValid(): Boolean {
Expand Down Expand Up @@ -47,7 +49,9 @@ open class EthereumConnectorFactory(
skipEnhance: Boolean
): EthereumConnector {
if (wsFactory != null && connectorType == WS_ONLY) {
return EthereumWsConnector(wsFactory, upstream, forkChoice, blockValidator, skipEnhance)
return EthereumWsConnector(
wsFactory, upstream, forkChoice, blockValidator, skipEnhance, wsConnectionResubscribeScheduler
)
}
if (httpFactory == null) {
throw java.lang.IllegalArgumentException("Can't create rpc connector if no http factory set")
Expand All @@ -59,7 +63,8 @@ open class EthereumConnectorFactory(
upstream.getId(),
forkChoice,
blockValidator,
skipEnhance
skipEnhance,
wsConnectionResubscribeScheduler
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import io.emeraldpay.dshackle.upstream.ethereum.connectors.EthereumConnectorFact
import io.emeraldpay.dshackle.upstream.forkchoice.AlwaysForkChoice
import io.emeraldpay.dshackle.upstream.forkchoice.ForkChoice
import org.slf4j.LoggerFactory
import reactor.core.scheduler.Scheduler
import java.time.Duration

class EthereumRpcConnector(
Expand All @@ -31,7 +32,8 @@ class EthereumRpcConnector(
id: String,
forkChoice: ForkChoice,
blockValidator: BlockValidator,
skipEnhance: Boolean
skipEnhance: Boolean,
wsConnectionResubscribeScheduler: Scheduler
) : EthereumConnector, CachesEnabled {
private val pool: WsConnectionPool?
private val head: Head
Expand All @@ -57,14 +59,20 @@ class EthereumRpcConnector(
}
RPC_REQUESTS_WITH_MIXED_HEAD -> {
val wsHead =
EthereumWsHead(id, AlwaysForkChoice(), blockValidator, getIngressReader(), WsSubscriptionsImpl(pool!!), skipEnhance)
EthereumWsHead(
id, AlwaysForkChoice(), blockValidator, getIngressReader(),
WsSubscriptionsImpl(pool!!), skipEnhance, wsConnectionResubscribeScheduler
)
// receive all new blocks through WebSockets, but also periodically verify with RPC in case if WS failed
val rpcHead =
EthereumRpcHead(getIngressReader(), AlwaysForkChoice(), id, blockValidator, Duration.ofSeconds(30))
MergedHead(listOf(rpcHead, wsHead), forkChoice, "Merged for $id")
}
RPC_REQUESTS_WITH_WS_HEAD -> {
EthereumWsHead(id, AlwaysForkChoice(), blockValidator, getIngressReader(), WsSubscriptionsImpl(pool!!), skipEnhance)
EthereumWsHead(
id, AlwaysForkChoice(), blockValidator, getIngressReader(),
WsSubscriptionsImpl(pool!!), skipEnhance, wsConnectionResubscribeScheduler
)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ import io.emeraldpay.dshackle.upstream.ethereum.WsSubscriptionsImpl
import io.emeraldpay.dshackle.upstream.ethereum.subscribe.EthereumWsIngressSubscription
import io.emeraldpay.dshackle.upstream.forkchoice.ForkChoice
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcWsClient
import reactor.core.scheduler.Scheduler

class EthereumWsConnector(
wsFactory: EthereumWsConnectionPoolFactory,
upstream: DefaultUpstream,
forkChoice: ForkChoice,
blockValidator: BlockValidator,
skipEnhance: Boolean
skipEnhance: Boolean,
wsConnectionResubscribeScheduler: Scheduler
) : EthereumConnector {
private val pool: WsConnectionPool
private val reader: JsonRpcReader
Expand All @@ -29,7 +31,10 @@ class EthereumWsConnector(
pool = wsFactory.create(upstream)
reader = JsonRpcWsClient(pool)
val wsSubscriptions = WsSubscriptionsImpl(pool)
head = EthereumWsHead(upstream.getId(), forkChoice, blockValidator, reader, wsSubscriptions, skipEnhance)
head = EthereumWsHead(
upstream.getId(), forkChoice, blockValidator, reader,
wsSubscriptions, skipEnhance, wsConnectionResubscribeScheduler
)
subscriptions = EthereumWsIngressSubscription(wsSubscriptions)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.emeraldpay.dshackle.startup

import io.emeraldpay.dshackle.Chain
import brave.Tracing
import brave.grpc.GrpcTracing
import io.emeraldpay.dshackle.Chain
Expand All @@ -12,6 +11,7 @@ import io.emeraldpay.dshackle.quorum.NonEmptyQuorum
import io.emeraldpay.dshackle.upstream.CallTargetsHolder
import io.emeraldpay.dshackle.upstream.calls.ManagedCallMethods
import org.springframework.context.ApplicationEventPublisher
import reactor.core.scheduler.Schedulers
import spock.lang.Specification

import java.util.concurrent.Executors
Expand All @@ -29,7 +29,8 @@ class ConfiguredUpstreamsSpec extends Specification {
Mock(ApplicationEventPublisher),
Executors.newFixedThreadPool(1),
ChainsConfig.default(),
GrpcTracing.create(Tracing.newBuilder().build())
GrpcTracing.create(Tracing.newBuilder().build()),
Schedulers.boundedElastic()
)
def methods = new UpstreamsConfig.Methods(
[
Expand Down Expand Up @@ -58,7 +59,8 @@ class ConfiguredUpstreamsSpec extends Specification {
Mock(ApplicationEventPublisher),
Executors.newFixedThreadPool(1),
ChainsConfig.default(),
GrpcTracing.create(Tracing.newBuilder().build())
GrpcTracing.create(Tracing.newBuilder().build()),
Schedulers.boundedElastic()
)
def methods = new UpstreamsConfig.Methods(
[
Expand Down Expand Up @@ -86,7 +88,8 @@ class ConfiguredUpstreamsSpec extends Specification {
Mock(ApplicationEventPublisher),
Executors.newFixedThreadPool(1),
ChainsConfig.default(),
GrpcTracing.create(Tracing.newBuilder().build())
GrpcTracing.create(Tracing.newBuilder().build()),
Schedulers.boundedElastic()
)
expect:
configurer.getHash(node, src) == expected
Expand All @@ -109,7 +112,8 @@ class ConfiguredUpstreamsSpec extends Specification {
Mock(ApplicationEventPublisher),
Executors.newFixedThreadPool(1),
ChainsConfig.default(),
GrpcTracing.create(Tracing.newBuilder().build())
GrpcTracing.create(Tracing.newBuilder().build()),
Schedulers.boundedElastic()
)
when:
def h1 = configurer.getHash(null, "hohoho")
Expand Down Expand Up @@ -137,7 +141,8 @@ class ConfiguredUpstreamsSpec extends Specification {
Mock(ApplicationEventPublisher),
Executors.newFixedThreadPool(1),
ChainsConfig.default(),
GrpcTracing.create(Tracing.newBuilder().build())
GrpcTracing.create(Tracing.newBuilder().build()),
Schedulers.boundedElastic()
)
def methodsGroup = new UpstreamsConfig.MethodGroups(
["filter"] as Set,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import io.emeraldpay.dshackle.upstream.calls.DefaultEthereumMethods
import io.emeraldpay.dshackle.upstream.ethereum.EthereumRpcUpstream
import io.emeraldpay.dshackle.upstream.ethereum.connectors.EthereumConnectorFactory
import io.emeraldpay.dshackle.upstream.forkchoice.MostWorkForkChoice
import reactor.core.scheduler.Schedulers
import reactor.test.StepVerifier
import spock.lang.Retry
import spock.lang.Specification
Expand All @@ -49,7 +50,10 @@ class FilteredApisSpec extends Specification {
def httpFactory = Mock(HttpFactory) {
create(_, _) >> TestingCommons.api().tap { it.id = "${i++}" }
}
def connectorFactory = new EthereumConnectorFactory(EthereumConnectorFactory.ConnectorMode.RPC_ONLY, null, httpFactory, new MostWorkForkChoice(), BlockValidator.ALWAYS_VALID)
def connectorFactory = new EthereumConnectorFactory(
EthereumConnectorFactory.ConnectorMode.RPC_ONLY, null, httpFactory,
new MostWorkForkChoice(), BlockValidator.ALWAYS_VALID, Schedulers.boundedElastic()
)
new EthereumRpcUpstream(
"test",
(byte)123,
Expand Down
Loading

0 comments on commit ceea848

Please sign in to comment.