Commit 8e169f0

# This is a combination of 7 commits.
# This is the 1st commit message:

additional test coverage

Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>

# This is the commit message #2:

cleaned up JiraOauthConfig file

Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>

# This is the commit message #3:

addressing review comments and simplifying the exception handling

Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>

# This is the commit message #4:

Add external origination time for events created from S3 Object (#5104)

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Co-authored-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
# This is the commit message #5:

moved the wait block out of the catch block

Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>

# This is the commit message #6:

Renewal logic adjusted

Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>

# This is the commit message #7:

partial

Signed-off-by: Maxwell Brown <mxwelwbr@amazon.com>

fix merge issues

Signed-off-by: Maxwell Brown <mxwelwbr@amazon.com>

update

Signed-off-by: Maxwell Brown <mxwelwbr@amazon.com>

Add getColumnDataTypes method to SchemaManager to get datatype for table columns (#5135)

Add getColumnDataTypes method to SchemaManager

Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>

Add model for table column metadata for Global state (#5136)

Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>

Rename the KDS source plugin name to "kinesis-data-streams" (#5138)

Signed-off-by: Souvik Bose <souvbose@amazon.com>
Co-authored-by: Souvik Bose <souvbose@amazon.com>

Addressed review comments (#5108)

Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>

fixes related to source config properties change

Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>

removed future handling for loop-based operations

Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>

additional test cases

Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>

addressing review comments

Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>

Jira Service Test coverage

Signed-off-by: Maxwell Brown <mxwelwbr@amazon.com>

jirasourceconfigTest comments

Signed-off-by: Maxwell Brown <mxwelwbr@amazon.com>

introduced RestClient and moved rest template interactions there. Similar change in the test cases too

Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>

backing off for any kind of exception

Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>

restructured constants file

Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>

JiraSourceTests

Signed-off-by: Maxwell Brown <mxwelwbr@amazon.com>

JiraItemInfo coverage

Signed-off-by: Maxwell Brown <mxwelwbr@amazon.com>

jira service branch coverage

Signed-off-by: Maxwell Brown <mxwelwbr@amazon.com>

branch coverage jira service

Signed-off-by: Maxwell Brown <mxwelwbr@amazon.com>

move the add-items-to-queue logic into JiraItemInfo

Signed-off-by: Maxwell Brown <mxwelwbr@amazon.com>

fixing regex and adding date time formatter

Signed-off-by: Maxwell Brown <mxwelwbr@amazon.com>

Revert "Jira source"

re-add changes and fix issues

Signed-off-by: Maxwell Brown <mxwelwbr@amazon.com>

unneeded comment

Signed-off-by: Maxwell Brown <mxwelwbr@amazon.com>

using issue bean methods to simplify the logic

Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
san81 committed Nov 1, 2024
1 parent 3a615e6 commit 8e169f0
Showing 36 changed files with 1,277 additions and 519 deletions.
@@ -99,6 +99,7 @@ public class KafkaCustomConsumer implements Runnable, ConsumerRebalanceListener
private final LogRateLimiter errLogRateLimiter;
private final ByteDecoder byteDecoder;
private final long maxRetriesOnException;
private final Map<Integer, Long> partitionToLastReceivedTimestampMillis;

public KafkaCustomConsumer(final KafkaConsumer consumer,
final AtomicBoolean shutdownInProgress,
@@ -122,6 +123,7 @@ public KafkaCustomConsumer(final KafkaConsumer consumer,
this.pauseConsumePredicate = pauseConsumePredicate;
this.topicMetrics.register(consumer);
this.offsetsToCommit = new HashMap<>();
this.partitionToLastReceivedTimestampMillis = new HashMap<>();
this.ownedPartitionsEpoch = new HashMap<>();
this.metricsUpdatedTime = Instant.now().getEpochSecond();
this.acknowledgedOffsets = new ArrayList<>();
@@ -142,6 +144,22 @@ KafkaTopicConsumerMetrics getTopicMetrics() {
return topicMetrics;
}

<T> long getRecordTimeStamp(final ConsumerRecord<String, T> consumerRecord, final long nowMs) {
final long timestamp = consumerRecord.timestamp();
int partition = consumerRecord.partition();
if (timestamp > nowMs) {
topicMetrics.getNumberOfInvalidTimeStamps().increment();
if (partitionToLastReceivedTimestampMillis.containsKey(partition)) {
return partitionToLastReceivedTimestampMillis.get(partition);
} else {
return nowMs;
}
} else {
partitionToLastReceivedTimestampMillis.put(partition, timestamp);
return timestamp;
}
}

private long getCurrentTimeNanos() {
Instant now = Instant.now();
return now.getEpochSecond()*1000000000+now.getNano();
@@ -436,12 +454,13 @@ private <T> Record<Event> getRecord(ConsumerRecord<String, T> consumerRecord, in
}
eventMetadata.setAttribute("kafka_headers", headerData);
}
eventMetadata.setAttribute("kafka_timestamp", consumerRecord.timestamp());
final long receivedTimeStamp = getRecordTimeStamp(consumerRecord, now.toEpochMilli());
eventMetadata.setAttribute("kafka_timestamp", receivedTimeStamp);
eventMetadata.setAttribute("kafka_timestamp_type", consumerRecord.timestampType().toString());
eventMetadata.setAttribute("kafka_topic", topicName);
eventMetadata.setAttribute("kafka_partition", String.valueOf(partition));
eventMetadata.setExternalOriginationTime(Instant.ofEpochMilli(consumerRecord.timestamp()));
event.getEventHandle().setExternalOriginationTime(Instant.ofEpochMilli(consumerRecord.timestamp()));
eventMetadata.setExternalOriginationTime(Instant.ofEpochMilli(receivedTimeStamp));
event.getEventHandle().setExternalOriginationTime(Instant.ofEpochMilli(receivedTimeStamp));

return new Record<Event>(event);
}
@@ -511,7 +530,9 @@ private <T> void iterateRecordPartitions(ConsumerRecords<String, T> records, fin
if (schema == MessageFormat.BYTES) {
InputStream inputStream = new ByteArrayInputStream((byte[])consumerRecord.value());
if(byteDecoder != null) {
byteDecoder.parse(inputStream, Instant.ofEpochMilli(consumerRecord.timestamp()), (record) -> {
final long receivedTimeStamp = getRecordTimeStamp(consumerRecord, Instant.now().toEpochMilli());

byteDecoder.parse(inputStream, Instant.ofEpochMilli(receivedTimeStamp), (record) -> {
processRecord(acknowledgementSet, record);
});
} else {
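
(Not part of the commit.) As a quick illustration of the timestamp guard added in getRecordTimeStamp above, the dependency-free sketch below mirrors its rule: a record timestamp later than the current time is counted as invalid and replaced with the last valid timestamp recorded for that partition, falling back to the current time when the partition has no history yet. The class and method names here are made up for the example.

import java.util.HashMap;
import java.util.Map;

public class TimestampGuardSketch {
    // Last valid (non-future) timestamp observed per partition.
    private final Map<Integer, Long> lastValidTimestampByPartition = new HashMap<>();

    long sanitize(final int partition, final long recordTimestampMs, final long nowMs) {
        if (recordTimestampMs > nowMs) {
            // Invalid (future) timestamp: reuse the last good value for this partition, or "now".
            return lastValidTimestampByPartition.getOrDefault(partition, nowMs);
        }
        lastValidTimestampByPartition.put(partition, recordTimestampMs);
        return recordTimestampMs;
    }

    public static void main(String[] args) {
        final TimestampGuardSketch guard = new TimestampGuardSketch();
        final long now = System.currentTimeMillis();
        System.out.println(guard.sanitize(1, now - 5, now));   // now - 5 (valid, remembered for partition 1)
        System.out.println(guard.sanitize(1, now + 5, now));   // now - 5 (future value, falls back to last valid)
        System.out.println(guard.sanitize(2, now + 10, now));  // now (no history yet for partition 2)
    }
}
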
@@ -22,6 +22,7 @@ public class KafkaTopicConsumerMetrics {
static final String NUMBER_OF_RECORDS_FAILED_TO_PARSE = "numberOfRecordsFailedToParse";
static final String NUMBER_OF_DESERIALIZATION_ERRORS = "numberOfDeserializationErrors";
static final String NUMBER_OF_BUFFER_SIZE_OVERFLOWS = "numberOfBufferSizeOverflows";
static final String NUMBER_OF_INVALID_TIMESTAMPS = "numberOfInvalidTimeStamps";
static final String NUMBER_OF_POLL_AUTH_ERRORS = "numberOfPollAuthErrors";
static final String NUMBER_OF_RECORDS_COMMITTED = "numberOfRecordsCommitted";
static final String NUMBER_OF_RECORDS_CONSUMED = "numberOfRecordsConsumed";
@@ -38,6 +39,7 @@ public class KafkaTopicConsumerMetrics {
private final Counter numberOfDeserializationErrors;
private final Counter numberOfBufferSizeOverflows;
private final Counter numberOfPollAuthErrors;
private final Counter numberOfInvalidTimeStamps;
private final Counter numberOfRecordsCommitted;
private final Counter numberOfRecordsConsumed;
private final Counter numberOfBytesConsumed;
@@ -53,6 +55,7 @@ public KafkaTopicConsumerMetrics(final String topicName, final PluginMetrics plu
this.numberOfBytesConsumed = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_BYTES_CONSUMED, topicNameInMetrics));
this.numberOfRecordsCommitted = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_RECORDS_COMMITTED, topicNameInMetrics));
this.numberOfRecordsFailedToParse = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_RECORDS_FAILED_TO_PARSE, topicNameInMetrics));
this.numberOfInvalidTimeStamps = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_INVALID_TIMESTAMPS, topicNameInMetrics));
this.numberOfDeserializationErrors = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_DESERIALIZATION_ERRORS, topicNameInMetrics));
this.numberOfBufferSizeOverflows = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_BUFFER_SIZE_OVERFLOWS, topicNameInMetrics));
this.numberOfPollAuthErrors = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_POLL_AUTH_ERRORS, topicNameInMetrics));
@@ -151,6 +154,10 @@ public Counter getNumberOfNegativeAcknowledgements() {
return numberOfNegativeAcknowledgements;
}

public Counter getNumberOfInvalidTimeStamps() {
return numberOfInvalidTimeStamps;
}

public Counter getNumberOfPositiveAcknowledgements() {
return numberOfPositiveAcknowledgements;
}
@@ -40,6 +40,7 @@
import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -51,6 +52,7 @@
import java.util.concurrent.atomic.AtomicBoolean;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasEntry;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -138,6 +140,7 @@ public void setUp() {
when(topicMetrics.getNumberOfBufferSizeOverflows()).thenReturn(overflowCounter);
when(topicMetrics.getNumberOfRecordsCommitted()).thenReturn(counter);
when(topicMetrics.getNumberOfDeserializationErrors()).thenReturn(counter);
when(topicMetrics.getNumberOfInvalidTimeStamps()).thenReturn(counter);
when(topicConfig.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(1));
when(topicConfig.getSerdeFormat()).thenReturn(MessageFormat.PLAINTEXT);
when(topicConfig.getAutoCommit()).thenReturn(false);
@@ -196,6 +199,27 @@ private BlockingBuffer<Record<Event>> getBuffer() {
return new BlockingBuffer<>(pluginSetting);
}

@Test
public void testGetRecordTimeStamp() {
ConsumerRecord<String, Object> consumerRecord1 = mock(ConsumerRecord.class);
ConsumerRecord<String, Object> consumerRecord2 = mock(ConsumerRecord.class);
ConsumerRecord<String, Object> consumerRecord3 = mock(ConsumerRecord.class);
consumer = createObjectUnderTestWithMockBuffer("plaintext");
long nowMs = Instant.now().toEpochMilli();
long timestamp1 = nowMs - 5;
when(consumerRecord1.timestamp()).thenReturn(timestamp1);
when(consumerRecord1.partition()).thenReturn(1);
assertThat(consumer.getRecordTimeStamp(consumerRecord1, nowMs), equalTo(timestamp1));
long timestamp2 = nowMs + 5;
when(consumerRecord2.timestamp()).thenReturn(timestamp2);
when(consumerRecord2.partition()).thenReturn(1);
assertThat(consumer.getRecordTimeStamp(consumerRecord2, nowMs), equalTo(timestamp1));
long timestamp3 = nowMs + 10;
when(consumerRecord3.timestamp()).thenReturn(timestamp3);
when(consumerRecord3.partition()).thenReturn(2);
assertThat(consumer.getRecordTimeStamp(consumerRecord3, nowMs), equalTo(nowMs));
}

@Test
public void testBufferOverflowPauseResume() throws InterruptedException, Exception {
when(topicConfig.getMaxPollInterval()).thenReturn(Duration.ofMillis(4000));
@@ -27,7 +27,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@DataPrepperPlugin(name = "kinesis", pluginType = Source.class, pluginConfigurationType = KinesisSourceConfig.class)
@DataPrepperPlugin(name = "kinesis", alternateNames = "kinesis-data-streams", pluginType = Source.class, pluginConfigurationType = KinesisSourceConfig.class)
public class KinesisSource implements Source<Record<Event>> {
private static final Logger LOG = LoggerFactory.getLogger(KinesisSource.class);
private final KinesisSourceConfig kinesisSourceConfig;
@@ -41,4 +41,12 @@ public Map<String, Object> toMap() {
PORT_KEY, port
);
}

public static DbMetadata fromMap(Map<String, Object> map) {
return new DbMetadata(
(String) map.get(DB_IDENTIFIER_KEY),
(String) map.get(HOST_NAME_KEY),
((Integer) map.get(PORT_KEY))
);
}
}
@@ -0,0 +1,43 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.rds.model;

import java.util.Map;

public class DbTableMetadata {

private static final String DB_METADATA_KEY = "dbMetadata";
private static final String TABLE_COLUMN_METADATA_KEY = "tableColumnDataTypeMap";
private final DbMetadata dbMetadata;
private final Map<String, Map<String, String>> tableColumnDataTypeMap;

public DbTableMetadata(final DbMetadata dbMetadata, final Map<String, Map<String, String>> tableColumnDataTypeMap) {
this.dbMetadata = dbMetadata;
this.tableColumnDataTypeMap = tableColumnDataTypeMap;
}

public DbMetadata getDbMetadata() {
return dbMetadata;
}

public Map<String, Map<String, String>> getTableColumnDataTypeMap() {
return tableColumnDataTypeMap;
}

public Map<String, Object> toMap() {
return Map.of(
DB_METADATA_KEY, dbMetadata.toMap(),
TABLE_COLUMN_METADATA_KEY, tableColumnDataTypeMap
);
}

public static DbTableMetadata fromMap(Map<String, Object> map) {
return new DbTableMetadata(
DbMetadata.fromMap((Map<String, Object>)map.get(DB_METADATA_KEY)),
(Map<String, Map<String, String>>) map.get(TABLE_COLUMN_METADATA_KEY)
);
}
}
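
(Not part of the commit.) A minimal sketch of how the new DbTableMetadata and DbMetadata.fromMap can round-trip through plain maps, which is the shape used when the metadata is written to and read back from global state. The literal values and the example class name are made up; the constructors and accessors match the code shown above.

import java.util.Map;
import org.opensearch.dataprepper.plugins.source.rds.model.DbMetadata;
import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata;

public class DbTableMetadataRoundTripSketch {
    public static void main(String[] args) {
        final DbMetadata dbMetadata = new DbMetadata("my-db-instance", "my-db-host", 3306);
        final Map<String, Map<String, String>> columnTypes = Map.of(
                "my_database.my_table", Map.of("id", "INT", "name", "VARCHAR"));
        final DbTableMetadata original = new DbTableMetadata(dbMetadata, columnTypes);

        // Serialize to a plain map (e.g. for storing as global state), then restore it.
        final Map<String, Object> asMap = original.toMap();
        final DbTableMetadata restored = DbTableMetadata.fromMap(asMap);

        System.out.println(restored.getDbMetadata().getHostName());   // my-db-host
        System.out.println(restored.getTableColumnDataTypeMap());     // table key -> (column name -> data type)
    }
}
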
@@ -10,10 +10,13 @@
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class SchemaManager {
@@ -24,6 +27,7 @@ public class SchemaManager {
static final String BINLOG_POSITION = "Position";
static final int NUM_OF_RETRIES = 3;
static final int BACKOFF_IN_MILLIS = 500;
static final String TYPE_NAME = "TYPE_NAME";
private final ConnectionManager connectionManager;

public SchemaManager(ConnectionManager connectionManager) {
@@ -35,11 +39,12 @@ public List<String> getPrimaryKeys(final String database, final String table) {
while (retry <= NUM_OF_RETRIES) {
final List<String> primaryKeys = new ArrayList<>();
try (final Connection connection = connectionManager.getConnection()) {
final ResultSet rs = connection.getMetaData().getPrimaryKeys(database, null, table);
while (rs.next()) {
primaryKeys.add(rs.getString(COLUMN_NAME));
try (final ResultSet rs = connection.getMetaData().getPrimaryKeys(database, null, table)) {
while (rs.next()) {
primaryKeys.add(rs.getString(COLUMN_NAME));
}
return primaryKeys;
}
return primaryKeys;
} catch (Exception e) {
LOG.error("Failed to get primary keys for table {}, retrying", table, e);
}
@@ -50,6 +55,33 @@ public List<String> getPrimaryKeys(final String database, final String table) {
return List.of();
}

public Map<String, String> getColumnDataTypes(final String database, final String tableName) {
final Map<String, String> columnsToDataType = new HashMap<>();
for (int retry = 0; retry <= NUM_OF_RETRIES; retry++) {
try (Connection connection = connectionManager.getConnection()) {
final DatabaseMetaData metaData = connection.getMetaData();

// Retrieve column metadata
try (ResultSet columns = metaData.getColumns(database, null, tableName, null)) {
while (columns.next()) {
columnsToDataType.put(
columns.getString(COLUMN_NAME),
columns.getString(TYPE_NAME)
);
}
}
} catch (final Exception e) {
LOG.error("Failed to get dataTypes for database {} table {}, retrying", database, tableName, e);
if (retry == NUM_OF_RETRIES) {
throw new RuntimeException(String.format("Failed to get dataTypes for database %s table %s after " +
"%d retries", database, tableName, retry), e);
}
}
applyBackoff();
}
return columnsToDataType;
}

public Optional<BinlogCoordinate> getCurrentBinaryLogPosition() {
int retry = 0;
while (retry <= NUM_OF_RETRIES) {
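
(Not part of the commit.) A hedged sketch of how getColumnDataTypes might be combined with the new DbTableMetadata model: collect the column-to-type map for each table and bundle it with the database metadata. The SchemaManager import path and the "database.table" key format are assumptions for illustration and may differ from the actual source.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.opensearch.dataprepper.plugins.source.rds.model.DbMetadata;
import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata;
// Package path assumed; not shown in this diff.
import org.opensearch.dataprepper.plugins.source.rds.schema.SchemaManager;

public class ColumnTypeCollectionSketch {
    // Builds the table-column data type map for the given tables and wraps it with the DB metadata.
    public static DbTableMetadata collect(final SchemaManager schemaManager,
                                          final DbMetadata dbMetadata,
                                          final String database,
                                          final List<String> tableNames) {
        final Map<String, Map<String, String>> tableColumnDataTypeMap = new HashMap<>();
        for (final String tableName : tableNames) {
            // getColumnDataTypes returns column name -> JDBC TYPE_NAME (e.g. "INT", "VARCHAR").
            tableColumnDataTypeMap.put(database + "." + tableName,
                    schemaManager.getColumnDataTypes(database, tableName));
        }
        return new DbTableMetadata(dbMetadata, tableColumnDataTypeMap);
    }
}
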
@@ -0,0 +1,48 @@
package org.opensearch.dataprepper.plugins.source.rds.model;

import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.UUID;

import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.CoreMatchers.is;

public class DbMetadataTest {

@Test
public void test_fromMap_success() {
final String dbIdentifier = UUID.randomUUID().toString();
final String hostName = UUID.randomUUID().toString();
final int port = new Random().nextInt();
final Map<String, Object> map = new HashMap<>();
map.put("dbIdentifier", dbIdentifier);
map.put("hostName", hostName);
map.put("port", port);

final DbMetadata result = DbMetadata.fromMap(map);

assertThat(result.getDbIdentifier(), is(dbIdentifier));
assertThat(result.getHostName(), is(hostName));
assertThat(result.getPort(), is(port));
}

@Test
public void test_toMap_success() {
final String dbIdentifier = UUID.randomUUID().toString();
final String hostName = UUID.randomUUID().toString();
final int port = new Random().nextInt();
final DbMetadata dbMetadata = new DbMetadata(dbIdentifier, hostName, port);

final Map<String, Object> result = dbMetadata.toMap();

assertThat(result, is(notNullValue()));
assertThat(result.size(), is(3));
assertThat(result.get("dbIdentifier"), is(dbIdentifier));
assertThat(result.get("hostName"), is(hostName));
assertThat(result.get("port"), is(port));
}
}
(Diff for the remaining changed files is not shown here.)
