Skip to content

Commit

Permalink
feat(job): finished job api
Browse files Browse the repository at this point in the history
  • Loading branch information
246859 committed Sep 2, 2024
1 parent 4608edf commit 9c6eba3
Show file tree
Hide file tree
Showing 17 changed files with 371 additions and 266 deletions.
1 change: 1 addition & 0 deletions server/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type Router struct {
System system.Router
User user.Router
Dst dst.Router
Job job.Router
}

var Provider = wire.NewSet(
Expand Down
4 changes: 2 additions & 2 deletions server/api/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ import (
"github.com/ginx-contribs/ginx/pkg/resp"
)

func NewJobAPI(jHandler *job.JobHandler) *JobAPI {
func NewJobAPI(jHandler *job.Handler) *JobAPI {
return &JobAPI{jHandler: jHandler}
}

type JobAPI struct {
jHandler *job.JobHandler
jHandler *job.Handler
}

// List
Expand Down
6 changes: 3 additions & 3 deletions server/api/job/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ type Router struct {
}

func NewRouter(root *ginx.RouterGroup, job *JobAPI) Router {
group := root.Group("job")
group := root.Group("/job")
group.GET("/info", job.Info)
group.GET("/list", job.List)
group.GET("/start", job.Start)
group.GET("/stop", job.Stop)
group.POST("/start", job.Start)
group.POST("/stop", job.Stop)

return Router{Job: job}
}
2 changes: 2 additions & 0 deletions server/api/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ definitions:
allOf:
- $ref: '#/definitions/types.Usage'
description: 'verify code usage: 1-register 2-reset password'
maximum: 2
minimum: 1
required:
- usage
type: object
Expand Down
11 changes: 9 additions & 2 deletions server/data/repo/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ type JobRepo struct {
Ent *ent.Client
}

func (j *JobRepo) Clear(ctx context.Context) (int, error) {
return j.Ent.CronJob.Delete().Exec(ctx)
}

// UpsertOne creates a new Job if it is not existing, otherwise it wille update it.
func (j *JobRepo) UpsertOne(ctx context.Context, job *ent.CronJob) error {
first, err := j.Ent.CronJob.Query().Where(cronjob.Name(job.Name)).First(ctx)
Expand All @@ -23,9 +27,8 @@ func (j *JobRepo) UpsertOne(ctx context.Context, job *ent.CronJob) error {
} else if err != nil {
return err
}

_, err = j.Ent.CronJob.UpdateOneID(first.ID).
SetEntryID(job.ID).
SetEntryID(job.EntryID).
SetPrev(job.Prev).
SetNext(job.Next).Save(ctx)
if err != nil {
Expand All @@ -39,6 +42,10 @@ func (j *JobRepo) QueryOne(ctx context.Context, name string) (*ent.CronJob, erro
return j.Ent.CronJob.Query().Where(cronjob.Name(name)).First(ctx)
}

func (j *JobRepo) FindByEntryId(ctx context.Context, ids ...int) ([]*ent.CronJob, error) {
return j.Ent.CronJob.Query().Where(cronjob.EntryIDIn(ids...)).All(ctx)
}

// ListByPage returns a list of jobs
func (j *JobRepo) ListByPage(ctx context.Context, page int, size int, search string) ([]*ent.CronJob, int, error) {
if page <= 0 {
Expand Down
142 changes: 142 additions & 0 deletions server/handler/job/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package job

import (
"context"
"errors"
"github.com/dstgo/lobby/server/data/ent"
"github.com/dstgo/lobby/server/data/repo"
"github.com/dstgo/lobby/server/types"
"github.com/ginx-contribs/ginx/pkg/resp/statuserr"
"github.com/robfig/cron/v3"
)

func NewJobHandler(jobrepo *repo.JobRepo, cronjob *CronJob) *Handler {
return &Handler{jobRepo: jobrepo, cronjob: cronjob}
}

type Handler struct {
jobRepo *repo.JobRepo
cronjob *CronJob
}

// List returns a list of jobs by page
func (h *Handler) List(ctx context.Context, page int, size int, search string) (types.JobPageList, error) {
list, total, err := h.jobRepo.ListByPage(ctx, page, size, search)
if err != nil {
return types.JobPageList{}, statuserr.InternalError(err)
}
var jobs []types.JobInfo
for _, j := range list {
entry := h.cronjob.cron.Entry(cron.EntryID(j.EntryID))
info := types.JobInfo{
Entry: j.EntryID,
Name: j.Name,
Cron: j.Cron,
Prev: j.Prev,
Next: j.Next,
}
// if the entry is already in the future list
if (entry != cron.Entry{}) {
// revise Prev Next timestamp
if entry.Prev.UnixMicro() < 0 {
info.Prev = j.Prev
} else {
info.Prev = entry.Prev.UnixMicro()
}
if entry.Next.UnixMicro() < 0 {
info.Next = j.Next
} else {
info.Next = entry.Next.UnixMicro()
}
}
jobs = append(jobs, info)
}
return types.JobPageList{
Total: total,
List: jobs,
}, err
}

func (h *Handler) Upsert(ctx context.Context, job FutureJob) error {
return h.jobRepo.UpsertOne(ctx, &ent.CronJob{
Name: job.Name(),
Cron: job.Cron(),
EntryID: int(job.ID),
Prev: job.Prev.UnixMicro(),
Next: job.Next.UnixMicro(),
})
}

func (h *Handler) GetOne(ctx context.Context, name string) (types.JobInfo, error) {
one, err := h.jobRepo.QueryOne(ctx, name)
if ent.IsNotFound(err) {
return types.JobInfo{}, types.ErrJobNotFound
} else if err != nil {
return types.JobInfo{}, statuserr.InternalError(err)
}
entry := h.cronjob.cron.Entry(cron.EntryID(one.EntryID))
info := types.JobInfo{
Name: one.Name,
Entry: one.EntryID,
Cron: one.Cron,
Next: one.Next,
Prev: one.Prev,
}
// if the entry is already in the future list
if (entry != cron.Entry{}) {
if entry.Prev.UnixMicro() < 0 {
info.Prev = one.Prev
} else {
info.Prev = entry.Prev.UnixMicro()
}
if entry.Next.UnixMicro() < 0 {
info.Next = one.Next
} else {
info.Next = entry.Next.UnixMicro()
}
}
return info, nil
}

// Stop removes the job from the future scheduled jobs list
func (h *Handler) Stop(ctx context.Context, name string) error {
job, e := h.cronjob.GetJob(name)
if !e {
return errors.New("job not found")
}
// remove the job from the future scheduler
h.cronjob.DelJob(name)
// update information
err := h.jobRepo.UpsertOne(ctx, &ent.CronJob{
Cron: job.Cron(),
// set the entryId = -1 to indicate this job is stopped
EntryID: -1,
Prev: job.Prev.UnixMicro(),
Next: -1,
})
if err != nil {
return err
}
return nil
}

// Start add job into future scheduled jobs list
func (h *Handler) Start(ctx context.Context, name string) error {
err := h.cronjob.ContinueJob(name)
if err != nil {
return err
}
job, _ := h.cronjob.GetJob(name)

// update information
err = h.jobRepo.UpsertOne(ctx, &ent.CronJob{
Cron: job.Cron(),
EntryID: int(job.ID),
Prev: job.Prev.UnixMicro(),
Next: job.Next.UnixMicro(),
})
if err != nil {
return err
}
return nil
}
21 changes: 17 additions & 4 deletions server/jobs/hooks.go → server/handler/job/hooks.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package jobs
package job

import (
"context"
"log/slog"
"time"
)
Expand All @@ -9,9 +10,10 @@ type BeforeHook func(job FutureJob, round int64)

type AfterHook func(job FutureJob, round int64, elapsed time.Duration, err error, attrs ...any)

// LogBefore log job information before job execution
func LogBefore() BeforeHook {
return func(job FutureJob, round int64) {
slog.Info("cron job prepared,",
slog.Info("cron job prepared",
slog.Int64("round", round),
slog.String("name", job.Name()),
slog.Int("entry", int(job.ID)),
Expand All @@ -20,6 +22,7 @@ func LogBefore() BeforeHook {
}
}

// LogAfter log job information before job execution
func LogAfter() AfterHook {
return func(job FutureJob, round int64, elapsed time.Duration, err error, attrs ...any) {
baseAttrs := []any{
Expand All @@ -31,9 +34,19 @@ func LogAfter() AfterHook {
slog.Time("next", job.Next),
}
if err != nil {
slog.Error("cron job failed,", append(baseAttrs, slog.Any("error", err))...)
slog.Error("cron job failed", append(baseAttrs, slog.Any("error", err))...)
} else {
slog.Info("cron job finished,", append(baseAttrs, slog.Group("result", attrs...))...)
slog.Info("cron job finished", append(baseAttrs, slog.Group("result", attrs...))...)
}
}
}

// UpdateBefore update job information in the db
func UpdateBefore(h *Handler) BeforeHook {
return func(job FutureJob, round int64) {
err := h.Upsert(context.Background(), job)
if err != nil {
slog.Error("update job failed", slog.Any("error", err))
}
}
}
Loading

0 comments on commit 9c6eba3

Please sign in to comment.