-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathCallback.py
121 lines (94 loc) · 3.99 KB
/
Callback.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
"""Callback Handler that prints to streamlit."""
from __future__ import annotations

from asyncio import Queue
from enum import Enum
from typing import TYPE_CHECKING, Any, Dict, List, NamedTuple, Optional, Sequence
from uuid import UUID

from langchain_core.agents import AgentAction, AgentFinish
from langchain_core.callbacks import BaseCallbackHandler, AsyncCallbackHandler
from langchain_core.documents import Document
from langchain_core.outputs import LLMResult

from langchain_community.callbacks.streamlit.mutable_expander import MutableExpander
class GradioCallbackHandler(BaseCallbackHandler):
    """Callback handler that streams run-progress events to a Gradio UI.

    The handler doubles as an async iterator: chain start/end callbacks
    push status strings onto an internal queue, ``__anext__`` yields them,
    and :meth:`end_run` terminates the iteration.  On every yielded event
    the wrapped Gradio ``progress`` callable is updated with the current
    step count and message.
    """

    def __init__(
        self, progress
    ):
        """Create the handler and show the initial progress state.

        Args:
            progress: A Gradio ``Progress``-style callable accepting
                ``((step, total), description)``.
        """
        self.queue = Queue()   # status strings pending delivery to __anext__
        self.done = False      # set by end_run(); stops the async iteration
        self.progress = progress
        self.count = 0         # current step, displayed out of a total of 4
        self.message = "Verarbeite Frage"
        self.progress((self.count, 4), self.message)

    def __aiter__(self):
        return self

    async def __anext__(self):
        """Update the progress bar and yield the next queued status string.

        Raises:
            StopAsyncIteration: once :meth:`end_run` has marked the run done.

        NOTE(review): ``done`` is checked *before* awaiting the queue, so the
        "end of run" sentinel is only delivered to a consumer already waiting
        in ``queue.get()`` — confirm this is the intended handshake.
        """
        if self.done:
            raise StopAsyncIteration
        print("progress: ", self.count, self.message)
        self.progress((self.count, 4), self.message)
        return await self.queue.get()

    def end_run(self, future):
        """Mark the run finished and wake any consumer blocked in __anext__."""
        print("end_run: ", future)
        self.done = True
        self.queue.put_nowait("end of run")

    def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> None:
        """No-op: LLM start events are not surfaced."""
        pass

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        """No-op: token streaming is not surfaced."""
        pass

    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        """Log the raw LLM response for debugging."""
        print("llm end: ", response)

    def on_llm_error(self, error: BaseException, **kwargs: Any) -> None:
        """No-op: LLM errors are not surfaced."""
        pass

    def on_tool_start(
        self, serialized: Dict[str, Any], input_str: str, **kwargs: Any
    ) -> None:
        """Log tool invocations for debugging."""
        print("tool start: ", serialized, input_str, kwargs)

    def on_tool_end(
        self,
        output: Any,
        color: Optional[str] = None,
        observation_prefix: Optional[str] = None,
        llm_prefix: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """Log tool output for debugging."""
        print("tool end: ", output)

    def on_tool_error(self, error: BaseException, **kwargs: Any) -> None:
        """Log tool errors for debugging."""
        print("tool error: ", error)

    def on_text(
        self,
        text: str,
        color: Optional[str] = None,
        end: str = "",
        **kwargs: Any,
    ) -> None:
        """Log free-form text events for debugging."""
        print("on_text: ", text)

    def on_chain_start(
        self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs: Any
    ) -> None:
        """Advance the progress state when a chain with a new message starts.

        Reads an optional ``message`` from ``kwargs["metadata"]``; when it
        differs from the current one, the message is adopted, an event is
        queued for the async iterator, and the step counter is incremented.
        """
        print("chain starting: ", str(serialized)[:100], str(inputs)[:100], kwargs)
        # BUG FIX: the metadata key was previously accessed unconditionally
        # before the membership check, raising KeyError for runs without it.
        metadata = kwargs.get("metadata") or {}
        print("chain metadata: ", metadata)
        message = metadata.get("message")
        if message is not None and message != self.message:
            self.message = message
            self.queue.put_nowait(f"chain start: {inputs}")
            self.count += 1

    def on_retriever_start(self, serialized: dict[str, Any], query: str, *, run_id: UUID, parent_run_id: UUID | None = None, tags: list[str] | None = None, metadata: dict[str, Any] | None = None, **kwargs: Any) -> None:
        """No-op: retriever start events are not surfaced."""
        pass

    def on_retriever_end(self, documents: Sequence[Document], *, run_id: UUID, parent_run_id: UUID | None = None, tags: list[str] | None = None, **kwargs: Any) -> None:
        """No-op: retriever end events are not surfaced."""
        pass

    def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> None:
        """Queue a status event for the async iterator when a chain ends."""
        print("chain end: ", str(outputs)[:100], kwargs)
        self.queue.put_nowait(f"chain end: {outputs.keys()}")

    def on_chain_error(self, error: BaseException, **kwargs: Any) -> None:
        """Log chain errors for debugging."""
        print("chain error: ", error)

    def on_agent_action(
        self, action: AgentAction, color: Optional[str] = None, **kwargs: Any
    ) -> Any:
        """No-op: agent actions are not surfaced."""
        pass

    def on_agent_finish(
        self, finish: AgentFinish, color: Optional[str] = None, **kwargs: Any
    ) -> None:
        """No-op: agent finish events are not surfaced."""
        pass