Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ public static function covertException(JobException|ServiceException $e): Throwa
if (preg_match('/.*Required field .+ cannot be null.*/m', $e->getMessage(), $output_array) === 1) {
return new BigqueryInputDataException($e->getMessage());
}
if (preg_match('/Bad \w+ value/m', $e->getMessage()) === 1) {
return new BigqueryInputDataException($e->getMessage());
}
return new self($e->getMessage());
}
return $e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,24 @@ private function getColumnSetSqlPartForStringTable(
if (in_array($columnDefinition->getColumnName(), $importOptions->getConvertEmptyValuesToNull(), true)) {
// use nullif only for string base type
if ($columnDefinition->getColumnDefinition()->getBasetype() === BaseType::STRING) {
$columnsSetSql[] = sprintf(
$nullifExpr = sprintf(
'NULLIF(%s.%s, \'\')',
BigqueryQuote::quoteSingleIdentifier(self::SRC_ALIAS),
BigqueryQuote::quoteSingleIdentifier($columnDefinition->getColumnName()),
);
// Cast to destination type when staging columns are STRING
// but destination is typed (e.g. TIMESTAMP from cross-backend CSV loads)
$destType = $destinationColumn->getColumnDefinition()->getType();
if (strtoupper($destType) !== 'STRING') {
$columnsSetSql[] = sprintf(
'CAST(%s AS %s) AS %s',
$nullifExpr,
$destType,
BigqueryQuote::quoteSingleIdentifier($columnDefinition->getColumnName()),
);
} else {
$columnsSetSql[] = $nullifExpr;
}
} else {
$columnsSetSql[] = BigqueryQuote::quoteSingleIdentifier($columnDefinition->getColumnName());
}
Expand Down Expand Up @@ -251,6 +264,7 @@ public function getDeleteOldItemsCommand(
$this->getPrimaryKeyWhereConditions(
$destinationTableDefinition->getPrimaryKeysNames(),
$importOptions,
$destinationTableDefinition,
),
);
}
Expand Down Expand Up @@ -303,19 +317,45 @@ public function getUpdateWithPkCommand(
);
continue;
}

// Resolve destination column type for CAST (needed when staging is STRING
// but destination is typed, e.g. cross-backend CSV loads)
$destType = null;
/** @var BigqueryColumn $col */
foreach ($destinationTableDefinition->getColumnsDefinitions() as $col) {
if ($col->getColumnName() === $columnName) {
$destType = $col->getColumnDefinition()->getType();
break;
}
}

// if string table convert nulls<=>''
if (in_array($columnName, $importOptions->getConvertEmptyValuesToNull(), true)) {
$columnsSet[] = sprintf(
'%s = IF(`src`.%s = \'\', NULL, `src`.%s)',
BigqueryQuote::quoteSingleIdentifier($columnName),
$expr = sprintf(
'IF(`src`.%s = \'\', NULL, `src`.%s)',
BigqueryQuote::quoteSingleIdentifier($columnName),
BigqueryQuote::quoteSingleIdentifier($columnName),
);
} else {
$expr = sprintf(
'COALESCE(`src`.%s, \'\')',
BigqueryQuote::quoteSingleIdentifier($columnName),
);
}

// Cast to destination type when staging columns are STRING
if ($destType !== null && strtoupper($destType) !== 'STRING') {
$columnsSet[] = sprintf(
'%s = COALESCE(`src`.%s, \'\')',
'%s = CAST(%s AS %s)',
BigqueryQuote::quoteSingleIdentifier($columnName),
$expr,
$destType,
);
} else {
$columnsSet[] = sprintf(
'%s = %s',
BigqueryQuote::quoteSingleIdentifier($columnName),
$expr,
);
}
}
Expand Down Expand Up @@ -348,11 +388,31 @@ static function ($columnName) {
// do not compare timestamp column if it is taken from source
continue;
}
$columnsComparisonSql[$key] = sprintf(
'`dest`.%s != COALESCE(`src`.%s, \'\')',
BigqueryQuote::quoteSingleIdentifier($columnName),
BigqueryQuote::quoteSingleIdentifier($columnName),
);
// Resolve destination column type for comparison
$destColType = null;
/** @var BigqueryColumn $col */
foreach ($destinationTableDefinition->getColumnsDefinitions() as $col) {
if ($col->getColumnName() === $columnName) {
$destColType = $col->getColumnDefinition()->getType();
break;
}
}

if ($destColType !== null && strtoupper($destColType) !== 'STRING') {
// Cast dest to STRING so comparison works when staging columns are STRING
// but destination columns are typed (e.g. INT64 from cross-backend CSV loads)
$columnsComparisonSql[$key] = sprintf(
'CAST(`dest`.%s AS STRING) != COALESCE(`src`.%s, \'\')',
BigqueryQuote::quoteSingleIdentifier($columnName),
BigqueryQuote::quoteSingleIdentifier($columnName),
);
} else {
$columnsComparisonSql[$key] = sprintf(
'`dest`.%s != COALESCE(`src`.%s, \'\')',
BigqueryQuote::quoteSingleIdentifier($columnName),
BigqueryQuote::quoteSingleIdentifier($columnName),
);
}
}
}

Expand All @@ -368,7 +428,11 @@ static function ($columnName) {
implode(', ', $columnsSet),
BigqueryQuote::quoteSingleIdentifier($stagingTableDefinition->getSchemaName()),
BigqueryQuote::quoteSingleIdentifier($stagingTableDefinition->getTableName()),
$this->getPrimaryKeyWhereConditions($destinationTableDefinition->getPrimaryKeysNames(), $importOptions),
$this->getPrimaryKeyWhereConditions(
$destinationTableDefinition->getPrimaryKeysNames(),
$importOptions,
$destinationTableDefinition,
),
implode(' OR ', $columnsComparisonSql),
);
}
Expand All @@ -379,14 +443,38 @@ static function ($columnName) {
private function getPrimaryKeyWhereConditions(
array $primaryKeys,
BigqueryImportOptions $importOptions,
BigqueryTableDefinition $destinationTableDefinition,
): string {
$pkWhereSql = array_map(function (string $col) use ($importOptions) {
$str = '`dest`.%s = COALESCE(`src`.%s, \'\')';
$pkWhereSql = array_map(function (string $col) use ($importOptions, $destinationTableDefinition) {
if ($importOptions->usingUserDefinedTypes()) {
$str = '`dest`.%s = `src`.%s';
return sprintf(
'`dest`.%s = `src`.%s',
BigqueryQuote::quoteSingleIdentifier($col),
BigqueryQuote::quoteSingleIdentifier($col),
);
}

// Check if PK column needs CAST
$destColType = null;
/** @var BigqueryColumn $colDef */
foreach ($destinationTableDefinition->getColumnsDefinitions() as $colDef) {
if ($colDef->getColumnName() === $col) {
$destColType = $colDef->getColumnDefinition()->getType();
break;
}
}

if ($destColType !== null && strtoupper($destColType) !== 'STRING') {
// Cast dest to STRING so comparison works when staging columns are STRING
// but destination columns are typed (e.g. INT64 from cross-backend CSV loads)
return sprintf(
'CAST(`dest`.%s AS STRING) = COALESCE(`src`.%s, \'\')',
BigqueryQuote::quoteSingleIdentifier($col),
BigqueryQuote::quoteSingleIdentifier($col),
);
}
return sprintf(
$str,
'`dest`.%s = COALESCE(`src`.%s, \'\')',
BigqueryQuote::quoteSingleIdentifier($col),
BigqueryQuote::quoteSingleIdentifier($col),
);
Expand Down
Loading