Skip to content

Commit

Permalink
Code cleanup and testing
Browse files Browse the repository at this point in the history
  • Loading branch information
vtsykun committed Jan 16, 2020
1 parent 4eae7fd commit fede989
Show file tree
Hide file tree
Showing 16 changed files with 124 additions and 101 deletions.
16 changes: 5 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,20 +178,14 @@ okvpn_cron:
with_stamps:
- 'Packagist\WebBundle\Cron\WorkerStamp'

# You can add other custom options for tasks and create your own middleware.
tasks: # Defined tasks via configuration
-
command: 'app:cron:sync-amazon-orders' # Your symfony console command name
cron: "*/30 * * * *"
async: true
lock: true
arguments: { '--transport': 15 }
# Here you can add other custom options for tasks and create your own middleware.
-
command: 'App\Cron\YouServiceName' # Your service name
cron: "0 0 * * *"
command: 'app:noaa:gfs-grib-download'
cron: '34,45 */6 * * *'
messenger: { routing: lowpriority } # See Messenger configuration

lock: true
arguments: { '--transport': '0p25' }
# Here you can also add other custom options and create your own middleware.
-
command: "bash /root/renew.sh > /root/renew.txt" # Shell command
group: root # Group filter. You can run `bin/console okvpn:cron --group=root` under the root user
Expand Down
2 changes: 1 addition & 1 deletion src/DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public function getConfigTreeBuilder()
// Disable doctrine listener for classes in search bundle for MQ performance
// this will leave the search functionality and if you need to update the index, you can do it manually
$rootNode->children()
->scalarNode('timezone')->end()
->scalarNode('timezone')->defaultNull()->end()
->arrayNode('messenger')
->children()
->booleanNode('enable')->defaultFalse()->end()
Expand Down
2 changes: 2 additions & 0 deletions src/DependencyInjection/OkvpnCronExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public function load(array $configs, ContainerBuilder $container)
$container->getDefinition('okvpn_cron.schedule_factory')
->replaceArgument(0, $config['with_stamps'] ?? [])
->replaceArgument(1, $defaultStamps);
$container->getDefinition('okvpn_cron.middleware.cron_expression')
->replaceArgument(0, $config['timezone']);

$container->registerForAutoconfiguration(MiddlewareEngineInterface::class)
->addTag('okvpn_cron.middleware');
Expand Down
2 changes: 1 addition & 1 deletion src/Messenger/CronMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

use Okvpn\Bundle\CronBundle\Model\ScheduleEnvelope;

class CronMessage
final class CronMessage
{
private $schedule;

Expand Down
5 changes: 2 additions & 3 deletions src/Messenger/CronMessageHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@

namespace Okvpn\Bundle\CronBundle\Messenger;

use Okvpn\Bundle\CronBundle\Model\MessengerStamp;
use Okvpn\Bundle\CronBundle\Runner\ScheduleRunnerInterface;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

class CronMessageHandler implements MessageHandlerInterface
final class CronMessageHandler implements MessageHandlerInterface
{
private $runner;

Expand All @@ -19,6 +18,6 @@ public function __construct(ScheduleRunnerInterface $runner)

public function __invoke(CronMessage $message)
{
return $this->runner->execute($message->getSchedule()->without(MessengerStamp::class));
return $this->runner->execute($message->getSchedule());
}
}
25 changes: 9 additions & 16 deletions src/Messenger/CronSendersLocator.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@

namespace Okvpn\Bundle\CronBundle\Messenger;

use Okvpn\Bundle\CronBundle\Model\MessengerStamp;
use Psr\Container\ContainerInterface;
use Symfony\Component\Messenger\Envelope as MessengerEnvelope;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Sender\SendersLocatorInterface;

class CronSendersLocator implements SendersLocatorInterface
final class CronSendersLocator implements SendersLocatorInterface
{
private $sendersLocatorWrapper;
private $container;
Expand All @@ -23,24 +22,18 @@ public function __construct(SendersLocatorInterface $sendersLocatorWrapper, Cont
/**
* @inheritDoc
*/
public function getSenders(MessengerEnvelope $messengerEnvelope): iterable
public function getSenders(Envelope $envelope): iterable
{
$message = $messengerEnvelope->getMessage();
if (!$message instanceof CronMessage || null === $message->getSchedule()->get(MessengerStamp::class)) {
return yield from $this->sendersLocatorWrapper->getSenders($messengerEnvelope);
if (!$envelope->getMessage() instanceof CronMessage || null === $envelope->last(RoutingStamp::class)) {
return yield from $this->sendersLocatorWrapper->getSenders($envelope);
}

$routingKey = $message->getSchedule()->get(MessengerStamp::class)->getRouting();
if (empty($routingKey)) {
return yield from $this->sendersLocatorWrapper->getSenders($messengerEnvelope);
}

foreach ($routingKey as $routing) {
if (!$this->container->has($routing)) {
throw new \RuntimeException(sprintf('Could not find messenger transport "%s" for schedule "%s".', $routing, $message->getCommand()));
foreach ($envelope->all(RoutingStamp::class) as $stamp) {
if (!$this->container->has($alias = $stamp->getRoute())) {
throw new \RuntimeException(sprintf('Could not find messenger transport "%s" for schedule.', $alias));
}

yield $routing => $this->container->get($routing);
yield $alias => $this->container->get($alias);
}
}
}
25 changes: 25 additions & 0 deletions src/Messenger/RoutingStamp.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?php

declare(strict_types=1);

namespace Okvpn\Bundle\CronBundle\Messenger;

use Symfony\Component\Messenger\Stamp\StampInterface;

/**
* @internal
*/
final class RoutingStamp implements StampInterface
{
private $routeAlias;

public function __construct(string $routeAlias)
{
$this->routeAlias = $routeAlias;
}

public function getRoute(): string
{
return $this->routeAlias;
}
}
56 changes: 28 additions & 28 deletions src/Middleware/AsyncProcessEngine.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,35 +25,35 @@ public function __construct(string $sysTempDir = null)
*/
public function handle(ScheduleEnvelope $envelope, StackInterface $stack): ScheduleEnvelope
{
if ($envelope->has(AsyncStamp::class)) {
$envelope = $envelope->without(AsyncStamp::class);
$phpFinder = new PhpExecutableFinder();
$phpPath = $phpFinder->find();

$filename = $this->tempDir . DIRECTORY_SEPARATOR . 'okvpn-cron-' . md5(random_bytes(10)) . '.txt';
file_put_contents($filename, serialize($envelope));

// create command string
$runCommand = sprintf(
'%s %s %s %s',
$phpPath,
$_SERVER['argv'][0],
CronExecuteCommand::$defaultName,
$filename
);

// workaround for Windows
if (defined('PHP_WINDOWS_VERSION_BUILD')) {
$wsh = new \COM('WScript.shell');
$wsh->Run($runCommand, 0, false);
} else {
// run command
shell_exec(sprintf('%s > /dev/null 2>&1 & echo $!', $runCommand));
}

return $stack->end()->handle($envelope, $stack);
if (false === $envelope->has(AsyncStamp::class)) {
return $stack->next()->handle($envelope, $stack);
}

return $stack->next()->handle($envelope, $stack);
$envelope = $envelope->without(AsyncStamp::class);
$phpFinder = new PhpExecutableFinder();
$phpPath = $phpFinder->find();

$filename = $this->tempDir . DIRECTORY_SEPARATOR . 'okvpn-cron-' . md5(random_bytes(10)) . '.txt';
file_put_contents($filename, serialize($envelope));

// create command string
$runCommand = sprintf(
'%s %s %s %s',
$phpPath,
$_SERVER['argv'][0],
CronExecuteCommand::$defaultName,
$filename
);

// workaround for Windows
if (defined('PHP_WINDOWS_VERSION_BUILD')) {
$wsh = new \COM('WScript.shell');
$wsh->Run($runCommand, 0, false);
} else {
// run command
shell_exec(sprintf('%s > /dev/null 2>&1 & echo $!', $runCommand));
}

return $stack->end()->handle($envelope, $stack);
}
}
21 changes: 15 additions & 6 deletions src/Middleware/CronMiddlewareEngine.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,32 @@

namespace Okvpn\Bundle\CronBundle\Middleware;

use Cron\CronExpression;
use Okvpn\Bundle\CronBundle\Model\ScheduleEnvelope;
use Okvpn\Bundle\CronBundle\Model\ScheduleStamp;

final class CronMiddlewareEngine implements MiddlewareEngineInterface
{
private $timeZone;

public function __construct(string $timeZone = null)
{
$this->timeZone = $timeZone;
}

/**
* @inheritDoc
*/
public function handle(ScheduleEnvelope $envelope, StackInterface $stack): ScheduleEnvelope
{
/** @var ScheduleStamp $stamp */
if ($stamp = $envelope->get(ScheduleStamp::class)) {
if (!$stamp->cronExpression()->isDue()) {
return $stack->end()->handle($envelope, $stack);
}
if (!$stamp = $envelope->get(ScheduleStamp::class)) {
$stack->next()->handle($envelope, $stack);
}

if (CronExpression::factory($stamp->cronExpression())->isDue('now', $this->timeZone)) {
return $stack->next()->handle($envelope->without(ScheduleStamp::class), $stack);
}

return $stack->next()->handle($envelope->without(ScheduleStamp::class), $stack);
return $stack->end()->handle($envelope, $stack);
}
}
40 changes: 20 additions & 20 deletions src/Middleware/LockMiddlewareEngine.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,27 +23,27 @@ public function __construct(LockFactory $factory = null)
public function handle(ScheduleEnvelope $envelope, StackInterface $stack): ScheduleEnvelope
{
/** @var LockStamp $stamp */
if ($stamp = $envelope->get(LockStamp::class)) {
if (null === $this->factory) {
throw new \LogicException('You need to install and configure symfony/lock to run tasks with locking.');
}

$lock = $this->factory->createLock(
$stamp->lockName(),
$stamp->getTtl()
);

if (!$lock->acquire()) {
return $stack->end()->handle($envelope, $stack);
}

try {
return $stack->next()->handle($envelope->without(LockStamp::class), $stack);
} finally {
$lock->release();
}
if (!$stamp = $envelope->get(LockStamp::class)) {
return $stack->next()->handle($envelope, $stack);
}

return $stack->next()->handle($envelope, $stack);
if (null === $this->factory) {
throw new \LogicException('You need to install symfony/lock to run tasks with locking.');
}

$lock = $this->factory->createLock(
$stamp->lockName(),
$stamp->getTtl()
);

if (!$lock->acquire()) {
return $stack->end()->handle($envelope, $stack);
}

try {
return $stack->next()->handle($envelope->without(LockStamp::class), $stack);
} finally {
$lock->release();
}
}
}
5 changes: 4 additions & 1 deletion src/Middleware/MessengerEngine.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Okvpn\Bundle\CronBundle\Middleware;

use Okvpn\Bundle\CronBundle\Messenger\CronMessage;
use Okvpn\Bundle\CronBundle\Messenger\RoutingStamp;
use Okvpn\Bundle\CronBundle\Model\MessengerStamp;
use Okvpn\Bundle\CronBundle\Model\ScheduleEnvelope;
use Symfony\Component\Messenger\MessageBusInterface;
Expand Down Expand Up @@ -32,7 +33,9 @@ public function handle(ScheduleEnvelope $envelope, StackInterface $stack): Sched
}

$message = new CronMessage($envelope);
$this->messageBus->dispatch($message);
$routing = $envelope->get(MessengerStamp::class)->getRouting();
$stamps = \array_map(function ($route) {return new RoutingStamp($route);}, $routing);
$this->messageBus->dispatch($message, $stamps);

return $stack->end()->handle($envelope, $stack);
}
Expand Down
11 changes: 5 additions & 6 deletions src/Middleware/ShellInvokeEngine.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
use Okvpn\Bundle\CronBundle\Model\OutputStamp;
use Okvpn\Bundle\CronBundle\Model\ScheduleEnvelope;
use Okvpn\Bundle\CronBundle\Model\ShellStamp;
use Okvpn\Bundle\CronBundle\Model\TimeoutStamp;
use Symfony\Component\Process\Process;

final class ShellInvokeEngine implements MiddlewareEngineInterface
Expand All @@ -17,7 +16,7 @@ final class ShellInvokeEngine implements MiddlewareEngineInterface
*/
public function handle(ScheduleEnvelope $envelope, StackInterface $stack): ScheduleEnvelope
{
if (false === $envelope->has(ShellStamp::class)) {
if (!$stamp = $envelope->get(ShellStamp::class)) {
return $stack->next()->handle($envelope, $stack);
}

Expand All @@ -33,16 +32,16 @@ public function handle(ScheduleEnvelope $envelope, StackInterface $stack): Sched
$process = new Process($command);
}

if ($stamp = $envelope->get(TimeoutStamp::class)) {
$process->setTimeout($stamp->getTimeout());
if (null !== $timeout = $stamp->getTimeout()) {
$process->setTimeout($timeout);
}

$output = null;
try {
$process->run();
$output = $process->getErrorOutput() . "\n" . $process->getOutput();
$output = $process->getErrorOutput() . $process->getOutput();
} catch (\Exception $exception) {
$output = $exception->getMessage() . "\n" . $process->getErrorOutput() . "\n" . $process->getOutput();
$output = $exception->getMessage() . $process->getErrorOutput() . $process->getOutput();
}

return $stack->end()->handle($envelope->with(new OutputStamp($output)), $stack);
Expand Down
2 changes: 1 addition & 1 deletion src/Model/LockStamp.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public function __construct($lock = null)
$this->lockName = isset($lock['name']) ? (string) $lock['name'] :
(is_string($lock) ? $lock : md5(serialize($lock)));

$this->ttl = isset($lock['ttl']) ? (int)$lock['ttl'] : null;
$this->ttl = isset($lock['ttl']) ? (int) $lock['ttl'] : null;
}

/**
Expand Down
6 changes: 3 additions & 3 deletions src/Model/MessengerStamp.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ class MessengerStamp implements CommandStamp
*/
public function __construct($messenger = null)
{
$this->routing = isset($messenger['routing']) ? (array) $messenger['routing'] : null;
$this->routing = isset($messenger['routing']) ? (array) $messenger['routing'] : [];
}

/**
* @return array|string[]|null
* @return array|string[]
*/
public function getRouting()
public function getRouting(): array
{
return $this->routing;
}
Expand Down
Loading

0 comments on commit fede989

Please sign in to comment.