diff --git a/packages/php-datatypes/src/Definition/Common.php b/packages/php-datatypes/src/Definition/Common.php index 0d421b813..9e163e17a 100644 --- a/packages/php-datatypes/src/Definition/Common.php +++ b/packages/php-datatypes/src/Definition/Common.php @@ -93,10 +93,12 @@ public function toMetadata(): array [ 'key' => self::KBC_METADATA_KEY_TYPE, 'value' => $this->getType(), - ],[ + ], + [ 'key' => self::KBC_METADATA_KEY_NULLABLE, 'value' => $this->isNullable(), - ],[ + ], + [ 'key' => self::KBC_METADATA_KEY_BASETYPE, 'value' => $this->getBasetype(), ], diff --git a/packages/php-datatypes/src/Definition/DefinitionInterface.php b/packages/php-datatypes/src/Definition/DefinitionInterface.php index d328cb3dc..23d523392 100644 --- a/packages/php-datatypes/src/Definition/DefinitionInterface.php +++ b/packages/php-datatypes/src/Definition/DefinitionInterface.php @@ -15,5 +15,13 @@ public function toArray(): array; public function getBasetype(): string; + public function getType(): string; + + public function getLength(): ?string; + + public function isNullable(): bool; + + public function getDefault(): ?string; + public static function getTypeByBasetype(string $basetype): string; } diff --git a/packages/php-db-import-export/src/Backend/Snowflake/SnowflakeException.php b/packages/php-db-import-export/src/Backend/Snowflake/SnowflakeException.php index 5745cdaed..c07dd15ab 100644 --- a/packages/php-db-import-export/src/Backend/Snowflake/SnowflakeException.php +++ b/packages/php-db-import-export/src/Backend/Snowflake/SnowflakeException.php @@ -31,11 +31,24 @@ public static function covertException(Throwable $e): Throwable } // phpcs:ignore - $isNullInNotNullCol = preg_match('/NULL result in a non-nullable column/', $e->getMessage(), $output_array) === 1; - $isNotRecognized = preg_match('/ \'(.*)\' is not recognized/', $e->getMessage(), $output_array) === 1; - if ($isNullInNotNullCol || $isNotRecognized) { + $message = $e->getMessage(); + $isObjectCastFail = preg_match('/Failed to cast variant value .* to OBJECT/', $message, $output_array) === 1; + if ($isObjectCastFail) { + // remove variant from message as it would confuse users + // we are using TO_OBJECT(TO_VARIANT(...)) casting combination + $message = str_replace('variant ', '', $message); + } + $isInvalidGeo = preg_match('/Error parsing Geo input/', $message, $output_array) === 1; + // phpcs:ignore + $isInvalidBinary = preg_match('/The following string is not a legal hex-encoded value/', $message, $output_array) === 1; + $isNullInNotNullCol = preg_match('/NULL result in a non-nullable column/', $message, $output_array) === 1; + $isNotRecognized = preg_match('/ \'(.*)\' is not recognized/', $message, $output_array) === 1; + if ($isNotRecognized) { + $message .= '. Value you are trying to load cannot be converted to used datatype.'; + } + if ($isNullInNotNullCol || $isNotRecognized || $isInvalidBinary || $isInvalidGeo || $isObjectCastFail) { return new Exception( - 'Load error: ' . $e->getMessage(), + 'Load error: ' . 
$message, Exception::VALUE_CONVERSION, $e ); diff --git a/packages/php-db-import-export/src/Backend/Snowflake/SnowflakeImportOptions.php b/packages/php-db-import-export/src/Backend/Snowflake/SnowflakeImportOptions.php index 9446a8908..58ecc7bb8 100644 --- a/packages/php-db-import-export/src/Backend/Snowflake/SnowflakeImportOptions.php +++ b/packages/php-db-import-export/src/Backend/Snowflake/SnowflakeImportOptions.php @@ -27,7 +27,7 @@ public function __construct( int $numberOfIgnoredLines = 0, bool $requireSameTables = self::SAME_TABLES_NOT_REQUIRED, bool $nullManipulation = self::NULL_MANIPULATION_ENABLED, - array $ignoreColumns = [] + array $ignoreColumns = [], ) { parent::__construct( $convertEmptyValuesToNull, diff --git a/packages/php-db-import-export/src/Backend/Snowflake/ToFinalTable/SqlBuilder.php b/packages/php-db-import-export/src/Backend/Snowflake/ToFinalTable/SqlBuilder.php index a832f88db..4b373f992 100644 --- a/packages/php-db-import-export/src/Backend/Snowflake/ToFinalTable/SqlBuilder.php +++ b/packages/php-db-import-export/src/Backend/Snowflake/ToFinalTable/SqlBuilder.php @@ -8,17 +8,19 @@ use Keboola\Datatype\Definition\Snowflake; use Keboola\Db\ImportExport\Backend\Snowflake\Helper\QuoteHelper; use Keboola\Db\ImportExport\Backend\Snowflake\SnowflakeImportOptions; +use Keboola\Db\ImportExport\Backend\SourceDestinationColumnMap; use Keboola\Db\ImportExport\Backend\ToStageImporterInterface; -use Keboola\Db\ImportExport\ImportOptionsInterface; -use Keboola\Db\ImportExport\Storage\Snowflake\Table; -use Keboola\Db\ImportExport\Storage\SourceInterface; use Keboola\TableBackendUtils\Column\Snowflake\SnowflakeColumn; -use Keboola\TableBackendUtils\Escaping\Exasol\ExasolQuote; use Keboola\TableBackendUtils\Escaping\Snowflake\SnowflakeQuote; use Keboola\TableBackendUtils\Table\Snowflake\SnowflakeTableDefinition; class SqlBuilder { + private const AUTO_CASTING_TYPES = [ + Snowflake::TYPE_VARIANT, + Snowflake::TYPE_OBJECT, + Snowflake::TYPE_ARRAY, + ]; public const SRC_ALIAS = 'src'; public function getBeginTransaction(): string @@ -165,6 +167,11 @@ public function getInsertAllIntoTargetTableCommand( SnowflakeImportOptions $importOptions, string $timestamp ): string { + $columnMap = SourceDestinationColumnMap::createForTables( + $sourceTableDefinition, + $destinationTableDefinition, + $importOptions->ignoreColumns() + ); $destinationTable = sprintf( '%s.%s', SnowflakeQuote::quoteSingleIdentifier($destinationTableDefinition->getSchemaName()), @@ -184,44 +191,67 @@ public function getInsertAllIntoTargetTableCommand( $columnsSetSql = []; - /** @var SnowflakeColumn $columnDefinition */ - foreach ($sourceTableDefinition->getColumnsDefinitions() as $columnDefinition) { + /** @var SnowflakeColumn $sourceColumn */ + foreach ($sourceTableDefinition->getColumnsDefinitions() as $sourceColumn) { // output mapping same tables are required do not convert nulls to empty strings if (!$importOptions->isNullManipulationEnabled()) { - $columnsSetSql[] = SnowflakeQuote::quoteSingleIdentifier($columnDefinition->getColumnName()); + $destinationColumn = $columnMap->getDestination($sourceColumn); + $type = $destinationColumn->getColumnDefinition()->getType(); + $useAutoCast = in_array($type, self::AUTO_CASTING_TYPES, true); + $isSameType = $type === $sourceColumn->getColumnDefinition()->getType(); + if ($useAutoCast && !$isSameType) { + if ($type === Snowflake::TYPE_OBJECT) { + // object can't be casted from string but can be casted from variant + $columnsSetSql[] = sprintf( + 'CAST(TO_VARIANT(%s) AS %s) AS 
%s', + SnowflakeQuote::quoteSingleIdentifier($sourceColumn->getColumnName()), + $destinationColumn->getColumnDefinition()->getSQLDefinition(), + SnowflakeQuote::quoteSingleIdentifier($destinationColumn->getColumnName()) + ); + continue; + } + $columnsSetSql[] = sprintf( + 'CAST(%s AS %s) AS %s', + SnowflakeQuote::quoteSingleIdentifier($sourceColumn->getColumnName()), + $destinationColumn->getColumnDefinition()->getSQLDefinition(), + SnowflakeQuote::quoteSingleIdentifier($destinationColumn->getColumnName()) + ); + continue; + } + $columnsSetSql[] = SnowflakeQuote::quoteSingleIdentifier($sourceColumn->getColumnName()); continue; } // Input mapping convert empty values to null // empty strings '' are converted to null values - if (in_array($columnDefinition->getColumnName(), $importOptions->getConvertEmptyValuesToNull(), true)) { + if (in_array($sourceColumn->getColumnName(), $importOptions->getConvertEmptyValuesToNull(), true)) { // use nullif only for string base type - if ($columnDefinition->getColumnDefinition()->getBasetype() === BaseType::STRING) { + if ($sourceColumn->getColumnDefinition()->getBasetype() === BaseType::STRING) { $columnsSetSql[] = sprintf( 'IFF(%s = \'\', NULL, %s)', - SnowflakeQuote::quoteSingleIdentifier($columnDefinition->getColumnName()), - SnowflakeQuote::quoteSingleIdentifier($columnDefinition->getColumnName()) + SnowflakeQuote::quoteSingleIdentifier($sourceColumn->getColumnName()), + SnowflakeQuote::quoteSingleIdentifier($sourceColumn->getColumnName()) ); continue; } // if tables is not typed column could be other than string in this case we skip conversion - $columnsSetSql[] = SnowflakeQuote::quoteSingleIdentifier($columnDefinition->getColumnName()); + $columnsSetSql[] = SnowflakeQuote::quoteSingleIdentifier($sourceColumn->getColumnName()); continue; } // for string base type convert null values to empty string '' //phpcs:ignore - if (!$importOptions->usingUserDefinedTypes() && $columnDefinition->getColumnDefinition()->getBasetype() === BaseType::STRING) { + if (!$importOptions->usingUserDefinedTypes() && $sourceColumn->getColumnDefinition()->getBasetype() === BaseType::STRING) { $columnsSetSql[] = sprintf( 'COALESCE(%s, \'\') AS %s', - SnowflakeQuote::quoteSingleIdentifier($columnDefinition->getColumnName()), - SnowflakeQuote::quoteSingleIdentifier($columnDefinition->getColumnName()) + SnowflakeQuote::quoteSingleIdentifier($sourceColumn->getColumnName()), + SnowflakeQuote::quoteSingleIdentifier($sourceColumn->getColumnName()) ); continue; } // on columns other than string dont use COALESCE // this will fail if the column is not null, but this is expected - $columnsSetSql[] = SnowflakeQuote::quoteSingleIdentifier($columnDefinition->getColumnName()); + $columnsSetSql[] = SnowflakeQuote::quoteSingleIdentifier($sourceColumn->getColumnName()); } if ($useTimestamp) { @@ -256,29 +286,58 @@ public function getUpdateWithPkCommand( SnowflakeImportOptions $importOptions, string $timestamp ): string { + $columnMap = SourceDestinationColumnMap::createForTables( + $stagingTableDefinition, + $destinationDefinition, + $importOptions->ignoreColumns() + ); $columnsSet = []; - foreach ($stagingTableDefinition->getColumnsNames() as $columnName) { + foreach ($stagingTableDefinition->getColumnsDefinitions() as $sourceColumn) { if (!$importOptions->isNullManipulationEnabled()) { + $destinationColumn = $columnMap->getDestination($sourceColumn); + $type = $destinationColumn->getColumnDefinition()->getType(); + $useAutoCast = in_array($type, self::AUTO_CASTING_TYPES, true); + 
$isSameType = $type === $sourceColumn->getColumnDefinition()->getType(); + if ($useAutoCast && !$isSameType) { + if ($type === Snowflake::TYPE_OBJECT) { + // object can't be casted from string but can be casted from variant + $columnsSet[] = sprintf( + '%s = CAST(TO_VARIANT("src".%s) AS %s)', + SnowflakeQuote::quoteSingleIdentifier($destinationColumn->getColumnName()), + SnowflakeQuote::quoteSingleIdentifier($sourceColumn->getColumnName()), + $destinationColumn->getColumnDefinition()->getSQLDefinition(), + ); + continue; + } + $columnsSet[] = sprintf( + '%s = CAST("src".%s AS %s)', + SnowflakeQuote::quoteSingleIdentifier($destinationColumn->getColumnName()), + SnowflakeQuote::quoteSingleIdentifier($sourceColumn->getColumnName()), + $destinationColumn->getColumnDefinition()->getSQLDefinition(), + ); + continue; + } + $columnsSet[] = sprintf( '%s = "src".%s', - SnowflakeQuote::quoteSingleIdentifier($columnName), - SnowflakeQuote::quoteSingleIdentifier($columnName), + SnowflakeQuote::quoteSingleIdentifier($sourceColumn->getColumnName()), + SnowflakeQuote::quoteSingleIdentifier($sourceColumn->getColumnName()), ); continue; } - if (in_array($columnName, $importOptions->getConvertEmptyValuesToNull(), true)) { + if (in_array($sourceColumn->getColumnName(), $importOptions->getConvertEmptyValuesToNull(), true)) { $columnsSet[] = sprintf( '%s = IFF("src".%s = \'\', NULL, "src".%s)', - SnowflakeQuote::quoteSingleIdentifier($columnName), - SnowflakeQuote::quoteSingleIdentifier($columnName), - SnowflakeQuote::quoteSingleIdentifier($columnName) + SnowflakeQuote::quoteSingleIdentifier($sourceColumn->getColumnName()), + SnowflakeQuote::quoteSingleIdentifier($sourceColumn->getColumnName()), + SnowflakeQuote::quoteSingleIdentifier($sourceColumn->getColumnName()) ); } else { $columnsSet[] = sprintf( '%s = COALESCE("src".%s, \'\')', - SnowflakeQuote::quoteSingleIdentifier($columnName), - SnowflakeQuote::quoteSingleIdentifier($columnName) + SnowflakeQuote::quoteSingleIdentifier($sourceColumn->getColumnName()), + SnowflakeQuote::quoteSingleIdentifier($sourceColumn->getColumnName()) ); } } diff --git a/packages/php-db-import-export/src/Backend/SourceDestinationColumnMap.php b/packages/php-db-import-export/src/Backend/SourceDestinationColumnMap.php new file mode 100644 index 000000000..b566802f0 --- /dev/null +++ b/packages/php-db-import-export/src/Backend/SourceDestinationColumnMap.php @@ -0,0 +1,118 @@ + + */ + private WeakMap $map; + + /** + * @param string[] $ignoreColumns + */ + public function __construct( + private readonly ColumnCollection $source, + private readonly ColumnCollection $destination, + private readonly array $ignoreColumns = [], + ) { + $this->map = new WeakMap(); + $this->buildMap(); + } + + /** + * @param string[] $ignoreColumns + */ + public static function createForTables( + TableDefinitionInterface $source, + TableDefinitionInterface $destination, + array $ignoreColumns = [], + ): self { + return new self( + $source->getColumnsDefinitions(), + $destination->getColumnsDefinitions(), + $ignoreColumns + ); + } + + private function buildMap(): void + { + $it0 = $this->source->getIterator(); + $it1 = $this->destination->getIterator(); + while ($it0->valid() || $it1->valid()) { + $it0 = $this->ignoreColumn($it0, $it1); + if ($it0 === false) { + break; + } + $it1 = $this->ignoreColumn($it1, $it0); + if ($it1 === false) { + break; + } + + if ($it0->valid() && $it1->valid()) { + /** @var ColumnInterface $sourceCol */ + $sourceCol = $it0->current(); + /** @var ColumnInterface $destCol */ + 
$destCol = $it1->current(); + $this->map[$sourceCol] = $destCol; + } else { + throw ColumnsMismatchException::createColumnsCountMismatch($this->source, $this->destination); + } + $it0->next(); + $it1->next(); + } + } + + public function getDestination(ColumnInterface $source): ColumnInterface + { + try { + $destination = $this->map[$source]; + } catch (Error $e) { + // this can happen only when class is used with different source and destination tables instances + throw new Exception(sprintf('Column "%s" not found in destination table', $source->getColumnName())); + } + assert($destination !== null); + return $destination; + } + + /** + * @param Generator $it0 + * @param Generator $it1 + * @return Generator|false + */ + private function ignoreColumn(Generator $it0, Generator $it1): Generator|false + { + if ($this->isIgnoredColumn($it0)) { + $it0->next(); + $this->ignoreColumn($it0, $it1); + if (!$it0->valid() && !$it1->valid()) { + return false; + } + } + + return $it0; + } + + /** + * @param Generator $it + */ + private function isIgnoredColumn(Generator $it): bool + { + return $it->valid() && in_array($it->current()->getColumnName(), $this->ignoreColumns, true); + } +} diff --git a/packages/php-db-import-export/tests/functional/Backend/Snowflake/SnowflakeExceptionTest.php b/packages/php-db-import-export/tests/functional/Backend/Snowflake/SnowflakeExceptionTest.php index b3a7e0f02..6157a661c 100644 --- a/packages/php-db-import-export/tests/functional/Backend/Snowflake/SnowflakeExceptionTest.php +++ b/packages/php-db-import-export/tests/functional/Backend/Snowflake/SnowflakeExceptionTest.php @@ -58,7 +58,8 @@ public function provideExceptions(): Generator yield 'value conversion' => [ "Numeric value 'male' is not recognized", ImportException::class, - "Load error: Numeric value 'male' is not recognized", + // phpcs:ignore + "Load error: Numeric value 'male' is not recognized. Value you are trying to load cannot be converted to used datatype.", 13, // VALUE_CONVERSION true, ]; @@ -66,7 +67,8 @@ public function provideExceptions(): Generator yield 'value conversion 2' => [ "Numeric value 'ma\'le' is not recognized", ImportException::class, - "Load error: Numeric value 'ma\'le' is not recognized", + // phpcs:ignore + "Load error: Numeric value 'ma\'le' is not recognized. Value you are trying to load cannot be converted to used datatype.", 13, // VALUE_CONVERSION true, ]; @@ -108,5 +110,29 @@ public function provideExceptions(): Generator 11, // ROW_SIZE_TOO_LARGE true, ]; + + yield 'GEO casting error' => [ + 'Error parsing Geo input: xxx. Did not recognize valid GeoJSON, (E)WKT or (E)WKB.', + ImportException::class, + 'Load error: Error parsing Geo input: xxx. 
Did not recognize valid GeoJSON, (E)WKT or (E)WKB.', + 13, // VALUE_CONVERSION + true, + ]; + + yield 'OBJECT casting error' => [ + 'An exception occurred while executing a query: Failed to cast variant value "xxx" to OBJECT', + ImportException::class, + 'Load error: An exception occurred while executing a query: Failed to cast value "xxx" to OBJECT', + 13, // VALUE_CONVERSION + true, + ]; + + yield 'Binary casting error' => [ + "The following string is not a legal hex-encoded value: 'xxx'", + ImportException::class, + "Load error: The following string is not a legal hex-encoded value: 'xxx'", + 13, // VALUE_CONVERSION + true, + ]; } } diff --git a/packages/php-db-import-export/tests/functional/Snowflake/SnowflakeBaseTestCase.php b/packages/php-db-import-export/tests/functional/Snowflake/SnowflakeBaseTestCase.php index d4588d2a5..166b7fa5b 100644 --- a/packages/php-db-import-export/tests/functional/Snowflake/SnowflakeBaseTestCase.php +++ b/packages/php-db-import-export/tests/functional/Snowflake/SnowflakeBaseTestCase.php @@ -9,6 +9,7 @@ use Doctrine\DBAL\Logging\Middleware; use Exception; use Keboola\Db\ImportExport\Backend\Snowflake\SnowflakeImportOptions; +use Keboola\Db\ImportExport\Backend\ToStageImporterInterface; use Keboola\Db\ImportExport\ImportOptions; use Keboola\Db\ImportExport\Storage\SourceInterface; use Keboola\TableBackendUtils\Connection\Snowflake\SnowflakeConnectionFactory; @@ -403,10 +404,11 @@ protected function getSnowflakeImportOptions( bool $useTimeStamp = true ): SnowflakeImportOptions { return new SnowflakeImportOptions( - [], - false, - $useTimeStamp, - $skipLines + convertEmptyValuesToNull: [], + isIncremental: false, + useTimestamp: $useTimeStamp, + numberOfIgnoredLines: $skipLines, + ignoreColumns: [ToStageImporterInterface::TIMESTAMP_COLUMN_NAME] ); } @@ -472,10 +474,11 @@ protected function getSimpleImportOptions( int $skipLines = ImportOptions::SKIP_FIRST_LINE ): SnowflakeImportOptions { return new SnowflakeImportOptions( - [], - false, - true, - $skipLines + convertEmptyValuesToNull: [], + isIncremental: false, + useTimestamp: true, + numberOfIgnoredLines: $skipLines, + ignoreColumns: [ToStageImporterInterface::TIMESTAMP_COLUMN_NAME], ); } } diff --git a/packages/php-db-import-export/tests/functional/Snowflake/ToFinal/FullImportTest.php b/packages/php-db-import-export/tests/functional/Snowflake/ToFinal/FullImportTest.php index 289b457ff..3bebf9dfb 100644 --- a/packages/php-db-import-export/tests/functional/Snowflake/ToFinal/FullImportTest.php +++ b/packages/php-db-import-export/tests/functional/Snowflake/ToFinal/FullImportTest.php @@ -7,11 +7,14 @@ use Generator; use Keboola\Csv\CsvFile; use Keboola\CsvOptions\CsvOptions; +use Keboola\Datatype\Definition\Snowflake; +use Keboola\Db\ImportExport\Backend\ImportState; use Keboola\Db\ImportExport\Backend\Snowflake\SnowflakeImportOptions; use Keboola\Db\ImportExport\Backend\Snowflake\ToFinalTable\FullImporter; use Keboola\Db\ImportExport\Backend\Snowflake\ToFinalTable\SqlBuilder; use Keboola\Db\ImportExport\Backend\Snowflake\ToStage\StageTableDefinitionFactory; use Keboola\Db\ImportExport\Backend\Snowflake\ToStage\ToStageImporter; +use Keboola\Db\ImportExport\Backend\ToStageImporterInterface; use Keboola\Db\ImportExport\ImportOptions; use Keboola\Db\ImportExport\Storage\Snowflake\Table; use Keboola\Db\ImportExport\Storage\SourceInterface; @@ -33,6 +36,92 @@ protected function setUp(): void $this->createSchema($this->getDestinationSchemaName()); } + /** + * Test is testing loading of semi-structured data into typed 
table. + * + * This test is not using CSV but inserting data directly into stage table to mimic this behavior + */ + public function testLoadTypedTableWithCastingValues(): void + { + $this->connection->executeQuery(sprintf( + /** @lang Snowflake */ + 'CREATE TABLE %s."types" ( + "id" NUMBER, + "VARIANT" VARIANT, + "BINARY" BINARY, + "VARBINARY" VARBINARY, + "OBJECT" OBJECT, + "ARRAY" ARRAY, + "GEOGRAPHY" GEOGRAPHY, + "GEOMETRY" GEOMETRY, + "_timestamp" TIMESTAMP + );', + SnowflakeQuote::quoteSingleIdentifier($this->getDestinationSchemaName()) + )); + + // skipping header + $options = new SnowflakeImportOptions( + [], + false, + false, + 1, + SnowflakeImportOptions::SAME_TABLES_NOT_REQUIRED, + SnowflakeImportOptions::NULL_MANIPULATION_SKIP, + [ToStageImporterInterface::TIMESTAMP_COLUMN_NAME] + ); + + $destinationRef = new SnowflakeTableReflection( + $this->connection, + $this->getDestinationSchemaName(), + 'types' + ); + /** @var SnowflakeTableDefinition $destination */ + $destination = $destinationRef->getTableDefinition(); + $stagingTable = StageTableDefinitionFactory::createVarcharStagingTableDefinition( + $destination->getSchemaName(), + [ + 'id', + 'VARIANT', + 'BINARY', + 'VARBINARY', + 'OBJECT', + 'ARRAY', + 'GEOGRAPHY', + 'GEOMETRY', + ] + ); + + $qb = new SnowflakeTableQueryBuilder(); + $this->connection->executeStatement( + $qb->getCreateTableCommandFromDefinition($stagingTable) + ); + $this->connection->executeQuery(sprintf( + /** @lang Snowflake */ + 'INSERT INTO "%s"."%s" ("id","VARIANT","BINARY","VARBINARY","OBJECT","ARRAY","GEOGRAPHY","GEOMETRY") +select 1, + TO_VARCHAR(TO_VARIANT(\'3.14\')), + TO_VARCHAR(TO_BINARY(HEX_ENCODE(\'1\'), \'HEX\')), + TO_VARCHAR(TO_BINARY(HEX_ENCODE(\'1\'), \'HEX\')), + TO_VARCHAR(OBJECT_CONSTRUCT(\'name\', \'Jones\'::VARIANT, \'age\', 42::VARIANT)), + TO_VARCHAR(ARRAY_CONSTRUCT(1, 2, 3, NULL)), + \'POINT(-122.35 37.55)\', + \'POINT(1820.12 890.56)\' +;', + $stagingTable->getSchemaName(), + $stagingTable->getTableName() + )); + $toFinalTableImporter = new FullImporter($this->connection); + + $toFinalTableImporter->importToTable( + $stagingTable, + $destination, + $options, + new ImportState($stagingTable->getTableName()) + ); + + self::assertEquals(1, $destinationRef->getRowsCount()); + } + public function testLoadToTableWithNullValuesShouldPass(): void { $this->initTable(self::TABLE_SINGLE_PK); @@ -466,7 +555,16 @@ public function fullImportData(): Generator [] ), [$this->getDestinationSchemaName(), self::TABLE_TABLE], - $this->getSnowflakeImportOptions(), + new SnowflakeImportOptions( + convertEmptyValuesToNull: [], + isIncremental: false, + useTimestamp: true, + numberOfIgnoredLines: 1, + ignoreColumns: [ + ToStageImporterInterface::TIMESTAMP_COLUMN_NAME, + 'lemmaIndex', + ] + ), [['table', 'column', null]], 1, self::TABLE_TABLE, @@ -510,10 +608,11 @@ public function fullImportData(): Generator self::TABLE_OUT_NO_TIMESTAMP_TABLE, ], new SnowflakeImportOptions( - [], - false, - false, // don't use timestamp - ImportOptions::SKIP_FIRST_LINE + convertEmptyValuesToNull: [], + isIncremental: false, + useTimestamp: false, // don't use timestamp + numberOfIgnoredLines: ImportOptions::SKIP_FIRST_LINE, + ignoreColumns: [ToStageImporterInterface::TIMESTAMP_COLUMN_NAME], ), $escapingStub->getRows(), 7, diff --git a/packages/php-db-import-export/tests/functional/Snowflake/ToFinal/IncrementalImportTest.php b/packages/php-db-import-export/tests/functional/Snowflake/ToFinal/IncrementalImportTest.php index 1ebf978e9..0e139d042 100644 --- 
a/packages/php-db-import-export/tests/functional/Snowflake/ToFinal/IncrementalImportTest.php +++ b/packages/php-db-import-export/tests/functional/Snowflake/ToFinal/IncrementalImportTest.php @@ -6,14 +6,18 @@ use Generator; use Keboola\Csv\CsvFile; +use Keboola\Datatype\Definition\Snowflake; +use Keboola\Db\ImportExport\Backend\ImportState; use Keboola\Db\ImportExport\Backend\Snowflake\SnowflakeImportOptions; use Keboola\Db\ImportExport\Backend\Snowflake\ToFinalTable\FullImporter; use Keboola\Db\ImportExport\Backend\Snowflake\ToFinalTable\IncrementalImporter; use Keboola\Db\ImportExport\Backend\Snowflake\ToFinalTable\SqlBuilder; use Keboola\Db\ImportExport\Backend\Snowflake\ToStage\StageTableDefinitionFactory; use Keboola\Db\ImportExport\Backend\Snowflake\ToStage\ToStageImporter; +use Keboola\Db\ImportExport\Backend\ToStageImporterInterface; use Keboola\Db\ImportExport\ImportOptions; use Keboola\Db\ImportExport\Storage; +use Keboola\TableBackendUtils\Escaping\Snowflake\SnowflakeQuote; use Keboola\TableBackendUtils\Table\Snowflake\SnowflakeTableDefinition; use Keboola\TableBackendUtils\Table\Snowflake\SnowflakeTableQueryBuilder; use Keboola\TableBackendUtils\Table\Snowflake\SnowflakeTableReflection; @@ -26,10 +30,11 @@ protected function getSnowflakeIncrementalImportOptions( int $skipLines = ImportOptions::SKIP_FIRST_LINE ): SnowflakeImportOptions { return new SnowflakeImportOptions( - [], - true, - true, - $skipLines + convertEmptyValuesToNull: [], + isIncremental: true, + useTimestamp: true, + numberOfIgnoredLines: $skipLines, + ignoreColumns: [ToStageImporterInterface::TIMESTAMP_COLUMN_NAME], ); } @@ -42,6 +47,123 @@ protected function setUp(): void $this->createSchema($this->getDestinationSchemaName()); } + /** + * Test is testing loading of semi-structured data into typed table. 
+ * + * This test is not using CSV but inserting data directly into stage table to mimic this behavior + */ + public function testLoadTypedTableWithCastingValues(): void + { + $this->connection->executeQuery(sprintf( + /** @lang Snowflake */ + 'CREATE TABLE %s."types" ( + "id" NUMBER, + "VARIANT" VARIANT, + "BINARY" BINARY, + "VARBINARY" VARBINARY, + "OBJECT" OBJECT, + "ARRAY" ARRAY, + "GEOGRAPHY" GEOGRAPHY, + "GEOMETRY" GEOMETRY, + "_timestamp" TIMESTAMP, + PRIMARY KEY ("id") + );', + SnowflakeQuote::quoteSingleIdentifier($this->getDestinationSchemaName()) + )); + $this->connection->executeQuery(sprintf( + /** @lang Snowflake */ + 'INSERT INTO "%s"."%s" ("id","VARIANT","BINARY","VARBINARY","OBJECT","ARRAY","GEOGRAPHY","GEOMETRY") +SELECT 1, + TO_VARIANT(\'3.14\'), + TO_BINARY(HEX_ENCODE(\'1\'), \'HEX\'), + TO_BINARY(HEX_ENCODE(\'1\'), \'HEX\'), + OBJECT_CONSTRUCT(\'name\', \'Jones\'::VARIANT, \'age\', 42::VARIANT), + ARRAY_CONSTRUCT(1, 2, 3, NULL), + \'POINT(-122.35 37.55)\', + \'POINT(1820.12 890.56)\' +;', + $this->getDestinationSchemaName(), + 'types' + )); + + // skipping header + $options = new SnowflakeImportOptions( + convertEmptyValuesToNull: [], + isIncremental: false, + useTimestamp: false, + numberOfIgnoredLines: 1, + requireSameTables: SnowflakeImportOptions::SAME_TABLES_NOT_REQUIRED, + nullManipulation: SnowflakeImportOptions::NULL_MANIPULATION_SKIP, + ignoreColumns: [ToStageImporterInterface::TIMESTAMP_COLUMN_NAME], + ); + + $destinationRef = new SnowflakeTableReflection( + $this->connection, + $this->getDestinationSchemaName(), + 'types' + ); + /** @var SnowflakeTableDefinition $destination */ + $destination = $destinationRef->getTableDefinition(); + $stagingTable = StageTableDefinitionFactory::createVarcharStagingTableDefinition( + $destination->getSchemaName(), + [ + 'id', + 'VARIANT', + 'BINARY', + 'VARBINARY', + 'OBJECT', + 'ARRAY', + 'GEOGRAPHY', + 'GEOMETRY', + ] + ); + + $qb = new SnowflakeTableQueryBuilder(); + $this->connection->executeStatement( + $qb->getCreateTableCommandFromDefinition($stagingTable) + ); + $this->connection->executeQuery(sprintf( + /** @lang Snowflake */ + 'INSERT INTO "%s"."%s" ("id","VARIANT","BINARY","VARBINARY","OBJECT","ARRAY","GEOGRAPHY","GEOMETRY") +SELECT 1, + TO_VARCHAR(TO_VARIANT(\'3.14\')), + TO_VARCHAR(TO_BINARY(HEX_ENCODE(\'1\'), \'HEX\')), + TO_VARCHAR(TO_BINARY(HEX_ENCODE(\'1\'), \'HEX\')), + TO_VARCHAR(OBJECT_CONSTRUCT(\'name\', \'Jones\'::VARIANT, \'age\', 42::VARIANT)), + TO_VARCHAR(ARRAY_CONSTRUCT(1, 2, 3, NULL)), + \'POINT(-122.35 37.55)\', + \'POINT(1820.12 890.56)\' +;', + $stagingTable->getSchemaName(), + $stagingTable->getTableName() + )); + $this->connection->executeQuery(sprintf( + /** @lang Snowflake */ + 'INSERT INTO "%s"."%s" ("id","VARIANT","BINARY","VARBINARY","OBJECT","ARRAY","GEOGRAPHY","GEOMETRY") +SELECT 2, + TO_VARCHAR(TO_VARIANT(\'3.14\')), + TO_VARCHAR(TO_BINARY(HEX_ENCODE(\'1\'), \'HEX\')), + TO_VARCHAR(TO_BINARY(HEX_ENCODE(\'1\'), \'HEX\')), + TO_VARCHAR(OBJECT_CONSTRUCT(\'name\', \'Jones\'::VARIANT, \'age\', 42::VARIANT)), + TO_VARCHAR(ARRAY_CONSTRUCT(1, 2, 3, NULL)), + \'POINT(-122.35 37.55)\', + \'POINT(1820.12 890.56)\' +;', + $stagingTable->getSchemaName(), + $stagingTable->getTableName() + )); + $toFinalTableImporter = new IncrementalImporter($this->connection); + + $toFinalTableImporter->importToTable( + $stagingTable, + $destination, + $options, + new ImportState($stagingTable->getTableName()) + ); + + self::assertEquals(2, $destinationRef->getRowsCount()); + } + /** * @return \Generator> */ @@ 
-83,10 +205,11 @@ public function incrementalImportData(): Generator ['id'] ), new SnowflakeImportOptions( - [], - false, - false, // disable timestamp - ImportOptions::SKIP_FIRST_LINE + convertEmptyValuesToNull: [], + isIncremental: false, + useTimestamp: false, // disable timestamp + numberOfIgnoredLines: ImportOptions::SKIP_FIRST_LINE, + ignoreColumns: [ToStageImporterInterface::TIMESTAMP_COLUMN_NAME], ), $this->getSourceInstance( 'tw_accounts.increment.csv', @@ -96,10 +219,11 @@ public function incrementalImportData(): Generator ['id'] ), new SnowflakeImportOptions( - [], - true, // incremental - false, // disable timestamp - ImportOptions::SKIP_FIRST_LINE + convertEmptyValuesToNull: [], + isIncremental: true, // incremental + useTimestamp: false, // disable timestamp + numberOfIgnoredLines: ImportOptions::SKIP_FIRST_LINE, + ignoreColumns: [ToStageImporterInterface::TIMESTAMP_COLUMN_NAME], ), [$this->getDestinationSchemaName(), 'accounts_without_ts'], $accountsStub->getRows(), @@ -138,10 +262,11 @@ public function incrementalImportData(): Generator ['VisitID', 'Value', 'MenuItem'] ), new SnowflakeImportOptions( - [], - true, // incremental - false, // disable timestamp - ImportOptions::SKIP_FIRST_LINE + convertEmptyValuesToNull: [], + isIncremental: true, // incremental + useTimestamp: false, // disable timestamp + numberOfIgnoredLines: ImportOptions::SKIP_FIRST_LINE, + ignoreColumns: [ToStageImporterInterface::TIMESTAMP_COLUMN_NAME], ), $this->getSourceInstance( 'multi-pk.increment.csv', diff --git a/packages/php-db-import-export/tests/functional/Snowflake/ToFinal/SqlBuilderTest.php b/packages/php-db-import-export/tests/functional/Snowflake/ToFinal/SqlBuilderTest.php index 05f052135..dbc468193 100644 --- a/packages/php-db-import-export/tests/functional/Snowflake/ToFinal/SqlBuilderTest.php +++ b/packages/php-db-import-export/tests/functional/Snowflake/ToFinal/SqlBuilderTest.php @@ -9,6 +9,7 @@ use Keboola\Db\ImportExport\Backend\Snowflake\Helper\DateTimeHelper; use Keboola\Db\ImportExport\Backend\Snowflake\SnowflakeImportOptions; use Keboola\Db\ImportExport\Backend\Snowflake\ToFinalTable\SqlBuilder; +use Keboola\Db\ImportExport\Backend\ToStageImporterInterface; use Keboola\Db\ImportExport\ImportOptions; use Keboola\TableBackendUtils\Column\ColumnCollection; use Keboola\TableBackendUtils\Column\Snowflake\SnowflakeColumn; @@ -303,12 +304,13 @@ public function testGetDeleteOldItemsCommandRequireSameTables(): void $stagingTableDefinition, $tableDefinition, new SnowflakeImportOptions( - [], - false, - false, - 0, - ImportOptions::SAME_TABLES_NOT_REQUIRED, - ImportOptions::NULL_MANIPULATION_SKIP //<- skipp null manipulation + convertEmptyValuesToNull: [], + isIncremental: false, + useTimestamp: false, + numberOfIgnoredLines: 0, + requireSameTables: ImportOptions::SAME_TABLES_NOT_REQUIRED, + nullManipulation: ImportOptions::NULL_MANIPULATION_SKIP, //<- skipp null manipulation, + ignoreColumns: [ToStageImporterInterface::TIMESTAMP_COLUMN_NAME], ), ); @@ -410,7 +412,9 @@ public function testGetInsertAllIntoTargetTableCommand(): void $sql = $this->getBuilder()->getInsertAllIntoTargetTableCommand( $fakeStage, $destination, - $this->getDummyImportOptions(), + new SnowflakeImportOptions( + ignoreColumns: ['id'] + ), '2020-01-01 00:00:00' ); @@ -452,6 +456,168 @@ public function testGetInsertAllIntoTargetTableCommand(): void ], $result); } + public function testGetInsertAllIntoTargetTableCommandCasting(): void + { + $this->createTestSchema(); + $destination = new SnowflakeTableDefinition( + 
self::TEST_SCHEMA, + self::TEST_TABLE, + false, + new ColumnCollection([ + $this->createNullableGenericColumn('pk1'), + new SnowflakeColumn( + 'VARIANT', + new Snowflake( + Snowflake::TYPE_VARIANT + ), + ), + new SnowflakeColumn( + 'BINARY', + new Snowflake( + Snowflake::TYPE_BINARY + ), + ), + new SnowflakeColumn( + 'VARBINARY', + new Snowflake( + Snowflake::TYPE_VARBINARY + ), + ), + new SnowflakeColumn( + 'OBJECT', + new Snowflake( + Snowflake::TYPE_OBJECT + ), + ), + new SnowflakeColumn( + 'ARRAY', + new Snowflake( + Snowflake::TYPE_ARRAY + ), + ), + ]), + ['pk1'] + ); + $stage = new SnowflakeTableDefinition( + self::TEST_SCHEMA, + self::TEST_STAGING_TABLE, + true, + new ColumnCollection([ + $this->createNullableGenericColumn('pk1'), + $this->createNullableGenericColumn('VARIANT'), + $this->createNullableGenericColumn('BINARY'), + $this->createNullableGenericColumn('VARBINARY'), + $this->createNullableGenericColumn('OBJECT'), + $this->createNullableGenericColumn('ARRAY'), + ]), + [] + ); + $this->connection->executeStatement( + (new SnowflakeTableQueryBuilder())->getCreateTableCommandFromDefinition($destination) + ); + $this->connection->executeStatement( + (new SnowflakeTableQueryBuilder())->getCreateTableCommandFromDefinition($stage) + ); + + $this->connection->executeQuery(sprintf( + /** @lang Snowflake */ + 'INSERT INTO "%s"."%s" ("pk1","VARIANT","BINARY","VARBINARY","OBJECT","ARRAY") +SELECT \'1\', + TO_VARIANT(\'4.14\'), + TO_BINARY(HEX_ENCODE(\'1\'), \'HEX\'), + TO_BINARY(HEX_ENCODE(\'1\'), \'HEX\'), + OBJECT_CONSTRUCT(\'name\', \'Jones\'::VARIANT, \'age\', 24::VARIANT), + ARRAY_CONSTRUCT(1, 2, 3, NULL) +;', + self::TEST_SCHEMA, + self::TEST_TABLE, + )); + + $this->connection->executeQuery(sprintf( + /** @lang Snowflake */ + 'INSERT INTO "%s"."%s" ("pk1","VARIANT","BINARY","VARBINARY","OBJECT","ARRAY") +SELECT \'1\', + TO_VARCHAR(TO_VARIANT(\'3.14\')), + TO_VARCHAR(TO_BINARY(HEX_ENCODE(\'1\'), \'HEX\')), + TO_VARCHAR(TO_BINARY(HEX_ENCODE(\'1\'), \'HEX\')), + TO_VARCHAR(OBJECT_CONSTRUCT(\'name\', \'Jones\'::VARIANT, \'age\', 42::VARIANT)), + TO_VARCHAR(ARRAY_CONSTRUCT(1, 2, 3, NULL)) +;', + self::TEST_SCHEMA, + self::TEST_STAGING_TABLE, + )); + + // no convert values no timestamp + $sql = $this->getBuilder()->getInsertAllIntoTargetTableCommand( + $stage, + $destination, + new SnowflakeImportOptions( + convertEmptyValuesToNull: [], + isIncremental: false, + useTimestamp: false, + numberOfIgnoredLines: 0, + requireSameTables: ImportOptions::SAME_TABLES_NOT_REQUIRED, + nullManipulation: ImportOptions::NULL_MANIPULATION_SKIP, //<- skipp null manipulation + ignoreColumns: [ToStageImporterInterface::TIMESTAMP_COLUMN_NAME], + ), + '2020-01-01 00:00:00' + ); + + self::assertEquals( + // phpcs:ignore + 'INSERT INTO "import_export_test_schema"."import_export_test_test" ("pk1", "VARIANT", "BINARY", "VARBINARY", "OBJECT", "ARRAY") (SELECT "pk1",CAST("VARIANT" AS VARIANT) AS "VARIANT","BINARY","VARBINARY",CAST(TO_VARIANT("OBJECT") AS OBJECT) AS "OBJECT",CAST("ARRAY" AS ARRAY) AS "ARRAY" FROM "import_export_test_schema"."__temp_stagingTable" AS "src")', + $sql + ); + + $out = $this->connection->executeStatement($sql); + self::assertEquals(1, $out); + + $result = $this->connection->fetchAllAssociative(sprintf( + 'SELECT * FROM %s', + self::TEST_TABLE_IN_SCHEMA + )); + + self::assertEqualsCanonicalizing([ + [ + 'pk1' => '1', + 'VARIANT' => '"3.14"', + 'BINARY' => '1', + 'VARBINARY' => '1', + 'OBJECT' => << << '1', + 'VARIANT' => '"4.14"', + 'BINARY' => '1', + 'VARBINARY' => '1', + 'OBJECT' => << 
<<createTestSchema(); @@ -475,12 +641,15 @@ public function testGetInsertAllIntoTargetTableCommandSameTables(): void $fakeStage, $destination, new SnowflakeImportOptions( - [], - false, - false, - 0, - ImportOptions::SAME_TABLES_NOT_REQUIRED, - ImportOptions::NULL_MANIPULATION_SKIP //<- skipp null manipulation + convertEmptyValuesToNull: [], + isIncremental: false, + useTimestamp: false, + numberOfIgnoredLines: 0, + requireSameTables: ImportOptions::SAME_TABLES_NOT_REQUIRED, + nullManipulation: ImportOptions::NULL_MANIPULATION_SKIP, //<- skipp null manipulation + ignoreColumns: [ + 'id', + ], ), '2020-01-01 00:00:00' ); @@ -596,7 +765,10 @@ public function testGetInsertAllIntoTargetTableCommandConvertToNull(): void ); // convert col1 to null - $options = new SnowflakeImportOptions(['col1']); + $options = new SnowflakeImportOptions( + convertEmptyValuesToNull: ['col1'], + ignoreColumns: ['id'], + ); $sql = $this->getBuilder()->getInsertAllIntoTargetTableCommand( $fakeStage, $destination, @@ -658,7 +830,15 @@ public function testGetInsertAllIntoTargetTableCommandConvertToNullWithTimestamp ); // use timestamp - $options = new SnowflakeImportOptions(['col1'], false, true); + $options = new SnowflakeImportOptions( + convertEmptyValuesToNull: ['col1'], + isIncremental: false, + useTimestamp: true, + ignoreColumns: [ + 'id', + ToStageImporterInterface::TIMESTAMP_COLUMN_NAME, + ], + ); $sql = $this->getBuilder()->getInsertAllIntoTargetTableCommand( $fakeStage, $destination, @@ -832,12 +1012,13 @@ public function testGetUpdateWithPkCommandRequireSameTables(): void $fakeStage, $fakeDestination, new SnowflakeImportOptions( - [], - false, - false, - 0, - ImportOptions::SAME_TABLES_NOT_REQUIRED, - ImportOptions::NULL_MANIPULATION_SKIP //<- skipp null manipulation + convertEmptyValuesToNull: [], + isIncremental: false, + useTimestamp: false, + numberOfIgnoredLines: 0, + requireSameTables: ImportOptions::SAME_TABLES_NOT_REQUIRED, + nullManipulation: ImportOptions::NULL_MANIPULATION_SKIP, //<- skipp null manipulation + ignoreColumns: [ToStageImporterInterface::TIMESTAMP_COLUMN_NAME], ), '2020-01-01 00:00:00' ); @@ -862,6 +1043,145 @@ public function testGetUpdateWithPkCommandRequireSameTables(): void ], $result); } + public function testGetUpdateWithPkCommandCasting(): void + { + $this->createTestSchema(); + $destination = new SnowflakeTableDefinition( + self::TEST_SCHEMA, + self::TEST_TABLE, + false, + new ColumnCollection([ + $this->createNullableGenericColumn('pk1'), + new SnowflakeColumn( + 'VARIANT', + new Snowflake( + Snowflake::TYPE_VARIANT + ), + ), + new SnowflakeColumn( + 'BINARY', + new Snowflake( + Snowflake::TYPE_BINARY + ), + ), + new SnowflakeColumn( + 'VARBINARY', + new Snowflake( + Snowflake::TYPE_VARBINARY + ), + ), + new SnowflakeColumn( + 'OBJECT', + new Snowflake( + Snowflake::TYPE_OBJECT + ), + ), + new SnowflakeColumn( + 'ARRAY', + new Snowflake( + Snowflake::TYPE_ARRAY + ), + ), + ]), + ['pk1'] + ); + $stage = new SnowflakeTableDefinition( + self::TEST_SCHEMA, + self::TEST_STAGING_TABLE, + true, + new ColumnCollection([ + $this->createNullableGenericColumn('pk1'), + $this->createNullableGenericColumn('VARIANT'), + $this->createNullableGenericColumn('BINARY'), + $this->createNullableGenericColumn('VARBINARY'), + $this->createNullableGenericColumn('OBJECT'), + $this->createNullableGenericColumn('ARRAY'), + ]), + [] + ); + $this->connection->executeStatement( + (new SnowflakeTableQueryBuilder())->getCreateTableCommandFromDefinition($destination) + ); + 
$this->connection->executeStatement( + (new SnowflakeTableQueryBuilder())->getCreateTableCommandFromDefinition($stage) + ); + + $this->connection->executeQuery(sprintf( + /** @lang Snowflake */ + 'INSERT INTO "%s"."%s" ("pk1","VARIANT","BINARY","VARBINARY","OBJECT","ARRAY") +SELECT \'1\', + TO_VARIANT(\'4.14\'), + TO_BINARY(HEX_ENCODE(\'1\'), \'HEX\'), + TO_BINARY(HEX_ENCODE(\'1\'), \'HEX\'), + OBJECT_CONSTRUCT(\'name\', \'Jones\'::VARIANT, \'age\', 42::VARIANT), + ARRAY_CONSTRUCT(1, 2, 3, NULL) +;', + self::TEST_SCHEMA, + self::TEST_TABLE, + )); + + $this->connection->executeQuery(sprintf( + /** @lang Snowflake */ + 'INSERT INTO "%s"."%s" ("pk1","VARIANT","BINARY","VARBINARY","OBJECT","ARRAY") +SELECT \'1\', + TO_VARCHAR(TO_VARIANT(\'3.14\')), + TO_VARCHAR(TO_BINARY(HEX_ENCODE(\'1\'), \'HEX\')), + TO_VARCHAR(TO_BINARY(HEX_ENCODE(\'1\'), \'HEX\')), + TO_VARCHAR(OBJECT_CONSTRUCT(\'name\', \'Jones\'::VARIANT, \'age\', 42::VARIANT)), + TO_VARCHAR(ARRAY_CONSTRUCT(1, 2, 3, NULL)) +;', + self::TEST_SCHEMA, + self::TEST_STAGING_TABLE, + )); + + // no convert values no timestamp + $sql = $this->getBuilder()->getUpdateWithPkCommand( + $stage, + $destination, + new SnowflakeImportOptions( + convertEmptyValuesToNull: [], + isIncremental: false, + useTimestamp: false, + numberOfIgnoredLines: 0, + requireSameTables: ImportOptions::SAME_TABLES_NOT_REQUIRED, + nullManipulation: ImportOptions::NULL_MANIPULATION_SKIP, //<- skipp null manipulation + ignoreColumns: [ToStageImporterInterface::TIMESTAMP_COLUMN_NAME], + ), + '2020-01-01 00:00:00' + ); + self::assertEquals( + // phpcs:ignore + 'UPDATE "import_export_test_schema"."import_export_test_test" AS "dest" SET "pk1" = "src"."pk1", "VARIANT" = CAST("src"."VARIANT" AS VARIANT), "BINARY" = "src"."BINARY", "VARBINARY" = "src"."VARBINARY", "OBJECT" = CAST(TO_VARIANT("src"."OBJECT") AS OBJECT), "ARRAY" = CAST("src"."ARRAY" AS ARRAY) FROM "import_export_test_schema"."__temp_stagingTable" AS "src" WHERE "dest"."pk1" = "src"."pk1" ', + $sql + ); + $this->connection->executeStatement($sql); + + $result = $this->connection->fetchAllAssociative(sprintf( + 'SELECT * FROM %s', + self::TEST_TABLE_IN_SCHEMA + )); + + self::assertEquals([ + [ + 'pk1' => '1', + 'VARIANT' => '"3.14"', + 'BINARY' => '1', + 'VARBINARY' => '1', + 'OBJECT' => << <<createTestSchema(); @@ -1055,6 +1375,7 @@ public function testGetUpdateWithPkCommandConvertValuesWithTimestamp(): void ); } } + public function testGetUpdateWithPkCommandNullManipulationWithTimestamp(): void { $timestampInit = new DateTime('2020-01-01 00:00:01'); @@ -1110,12 +1431,13 @@ public function testGetUpdateWithPkCommandNullManipulationWithTimestamp(): void // use timestamp $options = new SnowflakeImportOptions( - ['col1'], - false, - true, - 0, - SnowflakeImportOptions::SAME_TABLES_REQUIRED, - SnowflakeImportOptions::NULL_MANIPULATION_SKIP, + convertEmptyValuesToNull: ['col1'], + isIncremental: false, + useTimestamp: true, + numberOfIgnoredLines: 0, + requireSameTables: SnowflakeImportOptions::SAME_TABLES_REQUIRED, + nullManipulation: SnowflakeImportOptions::NULL_MANIPULATION_SKIP, + ignoreColumns: [ToStageImporterInterface::TIMESTAMP_COLUMN_NAME], ); $sql = $this->getBuilder()->getUpdateWithPkCommand( $fakeStage, diff --git a/packages/php-db-import-export/tests/functional/Snowflake/ToFinal/StageToFinalCastingErrorsTest.php b/packages/php-db-import-export/tests/functional/Snowflake/ToFinal/StageToFinalCastingErrorsTest.php new file mode 100644 index 000000000..ec1e5f7da --- /dev/null +++ 
b/packages/php-db-import-export/tests/functional/Snowflake/ToFinal/StageToFinalCastingErrorsTest.php @@ -0,0 +1,137 @@ +cleanSchema($this->getDestinationSchemaName()); + $this->cleanSchema($this->getSourceSchemaName()); + $this->createSchema($this->getSourceSchemaName()); + $this->createSchema($this->getDestinationSchemaName()); + } + + public function castingErrorCases(): Generator + { + yield 'BINARY string' => [ + 'column' => new SnowflakeColumn('id', new Snowflake(Snowflake::TYPE_BINARY)), + 'insertData' => '\'xxx\'', + 'expectedMessage' => '/The following string is not a legal hex-encoded value/', + ]; + yield 'VARBINARY string' => [ + 'column' => new SnowflakeColumn('id', new Snowflake(Snowflake::TYPE_VARBINARY)), + 'insertData' => '\'xxx\'', + 'expectedMessage' => '/The following string is not a legal hex-encoded value/', + ]; + yield 'OBJECT string' => [ + 'column' => new SnowflakeColumn('id', new Snowflake(Snowflake::TYPE_OBJECT)), + 'insertData' => '\'xxx\'', + 'expectedMessage' => '/Failed to cast variant value .* to OBJECT/', + ]; + yield 'GEOGRAPHY string' => [ + 'column' => new SnowflakeColumn('id', new Snowflake(Snowflake::TYPE_GEOGRAPHY)), + 'insertData' => '\'xxx\'', + 'expectedMessage' => '/Error parsing Geo input/', + ]; + yield 'GEOMETRY string' => [ + 'column' => new SnowflakeColumn('id', new Snowflake(Snowflake::TYPE_GEOMETRY)), + 'insertData' => '\'xxx\'', + 'expectedMessage' => '/Error parsing Geo input/', + ]; + yield 'TIMESTAMP string' => [ + 'column' => new SnowflakeColumn('id', new Snowflake(Snowflake::TYPE_TIMESTAMP)), + 'insertData' => '\'xxx\'', + 'expectedMessage' => '/is not recognized/', + ]; + } + + /** + * @dataProvider castingErrorCases + */ + public function testLoadTypedTableWithCastingValuesErrors( + SnowflakeColumn $column, + string $insertData, + string $expectedMessage, + ): void { + $this->connection->executeQuery(sprintf( + /** @lang Snowflake */ + 'CREATE TABLE %s."types" ( + "%s" %s, + "_timestamp" TIMESTAMP + );', + SnowflakeQuote::quoteSingleIdentifier($this->getDestinationSchemaName()), + $column->getColumnName(), + $column->getColumnDefinition()->getSQLDefinition(), + )); + + // skipping header + $options = new SnowflakeImportOptions( + [], + false, + false, + 1, + SnowflakeImportOptions::SAME_TABLES_NOT_REQUIRED, + SnowflakeImportOptions::NULL_MANIPULATION_SKIP, + [ToStageImporterInterface::TIMESTAMP_COLUMN_NAME] + ); + + $destinationRef = new SnowflakeTableReflection( + $this->connection, + $this->getDestinationSchemaName(), + 'types' + ); + /** @var SnowflakeTableDefinition $destination */ + $destination = $destinationRef->getTableDefinition(); + $stagingTable = StageTableDefinitionFactory::createVarcharStagingTableDefinition( + $destination->getSchemaName(), + [ + $column->getColumnName(), + ] + ); + + $qb = new SnowflakeTableQueryBuilder(); + $this->connection->executeStatement( + $qb->getCreateTableCommandFromDefinition($stagingTable) + ); + $this->connection->executeQuery(sprintf( + /** @lang Snowflake */ + 'INSERT INTO "%s"."%s" ("%s") SELECT %s;', + $stagingTable->getSchemaName(), + $stagingTable->getTableName(), + $column->getColumnName(), + $insertData + )); + $toFinalTableImporter = new FullImporter($this->connection); + + try { + $toFinalTableImporter->importToTable( + $stagingTable, + $destination, + $options, + new ImportState($stagingTable->getTableName()) + ); + $this->fail('Import must fail'); + } catch (Exception $e) { + $this->assertMatchesRegularExpression($expectedMessage, $e->getMessage()); + } + } +} diff --git 
a/packages/php-db-import-export/tests/functional/Snowflake/ToStage/StageImportTest.php b/packages/php-db-import-export/tests/functional/Snowflake/ToStage/StageImportTest.php index 50459d1b2..a3abff752 100644 --- a/packages/php-db-import-export/tests/functional/Snowflake/ToStage/StageImportTest.php +++ b/packages/php-db-import-export/tests/functional/Snowflake/ToStage/StageImportTest.php @@ -7,6 +7,7 @@ use Keboola\Db\Import\Exception; use Keboola\Db\ImportExport\Backend\Snowflake\SnowflakeImportOptions; use Keboola\Db\ImportExport\Backend\Snowflake\ToStage\ToStageImporter; +use Keboola\Db\ImportExport\Backend\ToStageImporterInterface; use Keboola\Db\ImportExport\Exception\ColumnsMismatchException; use Keboola\Db\ImportExport\Storage\Snowflake\Table; use Keboola\TableBackendUtils\Escaping\Snowflake\SnowflakeQuote; @@ -176,11 +177,12 @@ public function testMoveDataFromAToBRequireSameTablesFailColumnNameMismatch(): v $source, $targetTableRef->getTableDefinition(), new SnowflakeImportOptions( - [], - false, - true, - 1, - SnowflakeImportOptions::SAME_TABLES_REQUIRED + convertEmptyValuesToNull: [], + isIncremental: false, + useTimestamp: true, + numberOfIgnoredLines: 1, + requireSameTables: SnowflakeImportOptions::SAME_TABLES_REQUIRED, + ignoreColumns: [ToStageImporterInterface::TIMESTAMP_COLUMN_NAME], ) ); } @@ -225,11 +227,12 @@ public function testMoveDataFromAToBRequireSameTablesFailColumnCount(): void $source, $targetTableRef->getTableDefinition(), new SnowflakeImportOptions( - [], - false, - true, - 1, - SnowflakeImportOptions::SAME_TABLES_REQUIRED + convertEmptyValuesToNull: [], + isIncremental: false, + useTimestamp: true, + numberOfIgnoredLines: 1, + requireSameTables: SnowflakeImportOptions::SAME_TABLES_REQUIRED, + ignoreColumns: [ToStageImporterInterface::TIMESTAMP_COLUMN_NAME], ) ); } @@ -272,11 +275,12 @@ public function testMoveDataFromAToBRequireSameTablesFailColumnMismatch(): void $source, $targetTableRef->getTableDefinition(), new SnowflakeImportOptions( - [], - false, - true, - 1, - SnowflakeImportOptions::SAME_TABLES_REQUIRED + convertEmptyValuesToNull: [], + isIncremental: false, + useTimestamp: true, + numberOfIgnoredLines: 1, + requireSameTables: SnowflakeImportOptions::SAME_TABLES_REQUIRED, + ignoreColumns: [ToStageImporterInterface::TIMESTAMP_COLUMN_NAME], ) ); } diff --git a/packages/php-db-import-export/tests/unit/Backend/SourceDestinationColumnMapTest.php b/packages/php-db-import-export/tests/unit/Backend/SourceDestinationColumnMapTest.php new file mode 100644 index 000000000..353b924db --- /dev/null +++ b/packages/php-db-import-export/tests/unit/Backend/SourceDestinationColumnMapTest.php @@ -0,0 +1,227 @@ +getColumn('col1', 'string'); + $source = new ColumnCollection([ + $col1, + $this->getColumn('col2', 'string'), + ]); + $col1Dest = $this->getColumn('col1', 'bool'); + $destination = new ColumnCollection([ + $col1Dest, + $this->getColumn('col2', 'bool'), + ]); + + $map = new SourceDestinationColumnMap( + $source, + $destination + ); + return [$col1, $col1Dest, $map]; + } + + private function getColumn(string $name, string $type): ColumnInterface + { + return new class($name, $type) implements ColumnInterface { + public function __construct(private readonly string $name, private readonly string $type) + { + } + + public function getColumnName(): string + { + return $this->name; + } + + public function getColumnDefinition(): DefinitionInterface + { + return new class($this->type) implements DefinitionInterface { + public function __construct(private readonly string 
$type) + { + } + + public function getSQLDefinition(): string + { + return $this->type . 'DEF'; + } + + public function toArray(): array + { + throw new Exception('Not implemented'); + } + + public function getBasetype(): string + { + throw new Exception('Not implemented'); + } + + public function getType(): string + { + return $this->type; + } + + public function getLength(): ?string + { + throw new Exception('Not implemented'); + } + + public function isNullable(): bool + { + throw new Exception('Not implemented'); + } + + public function getDefault(): ?string + { + throw new Exception('Not implemented'); + } + + public static function getTypeByBasetype(string $basetype): string + { + throw new Exception('Not implemented'); + } + }; + } + + public static function createGenericColumn(string $columnName): ColumnInterface + { + throw new Exception('Not implemented'); + } + + public static function createTimestampColumn( + string $columnName = self::TIMESTAMP_COLUMN_NAME + ): ColumnInterface { + throw new Exception('Not implemented'); + } + + /** + * @param array $dbResponse + */ + public static function createFromDB(array $dbResponse): ColumnInterface + { + throw new Exception('Not implemented'); + } + }; + } + + public function testCreateForTables(): void + { + $col1 = $this->getColumn('col1', 'string'); + $source = $this->createMock(TableDefinitionInterface::class); + $source->expects(self::once())->method('getColumnsDefinitions')->willReturn(new ColumnCollection([ + $col1, + $this->getColumn('col2', 'string'), + ])); + $col1Dest = $this->getColumn('col1', 'bool'); + $destination = $this->createMock(TableDefinitionInterface::class); + $destination->expects(self::once())->method('getColumnsDefinitions')->willReturn(new ColumnCollection([ + $col1Dest, + $this->getColumn('col2', 'bool'), + ])); + + $map = SourceDestinationColumnMap::createForTables( + $source, + $destination + ); + + $this->assertSame($col1Dest, $map->getDestination($col1)); + } + + public function testCreateForCollection(): void + { + [$col1, $col1Dest, $map] = $this->getMap(); + + $this->assertSame($col1Dest, $map->getDestination($col1)); + } + + public function testColumnMismatch(): void + { + $source = new ColumnCollection([ + $this->getColumn('col1', 'string'), + $this->getColumn('col2', 'string'), + ]); + $destination = new ColumnCollection([ + $this->getColumn('col1', 'bool'), + $this->getColumn('col2', 'bool'), + $this->getColumn('col3', 'bool'), + ]); + + $this->expectException(ColumnsMismatchException::class); + new SourceDestinationColumnMap( + $source, + $destination + ); + } + + public function testIgnoreColumn(): void + { + $this->expectNotToPerformAssertions(); + $source = new ColumnCollection([ + $this->getColumn('col1', 'string'), + $this->getColumn('col2', 'string'), + ]); + $destination = new ColumnCollection([ + $this->getColumn('col1', 'bool'), + $this->getColumn('col2', 'bool'), + $this->getColumn('col3', 'bool'), + ]); + + new SourceDestinationColumnMap( + $source, + $destination, + ['col3'] + ); + } + + public function testIgnoreColumnsMoreThanOne(): void + { + $this->expectNotToPerformAssertions(); + $source = new ColumnCollection([ + $this->getColumn('col1', 'string'), + $this->getColumn('col2', 'string'), + ]); + $destination = new ColumnCollection([ + $this->getColumn('col1', 'bool'), + $this->getColumn('col2', 'bool'), + $this->getColumn('col3', 'bool'), + $this->getColumn('col4', 'bool'), + $this->getColumn('col5', 'bool'), + ]); + + new SourceDestinationColumnMap( + $source, + $destination, + 
['col3', 'col4', 'col5'] + ); + } + + public function testColumnNotFound(): void + { + [, , $map] = $this->getMap(); + + $this->expectException(Throwable::class); + $map->getDestination($this->getColumn('test', 'string')); + } +}
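
Usage note (illustrative only, not part of the diff): the new SourceDestinationColumnMap pairs staging columns with destination columns positionally while skipping the columns listed in ignoreColumns (typically the _timestamp column that exists only in the destination). The sketch below assumes the keboola/table-backend-utils and keboola/php-datatypes classes referenced in the diff and uses made-up column names and types; it shows how the map is built and how SqlBuilder decides whether a CAST is needed for VARIANT/OBJECT/ARRAY destinations.

<?php

declare(strict_types=1);

use Keboola\Datatype\Definition\Snowflake;
use Keboola\Db\ImportExport\Backend\SourceDestinationColumnMap;
use Keboola\Db\ImportExport\Backend\ToStageImporterInterface;
use Keboola\TableBackendUtils\Column\ColumnCollection;
use Keboola\TableBackendUtils\Column\Snowflake\SnowflakeColumn;

// Staging table: everything is loaded as plain VARCHAR columns.
$source = new ColumnCollection([
    new SnowflakeColumn('id', new Snowflake(Snowflake::TYPE_VARCHAR)),
    new SnowflakeColumn('payload', new Snowflake(Snowflake::TYPE_VARCHAR)),
]);

// Typed destination table: "payload" is OBJECT and there is an extra
// "_timestamp" column that has no counterpart in the staging table.
$destination = new ColumnCollection([
    new SnowflakeColumn('id', new Snowflake(Snowflake::TYPE_VARCHAR)),
    new SnowflakeColumn('payload', new Snowflake(Snowflake::TYPE_OBJECT)),
    new SnowflakeColumn(
        ToStageImporterInterface::TIMESTAMP_COLUMN_NAME,
        new Snowflake(Snowflake::TYPE_TIMESTAMP)
    ),
]);

// Ignored columns are skipped while pairing; a leftover mismatch in column
// counts raises ColumnsMismatchException instead of silently mis-pairing.
$map = new SourceDestinationColumnMap(
    $source,
    $destination,
    [ToStageImporterInterface::TIMESTAMP_COLUMN_NAME]
);

foreach ($source->getIterator() as $sourceColumn) {
    $destinationColumn = $map->getDestination($sourceColumn);
    $destinationType = $destinationColumn->getColumnDefinition()->getType();

    // Same decision SqlBuilder makes via AUTO_CASTING_TYPES: only VARIANT,
    // OBJECT and ARRAY destinations get an explicit CAST (OBJECT goes through
    // TO_VARIANT first, because OBJECT cannot be cast directly from a string).
    $needsCast = in_array($destinationType, [
        Snowflake::TYPE_VARIANT,
        Snowflake::TYPE_OBJECT,
        Snowflake::TYPE_ARRAY,
    ], true) && $destinationType !== $sourceColumn->getColumnDefinition()->getType();

    printf(
        "%s -> %s (cast needed: %s)\n",
        $sourceColumn->getColumnName(),
        $destinationType,
        $needsCast ? 'yes' : 'no'
    );
}

With the inputs above, "id" maps to a same-typed column and needs no cast, "payload" maps to the OBJECT column and would be emitted as CAST(TO_VARIANT("payload") AS OBJECT), and the destination-only "_timestamp" column is ignored rather than causing a ColumnsMismatchException.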