diff --git a/Dockerfile b/Dockerfile index 16ead017..0ee09124 100644 --- a/Dockerfile +++ b/Dockerfile @@ -24,6 +24,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ debsig-verify \ dirmngr \ gpg-agent \ + libicu-dev \ && rm -r /var/lib/apt/lists/* \ && sed -i 's/^# *\(en_US.UTF-8\)/\1/' /etc/locale.gen \ && locale-gen \ @@ -64,6 +65,9 @@ RUN mkdir -p ~/.gnupg \ && gpg --batch --delete-key --yes $SNOWFLAKE_GPG_KEY \ && dpkg -i /tmp/snowflake-odbc.deb +RUN docker-php-ext-configure intl \ + && docker-php-ext-install intl + ## Composer - deps always cached unless changed # First copy only composer files COPY composer.* /code/ diff --git a/composer.json b/composer.json index 1dcb443f..4978b4a4 100644 --- a/composer.json +++ b/composer.json @@ -7,6 +7,7 @@ "keboola/db-adapter-snowflake": "^1.0.0", "keboola/php-component": "^7.0.1", "microsoft/azure-storage-blob": "^1.4", + "pcrov/jsonreader": "^1.0", "symfony/stopwatch": "^4.3" }, "require-dev": { @@ -32,6 +33,7 @@ } }, "scripts": { + "testHuge": "php ./tests/testHugeManifest.php", "tests-unit": "phpunit tests/unit", "tests-functional": "phpunit tests/functional", "tests": [ diff --git a/src/Backend/BackendImportAdapterInterface.php b/src/Backend/BackendImportAdapterInterface.php index 4cd7880f..a184001b 100644 --- a/src/Backend/BackendImportAdapterInterface.php +++ b/src/Backend/BackendImportAdapterInterface.php @@ -4,6 +4,7 @@ namespace Keboola\Db\ImportExport\Backend; +use Generator; use Keboola\Db\ImportExport\ImportOptions; use Keboola\Db\ImportExport\Storage\DestinationInterface; use Keboola\Db\ImportExport\Storage\SourceInterface; @@ -12,12 +13,9 @@ interface BackendImportAdapterInterface { public function __construct(SourceInterface $source); - /** - * @return string[] - */ public function getCopyCommands( DestinationInterface $destination, ImportOptions $importOptions, string $stagingTableName - ): array; + ): Generator; } diff --git a/src/Backend/Snowflake/SnowflakeImportAdapterInterface.php b/src/Backend/Snowflake/SnowflakeImportAdapterInterface.php index c5f23a37..48a00d8d 100644 --- a/src/Backend/Snowflake/SnowflakeImportAdapterInterface.php +++ b/src/Backend/Snowflake/SnowflakeImportAdapterInterface.php @@ -4,6 +4,7 @@ namespace Keboola\Db\ImportExport\Backend\Snowflake; +use Generator; use Keboola\Db\ImportExport\Backend\ImportState; use Keboola\Db\ImportExport\ImportOptions; use Keboola\Db\ImportExport\Backend\BackendImportAdapterInterface; @@ -15,11 +16,10 @@ interface SnowflakeImportAdapterInterface extends BackendImportAdapterInterface /** * Snowflake import is handled differently for copy table2table and file2table * - * @param string[] $commands - sql commands array * @return int - number of imported rows */ public function executeCopyCommands( - array $commands, + Generator $commands, Connection $connection, DestinationInterface $destination, ImportOptions $importOptions, diff --git a/src/Storage/ABS/SnowflakeImportAdapter.php b/src/Storage/ABS/SnowflakeImportAdapter.php index c7cd89c3..ac0f17c1 100644 --- a/src/Storage/ABS/SnowflakeImportAdapter.php +++ b/src/Storage/ABS/SnowflakeImportAdapter.php @@ -4,6 +4,7 @@ namespace Keboola\Db\ImportExport\Storage\ABS; +use Generator; use Keboola\Csv\CsvOptions; use Keboola\Db\ImportExport\Backend\ImporterInterface; use Keboola\Db\ImportExport\Backend\ImportState; @@ -35,7 +36,7 @@ public function __construct(SourceInterface $source) * @param Table $destination */ public function executeCopyCommands( - array $commands, + Generator $commands, Connection $connection, DestinationInterface $destination, ImportOptions $importOptions, @@ -62,33 +63,60 @@ public function getCopyCommands( DestinationInterface $destination, ImportOptions $importOptions, string $stagingTableName - ): array { - $filesToImport = $this->source->getManifestEntries(); - $commands = []; - foreach (array_chunk($filesToImport, ImporterInterface::SLICED_FILES_CHUNK_SIZE) as $entries) { - $commands[] = sprintf( - 'COPY INTO %s.%s + ): Generator { + $entriesInChunk = []; + foreach ($this->source->getManifestEntries() as $entry) { + $entriesInChunk[] = $entry; + if (count($entriesInChunk) === ImporterInterface::SLICED_FILES_CHUNK_SIZE) { + yield $this->getCopyCommand( + $destination, + $importOptions, + $stagingTableName, + $entriesInChunk + ); + $entriesInChunk = []; + } + } + if (!empty($entriesInChunk)) { + yield $this->getCopyCommand( + $destination, + $importOptions, + $stagingTableName, + $entriesInChunk + ); + } + } + + /** + * @param Table $destination + */ + private function getCopyCommand( + DestinationInterface $destination, + ImportOptions $importOptions, + string $stagingTableName, + array $entriesInChunk + ): string { + return sprintf( + 'COPY INTO %s.%s FROM %s CREDENTIALS=(AZURE_SAS_TOKEN=\'%s\') FILE_FORMAT = (TYPE=CSV %s) FILES = (%s)', - QueryBuilder::quoteIdentifier($destination->getSchema()), - QueryBuilder::quoteIdentifier($stagingTableName), - QueryBuilder::quote($this->source->getContainerUrl()), - $this->source->getSasToken(), - implode(' ', $this->getCsvCopyCommandOptions($importOptions, $this->source->getCsvOptions())), - implode( - ', ', - array_map( - function ($entry) { - return QueryBuilder::quote(strtr($entry, [$this->source->getContainerUrl() => ''])); - }, - $entries - ) + QueryBuilder::quoteIdentifier($destination->getSchema()), + QueryBuilder::quoteIdentifier($stagingTableName), + QueryBuilder::quote($this->source->getContainerUrl()), + $this->source->getSasToken(), + implode(' ', $this->getCsvCopyCommandOptions($importOptions, $this->source->getCsvOptions())), + implode( + ', ', + array_map( + function ($entry) { + return QueryBuilder::quote(strtr($entry['url'], [$this->source->getContainerUrl() => ''])); + }, + $entriesInChunk ) - ); - } - return $commands; + ) + ); } private function getCsvCopyCommandOptions( diff --git a/src/Storage/ABS/SourceFile.php b/src/Storage/ABS/SourceFile.php index 5597de9c..23c3f36b 100644 --- a/src/Storage/ABS/SourceFile.php +++ b/src/Storage/ABS/SourceFile.php @@ -4,10 +4,12 @@ namespace Keboola\Db\ImportExport\Storage\ABS; +use Generator; use Keboola\Csv\CsvOptions; use Keboola\Db\ImportExport\Backend\BackendImportAdapterInterface; use Keboola\Db\ImportExport\Backend\ImporterInterface; use Keboola\Db\ImportExport\Backend\Snowflake\Importer as SnowflakeImporter; +use Keboola\Db\ImportExport\Storage\ManifestReader; use Keboola\Db\ImportExport\Storage\NoBackendAdapterException; use Keboola\Db\ImportExport\Storage\SourceInterface; use MicrosoftAzure\Storage\Blob\BlobRestProxy; @@ -54,10 +56,11 @@ public function getCsvOptions(): CsvOptions return $this->csvOptions; } - public function getManifestEntries(): array + public function getManifestEntries(): Generator { if (!$this->isSliced) { - return [$this->getContainerUrl() . $this->filePath]; + yield from [['url' => $this->getContainerUrl() . $this->filePath]]; + return; } $SASConnectionString = sprintf( @@ -74,9 +77,6 @@ public function getManifestEntries(): array ); $getResult = $blobClient->getBlob($this->container, $this->filePath); - $manifest = \GuzzleHttp\json_decode(stream_get_contents($getResult->getContentStream()), true); - return array_map(function ($entry) { - return $entry['url']; - }, $manifest['entries']); + yield from ManifestReader::readEntries($getResult->getContentStream()); } } diff --git a/src/Storage/ManifestReader.php b/src/Storage/ManifestReader.php new file mode 100644 index 00000000..60d92af4 --- /dev/null +++ b/src/Storage/ManifestReader.php @@ -0,0 +1,35 @@ +stream($stream); + + $reader->read('entries'); + $depth = $reader->depth(); // Check in a moment to break when the array is done. + + $reader->read(); // Step to the first element. + do { + if ($reader->value() === null) { + // on empty entries lib will return null item + continue; + } + yield $reader->value(); + } while ($reader->next() && $reader->depth() > $depth); // Read each sibling. + + $reader->close(); + } +} diff --git a/src/Storage/Snowflake/SnowflakeImportAdapter.php b/src/Storage/Snowflake/SnowflakeImportAdapter.php index 3e0be8d2..4a241a6b 100644 --- a/src/Storage/Snowflake/SnowflakeImportAdapter.php +++ b/src/Storage/Snowflake/SnowflakeImportAdapter.php @@ -4,9 +4,9 @@ namespace Keboola\Db\ImportExport\Storage\Snowflake; +use Generator; use Keboola\Db\ImportExport\Backend\ImportState; use Keboola\Db\ImportExport\ImportOptions; -use Keboola\Db\ImportExport\Backend\Snowflake\Helper\QuoteHelper; use Keboola\Db\ImportExport\Backend\Snowflake\SnowflakeImportAdapterInterface; use Keboola\Db\ImportExport\Storage\DestinationInterface; use Keboola\Db\ImportExport\Storage\SourceInterface; @@ -32,14 +32,14 @@ public function __construct(SourceInterface $source) * @param Table $destination */ public function executeCopyCommands( - array $commands, + Generator $commands, Connection $connection, DestinationInterface $destination, ImportOptions $importOptions, ImportState $importState ): int { $importState->startTimer('copyToStaging'); - $connection->query($commands[0]); + $connection->query($commands->current()); $rows = $connection->fetchAll(sprintf( 'SELECT COUNT(*) AS "count" FROM %s.%s', QueryBuilder::quoteIdentifier($destination->getSchema()), @@ -56,7 +56,7 @@ public function getCopyCommands( DestinationInterface $destination, ImportOptions $importOptions, string $stagingTableName - ): array { + ): Generator { $quotedColumns = array_map(function ($column) { return QueryBuilder::quoteIdentifier($column); }, $importOptions->getColumns()); @@ -75,6 +75,6 @@ public function getCopyCommands( QueryBuilder::quoteIdentifier($this->source->getTableName()), ); - return [$sql]; + yield $sql; } } diff --git a/src/Storage/Snowflake/Table.php b/src/Storage/Snowflake/Table.php index 5d738612..e027c77f 100644 --- a/src/Storage/Snowflake/Table.php +++ b/src/Storage/Snowflake/Table.php @@ -8,7 +8,6 @@ use Keboola\Db\ImportExport\Backend\BackendImportAdapterInterface; use Keboola\Db\ImportExport\Backend\ExporterInterface; use Keboola\Db\ImportExport\Backend\ImporterInterface; -use Keboola\Db\ImportExport\Backend\Snowflake\Helper\QuoteHelper; use Keboola\Db\ImportExport\Backend\Snowflake\Importer as SnowflakeImporter; use Keboola\Db\ImportExport\Storage\DestinationInterface; use Keboola\Db\ImportExport\Storage\NoBackendAdapterException; diff --git a/tests/HugeManifest.php b/tests/HugeManifest.php new file mode 100644 index 00000000..169c838d --- /dev/null +++ b/tests/HugeManifest.php @@ -0,0 +1,205 @@ +accountName = $accountName; + $this->containerName = $containerName; + $this->loader = new AbsLoader($accountName, $containerName); + } + + public function run(): void + { + $this->temp = new Temp(); + $this->temp->initRunFolder(); + + $this->stopwatch = new Stopwatch(); + $this->stopwatch->start('test'); + $this->loader->deleteContainer(); + $this->loader->createContainer(); + + $this->stopwatch->start('upload'); + $this->uploadManifestAndSlices(); + $event = $this->stopwatch->stop('upload'); + echo 'max memory upload: ' . $this->getMemoryForHuman($event->getMemory()) . PHP_EOL; + + $this->stopwatch->start('commands'); + $this->generateCommands(); + $event = $this->stopwatch->stop('commands'); + echo 'max memory commands: ' . $this->getMemoryForHuman($event->getMemory()) . PHP_EOL; + + $this->loader->deleteContainer(); + $event = $this->stopwatch->stop('test'); + echo 'max memory: ' . $this->getMemoryForHuman($event->getMemory()) . PHP_EOL; + } + + private function uploadManifestAndSlices(): void + { + $this->printMemory(); + $manifest = $this->openManifestFile(); + + echo 'Generating manifest' . PHP_EOL; + for ($i = 0; $i <= self::SLICES_TOTAL; $i++) { + $sliceName = sprintf('my_awesome_long_name_slice.csv_%d', $i); + fwrite($manifest, sprintf( + '{"url":"%s"}%s' . PHP_EOL, + $this->getAbsUrl($sliceName), + $i === self::SLICES_TOTAL ? '' : ',' + )); + } + + $this->closeManifestFile($manifest); + + echo PHP_EOL; + + echo 'Uploading manifest' . PHP_EOL; + + $this->loader->getBlobService()->createBlockBlob( + $this->containerName, + self::MANIFEST_FILE_NAME, + file_get_contents($this->getManifestFileName()) + ); + + echo sprintf( + 'Manifest file size: %s', + $this->getMemoryForHuman(filesize($this->getManifestFileName())) + ) . PHP_EOL; + $this->printMemory(); + } + + private function printMemory(): void + { + $memUsage = memory_get_usage(true); + + echo $this->getMemoryForHuman($memUsage); + + echo PHP_EOL; + } + + private function getMemoryForHuman(int $memUsage): string + { + if ($memUsage < 1024) { + return $memUsage . ' bytes'; + } elseif ($memUsage < 1048576) { + return round($memUsage / 1024, 2) . ' kilobytes'; + } else { + return round($memUsage / 1048576, 2) . ' megabytes'; + } + } + + /** + * @return false|resource + */ + private function openManifestFile() + { + file_put_contents($this->getManifestFileName(), '{"entries":[' . PHP_EOL); + return fopen($this->getManifestFileName(), 'a'); + } + + private function getManifestFileName(): string + { + if ($this->manifestFile === null) { + $this->manifestFile = $this->temp->getTmpFolder() . '/' . self::MANIFEST_FILE_NAME; + } + + return $this->manifestFile; + } + + private function getAbsUrl(string $fileName): string + { + return sprintf( + 'azure://%s.%s/%s/%s', + $this->accountName, + Resources::BLOB_BASE_DNS_NAME, + $this->containerName, + $fileName + ); + } + + /** + * @param resource $resource + */ + private function closeManifestFile($resource): void + { + fwrite($resource, ']}'); + fclose($resource); + } + + private function generateCommands(): void + { + $source = new Storage\ABS\SourceFile( + $this->containerName, + self::MANIFEST_FILE_NAME, + $this->getCredentialsForAzureContainer($this->containerName), + $this->accountName, + new CsvOptions, + true + ); + $destination = new Storage\Snowflake\Table('schema', 'table'); + $options = new ImportOptions(); + $adapter = new Storage\ABS\SnowflakeImportAdapter($source); + echo 'Generating commands' . PHP_EOL; + $commandsCount = 0; + foreach ($adapter->getCopyCommands( + $destination, + $options, + 'stagingTable' + ) as $index => $cmd) { + $commandsCount++; + $this->stopwatch->lap('commands'); + }; + + Assert::eq(501, $commandsCount); + } +} diff --git a/tests/testHugeManifest.php b/tests/testHugeManifest.php new file mode 100644 index 00000000..0861dbec --- /dev/null +++ b/tests/testHugeManifest.php @@ -0,0 +1,24 @@ +run(); diff --git a/tests/unit/SourceStorage/ABS/BaseFileTest.php b/tests/unit/Storage/ABS/BaseFileTest.php similarity index 100% rename from tests/unit/SourceStorage/ABS/BaseFileTest.php rename to tests/unit/Storage/ABS/BaseFileTest.php diff --git a/tests/unit/SourceStorage/ABS/DestinationFileTest.php b/tests/unit/Storage/ABS/DestinationFileTest.php similarity index 100% rename from tests/unit/SourceStorage/ABS/DestinationFileTest.php rename to tests/unit/Storage/ABS/DestinationFileTest.php diff --git a/tests/unit/SourceStorage/ABS/SnowflakeExportAdapterTest.php b/tests/unit/Storage/ABS/SnowflakeExportAdapterTest.php similarity index 100% rename from tests/unit/SourceStorage/ABS/SnowflakeExportAdapterTest.php rename to tests/unit/Storage/ABS/SnowflakeExportAdapterTest.php diff --git a/tests/unit/SourceStorage/ABS/SnowflakeImportAdapterTest.php b/tests/unit/Storage/ABS/SnowflakeImportAdapterTest.php similarity index 73% rename from tests/unit/SourceStorage/ABS/SnowflakeImportAdapterTest.php rename to tests/unit/Storage/ABS/SnowflakeImportAdapterTest.php index 196a2bba..6028e50b 100644 --- a/tests/unit/SourceStorage/ABS/SnowflakeImportAdapterTest.php +++ b/tests/unit/Storage/ABS/SnowflakeImportAdapterTest.php @@ -4,6 +4,7 @@ namespace Tests\Keboola\Db\ImportExportUnit\Storage\ABS; +use Generator; use Keboola\Csv\CsvOptions; use Keboola\Db\ImportExport\Backend\ImportState; use Keboola\Db\ImportExport\ImportOptions; @@ -30,9 +31,14 @@ public function testExecuteCopyCommands(): void /** @var ImportOptions|MockObject $options */ $options = self::createMock(ImportOptions::class); + $generator = function (): Generator { + yield 'cmd1'; + yield 'cmd2'; + }; + $adapter = new SnowflakeImportAdapter($source); $rows = $adapter->executeCopyCommands( - ['cmd1', 'cmd2'], + $generator(), $connection, new Storage\Snowflake\Table('', ''), $options, @@ -47,7 +53,9 @@ public function testGetCopyCommands(): void /** @var Storage\ABS\SourceFile|MockObject $source */ $source = self::createMock(Storage\ABS\SourceFile::class); $source->expects(self::once())->method('getCsvOptions')->willReturn(new CsvOptions()); - $source->expects(self::once())->method('getManifestEntries')->willReturn(['azure://url']); + $source->expects(self::once())->method('getManifestEntries')->willReturnCallback(function () { + yield ['url' => 'azure://url']; + }); $source->expects(self::exactly(2))->method('getContainerUrl')->willReturn('containerUrl'); $source->expects(self::once())->method('getSasToken')->willReturn('sasToken'); @@ -60,7 +68,7 @@ public function testGetCopyCommands(): void 'stagingTable' ); - self::assertSame([ + self::assertSame( <<current() + ); } public function testGetCopyCommandsChunk(): void @@ -82,7 +90,13 @@ public function testGetCopyCommandsChunk(): void /** @var Storage\ABS\SourceFile|MockObject $source */ $source = self::createMock(Storage\ABS\SourceFile::class); $source->expects(self::exactly(2))->method('getCsvOptions')->willReturn(new CsvOptions()); - $source->expects(self::exactly(1))->method('getManifestEntries')->willReturn($files); + $source->expects(self::exactly(1))->method('getManifestEntries')->willReturnCallback( + function () use ($files) { + foreach ($files as $file) { + yield ['url' => $file]; + } + } + ); $source->expects(self::exactly(1502/*Called for each entry plus 2times*/)) ->method('getContainerUrl')->willReturn('containerUrl'); $source->expects(self::exactly(2))->method('getSasToken')->willReturn('sasToken'); @@ -105,7 +119,7 @@ public function testGetCopyCommandsChunk(): void return sprintf("'%s'", $file); }, $cmd2Files)); - self::assertSame([ + self::assertSame( <<current() + ); + + $commands->next(); + + self::assertSame( <<current() + ); + } + + public function testGetCopyCommandsEmptyManifest(): void + { + /** @var Storage\ABS\SourceFile|MockObject $source */ + $source = self::createMock(Storage\ABS\SourceFile::class); + $source->expects(self::never())->method('getCsvOptions')->willReturn(new CsvOptions()); + $source->expects(self::once())->method('getManifestEntries')->willReturnCallback(function () { + yield from []; + }); + $source->expects(self::never())->method('getContainerUrl')->willReturn('containerUrl'); + $source->expects(self::never())->method('getSasToken')->willReturn('sasToken'); + + $destination = new Storage\Snowflake\Table('schema', 'table'); + $options = new ImportOptions(); + $adapter = new SnowflakeImportAdapter($source); + $commands = $adapter->getCopyCommands( + $destination, + $options, + 'stagingTable' + ); + + self::assertCount(0, iterator_to_array($commands)); } } diff --git a/tests/unit/SourceStorage/ABS/SourceFileTest.php b/tests/unit/Storage/ABS/SourceFileTest.php similarity index 91% rename from tests/unit/SourceStorage/ABS/SourceFileTest.php rename to tests/unit/Storage/ABS/SourceFileTest.php index 22a43d9e..52964a39 100644 --- a/tests/unit/SourceStorage/ABS/SourceFileTest.php +++ b/tests/unit/Storage/ABS/SourceFileTest.php @@ -62,15 +62,17 @@ public function testGetManifestEntries(): void { $source = $this->createDummyABSSourceInstance('empty.csv'); self::assertSame( - ['azure://absAccount.blob.core.windows.net/absContainer/empty.csv'], - $source->getManifestEntries() + [ + ['url' => 'azure://absAccount.blob.core.windows.net/absContainer/empty.csv'], + ], + iterator_to_array($source->getManifestEntries()) ); } public function testGetManifestEntriesIncremental(): void { $source = $this->createABSSourceInstance('sliced/accounts/accounts.csvmanifest', true); - $entries = $source->getManifestEntries(); + $entries = iterator_to_array($source->getManifestEntries()); self::assertCount(2, $entries); } } diff --git a/tests/unit/SourceStorage/Snowflake/SnowflakeAdapterTest.php b/tests/unit/Storage/Snowflake/SnowflakeAdapterTest.php similarity index 91% rename from tests/unit/SourceStorage/Snowflake/SnowflakeAdapterTest.php rename to tests/unit/Storage/Snowflake/SnowflakeAdapterTest.php index 0554d10c..70d5964a 100644 --- a/tests/unit/SourceStorage/Snowflake/SnowflakeAdapterTest.php +++ b/tests/unit/Storage/Snowflake/SnowflakeAdapterTest.php @@ -4,6 +4,7 @@ namespace Tests\Keboola\Db\ImportExportUnit\Storage\Snowflake; +use Generator; use Keboola\Db\ImportExport\Backend\ImportState; use Keboola\Db\ImportExport\ImportOptions; use Keboola\Db\ImportExport\Storage\Snowflake\SnowflakeImportAdapter; @@ -26,9 +27,13 @@ public function testExecuteCopyCommands(): void /** @var ImportOptions|MockObject $options */ $options = self::createMock(ImportOptions::class); + $generator = function (): Generator { + yield 'cmd1'; + }; + $adapter = new SnowflakeImportAdapter(new Storage\Snowflake\Table('schema', 'table')); $rows = $adapter->executeCopyCommands( - ['cmd1'], + $generator(), $connection, new Storage\Snowflake\Table('schema', 'table'), $options, @@ -54,8 +59,9 @@ public function testGetCopyCommands(): void 'stagingTable' ); - self::assertSame([ + self::assertSame( 'INSERT INTO "schema"."stagingTable" ("col1", "col2") SELECT "col1", "col2" FROM "schema"."table"', - ], $commands); + $commands->current() + ); } } diff --git a/tests/unit/SourceStorage/Snowflake/TableTest.php b/tests/unit/Storage/Snowflake/TableTest.php similarity index 100% rename from tests/unit/SourceStorage/Snowflake/TableTest.php rename to tests/unit/Storage/Snowflake/TableTest.php