diff --git a/lib/index.js b/lib/index.js index bce6e6b1..898c8125 100644 --- a/lib/index.js +++ b/lib/index.js @@ -4,7 +4,7 @@ const { HttpErrorAuthOTP } = require('./errors.js') const checkResponse = require('./check-response.js') const getAuth = require('./auth.js') const fetch = require('make-fetch-happen') -const JSONStream = require('minipass-json-stream') +const JSONStream = require('./json-stream') const npa = require('npm-package-arg') const qs = require('querystring') const url = require('url') diff --git a/lib/json-stream.js b/lib/json-stream.js new file mode 100644 index 00000000..fd997d92 --- /dev/null +++ b/lib/json-stream.js @@ -0,0 +1,248 @@ +const Parser = require('jsonparse') +const { Minipass } = require('minipass') + +class JSONStreamError extends Error { + constructor (err, caller) { + super(err.message) + Error.captureStackTrace(this, caller || this.constructor) + } + + get name () { + return 'JSONStreamError' + } + + set name (n) {} +} + +const check = (x, y) => + typeof x === 'string' ? String(y) === x + : x && typeof x.test === 'function' ? x.test(y) + : typeof x === 'boolean' || typeof x === 'object' ? x + : typeof x === 'function' ? x(y) + : false + +class JSONStream extends Minipass { + #count = 0 + #ending = false + #footer = null + #header = null + #map = null + #onTokenOriginal + #parser + #path = null + #root = null + + constructor (opts = {}) { + super({ + ...opts, + objectMode: true, + }) + + const parser = this.#parser = new Parser() + parser.onValue = value => this.#onValue(value) + this.#onTokenOriginal = parser.onToken + parser.onToken = (token, value) => this.#onToken(token, value) + parser.onError = er => this.#onError(er) + + this.#path = typeof opts.path === 'string' + ? opts.path.split('.').map(e => + e === '$*' ? { emitKey: true } + : e === '*' ? true + : e === '' ? { recurse: true } + : e) + : Array.isArray(opts.path) && opts.path.length ? opts.path + : null + + if (typeof opts.map === 'function') { + this.#map = opts.map + } + } + + #setHeaderFooter (key, value) { + // header has not been emitted yet + if (this.#header !== false) { + this.#header = this.#header || {} + this.#header[key] = value + } + + // footer has not been emitted yet but header has + if (this.#footer !== false && this.#header === false) { + this.#footer = this.#footer || {} + this.#footer[key] = value + } + } + + #onError (er) { + // error will always happen during a write() call. + const caller = this.#ending ? this.end : this.write + this.#ending = false + return this.emit('error', new JSONStreamError(er, caller)) + } + + #onToken (token, value) { + const parser = this.#parser + this.#onTokenOriginal.call(this.#parser, token, value) + if (parser.stack.length === 0) { + if (this.#root) { + const root = this.#root + if (!this.#path) { + super.write(root) + } + this.#root = null + this.#count = 0 + } + } + } + + #onValue (value) { + const parser = this.#parser + // the LAST onValue encountered is the root object. + // just overwrite it each time. + this.#root = value + + if (!this.#path) { + return + } + + let i = 0 // iterates on path + let j = 0 // iterates on stack + let emitKey = false + let emitPath = false + while (i < this.#path.length) { + const key = this.#path[i] + j++ + + if (key && !key.recurse) { + const c = (j === parser.stack.length) ? parser : parser.stack[j] + if (!c) { + return + } + if (!check(key, c.key)) { + this.#setHeaderFooter(c.key, value) + return + } + emitKey = !!key.emitKey + emitPath = !!key.emitPath + i++ + } else { + i++ + if (i >= this.#path.length) { + return + } + const nextKey = this.#path[i] + if (!nextKey) { + return + } + while (true) { + const c = (j === parser.stack.length) ? parser : parser.stack[j] + if (!c) { + return + } + if (check(nextKey, c.key)) { + i++ + if (!Object.isFrozen(parser.stack[j])) { + parser.stack[j].value = null + } + break + } else { + this.#setHeaderFooter(c.key, value) + } + j++ + } + } + } + + // emit header + if (this.#header) { + const header = this.#header + this.#header = false + this.emit('header', header) + } + if (j !== parser.stack.length) { + return + } + + this.#count++ + const actualPath = parser.stack.slice(1) + .map(e => e.key).concat([parser.key]) + if (value !== null && value !== undefined) { + const data = this.#map ? this.#map(value, actualPath) : value + if (data !== null && data !== undefined) { + const emit = emitKey || emitPath ? { value: data } : data + if (emitKey) { + emit.key = parser.key + } + if (emitPath) { + emit.path = actualPath + } + super.write(emit) + } + } + + if (parser.value) { + delete parser.value[parser.key] + } + + for (const k of parser.stack) { + k.value = null + } + } + + write (chunk, encoding, cb) { + if (typeof encoding === 'function') { + cb = encoding + encoding = null + } + if (typeof chunk === 'string') { + chunk = Buffer.from(chunk, encoding) + } else if (!Buffer.isBuffer(chunk)) { + return this.emit('error', new TypeError( + 'Can only parse JSON from string or buffer input')) + } + this.#parser.write(chunk) + if (cb) { + cb() + } + return this.flowing + } + + end (chunk, encoding, cb) { + this.#ending = true + if (typeof encoding === 'function') { + cb = encoding + encoding = null + } + if (typeof chunk === 'function') { + cb = chunk + chunk = null + } + if (chunk) { + this.write(chunk, encoding) + } + if (cb) { + this.once('end', cb) + } + + const h = this.#header + this.#header = null + const f = this.#footer + this.#footer = null + if (h) { + this.emit('header', h) + } + if (f) { + this.emit('footer', f) + } + return super.end() + } + + static get JSONStreamError () { + return JSONStreamError + } + + static parse (path, map) { + return new JSONStream({ path, map }) + } +} + +module.exports = JSONStream diff --git a/test/index.js b/test/index.js index 4a165718..f989ea4a 100644 --- a/test/index.js +++ b/test/index.js @@ -1,8 +1,7 @@ -'use strict' +const t = require('tap') const { Minipass } = require('minipass') const ssri = require('ssri') -const t = require('tap') const zlib = require('zlib') const defaultOpts = require('../lib/default-opts.js') const tnock = require('./util/tnock.js') @@ -273,50 +272,6 @@ t.test('query string with ?write=true', t => { .then(res => t.strictSame(res, { write: 'go for it' })) }) -t.test('fetch.json.stream()', async t => { - tnock(t, defaultOpts.registry).get('/hello').reply(200, { - a: 1, - b: 2, - c: 3, - }) - const data = await fetch.json.stream('/hello', '$*', OPTS).collect() - t.same(data, [ - { key: 'a', value: 1 }, - { key: 'b', value: 2 }, - { key: 'c', value: 3 }, - ], 'got a streamed JSON body') -}) - -t.test('fetch.json.stream opts.mapJSON', async t => { - tnock(t, defaultOpts.registry).get('/hello').reply(200, { - a: 1, - b: 2, - c: 3, - }) - const data = await fetch.json.stream('/hello', '*', { - ...OPTS, - mapJSON (value, [key]) { - return [key, value] - }, - }).collect() - t.same(data, [ - ['a', 1], - ['b', 2], - ['c', 3], - ], 'data mapped') -}) - -t.test('fetch.json.stream gets fetch error on stream', async t => { - await t.rejects(fetch.json.stream('/hello', '*', { - ...OPTS, - body: Promise.reject(new Error('no body for you')), - method: 'POST', - gzip: true, // make sure we don't gzip the promise, lol! - }).collect(), { - message: 'no body for you', - }) -}) - t.test('opts.ignoreBody', async t => { tnock(t, defaultOpts.registry) .get('/hello') diff --git a/test/stream.js b/test/stream.js new file mode 100644 index 00000000..2e91c9ed --- /dev/null +++ b/test/stream.js @@ -0,0 +1,65 @@ +const t = require('tap') + +const tnock = require('./util/tnock.js') +const defaultOpts = require('../lib/default-opts.js') +defaultOpts.registry = 'https://mock.reg/' + +const fetch = require('..') + +const OPTS = { + timeout: 0, + retry: { + retries: 1, + factor: 1, + minTimeout: 1, + maxTimeout: 10, + }, +} + +t.test('json.stream', t => { + t.test('fetch.json.stream()', async t => { + tnock(t, defaultOpts.registry).get('/hello').reply(200, { + a: 1, + b: 2, + c: 3, + }) + const data = await fetch.json.stream('/hello', '$*', OPTS).collect() + t.same(data, [ + { key: 'a', value: 1 }, + { key: 'b', value: 2 }, + { key: 'c', value: 3 }, + ], 'got a streamed JSON body') + }) + + t.test('fetch.json.stream opts.mapJSON', async t => { + tnock(t, defaultOpts.registry).get('/hello').reply(200, { + a: 1, + b: 2, + c: 3, + }) + const data = await fetch.json.stream('/hello', '*', { + ...OPTS, + mapJSON (value, [key]) { + return [key, value] + }, + }).collect() + t.same(data, [ + ['a', 1], + ['b', 2], + ['c', 3], + ], 'data mapped') + }) + + t.test('fetch.json.stream gets fetch error on stream', async t => { + await t.rejects(fetch.json.stream('/hello', '*', { + ...OPTS, + body: Promise.reject(new Error('no body for you')), + method: 'POST', + gzip: true, // make sure we don't gzip the promise, lol! + }).collect(), { + message: 'no body for you', + }) + }) + + t.end() +})