Skip to content

Commit

Permalink
Replace deprecated statefulMapConcat with statefulMap
Browse files Browse the repository at this point in the history
  • Loading branch information
pbernet committed Jan 12, 2024
1 parent 0b7c703 commit c4ad9e7
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 27 deletions.
31 changes: 18 additions & 13 deletions src/main/scala/sample/stream_shared_state/WindowingExample.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sample.stream_shared_state
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.Source

import java.time.format.DateTimeFormatter
import java.time.{Instant, OffsetDateTime, ZoneId}
import scala.collection.mutable
import scala.concurrent.duration._
Expand Down Expand Up @@ -39,7 +40,7 @@ object WindowingExample extends App {
val delayFactor = 8
val acceptedMaxDelay = 4.seconds.toMillis // Lower value leads to dropping of events

implicit val ordering = new Ordering[MyEvent]{
implicit val ordering: Ordering[MyEvent] = new Ordering[MyEvent] {
def compare(x: MyEvent, y: MyEvent): Int = {
if (x.timestamp < y.timestamp) -1
else if (x.timestamp > y.timestamp) 1
Expand All @@ -50,10 +51,14 @@ object WindowingExample extends App {
Source
.tick(0.seconds, 1.second, "")
.map(_ => createEvent())
.statefulMapConcat { () =>
val generator = new CommandGenerator()
ev => generator.forEvent(ev)
}
.statefulMap(
// state creation function
() => new CommandGenerator())(
// mapping function
(generator, nextElem) => (generator, generator.forEvent(nextElem)),
// cleanup function
generator => Some(generator.forEvent(createEvent())))
.mapConcat(identity) // flatten
.groupBy(maxSubstreams, command => command.w, allowClosedSubstreamRecreation = true)
.takeWhile(!_.isInstanceOf[CloseWindow])
.fold(AggregateEventData(Window(0L, 0L), mutable.TreeSet[MyEvent]())) {
Expand Down Expand Up @@ -116,7 +121,7 @@ object WindowingExample extends App {
// watermark: the timestamp of the *newest* event minus acceptedMaxDelay
watermark = math.max(watermark, ev.timestamp - acceptedMaxDelay)
if (ev.timestamp < watermark) {
println(s"Dropping $ev, watermark is at: ${tsToString(watermark)}")
println(s"Dropping event: $ev, watermark is at: ${tsToString(watermark)}")
Nil
} else {
val eventWindows = Window.windowsFor(ev.timestamp)
Expand All @@ -129,11 +134,11 @@ object WindowingExample extends App {
} else None
}

val openCommands = eventWindows.flatMap { w =>
if (!openWindows.contains(w)) {
println(s"Open new $w")
openWindows.add(w)
Some(OpenWindow(w))
val openCommands = eventWindows.flatMap { ew =>
if (!openWindows.contains(ew)) {
println(s"Open new $ew")
openWindows.add(ew)
Some(OpenWindow(ew))
} else None
}

Expand All @@ -144,13 +149,13 @@ object WindowingExample extends App {
}
}

case class AggregateEventData(w: Window, events: mutable.TreeSet[MyEvent]) {
case class AggregateEventData(w: Window, events: mutable.TreeSet[MyEvent]) {
override def toString =
s"From: ${tsToString(w.startTs)} to: ${tsToString(w.stopTs)}, there were: ${events.size} events. Details: $events"
}

def tsToString(ts: Long) = OffsetDateTime
.ofInstant(Instant.ofEpochMilli(ts), ZoneId.systemDefault())
.toLocalTime
.toString
.format(DateTimeFormatter.ofPattern("HH:mm:ss"))
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import org.apache.pekko.stream.scaladsl.{Keep, Sink, Source, SourceQueueWithComp
import org.apache.pekko.stream.{OverflowStrategy, QueueOfferResult}
import org.slf4j.{Logger, LoggerFactory}

import java.time.format.DateTimeFormatter
import java.time.{Instant, OffsetDateTime, ZoneId}
import scala.collection.mutable
import scala.collection.parallel.CollectionConverters._
Expand All @@ -29,7 +30,7 @@ object WindowingExampleScale extends App {
val delayFactor = 8
val acceptedMaxDelay = 6.seconds.toMillis // Lower value leads to dropping of events

implicit val ordering = new Ordering[MyEvent] {
implicit val ordering: Ordering[MyEvent] = new Ordering[MyEvent] {
def compare(x: MyEvent, y: MyEvent): Int = {
if (x.timestamp < y.timestamp) -1
else if (x.timestamp > y.timestamp) 1
Expand All @@ -44,10 +45,14 @@ object WindowingExampleScale extends App {
val windowingProcessorSourceQueue: SourceQueueWithComplete[MyEvent] =
Source
.queue[MyEvent](bufferSize, OverflowStrategy.backpressure, maxConcurrentOffers)
.statefulMapConcat { () =>
val generator = new CommandGenerator()
ev => generator.forEvent(ev)
}
.statefulMap(
// state creation function
() => new CommandGenerator())(
// mapping function
(generator, nextElem) => (generator, generator.forEvent(nextElem)),
// cleanup function
generator => Some(generator.forEvent(createEvent(0))))
.mapConcat(identity) // flatten
.groupBy(maxSubstreams, command => command.w, allowClosedSubstreamRecreation = true)
.takeWhile(!_.isInstanceOf[CloseWindow])
.fold(AggregateEventData(Window(0L, 0L), mutable.TreeSet[MyEvent]())) {
Expand All @@ -64,19 +69,19 @@ object WindowingExampleScale extends App {

(1 to numberOfPublishingClients).par.foreach(each => client(each))

def client(nbr: Int) = {
logger.info(s"Starting client with id: $nbr")
def client(id: Int) = {
logger.info(s"Starting client with id: $id")
Source
.tick(0.seconds, 10.millis, "")
.map(_ => createEvent())
.map(_ => createEvent(id))
.map(offerToSourceQueue)
.runWith(Sink.ignore)
}

private def createEvent() = {
private def createEvent(id: Int) = {
val now = System.currentTimeMillis()
val delay = random.nextInt(delayFactor)
val myEvent = MyEvent(now - delay * 1000L)
val myEvent = MyEvent(now - delay * 1000L, id)
logger.debug(s"$myEvent")
myEvent
}
Expand All @@ -90,9 +95,9 @@ object WindowingExampleScale extends App {
}
}

case class MyEvent(timestamp: Long) {
case class MyEvent(timestamp: Long, id: Int) {
override def toString =
s"Event: ${tsToString(timestamp)}"
s"Event: ${tsToString(timestamp)} source: $id"
}

case class Window(startTs: Long, stopTs: Long) {
Expand Down Expand Up @@ -132,7 +137,7 @@ object WindowingExampleScale extends App {
// watermark: the timestamp of the *newest* event minus acceptedMaxDelay
watermark = math.max(watermark, ev.timestamp - acceptedMaxDelay)
if (ev.timestamp < watermark) {
logger.debug(s"Dropping $ev, watermark is at: ${tsToString(watermark)}")
logger.debug(s"Dropping event: $ev, watermark is at: ${tsToString(watermark)}")
Nil
} else {
val eventWindows = Window.windowsFor(ev.timestamp)
Expand Down Expand Up @@ -168,5 +173,5 @@ object WindowingExampleScale extends App {
def tsToString(ts: Long) = OffsetDateTime
.ofInstant(Instant.ofEpochMilli(ts), ZoneId.systemDefault())
.toLocalTime
.toString
.format(DateTimeFormatter.ofPattern("HH:mm:ss"))
}

0 comments on commit c4ad9e7

Please sign in to comment.