Skip to content

Commit

Permalink
Caching implementation of EventKeyFactory (#4843)
Browse files Browse the repository at this point in the history
Create a caching implementation of EventKeyFactory that will cache a configurable number of EventKeys.

Refactored the approach to loading the EventKeyFactory in the application context for a couple of reasons: 1. Allow for skipping the CachingEventKeyFactory if not needed; 2. Help it run better in the data-prepper-test-event project.

Signed-off-by: David Venable <dlv@amazon.com>
  • Loading branch information
dlvenable authored Oct 7, 2024
1 parent 962750b commit 55c01b8
Show file tree
Hide file tree
Showing 15 changed files with 395 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSetter;
import com.fasterxml.jackson.annotation.Nulls;
import org.opensearch.dataprepper.core.event.EventConfiguration;
import org.opensearch.dataprepper.core.event.EventConfigurationContainer;
import org.opensearch.dataprepper.model.configuration.PipelineExtensions;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.parser.config.MetricTagFilter;
Expand All @@ -29,7 +31,7 @@
/**
* Class to hold configuration for DataPrepper, including server port and Log4j settings
*/
public class DataPrepperConfiguration implements ExtensionsConfiguration {
public class DataPrepperConfiguration implements ExtensionsConfiguration, EventConfigurationContainer {
static final Duration DEFAULT_SHUTDOWN_DURATION = Duration.ofSeconds(30L);

private static final String DEFAULT_SOURCE_COORDINATION_STORE = "in_memory";
Expand All @@ -47,6 +49,7 @@ public class DataPrepperConfiguration implements ExtensionsConfiguration {
private CircuitBreakerConfig circuitBreakerConfig;
private SourceCoordinationConfig sourceCoordinationConfig;
private PipelineShutdownOption pipelineShutdown;
private EventConfiguration eventConfiguration;
private Map<String, String> metricTags = new HashMap<>();
private List<MetricTagFilter> metricTagFilters = new LinkedList<>();
private PeerForwarderConfiguration peerForwarderConfiguration;
Expand Down Expand Up @@ -92,6 +95,7 @@ public DataPrepperConfiguration(
@JsonProperty("circuit_breakers") final CircuitBreakerConfig circuitBreakerConfig,
@JsonProperty("source_coordination") final SourceCoordinationConfig sourceCoordinationConfig,
@JsonProperty("pipeline_shutdown") final PipelineShutdownOption pipelineShutdown,
@JsonProperty("event") final EventConfiguration eventConfiguration,
@JsonProperty("extensions")
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonSetter(nulls = Nulls.SKIP)
Expand All @@ -102,6 +106,7 @@ public DataPrepperConfiguration(
? new SourceCoordinationConfig(new PluginModel(DEFAULT_SOURCE_COORDINATION_STORE, Collections.emptyMap()), null)
: sourceCoordinationConfig;
this.pipelineShutdown = pipelineShutdown != null ? pipelineShutdown : DEFAULT_PIPELINE_SHUTDOWN;
this.eventConfiguration = eventConfiguration != null ? eventConfiguration : EventConfiguration.defaultConfiguration();
setSsl(ssl);
this.keyStoreFilePath = keyStoreFilePath != null ? keyStoreFilePath : "";
this.keyStorePassword = keyStorePassword != null ? keyStorePassword : "";
Expand Down Expand Up @@ -226,6 +231,10 @@ public PipelineShutdownOption getPipelineShutdown() {
return pipelineShutdown;
}

public EventConfiguration getEventConfiguration() {
return eventConfiguration;
}

@Override
public PipelineExtensions getPipelineExtensions() {
return pipelineExtensions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ void setUp() {

@AfterEach
void tearDown() {
verify(dataPrepperConfiguration).getEventConfiguration();
verifyNoMoreInteractions(dataPrepperConfiguration);
}

Expand Down
2 changes: 2 additions & 0 deletions data-prepper-event/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,7 @@ dependencies {
implementation(libs.spring.context) {
exclude group: 'commons-logging', module: 'commons-logging'
}
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation libs.caffeine
testImplementation libs.commons.lang3
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.core.event;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.opensearch.dataprepper.model.event.EventKey;
import org.opensearch.dataprepper.model.event.EventKeyFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Objects;

class CachingEventKeyFactory implements EventKeyFactory {
private static final Logger log = LoggerFactory.getLogger(CachingEventKeyFactory.class);
private final EventKeyFactory delegateEventKeyFactory;
private final Cache<CacheKey, EventKey> cache;

private static class CacheKey {
private final String key;
private final EventAction[] eventActions;

private CacheKey(final String key, final EventAction[] eventActions) {
this.key = key;
this.eventActions = eventActions;
}

@Override
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final CacheKey cacheKey = (CacheKey) o;
return Objects.equals(key, cacheKey.key) && Arrays.equals(eventActions, cacheKey.eventActions);
}

@Override
public int hashCode() {
int result = Objects.hash(key);
result = 31 * result + Arrays.hashCode(eventActions);
return result;
}
}

CachingEventKeyFactory(final EventKeyFactory delegateEventKeyFactory, final EventConfiguration eventConfiguration) {
Objects.requireNonNull(delegateEventKeyFactory);
Objects.requireNonNull(eventConfiguration);

log.debug("Configured to cache a maximum of {} event keys.", eventConfiguration.getMaximumCachedKeys());

this.delegateEventKeyFactory = delegateEventKeyFactory;
cache = Caffeine.newBuilder()
.maximumSize(eventConfiguration.getMaximumCachedKeys())
.build();
}

@Override
public EventKey createEventKey(final String key, final EventAction... forActions) {
return getOrCreateEventKey(new CacheKey(key, forActions));
}

private EventKey getOrCreateEventKey(final CacheKey cacheKey) {
return cache.asMap().computeIfAbsent(cacheKey, key -> delegateEventKeyFactory.createEventKey(key.key, key.eventActions));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@
import org.opensearch.dataprepper.model.event.EventKeyFactory;
import org.opensearch.dataprepper.model.event.InternalOnlyEventKeyBridge;

import javax.inject.Named;

@Named
public class DefaultEventKeyFactory implements EventKeyFactory {
class DefaultEventKeyFactory implements EventKeyFactory {
@Override
public EventKey createEventKey(final String key, final EventAction... forActions) {
return InternalOnlyEventKeyBridge.createEventKey(key, forActions);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.core.event;

import com.fasterxml.jackson.annotation.JsonProperty;

/**
* Data Prepper configurations for events.
*/
public class EventConfiguration {
@JsonProperty("maximum_cached_keys")
private Integer maximumCachedKeys = 512;

public static EventConfiguration defaultConfiguration() {
return new EventConfiguration();
}

/**
* Gets the maximum number of cached {@link org.opensearch.dataprepper.model.event.EventKey} objects.
*
* @return the cache maximum count
*/
Integer getMaximumCachedKeys() {
return maximumCachedKeys;
}

void setMaximumCachedKeys(final Integer maximumCachedKeys) {
this.maximumCachedKeys = maximumCachedKeys;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.core.event;

public interface EventConfigurationContainer {
EventConfiguration getEventConfiguration();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.core.event;

import org.opensearch.dataprepper.model.event.EventKeyFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import javax.inject.Named;

@Configuration
class EventFactoryApplicationConfiguration {
@Bean
EventConfiguration eventConfiguration(@Autowired(required = false) final EventConfigurationContainer eventConfigurationContainer) {
if(eventConfigurationContainer == null || eventConfigurationContainer.getEventConfiguration() == null)
return EventConfiguration.defaultConfiguration();
return eventConfigurationContainer.getEventConfiguration();
}

@Bean(name = "innerEventKeyFactory")
EventKeyFactory innerEventKeyFactory() {
return new DefaultEventKeyFactory();
}

@Primary
@Bean(name = "eventKeyFactory")
EventKeyFactory eventKeyFactory(
@Named("innerEventKeyFactory") final EventKeyFactory eventKeyFactory,
final EventConfiguration eventConfiguration) {
if(eventConfiguration.getMaximumCachedKeys() <= 0) {
return eventKeyFactory;
}
return new CachingEventKeyFactory(eventKeyFactory, eventConfiguration);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.core.event;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.model.event.EventKey;
import org.opensearch.dataprepper.model.event.EventKeyFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import static org.hamcrest.CoreMatchers.sameInstance;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
class CachingEventKeyFactoryTest {
private static final int CACHE_SIZE = 2;
@Mock
private EventKeyFactory innerEventKeyFactory;

@Mock
private EventConfiguration eventConfiguration;

@BeforeEach
void setUp() {
when(eventConfiguration.getMaximumCachedKeys()).thenReturn(CACHE_SIZE);
}

private EventKeyFactory createObjectUnderTest() {
return new CachingEventKeyFactory(innerEventKeyFactory, eventConfiguration);
}

@ParameterizedTest
@EnumSource(EventKeyFactory.EventAction.class)
void createEventKey_with_EventAction_returns_inner_createEventKey(final EventKeyFactory.EventAction eventAction) {
final String key = UUID.randomUUID().toString();
final EventKey eventKey = mock(EventKey.class);

when(innerEventKeyFactory.createEventKey(key, eventAction)).thenReturn(eventKey);

final EventKey actualEventKey = createObjectUnderTest().createEventKey(key, eventAction);
assertThat(actualEventKey, sameInstance(eventKey));
}

@Test
void createEventKey_returns_inner_createEventKey() {
final String key = UUID.randomUUID().toString();
final EventKey eventKey = mock(EventKey.class);

when(innerEventKeyFactory.createEventKey(key, EventKeyFactory.EventAction.ALL)).thenReturn(eventKey);
final EventKey actualEventKey = createObjectUnderTest().createEventKey(key);
assertThat(actualEventKey, sameInstance(eventKey));
}

@ParameterizedTest
@EnumSource(EventKeyFactory.EventAction.class)
void createEventKey_with_EventAction_returns_same_instance_without_calling_inner_createEventKey_for_same_key(final EventKeyFactory.EventAction eventAction) {
final String key = UUID.randomUUID().toString();
final EventKey eventKey = mock(EventKey.class);

when(innerEventKeyFactory.createEventKey(key, eventAction)).thenReturn(eventKey);

final EventKeyFactory objectUnderTest = createObjectUnderTest();
final EventKey actualKey = objectUnderTest.createEventKey(key, eventAction);
final EventKey actualKey2 = objectUnderTest.createEventKey(key, eventAction);

assertThat(actualKey, sameInstance(eventKey));
assertThat(actualKey2, sameInstance(eventKey));

verify(innerEventKeyFactory).createEventKey(key, eventAction);
}

@Test
void createEventKey_returns_same_instance_without_calling_inner_createEventKey_for_same_key() {
final String key = UUID.randomUUID().toString();
final EventKey eventKey = mock(EventKey.class);

when(innerEventKeyFactory.createEventKey(key, EventKeyFactory.EventAction.ALL)).thenReturn(eventKey);

final EventKeyFactory objectUnderTest = createObjectUnderTest();
final EventKey actualKey = objectUnderTest.createEventKey(key);
final EventKey actualKey2 = objectUnderTest.createEventKey(key);

assertThat(actualKey, sameInstance(eventKey));
assertThat(actualKey2, sameInstance(eventKey));

verify(innerEventKeyFactory).createEventKey(key, EventKeyFactory.EventAction.ALL);
}

@Test
void createEventKey_with_EventAction_returns_different_values_for_different_keys() {
final String key1 = UUID.randomUUID().toString();
final String key2 = UUID.randomUUID().toString();
final EventKey eventKey1 = mock(EventKey.class);
final EventKey eventKey2 = mock(EventKey.class);

when(innerEventKeyFactory.createEventKey(key1, EventKeyFactory.EventAction.ALL)).thenReturn(eventKey1);
when(innerEventKeyFactory.createEventKey(key2, EventKeyFactory.EventAction.ALL)).thenReturn(eventKey2);

final EventKeyFactory objectUnderTest = createObjectUnderTest();
final EventKey actualEventKey1 = objectUnderTest.createEventKey(key1, EventKeyFactory.EventAction.ALL);
assertThat(actualEventKey1, sameInstance(eventKey1));
final EventKey actualEventKey2 = objectUnderTest.createEventKey(key2, EventKeyFactory.EventAction.ALL);
assertThat(actualEventKey2, sameInstance(eventKey2));
}

@Test
void createEventKey_with_EventAction_returns_different_values_for_different_actions() {
final String key = UUID.randomUUID().toString();
final EventKey eventKeyGet = mock(EventKey.class);
final EventKey eventKeyPut = mock(EventKey.class);

when(innerEventKeyFactory.createEventKey(key, EventKeyFactory.EventAction.GET)).thenReturn(eventKeyGet);
when(innerEventKeyFactory.createEventKey(key, EventKeyFactory.EventAction.PUT)).thenReturn(eventKeyPut);

final EventKeyFactory objectUnderTest = createObjectUnderTest();
final EventKey actualEventKeyGet = objectUnderTest.createEventKey(key, EventKeyFactory.EventAction.GET);
assertThat(actualEventKeyGet, sameInstance(eventKeyGet));
final EventKey actualEventKeyPut = objectUnderTest.createEventKey(key, EventKeyFactory.EventAction.PUT);
assertThat(actualEventKeyPut, sameInstance(eventKeyPut));
}

@Test
void createEventKey_expires_after_reaching_maximum() {

final List<String> keys = new ArrayList<>(CACHE_SIZE);
for (int i = 0; i < CACHE_SIZE * 2; i++) {
final String key = UUID.randomUUID().toString();
final EventKey eventKey = mock(EventKey.class);
when(innerEventKeyFactory.createEventKey(key, EventKeyFactory.EventAction.ALL)).thenReturn(eventKey);
keys.add(key);
}

final EventKeyFactory objectUnderTest = createObjectUnderTest();

final int numberOfIterations = 20;
for (int i = 0; i < numberOfIterations; i++) {
for (final String key : keys) {
objectUnderTest.createEventKey(key);
}
}

verify(innerEventKeyFactory, atLeast(CACHE_SIZE + 1))
.createEventKey(anyString(), eq(EventKeyFactory.EventAction.ALL));
}
}
Loading

0 comments on commit 55c01b8

Please sign in to comment.