Skip to content

Commit 986e7f0

Browse files
feat: periodic ip limit check (#513)
Co-authored-by: Martin Kolárik <martin@kolarik.sk>
1 parent f7f49a6 commit 986e7f0

File tree

5 files changed

+139
-13
lines changed

5 files changed

+139
-13
lines changed

src/lib/server.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import type { Server } from 'node:http';
22
import { initRedisClient } from './redis/client.js';
3-
import { adoptedProbes, initWsServer } from './ws/server.js';
3+
import { adoptedProbes, probeIpLimit, initWsServer } from './ws/server.js';
44
import { getMetricsAgent } from './metrics.js';
55
import { populateMemList as populateMemMalwareList } from './malware/client.js';
66
import { populateMemList as populateMemIpRangesList } from './ip-ranges.js';
@@ -33,6 +33,8 @@ export const createServer = async (): Promise<Server> => {
3333
await auth.syncTokens();
3434
auth.scheduleSync();
3535

36+
probeIpLimit.scheduleSync();
37+
3638
reconnectProbes();
3739

3840
const { getWsServer } = await import('./ws/server.js');

src/lib/ws/helper/probe-ip-limit.ts

Lines changed: 61 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,70 @@
1-
import { fetchProbes } from '../server.js';
1+
import _ from 'lodash';
2+
import config from 'config';
3+
import type { fetchProbes as serverFetchProbes, fetchRawSockets as serverFetchRawSockets } from '../server.js';
24
import { scopedLogger } from '../../logger.js';
35
import { ProbeError } from '../../probe-error.js';
46

7+
const numberOfProcesses = config.get<number>('server.processes');
8+
59
const logger = scopedLogger('ws:limit');
610

7-
export const verifyIpLimit = async (ip: string, socketId: string): Promise<void> => {
8-
if (process.env['FAKE_PROBE_IP'] || process.env['TEST_MODE'] === 'unit') {
9-
return;
11+
export class ProbeIpLimit {
12+
private timer: NodeJS.Timeout | undefined;
13+
14+
constructor (
15+
private readonly fetchProbes: typeof serverFetchProbes,
16+
private readonly fetchRawSockets: typeof serverFetchRawSockets,
17+
) {}
18+
19+
scheduleSync () {
20+
clearTimeout(this.timer);
21+
22+
this.timer = setTimeout(() => {
23+
this.syncIpLimit()
24+
.finally(() => this.scheduleSync())
25+
.catch(error => logger.error(error));
26+
}, 60_000 * 2 * Math.random() * numberOfProcesses).unref();
27+
}
28+
29+
async syncIpLimit () {
30+
const probes = await this.fetchProbes();
31+
// Sorting probes by "client" (socket id), so all workers will treat the same probe as "first".
32+
const sortedProbes = _.sortBy(probes, [ 'client' ]);
33+
34+
const uniqIpToSocketId = new Map<string, string>();
35+
const socketIdsToDisconnect = new Set<string>();
36+
37+
for (const probe of sortedProbes) {
38+
const prevSocketId = uniqIpToSocketId.get(probe.ipAddress);
39+
40+
if (prevSocketId && prevSocketId !== probe.client) {
41+
logger.warn(`Probe ip duplication occurred (${probe.ipAddress}). Socket id to preserve: ${prevSocketId}, socket id to disconnect: ${probe.client}`);
42+
socketIdsToDisconnect.add(probe.client);
43+
} else {
44+
uniqIpToSocketId.set(probe.ipAddress, probe.client);
45+
}
46+
}
47+
48+
if (socketIdsToDisconnect.size > 0) {
49+
const sockets = await this.fetchRawSockets();
50+
sockets
51+
.filter(socket => socketIdsToDisconnect.has(socket.id))
52+
.forEach(socket => socket.disconnect());
53+
}
1054
}
1155

12-
const probes = await fetchProbes({ allowStale: false });
13-
const previousSocket = probes.find(p => p.ipAddress === ip && p.client !== socketId);
56+
async verifyIpLimit (ip: string, socketId: string): Promise<void> {
57+
if (process.env['FAKE_PROBE_IP'] || process.env['TEST_MODE'] === 'unit') {
58+
return;
59+
}
60+
61+
const probes = await this.fetchProbes({ allowStale: false });
62+
const previousProbe = probes.find(p => p.ipAddress === ip && p.client !== socketId);
1463

15-
if (previousSocket) {
16-
logger.info(`ws client ${socketId} has reached the concurrent IP limit.`, { message: previousSocket.ipAddress });
17-
throw new ProbeError('ip limit');
64+
if (previousProbe) {
65+
logger.info(`ws client ${socketId} has reached the concurrent IP limit.`, { message: previousProbe.ipAddress });
66+
throw new ProbeError('ip limit');
67+
}
1868
}
19-
};
69+
}
70+

src/lib/ws/server.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { getRedisClient } from '../redis/client.js';
77
import { SyncedProbeList } from './synced-probe-list.js';
88
import { client } from '../sql/client.js';
99
import { AdoptedProbes } from '../adopted-probes.js';
10+
import { ProbeIpLimit } from './helper/probe-ip-limit.js';
1011

1112
export type SocketData = {
1213
probe: Probe;
@@ -90,3 +91,5 @@ export const fetchRawProbes = async (): Promise<Probe[]> => {
9091
};
9192

9293
export const adoptedProbes = new AdoptedProbes(client, fetchRawProbes);
94+
95+
export const probeIpLimit = new ProbeIpLimit(fetchProbes, fetchRawSockets);

src/probe/builder.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import type GeoipClient from '../lib/geoip/client.js';
1717
import getProbeIp from '../lib/get-probe-ip.js';
1818
import { getRegion } from '../lib/ip-ranges.js';
1919
import type { Probe, ProbeLocation, Tag } from './types.js';
20-
import { verifyIpLimit } from '../lib/ws/helper/probe-ip-limit.js';
20+
import { probeIpLimit } from '../lib/ws/server.js';
2121
import { fakeLookup } from '../lib/geoip/fake-client.js';
2222

2323
let geoipClient: GeoipClient;
@@ -62,7 +62,7 @@ export const buildProbe = async (socket: Socket): Promise<Probe> => {
6262
throw new Error(`couldn't detect probe location for ip ${ip}`);
6363
}
6464

65-
await verifyIpLimit(ip, socket.id);
65+
await probeIpLimit.verifyIpLimit(ip, socket.id);
6666

6767
const location = getLocation(ipInfo);
6868

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import * as sinon from 'sinon';
2+
import { expect } from 'chai';
3+
4+
import { ProbeIpLimit } from '../../../../src/lib/ws/helper/probe-ip-limit.js';
5+
6+
describe('ProbeIpLimit', () => {
7+
const sandbox = sinon.createSandbox();
8+
const fetchProbes = sandbox.stub();
9+
const fetchRawSockets = sandbox.stub();
10+
11+
const getSocket = (id: string, ip: string) => ({
12+
id,
13+
data: { probe: { client: id, ipAddress: ip } },
14+
disconnect: sandbox.stub(),
15+
});
16+
17+
it('syncIpLimit should disconnect duplicates', async () => {
18+
const socket1 = getSocket('a', '1.1.1.1');
19+
const socket2 = getSocket('b', '2.2.2.2');
20+
const duplicate = getSocket('c', '2.2.2.2');
21+
22+
fetchProbes.resolves([
23+
socket1.data.probe,
24+
socket2.data.probe,
25+
duplicate.data.probe,
26+
]);
27+
28+
fetchRawSockets.resolves([
29+
socket1,
30+
socket2,
31+
duplicate,
32+
]);
33+
34+
const probeIpLimit = new ProbeIpLimit(fetchProbes, fetchRawSockets);
35+
await probeIpLimit.syncIpLimit();
36+
37+
expect(socket1.disconnect.callCount).to.equal(0);
38+
expect(socket2.disconnect.callCount).to.equal(0);
39+
expect(duplicate.disconnect.callCount).to.equal(1);
40+
});
41+
42+
it('syncIpLimit should preserve socket with the earliest id', async () => {
43+
const socket1 = getSocket('a', '1.1.1.1');
44+
const socket2 = getSocket('b', '2.2.2.2');
45+
const duplicate1 = getSocket('c', '2.2.2.2');
46+
const duplicate2 = getSocket('d', '2.2.2.2');
47+
48+
fetchProbes.resolves([
49+
socket1.data.probe,
50+
duplicate1.data.probe,
51+
socket2.data.probe,
52+
duplicate2.data.probe,
53+
]);
54+
55+
fetchRawSockets.resolves([
56+
socket1,
57+
duplicate1,
58+
socket2,
59+
duplicate2,
60+
]);
61+
62+
const probeIpLimit = new ProbeIpLimit(fetchProbes, fetchRawSockets);
63+
await probeIpLimit.syncIpLimit();
64+
65+
expect(socket1.disconnect.callCount).to.equal(0);
66+
expect(socket2.disconnect.callCount).to.equal(0);
67+
expect(duplicate1.disconnect.callCount).to.equal(1);
68+
expect(duplicate2.disconnect.callCount).to.equal(1);
69+
});
70+
});

0 commit comments

Comments
 (0)