13
13
namespace Ripple ;
14
14
15
15
use Closure ;
16
- use Exception ;
17
16
use Revolt \EventLoop ;
18
17
use Ripple \Coroutine \Coroutine ;
19
18
use Ripple \Coroutine \Suspension ;
20
19
use Ripple \Stream \Exception \ConnectionCloseException ;
21
- use Ripple \Stream \Exception \ConnectionException ;
22
20
use Ripple \Stream \Exception \ConnectionTimeoutException ;
23
21
use Ripple \Stream \Stream as StreamBase ;
24
- use Ripple \Stream \Transaction ;
25
22
use Ripple \Utils \Format ;
26
23
use Ripple \Utils \Output ;
27
24
use Throwable ;
@@ -79,22 +76,12 @@ class Stream extends StreamBase
79
76
*/
80
77
protected int $ index = 0 ;
81
78
82
- /**
83
- * @var Transaction
84
- */
85
- protected Transaction $ transaction ;
86
-
87
79
/**
88
80
* @param mixed $resource
89
81
*/
90
82
public function __construct (mixed $ resource )
91
83
{
92
84
parent ::__construct ($ resource );
93
-
94
- $ this ->onClose (function () {
95
- $ this ->cancelReadable ();
96
- $ this ->cancelWriteable ();
97
- });
98
85
}
99
86
100
87
/**
@@ -140,57 +127,6 @@ public function setBlocking(bool $bool): bool
140
127
return stream_set_blocking ($ this ->stream , $ bool );
141
128
}
142
129
143
- /**
144
- * @param Closure $closure
145
- *
146
- * @return void
147
- * @throws Throwable
148
- */
149
- public function transaction (Closure $ closure ): void
150
- {
151
- if (isset ($ this ->transaction ) && $ this ->transaction ->getPromise ()->getStatus () === Promise::PENDING ) {
152
- throw new Exception ('Transaction has been completed ' );
153
- }
154
-
155
- $ this ->setTransaction (new Transaction ($ this ));
156
- call_user_func_array ($ closure , [$ this ->getTransaction ()]);
157
- }
158
-
159
- /**
160
- * @return Transaction|null
161
- */
162
- public function getTransaction (): Transaction |null
163
- {
164
- if (isset ($ this ->transaction )) {
165
- return $ this ->transaction ;
166
- }
167
- return null ;
168
- }
169
-
170
- /**
171
- * @param Transaction $transaction
172
- *
173
- * @return void
174
- */
175
- protected function setTransaction (Transaction $ transaction ): void
176
- {
177
- if (isset ($ this ->transaction )) {
178
- $ this ->completeTransaction ();
179
- unset($ this ->transaction );
180
- }
181
- $ this ->transaction = $ transaction ;
182
- }
183
-
184
- /**
185
- * @return void
186
- */
187
- public function completeTransaction (): void
188
- {
189
- if (isset ($ this ->transaction )) {
190
- $ this ->transaction ->complete ();
191
- }
192
- }
193
-
194
130
/**
195
131
* Wait for readable events. This method is only valid when there are no readable events to listen for.
196
132
* After enabling this method, it is forbidden to use the onReadable method elsewhere unless you know what you are doing.
@@ -208,7 +144,7 @@ public function waitForReadable(int $timeout = 0): bool
208
144
{
209
145
$ suspension = getSuspension ();
210
146
if (!isset ($ this ->onReadable )) {
211
- $ this ->onReadable (fn () => Coroutine::resume ($ suspension , true ));
147
+ $ this ->onReadable (static fn () => Coroutine::resume ($ suspension , true ));
212
148
$ suspension instanceof Suspension && $ suspension ->promise ->finally (fn () => $ this ->cancelReadable ());
213
149
}
214
150
@@ -268,15 +204,6 @@ public function close(): void
268
204
$ this ->cancelReadable ();
269
205
$ this ->cancelWriteable ();
270
206
271
- if (isset ($ this ->transaction )) {
272
- $ this ->failTransaction (new ConnectionException (
273
- 'Stream has been closed ' ,
274
- ConnectionException::CONNECTION_CLOSED ,
275
- null ,
276
- $ this
277
- ));
278
- }
279
-
280
207
foreach ($ this ->onCloseCallbacks as $ callback ) {
281
208
try {
282
209
call_user_func ($ callback );
@@ -294,18 +221,6 @@ public function isClosed(): bool
294
221
return !is_resource ($ this ->stream );
295
222
}
296
223
297
- /**
298
- * @param Throwable $exception
299
- *
300
- * @return void
301
- */
302
- public function failTransaction (Throwable $ exception ): void
303
- {
304
- if (isset ($ this ->transaction )) {
305
- $ this ->transaction ->fail ($ exception );
306
- }
307
- }
308
-
309
224
/**
310
225
* @param string $key
311
226
*
@@ -335,7 +250,7 @@ public function waitForWriteable(int $timeout = 0): bool
335
250
{
336
251
$ suspension = getSuspension ();
337
252
if (!isset ($ this ->onWriteable )) {
338
- $ this ->onWriteable (fn () => Coroutine::resume ($ suspension , true ));
253
+ $ this ->onWriteable (static fn () => Coroutine::resume ($ suspension , true ));
339
254
$ suspension instanceof Suspension && $ suspension ->promise ->finally (fn () => $ this ->cancelWriteable ());
340
255
}
341
256
0 commit comments