Skip to content

Commit

Permalink
persistence upsert cross-database with generated columns
Browse files Browse the repository at this point in the history
  • Loading branch information
jakubkulhan committed May 30, 2024
1 parent 891e682 commit f9a0917
Show file tree
Hide file tree
Showing 31 changed files with 319 additions and 105 deletions.
9 changes: 9 additions & 0 deletions data-access-kit/src/Exception/PersistenceException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<?php declare(strict_types=1);

namespace DataAccessKit\Exception;

use LogicException;

class PersistenceException extends LogicException
{
}
174 changes: 107 additions & 67 deletions data-access-kit/src/Persistence.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,21 @@
namespace DataAccessKit;

use DataAccessKit\Attribute\Column;
use DataAccessKit\Exception\PersistenceException;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Platforms\AbstractMySQLPlatform;
use Doctrine\DBAL\Platforms\MariaDBPlatform;
use Doctrine\DBAL\Platforms\PostgreSQLPlatform;
use Doctrine\DBAL\Platforms\SQLitePlatform;
use LogicException;
use ReflectionClass;
use function array_map;
use function array_merge;
use function count;
use function implode;
use function in_array;
use function sprintf;
use function var_dump;

class Persistence implements PersistenceInterface
{
Expand All @@ -34,10 +37,11 @@ public function select(string $className, string $sql, array $parameters = []):
$result = $this->connection->executeQuery($sql, $parameters);
foreach ($result->iterateAssociative() as $row) {
$object = $table->reflection->newInstanceWithoutConstructor();
foreach ($table->columns as $column) {
foreach ($row as $columnName => $value) {
$column = $table->columns[$columnName];
$column->reflection->setValue(
$object,
$this->valueConverter->databaseToObject($table, $column, $row[$column->name]),
$this->valueConverter->databaseToObject($table, $column, $value),
);
}
yield $object;
Expand Down Expand Up @@ -83,122 +87,158 @@ private function insertUpsertAll(array $objects, ?array $upsertColumns = null):
$table = $this->registry->get($objects[0], true);
$platform = $this->connection->getDatabasePlatform();

$columnNames = [];
$rows = [];
$values = [];
$updateColumnNames = [];
$generatedColumnNames = [];
/** @var Column[] $generatedColumns */
$generatedColumns = [];
$primaryKeyColumnNames = [];
/** @var Column|null $primaryKeyColumn */
$primaryKeyColumn = null;
$supportsReturning = match (true) {
$platform instanceof MariaDBPlatform => true,
$platform instanceof PostgreSQLPlatform => true,
$platform instanceof SQLitePlatform => true,
default => false,
};
$supportsDefault = match (true) {
$platform instanceof AbstractMySQLPlatform => true,
$platform instanceof PostgreSQLPlatform => true,
default => false,
};
/** @var Column[] $primaryColumns */
$primaryColumns = [];
$primaryGeneratedCount = 0;
/** @var Column[] $insertColumns */
$insertColumns = [];
/** @var Column[] $updateColumns */
$updateColumns = [];
/** @var Column[] $returningColumns */
$returningColumns = [];

foreach ($table->columns as $column) {
if ($column->primary) {
if ($upsertColumns !== []) {
$columnNames[] = $platform->quoteSingleIdentifier($column->name);
}
$primaryKeyColumnNames[] = $platform->quoteSingleIdentifier($column->name);
$primaryColumns[] = $column;

if ($primaryKeyColumn !== null) {
throw new LogicException("Multiple generated primary columns.");
if (!$column->generated || $upsertColumns !== []) {
$insertColumns[] = $column;
}
$primaryKeyColumn = $column;

if ($column->generated && $supportsReturning) {
$generatedColumnNames[] = $platform->quoteSingleIdentifier($column->name);
$generatedColumns[] = $column;
if ($column->generated) {
$primaryGeneratedCount++;
$returningColumns[] = $column;
}

} else if ($column->generated) {
if ($supportsReturning) {
$generatedColumnNames[] = $platform->quoteSingleIdentifier($column->name);
$generatedColumns[] = $column;
}
$returningColumns[] = $column;

} else {
$columnNames[] = $platform->quoteSingleIdentifier($column->name);
$insertColumns[] = $column;

if ($upsertColumns === null || in_array($column->name, $upsertColumns, true)) {
$updateColumnNames[] = $platform->quoteSingleIdentifier($column->name);
$updateColumns[] = $column;
}
}
}

foreach ($objects as $index => $object) {
$row = [];
if (count($primaryColumns) > 1 && $primaryGeneratedCount > 0) {
throw new PersistenceException(sprintf(
"Multi-column primary key with generated column is not supported. Check column definitions of class [%s].",
$table->reflection->getName(),
));
}

foreach ($table->columns as $column) {
if ($column->generated && !($column->primary && $upsertColumns !== [])) {
continue;
}
$supportsReturning = match (true) {
$platform instanceof MariaDBPlatform => true,
$platform instanceof PostgreSQLPlatform => true,
$platform instanceof SQLitePlatform => true,
default => false,
};
if (count($returningColumns) > 0 && count($objects) > 1 && !$supportsReturning) {
throw new PersistenceException(sprintf(
"Database platform [%s] does not support INSERT ... RETURNING statement, cannot insert multiple rows with generated columns. Either insert one row at a time, or generate primary key in application code and use upsert() method.",
(new ReflectionClass($platform))->getShortName(),
));
}

if ($column->reflection->isInitialized($object)) {
$value = $this->valueConverter->objectToDatabase($table, $column, $column->reflection->getValue($object));
$supportsUpsert = match (true) {
$platform instanceof AbstractMySQLPlatform => true,
$platform instanceof PostgreSQLPlatform => true,
$platform instanceof SQLitePlatform => true,
default => false,
};
if (count($updateColumns) > 0 && !$supportsUpsert) {
throw new PersistenceException(sprintf(
"Database platform [%s] does not support upsert, cannot update columns.",
(new ReflectionClass($platform))->getShortName(),
));
}

$row[] = "?";
$values[] = $value;
$rows = "";
$values = [];
foreach ($objects as $index => $object) {
$row = "";

} else if ($supportsDefault && $upsertColumns === []) {
$row[] = "DEFAULT";
foreach ($insertColumns as $column) {
if ($column->reflection->isInitialized($object)) {
if ($row !== "") {
$row .= ", ";
}
$row .= "?";
$values[] = $this->valueConverter->objectToDatabase($table, $column, $column->reflection->getValue($object));

} else {
throw new LogicException(sprintf(
throw new PersistenceException(sprintf(
"Property [%s] of object at index [%d] not initialized.",
$column->reflection->getName(),
$index,
));
}
}

$rows[] = "(" . implode(", ", $row) . ")";
if ($rows !== "") {
$rows .= ", ";
}
$rows .= "(" . $row . ")";
}

$sql = sprintf(
"INSERT INTO %s (%s) VALUES %s%s%s",
$platform->quoteSingleIdentifier($table->name),
implode(", ", $columnNames),
implode(", ", $rows),
implode(", ", array_map(fn(Column $it) => $platform->quoteSingleIdentifier($it->name), $insertColumns)),
$rows,
match (true) {
count($updateColumnNames) > 0 && $platform instanceof AbstractMySQLPlatform => sprintf(" ON DUPLICATE KEY UPDATE %s", implode(", ", array_map(fn(string $it) => $it . " = VALUES(" . $it . ")", $updateColumnNames))),
count($updateColumnNames) > 0 && ($platform instanceof PostgreSQLPlatform || $platform instanceof SQLitePlatform) => sprintf(" ON CONFLICT (%s) DO UPDATE SET %s", implode(", ", $primaryKeyColumnNames), implode(", ", array_map(fn(string $it) => $it . " = EXCLUDED." . $it, $updateColumnNames))),
count($updateColumnNames) > 0 => throw new LogicException(sprintf(
"Upsert not supported on platform [%s].",
get_class($platform),
)),
count($updateColumns) > 0 && $platform instanceof AbstractMySQLPlatform => sprintf(
" ON DUPLICATE KEY UPDATE %s",
implode(", ", array_map(fn(Column $it) => $platform->quoteSingleIdentifier($it->name) . " = VALUES(" . $platform->quoteSingleIdentifier($it->name) . ")", $updateColumns)),
),
count($updateColumns) > 0 && ($platform instanceof PostgreSQLPlatform || $platform instanceof SQLitePlatform) => sprintf(
" ON CONFLICT (%s) DO UPDATE SET %s",
implode(", ", array_map(fn(Column $it) => $platform->quoteSingleIdentifier($it->name), $primaryColumns)),
implode(", ", array_map(fn(Column $it) => $platform->quoteSingleIdentifier($it->name) . " = EXCLUDED." . $platform->quoteSingleIdentifier($it->name), $updateColumns)),
),
count($updateColumns) > 0 => throw new LogicException("Unreachable statement."),
default => "",
},
match ($supportsReturning) {
true => sprintf(" RETURNING %s", implode(", ", $generatedColumnNames)),
match (count($returningColumns) > 0 && $supportsReturning) {
true => sprintf(" RETURNING %s", implode(", ", array_map(fn(Column $it) => $platform->quoteSingleIdentifier($it->name), $returningColumns))),
default => "",
},
);
$result = $this->connection->executeQuery($sql, $values);

if ($supportsReturning && count($generatedColumns) > 0) {
if (count($returningColumns) > 0 && $supportsReturning) {
foreach ($result->iterateAssociative() as $index => $row) {
$object = $objects[$index];
foreach ($generatedColumns as $column) {
foreach ($returningColumns as $column) {
$column->reflection->setValue(
$object,
$this->valueConverter->databaseToObject($table, $column, $row[$column->name]),
);
}
}
} else if (count($objects) === 1 && $primaryKeyColumn !== null) {
$primaryKeyColumn->reflection->setValue($objects[0], $this->connection->lastInsertId());
} else if ($primaryGeneratedCount > 0 && $upsertColumns === []) {
$primaryColumns[0]->reflection->setValue($objects[0], $this->connection->lastInsertId());
}

if (count($returningColumns) > $primaryGeneratedCount && !$supportsReturning) {
$result = $this->connection->executeQuery(
sprintf(
"SELECT %s FROM %s WHERE %s",
implode(", ", array_map(fn(Column $it) => $platform->quoteSingleIdentifier($it->name), $returningColumns)),
$platform->quoteSingleIdentifier($table->name),
implode(" AND ", array_map(fn(Column $it) => $platform->quoteSingleIdentifier($it->name) . " = ?", $primaryColumns)),
),
array_map(fn(Column $it) => $this->valueConverter->objectToDatabase($table, $it, $it->reflection->getValue($objects[0])), $primaryColumns),
);
$row = $result->fetchAssociative();
foreach ($returningColumns as $column) {
$column->reflection->setValue(
$objects[0],
$this->valueConverter->databaseToObject($table, $column, $row[$column->name]),
);
}
}
}

Expand Down
4 changes: 4 additions & 0 deletions data-access-kit/test/Fixture/User.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,8 @@ class User
public int $id;
#[Column]
public string $firstName;
#[Column]
public string $lastName;
#[Column(generated: true)]
public string $fullName;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
DELETE FROM users WHERE user_id = 1;
SELECT COUNT(*) FROM users;
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
INSERT INTO `users` (`first_name`, `last_name`) VALUES (?, ?) RETURNING `user_id`, `full_name`;
SELECT user_id, first_name, last_name, full_name FROM users WHERE user_id = ?;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
INSERT INTO `users` (`first_name`, `last_name`) VALUES (?, ?), (?, ?) RETURNING `user_id`, `full_name`;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
SELECT user_id, first_name, last_name, full_name FROM users LIMIT 1;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
SELECT COUNT(*) FROM users;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
SELECT user_id FROM users;
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
INSERT INTO `users` (`user_id`, `first_name`, `last_name`) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE `first_name` = VALUES(`first_name`), `last_name` = VALUES(`last_name`) RETURNING `user_id`, `full_name`;
SELECT user_id, first_name, last_name, full_name FROM users WHERE user_id = ?;
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
DELETE FROM users WHERE user_id = 1;
SELECT COUNT(*) FROM users;
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
INSERT INTO `users` (`first_name`, `last_name`) VALUES (?, ?);
SELECT `user_id`, `full_name` FROM `users` WHERE `user_id` = ?;
SELECT user_id, first_name, last_name, full_name FROM users WHERE user_id = ?;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
SELECT user_id, first_name, last_name, full_name FROM users LIMIT 1;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
SELECT COUNT(*) FROM users;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
SELECT user_id FROM users;
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
INSERT INTO `users` (`user_id`, `first_name`, `last_name`) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE `first_name` = VALUES(`first_name`), `last_name` = VALUES(`last_name`);
SELECT `user_id`, `full_name` FROM `users` WHERE `user_id` = ?;
SELECT user_id, first_name, last_name, full_name FROM users WHERE user_id = ?;
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
DELETE FROM users WHERE user_id = 1;
SELECT COUNT(*) FROM users;
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
INSERT INTO "users" ("first_name", "last_name") VALUES (?, ?) RETURNING "user_id", "full_name";
SELECT user_id, first_name, last_name, full_name FROM users WHERE user_id = ?;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
INSERT INTO "users" ("first_name", "last_name") VALUES (?, ?), (?, ?) RETURNING "user_id", "full_name";
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
SELECT user_id, first_name, last_name, full_name FROM users LIMIT 1;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
SELECT COUNT(*) FROM users;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
SELECT user_id FROM users;
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
INSERT INTO "users" ("user_id", "first_name", "last_name") VALUES (?, ?, ?) ON CONFLICT ("user_id") DO UPDATE SET "first_name" = EXCLUDED."first_name", "last_name" = EXCLUDED."last_name" RETURNING "user_id", "full_name";
SELECT user_id, first_name, last_name, full_name FROM users WHERE user_id = ?;
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
DELETE FROM users WHERE user_id = 1;
SELECT COUNT(*) FROM users;
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
INSERT INTO "users" ("first_name", "last_name") VALUES (?, ?) RETURNING "user_id", "full_name";
SELECT user_id, first_name, last_name, full_name FROM users WHERE user_id = ?;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
INSERT INTO "users" ("first_name", "last_name") VALUES (?, ?), (?, ?) RETURNING "user_id", "full_name";
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
SELECT user_id, first_name, last_name, full_name FROM users LIMIT 1;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
SELECT COUNT(*) FROM users;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
SELECT user_id FROM users;
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
INSERT INTO "users" ("user_id", "first_name", "last_name") VALUES (?, ?, ?) ON CONFLICT ("user_id") DO UPDATE SET "first_name" = EXCLUDED."first_name", "last_name" = EXCLUDED."last_name" RETURNING "user_id", "full_name";
SELECT user_id, first_name, last_name, full_name FROM users WHERE user_id = ?;
Loading

0 comments on commit f9a0917

Please sign in to comment.