-
Notifications
You must be signed in to change notification settings - Fork 5.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add functionality to stream or poll events from the build process #5940
base: main
Are you sure you want to change the base?
Conversation
8d930e3
to
3b9cc03
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Copilot reviewed 6 out of 16 changed files in this pull request and generated 1 comment.
Files not reviewed (10)
- src/backend/base/langflow/events/event_manager.py: Evaluated as low risk
- src/backend/base/langflow/services/schema.py: Evaluated as low risk
- src/backend/base/langflow/services/task/service.py: Evaluated as low risk
- src/backend/base/langflow/main.py: Evaluated as low risk
- src/backend/base/langflow/graph/utils.py: Evaluated as low risk
- src/backend/base/langflow/services/database/models/transactions/crud.py: Evaluated as low risk
- src/backend/base/langflow/api/v1/chat.py: Evaluated as low risk
- src/backend/tests/unit/components/logic/test_loop.py: Evaluated as low risk
- src/backend/base/langflow/services/deps.py: Evaluated as low risk
- src/backend/tests/unit/test_chat_endpoint.py: Evaluated as low risk
Comments suppressed due to low confidence (2)
src/backend/base/langflow/services/queue/service.py:37
- The
create_queue
method should validate that thejob_id
is a valid UUID. Add a validation check for thejob_id
to ensure it is a valid UUID.
def create_queue(self, job_id: str) -> tuple[asyncio.Queue, EventManager]:
src/frontend/src/utils/buildUtils.ts:2
- The variable 'runId' is used but not defined within the provided code snippet. Ensure 'runId' is defined elsewhere in the code to avoid runtime errors.
if (onBuildUpdate) onBuildUpdate(data, status, runId);
Co-authored-by: Christophe Bornet <cbornet@hotmail.com>
…ce initialization
…d logging (#6312) * feat: implement LimitedBackgroundTasks for controlled vertex build logging * refactor: replace BackgroundTasks with LimitedBackgroundTasks in build_flow endpoint
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Copilot reviewed 35 out of 49 changed files in this pull request and generated no comments.
Files not reviewed (14)
- src/backend/base/langflow/events/event_manager.py: Evaluated as low risk
- src/backend/base/langflow/services/schema.py: Evaluated as low risk
- src/backend/base/langflow/api/v1/chat.py: Evaluated as low risk
- src/backend/base/langflow/services/database/models/vertex_builds/crud.py: Evaluated as low risk
- .github/changes-filter.yaml: Evaluated as low risk
- src/backend/base/langflow/main.py: Evaluated as low risk
- src/backend/base/langflow/services/settings/base.py: Evaluated as low risk
- src/backend/base/langflow/api/v1/monitor.py: Evaluated as low risk
- src/backend/base/langflow/services/database/models/transactions/crud.py: Evaluated as low risk
- src/backend/base/langflow/graph/utils.py: Evaluated as low risk
- src/backend/base/langflow/api/v1/schemas.py: Evaluated as low risk
- src/backend/base/langflow/base/agents/agent.py: Evaluated as low risk
- src/backend/base/langflow/services/deps.py: Evaluated as low risk
- src/backend/base/langflow/api/v1/endpoints.py: Evaluated as low risk
Comments suppressed due to low confidence (3)
src/backend/base/langflow/services/job_queue/service.py:116
- The error message should include the job_id for better clarity. Suggestion: f"Queue for job_id {job_id} already exists".
msg = f"Queue for job_id {job_id} already exists"
src/backend/base/langflow/services/job_queue/service.py:152
- The error message is repeated. Ensure consistency in error handling.
msg = "Queue service is closed"
src/backend/base/langflow/services/job_queue/service.py:212
- Handle exceptions during task cancellation more gracefully. Log the error and continue with the cleanup.
await asyncio.wait([task])
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would be nice to have tests for both modes in frontend
If more than max_vertex_builds_per_vertex tasks are added for a given vertex_id, | ||
the oldest task is removed so that only the most recent remain. | ||
This only applies to log_vertex_build tasks. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we are going to apply this only to the log_vertex_build we should make it explicity on the class name
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why there are two places were we create a job_id, can't we center it on one place only?
const eventDelivery = useGetConfig((state) => state.data?.event_delivery); | ||
const shouldStreamEvents = () => { | ||
// Get from useGetConfig store | ||
return eventDelivery === "streaming"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should use an enum here instead of a plain string
} catch (e: any) { | ||
if (e.message === "Endpoint not available") { | ||
return await buildVertices(params); | ||
if ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would be good to put that inside some consts
This PR refactors the build flow process by introducing a job‐based event system that decouples the build execution from the API response and provides flexible event delivery (streaming or polling). Key changes include:
Backend API & Event Handling
langflow/api/build.py
) implements asynchronous build flow processing. It sets up an event queue per build job (identified by a generated job ID) and streams build events via anEventManager
.DisconnectHandlerStreamingResponse
(inlangflow/api/disconnect.py
) gracefully handles client disconnects.langflow/api/v1/chat.py
) has been refactored to start the build process, return a job ID, and expose a separate events endpoint for polling/streaming events.Queue Service Implementation
QueueService
(with accompanying factory inservices/queue/factory.py
and service inservices/queue/service.py
) manages per-job asyncio queues, starts build tasks, and performs periodic cleanup of completed or cancelled queues.services/deps.py
) now provides access to theQueueService
.Configuration & Settings Updates
event_delivery
option (a literal of"polling"
or"streaming"
), and backend settings have been updated to support this.Task Backend Improvements
Test & Frontend Updates
tests/unit/build_utils.py
) and updated test cases validate the new build flow process, ensuring that job IDs are returned and events are correctly streamed or polled.src/frontend/src/utils/buildUtils.ts
and config queries) has been updated to honor theevent_delivery
setting. It now falls back to polling if streaming is not available.Overall, this refactor decouples build processing from the client response by using a queue-based event system, improves robustness via proper resource cleanup, and provides flexible event delivery options to support various deployment scenarios.