Skip to content

Commit

Permalink
Preserve timestamps on migration from LevelDB to Postgres
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Broadhurst <peter.broadhurst@kaleido.io>
  • Loading branch information
peterbroadhurst committed Jul 11, 2023
1 parent bdeb537 commit bc77ecb
Show file tree
Hide file tree
Showing 10 changed files with 55 additions and 20 deletions.
2 changes: 1 addition & 1 deletion internal/persistence/dbmigration/leveldb2postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func MigrateLevelDBToPostgres(ctx context.Context) (err error) {
return i18n.NewError(ctx, tmmsgs.MsgPersistenceInitFail, "leveldb", err)
}
defer m.source.Close(ctx)
if m.target, err = postgres.NewPostgresPersistence(ctx, tmconfig.PostgresSection, nonceStateTimeout); err != nil {
if m.target, err = postgres.NewPostgresPersistence(ctx, tmconfig.PostgresSection, nonceStateTimeout, postgres.ForMigration); err != nil {
return i18n.NewError(ctx, tmmsgs.MsgPersistenceInitFail, "postgres", err)
}
defer m.target.Close(ctx)
Expand Down
3 changes: 2 additions & 1 deletion internal/persistence/postgres/checkpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/hyperledger/firefly-transaction-manager/pkg/apitypes"
)

func (p *sqlPersistence) newCheckpointCollection() *dbsql.CrudBase[*apitypes.EventStreamCheckpoint] {
func (p *sqlPersistence) newCheckpointCollection(forMigration bool) *dbsql.CrudBase[*apitypes.EventStreamCheckpoint] {
collection := &dbsql.CrudBase[*apitypes.EventStreamCheckpoint]{
DB: p.db,
Table: "checkpoints",
Expand All @@ -39,6 +39,7 @@ func (p *sqlPersistence) newCheckpointCollection() *dbsql.CrudBase[*apitypes.Eve
"streamid": "id",
},
PatchDisabled: true,
TimesDisabled: forMigration,
NilValue: func() *apitypes.EventStreamCheckpoint { return nil },
NewInstance: func() *apitypes.EventStreamCheckpoint { return &apitypes.EventStreamCheckpoint{} },
GetFieldPtr: func(inst *apitypes.EventStreamCheckpoint, col string) interface{} {
Expand Down
3 changes: 2 additions & 1 deletion internal/persistence/postgres/confirmations.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/hyperledger/firefly-transaction-manager/pkg/apitypes"
)

func (p *sqlPersistence) newConfirmationsCollection() *dbsql.CrudBase[*apitypes.ConfirmationRecord] {
func (p *sqlPersistence) newConfirmationsCollection(forMigration bool) *dbsql.CrudBase[*apitypes.ConfirmationRecord] {
collection := &dbsql.CrudBase[*apitypes.ConfirmationRecord]{
DB: p.db,
Table: "confirmations",
Expand All @@ -47,6 +47,7 @@ func (p *sqlPersistence) newConfirmationsCollection() *dbsql.CrudBase[*apitypes.
"parenthash": "parent_hash",
},
PatchDisabled: true,
TimesDisabled: forMigration,
NilValue: func() *apitypes.ConfirmationRecord { return nil },
NewInstance: func() *apitypes.ConfirmationRecord {
return &apitypes.ConfirmationRecord{
Expand Down
7 changes: 4 additions & 3 deletions internal/persistence/postgres/eventstreams.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/hyperledger/firefly-transaction-manager/pkg/apitypes"
)

func (p *sqlPersistence) newEventStreamsCollection() *dbsql.CrudBase[*apitypes.EventStream] {
func (p *sqlPersistence) newEventStreamsCollection(forMigration bool) *dbsql.CrudBase[*apitypes.EventStream] {
collection := &dbsql.CrudBase[*apitypes.EventStream]{
DB: p.db,
Table: "eventstreams",
Expand Down Expand Up @@ -56,8 +56,9 @@ func (p *sqlPersistence) newEventStreamsCollection() *dbsql.CrudBase[*apitypes.E
"webhook": "webhook_config",
"websocket": "webhsocket_config",
},
NilValue: func() *apitypes.EventStream { return nil },
NewInstance: func() *apitypes.EventStream { return &apitypes.EventStream{} },
TimesDisabled: forMigration,
NilValue: func() *apitypes.EventStream { return nil },
NewInstance: func() *apitypes.EventStream { return &apitypes.EventStream{} },
GetFieldPtr: func(inst *apitypes.EventStream, col string) interface{} {
switch col {
case dbsql.ColumnID:
Expand Down
7 changes: 4 additions & 3 deletions internal/persistence/postgres/listeners.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/hyperledger/firefly-transaction-manager/pkg/apitypes"
)

func (p *sqlPersistence) newListenersCollection() *dbsql.CrudBase[*apitypes.Listener] {
func (p *sqlPersistence) newListenersCollection(forMigration bool) *dbsql.CrudBase[*apitypes.Listener] {
collection := &dbsql.CrudBase[*apitypes.Listener]{
DB: p.db,
Table: "listeners",
Expand All @@ -46,8 +46,9 @@ func (p *sqlPersistence) newListenersCollection() *dbsql.CrudBase[*apitypes.List
"streamid": "stream_id",
"from_block": "fromblock",
},
NilValue: func() *apitypes.Listener { return nil },
NewInstance: func() *apitypes.Listener { return &apitypes.Listener{} },
TimesDisabled: forMigration,
NilValue: func() *apitypes.Listener { return nil },
NewInstance: func() *apitypes.Listener { return &apitypes.Listener{} },
GetFieldPtr: func(inst *apitypes.Listener, col string) interface{} {
switch col {
case dbsql.ColumnID:
Expand Down
8 changes: 6 additions & 2 deletions internal/persistence/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,17 @@ type Postgres struct {
dbsql.Database
}

type CodeUsageOptions int

const ForMigration CodeUsageOptions = iota

var psql *Postgres

func NewPostgresPersistence(bgCtx context.Context, conf config.Section, nonceStateTimeout time.Duration) (persistence.Persistence, error) {
func NewPostgresPersistence(bgCtx context.Context, conf config.Section, nonceStateTimeout time.Duration, codeOptions ...CodeUsageOptions) (persistence.Persistence, error) {
if err := psql.Database.Init(bgCtx, psql, conf); err != nil {
return nil, err
}
return newSQLPersistence(bgCtx, &psql.Database, conf, nonceStateTimeout)
return newSQLPersistence(bgCtx, &psql.Database, conf, nonceStateTimeout, codeOptions...)
}

func (psql *Postgres) Name() string {
Expand Down
3 changes: 2 additions & 1 deletion internal/persistence/postgres/receipts.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/hyperledger/firefly-transaction-manager/pkg/ffcapi"
)

func (p *sqlPersistence) newReceiptsCollection() *dbsql.CrudBase[*apitypes.ReceiptRecord] {
func (p *sqlPersistence) newReceiptsCollection(forMigration bool) *dbsql.CrudBase[*apitypes.ReceiptRecord] {
collection := &dbsql.CrudBase[*apitypes.ReceiptRecord]{
DB: p.db,
Table: "receipts",
Expand All @@ -51,6 +51,7 @@ func (p *sqlPersistence) newReceiptsCollection() *dbsql.CrudBase[*apitypes.Recei
"contractlocation": "contract_loc",
},
PatchDisabled: true,
TimesDisabled: forMigration,
NilValue: func() *apitypes.ReceiptRecord { return nil },
NewInstance: func() *apitypes.ReceiptRecord {
return &apitypes.ReceiptRecord{
Expand Down
21 changes: 14 additions & 7 deletions internal/persistence/postgres/sqlpersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,25 @@ func InitConfig(conf config.Section) {
conf.AddKnownKey(ConfigTXWriterBatchSize, 100)
}

func newSQLPersistence(bgCtx context.Context, db *dbsql.Database, conf config.Section, nonceStateTimeout time.Duration) (p *sqlPersistence, err error) {
func newSQLPersistence(bgCtx context.Context, db *dbsql.Database, conf config.Section, nonceStateTimeout time.Duration, codeOptions ...CodeUsageOptions) (p *sqlPersistence, err error) {
p = &sqlPersistence{
db: db,
}

p.transactions = p.newTransactionCollection()
p.checkpoints = p.newCheckpointCollection()
p.confirmations = p.newConfirmationsCollection()
p.receipts = p.newReceiptsCollection()
forMigration := false
for _, co := range codeOptions {
if co == ForMigration {
forMigration = true
}
}

p.transactions = p.newTransactionCollection(forMigration)
p.checkpoints = p.newCheckpointCollection(forMigration)
p.confirmations = p.newConfirmationsCollection(forMigration)
p.receipts = p.newReceiptsCollection(forMigration)
p.txHistory = p.newTXHistoryCollection()
p.eventStreams = p.newEventStreamsCollection()
p.listeners = p.newListenersCollection()
p.eventStreams = p.newEventStreamsCollection(forMigration)
p.listeners = p.newListenersCollection(forMigration)

p.historySummaryLimit = conf.GetInt(ConfigTXWriterHistorySummaryLimit)
p.nonceStateTimeout = nonceStateTimeout
Expand Down
18 changes: 18 additions & 0 deletions internal/persistence/postgres/sqlpersistence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,24 @@ func newMockSQLPersistence(t *testing.T, init ...func(dbconf config.Section)) (c

}

func TestNewSQLPersistenceForMigration(t *testing.T) {

db, _ := dbsql.NewMockProvider().UTInit()

config.RootConfigReset()
dbconf := config.RootSection("utdb")
InitConfig(dbconf)

p, err := newSQLPersistence(context.Background(), &db.Database, dbconf, 1*time.Hour, ForMigration)
assert.NoError(t, err)
defer p.Close(context.Background())

assert.True(t, p.transactions.TimesDisabled)
assert.True(t, p.eventStreams.TimesDisabled)
assert.True(t, p.listeners.TimesDisabled)

}

func TestNewSQLPersistenceTXWriterFail(t *testing.T) {

db, _ := dbsql.NewMockProvider().UTInit()
Expand Down
3 changes: 2 additions & 1 deletion internal/persistence/postgres/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"github.com/hyperledger/firefly-transaction-manager/pkg/ffcapi"
)

func (p *sqlPersistence) newTransactionCollection() *dbsql.CrudBase[*apitypes.ManagedTX] {
func (p *sqlPersistence) newTransactionCollection(forMigration bool) *dbsql.CrudBase[*apitypes.ManagedTX] {
collection := &dbsql.CrudBase[*apitypes.ManagedTX]{
DB: p.db,
Table: "transactions",
Expand Down Expand Up @@ -68,6 +68,7 @@ func (p *sqlPersistence) newTransactionCollection() *dbsql.CrudBase[*apitypes.Ma
"errormessage": "error_message",
},
PatchDisabled: true,
TimesDisabled: forMigration,
NilValue: func() *apitypes.ManagedTX { return nil },
NewInstance: func() *apitypes.ManagedTX { return &apitypes.ManagedTX{} },
GetFieldPtr: func(inst *apitypes.ManagedTX, col string) interface{} {
Expand Down

0 comments on commit bc77ecb

Please sign in to comment.