Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: race condition when reading and writing to inflight queue #116

Merged
merged 12 commits into from
Apr 24, 2024
41 changes: 28 additions & 13 deletions pkg/txhandler/simple/policyloop.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -103,6 +103,8 @@ func (sth *simpleTransactionHandler) markInflightUpdate() {
}

func (sth *simpleTransactionHandler) updateInflightSet(ctx context.Context) bool {
sth.inflightRWMux.Lock()
defer sth.inflightRWMux.Unlock()

oldInflight := sth.inflight
sth.inflight = make([]*pendingState, 0, len(oldInflight))
Expand All @@ -118,6 +120,7 @@ func (sth *simpleTransactionHandler) updateInflightSet(ctx context.Context) bool

// If we are not at maximum, then query if there are more candidates now
spaces := sth.maxInFlight - len(sth.inflight)
log.L(sth.ctx).Tracef("Number of spaces left '%v'", spaces)
if spaces > 0 {
var after string
if len(sth.inflight) > 0 {
Expand Down Expand Up @@ -152,7 +155,7 @@ func (sth *simpleTransactionHandler) updateInflightSet(ctx context.Context) bool
}
newLen := len(sth.inflight)
if newLen > 0 {
log.L(ctx).Debugf("Inflight set updated len=%d head-seq=%s tail-seq=%s old-tail=%s", len(sth.inflight), sth.inflight[0].mtx.SequenceID, sth.inflight[newLen-1].mtx.SequenceID, after)
log.L(ctx).Debugf("Inflight set updated with %d additional transactions, length is now %d head-id:%s head-seq=%s tail-id:%s tail-seq=%s old-tail=%s", len(additional), len(sth.inflight), sth.inflight[0].mtx.ID, sth.inflight[0].mtx.SequenceID, sth.inflight[newLen-1].mtx.ID, sth.inflight[newLen-1].mtx.SequenceID, after)
}
}
sth.setTransactionInflightQueueMetrics(ctx)
Expand All @@ -161,6 +164,7 @@ func (sth *simpleTransactionHandler) updateInflightSet(ctx context.Context) bool
}

func (sth *simpleTransactionHandler) policyLoopCycle(ctx context.Context, inflightStale bool) {
log.L(ctx).Tracef("policyLoopCycle triggered inflightStatle=%v", inflightStale)

// Process any synchronous commands first - these might not be in our inflight set
sth.processPolicyAPIRequests(ctx)
Expand All @@ -170,9 +174,12 @@ func (sth *simpleTransactionHandler) policyLoopCycle(ctx context.Context, inflig
return
}
}
// Go through executing the policy engine against them

sth.inflightRWMux.RLock()
defer sth.inflightRWMux.RUnlock()
// Go through executing the policy engine against them
for _, pending := range sth.inflight {
log.L(ctx).Tracef("Executing policy against tx-id=%v", pending.mtx.ID)
err := sth.execPolicy(ctx, pending, nil)
if err != nil {
log.L(ctx).Errorf("Failed policy cycle transaction=%s operation=%s: %s", pending.mtx.TransactionHash, pending.mtx.ID, err)
Expand Down Expand Up @@ -204,13 +211,17 @@ func (sth *simpleTransactionHandler) processPolicyAPIRequests(ctx context.Contex

for _, request := range requests {
var pending *pendingState

sth.inflightRWMux.RLock()
// If this transaction is in-flight, we use that record
for _, inflight := range sth.inflight {
if inflight.mtx.ID == request.txID {
pending = inflight
break
}
}
sth.inflightRWMux.RUnlock()
// If this transaction is in-flight, we use that record
if pending == nil {
mtx, err := sth.getTransactionByID(ctx, request.txID)
if err != nil {
Expand Down Expand Up @@ -246,7 +257,9 @@ func (sth *simpleTransactionHandler) processPolicyAPIRequests(ctx context.Contex
func (sth *simpleTransactionHandler) pendingToRunContext(baseCtx context.Context, pending *pendingState, syncRequest *policyEngineAPIRequestType) (ctx *RunContext, err error) {

// Take a snapshot of the pending state under the lock
sth.mux.Lock()
pending.mux.Lock()
defer pending.mux.Unlock()

mtx := pending.mtx
ctx = &RunContext{
Context: baseCtx,
Expand All @@ -267,7 +280,6 @@ func (sth *simpleTransactionHandler) pendingToRunContext(baseCtx context.Context
ctx.UpdateType = Update // might change to delete later
ctx.TXUpdates.DeleteRequested = mtx.DeleteRequested
}
sth.mux.Unlock()

// Process any state updates that were queued to us from notifications from the confirmation manager
if receiptNotify != nil {
Expand All @@ -279,11 +291,9 @@ func (sth *simpleTransactionHandler) pendingToRunContext(baseCtx context.Context
sth.incTransactionOperationCounter(ctx, pending.mtx.Namespace(ctx), "received_receipt")

// Clear the notification (as long as no other came through)
sth.mux.Lock()
if pending.receiptNotify == receiptNotify {
pending.receiptNotify = nil
}
sth.mux.Unlock()
}

if confirmNotify != nil && ctx.Confirmations != nil {
Expand All @@ -297,11 +307,9 @@ func (sth *simpleTransactionHandler) pendingToRunContext(baseCtx context.Context
}

// Clear the notification (as long as no other came through)
sth.mux.Lock()
if pending.confirmNotify == confirmNotify {
pending.confirmNotify = nil
}
sth.mux.Unlock()
}

return ctx, nil
Expand All @@ -318,6 +326,7 @@ func (sth *simpleTransactionHandler) execPolicy(baseCtx context.Context, pending
completed := false
switch {
case ctx.Confirmed && ctx.SyncAction != ActionDelete:
log.L(sth.ctx).Tracef("Transaction '%s' confirmed", ctx.TX.ID)
completed = true
ctx.UpdateType = Update
if ctx.Receipt != nil && ctx.Receipt.Success {
Expand Down Expand Up @@ -481,44 +490,50 @@ func (sth *simpleTransactionHandler) policyEngineAPIRequest(ctx context.Context,

func (sth *simpleTransactionHandler) HandleTransactionConfirmations(ctx context.Context, txID string, notification *apitypes.ConfirmationsNotification) (err error) {
// Will be picked up on the next policy loop cycle
sth.inflightRWMux.RLock()
var pending *pendingState
for _, p := range sth.inflight {
if p.mtx.ID == txID {
pending = p
break
}
}
sth.inflightRWMux.RUnlock()
if pending == nil {
err = i18n.NewError(ctx, tmmsgs.MsgTransactionNotFound, txID)
return
}
sth.mux.Lock()
pending.mux.Lock()
pending.confirmed = notification.Confirmed
pending.confirmNotify = fftypes.Now()
pending.confirmations = notification
pending.mux.Unlock()
log.L(ctx).Infof("Received %d confirmations (resync=%t)", len(notification.Confirmations), notification.NewFork)
sth.mux.Unlock()

sth.markInflightUpdate()
return
}
func (sth *simpleTransactionHandler) HandleTransactionReceiptReceived(ctx context.Context, txID string, receipt *ffcapi.TransactionReceiptResponse) (err error) {
log.L(ctx).Tracef("Handle transaction receipt received %s", txID)
sth.inflightRWMux.RLock()
var pending *pendingState
for _, p := range sth.inflight {
if p.mtx.ID == txID {
pending = p
break
}
}
sth.inflightRWMux.RUnlock()
if pending == nil {
err = i18n.NewError(ctx, tmmsgs.MsgTransactionNotFound, txID)
return
}
pending.mux.Lock()
// Will be picked up on the next policy loop cycle - guaranteed to occur before Confirmed
sth.mux.Lock()
pending.receiptNotify = fftypes.Now()
pending.receipt = receipt
sth.mux.Unlock()
pending.mux.Unlock()
// Will be picked up on the next policy loop cycle - guaranteed to occur before Confirmed
sth.markInflightUpdate()
return
}
10 changes: 7 additions & 3 deletions pkg/txhandler/simple/simple_transaction_handler.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -176,7 +176,8 @@ type simpleTransactionHandler struct {
policyLoopDone chan struct{}
inflightStale chan bool
inflightUpdate chan bool
mux sync.Mutex
mux sync.RWMutex
inflightRWMux sync.RWMutex
inflight []*pendingState
policyEngineAPIRequests []*policyEngineAPIRequest
maxInFlight int
Expand All @@ -195,6 +196,9 @@ type pendingState struct {
confirmNotify *fftypes.FFTime
remove bool
subStatus apitypes.TxSubStatus
// This mutex only works in a slice when the slice contains a pointer to this struct
// appends to a slice copy memory but when storing pointers it does not
mux sync.Mutex
}

type simplePolicyInfo struct {
Expand Down Expand Up @@ -344,8 +348,8 @@ func (sth *simpleTransactionHandler) createManagedTx(ctx context.Context, txID s
}

func (sth *simpleTransactionHandler) submitTX(ctx *RunContext) (reason ffcapi.ErrorReason, err error) {

mtx := ctx.TX

mtx.GasPrice, err = sth.getGasPrice(ctx, sth.toolkit.Connector)
if err != nil {
ctx.AddSubStatusAction(apitypes.TxActionRetrieveGasPrice, nil, fftypes.JSONAnyPtr(`{"error":"`+err.Error()+`"}`))
Expand Down