From 7b8f3734da334be4a5ff616a32809424ecedd959 Mon Sep 17 00:00:00 2001 From: MoceanLiu Date: Fri, 17 Feb 2017 18:17:35 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E6=96=B0=E5=A2=9Eauth=E6=94=AF=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lib/Resque.php | 307 +++++++++++-------------------------------------- 1 file changed, 65 insertions(+), 242 deletions(-) diff --git a/lib/Resque.php b/lib/Resque.php index d03b2ecf..c4f8e28d 100644 --- a/lib/Resque.php +++ b/lib/Resque.php @@ -1,4 +1,7 @@ auth(self::$auth); + } + } + else { + if (strpos($server, 'unix:') === false) { + list($host, $port) = explode(':', $server); + } + else { + $host = $server; + $port = null; + } + require_once dirname(__FILE__) . '/Resque/Redis.php'; + self::$redis = new Resque_Redis($host, $port); + if (empty(self::$auth) === false) { + self::$redis->auth(self::$auth); + } } - return $pid; + self::$redis->select(self::$redisDatabase); + return self::$redis; } /** @@ -101,16 +116,8 @@ public static function fork() */ public static function push($queue, $item) { - $encodedItem = json_encode($item); - if ($encodedItem === false) { - return false; - } self::redis()->sadd('queues', $queue); - $length = self::redis()->rpush('queue:' . $queue, $encodedItem); - if ($length < 1) { - return false; - } - return true; + self::redis()->rpush('queue:' . $queue, json_encode($item)); } /** @@ -122,8 +129,7 @@ public static function push($queue, $item) */ public static function pop($queue) { - $item = self::redis()->lpop('queue:' . $queue); - + $item = self::redis()->lpop('queue:' . $queue); if(!$item) { return; } @@ -131,73 +137,10 @@ public static function pop($queue) return json_decode($item, true); } - /** - * Remove items of the specified queue - * - * @param string $queue The name of the queue to fetch an item from. - * @param array $items - * @return integer number of deleted items - */ - public static function dequeue($queue, $items = Array()) - { - if(count($items) > 0) { - return self::removeItems($queue, $items); - } else { - return self::removeList($queue); - } - } - - /** - * Remove specified queue - * - * @param string $queue The name of the queue to remove. - * @return integer Number of deleted items - */ - public static function removeQueue($queue) - { - $num = self::removeList($queue); - self::redis()->srem('queues', $queue); - return $num; - } - - /** - * Pop an item off the end of the specified queues, using blocking list pop, - * decode it and return it. - * - * @param array $queues - * @param int $timeout - * @return null|array Decoded item from the queue. - */ - public static function blpop(array $queues, $timeout) - { - $list = array(); - foreach($queues AS $queue) { - $list[] = 'queue:' . $queue; - } - - $item = self::redis()->blpop($list, (int)$timeout); - - if(!$item) { - return; - } - - /** - * Normally the Resque_Redis class returns queue names without the prefix - * But the blpop is a bit different. It returns the name as prefix:queue:name - * So we need to strip off the prefix:queue: part - */ - $queue = substr($item[0], strlen(self::redis()->getPrefix() . 'queue:')); - - return array( - 'queue' => $queue, - 'payload' => json_decode($item[1], true) - ); - } - /** * Return the size (number of pending jobs) of the specified queue. * - * @param string $queue name of the queue to be checked for pending jobs + * @param $queue name of the queue to be checked for pending jobs * * @return int The size of the queue. */ @@ -214,28 +157,21 @@ public static function size($queue) * @param array $args Any optional arguments that should be passed when the job is executed. * @param boolean $trackStatus Set to true to be able to monitor the status of a job. * - * @return string|boolean Job ID when the job was created, false if creation was cancelled due to beforeEnqueue + * @return string */ public static function enqueue($queue, $class, $args = null, $trackStatus = false) { - $id = Resque::generateJobId(); - $hookParams = array( - 'class' => $class, - 'args' => $args, - 'queue' => $queue, - 'id' => $id, - ); - try { - Resque_Event::trigger('beforeEnqueue', $hookParams); + require_once dirname(__FILE__) . '/Resque/Job.php'; + $result = Resque_Job::create($queue, $class, $args, $trackStatus); + if ($result) { + Resque_Event::trigger('afterEnqueue', array( + 'class' => $class, + 'args' => $args, + 'queue' => $queue, + )); } - catch(Resque_Job_DontCreate $e) { - return false; - } - - Resque_Job::create($queue, $class, $args, $trackStatus, $id); - Resque_Event::trigger('afterEnqueue', $hookParams); - return $id; + return $result; } /** @@ -246,6 +182,7 @@ public static function enqueue($queue, $class, $args = null, $trackStatus = fals */ public static function reserve($queue) { + require_once dirname(__FILE__) . '/Resque/Job.php'; return Resque_Job::reserve($queue); } @@ -262,118 +199,4 @@ public static function queues() } return $queues; } - - /** - * Remove Items from the queue - * Safely moving each item to a temporary queue before processing it - * If the Job matches, counts otherwise puts it in a requeue_queue - * which at the end eventually be copied back into the original queue - * - * @private - * - * @param string $queue The name of the queue - * @param array $items - * @return integer number of deleted items - */ - private static function removeItems($queue, $items = Array()) - { - $counter = 0; - $originalQueue = 'queue:'. $queue; - $tempQueue = $originalQueue. ':temp:'. time(); - $requeueQueue = $tempQueue. ':requeue'; - - // move each item from original queue to temp queue and process it - $finished = false; - while (!$finished) { - $string = self::redis()->rpoplpush($originalQueue, self::redis()->getPrefix() . $tempQueue); - - if (!empty($string)) { - if(self::matchItem($string, $items)) { - self::redis()->rpop($tempQueue); - $counter++; - } else { - self::redis()->rpoplpush($tempQueue, self::redis()->getPrefix() . $requeueQueue); - } - } else { - $finished = true; - } - } - - // move back from temp queue to original queue - $finished = false; - while (!$finished) { - $string = self::redis()->rpoplpush($requeueQueue, self::redis()->getPrefix() .$originalQueue); - if (empty($string)) { - $finished = true; - } - } - - // remove temp queue and requeue queue - self::redis()->del($requeueQueue); - self::redis()->del($tempQueue); - - return $counter; - } - - /** - * matching item - * item can be ['class'] or ['class' => 'id'] or ['class' => {:foo => 1, :bar => 2}] - * @private - * - * @params string $string redis result in json - * @params $items - * - * @return (bool) - */ - private static function matchItem($string, $items) - { - $decoded = json_decode($string, true); - - foreach($items as $key => $val) { - # class name only ex: item[0] = ['class'] - if (is_numeric($key)) { - if($decoded['class'] == $val) { - return true; - } - # class name with args , example: item[0] = ['class' => {'foo' => 1, 'bar' => 2}] - } elseif (is_array($val)) { - $decodedArgs = (array)$decoded['args'][0]; - if ($decoded['class'] == $key && - count($decodedArgs) > 0 && count(array_diff($decodedArgs, $val)) == 0) { - return true; - } - # class name with ID, example: item[0] = ['class' => 'id'] - } else { - if ($decoded['class'] == $key && $decoded['id'] == $val) { - return true; - } - } - } - return false; - } - - /** - * Remove List - * - * @private - * - * @params string $queue the name of the queue - * @return integer number of deleted items belongs to this list - */ - private static function removeList($queue) - { - $counter = self::size($queue); - $result = self::redis()->del('queue:' . $queue); - return ($result == 1) ? $counter : 0; - } - - /* - * Generate an identifier to attach to a job for status tracking. - * - * @return string - */ - public static function generateJobId() - { - return md5(uniqid('', true)); - } } From 6e0bd66576b970fbaff4d4824e2fd087905f8569 Mon Sep 17 00:00:00 2001 From: MoceanLiu Date: Fri, 17 Feb 2017 18:45:11 +0800 Subject: [PATCH 2/2] Update composer.json --- composer.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/composer.json b/composer.json index 28a0bf58..22aebc62 100644 --- a/composer.json +++ b/composer.json @@ -1,5 +1,5 @@ { - "name": "chrisboulton/php-resque", + "name": "beck/php-resque", "type": "library", "description": "Redis backed library for creating background jobs and processing them later. Based on resque for Ruby.", "keywords": ["job", "background", "redis", "resque"],