Skip to content

Commit

Permalink
Simplified caching pipeline batch size configuration (#1239)
Browse files Browse the repository at this point in the history
  • Loading branch information
norberttech authored Sep 29, 2024
1 parent 243f2df commit aa124c4
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 50 deletions.
2 changes: 0 additions & 2 deletions src/core/etl/src/Flow/ETL/Config/Cache/CacheConfig.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,10 @@ final class CacheConfig
public const CACHE_DIR_ENV = 'FLOW_LOCAL_FILESYSTEM_CACHE_DIR';

/**
* @param int<1, max> $cacheBatchSize
* @param int<1, max> $externalSortBucketsCount
*/
public function __construct(
public readonly Cache $cache,
public readonly int $cacheBatchSize,
public readonly Path $localFilesystemCacheDir,
public readonly int $externalSortBucketsCount,
) {
Expand Down
25 changes: 3 additions & 22 deletions src/core/etl/src/Flow/ETL/Config/Cache/CacheConfigBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,15 @@
namespace Flow\ETL\Config\Cache;

use function Flow\Filesystem\DSL\protocol;
use Flow\ETL\Cache;
use Flow\ETL\Cache\{Implementation\FilesystemCache};
use Flow\ETL\Exception\{InvalidArgumentException, RuntimeException};
use Flow\Filesystem\{FilesystemTable, Path};
use Flow\Serializer\Serializer;

final class CacheConfigBuilder
{
private ?\Flow\ETL\Cache $cache = null;

/**
* @var int<1, max>
*/
private int $cacheBatchSize = 100;
private ?Cache $cache = null;

/**
* @var int<1, max>
Expand Down Expand Up @@ -46,33 +42,18 @@ public function build(FilesystemTable $fstab, Serializer $serializer) : CacheCon
$serializer,
cacheDir: Path::realpath($cachePath)
),
cacheBatchSize: $this->cacheBatchSize,
localFilesystemCacheDir: Path::realpath($cachePath),
externalSortBucketsCount: $this->externalSortBucketsCount
);
}

public function cache(\Flow\ETL\Cache $cache) : self
public function cache(Cache $cache) : self
{
$this->cache = $cache;

return $this;
}

/**
* @param int<1, max> $cacheBatchSize
*/
public function cacheBatchSize(int $cacheBatchSize) : self
{
if ($cacheBatchSize < 1) {
throw new InvalidArgumentException('Cache batch size must be greater than 0');
}

$this->cacheBatchSize = $cacheBatchSize;

return $this;
}

/**
* @param int<1, max> $externalSortBucketsCount
*/
Expand Down
13 changes: 0 additions & 13 deletions src/core/etl/src/Flow/ETL/Config/ConfigBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
use function Flow\Filesystem\DSL\fstab;
use Flow\ETL\Config\Cache\CacheConfigBuilder;
use Flow\ETL\Config\Sort\SortConfigBuilder;
use Flow\ETL\Exception\InvalidArgumentException;
use Flow\ETL\Filesystem\FilesystemStreams;
use Flow\ETL\Monitoring\Memory\Unit;
use Flow\ETL\PHP\Type\Caster;
Expand Down Expand Up @@ -84,18 +83,6 @@ public function cache(Cache $cache) : self
return $this;
}

/**
* @param int<1, max> $cacheBatchSize
*
* @throws InvalidArgumentException
*/
public function cacheBatchSize(int $cacheBatchSize) : self
{
$this->cache->cacheBatchSize($cacheBatchSize);

return $this;
}

public function dontPutInputIntoRows() : self
{
$this->putInputIntoRows = false;
Expand Down
11 changes: 9 additions & 2 deletions src/core/etl/src/Flow/ETL/DataFrame.php
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,10 @@ public function batchSize(int $size) : self
* Cache type can be set through ConfigBuilder.
* By default everything is cached in system tmp dir.
*
* Important: cache batch size might significantly improve performance when processing large amount of rows.
* Larger batch size will increase memory consumption but will reduce number of IO operations.
* When not set, the batch size is taken from the last DataFrame::batchSize() call.
*
* @lazy
*
* @param null|string $id
Expand All @@ -183,8 +187,11 @@ public function cache(?string $id = null, ?int $cacheBatchSize = null) : self
throw new InvalidArgumentException('Cache batch size must be greater than 0');
}

$this->batchSize($cacheBatchSize ?? $this->context->config->cache->cacheBatchSize);
$this->pipeline = new LinkedPipeline(new CachingPipeline($this->pipeline, $id));
if ($cacheBatchSize) {
$this->pipeline = new LinkedPipeline(new CachingPipeline(new BatchingPipeline($this->pipeline, $cacheBatchSize), $id));
} else {
$this->pipeline = new LinkedPipeline(new CachingPipeline($this->pipeline, $id));
}

return $this;
}
Expand Down
9 changes: 0 additions & 9 deletions src/core/etl/src/Flow/ETL/Pipeline/PipelineMessage.php

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@

namespace Flow\ETL\Tests\Integration\DataFrame;

use function Flow\ETL\DSL\{config_builder, df, from_cache};
use function Flow\ETL\DSL\{config_builder, df, from_array, from_cache};
use Flow\ETL\Cache\CacheIndex;
use Flow\ETL\Cache\Implementation\InMemoryCache;
use Flow\ETL\Tests\Double\FakeExtractor;
use Flow\ETL\Tests\Integration\IntegrationTestCase;
use Flow\ETL\{Extractor, FlowContext};
use Flow\ETL\{Extractor, FlowContext, Rows};

final class CacheTest extends IntegrationTestCase
{
Expand Down Expand Up @@ -57,4 +57,57 @@ public function extract(FlowContext $context) : \Generator
self::assertEquals(1, $spyExtractor->extractions);
self::assertFalse($cache->has('test_etl_cache'));
}

public function test_cache_with_previously_set_batch_size() : void
{
$cache = new InMemoryCache();

df(config_builder()->cache($cache))
->read(
from_array(\array_map(
fn (int $i) => ['id' => $i],
\range(1, 100)
))
)
->batchSize(20)
->cache('test')
->run();

/** @var CacheIndex $cacheIndex */
$cacheIndex = $cache->get('test');

self::assertCount(5, $cacheIndex->values());

foreach ($cacheIndex->values() as $index => $cacheRowsKey) {
$rows = $cache->get($cacheRowsKey);
self::assertInstanceOf(Rows::class, $rows);
self::assertCount(20, $rows);
}
}

public function test_cache_without_previously_set_batch_size() : void
{
$cache = new InMemoryCache();

df(config_builder()->cache($cache))
->read(
from_array(\array_map(
fn (int $i) => ['id' => $i],
\range(1, 100)
))
)
->cache('test')
->run();

/** @var CacheIndex $cacheIndex */
$cacheIndex = $cache->get('test');

self::assertCount(100, $cacheIndex->values());

foreach ($cacheIndex->values() as $index => $cacheRowsKey) {
$rows = $cache->get($cacheRowsKey);
self::assertInstanceOf(Rows::class, $rows);
self::assertCount(1, $rows);
}
}
}

0 comments on commit aa124c4

Please sign in to comment.