Skip to content

Support multiple wssER's in same network #125

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 20 additions & 17 deletions src/channel/channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -221,17 +221,15 @@ class ZitiChannel {
*/
async hello() {

this._zitiContext.logger.trace('ZitiChannel.hello() ch[%d] entered', this._id);
this._zitiContext.logger.trace('ZitiChannel.hello() ch[%d] wssER[%s] entered', this._id, this._edgeRouterHost);

await this._zws.open();

this._zitiContext.logger.trace('ZitiChannel.hello() ch[%d] _zws.open completed', this._id);
this._zitiContext.logger.trace('ZitiChannel.hello() ch[%d] wssER[%s] _zws.open completed', this._id, this._edgeRouterHost);

if (this.isHelloCompleted) {
this._zitiContext.logger.trace('ZitiChannel.hello() ch[%d] Hello handshake was previously completed', this._id);
return new Promise( async (resolve) => {
resolve( {channel: this, data: null});
});
this._zitiContext.logger.trace('ZitiChannel.hello() ch[%d] wssER[%s] Hello handshake was previously completed', this._id, this._edgeRouterHost);
return( {channel: this, data: null, helloCompletedDuration: this._helloCompletedDuration, edgeRouterHost: this._edgeRouterHost} );
}

if (isEqual(this._callerId, "ws:")) {
Expand All @@ -248,17 +246,17 @@ class ZitiChannel {

await this._tlsConn.create();

this._zitiContext.logger.debug('ZitiChannel.hello() ch[%d] initiating TLS handshake', this._id);
this._zitiContext.logger.debug('ZitiChannel.hello() ch[%d] wssER[%s] initiating TLS handshake', this._id, this._edgeRouterHost);

await this._tlsConn.handshake();

await this.awaitTLSHandshakeComplete();

this._zitiContext.logger.debug('ZitiChannel.hello() ch[%d] TLS handshake complete', this._id);
this._zitiContext.logger.debug('ZitiChannel.hello() ch[%d] wssER[%s] TLS handshake complete', this._id, this._edgeRouterHost);

}

this._zitiContext.logger.debug('ZitiChannel.hello() ch[%d] initiating message: ZitiEdgeProtocol.content_type.HelloType: ', this._id, ZitiEdgeProtocol.header_type.StringType);
this._zitiContext.logger.debug('ZitiChannel.hello() ch[%d] wssER[%s] initiating message: ZitiEdgeProtocol.content_type.HelloType: ', this._id, this._edgeRouterHost, ZitiEdgeProtocol.header_type.StringType);
let uuid = uuidv4();

let headers = [
Expand All @@ -278,18 +276,19 @@ class ZitiChannel {

let sequence = this.getAndIncrementSequence();

this._helloStartedTimestamp = Date.now();

let msg = await this.sendMessage( ZitiEdgeProtocol.content_type.HelloType, headers, null, {
sequence: sequence,
});

this._helloCompletedTimestamp = Date.now();
this._helloCompletedDuration = this._helloCompletedTimestamp - this._helloStartedTimestamp; //in ms
this._helloCompleted = true;
this.state = (ZitiEdgeProtocol.conn_state.Connected);
this._zitiContext.logger.debug('ZitiChannel.hello() ch[%d] Hello handshake to Edge Router [%s] completed at timestamp[%o]', this._id, this._edgeRouterHost, this._helloCompletedTimestamp);
this._zitiContext.logger.debug('ZitiChannel.hello() ch[%d] wssER[%s] Hello handshake completed at timestamp[%o]', this._id, this._edgeRouterHost, this._helloCompletedTimestamp);

return new Promise( async (resolve) => {
resolve( {channel: this, data: null});
});
return( {channel: this, data: null, helloCompletedDuration: this._helloCompletedDuration, edgeRouterHost: this._edgeRouterHost} );

}

Expand All @@ -301,7 +300,7 @@ class ZitiChannel {

const self = this;

self._zitiContext.logger.debug('initiating Connect to Edge Router [%s] for conn[%d]', this._edgeRouterHost, conn.id);
self._zitiContext.logger.debug('initiating Connect to wssER[%s] for conn[%d]', this._edgeRouterHost, conn.id);

await sodium.ready;

Expand Down Expand Up @@ -357,7 +356,7 @@ class ZitiChannel {

conn.state = (ZitiEdgeProtocol.conn_state.Connecting);

self._zitiContext.logger.debug('about to send Connect to Edge Router [%s] for conn[%d]', conn.channel.edgeRouterHost, conn.id);
self._zitiContext.logger.debug('about to send Connect to wssER[%s] for conn[%d]', conn.channel.edgeRouterHost, conn.id);

let msg = await self.sendMessage( ZitiEdgeProtocol.content_type.Connect, headers, self._network_session_token, {
conn: conn,
Expand All @@ -380,7 +379,7 @@ class ZitiChannel {
const self = this;
return new Promise( async (resolve, reject) => {

self._zitiContext.logger.debug('initiating Close to Edge Router [%s] for conn[%d]', this._edgeRouterHost, conn.id);
self._zitiContext.logger.debug('initiating Close to wssER[%s] for conn[%d]', this._edgeRouterHost, conn.id);

let sequence = conn.getAndIncrementSequence();
let uuid = uuidv4();
Expand All @@ -404,7 +403,7 @@ class ZitiChannel {

];

self._zitiContext.logger.debug('about to send Close to Edge Router [%s] for conn[%d]', conn.channel.edgeRouterHost, conn.id);
self._zitiContext.logger.debug('about to send Close to wssER[%s] for conn[%d]', conn.channel.edgeRouterHost, conn.id);

self.sendMessageNoWait( ZitiEdgeProtocol.content_type.StateClosed, headers, self._network_session_token, {
conn: conn,
Expand Down Expand Up @@ -960,6 +959,10 @@ class ZitiChannel {
*/
async _recvFromWireAfterDecrypt(ch, data) {

if (isEqual(ch._state, ZitiEdgeProtocol.conn_state.Closed)) {
return;
}

let buffer = data;

if (!isUndefined(ch._partialMessage)) { // if we are awaiting rest of a partial msg to arrive, append this chunk onto the end, then proceed
Expand Down
93 changes: 64 additions & 29 deletions src/context/context.js
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ class ZitiContext extends EventEmitter {

this._timeout = ZITI_CONSTANTS.ZITI_DEFAULT_TIMEOUT;

this._didInitialGetPendingChannelConnects = false;

}

get libCrypto () {
Expand Down Expand Up @@ -1404,31 +1406,28 @@ class ZitiContext extends EventEmitter {
*/
async _getPendingChannelConnects(conn, edgeRouters) {

return new Promise( async (resolve) => {
this.logger.trace('_getPendingChannelConnects entered');

this.logger.trace('_getPendingChannelConnects entered for edgeRouters [%o]', edgeRouters);
let pendingChannelConnects = new Array();

let pendingChannelConnects = new Array();
let self = this;

// Get a channel connection to each of the Edge Routers that have a WSS binding, initiating a connection if channel is not yet connected
for (var i = 0; i < edgeRouters.length; i++) {

self.logger.trace('calling getChannelByEdgeRouter for wssER [%s]', edgeRouters[i].hostname);
let ch = await self.getChannelByEdgeRouter(conn, edgeRouters[i]).catch((err) => {
self.logger.error( err );
throw new Error( err );
});
self.logger.debug('initiating Hello to [%s] for session[%s]', self.getEdgeRouterURL(edgeRouters[i]), conn.networkSessionToken);
pendingChannelConnects.push(
ch.hello()
);

let self = this;

// Get a channel connection to each of the Edge Routers that have a WS binding, initiating a connection if channel is not yet connected
edgeRouters.forEach(async function(edgeRouter, idx, array) {
self.logger.trace('calling getChannelByEdgeRouter for ER [%o]', edgeRouter);
let ch = await self.getChannelByEdgeRouter(conn, edgeRouter).catch((err) => {
self.logger.error( err );
throw new Error( err );
});
self.logger.debug('initiating Hello to [%s] for session[%s]', self.getEdgeRouterURL(edgeRouter), conn.networkSessionToken);
pendingChannelConnects.push(
ch.hello()
);
};

if (idx === array.length - 1) {
resolve(pendingChannelConnects); // Return to caller only after we have processed all edge routers
}
});
});
return pendingChannelConnects;
}


Expand Down Expand Up @@ -1549,7 +1548,7 @@ class ZitiContext extends EventEmitter {
let result = {};

find(Array.from(this._channels), function(obj) {
if (isEqual( obj[1]._edgeRouterHost, edgeRouter )) {
if (isEqual( obj[1][0]._edgeRouterHost, edgeRouter )) {
result.key = obj[0];
result.ch = obj[1];
return true;
Expand All @@ -1571,6 +1570,8 @@ class ZitiContext extends EventEmitter {
* @param {*} networkSession
*/
async connect(conn, networkSession) {

let self = this;

this.logger.debug('connect() entered for conn[%o] networkSession[%o]', conn.id, networkSession);

Expand All @@ -1597,11 +1598,44 @@ class ZitiContext extends EventEmitter {
this.logger.debug('now own _connectMutex for conn[%o]', conn.id);

let pendingChannelConnects = await this._getPendingChannelConnects(conn, edgeRouters);
this.logger.trace('pendingChannelConnects [%o]', pendingChannelConnects);

let channelWithNearestEdgeRouter = await Promise.race( pendingChannelConnects );
channelWithNearestEdgeRouter = channelWithNearestEdgeRouter.channel;
this.logger.debug('Channel [%d] has nearest Edge Router for conn[%o]', channelWithNearestEdgeRouter.id, conn.id);

let nearestEdgeRouter;

if (!this._didInitialGetPendingChannelConnects) {

// The first time through, we will only wait for one wssER connect to complete, and
// will select it as the "nearest". Other, slower, wssER connects will continue to
// run, and eventually complete in the background, but we will not wait for them here
// since that would impede performance.
nearestEdgeRouter = await Promise.race( pendingChannelConnects );

this.logger.trace(`Promise.race helloCompletedDuration time for wssER[${nearestEdgeRouter.edgeRouterHost}] was [${nearestEdgeRouter.helloCompletedDuration}]`);

this._didInitialGetPendingChannelConnects = true;

} else {

// Subsequently, we will wait for all wssER connects to complete, since they
// will most likely have done so before we get back here. We will then examine
// the helloCompletedDuration values across all wssERs and chose the one with
// the lowest value.

let edgeRouterConnects = await Promise.all( pendingChannelConnects );

let helloCompletedDuration = 999999999999;

find(edgeRouterConnects, function(edgeRouterConnect) {
self.logger.trace(`Promise.all helloCompletedDuration time for wssER[${edgeRouterConnect.edgeRouterHost}] was [${edgeRouterConnect.helloCompletedDuration}]`);
if (edgeRouterConnect.helloCompletedDuration < helloCompletedDuration) {
nearestEdgeRouter = edgeRouterConnect;
helloCompletedDuration = edgeRouterConnect.helloCompletedDuration;
}
});

}

let channelWithNearestEdgeRouter = nearestEdgeRouter.channel;
this.logger.debug('ch[%d] has nearest wssER[%s] for conn[%o]', channelWithNearestEdgeRouter.id, nearestEdgeRouter.edgeRouterHost, conn.id);
channelWithNearestEdgeRouter._connections._saveConnection(conn);
conn.channel = channelWithNearestEdgeRouter;

Expand Down Expand Up @@ -1956,9 +1990,10 @@ class ZitiContext extends EventEmitter {
closeChannelByEdgeRouter( edgeRouter ) {
let result = this.findChannelByEdgeRouter(edgeRouter);
if (result.key && result.ch) {
result.ch[0]._state = ZitiEdgeProtocol.conn_state.Closed;
this._channels.delete( result.key );
this._channelsById.delete( result.ch.id );
this.logger.warn('channel [%s] id[%d] deleted', result.key, result.ch.id);
this._channelsById.delete( result.ch[0].id );
this.logger.warn('channel [%s] id[%d] deleted', result.key, result.ch[0].id);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/websocket/websocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ class ZitiWebSocket {
self._zitiContext.logger.debug("zws: waitForWSConnection: connection is now open");
callback();
} else {
self._zitiContext.logger.debug("zws: waitForWSConnection: wait...for %o", self);
self._zitiContext.logger.debug("zws: waitForWSConnection: wait...for %o", self.url);
self.waitForWSConnection(callback);
}
},
Expand Down