From bda6cad0bc7b9c11659812dadff47a610cb33abf Mon Sep 17 00:00:00 2001 From: rklein Date: Thu, 1 Jun 2023 12:41:28 +0200 Subject: [PATCH] Option for not inserting primary keys into Sql Server tables and only use them for UPDATE --- .../dialect/SqlServerDatabaseDialect.java | 46 ++++++++++++++- .../connect/jdbc/sink/JdbcSinkConfig.java | 24 +++++++- .../dialect/SqlServerDatabaseDialectTest.java | 58 ++++++++++++++++++- 3 files changed, 124 insertions(+), 4 deletions(-) diff --git a/src/main/java/io/confluent/connect/jdbc/dialect/SqlServerDatabaseDialect.java b/src/main/java/io/confluent/connect/jdbc/dialect/SqlServerDatabaseDialect.java index 8a3d0bce6..5fcd7ef54 100644 --- a/src/main/java/io/confluent/connect/jdbc/dialect/SqlServerDatabaseDialect.java +++ b/src/main/java/io/confluent/connect/jdbc/dialect/SqlServerDatabaseDialect.java @@ -431,19 +431,61 @@ public String buildUpsertQueryStatement( .of(nonKeyColumns); } builder.append(" when not matched then insert ("); + + // potential primary key columns + Iterable pkColumns = Collections.emptyList(); + + if (((JdbcSinkConfig) this.config).insertPrimaryKeys) { + pkColumns = keyColumns; + } + builder.appendList() .delimitedBy(", ") .transformedBy(ExpressionBuilder.columnNames()) - .of(nonKeyColumns, keyColumns); + .of(nonKeyColumns, pkColumns); builder.append(") values ("); + builder.appendList() .delimitedBy(",") .transformedBy(ExpressionBuilder.columnNamesWithPrefix("incoming.")) - .of(nonKeyColumns, keyColumns); + .of(nonKeyColumns, pkColumns); builder.append(");"); + + return builder.toString(); } + + @Override + public String buildInsertStatement( + TableId table, + Collection keyColumns, + Collection nonKeyColumns + ) { + ExpressionBuilder builder = expressionBuilder(); + builder.append("INSERT INTO "); + builder.append(table); + builder.append("("); + + // potential primary key columns + Collection pkColumns = Collections.emptyList(); + + if (((JdbcSinkConfig) this.config).insertPrimaryKeys) { + pkColumns = keyColumns; + } + + builder.appendList() + .delimitedBy(",") + .transformedBy(ExpressionBuilder.columnNames()) + .of(pkColumns, nonKeyColumns); + builder.append(") VALUES("); + builder.appendMultiple(",", "?", pkColumns.size() + nonKeyColumns.size()); + + builder.append(")"); + return builder.toString(); + } + + /** * If Sql Server is 2016 or newer, and time stamp mode configured against a datetime column * kill task. diff --git a/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkConfig.java b/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkConfig.java index 81f078622..d7c223f3d 100644 --- a/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkConfig.java +++ b/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkConfig.java @@ -270,7 +270,15 @@ public enum PrimaryKeyMode { + "Note that it is only applicable to SQL Server."; private static final String MSSQL_USE_MERGE_HOLDLOCK_DISPLAY = "SQL Server - Use HOLDLOCK in MERGE"; - + public static final String MSSQL_INSERT_PRIMARY_KEYS = "mssql.insert.primary.keys"; + private static final String MSSQL_INSERT_PRIMARY_KEYS_DEFAULT = "true"; + private static final String MSSQL_INSERT_PRIMARY_KEYS_DOC = + "This is only implemented for SQL Server: If false the primary key(s) " + + "will only be used for UPDATE but not for INSERT. This also applies to MERGE(mode=upsert)." + + "Background: SQL Server datbases reject inserting values into a auto-incremental columns " + + "of type IDENTITY."; + private static final String MSSQL_INSERT_PRIMARY_KEYS_DISPLAY = + "SQL Server - Whether to insert primary keys in INSERT or MERGE scenario"; public static final ConfigDef CONFIG_DEF = new ConfigDef() // Connection .define( @@ -446,6 +454,17 @@ public enum PrimaryKeyMode { ConfigDef.Width.MEDIUM, DB_TIMEZONE_CONFIG_DISPLAY ) + .define( + MSSQL_INSERT_PRIMARY_KEYS, + ConfigDef.Type.BOOLEAN, + MSSQL_INSERT_PRIMARY_KEYS_DEFAULT, + ConfigDef.Importance.LOW, + MSSQL_INSERT_PRIMARY_KEYS_DOC, + DATAMAPPING_GROUP, + 6, + ConfigDef.Width.MEDIUM, + MSSQL_INSERT_PRIMARY_KEYS_DISPLAY + ) // DDL .define( AUTO_CREATE, @@ -546,6 +565,8 @@ public enum PrimaryKeyMode { public final boolean trimSensitiveLogsEnabled; + public final boolean insertPrimaryKeys; + public JdbcSinkConfig(Map props) { super(CONFIG_DEF, props); connectorName = ConfigUtils.connectorName(props); @@ -564,6 +585,7 @@ public JdbcSinkConfig(Map props) { insertMode = InsertMode.valueOf(getString(INSERT_MODE).toUpperCase()); pkMode = PrimaryKeyMode.valueOf(getString(PK_MODE).toUpperCase()); pkFields = getList(PK_FIELDS); + insertPrimaryKeys = getBoolean(MSSQL_INSERT_PRIMARY_KEYS); dialectName = getString(DIALECT_NAME_CONFIG); fieldsWhitelist = new HashSet<>(getList(FIELDS_WHITELIST)); String dbTimeZone = getString(DB_TIMEZONE_CONFIG); diff --git a/src/test/java/io/confluent/connect/jdbc/dialect/SqlServerDatabaseDialectTest.java b/src/test/java/io/confluent/connect/jdbc/dialect/SqlServerDatabaseDialectTest.java index 2da6cb2a7..8006f9e72 100644 --- a/src/test/java/io/confluent/connect/jdbc/dialect/SqlServerDatabaseDialectTest.java +++ b/src/test/java/io/confluent/connect/jdbc/dialect/SqlServerDatabaseDialectTest.java @@ -26,6 +26,7 @@ import java.util.TimeZone; import java.util.concurrent.ThreadLocalRandom; +import io.confluent.connect.jdbc.sink.JdbcSinkConfig; import io.confluent.connect.jdbc.util.ColumnDefinition; import io.confluent.connect.jdbc.util.ColumnId; import org.apache.kafka.common.config.AbstractConfig; @@ -42,7 +43,7 @@ import io.confluent.connect.jdbc.util.TableId; import org.mockito.Mockito; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.*; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -402,6 +403,61 @@ public void upsert2() { ); } + @Test + public void upsertSkipsPrimaryKeyInsertsIfConfiguredSo() { + JdbcSinkConfig sinkConfig = sinkConfigWithUrl("jdbc:jtds:sqlsserver://something","mssql.insert.primary.keys","false"); + assertFalse(sinkConfig.insertPrimaryKeys); + dialect = createDialect(sinkConfig); + TableId customer = tableId("Customer"); + assertEquals( + "merge into [Customer] with (HOLDLOCK) AS target using (select ? AS [id], ? AS [name], ? " + + "AS [salary], ? AS [address]) AS incoming on (target.[id]=incoming.[id]) when matched then update set " + + "[name]=incoming.[name],[salary]=incoming.[salary],[address]=incoming" + + ".[address] when not matched then insert " + + "([name], [salary], [address])"+ + " values (incoming.[name],incoming.[salary],incoming.[address]);", + dialect.buildUpsertQueryStatement( + customer, + columns(customer, "id"), + columns(customer, "name", "salary", "address") + ) + ); + + } + @Test + public void insertSkipsPrimaryKeyIfConfiguredSo() { + JdbcSinkConfig sinkConfig = sinkConfigWithUrl("jdbc:jtds:sqlsserver://something","mssql.insert.primary.keys","false"); + assertFalse(sinkConfig.insertPrimaryKeys); + dialect = createDialect(sinkConfig); + TableId customer = tableId("Customer"); + assertEquals( + "INSERT INTO [Customer]([name],[salary],[address]) VALUES(?,?,?)", + dialect.buildInsertStatement( + customer, + columns(customer, "id"), + columns(customer, "name", "salary", "address") + ) + ); + + } + + @Test + public void insertAddsPrimaryKeysIfConfiguredSo() { + JdbcSinkConfig sinkConfig = sinkConfigWithUrl("jdbc:jtds:sqlsserver://something"); + assertTrue(sinkConfig.insertPrimaryKeys); + dialect = createDialect(sinkConfig); + TableId customer = tableId("Customer"); + assertEquals( + "INSERT INTO [Customer]([id],[name],[salary],[address]) VALUES(?,?,?,?)", + dialect.buildInsertStatement( + customer, + columns(customer, "id"), + columns(customer, "name", "salary", "address") + ) + ); + + } + @Test(expected=ConnectException.class) public void shouldFailDatetimeColumnAsTimeStampColumn() throws SQLException, ConnectException { String timeStampColumnName = "start_time";