diff --git a/dex-it/src/test/scala/com/wavesplatform/it/sync/orders/OrderDynamicFeeTestSuite.scala b/dex-it/src/test/scala/com/wavesplatform/it/sync/orders/OrderDynamicFeeTestSuite.scala index 91ebba87d0..5676b9f133 100644 --- a/dex-it/src/test/scala/com/wavesplatform/it/sync/orders/OrderDynamicFeeTestSuite.scala +++ b/dex-it/src/test/scala/com/wavesplatform/it/sync/orders/OrderDynamicFeeTestSuite.scala @@ -802,7 +802,7 @@ class OrderDynamicFeeTestSuite extends OrderFeeBaseTestSuite { broadcastAndAwait(mkTransfer(alice, bob, defaultAssetQuantity / 2, eth, 0.005.waves)) - dex1.restartWithNewSuiteConfig(ConfigFactory.parseString("waves.dex.order-fee.-1.mode = percent")) + dex1.restartWithNewSuiteConfig(ConfigFactory.parseString("waves.dex.order-fee.-1.mode = percent").withFallback(dexInitialSuiteConfig)) check() dex1.restartWithNewSuiteConfig(ConfigFactory.parseString("waves.dex.order-fee.-1.mode = fixed").withFallback(dexInitialSuiteConfig)) diff --git a/waves-integration-it/src/test/scala/com/wavesplatform/dex/grpc/integration/clients/BlockchainUpdatesClientTestSuite.scala b/waves-integration-it/src/test/scala/com/wavesplatform/dex/grpc/integration/clients/BlockchainUpdatesClientTestSuite.scala index 5d86677859..72a84e5b8c 100644 --- a/waves-integration-it/src/test/scala/com/wavesplatform/dex/grpc/integration/clients/BlockchainUpdatesClientTestSuite.scala +++ b/waves-integration-it/src/test/scala/com/wavesplatform/dex/grpc/integration/clients/BlockchainUpdatesClientTestSuite.scala @@ -11,11 +11,11 @@ import com.wavesplatform.dex.grpc.integration.clients.domain.portfolio.Implicits import com.wavesplatform.dex.grpc.integration.clients.domain.{TransactionWithChanges, WavesNodeEvent} import com.wavesplatform.dex.grpc.integration.protobuf.PbToDexConversions._ import com.wavesplatform.dex.grpc.integration.settings.GrpcClientSettings +import com.wavesplatform.dex.grpc.integration.tool.RestartableManagedChannel import com.wavesplatform.dex.it.api.HasToxiProxy import com.wavesplatform.dex.it.docker.WavesNodeContainer import com.wavesplatform.dex.it.test.NoStackTraceCancelAfterFailure import im.mak.waves.transactions.Transaction -import io.grpc.ManagedChannel import io.netty.channel.nio.NioEventLoopGroup import io.netty.channel.socket.nio.NioSocketChannel import monix.eval.Task @@ -63,13 +63,15 @@ class BlockchainUpdatesClientTestSuite extends IntegrationSuiteBase with HasToxi noDataTimeout = 5.minutes ) - private lazy val blockchainUpdatesChannel: ManagedChannel = - grpcSettings.toNettyChannelBuilder - .executor((command: Runnable) => grpcExecutor.execute(command)) - .eventLoopGroup(eventLoopGroup) - .channelType(classOf[NioSocketChannel]) - .usePlaintext() - .build + private lazy val blockchainUpdatesChannel: RestartableManagedChannel = + new RestartableManagedChannel(() => + grpcSettings.toNettyChannelBuilder + .executor((command: Runnable) => grpcExecutor.execute(command)) + .eventLoopGroup(eventLoopGroup) + .channelType(classOf[NioSocketChannel]) + .usePlaintext() + .build + ) private lazy val client = new DefaultBlockchainUpdatesClient(eventLoopGroup, blockchainUpdatesChannel, monixScheduler, grpcSettings.noDataTimeout)( diff --git a/waves-integration/src/main/scala/com/wavesplatform/dex/grpc/integration/clients/blockchainupdates/BlockchainUpdatesClient.scala b/waves-integration/src/main/scala/com/wavesplatform/dex/grpc/integration/clients/blockchainupdates/BlockchainUpdatesClient.scala index 3e5a47b893..0db49cbf84 100644 --- a/waves-integration/src/main/scala/com/wavesplatform/dex/grpc/integration/clients/blockchainupdates/BlockchainUpdatesClient.scala +++ b/waves-integration/src/main/scala/com/wavesplatform/dex/grpc/integration/clients/blockchainupdates/BlockchainUpdatesClient.scala @@ -2,12 +2,12 @@ package com.wavesplatform.dex.grpc.integration.clients.blockchainupdates import com.wavesplatform.dex.domain.utils.ScorexLogging import com.wavesplatform.dex.grpc.integration.effect.Implicits.NettyFutureOps -import io.grpc.ManagedChannel +import com.wavesplatform.dex.grpc.integration.tool.RestartableManagedChannel import io.netty.channel.EventLoopGroup import monix.execution.Scheduler import java.util.concurrent.TimeUnit -import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration._ import scala.concurrent.{ExecutionContext, Future} trait BlockchainUpdatesClient { @@ -23,7 +23,7 @@ trait BlockchainUpdatesClient { class DefaultBlockchainUpdatesClient( eventLoopGroup: EventLoopGroup, - channel: ManagedChannel, + channel: RestartableManagedChannel, monixScheduler: Scheduler, noDataTimeout: FiniteDuration )(implicit grpcExecutionContext: ExecutionContext) @@ -34,8 +34,7 @@ class DefaultBlockchainUpdatesClient( override def close(): Future[Unit] = { blockchainEvents.close() - channel.shutdown() - channel.awaitTermination(500, TimeUnit.MILLISECONDS) + channel.shutdown(500.millis) // TODO DEX-998 if (eventLoopGroup.isShuttingDown) Future.successful(()) else eventLoopGroup.shutdownGracefully(0, 500, TimeUnit.MILLISECONDS).asScala.map(_ => ()) diff --git a/waves-integration/src/main/scala/com/wavesplatform/dex/grpc/integration/clients/blockchainupdates/GrpcBlockchainUpdatesControlledStream.scala b/waves-integration/src/main/scala/com/wavesplatform/dex/grpc/integration/clients/blockchainupdates/GrpcBlockchainUpdatesControlledStream.scala index a9354531d6..62c9e2a33b 100644 --- a/waves-integration/src/main/scala/com/wavesplatform/dex/grpc/integration/clients/blockchainupdates/GrpcBlockchainUpdatesControlledStream.scala +++ b/waves-integration/src/main/scala/com/wavesplatform/dex/grpc/integration/clients/blockchainupdates/GrpcBlockchainUpdatesControlledStream.scala @@ -5,12 +5,13 @@ import com.wavesplatform.dex.domain.utils.ScorexLogging import com.wavesplatform.dex.grpc.integration.clients.ControlledStream.SystemEvent import com.wavesplatform.dex.grpc.integration.clients.domain.BlockRef import com.wavesplatform.dex.grpc.integration.protobuf.PbToDexConversions._ +import com.wavesplatform.dex.grpc.integration.tool.RestartableManagedChannel import com.wavesplatform.dex.grpc.observers.IntegrationObserver import com.wavesplatform.events.api.grpc.protobuf.{BlockchainUpdatesApiGrpc, SubscribeEvent, SubscribeRequest} import com.wavesplatform.events.protobuf.BlockchainUpdated.Append.Body import com.wavesplatform.events.protobuf.BlockchainUpdated.Update import io.grpc.stub.ClientCalls -import io.grpc.{CallOptions, ClientCall, Grpc, ManagedChannel} +import io.grpc.{CallOptions, ClientCall, Grpc} import monix.execution.{Cancelable, Scheduler} import monix.reactive.Observable import monix.reactive.subjects.ConcurrentSubject @@ -23,7 +24,7 @@ import scala.concurrent.duration.FiniteDuration From the docs of reactive streams: the grammar must still be respected: (onNext)* (onComplete | onError) On error we just restart the stream, so r receives updates from a new stream. That is why we don't propagate errors to r */ -class GrpcBlockchainUpdatesControlledStream(channel: ManagedChannel, noDataTimeout: FiniteDuration)(implicit scheduler: Scheduler) +class GrpcBlockchainUpdatesControlledStream(channel: RestartableManagedChannel, noDataTimeout: FiniteDuration)(implicit scheduler: Scheduler) extends BlockchainUpdatesControlledStream with ScorexLogging { @@ -39,7 +40,8 @@ class GrpcBlockchainUpdatesControlledStream(channel: ManagedChannel, noDataTimeo require(height >= 1, "We can not get blocks on height <= 0") log.info(s"Connecting to Blockchain events stream, getting blocks from $height") - val call = channel.newCall(BlockchainUpdatesApiGrpc.METHOD_SUBSCRIBE, CallOptions.DEFAULT) + channel.restart() + val call = channel.getChannel.newCall(BlockchainUpdatesApiGrpc.METHOD_SUBSCRIBE, CallOptions.DEFAULT) val observer = new BlockchainUpdatesObserver(call) grpcObserver.getAndSet(observer.some).foreach(_.close()) ClientCalls.asyncServerStreamingCall(call, new SubscribeRequest(height), observer) @@ -119,6 +121,8 @@ class GrpcBlockchainUpdatesControlledStream(channel: ManagedChannel, noDataTimeo if (grpcObserver.get().contains(this)) { log.warn(s"No data for $noDataTimeout, restarting!") stopGrpcObserver() + channel.stop() + internalSystemStream.onNext(SystemEvent.Stopped) } else log.warn("False positive no-data timeout") })) .foreach(_.cancel()) diff --git a/waves-integration/src/main/scala/com/wavesplatform/dex/grpc/integration/clients/combined/CombinedWavesBlockchainClient.scala b/waves-integration/src/main/scala/com/wavesplatform/dex/grpc/integration/clients/combined/CombinedWavesBlockchainClient.scala index 15ab7a83ec..6487eb3d00 100644 --- a/waves-integration/src/main/scala/com/wavesplatform/dex/grpc/integration/clients/combined/CombinedWavesBlockchainClient.scala +++ b/waves-integration/src/main/scala/com/wavesplatform/dex/grpc/integration/clients/combined/CombinedWavesBlockchainClient.scala @@ -28,8 +28,8 @@ import com.wavesplatform.dex.grpc.integration.protobuf.DexToPbConversions._ import com.wavesplatform.dex.grpc.integration.protobuf.PbToDexConversions._ import com.wavesplatform.dex.grpc.integration.services.UtxTransaction import com.wavesplatform.dex.grpc.integration.settings.WavesBlockchainClientSettings +import com.wavesplatform.dex.grpc.integration.tool.RestartableManagedChannel import com.wavesplatform.protobuf.transaction.SignedTransaction -import io.grpc.ManagedChannel import io.netty.channel.nio.NioEventLoopGroup import io.netty.channel.socket.nio.NioSocketChannel import monix.eval.Task @@ -53,7 +53,7 @@ class CombinedWavesBlockchainClient( type Balances = Map[Address, Map[Asset, Long]] type Leases = Map[Address, Long] - @volatile private var blockchainStatus: Status = Starting() + @volatile private var blockchainStatus: Status = Starting() override def status(): Status = blockchainStatus @@ -242,6 +242,7 @@ class CombinedWavesBlockchainClient( override def close(): Future[Unit] = meClient.close().zip(bClient.close()).map(_ => ()) + } object CombinedWavesBlockchainClient extends ScorexLogging { @@ -264,7 +265,7 @@ object CombinedWavesBlockchainClient extends ScorexLogging { val eventLoopGroup = new NioEventLoopGroup log.info(s"Building Matcher Extension gRPC client for server: ${wavesBlockchainClientSettings.grpc.target}") - val matcherExtensionChannel: ManagedChannel = + val matcherExtensionChannel = wavesBlockchainClientSettings.grpc.toNettyChannelBuilder .executor((command: Runnable) => grpcExecutionContext.execute(command)) .eventLoopGroup(eventLoopGroup) @@ -273,13 +274,15 @@ object CombinedWavesBlockchainClient extends ScorexLogging { .build log.info(s"Building Blockchain Updates Extension gRPC client for server: ${wavesBlockchainClientSettings.blockchainUpdatesGrpc.target}") - val blockchainUpdatesChannel: ManagedChannel = - wavesBlockchainClientSettings.blockchainUpdatesGrpc.toNettyChannelBuilder - .executor((command: Runnable) => grpcExecutionContext.execute(command)) - .eventLoopGroup(eventLoopGroup) - .channelType(classOf[NioSocketChannel]) - .usePlaintext() - .build + val blockchainUpdatesChannel = + new RestartableManagedChannel(() => + wavesBlockchainClientSettings.blockchainUpdatesGrpc.toNettyChannelBuilder + .executor((command: Runnable) => grpcExecutionContext.execute(command)) + .eventLoopGroup(eventLoopGroup) + .channelType(classOf[NioSocketChannel]) + .usePlaintext() + .build + ) new CombinedWavesBlockchainClient( wavesBlockchainClientSettings.combinedClientSettings, diff --git a/waves-integration/src/main/scala/com/wavesplatform/dex/grpc/integration/tool/RestartableManagedChannel.scala b/waves-integration/src/main/scala/com/wavesplatform/dex/grpc/integration/tool/RestartableManagedChannel.scala new file mode 100644 index 0000000000..119f17bd66 --- /dev/null +++ b/waves-integration/src/main/scala/com/wavesplatform/dex/grpc/integration/tool/RestartableManagedChannel.scala @@ -0,0 +1,53 @@ +package com.wavesplatform.dex.grpc.integration.tool + +import io.grpc.ManagedChannel + +import java.util.concurrent.TimeUnit +import scala.concurrent.duration.Duration + +final class RestartableManagedChannel(mkManagedChannel: () => ManagedChannel) { + + private var channel: ManagedChannel = _ + private var isClosed: Boolean = false + + def stop(): Unit = synchronized { + checkIsClosed() + if (channel != null) { + channel.shutdown() + channel = null + } + } + + def restart(): Unit = synchronized { + checkIsClosed() + if (channel != null) + channel.shutdown() + channel = mkManagedChannel() + } + + def getChannel: ManagedChannel = synchronized { + checkIsClosed() + if (channel == null) + channel = mkManagedChannel() + channel + } + + def shutdown(awaitTime: Duration): Unit = synchronized { + mkClosed() + if (channel != null) { + channel.shutdown() + channel.awaitTermination(awaitTime.toMillis, TimeUnit.MILLISECONDS) + channel = null + } + } + + private def checkIsClosed(): Unit = + if (isClosed) + throw new RuntimeException("managed channel is closed") + + private def mkClosed(): Unit = { + checkIsClosed() + isClosed = true + } + +} diff --git a/waves-integration/src/test/scala/com/wavesplatform/dex/grpc/integration/tool/RestartableManagedChannelSuite.scala b/waves-integration/src/test/scala/com/wavesplatform/dex/grpc/integration/tool/RestartableManagedChannelSuite.scala new file mode 100644 index 0000000000..ea70097cd3 --- /dev/null +++ b/waves-integration/src/test/scala/com/wavesplatform/dex/grpc/integration/tool/RestartableManagedChannelSuite.scala @@ -0,0 +1,80 @@ +package com.wavesplatform.dex.grpc.integration.tool + +import com.wavesplatform.dex.WavesIntegrationSuiteBase +import io.grpc.ManagedChannel +import org.scalamock.scalatest.MockFactory + +import java.util.concurrent.TimeUnit +import scala.concurrent.duration._ + +class RestartableManagedChannelSuite extends WavesIntegrationSuiteBase with MockFactory { + + "RestartableManagedChannel should" - { + + "getChannel" in { + val channel = mock[ManagedChannel] + val maker = mockFunction[ManagedChannel] + maker.expects().returning(channel).once() + val restartableManagedChannel = new RestartableManagedChannel(maker) + restartableManagedChannel.getChannel shouldBe channel + } + + "shutdown" in testShutdown { (awaitTime, restartableManagedChannel) => + restartableManagedChannel.shutdown(awaitTime) + } + + "not do any ops after shutting down" in testShutdown { (awaitTime, restartableManagedChannel) => + restartableManagedChannel.shutdown(awaitTime) + intercept[RuntimeException](restartableManagedChannel.restart()) + intercept[RuntimeException](restartableManagedChannel.getChannel) + intercept[RuntimeException](restartableManagedChannel.shutdown(awaitTime)) + } + + "restart without triggering getChannel" in { + val channel = mock[ManagedChannel] + val maker = mockFunction[ManagedChannel] + maker.expects().returning(channel).once() + val restartableManagedChannel = new RestartableManagedChannel(maker) + restartableManagedChannel.restart() + restartableManagedChannel.getChannel shouldBe channel + } + + "restart" in { + val channel1 = mock[ManagedChannel] + val channel2 = mock[ManagedChannel] + val maker = mockFunction[ManagedChannel] + maker.expects().returning(channel1).once() + maker.expects().returning(channel2).once() + (channel1.shutdown _).expects().returning(channel1).once() + val restartableManagedChannel = new RestartableManagedChannel(maker) + restartableManagedChannel.getChannel //force channel creation + restartableManagedChannel.restart() + } + + "stop current channel" in { + val channel = mock[ManagedChannel] + (channel.shutdown _).expects().returning(channel).once() + val maker = mockFunction[ManagedChannel] + maker.expects().returning(channel).once() + val restartableManagedChannel = new RestartableManagedChannel(maker) + restartableManagedChannel.getChannel //force channel creation + restartableManagedChannel.stop() + } + } + + private def testShutdown(f: (Duration, RestartableManagedChannel) => Unit): Unit = { + val awaitTime = 10.seconds + val channel = mock[ManagedChannel] + (channel.shutdown _).expects().returning(channel).once() + (channel.awaitTermination(_: Long, _: TimeUnit)) + .expects(awaitTime.toMillis, TimeUnit.MILLISECONDS) + .returning(true) + .once() + val maker = mockFunction[ManagedChannel] + maker.expects().returning(channel).once() + val restartableManagedChannel = new RestartableManagedChannel(maker) + restartableManagedChannel.getChannel //force channel creation + f(awaitTime, restartableManagedChannel) + } + +}