Skip to content

Commit

Permalink
feat: durable state database support (#158)
Browse files Browse the repository at this point in the history
resolves: #56, support Oracle and SQLServer(MSSQL)
  • Loading branch information
Roiocam authored Apr 19, 2024
1 parent 333613b commit 8df083a
Show file tree
Hide file tree
Showing 20 changed files with 329 additions and 42 deletions.
39 changes: 39 additions & 0 deletions core/src/main/resources/schema/oracle/oracle-create-schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,42 @@ BEGIN
EXECUTE IMMEDIATE 'ALTER SEQUENCE EVENT_JOURNAL__ORDERING_SEQ INCREMENT BY 1 MINVALUE 0';
END;
/

CREATE SEQUENCE DURABLE_STATE__GLOBAL_OFFSET_SEQ START WITH 1 INCREMENT BY 1 NOMAXVALUE
/

CREATE TABLE DURABLE_STATE
(
GLOBAL_OFFSET NUMERIC NOT NULL,
PERSISTENCE_ID VARCHAR(255) NOT NULL,
REVISION NUMERIC NOT NULL,
STATE_PAYLOAD BLOB NOT NULL,
STATE_SERIAL_ID NUMBER(10) NOT NULL,
STATE_SERIAL_MANIFEST VARCHAR(255),
TAG VARCHAR(255) NOT NULL,
STATE_TIMESTAMP NUMERIC NOT NULL,
PRIMARY KEY (PERSISTENCE_ID)
)
/

CREATE OR REPLACE TRIGGER DURABLE_STATE__GLOBAL_OFFSET_SEQ_TRG
before insert
on DURABLE_STATE
REFERENCING NEW AS NEW
FOR EACH ROW
WHEN (new.GLOBAL_OFFSET is null)
begin
select DURABLE_STATE__GLOBAL_OFFSET_SEQ.nextval into :new.GLOBAL_OFFSET from sys.dual;
end;
/

CREATE OR REPLACE PROCEDURE "RESET__GLOBAL_OFFSET"
IS
l_value NUMBER;
BEGIN
EXECUTE IMMEDIATE 'SELECT DURABLE_STATE__GLOBAL_OFFSET_SEQ.nextval FROM dual' INTO l_value;
EXECUTE IMMEDIATE 'ALTER SEQUENCE DURABLE_STATE__GLOBAL_OFFSET_SEQ INCREMENT BY -' || l_value || ' MINVALUE 0';
EXECUTE IMMEDIATE 'SELECT DURABLE_STATE__GLOBAL_OFFSET_SEQ.nextval FROM dual' INTO l_value;
EXECUTE IMMEDIATE 'ALTER SEQUENCE DURABLE_STATE__GLOBAL_OFFSET_SEQ INCREMENT BY 1 MINVALUE 0';
END;
/
18 changes: 12 additions & 6 deletions core/src/main/resources/schema/oracle/oracle-drop-schema.sql
Original file line number Diff line number Diff line change
@@ -1,20 +1,26 @@
ALTER SESSION SET ddl_lock_timeout = 15
/

DROP TABLE EVENT_TAG CASCADE CONSTRAINT
DROP SEQUENCE EVENT_JOURNAL__ORDERING_SEQ
/

DROP TABLE EVENT_JOURNAL CASCADE CONSTRAINT
DROP TRIGGER EVENT_JOURNAL__ORDERING_TRG
/

DROP TABLE SNAPSHOT CASCADE CONSTRAINT
DROP SEQUENCE DURABLE_STATE__GLOBAL_OFFSET_SEQ
/

DROP TABLE SNAPSHOT CASCADE CONSTRAINT
DROP TRIGGER DURABLE_STATE__GLOBAL_OFFSET_SEQ_TRG
/

DROP SEQUENCE EVENT_JOURNAL__ORDERING_SEQ
DROP TABLE EVENT_TAG CASCADE CONSTRAINT
/

DROP TRIGGER EVENT_JOURNAL__ORDERING_TRG
DROP TABLE EVENT_JOURNAL CASCADE CONSTRAINT
/

DROP TABLE SNAPSHOT CASCADE CONSTRAINT
/

DROP TABLE DURABLE_STATE CASCADE CONSTRAINT
/
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,24 @@ CREATE TABLE "snapshot" (
PRIMARY KEY ("persistence_id", "sequence_number")
)

-- Create Sequence Object
CREATE SEQUENCE global_offset
START WITH 1
INCREMENT BY 1;

CREATE TABLE durable_state
(
"global_offset" BIGINT
CONSTRAINT [df_global_offset] DEFAULT
(NEXT VALUE FOR global_offset),
"persistence_id" VARCHAR(255) NOT NULL,
"revision" NUMERIC(10, 0) NOT NULL,
"state_payload" VARBINARY(MAX) NOT NULL,
"state_serial_id" INTEGER NOT NULL,
"state_serial_manifest" VARCHAR(MAX),
"tag" VARCHAR(255),
"state_timestamp" BIGINT NOT NULL
PRIMARY KEY ("persistence_id")
);
CREATE INDEX durable_state_tag_idx on durable_state (tag);
CREATE UNIQUE INDEX durable_state_global_offset_idx ON durable_state (global_offset);
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
DROP TABLE IF EXISTS event_tag;
DROP TABLE IF EXISTS event_journal;
DROP TABLE IF EXISTS snapshot;
DROP TABLE IF EXISTS durable_state;
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,8 @@ package org.apache.pekko.persistence.jdbc.state

import org.apache.pekko
import pekko.annotation.InternalApi
import slick.jdbc.{ JdbcProfile, SetParameter }
import slick.jdbc.H2Profile
import slick.jdbc.MySQLProfile
import slick.jdbc.OracleProfile
import slick.jdbc.PostgresProfile
import slick.jdbc.SQLServerProfile
import pekko.persistence.jdbc.config.DurableStateTableConfiguration
import slick.jdbc.{ H2Profile, JdbcProfile, OracleProfile, PostgresProfile, SQLServerProfile, SetParameter }

/**
* INTERNAL API
Expand All @@ -31,22 +26,17 @@ import pekko.persistence.jdbc.config.DurableStateTableConfiguration
val profile: JdbcProfile,
override val durableStateTableCfg: DurableStateTableConfiguration)
extends DurableStateTables {

import profile.api._

private def slickProfileToSchemaType(profile: JdbcProfile): String =
profile match {
case PostgresProfile => "Postgres"
case MySQLProfile => "MySQL"
case OracleProfile => "Oracle"
case SQLServerProfile => "SqlServer"
case H2Profile => "H2"
case _ => throw new IllegalArgumentException(s"Unknown JdbcProfile $profile encountered")
}

lazy val sequenceNextValUpdater = slickProfileToSchemaType(profile) match {
case "H2" => new H2SequenceNextValUpdater(profile, durableStateTableCfg)
case "Postgres" => new PostgresSequenceNextValUpdater(profile, durableStateTableCfg)
case _ => ???
lazy val sequenceNextValUpdater = profile match {
case H2Profile => new H2SequenceNextValUpdater(profile, durableStateTableCfg)
case PostgresProfile => new PostgresSequenceNextValUpdater(profile, durableStateTableCfg)
case SQLServerProfile => new SqlServerSequenceNextValUpdater(profile, durableStateTableCfg)
case OracleProfile => new OracleSequenceNextValUpdater(profile, durableStateTableCfg)
// TODO https://github.com/apache/pekko-persistence-jdbc/issues/174
// case MySQLProfile => new MySQLSequenceNextValUpdater(profile, durableStateTableCfg)
case _ => throw new UnsupportedOperationException(s"Unsupported JdbcProfile <$profile> for durableState.")
}

implicit val uuidSetter: SetParameter[Array[Byte]] = SetParameter[Array[Byte]] { case (bytes, params) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ import slick.sql.SqlStreamingAction
/**
* INTERNAL API
*/
@InternalApi private[jdbc] class H2SequenceNextValUpdater(
profile: JdbcProfile,
val durableStateTableCfg: DurableStateTableConfiguration)
@InternalApi private[jdbc] final class H2SequenceNextValUpdater(
profile: JdbcProfile, durableStateTableCfg: DurableStateTableConfiguration)
extends SequenceNextValUpdater {

import profile.api._

// H2 dependent (https://stackoverflow.com/questions/36244641/h2-equivalent-of-postgres-serial-or-bigserial-column)
Expand All @@ -50,13 +50,40 @@ import slick.sql.SqlStreamingAction
/**
* INTERNAL API
*/
@InternalApi private[jdbc] class PostgresSequenceNextValUpdater(
profile: JdbcProfile,
val durableStateTableCfg: DurableStateTableConfiguration)
@InternalApi private[jdbc] final class PostgresSequenceNextValUpdater(
profile: JdbcProfile, durableStateTableCfg: DurableStateTableConfiguration)
extends SequenceNextValUpdater {

import profile.api._

def getSequenceNextValueExpr() =
sql"""SELECT nextval(pg_get_serial_sequence('#${durableStateTableCfg.tableName}', '#${durableStateTableCfg.columnNames.globalOffset}'))""".as[
String]
}

/**
* INTERNAL API
*/
@InternalApi private[jdbc] final class SqlServerSequenceNextValUpdater(profile: JdbcProfile,
durableStateTableCfg: DurableStateTableConfiguration)
extends SequenceNextValUpdater {

import profile.api._

def getSequenceNextValueExpr() =
sql"""SELECT NEXT VALUE FOR #${durableStateTableCfg.columnNames.globalOffset}""".as[String]
}

/**
* INTERNAL API
*/
@InternalApi private[jdbc] final class OracleSequenceNextValUpdater(profile: JdbcProfile,
durableStateTableCfg: DurableStateTableConfiguration)
extends SequenceNextValUpdater {

import profile.api._
final val nextValFetcher =
s"""(SELECT nextval(pg_get_serial_sequence('${durableStateTableCfg.tableName}', '${durableStateTableCfg.columnNames.globalOffset}')))"""

def getSequenceNextValueExpr() = sql"""#$nextValFetcher""".as[String]
def getSequenceNextValueExpr() =
sql"""SELECT #${durableStateTableCfg.tableName}__#${durableStateTableCfg.columnNames.globalOffset}_SEQ.nextval FROM DUAL""".as[
String]
}
6 changes: 6 additions & 0 deletions core/src/test/resources/mysql-application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ jdbc-read-journal {
slick = ${slick}
}

# TODO https://github.com/apache/pekko-persistence-jdbc/issues/174
# the pekko-persistence-jdbc provider in use for durable state store
#jdbc-durable-state-store {
# slick = ${slick}
#}

slick {
profile = "slick.jdbc.MySQLProfile$"
db {
Expand Down
7 changes: 7 additions & 0 deletions core/src/test/resources/mysql-shared-db-application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,10 @@ jdbc-snapshot-store {
jdbc-read-journal {
use-shared-db = "slick"
}

# TODO https://github.com/apache/pekko-persistence-jdbc/issues/174
# the pekko-persistence-jdbc provider in use for durable state store
#jdbc-durable-state-store {
# use-shared-db = "slick"
#}

5 changes: 5 additions & 0 deletions core/src/test/resources/oracle-application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ jdbc-read-journal {
slick = ${slick}
}

# the pekko-persistence-jdbc provider in use for durable state store
jdbc-durable-state-store {
slick = ${slick}
}

slick {
profile = "slick.jdbc.OracleProfile$"
db {
Expand Down
19 changes: 19 additions & 0 deletions core/src/test/resources/oracle-schema-overrides.conf
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,22 @@ jdbc-journal {

}

jdbc-durable-state-store {
tables {
durable_state {
tableName = "DURABLE_STATE"
schemaName = "SYSTEM"

columnNames {
globalOffset = "GLOBAL_OFFSET"
persistenceId = "PERSISTENCE_ID"
revision = "REVISION"
statePayload = "STATE_PAYLOAD"
stateSerId = "STATE_SERIAL_ID"
stateSerManifest = "STATE_SERIAL_MANIFEST"
tag = "TAG"
stateTimestamp = "STATE_TIMESTAMP"
}
}
}
}
8 changes: 8 additions & 0 deletions core/src/test/resources/oracle-shared-db-application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ pekko {
// Enable the line below to automatically start the snapshot-store when the actorsystem is started
// auto-start-snapshot-stores = ["jdbc-snapshot-store"]
}
state {
plugin = "jdbc-durable-state-store"
}
}
}

Expand Down Expand Up @@ -62,3 +65,8 @@ jdbc-snapshot-store {
jdbc-read-journal {
use-shared-db = "slick"
}

# the pekko-persistence-jdbc provider in use for durable state store
jdbc-durable-state-store {
use-shared-db = "slick"
}
11 changes: 11 additions & 0 deletions core/src/test/resources/sqlserver-application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,17 @@ jdbc-read-journal {
slick = ${slick}
}

# the pekko-persistence-jdbc provider in use for durable state store
jdbc-durable-state-store {
tables {
durable_state {
schemaName = "dbo"
}
}

slick = ${slick}
}

slick {
profile = "slick.jdbc.SQLServerProfile$"
db {
Expand Down
9 changes: 9 additions & 0 deletions core/src/test/resources/sqlserver-shared-db-application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ pekko {
// Enable the line below to automatically start the snapshot-store when the actorsystem is started
// auto-start-snapshot-stores = ["jdbc-snapshot-store"]
}
state {
plugin = "jdbc-durable-state-store"
}
}
}

Expand Down Expand Up @@ -61,3 +64,9 @@ jdbc-snapshot-store {
jdbc-read-journal {
use-shared-db = "slick"
}

# the pekko-persistence-jdbc provider in use for durable state store
jdbc-durable-state-store {
use-shared-db = "slick"
}

Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ class MockDurableStateSequenceActorTest extends SharedActorSystemTestSpec {
lazy val cfg = customConfig
.getConfig("jdbc-durable-state-store")
.withFallback(system.settings.config.getConfig("jdbc-durable-state-store"))
.withFallback(ConfigFactory.load("h2-application.conf").getConfig("jdbc-durable-state-store"))
.withFallback(config.getConfig("jdbc-durable-state-store"))

val stateTableConfig = new DurableStateTableConfiguration(cfg)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import org.apache.pekko
import pekko.actor._
import pekko.persistence.jdbc.state.{ MyPayload, OffsetSyntax }
import OffsetSyntax._
import pekko.persistence.jdbc.testkit.internal.{ H2, Postgres, SchemaType }
import pekko.persistence.jdbc.testkit.internal.{ H2, Oracle, Postgres, SchemaType, SqlServer }
import pekko.persistence.query.{ NoOffset, Offset, Sequence, UpdatedDurableState }
import pekko.stream.scaladsl.Sink
import org.scalatest.time.{ Millis, Seconds, Span }
Expand Down Expand Up @@ -83,7 +83,14 @@ abstract class JdbcDurableStateSpec(config: Config, schemaType: SchemaType) exte
e shouldBe an[org.h2.jdbc.JdbcSQLIntegrityConstraintViolationException]
case Postgres =>
e shouldBe an[org.postgresql.util.PSQLException]
case _ => ???
// TODO https://github.com/apache/pekko-persistence-jdbc/issues/174
// case MySQL =>
// e shouldBe an[java.sql.SQLIntegrityConstraintViolationException]
case Oracle =>
e shouldBe an[java.sql.SQLIntegrityConstraintViolationException]
case SqlServer =>
e shouldBe an[com.microsoft.sqlserver.jdbc.SQLServerException]
case _ => throw new UnsupportedOperationException(s"Unsupported <$schemaType> for durableState.")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,19 @@
package org.apache.pekko.persistence.jdbc.state.scaladsl

import com.typesafe.config.{ Config, ConfigFactory }

import scala.concurrent.duration._
import scala.util.{ Failure, Success }
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike
import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach }
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.time._

import org.apache.pekko
import pekko.actor._
import pekko.persistence.jdbc.db.SlickDatabase
import pekko.persistence.jdbc.config._
import pekko.persistence.jdbc.testkit.internal.{ H2, Postgres, SchemaType }
import pekko.persistence.jdbc.testkit.internal.{ H2, Oracle, Postgres, SchemaType, SqlServer }
import pekko.persistence.jdbc.util.DropCreate
import pekko.serialization.SerializationExtension
import pekko.util.Timeout
Expand All @@ -49,7 +49,11 @@ abstract class StateSpecBase(val config: Config, schemaType: SchemaType)
private[jdbc] def schemaTypeToProfile(s: SchemaType) = s match {
case H2 => slick.jdbc.H2Profile
case Postgres => slick.jdbc.PostgresProfile
case _ => ???
// TODO https://github.com/apache/pekko-persistence-jdbc/issues/174
// case MySQL => slick.jdbc.MySQLProfile
case SqlServer => slick.jdbc.SQLServerProfile
case Oracle => slick.jdbc.OracleProfile
case _ => throw new UnsupportedOperationException(s"Unsupported <$s> for durableState.")
}

val customSerializers = ConfigFactory.parseString("""
Expand Down
Loading

0 comments on commit 8df083a

Please sign in to comment.