From bc77ecb6e0bc9946928043738bc722580d8d93e0 Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Tue, 11 Jul 2023 00:36:04 -0400 Subject: [PATCH] Preserve timestamps on migration from LevelDB to Postgres Signed-off-by: Peter Broadhurst --- .../dbmigration/leveldb2postgres.go | 2 +- internal/persistence/postgres/checkpoints.go | 3 ++- .../persistence/postgres/confirmations.go | 3 ++- internal/persistence/postgres/eventstreams.go | 7 ++++--- internal/persistence/postgres/listeners.go | 7 ++++--- internal/persistence/postgres/postgres.go | 8 +++++-- internal/persistence/postgres/receipts.go | 3 ++- .../persistence/postgres/sqlpersistence.go | 21 ++++++++++++------- .../postgres/sqlpersistence_test.go | 18 ++++++++++++++++ internal/persistence/postgres/transactions.go | 3 ++- 10 files changed, 55 insertions(+), 20 deletions(-) diff --git a/internal/persistence/dbmigration/leveldb2postgres.go b/internal/persistence/dbmigration/leveldb2postgres.go index 2a82c4c6..d109c87f 100644 --- a/internal/persistence/dbmigration/leveldb2postgres.go +++ b/internal/persistence/dbmigration/leveldb2postgres.go @@ -37,7 +37,7 @@ func MigrateLevelDBToPostgres(ctx context.Context) (err error) { return i18n.NewError(ctx, tmmsgs.MsgPersistenceInitFail, "leveldb", err) } defer m.source.Close(ctx) - if m.target, err = postgres.NewPostgresPersistence(ctx, tmconfig.PostgresSection, nonceStateTimeout); err != nil { + if m.target, err = postgres.NewPostgresPersistence(ctx, tmconfig.PostgresSection, nonceStateTimeout, postgres.ForMigration); err != nil { return i18n.NewError(ctx, tmmsgs.MsgPersistenceInitFail, "postgres", err) } defer m.target.Close(ctx) diff --git a/internal/persistence/postgres/checkpoints.go b/internal/persistence/postgres/checkpoints.go index 8b3acb7a..62d4d2e6 100644 --- a/internal/persistence/postgres/checkpoints.go +++ b/internal/persistence/postgres/checkpoints.go @@ -24,7 +24,7 @@ import ( "github.com/hyperledger/firefly-transaction-manager/pkg/apitypes" ) -func (p *sqlPersistence) newCheckpointCollection() *dbsql.CrudBase[*apitypes.EventStreamCheckpoint] { +func (p *sqlPersistence) newCheckpointCollection(forMigration bool) *dbsql.CrudBase[*apitypes.EventStreamCheckpoint] { collection := &dbsql.CrudBase[*apitypes.EventStreamCheckpoint]{ DB: p.db, Table: "checkpoints", @@ -39,6 +39,7 @@ func (p *sqlPersistence) newCheckpointCollection() *dbsql.CrudBase[*apitypes.Eve "streamid": "id", }, PatchDisabled: true, + TimesDisabled: forMigration, NilValue: func() *apitypes.EventStreamCheckpoint { return nil }, NewInstance: func() *apitypes.EventStreamCheckpoint { return &apitypes.EventStreamCheckpoint{} }, GetFieldPtr: func(inst *apitypes.EventStreamCheckpoint, col string) interface{} { diff --git a/internal/persistence/postgres/confirmations.go b/internal/persistence/postgres/confirmations.go index 48894402..40966558 100644 --- a/internal/persistence/postgres/confirmations.go +++ b/internal/persistence/postgres/confirmations.go @@ -26,7 +26,7 @@ import ( "github.com/hyperledger/firefly-transaction-manager/pkg/apitypes" ) -func (p *sqlPersistence) newConfirmationsCollection() *dbsql.CrudBase[*apitypes.ConfirmationRecord] { +func (p *sqlPersistence) newConfirmationsCollection(forMigration bool) *dbsql.CrudBase[*apitypes.ConfirmationRecord] { collection := &dbsql.CrudBase[*apitypes.ConfirmationRecord]{ DB: p.db, Table: "confirmations", @@ -47,6 +47,7 @@ func (p *sqlPersistence) newConfirmationsCollection() *dbsql.CrudBase[*apitypes. "parenthash": "parent_hash", }, PatchDisabled: true, + TimesDisabled: forMigration, NilValue: func() *apitypes.ConfirmationRecord { return nil }, NewInstance: func() *apitypes.ConfirmationRecord { return &apitypes.ConfirmationRecord{ diff --git a/internal/persistence/postgres/eventstreams.go b/internal/persistence/postgres/eventstreams.go index f37b7177..a87db7e3 100644 --- a/internal/persistence/postgres/eventstreams.go +++ b/internal/persistence/postgres/eventstreams.go @@ -26,7 +26,7 @@ import ( "github.com/hyperledger/firefly-transaction-manager/pkg/apitypes" ) -func (p *sqlPersistence) newEventStreamsCollection() *dbsql.CrudBase[*apitypes.EventStream] { +func (p *sqlPersistence) newEventStreamsCollection(forMigration bool) *dbsql.CrudBase[*apitypes.EventStream] { collection := &dbsql.CrudBase[*apitypes.EventStream]{ DB: p.db, Table: "eventstreams", @@ -56,8 +56,9 @@ func (p *sqlPersistence) newEventStreamsCollection() *dbsql.CrudBase[*apitypes.E "webhook": "webhook_config", "websocket": "webhsocket_config", }, - NilValue: func() *apitypes.EventStream { return nil }, - NewInstance: func() *apitypes.EventStream { return &apitypes.EventStream{} }, + TimesDisabled: forMigration, + NilValue: func() *apitypes.EventStream { return nil }, + NewInstance: func() *apitypes.EventStream { return &apitypes.EventStream{} }, GetFieldPtr: func(inst *apitypes.EventStream, col string) interface{} { switch col { case dbsql.ColumnID: diff --git a/internal/persistence/postgres/listeners.go b/internal/persistence/postgres/listeners.go index 307fba85..d6543842 100644 --- a/internal/persistence/postgres/listeners.go +++ b/internal/persistence/postgres/listeners.go @@ -26,7 +26,7 @@ import ( "github.com/hyperledger/firefly-transaction-manager/pkg/apitypes" ) -func (p *sqlPersistence) newListenersCollection() *dbsql.CrudBase[*apitypes.Listener] { +func (p *sqlPersistence) newListenersCollection(forMigration bool) *dbsql.CrudBase[*apitypes.Listener] { collection := &dbsql.CrudBase[*apitypes.Listener]{ DB: p.db, Table: "listeners", @@ -46,8 +46,9 @@ func (p *sqlPersistence) newListenersCollection() *dbsql.CrudBase[*apitypes.List "streamid": "stream_id", "from_block": "fromblock", }, - NilValue: func() *apitypes.Listener { return nil }, - NewInstance: func() *apitypes.Listener { return &apitypes.Listener{} }, + TimesDisabled: forMigration, + NilValue: func() *apitypes.Listener { return nil }, + NewInstance: func() *apitypes.Listener { return &apitypes.Listener{} }, GetFieldPtr: func(inst *apitypes.Listener, col string) interface{} { switch col { case dbsql.ColumnID: diff --git a/internal/persistence/postgres/postgres.go b/internal/persistence/postgres/postgres.go index 460beb5a..d8ad93d4 100644 --- a/internal/persistence/postgres/postgres.go +++ b/internal/persistence/postgres/postgres.go @@ -40,13 +40,17 @@ type Postgres struct { dbsql.Database } +type CodeUsageOptions int + +const ForMigration CodeUsageOptions = iota + var psql *Postgres -func NewPostgresPersistence(bgCtx context.Context, conf config.Section, nonceStateTimeout time.Duration) (persistence.Persistence, error) { +func NewPostgresPersistence(bgCtx context.Context, conf config.Section, nonceStateTimeout time.Duration, codeOptions ...CodeUsageOptions) (persistence.Persistence, error) { if err := psql.Database.Init(bgCtx, psql, conf); err != nil { return nil, err } - return newSQLPersistence(bgCtx, &psql.Database, conf, nonceStateTimeout) + return newSQLPersistence(bgCtx, &psql.Database, conf, nonceStateTimeout, codeOptions...) } func (psql *Postgres) Name() string { diff --git a/internal/persistence/postgres/receipts.go b/internal/persistence/postgres/receipts.go index 5f98dd9d..2053a26c 100644 --- a/internal/persistence/postgres/receipts.go +++ b/internal/persistence/postgres/receipts.go @@ -24,7 +24,7 @@ import ( "github.com/hyperledger/firefly-transaction-manager/pkg/ffcapi" ) -func (p *sqlPersistence) newReceiptsCollection() *dbsql.CrudBase[*apitypes.ReceiptRecord] { +func (p *sqlPersistence) newReceiptsCollection(forMigration bool) *dbsql.CrudBase[*apitypes.ReceiptRecord] { collection := &dbsql.CrudBase[*apitypes.ReceiptRecord]{ DB: p.db, Table: "receipts", @@ -51,6 +51,7 @@ func (p *sqlPersistence) newReceiptsCollection() *dbsql.CrudBase[*apitypes.Recei "contractlocation": "contract_loc", }, PatchDisabled: true, + TimesDisabled: forMigration, NilValue: func() *apitypes.ReceiptRecord { return nil }, NewInstance: func() *apitypes.ReceiptRecord { return &apitypes.ReceiptRecord{ diff --git a/internal/persistence/postgres/sqlpersistence.go b/internal/persistence/postgres/sqlpersistence.go index 6898f270..646b9b54 100644 --- a/internal/persistence/postgres/sqlpersistence.go +++ b/internal/persistence/postgres/sqlpersistence.go @@ -68,18 +68,25 @@ func InitConfig(conf config.Section) { conf.AddKnownKey(ConfigTXWriterBatchSize, 100) } -func newSQLPersistence(bgCtx context.Context, db *dbsql.Database, conf config.Section, nonceStateTimeout time.Duration) (p *sqlPersistence, err error) { +func newSQLPersistence(bgCtx context.Context, db *dbsql.Database, conf config.Section, nonceStateTimeout time.Duration, codeOptions ...CodeUsageOptions) (p *sqlPersistence, err error) { p = &sqlPersistence{ db: db, } - p.transactions = p.newTransactionCollection() - p.checkpoints = p.newCheckpointCollection() - p.confirmations = p.newConfirmationsCollection() - p.receipts = p.newReceiptsCollection() + forMigration := false + for _, co := range codeOptions { + if co == ForMigration { + forMigration = true + } + } + + p.transactions = p.newTransactionCollection(forMigration) + p.checkpoints = p.newCheckpointCollection(forMigration) + p.confirmations = p.newConfirmationsCollection(forMigration) + p.receipts = p.newReceiptsCollection(forMigration) p.txHistory = p.newTXHistoryCollection() - p.eventStreams = p.newEventStreamsCollection() - p.listeners = p.newListenersCollection() + p.eventStreams = p.newEventStreamsCollection(forMigration) + p.listeners = p.newListenersCollection(forMigration) p.historySummaryLimit = conf.GetInt(ConfigTXWriterHistorySummaryLimit) p.nonceStateTimeout = nonceStateTimeout diff --git a/internal/persistence/postgres/sqlpersistence_test.go b/internal/persistence/postgres/sqlpersistence_test.go index eb64e3ea..48addb5f 100644 --- a/internal/persistence/postgres/sqlpersistence_test.go +++ b/internal/persistence/postgres/sqlpersistence_test.go @@ -52,6 +52,24 @@ func newMockSQLPersistence(t *testing.T, init ...func(dbconf config.Section)) (c } +func TestNewSQLPersistenceForMigration(t *testing.T) { + + db, _ := dbsql.NewMockProvider().UTInit() + + config.RootConfigReset() + dbconf := config.RootSection("utdb") + InitConfig(dbconf) + + p, err := newSQLPersistence(context.Background(), &db.Database, dbconf, 1*time.Hour, ForMigration) + assert.NoError(t, err) + defer p.Close(context.Background()) + + assert.True(t, p.transactions.TimesDisabled) + assert.True(t, p.eventStreams.TimesDisabled) + assert.True(t, p.listeners.TimesDisabled) + +} + func TestNewSQLPersistenceTXWriterFail(t *testing.T) { db, _ := dbsql.NewMockProvider().UTInit() diff --git a/internal/persistence/postgres/transactions.go b/internal/persistence/postgres/transactions.go index 9df722ea..ab3d93dc 100644 --- a/internal/persistence/postgres/transactions.go +++ b/internal/persistence/postgres/transactions.go @@ -28,7 +28,7 @@ import ( "github.com/hyperledger/firefly-transaction-manager/pkg/ffcapi" ) -func (p *sqlPersistence) newTransactionCollection() *dbsql.CrudBase[*apitypes.ManagedTX] { +func (p *sqlPersistence) newTransactionCollection(forMigration bool) *dbsql.CrudBase[*apitypes.ManagedTX] { collection := &dbsql.CrudBase[*apitypes.ManagedTX]{ DB: p.db, Table: "transactions", @@ -68,6 +68,7 @@ func (p *sqlPersistence) newTransactionCollection() *dbsql.CrudBase[*apitypes.Ma "errormessage": "error_message", }, PatchDisabled: true, + TimesDisabled: forMigration, NilValue: func() *apitypes.ManagedTX { return nil }, NewInstance: func() *apitypes.ManagedTX { return &apitypes.ManagedTX{} }, GetFieldPtr: func(inst *apitypes.ManagedTX, col string) interface{} {