forked from shmilyzxt/yii2-queue
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathWorker.php
83 lines (73 loc) · 2.18 KB
/
Worker.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
<?php
/**
* 队列监听进程类,用户后台启动监听队列
* User: shmilyzxt 49783121@qq.com
* Date: 2016/11/22
* Time: 16:54
*/
namespace shmilyzxt\queue;
use common\tools\var_dumper;
use shmilyzxt\queue\base\Job;
use shmilyzxt\queue\base\Queue;
class Worker
{
/**
* 启用一个队列后台监听任务
* @param Queue $queue
* @param string $queueName 监听队列的名称(在pushon的时候把任务推送到哪个队列,则需要监听相应的队列才能获取任务)
* @param int $attempt 队列任务失败尝试次数,0为不限制
* @param int $memory 允许使用的最大内存
* @param int $sleep 每次检测的时间间隔
*/
public static function listen(Queue $queue, $queueName = 'default', $attempt = 10, $memory = 512, $sleep = 3, $delay = 0){
while (true){
try{
$job = $queue->pop($queueName);
}catch (\Exception $e){
throw $e;//正式使用环境要忽略异常或者记录异常
continue;
}
if($job instanceof Job){
if($attempt > 0 && $job->getAttempts() > $attempt){
$job->failed();
}else{
try{
//throw new \Exception("test failed");
$job->execute();
}catch (\Exception $e){
if (! $job->isDeleted()) {
$job->release($delay);
}
}
}
}else{
self::sleep($sleep);
}
if (self::memoryExceeded($memory)) {
self::stop();
}
}
}
/**
* 判断内存使用是否超出
* @param int $memoryLimit
* @return bool
*/
public static function memoryExceeded($memoryLimit)
{
return (memory_get_usage() / 1024 / 1024) >= $memoryLimit;
}
/**
* 停止队列监听
*/
public static function stop(){
die;
}
/**
* 休眠
*/
public static function sleep($seconds){
sleep($seconds);
echo "sleep\r\n";
}
}