fix: add reachability field to status response (#4054)
notanatol authored May 15, 2023
1 parent a740dcb commit 3bca4de
Showing 8 changed files with 104 additions and 46 deletions.
4 changes: 3 additions & 1 deletion openapi/SwarmCommon.yaml
@@ -1,6 +1,6 @@
openapi: 3.0.3
info:
version: 3.2.1
version: 3.2.2
title: Common Data Types
description: |
\*****bzzz*****
@@ -861,6 +861,8 @@ components:
type: boolean
BatchCommitment:
type: integer
isReachable:
type: boolean

StatusResponse:
type: object
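For orientation, a minimal client-side sketch (not part of this commit) of decoding the status response that the updated schema describes. Only the fields visible in this diff are included; the node address, port, and the /status path are assumptions for illustration.

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// Fields taken from the statusSnapshotResponse struct in this commit;
// the full schema has more fields, omitted here.
type statusResponse struct {
	ConnectedPeers   uint64 `json:"connectedPeers"`
	NeighborhoodSize uint64 `json:"neighborhoodSize"`
	BatchCommitment  uint64 `json:"batchCommitment"`
	IsReachable      bool   `json:"isReachable"` // new field added by this commit
}

func main() {
	// Address, port and path are assumed values for the example.
	resp, err := http.Get("http://localhost:1633/status")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var s statusResponse
	if err := json.NewDecoder(resp.Body).Decode(&s); err != nil {
		panic(err)
	}
	fmt.Printf("reachable: %v, connected peers: %d\n", s.IsReachable, s.ConnectedPeers)
}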
5 changes: 4 additions & 1 deletion pkg/api/status.go
@@ -27,6 +27,7 @@ type statusSnapshotResponse struct {
NeighborhoodSize uint64 `json:"neighborhoodSize"`
RequestFailed bool `json:"requestFailed,omitempty"`
BatchCommitment uint64 `json:"batchCommitment"`
IsReachable bool `json:"isReachable"`
}

type statusResponse struct {
@@ -77,6 +78,7 @@ func (s *Service) statusGetHandler(w http.ResponseWriter, _ *http.Request) {
ConnectedPeers: ss.ConnectedPeers,
NeighborhoodSize: ss.NeighborhoodSize,
BatchCommitment: ss.BatchCommitment,
IsReachable: ss.IsReachable,
})
}

@@ -121,6 +123,7 @@ func (s *Service) statusGetPeersHandler(w http.ResponseWriter, r *http.Request)
snapshot.ConnectedPeers = ss.ConnectedPeers
snapshot.NeighborhoodSize = ss.NeighborhoodSize
snapshot.BatchCommitment = ss.BatchCommitment
snapshot.IsReachable = ss.IsReachable
}

mu.Lock()
@@ -133,7 +136,7 @@ func (s *Service) statusGetPeersHandler(w http.ResponseWriter, r *http.Request)

err := s.topologyDriver.EachConnectedPeer(
peerFunc,
topology.Filter{Reachable: true},
topology.Filter{},
)
if err != nil {
logger.Debug("status snapshot", "error", err)
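Note that the iteration filter above changes from topology.Filter{Reachable: true} to topology.Filter{}: the peers snapshot now visits every connected peer, and reachability is surfaced through the new field instead of being used to pre-filter the iteration. A toy sketch of that semantic difference, assuming the Reachable flag restricts iteration to peers currently marked reachable (stand-in types, not the real topology package):

package main

import "fmt"

// Stand-ins for topology.Filter and the peer iterator, for illustration only.
type Filter struct{ Reachable bool }

type peer struct {
	addr      string
	reachable bool
}

func eachConnectedPeer(peers []peer, f Filter, fn func(addr string)) {
	for _, p := range peers {
		if f.Reachable && !p.reachable {
			continue // previous behaviour: unreachable peers were skipped
		}
		fn(p.addr)
	}
}

func main() {
	peers := []peer{{"aa01", true}, {"bb02", false}}

	// Old call site: only reachable peers are visited.
	eachConnectedPeer(peers, Filter{Reachable: true}, func(a string) { fmt.Println("old:", a) })

	// New call site: every connected peer is visited.
	eachConnectedPeer(peers, Filter{}, func(a string) { fmt.Println("new:", a) })
}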
4 changes: 4 additions & 0 deletions pkg/api/status_test.go
@@ -33,6 +33,7 @@ func TestGetStatus(t *testing.T) {
ConnectedPeers: 0,
NeighborhoodSize: 0,
BatchCommitment: 1,
IsReachable: true,
}

ssMock := &statusSnapshotMock{
@@ -99,6 +100,9 @@ func (m *topologyPeersIterNoopMock) EachConnectedPeer(_ topology.EachPeerFunc, _
func (m *topologyPeersIterNoopMock) EachConnectedPeerRev(_ topology.EachPeerFunc, _ topology.Filter) error {
return nil
}
func (m *topologyPeersIterNoopMock) IsReachable() bool {
return true
}

// statusSnapshotMock satisfies the following interfaces:
// - depthmonitor.ReserveReporter
88 changes: 62 additions & 26 deletions pkg/status/internal/pb/status.pb.go

Generated file; diff not rendered.

1 change: 1 addition & 0 deletions pkg/status/internal/pb/status.proto
@@ -22,4 +22,5 @@ message Snapshot {
uint64 NeighborhoodSize = 5;
string BeeMode = 6;
uint64 BatchCommitment = 7;
bool IsReachable = 8;
}
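Because IsReachable uses the fresh tag number 8, the change is wire-compatible: a peer running an older build simply never writes field 8, and a new node decodes its snapshot with the proto3 zero value, false. A stand-in sketch of that default (illustrative struct, not the generated pb code):

package main

import "fmt"

// Stand-in for the generated Snapshot struct in pkg/status/internal/pb.
type Snapshot struct {
	BeeMode         string
	BatchCommitment uint64
	IsReachable     bool // field 8; absent in snapshots from older peers
}

func main() {
	// A snapshot from an older peer never sets IsReachable,
	// so it stays at the zero value.
	old := Snapshot{BeeMode: "full", BatchCommitment: 1024}
	fmt.Println(old.IsReachable) // false
}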
42 changes: 25 additions & 17 deletions pkg/status/status.go
@@ -32,11 +32,16 @@ const (
// Snapshot is the current snapshot of the system.
type Snapshot pb.Snapshot

type topologyDriver interface {
topology.PeerIterator
IsReachable() bool
}

// Service is the status service.
type Service struct {
logger log.Logger
streamer p2p.Streamer
topologyIter topology.PeerIterator
logger log.Logger
streamer p2p.Streamer
topologyDriver topologyDriver

beeMode string
reserve depthmonitor.ReserveReporter
@@ -52,23 +57,23 @@ func (s *Service) LocalSnapshot() (*Snapshot, error) {
connectedPeers uint64
neighborhoodSize uint64
)
err := s.topologyIter.EachConnectedPeer(
err := s.topologyDriver.EachConnectedPeer(
func(_ swarm.Address, po uint8) (bool, bool, error) {
connectedPeers++
if po >= storageRadius {
neighborhoodSize++
}
return false, false, nil
},
topology.Filter{Reachable: true},
topology.Filter{},
)
if err != nil {
return nil, fmt.Errorf("iterate connected peers: %w", err)
}

commitment, err := s.commitment.Commitment()
if err != nil {
return nil, fmt.Errorf("batchstore commitemnt: %w", err)
return nil, fmt.Errorf("batchstore commitment: %w", err)
}

return &Snapshot{
@@ -79,6 +84,7 @@ func (s *Service) LocalSnapshot() (*Snapshot, error) {
ConnectedPeers: connectedPeers,
NeighborhoodSize: neighborhoodSize,
BatchCommitment: commitment,
IsReachable: s.topologyDriver.IsReachable(),
}, nil
}

@@ -105,6 +111,7 @@ func (s *Service) PeerSnapshot(ctx context.Context, peer swarm.Address) (*Snapsh
}
return nil, fmt.Errorf("read message failed: %w", err)
}

return (*Snapshot)(ss), nil
}

@@ -142,15 +149,15 @@ func (s *Service) handler(ctx context.Context, _ p2p.Peer, stream p2p.Stream) er
connectedPeers uint64
neighborhoodSize uint64
)
err := s.topologyIter.EachConnectedPeer(
err := s.topologyDriver.EachConnectedPeer(
func(_ swarm.Address, po uint8) (bool, bool, error) {
connectedPeers++
if po >= storageRadius {
neighborhoodSize++
}
return false, false, nil
},
topology.Filter{Reachable: true},
topology.Filter{},
)
if err != nil {
s.logger.Error(err, "iteration of connected peers failed")
@@ -170,6 +177,7 @@ func (s *Service) handler(ctx context.Context, _ p2p.Peer, stream p2p.Stream) er
ConnectedPeers: connectedPeers,
NeighborhoodSize: neighborhoodSize,
BatchCommitment: commitment,
IsReachable: s.topologyDriver.IsReachable(),
}); err != nil {
loggerV2.Debug("write message failed", "error", err)
return fmt.Errorf("write message: %w", err)
@@ -182,21 +190,21 @@ func (s *Service) handler(ctx context.Context, _ p2p.Peer, stream p2p.Stream) er
func NewService(
logger log.Logger,
streamer p2p.Streamer,
topologyIter topology.PeerIterator,
topology topologyDriver,
beeMode string,
reserve depthmonitor.ReserveReporter,
sync depthmonitor.SyncReporter,
radius postage.RadiusReporter,
commitment postage.CommitmentGetter,
) *Service {
return &Service{
logger: logger.WithName(loggerName).Register(),
streamer: streamer,
topologyIter: topologyIter,
beeMode: beeMode,
reserve: reserve,
sync: sync,
radius: radius,
commitment: commitment,
logger: logger.WithName(loggerName).Register(),
streamer: streamer,
topologyDriver: topology,
beeMode: beeMode,
reserve: reserve,
sync: sync,
radius: radius,
commitment: commitment,
}
}
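With this change, callers of NewService must hand in something that satisfies the new topologyDriver interface rather than a bare topology.PeerIterator. A minimal sketch of such a stub, modelled on the topologyPeersIterNoopMock used in this commit's tests (whether PeerIterator consists of exactly these two methods is inferred from those mocks):

package stub

import "github.com/ethersphere/bee/pkg/topology"

// reachableNoopDriver satisfies the status package's topologyDriver
// requirement: it iterates over no peers and always reports the node
// as reachable.
type reachableNoopDriver struct{}

func (reachableNoopDriver) EachConnectedPeer(_ topology.EachPeerFunc, _ topology.Filter) error {
	return nil
}

func (reachableNoopDriver) EachConnectedPeerRev(_ topology.EachPeerFunc, _ topology.Filter) error {
	return nil
}

func (reachableNoopDriver) IsReachable() bool { return true }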
5 changes: 5 additions & 0 deletions pkg/status/status_test.go
@@ -29,6 +29,7 @@ func TestStatus(t *testing.T) {
PullsyncRate: 64,
StorageRadius: 8,
BatchCommitment: 1024,
IsReachable: true,
}

sssMock := &statusSnapshotMock{want}
@@ -100,6 +101,10 @@ func (m *topologyPeersIterNoopMock) EachConnectedPeerRev(_ topology.EachPeerFunc
return nil
}

func (m *topologyPeersIterNoopMock) IsReachable() bool {
return true
}

// statusSnapshotMock satisfies the following interfaces:
// - depthmonitor.ReserveReporter
// - depthmonitor.SyncReporter
1 change: 0 additions & 1 deletion pkg/topology/kademlia/kademlia.go
@@ -1369,7 +1369,6 @@ func (k *Kad) UpdateReachability(status p2p.ReachabilityStatus) {
// p2p.ReachabilityStatusUnknown are ignored.
func (k *Kad) UpdatePeerHealth(peer swarm.Address, health bool) {
k.collector.Record(peer, im.PeerHealth(health))
// k.logger.Debug("health of peer updated", "peer_address", peer, "health", health)
}

// SubscribeTopologyChange returns the channel that signals when the connected peers
