diff --git a/scheduler/metrics/metrics.go b/scheduler/metrics/metrics.go
index 602f63de6a9..f4e050a3e0d 100644
--- a/scheduler/metrics/metrics.go
+++ b/scheduler/metrics/metrics.go
@@ -238,6 +238,20 @@ var (
 		Help: "Counter of the number of failed of the leaving host.",
 	})
 
+	AnnouncePeersCount = promauto.NewCounter(prometheus.CounterOpts{
+		Namespace: types.MetricsNamespace,
+		Subsystem: types.SchedulerMetricsName,
+		Name: "announce_peers_total",
+		Help: "Counter of the number of the announcing peers.",
+	})
+
+	AnnouncePeersFailureCount = promauto.NewCounter(prometheus.CounterOpts{
+		Namespace: types.MetricsNamespace,
+		Subsystem: types.SchedulerMetricsName,
+		Name: "announce_peers_failure_total",
+		Help: "Counter of the number of failed announcing peers.",
+	})
+
 	SyncProbesCount = promauto.NewCounter(prometheus.CounterOpts{
 		Namespace: types.MetricsNamespace,
 		Subsystem: types.SchedulerMetricsName,
diff --git a/scheduler/rpcserver/scheduler_server_v2.go b/scheduler/rpcserver/scheduler_server_v2.go
index 5ab10babce3..c8940675495 100644
--- a/scheduler/rpcserver/scheduler_server_v2.go
+++ b/scheduler/rpcserver/scheduler_server_v2.go
@@ -164,9 +164,16 @@ func (s *schedulerServerV2) SyncProbes(stream schedulerv2.Scheduler_SyncProbesSe
 	return nil
 }
 
-// TODO Implement the following methods.
 // AnnouncePeers announces peers to scheduler.
 func (s *schedulerServerV2) AnnouncePeers(stream schedulerv2.Scheduler_AnnouncePeersServer) error {
+	// Collect AnnouncePeersCount metrics.
+	metrics.AnnouncePeersCount.Inc()
+	if err := s.service.AnnouncePeers(stream); err != nil {
+		// Collect AnnouncePeersFailureCount metrics.
+		metrics.AnnouncePeersFailureCount.Inc()
+		return err
+	}
+
 	return nil
 }
diff --git a/scheduler/service/service_v2.go b/scheduler/service/service_v2.go
index e415466ea81..f913144a33f 100644
--- a/scheduler/service/service_v2.go
+++ b/scheduler/service/service_v2.go
@@ -862,6 +862,36 @@ func (v *V2) SyncProbes(stream schedulerv2.Scheduler_SyncProbesServer) error {
 	}
 }
 
+// AnnouncePeers announces peers to scheduler at startup.
+func (v *V2) AnnouncePeers(stream schedulerv2.Scheduler_AnnouncePeersServer) error {
+	ctx, cancel := context.WithCancel(stream.Context())
+	defer cancel()
+
+	for {
+		select {
+		case <-ctx.Done():
+			logger.Info("context was done")
+			return ctx.Err()
+		default:
+		}
+
+		request, err := stream.Recv()
+		if err != nil {
+			if err == io.EOF {
+				return nil
+			}
+
+			logger.Errorf("receive error: %s", err.Error())
+			return err
+		}
+
+		if err := v.handleAnnouncePeersRequest(ctx, request); err != nil {
+			logger.Error(err)
+			return err
+		}
+	}
+}
+
 // handleRegisterPeerRequest handles RegisterPeerRequest of AnnouncePeerRequest.
 func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.Scheduler_AnnouncePeerServer, hostID, taskID, peerID string, req *schedulerv2.RegisterPeerRequest) error {
 	// Handle resource included host, task, and peer.
@@ -1440,3 +1470,105 @@ func (v *V2) downloadTaskBySeedPeer(ctx context.Context, taskID string, download
 
 	return nil
 }
+
+// handleAnnouncePeersRequest handles AnnouncePeersRequest.
+func (v *V2) handleAnnouncePeersRequest(ctx context.Context, request *schedulerv2.AnnouncePeersRequest) error {
+	for _, p := range request.Peers {
+		hostID := p.GetHost().GetId()
+		peerTask := p.GetTask()
+		taskID := peerTask.GetId()
+		peerID := p.GetId()
+		download := &commonv2.Download{
+			PieceLength: peerTask.GetPieceLength(),
+			Digest: peerTask.Digest,
+			Url: peerTask.GetUrl(),
+			Tag: peerTask.Tag,
+			Application: peerTask.Application,
+			Type: peerTask.GetType(),
+			FilteredQueryParams: peerTask.GetFilteredQueryParams(),
+			RequestHeader: peerTask.GetRequestHeader(),
+			Priority: p.GetPriority(),
+			Range: p.GetRange(),
+		}
+
+		_, task, peer, err := v.handleResource(ctx, nil, hostID, taskID, peerID, download)
+		if err != nil {
+			return err
+		}
+
+		// If the task dag size exceeds the limit,
+		// then set the peer state to PeerStateLeave.
+		if task.PeerCount() > resource.PeerCountLimitForTask || task.FSM.Is(resource.TaskStateLeave) {
+			peer.Log.Info("task dag size exceeds the limit, causing the peer to leave")
+			if err := peer.FSM.Event(context.Background(), resource.PeerEventLeave); err != nil {
+				peer.Log.Errorf("peer fsm event failed: %s", err.Error())
+				return status.Error(codes.Internal, err.Error())
+			}
+
+			v.resource.PeerManager().Delete(peer.ID)
+			peer.Log.Info("peer has been reclaimed")
+			continue
+		}
+
+		// If the task has not succeeded yet, trigger the download event
+		// and rebuild its pieces from the announced peer.
+		if !task.FSM.Is(resource.TaskStateSucceeded) {
+			if task.FSM.Can(resource.TaskEventDownload) {
+				if err := task.FSM.Event(ctx, resource.TaskEventDownload); err != nil {
+					msg := fmt.Sprintf("task fsm event failed: %s", err.Error())
+					peer.Log.Error(msg)
+					return status.Error(codes.Internal, err.Error())
+				}
+			}
+
+			// Construct pieces from the announced peer.
+			for _, pieceInfo := range p.Pieces {
+				piece := &resource.Piece{
+					Number: int32(pieceInfo.GetNumber()),
+					ParentID: pieceInfo.GetParentId(),
+					Offset: pieceInfo.GetOffset(),
+					Length: pieceInfo.GetLength(),
+					TrafficType: commonv2.TrafficType_LOCAL_PEER,
+					Cost: 0,
+					CreatedAt: time.Now(),
+				}
+
+				if len(pieceInfo.GetDigest()) > 0 {
+					d, err := digest.Parse(pieceInfo.GetDigest())
+					if err != nil {
+						return status.Error(codes.InvalidArgument, err.Error())
+					}
+
+					piece.Digest = d
+				}
+
+				// Update peer.UpdatedAt to prevent the peer from being GC.
+				peer.StorePiece(piece)
+				peer.FinishedPieces.Set(uint(piece.Number))
+				peer.AppendPieceCost(piece.Cost)
+				peer.PieceUpdatedAt.Store(time.Now())
+				peer.UpdatedAt.Store(time.Now())
+
+				// Handle the task.
+				task.StorePiece(piece)
+				task.UpdatedAt.Store(time.Now())
+			}
+
+			// If the peer announced the complete task (no range), update the task
+			// metadata from the announced pieces and mark the task as succeeded.
+			if peer.Range == nil && !peer.Task.FSM.Is(resource.TaskStateSucceeded) {
+				var contentLength int64
+				for _, pieceInfo := range p.Pieces {
+					contentLength += int64(pieceInfo.GetLength())
+				}
+
+				peer.Task.ContentLength.Store(contentLength)
+				peer.Task.TotalPieceCount.Store(int32(len(p.Pieces)))
+				if err := peer.Task.FSM.Event(ctx, resource.TaskEventDownloadSucceeded); err != nil {
+					return status.Error(codes.Internal, err.Error())
+				}
+			}
+		}
+
+		// If the peer state is not PeerStateSucceeded,
+		// advance the peer state to PeerStateSucceeded.
+		if !peer.FSM.Is(resource.PeerStateSucceeded) {
+			peer.FSM.SetState(resource.PeerStateSucceeded)
+		}
+	}
+
+	return nil
+}
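
For reviewers, here is a minimal client-side sketch of how a peer could stream AnnouncePeersRequest messages to the new AnnouncePeers RPC. It is illustrative only and not part of this change: the scheduler address, the import paths, and the sparsely populated commonv2.Peer are assumptions, and a real client (e.g. dfdaemon) would fill in the host, task, piece, priority, and range fields from its local cache.

```go
package main

import (
	"context"
	"log"

	commonv2 "d7y.io/api/v2/pkg/apis/common/v2"
	schedulerv2 "d7y.io/api/v2/pkg/apis/scheduler/v2"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	// Assumed scheduler address; adjust to the real deployment.
	conn, err := grpc.Dial("127.0.0.1:8002", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("dial scheduler: %v", err)
	}
	defer conn.Close()

	// Open the AnnouncePeers stream on the generated scheduler client.
	client := schedulerv2.NewSchedulerClient(conn)
	stream, err := client.AnnouncePeers(context.Background())
	if err != nil {
		log.Fatalf("open AnnouncePeers stream: %v", err)
	}

	// Announce one locally cached peer; the scheduler rebuilds host, task,
	// peer, and piece metadata from this message (see handleAnnouncePeersRequest).
	if err := stream.Send(&schedulerv2.AnnouncePeersRequest{
		Peers: []*commonv2.Peer{
			{
				Id: "example-peer-id", // hypothetical ID for illustration
			},
		},
	}); err != nil {
		log.Fatalf("send AnnouncePeersRequest: %v", err)
	}

	// Close the send side; the server returns nil when it reads io.EOF.
	if err := stream.CloseSend(); err != nil {
		log.Fatalf("close send: %v", err)
	}
}
```

Note on the metrics: AnnouncePeersCount and AnnouncePeersFailureCount are incremented once per AnnouncePeers stream rather than once per announced peer, so announce_peers_total and announce_peers_failure_total track announce streams and failed streams respectively.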