Skip to content

Commit

Permalink
#462 Perpetual Stream should allow overriding the state to run a stre…
Browse files Browse the repository at this point in the history
…am (#476) (#485)

(cherry picked from commit da4350a)
  • Loading branch information
anilgursel authored and sebady committed Jul 10, 2017
1 parent d5d1dd8 commit 227f9aa
Show file tree
Hide file tree
Showing 4 changed files with 262 additions and 5 deletions.
61 changes: 61 additions & 0 deletions docs/streams-lifecycle.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ class WellBehavedStream extends PerpetualStream[Future[Done]] {

That's it. This stream is well behaved because it materializes to the sink's materialized value, which is a `Future[Done]`.

### Override Lifecycle State to run the stream

There may be scenarios where a stream need to be materialized at a different lifecycle than `active`. In such scenarios, override `streamRunLifecycleState`, e.g.,:

```scala
override lazy val streamRunLifecycleState: LifecycleState = Initializing
```

### Shutdown Overrides
It is sometimes not possible to define a well behaved stream. For instance, the `Sink` may not materialize to a `Future` or you need to do further cleanup at shutdown. For this reason, it is possible to override `shutdown` as in the following code:

Expand Down Expand Up @@ -125,6 +133,59 @@ class MsgReceivingStream extends PerpetualStream[(ActorRef, Future[Done])] {
}
```

## Connecting a Perpetual Stream with an HTTP Flow

Akka HTTP allows defining a `Flow[HttpRequest, HttpResponse, NotUsed]`, which gets materialized for each http connection. There are scenarios where an app needs to connect the http flow to a long running stream that needs to be materialized only once (e.g., publishing to Kafka). Akka HTTP enables end-to-end streaming in such scenarios with [MergeHub](http://doc.akka.io/docs/akka/current/scala/stream/stream-dynamic.html#dynamic-fan-in-and-fan-out-with-mergehub-and-broadcasthub). squbs provides utilities to easily connect an http flow with a `PerpetualStream` that uses `MergeHub`.


Below is a sample `PerpetualStream` that uses `MergeHub`.

```scala
class PerpetualStreamWithMergeHub extends PerpetualStream[Sink[MyMessage, NotUsed]] {

override lazy val streamRunLifecycleState: LifecycleState = Initializing

/**
* Describe your graph by implementing streamGraph
*
* @return The graph.
*/
override def streamGraph= MergeHub.source[MyMessage].to(Sink.ignore)
}
```

Let's add the above `PerpetualStream` in `squbs-meta.conf`. Please see [Well Known Actors](bootstrap.md#well-known-actors) for more details.

```
cube-name = org.squbs.stream.mycube
cube-version = "0.0.1"
squbs-services = [
{
class-name = org.squbs.stream.HttpFlowWithMergeHub
web-context = mergehub
}
]
squbs-actors = [
{
class-name = org.squbs.stream.PerpetualStreamWithMergeHub
name = perpetualStreamWithMergeHub
}
]
```

The HTTP `FlowDefinition` can be connected to the `PerpetualStream` as follows by extending `PerpetualStreamMatValue` and using `matValue` method:

```scala
class HttpFlowWithMergeHub extends FlowDefinition with PerpetualStreamMatValue[Sink[MyMessage, NotUsed]] {

override val flow: Flow[HttpRequest, HttpResponse, NotUsed] =
Flow[HttpRequest]
.mapAsync(1)(Unmarshal(_).to[MyMessage])
.alsoTo(matValue("/user/mycube/perpetualStreamWithMergeHub"))
.map { myMessage => HttpResponse(entity = s"Received Id: ${myMessage.id}") }
}
```

## Making A Lifecycle-Sensitive Source
If you wish to have a source that is fully connected to the lifecycle events of squbs, you can wrap any source with `LifecycleManaged`.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,17 @@
package org.squbs.stream

import akka.Done
import akka.actor.{Actor, ActorLogging, ActorRef, Stash, Terminated}
import akka.actor.{Actor, ActorContext, ActorIdentity, ActorLogging, ActorRef, Identify, Props, Stash, Terminated}
import akka.stream.Supervision._
import akka.stream.scaladsl.RunnableGraph
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, KillSwitches, Supervision}
import akka.util.Timeout
import org.squbs.lifecycle.{GracefulStop, GracefulStopHelper}
import org.squbs.unicomplex._

import scala.concurrent.Future
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.reflect.ClassTag

/**
* Traits for perpetual streams that start and stop with the server. The simplest conforming implementation
Expand Down Expand Up @@ -69,18 +72,23 @@ trait PerpetualStream[T] extends Actor with ActorLogging with Stash with Gracefu
*/
lazy val killSwitch = KillSwitches.shared(getClass.getName)

/**
* By default the stream is run when system state is Active. Override this if you want it to run in a different
* lifecycle phase.
*/
lazy val streamRunLifecycleState: LifecycleState = Active

implicit val materializer =
ActorMaterializer(ActorMaterializerSettings(context.system).withSupervisionStrategy(decider))

Unicomplex() ! SystemState
Unicomplex() ! ObtainLifecycleEvents(Active, Stopping)
Unicomplex() ! ObtainLifecycleEvents(streamRunLifecycleState, Stopping)

context.become(starting)

final def starting: Receive = {
case Active =>
context.become(running orElse receive)
case `streamRunLifecycleState` =>
context.become(running orElse flowMatValue orElse receive)
matValueOption = Option(streamGraph.run())
unstashAll()
case _ => stash()
Expand All @@ -103,6 +111,10 @@ trait PerpetualStream[T] extends Actor with ActorLogging with Stash with Gracefu
if (remaining.nonEmpty) context become stopped(remaining) else context stop self
}

final def flowMatValue: Receive = {
case MatValueRequest => sender() ! matValue
}

def receive: Receive = PartialFunction.empty

/**
Expand Down Expand Up @@ -131,3 +143,43 @@ trait PerpetualStream[T] extends Actor with ActorLogging with Stash with Gracefu
}
}
}

case object MatValueRequest

trait PerpetualStreamMatValue[T] {
protected val context: ActorContext

def matValue(perpetualStreamName: String)(implicit classTag: ClassTag[T]): T = {
implicit val timeout = Timeout(10 seconds)
import akka.pattern.ask

val responseF =
(context.actorOf(Props(classOf[MatValueRetrieverActor[T]], perpetualStreamName)) ? MatValueRequest).mapTo[T]

// Exception! This code is executed only at startup. We really need a better API, though.
Await.result(responseF, timeout.duration)
}
}

class MatValueRetrieverActor[T](wellKnownActorName: String) extends Actor {
val identifyId = 1
var flowActor: ActorRef = _

case object RetryMatValueRequest

override def receive: Receive = {
case MatValueRequest =>
self ! RetryMatValueRequest
flowActor = sender()
case RetryMatValueRequest =>
context.actorSelection(wellKnownActorName) ! Identify(identifyId)
case ActorIdentity(`identifyId`, Some(ref)) =>
ref ! MatValueRequest
case ActorIdentity(`identifyId`, None) =>
import context.dispatcher
context.system.scheduler.scheduleOnce(1 second, self, RetryMatValueRequest)
case matValue: T =>
flowActor ! matValue
context.stop(self)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
cube-name = org.squbs.stream.test.PerpetualStreamMergeHubSpec
cube-version = "0.0.1"
squbs-services = [
{
class-name = org.squbs.stream.HttpFlowWithMergeHub
web-context = mergehub
}
]
squbs-actors = [
{
class-name = org.squbs.stream.PerpetualStreamWithMergeHub
name = perpetualStreamWithMergeHub
}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Copyright 2017 PayPal
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.squbs.stream

import akka.NotUsed
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, Uri}
import akka.http.scaladsl.unmarshalling.{FromEntityUnmarshaller, Unmarshal, Unmarshaller}
import akka.pattern.ask
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, MergeHub, Sink}
import akka.testkit.TestKit
import com.typesafe.config.ConfigFactory
import org.scalatest.{FlatSpecLike, Matchers}
import org.squbs.unicomplex.Timeouts.{awaitMax, _}
import org.squbs.unicomplex._

import scala.collection.mutable
import scala.concurrent.Await

object PerpetualStreamMergeHubSpec {
val dummyJarsDir = getClass.getClassLoader.getResource("classpaths").getPath
val classPaths = Array("PerpetualStreamMergeHubSpec") map (dummyJarsDir + "/" + _)

val config = ConfigFactory.parseString(
s"""
|squbs {
| actorsystem-name = PerpetualStreamMergeHubSpec
| ${JMX.prefixConfig} = true
|}
""".stripMargin
)

val boot = UnicomplexBoot(config)
.createUsing {
(name, config) => ActorSystem(name, config)
}
.scanComponents(classPaths)
.start()
}

class PerpetualStreamMergeHubSpec extends TestKit(PerpetualStreamMergeHubSpec.boot.actorSystem)
with FlatSpecLike with Matchers {

val portBindings = Await.result((Unicomplex(system).uniActor ? PortBindings).mapTo[Map[String, Int]], awaitMax)
val psActorName = "/user/PerpetualStreamMergeHubSpec/perpetualStreamWithMergeHub"
val actorRef = Await.result((system.actorSelection(psActorName) ? RetrieveMyMessageStorageActorRef).mapTo[ActorRef], awaitMax)
val port = portBindings("default-listener")


it should "connect streams with mergehub" in {

implicit val ac = ActorMaterializer()
Http().singleRequest(HttpRequest(uri = Uri(s"http://127.0.0.1:$port/mergehub"), entity = "10"))
Http().singleRequest(HttpRequest(uri = Uri(s"http://127.0.0.1:$port/mergehub"), entity = "11"))

awaitAssert {
val messages = Await.result((actorRef ? RetrieveMyMessages).mapTo[mutable.Set[MyMessage]], awaitMax)
messages should have size 2
messages should contain(MyMessage(10))
messages should contain(MyMessage(11))
}
}
}

case class MyMessage(id: Int)

class HttpFlowWithMergeHub extends FlowDefinition with PerpetualStreamMatValue[Sink[MyMessage, NotUsed]] {

import context.dispatcher

import scala.concurrent.duration._
implicit val mat = ActorMaterializer()

implicit val myMessageUnmarshaller: FromEntityUnmarshaller[MyMessage] =
Unmarshaller { implicit ex entity entity.toStrict(1.second).map(e => MyMessage(e.data.utf8String.toInt)) }

override val flow: Flow[HttpRequest, HttpResponse, NotUsed] =
Flow[HttpRequest]
.mapAsync(1)(Unmarshal(_).to[MyMessage])
.alsoTo(matValue("/user/PerpetualStreamMergeHubSpec/perpetualStreamWithMergeHub"))
.map { myMessage => HttpResponse(entity = s"Received Id: ${myMessage.id}") }
}

class PerpetualStreamWithMergeHub extends PerpetualStream[Sink[MyMessage, NotUsed]] {

override lazy val streamRunLifecycleState: LifecycleState = Initializing

val source = MergeHub.source[MyMessage]

val myMessageStorageActor = context.actorOf(Props[MyMessageStorageActor])

/**
* Describe your graph by implementing streamGraph
*
* @return The graph.
*/
override def streamGraph= source.to(Sink.actorRef(myMessageStorageActor, "Done"))

override def receive: Receive = {
case RetrieveMyMessageStorageActorRef => sender() ! myMessageStorageActor
}
}

object RetrieveMyMessages
object RetrieveMyMessageStorageActorRef

class MyMessageStorageActor extends Actor {

val myMessageSet = mutable.Set[MyMessage]()

override def receive: Receive = {
case myMessage: MyMessage => myMessageSet.add(myMessage)
case RetrieveMyMessages => sender() ! myMessageSet
}
}

0 comments on commit 227f9aa

Please sign in to comment.