diff --git a/config.md b/config.md index 1c9f3f35..998d146e 100644 --- a/config.md +++ b/config.md @@ -207,7 +207,7 @@ |batchTimeout|Duration to hold batch open for new transaction operations before flushing to the DB|[`time.Duration`](https://pkg.go.dev/time#Duration)|`10ms` |cacheSlots|Number of transactions to hold cached metadata for to avoid DB read operations to calculate history|`int`|`1000` |count|Number of transactions writing routines to start|`int`|`5` -|historyCompactionInterval|Duration between cleanup activities on the DB for a transaction with a large history|[`time.Duration`](https://pkg.go.dev/time#Duration)|`5m` +|historyCompactionInterval|Duration between cleanup activities on the DB for a transaction with a large history|[`time.Duration`](https://pkg.go.dev/time#Duration)|`0` |historySummaryLimit|Maximum number of action entries to return embedded in the JSON response object when querying a transaction summary|`int`|`50` ## policyengine diff --git a/internal/persistence/leveldb/leveldb_persistence.go b/internal/persistence/leveldb/leveldb_persistence.go index 6d7269c2..737dd011 100644 --- a/internal/persistence/leveldb/leveldb_persistence.go +++ b/internal/persistence/leveldb/leveldb_persistence.go @@ -398,7 +398,7 @@ func (p *leveldbPersistence) ListTransactionsPending(ctx context.Context, afterS func (p *leveldbPersistence) GetTransactionByID(ctx context.Context, txID string) (tx *apitypes.ManagedTX, err error) { p.txMux.RLock() defer p.txMux.RUnlock() - txh, err := p.GetTransactionByIDWithStatus(ctx, txID) + txh, err := p.GetTransactionByIDWithStatus(ctx, txID, false) if err != nil || txh == nil { return nil, err } @@ -408,7 +408,7 @@ func (p *leveldbPersistence) GetTransactionByID(ctx context.Context, txID string func (p *leveldbPersistence) GetTransactionReceipt(ctx context.Context, txID string) (receipt *ffcapi.TransactionReceiptResponse, err error) { p.txMux.RLock() defer p.txMux.RUnlock() - txh, err := p.GetTransactionByIDWithStatus(ctx, txID) + txh, err := p.GetTransactionByIDWithStatus(ctx, txID, false) if err != nil || txh == nil { return nil, err } @@ -418,7 +418,7 @@ func (p *leveldbPersistence) GetTransactionReceipt(ctx context.Context, txID str func (p *leveldbPersistence) GetTransactionConfirmations(ctx context.Context, txID string) (confirmations []*apitypes.Confirmation, err error) { p.txMux.RLock() defer p.txMux.RUnlock() - txh, err := p.GetTransactionByIDWithStatus(ctx, txID) + txh, err := p.GetTransactionByIDWithStatus(ctx, txID, false) if err != nil || txh == nil { return nil, err } @@ -448,12 +448,15 @@ func migrateTX(tx *apitypes.ManagedTX) { tx.DeprecatedTransactionHeaders = &tx.TransactionHeaders } -func (p *leveldbPersistence) GetTransactionByIDWithStatus(ctx context.Context, txID string) (tx *apitypes.TXWithStatus, err error) { +func (p *leveldbPersistence) GetTransactionByIDWithStatus(ctx context.Context, txID string, history bool) (tx *apitypes.TXWithStatus, err error) { p.txMux.RLock() defer p.txMux.RUnlock() err = p.readJSON(ctx, txDataKey(txID), &tx) if tx != nil { migrateTX(tx.ManagedTX) + if !history { + tx.History = nil + } } return tx, err } @@ -497,7 +500,7 @@ func (p *leveldbPersistence) InsertTransactionWithNextNonce(ctx context.Context, } func (p *leveldbPersistence) getPersistedTX(ctx context.Context, txID string) (tx *apitypes.TXWithStatus, err error) { - tx, err = p.GetTransactionByIDWithStatus(ctx, txID) + tx, err = p.GetTransactionByIDWithStatus(ctx, txID, true) if err != nil { return nil, err } @@ -718,7 +721,7 @@ func (p *leveldbPersistence) AddSubStatusAction(ctx context.Context, txID string for _, entry := range currentSubStatus.Actions { if entry.Action == action { alreadyRecordedAction = true - entry.Count++ + entry.OccurrenceCount++ entry.LastOccurrence = fftypes.Now() if errInfo != nil { @@ -736,10 +739,10 @@ func (p *leveldbPersistence) AddSubStatusAction(ctx context.Context, txID string if !alreadyRecordedAction { // If this is an entirely new action for this status entry, add it to the list newAction := &apitypes.TxHistoryActionEntry{ - Time: fftypes.Now(), - Action: action, - LastOccurrence: fftypes.Now(), - Count: 1, + Time: fftypes.Now(), + Action: action, + LastOccurrence: fftypes.Now(), + OccurrenceCount: 1, } if errInfo != nil { diff --git a/internal/persistence/leveldb/leveldb_persistence_test.go b/internal/persistence/leveldb/leveldb_persistence_test.go index cb8c7452..c4a688ee 100644 --- a/internal/persistence/leveldb/leveldb_persistence_test.go +++ b/internal/persistence/leveldb/leveldb_persistence_test.go @@ -840,7 +840,7 @@ func TestManagedTXSubStatus(t *testing.T) { assert.NoError(t, err) } - txh, err := p.GetTransactionByIDWithStatus(ctx, mtx.ID) + txh, err := p.GetTransactionByIDWithStatus(ctx, mtx.ID, true) assert.NoError(t, err) assert.Equal(t, 1, len(txh.History)) assert.Equal(t, "Received", string(txh.History[0].Status)) @@ -850,7 +850,7 @@ func TestManagedTXSubStatus(t *testing.T) { err = p.AddSubStatusAction(ctx, mtx.ID, apitypes.TxSubStatusTracking, apitypes.TxActionAssignNonce, nil, nil) assert.NoError(t, err) - txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID) + txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID, true) assert.NoError(t, err) assert.Equal(t, 2, len(txh.History)) @@ -863,7 +863,7 @@ func TestManagedTXSubStatus(t *testing.T) { assert.NoError(t, err) } - txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID) + txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID, true) assert.NoError(t, err) assert.Equal(t, 50, len(txh.History)) @@ -879,7 +879,7 @@ func TestManagedTXSubStatusRepeat(t *testing.T) { // Add a sub-status err = p.AddSubStatusAction(ctx, mtx.ID, apitypes.TxSubStatusReceived, apitypes.TxActionAssignNonce, nil, nil) assert.NoError(t, err) - txh, err := p.GetTransactionByIDWithStatus(ctx, mtx.ID) + txh, err := p.GetTransactionByIDWithStatus(ctx, mtx.ID, true) assert.NoError(t, err) assert.Equal(t, 1, len(txh.History)) assert.Equal(t, 2, len(txh.DeprecatedHistorySummary)) @@ -887,14 +887,14 @@ func TestManagedTXSubStatusRepeat(t *testing.T) { // Add another sub-status err = p.AddSubStatusAction(ctx, mtx.ID, apitypes.TxSubStatusTracking, apitypes.TxActionSubmitTransaction, nil, nil) assert.NoError(t, err) - txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID) + txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID, true) assert.Equal(t, 2, len(txh.History)) assert.Equal(t, 4, len(txh.DeprecatedHistorySummary)) // Add another that we've seen before err = p.AddSubStatusAction(ctx, mtx.ID, apitypes.TxSubStatusReceived, apitypes.TxActionSubmitTransaction, nil, nil) assert.NoError(t, err) - txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID) + txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID, true) assert.Equal(t, 3, len(txh.History)) // This goes up assert.Equal(t, 4, len(txh.DeprecatedHistorySummary)) // This doesn't } @@ -913,7 +913,7 @@ func TestManagedTXSubStatusAction(t *testing.T) { // Add an action err = p.AddSubStatusAction(ctx, mtx.ID, apitypes.TxSubStatusReceived, apitypes.TxActionAssignNonce, nil, nil) assert.NoError(t, err) - txh, err := p.GetTransactionByIDWithStatus(ctx, mtx.ID) + txh, err := p.GetTransactionByIDWithStatus(ctx, mtx.ID, true) assert.NoError(t, err) assert.Equal(t, 1, len(txh.History[0].Actions)) assert.Nil(t, txh.History[0].Actions[0].LastErrorTime) @@ -921,7 +921,7 @@ func TestManagedTXSubStatusAction(t *testing.T) { // Add another action err = p.AddSubStatusAction(ctx, mtx.ID, apitypes.TxSubStatusReceived, apitypes.TxActionRetrieveGasPrice, nil, fftypes.JSONAnyPtr(`{"gasError":"Acme Gas Oracle RC=12345"}`)) assert.NoError(t, err) - txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID) + txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID, true) assert.NoError(t, err) assert.Equal(t, 2, len(txh.History[0].Actions)) assert.Equal(t, (*txh.History[0].Actions[1].LastError).String(), `{"gasError":"Acme Gas Oracle RC=12345"}`) @@ -929,16 +929,16 @@ func TestManagedTXSubStatusAction(t *testing.T) { // Add the same action which should cause the previous one to inc its counter err = p.AddSubStatusAction(ctx, mtx.ID, apitypes.TxSubStatusReceived, apitypes.TxActionRetrieveGasPrice, fftypes.JSONAnyPtr(`{"info":"helloworld"}`), fftypes.JSONAnyPtr(`{"error":"nogood"}`)) assert.NoError(t, err) - txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID) + txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID, true) assert.NoError(t, err) assert.Equal(t, 2, len(txh.History[0].Actions)) assert.Equal(t, txh.History[0].Actions[1].Action, apitypes.TxActionRetrieveGasPrice) - assert.Equal(t, 2, txh.History[0].Actions[1].Count) + assert.Equal(t, 2, txh.History[0].Actions[1].OccurrenceCount) // Add the same action but with new error information should update the last error field err = p.AddSubStatusAction(ctx, mtx.ID, apitypes.TxSubStatusReceived, apitypes.TxActionRetrieveGasPrice, nil, fftypes.JSONAnyPtr(`{"gasError":"Acme Gas Oracle RC=67890"}`)) assert.NoError(t, err) - txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID) + txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID, true) assert.NoError(t, err) assert.Equal(t, 2, len(txh.History[0].Actions)) assert.NotNil(t, txh.History[0].Actions[1].LastErrorTime) @@ -948,11 +948,11 @@ func TestManagedTXSubStatusAction(t *testing.T) { reason := "known_transaction" err = p.AddSubStatusAction(ctx, mtx.ID, apitypes.TxSubStatusReceived, apitypes.TxActionSubmitTransaction, fftypes.JSONAnyPtr(`{"reason":"`+reason+`"}`), nil) assert.NoError(t, err) - txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID) + txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID, true) assert.NoError(t, err) assert.Equal(t, 3, len(txh.History[0].Actions)) assert.Equal(t, txh.History[0].Actions[2].Action, apitypes.TxActionSubmitTransaction) - assert.Equal(t, 1, txh.History[0].Actions[2].Count) + assert.Equal(t, 1, txh.History[0].Actions[2].OccurrenceCount) assert.Nil(t, txh.History[0].Actions[2].LastErrorTime) // Add one more type of action @@ -960,15 +960,15 @@ func TestManagedTXSubStatusAction(t *testing.T) { receiptId := "123456" err = p.AddSubStatusAction(ctx, mtx.ID, apitypes.TxSubStatusReceived, apitypes.TxActionReceiveReceipt, fftypes.JSONAnyPtr(`{"receiptId":"`+receiptId+`"}`), nil) assert.NoError(t, err) - txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID) + txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID, true) assert.NoError(t, err) assert.Equal(t, 4, len(txh.History[0].Actions)) assert.Equal(t, txh.History[0].Actions[3].Action, apitypes.TxActionReceiveReceipt) - assert.Equal(t, 1, txh.History[0].Actions[3].Count) + assert.Equal(t, 1, txh.History[0].Actions[3].OccurrenceCount) assert.Nil(t, txh.History[0].Actions[3].LastErrorTime) // History is the complete list of unique sub-status types and actions - txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID) + txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID, true) assert.NoError(t, err) assert.Equal(t, 5, len(txh.DeprecatedHistorySummary)) @@ -976,13 +976,13 @@ func TestManagedTXSubStatusAction(t *testing.T) { // Seen one of these before - should increase summary length by 1 err = p.AddSubStatusAction(ctx, mtx.ID, apitypes.TxSubStatusConfirmed, apitypes.TxActionReceiveReceipt, fftypes.JSONAnyPtr(`{"receiptId":"`+receiptId+`"}`), nil) assert.NoError(t, err) - txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID) + txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID, true) assert.Equal(t, 6, len(txh.DeprecatedHistorySummary)) // Seen both of these before - no change expected err = p.AddSubStatusAction(ctx, mtx.ID, apitypes.TxSubStatusReceived, apitypes.TxActionAssignNonce, nil, nil) assert.NoError(t, err) - txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID) + txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID, true) assert.NoError(t, err) assert.Equal(t, 6, len(txh.DeprecatedHistorySummary)) @@ -1074,7 +1074,7 @@ func TestManagedTXSubStatusInvalidJSON(t *testing.T) { // Add a new type of action err = p.AddSubStatusAction(ctx, mtx.ID, apitypes.TxSubStatusReceived, apitypes.TxActionSubmitTransaction, fftypes.JSONAnyPtr(`{"reason":"`+reason+`"}`), nil) assert.NoError(t, err) - txh, err := p.GetTransactionByIDWithStatus(ctx, mtx.ID) + txh, err := p.GetTransactionByIDWithStatus(ctx, mtx.ID, true) assert.NoError(t, err) val, err := json.Marshal(txh.History[0].Actions[0].LastInfo) @@ -1101,7 +1101,7 @@ func TestManagedTXSubStatusMaxEntries(t *testing.T) { assert.NoError(t, err) } - txh, err := p.GetTransactionByIDWithStatus(ctx, mtx.ID) + txh, err := p.GetTransactionByIDWithStatus(ctx, mtx.ID, true) assert.NoError(t, err) assert.Equal(t, 50, len(txh.History)) @@ -1119,7 +1119,7 @@ func TestMaxHistoryCountSetToZero(t *testing.T) { err = p.AddSubStatusAction(ctx, mtx.ID, apitypes.TxSubStatusReceived, apitypes.TxActionSubmitTransaction, nil, nil) assert.NoError(t, err) - txh, err := p.GetTransactionByIDWithStatus(ctx, mtx.ID) + txh, err := p.GetTransactionByIDWithStatus(ctx, mtx.ID, true) assert.NoError(t, err) assert.Equal(t, 0, len(txh.History)) assert.Equal(t, 0, len(txh.DeprecatedHistorySummary)) @@ -1134,14 +1134,19 @@ func TestAddReceivedStatusWhenNothingSet(t *testing.T) { err := p.InsertTransactionWithNextNonce(ctx, mtx, func(ctx context.Context, signer string) (uint64, error) { return 12345, nil }) assert.NoError(t, err) - txh, err := p.GetTransactionByIDWithStatus(ctx, mtx.ID) + txh, err := p.GetTransactionByIDWithStatus(ctx, mtx.ID, true) assert.NoError(t, err) + assert.NotNil(t, 0, txh.History) assert.Equal(t, 0, len(txh.History)) + txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID, false) + assert.NoError(t, err) + assert.Nil(t, txh.History) + err = p.AddSubStatusAction(ctx, mtx.ID, apitypes.TxSubStatusReceived, apitypes.TxActionSubmitTransaction, nil, nil) assert.NoError(t, err) - txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID) + txh, err = p.GetTransactionByIDWithStatus(ctx, mtx.ID, true) assert.NoError(t, err) assert.Equal(t, 1, len(txh.History)) assert.Equal(t, 1, len(txh.History[0].Actions)) diff --git a/internal/persistence/persistence.go b/internal/persistence/persistence.go index c729444e..bc81fac9 100644 --- a/internal/persistence/persistence.go +++ b/internal/persistence/persistence.go @@ -130,7 +130,7 @@ var TXHistoryFilters = &ffapi.QueryFields{ "lastoccurrence": &ffapi.TimeField{}, "substatus": &ffapi.StringField{}, "action": &ffapi.StringField{}, - "count": &ffapi.Int64Field{}, + "occurrences": &ffapi.Int64Field{}, "lasterror": &ffapi.JSONField{}, "lasterrortime": &ffapi.TimeField{}, "lastinfo": &ffapi.JSONField{}, @@ -173,7 +173,7 @@ type TransactionPersistence interface { ListTransactionsByNonce(ctx context.Context, signer string, after *fftypes.FFBigInt, limit int, dir SortDirection) ([]*apitypes.ManagedTX, error) // reverse nonce order within signer ListTransactionsPending(ctx context.Context, afterSequenceID string, limit int, dir SortDirection) ([]*apitypes.ManagedTX, error) // reverse insertion order, only those in pending state GetTransactionByID(ctx context.Context, txID string) (*apitypes.ManagedTX, error) - GetTransactionByIDWithStatus(ctx context.Context, txID string) (*apitypes.TXWithStatus, error) + GetTransactionByIDWithStatus(ctx context.Context, txID string, history bool) (*apitypes.TXWithStatus, error) GetTransactionByNonce(ctx context.Context, signer string, nonce *fftypes.FFBigInt) (*apitypes.ManagedTX, error) InsertTransactionWithNextNonce(ctx context.Context, tx *apitypes.ManagedTX, lookupNextNonce NextNonceCallback) error UpdateTransaction(ctx context.Context, txID string, updates *apitypes.TXUpdates) error diff --git a/internal/persistence/postgres/sqlpersistence.go b/internal/persistence/postgres/sqlpersistence.go index 3eba513a..4cbea421 100644 --- a/internal/persistence/postgres/sqlpersistence.go +++ b/internal/persistence/postgres/sqlpersistence.go @@ -62,7 +62,7 @@ func InitConfig(conf config.Section) { conf.AddKnownKey(ConfigTXWriterCacheSlots, 1000) conf.AddKnownKey(ConfigTXWriterHistorySummaryLimit, 50) // returned on TX status - conf.AddKnownKey(ConfigTXWriterHistoryCompactionInterval, "5m") + conf.AddKnownKey(ConfigTXWriterHistoryCompactionInterval, "0" /* disabled by default */) conf.AddKnownKey(ConfigTXWriterCount, 5) conf.AddKnownKey(ConfigTXWriterBatchTimeout, "10ms") conf.AddKnownKey(ConfigTXWriterBatchSize, 100) diff --git a/internal/persistence/postgres/sqlpersistence_test.go b/internal/persistence/postgres/sqlpersistence_test.go index e9cac418..0c1e9f22 100644 --- a/internal/persistence/postgres/sqlpersistence_test.go +++ b/internal/persistence/postgres/sqlpersistence_test.go @@ -28,7 +28,7 @@ import ( "github.com/stretchr/testify/assert" ) -func newMockSQLPersistence(t *testing.T) (context.Context, *sqlPersistence, sqlmock.Sqlmock, func()) { +func newMockSQLPersistence(t *testing.T, init ...func(dbconf config.Section)) (context.Context, *sqlPersistence, sqlmock.Sqlmock, func()) { ctx, cancelCtx := context.WithCancel(context.Background()) db, dbm := dbsql.NewMockProvider().UTInit() @@ -36,6 +36,9 @@ func newMockSQLPersistence(t *testing.T) (context.Context, *sqlPersistence, sqlm config.RootConfigReset() dbconf := config.RootSection("utdb") InitConfig(dbconf) + for _, fn := range init { + fn(dbconf) + } p, err := newSQLPersistence(ctx, &db.Database, dbconf, 1*time.Hour) assert.NoError(t, err) diff --git a/internal/persistence/postgres/transaction_writer.go b/internal/persistence/postgres/transaction_writer.go index d7b29aea..9a5beadf 100644 --- a/internal/persistence/postgres/transaction_writer.go +++ b/internal/persistence/postgres/transaction_writer.go @@ -431,10 +431,12 @@ func (tw *transactionWriter) executeBatchOps(ctx context.Context, b *transaction } } // Do the compression checks - for txID := range b.compressionChecks { - if err := tw.compressionCheck(ctx, txID); err != nil { - log.L(ctx).Errorf("Compression check for %s failed: %s", txID, err) - return err + if tw.compressionInterval > 0 { + for txID := range b.compressionChecks { + if err := tw.compressionCheck(ctx, txID); err != nil { + log.L(ctx).Errorf("Compression check for %s failed: %s", txID, err) + return err + } } } // Do all the transaction deletes diff --git a/internal/persistence/postgres/transactions.go b/internal/persistence/postgres/transactions.go index 913a5a9f..4a3ee8ce 100644 --- a/internal/persistence/postgres/transactions.go +++ b/internal/persistence/postgres/transactions.go @@ -176,7 +176,7 @@ func (p *sqlPersistence) GetTransactionByID(ctx context.Context, txID string) (* return p.transactions.GetByID(ctx, txID) } -func (p *sqlPersistence) GetTransactionByIDWithStatus(ctx context.Context, txID string) (*apitypes.TXWithStatus, error) { +func (p *sqlPersistence) GetTransactionByIDWithStatus(ctx context.Context, txID string, withHistory bool) (*apitypes.TXWithStatus, error) { tx, err := p.transactions.GetByID(ctx, txID) if tx == nil || err != nil { return nil, err @@ -193,16 +193,19 @@ func (p *sqlPersistence) GetTransactionByIDWithStatus(ctx context.Context, txID if err != nil { return nil, err } - history, err := p.buildHistorySummary(ctx, txID, true, p.historySummaryLimit, nil) - if err != nil { - return nil, err - } - return &apitypes.TXWithStatus{ + txh := &apitypes.TXWithStatus{ ManagedTX: tx, Receipt: receipt, Confirmations: confirmations, - History: history.entries, - }, nil + } + if withHistory { + history, err := p.buildHistorySummary(ctx, txID, true, p.historySummaryLimit, nil) + if err != nil { + return nil, err + } + txh.History = history.entries + } + return txh, nil } func (p *sqlPersistence) GetTransactionByNonce(ctx context.Context, signer string, nonce *fftypes.FFBigInt) (*apitypes.ManagedTX, error) { diff --git a/internal/persistence/postgres/transactions_test.go b/internal/persistence/postgres/transactions_test.go index ce7f2dbf..07a63c1e 100644 --- a/internal/persistence/postgres/transactions_test.go +++ b/internal/persistence/postgres/transactions_test.go @@ -133,7 +133,7 @@ func TestTransactionBasicValidationPSQL(t *testing.T) { assert.NoError(t, err) // Get back the merged object to check everything - mtx, err := p.GetTransactionByIDWithStatus(ctx, txID) + mtx, err := p.GetTransactionByIDWithStatus(ctx, txID, true) assert.NoError(t, err) assert.NotEqual(t, tx.Updated.String(), mtx.Updated.String()) @@ -168,13 +168,13 @@ func TestTransactionBasicValidationPSQL(t *testing.T) { Time: mtx.History[0].Time, Actions: []*apitypes.TxHistoryActionEntry{ // newest first { - Action: apitypes.TxActionSubmitTransaction, - Time: mtx.History[0].Actions[0].Time, - LastOccurrence: mtx.History[0].Actions[0].LastOccurrence, - Count: 1, - LastInfo: fftypes.JSONAnyPtr(`{"txhash":"0x12345"}`), - LastError: nil, - LastErrorTime: nil, + Action: apitypes.TxActionSubmitTransaction, + Time: mtx.History[0].Actions[0].Time, + LastOccurrence: mtx.History[0].Actions[0].LastOccurrence, + OccurrenceCount: 1, + LastInfo: fftypes.JSONAnyPtr(`{"txhash":"0x12345"}`), + LastError: nil, + LastErrorTime: nil, }, }, }, @@ -183,22 +183,22 @@ func TestTransactionBasicValidationPSQL(t *testing.T) { Time: mtx.History[1].Time, Actions: []*apitypes.TxHistoryActionEntry{ // newest first { - Action: apitypes.TxActionSubmitTransaction, - Time: mtx.History[1].Actions[0].Time, - LastOccurrence: mtx.History[1].Actions[0].LastOccurrence, - Count: 2, - LastInfo: nil, - LastError: fftypes.JSONAnyPtr(`"failed to submit 2"`), - LastErrorTime: mtx.History[1].Actions[0].LastErrorTime, + Action: apitypes.TxActionSubmitTransaction, + Time: mtx.History[1].Actions[0].Time, + LastOccurrence: mtx.History[1].Actions[0].LastOccurrence, + OccurrenceCount: 2, + LastInfo: nil, + LastError: fftypes.JSONAnyPtr(`"failed to submit 2"`), + LastErrorTime: mtx.History[1].Actions[0].LastErrorTime, }, { - Action: apitypes.TxActionAssignNonce, - Time: mtx.History[1].Actions[1].Time, - LastOccurrence: mtx.History[1].Actions[1].LastOccurrence, - Count: 1, - LastInfo: fftypes.JSONAnyPtr(`{"nonce":"11111"}`), - LastError: nil, - LastErrorTime: nil, + Action: apitypes.TxActionAssignNonce, + Time: mtx.History[1].Actions[1].Time, + LastOccurrence: mtx.History[1].Actions[1].LastOccurrence, + OccurrenceCount: 1, + LastInfo: fftypes.JSONAnyPtr(`{"nonce":"11111"}`), + LastError: nil, + LastErrorTime: nil, }, }, }, @@ -495,7 +495,7 @@ func TestGetTransactionByIDWithStatusHistorySummaryFail(t *testing.T) { ) mdb.ExpectQuery("SELECT.*txhistory").WillReturnError(fmt.Errorf("pop")) - _, err := p.GetTransactionByIDWithStatus(ctx, "tx1") + _, err := p.GetTransactionByIDWithStatus(ctx, "tx1", true) assert.Regexp(t, "FF00176", err) assert.NoError(t, mdb.ExpectationsWereMet()) @@ -511,7 +511,7 @@ func TestGetTransactionByIDWithStatusConfirmationsFail(t *testing.T) { ) mdb.ExpectQuery("SELECT.*confirmations").WillReturnError(fmt.Errorf("pop")) - _, err := p.GetTransactionByIDWithStatus(ctx, "tx1") + _, err := p.GetTransactionByIDWithStatus(ctx, "tx1", true) assert.Regexp(t, "FF00176", err) assert.NoError(t, mdb.ExpectationsWereMet()) @@ -524,7 +524,7 @@ func TestGetTransactionByIDWithStatusReceiptFail(t *testing.T) { mdb.ExpectQuery("SELECT.*transactions").WillReturnRows(newTXRow(p)) mdb.ExpectQuery("SELECT.*receipts").WillReturnError(fmt.Errorf("pop")) - _, err := p.GetTransactionByIDWithStatus(ctx, "tx1") + _, err := p.GetTransactionByIDWithStatus(ctx, "tx1", true) assert.Regexp(t, "FF00176", err) assert.NoError(t, mdb.ExpectationsWereMet()) @@ -536,7 +536,7 @@ func TestGetTransactionByIDTXFail(t *testing.T) { mdb.ExpectQuery("SELECT.*transactions").WillReturnError(fmt.Errorf("pop")) - _, err := p.GetTransactionByIDWithStatus(ctx, "tx1") + _, err := p.GetTransactionByIDWithStatus(ctx, "tx1", true) assert.Regexp(t, "FF00176", err) assert.NoError(t, mdb.ExpectationsWereMet()) diff --git a/internal/persistence/postgres/txhistory.go b/internal/persistence/postgres/txhistory.go index 3c3f628c..e2163d57 100644 --- a/internal/persistence/postgres/txhistory.go +++ b/internal/persistence/postgres/txhistory.go @@ -51,6 +51,7 @@ func (p *sqlPersistence) newTXHistoryCollection() *dbsql.CrudBase[*apitypes.TXHi "lasterror": "error", "lasterrortime": "error_time", "lastinfo": "info", + "occurrences": "count", }, PatchDisabled: true, TimesDisabled: true, @@ -67,7 +68,7 @@ func (p *sqlPersistence) newTXHistoryCollection() *dbsql.CrudBase[*apitypes.TXHi case "action": return &inst.Action case "count": - return &inst.Count + return &inst.OccurrenceCount case "time": return &inst.Time case "last_occurrence": @@ -99,12 +100,12 @@ func (p *sqlPersistence) AddSubStatusAction(ctx context.Context, txID string, su TransactionID: txID, SubStatus: subStatus, TxHistoryActionEntry: apitypes.TxHistoryActionEntry{ - Count: 1, - Time: now, - LastOccurrence: now, - Action: action, - LastInfo: persistence.JSONOrString(info), // guard against bad JSON - LastError: persistence.JSONOrString(errInfo), // guard against bad JSON + OccurrenceCount: 1, + Time: now, + LastOccurrence: now, + Action: action, + LastInfo: persistence.JSONOrString(info), // guard against bad JSON + LastError: persistence.JSONOrString(errInfo), // guard against bad JSON }, } if errInfo != nil { @@ -117,8 +118,8 @@ func (p *sqlPersistence) AddSubStatusAction(ctx context.Context, txID string, su func (p *sqlPersistence) compressHistory(ctx context.Context, txID string) error { result, err := p.buildHistorySummary(ctx, txID, false, 0, func(from, to *apitypes.TXHistoryRecord) error { update := persistence.TXHistoryFilters.NewUpdate(ctx). - Set("count", to.Count+1). // increment the count - Set("time", from.Time) // move the time on the newer record to be the time of the older record merged in + Set("occurrences", to.OccurrenceCount+1). // increment the count + Set("time", from.Time) // move the time on the newer record to be the time of the older record merged in if err := p.txHistory.Update(ctx, to.ID.String(), update); err != nil { return err } @@ -170,13 +171,13 @@ func (p *sqlPersistence) buildHistorySummary(ctx context.Context, txID string, b // Actions are also in descending order if buildResult { actionEntry := &apitypes.TxHistoryActionEntry{ - Time: h.Time, - LastOccurrence: h.LastOccurrence, - Action: h.Action, - Count: h.Count, - LastInfo: h.LastInfo, - LastError: h.LastError, - LastErrorTime: h.LastErrorTime, + Time: h.Time, + LastOccurrence: h.LastOccurrence, + Action: h.Action, + OccurrenceCount: h.OccurrenceCount, + LastInfo: h.LastInfo, + LastError: h.LastError, + LastErrorTime: h.LastErrorTime, } statusEntry := r.entries[len(r.entries)-1] statusEntry.Actions = append(statusEntry.Actions, actionEntry) @@ -193,11 +194,11 @@ func (p *sqlPersistence) buildHistorySummary(ctx context.Context, txID string, b return nil, err } } - lastRecordSameSubStatus.Count++ + lastRecordSameSubStatus.OccurrenceCount++ if buildResult { statusEntry := r.entries[len(r.entries)-1] actionEntry := statusEntry.Actions[len(statusEntry.Actions)-1] - actionEntry.Count++ + actionEntry.OccurrenceCount++ actionEntry.Time = h.Time } } diff --git a/internal/persistence/postgres/txhistory_test.go b/internal/persistence/postgres/txhistory_test.go index 584ca819..125ef59d 100644 --- a/internal/persistence/postgres/txhistory_test.go +++ b/internal/persistence/postgres/txhistory_test.go @@ -23,6 +23,7 @@ import ( "time" "github.com/DATA-DOG/go-sqlmock" + "github.com/hyperledger/firefly-common/pkg/config" "github.com/hyperledger/firefly-common/pkg/fftypes" "github.com/hyperledger/firefly-transaction-manager/internal/persistence" "github.com/hyperledger/firefly-transaction-manager/pkg/apitypes" @@ -35,7 +36,9 @@ func TestTXHistoryCompressionPSQL(t *testing.T) { logrus.SetLevel(logrus.TraceLevel) // Do a set of transaction operations through the writers, and confirm the results are correct - ctx, p, _, done := initTestPSQL(t) + ctx, p, _, done := initTestPSQL(t, func(conf config.Section) { + conf.Set(ConfigTXWriterHistoryCompactionInterval, "5m") + }) defer done() // Write an initial transaction @@ -111,8 +114,8 @@ func TestTXHistoryCompressionPSQL(t *testing.T) { if h.LastError != nil { assert.NotNil(t, h.LastErrorTime) } - assert.NotZero(t, h.Count) - if h.Count == 1 { + assert.NotZero(t, h.OccurrenceCount) + if h.OccurrenceCount == 1 { assert.Equal(t, h.LastOccurrence.String(), h.Time.String()) } else { assert.GreaterOrEqual(t, *h.LastOccurrence.Time(), *h.Time.Time()) @@ -127,54 +130,54 @@ func TestTXHistoryCompressionPSQL(t *testing.T) { TransactionID: txID, SubStatus: apitypes.TxSubStatusConfirmed, TxHistoryActionEntry: apitypes.TxHistoryActionEntry{ - Action: apitypes.TxActionConfirmTransaction, - Count: 1, - LastInfo: fftypes.JSONAnyPtr(`{"confirmed":true}`), + Action: apitypes.TxActionConfirmTransaction, + OccurrenceCount: 1, + LastInfo: fftypes.JSONAnyPtr(`{"confirmed":true}`), }, }, { TransactionID: txID, SubStatus: apitypes.TxSubStatusTracking, TxHistoryActionEntry: apitypes.TxHistoryActionEntry{ - Action: apitypes.TxActionTimeout, - Count: 2, - LastError: fftypes.JSONAnyPtr(`"some error 555"`), + Action: apitypes.TxActionTimeout, + OccurrenceCount: 2, + LastError: fftypes.JSONAnyPtr(`"some error 555"`), }, }, { TransactionID: txID, SubStatus: apitypes.TxSubStatusTracking, TxHistoryActionEntry: apitypes.TxHistoryActionEntry{ - Action: apitypes.TxActionReceiveReceipt, - Count: 3, - LastInfo: fftypes.JSONAnyPtr(`{"transactionHash":"0x333333","blockNumber":"33333"}`), + Action: apitypes.TxActionReceiveReceipt, + OccurrenceCount: 3, + LastInfo: fftypes.JSONAnyPtr(`{"transactionHash":"0x333333","blockNumber":"33333"}`), }, }, { TransactionID: txID, SubStatus: apitypes.TxSubStatusTracking, TxHistoryActionEntry: apitypes.TxHistoryActionEntry{ - Action: apitypes.TxActionSubmitTransaction, - Count: 1, - LastInfo: fftypes.JSONAnyPtr(`{"transactionHash":"0x12345"}`), + Action: apitypes.TxActionSubmitTransaction, + OccurrenceCount: 1, + LastInfo: fftypes.JSONAnyPtr(`{"transactionHash":"0x12345"}`), }, }, { TransactionID: txID, SubStatus: apitypes.TxSubStatusReceived, TxHistoryActionEntry: apitypes.TxHistoryActionEntry{ - Action: apitypes.TxActionSubmitTransaction, - Count: 3, - LastError: fftypes.JSONAnyPtr(`"failed to submit 333"`), + Action: apitypes.TxActionSubmitTransaction, + OccurrenceCount: 3, + LastError: fftypes.JSONAnyPtr(`"failed to submit 333"`), }, }, { TransactionID: txID, SubStatus: apitypes.TxSubStatusReceived, TxHistoryActionEntry: apitypes.TxHistoryActionEntry{ - Action: apitypes.TxActionAssignNonce, - Count: 1, - LastInfo: fftypes.JSONAnyPtr(`{"nonce":"11111"}`), + Action: apitypes.TxActionAssignNonce, + OccurrenceCount: 1, + LastInfo: fftypes.JSONAnyPtr(`{"nonce":"11111"}`), }, }, }, history) @@ -182,7 +185,9 @@ func TestTXHistoryCompressionPSQL(t *testing.T) { } func TestCompactionFail(t *testing.T) { - ctx, p, mdb, done := newMockSQLPersistence(t) + ctx, p, mdb, done := newMockSQLPersistence(t, func(dbconf config.Section) { + dbconf.Set(ConfigTXWriterHistoryCompactionInterval, "5m") + }) defer done() longAgo := time.Now().Add(-1000 * time.Hour) @@ -238,7 +243,9 @@ func TestCompactionFail(t *testing.T) { } func TestCompactionOnCacheMiss(t *testing.T) { - ctx, p, mdb, done := newMockSQLPersistence(t) + ctx, p, mdb, done := newMockSQLPersistence(t, func(dbconf config.Section) { + dbconf.Set(ConfigTXWriterHistoryCompactionInterval, "5m") + }) defer done() b := &transactionWriterBatch{ diff --git a/internal/tmmsgs/en_api_descriptions.go b/internal/tmmsgs/en_api_descriptions.go index 74ee6d4b..f11767f2 100644 --- a/internal/tmmsgs/en_api_descriptions.go +++ b/internal/tmmsgs/en_api_descriptions.go @@ -58,6 +58,8 @@ var ( APIEndpointPostRootQueryOutput = ffm("api.endpoints.post.root.query.output", "The data result of a query against a smart contract") APIEndpointPostSubscriptionReset = ffm("api.endpoints.post.subscription.reset", "Reset listener - route deprecated in favor of /eventstreams/{streamId}/listeners/{listenerId}/reset") APIEndpointPostSubscriptions = ffm("api.endpoints.post.subscriptions", "Create new listener - route deprecated in favor of /eventstreams/{streamId}/listeners") + APIEndpointPostTransactionSuspend = ffm("api.endpoints.post.transactions.suspend", "Suspend processing on a pending transaction (no-op for completed transactions)") + APIEndpointPostTransactionResume = ffm("api.endpoints.post.transactions.resume", "Resume processing on a suspended transaction") APIParamStreamID = ffm("api.params.streamId", "Event Stream ID") APIParamListenerID = ffm("api.params.listenerId", "Listener ID") @@ -69,4 +71,5 @@ var ( APIParamSortDirection = ffm("api.params.sortDirection", "Sort direction: 'asc'/'ascending' or 'desc'/'descending'") APIParamSignerAddress = ffm("api.params.signerAddress", "A signing address, for example to get the gas token balance for") APIParamBlocktag = ffm("api.params.blocktag", "The optional block tag to use when making a gas token balance query") + APIParamHistory = ffm("api.params.history", "Include transaction history summary information") ) diff --git a/mocks/persistencemocks/persistence.go b/mocks/persistencemocks/persistence.go index 04acf44c..3b4ccc95 100644 --- a/mocks/persistencemocks/persistence.go +++ b/mocks/persistencemocks/persistence.go @@ -221,25 +221,25 @@ func (_m *Persistence) GetTransactionByID(ctx context.Context, txID string) (*ap return r0, r1 } -// GetTransactionByIDWithStatus provides a mock function with given fields: ctx, txID -func (_m *Persistence) GetTransactionByIDWithStatus(ctx context.Context, txID string) (*apitypes.TXWithStatus, error) { - ret := _m.Called(ctx, txID) +// GetTransactionByIDWithStatus provides a mock function with given fields: ctx, txID, history +func (_m *Persistence) GetTransactionByIDWithStatus(ctx context.Context, txID string, history bool) (*apitypes.TXWithStatus, error) { + ret := _m.Called(ctx, txID, history) var r0 *apitypes.TXWithStatus var r1 error - if rf, ok := ret.Get(0).(func(context.Context, string) (*apitypes.TXWithStatus, error)); ok { - return rf(ctx, txID) + if rf, ok := ret.Get(0).(func(context.Context, string, bool) (*apitypes.TXWithStatus, error)); ok { + return rf(ctx, txID, history) } - if rf, ok := ret.Get(0).(func(context.Context, string) *apitypes.TXWithStatus); ok { - r0 = rf(ctx, txID) + if rf, ok := ret.Get(0).(func(context.Context, string, bool) *apitypes.TXWithStatus); ok { + r0 = rf(ctx, txID, history) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(*apitypes.TXWithStatus) } } - if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { - r1 = rf(ctx, txID) + if rf, ok := ret.Get(1).(func(context.Context, string, bool) error); ok { + r1 = rf(ctx, txID, history) } else { r1 = ret.Error(1) } diff --git a/mocks/persistencemocks/transaction_persistence.go b/mocks/persistencemocks/transaction_persistence.go index 8b5bc0b2..35c0a4e6 100644 --- a/mocks/persistencemocks/transaction_persistence.go +++ b/mocks/persistencemocks/transaction_persistence.go @@ -82,25 +82,25 @@ func (_m *TransactionPersistence) GetTransactionByID(ctx context.Context, txID s return r0, r1 } -// GetTransactionByIDWithStatus provides a mock function with given fields: ctx, txID -func (_m *TransactionPersistence) GetTransactionByIDWithStatus(ctx context.Context, txID string) (*apitypes.TXWithStatus, error) { - ret := _m.Called(ctx, txID) +// GetTransactionByIDWithStatus provides a mock function with given fields: ctx, txID, history +func (_m *TransactionPersistence) GetTransactionByIDWithStatus(ctx context.Context, txID string, history bool) (*apitypes.TXWithStatus, error) { + ret := _m.Called(ctx, txID, history) var r0 *apitypes.TXWithStatus var r1 error - if rf, ok := ret.Get(0).(func(context.Context, string) (*apitypes.TXWithStatus, error)); ok { - return rf(ctx, txID) + if rf, ok := ret.Get(0).(func(context.Context, string, bool) (*apitypes.TXWithStatus, error)); ok { + return rf(ctx, txID, history) } - if rf, ok := ret.Get(0).(func(context.Context, string) *apitypes.TXWithStatus); ok { - r0 = rf(ctx, txID) + if rf, ok := ret.Get(0).(func(context.Context, string, bool) *apitypes.TXWithStatus); ok { + r0 = rf(ctx, txID, history) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(*apitypes.TXWithStatus) } } - if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { - r1 = rf(ctx, txID) + if rf, ok := ret.Get(1).(func(context.Context, string, bool) error); ok { + r1 = rf(ctx, txID, history) } else { r1 = ret.Error(1) } diff --git a/mocks/txhandlermocks/transaction_handler.go b/mocks/txhandlermocks/transaction_handler.go index 97eecf5d..0fc2bcae 100644 --- a/mocks/txhandlermocks/transaction_handler.go +++ b/mocks/txhandlermocks/transaction_handler.go @@ -97,6 +97,58 @@ func (_m *TransactionHandler) HandleNewTransaction(ctx context.Context, txReq *a return r0, r1 } +// HandleResumeTransaction provides a mock function with given fields: ctx, txID +func (_m *TransactionHandler) HandleResumeTransaction(ctx context.Context, txID string) (*apitypes.ManagedTX, error) { + ret := _m.Called(ctx, txID) + + var r0 *apitypes.ManagedTX + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) (*apitypes.ManagedTX, error)); ok { + return rf(ctx, txID) + } + if rf, ok := ret.Get(0).(func(context.Context, string) *apitypes.ManagedTX); ok { + r0 = rf(ctx, txID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*apitypes.ManagedTX) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, txID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// HandleSuspendTransaction provides a mock function with given fields: ctx, txID +func (_m *TransactionHandler) HandleSuspendTransaction(ctx context.Context, txID string) (*apitypes.ManagedTX, error) { + ret := _m.Called(ctx, txID) + + var r0 *apitypes.ManagedTX + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) (*apitypes.ManagedTX, error)); ok { + return rf(ctx, txID) + } + if rf, ok := ret.Get(0).(func(context.Context, string) *apitypes.ManagedTX); ok { + r0 = rf(ctx, txID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*apitypes.ManagedTX) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, txID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // HandleTransactionConfirmations provides a mock function with given fields: ctx, txID, notification func (_m *TransactionHandler) HandleTransactionConfirmations(ctx context.Context, txID string, notification *apitypes.ConfirmationsNotification) error { ret := _m.Called(ctx, txID, notification) diff --git a/pkg/apitypes/managed_tx.go b/pkg/apitypes/managed_tx.go index 464b5e6f..1a7ee1de 100644 --- a/pkg/apitypes/managed_tx.go +++ b/pkg/apitypes/managed_tx.go @@ -34,6 +34,8 @@ const ( TxStatusSucceeded TxStatus = "Succeeded" // TxStatusFailed happens when an error is reported by the infrastructure runtime TxStatusFailed TxStatus = "Failed" + // TxStatusSuspended indicates we are not actively doing any work with this transaction right now, until it's resumed to pending again + TxStatusSuspended TxStatus = "Suspended" ) // TxSubStatus is an intermediate status a transaction may go through @@ -97,13 +99,13 @@ const ( // when they occur multiple times. So if we are retrying the same set of actions over and over // again the list of actions does not grow. type TxHistoryActionEntry struct { - Time *fftypes.FFTime `json:"time"` - Action TxAction `json:"action"` - LastOccurrence *fftypes.FFTime `json:"lastOccurrence,omitempty"` - Count int `json:"count"` - LastError *fftypes.JSONAny `json:"lastError,omitempty"` - LastErrorTime *fftypes.FFTime `json:"lastErrorTime,omitempty"` - LastInfo *fftypes.JSONAny `json:"lastInfo,omitempty"` + Time *fftypes.FFTime `json:"time"` + Action TxAction `json:"action"` + LastOccurrence *fftypes.FFTime `json:"lastOccurrence,omitempty"` + OccurrenceCount int `json:"count"` // serialized as count for historical reasons + LastError *fftypes.JSONAny `json:"lastError,omitempty"` + LastErrorTime *fftypes.FFTime `json:"lastErrorTime,omitempty"` + LastInfo *fftypes.JSONAny `json:"lastInfo,omitempty"` } // TXHistoryRecord are the sequential persisted records, which might be state transitions, or actions within the current state. diff --git a/pkg/fftm/route_get_transaction.go b/pkg/fftm/route_get_transaction.go index b44cfa55..4ca23d32 100644 --- a/pkg/fftm/route_get_transaction.go +++ b/pkg/fftm/route_get_transaction.go @@ -18,6 +18,7 @@ package fftm import ( "net/http" + "strings" "github.com/hyperledger/firefly-common/pkg/ffapi" "github.com/hyperledger/firefly-transaction-manager/internal/tmmsgs" @@ -32,13 +33,15 @@ var getTransaction = func(m *manager) *ffapi.Route { PathParams: []*ffapi.PathParam{ {Name: "transactionId", Description: tmmsgs.APIParamTransactionID}, }, - QueryParams: nil, + QueryParams: []*ffapi.QueryParam{ + {Name: "history", Description: tmmsgs.APIParamHistory, IsBool: true}, + }, Description: tmmsgs.APIEndpointGetTransaction, JSONInputValue: nil, JSONOutputValue: func() interface{} { return &apitypes.TXWithStatus{} }, JSONOutputCodes: []int{http.StatusOK}, JSONHandler: func(r *ffapi.APIRequest) (output interface{}, err error) { - return m.getTransactionByID(r.Req.Context(), r.PP["transactionId"]) + return m.getTransactionByIDWithStatus(r.Req.Context(), r.PP["transactionId"], strings.EqualFold(r.QP["history"], "true")) }, } } diff --git a/pkg/fftm/route_get_transaction_test.go b/pkg/fftm/route_get_transaction_test.go index 89791b8c..a4064ffc 100644 --- a/pkg/fftm/route_get_transaction_test.go +++ b/pkg/fftm/route_get_transaction_test.go @@ -46,6 +46,27 @@ func TestGetTransaction(t *testing.T) { } +func TestGetTransactionNoStatus(t *testing.T) { + + url, m, done := newTestManager(t) + defer done() + err := m.Start() + assert.NoError(t, err) + + txIn := newTestTxn(t, m, "0xaaaaa", 10001, apitypes.TxStatusSucceeded) + + var txOut *apitypes.ManagedTX + res, err := resty.New().R(). + SetResult(&txOut). + Get(fmt.Sprintf("%s/transactions/%s?nostatus", url, txIn.ID)) + assert.NoError(t, err) + assert.Equal(t, 200, res.StatusCode()) + assert.NotNil(t, txOut.DeprecatedTransactionHeaders.From) // migration compatibility when using LevelDB + txOut.DeprecatedTransactionHeaders = nil + assert.Equal(t, *txIn, *txOut) + +} + func TestGetTransactionError(t *testing.T) { url, m, done := newTestManager(t) diff --git a/pkg/fftm/route_post_transaction_resume.go b/pkg/fftm/route_post_transaction_resume.go new file mode 100644 index 00000000..0d815e32 --- /dev/null +++ b/pkg/fftm/route_post_transaction_resume.go @@ -0,0 +1,45 @@ +// Copyright © 2023 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fftm + +import ( + "net/http" + + "github.com/hyperledger/firefly-common/pkg/ffapi" + "github.com/hyperledger/firefly-transaction-manager/internal/tmmsgs" + "github.com/hyperledger/firefly-transaction-manager/pkg/apitypes" +) + +var postTransactionResume = func(m *manager) *ffapi.Route { + return &ffapi.Route{ + Name: "postTransactionResume", + Path: "/transactions/{transactionId}/resume", + Method: http.MethodPost, + PathParams: []*ffapi.PathParam{ + {Name: "transactionId", Description: tmmsgs.APIParamTransactionID}, + }, + QueryParams: nil, + Description: tmmsgs.APIEndpointPostTransactionResume, + JSONInputValue: func() interface{} { return &struct{}{} }, + JSONOutputValue: func() interface{} { return &apitypes.ManagedTX{} }, + JSONOutputCodes: []int{http.StatusOK, http.StatusAccepted}, + JSONHandler: func(r *ffapi.APIRequest) (output interface{}, err error) { + r.SuccessStatus, output, err = m.requestTransactionResume(r.Req.Context(), r.PP["transactionId"]) + return output, err + }, + } +} diff --git a/pkg/fftm/route_post_transaction_resume_test.go b/pkg/fftm/route_post_transaction_resume_test.go new file mode 100644 index 00000000..40b2e148 --- /dev/null +++ b/pkg/fftm/route_post_transaction_resume_test.go @@ -0,0 +1,67 @@ +// Copyright © 2022 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fftm + +import ( + "fmt" + "testing" + + "github.com/go-resty/resty/v2" + "github.com/hyperledger/firefly-transaction-manager/mocks/txhandlermocks" + "github.com/hyperledger/firefly-transaction-manager/pkg/apitypes" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +func TestPostTransactionResume(t *testing.T) { + + url, m, done := newTestManager(t) + defer done() + tx := newTestTxn(t, m, "0x0aaaaa", 10001, apitypes.TxStatusSucceeded) + txID := tx.ID + + err := m.Start() + assert.NoError(t, err) + + var txOut *apitypes.ManagedTX + res, err := resty.New().R(). + SetResult(&txOut). + SetBody(struct{}{}). + Post(fmt.Sprintf("%s/transactions/%s/resume", url, txID)) + assert.NoError(t, err) + assert.Equal(t, 202, res.StatusCode()) + assert.Equal(t, txID, txOut.ID) +} + +func TestPostTransactionResumeFailed(t *testing.T) { + url, m, done := newTestManager(t) + defer done() + + err := m.Start() + assert.NoError(t, err) + mth := txhandlermocks.TransactionHandler{} + mth.On("HandleResumeTransaction", mock.Anything, "1234").Return(nil, fmt.Errorf("error")).Once() + m.txHandler = &mth + + var txOut *apitypes.ManagedTX + res, err := resty.New().R(). + SetResult(&txOut). + SetBody(struct{}{}). + Post(fmt.Sprintf("%s/transactions/%s/resume", url, "1234")) + assert.NoError(t, err) + assert.Equal(t, 500, res.StatusCode()) +} diff --git a/pkg/fftm/route_post_transaction_suspend.go b/pkg/fftm/route_post_transaction_suspend.go new file mode 100644 index 00000000..ad9de3dc --- /dev/null +++ b/pkg/fftm/route_post_transaction_suspend.go @@ -0,0 +1,45 @@ +// Copyright © 2023 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fftm + +import ( + "net/http" + + "github.com/hyperledger/firefly-common/pkg/ffapi" + "github.com/hyperledger/firefly-transaction-manager/internal/tmmsgs" + "github.com/hyperledger/firefly-transaction-manager/pkg/apitypes" +) + +var postTransactionSuspend = func(m *manager) *ffapi.Route { + return &ffapi.Route{ + Name: "postTransactionSuspend", + Path: "/transactions/{transactionId}/suspend", + Method: http.MethodPost, + PathParams: []*ffapi.PathParam{ + {Name: "transactionId", Description: tmmsgs.APIParamTransactionID}, + }, + QueryParams: nil, + Description: tmmsgs.APIEndpointPostTransactionSuspend, + JSONInputValue: func() interface{} { return &struct{}{} }, + JSONOutputValue: func() interface{} { return &apitypes.ManagedTX{} }, + JSONOutputCodes: []int{http.StatusOK, http.StatusAccepted}, + JSONHandler: func(r *ffapi.APIRequest) (output interface{}, err error) { + r.SuccessStatus, output, err = m.requestTransactionSuspend(r.Req.Context(), r.PP["transactionId"]) + return output, err + }, + } +} diff --git a/pkg/fftm/route_post_transaction_suspend_test.go b/pkg/fftm/route_post_transaction_suspend_test.go new file mode 100644 index 00000000..8cd121d2 --- /dev/null +++ b/pkg/fftm/route_post_transaction_suspend_test.go @@ -0,0 +1,67 @@ +// Copyright © 2022 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fftm + +import ( + "fmt" + "testing" + + "github.com/go-resty/resty/v2" + "github.com/hyperledger/firefly-transaction-manager/mocks/txhandlermocks" + "github.com/hyperledger/firefly-transaction-manager/pkg/apitypes" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +func TestPostTransactionSuspend(t *testing.T) { + + url, m, done := newTestManager(t) + defer done() + tx := newTestTxn(t, m, "0x0aaaaa", 10001, apitypes.TxStatusSucceeded) + txID := tx.ID + + err := m.Start() + assert.NoError(t, err) + + var txOut *apitypes.ManagedTX + res, err := resty.New().R(). + SetResult(&txOut). + SetBody(struct{}{}). + Post(fmt.Sprintf("%s/transactions/%s/suspend", url, txID)) + assert.NoError(t, err) + assert.Equal(t, 202, res.StatusCode()) + assert.Equal(t, txID, txOut.ID) +} + +func TestPostTransactionSuspendFailed(t *testing.T) { + url, m, done := newTestManager(t) + defer done() + + err := m.Start() + assert.NoError(t, err) + mth := txhandlermocks.TransactionHandler{} + mth.On("HandleSuspendTransaction", mock.Anything, "1234").Return(nil, fmt.Errorf("error")).Once() + m.txHandler = &mth + + var txOut *apitypes.ManagedTX + res, err := resty.New().R(). + SetResult(&txOut). + SetBody(struct{}{}). + Post(fmt.Sprintf("%s/transactions/%s/suspend", url, "1234")) + assert.NoError(t, err) + assert.Equal(t, 500, res.StatusCode()) +} diff --git a/pkg/fftm/routes.go b/pkg/fftm/routes.go index c98634d0..d6472040 100644 --- a/pkg/fftm/routes.go +++ b/pkg/fftm/routes.go @@ -51,5 +51,7 @@ func (m *manager) routes() []*ffapi.Route { postSubscriptions(m), getAddressBalance(m), getGasPrice(m), + postTransactionSuspend(m), + postTransactionResume(m), } } diff --git a/pkg/fftm/transaction_management.go b/pkg/fftm/transaction_management.go index 665fadd8..4765e7f6 100644 --- a/pkg/fftm/transaction_management.go +++ b/pkg/fftm/transaction_management.go @@ -28,8 +28,8 @@ import ( "github.com/hyperledger/firefly-transaction-manager/pkg/apitypes" ) -func (m *manager) getTransactionByID(ctx context.Context, txID string) (transaction *apitypes.TXWithStatus, err error) { - tx, err := m.persistence.GetTransactionByIDWithStatus(ctx, txID) +func (m *manager) getTransactionByIDWithStatus(ctx context.Context, txID string, withHistory bool) (transaction *apitypes.TXWithStatus, err error) { + tx, err := m.persistence.GetTransactionByIDWithStatus(ctx, txID, withHistory) if err != nil { return nil, err } @@ -96,3 +96,27 @@ func (m *manager) requestTransactionDeletion(ctx context.Context, txID string) ( return http.StatusAccepted, canceledTx, nil } + +func (m *manager) requestTransactionSuspend(ctx context.Context, txID string) (status int, transaction *apitypes.ManagedTX, err error) { + + canceledTx, err := m.txHandler.HandleSuspendTransaction(ctx, txID) + + if err != nil { + return http.StatusInternalServerError, nil, err + } + + return http.StatusAccepted, canceledTx, nil + +} + +func (m *manager) requestTransactionResume(ctx context.Context, txID string) (status int, transaction *apitypes.ManagedTX, err error) { + + canceledTx, err := m.txHandler.HandleResumeTransaction(ctx, txID) + + if err != nil { + return http.StatusInternalServerError, nil, err + } + + return http.StatusAccepted, canceledTx, nil + +} diff --git a/pkg/fftm/transaction_management_test.go b/pkg/fftm/transaction_management_test.go index d0c455d3..44cddac3 100644 --- a/pkg/fftm/transaction_management_test.go +++ b/pkg/fftm/transaction_management_test.go @@ -33,14 +33,14 @@ func TestGetTransactionErrors(t *testing.T) { defer close() mp := m.persistence.(*persistencemocks.Persistence) - mp.On("GetTransactionByIDWithStatus", m.ctx, mock.Anything).Return(nil, fmt.Errorf("pop")).Once() - mp.On("GetTransactionByIDWithStatus", m.ctx, mock.Anything).Return(nil, nil).Once() + mp.On("GetTransactionByIDWithStatus", m.ctx, mock.Anything, true).Return(nil, fmt.Errorf("pop")).Once() + mp.On("GetTransactionByIDWithStatus", m.ctx, mock.Anything, false).Return(nil, nil).Once() mp.On("Close", mock.Anything).Return(nil).Maybe() - _, err := m.getTransactionByID(m.ctx, "id") + _, err := m.getTransactionByIDWithStatus(m.ctx, "id", true) assert.Regexp(t, "pop", err) - _, err = m.getTransactionByID(m.ctx, "id") + _, err = m.getTransactionByIDWithStatus(m.ctx, "id", false) assert.Regexp(t, "FF21067", err) mp.AssertExpectations(t) diff --git a/pkg/txhandler/simple/policyloop.go b/pkg/txhandler/simple/policyloop.go index ed697344..7ee9a632 100644 --- a/pkg/txhandler/simple/policyloop.go +++ b/pkg/txhandler/simple/policyloop.go @@ -39,7 +39,10 @@ const metricsGaugeTransactionsInflightFreeDescription = "Number of transactions type policyEngineAPIRequestType int const ( - policyEngineAPIRequestTypeDelete policyEngineAPIRequestType = iota + ActionNone policyEngineAPIRequestType = iota + ActionDelete + ActionSuspend + ActionResume ) type policyEngineAPIRequest struct { @@ -170,7 +173,7 @@ func (sth *simpleTransactionHandler) policyLoopCycle(ctx context.Context, inflig // Go through executing the policy engine against them for _, pending := range sth.inflight { - err := sth.execPolicy(ctx, pending, false) + err := sth.execPolicy(ctx, pending, nil) if err != nil { log.L(ctx).Errorf("Failed policy cycle transaction=%s operation=%s: %s", pending.mtx.TransactionHash, pending.mtx.ID, err) } @@ -220,12 +223,12 @@ func (sth *simpleTransactionHandler) processPolicyAPIRequests(ctx context.Contex } switch request.requestType { - case policyEngineAPIRequestTypeDelete: - if err := sth.execPolicy(ctx, pending, true); err != nil { + case ActionDelete, ActionSuspend, ActionResume: + if err := sth.execPolicy(ctx, pending, &request.requestType); err != nil { request.response <- policyEngineAPIResponse{err: err} } else { res := policyEngineAPIResponse{tx: pending.mtx, status: http.StatusAccepted} - if pending.remove { + if pending.remove || request.requestType == ActionResume /* always sync */ { res.status = http.StatusOK // synchronously completed } request.response <- res @@ -239,7 +242,7 @@ func (sth *simpleTransactionHandler) processPolicyAPIRequests(ctx context.Contex } -func (sth *simpleTransactionHandler) pendingToRunContext(baseCtx context.Context, pending *pendingState, syncDeleteRequest bool) (ctx *RunContext, err error) { +func (sth *simpleTransactionHandler) pendingToRunContext(baseCtx context.Context, pending *pendingState, syncRequest *policyEngineAPIRequestType) (ctx *RunContext, err error) { // Take a snapshot of the pending state under the lock sth.mux.Lock() @@ -254,7 +257,11 @@ func (sth *simpleTransactionHandler) pendingToRunContext(baseCtx context.Context } confirmNotify := pending.confirmNotify receiptNotify := pending.receiptNotify - if syncDeleteRequest && mtx.DeleteRequested == nil { + if syncRequest != nil { + ctx.SyncAction = *syncRequest + } + + if ctx.SyncAction == ActionDelete && mtx.DeleteRequested == nil { mtx.DeleteRequested = fftypes.Now() ctx.UpdateType = Update // might change to delete later ctx.TXUpdates.DeleteRequested = mtx.DeleteRequested @@ -299,9 +306,9 @@ func (sth *simpleTransactionHandler) pendingToRunContext(baseCtx context.Context return ctx, nil } -func (sth *simpleTransactionHandler) execPolicy(baseCtx context.Context, pending *pendingState, syncDeleteRequest bool) (err error) { +func (sth *simpleTransactionHandler) execPolicy(baseCtx context.Context, pending *pendingState, syncRequest *policyEngineAPIRequestType) (err error) { - ctx, err := sth.pendingToRunContext(baseCtx, pending, syncDeleteRequest) + ctx, err := sth.pendingToRunContext(baseCtx, pending, syncRequest) if err != nil { return nil } @@ -309,7 +316,7 @@ func (sth *simpleTransactionHandler) execPolicy(baseCtx context.Context, pending completed := false switch { - case ctx.Confirmed && !syncDeleteRequest: + case ctx.Confirmed && ctx.SyncAction != ActionDelete: completed = true ctx.UpdateType = Update if ctx.Receipt != nil && ctx.Receipt.Success { @@ -319,13 +326,27 @@ func (sth *simpleTransactionHandler) execPolicy(baseCtx context.Context, pending mtx.Status = apitypes.TxStatusFailed ctx.TXUpdates.Status = &mtx.Status } - + case ctx.SyncAction == ActionSuspend: + // Whole cycle is a no-op if we're not pending + if mtx.Status == apitypes.TxStatusPending { + ctx.UpdateType = Update + completed = true // drop it out of the loop + mtx.Status = apitypes.TxStatusSuspended + ctx.TXUpdates.Status = &mtx.Status + } + case ctx.SyncAction == ActionResume: + // Whole cycle is a no-op if we're not suspended + if mtx.Status == apitypes.TxStatusSuspended { + ctx.UpdateType = Update + mtx.Status = apitypes.TxStatusPending + ctx.TXUpdates.Status = &mtx.Status + } default: // We get woken for lots of reasons to go through the policy loop, but we only want // to drive the policy engine at regular intervals. // So we track the last time we ran the policy engine against each pending item. // We always call the policy engine on every loop, when deletion has been requested. - if syncDeleteRequest || time.Since(pending.lastPolicyCycle) > sth.policyLoopInterval { + if ctx.SyncAction == ActionDelete || time.Since(pending.lastPolicyCycle) > sth.policyLoopInterval { // Pass the state to the pluggable policy engine to potentially perform more actions against it, // such as submitting for the first time, or raising the gas etc. @@ -396,13 +417,17 @@ func (sth *simpleTransactionHandler) flushChanges(ctx *RunContext, pending *pend log.L(ctx).Errorf("Failed to update transaction %s (status=%s): %s", mtx.ID, mtx.Status, err) return err } - if completed { + if ctx.SyncAction == ActionResume { + log.L(ctx).Infof("Transaction %s resumed", mtx.ID) + sth.markInflightStale() // this won't be in the in-flight set, so we need to pull it in if there's space + } else if completed { pending.remove = true // for the next time round the loop - log.L(ctx).Infof("Transaction %s marked complete (status=%s): %s", mtx.ID, mtx.Status, err) + log.L(ctx).Infof("Transaction %s removed from tracking (status=%s): %s", mtx.ID, mtx.Status, err) sth.markInflightStale() // if and only if the transaction is now resolved dispatch an event to event handler - // and discard any handling errors + // and discard any handling errors. + // Note that TxStatusSuspended has no action here. if mtx.Status == apitypes.TxStatusSucceeded { _ = sth.toolkit.EventHandler.HandleEvent(ctx, apitypes.ManagedTransactionEvent{ Type: apitypes.ManagedTXProcessSucceeded, diff --git a/pkg/txhandler/simple/policyloop_test.go b/pkg/txhandler/simple/policyloop_test.go index 12d3c5a1..5ad5fcd4 100644 --- a/pkg/txhandler/simple/policyloop_test.go +++ b/pkg/txhandler/simple/policyloop_test.go @@ -795,7 +795,7 @@ func TestExecPolicyGetTxFail(t *testing.T) { mp.On("GetTransactionByID", sth.ctx, tx.ID).Return(nil, fmt.Errorf("pop")) req := &policyEngineAPIRequest{ - requestType: policyEngineAPIRequestTypeDelete, + requestType: ActionDelete, txID: tx.ID, response: make(chan policyEngineAPIResponse, 1), } @@ -830,7 +830,7 @@ func TestExecPolicyDeleteFail(t *testing.T) { mp.On("DeleteTransaction", mock.Anything, tx.ID).Return(fmt.Errorf("pop")) req := &policyEngineAPIRequest{ - requestType: policyEngineAPIRequestTypeDelete, + requestType: ActionDelete, txID: tx.ID, response: make(chan policyEngineAPIResponse, 1), } @@ -875,7 +875,7 @@ func TestExecPolicyDeleteInflightSync(t *testing.T) { mp.On("DeleteTransaction", mock.Anything, tx.ID).Return(nil) req := &policyEngineAPIRequest{ - requestType: policyEngineAPIRequestTypeDelete, + requestType: ActionDelete, txID: tx.ID, response: make(chan policyEngineAPIResponse, 1), } @@ -892,6 +892,229 @@ func TestExecPolicyDeleteInflightSync(t *testing.T) { } +func TestExecPolicySuspendInflightSync(t *testing.T) { + f, tk, _, conf := newTestTransactionHandlerFactory(t) + conf.Set(FixedGasPrice, `12345`) + + th, err := f.NewTransactionHandler(context.Background(), conf) + assert.NoError(t, err) + + sth := th.(*simpleTransactionHandler) + sth.ctx = context.Background() + sth.Init(sth.ctx, tk) + eh := &fftm.ManagedTransactionEventHandler{ + Ctx: context.Background(), + TxHandler: sth, + } + mc := &confirmationsmocks.Manager{} + mc.On("Notify", mock.Anything).Return(nil) + eh.ConfirmationManager = mc + mws := &wsmocks.WebSocketServer{} + mws.On("SendReply", mock.Anything).Return(nil).Maybe() + + eh.WsServer = mws + sth.toolkit.EventHandler = eh + mp := sth.toolkit.TXPersistence.(*persistencemocks.Persistence) + mp.On("InsertTransactionWithNextNonce", sth.ctx, mock.Anything, mock.Anything).Return(nil, nil).Once() + mp.On("AddSubStatusAction", sth.ctx, mock.Anything, apitypes.TxSubStatusReceived, apitypes.TxActionAssignNonce, mock.Anything, mock.Anything).Return(nil) + tx := sendSampleTX(t, sth, "0xaaaaa", 12345, "") + sth.inflight = []*pendingState{{mtx: tx}} + mp.On("UpdateTransaction", mock.AnythingOfType("*simple.RunContext"), tx.ID, mock.MatchedBy(func(updates *apitypes.TXUpdates) bool { + return updates.Status != nil && *updates.Status == apitypes.TxStatusSuspended + })).Return(nil) + + req := &policyEngineAPIRequest{ + requestType: ActionSuspend, + txID: tx.ID, + response: make(chan policyEngineAPIResponse, 1), + } + sth.policyEngineAPIRequests = append(sth.policyEngineAPIRequests, req) + + sth.processPolicyAPIRequests(sth.ctx) + + res := <-req.response + assert.NoError(t, res.err) + assert.Equal(t, http.StatusOK, res.status) + assert.True(t, sth.inflight[0].remove) + + mp.AssertExpectations(t) + +} + +func TestExecPolicyResumeSync(t *testing.T) { + f, tk, _, conf := newTestTransactionHandlerFactory(t) + conf.Set(FixedGasPrice, `12345`) + + th, err := f.NewTransactionHandler(context.Background(), conf) + assert.NoError(t, err) + + sth := th.(*simpleTransactionHandler) + sth.ctx = context.Background() + sth.Init(sth.ctx, tk) + eh := &fftm.ManagedTransactionEventHandler{ + Ctx: context.Background(), + TxHandler: sth, + } + mc := &confirmationsmocks.Manager{} + mc.On("Notify", mock.Anything).Return(nil) + eh.ConfirmationManager = mc + mws := &wsmocks.WebSocketServer{} + mws.On("SendReply", mock.Anything).Return(nil).Maybe() + + eh.WsServer = mws + sth.toolkit.EventHandler = eh + mp := sth.toolkit.TXPersistence.(*persistencemocks.Persistence) + mp.On("InsertTransactionWithNextNonce", sth.ctx, mock.Anything, mock.Anything).Return(nil, nil).Once() + mp.On("AddSubStatusAction", sth.ctx, mock.Anything, apitypes.TxSubStatusReceived, apitypes.TxActionAssignNonce, mock.Anything, mock.Anything).Return(nil) + tx := sendSampleTX(t, sth, "0xaaaaa", 12345, "") + tx.Status = apitypes.TxStatusSuspended + mp.On("UpdateTransaction", mock.AnythingOfType("*simple.RunContext"), tx.ID, mock.MatchedBy(func(updates *apitypes.TXUpdates) bool { + tx.Status = apitypes.TxStatusPending + return updates.Status != nil && *updates.Status == apitypes.TxStatusPending + })).Return(nil) + mp.On("GetTransactionByID", mock.Anything, tx.ID).Return(tx, nil) + + req := &policyEngineAPIRequest{ + requestType: ActionResume, + txID: tx.ID, + response: make(chan policyEngineAPIResponse, 1), + } + sth.policyEngineAPIRequests = append(sth.policyEngineAPIRequests, req) + + sth.processPolicyAPIRequests(sth.ctx) + + res := <-req.response + assert.NoError(t, res.err) + assert.Equal(t, http.StatusOK, res.status) + + mp.AssertExpectations(t) + +} + +func TestExecHandleResumeQueuesResumeOk(t *testing.T) { + + f, tk, _, conf := newTestTransactionHandlerFactory(t) + conf.Set(FixedGasPrice, `12345`) + + th, err := f.NewTransactionHandler(context.Background(), conf) + assert.NoError(t, err) + + sth := th.(*simpleTransactionHandler) + sth.ctx = context.Background() + sth.Init(sth.ctx, tk) + + result := make(chan error) + go func() { + _, err := sth.HandleResumeTransaction(sth.ctx, "tx1") + result <- err + }() + + for len(sth.policyEngineAPIRequests) == 0 { + time.Sleep(1 * time.Millisecond) + } + req := sth.policyEngineAPIRequests[0] + assert.Equal(t, ActionResume, req.requestType) + req.response <- policyEngineAPIResponse{} + + err = <-result + assert.NoError(t, err) + +} + +func TestExecHandleResumeQueuesSuspendOk(t *testing.T) { + + f, tk, _, conf := newTestTransactionHandlerFactory(t) + conf.Set(FixedGasPrice, `12345`) + + th, err := f.NewTransactionHandler(context.Background(), conf) + assert.NoError(t, err) + + sth := th.(*simpleTransactionHandler) + sth.ctx = context.Background() + sth.Init(sth.ctx, tk) + + result := make(chan error) + go func() { + _, err := sth.HandleSuspendTransaction(sth.ctx, "tx1") + result <- err + }() + + for len(sth.policyEngineAPIRequests) == 0 { + time.Sleep(1 * time.Millisecond) + } + req := sth.policyEngineAPIRequests[0] + assert.Equal(t, ActionSuspend, req.requestType) + req.response <- policyEngineAPIResponse{} + + err = <-result + assert.NoError(t, err) + +} + +func TestExecHandleResumeQueuesResumeErr(t *testing.T) { + + f, tk, _, conf := newTestTransactionHandlerFactory(t) + conf.Set(FixedGasPrice, `12345`) + + th, err := f.NewTransactionHandler(context.Background(), conf) + assert.NoError(t, err) + + sth := th.(*simpleTransactionHandler) + sth.ctx = context.Background() + sth.Init(sth.ctx, tk) + + result := make(chan error) + go func() { + _, err := sth.HandleResumeTransaction(sth.ctx, "tx1") + result <- err + }() + + for len(sth.policyEngineAPIRequests) == 0 { + time.Sleep(1 * time.Millisecond) + } + req := sth.policyEngineAPIRequests[0] + assert.Equal(t, ActionResume, req.requestType) + req.response <- policyEngineAPIResponse{ + err: fmt.Errorf("pop"), + } + + err = <-result + assert.Regexp(t, "pop", err) + +} + +func TestExecHandleResumeQueuesSuspendErr(t *testing.T) { + + f, tk, _, conf := newTestTransactionHandlerFactory(t) + conf.Set(FixedGasPrice, `12345`) + + th, err := f.NewTransactionHandler(context.Background(), conf) + assert.NoError(t, err) + + sth := th.(*simpleTransactionHandler) + sth.ctx = context.Background() + sth.Init(sth.ctx, tk) + + result := make(chan error) + go func() { + _, err := sth.HandleSuspendTransaction(sth.ctx, "tx1") + result <- err + }() + + for len(sth.policyEngineAPIRequests) == 0 { + time.Sleep(1 * time.Millisecond) + } + req := sth.policyEngineAPIRequests[0] + assert.Equal(t, ActionSuspend, req.requestType) + req.response <- policyEngineAPIResponse{ + err: fmt.Errorf("pop"), + } + + err = <-result + assert.Regexp(t, "pop", err) + +} + func TestExecPolicyIdempotentCancellation(t *testing.T) { f, tk, _, conf := newTestTransactionHandlerFactory(t) conf.Set(FixedGasPrice, `12345`) @@ -983,7 +1206,7 @@ func TestPendingTransactionGetsRemoved(t *testing.T) { // add a delete request req := &policyEngineAPIRequest{ - requestType: policyEngineAPIRequestTypeDelete, + requestType: ActionDelete, txID: testTxID.String(), response: make(chan policyEngineAPIResponse, 1), } @@ -1018,7 +1241,7 @@ func TestExecPolicyDeleteNotFound(t *testing.T) { mp.On("GetTransactionByID", sth.ctx, "bad-id").Return(nil, nil) req := &policyEngineAPIRequest{ - requestType: policyEngineAPIRequestTypeDelete, + requestType: ActionDelete, txID: "bad-id", response: make(chan policyEngineAPIResponse, 1), } @@ -1104,7 +1327,7 @@ func TestExecPolicyUpdateNewInfo(t *testing.T) { FirstSubmit: fftypes.Now(), }, receipt: &ffcapi.TransactionReceiptResponse{}, - }, false) + }, nil) assert.NoError(t, err) } diff --git a/pkg/txhandler/simple/simple_transaction_handler.go b/pkg/txhandler/simple/simple_transaction_handler.go index bcacc7ee..6aef804e 100644 --- a/pkg/txhandler/simple/simple_transaction_handler.go +++ b/pkg/txhandler/simple/simple_transaction_handler.go @@ -66,6 +66,7 @@ type RunContext struct { Receipt *ffcapi.TransactionReceiptResponse Confirmations *apitypes.ConfirmationsNotification Confirmed bool + SyncAction policyEngineAPIRequestType // Input/output SubStatus apitypes.TxSubStatus Info *simplePolicyInfo // must be updated in-place and set UpdatedInfo to true as well as UpdateType = Update @@ -278,10 +279,26 @@ func (sth *simpleTransactionHandler) HandleNewContractDeployment(ctx context.Con func (sth *simpleTransactionHandler) HandleCancelTransaction(ctx context.Context, txID string) (mtx *apitypes.ManagedTX, err error) { res := sth.policyEngineAPIRequest(ctx, &policyEngineAPIRequest{ - requestType: policyEngineAPIRequestTypeDelete, + requestType: ActionDelete, txID: txID, }) - return res.tx, nil + return res.tx, res.err +} + +func (sth *simpleTransactionHandler) HandleSuspendTransaction(ctx context.Context, txID string) (mtx *apitypes.ManagedTX, err error) { + res := sth.policyEngineAPIRequest(ctx, &policyEngineAPIRequest{ + requestType: ActionSuspend, + txID: txID, + }) + return res.tx, res.err +} + +func (sth *simpleTransactionHandler) HandleResumeTransaction(ctx context.Context, txID string) (mtx *apitypes.ManagedTX, err error) { + res := sth.policyEngineAPIRequest(ctx, &policyEngineAPIRequest{ + requestType: ActionResume, + txID: txID, + }) + return res.tx, res.err } func (sth *simpleTransactionHandler) createManagedTx(ctx context.Context, txID string, txHeaders *ffcapi.TransactionHeaders, gas *fftypes.FFBigInt, transactionData string) (*apitypes.ManagedTX, error) { diff --git a/pkg/txhandler/txhandler.go b/pkg/txhandler/txhandler.go index 87d422d3..bba6fb9f 100644 --- a/pkg/txhandler/txhandler.go +++ b/pkg/txhandler/txhandler.go @@ -107,6 +107,10 @@ type TransactionHandler interface { HandleNewContractDeployment(ctx context.Context, txReq *apitypes.ContractDeployRequest) (mtx *apitypes.ManagedTX, err error) // HandleCancelTransaction - handles event of cancelling a managed transaction HandleCancelTransaction(ctx context.Context, txID string) (mtx *apitypes.ManagedTX, err error) + // HandleSuspendTransaction - handles event of suspending a managed transaction + HandleSuspendTransaction(ctx context.Context, txID string) (mtx *apitypes.ManagedTX, err error) + // HandleResumeTransaction - handles event of resuming a suspended managed transaction + HandleResumeTransaction(ctx context.Context, txID string) (mtx *apitypes.ManagedTX, err error) // Informational events: // HandleTransactionConfirmations - handles confirmations of blockchain transactions for a managed transaction