Skip to content

Commit

Permalink
[0.18] Backport fixes found during continuous testing in #249 (#321)
Browse files Browse the repository at this point in the history
* Backport fixes found during sarama testing

Signed-off-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>

* Change logging during e2e tests
  • Loading branch information
pierDipi authored Oct 21, 2020
1 parent 5a5fddb commit 7970446
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 74 deletions.
2 changes: 0 additions & 2 deletions data-plane/config/100-config-kafka-broker-data-plane.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ data:
retries=2147483647
batch.size=16384
client.dns.lookup=use_all_dns_ips
client.id=KKBD
connections.max.idle.ms=600000
delivery.timeout.ms=120000
linger.ms=0
Expand Down Expand Up @@ -87,7 +86,6 @@ data:
# ssl.provider=
auto.commit.interval.ms=5000
check.crcs=true
client.id=KKBD
# client.rack=
fetch.max.wait.ms=500
# interceptor.classes=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ data:
retries=2147483647
batch.size=16384
client.dns.lookup=use_all_dns_ips
client.id=KKBD
connections.max.idle.ms=600000
delivery.timeout.ms=120000
linger.ms=0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,9 @@
* @param <R> type of the response of given senders.
* @see ConsumerRecordHandler#handle(KafkaConsumerRecord)
*/
public final class ConsumerRecordHandler<K, V, R> implements
Handler<KafkaConsumerRecord<K, V>> {
public final class ConsumerRecordHandler<K, V, R> implements Handler<KafkaConsumerRecord<K, V>> {

private static final Logger logger = LoggerFactory
.getLogger(ConsumerRecordHandler.class);

private static final String SUBSCRIBER = "subscriber";
private static final String DLQ = "dead letter queue";
private static final Logger logger = LoggerFactory.getLogger(ConsumerRecordHandler.class);

private final Filter<V> filter;
private final ConsumerRecordSender<K, V, R> subscriberSender;
Expand All @@ -55,9 +50,8 @@ public final class ConsumerRecordHandler<K, V, R> implements
*
* @param subscriberSender sender to trigger subscriber
* @param filter event filter
* @param receiver hook receiver {@link ConsumerRecordOffsetStrategy}. It allows to
* plug in custom offset management depending on the success/failure
* during the algorithm.
* @param receiver hook receiver {@link ConsumerRecordOffsetStrategy}. It allows to plug in custom offset
* management depending on the success/failure during the algorithm.
* @param sinkResponseHandler handler of the response from {@code subscriberSender}
* @param deadLetterQueueSender sender to DLQ
*/
Expand Down Expand Up @@ -86,9 +80,8 @@ public ConsumerRecordHandler(
*
* @param subscriberSender sender to trigger subscriber
* @param filter event filter
* @param receiver hook receiver {@link ConsumerRecordOffsetStrategy}. It allows to
* plug in custom offset management depending on the success/failure
* during the algorithm.
* @param receiver hook receiver {@link ConsumerRecordOffsetStrategy}. It allows to plug in custom offset
* management depending on the success/failure during the algorithm.
* @param sinkResponseHandler handler of the response
*/
public ConsumerRecordHandler(
Expand Down Expand Up @@ -116,77 +109,81 @@ record -> Future.failedFuture("no DLQ configured")
@Override
public void handle(final KafkaConsumerRecord<K, V> record) {

logger.debug("handling record {}", record);
logDebug("handling record", record);

receiver.recordReceived(record);

if (filter.match(record.value())) {
logger.debug("record match filtering {}", record);

subscriberSender.send(record)
.onSuccess(response -> onSuccessfullySentToSubscriber(record))
.onFailure(cause -> onFailedToSendToSubscriber(record, cause))
.compose(sinkResponseHandler::handle)
.onFailure(
t -> logger.error("Failed to send the subscriber response to the broker topic", t));
logDebug("record match filtering", record);
send(record);
} else {
logger.debug("record doesn't match filtering {}", record);

logDebug("record doesn't match filtering", record);
receiver.recordDiscarded(record);
}
}

private void onSuccessfullySentToSubscriber(final KafkaConsumerRecord<K, V> record) {
logSuccessfulSendTo(SUBSCRIBER, record);

receiver.successfullySentToSubscriber(record);
private void send(final KafkaConsumerRecord<K, V> record) {
subscriberSender.send(record)
.onSuccess(response -> sinkResponseHandler.handle(response)
.onSuccess(ignored -> {
logDebug("Successfully send response to the broker", record);
receiver.successfullySentToSubscriber(record);
})
.onFailure(cause -> {
logError("Failed to handle response", record, cause);
sendToDLS(record);
}))
.onFailure(cause -> {
logError("Failed to send event to subscriber", record, cause);
sendToDLS(record);
});
}

private void onFailedToSendToSubscriber(
final KafkaConsumerRecord<K, V> record,
final Throwable cause) {

logFailedSendTo(SUBSCRIBER, record, cause);

private void sendToDLS(KafkaConsumerRecord<K, V> record) {
deadLetterQueueSender.send(record)
.onSuccess(ignored -> onSuccessfullySentToDLQ(record))
.onFailure(ex -> onFailedToSendToDLQ(record, ex))
.compose(sinkResponseHandler::handle)
.onFailure(
t -> logger.error("Failed to send the subscriber response to the broker topic", t));
}

private void onSuccessfullySentToDLQ(final KafkaConsumerRecord<K, V> record) {
logSuccessfulSendTo(DLQ, record);

receiver.successfullySentToDLQ(record);
.onFailure(ex -> {
logError("Failed to send record to dead letter sink", record, ex);
receiver.failedToSendToDLQ(record, ex);
})
.onSuccess(response -> sinkResponseHandler.handle(response)
.onSuccess(ignored -> {
logDebug("Successfully send response to the broker", record);
receiver.successfullySentToDLQ(record);
})
.onFailure(cause -> {
logError("Failed to handle response", record, cause);
receiver.failedToSendToDLQ(record, cause);
}));
}

private void onFailedToSendToDLQ(KafkaConsumerRecord<K, V> record, Throwable ex) {
logFailedSendTo(DLQ, record, ex);

receiver.failedToSendToDLQ(record, ex);
}

private static <K, V> void logFailedSendTo(
final String component,
private static <K, V> void logError(
final String msg,
final KafkaConsumerRecord<K, V> record,
final Throwable cause) {

logger.error(component + " sender failed to send record {} {} {}",
keyValue("topic", record.topic()),
keyValue("partition", record.partition()),
keyValue("offset", record.offset()),
keyValue("event", record.value()),
cause
);
if (logger.isDebugEnabled()) {
logger.error(msg + " {} {} {}",
keyValue("topic", record.topic()),
keyValue("partition", record.partition()),
keyValue("offset", record.offset()),
keyValue("event", record.value()),
cause
);
} else {
logger.error(msg + " {} {} {}",
keyValue("topic", record.topic()),
keyValue("partition", record.partition()),
keyValue("offset", record.offset()),
cause
);
}
}

private static <K, V> void logSuccessfulSendTo(
final String component,
private static <K, V> void logDebug(
final String msg,
final KafkaConsumerRecord<K, V> record) {

logger.debug("record successfully handled by " + component + " {} {} {}",
logger.debug(msg + " {} {} {}",
keyValue("topic", record.topic()),
keyValue("partition", record.partition()),
keyValue("offset", record.offset()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,12 @@ public HttpSinkResponseHandler(
*/
@Override
public Future<Void> handle(final HttpResponse<Buffer> response) {
// TODO if it's in structured, the SDK just calls getBytes on the response.body() which might lead to NPE.
MessageReader messageReader = VertxMessageFactory.createReader(response);
if (messageReader.getEncoding() == Encoding.UNKNOWN) {

// When the sink returns a malformed event we return a failed future to avoid committing the message to Kafka.
if (response.body().length() > 0) {
if (response.body() != null && response.body().length() > 0) {
return Future.failedFuture(
new IllegalResponseException("Unable to decode response: unknown encoding and non empty response")
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package dev.knative.eventing.kafka.broker.dispatcher.http;

import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -70,6 +71,30 @@ public void shouldSucceedOnUnknownEncodingAndEmptyResponse(final Vertx vertx, fi
.onSuccess(v -> context.completeNow());
}

@Test
public void shouldSucceedOnUnknownEncodingAndNullResponseBody(final Vertx vertx, final VertxTestContext context) {
final var producer = new MockProducer<>(
true,
new StringSerializer(),
new CloudEventSerializer()
);
final var handler = new HttpSinkResponseHandler(
TOPIC,
KafkaProducer.create(vertx, producer)
);

// Empty response
final HttpResponse<Buffer> response = mock(HttpResponse.class);
when(response.statusCode()).thenReturn(202);
when(response.body()).thenReturn(null);
when(response.headers()).thenReturn(MultiMap.caseInsensitiveMultiMap());

context
.assertComplete(handler.handle(response))
.onSuccess(v -> context.completeNow());
}


@Test
public void shouldFailOnUnknownEncodingAndNonEmptyResponse(final Vertx vertx, final VertxTestContext context) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,7 @@ public void testUnorderedConsumer(final Vertx vertx, final VertxTestContext cont

Thread.sleep(6000); // reduce flakiness

assertThat(vertx.deploymentIDs())
.hasSize(numEgresses);
assertThat(vertx.deploymentIDs()).hasSize(numEgresses);

waitEvents.await();

Expand All @@ -144,10 +143,8 @@ public void testUnorderedConsumer(final Vertx vertx, final VertxTestContext cont
final var history = producerEntry.getValue().history();
assertThat(history).hasSameSizeAs(consumerRecords);

assertThat(history.stream().map(ProducerRecord::value))
.containsExactlyInAnyOrder(events);
assertThat(history.stream().map(ProducerRecord::key))
.containsAnyElementsOf(partitionKeys);
assertThat(history.stream().map(ProducerRecord::value)).containsExactlyInAnyOrder(events);
assertThat(history.stream().map(ProducerRecord::key)).containsAnyElementsOf(partitionKeys);
}

executorService.shutdown();
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,7 @@ github.com/go-toolsmith/strparse v1.0.0/go.mod h1:YI2nUKP9YGZnL/L1/DLFBfixrcjslW
github.com/go-toolsmith/typep v1.0.0/go.mod h1:JSQCQMUPdRlMZFswiq3TGpNp1GMktqkR2Ns5AIQkATU=
github.com/go-toolsmith/typep v1.0.2/go.mod h1:JSQCQMUPdRlMZFswiq3TGpNp1GMktqkR2Ns5AIQkATU=
github.com/go-xmlfmt/xmlfmt v0.0.0-20191208150333-d5b6f63a941b/go.mod h1:aUCEOzzezBEjDBbFBoSiya/gduyIiWYRP6CnSFIV8AM=
github.com/go-yaml/yaml v2.1.0+incompatible h1:RYi2hDdss1u4YE7GwixGzWwVo47T8UQwnTLB6vQiq+o=
github.com/go-yaml/yaml v2.1.0+incompatible/go.mod h1:w2MrLa16VYP0jy6N7M5kHaCkaLENm+P+Tv+MfurjSw0=
github.com/gobuffalo/envy v1.6.5/go.mod h1:N+GkhhZ/93bGZc6ZKhJLP6+m+tCNPKwgSpH9kaifseQ=
github.com/gobuffalo/envy v1.7.0/go.mod h1:n7DRkBerg/aorDM8kbduw5dN3oXGswK5liaSCx4T5NI=
Expand Down
5 changes: 4 additions & 1 deletion test/pkg/config/100-config-logging.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ data:
<appender name="jsonConsoleAppender" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
</appender>
<root level="DEBUG">
<logger name="dev.knative" level="DEBUG">
<appender-ref ref="jsonConsoleAppender" />
</logger>
<root level="INFO">
<appender-ref ref="jsonConsoleAppender"/>
</root>
</configuration>
Expand Down

0 comments on commit 7970446

Please sign in to comment.