Skip to content

Commit

Permalink
feat: sync probe data to the dashboard db
Browse files Browse the repository at this point in the history
  • Loading branch information
alexey-yarmosh committed Oct 23, 2023
1 parent 4df5e83 commit ce69144
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 14 deletions.
11 changes: 10 additions & 1 deletion config/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@ CREATE TABLE IF NOT EXISTS adopted_probes (
userId VARCHAR(255) NOT NULL,
ip VARCHAR(255) NOT NULL,
uuid VARCHAR(255),
lastSyncDate DATE
lastSyncDate DATE,
status VARCHAR(255),
version VARCHAR(255),
country VARCHAR(255),
city VARCHAR(255),
latitude FLOAT,
longitude FLOAT,
asn INTEGER,
network VARCHAR(255)
);

INSERT IGNORE INTO adopted_probes (id, userId, ip) VALUES ('1', '6191378', '79.205.97.254');
10 changes: 5 additions & 5 deletions docs/geoip.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ New data state:
"ip2location": {
"country": "US",
"city": "Long Beach",
"lat": 33.790287,
"lat": 33.790287,
"long": -118.193769,
"asn": "0002"
},
Expand Down Expand Up @@ -53,7 +53,7 @@ New data state:

### Step 2

Apply city approximation. For every provider we are checking if it's city field value is in the [popular DC cities list](../src/lib/geoip/dc-cities.json).
Apply city approximation. For every provider we are checking if it's city field value is in the [DC cities list](../src/lib/geoip/dc-cities.json).

If value is in the list, then city approximation is not required and city value from provider is kept.

Expand All @@ -77,7 +77,7 @@ New data state:
"ip2location": {
"country": "US",
"city": "Los Angeles", // provider's "Long Beach" city is not in the DC cities list and approximated city is "Los Angeles", so "Los Angeles" is a new city value
"lat": 33.790287,
"lat": 33.790287,
"long": -118.193769,
"asn": "0002"
},
Expand Down Expand Up @@ -132,8 +132,8 @@ New data state:
},
"ip2location": { // group "Los Angeles"
"country": "US",
"city": "Los Angeles",
"lat": 33.790287,
"city": "Los Angeles",
"lat": 33.790287,
"long": -118.193769,
"asn": "0002"
},
Expand Down
64 changes: 56 additions & 8 deletions src/lib/adopted-probes.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import type { Knex } from 'knex';
import Bluebird from 'bluebird';
import _ from 'lodash';

import { scopedLogger } from './logger.js';
import { client } from './sql/client.js';
import { fetchSockets } from './ws/server.js';
import type { Probe } from '../probe/types.js';

const logger = scopedLogger('adopted-probes');

Expand All @@ -13,11 +15,29 @@ type AdoptedProbe = {
ip: string;
uuid: string;
lastSyncDate: string;
status: string;
version: string;
country: string;
city: string;
latitude: number;
longitude: number;
asn: number;
network: string;
}

export class AdoptedProbes {
private connectedIpToUuid: Map<string, string> = new Map();
private connectedIpToProbe: Map<string, Probe> = new Map();
private connectedUuidToIp: Map<string, string> = new Map();
private readonly adoptedFieldToConnectedField = {
status: 'status',
version: 'version',
country: 'location.country',
city: 'location.city',
latitude: 'location.latitude',
longitude: 'location.longitude',
asn: 'location.asn',
network: 'location.network',
};

constructor (
private readonly sql: Knex,
Expand All @@ -34,23 +54,24 @@ export class AdoptedProbes {

async syncDashboardData () {
const allSockets = await this.fetchWsSockets();
this.connectedIpToUuid = new Map(allSockets.map(socket => [ socket.data.probe.ipAddress, socket.data.probe.uuid ]));
this.connectedIpToProbe = new Map(allSockets.map(socket => [ socket.data.probe.ipAddress, socket.data.probe ]));
this.connectedUuidToIp = new Map(allSockets.map(socket => [ socket.data.probe.uuid, socket.data.probe.ipAddress ]));

const adoptedProbes = await this.sql(TABLE_NAME).select<AdoptedProbe[]>('ip', 'uuid', 'lastSyncDate');
const adoptedProbes = await this.sql(TABLE_NAME).select<AdoptedProbe[]>('ip', 'uuid', 'lastSyncDate', ...Object.keys(this.adoptedFieldToConnectedField));
await Bluebird.map(adoptedProbes, ({ ip, uuid }) => this.syncProbeIds(ip, uuid), { concurrency: 8 });
await Bluebird.map(adoptedProbes, adoptedProbe => this.syncProbeData(adoptedProbe), { concurrency: 8 });
await Bluebird.map(adoptedProbes, ({ ip, lastSyncDate }) => this.updateSyncDate(ip, lastSyncDate), { concurrency: 8 });
}

private async syncProbeIds (ip: string, uuid: string) {
const connectedUuid = this.connectedIpToUuid.get(ip);
const connectedProbe = this.connectedIpToProbe.get(ip);

if (connectedUuid && connectedUuid === uuid) { // ip and uuid are synced
if (connectedProbe && connectedProbe.uuid === uuid) { // ip and uuid are synced
return;
}

if (connectedUuid && connectedUuid !== uuid) { // uuid was found, but it is outdated
await this.updateUuid(ip, connectedUuid);
if (connectedProbe && connectedProbe.uuid !== uuid) { // uuid was found, but it is outdated
await this.updateUuid(ip, connectedProbe.uuid);
return;
}

Expand All @@ -61,12 +82,35 @@ export class AdoptedProbes {
}
}

private async syncProbeData (adoptedProbe: AdoptedProbe) {
const connectedProbe = this.connectedIpToProbe.get(adoptedProbe.ip);

if (!connectedProbe) {
return;
}

const updateObject: Record<string, string | number> = {};

Object.entries(this.adoptedFieldToConnectedField).forEach(([ adoptedField, connectedField ]) => {
const adoptedValue = _.get(adoptedProbe, adoptedField) as string | number;
const connectedValue = _.get(connectedProbe, connectedField) as string | number;

if (adoptedValue !== connectedValue) {
updateObject[adoptedField] = connectedValue;
}
});

if (!_.isEmpty(updateObject)) {
await this.updateProbeData(adoptedProbe.ip, updateObject);
}
}

private async updateSyncDate (ip: string, lastSyncDate: string) {
if (this.isToday(lastSyncDate)) { // date is already synced
return;
}

const probeIsConnected = this.connectedIpToUuid.has(ip);
const probeIsConnected = this.connectedIpToProbe.has(ip);

if (probeIsConnected) { // date is old, but probe is connected, therefore updating the sync date
await this.updateLastSyncDate(ip);
Expand All @@ -86,6 +130,10 @@ export class AdoptedProbes {
await this.sql(TABLE_NAME).where({ uuid }).update({ ip });
}

private async updateProbeData (ip: string, updateObject: Record<string, string | number>) {
await this.sql(TABLE_NAME).where({ ip }).update(updateObject);
}

private async updateLastSyncDate (ip: string) {
await this.sql(TABLE_NAME).where({ ip }).update({ lastSyncDate: new Date() });
}
Expand Down

0 comments on commit ce69144

Please sign in to comment.