Skip to content

Commit

Permalink
Merge pull request #263 from bowphp/feat-integrate-queue-adapters
Browse files Browse the repository at this point in the history
Feat integrate queue adapters
  • Loading branch information
papac authored Sep 22, 2023
2 parents c4220ad + 2f4ba40 commit c0c7624
Show file tree
Hide file tree
Showing 18 changed files with 554 additions and 66 deletions.
14 changes: 13 additions & 1 deletion src/Application/Exception/BaseErrorHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
use Bow\View\View;
use Bow\Http\Exception\HttpException;
use Bow\Validation\Exception\ValidationException;
use Policier\Exception\TokenExpiredException;
use Policier\Exception\TokenInvalidException;

class BaseErrorHandler
{
Expand All @@ -32,6 +34,14 @@ protected function render($view, $data = []): string
*/
protected function json($exception, $code = null)
{
if ($exception instanceof TokenInvalidException) {
$code = 'TOKEN_INVALID';
}

if ($exception instanceof TokenExpiredException) {
$code = 'TOKEN_EXPIRED';
}

if (is_null($code)) {
if (method_exists($exception, 'getStatus')) {
$code = $exception->getStatus();
Expand Down Expand Up @@ -66,6 +76,8 @@ protected function json($exception, $code = null)
$response["trace"] = $exception->getTrace();
}

return response()->json($response, $status);
response()->status($status);

return die(json_encode($response));
}
}
1 change: 1 addition & 0 deletions src/Console/Command.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class Command extends AbstractCommand
"key" => \Bow\Console\Command\GenerateKeyCommand::class,
"resource" => \Bow\Console\Command\GenerateResourceControllerCommand::class,
"session" => \Bow\Console\Command\GenerateSessionCommand::class,
"queue" => \Bow\Console\Command\GenerateQueueCommand::class,
"cache" => \Bow\Console\Command\GenerateCacheCommand::class,
],
"runner" => [
Expand Down
34 changes: 34 additions & 0 deletions src/Console/Command/GenerateQueueCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<?php

declare(strict_types=1);

namespace Bow\Console\Command;

use Bow\Console\Color;
use Bow\Console\Generator;
use Bow\Support\Str;

class GenerateQueueCommand extends AbstractCommand
{
/**
* Generate session
*
* @return void
*/
public function generate(): void
{
$create_at = date("YmdHis");
$filename = sprintf("Version%s%sTable", $create_at, ucfirst(Str::camel('queue')));

$generator = new Generator(
$this->setting->getMigrationDirectory(),
$filename
);

$generator->write('model/queue', [
'className' => $filename
]);

echo Color::green('Queue migration created.');
}
}
2 changes: 1 addition & 1 deletion src/Console/Console.php
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ private function generate(): void
{
$action = $this->arg->getAction();

if (!in_array($action, ['key', 'resource', 'session', 'cache'])) {
if (!in_array($action, ['key', 'resource', 'session', 'cache', 'queue'])) {
$this->throwFailsCommand('This action is not exists', 'help generate');
}

Expand Down
35 changes: 35 additions & 0 deletions src/Console/stubs/model/queue.stub
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?php

use Bow\Database\Migration\Migration;
use Bow\Database\Migration\SQLGenerator;

class {className} extends Migration
{
/**
* Up Migration
*/
public function up(): void
{
$this->create("queues", function (SQLGenerator $table) {
$table->addString('id', ["primary" => true]);
$table->addString('queue');
$table->addText('payload');
$table->addInteger('attempts', ["default" => 3]);
$table->addEnum('status', [
"size" => ["waiting", "processing", "reserved", "failed", "done"],
"default" => "waiting",
]);
$table->addDatetime('avalaibled_at');
$table->addDatetime('reserved_at', ["nullable" => true, "default" => null]);
$table->addDatetime('created_at');
});
}

/**
* Rollback migration
*/
public function rollback(): void
{
$this->dropIfExists("queues");
}
}
2 changes: 1 addition & 1 deletion src/Http/Client/HttpClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ private function execute(): string
/**
* Set Curl CURLOPT_RETURNTRANSFER option
*
* @return bool
* @return void
*/
private function applyCommonOptions()
{
Expand Down
25 changes: 1 addition & 24 deletions src/Queue/Adapters/BeanstalkdAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,6 @@ class BeanstalkdAdapter extends QueueAdapter
*/
private Pheanstalk $pheanstalk;

/**
* Determine the default watch name
*
* @var string
*/
private string $queue = "default";

/**
* The number of working attempts
*
* @var int
*/
private int $tries;

/**
* Define the sleep time
*
* @var int
*/
private int $sleep = 5;

/**
* Configure Beanstalkd driver
*
Expand Down Expand Up @@ -122,12 +101,10 @@ public function size(?string $queue = null): int
* Queue a job
*
* @param ProducerService $producer
* @return QueueAdapter
* @return void
*/
public function push(ProducerService $producer): void
{
// TODO: should be removed
// $this->flush();
$queues = (array) cache("beanstalkd:queues");

if (!in_array($producer->getQueue(), $queues)) {
Expand Down
150 changes: 150 additions & 0 deletions src/Queue/Adapters/DatabaseAdapter.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
<?php

namespace Bow\Queue\Adapters;

use Bow\Database\Database;
use Bow\Database\QueryBuilder;
use Bow\Queue\ProducerService;

class DatabaseAdapter extends QueueAdapter
{
/**
* Define the instance Pheanstalk
*
* @var QueryBuilder
*/
private QueryBuilder $table;

/**
* Configure Beanstalkd driver
*
* @param array $queue
* @return mixed
*/
public function configure(array $queue): DatabaseAdapter
{
$this->table = Database::table($queue["table"] ?? "queue_jobs");

return $this;
}

/**
* Get the size of the queue.
*
* @param string $queue
* @return int
*/
public function size(?string $queue = null): int
{
return $this->table
->where("queue", $this->getQueue($queue))
->count();
}

/**
* Queue a job
*
* @param ProducerService $producer
* @return QueueAdapter
*/
public function push(ProducerService $producer): void
{
$this->table->insert([
"id" => $this->generateId(),
"queue" => $this->getQueue(),
"payload" => base64_encode($this->serializeProducer($producer)),
"attempts" => $this->tries,
"status" => "waiting",
"avalaibled_at" => date("Y-m-d H:i:s", time() + $producer->getDelay()),
"reserved_at" => null,
"created_at" => date("Y-m-d H:i:s"),
]);
}

/**
* Run the worker
*
* @param string|null $queue
* @return mixed
*/
public function run(string $queue = null): void
{
// we want jobs from define queue only.
$queue = $this->getQueue($queue);
$queues = $this->table
->where("queue", $queue)
->whereIn("status", ["waiting", "reserved"])
->get();

if (count($queues) == 0) {
$this->sleep($this->sleep ?? 5);
return;
}

foreach ($queues as $job) {
try {
$producer = $this->unserializeProducer(base64_decode($job->payload));
if (strtotime($job->avalaibled_at) >= time()) {
if (!is_null($job->reserved_at) && strtotime($job->reserved_at) < time()) {
continue;
}
$this->table->where("id", $job->id)->update([
"status" => "processing",
]);
$this->execute($producer, $job);
continue;
}
} catch (\Exception $e) {
error_log($e->getMessage());
app('logger')->error($e->getMessage(), $e->getTrace());
cache("failed:job:" . $job->id, $job->payload);
if (!isset($producer)) {
$this->sleep(1);
continue;
}
if ($producer->jobShouldBeDelete() || $job->attempts <= 0) {
$this->table->where("id", $job->id)->delete();
$this->sleep(1);
continue;
}
$this->table->where("id", $job->id)->update([
"status" => "reserved",
"attempts" => $job->attempts - 1,
"avalaibled_at" => date("Y-m-d H:i:s", time() + $producer->getDelay()),
"reserved_at" => date("Y-m-d H:i:s", time() + $producer->getRetry())
]);
$this->sleep(1);
}
}
}

/**
* Process the next job on the queue.
*
* @param ProducerService $producer
* @param mixed $job
*/
private function execute(ProducerService $producer, mixed $job)
{
call_user_func([$producer, "process"]);
$this->table->where("id", $job->id)->update([
"status" => "done"
]);
sleep($this->sleep ?? 5);
}

/**
* Flush the queue table
*
* @param ?string $queue
* @return void
*/
public function flush(?string $queue = null): void
{
if (is_null($queue)) {
$this->table->truncate();
} else {
$this->table->where("queue", $queue)->delete();
}
}
}
Loading

0 comments on commit c0c7624

Please sign in to comment.