Skip to content

Commit

Permalink
feat: peer injects existing task metadata to scheduler at startup
Browse files Browse the repository at this point in the history
Signed-off-by: BruceAko <chongzhi@hust.edu.cn>
  • Loading branch information
BruceAko committed Aug 17, 2024
1 parent d68e707 commit ac81da7
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 1 deletion.
14 changes: 14 additions & 0 deletions scheduler/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,20 @@ var (
Help: "Counter of the number of failed of the leaving host.",
})

// AnnouncePeersCount is a Prometheus counter created through promauto and
// exposed as "announce_peers_total" under the scheduler metrics namespace and
// subsystem. schedulerServerV2.AnnouncePeers increments it once for every
// AnnouncePeers call it receives, before the request is handled.
AnnouncePeersCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "announce_peers_total",
Help: "Counter of the number of the announcing peers.",
})

// AnnouncePeersFailureCount is a Prometheus counter created through promauto
// and exposed as "announce_peers_failure_total" under the scheduler metrics
// namespace and subsystem. schedulerServerV2.AnnouncePeers increments it when
// the underlying service call returns a non-nil error.
AnnouncePeersFailureCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "announce_peers_failure_total",
Help: "Counter of the number of failed of the announcing peers.",
})

SyncProbesCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Expand Down
9 changes: 8 additions & 1 deletion scheduler/rpcserver/scheduler_server_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,16 @@ func (s *schedulerServerV2) SyncProbes(stream schedulerv2.Scheduler_SyncProbesSe
return nil
}

// TODO Implement the following methods.
// AnnouncePeers is the gRPC entry point for the AnnouncePeers stream. It
// records the call in the AnnouncePeersCount metric, delegates the stream to
// the service layer, and records a failure in AnnouncePeersFailureCount when
// the service reports an error. The service error is returned unchanged.
func (s *schedulerServerV2) AnnouncePeers(stream schedulerv2.Scheduler_AnnouncePeersServer) error {
	// Every invocation counts, whether or not it succeeds.
	metrics.AnnouncePeersCount.Inc()

	err := s.service.AnnouncePeers(stream)
	if err == nil {
		return nil
	}

	// Only failed invocations reach this point.
	metrics.AnnouncePeersFailureCount.Inc()
	return err
}

Expand Down
132 changes: 132 additions & 0 deletions scheduler/service/service_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -862,6 +862,36 @@ func (v *V2) SyncProbes(stream schedulerv2.Scheduler_SyncProbesServer) error {
}
}

// AnnouncePeers consumes the AnnouncePeers stream that a peer opens at startup
// and applies every received request through handleAnnouncePeersRequest.
//
// The loop ends with nil when the client closes the stream (io.EOF), with the
// context error when the stream context is done, and with the underlying
// error when receiving or handling a request fails.
func (v *V2) AnnouncePeers(stream schedulerv2.Scheduler_AnnouncePeersServer) error {
	ctx, cancel := context.WithCancel(stream.Context())
	defer cancel()

	for {
		// Stop before blocking on the next message if the context is already done.
		if ctxErr := ctx.Err(); ctxErr != nil {
			logger.Info("context was done")
			return ctxErr
		}

		req, recvErr := stream.Recv()
		switch {
		case recvErr == io.EOF:
			// The client finished sending; this is the normal way out.
			return nil
		case recvErr != nil:
			logger.Errorf("receive error: %s", recvErr.Error())
			return recvErr
		}

		if handleErr := v.handleAnnouncePeersRequest(ctx, req); handleErr != nil {
			logger.Error(handleErr)
			return handleErr
		}
	}
}

// handleRegisterPeerRequest handles RegisterPeerRequest of AnnouncePeerRequest.
func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.Scheduler_AnnouncePeerServer, hostID, taskID, peerID string, req *schedulerv2.RegisterPeerRequest) error {
// Handle resource included host, task, and peer.
Expand Down Expand Up @@ -1440,3 +1470,105 @@ func (v *V2) downloadTaskBySeedPeer(ctx context.Context, taskID string, download

return nil
}

// handleAnnouncePeersRequest handles AnnouncePeersRequest.
//
// For every peer carried by the request it rebuilds the download description
// from the announced task, resolves the task and peer through handleResource,
// and then:
//   - reclaims the peer (Leave event + removal from the peer manager) when the
//     task already holds more peers than PeerCountLimitForTask or is leaving;
//   - otherwise, when the task has not succeeded yet, replays the announced
//     pieces into both the peer and the task and, for non-range peers, marks
//     the task as succeeded;
//   - finally forces the peer into PeerStateSucceeded.
//
// The first error aborts the whole request; peers after the failing one are
// not processed. Errors raised here are gRPC status errors (InvalidArgument
// for malformed input, Internal for FSM failures); errors from handleResource
// are returned unchanged.
func (v *V2) handleAnnouncePeersRequest(ctx context.Context, request *schedulerv2.AnnouncePeersRequest) error {
	for _, p := range request.GetPeers() {
		hostID := p.GetHost().GetId()
		peerID := p.GetId()

		// The optional task fields below are read directly from the struct, so a
		// missing task must be rejected instead of dereferenced.
		peerTask := p.GetTask()
		if peerTask == nil {
			return status.Errorf(codes.InvalidArgument, "peer %s is announced without task", peerID)
		}

		taskID := peerTask.GetId()
		download := &commonv2.Download{
			PieceLength:         peerTask.GetPieceLength(),
			Digest:              peerTask.Digest,
			Url:                 peerTask.GetUrl(),
			Tag:                 peerTask.Tag,
			Application:         peerTask.Application,
			Type:                peerTask.GetType(),
			FilteredQueryParams: peerTask.GetFilteredQueryParams(),
			RequestHeader:       peerTask.GetRequestHeader(),
			Priority:            p.GetPriority(),
			Range:               p.GetRange(),
		}

		// No AnnouncePeer stream exists on this path, hence the nil stream.
		_, task, peer, err := v.handleResource(ctx, nil, hostID, taskID, peerID, download)
		if err != nil {
			return err
		}

		// If the task dag size exceeds the limit,
		// then set the peer state to PeerStateLeave.
		//
		// NOTE(review): FSM.Is is given an event constant (TaskEventLeave) where a
		// state constant would be expected; confirm against the resource package
		// that both share the same value.
		if task.PeerCount() > resource.PeerCountLimitForTask || task.FSM.Is(resource.TaskEventLeave) {
			peer.Log.Info("task dag size exceeds the limit, causing the peer to leave")
			if err := peer.FSM.Event(context.Background(), resource.PeerEventLeave); err != nil {
				peer.Log.Errorf("peer fsm event failed: %s", err.Error())
				return status.Error(codes.Internal, err.Error())
			}

			v.resource.PeerManager().Delete(peer.ID)
			peer.Log.Info("peer has been reclaimed")
			continue
		}

		// If the task state is not TaskStateSucceeded,
		// advance the task state to TaskStateSucceeded.
		if !task.FSM.Is(resource.TaskStateSucceeded) {
			if task.FSM.Can(resource.TaskEventDownload) {
				if err := task.FSM.Event(ctx, resource.TaskEventDownload); err != nil {
					peer.Log.Errorf("task fsm event failed: %s", err.Error())
					return status.Error(codes.Internal, err.Error())
				}
			}

			pieces := p.GetPieces()

			// Sum of the announced piece lengths; becomes the task content length
			// when the peer holds the whole task.
			var contentLength int64

			// Construct piece.
			for _, pieceInfo := range pieces {
				now := time.Now()
				piece := &resource.Piece{
					Number:      int32(pieceInfo.GetNumber()),
					ParentID:    pieceInfo.GetParentId(),
					Offset:      pieceInfo.GetOffset(),
					Length:      pieceInfo.GetLength(),
					TrafficType: commonv2.TrafficType_LOCAL_PEER,
					Cost:        0,
					CreatedAt:   now,
				}

				if len(pieceInfo.GetDigest()) > 0 {
					d, err := digest.Parse(pieceInfo.GetDigest())
					if err != nil {
						// status.Error, not Errorf: the message is data, not a format string.
						return status.Error(codes.InvalidArgument, err.Error())
					}

					piece.Digest = d
				}

				// Update peer.UpdatedAt to prevent the peer from being GC.
				peer.StorePiece(piece)
				peer.FinishedPieces.Set(uint(piece.Number))
				peer.AppendPieceCost(piece.Cost)
				peer.PieceUpdatedAt.Store(now)
				peer.UpdatedAt.Store(now)

				// Handle the task.
				task.StorePiece(piece)
				task.UpdatedAt.Store(now)

				contentLength += int64(piece.Length)
			}

			// A peer without a range announces the complete task, so its pieces
			// define the task size and piece count.
			if peer.Range == nil && !peer.Task.FSM.Is(resource.TaskStateSucceeded) {
				peer.Task.ContentLength.Store(contentLength)
				peer.Task.TotalPieceCount.Store(int32(len(pieces)))
				if err := peer.Task.FSM.Event(ctx, resource.TaskEventDownloadSucceeded); err != nil {
					return status.Error(codes.Internal, err.Error())
				}
			}
		}

		// If the peer state is not PeerStateSucceeded,
		// advance the peer state to PeerStateSucceeded.
		if !peer.FSM.Is(resource.PeerStateSucceeded) {
			peer.FSM.SetState(resource.PeerStateSucceeded)
		}
	}

	return nil
}

0 comments on commit ac81da7

Please sign in to comment.