@@ -156,36 +156,9 @@ protected Flowable<Event> postprocess(
156156 }
157157
158158 return currentLlmResponse .flatMapPublisher (
159- updatedResponse -> {
160- Flowable <Event > processorEvents = Flowable .fromIterable (Iterables .concat (eventIterables ));
161-
162- if (updatedResponse .content ().isEmpty ()
163- && updatedResponse .errorCode ().isEmpty ()
164- && !updatedResponse .interrupted ().orElse (false )
165- && !updatedResponse .turnComplete ().orElse (false )) {
166- return processorEvents ;
167- }
168-
169- Event modelResponseEvent =
170- buildModelResponseEvent (baseEventForLlmResponse , llmRequest , updatedResponse );
171-
172- Flowable <Event > modelEventStream = Flowable .just (modelResponseEvent );
173-
174- if (modelResponseEvent .functionCalls ().isEmpty ()) {
175- return processorEvents .concatWith (modelEventStream );
176- }
177-
178- Maybe <Event > maybeFunctionCallEvent ;
179- if (context .runConfig ().streamingMode () == StreamingMode .BIDI ) {
180- maybeFunctionCallEvent =
181- Functions .handleFunctionCallsLive (context , modelResponseEvent , llmRequest .tools ());
182- } else {
183- maybeFunctionCallEvent =
184- Functions .handleFunctionCalls (context , modelResponseEvent , llmRequest .tools ());
185- }
186-
187- return processorEvents .concatWith (modelEventStream ).concatWith (maybeFunctionCallEvent );
188- });
159+ updatedResponse ->
160+ buildPostprocessingEvents (
161+ updatedResponse , eventIterables , context , baseEventForLlmResponse , llmRequest ));
189162 }
190163
191164 /**
@@ -623,6 +596,45 @@ public void onError(Throwable e) {
623596 *
624597 * @return A fully constructed {@link Event} representing the LLM response.
625598 */
599+ private Flowable <Event > buildPostprocessingEvents (
600+ LlmResponse updatedResponse ,
601+ List <Iterable <Event >> eventIterables ,
602+ InvocationContext context ,
603+ Event baseEventForLlmResponse ,
604+ LlmRequest llmRequest ) {
605+ Flowable <Event > processorEvents = Flowable .fromIterable (Iterables .concat (eventIterables ));
606+ if (updatedResponse .content ().isEmpty ()
607+ && updatedResponse .errorCode ().isEmpty ()
608+ && !updatedResponse .interrupted ().orElse (false )
609+ && !updatedResponse .turnComplete ().orElse (false )) {
610+ return processorEvents ;
611+ }
612+
613+ Event modelResponseEvent =
614+ buildModelResponseEvent (baseEventForLlmResponse , llmRequest , updatedResponse );
615+ if (modelResponseEvent .functionCalls ().isEmpty ()) {
616+ return processorEvents .concatWith (Flowable .just (modelResponseEvent ));
617+ }
618+
619+ Maybe <Event > maybeFunctionResponseEvent =
620+ context .runConfig ().streamingMode () == StreamingMode .BIDI
621+ ? Functions .handleFunctionCallsLive (context , modelResponseEvent , llmRequest .tools ())
622+ : Functions .handleFunctionCalls (context , modelResponseEvent , llmRequest .tools ());
623+
624+ Flowable <Event > functionEvents =
625+ maybeFunctionResponseEvent .flatMapPublisher (
626+ functionResponseEvent -> {
627+ Optional <Event > toolConfirmationEvent =
628+ Functions .generateRequestConfirmationEvent (
629+ context , modelResponseEvent , functionResponseEvent );
630+ return toolConfirmationEvent .isPresent ()
631+ ? Flowable .just (toolConfirmationEvent .get (), functionResponseEvent )
632+ : Flowable .just (functionResponseEvent );
633+ });
634+
635+ return processorEvents .concatWith (Flowable .just (modelResponseEvent )).concatWith (functionEvents );
636+ }
637+
626638 private Event buildModelResponseEvent (
627639 Event baseEventForLlmResponse , LlmRequest llmRequest , LlmResponse llmResponse ) {
628640 Event .Builder eventBuilder =
@@ -640,10 +652,13 @@ private Event buildModelResponseEvent(
640652
641653 Event event = eventBuilder .build ();
642654
655+ logger .info ("event: {} functionCalls: {}" , event , event .functionCalls ());
656+
643657 if (!event .functionCalls ().isEmpty ()) {
644658 Functions .populateClientFunctionCallId (event );
645659 Set <String > longRunningToolIds =
646660 Functions .getLongRunningFunctionCalls (event .functionCalls (), llmRequest .tools ());
661+ logger .info ("longRunningToolIds: {}" , longRunningToolIds );
647662 if (!longRunningToolIds .isEmpty ()) {
648663 event .setLongRunningToolIds (Optional .of (longRunningToolIds ));
649664 }
0 commit comments