From 08d14a6b742b9f52d642d74bc716c7e96e8e0033 Mon Sep 17 00:00:00 2001 From: "high.lee" Date: Sat, 1 Feb 2020 14:50:31 +0900 Subject: [PATCH 1/2] KAFKA-8147: Add changelog topic configuration to KTable suppress --- .../kafka/streams/kstream/Suppressed.java | 20 +++++++++++ .../streams/kstream/internals/KTableImpl.java | 20 ++++++++++- .../suppress/BufferConfigInternal.java | 30 ++++++++++++++--- .../suppress/EagerBufferConfigImpl.java | 23 +++++++++++-- .../suppress/StrictBufferConfigImpl.java | 25 ++++++++++++-- .../suppress/SuppressedInternal.java | 3 +- .../InMemoryTimeOrderedKeyValueBuffer.java | 6 ++-- ...InMemoryTimeOrderedKeyValueBufferTest.java | 33 +++++++++++++++++-- 8 files changed, 146 insertions(+), 14 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java index 2e0301ce4d090..c3066e9306bf1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java @@ -22,6 +22,7 @@ import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal; import java.time.Duration; +import java.util.Map; public interface Suppressed extends NamedOperation> { @@ -118,6 +119,25 @@ static StrictBufferConfig unbounded() { * duplicate results downstream, but does not promise to eliminate them. */ EagerBufferConfig emitEarlyWhenFull(); + + /** + * Disable the changelog for store built by this {@link StoreBuilder}. + * This will turn off fault-tolerance for your store. + * By default the changelog is enabled. + * @return this + */ + BC withLoggingDisabled(); + + /** + * Indicates that a changelog topic should be created containing the currently suppressed + * records. Due to the short-lived nature of records in this topic it is likely more + * compactable than changelog topics for KTables. + * + * @param config Configs that should be applied to the changelog. Note: Any unrecognized + * configs will be ignored. + * @return this + */ + BC withLoggingEnabled(final Map config); } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 052a5f8445c4b..3a12dd9f64e31 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -72,6 +72,7 @@ import java.time.Duration; import java.util.Collections; import java.util.HashSet; +import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.function.Function; @@ -528,10 +529,27 @@ public KTable suppress(final Suppressed suppressed) { this ); + final StoreBuilder> storeBuilder; + + if (suppressedInternal.bufferConfig().isLoggingEnabled()) { + final Map topicConfig = suppressedInternal.bufferConfig().getLogConfig(); + storeBuilder = new InMemoryTimeOrderedKeyValueBuffer.Builder<>( + storeName, + keySerde, + valSerde) + .withLoggingEnabled(topicConfig); + } else { + storeBuilder = new InMemoryTimeOrderedKeyValueBuffer.Builder<>( + storeName, + keySerde, + valSerde) + .withLoggingDisabled(); + } + final ProcessorGraphNode> node = new StatefulProcessorNode<>( name, new ProcessorParameters<>(suppressionSupplier, name), - new InMemoryTimeOrderedKeyValueBuffer.Builder<>(storeName, keySerde, valSerde) + storeBuilder ); builder.addGraphNode(streamsGraphNode, node); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java index 2087945ab882e..a905385305c4c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java @@ -18,13 +18,26 @@ import org.apache.kafka.streams.kstream.Suppressed; +import java.util.Collections; +import java.util.Map; + import static org.apache.kafka.streams.kstream.internals.suppress.BufferFullStrategy.SHUT_DOWN; -abstract class BufferConfigInternal> implements Suppressed.BufferConfig { +public abstract class BufferConfigInternal> implements Suppressed.BufferConfig { public abstract long maxRecords(); public abstract long maxBytes(); + protected final Map logConfig; + + BufferConfigInternal() { + this.logConfig = Collections.emptyMap(); + } + + BufferConfigInternal(final Map logConfig) { + this.logConfig = logConfig; + } + @SuppressWarnings("unused") public abstract BufferFullStrategy bufferFullStrategy(); @@ -33,17 +46,26 @@ public Suppressed.StrictBufferConfig withNoBound() { return new StrictBufferConfigImpl( Long.MAX_VALUE, Long.MAX_VALUE, - SHUT_DOWN // doesn't matter, given the bounds + SHUT_DOWN, // doesn't matter, given the bounds + logConfig ); } @Override public Suppressed.StrictBufferConfig shutDownWhenFull() { - return new StrictBufferConfigImpl(maxRecords(), maxBytes(), SHUT_DOWN); + return new StrictBufferConfigImpl(maxRecords(), maxBytes(), SHUT_DOWN, logConfig); } @Override public Suppressed.EagerBufferConfig emitEarlyWhenFull() { - return new EagerBufferConfigImpl(maxRecords(), maxBytes()); + return new EagerBufferConfigImpl(maxRecords(), maxBytes(), logConfig); + } + + public boolean isLoggingEnabled() { + return logConfig != null; + } + + public Map getLogConfig() { + return isLoggingEnabled() ? logConfig : Collections.emptyMap(); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java index 1c1b30c3edc2a..faed44e3cd28e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java @@ -18,6 +18,7 @@ import org.apache.kafka.streams.kstream.Suppressed; +import java.util.Map; import java.util.Objects; public class EagerBufferConfigImpl extends BufferConfigInternal implements Suppressed.EagerBufferConfig { @@ -30,14 +31,22 @@ public EagerBufferConfigImpl(final long maxRecords, final long maxBytes) { this.maxBytes = maxBytes; } + public EagerBufferConfigImpl(final long maxRecords, + final long maxBytes, + final Map logConfig) { + super(logConfig); + this.maxRecords = maxRecords; + this.maxBytes = maxBytes; + } + @Override public Suppressed.EagerBufferConfig withMaxRecords(final long recordLimit) { - return new EagerBufferConfigImpl(recordLimit, maxBytes); + return new EagerBufferConfigImpl(recordLimit, maxBytes, logConfig); } @Override public Suppressed.EagerBufferConfig withMaxBytes(final long byteLimit) { - return new EagerBufferConfigImpl(maxRecords, byteLimit); + return new EagerBufferConfigImpl(maxRecords, byteLimit, logConfig); } @Override @@ -55,6 +64,16 @@ public BufferFullStrategy bufferFullStrategy() { return BufferFullStrategy.EMIT; } + @Override + public Suppressed.EagerBufferConfig withLoggingDisabled() { + return new EagerBufferConfigImpl(maxRecords, maxBytes, null); + } + + @Override + public Suppressed.EagerBufferConfig withLoggingEnabled(final Map config) { + return new EagerBufferConfigImpl(maxRecords, maxBytes, config); + } + @Override public boolean equals(final Object o) { if (this == o) { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java index 30427b7a154e8..d8778406348bc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java @@ -18,6 +18,7 @@ import org.apache.kafka.streams.kstream.Suppressed; +import java.util.Map; import java.util.Objects; import static org.apache.kafka.streams.kstream.internals.suppress.BufferFullStrategy.SHUT_DOWN; @@ -28,6 +29,16 @@ public class StrictBufferConfigImpl extends BufferConfigInternal logConfig) { + super(logConfig); + this.maxRecords = maxRecords; + this.maxBytes = maxBytes; + this.bufferFullStrategy = bufferFullStrategy; + } + public StrictBufferConfigImpl(final long maxRecords, final long maxBytes, final BufferFullStrategy bufferFullStrategy) { @@ -44,12 +55,22 @@ public StrictBufferConfigImpl() { @Override public Suppressed.StrictBufferConfig withMaxRecords(final long recordLimit) { - return new StrictBufferConfigImpl(recordLimit, maxBytes, bufferFullStrategy); + return new StrictBufferConfigImpl(recordLimit, maxBytes, bufferFullStrategy, logConfig); } @Override public Suppressed.StrictBufferConfig withMaxBytes(final long byteLimit) { - return new StrictBufferConfigImpl(maxRecords, byteLimit, bufferFullStrategy); + return new StrictBufferConfigImpl(maxRecords, byteLimit, bufferFullStrategy, logConfig); + } + + @Override + public Suppressed.StrictBufferConfig withLoggingDisabled() { + return new StrictBufferConfigImpl(maxRecords, maxBytes, bufferFullStrategy, null); + } + + @Override + public Suppressed.StrictBufferConfig withLoggingEnabled(final Map config) { + return new StrictBufferConfigImpl(maxRecords, maxBytes, bufferFullStrategy, config); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java index d24988dcbf204..5206821a6336c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java @@ -67,7 +67,8 @@ public String name() { return name; } - BufferConfigInternal bufferConfig() { + @SuppressWarnings("unchecked") + public > BufferConfigInternal bufferConfig() { return bufferConfig; } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java index 4043ceace5afe..d58adbc54ea14 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java @@ -94,6 +94,7 @@ public static class Builder implements StoreBuilder keySerde; private final Serde valSerde; private boolean loggingEnabled = true; + private Map logConfig = new HashMap<>(); public Builder(final String storeName, final Serde keySerde, final Serde valSerde) { this.storeName = storeName; @@ -127,7 +128,8 @@ public StoreBuilder> withCachingDisabled @Override public StoreBuilder> withLoggingEnabled(final Map config) { - throw new UnsupportedOperationException(); + logConfig = config; + return this; } @Override @@ -143,7 +145,7 @@ public InMemoryTimeOrderedKeyValueBuffer build() { @Override public Map logConfig() { - return Collections.emptyMap(); + return loggingEnabled() ? Collections.unmodifiableMap(logConfig) : Collections.emptyMap(); } @Override diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBufferTest.java index 18f689f3cd483..3738dec3a6f4a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBufferTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBufferTest.java @@ -16,17 +16,46 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.streams.state.StoreBuilder; import org.junit.Test; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + public class InMemoryTimeOrderedKeyValueBufferTest { @Test public void bufferShouldAllowCacheEnablement() { - new InMemoryTimeOrderedKeyValueBuffer.Builder<>(null, null, null).withCachingEnabled(); + new InMemoryTimeOrderedKeyValueBuffer.Builder<>(null, null, null) + .withCachingEnabled(); } @Test public void bufferShouldAllowCacheDisablement() { - new InMemoryTimeOrderedKeyValueBuffer.Builder<>(null, null, null).withCachingDisabled(); + new InMemoryTimeOrderedKeyValueBuffer.Builder<>(null, null, null) + .withCachingDisabled(); + } + + @Test + public void bufferShouldAllowLoggingEnablement() { + final String expect = "3"; + final Map logConfig = new HashMap<>(); + logConfig.put("min.insync.replicas", expect); + final StoreBuilder builder = new InMemoryTimeOrderedKeyValueBuffer.Builder<>(null, null, null) + .withLoggingEnabled(logConfig); + + assertTrue(builder.logConfig().containsKey("min.insync.replicas")); + assertTrue(builder.logConfig().containsValue(expect)); + } + + @Test + public void bufferShouldAllowLoggingDisablement() { + final StoreBuilder builder = new InMemoryTimeOrderedKeyValueBuffer.Builder<>(null, null, null) + .withLoggingDisabled(); + + assertEquals(builder.logConfig(), new HashMap<>()); } } From 00ecaad1c10e86f3ef08b7ade94f79f04ddf3361 Mon Sep 17 00:00:00 2001 From: John Roesler Date: Mon, 17 Feb 2020 11:57:39 -0600 Subject: [PATCH 2/2] SUGGESTION: KAFKA-8147: Add changelog topic configuration to KTable suppress Suggestion for https://github.com/apache/kafka/pull/8029 --- .../suppress/BufferConfigInternal.java | 27 +++++-------------- .../suppress/EagerBufferConfigImpl.java | 17 ++++++++++-- .../suppress/StrictBufferConfigImpl.java | 16 ++++++++++- ...InMemoryTimeOrderedKeyValueBufferTest.java | 20 +++++++------- 4 files changed, 47 insertions(+), 33 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java index a905385305c4c..d35a85b0ec4e5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java @@ -18,7 +18,6 @@ import org.apache.kafka.streams.kstream.Suppressed; -import java.util.Collections; import java.util.Map; import static org.apache.kafka.streams.kstream.internals.suppress.BufferFullStrategy.SHUT_DOWN; @@ -28,16 +27,6 @@ public abstract class BufferConfigInternal logConfig; - - BufferConfigInternal() { - this.logConfig = Collections.emptyMap(); - } - - BufferConfigInternal(final Map logConfig) { - this.logConfig = logConfig; - } - @SuppressWarnings("unused") public abstract BufferFullStrategy bufferFullStrategy(); @@ -46,26 +35,22 @@ public Suppressed.StrictBufferConfig withNoBound() { return new StrictBufferConfigImpl( Long.MAX_VALUE, Long.MAX_VALUE, - SHUT_DOWN, // doesn't matter, given the bounds - logConfig + SHUT_DOWN // doesn't matter, given the bounds ); } @Override public Suppressed.StrictBufferConfig shutDownWhenFull() { - return new StrictBufferConfigImpl(maxRecords(), maxBytes(), SHUT_DOWN, logConfig); + return new StrictBufferConfigImpl(maxRecords(), maxBytes(), SHUT_DOWN); } @Override public Suppressed.EagerBufferConfig emitEarlyWhenFull() { - return new EagerBufferConfigImpl(maxRecords(), maxBytes(), logConfig); + return new EagerBufferConfigImpl(maxRecords(), maxBytes()); } - public boolean isLoggingEnabled() { - return logConfig != null; - } + public abstract boolean isLoggingEnabled(); + + public abstract Map getLogConfig(); - public Map getLogConfig() { - return isLoggingEnabled() ? logConfig : Collections.emptyMap(); - } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java index faed44e3cd28e..237d469bb734e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java @@ -18,6 +18,7 @@ import org.apache.kafka.streams.kstream.Suppressed; +import java.util.Collections; import java.util.Map; import java.util.Objects; @@ -25,18 +26,20 @@ public class EagerBufferConfigImpl extends BufferConfigInternal logConfig; public EagerBufferConfigImpl(final long maxRecords, final long maxBytes) { this.maxRecords = maxRecords; this.maxBytes = maxBytes; + this.logConfig = Collections.emptyMap(); } - public EagerBufferConfigImpl(final long maxRecords, + private EagerBufferConfigImpl(final long maxRecords, final long maxBytes, final Map logConfig) { - super(logConfig); this.maxRecords = maxRecords; this.maxBytes = maxBytes; + this.logConfig = logConfig; } @Override @@ -74,6 +77,16 @@ public Suppressed.EagerBufferConfig withLoggingEnabled(final Map return new EagerBufferConfigImpl(maxRecords, maxBytes, config); } + @Override + public boolean isLoggingEnabled() { + return logConfig != null; + } + + @Override + public Map getLogConfig() { + return isLoggingEnabled() ? logConfig : Collections.emptyMap(); + } + @Override public boolean equals(final Object o) { if (this == o) { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java index d8778406348bc..1d65e52894927 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java @@ -18,6 +18,7 @@ import org.apache.kafka.streams.kstream.Suppressed; +import java.util.Collections; import java.util.Map; import java.util.Objects; @@ -28,15 +29,16 @@ public class StrictBufferConfigImpl extends BufferConfigInternal logConfig; public StrictBufferConfigImpl(final long maxRecords, final long maxBytes, final BufferFullStrategy bufferFullStrategy, final Map logConfig) { - super(logConfig); this.maxRecords = maxRecords; this.maxBytes = maxBytes; this.bufferFullStrategy = bufferFullStrategy; + this.logConfig = logConfig; } public StrictBufferConfigImpl(final long maxRecords, @@ -45,12 +47,14 @@ public StrictBufferConfigImpl(final long maxRecords, this.maxRecords = maxRecords; this.maxBytes = maxBytes; this.bufferFullStrategy = bufferFullStrategy; + this.logConfig = Collections.emptyMap(); } public StrictBufferConfigImpl() { this.maxRecords = Long.MAX_VALUE; this.maxBytes = Long.MAX_VALUE; this.bufferFullStrategy = SHUT_DOWN; + this.logConfig = Collections.emptyMap(); } @Override @@ -88,6 +92,16 @@ public BufferFullStrategy bufferFullStrategy() { return bufferFullStrategy; } + @Override + public boolean isLoggingEnabled() { + return logConfig != null; + } + + @Override + public Map getLogConfig() { + return isLoggingEnabled() ? logConfig : Collections.emptyMap(); + } + @Override public boolean equals(final Object o) { if (this == o) { diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBufferTest.java index 3738dec3a6f4a..a003ec9db3276 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBufferTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBufferTest.java @@ -22,21 +22,22 @@ import java.util.HashMap; import java.util.Map; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static java.util.Collections.emptyMap; +import static java.util.Collections.singletonMap; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + public class InMemoryTimeOrderedKeyValueBufferTest { @Test public void bufferShouldAllowCacheEnablement() { - new InMemoryTimeOrderedKeyValueBuffer.Builder<>(null, null, null) - .withCachingEnabled(); + new InMemoryTimeOrderedKeyValueBuffer.Builder<>(null, null, null).withCachingEnabled(); } @Test public void bufferShouldAllowCacheDisablement() { - new InMemoryTimeOrderedKeyValueBuffer.Builder<>(null, null, null) - .withCachingDisabled(); + new InMemoryTimeOrderedKeyValueBuffer.Builder<>(null, null, null).withCachingDisabled(); } @Test @@ -47,8 +48,8 @@ public void bufferShouldAllowLoggingEnablement() { final StoreBuilder builder = new InMemoryTimeOrderedKeyValueBuffer.Builder<>(null, null, null) .withLoggingEnabled(logConfig); - assertTrue(builder.logConfig().containsKey("min.insync.replicas")); - assertTrue(builder.logConfig().containsValue(expect)); + assertThat(builder.logConfig(), is(singletonMap("min.insync.replicas", expect))); + assertThat(builder.loggingEnabled(), is(true)); } @Test @@ -56,6 +57,7 @@ public void bufferShouldAllowLoggingDisablement() { final StoreBuilder builder = new InMemoryTimeOrderedKeyValueBuffer.Builder<>(null, null, null) .withLoggingDisabled(); - assertEquals(builder.logConfig(), new HashMap<>()); + assertThat(builder.logConfig(), is(emptyMap())); + assertThat(builder.loggingEnabled(), is(false)); } }