Skip to content

Commit

Permalink
feat: optimize GetTaskJob and DeleteTaskJob (#3434)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <gaius.qi@gmail.com>
  • Loading branch information
gaius-qi authored Aug 16, 2024
1 parent 8516147 commit 517aed7
Show file tree
Hide file tree
Showing 34 changed files with 354 additions and 589 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/compatibility-e2e-v2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,19 @@ jobs:
include:
- module: manager
image: manager
image-tag: v2.1.53
image-tag: v2.1.55
chart-name: manager
- module: scheduler
image: scheduler
image-tag: v2.1.53
image-tag: v2.1.55
chart-name: scheduler
- module: client
image: client
image-tag: v0.1.96
image-tag: v0.1.100
chart-name: client
- module: seed-client
image: client
image-tag: v0.1.96
image-tag: v0.1.100
chart-name: seed-client

steps:
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2
go 1.21

require (
d7y.io/api/v2 v2.0.142
d7y.io/api/v2 v2.0.148
github.com/MysteriousPotato/go-lockable v1.0.0
github.com/RichardKnop/machinery v1.10.8
github.com/Showmax/go-fqdn v1.0.0
Expand Down Expand Up @@ -129,7 +129,7 @@ require (
github.com/distribution/reference v0.5.0 // indirect
github.com/docker/go-metrics v0.0.1 // indirect
github.com/docker/libtrust v0.0.0-20150114040149-fa567046d9b1 // indirect
github.com/envoyproxy/protoc-gen-validate v1.0.4 // indirect
github.com/envoyproxy/protoc-gen-validate v1.1.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.4 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0Zeo
cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk=
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
d7y.io/api/v2 v2.0.142 h1:u1gQZwCKJODdJB883J9e5SRK9VmOTmApX13FX/bR0Vk=
d7y.io/api/v2 v2.0.142/go.mod h1:IakrltEphFvcLIQs3NVeb9PAe66MGDnd2/HMs9DKYu8=
d7y.io/api/v2 v2.0.148 h1:11waj+EuaHdx95Fkr3hXJJckNGw9Hu5U0ohtCbpIirw=
d7y.io/api/v2 v2.0.148/go.mod h1:hyEaaIglThVWRHODk2yHN/tpa1L+/nPgdQFwPsL6fNc=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
Expand Down Expand Up @@ -434,8 +434,8 @@ github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5y
github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU4zdyUgIUNhlgg0A=
github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew=
github.com/envoyproxy/protoc-gen-validate v1.1.0 h1:tntQDh69XqOCOZsDz0lVJQez/2L6Uu2PdjCQwWCJ3bM=
github.com/envoyproxy/protoc-gen-validate v1.1.0/go.mod h1:sXRDRVmzEbkM7CVcM06s9shE/m23dg3wzjl0UWqJ2q4=
github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
Expand Down
6 changes: 3 additions & 3 deletions internal/job/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ const (
// SyncPeersJob is the name of syncing peers job.
SyncPeersJob = "sync_peers"

// ListTasksJob is the name of listing tasks job.
ListTasksJob = "list_tasks"
// GetTaskJob is the name of getting task job.
GetTaskJob = "get_task"

// DeleteTasksJob is the name of deleting tasks job.
// DeleteTaskJob is the name of deleting task job.
DeleteTaskJob = "delete_task"
)

Expand Down
31 changes: 15 additions & 16 deletions internal/job/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package job

import "d7y.io/dragonfly/v2/scheduler/resource"

// PreheatRequest defines the request parameters for preheating.
type PreheatRequest struct {
URL string `json:"url" validate:"required,url"`
Tag string `json:"tag" validate:"omitempty"`
Expand All @@ -26,20 +27,20 @@ type PreheatRequest struct {
Headers map[string]string `json:"headers" validate:"omitempty"`
Application string `json:"application" validate:"omitempty"`
Priority int32 `json:"priority" validate:"omitempty"`
PieceLength uint32 `json:"pieceLength" validate:"omitempty"`
}

// PreheatResponse defines the response parameters for preheating.
type PreheatResponse struct {
TaskID string `json:"taskID"`
TaskID string `json:"task_id"`
}

// ListTasksRequest defines the request parameters for listing tasks.
type ListTasksRequest struct {
// GetTaskRequest defines the request parameters for getting task.
type GetTaskRequest struct {
TaskID string `json:"task_id" validate:"required"`
}

// ListTasksResponse defines the response parameters for listing tasks.
type ListTasksResponse struct {
// GetTaskResponse defines the response parameters for getting task.
type GetTaskResponse struct {
Peers []*resource.Peer `json:"peers"`
}

Expand All @@ -48,16 +49,14 @@ type DeleteTaskRequest struct {
TaskID string `json:"task_id" validate:"required"`
}

// Task includes information about a task along with peer details and a description.
type Task struct {
Task *resource.Task `json:"task"`
Peer *resource.Peer `json:"peer"`
Description string `json:"description"`
// DeleteTaskResponse defines the response parameters for deleting task.
type DeleteTaskResponse struct {
SuccessPeers []*DeletePeerResponse `json:"success_peers"`
FailurePeers []*DeletePeerResponse `json:"failure_peers"`
}

// DeleteTaskResponse represents the response after attempting to delete tasks,
// categorizing them into successfully and unsuccessfully deleted.
type DeleteTaskResponse struct {
SuccessTasks []*Task `json:"success_tasks"`
FailureTasks []*Task `json:"failure_tasks"`
// DeletePeerResponse represents the response after attempting to delete a peer.
type DeletePeerResponse struct {
Peer *resource.Peer `json:"peer"`
Description string `json:"description"`
}
11 changes: 0 additions & 11 deletions manager/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,9 +296,6 @@ type JobConfig struct {

// Sync peers configuration.
SyncPeers SyncPeersConfig `yaml:"syncPeers" mapstructure:"syncPeers"`

// Manager tasks configuration.
ManagerTasks ManagerTasksConfig `yaml:"managerTasks" mapstructure:"managerTasks"`
}

type PreheatConfig struct {
Expand All @@ -318,11 +315,6 @@ type SyncPeersConfig struct {
Timeout time.Duration `yaml:"timeout" mapstructure:"timeout"`
}

type ManagerTasksConfig struct {
// Timeout is the timeout for manager tasks information for the single scheduler.
Timeout time.Duration `yaml:"timeout" mapstructure:"timeout"`
}

type PreheatTLSClientConfig struct {
// CACert is the CA certificate for preheat tls handshake, it can be path or PEM format string.
CACert types.PEMContent `yaml:"caCert" mapstructure:"caCert"`
Expand Down Expand Up @@ -463,9 +455,6 @@ func New() *Config {
Interval: DefaultJobSyncPeersInterval,
Timeout: DefaultJobSyncPeersTimeout,
},
ManagerTasks: ManagerTasksConfig{
Timeout: DefaultJobManagerTasksTimeout,
},
},
ObjectStorage: ObjectStorageConfig{
Enable: false,
Expand Down
14 changes: 6 additions & 8 deletions manager/config/constant_otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,11 @@ package config
import "go.opentelemetry.io/otel/attribute"

const (
AttributeID = attribute.Key("d7y.manager.id")
AttributePreheatType = attribute.Key("d7y.manager.preheat.type")
AttributePreheatURL = attribute.Key("d7y.manager.preheat.url")
AttributeDeleteTaskID = attribute.Key("d7y.manager.delete_task.id")
AttributeListTasksID = attribute.Key("d7y.manager.list_tasks.id")
AttributeListTasksPage = attribute.Key("d7y.manager.list_tasks.page")
AttributeListTasksPerPage = attribute.Key("d7y.manager.list_tasks.per_page")
AttributeID = attribute.Key("d7y.manager.id")
AttributePreheatType = attribute.Key("d7y.manager.preheat.type")
AttributePreheatURL = attribute.Key("d7y.manager.preheat.url")
AttributeDeleteTaskID = attribute.Key("d7y.manager.delete_task.id")
AttributeGetTaskID = attribute.Key("d7y.manager.get_task.id")
)

const (
Expand All @@ -34,5 +32,5 @@ const (
SpanGetLayers = "get-layers"
SpanAuthWithRegistry = "auth-with-registry"
SpanDeleteTask = "delete-task"
SpanListTasks = "list-tasks"
SpanGetTask = "get-task"
)
3 changes: 0 additions & 3 deletions manager/config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,6 @@ const (

// DefaultJobSyncPeersTimeout is the default timeout for syncing all peers information from the scheduler.
DefaultJobSyncPeersTimeout = 10 * time.Minute

// DefaultJobManagerTasksTimeout is the default timeout for manager tasks, for delete task and list tasks.
DefaultJobManagerTasksTimeout = 10 * time.Minute
)

const (
Expand Down
12 changes: 6 additions & 6 deletions manager/handlers/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,28 +60,28 @@ func (h *Handlers) CreateJob(ctx *gin.Context) {
}

ctx.JSON(http.StatusOK, job)
case job.DeleteTaskJob:
var json types.CreateDeleteTaskJobRequest
case job.GetTaskJob:
var json types.CreateGetTaskJobRequest
if err := ctx.ShouldBindBodyWith(&json, binding.JSON); err != nil {
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
return
}

job, err := h.service.CreateDeleteTaskJob(ctx.Request.Context(), json)
job, err := h.service.CreateGetTaskJob(ctx.Request.Context(), json)
if err != nil {
ctx.Error(err) // nolint: errcheck
return
}

ctx.JSON(http.StatusOK, job)
case job.ListTasksJob:
var json types.CreateListTasksJobRequest
case job.DeleteTaskJob:
var json types.CreateDeleteTaskJobRequest
if err := ctx.ShouldBindBodyWith(&json, binding.JSON); err != nil {
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
return
}

job, err := h.service.CreateListTasksJob(ctx.Request.Context(), json)
job, err := h.service.CreateDeleteTaskJob(ctx.Request.Context(), json)
if err != nil {
ctx.Error(err) // nolint: errcheck
return
Expand Down
22 changes: 11 additions & 11 deletions manager/handlers/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ var (
"user_id": 4,
"bio": "bio"
}`
mockListTasksJobReqBody = `
mockGetTaskJobReqBody = `
{
"type": "list_tasks",
"type": "get_task",
"user_id": 4,
"bio": "bio"
}`
Expand All @@ -62,12 +62,12 @@ var (
Type: "preheat",
BIO: "bio",
}
mockListTasksCreateJobRequest = types.CreateListTasksJobRequest{
mockCreateGetTaskJobRequest = types.CreateGetTaskJobRequest{
UserID: 4,
Type: "list_tasks",
Type: "get_task",
BIO: "bio",
}
mockDeleteTaskCreateJobRequest = types.CreateDeleteTaskJobRequest{
mockCreateDeleteTaskJobRequest = types.CreateDeleteTaskJobRequest{
UserID: 4,
Type: "delete_task",
BIO: "bio",
Expand All @@ -83,10 +83,10 @@ var (
BIO: "bio",
TaskID: "2",
}
mockListTasksJobModel = &models.Job{
mockGetTaskJobModel = &models.Job{
BaseModel: mockBaseModel,
UserID: 4,
Type: "list_tasks",
Type: "get_task",
BIO: "bio",
TaskID: "2",
}
Expand Down Expand Up @@ -153,24 +153,24 @@ func TestHandlers_CreateJob(t *testing.T) {
},
{
name: "success",
req: httptest.NewRequest(http.MethodPost, "/oapi/v1/jobs", strings.NewReader(mockListTasksJobReqBody)),
req: httptest.NewRequest(http.MethodPost, "/oapi/v1/jobs", strings.NewReader(mockGetTaskJobReqBody)),
mock: func(ms *mocks.MockServiceMockRecorder) {
ms.CreateListTasksJob(gomock.Any(), gomock.Eq(mockListTasksCreateJobRequest)).Return(mockListTasksJobModel, nil).Times(1)
ms.CreateGetTaskJob(gomock.Any(), gomock.Eq(mockCreateGetTaskJobRequest)).Return(mockGetTaskJobModel, nil).Times(1)
},
expect: func(t *testing.T, w *httptest.ResponseRecorder) {
assert := assert.New(t)
assert.Equal(http.StatusOK, w.Code)
job := models.Job{}
err := json.Unmarshal(w.Body.Bytes(), &job)
assert.NoError(err)
assert.Equal(mockListTasksJobModel, &job)
assert.Equal(mockGetTaskJobModel, &job)
},
},
{
name: "success",
req: httptest.NewRequest(http.MethodPost, "/oapi/v1/jobs", strings.NewReader(mockDeleteTaskJobReqBody)),
mock: func(ms *mocks.MockServiceMockRecorder) {
ms.CreateDeleteTaskJob(gomock.Any(), gomock.Eq(mockDeleteTaskCreateJobRequest)).Return(mockDeleteTaskJobModel, nil).Times(1)
ms.CreateDeleteTaskJob(gomock.Any(), gomock.Eq(mockCreateDeleteTaskJobRequest)).Return(mockDeleteTaskJobModel, nil).Times(1)
},
expect: func(t *testing.T, w *httptest.ResponseRecorder) {
assert := assert.New(t)
Expand Down
19 changes: 9 additions & 10 deletions manager/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type Job struct {
*internaljob.Job
Preheat
SyncPeers
ManagerTasks
Task
}

// New returns a new Job.
Expand Down Expand Up @@ -75,13 +75,12 @@ func New(cfg *config.Config, gdb *gorm.DB) (*Job, error) {
return nil, err
}

managerTasks := newManagerTasks(j, cfg.Job.ManagerTasks.Timeout)

task := newTask(j)
return &Job{
Job: j,
Preheat: preheat,
SyncPeers: syncPeers,
ManagerTasks: managerTasks,
Job: j,
Preheat: preheat,
SyncPeers: syncPeers,
Task: task,
}, nil
}

Expand All @@ -96,18 +95,18 @@ func (j *Job) Stop() {
}

// getSchedulerQueues gets scheduler queues.
func getSchedulerQueues(schedulers []models.Scheduler) []internaljob.Queue {
func getSchedulerQueues(schedulers []models.Scheduler) ([]internaljob.Queue, error) {
var queues []internaljob.Queue
for _, scheduler := range schedulers {
queue, err := internaljob.GetSchedulerQueue(scheduler.SchedulerClusterID, scheduler.Hostname)
if err != nil {
continue
return nil, err
}

queues = append(queues, queue)
}

return queues
return queues, nil
}

// getSchedulerQueue gets scheduler queue.
Expand Down
Loading

0 comments on commit 517aed7

Please sign in to comment.