Skip to content

Commit

Permalink
fix: gracefully handle streaming hangups when the server dies unexpec…
Browse files Browse the repository at this point in the history
…ted (#2358)

## Why is this change needed?

Fixes #2346. Under certain conditions, like a hub failure or odd
connectivity quirks, the fetch can hang indefinitely, leading to a need
for consumers to restart if they aren't timeout driven. This avoids the
issue by setting a sensible timeout and returning failure in the event
it can't complete the request.

## Merge Checklist

_Choose all relevant options below by adding an `x` now or at any time
before submitting for review_

- [x] PR title adheres to the [conventional
commits](https://www.conventionalcommits.org/en/v1.0.0/) standard
- [x] PR has a
[changeset](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#35-adding-changesets)
- [x] PR has been tagged with a change label(s) (i.e. documentation,
feature, bugfix, or chore)
- [ ] PR includes
[documentation](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#32-writing-docs)
if necessary.


<!-- start pr-codex -->

---

## PR-Codex overview
This PR focuses on improving the handling of server streaming timeouts
and errors in the `shuttle` package, ensuring graceful termination of
unresponsive connections and adding corresponding tests.

### Detailed summary
- Added timeout handling to prevent hanging connections in
`packages/shuttle/src/shuttle/messageReconciliation.ts`.
- Updated promise resolution logic to handle server timeout errors.
- Implemented a test case in
`packages/shuttle/src/shuttle.integration.test.ts` for unresponsive
server requests.

> ✨ Ask PR-Codex anything about this PR by commenting with `/codex {your
question}`

<!-- end pr-codex -->
  • Loading branch information
CassOnMars authored Oct 8, 2024
1 parent 7370ee4 commit 386059a
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 2 deletions.
5 changes: 5 additions & 0 deletions .changeset/fast-peaches-retire.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@farcaster/shuttle": patch
---

fix: gracefully handle streaming hangups when the server dies unexpected
142 changes: 142 additions & 0 deletions packages/shuttle/src/shuttle.integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,148 @@ describe("shuttle", () => {
expect(messagesInDb.length).toBe(2);
});

test("reconciler lets unresponsive server requests terminate in error", async () => {
const startTimestamp = getFarcasterTime()._unsafeUnwrap();

const linkAddMessage = await Factories.LinkAddMessage.create(
{ data: { timestamp: startTimestamp } },
{ transient: { signer } },
);

const castAddMessage = await Factories.CastAddMessage.create({
data: { timestamp: startTimestamp - 1, fid: linkAddMessage.data.fid },
});

const verificationAddMessage = await Factories.CastAddMessage.create({
data: { timestamp: startTimestamp - 2, fid: linkAddMessage.data.fid },
});

await subscriber.processHubEvent(
HubEvent.create({
id: 1,
type: HubEventType.MERGE_MESSAGE,
mergeMessageBody: { message: verificationAddMessage },
}),
);
await subscriber.processHubEvent(
HubEvent.create({
id: 2,
type: HubEventType.MERGE_MESSAGE,
mergeMessageBody: { message: castAddMessage },
}),
);
await subscriber.processHubEvent(
HubEvent.create({
id: 3,
type: HubEventType.MERGE_MESSAGE,
mergeMessageBody: { message: linkAddMessage },
}),
);

// It's a hack, but mockito is not handling this well:
const mockRPCClient = {
streamFetch: (metadata?: Metadata, options?: Partial<CallOptions>) => {
return err(new HubError("unavailable", "unavailable"));
},
getAllLinkMessagesByFid: async (_request: FidRequest, _metadata: Metadata, _options: Partial<CallOptions>) => {
return ok(
MessagesResponse.create({
messages: [linkAddMessage],
nextPageToken: undefined,
}),
);
},
getAllCastMessagesByFid: async (_request: FidRequest, _metadata: Metadata, _options: Partial<CallOptions>) => {
// force wait for 2 seconds to trigger failure
await new Promise((resolve) => setTimeout(resolve, 5000));
return ok(
MessagesResponse.create({
messages: [
/* Pretend this message is missing from the hub */
],
nextPageToken: undefined,
}),
);
},
getAllVerificationMessagesByFid: async (
_request: FidRequest,
_metadata: Metadata,
_options: Partial<CallOptions>,
) => {
return ok(
MessagesResponse.create({
messages: [],
nextPageToken: undefined,
}),
);
},
getAllReactionMessagesByFid: async (
_request: FidRequest,
_metadata: Metadata,
_options: Partial<CallOptions>,
) => {
return ok(
MessagesResponse.create({
messages: [],
nextPageToken: undefined,
}),
);
},
getLinkCompactStateMessageByFid: async (
_request: FidRequest,
_metadata: Metadata,
_options: Partial<CallOptions>,
) => {
// force wait for 2 seconds to trigger failure
await new Promise((resolve) => setTimeout(resolve, 5000));
return ok(
MessagesResponse.create({
messages: [],
nextPageToken: undefined,
}),
);
},
getAllUserDataMessagesByFid: async (
_request: FidRequest,
_metadata: Metadata,
_options: Partial<CallOptions>,
) => {
return ok(
MessagesResponse.create({
messages: [],
nextPageToken: undefined,
}),
);
},
};

// Only include 2 of the 3 messages in the time window
const reconciler = new MessageReconciliation(mockRPCClient as unknown as HubRpcClient, db, log);
const messagesOnHub: Message[] = [];
const messagesInDb: {
hash: Uint8Array;
prunedAt: Date | null;
revokedAt: Date | null;
fid: number;
type: MessageType;
raw: Uint8Array;
signer: Uint8Array;
}[] = [];
await expect(
reconciler.reconcileMessagesForFid(
linkAddMessage.data.fid,
async (msg, _missing, _pruned, _revoked) => {
messagesOnHub.push(msg);
},
async (dbMsg, _missing) => {
messagesInDb.push(dbMsg);
},
startTimestamp - 1,
startTimestamp,
),
).rejects.toThrow();
}, 15000); // Need to make sure this is long enough to handle the timeout termination

test("marks messages as pruned", async () => {
const addMessage = await Factories.ReactionAddMessage.create({}, { transient: { signer } });
subscriber.addMessageCallback((msg, operation, state, isNew, wasMissed) => {
Expand Down
14 changes: 12 additions & 2 deletions packages/shuttle/src/shuttle/messageReconciliation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -181,26 +181,36 @@ export class MessageReconciliation {
) {
const id = randomUUID();
const result = new Promise<HubResult<MessagesResponse>>((resolve) => {
// Do not allow hanging unresponsive connections to linger:
const cancel = setTimeout(() => resolve(err(new HubError("unavailable", "server timeout"))), 5000);

if (!this.stream) {
resolve(fallback());
fallback()
.then((result) => resolve(result))
.finally(() => clearTimeout(cancel));
return;
}
const process = async (response: StreamFetchResponse) => {
if (!this.stream) {
clearTimeout(cancel);
resolve(err(new HubError("unavailable", "unexpected stream termination")));
return;
}
this.stream.off("data", process);
if (response.idempotencyKey !== id || !response.messages) {
if (response?.error) {
clearTimeout(cancel);
resolve(err(new HubError(response.error.errCode as HubErrorCode, { message: response.error.message })));
return;
}

this.stream.cancel();
this.stream = undefined;
resolve(fallback());
fallback()
.then((result) => resolve(result))
.finally(() => clearTimeout(cancel));
} else {
clearTimeout(cancel);
resolve(ok(response.messages));
}
};
Expand Down

0 comments on commit 386059a

Please sign in to comment.