From 1b510ed0e9cf8564ce852be48719b79a3b6e2095 Mon Sep 17 00:00:00 2001 From: Subin Cho Date: Wed, 30 Apr 2025 10:02:38 +0900 Subject: [PATCH 1/3] =?UTF-8?q?feat=20|=20sprint1=20|=20FRB-31=20|=20kafka?= =?UTF-8?q?=20publish=20=EC=84=A4=EC=A0=95=20|=20=EC=A1=B0=EC=88=98?= =?UTF-8?q?=EB=B9=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../.lck | 0 .../com/monitory/data/FlinkApplication.java | 6 ++- .../com/monitory/data/config/KafkaConfig.java | 24 +++++++++++ .../com/monitory/data/utils/KafkaUtil.java | 42 +++++++++++++++++++ 4 files changed, 70 insertions(+), 2 deletions(-) rename {INF_TEST-ssla2n7kxevn6fh72-atsiotap-northeast-2amazonawscom8883 => INF_TEST-ssla2q1cmw33m6k7u-atsiotap-northeast-2amazonawscom8883}/.lck (100%) create mode 100644 src/main/java/com/monitory/data/config/KafkaConfig.java create mode 100644 src/main/java/com/monitory/data/utils/KafkaUtil.java diff --git a/INF_TEST-ssla2n7kxevn6fh72-atsiotap-northeast-2amazonawscom8883/.lck b/INF_TEST-ssla2q1cmw33m6k7u-atsiotap-northeast-2amazonawscom8883/.lck similarity index 100% rename from INF_TEST-ssla2n7kxevn6fh72-atsiotap-northeast-2amazonawscom8883/.lck rename to INF_TEST-ssla2q1cmw33m6k7u-atsiotap-northeast-2amazonawscom8883/.lck diff --git a/src/main/java/com/monitory/data/FlinkApplication.java b/src/main/java/com/monitory/data/FlinkApplication.java index 9dec0f5..9a5eac6 100644 --- a/src/main/java/com/monitory/data/FlinkApplication.java +++ b/src/main/java/com/monitory/data/FlinkApplication.java @@ -2,6 +2,7 @@ import com.monitory.data.sources.MqttSource; import com.monitory.data.transformations.TimeStampAssigner; +import com.monitory.data.utils.KafkaUtil; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; @@ -16,11 +17,12 @@ public static void main (String [] args) throws Exception { // 2. 데이터 소스 DataStream sourceStream = env.fromSource(new MqttSource(), WatermarkStrategy.noWatermarks(), "MQTT-Source"); - // 3. 데이터 처리: Time Stamp 출력과 Anomaly 감지 + // 3. 데이터 처리: Time Stamp 출력 DataStream transformedStream = sourceStream .map(new TimeStampAssigner()); - // 4. 데이터 싱크: 콘솔에 출력 + // 4. 데이터 싱크: 콘솔에 출력 & kafka publish + transformedStream.sinkTo(KafkaUtil.createKafkaSink()); transformedStream.print(); // 5. 실행 diff --git a/src/main/java/com/monitory/data/config/KafkaConfig.java b/src/main/java/com/monitory/data/config/KafkaConfig.java new file mode 100644 index 0000000..d04498e --- /dev/null +++ b/src/main/java/com/monitory/data/config/KafkaConfig.java @@ -0,0 +1,24 @@ +package com.monitory.data.config; + +import java.io.InputStream; +import java.util.Properties; + +public class KafkaConfig { + private static final Properties properties = new Properties(); + + static { + try (InputStream input = KafkaConfig.class.getClassLoader() + .getResourceAsStream("application.properties")) { + if (input == null) { + throw new RuntimeException("❌ application.properties 파일을 찾을 수 없습니다."); + } + properties.load(input); + } catch (Exception e) { + throw new RuntimeException("❌ properties 파일 로딩 실패", e); + } + } + + public static String get(String key) { + return properties.getProperty(key); + } +} diff --git a/src/main/java/com/monitory/data/utils/KafkaUtil.java b/src/main/java/com/monitory/data/utils/KafkaUtil.java new file mode 100644 index 0000000..713d433 --- /dev/null +++ b/src/main/java/com/monitory/data/utils/KafkaUtil.java @@ -0,0 +1,42 @@ +package com.monitory.data.utils; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.monitory.data.config.KafkaConfig; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; +import org.apache.flink.connector.kafka.sink.KafkaSink; + +public class KafkaUtil { + private static final ObjectMapper mapper = new ObjectMapper(); + + public static KafkaSink createKafkaSink() { + return KafkaSink.builder() + .setBootstrapServers(KafkaConfig.get("KAFKA_SERVER")) + .setRecordSerializer( + KafkaRecordSerializationSchema.builder() + .setValueSerializationSchema(new SimpleStringSchema()) + .setTopicSelector((element) -> { // <--- 바로 이 부분! + try { + // 1. 입력 문자열(element)을 JSON으로 파싱 + JsonNode json = mapper.readTree(element); + + // 2. 필요한 필드 값 추출 + String zoneId = json.path("zoneId").asText("unknown_zone"); + String equipId = json.path("equipId").asText("unknown_equip"); + String sensorId = json.path("sensorId").asText("unknown_sensor"); + String sensorType = json.path("sensorType").asText("unknown_type"); + + // 3. 추출된 값으로 원하는 형식의 토픽 이름 생성 및 반환 + return String.format("sensor.%s.%s.%s.%s", zoneId, equipId, sensorId, sensorType); + + } catch (Exception e) { + // 4. 오류 발생 시 대체 토픽 이름 반환 + return "sensor/error_topic"; + } + }) + .build() + ) + .build(); + } +} From 6150ec1a9ee35ed161305154e2bb2c4b23e771fa Mon Sep 17 00:00:00 2001 From: Subin Cho Date: Wed, 30 Apr 2025 14:03:33 +0900 Subject: [PATCH 2/3] =?UTF-8?q?feat=20|=20sprint1=20|=20FRB-31=20|=20kafka?= =?UTF-8?q?=20=ED=86=A0=ED=94=BD=20=EC=84=A4=EC=A0=95=20|=20=EC=A1=B0?= =?UTF-8?q?=EC=88=98=EB=B9=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../.lck | 0 ...ation.java => FlinkSourceApplication.java} | 4 +-- .../com/monitory/data/utils/KafkaUtil.java | 32 +++++++++---------- 3 files changed, 18 insertions(+), 18 deletions(-) delete mode 100644 INF_TEST-ssla2q1cmw33m6k7u-atsiotap-northeast-2amazonawscom8883/.lck rename src/main/java/com/monitory/data/{FlinkApplication.java => FlinkSourceApplication.java} (93%) diff --git a/INF_TEST-ssla2q1cmw33m6k7u-atsiotap-northeast-2amazonawscom8883/.lck b/INF_TEST-ssla2q1cmw33m6k7u-atsiotap-northeast-2amazonawscom8883/.lck deleted file mode 100644 index e69de29..0000000 diff --git a/src/main/java/com/monitory/data/FlinkApplication.java b/src/main/java/com/monitory/data/FlinkSourceApplication.java similarity index 93% rename from src/main/java/com/monitory/data/FlinkApplication.java rename to src/main/java/com/monitory/data/FlinkSourceApplication.java index 9a5eac6..c4a47d5 100644 --- a/src/main/java/com/monitory/data/FlinkApplication.java +++ b/src/main/java/com/monitory/data/FlinkSourceApplication.java @@ -8,7 +8,7 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -public class FlinkApplication { +public class FlinkSourceApplication { public static void main (String [] args) throws Exception { // 1. Flink 환경 설정 Configuration conf = new Configuration(); @@ -26,6 +26,6 @@ public static void main (String [] args) throws Exception { transformedStream.print(); // 5. 실행 - env.execute("Flink DataStream Example"); + env.execute("Flink to Kafka Produce"); } } diff --git a/src/main/java/com/monitory/data/utils/KafkaUtil.java b/src/main/java/com/monitory/data/utils/KafkaUtil.java index 713d433..60a4359 100644 --- a/src/main/java/com/monitory/data/utils/KafkaUtil.java +++ b/src/main/java/com/monitory/data/utils/KafkaUtil.java @@ -16,25 +16,25 @@ public static KafkaSink createKafkaSink() { .setRecordSerializer( KafkaRecordSerializationSchema.builder() .setValueSerializationSchema(new SimpleStringSchema()) - .setTopicSelector((element) -> { // <--- 바로 이 부분! - try { - // 1. 입력 문자열(element)을 JSON으로 파싱 - JsonNode json = mapper.readTree(element); + .setTopicSelector((element) -> { + try { + JsonNode json = mapper.readTree(element); - // 2. 필요한 필드 값 추출 - String zoneId = json.path("zoneId").asText("unknown_zone"); - String equipId = json.path("equipId").asText("unknown_equip"); - String sensorId = json.path("sensorId").asText("unknown_sensor"); - String sensorType = json.path("sensorType").asText("unknown_type"); + String zoneId = json.path("zoneId").asText(null); + String equipId = json.path("equipId").asText(null); - // 3. 추출된 값으로 원하는 형식의 토픽 이름 생성 및 반환 - return String.format("sensor.%s.%s.%s.%s", zoneId, equipId, sensorId, sensorType); + if (zoneId != null && equipId != null) { + return "EQUIPMENT"; + } else if (equipId != null) { + return "ENVIRONMENT"; + } else { + return "sensor/unknown_topic"; + } - } catch (Exception e) { - // 4. 오류 발생 시 대체 토픽 이름 반환 - return "sensor/error_topic"; - } - }) + } catch (Exception e) { + return "sensor/error_topic"; + } + }) .build() ) .build(); From d70fd2efbacf4c35a98f05db85cd42e12c3872ab Mon Sep 17 00:00:00 2001 From: Subin Cho Date: Wed, 30 Apr 2025 14:18:19 +0900 Subject: [PATCH 3/3] =?UTF-8?q?=20feat=20|=20sprint1=20|=20FRB-31=20|=20ka?= =?UTF-8?q?fka=20=ED=86=A0=ED=94=BD=20=EC=A1=B0=EA=B1=B4=20=EC=88=98?= =?UTF-8?q?=EC=A0=95=20|=20=EC=A1=B0=EC=88=98=EB=B9=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/java/com/monitory/data/utils/KafkaUtil.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/monitory/data/utils/KafkaUtil.java b/src/main/java/com/monitory/data/utils/KafkaUtil.java index 60a4359..cf27ef2 100644 --- a/src/main/java/com/monitory/data/utils/KafkaUtil.java +++ b/src/main/java/com/monitory/data/utils/KafkaUtil.java @@ -25,14 +25,14 @@ public static KafkaSink createKafkaSink() { if (zoneId != null && equipId != null) { return "EQUIPMENT"; - } else if (equipId != null) { + } else if (zoneId != null && equipId == null) { return "ENVIRONMENT"; } else { - return "sensor/unknown_topic"; + return "sensor.unknown_topic"; } } catch (Exception e) { - return "sensor/error_topic"; + return "sensor.error_topic"; } }) .build()