Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fftimestamp fix, SQL DB persistence implementation #113

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions internal/persistence/dbmigration/leveldb2postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ func MigrateLevelDBToPostgres(ctx context.Context) (err error) {
return i18n.NewError(ctx, tmmsgs.MsgPersistenceInitFail, "leveldb", err)
}
defer m.source.Close(ctx)
if m.target, err = postgres.NewPostgresPersistence(ctx, tmconfig.PostgresSection, nonceStateTimeout, postgres.ForMigration); err != nil {
if m.pg, err = postgres.NewPostgresPersistence(ctx, tmconfig.PostgresSection, nonceStateTimeout, postgres.ForMigration); err != nil {
return i18n.NewError(ctx, tmmsgs.MsgPersistenceInitFail, "postgres", err)
}
defer m.target.Close(ctx)
defer m.pg.Close(ctx)

return m.run(ctx)
}
2 changes: 2 additions & 0 deletions internal/persistence/dbmigration/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/hyperledger/firefly-common/pkg/fftypes"
"github.com/hyperledger/firefly-common/pkg/log"
"github.com/hyperledger/firefly-transaction-manager/internal/persistence"
"github.com/hyperledger/firefly-transaction-manager/internal/persistence/postgres"
"github.com/hyperledger/firefly-transaction-manager/pkg/apitypes"
"github.com/hyperledger/firefly-transaction-manager/pkg/txhandler"
)
Expand All @@ -33,6 +34,7 @@ const (
type dbMigration struct {
source persistence.Persistence
target persistence.Persistence
pg postgres.SQLPersistence
}

func (m *dbMigration) run(ctx context.Context) error {
Expand Down
9 changes: 6 additions & 3 deletions internal/persistence/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/golang-migrate/migrate/v4/database/postgres"
"github.com/hyperledger/firefly-common/pkg/config"
"github.com/hyperledger/firefly-common/pkg/dbsql"
"github.com/hyperledger/firefly-transaction-manager/internal/persistence"

// Import pq driver
_ "github.com/lib/pq"
Expand All @@ -46,11 +45,15 @@ const ForMigration CodeUsageOptions = iota

var psql *Postgres

func NewPostgresPersistence(bgCtx context.Context, conf config.Section, nonceStateTimeout time.Duration, codeOptions ...CodeUsageOptions) (persistence.Persistence, error) {
func NewPostgresPersistence(bgCtx context.Context, conf config.Section, nonceStateTimeout time.Duration, codeOptions ...CodeUsageOptions) (SQLPersistence, error) {
if err := psql.Database.Init(bgCtx, psql, conf); err != nil {
return nil, err
}
return newSQLPersistence(bgCtx, &psql.Database, conf, nonceStateTimeout, codeOptions...)
p, err := newSQLPersistence(bgCtx, &psql.Database, conf, nonceStateTimeout, codeOptions...)
if err != nil {
return nil, err
}
return p, nil
}

func (psql *Postgres) Name() string {
Expand Down
6 changes: 6 additions & 0 deletions internal/persistence/postgres/sqlpersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ type sqlPersistence struct {
nonceStateTimeout time.Duration
}

type SQLPersistence interface {
RichQuery() persistence.RichQuery
seqAfterFilter(ctx context.Context, qf *ffapi.QueryFields, after *int64, limit int, dir txhandler.SortDirection, conditions ...ffapi.Filter) (filter ffapi.Filter)
Close(_ context.Context)
}

// InitConfig gets called after config reset to initialize the config structure
func InitConfig(conf config.Section) {
psql = &Postgres{}
Expand Down
10 changes: 5 additions & 5 deletions internal/persistence/postgres/transactions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,15 @@ func TestTransactionBasicValidationPSQL(t *testing.T) {
}
err = p.AddTransactionConfirmations(ctx, txID, true, confirmations...)
assert.NoError(t, err)

SubActiontime := fftypes.Now()
// A couple of transaction history entries
err = p.AddSubStatusAction(ctx, txID, apitypes.TxSubStatusReceived, apitypes.TxActionAssignNonce, fftypes.JSONAnyPtr(`{"nonce":"11111"}`), nil)
err = p.AddSubStatusAction(ctx, txID, apitypes.TxSubStatusReceived, apitypes.TxActionAssignNonce, fftypes.JSONAnyPtr(`{"nonce":"11111"}`), nil, SubActiontime)
assert.NoError(t, err)
err = p.AddSubStatusAction(ctx, txID, apitypes.TxSubStatusReceived, apitypes.TxActionSubmitTransaction, nil, fftypes.JSONAnyPtr(`"failed to submit 1"`))
err = p.AddSubStatusAction(ctx, txID, apitypes.TxSubStatusReceived, apitypes.TxActionSubmitTransaction, nil, fftypes.JSONAnyPtr(`"failed to submit 1"`), SubActiontime)
assert.NoError(t, err)
err = p.AddSubStatusAction(ctx, txID, apitypes.TxSubStatusReceived, apitypes.TxActionSubmitTransaction, nil, fftypes.JSONAnyPtr(`"failed to submit 2"`))
err = p.AddSubStatusAction(ctx, txID, apitypes.TxSubStatusReceived, apitypes.TxActionSubmitTransaction, nil, fftypes.JSONAnyPtr(`"failed to submit 2"`), SubActiontime)
assert.NoError(t, err)
err = p.AddSubStatusAction(ctx, txID, apitypes.TxSubStatusTracking, apitypes.TxActionSubmitTransaction, fftypes.JSONAnyPtr(`{"txhash":"0x12345"}`), nil)
err = p.AddSubStatusAction(ctx, txID, apitypes.TxSubStatusTracking, apitypes.TxActionSubmitTransaction, fftypes.JSONAnyPtr(`{"txhash":"0x12345"}`), nil, SubActiontime)
assert.NoError(t, err)

// Finally the update - do a comprehensive one
Expand Down
9 changes: 4 additions & 5 deletions internal/persistence/postgres/txhistory.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,25 +95,24 @@ func (p *sqlPersistence) ListTransactionHistory(ctx context.Context, txID string
return p.txHistory.GetMany(ctx, filter.Condition(filter.Builder().Eq("transaction", txID)))
}

func (p *sqlPersistence) AddSubStatusAction(ctx context.Context, txID string, subStatus apitypes.TxSubStatus, action apitypes.TxAction, info *fftypes.JSONAny, errInfo *fftypes.JSONAny) error {
func (p *sqlPersistence) AddSubStatusAction(ctx context.Context, txID string, subStatus apitypes.TxSubStatus, action apitypes.TxAction, info *fftypes.JSONAny, errInfo *fftypes.JSONAny, t *fftypes.FFTime) error {
// Dispatch to TX writer
now := fftypes.Now()
op := newTransactionOperation(txID)
op.historyRecord = &apitypes.TXHistoryRecord{
ID: fftypes.NewUUID(),
TransactionID: txID,
SubStatus: subStatus,
TxHistoryActionEntry: apitypes.TxHistoryActionEntry{
OccurrenceCount: 1,
Time: now,
LastOccurrence: now,
Time: t,
LastOccurrence: t,
Chengxuan marked this conversation as resolved.
Show resolved Hide resolved
Action: action,
LastInfo: persistence.JSONOrString(info), // guard against bad JSON
LastError: persistence.JSONOrString(errInfo), // guard against bad JSON
},
}
if errInfo != nil {
op.historyRecord.LastErrorTime = fftypes.Now()
op.historyRecord.LastErrorTime = t
}
p.writer.queue(ctx, op)
return nil // completely async
Expand Down
23 changes: 12 additions & 11 deletions internal/persistence/postgres/txhistory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,38 +57,39 @@ func TestTXHistoryCompressionPSQL(t *testing.T) {
assert.NotEmpty(t, tx.SequenceID)

// Add a bunch of simulated status entries, which are good for compression
SubActiontime := fftypes.Now()
err = p.AddSubStatusAction(ctx, txID, apitypes.TxSubStatusReceived, apitypes.TxActionAssignNonce,
fftypes.JSONAnyPtr(`{"nonce":"11111"}`), nil)
fftypes.JSONAnyPtr(`{"nonce":"11111"}`), nil, SubActiontime)
assert.NoError(t, err)
err = p.AddSubStatusAction(ctx, txID, apitypes.TxSubStatusReceived, apitypes.TxActionSubmitTransaction,
nil, fftypes.JSONAnyPtr(`"failed to submit 111"`))
nil, fftypes.JSONAnyPtr(`"failed to submit 111"`), SubActiontime)
assert.NoError(t, err)
err = p.AddSubStatusAction(ctx, txID, apitypes.TxSubStatusReceived, apitypes.TxActionSubmitTransaction,
nil, fftypes.JSONAnyPtr(`"failed to submit 222"`))
nil, fftypes.JSONAnyPtr(`"failed to submit 222"`), SubActiontime)
assert.NoError(t, err)
err = p.AddSubStatusAction(ctx, txID, apitypes.TxSubStatusReceived, apitypes.TxActionSubmitTransaction,
nil, fftypes.JSONAnyPtr(`"failed to submit 333"`))
nil, fftypes.JSONAnyPtr(`"failed to submit 333"`), SubActiontime)
assert.NoError(t, err)
err = p.AddSubStatusAction(ctx, txID, apitypes.TxSubStatusTracking, apitypes.TxActionSubmitTransaction,
fftypes.JSONAnyPtr(`{"transactionHash":"0x12345"}`), nil)
fftypes.JSONAnyPtr(`{"transactionHash":"0x12345"}`), nil, SubActiontime)
assert.NoError(t, err)
err = p.AddSubStatusAction(ctx, txID, apitypes.TxSubStatusTracking, apitypes.TxActionReceiveReceipt,
fftypes.JSONAnyPtr(`{"transactionHash":"0x111111","blockNumber":"11111"}`), nil)
fftypes.JSONAnyPtr(`{"transactionHash":"0x111111","blockNumber":"11111"}`), nil, SubActiontime)
assert.NoError(t, err)
err = p.AddSubStatusAction(ctx, txID, apitypes.TxSubStatusTracking, apitypes.TxActionReceiveReceipt,
fftypes.JSONAnyPtr(`{"transactionHash":"0x222222","blockNumber":"22222"}`), nil)
fftypes.JSONAnyPtr(`{"transactionHash":"0x222222","blockNumber":"22222"}`), nil, SubActiontime)
assert.NoError(t, err)
err = p.AddSubStatusAction(ctx, txID, apitypes.TxSubStatusTracking, apitypes.TxActionReceiveReceipt,
fftypes.JSONAnyPtr(`{"transactionHash":"0x333333","blockNumber":"33333"}`), nil)
fftypes.JSONAnyPtr(`{"transactionHash":"0x333333","blockNumber":"33333"}`), nil, SubActiontime)
assert.NoError(t, err)
err = p.AddSubStatusAction(ctx, txID, apitypes.TxSubStatusTracking, apitypes.TxActionTimeout,
nil, fftypes.JSONAnyPtr(`"some error 444"`))
nil, fftypes.JSONAnyPtr(`"some error 444"`), SubActiontime)
assert.NoError(t, err)
err = p.AddSubStatusAction(ctx, txID, apitypes.TxSubStatusTracking, apitypes.TxActionTimeout,
nil, fftypes.JSONAnyPtr(`"some error 555"`))
nil, fftypes.JSONAnyPtr(`"some error 555"`), SubActiontime)
assert.NoError(t, err)
err = p.AddSubStatusAction(ctx, txID, apitypes.TxSubStatusConfirmed, apitypes.TxActionConfirmTransaction,
fftypes.JSONAnyPtr(`{"confirmed":true}`), nil)
fftypes.JSONAnyPtr(`{"confirmed":true}`), nil, SubActiontime)
assert.NoError(t, err)

// Flush with a TX update
Expand Down
3 changes: 2 additions & 1 deletion pkg/fftm/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type manager struct {
metricsServerDone chan error
metricsEnabled bool
metricsManager metrics.Metrics
sqlPersistence postgres.SQLPersistence
}

func InitConfig() {
Expand Down Expand Up @@ -150,7 +151,7 @@ func (m *manager) initPersistence(ctx context.Context) (err error) {
return i18n.NewError(ctx, tmmsgs.MsgPersistenceInitFail, pType, err)
}
case "postgres":
if m.persistence, err = postgres.NewPostgresPersistence(ctx, tmconfig.PostgresSection, nonceStateTimeout); err != nil {
if m.sqlPersistence, err = postgres.NewPostgresPersistence(ctx, tmconfig.PostgresSection, nonceStateTimeout); err != nil {
return i18n.NewError(ctx, tmmsgs.MsgPersistenceInitFail, pType, err)
}
if !config.GetBool(tmconfig.APISimpleQuery) {
Expand Down
Loading