diff --git a/INF_TEST-ssla2n7kxevn6fh72-atsiotap-northeast-2amazonawscom8883/.lck b/INF_TEST-ssla2n7kxevn6fh72-atsiotap-northeast-2amazonawscom8883/.lck deleted file mode 100644 index e69de29..0000000 diff --git a/src/main/java/com/monitory/data/FlinkApplication.java b/src/main/java/com/monitory/data/FlinkSourceApplication.java similarity index 76% rename from src/main/java/com/monitory/data/FlinkApplication.java rename to src/main/java/com/monitory/data/FlinkSourceApplication.java index 9dec0f5..c4a47d5 100644 --- a/src/main/java/com/monitory/data/FlinkApplication.java +++ b/src/main/java/com/monitory/data/FlinkSourceApplication.java @@ -2,12 +2,13 @@ import com.monitory.data.sources.MqttSource; import com.monitory.data.transformations.TimeStampAssigner; +import com.monitory.data.utils.KafkaUtil; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -public class FlinkApplication { +public class FlinkSourceApplication { public static void main (String [] args) throws Exception { // 1. Flink 환경 설정 Configuration conf = new Configuration(); @@ -16,14 +17,15 @@ public static void main (String [] args) throws Exception { // 2. 데이터 소스 DataStream sourceStream = env.fromSource(new MqttSource(), WatermarkStrategy.noWatermarks(), "MQTT-Source"); - // 3. 데이터 처리: Time Stamp 출력과 Anomaly 감지 + // 3. 데이터 처리: Time Stamp 출력 DataStream transformedStream = sourceStream .map(new TimeStampAssigner()); - // 4. 데이터 싱크: 콘솔에 출력 + // 4. 데이터 싱크: 콘솔에 출력 & kafka publish + transformedStream.sinkTo(KafkaUtil.createKafkaSink()); transformedStream.print(); // 5. 실행 - env.execute("Flink DataStream Example"); + env.execute("Flink to Kafka Produce"); } } diff --git a/src/main/java/com/monitory/data/config/KafkaConfig.java b/src/main/java/com/monitory/data/config/KafkaConfig.java new file mode 100644 index 0000000..d04498e --- /dev/null +++ b/src/main/java/com/monitory/data/config/KafkaConfig.java @@ -0,0 +1,24 @@ +package com.monitory.data.config; + +import java.io.InputStream; +import java.util.Properties; + +public class KafkaConfig { + private static final Properties properties = new Properties(); + + static { + try (InputStream input = KafkaConfig.class.getClassLoader() + .getResourceAsStream("application.properties")) { + if (input == null) { + throw new RuntimeException("❌ application.properties 파일을 찾을 수 없습니다."); + } + properties.load(input); + } catch (Exception e) { + throw new RuntimeException("❌ properties 파일 로딩 실패", e); + } + } + + public static String get(String key) { + return properties.getProperty(key); + } +} diff --git a/src/main/java/com/monitory/data/utils/KafkaUtil.java b/src/main/java/com/monitory/data/utils/KafkaUtil.java new file mode 100644 index 0000000..cf27ef2 --- /dev/null +++ b/src/main/java/com/monitory/data/utils/KafkaUtil.java @@ -0,0 +1,42 @@ +package com.monitory.data.utils; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.monitory.data.config.KafkaConfig; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; +import org.apache.flink.connector.kafka.sink.KafkaSink; + +public class KafkaUtil { + private static final ObjectMapper mapper = new ObjectMapper(); + + public static KafkaSink createKafkaSink() { + return KafkaSink.builder() + .setBootstrapServers(KafkaConfig.get("KAFKA_SERVER")) + .setRecordSerializer( + KafkaRecordSerializationSchema.builder() + .setValueSerializationSchema(new SimpleStringSchema()) + .setTopicSelector((element) -> { + try { + JsonNode json = mapper.readTree(element); + + String zoneId = json.path("zoneId").asText(null); + String equipId = json.path("equipId").asText(null); + + if (zoneId != null && equipId != null) { + return "EQUIPMENT"; + } else if (zoneId != null && equipId == null) { + return "ENVIRONMENT"; + } else { + return "sensor.unknown_topic"; + } + + } catch (Exception e) { + return "sensor.error_topic"; + } + }) + .build() + ) + .build(); + } +}