Skip to content

Commit

Permalink
fix: improve task management in _log_transaction_async to avoid datab…
Browse files Browse the repository at this point in the history
…ase locks (#5896)

* refactor: Update _log_transaction_async to be asynchronous and improve task management

- Changed _log_transaction_async method from synchronous to asynchronous to allow proper handling of transaction logging.
- Added error handling for task cancellation and ensured that pending tasks are managed correctly.
- Updated calls to _log_transaction_async in the ComponentVertex class to await the asynchronous method, ensuring proper flow execution and error handling.

* refactor: Simplify _log_transaction_async method by removing redundant error handling

- Streamlined the _log_transaction_async method to enhance readability and maintainability.
- Removed unnecessary try-except blocks for task cancellation, as the async context already handles task management effectively.
- Ensured that completed tasks are awaited and cleared properly, improving overall task management.

* fix: Correctly handle vertex build response in asynchronous flow construction

- Moved the retrieval of the vertex build response to occur after awaiting the build task, ensuring proper handling of task completion.
- Improved error handling by maintaining the cancellation logic while ensuring the response is only accessed after the task is completed.
- This change enhances the reliability of the flow construction process in the chat API.

* fix: Improve task management in Vertex class by refining log transaction handling

- Updated the log transaction handling in the Vertex class to await a single task instead of gathering all tasks, enhancing efficiency.
- Removed the clearing of the task list, ensuring that only the most recent task is processed, which simplifies the task management logic.
- This change aims to improve the reliability and performance of asynchronous logging in the flow execution.

* refactor: Comment out log transaction handling in Vertex class for future review
  • Loading branch information
ogabrielluiz authored Jan 23, 2025
1 parent 5910638 commit 55c9ee2
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 10 deletions.
2 changes: 1 addition & 1 deletion src/backend/base/langflow/api/v1/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,12 +350,12 @@ async def build_vertices(
build_task = asyncio.create_task(_build_vertex(vertex_id, graph, event_manager))
try:
await build_task
vertex_build_response: VertexBuildResponse = build_task.result()
except asyncio.CancelledError as exc:
logger.exception(exc)
build_task.cancel()
return

vertex_build_response: VertexBuildResponse = build_task.result()
# send built event or error event
try:
vertex_build_response_json = vertex_build_response.model_dump_json()
Expand Down
31 changes: 24 additions & 7 deletions src/backend/base/langflow/graph/vertex/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from langflow.exceptions.component import ComponentBuildError
from langflow.graph.schema import INPUT_COMPONENTS, OUTPUT_COMPONENTS, InterfaceComponentTypes, ResultData
from langflow.graph.utils import UnbuiltObject, UnbuiltResult, log_transaction
from langflow.graph.utils import UnbuiltObject, UnbuiltResult
from langflow.interface import initialize
from langflow.interface.listing import lazy_load_dict
from langflow.schema.artifact import ArtifactType
Expand Down Expand Up @@ -620,12 +620,29 @@ async def get_result(self, requester: Vertex, target_handle_name: str | None = N
async with self._lock:
return await self._get_result(requester, target_handle_name)

def _log_transaction_async(
async def _log_transaction_async(
self, flow_id: str | UUID, source: Vertex, status, target: Vertex | None = None, error=None
) -> None:
task = asyncio.create_task(log_transaction(flow_id, source, status, target, error))
self.log_transaction_tasks.add(task)
task.add_done_callback(self.log_transaction_tasks.discard)
"""Log a transaction asynchronously with proper task handling and cancellation.
Args:
flow_id: The ID of the flow
source: Source vertex
status: Transaction status
target: Optional target vertex
error: Optional error information
"""
# Commenting this out for now
# async with self._lock:
# if self.log_transaction_tasks:
# # Safely await and remove completed tasks
# task = self.log_transaction_tasks.pop()
# await task

# # Create and track new task
# task = asyncio.create_task(log_transaction(flow_id, source, status, target, error))
# self.log_transaction_tasks.add(task)
# task.add_done_callback(self.log_transaction_tasks.discard)

async def _get_result(
self,
Expand All @@ -642,13 +659,13 @@ async def _get_result(
flow_id = self.graph.flow_id
if not self.built:
if flow_id:
self._log_transaction_async(str(flow_id), source=self, target=requester, status="error")
await self._log_transaction_async(str(flow_id), source=self, target=requester, status="error")
msg = f"Component {self.display_name} has not been built yet"
raise ValueError(msg)

result = self.built_result if self.use_result else self.built_object
if flow_id:
self._log_transaction_async(str(flow_id), source=self, target=requester, status="success")
await self._log_transaction_async(str(flow_id), source=self, target=requester, status="success")
return result

async def _build_vertex_and_update_params(self, key, vertex: Vertex) -> None:
Expand Down
4 changes: 2 additions & 2 deletions src/backend/base/langflow/graph/vertex/vertex_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ async def _get_result(self, requester: Vertex, target_handle_name: str | None =
default_value = requester.get_value_from_template_dict(edge.target_param)

if flow_id:
self._log_transaction_async(source=self, target=requester, flow_id=str(flow_id), status="error")
await self._log_transaction_async(source=self, target=requester, flow_id=str(flow_id), status="error")
if default_value is not UNDEFINED:
return default_value
msg = f"Component {self.display_name} has not been built yet"
Expand Down Expand Up @@ -146,7 +146,7 @@ async def _get_result(self, requester: Vertex, target_handle_name: str | None =
msg = f"Result not found for {edge.source_handle.name} in {edge}"
raise ValueError(msg)
if flow_id:
self._log_transaction_async(source=self, target=requester, flow_id=str(flow_id), status="success")
await self._log_transaction_async(source=self, target=requester, flow_id=str(flow_id), status="success")
return result

def extract_messages_from_artifacts(self, artifacts: dict[str, Any]) -> list[dict]:
Expand Down

0 comments on commit 55c9ee2

Please sign in to comment.