Skip to content

Commit

Permalink
Merge pull request #97 from hyperledger/migration-timestamps
Browse files Browse the repository at this point in the history
Preserve timestamps on migration from LevelDB to Postgres
  • Loading branch information
peterbroadhurst committed Jul 14, 2023
2 parents bdeb537 + 6f22a7c commit fe5ace8
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 16 deletions.
2 changes: 1 addition & 1 deletion internal/persistence/dbmigration/leveldb2postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func MigrateLevelDBToPostgres(ctx context.Context) (err error) {
return i18n.NewError(ctx, tmmsgs.MsgPersistenceInitFail, "leveldb", err)
}
defer m.source.Close(ctx)
if m.target, err = postgres.NewPostgresPersistence(ctx, tmconfig.PostgresSection, nonceStateTimeout); err != nil {
if m.target, err = postgres.NewPostgresPersistence(ctx, tmconfig.PostgresSection, nonceStateTimeout, postgres.ForMigration); err != nil {
return i18n.NewError(ctx, tmmsgs.MsgPersistenceInitFail, "postgres", err)
}
defer m.target.Close(ctx)
Expand Down
25 changes: 25 additions & 0 deletions internal/persistence/dbmigration/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ func (m *dbMigration) migrateEventStream(ctx context.Context, es *apitypes.Event
return err
}
if existingES == nil {
if es.Created == nil {
es.Created = fftypes.Now()
}
if es.Updated == nil {
es.Updated = es.Created
}
log.L(ctx).Infof("Writing event stream %s to target", es.ID)
if err := m.target.WriteStream(ctx, es); err != nil {
return err
Expand All @@ -96,6 +102,13 @@ func (m *dbMigration) migrateEventStream(ctx context.Context, es *apitypes.Event
return err
}
if cp != nil && existingCP == nil {
// LevelDB didn't have timestamps in checkpoints
if cp.FirstCheckpoint == nil {
cp.FirstCheckpoint = fftypes.Now()
}
if cp.Time == nil {
cp.Time = cp.FirstCheckpoint
}
log.L(ctx).Infof("Writing checkpoint %s to target", cp.StreamID)
if err := m.target.WriteCheckpoint(ctx, cp); err != nil {
return err
Expand Down Expand Up @@ -138,6 +151,12 @@ func (m *dbMigration) migrateListener(ctx context.Context, l *apitypes.Listener)
}
if existingL == nil {
log.L(ctx).Infof("Writing listener %s to target", l.ID)
if l.Created == nil {
l.Created = fftypes.Now()
}
if l.Updated == nil {
l.Updated = l.Created
}
if err := m.target.WriteListener(ctx, l); err != nil {
return err
}
Expand Down Expand Up @@ -185,6 +204,12 @@ func (m *dbMigration) migrateTransaction(ctx context.Context, mtx *apitypes.Mana
}
if existingTX == nil {
log.L(ctx).Infof("Writing transaction %s to target", tx.ID)
if tx.Created == nil {
tx.Created = fftypes.Now()
}
if tx.Updated == nil {
tx.Updated = tx.Created
}
if err := m.target.InsertTransactionPreAssignedNonce(ctx, tx.ManagedTX); err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion internal/persistence/postgres/checkpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/hyperledger/firefly-transaction-manager/pkg/apitypes"
)

func (p *sqlPersistence) newCheckpointCollection() *dbsql.CrudBase[*apitypes.EventStreamCheckpoint] {
func (p *sqlPersistence) newCheckpointCollection(forMigration bool) *dbsql.CrudBase[*apitypes.EventStreamCheckpoint] {
collection := &dbsql.CrudBase[*apitypes.EventStreamCheckpoint]{
DB: p.db,
Table: "checkpoints",
Expand All @@ -39,6 +39,7 @@ func (p *sqlPersistence) newCheckpointCollection() *dbsql.CrudBase[*apitypes.Eve
"streamid": "id",
},
PatchDisabled: true,
TimesDisabled: forMigration,
NilValue: func() *apitypes.EventStreamCheckpoint { return nil },
NewInstance: func() *apitypes.EventStreamCheckpoint { return &apitypes.EventStreamCheckpoint{} },
GetFieldPtr: func(inst *apitypes.EventStreamCheckpoint, col string) interface{} {
Expand Down
7 changes: 4 additions & 3 deletions internal/persistence/postgres/eventstreams.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/hyperledger/firefly-transaction-manager/pkg/apitypes"
)

func (p *sqlPersistence) newEventStreamsCollection() *dbsql.CrudBase[*apitypes.EventStream] {
func (p *sqlPersistence) newEventStreamsCollection(forMigration bool) *dbsql.CrudBase[*apitypes.EventStream] {
collection := &dbsql.CrudBase[*apitypes.EventStream]{
DB: p.db,
Table: "eventstreams",
Expand Down Expand Up @@ -56,8 +56,9 @@ func (p *sqlPersistence) newEventStreamsCollection() *dbsql.CrudBase[*apitypes.E
"webhook": "webhook_config",
"websocket": "webhsocket_config",
},
NilValue: func() *apitypes.EventStream { return nil },
NewInstance: func() *apitypes.EventStream { return &apitypes.EventStream{} },
TimesDisabled: forMigration,
NilValue: func() *apitypes.EventStream { return nil },
NewInstance: func() *apitypes.EventStream { return &apitypes.EventStream{} },
GetFieldPtr: func(inst *apitypes.EventStream, col string) interface{} {
switch col {
case dbsql.ColumnID:
Expand Down
7 changes: 4 additions & 3 deletions internal/persistence/postgres/listeners.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/hyperledger/firefly-transaction-manager/pkg/apitypes"
)

func (p *sqlPersistence) newListenersCollection() *dbsql.CrudBase[*apitypes.Listener] {
func (p *sqlPersistence) newListenersCollection(forMigration bool) *dbsql.CrudBase[*apitypes.Listener] {
collection := &dbsql.CrudBase[*apitypes.Listener]{
DB: p.db,
Table: "listeners",
Expand All @@ -46,8 +46,9 @@ func (p *sqlPersistence) newListenersCollection() *dbsql.CrudBase[*apitypes.List
"streamid": "stream_id",
"from_block": "fromblock",
},
NilValue: func() *apitypes.Listener { return nil },
NewInstance: func() *apitypes.Listener { return &apitypes.Listener{} },
TimesDisabled: forMigration,
NilValue: func() *apitypes.Listener { return nil },
NewInstance: func() *apitypes.Listener { return &apitypes.Listener{} },
GetFieldPtr: func(inst *apitypes.Listener, col string) interface{} {
switch col {
case dbsql.ColumnID:
Expand Down
8 changes: 6 additions & 2 deletions internal/persistence/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,17 @@ type Postgres struct {
dbsql.Database
}

type CodeUsageOptions int

const ForMigration CodeUsageOptions = iota

var psql *Postgres

func NewPostgresPersistence(bgCtx context.Context, conf config.Section, nonceStateTimeout time.Duration) (persistence.Persistence, error) {
func NewPostgresPersistence(bgCtx context.Context, conf config.Section, nonceStateTimeout time.Duration, codeOptions ...CodeUsageOptions) (persistence.Persistence, error) {
if err := psql.Database.Init(bgCtx, psql, conf); err != nil {
return nil, err
}
return newSQLPersistence(bgCtx, &psql.Database, conf, nonceStateTimeout)
return newSQLPersistence(bgCtx, &psql.Database, conf, nonceStateTimeout, codeOptions...)
}

func (psql *Postgres) Name() string {
Expand Down
17 changes: 12 additions & 5 deletions internal/persistence/postgres/sqlpersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,25 @@ func InitConfig(conf config.Section) {
conf.AddKnownKey(ConfigTXWriterBatchSize, 100)
}

func newSQLPersistence(bgCtx context.Context, db *dbsql.Database, conf config.Section, nonceStateTimeout time.Duration) (p *sqlPersistence, err error) {
func newSQLPersistence(bgCtx context.Context, db *dbsql.Database, conf config.Section, nonceStateTimeout time.Duration, codeOptions ...CodeUsageOptions) (p *sqlPersistence, err error) {
p = &sqlPersistence{
db: db,
}

p.transactions = p.newTransactionCollection()
p.checkpoints = p.newCheckpointCollection()
forMigration := false
for _, co := range codeOptions {
if co == ForMigration {
forMigration = true
}
}

p.eventStreams = p.newEventStreamsCollection(forMigration)
p.checkpoints = p.newCheckpointCollection(forMigration)
p.listeners = p.newListenersCollection(forMigration)
p.transactions = p.newTransactionCollection(forMigration)
p.confirmations = p.newConfirmationsCollection()
p.receipts = p.newReceiptsCollection()
p.txHistory = p.newTXHistoryCollection()
p.eventStreams = p.newEventStreamsCollection()
p.listeners = p.newListenersCollection()

p.historySummaryLimit = conf.GetInt(ConfigTXWriterHistorySummaryLimit)
p.nonceStateTimeout = nonceStateTimeout
Expand Down
18 changes: 18 additions & 0 deletions internal/persistence/postgres/sqlpersistence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,24 @@ func newMockSQLPersistence(t *testing.T, init ...func(dbconf config.Section)) (c

}

func TestNewSQLPersistenceForMigration(t *testing.T) {

db, _ := dbsql.NewMockProvider().UTInit()

config.RootConfigReset()
dbconf := config.RootSection("utdb")
InitConfig(dbconf)

p, err := newSQLPersistence(context.Background(), &db.Database, dbconf, 1*time.Hour, ForMigration)
assert.NoError(t, err)
defer p.Close(context.Background())

assert.True(t, p.transactions.TimesDisabled)
assert.True(t, p.eventStreams.TimesDisabled)
assert.True(t, p.listeners.TimesDisabled)

}

func TestNewSQLPersistenceTXWriterFail(t *testing.T) {

db, _ := dbsql.NewMockProvider().UTInit()
Expand Down
3 changes: 2 additions & 1 deletion internal/persistence/postgres/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"github.com/hyperledger/firefly-transaction-manager/pkg/ffcapi"
)

func (p *sqlPersistence) newTransactionCollection() *dbsql.CrudBase[*apitypes.ManagedTX] {
func (p *sqlPersistence) newTransactionCollection(forMigration bool) *dbsql.CrudBase[*apitypes.ManagedTX] {
collection := &dbsql.CrudBase[*apitypes.ManagedTX]{
DB: p.db,
Table: "transactions",
Expand Down Expand Up @@ -68,6 +68,7 @@ func (p *sqlPersistence) newTransactionCollection() *dbsql.CrudBase[*apitypes.Ma
"errormessage": "error_message",
},
PatchDisabled: true,
TimesDisabled: forMigration,
NilValue: func() *apitypes.ManagedTX { return nil },
NewInstance: func() *apitypes.ManagedTX { return &apitypes.ManagedTX{} },
GetFieldPtr: func(inst *apitypes.ManagedTX, col string) interface{} {
Expand Down

0 comments on commit fe5ace8

Please sign in to comment.