Skip to content

Commit

Permalink
Retry disconnect after connect when disconnect already happened (#16096)
Browse files Browse the repository at this point in the history
## Description

In prod, we have seen clients disconnect socket within 0.1ms after we
log `connect_document` message. Even that telemetry is slightly delayed
by a few operations before the log, so the actual time between
`connect_document` message and `disconnect` event is likely even smaller
in these cases.

If `disconnect` is happening, essentially, immediately after
`connect_document` message, then the work in #15769 is not enough to
catch this race-condition, because it depends on variables that are set
a few ticks after `connect_document` is received.

To be absolutely certain that we catch this race condition, we can add a
disconnect retry _after_ connectDocument completes whenever disconnect
has already happened.

## Reviewer Guidance

I know the code isn't super clean. Alfred Lambda is getting pretty
messy. I think some cleanup of that 1,100 line file is warranted, but I
don't have the time right now.
  • Loading branch information
znewton authored Jul 3, 2023
1 parent 3bb86f7 commit 26a89a4
Show file tree
Hide file tree
Showing 2 changed files with 157 additions and 71 deletions.
227 changes: 156 additions & 71 deletions server/routerlicious/packages/lambdas/src/alfred/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,9 @@ export function configureWebSocketServices(

let connectDocumentComplete: boolean = false;
let connectDocumentP: Promise<void> | undefined;
const clientIdConnectionsDisconnected = new Set<string>();
const clientIdClientsDisconnected = new Set<string>();
let disconnectDocumentP: Promise<void> | undefined;

// Timer to check token expiry for this socket connection
let expirationTimer: NodeJS.Timer | undefined;
Expand Down Expand Up @@ -621,6 +624,101 @@ export function configureWebSocketServices(
};
}

async function disconnectDocument() {
// Clear token expiration timer on disconnection
clearExpirationTimer();
const removeAndStoreP: Promise<void>[] = [];
// Send notification messages for all client IDs in the connection map
for (const [clientId, connection] of connectionsMap) {
if (clientIdConnectionsDisconnected.has(clientId)) {
// We already removed this clientId once. Skip it.
continue;
}
const messageMetaData = getMessageMetadata(
connection.documentId,
connection.tenantId,
);
logger.info(`Disconnect of ${clientId}`, { messageMetaData });
Lumberjack.info(
`Disconnect of ${clientId}`,
getLumberBaseProperties(connection.documentId, connection.tenantId),
);

connection
.disconnect()
.then(() => {
// Keep track of disconnected clientIds so that we don't repeat the disconnect signal
// for the same clientId if retrying when connectDocument completes after disconnectDocument.
clientIdConnectionsDisconnected.add(clientId);
})
.catch((error) => {
const errorMsg = `Failed to disconnect client ${clientId} from orderer connection.`;
Lumberjack.error(
errorMsg,
getLumberBaseProperties(connection.documentId, connection.tenantId),
error,
);
});
if (isClientConnectivityCountingEnabled && throttleAndUsageStorageManager) {
const connectionTimestamp = connectionTimeMap.get(clientId);
if (connectionTimestamp) {
removeAndStoreP.push(
storeClientConnectivityTime(
clientId,
connection.documentId,
connection.tenantId,
connectionTimestamp,
throttleAndUsageStorageManager,
),
);
}
}
}
// Send notification messages for all client IDs in the room map
for (const [clientId, room] of roomMap) {
if (clientIdClientsDisconnected.has(clientId)) {
// We already removed this clientId once. Skip it.
continue;
}
const messageMetaData = getMessageMetadata(room.documentId, room.tenantId);
// excluding summarizer for total client count.
if (connectionTimeMap.has(clientId)) {
connectionCountLogger.decrementConnectionCount();
}
logger.info(`Disconnect of ${clientId} from room`, { messageMetaData });
Lumberjack.info(
`Disconnect of ${clientId} from room`,
getLumberBaseProperties(room.documentId, room.tenantId),
);
removeAndStoreP.push(
clientManager
.removeClient(room.tenantId, room.documentId, clientId)
.then(() => {
// Keep track of disconnected clientIds so that we don't repeat the disconnect signal
// for the same clientId if retrying when connectDocument completes after disconnectDocument.
clientIdClientsDisconnected.add(clientId);
}),
);
socket
.emitToRoom(getRoomId(room), "signal", createRoomLeaveMessage(clientId))
.catch((error) => {
const errorMsg = `Failed to emit signal to room ${clientId}, ${getRoomId(
room,
)}.`;
Lumberjack.error(
errorMsg,
getLumberBaseProperties(room.documentId, room.tenantId),
error,
);
});
}
// Clear socket tracker upon disconnection
if (socketTracker) {
socketTracker.removeSocket(socket.id);
}
await Promise.all(removeAndStoreP);
}

// Note connect is a reserved socket.io word so we use connect_document to represent the connect request
// eslint-disable-next-line @typescript-eslint/no-misused-promises
socket.on("connect_document", async (connectionMessage: IConnect) => {
Expand Down Expand Up @@ -667,6 +765,62 @@ export function configureWebSocketServices(
})
.finally(() => {
connectDocumentComplete = true;
if (disconnectDocumentP) {
Lumberjack.warning(
`ConnectDocument completed after disconnect was handled.`,
);
// We have already received disconnect for this connection.
disconnectDocumentP.finally(() => {
// We might need to re-run disconnect handler after previous disconnect handler completes.
// DisconnectDocument internally handles the cases where we have already run disconnect for
// roomsMap and connectionsMap so that we don't duplicate disconnect efforts.
// The primary need for this retry is when we receive "disconnect" in the narrow window after
// "connect_document" but before "connectDocumentP" is defined.
const alreadyDisconnectedAllConnections: boolean =
connectionsMap.size === clientIdConnectionsDisconnected.size;
const alreadyDisconnectedAllClients: boolean =
roomMap.size === clientIdClientsDisconnected.size;
if (
alreadyDisconnectedAllConnections &&
alreadyDisconnectedAllClients
) {
// Don't retry disconnect if all connections and clients are already handled.
return;
}

const disconnectRetryMetric = Lumberjack.newLumberMetric(
LumberEventName.DisconnectDocumentRetry,
);
disconnectRetryMetric.setProperties({
[CommonProperties.connectionCount]: connectionsMap.size,
[CommonProperties.connectionClients]: JSON.stringify(
Array.from(connectionsMap.keys()),
),
[CommonProperties.roomClients]: JSON.stringify(
Array.from(roomMap.keys()),
),
});

if (roomMap.size >= 1) {
const rooms = Array.from(roomMap.values());
const documentId = rooms[0].documentId;
const tenantId = rooms[0].tenantId;
disconnectRetryMetric.setProperties({
...getLumberBaseProperties(documentId, tenantId),
});
}

disconnectDocument()
.then(() => {
disconnectRetryMetric.success(
`Successfully retried disconnect.`,
);
})
.catch((error) => {
disconnectRetryMetric.error(`Disconnect retry failed.`, error);
});
});
}
});
});

Expand Down Expand Up @@ -896,77 +1050,8 @@ export function configureWebSocketServices(
}

try {
clearExpirationTimer();
const removeAndStoreP: Promise<void>[] = [];
// Send notification messages for all client IDs in the connection map
for (const [clientId, connection] of connectionsMap) {
const messageMetaData = getMessageMetadata(
connection.documentId,
connection.tenantId,
);
logger.info(`Disconnect of ${clientId}`, { messageMetaData });
Lumberjack.info(
`Disconnect of ${clientId}`,
getLumberBaseProperties(connection.documentId, connection.tenantId),
);

connection.disconnect().catch((error) => {
const errorMsg = `Failed to disconnect client ${clientId} from orderer connection.`;
Lumberjack.error(
errorMsg,
getLumberBaseProperties(connection.documentId, connection.tenantId),
error,
);
});
if (isClientConnectivityCountingEnabled && throttleAndUsageStorageManager) {
const connectionTimestamp = connectionTimeMap.get(clientId);
if (connectionTimestamp) {
removeAndStoreP.push(
storeClientConnectivityTime(
clientId,
connection.documentId,
connection.tenantId,
connectionTimestamp,
throttleAndUsageStorageManager,
),
);
}
}
}
// Send notification messages for all client IDs in the room map
for (const [clientId, room] of roomMap) {
const messageMetaData = getMessageMetadata(room.documentId, room.tenantId);
// excluding summarizer for total client count.
if (connectionTimeMap.has(clientId)) {
connectionCountLogger.decrementConnectionCount();
}
logger.info(`Disconnect of ${clientId} from room`, { messageMetaData });
Lumberjack.info(
`Disconnect of ${clientId} from room`,
getLumberBaseProperties(room.documentId, room.tenantId),
);
removeAndStoreP.push(
clientManager.removeClient(room.tenantId, room.documentId, clientId),
);
socket
.emitToRoom(getRoomId(room), "signal", createRoomLeaveMessage(clientId))
.catch((error) => {
const errorMsg = `Failed to emit signal to room ${clientId}, ${getRoomId(
room,
)}.`;
Lumberjack.error(
errorMsg,
getLumberBaseProperties(room.documentId, room.tenantId),
error,
);
});
}
// Clear socket tracker upon disconnection
if (socketTracker) {
socketTracker.removeSocket(socket.id);
}
await Promise.all(removeAndStoreP);

disconnectDocumentP = disconnectDocument();
await disconnectDocumentP;
disconnectMetric.success(`Successfully disconnected.`);
} catch (error) {
disconnectMetric.error(`Disconnect failed.`, error);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ export enum LumberEventName {
CreateDocumentUpdateDocumentCollection = "CreateDocumentUpdateDocumentCollection",
CreateDocInitialSummaryWrite = "CreateDocInitialSummaryWrite",
DisconnectDocument = "DisconnectDocument",
DisconnectDocumentRetry = "DisconnectDocumentRetry",
RiddlerFetchTenantKey = "RiddlerFetchTenantKey",
HttpRequest = "HttpRequest",
TotalConnectionCount = "TotalConnectionCount",
Expand Down

0 comments on commit 26a89a4

Please sign in to comment.