Skip to content

Commit

Permalink
take revision value into account when deleting DurableState (#156)
Browse files Browse the repository at this point in the history
* take revision value into account when deleting

add test

add tests

scalafmt

update test

Update DurableStateQueries.scala

scalafmt

* temporarily throw DurableStateStoreException

* Update JdbcDurableStateStore.scala

* rework exceptions

* Update configuration.md

* Update configuration.md

* Update JdbcDurableStateSpec.scala

* uptake apache/pekko#1271

* scalafmt

* Update JdbcDurableStateSpec.scala

* try to fix test

* Update DurableStateExceptionSupport.scala

* refactor

* use parasitic context

* apply review suggestions

* review change

* Update JdbcDurableStateStore.scala

* revert doc change

* Update JdbcDurableStateStore.scala
  • Loading branch information
pjfanning committed Apr 25, 2024
1 parent 3ee66be commit 08db474
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package org.apache.pekko.persistence.jdbc.state
import org.apache.pekko
import pekko.annotation.InternalApi
import pekko.persistence.jdbc.config.DurableStateTableConfiguration

import slick.jdbc.{ H2Profile, JdbcProfile, OracleProfile, PostgresProfile, SQLServerProfile, SetParameter }

/**
Expand Down Expand Up @@ -92,6 +93,16 @@ import slick.jdbc.{ H2Profile, JdbcProfile, OracleProfile, PostgresProfile, SQLS
durableStateTable.filter(_.persistenceId === persistenceId).delete
}

/**
* Deletes a particular revision of an object based on its persistenceId.
* This revision may no longer exist and if so, no delete will occur.
*
* @since 1.1.0
*/
private[jdbc] def deleteBasedOnPersistenceIdAndRevision(persistenceId: String, revision: Long) = {
selectFromDbByPersistenceId(persistenceId).filter(_.revision === revision).delete
}

def deleteAllFromDb() = {
durableStateTable.delete
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.persistence.jdbc.state.scaladsl

import java.lang.invoke.{ MethodHandles, MethodType }

import scala.util.Try

/**
* INTERNAL API
*
* Support for creating a `DeleteRevisionException`if the class is
* available on the classpath. Pekko 1.0 does not have this class, but
* it is added in Pekko 1.1.
*/
private[scaladsl] object DurableStateExceptionSupport {
val DeleteRevisionExceptionClass =
"org.apache.pekko.persistence.state.exception.DeleteRevisionException"

private def exceptionClassOpt: Option[Class[_]] =
Try(Class.forName(DeleteRevisionExceptionClass)).toOption

private val constructorOpt = exceptionClassOpt.map { clz =>
val mt = MethodType.methodType(classOf[Unit], classOf[String])
MethodHandles.publicLookup().findConstructor(clz, mt)
}

def createDeleteRevisionExceptionIfSupported(message: String): Option[Exception] =
constructorOpt.map { constructor =>
constructor.invoke(message).asInstanceOf[Exception]
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,23 @@ import slick.jdbc.{ JdbcBackend, JdbcProfile }
import org.apache.pekko
import pekko.{ Done, NotUsed }
import pekko.actor.ExtendedActorSystem
import pekko.annotation.ApiMayChange
import pekko.dispatch.ExecutionContexts
import pekko.pattern.ask
import pekko.persistence.jdbc.PekkoSerialization
import pekko.persistence.jdbc.state.DurableStateQueries
import pekko.persistence.jdbc.config.DurableStateTableConfiguration
import pekko.persistence.jdbc.state.{ DurableStateTables, OffsetSyntax }
import pekko.persistence.query.{ DurableStateChange, Offset }
import pekko.persistence.jdbc.journal.dao.FlowControl
import pekko.persistence.jdbc.state.{ scaladsl => jdbcStateScalaDsl }
import pekko.persistence.state.{ scaladsl => stateScalaDsl }
import pekko.persistence.query.{ DurableStateChange, Offset, UpdatedDurableState }
import pekko.persistence.query.{ scaladsl => queryScalaDsl }
import pekko.persistence.state.{ scaladsl => stateScalaDsl }
import pekko.serialization.Serialization
import pekko.stream.scaladsl.{ Sink, Source }
import pekko.stream.{ Materializer, SystemMaterializer }
import pekko.util.Timeout
import OffsetSyntax._
import pekko.annotation.ApiMayChange
import pekko.persistence.query.UpdatedDurableState

object JdbcDurableStateStore {
val Identifier = "jdbc-durable-state-store"
Expand Down Expand Up @@ -70,7 +70,7 @@ class JdbcDurableStateStore[A](
durableStateConfig.stateSequenceConfig),
s"pekko-persistence-jdbc-durable-state-sequence-actor")

def getObject(persistenceId: String): Future[stateScalaDsl.GetObjectResult[A]] = {
override def getObject(persistenceId: String): Future[stateScalaDsl.GetObjectResult[A]] = {
db.run(queries.selectFromDbByPersistenceId(persistenceId).result).map { rows =>
rows.headOption match {
case Some(row) =>
Expand All @@ -84,7 +84,7 @@ class JdbcDurableStateStore[A](
}
}

def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): Future[Done] = {
override def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): Future[Done] = {
require(revision > 0)
val row =
PekkoSerialization.serialize(serialization, value).map { serialized =>
Expand Down Expand Up @@ -113,21 +113,35 @@ class JdbcDurableStateStore[A](
}
}

def deleteObject(persistenceId: String): Future[Done] =
override def deleteObject(persistenceId: String): Future[Done] =
db.run(queries.deleteFromDb(persistenceId).map(_ => Done))

def deleteObject(persistenceId: String, revision: Long): Future[Done] =
db.run(queries.deleteFromDb(persistenceId).map(_ => Done))
override def deleteObject(persistenceId: String, revision: Long): Future[Done] =
db.run(queries.deleteBasedOnPersistenceIdAndRevision(persistenceId, revision)).map { count =>
if (count != 1) {
// if you run this code with Pekko 1.0.x, no exception will be thrown here
// this matches the behavior of pekko-connectors-jdbc 1.0.x
// if you run this code with Pekko 1.1.x, a DeleteRevisionException will be thrown here
val msg = if (count == 0) {
s"Failed to delete object with persistenceId [$persistenceId] and revision [$revision]"
} else {
s"Delete object succeeded for persistenceId [$persistenceId] and revision [$revision] but more than one row was affected ($count rows)"
}
DurableStateExceptionSupport.createDeleteRevisionExceptionIfSupported(msg)
.foreach(throw _)
}
Done
}(ExecutionContexts.parasitic)

def currentChanges(tag: String, offset: Offset): Source[DurableStateChange[A], NotUsed] = {
override def currentChanges(tag: String, offset: Offset): Source[DurableStateChange[A], NotUsed] = {
Source
.futureSource(maxStateStoreOffset().map { maxOrderingInDb =>
changesByTag(tag, offset.value, terminateAfterOffset = Some(maxOrderingInDb))
})
.mapMaterializedValue(_ => NotUsed)
}

def changes(tag: String, offset: Offset): Source[DurableStateChange[A], NotUsed] =
override def changes(tag: String, offset: Offset): Source[DurableStateChange[A], NotUsed] =
changesByTag(tag, offset.value, terminateAfterOffset = None)

private def currentChangesByTag(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,47 @@ abstract class JdbcDurableStateSpec(config: Config, schemaType: SchemaType) exte
}
}
}
"fail to delete old object revision" in {
val f = for {
n <- stateStoreString.upsertObject("p987", 1, "a valid string", "t123")
_ = n shouldBe pekko.Done
g <- stateStoreString.getObject("p987")
_ = g.value shouldBe Some("a valid string")
u <- stateStoreString.upsertObject("p987", 2, "updated valid string", "t123")
_ = u shouldBe pekko.Done
d <- stateStoreString.deleteObject("p987", 1)
} yield d
if (pekko.Version.current.startsWith("1.0")) {
whenReady(f) { v =>
v shouldBe pekko.Done
}
} else {
whenReady(f.failed) { e =>
e.getClass.getName shouldEqual DurableStateExceptionSupport.DeleteRevisionExceptionClass
e.getMessage should include("Failed to delete object with persistenceId [p987] and revision [1]")
}
}
}
"delete latest object revision but not older one" in {
whenReady {
for {

n <- stateStoreString.upsertObject("p9876", 1, "a valid string", "t123")
_ = n shouldBe pekko.Done
g <- stateStoreString.getObject("p9876")
_ = g.value shouldBe Some("a valid string")
u <- stateStoreString.upsertObject("p9876", 2, "updated valid string", "t123")
_ = u shouldBe pekko.Done
d <- stateStoreString.deleteObject("p9876", 2)
_ = d shouldBe pekko.Done
h <- stateStoreString.getObject("p9876")

} yield h
} { v =>
// current behavior is that deleting the latest revision means getObject returns None (we don't preserve older revisions)
v.value shouldBe None
}
}
}

"A durable state store with payload that needs custom serializer" must withActorSystem { implicit system =>
Expand Down

0 comments on commit 08db474

Please sign in to comment.