From 55c9ee2eb227c006a62be052b3239934e9c1a760 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Thu, 23 Jan 2025 20:03:10 -0300 Subject: [PATCH] fix: improve task management in _log_transaction_async to avoid database locks (#5896) * refactor: Update _log_transaction_async to be asynchronous and improve task management - Changed _log_transaction_async method from synchronous to asynchronous to allow proper handling of transaction logging. - Added error handling for task cancellation and ensured that pending tasks are managed correctly. - Updated calls to _log_transaction_async in the ComponentVertex class to await the asynchronous method, ensuring proper flow execution and error handling. * refactor: Simplify _log_transaction_async method by removing redundant error handling - Streamlined the _log_transaction_async method to enhance readability and maintainability. - Removed unnecessary try-except blocks for task cancellation, as the async context already handles task management effectively. - Ensured that completed tasks are awaited and cleared properly, improving overall task management. * fix: Correctly handle vertex build response in asynchronous flow construction - Moved the retrieval of the vertex build response to occur after awaiting the build task, ensuring proper handling of task completion. - Improved error handling by maintaining the cancellation logic while ensuring the response is only accessed after the task is completed. - This change enhances the reliability of the flow construction process in the chat API. * fix: Improve task management in Vertex class by refining log transaction handling - Updated the log transaction handling in the Vertex class to await a single task instead of gathering all tasks, enhancing efficiency. - Removed the clearing of the task list, ensuring that only the most recent task is processed, which simplifies the task management logic. - This change aims to improve the reliability and performance of asynchronous logging in the flow execution. * refactor: Comment out log transaction handling in Vertex class for future review --- src/backend/base/langflow/api/v1/chat.py | 2 +- .../base/langflow/graph/vertex/base.py | 31 ++++++++++++++----- .../langflow/graph/vertex/vertex_types.py | 4 +-- 3 files changed, 27 insertions(+), 10 deletions(-) diff --git a/src/backend/base/langflow/api/v1/chat.py b/src/backend/base/langflow/api/v1/chat.py index 00204fcaf5d6..09c985c3a518 100644 --- a/src/backend/base/langflow/api/v1/chat.py +++ b/src/backend/base/langflow/api/v1/chat.py @@ -350,12 +350,12 @@ async def build_vertices( build_task = asyncio.create_task(_build_vertex(vertex_id, graph, event_manager)) try: await build_task + vertex_build_response: VertexBuildResponse = build_task.result() except asyncio.CancelledError as exc: logger.exception(exc) build_task.cancel() return - vertex_build_response: VertexBuildResponse = build_task.result() # send built event or error event try: vertex_build_response_json = vertex_build_response.model_dump_json() diff --git a/src/backend/base/langflow/graph/vertex/base.py b/src/backend/base/langflow/graph/vertex/base.py index f659582853dd..08e8f441e9f2 100644 --- a/src/backend/base/langflow/graph/vertex/base.py +++ b/src/backend/base/langflow/graph/vertex/base.py @@ -15,7 +15,7 @@ from langflow.exceptions.component import ComponentBuildError from langflow.graph.schema import INPUT_COMPONENTS, OUTPUT_COMPONENTS, InterfaceComponentTypes, ResultData -from langflow.graph.utils import UnbuiltObject, UnbuiltResult, log_transaction +from langflow.graph.utils import UnbuiltObject, UnbuiltResult from langflow.interface import initialize from langflow.interface.listing import lazy_load_dict from langflow.schema.artifact import ArtifactType @@ -620,12 +620,29 @@ async def get_result(self, requester: Vertex, target_handle_name: str | None = N async with self._lock: return await self._get_result(requester, target_handle_name) - def _log_transaction_async( + async def _log_transaction_async( self, flow_id: str | UUID, source: Vertex, status, target: Vertex | None = None, error=None ) -> None: - task = asyncio.create_task(log_transaction(flow_id, source, status, target, error)) - self.log_transaction_tasks.add(task) - task.add_done_callback(self.log_transaction_tasks.discard) + """Log a transaction asynchronously with proper task handling and cancellation. + + Args: + flow_id: The ID of the flow + source: Source vertex + status: Transaction status + target: Optional target vertex + error: Optional error information + """ + # Commenting this out for now + # async with self._lock: + # if self.log_transaction_tasks: + # # Safely await and remove completed tasks + # task = self.log_transaction_tasks.pop() + # await task + + # # Create and track new task + # task = asyncio.create_task(log_transaction(flow_id, source, status, target, error)) + # self.log_transaction_tasks.add(task) + # task.add_done_callback(self.log_transaction_tasks.discard) async def _get_result( self, @@ -642,13 +659,13 @@ async def _get_result( flow_id = self.graph.flow_id if not self.built: if flow_id: - self._log_transaction_async(str(flow_id), source=self, target=requester, status="error") + await self._log_transaction_async(str(flow_id), source=self, target=requester, status="error") msg = f"Component {self.display_name} has not been built yet" raise ValueError(msg) result = self.built_result if self.use_result else self.built_object if flow_id: - self._log_transaction_async(str(flow_id), source=self, target=requester, status="success") + await self._log_transaction_async(str(flow_id), source=self, target=requester, status="success") return result async def _build_vertex_and_update_params(self, key, vertex: Vertex) -> None: diff --git a/src/backend/base/langflow/graph/vertex/vertex_types.py b/src/backend/base/langflow/graph/vertex/vertex_types.py index 038daea23e40..45412da0b4ce 100644 --- a/src/backend/base/langflow/graph/vertex/vertex_types.py +++ b/src/backend/base/langflow/graph/vertex/vertex_types.py @@ -107,7 +107,7 @@ async def _get_result(self, requester: Vertex, target_handle_name: str | None = default_value = requester.get_value_from_template_dict(edge.target_param) if flow_id: - self._log_transaction_async(source=self, target=requester, flow_id=str(flow_id), status="error") + await self._log_transaction_async(source=self, target=requester, flow_id=str(flow_id), status="error") if default_value is not UNDEFINED: return default_value msg = f"Component {self.display_name} has not been built yet" @@ -146,7 +146,7 @@ async def _get_result(self, requester: Vertex, target_handle_name: str | None = msg = f"Result not found for {edge.source_handle.name} in {edge}" raise ValueError(msg) if flow_id: - self._log_transaction_async(source=self, target=requester, flow_id=str(flow_id), status="success") + await self._log_transaction_async(source=self, target=requester, flow_id=str(flow_id), status="success") return result def extract_messages_from_artifacts(self, artifacts: dict[str, Any]) -> list[dict]: