2
2
// SPDX-License-Identifier: Apache-2.0
3
3
4
4
import { Server , createServer as netCreateServer } from 'node:net' ;
5
+ import { setTimeout } from 'node:timers/promises' ;
6
+
7
+ import lodash from 'lodash' ;
5
8
6
9
import { EventPayload , EventSource , LinkedEvent , PromiseUtil } from '@this/async' ;
7
- import { FormatUtils } from '@this/loggy' ;
10
+ import { FormatUtils , IntfLogger } from '@this/loggy' ;
8
11
import { MustBe } from '@this/typey' ;
9
12
10
13
@@ -13,6 +16,9 @@ import { MustBe } from '@this/typey';
13
16
* in a way that is `async`-friendly.
14
17
*/
15
18
export class AsyncServer {
19
+ /** @type {?IntfLogger } Logger to use, or `null` to not do any logging. */
20
+ #logger;
21
+
16
22
/** @type {object } Parsed server socket `interface` specification. */
17
23
#interface;
18
24
@@ -39,11 +45,13 @@ export class AsyncServer {
39
45
*
40
46
* @param {object } iface Parsed server socket `interface` specification.
41
47
* @param {string } protocol The protocol name; just used for logging.
48
+ * @param {?IntfLogger } logger Logger to use, if any.
42
49
*/
43
- constructor ( iface , protocol ) {
50
+ constructor ( iface , protocol , logger ) {
44
51
// Note: `interface` is a reserved word.
45
52
this . #interface = MustBe . plainObject ( iface ) ;
46
53
this . #protocol = MustBe . string ( protocol ) ;
54
+ this . #logger = logger ;
47
55
}
48
56
49
57
/**
@@ -110,8 +118,33 @@ export class AsyncServer {
110
118
async start ( isReload ) {
111
119
MustBe . boolean ( isReload ) ;
112
120
113
- this . #serverSocket = netCreateServer (
114
- AsyncServer . #extractConstructorOptions( this . #interface) ) ;
121
+ // In case of a reload, look for a stashed instance which is already set up
122
+ // the same way.
123
+ const found = isReload
124
+ ? AsyncServer . #unstashInstance( this . #interface)
125
+ : null ;
126
+
127
+ if ( found ) {
128
+ // Inherit the "guts" of the now-unstashed instance.
129
+ this . #serverSocket = found . #serverSocket;
130
+ this . #serverSocket. removeAllListeners ( ) ;
131
+
132
+ // Transfer any unhandled events to the new instance.
133
+ found . #eventSource. emit ( new EventPayload ( 'done' ) ) ;
134
+ let eventHead = await found . #eventHead;
135
+ while ( eventHead ) {
136
+ if ( eventHead . type === 'done' ) {
137
+ break ;
138
+ }
139
+ this . emit ( eventHead . payload ) ;
140
+ eventHead = eventHead . nextNow ;
141
+ }
142
+ } else {
143
+ // Either this isn't a reload, or it's a reload with an endpoint that
144
+ // isn't configured the same way as one of the pre-reload ones.
145
+ this . #serverSocket = netCreateServer (
146
+ AsyncServer . #extractConstructorOptions( this . #interface) ) ;
147
+ }
115
148
116
149
this . #serverSocket. on ( 'connection' , ( ...args ) => {
117
150
this . #eventSource. emit ( new EventPayload ( 'connection' , ...args ) ) ;
@@ -133,7 +166,11 @@ export class AsyncServer {
133
166
async stop ( willReload ) {
134
167
MustBe . boolean ( willReload ) ;
135
168
136
- await this . #close( ) ;
169
+ if ( willReload ) {
170
+ AsyncServer . #stashInstance( this ) ;
171
+ } else {
172
+ await this . #close( ) ;
173
+ }
137
174
}
138
175
139
176
/**
@@ -177,15 +214,35 @@ export class AsyncServer {
177
214
serverSocket . on ( 'error' , handleError ) ;
178
215
} ) ;
179
216
}
217
+
218
+ // Close any sockets that happened to have been accepted in this class but
219
+ // which weren't then passed on to a client.
220
+ // Transfer any unhandled events to the new instance.
221
+ this . #eventSource. emit ( new EventPayload ( 'done' ) ) ;
222
+ let eventHead = await this . #eventHead;
223
+ while ( eventHead ) {
224
+ if ( eventHead . type === 'done' ) {
225
+ break ;
226
+ } else if ( eventHead . type === 'connection' ) {
227
+ const socket = eventHead . args [ 0 ] ;
228
+ socket . destroy ( ) ;
229
+ }
230
+ eventHead = eventHead . nextNow ;
231
+ }
232
+
180
233
}
181
234
182
235
/**
183
- * Performs a `listen()` on the underlying {@link Server}. This method
184
- * async-returns once the server is actually listening.
236
+ * Performs a `listen()` on the underlying {@link Server}, if not already
237
+ * done. This method async-returns once the server is actually listening.
185
238
*/
186
239
async #listen( ) {
187
240
const serverSocket = this . #serverSocket;
188
241
242
+ if ( serverSocket . listening ) {
243
+ return ;
244
+ }
245
+
189
246
// This `await new Promise` arrangement is done to get the `listen()` call
190
247
// to be a good async citizen. Notably, the optional callback passed to
191
248
// `listen()` is only ever sent a single `listening` event upon success and
@@ -217,7 +274,6 @@ export class AsyncServer {
217
274
} ) ;
218
275
}
219
276
220
-
221
277
//
222
278
// Static members
223
279
//
@@ -246,6 +302,18 @@ export class AsyncServer {
246
302
port : null
247
303
} ) ;
248
304
305
+ /**
306
+ * @type {number } How long in msec to allow a stashed instance to remain
307
+ * stashed.
308
+ */
309
+ static #STASH_TIMEOUT_MSEC = 5 * 1000 ;
310
+
311
+ /**
312
+ * @type {Set<AsyncServer> } Set of stashed instances, for use during a reload.
313
+ * Such instances were left open and listening during a previous `stop()`.
314
+ */
315
+ static #stashedInstances = new Set ( ) ;
316
+
249
317
/**
250
318
* Gets the options for a `Server` constructor(ish) call, given the full
251
319
* server socket `interface` options.
@@ -295,4 +363,50 @@ export class AsyncServer {
295
363
296
364
return result ;
297
365
}
366
+
367
+ /**
368
+ * Stashes an instance for possible reuse during a reload.
369
+ *
370
+ * @param {AsyncServer } instance The instance to stash.
371
+ */
372
+ static #stashInstance( instance ) {
373
+ // Remove any pre-existing matching instance. This shouldn't happen in the
374
+ // first place, but if it does this will minimize the downstream confusion.
375
+ this . #unstashInstance( instance . #interface) ;
376
+
377
+ this . #stashedInstances. add ( instance ) ;
378
+ instance . #logger?. stashed ( ) ;
379
+
380
+ ( async ( ) => {
381
+ await setTimeout ( this . #STASH_TIMEOUT_MSEC) ;
382
+ if ( this . #stashedInstances. delete ( instance ) ) {
383
+ instance . #logger?. stashTimeout ( ) ;
384
+ await instance . #close( ) ;
385
+ }
386
+ } ) ( ) ;
387
+ }
388
+
389
+ /**
390
+ * Finds a matching instance of this class, if any, which was stashed away
391
+ * during a reload. If found, it is removed from the stash.
392
+ *
393
+ * @param {object } iface The interface specification for the instance.
394
+ * @returns {?AsyncServer } The found instance, if any.
395
+ */
396
+ static #unstashInstance( iface ) {
397
+ let found = null ;
398
+ for ( const si of this . #stashedInstances) {
399
+ if ( lodash . isEqual ( iface , si . #interface) ) {
400
+ found = si ;
401
+ break ;
402
+ }
403
+ }
404
+
405
+ if ( found ) {
406
+ this . #stashedInstances. delete ( found ) ;
407
+ found . #logger?. unstashed ( ) ;
408
+ }
409
+
410
+ return found ;
411
+ }
298
412
}
0 commit comments