diff --git a/docs/development/extensions-core/kubernetes.md b/docs/development/extensions-core/kubernetes.md index 25696546dfeb..3115ee9056b2 100644 --- a/docs/development/extensions-core/kubernetes.md +++ b/docs/development/extensions-core/kubernetes.md @@ -38,9 +38,11 @@ This extension works together with HTTP-based segment and task management in Dru `druid.indexer.runner.type=httpRemote` `druid.discovery.type=k8s` -For Node Discovery, Each Druid process running inside a pod "announces" itself by adding few "labels" and "annotations" in the pod spec. Druid process needs to be aware of pod name and namespace which it reads from environment variables `POD_NAME` and `POD_NAMESPACE`. These variable names can be changed, see configuration below. But in the end, each pod needs to have self pod name and namespace added as environment variables. +For node discovery, each Druid process running inside a pod "announces" itself by adding labels and annotations to the pod spec. A pod is discoverable by other Druid processes when it has the required labels and annotations and Kubernetes considers the container ready. Without a readiness probe, Kubernetes marks a container as ready the moment the process starts — see [Readiness Probes](#readiness-probes) for why you should configure one. -Additionally, this extension has following configuration. +Each Druid process needs to be aware of its own pod name and namespace, which it reads from environment variables `POD_NAME` and `POD_NAMESPACE`. These variable names can be changed (see configuration below), but every pod must have its own name and namespace available as environment variables. + +Additionally, this extension has the following configuration. ### Properties |Property|Possible Values|Description|Default|required| @@ -52,11 +54,47 @@ Additionally, this extension has following configuration. |`druid.discovery.k8s.renewDeadline`|`Duration`|Lease renewal period used by Leader.|PT17S|No| |`druid.discovery.k8s.retryPeriod`|`Duration`|Retry wait used by Leader Election algorithm on failed operations.|PT5S|No| +### Readiness Probes + +:::info +Readiness probe configuration directly affects discovery behavior. If a probe is too aggressive (low timeout, low failure threshold), a pod under heavy load could temporarily fail its probe, be removed from discovery, and shift its load onto other pods — potentially causing a cascade. To avoid this, tune your probes to tolerate brief periods of high load. +::: + +This extension uses Kubernetes container readiness, in addition to labels and annotations, to decide whether a pod is available for service discovery. + +You should configure [readiness probes](https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/) on all Druid pods. Without a readiness probe, Kubernetes marks a container as ready the moment the process starts, which may be before the Druid service is fully initialized and able to handle requests. + +A container may become unready if: +* The process is killed or crashes. It will be immediately marked as unready without waiting for the readiness probe to cross its failure threshold. +* The process is alive but not healthy. It will be marked as unready after it fails its readiness probe a configured number of times (`failureThreshold`). + +Once marked as unready, a container must pass its readiness probe `successThreshold` times (default: 1) before it is considered ready again and re-enters discovery. + +#### Recommendations + +The `/status/ready` endpoint is a good candidate for readiness checks, as it indicates whether the Druid node is ready to serve requests. Services use this endpoint to decide if they should announce themselves, so it is a natural choice for the readiness probe. However, you can choose a different endpoint if it better suits your needs. + +For Druid processes that have long startup times, consider using a [startup probe](https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-startup-probes) so that the readiness probe does not run (and fail) during initialization. + +Baseline readiness probe configuration for Druid might look like this. Replace the port value with the port your Druid process listens on (e.g., `8888` for the Router, `8081` for the Broker): + +```yaml +readinessProbe: + httpGet: + path: /status/ready + port: 8888 + periodSeconds: 10 + failureThreshold: 3 + timeoutSeconds: 10 +``` + +With this configuration, a pod must fail its readiness check 3 times in a row (30 seconds) before it is marked as not ready. Adjust these values based on your workload and tolerance for routing to temporarily unhealthy pods. + ### Gotchas -- Label/Annotation path in each pod spec MUST EXIST, which is easily satisfied if there is at least one label/annotation in the pod spec already. -- All Druid Pods belonging to one Druid cluster must be inside same kubernetes namespace. -- All Druid Pods need permissions to be able to add labels to self-pod, List and Watch other Pods, create and read ConfigMap for leader election. Assuming, "default" service account is used by Druid pods, you might need to add following or something similar Kubernetes Role and Role Binding. +- The label/annotation path in each pod spec must exist, which is easily satisfied if there is at least one label or annotation in the pod spec already. +- All Druid pods belonging to one Druid cluster must be inside the same Kubernetes namespace. +- All Druid pods need permissions to add labels to their own pod, list and watch other pods, and create and read ConfigMaps for leader election. Assuming the `default` service account is used by Druid pods, you might need to add the following (or similar) Kubernetes Role and RoleBinding. ``` apiVersion: rbac.authorization.k8s.io/v1 diff --git a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/DefaultK8sApiClient.java b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/DefaultK8sApiClient.java index adf10a42dd49..b7992a3fb2ad 100644 --- a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/DefaultK8sApiClient.java +++ b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/DefaultK8sApiClient.java @@ -28,6 +28,7 @@ import io.kubernetes.client.openapi.ApiClient; import io.kubernetes.client.openapi.ApiException; import io.kubernetes.client.openapi.apis.CoreV1Api; +import io.kubernetes.client.openapi.models.V1ContainerStatus; import io.kubernetes.client.openapi.models.V1Pod; import io.kubernetes.client.openapi.models.V1PodList; import io.kubernetes.client.util.PatchUtils; @@ -41,6 +42,7 @@ import java.io.IOException; import java.net.SocketTimeoutException; import java.util.HashMap; +import java.util.List; import java.util.Map; /** @@ -114,6 +116,14 @@ public DiscoveryDruidNodeList listPods( Map allNodes = new HashMap(); for (V1Pod podDef : podList.getItems()) { + if (!isPodReady(podDef)) { + LOGGER.info( + "Ignoring pod[%s] for role[%s] during list: pod has discovery label but is not yet reporting as ready.", + podDef.getMetadata().getName(), + nodeRole + ); + continue; + } DiscoveryDruidNode node = getDiscoveryDruidNodeFromPodDef(nodeRole, podDef); allNodes.put(node.getDruidNode().getHostAndPortToUse(), node); } @@ -124,6 +134,23 @@ public DiscoveryDruidNodeList listPods( } } + /** + * Check whether a pod's containers are all running and ready. This is used to filter out pods + * whose containers have been OOM-killed or are otherwise not serving traffic, even though the + * pod itself still exists and retains its Druid announcement labels. + */ + static boolean isPodReady(V1Pod pod) + { + if (pod.getStatus() == null) { + return false; + } + List containerStatuses = pod.getStatus().getContainerStatuses(); + if (containerStatuses == null || containerStatuses.isEmpty()) { + return false; + } + return containerStatuses.stream().allMatch(cs -> Boolean.TRUE.equals(cs.getReady())); + } + private DiscoveryDruidNode getDiscoveryDruidNodeFromPodDef(NodeRole nodeRole, V1Pod podDef) { String jsonStr = podDef.getMetadata().getAnnotations().get(K8sDruidNodeAnnouncer.getInfoAnnotation(nodeRole)); @@ -174,11 +201,54 @@ public boolean hasNext() throws SocketTimeoutException Watch.Response item = watch.next(); if (item != null && item.type != null && !item.type.equals(WatchResult.BOOKMARK)) { DiscoveryDruidNodeAndResourceVersion result = null; + String effectiveType = item.type; + if (item.object != null) { - result = new DiscoveryDruidNodeAndResourceVersion( - item.object.getMetadata().getResourceVersion(), - getDiscoveryDruidNodeFromPodDef(nodeRole, item.object) - ); + if (!isPodReady(item.object)) { + if (WatchResult.MODIFIED.equals(item.type)) { + // Pod was previously ready but is now unready (e.g., OOM-killed container). + // Remap to NOT_READY to ensure the host is removed from discovery cache if is cached + LOGGER.info( + "Pod[%s] for role[%s] notified that it was modified and is now showing as not ready, " + + "treating as removed for discovery purposes.", + item.object.getMetadata().getName(), + nodeRole + ); + effectiveType = WatchResult.NOT_READY; + } else if (WatchResult.ADDED.equals(item.type)) { + // Pod is not ready yet (e.g., still starting up). Skip this event entirely. + // It will appear via a MODIFIED event that remaps to ADDED for discovery, once it becomes ready. + LOGGER.debug( + "Pod[%s] for role[%s] is not ready on ADDED event, skipping until it becomes ready.", + item.object.getMetadata().getName(), + nodeRole + ); + continue; + } + } else if (WatchResult.MODIFIED.equals(item.type)) { + // Remap MODIFIED (pod ready) events to ADDED for discovery cache purposes. + // This is safe even if the node is already in the cache because BaseNodeRoleWatcher.childAdded() uses + // putIfAbsent, so duplicates are silently ignored. + effectiveType = WatchResult.ADDED; + } + + try { + result = new DiscoveryDruidNodeAndResourceVersion( + item.object.getMetadata().getResourceVersion(), + getDiscoveryDruidNodeFromPodDef(nodeRole, item.object) + ); + } + catch (Exception ex) { + LOGGER.warn( + ex, + "Failed to deserialize node info from pod[%s] for role[%s] on [%s] event. " + + "Passing null to trigger watch restart and full resync.", + item.object.getMetadata() != null ? item.object.getMetadata().getName() : "unknown", + nodeRole, + item.type + ); + // result stays null, caller will restart the watch and do a full listPods resync + } } else { // The item's object can be null in some cases -- likely due to a blip // in the k8s watch. Handle that by passing the null upwards. The caller @@ -187,7 +257,7 @@ public boolean hasNext() throws SocketTimeoutException } obj = new Watch.Response<>( - item.type, + effectiveType, result ); return true; diff --git a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProvider.java b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProvider.java index d2472a9fde4d..96497d4510f6 100644 --- a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProvider.java +++ b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProvider.java @@ -271,7 +271,12 @@ private void keepWatching(String labelSelector, String resourceVersion) baseNodeRoleWatcher.childAdded(item.object.getNode()); break; case WatchResult.DELETED: - baseNodeRoleWatcher.childRemoved(item.object.getNode()); + case WatchResult.NOT_READY: + // Use skipIfUnknown=true for all k8s discovery removals. + // DELETED can fire after NOT_READY (so the service is already removed), or before ADDED (pod deleted before becoming ready). + // NOT_READY can repeat during CrashLoopBackOff. None of these warrant the error-level logging that + // comes with trying to remove an unknown service. + baseNodeRoleWatcher.childRemoved(item.object.getNode(), true); break; default: } diff --git a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/WatchResult.java b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/WatchResult.java index f47b13757320..2b3ef7b584a9 100644 --- a/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/WatchResult.java +++ b/extensions-core/kubernetes-extensions/src/main/java/org/apache/druid/k8s/discovery/WatchResult.java @@ -23,12 +23,27 @@ import java.net.SocketTimeoutException; +/** + * Iterator over k8s pod watch events that is aligned with the needs of Druid service discovery rather than + * raw Kubernetes watch semantics. Implementations may remap or synthesize event types: for example, + * a Kubernetes MODIFIED event for a pod whose containers are no longer ready (according to k8s readiness state) is surfaced as the + * synthetic {@link #NOT_READY} type so that consumers can handle it as a removal from k8s service discovery. + */ public interface WatchResult { String ADDED = "ADDED"; + String MODIFIED = "MODIFIED"; String DELETED = "DELETED"; String BOOKMARK = "BOOKMARK"; + /** + * Synthetic event type: pod's container became not-ready (e.g., OOM-killed) but the pod + * still exists. Should be treated as a removal from discovery, but without error-level + * logging for nodes that aren't in the cache (repeated events are expected during + * CrashLoopBackOff). + */ + String NOT_READY = "NOT_READY"; + boolean hasNext() throws SocketTimeoutException; Watch.Response next(); diff --git a/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/DefaultK8sApiClientTest.java b/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/DefaultK8sApiClientTest.java new file mode 100644 index 000000000000..74b95d3cfef3 --- /dev/null +++ b/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/DefaultK8sApiClientTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.k8s.discovery; + +import io.kubernetes.client.openapi.models.V1ContainerStatus; +import io.kubernetes.client.openapi.models.V1Pod; +import io.kubernetes.client.openapi.models.V1PodStatus; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; + +public class DefaultK8sApiClientTest +{ + @Test + public void testIsPodReady_nullStatus() + { + V1Pod pod = new V1Pod(); + Assertions.assertFalse(DefaultK8sApiClient.isPodReady(pod)); + } + + @Test + public void testIsPodReady_nullContainerStatuses() + { + V1Pod pod = new V1Pod(); + pod.setStatus(new V1PodStatus()); + Assertions.assertFalse(DefaultK8sApiClient.isPodReady(pod)); + } + + @Test + public void testIsPodReady_emptyContainerStatuses() + { + V1Pod pod = new V1Pod(); + V1PodStatus status = new V1PodStatus(); + status.setContainerStatuses(Collections.emptyList()); + pod.setStatus(status); + Assertions.assertFalse(DefaultK8sApiClient.isPodReady(pod)); + } + + @Test + public void testIsPodReady_allContainersReady() + { + V1Pod pod = new V1Pod(); + V1PodStatus status = new V1PodStatus(); + V1ContainerStatus cs = new V1ContainerStatus(); + cs.setReady(true); + status.setContainerStatuses(Collections.singletonList(cs)); + pod.setStatus(status); + Assertions.assertTrue(DefaultK8sApiClient.isPodReady(pod)); + } + + @Test + public void testIsPodReady_containerNotReady() + { + V1Pod pod = new V1Pod(); + V1PodStatus status = new V1PodStatus(); + V1ContainerStatus cs = new V1ContainerStatus(); + cs.setReady(false); + status.setContainerStatuses(Collections.singletonList(cs)); + pod.setStatus(status); + Assertions.assertFalse(DefaultK8sApiClient.isPodReady(pod)); + } + + @Test + public void testIsPodReady_mixedContainerReadiness() + { + V1Pod pod = new V1Pod(); + V1PodStatus status = new V1PodStatus(); + V1ContainerStatus cs1 = new V1ContainerStatus(); + cs1.setReady(true); + V1ContainerStatus cs2 = new V1ContainerStatus(); + cs2.setReady(false); + status.setContainerStatuses(Arrays.asList(cs1, cs2)); + pod.setStatus(status); + Assertions.assertFalse(DefaultK8sApiClient.isPodReady(pod)); + } + + @Test + public void testIsPodReady_containerReadyNull() + { + V1Pod pod = new V1Pod(); + V1PodStatus status = new V1PodStatus(); + V1ContainerStatus cs = new V1ContainerStatus(); + // ready is null (not set) + status.setContainerStatuses(Collections.singletonList(cs)); + pod.setStatus(status); + Assertions.assertFalse(DefaultK8sApiClient.isPodReady(pod)); + } +} diff --git a/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProviderTest.java b/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProviderTest.java index 36cab39ab59c..18e030f25073 100644 --- a/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProviderTest.java +++ b/extensions-core/kubernetes-extensions/src/test/java/org/apache/druid/k8s/discovery/K8sDruidNodeDiscoveryProviderTest.java @@ -230,6 +230,134 @@ public void testNodeRoleWatcherHandlesNullFromAPIByRestarting() throws Exception discoveryProvider.stop(); } + @Test + @Timeout(value = 60_000, unit = TimeUnit.MILLISECONDS) + public void testNotReadyEventRemovesNodeAndReAddOnReady() throws Exception + { + String labelSelector = "druidDiscoveryAnnouncement-cluster-identifier=druid-cluster,druidDiscoveryAnnouncement-router=true"; + K8sApiClient mockK8sApiClient = EasyMock.createMock(K8sApiClient.class); + + // Initial list returns two healthy nodes + EasyMock.expect(mockK8sApiClient.listPods(podInfo.getPodNamespace(), labelSelector, NodeRole.ROUTER)).andReturn( + new DiscoveryDruidNodeList( + "v1", + ImmutableMap.of( + testNode1.getDruidNode().getHostAndPortToUse(), testNode1, + testNode2.getDruidNode().getHostAndPortToUse(), testNode2 + ) + ) + ); + + // Watch returns: testNode1 becomes NOT_READY (OOM), then comes back as ADDED (recovered) + EasyMock.expect(mockK8sApiClient.watchPods( + podInfo.getPodNamespace(), labelSelector, "v1", NodeRole.ROUTER)).andReturn( + new MockWatchResult( + ImmutableList.of( + new Watch.Response<>(WatchResult.NOT_READY, new DiscoveryDruidNodeAndResourceVersion("v2", testNode1)), + // Repeated NOT_READY during CrashLoopBackOff — should be silently ignored + new Watch.Response<>(WatchResult.NOT_READY, new DiscoveryDruidNodeAndResourceVersion("v3", testNode1)), + // Pod recovers and becomes ready again + new Watch.Response<>(WatchResult.ADDED, new DiscoveryDruidNodeAndResourceVersion("v4", testNode1)) + ), + false, + false + ) + ); + EasyMock.replay(mockK8sApiClient); + + K8sDruidNodeDiscoveryProvider discoveryProvider = new K8sDruidNodeDiscoveryProvider( + podInfo, + discoveryConfig, + mockK8sApiClient, + 1 + ); + discoveryProvider.start(); + + K8sDruidNodeDiscoveryProvider.NodeRoleWatcher nodeDiscovery = discoveryProvider.getForNodeRole(NodeRole.ROUTER, false); + + MockListener testListener = new MockListener( + ImmutableList.of( + MockListener.Event.added(testNode1), + MockListener.Event.added(testNode2), + MockListener.Event.inited(), + // testNode1 goes NOT_READY — removed + MockListener.Event.deleted(testNode1), + // Second NOT_READY is silently skipped (not in cache) + // testNode1 recovers — re-added + MockListener.Event.added(testNode1) + ) + ); + nodeDiscovery.registerListener(testListener); + + nodeDiscovery.start(); + + testListener.assertSuccess(); + + discoveryProvider.stop(); + } + + @Test + @Timeout(value = 60_000, unit = TimeUnit.MILLISECONDS) + public void testDeletedAfterNotReadyIssilentlyIgnored() throws Exception + { + String labelSelector = "druidDiscoveryAnnouncement-cluster-identifier=druid-cluster,druidDiscoveryAnnouncement-router=true"; + K8sApiClient mockK8sApiClient = EasyMock.createMock(K8sApiClient.class); + + // Initial list returns two healthy nodes + EasyMock.expect(mockK8sApiClient.listPods(podInfo.getPodNamespace(), labelSelector, NodeRole.ROUTER)).andReturn( + new DiscoveryDruidNodeList( + "v1", + ImmutableMap.of( + testNode1.getDruidNode().getHostAndPortToUse(), testNode1, + testNode2.getDruidNode().getHostAndPortToUse(), testNode2 + ) + ) + ); + + // Watch: testNode1 goes NOT_READY, then DELETED arrives (node already removed from cache) + EasyMock.expect(mockK8sApiClient.watchPods( + podInfo.getPodNamespace(), labelSelector, "v1", NodeRole.ROUTER)).andReturn( + new MockWatchResult( + ImmutableList.of( + new Watch.Response<>(WatchResult.NOT_READY, new DiscoveryDruidNodeAndResourceVersion("v2", testNode1)), + // DELETED after NOT_READY — node already removed, should be silently skipped + new Watch.Response<>(WatchResult.DELETED, new DiscoveryDruidNodeAndResourceVersion("v3", testNode1)) + ), + false, + false + ) + ); + EasyMock.replay(mockK8sApiClient); + + K8sDruidNodeDiscoveryProvider discoveryProvider = new K8sDruidNodeDiscoveryProvider( + podInfo, + discoveryConfig, + mockK8sApiClient, + 1 + ); + discoveryProvider.start(); + + K8sDruidNodeDiscoveryProvider.NodeRoleWatcher nodeDiscovery = discoveryProvider.getForNodeRole(NodeRole.ROUTER, false); + + MockListener testListener = new MockListener( + ImmutableList.of( + MockListener.Event.added(testNode1), + MockListener.Event.added(testNode2), + MockListener.Event.inited(), + // testNode1 goes NOT_READY — removed + MockListener.Event.deleted(testNode1) + // DELETED is silently skipped (not in cache) — no second deleted event + ) + ); + nodeDiscovery.registerListener(testListener); + + nodeDiscovery.start(); + + testListener.assertSuccess(); + + discoveryProvider.stop(); + } + @Test @Timeout(value = 10_000, unit = TimeUnit.MILLISECONDS) public void testNodeRoleWatcherLoopOnNullItems() throws Exception diff --git a/server/src/main/java/org/apache/druid/discovery/BaseNodeRoleWatcher.java b/server/src/main/java/org/apache/druid/discovery/BaseNodeRoleWatcher.java index f17b43cf2c70..6c38a5dc18a2 100644 --- a/server/src/main/java/org/apache/druid/discovery/BaseNodeRoleWatcher.java +++ b/server/src/main/java/org/apache/druid/discovery/BaseNodeRoleWatcher.java @@ -174,6 +174,17 @@ private void addNode(DiscoveryDruidNode druidNode) } public void childRemoved(DiscoveryDruidNode druidNode) + { + childRemoved(druidNode, false); + } + + /** + * Remove a node from the discovery cache. + *

+ * If {@code skipIfUnknown} is true, the removal is skipped if the node is not already + * present in the cache. If false, the removal is attempted unconditionally. + */ + public void childRemoved(DiscoveryDruidNode druidNode, boolean skipIfUnknown) { synchronized (lock) { if (!nodeRole.equals(druidNode.getNodeRole())) { @@ -186,6 +197,15 @@ public void childRemoved(DiscoveryDruidNode druidNode) return; } + if (skipIfUnknown && !nodes.containsKey(druidNode.getDruidNode().getHostAndPortToUse())) { + LOGGER.debug( + "Ignoring removal of node [%s] of role [%s] because it is not known to be present in the cache.", + druidNode.getDruidNode().getUriToUse(), + nodeRole.getJsonName() + ); + return; + } + LOGGER.warn("Node [%s] of role [%s] went offline.", druidNode.getDruidNode().getUriToUse(), nodeRole.getJsonName()); removeNode(druidNode); diff --git a/server/src/test/java/org/apache/druid/discovery/BaseNodeRoleWatcherTest.java b/server/src/test/java/org/apache/druid/discovery/BaseNodeRoleWatcherTest.java index 0decfcdef386..ec7b24cbdc0f 100644 --- a/server/src/test/java/org/apache/druid/discovery/BaseNodeRoleWatcherTest.java +++ b/server/src/test/java/org/apache/druid/discovery/BaseNodeRoleWatcherTest.java @@ -244,6 +244,72 @@ public void testDuplicateChildAddedAfterResetNodesDoesNotNotifyListeners() Assert.assertEquals(ImmutableSet.of(broker1, broker2), new HashSet<>(nodeRoleWatcher.getAllNodes())); } + @Test(timeout = 60_000L) + public void testChildRemovedIfPresentRemovesKnownNode() + { + BaseNodeRoleWatcher nodeRoleWatcher = BaseNodeRoleWatcher.create(exec, NodeRole.BROKER); + + DiscoveryDruidNode broker1 = buildDiscoveryDruidNode(NodeRole.BROKER, "broker1"); + + nodeRoleWatcher.childAdded(broker1); + nodeRoleWatcher.cacheInitialized(); + + TestListener listener = new TestListener(); + nodeRoleWatcher.registerListener(listener); + + Assert.assertEquals(ImmutableList.of(broker1), listener.nodesAddedList); + + // Remove with skipIfUnknown=true — node IS in cache, should remove and notify + nodeRoleWatcher.childRemoved(broker1, true); + + Assert.assertEquals(ImmutableList.of(broker1), listener.nodesRemovedList); + Assert.assertTrue(nodeRoleWatcher.getAllNodes().isEmpty()); + } + + @Test(timeout = 60_000L) + public void testChildRemovedIfPresentSkipsUnknownNode() + { + BaseNodeRoleWatcher nodeRoleWatcher = BaseNodeRoleWatcher.create(exec, NodeRole.BROKER); + + DiscoveryDruidNode broker1 = buildDiscoveryDruidNode(NodeRole.BROKER, "broker1"); + DiscoveryDruidNode broker2 = buildDiscoveryDruidNode(NodeRole.BROKER, "broker2"); + + nodeRoleWatcher.childAdded(broker1); + nodeRoleWatcher.cacheInitialized(); + + TestListener listener = new TestListener(); + nodeRoleWatcher.registerListener(listener); + + // Remove broker2 with skipIfUnknown=true — node is NOT in cache, should silently skip + nodeRoleWatcher.childRemoved(broker2, true); + + Assert.assertTrue(listener.nodesRemovedList.isEmpty()); + Assert.assertEquals(1, nodeRoleWatcher.getAllNodes().size()); + } + + @Test(timeout = 60_000L) + public void testChildRemovedIfPresentRepeatedRemovalsAreIdempotent() + { + BaseNodeRoleWatcher nodeRoleWatcher = BaseNodeRoleWatcher.create(exec, NodeRole.BROKER); + + DiscoveryDruidNode broker1 = buildDiscoveryDruidNode(NodeRole.BROKER, "broker1"); + + nodeRoleWatcher.childAdded(broker1); + nodeRoleWatcher.cacheInitialized(); + + TestListener listener = new TestListener(); + nodeRoleWatcher.registerListener(listener); + + // First removal should remove and notify + nodeRoleWatcher.childRemoved(broker1, true); + Assert.assertEquals(ImmutableList.of(broker1), listener.nodesRemovedList); + + // Second removal should silently skip (node already removed) + nodeRoleWatcher.childRemoved(broker1, true); + // Still only one removal notification + Assert.assertEquals(ImmutableList.of(broker1), listener.nodesRemovedList); + } + private DiscoveryDruidNode buildDiscoveryDruidNode(NodeRole role, String host) { return new DiscoveryDruidNode( diff --git a/website/.spelling b/website/.spelling index 706c3845d753..f0c7e5cf6091 100644 --- a/website/.spelling +++ b/website/.spelling @@ -219,6 +219,7 @@ pull-deps RDBMS RDDs RDS +RoleBinding ROUTINE_CATALOG ROUTINE_NAME ROUTINE_SCHEMA @@ -1131,6 +1132,7 @@ Env POD_NAME POD_NAMESPACE ConfigMap +ConfigMaps PT17S GCS gcs-connector