diff --git a/public/dashp2p.js b/public/dashp2p.js index 3a9b608..cfd9df3 100644 --- a/public/dashp2p.js +++ b/public/dashp2p.js @@ -65,18 +65,15 @@ var DashP2P = ('object' === typeof module && exports) || {}; p2p.header = null; /** @type {Uint8Array?} */ p2p.payload = null; + let explicitEvents = ['version', 'verack', 'ping', 'pong']; + p2p._promiseStream = Utils.EventSocket.create(explicitEvents); - p2p._generator = Utils.createPromiseGenerator(); - - p2p.accept = async function (messageTypes) { - let data = await p2p._generator.next(); - return data; - }; + p2p.listen = p2p._promiseStream.listen; /** @param {Uint8Array?} */ p2p.write = function (chunk) { if (p2p.state === 'error') { - p2p._generator.reject(p2p.error); + p2p._promiseStream.rejectAll(p2p.error); // in the case of UDP where we miss a packet, // we can log the error but still resume on the next one. @@ -101,10 +98,12 @@ var DashP2P = ('object' === typeof module && exports) || {}; p2p.header.command, p2p.payload?.length || null, ); - p2p._generator.resolve({ + let msg = { + command: p2p.header.command, header: p2p.header, payload: p2p.payload, - }); + }; + p2p._promiseStream.resolveAll(msg.command, msg); p2p.state = 'header'; p2p.write(chunk); @@ -112,7 +111,7 @@ var DashP2P = ('object' === typeof module && exports) || {}; } let err = new Error(`developer error: unknown state '${p2p.state}'`); - p2p._generator.reject(err); + p2p._promiseStream.rejectAll(err); p2p.state = 'header'; p2p.write(chunk); }; @@ -715,53 +714,206 @@ var DashP2P = ('object' === typeof module && exports) || {}; }; Parsers.SIZES = SIZES; - Utils.createPromiseGenerator = function () { - let g = {}; - - g._settled = true; - g._promise = Promise.resolve(); // for type hint - g._results = []; - - g.resolve = function (result) {}; - g.reject = function (err) {}; - g.next = async function () { - if (!g._settled) { - console.warn('g.accept() called before previous call was settled'); - return await g._promise; - } - g._settled = false; - g._promise = new Promise(function (_resolve, _reject) { - g.resolve = function (result) { - if (g._settled) { - g._results.push(result); - return; - } - g._settled = true; - _resolve(result); + Utils.EventSocket = {}; + + /** @param {String} events */ + Utils.EventSocket.create = function (explicitEvents) { + let stream = {}; + + stream._explicitEvents = explicitEvents; + + /** @type {Array} */ + stream._connections = []; + + /** + * @param {Array} events - ex: ['*', 'error'] for default events, or list by name + */ + stream.listen = function (events = null) { + let conn = Utils.EventSocket.createConnection(stream, events); + return conn; + }; + + stream.resolveAll = function (eventname, msg) { + for (let p of stream._connections) { + let isSubscribed = p._events.includes(eventname); + if (isSubscribed) { + p._resolve(msg); + continue; + } + + let isExplicit = stream._explicitEvents.includes(eventname); + if (isExplicit) { + continue; + } + + let hasCatchall = p._events.includes('*'); + if (hasCatchall) { + p._resolve(msg); + } + } + }; + + stream.rejectAll = function (err) { + let handled = false; + for (let p of stream._connections) { + let handlesErrors = p._events.includes('error'); + if (!handlesErrors) { + continue; + } + + handled = true; + p._reject(err); + } + if (!handled) { + for (let p of stream._connections) { + p._reject(err); + } + } + }; + + return stream; + }; + + Utils.EventSocket.createConnection = function (stream, defaultEvents = null) { + let p = {}; + stream._connections.push(p); + + p._events = defaultEvents; + + p.closed = false; + p._settled = false; + p._resolve = function (msg) {}; + p._reject = function (err) {}; + p._promise = Promise.resolve(null); + p._next = async function () { + p._settled = false; + p._promise = new Promise(function (_resolve, _reject) { + p._resolve = function (msg) { + p._close(true); + _resolve(msg); }; - g.reject = function (error) { - if (g._settled) { - g._results.push(error); - return; - } - g._settled = true; - _reject(error); + p._reject = function (err) { + p._close(true); + _reject(err); }; }); - if (g._results.length) { - let result = g._results.shift(); - if (result instanceof Error) { - g.reject(result); - } else { - g.resolve(result); - } + + return await p._promise; + }; + + /** + * Accepts the next message of the given event name, + * or of any of the default event names. + * @param {String} eventname - '*' for default events, 'error' for error, or others by name + */ + p.accept = async function (eventname) { + if (p.closed) { + let err = new Error('cannot accept new events after close'); + Object.assign(err, { code: 'E_ALREADY_CLOSED' }); + throw err; } - return await g._promise; + + if (eventname) { + p.events = [eventname]; + } else if (defaultEvents?.length) { + p.events = defaultEvents; + } else { + let err = new Error( + `call stream.listen(['*']) or conn.accept('*') for default events`, + ); + Object.assign(err, { code: 'E_NO_EVENTS' }); + throw err; + } + + return await p._next(); }; - return g; + p._close = function (_settle) { + if (p.closed) { + return; + } + p.closed = true; + + let index = stream._connections.indexOf(p); + if (index >= 0) { + void stream._connections.splice(index, 1); + } + if (_settle) { + p._settled = true; + } + if (p._settled) { + return; + } + + p._settled = true; + let err = new Error('promise stream closed'); + Object.assign(err, { code: 'E_CLOSE' }); + p._reject(err); + }; + + /** + * Causes `let msg = conn.accept()` to fail with E_CLOSE or E_ALREADY_CLOSED + */ + p.close = function () { + p._close(false); + }; + + return p; }; + // /** @param {String} events */ + // Utils.createPromiseGenerator = function (events) { + // let g = {}; + + // g.events = events; + + // // g._settled = true; + // g._promise = Promise.resolve(); // for type hint + // g._results = []; + + // g.resolve = function (result) {}; + // g.reject = function (err) {}; + + // // g.init = async function () { + // // if (!g._settled) { + // // console.warn('g.init() called again before previous call was settled'); + // // return await g._promise; + // // } + // // g._settled = false; + // g._promise = new Promise(function (_resolve, _reject) { + // g.resolve = _resolve; + // g.reject = _reject; + // // g.resolve = function (result) { + // // if (g._settled) { + // // g._results.push(result); + // // return; + // // } + // // g._settled = true; + // // _resolve(result); + // // }; + // // g.reject = function (error) { + // // if (g._settled) { + // // g._results.push(error); + // // return; + // // } + // // g._settled = true; + // // _reject(error); + // // }; + // }); + // // if (g._results.length) { + // // let result = g._results.shift(); + // // if (result instanceof Error) { + // // g.reject(result); + // // } else { + // // g.resolve(result); + // // } + // // } + // // return await g._promise; + // // }; + + // return g; + // }; + /** * @param {Array} byteArrays * @param {Number?} [len] diff --git a/public/wallet-app.js b/public/wallet-app.js index e0fe438..7944077 100644 --- a/public/wallet-app.js +++ b/public/wallet-app.js @@ -1104,9 +1104,10 @@ for (;;) { let subs = ['*', 'inv', 'ping', 'pong', 'version', 'verack']; - let msg = await p2p.accept(subs); + let conn = p2p.listen(subs); + let msg = await conn.accept(); let command = msg.header.command; - console.log('p2p.accept():', command); + console.log('conn.accept():', command); let isSub = subs.includes(command); if (isSub) { console.log(msg);