Skip to content

Commit

Permalink
test: added test to check if nodes are properly added to the seed nod…
Browse files Browse the repository at this point in the history
…es when entering the network

This tests for if the Seed node contains the new nodes when they are created. It also checks if the new nodes discover each other after being created.

Includes a change to `findNode`. It will no longer throw an error when failing to find the node. This will have to be thrown by the caller now. This was required by `refreshBucket` since it's very likely that we can't find the random node it is looking for.
  • Loading branch information
tegefaulkes committed Apr 29, 2022
1 parent 08beafc commit 6b75ad0
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 20 deletions.
2 changes: 2 additions & 0 deletions src/client/service/nodesFind.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { utils as grpcUtils } from '../../grpc';
import { validateSync, utils as validationUtils } from '../../validation';
import { matchSync } from '../../utils';
import * as nodesPB from '../../proto/js/polykey/v1/nodes/nodes_pb';
import * as nodesErrors from '../../nodes/errors';

/**
* Attempts to get the node address of a provided node ID (by contacting
Expand Down Expand Up @@ -44,6 +45,7 @@ function nodesFind({
},
);
const address = await nodeConnectionManager.findNode(nodeId);
if (address == null) throw new nodesErrors.ErrorNodeGraphNodeIdNotFound();
response
.setNodeId(nodesUtils.encodeNodeId(nodeId))
.setAddress(
Expand Down
23 changes: 11 additions & 12 deletions src/nodes/NodeConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,9 @@ class NodeConnectionManager {
timer?: Timer,
): Promise<ConnectionAndLock> {
const targetAddress = await this.findNode(targetNodeId);
if (targetAddress == null) {
throw new nodesErrors.ErrorNodeGraphNodeIdNotFound();
}
// If the stored host is not a valid host (IP address), then we assume it to
// be a hostname
const targetHostname = !networkUtils.isHost(targetAddress.host)
Expand Down Expand Up @@ -447,23 +450,17 @@ class NodeConnectionManager {
public async findNode(
targetNodeId: NodeId,
options: { signal?: AbortSignal } = {},
): Promise<NodeAddress> {
): Promise<NodeAddress | undefined> {
const { signal } = { ...options };
// First check if we already have an existing ID -> address record
let address = (await this.nodeGraph.getNode(targetNodeId))?.address;
// Otherwise, attempt to locate it by contacting network
if (address == null) {
address = await this.getClosestGlobalNodes(targetNodeId, undefined, {
address =
address ??
(await this.getClosestGlobalNodes(targetNodeId, undefined, {
signal,
});
// TODO: This currently just does one iteration
// If not found in this single iteration, we throw an exception
if (address == null) {
throw new nodesErrors.ErrorNodeGraphNodeIdNotFound();
}
}
// We ensure that we always return a NodeAddress (either by lookup, or
// network search) - if we can't locate it from either, we throw an exception
}));
// TODO: This currently just does one iteration
return address;
}

Expand Down Expand Up @@ -633,6 +630,7 @@ class NodeConnectionManager {
*/
@ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning())
public async syncNodeGraph(block: boolean = true, timer?: Timer) {
this.logger.info('Syncing nodeGraph');
for (const seedNodeId of this.getSeedNodes()) {
// Check if the connection is viable
try {
Expand All @@ -646,6 +644,7 @@ class NodeConnectionManager {
this.keyManager.getNodeId(),
timer,
);
// FIXME: we need to ping a node before setting it
for (const [nodeId, nodeData] of nodes) {
if (!block) {
this.queue.push(() =>
Expand Down
9 changes: 6 additions & 3 deletions src/nodes/NodeManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ class NodeManager {
protected refreshBucketQueue: Set<NodeBucketIndex> = new Set();
protected refreshBucketQueueRunning: boolean = false;
protected refreshBucketQueueRunner: Promise<void>;
protected refreshBucketQueuePlug_: PromiseType<void>;
protected refreshBucketQueueDrained_: PromiseType<void>;
protected refreshBucketQueuePlug_: PromiseType<void> = promise();
protected refreshBucketQueueDrained_: PromiseType<void> = promise();
protected refreshBucketQueueAbortController: AbortController;

constructor({
Expand Down Expand Up @@ -109,7 +109,10 @@ class NodeManager {
// We need to attempt a connection using the proxies
// For now we will just do a forward connect + relay message
const targetAddress =
address ?? (await this.nodeConnectionManager.findNode(nodeId))!;
address ?? (await this.nodeConnectionManager.findNode(nodeId));
if (targetAddress == null) {
throw new nodesErrors.ErrorNodeGraphNodeIdNotFound();
}
const targetHost = await networkUtils.resolveHost(targetAddress.host);
return await this.nodeConnectionManager.pingNode(
nodeId,
Expand Down
7 changes: 3 additions & 4 deletions tests/nodes/NodeConnectionManager.general.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import Proxy from '@/network/Proxy';

import GRPCClientAgent from '@/agent/GRPCClientAgent';
import * as nodesUtils from '@/nodes/utils';
import * as nodesErrors from '@/nodes/errors';
import * as keysUtils from '@/keys/utils';
import * as grpcUtils from '@/grpc/utils';
import * as nodesPB from '@/proto/js/polykey/v1/nodes/nodes_pb';
Expand Down Expand Up @@ -336,9 +335,9 @@ describe(`${NodeConnectionManager.name} general test`, () => {
port: 22222 as Port,
} as NodeAddress);
// Un-findable Node cannot be found
await expect(() =>
nodeConnectionManager.findNode(nodeId),
).rejects.toThrowError(nodesErrors.ErrorNodeGraphNodeIdNotFound);
await expect(nodeConnectionManager.findNode(nodeId)).resolves.toEqual(
undefined,
);

await server.stop();
} finally {
Expand Down
89 changes: 88 additions & 1 deletion tests/nodes/NodeConnectionManager.seednodes.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { NodeId, SeedNodes } from '@/nodes/types';
import type { NodeId, NodeIdEncoded, SeedNodes } from '@/nodes/types';
import type { Host, Port } from '@/network/types';
import type { Sigchain } from '@/sigchain';
import fs from 'fs';
Expand Down Expand Up @@ -123,6 +123,13 @@ describe(`${NodeConnectionManager.name} seed nodes test`, () => {
});

beforeEach(async () => {
// Clearing nodes from graphs
for await (const [nodeId] of remoteNode1.nodeGraph.getNodes()) {
await remoteNode1.nodeGraph.unsetNode(nodeId);
}
for await (const [nodeId] of remoteNode2.nodeGraph.getNodes()) {
await remoteNode2.nodeGraph.unsetNode(nodeId);
}
dataDir = await fs.promises.mkdtemp(
path.join(os.tmpdir(), 'polykey-test-'),
);
Expand Down Expand Up @@ -417,4 +424,84 @@ describe(`${NodeConnectionManager.name} seed nodes test`, () => {
await queue?.stop();
}
});
test('should expand the network when nodes enter', async () => {
// Using a single seed node we need to check that each entering node adds itself to the seed node.
// Also need to check that the new nodes can be seen in the network.
let node1: PolykeyAgent | undefined;
let node2: PolykeyAgent | undefined;
const seedNodes: SeedNodes = {};
seedNodes[nodesUtils.encodeNodeId(remoteNodeId1)] = {
host: remoteNode1.proxy.getProxyHost(),
port: remoteNode1.proxy.getProxyPort(),
};
seedNodes[nodesUtils.encodeNodeId(remoteNodeId2)] = {
host: remoteNode2.proxy.getProxyHost(),
port: remoteNode2.proxy.getProxyPort(),
};
try {
logger.setLevel(LogLevel.WARN);
node1 = await PolykeyAgent.createPolykeyAgent({
nodePath: path.join(dataDir, 'node1'),
password: 'password',
networkConfig: {
proxyHost: localHost,
agentHost: localHost,
clientHost: localHost,
forwardHost: localHost,
},
seedNodes,
logger,
});
node2 = await PolykeyAgent.createPolykeyAgent({
nodePath: path.join(dataDir, 'node2'),
password: 'password',
networkConfig: {
proxyHost: localHost,
agentHost: localHost,
clientHost: localHost,
forwardHost: localHost,
},
seedNodes,
logger,
});

await node1.queue.drained();
await node1.nodeManager.refreshBucketQueueDrained();
await node2.queue.drained();
await node2.nodeManager.refreshBucketQueueDrained();

const getAllNodes = async (node: PolykeyAgent) => {
const nodes: Array<NodeIdEncoded> = [];
for await (const [nodeId] of node.nodeGraph.getNodes()) {
nodes.push(nodesUtils.encodeNodeId(nodeId));
}
return nodes;
};
const rNode1Nodes = await getAllNodes(remoteNode1);
const rNode2Nodes = await getAllNodes(remoteNode2);
const node1Nodes = await getAllNodes(node1);
const node2Nodes = await getAllNodes(node2);

const nodeIdR1 = nodesUtils.encodeNodeId(remoteNodeId1);
const nodeIdR2 = nodesUtils.encodeNodeId(remoteNodeId2);
const nodeId1 = nodesUtils.encodeNodeId(node1.keyManager.getNodeId());
const nodeId2 = nodesUtils.encodeNodeId(node2.keyManager.getNodeId());
expect(rNode1Nodes).toContain(nodeId1);
expect(rNode1Nodes).toContain(nodeId2);
expect(rNode2Nodes).toContain(nodeId1);
expect(rNode2Nodes).toContain(nodeId2);
expect(node1Nodes).toContain(nodeIdR1);
expect(node1Nodes).toContain(nodeIdR2);
expect(node1Nodes).toContain(nodeId2);
expect(node2Nodes).toContain(nodeIdR1);
expect(node2Nodes).toContain(nodeIdR2);
expect(node2Nodes).toContain(nodeId1);
} finally {
logger.setLevel(LogLevel.WARN);
await node1?.stop();
await node1?.destroy();
await node2?.stop();
await node2?.destroy();
}
});
});

0 comments on commit 6b75ad0

Please sign in to comment.