-
Notifications
You must be signed in to change notification settings - Fork 3
Cache read in thread contructor #157
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
| def thread( | ||
| self, | ||
| *, | ||
| name: str | None = None, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now we have a Thread.name and an Agent.name. I think we should remove Agent.name as we don't use it anywhere and it can cause confusion with Thread.name.
| self._default_rows_limit = rows_limit | ||
|
|
||
| self._lazy_mode = lazy | ||
| self._name = name or f"thread-{uuid.uuid4()!s}" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should probably expose the name publicly as well, like we have with Agent.name.
| self._cache = self._agent.cache.scoped(self._name) | ||
| state = self._cache.get("state", default=None) | ||
| if state: | ||
| if overwrite: | ||
| self._cache.put("state", {}) | ||
| print(f"Overwrote existing state for thread {self._name}. History is empty.") | ||
| else: | ||
| print(f"Loaded existing state for thread {self._name}.\nOperations:") | ||
| self._opas = state.get("operations", []) | ||
| self._opas_processed_count = len(self._opas) | ||
| messages = state.get("messages", []) | ||
| self._meta = {"messages": messages} | ||
| counter = 1 | ||
| for opa_group in self._opas: | ||
| for opa in opa_group: | ||
| print(f"- Op {counter}: {opa.query}") | ||
| counter += 1 | ||
| self._data_result = self._agent.executor.get_result(messages) | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I find it strange to assume the existence of these cache keys in Thread, as I would assume that they are specific to an Executor. Maybe it would be cleaner to have a thread-specific cache scope and an Executor specific cache scope instead. That way, the Thread cache would save _opas and _data_result without relying on Executor.
| self._graph_recursion_limit = 50 | ||
|
|
||
| def _process_opas(self, opas: list[Opa], cache: Cache) -> list[Any]: | ||
| def _process_opas(self, opas: list[Opa], cache: Cache) -> dict[str, Any]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think these changes break ReactDuckDBExecutor
| """Update message history in cache with final messages from graph execution.""" | ||
| if final_messages: | ||
| cache.put("state", {"messages": final_messages}) | ||
| def _update_message_history(self, cache: Cache, state: dict[str, Any]) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would rename the method name now to something like _save_state
Implemented simplest use case for cache: