Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate liveness, readiness and startup services #150

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ public enum ComponentState {
* The component is not relevant or used
*/
NOT_APPLICABLE,
/**
* The component is created, but has not been started yet
*/
CREATED,
/**
* The component is still starting
*/
Expand All @@ -47,5 +51,5 @@ public enum ComponentState {
/**
* The component is in a failed state
*/
FAILED
FAILED;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package io.axual.ksml.rest.server;

/*-
* ========================LICENSE_START=================================
* KSML Queryable State Store
* %%
* Copyright (C) 2021 - 2024 Axual B.V.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* =========================LICENSE_END==================================
*/

import java.util.Set;

import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.Response;
import lombok.extern.slf4j.Slf4j;

import static io.axual.ksml.rest.server.ComponentState.*;

/**
 * REST resource backing the liveness probe, served on the {@code live} path.
 * <p>
 * A request is answered with HTTP 204 (No Content) while KSML is considered alive and with a
 * server error response otherwise. Every decision is logged at trace level on the
 * {@code ksml.rest.service.live} logger.
 */
@Slf4j(topic = "ksml.rest.service.live")
@Path("live")
public class LivenessResource {

    /**
     * Reports whether KSML is alive.
     * <p>
     * KSML is reported as not alive when:
     * <ul>
     *   <li>no querier has been registered in {@link GlobalState} yet;</li>
     *   <li>both the producer and the stream runner are {@code NOT_APPLICABLE};</li>
     *   <li>the producer has {@code FAILED}, or the stream runner has {@code FAILED} or {@code STOPPED}.</li>
     * </ul>
     * Note that a {@code STOPPED} producer on its own does not make KSML not alive.
     *
     * @return a 204 (No Content) response when alive, a server error response otherwise
     */
    @GET()
    public Response getLivenessState() {
        final var stateQuerier = GlobalState.INSTANCE.querier();
        if (stateQuerier == null) {
            // Nothing to query yet: the service is still starting up
            log.trace("KSML Not Alive - No querier available, still in startup");
            return notAlive();
        }

        final var producer = stateQuerier.getProducerState();
        final var streams = stateQuerier.getStreamRunnerState();

        // With neither component in use there is nothing that could be alive
        final boolean nothingEnabled = producer == NOT_APPLICABLE && streams == NOT_APPLICABLE;
        if (nothingEnabled) {
            log.trace("KSML Not Alive - Both producerState and streamRunnerState are disabled");
            return notAlive();
        }

        final boolean alive = producer != FAILED && streams != FAILED && streams != STOPPED;
        if (alive) {
            // Alive is signalled with HTTP Status code 204 (OK, No Content)
            log.trace("KSML Alive - producer state '{}' stream runner state '{}' ", producer, streams);
            return Response.noContent().build();
        }

        log.trace("KSML Not Alive - producer state '{}' stream runner state '{}' ", producer, streams);
        return notAlive();
    }

    /** Builds the server error response used for every "not alive" outcome. */
    private static Response notAlive() {
        return Response.serverError().build();
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,43 +27,42 @@
import jakarta.ws.rs.core.Response;
import lombok.extern.slf4j.Slf4j;

import static io.axual.ksml.rest.server.ComponentState.*;
import static io.axual.ksml.rest.server.ComponentState.NOT_APPLICABLE;
import static io.axual.ksml.rest.server.ComponentState.STARTED;
import static io.axual.ksml.rest.server.ComponentState.STOPPED;
import static io.axual.ksml.rest.server.ComponentState.STOPPING;

@Slf4j
@Slf4j(topic = "ksml.rest.service.ready")
@Path("ready")
public class ReadyResource {
private static final Set<ComponentState> NOT_RUNNING_STATES = Set.of(NOT_APPLICABLE, STOPPED, FAILED);
private static final Set<ComponentState> RUNNING_STATES = Set.of(STARTING, STARTED, STOPPING);
private static final Set<ComponentState> PRODUCER_READY_STATES = Set.of(NOT_APPLICABLE, STARTED, STOPPING, STOPPED);
private static final Set<ComponentState> STREAMS_READY_STATES = Set.of(NOT_APPLICABLE, STARTED);

@GET()
public Response getReadyState() {
final var querier = GlobalState.INSTANCE.querier();

if (querier == null) {
// Service has not started yet
log.info("KSML Not Ready -No querier available, still in startup");
return Response.serverError().build();
}

final var producerState = querier.getProducerState();
final var streamRunnerState = querier.getStreamRunnerState();

log.trace("Ready states - producer '{}' stream runner '{}' ", producerState, streamRunnerState);

// Check if either Producer and StreamRunner has failed, or if they are BOTH in a not running state.
if (producerState == FAILED || streamRunnerState == FAILED ||
(NOT_RUNNING_STATES.contains(producerState) && NOT_RUNNING_STATES.contains(streamRunnerState))) {
// One of the components has failed, or both are not running.
if (producerState == NOT_APPLICABLE && streamRunnerState == NOT_APPLICABLE) {
log.info("KSML Not Ready - Both producerState and streamRunnerState are disabled");
return Response.serverError().build();
}

// Check if either producer of consumer is in a running state
if (RUNNING_STATES.contains(producerState) || RUNNING_STATES.contains(streamRunnerState)) {
// KSML has started, return HTTP Status code 204 (OK, No Content) if components
if (PRODUCER_READY_STATES.contains(producerState) && STREAMS_READY_STATES.contains(streamRunnerState)) {
// KSML is running, return HTTP Status code 204 (OK, No Content) if components
log.info("KSML Ready - producer state '{}' stream runner state '{}' ", producerState, streamRunnerState);
return Response.noContent().build();
} else {
// KSML is not ready, return HTTP Status code 500 (Internal Server Error)
log.info("KSML Not Ready - producer state '{}' stream runner state '{}' ", producerState, streamRunnerState);
return Response.serverError().build();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public RestServer(HostInfo hostInfo) {

// configure REST service
ResourceConfig rc = new ResourceConfig();
rc.register(StartupResource.class);
rc.register(LivenessResource.class);
rc.register(ReadyResource.class);
rc.register(KeyValueStoreResource.class);
rc.register(WindowedKeyValueStoreResource.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package io.axual.ksml.rest.server;

/*-
* ========================LICENSE_START=================================
* KSML Queryable State Store
* %%
* Copyright (C) 2021 - 2024 Axual B.V.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* =========================LICENSE_END==================================
*/

import java.util.Set;

import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.Response;
import lombok.extern.slf4j.Slf4j;

import static io.axual.ksml.rest.server.ComponentState.*;

/**
 * REST resource backing the startup probe, served on the {@code /startup} path.
 * <p>
 * A request is answered with HTTP 204 (No Content) once KSML counts as started and with a
 * server error response otherwise. Every decision is logged at trace level on the
 * {@code ksml.rest.service.startup} logger.
 */
@Slf4j(topic = "ksml.rest.service.startup")
@Path("/startup")
public class StartupResource {
    /** States in which a component counts as started; {@code CREATED} and {@code FAILED} are not part of this set. */
    private static final Set<ComponentState> STARTED_STATES = Set.of(NOT_APPLICABLE, STARTING, STARTED, STOPPING, STOPPED);

    /**
     * Reports whether KSML has started.
     * <p>
     * KSML is reported as not started when no querier has been registered in {@link GlobalState}
     * yet, when both components are {@code NOT_APPLICABLE}, or when either component is in a
     * state outside {@link #STARTED_STATES}.
     *
     * @return a 204 (No Content) response when started, a server error response otherwise
     */
    @GET()
    public Response getStartupState() {
        final var stateQuerier = GlobalState.INSTANCE.querier();
        if (stateQuerier == null) {
            // Nothing to query yet: the service is still starting up
            log.trace("KSML Not Started - No querier available, still in startup");
            return notStarted();
        }

        final var producer = stateQuerier.getProducerState();
        final var streams = stateQuerier.getStreamRunnerState();

        // With neither component in use there is nothing that could have started
        final boolean nothingEnabled = producer == NOT_APPLICABLE && streams == NOT_APPLICABLE;
        if (nothingEnabled) {
            log.trace("KSML Not Started - Both producerState and streamRunnerState are disabled");
            return notStarted();
        }

        final boolean started = STARTED_STATES.contains(producer) && STARTED_STATES.contains(streams);
        if (!started) {
            log.trace("KSML Not Started - producer state '{}' stream runner state '{}' ", producer, streams);
            return notStarted();
        }

        // Started is signalled with HTTP Status code 204 (OK, No Content)
        log.trace("KSML Started - producer state '{}' stream runner state '{}' ", producer, streams);
        return Response.noContent().build();
    }

    /** Builds the server error response used for every "not started" outcome. */
    private static Response notStarted() {
        return Response.serverError().build();
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ public ComponentState getProducerState() {

ComponentState stateConverter(Runner.State state) {
return switch (state) {
case CREATED -> ComponentState.CREATED;
case STARTING -> ComponentState.STARTING;
case STARTED -> ComponentState.STARTED;
case STOPPING -> ComponentState.STOPPING;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,6 @@
* =========================LICENSE_END==================================
*/

import io.axual.ksml.client.producer.ResolvingProducer;
import io.axual.ksml.generator.TopologyDefinition;
import io.axual.ksml.python.PythonContext;
import io.axual.ksml.python.PythonFunction;
import lombok.Builder;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.slf4j.Logger;
Expand All @@ -34,6 +29,12 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import io.axual.ksml.client.producer.ResolvingProducer;
import io.axual.ksml.generator.TopologyDefinition;
import io.axual.ksml.python.PythonContext;
import io.axual.ksml.python.PythonFunction;
import lombok.Builder;

import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;

Expand All @@ -44,18 +45,30 @@ public class KafkaProducerRunner implements Runner {
private final AtomicBoolean hasFailed = new AtomicBoolean(false);
private final AtomicBoolean stopRunning = new AtomicBoolean(false);
private final Config config;
private State currentState;

@Builder
public record Config(Map<String, TopologyDefinition> definitions, Map<String, String> kafkaConfig) {
}

/**
 * Creates a producer runner for the given configuration.
 * The runner starts out in {@link State#CREATED}.
 *
 * @param config the runner configuration, stored as-is
 */
public KafkaProducerRunner(Config config) {
this.config = config;
currentState = State.CREATED;
}


/**
 * Moves the runner to {@code newState} if the current state allows that transition.
 * A transition rejected by {@link State#isValidNextState(State)} leaves the state unchanged
 * and is reported with a warning.
 *
 * @param newState the state to move to
 */
public synchronized void setState(State newState) {
    final boolean transitionAllowed = currentState.isValidNextState(newState);
    if (!transitionAllowed) {
        log.warn("Illegal Producer State transition. Current state is {}. Invalid next state {}", currentState, newState);
        return;
    }
    currentState = newState;
}

public void run() {
log.info("Registering Kafka producer(s)");
isRunning.set(true);
setState(State.STARTING);

try {
config.definitions.forEach((defName, definition) -> {
Expand All @@ -76,6 +89,7 @@ public void run() {
}

try (final Producer<byte[], byte[]> producer = createProducer(getProducerConfigs())) {
setState(State.STARTED);
log.info("Starting Kafka producer(s)");
while (!stopRunning.get() && !hasFailed.get() && scheduler.hasScheduledItems()) {
var scheduledGenerator = scheduler.getScheduledItem();
Expand All @@ -88,9 +102,11 @@ public void run() {
}
}
} catch (Throwable e) {
setState(State.FAILED);
hasFailed.set(true);
log.error("Unhandled producer exception", e);
}
setState(State.STOPPED);
isRunning.set(false);
log.info("Producer(s) stopped");
}
Expand All @@ -114,14 +130,13 @@ private Map<String, Object> getProducerConfigs() {
}

@Override
public State getState() {
if (hasFailed.get()) return State.FAILED;
if (isRunning.get()) return State.STARTED;
return State.STOPPED;
public synchronized State getState() {
return currentState;
}

/**
 * Requests the producer to stop.
 * First attempts the transition to {@link State#STOPPING} (setState only applies it when the
 * current state allows it, and logs a warning otherwise), then raises the stopRunning flag
 * that the producer loop in run() checks on each iteration.
 */
@Override
public void stop() {
setState(State.STOPPING);
stopRunning.set(true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,14 @@ public KafkaStreamsRunner(Config config) {
final var topologyGenerator = new TopologyGenerator(applicationId, (String) optimize);
final var topology = topologyGenerator.create(streamsBuilder, config.definitions);
kafkaStreams = new KafkaStreams(topology, mapToProperties(streamsProps));
kafkaStreams.setStateListener(this::logStreamsStateChange);
kafkaStreams.setUncaughtExceptionHandler(ExecutionContext.INSTANCE::uncaughtException);
}

/**
 * Logs every Kafka Streams state transition at info level.
 * Registered as the state listener of the KafkaStreams instance in the constructor.
 *
 * @param newState the state Kafka Streams is moving to
 * @param oldState the state Kafka Streams is leaving
 */
private void logStreamsStateChange(KafkaStreams.State newState, KafkaStreams.State oldState) {
log.info("Pipeline processing state change. Moving from old state '{}' to new state '{}'", oldState, newState);
}

private Map<String, Object> getStreamsConfig(Map<String, String> initialConfigs, String storageDirectory, ApplicationServerConfig appServer) {
final Map<String, Object> result = initialConfigs != null ? new HashMap<>(initialConfigs) : new HashMap<>();
// Set default value if not explicitly configured
Expand Down Expand Up @@ -108,9 +113,9 @@ private Properties mapToProperties(Map<String, Object> configs) {

@Override
public State getState() {
if (kafkaStreams == null) return State.STOPPED;
return switch (kafkaStreams.state()) {
case CREATED, REBALANCING -> State.STARTING;
case CREATED -> State.CREATED;
case REBALANCING -> State.STARTING;
case RUNNING -> State.STARTED;
case PENDING_SHUTDOWN -> State.STOPPING;
case NOT_RUNNING -> State.STOPPED;
Expand Down
26 changes: 21 additions & 5 deletions ksml-runner/src/main/java/io/axual/ksml/runner/backend/Runner.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,29 @@
*/


import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public interface Runner extends Runnable {
/**
 * Lifecycle states of a {@link Runner}, together with the transitions allowed between them.
 * <p>
 * The transition rules are expressed directly in terms of the constants rather than their
 * ordinals, so adding or reordering constants cannot silently change which transitions are
 * accepted. The declaration order is unchanged: CREATED, STARTING, STARTED, STOPPING,
 * STOPPED, FAILED.
 */
enum State {
    /** The runner has been constructed but has not begun starting. */
    CREATED,
    /** The runner is starting. */
    STARTING,
    /** The runner is started. */
    STARTED,
    /** The runner is stopping. */
    STOPPING,
    /** The runner has stopped; no further transitions are accepted. */
    STOPPED,
    /** The runner has failed; no further transitions are accepted. */
    FAILED;

    /**
     * Tells whether a runner in this state may move to {@code nextState}.
     * <ul>
     *   <li>CREATED  -> STARTING, STARTED, FAILED</li>
     *   <li>STARTING -> STARTED, STOPPING, STOPPED, FAILED</li>
     *   <li>STARTED  -> STARTING, STOPPING, STOPPED, FAILED</li>
     *   <li>STOPPING -> STOPPED, FAILED</li>
     *   <li>STOPPED and FAILED are terminal</li>
     * </ul>
     *
     * @param nextState the candidate next state; {@code null} is never a valid next state
     * @return {@code true} if the transition is allowed, {@code false} otherwise
     */
    public boolean isValidNextState(State nextState) {
        return switch (this) {
            case CREATED -> nextState == STARTING || nextState == STARTED || nextState == FAILED;
            case STARTING -> nextState == STARTED || nextState == STOPPING
                    || nextState == STOPPED || nextState == FAILED;
            case STARTED -> nextState == STARTING || nextState == STOPPING
                    || nextState == STOPPED || nextState == FAILED;
            case STOPPING -> nextState == STOPPED || nextState == FAILED;
            case STOPPED, FAILED -> false;
        };
    }
}

State getState();
Expand Down
4 changes: 2 additions & 2 deletions packaging/helm-charts/ksml/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ schemaDefinitions: {}
# The startup probe for the KSML containers
startupProbe:
httpGet:
path: /ready
path: /startup
port: http
# -- Minimum consecutive failures for the probe to be considered failed.
# A failed startupProbe will mark the container as unhealthy and triggers a restart for that specific container.
Expand Down Expand Up @@ -121,7 +121,7 @@ readinessProbe:
livenessProbe:
# The service definition to call to determine liveness.
httpGet:
path: /ready
path: /live
port: http
# -- Minimum consecutive failures for the probe to be considered failed after having succeeded.
# A failed livenessProbe will cause the container to be restarted.
Expand Down