diff --git a/ksml-query/src/main/java/io/axual/ksml/rest/server/ComponentState.java b/ksml-query/src/main/java/io/axual/ksml/rest/server/ComponentState.java index 9529f8cf..eaf770dc 100644 --- a/ksml-query/src/main/java/io/axual/ksml/rest/server/ComponentState.java +++ b/ksml-query/src/main/java/io/axual/ksml/rest/server/ComponentState.java @@ -28,6 +28,10 @@ public enum ComponentState { * The component is not relevant or used */ NOT_APPLICABLE, + /** + * The component is created, but did not went into start yet + */ + CREATED, /** * The component is still starting */ @@ -47,5 +51,5 @@ public enum ComponentState { /** * The component is in a failed state */ - FAILED + FAILED; } diff --git a/ksml-query/src/main/java/io/axual/ksml/rest/server/LivenessResource.java b/ksml-query/src/main/java/io/axual/ksml/rest/server/LivenessResource.java new file mode 100644 index 00000000..bac534d2 --- /dev/null +++ b/ksml-query/src/main/java/io/axual/ksml/rest/server/LivenessResource.java @@ -0,0 +1,65 @@ +package io.axual.ksml.rest.server; + +/*- + * ========================LICENSE_START================================= + * KSML Queryable State Store + * %% + * Copyright (C) 2021 - 2024 Axual B.V. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * =========================LICENSE_END================================== + */ + +import java.util.Set; + +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.core.Response; +import lombok.extern.slf4j.Slf4j; + +import static io.axual.ksml.rest.server.ComponentState.*; + +@Slf4j(topic = "ksml.rest.service.live") +@Path("live") +public class LivenessResource { + + @GET() + public Response getLivenessState() { + final var querier = GlobalState.INSTANCE.querier(); + + if (querier == null) { + // Service has not started yet + log.trace("KSML Not Alive - No querier available, still in startup"); + return Response.serverError().build(); + } + + final var producerState = querier.getProducerState(); + final var streamRunnerState = querier.getStreamRunnerState(); + + // Misconfiguration + if( producerState == NOT_APPLICABLE && streamRunnerState == NOT_APPLICABLE ) { + log.trace("KSML Not Alive - Both producerState and streamRunnerState are disabled"); + return Response.serverError().build(); + } + + if( producerState == FAILED || streamRunnerState == FAILED || streamRunnerState == STOPPED){ + log.trace("KSML Not Alive - producer state '{}' stream runner state '{}' ", producerState, streamRunnerState); + return Response.serverError().build(); + } else { + // KSML is alive, return HTTP Status code 204 (OK, No Content) if components + log.trace("KSML Alive - producer state '{}' stream runner state '{}' ", producerState, streamRunnerState); + return Response.noContent().build(); + } + } + +} diff --git a/ksml-query/src/main/java/io/axual/ksml/rest/server/ReadyResource.java b/ksml-query/src/main/java/io/axual/ksml/rest/server/ReadyResource.java index 589a373d..78c2dbe0 100644 --- a/ksml-query/src/main/java/io/axual/ksml/rest/server/ReadyResource.java +++ b/ksml-query/src/main/java/io/axual/ksml/rest/server/ReadyResource.java @@ -27,13 +27,16 @@ import jakarta.ws.rs.core.Response; import lombok.extern.slf4j.Slf4j; -import static io.axual.ksml.rest.server.ComponentState.*; +import static io.axual.ksml.rest.server.ComponentState.NOT_APPLICABLE; +import static io.axual.ksml.rest.server.ComponentState.STARTED; +import static io.axual.ksml.rest.server.ComponentState.STOPPED; +import static io.axual.ksml.rest.server.ComponentState.STOPPING; -@Slf4j +@Slf4j(topic = "ksml.rest.service.ready") @Path("ready") public class ReadyResource { - private static final Set NOT_RUNNING_STATES = Set.of(NOT_APPLICABLE, STOPPED, FAILED); - private static final Set RUNNING_STATES = Set.of(STARTING, STARTED, STOPPING); + private static final Set PRODUCER_READY_STATES = Set.of(NOT_APPLICABLE, STARTED, STOPPING, STOPPED); + private static final Set STREAMS_READY_STATES = Set.of(NOT_APPLICABLE, STARTED); @GET() public Response getReadyState() { @@ -41,29 +44,25 @@ public Response getReadyState() { if (querier == null) { // Service has not started yet + log.info("KSML Not Ready -No querier available, still in startup"); return Response.serverError().build(); } final var producerState = querier.getProducerState(); final var streamRunnerState = querier.getStreamRunnerState(); - log.trace("Ready states - producer '{}' stream runner '{}' ", producerState, streamRunnerState); - - // Check if either Producer and StreamRunner has failed, or if they are BOTH in a not running state. - if (producerState == FAILED || streamRunnerState == FAILED || - (NOT_RUNNING_STATES.contains(producerState) && NOT_RUNNING_STATES.contains(streamRunnerState))) { - // One of the components has failed, or both are not running. + if (producerState == NOT_APPLICABLE && streamRunnerState == NOT_APPLICABLE) { + log.info("KSML Not Ready - Both producerState and streamRunnerState are disabled"); return Response.serverError().build(); } - // Check if either producer of consumer is in a running state - if (RUNNING_STATES.contains(producerState) || RUNNING_STATES.contains(streamRunnerState)) { - // KSML has started, return HTTP Status code 204 (OK, No Content) if components + if (PRODUCER_READY_STATES.contains(producerState) && STREAMS_READY_STATES.contains(streamRunnerState)) { + // KSML is running, return HTTP Status code 204 (OK, No Content) if components + log.info("KSML Ready - producer state '{}' stream runner state '{}' ", producerState, streamRunnerState); return Response.noContent().build(); } else { - // KSML has started, return HTTP Status code 204 (OK, No Content) if components + log.info("KSML Not Ready - producer state '{}' stream runner state '{}' ", producerState, streamRunnerState); return Response.serverError().build(); } } - } diff --git a/ksml-query/src/main/java/io/axual/ksml/rest/server/RestServer.java b/ksml-query/src/main/java/io/axual/ksml/rest/server/RestServer.java index bcfc9748..3154cf1e 100644 --- a/ksml-query/src/main/java/io/axual/ksml/rest/server/RestServer.java +++ b/ksml-query/src/main/java/io/axual/ksml/rest/server/RestServer.java @@ -54,6 +54,8 @@ public RestServer(HostInfo hostInfo) { // configure REST service ResourceConfig rc = new ResourceConfig(); + rc.register(StartupResource.class); + rc.register(LivenessResource.class); rc.register(ReadyResource.class); rc.register(KeyValueStoreResource.class); rc.register(WindowedKeyValueStoreResource.class); diff --git a/ksml-query/src/main/java/io/axual/ksml/rest/server/StartupResource.java b/ksml-query/src/main/java/io/axual/ksml/rest/server/StartupResource.java new file mode 100644 index 00000000..2173e79f --- /dev/null +++ b/ksml-query/src/main/java/io/axual/ksml/rest/server/StartupResource.java @@ -0,0 +1,65 @@ +package io.axual.ksml.rest.server; + +/*- + * ========================LICENSE_START================================= + * KSML Queryable State Store + * %% + * Copyright (C) 2021 - 2024 Axual B.V. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * =========================LICENSE_END================================== + */ + +import java.util.Set; + +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.core.Response; +import lombok.extern.slf4j.Slf4j; + +import static io.axual.ksml.rest.server.ComponentState.*; + +@Slf4j(topic = "ksml.rest.service.startup") +@Path("/startup") +public class StartupResource { + private static final Set STARTED_STATES = Set.of(NOT_APPLICABLE, STARTING, STARTED, STOPPING, STOPPED); + + @GET() + public Response getStartupState() { + final var querier = GlobalState.INSTANCE.querier(); + + if (querier == null) { + // Service has not started yet + log.trace("KSML Not Started - No querier available, still in startup"); + return Response.serverError().build(); + } + + final var producerState = querier.getProducerState(); + final var streamRunnerState = querier.getStreamRunnerState(); + + if( producerState == NOT_APPLICABLE && streamRunnerState == NOT_APPLICABLE ) { + log.trace("KSML Not Started - Both producerState and streamRunnerState are disabled"); + return Response.serverError().build(); + } + + if (STARTED_STATES.contains(producerState) && STARTED_STATES.contains(streamRunnerState)) { + // KSML is started, return HTTP Status code 204 (OK, No Content) if components + log.trace("KSML Started - producer state '{}' stream runner state '{}' ", producerState, streamRunnerState); + return Response.noContent().build(); + } else { + log.trace("KSML Not Started - producer state '{}' stream runner state '{}' ", producerState, streamRunnerState); + return Response.serverError().build(); + } + } + +} diff --git a/ksml-runner/src/main/java/io/axual/ksml/runner/KSMLRunner.java b/ksml-runner/src/main/java/io/axual/ksml/runner/KSMLRunner.java index d7ce542c..ed18c7ff 100644 --- a/ksml-runner/src/main/java/io/axual/ksml/runner/KSMLRunner.java +++ b/ksml-runner/src/main/java/io/axual/ksml/runner/KSMLRunner.java @@ -304,6 +304,7 @@ public ComponentState getProducerState() { ComponentState stateConverter(Runner.State state) { return switch (state) { + case CREATED -> ComponentState.CREATED; case STARTING -> ComponentState.STARTING; case STARTED -> ComponentState.STARTED; case STOPPING -> ComponentState.STOPPING; diff --git a/ksml-runner/src/main/java/io/axual/ksml/runner/backend/KafkaProducerRunner.java b/ksml-runner/src/main/java/io/axual/ksml/runner/backend/KafkaProducerRunner.java index 23c20aaf..bda9b6c0 100644 --- a/ksml-runner/src/main/java/io/axual/ksml/runner/backend/KafkaProducerRunner.java +++ b/ksml-runner/src/main/java/io/axual/ksml/runner/backend/KafkaProducerRunner.java @@ -20,11 +20,6 @@ * =========================LICENSE_END================================== */ -import io.axual.ksml.client.producer.ResolvingProducer; -import io.axual.ksml.generator.TopologyDefinition; -import io.axual.ksml.python.PythonContext; -import io.axual.ksml.python.PythonFunction; -import lombok.Builder; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.slf4j.Logger; @@ -34,6 +29,12 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import io.axual.ksml.client.producer.ResolvingProducer; +import io.axual.ksml.generator.TopologyDefinition; +import io.axual.ksml.python.PythonContext; +import io.axual.ksml.python.PythonFunction; +import lombok.Builder; + import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; @@ -44,6 +45,7 @@ public class KafkaProducerRunner implements Runner { private final AtomicBoolean hasFailed = new AtomicBoolean(false); private final AtomicBoolean stopRunning = new AtomicBoolean(false); private final Config config; + private State currentState; @Builder public record Config(Map definitions, Map kafkaConfig) { @@ -51,11 +53,22 @@ public record Config(Map definitions, Map { @@ -76,6 +89,7 @@ public void run() { } try (final Producer producer = createProducer(getProducerConfigs())) { + setState(State.STARTED); log.info("Starting Kafka producer(s)"); while (!stopRunning.get() && !hasFailed.get() && scheduler.hasScheduledItems()) { var scheduledGenerator = scheduler.getScheduledItem(); @@ -88,9 +102,11 @@ public void run() { } } } catch (Throwable e) { + setState(State.FAILED); hasFailed.set(true); log.error("Unhandled producer exception", e); } + setState(State.STOPPED); isRunning.set(false); log.info("Producer(s) stopped"); } @@ -114,14 +130,13 @@ private Map getProducerConfigs() { } @Override - public State getState() { - if (hasFailed.get()) return State.FAILED; - if (isRunning.get()) return State.STARTED; - return State.STOPPED; + public synchronized State getState() { + return currentState; } @Override public void stop() { + setState(State.STOPPING); stopRunning.set(true); } } diff --git a/ksml-runner/src/main/java/io/axual/ksml/runner/backend/KafkaStreamsRunner.java b/ksml-runner/src/main/java/io/axual/ksml/runner/backend/KafkaStreamsRunner.java index d8ae7581..15d9badb 100644 --- a/ksml-runner/src/main/java/io/axual/ksml/runner/backend/KafkaStreamsRunner.java +++ b/ksml-runner/src/main/java/io/axual/ksml/runner/backend/KafkaStreamsRunner.java @@ -74,9 +74,14 @@ public KafkaStreamsRunner(Config config) { final var topologyGenerator = new TopologyGenerator(applicationId, (String) optimize); final var topology = topologyGenerator.create(streamsBuilder, config.definitions); kafkaStreams = new KafkaStreams(topology, mapToProperties(streamsProps)); + kafkaStreams.setStateListener(this::logStreamsStateChange); kafkaStreams.setUncaughtExceptionHandler(ExecutionContext.INSTANCE::uncaughtException); } + private void logStreamsStateChange(KafkaStreams.State newState, KafkaStreams.State oldState) { + log.info("Pipeline processing state change. Moving from old state '{}' to new state '{}'", oldState, newState); + } + private Map getStreamsConfig(Map initialConfigs, String storageDirectory, ApplicationServerConfig appServer) { final Map result = initialConfigs != null ? new HashMap<>(initialConfigs) : new HashMap<>(); // Set default value if not explicitly configured @@ -108,9 +113,9 @@ private Properties mapToProperties(Map configs) { @Override public State getState() { - if (kafkaStreams == null) return State.STOPPED; return switch (kafkaStreams.state()) { - case CREATED, REBALANCING -> State.STARTING; + case CREATED -> State.CREATED; + case REBALANCING -> State.STARTING; case RUNNING -> State.STARTED; case PENDING_SHUTDOWN -> State.STOPPING; case NOT_RUNNING -> State.STOPPED; diff --git a/ksml-runner/src/main/java/io/axual/ksml/runner/backend/Runner.java b/ksml-runner/src/main/java/io/axual/ksml/runner/backend/Runner.java index 8784a0a6..a6e8f047 100644 --- a/ksml-runner/src/main/java/io/axual/ksml/runner/backend/Runner.java +++ b/ksml-runner/src/main/java/io/axual/ksml/runner/backend/Runner.java @@ -21,13 +21,29 @@ */ +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + public interface Runner extends Runnable { enum State { - STARTING, - STARTED, - STOPPING, - STOPPED, - FAILED + CREATED(1, 2, 5), // Ordinal 0 + STARTING(2, 3, 4, 5), // Ordinal 1 + STARTED(1, 3, 4, 5), // Ordinal 2 + STOPPING(4, 5), // Ordinal 3 + STOPPED, // Ordinal 4 + FAILED // Ordinal 5 + ; + + State(final Integer... validNextStates) { + this.validNextStates.addAll(Arrays.asList(validNextStates)); + } + + private final Set validNextStates = new HashSet<>(); + + public boolean isValidNextState(State nextState) { + return validNextStates.contains(nextState.ordinal()); + } } State getState(); diff --git a/packaging/helm-charts/ksml/values.yaml b/packaging/helm-charts/ksml/values.yaml index 4eced076..0de74065 100644 --- a/packaging/helm-charts/ksml/values.yaml +++ b/packaging/helm-charts/ksml/values.yaml @@ -85,7 +85,7 @@ schemaDefinitions: {} # The startup probe for the KSML containers startupProbe: httpGet: - path: /ready + path: /startup port: http # -- Minimum consecutive failures for the probe to be considered failed. # A failed startupProbe will mark the container as unhealthy and triggers a restart for that specific container. @@ -121,7 +121,7 @@ readinessProbe: livenessProbe: # The service definition to call to determine liveness. httpGet: - path: /ready + path: /live port: http # -- Minimum consecutive failures for the probe to be considered failed after having succeeded. # A failed livenessProbe will cause the container to be restarted.