diff --git a/CHANGELOG.md b/CHANGELOG.md
index e4029f9..9004072 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,19 @@
+# 🌟 2.0.0 🌟
+
+- :star2: - Complete rewrite of the ChainProcessor
+- :star2: - Optimized the ChainProcessor to send fewer stop items when multiple chains are involved
+- :star2: - ChainProcessor can now output, through generators, the items that reach the end of the chain. This removes all the limitations of the current sub chains.
+- :star2: - ChainRepeatOperation can now handle more complex cases & asynchronous tasks. The operation is no longer considered experimental.
+- :star2: - Added FailSafe operation, which executes sub-chains and catches exceptions thrown in them.
+
+- -:exclamation: **Deprecation** The ChainProcessorInterface was changed significantly.
+ - This means any complex custom operations using the ChainProcessor need to be redone. => This shouldn't affect any load/extract or transform operation.
+ - This means any integrations using the ETL also need to be updated.
+ - For context, this change was not made lightly: the ETL was initially developed using PHP 5.5. The PHP language
+ has evolved, and some complex cases the ETL didn't handle well can now be easily fixed by using the new features of
+ the language (mostly Generators).
+- -:exclamation: **Deprecation** Support for Symfony 4 and 5 was dropped.
+
# 1.2.0
- :star2: Feature #14 - Added possibility to create subchains.
- :star2: Feature #34 - Allow chain's to be observed to see progress.
diff --git a/composer.json b/composer.json
index 207ea7e..f1d4514 100644
--- a/composer.json
+++ b/composer.json
@@ -4,17 +4,18 @@
"license": "MIT",
"description": "A small etl coded in php, to extract/transform/load Edit with ease.",
"require": {
- "php": ">=8.1",
+ "php": ">=8.2",
"oliverde8/associative-array-simplified": "^1.0",
- "psr/log": "^1.0|^2.0|^3.0",
- "symfony/expression-language": "^4.3|^5.0|^6.0|^7.0",
- "symfony/filesystem": "^5.4|^6.0|^7.0",
- "symfony/validator": "^4.3|^5.0|^6.0|^7.0",
- "symfony/yaml": "^4.3|^5.0|^6.0|^7.0"
+ "psr/log": "^2.0|^3.0",
+ "symfony/expression-language": "^6.0|^7.0",
+ "symfony/filesystem": "^6.0|^7.0",
+ "symfony/validator": "^6.0|^7.0",
+ "symfony/yaml": "^6.0|^7.0"
},
"require-dev": {
"phpunit/php-code-coverage": "^9.2",
- "phpunit/phpunit": "9.4.*"
+ "phpunit/phpunit": "^9.6",
+ "rector/rector": "^1.2"
},
"suggest": {
"league/flysystem": "For transfaring files from external file systems",
diff --git a/old-docs/examples/00-describe/01-csv-transform.php b/old-docs/examples/00-describe/01-csv-transform.php
index 9cde0a9..10c25f2 100644
--- a/old-docs/examples/00-describe/01-csv-transform.php
+++ b/old-docs/examples/00-describe/01-csv-transform.php
@@ -8,7 +8,7 @@
$symfonyOutput = new \Oliverde8\Component\PhpEtl\Output\SymfonyConsoleOutput($output);
$chainProcessor->process(
- new ArrayIterator([getProcessFilePath(__DIR__, "/customers.csv")]),
+ new ArrayIterator(["/customers.csv"]),
[],
function (array $operationStates, int $itemsProcessed, int $itemsReturned, bool $hasEnded) use ($symfonyOutput) {
$symfonyOutput->output($operationStates, $hasEnded);
diff --git a/old-docs/examples/00-describe/02-csv-transform-merge.php b/old-docs/examples/00-describe/02-csv-transform-merge.php
index bd5d352..92288de 100644
--- a/old-docs/examples/00-describe/02-csv-transform-merge.php
+++ b/old-docs/examples/00-describe/02-csv-transform-merge.php
@@ -5,6 +5,6 @@
$chainProcessor = getChainProcessor(__DIR__ . '/01-csv-transform.yml');
$chainProcessor->process(
- new ArrayIterator([__DIR__ . "/customers.csv", __DIR__ . "/customers2.csv"]),
+ new ArrayIterator(["/customers.csv", "/customers2.csv"]),
[]
);
\ No newline at end of file
diff --git a/old-docs/examples/00-describe/03-json-grouped-merge.php b/old-docs/examples/00-describe/03-json-grouped-merge.php
index 7de2a8e..96b749d 100644
--- a/old-docs/examples/00-describe/03-json-grouped-merge.php
+++ b/old-docs/examples/00-describe/03-json-grouped-merge.php
@@ -5,6 +5,6 @@
$chainProcessor = getChainProcessor(__FILE__);
$chainProcessor->process(
- new ArrayIterator([__DIR__ . "/customers.csv", __DIR__ . "/customers2.csv"]),
+ new ArrayIterator(["/customers.csv", "/customers2.csv"]),
[]
);
\ No newline at end of file
diff --git a/old-docs/examples/00-describe/04-csv-filter.php b/old-docs/examples/00-describe/04-csv-filter.php
index 7de2a8e..96b749d 100644
--- a/old-docs/examples/00-describe/04-csv-filter.php
+++ b/old-docs/examples/00-describe/04-csv-filter.php
@@ -5,6 +5,6 @@
$chainProcessor = getChainProcessor(__FILE__);
$chainProcessor->process(
- new ArrayIterator([__DIR__ . "/customers.csv", __DIR__ . "/customers2.csv"]),
+ new ArrayIterator(["/customers.csv", "/customers2.csv"]),
[]
);
\ No newline at end of file
diff --git a/old-docs/examples/00-describe/06-csv-transform.php b/old-docs/examples/00-describe/06-csv-transform.php
index f5127b8..fd1b5a3 100644
--- a/old-docs/examples/00-describe/06-csv-transform.php
+++ b/old-docs/examples/00-describe/06-csv-transform.php
@@ -11,6 +11,6 @@
$chainProcessor = getChainProcessor(__FILE__, $context);
$chainProcessor->process(
- new ArrayIterator([__DIR__ . "/customers.csv"]),
+ new ArrayIterator(["/customers.csv"]),
$context
);
\ No newline at end of file
diff --git a/old-docs/examples/00-describe/07-json-transform.php b/old-docs/examples/00-describe/07-json-transform.php
index 30899c1..0da662e 100644
--- a/old-docs/examples/00-describe/07-json-transform.php
+++ b/old-docs/examples/00-describe/07-json-transform.php
@@ -5,7 +5,7 @@
$chainProcessor = getChainProcessor(__FILE__);
$chainProcessor->process(
- new ArrayIterator([__DIR__ . "/products.json"]),
+ new ArrayIterator(["/products.json"]),
[
'locales' => ['fr_FR', 'en_US']
]
diff --git a/phpunit.xml.dist b/phpunit.xml.dist
index 19f41f6..5e73f71 100644
--- a/phpunit.xml.dist
+++ b/phpunit.xml.dist
@@ -14,6 +14,7 @@
+
diff --git a/rector.php b/rector.php
new file mode 100644
index 0000000..4d23113
--- /dev/null
+++ b/rector.php
@@ -0,0 +1,14 @@
+withPaths([
+ __DIR__ . '/old-docs',
+ __DIR__ . '/src',
+ ])
+ // uncomment to reach your current PHP version
+ // ->withPhpSets()
+ ->withTypeCoverageLevel(0);
diff --git a/src/Oliverde8/Component/PhpEtl/Builder/Factories/ChainRepeatFactory.php b/src/Oliverde8/Component/PhpEtl/Builder/Factories/ChainRepeatFactory.php
index f064daa..660bd65 100644
--- a/src/Oliverde8/Component/PhpEtl/Builder/Factories/ChainRepeatFactory.php
+++ b/src/Oliverde8/Component/PhpEtl/Builder/Factories/ChainRepeatFactory.php
@@ -17,9 +17,12 @@ public function __construct(string $operation, string $class, protected readonly
public function build(string $operation, array $options): ChainOperationInterface
{
+ $maxAsync = $options['maxASynchronousItems'] ?? 0;
+ $allowAsync = $maxAsync > 0;
+
// Do not allow
- $chainProcessor = $this->builder->buildChainProcessor($options['chain'],[], 0);
- return $this->create($chainProcessor, $options['validationExpr']);
+ $chainProcessor = $this->builder->buildChainProcessor($options['chain'],[], $maxAsync);
+ return $this->create($chainProcessor, $options['validationExpr'], $allowAsync);
}
protected function configureValidator(): Constraint
@@ -33,6 +36,9 @@ protected function configureValidator(): Constraint
new Assert\Type(["type" => "string"]),
new Assert\NotBlank(),
],
+ 'maxASynchronousItems' => [
+ new Assert\Type(["type" => "int"]),
+ ]
]);
}
}
diff --git a/src/Oliverde8/Component/PhpEtl/ChainOperation/ChainRepeatOperation.php b/src/Oliverde8/Component/PhpEtl/ChainOperation/ChainRepeatOperation.php
index 8de0ea1..1327b78 100644
--- a/src/Oliverde8/Component/PhpEtl/ChainOperation/ChainRepeatOperation.php
+++ b/src/Oliverde8/Component/PhpEtl/ChainOperation/ChainRepeatOperation.php
@@ -3,13 +3,11 @@
namespace Oliverde8\Component\PhpEtl\ChainOperation;
-use oliverde8\AssociativeArraySimplified\AssociativeArray;
use Oliverde8\Component\PhpEtl\ChainProcessor;
use Oliverde8\Component\PhpEtl\Item\DataItemInterface;
use Oliverde8\Component\PhpEtl\Item\GroupedItem;
use Oliverde8\Component\PhpEtl\Item\ItemInterface;
use Oliverde8\Component\PhpEtl\Model\ExecutionContext;
-use Oliverde8\Component\PhpEtl\Model\RepeatOperationIterator;
use Symfony\Component\ExpressionLanguage\ExpressionLanguage;
class ChainRepeatOperation extends AbstractChainOperation implements DetailedObservableOperation
@@ -20,7 +18,8 @@ class ChainRepeatOperation extends AbstractChainOperation implements DetailedObs
public function __construct(
protected readonly ChainProcessor $chainProcessor,
- protected readonly string $validationExpression
+ protected readonly string $validationExpression,
+ protected readonly bool $allowAsynchronous = false,
) {
$this->onSplittedChainOperationConstruct([$chainProcessor]);
$this->expressionLanguage = new ExpressionLanguage();
@@ -29,12 +28,22 @@ public function __construct(
public function processData(DataItemInterface $inputItem, ExecutionContext $context): ItemInterface
{
// Nothing to process.
- return new GroupedItem(new RepeatOperationIterator(
- $this->chainProcessor,
- $inputItem,
- $context,
- $this
- ));
+ return new GroupedItem($this->repeatOnItem($inputItem, $context));
+ }
+
+ public function repeatOnItem(DataItemInterface $inputItem, ExecutionContext $context): \Generator
+ {
+ $invalidItem = false;
+ do {
+ $item = null;
+ foreach ($this->chainProcessor->processGenerator($inputItem, $context, withStop: false, allowAsynchronous: $this->allowAsynchronous) as $item) {
+ if ($this->itemIsValid($item, $context)) {
+ yield $item;
+ } else {
+ $invalidItem = true;
+ }
+ }
+ } while ($item && !$invalidItem);
}
public function itemIsValid(ItemInterface $item, ExecutionContext $context): bool
@@ -43,7 +52,8 @@ public function itemIsValid(ItemInterface $item, ExecutionContext $context): boo
$values = ['data' => $item->getData(), 'context' => $context];
return $this->expressionLanguage->evaluate($this->validationExpression, $values);
}
-
- return false;
+
+ // If not a data, then it's valid.
+ return true;
}
}
diff --git a/src/Oliverde8/Component/PhpEtl/ChainOperation/ChainSplitOperation.php b/src/Oliverde8/Component/PhpEtl/ChainOperation/ChainSplitOperation.php
index fde377b..5e519ae 100644
--- a/src/Oliverde8/Component/PhpEtl/ChainOperation/ChainSplitOperation.php
+++ b/src/Oliverde8/Component/PhpEtl/ChainOperation/ChainSplitOperation.php
@@ -41,7 +41,7 @@ public function __construct(array $chainProcessors)
public function processData(DataItemInterface $item, ExecutionContext $context): ItemInterface
{
foreach ($this->chainProcessors as $chainProcessor) {
- $chainProcessor->processItemWithChain($item, 0, $context);
+ foreach ($chainProcessor->processGenerator($item, $context, withStop: false) as $newItem) {}
}
// Nothing to process.
@@ -51,12 +51,7 @@ public function processData(DataItemInterface $item, ExecutionContext $context):
public function processStop(StopItem $item, ExecutionContext $context): ItemInterface
{
foreach ($this->chainProcessors as $chainProcessor) {
- $result = $chainProcessor->processItemWithChain($item, 0, $context);
-
- if ($result !== $item) {
- // Return a new stop item in order to continue flushing out data with stop items.
- $item = new StopItem();
- }
+ foreach ($chainProcessor->processGenerator($item, $context) as $newItem) {}
}
return $item;
diff --git a/src/Oliverde8/Component/PhpEtl/ChainOperation/FailSafeOperation.php b/src/Oliverde8/Component/PhpEtl/ChainOperation/FailSafeOperation.php
new file mode 100644
index 0000000..d622892
--- /dev/null
+++ b/src/Oliverde8/Component/PhpEtl/ChainOperation/FailSafeOperation.php
@@ -0,0 +1,62 @@
+repeatOnItem($item, $context) as $newItem) {}
+ return $item;
+ }
+
+ return new GroupedItem($this->repeatOnItem($item, $context));
+ }
+
+ public function repeatOnItem(ItemInterface $inputItem, ExecutionContext $context): \Generator
+ {
+ $nbAttempts = 0;
+ do {
+ try {
+ foreach ($this->chainProcessor->processGenerator($inputItem, $context, withStop: false) as $newItem) {
+ yield $newItem;
+ }
+ return;
+ } catch (\Exception $exception) {
+ $nbAttempts++;
+ $exceptionHandled = false;
+ foreach ($this->exceptionsToCatch as $exceptionType) {
+ if ($exception instanceof $exceptionType) {
+ $exceptionHandled = true;
+ }
+ }
+
+ if (!$exceptionHandled || $nbAttempts >= $this->nbAttempts) {
+ $context->getLogger()->error("Failed to handle exception in fail safe!", ['exception' => $exception]);
+ throw $exception;
+ } else {
+ $context->getLogger()->warning("Handling exception with fail safe!", ['exception' => $exception, 'nbAttempts' => $nbAttempts]);
+ }
+ }
+ } while ($nbAttempts < $this->nbAttempts);
+ }
+}
\ No newline at end of file
diff --git a/src/Oliverde8/Component/PhpEtl/ChainProcessor.php b/src/Oliverde8/Component/PhpEtl/ChainProcessor.php
index 5362b4b..25c2134 100644
--- a/src/Oliverde8/Component/PhpEtl/ChainProcessor.php
+++ b/src/Oliverde8/Component/PhpEtl/ChainProcessor.php
@@ -1,5 +1,4 @@
- * @copyright 2018 Oliverde8
- * @package Oliverde8\Component\PhpEtl
- */
-class ChainProcessor extends LoggerContext implements ChainProcessorInterface
+final class ChainProcessor extends LoggerContext implements ChainProcessorInterface
{
const KEY_LOGGER_ETL_IDENTIFIER = 'etl.identifier';
@@ -35,6 +29,8 @@ class ChainProcessor extends LoggerContext implements ChainProcessorInterface
/** @var string[] */
protected readonly array $chainLinkNames;
+ protected readonly int $chainEnd;
+
protected ?ChainObserverInterface $chainObserver = null;
protected array $asyncItems = [];
@@ -53,137 +49,150 @@ public function __construct(
{
$this->chainLinkNames = array_keys($chainLinks);
$this->chainLinks = array_values($chainLinks);
+ $this->chainEnd = count($chainLinks) - 1;
}
- public function process(\Iterator $items, array $parameters, ?callable $observerCallback = null)
+
+ public function process(\Iterator|ItemInterface $item, array $parameters, ?callable $observerCallback = null, $withStop = true): void
{
$context = $this->contextFactory->get($parameters);
$context->replaceLoggerContext($parameters);
$context->setLoggerContext(self::KEY_LOGGER_ETL_IDENTIFIER, '');
- $this->initObserver($observerCallback);
+ foreach ($this->processGenerator($item, $context, $observerCallback, $withStop) as $item) {}
- try {
- $context->getLogger()->info("Starting etl process!");
- $this->processItems($items, 0, $context);
- $context->getLogger()->info("Finished etl process!");
- $context->finalise();
- } catch (\Exception $e) {
- $params['exception'] = $e;
- $context->getLogger()->info("Failed during etl process!", $params);
- $context->finalise();
- throw $e;
+ if ($this->chainObserver) {
+ $this->chainObserver->onFinish();
}
- $this->chainObserver->onFinish();
}
- public function isShared(): bool
- {
- return $this->isShared;
- }
-
- /**
- * Process list of items with chain starting at $startAt.
- */
- protected function processItems(\Iterator $items, int $startAt, ExecutionContext $context, bool $withStop = true)
- {
- $identifierPrefix = $context->getParameter('etl.identifier');
-
- $count = 1;
- foreach ($items as $item) {
- $context->setLoggerContext(self::KEY_LOGGER_ETL_IDENTIFIER, $identifierPrefix . $count++);
+ public function processGenerator(
+ \Iterator|ItemInterface $item,
+ ExecutionContext $context,
+ ?callable $observerCallback = null,
+ bool $withStop = true,
+ bool $allowAsynchronous = true
+ ): \Generator {
+ $context->getLogger()->info("Starting etl process!");
+ $this->initObserver($observerCallback);
- if ($item instanceof ItemInterface) {
- $dataItem = $item;
- } else {
- $dataItem = new DataItem($item);
- }
- $this->processItemWithChain($dataItem, $startAt, $context);
+ $originalMaxAsynchronousItems = $this->maxAsynchronousItems;
+ if (!$allowAsynchronous) {
+ $this->maxAsynchronousItems = 0;
}
-
- $stopItem = new StopItem();
- if ($withStop) {
- $stopItem = new StopItem(true);
- $context->setLoggerContext(self::KEY_LOGGER_ETL_IDENTIFIER, $identifierPrefix . 'STOP');
- while ($this->processItemWithChain($stopItem, $startAt, $context) !== $stopItem) {
- // Executing stop until the system stops.
- }
+ if ($item instanceof \Iterator) {
+ $item = new GroupedItem($item);
}
- return $stopItem;
- }
-
- public function processItemWithChain(
- ItemInterface $item,
- int $startAt,
- ExecutionContext $context,
- ?callable $observerCallback = null
- ): ItemInterface {
- $this->initObserver($observerCallback);
+ foreach($this->processItemAt($item, $context, 0) as $newItem) {
+ yield $newItem;
+ }
- if ($item instanceof StopItem) {
- $this->endAllAsyncOperations();
+ // Finalise all remaining asynchronous items.
+ foreach ($this->handleAsyncItems(0) as $newItem) {
+ yield $newItem;
}
- for ($chainNumber = $startAt; $chainNumber < count($this->chainLinks); $chainNumber++) {
- $item = $this->processItemWithOperation($item, $chainNumber, $context);
- $item = $this->processItem($item, $chainNumber, $context);
+ if ($withStop) {
+ $stopItem = new StopItem();
+ $newItem = $stopItem;
+ do {
+ foreach ($this->processItemAt($stopItem, $context, 0) as $newItem) {
+ yield $newItem;
+ }
+ } while ($newItem !== $stopItem);
}
- return $item;
+ $this->maxAsynchronousItems = $originalMaxAsynchronousItems;
}
- public function processItem(ItemInterface $item, int $chainNumber, ExecutionContext $context): ItemInterface
+ protected function processItemAt(ItemInterface $item, ExecutionContext $context, int $chainNumber = 0): \Generator
{
- $this->processAsyncOperations();
-
- if ($item instanceof AsyncItemInterface && $this->maxAsynchronousItems === 0) {
- while ($item->isRunning()) {
- usleep(1000);
+ if ($chainNumber > $this->chainEnd) {
+ // End chain !!
+ if ($item instanceof GroupedItemInterface) {
+ foreach ($this->getItemsFromGroupItem($item) as $newItem) {
+ yield $newItem;
+ }
}
- $item = $item->getItem();
- }
- if ($item instanceof AsyncItemInterface) {
- while (count($this->asyncItems) >= $this->maxAsynchronousItems) {
- usleep(1000);
- $this->processAsyncOperations();
+ elseif ($item instanceof MixItem) {
+ foreach ($item->getItems() as $item) {
+ foreach ($this->processItemAt($item, $context, $chainNumber) as $newItem) {
+ yield $newItem;
+ }
+ }
+ } elseif ($item instanceof AsyncItemInterface) {
+ $this->asyncItems[] = ['item' => $item, 'context' => $context, 'chain_number' => $chainNumber];
+ $newItem = $this->handleAsyncItems();
+ foreach($newItem as $resultItem) {
+ yield $resultItem;
+ }
+ }else {
+ yield $item;
}
- $this->asyncItems[] = [
- 'item' => $item,
- 'context' => $context,
- 'chain_number' => $chainNumber,
- ];
- return new ChainBreakItem();
+ } else if ($item instanceof GroupedItemInterface) {
+ foreach ($this->getItemsFromGroupItem($item) as $groupedItem) {
+ foreach ($this->processItemAt($groupedItem, $context, $chainNumber) as $newItem) {
+ yield $newItem;
+ }
+ }
+ } elseif ($item instanceof ChainBreakItem) {
+ yield $item;
} elseif ($item instanceof MixItem) {
- $context->setLoggerContext(self::KEY_LOGGER_ETL_IDENTIFIER, "chain link:{$this->chainLinkNames[$chainNumber]}-");
-
foreach ($item->getItems() as $mixItem) {
- if ($mixItem instanceof AsyncItemInterface) {
- $item = $this->processItemWithChain($mixItem, $chainNumber, $context);
- } elseif ($mixItem instanceof GroupedItemInterface) {
- $item = $this->processItems($mixItem->getIterator(), $chainNumber + 1, $context, false);
- } else {
- $item = $this->processItemWithChain($mixItem, $chainNumber + 1, $context);
+ foreach($this->processItemAt($mixItem, $context, $chainNumber) as $newItem) {
+ yield $newItem;
}
}
+ } elseif ($item instanceof AsyncItemInterface) {
+ $this->asyncItems[] = ['item' => $item, 'context' => $context, 'chain_number' => $chainNumber];
+ $newItem = $this->handleAsyncItems();
+ foreach($newItem as $resultItem) {
+ yield $resultItem;
+ }
+ } else {
+ $newItem = $this->processItemWithOperation($item, $context, $chainNumber);
+ foreach($this->processItemAt($newItem, $context, $chainNumber + 1) as $resultItem) {
+ yield $resultItem;
+ }
+ }
+ }
- if ($item instanceof StopItem) {
- return $item;
+ protected function getItemsFromGroupItem(GroupedItem $item): \Generator
+ {
+ foreach ($item->getIterator() as $item) {
+ if (!is_object($item)) {
+ $item = new DataItem($item);
+ } elseif (!($item instanceof DataItemInterface)) {
+ $item = new DataItem($item);
}
- return new ChainBreakItem();
- } elseif ($item instanceof GroupedItemInterface) {
- $context->setLoggerContext(self::KEY_LOGGER_ETL_IDENTIFIER, "chain link:{$this->chainLinkNames[$chainNumber]}-");
- return $this->processItems($item->getIterator(), $chainNumber + 1, $context, false);
- } else if ($item instanceof ChainBreakItem) {
- return $item;
+ yield $item;
}
+ }
- return $item;
+ protected function handleAsyncItems(int $maxItems = null): \Generator
+ {
+ if ($maxItems === null) {
+ $maxItems = $this->maxAsynchronousItems;
+ }
+
+ // Start by checking if an item is finished.
+ foreach ($this->checkAsyncItems() as $newItem) {
+ yield $newItem;
+ }
+
+ // If we have too many items in the queue, wait until it improves.
+ while (count($this->asyncItems) != 0 && count($this->asyncItems) >= $maxItems) {
+ usleep(1000);
+ foreach ($this->checkAsyncItems() as $newItem) {
+ yield $newItem;
+ }
+ }
}
- protected function processAsyncOperations()
+ protected function checkAsyncItems(): \Generator
{
$toProcess = [];
@@ -194,49 +203,44 @@ protected function processAsyncOperations()
$newItem = $item['item']->getItem();
// We consider that the process finished only once the async operation is done.
- $this->chainObserver->onAfterProcess($chainNumber, $this->chainLinks[$chainNumber], $newItem);
+ if (isset($this->chainLinks[$chainNumber])) {
+ $this->chainObserver->onAfterProcess($chainNumber, $this->chainLinks[$chainNumber], $newItem);
+ }
unset($this->asyncItems[$id]);
- $toProcess[] = [$newItem, $chainNumber + 1, $item['context']];
+ $toProcess[] = [$newItem, $item['context'], $chainNumber];
}
}
foreach ($toProcess as $arguments) {
- $this->processItemWithChain(...$arguments);
+ foreach ($this->processItemAt(...$arguments) as $newItem) {
+ yield $newItem;
+ }
}
}
- protected function endAllAsyncOperations()
- {
- while (!empty($this->asyncItems)) {
- $this->processAsyncOperations();
- if (!empty($this->asyncItems)) {
- usleep(1000);
- }
- }
- }
- /**
- * Process an item and handle errors during the process.
- *
- * @throws ChainOperationException
- */
- protected function processItemWithOperation(ItemInterface $item, int $chainNumber, ExecutionContext &$context): ItemInterface
+ protected function processItemWithOperation(ItemInterface $item, ExecutionContext $context, int $chainNumber = 0): ItemInterface
{
try {
+ $context->setLoggerContext(self::KEY_LOGGER_ETL_IDENTIFIER, "chain link:{$this->chainLinkNames[$chainNumber]}-");
$this->chainObserver->onBeforeProcess($chainNumber, $this->chainLinks[$chainNumber], $item);
- $result = $this->chainLinks[$chainNumber]->process($item, $context);
- $this->chainObserver->onAfterProcess($chainNumber, $this->chainLinks[$chainNumber], $result);
+ $context->getLogger()->info("Starting etl process!");
- return $result;
+ $newItem = $this->chainLinks[$chainNumber]->process($item, $context);
+
+ $context->getLogger()->info("Finished etl process!");
+ $this->chainObserver->onAfterProcess($chainNumber, $this->chainLinks[$chainNumber], $newItem);
+
+ return $newItem;
} catch (\Exception $exception) {
throw new ChainOperationException(
"An exception was thrown during the handling of the chain link : "
- . "{$this->chainLinkNames[$chainNumber]} "
- . "with the item {$context->getParameter(self::KEY_LOGGER_ETL_IDENTIFIER)}.",
+ . "{$this->chainLinkNames[$chainNumber]} "
+ . "with the item {$context->getParameter(self::KEY_LOGGER_ETL_IDENTIFIER)}.",
0,
$exception,
- $this->chainLinkNames[$chainNumber]
+ (string) $this->chainLinkNames[$chainNumber]
);
}
}
@@ -255,20 +259,4 @@ public function initObserver(?callable $observerCallback = null): ChainObserver
return $this->chainObserver;
}
-
- /**
- * @return ChainOperationInterface[]
- */
- public function getChainLinks(): array
- {
- return $this->chainLinks;
- }
-
- /**
- * @return string[]
- */
- public function getChainLinkNames(): array
- {
- return $this->chainLinkNames;
- }
-}
+}
\ No newline at end of file
diff --git a/src/Oliverde8/Component/PhpEtl/ChainProcessorInterface.php b/src/Oliverde8/Component/PhpEtl/ChainProcessorInterface.php
index 7b197c4..fc99a14 100644
--- a/src/Oliverde8/Component/PhpEtl/ChainProcessorInterface.php
+++ b/src/Oliverde8/Component/PhpEtl/ChainProcessorInterface.php
@@ -4,30 +4,18 @@
namespace Oliverde8\Component\PhpEtl;
-use Oliverde8\Component\PhpEtl\Exception\ChainOperationException;
use Oliverde8\Component\PhpEtl\Item\ItemInterface;
use Oliverde8\Component\PhpEtl\Model\ExecutionContext;
-/**
- * Class ChainProcessorInterface
- *
- * @author de Cramer Oliver
- * @copyright 2018 Oliverde8
- * @package Oliverde8\Component\PhpEtl
- */
interface ChainProcessorInterface
{
- /**
- * Process items.
- *
- * @param \Iterator $items
- * @param array $parameters
- * @throws ChainOperationException
- */
- public function process(\Iterator $items, array $parameters);
+ public function process(\Iterator|ItemInterface $item, array $parameters, ?callable $observerCallback = null, $withStop = true): void;
- /**
- * Process an item, with chains starting at.
- */
- public function processItemWithChain(ItemInterface $item, int $startAt, ExecutionContext $context);
+ public function processGenerator(
+ \Iterator|ItemInterface $item,
+ ExecutionContext $context,
+ ?callable $observerCallback = null,
+ bool $withStop = true,
+ bool $allowAsynchronous = true
+ ): \Generator;
}
diff --git a/src/Oliverde8/Component/PhpEtl/Item/GroupedItem.php b/src/Oliverde8/Component/PhpEtl/Item/GroupedItem.php
index 47a5b87..0d62981 100644
--- a/src/Oliverde8/Component/PhpEtl/Item/GroupedItem.php
+++ b/src/Oliverde8/Component/PhpEtl/Item/GroupedItem.php
@@ -16,11 +16,6 @@ class GroupedItem implements GroupedItemInterface
protected \Iterator $iterator;
- /**
- * GroupedItem constructor.
- *
- * @param $iterator
- */
public function __construct(\Iterator $iterator)
{
$this->iterator = $iterator;
diff --git a/src/Oliverde8/Component/PhpEtl/Model/RepeatOperationIterator.php b/src/Oliverde8/Component/PhpEtl/Model/RepeatOperationIterator.php
deleted file mode 100644
index a5acaf0..0000000
--- a/src/Oliverde8/Component/PhpEtl/Model/RepeatOperationIterator.php
+++ /dev/null
@@ -1,70 +0,0 @@
-initIterator();
- return $this->lastItem;
- }
-
- public function next(): void
- {
- if (!$this->valid()) {
- $this->lastItem = new ChainBreakItem();
- return;
- }
-
- $this->lastItem = $this->chainProcessor->processItemWithChain($this->inputItem, 0, $this->context);
-
- if (!$this->valid()) {
- $this->lastItem = new ChainBreakItem();
- }
- $this->key++;
- }
-
- public function key(): mixed
- {
- $this->initIterator();
-
- return $this->key;
- }
-
- public function valid(): bool
- {
- if ($this->lastItem) {
- return $this->operation->itemIsValid($this->lastItem, $this->context);
- }
- return true;
- }
-
- public function rewind(): void {}
-
- protected function initIterator(): void
- {
- if ($this->lastItem === null) {
- $this->key = 0;
- $this->next();
- }
- }
-}
diff --git a/src/Oliverde8/Component/PhpEtl/Tests/ChainOperation/ChainRepeatOperationTest.php b/src/Oliverde8/Component/PhpEtl/Tests/ChainOperation/ChainRepeatOperationTest.php
new file mode 100644
index 0000000..0016919
--- /dev/null
+++ b/src/Oliverde8/Component/PhpEtl/Tests/ChainOperation/ChainRepeatOperationTest.php
@@ -0,0 +1,118 @@
+ $callNum++]);
+ });
+ $endOperation = new CallbackTransformerOperation(function (ItemInterface $item) use (&$results) {
+ $results[] = $item->getData();
+ return $item;
+ });
+
+ $chain = $this->createChain([$repeatedOperation], [$endOperation], 'data["val"] != 3');
+ $chain->process(new \ArrayIterator([['var' => 1]]), []);
+
+ $this->assertEquals([['val' => 0], ['val' => 1], ['val' => 2]], $results);
+ }
+
+ public function testRepeatGroupedItem()
+ {
+ $callNum = 0;
+ $results = [];
+ $repeatedOperation = new CallbackTransformerOperation(function (ItemInterface $item) use (&$callNum) {
+ return new GroupedItem(new \ArrayIterator([['val' => $callNum++], ['val' => $callNum++]]));
+ });
+ $endOperation = new CallbackTransformerOperation(function (ItemInterface $item) use (&$results) {
+ if ($item instanceof DataItem) {
+ $results[] = $item->getData();
+ }
+ return $item;
+ });
+
+ $chain = $this->createChain([$repeatedOperation], [$endOperation], 'data["val"] != 3');
+ $chain->process(new \ArrayIterator([['var' => 1]]), []);
+ $this->assertEquals([['val' => 0], ['val' => 1], ['val' => 2]], $results);
+ }
+
+ public function testAsyncDisabled()
+ {
+ $results = [];
+ $callNum = 0;
+ $repeatedOperation = new CallbackTransformerOperation(function (ItemInterface $item) use (&$callNum) {
+ return new MixItem([
+ new TestAsyncItem(new DataItem(['val' => $callNum++, 'speed' => 'slow']), 2),
+ new TestAsyncItem(new DataItem(['val' => $callNum++, 'speed' => 'fast']), 1),
+ ]);
+ });
+ $endOperation = new CallbackTransformerOperation(function (ItemInterface $item) use (&$results) {
+ $results[] = $item->getData();
+ return $item;
+ });
+
+ $chain = $this->createChain([$repeatedOperation], [$endOperation], 'data["val"] != 3');
+ $chain->process(new \ArrayIterator([['var' => 1]]), []);
+
+ // Fast items will need to wait for the slow items as we do not allow asynchronous execution inside this repeat.
+ $this->assertEquals([['val' => 0, 'speed' => 'slow'], ['val' => 1, 'speed' => 'fast'], ['val' => 2, 'speed' => 'slow']], $results);
+ }
+
+ public function testAsyncEnabled()
+ {
+ $results = [];
+ $callNum = 0;
+ $repeatedOperation = new CallbackTransformerOperation(function (ItemInterface $item) use (&$callNum) {
+ return new MixItem([
+ new TestAsyncItem(new DataItem(['val' => $callNum++, 'speed' => 'slow']), 2),
+ new TestAsyncItem(new DataItem(['val' => $callNum++, 'speed' => 'fast']), 1),
+ ]);
+ });
+ $endOperation = new CallbackTransformerOperation(function (ItemInterface $item) use (&$results) {
+ $results[] = $item->getData();
+ return $item;
+ });
+
+ $chain = $this->createChain([$repeatedOperation], [$endOperation], 'data["val"] != 5', true);
+ $chain->process(new \ArrayIterator([['var' => 1]]), []);
+
+ // With asynchronous execution allowed, fast items are returned before the slow items of the same batch.
+ $this->assertEquals([
+ ['val' => 1, 'speed' => 'fast'],
+ ['val' => 0, 'speed' => 'slow'],
+ ['val' => 3, 'speed' => 'fast'],
+ ['val' => 2, 'speed' => 'slow'],
+ ['val' => 4, 'speed' => 'slow'], // The second fast is not returned as our condition blocks it.
+ ], $results);
+ }
+
+ protected function createChain(array $repeatedOperations, array $afterOperations, string $expression, bool $allowAsync = false): ChainProcessor
+ {
+ $executionFactory = new ExecutionContextFactory();
+ $repeatOperation = new ChainRepeatOperation(
+ new ChainProcessor($repeatedOperations, $executionFactory),
+ $expression,
+ $allowAsync,
+ );
+
+ array_unshift($afterOperations, $repeatOperation);
+ return new ChainProcessor($afterOperations, $executionFactory);
+ }
+}
\ No newline at end of file
diff --git a/src/Oliverde8/Component/PhpEtl/Tests/ChainOperation/ChainSplitOperationTest.php b/src/Oliverde8/Component/PhpEtl/Tests/ChainOperation/ChainSplitOperationTest.php
index 940abb0..025be19 100644
--- a/src/Oliverde8/Component/PhpEtl/Tests/ChainOperation/ChainSplitOperationTest.php
+++ b/src/Oliverde8/Component/PhpEtl/Tests/ChainOperation/ChainSplitOperationTest.php
@@ -14,6 +14,7 @@
use Oliverde8\Component\PhpEtl\Item\DataItem;
use Oliverde8\Component\PhpEtl\Item\StopItem;
use Oliverde8\Component\PhpEtl\Model\ExecutionContext;
+use PHPUnit\Framework\MockObject\MockObject;
use PHPUnit\Framework\TestCase;
class ChainSplitOperationTest extends TestCase
@@ -34,8 +35,8 @@ public function testDataProcessing()
foreach ($processors as $i => $processor) {
$processor->expects($this->exactly(2))
- ->method('processItemWithChain')
- ->withConsecutive([$datas[0], 0, $context], [$datas[1], 0, $context]);
+ ->method('processGenerator')
+ ->withConsecutive([$datas[0], $context], [$datas[1], $context]);
}
@@ -48,32 +49,35 @@ public function testStopProcess()
{
$stopItem = new StopItem();
- /** @var \PHPUnit_Framework_MockObject_MockObject[] $processors */
+ /** @var MockObject[] $processors */
$processors = [
$this->getMockBuilder(ChainProcessorInterface::class)->getMock(),
$this->getMockBuilder(ChainProcessorInterface::class)->getMock(),
];
$context = $this->getMockBuilder(ExecutionContext::class)->disableOriginalConstructor()->getMock();
- $datas = [
- new DataItem(['test-1']),
- new DataItem(['test-2']),
- ];
+ $datas = [new DataItem(['test-1']), new DataItem(['test-2'])];
$processors[0]
->expects($this->exactly(3))
- ->method('processItemWithChain')
- ->withConsecutive([$datas[0], 0, $context], [$stopItem, 0, $context], [$stopItem, 0, $context])
- ->willReturnOnConsecutiveCalls($datas[0], $datas[1], $stopItem);
+ ->method('processGenerator')
+ ->withConsecutive([$datas[0], $context], [$datas[1], $context], [$stopItem,$context, null, true]);
$processors[1]
->expects($this->exactly(3))
- ->method('processItemWithChain')
- ->withConsecutive([$datas[0], 0, $context], [$stopItem, 0, $context], [$stopItem, 0, $context])
- ->willReturnOnConsecutiveCalls($datas[0], $stopItem, $stopItem);
+ ->method('processGenerator')
+ ->withConsecutive([$datas[0], $context], [$datas[1], $context], [$stopItem, $context, null, true]);
$splitOperation = new ChainSplitOperation($processors);
$splitOperation->process($datas[0], $context);
+ $splitOperation->process($datas[1], $context);
while ($splitOperation->process($stopItem, $context) !== $stopItem);
}
+
+    /**
+     * Exposes the values of an array through a generator.
+     *
+     * Only the values are yielded, in array order; the original keys are not
+     * forwarded (the generator's keys are 0, 1, 2, ...).
+     *
+     * @param array $array Values to yield.
+     *
+     * @return \Generator
+     */
+    protected function arrayAsGenerator(array $array)
+    {
+        // array_values() drops the source keys so "yield from" produces the same
+        // sequential keys as yielding each value individually.
+        yield from array_values($array);
+    }
}
diff --git a/src/Oliverde8/Component/PhpEtl/Tests/ChainOperation/FailSafeOperationTest.php b/src/Oliverde8/Component/PhpEtl/Tests/ChainOperation/FailSafeOperationTest.php
new file mode 100644
index 0000000..6d817df
--- /dev/null
+++ b/src/Oliverde8/Component/PhpEtl/Tests/ChainOperation/FailSafeOperationTest.php
@@ -0,0 +1,78 @@
+ $callNum++]);
+ }
+ if ($callNum == 1) {
+ $callNum++;
+ throw new \Exception("Exception at $callNum");
+ }
+ });
+ $endOperation = new CallbackTransformerOperation(function (ItemInterface $item) use (&$results) {
+ $results[] = $item->getData();
+ return $item;
+ });
+
+ $chain = $this->createChain([$failOperation], [$endOperation]);
+ $chain->process(new \ArrayIterator([['var' => 1],['var' => 2]]), []);
+
+ $this->assertEquals([['val' => 0], ['val' => 2]], $results);
+ }
+
+    /**
+     * The wrapped operation throws on every call: processing must end with a
+     * ChainOperationException whose previous exception is the one thrown on the
+     * second call, and no item may reach the end operation.
+     */
+    public function testToManyFail()
+    {
+        $callNum = 0;
+        $results = [];
+
+        // Always fails; the call counter is part of the message so the test can tell which call surfaced.
+        $alwaysFailing = new CallbackTransformerOperation(function (ItemInterface $item) use (&$callNum) {
+            ++$callNum;
+            throw new \Exception("Exception at $callNum");
+        });
+        // Records whatever makes it past the fail-safe operation.
+        $collector = new CallbackTransformerOperation(function (ItemInterface $item) use (&$results) {
+            $results[] = $item->getData();
+            return $item;
+        });
+        $chain = $this->createChain([$alwaysFailing], [$collector]);
+
+        $caught = null;
+        try {
+            $chain->process(new \ArrayIterator([['var' => 1], ['var' => 2]]), []);
+        } catch (\Exception $exception) {
+            $caught = $exception;
+        }
+
+        $this->assertInstanceOf(ChainOperationException::class, $caught);
+        $previous = $caught->getPrevious();
+        $this->assertInstanceOf(\Exception::class, $previous);
+        $this->assertEquals("Exception at 2", $previous->getMessage());
+        $this->assertEquals([], $results);
+    }
+
+    /**
+     * Builds the chain under test: a FailSafeOperation wrapping the given operations,
+     * followed by $afterOperations.
+     *
+     * The fail-safe operation is created with [\Exception::class] and the value 2
+     * (presumably the handled exception types and the number of attempts; confirm against FailSafeOperation).
+     *
+     * @param array $failSafeOperation Operations given to the inner ChainProcessor wrapped by the fail-safe operation.
+     * @param array $afterOperations   Operations placed after the fail-safe operation in the outer chain.
+     *
+     * @return ChainProcessor The outer chain, with the fail-safe operation as its first operation.
+     */
+    protected function createChain(array $failSafeOperation, array $afterOperations): ChainProcessor
+    {
+        // Inner and outer chains share the same context factory instance.
+        $contextFactory = new ExecutionContextFactory();
+        $guardedChain = new ChainProcessor($failSafeOperation, $contextFactory);
+        $failSafe = new FailSafeOperation($guardedChain, [\Exception::class], 2);
+
+        // Prepend the fail-safe operation; numeric keys are renumbered, string keys are kept.
+        return new ChainProcessor(array_merge([$failSafe], $afterOperations), $contextFactory);
+    }
+}
\ No newline at end of file
diff --git a/src/Oliverde8/Component/PhpEtl/Tests/ChainProcessorTest.php b/src/Oliverde8/Component/PhpEtl/Tests/ChainProcessorTest.php
index 3096959..4972f8b 100644
--- a/src/Oliverde8/Component/PhpEtl/Tests/ChainProcessorTest.php
+++ b/src/Oliverde8/Component/PhpEtl/Tests/ChainProcessorTest.php
@@ -8,6 +8,8 @@
namespace Oliverde8\Component\PhpEtl\Tests;
+use Oliverde8\Component\PhpEtl\Model\ExecutionContext;
+use Oliverde8\Component\PhpEtl\Model\File\LocalFileSystem;
use Oliverde8\Component\PhpEtl\Tests\Item\TestAsyncItem;
use Oliverde8\Component\PhpEtl\ChainOperation\Grouping\SimpleGroupingOperation;
use Oliverde8\Component\PhpEtl\ChainOperation\Transformer\CallbackTransformerOperation;
@@ -199,16 +201,40 @@ public function testAsyncItems()
return $item;
});
- $chainProcessor = new ChainProcessor(
- ['mock' => $mock, 'mocked_end' => $mockEnd],
- new ExecutionContextFactory()
- );
+ $chainProcessor = new ChainProcessor(['mock' => $mock, 'mocked_end' => $mockEnd], new ExecutionContextFactory());
$chainProcessor->process(new \ArrayIterator([1, 2]), ['toto']);
// Order of items changes
$this->assertEquals(["I am fast", "I am slow"], $results);
}
+    /**
+     * Iterates over ChainProcessor::processGenerator() and checks the order in which
+     * DataItems are yielded: the first call produces the 'I am slow' async item and
+     * the second the 'I am fast' one, yet 'I am fast' is expected to be yielded first.
+     */
+    public function testReturnOfAsyncItems()
+    {
+        $callNum = 0;
+        // Initialised explicitly so the final assertion compares against an array
+        // even if the generator yields no DataItem.
+        $results = [];
+
+        $mock = new CallbackTransformerOperation(function (ItemInterface $item) use (&$callNum) {
+            $callNum++;
+            if ($callNum === 1) {
+                return new TestAsyncItem(new DataItem('I am slow'), 2);
+            }
+
+            return new TestAsyncItem(new DataItem('I am fast'), 1);
+        });
+        // Pass-through operation: results are collected from the generator below, not here.
+        $mockEnd = new CallbackTransformerOperation(function (ItemInterface $item) {
+            return $item;
+        });
+
+        $chainProcessor = new ChainProcessor(['mock' => $mock, 'mocked_end' => $mockEnd], new ExecutionContextFactory());
+        $context = new ExecutionContext([], new LocalFileSystem());
+        foreach ($chainProcessor->processGenerator(new \ArrayIterator([1, 2]), $context) as $item) {
+            // Only DataItems are recorded; any other item type yielded by the generator is ignored.
+            if ($item instanceof DataItem) {
+                $results[] = $item->getData();
+            }
+        }
+
+        // Order of items changes
+        $this->assertEquals(["I am fast", "I am slow"], $results);
+    }
+
+
public function testMaxAsyncItems()
{
$results = [];
@@ -219,7 +245,7 @@ public function testMaxAsyncItems()
if ($callNum == 1) {
return new TestAsyncItem(new DataItem('I am slow1'), 2);
} elseif ($callNum == 2) {
- return new TestAsyncItem(new DataItem('I am slow2'), 3);
+ return new TestAsyncItem(new DataItem('I am slow2'), 4);
} else {
return new TestAsyncItem(new DataItem('I am speed'), 1);
}