Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: sync adopted probe data #441

Merged
merged 3 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion config/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@ CREATE TABLE IF NOT EXISTS adopted_probes (
userId VARCHAR(255) NOT NULL,
ip VARCHAR(255) NOT NULL,
uuid VARCHAR(255),
lastSyncDate DATE
lastSyncDate DATE,
status VARCHAR(255),
version VARCHAR(255),
country VARCHAR(255),
city VARCHAR(255),
latitude FLOAT,
longitude FLOAT,
asn INTEGER,
network VARCHAR(255)
);

INSERT IGNORE INTO adopted_probes (id, userId, ip) VALUES ('1', '6191378', '79.205.97.254');
7 changes: 4 additions & 3 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ services:
mariadb:
image: mariadb:10.11.5
environment:
- MARIADB_RANDOM_ROOT_PASSWORD=1
- MARIADB_USER=directus
- MARIADB_PASSWORD=password
- MARIADB_DATABASE=directus
- MARIADB_USER=directus
- MARIADB_PASSWORD=password
- MARIADB_RANDOM_ROOT_PASSWORD=1
ports:
- "3306:3306"
volumes:
Expand Down
10 changes: 5 additions & 5 deletions docs/geoip.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ New data state:
"ip2location": {
"country": "US",
"city": "Long Beach",
"lat": 33.790287,
"lat": 33.790287,
"long": -118.193769,
"asn": "0002"
},
Expand Down Expand Up @@ -53,7 +53,7 @@ New data state:

### Step 2

Apply city approximation. For every provider we are checking if it's city field value is in the [popular DC cities list](../src/lib/geoip/dc-cities.json).
Apply city approximation. For every provider we are checking if it's city field value is in the [DC cities list](../src/lib/geoip/dc-cities.json).

If value is in the list, then city approximation is not required and city value from provider is kept.

Expand All @@ -77,7 +77,7 @@ New data state:
"ip2location": {
"country": "US",
"city": "Los Angeles", // provider's "Long Beach" city is not in the DC cities list and approximated city is "Los Angeles", so "Los Angeles" is a new city value
"lat": 33.790287,
"lat": 33.790287,
"long": -118.193769,
"asn": "0002"
},
Expand Down Expand Up @@ -132,8 +132,8 @@ New data state:
},
"ip2location": { // group "Los Angeles"
"country": "US",
"city": "Los Angeles",
"lat": 33.790287,
"city": "Los Angeles",
"lat": 33.790287,
"long": -118.193769,
"asn": "0002"
},
Expand Down
64 changes: 56 additions & 8 deletions src/lib/adopted-probes.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import type { Knex } from 'knex';
import Bluebird from 'bluebird';
import _ from 'lodash';

import { scopedLogger } from './logger.js';
import { client } from './sql/client.js';
import { fetchSockets } from './ws/server.js';
import type { Probe } from '../probe/types.js';

const logger = scopedLogger('adopted-probes');

Expand All @@ -13,11 +15,29 @@ type AdoptedProbe = {
ip: string;
uuid: string;
lastSyncDate: string;
status: string;
version: string;
country: string;
city: string;
latitude: number;
longitude: number;
asn: number;
network: string;
}

export class AdoptedProbes {
private connectedIpToUuid: Map<string, string> = new Map();
private connectedIpToProbe: Map<string, Probe> = new Map();
private connectedUuidToIp: Map<string, string> = new Map();
private readonly adoptedFieldToConnectedField = {
status: 'status',
version: 'version',
country: 'location.country',
city: 'location.city',
latitude: 'location.latitude',
longitude: 'location.longitude',
asn: 'location.asn',
network: 'location.network',
};

constructor (
private readonly sql: Knex,
Expand All @@ -34,23 +54,24 @@ export class AdoptedProbes {

async syncDashboardData () {
const allSockets = await this.fetchWsSockets();
this.connectedIpToUuid = new Map(allSockets.map(socket => [ socket.data.probe.ipAddress, socket.data.probe.uuid ]));
this.connectedIpToProbe = new Map(allSockets.map(socket => [ socket.data.probe.ipAddress, socket.data.probe ]));
this.connectedUuidToIp = new Map(allSockets.map(socket => [ socket.data.probe.uuid, socket.data.probe.ipAddress ]));

const adoptedProbes = await this.sql(TABLE_NAME).select<AdoptedProbe[]>('ip', 'uuid', 'lastSyncDate');
const adoptedProbes = await this.sql(TABLE_NAME).select<AdoptedProbe[]>('ip', 'uuid', 'lastSyncDate', ...Object.keys(this.adoptedFieldToConnectedField));
await Bluebird.map(adoptedProbes, ({ ip, uuid }) => this.syncProbeIds(ip, uuid), { concurrency: 8 });
await Bluebird.map(adoptedProbes, adoptedProbe => this.syncProbeData(adoptedProbe), { concurrency: 8 });
await Bluebird.map(adoptedProbes, ({ ip, lastSyncDate }) => this.updateSyncDate(ip, lastSyncDate), { concurrency: 8 });
}

private async syncProbeIds (ip: string, uuid: string) {
const connectedUuid = this.connectedIpToUuid.get(ip);
const connectedProbe = this.connectedIpToProbe.get(ip);

if (connectedUuid && connectedUuid === uuid) { // ip and uuid are synced
if (connectedProbe && connectedProbe.uuid === uuid) { // ip and uuid are synced
return;
}

if (connectedUuid && connectedUuid !== uuid) { // uuid was found, but it is outdated
await this.updateUuid(ip, connectedUuid);
if (connectedProbe && connectedProbe.uuid !== uuid) { // uuid was found, but it is outdated
await this.updateUuid(ip, connectedProbe.uuid);
return;
}

Expand All @@ -61,12 +82,35 @@ export class AdoptedProbes {
}
}

private async syncProbeData (adoptedProbe: AdoptedProbe) {
const connectedProbe = this.connectedIpToProbe.get(adoptedProbe.ip);

if (!connectedProbe) {
return;
}

const updateObject: Record<string, string | number> = {};

Object.entries(this.adoptedFieldToConnectedField).forEach(([ adoptedField, connectedField ]) => {
const adoptedValue = _.get(adoptedProbe, adoptedField) as string | number;
const connectedValue = _.get(connectedProbe, connectedField) as string | number;

if (adoptedValue !== connectedValue) {
updateObject[adoptedField] = connectedValue;
}
});

if (!_.isEmpty(updateObject)) {
await this.updateProbeData(adoptedProbe.ip, updateObject);
}
}

private async updateSyncDate (ip: string, lastSyncDate: string) {
if (this.isToday(lastSyncDate)) { // date is already synced
return;
}

const probeIsConnected = this.connectedIpToUuid.has(ip);
const probeIsConnected = this.connectedIpToProbe.has(ip);

if (probeIsConnected) { // date is old, but probe is connected, therefore updating the sync date
await this.updateLastSyncDate(ip);
Expand All @@ -86,6 +130,10 @@ export class AdoptedProbes {
await this.sql(TABLE_NAME).where({ uuid }).update({ ip });
}

private async updateProbeData (ip: string, updateObject: Record<string, string | number>) {
await this.sql(TABLE_NAME).where({ ip }).update(updateObject);
}

private async updateLastSyncDate (ip: string) {
await this.sql(TABLE_NAME).where({ ip }).update({ lastSyncDate: new Date() });
}
Expand Down
107 changes: 103 additions & 4 deletions test/tests/unit/adopted-probes.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ describe('AdoptedProbes', () => {
sandbox.restore();
});

it('startSync method should sync the data and start regular syncs', async () => {
it('syncDashboardData method should sync the data', async () => {
const adoptedProbes = new AdoptedProbes(sqlStub as unknown as Knex, fetchSocketsStub);
selectStub.resolves([{ ip: '1.1.1.1', uuid: '1-1-1-1-1', lastSyncDate: '1970-01-01' }]);

Expand All @@ -39,7 +39,8 @@ describe('AdoptedProbes', () => {
expect(sqlStub.callCount).to.equal(1);
expect(sqlStub.args[0]).deep.equal([ 'adopted_probes' ]);
expect(selectStub.callCount).to.equal(1);
expect(selectStub.args[0]).deep.equal([ 'ip', 'uuid', 'lastSyncDate' ]);

expect(selectStub.args[0]).deep.equal([ 'ip', 'uuid', 'lastSyncDate', 'status', 'version', 'country', 'city', 'latitude', 'longitude', 'asn', 'network' ]);
});

it('class should update uuid if it is wrong', async () => {
Expand Down Expand Up @@ -80,7 +81,7 @@ describe('AdoptedProbes', () => {
expect(deleteStub.callCount).to.equal(0);
});

it('class should do nothing if adopted probe was not found and lastSyncDate > 30 days away', async () => {
it('class should delete adoption if adopted probe was not found and lastSyncDate > 30 days away', async () => {
const adoptedProbes = new AdoptedProbes(sqlStub as unknown as Knex, fetchSocketsStub);
selectStub.resolves([{ ip: '1.1.1.1', uuid: '1-1-1-1-1', lastSyncDate: '1969-11-15' }]);
fetchSocketsStub.resolves([]);
Expand All @@ -91,7 +92,6 @@ describe('AdoptedProbes', () => {
expect(whereStub.args[0]).to.deep.equal([{ ip: '1.1.1.1' }]);
expect(updateStub.callCount).to.equal(0);
expect(deleteStub.callCount).to.equal(1);
expect(deleteStub.callCount).to.equal(1);
});

it('class should do nothing if probe data is actual', async () => {
Expand Down Expand Up @@ -144,4 +144,103 @@ describe('AdoptedProbes', () => {
expect(updateStub.callCount).to.equal(0);
expect(deleteStub.callCount).to.equal(0);
});

it('class should update probe meta info if it is outdated', async () => {
const adoptedProbes = new AdoptedProbes(sqlStub as unknown as Knex, fetchSocketsStub);
selectStub.resolves([{
ip: '1.1.1.1',
uuid: '1-1-1-1-1',
lastSyncDate: '1970-01-01',
status: 'ready',
version: '0.26.0',
country: 'IE',
city: 'Dublin',
latitude: 53.3331,
longitude: -6.2489,
asn: 16509,
network: 'Amazon.com, Inc.',
}]);

fetchSocketsStub.resolves([{
data: {
probe: {
ipAddress: '1.1.1.1',
uuid: '1-1-1-1-1',
status: 'initializing',
version: '0.27.0',
nodeVersion: 'v18.17.0',
location: {
continent: 'EU',
region: 'Northern Europe',
country: 'GB',
city: 'London',
asn: 20473,
latitude: 53.3331,
longitude: -6.2489,
network: 'The Constant Company, LLC',
},
},
},
}]);

await adoptedProbes.syncDashboardData();

expect(whereStub.callCount).to.equal(1);
expect(whereStub.args[0]).to.deep.equal([{ ip: '1.1.1.1' }]);
expect(updateStub.callCount).to.equal(1);

expect(updateStub.args[0]).to.deep.equal([{
status: 'initializing',
version: '0.27.0',
country: 'GB',
city: 'London',
asn: 20473,
network: 'The Constant Company, LLC',
}]);
});

it('class should update probe meta info if it is outdated', async () => {
const adoptedProbes = new AdoptedProbes(sqlStub as unknown as Knex, fetchSocketsStub);
selectStub.resolves([{
ip: '1.1.1.1',
uuid: '1-1-1-1-1',
lastSyncDate: '1970-01-01',
status: 'ready',
version: '0.26.0',
country: 'IE',
city: 'Dublin',
latitude: 53.3331,
longitude: -6.2489,
asn: 16509,
network: 'Amazon.com, Inc.',
}]);

fetchSocketsStub.resolves([{
data: {
probe: {
ipAddress: '1.1.1.1',
uuid: '1-1-1-1-1',
status: 'ready',
version: '0.26.0',
nodeVersion: 'v18.17.0',
location: {
continent: 'EU',
region: 'Northern Europe',
country: 'IE',
city: 'Dublin',
asn: 16509,
latitude: 53.3331,
longitude: -6.2489,
network: 'Amazon.com, Inc.',
},
},
},
}]);

await adoptedProbes.syncDashboardData();

expect(whereStub.callCount).to.equal(0);
expect(updateStub.callCount).to.equal(0);
expect(deleteStub.callCount).to.equal(0);
});
});