Skip to content

Commit

Permalink
Merge pull request #108 from hyperledger/sql-logging
Browse files Browse the repository at this point in the history
Better logging for transaction writer, and TXUpdate de-dup
  • Loading branch information
nguyer authored Feb 13, 2024
2 parents 042cf0e + 58daa9d commit a085feb
Show file tree
Hide file tree
Showing 7 changed files with 177 additions and 10 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/gorilla/mux v1.8.1
github.com/gorilla/websocket v1.5.1
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/hyperledger/firefly-common v1.4.2
github.com/hyperledger/firefly-common v1.4.6-0.20240131185020-80d20a173401
github.com/lib/pq v1.10.9
github.com/oklog/ulid/v2 v2.1.0
github.com/prometheus/client_golang v1.18.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO
github.com/huandu/xstrings v1.3.3/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE=
github.com/huandu/xstrings v1.4.0 h1:D17IlohoQq4UcpqD7fDk80P7l+lwAmlFaBHgOipl2FU=
github.com/huandu/xstrings v1.4.0/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE=
github.com/hyperledger/firefly-common v1.4.2 h1:sBbiTFWDu1qCnXFA6JobasJl4AXphCAUZU/R4nyWPdE=
github.com/hyperledger/firefly-common v1.4.2/go.mod h1:jkErZdQmC9fsAJZQO427tURdwB9iiW+NMUZSqS3eBIE=
github.com/hyperledger/firefly-common v1.4.6-0.20240131185020-80d20a173401 h1:bcIg8zUalHyjxPmhIggwg/VK/IVDvpH2XJwCkfAYrSU=
github.com/hyperledger/firefly-common v1.4.6-0.20240131185020-80d20a173401/go.mod h1:jkErZdQmC9fsAJZQO427tURdwB9iiW+NMUZSqS3eBIE=
github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4=
github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY=
Expand Down
7 changes: 5 additions & 2 deletions internal/persistence/postgres/eventstreams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,11 @@ func TestEventStreamAfterPaginatePSQL(t *testing.T) {
var eventStreams []*apitypes.EventStream
for i := 0; i < 20; i++ {
es := &apitypes.EventStream{
ID: fftypes.NewUUID(),
Name: strPtr(fmt.Sprintf("es_%.3d", i)),
ID: fftypes.NewUUID(),
Name: strPtr(fmt.Sprintf("es_%.3d", i)),
BatchTimeout: ffDurationPtr(22222 * time.Second),
RetryTimeout: ffDurationPtr(33333 * time.Second),
BlockedRetryDelay: ffDurationPtr(44444 * time.Second),
}
err := p.WriteStream(ctx, es)
assert.NoError(t, err)
Expand Down
26 changes: 22 additions & 4 deletions internal/persistence/postgres/transaction_writer.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -40,6 +40,7 @@ type transactionOperation struct {
sentConflict bool
done chan error

opID string
isShutdown bool
txInsert *apitypes.ManagedTX
noncePreAssigned bool
Expand Down Expand Up @@ -122,6 +123,7 @@ func newTransactionWriter(bgCtx context.Context, p *sqlPersistence, conf config.

func newTransactionOperation(txID string) *transactionOperation {
return &transactionOperation{
opID: fftypes.ShortID(),
txID: txID,
done: make(chan error, 1), // 1 slot to ensure we don't block the writer
}
Expand All @@ -130,6 +132,7 @@ func newTransactionOperation(txID string) *transactionOperation {
func (op *transactionOperation) flush(ctx context.Context) error {
select {
case err := <-op.done:
log.L(ctx).Debugf("Flushed write operation %s (err=%v)", op.opID, err)
return err
case <-ctx.Done():
return i18n.NewError(ctx, i18n.MsgContextCanceled)
Expand Down Expand Up @@ -165,6 +168,7 @@ func (tw *transactionWriter) queue(ctx context.Context, op *transactionOperation
h := fnv.New32a() // simple non-cryptographic hash algo
_, _ = h.Write([]byte(hashKey))
routine := h.Sum32() % tw.workerCount
log.L(ctx).Debugf("Queuing write operation %s to worker tx_writer_%.4d", op.opID, routine)
select {
case tw.workQueues[routine] <- op: // it's queued
case <-ctx.Done(): // timeout of caller context
Expand All @@ -180,6 +184,7 @@ func (tw *transactionWriter) worker(i int) {
defer close(tw.workersDone[i])
workerID := fmt.Sprintf("tx_writer_%.4d", i)
ctx := log.WithLogField(tw.bgCtx, "job", workerID)
l := log.L(ctx)
var batch *transactionWriterBatch
batchCount := 0
workQueue := tw.workQueues[i]
Expand Down Expand Up @@ -208,18 +213,20 @@ func (tw *transactionWriter) worker(i int) {
batchCount++
}
batch.ops = append(batch.ops, op)
l.Debugf("Added write operation %s to batch %s (len=%d)", op.opID, batch.id, len(batch.ops))
case <-timeoutContext.Done():
timedOut = true
select {
case <-ctx.Done():
log.L(ctx).Debugf("Transaction writer ending")
l.Debugf("Transaction writer ending")
return
default:
}
}

if batch != nil && (timedOut || (len(batch.ops) >= tw.batchMaxSize)) {
batch.timeoutCancel()
l.Debugf("Running batch %s (len=%d)", batch.id, len(batch.ops))
tw.runBatch(ctx, batch)
batch = nil
}
Expand Down Expand Up @@ -383,6 +390,7 @@ func (tw *transactionWriter) preInsertIdempotencyCheck(ctx context.Context, b *t
txOp.sentConflict = true
txOp.done <- i18n.NewError(ctx, tmmsgs.MsgDuplicateID, txOp.txID)
} else {
log.L(ctx).Debugf("Adding TX %s from write operation %s to insert idx=%d", txOp.txID, txOp.opID, len(validInserts))
validInserts = append(validInserts, txOp.txInsert)
}
}
Expand Down Expand Up @@ -413,9 +421,19 @@ func (tw *transactionWriter) executeBatchOps(ctx context.Context, b *transaction
}
}
// Do all the transaction updates
mergedUpdates := make(map[string]*apitypes.TXUpdates)
for _, op := range b.txUpdates {
if err := tw.p.updateTransaction(ctx, op.txID, op.txUpdate); err != nil {
log.L(ctx).Errorf("Update transaction %s failed: %s", op.txID, err)
update, merge := mergedUpdates[op.txID]
if merge {
update.Merge(op.txUpdate)
} else {
mergedUpdates[op.txID] = op.txUpdate
}
log.L(ctx).Debugf("Updating transaction %s in write operation %s (merged=%t)", op.txID, op.opID, merge)
}
for txID, update := range mergedUpdates {
if err := tw.p.updateTransaction(ctx, txID, update); err != nil {
log.L(ctx).Errorf("Update transaction %s failed: %s", txID, err)
return err
}
}
Expand Down
73 changes: 73 additions & 0 deletions internal/persistence/postgres/transaction_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/hyperledger/firefly-common/pkg/fftypes"
"github.com/hyperledger/firefly-transaction-manager/pkg/apitypes"
"github.com/hyperledger/firefly-transaction-manager/pkg/ffcapi"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -275,6 +276,48 @@ func TestExecuteBatchOpsUpdateTXFail(t *testing.T) {
assert.NoError(t, mdb.ExpectationsWereMet())
}

func TestExecuteBatchOpsUpdateTXMerge(t *testing.T) {
logrus.SetLevel(logrus.TraceLevel)

ctx, p, mdb, done := newMockSQLPersistence(t)
defer done()

mdb.ExpectBegin()
mdb.ExpectExec("UPDATE.*").WillReturnResult(sqlmock.NewResult(-1, 1))
mdb.ExpectExec("UPDATE.*").WillReturnResult(sqlmock.NewResult(-1, 1))
mdb.ExpectCommit()

err := p.db.RunAsGroup(ctx, func(ctx context.Context) error {
return p.writer.executeBatchOps(ctx, &transactionWriterBatch{
txUpdates: []*transactionOperation{
{
txID: "11111",
txUpdate: &apitypes.TXUpdates{
Status: ptrTo(apitypes.TxStatusPending),
From: strPtr("0xaaaaa"),
},
},
{
txID: "22222",
txUpdate: &apitypes.TXUpdates{
Status: ptrTo(apitypes.TxStatusPending),
},
},
{
txID: "11111",
txUpdate: &apitypes.TXUpdates{
Status: ptrTo(apitypes.TxStatusSucceeded),
TransactionHash: strPtr("0xaabbcc"),
},
},
},
})
})
assert.NoError(t, err)

assert.NoError(t, mdb.ExpectationsWereMet())
}

func TestExecuteBatchOpsUpsertReceiptFail(t *testing.T) {
ctx, p, mdb, done := newMockSQLPersistence(t)
defer done()
Expand Down Expand Up @@ -455,3 +498,33 @@ func TestQueueClosedContext(t *testing.T) {
p.writer.queue(closedCtx, newTransactionOperation("tx1"))

}

func TestStopDoneWorker(t *testing.T) {
tw := &transactionWriter{
workersDone: []chan struct{}{
make(chan struct{}),
},
}
tw.bgCtx, tw.cancelCtx = context.WithCancel(context.Background())
close(tw.workersDone[0])
tw.stop()
}

func TestStopDoneCtx(t *testing.T) {
tw := &transactionWriter{
workersDone: []chan struct{}{
make(chan struct{}, 1),
},
}
tw.bgCtx, tw.cancelCtx = context.WithCancel(context.Background())
tw.cancelCtx()
go func() {
time.Sleep(10 * time.Millisecond)
tw.workersDone[0] <- struct{}{}
}()
tw.stop()
}

func ptrTo[T any](v T) *T {
return &v
}
47 changes: 46 additions & 1 deletion pkg/apitypes/managed_tx.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -208,6 +208,51 @@ type TXUpdates struct {
ErrorMessage *string `json:"errorMessage,omitempty"`
}

func (txu *TXUpdates) Merge(txu2 *TXUpdates) {
if txu2.Status != nil {
txu.Status = txu2.Status
}
if txu2.DeleteRequested != nil {
txu.DeleteRequested = txu2.DeleteRequested
}
if txu2.From != nil {
txu.From = txu2.From
}
if txu2.To != nil {
txu.To = txu2.To
}
if txu2.Nonce != nil {
txu.Nonce = txu2.Nonce
}
if txu2.Gas != nil {
txu.Gas = txu2.Gas
}
if txu2.Value != nil {
txu.Value = txu2.Value
}
if txu2.GasPrice != nil {
txu.GasPrice = txu2.GasPrice
}
if txu2.TransactionData != nil {
txu.TransactionData = txu2.TransactionData
}
if txu2.TransactionHash != nil {
txu.TransactionHash = txu2.TransactionHash
}
if txu2.PolicyInfo != nil {
txu.PolicyInfo = txu2.PolicyInfo
}
if txu2.FirstSubmit != nil {
txu.FirstSubmit = txu2.FirstSubmit
}
if txu2.LastSubmit != nil {
txu.LastSubmit = txu2.LastSubmit
}
if txu2.ErrorMessage != nil {
txu.ErrorMessage = txu2.ErrorMessage
}
}

// TXWithStatus is a convenience object that fetches all data about a transaction into one
// large JSON payload (with limits on certain parts, such as the history entries).
// Note that in LevelDB persistence this is the stored form of the single document object.
Expand Down
28 changes: 28 additions & 0 deletions pkg/apitypes/managed_tx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,31 @@ func TestReceiptRecord(t *testing.T) {
r.SetUpdated(t2)
assert.Equal(t, t2, r.Updated)
}

func TestTXUpdatesMerge(t *testing.T) {
txu := &TXUpdates{}
txu2 := &TXUpdates{
Status: ptrTo(TxStatusPending),
DeleteRequested: fftypes.Now(),
From: ptrTo("1111"),
To: ptrTo("2222"),
Nonce: fftypes.NewFFBigInt(3333),
Gas: fftypes.NewFFBigInt(4444),
Value: fftypes.NewFFBigInt(5555),
GasPrice: fftypes.JSONAnyPtr(`{"some": "stuff"}`),
TransactionData: ptrTo("xxxx"),
TransactionHash: ptrTo("yyyy"),
PolicyInfo: fftypes.JSONAnyPtr(`{"more": "stuff"}`),
FirstSubmit: fftypes.Now(),
LastSubmit: fftypes.Now(),
ErrorMessage: ptrTo("pop"),
}
txu.Merge(txu2)
assert.Equal(t, *txu2, *txu)
txu.Merge(&TXUpdates{})
assert.Equal(t, *txu2, *txu)
}

func ptrTo[T any](v T) *T {
return &v
}

0 comments on commit a085feb

Please sign in to comment.