|
47 | 47 | import io.reactivex.rxjava3.core.Maybe; |
48 | 48 | import io.reactivex.rxjava3.core.Single; |
49 | 49 | import io.reactivex.rxjava3.disposables.Disposable; |
| 50 | +import io.reactivex.rxjava3.functions.Function; |
50 | 51 | import java.util.ArrayList; |
51 | 52 | import java.util.Collections; |
52 | 53 | import java.util.HashMap; |
@@ -137,27 +138,28 @@ public static Maybe<Event> handleFunctionCalls( |
137 | 138 | Map<String, ToolConfirmation> toolConfirmations) { |
138 | 139 | ImmutableList<FunctionCall> functionCalls = functionCallEvent.functionCalls(); |
139 | 140 |
|
140 | | - List<Maybe<Event>> functionResponseEvents = new ArrayList<>(); |
141 | | - |
142 | 141 | for (FunctionCall functionCall : functionCalls) { |
143 | 142 | if (!tools.containsKey(functionCall.name().get())) { |
144 | 143 | throw new VerifyException("Tool not found: " + functionCall.name().get()); |
145 | 144 | } |
146 | | - BaseTool tool = tools.get(functionCall.name().get()); |
147 | | - ToolContext toolContext = |
148 | | - ToolContext.builder(invocationContext) |
149 | | - .functionCallId(functionCall.id().orElse("")) |
150 | | - .toolConfirmation(toolConfirmations.get(functionCall.id().orElse(null))) |
151 | | - .build(); |
| 145 | + } |
| 146 | + |
| 147 | + Function<FunctionCall, Maybe<Event>> functionCallMapper = |
| 148 | + functionCall -> { |
| 149 | + BaseTool tool = tools.get(functionCall.name().get()); |
| 150 | + ToolContext toolContext = |
| 151 | + ToolContext.builder(invocationContext) |
| 152 | + .functionCallId(functionCall.id().orElse("")) |
| 153 | + .toolConfirmation(toolConfirmations.get(functionCall.id().orElse(null))) |
| 154 | + .build(); |
152 | 155 |
|
153 | | - Map<String, Object> functionArgs = functionCall.args().orElse(ImmutableMap.of()); |
| 156 | + Map<String, Object> functionArgs = functionCall.args().orElse(ImmutableMap.of()); |
154 | 157 |
|
155 | | - Maybe<Map<String, Object>> maybeFunctionResult = |
156 | | - maybeInvokeBeforeToolCall(invocationContext, tool, functionArgs, toolContext) |
157 | | - .switchIfEmpty(Maybe.defer(() -> callTool(tool, functionArgs, toolContext))); |
| 158 | + Maybe<Map<String, Object>> maybeFunctionResult = |
| 159 | + maybeInvokeBeforeToolCall(invocationContext, tool, functionArgs, toolContext) |
| 160 | + .switchIfEmpty(Maybe.defer(() -> callTool(tool, functionArgs, toolContext))); |
158 | 161 |
|
159 | | - Maybe<Event> maybeFunctionResponseEvent = |
160 | | - maybeFunctionResult |
| 162 | + return maybeFunctionResult |
161 | 163 | .map(Optional::of) |
162 | 164 | .defaultIfEmpty(Optional.empty()) |
163 | 165 | .onErrorResumeNext( |
@@ -195,15 +197,15 @@ public static Maybe<Event> handleFunctionCalls( |
195 | 197 | return Maybe.just(functionResponseEvent); |
196 | 198 | }); |
197 | 199 | }); |
198 | | - |
199 | | - functionResponseEvents.add(maybeFunctionResponseEvent); |
200 | | - } |
| 200 | + }; |
201 | 201 |
|
202 | 202 | Flowable<Event> functionResponseEventsFlowable; |
203 | 203 | if (invocationContext.runConfig().toolExecutionMode() == ToolExecutionMode.SEQUENTIAL) { |
204 | | - functionResponseEventsFlowable = Maybe.concat(functionResponseEvents); |
| 204 | + functionResponseEventsFlowable = |
| 205 | + Flowable.fromIterable(functionCalls).concatMapMaybe(functionCallMapper); |
205 | 206 | } else { |
206 | | - functionResponseEventsFlowable = Maybe.merge(functionResponseEvents); |
| 207 | + functionResponseEventsFlowable = |
| 208 | + Flowable.fromIterable(functionCalls).flatMapMaybe(functionCallMapper); |
207 | 209 | } |
208 | 210 | return functionResponseEventsFlowable |
209 | 211 | .toList() |
@@ -240,29 +242,35 @@ public static Maybe<Event> handleFunctionCalls( |
240 | 242 | public static Maybe<Event> handleFunctionCallsLive( |
241 | 243 | InvocationContext invocationContext, Event functionCallEvent, Map<String, BaseTool> tools) { |
242 | 244 | ImmutableList<FunctionCall> functionCalls = functionCallEvent.functionCalls(); |
243 | | - List<Maybe<Event>> responseEvents = new ArrayList<>(); |
244 | 245 |
|
245 | 246 | for (FunctionCall functionCall : functionCalls) { |
246 | 247 | if (!tools.containsKey(functionCall.name().get())) { |
247 | 248 | throw new VerifyException("Tool not found: " + functionCall.name().get()); |
248 | 249 | } |
249 | | - BaseTool tool = tools.get(functionCall.name().get()); |
250 | | - ToolContext toolContext = |
251 | | - ToolContext.builder(invocationContext) |
252 | | - .functionCallId(functionCall.id().orElse("")) |
253 | | - .build(); |
254 | | - Map<String, Object> functionArgs = functionCall.args().orElse(new HashMap<>()); |
255 | | - |
256 | | - Maybe<Map<String, Object>> maybeFunctionResult = |
257 | | - maybeInvokeBeforeToolCall(invocationContext, tool, functionArgs, toolContext) |
258 | | - .switchIfEmpty( |
259 | | - Maybe.defer( |
260 | | - () -> |
261 | | - processFunctionLive( |
262 | | - invocationContext, tool, toolContext, functionCall, functionArgs))); |
263 | | - |
264 | | - Maybe<Event> maybeFunctionResponseEvent = |
265 | | - maybeFunctionResult |
| 250 | + } |
| 251 | + |
| 252 | + Function<FunctionCall, Maybe<Event>> functionCallMapper = |
| 253 | + functionCall -> { |
| 254 | + BaseTool tool = tools.get(functionCall.name().get()); |
| 255 | + ToolContext toolContext = |
| 256 | + ToolContext.builder(invocationContext) |
| 257 | + .functionCallId(functionCall.id().orElse("")) |
| 258 | + .build(); |
| 259 | + Map<String, Object> functionArgs = functionCall.args().orElse(new HashMap<>()); |
| 260 | + |
| 261 | + Maybe<Map<String, Object>> maybeFunctionResult = |
| 262 | + maybeInvokeBeforeToolCall(invocationContext, tool, functionArgs, toolContext) |
| 263 | + .switchIfEmpty( |
| 264 | + Maybe.defer( |
| 265 | + () -> |
| 266 | + processFunctionLive( |
| 267 | + invocationContext, |
| 268 | + tool, |
| 269 | + toolContext, |
| 270 | + functionCall, |
| 271 | + functionArgs))); |
| 272 | + |
| 273 | + return maybeFunctionResult |
266 | 274 | .map(Optional::of) |
267 | 275 | .defaultIfEmpty(Optional.empty()) |
268 | 276 | .onErrorResumeNext( |
@@ -300,15 +308,19 @@ public static Maybe<Event> handleFunctionCallsLive( |
300 | 308 | return Maybe.just(functionResponseEvent); |
301 | 309 | }); |
302 | 310 | }); |
303 | | - responseEvents.add(maybeFunctionResponseEvent); |
304 | | - } |
| 311 | + }; |
305 | 312 |
|
306 | 313 | Flowable<Event> responseEventsFlowable; |
| 314 | + |
307 | 315 | if (invocationContext.runConfig().toolExecutionMode() == ToolExecutionMode.SEQUENTIAL) { |
308 | | - responseEventsFlowable = Maybe.concat(responseEvents); |
| 316 | + responseEventsFlowable = |
| 317 | + Flowable.fromIterable(functionCalls).concatMapMaybe(functionCallMapper); |
| 318 | + |
309 | 319 | } else { |
310 | | - responseEventsFlowable = Maybe.merge(responseEvents); |
| 320 | + responseEventsFlowable = |
| 321 | + Flowable.fromIterable(functionCalls).flatMapMaybe(functionCallMapper); |
311 | 322 | } |
| 323 | + |
312 | 324 | return responseEventsFlowable |
313 | 325 | .toList() |
314 | 326 | .flatMapMaybe( |
|
0 commit comments