Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

新增auth支持 #328

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
307 changes: 65 additions & 242 deletions lib/Resque.php
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
<?php
require_once dirname(__FILE__) . '/Resque/Event.php';
require_once dirname(__FILE__) . '/Resque/Exception.php';

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These lines are from before Composer was introduced...

/**
* Base Resque class.
*
Expand All @@ -10,8 +13,6 @@ class Resque
{
const VERSION = '1.2';

const DEFAULT_INTERVAL = 5;

/**
* @var Resque_Redis Instance of Resque_Redis that talks to redis.
*/
Expand All @@ -28,20 +29,31 @@ class Resque
*/
protected static $redisDatabase = 0;

/**
* @var string auth of Redis database
*/
protected static $auth;

/**
* @var int PID of current process. Used to detect changes when forking
* and implement "thread" safety to avoid race conditions.
*/
protected static $pid = null;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems useful, though.

/**
* Given a host/port combination separated by a colon, set it as
* the redis server that Resque will talk to.
*
* @param mixed $server Host/port combination separated by a colon, DSN-formatted URI, or
* a callable that receives the configured database ID
* and returns a Resque_Redis instance, or
* @param mixed $server Host/port combination separated by a colon, or
* a nested array of servers with host/port pairs.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dropping a lot of valid server definitions, here...

* @param int $database
* @param string $auth
*/
public static function setBackend($server, $database = 0)
public static function setBackend($server, $database = 0, $auth = '')
{
self::$redisServer = $server;
self::$redisDatabase = $database;
self::$auth = $auth;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can see why this might be tempting to have. Feels a bit superfluous, though.

self::$redis = null;
}

Expand All @@ -52,44 +64,47 @@ public static function setBackend($server, $database = 0)
*/
public static function redis()
{
if (self::$redis !== null) {
return self::$redis;
// Detect when the PID of the current process has changed (from a fork, etc)
// and force a reconnect to redis.
$pid = getmypid();
if (self::$pid !== $pid) {
self::$redis = null;
self::$pid = $pid;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This bit still looks helpful, though.

}

if (is_callable(self::$redisServer)) {
self::$redis = call_user_func(self::$redisServer, self::$redisDatabase);
} else {
self::$redis = new Resque_Redis(self::$redisServer, self::$redisDatabase);
if(!is_null(self::$redis)) {
return self::$redis;
}

return self::$redis;
}

/**
* fork() helper method for php-resque that handles issues PHP socket
* and phpredis have with passing around sockets between child/parent
* processes.
*
* Will close connection to Redis before forking.
*
* @return int Return vars as per pcntl_fork(). False if pcntl_fork is unavailable
*/
public static function fork()
{
if(!function_exists('pcntl_fork')) {
return false;
$server = self::$redisServer;
if (empty($server)) {
$server = 'localhost:6379';
}

// Close the connection to Redis before forking.
// This is a workaround for issues phpredis has.
self::$redis = null;

$pid = pcntl_fork();
if($pid === -1) {
throw new RuntimeException('Unable to fork child worker.');
if(is_array($server)) {
require_once dirname(__FILE__) . '/Resque/RedisCluster.php';
self::$redis = new Resque_RedisCluster($server);
if (empty(self::$auth) === false) {
self::$redis->auth(self::$auth);
}
}
else {
if (strpos($server, 'unix:') === false) {
list($host, $port) = explode(':', $server);
}
else {
$host = $server;
$port = null;
}
require_once dirname(__FILE__) . '/Resque/Redis.php';
self::$redis = new Resque_Redis($host, $port);
if (empty(self::$auth) === false) {
self::$redis->auth(self::$auth);
}
}

return $pid;
self::$redis->select(self::$redisDatabase);
return self::$redis;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All of this logic is unnecessary with Credis, and is in fact redundant. Assuming it dates from before the switch to Composer, as well.

}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Resque::fork() method is pretty essential to the way the library operates now. It shouldn't be removed.


/**
Expand All @@ -101,16 +116,8 @@ public static function fork()
*/
public static function push($queue, $item)
{
$encodedItem = json_encode($item);
if ($encodedItem === false) {
return false;
}
self::redis()->sadd('queues', $queue);
$length = self::redis()->rpush('queue:' . $queue, $encodedItem);
if ($length < 1) {
return false;
}
return true;
self::redis()->rpush('queue:' . $queue, json_encode($item));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This removes a lot of error handling...


/**
Expand All @@ -122,82 +129,18 @@ public static function push($queue, $item)
*/
public static function pop($queue)
{
$item = self::redis()->lpop('queue:' . $queue);

$item = self::redis()->lpop('queue:' . $queue);
if(!$item) {
return;
}

return json_decode($item, true);
}

/**
* Remove items of the specified queue
*
* @param string $queue The name of the queue to fetch an item from.
* @param array $items
* @return integer number of deleted items
*/
public static function dequeue($queue, $items = Array())
{
if(count($items) > 0) {
return self::removeItems($queue, $items);
} else {
return self::removeList($queue);
}
}

/**
* Remove specified queue
*
* @param string $queue The name of the queue to remove.
* @return integer Number of deleted items
*/
public static function removeQueue($queue)
{
$num = self::removeList($queue);
self::redis()->srem('queues', $queue);
return $num;
}

/**
* Pop an item off the end of the specified queues, using blocking list pop,
* decode it and return it.
*
* @param array $queues
* @param int $timeout
* @return null|array Decoded item from the queue.
*/
public static function blpop(array $queues, $timeout)
{
$list = array();
foreach($queues AS $queue) {
$list[] = 'queue:' . $queue;
}

$item = self::redis()->blpop($list, (int)$timeout);

if(!$item) {
return;
}

/**
* Normally the Resque_Redis class returns queue names without the prefix
* But the blpop is a bit different. It returns the name as prefix:queue:name
* So we need to strip off the prefix:queue: part
*/
$queue = substr($item[0], strlen(self::redis()->getPrefix() . 'queue:'));

return array(
'queue' => $queue,
'payload' => json_decode($item[1], true)
);
}

/**
* Return the size (number of pending jobs) of the specified queue.
*
* @param string $queue name of the queue to be checked for pending jobs
* @param $queue name of the queue to be checked for pending jobs
*
* @return int The size of the queue.
*/
Expand All @@ -214,28 +157,21 @@ public static function size($queue)
* @param array $args Any optional arguments that should be passed when the job is executed.
* @param boolean $trackStatus Set to true to be able to monitor the status of a job.
*
* @return string|boolean Job ID when the job was created, false if creation was cancelled due to beforeEnqueue
* @return string
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We've grown rather fond of being able to cancel jobs after they've been queued. Can't remove this feature.

public static function enqueue($queue, $class, $args = null, $trackStatus = false)
{
$id = Resque::generateJobId();
$hookParams = array(
'class' => $class,
'args' => $args,
'queue' => $queue,
'id' => $id,
);
try {
Resque_Event::trigger('beforeEnqueue', $hookParams);
require_once dirname(__FILE__) . '/Resque/Job.php';
$result = Resque_Job::create($queue, $class, $args, $trackStatus);
if ($result) {
Resque_Event::trigger('afterEnqueue', array(
'class' => $class,
'args' => $args,
'queue' => $queue,
));
}
catch(Resque_Job_DontCreate $e) {
return false;
}

Resque_Job::create($queue, $class, $args, $trackStatus, $id);
Resque_Event::trigger('afterEnqueue', $hookParams);

return $id;
return $result;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again dropping a lot of error handling...


/**
Expand All @@ -246,6 +182,7 @@ public static function enqueue($queue, $class, $args = null, $trackStatus = fals
*/
public static function reserve($queue)
{
require_once dirname(__FILE__) . '/Resque/Job.php';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Composer handles this, now...

return Resque_Job::reserve($queue);
}

Expand All @@ -262,118 +199,4 @@ public static function queues()
}
return $queues;
}

/**
* Remove Items from the queue
* Safely moving each item to a temporary queue before processing it
* If the Job matches, counts otherwise puts it in a requeue_queue
* which at the end eventually be copied back into the original queue
*
* @private
*
* @param string $queue The name of the queue
* @param array $items
* @return integer number of deleted items
*/
private static function removeItems($queue, $items = Array())
{
$counter = 0;
$originalQueue = 'queue:'. $queue;
$tempQueue = $originalQueue. ':temp:'. time();
$requeueQueue = $tempQueue. ':requeue';

// move each item from original queue to temp queue and process it
$finished = false;
while (!$finished) {
$string = self::redis()->rpoplpush($originalQueue, self::redis()->getPrefix() . $tempQueue);

if (!empty($string)) {
if(self::matchItem($string, $items)) {
self::redis()->rpop($tempQueue);
$counter++;
} else {
self::redis()->rpoplpush($tempQueue, self::redis()->getPrefix() . $requeueQueue);
}
} else {
$finished = true;
}
}

// move back from temp queue to original queue
$finished = false;
while (!$finished) {
$string = self::redis()->rpoplpush($requeueQueue, self::redis()->getPrefix() .$originalQueue);
if (empty($string)) {
$finished = true;
}
}

// remove temp queue and requeue queue
self::redis()->del($requeueQueue);
self::redis()->del($tempQueue);

return $counter;
}

/**
* matching item
* item can be ['class'] or ['class' => 'id'] or ['class' => {:foo => 1, :bar => 2}]
* @private
*
* @params string $string redis result in json
* @params $items
*
* @return (bool)
*/
private static function matchItem($string, $items)
{
$decoded = json_decode($string, true);

foreach($items as $key => $val) {
# class name only ex: item[0] = ['class']
if (is_numeric($key)) {
if($decoded['class'] == $val) {
return true;
}
# class name with args , example: item[0] = ['class' => {'foo' => 1, 'bar' => 2}]
} elseif (is_array($val)) {
$decodedArgs = (array)$decoded['args'][0];
if ($decoded['class'] == $key &&
count($decodedArgs) > 0 && count(array_diff($decodedArgs, $val)) == 0) {
return true;
}
# class name with ID, example: item[0] = ['class' => 'id']
} else {
if ($decoded['class'] == $key && $decoded['id'] == $val) {
return true;
}
}
}
return false;
}

/**
* Remove List
*
* @private
*
* @params string $queue the name of the queue
* @return integer number of deleted items belongs to this list
*/
private static function removeList($queue)
{
$counter = self::size($queue);
$result = self::redis()->del('queue:' . $queue);
return ($result == 1) ? $counter : 0;
}

/*
* Generate an identifier to attach to a job for status tracking.
*
* @return string
*/
public static function generateJobId()
{
return md5(uniqid('', true));
}
}