Skip to content

Commit

Permalink
feat: sync adopted probe data (#441)
Browse files Browse the repository at this point in the history
* feat: sync probe data to the dashboard db

* test: add unit tests

* feat: add default db to the dev mariadb
  • Loading branch information
alexey-yarmosh authored Oct 25, 2023
1 parent 4df5e83 commit 60b8312
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 21 deletions.
11 changes: 10 additions & 1 deletion config/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@ CREATE TABLE IF NOT EXISTS adopted_probes (
userId VARCHAR(255) NOT NULL,
ip VARCHAR(255) NOT NULL,
uuid VARCHAR(255),
lastSyncDate DATE
lastSyncDate DATE,
status VARCHAR(255),
version VARCHAR(255),
country VARCHAR(255),
city VARCHAR(255),
latitude FLOAT,
longitude FLOAT,
asn INTEGER,
network VARCHAR(255)
);

INSERT IGNORE INTO adopted_probes (id, userId, ip) VALUES ('1', '6191378', '79.205.97.254');
7 changes: 4 additions & 3 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ services:
mariadb:
image: mariadb:10.11.5
environment:
- MARIADB_RANDOM_ROOT_PASSWORD=1
- MARIADB_USER=directus
- MARIADB_PASSWORD=password
- MARIADB_DATABASE=directus
- MARIADB_USER=directus
- MARIADB_PASSWORD=password
- MARIADB_RANDOM_ROOT_PASSWORD=1
ports:
- "3306:3306"
volumes:
Expand Down
10 changes: 5 additions & 5 deletions docs/geoip.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ New data state:
"ip2location": {
"country": "US",
"city": "Long Beach",
"lat": 33.790287,
"lat": 33.790287,
"long": -118.193769,
"asn": "0002"
},
Expand Down Expand Up @@ -53,7 +53,7 @@ New data state:

### Step 2

Apply city approximation. For every provider we are checking if it's city field value is in the [popular DC cities list](../src/lib/geoip/dc-cities.json).
Apply city approximation. For every provider we are checking if it's city field value is in the [DC cities list](../src/lib/geoip/dc-cities.json).

If value is in the list, then city approximation is not required and city value from provider is kept.

Expand All @@ -77,7 +77,7 @@ New data state:
"ip2location": {
"country": "US",
"city": "Los Angeles", // provider's "Long Beach" city is not in the DC cities list and approximated city is "Los Angeles", so "Los Angeles" is a new city value
"lat": 33.790287,
"lat": 33.790287,
"long": -118.193769,
"asn": "0002"
},
Expand Down Expand Up @@ -132,8 +132,8 @@ New data state:
},
"ip2location": { // group "Los Angeles"
"country": "US",
"city": "Los Angeles",
"lat": 33.790287,
"city": "Los Angeles",
"lat": 33.790287,
"long": -118.193769,
"asn": "0002"
},
Expand Down
64 changes: 56 additions & 8 deletions src/lib/adopted-probes.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import type { Knex } from 'knex';
import Bluebird from 'bluebird';
import _ from 'lodash';

import { scopedLogger } from './logger.js';
import { client } from './sql/client.js';
import { fetchSockets } from './ws/server.js';
import type { Probe } from '../probe/types.js';

const logger = scopedLogger('adopted-probes');

Expand All @@ -13,11 +15,29 @@ type AdoptedProbe = {
ip: string;
uuid: string;
lastSyncDate: string;
status: string;
version: string;
country: string;
city: string;
latitude: number;
longitude: number;
asn: number;
network: string;
}

export class AdoptedProbes {
private connectedIpToUuid: Map<string, string> = new Map();
private connectedIpToProbe: Map<string, Probe> = new Map();
private connectedUuidToIp: Map<string, string> = new Map();
private readonly adoptedFieldToConnectedField = {
status: 'status',
version: 'version',
country: 'location.country',
city: 'location.city',
latitude: 'location.latitude',
longitude: 'location.longitude',
asn: 'location.asn',
network: 'location.network',
};

constructor (
private readonly sql: Knex,
Expand All @@ -34,23 +54,24 @@ export class AdoptedProbes {

async syncDashboardData () {
const allSockets = await this.fetchWsSockets();
this.connectedIpToUuid = new Map(allSockets.map(socket => [ socket.data.probe.ipAddress, socket.data.probe.uuid ]));
this.connectedIpToProbe = new Map(allSockets.map(socket => [ socket.data.probe.ipAddress, socket.data.probe ]));
this.connectedUuidToIp = new Map(allSockets.map(socket => [ socket.data.probe.uuid, socket.data.probe.ipAddress ]));

const adoptedProbes = await this.sql(TABLE_NAME).select<AdoptedProbe[]>('ip', 'uuid', 'lastSyncDate');
const adoptedProbes = await this.sql(TABLE_NAME).select<AdoptedProbe[]>('ip', 'uuid', 'lastSyncDate', ...Object.keys(this.adoptedFieldToConnectedField));
await Bluebird.map(adoptedProbes, ({ ip, uuid }) => this.syncProbeIds(ip, uuid), { concurrency: 8 });
await Bluebird.map(adoptedProbes, adoptedProbe => this.syncProbeData(adoptedProbe), { concurrency: 8 });
await Bluebird.map(adoptedProbes, ({ ip, lastSyncDate }) => this.updateSyncDate(ip, lastSyncDate), { concurrency: 8 });
}

private async syncProbeIds (ip: string, uuid: string) {
const connectedUuid = this.connectedIpToUuid.get(ip);
const connectedProbe = this.connectedIpToProbe.get(ip);

if (connectedUuid && connectedUuid === uuid) { // ip and uuid are synced
if (connectedProbe && connectedProbe.uuid === uuid) { // ip and uuid are synced
return;
}

if (connectedUuid && connectedUuid !== uuid) { // uuid was found, but it is outdated
await this.updateUuid(ip, connectedUuid);
if (connectedProbe && connectedProbe.uuid !== uuid) { // uuid was found, but it is outdated
await this.updateUuid(ip, connectedProbe.uuid);
return;
}

Expand All @@ -61,12 +82,35 @@ export class AdoptedProbes {
}
}

private async syncProbeData (adoptedProbe: AdoptedProbe) {
const connectedProbe = this.connectedIpToProbe.get(adoptedProbe.ip);

if (!connectedProbe) {
return;
}

const updateObject: Record<string, string | number> = {};

Object.entries(this.adoptedFieldToConnectedField).forEach(([ adoptedField, connectedField ]) => {
const adoptedValue = _.get(adoptedProbe, adoptedField) as string | number;
const connectedValue = _.get(connectedProbe, connectedField) as string | number;

if (adoptedValue !== connectedValue) {
updateObject[adoptedField] = connectedValue;
}
});

if (!_.isEmpty(updateObject)) {
await this.updateProbeData(adoptedProbe.ip, updateObject);
}
}

private async updateSyncDate (ip: string, lastSyncDate: string) {
if (this.isToday(lastSyncDate)) { // date is already synced
return;
}

const probeIsConnected = this.connectedIpToUuid.has(ip);
const probeIsConnected = this.connectedIpToProbe.has(ip);

if (probeIsConnected) { // date is old, but probe is connected, therefore updating the sync date
await this.updateLastSyncDate(ip);
Expand All @@ -86,6 +130,10 @@ export class AdoptedProbes {
await this.sql(TABLE_NAME).where({ uuid }).update({ ip });
}

private async updateProbeData (ip: string, updateObject: Record<string, string | number>) {
await this.sql(TABLE_NAME).where({ ip }).update(updateObject);
}

private async updateLastSyncDate (ip: string) {
await this.sql(TABLE_NAME).where({ ip }).update({ lastSyncDate: new Date() });
}
Expand Down
107 changes: 103 additions & 4 deletions test/tests/unit/adopted-probes.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ describe('AdoptedProbes', () => {
sandbox.restore();
});

it('startSync method should sync the data and start regular syncs', async () => {
it('syncDashboardData method should sync the data', async () => {
const adoptedProbes = new AdoptedProbes(sqlStub as unknown as Knex, fetchSocketsStub);
selectStub.resolves([{ ip: '1.1.1.1', uuid: '1-1-1-1-1', lastSyncDate: '1970-01-01' }]);

Expand All @@ -39,7 +39,8 @@ describe('AdoptedProbes', () => {
expect(sqlStub.callCount).to.equal(1);
expect(sqlStub.args[0]).deep.equal([ 'adopted_probes' ]);
expect(selectStub.callCount).to.equal(1);
expect(selectStub.args[0]).deep.equal([ 'ip', 'uuid', 'lastSyncDate' ]);

expect(selectStub.args[0]).deep.equal([ 'ip', 'uuid', 'lastSyncDate', 'status', 'version', 'country', 'city', 'latitude', 'longitude', 'asn', 'network' ]);
});

it('class should update uuid if it is wrong', async () => {
Expand Down Expand Up @@ -80,7 +81,7 @@ describe('AdoptedProbes', () => {
expect(deleteStub.callCount).to.equal(0);
});

it('class should do nothing if adopted probe was not found and lastSyncDate > 30 days away', async () => {
it('class should delete adoption if adopted probe was not found and lastSyncDate > 30 days away', async () => {
const adoptedProbes = new AdoptedProbes(sqlStub as unknown as Knex, fetchSocketsStub);
selectStub.resolves([{ ip: '1.1.1.1', uuid: '1-1-1-1-1', lastSyncDate: '1969-11-15' }]);
fetchSocketsStub.resolves([]);
Expand All @@ -91,7 +92,6 @@ describe('AdoptedProbes', () => {
expect(whereStub.args[0]).to.deep.equal([{ ip: '1.1.1.1' }]);
expect(updateStub.callCount).to.equal(0);
expect(deleteStub.callCount).to.equal(1);
expect(deleteStub.callCount).to.equal(1);
});

it('class should do nothing if probe data is actual', async () => {
Expand Down Expand Up @@ -144,4 +144,103 @@ describe('AdoptedProbes', () => {
expect(updateStub.callCount).to.equal(0);
expect(deleteStub.callCount).to.equal(0);
});

it('class should update probe meta info if it is outdated', async () => {
const adoptedProbes = new AdoptedProbes(sqlStub as unknown as Knex, fetchSocketsStub);
selectStub.resolves([{
ip: '1.1.1.1',
uuid: '1-1-1-1-1',
lastSyncDate: '1970-01-01',
status: 'ready',
version: '0.26.0',
country: 'IE',
city: 'Dublin',
latitude: 53.3331,
longitude: -6.2489,
asn: 16509,
network: 'Amazon.com, Inc.',
}]);

fetchSocketsStub.resolves([{
data: {
probe: {
ipAddress: '1.1.1.1',
uuid: '1-1-1-1-1',
status: 'initializing',
version: '0.27.0',
nodeVersion: 'v18.17.0',
location: {
continent: 'EU',
region: 'Northern Europe',
country: 'GB',
city: 'London',
asn: 20473,
latitude: 53.3331,
longitude: -6.2489,
network: 'The Constant Company, LLC',
},
},
},
}]);

await adoptedProbes.syncDashboardData();

expect(whereStub.callCount).to.equal(1);
expect(whereStub.args[0]).to.deep.equal([{ ip: '1.1.1.1' }]);
expect(updateStub.callCount).to.equal(1);

expect(updateStub.args[0]).to.deep.equal([{
status: 'initializing',
version: '0.27.0',
country: 'GB',
city: 'London',
asn: 20473,
network: 'The Constant Company, LLC',
}]);
});

it('class should update probe meta info if it is outdated', async () => {
const adoptedProbes = new AdoptedProbes(sqlStub as unknown as Knex, fetchSocketsStub);
selectStub.resolves([{
ip: '1.1.1.1',
uuid: '1-1-1-1-1',
lastSyncDate: '1970-01-01',
status: 'ready',
version: '0.26.0',
country: 'IE',
city: 'Dublin',
latitude: 53.3331,
longitude: -6.2489,
asn: 16509,
network: 'Amazon.com, Inc.',
}]);

fetchSocketsStub.resolves([{
data: {
probe: {
ipAddress: '1.1.1.1',
uuid: '1-1-1-1-1',
status: 'ready',
version: '0.26.0',
nodeVersion: 'v18.17.0',
location: {
continent: 'EU',
region: 'Northern Europe',
country: 'IE',
city: 'Dublin',
asn: 16509,
latitude: 53.3331,
longitude: -6.2489,
network: 'Amazon.com, Inc.',
},
},
},
}]);

await adoptedProbes.syncDashboardData();

expect(whereStub.callCount).to.equal(0);
expect(updateStub.callCount).to.equal(0);
expect(deleteStub.callCount).to.equal(0);
});
});

0 comments on commit 60b8312

Please sign in to comment.