Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into sk/rst_noerror
Browse files Browse the repository at this point in the history
Signed-off-by: Sri Krishna Paritala <skrishna@buf.build>
  • Loading branch information
srikrsna-buf committed Sep 5, 2024
2 parents 8a7345e + 32d7ada commit 02de7a4
Show file tree
Hide file tree
Showing 15 changed files with 376 additions and 415 deletions.
2 changes: 1 addition & 1 deletion packages/connect-conformance/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"connectconformance": "bin/connectconformance.cjs"
},
"scripts": {
"generate": "buf generate buf.build/connectrpc/conformance:v1.0.2",
"generate": "buf generate buf.build/connectrpc/conformance:v1.0.3",
"postgenerate": "license-header src/gen",
"prebuild": "rm -rf ./dist/*",
"build": "npm run build:cjs && npm run build:esm",
Expand Down
221 changes: 85 additions & 136 deletions packages/connect-conformance/src/callback-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,26 @@
// See the License for the specific language governing permissions and
// limitations under the License.

import { createCallbackClient, ConnectError } from "@connectrpc/connect";
import { createCallbackClient, ConnectError, Code } from "@connectrpc/connect";
import type { CallbackClient, Transport } from "@connectrpc/connect";
import {
ClientCompatRequest,
ClientResponseResult,
} from "./gen/connectrpc/conformance/v1/client_compat_pb.js";
import {
UnaryRequest,
Header as ConformanceHeader,
ServerStreamRequest,
ConformancePayload,
UnimplementedRequest,
IdempotentUnaryRequest,
} from "./gen/connectrpc/conformance/v1/service_pb.js";
import {
convertToProtoError,
convertToProtoHeaders,
appendProtoHeaders,
wait,
getCancelTiming,
getRequestHeaders,
getSingleRequestMessage,
setClientErrorResult,
} from "./protocol.js";
import { ConformanceService } from "./gen/connectrpc/conformance/v1/service_connect.js";

Expand Down Expand Up @@ -59,189 +59,138 @@ export function invokeWithCallbackClient(

async function unary(
client: ConformanceClient,
req: ClientCompatRequest,
compatRequest: ClientCompatRequest,
idempotent: boolean = false,
) {
if (req.requestMessages.length !== 1) {
throw new Error("Unary method requires exactly one request message");
}
const msg = req.requestMessages[0];
const uReq = idempotent ? new IdempotentUnaryRequest() : new UnaryRequest();
if (!msg.unpackTo(uReq)) {
throw new Error("Could not unpack request message to unary request");
}
const reqHeader = new Headers();
appendProtoHeaders(reqHeader, req.requestHeaders);
let error: ConnectError | undefined = undefined;
let resHeaders: ConformanceHeader[] = [];
let resTrailers: ConformanceHeader[] = [];
const payloads: ConformancePayload[] = [];

let call = client.unary;
if (idempotent) {
call = client.idempotentUnary;
}

await wait(req.requestDelayMs);
await wait(compatRequest.requestDelayMs);
const result = new ClientResponseResult();
return new Promise<ClientResponseResult>((resolve) => {
call(
uReq,
(err, uRes) => {
const call = idempotent ? client.idempotentUnary : client.unary;
let clientCancelled = false;
const clientCancelFn = call(
getSingleRequestMessage(
compatRequest,
idempotent ? IdempotentUnaryRequest : UnaryRequest,
),
(err, response) => {
// Callback clients swallow client triggered cancellations and never
// call the callback. This will trigger the global error handler and
// fail the process.
if (clientCancelled) {
throw new Error("Aborted requests should not trigger the callback");
}
if (err !== undefined) {
error = ConnectError.from(err);
// We can't distinguish between headers and trailers here, so we just
// add the metadata to both.
//
// But if the headers are already set, we don't need to overwrite them.
resHeaders =
resHeaders.length === 0
? convertToProtoHeaders(error.metadata)
: resHeaders;
resTrailers = convertToProtoHeaders(error.metadata);
setClientErrorResult(result, err);
} else {
payloads.push(uRes.payload!);
result.payloads.push(response.payload!);
}
resolve(
new ClientResponseResult({
payloads: payloads,
responseHeaders: resHeaders,
responseTrailers: resTrailers,
error: convertToProtoError(error),
}),
);
resolve(result);
},
{
headers: reqHeader,
headers: getRequestHeaders(compatRequest),
onHeader(headers) {
resHeaders = convertToProtoHeaders(headers);
result.responseHeaders = convertToProtoHeaders(headers);
},
onTrailer(trailers) {
resTrailers = convertToProtoHeaders(trailers);
result.responseTrailers = convertToProtoHeaders(trailers);
},
},
);
const { afterCloseSendMs } = getCancelTiming(compatRequest);
if (afterCloseSendMs >= 0) {
setTimeout(() => {
clientCancelled = true;
clientCancelFn();
// Callback clients swallow client triggered cancellations and never
// call the callback. We report a fake error to the test runner to let
// it know that the call was cancelled.
result.error = convertToProtoError(
new ConnectError("client cancelled", Code.Canceled),
);
resolve(result);
}, afterCloseSendMs);
}
});
}

async function serverStream(
client: ConformanceClient,
req: ClientCompatRequest,
compatRequest: ClientCompatRequest,
) {
if (req.requestMessages.length !== 1) {
throw new Error("ServerStream method requires exactly one request message");
}
const msg = req.requestMessages[0];
const uReq = new ServerStreamRequest();
if (!msg.unpackTo(uReq)) {
throw new Error(
"Could not unpack request message to server stream request",
);
}
const reqHeader = new Headers();
appendProtoHeaders(reqHeader, req.requestHeaders);
let error: ConnectError | undefined = undefined;
let resHeaders: ConformanceHeader[] = [];
let resTrailers: ConformanceHeader[] = [];
const payloads: ConformancePayload[] = [];
const cancelTiming = getCancelTiming(req);
let count = 0;

await wait(req.requestDelayMs);
const cancelTiming = getCancelTiming(compatRequest);
await wait(compatRequest.requestDelayMs);
const result = new ClientResponseResult();
return new Promise<ClientResponseResult>((resolve) => {
const cancelFn = client.serverStream(
uReq,
(uResp) => {
if (cancelTiming.afterNumResponses === 0) {
cancelFn();
}
payloads.push(uResp.payload!);
count++;
if (count === cancelTiming.afterNumResponses) {
cancelFn();
let clientCancelled = false;
const clientCancelFn = client.serverStream(
getSingleRequestMessage(compatRequest, ServerStreamRequest),
(response) => {
result.payloads.push(response.payload!);
if (result.payloads.length === cancelTiming.afterNumResponses) {
clientCancelled = true;
clientCancelFn();
}
},
(err) => {
// Callback clients call the closeCallback without an error for client
// triggered cancellation. We report a fake error to the test runner to let
// it know that the call was cancelled.
if (clientCancelled) {
if (err !== undefined) {
throw new Error(
"Aborted requests should not trigger the closeCallback with an error",
);
}
result.error = convertToProtoError(
new ConnectError("client cancelled", Code.Canceled),
);
}
if (err !== undefined) {
error = ConnectError.from(err);
// We can't distinguish between headers and trailers here, so we just
// add the metadata to both.
//
// But if the headers are already set, we don't need to overwrite them.
resHeaders =
resHeaders.length === 0
? convertToProtoHeaders(error.metadata)
: resHeaders;
resTrailers = convertToProtoHeaders(error.metadata);
setClientErrorResult(result, err);
}
resolve(
new ClientResponseResult({
responseHeaders: resHeaders,
responseTrailers: resTrailers,
payloads: payloads,
error: convertToProtoError(error),
}),
);
resolve(result);
},
{
headers: reqHeader,
headers: getRequestHeaders(compatRequest),
onHeader(headers) {
resHeaders = convertToProtoHeaders(headers);
result.responseHeaders = convertToProtoHeaders(headers);
},
onTrailer(trailers) {
resTrailers = convertToProtoHeaders(trailers);
result.responseTrailers = convertToProtoHeaders(trailers);
},
},
);
if (cancelTiming.afterCloseSendMs >= 0) {
setTimeout(() => {
clientCancelled = true;
clientCancelFn();
}, cancelTiming.afterCloseSendMs);
}
});
}

async function unimplemented(
client: ConformanceClient,
req: ClientCompatRequest,
compatRequest: ClientCompatRequest,
) {
const msg = req.requestMessages[0];
const unReq = new UnimplementedRequest();
if (!msg.unpackTo(unReq)) {
throw new Error("Could not unpack request message to unary request");
}
const reqHeader = new Headers();
appendProtoHeaders(reqHeader, req.requestHeaders);
let error: ConnectError | undefined = undefined;
let resHeaders: ConformanceHeader[] = [];
let resTrailers: ConformanceHeader[] = [];

const result = new ClientResponseResult();
return new Promise<ClientResponseResult>((resolve) => {
client.unimplemented(
unReq,
getSingleRequestMessage(compatRequest, UnimplementedRequest),
// eslint-disable-next-line @typescript-eslint/no-unused-vars
(err, _) => {
if (err !== undefined) {
error = ConnectError.from(err);
// We can't distinguish between headers and trailers here, so we just
// add the metadata to both.
//
// But if the headers are already set, we don't need to overwrite them.
resHeaders =
resHeaders.length === 0
? convertToProtoHeaders(error.metadata)
: resHeaders;
resTrailers = convertToProtoHeaders(error.metadata);
setClientErrorResult(result, err);
}
resolve(
new ClientResponseResult({
responseHeaders: resHeaders,
responseTrailers: resTrailers,
error: convertToProtoError(error),
}),
);
resolve(result);
},
{
headers: reqHeader,
headers: getRequestHeaders(compatRequest),
onHeader(headers) {
resHeaders = convertToProtoHeaders(headers);
result.responseHeaders = convertToProtoHeaders(headers);
},
onTrailer(trailers) {
resTrailers = convertToProtoHeaders(trailers);
result.responseTrailers = convertToProtoHeaders(trailers);
},
},
);
Expand Down
30 changes: 17 additions & 13 deletions packages/connect-conformance/src/conformance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,18 @@ import { execFileSync } from "node:child_process";
import { fetch } from "undici";
import { scripts } from "../package.json";

// Extract conformance runner version from the `generate` script
const [, version] = /conformance:(v\d+\.\d+\.\d+)/.exec(scripts.generate) ?? [
"?",
];

const downloadUrl = `https://github.com/connectrpc/conformance/releases/download/${version}`;

export async function run() {
const { archive, bin } = getArtifactNameForEnv();
const tempDir = getTempDir();
// Extract conformance runner version from the `generate` script
const [, version] = /conformance:(v\d+\.\d+\.\d+)/.exec(scripts.generate) ?? [
"?",
];
const { archive, bin } = getArtifactNameForEnv(version);
const tempDir = getTempDir(version);
const binPath = joinPath(tempDir, bin);
if (!existsSync(binPath)) {
const downloadUrl = `https://github.com/connectrpc/conformance/releases/download/${version}/${archive}`;
const archivePath = joinPath(tempDir, archive);
await download(`${downloadUrl}/${archive}`, archivePath);
await download(downloadUrl, archivePath);
await extractBin(archivePath, binPath);
}
execFileSync(binPath, process.argv.slice(2), {
Expand Down Expand Up @@ -101,15 +99,21 @@ async function extractBin(archivePath: string, binPath: string) {
);
}

function getTempDir() {
const tempDir = joinPath(process.env["TEMP"] ?? os.tmpdir(), "conformance");
function getTempDir(version: string) {
const tempDir = joinPath(
process.env["TEMP"] ?? os.tmpdir(),
`conformance-${version}`,
);
if (!existsSync(tempDir)) {
mkdirSync(tempDir, { recursive: true });
}
return tempDir;
}

function getArtifactNameForEnv(): { archive: string; bin: string } {
function getArtifactNameForEnv(version: string): {
archive: string;
bin: string;
} {
let build = "";
let ext = ".tar.gz";
let bin = "connectconformance";
Expand Down
Loading

0 comments on commit 02de7a4

Please sign in to comment.