From 7ad8a38c2e1cce8f751fc454d29b213222cfc761 Mon Sep 17 00:00:00 2001 From: xescugc Date: Mon, 7 Jun 2021 11:00:29 +0200 Subject: [PATCH 01/16] CHANGELOG: Stareted the CHANGELOG Mentioning the 1.4 that we'll create is based on master, as the last release is 1.3 and it's old --- CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 CHANGELOG.md diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..b26bdf1 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,5 @@ +## [Unreleased] + +## [0.1.4] _2021-06-07_ + +Fork from https://github.com/benmanns/goworker from master beeing https://github.com/benmanns/goworker/commit/d28a4f34a4d183f3ea2e51b4b8268807e0984942 From 915dfaeeb8d38463c41bcfee767337bfe240f0b4 Mon Sep 17 00:00:00 2001 From: kerak19 Date: Mon, 21 Dec 2020 14:05:26 +0100 Subject: [PATCH 02/16] goworker: replace redigo with go-redis --- goworker.go | 79 +++++++++++++++++++++-------------- poller.go | 67 ++++++++++++++--------------- process.go | 78 ++++++++++++++++++++++++---------- redis.go | 118 ---------------------------------------------------- worker.go | 91 ++++++++++++++++++++++------------------ workers.go | 13 ++---- 6 files changed, 190 insertions(+), 256 deletions(-) delete mode 100644 redis.go diff --git a/goworker.go b/goworker.go index cf2faad..835cc78 100644 --- a/goworker.go +++ b/goworker.go @@ -1,20 +1,25 @@ package goworker import ( + "crypto/tls" + "crypto/x509" + "fmt" + "io/ioutil" "os" "strconv" "sync" "time" + "github.com/go-redis/redis/v7" + "golang.org/x/net/context" "github.com/cihub/seelog" - "vitess.io/vitess/go/pools" ) var ( logger seelog.LoggerInterface - pool *pools.ResourcePool + client *redis.Client ctx context.Context initMutex sync.Mutex initialized bool @@ -61,54 +66,66 @@ func Init() error { } ctx = context.Background() - pool = newRedisPool(workerSettings.URI, workerSettings.Connections, workerSettings.Connections, time.Minute) + opts, err := redis.ParseURL(workerSettings.URI) + if err 
!= nil { + return err + } + + if len(workerSettings.TLSCertPath) > 0 { + certPool, err := getCertPool() + if err != nil { + return err + } + opts.TLSConfig = &tls.Config{ + RootCAs: certPool, + InsecureSkipVerify: workerSettings.SkipTLSVerify, + } + } + + client = redis.NewClient(opts).WithContext(ctx) + err = client.Ping().Err() + if err != nil { + return err + } initialized = true } + return nil } -// GetConn returns a connection from the goworker Redis -// connection pool. When using the pool, check in -// connections as quickly as possible, because holding a -// connection will cause concurrent worker functions to lock -// while they wait for an available connection. Expect this -// API to change drastically. -func GetConn() (*RedisConn, error) { - resource, err := pool.Get(ctx) - +func getCertPool() (*x509.CertPool, error) { + rootCAs, _ := x509.SystemCertPool() + if rootCAs == nil { + rootCAs = x509.NewCertPool() + } + certs, err := ioutil.ReadFile(workerSettings.TLSCertPath) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to read %q for the RootCA pool: %v", workerSettings.TLSCertPath, err) } - return resource.(*RedisConn), nil -} - -// PutConn puts a connection back into the connection pool. -// Run this as soon as you finish using a connection that -// you got from GetConn. Expect this API to change -// drastically. -func PutConn(conn *RedisConn) { - pool.Put(conn) + if ok := rootCAs.AppendCertsFromPEM(certs); !ok { + return nil, fmt.Errorf("failed to append %q to the RootCA pool: %v", workerSettings.TLSCertPath, err) + } + return rootCAs, nil } // Close cleans up resources initialized by goworker. This // will be called by Work when cleaning up. However, if you // are using the Init function to access goworker functions // and configuration without processing jobs by calling -// Work, you should run this function when cleaning up. 
For -// example, -// -// if err := goworker.Init(); err != nil { -// fmt.Println("Error:", err) -// } -// defer goworker.Close() -func Close() { +// Work, you should run this function when cleaning up. +func Close() error { initMutex.Lock() defer initMutex.Unlock() if initialized { - pool.Close() + err := client.Close() + if err != nil { + return err + } initialized = false } + + return nil } // Work starts the goworker process. Check for errors in diff --git a/poller.go b/poller.go index 54ad696..0f6737f 100644 --- a/poller.go +++ b/poller.go @@ -5,6 +5,8 @@ import ( "encoding/json" "fmt" "time" + + "github.com/go-redis/redis/v7" ) type poller struct { @@ -23,20 +25,24 @@ func newPoller(queues []string, isStrict bool) (*poller, error) { }, nil } -func (p *poller) getJob(conn *RedisConn) (*Job, error) { +func (p *poller) getJob(c *redis.Client) (*Job, error) { for _, queue := range p.queues(p.isStrict) { logger.Debugf("Checking %s", queue) - reply, err := conn.Do("LPOP", fmt.Sprintf("%squeue:%s", workerSettings.Namespace, queue)) + result, err := c.LPop(fmt.Sprintf("%squeue:%s", workerSettings.Namespace, queue)).Result() if err != nil { + // no jobs for now, continue on another queue + if err == redis.Nil { + continue + } return nil, err } - if reply != nil { + if result != "" { logger.Debugf("Found job on %s", queue) job := &Job{Queue: queue} - decoder := json.NewDecoder(bytes.NewReader(reply.([]byte))) + decoder := json.NewDecoder(bytes.NewReader([]byte(result))) if workerSettings.UseNumber { decoder.UseNumber() } @@ -52,31 +58,29 @@ func (p *poller) getJob(conn *RedisConn) (*Job, error) { } func (p *poller) poll(interval time.Duration, quit <-chan bool) (<-chan *Job, error) { - jobs := make(chan *Job) + err := p.open(client) + if err != nil { + return nil, err + } - conn, err := GetConn() + err = p.start(client) if err != nil { - logger.Criticalf("Error on getting connection in poller %s: %v", p, err) - close(jobs) return nil, err - } else { - p.open(conn) - 
p.start(conn) - PutConn(conn) } + jobs := make(chan *Job) go func() { defer func() { close(jobs) - conn, err := GetConn() + err = p.finish(client) + if err != nil { + return + } + + err = p.close(client) if err != nil { - logger.Criticalf("Error on getting connection in poller %s: %v", p, err) return - } else { - p.finish(conn) - p.close(conn) - PutConn(conn) } }() @@ -85,22 +89,17 @@ func (p *poller) poll(interval time.Duration, quit <-chan bool) (<-chan *Job, er case <-quit: return default: - conn, err := GetConn() - if err != nil { - logger.Criticalf("Error on getting connection in poller %s: %v", p, err) - return - } - - job, err := p.getJob(conn) + job, err := p.getJob(client) if err != nil { - logger.Criticalf("Error on %v getting job from %v: %v", p, p.Queues, err) - PutConn(conn) + logger.Criticalf("Error on %v getting job from %v: %+v", p, p.Queues, err) return } if job != nil { - conn.Send("INCR", fmt.Sprintf("%sstat:processed:%v", workerSettings.Namespace, p)) - conn.Flush() - PutConn(conn) + err = client.Incr(fmt.Sprintf("%sstat:processed:%v", workerSettings.Namespace, p)).Err() + if err != nil { + return + } + select { case jobs <- job: case <-quit: @@ -109,19 +108,15 @@ func (p *poller) poll(interval time.Duration, quit <-chan bool) (<-chan *Job, er logger.Criticalf("Error requeueing %v: %v", job, err) return } - conn, err := GetConn() + + err = client.LPush(fmt.Sprintf("%squeue:%s", workerSettings.Namespace, job.Queue), buf).Err() if err != nil { - logger.Criticalf("Error on getting connection in poller %s: %v", p, err) return } - conn.Send("LPUSH", fmt.Sprintf("%squeue:%s", workerSettings.Namespace, job.Queue), buf) - conn.Flush() - PutConn(conn) return } } else { - PutConn(conn) if workerSettings.ExitOnComplete { return } diff --git a/process.go b/process.go index 82bcd9e..8ded419 100644 --- a/process.go +++ b/process.go @@ -6,6 +6,8 @@ import ( "os" "strings" "time" + + "github.com/go-redis/redis/v7" ) type process struct { @@ -33,44 +35,78 @@ 
func (p *process) String() string { return fmt.Sprintf("%s:%d-%s:%s", p.Hostname, p.Pid, p.ID, strings.Join(p.Queues, ",")) } -func (p *process) open(conn *RedisConn) error { - conn.Send("SADD", fmt.Sprintf("%sworkers", workerSettings.Namespace), p) - conn.Send("SET", fmt.Sprintf("%sstat:processed:%v", workerSettings.Namespace, p), "0") - conn.Send("SET", fmt.Sprintf("%sstat:failed:%v", workerSettings.Namespace, p), "0") - conn.Flush() +func (p *process) open(c *redis.Client) error { + err := c.SAdd(fmt.Sprintf("%sworkers", workerSettings.Namespace), p.String()).Err() + if err != nil { + return err + } + + err = c.Set(fmt.Sprintf("%sstat:processed:%v", workerSettings.Namespace, p), "0", 0).Err() + if err != nil { + return err + } + + err = c.Set(fmt.Sprintf("%sstat:failed:%v", workerSettings.Namespace, p), "0", 0).Err() + if err != nil { + return err + } return nil } -func (p *process) close(conn *RedisConn) error { +func (p *process) close(c *redis.Client) error { logger.Infof("%v shutdown", p) - conn.Send("SREM", fmt.Sprintf("%sworkers", workerSettings.Namespace), p) - conn.Send("DEL", fmt.Sprintf("%sstat:processed:%s", workerSettings.Namespace, p)) - conn.Send("DEL", fmt.Sprintf("%sstat:failed:%s", workerSettings.Namespace, p)) - conn.Flush() + err := c.SRem(fmt.Sprintf("%sworkers", workerSettings.Namespace), p.String()).Err() + if err != nil { + return err + } + + err = c.Del(fmt.Sprintf("%sstat:processed:%s", workerSettings.Namespace, p)).Err() + if err != nil { + return err + } + + err = c.Del(fmt.Sprintf("%sstat:failed:%s", workerSettings.Namespace, p)).Err() + if err != nil { + return err + } return nil } -func (p *process) start(conn *RedisConn) error { - conn.Send("SET", fmt.Sprintf("%sworker:%s:started", workerSettings.Namespace, p), time.Now().String()) - conn.Flush() +func (p *process) start(c *redis.Client) error { + err := c.Set(fmt.Sprintf("%sworker:%s:started", workerSettings.Namespace, p), time.Now().String(), 0).Err() + if err != nil { + return 
err + } return nil } -func (p *process) finish(conn *RedisConn) error { - conn.Send("DEL", fmt.Sprintf("%sworker:%s", workerSettings.Namespace, p)) - conn.Send("DEL", fmt.Sprintf("%sworker:%s:started", workerSettings.Namespace, p)) - conn.Flush() +func (p *process) finish(c *redis.Client) error { + err := c.Del(fmt.Sprintf("%sworker:%s", workerSettings.Namespace, p)).Err() + if err != nil { + return err + } + + err = c.Del(fmt.Sprintf("%sworker:%s:started", workerSettings.Namespace, p)).Err() + if err != nil { + return err + } return nil } -func (p *process) fail(conn *RedisConn) error { - conn.Send("INCR", fmt.Sprintf("%sstat:failed", workerSettings.Namespace)) - conn.Send("INCR", fmt.Sprintf("%sstat:failed:%s", workerSettings.Namespace, p)) - conn.Flush() +func (p *process) fail(c *redis.Client) error { + err := c.Incr(fmt.Sprintf("%sstat:failed", workerSettings.Namespace)).Err() + if err != nil { + return err + } + + err = c.Incr(fmt.Sprintf("%sstat:failed:%s", workerSettings.Namespace, p)).Err() + if err != nil { + return err + } return nil } diff --git a/redis.go b/redis.go deleted file mode 100644 index 085993b..0000000 --- a/redis.go +++ /dev/null @@ -1,118 +0,0 @@ -package goworker - -import ( - "crypto/tls" - "crypto/x509" - "errors" - "fmt" - "io/ioutil" - "net/url" - "time" - - "github.com/gomodule/redigo/redis" - "vitess.io/vitess/go/pools" -) - -var ( - errorInvalidScheme = errors.New("invalid Redis database URI scheme") -) - -type RedisConn struct { - redis.Conn -} - -func (r *RedisConn) Close() { - _ = r.Conn.Close() -} - -func newRedisFactory(uri string) pools.Factory { - return func() (pools.Resource, error) { - return redisConnFromURI(uri) - } -} - -func newRedisPool(uri string, capacity int, maxCapacity int, idleTimout time.Duration) *pools.ResourcePool { - return pools.NewResourcePool(newRedisFactory(uri), capacity, maxCapacity, idleTimout) -} - -func redisConnFromURI(uriString string) (*RedisConn, error) { - uri, err := url.Parse(uriString) - 
if err != nil { - return nil, err - } - - var network string - var host string - var password string - var db string - var dialOptions []redis.DialOption - - switch uri.Scheme { - case "redis", "rediss": - network = "tcp" - host = uri.Host - if uri.User != nil { - password, _ = uri.User.Password() - } - if len(uri.Path) > 1 { - db = uri.Path[1:] - } - if uri.Scheme == "rediss" { - dialOptions = append(dialOptions, redis.DialUseTLS(true)) - dialOptions = append(dialOptions, redis.DialTLSSkipVerify(workerSettings.SkipTLSVerify)) - if len(workerSettings.TLSCertPath) > 0 { - pool, err := getCertPool(workerSettings.TLSCertPath) - if err != nil { - return nil, err - } - config := &tls.Config{ - RootCAs: pool, - } - dialOptions = append(dialOptions, redis.DialTLSConfig(config)) - } - } - case "unix": - network = "unix" - host = uri.Path - default: - return nil, errorInvalidScheme - } - - conn, err := redis.Dial(network, host, dialOptions...) - if err != nil { - return nil, err - } - - if password != "" { - _, err := conn.Do("AUTH", password) - if err != nil { - conn.Close() - return nil, err - } - } - - if db != "" { - _, err := conn.Do("SELECT", db) - if err != nil { - conn.Close() - return nil, err - } - } - - return &RedisConn{Conn: conn}, nil -} - -func getCertPool(certPath string) (*x509.CertPool, error) { - rootCAs, _ := x509.SystemCertPool() - if rootCAs == nil { - rootCAs = x509.NewCertPool() - } - certs, err := ioutil.ReadFile(workerSettings.TLSCertPath) - if err != nil { - return nil, fmt.Errorf("Failed to read %q for the RootCA pool: %v", workerSettings.TLSCertPath, err) - } - if ok := rootCAs.AppendCertsFromPEM(certs); !ok { - return nil, fmt.Errorf("Failed to append %q to the RootCA pool: %v", workerSettings.TLSCertPath, err) - } - return rootCAs, nil -} diff --git a/worker.go b/worker.go index cecd34f..c8db26f 100644 --- a/worker.go +++ b/worker.go @@ -6,6 +6,8 @@ import ( "fmt" "sync" "time" + + "github.com/go-redis/redis/v7" ) type worker struct { @@ -26,7 
+28,7 @@ func (w *worker) MarshalJSON() ([]byte, error) { return json.Marshal(w.String()) } -func (w *worker) start(conn *RedisConn, job *Job) error { +func (w *worker) start(c *redis.Client, job *Job) error { work := &work{ Queue: job.Queue, RunAt: time.Now(), @@ -38,13 +40,17 @@ func (w *worker) start(conn *RedisConn, job *Job) error { return err } - conn.Send("SET", fmt.Sprintf("%sworker:%s", workerSettings.Namespace, w), buffer) + err = c.Set(fmt.Sprintf("%sworker:%s", workerSettings.Namespace, w), buffer, 0).Err() + if err != nil { + return err + } + logger.Debugf("Processing %s since %s [%v]", work.Queue, work.RunAt, work.Payload.Class) - return w.process.start(conn) + return w.process.start(c) } -func (w *worker) fail(conn *RedisConn, job *Job, err error) error { +func (w *worker) fail(c *redis.Client, job *Job, err error) error { failure := &failure{ FailedAt: time.Now(), Payload: job.Payload, @@ -57,35 +63,49 @@ func (w *worker) fail(conn *RedisConn, job *Job, err error) error { if err != nil { return err } - conn.Send("RPUSH", fmt.Sprintf("%sfailed", workerSettings.Namespace), buffer) - return w.process.fail(conn) + err = c.RPush(fmt.Sprintf("%sfailed", workerSettings.Namespace), buffer).Err() + if err != nil { + return err + } + + return w.process.fail(c) } -func (w *worker) succeed(conn *RedisConn, job *Job) error { - conn.Send("INCR", fmt.Sprintf("%sstat:processed", workerSettings.Namespace)) - conn.Send("INCR", fmt.Sprintf("%sstat:processed:%s", workerSettings.Namespace, w)) +func (w *worker) succeed(c *redis.Client) error { + err := c.Incr(fmt.Sprintf("%sstat:processed", workerSettings.Namespace)).Err() + if err != nil { + return err + } + + err = c.Incr(fmt.Sprintf("%sstat:processed:%s", workerSettings.Namespace, w)).Err() + if err != nil { + return err + } return nil } -func (w *worker) finish(conn *RedisConn, job *Job, err error) error { +func (w *worker) finish(c *redis.Client, job *Job, err error) error { if err != nil { - w.fail(conn, job, err) 
+ err = w.fail(c, job, err) + if err != nil { + return err + } } else { - w.succeed(conn, job) + err = w.succeed(c) + if err != nil { + return err + } } - return w.process.finish(conn) + return w.process.finish(c) } func (w *worker) work(jobs <-chan *Job, monitor *sync.WaitGroup) { - conn, err := GetConn() + err := w.open(client) if err != nil { - logger.Criticalf("Error on getting connection in worker %v: %v", w, err) + logger.Criticalf("Error on opening worker %v: %v", w, err) return - } else { - w.open(conn) - PutConn(conn) } monitor.Add(1) @@ -94,13 +114,10 @@ func (w *worker) work(jobs <-chan *Job, monitor *sync.WaitGroup) { defer func() { defer monitor.Done() - conn, err := GetConn() + err := w.close(client) if err != nil { - logger.Criticalf("Error on getting connection in worker %v: %v", w, err) + logger.Criticalf("Error on closing worker %v: %v", w, err) return - } else { - w.close(conn) - PutConn(conn) } }() for job := range jobs { @@ -112,13 +129,10 @@ func (w *worker) work(jobs <-chan *Job, monitor *sync.WaitGroup) { errorLog := fmt.Sprintf("No worker for %s in queue %s with args %v", job.Payload.Class, job.Queue, job.Payload.Args) logger.Critical(errorLog) - conn, err := GetConn() + err := w.finish(client, job, errors.New(errorLog)) if err != nil { - logger.Criticalf("Error on getting connection in worker %v: %v", w, err) + logger.Criticalf("Error on finishing worker %v: %v", w, err) return - } else { - w.finish(conn, job, errors.New(errorLog)) - PutConn(conn) } } } @@ -127,29 +141,26 @@ func (w *worker) work(jobs <-chan *Job, monitor *sync.WaitGroup) { func (w *worker) run(job *Job, workerFunc workerFunc) { var err error + defer func() { - conn, errCon := GetConn() - if errCon != nil { - logger.Criticalf("Error on getting connection in worker on finish %v: %v", w, errCon) + errFinish := w.finish(client, job, err) + if errFinish != nil { + logger.Criticalf("Error on finishing worker %v: %v", w, errFinish) return - } else { - w.finish(conn, job, err) - 
PutConn(conn) } }() + defer func() { if r := recover(); r != nil { err = errors.New(fmt.Sprint(r)) } }() - conn, err := GetConn() - if err != nil { - logger.Criticalf("Error on getting connection in worker on start %v: %v", w, err) + errStart := w.start(client, job) + if errStart != nil { + logger.Criticalf("Error on starting worker %v: %v", w, errStart) return - } else { - w.start(conn, job) - PutConn(conn) } + err = workerFunc(job.Queue, job.Payload.Args...) } diff --git a/workers.go b/workers.go index 0a5fb6f..42aac91 100644 --- a/workers.go +++ b/workers.go @@ -27,30 +27,23 @@ func Enqueue(job *Job) error { return err } - conn, err := GetConn() - if err != nil { - logger.Criticalf("Error on getting connection on enqueue") - return err - } - defer PutConn(conn) - buffer, err := json.Marshal(job.Payload) if err != nil { logger.Criticalf("Cant marshal payload on enqueue") return err } - err = conn.Send("RPUSH", fmt.Sprintf("%squeue:%s", workerSettings.Namespace, job.Queue), buffer) + err = client.RPush(fmt.Sprintf("%squeue:%s", workerSettings.Namespace, job.Queue), buffer).Err() if err != nil { logger.Criticalf("Cant push to queue") return err } - err = conn.Send("SADD", fmt.Sprintf("%squeues", workerSettings.Namespace), job.Queue) + err = client.SAdd(fmt.Sprintf("%squeues", workerSettings.Namespace), job.Queue).Err() if err != nil { logger.Criticalf("Cant register queue to list of use queues") return err } - return conn.Flush() + return nil } From 4d43e80c213040a8878f53866705304f7ef644d9 Mon Sep 17 00:00:00 2001 From: Dmitry Shalashov Date: Tue, 26 Jul 2022 19:53:23 +0300 Subject: [PATCH 03/16] chore: rebase go.sum --- go.mod | 7 +++---- go.sum | 61 +++++++++++++++++++++++++++++++++++++++++----------------- 2 files changed, 46 insertions(+), 22 deletions(-) diff --git a/go.mod b/go.mod index 00a08f7..7d6b57c 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,7 @@ module github.com/skaurus/goworker go 1.18 require ( - github.com/cihub/seelog 
v0.0.0-20140730094913-72ae425987bc - github.com/gomodule/redigo v1.8.2 - golang.org/x/net v0.0.0-20200822124328-c89045814202 - vitess.io/vitess v3.0.0-rc.3.0.20181212200900-e2c5239f54d1+incompatible + github.com/cihub/seelog v0.0.0-20170130134532-f561c5e57575 + github.com/go-redis/redis/v7 v7.4.0 + golang.org/x/net v0.0.0-20201110031124-69a78807bb2b ) diff --git a/go.sum b/go.sum index da88c49..3faafc2 100644 --- a/go.sum +++ b/go.sum @@ -1,25 +1,50 @@ -github.com/cihub/seelog v0.0.0-20140730094913-72ae425987bc h1:HSZdsOzV0MO6cEcf31hZoT6KJGI806Z523bkYPDwkQs= -github.com/cihub/seelog v0.0.0-20140730094913-72ae425987bc/go.mod h1:9d6lWj8KzO/fd/NrVaLscBKmPigpZpn5YawRPw+e3Yo= -github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/gomodule/redigo v1.8.2 h1:H5XSIre1MB5NbPYFp+i1NBbb5qN1W8Y8YAQoAYbkm8k= -github.com/gomodule/redigo v1.8.2/go.mod h1:P9dn9mFrCBvWhGE1wpxx6fgq7BAeLBk+UUUzlpkBYO0= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= -github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/cihub/seelog v0.0.0-20170130134532-f561c5e57575 h1:kHaBemcxl8o/pQ5VM1c8PVE1PubbNx3mjUr09OqWGCs= +github.com/cihub/seelog v0.0.0-20170130134532-f561c5e57575/go.mod h1:9d6lWj8KzO/fd/NrVaLscBKmPigpZpn5YawRPw+e3Yo= +github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/go-redis/redis/v7 v7.4.0 h1:7obg6wUoj05T0EpY0o8B59S9w5yeMWql7sw2kwNW1x4= +github.com/go-redis/redis/v7 v7.4.0/go.mod 
h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= +github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.10.1 h1:q/mM8GF/n0shIN8SaAZ0V+jnLPzen6WIVZdiwrRlMlo= +github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME= +github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20200822124328-c89045814202 h1:VvcQYSHwXgi7W+TpUR6A9g6Up98WAHf3f/ulnJ62IyA= -golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod 
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201110031124-69a78807bb2b h1:uwuIcX0g4Yl1NC5XAz37xsr2lTtcqevgzYNVt49waME= +golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191010194322-b09406accb47/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= -gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -vitess.io/vitess v3.0.0-rc.3.0.20181212200900-e2c5239f54d1+incompatible h1:TCG4ZGCiFNr7XGP8nhT++5Wwi1jRC6Xk9IPxZiBQXB0= -vitess.io/vitess 
v3.0.0-rc.3.0.20181212200900-e2c5239f54d1+incompatible/go.mod h1:h4qvkyNYTOC0xI+vcidSWoka0gQAZc9ZPHbkHo48gP0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= +gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I= +gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= From 2346ce1486c721b430c9abd66ba8555aed39650e Mon Sep 17 00:00:00 2001 From: xescugc Date: Mon, 7 Jun 2021 11:18:55 +0200 Subject: [PATCH 04/16] CHANGELOG: Add entry for Redis client migration --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index b26bdf1..f59f07b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ ## [Unreleased] +- Moved from `redigo` to `go-redis` + ([Issue benmanns/goworker#69](https://github.com/benmanns/goworker/issues/69)) + ## [0.1.4] _2021-06-07_ Fork from https://github.com/benmanns/goworker from master beeing https://github.com/benmanns/goworker/commit/d28a4f34a4d183f3ea2e51b4b8268807e0984942 From 106314aad3ec0cd18607ccd13cf97b5565ec908f Mon Sep 17 00:00:00 2001 From: xescugc Date: Fri, 4 Jun 2021 18:22:32 +0200 Subject: [PATCH 05/16] worker: Add logic for heatbeat and prune This logic is ported from the Ruby. It allows each worker to heartbeat Redis so if everything is killed instead of keeping them on the DB it'll try to prun them after a while (5'). 
The only logic ont ported from Ruby is the one about checking the PID. --- CHANGELOG.md | 7 +++ process.go | 10 +++++ worker.go | 117 ++++++++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 133 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f59f07b..34bcada 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ ## [Unreleased] +### Added + +- Added heartbeat and prune functions to clean stuck workers + ([Issue benmanns/goworker#65](https://github.com/benmanns/goworker/issues/65)) + +### Changed + - Moved from `redigo` to `go-redis` ([Issue benmanns/goworker#69](https://github.com/benmanns/goworker/issues/69)) diff --git a/process.go b/process.go index 8ded419..9c999f9 100644 --- a/process.go +++ b/process.go @@ -51,6 +51,11 @@ func (p *process) open(c *redis.Client) error { return err } + err = c.HSet(fmt.Sprintf("%s%s", workerSettings.Namespace, heartbeatKey), p.String(), time.Now().Format(time.RFC3339)).Err() + if err != nil { + return err + } + return nil } @@ -71,6 +76,11 @@ func (p *process) close(c *redis.Client) error { return err } + err = c.HDel(fmt.Sprintf("%s%s", workerSettings.Namespace, heartbeatKey), p.String()).Err() + if err != nil { + return err + } + return nil } diff --git a/worker.go b/worker.go index c8db26f..1ea4c6a 100644 --- a/worker.go +++ b/worker.go @@ -4,14 +4,25 @@ import ( "encoding/json" "errors" "fmt" + "strconv" + "strings" "sync" "time" "github.com/go-redis/redis/v7" ) +const ( + heartbeatInterval = time.Minute + heartbeatKey = "workers:heartbeat" + keyForWorkersPruning = "pruning_dead_workers_in_progress" + pruneInterval = heartbeatInterval * 5 +) + type worker struct { process + + heartbeatTicker *time.Ticker } func newWorker(id string, queues []string) (*worker, error) { @@ -20,7 +31,8 @@ func newWorker(id string, queues []string) (*worker, error) { return nil, err } return &worker{ - process: *process, + process: *process, + heartbeatTicker: time.NewTicker(heartbeatInterval), }, 
nil } @@ -50,6 +62,104 @@ func (w *worker) start(c *redis.Client, job *Job) error { return w.process.start(c) } +func (w *worker) startHeartbeat(c *redis.Client) { + go func() { + for { + select { + case <-w.heartbeatTicker.C: + err := c.HSet(fmt.Sprintf("%s%s", workerSettings.Namespace, heartbeatKey), w.process.String(), time.Now().Format(time.RFC3339)).Err() + if err != nil { + logger.Criticalf("Error on setting hearbeat: %v", err) + return + } + } + } + }() +} + +func (w *worker) pruneDeadWorkers(c *redis.Client) { + // Block with set+nx+ex + ok, err := c.SetNX(fmt.Sprintf("%s%s", workerSettings.Namespace, keyForWorkersPruning), w.String(), heartbeatInterval).Result() + if err != nil { + logger.Criticalf("Error on setting lock to prune workers: %v", err) + return + } + + if !ok { + return + } + // Get all workers + workers, err := c.SMembers(fmt.Sprintf("%sworkers", workerSettings.Namespace)).Result() + if err != nil { + logger.Criticalf("Error on getting list of all workers: %v", err) + return + } + + // Get all workers that have sent a heartbeat and now is expired + heartbeatWorkers, err := c.HGetAll(fmt.Sprintf("%s%s", workerSettings.Namespace, heartbeatKey)).Result() + if err != nil { + logger.Criticalf("Error on getting list of all workers with heartbeat: %v", err) + return + } + + hearbeatExpiredWorkers := make(map[string]struct{}) + for k, v := range heartbeatWorkers { + if v == "" { + continue + } + + t, err := time.Parse(time.RFC3339, v) + if err != nil { + logger.Criticalf("Error on parsing the time of %q: %v", v, err) + return + } + + if time.Since(t) > pruneInterval { + hearbeatExpiredWorkers[k] = struct{}{} + } + } + + // If a worker is on the expired list kill it + for _, w := range workers { + if _, ok := hearbeatExpiredWorkers[w]; ok { + logger.Infof("Pruning dead worker %q", w) + + parts := strings.Split(w, ":") + pidAndID := strings.Split(parts[1], "-") + pid, _ := strconv.Atoi(pidAndID[0]) + wp := process{ + Hostname: parts[0], + Pid: 
int(pid), + ID: pidAndID[1], + Queues: strings.Split(parts[2], ","), + } + + bwork, err := c.Get(fmt.Sprintf("%sworker:%s", workerSettings.Namespace, wp.String())).Bytes() + if err != nil { + logger.Criticalf("Error on getting worker work for pruning: %v", err) + return + } + if bwork != nil { + var work = work{} + err = json.Unmarshal(bwork, &work) + if err != nil { + logger.Criticalf("Error unmarshaling worker job: %v", err) + return + } + + // If it has a job flag it as failed + wk := worker{process: wp} + wk.fail(c, &Job{ + Queue: work.Queue, + Payload: work.Payload, + }, fmt.Errorf("Worker %s did not gracefully exit while processing %s", wk.process.String(), work.Payload.Class)) + } + + wp.close(c) + } + } +} + func (w *worker) fail(c *redis.Client, job *Job, err error) error { failure := &failure{ FailedAt: time.Now(), @@ -108,6 +218,11 @@ func (w *worker) work(jobs <-chan *Job, monitor *sync.WaitGroup) { return } + w.startHeartbeat(client) + defer w.heartbeatTicker.Stop() + + w.pruneDeadWorkers(client) + monitor.Add(1) go func() { From 1e9cce2195a28de8662fd96f30699b416d58a7db Mon Sep 17 00:00:00 2001 From: xescugc Date: Tue, 8 Jun 2021 16:47:45 +0200 Subject: [PATCH 06/16] CHANGELOG: Bump version 0.1.5 --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 34bcada..af6bd81 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,7 @@ ## [Unreleased] +## [0.1.5] _2021-06-08_ + ### Added - Added heartbeat and prune functions to clean stuck workers From f85b442a1213413760f2f5124d9e6a93e2707664 Mon Sep 17 00:00:00 2001 From: xescugc Date: Tue, 8 Jun 2021 17:05:08 +0200 Subject: [PATCH 07/16] CHANGELOG: Quick release to change the module definition Otherwise it cannot be imported from the outseide --- CHANGELOG.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index af6bd81..9f75f44 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ ## [Unreleased] +## [0.1.6] 
_2021-06-08_ + +### Changed + +- Change the module definition from `github.com/benmanns/goworker` to `github.com/cycloidio/goworker` + ([PR #3](https://github.com/cycloidio/goworker/pull/3)) + ## [0.1.5] _2021-06-08_ ### Added From e149b39569c737e5e5774d0a87e831399c855ded Mon Sep 17 00:00:00 2001 From: xescugc Date: Wed, 9 Jun 2021 10:23:11 +0200 Subject: [PATCH 08/16] worker: Fix issue with go-redis syntax When redis returns nil they return an error with a specific logic to check it with --- CHANGELOG.md | 5 +++++ worker.go | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f75f44..723626e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ ## [Unreleased] +### Fixed + +- Solved error when trying to prune workers with `go-redis` + ([PR #5](https://github.com/cycloidio/goworker/pull/5)) + ## [0.1.6] _2021-06-08_ ### Changed diff --git a/worker.go b/worker.go index 1ea4c6a..462d88d 100644 --- a/worker.go +++ b/worker.go @@ -135,7 +135,7 @@ func (w *worker) pruneDeadWorkers(c *redis.Client) { } bwork, err := c.Get(fmt.Sprintf("%sworker:%s", workerSettings.Namespace, wp.String())).Bytes() - if err != nil { + if err != nil && err != redis.Nil { logger.Criticalf("Error on getting worker work for pruning: %v", err) return } From f5808fc47947f77e9963abc08ec14cab0794a883 Mon Sep 17 00:00:00 2001 From: xescugc Date: Wed, 9 Jun 2021 10:26:49 +0200 Subject: [PATCH 09/16] CHANGELOG: Bump release --- CHANGELOG.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 723626e..8cb2e16 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,9 +1,11 @@ ## [Unreleased] +## [0.1.7] _2021-06-09_ + ### Fixed - Solved error when trying to prune workers with `go-redis` - ([PR #5](https://github.com/cycloidio/goworker/pull/5)) + ([PR #4](https://github.com/cycloidio/goworker/pull/4)) ## [0.1.6] _2021-06-08_ From 41602e29c205fab93732b03d17212ba248088b17 Mon Sep 17 00:00:00 2001 From:
xescugc Date: Fri, 11 Jun 2021 12:52:31 +0200 Subject: [PATCH 10/16] goworker: Added a function 'Closed()' that will return when the process fully closed It's useful when you want to exactly know when the worker has fully stopped and cleaned. This can be useful in cases in which you have to block something until the worker is closed for example if the worker is in goroutines and the main process is killed you could end up with workers not being cleaned so using this function would avoid this. --- CHANGELOG.md | 5 +++++ goworker.go | 12 ++++++++++++ 2 files changed, 17 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8cb2e16..0aed525 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ ## [Unreleased] +### Added + +- Closed function to be able to wait for the workers to fully finish + ([PR #6](https://github.com/cycloidio/goworker/issues/6)) + ## [0.1.7] _2021-06-09_ ### Fixed diff --git a/goworker.go b/goworker.go index 835cc78..4b1ca64 100644 --- a/goworker.go +++ b/goworker.go @@ -41,6 +41,8 @@ type WorkerSettings struct { UseNumber bool SkipTLSVerify bool TLSCertPath string + + closed chan struct{} } func SetSettings(settings WorkerSettings) { @@ -88,6 +90,8 @@ func Init() error { return err } + workerSettings.closed = make(chan struct{}) + initialized = true } @@ -123,11 +127,19 @@ func Close() error { return err } initialized = false + close(workerSettings.closed) } return nil } +// Closed will return a channel that will be +// closed once the full process is done closing +// and cleaning all the workers +func Closed() <-chan struct{} { + return workerSettings.closed +} + // Work starts the goworker process. Check for errors in // the return value.
Work will take over the Go executable // and will run until a QUIT, INT, or TERM signal is From e2d25d610943587812a8eebabc8fec2360805f01 Mon Sep 17 00:00:00 2001 From: xescugc Date: Fri, 11 Jun 2021 16:22:23 +0200 Subject: [PATCH 11/16] CHANGELOG: Bumped to 0.1.8 --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0aed525..0531416 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,7 @@ ## [Unreleased] +## [0.1.8] _2021-06-11_ + ### Added - Closed function to be able to wait for the workers to fully finish From fbbb9c4f920b9205f24abfb9ca7b35211fc1a79a Mon Sep 17 00:00:00 2001 From: xescugc Date: Thu, 1 Jul 2021 13:09:10 +0200 Subject: [PATCH 12/16] goworker: Added 'MaxAgeRetries' option to the Goworker This option is useful to automatically remove retried failed jobs from the 'failed' queue that exceede that duration, this check will be done every 1m. Also changed the 'failed.FailedAt' and added 'failed.RetriedAt' and switched them to type string. The main reason is that the Ruby lib is setting those values in an specific format and Ruby can read multiple formats into one, but GO cannot and we need to actually use the same ones or the unmarshaler does not work so I decided to switch them to 'string' and add helpers to set/get the values that will directly convert them. 
All the logic has more or less been ported from the Ruby version, on how to remove failed jobs and how the data is stored, as the 'MaxAgeRetries' is something unique from this GO version --- CHANGELOG.md | 5 ++++ failure.go | 39 ++++++++++++++++++++------ flags.go | 9 ++++++ goworker.go | 78 ++++++++++++++++++++++++++++++++++++++++++++++++++++ worker.go | 20 +++++++++++++- 5 files changed, 141 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0531416..85e8572 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ ## [Unreleased] +### Added + +- Flag `-max-age-retries` to remove retried failed jobs after that duration + ([PR #9](https://github.com/cycloidio/goworker/pull/9)) + ## [0.1.8] _2021-06-11_ ### Added diff --git a/failure.go b/failure.go index c6349f5..df97ab0 100644 --- a/failure.go +++ b/failure.go @@ -1,15 +1,36 @@ package goworker -import ( - "time" +import "time" + +const ( + retriedAtLayout = "2006/01/02 15:04:05" + failedAtLayout = "2006/01/02 15:04:05 -07:00" ) type failure struct { - FailedAt time.Time `json:"failed_at"` - Payload Payload `json:"payload"` - Exception string `json:"exception"` - Error string `json:"error"` - Backtrace []string `json:"backtrace"` - Worker *worker `json:"worker"` - Queue string `json:"queue"` + FailedAt string `json:"failed_at"` + Payload Payload `json:"payload"` + Exception string `json:"exception"` + Error string `json:"error"` + Backtrace []string `json:"backtrace"` + Worker *worker `json:"worker"` + Queue string `json:"queue"` + RetriedAt string `json:"retried_at"` +} + +// GetRetriedAtTime returns the RetriedAt as a time.Time +// converting it from the string. 
If it's not set it'll return +// an empty time.Time +func (f *failure) GetRetriedAtTime() (time.Time, error) { + if f.RetriedAt == "" { + return time.Time{}, nil + } + + return time.Parse(retriedAtLayout, f.RetriedAt) +} + +// SetFailedAt will set the FailedAt value with t with +// the right format +func (f *failure) SetFailedAt(t time.Time) { + f.FailedAt = t.Format(failedAtLayout) } diff --git a/flags.go b/flags.go index ce82412..9cbef62 100644 --- a/flags.go +++ b/flags.go @@ -77,6 +77,13 @@ // encoded in scientific notation, losing // pecision. This will default to true soon. // +// -max-age-retries=1s +// — This flag will enable a feature to automatically +// clean the retried failed jobs older than the +// specified max age/duration (time.Duration). +// By default is disabled if enabled it'll +// check every 1m for old retries. +// // You can also configure your own flags for use // within your workers. Be sure to set them // before calling goworker.Main(). It is okay to @@ -128,6 +135,8 @@ func init() { flag.BoolVar(&workerSettings.UseNumber, "use-number", false, "use json.Number instead of float64 when decoding numbers in JSON. 
will default to true soon") flag.BoolVar(&workerSettings.SkipTLSVerify, "insecure-tls", false, "skip TLS validation") + + flag.DurationVar(&workerSettings.MaxAgeRetries, "max-age-retries", 0, "max age of the retried failed jobs before cleaning them") } func flags() error { diff --git a/goworker.go b/goworker.go index 4b1ca64..d02db18 100644 --- a/goworker.go +++ b/goworker.go @@ -3,6 +3,7 @@ package goworker import ( "crypto/tls" "crypto/x509" + "encoding/json" "fmt" "io/ioutil" "os" @@ -25,6 +26,14 @@ var ( initialized bool ) +const ( + keyForCleaningExpiredRetries = "cleaning_expired_retried_in_progress" +) + +var ( + cleaningExpiredRetriesInterval = time.Minute +) + var workerSettings WorkerSettings type WorkerSettings struct { @@ -41,6 +50,7 @@ type WorkerSettings struct { UseNumber bool SkipTLSVerify bool TLSCertPath string + MaxAgeRetries time.Duration closed chan struct{} } @@ -173,7 +183,75 @@ func Work() error { worker.work(jobs, &monitor) } + if hasToCleanRetries() { + cleanExpiredRetryTicker := time.NewTicker(cleaningExpiredRetriesInterval) + waitChan := make(chan struct{}) + go func() { + monitor.Wait() + close(waitChan) + }() + for { + select { + case <-cleanExpiredRetryTicker.C: + cleanExpiredRetries() + case <-waitChan: + cleanExpiredRetryTicker.Stop() + return nil + } + } + } + monitor.Wait() return nil } + +func hasToCleanRetries() bool { + return workerSettings.MaxAgeRetries != 0 +} + +func cleanExpiredRetries() { + // This is used to set a lock so this operation is not done by more than 1 worker at the same time + ok, err := client.SetNX(fmt.Sprintf("%s%s", workerSettings.Namespace, keyForCleaningExpiredRetries), os.Getpid(), cleaningExpiredRetriesInterval/2).Result() + if err != nil { + logger.Criticalf("Error on setting lock to clean retries: %v", err) + return + } + + if !ok { + return + } + + failures, err := client.LRange(fmt.Sprintf("%sfailed", workerSettings.Namespace), 0, -1).Result() + if err != nil { + logger.Criticalf("Error on getting 
list of all failed jobs: %v", err) + return + } + + for i, fail := range failures { + var f failure + err = json.Unmarshal([]byte(fail), &f) + if err != nil { + logger.Criticalf("Error on unmarshaling failure: %v", err) + return + } + ra, err := f.GetRetriedAtTime() + if err != nil { + logger.Criticalf("Error on GetRetriedAtTime of failure job %q: %v", fail, err) + return + } + if ra == *new(time.Time) { + continue + } + + // If the RetryAt has exceeded the MaxAgeRetries then we'll + // remove the job from the list of failed jobs + if ra.Add(workerSettings.MaxAgeRetries).Before(time.Now()) { + hopefullyUniqueValueWeCanUseToDeleteJob := "" + // This logic what it does it replace first the value (with the LSet) and then remove the first + // occurrence on the failed queue of the replaced value. This value is the 'hopefullyUniqueValueWeCanUseToDeleteJob' + client.LSet(fmt.Sprintf("%sfailed", workerSettings.Namespace), int64(i), hopefullyUniqueValueWeCanUseToDeleteJob) + client.LRem(fmt.Sprintf("%sfailed", workerSettings.Namespace), 1, hopefullyUniqueValueWeCanUseToDeleteJob) + } + } +} diff --git a/worker.go b/worker.go index 462d88d..ee106b6 100644 --- a/worker.go +++ b/worker.go @@ -36,10 +36,28 @@ func newWorker(id string, queues []string) (*worker, error) { }, nil } +// MarshalJSON marshals the worker into a []byte func (w *worker) MarshalJSON() ([]byte, error) { return json.Marshal(w.String()) } +// UnmarshalJSON converts the b into a woker +func (w *worker) UnmarshalJSON(b []byte) error { + s := string(b) + parts := strings.Split(s, ":") + pidAndID := strings.Split(parts[1], "-") + pid, _ := strconv.Atoi(pidAndID[0]) + w = &worker{ + process: process{ + Hostname: parts[0], + Pid: int(pid), + ID: pidAndID[1], + Queues: strings.Split(parts[2], ","), + }, + } + return nil +} + func (w *worker) start(c *redis.Client, job *Job) error { work := &work{ Queue: job.Queue, @@ -162,13 +180,13 @@ func (w *worker) pruneDeadWorkers(c *redis.Client) { func (w *worker) fail(c 
*redis.Client, job *Job, err error) error { failure := &failure{ - FailedAt: time.Now(), Payload: job.Payload, Exception: "Error", Error: err.Error(), Worker: w, Queue: job.Queue, } + failure.SetFailedAt(time.Now()) buffer, err := json.Marshal(failure) if err != nil { return err From 48ca555b6a935eed5ef714ecf9c486b3f2efc1b0 Mon Sep 17 00:00:00 2001 From: xescugc Date: Wed, 30 Jun 2021 16:38:00 +0200 Subject: [PATCH 13/16] worker: New option ForcePrune to clean workers not on the heartbeat This will remove workers thare could be stuck but it's not backwards compatible if enabled --- CHANGELOG.md | 4 +++- flags.go | 9 +++++++++ goworker.go | 11 +++++++++++ process.go | 1 + worker.go | 10 +++++++--- 5 files changed, 31 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 85e8572..9baea2e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,13 +4,15 @@ - Flag `-max-age-retries` to remove retried failed jobs after that duration ([PR #9](https://github.com/cycloidio/goworker/pull/9)) +- New flag `-force-prune` to remove workers not on the heartbeat list + ([PR #8](https://github.com/cycloidio/goworker/pull/8)) ## [0.1.8] _2021-06-11_ ### Added - Closed function to be able to wait for the workers to fully finish - ([PR #6](https://github.com/cycloidio/goworker/issues/6)) + ([PR #6](https://github.com/cycloidio/goworker/pull/6)) ## [0.1.7] _2021-06-09_ diff --git a/flags.go b/flags.go index 9cbef62..2cba552 100644 --- a/flags.go +++ b/flags.go @@ -83,6 +83,13 @@ // specified max age/duration (time.Duration). // By default is disabled if enabled it'll // check every 1m for old retries. +// -force-prune=false +// — Will prune all workers that are not inside +// of the heartbeat set, not just the expired ones. +// This option is not compatible with older +// versions of Resque (any port) as older versions +// may not have heartbeat so this would delete +// real working workers. // // You can also configure your own flags for use // within your workers. 
Be sure to set them @@ -137,6 +144,8 @@ func init() { flag.BoolVar(&workerSettings.SkipTLSVerify, "insecure-tls", false, "skip TLS validation") flag.DurationVar(&workerSettings.MaxAgeRetries, "max-age-retries", 0, "max age of the retried failed jobs before cleaning them") + + flag.BoolVar(&workerSettings.ForcePrune, "force-prune", false, "Forced the deletion of workers that are not present on the heartbeat set. WARNING: This is not compatible with older versions of Resque (any port) that do not have heartbeat as it'll then delete working workers.") } func flags() error { diff --git a/goworker.go b/goworker.go index d02db18..948ef87 100644 --- a/goworker.go +++ b/goworker.go @@ -51,6 +51,7 @@ type WorkerSettings struct { SkipTLSVerify bool TLSCertPath string MaxAgeRetries time.Duration + ForcePrune bool closed chan struct{} } @@ -174,15 +175,25 @@ func Work() error { } var monitor sync.WaitGroup + var wk *worker for id := 0; id < workerSettings.Concurrency; id++ { worker, err := newWorker(strconv.Itoa(id), workerSettings.Queues) if err != nil { return err } + if wk != nil { + wk = worker + } worker.work(jobs, &monitor) } + // Once all the workers have started we prune the dead ones + // this way we prevent from pruning workers that have just + // started and not registered to the Heartbeat in case + // of ForcePrune is enabled. 
+ wk.pruneDeadWorkers(client) + if hasToCleanRetries() { cleanExpiredRetryTicker := time.NewTicker(cleaningExpiredRetriesInterval) waitChan := make(chan struct{}) diff --git a/process.go b/process.go index 9c999f9..31323c4 100644 --- a/process.go +++ b/process.go @@ -51,6 +51,7 @@ func (p *process) open(c *redis.Client) error { return err } + // We set the heartbeat as the first thing err = c.HSet(fmt.Sprintf("%s%s", workerSettings.Namespace, heartbeatKey), p.String(), time.Now().Format(time.RFC3339)).Err() if err != nil { return err diff --git a/worker.go b/worker.go index ee106b6..d86d8a3 100644 --- a/worker.go +++ b/worker.go @@ -139,7 +139,13 @@ func (w *worker) pruneDeadWorkers(c *redis.Client) { // If a worker is on the expired list kill it for _, w := range workers { - if _, ok := hearbeatExpiredWorkers[w]; ok { + _, hbeok := hearbeatExpiredWorkers[w] + _, hbok := heartbeatWorkers[w] + // We want to prune workers that: + // * Are expired + // * Are not on the heartbeat set and ForcePrune is set + // If they are neither of those then we do not want to expire them + if hbeok || (!hbok && workerSettings.ForcePrune) { logger.Infof("Pruning dead worker %q", w) parts := strings.Split(w, ":") @@ -239,8 +245,6 @@ func (w *worker) work(jobs <-chan *Job, monitor *sync.WaitGroup) { w.startHeartbeat(client) defer w.heartbeatTicker.Stop() - w.pruneDeadWorkers(client) - monitor.Add(1) go func() { From 5a95af9ae4c95a2be1d8803161f0080453195f44 Mon Sep 17 00:00:00 2001 From: xescugc Date: Fri, 30 Jul 2021 12:07:26 +0200 Subject: [PATCH 14/16] CHANGELOG: Bump version --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9baea2e..077ea04 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,7 @@ ## [Unreleased] +## [0.1.9] _2021-07-30_ + ### Added - Flag `-max-age-retries` to remove retried failed jobs after that duration From afc4ad74d48e4bbdc2a16636b444edc3ba33c04b Mon Sep 17 00:00:00 2001 From: xescugc Date: Thu, 26 Aug 
2021 16:04:56 +0200 Subject: [PATCH 15/16] goworker: Fixed null pointer exception when running prune For some reason with previous tests this was not found but now it's fixed --- CHANGELOG.md | 7 +++++++ goworker.go | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 077ea04..b376bba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ ## [Unreleased] +## [0.1.10] _2021-08-26_ + +### Fixed + +- Nil pointer fixed when executing the workers + ([PR #11](https://github.com/cycloidio/goworker/pull/11)) + ## [0.1.9] _2021-07-30_ ### Added diff --git a/goworker.go b/goworker.go index 948ef87..be2ef02 100644 --- a/goworker.go +++ b/goworker.go @@ -182,7 +182,7 @@ func Work() error { if err != nil { return err } - if wk != nil { + if wk == nil { wk = worker } worker.work(jobs, &monitor) From c24f222e0da5559ad6c9a7fde3c40bc8d5d00a4c Mon Sep 17 00:00:00 2001 From: Dmitry Shalashov Date: Tue, 26 Jul 2022 19:49:37 +0300 Subject: [PATCH 16/16] chore: go mod tidy --- go.sum | 5 ----- 1 file changed, 5 deletions(-) diff --git a/go.sum b/go.sum index 3faafc2..c2d4190 100644 --- a/go.sum +++ b/go.sum @@ -1,18 +1,14 @@ github.com/cihub/seelog v0.0.0-20170130134532-f561c5e57575 h1:kHaBemcxl8o/pQ5VM1c8PVE1PubbNx3mjUr09OqWGCs= github.com/cihub/seelog v0.0.0-20170130134532-f561c5e57575/go.mod h1:9d6lWj8KzO/fd/NrVaLscBKmPigpZpn5YawRPw+e3Yo= -github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/go-redis/redis/v7 v7.4.0 h1:7obg6wUoj05T0EpY0o8B59S9w5yeMWql7sw2kwNW1x4= github.com/go-redis/redis/v7 v7.4.0/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= github.com/golang/protobuf v1.3.2/go.mod 
h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= -github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.10.1 h1:q/mM8GF/n0shIN8SaAZ0V+jnLPzen6WIVZdiwrRlMlo= @@ -39,7 +35,6 @@ golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=