diff --git a/scheduler/metrics/metrics.go b/scheduler/metrics/metrics.go index 08cd9fb432e..6a5379b6fb4 100644 --- a/scheduler/metrics/metrics.go +++ b/scheduler/metrics/metrics.go @@ -238,6 +238,20 @@ var ( Help: "Counter of the number of failed of the leaving host.", }) + AnnouncePeersCount = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: types.MetricsNamespace, + Subsystem: types.SchedulerMetricsName, + Name: "announce_peers_total", + Help: "Counter of the number of the announcing peers.", + }) + + AnnouncePeersFailureCount = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: types.MetricsNamespace, + Subsystem: types.SchedulerMetricsName, + Name: "announce_peers_failure_total", + Help: "Counter of the number of failed of the announcing peers.", + }) + SyncProbesCount = promauto.NewCounter(prometheus.CounterOpts{ Namespace: types.MetricsNamespace, Subsystem: types.SchedulerMetricsName, diff --git a/scheduler/rpcserver/scheduler_server_v2.go b/scheduler/rpcserver/scheduler_server_v2.go index 5ab10babce3..c8940675495 100644 --- a/scheduler/rpcserver/scheduler_server_v2.go +++ b/scheduler/rpcserver/scheduler_server_v2.go @@ -164,9 +164,16 @@ func (s *schedulerServerV2) SyncProbes(stream schedulerv2.Scheduler_SyncProbesSe return nil } -// TODO Implement the following methods. // AnnouncePeers announces peers to scheduler. func (s *schedulerServerV2) AnnouncePeers(stream schedulerv2.Scheduler_AnnouncePeersServer) error { + // Collect AnnouncePeersCount metrics. + metrics.AnnouncePeersCount.Inc() + if err := s.service.AnnouncePeers(stream); err != nil { + // Collect AnnouncePeersFailureCount metrics. + metrics.AnnouncePeersFailureCount.Inc() + return err + } + return nil } diff --git a/scheduler/service/service_v2.go b/scheduler/service/service_v2.go index 39d20edc37d..4dc5464ec01 100644 --- a/scheduler/service/service_v2.go +++ b/scheduler/service/service_v2.go @@ -93,7 +93,7 @@ func (v *V2) AnnouncePeer(stream schedulerv2.Scheduler_AnnouncePeerServer) error for { select { case <-ctx.Done(): - logger.Info("context was done") + logger.Info("announce peer context was done") return ctx.Err() default: } @@ -141,7 +141,7 @@ func (v *V2) AnnouncePeer(stream schedulerv2.Scheduler_AnnouncePeerServer) error case *schedulerv2.AnnouncePeerRequest_DownloadPeerFinishedRequest: downloadPeerFinishedRequest := announcePeerRequest.DownloadPeerFinishedRequest log.Infof("receive DownloadPeerFinishedRequest, content length: %d, piece count: %d", downloadPeerFinishedRequest.GetContentLength(), downloadPeerFinishedRequest.GetPieceCount()) - // Notice: Handler uses context.Background() to avoid stream cancel by dfdameon. + // Notice: Handler uses context.Background() to avoid stream cancel by dfdaemon. if err := v.handleDownloadPeerFinishedRequest(context.Background(), req.GetPeerId()); err != nil { log.Error(err) return err @@ -149,21 +149,21 @@ func (v *V2) AnnouncePeer(stream schedulerv2.Scheduler_AnnouncePeerServer) error case *schedulerv2.AnnouncePeerRequest_DownloadPeerBackToSourceFinishedRequest: downloadPeerBackToSourceFinishedRequest := announcePeerRequest.DownloadPeerBackToSourceFinishedRequest log.Infof("receive DownloadPeerBackToSourceFinishedRequest, content length: %d, piece count: %d", downloadPeerBackToSourceFinishedRequest.GetContentLength(), downloadPeerBackToSourceFinishedRequest.GetPieceCount()) - // Notice: Handler uses context.Background() to avoid stream cancel by dfdameon. + // Notice: Handler uses context.Background() to avoid stream cancel by dfdaemon. 
if err := v.handleDownloadPeerBackToSourceFinishedRequest(context.Background(), req.GetPeerId(), downloadPeerBackToSourceFinishedRequest); err != nil { log.Error(err) return err } case *schedulerv2.AnnouncePeerRequest_DownloadPeerFailedRequest: log.Infof("receive DownloadPeerFailedRequest, description: %s", announcePeerRequest.DownloadPeerFailedRequest.GetDescription()) - // Notice: Handler uses context.Background() to avoid stream cancel by dfdameon. + // Notice: Handler uses context.Background() to avoid stream cancel by dfdaemon. if err := v.handleDownloadPeerFailedRequest(context.Background(), req.GetPeerId()); err != nil { log.Error(err) return err } case *schedulerv2.AnnouncePeerRequest_DownloadPeerBackToSourceFailedRequest: log.Infof("receive DownloadPeerBackToSourceFailedRequest, description: %s", announcePeerRequest.DownloadPeerBackToSourceFailedRequest.GetDescription()) - // Notice: Handler uses context.Background() to avoid stream cancel by dfdameon. + // Notice: Handler uses context.Background() to avoid stream cancel by dfdaemon. if err := v.handleDownloadPeerBackToSourceFailedRequest(context.Background(), req.GetPeerId()); err != nil { log.Error(err) return err @@ -862,6 +862,41 @@ func (v *V2) SyncProbes(stream schedulerv2.Scheduler_SyncProbesServer) error { } } +// AnnouncePeers announces peers to scheduler at startup. +func (v *V2) AnnouncePeers(stream schedulerv2.Scheduler_AnnouncePeersServer) error { + ctx, cancel := context.WithCancel(stream.Context()) + defer cancel() + + announcePeersCount := 0 + + for { + select { + case <-ctx.Done(): + logger.Info("announce peers context was done") + return ctx.Err() + default: + } + + request, err := stream.Recv() + if err != nil { + if err == io.EOF { + logger.Infof("announce %d peers", announcePeersCount) + return nil + } + + logger.Errorf("receive error: %s", err.Error()) + return err + } + + peers, err := v.handleAnnouncePeersRequest(ctx, request) + if err != nil { + logger.Error(err) + return err + } + announcePeersCount += len(peers) + } +} + // handleRegisterPeerRequest handles RegisterPeerRequest of AnnouncePeerRequest. func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.Scheduler_AnnouncePeerServer, hostID, taskID, peerID string, req *schedulerv2.RegisterPeerRequest) error { // Handle resource included host, task, and peer. @@ -1363,7 +1398,10 @@ func (v *V2) handleResource(ctx context.Context, stream schedulerv2.Scheduler_An // Store new peer or load peer. peer, loaded := v.resource.PeerManager().Load(peerID) if !loaded { - options := []resource.PeerOption{resource.WithPriority(download.GetPriority()), resource.WithAnnouncePeerStream(stream)} + options := []resource.PeerOption{resource.WithPriority(download.GetPriority())} + if stream != nil { + options = append(options, resource.WithAnnouncePeerStream(stream)) + } if download.GetRange() != nil { options = append(options, resource.WithRange(http.Range{Start: int64(download.Range.GetStart()), Length: int64(download.Range.GetLength())})) } @@ -1449,3 +1487,104 @@ func (v *V2) downloadTaskBySeedPeer(ctx context.Context, taskID string, download return nil } + +// handleAnnouncePeersRequest handles AnnouncePeersRequest. 
+func (v *V2) handleAnnouncePeersRequest(ctx context.Context, request *schedulerv2.AnnouncePeersRequest) (peers []*resource.Peer, err error) { + for _, p := range request.Peers { + hostID := p.GetHost().GetId() + peerTask := p.GetTask() + if peerTask == nil { + return nil, status.Error(codes.InvalidArgument, "request is invalid and doesn't contain a task") + } + taskID := peerTask.GetId() + peerID := p.GetId() + download := &commonv2.Download{ + PieceLength: &peerTask.PieceLength, + Digest: peerTask.Digest, + Url: peerTask.GetUrl(), + Tag: peerTask.Tag, + Application: peerTask.Application, + Type: peerTask.GetType(), + FilteredQueryParams: peerTask.GetFilteredQueryParams(), + RequestHeader: peerTask.GetRequestHeader(), + Priority: p.GetPriority(), + Range: p.GetRange(), + } + + _, task, peer, err := v.handleResource(ctx, nil, hostID, taskID, peerID, download) + if err != nil { + return nil, err + } + + // If the task dag size exceeds the limit, + // then set the peer state to PeerStateLeave. + if task.PeerCount() > resource.PeerCountLimitForTask || task.FSM.Is(resource.TaskEventLeave) { + peer.Log.Info("task dag size exceeds the limit, causing the peer to leave") + if err := peer.FSM.Event(context.Background(), resource.PeerEventLeave); err != nil { + peer.Log.Errorf("peer fsm event failed: %s", err.Error()) + return nil, status.Error(codes.Internal, err.Error()) + } + + v.resource.PeerManager().Delete(peer.ID) + peer.Log.Info("peer has been reclaimed") + continue + } + + // If the task state is not TaskStateSucceeded, + // advance the task state to TaskStateSucceeded. + if !task.FSM.Is(resource.TaskStateSucceeded) { + if task.FSM.Can(resource.TaskEventDownload) { + task.FSM.SetState(resource.TaskStateRunning) + } + + // Construct piece. + for _, pieceInfo := range p.Pieces { + piece := &resource.Piece{ + Number: int32(pieceInfo.GetNumber()), + ParentID: pieceInfo.GetParentId(), + Offset: pieceInfo.GetOffset(), + Length: pieceInfo.GetLength(), + TrafficType: commonv2.TrafficType_LOCAL_PEER, + Cost: 0, + CreatedAt: time.Now(), + } + + if len(pieceInfo.GetDigest()) > 0 { + d, err := digest.Parse(pieceInfo.GetDigest()) + if err != nil { + return nil, status.Errorf(codes.InvalidArgument, err.Error()) + } + + piece.Digest = d + } + + // Update peer.UpdatedAt to prevent the peer from being GC. + peer.StorePiece(piece) + peer.FinishedPieces.Set(uint(piece.Number)) + peer.AppendPieceCost(piece.Cost) + peer.PieceUpdatedAt.Store(time.Now()) + peer.UpdatedAt.Store(time.Now()) + + // Handle the task. + task.StorePiece(piece) + task.UpdatedAt.Store(time.Now()) + } + + if peer.Range == nil && !peer.Task.FSM.Is(resource.TaskStateSucceeded) { + peer.Task.ContentLength.Store(int64(len(p.Pieces))) + peer.Task.TotalPieceCount.Store(int32(task.ContentLength.Load())) + peer.Task.FSM.SetState(resource.TaskStateSucceeded) + } + } + + // If the peer state is not PeerStateSucceeded, + // advance the peer state to PeerStateSucceeded. 
+ if !peer.FSM.Is(resource.PeerStateSucceeded) { + peer.FSM.SetState(resource.PeerStateSucceeded) + } + + peers = append(peers, peer) + } + + return peers, nil +} diff --git a/scheduler/service/service_v2_test.go b/scheduler/service/service_v2_test.go index eeef14cc30a..6574c19c249 100644 --- a/scheduler/service/service_v2_test.go +++ b/scheduler/service/service_v2_test.go @@ -44,6 +44,7 @@ import ( schedulerv2mocks "d7y.io/api/v2/pkg/apis/scheduler/v2/mocks" managertypes "d7y.io/dragonfly/v2/manager/types" + "d7y.io/dragonfly/v2/pkg/idgen" nethttp "d7y.io/dragonfly/v2/pkg/net/http" pkgtypes "d7y.io/dragonfly/v2/pkg/types" "d7y.io/dragonfly/v2/scheduler/config" @@ -3098,7 +3099,7 @@ func TestServiceV2_handleResource(t *testing.T) { assert.Equal(peer.Priority, download.Priority) assert.Equal(peer.Range.Start, int64(download.Range.Start)) assert.Equal(peer.Range.Length, int64(download.Range.Length)) - assert.NotNil(peer.AnnouncePeerStream) + assert.NotNil(peer.AnnouncePeerStream.Load()) assert.EqualValues(peer.Host, mockHost) assert.EqualValues(peer.Task, mockTask) }, @@ -3407,3 +3408,263 @@ func TestServiceV2_downloadTaskBySeedPeer(t *testing.T) { }) } } + +func TestServiceV2_handleAnnouncePeersRequest(t *testing.T) { + tests := []struct { + name string + request *schedulerv2.AnnouncePeersRequest + run func(t *testing.T, svc *V2, request *schedulerv2.AnnouncePeersRequest, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer, + hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, + mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) + }{ + { + name: "task and host exist in scheduler, peer does not", + request: &schedulerv2.AnnouncePeersRequest{ + Peers: []*commonv2.Peer{ + { + Id: mockPeerID, + Pieces: []*commonv2.Piece{ + { + Number: uint32(mockPiece.Number), + ParentId: &mockPiece.ParentID, + Offset: mockPiece.Offset, + Length: mockPiece.Length, + Digest: mockPiece.Digest.String(), + TrafficType: &mockPiece.TrafficType, + Cost: durationpb.New(mockPiece.Cost), + CreatedAt: timestamppb.New(mockPiece.CreatedAt), + }, + }, + Task: &commonv2.Task{ + Id: mockTaskID, + PieceLength: uint64(mockTaskPieceLength), + ContentLength: uint64(1024), + }, + Host: &commonv2.Host{ + Id: mockHostID, + }, + }, + }, + }, + run: func(t *testing.T, svc *V2, request *schedulerv2.AnnouncePeersRequest, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer, + hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, + mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) { + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(mockHost.ID)).Return(mockHost, true).Times(1), + mr.TaskManager().Return(taskManager).Times(1), + mt.Load(gomock.Eq(mockTask.ID)).Return(mockTask, true).Times(1), + mr.PeerManager().Return(peerManager).Times(1), + mp.Load(gomock.Eq(mockPeer.ID)).Return(nil, false).Times(1), + mr.PeerManager().Return(peerManager).Times(1), + mp.Store(gomock.Any()).Return().Times(1), + ) + + assert := assert.New(t) + peers, err := svc.handleAnnouncePeersRequest(context.Background(), request) + assert.NoError(err) + peer := peers[0] + assert.Equal(peer.ID, mockPeer.ID) + assert.Nil(peer.AnnouncePeerStream.Load()) + 
assert.True(peer.FSM.Is(resource.PeerStateSucceeded)) + assert.True(peer.Task.FSM.Is(resource.PeerStateSucceeded)) + assert.EqualValues(peer.Host, mockHost) + assert.EqualValues(peer.Task, mockTask) + piece, _ := peer.Pieces.Load(mockPiece.Number) + assert.EqualValues(piece.(*resource.Piece).Digest, mockPiece.Digest) + }, + }, + { + name: "invalid request with no task", + request: &schedulerv2.AnnouncePeersRequest{ + Peers: []*commonv2.Peer{ + { + Id: mockPeerID, + Pieces: []*commonv2.Piece{ + { + Number: uint32(mockPiece.Number), + ParentId: &mockPiece.ParentID, + Offset: mockPiece.Offset, + Length: mockPiece.Length, + Digest: mockPiece.Digest.String(), + TrafficType: &mockPiece.TrafficType, + Cost: durationpb.New(mockPiece.Cost), + CreatedAt: timestamppb.New(mockPiece.CreatedAt), + }, + }, + Host: &commonv2.Host{ + Id: mockHostID, + }, + }, + }, + }, + run: func(t *testing.T, svc *V2, request *schedulerv2.AnnouncePeersRequest, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer, + hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, + mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) { + + assert := assert.New(t) + _, err := svc.handleAnnouncePeersRequest(context.Background(), request) + assert.ErrorIs(err, status.Error(codes.InvalidArgument, "request is invalid and doesn't contain a task")) + }, + }, + { + name: "host does not exist in scheduler", + request: &schedulerv2.AnnouncePeersRequest{ + Peers: []*commonv2.Peer{ + { + Id: mockPeerID, + Task: &commonv2.Task{ + Id: mockTaskID, + PieceLength: uint64(mockTaskPieceLength), + ContentLength: uint64(1024), + }, + Host: &commonv2.Host{ + Id: mockHostID, + }, + }, + }, + }, + run: func(t *testing.T, svc *V2, request *schedulerv2.AnnouncePeersRequest, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer, + hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, + mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) { + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(mockHost.ID)).Return(nil, false).Times(1), + ) + + assert := assert.New(t) + _, err := svc.handleAnnouncePeersRequest(context.Background(), request) + assert.ErrorIs(err, status.Errorf(codes.NotFound, "host %s not found", mockHost.ID)) + }, + }, + { + name: "task dag size exceeds PeerCountLimitForTask", + request: &schedulerv2.AnnouncePeersRequest{ + Peers: []*commonv2.Peer{ + { + Id: mockPeerID, + Pieces: []*commonv2.Piece{ + { + Number: uint32(mockPiece.Number), + ParentId: &mockPiece.ParentID, + Offset: mockPiece.Offset, + Length: mockPiece.Length, + Digest: mockPiece.Digest.String(), + TrafficType: &mockPiece.TrafficType, + Cost: durationpb.New(mockPiece.Cost), + CreatedAt: timestamppb.New(mockPiece.CreatedAt), + }, + }, + Task: &commonv2.Task{ + Id: mockTaskID, + PieceLength: uint64(mockTaskPieceLength), + ContentLength: uint64(1024), + }, + Host: &commonv2.Host{ + Id: mockHostID, + }, + }, + }, + }, + run: func(t *testing.T, svc *V2, request *schedulerv2.AnnouncePeersRequest, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer, + hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr 
*resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, + mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) { + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(mockHost.ID)).Return(mockHost, true).Times(1), + mr.TaskManager().Return(taskManager).Times(1), + mt.Load(gomock.Eq(mockTask.ID)).Return(mockTask, true).Times(1), + mr.PeerManager().Return(peerManager).Times(1), + mp.Load(gomock.Eq(mockPeer.ID)).Return(nil, false).Times(1), + mr.PeerManager().Return(peerManager).Times(1), + mp.Store(gomock.Any()).Return().Times(1), + mr.PeerManager().Return(peerManager).Times(1), + mp.Delete(gomock.Eq(mockPeer.ID)).Return().Times(1), + mp.Load(gomock.Eq(mockPeer.ID)).Return(nil, false).Times(1), + ) + for i := 0; i < resource.PeerCountLimitForTask+1; i++ { + peer := resource.NewPeer(idgen.PeerIDV1("127.0.0.1"), mockResourceConfig, mockTask, mockHost) + mockTask.StorePeer(peer) + } + + assert := assert.New(t) + _, err := svc.handleAnnouncePeersRequest(context.Background(), request) + assert.NoError(err) + _, loaded := peerManager.Load(mockPeer.ID) + assert.Equal(loaded, false) + }, + }, + { + name: "construct piece fails due to invalid digest", + request: &schedulerv2.AnnouncePeersRequest{ + Peers: []*commonv2.Peer{ + { + Id: mockPeerID, + Pieces: []*commonv2.Piece{ + { + Number: uint32(mockPiece.Number), + ParentId: &mockPiece.ParentID, + Offset: mockPiece.Offset, + Length: mockPiece.Length, + Digest: mockPiece.Digest.String() + ":", + TrafficType: &mockPiece.TrafficType, + Cost: durationpb.New(mockPiece.Cost), + CreatedAt: timestamppb.New(mockPiece.CreatedAt), + }, + }, + Task: &commonv2.Task{ + Id: mockTaskID, + PieceLength: uint64(mockTaskPieceLength), + ContentLength: uint64(1024), + }, + Host: &commonv2.Host{ + Id: mockHostID, + }, + }, + }, + }, + run: func(t *testing.T, svc *V2, request *schedulerv2.AnnouncePeersRequest, mockHost *resource.Host, mockTask *resource.Task, mockPeer *resource.Peer, + hostManager resource.HostManager, taskManager resource.TaskManager, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, + mt *resource.MockTaskManagerMockRecorder, mp *resource.MockPeerManagerMockRecorder) { + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(mockHost.ID)).Return(mockHost, true).Times(1), + mr.TaskManager().Return(taskManager).Times(1), + mt.Load(gomock.Eq(mockTask.ID)).Return(mockTask, true).Times(1), + mr.PeerManager().Return(peerManager).Times(1), + mp.Load(gomock.Eq(mockPeer.ID)).Return(nil, false).Times(1), + mr.PeerManager().Return(peerManager).Times(1), + mp.Store(gomock.Any()).Return().Times(1), + ) + + assert := assert.New(t) + _, err := svc.handleAnnouncePeersRequest(context.Background(), request) + assert.ErrorIs(err, status.Errorf(codes.InvalidArgument, "invalid digest")) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + scheduling := schedulingmocks.NewMockScheduling(ctl) + res := resource.NewMockResource(ctl) + dynconfig := configmocks.NewMockDynconfigInterface(ctl) + storage := storagemocks.NewMockStorage(ctl) + networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) + hostManager := resource.NewMockHostManager(ctl) + taskManager := resource.NewMockTaskManager(ctl) + peerManager := resource.NewMockPeerManager(ctl) + + mockHost := resource.NewHost( + mockRawHost.ID, mockRawHost.IP, 
mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + mockPeer := resource.NewPeer(mockPeerID, mockResourceConfig, mockTask, mockHost) + svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology) + + tc.run(t, svc, tc.request, mockHost, mockTask, mockPeer, hostManager, taskManager, peerManager, res.EXPECT(), hostManager.EXPECT(), taskManager.EXPECT(), peerManager.EXPECT()) + }) + } +}
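
The new `AnnouncePeers` handler above only covers the scheduler side of the stream. As a minimal client-side sketch (not part of this diff), a dfdaemon-style process could replay its locally cached peers to the scheduler at startup roughly as follows. The scheduler address, the peer/task/host IDs, the insecure transport, and the assumption that the generated `schedulerv2` client follows standard grpc-go streaming codegen (`AnnouncePeers(ctx)`, `Send`, `CloseSend`) are all illustrative placeholders, not taken from this change.

```go
package main

import (
	"context"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	commonv2 "d7y.io/api/v2/pkg/apis/common/v2"
	schedulerv2 "d7y.io/api/v2/pkg/apis/scheduler/v2"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// Placeholder scheduler address and credentials.
	conn, err := grpc.DialContext(ctx, "scheduler.example.com:8002",
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("dial scheduler: %v", err)
	}
	defer conn.Close()

	stream, err := schedulerv2.NewSchedulerClient(conn).AnnouncePeers(ctx)
	if err != nil {
		log.Fatalf("open AnnouncePeers stream: %v", err)
	}

	// Each request carries a batch of locally cached peers; the scheduler rebuilds
	// host/task/peer resources from them in handleAnnouncePeersRequest. Peers sent
	// without a Task are rejected with codes.InvalidArgument.
	req := &schedulerv2.AnnouncePeersRequest{
		Peers: []*commonv2.Peer{
			{
				Id:   "example-peer-id",
				Task: &commonv2.Task{Id: "example-task-id"},
				Host: &commonv2.Host{Id: "example-host-id"},
			},
		},
	}
	if err := stream.Send(req); err != nil {
		log.Fatalf("send AnnouncePeersRequest: %v", err)
	}

	// Closing the send side surfaces io.EOF on the server, which logs the total
	// number of announced peers and returns nil.
	if err := stream.CloseSend(); err != nil {
		log.Fatalf("close stream: %v", err)
	}
}
```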
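
The diff also adds `AnnouncePeersCount` and `AnnouncePeersFailureCount`, incremented in `schedulerServerV2.AnnouncePeers`. A hedged sketch of checking the success counter in a unit test, assuming the standard `prometheus/testutil` helper is available alongside `promauto`:

```go
package metrics_test

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus/testutil"

	"d7y.io/dragonfly/v2/scheduler/metrics"
)

func TestAnnouncePeersCount(t *testing.T) {
	before := testutil.ToFloat64(metrics.AnnouncePeersCount)

	// Simulate the success path of schedulerServerV2.AnnouncePeers.
	metrics.AnnouncePeersCount.Inc()

	if got := testutil.ToFloat64(metrics.AnnouncePeersCount); got != before+1 {
		t.Fatalf("expected counter to increase by 1, got %f -> %f", before, got)
	}
}
```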