From 8ccc0cf0ef85707a8f05ba7009f84f3ccd28a1fb Mon Sep 17 00:00:00 2001 From: BruceAko Date: Mon, 12 Aug 2024 19:59:26 +0800 Subject: [PATCH] fix: check if stream is nil in handleResource() Signed-off-by: BruceAko --- scheduler/service/service_v2.go | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/scheduler/service/service_v2.go b/scheduler/service/service_v2.go index 575909337ce..93158aded72 100644 --- a/scheduler/service/service_v2.go +++ b/scheduler/service/service_v2.go @@ -885,7 +885,7 @@ func (v *V2) AnnouncePeers(stream schedulerv2.Scheduler_AnnouncePeersServer) err return err } - if err := v.handleAnnouncePeersRequest(ctx, request); err != nil { + if _, err := v.handleAnnouncePeersRequest(ctx, request); err != nil { logger.Error(err) return err } @@ -1393,7 +1393,10 @@ func (v *V2) handleResource(ctx context.Context, stream schedulerv2.Scheduler_An // Store new peer or load peer. peer, loaded := v.resource.PeerManager().Load(peerID) if !loaded { - options := []resource.PeerOption{resource.WithPriority(download.GetPriority()), resource.WithAnnouncePeerStream(stream)} + options := []resource.PeerOption{resource.WithPriority(download.GetPriority())} + if stream != nil { + options = append(options, resource.WithAnnouncePeerStream(stream)) + } if download.GetRange() != nil { options = append(options, resource.WithRange(http.Range{Start: int64(download.Range.GetStart()), Length: int64(download.Range.GetLength())})) } @@ -1481,7 +1484,9 @@ func (v *V2) downloadTaskBySeedPeer(ctx context.Context, taskID string, download } // handleAnnouncePeersRequest handles AnnouncePeersRequest. -func (v *V2) handleAnnouncePeersRequest(ctx context.Context, request *schedulerv2.AnnouncePeersRequest) error { +func (v *V2) handleAnnouncePeersRequest(ctx context.Context, request *schedulerv2.AnnouncePeersRequest) ([]*resource.Peer, error) { + var peers []*resource.Peer + for _, p := range request.Peers { hostID := p.GetHost().GetId() peerTask := p.GetTask() @@ -1502,7 +1507,7 @@ func (v *V2) handleAnnouncePeersRequest(ctx context.Context, request *schedulerv _, task, peer, err := v.handleResource(ctx, nil, hostID, taskID, peerID, download) if err != nil { - return err + return nil, err } // If the task dag size exceeds the limit, @@ -1511,7 +1516,7 @@ func (v *V2) handleAnnouncePeersRequest(ctx context.Context, request *schedulerv peer.Log.Info("task dag size exceeds the limit, causing the peer to leave") if err := peer.FSM.Event(context.Background(), resource.PeerEventLeave); err != nil { peer.Log.Errorf("peer fsm event failed: %s", err.Error()) - return status.Error(codes.Internal, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } v.resource.PeerManager().Delete(peer.ID) @@ -1526,7 +1531,7 @@ func (v *V2) handleAnnouncePeersRequest(ctx context.Context, request *schedulerv if err := task.FSM.Event(ctx, resource.TaskEventDownload); err != nil { msg := fmt.Sprintf("task fsm event failed: %s", err.Error()) peer.Log.Error(msg) - return status.Error(codes.Internal, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } } @@ -1545,7 +1550,7 @@ func (v *V2) handleAnnouncePeersRequest(ctx context.Context, request *schedulerv if len(pieceInfo.GetDigest()) > 0 { d, err := digest.Parse(pieceInfo.GetDigest()) if err != nil { - return status.Errorf(codes.InvalidArgument, err.Error()) + return nil, status.Errorf(codes.InvalidArgument, err.Error()) } piece.Digest = d @@ -1567,7 +1572,7 @@ func (v *V2) handleAnnouncePeersRequest(ctx context.Context, request *schedulerv peer.Task.ContentLength.Store(int64(len(p.Pieces))) peer.Task.TotalPieceCount.Store(int32(task.ContentLength.Load())) if err := peer.Task.FSM.Event(ctx, resource.TaskEventDownloadSucceeded); err != nil { - return status.Error(codes.Internal, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } } } @@ -1577,7 +1582,9 @@ func (v *V2) handleAnnouncePeersRequest(ctx context.Context, request *schedulerv if !peer.FSM.Is(resource.PeerStateSucceeded) { peer.FSM.SetState(resource.PeerStateSucceeded) } + + peers = append(peers, peer) } - return nil + return peers, nil }