diff --git a/adapter-base/src/main/java/org/eclipse/hono/adapter/AbstractProtocolAdapterApplication.java b/adapter-base/src/main/java/org/eclipse/hono/adapter/AbstractProtocolAdapterApplication.java index fd4f6b83fb..3f8e694952 100644 --- a/adapter-base/src/main/java/org/eclipse/hono/adapter/AbstractProtocolAdapterApplication.java +++ b/adapter-base/src/main/java/org/eclipse/hono/adapter/AbstractProtocolAdapterApplication.java @@ -606,7 +606,7 @@ protected ProtocolAdapterCommandConsumerFactoryImpl commandConsumerFactory(final LOG.debug("using Command Router service client, configuring CommandConsumerFactory [{}]", ProtocolAdapterCommandConsumerFactoryImpl.class.getName()); - return new ProtocolAdapterCommandConsumerFactoryImpl(commandRouterClient, getComponentName()); + return new ProtocolAdapterCommandConsumerFactoryImpl(vertx, commandRouterClient, getComponentName()); } private ClientConfigProperties commandResponseSenderConfig() { diff --git a/clients/command/pom.xml b/clients/command/pom.xml index 4ec5233369..32b549b4b3 100644 --- a/clients/command/pom.xml +++ b/clients/command/pom.xml @@ -67,11 +67,26 @@ opentracing-noop true + + io.quarkus + quarkus-kubernetes-client + + + org.jboss.spec.javax.xml.bind + jboss-jaxb-api_2.3_spec + + + - ch.qos.logback - logback-classic + io.fabric8 + kubernetes-server-mock + test + + + io.quarkus + quarkus-junit5 test diff --git a/clients/command/src/main/java/org/eclipse/hono/client/command/CommandRoutingUtil.java b/clients/command/src/main/java/org/eclipse/hono/client/command/CommandRoutingUtil.java index eb2edb3725..c6268bddec 100644 --- a/clients/command/src/main/java/org/eclipse/hono/client/command/CommandRoutingUtil.java +++ b/clients/command/src/main/java/org/eclipse/hono/client/command/CommandRoutingUtil.java @@ -43,23 +43,25 @@ private CommandRoutingUtil() { /** * Creates a new adapter instance identifier, used for identifying the protocol adapter to route a command to. *

- * If this method is invoked from within a docker container in a Kubernetes cluster, the format is - * [prefix]_[docker_container_id]_[counter], with prefix being the name of the Kubernetes pod. + * If this application is running in a Kubernetes cluster and the container id is supplied as parameter here, + * the format of the returned id is [prefix]_[container_id]_[counter], with prefix being the name of + * the Kubernetes pod. * See also {@link #getK8sPodNameAndContainerIdFromAdapterInstanceId(String)}. *

* If not running in a Kubernetes cluster, a random id with the given adapter name as prefix is used. * * @param adapterName The adapter name. + * @param k8sContainerId The container id or {@code null} if not running in Kubernetes. * @param counter The counter value to use. * @return The new adapter instance identifier. */ - public static String getNewAdapterInstanceId(final String adapterName, final int counter) { - final String k8sContainerId = CgroupV1KubernetesContainerUtil.getContainerId(); + public static String getNewAdapterInstanceId(final String adapterName, final String k8sContainerId, + final int counter) { if (k8sContainerId == null || k8sContainerId.length() < 12) { return getNewAdapterInstanceIdForNonK8sEnv(adapterName); } else { - // running in Kubernetes: prefer HOSTNAME env var containing the pod name - String prefix = System.getenv("HOSTNAME"); + // running in Kubernetes: use pod name as prefix + String prefix = KubernetesContainerInfoProvider.getInstance().getPodName(); if (Strings.isNullOrEmpty(prefix)) { prefix = adapterName; } diff --git a/clients/command/src/main/java/org/eclipse/hono/client/command/KubernetesContainerInfoProvider.java b/clients/command/src/main/java/org/eclipse/hono/client/command/KubernetesContainerInfoProvider.java new file mode 100644 index 0000000000..83e32f58aa --- /dev/null +++ b/clients/command/src/main/java/org/eclipse/hono/client/command/KubernetesContainerInfoProvider.java @@ -0,0 +1,229 @@ +/******************************************************************************* + * Copyright (c) 2023 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + *******************************************************************************/ + +package org.eclipse.hono.client.command; + +import java.net.HttpURLConnection; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +import org.eclipse.hono.client.ServerErrorException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.fabric8.kubernetes.api.model.ContainerStatus; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClientBuilder; +import io.fabric8.kubernetes.client.KubernetesClientException; +import io.fabric8.kubernetes.client.utils.PodStatusUtil; +import io.vertx.core.Context; +import io.vertx.core.Future; +import io.vertx.core.Promise; + +/** + * Provides information about the Kubernetes container if this application is running in Kubernetes. + */ +public class KubernetesContainerInfoProvider { + + /** + * Name of the environment variable that contains the name of the container that this application is running in. + *
+ * Such an environment variable needs to be set to determine the container id if the pod that this application + * is running in contains multiple running containers. + */ + public static final String KUBERNETES_CONTAINER_NAME_ENV_VAR = "KUBERNETES_CONTAINER_NAME"; + + private static final Logger LOG = LoggerFactory.getLogger(KubernetesContainerInfoProvider.class); + + private static final KubernetesContainerInfoProvider INSTANCE = new KubernetesContainerInfoProvider(); + + private final AtomicReference> containerIdPromiseRef = new AtomicReference<>(); + + private String containerId; + private final String podName; + private final boolean isRunningInKubernetes; + + private KubernetesContainerInfoProvider() { + isRunningInKubernetes = System.getenv("KUBERNETES_SERVICE_HOST") != null; + podName = isRunningInKubernetes ? System.getenv("HOSTNAME") : null; + } + + /** + * Gets the KubernetesContainerInfoProvider instance. + * + * @return The KubernetesContainerInfoProvider. + */ + public static KubernetesContainerInfoProvider getInstance() { + return INSTANCE; + } + + boolean isRunningInKubernetes() { + return isRunningInKubernetes; + } + + /** + * Gets the Kubernetes pod name if running in Kubernetes. + * + * @return The pod name or {@code null} if not running in Kubernetes. + */ + public String getPodName() { + return podName; + } + + /** + * Determines the container id if running in a container in Kubernetes. + *

+ * First an attempt is made to get the container id by inspecting /proc/self/cgroup + * (via {@link CgroupV1KubernetesContainerUtil#getContainerId()}). + * If not found there, the container id is queried via the Kubernetes API. + *

+ * NOTE: The service account of the application pod must have an RBAC role allowing "get" on the "pods" resource. + * If this application is running in a pod with multiple containers, the container that this application is running + * in must have an environment variable with the name specified in {@link #KUBERNETES_CONTAINER_NAME_ENV_VAR} set + * to the container name. + * + * @param context The vert.x context to run the code on to determine the container id via the K8s API. + * @return A future indicating the outcome of the operation. + * The future will be succeeded with the container id or {@code null} if not running in Kubernetes. + * In case of an error determining the container id, the future will be failed with either a + * {@link IllegalStateException} if a precondition for getting the id isn't fulfilled (because of missing + * permissions or because multiple pod containers exist and no KUBERNETES_CONTAINER_NAME env var is set), + * or otherwise a {@link ServerErrorException}. + * @throws NullPointerException if context is {@code null}. + */ + public Future getContainerId(final Context context) { + Objects.requireNonNull(context); + if (containerId != null) { + return Future.succeededFuture(containerId); + } + if (!isRunningInKubernetes()) { + return Future.succeededFuture(null); + } + final String containerIdViaCgroup1 = CgroupV1KubernetesContainerUtil.getContainerId(); + if (containerIdViaCgroup1 != null) { + containerId = containerIdViaCgroup1; + return Future.succeededFuture(containerId); + } + final Promise containerIdPromise = Promise.promise(); + if (!containerIdPromiseRef.compareAndSet(null, containerIdPromise)) { + containerIdPromiseRef.get().future().onComplete(containerIdPromise); + LOG.debug("getContainerId result future will be completed with the result of an already ongoing invocation"); + return containerIdPromise.future(); + } + context.executeBlocking(codeHandler -> { + try { + containerId = getContainerIdViaK8sApi(); + codeHandler.complete(containerId); + } catch (final Exception e) { + codeHandler.fail(e); + } + }, containerIdPromise); + containerIdPromise.future().onComplete(ar -> containerIdPromiseRef.set(null)); + return containerIdPromise.future(); + } + + private String getContainerIdViaK8sApi() throws ServerErrorException, IllegalStateException { + try (KubernetesClient client = new KubernetesClientBuilder().build()) { + // container name env var needs to be set if there are multiple running containers in the pod + final String containerNameEnvVarValue = System.getenv(KUBERNETES_CONTAINER_NAME_ENV_VAR); + return getContainerIdViaK8sApi(client, getPodName(), containerNameEnvVarValue); + + } catch (final KubernetesClientException e) { + if (e.getCause() != null && e.getCause().getMessage() != null + && e.getCause().getMessage().contains("timed out")) { + final String errorMsg = "Timed out getting container id via K8s API. Consider increasing " + + "the request timeout via the KUBERNETES_REQUEST_TIMEOUT env var (default is 10000[ms])."; + LOG.error(errorMsg); + throw new ServerErrorException(HttpURLConnection.HTTP_INTERNAL_ERROR, errorMsg); + } + throw new ServerErrorException(HttpURLConnection.HTTP_INTERNAL_ERROR, + "Error getting container id via K8s API: " + e.getMessage()); + } + } + + static String getContainerIdViaK8sApi(final KubernetesClient client, final String podName, + final String containerNameEnvVarValue) throws KubernetesClientException, IllegalStateException { + LOG.info("get container id via K8s API"); + try { + final String namespace = Optional.ofNullable(client.getNamespace()).orElse("default"); + final Pod pod = client.pods().inNamespace(namespace).withName(podName).get(); + if (pod == null) { + throw new KubernetesClientException("application pod not found in Kubernetes namespace " + namespace); + } + final List containerStatuses = PodStatusUtil.getContainerStatus(pod).stream() + .filter(KubernetesContainerInfoProvider::isContainerRunning).toList(); + if (containerStatuses.isEmpty()) { + LOG.debug("got empty container statuses list"); + throw new KubernetesClientException( + "no running container found in pod %s, namespace %s".formatted(podName, namespace)); + } + final ContainerStatus foundContainerStatus; + if (containerStatuses.size() > 1) { + final String foundContainerNames = containerStatuses.stream().map(ContainerStatus::getName) + .collect(Collectors.joining(", ")); + if (containerNameEnvVarValue == null) { + LOG.error( + "can't get container id: found multiple running containers, but {} env var is not set " + + "to specify which container to use; found containers [{}] in pod {}", + KUBERNETES_CONTAINER_NAME_ENV_VAR, foundContainerNames, podName); + throw new IllegalStateException( + ("can't get container id via K8s API: multiple running containers found; " + + "the %s env variable needs to be set for the container this application is running in, " + + "having the container name as value") + .formatted(KUBERNETES_CONTAINER_NAME_ENV_VAR)); + } + LOG.info("multiple running containers found: {}", foundContainerNames); + LOG.info("using container name {} (derived from env var {}) to determine container id", + containerNameEnvVarValue, KUBERNETES_CONTAINER_NAME_ENV_VAR); + foundContainerStatus = containerStatuses.stream() + .filter(status -> status.getName().equals(containerNameEnvVarValue)) + .findFirst() + .orElseThrow(() -> new KubernetesClientException( + "no running container with name %s found in pod %s, namespace %s" + .formatted(containerNameEnvVarValue, podName, namespace))); + } else { + foundContainerStatus = containerStatuses.get(0); + } + String containerId = foundContainerStatus.getContainerID(); + // remove container runtime prefix (e.g. "containerd://") + final int delimIdx = containerId.lastIndexOf("://"); + if (delimIdx > -1) { + containerId = containerId.substring(delimIdx + 3); + } + LOG.info("got container id via K8s API: {}", containerId); + return containerId; + + } catch (final KubernetesClientException e) { + // rethrow error concerning missing RBAC role assignment as IllegalStateException to skip retry + // Error message looks like this: + // Forbidden!Configured service account doesn't have access. Service account may have been revoked. pods "XXX" is forbidden: + // User "XXX" cannot get resource "pods" in API group "" in the namespace "hono". + if (e.getMessage().contains("orbidden")) { + LOG.error("Error getting container id via K8s API: \n{}", e.getMessage()); + throw new IllegalStateException("error getting container id via K8s API: " + + "application pod needs service account with role binding allowing 'get' on 'pods' resource"); + } + LOG.error("Error getting container id via K8s API", e); + throw e; + } + } + + private static boolean isContainerRunning(final ContainerStatus containerStatus) { + return containerStatus.getState() != null + && containerStatus.getState().getRunning() != null; + } +} diff --git a/clients/command/src/main/java/org/eclipse/hono/client/command/ProtocolAdapterCommandConsumerFactoryImpl.java b/clients/command/src/main/java/org/eclipse/hono/client/command/ProtocolAdapterCommandConsumerFactoryImpl.java index c7db5077df..8a9acd6c47 100644 --- a/clients/command/src/main/java/org/eclipse/hono/client/command/ProtocolAdapterCommandConsumerFactoryImpl.java +++ b/clients/command/src/main/java/org/eclipse/hono/client/command/ProtocolAdapterCommandConsumerFactoryImpl.java @@ -19,6 +19,8 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; import java.util.function.Function; @@ -35,9 +37,13 @@ import io.opentracing.SpanContext; import io.vertx.core.CompositeFuture; +import io.vertx.core.Context; import io.vertx.core.Future; +import io.vertx.core.Promise; import io.vertx.core.Vertx; +import io.vertx.core.json.JsonObject; import io.vertx.ext.healthchecks.HealthCheckHandler; +import io.vertx.ext.healthchecks.Status; /** * A Protocol Adapter factory for creating consumers of command messages. @@ -51,31 +57,41 @@ public class ProtocolAdapterCommandConsumerFactoryImpl implements ProtocolAdapte private static final AtomicInteger ADAPTER_INSTANCE_ID_COUNTER = new AtomicInteger(); - /** - * Identifier that has to be unique to this factory instance. - * Will be used to represent the protocol adapter (verticle) instance that this factory instance is used in, - * when registering command handlers with the command router service client. - */ - private final String adapterInstanceId; + private final Vertx vertx; + private final int adapterInstanceIdCounterValue; + private final String adapterName; private final CommandHandlers commandHandlers = new CommandHandlers(); private final CommandRouterClient commandRouterClient; private final List internalCommandConsumers = new ArrayList<>(); + private final AtomicBoolean stopCalled = new AtomicBoolean(); private int maxTenantIdsPerRequest = 100; + private KubernetesContainerInfoProvider kubernetesContainerInfoProvider = KubernetesContainerInfoProvider.getInstance(); + private final List> internalCommandConsumerSuppliers = new ArrayList<>(); + private HealthCheckHandler readinessHandler; + /** + * Identifier that has to be unique to this factory instance. + * Will be used to represent the protocol adapter (verticle) instance that this factory instance is used in, + * when registering command handlers with the command router service client. + */ + private String adapterInstanceId; + private String startFailureMessage; /** * Creates a new factory. * + * @param vertx The Vert.x instance to use. * @param commandRouterClient The client to use for accessing the command router service. * @param adapterName The name of the protocol adapter. * @throws NullPointerException if any of the parameters is {@code null}. */ - public ProtocolAdapterCommandConsumerFactoryImpl(final CommandRouterClient commandRouterClient, final String adapterName) { + public ProtocolAdapterCommandConsumerFactoryImpl(final Vertx vertx, final CommandRouterClient commandRouterClient, + final String adapterName) { + this.vertx = Objects.requireNonNull(vertx); this.commandRouterClient = Objects.requireNonNull(commandRouterClient); - Objects.requireNonNull(adapterName); + this.adapterName = Objects.requireNonNull(adapterName); - this.adapterInstanceId = CommandRoutingUtil.getNewAdapterInstanceId(adapterName, - ADAPTER_INSTANCE_ID_COUNTER.getAndIncrement()); + this.adapterInstanceIdCounterValue = ADAPTER_INSTANCE_ID_COUNTER.getAndIncrement(); if (commandRouterClient instanceof ConnectionLifecycle) { ((ConnectionLifecycle) commandRouterClient).addReconnectListener(con -> reenableCommandRouting()); } @@ -85,6 +101,10 @@ void setMaxTenantIdsPerRequest(final int count) { this.maxTenantIdsPerRequest = count; } + void setKubernetesContainerInfoProvider(final KubernetesContainerInfoProvider kubernetesContainerInfoProvider) { + this.kubernetesContainerInfoProvider = kubernetesContainerInfoProvider; + } + private void reenableCommandRouting() { final List tenantIds = commandHandlers.getCommandHandlers().stream() .map(CommandHandlerWrapper::getTenantId) @@ -115,10 +135,7 @@ private void reenableCommandRouting() { */ public void registerInternalCommandConsumer( final BiFunction internalCommandConsumerSupplier) { - final InternalCommandConsumer consumer = internalCommandConsumerSupplier.apply(adapterInstanceId, - commandHandlers); - LOG.info("register internal command consumer {}", consumer.getClass().getSimpleName()); - internalCommandConsumers.add(consumer); + this.internalCommandConsumerSuppliers.add(internalCommandConsumerSupplier); } /** @@ -131,18 +148,58 @@ public void registerInternalCommandConsumer( */ @Override public Future start() { - @SuppressWarnings("rawtypes") - final List futures = internalCommandConsumers.stream() - .map(Lifecycle::start) - .collect(Collectors.toList()); - if (futures.isEmpty()) { - return Future.failedFuture("no command consumer registered"); + if (internalCommandConsumerSuppliers.isEmpty()) { + startFailureMessage = "no command consumer registered"; + LOG.error("cannot start, {}", startFailureMessage); + return Future.failedFuture(startFailureMessage); } - return CompositeFuture.all(futures).mapEmpty(); + return getK8sContainerId(1) + .compose(containerId -> { + adapterInstanceId = CommandRoutingUtil.getNewAdapterInstanceId(adapterName, containerId, + adapterInstanceIdCounterValue); + internalCommandConsumerSuppliers.stream() + .map(sup -> sup.apply(adapterInstanceId, commandHandlers)) + .forEach(consumer -> { + LOG.info("created internal command consumer {}", consumer.getClass().getSimpleName()); + internalCommandConsumers.add(consumer); + Optional.ofNullable(readinessHandler).ifPresent(consumer::registerReadinessChecks); + }); + internalCommandConsumerSuppliers.clear(); + readinessHandler = null; + @SuppressWarnings("rawtypes") + final List futures = internalCommandConsumers.stream() + .map(Lifecycle::start) + .collect(Collectors.toList()); + if (futures.isEmpty()) { + return Future.failedFuture("no command consumer registered"); + } + return CompositeFuture.all(futures).mapEmpty(); + }) + .recover(thr -> { + startFailureMessage = thr.getMessage(); + return Future.failedFuture(thr); + }).mapEmpty(); + } + + private Future getK8sContainerId(final int attempt) { + final Context context = vertx.getOrCreateContext(); + return kubernetesContainerInfoProvider.getContainerId(context) + .recover(thr -> { + if (thr instanceof IllegalStateException || stopCalled.get()) { + return Future.failedFuture(thr); + } + LOG.info("attempt {} to get K8s container id failed, trying again...", attempt); + final Promise containerIdPromise = Promise.promise(); + context.runOnContext(action -> getK8sContainerId(attempt + 1).onComplete(containerIdPromise)); + return containerIdPromise.future(); + }); } @Override public Future stop() { + if (!stopCalled.compareAndSet(false, true)) { + return Future.succeededFuture(); + } @SuppressWarnings("rawtypes") final List futures = internalCommandConsumers.stream() .map(Lifecycle::stop) @@ -152,7 +209,26 @@ public Future stop() { @Override public void registerReadinessChecks(final HealthCheckHandler readinessHandler) { - internalCommandConsumers.forEach(consumer -> consumer.registerReadinessChecks(readinessHandler)); + if (!internalCommandConsumers.isEmpty()) { + LOG.warn("registerReadinessChecks expected to be called before start()"); + internalCommandConsumers.forEach(consumer -> consumer.registerReadinessChecks(readinessHandler)); + return; + } + this.readinessHandler = readinessHandler; + readinessHandler.register("command-consumer-factory", 1000, this::checkIfInternalCommandConsumersCreated); + } + + private void checkIfInternalCommandConsumersCreated(final Promise status) { + if (internalCommandConsumers.isEmpty() || startFailureMessage != null) { + final JsonObject data = new JsonObject(); + if (startFailureMessage != null) { + LOG.error("failed to start command consumer factory: {}", startFailureMessage); + data.put("status", "startup of command consumer factory failed, check logs for details"); + } + status.tryComplete(Status.KO(data)); + } else { + status.tryComplete(Status.OK()); + } } @Override public void registerLivenessChecks(final HealthCheckHandler livenessHandler) { @@ -201,6 +277,9 @@ private Future doCreateCommandConsumer( final Function> commandHandler, final Duration lifespan, final SpanContext context) { + if (adapterInstanceId == null) { + return Future.failedFuture("not started yet"); + } // lifespan greater than what can be expressed in nanoseconds (i.e. 292 years) is considered unlimited, // preventing ArithmeticExceptions down the road final Duration sanitizedLifespan = lifespan == null || lifespan.isNegative() @@ -247,6 +326,9 @@ private Future removeCommandConsumer( final Instant lifespanStart, final SpanContext onCloseSpanContext) { + if (adapterInstanceId == null) { + return Future.failedFuture("not started yet"); + } final String tenantId = commandHandlerWrapper.getTenantId(); final String deviceId = commandHandlerWrapper.getDeviceId(); diff --git a/clients/command/src/test/java/org/eclipse/hono/client/command/KubernetesContainerInfoProviderTest.java b/clients/command/src/test/java/org/eclipse/hono/client/command/KubernetesContainerInfoProviderTest.java new file mode 100644 index 0000000000..d5826a7626 --- /dev/null +++ b/clients/command/src/test/java/org/eclipse/hono/client/command/KubernetesContainerInfoProviderTest.java @@ -0,0 +1,153 @@ +/******************************************************************************* + * Copyright (c) 2023 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + *******************************************************************************/ + +package org.eclipse.hono.client.command; + +import static org.junit.jupiter.api.Assertions.assertThrows; + +import static com.google.common.truth.Truth.assertThat; + +import java.util.List; +import java.util.UUID; + +import org.junit.jupiter.api.Test; + +import io.fabric8.kubernetes.api.model.ContainerStateBuilder; +import io.fabric8.kubernetes.api.model.ContainerStatus; +import io.fabric8.kubernetes.api.model.ContainerStatusBuilder; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodBuilder; +import io.fabric8.kubernetes.api.model.PodStatus; +import io.fabric8.kubernetes.api.model.PodStatusBuilder; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; +import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer; + +/** + * Unit tests for {@link KubernetesContainerInfoProvider}. + */ +@EnableKubernetesMockClient(https = false) +public class KubernetesContainerInfoProviderTest { + + KubernetesMockServer server; + /** + * This client uses the namespace "test" (see KubernetesMockServer#createClient()). + */ + KubernetesClient client; + + /** + * Verifies that getting the container id via the K8s API succeeds. + */ + @Test + public void testGetContainerIdViaK8sApi() { + final String podName = "testPod0"; + final String containerId = getRandomContainerId(); + final String containerIdWithPrefix = "containerd://" + containerId; + final String containerNameEnvVarValue = null; // should not be needed in this test (only one running container) + + // GIVEN a Kubernetes API service mock returning a pod with a running container + final ContainerStatus containerStatus = createRunningContainerStatus(containerIdWithPrefix, "testContainer0"); + final Pod pod = createPod(podName, List.of(containerStatus)); + server.expect() + .withPath("/api/v1/namespaces/test/pods/" + podName) + .andReturn(200, pod) + .once(); + // WHEN invoking getContainerIdViaK8sApi + final String extractedContainerId = KubernetesContainerInfoProvider.getContainerIdViaK8sApi(client, podName, + containerNameEnvVarValue); + // THEN the expected id was returned + assertThat(extractedContainerId).isEqualTo(containerId); + } + + /** + * Verifies that getting the container id via the K8s API succeeds when there are multiple running pods. + */ + @Test + public void testGetContainerIdViaK8sApiWithMultipleRunningContainers() { + final String podName = "testPod0"; + final String containerId1WithPrefix = "containerd://" + getRandomContainerId(); + final String containerId2 = getRandomContainerId(); + final String containerId2WithPrefix = "containerd://" + containerId2; + final String containerName1 = "testContainer1"; + final String containerName2 = "testContainer2"; + final ContainerStatus containerStatus1 = createRunningContainerStatus(containerId1WithPrefix, containerName1); + final ContainerStatus containerStatus2 = createRunningContainerStatus(containerId2WithPrefix, containerName2); + + // GIVEN a Kubernetes API service mock returning a pod with multiple running containers + final Pod pod = createPod(podName, List.of(containerStatus1, containerStatus2)); + server.expect() + .withPath("/api/v1/namespaces/test/pods/" + podName) + .andReturn(200, pod) + .once(); + // WHEN invoking getContainerIdViaK8sApi, setting "testContainer2" as the KUBERNETES_CONTAINER_NAME env var value + final String extractedContainerId = KubernetesContainerInfoProvider.getContainerIdViaK8sApi(client, podName, + containerName2); + // THEN the container id of "testContainer2" is returned + assertThat(extractedContainerId).isEqualTo(containerId2); + } + + /** + * Verifies that getting the container id via the K8s API fails if there are multiple running pods, but no + * environment variable is set to specify the container name. + */ + @Test + public void testGetContainerIdViaK8sApiWithMultipleRunningContainersButNoContainerNameEnvVar() { + final String podName = "testPod0"; + final String containerId1 = getRandomContainerId(); + final String containerId2 = getRandomContainerId(); + final String containerName1 = "testContainer1"; + final String containerName2 = "testContainer2"; + final ContainerStatus containerStatus1 = createRunningContainerStatus(containerId1, containerName1); + final ContainerStatus containerStatus2 = createRunningContainerStatus(containerId2, containerName2); + final String containerNameEnvVarValue = null; + + // GIVEN a Kubernetes API service mock returning a pod with multiple running containers + final Pod pod = createPod(podName, List.of(containerStatus1, containerStatus2)); + server.expect() + .withPath("/api/v1/namespaces/test/pods/" + podName) + .andReturn(200, pod) + .once(); + + // WHEN invoking getContainerIdViaK8sApi with a null KUBERNETES_CONTAINER_NAME env var value + // THEN an IllegalStateException is thrown + final IllegalStateException thrown = assertThrows( + IllegalStateException.class, + () -> KubernetesContainerInfoProvider.getContainerIdViaK8sApi(client, podName, containerNameEnvVarValue)); + assertThat(thrown.getMessage()).contains("multiple running containers"); + } + + private static ContainerStatus createRunningContainerStatus(final String containerIdWithPrefix, + final String containerName) { + return new ContainerStatusBuilder() + .withContainerID(containerIdWithPrefix) + .withName(containerName) + .withState(new ContainerStateBuilder().withNewRunning().endRunning().build()) + .build(); + } + + private Pod createPod(final String podName, final List containerStatuses) { + final PodStatus podStatus = new PodStatusBuilder() + .withContainerStatuses(containerStatuses) + .build(); + return new PodBuilder() + .withNewMetadata() + .withName(podName) + .endMetadata() + .withStatus(podStatus) + .build(); + } + + private static String getRandomContainerId() { + return UUID.randomUUID().toString().concat(UUID.randomUUID().toString()).replaceAll("-", ""); + } +} diff --git a/clients/command/src/test/java/org/eclipse/hono/client/command/ProtocolAdapterCommandConsumerFactoryTest.java b/clients/command/src/test/java/org/eclipse/hono/client/command/ProtocolAdapterCommandConsumerFactoryTest.java index 02a9785842..74a3737542 100644 --- a/clients/command/src/test/java/org/eclipse/hono/client/command/ProtocolAdapterCommandConsumerFactoryTest.java +++ b/clients/command/src/test/java/org/eclipse/hono/client/command/ProtocolAdapterCommandConsumerFactoryTest.java @@ -1,5 +1,5 @@ /** - * Copyright (c) 2021, 2022 Contributors to the Eclipse Foundation + * Copyright (c) 2021, 2023 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -28,15 +28,19 @@ import java.time.Duration; import java.util.ArrayList; import java.util.List; +import java.util.UUID; +import java.util.function.BiFunction; import org.eclipse.hono.client.amqp.connection.ConnectionLifecycle; import org.eclipse.hono.client.amqp.connection.HonoConnection; import org.eclipse.hono.client.amqp.connection.ReconnectListener; +import org.eclipse.hono.test.VertxMockSupport; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import io.vertx.core.Future; +import io.vertx.core.Vertx; /** * Tests verifying behavior of {@link ProtocolAdapterCommandConsumerFactoryImpl}. @@ -45,17 +49,30 @@ public class ProtocolAdapterCommandConsumerFactoryTest { private CommandRouterClient commandRouterClient; + private KubernetesContainerInfoProvider kubernetesContainerInfoProvider; + private BiFunction internalCommandConsumerSupplier; private ProtocolAdapterCommandConsumerFactoryImpl factory; /** * Sets up the fixture. */ + @SuppressWarnings("unchecked") @BeforeEach void setUp() { commandRouterClient = mock(CommandRouterClient.class, withSettings().extraInterfaces(ConnectionLifecycle.class)); when(commandRouterClient.registerCommandConsumer(anyString(), anyString(), anyBoolean(), anyString(), any(Duration.class), any())) .thenReturn(Future.succeededFuture()); - factory = new ProtocolAdapterCommandConsumerFactoryImpl(commandRouterClient, "test-adapter"); + kubernetesContainerInfoProvider = mock(KubernetesContainerInfoProvider.class); + when(kubernetesContainerInfoProvider.getContainerId(any())).thenReturn(Future.succeededFuture(null)); + final Vertx vertx = mock(Vertx.class); + VertxMockSupport.executeBlockingCodeImmediately(vertx); + factory = new ProtocolAdapterCommandConsumerFactoryImpl(vertx, commandRouterClient, "test-adapter"); + factory.setKubernetesContainerInfoProvider(kubernetesContainerInfoProvider); + internalCommandConsumerSupplier = mock(BiFunction.class); + final InternalCommandConsumer internalCommandConsumer = mock(InternalCommandConsumer.class); + when(internalCommandConsumer.start()).thenReturn(Future.succeededFuture()); + when(internalCommandConsumerSupplier.apply(anyString(), any())).thenReturn(internalCommandConsumer); + factory.registerInternalCommandConsumer(internalCommandConsumerSupplier); } @SuppressWarnings("unchecked") @@ -67,6 +84,7 @@ void testFactoryReenablesCommandRouting() { final ArgumentCaptor> reconnectListener = ArgumentCaptor.forClass(ReconnectListener.class); verify(conLifecycle).addReconnectListener(reconnectListener.capture()); factory.setMaxTenantIdsPerRequest(2); + factory.start(); factory.createCommandConsumer("tenant1", "device1", "adapter", true, ctx -> Future.succeededFuture(), Duration.ofMinutes(10), null); factory.createCommandConsumer("tenant2", "device2", "adapter", true, ctx -> Future.succeededFuture(), Duration.ofMinutes(10), null); @@ -89,4 +107,50 @@ void testFactoryReenablesCommandRouting() { }), any()); assertThat(enabledTenants).containsExactly("tenant1", "tenant2", "tenant3"); } + + /** + * Verifies that if the command consumer factory is used within a Kubernetes container, the internal command + * consumer is initialized with an adapter instance id containing the container id. + */ + @Test + void testFactoryCreatesInternalConsumersWithAdapterInstanceId() { + final String containerId = getTestContainerId(); + final String shortContainerId = containerId.substring(0, 12); + // GIVEN a scenario where the application runs in a Kubernetes container + when(kubernetesContainerInfoProvider.getContainerId(any())).thenReturn(Future.succeededFuture(containerId)); + // WHEN starting the command consumer factory + factory.start(); + + // THEN the internal command consumer has been initialized with an adapter instance id containing the container id + verify(internalCommandConsumerSupplier).apply(argThat(adapterInstanceId -> { + return adapterInstanceId.contains(shortContainerId); + }), any()); + } + + /** + * Verifies that if the command consumer factory is used within a Kubernetes container, there are multiple + * attempts being made to determine the container id on startup. + */ + @SuppressWarnings("unchecked") + @Test + void testFactoryRetriesGetContainerId() { + final String containerId = getTestContainerId(); + final String shortContainerId = containerId.substring(0, 12); + // GIVEN a scenario where the application runs in a Kubernetes container, but the container id can only be + // determined on the 3rd attempt + when(kubernetesContainerInfoProvider.getContainerId(any())).thenReturn( + Future.failedFuture("some error"), + Future.failedFuture("some error"), + Future.succeededFuture(containerId)); + // WHEN starting the command consumer factory + factory.start(); + // THEN the internal command consumer has been initialized with an adapter instance id containing the container id + verify(internalCommandConsumerSupplier).apply(argThat(adapterInstanceId -> { + return adapterInstanceId.contains(shortContainerId); + }), any()); + } + + private static String getTestContainerId() { + return UUID.randomUUID().toString().concat(UUID.randomUUID().toString()).replaceAll("-", ""); + } }