diff --git a/pkg/pusher/metrics.go b/pkg/pusher/metrics.go index 35337a52ee4..5221978c91b 100644 --- a/pkg/pusher/metrics.go +++ b/pkg/pusher/metrics.go @@ -19,6 +19,8 @@ type metrics struct { ReceiptDepth *prometheus.CounterVec ShallowReceiptDepth *prometheus.CounterVec + + ShallowReceipt prometheus.Counter } func newMetrics() metrics { @@ -82,6 +84,12 @@ func newMetrics() metrics { }, []string{"depth"}, ), + ShallowReceipt: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "shallow_receipt", + Help: "Total shallow receipts.", + }), } } diff --git a/pkg/pusher/pusher.go b/pkg/pusher/pusher.go index 83f89866bdf..3b52a559b36 100644 --- a/pkg/pusher/pusher.go +++ b/pkg/pusher/pusher.go @@ -319,6 +319,7 @@ func (s *Service) checkReceipt(receipt *pushsync.Receipt) error { // if the receipt po is out of depth AND the receipt has not yet hit the maximum retry limit, reject the receipt. if po < d && s.attempts.try(addr) { s.metrics.ShallowReceiptDepth.WithLabelValues(strconv.Itoa(int(po))).Inc() + s.metrics.ShallowReceipt.Inc() return fmt.Errorf("pusher: shallow receipt depth %d, want at least %d", po, d) } loggerV1.Debug("chunk pushed", "chunk_address", addr, "peer_address", peer, "proximity_order", po) diff --git a/pkg/retrieval/retrieval.go b/pkg/retrieval/retrieval.go index 73b49906a82..014a444450c 100644 --- a/pkg/retrieval/retrieval.go +++ b/pkg/retrieval/retrieval.go @@ -160,9 +160,6 @@ func (s *Service) RetrieveChunk(ctx context.Context, chunkAddr, sourcePeerAddr s errorsLeft = maxRetrievedErrors } - ctx, cancel := context.WithCancel(ctx) - defer cancel() - resultC := make(chan retrievalResult, 1) retryC := make(chan struct{}, 1) @@ -268,7 +265,7 @@ func (s *Service) RetrieveChunk(ctx context.Context, chunkAddr, sourcePeerAddr s return v.(swarm.Chunk), nil } -func (s *Service) retrieveChunk(parentCtx context.Context, chunkAddr, peer swarm.Address, result chan retrievalResult, action accounting.Action, isOrigin 
bool) { +func (s *Service) retrieveChunk(ctx context.Context, chunkAddr, peer swarm.Address, result chan retrievalResult, action accounting.Action, isOrigin bool) { var ( startTime = time.Now() @@ -276,20 +273,20 @@ func (s *Service) retrieveChunk(parentCtx context.Context, chunkAddr, peer swarm chunk swarm.Chunk ) + ctx, cancel := context.WithTimeout(ctx, retrieveChunkTimeout) + defer cancel() + defer func() { if err != nil { s.metrics.TotalErrors.Inc() } select { case result <- retrievalResult{err: err, chunk: chunk, peer: peer}: - case <-parentCtx.Done(): + case <-ctx.Done(): return } }() - ctx, cancel := context.WithTimeout(parentCtx, retrieveChunkTimeout) - defer cancel() - defer action.Cleanup() stream, err := s.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, streamName) diff --git a/pkg/salud/salud.go b/pkg/salud/salud.go index e6dde57823b..caffba34151 100644 --- a/pkg/salud/salud.go +++ b/pkg/salud/salud.go @@ -2,8 +2,8 @@ // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. -// Package salud monitors the storage radius and request response duration of peers -// and blocklists peers to maintain network salud (health). +// Package salud monitors the connected peers, calculates certain thresholds, and marks peers as unhealthy that +// fall short of the thresholds to maintain network salud (health). package salud import ( @@ -104,9 +104,9 @@ type peer struct { bin uint8 } -// salud acquires the status snapshot of every peer and computes an avg response duration -// and the most common storage radius and based on these values, it blocklist peers that fall beyond -// some allowed threshold. +// salud acquires the status snapshot of every peer and computes an nth percentile of response duration and connected +// peer count, the most common storage radius, and the batch commitment, and based on these values, marks peers as unhealthy that fall beyond +// the allowed thresholds. 
func (s *service) salud(mode string, minPeersPerbin int) { var ( diff --git a/pkg/status/status.go b/pkg/status/status.go index beab920b987..6d27d4d6523 100644 --- a/pkg/status/status.go +++ b/pkg/status/status.go @@ -6,9 +6,7 @@ package status import ( "context" - "errors" "fmt" - "io" "github.com/ethersphere/bee/pkg/log" "github.com/ethersphere/bee/pkg/p2p" @@ -106,9 +104,6 @@ func (s *Service) PeerSnapshot(ctx context.Context, peer swarm.Address) (*Snapsh ss := new(pb.Snapshot) if err := r.ReadMsgWithContext(ctx, ss); err != nil { - if errors.Is(err, io.EOF) { - return nil, nil - } return nil, fmt.Errorf("read message failed: %w", err) } @@ -139,7 +134,7 @@ func (s *Service) handler(ctx context.Context, _ p2p.Peer, stream p2p.Stream) er }() var msgGet pb.Get - if err := r.ReadMsgWithContext(ctx, &msgGet); err != nil && !errors.Is(err, io.EOF) { + if err := r.ReadMsgWithContext(ctx, &msgGet); err != nil { loggerV2.Debug("read message failed", "error", err) return fmt.Errorf("read message: %w", err) }