diff --git a/service-discovery/kubernetes/src/main/java/io/smallrye/stork/servicediscovery/kubernetes/KubernetesServiceDiscovery.java b/service-discovery/kubernetes/src/main/java/io/smallrye/stork/servicediscovery/kubernetes/KubernetesServiceDiscovery.java index 98e4f49d..8e5fec1a 100644 --- a/service-discovery/kubernetes/src/main/java/io/smallrye/stork/servicediscovery/kubernetes/KubernetesServiceDiscovery.java +++ b/service-discovery/kubernetes/src/main/java/io/smallrye/stork/servicediscovery/kubernetes/KubernetesServiceDiscovery.java @@ -20,12 +20,15 @@ import io.fabric8.kubernetes.api.model.EndpointPort; import io.fabric8.kubernetes.api.model.EndpointSubset; import io.fabric8.kubernetes.api.model.Endpoints; +import io.fabric8.kubernetes.api.model.EndpointsList; import io.fabric8.kubernetes.api.model.ObjectMeta; import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.client.Config; import io.fabric8.kubernetes.client.ConfigBuilder; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.KubernetesClientBuilder; +import io.fabric8.kubernetes.client.dsl.AnyNamespaceOperation; +import io.fabric8.kubernetes.client.dsl.Resource; import io.fabric8.kubernetes.client.informers.ResourceEventHandler; import io.smallrye.mutiny.Uni; import io.smallrye.stork.api.Metadata; @@ -84,7 +87,13 @@ public KubernetesServiceDiscovery(String serviceName, KubernetesConfiguration co this.client = new KubernetesClientBuilder().withConfig(k8sConfig).build(); this.vertx = vertx; this.secure = isSecure(config); - client.endpoints().inform(new ResourceEventHandler() { + AnyNamespaceOperation> endpointsOperation; + if (allNamespaces) { + endpointsOperation = client.endpoints().inAnyNamespace(); + } else { + endpointsOperation = client.endpoints().inNamespace(namespace); + } + endpointsOperation.inform(new ResourceEventHandler() { @Override public void onAdd(Endpoints obj) { LOGGER.info("Endpoint added: {}", obj.getMetadata().getName()); diff --git a/service-discovery/kubernetes/src/test/java/io/smallrye/stork/servicediscovery/kubernetes/KubernetesServiceDiscoveryCDITest.java b/service-discovery/kubernetes/src/test/java/io/smallrye/stork/servicediscovery/kubernetes/KubernetesServiceDiscoveryCDITest.java index 4fef5c93..78434cab 100644 --- a/service-discovery/kubernetes/src/test/java/io/smallrye/stork/servicediscovery/kubernetes/KubernetesServiceDiscoveryCDITest.java +++ b/service-discovery/kubernetes/src/test/java/io/smallrye/stork/servicediscovery/kubernetes/KubernetesServiceDiscoveryCDITest.java @@ -7,6 +7,7 @@ import static org.awaitility.Awaitility.await; import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -24,14 +25,18 @@ import org.jboss.weld.junit5.WeldSetup; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledOnOs; +import org.junit.jupiter.api.condition.OS; import org.junit.jupiter.api.extension.ExtendWith; import io.fabric8.kubernetes.api.model.EndpointAddress; import io.fabric8.kubernetes.api.model.EndpointAddressBuilder; +import io.fabric8.kubernetes.api.model.EndpointPort; import io.fabric8.kubernetes.api.model.EndpointPortBuilder; import io.fabric8.kubernetes.api.model.EndpointSubsetBuilder; import io.fabric8.kubernetes.api.model.Endpoints; import io.fabric8.kubernetes.api.model.EndpointsBuilder; +import io.fabric8.kubernetes.api.model.EndpointsListBuilder; import io.fabric8.kubernetes.api.model.ObjectReference; import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.PodBuilder; @@ -48,6 +53,7 @@ import io.smallrye.stork.test.StorkTestUtils; import io.smallrye.stork.test.TestConfigProviderBean; +@DisabledOnOs(OS.WINDOWS) @ExtendWith(WeldJunit5Extension.class) @EnableKubernetesMockClient(crud = true) public class KubernetesServiceDiscoveryCDITest { @@ -112,6 +118,44 @@ void shouldGetServiceFromK8sDefaultNamespace() { assertThat(instances.get()).allSatisfy(si -> assertThat(si.isSecure()).isFalse()); } + @Test + void shouldGetServiceFromK8sWithApplicationNameConfig() { + config.addServiceConfig("svc", null, "kubernetes", null, + null, Map.of("k8s-host", k8sMasterUrl, "k8s-namespace", defaultNamespace, "application", "greetingApp"), null); + Stork stork = StorkTestUtils.getNewStorkInstance(); + + String serviceName = "svc"; + String[] ips = { "10.96.96.231", "10.96.96.232", "10.96.96.233" }; + + registerKubernetesResources("greetingApp", defaultNamespace, ips); + + AtomicReference> instances = new AtomicReference<>(); + + Service service = stork.getService(serviceName); + service.getServiceDiscovery().getServiceInstances() + .onFailure().invoke(th -> fail("Failed to get service instances from Kubernetes", th)) + .subscribe().with(instances::set); + + await().atMost(Duration.ofSeconds(5)) + .until(() -> instances.get() != null); + + assertThat(instances.get()).hasSize(3); + assertThat(instances.get().stream().map(ServiceInstance::getPort)).allMatch(p -> p == 8080); + assertThat(instances.get().stream().map(ServiceInstance::getHost)).containsExactlyInAnyOrder("10.96.96.231", + "10.96.96.232", "10.96.96.233"); + for (ServiceInstance serviceInstance : instances.get()) { + Map labels = serviceInstance.getLabels(); + assertThat(labels).contains(entry("app.kubernetes.io/name", "greetingApp"), + entry("app.kubernetes.io/version", "1.0"), + entry("ui", "ui-" + ipAsSuffix(serviceInstance.getHost()))); + } + instances.get().stream().map(ServiceInstance::getMetadata).forEach(metadata -> { + Metadata k8sMetadata = (Metadata) metadata; + assertThat(k8sMetadata.getMetadata()).containsKey(META_K8S_SERVICE_ID); + }); + assertThat(instances.get()).allSatisfy(si -> assertThat(si.isSecure()).isFalse()); + } + @Test void shouldGetServiceFromK8sDefaultNamespaceUsingProgrammaticAPI() { Stork stork = StorkTestUtils.getNewStorkInstance(); @@ -265,6 +309,96 @@ void shouldGetServiceFromSpecificNamespace() { } } + @Test + void shouldGetServiceUsingFirstPortWhenMultiplePortsFromSpecificNamespace() { + String serviceName = "svc"; + String specificNs = "ns1"; + + config.addServiceConfig(serviceName, null, "kubernetes", null, + null, Map.of("k8s-host", k8sMasterUrl, "k8s-namespace", "ns1"), null); + Stork stork = StorkTestUtils.getNewStorkInstance(); + + String[] ips = new String[] { "10.96.96.231", "10.96.96.232", "10.96.96.233" }; + EndpointPort[] ports = new EndpointPort[] { + new EndpointPortBuilder().withName("http1").withPort(8080).withProtocol("TCP").build(), + new EndpointPortBuilder().withName("http2").withPort(8081).withProtocol("TCP").build() }; + buildAndRegisterKubernetesService(serviceName, specificNs, true, ports, ips); + Arrays.stream(ips).forEach(ip -> buildAndRegisterBackendPod(serviceName, specificNs, true, ip)); + + AtomicReference> instances = new AtomicReference<>(); + + Service service = stork.getService(serviceName); + service.getServiceDiscovery().getServiceInstances() + .onFailure().invoke(th -> fail("Failed to get service instances from Kubernetes", th)) + .subscribe().with(instances::set); + + await().atMost(Duration.ofSeconds(5)) + .until(() -> instances.get() != null); + + assertThat(instances.get()).hasSize(3); + assertThat(instances.get().stream().map(ServiceInstance::getPort)).allMatch(p -> p == 8080); + assertThat(instances.get().stream().map(ServiceInstance::getHost)).containsExactlyInAnyOrder("10.96.96.231", + "10.96.96.232", "10.96.96.233"); + instances.get().stream().map(ServiceInstance::getLabels) + .forEach(serviceInstanceLabels -> assertThat(serviceInstanceLabels) + .contains(entry("app.kubernetes.io/name", "svc"), entry("app.kubernetes.io/version", "1.0"))); + instances.get().stream().map(ServiceInstance::getMetadata).forEach(metadata -> { + Metadata k8sMetadata = (Metadata) metadata; + assertThat(k8sMetadata.getMetadata()).containsKey(META_K8S_SERVICE_ID); + }); + for (ServiceInstance serviceInstance : instances.get()) { + Map labels = serviceInstance.getLabels(); + assertThat(labels).contains(entry("app.kubernetes.io/name", "svc"), + entry("app.kubernetes.io/version", "1.0"), + entry("ui", "ui-" + ipAsSuffix(serviceInstance.getHost()))); + } + } + + @Test + void shouldGetServiceUsingSelectedPortNameWhenMultiplePortsFromSpecificNamespace() { + String serviceName = "svc"; + String specificNs = "ns1"; + + config.addServiceConfig(serviceName, null, "kubernetes", null, + null, Map.of("k8s-host", k8sMasterUrl, "k8s-namespace", "ns1", "port-name", "http1"), null); + Stork stork = StorkTestUtils.getNewStorkInstance(); + + String[] ips = new String[] { "10.96.96.231", "10.96.96.232", "10.96.96.233" }; + EndpointPort[] ports = new EndpointPort[] { + new EndpointPortBuilder().withName("http1").withPort(8080).withProtocol("TCP").build(), + new EndpointPortBuilder().withName("http2").withPort(8081).withProtocol("TCP").build() }; + buildAndRegisterKubernetesService(serviceName, specificNs, true, ports, ips); + Arrays.stream(ips).forEach(ip -> buildAndRegisterBackendPod(serviceName, specificNs, true, ip)); + + AtomicReference> instances = new AtomicReference<>(); + + Service service = stork.getService(serviceName); + service.getServiceDiscovery().getServiceInstances() + .onFailure().invoke(th -> fail("Failed to get service instances from Kubernetes", th)) + .subscribe().with(instances::set); + + await().atMost(Duration.ofSeconds(5)) + .until(() -> instances.get() != null); + + assertThat(instances.get()).hasSize(3); + assertThat(instances.get().stream().map(ServiceInstance::getPort)).allMatch(p -> p == 8080); + assertThat(instances.get().stream().map(ServiceInstance::getHost)).containsExactlyInAnyOrder("10.96.96.231", + "10.96.96.232", "10.96.96.233"); + instances.get().stream().map(ServiceInstance::getLabels) + .forEach(serviceInstanceLabels -> assertThat(serviceInstanceLabels) + .contains(entry("app.kubernetes.io/name", "svc"), entry("app.kubernetes.io/version", "1.0"))); + instances.get().stream().map(ServiceInstance::getMetadata).forEach(metadata -> { + Metadata k8sMetadata = (Metadata) metadata; + assertThat(k8sMetadata.getMetadata()).containsKey(META_K8S_SERVICE_ID); + }); + for (ServiceInstance serviceInstance : instances.get()) { + Map labels = serviceInstance.getLabels(); + assertThat(labels).contains(entry("app.kubernetes.io/name", "svc"), + entry("app.kubernetes.io/version", "1.0"), + entry("ui", "ui-" + ipAsSuffix(serviceInstance.getHost()))); + } + } + @Test void shouldGetServiceFromAllNamespace() { @@ -377,21 +511,71 @@ void shouldPreserveIdsOnRefetch() throws InterruptedException { } } + @Test + void shouldGetInstancesFromCache() throws InterruptedException { + String serviceName = "svc"; + + //Recording k8s cluster calls and build a endpoints as response + AtomicInteger serverHit = new AtomicInteger(0); + server.expect().get().withPath("/api/v1/namespaces/test/endpoints?fieldSelector=metadata.name%3Dsvc") + .andReply(200, r -> { + serverHit.incrementAndGet(); + List endpointsList = new ArrayList<>(); + endpointsList.add(registerKubernetesResources(serviceName, defaultNamespace, "10.96.96.231", "10.96.96.232", + "10.96.96.233")); + return new EndpointsListBuilder().withItems(endpointsList).build(); + }).always(); + + config.addServiceConfig(serviceName, null, "kubernetes", null, + null, Map.of("k8s-host", k8sMasterUrl, "k8s-namespace", defaultNamespace, "refresh-period", "3"), null); + Stork stork = StorkTestUtils.getNewStorkInstance(); + + AtomicReference> instances = new AtomicReference<>(); + + Service service = stork.getService(serviceName); + service.getServiceDiscovery().getServiceInstances() + .onFailure().invoke(th -> fail("Failed to get service instances from Kubernetes", th)) + .subscribe().with(instances::set); + + await().atMost(Duration.ofSeconds(5)) + .until(() -> instances.get() != null); + + assertThat(serverHit.get()).isEqualTo(1); + assertThat(instances.get()).hasSize(3); + assertThat(instances.get().stream().map(ServiceInstance::getPort)).allMatch(p -> p == 8080); + assertThat(instances.get().stream().map(ServiceInstance::getHost)).containsExactlyInAnyOrder("10.96.96.231", + "10.96.96.232", "10.96.96.233"); + + //second try to get instances, instances should be fetched from cache, cluster calls should be still 1 + service.getServiceDiscovery().getServiceInstances() + .onFailure().invoke(th -> fail("Failed to get service instances from Kubernetes", th)) + .subscribe().with(instances::set); + + await().atMost(Duration.ofSeconds(5)) + .until(() -> instances.get() != null); + + assertThat(serverHit.get()).isEqualTo(1); + assertThat(instances.get()).hasSize(3); + assertThat(instances.get().stream().map(ServiceInstance::getPort)).allMatch(p -> p == 8080); + assertThat(instances.get().stream().map(ServiceInstance::getHost)).containsExactlyInAnyOrder("10.96.96.231", + "10.96.96.232", "10.96.96.233"); + + } + @Test void shouldFetchInstancesFromTheClusterWhenCacheIsInvalidated() throws InterruptedException { - // Given a service with 3 instances registered in the cluster + // Given a service with 3 instances registered in the cluster, in `test` namespace // Stork gather the cache from the cluster // When the endpoints are removed (this invalidates the cache) // Stork is called to get service instances again // Stork contacts the cluster to get the instances : it gets 0 of them + String serviceName = "svc"; config.addServiceConfig("svc", null, "kubernetes", null, Map.of("k8s-host", k8sMasterUrl, "k8s-namespace", defaultNamespace, "refresh-period", "3")); Stork stork = StorkTestUtils.getNewStorkInstance(); - String serviceName = "svc"; - registerKubernetesResources(serviceName, defaultNamespace, "10.96.96.231", "10.96.96.232", "10.96.96.233"); AtomicReference> instances = new AtomicReference<>(); @@ -409,9 +593,7 @@ void shouldFetchInstancesFromTheClusterWhenCacheIsInvalidated() throws Interrupt assertThat(instances.get().stream().map(ServiceInstance::getHost)).containsExactlyInAnyOrder("10.96.96.231", "10.96.96.232", "10.96.96.233"); - client.endpoints().withName(serviceName).delete(); - - Thread.sleep(5000); + client.endpoints().inNamespace(defaultNamespace).withName(serviceName).delete(); service.getServiceDiscovery().getServiceInstances() .onFailure().invoke(th -> fail("Failed to get service instances from Kubernetes", th)) @@ -424,31 +606,21 @@ void shouldFetchInstancesFromTheClusterWhenCacheIsInvalidated() throws Interrupt } @Test - void shouldFetchInstancesFromTheCache() throws InterruptedException { - - // Stork gathers the cache from the cluster - // Configure the mock cluster for recordings calls to a specific path - // Stork is called twice to get service instances - // Stork get the instances from the cache: assert that only 1 call to the cluster has been done + void shouldFetchInstancesFromAllNsWhenCacheIsInvalidated() throws InterruptedException { + // Given a service with 3 instances registered in the cluster in any namespace + // Stork gather the cache from the cluster + // When the endpoints are removed (this invalidates the cache) + // Stork is called to get service instances again + // Stork contacts the cluster to get the instances : it gets 0 of them String serviceName = "svc"; - String[] ips = { "10.96.96.231" }; - //Recording k8s cluster calls and build the endpoints as response - AtomicInteger serverHit = new AtomicInteger(0); - server.expect().get().withPath("/api/v1/namespaces/test/endpoints?fieldSelector=metadata.name%3Dsvc") - .andReply(200, r -> { - serverHit.incrementAndGet(); - Endpoints endpoints = buildAndRegisterKubernetesService(serviceName, defaultNamespace, true, ips); - Arrays.stream(ips).map(ip -> buildAndRegisterBackendPod(serviceName, defaultNamespace, true, ips[0])) - .collect(Collectors.toList()); - return endpoints; - }).always(); - - config.addServiceConfig("svc", null, "kubernetes", - null, Map.of("k8s-host", k8sMasterUrl, "k8s-namespace", defaultNamespace, "refresh-period", "3")); + config.addServiceConfig(serviceName, null, "kubernetes", null, + null, Map.of("k8s-host", k8sMasterUrl, "k8s-namespace", "all", "refresh-period", "3"), null); Stork stork = StorkTestUtils.getNewStorkInstance(); + registerKubernetesResources(serviceName, defaultNamespace, "10.96.96.231", "10.96.96.232", "10.96.96.233"); + AtomicReference> instances = new AtomicReference<>(); Service service = stork.getService(serviceName); @@ -459,23 +631,28 @@ void shouldFetchInstancesFromTheCache() throws InterruptedException { await().atMost(Duration.ofSeconds(5)) .until(() -> instances.get() != null); - assertThat(serverHit.get()).isEqualTo(1); + assertThat(instances.get()).hasSize(3); + assertThat(instances.get().stream().map(ServiceInstance::getPort)).allMatch(p -> p == 8080); + assertThat(instances.get().stream().map(ServiceInstance::getHost)).containsExactlyInAnyOrder("10.96.96.231", + "10.96.96.232", "10.96.96.233"); + + client.endpoints().inNamespace(defaultNamespace).delete(); - //second try to get instances, instances should be fetched from cache service.getServiceDiscovery().getServiceInstances() .onFailure().invoke(th -> fail("Failed to get service instances from Kubernetes", th)) .subscribe().with(instances::set); await().atMost(Duration.ofSeconds(5)) - .until(() -> instances.get() != null); + .until(() -> instances.get().isEmpty()); - assertThat(serverHit.get()).isEqualTo(1); + assertThat(instances.get()).hasSize(0); } - private void registerKubernetesResources(String serviceName, String namespace, String... ips) { + private Endpoints registerKubernetesResources(String serviceName, String namespace, String... ips) { Assert.checkNotNullParam("ips", ips); - buildAndRegisterKubernetesService(serviceName, namespace, true, ips); - Arrays.stream(ips).map(ip -> buildAndRegisterBackendPod(serviceName, namespace, true, ip)).collect(Collectors.toList()); + Endpoints endpoints = buildAndRegisterKubernetesService(serviceName, namespace, true, ips); + Arrays.stream(ips).forEach(ip -> buildAndRegisterBackendPod(serviceName, namespace, true, ip)); + return endpoints; } private Map mapHostnameToIds(List serviceInstances) { @@ -486,25 +663,31 @@ private Map mapHostnameToIds(List serviceInstance return result; } - private Endpoints buildAndRegisterKubernetesService(String serviceName, String namespace, boolean register, + private Endpoints buildAndRegisterKubernetesService(String applicationName, String namespace, boolean register, String... ipAdresses) { + EndpointPort[] ports = new EndpointPort[] { new EndpointPortBuilder().withPort(8080).withProtocol("TCP").build() }; + return buildAndRegisterKubernetesService(applicationName, namespace, register, ports, ipAdresses); + } + + private Endpoints buildAndRegisterKubernetesService(String applicationName, String namespace, boolean register, + EndpointPort[] ports, String... ipAdresses) { Map serviceLabels = new HashMap<>(); - serviceLabels.put("app.kubernetes.io/name", serviceName); + serviceLabels.put("app.kubernetes.io/name", applicationName); serviceLabels.put("app.kubernetes.io/version", "1.0"); List endpointAddresses = Arrays.stream(ipAdresses) .map(ipAddress -> { ObjectReference targetRef = new ObjectReference(null, null, "Pod", - serviceName + "-" + ipAsSuffix(ipAddress), namespace, null, UUID.randomUUID().toString()); + applicationName + "-" + ipAsSuffix(ipAddress), namespace, null, UUID.randomUUID().toString()); EndpointAddress endpointAddress = new EndpointAddressBuilder().withIp(ipAddress).withTargetRef(targetRef) .build(); return endpointAddress; }).collect(Collectors.toList()); Endpoints endpoint = new EndpointsBuilder() - .withNewMetadata().withName(serviceName).withLabels(serviceLabels).endMetadata() + .withNewMetadata().withName(applicationName).withLabels(serviceLabels).endMetadata() .addToSubsets(new EndpointSubsetBuilder().withAddresses(endpointAddresses) - .addToPorts(new EndpointPortBuilder().withPort(8080).withProtocol("TCP").build()) + .addToPorts(ports) .build()) .build(); diff --git a/service-discovery/kubernetes/src/test/java/io/smallrye/stork/servicediscovery/kubernetes/KubernetesServiceDiscoveryCacheTest.java b/service-discovery/kubernetes/src/test/java/io/smallrye/stork/servicediscovery/kubernetes/KubernetesServiceDiscoveryCacheTest.java deleted file mode 100644 index b9784fe6..00000000 --- a/service-discovery/kubernetes/src/test/java/io/smallrye/stork/servicediscovery/kubernetes/KubernetesServiceDiscoveryCacheTest.java +++ /dev/null @@ -1,257 +0,0 @@ -package io.smallrye.stork.servicediscovery.kubernetes; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.fail; -import static org.awaitility.Awaitility.await; - -import java.time.Duration; -import java.util.*; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; - -import org.hamcrest.Matchers; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import io.fabric8.kubernetes.api.model.*; -import io.fabric8.kubernetes.client.Config; -import io.fabric8.kubernetes.client.KubernetesClient; -import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; -import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer; -import io.smallrye.common.constraint.Assert; -import io.smallrye.stork.Stork; -import io.smallrye.stork.api.Service; -import io.smallrye.stork.api.ServiceInstance; -import io.smallrye.stork.test.StorkTestUtils; -import io.smallrye.stork.test.TestConfigProvider; - -@EnableKubernetesMockClient(crud = true) -public class KubernetesServiceDiscoveryCacheTest { - - KubernetesMockServer server; - - KubernetesClient client; - - String k8sMasterUrl; - String defaultNamespace; - - @BeforeEach - void setUp() { - TestConfigProvider.clear(); - System.setProperty(Config.KUBERNETES_TRUST_CERT_SYSTEM_PROPERTY, "true"); - k8sMasterUrl = client.getMasterUrl().toString(); - defaultNamespace = client.getNamespace(); - } - - // @Test - // void shouldFetchInstancesFromTheCache() throws InterruptedException { - // - // // Given a service with 3 instances registered in the cluster - // // Stork gather the cache from the cluster - // // Stork is called to get service instances again - // // Stork get the instances from the cache. - // - // TestConfigProvider.addServiceConfig("svc", null, "kubernetes", - // null, Map.of("k8s-host", k8sMasterUrl, "k8s-namespace", defaultNamespace)); - // Stork stork = StorkTestUtils.getNewStorkInstance(); - // - // String serviceName = "svc"; - // - // Endpoints endpoints = buildAndRegisterKubernetesService(serviceName, defaultNamespace, false, "10.96.96.231"); - // Pod pod = buildAndRegisterBackendPod(serviceName, defaultNamespace, false, "10.96.96.231"); - // - // server.expect().get().withPath("/api/v1/namespaces/test/pods/svc-109696231") - // .andReturn(HttpURLConnection.HTTP_OK, pod) - // .once(); - // - // server.expect().get().withPath("/api/v1/namespaces/test/endpoints?fieldSelector=metadata.name%3Dsvc") - // .andReturn(HttpURLConnection.HTTP_OK, new KubernetesListBuilder().addToItems(endpoints).build()).once(); - // - // AtomicReference> instances = new AtomicReference<>(); - // - // Service service = stork.getService(serviceName); - // service.getServiceDiscovery().getServiceInstances() - // .onFailure().invoke(th -> fail("Failed to get service instances from Kubernetes", th)) - // .subscribe().with(instances::set); - // - // await().atMost(Duration.ofSeconds(5)) - // .until(() -> instances.get() != null); - // - // assertThat(instances.get()).hasSize(1); - // assertThat(instances.get().stream().map(ServiceInstance::getPort)).allMatch(p -> p == 8080); - // assertThat(instances.get().stream().map(ServiceInstance::getHost)).contains("10.96.96.231"); - // - // Thread.sleep(5000); - // - // service.getServiceDiscovery().getServiceInstances() - // .onFailure().invoke(th -> fail("Failed to get service instances from Kubernetes", th)) - // .subscribe().with(instances::set); - // - // await().atMost(Duration.ofSeconds(5)) - // .until(() -> instances.get() != null); - // - // assertThat(instances.get()).hasSize(1); - // assertThat(instances.get().stream().map(ServiceInstance::getPort)).allMatch(p -> p == 8080); - // assertThat(instances.get().stream().map(ServiceInstance::getHost)).containsExactlyInAnyOrder("10.96.96.231"); - // for (ServiceInstance serviceInstance : instances.get()) { - // Map labels = serviceInstance.getLabels(); - // assertThat(labels).contains(entry("app.kubernetes.io/name", "svc"), - // entry("app.kubernetes.io/version", "1.0"), - // entry("ui", "ui-" + ipAsSuffix(serviceInstance.getHost()))); - // } - // - // } - - @Test - void shouldPreserveIdsOnRefetch() throws InterruptedException { - - // Given a service with 3 instances registered in the cluster - // Stork gathers the cache from the cluster - // When endpoints are recreated with same IP and port - // Stork is called to get service instances again - // Stork contacts the cluster to get the instances but it preserves the Stork service instances Id - - TestConfigProvider.addServiceConfig("svc", null, "kubernetes", null, - null, Map.of("k8s-host", k8sMasterUrl, "k8s-namespace", defaultNamespace, "refresh-period", "3"), null); - Stork stork = StorkTestUtils.getNewStorkInstance(); - - String serviceName = "svc"; - - registerKubernetesResources(serviceName, defaultNamespace, "10.96.96.231", "10.96.96.232", "10.96.96.233"); - - AtomicReference> instances = new AtomicReference<>(); - - Service service = stork.getService(serviceName); - service.getServiceDiscovery().getServiceInstances() - .onFailure().invoke(th -> fail("Failed to get service instances from Kubernetes", th)) - .subscribe().with(instances::set); - - await().atMost(Duration.ofSeconds(5)) - .until(() -> instances.get() != null); - - assertThat(instances.get()).hasSize(3); - assertThat(instances.get().stream().map(ServiceInstance::getPort)).allMatch(p -> p == 8080); - assertThat(instances.get().stream().map(ServiceInstance::getHost)).containsExactlyInAnyOrder("10.96.96.231", - "10.96.96.232", "10.96.96.233"); - - Map idsByHostname = mapHostnameToIds(instances.get()); - - client.endpoints().withName(serviceName).delete(); - client.pods().withName("svc-109696231").delete(); - client.pods().withName("svc-109696232").delete(); - client.pods().withName("svc-109696233").delete(); - // - // registerKubernetesResources(serviceName, defaultNamespace, "10.96.96.231", "10.96.96.232"); - // - // // Thread.sleep(5000); - // - // service.getServiceDiscovery().getServiceInstances() - // .onFailure().invoke(th -> fail("Failed to get service instances from Kubernetes", th)) - // .subscribe().with(instances::set); - // - // await().atMost(Duration.ofSeconds(5)) - // .until(instances::get, Matchers.hasSize(2)); - // - // for (ServiceInstance serviceInstance : instances.get()) { - // assertThat(idsByHostname.get(serviceInstance.getHost())).isEqualTo(serviceInstance.getId()); - // } - // - // client.endpoints().withName(serviceName).delete(); - // client.pods().withName("svc-109696231").delete(); - // client.pods().withName("svc-109696232").delete(); - - registerKubernetesResources(serviceName, defaultNamespace, "10.96.96.231", "10.96.96.232", "10.96.96.234"); - - // Thread.sleep(5000); - - service.getServiceDiscovery().getServiceInstances() - .onFailure().invoke(th -> fail("Failed to get service instances from Kubernetes", th)) - .subscribe().with(instances::set); - - await().atMost(Duration.ofSeconds(5)) - .until(instances::get, Matchers.hasSize(3)); - - for (ServiceInstance serviceInstance : instances.get()) { - if (serviceInstance.getHost().equals("10.96.96.234")) { - assertThat(idsByHostname.containsValue(serviceInstance.getId())).isFalse(); - } else { - assertThat(idsByHostname.get(serviceInstance.getHost())).isEqualTo(serviceInstance.getId()); - } - } - } - - private void registerKubernetesResources(String serviceName, String namespace, String... ips) { - Assert.checkNotNullParam("ips", ips); - buildAndRegisterKubernetesService(serviceName, namespace, true, ips); - Arrays.stream(ips).map(ip -> buildAndRegisterBackendPod(serviceName, namespace, true, ip)).collect(Collectors.toList()); - } - - private Map mapHostnameToIds(List serviceInstances) { - Map result = new HashMap<>(); - for (ServiceInstance serviceInstance : serviceInstances) { - result.put(serviceInstance.getHost(), serviceInstance.getId()); - } - return result; - } - - private Endpoints buildAndRegisterKubernetesService(String serviceName, String namespace, boolean register, - String... ipAdresses) { - - Map serviceLabels = new HashMap<>(); - serviceLabels.put("app.kubernetes.io/name", serviceName); - serviceLabels.put("app.kubernetes.io/version", "1.0"); - - List endpointAddresses = Arrays.stream(ipAdresses) - .map(ipAddress -> { - ObjectReference targetRef = new ObjectReference(null, null, "Pod", - serviceName + "-" + ipAsSuffix(ipAddress), namespace, null, UUID.randomUUID().toString()); - EndpointAddress endpointAddress = new EndpointAddressBuilder().withIp(ipAddress).withTargetRef(targetRef) - .build(); - return endpointAddress; - }).collect(Collectors.toList()); - Endpoints endpoint = new EndpointsBuilder() - .withNewMetadata().withName(serviceName).withLabels(serviceLabels).endMetadata() - .addToSubsets(new EndpointSubsetBuilder().withAddresses(endpointAddresses) - .addToPorts(new EndpointPortBuilder().withPort(8080).withProtocol("TCP").build()) - .build()) - .build(); - - if (register) { - if (namespace != null) { - client.endpoints().inNamespace(namespace).resource(endpoint).create(); - } else { - client.endpoints().resource(endpoint).create(); - } - } - return endpoint; - - } - - private Pod buildAndRegisterBackendPod(String name, String namespace, boolean register, String ip) { - - Map serviceLabels = new HashMap<>(); - serviceLabels.put("app.kubernetes.io/name", name); - serviceLabels.put("app.kubernetes.io/version", "1.0"); - - Map podLabels = new HashMap<>(serviceLabels); - podLabels.put("ui", "ui-" + ipAsSuffix(ip)); - Pod backendPod = new PodBuilder().withNewMetadata().withName(name + "-" + ipAsSuffix(ip)) - .withLabels(podLabels) - .withNamespace(namespace) - .endMetadata() - .build(); - if (register) { - if (namespace != null) { - client.pods().inNamespace(namespace).resource(backendPod).create(); - } else { - client.pods().resource(backendPod).create(); - } - } - return backendPod; - } - - private String ipAsSuffix(String ipAddress) { - return ipAddress.replace(".", ""); - } - -} diff --git a/service-discovery/kubernetes/src/test/java/io/smallrye/stork/servicediscovery/kubernetes/KubernetesServiceDiscoveryTest.java b/service-discovery/kubernetes/src/test/java/io/smallrye/stork/servicediscovery/kubernetes/KubernetesServiceDiscoveryTest.java index e5db1f3f..4f5da7e0 100644 --- a/service-discovery/kubernetes/src/test/java/io/smallrye/stork/servicediscovery/kubernetes/KubernetesServiceDiscoveryTest.java +++ b/service-discovery/kubernetes/src/test/java/io/smallrye/stork/servicediscovery/kubernetes/KubernetesServiceDiscoveryTest.java @@ -7,6 +7,7 @@ import static org.awaitility.Awaitility.await; import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -19,6 +20,8 @@ import org.hamcrest.Matchers; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledOnOs; +import org.junit.jupiter.api.condition.OS; import io.fabric8.kubernetes.api.model.EndpointAddress; import io.fabric8.kubernetes.api.model.EndpointAddressBuilder; @@ -27,6 +30,7 @@ import io.fabric8.kubernetes.api.model.EndpointSubsetBuilder; import io.fabric8.kubernetes.api.model.Endpoints; import io.fabric8.kubernetes.api.model.EndpointsBuilder; +import io.fabric8.kubernetes.api.model.EndpointsListBuilder; import io.fabric8.kubernetes.api.model.ObjectReference; import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.PodBuilder; @@ -43,6 +47,7 @@ import io.smallrye.stork.test.StorkTestUtils; import io.smallrye.stork.test.TestConfigProvider; +@DisabledOnOs(OS.WINDOWS) @EnableKubernetesMockClient(crud = true) public class KubernetesServiceDiscoveryTest { @@ -492,21 +497,71 @@ void shouldPreserveIdsOnRefetch() throws InterruptedException { } } + @Test + void shouldGetInstancesFromCache() throws InterruptedException { + String serviceName = "svc"; + + //Recording k8s cluster calls and build a endpoints as response + AtomicInteger serverHit = new AtomicInteger(0); + server.expect().get().withPath("/api/v1/namespaces/test/endpoints?fieldSelector=metadata.name%3Dsvc") + .andReply(200, r -> { + serverHit.incrementAndGet(); + List endpointsList = new ArrayList<>(); + endpointsList.add(registerKubernetesResources(serviceName, defaultNamespace, "10.96.96.231", "10.96.96.232", + "10.96.96.233")); + return new EndpointsListBuilder().withItems(endpointsList).build(); + }).always(); + + TestConfigProvider.addServiceConfig(serviceName, null, "kubernetes", null, + null, Map.of("k8s-host", k8sMasterUrl, "k8s-namespace", defaultNamespace, "refresh-period", "3"), null); + Stork stork = StorkTestUtils.getNewStorkInstance(); + + AtomicReference> instances = new AtomicReference<>(); + + Service service = stork.getService(serviceName); + service.getServiceDiscovery().getServiceInstances() + .onFailure().invoke(th -> fail("Failed to get service instances from Kubernetes", th)) + .subscribe().with(instances::set); + + await().atMost(Duration.ofSeconds(5)) + .until(() -> instances.get() != null); + + assertThat(serverHit.get()).isEqualTo(1); + assertThat(instances.get()).hasSize(3); + assertThat(instances.get().stream().map(ServiceInstance::getPort)).allMatch(p -> p == 8080); + assertThat(instances.get().stream().map(ServiceInstance::getHost)).containsExactlyInAnyOrder("10.96.96.231", + "10.96.96.232", "10.96.96.233"); + + //second try to get instances, instances should be fetched from cache, cluster calls should be still 1 + service.getServiceDiscovery().getServiceInstances() + .onFailure().invoke(th -> fail("Failed to get service instances from Kubernetes", th)) + .subscribe().with(instances::set); + + await().atMost(Duration.ofSeconds(5)) + .until(() -> instances.get() != null); + + assertThat(serverHit.get()).isEqualTo(1); + assertThat(instances.get()).hasSize(3); + assertThat(instances.get().stream().map(ServiceInstance::getPort)).allMatch(p -> p == 8080); + assertThat(instances.get().stream().map(ServiceInstance::getHost)).containsExactlyInAnyOrder("10.96.96.231", + "10.96.96.232", "10.96.96.233"); + + } + @Test void shouldFetchInstancesFromTheClusterWhenCacheIsInvalidated() throws InterruptedException { - // Given a service with 3 instances registered in the cluster + // Given a service with 3 instances registered in the cluster, in `test` namespace // Stork gather the cache from the cluster // When the endpoints are removed (this invalidates the cache) // Stork is called to get service instances again // Stork contacts the cluster to get the instances : it gets 0 of them + String serviceName = "svc"; - TestConfigProvider.addServiceConfig("svc", null, "kubernetes", null, + TestConfigProvider.addServiceConfig(serviceName, null, "kubernetes", null, null, Map.of("k8s-host", k8sMasterUrl, "k8s-namespace", defaultNamespace, "refresh-period", "3"), null); Stork stork = StorkTestUtils.getNewStorkInstance(); - String serviceName = "svc"; - registerKubernetesResources(serviceName, defaultNamespace, "10.96.96.231", "10.96.96.232", "10.96.96.233"); AtomicReference> instances = new AtomicReference<>(); @@ -524,9 +579,7 @@ void shouldFetchInstancesFromTheClusterWhenCacheIsInvalidated() throws Interrupt assertThat(instances.get().stream().map(ServiceInstance::getHost)).containsExactlyInAnyOrder("10.96.96.231", "10.96.96.232", "10.96.96.233"); - client.endpoints().withName(serviceName).delete(); - - Thread.sleep(5000); + client.endpoints().inNamespace(defaultNamespace).withName(serviceName).delete(); service.getServiceDiscovery().getServiceInstances() .onFailure().invoke(th -> fail("Failed to get service instances from Kubernetes", th)) @@ -539,31 +592,21 @@ void shouldFetchInstancesFromTheClusterWhenCacheIsInvalidated() throws Interrupt } @Test - void shouldFetchInstancesFromTheCache() throws InterruptedException { - - // Stork gathers the cache from the cluster - // Configure the mock cluster for recordings calls to a specific path - // Stork is called twice to get service instances - // Stork get the instances from the cache: assert that only 1 call to the cluster has been done + void shouldFetchInstancesFromAllNsWhenCacheIsInvalidated() throws InterruptedException { + // Given a service with 3 instances registered in the cluster in any namespace + // Stork gather the cache from the cluster + // When the endpoints are removed (this invalidates the cache) + // Stork is called to get service instances again + // Stork contacts the cluster to get the instances : it gets 0 of them String serviceName = "svc"; - String[] ips = { "10.96.96.231" }; - //Recording k8s cluster calls and build the endpoints as response - AtomicInteger serverHit = new AtomicInteger(0); - server.expect().get().withPath("/api/v1/namespaces/test/endpoints?fieldSelector=metadata.name%3Dsvc") - .andReply(200, r -> { - serverHit.incrementAndGet(); - Endpoints endpoints = buildAndRegisterKubernetesService(serviceName, defaultNamespace, true, ips); - Arrays.stream(ips).map(ip -> buildAndRegisterBackendPod(serviceName, defaultNamespace, true, ips[0])) - .collect(Collectors.toList()); - return endpoints; - }).always(); - - TestConfigProvider.addServiceConfig("svc", null, "kubernetes", null, - null, Map.of("k8s-host", k8sMasterUrl, "k8s-namespace", defaultNamespace, "refresh-period", "3"), null); + TestConfigProvider.addServiceConfig(serviceName, null, "kubernetes", null, + null, Map.of("k8s-host", k8sMasterUrl, "k8s-namespace", "all", "refresh-period", "3"), null); Stork stork = StorkTestUtils.getNewStorkInstance(); + registerKubernetesResources(serviceName, defaultNamespace, "10.96.96.231", "10.96.96.232", "10.96.96.233"); + AtomicReference> instances = new AtomicReference<>(); Service service = stork.getService(serviceName); @@ -574,23 +617,28 @@ void shouldFetchInstancesFromTheCache() throws InterruptedException { await().atMost(Duration.ofSeconds(5)) .until(() -> instances.get() != null); - assertThat(serverHit.get()).isEqualTo(1); + assertThat(instances.get()).hasSize(3); + assertThat(instances.get().stream().map(ServiceInstance::getPort)).allMatch(p -> p == 8080); + assertThat(instances.get().stream().map(ServiceInstance::getHost)).containsExactlyInAnyOrder("10.96.96.231", + "10.96.96.232", "10.96.96.233"); + + client.endpoints().inNamespace(defaultNamespace).withName(serviceName).delete(); - //second try to get instances, instances should be fetched from cache service.getServiceDiscovery().getServiceInstances() .onFailure().invoke(th -> fail("Failed to get service instances from Kubernetes", th)) .subscribe().with(instances::set); await().atMost(Duration.ofSeconds(5)) - .until(() -> instances.get() != null); + .until(() -> instances.get().isEmpty()); - assertThat(serverHit.get()).isEqualTo(1); + assertThat(instances.get()).hasSize(0); } - private void registerKubernetesResources(String serviceName, String namespace, String... ips) { + private Endpoints registerKubernetesResources(String serviceName, String namespace, String... ips) { Assert.checkNotNullParam("ips", ips); - buildAndRegisterKubernetesService(serviceName, namespace, true, ips); + Endpoints endpoints = buildAndRegisterKubernetesService(serviceName, namespace, true, ips); Arrays.stream(ips).forEach(ip -> buildAndRegisterBackendPod(serviceName, namespace, true, ip)); + return endpoints; } private Map mapHostnameToIds(List serviceInstances) {