Skip to content

Commit

Permalink
feat: peer injects existing task metadata to scheduler at startup
Browse files Browse the repository at this point in the history
Signed-off-by: BruceAko <chongzhi@hust.edu.cn>
  • Loading branch information
BruceAko committed Jul 23, 2024
1 parent 29ab308 commit aea7f27
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 1 deletion.
14 changes: 14 additions & 0 deletions scheduler/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,20 @@ var (
Help: "Counter of the number of failed of the leaving host.",
})

// AnnouncePeersCount is the total number of AnnouncePeers calls
// received by the scheduler.
AnnouncePeersCount = promauto.NewCounter(prometheus.CounterOpts{
	Name:      "announce_peers_total",
	Namespace: types.MetricsNamespace,
	Subsystem: types.SchedulerMetricsName,
	Help:      "Counter of the number of the announcing peers.",
})

// AnnouncePeersFailureCount is the total number of AnnouncePeers calls
// that returned an error.
AnnouncePeersFailureCount = promauto.NewCounter(prometheus.CounterOpts{
	Name:      "announce_peers_failure_total",
	Namespace: types.MetricsNamespace,
	Subsystem: types.SchedulerMetricsName,
	Help:      "Counter of the number of failed of the announcing peers.",
})

SyncProbesCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Expand Down
9 changes: 8 additions & 1 deletion scheduler/rpcserver/scheduler_server_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,16 @@ func (s *schedulerServerV2) SyncProbes(stream schedulerv2.Scheduler_SyncProbesSe
return nil
}

// TODO Implement the following methods.
// AnnouncePeers announces peers to scheduler.
func (s *schedulerServerV2) AnnouncePeers(stream schedulerv2.Scheduler_AnnouncePeersServer) error {
	// Collect AnnouncePeersCount metrics.
	metrics.AnnouncePeersCount.Inc()

	err := s.service.AnnouncePeers(stream)
	if err != nil {
		// Collect AnnouncePeersFailureCount metrics.
		metrics.AnnouncePeersFailureCount.Inc()
	}

	return err
}

Expand Down
162 changes: 162 additions & 0 deletions scheduler/service/service_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -858,6 +858,36 @@ func (v *V2) SyncProbes(stream schedulerv2.Scheduler_SyncProbesServer) error {
}
}

// AnnouncePeers announces peers to scheduler at startup.
//
// It drains the client stream, replaying each AnnouncePeersRequest into the
// scheduler's state, and returns nil once the client closes the stream (EOF).
func (v *V2) AnnouncePeers(stream schedulerv2.Scheduler_AnnouncePeersServer) error {
	ctx, cancel := context.WithCancel(stream.Context())
	defer cancel()

	for {
		// Non-blocking check so a canceled stream context stops the loop
		// before the next blocking Recv.
		select {
		case <-ctx.Done():
			logger.Info("context was done")
			return ctx.Err()
		default:
		}

		req, err := stream.Recv()
		switch {
		case err == io.EOF:
			// Client finished announcing; a clean shutdown of the stream.
			return nil
		case err != nil:
			logger.Errorf("receive error: %s", err.Error())
			return err
		}

		if err := v.handleAnnouncePeersRequest(ctx, req); err != nil {
			logger.Error(err)
			return err
		}
	}
}

// handleRegisterPeerRequest handles RegisterPeerRequest of AnnouncePeerRequest.
func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.Scheduler_AnnouncePeerServer, hostID, taskID, peerID string, req *schedulerv2.RegisterPeerRequest) error {
// Handle resource included host, task, and peer.
Expand Down Expand Up @@ -1436,3 +1466,135 @@ func (v *V2) downloadTaskBySeedPeer(ctx context.Context, taskID string, download

return nil
}

// handleAnnouncePeersRequest handles AnnouncePeersRequest.
//
// For each announced peer it restores host, task, peer, and piece metadata
// into the scheduler's resource managers, then advances the task and peer
// FSMs to the succeeded state so the scheduler can immediately use the
// announced peers as parents. Returns a gRPC status error on any failure.
func (v *V2) handleAnnouncePeersRequest(ctx context.Context, request *schedulerv2.AnnouncePeersRequest) error {
	for _, p := range request.Peers {
		// If the host does not exist and the host address cannot be found,
		// it may cause an exception.
		hostID := p.GetHost().GetId()
		host, loaded := v.resource.HostManager().Load(hostID)
		if !loaded {
			return status.Errorf(codes.NotFound, "host %s not found", hostID)
		}

		// Store new task or update task.
		peerTask := p.GetTask()
		task, loaded := v.resource.TaskManager().Load(peerTask.GetId())
		if !loaded {
			options := []resource.TaskOption{resource.WithPieceLength(int32(peerTask.GetPieceLength()))}
			if peerTask.GetDigest() != "" {
				// An invalid digest rejects the whole request; only a valid
				// digest is attached to the new task.
				d, err := digest.Parse(peerTask.GetDigest())
				if err != nil {
					return status.Error(codes.InvalidArgument, err.Error())
				}

				options = append(options, resource.WithDigest(d))
			}

			task = resource.NewTask(peerTask.GetId(), peerTask.GetUrl(), peerTask.GetTag(), peerTask.GetApplication(), peerTask.GetType(),
				peerTask.GetFilteredQueryParams(), peerTask.GetRequestHeader(), int32(v.config.Scheduler.BackToSourceCount), options...)
			v.resource.TaskManager().Store(task)
		} else {
			// Refresh the mutable request fields of an already-known task.
			task.URL = peerTask.GetUrl()
			task.FilteredQueryParams = peerTask.GetFilteredQueryParams()
			task.Header = peerTask.GetRequestHeader()
		}

		// Store new peer or load peer.
		peer, loaded := v.resource.PeerManager().Load(p.GetId())
		if !loaded {
			options := []resource.PeerOption{resource.WithPriority(p.GetPriority())}
			if p.GetRange() != nil {
				options = append(options, resource.WithRange(http.Range{Start: int64(p.Range.GetStart()), Length: int64(p.Range.GetLength())}))
			}

			peer = resource.NewPeer(p.GetId(), &v.config.Resource, task, host, options...)
			v.resource.PeerManager().Store(peer)
		}

		// If the task state is not TaskStateSucceeded,
		// advance the task state to TaskStateSucceeded.
		if !task.FSM.Is(resource.TaskStateSucceeded) {
			if task.FSM.Can(resource.TaskEventDownload) {
				if err := task.FSM.Event(ctx, resource.TaskEventDownload); err != nil {
					peer.Log.Errorf("task fsm event failed: %s", err.Error())
					return status.Error(codes.Internal, err.Error())
				}
			}

			// Construct pieces and accumulate the task's content length from
			// the announced piece metadata.
			var contentLength int64
			for _, pieceInfo := range p.Pieces {
				piece := &resource.Piece{
					Number:      int32(pieceInfo.GetNumber()),
					ParentID:    pieceInfo.GetParentId(),
					Offset:      pieceInfo.GetOffset(),
					Length:      pieceInfo.GetLength(),
					TrafficType: commonv2.TrafficType_LOCAL_PEER,
					Cost:        0,
					CreatedAt:   time.Now(),
				}

				if len(pieceInfo.GetDigest()) > 0 {
					d, err := digest.Parse(pieceInfo.GetDigest())
					if err != nil {
						return status.Error(codes.InvalidArgument, err.Error())
					}

					piece.Digest = d
				}

				// Handle peer with piece finished request. When the piece is downloaded successfully, peer.UpdatedAt needs
				// to be updated to prevent the peer from being GC during the download process.
				peer.StorePiece(piece)
				peer.FinishedPieces.Set(uint(piece.Number))
				peer.AppendPieceCost(piece.Cost)
				peer.PieceUpdatedAt.Store(time.Now())
				peer.UpdatedAt.Store(time.Now())

				// Handle task with piece back-to-source finished request.
				task.StorePiece(piece)
				task.UpdatedAt.Store(time.Now())
				contentLength += int64(piece.Length)
			}

			// Handle task with peer back-to-source finished request, peer can only represent
			// a successful task after downloading the complete task.
			if peer.Range == nil && !peer.Task.FSM.Is(resource.TaskStateSucceeded) {
				// BUGFIX: content length is the sum of the piece lengths and
				// the total piece count is the number of pieces; previously
				// the piece count was stored as the content length.
				peer.Task.ContentLength.Store(contentLength)
				peer.Task.TotalPieceCount.Store(int32(len(p.Pieces)))
				if err := peer.Task.FSM.Event(ctx, resource.TaskEventDownloadSucceeded); err != nil {
					return status.Error(codes.Internal, err.Error())
				}
			}
		}

		// If the peer state is not PeerStateSucceeded,
		// advance the peer state to PeerStateSucceeded.
		if !peer.FSM.Is(resource.PeerStateSucceeded) {
			if peer.FSM.Is(resource.PeerStatePending) {
				if err := peer.FSM.Event(ctx, resource.PeerEventRegisterNormal); err != nil {
					return status.Error(codes.Internal, err.Error())
				}
			}

			// Handle peer with peer started request.
			if !peer.FSM.Is(resource.PeerStateRunning) {
				if err := peer.FSM.Event(ctx, resource.PeerEventDownload); err != nil {
					// Collect DownloadPeerStartedFailureCount metrics.
					metrics.DownloadPeerStartedFailureCount.WithLabelValues(p.GetPriority().String(), peer.Task.Type.String(),
						peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()
					return status.Error(codes.Internal, err.Error())
				}
			}

			// Handle peer with peer finished request.
			if err := peer.FSM.Event(ctx, resource.PeerEventDownloadSucceeded); err != nil {
				return status.Error(codes.Internal, err.Error())
			}
		}
	}

	return nil
}

0 comments on commit aea7f27

Please sign in to comment.