Skip to content

[FIX] Redis Task 오류 수정#21

Merged
youyeon11 merged 7 commits intodevelopfrom
fix/redis-task-10
Sep 7, 2025
Merged

[FIX] Redis Task 오류 수정#21
youyeon11 merged 7 commits intodevelopfrom
fix/redis-task-10

Conversation

@youyeon11
Copy link
Contributor

📌 작업 목적

  • Redis 구독을 하는 메시지에 대하여 제대로 인식하지 못해서 Task를 발행하지 못하는 오류를 수정하였습니다.

🗂 작업 유형

  • 기능 추가 (Feature)
  • 버그 수정 (Bug Fix)
  • 리팩터링 (Refactor)

🔨 주요 작업 내용

  • tasks.py
        await redis_client.xadd(
            settings.STREAM_RESULT,
            {
                "correlationId": correlationId,
                "type": "image_results",
                "payload": result.model_dump_json()},
            maxlen=10_000,
            approximate=True,
        )
  • Spring Server에서 읽을 수 있도록 필드 맞춰서 발행하였습니다.

  • workers.py
def _sanitize_fields_for_xadd(fields: dict) -> dict:
    # 정제
    cleaned = {}
    for k, v in fields.items():
        k = _decode(k)
        if isinstance(v, (bytes, bytearray)):
            try:
                v = v.decode()
            except Exception:
                pass
        else:
            v = _to_scalr(v)
        cleaned[k] = v
    return cleaned
  • Spring Server에서 보낸 image.jobs의 필드 부분을 읽으려는데 이에 대하여 오류 발생
  • 스프링에서 Redis에 대한 직렬화 문제도 있었으나, 전처리 함수 추가해주었습니다

  • job.py
class ImageJob(BaseModel):
    correlationId: str = Field(alias="correlationId")
    presignedUrl: str = Field(alias="presignedUrl")
    replyQueue: str = Field(alias="replyQueue")
    contentType: str = Field(alias="contentType")
    createdAt: str = Field(alias="createdAt")
    ttlSec: int = Field(alias="ttlSec")
  • 읽기와 발행에 대한 통일성 유지를 위하여 python에서 snake_case로 작성하던 것을 camelCase로 변경하였습니다.

  • tasks.py
        stream_file = await asyncio.to_thread(
            s3_service.download_file_from_presigned_url,
            job.presignedUrl
        )

        stream_file.seek(0)

        pillName, label, confidence = await asyncio.to_thread(
            predictor_service.predict,
            stream_file
        )
  • process_image_scan에서 다운 받은 stream_file에 대한 인자 처리 오류 수정 (asyncio.to_thread에 맞게 작업)
  • asyncio.to_thread()를 사용하여 동기 함수(S3로 부터 사진 다운 : download_file_from() )를 별도의 쓰레드에서 실행해 병렬
  • 다운 받은 스트림 파일 seek(0) 으로 파일 시작 위치로 두어 predictor_service.predict가 파일을 처음부터 읽도록 설정

🧪 테스트 결과

  • Fast API에서 작업 제대로 작동되는 것 확인
image
  • 안정적으로 Redis에 저장됨
image
  • Sring Server에서 읽기 성공적
image

📎 관련 이슈


💬 논의 및 고민한 점

  • 이제 CV 분석을 기반으로 OpenAI에 설명 요청 부분을 추가하겠습니다!!!!

Copy link

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Summary of Changes

Hello @youyeon11, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

이 PR은 Redis 메시지 구독 및 Task 발행 과정에서 발생하던 오류를 수정하고, 시스템 간의 데이터 일관성을 확보하는 데 중점을 둡니다. 특히, Spring 서버와의 연동을 위해 Redis 메시지 형식을 표준화하고, 데이터 처리 로직의 안정성을 개선하여 전반적인 시스템의 신뢰도를 향상시켰습니다.

Highlights

  • Redis Task 오류 수정: Redis 구독 메시지를 제대로 인식하지 못해 Task 발행이 실패하던 문제를 해결했습니다.
  • 데이터 직렬화 및 필드명 통일: Spring 서버와의 통신을 위해 Redis에 발행하는 메시지의 필드명을 camelCase로 통일하고, 직렬화 문제를 해결하기 위한 전처리 함수를 추가했습니다.
  • 이미지 처리 로직 개선: S3에서 다운로드한 파일 스트림 처리와 asyncio.to_thread 인자 전달 방식을 수정하여 이미지 처리의 안정성을 높였습니다.
  • Redis 클라이언트 기능 확장: Redis Stream 메시지 처리를 위한 _to_scalar, _sanitize_fields_for_xadd 유틸리티 함수와 xautoclaim 기능을 추가하여 견고성을 강화했습니다.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point in your pull request via creating an issue comment (i.e. comment on the pull request page) using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in issue comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@youyeon11
Copy link
Contributor Author

🤖 Gemini 코드 리뷰 결과

1. 주요 변경 사항 요약 및 의도 파악

이 PR은 Redis Task 처리 과정에서 발생하는 오류를 수정하고 Spring Server와의 데이터 교환 호환성을 개선하는 것을 목표로 합니다. 구체적으로, Redis 메시지 발행/구독 과정에서 필드 불일치 문제를 해결하고, Python의 snake_case와 Spring의 camelCase 차이를 해결하기 위해 필드 이름을 camelCase로 통일했습니다. 또한, 비동기 처리를 위해 asyncio.to_thread를 사용하여 S3 파일 다운로드 및 예측 작업을 병렬로 처리하도록 개선했습니다.

2. 코드 품질 및 가독성

  • 코드 스타일: 전반적으로 PEP 8을 잘 준수하고 있습니다. 다만, workers.py_sanitize_fields_for_xadd 함수 내에서 try...except 블록이 너무 광범위합니다. decode() 과정에서 예상되는 특정 예외(예: UnicodeDecodeError)만 처리하도록 변경하는 것이 좋습니다.

  • 변수/함수명: 대부분의 변수 및 함수 이름은 명확하고 의도를 잘 드러냅니다. 하지만, workers.py_to_scalr 함수는 _to_scalar로 오타가 있는 것으로 보입니다. 수정이 필요합니다.

  • 주석/문서화: 함수 및 클래스의 목적, 인자, 반환 값에 대한 설명이 잘 되어 있습니다. 특히, JobResult, ImageJob 스키마에 대한 설명은 이해하기 쉽습니다. 다만, _sanitize_fields_for_xadd 함수의 주석은 "정제"라는 단어만으로는 구체적인 역할을 파악하기 어렵습니다. "Redis XADD에 적합한 형식으로 필드 값을 변환합니다." 와 같이 더 자세한 설명을 추가하면 좋겠습니다.

  • 중복 코드: 현재까지는 중복 코드가 눈에 띄지 않습니다.

3. 잠재적 버그 및 엣지 케이스

  • 논리적 오류: workers.py에서 payload 필드 처리 시, payloadbytes 또는 str이 아닌 다른 타입일 경우 json.dumps(payload)를 시도하는데, 이 부분에서 예상치 못한 타입이 들어올 경우 TypeError가 발생할 수 있습니다. payload 타입에 대한 검증 로직을 추가하는 것이 좋습니다. 예를 들어, isinstance(payload, (bytes, str, dict, list)) 와 같이 확인할 수 있습니다.

  • 경쟁 상태: 현재 코드에서 명시적인 경쟁 상태는 보이지 않습니다. 하지만, Redis의 xclaimxautoclaim을 사용하는 부분에서는 분산 환경에서의 경쟁 상태 가능성을 항상 염두에 두어야 합니다.

  • 에러 핸들링: tasks.pyprocess_image_scan 함수에서 발생하는 모든 예외를 Exception으로 처리하고 있습니다. 더 구체적인 예외 타입(예: S3DownloadError, PredictionError)을 정의하고 각각에 맞는 처리 로직을 구현하는 것이 좋습니다.

4. 성능 및 효율성

  • 시간 복잡도: 현재 코드에서 성능 병목이 예상되는 부분은 S3 파일 다운로드와 예측 작업입니다. asyncio.to_thread를 사용하여 병렬 처리하도록 개선한 것은 좋은 접근입니다.

  • 자원 사용: Redis Stream의 maxlen을 10,000으로 설정하여 메모리 사용량을 제한하고 있습니다. 적절한 값으로 보입니다.

  • 최적화 제안: 현재로서는 특별한 최적화 제안 사항은 없습니다. 다만, 향후 데이터 양이 증가할 경우, S3 파일 다운로드 및 예측 작업을 분산 처리하는 방안을 고려해 볼 수 있습니다.

5. 보안 및 아키텍처

  • 보안 취약점: 현재 코드에서 명시적인 보안 취약점은 보이지 않습니다. 다만, 외부 입력 값(예: presignedUrl)을 사용하는 부분에서는 항상 검증 로직을 추가하여 안전성을 확보해야 합니다.

  • 아키텍처 적합성: 변경 사항은 비동기 작업 처리 및 Redis Stream을 활용한 메시지 큐 시스템과 잘 부합하는 것으로 보입니다.

  • 확장성: 현재 아키텍처는 작업 큐를 사용하고 있어, 향후 작업량 증가에 따라 worker 노드를 추가하여 확장하기 용이합니다. 또한, S3와 Redis를 사용하여 저장 및 메시징 부분에서도 확장성을 확보하고 있습니다.

전반적으로 잘 작성된 PR입니다. 위에 언급된 몇 가지 사소한 수정 사항과 제안 사항을 반영한다면 더욱 견고하고 유지보수하기 쉬운 코드가 될 것입니다. 특히, 에러 핸들링 부분을 보완하고, payload 처리 부분에서 타입 검증을 추가하는 것을 권장합니다. 수고하셨습니다!

Copy link

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

이 PR은 Redis 구독 메시지를 제대로 인식하지 못하는 문제를 해결하고, Spring 서버와의 호환성을 위해 데이터 구조를 변경하는 등 중요한 수정을 포함하고 있습니다. 전반적으로 코드의 안정성과 호환성을 높이는 좋은 변경이지만, 몇 가지 개선할 점이 보입니다. 특히 Redis 스트림 메시지 타입 불일치와 작업 처리 로직의 버그와 같은 심각한 문제가 있으며, 코드 중복 및 타입 안정성 문제도 발견되었습니다. 아래에 자세한 리뷰를 남겼으니 확인 부탁드립니다.

settings.STREAM_JOB,
{"json": job.model_dump_json()},
{
"type": "image_results",

This comment was marked as resolved.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

제대로 저장한 거 맞음

data = json.loads(payload_str)
print(f"Job received id={msg_id} correlationId={correlation_id} payload={data}")

job = data

This comment was marked as resolved.

Comment on lines +8 to +41
"""
Redis Stream에 정의한 유효한 형식 메시지를 위한 전처리 함수
"""
def _to_scalr(v):
# XADD 허용 타입: str, bytes, int, float
if isinstance(v, (str, bytes, int, float)):
return v
# 그 외는 JSON str
return json.dumps(v, ensure_ascii=False)

"""
Decoding
"""
def _decode(b):
if isinstance(b, (bytes, bytearray)):
return b.decode()
else:
return b


def _sanitize_fields_for_xadd(fields: dict) -> dict:
# 정제
cleaned = {}
for k, v in fields.items():
k = _decode(k)
if isinstance(v, (bytes, bytearray)):
try:
v = v.decode()
except Exception:
pass
else:
v = _to_scalr(v)
cleaned[k] = v
return cleaned

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

_to_scalr, _decode, _sanitize_fields_for_xadd 헬퍼 함수들이 app/worker/redis_client.py 파일에도 거의 동일하게 정의되어 있습니다. 코드 중복은 유지보수를 어렵게 만듭니다. 이 함수들을 한 곳(예: app/worker/redis_client.py 또는 새로운 유틸리티 모듈)으로 통합하고, 필요한 곳에서 임포트하여 사용하는 것을 권장합니다.

Comment on lines 31 to 44
print("분석 결과 발행 시작...")
entry_id = await redis_client.xadd(
settings.STREAM_JOB,
{"json": job.model_dump_json()},
{
"type": "image_results",
"payload": payload,
"correlationId": correlationId,
},
maxlen=10_000,
approximate=True,
)

return {"job_id": correlation_id} No newline at end of file
print("분석 결과 발행 완료...")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

애플리케이션 로깅에 print() 함수를 사용하고 있습니다. 프로덕션 환경에서는 구조화된 로깅 라이브러리(예: Python의 logging 모듈)를 사용하는 것이 좋습니다. 이를 통해 로그 레벨(info, debug, error 등)에 따른 필터링, 로그 포맷 지정, 파일 또는 외부 서비스로의 로그 전송 등 더 유연하고 강력한 로깅 기능을 활용할 수 있습니다.

Comment on lines +34 to +37
try:
v = v.decode()
except Exception:
pass

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

바이트 문자열 디코딩 시 발생하는 모든 예외를 except Exception: pass로 무시하고 있습니다. 디코딩에 실패하면 값은 바이트 상태로 유지되는데, 이는 후속 처리에서 예기치 않은 문제를 일으킬 수 있습니다. 최소한 에러를 로깅하여 어떤 값이 디코딩에 실패했는지 추적할 수 있도록 하는 것이 좋습니다. 예를 들어, logging.warning(f"Failed to decode value for key {k}: {e}")와 같이 로그를 남길 수 있습니다.

Comment on lines +93 to +98
task.add_done_callback(lambda t: asyncio.create_task(
self.redis_client.xack_and_del(settings.STREAM_JOB, settings.GROUP_NAME, msg_id)
if not t.exception() else
self.redis_client.xadd(f"{settings.STREAM_JOB}:DLQ",
{"id": msg_id, "error": str(t.exception()), **fields})
))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

task.add_done_callback에 복잡한 람다 함수를 사용하고 있습니다. 코드가 길어지고 가독성이 떨어질 수 있습니다. 파일 뒷부분의 xautoclaim 처리 로직에서 사용된 _on_done 헬퍼 함수 패턴(137-150번째 줄)이 더 명확하고 유지보수하기 좋습니다. 일관성과 가독성을 위해 이 부분도 _on_done과 같은 헬퍼 함수를 정의하여 리팩토링하는 것을 권장합니다.

@youyeon11 youyeon11 merged commit e76098c into develop Sep 7, 2025
@youyeon11 youyeon11 mentioned this pull request Sep 7, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant