34
34
35
35
namespace Ripple ;
36
36
37
- use Co \IO ;
38
37
use Exception ;
39
38
use Ripple \Channel \Exception \ChannelException ;
40
39
use Ripple \File \Lock \Lock ;
41
40
use Ripple \Utils \Serialization \Zx7e ;
41
+ use Ripple \Utils \Utils ;
42
+ use Throwable ;
42
43
43
44
use function chr ;
44
45
use function Co \cancelForked ;
45
46
use function Co \forked ;
46
47
use function file_exists ;
47
48
use function fopen ;
48
- use function md5 ;
49
49
use function posix_mkfifo ;
50
50
use function serialize ;
51
- use function sys_get_temp_dir ;
52
51
use function touch ;
53
52
use function unlink ;
54
53
use function unpack ;
60
59
*/
61
60
class Channel
62
61
{
63
- private const FRAME_HEADER = 0x7E ;
62
+ protected const FRAME_HEADER = 0x7E ;
64
63
65
- private const FRAME_FOOTER = 0x7E ;
64
+ protected const FRAME_FOOTER = 0x7E ;
66
65
67
66
/*** @var Zx7e */
68
- private Zx7e $ zx7e ;
69
-
70
- /*** @var bool */
71
- private bool $ blocking = true ;
67
+ protected Zx7e $ zx7e ;
72
68
73
69
/*** @var string */
74
- private string $ forkHandlerID ;
70
+ protected string $ forkHandlerID ;
75
71
76
72
/*** @var Stream */
77
- private Stream $ stream ;
73
+ protected Stream $ stream ;
78
74
79
75
/*** @var Lock */
80
- private Lock $ readLock ;
76
+ protected Lock $ readLock ;
81
77
82
78
/*** @var Lock */
83
- private Lock $ writeLock ;
79
+ protected Lock $ writeLock ;
84
80
85
81
/*** @var string */
86
- private string $ path ;
82
+ protected string $ path ;
87
83
88
84
/*** @var bool */
89
- private bool $ closed = false ;
85
+ protected bool $ closed = false ;
90
86
91
87
/**
92
88
* @param string $name
93
89
* @param bool $owner
94
- *
95
- * @throws ChannelException
96
90
*/
97
- public function __construct (private readonly string $ name , private bool $ owner = false )
98
- {
99
- $ this ->path = Channel::generateFilePathByChannelName ($ name );
100
- $ this ->readLock = IO ::Lock ()->access ("{$ this ->name }.read " );
101
- $ this ->writeLock = IO ::Lock ()->access ("{$ this ->name }.write " );
91
+ public function __construct (
92
+ protected readonly string $ name ,
93
+ protected bool $ owner = false
94
+ ) {
95
+ $ this ->path = Utils::tempPath ($ this ->name , 'channel ' );
96
+ $ this ->readLock = \Co \lock ("{$ this ->name }.read " );
97
+ $ this ->writeLock = \Co \lock ("{$ this ->name }.write " );
102
98
103
99
if (!file_exists ($ this ->path )) {
104
100
if (!$ this ->owner ) {
@@ -113,83 +109,31 @@ public function __construct(private readonly string $name, private bool $owner =
113
109
}
114
110
}
115
111
116
- $ this ->stream = new Stream (fopen ($ this ->path , 'r+ ' ));
117
- $ this ->zx7e = new Zx7e ();
112
+ $ this ->openStream ();
118
113
119
114
// Re-open the stream resource after registering the process fork
120
115
$ this ->forkHandlerID = forked (function () {
121
116
$ this ->owner = false ;
122
117
$ this ->stream ->close ();
123
118
124
- $ this ->stream = new Stream (fopen ($ this ->path , 'r+ ' ));
125
- $ this ->zx7e = new Zx7e ();
119
+ $ this ->openStream ();
126
120
});
127
121
}
128
122
129
- /**
130
- * @param string $name
131
- *
132
- * @return string
133
- */
134
- private static function generateFilePathByChannelName (string $ name ): string
135
- {
136
- $ name = md5 ($ name );
137
- return sys_get_temp_dir () . '/ ' . $ name . '.channel ' ;
138
- }
139
-
140
- /*** @return void */
141
- public function close (): void
142
- {
143
- if ($ this ->closed ) {
144
- return ;
145
- }
146
-
147
- $ this ->stream ->close ();
148
- $ this ->readLock ->close ();
149
- $ this ->writeLock ->close ();
150
-
151
- if ($ this ->owner ) {
152
- file_exists ($ this ->path ) && unlink ($ this ->path );
153
- }
154
-
155
- $ this ->closed = true ;
156
-
157
- cancelForked ($ this ->forkHandlerID );
158
- }
159
-
160
123
/**
161
124
* @param string $name
162
125
*
163
126
* @return Channel
164
- * @throws ChannelException
165
127
*/
166
128
public static function make (string $ name ): Channel
167
129
{
168
- return new self ($ name , true );
169
- }
170
-
171
- /**
172
- * @param string $name
173
- *
174
- * @return Channel
175
- * @throws ChannelException
176
- */
177
- public static function open (string $ name ): Channel
178
- {
179
- $ path = Channel::generateFilePathByChannelName ($ name );
180
-
181
- if (!file_exists ($ path )) {
182
- throw new ChannelException ('Channel does not exist. ' );
183
- }
184
-
185
- return new self ($ name );
130
+ return new Channel ($ name , true );
186
131
}
187
132
188
133
/**
189
134
* @param mixed $data
190
135
*
191
136
* @return bool
192
- * @throws ChannelException
193
137
*/
194
138
public function send (mixed $ data ): bool
195
139
{
@@ -198,7 +142,6 @@ public function send(mixed $data): bool
198
142
}
199
143
200
144
$ this ->writeLock ->lock ();
201
- $ this ->stream ->setBlocking (true );
202
145
203
146
try {
204
147
$ this ->stream ->write ($ this ->zx7e ->encodeFrame (serialize ($ data )));
@@ -211,61 +154,60 @@ public function send(mixed $data): bool
211
154
}
212
155
213
156
/**
214
- * @param bool $blocking
215
- *
216
157
* @return void
217
158
*/
218
- public function setBlocking ( bool $ blocking ): void
159
+ protected function openStream ( ): void
219
160
{
220
- $ this ->blocking = $ blocking ;
161
+ $ this ->stream = new Stream (fopen ($ this ->path , 'r+ ' ));
162
+ $ this ->stream ->setBlocking (false );
163
+ $ this ->zx7e = new Zx7e ();
221
164
}
222
165
223
166
/**
167
+ * @param bool $blocking
168
+ *
224
169
* @return mixed
225
- * @throws ChannelException
226
170
*/
227
- public function receive (): mixed
171
+ public function receive (bool $ blocking = true ): mixed
228
172
{
229
173
if (!file_exists ($ this ->path )) {
230
174
throw new ChannelException ('Unable to receive data from a closed channel ' );
231
175
}
232
176
233
- try {
234
- if ($ this ->blocking ) {
235
- $ this ->readLock ->lock ();
236
- $ this ->stream ->setBlocking (true );
237
- } else {
238
- if (!$ this ->readLock ->lock (false )) {
239
- throw new Exception ('Failed to acquire lock. ' );
240
- }
241
- $ this ->stream ->setBlocking (false );
177
+ while (1 ) {
178
+ try {
179
+ $ blocking && $ this ->stream ->waitForReadable ();
180
+ } catch (Throwable $ e ) {
181
+ throw new ChannelException ($ e ->getMessage ());
242
182
}
243
183
244
- $ header = $ this ->stream ->read (1 );
245
-
246
- if ($ header !== chr (Channel::FRAME_HEADER )) {
247
- $ this ->readLock ->unlock ();
248
- return null ;
184
+ if ($ this ->readLock ->lock (blocking: false )) {
185
+ try {
186
+ if ($ this ->stream ->read (1 ) === chr (Channel::FRAME_HEADER )) {
187
+ break ;
188
+ } else {
189
+ throw new Stream \Exception \ConnectionException ('Failed to read frame header. ' );
190
+ }
191
+ } catch (Stream \Exception \ConnectionException ) {
192
+ $ this ->readLock ->unlock ();
193
+ }
249
194
}
195
+ }
250
196
251
- $ this ->stream ->setBlocking (true );
252
-
197
+ try {
253
198
$ length = $ this ->stream ->read (2 );
254
199
$ data = $ this ->stream ->read (unpack ('n ' , $ length )[1 ]);
255
200
$ checksum = $ this ->stream ->read (1 );
256
201
$ footer = $ this ->stream ->read (1 );
257
202
258
203
if ($ footer !== chr (Channel::FRAME_FOOTER )) {
259
- $ this ->readLock ->unlock ();
260
204
throw new Exception ('Failed to read frame footer. ' );
261
205
}
262
206
263
207
if ($ checksum !== chr ($ this ->zx7e ->calculateChecksum ($ data ))) {
264
- $ this ->readLock ->unlock ();
265
208
throw new Exception ('Failed to verify checksum. ' );
266
209
}
267
210
268
- $ this ->readLock ->unlock ();
269
211
return unserialize ($ data );
270
212
} catch (Exception $ e ) {
271
213
throw new ChannelException ($ e ->getMessage ());
@@ -286,6 +228,25 @@ public function getPath(): string
286
228
return $ this ->path ;
287
229
}
288
230
231
+ /*** @return void */
232
+ public function close (): void
233
+ {
234
+ if ($ this ->closed ) {
235
+ return ;
236
+ }
237
+
238
+ $ this ->stream ->close ();
239
+ $ this ->readLock ->close ();
240
+ $ this ->writeLock ->close ();
241
+
242
+ if ($ this ->owner ) {
243
+ file_exists ($ this ->path ) && unlink ($ this ->path );
244
+ }
245
+
246
+ $ this ->closed = true ;
247
+ cancelForked ($ this ->forkHandlerID );
248
+ }
249
+
289
250
public function __destruct ()
290
251
{
291
252
$ this ->close ();
0 commit comments