Skip to content

Commit

Permalink
Bug fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Leo Irudayam committed Mar 17, 2020
1 parent e106c1e commit e636da3
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 6 deletions.
37 changes: 34 additions & 3 deletions prepare.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,57 @@
# shut down running systemse
cd kafka
echo 'Shutdown running servers'
lsof -ti tcp:8080 | xargs kill
nohup bin/kafka-server-stop.sh > /dev/null 2>&1 &
nohup bin/zookeeper-server-stop.sh > /dev/null 2>&1 &
sleep 1

echo 'Clear log file'
rm logs/*

# start up Kafka and Zookeeper
echo 'Start Apache Zookeeper'
nohup bin/zookeeper-server-start.sh -daemon config/zookeeper.properties > /dev/null 2>&1 &
sleep 2
echo 'Start Apache Kafka'
nohup bin/kafka-server-start.sh -daemon config/server.properties > /dev/null 2>&1 &
sleep 2

sleep 10
echo 'Clear Topic'
nohup bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic cryptocurrency
nohup bin/kafka-streams-application-reset.sh --application-id "li.crypto-stream" --bootstrap-servers localhost:9092 --input-topics cryptocurrency --zookeeper localhost:2181
sleep 5

cd ../
# build and start application
mvn spring-boot:run &
nohup mvn spring-boot:run &
( echo "Waiting... Tomcat to launch on 8080..."

while ! nc -z localhost 8080; do
sleep 0.5 # wait for 1/10 of the second before check again
sleep 1
done

echo "Tomcat launched" &&
open "http://localhost:8080" )


# function called by trap
other_commands() {
echo 'Shutdown triggered'
lsof -ti tcp:8080 | xargs kill
nohup kafka/bin/kafka-server-stop.sh > /dev/null 2>&1 &
sleep 5
nohup kafka/bin/zookeeper-server-stop.sh > /dev/null 2>&1 &
echo 'Shutdown complete'
exit 1
}

trap 'other_commands' SIGINT

input="$@"

while true; do
printf "\rExecute control C to stop >>> "
read input
[[ $input == finish ]] && break
done
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.datastream.DataStreamUtils
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.slf4j.Logger
Expand All @@ -36,7 +36,7 @@ object StreamFlinkKafkaConsumer {

rawStream?.keyBy(KeySelector<CurrencyEntry?, String> {
it?.symbol!!
})?.window(TumblingEventTimeWindows.of(Time.seconds(15), Time.seconds(1)))?.process(AnomalyDetector())
})?.window(SlidingEventTimeWindows.of(Time.seconds(15), Time.seconds(1)))?.process(AnomalyDetector())

// for web stream
DataStreamUtils.collect(rawStream).iterator().forEach {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class RuleExecutor(private val dataSet: MutableIterable<CurrencyEntry?>?, privat
min = max(avg - confidenceIntervalFigure * derivation, 0.0f)
max = avg + confidenceIntervalFigure * derivation

return (compare > min) || (compare < max)
return (compare < min) || (compare > max)
}
return false
}
Expand Down
1 change: 1 addition & 0 deletions src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ spring.kafka.producer.bootstrap-servers: localhost:9092
spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer: org.apache.kafka.common.serialization.StringSerializer

spring.kafka.streams.application-id: li.crypto-stream
spring.kafka.listener.missing-topics-fatal=false

spring.jackson.serialization.write_dates_as_timestamps=false
Expand Down

0 comments on commit e636da3

Please sign in to comment.