Skip to content
This repository has been archived by the owner on Mar 16, 2022. It is now read-only.

Commit

Permalink
Update failure handling in java support
Browse files Browse the repository at this point in the history
  • Loading branch information
pvlugter committed Aug 3, 2020
1 parent 995778d commit c394d25
Show file tree
Hide file tree
Showing 6 changed files with 449 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ object CloudStateRunner {
* CloudStateRunner can be seen as a low-level API for cases where [[io.cloudstate.javasupport.CloudState.start()]] isn't enough.
*/
final class CloudStateRunner private[this] (_system: ActorSystem, services: Map[String, StatefulService]) {
private[this] implicit final val system = _system
private[javasupport] implicit final val system = _system
private[this] implicit final val materializer: Materializer = ActorMaterializer()

private[this] final val configuration =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ private[impl] trait AbstractClientActionContext extends ClientActionContext {
checkActive()
if (error.isEmpty) {
error = Some(errorMessage)
logError(errorMessage)
throw FailInvoked
} else throw new IllegalStateException("fail(…) already previously invoked!")
}
Expand All @@ -81,6 +82,8 @@ private[impl] trait AbstractClientActionContext extends ClientActionContext {

final def hasError: Boolean = error.isDefined

protected def logError(message: String): Unit = ()

final def createClientAction(reply: Optional[JavaPbAny], allowNoReply: Boolean): Option[ClientAction] =
error match {
case Some(msg) => Some(ClientAction(ClientAction.Action.Failure(Failure(commandId, msg))))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import io.cloudstate.javasupport.impl.{AnySupport, ReflectionHelper, ResolvedEnt

import scala.collection.concurrent.TrieMap
import com.google.protobuf.{Descriptors, Any => JavaPbAny}
import io.cloudstate.javasupport.impl.eventsourced.EventSourcedImpl.EntityException
import io.cloudstate.javasupport.{EntityFactory, ServiceCallFactory}

/**
Expand Down Expand Up @@ -78,7 +79,7 @@ private[impl] class AnnotationBasedEventSourcedSupport(
}
handler.invoke(entity, event, ctx)
case None =>
throw new RuntimeException(
throw EntityException(
s"No event handler found for event ${event.getClass} on $behaviorsString"
)
}
Expand All @@ -88,7 +89,8 @@ private[impl] class AnnotationBasedEventSourcedSupport(
behavior.commandHandlers.get(context.commandName()).map { handler =>
handler.invoke(entity, command, context)
} getOrElse {
throw new RuntimeException(
throw EntityException(
context,
s"No command handler found for command [${context.commandName()}] on $behaviorsString"
)
}
Expand All @@ -104,7 +106,7 @@ private[impl] class AnnotationBasedEventSourcedSupport(
}
handler.invoke(entity, snapshot, ctx)
case None =>
throw new RuntimeException(
throw EntityException(
s"No snapshot handler found for snapshot ${snapshot.getClass} on $behaviorsString"
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import java.util.Optional

import akka.NotUsed
import akka.actor.ActorSystem
import akka.event.{Logging, LoggingAdapter}
import akka.stream.scaladsl.Flow
import com.google.protobuf.{Descriptors, Any => JavaPbAny}
import com.google.protobuf.any.{Any => ScalaPbAny}
Expand All @@ -35,14 +36,16 @@ import io.cloudstate.javasupport.impl.{
ResolvedEntityFactory,
ResolvedServiceMethod
}
import io.cloudstate.protocol.entity.{Command, Failure}
import io.cloudstate.protocol.event_sourced.EventSourcedStreamIn.Message.{
Command => InCommand,
Empty => InEmpty,
Event => InEvent,
Init => InInit
}
import io.cloudstate.protocol.event_sourced.EventSourcedStreamOut.Message.{Reply => OutReply}
import io.cloudstate.protocol.event_sourced.EventSourcedStreamOut.Message.{Failure => OutFailure, Reply => OutReply}
import io.cloudstate.protocol.event_sourced._
import scala.util.control.NonFatal

final class EventSourcedStatefulService(val factory: EventSourcedEntityFactory,
override val descriptor: Descriptors.ServiceDescriptor,
Expand All @@ -65,11 +68,53 @@ final class EventSourcedStatefulService(val factory: EventSourcedEntityFactory,
this
}

object EventSourcedImpl {
final case class EntityException(entityId: String, commandId: Long, commandName: String, message: String)
extends RuntimeException(message)

object EntityException {
def apply(message: String): EntityException =
EntityException(entityId = "", commandId = 0, commandName = "", message)

def apply(command: Command, message: String): EntityException =
EntityException(command.entityId, command.id, command.name, message)

def apply(context: CommandContext, message: String): EntityException =
EntityException(context.entityId, context.commandId, context.commandName, message)
}

object ProtocolException {
def apply(message: String): EntityException =
EntityException(entityId = "", commandId = 0, commandName = "", "Protocol error: " + message)

def apply(init: EventSourcedInit, message: String): EntityException =
EntityException(init.entityId, commandId = 0, commandName = "", "Protocol error: " + message)

def apply(command: Command, message: String): EntityException =
EntityException(command.entityId, command.id, command.name, "Protocol error: " + message)
}

def failure(cause: Throwable): Failure = cause match {
case e: EntityException => Failure(e.commandId, e.message)
case e => Failure(description = "Unexpected failure: " + e.getMessage)
}

def failureMessage(cause: Throwable): String = cause match {
case EntityException(entityId, commandId, commandName, _) =>
val commandDescription = if (commandId != 0) s" for command [$commandName]" else ""
val entityDescription = if (entityId.nonEmpty) s"entity [$entityId]" else "entity"
s"Terminating $entityDescription due to unexpected failure$commandDescription"
case _ => "Terminating entity due to unexpected failure"
}
}

final class EventSourcedImpl(_system: ActorSystem,
_services: Map[String, EventSourcedStatefulService],
rootContext: Context,
configuration: Configuration)
extends EventSourced {
import EventSourcedImpl._

private final val system = _system
private final val services = _services.iterator
.map({
Expand All @@ -79,6 +124,8 @@ final class EventSourcedImpl(_system: ActorSystem,
})
.toMap

private val log = Logging(system.eventStream, this.getClass)

/**
* The stream. One stream will be established per active entity.
* Once established, the first message sent will be Init, which contains the entity ID, and,
Expand All @@ -99,18 +146,17 @@ final class EventSourcedImpl(_system: ActorSystem,
case (Seq(EventSourcedStreamIn(InInit(init), _)), source) =>
source.via(runEntity(init))
case _ =>
// todo better error
throw new RuntimeException("Expected Init message")
throw ProtocolException("Expected Init message")
}
.recover {
case e =>
// FIXME translate to failure message
throw e
case error =>
log.error(error, failureMessage(error))
EventSourcedStreamOut(OutFailure(failure(error)))
}

private def runEntity(init: EventSourcedInit): Flow[EventSourcedStreamIn, EventSourcedStreamOut, NotUsed] = {
val service =
services.getOrElse(init.serviceName, throw new RuntimeException(s"Service not found: ${init.serviceName}"))
services.getOrElse(init.serviceName, throw ProtocolException(init, s"Service not found: ${init.serviceName}"))
val handler = service.factory.create(new EventSourcedContextImpl(init.entityId))
val thisEntityId = init.entityId

Expand All @@ -137,33 +183,35 @@ final class EventSourcedImpl(_system: ActorSystem,
(event.sequence, None)
case ((sequence, _), InCommand(command)) =>
if (thisEntityId != command.entityId)
throw new IllegalStateException("Receiving entity is not the intended recipient of command")
val cmd = ScalaPbAny.toJavaProto(command.payload.get)
val context = new CommandContextImpl(thisEntityId,
sequence,
command.name,
command.id,
service.anySupport,
handler,
service.snapshotEvery)
throw ProtocolException(command, "Receiving entity is not the intended recipient of command")
val cmd =
ScalaPbAny.toJavaProto(command.payload.getOrElse(throw ProtocolException(command, "No command payload")))
val context =
new CommandContextImpl(thisEntityId, sequence, command.name, command.id, service.anySupport, log)

val reply = try {
handler.handleCommand(cmd, context) // FIXME is this allowed to throw
handler.handleCommand(cmd, context)
} catch {
case FailInvoked =>
Optional.empty[JavaPbAny]()
// Ignore, error already captured
case FailInvoked => Optional.empty[JavaPbAny]() // Ignore, error already captured
case e: EntityException => throw e
case NonFatal(error) => throw EntityException(command, "Unexpected failure: " + error.getMessage)
} finally {
context.deactivate() // Very important!
}

val clientAction = context.createClientAction(reply, false)

if (!context.hasError) {
val endSequenceNumber = sequence + context.events.size
// apply events from successful command to local entity state
context.events.zipWithIndex.foreach {
case (event, i) =>
handler.handleEvent(ScalaPbAny.toJavaProto(event), new EventContextImpl(thisEntityId, sequence + i + 1))
}

val endSequenceNumber = sequence + context.events.size
val performSnapshot = (endSequenceNumber / service.snapshotEvery) > (sequence / service.snapshotEvery)
val snapshot =
if (context.performSnapshot) {
if (performSnapshot) {
val s = handler.snapshot(new SnapshotContext with AbstractContext {
override def entityId: String = entityId
override def sequenceNumber: Long = endSequenceNumber
Expand Down Expand Up @@ -195,9 +243,9 @@ final class EventSourcedImpl(_system: ActorSystem,
))
}
case (_, InInit(i)) =>
throw new IllegalStateException("Entity already inited")
throw ProtocolException(init, "Entity already inited")
case (_, InEmpty) =>
throw new IllegalStateException("Received empty/unknown message")
throw ProtocolException(init, "Received empty/unknown message")
}
.collect {
case (_, Some(message)) => EventSourcedStreamOut(message)
Expand All @@ -213,25 +261,22 @@ final class EventSourcedImpl(_system: ActorSystem,
override val commandName: String,
override val commandId: Long,
val anySupport: AnySupport,
val handler: EventSourcedEntityHandler,
val snapshotEvery: Int)
val log: LoggingAdapter)
extends CommandContext
with AbstractContext
with AbstractClientActionContext
with AbstractEffectContext
with ActivatableContext {

final var events: Vector[ScalaPbAny] = Vector.empty
final var performSnapshot: Boolean = false

override def emit(event: AnyRef): Unit = {
checkActive()
val encoded = anySupport.encodeScala(event)
val nextSequenceNumber = sequenceNumber + events.size + 1
handler.handleEvent(ScalaPbAny.toJavaProto(encoded), new EventContextImpl(entityId, nextSequenceNumber))
events :+= encoded
performSnapshot = (snapshotEvery > 0) && (performSnapshot || (nextSequenceNumber % snapshotEvery == 0))
events :+= anySupport.encodeScala(event)
}

override protected def logError(message: String): Unit =
log.error("Fail invoked for command [{}] for entity [{}]: {}", commandName, entityId, message)
}

class EventSourcedContextImpl(override final val entityId: String) extends EventSourcedContext with AbstractContext
Expand Down
Loading

0 comments on commit c394d25

Please sign in to comment.