diff --git a/ksml-runner/src/main/java/io/axual/ksml/runner/KSMLRunner.java b/ksml-runner/src/main/java/io/axual/ksml/runner/KSMLRunner.java index ed18c7ff..fdff5b20 100644 --- a/ksml-runner/src/main/java/io/axual/ksml/runner/KSMLRunner.java +++ b/ksml-runner/src/main/java/io/axual/ksml/runner/KSMLRunner.java @@ -41,9 +41,9 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import io.axual.ksml.client.serde.ResolvingDeserializer; import io.axual.ksml.client.serde.ResolvingSerializer; @@ -199,8 +199,9 @@ public static void main(String[] args) { try (var prometheusExport = new PrometheusExport(config.getKsmlConfig().getPrometheusConfig())) { prometheusExport.start(); final var executorService = Executors.newFixedThreadPool(2); - final var producerFuture = producer == null ? CompletableFuture.completedFuture(null) : executorService.submit(producer); - final var streamsFuture = streams == null ? CompletableFuture.completedFuture(null) : executorService.submit(streams); + + final var producerFuture = producer == null ? CompletableFuture.completedFuture(null) : CompletableFuture.runAsync(producer, executorService); + final var streamsFuture = streams == null ? CompletableFuture.completedFuture(null) : CompletableFuture.runAsync(streams, executorService); try { // Allow the runner(s) to start @@ -211,40 +212,30 @@ public static void main(String[] args) { restServer.initGlobalQuerier(getQuerier(streams, producer)); } - while (!producerFuture.isDone() || !streamsFuture.isDone()) { - final var producerError = producer != null && producer.getState() == Runner.State.FAILED; - final var streamsError = streams != null && streams.getState() == Runner.State.FAILED; - - // If either runner has an error, stop all runners - if (producerError || streamsError) { - if (producer != null) producer.stop(); - if (streams != null) streams.stop(); - if (producer != null) { - try { - producerFuture.get(5, TimeUnit.SECONDS); - } catch (TimeoutException | ExecutionException | InterruptedException e) { - // Ignore - } - } + producerFuture.whenComplete((result, exc) -> { + if (exc != null) { + log.info("Producer failed", exc); + // Exception, always stop streams too if (streams != null) { - try { - streamsFuture.get(5, TimeUnit.SECONDS); - } catch (TimeoutException | ExecutionException | InterruptedException e) { - // Ignore - } + streams.stop(); } - executorService.shutdown(); - try { - if (!executorService.awaitTermination(2000, TimeUnit.MILLISECONDS)) { - executorService.shutdownNow(); - } - } catch (InterruptedException e) { - executorService.shutdownNow(); - throw new ExecutionException("Exception caught while shutting down", e); + } + }); + streamsFuture.whenComplete((result, exc) -> { + if (exc != null) { + log.info("Stream processing failed", exc); + // Exception, always stop producer too + if (producer != null) { + producer.stop(); } - break; } - } + }); + + final var allFutures = CompletableFuture.allOf(producerFuture, streamsFuture); + // wait for all futures to finish + allFutures.join(); + + closeExecutorService(executorService); } finally { if (restServer != null) restServer.close(); } @@ -253,13 +244,25 @@ public static void main(String[] args) { } } } catch (Throwable t) { - log.error("Unhandled exception", t); + log.error("KSML Stopping because of unhandled exception"); throw FatalError.reportAndExit(t); } // Explicit exit, need to find out which threads are actually stopping us System.exit(0); } + private static void closeExecutorService(final ExecutorService executorService) throws ExecutionException { + executorService.shutdown(); + try { + if (!executorService.awaitTermination(2000, TimeUnit.MILLISECONDS)) { + executorService.shutdownNow(); + } + } catch (InterruptedException e) { + executorService.shutdownNow(); + throw new ExecutionException("Exception caught while shutting down", e); + } + } + protected static KsmlQuerier getQuerier(KafkaStreamsRunner streamsRunner, KafkaProducerRunner producerRunner) { return new KsmlQuerier() { @Override diff --git a/ksml-runner/src/main/java/io/axual/ksml/runner/backend/KafkaProducerRunner.java b/ksml-runner/src/main/java/io/axual/ksml/runner/backend/KafkaProducerRunner.java index bda9b6c0..74f512a8 100644 --- a/ksml-runner/src/main/java/io/axual/ksml/runner/backend/KafkaProducerRunner.java +++ b/ksml-runner/src/main/java/io/axual/ksml/runner/backend/KafkaProducerRunner.java @@ -33,6 +33,7 @@ import io.axual.ksml.generator.TopologyDefinition; import io.axual.ksml.python.PythonContext; import io.axual.ksml.python.PythonFunction; +import io.axual.ksml.runner.exception.RunnerException; import lombok.Builder; import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; @@ -84,8 +85,8 @@ public void run() { }); }); } catch (Exception e) { - log.error("Error while registering functions and producers", e); - hasFailed.set(true); + setState(State.FAILED); + throw new RunnerException("Error while registering functions and producers", e); } try (final Producer producer = createProducer(getProducerConfigs())) { @@ -103,8 +104,7 @@ public void run() { } } catch (Throwable e) { setState(State.FAILED); - hasFailed.set(true); - log.error("Unhandled producer exception", e); + throw new RunnerException("Unhandled producer exception", e); } setState(State.STOPPED); isRunning.set(false); diff --git a/ksml-runner/src/main/java/io/axual/ksml/runner/backend/KafkaStreamsRunner.java b/ksml-runner/src/main/java/io/axual/ksml/runner/backend/KafkaStreamsRunner.java index 15d9badb..8e75a0d9 100644 --- a/ksml-runner/src/main/java/io/axual/ksml/runner/backend/KafkaStreamsRunner.java +++ b/ksml-runner/src/main/java/io/axual/ksml/runner/backend/KafkaStreamsRunner.java @@ -39,6 +39,7 @@ import io.axual.ksml.execution.ExecutionErrorHandler; import io.axual.ksml.generator.TopologyDefinition; import io.axual.ksml.runner.config.ApplicationServerConfig; +import io.axual.ksml.runner.exception.RunnerException; import io.axual.ksml.runner.streams.KSMLClientSupplier; import lombok.Builder; import lombok.Getter; @@ -143,6 +144,9 @@ public void run() { log.info("Kafka Streams has stopped"); } } + if (getState() == State.FAILED) { + throw new RunnerException("Kafka Streams is in a failed state"); + } } @Override diff --git a/ksml/src/main/java/io/axual/ksml/execution/FatalError.java b/ksml/src/main/java/io/axual/ksml/execution/FatalError.java index 91a71388..88b9cd26 100644 --- a/ksml/src/main/java/io/axual/ksml/execution/FatalError.java +++ b/ksml/src/main/java/io/axual/ksml/execution/FatalError.java @@ -20,43 +20,37 @@ * =========================LICENSE_END================================== */ -import io.axual.ksml.exception.TopologyException; import lombok.extern.slf4j.Slf4j; @Slf4j public class FatalError { - private FatalError() { - } - public static Throwable topologyError(String message) { - return reportAndExit(new TopologyException(message)); + public static final String NEW_LINE = System.lineSeparator(); + public static final String LOG_ITEM_SEPARATOR = "==========="+NEW_LINE; + + private FatalError() { } public static RuntimeException reportAndExit(Throwable t) { - log.error("\n\n"); - log.error("Fatal error"); - printLineSeparator(); - printExceptionDetails(t); + var messageBuilder = new StringBuilder().append(NEW_LINE); + messageBuilder.append("FatalError").append(NEW_LINE).append(LOG_ITEM_SEPARATOR); + printExceptionDetails(messageBuilder, t); + log.error(messageBuilder.toString()); System.exit(1); - // Dummy return to resolve compiler errors return new RuntimeException(t.getMessage()); } - private static void printExceptionDetails(Throwable t) { + private static void printExceptionDetails(StringBuilder messageBuilder, Throwable t) { if (t.getCause() != null) { - printExceptionDetails(t.getCause()); - printLineSeparator(); - log.error("Above error caused: {}", t.getMessage()); + printExceptionDetails(messageBuilder, t.getCause()); + messageBuilder.append(LOG_ITEM_SEPARATOR).append(NEW_LINE); + messageBuilder.append("Above error caused: ").append(t.getMessage()).append(NEW_LINE); } else { - log.error("Description: {}", t.getMessage()); + messageBuilder.append("Description: ").append(t.getMessage()).append(NEW_LINE); } - log.error("Stack trace:"); + messageBuilder.append("Stack trace: ").append(NEW_LINE); for (var ste : t.getStackTrace()) { - log.error(" {}::{} @ {}:{}", ste.getClassName(), ste.getMethodName(), ste.getFileName(), ste.getLineNumber()); + messageBuilder.append(" %s::%s @ %s:%s".formatted(ste.getClassName(), ste.getMethodName(), ste.getFileName(), ste.getLineNumber())).append(NEW_LINE); } } - - private static void printLineSeparator() { - log.error("==========="); - } }