Skip to content

Commit

Permalink
add abort to auditor, fix websocket subscription bug
Browse files Browse the repository at this point in the history
  • Loading branch information
dskvr committed Jan 14, 2025
1 parent 651eab9 commit 254f824
Show file tree
Hide file tree
Showing 9 changed files with 58 additions and 52 deletions.
9 changes: 5 additions & 4 deletions apps/gui/src/routes/relays/+page.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
import { relayAggregates } from '$lib/stores/checks.js';
import { doBootstrap } from '$lib/stores/routines';
import { doAggregateCache } from '$lib/stores/app';
import { onMount } from 'svelte';
import { onDestroy, onMount } from 'svelte';
import { writable, type Writable } from 'svelte/store';
import { StateManager } from '@nostrwatch/nip66';
import { type default as DataTableType } from '$lib/components/lists/table/DataTable.svelte';
import { type default as StatsType } from '$lib/components/layout/Stats.svelte';
import type { DataTableConfig } from '$lib/components/lists/table/DataTableTypes';
import { defaultDataTableConfig } from '$lib/components/lists/table/DataTableTypes';
import { default as relaysTableConfig } from '$lib/config/dataTable/relays.js';
import { userService } from '$lib/stores/user';
export const prerender = true;
Expand Down Expand Up @@ -56,12 +57,12 @@
doAggregateCache.set(true)
loadComponents().then(setConfig);
}
onMount(mount)
const destroy = () => {}
onMount(mount)
onDestroy(destroy)
</script>

<main>
<!-- <pre>{JSON.stringify(relayAggregates, null ,2)}</pre> -->
{#if $ready}
<Stats />
<DataTable data={relayAggregates} {config} {tableKey} />
Expand Down
39 changes: 13 additions & 26 deletions apps/gui/src/routes/relays/[protocol]/[...relay]/+page.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -123,20 +123,8 @@
}
);
const reset = () => {
if (currentRelay === relayUrl) return;
$nip66?.services?.relay?.unsubscribeAll();
console.log('!!! RESET');
loading = true;
currentRelay = '';
activeTab.set('overview');
monitors.set([]);
operatorProfile.set(null);
operatorRelays.set(null);
};
const loadRelayData = async () => {
reset();
destroy();
await nip66Ready()
loadNip11().then(loadOperatorMeta)
if (!$isLivesyncing) {
Expand Down Expand Up @@ -192,6 +180,7 @@
};
const mount = async () => {
if (currentRelay === relayUrl) return;
const resume = await pauseLiveSync()
loadComponents();
if (typeof window === 'undefined' || typeof navigator === 'undefined') return;
Expand All @@ -205,23 +194,21 @@
});
};
onMount(mount);
const destroy = () => {
if (currentRelay === relayUrl) return;
$nip66?.services?.relay?.unsubscribeAll();
loading = true;
currentRelay = '';
monitors.set([]);
operatorProfile.set(null);
operatorRelays.set(null);
};
onDestroy(() => {
reset();
});
onMount(mount);
onDestroy(destroy);
$: relayUrl = new URL(`${$page.params.protocol}://${$page.params.relay}`).toString();
$: timesSeen = $checksrelay.length;
$: seenBy = $checksrelay.map((check: any) => check.pubkey);
$: seenByCount = $checksrelay.length;
$: rttAverage = $relayAggregate?.rtt;
$: ipv4 = $relayAggregate?.ipv4;
$: ipv6 = $relayAggregate?.ipv6;
$: geocode = $relayAggregate?.geocode;
$: dd = $relayAggregate?.dd;
$: isp = $relayAggregate?.isp;
$: name = $nip11?.name || null;
$: description = $nip11?.description || null;
$: banner = $nip11?.banner || null;
$: icon = $nip11?.icon || null;
Expand Down
4 changes: 4 additions & 0 deletions libraries/auditor/src/base/Auditor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ export class Auditor {
return this._suites;
}

abort(): void {
Emitter.emit('all:abort');
}

addSuite(suite: string, options?: any) {
this.suites.add(suite);
if(options) this._conf.options[suite] = options;
Expand Down
15 changes: 10 additions & 5 deletions libraries/auditor/src/base/Sampler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import type { WebSocketWrapper as WebSocket } from '@nostrwatch/websocket';
import { Nip01ClientMessageGenerator } from "#src/nips/Nip01/utils/generators.js";
import type { Note, RelayEventMessage } from "#src/nips/Nip01/interfaces/index.js";
import { generateSubId } from "#utils/nostr.js";
import { Emitter } from "./Emitter";

export class Sampler {
private ws: WebSocket;
Expand All @@ -16,14 +17,15 @@ export class Sampler {
private _timeoutMs: number = 5000;
private _totalSamples: number = 0;
private _abort: boolean = false;
private signal = new EventEmitter();
// private signal = new EventEmitter();
private logger: Logger = new Logger('@nostrwatch/auditor:Sampler');
private _ingestors: Ingestor[] = [];

constructor(ws: WebSocket, maximumSamples?: number, timeout?: number) {
this.ws = ws;
if(maximumSamples) this._maximumSamples = maximumSamples;
if(timeout) this._timeoutMs = timeout
Emitter.on('all:abort', this.abort.bind(this))
}

get ingestors(): Ingestor[] {
Expand Down Expand Up @@ -62,7 +64,8 @@ export class Sampler {
break;
}
case 'EOSE': {
this.signal.emit('WS:EOSE');
// this.signal.emit('ws:eose');
Emitter.emit(`ws:eose:${this.subId}`);
break;
}
}
Expand Down Expand Up @@ -121,12 +124,14 @@ export class Sampler {
}, 100);

const cleanup = () => {
this.signal.off('WS:EOSE', onEose);
Emitter.off(`ws:eose:${this.subId}`, onEose);
// this.signal.off('ws:eose', onEose);
clearTimeout(timeout);
clearInterval(interval);
};

this.signal.once('WS:EOSE', onEose);

Emitter.once(`ws:eose:${this.subId}`, onEose);
// this.signal.once('ws:eose', onEose);
});
}

Expand Down
2 changes: 1 addition & 1 deletion libraries/auditor/src/base/Suite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ export abstract class Suite implements ISuite {
private _sampler: Sampler;
private _ingestors: Ingestor[] = [];

protected ajv = new Ajv();
protected ajv = new Ajv({strict: false});
protected ws: WebSocket;
protected signal: EventEmitter = new EventEmitter();
protected result: ISuiteResult = structuredClone(defaultSuiteResult);
Expand Down
2 changes: 2 additions & 0 deletions libraries/auditor/src/base/SuiteTest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { Nip01ClientMessageGenerator } from "#src/nips/Nip01/utils/generators.j
import type { INip01Filter, Note, RelayEventMessage, RelayNoticeMessage } from "#src/nips/Nip01/interfaces/index.js";

import { SuiteState } from "./SuiteState.js";
import { Emitter } from "./Emitter.js";

export type CompleteOnType = "off" | "maxEvents" | "EOSE";
export type CompleteOnTypeArray = [CompleteOnType, ...CompleteOnType[]];
Expand Down Expand Up @@ -89,6 +90,7 @@ export abstract class SuiteTest implements ISuiteTest {
this.logger.registerLogger('pass', 'info', chalk.green.bold);
this.logger.registerLogger('fail', 'info', chalk.redBright.bold);
this.logger.registerLogger('skipped', 'info', chalk.bgGray.yellow.bold);
Emitter.on('all:abort', this.abort.bind(this))
}

get filters(): INip01Filter[] {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
---
type: "array"
items:
- const: "AUTH"
- $ref: "@/note.yaml"
minItems: "2"
maxItems: "2"
{
"type": "array",
"items": [
{
"const": "AUTH"
},
{
"$ref": "../../Nip01/schemata/note.schema.json"
}
],
"minItems": 2,
"maxItems": 2
}
2 changes: 1 addition & 1 deletion libraries/nip66/src/core/WebsocketAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export interface SubscribeHandlers {
onevent?: (event: any) => void
onevents?: (events: any[]) => void
oneose?: () => void
onclose?: () => void
onclose?: (subId: string) => void
}
export interface IWebsocketAdapterCallbacks {
onNotice?: (notice: any) => void
Expand Down
17 changes: 9 additions & 8 deletions libraries/nip66/src/services/Service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,23 +78,26 @@ export class Service {
}

async subscribe(args: FetchOptions, callbacks?: SubscribeHandlers): Promise<IEvent[]> {
console.log(`Service.subscribe()`)
await this.ready();
console.log(`Service: is ready`)
let { filters, relays, options, hash } = args;
if(!hash) {
hash = deterministicHash(args)
console.log('Service.subscribe:hash', hash)
}
console.log('Service add hash', hash)
this.subscriptions.add(hash)
if(filters) {
this.fetchFromCache(filters, callbacks);
}
if(callbacks) {
callbacks.onclose = (subId: string) => {
this.subscriptions.delete(subId)
callbacks?.onclose?.(subId)
}
}
const message: WebsocketRequestBody = { filters, relays, options, hash };
const result = await this.websocketAdapter.subscribe(message, callbacks);
console.log('Service delete hash', hash)
this.subscriptions.delete(hash)
if(!options?.keepAlive) {
this.subscriptions.delete(hash)
}
return typeof result === 'boolean'? []: result;
}

Expand Down Expand Up @@ -222,7 +225,6 @@ export class Service {
}

const websocketEvents = await this.fetchFromWebsocket(args, _callbacks);
console.log('fetchFromWebsocket resolved')
websocketEvents.forEach(maybeAddEventToMap);

const finalEvents = Array.from(events.values());
Expand All @@ -243,5 +245,4 @@ export class Service {
if (!this._groupedRelays?.[from]) return;
this._groupedRelays[from] = this._groupedRelays[from].filter((r) => r !== relay);
}

}

0 comments on commit 254f824

Please sign in to comment.