Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion service/sensor/RealSensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from awscrt import mqtt
from service.simulation.SimulatorInterface2 import SimulatorInterface2
from mqtt_util.publish import AwsMQTT
import serial.tools.list_ports

class RealSensor(SimulatorInterface2):
def __init__(self, idx, zone_id, equip_id, interval, msg_count, conn=None, stop_event=None):
Expand All @@ -21,7 +22,7 @@ def __init__(self, idx, zone_id, equip_id, interval, msg_count, conn=None, stop_
self.topic_name_temp = self._build_topic(self.zone_id, self.equip_id, self.sensor_id, "temp")
self.topic_name_humid = self._build_topic(self.zone_id, self.equip_id, self.sensor_id, "humid")
# 시리얼 포트 설정
self.serial_port = 'COM3' # Windows COM 포트
self.serial_port = self._find_serial_port() # Windows COM 포트
Copy link

Copilot AI May 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Consider extracting the hardcoded fallback port ('COM3') into a configuration setting or environment variable for easier customization.

Copilot uses AI. Check for mistakes.
self.baudrate = 9600 # 바우드

######## 오버라이딩은 하되 내용은 필요없는 메서드 ########
Expand Down Expand Up @@ -51,6 +52,27 @@ def start_publishing(self):
thread.start()
return thread

def _find_serial_port(self):
"""USB Serial Device 포트를 자동으로 찾습니다"""

# 사용 가능한 모든 포트 가져오기
ports = list(serial.tools.list_ports.comports())

# 포트 정보 출력 (디버깅용)
print(f"Available ports: {len(ports)}")
for port in ports:
print(f"- {port.device}: {port.description} (manufacturer: {port.manufacturer})")

# "USB Serial Device"라는 설명이 있는 포트 찾기
for port in ports:
if "USB Serial Device" in port.description:
print(f"Found USB Serial Device at {port.device}")
return port.device

# 찾지 못한 경우 기본값 'COM3' 반환
print("USB Serial Device not found. Falling back to COM3.")
Comment on lines +62 to +73
Copy link

Copilot AI May 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replace print statements with the project's logging framework to ensure consistent log levels and outputs.

Suggested change
print(f"Available ports: {len(ports)}")
for port in ports:
print(f"- {port.device}: {port.description} (manufacturer: {port.manufacturer})")
# "USB Serial Device"라는 설명이 있는 포트 찾기
for port in ports:
if "USB Serial Device" in port.description:
print(f"Found USB Serial Device at {port.device}")
return port.device
# 찾지 못한 경우 기본값 'COM3' 반환
print("USB Serial Device not found. Falling back to COM3.")
logging.info(f"Available ports: {len(ports)}")
for port in ports:
logging.debug(f"- {port.device}: {port.description} (manufacturer: {port.manufacturer})")
# "USB Serial Device"라는 설명이 있는 포트 찾기
for port in ports:
if "USB Serial Device" in port.description:
logging.info(f"Found USB Serial Device at {port.device}")
return port.device
# 찾지 못한 경우 기본값 'COM3' 반환
logging.warning("USB Serial Device not found. Falling back to COM3.")

Copilot uses AI. Check for mistakes.
return 'COM3'
Comment on lines +66 to +74
Copy link

Copilot AI May 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Filtering by the literal string 'USB Serial Device' may not catch all adapter types; consider making this match rule configurable or more robust.

Suggested change
# "USB Serial Device"라는 설명이 있는 포트 찾기
for port in ports:
if "USB Serial Device" in port.description:
print(f"Found USB Serial Device at {port.device}")
return port.device
# 찾지 못한 경우 기본값 'COM3' 반환
print("USB Serial Device not found. Falling back to COM3.")
return 'COM3'
# Configurable keywords for matching port descriptions
match_keywords = getattr(self, "serial_port_keywords", ["USB Serial Device"])
# Search for a port matching any of the keywords
for port in ports:
if any(keyword.lower() in port.description.lower() for keyword in match_keywords):
print(f"Found matching device at {port.device} (description: {port.description})")
return port.device
# If no matching port is found, fall back to default
print(f"No matching device found. Falling back to default port: 'COM3'.")
return 'COM3'

Copilot uses AI. Check for mistakes.

def _read_and_publish_loop(self):
try:
# 1) 시리얼 열고 대기
Expand Down
2 changes: 1 addition & 1 deletion simulation_cconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
{
"count": 5,
"interval": 3,
"equip_id": "20250507171316-389",
"equip_id": "20250507165750-827",
"zone_id": "20250507165750-827",
"simulator": "real_sensor",
"sensor_num": 1
Expand Down
2 changes: 1 addition & 1 deletion streamlit_app/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ def main():
)
# real_sensor가 선택된 경우 경고 메시지 표시
if device["simulator"] == "real_sensor":
st.warning("⚠️ 'real_sensor'는 로컬 환경에서만 사용 가능하며, 센서를 USB 포트(COM3)에 연결해야 합니다.")
st.warning("⚠️ 'real_sensor'는 로컬 환경에서만 사용 가능하며, 센서를 USB 포트에 연결해야 합니다.")
# 센서 갯수 입력
device["sensor_num"] = st.number_input(f"Sensor Num (Device {i + 1})", value=device["sensor_num"], key=f"sensor_num_{i}")

Expand Down
Loading