Skip to content

Commit

Permalink
Implement batching for peer forwarder request documents (#2197)
Browse files Browse the repository at this point in the history
* Implement batching for peer forwarder request documents

Signed-off-by: Chase Engelbrecht <engechas@amazon.com>

* Add configurable forwarding_batch_timeout for low-traffic scenarios

Signed-off-by: Chase Engelbrecht <engechas@amazon.com>

* Slight refactors and add unit tests for batching

Signed-off-by: Chase Engelbrecht <engechas@amazon.com>

* Increase YAML deserialization size

Signed-off-by: Chase Engelbrecht <engechas@amazon.com>

* Refactor for clarity and flush all available batches on each iteration

Signed-off-by: Chase Engelbrecht <engechas@amazon.com>

* Fix typo in FORWARDING

Signed-off-by: Chase Engelbrecht <engechas@amazon.com>

* Use getOrDefault when checking last flushed time

Signed-off-by: Chase Engelbrecht <engechas@amazon.com>

---------

Signed-off-by: Chase Engelbrecht <engechas@amazon.com>
  • Loading branch information
engechas authored Jan 31, 2023
1 parent 10fcdf6 commit 6d74ec4
Show file tree
Hide file tree
Showing 12 changed files with 336 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ private void buildPipelineFromConfiguration(
final List<Processor> processors = processorComponentList.stream().map(IdentifiedComponent::getComponent).collect(Collectors.toList());
if (!processors.isEmpty() && processors.get(0) instanceof RequiresPeerForwarding) {
return PeerForwardingProcessorDecorator.decorateProcessors(
processors, peerForwarderProvider, pipelineName, processorComponentList.get(0).getName()
processors, peerForwarderProvider, pipelineName, processorComponentList.get(0).getName(), pipelineConfiguration.getWorkers()
);
}
return processors;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.yaml.snakeyaml.LoaderOptions;

@Configuration
class PeerForwarderAppConfig {
Expand All @@ -33,8 +34,13 @@ public PluginMetrics pluginMetrics() {
}

@Bean(name = "peerForwarderObjectMapper")
public ObjectMapper objectMapper(final YAMLFactory yamlFactory) {
public ObjectMapper objectMapper() {
final JavaTimeModule javaTimeModule = new JavaTimeModule();
final LoaderOptions loaderOptions = new LoaderOptions();
loaderOptions.setCodePointLimit(10 * 1024 * 1024); // 10MB
final YAMLFactory yamlFactory = YAMLFactory.builder()
.loaderOptions(loaderOptions)
.build();
return new ObjectMapper(yamlFactory).registerModule(javaTimeModule);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
public class PeerForwarderConfiguration {
public static final String DEFAULT_PEER_FORWARDING_URI = "/event/forward";
public static final Duration DEFAULT_DRAIN_TIMEOUT = Duration.ofSeconds(10L);
public static final Duration DEFAULT_FORWARDING_BATCH_TIMEOUT = Duration.ofSeconds(3L);
public static final String DEFAULT_CERTIFICATE_FILE_PATH = "config/default_certificate.pem";
public static final String DEFAULT_PRIVATE_KEY_FILE_PATH = "config/default_private_key.pem";
private static final String S3_PREFIX = "s3://";
private static final int MAX_FORWARDING_BATCH_SIZE = 3000;

private Integer serverPort = 4994;
private Integer requestTimeout = 10_000;
Expand Down Expand Up @@ -58,6 +60,8 @@ public class PeerForwarderConfiguration {
private boolean sslCertAndKeyFileInS3 = false;
private Duration drainTimeout = DEFAULT_DRAIN_TIMEOUT;
private Integer failedForwardingRequestLocalWriteTimeout = 500;
private Integer forwardingBatchSize = 1500;
private Duration forwardingBatchTimeout = DEFAULT_FORWARDING_BATCH_TIMEOUT;

public PeerForwarderConfiguration() {}

Expand Down Expand Up @@ -91,7 +95,9 @@ public PeerForwarderConfiguration (
@JsonProperty("batch_delay") final Integer batchDelay,
@JsonProperty("buffer_size") final Integer bufferSize,
@JsonProperty("drain_timeout") final Duration drainTimeout,
@JsonProperty("failed_forwarding_requests_local_write_timeout") final Integer failedForwardingRequestLocalWriteTimeout
@JsonProperty("failed_forwarding_requests_local_write_timeout") final Integer failedForwardingRequestLocalWriteTimeout,
@JsonProperty("forwarding_batch_size") final Integer forwardingBatchSize,
@JsonProperty("forwarding_batch_timeout") final Duration forwardingBatchTimeout
) {
setServerPort(serverPort);
setRequestTimeout(requestTimeout);
Expand Down Expand Up @@ -122,6 +128,8 @@ public PeerForwarderConfiguration (
setBufferSize(bufferSize);
setDrainTimeout(drainTimeout);
setFailedForwardingRequestLocalWriteTimeout(failedForwardingRequestLocalWriteTimeout);
setForwardingBatchSize(forwardingBatchSize);
setForwardingBatchTimeout(forwardingBatchTimeout);
checkForCertAndKeyFileInS3();
validateSslAndAuthentication();
}
Expand Down Expand Up @@ -230,6 +238,14 @@ public Integer getFailedForwardingRequestLocalWriteTimeout() {
return failedForwardingRequestLocalWriteTimeout;
}

public Integer getForwardingBatchSize() {
return forwardingBatchSize;
}

public Duration getForwardingBatchTimeout() {
return forwardingBatchTimeout;
}

private void setServerPort(final Integer serverPort) {
if (serverPort != null) {
if (serverPort < 0 || serverPort > 65535) {
Expand Down Expand Up @@ -492,4 +508,22 @@ private void setFailedForwardingRequestLocalWriteTimeout(final Integer failedFor
this.failedForwardingRequestLocalWriteTimeout = failedForwardingRequestLocalWriteTimeout;
}
}

private void setForwardingBatchSize(final Integer forwardingBatchSize) {
if (forwardingBatchSize != null) {
if (forwardingBatchSize <= 0 || forwardingBatchSize > MAX_FORWARDING_BATCH_SIZE) {
throw new IllegalArgumentException("Forwarding batch size must be between 1 and 3000 inclusive.");
}
this.forwardingBatchSize = forwardingBatchSize;
}
}

private void setForwardingBatchTimeout(final Duration forwardingBatchTimeout) {
if (forwardingBatchTimeout != null) {
if (forwardingBatchTimeout.isNegative()) {
throw new IllegalArgumentException("Forwarding batch timeout must be non-negative.");
}
this.forwardingBatchTimeout = forwardingBatchTimeout;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ public class PeerForwarderProvider {
this.pluginMetrics = pluginMetrics;
}

public PeerForwarder register(final String pipelineName, final String pluginId, final Set<String> identificationKeys) {
public PeerForwarder register(final String pipelineName, final String pluginId, final Set<String> identificationKeys,
final Integer pipelineWorkerThreads) {
if (pipelinePeerForwarderReceiveBufferMap.containsKey(pipelineName) &&
pipelinePeerForwarderReceiveBufferMap.get(pipelineName).containsKey(pluginId)) {
throw new RuntimeException("Data Prepper 2.0 will only support a single peer-forwarder per pipeline/plugin type");
Expand All @@ -57,7 +58,10 @@ public PeerForwarder register(final String pipelineName, final String pluginId,
pluginMetrics,
peerForwarderConfiguration.getBatchDelay(),
peerForwarderConfiguration.getFailedForwardingRequestLocalWriteTimeout(),
Executors.newFixedThreadPool(peerForwarderConfiguration.getClientThreadCount())
Executors.newFixedThreadPool(peerForwarderConfiguration.getClientThreadCount()),
peerForwarderConfiguration.getForwardingBatchSize(),
peerForwarderConfiguration.getForwardingBatchTimeout(),
pipelineWorkerThreads
);
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ public static List<Processor> decorateProcessors(
final List<Processor> processors,
final PeerForwarderProvider peerForwarderProvider,
final String pipelineName,
final String pluginId) {
final String pluginId,
final Integer pipelineWorkerThreads) {

Set<String> identificationKeys;
Processor firstInnerProcessor;
Expand Down Expand Up @@ -65,7 +66,7 @@ public static List<Processor> decorateProcessors(
"Peer Forwarder Plugin: %s cannot have empty identification keys." + pluginId);
}

final PeerForwarder peerForwarder = peerForwarderProvider.register(pipelineName, pluginId, identificationKeys);
final PeerForwarder peerForwarder = peerForwarderProvider.register(pipelineName, pluginId, identificationKeys, pipelineWorkerThreads);

return processors.stream().map(processor -> new PeerForwardingProcessorDecorator(peerForwarder, processor))
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,24 @@
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;

class RemotePeerForwarder implements PeerForwarder {
private static final Logger LOG = LoggerFactory.getLogger(RemotePeerForwarder.class);
private static final int BATCH_QUEUE_DEPTH = 1;

static final String RECORDS_ACTUALLY_PROCESSED_LOCALLY = "recordsActuallyProcessedLocally";
static final String RECORDS_TO_BE_PROCESSED_LOCALLY = "recordsToBeProcessedLocally";
static final String RECORDS_TO_BE_FORWARDED = "recordsToBeForwarded";
Expand All @@ -47,6 +54,9 @@ class RemotePeerForwarder implements PeerForwarder {
private final String pipelineName;
private final String pluginId;
private final Set<String> identificationKeys;
final ConcurrentHashMap<String, LinkedBlockingQueue<Record<Event>>> peerBatchingQueueMap;
private final ConcurrentHashMap<String, Long> peerBatchingLastFlushTimeMap;

private final Counter recordsActuallyProcessedLocallyCounter;
private final Counter recordsToBeProcessedLocallyCounter;
private final Counter recordsToBeForwardedCounter;
Expand All @@ -58,6 +68,9 @@ class RemotePeerForwarder implements PeerForwarder {
private final Integer batchDelay;
private final Integer failedForwardingRequestLocalWriteTimeout;
private final ExecutorService forwardingRequestExecutor;
private final Integer forwardingBatchSize;
private final Duration forwardingBatchTimeout;
private final Integer pipelineWorkerThreads;

RemotePeerForwarder(final PeerForwarderClient peerForwarderClient,
final HashRing hashRing,
Expand All @@ -68,7 +81,10 @@ class RemotePeerForwarder implements PeerForwarder {
final PluginMetrics pluginMetrics,
final Integer batchDelay,
final Integer failedForwardingRequestLocalWriteTimeout,
final ExecutorService forwardingRequestExecutor) {
final ExecutorService forwardingRequestExecutor,
final Integer forwardingBatchSize,
final Duration forwardingBatchTimeout,
final Integer pipelineWorkerThreads) {
this.peerForwarderClient = peerForwarderClient;
this.hashRing = hashRing;
this.peerForwarderReceiveBuffer = peerForwarderReceiveBuffer;
Expand All @@ -78,6 +94,12 @@ class RemotePeerForwarder implements PeerForwarder {
this.batchDelay = batchDelay;
this.failedForwardingRequestLocalWriteTimeout = failedForwardingRequestLocalWriteTimeout;
this.forwardingRequestExecutor = forwardingRequestExecutor;
this.forwardingBatchSize = forwardingBatchSize;
this.forwardingBatchTimeout = forwardingBatchTimeout;
this.pipelineWorkerThreads = pipelineWorkerThreads;
peerBatchingQueueMap = new ConcurrentHashMap<>();
peerBatchingLastFlushTimeMap = new ConcurrentHashMap<>();

recordsActuallyProcessedLocallyCounter = pluginMetrics.counter(RECORDS_ACTUALLY_PROCESSED_LOCALLY);
recordsToBeProcessedLocallyCounter = pluginMetrics.counter(RECORDS_TO_BE_PROCESSED_LOCALLY);
recordsToBeForwardedCounter = pluginMetrics.counter(RECORDS_TO_BE_FORWARDED);
Expand All @@ -92,7 +114,6 @@ public Collection<Record<Event>> forwardRecords(final Collection<Record<Event>>
final Map<String, List<Record<Event>>> groupedRecords = groupRecordsBasedOnIdentificationKeys(records, identificationKeys);

final List<Record<Event>> recordsToProcessLocally = new ArrayList<>();

for (final Map.Entry<String, List<Record<Event>>> entry : groupedRecords.entrySet()) {
final String destinationIp = entry.getKey();

Expand All @@ -101,17 +122,18 @@ public Collection<Record<Event>> forwardRecords(final Collection<Record<Event>>
recordsToBeProcessedLocallyCounter.increment(entry.getValue().size());
} else {
recordsToBeForwardedCounter.increment(entry.getValue().size());
try {
submitForwardingRequest(entry.getValue(), destinationIp);
} catch (final Exception ex) {
LOG.warn("Unable to submit request for forwarding, processing locally.", ex);
recordsToProcessLocally.addAll(entry.getValue());
recordsFailedForwardingCounter.increment(entry.getValue().size());
requestsFailedCounter.increment();
}
final List<Record<Event>> recordsFailedToBatch = batchRecordsForForwarding(destinationIp, entry.getValue());
recordsToProcessLocally.addAll(recordsFailedToBatch);
}
}

for (final String destinationIp: peerBatchingQueueMap.keySet()) {
final List<Record<Event>> recordsFailedToForward = forwardRecordBatches(destinationIp);
recordsToProcessLocally.addAll(recordsFailedToForward);
}

recordsActuallyProcessedLocallyCounter.increment(recordsToProcessLocally.size());

return recordsToProcessLocally;
}

Expand Down Expand Up @@ -177,7 +199,84 @@ private boolean isAddressDefinedLocally(final String address) {
}
}

private List<Record<Event>> batchRecordsForForwarding(final String destinationIp, final List<Record<Event>> records) {
try {
final List<Record<Event>> recordsFailedToBatch = populateBatchingQueue(destinationIp, records);
if (!recordsFailedToBatch.isEmpty()) {
recordsFailedForwardingCounter.increment(recordsFailedToBatch.size());
}

return recordsFailedToBatch;
} catch (final Exception ex) {
LOG.warn("Unable to batch records for forwarding, processing locally.", ex);
recordsFailedForwardingCounter.increment(records.size());
return records;
}
}

private List<Record<Event>> populateBatchingQueue(final String destinationIp, final List<Record<Event>> records) {
peerBatchingQueueMap.putIfAbsent(destinationIp, new LinkedBlockingQueue<>(forwardingBatchSize * pipelineWorkerThreads * BATCH_QUEUE_DEPTH));
peerBatchingLastFlushTimeMap.putIfAbsent(destinationIp, System.currentTimeMillis());

final List<Record<Event>> recordsFailedToBatch = new ArrayList<>();
final LinkedBlockingQueue<Record<Event>> peerBatchingQueue = peerBatchingQueueMap.get(destinationIp);
for (final Record<Event> record: records) {
try {
peerBatchingQueue.add(record);
} catch (final IllegalStateException e) {
recordsFailedToBatch.add(record);
}
}

return recordsFailedToBatch;
}

private List<Record<Event>> forwardRecordBatches(final String destinationIp) {
final List<Record<Event>> recordsFailedToForward = new ArrayList<>();

List<Record<Event>> recordsToForward;
do {
recordsToForward = getRecordsToForward(destinationIp);
try {
submitForwardingRequest(recordsToForward, destinationIp);
} catch (final Exception e) {
LOG.warn("Unable to submit request for forwarding, processing locally.", e);
recordsFailedToForward.addAll(recordsToForward);
recordsFailedForwardingCounter.increment(recordsToForward.size());
requestsFailedCounter.increment();
}
} while(!recordsToForward.isEmpty());

return recordsFailedToForward;
}

private List<Record<Event>> getRecordsToForward(final String destinationIp) {
if (shouldFlushBatch(destinationIp)) {
peerBatchingLastFlushTimeMap.put(destinationIp, System.currentTimeMillis());

final List<Record<Event>> recordsToForward = new ArrayList<>();
peerBatchingQueueMap.get(destinationIp).drainTo(recordsToForward, forwardingBatchSize);

return recordsToForward;
}

return Collections.emptyList();
}

private boolean shouldFlushBatch(final String destinationIp) {
final long currentTime = System.currentTimeMillis();
final long millisSinceLastFlush = currentTime - peerBatchingLastFlushTimeMap.getOrDefault(destinationIp, System.currentTimeMillis());
final Duration durationSinceLastFlush = Duration.of(millisSinceLastFlush, ChronoUnit.MILLIS);

final boolean shouldFlushDueToTimeout = durationSinceLastFlush.compareTo(forwardingBatchTimeout) >= 0;
return shouldFlushDueToTimeout || peerBatchingQueueMap.get(destinationIp).size() >= forwardingBatchSize;
}

private void submitForwardingRequest(final Collection<Record<Event>> records, final String destinationIp) {
if (records.isEmpty()) {
return;
}

forwardingRequestExecutor.submit(() -> {
AggregatedHttpResponse aggregatedHttpResponse;
try {
Expand Down Expand Up @@ -207,5 +306,4 @@ void processFailedRequestsLocally(final AggregatedHttpResponse httpResponse, fin
requestsSuccessfulCounter.increment();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.opensearch.dataprepper.peerforwarder.PeerForwarderConfiguration.DEFAULT_FORWARDING_BATCH_TIMEOUT;
import static org.opensearch.dataprepper.peerforwarder.PeerForwarderConfiguration.DEFAULT_PRIVATE_KEY_FILE_PATH;

class PeerForwarderConfigurationTest {
Expand Down Expand Up @@ -61,6 +63,8 @@ void testPeerForwarderDefaultConfig() throws IOException {
assertThat(peerForwarderConfiguration.getAuthentication(), equalTo(ForwardingAuthentication.UNAUTHENTICATED));
assertThat(peerForwarderConfiguration.getDrainTimeout(), equalTo(DEFAULT_DRAIN_TIMEOUT));
assertThat(peerForwarderConfiguration.getFailedForwardingRequestLocalWriteTimeout(), equalTo(500));
assertThat(peerForwarderConfiguration.getForwardingBatchSize(), equalTo(1500));
assertThat(peerForwarderConfiguration.getForwardingBatchTimeout(), equalTo(DEFAULT_FORWARDING_BATCH_TIMEOUT));
}

@Test
Expand Down Expand Up @@ -90,6 +94,8 @@ void testValidPeerForwarderConfig() throws IOException {
assertThat(peerForwarderConfiguration.getAuthentication(), equalTo(ForwardingAuthentication.UNAUTHENTICATED));
assertThat(peerForwarderConfiguration.getDrainTimeout(), equalTo(DEFAULT_DRAIN_TIMEOUT));
assertThat(peerForwarderConfiguration.getFailedForwardingRequestLocalWriteTimeout(), equalTo(15));
assertThat(peerForwarderConfiguration.getForwardingBatchSize(), equalTo(2500));
assertThat(peerForwarderConfiguration.getForwardingBatchTimeout(), equalTo(Duration.of(5, ChronoUnit.SECONDS)));
}

@Test
Expand Down
Loading

0 comments on commit 6d74ec4

Please sign in to comment.