From b13017650bc1f63740f772f4fb60ec660155550e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Leszek=20Grucha=C5=82a?= Date: Mon, 16 Jan 2023 15:41:49 +0100 Subject: [PATCH] Migration to ZIO 2 (#14) --- .scalafmt.conf | 2 +- docker/docker-compose.yaml | 2 +- project/Common.scala | 2 +- project/Dependencies.scala | 15 +- project/build.properties | 2 +- project/plugins.sbt | 2 +- src/main/resources/logback.xml | 1 - .../io/kensu/redis_streams_zio/Consumer.scala | 50 ++----- .../io/kensu/redis_streams_zio/Producer.scala | 54 +++---- .../redis_streams_zio/common/Messages.scala | 7 +- .../redis_streams_zio/common/Scheduling.scala | 2 +- .../redis_streams_zio/config/Configs.scala | 7 +- .../logging/KensuLogAnnotation.scala | 12 +- .../redis_streams_zio/redis/RedisClient.scala | 14 +- .../redis/streams/RedisConsumer.scala | 59 ++++---- .../streams/RedisStaleEventsCollector.scala | 30 ++-- .../redis/streams/RedisStream.scala | 64 ++++---- .../notifications/NotificationsConsumer.scala | 46 +++--- .../NotificationsStaleEventsCollector.scala | 19 +-- .../services/producers/EventProducer.scala | 39 +++-- .../redis/PropertyGenerators.scala | 18 ++- .../redis/RedisConsumerSpec.scala | 137 +++++++++--------- .../redis/RedisStaleEventsCollectorSpec.scala | 45 +++--- .../producers/EventProducerSpec.scala | 35 +++-- .../NotificationsEventProducerMock.scala | 21 --- .../mocks/NotificationsRedisStreamMock.scala | 90 ++++++------ 26 files changed, 353 insertions(+), 422 deletions(-) delete mode 100644 src/test/scala/io/kensu/redis_streams_zio/specs/mocks/NotificationsEventProducerMock.scala diff --git a/.scalafmt.conf b/.scalafmt.conf index f132105..51ee7ca 100644 --- a/.scalafmt.conf +++ b/.scalafmt.conf @@ -1,4 +1,4 @@ -version = "3.5.8" +version = "3.6.1" runner.dialect = scala3 encoding = "UTF-8" maxColumn = 120 diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml index a1c74ff..a7d8c00 100644 --- a/docker/docker-compose.yaml +++ b/docker/docker-compose.yaml @@ -2,7 +2,7 @@ version: '3' services: redis: - image: redis:6.2.6-alpine + image: redis:7-alpine container_name: redis-streams-zio-redis restart: unless-stopped volumes: diff --git a/project/Common.scala b/project/Common.scala index a7dc1de..cf7e0e1 100644 --- a/project/Common.scala +++ b/project/Common.scala @@ -27,7 +27,7 @@ object Common { .settings( organization := "io.kensu", name := "redis-streams-zio", - scalaVersion := "3.1.3", + scalaVersion := "3.2.1", version := "1.0.0-SNAPSHOT", scalacOptions ++= commonScalacOptions, Compile / console / scalacOptions --= Seq("-Werror"), diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 6428239..866e987 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -3,23 +3,24 @@ import sbt._ object Dependencies { object Version { - val zio = "1.0.14" - val zioConfig = "2.0.4" + val zio = "2.0.5" + val zioConfig = "3.0.7" } val zio = Seq( "dev.zio" %% "zio" % Version.zio, "dev.zio" %% "zio-streams" % Version.zio, - "dev.zio" %% "zio-logging-slf4j" % "0.5.14", + "dev.zio" %% "zio-logging-slf4j" % "2.1.7", "dev.zio" %% "zio-config-typesafe" % Version.zioConfig, "dev.zio" %% "zio-config-magnolia" % Version.zioConfig, - "dev.zio" %% "zio-test-sbt" % Version.zio % Test + "dev.zio" %% "zio-test-sbt" % Version.zio % Test, + "dev.zio" %% "zio-mock" % "1.0.0-RC9" % Test ) - val redisson = Seq("org.redisson" % "redisson" % "3.17.4") + val redisson = Seq("org.redisson" % "redisson" % "3.19.1") val logback = Seq( - "ch.qos.logback" % "logback-classic" % "1.2.11", - "org.slf4j" % "log4j-over-slf4j" % "1.7.36" + "ch.qos.logback" % "logback-classic" % "1.4.5", + "org.slf4j" % "log4j-over-slf4j" % "2.0.6" ) } diff --git a/project/build.properties b/project/build.properties index d738b85..f344c14 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version = 1.7.1 +sbt.version = 1.8.2 diff --git a/project/plugins.sbt b/project/plugins.sbt index dc308ea..74f9f92 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,2 +1,2 @@ //https://scalameta.org/scalafmt/#sbt-scalafmt -addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.6") +addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.0") diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml index 1d115ea..beee469 100644 --- a/src/main/resources/logback.xml +++ b/src/main/resources/logback.xml @@ -1,7 +1,6 @@ - true [%blue(%date{ISO8601})][%highlight(%-5level)][%magenta(%replace(%logger{10}.%M){'\.\?+|\$*\.\$.+\$+\d*|\$',''})][%thread] - %msg %cyan(%X{correlation_id} %X{client_ip} %marker) %n%rootException diff --git a/src/main/scala/io/kensu/redis_streams_zio/Consumer.scala b/src/main/scala/io/kensu/redis_streams_zio/Consumer.scala index 0fc792d..4af2feb 100644 --- a/src/main/scala/io/kensu/redis_streams_zio/Consumer.scala +++ b/src/main/scala/io/kensu/redis_streams_zio/Consumer.scala @@ -1,40 +1,21 @@ package io.kensu.redis_streams_zio -import io.kensu.redis_streams_zio.config.Configs +import io.kensu.redis_streams_zio.config.{Configs, NotificationsStreamConsumerConfig} import io.kensu.redis_streams_zio.logging.KensuLogAnnotation import io.kensu.redis_streams_zio.redis.RedisClient -import io.kensu.redis_streams_zio.redis.streams.{NotificationsRedisStream, StreamInstance} import io.kensu.redis_streams_zio.redis.streams.notifications.{NotificationsConsumer, NotificationsStaleEventsCollector} -import zio.* -import zio.clock.Clock +import io.kensu.redis_streams_zio.redis.streams.{NotificationsRedisStream, RedisStream, StreamInstance} import zio.config.syntax.* -import zio.duration.* import zio.logging.* -import zio.logging.slf4j.Slf4jLogger +import zio.logging.backend.SLF4J +import zio.{Clock, ZIOAppDefault, *} -object Consumer extends App: - - override def run(args: List[String]): URIO[ZEnv, ExitCode] = - streams.useForever - .provideCustomLayer(liveEnv) - .exitCode +object Consumer extends ZIOAppDefault: private val streams = - ZManaged.makeInterruptible { - for - shutdownHook <- Promise.make[Throwable, Unit] - notificationStreamFiber <- notificationsStream(shutdownHook) - yield (shutdownHook, notificationStreamFiber) - } { (shutdownHook, notificationStreamFiber) => - (for - _ <- log.info("Halting streams") - _ <- shutdownHook.succeed(()) - _ <- shutdownHook.await - _ <- log.info("Shutting down streams... this may take a few seconds") - _ <- notificationStreamFiber.join `race` ZIO.sleep(5.seconds) - _ <- log.info("Streams shut down") - yield ()).ignore - } + ZIO.acquireRelease(Promise.make[Throwable, Unit])(hook => + hook.succeed(()) *> ZIO.logInfo("Shutting down streams... this may take a moment") + ).flatMap(notificationsStream) private def notificationsStream(shutdownHook: Promise[Throwable, Unit]) = for @@ -45,19 +26,20 @@ object Consumer extends App: private val liveEnv = val appConfig = Configs.appConfig - val logging: ULayer[Logging] = Slf4jLogger.makeWithAnnotationsAsMdc( - mdcAnnotations = List(KensuLogAnnotation.CorrelationId), - logFormat = (_, msg) => msg - ) >>> Logging.modifyLogger(_.derive(KensuLogAnnotation.InitialLogContext)) + val logging = Runtime.removeDefaultLoggers >>> SLF4J.slf4j val redisClient = appConfig.narrow(_.redis) >>> RedisClient.live val notificationsStream = val streamInstance = appConfig.narrow(_.redisStreams.consumers).map(hasConsumers => - Has(StreamInstance.Notifications(hasConsumers.get.notifications.streamName)) + ZEnvironment(StreamInstance.Notifications(hasConsumers.get.notifications.streamName)) ) (streamInstance ++ redisClient) >>> NotificationsRedisStream.redisson - val clock = ZLayer.identity[Clock] + ZLayer.make[NotificationsStreamConsumerConfig & RedisStream[StreamInstance.Notifications]]( + logging, + appConfig.narrow(_.redisStreams.consumers.notifications), + notificationsStream + ) - clock ++ logging ++ appConfig.narrow(_.redisStreams.consumers.notifications) ++ notificationsStream + override val run = ZIO.scoped(streams *> ZIO.never).provideLayer(liveEnv) @@ KensuLogAnnotation.InitialLogContext diff --git a/src/main/scala/io/kensu/redis_streams_zio/Producer.scala b/src/main/scala/io/kensu/redis_streams_zio/Producer.scala index 9489ac9..8d14b1a 100644 --- a/src/main/scala/io/kensu/redis_streams_zio/Producer.scala +++ b/src/main/scala/io/kensu/redis_streams_zio/Producer.scala @@ -4,57 +4,47 @@ import io.kensu.redis_streams_zio.config.{Configs, NotificationsStreamProducerCo import io.kensu.redis_streams_zio.logging.KensuLogAnnotation import io.kensu.redis_streams_zio.redis.RedisClient import io.kensu.redis_streams_zio.redis.streams.{NotificationsRedisStream, StreamInstance} -import io.kensu.redis_streams_zio.services.producers.NotificationsEventProducer -import zio.* -import zio.clock.Clock +import io.kensu.redis_streams_zio.services.producers.{EventProducer, NotificationsEventProducer} +import zio.Random.nextString import zio.config.getConfig -import zio.config.syntax.ZIOConfigNarrowOps -import zio.duration.durationInt -import zio.logging.Logging -import zio.logging.slf4j.Slf4jLogger -import zio.random.nextString -import io.kensu.redis_streams_zio.services.producers.EventProducer -import zio.random.Random +import zio.config.syntax.* +import zio.logging.backend.SLF4J +import zio.logging.backend.SLF4J.loggerNameAnnotationKey +import zio.{Clock, Random, ZIOAppDefault, ZIOAspect, *} -object Producer extends App: +object Producer extends ZIOAppDefault: - override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] = - sentNotification - .repeat(Schedule.spaced(5.seconds).jittered) - .provideCustomLayer(liveEnv) - .exitCode - - val sentNotification: ZIO[ - Has[NotificationsStreamProducerConfig] & Random & Has[EventProducer[StreamInstance.Notifications]], - Throwable, - Unit - ] = + private val sentNotification = for config <- getConfig[NotificationsStreamProducerConfig] str <- nextString(10) - _ <- NotificationsEventProducer(_.publish(config.addKey, str)) + _ <- ZIO.serviceWithZIO[EventProducer[StreamInstance.Notifications]](_.publish(config.addKey, str)) yield () private val liveEnv = val appConfig = Configs.appConfig val producerConfig = appConfig.narrow(_.redisStreams.producers.notifications) - val logging: ULayer[Logging] = Slf4jLogger.makeWithAnnotationsAsMdc( - mdcAnnotations = List(KensuLogAnnotation.CorrelationId), - logFormat = (_, msg) => msg - ) >>> Logging.modifyLogger(_.derive(KensuLogAnnotation.InitialLogContext)) + val logging = Runtime.removeDefaultLoggers >>> SLF4J.slf4j val redisClient = appConfig.narrow(_.redis) >>> RedisClient.live - val clock = ZLayer.identity[Clock] - val notificationsStream = val redisStream = val streamInstance = appConfig.narrow(_.redisStreams.producers).map { hasProducers => - Has(StreamInstance.Notifications(hasProducers.get.notifications.streamName)) + ZEnvironment(StreamInstance.Notifications(hasProducers.get.notifications.streamName)) } (streamInstance ++ redisClient) >>> NotificationsRedisStream.redisson - (redisStream ++ clock ++ logging) >>> NotificationsEventProducer.redis + (redisStream ++ logging) >>> NotificationsEventProducer.redis - clock ++ logging ++ producerConfig ++ notificationsStream + ZLayer.make[NotificationsStreamProducerConfig & EventProducer[StreamInstance.Notifications]]( + logging, + producerConfig, + notificationsStream + ) + + override val run = + sentNotification + .repeat(Schedule.spaced(5.seconds).jittered) + .provideLayer(liveEnv) @@ KensuLogAnnotation.InitialLogContext diff --git a/src/main/scala/io/kensu/redis_streams_zio/common/Messages.scala b/src/main/scala/io/kensu/redis_streams_zio/common/Messages.scala index d4fb520..77bdd51 100644 --- a/src/main/scala/io/kensu/redis_streams_zio/common/Messages.scala +++ b/src/main/scala/io/kensu/redis_streams_zio/common/Messages.scala @@ -1,6 +1,5 @@ package io.kensu.redis_streams_zio.common -opaque type CorrelationId = String - -object CorrelationId: - def apply(value: String): CorrelationId = value +final case class CorrelationId(value: String) extends AnyVal { + override def toString: String = value +} diff --git a/src/main/scala/io/kensu/redis_streams_zio/common/Scheduling.scala b/src/main/scala/io/kensu/redis_streams_zio/common/Scheduling.scala index 166a51a..80d271c 100644 --- a/src/main/scala/io/kensu/redis_streams_zio/common/Scheduling.scala +++ b/src/main/scala/io/kensu/redis_streams_zio/common/Scheduling.scala @@ -1,7 +1,7 @@ package io.kensu.redis_streams_zio.common import zio.Schedule -import zio.duration.{Duration, *} +import zio.{Duration, *} object Scheduling: diff --git a/src/main/scala/io/kensu/redis_streams_zio/config/Configs.scala b/src/main/scala/io/kensu/redis_streams_zio/config/Configs.scala index 8883797..74e496c 100644 --- a/src/main/scala/io/kensu/redis_streams_zio/config/Configs.scala +++ b/src/main/scala/io/kensu/redis_streams_zio/config/Configs.scala @@ -5,8 +5,7 @@ import zio.config.ConfigDescriptor.* import zio.config.* import zio.config.magnolia.{descriptor, Descriptor} import zio.config.typesafe.TypesafeConfig -import zio.duration.Duration -import zio.{Has, Layer, UIO} +import zio.* final case class RootConfig( kensu: AppConfig @@ -108,5 +107,5 @@ object Configs: import zio.config.syntax.* import zio.config.typesafe.* - val appConfig: Layer[ReadError[String], Has[AppConfig]] = - read(descriptor[RootConfig].from(TypesafeConfigSource.fromResourcePath)).map(_.kensu).toLayer + val appConfig: Layer[ReadError[String], AppConfig] = + ZLayer.fromZIO(read(descriptor[RootConfig].from(TypesafeConfigSource.fromResourcePath)).map(_.kensu)) diff --git a/src/main/scala/io/kensu/redis_streams_zio/logging/KensuLogAnnotation.scala b/src/main/scala/io/kensu/redis_streams_zio/logging/KensuLogAnnotation.scala index 8500bc8..45af6ef 100644 --- a/src/main/scala/io/kensu/redis_streams_zio/logging/KensuLogAnnotation.scala +++ b/src/main/scala/io/kensu/redis_streams_zio/logging/KensuLogAnnotation.scala @@ -1,18 +1,16 @@ package io.kensu.redis_streams_zio.logging import io.kensu.redis_streams_zio.common.CorrelationId -import zio.logging.LogAnnotation -import zio.logging.LogContext +import zio.logging.{LogAnnotation, LogContext} object KensuLogAnnotation: val AppCorrelationId: CorrelationId = io.kensu.redis_streams_zio.common.CorrelationId("application") val CorrelationId: LogAnnotation[CorrelationId] = LogAnnotation[CorrelationId]( - name = "correlation_id", - initialValue = AppCorrelationId, - combine = (_, r) => r, - render = _.toString + name = "correlation_id", + combine = (_, r) => r, + render = _.value ) - val InitialLogContext: LogContext => LogContext = KensuLogAnnotation.CorrelationId(AppCorrelationId) + val InitialLogContext = KensuLogAnnotation.CorrelationId(AppCorrelationId) diff --git a/src/main/scala/io/kensu/redis_streams_zio/redis/RedisClient.scala b/src/main/scala/io/kensu/redis_streams_zio/redis/RedisClient.scala index e66b0ab..0e75377 100644 --- a/src/main/scala/io/kensu/redis_streams_zio/redis/RedisClient.scala +++ b/src/main/scala/io/kensu/redis_streams_zio/redis/RedisClient.scala @@ -10,11 +10,9 @@ import zio.config.getConfig object RedisClient: - type RedisClient = Has[RedissonClient] - - val live: ZLayer[Has[RedisConfig], Throwable, RedisClient] = - ZLayer.fromManaged { - ZManaged.make( + val live: ZLayer[RedisConfig, Throwable, RedissonClient] = + ZLayer.scoped { + ZIO.acquireRelease( getConfig[RedisConfig].map { config => val redissonConfig = new Config() redissonConfig.setCodec(new ByteArrayCodec()) @@ -24,8 +22,8 @@ object RedisClient: .setPassword(config.password) Redisson.create(redissonConfig); } - )(c => ZIO.effect(c.shutdown()).orDie) + )(c => ZIO.logInfo("Shutting down Redisson") *> ZIO.attempt(c.shutdown()).orDie) } - def getStream[K, V](name: StreamName): ZIO[RedisClient, Nothing, RStream[K, V]] = - ZIO.service[RedissonClient].flatMap(client => UIO(client.getStream[K, V](name.value))) + def getStream[K, V](name: StreamName): ZIO[RedissonClient, Nothing, RStream[K, V]] = + ZIO.serviceWith[RedissonClient](_.getStream[K, V](name.value)) diff --git a/src/main/scala/io/kensu/redis_streams_zio/redis/streams/RedisConsumer.scala b/src/main/scala/io/kensu/redis_streams_zio/redis/streams/RedisConsumer.scala index d2f64cf..0c35036 100644 --- a/src/main/scala/io/kensu/redis_streams_zio/redis/streams/RedisConsumer.scala +++ b/src/main/scala/io/kensu/redis_streams_zio/redis/streams/RedisConsumer.scala @@ -4,40 +4,40 @@ import io.kensu.redis_streams_zio.common.Scheduling import io.kensu.redis_streams_zio.config.StreamConsumerConfig import org.redisson.api.StreamMessageId import zio.* -import zio.clock.* import zio.config.getConfig import zio.logging.* import zio.stream.ZStream import java.time.Instant +import zio.Clock object RedisConsumer: type StreamInput[S <: StreamInstance, C <: StreamConsumerConfig] = - ZStream[Has[RedisStream[S]] & Has[C] & Logging & Clock, Throwable, ReadGroupResult] + ZStream[RedisStream[S] & C, Throwable, ReadGroupResult] type StreamOutput[R, S <: StreamInstance, C <: StreamConsumerConfig] = - ZStream[R & Has[RedisStream[S]] & Has[C] & Logging & Clock, Throwable, Option[StreamMessageId]] + ZStream[R & RedisStream[S] & C, Throwable, Option[StreamMessageId]] def executeFor[R, S <: StreamInstance, C <: StreamConsumerConfig]( shutdownHook: Promise[Throwable, Unit], eventsProcessor: StreamInput[S, C] => StreamOutput[R, S, C], repeatStrategy: Schedule[R, Any, Unit] = Schedule.forever.unit - )(using Tag[RedisStream[S]], Tag[C]): ZIO[R & Has[RedisStream[S]] & Has[C] & Logging & Clock, Throwable, Long] = + )(using Tag[RedisStream[S]], Tag[C]): ZIO[R & RedisStream[S] & C, Throwable, Long] = ZIO.service[C].flatMap { config => - def setupStream(status: RefM[StreamSourceStatus]) = + def setupStream(status: Ref.Synchronized[StreamSourceStatus]) = ZStream - .fromEffect(getEvents[Nothing, S, C](status)) + .fromZIO(getEvents[Nothing, S, C](status)) .flattenChunks .groupByKey(_.data.isEmpty) { case (true, stream) => stream - .tap(r => log.debug(s"Event ${r.messageId} has no value, will skip processing and acknowledge")) + .tap(r => ZIO.logDebug(s"Event ${r.messageId} has no value, will skip processing and acknowledge")) .map(r => Some(r.messageId)) case (false, stream) => - stream.via(eventsProcessor) + stream.viaFunction(eventsProcessor) } - .mapM(acknowledge[S, C]) + .mapZIO(acknowledge[S, C]) .repeat(repeatStrategy) .haltWhen(shutdownHook) @@ -46,16 +46,16 @@ object RedisConsumer: _ <- assureStreamGroup result <- setupStream(streamStatus).runSum yield result) - .tapCause(t => log.error(s"Failed processing ${config.streamName} stream, will be retried", t)) + .tapErrorCause(t => ZIO.logErrorCause(s"Failed processing ${config.streamName} stream, will be retried", t)) .retry(Scheduling.exponentialBackoff(config.retry.min, config.retry.max, config.retry.factor)) } private val initialStreamStatus = - clock.instant - .flatMap(t => - RefM.make( + Clock.instant + .flatMap(now => + Ref.Synchronized.make( StreamSourceStatus( - lastPendingAttempt = t, + lastPendingAttempt = now, keepPending = true, checkedMessages = Set.empty ) @@ -69,17 +69,17 @@ object RedisConsumer: getConfig[C].flatMap { config => val groupName = config.groupName for - _ <- log.info(s"Looking for Redis group $groupName") + _ <- ZIO.logInfo(s"Looking for Redis group $groupName") exists <- RedisStream.listGroups[S].map(_.exists(_.getName == groupName.value)) - res <- if exists then log.info(s"Redis consumer group $groupName already created") + res <- if exists then ZIO.logInfo(s"Redis consumer group $groupName already created") else - log.info(s"Creating Redis consumer group $groupName") *> + ZIO.logInfo(s"Creating Redis consumer group $groupName") *> RedisStream.createGroup[S](groupName, CreateGroupStrategy.Newest) yield res } private def getEvents[R, S <: StreamInstance, C <: StreamConsumerConfig]( - streamStatus: RefM[StreamSourceStatus] + streamStatus: Ref.Synchronized[StreamSourceStatus] )(using Tag[RedisStream[S]], Tag[C]) = getConfig[C].flatMap { config => val group = config.groupName @@ -91,27 +91,27 @@ object RedisConsumer: else "new" -> ListGroupStrategy.New val commonLogMsg = s"$logMsgType events for group $group and consumer $consumer" - log.debug(s"Attempt to check $commonLogMsg") *> + ZIO.logDebug(s"Attempt to check $commonLogMsg") *> RedisStream .readGroup[S](group, consumer, 10, config.readTimeout, msgType) .map { events => if checkPending then events.filterNot(e => checkedMessages.contains(e.messageId)) else events } - .tap(result => log.info(s"Got ${result.size} $commonLogMsg").when(result.nonEmpty)) - .tapCause(c => log.error(s"Failed to consume $commonLogMsg", c)) + .tap(result => ZIO.logInfo(s"Got ${result.size} $commonLogMsg").when(result.nonEmpty)) + .tapErrorCause(c => ZIO.logErrorCause(s"Failed to consume $commonLogMsg", c)) def shouldCheckPending(status: StreamSourceStatus, currentInstant: Instant) = status.keepPending || status.lastPendingAttempt .plusMillis(config.checkPendingEvery.toMillis) .isBefore(currentInstant) - streamStatus.modify { oldStatus => - for - now <- clock.instant + streamStatus.modifyZIO { oldStatus => + for { + now <- Clock.instant checkPending = shouldCheckPending(oldStatus, now) events <- loadEvents(checkPending, oldStatus.checkedMessages) - yield + } yield { val newStatus = if checkPending then StreamSourceStatus( @@ -121,6 +121,7 @@ object RedisConsumer: ) else oldStatus.copy(keepPending = false, checkedMessages = Set.empty) events -> newStatus + } } } @@ -132,14 +133,14 @@ object RedisConsumer: val commonLogMsg = s"for group ${config.groupName} and consumer ${config.consumerName}" msgId match case Some(redisId) => - log.debug(s"Attempt to acknowledge Redis event $redisId $commonLogMsg") *> + ZIO.logDebug(s"Attempt to acknowledge Redis event $redisId $commonLogMsg") *> RedisStream .ack[S](config.groupName, NonEmptyChunk.single(redisId)) .tapBoth( - t => log.throwable(s"Failed to acknowledge Redis event $redisId $commonLogMsg", t), - _ => log.info(s"Successfully acknowledged Redis event $redisId $commonLogMsg") + t => ZIO.logErrorCause(s"Failed to acknowledge Redis event $redisId $commonLogMsg", Cause.fail(t)), + _ => ZIO.logInfo(s"Successfully acknowledged Redis event $redisId $commonLogMsg") ) - case None => UIO(0L) + case None => ZIO.succeed(0L) } private[streams] case class StreamSourceStatus( diff --git a/src/main/scala/io/kensu/redis_streams_zio/redis/streams/RedisStaleEventsCollector.scala b/src/main/scala/io/kensu/redis_streams_zio/redis/streams/RedisStaleEventsCollector.scala index 5745a8a..1b8f469 100644 --- a/src/main/scala/io/kensu/redis_streams_zio/redis/streams/RedisStaleEventsCollector.scala +++ b/src/main/scala/io/kensu/redis_streams_zio/redis/streams/RedisStaleEventsCollector.scala @@ -4,7 +4,7 @@ import io.kensu.redis_streams_zio.common.Scheduling import io.kensu.redis_streams_zio.config.StreamConsumerConfig import org.redisson.api.StreamMessageId import zio.* -import zio.clock.Clock +import zio.Clock import zio.config.getConfig import zio.logging.* @@ -19,7 +19,7 @@ object RedisStaleEventsCollector: )( using Tag[RedisStream[S]], Tag[C] - ): ZIO[Has[RedisStream[S]] & Has[C] & Logging & Clock, Throwable, Long] = + ): ZIO[RedisStream[S] & C, Throwable, Long] = getConfig[C].flatMap { config => getPendingEvents .repeat( @@ -27,7 +27,9 @@ object RedisStaleEventsCollector: .getOrElse(Schedule.fixed(config.claiming.repeatEvery)) *> Schedule.collectAll ) .delay(config.claiming.initialDelay) - .tapCause(t => log.error(s"Failed claiming process for ${config.streamName} stream, will be retried", t)) + .tapErrorCause(t => + ZIO.logErrorCause(s"Failed claiming process for ${config.streamName} stream, will be retried", t) + ) .retry(Scheduling.exponentialBackoff(config.retry.min, config.retry.max, config.retry.factor)) .map(_.sum) } @@ -39,7 +41,7 @@ object RedisStaleEventsCollector: getConfig[C].flatMap { config => val group = config.groupName val consumer = config.consumerName - log.debug(s"Listing pending messages for group $group and consumer $consumer") *> + ZIO.logDebug(s"Listing pending messages for group $group and consumer $consumer") *> RedisStream.listPending[S](group, 100).flatMap { pendingEntries => val conf = config.claiming val (deliveriesExceededMessages, rest) = @@ -64,14 +66,14 @@ object RedisStaleEventsCollector: val batchSize = messageIds.size val commonLogMsg = s"batch of $batchSize messages for group $group" NonEmptyChunk.fromChunk(messageIds) match - case None => UIO(0L) + case None => ZIO.succeed(0L) case Some(ids) => - log.debug(s"Attempt to acknowledge $commonLogMsg") *> + ZIO.logDebug(s"Attempt to acknowledge $commonLogMsg") *> RedisStream .ack[S](group, ids) .tapBoth( - t => log.throwable(s"Failed to acknowledge $commonLogMsg", t), - _ => log.info(s"Successfully acknowledged $commonLogMsg") + t => ZIO.logErrorCause(s"Failed to acknowledge $commonLogMsg", Cause.fail(t)), + _ => ZIO.logInfo(s"Successfully acknowledged $commonLogMsg") ) } @@ -84,14 +86,18 @@ object RedisStaleEventsCollector: val batchSize = messageIds.size val commonLogMsg = s"for group $group to consumer $consumer" NonEmptyChunk.fromChunk(messageIds) match - case None => UIO(0L) + case None => ZIO.succeed(0L) case Some(ids) => - log.debug(s"Attempt to claim batch of $batchSize messages $commonLogMsg $ids") *> + ZIO.logDebug(s"Attempt to claim batch of $batchSize messages $commonLogMsg $ids") *> RedisStream .fastClaim[S](group, consumer, config.claiming.maxIdleTime, ids) .map(_.size.toLong) .tapBoth( - t => log.throwable(s"Failed to claim $commonLogMsg (maybe an attempt to claim the same resource)", t), - size => log.info(s"Successfully claimed batch of $size messages $commonLogMsg") + t => + ZIO.logErrorCause( + s"Failed to claim $commonLogMsg (maybe an attempt to claim the same resource)", + Cause.fail(t) + ), + size => ZIO.logInfo(s"Successfully claimed batch of $size messages $commonLogMsg") ) } diff --git a/src/main/scala/io/kensu/redis_streams_zio/redis/streams/RedisStream.scala b/src/main/scala/io/kensu/redis_streams_zio/redis/streams/RedisStream.scala index 66e784f..9e378b2 100644 --- a/src/main/scala/io/kensu/redis_streams_zio/redis/streams/RedisStream.scala +++ b/src/main/scala/io/kensu/redis_streams_zio/redis/streams/RedisStream.scala @@ -3,12 +3,11 @@ package io.kensu.redis_streams_zio.redis.streams import java.util.concurrent.TimeUnit import io.kensu.redis_streams_zio.config.{StreamConsumerName, StreamGroupName, StreamKey} -import io.kensu.redis_streams_zio.redis.RedisClient.RedisClient import org.redisson.api.{PendingEntry, RedissonClient, StreamGroup, StreamMessageId} import org.redisson.api.stream.StreamAddArgs import org.redisson.client.RedisException import zio.* -import zio.duration.Duration +import zio.Duration import scala.jdk.CollectionConverters.* @@ -62,17 +61,17 @@ trait RedisStream[S <: StreamInstance]: def add(key: StreamKey, payload: Chunk[Byte]): Task[StreamMessageId] -private[streams] final class RedissonRedisStream[S <: StreamInstance]( +final case class RedissonRedisStream[S <: StreamInstance]( instance: S, redisson: RedissonClient ) extends RedisStream[S]: private val redissonStream = redisson.getStream[Array[Byte], Array[Byte]](instance.name.value) - override val streamInstance: UIO[StreamInstance] = UIO(instance) + override val streamInstance: UIO[StreamInstance] = ZIO.succeed(instance) override val listGroups: Task[Chunk[StreamGroup]] = - Task + ZIO .fromCompletionStage(redissonStream.listGroupsAsync()) .map(l => Chunk.fromIterable(l.asScala)) .catchSome { @@ -84,7 +83,7 @@ private[streams] final class RedissonRedisStream[S <: StreamInstance]( strategy match case CreateGroupStrategy.Newest => StreamMessageId.NEWEST case CreateGroupStrategy.All => StreamMessageId.ALL - Task.fromCompletionStage(redissonStream.createGroupAsync(groupName.value, redisStrategy)).unit + ZIO.fromCompletionStage(redissonStream.createGroupAsync(groupName.value, redisStrategy)).unit override def readGroup( groupName: StreamGroupName, @@ -97,7 +96,7 @@ private[streams] final class RedissonRedisStream[S <: StreamInstance]( strategy match case ListGroupStrategy.New => StreamMessageId.NEVER_DELIVERED case ListGroupStrategy.Pending => StreamMessageId.ALL - Task + ZIO .fromCompletionStage( redissonStream .readGroupAsync( @@ -109,7 +108,7 @@ private[streams] final class RedissonRedisStream[S <: StreamInstance]( redisStrategy ) ).flatMap { messages => - ZIO.effect { + ZIO.attempt { if messages == null then Chunk.empty else Chunk.fromIterable(messages.asScala).map { (msgId, m) => @@ -122,7 +121,7 @@ private[streams] final class RedissonRedisStream[S <: StreamInstance]( } override def ack(groupName: StreamGroupName, ids: NonEmptyChunk[StreamMessageId]): Task[Long] = - Task.fromCompletionStage(redissonStream.ackAsync(groupName.value, ids*)).map(_.longValue()) + ZIO.fromCompletionStage(redissonStream.ackAsync(groupName.value, ids*)).map(_.longValue()) override def fastClaim( groupName: StreamGroupName, @@ -130,7 +129,7 @@ private[streams] final class RedissonRedisStream[S <: StreamInstance]( maxIdleTime: Duration, ids: NonEmptyChunk[StreamMessageId] ): Task[Chunk[StreamMessageId]] = - Task + ZIO .fromCompletionStage( redissonStream .fastClaimAsync( @@ -144,28 +143,28 @@ private[streams] final class RedissonRedisStream[S <: StreamInstance]( .map(l => Chunk.fromIterable(l.asScala)) override def listPending(groupName: StreamGroupName, count: Int): Task[Chunk[PendingEntry]] = - Task + ZIO .fromCompletionStage( redissonStream.listPendingAsync(groupName.value, StreamMessageId.MIN, StreamMessageId.MAX, count) ) .map(l => Chunk.fromIterable(l.asScala)) override def add(key: StreamKey, payload: Chunk[Byte]): Task[StreamMessageId] = - Task.fromCompletionStage(redissonStream.addAsync(StreamAddArgs.entry(key.value.getBytes("UTF-8"), payload.toArray))) + ZIO.fromCompletionStage(redissonStream.addAsync(StreamAddArgs.entry(key.value.getBytes("UTF-8"), payload.toArray))) object RedisStream: - def streamInstance[S <: StreamInstance](using Tag[RedisStream[S]]): RIO[Has[RedisStream[S]], StreamInstance] = - ZIO.serviceWith[RedisStream[S]](_.streamInstance) + def streamInstance[S <: StreamInstance](using Tag[RedisStream[S]]): RIO[RedisStream[S], StreamInstance] = + ZIO.serviceWithZIO[RedisStream[S]](_.streamInstance) - def listGroups[S <: StreamInstance](using Tag[RedisStream[S]]): RIO[Has[RedisStream[S]], Chunk[StreamGroup]] = - ZIO.serviceWith[RedisStream[S]](_.listGroups) + def listGroups[S <: StreamInstance](using Tag[RedisStream[S]]): RIO[RedisStream[S], Chunk[StreamGroup]] = + ZIO.serviceWithZIO[RedisStream[S]](_.listGroups) def createGroup[S <: StreamInstance]( groupName: StreamGroupName, strategy: CreateGroupStrategy - )(using Tag[RedisStream[S]]): RIO[Has[RedisStream[S]], Unit] = - ZIO.serviceWith[RedisStream[S]](_.createGroup(groupName, strategy)) + )(using Tag[RedisStream[S]]): RIO[RedisStream[S], Unit] = + ZIO.serviceWithZIO[RedisStream[S]](_.createGroup(groupName, strategy)) def readGroup[S <: StreamInstance]( groupName: StreamGroupName, @@ -173,39 +172,38 @@ object RedisStream: count: Int, timeout: Duration, strategy: ListGroupStrategy - )(using Tag[RedisStream[S]]): RIO[Has[RedisStream[S]], Chunk[ReadGroupResult]] = - ZIO.serviceWith[RedisStream[S]](_.readGroup(groupName, consumerName, count, timeout, strategy)) + )(using Tag[RedisStream[S]]): RIO[RedisStream[S], Chunk[ReadGroupResult]] = + ZIO.serviceWithZIO[RedisStream[S]](_.readGroup(groupName, consumerName, count, timeout, strategy)) def ack[S <: StreamInstance]( groupName: StreamGroupName, ids: NonEmptyChunk[StreamMessageId] - )(using Tag[RedisStream[S]]): RIO[Has[RedisStream[S]], Long] = - ZIO.serviceWith[RedisStream[S]](_.ack(groupName, ids)) + )(using Tag[RedisStream[S]]): RIO[RedisStream[S], Long] = + ZIO.serviceWithZIO[RedisStream[S]](_.ack(groupName, ids)) def fastClaim[S <: StreamInstance]( groupName: StreamGroupName, consumerName: StreamConsumerName, maxIdleTime: Duration, ids: NonEmptyChunk[StreamMessageId] - )(using Tag[RedisStream[S]]): RIO[Has[RedisStream[S]], Chunk[StreamMessageId]] = - ZIO.serviceWith[RedisStream[S]](_.fastClaim(groupName, consumerName, maxIdleTime, ids)) + )(using Tag[RedisStream[S]]): RIO[RedisStream[S], Chunk[StreamMessageId]] = + ZIO.serviceWithZIO[RedisStream[S]](_.fastClaim(groupName, consumerName, maxIdleTime, ids)) def listPending[S <: StreamInstance]( groupName: StreamGroupName, count: Int - )(using Tag[RedisStream[S]]): RIO[Has[RedisStream[S]], Chunk[PendingEntry]] = - ZIO.serviceWith[RedisStream[S]](_.listPending(groupName, count)) + )(using Tag[RedisStream[S]]): RIO[RedisStream[S], Chunk[PendingEntry]] = + ZIO.serviceWithZIO[RedisStream[S]](_.listPending(groupName, count)) def add[S <: StreamInstance](key: StreamKey, payload: Chunk[Byte])( using Tag[RedisStream[S]] - ): RIO[Has[RedisStream[S]], StreamMessageId] = - ZIO.serviceWith[RedisStream[S]](_.add(key, payload)) + ): RIO[RedisStream[S], StreamMessageId] = + ZIO.serviceWithZIO[RedisStream[S]](_.add(key, payload)) /** An additional, stream instance predefined definition for easier API usage and future refactoring. */ -object NotificationsRedisStream extends Accessible[RedisStream[StreamInstance.Notifications]]: +object NotificationsRedisStream: val redisson: URLayer[ - Has[StreamInstance.Notifications] & RedisClient, - Has[RedisStream[StreamInstance.Notifications]] - ] = - (new RedissonRedisStream[StreamInstance.Notifications](_, _)).toLayer + StreamInstance.Notifications & RedissonClient, + RedisStream[StreamInstance.Notifications] + ] = ZLayer.fromFunction(RedissonRedisStream[StreamInstance.Notifications].apply) diff --git a/src/main/scala/io/kensu/redis_streams_zio/redis/streams/notifications/NotificationsConsumer.scala b/src/main/scala/io/kensu/redis_streams_zio/redis/streams/notifications/NotificationsConsumer.scala index 220775a..bcae14f 100644 --- a/src/main/scala/io/kensu/redis_streams_zio/redis/streams/notifications/NotificationsConsumer.scala +++ b/src/main/scala/io/kensu/redis_streams_zio/redis/streams/notifications/NotificationsConsumer.scala @@ -2,33 +2,24 @@ package io.kensu.redis_streams_zio.redis.streams.notifications import io.kensu.redis_streams_zio.common.RetryableStreamError import io.kensu.redis_streams_zio.config.NotificationsStreamConsumerConfig +import io.kensu.redis_streams_zio.redis.streams.* import io.kensu.redis_streams_zio.redis.streams.dto.{Event, IncorrectEvent, NotificationAddedEvent} -import io.kensu.redis_streams_zio.redis.streams.{ReadGroupData, ReadGroupResult, RedisConsumer, StreamInstance} import zio.* import zio.config.getConfig -import zio.logging.LogAnnotation.Name import zio.logging.* -import io.kensu.redis_streams_zio.redis.streams.RedisStream -import zio.clock.Clock +import zio.logging.backend.SLF4J object NotificationsConsumer: def run(shutdownHook: Promise[Throwable, Unit]): ZIO[ - Logging & Has[NotificationsStreamConsumerConfig] & - Has[RedisStream[StreamInstance.Notifications]] & Has[NotificationsStreamConsumerConfig] & Logging & Clock, + RedisStream[StreamInstance.Notifications] & NotificationsStreamConsumerConfig, Throwable, Long ] = - log.locally(Name(List(getClass.getName))) { - RedisConsumer.executeFor[ - Has[NotificationsStreamConsumerConfig], - StreamInstance.Notifications, - NotificationsStreamConsumerConfig - ]( - shutdownHook = shutdownHook, - eventsProcessor = _.mapM(eventParser).flattenChunks.mapMPar(4)(eventProcessor) - ) - } + RedisConsumer.executeFor[Any, StreamInstance.Notifications, NotificationsStreamConsumerConfig]( + shutdownHook = shutdownHook, + eventsProcessor = _.mapZIO(eventParser).flattenChunks.mapZIOPar(4)(eventProcessor) + ) @@ SLF4J.loggerName(getClass.getName) private def eventParser(rawResult: ReadGroupResult) = getConfig[NotificationsStreamConsumerConfig].flatMap { config => @@ -39,38 +30,35 @@ object NotificationsConsumer: case ReadGroupData(key, value) => key match case config.addKey => - log.info(s"Parsing add event $msgId") *> - ZIO.effect(NotificationAddedEvent(msgId, new String(value.toArray, "UTF-8"))) + ZIO.logInfo(s"Parsing add event $msgId") *> + ZIO.attempt(NotificationAddedEvent(msgId, new String(value.toArray, "UTF-8"))) case _ => - log.info(s"Received unsupported stream key $key for event $msgId") *> - ZIO.effectTotal(IncorrectEvent(msgId)) + ZIO.logInfo(s"Received unsupported stream key $key for event $msgId") *> + ZIO.succeed(IncorrectEvent(msgId)) } .catchAllCause(ex => - log - .error(s"Failed to deserialize event $msgId", ex) + ZIO.logErrorCause(s"Failed to deserialize event $msgId", ex) .as(Chunk(IncorrectEvent(msgId))) ) } private def eventProcessor(event: Event) = val id = event.streamMessageId - log.debug(s"Processing event $event") *> + ZIO.logDebug(s"Processing event $event") *> additionalWork(event) .as(id) .asSome .catchAll { case RetryableStreamError => - log - .warn(s"StreamMessageId $id was not processed successfully, scheduled for pending") + ZIO.logWarning(s"StreamMessageId $id was not processed successfully, scheduled for pending") .as(None) case t => - log - .throwable(s"StreamMessageId $id was not processed successfully and can't be retried", t) + ZIO.logErrorCause(s"StreamMessageId $id was not processed successfully and can't be retried", Cause.fail(t)) .as(id) .asSome } private def additionalWork(event: Event) = event match - case IncorrectEvent(msgId) => Task(s"Nothing to do for event $msgId") + case IncorrectEvent(msgId) => ZIO.attempt(s"Nothing to do for event $msgId") case NotificationAddedEvent(msgId, payload) => - Task.effect(s"Effectfully processed add notification event $msgId with data $payload") + ZIO.attempt(s"Effectfully processed add notification event $msgId with data $payload") diff --git a/src/main/scala/io/kensu/redis_streams_zio/redis/streams/notifications/NotificationsStaleEventsCollector.scala b/src/main/scala/io/kensu/redis_streams_zio/redis/streams/notifications/NotificationsStaleEventsCollector.scala index 3dae578..ce8eacc 100644 --- a/src/main/scala/io/kensu/redis_streams_zio/redis/streams/notifications/NotificationsStaleEventsCollector.scala +++ b/src/main/scala/io/kensu/redis_streams_zio/redis/streams/notifications/NotificationsStaleEventsCollector.scala @@ -1,21 +1,18 @@ package io.kensu.redis_streams_zio.redis.streams.notifications import io.kensu.redis_streams_zio.config.NotificationsStreamConsumerConfig -import io.kensu.redis_streams_zio.redis.streams.{RedisStaleEventsCollector, StreamInstance} -import zio.logging.LogAnnotation.Name -import zio.logging.log -import io.kensu.redis_streams_zio.redis.streams.RedisStream -import zio.{Has, ZIO} -import zio.clock.Clock -import zio.logging.Logging +import io.kensu.redis_streams_zio.redis.streams.{RedisStaleEventsCollector, RedisStream, StreamInstance} +import zio.{Clock, ZIO} +import zio.logging.backend.SLF4J object NotificationsStaleEventsCollector: def run(): ZIO[ - Logging & Has[RedisStream[StreamInstance.Notifications]] & Has[NotificationsStreamConsumerConfig] & Logging & Clock, + RedisStream[StreamInstance.Notifications] & NotificationsStreamConsumerConfig, Throwable, Long ] = - log.locally(Name(List(getClass.getName))) { - RedisStaleEventsCollector.executeFor[StreamInstance.Notifications, NotificationsStreamConsumerConfig]() - } + RedisStaleEventsCollector.executeFor[ + StreamInstance.Notifications, + NotificationsStreamConsumerConfig + ]() @@ SLF4J.loggerName(getClass.getName) diff --git a/src/main/scala/io/kensu/redis_streams_zio/services/producers/EventProducer.scala b/src/main/scala/io/kensu/redis_streams_zio/services/producers/EventProducer.scala index 071d812..2b2dfa8 100644 --- a/src/main/scala/io/kensu/redis_streams_zio/services/producers/EventProducer.scala +++ b/src/main/scala/io/kensu/redis_streams_zio/services/producers/EventProducer.scala @@ -5,9 +5,7 @@ import io.kensu.redis_streams_zio.redis.streams.{RedisStream, StreamInstance} import io.kensu.redis_streams_zio.redis.streams.NotificationsRedisStream import zio.* import zio.Schedule.Decision -import zio.clock.Clock -import zio.duration.* -import zio.logging.{Logger, Logging} +import zio.Clock trait EventSerializable[E]: def serialize(e: E): Array[Byte] @@ -42,42 +40,41 @@ trait EventProducer[S <: StreamInstance]: event: E ): Task[PublishedEventId] -final case class RedisEventProducer[S <: StreamInstance: Tag]( - stream: RedisStream[S], - clock: Clock.Service, - log: Logger[String] -) extends EventProducer[S]: - - private val env = Has(clock) +final case class RedisEventProducer[S <: StreamInstance: Tag](stream: RedisStream[S]) extends EventProducer[S]: override def publish[E: EventSerializable: Tag](key: StreamKey, event: E): Task[PublishedEventId] = stream.streamInstance.map(_.name).flatMap { streamName => val send = - log.debug(s"Producing event to $streamName -> $key") *> + ZIO.logDebug(s"Producing event to $streamName -> $key") *> stream .add(key, Chunk.fromArray(EventSerializable[E].serialize(event))) .map(redisId => PublishedEventId(redisId.toString)) .tapBoth( - ex => log.throwable(s"Failed to produce an event to $streamName -> $key", ex), - msgId => log.info(s"Successfully produced an event to $streamName -> $key. StreamMessageId: $msgId") + ex => ZIO.logErrorCause(s"Failed to produce an event to $streamName -> $key", Cause.fail(ex)), + msgId => ZIO.logInfo(s"Successfully produced an event to $streamName -> $key. StreamMessageId: $msgId") ) val retryPolicy = Schedule.exponential(3.seconds) *> Schedule .recurs(3) - .onDecision { - case Decision.Done(_) => log.warn(s"An event is done retrying publishing") - case Decision.Continue(attempt, _, _) => log.info(s"An event will be retried #${attempt + 1}") + .onDecision { (attempt, _, decision) => + decision match + case Decision.Done => ZIO.logWarning(s"An event is done retrying publishing").when(attempt > 0) + case Decision.Continue(_) => ZIO.logInfo(s"An event will be retried #${attempt + 1}") } - send.retry(retryPolicy).provide(env) + send.retry(retryPolicy) } /** An additional, stream instance predefined definition for easier API usage and future refactoring. */ -object NotificationsEventProducer extends Accessible[EventProducer[StreamInstance.Notifications]]: +object NotificationsEventProducer: val redis: URLayer[ - Has[RedisStream[StreamInstance.Notifications]] & Clock & Logging, - Has[EventProducer[StreamInstance.Notifications]] + RedisStream[StreamInstance.Notifications], + EventProducer[StreamInstance.Notifications] ] = - (RedisEventProducer[StreamInstance.Notifications](_, _, _)).toLayer + ZLayer { + for { + stream <- ZIO.service[RedisStream[StreamInstance.Notifications]] + } yield RedisEventProducer[StreamInstance.Notifications](stream) + } diff --git a/src/test/scala/io/kensu/redis_streams_zio/redis/PropertyGenerators.scala b/src/test/scala/io/kensu/redis_streams_zio/redis/PropertyGenerators.scala index bc29f23..6cae412 100644 --- a/src/test/scala/io/kensu/redis_streams_zio/redis/PropertyGenerators.scala +++ b/src/test/scala/io/kensu/redis_streams_zio/redis/PropertyGenerators.scala @@ -3,22 +3,24 @@ package io.kensu.redis_streams_zio.redis import io.kensu.redis_streams_zio.config.StreamKey import io.kensu.redis_streams_zio.redis.streams.{ReadGroupData, ReadGroupResult} import org.redisson.api.StreamMessageId -import zio.random.Random import zio.test.Gen.* -import zio.test.{Gen, Sized} +import zio.test.Gen import zio.{Chunk, Promise} +import zio.Random +import zio.test.Sized +import zio.test.Gen.string object PropertyGenerators: - val promise: Gen[Any, Promise[Throwable, Unit]] = fromEffect(Promise.make[Throwable, Unit]) - val streamMessageId: Gen[Random, StreamMessageId] = long(1L, 99999999999L).map(new StreamMessageId(_)) + val promise: Gen[Any, Promise[Throwable, Unit]] = fromZIO(Promise.make[Throwable, Unit]) + val streamMessageId: Gen[Any, StreamMessageId] = long(1L, 99999999999L).map(new StreamMessageId(_)) def redisData( streamKey: StreamKey - ): Gen[Random & Sized, ReadGroupResult] = - (anyString <*> streamMessageId).map { (msg, msgId) => + ): Gen[Any, ReadGroupResult] = + (string <*> streamMessageId).map { (msg, msgId) => ReadGroupResult(msgId, Chunk(ReadGroupData(streamKey, Chunk.fromArray(msg.getBytes("UTF-8"))))) } - def uniqueRedisData(streamKey: StreamKey): Gen[Random & Sized, (ReadGroupResult, ReadGroupResult)] = - (redisData(streamKey) <&> redisData(streamKey)).filter((a, b) => a.messageId != b.messageId) + def uniqueRedisData(streamKey: StreamKey): Gen[Any, (ReadGroupResult, ReadGroupResult)] = + redisData(streamKey).zip(redisData(streamKey)).filter((a, b) => a.messageId != b.messageId) diff --git a/src/test/scala/io/kensu/redis_streams_zio/redis/RedisConsumerSpec.scala b/src/test/scala/io/kensu/redis_streams_zio/redis/RedisConsumerSpec.scala index 07ca42a..a1ee181 100644 --- a/src/test/scala/io/kensu/redis_streams_zio/redis/RedisConsumerSpec.scala +++ b/src/test/scala/io/kensu/redis_streams_zio/redis/RedisConsumerSpec.scala @@ -8,23 +8,22 @@ import io.kensu.redis_streams_zio.specs.mocks.NotificationsRedisStreamMock import org.redisson.api.{StreamGroup, StreamMessageId} import zio.* import zio.Schedule.Decision -import zio.clock.Clock -import zio.duration.{durationInt, Duration} -import zio.logging.Logging +import zio.Clock +import zio.{durationInt, Duration} import zio.test.* import zio.test.Assertion.* -import zio.test.environment.{TestClock, TestEnvironment} -import zio.test.mock.Expectation.* +import zio.test.TestEnvironment +import zio.mock.Expectation.* +import zio.test.{TestClock, ZIOSpecDefault} -object RedisConsumerSpec extends DefaultRunnableSpec: +object RedisConsumerSpec extends ZIOSpecDefault: import TestData.* - override val spec: ZSpec[TestEnvironment, Failure] = - import zio.duration.* + override val spec = suite("RedisZStream.executeFor")( - testM("reuse consumer group if the requested one exists") { - checkAllM(promise) { + test("reuse consumer group if the requested one exists") { + checkAll(promise) { shutdownHook => val redisStreamMock = NotificationsRedisStreamMock.ListGroups( @@ -35,15 +34,15 @@ object RedisConsumerSpec extends DefaultRunnableSpec: RedisConsumer .executeFor[Any, StreamInstance.Notifications, StreamConsumerConfig]( shutdownHook = shutdownHook, - eventsProcessor = _.mapM(_ => ZIO.none), + eventsProcessor = _.mapZIO(_ => ZIO.none), repeatStrategy = Schedule.recurs(0).unit ) .map(assert(_)(equalTo(0L))) - .provideCustomLayer(testEnv(redisStreamMock)) + .provideLayer(testEnv(redisStreamMock)) } }, - testM("create consumer group if there is no one available") { - checkAllM(promise) { + test("create consumer group if there is no one available") { + checkAll(promise) { shutdownHook => val redisStreamMock = NotificationsRedisStreamMock.ListGroups(value(Chunk.empty)) ++ @@ -56,15 +55,15 @@ object RedisConsumerSpec extends DefaultRunnableSpec: RedisConsumer .executeFor[Any, StreamInstance.Notifications, StreamConsumerConfig]( shutdownHook = shutdownHook, - eventsProcessor = _.mapM(_ => ZIO.none), + eventsProcessor = _.mapZIO(_ => ZIO.none), repeatStrategy = Schedule.recurs(0).unit ) .map(assert(_)(equalTo(0L))) - .provideCustomLayer(testEnv(redisStreamMock)) + .provideLayer(testEnv(redisStreamMock)) } }, - testM("create consumer group if there is no requested one available") { - checkAllM(promise) { + test("create consumer group if there is no requested one available") { + checkAll(promise) { shutdownHook => val redisStreamMock = NotificationsRedisStreamMock.ListGroups(value(Chunk(new StreamGroup("no-way", 0, 0, null)))) ++ @@ -77,15 +76,15 @@ object RedisConsumerSpec extends DefaultRunnableSpec: RedisConsumer .executeFor[Any, StreamInstance.Notifications, StreamConsumerConfig]( shutdownHook = shutdownHook, - eventsProcessor = _.mapM(_ => ZIO.none), + eventsProcessor = _.mapZIO(_ => ZIO.none), repeatStrategy = Schedule.recurs(0).unit ) .map(assert(_)(equalTo(0L))) - .provideCustomLayer(testEnv(redisStreamMock)) + .provideLayer(testEnv(redisStreamMock)) } }, - testM("get PENDING messages initially") { - checkAllM(promise) { + test("get PENDING messages initially") { + checkAll(promise) { shutdownHook => val redisStreamMock = NotificationsRedisStreamMock.ListGroups( @@ -100,11 +99,11 @@ object RedisConsumerSpec extends DefaultRunnableSpec: repeatStrategy = Schedule.recurs(0).unit ) .map(assert(_)(equalTo(0L))) - .provideCustomLayer(testEnv(redisStreamMock)) + .provideLayer(testEnv(redisStreamMock)) } }, - testM("get PENDING messages initially, keep asking till asked for all and then ask for NEW messages") { - checkAllM(promise, redisData(streamKey), redisData(streamKey)) { + test("get PENDING messages initially, keep asking till asked for all and then ask for NEW messages") { + checkAll(promise, redisData(streamKey), redisData(streamKey)) { (shutdownHook, redisData1, redisData2) => val redisStreamMock = NotificationsRedisStreamMock.ListGroups( @@ -136,11 +135,11 @@ object RedisConsumerSpec extends DefaultRunnableSpec: repeatStrategy = Schedule.recurs(3).unit ) .map(assert(_)(equalTo(2L))) - .provideCustomLayer(testEnv(redisStreamMock)) + .provideLayer(testEnv(redisStreamMock)) } }, - testM("do not process the same PENDING messages in case they cannot be acknowledged") { - checkAllM(promise, redisData(streamKey), redisData(streamKey)) { + test("do not process the same PENDING messages in case they cannot be acknowledged") { + checkAll(promise, redisData(streamKey), redisData(streamKey)) { (shutdownHook, redisData1, redisData2) => val redisStreamMock = NotificationsRedisStreamMock.ListGroups( @@ -158,15 +157,15 @@ object RedisConsumerSpec extends DefaultRunnableSpec: RedisConsumer .executeFor[Any, StreamInstance.Notifications, StreamConsumerConfig]( shutdownHook = shutdownHook, - eventsProcessor = _.mapM(_ => ZIO.none), + eventsProcessor = _.mapZIO(_ => ZIO.none), repeatStrategy = Schedule.once ) .map(assert(_)(equalTo(0L))) - .provideCustomLayer(testEnv(redisStreamMock)) + .provideLayer(testEnv(redisStreamMock)) } }, - testM("keep getting NEW messages") { - checkAllM(promise, redisData(streamKey), redisData(streamKey)) { + test("keep getting NEW messages") { + checkAll(promise, redisData(streamKey), redisData(streamKey)) { (shutdownHook, redisData1, redisData2) => val redisStreamMock = NotificationsRedisStreamMock.ListGroups( @@ -192,11 +191,11 @@ object RedisConsumerSpec extends DefaultRunnableSpec: repeatStrategy = Schedule.recurs(3).unit ) .map(assert(_)(equalTo(2L))) - .provideCustomLayer(testEnv(redisStreamMock)) + .provideLayer(testEnv(redisStreamMock)) } }, - testM("keep getting NEW messages even if some are not acknowledged") { - checkAllM(promise, uniqueRedisData(streamKey)) { + test("keep getting NEW messages even if some are not acknowledged") { + checkAll(promise, uniqueRedisData(streamKey)) { case (shutdownHook, (redisData1, redisData2)) => val eventProcessor: TestEvent => UIO[Option[StreamMessageId]] = e => { if e.id == redisData1.messageId then ZIO.none @@ -219,15 +218,15 @@ object RedisConsumerSpec extends DefaultRunnableSpec: RedisConsumer .executeFor[Any, StreamInstance.Notifications, StreamConsumerConfig]( shutdownHook = shutdownHook, - eventsProcessor = _.mapM(eventsMapper).mapM(eventProcessor), + eventsProcessor = _.mapZIO(eventsMapper).mapZIO(eventProcessor), repeatStrategy = Schedule.recurs(3).unit ) .map(assert(_)(equalTo(1L))) - .provideCustomLayer(testEnv(redisStreamMock)) + .provideLayer(testEnv(redisStreamMock)) } }, - testM("get PENDING messages every defined period") { - checkAllM(promise) { + test("get PENDING messages every defined period") { + checkAll(promise) { shutdownHook => val redisStreamMock = NotificationsRedisStreamMock.ListGroups( @@ -238,26 +237,25 @@ object RedisConsumerSpec extends DefaultRunnableSpec: NotificationsRedisStreamMock.ReadGroup(equalTo(pendingReadGroupCorrectArgs), value(Chunk.empty)) ++ NotificationsRedisStreamMock.ReadGroup(equalTo(pendingReadGroupCorrectArgs), value(Chunk.empty)) - def stream(clock: TestClock.Service) = + val stream = RedisConsumer .executeFor[Any, StreamInstance.Notifications, StreamConsumerConfig]( shutdownHook = shutdownHook, eventsProcessor = successfulEventProcessor(_, Chunk.empty), repeatStrategy = Schedule .recurs(3) - .onDecision(_ => clock.adjust(config.checkPendingEvery.plusSeconds(1))) + .onDecision((_, _, _) => TestClock.adjust(config.checkPendingEvery.plusSeconds(1))) .unit ) (for - clock <- ZIO.service[TestClock.Service] - result <- stream(clock) + result <- stream yield assert(result)(equalTo(0L))) - .provideSomeLayer[TestEnvironment](testEnv(redisStreamMock)) + .provideLayer(testEnv(redisStreamMock)) } }, - testM("acknowledge message with an empty value") { - checkAllM(promise, redisData(streamKey)) { (shutdownHook, message) => + test("acknowledge message with an empty value") { + checkAll(promise, redisData(streamKey)) { (shutdownHook, message) => val messageWithEmptyValue = message.copy(data = Chunk.empty) val redisStreamMock = NotificationsRedisStreamMock.ListGroups( @@ -280,11 +278,11 @@ object RedisConsumerSpec extends DefaultRunnableSpec: repeatStrategy = Schedule.recurs(0).unit ) .map(assert(_)(equalTo(1L))) - .provideCustomLayer(testEnv(redisStreamMock)) + .provideLayer(testEnv(redisStreamMock)) } } @@ TestAspect.nonFlaky, - testM("retry in case of group listing failure") { - checkAllM(promise) { + test("retry in case of group listing failure") { + checkAll(promise) { shutdownHook => val redisStreamMock = NotificationsRedisStreamMock.ListGroups(failure(new RuntimeException("BOOM"))) ++ @@ -309,11 +307,11 @@ object RedisConsumerSpec extends DefaultRunnableSpec: _ <- TestClock.adjust(config.retry.min.plusSeconds(1)) result <- forked.join yield assert(result)(equalTo(0L))) - .provideSomeLayer[TestEnvironment](testEnv(redisStreamMock)) + .provideLayer(testEnv(redisStreamMock)) } }, - testM("retry in case of group creation failure") { - checkAllM(promise) { + test("retry in case of group creation failure") { + checkAll(promise) { shutdownHook => val redisStreamMock = NotificationsRedisStreamMock.ListGroups(value(Chunk.empty)) ++ @@ -341,11 +339,11 @@ object RedisConsumerSpec extends DefaultRunnableSpec: _ <- TestClock.adjust(config.retry.min.plusSeconds(1)) result <- forked.join yield assert(result)(equalTo(0L))) - .provideSomeLayer[TestEnvironment](testEnv(redisStreamMock)) + .provideLayer(testEnv(redisStreamMock)) } }, - testM("retry in case of group reading failure") { - checkAllM(promise) { + test("retry in case of group reading failure") { + checkAll(promise) { shutdownHook => val redisStreamMock = NotificationsRedisStreamMock.ListGroups( @@ -374,11 +372,11 @@ object RedisConsumerSpec extends DefaultRunnableSpec: _ <- TestClock.adjust(config.retry.min.plusSeconds(1)) result <- forked.join yield assert(result)(equalTo(0L))) - .provideSomeLayer[TestEnvironment](testEnv(redisStreamMock)) + .provideLayer(testEnv(redisStreamMock)) } }, - testM("retry in case of acknowledge failure") { - checkAllM(promise, redisData(streamKey)) { + test("retry in case of acknowledge failure") { + checkAll(promise, redisData(streamKey)) { (shutdownHook, redisData) => val redisStreamMock = NotificationsRedisStreamMock.ListGroups( @@ -415,11 +413,11 @@ object RedisConsumerSpec extends DefaultRunnableSpec: _ <- TestClock.adjust(config.retry.min.plusSeconds(1)) result <- forked.join yield assert(result)(equalTo(1L))) - .provideSomeLayer[TestEnvironment](testEnv(redisStreamMock)) + .provideLayer(testEnv(redisStreamMock)) } }, - testM("triggering the shutdown hook will stop stream processing") { - checkAllM(promise, uniqueRedisData(streamKey)) { + test("triggering the shutdown hook will stop stream processing") { + checkAll(promise, uniqueRedisData(streamKey)) { case (shutdownHook, (redisData1, redisData2)) => val redisStreamMock = NotificationsRedisStreamMock.ListGroups( @@ -450,9 +448,10 @@ object RedisConsumerSpec extends DefaultRunnableSpec: shutdownHook = shutdownHook, eventsProcessor = successfulEventProcessor(_, Chunk(redisData1, redisData2)), repeatStrategy = Schedule.forever - .onDecision { - case Decision.Continue(attempt, _, _) if attempt == 1 => shutdownHook.succeed(()) - case _ => ZIO.unit + .onDecision { (attempt, l, decision) => + decision match + case Decision.Continue(_) if attempt == 1 => shutdownHook.succeed(()) + case _ => ZIO.unit } .unit ) @@ -462,7 +461,7 @@ object RedisConsumerSpec extends DefaultRunnableSpec: _ <- shutdownHook.await _ <- forked.join yield assertCompletes) - .provideSomeLayer[TestEnvironment](testEnv(redisStreamMock)) + .provideLayer(testEnv(redisStreamMock)) } } ) @@ TestAspect.timeout(30.seconds) @@ -474,13 +473,13 @@ object RedisConsumerSpec extends DefaultRunnableSpec: stream: StreamInput[StreamInstance.Notifications, StreamConsumerConfig], redisData: Chunk[ReadGroupResult] ) = - stream.mapM(eventsMapper).map(e => redisData.find(_.messageId == e.id).map(_.messageId)) + stream.mapZIO(eventsMapper).map(e => redisData.find(_.messageId == e.id).map(_.messageId)) private def failingEventProcessor(stream: StreamInput[StreamInstance.Notifications, StreamConsumerConfig]) = - stream.mapM(_ => IO.fail(new IllegalStateException("I should not be called"))) + stream.mapZIO(_ => ZIO.fail(new IllegalStateException("I should not be called"))) - private def testEnv(redisStreamMock: ULayer[Has[RedisStream[StreamInstance.Notifications]]]) = - ZLayer.succeed(config) ++ redisStreamMock ++ ZLayer.identity[Clock] ++ Logging.ignore + private def testEnv(redisStreamMock: ULayer[RedisStream[StreamInstance.Notifications]]) = + (Runtime.removeDefaultLoggers ++ ZLayer.succeed(config)) ++ redisStreamMock private[redis] case class TestEvent( id: StreamMessageId, diff --git a/src/test/scala/io/kensu/redis_streams_zio/redis/RedisStaleEventsCollectorSpec.scala b/src/test/scala/io/kensu/redis_streams_zio/redis/RedisStaleEventsCollectorSpec.scala index 6d17828..797473e 100644 --- a/src/test/scala/io/kensu/redis_streams_zio/redis/RedisStaleEventsCollectorSpec.scala +++ b/src/test/scala/io/kensu/redis_streams_zio/redis/RedisStaleEventsCollectorSpec.scala @@ -10,22 +10,17 @@ import io.kensu.redis_streams_zio.redis.streams.{ import io.kensu.redis_streams_zio.specs.mocks.NotificationsRedisStreamMock import org.redisson.api.{PendingEntry, StreamMessageId} import zio.* -import zio.clock.Clock -import zio.duration.{durationInt, Duration} -import zio.logging.Logging -import zio.test.{DefaultRunnableSpec, *} import zio.test.Assertion.* -import zio.test.environment.{TestClock, TestEnvironment} -import zio.test.mock.Expectation.* +import zio.mock.Expectation.* +import zio.test.* -object RedisStaleEventsCollectorSpec extends DefaultRunnableSpec: +object RedisStaleEventsCollectorSpec extends ZIOSpecDefault: import TestData.* - override val spec: ZSpec[TestEnvironment, Failure] = - import zio.duration.* + override val spec = suite("RedisZCollector.executeFor")( - testM("does not begin processing before initial delay") { + test("does not begin processing before initial delay") { val redisStreamMock = NotificationsRedisStreamMock.ListPending(equalTo(config.groupName, 100), value(Chunk.empty)).atMost(0) @@ -35,9 +30,9 @@ object RedisStaleEventsCollectorSpec extends DefaultRunnableSpec: _ <- TestClock.adjust(config.claiming.initialDelay.minusMillis(1)) _ <- forked.interrupt yield assertCompletes) - .provideSomeLayer[TestEnvironment](testEnv(redisStreamMock)) + .provideLayer(testEnv(redisStreamMock)) }, - testM("begin processing after initial delay") { + test("begin processing after initial delay") { val redisStreamMock = NotificationsRedisStreamMock.ListPending(equalTo(config.groupName, 100), value(Chunk.empty)) @@ -48,9 +43,9 @@ object RedisStaleEventsCollectorSpec extends DefaultRunnableSpec: _ <- TestClock.adjust(config.claiming.initialDelay) _ <- forked.interrupt yield assertCompletes) - .provideSomeLayer[TestEnvironment](testEnv(redisStreamMock)) + .provideLayer(testEnv(redisStreamMock)) }, - testM("claim only messages & exceeded idle time from other consumers") { + test("claim only messages & exceeded idle time from other consumers") { val goodEntry = new PendingEntry(new StreamMessageId(412323), config.consumerName.value, 1, 0) val otherConsumerExceededIdleTimeEntry = new PendingEntry( new StreamMessageId(65456345), @@ -87,9 +82,9 @@ object RedisStaleEventsCollectorSpec extends DefaultRunnableSpec: _ <- TestClock.adjust(config.claiming.initialDelay) _ <- forked.interrupt yield assertCompletes) - .provideSomeLayer[TestEnvironment](testEnv(redisStreamMock)) + .provideLayer(testEnv(redisStreamMock)) }, - testM("claim only half of suitable messages") { + test("claim only half of suitable messages") { val exceededIdleTimeEntry1 = new PendingEntry( new StreamMessageId(65456345), "other-consumer", @@ -125,9 +120,9 @@ object RedisStaleEventsCollectorSpec extends DefaultRunnableSpec: _ <- TestClock.adjust(config.claiming.initialDelay) _ <- forked.interrupt yield assertCompletes) - .provideSomeLayer[TestEnvironment](testEnv(redisStreamMock)) + .provideLayer(testEnv(redisStreamMock)) }, - testM("can keep repeating the claiming process") { + test("can keep repeating the claiming process") { val exceededIdleTimeEntry1 = new PendingEntry( new StreamMessageId(65456345), "other-consumer", @@ -179,9 +174,9 @@ object RedisStaleEventsCollectorSpec extends DefaultRunnableSpec: _ <- TestClock.adjust(config.claiming.initialDelay) _ <- forked.join yield assertCompletes) - .provideSomeLayer[TestEnvironment](testEnv(redisStreamMock)) + .provideLayer(testEnv(redisStreamMock)) }, - testM("acknowledge only messages with exceeded number of deliveries") { + test("acknowledge only messages with exceeded number of deliveries") { val goodEntry = new PendingEntry(new StreamMessageId(412323), config.consumerName.value, 1, 0) val otherConsumerExceededIdleTimeEntry = new PendingEntry( new StreamMessageId(65456345), @@ -216,9 +211,9 @@ object RedisStaleEventsCollectorSpec extends DefaultRunnableSpec: _ <- TestClock.adjust(config.claiming.initialDelay) _ <- forked.interrupt yield assertCompletes) - .provideSomeLayer[TestEnvironment](testEnv(redisStreamMock)) + .provideLayer(testEnv(redisStreamMock)) }, - testM("can keep repeating the acknowledge process") { + test("can keep repeating the acknowledge process") { val goodEntry = new PendingEntry(new StreamMessageId(412323), config.consumerName.value, 1, 0) val otherConsumerExceededIdleTimeEntry = new PendingEntry( new StreamMessageId(65456345), @@ -263,12 +258,12 @@ object RedisStaleEventsCollectorSpec extends DefaultRunnableSpec: _ <- TestClock.adjust(config.claiming.initialDelay) _ <- forked.join yield assertCompletes) - .provideSomeLayer[TestEnvironment](testEnv(redisStreamMock)) + .provideLayer(testEnv(redisStreamMock)) } ) @@ TestAspect.timeout(30.seconds) - private def testEnv(redisStreamMock: ULayer[Has[RedisStream[StreamInstance.Notifications]]]) = - ZLayer.succeed(config) ++ redisStreamMock ++ ZLayer.identity[Clock] ++ Logging.ignore + private def testEnv(redisStreamMock: ULayer[RedisStream[StreamInstance.Notifications]]) = + (Runtime.removeDefaultLoggers ++ ZLayer.succeed(config)) ++ redisStreamMock private object TestData: diff --git a/src/test/scala/io/kensu/redis_streams_zio/services/producers/EventProducerSpec.scala b/src/test/scala/io/kensu/redis_streams_zio/services/producers/EventProducerSpec.scala index ad0d940..4bc9b8b 100644 --- a/src/test/scala/io/kensu/redis_streams_zio/services/producers/EventProducerSpec.scala +++ b/src/test/scala/io/kensu/redis_streams_zio/services/producers/EventProducerSpec.scala @@ -7,26 +7,26 @@ import io.kensu.redis_streams_zio.redis.streams.NotificationsRedisStream import io.kensu.redis_streams_zio.redis.streams.{RedisStream, StreamInstance} import io.kensu.redis_streams_zio.specs.mocks.NotificationsRedisStreamMock import org.redisson.api.StreamMessageId -import zio.{Chunk, Has, ULayer, ZLayer} -import zio.clock.* -import zio.duration.* -import zio.logging.Logging + import zio.test.* import zio.test.Assertion.* -import zio.test.environment.{TestClock, TestEnvironment} -import zio.test.mock.Expectation.* +import zio.test.TestEnvironment +import zio.mock.Expectation.* +import zio.* +import zio.Clock.currentTime +import zio.test.ZIOSpecDefault -object EventProducerSpec extends DefaultRunnableSpec: +object EventProducerSpec extends ZIOSpecDefault: import TestData.* - private def testEnv(redisStreamMock: ULayer[Has[RedisStream[StreamInstance.Notifications]]]) = - (redisStreamMock ++ ZLayer.identity[Clock] ++ Logging.ignore) >>> NotificationsEventProducer.redis + private def testEnv(redisStreamMock: ULayer[RedisStream[StreamInstance.Notifications]]) = + (Runtime.removeDefaultLoggers ++ redisStreamMock) >>> NotificationsEventProducer.redis - override def spec: ZSpec[TestEnvironment, Failure] = + override val spec = suite("EventProducer.redis")( suite("publish")( - testM("fail if cannot send an event") { + test("fail if cannot send an event") { val redisStreamMock = NotificationsRedisStreamMock.StreamInstance(value(StreamInstance.Notifications(streamName))) ++ NotificationsRedisStreamMock.Add( @@ -48,16 +48,19 @@ object EventProducerSpec extends DefaultRunnableSpec: (for timeBefore <- currentTime(TimeUnit.SECONDS) - forked <- NotificationsEventProducer(_.publish(testStreamKey, testEvent)).run.fork + forked <- ZIO.serviceWithZIO[EventProducer[StreamInstance.Notifications]](_.publish( + testStreamKey, + testEvent + )).exit.fork _ <- TestClock.adjust(21.seconds) // 3 retries for 3 sec exponential * 2 msg <- forked.join timeAfter <- currentTime(TimeUnit.SECONDS) yield { assert(msg)(fails(isSubtype[RuntimeException](anything))) && assert(timeAfter - timeBefore)(isGreaterThanEqualTo(21L)) - }).provideSomeLayer[TestEnvironment](testEnv(redisStreamMock)) + }).provideLayer(testEnv(redisStreamMock)) }, - testM("succeed if can send an event") { + test("succeed if can send an event") { val redisStreamMock = NotificationsRedisStreamMock.StreamInstance(value(StreamInstance.Notifications(streamName))) ++ NotificationsRedisStreamMock.Add( @@ -65,9 +68,9 @@ object EventProducerSpec extends DefaultRunnableSpec: value(new StreamMessageId(123L, 456L)) ) - NotificationsEventProducer(_.publish(testStreamKey, testEvent)) + ZIO.serviceWithZIO[EventProducer[StreamInstance.Notifications]](_.publish(testStreamKey, testEvent)) .map(createdMsgId => assert(createdMsgId)(equalTo(PublishedEventId("123-456")))) - .provideCustomLayer(testEnv(redisStreamMock)) + .provideLayer(testEnv(redisStreamMock)) } ) ) diff --git a/src/test/scala/io/kensu/redis_streams_zio/specs/mocks/NotificationsEventProducerMock.scala b/src/test/scala/io/kensu/redis_streams_zio/specs/mocks/NotificationsEventProducerMock.scala deleted file mode 100644 index 7c7f3bd..0000000 --- a/src/test/scala/io/kensu/redis_streams_zio/specs/mocks/NotificationsEventProducerMock.scala +++ /dev/null @@ -1,21 +0,0 @@ -package io.kensu.redis_streams_zio.specs.mocks - -import io.kensu.redis_streams_zio.config.StreamKey -import io.kensu.redis_streams_zio.redis.streams.StreamInstance -import io.kensu.redis_streams_zio.services.producers.{EventProducer, EventSerializable, PublishedEventId} -import zio.{Has, Tag, Task, URLayer, ZLayer} -import zio.test.mock.* - -object NotificationsEventProducerMock extends Mock[Has[EventProducer[StreamInstance.Notifications]]]: - - object Publish extends Poly.Effect.Input[Throwable, PublishedEventId] - - override val compose: URLayer[Has[Proxy], Has[EventProducer[StreamInstance.Notifications]]] = - ZLayer.fromServiceM { proxy => - withRuntime.map { rts => - new EventProducer[StreamInstance.Notifications] { - override def publish[E: EventSerializable: Tag](streamKey: StreamKey, event: E): Task[PublishedEventId] = - proxy(Publish.of[(StreamKey, E)], streamKey, event) - } - } - } diff --git a/src/test/scala/io/kensu/redis_streams_zio/specs/mocks/NotificationsRedisStreamMock.scala b/src/test/scala/io/kensu/redis_streams_zio/specs/mocks/NotificationsRedisStreamMock.scala index 2b8ad4e..f6b3f85 100644 --- a/src/test/scala/io/kensu/redis_streams_zio/specs/mocks/NotificationsRedisStreamMock.scala +++ b/src/test/scala/io/kensu/redis_streams_zio/specs/mocks/NotificationsRedisStreamMock.scala @@ -10,11 +10,11 @@ import io.kensu.redis_streams_zio.redis.streams.{ StreamInstance } import org.redisson.api.{PendingEntry, StreamGroup, StreamMessageId} -import zio.test.mock.* -import zio.{Chunk, Has, NonEmptyChunk, Task, UIO, URLayer, ZLayer} -import zio.duration.Duration +import zio.mock.* +import zio.{Chunk, NonEmptyChunk, Task, UIO, URLayer, ZLayer} +import zio.* -object NotificationsRedisStreamMock extends Mock[Has[RedisStream[StreamInstance.Notifications]]]: +object NotificationsRedisStreamMock extends Mock[RedisStream[StreamInstance.Notifications]]: object StreamInstance extends Effect[Unit, Nothing, StreamInstance] @@ -43,47 +43,47 @@ object NotificationsRedisStreamMock extends Mock[Has[RedisStream[StreamInstance. object Add extends Effect[(StreamKey, Chunk[Byte]), Throwable, StreamMessageId] override val compose: URLayer[ - Has[Proxy], - Has[RedisStream[io.kensu.redis_streams_zio.redis.streams.StreamInstance.Notifications]] + Proxy, + RedisStream[io.kensu.redis_streams_zio.redis.streams.StreamInstance.Notifications] ] = - ZLayer.fromServiceM { proxy => - withRuntime.as { - new RedisStream[io.kensu.redis_streams_zio.redis.streams.StreamInstance.Notifications] { - - override val streamInstance: UIO[StreamInstance] = - proxy(StreamInstance) - - override def listGroups: Task[Chunk[StreamGroup]] = - proxy(ListGroups) - - override def createGroup(groupName: StreamGroupName, strategy: CreateGroupStrategy): Task[Unit] = - proxy(CreateGroup, groupName, strategy) - - override def readGroup( - groupName: StreamGroupName, - consumerName: StreamConsumerName, - count: Int, - timeout: Duration, - strategy: ListGroupStrategy - ): Task[Chunk[ReadGroupResult]] = - proxy(ReadGroup, groupName, consumerName, count, timeout, strategy) - - override def ack(groupName: StreamGroupName, ids: NonEmptyChunk[StreamMessageId]): Task[Long] = - proxy(Ack, groupName, ids) - - override def fastClaim( - groupName: StreamGroupName, - consumerName: StreamConsumerName, - maxIdleTime: Duration, - ids: NonEmptyChunk[StreamMessageId] - ): Task[Chunk[StreamMessageId]] = - proxy(FastClaim, groupName, consumerName, maxIdleTime, ids) - - override def listPending(groupName: StreamGroupName, count: Int): Task[Chunk[PendingEntry]] = - proxy(ListPending, groupName, count) - - override def add(key: StreamKey, payload: Chunk[Byte]): Task[StreamMessageId] = - proxy(Add, key, payload) - } + ZLayer { + for { + proxy <- ZIO.service[Proxy] + } yield new RedisStream[io.kensu.redis_streams_zio.redis.streams.StreamInstance.Notifications] { + + override val streamInstance: UIO[StreamInstance] = + proxy(StreamInstance) + + override def listGroups: Task[Chunk[StreamGroup]] = + proxy(ListGroups) + + override def createGroup(groupName: StreamGroupName, strategy: CreateGroupStrategy): Task[Unit] = + proxy(CreateGroup, groupName, strategy) + + override def readGroup( + groupName: StreamGroupName, + consumerName: StreamConsumerName, + count: Int, + timeout: Duration, + strategy: ListGroupStrategy + ): Task[Chunk[ReadGroupResult]] = + proxy(ReadGroup, groupName, consumerName, count, timeout, strategy) + + override def ack(groupName: StreamGroupName, ids: NonEmptyChunk[StreamMessageId]): Task[Long] = + proxy(Ack, groupName, ids) + + override def fastClaim( + groupName: StreamGroupName, + consumerName: StreamConsumerName, + maxIdleTime: Duration, + ids: NonEmptyChunk[StreamMessageId] + ): Task[Chunk[StreamMessageId]] = + proxy(FastClaim, groupName, consumerName, maxIdleTime, ids) + + override def listPending(groupName: StreamGroupName, count: Int): Task[Chunk[PendingEntry]] = + proxy(ListPending, groupName, count) + + override def add(key: StreamKey, payload: Chunk[Byte]): Task[StreamMessageId] = + proxy(Add, key, payload) } }