Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#1360 Support for poll maximum wait time #1417

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.microsoft.sqlserver.jdbc.ISQLServerConnection;
public class JdbcSourceConnectorConfig extends AbstractConfig {

private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceConnectorConfig.class);
private static Pattern INVALID_CHARS = Pattern.compile("[^a-zA-Z0-9._-]");
private static final Pattern INVALID_CHARS = Pattern.compile("[^a-zA-Z0-9._-]");

public static final String CONNECTION_PREFIX = "connection.";

Expand Down Expand Up @@ -101,6 +102,19 @@ public class JdbcSourceConnectorConfig extends AbstractConfig {
public static final int POLL_INTERVAL_MS_DEFAULT = 5000;
private static final String POLL_INTERVAL_MS_DISPLAY = "Poll Interval (ms)";

public static final String POLL_MAX_WAIT_TIME_MS_CONFIG = "poll.max.wait.time.ms";
public static final String POLL_MAX_WAIT_TIME_MS_DOC = "The maximum time in ms to wait by "
+ "the worker task for the poll operation. This includes additional poll.interval.ms "
+ "wait time applied in between subsequent poll calls. If the set maximum time is exceeded, "
+ "the task will signal no-data to the worker. The polling operation however will not be "
+ "interrupted until the task is stopped. Each time the worker polls the records from the "
+ "source task it will either wait for the result from the previously started polling "
+ "operation or a new polling operation will be started. "
+ "When the poll.max.wait.time.ms is set to zero, then the worker will wait indefinitely "
+ "until the polling operation is finished.";
public static final int POLL_MAX_WAIT_TIME_MS_DEFAULT = 1_000;
private static final String POLL_MAX_DURATION_MS_DISPLAY = "Poll Max Wait Time (ms)";

public static final String BATCH_MAX_ROWS_CONFIG = "batch.max.rows";
private static final String BATCH_MAX_ROWS_DOC =
"Maximum number of rows to include in a single batch when polling for new data. This "
Expand Down Expand Up @@ -401,18 +415,15 @@ public Config validateMultiConfigs(Config config) {
} else {
dialect = DatabaseDialects.findBestFor(this.getString(CONNECTION_URL_CONFIG), this);
}
if (!dialect.name().equals(
DatabaseDialects.create(
SqlServerDatabaseDialectName, this
).name()
)
) {
configValues
.get(JdbcSourceConnectorConfig.TRANSACTION_ISOLATION_MODE_CONFIG)
.addErrorMessage("Isolation mode of `"
+ TransactionIsolationMode.SQL_SERVER_SNAPSHOT.name()
+ "` can only be configured with a Sql Server Dialect"
);
try (DatabaseDialect sqlServerDialect = DatabaseDialects.create(
SqlServerDatabaseDialectName, this)) {
if (!dialect.name().equals(sqlServerDialect.name())) {
configValues
.get(JdbcSourceConnectorConfig.TRANSACTION_ISOLATION_MODE_CONFIG)
.addErrorMessage("Isolation mode of `"
+ TransactionIsolationMode.SQL_SERVER_SNAPSHOT.name()
+ "` can only be configured with a Sql Server Dialect");
}
}
}

Expand Down Expand Up @@ -694,6 +705,17 @@ private static final void addConnectorOptions(ConfigDef config) {
++orderInGroup,
Width.SHORT,
POLL_INTERVAL_MS_DISPLAY
).define(
POLL_MAX_WAIT_TIME_MS_CONFIG,
Type.INT,
POLL_MAX_WAIT_TIME_MS_DEFAULT,
Range.atLeast(0),
Importance.MEDIUM,
POLL_MAX_WAIT_TIME_MS_DOC,
CONNECTOR_GROUP,
++orderInGroup,
Width.SHORT,
POLL_MAX_DURATION_MS_DISPLAY
).define(
BATCH_MAX_ROWS_CONFIG,
Type.INT,
Expand Down Expand Up @@ -792,7 +814,7 @@ public JdbcSourceConnectorConfig(Map<String, ?> props) {
}

/**
 * Returns the configured topic prefix with surrounding whitespace removed.
 *
 * <p>Diff residue left both the pre- and post-change return statements in this
 * method; only the post-change line (unqualified constant, inherited from
 * JdbcSourceConnectorConfig) is kept.
 *
 * @return the trimmed value of the {@code topic.prefix} configuration
 */
public String topicPrefix() {
  return getString(TOPIC_PREFIX_CONFIG).trim();
}

/**
Expand Down Expand Up @@ -914,7 +936,7 @@ public static NumericMapping get(JdbcSourceConnectorConfig config) {
if (newMappingConfig != null) {
return get(config.getString(JdbcSourceConnectorConfig.NUMERIC_MAPPING_CONFIG));
}
if (config.getBoolean(JdbcSourceTaskConfig.NUMERIC_PRECISION_MAPPING_CONFIG)) {
if (config.getBoolean(NUMERIC_PRECISION_MAPPING_CONFIG)) {
return NumericMapping.PRECISION_ONLY;
}
return NumericMapping.NONE;
Expand Down Expand Up @@ -993,7 +1015,7 @@ public static int get(TransactionIsolationMode mode) {
case SERIALIZABLE:
return Connection.TRANSACTION_SERIALIZABLE;
case SQL_SERVER_SNAPSHOT:
return SQLServerConnection.TRANSACTION_SNAPSHOT;
return ISQLServerConnection.TRANSACTION_SNAPSHOT;
default:
return -1;
}
Expand All @@ -1010,7 +1032,7 @@ public NumericMapping numericMapping() {
}

/**
 * Returns the database time zone from the {@code db.timezone} configuration.
 *
 * <p>Diff residue left two declarations of {@code dbTimeZone} (the pre- and
 * post-change lines); only the post-change line is kept. Resolving through
 * {@link ZoneId#of(String)} rejects unrecognized zone IDs with a
 * {@code DateTimeException} instead of silently falling back to GMT, which is
 * the behavior of a bare {@code TimeZone.getTimeZone(String)} call.
 *
 * @return the {@link TimeZone} corresponding to the configured zone ID
 */
public TimeZone timeZone() {
  String dbTimeZone = getString(DB_TIMEZONE_CONFIG);
  return TimeZone.getTimeZone(ZoneId.of(dbTimeZone));
}

Expand Down
104 changes: 58 additions & 46 deletions src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,9 @@

package io.confluent.connect.jdbc.source;

import java.sql.SQLNonTransientException;
import java.util.TimeZone;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLNonTransientException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
Expand All @@ -39,19 +29,29 @@
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.confluent.connect.jdbc.dialect.DatabaseDialect;
import io.confluent.connect.jdbc.dialect.DatabaseDialects;
import io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig.TransactionIsolationMode;
import io.confluent.connect.jdbc.util.CachedConnectionProvider;
import io.confluent.connect.jdbc.util.ColumnDefinition;
import io.confluent.connect.jdbc.util.ColumnId;
import io.confluent.connect.jdbc.util.TableId;
import io.confluent.connect.jdbc.util.Version;
import io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig.TransactionIsolationMode;

/**
* JdbcSourceTask is a Kafka Connect SourceTask implementation that reads from JDBC databases and
Expand All @@ -66,6 +66,7 @@ public class JdbcSourceTask extends SourceTask {
private Time time;
private JdbcSourceTaskConfig config;
private DatabaseDialect dialect;
private JdbcSourceTaskPollExecutor pollExecutor;
//Visible for Testing
CachedConnectionProvider cachedConnectionProvider;
PriorityQueue<TableQuerier> tableQueue = new PriorityQueue<>();
Expand Down Expand Up @@ -95,10 +96,11 @@ public void start(Map<String, String> properties) {
} catch (ConfigException e) {
throw new ConfigException("Couldn't start JdbcSourceTask due to configuration error", e);
}
pollExecutor = new JdbcSourceTaskPollExecutor(time, config, this::doPoll);

List<String> tables = config.getList(JdbcSourceTaskConfig.TABLES_CONFIG);
Boolean tablesFetched = config.getBoolean(JdbcSourceTaskConfig.TABLES_FETCHED);
String query = config.getString(JdbcSourceTaskConfig.QUERY_CONFIG);
String query = config.getString(JdbcSourceConnectorConfig.QUERY_CONFIG);

if ((tables.isEmpty() && query.isEmpty())) {
// We are still waiting for the tables call to complete.
Expand Down Expand Up @@ -155,13 +157,13 @@ public void start(Map<String, String> properties) {
List<String> tablesOrQuery = queryMode == TableQuerier.QueryMode.QUERY
? Collections.singletonList(query) : tables;

String mode = config.getString(JdbcSourceTaskConfig.MODE_CONFIG);
String mode = config.getString(JdbcSourceConnectorConfig.MODE_CONFIG);
//used only in table mode
Map<String, List<Map<String, String>>> partitionsByTableFqn = new HashMap<>();
Map<Map<String, String>, Map<String, Object>> offsets = null;
if (mode.equals(JdbcSourceTaskConfig.MODE_INCREMENTING)
|| mode.equals(JdbcSourceTaskConfig.MODE_TIMESTAMP)
|| mode.equals(JdbcSourceTaskConfig.MODE_TIMESTAMP_INCREMENTING)) {
if (mode.equals(JdbcSourceConnectorConfig.MODE_INCREMENTING)
|| mode.equals(JdbcSourceConnectorConfig.MODE_TIMESTAMP)
|| mode.equals(JdbcSourceConnectorConfig.MODE_TIMESTAMP_INCREMENTING)) {
List<Map<String, String>> partitions = new ArrayList<>(tables.size());
switch (queryMode) {
case TABLE:
Expand All @@ -187,15 +189,15 @@ public void start(Map<String, String> properties) {
}

String incrementingColumn
= config.getString(JdbcSourceTaskConfig.INCREMENTING_COLUMN_NAME_CONFIG);
= config.getString(JdbcSourceConnectorConfig.INCREMENTING_COLUMN_NAME_CONFIG);
List<String> timestampColumns
= config.getList(JdbcSourceTaskConfig.TIMESTAMP_COLUMN_NAME_CONFIG);
= config.getList(JdbcSourceConnectorConfig.TIMESTAMP_COLUMN_NAME_CONFIG);
Long timestampDelayInterval
= config.getLong(JdbcSourceTaskConfig.TIMESTAMP_DELAY_INTERVAL_MS_CONFIG);
= config.getLong(JdbcSourceConnectorConfig.TIMESTAMP_DELAY_INTERVAL_MS_CONFIG);
boolean validateNonNulls
= config.getBoolean(JdbcSourceTaskConfig.VALIDATE_NON_NULL_CONFIG);
= config.getBoolean(JdbcSourceConnectorConfig.VALIDATE_NON_NULL_CONFIG);
TimeZone timeZone = config.timeZone();
String suffix = config.getString(JdbcSourceTaskConfig.QUERY_SUFFIX_CONFIG).trim();
String suffix = config.getString(JdbcSourceConnectorConfig.QUERY_SUFFIX_CONFIG).trim();

if (queryMode.equals(TableQuerier.QueryMode.TABLE)) {
validateColumnsExist(mode, incrementingColumn, timestampColumns, tables.get(0));
Expand Down Expand Up @@ -246,17 +248,17 @@ public void start(Map<String, String> properties) {
JdbcSourceConnectorConfig.TimestampGranularity timestampGranularity
= JdbcSourceConnectorConfig.TimestampGranularity.get(config);

if (mode.equals(JdbcSourceTaskConfig.MODE_BULK)) {
if (mode.equals(JdbcSourceConnectorConfig.MODE_BULK)) {
tableQueue.add(
new BulkTableQuerier(
dialect,
queryMode,
tableOrQuery,
topicPrefix,
dialect,
queryMode,
tableOrQuery,
topicPrefix,
suffix
)
);
} else if (mode.equals(JdbcSourceTaskConfig.MODE_INCREMENTING)) {
} else if (mode.equals(JdbcSourceConnectorConfig.MODE_INCREMENTING)) {
tableQueue.add(
new TimestampIncrementingTableQuerier(
dialect,
Expand All @@ -272,7 +274,7 @@ public void start(Map<String, String> properties) {
timestampGranularity
)
);
} else if (mode.equals(JdbcSourceTaskConfig.MODE_TIMESTAMP)) {
} else if (mode.equals(JdbcSourceConnectorConfig.MODE_TIMESTAMP)) {
tableQueue.add(
new TimestampTableQuerier(
dialect,
Expand All @@ -287,7 +289,7 @@ public void start(Map<String, String> properties) {
timestampGranularity
)
);
} else if (mode.endsWith(JdbcSourceTaskConfig.MODE_TIMESTAMP_INCREMENTING)) {
} else if (mode.endsWith(JdbcSourceConnectorConfig.MODE_TIMESTAMP_INCREMENTING)) {
tableQueue.add(
new TimestampIncrementingTableQuerier(
dialect,
Expand All @@ -305,12 +307,11 @@ public void start(Map<String, String> properties) {
);
}
}
maxRetriesPerQuerier = config.getInt(JdbcSourceConnectorConfig.QUERY_RETRIES_CONFIG);

running.set(true);
taskThreadId.set(Thread.currentThread().getId());
log.info("Started JDBC source task");

maxRetriesPerQuerier = config.getInt(JdbcSourceConnectorConfig.QUERY_RETRIES_CONFIG);
}

private void validateColumnsExist(
Expand All @@ -324,16 +325,16 @@ private void validateColumnsExist(
Set<String> columnNames = defnsById.keySet().stream().map(ColumnId::name)
.map(String::toLowerCase).collect(Collectors.toSet());

if ((mode.equals(JdbcSourceTaskConfig.MODE_INCREMENTING)
|| mode.equals(JdbcSourceTaskConfig.MODE_TIMESTAMP_INCREMENTING))
if ((mode.equals(JdbcSourceConnectorConfig.MODE_INCREMENTING)
|| mode.equals(JdbcSourceConnectorConfig.MODE_TIMESTAMP_INCREMENTING))
&& !incrementingColumn.isEmpty()
&& !columnNames.contains(incrementingColumn.toLowerCase(Locale.getDefault()))) {
throw new ConfigException("Incrementing column: " + incrementingColumn
+ " does not exist.");
}

if ((mode.equals(JdbcSourceTaskConfig.MODE_TIMESTAMP)
|| mode.equals(JdbcSourceTaskConfig.MODE_TIMESTAMP_INCREMENTING))
if ((mode.equals(JdbcSourceConnectorConfig.MODE_TIMESTAMP)
|| mode.equals(JdbcSourceConnectorConfig.MODE_TIMESTAMP_INCREMENTING))
&& !timestampColumns.isEmpty()) {

Set<String> missingTsColumns = timestampColumns.stream()
Expand Down Expand Up @@ -443,45 +444,49 @@ protected void closeResources() {

/**
 * Polls for new records by delegating to the poll executor, which wraps
 * {@code doPoll()} and (per the {@code poll.max.wait.time.ms} feature this
 * change introduces) bounds how long the worker waits for a result.
 *
 * @return the next batch of source records, or {@code null} when no data is
 *         available within the allowed wait time
 * @throws InterruptedException if the waiting thread is interrupted
 */
@Override
public List<SourceRecord> poll() throws InterruptedException {
return pollExecutor.poll();
}

private List<SourceRecord> doPoll() {
log.trace("Polling for new data");

// If the call to get tables has not completed we will not do anything.
// This is only valid in table mode.
Boolean tablesFetched = config.getBoolean(JdbcSourceTaskConfig.TABLES_FETCHED);
String query = config.getString(JdbcSourceTaskConfig.QUERY_CONFIG);
String query = config.getString(JdbcSourceConnectorConfig.QUERY_CONFIG);
if (query.isEmpty() && !tablesFetched) {
final long sleepMs = config.getInt(JdbcSourceTaskConfig.POLL_INTERVAL_MS_CONFIG);
final long sleepMs = config.getInt(JdbcSourceConnectorConfig.POLL_INTERVAL_MS_CONFIG);
log.trace("Waiting for tables to be fetched from the database. No records will be polled. "
+ "Waiting {} ms to poll", sleepMs);
time.sleep(sleepMs);
return null;
}

Map<TableQuerier, Integer> consecutiveEmptyResults = tableQueue.stream().collect(
Collectors.toMap(Function.identity(), (q) -> 0));
Collectors.toMap(Function.identity(), q -> 0));
while (running.get()) {
final TableQuerier querier = tableQueue.peek();

if (!querier.querying()) {
// If not in the middle of an update, wait for next update time
final long nextUpdate = querier.getLastUpdate()
+ config.getInt(JdbcSourceTaskConfig.POLL_INTERVAL_MS_CONFIG);
+ config.getInt(JdbcSourceConnectorConfig.POLL_INTERVAL_MS_CONFIG);
final long now = time.milliseconds();
final long sleepMs = Math.min(nextUpdate - now, 100);

if (sleepMs > 0) {
log.trace("Waiting {} ms to poll {} next", nextUpdate - now, querier.toString());
log.trace("Waiting {} ms to poll {} next", nextUpdate - now, querier);
time.sleep(sleepMs);
continue; // Re-check stop flag before continuing
}
}

final List<SourceRecord> results = new ArrayList<>();
try {
log.debug("Checking for next block of results from {}", querier.toString());
log.debug("Checking for next block of results from {}", querier);
querier.maybeStartQuery(cachedConnectionProvider.getConnection());

int batchMaxRows = config.getInt(JdbcSourceTaskConfig.BATCH_MAX_ROWS_CONFIG);
int batchMaxRows = config.getInt(JdbcSourceConnectorConfig.BATCH_MAX_ROWS_CONFIG);
boolean hadNext = true;
while (results.size() < batchMaxRows && (hadNext = querier.next())) {
results.add(querier.extractRecord());
Expand All @@ -496,7 +501,7 @@ public List<SourceRecord> poll() throws InterruptedException {

if (results.isEmpty()) {
consecutiveEmptyResults.compute(querier, (k, v) -> v + 1);
log.trace("No updates for {}", querier.toString());
log.trace("No updates for {}", querier);

if (Collections.min(consecutiveEmptyResults.values())
>= CONSECUTIVE_EMPTY_RESULTS_BEFORE_RETURN) {
Expand Down Expand Up @@ -554,11 +559,18 @@ private void shutdown() {
if (querier != null) {
resetAndRequeueHead(querier, true);
}
closePollExecutor();
closeResources();
}

/**
 * Closes the poll executor if one was created. The executor is only assigned
 * in {@code start(...)}, so it may still be {@code null} when shutdown runs
 * before the task was started.
 */
private void closePollExecutor() {
  if (pollExecutor == null) {
    return; // task never started; nothing to release
  }
  pollExecutor.close();
}

private void resetAndRequeueHead(TableQuerier expectedHead, boolean resetOffset) {
log.debug("Resetting querier {}", expectedHead.toString());
log.debug("Resetting querier {}", expectedHead);
TableQuerier removedQuerier = tableQueue.poll();
assert removedQuerier == expectedHead;
expectedHead.reset(time.milliseconds(), resetOffset);
Expand Down Expand Up @@ -588,7 +600,7 @@ private void validateNonNullable(
String columnName = defn.id().name();
if (columnName.equalsIgnoreCase(incrementingColumn)) {
incrementingOptional = defn.isOptional();
} else if (lowercaseTsColumns.contains(columnName.toLowerCase(Locale.getDefault()))) {
} else if (lowercaseTsColumns.contains(columnName.toLowerCase(Locale.ROOT))) {
if (!defn.isOptional()) {
atLeastOneTimestampNotOptional = true;
}
Expand Down
Loading