Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@

import com.monitory.data.sources.MqttSource;
import com.monitory.data.transformations.TimeStampAssigner;
import com.monitory.data.utils.KafkaUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// NOTE(review): this span is a GitHub diff rendering — removed ("old") and
// added ("new") lines appear back-to-back (class name, step-3/4 comments,
// execute() call), and the "Expand All @@ ..." line below is diff-viewer
// chrome, not Java source. Code tokens are preserved byte-for-byte.
public class FlinkApplication {
public class FlinkSourceApplication {
public static void main (String [] args) throws Exception {
// 1. Configure the Flink execution environment
Configuration conf = new Configuration();
Expand All @@ -16,14 +17,15 @@ public static void main (String [] args) throws Exception {
// 2. Data source: raw String messages from MQTT, no watermarks assigned
DataStream<String> sourceStream = env.fromSource(new MqttSource(), WatermarkStrategy.noWatermarks(), "MQTT-Source");

// 3. Data processing: timestamp output and anomaly detection (old line)
// 3. Data processing: timestamp output (new line)
DataStream<String> transformedStream = sourceStream
.map(new TimeStampAssigner());

// 4. Data sink: print to console (old line)
// 4. Data sink: print to console & publish to Kafka (new line)
transformedStream.sinkTo(KafkaUtil.createKafkaSink());
transformedStream.print();

// 5. Run the job (old job name above, new job name below)
env.execute("Flink DataStream Example");
env.execute("Flink to Kafka Produce");
}
}
24 changes: 24 additions & 0 deletions src/main/java/com/monitory/data/config/KafkaConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.monitory.data.config;

import java.io.InputStream;
import java.util.Properties;

/**
 * Loads settings from {@code application.properties} on the classpath once at
 * class-initialization time and exposes them by key.
 *
 * <p>Fails fast when the file is missing or unreadable: the static initializer
 * throws, which callers observe as an {@link ExceptionInInitializerError} on
 * first use of this class.
 */
public final class KafkaConfig {
    private static final Properties properties = new Properties();

    static {
        // try-with-resources closes the stream even if load() throws.
        try (InputStream input = KafkaConfig.class.getClassLoader()
                .getResourceAsStream("application.properties")) {
            if (input == null) {
                throw new RuntimeException("❌ application.properties 파일을 찾을 수 없습니다.");
            }
            properties.load(input);
        } catch (RuntimeException e) {
            // Fix: propagate our own "file not found" error as-is instead of
            // double-wrapping it in the generic "loading failed" exception.
            throw e;
        } catch (Exception e) {
            throw new RuntimeException("❌ properties 파일 로딩 실패", e);
        }
    }

    /** Utility holder — not instantiable. */
    private KafkaConfig() {
    }

    /**
     * Returns the configured value for {@code key}.
     *
     * @param key property name as written in application.properties
     * @return the property value, or {@code null} if the key is not defined
     */
    public static String get(String key) {
        return properties.getProperty(key);
    }
}
42 changes: 42 additions & 0 deletions src/main/java/com/monitory/data/utils/KafkaUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.monitory.data.utils;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.monitory.data.config.KafkaConfig;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

/**
 * Factory for the Flink {@link KafkaSink} used by the streaming job.
 *
 * <p>Each JSON record is routed to a Kafka topic based on which identifiers it
 * carries: both {@code zoneId} and {@code equipId} → {@code EQUIPMENT}; only
 * {@code zoneId} → {@code ENVIRONMENT}; anything else → a fallback topic.
 */
public final class KafkaUtil {
    // ObjectMapper is expensive to build and thread-safe: share one instance.
    private static final ObjectMapper mapper = new ObjectMapper();

    /** Utility holder — not instantiable. */
    private KafkaUtil() {
    }

    /**
     * Builds a Kafka sink whose bootstrap servers come from the
     * {@code KAFKA_SERVER} property and whose destination topic is chosen per
     * record by {@link #selectTopic(String)}.
     *
     * @return a configured {@link KafkaSink} for String (JSON) records
     */
    public static KafkaSink<String> createKafkaSink() {
        return KafkaSink.<String>builder()
                .setBootstrapServers(KafkaConfig.get("KAFKA_SERVER"))
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.<String>builder()
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .setTopicSelector(KafkaUtil::selectTopic)
                                .build()
                )
                .build();
    }

    /**
     * Chooses the destination topic for one JSON payload.
     *
     * <p>Extracted from the inline lambda so routing is unit-testable; the
     * redundant {@code equipId == null} re-check in the original else-if
     * (always true at that point) is removed without changing behavior.
     *
     * @param element raw JSON record from the stream
     * @return {@code "EQUIPMENT"} when zoneId and equipId are both present,
     *         {@code "ENVIRONMENT"} when only zoneId is present,
     *         {@code "sensor.unknown_topic"} when zoneId is absent, or
     *         {@code "sensor.error_topic"} when the payload is not valid JSON
     */
    static String selectTopic(String element) {
        try {
            JsonNode json = mapper.readTree(element);

            String zoneId = json.path("zoneId").asText(null);
            String equipId = json.path("equipId").asText(null);

            if (zoneId == null) {
                return "sensor.unknown_topic";
            }
            // zoneId present: equipment records additionally carry an equipId.
            return equipId != null ? "EQUIPMENT" : "ENVIRONMENT";
        } catch (Exception e) {
            // Unparseable payloads are quarantined instead of failing the job.
            return "sensor.error_topic";
        }
    }
}