diff --git a/packages/headless/src/api/knowledge/stream-answer-api.ts b/packages/headless/src/api/knowledge/stream-answer-api.ts index 9c3f0bd76d..fe7ca19cbc 100644 --- a/packages/headless/src/api/knowledge/stream-answer-api.ts +++ b/packages/headless/src/api/knowledge/stream-answer-api.ts @@ -2,7 +2,13 @@ import { EventSourceMessage, fetchEventSource, } from '@microsoft/fetch-event-source'; -import {createSelector} from '@reduxjs/toolkit'; +import {createSelector, ThunkDispatch, UnknownAction} from '@reduxjs/toolkit'; +import { + setAnswerContentFormat, + updateCitations, + updateMessage, +} from '../../features/generated-answer/generated-answer-actions.js'; +import {logGeneratedAnswerStreamEnd} from '../../features/generated-answer/generated-answer-analytics-actions.js'; import {selectFieldsToIncludeInCitation} from '../../features/generated-answer/generated-answer-selectors.js'; import {GeneratedContentFormat} from '../../features/generated-answer/generated-response-format.js'; import {maximumNumberOfResultsFromIndex} from '../../features/pagination/pagination-constants.js'; @@ -118,7 +124,8 @@ const handleError = ( const updateCacheWithEvent = ( event: EventSourceMessage, - draft: GeneratedAnswerStream + draft: GeneratedAnswerStream, + dispatch: ThunkDispatch ) => { const message: Required = JSON.parse(event.data); if (message.finishReason === 'ERROR' && message.errorMessage) { @@ -133,21 +140,27 @@ const updateCacheWithEvent = ( case 'genqa.headerMessageType': if (parsedPayload.contentFormat) { handleHeaderMessage(draft, parsedPayload); + dispatch(setAnswerContentFormat(parsedPayload.contentFormat)); } break; case 'genqa.messageType': if (parsedPayload.textDelta) { handleMessage(draft, parsedPayload); + dispatch(updateMessage({textDelta: parsedPayload.textDelta})); } break; case 'genqa.citationsType': if (parsedPayload.citations) { handleCitations(draft, parsedPayload); + dispatch(updateCitations({citations: parsedPayload.citations})); } break; case 'genqa.endOfStreamType': if (draft.answer?.length || parsedPayload.answerGenerated) { handleEndOfStream(draft, parsedPayload); + dispatch( + logGeneratedAnswerStreamEnd(parsedPayload.answerGenerated ?? false) + ); } break; } @@ -170,7 +183,7 @@ export const answerApi = answerSlice.injectEndpoints({ }), async onCacheEntryAdded( args, - {getState, cacheDataLoaded, updateCachedData} + {getState, cacheDataLoaded, updateCachedData, dispatch} ) { await cacheDataLoaded; /** @@ -209,7 +222,7 @@ export const answerApi = answerSlice.injectEndpoints({ }, onmessage: (event) => { updateCachedData((draft) => { - updateCacheWithEvent(event, draft); + updateCacheWithEvent(event, draft, dispatch); }); }, onerror: (error) => {