Skip to content

Commit

Permalink
Merge pull request #89 from hyperledger/psql-2
Browse files Browse the repository at this point in the history
Add suspend/resume transaction actions, and tweaks to history API
  • Loading branch information
nguyer committed Jul 3, 2023
2 parents ec94e7b + 13744db commit 7b4c815
Show file tree
Hide file tree
Showing 29 changed files with 797 additions and 173 deletions.
2 changes: 1 addition & 1 deletion config.md
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@
|batchTimeout|Duration to hold batch open for new transaction operations before flushing to the DB|[`time.Duration`](https://pkg.go.dev/time#Duration)|`10ms`
|cacheSlots|Number of transactions to hold cached metadata for to avoid DB read operations to calculate history|`int`|`1000`
|count|Number of transactions writing routines to start|`int`|`5`
|historyCompactionInterval|Duration between cleanup activities on the DB for a transaction with a large history|[`time.Duration`](https://pkg.go.dev/time#Duration)|`5m`
|historyCompactionInterval|Duration between cleanup activities on the DB for a transaction with a large history|[`time.Duration`](https://pkg.go.dev/time#Duration)|`0`
|historySummaryLimit|Maximum number of action entries to return embedded in the JSON response object when querying a transaction summary|`int`|`50`

## policyengine
Expand Down
23 changes: 13 additions & 10 deletions internal/persistence/leveldb/leveldb_persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ func (p *leveldbPersistence) ListTransactionsPending(ctx context.Context, afterS
func (p *leveldbPersistence) GetTransactionByID(ctx context.Context, txID string) (tx *apitypes.ManagedTX, err error) {
p.txMux.RLock()
defer p.txMux.RUnlock()
txh, err := p.GetTransactionByIDWithStatus(ctx, txID)
txh, err := p.GetTransactionByIDWithStatus(ctx, txID, false)
if err != nil || txh == nil {
return nil, err
}
Expand All @@ -408,7 +408,7 @@ func (p *leveldbPersistence) GetTransactionByID(ctx context.Context, txID string
func (p *leveldbPersistence) GetTransactionReceipt(ctx context.Context, txID string) (receipt *ffcapi.TransactionReceiptResponse, err error) {
p.txMux.RLock()
defer p.txMux.RUnlock()
txh, err := p.GetTransactionByIDWithStatus(ctx, txID)
txh, err := p.GetTransactionByIDWithStatus(ctx, txID, false)
if err != nil || txh == nil {
return nil, err
}
Expand All @@ -418,7 +418,7 @@ func (p *leveldbPersistence) GetTransactionReceipt(ctx context.Context, txID str
func (p *leveldbPersistence) GetTransactionConfirmations(ctx context.Context, txID string) (confirmations []*apitypes.Confirmation, err error) {
p.txMux.RLock()
defer p.txMux.RUnlock()
txh, err := p.GetTransactionByIDWithStatus(ctx, txID)
txh, err := p.GetTransactionByIDWithStatus(ctx, txID, false)
if err != nil || txh == nil {
return nil, err
}
Expand Down Expand Up @@ -448,12 +448,15 @@ func migrateTX(tx *apitypes.ManagedTX) {
tx.DeprecatedTransactionHeaders = &tx.TransactionHeaders
}

func (p *leveldbPersistence) GetTransactionByIDWithStatus(ctx context.Context, txID string) (tx *apitypes.TXWithStatus, err error) {
func (p *leveldbPersistence) GetTransactionByIDWithStatus(ctx context.Context, txID string, history bool) (tx *apitypes.TXWithStatus, err error) {
p.txMux.RLock()
defer p.txMux.RUnlock()
err = p.readJSON(ctx, txDataKey(txID), &tx)
if tx != nil {
migrateTX(tx.ManagedTX)
if !history {
tx.History = nil
}
}
return tx, err
}
Expand Down Expand Up @@ -497,7 +500,7 @@ func (p *leveldbPersistence) InsertTransactionWithNextNonce(ctx context.Context,
}

func (p *leveldbPersistence) getPersistedTX(ctx context.Context, txID string) (tx *apitypes.TXWithStatus, err error) {
tx, err = p.GetTransactionByIDWithStatus(ctx, txID)
tx, err = p.GetTransactionByIDWithStatus(ctx, txID, true)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -718,7 +721,7 @@ func (p *leveldbPersistence) AddSubStatusAction(ctx context.Context, txID string
for _, entry := range currentSubStatus.Actions {
if entry.Action == action {
alreadyRecordedAction = true
entry.Count++
entry.OccurrenceCount++
entry.LastOccurrence = fftypes.Now()

if errInfo != nil {
Expand All @@ -736,10 +739,10 @@ func (p *leveldbPersistence) AddSubStatusAction(ctx context.Context, txID string
if !alreadyRecordedAction {
// If this is an entirely new action for this status entry, add it to the list
newAction := &apitypes.TxHistoryActionEntry{
Time: fftypes.Now(),
Action: action,
LastOccurrence: fftypes.Now(),
Count: 1,
Time: fftypes.Now(),
Action: action,
LastOccurrence: fftypes.Now(),
OccurrenceCount: 1,
}

if errInfo != nil {
Expand Down
51 changes: 28 additions & 23 deletions internal/persistence/leveldb/leveldb_persistence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -840,7 +840,7 @@ func TestManagedTXSubStatus(t *testing.T) {
assert.NoError(t, err)
}

txh, err := p.GetTransactionByIDWithStatus(ctx, mtx.ID)
txh, err := p.GetTransactionByIDWithStatus(ctx, mtx.ID, true)
assert.NoError(t, err)
assert.Equal(t, 1, len(txh.History))
assert.Equal(t, "Received", string(txh.History[0].Status))
Expand All @@ -850,7 +850,7 @@ func TestManagedTXSubStatus(t *testing.T) {
err = p.AddSubStatusAction(ctx, mtx.ID, apitypes.TxSubStatusTracking, apitypes.TxActionAssignNonce, nil, nil)
assert.NoError(t, err)

txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID)
txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID, true)
assert.NoError(t, err)
assert.Equal(t, 2, len(txh.History))

Expand All @@ -863,7 +863,7 @@ func TestManagedTXSubStatus(t *testing.T) {
assert.NoError(t, err)
}

txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID)
txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID, true)
assert.NoError(t, err)
assert.Equal(t, 50, len(txh.History))

Expand All @@ -879,22 +879,22 @@ func TestManagedTXSubStatusRepeat(t *testing.T) {
// Add a sub-status
err = p.AddSubStatusAction(ctx, mtx.ID, apitypes.TxSubStatusReceived, apitypes.TxActionAssignNonce, nil, nil)
assert.NoError(t, err)
txh, err := p.GetTransactionByIDWithStatus(ctx, mtx.ID)
txh, err := p.GetTransactionByIDWithStatus(ctx, mtx.ID, true)
assert.NoError(t, err)
assert.Equal(t, 1, len(txh.History))
assert.Equal(t, 2, len(txh.DeprecatedHistorySummary))

// Add another sub-status
err = p.AddSubStatusAction(ctx, mtx.ID, apitypes.TxSubStatusTracking, apitypes.TxActionSubmitTransaction, nil, nil)
assert.NoError(t, err)
txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID)
txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID, true)
assert.Equal(t, 2, len(txh.History))
assert.Equal(t, 4, len(txh.DeprecatedHistorySummary))

// Add another that we've seen before
err = p.AddSubStatusAction(ctx, mtx.ID, apitypes.TxSubStatusReceived, apitypes.TxActionSubmitTransaction, nil, nil)
assert.NoError(t, err)
txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID)
txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID, true)
assert.Equal(t, 3, len(txh.History)) // This goes up
assert.Equal(t, 4, len(txh.DeprecatedHistorySummary)) // This doesn't
}
Expand All @@ -913,32 +913,32 @@ func TestManagedTXSubStatusAction(t *testing.T) {
// Add an action
err = p.AddSubStatusAction(ctx, mtx.ID, apitypes.TxSubStatusReceived, apitypes.TxActionAssignNonce, nil, nil)
assert.NoError(t, err)
txh, err := p.GetTransactionByIDWithStatus(ctx, mtx.ID)
txh, err := p.GetTransactionByIDWithStatus(ctx, mtx.ID, true)
assert.NoError(t, err)
assert.Equal(t, 1, len(txh.History[0].Actions))
assert.Nil(t, txh.History[0].Actions[0].LastErrorTime)

// Add another action
err = p.AddSubStatusAction(ctx, mtx.ID, apitypes.TxSubStatusReceived, apitypes.TxActionRetrieveGasPrice, nil, fftypes.JSONAnyPtr(`{"gasError":"Acme Gas Oracle RC=12345"}`))
assert.NoError(t, err)
txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID)
txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID, true)
assert.NoError(t, err)
assert.Equal(t, 2, len(txh.History[0].Actions))
assert.Equal(t, (*txh.History[0].Actions[1].LastError).String(), `{"gasError":"Acme Gas Oracle RC=12345"}`)

// Add the same action which should cause the previous one to inc its counter
err = p.AddSubStatusAction(ctx, mtx.ID, apitypes.TxSubStatusReceived, apitypes.TxActionRetrieveGasPrice, fftypes.JSONAnyPtr(`{"info":"helloworld"}`), fftypes.JSONAnyPtr(`{"error":"nogood"}`))
assert.NoError(t, err)
txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID)
txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID, true)
assert.NoError(t, err)
assert.Equal(t, 2, len(txh.History[0].Actions))
assert.Equal(t, txh.History[0].Actions[1].Action, apitypes.TxActionRetrieveGasPrice)
assert.Equal(t, 2, txh.History[0].Actions[1].Count)
assert.Equal(t, 2, txh.History[0].Actions[1].OccurrenceCount)

// Add the same action but with new error information should update the last error field
err = p.AddSubStatusAction(ctx, mtx.ID, apitypes.TxSubStatusReceived, apitypes.TxActionRetrieveGasPrice, nil, fftypes.JSONAnyPtr(`{"gasError":"Acme Gas Oracle RC=67890"}`))
assert.NoError(t, err)
txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID)
txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID, true)
assert.NoError(t, err)
assert.Equal(t, 2, len(txh.History[0].Actions))
assert.NotNil(t, txh.History[0].Actions[1].LastErrorTime)
Expand All @@ -948,41 +948,41 @@ func TestManagedTXSubStatusAction(t *testing.T) {
reason := "known_transaction"
err = p.AddSubStatusAction(ctx, mtx.ID, apitypes.TxSubStatusReceived, apitypes.TxActionSubmitTransaction, fftypes.JSONAnyPtr(`{"reason":"`+reason+`"}`), nil)
assert.NoError(t, err)
txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID)
txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID, true)
assert.NoError(t, err)
assert.Equal(t, 3, len(txh.History[0].Actions))
assert.Equal(t, txh.History[0].Actions[2].Action, apitypes.TxActionSubmitTransaction)
assert.Equal(t, 1, txh.History[0].Actions[2].Count)
assert.Equal(t, 1, txh.History[0].Actions[2].OccurrenceCount)
assert.Nil(t, txh.History[0].Actions[2].LastErrorTime)

// Add one more type of action

receiptId := "123456"
err = p.AddSubStatusAction(ctx, mtx.ID, apitypes.TxSubStatusReceived, apitypes.TxActionReceiveReceipt, fftypes.JSONAnyPtr(`{"receiptId":"`+receiptId+`"}`), nil)
assert.NoError(t, err)
txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID)
txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID, true)
assert.NoError(t, err)
assert.Equal(t, 4, len(txh.History[0].Actions))
assert.Equal(t, txh.History[0].Actions[3].Action, apitypes.TxActionReceiveReceipt)
assert.Equal(t, 1, txh.History[0].Actions[3].Count)
assert.Equal(t, 1, txh.History[0].Actions[3].OccurrenceCount)
assert.Nil(t, txh.History[0].Actions[3].LastErrorTime)

// History is the complete list of unique sub-status types and actions
txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID)
txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID, true)
assert.NoError(t, err)
assert.Equal(t, 5, len(txh.DeprecatedHistorySummary))

// Add some new sub-status and actions to check max lengths are correct
// Seen one of these before - should increase summary length by 1
err = p.AddSubStatusAction(ctx, mtx.ID, apitypes.TxSubStatusConfirmed, apitypes.TxActionReceiveReceipt, fftypes.JSONAnyPtr(`{"receiptId":"`+receiptId+`"}`), nil)
assert.NoError(t, err)
txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID)
txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID, true)
assert.Equal(t, 6, len(txh.DeprecatedHistorySummary))

// Seen both of these before - no change expected
err = p.AddSubStatusAction(ctx, mtx.ID, apitypes.TxSubStatusReceived, apitypes.TxActionAssignNonce, nil, nil)
assert.NoError(t, err)
txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID)
txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID, true)
assert.NoError(t, err)
assert.Equal(t, 6, len(txh.DeprecatedHistorySummary))

Expand Down Expand Up @@ -1074,7 +1074,7 @@ func TestManagedTXSubStatusInvalidJSON(t *testing.T) {
// Add a new type of action
err = p.AddSubStatusAction(ctx, mtx.ID, apitypes.TxSubStatusReceived, apitypes.TxActionSubmitTransaction, fftypes.JSONAnyPtr(`{"reason":"`+reason+`"}`), nil)
assert.NoError(t, err)
txh, err := p.GetTransactionByIDWithStatus(ctx, mtx.ID)
txh, err := p.GetTransactionByIDWithStatus(ctx, mtx.ID, true)
assert.NoError(t, err)
val, err := json.Marshal(txh.History[0].Actions[0].LastInfo)

Expand All @@ -1101,7 +1101,7 @@ func TestManagedTXSubStatusMaxEntries(t *testing.T) {
assert.NoError(t, err)
}

txh, err := p.GetTransactionByIDWithStatus(ctx, mtx.ID)
txh, err := p.GetTransactionByIDWithStatus(ctx, mtx.ID, true)
assert.NoError(t, err)
assert.Equal(t, 50, len(txh.History))

Expand All @@ -1119,7 +1119,7 @@ func TestMaxHistoryCountSetToZero(t *testing.T) {
err = p.AddSubStatusAction(ctx, mtx.ID, apitypes.TxSubStatusReceived, apitypes.TxActionSubmitTransaction, nil, nil)
assert.NoError(t, err)

txh, err := p.GetTransactionByIDWithStatus(ctx, mtx.ID)
txh, err := p.GetTransactionByIDWithStatus(ctx, mtx.ID, true)
assert.NoError(t, err)
assert.Equal(t, 0, len(txh.History))
assert.Equal(t, 0, len(txh.DeprecatedHistorySummary))
Expand All @@ -1134,14 +1134,19 @@ func TestAddReceivedStatusWhenNothingSet(t *testing.T) {
err := p.InsertTransactionWithNextNonce(ctx, mtx, func(ctx context.Context, signer string) (uint64, error) { return 12345, nil })
assert.NoError(t, err)

txh, err := p.GetTransactionByIDWithStatus(ctx, mtx.ID)
txh, err := p.GetTransactionByIDWithStatus(ctx, mtx.ID, true)
assert.NoError(t, err)
assert.NotNil(t, 0, txh.History)
assert.Equal(t, 0, len(txh.History))

txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID, false)
assert.NoError(t, err)
assert.Nil(t, txh.History)

err = p.AddSubStatusAction(ctx, mtx.ID, apitypes.TxSubStatusReceived, apitypes.TxActionSubmitTransaction, nil, nil)
assert.NoError(t, err)

txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID)
txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID, true)
assert.NoError(t, err)
assert.Equal(t, 1, len(txh.History))
assert.Equal(t, 1, len(txh.History[0].Actions))
Expand Down
4 changes: 2 additions & 2 deletions internal/persistence/persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ var TXHistoryFilters = &ffapi.QueryFields{
"lastoccurrence": &ffapi.TimeField{},
"substatus": &ffapi.StringField{},
"action": &ffapi.StringField{},
"count": &ffapi.Int64Field{},
"occurrences": &ffapi.Int64Field{},
"lasterror": &ffapi.JSONField{},
"lasterrortime": &ffapi.TimeField{},
"lastinfo": &ffapi.JSONField{},
Expand Down Expand Up @@ -173,7 +173,7 @@ type TransactionPersistence interface {
ListTransactionsByNonce(ctx context.Context, signer string, after *fftypes.FFBigInt, limit int, dir SortDirection) ([]*apitypes.ManagedTX, error) // reverse nonce order within signer
ListTransactionsPending(ctx context.Context, afterSequenceID string, limit int, dir SortDirection) ([]*apitypes.ManagedTX, error) // reverse insertion order, only those in pending state
GetTransactionByID(ctx context.Context, txID string) (*apitypes.ManagedTX, error)
GetTransactionByIDWithStatus(ctx context.Context, txID string) (*apitypes.TXWithStatus, error)
GetTransactionByIDWithStatus(ctx context.Context, txID string, history bool) (*apitypes.TXWithStatus, error)
GetTransactionByNonce(ctx context.Context, signer string, nonce *fftypes.FFBigInt) (*apitypes.ManagedTX, error)
InsertTransactionWithNextNonce(ctx context.Context, tx *apitypes.ManagedTX, lookupNextNonce NextNonceCallback) error
UpdateTransaction(ctx context.Context, txID string, updates *apitypes.TXUpdates) error
Expand Down
2 changes: 1 addition & 1 deletion internal/persistence/postgres/sqlpersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func InitConfig(conf config.Section) {

conf.AddKnownKey(ConfigTXWriterCacheSlots, 1000)
conf.AddKnownKey(ConfigTXWriterHistorySummaryLimit, 50) // returned on TX status
conf.AddKnownKey(ConfigTXWriterHistoryCompactionInterval, "5m")
conf.AddKnownKey(ConfigTXWriterHistoryCompactionInterval, "0" /* disabled by default */)
conf.AddKnownKey(ConfigTXWriterCount, 5)
conf.AddKnownKey(ConfigTXWriterBatchTimeout, "10ms")
conf.AddKnownKey(ConfigTXWriterBatchSize, 100)
Expand Down
5 changes: 4 additions & 1 deletion internal/persistence/postgres/sqlpersistence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,17 @@ import (
"github.com/stretchr/testify/assert"
)

func newMockSQLPersistence(t *testing.T) (context.Context, *sqlPersistence, sqlmock.Sqlmock, func()) {
func newMockSQLPersistence(t *testing.T, init ...func(dbconf config.Section)) (context.Context, *sqlPersistence, sqlmock.Sqlmock, func()) {

ctx, cancelCtx := context.WithCancel(context.Background())
db, dbm := dbsql.NewMockProvider().UTInit()

config.RootConfigReset()
dbconf := config.RootSection("utdb")
InitConfig(dbconf)
for _, fn := range init {
fn(dbconf)
}

p, err := newSQLPersistence(ctx, &db.Database, dbconf, 1*time.Hour)
assert.NoError(t, err)
Expand Down
10 changes: 6 additions & 4 deletions internal/persistence/postgres/transaction_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,10 +431,12 @@ func (tw *transactionWriter) executeBatchOps(ctx context.Context, b *transaction
}
}
// Do the compression checks
for txID := range b.compressionChecks {
if err := tw.compressionCheck(ctx, txID); err != nil {
log.L(ctx).Errorf("Compression check for %s failed: %s", txID, err)
return err
if tw.compressionInterval > 0 {
for txID := range b.compressionChecks {
if err := tw.compressionCheck(ctx, txID); err != nil {
log.L(ctx).Errorf("Compression check for %s failed: %s", txID, err)
return err
}
}
}
// Do all the transaction deletes
Expand Down
19 changes: 11 additions & 8 deletions internal/persistence/postgres/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (p *sqlPersistence) GetTransactionByID(ctx context.Context, txID string) (*
return p.transactions.GetByID(ctx, txID)
}

func (p *sqlPersistence) GetTransactionByIDWithStatus(ctx context.Context, txID string) (*apitypes.TXWithStatus, error) {
func (p *sqlPersistence) GetTransactionByIDWithStatus(ctx context.Context, txID string, withHistory bool) (*apitypes.TXWithStatus, error) {
tx, err := p.transactions.GetByID(ctx, txID)
if tx == nil || err != nil {
return nil, err
Expand All @@ -193,16 +193,19 @@ func (p *sqlPersistence) GetTransactionByIDWithStatus(ctx context.Context, txID
if err != nil {
return nil, err
}
history, err := p.buildHistorySummary(ctx, txID, true, p.historySummaryLimit, nil)
if err != nil {
return nil, err
}
return &apitypes.TXWithStatus{
txh := &apitypes.TXWithStatus{
ManagedTX: tx,
Receipt: receipt,
Confirmations: confirmations,
History: history.entries,
}, nil
}
if withHistory {
history, err := p.buildHistorySummary(ctx, txID, true, p.historySummaryLimit, nil)
if err != nil {
return nil, err
}
txh.History = history.entries
}
return txh, nil
}

func (p *sqlPersistence) GetTransactionByNonce(ctx context.Context, signer string, nonce *fftypes.FFBigInt) (*apitypes.ManagedTX, error) {
Expand Down
Loading

0 comments on commit 7b4c815

Please sign in to comment.