diff --git a/packages/php-db-import-export/src/Backend/Bigquery/BigqueryException.php b/packages/php-db-import-export/src/Backend/Bigquery/BigqueryException.php index de31955eb..e4aadf453 100644 --- a/packages/php-db-import-export/src/Backend/Bigquery/BigqueryException.php +++ b/packages/php-db-import-export/src/Backend/Bigquery/BigqueryException.php @@ -19,6 +19,9 @@ public static function covertException(JobException|ServiceException $e): Throwa if (preg_match('/.*Required field .+ cannot be null.*/m', $e->getMessage(), $output_array) === 1) { return new BigqueryInputDataException($e->getMessage()); } + if (preg_match('/Bad \w+ value/m', $e->getMessage()) === 1) { + return new BigqueryInputDataException($e->getMessage()); + } return new self($e->getMessage()); } return $e; diff --git a/packages/php-db-import-export/src/Backend/Bigquery/ToFinalTable/SqlBuilder.php b/packages/php-db-import-export/src/Backend/Bigquery/ToFinalTable/SqlBuilder.php index 516057d04..e76197f3e 100644 --- a/packages/php-db-import-export/src/Backend/Bigquery/ToFinalTable/SqlBuilder.php +++ b/packages/php-db-import-export/src/Backend/Bigquery/ToFinalTable/SqlBuilder.php @@ -99,11 +99,24 @@ private function getColumnSetSqlPartForStringTable( if (in_array($columnDefinition->getColumnName(), $importOptions->getConvertEmptyValuesToNull(), true)) { // use nullif only for string base type if ($columnDefinition->getColumnDefinition()->getBasetype() === BaseType::STRING) { - $columnsSetSql[] = sprintf( + $nullifExpr = sprintf( 'NULLIF(%s.%s, \'\')', BigqueryQuote::quoteSingleIdentifier(self::SRC_ALIAS), BigqueryQuote::quoteSingleIdentifier($columnDefinition->getColumnName()), ); + // Cast to destination type when staging columns are STRING + // but destination is typed (e.g. TIMESTAMP from cross-backend CSV loads) + $destType = $destinationColumn->getColumnDefinition()->getType(); + if (strtoupper($destType) !== 'STRING') { + $columnsSetSql[] = sprintf( + 'CAST(%s AS %s) AS %s', + $nullifExpr, + $destType, + BigqueryQuote::quoteSingleIdentifier($columnDefinition->getColumnName()), + ); + } else { + $columnsSetSql[] = $nullifExpr; + } } else { $columnsSetSql[] = BigqueryQuote::quoteSingleIdentifier($columnDefinition->getColumnName()); } @@ -251,6 +264,7 @@ public function getDeleteOldItemsCommand( $this->getPrimaryKeyWhereConditions( $destinationTableDefinition->getPrimaryKeysNames(), $importOptions, + $destinationTableDefinition, ), ); } @@ -303,19 +317,45 @@ public function getUpdateWithPkCommand( ); continue; } + + // Resolve destination column type for CAST (needed when staging is STRING + // but destination is typed, e.g. cross-backend CSV loads) + $destType = null; + /** @var BigqueryColumn $col */ + foreach ($destinationTableDefinition->getColumnsDefinitions() as $col) { + if ($col->getColumnName() === $columnName) { + $destType = $col->getColumnDefinition()->getType(); + break; + } + } + // if string table convert nulls<=>'' if (in_array($columnName, $importOptions->getConvertEmptyValuesToNull(), true)) { - $columnsSet[] = sprintf( - '%s = IF(`src`.%s = \'\', NULL, `src`.%s)', - BigqueryQuote::quoteSingleIdentifier($columnName), + $expr = sprintf( + 'IF(`src`.%s = \'\', NULL, `src`.%s)', BigqueryQuote::quoteSingleIdentifier($columnName), BigqueryQuote::quoteSingleIdentifier($columnName), ); } else { + $expr = sprintf( + 'COALESCE(`src`.%s, \'\')', + BigqueryQuote::quoteSingleIdentifier($columnName), + ); + } + + // Cast to destination type when staging columns are STRING + if ($destType !== null && strtoupper($destType) !== 'STRING') { $columnsSet[] = sprintf( - '%s = COALESCE(`src`.%s, \'\')', + '%s = CAST(%s AS %s)', BigqueryQuote::quoteSingleIdentifier($columnName), + $expr, + $destType, + ); + } else { + $columnsSet[] = sprintf( + '%s = %s', BigqueryQuote::quoteSingleIdentifier($columnName), + $expr, ); } } @@ -348,11 +388,31 @@ static function ($columnName) { // do not compare timestamp column if it is taken from source continue; } - $columnsComparisonSql[$key] = sprintf( - '`dest`.%s != COALESCE(`src`.%s, \'\')', - BigqueryQuote::quoteSingleIdentifier($columnName), - BigqueryQuote::quoteSingleIdentifier($columnName), - ); + // Resolve destination column type for comparison + $destColType = null; + /** @var BigqueryColumn $col */ + foreach ($destinationTableDefinition->getColumnsDefinitions() as $col) { + if ($col->getColumnName() === $columnName) { + $destColType = $col->getColumnDefinition()->getType(); + break; + } + } + + if ($destColType !== null && strtoupper($destColType) !== 'STRING') { + // Cast dest to STRING so comparison works when staging columns are STRING + // but destination columns are typed (e.g. INT64 from cross-backend CSV loads) + $columnsComparisonSql[$key] = sprintf( + 'CAST(`dest`.%s AS STRING) != COALESCE(`src`.%s, \'\')', + BigqueryQuote::quoteSingleIdentifier($columnName), + BigqueryQuote::quoteSingleIdentifier($columnName), + ); + } else { + $columnsComparisonSql[$key] = sprintf( + '`dest`.%s != COALESCE(`src`.%s, \'\')', + BigqueryQuote::quoteSingleIdentifier($columnName), + BigqueryQuote::quoteSingleIdentifier($columnName), + ); + } } } @@ -368,7 +428,11 @@ static function ($columnName) { implode(', ', $columnsSet), BigqueryQuote::quoteSingleIdentifier($stagingTableDefinition->getSchemaName()), BigqueryQuote::quoteSingleIdentifier($stagingTableDefinition->getTableName()), - $this->getPrimaryKeyWhereConditions($destinationTableDefinition->getPrimaryKeysNames(), $importOptions), + $this->getPrimaryKeyWhereConditions( + $destinationTableDefinition->getPrimaryKeysNames(), + $importOptions, + $destinationTableDefinition, + ), implode(' OR ', $columnsComparisonSql), ); } @@ -379,14 +443,38 @@ static function ($columnName) { private function getPrimaryKeyWhereConditions( array $primaryKeys, BigqueryImportOptions $importOptions, + BigqueryTableDefinition $destinationTableDefinition, ): string { - $pkWhereSql = array_map(function (string $col) use ($importOptions) { - $str = '`dest`.%s = COALESCE(`src`.%s, \'\')'; + $pkWhereSql = array_map(function (string $col) use ($importOptions, $destinationTableDefinition) { if ($importOptions->usingUserDefinedTypes()) { - $str = '`dest`.%s = `src`.%s'; + return sprintf( + '`dest`.%s = `src`.%s', + BigqueryQuote::quoteSingleIdentifier($col), + BigqueryQuote::quoteSingleIdentifier($col), + ); + } + + // Check if PK column needs CAST + $destColType = null; + /** @var BigqueryColumn $colDef */ + foreach ($destinationTableDefinition->getColumnsDefinitions() as $colDef) { + if ($colDef->getColumnName() === $col) { + $destColType = $colDef->getColumnDefinition()->getType(); + break; + } + } + + if ($destColType !== null && strtoupper($destColType) !== 'STRING') { + // Cast dest to STRING so comparison works when staging columns are STRING + // but destination columns are typed (e.g. INT64 from cross-backend CSV loads) + return sprintf( + 'CAST(`dest`.%s AS STRING) = COALESCE(`src`.%s, \'\')', + BigqueryQuote::quoteSingleIdentifier($col), + BigqueryQuote::quoteSingleIdentifier($col), + ); } return sprintf( - $str, + '`dest`.%s = COALESCE(`src`.%s, \'\')', BigqueryQuote::quoteSingleIdentifier($col), BigqueryQuote::quoteSingleIdentifier($col), ); diff --git a/packages/php-db-import-export/tests/functional/Bigquery/SqlBuildTest.php b/packages/php-db-import-export/tests/functional/Bigquery/SqlBuildTest.php index a360e15ff..c383a0f9e 100644 --- a/packages/php-db-import-export/tests/functional/Bigquery/SqlBuildTest.php +++ b/packages/php-db-import-export/tests/functional/Bigquery/SqlBuildTest.php @@ -469,6 +469,84 @@ public function testGetInsertAllIntoTargetTableCommandConvertToNullWithTimestamp } } + /** + * Tests CAST(NULLIF(...) AS ) in INSERT when destination columns are + * non-STRING (e.g. TIMESTAMP) but staging is STRING (cross-backend CSV load). + */ + public function testGetInsertAllIntoTargetTableCommandConvertToNullWithTypedDestColumns(): void + { + $this->createTestDb(); + + // Destination table with TIMESTAMP column (nullable) + $destinationColumns = [ + $this->createNullableGenericColumn('id'), + $this->createNullableGenericColumn('col1'), + new BigqueryColumn('col2', new Bigquery(Bigquery::TYPE_TIMESTAMP, ['nullable' => true])), + ]; + $destination = new BigqueryTableDefinition( + self::TEST_DB, + self::TEST_TABLE, + false, + new ColumnCollection($destinationColumns), + [], + ); + $qb = new BigqueryTableQueryBuilder(); + $this->bqClient->runQuery($this->bqClient->query( + $qb->getCreateTableCommandFromDefinition($destination), + )); + + // Staging table with STRING data (STRING_TABLE strategy) + $this->createStagingTableWithData(true); + $fakeStage = new BigqueryTableDefinition( + self::TEST_DB, + self::TEST_STAGING_TABLE, + true, + new ColumnCollection([ + $this->createNullableGenericColumn('col1'), + $this->createNullableGenericColumn('col2'), + ]), + [], + ); + + // Fake destination matching staging columns: col1 STRING, col2 TIMESTAMP + $fakeDestination = new BigqueryTableDefinition( + self::TEST_DB, + self::TEST_TABLE, + true, + new ColumnCollection([ + $this->createNullableGenericColumn('col1'), + new BigqueryColumn('col2', new Bigquery(Bigquery::TYPE_TIMESTAMP, ['nullable' => true])), + ]), + [], + ); + + // convertEmptyValuesToNull on col2 (TIMESTAMP destination) + $sql = $this->getBuilder()->getInsertAllIntoTargetTableCommand( + $fakeStage, + $fakeDestination, + new BigqueryImportOptions( + ['col2'], // convert col2 to null when empty + false, + false, + BigqueryImportOptions::SKIP_NO_LINE, + BigqueryImportOptions::USING_TYPES_STRING, + ), + '2020-01-01 00:00:00', + ); + + // col1: no convert, STRING dest → CAST(COALESCE(...) as STRING) + // col2: convert, TIMESTAMP dest → CAST(NULLIF(...) AS TIMESTAMP) + self::assertEquals( + // phpcs:ignore + 'INSERT INTO `import_export_test_schema`.`import_export_test_test` (`col1`, `col2`) SELECT CAST(COALESCE(`src`.`col1`, \'\') as STRING) AS `col1`,CAST(NULLIF(`src`.`col2`, \'\') AS TIMESTAMP) AS `col2` FROM `import_export_test_schema`.`stagingTable` AS `src`', + $sql, + ); + + // The staging data has col2 values '1', '2', '2', '' — '1' and '2' are not valid + // timestamps, so we only verify the SQL is correct rather than executing it with + // real timestamp data. The E2E test in connection covers the full data path. + } + public function testGetTruncateTableCommand(): void { $this->createTestDb(); @@ -1055,6 +1133,279 @@ public function testGetDeleteOldItemsCommand(string $usingTypes, string $expecte ], $result); } + /** + * Tests that CAST is applied in SET, PK WHERE, and comparison + * when destination columns are non-STRING (e.g. INT64) but staging is STRING. + * This is the cross-backend CSV load scenario. + */ + public function testGetUpdateWithPkCommandWithTypedDestColumns(): void + { + $this->createTestDb(); + + // Create real destination table with typed col1 (INT64) + $tableDefinition = new BigqueryTableDefinition( + self::TEST_DB, + self::TEST_TABLE, + false, + new ColumnCollection([ + $this->createNullableGenericColumn('id'), + new BigqueryColumn('col1', new Bigquery(Bigquery::TYPE_INT64, ['nullable' => true])), + $this->createNullableGenericColumn('col2'), + ]), + [], + ); + $qb = new BigqueryTableQueryBuilder(); + $this->bqClient->runQuery($this->bqClient->query( + $qb->getCreateTableCommandFromDefinition($tableDefinition), + )); + + // Create staging table with STRING data (cross-backend CSV scenario) + $this->createStagingTableWithData(false); + + // Fake destination: col1 is INT64 (PK), col2 is STRING + $fakeDestination = new BigqueryTableDefinition( + self::TEST_DB, + self::TEST_TABLE, + true, + new ColumnCollection([ + new BigqueryColumn('col1', new Bigquery(Bigquery::TYPE_INT64, ['nullable' => true])), + $this->createNullableGenericColumn('col2'), + ]), + ['col1'], + ); + // Fake stage (always STRING from CSV) + $fakeStage = new BigqueryTableDefinition( + self::TEST_DB, + self::TEST_STAGING_TABLE, + true, + new ColumnCollection([ + $this->createNullableGenericColumn('col1'), + $this->createNullableGenericColumn('col2'), + ]), + [], + ); + + // Insert initial data in destination (col1 is INT64) + $this->bqClient->runQuery($this->bqClient->query( + sprintf( + 'INSERT INTO %s.%s(`id`,`col1`,`col2`) VALUES (\'1\',2,\'1\')', + self::TEST_DB_QUOTED, + self::TEST_TABLE_QUOTED, + ), + )); + + // no convert values no timestamp + $sql = $this->getBuilder()->getUpdateWithPkCommand( + $fakeStage, + $fakeDestination, + new BigqueryImportOptions( + [], + false, + false, + BigqueryImportOptions::SKIP_NO_LINE, + BigqueryImportOptions::USING_TYPES_STRING, + ), + '2020-01-01 00:00:00', + ); + + // CAST must be applied: SET col1=CAST(...AS INT64), PK WHERE CAST(dest.col1 AS STRING), comparison CAST + // col2 is STRING so no CAST + self::assertEquals( + // phpcs:ignore + 'UPDATE `import_export_test_schema`.`import_export_test_test` AS `dest` SET `col1` = CAST(COALESCE(`src`.`col1`, \'\') AS INT64), `col2` = COALESCE(`src`.`col2`, \'\') FROM `import_export_test_schema`.`stagingTable` AS `src` WHERE CAST(`dest`.`col1` AS STRING) = COALESCE(`src`.`col1`, \'\') AND (CAST(`dest`.`col1` AS STRING) != COALESCE(`src`.`col1`, \'\') OR `dest`.`col2` != COALESCE(`src`.`col2`, \'\'))', + $sql, + ); + + $this->bqClient->runQuery($this->bqClient->query($sql)); + + $result = $this->fetchTable(self::TEST_DB_QUOTED, self::TEST_TABLE_QUOTED); + // col1=2 matched staging col1='2', col2 updated from '1' to '2' + self::assertCount(1, $result); + self::assertSame('1', $result[0]['id']); + self::assertSame('2', $result[0]['col2']); + } + + /** + * Tests CAST with convertEmptyValuesToNull on typed destination columns. + */ + public function testGetUpdateWithPkCommandConvertValuesWithTypedDestColumns(): void + { + $this->createTestDb(); + + // Create real destination table with typed col1 (INT64) + $tableDefinition = new BigqueryTableDefinition( + self::TEST_DB, + self::TEST_TABLE, + false, + new ColumnCollection([ + $this->createNullableGenericColumn('id'), + new BigqueryColumn('col1', new Bigquery(Bigquery::TYPE_INT64, ['nullable' => true])), + $this->createNullableGenericColumn('col2'), + ]), + [], + ); + $qb = new BigqueryTableQueryBuilder(); + $this->bqClient->runQuery($this->bqClient->query( + $qb->getCreateTableCommandFromDefinition($tableDefinition), + )); + + $this->createStagingTableWithData(false); + + // Fake destination: col1 is INT64 (PK), col2 is STRING + $fakeDestination = new BigqueryTableDefinition( + self::TEST_DB, + self::TEST_TABLE, + true, + new ColumnCollection([ + new BigqueryColumn('col1', new Bigquery(Bigquery::TYPE_INT64, ['nullable' => true])), + $this->createNullableGenericColumn('col2'), + ]), + ['col1'], + ); + $fakeStage = new BigqueryTableDefinition( + self::TEST_DB, + self::TEST_STAGING_TABLE, + true, + new ColumnCollection([ + $this->createNullableGenericColumn('col1'), + $this->createNullableGenericColumn('col2'), + ]), + [], + ); + + $this->bqClient->runQuery($this->bqClient->query( + sprintf( + 'INSERT INTO %s.%s(`id`,`col1`,`col2`) VALUES (\'1\',2,\'1\')', + self::TEST_DB_QUOTED, + self::TEST_TABLE_QUOTED, + ), + )); + + // convert col1 to null when empty + $sql = $this->getBuilder()->getUpdateWithPkCommand( + $fakeStage, + $fakeDestination, + new BigqueryImportOptions( + ['col1'], + false, + false, + BigqueryImportOptions::SKIP_NO_LINE, + BigqueryImportOptions::USING_TYPES_STRING, + ), + '2020-01-01 00:00:00', + ); + + // col1 SET uses IF+CAST(..AS INT64), col2 no CAST (STRING dest) + self::assertEquals( + // phpcs:ignore + 'UPDATE `import_export_test_schema`.`import_export_test_test` AS `dest` SET `col1` = CAST(IF(`src`.`col1` = \'\', NULL, `src`.`col1`) AS INT64), `col2` = COALESCE(`src`.`col2`, \'\') FROM `import_export_test_schema`.`stagingTable` AS `src` WHERE CAST(`dest`.`col1` AS STRING) = COALESCE(`src`.`col1`, \'\') AND (CAST(`dest`.`col1` AS STRING) != COALESCE(`src`.`col1`, \'\') OR `dest`.`col2` != COALESCE(`src`.`col2`, \'\'))', + $sql, + ); + + $this->bqClient->runQuery($this->bqClient->query($sql)); + } + + /** + * Tests CAST on PK WHERE in DELETE when destination PK columns are non-STRING. + */ + public function testGetDeleteOldItemsCommandWithTypedDestColumns(): void + { + $this->createTestDb(); + + // Destination with typed pk1 (INT64) and STRING pk2 + $tableDefinition = new BigqueryTableDefinition( + self::TEST_DB, + self::TEST_TABLE, + false, + new ColumnCollection([ + new BigqueryColumn( + 'id', + new Bigquery(Bigquery::TYPE_INT), + ), + new BigqueryColumn('pk1', new Bigquery(Bigquery::TYPE_INT64, ['nullable' => false, 'default' => '0'])), + BigqueryColumn::createGenericColumn('pk2'), + BigqueryColumn::createGenericColumn('col1'), + BigqueryColumn::createGenericColumn('col2'), + ]), + ['pk1', 'pk2'], + ); + $tableSql = sprintf( + '%s.%s', + BigqueryQuote::quoteSingleIdentifier($tableDefinition->getSchemaName()), + BigqueryQuote::quoteSingleIdentifier($tableDefinition->getTableName()), + ); + $qb = new BigqueryTableQueryBuilder(); + $this->bqClient->runQuery($this->bqClient->query($qb->getCreateTableCommandFromDefinition($tableDefinition))); + // pk1 is INT64 + $this->bqClient->runQuery($this->bqClient->query( + sprintf( + 'INSERT INTO %s(`id`,`pk1`,`pk2`,`col1`,`col2`) VALUES (1,1,\'1\',\'1\',\'1\')', + $tableSql, + ), + )); + + // Staging table (all STRING, as from CSV) + $stagingTableDefinition = new BigqueryTableDefinition( + self::TEST_DB, + self::TEST_STAGING_TABLE, + false, + new ColumnCollection([ + BigqueryColumn::createGenericColumn('pk1'), + BigqueryColumn::createGenericColumn('pk2'), + BigqueryColumn::createGenericColumn('col1'), + BigqueryColumn::createGenericColumn('col2'), + ]), + ['pk1', 'pk2'], + ); + $this->bqClient->runQuery($this->bqClient->query( + $qb->getCreateTableCommandFromDefinition($stagingTableDefinition), + )); + $stagingTableSql = sprintf( + '%s.%s', + BigqueryQuote::quoteSingleIdentifier($stagingTableDefinition->getSchemaName()), + BigqueryQuote::quoteSingleIdentifier($stagingTableDefinition->getTableName()), + ); + $this->bqClient->runQuery($this->bqClient->query( + sprintf( + 'INSERT INTO %s(`pk1`,`pk2`,`col1`,`col2`) VALUES (\'1\',\'1\',\'1\',\'1\')', + $stagingTableSql, + ), + )); + $this->bqClient->runQuery($this->bqClient->query( + sprintf( + 'INSERT INTO %s(`pk1`,`pk2`,`col1`,`col2`) VALUES (\'2\',\'1\',\'1\',\'1\')', + $stagingTableSql, + ), + )); + + $sql = $this->getBuilder()->getDeleteOldItemsCommand( + $stagingTableDefinition, + $tableDefinition, + $this->getSimpleImportOptions(), + ); + + // pk1 is INT64 → CAST applied, pk2 is STRING → no CAST + self::assertEquals( + // phpcs:ignore + 'DELETE `import_export_test_schema`.`stagingTable` AS `src` WHERE EXISTS (SELECT * FROM `import_export_test_schema`.`import_export_test_test` AS `dest` WHERE CAST(`dest`.`pk1` AS STRING) = COALESCE(`src`.`pk1`, \'\') AND `dest`.`pk2` = COALESCE(`src`.`pk2`, \'\') )', + $sql, + ); + + $this->bqClient->runQuery($this->bqClient->query($sql)); + + $result = $this->fetchTable($stagingTableDefinition->getSchemaName(), $stagingTableDefinition->getTableName()); + + self::assertCount(1, $result); + self::assertSame([ + [ + 'pk1' => '2', + 'pk2' => '1', + 'col1' => '1', + 'col2' => '1', + ], + ], $result); + } + private function getStagingTableDefinition(): BigqueryTableDefinition { return new BigqueryTableDefinition(