diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java index 2e0301ce4d090..c3066e9306bf1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java @@ -22,6 +22,7 @@ import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal; import java.time.Duration; +import java.util.Map; public interface Suppressed extends NamedOperation> { @@ -118,6 +119,25 @@ static StrictBufferConfig unbounded() { * duplicate results downstream, but does not promise to eliminate them. */ EagerBufferConfig emitEarlyWhenFull(); + + /** + * Disable the changelog for store built by this {@link StoreBuilder}. + * This will turn off fault-tolerance for your store. + * By default the changelog is enabled. + * @return this + */ + BC withLoggingDisabled(); + + /** + * Indicates that a changelog topic should be created containing the currently suppressed + * records. Due to the short-lived nature of records in this topic it is likely more + * compactable than changelog topics for KTables. + * + * @param config Configs that should be applied to the changelog. Note: Any unrecognized + * configs will be ignored. + * @return this + */ + BC withLoggingEnabled(final Map config); } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 052a5f8445c4b..3a12dd9f64e31 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -72,6 +72,7 @@ import java.time.Duration; import java.util.Collections; import java.util.HashSet; +import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.function.Function; @@ -528,10 +529,27 @@ public KTable suppress(final Suppressed suppressed) { this ); + final StoreBuilder> storeBuilder; + + if (suppressedInternal.bufferConfig().isLoggingEnabled()) { + final Map topicConfig = suppressedInternal.bufferConfig().getLogConfig(); + storeBuilder = new InMemoryTimeOrderedKeyValueBuffer.Builder<>( + storeName, + keySerde, + valSerde) + .withLoggingEnabled(topicConfig); + } else { + storeBuilder = new InMemoryTimeOrderedKeyValueBuffer.Builder<>( + storeName, + keySerde, + valSerde) + .withLoggingDisabled(); + } + final ProcessorGraphNode> node = new StatefulProcessorNode<>( name, new ProcessorParameters<>(suppressionSupplier, name), - new InMemoryTimeOrderedKeyValueBuffer.Builder<>(storeName, keySerde, valSerde) + storeBuilder ); builder.addGraphNode(streamsGraphNode, node); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java index 2087945ab882e..d35a85b0ec4e5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java @@ -18,9 +18,11 @@ import org.apache.kafka.streams.kstream.Suppressed; +import java.util.Map; + import static org.apache.kafka.streams.kstream.internals.suppress.BufferFullStrategy.SHUT_DOWN; -abstract class BufferConfigInternal> implements Suppressed.BufferConfig { +public abstract class BufferConfigInternal> implements Suppressed.BufferConfig { public abstract long maxRecords(); public abstract long maxBytes(); @@ -46,4 +48,9 @@ public Suppressed.StrictBufferConfig shutDownWhenFull() { public Suppressed.EagerBufferConfig emitEarlyWhenFull() { return new EagerBufferConfigImpl(maxRecords(), maxBytes()); } + + public abstract boolean isLoggingEnabled(); + + public abstract Map getLogConfig(); + } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java index 1c1b30c3edc2a..237d469bb734e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java @@ -18,26 +18,38 @@ import org.apache.kafka.streams.kstream.Suppressed; +import java.util.Collections; +import java.util.Map; import java.util.Objects; public class EagerBufferConfigImpl extends BufferConfigInternal implements Suppressed.EagerBufferConfig { private final long maxRecords; private final long maxBytes; + private final Map logConfig; public EagerBufferConfigImpl(final long maxRecords, final long maxBytes) { this.maxRecords = maxRecords; this.maxBytes = maxBytes; + this.logConfig = Collections.emptyMap(); + } + + private EagerBufferConfigImpl(final long maxRecords, + final long maxBytes, + final Map logConfig) { + this.maxRecords = maxRecords; + this.maxBytes = maxBytes; + this.logConfig = logConfig; } @Override public Suppressed.EagerBufferConfig withMaxRecords(final long recordLimit) { - return new EagerBufferConfigImpl(recordLimit, maxBytes); + return new EagerBufferConfigImpl(recordLimit, maxBytes, logConfig); } @Override public Suppressed.EagerBufferConfig withMaxBytes(final long byteLimit) { - return new EagerBufferConfigImpl(maxRecords, byteLimit); + return new EagerBufferConfigImpl(maxRecords, byteLimit, logConfig); } @Override @@ -55,6 +67,26 @@ public BufferFullStrategy bufferFullStrategy() { return BufferFullStrategy.EMIT; } + @Override + public Suppressed.EagerBufferConfig withLoggingDisabled() { + return new EagerBufferConfigImpl(maxRecords, maxBytes, null); + } + + @Override + public Suppressed.EagerBufferConfig withLoggingEnabled(final Map config) { + return new EagerBufferConfigImpl(maxRecords, maxBytes, config); + } + + @Override + public boolean isLoggingEnabled() { + return logConfig != null; + } + + @Override + public Map getLogConfig() { + return isLoggingEnabled() ? logConfig : Collections.emptyMap(); + } + @Override public boolean equals(final Object o) { if (this == o) { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java index 30427b7a154e8..1d65e52894927 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java @@ -18,6 +18,8 @@ import org.apache.kafka.streams.kstream.Suppressed; +import java.util.Collections; +import java.util.Map; import java.util.Objects; import static org.apache.kafka.streams.kstream.internals.suppress.BufferFullStrategy.SHUT_DOWN; @@ -27,6 +29,17 @@ public class StrictBufferConfigImpl extends BufferConfigInternal logConfig; + + public StrictBufferConfigImpl(final long maxRecords, + final long maxBytes, + final BufferFullStrategy bufferFullStrategy, + final Map logConfig) { + this.maxRecords = maxRecords; + this.maxBytes = maxBytes; + this.bufferFullStrategy = bufferFullStrategy; + this.logConfig = logConfig; + } public StrictBufferConfigImpl(final long maxRecords, final long maxBytes, @@ -34,22 +47,34 @@ public StrictBufferConfigImpl(final long maxRecords, this.maxRecords = maxRecords; this.maxBytes = maxBytes; this.bufferFullStrategy = bufferFullStrategy; + this.logConfig = Collections.emptyMap(); } public StrictBufferConfigImpl() { this.maxRecords = Long.MAX_VALUE; this.maxBytes = Long.MAX_VALUE; this.bufferFullStrategy = SHUT_DOWN; + this.logConfig = Collections.emptyMap(); } @Override public Suppressed.StrictBufferConfig withMaxRecords(final long recordLimit) { - return new StrictBufferConfigImpl(recordLimit, maxBytes, bufferFullStrategy); + return new StrictBufferConfigImpl(recordLimit, maxBytes, bufferFullStrategy, logConfig); } @Override public Suppressed.StrictBufferConfig withMaxBytes(final long byteLimit) { - return new StrictBufferConfigImpl(maxRecords, byteLimit, bufferFullStrategy); + return new StrictBufferConfigImpl(maxRecords, byteLimit, bufferFullStrategy, logConfig); + } + + @Override + public Suppressed.StrictBufferConfig withLoggingDisabled() { + return new StrictBufferConfigImpl(maxRecords, maxBytes, bufferFullStrategy, null); + } + + @Override + public Suppressed.StrictBufferConfig withLoggingEnabled(final Map config) { + return new StrictBufferConfigImpl(maxRecords, maxBytes, bufferFullStrategy, config); } @Override @@ -67,6 +92,16 @@ public BufferFullStrategy bufferFullStrategy() { return bufferFullStrategy; } + @Override + public boolean isLoggingEnabled() { + return logConfig != null; + } + + @Override + public Map getLogConfig() { + return isLoggingEnabled() ? logConfig : Collections.emptyMap(); + } + @Override public boolean equals(final Object o) { if (this == o) { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java index d24988dcbf204..5206821a6336c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java @@ -67,7 +67,8 @@ public String name() { return name; } - BufferConfigInternal bufferConfig() { + @SuppressWarnings("unchecked") + public > BufferConfigInternal bufferConfig() { return bufferConfig; } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java index 4043ceace5afe..d58adbc54ea14 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java @@ -94,6 +94,7 @@ public static class Builder implements StoreBuilder keySerde; private final Serde valSerde; private boolean loggingEnabled = true; + private Map logConfig = new HashMap<>(); public Builder(final String storeName, final Serde keySerde, final Serde valSerde) { this.storeName = storeName; @@ -127,7 +128,8 @@ public StoreBuilder> withCachingDisabled @Override public StoreBuilder> withLoggingEnabled(final Map config) { - throw new UnsupportedOperationException(); + logConfig = config; + return this; } @Override @@ -143,7 +145,7 @@ public InMemoryTimeOrderedKeyValueBuffer build() { @Override public Map logConfig() { - return Collections.emptyMap(); + return loggingEnabled() ? Collections.unmodifiableMap(logConfig) : Collections.emptyMap(); } @Override diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBufferTest.java index 18f689f3cd483..a003ec9db3276 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBufferTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBufferTest.java @@ -16,8 +16,18 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.streams.state.StoreBuilder; import org.junit.Test; +import java.util.HashMap; +import java.util.Map; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.singletonMap; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + + public class InMemoryTimeOrderedKeyValueBufferTest { @Test @@ -29,4 +39,25 @@ public void bufferShouldAllowCacheEnablement() { public void bufferShouldAllowCacheDisablement() { new InMemoryTimeOrderedKeyValueBuffer.Builder<>(null, null, null).withCachingDisabled(); } + + @Test + public void bufferShouldAllowLoggingEnablement() { + final String expect = "3"; + final Map logConfig = new HashMap<>(); + logConfig.put("min.insync.replicas", expect); + final StoreBuilder builder = new InMemoryTimeOrderedKeyValueBuffer.Builder<>(null, null, null) + .withLoggingEnabled(logConfig); + + assertThat(builder.logConfig(), is(singletonMap("min.insync.replicas", expect))); + assertThat(builder.loggingEnabled(), is(true)); + } + + @Test + public void bufferShouldAllowLoggingDisablement() { + final StoreBuilder builder = new InMemoryTimeOrderedKeyValueBuffer.Builder<>(null, null, null) + .withLoggingDisabled(); + + assertThat(builder.logConfig(), is(emptyMap())); + assertThat(builder.loggingEnabled(), is(false)); + } }