Skip to content

Commit

Permalink
resolver: coalesce requests
Browse files Browse the repository at this point in the history
  • Loading branch information
ignoramous committed Sep 29, 2024
1 parent 180d5c9 commit bd725ab
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 32 deletions.
2 changes: 1 addition & 1 deletion src/plugins/cache-util.js
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ function updateTtl(packet, end) {
}
}

function makeId(packet) {
export function makeId(packet) {
// multiple questions are kind of an undefined behaviour
// stackoverflow.com/a/55093896
if (!dnsutil.hasSingleQuestion(packet)) return null;
Expand Down
62 changes: 46 additions & 16 deletions src/plugins/dns-op/resolver.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import * as dnsutil from "../../commons/dnsutil.js";
import * as bufutil from "../../commons/bufutil.js";
import * as util from "../../commons/util.js";
import * as envutil from "../../commons/envutil.js";
import * as system from "../../system.js";
import { BlocklistFilter } from "../rethinkdns/filter.js";

export default class DNSResolver {
Expand All @@ -33,9 +34,11 @@ export default class DNSResolver {
this.log = log.withTags("DnsResolver");

this.measurements = [];
this.coalstats = { tot: 0, pub: 0, empty: 0, try: 0 };
this.profileResolve = envutil.profileDnsResolves();
// only valid on nodejs
this.forceDoh = envutil.forceDoh();
this.timeout = (envutil.workersTimeout() / 2) | 0;

// only valid on workers
// bg-bw-init results in higher io-wait, not lower
Expand Down Expand Up @@ -342,24 +345,39 @@ DNSResolver.prototype.resolveDnsUpstream = async function (
query,
packet
) {
// Promise.any on promisedPromises[] only works if there are
// zero awaits in this function or any of its downstream calls.
// Otherwise, the first reject in promisedPromises[], before
// any statement in the call-stack awaits, would throw unhandled
// error, since the event loop would have 'ticked' and Promise.any
// on promisedPromises[] would still not have been executed, as it
// is the last statement of this function (which would have eaten up
// all rejects as long as there was one resolved promise).
const promisedPromises = [];

// if no doh upstreams set, resolve over plain-old dns
if (util.emptyArray(resolverUrls)) {
const eid = cacheutil.makeId(packet);
/** @type {ArrayBuffer[]?} */
let parcel = null;

try {
const g = await system.when(eid, this.timeout);
this.coalstats.tot += 1;
if (!util.emptyArray(g) && g[0] != null) {
const sz = bufutil.len(g[0]);
this.log.d(rxid, "coalesced", eid, sz, this.coalstats);
if (sz > 0) return Promise.resolve(new Response(g[0]));
}
this.coalstats.empty += 1;
this.log.e(rxid, "empty coalesced", eid, this.coalstats);
return Promise.resolve(util.respond503());
} catch (reason) {
// happens on timeout or if new event, eid
this.coalstats.try += 1;
this.log.d(rxid, "not coalesced", eid, reason, this.coalstats);
}

if (this.transport == null) {
this.log.e(rxid, "plain dns transport not set");
this.coalstats.pub += 1;
system.pub(eid, parcel);
return Promise.reject(new Error("plain dns transport not set"));
}
// do not let exceptions passthrough to the caller

let promisedResponse = null;
try {
// do not let exceptions passthrough to the caller
const q = bufutil.bufferOf(query);

let ans = await this.transport.udpquery(rxid, q);
Expand All @@ -369,19 +387,31 @@ DNSResolver.prototype.resolveDnsUpstream = async function (
}

if (ans) {
const r = new Response(bufutil.arrayBufferOf(ans));
promisedPromises.push(Promise.resolve(r));
const ab = bufutil.arrayBufferOf(ans);
parcel = [ab];
promisedResponse = Promise.resolve(new Response(ab));
} else {
promisedPromises.push(Promise.resolve(util.respond503()));
promisedResponse = Promise.resolve(util.respond503());
}
} catch (e) {
this.log.e(rxid, "err when querying plain old dns", e.stack);
promisedPromises.push(Promise.reject(e));
promisedResponse = Promise.reject(e);
}

return Promise.any(promisedPromises);
this.coalstats.pub += 1;
system.pub(eid, parcel);
return promisedResponse;
}

// Promise.any on promisedPromises[] only works if there are
// zero awaits in this function or any of its downstream calls.
// Otherwise, the first reject in promisedPromises[], before
// any statement in the call-stack awaits, would throw unhandled
// error, since the event loop would have 'ticked' and Promise.any
// on promisedPromises[] would still not have been executed, as it
// is the last statement of this function (which would have eaten up
// all rejects as long as there was one resolved promise).
const promisedPromises = [];
try {
// upstream to cache
this.log.d(rxid, "upstream cache");
Expand Down
61 changes: 46 additions & 15 deletions src/system.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ const events = new Set([
"stop",
]);

/** @type {Set<string>} */
const ephemeralEvents = new Set();

/** @type {Map<string, Set<listenfn>>} */
const listeners = new Map();
/** @type {Map<string, Set<listenfn>>} */
Expand All @@ -54,10 +57,13 @@ const waitGroup = new Map();
* Fires event.
* @param {string} event
* @param {parcel} parcel
* @returns {int}
*/
export function pub(event, parcel = undefined) {
awaiters(event, parcel);
callbacks(event, parcel);
export function pub(event, parcel = null) {
if (util.emptyString(event)) return;

const tot = awaiters(event, parcel);
return tot + callbacks(event, parcel);
}

/**
Expand All @@ -68,17 +74,21 @@ export function pub(event, parcel = undefined) {
* @returns {boolean}
*/
export function sub(event, cb, timeout = 0) {
if (util.emptyString(event)) return;
if (typeof cb !== "function") return;

const eventCallbacks = listeners.get(event);

// if such even callbacks don't exist
if (!eventCallbacks) {
// but event is sticky, fire off the listener at once
// event is sticky, fire off the listener at once
if (stickyEvents.has(event)) {
const parcel = stickyParcels.get(event); // may be null
microtaskBox(cb, parcel);
return true;
}
// but event doesn't exist, then there's nothing to do
// event doesn't exist so make it ephemeral
ephemeralEvents.add(event);
listeners.set(event, new Set());
return false;
}

Expand All @@ -102,6 +112,10 @@ export function sub(event, cb, timeout = 0) {
* @returns {Promise<parcel>}
*/
export function when(event, timeout = 0) {
if (util.emptyString(event)) {
return Promise.reject(new Error("empty event"));
}

const wg = waitGroup.get(event);

if (!wg) {
Expand All @@ -110,15 +124,17 @@ export function when(event, timeout = 0) {
const parcel = stickyParcels.get(event); // may be null
return Promise.resolve(parcel);
}
// no such event
return Promise.reject(new Error(event + " missing"));
// no such event so make it ephemeral
ephemeralEvents.add(event);
waitGroup.set(event, new Set());
return Promise.reject(new Error(event + " missing event"));
}

return new Promise((accept, reject) => {
const tid =
timeout > 0
? util.timeout(timeout, () => {
reject(new Error(event + " elapsed " + timeout));
reject(new Error(event + " event elapsed " + timeout));
})
: -2;
/** @type {listenfn} */
Expand All @@ -133,34 +149,47 @@ export function when(event, timeout = 0) {
/**
* @param {string} event
* @param {parcel} parcel
* @returns {int}
*/
function awaiters(event, parcel = null) {
const g = waitGroup.get(event);
if (util.emptyString(event)) return 0;
const wg = waitGroup.get(event);

if (!g) return;
if (!wg) return 0;

// listeners valid just the once for stickyEvents
// listeners valid just the once for stickyEvents & ephemeralEvents
if (stickyEvents.has(event)) {
waitGroup.delete(event);
stickyParcels.set(event, parcel);
} else if (ephemeralEvents.has(event)) {
// log.d("sys: wg ephemeralEvents", event, parcel);
waitGroup.delete(event);
ephemeralEvents.delete(event);
}

safeBox(g, parcel);
safeBox(wg, parcel);
return wg.size;
}

/**
* @param {string} event
* @param {parcel} parcel
* @returns {int}
*/
function callbacks(event, parcel = null) {
if (util.emptyString(event)) return 0;
const cbs = listeners.get(event);

if (!cbs) return;
if (!cbs) return 0;

// listeners valid just the once for stickyEvents
// listeners valid just the once for stickyEvents & ephemeralEvents
if (stickyEvents.has(event)) {
listeners.delete(event);
stickyParcels.set(event, parcel);
} else if (ephemeralEvents.has(event)) {
// log.d("sys: cb ephemeralEvents", event, parcel);
listeners.delete(event);
ephemeralEvents.delete(event);
}

// callbacks are queued async and don't block the caller. On Workers,
Expand All @@ -169,6 +198,7 @@ function callbacks(event, parcel = null) {
// incoming request (through the fetch event handler), such callbacks
// may not even fire. Instead use: awaiters and not callbacks.
microtaskBox(cbs, parcel);
return cbs.size;
}

/**
Expand Down Expand Up @@ -226,6 +256,7 @@ function safeBox(fns, arg) {
try {
r.push(f(arg));
} catch (ignore) {
// log.e("sys: safeBox err", ignore);
r.push(null);
}
}
Expand Down

0 comments on commit bd725ab

Please sign in to comment.