-
Notifications
You must be signed in to change notification settings - Fork 2
CT-1128 SNFLK autocast types when using string stage table #90
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
0fa8ddb
4c4efe0
099a759
66953f2
e69b473
0c0390f
0cb3cb5
3a61db2
46b5ffd
d534718
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,17 +8,19 @@ | |
| use Keboola\Datatype\Definition\Snowflake; | ||
| use Keboola\Db\ImportExport\Backend\Snowflake\Helper\QuoteHelper; | ||
| use Keboola\Db\ImportExport\Backend\Snowflake\SnowflakeImportOptions; | ||
| use Keboola\Db\ImportExport\Backend\SourceDestinationColumnMap; | ||
| use Keboola\Db\ImportExport\Backend\ToStageImporterInterface; | ||
| use Keboola\Db\ImportExport\ImportOptionsInterface; | ||
| use Keboola\Db\ImportExport\Storage\Snowflake\Table; | ||
| use Keboola\Db\ImportExport\Storage\SourceInterface; | ||
| use Keboola\TableBackendUtils\Column\Snowflake\SnowflakeColumn; | ||
| use Keboola\TableBackendUtils\Escaping\Exasol\ExasolQuote; | ||
| use Keboola\TableBackendUtils\Escaping\Snowflake\SnowflakeQuote; | ||
| use Keboola\TableBackendUtils\Table\Snowflake\SnowflakeTableDefinition; | ||
|
|
||
| class SqlBuilder | ||
| { | ||
| private const AUTO_CASTING_TYPES = [ | ||
| Snowflake::TYPE_VARIANT, | ||
| Snowflake::TYPE_OBJECT, | ||
| Snowflake::TYPE_ARRAY, | ||
| ]; | ||
| public const SRC_ALIAS = 'src'; | ||
|
|
||
| public function getBeginTransaction(): string | ||
|
|
@@ -165,6 +167,11 @@ public function getInsertAllIntoTargetTableCommand( | |
| SnowflakeImportOptions $importOptions, | ||
| string $timestamp | ||
| ): string { | ||
| $columnMap = SourceDestinationColumnMap::createForTables( | ||
| $sourceTableDefinition, | ||
| $destinationTableDefinition, | ||
| $importOptions->ignoreColumns() | ||
| ); | ||
| $destinationTable = sprintf( | ||
| '%s.%s', | ||
| SnowflakeQuote::quoteSingleIdentifier($destinationTableDefinition->getSchemaName()), | ||
|
|
@@ -184,44 +191,67 @@ public function getInsertAllIntoTargetTableCommand( | |
|
|
||
| $columnsSetSql = []; | ||
|
|
||
| /** @var SnowflakeColumn $columnDefinition */ | ||
| foreach ($sourceTableDefinition->getColumnsDefinitions() as $columnDefinition) { | ||
| /** @var SnowflakeColumn $sourceColumn */ | ||
| foreach ($sourceTableDefinition->getColumnsDefinitions() as $sourceColumn) { | ||
| // output mapping same tables are required do not convert nulls to empty strings | ||
| if (!$importOptions->isNullManipulationEnabled()) { | ||
| $columnsSetSql[] = SnowflakeQuote::quoteSingleIdentifier($columnDefinition->getColumnName()); | ||
| $destinationColumn = $columnMap->getDestination($sourceColumn); | ||
| $type = $destinationColumn->getColumnDefinition()->getType(); | ||
| $useAutoCast = in_array($type, self::AUTO_CASTING_TYPES, true); | ||
| $isSameType = $type === $sourceColumn->getColumnDefinition()->getType(); | ||
| if ($useAutoCast && !$isSameType) { | ||
| if ($type === Snowflake::TYPE_OBJECT) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. tady ocekavame to ze pro dalsi edgecasy co prijdou, tak se to bude vetvit, ze? Nechces to nekam vycuknout? kdyz to nechame takto, tak s dalsim typem prijde copypaste stejneho ifu. Ale asi je to pre-mature... necham na tobe
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Prakjticky sem teď přidal test pro GEOMETRY a GEOGRAPHY víc takových typů tam není takže to dost pravděpodobně potřeba vůbec nebude pro nic dalšího. |
||
| // object can't be casted from string but can be casted from variant | ||
| $columnsSetSql[] = sprintf( | ||
| 'CAST(TO_VARIANT(%s) AS %s) AS %s', | ||
| SnowflakeQuote::quoteSingleIdentifier($sourceColumn->getColumnName()), | ||
| $destinationColumn->getColumnDefinition()->getSQLDefinition(), | ||
| SnowflakeQuote::quoteSingleIdentifier($destinationColumn->getColumnName()) | ||
| ); | ||
| continue; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ta metoda je takto nadesignovana, ale ten ty
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if else by byl hrozný hell, možeš to zkusit jak by to vypadalo :) Možná match(true) by to trochu pošéfoval, ale takto jdeš po těch podmínkách a máš early return přes continue. Když je to "flat" přes else if tak bude strašně komplikované ty elseif poskládat správně pod sebe. |
||
| } | ||
| $columnsSetSql[] = sprintf( | ||
| 'CAST(%s AS %s) AS %s', | ||
| SnowflakeQuote::quoteSingleIdentifier($sourceColumn->getColumnName()), | ||
| $destinationColumn->getColumnDefinition()->getSQLDefinition(), | ||
| SnowflakeQuote::quoteSingleIdentifier($destinationColumn->getColumnName()) | ||
| ); | ||
| continue; | ||
| } | ||
| $columnsSetSql[] = SnowflakeQuote::quoteSingleIdentifier($sourceColumn->getColumnName()); | ||
| continue; | ||
| } | ||
|
|
||
| // Input mapping convert empty values to null | ||
| // empty strings '' are converted to null values | ||
| if (in_array($columnDefinition->getColumnName(), $importOptions->getConvertEmptyValuesToNull(), true)) { | ||
| if (in_array($sourceColumn->getColumnName(), $importOptions->getConvertEmptyValuesToNull(), true)) { | ||
| // use nullif only for string base type | ||
| if ($columnDefinition->getColumnDefinition()->getBasetype() === BaseType::STRING) { | ||
| if ($sourceColumn->getColumnDefinition()->getBasetype() === BaseType::STRING) { | ||
| $columnsSetSql[] = sprintf( | ||
| 'IFF(%s = \'\', NULL, %s)', | ||
| SnowflakeQuote::quoteSingleIdentifier($columnDefinition->getColumnName()), | ||
| SnowflakeQuote::quoteSingleIdentifier($columnDefinition->getColumnName()) | ||
| SnowflakeQuote::quoteSingleIdentifier($sourceColumn->getColumnName()), | ||
| SnowflakeQuote::quoteSingleIdentifier($sourceColumn->getColumnName()) | ||
| ); | ||
| continue; | ||
| } | ||
| // if tables is not typed column could be other than string in this case we skip conversion | ||
| $columnsSetSql[] = SnowflakeQuote::quoteSingleIdentifier($columnDefinition->getColumnName()); | ||
| $columnsSetSql[] = SnowflakeQuote::quoteSingleIdentifier($sourceColumn->getColumnName()); | ||
| continue; | ||
| } | ||
|
|
||
| // for string base type convert null values to empty string '' | ||
| //phpcs:ignore | ||
| if (!$importOptions->usingUserDefinedTypes() && $columnDefinition->getColumnDefinition()->getBasetype() === BaseType::STRING) { | ||
| if (!$importOptions->usingUserDefinedTypes() && $sourceColumn->getColumnDefinition()->getBasetype() === BaseType::STRING) { | ||
| $columnsSetSql[] = sprintf( | ||
| 'COALESCE(%s, \'\') AS %s', | ||
| SnowflakeQuote::quoteSingleIdentifier($columnDefinition->getColumnName()), | ||
| SnowflakeQuote::quoteSingleIdentifier($columnDefinition->getColumnName()) | ||
| SnowflakeQuote::quoteSingleIdentifier($sourceColumn->getColumnName()), | ||
| SnowflakeQuote::quoteSingleIdentifier($sourceColumn->getColumnName()) | ||
| ); | ||
| continue; | ||
| } | ||
| // on columns other than string dont use COALESCE | ||
| // this will fail if the column is not null, but this is expected | ||
| $columnsSetSql[] = SnowflakeQuote::quoteSingleIdentifier($columnDefinition->getColumnName()); | ||
| $columnsSetSql[] = SnowflakeQuote::quoteSingleIdentifier($sourceColumn->getColumnName()); | ||
| } | ||
|
|
||
| if ($useTimestamp) { | ||
|
|
@@ -256,29 +286,58 @@ public function getUpdateWithPkCommand( | |
| SnowflakeImportOptions $importOptions, | ||
| string $timestamp | ||
| ): string { | ||
| $columnMap = SourceDestinationColumnMap::createForTables( | ||
| $stagingTableDefinition, | ||
| $destinationDefinition, | ||
| $importOptions->ignoreColumns() | ||
| ); | ||
| $columnsSet = []; | ||
|
|
||
| foreach ($stagingTableDefinition->getColumnsNames() as $columnName) { | ||
| foreach ($stagingTableDefinition->getColumnsDefinitions() as $sourceColumn) { | ||
| if (!$importOptions->isNullManipulationEnabled()) { | ||
| $destinationColumn = $columnMap->getDestination($sourceColumn); | ||
| $type = $destinationColumn->getColumnDefinition()->getType(); | ||
| $useAutoCast = in_array($type, self::AUTO_CASTING_TYPES, true); | ||
| $isSameType = $type === $sourceColumn->getColumnDefinition()->getType(); | ||
| if ($useAutoCast && !$isSameType) { | ||
| if ($type === Snowflake::TYPE_OBJECT) { | ||
| // object can't be casted from string but can be casted from variant | ||
| $columnsSet[] = sprintf( | ||
| '%s = CAST(TO_VARIANT("src".%s) AS %s)', | ||
| SnowflakeQuote::quoteSingleIdentifier($destinationColumn->getColumnName()), | ||
| SnowflakeQuote::quoteSingleIdentifier($sourceColumn->getColumnName()), | ||
| $destinationColumn->getColumnDefinition()->getSQLDefinition(), | ||
| ); | ||
| continue; | ||
| } | ||
| $columnsSet[] = sprintf( | ||
| '%s = CAST("src".%s AS %s)', | ||
| SnowflakeQuote::quoteSingleIdentifier($destinationColumn->getColumnName()), | ||
| SnowflakeQuote::quoteSingleIdentifier($sourceColumn->getColumnName()), | ||
| $destinationColumn->getColumnDefinition()->getSQLDefinition(), | ||
| ); | ||
| continue; | ||
| } | ||
|
|
||
| $columnsSet[] = sprintf( | ||
| '%s = "src".%s', | ||
| SnowflakeQuote::quoteSingleIdentifier($columnName), | ||
| SnowflakeQuote::quoteSingleIdentifier($columnName), | ||
| SnowflakeQuote::quoteSingleIdentifier($sourceColumn->getColumnName()), | ||
| SnowflakeQuote::quoteSingleIdentifier($sourceColumn->getColumnName()), | ||
| ); | ||
| continue; | ||
| } | ||
| if (in_array($columnName, $importOptions->getConvertEmptyValuesToNull(), true)) { | ||
| if (in_array($sourceColumn->getColumnName(), $importOptions->getConvertEmptyValuesToNull(), true)) { | ||
| $columnsSet[] = sprintf( | ||
| '%s = IFF("src".%s = \'\', NULL, "src".%s)', | ||
| SnowflakeQuote::quoteSingleIdentifier($columnName), | ||
| SnowflakeQuote::quoteSingleIdentifier($columnName), | ||
| SnowflakeQuote::quoteSingleIdentifier($columnName) | ||
| SnowflakeQuote::quoteSingleIdentifier($sourceColumn->getColumnName()), | ||
| SnowflakeQuote::quoteSingleIdentifier($sourceColumn->getColumnName()), | ||
| SnowflakeQuote::quoteSingleIdentifier($sourceColumn->getColumnName()) | ||
| ); | ||
| } else { | ||
| $columnsSet[] = sprintf( | ||
| '%s = COALESCE("src".%s, \'\')', | ||
| SnowflakeQuote::quoteSingleIdentifier($columnName), | ||
| SnowflakeQuote::quoteSingleIdentifier($columnName) | ||
| SnowflakeQuote::quoteSingleIdentifier($sourceColumn->getColumnName()), | ||
| SnowflakeQuote::quoteSingleIdentifier($sourceColumn->getColumnName()) | ||
| ); | ||
| } | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,118 @@ | ||
| <?php | ||
|
|
||
| declare(strict_types=1); | ||
|
|
||
| namespace Keboola\Db\ImportExport\Backend; | ||
|
|
||
| use Error; | ||
| use Exception; | ||
| use Generator; | ||
| use Keboola\Db\ImportExport\Exception\ColumnsMismatchException; | ||
| use Keboola\TableBackendUtils\Column\ColumnCollection; | ||
| use Keboola\TableBackendUtils\Column\ColumnInterface; | ||
| use Keboola\TableBackendUtils\Table\TableDefinitionInterface; | ||
| use WeakMap; | ||
|
|
||
| /** | ||
| * Class will create map of table column based on columns order | ||
| */ | ||
| final class SourceDestinationColumnMap | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. tady to mapování source->destination sloupců sem potřeboval dostat někam ven. |
||
| { | ||
| /** | ||
| * @var WeakMap<ColumnInterface,ColumnInterface> | ||
| */ | ||
| private WeakMap $map; | ||
|
|
||
| /** | ||
| * @param string[] $ignoreColumns | ||
| */ | ||
| public function __construct( | ||
| private readonly ColumnCollection $source, | ||
| private readonly ColumnCollection $destination, | ||
| private readonly array $ignoreColumns = [], | ||
| ) { | ||
| $this->map = new WeakMap(); | ||
| $this->buildMap(); | ||
| } | ||
|
|
||
| /** | ||
| * @param string[] $ignoreColumns | ||
| */ | ||
| public static function createForTables( | ||
| TableDefinitionInterface $source, | ||
| TableDefinitionInterface $destination, | ||
| array $ignoreColumns = [], | ||
| ): self { | ||
| return new self( | ||
| $source->getColumnsDefinitions(), | ||
| $destination->getColumnsDefinitions(), | ||
| $ignoreColumns | ||
| ); | ||
| } | ||
|
|
||
| private function buildMap(): void | ||
| { | ||
| $it0 = $this->source->getIterator(); | ||
| $it1 = $this->destination->getIterator(); | ||
| while ($it0->valid() || $it1->valid()) { | ||
| $it0 = $this->ignoreColumn($it0, $it1); | ||
| if ($it0 === false) { | ||
| break; | ||
| } | ||
| $it1 = $this->ignoreColumn($it1, $it0); | ||
| if ($it1 === false) { | ||
| break; | ||
| } | ||
|
|
||
| if ($it0->valid() && $it1->valid()) { | ||
| /** @var ColumnInterface $sourceCol */ | ||
| $sourceCol = $it0->current(); | ||
| /** @var ColumnInterface $destCol */ | ||
| $destCol = $it1->current(); | ||
| $this->map[$sourceCol] = $destCol; | ||
| } else { | ||
| throw ColumnsMismatchException::createColumnsCountMismatch($this->source, $this->destination); | ||
| } | ||
| $it0->next(); | ||
| $it1->next(); | ||
| } | ||
| } | ||
|
|
||
| public function getDestination(ColumnInterface $source): ColumnInterface | ||
| { | ||
| try { | ||
| $destination = $this->map[$source]; | ||
| } catch (Error $e) { | ||
| // this can happen only when class is used with different source and destination tables instances | ||
| throw new Exception(sprintf('Column "%s" not found in destination table', $source->getColumnName())); | ||
| } | ||
| assert($destination !== null); | ||
| return $destination; | ||
| } | ||
|
|
||
| /** | ||
| * @param Generator<int, ColumnInterface> $it0 | ||
| * @param Generator<int, ColumnInterface> $it1 | ||
| * @return Generator<int, ColumnInterface>|false | ||
| */ | ||
| private function ignoreColumn(Generator $it0, Generator $it1): Generator|false | ||
| { | ||
| if ($this->isIgnoredColumn($it0)) { | ||
| $it0->next(); | ||
| $this->ignoreColumn($it0, $it1); | ||
| if (!$it0->valid() && !$it1->valid()) { | ||
| return false; | ||
| } | ||
| } | ||
|
|
||
| return $it0; | ||
| } | ||
|
|
||
| /** | ||
| * @param Generator<int, ColumnInterface> $it | ||
| */ | ||
| private function isIgnoredColumn(Generator $it): bool | ||
| { | ||
| return $it->valid() && in_array($it->current()->getColumnName(), $this->ignoreColumns, true); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nechápu proč to neni v tom interface