diff --git a/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/DurableStateQueries.scala b/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/DurableStateQueries.scala index 3c39ce31..7b433fb4 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/DurableStateQueries.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/DurableStateQueries.scala @@ -17,6 +17,7 @@ package org.apache.pekko.persistence.jdbc.state import org.apache.pekko import pekko.annotation.InternalApi import pekko.persistence.jdbc.config.DurableStateTableConfiguration + import slick.jdbc.{ H2Profile, JdbcProfile, OracleProfile, PostgresProfile, SQLServerProfile, SetParameter } /** @@ -92,6 +93,16 @@ import slick.jdbc.{ H2Profile, JdbcProfile, OracleProfile, PostgresProfile, SQLS durableStateTable.filter(_.persistenceId === persistenceId).delete } + /** + * Deletes a particular revision of an object based on its persistenceId. + * This revision may no longer exist and if so, no delete will occur. + * + * @since 1.1.0 + */ + private[jdbc] def deleteBasedOnPersistenceIdAndRevision(persistenceId: String, revision: Long) = { + selectFromDbByPersistenceId(persistenceId).filter(_.revision === revision).delete + } + def deleteAllFromDb() = { durableStateTable.delete } diff --git a/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/DurableStateExceptionSupport.scala b/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/DurableStateExceptionSupport.scala new file mode 100644 index 00000000..88e19316 --- /dev/null +++ b/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/DurableStateExceptionSupport.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.persistence.jdbc.state.scaladsl + +import java.lang.invoke.{ MethodHandles, MethodType } + +import scala.util.Try + +/** + * INTERNAL API + * + * Support for creating a `DeleteRevisionException`if the class is + * available on the classpath. Pekko 1.0 does not have this class, but + * it is added in Pekko 1.1. + */ +private[scaladsl] object DurableStateExceptionSupport { + val DeleteRevisionExceptionClass = + "org.apache.pekko.persistence.state.exception.DeleteRevisionException" + + private def exceptionClassOpt: Option[Class[_]] = + Try(Class.forName(DeleteRevisionExceptionClass)).toOption + + private val constructorOpt = exceptionClassOpt.map { clz => + val mt = MethodType.methodType(classOf[Unit], classOf[String]) + MethodHandles.publicLookup().findConstructor(clz, mt) + } + + def createDeleteRevisionExceptionIfSupported(message: String): Option[Exception] = + constructorOpt.map { constructor => + constructor.invoke(message).asInstanceOf[Exception] + } + +} diff --git a/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala b/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala index e0795e48..b1acbf17 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateStore.scala @@ -22,23 +22,23 @@ import slick.jdbc.{ JdbcBackend, JdbcProfile } import org.apache.pekko import pekko.{ Done, NotUsed } import pekko.actor.ExtendedActorSystem +import pekko.annotation.ApiMayChange +import pekko.dispatch.ExecutionContexts import pekko.pattern.ask import pekko.persistence.jdbc.PekkoSerialization import pekko.persistence.jdbc.state.DurableStateQueries import pekko.persistence.jdbc.config.DurableStateTableConfiguration import pekko.persistence.jdbc.state.{ DurableStateTables, OffsetSyntax } -import pekko.persistence.query.{ DurableStateChange, Offset } import pekko.persistence.jdbc.journal.dao.FlowControl import pekko.persistence.jdbc.state.{ scaladsl => jdbcStateScalaDsl } -import pekko.persistence.state.{ scaladsl => stateScalaDsl } +import pekko.persistence.query.{ DurableStateChange, Offset, UpdatedDurableState } import pekko.persistence.query.{ scaladsl => queryScalaDsl } +import pekko.persistence.state.{ scaladsl => stateScalaDsl } import pekko.serialization.Serialization import pekko.stream.scaladsl.{ Sink, Source } import pekko.stream.{ Materializer, SystemMaterializer } import pekko.util.Timeout import OffsetSyntax._ -import pekko.annotation.ApiMayChange -import pekko.persistence.query.UpdatedDurableState object JdbcDurableStateStore { val Identifier = "jdbc-durable-state-store" @@ -70,7 +70,7 @@ class JdbcDurableStateStore[A]( durableStateConfig.stateSequenceConfig), s"pekko-persistence-jdbc-durable-state-sequence-actor") - def getObject(persistenceId: String): Future[stateScalaDsl.GetObjectResult[A]] = { + override def getObject(persistenceId: String): Future[stateScalaDsl.GetObjectResult[A]] = { db.run(queries.selectFromDbByPersistenceId(persistenceId).result).map { rows => rows.headOption match { case Some(row) => @@ -84,7 +84,7 @@ class JdbcDurableStateStore[A]( } } - def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): Future[Done] = { + override def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): Future[Done] = { require(revision > 0) val row = PekkoSerialization.serialize(serialization, value).map { serialized => @@ -113,13 +113,27 @@ class JdbcDurableStateStore[A]( } } - def deleteObject(persistenceId: String): Future[Done] = + override def deleteObject(persistenceId: String): Future[Done] = db.run(queries.deleteFromDb(persistenceId).map(_ => Done)) - def deleteObject(persistenceId: String, revision: Long): Future[Done] = - db.run(queries.deleteFromDb(persistenceId).map(_ => Done)) + override def deleteObject(persistenceId: String, revision: Long): Future[Done] = + db.run(queries.deleteBasedOnPersistenceIdAndRevision(persistenceId, revision)).map { count => + if (count != 1) { + // if you run this code with Pekko 1.0.x, no exception will be thrown here + // this matches the behavior of pekko-connectors-jdbc 1.0.x + // if you run this code with Pekko 1.1.x, a DeleteRevisionException will be thrown here + val msg = if (count == 0) { + s"Failed to delete object with persistenceId [$persistenceId] and revision [$revision]" + } else { + s"Delete object succeeded for persistenceId [$persistenceId] and revision [$revision] but more than one row was affected ($count rows)" + } + DurableStateExceptionSupport.createDeleteRevisionExceptionIfSupported(msg) + .foreach(throw _) + } + Done + }(ExecutionContexts.parasitic) - def currentChanges(tag: String, offset: Offset): Source[DurableStateChange[A], NotUsed] = { + override def currentChanges(tag: String, offset: Offset): Source[DurableStateChange[A], NotUsed] = { Source .futureSource(maxStateStoreOffset().map { maxOrderingInDb => changesByTag(tag, offset.value, terminateAfterOffset = Some(maxOrderingInDb)) @@ -127,7 +141,7 @@ class JdbcDurableStateStore[A]( .mapMaterializedValue(_ => NotUsed) } - def changes(tag: String, offset: Offset): Source[DurableStateChange[A], NotUsed] = + override def changes(tag: String, offset: Offset): Source[DurableStateChange[A], NotUsed] = changesByTag(tag, offset.value, terminateAfterOffset = None) private def currentChangesByTag( diff --git a/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateSpec.scala b/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateSpec.scala index 84192ce6..01681f4e 100644 --- a/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateSpec.scala +++ b/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/JdbcDurableStateSpec.scala @@ -120,6 +120,47 @@ abstract class JdbcDurableStateSpec(config: Config, schemaType: SchemaType) exte } } } + "fail to delete old object revision" in { + val f = for { + n <- stateStoreString.upsertObject("p987", 1, "a valid string", "t123") + _ = n shouldBe pekko.Done + g <- stateStoreString.getObject("p987") + _ = g.value shouldBe Some("a valid string") + u <- stateStoreString.upsertObject("p987", 2, "updated valid string", "t123") + _ = u shouldBe pekko.Done + d <- stateStoreString.deleteObject("p987", 1) + } yield d + if (pekko.Version.current.startsWith("1.0")) { + whenReady(f) { v => + v shouldBe pekko.Done + } + } else { + whenReady(f.failed) { e => + e.getClass.getName shouldEqual DurableStateExceptionSupport.DeleteRevisionExceptionClass + e.getMessage should include("Failed to delete object with persistenceId [p987] and revision [1]") + } + } + } + "delete latest object revision but not older one" in { + whenReady { + for { + + n <- stateStoreString.upsertObject("p9876", 1, "a valid string", "t123") + _ = n shouldBe pekko.Done + g <- stateStoreString.getObject("p9876") + _ = g.value shouldBe Some("a valid string") + u <- stateStoreString.upsertObject("p9876", 2, "updated valid string", "t123") + _ = u shouldBe pekko.Done + d <- stateStoreString.deleteObject("p9876", 2) + _ = d shouldBe pekko.Done + h <- stateStoreString.getObject("p9876") + + } yield h + } { v => + // current behavior is that deleting the latest revision means getObject returns None (we don't preserve older revisions) + v.value shouldBe None + } + } } "A durable state store with payload that needs custom serializer" must withActorSystem { implicit system =>