Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,52 @@ if err := result.Wait(); err != nil {
}
```

### Streaming callbacks

Set `TurnOptions.Callbacks` to receive typed updates without writing a `switch` over
`ThreadEvent`. The SDK invokes `OnEvent` first, followed by any matching typed callbacks,
and finally forwards the raw event through `Events()`. Callbacks run on the streaming goroutine,
so long-running work should be offloaded to avoid stalling the stream. You must continue
draining the `Events()` channel (an empty `for range` loop works) to honour the CLI's
backpressure expectations.

```go
callbacks := &godex.StreamCallbacks{
OnMessage: func(evt godex.StreamMessageEvent) {
if evt.Stage == godex.StreamItemStageCompleted {
log.Printf("assistant: %s", evt.Message.Text)
}
},
OnCommand: func(evt godex.StreamCommandEvent) {
log.Printf("command %s: %s", evt.Command.Status, evt.Command.Command)
},
OnPatch: func(evt godex.StreamPatchEvent) {
log.Printf("patch %s: %s", evt.Patch.ID, evt.Patch.Status)
},
OnFileChange: func(evt godex.StreamFileChangeEvent) {
log.Printf(" file %s (%s)", evt.Change.Path, evt.Change.Kind)
},
}

result, err := thread.RunStreamed(ctx, "Summarize the latest changes.", &godex.TurnOptions{
Callbacks: callbacks,
})
if err != nil {
log.Fatal(err)
}
defer result.Close()

for range result.Events() {
// Drain events; callbacks handled the typed work already.
}

if err := result.Wait(); err != nil {
log.Fatal(err)
}
```

See `examples/streaming_callbacks` for a complete runnable sample.

## Structured output

Pass a JSON schema in `TurnOptions.OutputSchema` and the SDK writes a temporary file for the CLI:
Expand Down
176 changes: 176 additions & 0 deletions callbacks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
package godex

// StreamItemStage indicates which phase of the lifecycle produced a callback.
type StreamItemStage string

const (
StreamItemStageStarted StreamItemStage = "started"
StreamItemStageUpdated StreamItemStage = "updated"
StreamItemStageCompleted StreamItemStage = "completed"
)

// StreamMessageEvent describes a callback payload for agent message items.
type StreamMessageEvent struct {
Stage StreamItemStage
Message AgentMessageItem
}

// StreamReasoningEvent describes a callback payload for reasoning items.
type StreamReasoningEvent struct {
Stage StreamItemStage
Reasoning ReasoningItem
}

// StreamCommandEvent describes a callback payload for command execution items.
type StreamCommandEvent struct {
Stage StreamItemStage
Command CommandExecutionItem
}

// StreamPatchEvent describes a callback payload for patch/file change items.
type StreamPatchEvent struct {
Stage StreamItemStage
Patch FileChangeItem
}

// StreamFileChangeEvent describes a callback payload for each file updated within a patch.
type StreamFileChangeEvent struct {
Stage StreamItemStage
Patch FileChangeItem
Change FileUpdateChange
}

// StreamWebSearchEvent describes a callback payload for web search items.
type StreamWebSearchEvent struct {
Stage StreamItemStage
Search WebSearchItem
}

// StreamToolCallEvent describes a callback payload for MCP tool call items.
type StreamToolCallEvent struct {
Stage StreamItemStage
ToolCall McpToolCallItem
}

// StreamTodoListEvent describes a callback payload for todo list items.
type StreamTodoListEvent struct {
Stage StreamItemStage
List TodoListItem
}

// StreamErrorItemEvent describes a callback payload for non-fatal error items.
type StreamErrorItemEvent struct {
Stage StreamItemStage
Error ErrorItem
}

// StreamCallbacks enumerates optional hooks invoked when streaming events are delivered.
type StreamCallbacks struct {
// OnEvent fires for every event before any type-specific callback.
OnEvent func(ThreadEvent)

OnThreadStarted func(ThreadStartedEvent)
OnTurnStarted func(TurnStartedEvent)
OnTurnCompleted func(TurnCompletedEvent)
OnTurnFailed func(TurnFailedEvent)
OnThreadError func(ThreadErrorEvent)

OnMessage func(StreamMessageEvent)
OnReasoning func(StreamReasoningEvent)
OnCommand func(StreamCommandEvent)
OnPatch func(StreamPatchEvent)
OnFileChange func(StreamFileChangeEvent)
OnWebSearch func(StreamWebSearchEvent)
OnToolCall func(StreamToolCallEvent)
OnTodoList func(StreamTodoListEvent)
OnErrorItem func(StreamErrorItemEvent)
}

func (c *StreamCallbacks) handle(event ThreadEvent) {
if c == nil {
return
}

if c.OnEvent != nil {
c.OnEvent(event)
}

switch e := event.(type) {
case ThreadStartedEvent:
if c.OnThreadStarted != nil {
c.OnThreadStarted(e)
}
case TurnStartedEvent:
if c.OnTurnStarted != nil {
c.OnTurnStarted(e)
}
case TurnCompletedEvent:
if c.OnTurnCompleted != nil {
c.OnTurnCompleted(e)
}
case TurnFailedEvent:
if c.OnTurnFailed != nil {
c.OnTurnFailed(e)
}
case ThreadErrorEvent:
if c.OnThreadError != nil {
c.OnThreadError(e)
}
case ItemStartedEvent:
c.handleItem(StreamItemStageStarted, e.Item)
case ItemUpdatedEvent:
c.handleItem(StreamItemStageUpdated, e.Item)
case ItemCompletedEvent:
c.handleItem(StreamItemStageCompleted, e.Item)
}
}

func (c *StreamCallbacks) handleItem(stage StreamItemStage, item ThreadItem) {
if c == nil || item == nil {
return
}

switch v := item.(type) {
case AgentMessageItem:
if c.OnMessage != nil {
c.OnMessage(StreamMessageEvent{Stage: stage, Message: v})
}
case ReasoningItem:
if c.OnReasoning != nil {
c.OnReasoning(StreamReasoningEvent{Stage: stage, Reasoning: v})
}
case CommandExecutionItem:
if c.OnCommand != nil {
c.OnCommand(StreamCommandEvent{Stage: stage, Command: v})
}
case FileChangeItem:
if c.OnPatch != nil {
c.OnPatch(StreamPatchEvent{Stage: stage, Patch: v})
}
if c.OnFileChange != nil {
for _, change := range v.Changes {
c.OnFileChange(StreamFileChangeEvent{
Stage: stage,
Patch: v,
Change: change,
})
}
}
case McpToolCallItem:
if c.OnToolCall != nil {
c.OnToolCall(StreamToolCallEvent{Stage: stage, ToolCall: v})
}
case WebSearchItem:
if c.OnWebSearch != nil {
c.OnWebSearch(StreamWebSearchEvent{Stage: stage, Search: v})
}
case TodoListItem:
if c.OnTodoList != nil {
c.OnTodoList(StreamTodoListEvent{Stage: stage, List: v})
}
case ErrorItem:
if c.OnErrorItem != nil {
c.OnErrorItem(StreamErrorItemEvent{Stage: stage, Error: v})
}
}
}
62 changes: 62 additions & 0 deletions examples/streaming_callbacks/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package main

import (
"context"
"fmt"
"log"

"github.com/activadee/godex"
)

func main() {
client, err := godex.New(godex.CodexOptions{})
if err != nil {
log.Fatalf("create codex client: %v", err)
}

thread := client.StartThread(godex.ThreadOptions{
Model: "gpt-5",
})

callbacks := &godex.StreamCallbacks{
OnMessage: func(evt godex.StreamMessageEvent) {
switch evt.Stage {
case godex.StreamItemStageUpdated:
fmt.Printf("[assistant partial] %s\n", evt.Message.Text)
case godex.StreamItemStageCompleted:
fmt.Printf("[assistant final] %s\n", evt.Message.Text)
}
},
OnCommand: func(evt godex.StreamCommandEvent) {
fmt.Printf("[command %s] %s\n", evt.Command.Status, evt.Command.Command)
},
OnPatch: func(evt godex.StreamPatchEvent) {
fmt.Printf("[patch %s] status=%s\n", evt.Patch.ID, evt.Patch.Status)
},
OnFileChange: func(evt godex.StreamFileChangeEvent) {
fmt.Printf(" file %s (%s)\n", evt.Change.Path, evt.Change.Kind)
},
OnWebSearch: func(evt godex.StreamWebSearchEvent) {
fmt.Printf("[web search] %s\n", evt.Search.Query)
},
OnThreadError: func(evt godex.ThreadErrorEvent) {
log.Printf("[stream error] %s", evt.Message)
},
}

result, err := thread.RunStreamed(context.Background(), "Summarize the latest SDK changes and list next steps.", &godex.TurnOptions{
Callbacks: callbacks,
})
if err != nil {
log.Fatalf("start streamed run: %v", err)
}
defer result.Close()

for range result.Events() {
// Drain events to honour backpressure; callbacks already handled rendering.
}

if err := result.Wait(); err != nil {
log.Fatalf("stream failed: %v", err)
}
}
2 changes: 2 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,6 @@ type TurnOptions struct {
// OutputSchema is an optional JSON schema describing the structured response to
// collect from the agent. Must serialize to a JSON object (not an array or primitive).
OutputSchema any
// Callbacks attaches optional streaming callbacks invoked as events arrive.
Callbacks *StreamCallbacks
}
6 changes: 6 additions & 0 deletions thread.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ func (t *Thread) runStreamed(ctx context.Context, baseInput string, segments []I
turnOpts = *turnOptions
}

callbacks := turnOpts.Callbacks

prepared, err := normalizeInput(baseInput, segments)
if err != nil {
return RunStreamedResult{}, err
Expand Down Expand Up @@ -152,6 +154,10 @@ func (t *Thread) runStreamed(ctx context.Context, baseInput string, segments []I
threadErr = &ThreadStreamError{ThreadError: ThreadError{Message: errEvent.Message}}
}

if callbacks != nil {
callbacks.handle(event)
}

select {
case events <- event:
return nil
Expand Down
Loading