1
+ from typing import Optional , Any , Union , Dict , List
2
+ from bots .bot import NovaBot , Agent
3
+ from utils .console import NovaConsole , MessageType
4
+ from hubs .hub import NovaHub
5
+ import json
6
+ import asyncio
7
+
8
+ class ConsoleAgent (NovaBot ):
9
+ """
10
+ An agent that provides visual console feedback for all operations.
11
+ Combines NovaBot's AutoGen compatibility with NovaConsole's beautiful output.
12
+ """
13
+
14
+ def __init__ (self ,
15
+ hub : NovaHub ,
16
+ name : str = "ConsoleAgent" ,
17
+ description : Optional [str ] = None ,
18
+ show_timestamp : bool = True ,
19
+ debug : bool = False ,
20
+ test_mode : bool = False ):
21
+ super ().__init__ (hub = hub , name = name , description = description )
22
+ self .console = NovaConsole (show_timestamp = show_timestamp , debug = debug )
23
+ self ._thinking = False
24
+ self .test_mode = test_mode
25
+
26
+ def _format_content (self , content : Union [str , Dict [str , Any ]]) -> str :
27
+ """Format message content for display"""
28
+ if isinstance (content , dict ):
29
+ return content .get ("content" , str (content ))
30
+ return str (content )
31
+
32
+ def _format_message_detail (self , message : Union [Dict [str , Any ], str ]) -> Optional [Dict [str , Any ]]:
33
+ """Format message details for display"""
34
+ if isinstance (message , dict ):
35
+ # Remove content from details to avoid duplication
36
+ details = message .copy ()
37
+ details .pop ("content" , None )
38
+ return details if details else None
39
+ return None
40
+
41
+ async def _show_thinking (self , action : str ):
42
+ """Show a thinking animation"""
43
+ self ._thinking = True
44
+ dots = ["⠋" , "⠙" , "⠹" , "⠸" , "⠼" , "⠴" , "⠦" , "⠧" , "⠇" , "⠏" ]
45
+ i = 0
46
+ try :
47
+ while self ._thinking :
48
+ try :
49
+ if self .test_mode :
50
+ # In test mode, just print once and exit
51
+ self .console .print (f"{ action } ..." , MessageType .DEBUG )
52
+ break
53
+ self .console .print (f"\r { dots [i ]} { action } ..." , MessageType .DEBUG )
54
+ await asyncio .sleep (0.1 )
55
+ i = (i + 1 ) % len (dots )
56
+ if i > 300 : # 30 seconds (300 * 0.1)
57
+ self ._thinking = False
58
+ break
59
+ except asyncio .CancelledError :
60
+ self ._thinking = False
61
+ break
62
+ finally :
63
+ self ._thinking = False
64
+ if not self .test_mode :
65
+ print ("\r " , end = "" ) # Clear the line
66
+
67
+ async def a_send (self ,
68
+ message : Union [Dict [str , Any ], str ],
69
+ recipient : "Agent" ,
70
+ request_reply : Optional [bool ] = None ) -> None :
71
+ """Send a message with visual feedback"""
72
+ self .console .info (
73
+ f"Sending message to { recipient .name } " ,
74
+ detail = self ._format_message_detail (message )
75
+ )
76
+
77
+ # Show the actual message content
78
+ content = self ._format_content (message )
79
+ self .console .print (f"Message: { content } " , MessageType .DEBUG )
80
+
81
+ await super ().a_send (message , recipient , request_reply )
82
+ self .console .success (f"Message sent to { recipient .name } " )
83
+
84
+ async def a_receive (self ,
85
+ message : Union [Dict [str , Any ], str ],
86
+ sender : "Agent" ,
87
+ request_reply : Optional [bool ] = None ) -> None :
88
+ """Receive a message with visual feedback"""
89
+ self .console .info (
90
+ f"Received message from { sender .name } " ,
91
+ detail = self ._format_message_detail (message )
92
+ )
93
+
94
+ # Show the actual message content
95
+ content = self ._format_content (message )
96
+ self .console .print (f"Message: { content } " , MessageType .DEBUG )
97
+
98
+ # Start thinking animation if reply is requested
99
+ thinking_task = None
100
+ if request_reply :
101
+ thinking_task = asyncio .create_task (
102
+ self ._show_thinking (f"Processing message from { sender .name } " )
103
+ )
104
+
105
+ try :
106
+ await super ().a_receive (message , sender , request_reply )
107
+
108
+ if request_reply :
109
+ self .console .debug (f"Reply requested by { sender .name } " )
110
+ finally :
111
+ if thinking_task :
112
+ self ._thinking = False
113
+ try :
114
+ await thinking_task
115
+ except asyncio .CancelledError :
116
+ pass
117
+
118
+ async def a_generate_reply (self , messages : List [Dict [str , Any ]], sender : Optional [Agent ] = None , ** kwargs ) -> Optional [Union [str , Dict [str , Any ]]]:
119
+ """Generate a reply with visual feedback"""
120
+ thinking_task = None
121
+ try :
122
+ # Start thinking animation in the background
123
+ thinking_task = asyncio .create_task (
124
+ self ._show_thinking (f"Generating reply to { sender .name if sender else 'message' } " )
125
+ )
126
+
127
+ # In test mode, wait for the thinking task to complete first
128
+ if self .test_mode and thinking_task :
129
+ await thinking_task
130
+
131
+ reply = await super ().a_generate_reply (messages , sender , ** kwargs )
132
+
133
+ if reply :
134
+ self .console .success (
135
+ "Reply generated" ,
136
+ detail = self ._format_message_detail (reply )
137
+ )
138
+ content = self ._format_content (reply )
139
+ self .console .print (f"Reply: { content } " , MessageType .DEBUG )
140
+ else :
141
+ self .console .warning ("No reply generated" )
142
+
143
+ return reply
144
+
145
+ except Exception as e :
146
+ self .console .error (f"Error generating reply: { str (e )} " )
147
+ raise
148
+ finally :
149
+ # Ensure thinking animation is always stopped
150
+ self ._thinking = False
151
+ if thinking_task and not thinking_task .done ():
152
+ thinking_task .cancel ()
153
+ try :
154
+ await thinking_task
155
+ except asyncio .CancelledError :
156
+ pass
157
+
158
+ async def cleanup (self ):
159
+ """Cleanup with visual feedback"""
160
+ self .console .system ("Cleaning up agent resources" )
161
+ await super ().cleanup ()
162
+ self .console .success ("Cleanup complete" )
0 commit comments