Skip to content

Commit 4a7d063

Browse files
committed
sse支持
1 parent a8ae4ee commit 4a7d063

File tree

5 files changed

+97
-5
lines changed

5 files changed

+97
-5
lines changed

src/concerns/InteractsWithHttp.php

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,18 @@
22

33
namespace think\worker\concerns;
44

5+
use Stringable;
56
use think\App;
67
use think\Cookie;
78
use think\Event;
89
use think\exception\Handle;
910
use think\helper\Str;
1011
use think\Http;
1112
use think\response\View;
12-
use think\worker\response\File as FileResponse;
1313
use think\worker\App as WorkerApp;
1414
use think\worker\Http as WorkerHttp;
15+
use think\worker\response\File as FileResponse;
16+
use think\worker\response\Iterator as IteratorResponse;
1517
use Throwable;
1618
use Workerman\Connection\TcpConnection;
1719
use Workerman\Protocols\Http\Chunk;
@@ -105,6 +107,9 @@ public function onRequest(TcpConnection $connection, WorkerRequest $wkRequest)
105107

106108
$this->sendResponse($connection, $request, $response, $app->cookie);
107109

110+
//关闭连接
111+
$connection->close();
112+
108113
$http->end($response);
109114
});
110115
}
@@ -169,6 +174,9 @@ protected function prepareResponse(\think\Response $response)
169174
protected function sendResponse(TcpConnection $connection, \think\Request $request, \think\Response $response, Cookie $cookie)
170175
{
171176
switch (true) {
177+
case $response instanceof IteratorResponse:
178+
$this->sendIterator($connection, $response, $cookie);
179+
break;
172180
case $response instanceof FileResponse:
173181
$this->sendFile($connection, $request, $response, $cookie);
174182
break;
@@ -177,6 +185,26 @@ protected function sendResponse(TcpConnection $connection, \think\Request $reque
177185
}
178186
}
179187

188+
protected function sendIterator(TcpConnection $connection, IteratorResponse $response, Cookie $cookie)
189+
{
190+
$wkResponse = $this->createResponse($response, $cookie);
191+
$connection->send($wkResponse);
192+
193+
foreach ($response as $content) {
194+
$chunk = new class($content) implements Stringable {
195+
public function __construct(protected $content)
196+
{
197+
}
198+
199+
public function __toString(): string
200+
{
201+
return $this->content;
202+
}
203+
};
204+
$connection->send($chunk);
205+
}
206+
}
207+
180208
protected function sendFile(TcpConnection $connection, \think\Request $request, FileResponse $response, Cookie $cookie)
181209
{
182210
$ifNoneMatch = $request->header('If-None-Match');
@@ -263,12 +291,12 @@ protected function sendContent(TcpConnection $connection, \think\Response $respo
263291
$connection->send(new Chunk(''));
264292
}
265293

266-
protected function createResponse(\think\Response $response, Cookie $cookie)
294+
protected function createResponse(\think\Response $response, Cookie $cookie, $body = '')
267295
{
268296
$code = $response->getCode();
269297
$header = $response->getHeader();
270298

271-
$wkResponse = new Response($code, $header);
299+
$wkResponse = new Response($code, $header, $body);
272300

273301
foreach ($cookie->getCookie() as $name => $val) {
274302
[$value, $expire, $option] = $val;

src/response/Iterator.php

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<?php
2+
3+
namespace think\worker\response;
4+
5+
use IteratorAggregate;
6+
use think\Response;
7+
use Traversable;
8+
9+
class Iterator extends Response implements IteratorAggregate
10+
{
11+
protected $iterator;
12+
13+
public function __construct(Traversable $iterator)
14+
{
15+
$this->iterator = $iterator;
16+
}
17+
18+
public function getIterator(): Traversable
19+
{
20+
return $this->iterator;
21+
}
22+
}

tests/feature/HttpTest.php

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
while (!$process->getOutput()) {
1414
$wait++;
15-
if ($wait > 10) {
15+
if ($wait > 30) {
1616
throw new Exception('server start failed');
1717
}
1818
sleep(1);
@@ -95,6 +95,30 @@
9595
->toBe(file_get_contents(STUB_DIR . '/public/asset.txt'));
9696
});
9797

98+
it('sse', function () {
99+
$response = $this->httpClient->get('/sse', [
100+
'stream' => true,
101+
'timeout' => 3,
102+
]);
103+
104+
$body = $response->getBody();
105+
106+
$buffer = '';
107+
while (!$body->eof()) {
108+
$text = $body->read(1);
109+
if ($text == "\r") {
110+
continue;
111+
}
112+
$buffer .= $text;
113+
if ($text == "\n") {
114+
if ($buffer != "\n") {
115+
expect($buffer)->toStartWith('data: ');
116+
}
117+
$buffer = '';
118+
}
119+
}
120+
});
121+
98122
it('hot update', function () {
99123
$response = $this->httpClient->get('/hot');
100124

tests/stub/config/worker.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
'enable' => env('HTTP_ENABLE', true),
1515
'host' => '0.0.0.0',
1616
'port' => 8080,
17-
'worker_num' => 2,
17+
'worker_num' => 1,
1818
'options' => [],
1919
],
2020
//队列

tests/stub/route/app.php

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,24 @@
1414
return 'delete';
1515
});
1616

17+
Route::get('/sse', function () {
18+
19+
$generator = function () {
20+
foreach (range(0, 9) as $event) {
21+
yield 'data: ' . json_encode($event) . "\n\n";
22+
}
23+
24+
yield "data: [DONE]\n\n";
25+
};
26+
27+
$response = new \think\worker\response\Iterator($generator());
28+
29+
return $response->header([
30+
'Content-Type' => 'text/event-stream',
31+
'Cache-Control' => 'no-cache, must-revalidate',
32+
]);
33+
});
34+
1735
Route::get('test', 'index/test');
1836
Route::post('json', 'index/json');
1937

0 commit comments

Comments
 (0)