Skip to content

Commit

Permalink
fix: check if stream is nil in handleResource()
Browse files Browse the repository at this point in the history
Signed-off-by: BruceAko <chongzhi@hust.edu.cn>
  • Loading branch information
BruceAko committed Aug 14, 2024
1 parent f04f031 commit ffebe4f
Showing 1 changed file with 16 additions and 9 deletions.
25 changes: 16 additions & 9 deletions scheduler/service/service_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -885,7 +885,7 @@ func (v *V2) AnnouncePeers(stream schedulerv2.Scheduler_AnnouncePeersServer) err
return err
}

if err := v.handleAnnouncePeersRequest(ctx, request); err != nil {
if _, err := v.handleAnnouncePeersRequest(ctx, request); err != nil {
logger.Error(err)
return err
}
Expand Down Expand Up @@ -1384,7 +1384,10 @@ func (v *V2) handleResource(ctx context.Context, stream schedulerv2.Scheduler_An
// Store new peer or load peer.
peer, loaded := v.resource.PeerManager().Load(peerID)
if !loaded {
options := []resource.PeerOption{resource.WithPriority(download.GetPriority()), resource.WithAnnouncePeerStream(stream)}
options := []resource.PeerOption{resource.WithPriority(download.GetPriority())}
if stream != nil {
options = append(options, resource.WithAnnouncePeerStream(stream))
}
if download.GetRange() != nil {
options = append(options, resource.WithRange(http.Range{Start: int64(download.Range.GetStart()), Length: int64(download.Range.GetLength())}))
}
Expand Down Expand Up @@ -1472,7 +1475,9 @@ func (v *V2) downloadTaskBySeedPeer(ctx context.Context, taskID string, download
}

// handleAnnouncePeersRequest handles AnnouncePeersRequest.
func (v *V2) handleAnnouncePeersRequest(ctx context.Context, request *schedulerv2.AnnouncePeersRequest) error {
func (v *V2) handleAnnouncePeersRequest(ctx context.Context, request *schedulerv2.AnnouncePeersRequest) ([]*resource.Peer, error) {
var peers []*resource.Peer

for _, p := range request.Peers {
hostID := p.GetHost().GetId()
peerTask := p.GetTask()
Expand All @@ -1493,7 +1498,7 @@ func (v *V2) handleAnnouncePeersRequest(ctx context.Context, request *schedulerv

_, task, peer, err := v.handleResource(ctx, nil, hostID, taskID, peerID, download)
if err != nil {
return err
return nil, err
}

// If the task dag size exceeds the limit,
Expand All @@ -1502,7 +1507,7 @@ func (v *V2) handleAnnouncePeersRequest(ctx context.Context, request *schedulerv
peer.Log.Info("task dag size exceeds the limit, causing the peer to leave")
if err := peer.FSM.Event(context.Background(), resource.PeerEventLeave); err != nil {
peer.Log.Errorf("peer fsm event failed: %s", err.Error())
return status.Error(codes.Internal, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}

v.resource.PeerManager().Delete(peer.ID)
Expand All @@ -1517,7 +1522,7 @@ func (v *V2) handleAnnouncePeersRequest(ctx context.Context, request *schedulerv
if err := task.FSM.Event(ctx, resource.TaskEventDownload); err != nil {
msg := fmt.Sprintf("task fsm event failed: %s", err.Error())
peer.Log.Error(msg)
return status.Error(codes.Internal, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
}

Expand All @@ -1536,7 +1541,7 @@ func (v *V2) handleAnnouncePeersRequest(ctx context.Context, request *schedulerv
if len(pieceInfo.GetDigest()) > 0 {
d, err := digest.Parse(pieceInfo.GetDigest())
if err != nil {
return status.Errorf(codes.InvalidArgument, err.Error())
return nil, status.Errorf(codes.InvalidArgument, err.Error())
}

piece.Digest = d
Expand All @@ -1558,7 +1563,7 @@ func (v *V2) handleAnnouncePeersRequest(ctx context.Context, request *schedulerv
peer.Task.ContentLength.Store(int64(len(p.Pieces)))
peer.Task.TotalPieceCount.Store(int32(task.ContentLength.Load()))
if err := peer.Task.FSM.Event(ctx, resource.TaskEventDownloadSucceeded); err != nil {
return status.Error(codes.Internal, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
}
}
Expand All @@ -1568,7 +1573,9 @@ func (v *V2) handleAnnouncePeersRequest(ctx context.Context, request *schedulerv
if !peer.FSM.Is(resource.PeerStateSucceeded) {
peer.FSM.SetState(resource.PeerStateSucceeded)
}

peers = append(peers, peer)
}

return nil
return peers, nil
}

0 comments on commit ffebe4f

Please sign in to comment.