Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -576,4 +576,349 @@ public function testImportTimestampBehavior(array $features, array $expectedCont
);
$this->assertEqualsCanonicalizing($expectedContent, $destinationContent);
}

public function testImportTimestampBehaviorArrayType(): void
{
$this->connection->executeQuery(sprintf(
'CREATE TABLE %s.%s (
"id" INT CONSTRAINT "table_pk" PRIMARY KEY,
"name" STRING,
"array" ARRAY,
"object" OBJECT,
"isDeleted" INT,
"_timestamp" TIMESTAMP
);',
SnowflakeQuote::quoteSingleIdentifier($this->getDestinationSchemaName()),
SnowflakeQuote::quoteSingleIdentifier(self::TABLE_TRANSLATIONS),
));

$this->connection->executeQuery(sprintf(
'CREATE TABLE %s.%s (
"id" INT,
"name" STRING,
"array" ARRAY,
"object" OBJECT,
"isDeleted" INT
);',
SnowflakeQuote::quoteSingleIdentifier($this->getSourceSchemaName()),
SnowflakeQuote::quoteSingleIdentifier(self::TABLE_TRANSLATIONS),
));

$this->connection->executeStatement(sprintf(
/** @lang Snowflake */
'
INSERT INTO "%s"."%s" ("id", "name", "array", "object", "isDeleted")
SELECT 1, \'test1\', ARRAY_CONSTRUCT(2, NULL), OBJECT_CONSTRUCT(\'name\', \'Jones\'::VARIANT, \'age\', 41::VARIANT), 0;', //<- changed array value
$this->getSourceSchemaName(),
self::TABLE_TRANSLATIONS,
));
$this->connection->executeStatement(sprintf(
/** @lang Snowflake */
'
INSERT INTO "%s"."%s" ("id", "name", "array", "object", "isDeleted")
SELECT 2, \'test2\', ARRAY_CONSTRUCT(2, NULL), OBJECT_CONSTRUCT(\'name\', \'Jones\'::VARIANT, \'age\', 42::VARIANT), 0;',
$this->getSourceSchemaName(),
self::TABLE_TRANSLATIONS,
));
$this->connection->executeStatement(sprintf(
/** @lang Snowflake */
'
INSERT INTO "%s"."%s" ("id", "name", "array", "object", "isDeleted")
SELECT 4, \'test4\', ARRAY_CONSTRUCT(4, NULL), OBJECT_CONSTRUCT(\'name\', \'Jones\'::VARIANT, \'age\', 44::VARIANT), 0;',
$this->getSourceSchemaName(),
self::TABLE_TRANSLATIONS,
));

$this->connection->executeStatement(sprintf(
/** @lang Snowflake */
'
INSERT INTO "%s"."%s" ("id", "name", "array", "object", "isDeleted", "_timestamp")
SELECT 1, \'test1\', ARRAY_CONSTRUCT(1, NULL), OBJECT_CONSTRUCT(\'name\', \'Jones\'::VARIANT, \'age\', 41::VARIANT), 0, \'2021-01-01 00:00:00\';',
$this->getDestinationSchemaName(),
self::TABLE_TRANSLATIONS,
));
$this->connection->executeStatement(sprintf(
/** @lang Snowflake */
'
INSERT INTO "%s"."%s" ("id", "name", "array", "object", "isDeleted", "_timestamp")
SELECT 2, \'test2\', ARRAY_CONSTRUCT(2, NULL), OBJECT_CONSTRUCT(\'name\', \'Jones\'::VARIANT, \'age\', 42::VARIANT), 0, \'2021-01-01 00:00:00\';',
$this->getDestinationSchemaName(),
self::TABLE_TRANSLATIONS,
));
$this->connection->executeStatement(sprintf(
/** @lang Snowflake */
'
INSERT INTO "%s"."%s" ("id", "name", "array", "object", "isDeleted", "_timestamp")
SELECT 3, \'test3\', ARRAY_CONSTRUCT(3, NULL), OBJECT_CONSTRUCT(\'name\', \'Jones\'::VARIANT, \'age\', 43::VARIANT), 0, \'2021-01-01 00:00:00\';',
$this->getDestinationSchemaName(),
self::TABLE_TRANSLATIONS,
));

$source = (new SnowflakeTableReflection(
$this->connection,
$this->getSourceSchemaName(),
self::TABLE_TRANSLATIONS,
))->getTableDefinition();
$destination = (new SnowflakeTableReflection(
$this->connection,
$this->getDestinationSchemaName(),
self::TABLE_TRANSLATIONS,
))->getTableDefinition();

$state = new ImportState(self::TABLE_TRANSLATIONS);
(new IncrementalImporter(
$this->connection,
new DateTimeImmutable('2022-02-02 00:00:00', new DateTimeZone('UTC')),
)
)->importToTable(
$source,
$destination,
new SnowflakeImportOptions(
isIncremental: true,
useTimestamp: true,
nullManipulation: SnowflakeImportOptions::NULL_MANIPULATION_SKIP,
features: ['native-types_timestamp-bc'],
),
$state,
);

$destinationContent = $this->connection->fetchAllAssociative(
sprintf(
'SELECT * FROM %s.%s',
SnowflakeQuote::quoteSingleIdentifier($this->getDestinationSchemaName()),
SnowflakeQuote::quoteSingleIdentifier(self::TABLE_TRANSLATIONS),
),
);
$this->assertEqualsCanonicalizing([
[
'id'=> 1,
'name'=> 'test1',
'array'=> '[
2,
undefined
]',
'object' => '{
"age": 41,
"name": "Jones"
}',
'isDeleted'=> 0,
'_timestamp'=> '2022-02-02 00:00:00',
],
[
'id'=> 2,
'name'=> 'test2',
'array'=> '[
2,
undefined
]',
'object' => '{
"age": 42,
"name": "Jones"
}',
'isDeleted'=> 0,
'_timestamp'=> '2021-01-01 00:00:00',
],
[
'id'=> 3,
'name'=> 'test3',
'array'=> '[
3,
undefined
]',
'object' => '{
"age": 43,
"name": "Jones"
}',
'isDeleted'=> 0,
'_timestamp'=> '2021-01-01 00:00:00', // no change no timestamp update
],
[
'id'=> 4,
'name'=> 'test4',
'array'=> '[
4,
undefined
]',
'object' => '{
"age": 44,
"name": "Jones"
}',
'isDeleted'=> 0,
'_timestamp'=> '2022-02-02 00:00:00',
],
], $destinationContent);
}


public function testImportTimestampBehaviorArrayTypeWithStringSourceTable(): void
{
$this->connection->executeQuery(sprintf(
'CREATE TABLE %s.%s (
"id" INT CONSTRAINT "table_pk" PRIMARY KEY,
"name" STRING,
"array" ARRAY,
"object" OBJECT,
"isDeleted" INT,
"_timestamp" TIMESTAMP
);',
SnowflakeQuote::quoteSingleIdentifier($this->getDestinationSchemaName()),
SnowflakeQuote::quoteSingleIdentifier(self::TABLE_TRANSLATIONS),
));

$this->connection->executeQuery(sprintf(
'CREATE TABLE %s.%s (
"id" STRING,
"name" STRING,
"array" STRING,
"object" STRING,
"isDeleted" STRING
);',
SnowflakeQuote::quoteSingleIdentifier($this->getSourceSchemaName()),
SnowflakeQuote::quoteSingleIdentifier(self::TABLE_TRANSLATIONS),
));

$this->connection->executeStatement(sprintf(
/** @lang Snowflake */
'
INSERT INTO "%s"."%s" ("id", "name", "array", "object", "isDeleted")
SELECT TO_VARCHAR(1), \'test1\', TO_VARCHAR(ARRAY_CONSTRUCT(2, NULL)), TO_VARCHAR(OBJECT_CONSTRUCT(\'name\', \'Jones\'::VARIANT, \'age\', 41::VARIANT)), TO_VARCHAR(0);', //<- changed array value
$this->getSourceSchemaName(),
self::TABLE_TRANSLATIONS,
));
$this->connection->executeStatement(sprintf(
/** @lang Snowflake */
'
INSERT INTO "%s"."%s" ("id", "name", "array", "object", "isDeleted")
SELECT TO_VARCHAR(2), \'test2\', TO_VARCHAR(ARRAY_CONSTRUCT(2, NULL)), TO_VARCHAR(OBJECT_CONSTRUCT(\'name\', \'Jones\'::VARIANT, \'age\', 42::VARIANT)), TO_VARCHAR(0);',
$this->getSourceSchemaName(),
self::TABLE_TRANSLATIONS,
));
$this->connection->executeStatement(sprintf(
/** @lang Snowflake */
'
INSERT INTO "%s"."%s" ("id", "name", "array", "object", "isDeleted")
SELECT TO_VARCHAR(4), \'test4\', TO_VARCHAR(ARRAY_CONSTRUCT(4, NULL)), TO_VARCHAR(OBJECT_CONSTRUCT(\'name\', \'Jones\'::VARIANT, \'age\', 44::VARIANT)), TO_VARCHAR(0);',
$this->getSourceSchemaName(),
self::TABLE_TRANSLATIONS,
));

$this->connection->executeStatement(sprintf(
/** @lang Snowflake */
'
INSERT INTO "%s"."%s" ("id", "name", "array", "object", "isDeleted", "_timestamp")
SELECT 1, \'test1\', ARRAY_CONSTRUCT(1, NULL), OBJECT_CONSTRUCT(\'name\', \'Jones\'::VARIANT, \'age\', 41::VARIANT), 0, \'2021-01-01 00:00:00\';',
$this->getDestinationSchemaName(),
self::TABLE_TRANSLATIONS,
));
$this->connection->executeStatement(sprintf(
/** @lang Snowflake */
'
INSERT INTO "%s"."%s" ("id", "name", "array", "object", "isDeleted", "_timestamp")
SELECT 2, \'test2\', ARRAY_CONSTRUCT(2, NULL), OBJECT_CONSTRUCT(\'name\', \'Jones\'::VARIANT, \'age\', 42::VARIANT), 0, \'2021-01-01 00:00:00\';',
$this->getDestinationSchemaName(),
self::TABLE_TRANSLATIONS,
));
$this->connection->executeStatement(sprintf(
/** @lang Snowflake */
'
INSERT INTO "%s"."%s" ("id", "name", "array", "object", "isDeleted", "_timestamp")
SELECT 3, \'test3\', ARRAY_CONSTRUCT(3, NULL), OBJECT_CONSTRUCT(\'name\', \'Jones\'::VARIANT, \'age\', 43::VARIANT), 0, \'2021-01-01 00:00:00\';',
$this->getDestinationSchemaName(),
self::TABLE_TRANSLATIONS,
));

$source = (new SnowflakeTableReflection(
$this->connection,
$this->getSourceSchemaName(),
self::TABLE_TRANSLATIONS,
))->getTableDefinition();
$destination = (new SnowflakeTableReflection(
$this->connection,
$this->getDestinationSchemaName(),
self::TABLE_TRANSLATIONS,
))->getTableDefinition();

$state = new ImportState(self::TABLE_TRANSLATIONS);
(new IncrementalImporter(
$this->connection,
new DateTimeImmutable('2022-02-02 00:00:00', new DateTimeZone('UTC')),
)
)->importToTable(
$source,
$destination,
new SnowflakeImportOptions(
isIncremental: true,
useTimestamp: true,
nullManipulation: SnowflakeImportOptions::NULL_MANIPULATION_SKIP,
features: ['native-types_timestamp-bc'],
),
$state,
);

$destinationContent = $this->connection->fetchAllAssociative(
sprintf(
'SELECT * FROM %s.%s',
SnowflakeQuote::quoteSingleIdentifier($this->getDestinationSchemaName()),
SnowflakeQuote::quoteSingleIdentifier(self::TABLE_TRANSLATIONS),
),
);
$this->assertEqualsCanonicalizing([
[
'id'=> 1,
'name'=> 'test1',
'array'=> '[
2,
undefined
]',
'object' => '{
"age": 41,
"name": "Jones"
}',
'isDeleted'=> 0,
'_timestamp'=> '2022-02-02 00:00:00',
],
[
'id'=> 2,
'name'=> 'test2',
'array'=> '[
2,
undefined
]',
'object' => '{
"age": 42,
"name": "Jones"
}',
'isDeleted'=> 0,
'_timestamp'=> '2021-01-01 00:00:00',
],
[
'id'=> 3,
'name'=> 'test3',
'array'=> '[
3,
undefined
]',
'object' => '{
"age": 43,
"name": "Jones"
}',
'isDeleted'=> 0,
'_timestamp'=> '2021-01-01 00:00:00', // no change no timestamp update
],
[
'id'=> 4,
'name'=> 'test4',
'array'=> '[
4,
undefined
]',
'object' => '{
"age": 44,
"name": "Jones"
}',
'isDeleted'=> 0,
'_timestamp'=> '2022-02-02 00:00:00',
],
], $destinationContent);
}
}