Skip to content

Commit

Permalink
Merge pull request #78 from wliu6v/fix-sse
Browse files Browse the repository at this point in the history
fix #69: sse not work as intended
  • Loading branch information
redevrx authored Nov 22, 2023
2 parents aa842b7 + a17b4fd commit 62628a6
Showing 1 changed file with 14 additions and 28 deletions.
42 changes: 14 additions & 28 deletions lib/src/client/openai_client.dart
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:chat_gpt_sdk/src/client/openai_wrapper.dart';

import 'package:chat_gpt_sdk/src/client/exception/base_error_wrapper.dart';
import 'package:chat_gpt_sdk/src/client/exception/request_error.dart';
import 'package:chat_gpt_sdk/src/client/openai_wrapper.dart';
import 'package:chat_gpt_sdk/src/logger/logger.dart';
import 'package:chat_gpt_sdk/src/model/cancel/cancel_data.dart';
import 'package:chat_gpt_sdk/src/model/error/openai_error.dart';
import 'package:chat_gpt_sdk/src/utils/json_decode_string.dart';
import 'package:dio/dio.dart';

class OpenAIClient extends OpenAIWrapper {
Expand Down Expand Up @@ -248,7 +248,6 @@ class OpenAIClient extends OpenAIWrapper {
log.log("request body :$request");
final controller = StreamController<T>.broadcast();
final cancelData = CancelData(cancelToken: CancelToken());
final List<int> chunks = [];

try {
onCancel(cancelData);
Expand All @@ -263,42 +262,29 @@ class OpenAIClient extends OpenAIWrapper {
(it) {
it.data.stream.listen(
(it) {
chunks.addAll(it);
},
onDone: () {
final raw = utf8.decode(chunks);
final dataList = raw
final rawData = utf8.decode(it);
final dataList = rawData
.split("\n")
.where((element) => element.isNotEmpty)
.toList();

for (final data in dataList) {
if (data.startsWith("data: ")) {
///remove data:
final mData = data.substring(6);
if (mData.startsWith("[DONE]")) {
for (final line in dataList) {
if (line.startsWith("data: ")) {
final data = line.substring(6);
if (data.startsWith("[DONE]")) {
log.log("stream response is done");
controller.done;
controller.close();

return;
}

final jsonMap = mData.decode();
if (jsonMap.keys.last) {
///decode data
controller
..sink
..add(complete(jsonMap[jsonMap.keys.last]));
} else {
log.log("stream response invalid try regenerate");
log.log("last json error :$mData");

controller.close();
}
controller
..sink
..add(complete(json.decode(data)));
}
}
},
onDone: () {
controller.close();
},
onError: (err, t) {
log.error(err, t);
if (err is DioException) {
Expand Down

0 comments on commit 62628a6

Please sign in to comment.