feat: Implement text-only agent with pipeline management and Redis ca… #295
base: release
Conversation
Important: Review skipped. Auto incremental reviews are disabled on this repository; check the settings in the CodeRabbit UI to re-enable them or to turn off this status message.

Walkthrough

Adds a new text-only automatic agent with API endpoints, pipeline orchestration, processors, and response collection. Introduces Redis-backed caching with a fallback in-memory client, session lifecycle management, and tool discovery. Integrates the router and startup/shutdown hooks in the main app. Adds a request schema and the redis dependency.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
autonumber
actor U as User
participant R as FastAPI Router<br>/agent/text/automatic
participant PM as TextPipelineManager
participant CM as PipelineCacheManager
participant PR as PipelineRunner
participant L as LLM Service
participant TP as TextCaptureProcessor
participant RC as ResponseCollector
U->>R: POST /chat/{session_id} (message)
R->>PM: process_message(session_id, message, config)
PM->>CM: get_cached_pipeline(session_id)?
alt cache miss
PM->>CM: load_conversation_history(session_id)
PM->>PR: create runner + assemble pipeline
PM->>TP: attach processor (session_id, RC)
PM->>CM: cache_pipeline(session_id, task, RC)
end
PM->>PR: queue InputText + LLMRun frames
PR->>L: stream tokens
L-->>TP: LLMTextFrame (chunks)
TP->>RC: append chunk
loop until end
L-->>TP: more chunks
TP->>RC: append chunk
end
L-->>TP: LLMFullResponseEndFrame
TP->>CM: update_conversation_history(session_id)
TP->>RC: set complete_response + signal done
PM-->>R: async generator (chunks then final)
    R-->>U: StreamingResponse (text/plain)
```
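For orientation, a minimal client sketch matching this flow. The URL prefix, port, and payload fields are assumptions pieced together from the diagram and the request schema referenced later in this review, not confirmed by the PR:

```python
# Hypothetical client for the streaming chat endpoint diagrammed above.
import httpx

def chat(session_id: str, message: str) -> str:
    # Assumed URL: router prefix /agent/text/automatic plus /chat/{session_id}.
    url = f"http://localhost:8000/agent/text/automatic/chat/{session_id}"
    payload = {"message": message, "mode": "TEST"}  # field names are assumptions
    chunks = []
    # The endpoint returns StreamingResponse(media_type="text/plain"),
    # so read the body incrementally as it is produced.
    with httpx.stream("POST", url, json=payload, timeout=60.0) as response:
        response.raise_for_status()
        for chunk in response.iter_text():
            chunks.append(chunk)
    return "".join(chunks)

if __name__ == "__main__":
    print(chat("demo-session", "Hello!"))
```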
```mermaid
sequenceDiagram
autonumber
actor U as Client
participant R as Router
participant UTL as Utils.get_available_tools
participant VT as Voice Tools.initialize_tools
U->>R: GET /tools
R->>UTL: get_available_tools(mode, shop_id, user_email)
UTL->>VT: initialize_tools(session_id="debug", ...)
VT-->>UTL: tool functions + schema
UTL-->>R: {tools_count, tool_names, details}
    R-->>U: JSON response
```
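And a corresponding sketch for the tool-discovery flow. Parameter names mirror get_available_tools(mode, shop_id, user_email); the URL and the use of query-string transport are assumptions:

```python
# Hypothetical call to the tool-discovery endpoint from the second diagram.
import httpx

resp = httpx.get(
    "http://localhost:8000/agent/text/automatic/tools",  # assumed prefix
    params={"mode": "TEST", "shop_id": "shop-123", "user_email": "dev@example.com"},
)
resp.raise_for_status()
data = resp.json()
# Response shape per the diagram: {tools_count, tool_names, details}.
print(data["tools_count"], data["tool_names"])
```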
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes

Possibly related PRs
Suggested reviewers
Poem
Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Actionable comments posted: 4
🧹 Nitpick comments (3)
app/agents/text/automatic/types/response.py (1)
25-30: Consider thread-safety of reset().

The reset() method clears state and resets the complete_event, but if another coroutine is currently awaiting complete_event.wait(), calling reset() could lead to unexpected behavior. Based on the usage pattern (pipeline per session with sequential message processing), this is likely safe, but consider adding a docstring note about when it's safe to call reset().

Example addition:
```diff
 def reset(self):
-    """Reset the collector for a new response."""
+    """Reset the collector for a new response.
+
+    Note: Should only be called when no other coroutines are waiting on complete_event.
+    Typically called between sequential requests in the same session.
+    """
     self.text_chunks.clear()
     self.complete_response = ""
     self.is_complete = False
     self.complete_event.clear()
```
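For context, a sketch of the collector implied by this diff and by the initialization comment later in this review; the attribute names come from the PR, the rest is an assumption:

```python
import asyncio

class ResponseCollector:
    """Accumulates streamed LLM text for one turn (sketch, not the PR's exact code)."""

    def __init__(self):
        self.text_chunks: list[str] = []
        self.complete_response: str = ""
        self.is_complete: bool = False
        self.complete_event = asyncio.Event()  # set once the full response has arrived

    def reset(self):
        """Reset the collector for a new response.

        Note: should only be called when no other coroutines are waiting on
        complete_event, e.g. between sequential requests in the same session.
        """
        self.text_chunks.clear()
        self.complete_response = ""
        self.is_complete = False
        self.complete_event.clear()
```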
app/core/cache/__init__.py (1)

10-13: Sort __all__ alphabetically.

The static analysis tool suggests sorting __all__ entries alphabetically for consistency with project style. As per static analysis hints.
Apply this diff:
```diff
 __all__ = [
-    "get_redis_client",
     "PipelineCacheManager",
+    "get_redis_client",
 ]
```
app/agents/text/automatic/utils/tools.py (1)

11-19: Use explicit Optional[str] annotations.

With None defaults, annotate these parameters as Optional[str] (or str | None) to satisfy PEP 484 and Ruff's guidance, and to signal optionality clearly.

```diff
-from typing import Dict, Any
+from typing import Any, Dict, Optional
@@
-def get_available_tools(mode: str = "TEST", shop_id: str = None, user_email: str = None) -> Dict[str, Any]:
+def get_available_tools(
+    mode: str = "TEST",
+    shop_id: Optional[str] = None,
+    user_email: Optional[str] = None,
+) -> Dict[str, Any]:
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (17)
- app/agents/text/automatic/__init__.py (1 hunks)
- app/agents/text/automatic/features/__init__.py (1 hunks)
- app/agents/text/automatic/features/pipeline_manager.py (1 hunks)
- app/agents/text/automatic/processors/__init__.py (1 hunks)
- app/agents/text/automatic/processors/text_capture.py (1 hunks)
- app/agents/text/automatic/types/__init__.py (1 hunks)
- app/agents/text/automatic/types/response.py (1 hunks)
- app/agents/text/automatic/utils/__init__.py (1 hunks)
- app/agents/text/automatic/utils/tools.py (1 hunks)
- app/api/routers/text_automatic.py (1 hunks)
- app/core/cache/__init__.py (1 hunks)
- app/core/cache/pipeline_cache.py (1 hunks)
- app/core/cache/redis_client.py (1 hunks)
- app/core/config.py (1 hunks)
- app/main.py (4 hunks)
- app/schemas.py (1 hunks)
- requirements.txt (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (13)
- app/agents/text/automatic/types/__init__.py (1)
  - app/agents/text/automatic/types/response.py (1): ResponseCollector (11-30)
- app/agents/text/automatic/features/__init__.py (1)
  - app/agents/text/automatic/features/pipeline_manager.py (1): TextPipelineManager (32-219)
- app/agents/text/automatic/utils/__init__.py (1)
  - app/agents/text/automatic/utils/tools.py (1): get_available_tools (11-28)
- app/core/cache/pipeline_cache.py (2)
  - app/core/cache/redis_client.py (3): get_redis_client (109-111), get (39-49), get (91-92)
  - app/agents/text/automatic/features/pipeline_manager.py (3): get_cache_stats (212-214), cleanup_session (208-210), shutdown (216-219)
- app/core/config.py (1)
  - app/core/cache/redis_client.py (2): get (39-49), get (91-92)
- app/agents/text/automatic/__init__.py (2)
  - app/agents/text/automatic/features/pipeline_manager.py (1): TextPipelineManager (32-219)
  - app/agents/voice/automatic/tools/__init__.py (1): initialize_tools (19-135)
- app/api/routers/text_automatic.py (4)
  - app/agents/text/automatic/utils/tools.py (1): get_available_tools (11-28)
  - app/schemas.py (1): AutomaticTextUserConnectRequest (129-142)
  - app/agents/text/automatic/features/pipeline_manager.py (4): response_generator (169-199), process_message (152-202), get_cache_stats (212-214), get_active_sessions (204-206)
  - app/core/cache/pipeline_cache.py (1): get_cache_stats (133-148)
- app/agents/text/automatic/processors/text_capture.py (1)
  - app/core/cache/pipeline_cache.py (2): load_conversation_history (92-107), update_conversation_history (109-112)
- app/agents/text/automatic/utils/tools.py (1)
  - app/agents/voice/automatic/tools/__init__.py (1): initialize_tools (19-135)
- app/agents/text/automatic/processors/__init__.py (1)
  - app/agents/text/automatic/processors/text_capture.py (1): TextCaptureProcessor (20-83)
- app/agents/text/automatic/features/pipeline_manager.py (6)
  - app/agents/voice/automatic/features/llm_wrapper.py (2): LLMServiceWrapper (14-142), create_summarizing_context (125-139)
  - app/agents/voice/automatic/tools/__init__.py (1): initialize_tools (19-135)
  - app/agents/voice/automatic/prompts/__init__.py (1): get_system_prompt (26-59)
  - app/core/cache/pipeline_cache.py (9): PipelineCacheManager (14-167), start_cleanup_task (30-34), get_cached_pipeline (77-85), load_conversation_history (92-107), cache_pipeline (87-90), update_conversation_history (109-112), get_cache_stats (133-148), cleanup_session (150-152), shutdown (154-167)
  - app/agents/text/automatic/processors/text_capture.py (1): TextCaptureProcessor (20-83)
  - app/agents/text/automatic/types/response.py (1): ResponseCollector (11-30)
- app/core/cache/__init__.py (2)
  - app/core/cache/redis_client.py (1): get_redis_client (109-111)
  - app/core/cache/pipeline_cache.py (1): PipelineCacheManager (14-167)
- app/main.py (2)
  - app/agents/text/automatic/features/pipeline_manager.py (2): startup (40-45), shutdown (216-219)
  - app/core/cache/pipeline_cache.py (1): shutdown (154-167)
🪛 Ruff (0.13.3)
app/core/cache/pipeline_cache.py
45-45: Do not catch blind exception: Exception
(BLE001)
53-53: Loop control variable task not used within loop body
Rename unused task to _task
(B007)
53-53: Loop control variable response_collector not used within loop body
Rename unused response_collector to _response_collector
(B007)
67-67: Unpacked variable task is never used
Prefix it with an underscore or any other dummy variable pattern
(RUF059)
67-67: Unpacked variable response_collector is never used
Prefix it with an underscore or any other dummy variable pattern
(RUF059)
120-120: Do not catch blind exception: Exception
(BLE001)
128-128: Consider moving this statement to an else block
(TRY300)
129-129: Do not catch blind exception: Exception
(BLE001)
app/agents/text/automatic/__init__.py
17-21: __all__ is not sorted
Apply an isort-style sorting to __all__
(RUF022)
app/api/routers/text_automatic.py
45-45: f-string without any placeholders
Remove extraneous f prefix
(F541)
49-49: Do not catch blind exception: Exception
(BLE001)
67-67: PEP 484 prohibits implicit Optional
Convert to Optional[T]
(RUF013)
68-68: PEP 484 prohibits implicit Optional
Convert to Optional[T]
(RUF013)
74-74: Do not catch blind exception: Exception
(BLE001)
app/agents/text/automatic/utils/tools.py
11-11: PEP 484 prohibits implicit Optional
Convert to Optional[T]
(RUF013)
11-11: PEP 484 prohibits implicit Optional
Convert to Optional[T]
(RUF013)
27-27: Do not catch blind exception: Exception
(BLE001)
app/agents/text/automatic/features/pipeline_manager.py
53-53: PEP 484 prohibits implicit Optional
Convert to Optional[T]
(RUF013)
84-84: Consider iterable unpacking instead of concatenation
Replace with iterable unpacking
(RUF005)
145-145: Store a reference to the return value of asyncio.create_task
(RUF006)
152-152: PEP 484 prohibits implicit Optional
Convert to Optional[T]
(RUF013)
193-193: Do not catch blind exception: Exception
(BLE001)
197-197: Use explicit conversion flag
Replace with conversion flag
(RUF010)
app/core/cache/__init__.py
10-13: __all__ is not sorted
Apply an isort-style sorting to __all__
(RUF022)
app/core/cache/redis_client.py
33-33: Do not catch blind exception: Exception
(BLE001)
47-47: Do not catch blind exception: Exception
(BLE001)
58-58: Do not catch blind exception: Exception
(BLE001)
68-68: Do not catch blind exception: Exception
(BLE001)
77-77: Do not catch blind exception: Exception
(BLE001)
94-94: Unused method argument: ex
(ARG002)
app/main.py
124-124: Do not catch blind exception: Exception
(BLE001)
🔇 Additional comments (7)
app/agents/text/automatic/utils/__init__.py (1)
1-11: LGTM! Clean package initializer that properly exposes the tools discovery function. The module docstring and __all__ declaration follow Python best practices.

app/agents/text/automatic/processors/__init__.py (1)
1-11: LGTM! Clean package initializer that properly exposes the frame processor. The module docstring and __all__ declaration follow Python best practices.

app/agents/text/automatic/types/response.py (1)
19-23: LGTM! Clean initialization of the response collector with appropriate default values and an asyncio.Event for coordination.
requirements.txt (1)
46-47: Verify Python version compatibility.

The redis>=5.0.0 dependency is appropriate for Redis-backed caching. However, note that redis-py 5.0.x is the last version supporting Python 3.7, while 5.1+ requires Python 3.8+. Ensure your project's Python version requirement is explicitly documented (e.g., in setup.py, pyproject.toml, or README) to avoid compatibility issues. Based on learnings about redis-py version requirements.
app/agents/text/automatic/types/__init__.py (1)
1-11: LGTM! Clean package initializer that properly exposes the ResponseCollector type. The module docstring and __all__ declaration follow Python best practices.

app/agents/text/automatic/features/__init__.py (1)
1-11: LGTM! Clean package initializer that properly exposes the TextPipelineManager. The module docstring and __all__ declaration follow Python best practices.

app/core/config.py (1)
131-132: LGTM! The Redis URL configuration follows the existing pattern and provides a reasonable default for local development. Ensure production deployments override this via the REDIS_URL environment variable to point to your production Redis instance.
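A minimal sketch of that pattern, assuming the typical os.getenv default (the actual default value in the PR may differ):

```python
import os

# Hypothetical shape of the setting discussed above: a local default that
# production deployments override via the REDIS_URL environment variable.
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
```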
app/agents/text/automatic/features/pipeline_manager.py

```python
# Get pipeline with response collector
logger.info(f"Getting pipeline for session {session_id}")
task, response_collector = await self.get_or_create_pipeline(session_id, config)
logger.info(f"Got pipeline task: {task}")

async def response_generator():
    try:
        logger.info(f"Starting response generator for session {session_id}")
        logger.info(f"About to queue frames: InputTextRawFrame('{message}') and LLMRunFrame()")

        # Create frames
        input_frame = InputTextRawFrame(message)
        llm_frame = LLMRunFrame()
        logger.info(f"Created frames: {input_frame}, {llm_frame}")

        # Queue frames
        logger.info("About to call task.queue_frames...")
        await task.queue_frames([input_frame, llm_frame])
        logger.info("Frames queued successfully, waiting for responses...")

        # Wait for the response to complete
        await response_collector.complete_event.wait()
        logger.info("Response collection completed")

        # Yield the complete response as a single chunk (for now)
        if response_collector.complete_response:
            logger.info(f"Yielding complete response: '{response_collector.complete_response}'")
            yield response_collector.complete_response
```
Reset the ResponseCollector before reusing it.
When a session reuses a cached pipeline, response_collector.complete_event is still set and text_chunks/complete_response still hold the previous turn. The next process_message therefore returns immediately with stale data. Call response_collector.reset() before queuing frames (after fetching the pipeline) so each turn starts with a clean collector.
Suggested patch:
logger.info(f"Getting pipeline for session {session_id}")
task, response_collector = await self.get_or_create_pipeline(session_id, config)
logger.info(f"Got pipeline task: {task}")
+ response_collector.reset()📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
# Get pipeline with response collector
logger.info(f"Getting pipeline for session {session_id}")
task, response_collector = await self.get_or_create_pipeline(session_id, config)
logger.info(f"Got pipeline task: {task}")
response_collector.reset()

async def response_generator():
    try:
        logger.info(f"Starting response generator for session {session_id}")
        logger.info(f"About to queue frames: InputTextRawFrame('{message}') and LLMRunFrame()")
        # ...
```
🤖 Prompt for AI Agents
In app/agents/text/automatic/features/pipeline_manager.py around lines 164 to
192, the ResponseCollector from a cached pipeline still contains previous turn
state (complete_event set and old text_chunks/complete_response) so the next
process_message returns stale data; after obtaining the pipeline (right after
task, response_collector = await self.get_or_create_pipeline(...)) call
response_collector.reset() to clear complete_event and any buffered text before
creating/queueing new frames, then proceed to create
InputTextRawFrame/LLMRunFrame and await task.queue_frames as before.
app/agents/text/automatic/utils/tools.py

```python
        tools_schema, tool_functions = initialize_tools(
            mode=mode,
            shop_id=shop_id,
            user_email=user_email,
            session_id="debug"
        )

        return {
            "tools_count": len(tools_schema.standard_tools),
            "tool_names": list(tool_functions.keys()),
            "tools": [{"name": tool.name, "description": tool.description}
                      for tool in tools_schema.standard_tools]
        }
    except Exception as e:
```
Surface tool discovery failures to the caller
By catching Exception and returning {"error": ...}, the /tools endpoint ends up responding with HTTP 200 even when initialization fails, because the router just relays this dict. That masks real errors from clients and from monitoring. Let the exception propagate (or re-raise after logging) so the router’s except block can emit a proper 5xx.
```diff
-    try:
-        tools_schema, tool_functions = initialize_tools(
-            mode=mode,
-            shop_id=shop_id,
-            user_email=user_email,
-            session_id="debug"
-        )
-
-        return {
-            "tools_count": len(tools_schema.standard_tools),
-            "tool_names": list(tool_functions.keys()),
-            "tools": [{"name": tool.name, "description": tool.description}
-                      for tool in tools_schema.standard_tools]
-        }
-    except Exception as e:
-        return {"error": str(e)}
+    tools_schema, tool_functions = initialize_tools(
+        mode=mode,
+        shop_id=shop_id,
+        user_email=user_email,
+        session_id="debug",
+    )
+
+    return {
+        "tools_count": len(tools_schema.standard_tools),
+        "tool_names": list(tool_functions.keys()),
+        "tools": [
+            {"name": tool.name, "description": tool.description}
+            for tool in tools_schema.standard_tools
+        ],
+    }
```

📝 Committable suggestion
```python
    tools_schema, tool_functions = initialize_tools(
        mode=mode,
        shop_id=shop_id,
        user_email=user_email,
        session_id="debug",
    )

    return {
        "tools_count": len(tools_schema.standard_tools),
        "tool_names": list(tool_functions.keys()),
        "tools": [
            {"name": tool.name, "description": tool.description}
            for tool in tools_schema.standard_tools
        ],
    }
```
🧰 Tools
🪛 Ruff (0.13.3)
27-27: Do not catch blind exception: Exception
(BLE001)
🤖 Prompt for AI Agents
In app/agents/text/automatic/utils/tools.py around lines 14 to 27, the current
try/except swallows initialization errors and returns an {"error": ...} dict
(causing a 200 response); remove the broad except block (or log the error and
re-raise it) so exceptions from initialize_tools propagate to the router and
result in a proper 5xx response; ensure any necessary logging is done before
re-raising and avoid returning error payloads from this function.
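For reference, a sketch of the router-side pattern this prompt relies on; the endpoint signature and parameters are illustrative, not the PR's exact code:

```python
import logging

from fastapi import APIRouter, HTTPException

from app.agents.text.automatic.utils.tools import get_available_tools

logger = logging.getLogger(__name__)
router = APIRouter()

@router.get("/tools")
async def list_tools(mode: str = "TEST"):
    try:
        # With the broad except removed from get_available_tools,
        # initialization failures now reach this handler.
        return get_available_tools(mode=mode)
    except Exception as exc:
        logger.exception("Tool discovery failed")
        # Emit a proper 5xx instead of a 200 carrying an error payload.
        raise HTTPException(status_code=500, detail=str(exc)) from exc
```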
app/api/routers/text_automatic.py (outdated)
```python
    config = {
        "mode": request.mode or "TEST",  # Default to TEST mode
        "user_name": request.userName,
        "user_email": request.email,
        "euler_token": request.eulerToken,
        "breeze_token": request.breezeToken,
        "shop_url": request.shopUrl,
        "shop_id": request.shopId,
        "shop_type": request.shopType,
        "merchant_id": request.merchantId,
        "platform_integrations": request.platformIntegrations,
        "reseller_id": request.resellerId,
    }

    logger.info(f"Calling pipeline_manager.process_message for session {session_id}")
    # Process message through pipeline manager
    response_generator = await pipeline_manager.process_message(session_id, request.message, config)
    logger.info(f"Got response_generator, returning StreamingResponse")

    return StreamingResponse(response_generator, media_type="text/plain")
```
Normalize mode before passing to the pipeline
Clients often send "test"/"live" in lowercase. Because we forward the raw string, initialize_tools never matches Mode.TEST.value, so dummy test tools are skipped and the behavior flips to “live” unintentionally. Mirror the voice agent’s handling by uppercasing (or otherwise normalizing) the mode before it reaches the pipeline.
```diff
-    config = {
-        "mode": request.mode or "TEST",  # Default to TEST mode
+    normalized_mode = (request.mode or "TEST").upper()
+    config = {
+        "mode": normalized_mode,  # Default to TEST mode
         "user_name": request.userName,
         "user_email": request.email,
         "euler_token": request.eulerToken,
```

📝 Committable suggestion
```python
    normalized_mode = (request.mode or "TEST").upper()
    config = {
        "mode": normalized_mode,  # Default to TEST mode
        "user_name": request.userName,
        "user_email": request.email,
        "euler_token": request.eulerToken,
        "breeze_token": request.breezeToken,
        "shop_url": request.shopUrl,
        "shop_id": request.shopId,
        "shop_type": request.shopType,
        "merchant_id": request.merchantId,
        "platform_integrations": request.platformIntegrations,
        "reseller_id": request.resellerId,
    }

    logger.info(f"Calling pipeline_manager.process_message for session {session_id}")
    # Process message through pipeline manager
    response_generator = await pipeline_manager.process_message(session_id, request.message, config)
    logger.info(f"Got response_generator, returning StreamingResponse")

    return StreamingResponse(response_generator, media_type="text/plain")
```
🧰 Tools
🪛 Ruff (0.13.3)
45-45: f-string without any placeholders
Remove extraneous f prefix
(F541)
🤖 Prompt for AI Agents
In app/api/routers/text_automatic.py around lines 28 to 47, the incoming
request.mode is forwarded as-is which allows lowercase values like "test"/"live"
to bypass Mode checks; normalize mode before adding to config by converting
request.mode to uppercase (e.g., mode = (request.mode or "TEST").upper()) and
optionally validate/mapping it to allowed values ("TEST","LIVE") before
assigning to config["mode"], then pass the normalized/validated mode into the
config so initialize_tools/pipeline_manager sees the expected enum string.
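A sketch of the normalize-and-validate step suggested above. The Mode enum shape is inferred from the comment's reference to Mode.TEST.value and may differ from the repo's actual definition:

```python
from enum import Enum
from typing import Optional

from fastapi import HTTPException

class Mode(str, Enum):
    # Assumed members; the review only confirms that Mode.TEST.value exists.
    TEST = "TEST"
    LIVE = "LIVE"

def normalize_mode(raw: Optional[str]) -> str:
    """Uppercase the client-supplied mode and reject unknown values."""
    candidate = (raw or Mode.TEST.value).upper()
    if candidate not in {m.value for m in Mode}:
        raise HTTPException(status_code=422, detail=f"Unsupported mode: {raw}")
    return candidate

# Usage in the router, before building config:
#   config = {"mode": normalize_mode(request.mode), ...}
```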
app/core/cache/redis_client.py

```python
    def get_client(self) -> redis.Redis:
        """Get or create Redis client."""
        if self._client is None:
            try:
                self._client = redis.from_url(
                    REDIS_URL,
                    decode_responses=True,
                    socket_connect_timeout=5,
                    socket_timeout=5
                )
                # Test connection
                self._client.ping()
                logger.info("Redis connection established")
            except Exception as e:
                logger.warning(f"Redis connection failed: {e}, using memory fallback")
                # Fallback to fake Redis for development
                self._client = FakeRedis()
        return self._client

    async def get(self, key: str) -> Optional[Any]:
        """Get and deserialize value from Redis."""
        try:
            client = self.get_client()
            value = client.get(key)
            if value is None:
                return None
            return json.loads(value)
        except Exception as e:
            logger.error(f"Redis get error for key {key}: {e}")
            return None

    async def set(self, key: str, value: Any, ex: Optional[int] = None) -> bool:
        """Serialize and set value in Redis."""
        try:
            client = self.get_client()
            serialized = json.dumps(value, default=str)
            result = client.set(key, serialized, ex=ex)
            return bool(result)
        except Exception as e:
            logger.error(f"Redis set error for key {key}: {e}")
            return False

    async def delete(self, key: str) -> bool:
        """Delete key from Redis."""
        try:
            client = self.get_client()
            result = client.delete(key)
            return bool(result)
        except Exception as e:
            logger.error(f"Redis delete error for key {key}: {e}")
            return False

    async def exists(self, key: str) -> bool:
        """Check if key exists in Redis."""
        try:
            client = self.get_client()
            return bool(client.exists(key))
        except Exception as e:
```
Use the asyncio Redis client to avoid blocking the event loop.
get_client() builds a synchronous redis.Redis. Every asynchronous helper (get/set/delete/exists) then runs network IO on the event loop thread. Under load or on slow Redis, this blocks all other requests and defeats the async pipeline. Please switch to redis.asyncio.Redis (or offload the sync client to a worker thread) and await its operations, while keeping FakeRedis compatible for the fallback.
Suggested fix:
```diff
-import redis
+import redis.asyncio as redis
@@
-    def get_client(self) -> redis.Redis:
+    async def get_client(self) -> redis.Redis:
@@
         self._client = redis.from_url(
             REDIS_URL,
             decode_responses=True,
             socket_connect_timeout=5,
             socket_timeout=5
         )
         # Test connection
-        self._client.ping()
+        await self._client.ping()
@@
-        client = self.get_client()
-        value = client.get(key)
+        client = await self.get_client()
+        value = await client.get(key)
```

(Apply the same pattern for set/delete/exists, and branch to the synchronous FakeRedis methods when the fallback is active.)
Committable suggestion skipped: line range outside the PR's diff.
🧰 Tools
🪛 Ruff (0.13.3)
33-33: Do not catch blind exception: Exception
(BLE001)
47-47: Do not catch blind exception: Exception
(BLE001)
58-58: Do not catch blind exception: Exception
(BLE001)
68-68: Do not catch blind exception: Exception
(BLE001)
77-77: Do not catch blind exception: Exception
(BLE001)
🤖 Prompt for AI Agents
In app/core/cache/redis_client.py around lines 20-77, the code creates a
synchronous redis.Redis client and calls blocking IO from async helpers; change
to use redis.asyncio.Redis and await its coroutine methods, or when falling back
to FakeRedis (synchronous) run its methods in a thread executor. Update
get_client to construct and async-ping an instance of redis.asyncio.Redis (or
return the existing FakeRedis), make get/set/delete/exists await the async
client calls (await client.get/ set/ delete/ exists) and for the FakeRedis
branch call the sync methods via asyncio.to_thread (or loop.run_in_executor) so
the event loop is not blocked; ensure json serialization/deserialization and
return types remain the same and keep the fallback detection so both client
types work.
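To make the fallback branch concrete, a minimal sketch of the thread-offload approach the comment suggests; the helper name and detection logic are illustrative:

```python
import asyncio
import json
from typing import Any, Optional

async def cache_get(client, key: str) -> Optional[Any]:
    """Fetch and JSON-decode a key without blocking the event loop (sketch)."""
    if asyncio.iscoroutinefunction(client.get):
        # redis.asyncio client: network IO is awaited directly.
        value = await client.get(key)
    else:
        # Synchronous fallback (e.g. the in-memory FakeRedis): run the call
        # in a worker thread so the event loop stays responsive.
        value = await asyncio.to_thread(client.get, key)
    return None if value is None else json.loads(value)
```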
Force-pushed b60f62b to 818deb5.
Force-pushed 818deb5 to d9acd02.
Summary by CodeRabbit
New Features
Performance
Configuration
Chores