Skip to content

Commit

Permalink
feat: peer injects existing task metadata to scheduler at startup
Browse files Browse the repository at this point in the history
Signed-off-by: BruceAko <chongzhi@hust.edu.cn>
  • Loading branch information
BruceAko committed Aug 29, 2024
1 parent 2c0ae78 commit 1e78607
Show file tree
Hide file tree
Showing 4 changed files with 429 additions and 8 deletions.
14 changes: 14 additions & 0 deletions scheduler/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,20 @@ var (
Help: "Counter of the number of failed of the leaving host.",
})

AnnouncePeersCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "announce_peers_total",
Help: "Counter of the number of the announcing peers.",
})

AnnouncePeersFailureCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "announce_peers_failure_total",
Help: "Counter of the number of failed of the announcing peers.",
})

SyncProbesCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Expand Down
9 changes: 8 additions & 1 deletion scheduler/rpcserver/scheduler_server_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,16 @@ func (s *schedulerServerV2) SyncProbes(stream schedulerv2.Scheduler_SyncProbesSe
return nil
}

// TODO Implement the following methods.
// AnnouncePeers announces peers to scheduler.
func (s *schedulerServerV2) AnnouncePeers(stream schedulerv2.Scheduler_AnnouncePeersServer) error {
// Collect AnnouncePeersCount metrics.
metrics.AnnouncePeersCount.Inc()
if err := s.service.AnnouncePeers(stream); err != nil {
// Collect AnnouncePeersFailureCount metrics.
metrics.AnnouncePeersFailureCount.Inc()
return err
}

return nil
}

Expand Down
151 changes: 145 additions & 6 deletions scheduler/service/service_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (v *V2) AnnouncePeer(stream schedulerv2.Scheduler_AnnouncePeerServer) error
for {
select {
case <-ctx.Done():
logger.Info("context was done")
logger.Info("announce peer context was done")
return ctx.Err()
default:
}
Expand Down Expand Up @@ -141,29 +141,29 @@ func (v *V2) AnnouncePeer(stream schedulerv2.Scheduler_AnnouncePeerServer) error
case *schedulerv2.AnnouncePeerRequest_DownloadPeerFinishedRequest:
downloadPeerFinishedRequest := announcePeerRequest.DownloadPeerFinishedRequest
log.Infof("receive DownloadPeerFinishedRequest, content length: %d, piece count: %d", downloadPeerFinishedRequest.GetContentLength(), downloadPeerFinishedRequest.GetPieceCount())
// Notice: Handler uses context.Background() to avoid stream cancel by dfdameon.
// Notice: Handler uses context.Background() to avoid stream cancel by dfdaemon.
if err := v.handleDownloadPeerFinishedRequest(context.Background(), req.GetPeerId()); err != nil {
log.Error(err)
return err
}
case *schedulerv2.AnnouncePeerRequest_DownloadPeerBackToSourceFinishedRequest:
downloadPeerBackToSourceFinishedRequest := announcePeerRequest.DownloadPeerBackToSourceFinishedRequest
log.Infof("receive DownloadPeerBackToSourceFinishedRequest, content length: %d, piece count: %d", downloadPeerBackToSourceFinishedRequest.GetContentLength(), downloadPeerBackToSourceFinishedRequest.GetPieceCount())
// Notice: Handler uses context.Background() to avoid stream cancel by dfdameon.
// Notice: Handler uses context.Background() to avoid stream cancel by dfdaemon.
if err := v.handleDownloadPeerBackToSourceFinishedRequest(context.Background(), req.GetPeerId(), downloadPeerBackToSourceFinishedRequest); err != nil {
log.Error(err)
return err
}
case *schedulerv2.AnnouncePeerRequest_DownloadPeerFailedRequest:
log.Infof("receive DownloadPeerFailedRequest, description: %s", announcePeerRequest.DownloadPeerFailedRequest.GetDescription())
// Notice: Handler uses context.Background() to avoid stream cancel by dfdameon.
// Notice: Handler uses context.Background() to avoid stream cancel by dfdaemon.
if err := v.handleDownloadPeerFailedRequest(context.Background(), req.GetPeerId()); err != nil {
log.Error(err)
return err
}
case *schedulerv2.AnnouncePeerRequest_DownloadPeerBackToSourceFailedRequest:
log.Infof("receive DownloadPeerBackToSourceFailedRequest, description: %s", announcePeerRequest.DownloadPeerBackToSourceFailedRequest.GetDescription())
// Notice: Handler uses context.Background() to avoid stream cancel by dfdameon.
// Notice: Handler uses context.Background() to avoid stream cancel by dfdaemon.
if err := v.handleDownloadPeerBackToSourceFailedRequest(context.Background(), req.GetPeerId()); err != nil {
log.Error(err)
return err
Expand Down Expand Up @@ -862,6 +862,41 @@ func (v *V2) SyncProbes(stream schedulerv2.Scheduler_SyncProbesServer) error {
}
}

// AnnouncePeers announces peers to scheduler at startup.
func (v *V2) AnnouncePeers(stream schedulerv2.Scheduler_AnnouncePeersServer) error {
ctx, cancel := context.WithCancel(stream.Context())
defer cancel()

announcePeersCount := 0

for {
select {
case <-ctx.Done():
logger.Info("announce peers context was done")
return ctx.Err()
default:
}

request, err := stream.Recv()
if err != nil {
if err == io.EOF {
logger.Infof("announce %d peers", announcePeersCount)
return nil
}

logger.Errorf("receive error: %s", err.Error())
return err
}

peers, err := v.handleAnnouncePeersRequest(ctx, request)
if err != nil {
logger.Error(err)
return err
}
announcePeersCount += len(peers)
}
}

// handleRegisterPeerRequest handles RegisterPeerRequest of AnnouncePeerRequest.
func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.Scheduler_AnnouncePeerServer, hostID, taskID, peerID string, req *schedulerv2.RegisterPeerRequest) error {
// Handle resource included host, task, and peer.
Expand Down Expand Up @@ -1363,7 +1398,10 @@ func (v *V2) handleResource(ctx context.Context, stream schedulerv2.Scheduler_An
// Store new peer or load peer.
peer, loaded := v.resource.PeerManager().Load(peerID)
if !loaded {
options := []resource.PeerOption{resource.WithPriority(download.GetPriority()), resource.WithAnnouncePeerStream(stream)}
options := []resource.PeerOption{resource.WithPriority(download.GetPriority())}
if stream != nil {
options = append(options, resource.WithAnnouncePeerStream(stream))
}
if download.GetRange() != nil {
options = append(options, resource.WithRange(http.Range{Start: int64(download.Range.GetStart()), Length: int64(download.Range.GetLength())}))
}
Expand Down Expand Up @@ -1449,3 +1487,104 @@ func (v *V2) downloadTaskBySeedPeer(ctx context.Context, taskID string, download

return nil
}

// handleAnnouncePeersRequest handles AnnouncePeersRequest.
func (v *V2) handleAnnouncePeersRequest(ctx context.Context, request *schedulerv2.AnnouncePeersRequest) (peers []*resource.Peer, err error) {
for _, p := range request.Peers {
hostID := p.GetHost().GetId()
peerTask := p.GetTask()
if peerTask == nil {
return nil, status.Error(codes.InvalidArgument, "request is invalid and doesn't contain a task")
}
taskID := peerTask.GetId()
peerID := p.GetId()
download := &commonv2.Download{
PieceLength: &peerTask.PieceLength,
Digest: peerTask.Digest,
Url: peerTask.GetUrl(),
Tag: peerTask.Tag,
Application: peerTask.Application,
Type: peerTask.GetType(),
FilteredQueryParams: peerTask.GetFilteredQueryParams(),
RequestHeader: peerTask.GetRequestHeader(),
Priority: p.GetPriority(),
Range: p.GetRange(),
}

_, task, peer, err := v.handleResource(ctx, nil, hostID, taskID, peerID, download)
if err != nil {
return nil, err
}

// If the task dag size exceeds the limit,
// then set the peer state to PeerStateLeave.
if task.PeerCount() > resource.PeerCountLimitForTask || task.FSM.Is(resource.TaskEventLeave) {
peer.Log.Info("task dag size exceeds the limit, causing the peer to leave")
if err := peer.FSM.Event(context.Background(), resource.PeerEventLeave); err != nil {
peer.Log.Errorf("peer fsm event failed: %s", err.Error())
return nil, status.Error(codes.Internal, err.Error())
}

v.resource.PeerManager().Delete(peer.ID)
peer.Log.Info("peer has been reclaimed")
continue
}

// If the task state is not TaskStateSucceeded,
// advance the task state to TaskStateSucceeded.
if !task.FSM.Is(resource.TaskStateSucceeded) {
if task.FSM.Can(resource.TaskEventDownload) {
task.FSM.SetState(resource.TaskStateRunning)
}

// Construct piece.
for _, pieceInfo := range p.Pieces {
piece := &resource.Piece{
Number: int32(pieceInfo.GetNumber()),
ParentID: pieceInfo.GetParentId(),
Offset: pieceInfo.GetOffset(),
Length: pieceInfo.GetLength(),
TrafficType: commonv2.TrafficType_LOCAL_PEER,
Cost: 0,
CreatedAt: time.Now(),
}

if len(pieceInfo.GetDigest()) > 0 {
d, err := digest.Parse(pieceInfo.GetDigest())
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, err.Error())
}

piece.Digest = d
}

// Update peer.UpdatedAt to prevent the peer from being GC.
peer.StorePiece(piece)
peer.FinishedPieces.Set(uint(piece.Number))
peer.AppendPieceCost(piece.Cost)
peer.PieceUpdatedAt.Store(time.Now())
peer.UpdatedAt.Store(time.Now())

// Handle the task.
task.StorePiece(piece)
task.UpdatedAt.Store(time.Now())
}

if peer.Range == nil && !peer.Task.FSM.Is(resource.TaskStateSucceeded) {
peer.Task.ContentLength.Store(int64(len(p.Pieces)))
peer.Task.TotalPieceCount.Store(int32(task.ContentLength.Load()))
peer.Task.FSM.SetState(resource.TaskStateSucceeded)
}
}

// If the peer state is not PeerStateSucceeded,
// advance the peer state to PeerStateSucceeded.
if !peer.FSM.Is(resource.PeerStateSucceeded) {
peer.FSM.SetState(resource.PeerStateSucceeded)
}

peers = append(peers, peer)
}

return peers, nil
}
Loading

0 comments on commit 1e78607

Please sign in to comment.