From e636da37a5f00572c3de695ea2d4a04c1aa1d0e7 Mon Sep 17 00:00:00 2001 From: Leo Irudayam Date: Tue, 17 Mar 2020 14:36:36 +0100 Subject: [PATCH] Bug fixes --- prepare.sh | 37 +++++++++++++++++-- .../consumer/StreamFlinkKafkaConsumer.kt | 4 +- .../cryptopredictstream/rules/RuleExecutor.kt | 2 +- src/main/resources/application.properties | 1 + 4 files changed, 38 insertions(+), 6 deletions(-) diff --git a/prepare.sh b/prepare.sh index 6b4d371..c047ed8 100755 --- a/prepare.sh +++ b/prepare.sh @@ -3,26 +3,57 @@ # shut down running systemse cd kafka echo 'Shutdown running servers' +lsof -ti tcp:8080 | xargs kill nohup bin/kafka-server-stop.sh > /dev/null 2>&1 & nohup bin/zookeeper-server-stop.sh > /dev/null 2>&1 & sleep 1 +echo 'Clear log file' +rm logs/* + # start up Kafka and Zookeeper echo 'Start Apache Zookeeper' nohup bin/zookeeper-server-start.sh -daemon config/zookeeper.properties > /dev/null 2>&1 & sleep 2 echo 'Start Apache Kafka' nohup bin/kafka-server-start.sh -daemon config/server.properties > /dev/null 2>&1 & -sleep 2 + +sleep 10 +echo 'Clear Topic' +nohup bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic cryptocurrency +nohup bin/kafka-streams-application-reset.sh --application-id "li.crypto-stream" --bootstrap-servers localhost:9092 --input-topics cryptocurrency --zookeeper localhost:2181 +sleep 5 cd ../ # build and start application -mvn spring-boot:run & +nohup mvn spring-boot:run & ( echo "Waiting... Tomcat to launch on 8080..." while ! nc -z localhost 8080; do - sleep 0.5 # wait for 1/10 of the second before check again + sleep 1 done echo "Tomcat launched" && open "http://localhost:8080" ) + + +# function called by trap +other_commands() { + echo 'Shutdown triggered' + lsof -ti tcp:8080 | xargs kill + nohup kafka/bin/kafka-server-stop.sh > /dev/null 2>&1 & + sleep 5 + nohup kafka/bin/zookeeper-server-stop.sh > /dev/null 2>&1 & + echo 'Shutdown complete' + exit 1 +} + +trap 'other_commands' SIGINT + +input="$@" + +while true; do + printf "\rExecute control C to stop >>> " + read input + [[ $input == finish ]] && break +done \ No newline at end of file diff --git a/src/main/kotlin/li/bfih/cryptopredictstream/consumer/StreamFlinkKafkaConsumer.kt b/src/main/kotlin/li/bfih/cryptopredictstream/consumer/StreamFlinkKafkaConsumer.kt index fe9141b..b0563b9 100644 --- a/src/main/kotlin/li/bfih/cryptopredictstream/consumer/StreamFlinkKafkaConsumer.kt +++ b/src/main/kotlin/li/bfih/cryptopredictstream/consumer/StreamFlinkKafkaConsumer.kt @@ -9,7 +9,7 @@ import org.apache.flink.api.java.functions.KeySelector import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.datastream.DataStreamUtils import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment -import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows +import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer import org.slf4j.Logger @@ -36,7 +36,7 @@ object StreamFlinkKafkaConsumer { rawStream?.keyBy(KeySelector { it?.symbol!! - })?.window(TumblingEventTimeWindows.of(Time.seconds(15), Time.seconds(1)))?.process(AnomalyDetector()) + })?.window(SlidingEventTimeWindows.of(Time.seconds(15), Time.seconds(1)))?.process(AnomalyDetector()) // for web stream DataStreamUtils.collect(rawStream).iterator().forEach { diff --git a/src/main/kotlin/li/bfih/cryptopredictstream/rules/RuleExecutor.kt b/src/main/kotlin/li/bfih/cryptopredictstream/rules/RuleExecutor.kt index c04ef65..666d4a1 100644 --- a/src/main/kotlin/li/bfih/cryptopredictstream/rules/RuleExecutor.kt +++ b/src/main/kotlin/li/bfih/cryptopredictstream/rules/RuleExecutor.kt @@ -33,7 +33,7 @@ class RuleExecutor(private val dataSet: MutableIterable?, privat min = max(avg - confidenceIntervalFigure * derivation, 0.0f) max = avg + confidenceIntervalFigure * derivation - return (compare > min) || (compare < max) + return (compare < min) || (compare > max) } return false } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 643ce6b..1e5ec54 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -10,6 +10,7 @@ spring.kafka.producer.bootstrap-servers: localhost:9092 spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer: org.apache.kafka.common.serialization.StringSerializer +spring.kafka.streams.application-id: li.crypto-stream spring.kafka.listener.missing-topics-fatal=false spring.jackson.serialization.write_dates_as_timestamps=false