Skip to content

Commit

Permalink
Fix Opensearch Sink cleanup when initialize fails
Browse files Browse the repository at this point in the history
Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
  • Loading branch information
Krishna Kondaka committed Jul 22, 2024
1 parent 4ec547b commit 062986f
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public OpenSearchClientRefresher(final PluginMetrics pluginMetrics,
final Function<ConnectionConfiguration, OpenSearchClient> clientFunction) {
this.clientFunction = clientFunction;
this.currentConfig = connectionConfiguration;
this.currentClient = clientFunction.apply(connectionConfiguration);
this.currentClient = null;
credentialsChangeCounter = pluginMetrics.counter(CREDENTIALS_CHANGED);
clientRefreshErrorsCounter = pluginMetrics.counter(CLIENT_REFRESH_ERRORS);
}
Expand All @@ -44,6 +44,9 @@ public Class<OpenSearchClient> getComponentClass() {
public OpenSearchClient get() {
readWriteLock.readLock().lock();
try {
if (currentClient == null) {
currentClient = clientFunction.apply(currentConfig);
}
return currentClient;
} finally {
readWriteLock.readLock().unlock();
Expand All @@ -57,7 +60,7 @@ public void update(PluginSetting pluginSetting) {
credentialsChangeCounter.increment();
readWriteLock.writeLock().lock();
try {
currentClient = clientFunction.apply(newConfig);
currentClient = null;
currentConfig = newConfig;
} catch (Exception e) {
clientRefreshErrorsCounter.increment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ public OpenSearchSink(final PluginSetting pluginSetting,
this.bulkRequestMap = new ConcurrentHashMap<>();
this.lastFlushTimeMap = new ConcurrentHashMap<>();
this.pluginConfigObservable = pluginConfigObservable;
this.objectMapper = new ObjectMapper();

final Optional<PluginModel> dlqConfig = openSearchSinkConfig.getRetryConfiguration().getDlq();
if (dlqConfig.isPresent()) {
Expand All @@ -201,7 +202,7 @@ public void doInitialize() {
doInitializeInternal();
} catch (IOException e) {
LOG.warn("Failed to initialize OpenSearch sink, retrying: {} ", e.getMessage());
closeFiles();
this.shutdown();
} catch (InvalidPluginConfigurationException e) {
LOG.error("Failed to initialize OpenSearch sink due to a configuration error.", e);
this.shutdown();
Expand All @@ -212,7 +213,7 @@ public void doInitialize() {
throw e;
} catch (Exception e) {
LOG.warn("Failed to initialize OpenSearch sink with a retryable exception. ", e);
closeFiles();
this.shutdown();
}
}

Expand Down Expand Up @@ -279,7 +280,6 @@ private void doInitializeInternal() throws IOException {
bulkRequestSupplier,
pluginSetting);

objectMapper = new ObjectMapper();
this.initialized = true;
LOG.info("Initialized OpenSearch sink");
}
Expand Down Expand Up @@ -591,20 +591,23 @@ private void closeFiles() {
if (restHighLevelClient != null) {
try {
restHighLevelClient.close();
restHighLevelClient = null;
} catch (final IOException e) {
throw new RuntimeException(e.getMessage(), e);
}
}
if (dlqWriter != null) {
try {
dlqWriter.close();
dlqWriter = null;
} catch (final IOException e) {
LOG.error(e.getMessage(), e);
}
}
if (dlqFileWriter != null) {
try {
dlqFileWriter.close();
dlqFileWriter = null;
} catch (final IOException e) {
LOG.error(e.getMessage(), e);
}
Expand All @@ -615,6 +618,15 @@ private void closeFiles() {
public void shutdown() {
super.shutdown();
closeFiles();
openSearchClient.shutdown();
// Explicitly set to null to make the GC faster.
// This is useful in cases of sink initialize retry
openSearchClient = null;
openSearchClientRefresher = null;
bulkRetryStrategy = null;
bulkApiWrapper = null;
bulkRequestSupplier = null;
initialized = false;
}

private void maybeUpdateServerlessNetworkPolicy() {
Expand Down

0 comments on commit 062986f

Please sign in to comment.