From ec58afe8f68707ac9da6def3ab7517e2240f55c4 Mon Sep 17 00:00:00 2001 From: Martin Zajic Date: Tue, 12 Nov 2019 16:24:56 +0100 Subject: [PATCH 1/4] use generator to optimize memory --- src/Backend/BackendImportAdapterInterface.php | 6 ++--- .../SnowflakeImportAdapterInterface.php | 4 +-- src/Storage/ABS/SnowflakeImportAdapter.php | 9 +++---- .../Snowflake/SnowflakeImportAdapter.php | 10 ++++---- src/Storage/Snowflake/Table.php | 1 - .../ABS/BaseFileTest.php | 0 .../ABS/DestinationFileTest.php | 0 .../ABS/SnowflakeExportAdapterTest.php | 0 .../ABS/SnowflakeImportAdapterTest.php | 25 ++++++++++++++----- .../ABS/SourceFileTest.php | 0 .../Snowflake/SnowflakeAdapterTest.php | 12 ++++++--- .../Snowflake/TableTest.php | 0 12 files changed, 41 insertions(+), 26 deletions(-) rename tests/unit/{SourceStorage => Storage}/ABS/BaseFileTest.php (100%) rename tests/unit/{SourceStorage => Storage}/ABS/DestinationFileTest.php (100%) rename tests/unit/{SourceStorage => Storage}/ABS/SnowflakeExportAdapterTest.php (100%) rename tests/unit/{SourceStorage => Storage}/ABS/SnowflakeImportAdapterTest.php (92%) rename tests/unit/{SourceStorage => Storage}/ABS/SourceFileTest.php (100%) rename tests/unit/{SourceStorage => Storage}/Snowflake/SnowflakeAdapterTest.php (91%) rename tests/unit/{SourceStorage => Storage}/Snowflake/TableTest.php (100%) diff --git a/src/Backend/BackendImportAdapterInterface.php b/src/Backend/BackendImportAdapterInterface.php index 4cd7880f..a184001b 100644 --- a/src/Backend/BackendImportAdapterInterface.php +++ b/src/Backend/BackendImportAdapterInterface.php @@ -4,6 +4,7 @@ namespace Keboola\Db\ImportExport\Backend; +use Generator; use Keboola\Db\ImportExport\ImportOptions; use Keboola\Db\ImportExport\Storage\DestinationInterface; use Keboola\Db\ImportExport\Storage\SourceInterface; @@ -12,12 +13,9 @@ interface BackendImportAdapterInterface { public function __construct(SourceInterface $source); - /** - * @return string[] - */ public function getCopyCommands( DestinationInterface $destination, ImportOptions $importOptions, string $stagingTableName - ): array; + ): Generator; } diff --git a/src/Backend/Snowflake/SnowflakeImportAdapterInterface.php b/src/Backend/Snowflake/SnowflakeImportAdapterInterface.php index c5f23a37..48a00d8d 100644 --- a/src/Backend/Snowflake/SnowflakeImportAdapterInterface.php +++ b/src/Backend/Snowflake/SnowflakeImportAdapterInterface.php @@ -4,6 +4,7 @@ namespace Keboola\Db\ImportExport\Backend\Snowflake; +use Generator; use Keboola\Db\ImportExport\Backend\ImportState; use Keboola\Db\ImportExport\ImportOptions; use Keboola\Db\ImportExport\Backend\BackendImportAdapterInterface; @@ -15,11 +16,10 @@ interface SnowflakeImportAdapterInterface extends BackendImportAdapterInterface /** * Snowflake import is handled differently for copy table2table and file2table * - * @param string[] $commands - sql commands array * @return int - number of imported rows */ public function executeCopyCommands( - array $commands, + Generator $commands, Connection $connection, DestinationInterface $destination, ImportOptions $importOptions, diff --git a/src/Storage/ABS/SnowflakeImportAdapter.php b/src/Storage/ABS/SnowflakeImportAdapter.php index c7cd89c3..22b34238 100644 --- a/src/Storage/ABS/SnowflakeImportAdapter.php +++ b/src/Storage/ABS/SnowflakeImportAdapter.php @@ -4,6 +4,7 @@ namespace Keboola\Db\ImportExport\Storage\ABS; +use Generator; use Keboola\Csv\CsvOptions; use Keboola\Db\ImportExport\Backend\ImporterInterface; use Keboola\Db\ImportExport\Backend\ImportState; @@ -35,7 +36,7 @@ public function __construct(SourceInterface $source) * @param Table $destination */ public function executeCopyCommands( - array $commands, + Generator $commands, Connection $connection, DestinationInterface $destination, ImportOptions $importOptions, @@ -62,11 +63,10 @@ public function getCopyCommands( DestinationInterface $destination, ImportOptions $importOptions, string $stagingTableName - ): array { + ): Generator { $filesToImport = $this->source->getManifestEntries(); - $commands = []; foreach (array_chunk($filesToImport, ImporterInterface::SLICED_FILES_CHUNK_SIZE) as $entries) { - $commands[] = sprintf( + yield sprintf( 'COPY INTO %s.%s FROM %s CREDENTIALS=(AZURE_SAS_TOKEN=\'%s\') @@ -88,7 +88,6 @@ function ($entry) { ) ); } - return $commands; } private function getCsvCopyCommandOptions( diff --git a/src/Storage/Snowflake/SnowflakeImportAdapter.php b/src/Storage/Snowflake/SnowflakeImportAdapter.php index 3e0be8d2..4a241a6b 100644 --- a/src/Storage/Snowflake/SnowflakeImportAdapter.php +++ b/src/Storage/Snowflake/SnowflakeImportAdapter.php @@ -4,9 +4,9 @@ namespace Keboola\Db\ImportExport\Storage\Snowflake; +use Generator; use Keboola\Db\ImportExport\Backend\ImportState; use Keboola\Db\ImportExport\ImportOptions; -use Keboola\Db\ImportExport\Backend\Snowflake\Helper\QuoteHelper; use Keboola\Db\ImportExport\Backend\Snowflake\SnowflakeImportAdapterInterface; use Keboola\Db\ImportExport\Storage\DestinationInterface; use Keboola\Db\ImportExport\Storage\SourceInterface; @@ -32,14 +32,14 @@ public function __construct(SourceInterface $source) * @param Table $destination */ public function executeCopyCommands( - array $commands, + Generator $commands, Connection $connection, DestinationInterface $destination, ImportOptions $importOptions, ImportState $importState ): int { $importState->startTimer('copyToStaging'); - $connection->query($commands[0]); + $connection->query($commands->current()); $rows = $connection->fetchAll(sprintf( 'SELECT COUNT(*) AS "count" FROM %s.%s', QueryBuilder::quoteIdentifier($destination->getSchema()), @@ -56,7 +56,7 @@ public function getCopyCommands( DestinationInterface $destination, ImportOptions $importOptions, string $stagingTableName - ): array { + ): Generator { $quotedColumns = array_map(function ($column) { return QueryBuilder::quoteIdentifier($column); }, $importOptions->getColumns()); @@ -75,6 +75,6 @@ public function getCopyCommands( QueryBuilder::quoteIdentifier($this->source->getTableName()), ); - return [$sql]; + yield $sql; } } diff --git a/src/Storage/Snowflake/Table.php b/src/Storage/Snowflake/Table.php index 5d738612..e027c77f 100644 --- a/src/Storage/Snowflake/Table.php +++ b/src/Storage/Snowflake/Table.php @@ -8,7 +8,6 @@ use Keboola\Db\ImportExport\Backend\BackendImportAdapterInterface; use Keboola\Db\ImportExport\Backend\ExporterInterface; use Keboola\Db\ImportExport\Backend\ImporterInterface; -use Keboola\Db\ImportExport\Backend\Snowflake\Helper\QuoteHelper; use Keboola\Db\ImportExport\Backend\Snowflake\Importer as SnowflakeImporter; use Keboola\Db\ImportExport\Storage\DestinationInterface; use Keboola\Db\ImportExport\Storage\NoBackendAdapterException; diff --git a/tests/unit/SourceStorage/ABS/BaseFileTest.php b/tests/unit/Storage/ABS/BaseFileTest.php similarity index 100% rename from tests/unit/SourceStorage/ABS/BaseFileTest.php rename to tests/unit/Storage/ABS/BaseFileTest.php diff --git a/tests/unit/SourceStorage/ABS/DestinationFileTest.php b/tests/unit/Storage/ABS/DestinationFileTest.php similarity index 100% rename from tests/unit/SourceStorage/ABS/DestinationFileTest.php rename to tests/unit/Storage/ABS/DestinationFileTest.php diff --git a/tests/unit/SourceStorage/ABS/SnowflakeExportAdapterTest.php b/tests/unit/Storage/ABS/SnowflakeExportAdapterTest.php similarity index 100% rename from tests/unit/SourceStorage/ABS/SnowflakeExportAdapterTest.php rename to tests/unit/Storage/ABS/SnowflakeExportAdapterTest.php diff --git a/tests/unit/SourceStorage/ABS/SnowflakeImportAdapterTest.php b/tests/unit/Storage/ABS/SnowflakeImportAdapterTest.php similarity index 92% rename from tests/unit/SourceStorage/ABS/SnowflakeImportAdapterTest.php rename to tests/unit/Storage/ABS/SnowflakeImportAdapterTest.php index 196a2bba..2febcfa6 100644 --- a/tests/unit/SourceStorage/ABS/SnowflakeImportAdapterTest.php +++ b/tests/unit/Storage/ABS/SnowflakeImportAdapterTest.php @@ -4,6 +4,7 @@ namespace Tests\Keboola\Db\ImportExportUnit\Storage\ABS; +use Generator; use Keboola\Csv\CsvOptions; use Keboola\Db\ImportExport\Backend\ImportState; use Keboola\Db\ImportExport\ImportOptions; @@ -30,9 +31,14 @@ public function testExecuteCopyCommands(): void /** @var ImportOptions|MockObject $options */ $options = self::createMock(ImportOptions::class); + $generator = function (): Generator { + yield 'cmd1'; + yield 'cmd2'; + }; + $adapter = new SnowflakeImportAdapter($source); $rows = $adapter->executeCopyCommands( - ['cmd1', 'cmd2'], + $generator(), $connection, new Storage\Snowflake\Table('', ''), $options, @@ -60,7 +66,7 @@ public function testGetCopyCommands(): void 'stagingTable' ); - self::assertSame([ + self::assertSame( <<current() + ); } public function testGetCopyCommandsChunk(): void @@ -105,7 +111,7 @@ public function testGetCopyCommandsChunk(): void return sprintf("'%s'", $file); }, $cmd2Files)); - self::assertSame([ + self::assertSame( <<current() + ); + + $commands->next(); + + self::assertSame( <<current() + ); } } diff --git a/tests/unit/SourceStorage/ABS/SourceFileTest.php b/tests/unit/Storage/ABS/SourceFileTest.php similarity index 100% rename from tests/unit/SourceStorage/ABS/SourceFileTest.php rename to tests/unit/Storage/ABS/SourceFileTest.php diff --git a/tests/unit/SourceStorage/Snowflake/SnowflakeAdapterTest.php b/tests/unit/Storage/Snowflake/SnowflakeAdapterTest.php similarity index 91% rename from tests/unit/SourceStorage/Snowflake/SnowflakeAdapterTest.php rename to tests/unit/Storage/Snowflake/SnowflakeAdapterTest.php index 0554d10c..70d5964a 100644 --- a/tests/unit/SourceStorage/Snowflake/SnowflakeAdapterTest.php +++ b/tests/unit/Storage/Snowflake/SnowflakeAdapterTest.php @@ -4,6 +4,7 @@ namespace Tests\Keboola\Db\ImportExportUnit\Storage\Snowflake; +use Generator; use Keboola\Db\ImportExport\Backend\ImportState; use Keboola\Db\ImportExport\ImportOptions; use Keboola\Db\ImportExport\Storage\Snowflake\SnowflakeImportAdapter; @@ -26,9 +27,13 @@ public function testExecuteCopyCommands(): void /** @var ImportOptions|MockObject $options */ $options = self::createMock(ImportOptions::class); + $generator = function (): Generator { + yield 'cmd1'; + }; + $adapter = new SnowflakeImportAdapter(new Storage\Snowflake\Table('schema', 'table')); $rows = $adapter->executeCopyCommands( - ['cmd1'], + $generator(), $connection, new Storage\Snowflake\Table('schema', 'table'), $options, @@ -54,8 +59,9 @@ public function testGetCopyCommands(): void 'stagingTable' ); - self::assertSame([ + self::assertSame( 'INSERT INTO "schema"."stagingTable" ("col1", "col2") SELECT "col1", "col2" FROM "schema"."table"', - ], $commands); + $commands->current() + ); } } diff --git a/tests/unit/SourceStorage/Snowflake/TableTest.php b/tests/unit/Storage/Snowflake/TableTest.php similarity index 100% rename from tests/unit/SourceStorage/Snowflake/TableTest.php rename to tests/unit/Storage/Snowflake/TableTest.php From 37c301f6fa595995a1a2feb206632b8e91e082e8 Mon Sep 17 00:00:00 2001 From: Martin Zajic Date: Wed, 13 Nov 2019 11:39:49 +0100 Subject: [PATCH 2/4] use pcrov/jsonreader to stream json file --- Dockerfile | 4 + composer.json | 2 + src/Storage/ABS/SnowflakeImportAdapter.php | 65 ++++-- src/Storage/ABS/SourceFile.php | 12 +- src/Storage/ManifestReader.php | 31 +++ tests/HugeManifest.php | 195 ++++++++++++++++++ tests/testHugeManifest.php | 24 +++ .../ABS/SnowflakeImportAdapterTest.php | 12 +- tests/unit/Storage/ABS/SourceFileTest.php | 8 +- 9 files changed, 323 insertions(+), 30 deletions(-) create mode 100644 src/Storage/ManifestReader.php create mode 100644 tests/HugeManifest.php create mode 100644 tests/testHugeManifest.php diff --git a/Dockerfile b/Dockerfile index 16ead017..0ee09124 100644 --- a/Dockerfile +++ b/Dockerfile @@ -24,6 +24,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ debsig-verify \ dirmngr \ gpg-agent \ + libicu-dev \ && rm -r /var/lib/apt/lists/* \ && sed -i 's/^# *\(en_US.UTF-8\)/\1/' /etc/locale.gen \ && locale-gen \ @@ -64,6 +65,9 @@ RUN mkdir -p ~/.gnupg \ && gpg --batch --delete-key --yes $SNOWFLAKE_GPG_KEY \ && dpkg -i /tmp/snowflake-odbc.deb +RUN docker-php-ext-configure intl \ + && docker-php-ext-install intl + ## Composer - deps always cached unless changed # First copy only composer files COPY composer.* /code/ diff --git a/composer.json b/composer.json index 1dcb443f..4978b4a4 100644 --- a/composer.json +++ b/composer.json @@ -7,6 +7,7 @@ "keboola/db-adapter-snowflake": "^1.0.0", "keboola/php-component": "^7.0.1", "microsoft/azure-storage-blob": "^1.4", + "pcrov/jsonreader": "^1.0", "symfony/stopwatch": "^4.3" }, "require-dev": { @@ -32,6 +33,7 @@ } }, "scripts": { + "testHuge": "php ./tests/testHugeManifest.php", "tests-unit": "phpunit tests/unit", "tests-functional": "phpunit tests/functional", "tests": [ diff --git a/src/Storage/ABS/SnowflakeImportAdapter.php b/src/Storage/ABS/SnowflakeImportAdapter.php index 22b34238..01cd0d36 100644 --- a/src/Storage/ABS/SnowflakeImportAdapter.php +++ b/src/Storage/ABS/SnowflakeImportAdapter.php @@ -64,30 +64,57 @@ public function getCopyCommands( ImportOptions $importOptions, string $stagingTableName ): Generator { - $filesToImport = $this->source->getManifestEntries(); - foreach (array_chunk($filesToImport, ImporterInterface::SLICED_FILES_CHUNK_SIZE) as $entries) { - yield sprintf( - 'COPY INTO %s.%s + $entriesInChunk = []; + foreach ($this->source->getManifestEntries() as $entry) { + $entriesInChunk[] = $entry; + if (count($entriesInChunk) === ImporterInterface::SLICED_FILES_CHUNK_SIZE) { + yield $this->getCopyCommand( + $destination, + $importOptions, + $stagingTableName, + $entriesInChunk + ); + $entriesInChunk = []; + } + } + yield $this->getCopyCommand( + $destination, + $importOptions, + $stagingTableName, + $entriesInChunk + ); + } + + /** + * @param Table $destination + */ + private function getCopyCommand( + DestinationInterface $destination, + ImportOptions $importOptions, + string $stagingTableName, + array $entriesInChunk + ): string { + return sprintf( + 'COPY INTO %s.%s FROM %s CREDENTIALS=(AZURE_SAS_TOKEN=\'%s\') FILE_FORMAT = (TYPE=CSV %s) FILES = (%s)', - QueryBuilder::quoteIdentifier($destination->getSchema()), - QueryBuilder::quoteIdentifier($stagingTableName), - QueryBuilder::quote($this->source->getContainerUrl()), - $this->source->getSasToken(), - implode(' ', $this->getCsvCopyCommandOptions($importOptions, $this->source->getCsvOptions())), - implode( - ', ', - array_map( - function ($entry) { - return QueryBuilder::quote(strtr($entry, [$this->source->getContainerUrl() => ''])); - }, - $entries - ) + QueryBuilder::quoteIdentifier($destination->getSchema()), + QueryBuilder::quoteIdentifier($stagingTableName), + QueryBuilder::quote($this->source->getContainerUrl()), + $this->source->getSasToken(), + implode(' ', $this->getCsvCopyCommandOptions($importOptions, $this->source->getCsvOptions())), + implode( + ', ', + array_map( + function ($entry) { + return QueryBuilder::quote(strtr($entry['url'], [$this->source->getContainerUrl() => ''])); + }, + $entriesInChunk ) - ); - } + ) + ); } private function getCsvCopyCommandOptions( diff --git a/src/Storage/ABS/SourceFile.php b/src/Storage/ABS/SourceFile.php index 5597de9c..23c3f36b 100644 --- a/src/Storage/ABS/SourceFile.php +++ b/src/Storage/ABS/SourceFile.php @@ -4,10 +4,12 @@ namespace Keboola\Db\ImportExport\Storage\ABS; +use Generator; use Keboola\Csv\CsvOptions; use Keboola\Db\ImportExport\Backend\BackendImportAdapterInterface; use Keboola\Db\ImportExport\Backend\ImporterInterface; use Keboola\Db\ImportExport\Backend\Snowflake\Importer as SnowflakeImporter; +use Keboola\Db\ImportExport\Storage\ManifestReader; use Keboola\Db\ImportExport\Storage\NoBackendAdapterException; use Keboola\Db\ImportExport\Storage\SourceInterface; use MicrosoftAzure\Storage\Blob\BlobRestProxy; @@ -54,10 +56,11 @@ public function getCsvOptions(): CsvOptions return $this->csvOptions; } - public function getManifestEntries(): array + public function getManifestEntries(): Generator { if (!$this->isSliced) { - return [$this->getContainerUrl() . $this->filePath]; + yield from [['url' => $this->getContainerUrl() . $this->filePath]]; + return; } $SASConnectionString = sprintf( @@ -74,9 +77,6 @@ public function getManifestEntries(): array ); $getResult = $blobClient->getBlob($this->container, $this->filePath); - $manifest = \GuzzleHttp\json_decode(stream_get_contents($getResult->getContentStream()), true); - return array_map(function ($entry) { - return $entry['url']; - }, $manifest['entries']); + yield from ManifestReader::readEntries($getResult->getContentStream()); } } diff --git a/src/Storage/ManifestReader.php b/src/Storage/ManifestReader.php new file mode 100644 index 00000000..4ff49810 --- /dev/null +++ b/src/Storage/ManifestReader.php @@ -0,0 +1,31 @@ +stream($stream); + + $reader->read('entries'); + $depth = $reader->depth(); // Check in a moment to break when the array is done. + + $reader->read(); // Step to the first element. + do { + yield $reader->value(); + } while ($reader->next() && $reader->depth() > $depth); // Read each sibling. + + $reader->close(); + } +} diff --git a/tests/HugeManifest.php b/tests/HugeManifest.php new file mode 100644 index 00000000..25ceb8cb --- /dev/null +++ b/tests/HugeManifest.php @@ -0,0 +1,195 @@ +accountName = $accountName; + $this->containerName = $containerName; + $this->loader = new AbsLoader($accountName, $containerName); + } + + public function run(): void + { + $this->temp = new Temp(); + $this->temp->initRunFolder(); + + $this->stopwatch = new Stopwatch(); + $this->stopwatch->start('test'); + $this->loader->deleteContainer(); + $this->loader->createContainer(); + + $this->stopwatch->start('upload'); + $this->uploadManifestAndSlices(); + $event = $this->stopwatch->stop('upload'); + echo 'max memory upload: ' . $event->getMemory() . PHP_EOL; + + $this->stopwatch->start('commands'); + $this->generateCommands(); + $event = $this->stopwatch->stop('commands'); + echo 'max memory commands: ' . $this->getMemoryForHuman($event->getMemory()) . PHP_EOL; + + $this->loader->deleteContainer(); + $event = $this->stopwatch->stop('test'); + echo 'max memory: ' . $this->getMemoryForHuman($event->getMemory()) . PHP_EOL; + } + + private function uploadManifestAndSlices(): void + { + $this->printMemory(); + $manifest = $this->openManifestFile(); + + echo 'Generating manifest' . PHP_EOL; + for ($i = 0; $i <= self::SLICES_TOTAL; $i++) { + $sliceName = sprintf('my_awesome_long_name_slice.csv_%d', $i); + fwrite($manifest, sprintf( + '{"url":"%s"}%s' . PHP_EOL, + $this->getAbsUrl($sliceName), + $i === self::SLICES_TOTAL ? '' : ',' + )); + } + + $this->closeManifestFile($manifest); + + echo PHP_EOL; + + echo 'Uploading manifest' . PHP_EOL; + + $this->loader->getBlobService()->createBlockBlob( + $this->containerName, + self::MANIFEST_FILE_NAME, + file_get_contents($this->getManifestFileName()) + ); + + echo sprintf('Manifest file size: %s bytes', filesize($this->getManifestFileName())) . PHP_EOL; + $this->printMemory(); + } + + private function printMemory(): void + { + $memUsage = memory_get_usage(true); + + echo $this->getMemoryForHuman($memUsage); + + echo PHP_EOL; + } + + private function getMemoryForHuman(int $memUsage): string + { + if ($memUsage < 1024) { + return $memUsage . ' bytes'; + } elseif ($memUsage < 1048576) { + return round($memUsage / 1024, 2) . ' kilobytes'; + } else { + return round($memUsage / 1048576, 2) . ' megabytes'; + } + } + + /** + * @return false|resource + */ + private function openManifestFile() + { + file_put_contents($this->getManifestFileName(), '{"entries":[' . PHP_EOL); + return fopen($this->getManifestFileName(), 'a'); + } + + private function getManifestFileName(): string + { + if ($this->manifestFile === null) { + $this->manifestFile = $this->temp->getTmpFolder() . '/' . self::MANIFEST_FILE_NAME; + } + + return $this->manifestFile; + } + + private function getAbsUrl(string $fileName): string + { + return sprintf( + 'azure://%s.%s/%s/%s', + $this->accountName, + Resources::BLOB_BASE_DNS_NAME, + $this->containerName, + $fileName + ); + } + + /** + * @param resource $resource + */ + private function closeManifestFile($resource): void + { + fwrite($resource, ']}'); + fclose($resource); + } + + private function generateCommands(): void + { + $source = new Storage\ABS\SourceFile( + $this->containerName, + self::MANIFEST_FILE_NAME, + $this->getCredentialsForAzureContainer($this->containerName), + $this->accountName, + new CsvOptions, + true + ); + $destination = new Storage\Snowflake\Table('schema', 'table'); + $options = new ImportOptions(); + $adapter = new Storage\ABS\SnowflakeImportAdapter($source); + echo 'Generating commands' . PHP_EOL; + foreach ($adapter->getCopyCommands( + $destination, + $options, + 'stagingTable' + ) as $index => $cmd) { + $this->stopwatch->lap('commands'); + }; + } +} diff --git a/tests/testHugeManifest.php b/tests/testHugeManifest.php new file mode 100644 index 00000000..0861dbec --- /dev/null +++ b/tests/testHugeManifest.php @@ -0,0 +1,24 @@ +run(); diff --git a/tests/unit/Storage/ABS/SnowflakeImportAdapterTest.php b/tests/unit/Storage/ABS/SnowflakeImportAdapterTest.php index 2febcfa6..47ec467f 100644 --- a/tests/unit/Storage/ABS/SnowflakeImportAdapterTest.php +++ b/tests/unit/Storage/ABS/SnowflakeImportAdapterTest.php @@ -53,7 +53,9 @@ public function testGetCopyCommands(): void /** @var Storage\ABS\SourceFile|MockObject $source */ $source = self::createMock(Storage\ABS\SourceFile::class); $source->expects(self::once())->method('getCsvOptions')->willReturn(new CsvOptions()); - $source->expects(self::once())->method('getManifestEntries')->willReturn(['azure://url']); + $source->expects(self::once())->method('getManifestEntries')->willReturnCallback(function () { + yield ['url' => 'azure://url']; + }); $source->expects(self::exactly(2))->method('getContainerUrl')->willReturn('containerUrl'); $source->expects(self::once())->method('getSasToken')->willReturn('sasToken'); @@ -88,7 +90,13 @@ public function testGetCopyCommandsChunk(): void /** @var Storage\ABS\SourceFile|MockObject $source */ $source = self::createMock(Storage\ABS\SourceFile::class); $source->expects(self::exactly(2))->method('getCsvOptions')->willReturn(new CsvOptions()); - $source->expects(self::exactly(1))->method('getManifestEntries')->willReturn($files); + $source->expects(self::exactly(1))->method('getManifestEntries')->willReturnCallback( + function () use ($files) { + foreach ($files as $file) { + yield ['url' => $file]; + } + } + ); $source->expects(self::exactly(1502/*Called for each entry plus 2times*/)) ->method('getContainerUrl')->willReturn('containerUrl'); $source->expects(self::exactly(2))->method('getSasToken')->willReturn('sasToken'); diff --git a/tests/unit/Storage/ABS/SourceFileTest.php b/tests/unit/Storage/ABS/SourceFileTest.php index 22a43d9e..52964a39 100644 --- a/tests/unit/Storage/ABS/SourceFileTest.php +++ b/tests/unit/Storage/ABS/SourceFileTest.php @@ -62,15 +62,17 @@ public function testGetManifestEntries(): void { $source = $this->createDummyABSSourceInstance('empty.csv'); self::assertSame( - ['azure://absAccount.blob.core.windows.net/absContainer/empty.csv'], - $source->getManifestEntries() + [ + ['url' => 'azure://absAccount.blob.core.windows.net/absContainer/empty.csv'], + ], + iterator_to_array($source->getManifestEntries()) ); } public function testGetManifestEntriesIncremental(): void { $source = $this->createABSSourceInstance('sliced/accounts/accounts.csvmanifest', true); - $entries = $source->getManifestEntries(); + $entries = iterator_to_array($source->getManifestEntries()); self::assertCount(2, $entries); } } From 28006edfe4a387aeb0f1fc6d4e22f458b7808264 Mon Sep 17 00:00:00 2001 From: Martin Zajic Date: Wed, 13 Nov 2019 12:42:09 +0100 Subject: [PATCH 3/4] fix edge case when entries are empty --- src/Storage/ManifestReader.php | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/Storage/ManifestReader.php b/src/Storage/ManifestReader.php index 4ff49810..60d92af4 100644 --- a/src/Storage/ManifestReader.php +++ b/src/Storage/ManifestReader.php @@ -23,6 +23,10 @@ public static function readEntries( $reader->read(); // Step to the first element. do { + if ($reader->value() === null) { + // on empty entries lib will return null item + continue; + } yield $reader->value(); } while ($reader->next() && $reader->depth() > $depth); // Read each sibling. From 9ab19a543cce09733104a26762e0ad6397cdd670 Mon Sep 17 00:00:00 2001 From: Martin Zajic Date: Wed, 13 Nov 2019 14:26:31 +0100 Subject: [PATCH 4/4] fix edge case when copy command for empty files is generated --- src/Storage/ABS/SnowflakeImportAdapter.php | 14 ++++++----- tests/HugeManifest.php | 16 ++++++++++--- .../ABS/SnowflakeImportAdapterTest.php | 23 +++++++++++++++++++ 3 files changed, 44 insertions(+), 9 deletions(-) diff --git a/src/Storage/ABS/SnowflakeImportAdapter.php b/src/Storage/ABS/SnowflakeImportAdapter.php index 01cd0d36..ac0f17c1 100644 --- a/src/Storage/ABS/SnowflakeImportAdapter.php +++ b/src/Storage/ABS/SnowflakeImportAdapter.php @@ -77,12 +77,14 @@ public function getCopyCommands( $entriesInChunk = []; } } - yield $this->getCopyCommand( - $destination, - $importOptions, - $stagingTableName, - $entriesInChunk - ); + if (!empty($entriesInChunk)) { + yield $this->getCopyCommand( + $destination, + $importOptions, + $stagingTableName, + $entriesInChunk + ); + } } /** diff --git a/tests/HugeManifest.php b/tests/HugeManifest.php index 25ceb8cb..169c838d 100644 --- a/tests/HugeManifest.php +++ b/tests/HugeManifest.php @@ -5,16 +5,19 @@ namespace Tests\Keboola\Db\ImportExport; use Keboola\Csv\CsvOptions; +use Keboola\Db\ImportExport\Backend\ImporterInterface; use Keboola\Db\ImportExport\ImportOptions; use Keboola\Db\ImportExport\Storage; use Keboola\Temp\Temp; use MicrosoftAzure\Storage\Common\Internal\Resources; use Symfony\Component\Stopwatch\Stopwatch; +use Webmozart\Assert\Assert; class HugeManifest { private const MANIFEST_FILE_NAME = 'hugeManifest.json'; - private const SLICES_TOTAL = 500 * 1000; + // 500k slices + something extra to test slices count + private const SLICES_TOTAL = (500 * ImporterInterface::SLICED_FILES_CHUNK_SIZE) + 10; use ABSSourceTrait; /** @@ -69,7 +72,7 @@ public function run(): void $this->stopwatch->start('upload'); $this->uploadManifestAndSlices(); $event = $this->stopwatch->stop('upload'); - echo 'max memory upload: ' . $event->getMemory() . PHP_EOL; + echo 'max memory upload: ' . $this->getMemoryForHuman($event->getMemory()) . PHP_EOL; $this->stopwatch->start('commands'); $this->generateCommands(); @@ -108,7 +111,10 @@ private function uploadManifestAndSlices(): void file_get_contents($this->getManifestFileName()) ); - echo sprintf('Manifest file size: %s bytes', filesize($this->getManifestFileName())) . PHP_EOL; + echo sprintf( + 'Manifest file size: %s', + $this->getMemoryForHuman(filesize($this->getManifestFileName())) + ) . PHP_EOL; $this->printMemory(); } @@ -184,12 +190,16 @@ private function generateCommands(): void $options = new ImportOptions(); $adapter = new Storage\ABS\SnowflakeImportAdapter($source); echo 'Generating commands' . PHP_EOL; + $commandsCount = 0; foreach ($adapter->getCopyCommands( $destination, $options, 'stagingTable' ) as $index => $cmd) { + $commandsCount++; $this->stopwatch->lap('commands'); }; + + Assert::eq(501, $commandsCount); } } diff --git a/tests/unit/Storage/ABS/SnowflakeImportAdapterTest.php b/tests/unit/Storage/ABS/SnowflakeImportAdapterTest.php index 47ec467f..6028e50b 100644 --- a/tests/unit/Storage/ABS/SnowflakeImportAdapterTest.php +++ b/tests/unit/Storage/ABS/SnowflakeImportAdapterTest.php @@ -143,4 +143,27 @@ function () use ($files) { $commands->current() ); } + + public function testGetCopyCommandsEmptyManifest(): void + { + /** @var Storage\ABS\SourceFile|MockObject $source */ + $source = self::createMock(Storage\ABS\SourceFile::class); + $source->expects(self::never())->method('getCsvOptions')->willReturn(new CsvOptions()); + $source->expects(self::once())->method('getManifestEntries')->willReturnCallback(function () { + yield from []; + }); + $source->expects(self::never())->method('getContainerUrl')->willReturn('containerUrl'); + $source->expects(self::never())->method('getSasToken')->willReturn('sasToken'); + + $destination = new Storage\Snowflake\Table('schema', 'table'); + $options = new ImportOptions(); + $adapter = new SnowflakeImportAdapter($source); + $commands = $adapter->getCopyCommands( + $destination, + $options, + 'stagingTable' + ); + + self::assertCount(0, iterator_to_array($commands)); + } }