Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into sk/rst_noerror
Browse files Browse the repository at this point in the history
  • Loading branch information
srikrsna-buf committed Sep 10, 2024
2 parents 2ad600a + ed63cee commit 2fb090c
Show file tree
Hide file tree
Showing 10 changed files with 176 additions and 37 deletions.
12 changes: 10 additions & 2 deletions packages/connect-cloudflare/turbo.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,15 @@
"$schema": "https://turbo.build/schema.json",
"extends": ["//"],
"tasks": {
"conformance:client": { "cache": false, "dependsOn": ["^build"] },
"conformance:server": { "cache": false, "dependsOn": ["^build"] }
"conformance:client": {
"cache": false,
"dependsOn": ["^build"],
"env": ["CLOUDFLARE_*"]
},
"conformance:server": {
"cache": false,
"dependsOn": ["^build"],
"env": ["CLOUDFLARE_*"]
}
}
}
1 change: 1 addition & 0 deletions packages/connect-express/src/express-connect-middleware.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ export function expressConnectMiddleware(
}
const uReq = universalRequestFromNodeRequest(
req,
res,
getPreparsedBody(req),
options.contextValues?.(req),
);
Expand Down
1 change: 1 addition & 0 deletions packages/connect-fastify/src/fastify-connect-plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ export function fastifyConnectPlugin(
const uRes = await uHandler(
universalRequestFromNodeRequest(
req.raw,
reply.raw,
req.body as JsonValue | undefined,
opts.contextValues?.(req),
),
Expand Down
1 change: 1 addition & 0 deletions packages/connect-next/src/connect-nextjs-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ export function nextJsApiRouter(options: NextJsApiRouterOptions): ApiRoute {
const uRes = await uHandler(
universalRequestFromNodeRequest(
req,
res,
req.body as JsonValue | undefined,
options.contextValues?.(req),
),
Expand Down
1 change: 1 addition & 0 deletions packages/connect-node/src/connect-node-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ export function connectNodeAdapter(
}
const uReq = universalRequestFromNodeRequest(
req,
res,
undefined,
options.contextValues?.(req),
);
Expand Down
5 changes: 2 additions & 3 deletions packages/connect-node/src/node-universal-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,12 @@ function createNodeHttp1Client(
sentinel.catch((e) => {
reject(e);
});

h1Request(
sentinel,
req.url,
{
...httpOptions,
headers: webHeaderToNodeHeaders(req.header),
headers: webHeaderToNodeHeaders(req.header, httpOptions?.headers),
method: req.method,
},
(request) => {
Expand Down Expand Up @@ -280,7 +279,7 @@ function h2Request(
options: Omit<http2.ClientSessionRequestOptions, "signal">,
onStream: (stream: http2.ClientHttp2Stream) => void,
): void {
const requestUrl = new URL(url, sm.authority);
const requestUrl = new URL(url);
if (requestUrl.origin !== sm.authority) {
const message = `cannot make a request to ${requestUrl.origin}: the http2 session is connected to ${sm.authority}`;
sentinel.reject(new ConnectError(message, Code.Internal));
Expand Down
54 changes: 51 additions & 3 deletions packages/connect-node/src/node-universal-handler.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,15 @@ import type { UniversalServerRequest } from "@connectrpc/connect/protocol";
// Polyfill the Headers API for Node versions < 18
import "./node-headers-polyfill.js";

describe("universalRequestFromNodeRequest()", function () {
describe("universalRequestFromNodeResponse()", function () {
describe("with HTTP/2 stream closed with an RST code", function () {
let serverRequest: UniversalServerRequest | undefined;
const server = useNodeServer(() => {
serverRequest = undefined;
return http2.createServer(function (request) {
return http2.createServer(function (request, response) {
serverRequest = universalRequestFromNodeRequest(
request,
response,
undefined,
undefined,
);
Expand Down Expand Up @@ -176,9 +177,10 @@ describe("universalRequestFromNodeRequest()", function () {
connectionsCheckingInterval: 1,
requestTimeout: 0,
},
function (request) {
function (request, response) {
serverRequest = universalRequestFromNodeRequest(
request,
response,
undefined,
undefined,
);
Expand Down Expand Up @@ -269,6 +271,7 @@ describe("universalRequestFromNodeRequest()", function () {
function (request, response) {
serverRequest = universalRequestFromNodeRequest(
request,
response,
undefined,
undefined,
);
Expand Down Expand Up @@ -322,4 +325,49 @@ describe("universalRequestFromNodeRequest()", function () {
}
});
});
describe("with HTTP/1.1", function () {
let serverRequest: UniversalServerRequest | undefined;
let serverNodeResponse:
| http.ServerResponse<http.IncomingMessage>
| undefined;
const server = useNodeServer(() =>
http.createServer(function (request, response) {
serverRequest = universalRequestFromNodeRequest(
request,
response,
undefined,
undefined,
);
response.on("error", fail);
serverNodeResponse = response;
void readAllBytes(
serverRequest.body as AsyncIterable<Uint8Array>,
Number.MAX_SAFE_INTEGER,
).then(() => {
response.writeHead(200);
response.flushHeaders();
});
}),
);
it("signal should not be aborted on start", async function () {
await new Promise<void>((resolve) => {
const request = http.request(server.getUrl(), {
method: "POST",
// close TCP connection after we're done so that the server shuts down cleanly
agent: new http.Agent({ keepAlive: false }),
});
request.on("error", fail);
request.flushHeaders();
request.end();
request.on("response", (response) => {
expect(serverRequest).toBeDefined();
expect(serverRequest?.signal.aborted).toBeFalse();
serverNodeResponse?.end();
void readAllBytes(response, Number.MAX_SAFE_INTEGER).then(() =>
resolve(),
);
});
});
});
});
});
48 changes: 43 additions & 5 deletions packages/connect-node/src/node-universal-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,44 @@ export type NodeServerResponse = (
};

/**
* Converts a UniversalServerRequest to a Node.js server request.
* Converts a Node.js server request to a UniversalServerRequest.
* This function helps to implement adapters to server frameworks running
* on Node.js. Please be careful using this function in your own code, as we
* may have to make changes to it in the future.
*/
export function universalRequestFromNodeRequest(
nodeRequest: NodeServerRequest,
nodeResponse: NodeServerResponse,
parsedJsonBody: JsonValue | undefined,
contextValues: ContextValues | undefined,
): UniversalServerRequest;
/**
* @deprecated
*/
export function universalRequestFromNodeRequest(
nodeRequest: NodeServerRequest,
parsedJsonBody: JsonValue | undefined,
contextValues: ContextValues | undefined,
): UniversalServerRequest;
export function universalRequestFromNodeRequest(
nodeRequest: NodeServerRequest,
...rest:
| [
nodeResponse: NodeServerResponse,
parsedJsonBody: JsonValue | undefined,
contextValues: ContextValues | undefined,
]
| [
parsedJsonBody: JsonValue | undefined,
contextValues: ContextValues | undefined,
]
): UniversalServerRequest {
const nodeResponse: NodeServerResponse | undefined =
rest.length === 3 ? rest[0] : undefined;
const parsedJsonBody: JsonValue | undefined =
rest.length === 3 ? rest[1] : rest[0];
const contextValues: ContextValues | undefined =
rest.length === 3 ? rest[2] : rest[1];
const encrypted =
"encrypted" in nodeRequest.socket && nodeRequest.socket.encrypted;
const protocol = encrypted ? "https" : "http";
Expand Down Expand Up @@ -107,18 +135,28 @@ export function universalRequestFromNodeRequest(
});
} else {
// HTTP/1.1 does not have error codes, but Node.js has ECONNRESET
const nodeResponsOrRequest = nodeResponse ?? nodeRequest;
const onH1Error = (e: Error) => {
nodeRequest.off("error", onH1Error);
nodeRequest.off("close", onH1Close);
nodeResponsOrRequest.off("close", onH1Close);
abortController.abort(connectErrorFromNodeReason(e));
};
const onH1Close = () => {
nodeRequest.off("error", onH1Error);
nodeRequest.off("close", onH1Close);
abortController.abort();
nodeResponsOrRequest.off("close", onH1Close);
// When subscribed to the response, this can get called before "error"
abortController.abort(
nodeRequest.errored
? connectErrorFromNodeReason(nodeRequest.errored)
: undefined,
);
};
nodeRequest.once("error", onH1Error);
nodeRequest.once("close", onH1Close);
// Node emits close on the request as soon as all data is read.
// We instead subscribe to the response (if available)
//
// Ref: https://github.com/nodejs/node/issues/40775
nodeResponsOrRequest.once("close", onH1Close);
}
return {
httpVersion: nodeRequest.httpVersion,
Expand Down
21 changes: 21 additions & 0 deletions packages/connect-node/src/node-universal-header.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

import * as http from "http";
import {
nodeHeaderToWebHeader,
webHeaderToNodeHeaders,
Expand Down Expand Up @@ -124,4 +125,24 @@ describe("webHeaderToNodeHeaders()", function () {
});
});
}
it("should accept default node headers", function () {
const nodeDefaults: http.OutgoingHttpHeaders = {
a: "a",
b: ["b1", "b2"],
c: 123,
};
const webHeaders: HeadersInit = [
["b", "web"],
["c", "456"],
["d", "d1"],
["d", "d2"],
];
const h = webHeaderToNodeHeaders(webHeaders, nodeDefaults);
expect(h).toEqual({
a: "a",
b: ["b1", "b2", "web"],
c: ["123", "456"],
d: ["d1", "d2"],
});
});
});
69 changes: 45 additions & 24 deletions packages/connect-node/src/node-universal-header.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,45 +53,66 @@ export function nodeHeaderToWebHeader(

/**
* Convert a fetch API Headers object to a Node.js headers object.
*
* Optionally accepts default Node.js headers. If provided, fetch API headers
* are appended to the defaults. The original defaults headers are not modified.
*/
export function webHeaderToNodeHeaders(
headersInit: HeadersInit,
defaultNodeHeaders?: http.OutgoingHttpHeaders,
): http.OutgoingHttpHeaders;
export function webHeaderToNodeHeaders(
headersInit: HeadersInit | undefined,
): http.OutgoingHttpHeaders | undefined;
export function webHeaderToNodeHeaders(
headersInit: HeadersInit | undefined,
defaultNodeHeaders?: http.OutgoingHttpHeaders,
): http.OutgoingHttpHeaders | undefined {
if (headersInit === undefined) {
if (headersInit === undefined && defaultNodeHeaders === undefined) {
return undefined;
}
const o = Object.create(null) as http.OutgoingHttpHeaders;
const append = (key: string, value: string): void => {
key = key.toLowerCase();
const existing = o[key];
if (typeof existing == "string") {
o[key] = [existing, value];
} else if (Array.isArray(existing)) {
existing.push(value);
} else {
o[key] = value;
}
};
if (Array.isArray(headersInit)) {
for (const [key, value] of headersInit) {
append(key, value);
}
} else if ("forEach" in headersInit) {
if (typeof headersInit.forEach == "function") {
headersInit.forEach((value, key) => {
append(key, value);
});
if (defaultNodeHeaders !== undefined) {
for (const [key, value] of Object.entries(defaultNodeHeaders)) {
if (Array.isArray(value)) {
o[key] = value.concat();
} else if (value !== undefined) {
o[key] = value;
}
}
} else {
for (const [key, value] of Object.entries<string>(headersInit)) {
append(key, value);
}
if (headersInit !== undefined) {
if (Array.isArray(headersInit)) {
for (const [key, value] of headersInit) {
appendWebHeader(o, key, value);
}
} else if ("forEach" in headersInit) {
if (typeof headersInit.forEach == "function") {
headersInit.forEach((value, key) => {
appendWebHeader(o, key, value);
});
}
} else {
for (const [key, value] of Object.entries<string>(headersInit)) {
appendWebHeader(o, key, value);
}
}
}
return o;
}

function appendWebHeader(
o: http.OutgoingHttpHeaders,
key: string,
value: string,
) {
key = key.toLowerCase();
const existing = o[key];
if (Array.isArray(existing)) {
existing.push(value);
} else if (existing === undefined) {
o[key] = value;
} else {
o[key] = [existing.toString(), value];
}
}

0 comments on commit 2fb090c

Please sign in to comment.