diff --git a/clients/golang/examples/example_add_job.go b/clients/golang/examples/example_add_job.go index 1d1b4bb..c995c44 100644 --- a/clients/golang/examples/example_add_job.go +++ b/clients/golang/examples/example_add_job.go @@ -1,14 +1,23 @@ package main import ( + "context" "fmt" + "net/http" - "taskforce.sh/bullmq_proxy_client/queue" + "taskforce.sh/bullmq_proxy_client/pkg/queue" ) func main() { - q := queue.NewQueue("ws://localhost:8080/queues/test?token=1234") - + h := make(http.Header) + h.Set("Authorization", "Bearer 1234") + ctx := context.Background() + q, err := queue.NewQueue(ctx, "ws://localhost:8080/queues/test", h) + if err != nil { + fmt.Println("Error Connecting:", err) + return + } + // Sample usage of your library's functionality. jobResponse, err := q.AddJob("testJob", map[string]string{"key": "value"}, nil) if err != nil { diff --git a/clients/golang/go.mod b/clients/golang/go.mod index 2c16cff..b161625 100644 --- a/clients/golang/go.mod +++ b/clients/golang/go.mod @@ -2,4 +2,15 @@ module taskforce.sh/bullmq_proxy_client go 1.21.1 -require github.com/gorilla/websocket v1.5.0 // indirect +require ( + github.com/go-resty/resty/v2 v2.12.0 + github.com/gorilla/websocket v1.5.0 + github.com/stretchr/testify v1.9.0 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + golang.org/x/net v0.22.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/clients/golang/go.sum b/clients/golang/go.sum index e5a03d4..dca9b92 100644 --- a/clients/golang/go.sum +++ b/clients/golang/go.sum @@ -1,2 +1,60 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-resty/resty/v2 v2.12.0 h1:rsVL8P90LFvkUYq/V5BTVe203WfRIU4gvcf+yfzJzGA= +github.com/go-resty/resty/v2 v2.12.0/go.mod h1:o0yGPrkS3lOe1+eFajk6kBW8ScXzwU3hD69/gt2yB/0= github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= +golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= +golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc= +golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= +golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= +golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= +golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/clients/golang/pkg/bull/queue.go b/clients/golang/pkg/bull/queue.go new file mode 100644 index 0000000..0a03df9 --- /dev/null +++ b/clients/golang/pkg/bull/queue.go @@ -0,0 +1,32 @@ +package bull + +import ( + "context" + + "taskforce.sh/bullmq_proxy_client/pkg/client" + "taskforce.sh/bullmq_proxy_client/pkg/client/proxyapi" +) + +type Queue struct { + c *client.Client + name string +} + +func NewQueue(c *client.Client, name string) *Queue { + return &Queue{ + c: c, + name: name, + } +} + +func (q *Queue) AddJobs(ctx context.Context, jobs ...*proxyapi.JobSpec) ([]*proxyapi.Job, error) { + return q.c.AddJobs(ctx, q.name, jobs) +} + +func (q *Queue) GetJob(ctx context.Context, id string) (*proxyapi.Job, error) { + return q.c.GetJob(ctx, q.name, id) +} + +func (q *Queue) GetJobs(ctx context.Context) (*proxyapi.GetJobsResponse, error) { + return q.c.GetJobs(ctx, q.name) +} diff --git a/clients/golang/pkg/bull/worker.go b/clients/golang/pkg/bull/worker.go new file mode 100644 index 0000000..066be5f --- /dev/null +++ b/clients/golang/pkg/bull/worker.go @@ -0,0 +1,45 @@ +package bull + +import ( + "context" + + "taskforce.sh/bullmq_proxy_client/pkg/client" + "taskforce.sh/bullmq_proxy_client/pkg/client/proxyapi" +) + +type Worker struct { + c *client.Client + queue string +} + +func NewWorker(c *client.Client, queue string) *Worker { + return &Worker{ + c: c, + queue: queue, + } +} + +func (w *Worker) Start(ctx context.Context, endpoint proxyapi.WorkerEndpoint, options *proxyapi.WorkerSimpleOptions) (func(ctx context.Context) error, error) { + // attempt to add a worker + err := w.c.AddWorker(ctx, &proxyapi.WorkerMetadata{ + Queue: w.queue, + Endpoint: endpoint, + Options: options, + }) + if err != nil { + return nil, err + } + stopCh := make(chan struct{}) + go func() { + for { + select { + case <-stopCh: + default: + } + } + }() + return func(ctx context.Context) error { + close(stopCh) + return w.c.RemoveWorker(ctx, w.queue) + }, nil +} diff --git a/clients/golang/pkg/client/client.go b/clients/golang/pkg/client/client.go new file mode 100644 index 0000000..e285248 --- /dev/null +++ b/clients/golang/pkg/client/client.go @@ -0,0 +1,161 @@ +package client + +import ( + "context" + "fmt" + "net/url" + "strconv" + "time" + + "github.com/go-resty/resty/v2" + "taskforce.sh/bullmq_proxy_client/pkg/client/proxyapi" + "taskforce.sh/bullmq_proxy_client/pkg/client/wsclient" +) + +// client represents a rooturl +type Client struct { + rootUrl string + authToken string + + // non configurable below / set on construction + httpClient *resty.Client +} + +type ClientOpts func(*Client) + +func NewClient(opts ...ClientOpts) *Client { + c := &Client{ + rootUrl: "http://localhost:8080", + } + for _, v := range opts { + v(c) + } + + c.httpClient = resty.New(). + SetBaseURL(c.rootUrl). + SetTimeout(1*time.Minute). + SetHeader("authorization", "Bearer "+c.authToken) + + return c +} + +func (c *Client) DialQueue(ctx context.Context, queueName string) (*wsclient.WebSocket[*proxyapi.QueueCommand], error) { + joinedPath, err := url.JoinPath(c.rootUrl, "ws", "queues", queueName) + if err != nil { + return nil, err + } + return wsclient.New[*proxyapi.QueueCommand](ctx, joinedPath, c.httpClient.Header) +} + +func (c *Client) DialWorker(ctx context.Context, queueName string, concurrency int) (*wsclient.WebSocket[*proxyapi.WorkerCommand], error) { + joinedPath, err := url.JoinPath(c.rootUrl, "ws", "queues", queueName, "process", strconv.Itoa(concurrency)) + if err != nil { + return nil, err + } + return wsclient.New[*proxyapi.WorkerCommand](ctx, joinedPath, c.httpClient.Header) +} + +func (c *Client) DialQueueEvents(ctx context.Context, queueName string) (*wsclient.WebSocket[any], error) { + joinedPath, err := url.JoinPath(c.rootUrl, "ws", "queues", queueName, "events") + if err != nil { + return nil, err + } + return wsclient.New[any](ctx, joinedPath, c.httpClient.Header) +} + +func (c *Client) AddJobs(ctx context.Context, queueName string, jobs []*proxyapi.JobSpec) (out []*proxyapi.Job, err error) { + _, err = c.httpClient.R(). + SetBody(jobs). + SetResult(out). + ForceContentType("application/json"). + Post(fmt.Sprintf("/queues/%s/jobs", queueName)) + if err != nil { + return nil, err + } + return out, nil +} +func (c *Client) GetJobs(ctx context.Context, queueName string) (out *proxyapi.GetJobsResponse, err error) { + _, err = c.httpClient.R(). + SetResult(out). + Get(fmt.Sprintf("/queues/%s/jobs", queueName)) + if err != nil { + return nil, err + } + return out, nil +} +func (c *Client) GetJob(ctx context.Context, queueName string, jobId string) (out *proxyapi.Job, err error) { + _, err = c.httpClient.R(). + SetResult(out). + Get(fmt.Sprintf("/queues/%s/jobs/%s", queueName, jobId)) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *Client) AddWorker(ctx context.Context, spec *proxyapi.WorkerMetadata) (err error) { + _, err = c.httpClient.R(). + SetBody(spec). + ForceContentType("application/json"). + Post("/workers") + if err != nil { + return err + } + return nil +} +func (c *Client) GetWorkers(ctx context.Context) (out *proxyapi.WorkerMetadata, err error) { + _, err = c.httpClient.R(). + SetResult(out). + Get("/workers") + if err != nil { + return nil, err + } + return out, nil +} +func (c *Client) RemoveWorker(ctx context.Context, queueName string) error { + _, err := c.httpClient.R(). + Delete(fmt.Sprintf("/workers/%s", queueName)) + if err != nil { + return err + } + return nil +} + +func (c *Client) UpdateProgress(ctx context.Context, queueName string, jobId string, progress any) (err error) { + _, err = c.httpClient.R(). + SetBody(progress). + ForceContentType("application/json"). + Post(fmt.Sprintf("/queues/%s/jobs/%s/progress", queueName, jobId)) + if err != nil { + return err + } + return nil +} +func (c *Client) AddLog(ctx context.Context, queueName string, jobId string, log string) (err error) { + _, err = c.httpClient.R(). + SetBody(log). + ForceContentType("application/json"). + Post(fmt.Sprintf("/queues/%s/jobs/%s/logs", queueName, jobId)) + if err != nil { + return err + } + return nil +} +func (c *Client) GetLogs(ctx context.Context, queueName string, jobId string, start int, length int) (out *proxyapi.JobLog, err error) { + _, err = c.httpClient.R(). + SetResult(out). + Get(fmt.Sprintf("/queues/%s/jobs/%s/logs", queueName, jobId)) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *Client) ServerInfo(ctx context.Context) (string, error) { + resp, err := c.httpClient.R().Get("/") + if err != nil { + return "", err + } + + return string(resp.Body()), nil +} diff --git a/clients/golang/pkg/client/options.go b/clients/golang/pkg/client/options.go new file mode 100644 index 0000000..fe9355d --- /dev/null +++ b/clients/golang/pkg/client/options.go @@ -0,0 +1,12 @@ +package client + +func RootUrl(url string) ClientOpts { + return func(c *Client) { + c.rootUrl = url + } +} +func AuthToken(token string) ClientOpts { + return func(c *Client) { + c.authToken = token + } +} diff --git a/clients/golang/pkg/client/proxyapi/job.go b/clients/golang/pkg/client/proxyapi/job.go new file mode 100644 index 0000000..e7da3d4 --- /dev/null +++ b/clients/golang/pkg/client/proxyapi/job.go @@ -0,0 +1,51 @@ +package proxyapi + +type Job struct { + // The name of the Job + Name string `json:"name"` + // The payload for this job. + Data any `json:"data"` + // The options object for this job. + Opts *BaseJobOptions `json:"opts"` + // The ID of the job. + ID *string `json:"id,omitempty"` + // It includes the prefix, the namespace separator :, and queue name. + // See https://www.gnu.org/software/gawk/manual/html_node/Qualified-Names.html + QueueQualifiedName string `json:"queueQualifiedName"` + // The progress a job has performed so far. + // Default value is 0. + Progress any `json:"progress,omitempty"` + // The value returned by the processor when processing this job. + // Default value is null. + Returnvalue any `json:"returnvalue,omitempty"` + // Stacktrace for the error (for failed jobs). + // Default value is null. + Stacktrace []string `json:"stacktrace,omitempty"` + // An amount of milliseconds to wait until this job can be processed. + // Default value is 0. + Delay int64 `json:"delay,omitempty"` + // Timestamp when the job was created (unless overridden with job options). + Timestamp int64 `json:"timestamp,omitempty"` + // Number of attempts when job is moved to active. + // Default value is 0. + AttemptsStarted int `json:"attemptsStarted,omitempty"` + // Number of attempts after the job has failed. + // Default value is 0. + AttemptsMade int `json:"attemptsMade,omitempty"` + // Reason for failing. + FailedReason string `json:"failedReason,omitempty"` + // Timestamp when the job was finished. + FinishedOn *int64 `json:"finishedOn,omitempty,omitempty"` + // Timestamp when the job was processed. + ProcessedOn *int64 `json:"processedOn,omitempty,omitempty"` + // The key of the parent job. + ParentKey *string `json:"parentKey,omitempty,omitempty"` + // The parent job. + Parent *JobRef `json:"parent,omitempty,omitempty"` + // The key of the repeat job. + RepeatJobKey *string `json:"repeatJobKey,omitempty,omitempty"` + // The token of the job. + Token *string `json:"token,omitempty,omitempty"` + // The ID of the process that processed the job. + ProcessedBy *string `json:"processedBy,omitempty,omitempty"` +} diff --git a/clients/golang/pkg/client/proxyapi/job_opts.go b/clients/golang/pkg/client/proxyapi/job_opts.go new file mode 100644 index 0000000..7cfa054 --- /dev/null +++ b/clients/golang/pkg/client/proxyapi/job_opts.go @@ -0,0 +1,63 @@ +package proxyapi + +type JobSpec struct { + Name string `json:"name"` + Data string `json:"data"` + Options *BulkJobOptions `json:"opts,omitempty"` +} + +type JobRef struct { + Id string `json:"id"` + + /** + * It includes the prefix, the namespace separator :, and queue name. + * @see https://www.gnu.org/software/gawk/manual/html_node/Qualified-Names.html + */ + Queue string `json:"queue"` +} + +type DefaultJobOptions struct { + Timestamp *int `json:"timestamp,omitempty"` + Priority int `json:"priority,omitempty"` + Delay int `json:"delay,omitempty"` + Attempts int `json:"attempts,omitempty"` + Backoff *BackoffStrategy `json:"backoff,omitempty"` //TODO: support a custom unmarshaler + Lifo bool `json:"lifo,omitempty"` + RemoveOnComplete *RemovePolicy `json:"removeOnComplete,omitempty"` + RemoveOnFail *RemovePolicy `json:"removeOnFail,omitempty"` + KeepLogs int `json:"keepLogs,omitempty"` + StackTraceLimit int `json:"stackTraceLimit,omitempty"` + SizeLimit int `json:"sizeLimit,omitempty"` +} + +type BaseJobOptions struct { + DefaultJobOptions + + Repeat *RepeatOptions `json:"repeat,omitempty"` + RepeatJobKey *string `json:"repeatJobKey,omitempty"` + + JobId *string `json:"jobId,omitempty"` + Parent *JobRef `json:"parent,omitempty"` + + /** + * Internal property used by repeatable jobs. + */ + PrevMillis *int `json:"prevMillis,omitempty"` +} + +type BulkJobOptions struct { + BaseJobOptions + + FailParentOnFailure bool `json:"failParentOnFailure"` + IgnoreDependencyOnFailure bool `json:"ignoreDependencyOnFailure"` + RemoveDependencyOnFailure bool `json:"removeDependencyOnFailure"` + + // Repeat cannot be set for bulk jobs, so we just don't send the repeat option by setting the json tag to - + Repeat any `json:"-"` + RepeatJobKey string `json:"-"` +} + +type RedisJobOptions struct { + BaseJobOptions +} + diff --git a/clients/golang/pkg/client/proxyapi/objects.go b/clients/golang/pkg/client/proxyapi/objects.go new file mode 100644 index 0000000..f1c2a94 --- /dev/null +++ b/clients/golang/pkg/client/proxyapi/objects.go @@ -0,0 +1,23 @@ +package proxyapi + +type GetJobsResponse struct { + Counts int `json:"counts"` + Jobs []*Job `json:"jobs"` + Start int `json:"start"` + Length int `json:"length"` +} + +type JobLog struct { + Logs []string `json:"logs"` + Count int `json:"count"` +} + +type QueueCommand struct { + Fn string `json:"fn"` + Args []interface{} `json:"args"` +} + +type WorkerCommand struct { + Type string `json:"type"` + Payload interface{} `json:"payload"` +} diff --git a/clients/golang/pkg/client/proxyapi/strategy.go b/clients/golang/pkg/client/proxyapi/strategy.go new file mode 100644 index 0000000..2269543 --- /dev/null +++ b/clients/golang/pkg/client/proxyapi/strategy.go @@ -0,0 +1,140 @@ +package proxyapi + +import ( + "bytes" + "encoding/json" + "time" +) + +type BackoffStrategy struct { + Type any + Delay float64 +} + +func (b *BackoffStrategy) UnmarshalJSON(xs []byte) error { + if len(xs) == 0 { + return nil + } + var backoffStrategy struct { + Type any `json:"type,omitempty"` + Delay float64 `json:"delay,omitempty"` + } + // assume object if start with { + if xs[0] == '{' { + err := json.Unmarshal(xs, &backoffStrategy) + if err != nil { + return err + } + b.Type = backoffStrategy.Type + b.Delay = backoffStrategy.Delay + return nil + } + // otherwise, assume number (delay) + err := json.Unmarshal(xs, &b.Delay) + if err != nil { + return err + } + + return nil +} + +func (b *BackoffStrategy) MarshalJSON() ([]byte, error) { + if b.Type == nil { + return json.Marshal(b.Delay) + } + return json.Marshal(map[string]any{ + "type": b.Type, + "delay": b.Delay, + }) +} + +// RemovePolicy represents TS type boolean | number | KeepJobs; +// if Always == true, then always remove, regardless of Count and AgeSeconds +// if Count != 0, keep up to N, for up to AgeSeconds if set +// if AgeSeconds is set, keep up to AgeSeconds. If Count==0, keep infinite, otherwise defer to KeepPolicy +type RemovePolicy struct { + Always bool + Count int + AgeSeconds float64 +} + +func (r *RemovePolicy) UnmarshalJSON(xs []byte) error { + if len(xs) == 0 { + return nil + } + if bytes.Equal(xs, []byte("true")) { + r.Always = true + return nil + } + if bytes.Equal(xs, []byte("false")) { + r.Always = false + return nil + } + var removePolicy struct { + Count int `json:"Count,omitempty"` + Age float64 `json:"Age,omitempty"` + } + // assume object if start with { + if xs[0] == '{' { + err := json.Unmarshal(xs, &removePolicy) + if err != nil { + return err + } + r.Count = removePolicy.Count + r.AgeSeconds = removePolicy.Age + return nil + } + // otherwise, assume count + err := json.Unmarshal(xs, &r.Count) + if err != nil { + return err + } + return nil + +} + +func (r *RemovePolicy) MarshalJSON() ([]byte, error) { + if r.Always { + return json.Marshal(true) + } + if r.Count == 0 && r.AgeSeconds == 0 { + return json.Marshal(false) + } + if r.AgeSeconds != 0 { + return json.Marshal(map[string]any{ + "age": r.AgeSeconds, + "count": r.Count, + }) + } + return json.Marshal(r.Count) +} + +type RepeatOptions struct { + CurrentDate *time.Time `json:"currentDate"` + StartDate *time.Time `json:"startDate"` + EndDate *time.Time `json:"endDate"` + UTC *bool `json:"utc"` + TZ *string `json:"tz"` + NthDayOfWeek *int `json:"nthDayOfWeek"` + + // A repeat pattern + Pattern *string `json:"pattern,omitempty"` + // Custom repeatable key. This is the key that holds the "metadata" + // of a given repeatable job. This key is normally auto-generated but + // it is sometimes useful to specify a custom key for easier retrieval + // of repeatable jobs. + Key *string `json:"key,omitempty"` + // Number of times the job should repeat at max. + Limit *int `json:"limit,omitempty"` + // Repeat after this amount of milliseconds + // (`pattern` setting cannot be used together with this setting.) + Every *int `json:"every,omitempty"` + // Repeated job should start right now + // ( work only with every settings) + Immediately *bool `json:"immediately,omitempty"` + // The start value for the repeat iteration count. + Count *int `json:"count,omitempty"` + PrevMillis *int `json:"prevMillis,omitempty"` + Offset *int `json:"offset,omitempty"` + JobId *string `json:"jobId,omitempty"` +} diff --git a/clients/golang/pkg/client/proxyapi/strategy_test.go b/clients/golang/pkg/client/proxyapi/strategy_test.go new file mode 100644 index 0000000..442776f --- /dev/null +++ b/clients/golang/pkg/client/proxyapi/strategy_test.go @@ -0,0 +1,23 @@ +package proxyapi_test + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/require" + "taskforce.sh/bullmq_proxy_client/pkg/client/proxyapi" +) + +func TestBulkJobOptionsMarshalNoRepeat(t *testing.T) { + ans, err := json.Marshal(&proxyapi.BulkJobOptions{ + Repeat: "test", + RepeatJobKey: "foo", + }) + require.NoError(t, err) + var o map[string]any + err = json.Unmarshal(ans, &o) + require.NoError(t, err) + require.NotContains(t, o, "repeat") + require.NotContains(t, o, "repeatJobKey") + +} diff --git a/clients/golang/pkg/client/proxyapi/worker.go b/clients/golang/pkg/client/proxyapi/worker.go new file mode 100644 index 0000000..fe9f8c9 --- /dev/null +++ b/clients/golang/pkg/client/proxyapi/worker.go @@ -0,0 +1,34 @@ +package proxyapi + +type WorkerMetrics struct { + MaxDataPoints int `json:"maxDataPoints,omitempty"` +} + +type LimiterMetrics struct { + Max int `json:"max,omitempty"` + Duration float64 `json:"duration,omitempty"` +} + +type WorkerMetadata struct { + Queue string `json:"queue,omitempty"` + Endpoint WorkerEndpoint `json:"endpoint,omitempty"` + Options *WorkerSimpleOptions `json:"opts,omitempty"` +} +type WorkerSimpleOptions struct { + Name string `json:"name,omitempty"` + Concurrency int `json:"concurrency,omitempty"` + Limiter *LimiterMetrics `json:"limiter,omitempty"` + Metrics *WorkerMetrics `json:"metrics,omitempty"` + MaxStalledCount int `json:"maxStalledCount,omitempty"` + StalledInterval float64 `json:"stalledInterval,omitempty"` + RemoveOnComplete *RemovePolicy `json:"removeOnComplete,omitempty"` + RemoveOnFail *RemovePolicy `json:"removeOnFail,omitempty"` + Prefix string `json:"prefix,omitempty"` +} + +type WorkerEndpoint struct { + Url string `json:"url"` + Method string `json:"method,omitempty"` + Headers map[string]string `json:"headers,omitempty"` + Timeout float64 `json:"timeout,omitempty"` +} diff --git a/clients/golang/pkg/client/wsclient/wsclient.go b/clients/golang/pkg/client/wsclient/wsclient.go new file mode 100644 index 0000000..e839e5e --- /dev/null +++ b/clients/golang/pkg/client/wsclient/wsclient.go @@ -0,0 +1,179 @@ +package wsclient + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "log" + "net/http" + "sync" + "sync/atomic" + "time" + + "github.com/gorilla/websocket" +) + +/* +type Message struct { + ID int `json:"id"` + Data interface{} `json:"data"` +} +*/ + +type Message[T any] struct { + ID int `json:"id"` + Data T `json:"data"` +} + +type QueueResult struct { + OK *json.RawMessage `json:"ok,omitempty"` + Err *ErrorDetail `json:"err,omitempty"` +} + +type ErrorDetail struct { + Message string `json:"message"` + Stack string `json:"stack"` +} + +const ( + // Time to wait before timing out a response from server + responseTimeout = time.Second * 10 +) + +// Struct to hold pending messages +type PendingMessage[T any] struct { + messageID int + response chan Message[QueueResult] +} + +// Struct to hold the WebSocket connection +type WebSocket[T any] struct { + conn *websocket.Conn + currentMsgID atomic.Int64 + + writeMutex sync.Mutex + + pendingMsgs map[int]chan Message[QueueResult] + pendingMsgLock sync.Mutex + + stopCh chan struct{} + closeOnce sync.Once +} + +func New[T any](ctx context.Context, url string, headers http.Header) (*WebSocket[T], error) { + return ConnectWebSocket[T](ctx, url, headers) +} + +var defaultUpgrader = websocket.Upgrader{ + HandshakeTimeout: responseTimeout, + EnableCompression: false, +} + +func ConnectWebSocket[T any](ctx context.Context, url string, headers http.Header) (*WebSocket[T], error) { + var conn, _, err = websocket.DefaultDialer.DialContext(ctx, url, headers) + if err != nil { + log.Fatal("Error connecting to WebSocket:", err) + } + + var ws = &WebSocket[T]{ + conn: conn, + pendingMsgs: make(map[int]chan Message[QueueResult]), + pendingMsgLock: sync.Mutex{}, + writeMutex: sync.Mutex{}, + stopCh: make(chan struct{}), + } + ws.currentMsgID.Add(1) + + go ws.listenForResponses() + + return ws, nil +} + +func (ws *WebSocket[T]) listenForResponses() { + for { + select { + case <-ws.stopCh: + return + default: + } + var msg Message[QueueResult] + err := ws.conn.ReadJSON(&msg) + if err != nil { + log.Println("Error reading message:", err) + ws.Close() + return + } + + func() { + ws.pendingMsgLock.Lock() + defer ws.pendingMsgLock.Unlock() + if ch, ok := ws.pendingMsgs[msg.ID]; ok { + ch <- msg + delete(ws.pendingMsgs, msg.ID) + } + }() + } +} + +func (ws *WebSocket[T]) ReceiveMessage() (*Message[T], error) { + var msg Message[T] + err := ws.conn.ReadJSON(&msg) + if err != nil { + return nil, err + } + return &msg, err +} + +func (ws *WebSocket[T]) Request(msg T) (*json.RawMessage, error) { + msgID := int(ws.currentMsgID.Add(1)) + respChan := make(chan Message[QueueResult]) + ws.pendingMsgLock.Lock() + ws.pendingMsgs[msgID] = respChan + ws.pendingMsgLock.Unlock() + + message := &Message[any]{ + ID: msgID, + Data: msg, + } + err := ws.writeMessage(message) + if err != nil { + return nil, err + } + + select { + case resp := <-respChan: + var result = resp.Data + if result.Err != nil { + return nil, fmt.Errorf("%s\n%s", result.Err.Message, result.Err.Stack) + } + return result.OK, nil + case <-time.After(responseTimeout): + return nil, errors.New("Response timed out") + } +} + +func (ws *WebSocket[T]) Respond(id int, data any) error { + message := &Message[any]{ + ID: id, + Data: data, + } + return ws.writeMessage(message) +} + +func (ws *WebSocket[T]) writeMessage(msg *Message[any]) error { + ws.writeMutex.Lock() // Lock before writing to the connection + err := ws.conn.WriteJSON(msg) + ws.writeMutex.Unlock() // Unlock after writing + if err != nil { + return err + } + return nil +} + +func (ws *WebSocket[T]) Close() { + ws.closeOnce.Do(func() { + close(ws.stopCh) + ws.conn.Close() + }) +} diff --git a/clients/golang/pkg/queue/queue.go b/clients/golang/pkg/queue/queue.go new file mode 100644 index 0000000..2db3ac5 --- /dev/null +++ b/clients/golang/pkg/queue/queue.go @@ -0,0 +1,77 @@ +package queue + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + + "taskforce.sh/bullmq_proxy_client/pkg/client/proxyapi" + "taskforce.sh/bullmq_proxy_client/pkg/client/wsclient" +) + +type Queue struct { + ws *wsclient.WebSocket[*proxyapi.QueueCommand] +} + +func NewQueue(ctx context.Context, url string, headers http.Header) (*Queue, error) { + ws, err := wsclient.New[*proxyapi.QueueCommand](ctx, url, headers) + if err != nil { + return nil, err + } + return &Queue{ws: ws}, nil +} + +type JobResponse struct { + AttemptsMade int `json:"attemptsMade,omitempty"` + Data map[string]interface{} `json:"data,omitempty"` + Delay int `json:"delay,omitempty"` + ID string `json:"id,omitempty"` + Name string `json:"name,omitempty"` + Opts JobOpts `json:"opts,omitempty"` + Progress int `json:"progress,omitempty"` + QueueQualifiedName string `json:"queueQualifiedName,omitempty"` + ReturnValue interface{} `json:"returnvalue,omitempty"` + Stacktrace interface{} `json:"stacktrace,omitempty"` + Timestamp int64 `json:"timestamp,omitempty"` +} + +type JobOpts struct { + Attempts *int `json:"attempts,omitempty"` + Delay *int `json:"delay,omitempty"` +} + +func (q *Queue) AddJob(name string, data interface{}, opts interface{}) (*JobResponse, error) { + cmd := &proxyapi.QueueCommand{ + Fn: "add", + Args: []interface{}{name, data, opts}, + } + + rawData, err := q.ws.Request(cmd) + if err != nil { + return nil, fmt.Errorf("Failed to send message: %v", err) + } + + if rawData == nil { + return nil, fmt.Errorf("Failed to receive response") + } + + var jobResp JobResponse + if err := json.Unmarshal(*rawData, &jobResp); err != nil { + return nil, fmt.Errorf("Failed to unmarshal response: %v", err) + } + + return &jobResp, nil +} + +func (q *Queue) PauseJob() { + cmd := &proxyapi.QueueCommand{ + Fn: "pause", + Args: []interface{}{}, + } + q.ws.Request(cmd) +} + +func (q *Queue) Close() { + q.ws.Close() +} diff --git a/clients/golang/queue/worker.go b/clients/golang/pkg/queue/worker.go similarity index 62% rename from clients/golang/queue/worker.go rename to clients/golang/pkg/queue/worker.go index 86c29c4..fb88c87 100644 --- a/clients/golang/queue/worker.go +++ b/clients/golang/pkg/queue/worker.go @@ -1,17 +1,15 @@ package queue import ( + "context" "fmt" + "net/http" // add other required imports - "taskforce.sh/bullmq_proxy_client/wsclient" + "taskforce.sh/bullmq_proxy_client/pkg/client/proxyapi" + "taskforce.sh/bullmq_proxy_client/pkg/client/wsclient" ) -type WorkerCommand struct { - Type string `json:"type"` - Payload interface{} `json:"payload"` -} - type JobResult struct { Result interface{} `json:"result"` } @@ -24,22 +22,27 @@ type JobError struct { type ProcessorFunc func(job interface{}) (interface{}, error) type QueueWorker struct { - ws *wsclient.WebSocket[WorkerCommand] + ws *wsclient.WebSocket[*proxyapi.WorkerCommand] } -func NewWorker(host string, queueName string, token string, concurrency int, processor ProcessorFunc) *QueueWorker { - url := fmt.Sprintf("%s/queues/%s/process/%d?token=%s", host, queueName, concurrency, token) - var ws = wsclient.New[WorkerCommand](url) +func NewWorker(ctx context.Context, host string, queueName string, token string, concurrency int, processor ProcessorFunc) (*QueueWorker, error) { + url := fmt.Sprintf("%s/ws/queues/%s/process/%d", host, queueName, concurrency) + h := make(http.Header) + h.Set("Authorization", "Bearer "+token) + ws, err := wsclient.New[*proxyapi.WorkerCommand](ctx, url, h) + if err != nil { + return nil, err + } var qw = &QueueWorker{ws: ws} go qw.listen(processor) - return qw; + return qw, nil } func (qw *QueueWorker) listen(processor ProcessorFunc) { for { - message, err := qw.ws.ReceiveWebSocketMessage() // assuming this blocks until a message is received + message, err := qw.ws.ReceiveMessage() // assuming this blocks until a message is received if err != nil { fmt.Printf("Error receiving message: %v\n", err) continue diff --git a/clients/golang/pkg/util/ptr.go b/clients/golang/pkg/util/ptr.go new file mode 100644 index 0000000..994d51f --- /dev/null +++ b/clients/golang/pkg/util/ptr.go @@ -0,0 +1,5 @@ +package util + +func Ptr[T any](x T) *T { + return &x +} diff --git a/clients/golang/queue/queue.go b/clients/golang/queue/queue.go deleted file mode 100644 index b503a6c..0000000 --- a/clients/golang/queue/queue.go +++ /dev/null @@ -1,76 +0,0 @@ -package queue - -import ( - "encoding/json" - "fmt" - - "taskforce.sh/bullmq_proxy_client/wsclient" -) - -type QueueCommand struct { - Fn string `json:"fn"` - Args []interface{} `json:"args"` -} - -type Queue struct { - ws *wsclient.WebSocket[QueueCommand] -} - -func NewQueue(url string) *Queue { - var ws = wsclient.New[QueueCommand](url) - return &Queue{ws: ws} -} - -type JobResponse struct { - AttemptsMade int `json:"attemptsMade,omitempty"` - Data map[string]interface{} `json:"data,omitempty"` - Delay int `json:"delay,omitempty"` - ID string `json:"id,omitempty"` - Name string `json:"name,omitempty"` - Opts JobOpts `json:"opts,omitempty"` - Progress int `json:"progress,omitempty"` - QueueQualifiedName string `json:"queueQualifiedName,omitempty"` - ReturnValue interface{} `json:"returnvalue,omitempty"` - Stacktrace interface{} `json:"stacktrace,omitempty"` - Timestamp int64 `json:"timestamp,omitempty"` -} - -type JobOpts struct { - Attempts *int `json:"attempts,omitempty"` - Delay *int `json:"delay,omitempty"` -} - -func (q *Queue) AddJob(name string, data interface{}, opts interface{}) (*JobResponse, error) { - cmd := QueueCommand{ - Fn: "add", - Args: []interface{}{name, data, opts}, - } - - rawData, err := q.ws.SendWebSocketMessage(cmd) - if err != nil { - return nil, fmt.Errorf("Failed to send message: %v", err) - } - - if(rawData == nil) { - return nil, fmt.Errorf("Failed to receive response") - } - - var jobResp JobResponse - if err := json.Unmarshal(*rawData, &jobResp); err != nil { - return nil, fmt.Errorf("Failed to unmarshal response: %v", err) - } - - return &jobResp, nil -} - -func (q *Queue) PauseJob() { - cmd := QueueCommand{ - Fn: "pause", - Args: []interface{}{}, - } - q.ws.SendWebSocketMessage(cmd) -} - -func (q *Queue) Close() { - q.ws.Close() -} diff --git a/clients/golang/tests/queue_test.go b/clients/golang/tests/queue_test.go index 42987df..cdbe28b 100644 --- a/clients/golang/tests/queue_test.go +++ b/clients/golang/tests/queue_test.go @@ -1,16 +1,23 @@ package wsclient import ( + "context" + "net/http" "sync" "testing" "time" - "taskforce.sh/bullmq_proxy_client/queue" + "github.com/stretchr/testify/require" + "taskforce.sh/bullmq_proxy_client/pkg/queue" ) func TestAddJob(t *testing.T) { + ctx := context.Background() + h := make(http.Header) + h.Set("Authorization", "Bearer 1234") const numJobs = 10 - q := queue.NewQueue("ws://localhost:8080/queues/test?token=1234") + q, err := queue.NewQueue(ctx, "ws://localhost:8080/ws/queues/test", h) + require.NoError(t, err) // Use a wait group to wait for all goroutines to finish var wg sync.WaitGroup @@ -48,12 +55,13 @@ func TestAddJob(t *testing.T) { // Channels for signaling var jobCount int - - worker := queue.NewWorker("ws://localhost:8080", "test", "1234", 10, func(job interface{}) (interface{}, error) { + + worker, err := queue.NewWorker(ctx, "ws://localhost:8080", "test", "1234", 10, func(job interface{}) (interface{}, error) { t.Logf("Processing job: %v", job) - jobCount++ + jobCount++ return nil, nil }) + require.NoError(t, err) // Sleep for a bit to allow the worker to process time.Sleep(8 * time.Second) diff --git a/clients/golang/wsclient/wsclient.go b/clients/golang/wsclient/wsclient.go deleted file mode 100644 index b38e57b..0000000 --- a/clients/golang/wsclient/wsclient.go +++ /dev/null @@ -1,154 +0,0 @@ -package wsclient - -import ( - "encoding/json" - "errors" - "fmt" - "log" - "sync" - "time" - - "github.com/gorilla/websocket" -) - -/* -type Message struct { - ID int `json:"id"` - Data interface{} `json:"data"` -} -*/ - -type Message[T any] struct { - ID int `json:"id"` - Data T `json:"data"` -} - -type QueueResult struct { - OK *json.RawMessage `json:"ok,omitempty"` - Err *ErrorDetail `json:"err,omitempty"` -} - -type ErrorDetail struct { - Message string `json:"message"` - Stack string `json:"stack"` -} - -const ( - // Time to wait before timing out a response from server - responseTimeout = time.Second * 10 -) - -// Struct to hold pending messages -type PendingMessage[T any] struct { - messageID int - response chan Message[QueueResult] -} - -// Struct to hold the WebSocket connection -type WebSocket[T any] struct { - conn *websocket.Conn - currentMsgID int - pendingMsgs map[int]chan Message[QueueResult] - pendingMsgLock sync.Mutex - writeMutex sync.Mutex -} - -func New[T any](url string) *WebSocket[T] { - return ConnectWebSocket[T](url) -} - -func ConnectWebSocket[T any](url string) *WebSocket[T] { - var conn, _, err = websocket.DefaultDialer.Dial(url, nil) - if err != nil { - log.Fatal("Error connecting to WebSocket:", err) - } - - var ws = &WebSocket[T]{ - conn: conn, - currentMsgID: 1, - pendingMsgs: make(map[int]chan Message[QueueResult]), - pendingMsgLock: sync.Mutex{}, - writeMutex: sync.Mutex{}, - } - - go ws.listenForResponses() - - return ws -} - -func (ws *WebSocket[T]) listenForResponses() { - for { - var msg Message[QueueResult] - err := ws.conn.ReadJSON(&msg) - if err != nil { - log.Println("Error reading message:", err) - continue - } - - ws.pendingMsgLock.Lock() - if ch, ok := ws.pendingMsgs[msg.ID]; ok { - ch <- msg - delete(ws.pendingMsgs, msg.ID) - } - ws.pendingMsgLock.Unlock() - } -} - -func (ws *WebSocket[T]) SendWebSocketMessage(msg T) (*json.RawMessage, error) { - ws.pendingMsgLock.Lock() - msgID := ws.currentMsgID - ws.currentMsgID++ - ws.pendingMsgLock.Unlock() - - respChan := make(chan Message[QueueResult]) - ws.pendingMsgLock.Lock() - ws.pendingMsgs[msgID] = respChan - ws.pendingMsgLock.Unlock() - - message := Message[T]{ - ID: msgID, - Data: msg, - } - - ws.writeMutex.Lock() // Lock before writing to the connection - err := ws.conn.WriteJSON(message) - defer ws.writeMutex.Unlock() // Unlock after writing - - if err != nil { - return nil, err - } - - select { - case resp := <-respChan: - var result = resp.Data - if result.Err != nil { - return nil, fmt.Errorf("%s\n%s", result.Err.Message, result.Err.Stack) - } - return result.OK, nil - case <-time.After(responseTimeout): - return nil, errors.New("Response timed out") - } -} - -func (ws *WebSocket[T]) ReceiveWebSocketMessage() (Message[T], error) { - var msg Message[T] - err := ws.conn.ReadJSON(&msg) - return msg, err -} - -func (ws *WebSocket[T]) Close() { - ws.conn.Close() -} - -func (ws *WebSocket[T]) Respond(id int, data interface{}) error { - message := Message[any]{ - ID: id, - Data: data, - } - - ws.writeMutex.Lock() // Lock before writing to the connection - err := ws.conn.WriteJSON(message) - defer ws.writeMutex.Unlock() // Unlock after writing - - return err -} diff --git a/src/controllers/http/queue-http-controller.ts b/src/controllers/http/queue-http-controller.ts index e2fd2e8..bef54b0 100644 --- a/src/controllers/http/queue-http-controller.ts +++ b/src/controllers/http/queue-http-controller.ts @@ -9,8 +9,8 @@ type QueueHttpControllerOpts = Omit; export const QueueHttpController = { /** * addJobs - * @param opts - * @returns + * @param opts + * @returns */ addJobs: async (opts: QueueHttpControllerOpts) => { const queueName = opts.params.queueName; @@ -90,8 +90,8 @@ export const QueueHttpController = { /** * getJob - * @param opts - * @returns + * @param opts + * @returns */ getJob: async (opts: QueueHttpControllerOpts) => { const queueName = opts.params.queueName; diff --git a/src/index.ts b/src/index.ts index 74db2f6..f917330 100644 --- a/src/index.ts +++ b/src/index.ts @@ -15,6 +15,7 @@ let workersConnection: Redis | Cluster; if (config.redis.url) { connection = new IORedis(config.redis.url, { retryStrategy: () => 3000, + // maxRetriesPerRequest: 20, enableOfflineQueue: false, }); } else { @@ -24,6 +25,7 @@ if (config.redis.url) { password: config.redis.password, username: config.redis.username, tls: config.redis.tls, + // maxRetriesPerRequest: 20, retryStrategy: () => 3000, }); } diff --git a/src/proxy.ts b/src/proxy.ts index ce30901..58ea968 100644 --- a/src/proxy.ts +++ b/src/proxy.ts @@ -58,7 +58,7 @@ export interface ProxyOpts { /** * Cleans the proxy metadata from the Redis host. - * @param connection + * @param connection */ export const cleanProxy = async (connection: Redis | Cluster, ) => {