Skip to content

Commit

Permalink
feat: replace magic with id in locations field with actual location
Browse files Browse the repository at this point in the history
  • Loading branch information
alexey-yarmosh committed Dec 8, 2023
1 parent e649720 commit c5e0a35
Show file tree
Hide file tree
Showing 8 changed files with 255 additions and 172 deletions.
6 changes: 3 additions & 3 deletions src/measurement/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import type { Probe } from '../probe/types.js';
import { getMetricsAgent, type MetricsAgent } from '../lib/metrics.js';
import type { MeasurementStore } from './store.js';
import { getMeasurementStore } from './store.js';
import type { MeasurementRequest, MeasurementResultMessage, MeasurementProgressMessage } from './types.js';
import type { MeasurementRequest, MeasurementResultMessage, MeasurementProgressMessage, UserRequest } from './types.js';
import { rateLimit } from '../lib/ratelimiter.js';

export class MeasurementRunner {
Expand All @@ -21,8 +21,8 @@ export class MeasurementRunner {
) {}

async run (ctx: Context): Promise<{measurementId: string; probesCount: number;}> {
const request = ctx.request.body as MeasurementRequest;
const { onlineProbesMap, allProbes } = await this.router.findMatchingProbes(request.locations, request.limit);
const userRequest = ctx.request.body as UserRequest;
const { onlineProbesMap, allProbes, request } = await this.router.findMatchingProbes(userRequest);

if (allProbes.length === 0) {
throw createHttpError(422, 'No suitable probes found.', { type: 'no_probes_found' });
Expand Down
8 changes: 7 additions & 1 deletion src/measurement/schema/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,13 @@ export const globalIpOptions: {version: string[]; cidr: PresenceMode} = { versio

export const GLOBAL_DEFAULTS = {
locations: [],
limit: (request: MeasurementRequest) => request.locations.length || 1,
limit: (request: MeasurementRequest) => {
if (typeof request.locations === 'string') {
return 1;
}

return request.locations?.length || 1;
},
inProgressUpdates: false,
};

Expand Down
22 changes: 3 additions & 19 deletions src/measurement/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import type { OfflineProbe, Probe } from '../probe/types.js';
import type { RedisClient } from '../lib/redis/client.js';
import { getRedisClient } from '../lib/redis/client.js';
import { scopedLogger } from '../lib/logger.js';
import type { MeasurementRecord, MeasurementResult, MeasurementRequest, MeasurementProgressMessage, RequestType, MeasurementResultMessage, LocationWithLimit } from './types.js';
import type { MeasurementRecord, MeasurementResult, MeasurementRequest, MeasurementProgressMessage, RequestType, MeasurementResultMessage } from './types.js';
import { getDefaults } from './schema/utils.js';

const logger = scopedLogger('store');
Expand Down Expand Up @@ -57,18 +57,6 @@ export class MeasurementStore {
return ips || [];
}

async getLimitAndLocations (id: string) {
const response = await this.redis.json.get(getMeasurementKey(id), { path: [ '$.limit', '$.locations' ] }) as {
'$.limit': number[];
'$.locations': LocationWithLimit[][];
} | null;

return {
limit: response ? response['$.limit'][0] : undefined,
locations: response ? response['$.locations'][0] : undefined,
};
}

async createMeasurement (request: MeasurementRequest, onlineProbesMap: Map<number, Probe>, allProbes: (Probe | OfflineProbe)[]): Promise<string> {
const id = cryptoRandomString({ length: 16, type: 'alphanumeric' });
const key = getMeasurementKey(id);
Expand All @@ -92,20 +80,16 @@ export class MeasurementStore {
async getInitialMeasurement (id: string, request: MeasurementRequest, allProbes: (Probe | OfflineProbe)[], startTime: Date) {
const results = this.probesToResults(allProbes, request.type);

const { limit, locations } = typeof request.locations === 'string'
? await this.getLimitAndLocations(request.locations)
: { limit: request.limit, locations: request.locations };

const measurement: Partial<MeasurementRecord> = {
id,
type: request.type,
status: 'in-progress',
createdAt: startTime.toISOString(),
updatedAt: startTime.toISOString(),
target: request.target,
...(limit && { limit }),
...(request.limit && { limit: request.limit }),
probesCount: allProbes.length,
...(locations && { locations }),
...(request.locations && { locations: request.locations }),
measurementOptions: request.measurementOptions,
results,
};
Expand Down
9 changes: 7 additions & 2 deletions src/measurement/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,17 @@ export type LocationWithLimit = Location & {limit?: number};
* Measurement Objects
*/

export type UserRequest = Omit<MeasurementRequest, 'locations' | 'limit'> & {
locations: LocationWithLimit[] | string;
limit: number;
}

export type MeasurementRequest = {
type: 'ping' | 'traceroute' | 'dns' | 'http' | 'mtr';
target: string;
measurementOptions: MeasurementOptions;
locations: LocationWithLimit[] | string;
limit: number;
locations: LocationWithLimit[] | undefined;
limit: number | undefined;
inProgressUpdates: boolean;
};

Expand Down
58 changes: 32 additions & 26 deletions src/probe/router.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import _ from 'lodash';
import { fetchSockets } from '../lib/ws/fetch-sockets.js';
import type { LocationWithLimit, MeasurementRecord, MeasurementResult } from '../measurement/types.js';
import type { LocationWithLimit, MeasurementRequest, MeasurementResult, UserRequest } from '../measurement/types.js';
import type { Location } from '../lib/location/types.js';
import type { OfflineProbe, Probe } from './types.js';
import { ProbesLocationFilter } from './probes-location-filter.js';
Expand All @@ -15,38 +15,43 @@ export class ProbeRouter {
private readonly store: MeasurementStore,
) {}

public async findMatchingProbes (
locations: LocationWithLimit[] | string = [],
globalLimit = 1,
) {
public async findMatchingProbes (userRequest: UserRequest): Promise<{
onlineProbesMap: Map<number, Probe>;
allProbes: (Probe | OfflineProbe)[];
request: MeasurementRequest;
}> {
const locations = userRequest.locations ?? [];
const globalLimit = userRequest.limit ?? 1;

const connectedProbes = await this.fetchProbes();

if (typeof locations === 'string') {
return this.findWithMeasurementId(connectedProbes, locations);
return this.findWithMeasurementId(connectedProbes, locations, userRequest);
}

if (locations.some(l => l.limit)) {
const filtered = this.findWithLocationLimit(connectedProbes, locations);
return this.processFiltered(filtered, connectedProbes, locations);
return this.processFiltered(filtered, connectedProbes, locations, userRequest);
}

if (locations.length > 0) {
const filtered = this.findWithGlobalLimit(connectedProbes, locations, globalLimit);
return this.processFiltered(filtered, connectedProbes, locations);
return this.processFiltered(filtered, connectedProbes, locations, userRequest);
}

const filtered = this.findGloballyDistributed(connectedProbes, globalLimit);
return this.processFiltered(filtered, connectedProbes, locations);
return this.processFiltered(filtered, connectedProbes, locations, userRequest);
}

private async processFiltered (filtered: Probe[], connectedProbes: Probe[], locations: LocationWithLimit[]) {
private async processFiltered (filtered: Probe[], connectedProbes: Probe[], locations: LocationWithLimit[], request: UserRequest) {
if (filtered.length === 0 && locations.length === 1 && locations[0]?.magic) {
return this.findWithMeasurementId(connectedProbes, locations[0].magic);
return this.findWithMeasurementId(connectedProbes, locations[0].magic, request);
}

return {
allProbes: filtered,
onlineProbesMap: new Map(filtered.entries()),
request: request as MeasurementRequest,
};
}

Expand Down Expand Up @@ -89,19 +94,28 @@ export class ProbeRouter {
return [ ...picked ];
}

private async findWithMeasurementId (connectedProbes: Probe[], measurementId: string) {
private async findWithMeasurementId (connectedProbes: Probe[], measurementId: string, userRequest: UserRequest): Promise<{
onlineProbesMap: Map<number, Probe>;
allProbes: (Probe | OfflineProbe)[];
request: MeasurementRequest;
}> {
const ipToConnectedProbe = new Map(connectedProbes.map(probe => [ probe.ipAddress, probe ]));
const prevIps = await this.store.getMeasurementIps(measurementId);
const prevMeasurement = await this.store.getMeasurement(measurementId);

const onlineProbesMap: Map<number, Probe> = new Map();
const allProbes: (Probe | OfflineProbe)[] = [];

const emptyResult = { onlineProbesMap: new Map(), allProbes: [] } as {
const emptyResult = { onlineProbesMap: new Map(), allProbes: [], request: userRequest } as {
onlineProbesMap: Map<number, Probe>;
allProbes: (Probe | OfflineProbe)[];
request: MeasurementRequest;
};

let prevMeasurement: MeasurementRecord | null = null;
if (!prevMeasurement || prevIps.length === 0) {
return emptyResult;
}

const request: MeasurementRequest = { ...userRequest, limit: prevMeasurement.limit, locations: prevMeasurement.locations };
const onlineProbesMap: Map<number, Probe> = new Map();
const allProbes: (Probe | OfflineProbe)[] = [];

for (let i = 0; i < prevIps.length; i++) {
const ip = prevIps[i]!;
Expand All @@ -111,14 +125,6 @@ export class ProbeRouter {
onlineProbesMap.set(i, connectedProbe);
allProbes.push(connectedProbe);
} else {
if (!prevMeasurement) {
prevMeasurement = await this.store.getMeasurement(measurementId);

if (!prevMeasurement) {
return emptyResult;
}
}

const prevTest = prevMeasurement.results[i];

if (!prevTest) {
Expand All @@ -130,7 +136,7 @@ export class ProbeRouter {
}
}

return { onlineProbesMap, allProbes };
return { onlineProbesMap, allProbes, request };
}

private testToOfflineProbe = (test: MeasurementResult, ip: string): OfflineProbe => ({
Expand Down
6 changes: 4 additions & 2 deletions test/tests/integration/measurement/create-measurement.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,8 @@ describe('Create measurement', () => {
.send({
type: 'ping',
target: 'example.com',
limit: 2,
locations: [{ country: 'US' }],
})
.expect(202)
.expect((response) => {
Expand All @@ -554,8 +556,8 @@ describe('Create measurement', () => {
await requestAgent.get(`/v1/measurements/${id2}`)
.expect(200)
.expect((response) => {
expect(response.body.limit).to.not.exist;
expect(response.body.locations).to.not.exist;
expect(response.body.limit).to.equal(2);
expect(response.body.locations).to.deep.equal([{ country: 'US' }]);
expect(response).to.matchApiSchema();
});
});
Expand Down
Loading

0 comments on commit c5e0a35

Please sign in to comment.