Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 43 additions & 5 deletions docs/development/extensions-core/kubernetes.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@ This extension works together with HTTP-based segment and task management in Dru
`druid.indexer.runner.type=httpRemote`
`druid.discovery.type=k8s`

For Node Discovery, Each Druid process running inside a pod "announces" itself by adding few "labels" and "annotations" in the pod spec. Druid process needs to be aware of pod name and namespace which it reads from environment variables `POD_NAME` and `POD_NAMESPACE`. These variable names can be changed, see configuration below. But in the end, each pod needs to have self pod name and namespace added as environment variables.
For node discovery, each Druid process running inside a pod "announces" itself by adding labels and annotations to the pod spec. A pod is discoverable by other Druid processes when it has the required labels and annotations and Kubernetes considers the container ready. Without a readiness probe, Kubernetes marks a container as ready the moment the process starts — see [Readiness Probes](#readiness-probes) for why you should configure one.

Additionally, this extension has following configuration.
Each Druid process needs to be aware of its own pod name and namespace, which it reads from environment variables `POD_NAME` and `POD_NAMESPACE`. These variable names can be changed (see configuration below), but every pod must have its own name and namespace available as environment variables.

Additionally, this extension has the following configuration.

### Properties
|Property|Possible Values|Description|Default|required|
Expand All @@ -52,11 +54,47 @@ Additionally, this extension has following configuration.
|`druid.discovery.k8s.renewDeadline`|`Duration`|Lease renewal period used by Leader.|PT17S|No|
|`druid.discovery.k8s.retryPeriod`|`Duration`|Retry wait used by Leader Election algorithm on failed operations.|PT5S|No|

### Readiness Probes

:::info
Readiness probe configuration directly affects discovery behavior. If a probe is too aggressive (low timeout, low failure threshold), a pod under heavy load could temporarily fail its probe, be removed from discovery, and shift its load onto other pods — potentially causing a cascade. To avoid this, tune your probes to tolerate brief periods of high load.
:::

This extension uses Kubernetes container readiness, in addition to labels and annotations, to decide whether a pod is available for service discovery.

You should configure [readiness probes](https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/) on all Druid pods. Without a readiness probe, Kubernetes marks a container as ready the moment the process starts, which may be before the Druid service is fully initialized and able to handle requests.

A container may become unready if:
* The process is killed or crashes. It will be immediately marked as unready without waiting for the readiness probe to cross its failure threshold.
* The process is alive but not healthy. It will be marked as unready after it fails its readiness probe a configured number of times (`failureThreshold`).

Once marked as unready, a container must pass its readiness probe `successThreshold` times (default: 1) before it is considered ready again and re-enters discovery.

#### Recommendations

The `/status/ready` endpoint is a good candidate for readiness checks, as it indicates whether the Druid node is ready to serve requests. Services use this endpoint to decide if they should announce themselves, so it is a natural choice for the readiness probe. However, you can choose a different endpoint if it better suits your needs.

For Druid processes that have long startup times, consider using a [startup probe](https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-startup-probes) so that the readiness probe does not run (and fail) during initialization.

Baseline readiness probe configuration for Druid might look like this. Replace the port value with the port your Druid process listens on (e.g., `8888` for the Router, `8081` for the Broker):

```yaml
readinessProbe:
httpGet:
path: /status/ready
port: 8888
periodSeconds: 10
failureThreshold: 3
timeoutSeconds: 10
```

With this configuration, a pod must fail its readiness check 3 times in a row (30 seconds) before it is marked as not ready. Adjust these values based on your workload and tolerance for routing to temporarily unhealthy pods.

### Gotchas

- Label/Annotation path in each pod spec MUST EXIST, which is easily satisfied if there is at least one label/annotation in the pod spec already.
- All Druid Pods belonging to one Druid cluster must be inside same kubernetes namespace.
- All Druid Pods need permissions to be able to add labels to self-pod, List and Watch other Pods, create and read ConfigMap for leader election. Assuming, "default" service account is used by Druid pods, you might need to add following or something similar Kubernetes Role and Role Binding.
- The label/annotation path in each pod spec must exist, which is easily satisfied if there is at least one label or annotation in the pod spec already.
- All Druid pods belonging to one Druid cluster must be inside the same Kubernetes namespace.
- All Druid pods need permissions to add labels to their own pod, list and watch other pods, and create and read ConfigMaps for leader election. Assuming the `default` service account is used by Druid pods, you might need to add the following (or similar) Kubernetes Role and RoleBinding.

```
apiVersion: rbac.authorization.k8s.io/v1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.models.V1ContainerStatus;
import io.kubernetes.client.openapi.models.V1Pod;
import io.kubernetes.client.openapi.models.V1PodList;
import io.kubernetes.client.util.PatchUtils;
Expand All @@ -41,6 +42,7 @@
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
Expand Down Expand Up @@ -114,6 +116,14 @@ public DiscoveryDruidNodeList listPods(

Map<String, DiscoveryDruidNode> allNodes = new HashMap();
for (V1Pod podDef : podList.getItems()) {
if (!isPodReady(podDef)) {
LOGGER.info(
"Ignoring pod[%s] for role[%s] during list: pod has discovery label but is not yet reporting as ready.",
podDef.getMetadata().getName(),
nodeRole
);
continue;
}
DiscoveryDruidNode node = getDiscoveryDruidNodeFromPodDef(nodeRole, podDef);
allNodes.put(node.getDruidNode().getHostAndPortToUse(), node);
}
Expand All @@ -124,6 +134,23 @@ public DiscoveryDruidNodeList listPods(
}
}

/**
* Check whether a pod's containers are all running and ready. This is used to filter out pods
* whose containers have been OOM-killed or are otherwise not serving traffic, even though the
* pod itself still exists and retains its Druid announcement labels.
*/
static boolean isPodReady(V1Pod pod)
{
if (pod.getStatus() == null) {
return false;
}
List<V1ContainerStatus> containerStatuses = pod.getStatus().getContainerStatuses();
if (containerStatuses == null || containerStatuses.isEmpty()) {
return false;
}
return containerStatuses.stream().allMatch(cs -> Boolean.TRUE.equals(cs.getReady()));
}

private DiscoveryDruidNode getDiscoveryDruidNodeFromPodDef(NodeRole nodeRole, V1Pod podDef)
{
String jsonStr = podDef.getMetadata().getAnnotations().get(K8sDruidNodeAnnouncer.getInfoAnnotation(nodeRole));
Expand Down Expand Up @@ -174,11 +201,54 @@ public boolean hasNext() throws SocketTimeoutException
Watch.Response<V1Pod> item = watch.next();
if (item != null && item.type != null && !item.type.equals(WatchResult.BOOKMARK)) {
DiscoveryDruidNodeAndResourceVersion result = null;
String effectiveType = item.type;

if (item.object != null) {
result = new DiscoveryDruidNodeAndResourceVersion(
item.object.getMetadata().getResourceVersion(),
getDiscoveryDruidNodeFromPodDef(nodeRole, item.object)
);
if (!isPodReady(item.object)) {
if (WatchResult.MODIFIED.equals(item.type)) {
// Pod was previously ready but is now unready (e.g., OOM-killed container).
// Remap to NOT_READY to ensure the host is removed from discovery cache if is cached
LOGGER.info(
"Pod[%s] for role[%s] notified that it was modified and is now showing as not ready, "
+ "treating as removed for discovery purposes.",
item.object.getMetadata().getName(),
nodeRole
);
effectiveType = WatchResult.NOT_READY;
} else if (WatchResult.ADDED.equals(item.type)) {
// Pod is not ready yet (e.g., still starting up). Skip this event entirely.
// It will appear via a MODIFIED event that remaps to ADDED for discovery, once it becomes ready.
LOGGER.debug(
"Pod[%s] for role[%s] is not ready on ADDED event, skipping until it becomes ready.",
item.object.getMetadata().getName(),
nodeRole
);
continue;
}
} else if (WatchResult.MODIFIED.equals(item.type)) {
// Remap MODIFIED (pod ready) events to ADDED for discovery cache purposes.
// This is safe even if the node is already in the cache because BaseNodeRoleWatcher.childAdded() uses
// putIfAbsent, so duplicates are silently ignored.
effectiveType = WatchResult.ADDED;
}

try {
result = new DiscoveryDruidNodeAndResourceVersion(
item.object.getMetadata().getResourceVersion(),
getDiscoveryDruidNodeFromPodDef(nodeRole, item.object)
);
}
catch (Exception ex) {
LOGGER.warn(
ex,
"Failed to deserialize node info from pod[%s] for role[%s] on [%s] event. "
+ "Passing null to trigger watch restart and full resync.",
item.object.getMetadata() != null ? item.object.getMetadata().getName() : "unknown",
nodeRole,
item.type
);
// result stays null, caller will restart the watch and do a full listPods resync
}
} else {
// The item's object can be null in some cases -- likely due to a blip
// in the k8s watch. Handle that by passing the null upwards. The caller
Expand All @@ -187,7 +257,7 @@ public boolean hasNext() throws SocketTimeoutException
}

obj = new Watch.Response<>(
item.type,
effectiveType,
result
);
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,12 @@ private void keepWatching(String labelSelector, String resourceVersion)
baseNodeRoleWatcher.childAdded(item.object.getNode());
break;
case WatchResult.DELETED:
baseNodeRoleWatcher.childRemoved(item.object.getNode());
case WatchResult.NOT_READY:
// Use skipIfUnknown=true for all k8s discovery removals.
// DELETED can fire after NOT_READY (so the service is already removed), or before ADDED (pod deleted before becoming ready).
// NOT_READY can repeat during CrashLoopBackOff. None of these warrant the error-level logging that
// comes with trying to remove an unknown service.
baseNodeRoleWatcher.childRemoved(item.object.getNode(), true);
break;
default:
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,27 @@

import java.net.SocketTimeoutException;

/**
* Iterator over k8s pod watch events that is aligned with the needs of Druid service discovery rather than
* raw Kubernetes watch semantics. Implementations may remap or synthesize event types: for example,
* a Kubernetes MODIFIED event for a pod whose containers are no longer ready (according to k8s readiness state) is surfaced as the
* synthetic {@link #NOT_READY} type so that consumers can handle it as a removal from k8s service discovery.
*/
public interface WatchResult
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class used to be a relatively thin layer over k8s watches, now there's remapping and synthetic events going on. The javadoc of this class should make clear that it's not meant to be a thin layer over k8s watches, it's meant to be aligned with the needs of service discovery.

{
String ADDED = "ADDED";
String MODIFIED = "MODIFIED";
String DELETED = "DELETED";
String BOOKMARK = "BOOKMARK";

/**
* Synthetic event type: pod's container became not-ready (e.g., OOM-killed) but the pod
* still exists. Should be treated as a removal from discovery, but without error-level
* logging for nodes that aren't in the cache (repeated events are expected during
* CrashLoopBackOff).
*/
String NOT_READY = "NOT_READY";

boolean hasNext() throws SocketTimeoutException;

Watch.Response<DiscoveryDruidNodeAndResourceVersion> next();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.k8s.discovery;

import io.kubernetes.client.openapi.models.V1ContainerStatus;
import io.kubernetes.client.openapi.models.V1Pod;
import io.kubernetes.client.openapi.models.V1PodStatus;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.Collections;

public class DefaultK8sApiClientTest
{
@Test
public void testIsPodReady_nullStatus()
{
V1Pod pod = new V1Pod();
Assertions.assertFalse(DefaultK8sApiClient.isPodReady(pod));
}

@Test
public void testIsPodReady_nullContainerStatuses()
{
V1Pod pod = new V1Pod();
pod.setStatus(new V1PodStatus());
Assertions.assertFalse(DefaultK8sApiClient.isPodReady(pod));
}

@Test
public void testIsPodReady_emptyContainerStatuses()
{
V1Pod pod = new V1Pod();
V1PodStatus status = new V1PodStatus();
status.setContainerStatuses(Collections.emptyList());
pod.setStatus(status);
Assertions.assertFalse(DefaultK8sApiClient.isPodReady(pod));
}

@Test
public void testIsPodReady_allContainersReady()
{
V1Pod pod = new V1Pod();
V1PodStatus status = new V1PodStatus();
V1ContainerStatus cs = new V1ContainerStatus();
cs.setReady(true);
status.setContainerStatuses(Collections.singletonList(cs));
pod.setStatus(status);
Assertions.assertTrue(DefaultK8sApiClient.isPodReady(pod));
}

@Test
public void testIsPodReady_containerNotReady()
{
V1Pod pod = new V1Pod();
V1PodStatus status = new V1PodStatus();
V1ContainerStatus cs = new V1ContainerStatus();
cs.setReady(false);
status.setContainerStatuses(Collections.singletonList(cs));
pod.setStatus(status);
Assertions.assertFalse(DefaultK8sApiClient.isPodReady(pod));
}

@Test
public void testIsPodReady_mixedContainerReadiness()
{
V1Pod pod = new V1Pod();
V1PodStatus status = new V1PodStatus();
V1ContainerStatus cs1 = new V1ContainerStatus();
cs1.setReady(true);
V1ContainerStatus cs2 = new V1ContainerStatus();
cs2.setReady(false);
status.setContainerStatuses(Arrays.asList(cs1, cs2));
pod.setStatus(status);
Assertions.assertFalse(DefaultK8sApiClient.isPodReady(pod));
}

@Test
public void testIsPodReady_containerReadyNull()
{
V1Pod pod = new V1Pod();
V1PodStatus status = new V1PodStatus();
V1ContainerStatus cs = new V1ContainerStatus();
// ready is null (not set)
status.setContainerStatuses(Collections.singletonList(cs));
pod.setStatus(status);
Assertions.assertFalse(DefaultK8sApiClient.isPodReady(pod));
}
}
Loading
Loading