diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..b376bba --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,54 @@ +## [Unreleased] + +## [0.1.10] _2021-08-26_ + +### Fixed + +- Fixed a nil pointer when executing the workers + ([PR #11](https://github.com/cycloidio/goworker/pull/11)) + +## [0.1.9] _2021-07-30_ + +### Added + +- Flag `-max-age-retries` to remove retried failed jobs after that duration + ([PR #9](https://github.com/cycloidio/goworker/pull/9)) +- New flag `-force-prune` to remove workers not on the heartbeat list + ([PR #8](https://github.com/cycloidio/goworker/pull/8)) + +## [0.1.8] _2021-06-11_ + +### Added + +- `Closed` function to be able to wait for the workers to fully finish + ([PR #6](https://github.com/cycloidio/goworker/pull/6)) + +## [0.1.7] _2021-06-09_ + +### Fixed + +- Solved error when trying to prune workers with `go-redis` + ([PR #4](https://github.com/cycloidio/goworker/pull/4)) + +## [0.1.6] _2021-06-08_ + +### Changed + +- Changed the module definition from `github.com/benmanns/goworker` to `github.com/cycloidio/goworker` + ([PR #3](https://github.com/cycloidio/goworker/pull/3)) + +## [0.1.5] _2021-06-08_ + +### Added + +- Added heartbeat and prune functions to clean stuck workers + ([Issue benmanns/goworker#65](https://github.com/benmanns/goworker/issues/65)) + +### Changed + +- Moved from `redigo` to `go-redis` + ([Issue benmanns/goworker#69](https://github.com/benmanns/goworker/issues/69)) + +## [0.1.4] _2021-06-07_ + +Fork from https://github.com/benmanns/goworker, with master being https://github.com/benmanns/goworker/commit/d28a4f34a4d183f3ea2e51b4b8268807e0984942 diff --git a/failure.go b/failure.go index c6349f5..df97ab0 100644 --- a/failure.go +++ b/failure.go @@ -1,15 +1,36 @@ package goworker -import ( - "time" +import "time" + +const ( + retriedAtLayout = "2006/01/02 15:04:05" + failedAtLayout = "2006/01/02 15:04:05 -07:00" ) type failure struct { - FailedAt time.Time `json:"failed_at"` - Payload Payload `json:"payload"` - Exception string `json:"exception"` - Error string `json:"error"` - Backtrace []string `json:"backtrace"` - Worker *worker `json:"worker"` - Queue string `json:"queue"` + FailedAt string `json:"failed_at"` + Payload Payload `json:"payload"` + Exception string `json:"exception"` + Error string `json:"error"` + Backtrace []string `json:"backtrace"` + Worker *worker `json:"worker"` + Queue string `json:"queue"` + RetriedAt string `json:"retried_at"` +} + +// GetRetriedAtTime returns the RetriedAt as a time.Time, +// parsing it from the string. If it's not set, it returns +// an empty time.Time +func (f *failure) GetRetriedAtTime() (time.Time, error) { + if f.RetriedAt == "" { + return time.Time{}, nil + } + + return time.Parse(retriedAtLayout, f.RetriedAt) +} + +// SetFailedAt sets the FailedAt value to t, formatted +// with the right layout +func (f *failure) SetFailedAt(t time.Time) { + f.FailedAt = t.Format(failedAtLayout) } diff --git a/flags.go b/flags.go index ce82412..2cba552 100644 --- a/flags.go +++ b/flags.go @@ -77,6 +77,20 @@ // encoded in scientific notation, losing // pecision. This will default to true soon. // +// -max-age-retries=1s +// — Enables a feature that automatically cleans +// the retried failed jobs older than the +// specified max age/duration (time.Duration). +// It is disabled by default; when enabled it +// checks every 1m for old retries. +// -force-prune=false +// — Prunes all workers that are not in the +// heartbeat set, not just the expired ones.
+// This option is not compatible with older +// versions of Resque (any port), as older versions +// may not have a heartbeat, so this would delete +// workers that are actually working. +// +// You can also configure your own flags for use // within your workers. Be sure to set them // before calling goworker.Main(). It is okay to @@ -128,6 +142,10 @@ func init() { flag.BoolVar(&workerSettings.UseNumber, "use-number", false, "use json.Number instead of float64 when decoding numbers in JSON. will default to true soon") flag.BoolVar(&workerSettings.SkipTLSVerify, "insecure-tls", false, "skip TLS validation") + + flag.DurationVar(&workerSettings.MaxAgeRetries, "max-age-retries", 0, "max age of the retried failed jobs before cleaning them") + + flag.BoolVar(&workerSettings.ForcePrune, "force-prune", false, "Force the deletion of workers that are not present on the heartbeat set. WARNING: This is not compatible with older versions of Resque (any port) that do not have heartbeat, as it'll then delete working workers.") } func flags() error { diff --git a/go.mod b/go.mod index 00a08f7..7d6b57c 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,7 @@ module github.com/skaurus/goworker go 1.18 require ( - github.com/cihub/seelog v0.0.0-20140730094913-72ae425987bc - github.com/gomodule/redigo v1.8.2 - golang.org/x/net v0.0.0-20200822124328-c89045814202 - vitess.io/vitess v3.0.0-rc.3.0.20181212200900-e2c5239f54d1+incompatible + github.com/cihub/seelog v0.0.0-20170130134532-f561c5e57575 + github.com/go-redis/redis/v7 v7.4.0 + golang.org/x/net v0.0.0-20201110031124-69a78807bb2b ) diff --git a/go.sum b/go.sum index da88c49..c2d4190 100644 --- a/go.sum +++ b/go.sum @@ -1,25 +1,45 @@ -github.com/cihub/seelog v0.0.0-20140730094913-72ae425987bc h1:HSZdsOzV0MO6cEcf31hZoT6KJGI806Z523bkYPDwkQs= -github.com/cihub/seelog v0.0.0-20140730094913-72ae425987bc/go.mod h1:9d6lWj8KzO/fd/NrVaLscBKmPigpZpn5YawRPw+e3Yo= -github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/gomodule/redigo v1.8.2 h1:H5XSIre1MB5NbPYFp+i1NBbb5qN1W8Y8YAQoAYbkm8k= -github.com/gomodule/redigo v1.8.2/go.mod h1:P9dn9mFrCBvWhGE1wpxx6fgq7BAeLBk+UUUzlpkBYO0= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= -github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/cihub/seelog v0.0.0-20170130134532-f561c5e57575 h1:kHaBemcxl8o/pQ5VM1c8PVE1PubbNx3mjUr09OqWGCs= +github.com/cihub/seelog v0.0.0-20170130134532-f561c5e57575/go.mod h1:9d6lWj8KzO/fd/NrVaLscBKmPigpZpn5YawRPw+e3Yo= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/go-redis/redis/v7 v7.4.0 h1:7obg6wUoj05T0EpY0o8B59S9w5yeMWql7sw2kwNW1x4= +github.com/go-redis/redis/v7 v7.4.0/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= +github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/kr/pretty
v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.10.1 h1:q/mM8GF/n0shIN8SaAZ0V+jnLPzen6WIVZdiwrRlMlo= +github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME= +github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20200822124328-c89045814202 h1:VvcQYSHwXgi7W+TpUR6A9g6Up98WAHf3f/ulnJ62IyA= -golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201110031124-69a78807bb2b h1:uwuIcX0g4Yl1NC5XAz37xsr2lTtcqevgzYNVt49waME= +golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191010194322-b09406accb47/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= -gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -vitess.io/vitess v3.0.0-rc.3.0.20181212200900-e2c5239f54d1+incompatible h1:TCG4ZGCiFNr7XGP8nhT++5Wwi1jRC6Xk9IPxZiBQXB0= -vitess.io/vitess v3.0.0-rc.3.0.20181212200900-e2c5239f54d1+incompatible/go.mod h1:h4qvkyNYTOC0xI+vcidSWoka0gQAZc9ZPHbkHo48gP0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/fsnotify.v1 v1.4.7 
h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= +gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I= +gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/goworker.go b/goworker.go index cf2faad..be2ef02 100644 --- a/goworker.go +++ b/goworker.go @@ -1,25 +1,39 @@ package goworker import ( + "crypto/tls" + "crypto/x509" + "encoding/json" + "fmt" + "io/ioutil" "os" "strconv" "sync" "time" + "github.com/go-redis/redis/v7" + "golang.org/x/net/context" "github.com/cihub/seelog" - "vitess.io/vitess/go/pools" ) var ( logger seelog.LoggerInterface - pool *pools.ResourcePool + client *redis.Client ctx context.Context initMutex sync.Mutex initialized bool ) +const ( + keyForCleaningExpiredRetries = "cleaning_expired_retried_in_progress" +) + +var ( + cleaningExpiredRetriesInterval = time.Minute +) + var workerSettings WorkerSettings type WorkerSettings struct { @@ -36,6 +50,10 @@ type WorkerSettings struct { UseNumber bool SkipTLSVerify bool TLSCertPath string + MaxAgeRetries time.Duration + ForcePrune bool + + closed chan struct{} } func SetSettings(settings WorkerSettings) { @@ -61,54 +79,76 @@ func Init() error { } ctx = context.Background() - pool = newRedisPool(workerSettings.URI, workerSettings.Connections, workerSettings.Connections, time.Minute) + opts, err := redis.ParseURL(workerSettings.URI) + if err != nil { + return err + } + + if len(workerSettings.TLSCertPath) > 0 { + certPool, err := getCertPool() + if err != nil { + return err + } + opts.TLSConfig = &tls.Config{ + RootCAs: certPool, + InsecureSkipVerify: workerSettings.SkipTLSVerify, + } + } + + client = redis.NewClient(opts).WithContext(ctx) + err = client.Ping().Err() + if err != nil { + return err + } + + workerSettings.closed = make(chan struct{}) initialized = true } + return nil } -// GetConn returns a connection from the goworker Redis -// connection pool. When using the pool, check in -// connections as quickly as possible, because holding a -// connection will cause concurrent worker functions to lock -// while they wait for an available connection. Expect this -// API to change drastically. -func GetConn() (*RedisConn, error) { - resource, err := pool.Get(ctx) - +func getCertPool() (*x509.CertPool, error) { + rootCAs, _ := x509.SystemCertPool() + if rootCAs == nil { + rootCAs = x509.NewCertPool() + } + certs, err := ioutil.ReadFile(workerSettings.TLSCertPath) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to read %q for the RootCA pool: %v", workerSettings.TLSCertPath, err) } - return resource.(*RedisConn), nil -} - -// PutConn puts a connection back into the connection pool. -// Run this as soon as you finish using a connection that -// you got from GetConn. Expect this API to change -// drastically. -func PutConn(conn *RedisConn) { - pool.Put(conn) + if ok := rootCAs.AppendCertsFromPEM(certs); !ok { + return nil, fmt.Errorf("failed to append %q to the RootCA pool: %v", workerSettings.TLSCertPath, err) + } + return rootCAs, nil } // Close cleans up resources initialized by goworker. This // will be called by Work when cleaning up. 
However, if you // are using the Init function to access goworker functions // and configuration without processing jobs by calling -// Work, you should run this function when cleaning up. For -// example, -// -// if err := goworker.Init(); err != nil { -// fmt.Println("Error:", err) -// } -// defer goworker.Close() -func Close() { +// Work, you should run this function when cleaning up. +func Close() error { initMutex.Lock() defer initMutex.Unlock() if initialized { - pool.Close() + err := client.Close() + if err != nil { + return err + } initialized = false + close(workerSettings.closed) } + + return nil +} + +// Closed returns a channel that is closed once +// the full process is done closing and cleaning +// up all the workers +func Closed() <-chan struct{} { + return workerSettings.closed } // Work starts the goworker process. Check for errors in @@ -135,16 +175,94 @@ func Work() error { } var monitor sync.WaitGroup + var wk *worker for id := 0; id < workerSettings.Concurrency; id++ { worker, err := newWorker(strconv.Itoa(id), workerSettings.Queues) if err != nil { return err } + if wk == nil { + wk = worker + } worker.work(jobs, &monitor) } + // Once all the workers have started we prune the dead ones; + // this prevents pruning workers that have just started and + // have not yet registered to the heartbeat when ForcePrune + // is enabled. + wk.pruneDeadWorkers(client) + + if hasToCleanRetries() { + cleanExpiredRetryTicker := time.NewTicker(cleaningExpiredRetriesInterval) + waitChan := make(chan struct{}) + go func() { + monitor.Wait() + close(waitChan) + }() + for { + select { + case <-cleanExpiredRetryTicker.C: + cleanExpiredRetries() + case <-waitChan: + cleanExpiredRetryTicker.Stop() + return nil + } + } + } + monitor.Wait() return nil } + +func hasToCleanRetries() bool { + return workerSettings.MaxAgeRetries != 0 +} + +func cleanExpiredRetries() { + // This is used to set a lock so this operation is not done by more than 1 worker at the same time + ok, err := client.SetNX(fmt.Sprintf("%s%s", workerSettings.Namespace, keyForCleaningExpiredRetries), os.Getpid(), cleaningExpiredRetriesInterval/2).Result() + if err != nil { + logger.Criticalf("Error on setting lock to clean retries: %v", err) + return + } + + if !ok { + return + } + + failures, err := client.LRange(fmt.Sprintf("%sfailed", workerSettings.Namespace), 0, -1).Result() + if err != nil { + logger.Criticalf("Error on getting list of all failed jobs: %v", err) + return + } + + for i, fail := range failures { + var f failure + err = json.Unmarshal([]byte(fail), &f) + if err != nil { + logger.Criticalf("Error on unmarshaling failure: %v", err) + return + } + ra, err := f.GetRetriedAtTime() + if err != nil { + logger.Criticalf("Error on GetRetriedAtTime of failure job %q: %v", fail, err) + return + } + if ra == *new(time.Time) { + continue + } + + // If the RetriedAt has exceeded the MaxAgeRetries then we + // remove the job from the list of failed jobs + if ra.Add(workerSettings.MaxAgeRetries).Before(time.Now()) { + hopefullyUniqueValueWeCanUseToDeleteJob := "" + // This first replaces the value (with the LSet) and then removes the first + // occurrence of the replaced value from the failed queue.
This value is the 'hopefullyUniqueValueWeCanUseToDeleteJob' + client.LSet(fmt.Sprintf("%sfailed", workerSettings.Namespace), int64(i), hopefullyUniqueValueWeCanUseToDeleteJob) + client.LRem(fmt.Sprintf("%sfailed", workerSettings.Namespace), 1, hopefullyUniqueValueWeCanUseToDeleteJob) + } + } +} diff --git a/poller.go b/poller.go index 54ad696..0f6737f 100644 --- a/poller.go +++ b/poller.go @@ -5,6 +5,8 @@ import ( "encoding/json" "fmt" "time" + + "github.com/go-redis/redis/v7" ) type poller struct { @@ -23,20 +25,24 @@ func newPoller(queues []string, isStrict bool) (*poller, error) { }, nil } -func (p *poller) getJob(conn *RedisConn) (*Job, error) { +func (p *poller) getJob(c *redis.Client) (*Job, error) { for _, queue := range p.queues(p.isStrict) { logger.Debugf("Checking %s", queue) - reply, err := conn.Do("LPOP", fmt.Sprintf("%squeue:%s", workerSettings.Namespace, queue)) + result, err := c.LPop(fmt.Sprintf("%squeue:%s", workerSettings.Namespace, queue)).Result() if err != nil { + // no jobs for now, continue on another queue + if err == redis.Nil { + continue + } return nil, err } - if reply != nil { + if result != "" { logger.Debugf("Found job on %s", queue) job := &Job{Queue: queue} - decoder := json.NewDecoder(bytes.NewReader(reply.([]byte))) + decoder := json.NewDecoder(bytes.NewReader([]byte(result))) if workerSettings.UseNumber { decoder.UseNumber() } @@ -52,31 +58,29 @@ func (p *poller) getJob(conn *RedisConn) (*Job, error) { } func (p *poller) poll(interval time.Duration, quit <-chan bool) (<-chan *Job, error) { - jobs := make(chan *Job) + err := p.open(client) + if err != nil { + return nil, err + } - conn, err := GetConn() + err = p.start(client) if err != nil { - logger.Criticalf("Error on getting connection in poller %s: %v", p, err) - close(jobs) return nil, err - } else { - p.open(conn) - p.start(conn) - PutConn(conn) } + jobs := make(chan *Job) go func() { defer func() { close(jobs) - conn, err := GetConn() + err = p.finish(client) + if err != nil { + return + } + + err = p.close(client) if err != nil { - logger.Criticalf("Error on getting connection in poller %s: %v", p, err) return - } else { - p.finish(conn) - p.close(conn) - PutConn(conn) } }() @@ -85,22 +89,17 @@ func (p *poller) poll(interval time.Duration, quit <-chan bool) (<-chan *Job, er case <-quit: return default: - conn, err := GetConn() - if err != nil { - logger.Criticalf("Error on getting connection in poller %s: %v", p, err) - return - } - - job, err := p.getJob(conn) + job, err := p.getJob(client) if err != nil { - logger.Criticalf("Error on %v getting job from %v: %v", p, p.Queues, err) - PutConn(conn) + logger.Criticalf("Error on %v getting job from %v: %+v", p, p.Queues, err) return } if job != nil { - conn.Send("INCR", fmt.Sprintf("%sstat:processed:%v", workerSettings.Namespace, p)) - conn.Flush() - PutConn(conn) + err = client.Incr(fmt.Sprintf("%sstat:processed:%v", workerSettings.Namespace, p)).Err() + if err != nil { + return + } + select { case jobs <- job: case <-quit: @@ -109,19 +108,15 @@ func (p *poller) poll(interval time.Duration, quit <-chan bool) (<-chan *Job, er logger.Criticalf("Error requeueing %v: %v", job, err) return } - conn, err := GetConn() + + err = client.LPush(fmt.Sprintf("%squeue:%s", workerSettings.Namespace, job.Queue), buf).Err() if err != nil { - logger.Criticalf("Error on getting connection in poller %s: %v", p, err) return } - conn.Send("LPUSH", fmt.Sprintf("%squeue:%s", workerSettings.Namespace, job.Queue), buf) - conn.Flush() - PutConn(conn) return } } else { - 
PutConn(conn) if workerSettings.ExitOnComplete { return } diff --git a/process.go b/process.go index 82bcd9e..31323c4 100644 --- a/process.go +++ b/process.go @@ -6,6 +6,8 @@ import ( "os" "strings" "time" + + "github.com/go-redis/redis/v7" ) type process struct { @@ -33,44 +35,89 @@ func (p *process) String() string { return fmt.Sprintf("%s:%d-%s:%s", p.Hostname, p.Pid, p.ID, strings.Join(p.Queues, ",")) } -func (p *process) open(conn *RedisConn) error { - conn.Send("SADD", fmt.Sprintf("%sworkers", workerSettings.Namespace), p) - conn.Send("SET", fmt.Sprintf("%sstat:processed:%v", workerSettings.Namespace, p), "0") - conn.Send("SET", fmt.Sprintf("%sstat:failed:%v", workerSettings.Namespace, p), "0") - conn.Flush() +func (p *process) open(c *redis.Client) error { + err := c.SAdd(fmt.Sprintf("%sworkers", workerSettings.Namespace), p.String()).Err() + if err != nil { + return err + } + + err = c.Set(fmt.Sprintf("%sstat:processed:%v", workerSettings.Namespace, p), "0", 0).Err() + if err != nil { + return err + } + + err = c.Set(fmt.Sprintf("%sstat:failed:%v", workerSettings.Namespace, p), "0", 0).Err() + if err != nil { + return err + } + + // We set the heartbeat as the first thing + err = c.HSet(fmt.Sprintf("%s%s", workerSettings.Namespace, heartbeatKey), p.String(), time.Now().Format(time.RFC3339)).Err() + if err != nil { + return err + } return nil } -func (p *process) close(conn *RedisConn) error { +func (p *process) close(c *redis.Client) error { logger.Infof("%v shutdown", p) - conn.Send("SREM", fmt.Sprintf("%sworkers", workerSettings.Namespace), p) - conn.Send("DEL", fmt.Sprintf("%sstat:processed:%s", workerSettings.Namespace, p)) - conn.Send("DEL", fmt.Sprintf("%sstat:failed:%s", workerSettings.Namespace, p)) - conn.Flush() + err := c.SRem(fmt.Sprintf("%sworkers", workerSettings.Namespace), p.String()).Err() + if err != nil { + return err + } + + err = c.Del(fmt.Sprintf("%sstat:processed:%s", workerSettings.Namespace, p)).Err() + if err != nil { + return err + } + + err = c.Del(fmt.Sprintf("%sstat:failed:%s", workerSettings.Namespace, p)).Err() + if err != nil { + return err + } + + err = c.HDel(fmt.Sprintf("%s%s", workerSettings.Namespace, heartbeatKey), p.String()).Err() + if err != nil { + return err + } return nil } -func (p *process) start(conn *RedisConn) error { - conn.Send("SET", fmt.Sprintf("%sworker:%s:started", workerSettings.Namespace, p), time.Now().String()) - conn.Flush() +func (p *process) start(c *redis.Client) error { + err := c.Set(fmt.Sprintf("%sworker:%s:started", workerSettings.Namespace, p), time.Now().String(), 0).Err() + if err != nil { + return err + } return nil } -func (p *process) finish(conn *RedisConn) error { - conn.Send("DEL", fmt.Sprintf("%sworker:%s", workerSettings.Namespace, p)) - conn.Send("DEL", fmt.Sprintf("%sworker:%s:started", workerSettings.Namespace, p)) - conn.Flush() +func (p *process) finish(c *redis.Client) error { + err := c.Del(fmt.Sprintf("%sworker:%s", workerSettings.Namespace, p)).Err() + if err != nil { + return err + } + + err = c.Del(fmt.Sprintf("%sworker:%s:started", workerSettings.Namespace, p)).Err() + if err != nil { + return err + } return nil } -func (p *process) fail(conn *RedisConn) error { - conn.Send("INCR", fmt.Sprintf("%sstat:failed", workerSettings.Namespace)) - conn.Send("INCR", fmt.Sprintf("%sstat:failed:%s", workerSettings.Namespace, p)) - conn.Flush() +func (p *process) fail(c *redis.Client) error { + err := c.Incr(fmt.Sprintf("%sstat:failed", workerSettings.Namespace)).Err() + if err != nil { + return err + } + 
+ err = c.Incr(fmt.Sprintf("%sstat:failed:%s", workerSettings.Namespace, p)).Err() + if err != nil { + return err + } return nil } diff --git a/redis.go b/redis.go deleted file mode 100644 index 085993b..0000000 --- a/redis.go +++ /dev/null @@ -1,118 +0,0 @@ -package goworker - -import ( - "crypto/tls" - "crypto/x509" - "errors" - "fmt" - "io/ioutil" - "net/url" - "time" - - "github.com/gomodule/redigo/redis" - "vitess.io/vitess/go/pools" -) - -var ( - errorInvalidScheme = errors.New("invalid Redis database URI scheme") -) - -type RedisConn struct { - redis.Conn -} - -func (r *RedisConn) Close() { - _ = r.Conn.Close() -} - -func newRedisFactory(uri string) pools.Factory { - return func() (pools.Resource, error) { - return redisConnFromURI(uri) - } -} - -func newRedisPool(uri string, capacity int, maxCapacity int, idleTimout time.Duration) *pools.ResourcePool { - return pools.NewResourcePool(newRedisFactory(uri), capacity, maxCapacity, idleTimout) -} - -func redisConnFromURI(uriString string) (*RedisConn, error) { - uri, err := url.Parse(uriString) - if err != nil { - return nil, err - } - - var network string - var host string - var password string - var db string - var dialOptions []redis.DialOption - - switch uri.Scheme { - case "redis", "rediss": - network = "tcp" - host = uri.Host - if uri.User != nil { - password, _ = uri.User.Password() - } - if len(uri.Path) > 1 { - db = uri.Path[1:] - } - if uri.Scheme == "rediss" { - dialOptions = append(dialOptions, redis.DialUseTLS(true)) - dialOptions = append(dialOptions, redis.DialTLSSkipVerify(workerSettings.SkipTLSVerify)) - if len(workerSettings.TLSCertPath) > 0 { - pool, err := getCertPool(workerSettings.TLSCertPath) - if err != nil { - return nil, err - } - config := &tls.Config{ - RootCAs: pool, - } - dialOptions = append(dialOptions, redis.DialTLSConfig(config)) - } - } - case "unix": - network = "unix" - host = uri.Path - default: - return nil, errorInvalidScheme - } - - conn, err := redis.Dial(network, host, dialOptions...) 
- if err != nil { - return nil, err - } - - if password != "" { - _, err := conn.Do("AUTH", password) - if err != nil { - conn.Close() - return nil, err - } - } - - if db != "" { - _, err := conn.Do("SELECT", db) - if err != nil { - conn.Close() - return nil, err - } - } - - return &RedisConn{Conn: conn}, nil -} - -func getCertPool(certPath string) (*x509.CertPool, error) { - rootCAs, _ := x509.SystemCertPool() - if rootCAs == nil { - rootCAs = x509.NewCertPool() - } - certs, err := ioutil.ReadFile(workerSettings.TLSCertPath) - if err != nil { - return nil, fmt.Errorf("Failed to read %q for the RootCA pool: %v", workerSettings.TLSCertPath, err) - } - if ok := rootCAs.AppendCertsFromPEM(certs); !ok { - return nil, fmt.Errorf("Failed to append %q to the RootCA pool: %v", workerSettings.TLSCertPath, err) - } - return rootCAs, nil -} diff --git a/worker.go b/worker.go index cecd34f..d86d8a3 100644 --- a/worker.go +++ b/worker.go @@ -4,12 +4,25 @@ import ( "encoding/json" "errors" "fmt" + "strconv" + "strings" "sync" "time" + + "github.com/go-redis/redis/v7" +) + +const ( + heartbeatInterval = time.Minute + heartbeatKey = "workers:heartbeat" + keyForWorkersPruning = "pruning_dead_workers_in_progress" + pruneInterval = heartbeatInterval * 5 ) type worker struct { process + + heartbeatTicker *time.Ticker } func newWorker(id string, queues []string) (*worker, error) { @@ -18,15 +31,34 @@ func newWorker(id string, queues []string) (*worker, error) { return nil, err } return &worker{ - process: *process, + process: *process, + heartbeatTicker: time.NewTicker(heartbeatInterval), }, nil } +// MarshalJSON marshals the worker into a []byte func (w *worker) MarshalJSON() ([]byte, error) { return json.Marshal(w.String()) } -func (w *worker) start(conn *RedisConn, job *Job) error { +// UnmarshalJSON converts b into a worker +func (w *worker) UnmarshalJSON(b []byte) error { + s := string(b) + parts := strings.Split(s, ":") + pidAndID := strings.Split(parts[1], "-") + pid, _ := strconv.Atoi(pidAndID[0]) + w = &worker{ + process: process{ + Hostname: parts[0], + Pid: int(pid), + ID: pidAndID[1], + Queues: strings.Split(parts[2], ","), + }, + } + return nil +} + +func (w *worker) start(c *redis.Client, job *Job) error { work := &work{ Queue: job.Queue, RunAt: time.Now(), @@ -38,69 +70,191 @@ func (w *worker) start(conn *RedisConn, job *Job) error { return err } - conn.Send("SET", fmt.Sprintf("%sworker:%s", workerSettings.Namespace, w), buffer) + err = c.Set(fmt.Sprintf("%sworker:%s", workerSettings.Namespace, w), buffer, 0).Err() + if err != nil { + return err + } + logger.Debugf("Processing %s since %s [%v]", work.Queue, work.RunAt, work.Payload.Class) - return w.process.start(conn) + return w.process.start(c) +} + +func (w *worker) startHeartbeat(c *redis.Client) { + go func() { + for { + select { + case <-w.heartbeatTicker.C: + err := c.HSet(fmt.Sprintf("%s%s", workerSettings.Namespace, heartbeatKey), w.process.String(), time.Now().Format(time.RFC3339)).Err() + if err != nil { + logger.Criticalf("Error on setting heartbeat: %v", err) + return + } + } + } + }() } -func (w *worker) fail(conn *RedisConn, job *Job, err error) error { +func (w *worker) pruneDeadWorkers(c *redis.Client) { + // Block with set+nx+ex + ok, err := c.SetNX(fmt.Sprintf("%s%s", workerSettings.Namespace, keyForWorkersPruning), w.String(), heartbeatInterval).Result() + if err != nil { + logger.Criticalf("Error on setting lock to prune workers: %v", err) + return + } + + if !ok { + return + } + // Get all workers + workers, err :=
c.SMembers(fmt.Sprintf("%sworkers", workerSettings.Namespace)).Result() + if err != nil { + logger.Criticalf("Error on getting list of all workers: %v", err) + return + } + + // Get all workers that have sent a heartbeat so we can check which ones have expired + heartbeatWorkers, err := c.HGetAll(fmt.Sprintf("%s%s", workerSettings.Namespace, heartbeatKey)).Result() + if err != nil { + logger.Criticalf("Error on getting list of all workers with heartbeat: %v", err) + return + } + + heartbeatExpiredWorkers := make(map[string]struct{}) + for k, v := range heartbeatWorkers { + if v == "" { + continue + } + + t, err := time.Parse(time.RFC3339, v) + if err != nil { + logger.Criticalf("Error on parsing the time of %q: %v", v, err) + return + } + + if time.Since(t) > pruneInterval { + heartbeatExpiredWorkers[k] = struct{}{} + } + } + + // If a worker is on the expired list, kill it + for _, w := range workers { + _, hbeok := heartbeatExpiredWorkers[w] + _, hbok := heartbeatWorkers[w] + // We want to prune workers that: + // * Are expired + // * Are not on the heartbeat set and ForcePrune is set + // If it is neither of those then we do not want to prune it + if hbeok || (!hbok && workerSettings.ForcePrune) { + logger.Infof("Pruning dead worker %q", w) + + parts := strings.Split(w, ":") + pidAndID := strings.Split(parts[1], "-") + pid, _ := strconv.Atoi(pidAndID[0]) + wp := process{ + Hostname: parts[0], + Pid: int(pid), + ID: pidAndID[1], + Queues: strings.Split(parts[2], ","), + } + + bwork, err := c.Get(fmt.Sprintf("%sworker:%s", workerSettings.Namespace, wp.String())).Bytes() + if err != nil && err != redis.Nil { + logger.Criticalf("Error on getting worker work for pruning: %v", err) + return + } + if bwork != nil { + var work = work{} + err = json.Unmarshal(bwork, &work) + if err != nil { + logger.Criticalf("Error unmarshaling worker job: %v", err) + return + } + + // If it has a job, flag it as failed + wk := worker{process: wp} + wk.fail(c, &Job{ + Queue: work.Queue, + Payload: work.Payload, + }, fmt.Errorf("Worker %s did not gracefully exit while processing %s", wk.process.String(), work.Payload.Class)) + } + + wp.close(c) + } + } +} + +func (w *worker) fail(c *redis.Client, job *Job, err error) error { failure := &failure{ - FailedAt: time.Now(), Payload: job.Payload, Exception: "Error", Error: err.Error(), Worker: w, Queue: job.Queue, } + failure.SetFailedAt(time.Now()) buffer, err := json.Marshal(failure) if err != nil { return err } - conn.Send("RPUSH", fmt.Sprintf("%sfailed", workerSettings.Namespace), buffer) - return w.process.fail(conn) + err = c.RPush(fmt.Sprintf("%sfailed", workerSettings.Namespace), buffer).Err() + if err != nil { + return err + } + + return w.process.fail(c) } -func (w *worker) succeed(conn *RedisConn, job *Job) error { - conn.Send("INCR", fmt.Sprintf("%sstat:processed", workerSettings.Namespace)) - conn.Send("INCR", fmt.Sprintf("%sstat:processed:%s", workerSettings.Namespace, w)) +func (w *worker) succeed(c *redis.Client) error { + err := c.Incr(fmt.Sprintf("%sstat:processed", workerSettings.Namespace)).Err() + if err != nil { + return err + } + + err = c.Incr(fmt.Sprintf("%sstat:processed:%s", workerSettings.Namespace, w)).Err() + if err != nil { + return err + } return nil } -func (w *worker) finish(conn *RedisConn, job *Job, err error) error { +func (w *worker) finish(c *redis.Client, job *Job, err error) error { if err != nil { - w.fail(conn, job, err) + err = w.fail(c, job, err) + if err != nil { + return err + } } else { - w.succeed(conn, job) + err = w.succeed(c) + if err
!= nil { + return err + } } - return w.process.finish(conn) + return w.process.finish(c) } func (w *worker) work(jobs <-chan *Job, monitor *sync.WaitGroup) { - conn, err := GetConn() + err := w.open(client) if err != nil { - logger.Criticalf("Error on getting connection in worker %v: %v", w, err) + logger.Criticalf("Error on opening worker %v: %v", w, err) return - } else { - w.open(conn) - PutConn(conn) } + w.startHeartbeat(client) + defer w.heartbeatTicker.Stop() + monitor.Add(1) go func() { defer func() { defer monitor.Done() - conn, err := GetConn() + err := w.close(client) if err != nil { - logger.Criticalf("Error on getting connection in worker %v: %v", w, err) + logger.Criticalf("Error on closing worker %v: %v", w, err) return - } else { - w.close(conn) - PutConn(conn) } }() for job := range jobs { @@ -112,13 +266,10 @@ func (w *worker) work(jobs <-chan *Job, monitor *sync.WaitGroup) { errorLog := fmt.Sprintf("No worker for %s in queue %s with args %v", job.Payload.Class, job.Queue, job.Payload.Args) logger.Critical(errorLog) - conn, err := GetConn() + err := w.finish(client, job, errors.New(errorLog)) if err != nil { - logger.Criticalf("Error on getting connection in worker %v: %v", w, err) + logger.Criticalf("Error on finishing worker %v: %v", w, err) return - } else { - w.finish(conn, job, errors.New(errorLog)) - PutConn(conn) } } } @@ -127,29 +278,26 @@ func (w *worker) work(jobs <-chan *Job, monitor *sync.WaitGroup) { func (w *worker) run(job *Job, workerFunc workerFunc) { var err error + defer func() { - conn, errCon := GetConn() - if errCon != nil { - logger.Criticalf("Error on getting connection in worker on finish %v: %v", w, errCon) + errFinish := w.finish(client, job, err) + if errFinish != nil { + logger.Criticalf("Error on finishing worker %v: %v", w, errFinish) return - } else { - w.finish(conn, job, err) - PutConn(conn) } }() + defer func() { if r := recover(); r != nil { err = errors.New(fmt.Sprint(r)) } }() - conn, err := GetConn() - if err != nil { - logger.Criticalf("Error on getting connection in worker on start %v: %v", w, err) + errStart := w.start(client, job) + if errStart != nil { + logger.Criticalf("Error on starting worker %v: %v", w, errStart) return - } else { - w.start(conn, job) - PutConn(conn) } + err = workerFunc(job.Queue, job.Payload.Args...) } diff --git a/workers.go b/workers.go index 0a5fb6f..42aac91 100644 --- a/workers.go +++ b/workers.go @@ -27,30 +27,23 @@ func Enqueue(job *Job) error { return err } - conn, err := GetConn() - if err != nil { - logger.Criticalf("Error on getting connection on enqueue") - return err - } - defer PutConn(conn) - buffer, err := json.Marshal(job.Payload) if err != nil { logger.Criticalf("Cant marshal payload on enqueue") return err } - err = conn.Send("RPUSH", fmt.Sprintf("%squeue:%s", workerSettings.Namespace, job.Queue), buffer) + err = client.RPush(fmt.Sprintf("%squeue:%s", workerSettings.Namespace, job.Queue), buffer).Err() if err != nil { logger.Criticalf("Cant push to queue") return err } - err = conn.Send("SADD", fmt.Sprintf("%squeues", workerSettings.Namespace), job.Queue) + err = client.SAdd(fmt.Sprintf("%squeues", workerSettings.Namespace), job.Queue).Err() if err != nil { logger.Criticalf("Cant register queue to list of use queues") return err } - return conn.Flush() + return nil }
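Putting the pieces of this change together, the sketch below shows how a consumer of the fork might use the new `MaxAgeRetries` and `ForcePrune` settings together with the `Closed()` channel. It is illustrative only: the import path is taken from the `go.mod` hunk above, the queue and class names are made up, and the other entry points (`SetSettings`, `Register`, `Enqueue`, `Work`) are assumed to keep their upstream goworker signatures.

```go
package main

import (
	"fmt"
	"time"

	"github.com/skaurus/goworker" // module path from the go.mod in this diff
)

func main() {
	// MaxAgeRetries and ForcePrune are the settings added by this change;
	// the remaining fields are the usual upstream goworker settings.
	goworker.SetSettings(goworker.WorkerSettings{
		URI:            "redis://localhost:6379/",
		Queues:         []string{"example_queue"}, // hypothetical queue name
		Concurrency:    2,
		Namespace:      "resque:",
		ExitOnComplete: true,
		MaxAgeRetries:  24 * time.Hour, // drop retried failed jobs older than a day
		ForcePrune:     true,           // also prune workers missing from the heartbeat set
	})

	// Hypothetical job class for illustration.
	goworker.Register("ExampleJob", func(queue string, args ...interface{}) error {
		fmt.Printf("processing %v from %s\n", args, queue)
		return nil
	})

	if err := goworker.Enqueue(&goworker.Job{
		Queue:   "example_queue",
		Payload: goworker.Payload{Class: "ExampleJob", Args: []interface{}{"hello"}},
	}); err != nil {
		fmt.Println("enqueue error:", err)
	}

	if err := goworker.Work(); err != nil {
		fmt.Println("work error:", err)
	}

	// Work calls Close while cleaning up; Closed() unblocks once that
	// shutdown has fully finished (the 0.1.8 addition).
	<-goworker.Closed()
}
```

In a long-running deployment the same `Closed()` channel would more likely be awaited from a separate goroutine that handles shutdown signals, so the process only exits after every worker has deregistered and the Redis client has been closed.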