Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal;

import java.time.Duration;
import java.util.Map;

public interface Suppressed<K> extends NamedOperation<Suppressed<K>> {

Expand Down Expand Up @@ -118,6 +119,25 @@ static StrictBufferConfig unbounded() {
* duplicate results downstream, but does not promise to eliminate them.
*/
EagerBufferConfig emitEarlyWhenFull();

/**
* Disable the changelog for store built by this {@link StoreBuilder}.
* This will turn off fault-tolerance for your store.
* By default the changelog is enabled.
* @return this
*/
BC withLoggingDisabled();

/**
* Indicates that a changelog topic should be created containing the currently suppressed
* records. Due to the short-lived nature of records in this topic it is likely more
* compactable than changelog topics for KTables.
*
* @param config Configs that should be applied to the changelog. Note: Any unrecognized
* configs will be ignored.
* @return this
*/
BC withLoggingEnabled(final Map<String, String> config);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
Expand Down Expand Up @@ -528,10 +529,27 @@ public KTable<K, V> suppress(final Suppressed<? super K> suppressed) {
this
);

final StoreBuilder<InMemoryTimeOrderedKeyValueBuffer<K, V>> storeBuilder;

if (suppressedInternal.bufferConfig().isLoggingEnabled()) {
final Map<String, String> topicConfig = suppressedInternal.bufferConfig().getLogConfig();
storeBuilder = new InMemoryTimeOrderedKeyValueBuffer.Builder<>(
storeName,
keySerde,
valSerde)
.withLoggingEnabled(topicConfig);
} else {
storeBuilder = new InMemoryTimeOrderedKeyValueBuffer.Builder<>(
storeName,
keySerde,
valSerde)
.withLoggingDisabled();
}

final ProcessorGraphNode<K, Change<V>> node = new StatefulProcessorNode<>(
name,
new ProcessorParameters<>(suppressionSupplier, name),
new InMemoryTimeOrderedKeyValueBuffer.Builder<>(storeName, keySerde, valSerde)
storeBuilder
);

builder.addGraphNode(streamsGraphNode, node);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@

import org.apache.kafka.streams.kstream.Suppressed;

import java.util.Map;

import static org.apache.kafka.streams.kstream.internals.suppress.BufferFullStrategy.SHUT_DOWN;

abstract class BufferConfigInternal<BC extends Suppressed.BufferConfig<BC>> implements Suppressed.BufferConfig<BC> {
public abstract class BufferConfigInternal<BC extends Suppressed.BufferConfig<BC>> implements Suppressed.BufferConfig<BC> {
public abstract long maxRecords();

public abstract long maxBytes();
Expand All @@ -46,4 +48,9 @@ public Suppressed.StrictBufferConfig shutDownWhenFull() {
public Suppressed.EagerBufferConfig emitEarlyWhenFull() {
return new EagerBufferConfigImpl(maxRecords(), maxBytes());
}

public abstract boolean isLoggingEnabled();

public abstract Map<String, String> getLogConfig();

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,38 @@

import org.apache.kafka.streams.kstream.Suppressed;

import java.util.Collections;
import java.util.Map;
import java.util.Objects;

public class EagerBufferConfigImpl extends BufferConfigInternal<Suppressed.EagerBufferConfig> implements Suppressed.EagerBufferConfig {

private final long maxRecords;
private final long maxBytes;
private final Map<String, String> logConfig;

public EagerBufferConfigImpl(final long maxRecords, final long maxBytes) {
this.maxRecords = maxRecords;
this.maxBytes = maxBytes;
this.logConfig = Collections.emptyMap();
}

private EagerBufferConfigImpl(final long maxRecords,
final long maxBytes,
final Map<String, String> logConfig) {
this.maxRecords = maxRecords;
this.maxBytes = maxBytes;
this.logConfig = logConfig;
}

@Override
public Suppressed.EagerBufferConfig withMaxRecords(final long recordLimit) {
return new EagerBufferConfigImpl(recordLimit, maxBytes);
return new EagerBufferConfigImpl(recordLimit, maxBytes, logConfig);
}

@Override
public Suppressed.EagerBufferConfig withMaxBytes(final long byteLimit) {
return new EagerBufferConfigImpl(maxRecords, byteLimit);
return new EagerBufferConfigImpl(maxRecords, byteLimit, logConfig);
}

@Override
Expand All @@ -55,6 +67,26 @@ public BufferFullStrategy bufferFullStrategy() {
return BufferFullStrategy.EMIT;
}

@Override
public Suppressed.EagerBufferConfig withLoggingDisabled() {
return new EagerBufferConfigImpl(maxRecords, maxBytes, null);
}

@Override
public Suppressed.EagerBufferConfig withLoggingEnabled(final Map<String, String> config) {
return new EagerBufferConfigImpl(maxRecords, maxBytes, config);
}

@Override
public boolean isLoggingEnabled() {
return logConfig != null;
}

@Override
public Map<String, String> getLogConfig() {
return isLoggingEnabled() ? logConfig : Collections.emptyMap();
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import org.apache.kafka.streams.kstream.Suppressed;

import java.util.Collections;
import java.util.Map;
import java.util.Objects;

import static org.apache.kafka.streams.kstream.internals.suppress.BufferFullStrategy.SHUT_DOWN;
Expand All @@ -27,29 +29,52 @@ public class StrictBufferConfigImpl extends BufferConfigInternal<Suppressed.Stri
private final long maxRecords;
private final long maxBytes;
private final BufferFullStrategy bufferFullStrategy;
private final Map<String, String> logConfig;

public StrictBufferConfigImpl(final long maxRecords,
final long maxBytes,
final BufferFullStrategy bufferFullStrategy,
final Map<String, String> logConfig) {
this.maxRecords = maxRecords;
this.maxBytes = maxBytes;
this.bufferFullStrategy = bufferFullStrategy;
this.logConfig = logConfig;
}

public StrictBufferConfigImpl(final long maxRecords,
final long maxBytes,
final BufferFullStrategy bufferFullStrategy) {
this.maxRecords = maxRecords;
this.maxBytes = maxBytes;
this.bufferFullStrategy = bufferFullStrategy;
this.logConfig = Collections.emptyMap();
}

public StrictBufferConfigImpl() {
this.maxRecords = Long.MAX_VALUE;
this.maxBytes = Long.MAX_VALUE;
this.bufferFullStrategy = SHUT_DOWN;
this.logConfig = Collections.emptyMap();
}

@Override
public Suppressed.StrictBufferConfig withMaxRecords(final long recordLimit) {
return new StrictBufferConfigImpl(recordLimit, maxBytes, bufferFullStrategy);
return new StrictBufferConfigImpl(recordLimit, maxBytes, bufferFullStrategy, logConfig);
}

@Override
public Suppressed.StrictBufferConfig withMaxBytes(final long byteLimit) {
return new StrictBufferConfigImpl(maxRecords, byteLimit, bufferFullStrategy);
return new StrictBufferConfigImpl(maxRecords, byteLimit, bufferFullStrategy, logConfig);
}

@Override
public Suppressed.StrictBufferConfig withLoggingDisabled() {
return new StrictBufferConfigImpl(maxRecords, maxBytes, bufferFullStrategy, null);
}

@Override
public Suppressed.StrictBufferConfig withLoggingEnabled(final Map<String, String> config) {
return new StrictBufferConfigImpl(maxRecords, maxBytes, bufferFullStrategy, config);
}

@Override
Expand All @@ -67,6 +92,16 @@ public BufferFullStrategy bufferFullStrategy() {
return bufferFullStrategy;
}

@Override
public boolean isLoggingEnabled() {
return logConfig != null;
}

@Override
public Map<String, String> getLogConfig() {
return isLoggingEnabled() ? logConfig : Collections.emptyMap();
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ public String name() {
return name;
}

BufferConfigInternal bufferConfig() {
@SuppressWarnings("unchecked")
public <BC extends Suppressed.BufferConfig<BC>> BufferConfigInternal<BC> bufferConfig() {
return bufferConfig;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public static class Builder<K, V> implements StoreBuilder<InMemoryTimeOrderedKey
private final Serde<K> keySerde;
private final Serde<V> valSerde;
private boolean loggingEnabled = true;
private Map<String, String> logConfig = new HashMap<>();

public Builder(final String storeName, final Serde<K> keySerde, final Serde<V> valSerde) {
this.storeName = storeName;
Expand Down Expand Up @@ -127,7 +128,8 @@ public StoreBuilder<InMemoryTimeOrderedKeyValueBuffer<K, V>> withCachingDisabled

@Override
public StoreBuilder<InMemoryTimeOrderedKeyValueBuffer<K, V>> withLoggingEnabled(final Map<String, String> config) {
throw new UnsupportedOperationException();
logConfig = config;
return this;
}

@Override
Expand All @@ -143,7 +145,7 @@ public InMemoryTimeOrderedKeyValueBuffer<K, V> build() {

@Override
public Map<String, String> logConfig() {
return Collections.emptyMap();
return loggingEnabled() ? Collections.unmodifiableMap(logConfig) : Collections.emptyMap();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,18 @@
*/
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.streams.state.StoreBuilder;
import org.junit.Test;

import java.util.HashMap;
import java.util.Map;

import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;


public class InMemoryTimeOrderedKeyValueBufferTest {

@Test
Expand All @@ -29,4 +39,25 @@ public void bufferShouldAllowCacheEnablement() {
public void bufferShouldAllowCacheDisablement() {
new InMemoryTimeOrderedKeyValueBuffer.Builder<>(null, null, null).withCachingDisabled();
}

@Test
public void bufferShouldAllowLoggingEnablement() {
final String expect = "3";
final Map<String, String> logConfig = new HashMap<>();
logConfig.put("min.insync.replicas", expect);
final StoreBuilder builder = new InMemoryTimeOrderedKeyValueBuffer.Builder<>(null, null, null)
.withLoggingEnabled(logConfig);

assertThat(builder.logConfig(), is(singletonMap("min.insync.replicas", expect)));
assertThat(builder.loggingEnabled(), is(true));
}

@Test
public void bufferShouldAllowLoggingDisablement() {
final StoreBuilder builder = new InMemoryTimeOrderedKeyValueBuffer.Builder<>(null, null, null)
.withLoggingDisabled();

assertThat(builder.logConfig(), is(emptyMap()));
assertThat(builder.loggingEnabled(), is(false));
}
}