Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: cache fixes #3830

Merged
merged 2 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 3 additions & 20 deletions lib/cache/memory-cache-store.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict'

const { Writable, Readable } = require('node:stream')
const { Writable } = require('node:stream')

/**
* @typedef {import('../../types/cache-interceptor.d.ts').default.CacheStore} CacheStore
Expand Down Expand Up @@ -81,24 +81,7 @@ class MemoryCacheStore {
return undefined
}

/**
* @type {Readable | undefined}
*/
let readable
if (value.body) {
readable = new Readable()

for (const chunk of value.body) {
readable.push(chunk)
}

readable.push(null)
}

return {
response: value.opts,
body: readable
}
return { ...value.opts, body: value.body }
}

/**
Expand Down Expand Up @@ -242,7 +225,7 @@ class MemoryCacheStore {
/**
* @param {import('../../types/cache-interceptor.d.ts').default.CacheKey} key
*/
deleteByKey (key) {
delete (key) {
this.#data.delete(`${key.origin}:${key.path}`)
}

Expand Down
62 changes: 25 additions & 37 deletions lib/handler/cache-handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class CacheHandler extends DecoratorHandler {
) {
// https://www.rfc-editor.org/rfc/rfc9111.html#name-invalidating-stored-response
try {
this.#store.deleteByKey(this.#cacheKey).catch?.(noop)
this.#store.delete(this.#cacheKey).catch?.(noop)
} catch {
// Fail silently
}
Expand Down Expand Up @@ -135,43 +135,31 @@ class CacheHandler extends DecoratorHandler {
cacheControlDirectives
)

if (this.#cacheKey.method === 'HEAD') {
this.#store.createWriteStream(this.#cacheKey, {
statusCode,
statusMessage,
rawHeaders: strippedHeaders,
vary: varyDirectives,
cachedAt: now,
staleAt,
deleteAt
})
} else {
this.#writeStream = this.#store.createWriteStream(this.#cacheKey, {
statusCode,
statusMessage,
rawHeaders: strippedHeaders,
vary: varyDirectives,
cachedAt: now,
staleAt,
deleteAt
})

if (this.#writeStream) {
const handler = this
this.#writeStream
.on('drain', resume)
.on('error', function () {
this.#writeStream = this.#store.createWriteStream(this.#cacheKey, {
statusCode,
statusMessage,
rawHeaders: strippedHeaders,
vary: varyDirectives,
cachedAt: now,
staleAt,
deleteAt
})

if (this.#writeStream) {
const handler = this
this.#writeStream
.on('drain', resume)
.on('error', function () {
// TODO (fix): Make error somehow observable?
})
.on('close', function () {
if (handler.#writeStream === this) {
handler.#writeStream = undefined
}

// TODO (fix): Should we resume even if was paused downstream?
resume()
})
}
})
.on('close', function () {
if (handler.#writeStream === this) {
handler.#writeStream = undefined
}

// TODO (fix): Should we resume even if was paused downstream?
resume()
})
}
}

Expand Down
135 changes: 67 additions & 68 deletions lib/interceptor/cache.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict'

const assert = require('node:assert')
const { Readable } = require('node:stream')
const util = require('../core/util')
const CacheHandler = require('../handler/cache-handler')
const MemoryCacheStore = require('../cache/memory-cache-store')
Expand Down Expand Up @@ -57,95 +58,88 @@ module.exports = (opts = {}) => {
// Where body can be a Buffer, string, stream or blob?
const result = store.get(cacheKey)
if (!result) {
// Request isn't cached
return dispatch(opts, new CacheHandler(globalOpts, cacheKey, handler))
}

/**
* @param {import('node:stream').Readable | undefined} stream
* @param {import('../../types/cache-interceptor.d.ts').default.CachedResponse} value
* @param {import('../../types/cache-interceptor.d.ts').default.GetResult} result
*/
const respondWithCachedValue = (stream, value) => {
assert(!stream || !stream.destroyed, 'stream should not be destroyed')
assert(!stream || !stream.readableDidRead, 'stream should not be readableDidRead')
try {
stream
?.on('error', function (err) {
if (!this.readableEnded) {
if (typeof handler.onError === 'function') {
handler.onError(err)
} else {
process.nextTick(() => {
throw err
})
}
}
})
.on('close', function () {
if (!this.errored && typeof handler.onComplete === 'function') {
handler.onComplete([])
const respondWithCachedValue = ({ cachedAt, rawHeaders, statusCode, statusMessage, body }) => {
const stream = util.isStream(body)
? body
: Readable.from(body ?? [])

assert(!stream.destroyed, 'stream should not be destroyed')
assert(!stream.readableDidRead, 'stream should not be readableDidRead')

stream
.on('error', function (err) {
if (!this.readableEnded) {
if (typeof handler.onError === 'function') {
handler.onError(err)
} else {
throw err
}
})
}
})
.on('close', function () {
if (!this.errored && typeof handler.onComplete === 'function') {
handler.onComplete([])
}
})

if (typeof handler.onConnect === 'function') {
handler.onConnect((err) => {
stream?.destroy(err)
})
if (typeof handler.onConnect === 'function') {
handler.onConnect((err) => {
stream.destroy(err)
})

if (stream?.destroyed) {
return
}
if (stream.destroyed) {
return
}
}

if (typeof handler.onHeaders === 'function') {
// Add the age header
// https://www.rfc-editor.org/rfc/rfc9111.html#name-age
const age = Math.round((Date.now() - value.cachedAt) / 1000)
if (typeof handler.onHeaders === 'function') {
// Add the age header
// https://www.rfc-editor.org/rfc/rfc9111.html#name-age
const age = Math.round((Date.now() - cachedAt) / 1000)

// TODO (fix): What if rawHeaders already contains age header?
const rawHeaders = [...value.rawHeaders, AGE_HEADER, Buffer.from(`${age}`)]
// TODO (fix): What if rawHeaders already contains age header?
rawHeaders = [...rawHeaders, AGE_HEADER, Buffer.from(`${age}`)]

if (handler.onHeaders(value.statusCode, rawHeaders, () => stream?.resume(), value.statusMessage) === false) {
stream?.pause()
}
if (handler.onHeaders(statusCode, rawHeaders, () => stream?.resume(), statusMessage) === false) {
stream.pause()
}
}

if (opts.method === 'HEAD') {
if (typeof handler.onComplete === 'function') {
handler.onComplete([])
if (opts.method === 'HEAD') {
stream.destroy()
} else {
stream.on('data', function (chunk) {
if (typeof handler.onData === 'function' && !handler.onData(chunk)) {
stream.pause()
}

stream?.destroy()
} else {
stream.on('data', function (chunk) {
if (typeof handler.onData === 'function' && !handler.onData(chunk)) {
stream.pause()
}
})
}
} catch (err) {
stream?.destroy(err)
})
}
}

/**
* @param {import('../../types/cache-interceptor.d.ts').default.GetResult} result
*/
const handleStream = (result) => {
const { response: value, body: stream } = result
const handleResult = (result) => {
// TODO (perf): Readable.from path can be optimized...

if (!stream && opts.method !== 'HEAD') {
if (!result.body && opts.method !== 'HEAD') {
throw new Error('stream is undefined but method isn\'t HEAD')
}

// Check if the response is stale
const now = Date.now()
if (now < value.staleAt) {
if (now < result.staleAt) {
// Dump request body.
if (util.isStream(opts.body)) {
opts.body.on('error', () => {}).destroy()
}
respondWithCachedValue(stream, value)
respondWithCachedValue(result)
} else if (util.isStream(opts.body) && util.bodyLength(opts.body) !== 0) {
// If body is is stream we can't revalidate...
// TODO (fix): This could be less strict...
Expand All @@ -157,15 +151,15 @@ module.exports = (opts = {}) => {
...opts,
headers: {
...opts.headers,
'if-modified-since': new Date(value.cachedAt).toUTCString()
'if-modified-since': new Date(result.cachedAt).toUTCString()
}
},
new CacheRevalidationHandler(
(success) => {
if (success) {
respondWithCachedValue(stream, value)
} else {
stream.on('error', () => {}).destroy()
respondWithCachedValue(result)
} else if (util.isStream(result.body)) {
result.body.on('error', () => {}).destroy()
}
},
new CacheHandler(globalOpts, cacheKey, handler)
Expand All @@ -177,14 +171,19 @@ module.exports = (opts = {}) => {
if (typeof result.then === 'function') {
result.then((result) => {
if (!result) {
// Request isn't cached
return dispatch(opts, new CacheHandler(globalOpts, cacheKey, handler))
dispatch(opts, new CacheHandler(globalOpts, cacheKey, handler))
} else {
handleResult(result)
}

handleStream(result)
}).catch(err => handler.onError(err))
}, err => {
if (typeof handler.onError === 'function') {
handler.onError(err)
} else {
throw err
}
})
} else {
handleStream(result)
handleResult(result)
}

return true
Expand Down
2 changes: 1 addition & 1 deletion lib/util/cache.js
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ function assertCacheStore (store, name = 'CacheStore') {
throw new TypeError(`expected type of ${name} to be a CacheStore, got ${store === null ? 'null' : typeof store}`)
}

for (const fn of ['get', 'createWriteStream', 'deleteByKey']) {
for (const fn of ['get', 'createWriteStream', 'delete']) {
if (typeof store[fn] !== 'function') {
throw new TypeError(`${name} needs to have a \`${fn}()\` function`)
}
Expand Down
9 changes: 6 additions & 3 deletions test/cache-interceptor/cache-stores.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

const { describe, test } = require('node:test')
const { deepStrictEqual, notEqual, equal } = require('node:assert')
const { Readable } = require('node:stream')
const { once } = require('node:events')
const MemoryCacheStore = require('../../lib/cache/memory-cache-store')

Expand All @@ -17,7 +18,7 @@ function cacheStoreTests (CacheStore) {
equal(typeof store.isFull, 'boolean')
equal(typeof store.get, 'function')
equal(typeof store.createWriteStream, 'function')
equal(typeof store.deleteByKey, 'function')
equal(typeof store.delete, 'function')
})

// Checks that it can store & fetch different responses
Expand Down Expand Up @@ -268,9 +269,11 @@ function writeResponse (stream, body) {
* @param {import('../../types/cache-interceptor.d.ts').default.GetResult} result
* @returns {Promise<import('../../types/cache-interceptor.d.ts').default.GetResult | { body: Buffer[] }>}
*/
async function readResponse ({ response, body: stream }) {
async function readResponse ({ body: src, ...response }) {
notEqual(response, undefined)
notEqual(stream, undefined)
notEqual(src, undefined)

const stream = Readable.from(src ?? [])

/**
* @type {Buffer[]}
Expand Down
Loading
Loading