Skip to content

Commit

Permalink
feat: support searching task by url for GetTask and DeleteTask (#3607)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <gaius.qi@gmail.com>
  • Loading branch information
gaius-qi authored Oct 28, 2024
1 parent bad6972 commit 5a51a61
Show file tree
Hide file tree
Showing 21 changed files with 170 additions and 170 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/compatibility-e2e-v2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ jobs:
chart-name: scheduler
- module: client
image: client
image-tag: v0.1.113
image-tag: v0.1.115
chart-name: client
- module: seed-client
image: client
image-tag: v0.1.113
image-tag: v0.1.115
chart-name: seed-client

steps:
Expand Down
1 change: 0 additions & 1 deletion internal/job/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
type PreheatRequest struct {
URL string `json:"url" validate:"required,url"`
Tag string `json:"tag" validate:"omitempty"`
Digest string `json:"digest" validate:"omitempty"`
FilteredQueryParams string `json:"filtered_query_params" validate:"omitempty"`
Headers map[string]string `json:"headers" validate:"omitempty"`
Application string `json:"application" validate:"omitempty"`
Expand Down
10 changes: 10 additions & 0 deletions manager/handlers/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ func (h *Handlers) CreateJob(ctx *gin.Context) {
return
}

if json.Args.TaskID == "" && json.Args.URL == "" {
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": "invalid params: task_id or url is required"})
return
}

job, err := h.service.CreateGetTaskJob(ctx.Request.Context(), json)
if err != nil {
ctx.Error(err) // nolint: errcheck
Expand All @@ -84,6 +89,11 @@ func (h *Handlers) CreateJob(ctx *gin.Context) {
return
}

if json.Args.TaskID == "" && json.Args.URL == "" {
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": "invalid params: task_id or url is required"})
return
}

job, err := h.service.CreateDeleteTaskJob(ctx.Request.Context(), json)
if err != nil {
ctx.Error(err) // nolint: errcheck
Expand Down
14 changes: 12 additions & 2 deletions manager/handlers/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,19 @@ var (
{
"type": "get_task",
"user_id": 4,
"bio": "bio"
"bio": "bio",
"args": {
"task_id": "7575d21d69495905a4709bf4e10d0e5cffcf7fd1e76e93171e0ef6e0abcf07a8"
}
}`
mockDeleteTaskJobReqBody = `
{
"type": "delete_task",
"user_id": 4,
"bio": "bio"
"bio": "bio",
"args": {
"task_id": "04a29122b0c4d0affde2d577fb36bb956caa3da10e9130375623c24a5f865a49"
}
}`
mockOtherJobReqBody = `
{
Expand All @@ -66,11 +72,15 @@ var (
UserID: 4,
Type: "get_task",
BIO: "bio",
Args: types.GetTaskArgs{TaskID: "7575d21d69495905a4709bf4e10d0e5cffcf7fd1e76e93171e0ef6e0abcf07a8"},
}
mockCreateDeleteTaskJobRequest = types.CreateDeleteTaskJobRequest{
UserID: 4,
Type: "delete_task",
BIO: "bio",
Args: types.DeleteTaskArgs{
TaskID: "04a29122b0c4d0affde2d577fb36bb956caa3da10e9130375623c24a5f865a49",
},
}
mockUpdateJobRequest = types.UpdateJobRequest{
UserID: 4,
Expand Down
20 changes: 18 additions & 2 deletions manager/job/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"d7y.io/dragonfly/v2/manager/config"
"d7y.io/dragonfly/v2/manager/models"
"d7y.io/dragonfly/v2/manager/types"
"d7y.io/dragonfly/v2/pkg/idgen"
)

// Task is an interface for manager tasks.
Expand Down Expand Up @@ -60,7 +61,14 @@ func (t *task) CreateGetTask(ctx context.Context, schedulers []models.Scheduler,
span.SetAttributes(config.AttributeGetTaskID.String(json.TaskID))
defer span.End()

args, err := internaljob.MarshalRequest(json)
taskID := json.TaskID
if json.URL != "" {
taskID = idgen.TaskIDV2(json.URL, json.Tag, json.Application, idgen.ParseFilteredQueryParams(json.FilteredQueryParams))
}

args, err := internaljob.MarshalRequest(internaljob.GetTaskRequest{
TaskID: taskID,
})
if err != nil {
logger.Errorf("get tasks marshal request: %v, error: %v", args, err)
return nil, err
Expand Down Expand Up @@ -111,7 +119,15 @@ func (t *task) CreateDeleteTask(ctx context.Context, schedulers []models.Schedul
span.SetAttributes(config.AttributeDeleteTaskID.String(json.TaskID))
defer span.End()

args, err := internaljob.MarshalRequest(json)
taskID := json.TaskID
if json.URL != "" {
taskID = idgen.TaskIDV2(json.URL, json.Tag, json.Application, idgen.ParseFilteredQueryParams(json.FilteredQueryParams))
}

args, err := internaljob.MarshalRequest(internaljob.DeleteTaskRequest{
TaskID: taskID,
Timeout: json.Timeout,
})
if err != nil {
logger.Errorf("delete task marshal request: %v, error: %v", args, err)
return nil, err
Expand Down
29 changes: 27 additions & 2 deletions manager/types/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,20 @@ type CreateGetTaskJobRequest struct {
}

type GetTaskArgs struct {
TaskID string `json:"task_id" binding:"required"`
// TaskID is the task id for getting.
TaskID string `json:"task_id" binding:"omitempty"`

// URL is the download url of the task.
URL string `json:"url" binding:"omitempty"`

// Tag is the tag of the task.
Tag string `json:"tag" binding:"omitempty"`

// Application is the application of the task.
Application string `json:"application" binding:"omitempty"`

// FilteredQueryParams is the filtered query params of the task.
FilteredQueryParams string `json:"filtered_query_params" binding:"omitempty"`
}

type CreateDeleteTaskJobRequest struct {
Expand All @@ -125,7 +138,19 @@ type CreateDeleteTaskJobRequest struct {

type DeleteTaskArgs struct {
// TaskID is the task id for deleting.
TaskID string `json:"task_id" binding:"required"`
TaskID string `json:"task_id" binding:"omitempty"`

// URL is the download url of the task.
URL string `json:"url" binding:"omitempty"`

// Tag is the tag of the task.
Tag string `json:"tag" binding:"omitempty"`

// Application is the application of the task.
Application string `json:"application" binding:"omitempty"`

// FilteredQueryParams is the filtered query params of the task.
FilteredQueryParams string `json:"filtered_query_params" binding:"omitempty"`

// Timeout is the timeout for deleting, default is 30 minutes.
Timeout time.Duration `json:"timeout" binding:"omitempty"`
Expand Down
10 changes: 5 additions & 5 deletions pkg/idgen/task_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func taskIDV1(url string, meta *commonv1.UrlMeta, ignoreRange bool) string {
return pkgdigest.SHA256FromStrings(url)
}

filteredQueryParams := parseFilteredQueryParams(meta.Filter)
filteredQueryParams := ParseFilteredQueryParams(meta.Filter)

var (
u string
Expand Down Expand Up @@ -81,8 +81,8 @@ func taskIDV1(url string, meta *commonv1.UrlMeta, ignoreRange bool) string {
return pkgdigest.SHA256FromStrings(data...)
}

// parseFilteredQueryParams parses filtered query params.
func parseFilteredQueryParams(rawFilteredQueryParams string) []string {
// ParseFilteredQueryParams parses filtered query params.
func ParseFilteredQueryParams(rawFilteredQueryParams string) []string {
if pkgstrings.IsBlank(rawFilteredQueryParams) {
return nil
}
Expand All @@ -91,11 +91,11 @@ func parseFilteredQueryParams(rawFilteredQueryParams string) []string {
}

// TaskIDV2 generates v2 version of task id.
func TaskIDV2(url, digest, tag, application string, filteredQueryParams []string) string {
func TaskIDV2(url, tag, application string, filteredQueryParams []string) string {
url, err := neturl.FilterQueryParams(url, filteredQueryParams)
if err != nil {
url = ""
}

return pkgdigest.SHA256FromStrings(url, digest, tag, application)
return pkgdigest.SHA256FromStrings(url, tag, application)
}
15 changes: 2 additions & 13 deletions pkg/idgen/task_id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ func TestTaskIDV2(t *testing.T) {
tests := []struct {
name string
url string
digest string
tag string
application string
filters []string
Expand All @@ -119,22 +118,12 @@ func TestTaskIDV2(t *testing.T) {
{
name: "generate taskID",
url: "https://example.com",
digest: "sha256:c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4",
tag: "foo",
application: "bar",
filters: []string{},
expect: func(t *testing.T, d any) {
assert := assert.New(t)
assert.Equal(d, "c8659b8372599cf22c7a2de260dd6e148fca6d4e1c2940703022867f739d071d")
},
},
{
name: "generate taskID with digest",
url: "https://example.com",
digest: "sha256:c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4",
expect: func(t *testing.T, d any) {
assert := assert.New(t)
assert.Equal(d, "60469c583429af631a45540f05e08805b31ca4f84e7974cad35cfc84c197bcf8")
assert.Equal(d, "160fa7f001d9d2e893130894fbb60a5fb006e1d61bff82955f2946582bc9de1d")
},
},
{
Expand Down Expand Up @@ -168,7 +157,7 @@ func TestTaskIDV2(t *testing.T) {

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
tc.expect(t, TaskIDV2(tc.url, tc.digest, tc.tag, tc.application, tc.filters))
tc.expect(t, TaskIDV2(tc.url, tc.tag, tc.application, tc.filters))
})
}
}
5 changes: 1 addition & 4 deletions scheduler/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (j *job) preheat(ctx context.Context, data string) (string, error) {
return "", err
}

taskID := idgen.TaskIDV2(req.URL, req.Digest, req.Tag, req.Application, strings.Split(req.FilteredQueryParams, idgen.FilteredQueryParamsSeparator))
taskID := idgen.TaskIDV2(req.URL, req.Tag, req.Application, strings.Split(req.FilteredQueryParams, idgen.FilteredQueryParamsSeparator))
log := logger.WithTask(taskID, req.URL)
log.Infof("preheat %s request: %#v", req.URL, req)

Expand Down Expand Up @@ -279,7 +279,6 @@ func (j *job) preheatAllPeers(ctx context.Context, taskID string, req *internalj
taskID,
&dfdaemonv2.DownloadTaskRequest{Download: &commonv2.Download{
Url: req.URL,
Digest: &req.Digest,
Type: commonv2.TaskType_STANDARD,
Tag: &req.Tag,
Application: &req.Application,
Expand Down Expand Up @@ -376,7 +375,6 @@ func (j *job) preheatAllPeers(ctx context.Context, taskID string, req *internalj
// preheatV1 preheats job by v1 grpc protocol.
func (j *job) preheatV1(ctx context.Context, taskID string, req *internaljob.PreheatRequest, log *logger.SugaredLoggerOnWith) (*internaljob.PreheatResponse, error) {
urlMeta := &commonv1.UrlMeta{
Digest: req.Digest,
Tag: req.Tag,
Filter: req.FilteredQueryParams,
Header: req.Headers,
Expand Down Expand Up @@ -424,7 +422,6 @@ func (j *job) preheatV2(ctx context.Context, taskID string, req *internaljob.Pre
stream, err := j.resource.SeedPeer().Client().DownloadTask(ctx, taskID, &dfdaemonv2.DownloadTaskRequest{
Download: &commonv2.Download{
Url: req.URL,
Digest: &req.Digest,
Type: commonv2.TaskType_STANDARD,
Tag: &req.Tag,
Application: &req.Application,
Expand Down
2 changes: 1 addition & 1 deletion scheduler/resource/standard/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ var (

mockTaskBackToSourceLimit int32 = 200
mockTaskURL = "http://example.com/foo"
mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskDigest.String(), mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams)
mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams)
mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4")
mockTaskTag = "d7y"
mockTaskApplication = "foo"
Expand Down
2 changes: 1 addition & 1 deletion scheduler/scheduling/evaluator/evaluator_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ var (

mockTaskBackToSourceLimit int32 = 200
mockTaskURL = "http://example.com/foo"
mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskDigest.String(), mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams)
mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams)
mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4")
mockTaskTag = "d7y"
mockTaskApplication = "foo"
Expand Down
2 changes: 1 addition & 1 deletion scheduler/scheduling/scheduling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ var (

mockTaskBackToSourceLimit int32 = 200
mockTaskURL = "http://example.com/foo"
mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskDigest.String(), mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams)
mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams)
mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4")
mockTaskTag = "d7y"
mockTaskApplication = "foo"
Expand Down
2 changes: 1 addition & 1 deletion scheduler/service/service_v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ var (

mockTaskBackToSourceLimit int32 = 200
mockTaskURL = "http://example.com/foo"
mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskDigest.String(), mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams)
mockTaskID = idgen.TaskIDV2(mockTaskURL, mockTaskTag, mockTaskApplication, mockTaskFilteredQueryParams)
mockTaskDigest = digest.New(digest.AlgorithmSHA256, "c71d239df91726fc519c6eb72d318ec65820627232b2f796219e87dcf35d0ab4")
mockTaskTag = "d7y"
mockTaskApplication = "foo"
Expand Down
8 changes: 4 additions & 4 deletions test/e2e/v2/concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ var _ = Describe("Download Concurrency", func() {
Expect(err).NotTo(HaveOccurred())

fileMetadata := util.FileMetadata{
ID: "de7d72a4f865bb1b1d3a9b7288bfd369a500277f5565736b2ba67aa205958df7",
ID: "14b31801ea6990788057b965fbc51e44bf73800462915fdfa0fda8182acca4d6",
Sha256: "fc44bbbba20490450c73530db3d1b935f893f38d7d8084ca132952a765ff5ff6",
}

Expand Down Expand Up @@ -67,7 +67,7 @@ var _ = Describe("Download Concurrency", func() {
Expect(err).NotTo(HaveOccurred())

fileMetadata := util.FileMetadata{
ID: "510f018dc34c7e6ced07db2e88654a4e565e7982d5c73994e48e901f633c8113",
ID: "958e177b56be708c9d7ec193ae8cef399b39faff8234af33efa4cbe097d1fc5f",
Sha256: "dc102987a36be20846821ac74648534863ff0fe8897d4250273a6ffc80481d91",
}

Expand Down Expand Up @@ -97,7 +97,7 @@ var _ = Describe("Download Concurrency", func() {
Expect(err).NotTo(HaveOccurred())

fileMetadata := util.FileMetadata{
ID: "381ee3f1dd0b55d151997e107e5517e4ac315677a4ed67c3cd814fe7b86481d1",
ID: "dd573cf9c3e1a79402b8423abcd1ba987c1b1ee9c49069d139d71106a260b055",
Sha256: "54e54b7ff54ef70d4db2adcd24a27e3b9af3cd99fc0213983bac1e8035429be6",
}

Expand Down Expand Up @@ -127,7 +127,7 @@ var _ = Describe("Download Concurrency", func() {
Expect(err).NotTo(HaveOccurred())

fileMetadata := util.FileMetadata{
ID: "0068ce9e9beaca3ec33911d537be56de2d12e1b201bf3230aefe803919c373a5",
ID: "f1957adc26ec326800ced850d72e583a03be0999ba80d9aa2e3ba57ef4ddaf17",
Sha256: "87c09b7c338f258809ca2d436bbe06ac94a3166b3f3e1125a86f35d9a9aa1d2f",
}

Expand Down
Loading

0 comments on commit 5a51a61

Please sign in to comment.