Skip to content

Commit

Permalink
txHandler: kick in ARL at 1/2 of a base backlog capacity (algorand#5873)
Browse files Browse the repository at this point in the history
  • Loading branch information
algorandskiy authored Dec 21, 2023
1 parent a4fcdfa commit 55cbb7f
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 14 deletions.
19 changes: 11 additions & 8 deletions data/txHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ type TxHandler struct {
streamVerifierDropped chan *verify.UnverifiedTxnSigJob
erl *util.ElasticRateLimiter
appLimiter *appRateLimiter
appLimiterBacklogThreshold int
}

// TxHandlerOpts is TxHandler configuration options
Expand Down Expand Up @@ -203,6 +204,8 @@ func MakeTxHandler(opts TxHandlerOpts) (*TxHandler, error) {
uint64(opts.Config.TxBacklogAppTxPerSecondRate),
time.Duration(opts.Config.TxBacklogServiceRateWindowSeconds)*time.Second,
)
// set appLimiter triggering threshold at 50% of the base backlog size
handler.appLimiterBacklogThreshold = int(float64(opts.Config.TxBacklogSize) * float64(opts.Config.TxBacklogRateLimitingCongestionPct) / 100)
}
}

Expand Down Expand Up @@ -596,11 +599,8 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net

var err error
var capguard *util.ErlCapacityGuard
var congested bool
if handler.erl != nil || handler.appLimiter != nil {
congested = float64(cap(handler.backlogQueue))*handler.backlogCongestionThreshold < float64(len(handler.backlogQueue))
}
if handler.erl != nil {
congestedERL := float64(cap(handler.backlogQueue))*handler.backlogCongestionThreshold < float64(len(handler.backlogQueue))
// consume a capacity unit
// if the elastic rate limiter cannot vend a capacity, the error it returns
// is sufficient to indicate that we should enable Congestion Control, because
Expand All @@ -613,7 +613,7 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net
return network.OutgoingMessage{Action: network.Ignore}
}
// if the backlog Queue has 50% of its buffer back, turn congestion control off
if !congested {
if !congestedERL {
handler.erl.DisableCongestionControl()
}
}
Expand Down Expand Up @@ -663,9 +663,12 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net
}

// rate limit per application in a group. Limiting any app in a group drops the entire message.
if handler.appLimiter != nil && congested && handler.appLimiter.shouldDrop(unverifiedTxGroup, rawmsg.Sender.(network.IPAddressable).RoutingAddr()) {
transactionMessagesAppLimiterDrop.Inc(nil)
return network.OutgoingMessage{Action: network.Ignore}
if handler.appLimiter != nil {
congestedARL := len(handler.backlogQueue) > handler.appLimiterBacklogThreshold
if congestedARL && handler.appLimiter.shouldDrop(unverifiedTxGroup, rawmsg.Sender.(network.IPAddressable).RoutingAddr()) {
transactionMessagesAppLimiterDrop.Inc(nil)
return network.OutgoingMessage{Action: network.Ignore}
}
}

select {
Expand Down
12 changes: 6 additions & 6 deletions data/txHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2522,9 +2522,7 @@ func TestTxHandlerAppRateLimiterERLEnabled(t *testing.T) {
cfg.TxBacklogAppTxRateLimiterMaxSize = 100
cfg.TxBacklogServiceRateWindowSeconds = 1
cfg.TxBacklogAppTxPerSecondRate = 3
cfg.TxBacklogReservedCapacityPerPeer = 2
cfg.TxBacklogSize = 1
cfg.IncomingConnectionsLimit = 1
cfg.TxBacklogSize = 3
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, bookkeeping.GenesisBalances{}, genesisID, genesisHash, nil, cfg)
require.NoError(t, err)
defer ledger.Close()
Expand Down Expand Up @@ -2585,15 +2583,17 @@ func TestTxHandlerAppRateLimiterERLEnabled(t *testing.T) {
sender := mockSender{}

// submit and ensure it is accepted
congested := float64(cap(handler.backlogQueue))*0.5 < float64(len(handler.backlogQueue))
pct := float64(cfg.TxBacklogRateLimitingCongestionPct) / 100
limit := int(float64(cfg.TxBacklogSize) * pct)
congested := len(handler.backlogQueue) > limit
require.False(t, congested)

action := handler.processIncomingTxn(network.IncomingMessage{Data: blob, Sender: sender})
require.Equal(t, network.OutgoingMessage{Action: network.Ignore}, action)
require.Equal(t, 1, len(handler.backlogQueue))

// repeat the same txn, we are still not congested
congested = float64(cap(handler.backlogQueue))*0.5 < float64(len(handler.backlogQueue))
congested = len(handler.backlogQueue) > limit
require.False(t, congested)

signedTx = tx.Sign(keypair())
Expand All @@ -2603,7 +2603,7 @@ func TestTxHandlerAppRateLimiterERLEnabled(t *testing.T) {
require.Equal(t, 2, len(handler.backlogQueue))
require.Equal(t, 0, handler.appLimiter.len()) // no rate limiting yet

congested = float64(cap(handler.backlogQueue))*0.5 < float64(len(handler.backlogQueue))
congested = len(handler.backlogQueue) > limit
require.True(t, congested)

// submit it again and the app rate limiter should kick in
Expand Down

0 comments on commit 55cbb7f

Please sign in to comment.