Skip to content

Commit

Permalink
fix(pusher, retrieval, salud): panic and leak fix (#4077)
Browse files Browse the repository at this point in the history
  • Loading branch information
istae committed May 19, 2023
1 parent 3bca4de commit 8e269c8
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 19 deletions.
8 changes: 8 additions & 0 deletions pkg/pusher/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ type metrics struct {

ReceiptDepth *prometheus.CounterVec
ShallowReceiptDepth *prometheus.CounterVec

ShallowReceipt prometheus.Counter
}

func newMetrics() metrics {
Expand Down Expand Up @@ -82,6 +84,12 @@ func newMetrics() metrics {
},
[]string{"depth"},
),
ShallowReceipt: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "shallow_receipt",
Help: "Total shallow receipts.",
}),
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/pusher/pusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ func (s *Service) checkReceipt(receipt *pushsync.Receipt) error {
// if the receipt po is out of depth AND the receipt has not yet hit the maximum retry limit, reject the receipt.
if po < d && s.attempts.try(addr) {
s.metrics.ShallowReceiptDepth.WithLabelValues(strconv.Itoa(int(po))).Inc()
s.metrics.ShallowReceipt.Inc()
return fmt.Errorf("pusher: shallow receipt depth %d, want at least %d", po, d)
}
loggerV1.Debug("chunk pushed", "chunk_address", addr, "peer_address", peer, "proximity_order", po)
Expand Down
13 changes: 5 additions & 8 deletions pkg/retrieval/retrieval.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,6 @@ func (s *Service) RetrieveChunk(ctx context.Context, chunkAddr, sourcePeerAddr s
errorsLeft = maxRetrievedErrors
}

ctx, cancel := context.WithCancel(ctx)
defer cancel()

resultC := make(chan retrievalResult, 1)
retryC := make(chan struct{}, 1)

Expand Down Expand Up @@ -268,28 +265,28 @@ func (s *Service) RetrieveChunk(ctx context.Context, chunkAddr, sourcePeerAddr s
return v.(swarm.Chunk), nil
}

func (s *Service) retrieveChunk(parentCtx context.Context, chunkAddr, peer swarm.Address, result chan retrievalResult, action accounting.Action, isOrigin bool) {
func (s *Service) retrieveChunk(ctx context.Context, chunkAddr, peer swarm.Address, result chan retrievalResult, action accounting.Action, isOrigin bool) {

var (
startTime = time.Now()
err error
chunk swarm.Chunk
)

ctx, cancel := context.WithTimeout(ctx, retrieveChunkTimeout)
defer cancel()

defer func() {
if err != nil {
s.metrics.TotalErrors.Inc()
}
select {
case result <- retrievalResult{err: err, chunk: chunk, peer: peer}:
case <-parentCtx.Done():
case <-ctx.Done():
return
}
}()

ctx, cancel := context.WithTimeout(parentCtx, retrieveChunkTimeout)
defer cancel()

defer action.Cleanup()

stream, err := s.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, streamName)
Expand Down
10 changes: 5 additions & 5 deletions pkg/salud/salud.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package salud monitors the storage radius and request response duration of peers
// and blocklists peers to maintain network salud (health).
// Package salud monitors the connected peers, calculates certain thresholds, and marks peers as unhealthy that
// fall short of the thresholds to maintain network salud (health).
package salud

import (
Expand Down Expand Up @@ -104,9 +104,9 @@ type peer struct {
bin uint8
}

// salud acquires the status snapshot of every peer and computes an avg response duration
// and the most common storage radius and based on these values, it blocklist peers that fall beyond
// some allowed threshold.
// salud acquires the status snapshot of every peer and computes an nth percentile of response duration and connected
// per count, the most common storage radius, and the batch commitment, and based on these values, marks peers as unhealhy that fall beyond
// the allowed thresholds.
func (s *service) salud(mode string, minPeersPerbin int) {

var (
Expand Down
7 changes: 1 addition & 6 deletions pkg/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ package status

import (
"context"
"errors"
"fmt"
"io"

"github.com/ethersphere/bee/pkg/log"
"github.com/ethersphere/bee/pkg/p2p"
Expand Down Expand Up @@ -106,9 +104,6 @@ func (s *Service) PeerSnapshot(ctx context.Context, peer swarm.Address) (*Snapsh

ss := new(pb.Snapshot)
if err := r.ReadMsgWithContext(ctx, ss); err != nil {
if errors.Is(err, io.EOF) {
return nil, nil
}
return nil, fmt.Errorf("read message failed: %w", err)
}

Expand Down Expand Up @@ -139,7 +134,7 @@ func (s *Service) handler(ctx context.Context, _ p2p.Peer, stream p2p.Stream) er
}()

var msgGet pb.Get
if err := r.ReadMsgWithContext(ctx, &msgGet); err != nil && !errors.Is(err, io.EOF) {
if err := r.ReadMsgWithContext(ctx, &msgGet); err != nil {
loggerV2.Debug("read message failed", "error", err)
return fmt.Errorf("read message: %w", err)
}
Expand Down

0 comments on commit 8e269c8

Please sign in to comment.