From 85c5cc0c2213a3ef304c02608eca61a9f933c390 Mon Sep 17 00:00:00 2001 From: Erik Heggenhougen Date: Mon, 6 Oct 2025 09:08:49 +0200 Subject: [PATCH 1/3] datahub-issue-219: Add reset-metadata operate command --- internal/jobs/operate.go | 16 +++++++++------- pkg/api/job_operate.go | 9 ++++++++- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/internal/jobs/operate.go b/internal/jobs/operate.go index 947a1b2..ac7046c 100644 --- a/internal/jobs/operate.go +++ b/internal/jobs/operate.go @@ -17,13 +17,14 @@ package jobs import ( "context" "fmt" + "os" + "time" + "github.com/mimiro-io/datahub-cli/internal/login" "github.com/mimiro-io/datahub-cli/internal/utils" "github.com/mimiro-io/datahub-cli/pkg/api" "github.com/rotisserie/eris" "golang.org/x/sync/errgroup" - "os" - "time" "github.com/pterm/pterm" "github.com/spf13/cobra" @@ -41,8 +42,8 @@ func CmdOperate() *cobra.Command { cmd := &cobra.Command{ Use: "operate", - Short: "Run, stop, pause, resume and kill jobs with given job id", - Long: `Run, stop, pause, resume and kill jobs with given job id, For example: + Short: "Run, stop, pause, resume, reset-metadata and kill jobs with given job id", + Long: `Run, stop, pause, resume, reset-metadata and kill jobs with given job id, For example: mim jobs operate -i -o stop mim jobs operate --id --operation stop @@ -126,7 +127,7 @@ func CmdOperate() *cobra.Command { cmd.Flags().StringVarP(&jobType, "jobType", "t", "", "Job type for operation run: fullsync or incremental") cmd.Flags().BoolVarP(&wait, "wait", "w", false, "Use together with run operation to wait for the job to finish. When set you can only have 1 job id.") _ = cmd.RegisterFlagCompletionFunc("operation", func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) { - return []string{"run", "stop", "pause", "resume", "kill", "reset"}, cobra.ShellCompDirectiveDefault + return []string{"run", "stop", "pause", "resume", "kill", "reset", "reset-metadata"}, cobra.ShellCompDirectiveDefault }) _ = cmd.RegisterFlagCompletionFunc("jobType", func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) { return []string{"fullsync", "incremental"}, cobra.ShellCompDirectiveDefault @@ -157,9 +158,10 @@ func operate(jm *api.JobManager, operation string, id api.JobId, since string, j _, err = jm.Operate.Run(ctx, id.Id, jobType) } case "test": - _, err = jm.Operate.Test(ctx, id.Id) + case "reset-metadata": + _, err = jm.Operate.ResetMetadata(ctx, id.Id) default: - pterm.Warning.Println("Unsupported operation! Supported is run, pause, resume, reset and kill.") + pterm.Warning.Println("Unsupported operation! Supported is run, pause, resume, reset, kill and reset-metadata.") } return err } diff --git a/pkg/api/job_operate.go b/pkg/api/job_operate.go index eadd767..6112a0b 100644 --- a/pkg/api/job_operate.go +++ b/pkg/api/job_operate.go @@ -16,7 +16,7 @@ type JobOperationResponse struct { JobId string `json:"jobId"` } -// NewJobOperation will return a new JobOperation service. If you are already using the JobManager, then +// NewJobOperation will return a new JobOperation service. If you are already using the Jobe, then // this will have been initiated for you already, and you can call jm.Operate.XX. // Note that all methods take an ignored context, these should be refactored later together with a rework of the // http clients, as they are a bit all over the place. @@ -62,3 +62,10 @@ func (o *JobOperation) Run(_ context.Context, jobId string, jobType string) (Job func (o *JobOperation) Test(_ context.Context, jobId string) (JobOperationResponse, error) { return web.Put[JobOperationResponse](o.server, o.token, fmt.Sprintf("/job/%s/testx", jobId)) } + +// Reset will reset the since tokens on a job, running the job from the start of the dataset +func (o *JobOperation) ResetMetadata(_ context.Context, jobId string) (JobOperationResponse, error) { + endpoint := fmt.Sprintf("/job/%s/reset_meta", jobId) + + return web.Put[JobOperationResponse](o.server, o.token, endpoint) +} From e399e1998f9d4517d3bdf98b0650f53692b003e0 Mon Sep 17 00:00:00 2001 From: Erik Heggenhougen Date: Mon, 6 Oct 2025 09:10:15 +0200 Subject: [PATCH 2/3] datahub-issue-219: Fix typo --- pkg/api/job_operate.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/api/job_operate.go b/pkg/api/job_operate.go index 6112a0b..0cb9da3 100644 --- a/pkg/api/job_operate.go +++ b/pkg/api/job_operate.go @@ -16,7 +16,7 @@ type JobOperationResponse struct { JobId string `json:"jobId"` } -// NewJobOperation will return a new JobOperation service. If you are already using the Jobe, then +// NewJobOperation will return a new JobOperation service. If you are already using the JobManager, then // this will have been initiated for you already, and you can call jm.Operate.XX. // Note that all methods take an ignored context, these should be refactored later together with a rework of the // http clients, as they are a bit all over the place. From 82553dba730d3be52b51cfe1540e3cb9c452a9e3 Mon Sep 17 00:00:00 2001 From: Erik Heggenhougen Date: Mon, 6 Oct 2025 09:15:34 +0200 Subject: [PATCH 3/3] datahub-issue-219: Add back the test case --- internal/jobs/operate.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/jobs/operate.go b/internal/jobs/operate.go index ac7046c..91440b4 100644 --- a/internal/jobs/operate.go +++ b/internal/jobs/operate.go @@ -158,6 +158,7 @@ func operate(jm *api.JobManager, operation string, id api.JobId, since string, j _, err = jm.Operate.Run(ctx, id.Id, jobType) } case "test": + _, err = jm.Operate.Test(ctx, id.Id) case "reset-metadata": _, err = jm.Operate.ResetMetadata(ctx, id.Id) default: