Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
debsig-verify \
dirmngr \
gpg-agent \
libicu-dev \
&& rm -r /var/lib/apt/lists/* \
&& sed -i 's/^# *\(en_US.UTF-8\)/\1/' /etc/locale.gen \
&& locale-gen \
Expand Down Expand Up @@ -64,6 +65,9 @@ RUN mkdir -p ~/.gnupg \
&& gpg --batch --delete-key --yes $SNOWFLAKE_GPG_KEY \
&& dpkg -i /tmp/snowflake-odbc.deb

RUN docker-php-ext-configure intl \
&& docker-php-ext-install intl

## Composer - deps always cached unless changed
# First copy only composer files
COPY composer.* /code/
Expand Down
2 changes: 2 additions & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"keboola/db-adapter-snowflake": "^1.0.0",
"keboola/php-component": "^7.0.1",
"microsoft/azure-storage-blob": "^1.4",
"pcrov/jsonreader": "^1.0",
"symfony/stopwatch": "^4.3"
},
"require-dev": {
Expand All @@ -32,6 +33,7 @@
}
},
"scripts": {
"testHuge": "php ./tests/testHugeManifest.php",
"tests-unit": "phpunit tests/unit",
"tests-functional": "phpunit tests/functional",
"tests": [
Expand Down
6 changes: 2 additions & 4 deletions src/Backend/BackendImportAdapterInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Keboola\Db\ImportExport\Backend;

use Generator;
use Keboola\Db\ImportExport\ImportOptions;
use Keboola\Db\ImportExport\Storage\DestinationInterface;
use Keboola\Db\ImportExport\Storage\SourceInterface;
Expand All @@ -12,12 +13,9 @@ interface BackendImportAdapterInterface
{
public function __construct(SourceInterface $source);

/**
* @return string[]
*/
public function getCopyCommands(
DestinationInterface $destination,
ImportOptions $importOptions,
string $stagingTableName
): array;
): Generator;
}
4 changes: 2 additions & 2 deletions src/Backend/Snowflake/SnowflakeImportAdapterInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Keboola\Db\ImportExport\Backend\Snowflake;

use Generator;
use Keboola\Db\ImportExport\Backend\ImportState;
use Keboola\Db\ImportExport\ImportOptions;
use Keboola\Db\ImportExport\Backend\BackendImportAdapterInterface;
Expand All @@ -15,11 +16,10 @@ interface SnowflakeImportAdapterInterface extends BackendImportAdapterInterface
/**
* Snowflake import is handled differently for copy table2table and file2table
*
* @param string[] $commands - sql commands array
* @return int - number of imported rows
*/
public function executeCopyCommands(
array $commands,
Generator $commands,
Connection $connection,
DestinationInterface $destination,
ImportOptions $importOptions,
Expand Down
74 changes: 51 additions & 23 deletions src/Storage/ABS/SnowflakeImportAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Keboola\Db\ImportExport\Storage\ABS;

use Generator;
use Keboola\Csv\CsvOptions;
use Keboola\Db\ImportExport\Backend\ImporterInterface;
use Keboola\Db\ImportExport\Backend\ImportState;
Expand Down Expand Up @@ -35,7 +36,7 @@ public function __construct(SourceInterface $source)
* @param Table $destination
*/
public function executeCopyCommands(
array $commands,
Generator $commands,
Connection $connection,
DestinationInterface $destination,
ImportOptions $importOptions,
Expand All @@ -62,33 +63,60 @@ public function getCopyCommands(
DestinationInterface $destination,
ImportOptions $importOptions,
string $stagingTableName
): array {
$filesToImport = $this->source->getManifestEntries();
$commands = [];
foreach (array_chunk($filesToImport, ImporterInterface::SLICED_FILES_CHUNK_SIZE) as $entries) {
$commands[] = sprintf(
'COPY INTO %s.%s
): Generator {
$entriesInChunk = [];
foreach ($this->source->getManifestEntries() as $entry) {
$entriesInChunk[] = $entry;
if (count($entriesInChunk) === ImporterInterface::SLICED_FILES_CHUNK_SIZE) {
yield $this->getCopyCommand(
$destination,
$importOptions,
$stagingTableName,
$entriesInChunk
);
$entriesInChunk = [];
}
}
if (!empty($entriesInChunk)) {
yield $this->getCopyCommand(
$destination,
$importOptions,
$stagingTableName,
$entriesInChunk
);
}
}

/**
* @param Table $destination
*/
private function getCopyCommand(
DestinationInterface $destination,
ImportOptions $importOptions,
string $stagingTableName,
array $entriesInChunk
): string {
return sprintf(
'COPY INTO %s.%s
FROM %s
CREDENTIALS=(AZURE_SAS_TOKEN=\'%s\')
FILE_FORMAT = (TYPE=CSV %s)
FILES = (%s)',
QueryBuilder::quoteIdentifier($destination->getSchema()),
QueryBuilder::quoteIdentifier($stagingTableName),
QueryBuilder::quote($this->source->getContainerUrl()),
$this->source->getSasToken(),
implode(' ', $this->getCsvCopyCommandOptions($importOptions, $this->source->getCsvOptions())),
implode(
', ',
array_map(
function ($entry) {
return QueryBuilder::quote(strtr($entry, [$this->source->getContainerUrl() => '']));
},
$entries
)
QueryBuilder::quoteIdentifier($destination->getSchema()),
QueryBuilder::quoteIdentifier($stagingTableName),
QueryBuilder::quote($this->source->getContainerUrl()),
$this->source->getSasToken(),
implode(' ', $this->getCsvCopyCommandOptions($importOptions, $this->source->getCsvOptions())),
implode(
', ',
array_map(
function ($entry) {
return QueryBuilder::quote(strtr($entry['url'], [$this->source->getContainerUrl() => '']));
},
$entriesInChunk
)
);
}
return $commands;
)
);
}

private function getCsvCopyCommandOptions(
Expand Down
12 changes: 6 additions & 6 deletions src/Storage/ABS/SourceFile.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@

namespace Keboola\Db\ImportExport\Storage\ABS;

use Generator;
use Keboola\Csv\CsvOptions;
use Keboola\Db\ImportExport\Backend\BackendImportAdapterInterface;
use Keboola\Db\ImportExport\Backend\ImporterInterface;
use Keboola\Db\ImportExport\Backend\Snowflake\Importer as SnowflakeImporter;
use Keboola\Db\ImportExport\Storage\ManifestReader;
use Keboola\Db\ImportExport\Storage\NoBackendAdapterException;
use Keboola\Db\ImportExport\Storage\SourceInterface;
use MicrosoftAzure\Storage\Blob\BlobRestProxy;
Expand Down Expand Up @@ -54,10 +56,11 @@ public function getCsvOptions(): CsvOptions
return $this->csvOptions;
}

public function getManifestEntries(): array
public function getManifestEntries(): Generator
{
if (!$this->isSliced) {
return [$this->getContainerUrl() . $this->filePath];
yield from [['url' => $this->getContainerUrl() . $this->filePath]];
return;
}

$SASConnectionString = sprintf(
Expand All @@ -74,9 +77,6 @@ public function getManifestEntries(): array
);

$getResult = $blobClient->getBlob($this->container, $this->filePath);
$manifest = \GuzzleHttp\json_decode(stream_get_contents($getResult->getContentStream()), true);
return array_map(function ($entry) {
return $entry['url'];
}, $manifest['entries']);
yield from ManifestReader::readEntries($getResult->getContentStream());
}
}
35 changes: 35 additions & 0 deletions src/Storage/ManifestReader.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?php

declare(strict_types=1);

namespace Keboola\Db\ImportExport\Storage;

use Generator;
use pcrov\JsonReader\JsonReader;

class ManifestReader
{
/**
* @param resource $stream
*/
public static function readEntries(
$stream
): Generator {
$reader = new JsonReader();
$reader->stream($stream);

$reader->read('entries');
$depth = $reader->depth(); // Check in a moment to break when the array is done.

$reader->read(); // Step to the first element.
do {
if ($reader->value() === null) {
// on empty entries lib will return null item
continue;
}
yield $reader->value();
} while ($reader->next() && $reader->depth() > $depth); // Read each sibling.

$reader->close();
}
}
10 changes: 5 additions & 5 deletions src/Storage/Snowflake/SnowflakeImportAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

namespace Keboola\Db\ImportExport\Storage\Snowflake;

use Generator;
use Keboola\Db\ImportExport\Backend\ImportState;
use Keboola\Db\ImportExport\ImportOptions;
use Keboola\Db\ImportExport\Backend\Snowflake\Helper\QuoteHelper;
use Keboola\Db\ImportExport\Backend\Snowflake\SnowflakeImportAdapterInterface;
use Keboola\Db\ImportExport\Storage\DestinationInterface;
use Keboola\Db\ImportExport\Storage\SourceInterface;
Expand All @@ -32,14 +32,14 @@ public function __construct(SourceInterface $source)
* @param Table $destination
*/
public function executeCopyCommands(
array $commands,
Generator $commands,
Connection $connection,
DestinationInterface $destination,
ImportOptions $importOptions,
ImportState $importState
): int {
$importState->startTimer('copyToStaging');
$connection->query($commands[0]);
$connection->query($commands->current());
$rows = $connection->fetchAll(sprintf(
'SELECT COUNT(*) AS "count" FROM %s.%s',
QueryBuilder::quoteIdentifier($destination->getSchema()),
Expand All @@ -56,7 +56,7 @@ public function getCopyCommands(
DestinationInterface $destination,
ImportOptions $importOptions,
string $stagingTableName
): array {
): Generator {
$quotedColumns = array_map(function ($column) {
return QueryBuilder::quoteIdentifier($column);
}, $importOptions->getColumns());
Expand All @@ -75,6 +75,6 @@ public function getCopyCommands(
QueryBuilder::quoteIdentifier($this->source->getTableName()),
);

return [$sql];
yield $sql;
}
}
1 change: 0 additions & 1 deletion src/Storage/Snowflake/Table.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
use Keboola\Db\ImportExport\Backend\BackendImportAdapterInterface;
use Keboola\Db\ImportExport\Backend\ExporterInterface;
use Keboola\Db\ImportExport\Backend\ImporterInterface;
use Keboola\Db\ImportExport\Backend\Snowflake\Helper\QuoteHelper;
use Keboola\Db\ImportExport\Backend\Snowflake\Importer as SnowflakeImporter;
use Keboola\Db\ImportExport\Storage\DestinationInterface;
use Keboola\Db\ImportExport\Storage\NoBackendAdapterException;
Expand Down
Loading