From 0024e6fa89a417c9e53e6da13536323e23a50663 Mon Sep 17 00:00:00 2001 From: Brian Botha Date: Thu, 17 Mar 2022 17:32:01 +1100 Subject: [PATCH] fix: `NodeManager.setNode` Properly handles adding new node when bucket is full Logic of adding nodes has been split between `NodeManager` and `NodeGraph`. The `NodeGraph.setNode` just handles adding a node to the bucket where the `NodeManager.setNode` contains the logic of when to add the node Relates #359 --- .../service/nodesClosestLocalNodesGet.ts | 3 +- src/client/GRPCClientClient.ts | 11 +- src/client/service/nodesAdd.ts | 1 + src/nodes/NodeConnectionManager.ts | 10 +- src/nodes/NodeGraph.ts | 153 +++++++++++---- src/nodes/NodeManager.ts | 67 ++++++- src/nodes/utils.ts | 3 - tests/acl/ACL.test.ts | 13 -- tests/nodes/NodeConnection.test.ts | 8 +- tests/nodes/NodeManager.test.ts | 181 ++++++++++++++++++ 10 files changed, 375 insertions(+), 75 deletions(-) diff --git a/src/agent/service/nodesClosestLocalNodesGet.ts b/src/agent/service/nodesClosestLocalNodesGet.ts index 844aa0253e..36a172b125 100644 --- a/src/agent/service/nodesClosestLocalNodesGet.ts +++ b/src/agent/service/nodesClosestLocalNodesGet.ts @@ -47,7 +47,8 @@ function nodesClosestLocalNodesGet({ ); // Get all local nodes that are closest to the target node from the request const closestNodes = await db.withTransactionF( - async (tran) => await nodeGraph.getClosestNodes(nodeId, tran), + async (tran) => + await nodeGraph.getClosestNodes(nodeId, undefined, tran), ); for (const [nodeId, nodeData] of closestNodes) { const addressMessage = new nodesPB.Address(); diff --git a/src/client/GRPCClientClient.ts b/src/client/GRPCClientClient.ts index 78b13ec9df..2b1b905db7 100644 --- a/src/client/GRPCClientClient.ts +++ b/src/client/GRPCClientClient.ts @@ -3,7 +3,7 @@ import type { ClientReadableStream } from '@grpc/grpc-js/build/src/call'; import type { AsyncGeneratorReadableStreamClient } from '../grpc/types'; import type { Session } from '../sessions'; import type { NodeId } from '../nodes/types'; -import type { Host, Port, TLSConfig, ProxyConfig } from '../network/types'; +import type { Host, Port, ProxyConfig, TLSConfig } from '../network/types'; import type * as utilsPB from '../proto/js/polykey/v1/utils/utils_pb'; import type * as agentPB from '../proto/js/polykey/v1/agent/agent_pb'; import type * as vaultsPB from '../proto/js/polykey/v1/vaults/vaults_pb'; @@ -68,7 +68,7 @@ class GRPCClientClient extends GRPCClient { interceptors, logger, }); - const grpcClientClient = new GRPCClientClient({ + return new GRPCClientClient({ client, nodeId, host, @@ -80,7 +80,6 @@ class GRPCClientClient extends GRPCClient { destroyCallback, logger, }); - return grpcClientClient; } public async destroy() { @@ -905,6 +904,12 @@ class GRPCClientClient extends GRPCClient { public nodesGetAll(...args) { return grpcUtils.promisifyUnaryCall( this.client, + { + nodeId: this.nodeId, + host: this.host, + port: this.port, + command: this.identitiesAuthenticate.name, + }, this.client.nodesGetAll, )(...args); } diff --git a/src/client/service/nodesAdd.ts b/src/client/service/nodesAdd.ts index 079d2eee2d..0884a0f0b2 100644 --- a/src/client/service/nodesAdd.ts +++ b/src/client/service/nodesAdd.ts @@ -67,6 +67,7 @@ function nodesAdd({ host, port, } as NodeAddress, + undefined, tran, ), ); diff --git a/src/nodes/NodeConnectionManager.ts b/src/nodes/NodeConnectionManager.ts index fc5c99ff8a..c20f093f3c 100644 --- a/src/nodes/NodeConnectionManager.ts +++ b/src/nodes/NodeConnectionManager.ts @@ -10,9 +10,7 @@ import type { NodeId, NodeIdString, SeedNodes, - NodeEntry, } from './types'; -import type { DBTransaction } from '@matrixai/db'; import { withF } from '@matrixai/resources'; import Logger from '@matrixai/logger'; import { ready, StartStop } from '@matrixai/async-init/dist/StartStop'; @@ -103,7 +101,7 @@ class NodeConnectionManager { this.logger.info(`Starting ${this.constructor.name}`); for (const nodeIdEncoded in this.seedNodes) { const nodeId = nodesUtils.decodeNodeId(nodeIdEncoded)!; - await this.nodeGraph.setNode(nodeId, this.seedNodes[nodeIdEncoded]); + await this.nodeGraph.setNode(nodeId, this.seedNodes[nodeIdEncoded]); // FIXME: also fine implicit transactions } this.logger.info(`Started ${this.constructor.name}`); } @@ -243,6 +241,7 @@ class NodeConnectionManager { )}`, ); // Creating the connection and set in map + // FIXME: this is fine, just use the implicit tran. fix this when adding optional transactions const targetAddress = await this.findNode(targetNodeId); // If the stored host is not a valid host (IP address), // then we assume it to be a hostname @@ -363,6 +362,7 @@ class NodeConnectionManager { * Retrieves the node address. If an entry doesn't exist in the db, then * proceeds to locate it using Kademlia. * @param targetNodeId Id of the node we are tying to find + * @param tran */ @ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning()) public async findNode(targetNodeId: NodeId): Promise { @@ -405,6 +405,7 @@ class NodeConnectionManager { // Let foundTarget: boolean = false; let foundAddress: NodeAddress | undefined = undefined; // Get the closest alpha nodes to the target node (set as shortlist) + // FIXME: no tran const shortlist = await this.nodeGraph.getClosestNodes( targetNodeId, this.initialClosestNodes, @@ -438,6 +439,7 @@ class NodeConnectionManager { try { // Add the node to the database so that we can find its address in // call to getConnectionToNode + // FIXME: no tran await this.nodeGraph.setNode(nextNodeId, nextNodeAddress.address); await this.getConnection(nextNodeId); } catch (e) { @@ -458,6 +460,7 @@ class NodeConnectionManager { continue; } if (nodeId.equals(targetNodeId)) { + // FIXME: no tran await this.nodeGraph.setNode(nodeId, nodeData.address); foundAddress = nodeData.address; // We have found the target node, so we can stop trying to look for it @@ -556,6 +559,7 @@ class NodeConnectionManager { ); for (const [nodeId, nodeData] of nodes) { // FIXME: this should be the `nodeManager.setNode` + // FIXME: no tran needed await this.nodeGraph.setNode(nodeId, nodeData.address); } } diff --git a/src/nodes/NodeGraph.ts b/src/nodes/NodeGraph.ts index 34fa5c56a4..0bf30f3ae0 100644 --- a/src/nodes/NodeGraph.ts +++ b/src/nodes/NodeGraph.ts @@ -154,8 +154,14 @@ class NodeGraph { @ready(new nodesErrors.ErrorNodeGraphNotRunning()) public async getNode( nodeId: NodeId, - tran: DBTransaction, + tran?: DBTransaction, ): Promise { + if (tran == null) { + return this.db.withTransactionF(async (tran) => + this.getNode(nodeId, tran), + ); + } + const [bucketIndex] = this.bucketIndex(nodeId); const bucketDomain = [ ...this.nodeGraphBucketsDbPath, @@ -176,8 +182,15 @@ class NodeGraph { @ready(new nodesErrors.ErrorNodeGraphNotRunning()) public async *getNodes( order: 'asc' | 'desc' = 'asc', - tran: DBTransaction, + tran?: DBTransaction, ): AsyncGenerator<[NodeId, NodeData]> { + if (tran == null) { + const getNodes = (tran) => this.getNodes(order, tran); + return yield* this.db.withTransactionG(async function* (tran) { + return yield* getNodes(tran); + }); + } + for await (const [key, nodeData] of tran.iterator( { reverse: order !== 'asc', @@ -190,12 +203,25 @@ class NodeGraph { } } + /** + * Will add a node to the node graph and increment the bucket count. + * If the node already existed it will be updated. + * @param nodeId NodeId to add to the NodeGraph + * @param nodeAddress Address information to add + * @param tran + */ @ready(new nodesErrors.ErrorNodeGraphNotRunning()) public async setNode( nodeId: NodeId, nodeAddress: NodeAddress, - tran: DBTransaction, + tran?: DBTransaction, ): Promise { + if (tran == null) { + return this.db.withTransactionF(async (tran) => + this.setNode(nodeId, nodeAddress, tran), + ); + } + const [bucketIndex, bucketKey] = this.bucketIndex(nodeId); const lastUpdatedPath = [...this.nodeGraphLastUpdatedDbPath, bucketKey]; const bucketPath = [...this.nodeGraphBucketsDbPath, bucketKey]; @@ -203,57 +229,67 @@ class NodeGraph { ...bucketPath, nodesUtils.bucketDbKey(nodeId), ]); - // If this is a new entry, check the bucket limit - if (nodeData == null) { - const count = await this.getBucketMetaProp(bucketIndex, 'count', tran); - if (count < this.nodeBucketLimit) { - // Increment the bucket count - await this.setBucketMetaProp(bucketIndex, 'count', count + 1, tran); - } else { - // Remove the oldest entry in the bucket - let oldestLastUpdatedKey: Buffer; - let oldestNodeId: NodeId; - for await (const [key] of tran.iterator( - { - limit: 1, - values: false, - }, - this.nodeGraphLastUpdatedDbPath, - )) { - oldestLastUpdatedKey = key as unknown as Buffer; - ({ nodeId: oldestNodeId } = nodesUtils.parseLastUpdatedBucketDbKey( - key as unknown as Buffer, - )); - } - await tran.del([...bucketPath, oldestNodeId!.toBuffer()]); - await tran.del([...lastUpdatedPath, oldestLastUpdatedKey!]); - } - } else { - // This is an existing entry, so the index entry must be reset + if (nodeData != null) { + // If the node already exists we want to remove the old `lastUpdated` const lastUpdatedKey = nodesUtils.lastUpdatedBucketDbKey( nodeData.lastUpdated, nodeId, ); await tran.del([...lastUpdatedPath, lastUpdatedKey]); + } else { + // It didn't exist so we want to increment the bucket count + const count = await this.getBucketMetaProp(bucketIndex, 'count', tran); + await this.setBucketMetaProp(bucketIndex, 'count', count + 1, tran); } const lastUpdated = getUnixtime(); await tran.put([...bucketPath, nodesUtils.bucketDbKey(nodeId)], { address: nodeAddress, lastUpdated, }); - const lastUpdatedKey = nodesUtils.lastUpdatedBucketDbKey( + const newLastUpdatedKey = nodesUtils.lastUpdatedBucketDbKey( lastUpdated, nodeId, ); await tran.put( - [...lastUpdatedPath, lastUpdatedKey], + [...lastUpdatedPath, newLastUpdatedKey], nodesUtils.bucketDbKey(nodeId), true, ); } @ready(new nodesErrors.ErrorNodeGraphNotRunning()) - public async unsetNode(nodeId: NodeId, tran: DBTransaction): Promise { + public async getOldestNode( + bucketIndex: number, + tran?: DBTransaction, + ): Promise { + if (tran == null) { + return this.db.withTransactionF(async (tran) => + this.getOldestNode(bucketIndex, tran), + ); + } + + const bucketKey = nodesUtils.bucketKey(bucketIndex); + // Remove the oldest entry in the bucket + let oldestNodeId: NodeId | undefined; + for await (const [key] of tran.iterator({ limit: 1 }, [ + ...this.nodeGraphLastUpdatedDbPath, + bucketKey, + ])) { + ({ nodeId: oldestNodeId } = nodesUtils.parseLastUpdatedBucketDbKey( + key as unknown as Buffer, + )); + } + return oldestNodeId; + } + + @ready(new nodesErrors.ErrorNodeGraphNotRunning()) + public async unsetNode(nodeId: NodeId, tran?: DBTransaction): Promise { + if (tran == null) { + return this.db.withTransactionF(async (tran) => + this.unsetNode(nodeId, tran), + ); + } + const [bucketIndex, bucketKey] = this.bucketIndex(nodeId); const bucketPath = [...this.nodeGraphBucketsDbPath, bucketKey]; const lastUpdatedPath = [...this.nodeGraphLastUpdatedDbPath, bucketKey]; @@ -284,8 +320,14 @@ class NodeGraph { bucketIndex: NodeBucketIndex, sort: 'nodeId' | 'distance' | 'lastUpdated' = 'nodeId', order: 'asc' | 'desc' = 'asc', - tran: DBTransaction, + tran?: DBTransaction, ): Promise { + if (tran == null) { + return this.db.withTransactionF(async (tran) => + this.getBucket(bucketIndex, sort, order, tran), + ); + } + if (bucketIndex < 0 || bucketIndex >= this.nodeIdBits) { throw new nodesErrors.ErrorNodeGraphBucketIndex( `bucketIndex must be between 0 and ${this.nodeIdBits - 1} inclusive`, @@ -355,8 +397,15 @@ class NodeGraph { public async *getBuckets( sort: 'nodeId' | 'distance' | 'lastUpdated' = 'nodeId', order: 'asc' | 'desc' = 'asc', - tran: DBTransaction, + tran?: DBTransaction, ): AsyncGenerator<[NodeBucketIndex, NodeBucket]> { + if (tran == null) { + const getBuckets = (tran) => this.getBuckets(sort, order, tran); + return yield* this.db.withTransactionG(async function* (tran) { + return yield* getBuckets(tran); + }); + } + let bucketIndex: NodeBucketIndex | undefined = undefined; let bucket: NodeBucket = []; if (sort === 'nodeId' || sort === 'distance') { @@ -448,8 +497,14 @@ class NodeGraph { @ready(new nodesErrors.ErrorNodeGraphNotRunning()) public async resetBuckets( nodeIdOwn: NodeId, - tran: DBTransaction, + tran?: DBTransaction, ): Promise { + if (tran == null) { + return this.db.withTransactionF(async (tran) => + this.resetBuckets(nodeIdOwn, tran), + ); + } + // Setup new space const spaceNew = this.space === '0' ? '1' : '0'; const nodeGraphMetaDbPathNew = [...this.nodeGraphDbPath, 'meta' + spaceNew]; @@ -536,8 +591,14 @@ class NodeGraph { @ready(new nodesErrors.ErrorNodeGraphNotRunning()) public async getBucketMeta( bucketIndex: NodeBucketIndex, - tran: DBTransaction, + tran?: DBTransaction, ): Promise { + if (tran == null) { + return this.db.withTransactionF(async (tran) => + this.getBucketMeta(bucketIndex, tran), + ); + } + if (bucketIndex < 0 || bucketIndex >= this.nodeIdBits) { throw new nodesErrors.ErrorNodeGraphBucketIndex( `bucketIndex must be between 0 and ${this.nodeIdBits - 1} inclusive`, @@ -561,8 +622,14 @@ class NodeGraph { public async getBucketMetaProp( bucketIndex: NodeBucketIndex, key: Key, - tran: DBTransaction, + tran?: DBTransaction, ): Promise { + if (tran == null) { + return this.db.withTransactionF(async (tran) => + this.getBucketMetaProp(bucketIndex, key, tran), + ); + } + if (bucketIndex < 0 || bucketIndex >= this.nodeIdBits) { throw new nodesErrors.ErrorNodeGraphBucketIndex( `bucketIndex must be between 0 and ${this.nodeIdBits - 1} inclusive`, @@ -602,8 +669,14 @@ class NodeGraph { public async getClosestNodes( nodeId: NodeId, limit: number = this.nodeBucketLimit, - tran: DBTransaction, + tran?: DBTransaction, ): Promise { + if (tran == null) { + return this.db.withTransactionF(async (tran) => + this.getClosestNodes(nodeId, limit, tran), + ); + } + // Buckets map to the target node in the following way; // 1. 0, 1, ..., T-1 -> T // 2. T -> 0, 1, ..., T-1 @@ -736,7 +809,7 @@ class NodeGraph { * The bucket key is the string encoded version of bucket index * that preserves lexicographic order */ - protected bucketIndex(nodeId: NodeId): [NodeBucketIndex, string] { + public bucketIndex(nodeId: NodeId): [NodeBucketIndex, string] { const nodeIdOwn = this.keyManager.getNodeId(); if (nodeId.equals(nodeIdOwn)) { throw new nodesErrors.ErrorNodeGraphSameNodeId(); diff --git a/src/nodes/NodeManager.ts b/src/nodes/NodeManager.ts index ac9d3a4a40..d5b905cbc5 100644 --- a/src/nodes/NodeManager.ts +++ b/src/nodes/NodeManager.ts @@ -53,6 +53,10 @@ class NodeManager { * Determines whether a node in the Polykey network is online. * @return true if online, false if offline */ + // FIXME: We shouldn't be trying to find the node just to ping it + // since we are usually pinging it during the find procedure anyway. + // I think we should be providing the address of what we're trying to ping, + // possibly make it an optional parameter? public async pingNode(targetNodeId: NodeId): Promise { const targetAddress: NodeAddress = await this.nodeConnectionManager.findNode(targetNodeId); @@ -311,7 +315,7 @@ class NodeManager { */ public async getNodeAddress( nodeId: NodeId, - tran?: DBTransaction, + tran: DBTransaction, ): Promise { return (await this.nodeGraph.getNode(nodeId, tran))?.address; } @@ -324,7 +328,7 @@ class NodeManager { */ public async knowsNode( targetNodeId: NodeId, - tran?: DBTransaction, + tran: DBTransaction, ): Promise { return (await this.nodeGraph.getNode(targetNodeId, tran)) != null; } @@ -334,22 +338,68 @@ class NodeManager { */ public async getBucket( bucketIndex: number, - tran?: DBTransaction, + tran: DBTransaction, ): Promise { - return await this.nodeGraph.getBucket(bucketIndex, tran); + return await this.nodeGraph.getBucket( + bucketIndex, + undefined, + undefined, + tran, + ); } /** - * Sets a node in the NodeGraph + * Adds a node to the node graph. + * Updates the node if the node already exists. + * */ public async setNode( nodeId: NodeId, nodeAddress: NodeAddress, - tran?: DBTransaction, + force = false, + tran: DBTransaction, ): Promise { - return await this.nodeGraph.setNode(nodeId, nodeAddress, tran); + // When adding a node we need to handle 3 cases + // 1. The node already exists. We need to update it's last updated field + // 2. The node doesn't exist and bucket has room. + // We need to add the node to the bucket + // 3. The node doesn't exist and the bucket is full. + // We need to ping the oldest node. If the ping succeeds we need to update + // the lastUpdated of the oldest node and drop the new one. If the ping + // fails we delete the old node and add in the new one. + const nodeData = await this.nodeGraph.getNode(nodeId, tran); + // If this is a new entry, check the bucket limit + const [bucketIndex] = this.nodeGraph.bucketIndex(nodeId); + const count = await this.nodeGraph.getBucketMetaProp( + bucketIndex, + 'count', + tran, + ); + if (nodeData != null || count < this.nodeGraph.nodeBucketLimit) { + // Either already exists or has room in the bucket + // We want to add or update the node + await this.nodeGraph.setNode(nodeId, nodeAddress, tran); + } else { + // We want to add a node but the bucket is full + // We need to ping the oldest node + const oldestNodeId = (await this.nodeGraph.getOldestNode( + bucketIndex, + tran, + ))!; + if ((await this.pingNode(oldestNodeId)) && !force) { + // The node responded, we need to update it's info and drop the new node + const oldestNode = (await this.nodeGraph.getNode(oldestNodeId, tran))!; + await this.nodeGraph.setNode(oldestNodeId, oldestNode.address, tran); + } else { + // The node could not be contacted or force was set, + // we drop it in favor of the new node + await this.nodeGraph.unsetNode(oldestNodeId, tran); + await this.nodeGraph.setNode(nodeId, nodeAddress, tran); + } + } } + // FIXME // /** // * Updates the node in the NodeGraph // */ @@ -364,10 +414,11 @@ class NodeManager { /** * Removes a node from the NodeGraph */ - public async unsetNode(nodeId: NodeId, tran?: DBTransaction): Promise { + public async unsetNode(nodeId: NodeId, tran: DBTransaction): Promise { return await this.nodeGraph.unsetNode(nodeId, tran); } + // FIXME // /** // * Gets all buckets from the NodeGraph // */ diff --git a/src/nodes/utils.ts b/src/nodes/utils.ts index 1db8033819..76bb4058a2 100644 --- a/src/nodes/utils.ts +++ b/src/nodes/utils.ts @@ -1,12 +1,9 @@ import type { - NodeData, NodeId, NodeIdEncoded, NodeBucket, - NodeIdString, NodeBucketIndex, } from './types'; -import { utils as dbUtils } from '@matrixai/db'; import { IdInternal } from '@matrixai/id'; import lexi from 'lexicographic-integer'; import { bytes2BigInt, bufferSplit } from '../utils'; diff --git a/tests/acl/ACL.test.ts b/tests/acl/ACL.test.ts index cd06585605..ec4020a1bf 100644 --- a/tests/acl/ACL.test.ts +++ b/tests/acl/ACL.test.ts @@ -11,7 +11,6 @@ import ACL from '@/acl/ACL'; import * as aclErrors from '@/acl/errors'; import * as keysUtils from '@/keys/utils'; import * as vaultsUtils from '@/vaults/utils'; -import * as testUtils from '../utils'; import * as testNodesUtils from '../nodes/utils'; describe(ACL.name, () => { @@ -109,30 +108,18 @@ describe(ACL.name, () => { await expect(acl.setNodesPerm([], {} as Permission)).rejects.toThrow( aclErrors.ErrorACLNotRunning, ); - await expect(acl.setNodesPermOps([], {} as Permission)).rejects.toThrow( - aclErrors.ErrorACLNotRunning, - ); await expect(acl.setNodePerm(nodeIdX, {} as Permission)).rejects.toThrow( aclErrors.ErrorACLNotRunning, ); - await expect(acl.setNodePermOps(nodeIdX, {} as Permission)).rejects.toThrow( - aclErrors.ErrorACLNotRunning, - ); await expect(acl.unsetNodePerm(nodeIdX)).rejects.toThrow( aclErrors.ErrorACLNotRunning, ); - await expect(acl.unsetNodePermOps(nodeIdX)).rejects.toThrow( - aclErrors.ErrorACLNotRunning, - ); await expect(acl.unsetVaultPerms(1 as VaultId)).rejects.toThrow( aclErrors.ErrorACLNotRunning, ); await expect(acl.joinNodePerm(nodeIdX, [])).rejects.toThrow( aclErrors.ErrorACLNotRunning, ); - await expect(acl.joinNodePermOps(nodeIdX, [])).rejects.toThrow( - aclErrors.ErrorACLNotRunning, - ); await expect(acl.joinVaultPerms(1 as VaultId, [])).rejects.toThrow( aclErrors.ErrorACLNotRunning, ); diff --git a/tests/nodes/NodeConnection.test.ts b/tests/nodes/NodeConnection.test.ts index c22475912d..3ce2e71831 100644 --- a/tests/nodes/NodeConnection.test.ts +++ b/tests/nodes/NodeConnection.test.ts @@ -704,7 +704,7 @@ describe('${NodeConnection.name} test', () => { "should call `killSelf and throw if the server %s's during testUnaryFail", async (option) => { let nodeConnection: - | NodeConnection + | NodeConnection | undefined; let testProxy: Proxy | undefined; let testProcess: child_process.ChildProcessWithoutNullStreams | undefined; @@ -749,7 +749,7 @@ describe('${NodeConnection.name} test', () => { targetHost: testProxy.getProxyHost(), targetPort: testProxy.getProxyPort(), clientFactory: (args) => - testGrpcUtils.GRPCClientTest.createGRPCClientTest(args), + grpcTestUtils.GRPCClientTest.createGRPCClientTest(args), }); const client = nodeConnection.getClient(); @@ -774,7 +774,7 @@ describe('${NodeConnection.name} test', () => { "should call `killSelf and throw if the server %s's during testStreamFail", async (option) => { let nodeConnection: - | NodeConnection + | NodeConnection | undefined; let testProxy: Proxy | undefined; let testProcess: child_process.ChildProcessWithoutNullStreams | undefined; @@ -819,7 +819,7 @@ describe('${NodeConnection.name} test', () => { targetHost: testProxy.getProxyHost(), targetPort: testProxy.getProxyPort(), clientFactory: (args) => - testGrpcUtils.GRPCClientTest.createGRPCClientTest(args), + grpcTestUtils.GRPCClientTest.createGRPCClientTest(args), }); const client = nodeConnection.getClient(); diff --git a/tests/nodes/NodeManager.test.ts b/tests/nodes/NodeManager.test.ts index 0ac96ec27a..d74df5c6a2 100644 --- a/tests/nodes/NodeManager.test.ts +++ b/tests/nodes/NodeManager.test.ts @@ -18,6 +18,7 @@ import Sigchain from '@/sigchain/Sigchain'; import * as claimsUtils from '@/claims/utils'; import { promisify, sleep } from '@/utils'; import * as nodesUtils from '@/nodes/utils'; +import * as nodesTestUtils from './utils'; describe(`${NodeManager.name} test`, () => { const password = 'password'; @@ -423,4 +424,184 @@ describe(`${NodeManager.name} test`, () => { expect(chainData).toContain(nodesUtils.encodeNodeId(yNodeId)); }); }); + test('should add a node when bucket has room', async () => { + const nodeManager = new NodeManager({ + db, + sigchain: {} as Sigchain, + keyManager, + nodeGraph, + nodeConnectionManager: {} as NodeConnectionManager, + logger, + }); + const localNodeId = keyManager.getNodeId(); + const bucketIndex = 100; + const nodeId = nodesTestUtils.generateNodeIdForBucket( + localNodeId, + bucketIndex, + ); + await nodeManager.setNode(nodeId, {} as NodeAddress); + + // Checking bucket + const bucket = await nodeManager.getBucket(bucketIndex); + expect(bucket).toHaveLength(1); + }); + test('should update a node if node exists', async () => { + const nodeManager = new NodeManager({ + db, + sigchain: {} as Sigchain, + keyManager, + nodeGraph, + nodeConnectionManager: {} as NodeConnectionManager, + logger, + }); + const localNodeId = keyManager.getNodeId(); + const bucketIndex = 100; + const nodeId = nodesTestUtils.generateNodeIdForBucket( + localNodeId, + bucketIndex, + ); + await nodeManager.setNode(nodeId, { + host: '' as Host, + port: 11111 as Port, + }); + + const nodeData = (await nodeGraph.getNode(nodeId))!; + await sleep(1100); + + // Should update the node + await nodeManager.setNode(nodeId, { + host: '' as Host, + port: 22222 as Port, + }); + + const newNodeData = (await nodeGraph.getNode(nodeId))!; + expect(newNodeData.address.port).not.toEqual(nodeData.address.port); + expect(newNodeData.lastUpdated).not.toEqual(nodeData.lastUpdated); + }); + test('should not add node if bucket is full and old node is alive', async () => { + const nodeManager = new NodeManager({ + db, + sigchain: {} as Sigchain, + keyManager, + nodeGraph, + nodeConnectionManager: {} as NodeConnectionManager, + logger, + }); + const localNodeId = keyManager.getNodeId(); + const bucketIndex = 100; + // Creating 20 nodes in bucket + for (let i = 1; i <= 20; i++) { + const nodeId = nodesTestUtils.generateNodeIdForBucket( + localNodeId, + bucketIndex, + i, + ); + await nodeManager.setNode(nodeId, { port: i } as NodeAddress); + } + const nodeId = nodesTestUtils.generateNodeIdForBucket( + localNodeId, + bucketIndex, + ); + // Mocking ping + const nodeManagerPingMock = jest.spyOn(NodeManager.prototype, 'pingNode'); + nodeManagerPingMock.mockResolvedValue(true); + const oldestNodeId = await nodeGraph.getOldestNode(bucketIndex); + const oldestNode = await nodeGraph.getNode(oldestNodeId!); + // Waiting for a second to tick over + await sleep(1100); + // Adding a new node with bucket full + await nodeManager.setNode(nodeId, { port: 55555 } as NodeAddress); + // Bucket still contains max nodes + const bucket = await nodeManager.getBucket(bucketIndex); + expect(bucket).toHaveLength(nodeGraph.nodeBucketLimit); + // New node was not added + const node = await nodeGraph.getNode(nodeId); + expect(node).toBeUndefined(); + // Oldest node was updated + const oldestNodeNew = await nodeGraph.getNode(oldestNodeId!); + expect(oldestNodeNew!.lastUpdated).not.toEqual(oldestNode!.lastUpdated); + nodeManagerPingMock.mockRestore(); + }); + test('should add node if bucket is full, old node is alive and force is set', async () => { + const nodeManager = new NodeManager({ + db, + sigchain: {} as Sigchain, + keyManager, + nodeGraph, + nodeConnectionManager: {} as NodeConnectionManager, + logger, + }); + const localNodeId = keyManager.getNodeId(); + const bucketIndex = 100; + // Creating 20 nodes in bucket + for (let i = 1; i <= 20; i++) { + const nodeId = nodesTestUtils.generateNodeIdForBucket( + localNodeId, + bucketIndex, + i, + ); + await nodeManager.setNode(nodeId, { port: i } as NodeAddress); + } + const nodeId = nodesTestUtils.generateNodeIdForBucket( + localNodeId, + bucketIndex, + ); + // Mocking ping + const nodeManagerPingMock = jest.spyOn(NodeManager.prototype, 'pingNode'); + nodeManagerPingMock.mockResolvedValue(true); + const oldestNodeId = await nodeGraph.getOldestNode(bucketIndex); + // Adding a new node with bucket full + await nodeManager.setNode(nodeId, { port: 55555 } as NodeAddress, true); + // Bucket still contains max nodes + const bucket = await nodeManager.getBucket(bucketIndex); + expect(bucket).toHaveLength(nodeGraph.nodeBucketLimit); + // New node was added + const node = await nodeGraph.getNode(nodeId); + expect(node).toBeDefined(); + // Oldest node was removed + const oldestNodeNew = await nodeGraph.getNode(oldestNodeId!); + expect(oldestNodeNew).toBeUndefined(); + nodeManagerPingMock.mockRestore(); + }); + test('should add node if bucket is full and old node is dead', async () => { + const nodeManager = new NodeManager({ + db, + sigchain: {} as Sigchain, + keyManager, + nodeGraph, + nodeConnectionManager: {} as NodeConnectionManager, + logger, + }); + const localNodeId = keyManager.getNodeId(); + const bucketIndex = 100; + // Creating 20 nodes in bucket + for (let i = 1; i <= 20; i++) { + const nodeId = nodesTestUtils.generateNodeIdForBucket( + localNodeId, + bucketIndex, + i, + ); + await nodeManager.setNode(nodeId, { port: i } as NodeAddress); + } + const nodeId = nodesTestUtils.generateNodeIdForBucket( + localNodeId, + bucketIndex, + ); + // Mocking ping + const nodeManagerPingMock = jest.spyOn(NodeManager.prototype, 'pingNode'); + nodeManagerPingMock.mockResolvedValue(false); + const oldestNodeId = await nodeGraph.getOldestNode(bucketIndex); + // Adding a new node with bucket full + await nodeManager.setNode(nodeId, { port: 55555 } as NodeAddress, true); + // Bucket still contains max nodes + const bucket = await nodeManager.getBucket(bucketIndex); + expect(bucket).toHaveLength(nodeGraph.nodeBucketLimit); + // New node was added + const node = await nodeGraph.getNode(nodeId); + expect(node).toBeDefined(); + // Oldest node was removed + const oldestNodeNew = await nodeGraph.getNode(oldestNodeId!); + expect(oldestNodeNew).toBeUndefined(); + nodeManagerPingMock.mockRestore(); + }); });