
Commit

Move code to create generic receiveAny function
This allows for reuse later with other resource extractors.
victorlin committed Jul 18, 2022
1 parent 3ae02ea commit 72fa8bc
Showing 1 changed file with 93 additions and 78 deletions.
171 changes: 93 additions & 78 deletions src/endpoints/sources.js
@@ -400,100 +400,115 @@ async function sendAny(req, res, url, accept)
 */
function receiveSubresource(subresourceExtractor) {
  return async (req, res) => {
    const subresource = subresourceExtractor(req);

    authz.assertAuthorized(req.user, authz.actions.Write, subresource.resource);

    const urlCallback = async (method, headers) => await subresource.url(method, headers);

    return receiveAny(req, res,
      urlCallback,
      subresource.mediaType
    );
  };
}
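
/* Assumed shape of the urlCallback parameter taken by receiveAny() below; the
 * real urlCallback and expressEndpointAsync typedefs, if any, are defined
 * elsewhere in this file and may differ. This is only a sketch based on how
 * the callback is used here.
 *
 * @callback urlCallback
 * @param {String} method - HTTP method the URL will be used with (e.g. "PUT")
 * @param {Object} headers - Outgoing request headers
 * @returns {Promise<String>} URL of the upstream resource
 */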

/**
 * Proxy the data through us:
 *
 *    client (browser, CLI, etc) ⟷ us (nextstrain.org) ⟷ upstream source
 *
 * @param {express.request} req - Express-style request instance
 * @param {express.response} res - Express-style response instance
 * @param {urlCallback} urlCallback - Callback to retrieve resource URL
 * @param {string} contentType - `Content-Type` header value to send
 * @returns {expressEndpointAsync}
 */
async function receiveAny(req, res, urlCallback, contentType) {
  const method = "PUT";

  // eslint-disable-next-line prefer-const
  let headers = {
    "Content-Type": contentType,
    ...copyHeaders(req, ["Content-Encoding", "Content-Length"]),

    /* XXX TODO: Consider setting Cache-Control rather than relying on
     * ambiguous defaults. Potentially impacts:
     *
     *   - Our own fetch() caching, including in sendSubresource() above
     *   - Our Charon endpoints, if upstream headers are sent to the browser?
     *   - CloudFront caching (not sure about its default behaviour)
     *   - Browsers, if fetched directly, such as by redirection
     *
     * I think a cautious initial value would be to set "private" or "public"
     * depending on the Source and then always set "must-revalidate,
     * proxy-revalidate, max-age=0". This would allow caches (ours,
     * browsers, CloudFront?) to store the data but always check upstream
     * with conditional requests.
     *   -trs, 7 Dec 2021
     */
  };
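
  // Illustrative only: the cautious policy suggested in the TODO above might
  // eventually be expressed as an entry in `headers` along these lines, with
  // "private" vs. "public" chosen per Source:
  //
  //   "Cache-Control": "private, must-revalidate, proxy-revalidate, max-age=0",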

  // Body of the request as a Node stream
  let body = req;

  // Compress on the fly to gzip if it's not already gzip compressed.
  if (!headers["Content-Encoding"]) {
    delete headers["Content-Length"]; // won't be valid after compression
    headers["Content-Encoding"] = "gzip";
    body = body.pipe(zlib.createGzip());
  }

  if (headers["Content-Encoding"] !== "gzip") {
    throw new UnsupportedMediaType("unsupported Content-Encoding; only gzip is supported");
  }
  /* Our upstreams for PUTs are all S3, and S3 requires a Content-Length
   * header (i.e. doesn't accept streaming PUTs). If we don't have a
   * Content-Length from the request (i.e. the request is a streaming PUT or
   * we're doing on-the-fly compression), then we have to buffer the entire
   * body into memory so we can calculate length for S3. When passed a
   * buffer instead of a stream, fetch() will calculate Content-Length for us
   * before sending the request.
   *
   * An alternative to buffering the whole body is to use S3's multipart
   * upload API, but the minimum part size is 5MB so some buffering would be
   * required anyway. Multipart uploads would add inherent complexity at
   * runtime and also design time, as we'd have to rework our data model.
   *
   * In a review of all the (compressed) core and group datasets (nearly
   * 11k), over 99% are less than 5MB and none are more than 15MB. Given
   * that we'd only be able to use multipart uploads for less than 1% of
   * datasets and even the largest datasets would fit comfortably in memory,
   * it doesn't seem worth implementing.
   *
   * Allow buffering up to 20MB of data after gzip compression (guaranteed by
   * Content-Encoding handling above). Requests that exceed this will get a
   * 413 error (thrown by readStream()), and if this becomes an issue we can
   * consider bumping the limit. Clients also have the option of
   * pre-compressing the data and including a Content-Length themselves so we
   * don't have to buffer it, in which case we don't limit request sizes.
   *   -trs, 21 Jan 2022
   */
  if (!headers["Content-Length"]) {
    body = await readStream(body, { limit: 20_000_000 /* 20MB */ });
  }

  const url = await urlCallback(method, headers);

  const upstreamRes = await fetch(url, {method, body, headers});

  switch (upstreamRes.status) {
    case 200:
    case 204:
      break;

    default:
      throw new InternalServerError(`upstream said: ${upstreamRes.status} ${upstreamRes.statusText}`);
  }

  return res.status(204).end();
}
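
The commit message anticipates reusing receiveAny() with other resource
extractors. A minimal sketch of what such reuse might look like; aside from
receiveAny() and the authz helpers already used above, every name here is
invented purely for illustration:

function receiveStaticFile(fileExtractor) {
  return async (req, res) => {
    const file = fileExtractor(req);

    authz.assertAuthorized(req.user, authz.actions.Write, file.resource);

    // Delegate the proxying, on-the-fly gzip, and buffering to receiveAny()
    return receiveAny(req, res,
      async (method, headers) => await file.url(method, headers),
      "application/octet-stream" // illustrative Content-Type
    );
  };
}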


