Skip to content

Commit

Permalink
[#3609] Improve error handling of custom mapper
Browse files Browse the repository at this point in the history
The logic for invoking a custom mapper for upstream commands has been adapted
to consider the (failure) status code returned by the mapper when generating a corresponding
ServiceInvocationException to be sent back in the reply to the downstream sender.

fixes #3609
  • Loading branch information
JeffreyThijs authored Jan 27, 2024
1 parent e9a92f3 commit ddebeae
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.eclipse.hono.adapter.mqtt.MqttProtocolAdapterProperties;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.command.Command;
import org.eclipse.hono.client.util.StatusCodeMapper;
import org.eclipse.hono.util.MessageHelper;
import org.eclipse.hono.util.RegistrationAssertion;
import org.eclipse.hono.util.Strings;
Expand Down Expand Up @@ -199,7 +200,8 @@ private void mapUpstreamMessageRequest(
command.getDeviceId(),
mapperEndpoint.getHost(), mapperEndpoint.getPort(), mapperEndpoint.getUri(),
httpResponseAsyncResult.cause());
result.fail(new ServerErrorException(HttpURLConnection.HTTP_UNAVAILABLE, httpResponseAsyncResult.cause()));
final Throwable exception = mapException(command.getTenant(), httpResponseAsyncResult, null);
result.fail(exception);
} else {
final HttpResponse<Buffer> httpResponse = httpResponseAsyncResult.result();
if (httpResponse.statusCode() == HttpURLConnection.HTTP_OK) {
Expand All @@ -208,8 +210,9 @@ private void mapUpstreamMessageRequest(
LOG.debug("mapping service [host: {}, port: {}, URI: {}] returned unexpected status code: {}",
mapperEndpoint.getHost(), mapperEndpoint.getPort(), mapperEndpoint.getUri(),
httpResponse.statusCode());
result.fail(new ServerErrorException(HttpURLConnection.HTTP_UNAVAILABLE,
"could not invoke configured mapping service"));
final Throwable exception = mapException(command.getTenant(), httpResponseAsyncResult,
"could not invoke configured mapping service");
result.fail(exception);
}
}
resultHandler.handle(result.future());
Expand Down Expand Up @@ -279,4 +282,17 @@ private void mapDownstreamMessageRequest(
resultHandler.handle(result.future());
});
}

private Throwable mapException(final String tenantId, final AsyncResult<HttpResponse<Buffer>> httpResponseAsyncResult, final String message) {
final String detailMessage = Optional.ofNullable(message)
.orElse(Optional.ofNullable(httpResponseAsyncResult.cause()).map(Throwable::getMessage).orElse(null));
final Optional<HttpResponse<Buffer>> httpResponse = Optional.ofNullable(httpResponseAsyncResult.result());
final int statusCode = httpResponse.map(HttpResponse::statusCode).orElse(HttpURLConnection.HTTP_UNAVAILABLE);
return StatusCodeMapper.from(
tenantId,
statusCode,
detailMessage,
httpResponseAsyncResult.cause()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.eclipse.hono.adapter.MapperEndpoint;
import org.eclipse.hono.adapter.mqtt.MqttContext;
import org.eclipse.hono.adapter.mqtt.MqttProtocolAdapterProperties;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.command.Command;
import org.eclipse.hono.service.auth.DeviceUser;
Expand Down Expand Up @@ -353,7 +354,7 @@ public void testMapCommandSucceeds(final VertxTestContext ctx) {
}

/**
* Verifies that the upstream mapper returns a failed future with a ServerErrorException if the upstream mapper has been configured
* Verifies that the upstream mapper returns a failed future with a ClientErrorException if the upstream mapper has been configured
* for an adapter but the remote service returns a 403 status code indicating that the device payload cannot be mapped.
*
* @param ctx The Vert.x test context.
Expand All @@ -379,8 +380,8 @@ public void testMappingCommandFailsForWhenPayloadCannotMapped(final VertxTestCon
messageMapping.mapUpstreamMessage(assertion, command)
.onComplete(ctx.failing(t -> {
ctx.verify(() -> {
assertThat(t).isInstanceOf(ServerErrorException.class);
assertThat((((ServerErrorException) t).getErrorCode())).isEqualTo(HttpURLConnection.HTTP_UNAVAILABLE);
assertThat(t).isInstanceOf(ClientErrorException.class);
assertThat((((ClientErrorException) t).getErrorCode())).isEqualTo(HttpURLConnection.HTTP_FORBIDDEN);
});
ctx.completeNow();
}));
Expand All @@ -389,4 +390,39 @@ public void testMappingCommandFailsForWhenPayloadCannotMapped(final VertxTestCon
verify(httpRequest).sendBuffer(any(Buffer.class), handleCaptor.capture());
handleCaptor.getValue().handle(Future.succeededFuture(httpResponse));
}

/**
* Verifies that the upstream mapper returns a failed future with a ServerErrorException if the upstream mapper has been configured
* for an adapter but the remote service cannot be reached should return a 503.
*
* @param ctx The Vert.x test context.
*/
@Test
@SuppressWarnings("unchecked")
public void testMappingCommandFailsForWhenMapperCannotBeReached(final VertxTestContext ctx) {

config.setMapperEndpoints(Map.of("mapper", MapperEndpoint.from("host", 1234, "/uri", false)));
final HttpRequest<Buffer> httpRequest = mock(HttpRequest.class, withSettings().defaultAnswer(RETURNS_SELF));

final Buffer payload = Buffer.buffer("payload");

when(mapperWebClient.post(anyInt(), anyString(), anyString())).thenReturn(httpRequest);

final Command command = mock(Command.class);
when(command.getPayload()).thenReturn(payload);

final RegistrationAssertion assertion = new RegistrationAssertion("gateway").setUpstreamMessageMapper("mapper");
messageMapping.mapUpstreamMessage(assertion, command)
.onComplete(ctx.failing(t -> {
ctx.verify(() -> {
assertThat(t).isInstanceOf(ServerErrorException.class);
assertThat((((ServerErrorException) t).getErrorCode())).isEqualTo(HttpURLConnection.HTTP_UNAVAILABLE);
});
ctx.completeNow();
}));

final ArgumentCaptor<Handler<AsyncResult<HttpResponse<Buffer>>>> handleCaptor = VertxMockSupport.argumentCaptorHandler();
verify(httpRequest).sendBuffer(any(Buffer.class), handleCaptor.capture());
handleCaptor.getValue().handle(Future.failedFuture(new RuntimeException("something went wrong")));
}
}

0 comments on commit ddebeae

Please sign in to comment.