@@ -23,6 +23,7 @@ import { TypedEmitter, generateSTHKey, normalizeUrl } from "@scramjet/utility";
2323import { ObjLogger } from "@scramjet/obj-logger" ;
2424import { ReasonPhrases } from "http-status-codes" ;
2525import { DuplexStream } from "@scramjet/api-server" ;
26+ import { VerserClientConnection } from "@scramjet/verser/src/types" ;
2627
2728type STHInformation = {
2829 id ?: string ;
@@ -223,7 +224,7 @@ export class CPMConnector extends TypedEmitter<Events> {
223224 * @returns {string } Host id.
224225 */
225226 getId ( ) : string | undefined {
226- return this . info . id ;
227+ return this . config . id ;
227228 }
228229
229230 /**
@@ -249,6 +250,8 @@ export class CPMConnector extends TypedEmitter<Events> {
249250 } ;
250251 }
251252
253+ await this . setLoadCheckMessageSender ( ) ;
254+
252255 StringStream . from ( duplex . input as Readable )
253256 . JSONParse ( )
254257 . map ( async ( message : EncodedControlMessage ) => {
@@ -292,9 +295,10 @@ export class CPMConnector extends TypedEmitter<Events> {
292295 [ CPMMessageCode . NETWORK_INFO , await this . getNetworkInfo ( ) ]
293296 ) ;
294297
295- this . emit ( "connect" ) ;
296298
297- await this . setLoadCheckMessageSender ( ) ;
299+
300+
301+ this . emit ( "connect" ) ;
298302
299303 return new Promise ( ( resolve , reject ) => {
300304 duplex . on ( "end" , ( ) => {
@@ -332,11 +336,17 @@ export class CPMConnector extends TypedEmitter<Events> {
332336 this . verserClient . updateHeaders ( { "x-sth-id" : this . info . id } ) ;
333337 }
334338
335- let connection ;
339+ let connection : VerserClientConnection ;
336340
337341 try {
338342 this . logger . trace ( "Connecting to Manager" , this . cpmUrl , this . cpmId ) ;
339343 connection = await this . verserClient . connect ( ) ;
344+
345+ connection . socket
346+ . once ( "close" , async ( ) => {
347+ this . logger . warn ( "CLOSE STATUS" , connection . res . statusCode )
348+ await this . handleConnectionClose ( connection . res . statusCode || - 1 ) ;
349+ } ) ;
340350 } catch ( error : any ) {
341351 this . logger . error ( "Can not connect to Manager" , this . cpmUrl , this . cpmId , error . message ) ;
342352
@@ -345,12 +355,7 @@ export class CPMConnector extends TypedEmitter<Events> {
345355 return ;
346356 }
347357
348- this . logger . info ( "Connected to Manager" ) ;
349-
350- connection . socket
351- . once ( "close" , async ( ) => {
352- await this . handleConnectionClose ( ) ;
353- } ) ;
358+ this . logger . info ( "Connected..." ) ;
354359
355360 /**
356361 * @TODO : Distinguish existing `connect` request and started communication (Manager handled this host
@@ -361,7 +366,7 @@ export class CPMConnector extends TypedEmitter<Events> {
361366 this . connected = true ;
362367 this . connectionAttempts = 0 ;
363368
364- connection . req . once ( "error" , async ( error : any ) => {
369+ connection . res . once ( "error" , async ( error : any ) => {
365370 this . logger . error ( "Request error" , error ) ;
366371
367372 try {
@@ -386,7 +391,7 @@ export class CPMConnector extends TypedEmitter<Events> {
386391 * Handles connection close.
387392 * Tries to reconnect.
388393 */
389- async handleConnectionClose ( ) {
394+ async handleConnectionClose ( connectionStatusCode : number ) {
390395 this . handleCommunicationRequestEnd ( ) ;
391396
392397 this . connection ?. removeAllListeners ( ) ;
@@ -400,6 +405,10 @@ export class CPMConnector extends TypedEmitter<Events> {
400405 clearInterval ( this . loadInterval ) ;
401406 }
402407
408+ if ( connectionStatusCode === 403 ) {
409+ this . isAbandoned = true ;
410+ }
411+
403412 await this . reconnect ( ) ;
404413 }
405414
@@ -426,9 +435,9 @@ export class CPMConnector extends TypedEmitter<Events> {
426435 this . isReconnecting = true ;
427436
428437 await new Promise < void > ( ( resolve , reject ) => {
429- setTimeout ( async ( ) => {
430- this . logger . info ( "Connection lost, retrying" , this . connectionAttempts ) ;
438+ this . logger . info ( "Connection lost, retrying" , this . connectionAttempts ) ;
431439
440+ setTimeout ( async ( ) => {
432441 await this . connect ( ) . then ( resolve , reject ) ;
433442 } , this . config . reconnectionDelay ) ;
434443 } ) ;
0 commit comments