diff --git a/adapter-base/src/main/java/org/eclipse/hono/adapter/resourcelimits/PrometheusBasedResourceLimitChecks.java b/adapter-base/src/main/java/org/eclipse/hono/adapter/resourcelimits/PrometheusBasedResourceLimitChecks.java index 1edc43d185..07c9ffe812 100644 --- a/adapter-base/src/main/java/org/eclipse/hono/adapter/resourcelimits/PrometheusBasedResourceLimitChecks.java +++ b/adapter-base/src/main/java/org/eclipse/hono/adapter/resourcelimits/PrometheusBasedResourceLimitChecks.java @@ -16,6 +16,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import org.eclipse.hono.client.registry.TenantClient; @@ -113,32 +114,28 @@ public Future isConnectionLimitReached(final TenantObject tenant, final final var value = connectionCountCache.get(new LimitedResourceKey(tenant.getTenantId(), tenantClient::get)); - if (value.isDone()) { - try { - final var limitedResource = value.get(); - TracingHelper.TAG_CACHE_HIT.set(span, Boolean.TRUE); - span.log(Map.of( - TenantConstants.FIELD_MAX_CONNECTIONS, Optional.ofNullable(limitedResource.getCurrentLimit()) - .map(String::valueOf) - .orElse("N/A"), - "current-connections", limitedResource.getCurrentValue())); - final boolean isExceeded = Optional.ofNullable(limitedResource.getCurrentLimit()) - .map(limit -> limitedResource.getCurrentValue() >= limit) - .orElse(false); - result.complete(isExceeded); - } catch (InterruptedException | ExecutionException e) { - // this means that the query could not be run successfully - TracingHelper.logError(span, e); - // fall back to default value - result.complete(Boolean.FALSE); - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } - } - } else { - LOG.trace("Prometheus query [tenant: {}] still running, using default value", tenant.getTenantId()); - span.log(EVENT_QUERY_STILL_RUNNING); + try { + final var limitedResource = value.get(); + TracingHelper.TAG_CACHE_HIT.set(span, Boolean.TRUE); + span.log(Map.of( + TenantConstants.FIELD_MAX_CONNECTIONS, Optional.ofNullable(limitedResource.getCurrentLimit()) + .map(String::valueOf) + .orElse("N/A"), + "current-connections", limitedResource.getCurrentValue())); + final boolean isExceeded = Optional.ofNullable(limitedResource.getCurrentLimit()) + .map(limit -> limitedResource.getCurrentValue() >= limit) + .orElse(false); + result.complete(isExceeded); + } catch (CancellationException | ExecutionException | InterruptedException e) { + // this means that the query could not be run successfully + TracingHelper.logError(span, e); + // fall back to default value result.complete(Boolean.FALSE); + if (e instanceof InterruptedException) { + LOG.trace("Prometheus query [tenant: {}] still running, using default value", tenant.getTenantId()); + span.log(EVENT_QUERY_STILL_RUNNING); + Thread.currentThread().interrupt(); + } } return result.future() @@ -201,32 +198,28 @@ public Future isMessageLimitReached( final var value = dataVolumeCache.get(new LimitedResourceKey(tenant.getTenantId(), tenantClient::get)); - if (value.isDone()) { - try { - final var limitedResource = value.get(); - TracingHelper.TAG_CACHE_HIT.set(span, true); - span.log(Map.of( - "current period bytes limit", Optional.ofNullable(limitedResource.getCurrentLimit()) - .map(String::valueOf) - .orElse("N/A"), - "current period bytes consumed", limitedResource.getCurrentValue())); - final boolean isExceeded = Optional.ofNullable(limitedResource.getCurrentLimit()) - .map(limit -> (limitedResource.getCurrentValue() + payloadSize) > limit) - .orElse(false); - result.complete(isExceeded); - } catch (InterruptedException | ExecutionException e) { - // this means that the query could not be run successfully - TracingHelper.logError(span, e); - // fall back to default value - result.complete(Boolean.FALSE); - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } - } - } else { - LOG.trace("Prometheus query [tenant: {}] still running, using default value", tenant.getTenantId()); - span.log(Map.of(Fields.MESSAGE, EVENT_QUERY_STILL_RUNNING)); + try { + final var limitedResource = value.get(); + TracingHelper.TAG_CACHE_HIT.set(span, true); + span.log(Map.of( + "current period bytes limit", Optional.ofNullable(limitedResource.getCurrentLimit()) + .map(String::valueOf) + .orElse("N/A"), + "current period bytes consumed", limitedResource.getCurrentValue())); + final boolean isExceeded = Optional.ofNullable(limitedResource.getCurrentLimit()) + .map(limit -> (limitedResource.getCurrentValue() + payloadSize) > limit) + .orElse(false); + result.complete(isExceeded); + } catch (CancellationException | ExecutionException | InterruptedException e) { + // this means that the query could not be run successfully + TracingHelper.logError(span, e); + // fall back to default value result.complete(Boolean.FALSE); + if (e instanceof InterruptedException) { + LOG.trace("Prometheus query [tenant: {}] still running, using default value", tenant.getTenantId()); + span.log(Map.of(Fields.MESSAGE, EVENT_QUERY_STILL_RUNNING)); + Thread.currentThread().interrupt(); + } } } return result.future() @@ -253,32 +246,28 @@ public Future isConnectionDurationLimitReached( final var key = new LimitedResourceKey(tenant.getTenantId(), tenantClient::get); final var value = connectionDurationCache.get(key); - if (value.isDone()) { - try { - final var limitedResource = value.get(); - TracingHelper.TAG_CACHE_HIT.set(span, true); - span.log(Map.of( - "current period's connection duration limit", Optional.ofNullable(limitedResource.getCurrentLimit()) - .map(String::valueOf) - .orElse("N/A"), - "current period's connection duration consumed", limitedResource.getCurrentValue())); - final boolean isExceeded = Optional.ofNullable(limitedResource.getCurrentLimit()) - .map(limit -> limitedResource.getCurrentValue().compareTo(limit) >= 0) - .orElse(false); - result.complete(isExceeded); - } catch (InterruptedException | ExecutionException e) { - // this means that the query could not be run successfully - TracingHelper.logError(span, e); - // fall back to default value - result.complete(Boolean.FALSE); - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } - } - } else { - LOG.trace("Prometheus query [tenant: {}] still running, using default value", tenant.getTenantId()); - span.log(Map.of(Fields.MESSAGE, EVENT_QUERY_STILL_RUNNING)); + try { + final var limitedResource = value.get(); + TracingHelper.TAG_CACHE_HIT.set(span, true); + span.log(Map.of( + "current period's connection duration limit", Optional.ofNullable(limitedResource.getCurrentLimit()) + .map(String::valueOf) + .orElse("N/A"), + "current period's connection duration consumed", limitedResource.getCurrentValue())); + final boolean isExceeded = Optional.ofNullable(limitedResource.getCurrentLimit()) + .map(limit -> limitedResource.getCurrentValue().compareTo(limit) >= 0) + .orElse(false); + result.complete(isExceeded); + } catch (CancellationException | ExecutionException | InterruptedException e) { + // this means that the query could not be run successfully + TracingHelper.logError(span, e); + // fall back to default value result.complete(Boolean.FALSE); + if (e instanceof InterruptedException) { + LOG.trace("Prometheus query [tenant: {}] still running, using default value", tenant.getTenantId()); + span.log(Map.of(Fields.MESSAGE, EVENT_QUERY_STILL_RUNNING)); + Thread.currentThread().interrupt(); + } } return result.future() .onSuccess(b -> { diff --git a/adapter-base/src/test/java/org/eclipse/hono/adapter/resourcelimits/PrometheusBasedResourceLimitChecksTest.java b/adapter-base/src/test/java/org/eclipse/hono/adapter/resourcelimits/PrometheusBasedResourceLimitChecksTest.java index 90d167880b..ed113b041f 100644 --- a/adapter-base/src/test/java/org/eclipse/hono/adapter/resourcelimits/PrometheusBasedResourceLimitChecksTest.java +++ b/adapter-base/src/test/java/org/eclipse/hono/adapter/resourcelimits/PrometheusBasedResourceLimitChecksTest.java @@ -306,7 +306,9 @@ public void testMessageLimitNotExceededIfNoResourceLimitsFound(final VertxTestCo @Test public void testMessageLimitFallsBackToDefaultValueIfQueryStillRunning(final VertxTestContext ctx) { - when(dataVolumeCache.get(any(LimitedResourceKey.class))).thenReturn(new CompletableFuture>()); + when(dataVolumeCache.get(any(LimitedResourceKey.class))) + .thenReturn(new CompletableFuture>() + .orTimeout(2, TimeUnit.SECONDS)); final long incomingMessageSize = 20; final TenantObject tenant = TenantObject.from(Constants.DEFAULT_TENANT); @@ -388,7 +390,7 @@ public void testConnectionDurationLimitCheckSucceedsIfNoResourceLimitsFound(fina private void givenCurrentConnections(final Long maxConnections, final Long currentConnections) { when(connectionCountCache.get(any(LimitedResourceKey.class))).thenAnswer(i -> { final var count = new CompletableFuture>(); - count.complete(new LimitedResource<>(maxConnections, currentConnections)); + count.completeOnTimeout(new LimitedResource<>(maxConnections, currentConnections), 100, TimeUnit.MILLISECONDS); return count; }); } @@ -396,7 +398,7 @@ private void givenCurrentConnections(final Long maxConnections, final Long curre private void givenDataVolumeUsageInBytes(final Long maxBytes, final Long consumedBytes) { when(dataVolumeCache.get(any(LimitedResourceKey.class))).thenAnswer(i -> { final var count = new CompletableFuture>(); - count.complete(new LimitedResource<>(maxBytes, consumedBytes.longValue())); + count.completeOnTimeout(new LimitedResource<>(maxBytes, consumedBytes.longValue()), 100, TimeUnit.MILLISECONDS); return count; }); } @@ -404,7 +406,7 @@ private void givenDataVolumeUsageInBytes(final Long maxBytes, final Long consume private void givenDeviceConnectionDuration(final Duration maxDuration, final Duration consumedConnectionDuration) { when(connectionDurationCache.get(any(LimitedResourceKey.class))).thenAnswer(i -> { final var count = new CompletableFuture>(); - count.complete(new LimitedResource<>(maxDuration, consumedConnectionDuration)); + count.completeOnTimeout(new LimitedResource<>(maxDuration, consumedConnectionDuration), 100, TimeUnit.MILLISECONDS); return count; }); }