Skip to content

Commit

Permalink
feat: 🌟 Version 2.0 🌟 - Rewrite Chain Processor
Browse files Browse the repository at this point in the history
- This optimizes the chain processor as the old version relied on a lot of StopItems
- Allows ChainProcessor to output through generators the items. This is great to remove all limitations of the current sub chains.
- ChainRepeatOperation now can handle more complex cases & asynchronous tasks.
  • Loading branch information
oliverde8 committed Nov 6, 2024
1 parent 8dc2c90 commit 3fcbcf7
Show file tree
Hide file tree
Showing 22 changed files with 522 additions and 290 deletions.
16 changes: 16 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
# 🌟 2.0.0 🌟

- :star2: - Complete rewrite of the ChainProcessor
- :star2: - Optimized chain processor not to send as many stop items when multiple chains are involved
- :star2: - Allows ChainProcessor to output through generators the items at the end. This is great to remove all limitations of the current sub chains.
- :star2: - ChainRepeatOperation now can handle more complex cases & asynchronous tasks. The operation is not considered as experimental anymore.
- :star2: - Added FailSafe operation allowing to execute sub-chains and catching exception in them.

- -:exclamation: **Deprecation** The chainProcessorInterface was changed significantly.
- This means any complex custom operations using the chainProcessor needs to be redone. => This shouldn't affect any load/extract or transform operation.
- This means any integrations using the ETL also needs to be updated.
- For context, this change was not done lightly, the ETL was initially developed using php5.5. The php language
has evolved and some complex cases the ETL didn't handle well could be easily fixed by using the new features of
the language (mostly using Generators).
- -:exclamation: **Deprecation** Support for symfony 4 and 5 was dropped.

# 1.2.0
- :star2: Feature #14 - Added possibility to create subchains.
- :star2: Feature #34 - Allow chain's to be observed to see progress.
Expand Down
15 changes: 8 additions & 7 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,18 @@
"license": "MIT",
"description": "A small etl coded in php, to extract/transform/load Edit with ease.",
"require": {
"php": ">=8.1",
"php": ">=8.2",
"oliverde8/associative-array-simplified": "^1.0",
"psr/log": "^1.0|^2.0|^3.0",
"symfony/expression-language": "^4.3|^5.0|^6.0|^7.0",
"symfony/filesystem": "^5.4|^6.0|^7.0",
"symfony/validator": "^4.3|^5.0|^6.0|^7.0",
"symfony/yaml": "^4.3|^5.0|^6.0|^7.0"
"psr/log": "^2.0|^3.0",
"symfony/expression-language": "^6.0|^7.0",
"symfony/filesystem": "^6.0|^7.0",
"symfony/validator": "^6.0|^7.0",
"symfony/yaml": "^6.0|^7.0"
},
"require-dev": {
"phpunit/php-code-coverage": "^9.2",
"phpunit/phpunit": "9.4.*"
"phpunit/phpunit": "^9.6",
"rector/rector": "^1.2"
},
"suggest": {
"league/flysystem": "For transfaring files from external file systems",
Expand Down
2 changes: 1 addition & 1 deletion old-docs/examples/00-describe/01-csv-transform.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
$symfonyOutput = new \Oliverde8\Component\PhpEtl\Output\SymfonyConsoleOutput($output);

$chainProcessor->process(
new ArrayIterator([getProcessFilePath(__DIR__, "/customers.csv")]),
new ArrayIterator(["/customers.csv"]),
[],
function (array $operationStates, int $itemsProcessed, int $itemsReturned, bool $hasEnded) use ($symfonyOutput) {
$symfonyOutput->output($operationStates, $hasEnded);
Expand Down
2 changes: 1 addition & 1 deletion old-docs/examples/00-describe/02-csv-transform-merge.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@
$chainProcessor = getChainProcessor(__DIR__ . '/01-csv-transform.yml');

$chainProcessor->process(
new ArrayIterator([__DIR__ . "/customers.csv", __DIR__ . "/customers2.csv"]),
new ArrayIterator(["/customers.csv", "/customers2.csv"]),
[]
);
2 changes: 1 addition & 1 deletion old-docs/examples/00-describe/03-json-grouped-merge.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@
$chainProcessor = getChainProcessor(__FILE__);

$chainProcessor->process(
new ArrayIterator([__DIR__ . "/customers.csv", __DIR__ . "/customers2.csv"]),
new ArrayIterator(["/customers.csv", "/customers2.csv"]),
[]
);
2 changes: 1 addition & 1 deletion old-docs/examples/00-describe/04-csv-filter.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@
$chainProcessor = getChainProcessor(__FILE__);

$chainProcessor->process(
new ArrayIterator([__DIR__ . "/customers.csv", __DIR__ . "/customers2.csv"]),
new ArrayIterator(["/customers.csv", "/customers2.csv"]),
[]
);
2 changes: 1 addition & 1 deletion old-docs/examples/00-describe/06-csv-transform.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@
$chainProcessor = getChainProcessor(__FILE__, $context);

$chainProcessor->process(
new ArrayIterator([__DIR__ . "/customers.csv"]),
new ArrayIterator(["/customers.csv"]),
$context
);
2 changes: 1 addition & 1 deletion old-docs/examples/00-describe/07-json-transform.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
$chainProcessor = getChainProcessor(__FILE__);

$chainProcessor->process(
new ArrayIterator([__DIR__ . "/products.json"]),
new ArrayIterator(["/products.json"]),
[
'locales' => ['fr_FR', 'en_US']
]
Expand Down
1 change: 1 addition & 0 deletions phpunit.xml.dist
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
<env name="APP_DEBUG" value="1" />
<env name="APP_SECRET" value="s$cretf0rt3st" />
<env name="SHELL_VERBOSITY" value="-1" />
<env name="XDEBUG_MODE" value="coverage" />
<!-- define your env variables for the test env here -->
</php>

Expand Down
14 changes: 14 additions & 0 deletions rector.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?php

declare(strict_types=1);

use Rector\Config\RectorConfig;

return RectorConfig::configure()
->withPaths([
__DIR__ . '/old-docs',
__DIR__ . '/src',
])
// uncomment to reach your current PHP version
// ->withPhpSets()
->withTypeCoverageLevel(0);
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ public function __construct(string $operation, string $class, protected readonly

public function build(string $operation, array $options): ChainOperationInterface
{
$maxAsync = $options['maxASynchronousItems'] ?? 0;
$allowAsync = $maxAsync > 0;

// Do not allow
$chainProcessor = $this->builder->buildChainProcessor($options['chain'],[], 0);
return $this->create($chainProcessor, $options['validationExpr']);
$chainProcessor = $this->builder->buildChainProcessor($options['chain'],[], $maxAsync);
return $this->create($chainProcessor, $options['validationExpr'], $allowAsync);
}

protected function configureValidator(): Constraint
Expand All @@ -33,6 +36,9 @@ protected function configureValidator(): Constraint
new Assert\Type(["type" => "string"]),
new Assert\NotBlank(),
],
'maxASynchronousItems' => [
new Assert\Type(["type" => "int"]),
]
]);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,11 @@

namespace Oliverde8\Component\PhpEtl\ChainOperation;

use oliverde8\AssociativeArraySimplified\AssociativeArray;
use Oliverde8\Component\PhpEtl\ChainProcessor;
use Oliverde8\Component\PhpEtl\Item\DataItemInterface;
use Oliverde8\Component\PhpEtl\Item\GroupedItem;
use Oliverde8\Component\PhpEtl\Item\ItemInterface;
use Oliverde8\Component\PhpEtl\Model\ExecutionContext;
use Oliverde8\Component\PhpEtl\Model\RepeatOperationIterator;
use Symfony\Component\ExpressionLanguage\ExpressionLanguage;

class ChainRepeatOperation extends AbstractChainOperation implements DetailedObservableOperation
Expand All @@ -20,7 +18,8 @@ class ChainRepeatOperation extends AbstractChainOperation implements DetailedObs

public function __construct(
protected readonly ChainProcessor $chainProcessor,
protected readonly string $validationExpression
protected readonly string $validationExpression,
protected readonly bool $allowAsynchronous = false,
) {
$this->onSplittedChainOperationConstruct([$chainProcessor]);
$this->expressionLanguage = new ExpressionLanguage();
Expand All @@ -29,12 +28,22 @@ public function __construct(
public function processData(DataItemInterface $inputItem, ExecutionContext $context): ItemInterface
{
// Nothing to process.
return new GroupedItem(new RepeatOperationIterator(
$this->chainProcessor,
$inputItem,
$context,
$this
));
return new GroupedItem($this->repeatOnItem($inputItem, $context));
}

public function repeatOnItem(DataItemInterface $inputItem, ExecutionContext $context): \Generator
{
$invalidItem = false;
do {
$item = null;
foreach ($this->chainProcessor->processGenerator($inputItem, $context, withStop: false, allowAsynchronous: $this->allowAsynchronous) as $item) {
if ($this->itemIsValid($item, $context)) {
yield $item;
} else {
$invalidItem = true;
}
}
} while ($item && !$invalidItem);
}

public function itemIsValid(ItemInterface $item, ExecutionContext $context): bool
Expand All @@ -43,7 +52,8 @@ public function itemIsValid(ItemInterface $item, ExecutionContext $context): boo
$values = ['data' => $item->getData(), 'context' => $context];
return $this->expressionLanguage->evaluate($this->validationExpression, $values);
}

return false;

// If not a data, then it's valid.
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public function __construct(array $chainProcessors)
public function processData(DataItemInterface $item, ExecutionContext $context): ItemInterface
{
foreach ($this->chainProcessors as $chainProcessor) {
$chainProcessor->processItemWithChain($item, 0, $context);
foreach ($chainProcessor->processGenerator($item, $context, withStop: false) as $newItem) {}
}

// Nothing to process.
Expand All @@ -51,12 +51,7 @@ public function processData(DataItemInterface $item, ExecutionContext $context):
public function processStop(StopItem $item, ExecutionContext $context): ItemInterface
{
foreach ($this->chainProcessors as $chainProcessor) {
$result = $chainProcessor->processItemWithChain($item, 0, $context);

if ($result !== $item) {
// Return a new stop item in order to continue flushing out data with stop items.
$item = new StopItem();
}
foreach ($chainProcessor->processGenerator($item, $context) as $newItem) {}
}

return $item;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
<?php
declare(strict_types=1);

namespace Oliverde8\Component\PhpEtl\ChainOperation;

use Oliverde8\Component\PhpEtl\ChainProcessor;
use Oliverde8\Component\PhpEtl\Item\DataItemInterface;
use Oliverde8\Component\PhpEtl\Item\GroupedItem;
use Oliverde8\Component\PhpEtl\Item\ItemInterface;
use Oliverde8\Component\PhpEtl\Item\StopItem;
use Oliverde8\Component\PhpEtl\Model\ExecutionContext;

class FailSafeOperation implements ChainOperationInterface, DetailedObservableOperation
{
use SplittedChainOperationTrait;

protected int $count = 0;

public function __construct(
protected readonly ChainProcessor $chainProcessor,
protected readonly array $exceptionsToCatch,
protected readonly int $nbAttempts,
){}

public function process(ItemInterface $item, ExecutionContext $context)
{
if ($item instanceof StopItem) {
foreach ($this->repeatOnItem($item, $context) as $newItem) {}
return $item;
}

return new GroupedItem($this->repeatOnItem($item, $context));
}

public function repeatOnItem(ItemInterface $inputItem, ExecutionContext $context): \Generator
{
$nbAttempts = 0;
do {
try {
foreach ($this->chainProcessor->processGenerator($inputItem, $context, withStop: false) as $newItem) {
yield $newItem;
}
return;
} catch (\Exception $exception) {
$nbAttempts++;
$exceptionHandled = false;
foreach ($this->exceptionsToCatch as $exceptionType) {
if ($exception instanceof $exceptionType) {
$exceptionHandled = true;
}
}

if (!$exceptionHandled || $nbAttempts >= $this->nbAttempts) {
$context->getLogger()->error("Failed to handle exception in fail safe!", ['exception' => $exception]);
throw $exception;
} else {
$context->getLogger()->warning("Handling exception with fail safe!", ['exception' => $exception, 'nbAttempts' => $nbAttempts]);
}
}
} while ($nbAttempts < $this->nbAttempts);
}
}
Loading

0 comments on commit 3fcbcf7

Please sign in to comment.