-
-
Notifications
You must be signed in to change notification settings - Fork 135
[Chat] Add streaming support & persistence to storage through accumulator callback chaining #928
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
[Chat] Add streaming support & persistence to storage through accumulator callback chaining #928
Conversation
a1f4c83 to
8a71916
Compare
7d97953 to
5b388c1
Compare
ab56dc7 to
301e15c
Compare
301e15c to
377d7d6
Compare
377d7d6 to
62503c1
Compare
|
Had some trouble with Git as I never worked with a forked repo like this before, hence the force pushes.. However, the branch history should now be fixed, up-to-date & mergeable. |
|
|
||
| return new AccumulatingStreamResult($result, function (AssistantMessage $assistantMessage) use ($messages) { | ||
| $messages->add($assistantMessage); | ||
| $this->store->save($messages); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could be done in a single line with $this->store->save($messages->with($assistantMessage));
This way, we could use an arrow function.
| if ($result instanceof StreamResult || $result instanceof ToolboxStreamResult) { | ||
| if (!$this->store instanceof StreamableStoreInterface) { | ||
| throw new RuntimeException($this->store::class.' does not support streaming.'); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure that we should throw an exception here, maybe just returning the complete stream at once?
Or maybe we can just store the message "as it" in the store?
Description
The
Chat::submit()function didn't support streaming. I have added support for this through:AccumulatingStreamResultwrapper which allows callback chaining and accumulates the full message (callbacks are fired onceGeneratorin innerStreamResultis exhausted, aka when stream has completed)StreamResultclasses with this wrapper inside theChat::submit()functionStoragewith a newAssistantMessagebuilt from the accumulated message (as non-stream responses are also automatically added to storage inChat::submit())Other changes:
StreamableStoreInterfacewhich indicates aStoreimplementation supports streaming. Currently added to all stores exceptSessionStorage(due to inherent issues with headers being already sent; deferred updates through callbacks go against nature of header/session lifecycles in Symfony; discussed with @Guikingone)Chat::submit()(metadata was not transferred to newAssistantMessagewe created)Chat::submit()combination