Skip to content

Commit

Permalink
feat: implemented no activity timers and queuing for refreshBucket
Browse files Browse the repository at this point in the history
Added queuing for `refreshBucket`. This means that buckets will be refreshed one at a time sequentially. This is to avoid doing a lot of costly refreshing all at once.

Added no activity for buckets. If a bucket hasn't been touched for a while, 1 hour by default, it will add a refresh bucket operation to the queue. Timers are disabled for buckets already in the queue. Only 1 timer is used for all buckets since only one of them can have the shortest timer and that's all we really care about.

#345
  • Loading branch information
tegefaulkes authored and emmacasolin committed Jun 14, 2022
1 parent b4fbc0b commit 077195f
Show file tree
Hide file tree
Showing 3 changed files with 278 additions and 9 deletions.
168 changes: 165 additions & 3 deletions src/nodes/NodeManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import type {
} from '../nodes/types';
import type { ClaimEncoded } from '../claims/types';
import type { Timer } from '../types';
import type { PromiseType } from '../utils/utils';
import Logger from '@matrixai/logger';
import { StartStop, ready } from '@matrixai/async-init/dist/StartStop';
import { IdInternal } from '@matrixai/id';
import * as nodesErrors from './errors';
import * as nodesUtils from './utils';
import * as networkUtils from '../network/utils';
Expand All @@ -24,7 +24,7 @@ import * as utilsPB from '../proto/js/polykey/v1/utils/utils_pb';
import * as claimsErrors from '../claims/errors';
import * as sigchainUtils from '../sigchain/utils';
import * as claimsUtils from '../claims/utils';
import { timerStart } from '../utils/utils';
import { promise, timerStart } from '../utils/utils';

interface NodeManager extends StartStop {}
@StartStop()
Expand All @@ -47,20 +47,32 @@ class NodeManager {
protected setNodeQueueRunner: Promise<void>;
protected setNodeQueueEmpty: Promise<void>;
protected setNodeQueueDrained: () => void;
// Refresh bucket timer
protected refreshBucketDeadlineMap: Map<NodeBucketIndex, number> = new Map();
protected refreshBucketTimer: NodeJS.Timer;
protected refreshBucketNext: NodeBucketIndex;
public readonly refreshBucketTimerDefault;
protected refreshBucketQueue: Set<NodeBucketIndex> = new Set();
protected refreshBucketQueueRunning: boolean = false;
protected refreshBucketQueueRunner: Promise<void>;
protected refreshBucketQueuePlug_: PromiseType<void>;
protected refreshBucketQueueDrained_: PromiseType<void>;

constructor({
db,
keyManager,
sigchain,
nodeConnectionManager,
nodeGraph,
refreshBucketTimerDefault = 3600000, // 1 hour in milliseconds
logger,
}: {
db: DB;
keyManager: KeyManager;
sigchain: Sigchain;
nodeConnectionManager: NodeConnectionManager;
nodeGraph: NodeGraph;
refreshBucketTimerDefault?: number;
logger?: Logger;
}) {
this.logger = logger ?? new Logger(this.constructor.name);
Expand All @@ -69,17 +81,22 @@ class NodeManager {
this.sigchain = sigchain;
this.nodeConnectionManager = nodeConnectionManager;
this.nodeGraph = nodeGraph;
this.refreshBucketTimerDefault = refreshBucketTimerDefault;
}

public async start() {
this.logger.info(`Starting ${this.constructor.name}`);
this.setNodeQueueRunner = this.startSetNodeQueue();
this.startRefreshBucketTimers();
this.refreshBucketQueueRunner = this.startRefreshBucketQueue();
this.logger.info(`Started ${this.constructor.name}`);
}

public async stop() {
this.logger.info(`Stopping ${this.constructor.name}`);
await this.stopSetNodeQueue();
await this.stopRefreshBucketTimers();
await this.stopRefreshBucketQueue();
this.logger.info(`Stopped ${this.constructor.name}`);
}

Expand Down Expand Up @@ -419,6 +436,8 @@ class NodeManager {
// Either already exists or has room in the bucket
// We want to add or update the node
await this.nodeGraph.setNode(nodeId, nodeAddress, tran);
// Updating the refreshBucket timer
this.refreshBucketUpdateDeadline(bucketIndex);
} else {
// We want to add a node but the bucket is full
// We need to ping the oldest node
Expand All @@ -434,6 +453,8 @@ class NodeManager {
);
await this.nodeGraph.unsetNode(oldNodeId, tran);
await this.nodeGraph.setNode(nodeId, nodeAddress, tran);
// Updating the refreshBucket timer
this.refreshBucketUpdateDeadline(bucketIndex);
return;
}
if (blocking) {
Expand Down Expand Up @@ -491,6 +512,8 @@ class NodeManager {
);
const node = (await this.nodeGraph.getNode(nodeId))!;
await this.nodeGraph.setNode(nodeId, node.address);
// Updating the refreshBucket timer
this.refreshBucketUpdateDeadline(bucketIndex);
} else {
this.logger.debug(`Ping failed for ${nodesUtils.encodeNodeId(nodeId)}`);
// Otherwise we remove the node
Expand All @@ -502,6 +525,8 @@ class NodeManager {
if (count < this.nodeGraph.nodeBucketLimit) {
this.logger.debug(`Bucket ${bucketIndex} now has room, adding new node`);
await this.nodeGraph.setNode(nodeId, nodeAddress);
// Updating the refreshBucket timer
this.refreshBucketUpdateDeadline(bucketIndex);
}
}

Expand Down Expand Up @@ -623,7 +648,7 @@ class NodeManager {
* nodes.
* @param bucketIndex
*/
private async refreshBucket(bucketIndex: NodeBucketIndex) {
public async refreshBucket(bucketIndex: NodeBucketIndex) {
// We need to generate a random nodeId for this bucket
const nodeId = this.keyManager.getNodeId();
const bucketRandomNodeId = nodesUtils.generateRandomNodeIdForBucket(
Expand All @@ -633,6 +658,143 @@ class NodeManager {
// We then need to start a findNode procedure
await this.nodeConnectionManager.findNode(bucketRandomNodeId);
}

// Refresh bucket activity timer methods

private startRefreshBucketTimers() {
// Setting initial bucket to refresh
this.refreshBucketNext = 0;
// Setting initial deadline
this.refreshBucketTimerReset(this.refreshBucketTimerDefault);

for (
let bucketIndex = 0;
bucketIndex < this.nodeGraph.nodeIdBits;
bucketIndex++
) {
const deadline = Date.now() + this.refreshBucketTimerDefault;
this.refreshBucketDeadlineMap.set(bucketIndex, deadline);
}
}

private async stopRefreshBucketTimers() {
clearTimeout(this.refreshBucketTimer);
}

private refreshBucketTimerReset(timeout: number) {
clearTimeout(this.refreshBucketTimer);
this.refreshBucketTimer = setTimeout(() => {
this.refreshBucketRefreshTimer();
}, timeout);
}

public refreshBucketUpdateDeadline(bucketIndex: NodeBucketIndex) {
// Update the map deadline
this.refreshBucketDeadlineMap.set(
bucketIndex,
Date.now() + this.refreshBucketTimerDefault,
);
// If the bucket was pending a refresh we remove it
this.refreshBucketQueueRemove(bucketIndex);
if (bucketIndex === this.refreshBucketNext) {
// Bucket is same as next bucket, this affects the timer
this.refreshBucketRefreshTimer();
}
}

private refreshBucketRefreshTimer() {
// Getting new closest deadline
let closestBucket = this.refreshBucketNext;
let closestDeadline = Date.now() + this.refreshBucketTimerDefault;
const now = Date.now();
for (const [bucketIndex, deadline] of this.refreshBucketDeadlineMap) {
// Skip any queued buckets marked by 0 deadline
if (deadline === 0) continue;
if (deadline <= now) {
// Deadline for this has already passed, we add it to the queue
this.refreshBucketQueueAdd(bucketIndex);
continue;
}
if (deadline < closestDeadline) {
closestBucket = bucketIndex;
closestDeadline = deadline;
}
}
// Working out time left
const timeout = closestDeadline - Date.now();
this.logger.debug(
`Refreshing refreshBucket timer with new timeout ${timeout}`,
);
// Updating timer and next
this.refreshBucketNext = closestBucket;
this.refreshBucketTimerReset(timeout);
}

// Refresh bucket async queue methods

public refreshBucketQueueAdd(bucketIndex: NodeBucketIndex) {
this.logger.debug(`Adding bucket ${bucketIndex} to queue`);
this.refreshBucketDeadlineMap.set(bucketIndex, 0);
this.refreshBucketQueue.add(bucketIndex);
this.refreshBucketQueueUnplug();
}

public refreshBucketQueueRemove(bucketIndex: NodeBucketIndex) {
this.logger.debug(`Removing bucket ${bucketIndex} from queue`);
this.refreshBucketQueue.delete(bucketIndex);
}

public async refreshBucketQueueDrained() {
await this.refreshBucketQueueDrained_.p;
}

private async startRefreshBucketQueue(): Promise<void> {
this.refreshBucketQueueRunning = true;
this.refreshBucketQueuePlug();
let iterator: IterableIterator<NodeBucketIndex> | undefined;
const pace = async () => {
// Wait for plug
await this.refreshBucketQueuePlug_.p;
if (iterator == null) {
iterator = this.refreshBucketQueue[Symbol.iterator]();
}
return this.refreshBucketQueueRunning;
};
while (await pace()) {
const bucketIndex: NodeBucketIndex = iterator?.next().value;
if (bucketIndex == null) {
// Iterator is empty, plug and continue
iterator = undefined;
this.refreshBucketQueuePlug();
continue;
}
// Do the job
this.logger.debug(
`processing refreshBucket for bucket ${bucketIndex}, ${this.refreshBucketQueue.size} left in queue`,
);
await this.refreshBucket(bucketIndex);
// Remove from queue and update bucket deadline
this.refreshBucketQueue.delete(bucketIndex);
this.refreshBucketUpdateDeadline(bucketIndex);
}
this.logger.debug('startRefreshBucketQueue has ended');
}

private async stopRefreshBucketQueue(): Promise<void> {
// Flag end and await queue finish
this.refreshBucketQueueRunning = false;
this.refreshBucketQueueUnplug();
}

private refreshBucketQueuePlug() {
this.refreshBucketQueuePlug_ = promise();
this.refreshBucketQueueDrained_?.resolveP();
}

private refreshBucketQueueUnplug() {
this.refreshBucketQueueDrained_ = promise();
this.refreshBucketQueuePlug_?.resolveP();
}
}

export default NodeManager;
12 changes: 7 additions & 5 deletions src/utils/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,14 +170,16 @@ function promisify<
};
}

/**
* Deconstructed promise
*/
function promise<T>(): {
export type PromiseType<T> = {
p: Promise<T>;
resolveP: (value: T | PromiseLike<T>) => void;
rejectP: (reason?: any) => void;
} {
};

/**
* Deconstructed promise
*/
function promise<T>(): PromiseType<T> {
let resolveP, rejectP;
const p = new Promise<T>((resolve, reject) => {
resolveP = resolve;
Expand Down
Loading

0 comments on commit 077195f

Please sign in to comment.