Skip to content

Commit

Permalink
Centralize exception handling and fix behavior for RequestTimeoutExce…
Browse files Browse the repository at this point in the history
…ption (opensearch-project#3063)

* Centralize exception handling and fix behavior for RequestTimeoutException

Signed-off-by: Chase Engelbrecht <engechas@amazon.com>

* Fix existing tests

Signed-off-by: Chase Engelbrecht <engechas@amazon.com>

* Add unit tests for exception handlers

Signed-off-by: Chase Engelbrecht <engechas@amazon.com>

* Add copyright headers

Signed-off-by: Chase Engelbrecht <engechas@amazon.com>

* Add better default messages

Signed-off-by: Chase Engelbrecht <engechas@amazon.com>

---------

Signed-off-by: Chase Engelbrecht <engechas@amazon.com>
  • Loading branch information
engechas authored Aug 4, 2023
1 parent c75ed7b commit 259fea1
Show file tree
Hide file tree
Showing 23 changed files with 615 additions and 361 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper;

import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.grpc.GrpcStatusFunction;
import com.linecorp.armeria.server.RequestTimeoutException;
import io.grpc.Metadata;
import io.grpc.Status;
import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.exceptions.BadRequestException;
import org.opensearch.dataprepper.exceptions.BufferWriteException;
import org.opensearch.dataprepper.exceptions.RequestCancelledException;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.buffer.SizeOverflowException;

import java.util.concurrent.TimeoutException;

public class GrpcRequestExceptionHandler implements GrpcStatusFunction {
static final String ARMERIA_REQUEST_TIMEOUT_MESSAGE = "Timeout waiting for request to be served. This is usually due to the buffer being full.";

public static final String REQUEST_TIMEOUTS = "requestTimeouts";
public static final String BAD_REQUESTS = "badRequests";
public static final String REQUESTS_TOO_LARGE = "requestsTooLarge";
public static final String INTERNAL_SERVER_ERROR = "internalServerError";

private final Counter requestTimeoutsCounter;
private final Counter badRequestsCounter;
private final Counter requestsTooLargeCounter;
private final Counter internalServerErrorCounter;

public GrpcRequestExceptionHandler(final PluginMetrics pluginMetrics) {
requestTimeoutsCounter = pluginMetrics.counter(REQUEST_TIMEOUTS);
badRequestsCounter = pluginMetrics.counter(BAD_REQUESTS);
requestsTooLargeCounter = pluginMetrics.counter(REQUESTS_TOO_LARGE);
internalServerErrorCounter = pluginMetrics.counter(INTERNAL_SERVER_ERROR);
}

@Override
public @Nullable Status apply(final RequestContext context, final Throwable exception, final Metadata metadata) {
final Throwable exceptionCause = exception instanceof BufferWriteException ? exception.getCause() : exception;

return handleExceptions(exceptionCause);
}

private Status handleExceptions(final Throwable e) {
if (e instanceof RequestTimeoutException || e instanceof TimeoutException) {
requestTimeoutsCounter.increment();
return createStatus(e, Status.RESOURCE_EXHAUSTED);
} else if (e instanceof SizeOverflowException) {
requestsTooLargeCounter.increment();
return createStatus(e, Status.RESOURCE_EXHAUSTED);
} else if (e instanceof BadRequestException) {
badRequestsCounter.increment();
return createStatus(e, Status.INVALID_ARGUMENT);
} else if (e instanceof RequestCancelledException) {
requestTimeoutsCounter.increment();
return createStatus(e, Status.CANCELLED);
}

internalServerErrorCounter.increment();
return createStatus(e, Status.INTERNAL);
}

private Status createStatus(final Throwable e, final Status status) {
final String message;
if (e instanceof RequestTimeoutException) {
message = ARMERIA_REQUEST_TIMEOUT_MESSAGE;
} else {
message = e.getMessage() == null ? status.getCode().name() : e.getMessage();
}

return status.withDescription(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.loghttp;
package org.opensearch.dataprepper;

import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.server.RequestTimeoutException;
import com.linecorp.armeria.server.ServiceRequestContext;
import com.linecorp.armeria.server.annotation.ExceptionHandlerFunction;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.buffer.SizeOverflowException;
import com.linecorp.armeria.common.HttpResponse;
Expand All @@ -13,10 +17,11 @@
import io.micrometer.core.instrument.Counter;

import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.TimeoutException;

public class RequestExceptionHandler {
public class HttpRequestExceptionHandler implements ExceptionHandlerFunction {
static final String ARMERIA_REQUEST_TIMEOUT_MESSAGE = "Timeout waiting for request to be served. This is usually due to the buffer being full.";

public static final String REQUEST_TIMEOUTS = "requestTimeouts";
public static final String BAD_REQUESTS = "badRequests";
public static final String REQUESTS_TOO_LARGE = "requestsTooLarge";
Expand All @@ -27,31 +32,39 @@ public class RequestExceptionHandler {
private final Counter requestsTooLargeCounter;
private final Counter internalServerErrorCounter;

public RequestExceptionHandler(final PluginMetrics pluginMetrics) {
public HttpRequestExceptionHandler(final PluginMetrics pluginMetrics) {
requestTimeoutsCounter = pluginMetrics.counter(REQUEST_TIMEOUTS);
badRequestsCounter = pluginMetrics.counter(BAD_REQUESTS);
requestsTooLargeCounter = pluginMetrics.counter(REQUESTS_TOO_LARGE);
internalServerErrorCounter = pluginMetrics.counter(INTERNAL_SERVER_ERROR);
}

public HttpResponse handleException(final Exception e) {
final String message = e.getMessage() == null? "" : e.getMessage();
return handleException(e, message);
@Override
public HttpResponse handleException(final ServiceRequestContext ctx, final HttpRequest req, final Throwable cause) {
final HttpStatus status = handleException(cause);
final String message;
if (cause instanceof RequestTimeoutException) {
message = ARMERIA_REQUEST_TIMEOUT_MESSAGE;
} else {
message = cause.getMessage() == null ? status.reasonPhrase() : cause.getMessage();
}

return HttpResponse.of(status, MediaType.ANY_TYPE, message);
}

public HttpResponse handleException(final Exception e, final String message) {
Objects.requireNonNull(message);
private HttpStatus handleException(final Throwable e) {
if (e instanceof IOException) {
badRequestsCounter.increment();
return HttpResponse.of(HttpStatus.BAD_REQUEST, MediaType.ANY_TYPE, message);
} else if (e instanceof TimeoutException) {
return HttpStatus.BAD_REQUEST;
} else if (e instanceof TimeoutException || e instanceof RequestTimeoutException) {
requestTimeoutsCounter.increment();
return HttpResponse.of(HttpStatus.REQUEST_TIMEOUT, MediaType.ANY_TYPE, message);
return HttpStatus.REQUEST_TIMEOUT;
} else if (e instanceof SizeOverflowException) {
requestsTooLargeCounter.increment();
return HttpResponse.of(HttpStatus.REQUEST_ENTITY_TOO_LARGE, MediaType.ANY_TYPE, message);
return HttpStatus.REQUEST_ENTITY_TOO_LARGE;
}

internalServerErrorCounter.increment();
return HttpResponse.of(HttpStatus.INTERNAL_SERVER_ERROR, MediaType.ANY_TYPE, message);
return HttpStatus.INTERNAL_SERVER_ERROR;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.exceptions;

public class BadRequestException extends RuntimeException {
public BadRequestException(final String message, final Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.exceptions;

public class BufferWriteException extends RuntimeException {
public BufferWriteException(final String message, final Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.exceptions;

public class RequestCancelledException extends RuntimeException {
public RequestCancelledException(final String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper;

import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.server.RequestTimeoutException;
import io.grpc.Metadata;
import io.grpc.Status;
import io.micrometer.core.instrument.Counter;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.exceptions.BadRequestException;
import org.opensearch.dataprepper.exceptions.BufferWriteException;
import org.opensearch.dataprepper.exceptions.RequestCancelledException;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.buffer.SizeOverflowException;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.GrpcRequestExceptionHandler.ARMERIA_REQUEST_TIMEOUT_MESSAGE;

@ExtendWith(MockitoExtension.class)
public class GrpcRequestExceptionHandlerTest {
@Mock
private PluginMetrics pluginMetrics;

@Mock
private Counter requestTimeoutsCounter;

@Mock
private Counter badRequestsCounter;

@Mock
private Counter requestsTooLargeCounter;

@Mock
private Counter internalServerErrorCounter;

@Mock
private RequestContext requestContext;

@Mock
private Metadata metadata;

private GrpcRequestExceptionHandler grpcRequestExceptionHandler;

@BeforeEach
public void setUp() {
when(pluginMetrics.counter(HttpRequestExceptionHandler.REQUEST_TIMEOUTS)).thenReturn(requestTimeoutsCounter);
when(pluginMetrics.counter(HttpRequestExceptionHandler.BAD_REQUESTS)).thenReturn(badRequestsCounter);
when(pluginMetrics.counter(HttpRequestExceptionHandler.REQUESTS_TOO_LARGE)).thenReturn(requestsTooLargeCounter);
when(pluginMetrics.counter(HttpRequestExceptionHandler.INTERNAL_SERVER_ERROR)).thenReturn(internalServerErrorCounter);

grpcRequestExceptionHandler = new GrpcRequestExceptionHandler(pluginMetrics);
}

@Test
public void testHandleBadRequestException() {
final BadRequestException badRequestExceptionNoMessage = new BadRequestException(null, new IOException());
final String exceptionMessage = UUID.randomUUID().toString();
final BadRequestException badRequestExceptionWithMessage = new BadRequestException(exceptionMessage, new IOException());

final Status noMessageStatus = grpcRequestExceptionHandler.apply(requestContext, badRequestExceptionNoMessage, metadata);
assertThat(noMessageStatus.getCode(), equalTo(Status.Code.INVALID_ARGUMENT));
assertThat(noMessageStatus.getDescription(), equalTo(Status.Code.INVALID_ARGUMENT.name()));

final Status messageStatus = grpcRequestExceptionHandler.apply(requestContext, badRequestExceptionWithMessage, metadata);
assertThat(messageStatus.getCode(), equalTo(Status.Code.INVALID_ARGUMENT));
assertThat(messageStatus.getDescription(), equalTo(exceptionMessage));

verify(badRequestsCounter, times(2)).increment();
}

@Test
public void testHandleTimeoutException() {
final BufferWriteException timeoutExceptionNoMessage = new BufferWriteException(null, new TimeoutException());
final String exceptionMessage = UUID.randomUUID().toString();
final BufferWriteException timeoutExceptionWithMessage = new BufferWriteException(exceptionMessage, new TimeoutException(exceptionMessage));

final Status noMessageStatus = grpcRequestExceptionHandler.apply(requestContext, timeoutExceptionNoMessage, metadata);
assertThat(noMessageStatus.getCode(), equalTo(Status.Code.RESOURCE_EXHAUSTED));
assertThat(noMessageStatus.getDescription(), equalTo(Status.Code.RESOURCE_EXHAUSTED.name()));

final Status messageStatus = grpcRequestExceptionHandler.apply(requestContext, timeoutExceptionWithMessage, metadata);
assertThat(messageStatus.getCode(), equalTo(Status.Code.RESOURCE_EXHAUSTED));
assertThat(messageStatus.getDescription(), equalTo(exceptionMessage));

verify(requestTimeoutsCounter, times(2)).increment();
}

@Test
public void testHandleArmeriaTimeoutException() {
final RequestTimeoutException timeoutExceptionNoMessage = RequestTimeoutException.get();

final Status noMessageStatus = grpcRequestExceptionHandler.apply(requestContext, timeoutExceptionNoMessage, metadata);
assertThat(noMessageStatus.getCode(), equalTo(Status.Code.RESOURCE_EXHAUSTED));
assertThat(noMessageStatus.getDescription(), equalTo(ARMERIA_REQUEST_TIMEOUT_MESSAGE));

verify(requestTimeoutsCounter, times(1)).increment();
}

@Test
public void testHandleSizeOverflowException() {
final BufferWriteException sizeOverflowExceptionNoMessage = new BufferWriteException(null, new SizeOverflowException(null));
final String exceptionMessage = UUID.randomUUID().toString();
final BufferWriteException sizeOverflowExceptionWithMessage = new BufferWriteException(exceptionMessage, new SizeOverflowException(exceptionMessage));

final Status noMessageStatus = grpcRequestExceptionHandler.apply(requestContext, sizeOverflowExceptionNoMessage, metadata);
assertThat(noMessageStatus.getCode(), equalTo(Status.Code.RESOURCE_EXHAUSTED));
assertThat(noMessageStatus.getDescription(), equalTo(Status.Code.RESOURCE_EXHAUSTED.name()));

final Status messageStatus = grpcRequestExceptionHandler.apply(requestContext, sizeOverflowExceptionWithMessage, metadata);
assertThat(messageStatus.getCode(), equalTo(Status.Code.RESOURCE_EXHAUSTED));
assertThat(messageStatus.getDescription(), equalTo(exceptionMessage));

verify(requestsTooLargeCounter, times(2)).increment();
}

@Test
public void testHandleRequestCancelledException() {
final RequestCancelledException requestCancelledExceptionNoMessage = new RequestCancelledException(null);
final String exceptionMessage = UUID.randomUUID().toString();
final RequestCancelledException requestCancelledExceptionWithMessage = new RequestCancelledException(exceptionMessage);

final Status noMessageStatus = grpcRequestExceptionHandler.apply(requestContext, requestCancelledExceptionNoMessage, metadata);
assertThat(noMessageStatus.getCode(), equalTo(Status.Code.CANCELLED));
assertThat(noMessageStatus.getDescription(), equalTo(Status.Code.CANCELLED.name()));

final Status messageStatus = grpcRequestExceptionHandler.apply(requestContext, requestCancelledExceptionWithMessage, metadata);
assertThat(messageStatus.getCode(), equalTo(Status.Code.CANCELLED));
assertThat(messageStatus.getDescription(), equalTo(exceptionMessage));

verify(requestTimeoutsCounter, times(2)).increment();
}

@Test
public void testHandleInternalServerException() {
final RuntimeException runtimeExceptionNoMessage = new RuntimeException();
final String exceptionMessage = UUID.randomUUID().toString();
final RuntimeException runtimeExceptionWithMessage = new RuntimeException(exceptionMessage);

final Status noMessageStatus = grpcRequestExceptionHandler.apply(requestContext, runtimeExceptionNoMessage, metadata);
assertThat(noMessageStatus.getCode(), equalTo(Status.Code.INTERNAL));
assertThat(noMessageStatus.getDescription(), equalTo(Status.Code.INTERNAL.name()));

final Status messageStatus = grpcRequestExceptionHandler.apply(requestContext, runtimeExceptionWithMessage, metadata);
assertThat(messageStatus.getCode(), equalTo(Status.Code.INTERNAL));
assertThat(messageStatus.getDescription(), equalTo(exceptionMessage));

verify(internalServerErrorCounter, times(2)).increment();
}
}
Loading

0 comments on commit 259fea1

Please sign in to comment.