Skip to content

Commit

Permalink
Migration to ZIO 2 (#14)
Browse files Browse the repository at this point in the history
  • Loading branch information
leszekgruchala authored Jan 16, 2023
1 parent 8ac9104 commit b130176
Show file tree
Hide file tree
Showing 26 changed files with 353 additions and 422 deletions.
2 changes: 1 addition & 1 deletion .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version = "3.5.8"
version = "3.6.1"
runner.dialect = scala3
encoding = "UTF-8"
maxColumn = 120
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: '3'
services:

redis:
image: redis:6.2.6-alpine
image: redis:7-alpine
container_name: redis-streams-zio-redis
restart: unless-stopped
volumes:
Expand Down
2 changes: 1 addition & 1 deletion project/Common.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ object Common {
.settings(
organization := "io.kensu",
name := "redis-streams-zio",
scalaVersion := "3.1.3",
scalaVersion := "3.2.1",
version := "1.0.0-SNAPSHOT",
scalacOptions ++= commonScalacOptions,
Compile / console / scalacOptions --= Seq("-Werror"),
Expand Down
15 changes: 8 additions & 7 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,24 @@ import sbt._
object Dependencies {

object Version {
val zio = "1.0.14"
val zioConfig = "2.0.4"
val zio = "2.0.5"
val zioConfig = "3.0.7"
}

val zio = Seq(
"dev.zio" %% "zio" % Version.zio,
"dev.zio" %% "zio-streams" % Version.zio,
"dev.zio" %% "zio-logging-slf4j" % "0.5.14",
"dev.zio" %% "zio-logging-slf4j" % "2.1.7",
"dev.zio" %% "zio-config-typesafe" % Version.zioConfig,
"dev.zio" %% "zio-config-magnolia" % Version.zioConfig,
"dev.zio" %% "zio-test-sbt" % Version.zio % Test
"dev.zio" %% "zio-test-sbt" % Version.zio % Test,
"dev.zio" %% "zio-mock" % "1.0.0-RC9" % Test
)

val redisson = Seq("org.redisson" % "redisson" % "3.17.4")
val redisson = Seq("org.redisson" % "redisson" % "3.19.1")

val logback = Seq(
"ch.qos.logback" % "logback-classic" % "1.2.11",
"org.slf4j" % "log4j-over-slf4j" % "1.7.36"
"ch.qos.logback" % "logback-classic" % "1.4.5",
"org.slf4j" % "log4j-over-slf4j" % "2.0.6"
)
}
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version = 1.7.1
sbt.version = 1.8.2
2 changes: 1 addition & 1 deletion project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
//https://scalameta.org/scalafmt/#sbt-scalafmt
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.6")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.0")
1 change: 0 additions & 1 deletion src/main/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="false" debug="false">
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<withJansi>true</withJansi>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>[%blue(%date{ISO8601})][%highlight(%-5level)][%magenta(%replace(%logger{10}.%M){'\.\?+|\$*\.\$.+\$+\d*|\$',''})][%thread] - %msg %cyan(%X{correlation_id} %X{client_ip} %marker) %n%rootException</pattern>
</encoder>
Expand Down
50 changes: 16 additions & 34 deletions src/main/scala/io/kensu/redis_streams_zio/Consumer.scala
Original file line number Diff line number Diff line change
@@ -1,40 +1,21 @@
package io.kensu.redis_streams_zio

import io.kensu.redis_streams_zio.config.Configs
import io.kensu.redis_streams_zio.config.{Configs, NotificationsStreamConsumerConfig}
import io.kensu.redis_streams_zio.logging.KensuLogAnnotation
import io.kensu.redis_streams_zio.redis.RedisClient
import io.kensu.redis_streams_zio.redis.streams.{NotificationsRedisStream, StreamInstance}
import io.kensu.redis_streams_zio.redis.streams.notifications.{NotificationsConsumer, NotificationsStaleEventsCollector}
import zio.*
import zio.clock.Clock
import io.kensu.redis_streams_zio.redis.streams.{NotificationsRedisStream, RedisStream, StreamInstance}
import zio.config.syntax.*
import zio.duration.*
import zio.logging.*
import zio.logging.slf4j.Slf4jLogger
import zio.logging.backend.SLF4J
import zio.{Clock, ZIOAppDefault, *}

object Consumer extends App:

override def run(args: List[String]): URIO[ZEnv, ExitCode] =
streams.useForever
.provideCustomLayer(liveEnv)
.exitCode
object Consumer extends ZIOAppDefault:

private val streams =
ZManaged.makeInterruptible {
for
shutdownHook <- Promise.make[Throwable, Unit]
notificationStreamFiber <- notificationsStream(shutdownHook)
yield (shutdownHook, notificationStreamFiber)
} { (shutdownHook, notificationStreamFiber) =>
(for
_ <- log.info("Halting streams")
_ <- shutdownHook.succeed(())
_ <- shutdownHook.await
_ <- log.info("Shutting down streams... this may take a few seconds")
_ <- notificationStreamFiber.join `race` ZIO.sleep(5.seconds)
_ <- log.info("Streams shut down")
yield ()).ignore
}
ZIO.acquireRelease(Promise.make[Throwable, Unit])(hook =>
hook.succeed(()) *> ZIO.logInfo("Shutting down streams... this may take a moment")
).flatMap(notificationsStream)

private def notificationsStream(shutdownHook: Promise[Throwable, Unit]) =
for
Expand All @@ -45,19 +26,20 @@ object Consumer extends App:
private val liveEnv =
val appConfig = Configs.appConfig

val logging: ULayer[Logging] = Slf4jLogger.makeWithAnnotationsAsMdc(
mdcAnnotations = List(KensuLogAnnotation.CorrelationId),
logFormat = (_, msg) => msg
) >>> Logging.modifyLogger(_.derive(KensuLogAnnotation.InitialLogContext))
val logging = Runtime.removeDefaultLoggers >>> SLF4J.slf4j

val redisClient = appConfig.narrow(_.redis) >>> RedisClient.live

val notificationsStream =
val streamInstance = appConfig.narrow(_.redisStreams.consumers).map(hasConsumers =>
Has(StreamInstance.Notifications(hasConsumers.get.notifications.streamName))
ZEnvironment(StreamInstance.Notifications(hasConsumers.get.notifications.streamName))
)
(streamInstance ++ redisClient) >>> NotificationsRedisStream.redisson

val clock = ZLayer.identity[Clock]
ZLayer.make[NotificationsStreamConsumerConfig & RedisStream[StreamInstance.Notifications]](
logging,
appConfig.narrow(_.redisStreams.consumers.notifications),
notificationsStream
)

clock ++ logging ++ appConfig.narrow(_.redisStreams.consumers.notifications) ++ notificationsStream
override val run = ZIO.scoped(streams *> ZIO.never).provideLayer(liveEnv) @@ KensuLogAnnotation.InitialLogContext
54 changes: 22 additions & 32 deletions src/main/scala/io/kensu/redis_streams_zio/Producer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,57 +4,47 @@ import io.kensu.redis_streams_zio.config.{Configs, NotificationsStreamProducerCo
import io.kensu.redis_streams_zio.logging.KensuLogAnnotation
import io.kensu.redis_streams_zio.redis.RedisClient
import io.kensu.redis_streams_zio.redis.streams.{NotificationsRedisStream, StreamInstance}
import io.kensu.redis_streams_zio.services.producers.NotificationsEventProducer
import zio.*
import zio.clock.Clock
import io.kensu.redis_streams_zio.services.producers.{EventProducer, NotificationsEventProducer}
import zio.Random.nextString
import zio.config.getConfig
import zio.config.syntax.ZIOConfigNarrowOps
import zio.duration.durationInt
import zio.logging.Logging
import zio.logging.slf4j.Slf4jLogger
import zio.random.nextString
import io.kensu.redis_streams_zio.services.producers.EventProducer
import zio.random.Random
import zio.config.syntax.*
import zio.logging.backend.SLF4J
import zio.logging.backend.SLF4J.loggerNameAnnotationKey
import zio.{Clock, Random, ZIOAppDefault, ZIOAspect, *}

object Producer extends App:
object Producer extends ZIOAppDefault:

override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] =
sentNotification
.repeat(Schedule.spaced(5.seconds).jittered)
.provideCustomLayer(liveEnv)
.exitCode

val sentNotification: ZIO[
Has[NotificationsStreamProducerConfig] & Random & Has[EventProducer[StreamInstance.Notifications]],
Throwable,
Unit
] =
private val sentNotification =
for
config <- getConfig[NotificationsStreamProducerConfig]
str <- nextString(10)
_ <- NotificationsEventProducer(_.publish(config.addKey, str))
_ <- ZIO.serviceWithZIO[EventProducer[StreamInstance.Notifications]](_.publish(config.addKey, str))
yield ()

private val liveEnv =
val appConfig = Configs.appConfig
val producerConfig = appConfig.narrow(_.redisStreams.producers.notifications)

val logging: ULayer[Logging] = Slf4jLogger.makeWithAnnotationsAsMdc(
mdcAnnotations = List(KensuLogAnnotation.CorrelationId),
logFormat = (_, msg) => msg
) >>> Logging.modifyLogger(_.derive(KensuLogAnnotation.InitialLogContext))
val logging = Runtime.removeDefaultLoggers >>> SLF4J.slf4j

val redisClient = appConfig.narrow(_.redis) >>> RedisClient.live

val clock = ZLayer.identity[Clock]

val notificationsStream =
val redisStream =
val streamInstance = appConfig.narrow(_.redisStreams.producers).map { hasProducers =>
Has(StreamInstance.Notifications(hasProducers.get.notifications.streamName))
ZEnvironment(StreamInstance.Notifications(hasProducers.get.notifications.streamName))
}
(streamInstance ++ redisClient) >>> NotificationsRedisStream.redisson

(redisStream ++ clock ++ logging) >>> NotificationsEventProducer.redis
(redisStream ++ logging) >>> NotificationsEventProducer.redis

clock ++ logging ++ producerConfig ++ notificationsStream
ZLayer.make[NotificationsStreamProducerConfig & EventProducer[StreamInstance.Notifications]](
logging,
producerConfig,
notificationsStream
)

override val run =
sentNotification
.repeat(Schedule.spaced(5.seconds).jittered)
.provideLayer(liveEnv) @@ KensuLogAnnotation.InitialLogContext
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.kensu.redis_streams_zio.common

opaque type CorrelationId = String

object CorrelationId:
def apply(value: String): CorrelationId = value
final case class CorrelationId(value: String) extends AnyVal {
override def toString: String = value
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.kensu.redis_streams_zio.common

import zio.Schedule
import zio.duration.{Duration, *}
import zio.{Duration, *}

object Scheduling:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ import zio.config.ConfigDescriptor.*
import zio.config.*
import zio.config.magnolia.{descriptor, Descriptor}
import zio.config.typesafe.TypesafeConfig
import zio.duration.Duration
import zio.{Has, Layer, UIO}
import zio.*

final case class RootConfig(
kensu: AppConfig
Expand Down Expand Up @@ -108,5 +107,5 @@ object Configs:
import zio.config.syntax.*
import zio.config.typesafe.*

val appConfig: Layer[ReadError[String], Has[AppConfig]] =
read(descriptor[RootConfig].from(TypesafeConfigSource.fromResourcePath)).map(_.kensu).toLayer
val appConfig: Layer[ReadError[String], AppConfig] =
ZLayer.fromZIO(read(descriptor[RootConfig].from(TypesafeConfigSource.fromResourcePath)).map(_.kensu))
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
package io.kensu.redis_streams_zio.logging

import io.kensu.redis_streams_zio.common.CorrelationId
import zio.logging.LogAnnotation
import zio.logging.LogContext
import zio.logging.{LogAnnotation, LogContext}

object KensuLogAnnotation:

val AppCorrelationId: CorrelationId = io.kensu.redis_streams_zio.common.CorrelationId("application")

val CorrelationId: LogAnnotation[CorrelationId] = LogAnnotation[CorrelationId](
name = "correlation_id",
initialValue = AppCorrelationId,
combine = (_, r) => r,
render = _.toString
name = "correlation_id",
combine = (_, r) => r,
render = _.value
)

val InitialLogContext: LogContext => LogContext = KensuLogAnnotation.CorrelationId(AppCorrelationId)
val InitialLogContext = KensuLogAnnotation.CorrelationId(AppCorrelationId)
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,9 @@ import zio.config.getConfig

object RedisClient:

type RedisClient = Has[RedissonClient]

val live: ZLayer[Has[RedisConfig], Throwable, RedisClient] =
ZLayer.fromManaged {
ZManaged.make(
val live: ZLayer[RedisConfig, Throwable, RedissonClient] =
ZLayer.scoped {
ZIO.acquireRelease(
getConfig[RedisConfig].map { config =>
val redissonConfig = new Config()
redissonConfig.setCodec(new ByteArrayCodec())
Expand All @@ -24,8 +22,8 @@ object RedisClient:
.setPassword(config.password)
Redisson.create(redissonConfig);
}
)(c => ZIO.effect(c.shutdown()).orDie)
)(c => ZIO.logInfo("Shutting down Redisson") *> ZIO.attempt(c.shutdown()).orDie)
}

def getStream[K, V](name: StreamName): ZIO[RedisClient, Nothing, RStream[K, V]] =
ZIO.service[RedissonClient].flatMap(client => UIO(client.getStream[K, V](name.value)))
def getStream[K, V](name: StreamName): ZIO[RedissonClient, Nothing, RStream[K, V]] =
ZIO.serviceWith[RedissonClient](_.getStream[K, V](name.value))
Loading

0 comments on commit b130176

Please sign in to comment.