2121 ResponseOutputItemDoneEvent ,
2222 ResponseOutputItemAddedEvent ,
2323 ResponseCodeInterpreterToolCall ,
24- ResponseReasoningSummaryPartDoneEvent ,
2524 ResponseReasoningSummaryPartAddedEvent ,
2625 ResponseReasoningSummaryTextDeltaEvent ,
2726)
2827from agents .models .openai_provider import OpenAIProvider
28+ from openai .types .responses .response_reasoning_text_done_event import ResponseReasoningTextDoneEvent
29+ from openai .types .responses .response_reasoning_text_delta_event import ResponseReasoningTextDeltaEvent
30+ from openai .types .responses .response_reasoning_summary_text_done_event import ResponseReasoningSummaryTextDoneEvent
2931
3032from agentex import AsyncAgentex
3133from agentex .lib .utils .logging import make_logger
4042from agentex .types .task_message_content import TextContent
4143from agentex .types .tool_request_content import ToolRequestContent
4244from agentex .types .tool_response_content import ToolResponseContent
45+ from agentex .types .reasoning_content_delta import ReasoningContentDelta
46+ from agentex .types .reasoning_summary_delta import ReasoningSummaryDelta
4347
4448logger = make_logger (__name__ )
4549
@@ -460,22 +464,25 @@ def _extract_tool_response_info(tool_map: dict[str, Any], tool_output_item: Any)
460464 return call_id , tool_name , content
461465
462466
463- async def convert_openai_to_agentex_events (stream_response ):
464- """Convert OpenAI streaming events to AgentEx TaskMessageUpdate events.
465- This function takes an async iterator of OpenAI events and yields AgentEx
466- TaskMessageUpdate events based on the OpenAI event types.
467+ async def convert_openai_to_agentex_events_with_reasoning (stream_response ):
468+ """Convert OpenAI streaming events to AgentEx TaskMessageUpdate events with reasoning support.
469+
470+ This is an enhanced version of the base converter that includes support for:
471+ - Reasoning content deltas (for o1 models)
472+ - Reasoning summary deltas (for o1 models)
473+
467474 Args:
468475 stream_response: An async iterator of OpenAI streaming events
469476 Yields:
470- TaskMessageUpdate: AgentEx streaming events (StreamTaskMessageDelta or StreamTaskMessageDone)
477+ TaskMessageUpdate: AgentEx streaming events (StreamTaskMessageDelta, StreamTaskMessageFull, or StreamTaskMessageDone)
471478 """
472479
473480 tool_map = {}
474481 event_count = 0
475482 message_index = 0 # Track message index for proper sequencing
476483 seen_tool_output = False # Track if we've seen tool output to know when final text starts
477484 item_id_to_index = {} # Map item_id to message index
478- current_reasoning_summary = "" # Accumulate reasoning summary text
485+ item_id_to_type = {} # Map item_id to content type ( text, reasoning_content, reasoning_summary)
479486
480487 async for event in stream_response :
481488 event_count += 1
@@ -495,16 +502,107 @@ async def convert_openai_to_agentex_events(stream_response):
495502 elif isinstance (raw_event , ResponseOutputItemDoneEvent ):
496503 item_id = raw_event .item .id
497504 if item_id in item_id_to_index :
498- # Send done event for this message
499- yield StreamTaskMessageDone (
500- type = "done" ,
505+ # Get the message type to decide whether to send done event
506+ message_type = item_id_to_type .get (item_id , "text" )
507+
508+ # Don't send done events for reasoning content/summary
509+ # They just end with their last delta
510+ if message_type not in ("reasoning_content" , "reasoning_summary" ):
511+ yield StreamTaskMessageDone (
512+ type = "done" ,
513+ index = item_id_to_index [item_id ],
514+ )
515+
516+ # Skip reasoning summary part added events - we handle them on delta
517+ elif isinstance (raw_event , ResponseReasoningSummaryPartAddedEvent ):
518+ pass
519+
520+ # Handle reasoning summary text delta events
521+ elif isinstance (raw_event , ResponseReasoningSummaryTextDeltaEvent ):
522+ item_id = raw_event .item_id
523+ summary_index = raw_event .summary_index
524+
525+ # If this is a new item_id we haven't seen, create a new message
526+ if item_id and item_id not in item_id_to_index :
527+ message_index += 1
528+ item_id_to_index [item_id ] = message_index
529+ item_id_to_type [item_id ] = "reasoning_summary"
530+
531+ # Send a start event for this new reasoning summary message
532+ yield StreamTaskMessageStart (
533+ type = "start" ,
501534 index = item_id_to_index [item_id ],
535+ content = TextContent (
536+ type = "text" ,
537+ author = "agent" ,
538+ content = "" , # Start with empty content
539+ ),
502540 )
503541
504- # Skip reasoning summary events since o1 reasoning tokens are not accessible
505- elif isinstance (raw_event , (ResponseReasoningSummaryPartAddedEvent ,
506- ResponseReasoningSummaryTextDeltaEvent ,
507- ResponseReasoningSummaryPartDoneEvent )):
542+ # Use the index for this item_id
543+ current_index = item_id_to_index .get (item_id , message_index )
544+
545+ # Yield reasoning summary delta
546+ yield StreamTaskMessageDelta (
547+ type = "delta" ,
548+ index = current_index ,
549+ delta = ReasoningSummaryDelta (
550+ type = "reasoning_summary" ,
551+ summary_index = summary_index ,
552+ summary_delta = raw_event .delta ,
553+ ),
554+ )
555+
556+ # Handle reasoning summary text done events
557+ elif isinstance (raw_event , ResponseReasoningSummaryTextDoneEvent ):
558+ # We do NOT close the streaming context here
559+ # as there can be multiple reasoning summaries.
560+ # The context will be closed when the entire
561+ # output item is done (ResponseOutputItemDoneEvent)
562+ pass
563+
564+ # Handle reasoning content text delta events
565+ elif isinstance (raw_event , ResponseReasoningTextDeltaEvent ):
566+ item_id = raw_event .item_id
567+ content_index = raw_event .content_index
568+
569+ # If this is a new item_id we haven't seen, create a new message
570+ if item_id and item_id not in item_id_to_index :
571+ message_index += 1
572+ item_id_to_index [item_id ] = message_index
573+ item_id_to_type [item_id ] = "reasoning_content"
574+
575+ # Send a start event for this new reasoning content message
576+ yield StreamTaskMessageStart (
577+ type = "start" ,
578+ index = item_id_to_index [item_id ],
579+ content = TextContent (
580+ type = "text" ,
581+ author = "agent" ,
582+ content = "" , # Start with empty content
583+ ),
584+ )
585+
586+ # Use the index for this item_id
587+ current_index = item_id_to_index .get (item_id , message_index )
588+
589+ # Yield reasoning content delta
590+ yield StreamTaskMessageDelta (
591+ type = "delta" ,
592+ index = current_index ,
593+ delta = ReasoningContentDelta (
594+ type = "reasoning_content" ,
595+ content_index = content_index ,
596+ content_delta = raw_event .delta ,
597+ ),
598+ )
599+
600+ # Handle reasoning content text done events
601+ elif isinstance (raw_event , ResponseReasoningTextDoneEvent ):
602+ # We do NOT close the streaming context here
603+ # as there can be multiple reasoning content texts.
604+ # The context will be closed when the entire
605+ # output item is done (ResponseOutputItemDoneEvent)
508606 pass
509607
510608 # Check if this is a text delta event from OpenAI
@@ -523,6 +621,8 @@ async def convert_openai_to_agentex_events(stream_response):
523621 else :
524622 item_id_to_index [item_id ] = message_index
525623
624+ item_id_to_type [item_id ] = "text"
625+
526626 # Send a start event with empty content for this new text message
527627 yield StreamTaskMessageStart (
528628 type = "start" ,
@@ -548,7 +648,7 @@ async def convert_openai_to_agentex_events(stream_response):
548648 yield delta_message
549649
550650 elif hasattr (event , 'type' ) and event .type == 'run_item_stream_event' :
551- # Skip reasoning_item events since o1 reasoning tokens are not accessible via the API
651+ # Skip reasoning_item events - they're handled via raw_response_event above
552652 if hasattr (event , 'item' ) and event .item .type == 'reasoning_item' :
553653 continue
554654
@@ -587,3 +687,4 @@ async def convert_openai_to_agentex_events(stream_response):
587687 index = message_index ,
588688 content = tool_response_content ,
589689 )
690+
0 commit comments