Skip to content

Commit

Permalink
feat: nodeConnectionManager.getClosestGlobalNodes can optionally sk…
Browse files Browse the repository at this point in the history
…ip recently offline nodes

This is done with an in-memory map of `nodeIdstring` to some data tracking the backoff period. it defaults to 5 min and doubles each failure.

#413
  • Loading branch information
tegefaulkes committed Sep 21, 2022
1 parent ca5e675 commit 206dceb
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 11 deletions.
61 changes: 53 additions & 8 deletions src/nodes/NodeConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@ class NodeConnectionManager {
*/
protected connections: Map<NodeIdString, ConnectionAndTimer> = new Map();
protected connectionLocks: LockBox<RWLockWriter> = new LockBox();
// Tracks the backoff period for offline nodes
protected nodesBackoffMap: Map<
string,
{ lastAttempt: number; delay: number }
> = new Map();
protected backoffDefault: number = 300; // 5 min
protected backoffMultiplier: number = 2; // Doubles every failure

protected pingAndSetNodeHandlerId: TaskHandlerId =
'NodeConnectionManager.pingAndSetNodeHandler' as TaskHandlerId;
Expand Down Expand Up @@ -394,11 +401,13 @@ class NodeConnectionManager {
* Retrieves the node address. If an entry doesn't exist in the db, then
* proceeds to locate it using Kademlia.
* @param targetNodeId Id of the node we are tying to find
* @param ignoreRecentOffline skips nodes that are within their backoff period
* @param options
*/
@ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning())
public async findNode(
targetNodeId: NodeId,
ignoreRecentOffline: boolean = false,
options: { signal?: AbortSignal } = {},
): Promise<NodeAddress | undefined> {
const { signal } = { ...options };
Expand All @@ -407,9 +416,14 @@ class NodeConnectionManager {
// Otherwise, attempt to locate it by contacting network
address =
address ??
(await this.getClosestGlobalNodes(targetNodeId, undefined, {
signal,
}));
(await this.getClosestGlobalNodes(
targetNodeId,
ignoreRecentOffline,
undefined,
{
signal,
},
));
// TODO: This currently just does one iteration
return address;
}
Expand All @@ -426,13 +440,15 @@ class NodeConnectionManager {
* port).
* @param targetNodeId ID of the node attempting to be found (i.e. attempting
* to find its IP address and port)
* @param ignoreRecentOffline skips nodes that are within their backoff period
* @param timer Connection timeout timer
* @param options
* @returns whether the target node was located in the process
*/
@ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning())
public async getClosestGlobalNodes(
targetNodeId: NodeId,
ignoreRecentOffline: boolean = false,
timer?: Timer,
options: { signal?: AbortSignal } = {},
): Promise<NodeAddress | undefined> {
Expand All @@ -455,9 +471,9 @@ class NodeConnectionManager {
// Not sufficient to simply check if there's already a pre-existing connection
// in nodeConnections - what if there's been more than 1 invocation of
// getClosestGlobalNodes()?
const contacted: Record<string, boolean> = {};
const contacted: Set<string> = new Set();
// Iterate until we've found and contacted k nodes
while (Object.keys(contacted).length <= this.nodeGraph.nodeBucketLimit) {
while (contacted.size <= this.nodeGraph.nodeBucketLimit) {
if (signal?.aborted) throw signal.reason;
// Remove the node from the front of the array
const nextNode = shortlist.shift();
Expand All @@ -467,9 +483,8 @@ class NodeConnectionManager {
}
const [nextNodeId, nextNodeAddress] = nextNode;
// Skip if the node has already been contacted
if (contacted[nextNodeId]) {
continue;
}
if (contacted.has(nextNodeId.toString())) continue;
if (ignoreRecentOffline && this.hasBackoff(nextNodeId)) continue;
// Connect to the node (check if pre-existing connection exists, otherwise
// create a new one)
if (
Expand All @@ -482,7 +497,9 @@ class NodeConnectionManager {
)
) {
await this.nodeManager!.setNode(nextNodeId, nextNodeAddress.address);
this.removeBackoff(nextNodeId);
} else {
this.increaseBackoff(nextNodeId);
continue;
}
contacted[nextNodeId] = true;
Expand Down Expand Up @@ -828,6 +845,34 @@ class NodeConnectionManager {
}
return true;
}

protected hasBackoff(nodeId: NodeId): boolean {
const backoff = this.nodesBackoffMap.get(nodeId.toString());
if (backoff == null) return false;
const currentTime = performance.now() + performance.timeOrigin;
const backOffDeadline = backoff.lastAttempt + backoff.delay;
return currentTime < backOffDeadline;
}

protected increaseBackoff(nodeId: NodeId): void {
const backoff = this.nodesBackoffMap.get(nodeId.toString());
const currentTime = performance.now() + performance.timeOrigin;
if (backoff == null) {
this.nodesBackoffMap.set(nodeId.toString(), {
lastAttempt: currentTime,
delay: this.backoffDefault,
});
} else {
this.nodesBackoffMap.set(nodeId.toString(), {
lastAttempt: currentTime,
delay: backoff.delay * this.backoffMultiplier,
});
}
}

protected removeBackoff(nodeId: NodeId): void {
this.nodesBackoffMap.delete(nodeId.toString());
}
}

export default NodeConnectionManager;
7 changes: 5 additions & 2 deletions src/nodes/NodeManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ class NodeManager {
// We need to attempt a connection using the proxies
// For now we will just do a forward connect + relay message
const targetAddress =
address ?? (await this.nodeConnectionManager.findNode(nodeId, options));
address ??
(await this.nodeConnectionManager.findNode(nodeId, false, options));
if (targetAddress == null) {
throw new nodesErrors.ErrorNodeGraphNodeIdNotFound();
}
Expand Down Expand Up @@ -640,7 +641,9 @@ class NodeManager {
bucketIndex,
);
// We then need to start a findNode procedure
await this.nodeConnectionManager.findNode(bucketRandomNodeId, { signal });
await this.nodeConnectionManager.findNode(bucketRandomNodeId, true, {
signal,
});
}

private async setupRefreshBucketTasks(tran?: DBTransaction) {
Expand Down
60 changes: 59 additions & 1 deletion tests/nodes/NodeConnectionManager.general.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,10 @@ describe(`${NodeConnectionManager.name} general test`, () => {
return IdInternal.create<NodeId>(idArray);
};

const dummyNodeManager = { setNode: jest.fn() } as unknown as NodeManager;
const dummyNodeManager = {
setNode: jest.fn(),
updateRefreshBucketDelay: jest.fn(),
} as unknown as NodeManager;
const dummyTaskManager: TaskManager = {
registerHandler: jest.fn(),
deregisterHandler: jest.fn(),
Expand Down Expand Up @@ -520,4 +523,59 @@ describe(`${NodeConnectionManager.name} general test`, () => {
await nodeConnectionManager?.stop();
}
});
test('getClosestGlobalNodes should skip recent offline nodes', async () => {
let nodeConnectionManager: NodeConnectionManager | undefined;
const mockedPingNode = jest.spyOn(
NodeConnectionManager.prototype,
'pingNode',
);
try {
nodeConnectionManager = new NodeConnectionManager({
keyManager,
nodeGraph,
proxy,
taskManager: dummyTaskManager,
logger: nodeConnectionManagerLogger,
});
await nodeConnectionManager.start({ nodeManager: dummyNodeManager });
// Check two things,
// 1. existence of a node in the backoff map
// 2. getClosestGlobalNodes doesn't try to connect to offline node

// Add fake data to `NodeGraph`
await nodeGraph.setNode(nodeId1, {
host: serverHost,
port: serverPort,
});
await nodeGraph.setNode(nodeId2, {
host: serverHost,
port: serverPort,
});

// Making pings fail
mockedPingNode.mockImplementation(async () => false);
await nodeConnectionManager.getClosestGlobalNodes(nodeId3, false);
expect(mockedPingNode).toHaveBeenCalled();

// Nodes 1 and 2 should exist in backoff map
// @ts-ignore: kidnap protected property
const backoffMap = nodeConnectionManager.nodesBackoffMap;
expect(backoffMap.has(nodeId1.toString())).toBeTrue();
expect(backoffMap.has(nodeId2.toString())).toBeTrue();
expect(backoffMap.has(nodeId3.toString())).toBeFalse();

// Next find node should skip offline nodes
mockedPingNode.mockClear();
await nodeConnectionManager.getClosestGlobalNodes(nodeId3, true);
expect(mockedPingNode).not.toHaveBeenCalled();

// We can try connecting anyway
mockedPingNode.mockClear();
await nodeConnectionManager.getClosestGlobalNodes(nodeId3, false);
expect(mockedPingNode).toHaveBeenCalled();
} finally {
mockedPingNode.mockRestore();
await nodeConnectionManager?.stop();
}
});
});

0 comments on commit 206dceb

Please sign in to comment.