diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..f59f07b --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,8 @@ +## [Unreleased] + +- Moved from `redigo` to `go-redis` + ([Issue benmanns/goworker#69](https://github.com/benmanns/goworker/issues/69)) + +## [0.1.4] _2021-06-07_ + +Fork from https://github.com/benmanns/goworker from master beeing https://github.com/benmanns/goworker/commit/d28a4f34a4d183f3ea2e51b4b8268807e0984942 diff --git a/go.mod b/go.mod index 00a08f7..7d6b57c 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,7 @@ module github.com/skaurus/goworker go 1.18 require ( - github.com/cihub/seelog v0.0.0-20140730094913-72ae425987bc - github.com/gomodule/redigo v1.8.2 - golang.org/x/net v0.0.0-20200822124328-c89045814202 - vitess.io/vitess v3.0.0-rc.3.0.20181212200900-e2c5239f54d1+incompatible + github.com/cihub/seelog v0.0.0-20170130134532-f561c5e57575 + github.com/go-redis/redis/v7 v7.4.0 + golang.org/x/net v0.0.0-20201110031124-69a78807bb2b ) diff --git a/go.sum b/go.sum index da88c49..3faafc2 100644 --- a/go.sum +++ b/go.sum @@ -1,25 +1,50 @@ -github.com/cihub/seelog v0.0.0-20140730094913-72ae425987bc h1:HSZdsOzV0MO6cEcf31hZoT6KJGI806Z523bkYPDwkQs= -github.com/cihub/seelog v0.0.0-20140730094913-72ae425987bc/go.mod h1:9d6lWj8KzO/fd/NrVaLscBKmPigpZpn5YawRPw+e3Yo= -github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/gomodule/redigo v1.8.2 h1:H5XSIre1MB5NbPYFp+i1NBbb5qN1W8Y8YAQoAYbkm8k= -github.com/gomodule/redigo v1.8.2/go.mod h1:P9dn9mFrCBvWhGE1wpxx6fgq7BAeLBk+UUUzlpkBYO0= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= -github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/cihub/seelog v0.0.0-20170130134532-f561c5e57575 h1:kHaBemcxl8o/pQ5VM1c8PVE1PubbNx3mjUr09OqWGCs= +github.com/cihub/seelog v0.0.0-20170130134532-f561c5e57575/go.mod h1:9d6lWj8KzO/fd/NrVaLscBKmPigpZpn5YawRPw+e3Yo= +github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/go-redis/redis/v7 v7.4.0 h1:7obg6wUoj05T0EpY0o8B59S9w5yeMWql7sw2kwNW1x4= +github.com/go-redis/redis/v7 v7.4.0/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= +github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.10.1 h1:q/mM8GF/n0shIN8SaAZ0V+jnLPzen6WIVZdiwrRlMlo= +github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME= +github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20200822124328-c89045814202 h1:VvcQYSHwXgi7W+TpUR6A9g6Up98WAHf3f/ulnJ62IyA= -golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201110031124-69a78807bb2b h1:uwuIcX0g4Yl1NC5XAz37xsr2lTtcqevgzYNVt49waME= +golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191010194322-b09406accb47/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= -gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -vitess.io/vitess v3.0.0-rc.3.0.20181212200900-e2c5239f54d1+incompatible h1:TCG4ZGCiFNr7XGP8nhT++5Wwi1jRC6Xk9IPxZiBQXB0= -vitess.io/vitess v3.0.0-rc.3.0.20181212200900-e2c5239f54d1+incompatible/go.mod h1:h4qvkyNYTOC0xI+vcidSWoka0gQAZc9ZPHbkHo48gP0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= +gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I= +gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/goworker.go b/goworker.go index cf2faad..835cc78 100644 --- a/goworker.go +++ b/goworker.go @@ -1,20 +1,25 @@ package goworker import ( + "crypto/tls" + "crypto/x509" + "fmt" + "io/ioutil" "os" "strconv" "sync" "time" + "github.com/go-redis/redis/v7" + "golang.org/x/net/context" "github.com/cihub/seelog" - "vitess.io/vitess/go/pools" ) var ( logger seelog.LoggerInterface - pool *pools.ResourcePool + client *redis.Client ctx context.Context initMutex sync.Mutex initialized bool @@ -61,54 +66,66 @@ func Init() error { } ctx = context.Background() - pool = newRedisPool(workerSettings.URI, workerSettings.Connections, workerSettings.Connections, time.Minute) + opts, err := redis.ParseURL(workerSettings.URI) + if err != nil { + return err + } + + if len(workerSettings.TLSCertPath) > 0 { + certPool, err := getCertPool() + if err != nil { + return err + } + opts.TLSConfig = &tls.Config{ + RootCAs: certPool, + InsecureSkipVerify: workerSettings.SkipTLSVerify, + } + } + + client = redis.NewClient(opts).WithContext(ctx) + err = client.Ping().Err() + if err != nil { + return err + } initialized = true } + return nil } -// GetConn returns a connection from the goworker Redis -// connection pool. When using the pool, check in -// connections as quickly as possible, because holding a -// connection will cause concurrent worker functions to lock -// while they wait for an available connection. Expect this -// API to change drastically. -func GetConn() (*RedisConn, error) { - resource, err := pool.Get(ctx) - +func getCertPool() (*x509.CertPool, error) { + rootCAs, _ := x509.SystemCertPool() + if rootCAs == nil { + rootCAs = x509.NewCertPool() + } + certs, err := ioutil.ReadFile(workerSettings.TLSCertPath) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to read %q for the RootCA pool: %v", workerSettings.TLSCertPath, err) } - return resource.(*RedisConn), nil -} - -// PutConn puts a connection back into the connection pool. -// Run this as soon as you finish using a connection that -// you got from GetConn. Expect this API to change -// drastically. -func PutConn(conn *RedisConn) { - pool.Put(conn) + if ok := rootCAs.AppendCertsFromPEM(certs); !ok { + return nil, fmt.Errorf("failed to append %q to the RootCA pool: %v", workerSettings.TLSCertPath, err) + } + return rootCAs, nil } // Close cleans up resources initialized by goworker. This // will be called by Work when cleaning up. However, if you // are using the Init function to access goworker functions // and configuration without processing jobs by calling -// Work, you should run this function when cleaning up. For -// example, -// -// if err := goworker.Init(); err != nil { -// fmt.Println("Error:", err) -// } -// defer goworker.Close() -func Close() { +// Work, you should run this function when cleaning up. +func Close() error { initMutex.Lock() defer initMutex.Unlock() if initialized { - pool.Close() + err := client.Close() + if err != nil { + return err + } initialized = false } + + return nil } // Work starts the goworker process. Check for errors in diff --git a/poller.go b/poller.go index 54ad696..0f6737f 100644 --- a/poller.go +++ b/poller.go @@ -5,6 +5,8 @@ import ( "encoding/json" "fmt" "time" + + "github.com/go-redis/redis/v7" ) type poller struct { @@ -23,20 +25,24 @@ func newPoller(queues []string, isStrict bool) (*poller, error) { }, nil } -func (p *poller) getJob(conn *RedisConn) (*Job, error) { +func (p *poller) getJob(c *redis.Client) (*Job, error) { for _, queue := range p.queues(p.isStrict) { logger.Debugf("Checking %s", queue) - reply, err := conn.Do("LPOP", fmt.Sprintf("%squeue:%s", workerSettings.Namespace, queue)) + result, err := c.LPop(fmt.Sprintf("%squeue:%s", workerSettings.Namespace, queue)).Result() if err != nil { + // no jobs for now, continue on another queue + if err == redis.Nil { + continue + } return nil, err } - if reply != nil { + if result != "" { logger.Debugf("Found job on %s", queue) job := &Job{Queue: queue} - decoder := json.NewDecoder(bytes.NewReader(reply.([]byte))) + decoder := json.NewDecoder(bytes.NewReader([]byte(result))) if workerSettings.UseNumber { decoder.UseNumber() } @@ -52,31 +58,29 @@ func (p *poller) getJob(conn *RedisConn) (*Job, error) { } func (p *poller) poll(interval time.Duration, quit <-chan bool) (<-chan *Job, error) { - jobs := make(chan *Job) + err := p.open(client) + if err != nil { + return nil, err + } - conn, err := GetConn() + err = p.start(client) if err != nil { - logger.Criticalf("Error on getting connection in poller %s: %v", p, err) - close(jobs) return nil, err - } else { - p.open(conn) - p.start(conn) - PutConn(conn) } + jobs := make(chan *Job) go func() { defer func() { close(jobs) - conn, err := GetConn() + err = p.finish(client) + if err != nil { + return + } + + err = p.close(client) if err != nil { - logger.Criticalf("Error on getting connection in poller %s: %v", p, err) return - } else { - p.finish(conn) - p.close(conn) - PutConn(conn) } }() @@ -85,22 +89,17 @@ func (p *poller) poll(interval time.Duration, quit <-chan bool) (<-chan *Job, er case <-quit: return default: - conn, err := GetConn() - if err != nil { - logger.Criticalf("Error on getting connection in poller %s: %v", p, err) - return - } - - job, err := p.getJob(conn) + job, err := p.getJob(client) if err != nil { - logger.Criticalf("Error on %v getting job from %v: %v", p, p.Queues, err) - PutConn(conn) + logger.Criticalf("Error on %v getting job from %v: %+v", p, p.Queues, err) return } if job != nil { - conn.Send("INCR", fmt.Sprintf("%sstat:processed:%v", workerSettings.Namespace, p)) - conn.Flush() - PutConn(conn) + err = client.Incr(fmt.Sprintf("%sstat:processed:%v", workerSettings.Namespace, p)).Err() + if err != nil { + return + } + select { case jobs <- job: case <-quit: @@ -109,19 +108,15 @@ func (p *poller) poll(interval time.Duration, quit <-chan bool) (<-chan *Job, er logger.Criticalf("Error requeueing %v: %v", job, err) return } - conn, err := GetConn() + + err = client.LPush(fmt.Sprintf("%squeue:%s", workerSettings.Namespace, job.Queue), buf).Err() if err != nil { - logger.Criticalf("Error on getting connection in poller %s: %v", p, err) return } - conn.Send("LPUSH", fmt.Sprintf("%squeue:%s", workerSettings.Namespace, job.Queue), buf) - conn.Flush() - PutConn(conn) return } } else { - PutConn(conn) if workerSettings.ExitOnComplete { return } diff --git a/process.go b/process.go index 82bcd9e..8ded419 100644 --- a/process.go +++ b/process.go @@ -6,6 +6,8 @@ import ( "os" "strings" "time" + + "github.com/go-redis/redis/v7" ) type process struct { @@ -33,44 +35,78 @@ func (p *process) String() string { return fmt.Sprintf("%s:%d-%s:%s", p.Hostname, p.Pid, p.ID, strings.Join(p.Queues, ",")) } -func (p *process) open(conn *RedisConn) error { - conn.Send("SADD", fmt.Sprintf("%sworkers", workerSettings.Namespace), p) - conn.Send("SET", fmt.Sprintf("%sstat:processed:%v", workerSettings.Namespace, p), "0") - conn.Send("SET", fmt.Sprintf("%sstat:failed:%v", workerSettings.Namespace, p), "0") - conn.Flush() +func (p *process) open(c *redis.Client) error { + err := c.SAdd(fmt.Sprintf("%sworkers", workerSettings.Namespace), p.String()).Err() + if err != nil { + return err + } + + err = c.Set(fmt.Sprintf("%sstat:processed:%v", workerSettings.Namespace, p), "0", 0).Err() + if err != nil { + return err + } + + err = c.Set(fmt.Sprintf("%sstat:failed:%v", workerSettings.Namespace, p), "0", 0).Err() + if err != nil { + return err + } return nil } -func (p *process) close(conn *RedisConn) error { +func (p *process) close(c *redis.Client) error { logger.Infof("%v shutdown", p) - conn.Send("SREM", fmt.Sprintf("%sworkers", workerSettings.Namespace), p) - conn.Send("DEL", fmt.Sprintf("%sstat:processed:%s", workerSettings.Namespace, p)) - conn.Send("DEL", fmt.Sprintf("%sstat:failed:%s", workerSettings.Namespace, p)) - conn.Flush() + err := c.SRem(fmt.Sprintf("%sworkers", workerSettings.Namespace), p.String()).Err() + if err != nil { + return err + } + + err = c.Del(fmt.Sprintf("%sstat:processed:%s", workerSettings.Namespace, p)).Err() + if err != nil { + return err + } + + err = c.Del(fmt.Sprintf("%sstat:failed:%s", workerSettings.Namespace, p)).Err() + if err != nil { + return err + } return nil } -func (p *process) start(conn *RedisConn) error { - conn.Send("SET", fmt.Sprintf("%sworker:%s:started", workerSettings.Namespace, p), time.Now().String()) - conn.Flush() +func (p *process) start(c *redis.Client) error { + err := c.Set(fmt.Sprintf("%sworker:%s:started", workerSettings.Namespace, p), time.Now().String(), 0).Err() + if err != nil { + return err + } return nil } -func (p *process) finish(conn *RedisConn) error { - conn.Send("DEL", fmt.Sprintf("%sworker:%s", workerSettings.Namespace, p)) - conn.Send("DEL", fmt.Sprintf("%sworker:%s:started", workerSettings.Namespace, p)) - conn.Flush() +func (p *process) finish(c *redis.Client) error { + err := c.Del(fmt.Sprintf("%sworker:%s", workerSettings.Namespace, p)).Err() + if err != nil { + return err + } + + err = c.Del(fmt.Sprintf("%sworker:%s:started", workerSettings.Namespace, p)).Err() + if err != nil { + return err + } return nil } -func (p *process) fail(conn *RedisConn) error { - conn.Send("INCR", fmt.Sprintf("%sstat:failed", workerSettings.Namespace)) - conn.Send("INCR", fmt.Sprintf("%sstat:failed:%s", workerSettings.Namespace, p)) - conn.Flush() +func (p *process) fail(c *redis.Client) error { + err := c.Incr(fmt.Sprintf("%sstat:failed", workerSettings.Namespace)).Err() + if err != nil { + return err + } + + err = c.Incr(fmt.Sprintf("%sstat:failed:%s", workerSettings.Namespace, p)).Err() + if err != nil { + return err + } return nil } diff --git a/redis.go b/redis.go deleted file mode 100644 index 085993b..0000000 --- a/redis.go +++ /dev/null @@ -1,118 +0,0 @@ -package goworker - -import ( - "crypto/tls" - "crypto/x509" - "errors" - "fmt" - "io/ioutil" - "net/url" - "time" - - "github.com/gomodule/redigo/redis" - "vitess.io/vitess/go/pools" -) - -var ( - errorInvalidScheme = errors.New("invalid Redis database URI scheme") -) - -type RedisConn struct { - redis.Conn -} - -func (r *RedisConn) Close() { - _ = r.Conn.Close() -} - -func newRedisFactory(uri string) pools.Factory { - return func() (pools.Resource, error) { - return redisConnFromURI(uri) - } -} - -func newRedisPool(uri string, capacity int, maxCapacity int, idleTimout time.Duration) *pools.ResourcePool { - return pools.NewResourcePool(newRedisFactory(uri), capacity, maxCapacity, idleTimout) -} - -func redisConnFromURI(uriString string) (*RedisConn, error) { - uri, err := url.Parse(uriString) - if err != nil { - return nil, err - } - - var network string - var host string - var password string - var db string - var dialOptions []redis.DialOption - - switch uri.Scheme { - case "redis", "rediss": - network = "tcp" - host = uri.Host - if uri.User != nil { - password, _ = uri.User.Password() - } - if len(uri.Path) > 1 { - db = uri.Path[1:] - } - if uri.Scheme == "rediss" { - dialOptions = append(dialOptions, redis.DialUseTLS(true)) - dialOptions = append(dialOptions, redis.DialTLSSkipVerify(workerSettings.SkipTLSVerify)) - if len(workerSettings.TLSCertPath) > 0 { - pool, err := getCertPool(workerSettings.TLSCertPath) - if err != nil { - return nil, err - } - config := &tls.Config{ - RootCAs: pool, - } - dialOptions = append(dialOptions, redis.DialTLSConfig(config)) - } - } - case "unix": - network = "unix" - host = uri.Path - default: - return nil, errorInvalidScheme - } - - conn, err := redis.Dial(network, host, dialOptions...) - if err != nil { - return nil, err - } - - if password != "" { - _, err := conn.Do("AUTH", password) - if err != nil { - conn.Close() - return nil, err - } - } - - if db != "" { - _, err := conn.Do("SELECT", db) - if err != nil { - conn.Close() - return nil, err - } - } - - return &RedisConn{Conn: conn}, nil -} - -func getCertPool(certPath string) (*x509.CertPool, error) { - rootCAs, _ := x509.SystemCertPool() - if rootCAs == nil { - rootCAs = x509.NewCertPool() - } - certs, err := ioutil.ReadFile(workerSettings.TLSCertPath) - if err != nil { - return nil, fmt.Errorf("Failed to read %q for the RootCA pool: %v", workerSettings.TLSCertPath, err) - } - if ok := rootCAs.AppendCertsFromPEM(certs); !ok { - return nil, fmt.Errorf("Failed to append %q to the RootCA pool: %v", workerSettings.TLSCertPath, err) - } - return rootCAs, nil -} diff --git a/worker.go b/worker.go index cecd34f..c8db26f 100644 --- a/worker.go +++ b/worker.go @@ -6,6 +6,8 @@ import ( "fmt" "sync" "time" + + "github.com/go-redis/redis/v7" ) type worker struct { @@ -26,7 +28,7 @@ func (w *worker) MarshalJSON() ([]byte, error) { return json.Marshal(w.String()) } -func (w *worker) start(conn *RedisConn, job *Job) error { +func (w *worker) start(c *redis.Client, job *Job) error { work := &work{ Queue: job.Queue, RunAt: time.Now(), @@ -38,13 +40,17 @@ func (w *worker) start(conn *RedisConn, job *Job) error { return err } - conn.Send("SET", fmt.Sprintf("%sworker:%s", workerSettings.Namespace, w), buffer) + err = c.Set(fmt.Sprintf("%sworker:%s", workerSettings.Namespace, w), buffer, 0).Err() + if err != nil { + return err + } + logger.Debugf("Processing %s since %s [%v]", work.Queue, work.RunAt, work.Payload.Class) - return w.process.start(conn) + return w.process.start(c) } -func (w *worker) fail(conn *RedisConn, job *Job, err error) error { +func (w *worker) fail(c *redis.Client, job *Job, err error) error { failure := &failure{ FailedAt: time.Now(), Payload: job.Payload, @@ -57,35 +63,49 @@ func (w *worker) fail(conn *RedisConn, job *Job, err error) error { if err != nil { return err } - conn.Send("RPUSH", fmt.Sprintf("%sfailed", workerSettings.Namespace), buffer) - return w.process.fail(conn) + err = c.RPush(fmt.Sprintf("%sfailed", workerSettings.Namespace), buffer).Err() + if err != nil { + return err + } + + return w.process.fail(c) } -func (w *worker) succeed(conn *RedisConn, job *Job) error { - conn.Send("INCR", fmt.Sprintf("%sstat:processed", workerSettings.Namespace)) - conn.Send("INCR", fmt.Sprintf("%sstat:processed:%s", workerSettings.Namespace, w)) +func (w *worker) succeed(c *redis.Client) error { + err := c.Incr(fmt.Sprintf("%sstat:processed", workerSettings.Namespace)).Err() + if err != nil { + return err + } + + err = c.Incr(fmt.Sprintf("%sstat:processed:%s", workerSettings.Namespace, w)).Err() + if err != nil { + return err + } return nil } -func (w *worker) finish(conn *RedisConn, job *Job, err error) error { +func (w *worker) finish(c *redis.Client, job *Job, err error) error { if err != nil { - w.fail(conn, job, err) + err = w.fail(c, job, err) + if err != nil { + return err + } } else { - w.succeed(conn, job) + err = w.succeed(c) + if err != nil { + return err + } } - return w.process.finish(conn) + return w.process.finish(c) } func (w *worker) work(jobs <-chan *Job, monitor *sync.WaitGroup) { - conn, err := GetConn() + err := w.open(client) if err != nil { - logger.Criticalf("Error on getting connection in worker %v: %v", w, err) + logger.Criticalf("Error on opening worker %v: %v", w, err) return - } else { - w.open(conn) - PutConn(conn) } monitor.Add(1) @@ -94,13 +114,10 @@ func (w *worker) work(jobs <-chan *Job, monitor *sync.WaitGroup) { defer func() { defer monitor.Done() - conn, err := GetConn() + err := w.close(client) if err != nil { - logger.Criticalf("Error on getting connection in worker %v: %v", w, err) + logger.Criticalf("Error on closing worker %v: %v", w, err) return - } else { - w.close(conn) - PutConn(conn) } }() for job := range jobs { @@ -112,13 +129,10 @@ func (w *worker) work(jobs <-chan *Job, monitor *sync.WaitGroup) { errorLog := fmt.Sprintf("No worker for %s in queue %s with args %v", job.Payload.Class, job.Queue, job.Payload.Args) logger.Critical(errorLog) - conn, err := GetConn() + err := w.finish(client, job, errors.New(errorLog)) if err != nil { - logger.Criticalf("Error on getting connection in worker %v: %v", w, err) + logger.Criticalf("Error on finishing worker %v: %v", w, err) return - } else { - w.finish(conn, job, errors.New(errorLog)) - PutConn(conn) } } } @@ -127,29 +141,26 @@ func (w *worker) work(jobs <-chan *Job, monitor *sync.WaitGroup) { func (w *worker) run(job *Job, workerFunc workerFunc) { var err error + defer func() { - conn, errCon := GetConn() - if errCon != nil { - logger.Criticalf("Error on getting connection in worker on finish %v: %v", w, errCon) + errFinish := w.finish(client, job, err) + if errFinish != nil { + logger.Criticalf("Error on finishing worker %v: %v", w, errFinish) return - } else { - w.finish(conn, job, err) - PutConn(conn) } }() + defer func() { if r := recover(); r != nil { err = errors.New(fmt.Sprint(r)) } }() - conn, err := GetConn() - if err != nil { - logger.Criticalf("Error on getting connection in worker on start %v: %v", w, err) + errStart := w.start(client, job) + if errStart != nil { + logger.Criticalf("Error on starting worker %v: %v", w, errStart) return - } else { - w.start(conn, job) - PutConn(conn) } + err = workerFunc(job.Queue, job.Payload.Args...) } diff --git a/workers.go b/workers.go index 0a5fb6f..42aac91 100644 --- a/workers.go +++ b/workers.go @@ -27,30 +27,23 @@ func Enqueue(job *Job) error { return err } - conn, err := GetConn() - if err != nil { - logger.Criticalf("Error on getting connection on enqueue") - return err - } - defer PutConn(conn) - buffer, err := json.Marshal(job.Payload) if err != nil { logger.Criticalf("Cant marshal payload on enqueue") return err } - err = conn.Send("RPUSH", fmt.Sprintf("%squeue:%s", workerSettings.Namespace, job.Queue), buffer) + err = client.RPush(fmt.Sprintf("%squeue:%s", workerSettings.Namespace, job.Queue), buffer).Err() if err != nil { logger.Criticalf("Cant push to queue") return err } - err = conn.Send("SADD", fmt.Sprintf("%squeues", workerSettings.Namespace), job.Queue) + err = client.SAdd(fmt.Sprintf("%squeues", workerSettings.Namespace), job.Queue).Err() if err != nil { logger.Criticalf("Cant register queue to list of use queues") return err } - return conn.Flush() + return nil }