Skip to content

Commit

Permalink
fix: NodeManager.setNode Properly handles adding new node when buck…
Browse files Browse the repository at this point in the history
…et is full

Logic of adding nodes has been split between `NodeManager` and `NodeGraph`. The `NodeGraph.setNode` just handles adding a node to the bucket where the `NodeManager.setNode` contains the logic of when to add the node

Relates #359
  • Loading branch information
tegefaulkes committed Mar 17, 2022
1 parent 366aa3f commit e9c125f
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 30 deletions.
61 changes: 33 additions & 28 deletions src/nodes/NodeGraph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,12 @@ class NodeGraph {
}
}


/**
* Will add a node to the node graph and increment the bucket count.
* If the node already existed it will be updated.
* @param nodeId NodeId to add to the NodeGraph
* @param nodeAddress Address information to add
*/
@ready(new nodesErrors.ErrorNodeGraphNotRunning())
public async setNode(
nodeId: NodeId,
Expand All @@ -258,34 +263,17 @@ class NodeGraph {
bucketDomain,
nodesUtils.bucketDbKey(nodeId),
);
// If this is a new entry, check the bucket limit
if (nodeData == null) {
const count = await this.getBucketMetaProp(bucketIndex, 'count');
if (count < this.nodeBucketLimit) {
// Increment the bucket count
this.setBucketMetaProp(bucketIndex, 'count', count + 1);
} else {
// Remove the oldest entry in the bucket
const lastUpdatedBucketDb = await this.db.level(
bucketKey,
this.nodeGraphLastUpdatedDb
);
let oldestLastUpdatedKey: Buffer;
let oldestNodeId: NodeId;
for await (const key of lastUpdatedBucketDb.createKeyStream({ limit: 1 })) {
oldestLastUpdatedKey = key as Buffer;
({ nodeId: oldestNodeId } = nodesUtils.parseLastUpdatedBucketDbKey(key as Buffer));
}
await this.db.del(bucketDomain, oldestNodeId!.toBuffer());
await this.db.del(lastUpdatedDomain, oldestLastUpdatedKey!);
}
} else {
// This is an existing entry, so the index entry must be reset
if(nodeData != null){
// If the node already exists we want to remove the old `lastUpdated`
const lastUpdatedKey = nodesUtils.lastUpdatedBucketDbKey(nodeData.lastUpdated, nodeId)
await this.db.del(
lastUpdatedDomain,
lastUpdatedKey
);
} else {
// It didn't exist so we want to increment the bucket count
const count = await this.getBucketMetaProp(bucketIndex, 'count');
await this.setBucketMetaProp(bucketIndex, 'count', count + 1);
}
const lastUpdated = getUnixtime();
await this.db.put(
Expand All @@ -296,15 +284,32 @@ class NodeGraph {
lastUpdated,
}
);
const lastUpdatedKey = nodesUtils.lastUpdatedBucketDbKey(lastUpdated, nodeId)
const newLastUpdatedKey = nodesUtils.lastUpdatedBucketDbKey(lastUpdated, nodeId)
await this.db.put(
lastUpdatedDomain,
lastUpdatedKey,
newLastUpdatedKey,
nodesUtils.bucketDbKey(nodeId),
true
);
}

@ready(new nodesErrors.ErrorNodeGraphNotRunning())
public async getOldestNode(bucketIndex: number ): Promise<NodeId | undefined> {
const bucketKey = nodesUtils.bucketKey(bucketIndex);
// Remove the oldest entry in the bucket
const lastUpdatedBucketDb = await this.db.level(
bucketKey,
this.nodeGraphLastUpdatedDb
);
let oldestLastUpdatedKey: Buffer;
let oldestNodeId: NodeId | undefined;
for await (const key of lastUpdatedBucketDb.createKeyStream({ limit: 1 })) {
oldestLastUpdatedKey = key as Buffer;
({ nodeId: oldestNodeId } = nodesUtils.parseLastUpdatedBucketDbKey(key as Buffer));
}
return oldestNodeId;
}

@ready(new nodesErrors.ErrorNodeGraphNotRunning())
public async unsetNode(nodeId: NodeId): Promise<void> {
const [bucketIndex, bucketKey] = this.bucketIndex(nodeId);
Expand All @@ -316,7 +321,7 @@ class NodeGraph {
);
if (nodeData != null) {
const count = await this.getBucketMetaProp(bucketIndex, 'count');
this.setBucketMetaProp(bucketIndex, 'count', count - 1);
await this.setBucketMetaProp(bucketIndex, 'count', count - 1);
await this.db.del(
bucketDomain,
nodesUtils.bucketDbKey(nodeId)
Expand Down Expand Up @@ -750,7 +755,7 @@ class NodeGraph {
* The bucket key is the string encoded version of bucket index
* that preserves lexicographic order
*/
protected bucketIndex(nodeId: NodeId): [NodeBucketIndex, string] {
public bucketIndex(nodeId: NodeId): [NodeBucketIndex, string] {
const nodeIdOwn = this.keyManager.getNodeId();
if (nodeId.equals(nodeIdOwn)) {
throw new nodesErrors.ErrorNodeGraphSameNodeId();
Expand Down
43 changes: 41 additions & 2 deletions src/nodes/NodeManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import * as networkErrors from '../network/errors';
import * as networkUtils from '../network/utils';
import * as sigchainUtils from '../sigchain/utils';
import * as claimsUtils from '../claims/utils';
import { NodeData } from '../nodes/types';
import { getUnixtime } from '@/utils';

class NodeManager {
protected db: DB;
Expand Down Expand Up @@ -53,6 +55,10 @@ class NodeManager {
* Determines whether a node in the Polykey network is online.
* @return true if online, false if offline
*/
// FIXME: We shouldn't be trying to find the node just to ping it
// since we are usually pinging it during the find procedure anyway.
// I think we should be providing the address of what we're trying to ping,
// possibly make it an optional parameter?
public async pingNode(targetNodeId: NodeId): Promise<boolean> {
const targetAddress: NodeAddress =
await this.nodeConnectionManager.findNode(targetNodeId);
Expand Down Expand Up @@ -326,13 +332,46 @@ class NodeManager {
}

/**
* Sets a node in the NodeGraph
* Adds a node to the node graph.
* Updates the node if the node already exists.
*
*/
public async setNode(
nodeId: NodeId,
nodeAddress: NodeAddress,
force = false,
): Promise<void> {
return await this.nodeGraph.setNode(nodeId, nodeAddress);
// When adding a node we need to handle 3 cases
// 1. The node already exists. We need to update it's last updated field
// 2. The node doesn't exist and bucket has room.
// We need to add the node to the bucket
// 3. The node doesn't exist and the bucket is full.
// We need to ping the oldest node. If the ping succeeds we need to update
// the lastUpdated of the oldest node and drop the new one. If the ping
// fails we delete the old node and add in the new one.
const nodeData = await this.nodeGraph.getNode(nodeId);
// If this is a new entry, check the bucket limit
const [bucketIndex, ] = this.nodeGraph.bucketIndex(nodeId)
const count = await this.nodeGraph.getBucketMetaProp(bucketIndex, 'count');
if (nodeData == null && count < this.nodeGraph.nodeBucketLimit) {
// We want to add a node but the bucket is full
// We need to ping the oldest node
const oldestNodeId = (await this.nodeGraph.getOldestNode(bucketIndex))!;
if (await this.pingNode(oldestNodeId) && !force){
// The node responded, we need to update it's info and drop the new node
const oldestNode = (await this.nodeGraph.getNode(oldestNodeId))!;
await this.nodeGraph.setNode(oldestNodeId, oldestNode.address);
} else {
// The node could not be contacted or force was set,
// we drop it in favor of the new node
await this.nodeGraph.unsetNode(oldestNodeId);
await this.nodeGraph.setNode(nodeId, nodeAddress);
}
} else {
// Either already exists or has room in the bucket
// We want to add or update the node
await this.nodeGraph.setNode(nodeId, nodeAddress);
}
}

/**
Expand Down

0 comments on commit e9c125f

Please sign in to comment.