Skip to content

Commit

Permalink
rework exceptions
Browse files Browse the repository at this point in the history
  • Loading branch information
pjfanning committed Apr 13, 2024
1 parent e2dd191 commit 7bb31ba
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ import slick.jdbc.SQLServerProfile
private[jdbc] def selectFromDbByPersistenceId(persistenceId: Rep[String]) =
durableStateTable.filter(_.persistenceId === persistenceId)

private[jdbc] def selectFromDbByPersistenceId(persistenceId: String) =
durableStateTable.filter(_.persistenceId === persistenceId)

private[jdbc] def insertDbWithDurableState(row: DurableStateTables.DurableStateRow, seqNextValue: String) = {
sqlu"""INSERT INTO #${durableStateTableCfg.tableName}
(
Expand Down Expand Up @@ -109,7 +112,7 @@ import slick.jdbc.SQLServerProfile
*
* @since 1.1.0
*/
def deleteBasedOnPersistenceIdAndRevision(persistenceId: String, revision: Long) = {
private[jdbc] def deleteBasedOnPersistenceIdAndRevision(persistenceId: String, revision: Long) = {
durableStateTable.filter(r => r.persistenceId === persistenceId && r.revision === revision).delete
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,16 +116,27 @@ class JdbcDurableStateStore[A](
override def deleteObject(persistenceId: String): Future[Done] =
db.run(queries.deleteFromDb(persistenceId).map(_ => Done))

override def deleteObject(persistenceId: String, revision: Long): Future[Done] =
db.run(queries.deleteBasedOnPersistenceIdAndRevision(persistenceId, revision).map {
count =>
{
if (count == 0)
throw new DurableStateStoreException(
s"Object with persistenceId [$persistenceId] and revision [$revision] does not exist", null)
else Done
}
})
override def deleteObject(persistenceId: String, revision: Long): Future[Done] = {
db.run(queries.deleteBasedOnPersistenceIdAndRevision(persistenceId, revision)).flatMap { count =>
{
if (count == 0) {
db.run(queries.selectFromDbByPersistenceId(persistenceId).result).map { state =>
state.headOption match {
case Some(row) if row.revision < revision =>
throw new DurableStateStoreException(
s"Out of date revision [$revision] for object with persistenceId [$persistenceId]", null)
case Some(_) =>
throw new DurableStateStoreException(
s"Unknown revision [$revision] for object with persistenceId [$persistenceId]", null)
case _ =>
throw new DurableStateStoreException(
s"Object with persistenceId [$persistenceId] does not exist", null)
}
}
} else Future.successful(Done)
}
}
}

override def currentChanges(tag: String, offset: Offset): Source[DurableStateChange[A], NotUsed] = {
Source
Expand Down

0 comments on commit 7bb31ba

Please sign in to comment.