From d51ee94cb9b6c5f3966d5e41db61a1263e47ea4f Mon Sep 17 00:00:00 2001 From: Nazar Hussain Date: Fri, 8 Sep 2023 17:32:28 +0200 Subject: [PATCH] fix: network worker not shutting down (#5946) * Fix the close handler for the worker * Add retry to exit the thread * Update code with feedback --- .../src/network/core/networkCoreWorker.ts | 26 +++++----- .../network/core/networkCoreWorkerHandler.ts | 50 ++++++++++++------- .../beacon-node/src/network/core/types.ts | 3 ++ packages/beacon-node/src/util/workerEvents.ts | 34 +++++++++++++ 4 files changed, 81 insertions(+), 32 deletions(-) diff --git a/packages/beacon-node/src/network/core/networkCoreWorker.ts b/packages/beacon-node/src/network/core/networkCoreWorker.ts index ef3408038343..a0c8ff22fe60 100644 --- a/packages/beacon-node/src/network/core/networkCoreWorker.ts +++ b/packages/beacon-node/src/network/core/networkCoreWorker.ts @@ -1,21 +1,18 @@ -import worker from "node:worker_threads"; import fs from "node:fs"; import path from "node:path"; -import {createFromProtobuf} from "@libp2p/peer-id-factory"; +import worker from "node:worker_threads"; +import type {ModuleThread} from "@chainsafe/threads"; import {expose} from "@chainsafe/threads/worker"; -import type {WorkerModule} from "@chainsafe/threads/dist/types/worker.js"; +import {createFromProtobuf} from "@libp2p/peer-id-factory"; import {chainConfigFromJson, createBeaconConfig} from "@lodestar/config"; import {getNodeLogger} from "@lodestar/logger/node"; -import {collectNodeJSMetrics, RegistryMetricCreator} from "../../metrics/index.js"; +import {RegistryMetricCreator, collectNodeJSMetrics} from "../../metrics/index.js"; import {AsyncIterableBridgeCaller, AsyncIterableBridgeHandler} from "../../util/asyncIterableToEvents.js"; import {Clock} from "../../util/clock.js"; -import {wireEventsOnWorkerThread} from "../../util/workerEvents.js"; -import {NetworkEventBus, NetworkEventData, networkEventDirection} from "../events.js"; import {peerIdToString} from 
"../../util/peerId.js"; import {profileNodeJS} from "../../util/profile.js"; -import {getNetworkCoreWorkerMetrics} from "./metrics.js"; -import {NetworkWorkerApi, NetworkWorkerData} from "./types.js"; -import {NetworkCore} from "./networkCore.js"; +import {NetworkEventBus, NetworkEventData, networkEventDirection} from "../events.js"; +import {wireEventsOnWorkerThread} from "../../util/workerEvents.js"; import { NetworkWorkerThreadEventType, ReqRespBridgeEventBus, @@ -24,8 +21,11 @@ import { getReqRespBridgeRespEvents, reqRespBridgeEventDirection, } from "./events.js"; +import {getNetworkCoreWorkerMetrics} from "./metrics.js"; +import {NetworkCore} from "./networkCore.js"; +import {NetworkWorkerApi, NetworkWorkerData} from "./types.js"; -// Cloned data from instatiation +// Cloned data from instantiation const workerData = worker.workerData as NetworkWorkerData; const parentPort = worker.parentPort; // eslint-disable-next-line @typescript-eslint/strict-boolean-expressions @@ -120,9 +120,9 @@ wireEventsOnWorkerThread( ); const libp2pWorkerApi: NetworkWorkerApi = { - close: () => { + close: async () => { abortController.abort(); - return core.close(); + await core.close(); }, scrapeMetrics: () => core.scrapeMetrics(), @@ -162,4 +162,4 @@ const libp2pWorkerApi: NetworkWorkerApi = { }, }; -expose(libp2pWorkerApi as WorkerModule); +expose(libp2pWorkerApi as ModuleThread); diff --git a/packages/beacon-node/src/network/core/networkCoreWorkerHandler.ts b/packages/beacon-node/src/network/core/networkCoreWorkerHandler.ts index 46c06456c429..73ca9e9c5fd0 100644 --- a/packages/beacon-node/src/network/core/networkCoreWorkerHandler.ts +++ b/packages/beacon-node/src/network/core/networkCoreWorkerHandler.ts @@ -1,24 +1,23 @@ import worker_threads from "node:worker_threads"; -import {exportToProtobuf} from "@libp2p/peer-id-factory"; -import {PeerId} from "@libp2p/interface/peer-id"; import {PeerScoreStatsDump} from "@chainsafe/libp2p-gossipsub/dist/src/score/peer-score.js"; import 
{PublishOpts} from "@chainsafe/libp2p-gossipsub/types"; -import {spawn, Thread, Worker} from "@chainsafe/threads"; +import {ModuleThread, Thread, Worker, spawn} from "@chainsafe/threads"; +import {PeerId} from "@libp2p/interface/peer-id"; +import {exportToProtobuf} from "@libp2p/peer-id-factory"; import {routes} from "@lodestar/api"; -import {phase0} from "@lodestar/types"; -import {ResponseIncoming, ResponseOutgoing} from "@lodestar/reqresp"; import {BeaconConfig, chainConfigToJson} from "@lodestar/config"; import type {LoggerNode} from "@lodestar/logger/node"; -import {AsyncIterableBridgeCaller, AsyncIterableBridgeHandler} from "../../util/asyncIterableToEvents.js"; -import {wireEventsOnMainThread} from "../../util/workerEvents.js"; +import {ResponseIncoming, ResponseOutgoing} from "@lodestar/reqresp"; +import {phase0} from "@lodestar/types"; import {Metrics} from "../../metrics/index.js"; -import {IncomingRequestArgs, OutgoingRequestArgs, GetReqRespHandlerFn} from "../reqresp/types.js"; +import {AsyncIterableBridgeCaller, AsyncIterableBridgeHandler} from "../../util/asyncIterableToEvents.js"; +import {peerIdFromString} from "../../util/peerId.js"; +import {terminateWorkerThread, wireEventsOnMainThread} from "../../util/workerEvents.js"; import {NetworkEventBus, NetworkEventData, networkEventDirection} from "../events.js"; -import {CommitteeSubscription} from "../subnets/interface.js"; -import {PeerAction, PeerScoreStats} from "../peers/index.js"; import {NetworkOptions} from "../options.js"; -import {peerIdFromString} from "../../util/peerId.js"; -import {NetworkWorkerApi, NetworkWorkerData, INetworkCore, MultiaddrStr, PeerIdStr} from "./types.js"; +import {PeerAction, PeerScoreStats} from "../peers/index.js"; +import {GetReqRespHandlerFn, IncomingRequestArgs, OutgoingRequestArgs} from "../reqresp/types.js"; +import {CommitteeSubscription} from "../subnets/interface.js"; import { NetworkWorkerThreadEventType, ReqRespBridgeEventBus, @@ -27,6 +26,7 @@ import { 
getReqRespBridgeRespEvents, reqRespBridgeEventDirection, } from "./events.js"; +import {INetworkCore, MultiaddrStr, NetworkWorkerApi, NetworkWorkerData, PeerIdStr} from "./types.js"; export type WorkerNetworkCoreOpts = NetworkOptions & { metricsEnabled: boolean; @@ -47,10 +47,13 @@ export type WorkerNetworkCoreInitModules = { }; type WorkerNetworkCoreModules = WorkerNetworkCoreInitModules & { - workerApi: NetworkWorkerApi; + networkThreadApi: ModuleThread; worker: Worker; }; +const NETWORK_WORKER_EXIT_TIMEOUT_MS = 1000; +const NETWORK_WORKER_EXIT_RETRY_COUNT = 3; + /** * NetworkCore implementation using a Worker thread */ @@ -81,6 +84,10 @@ export class WorkerNetworkCore implements INetworkCore { reqRespBridgeEventDirection ); + Thread.errors(modules.networkThreadApi).subscribe((err) => { + this.modules.logger.error("Network worker thread error", {}, err); + }); + const {metrics} = modules; if (metrics) { metrics.networkWorkerHandler.reqRespBridgeReqCallerPending.addCollect(() => { @@ -124,16 +131,16 @@ export class WorkerNetworkCore implements INetworkCore { } as ConstructorParameters[1]); // eslint-disable-next-line @typescript-eslint/no-explicit-any - const workerApi = (await spawn(worker, { + const networkThreadApi = (await spawn(worker, { // A Lodestar Node may do very expensive task at start blocking the event loop and causing // the initialization to timeout. 
The number below is big enough to almost disable the timeout timeout: 5 * 60 * 1000, // TODO: types are broken on spawn, which claims that `NetworkWorkerApi` does not satifies its contrains - })) as unknown as NetworkWorkerApi; + })) as unknown as ModuleThread; return new WorkerNetworkCore({ ...modules, - workerApi, + networkThreadApi, worker, }); } @@ -141,7 +148,12 @@ export class WorkerNetworkCore implements INetworkCore { async close(): Promise { await this.getApi().close(); this.modules.logger.debug("terminating network worker"); - await Thread.terminate(this.modules.workerApi as unknown as Thread); + await terminateWorkerThread({ + worker: this.getApi(), + retryCount: NETWORK_WORKER_EXIT_RETRY_COUNT, + retryMs: NETWORK_WORKER_EXIT_TIMEOUT_MS, + logger: this.modules.logger, + }); this.modules.logger.debug("terminated network worker"); } @@ -231,7 +243,7 @@ export class WorkerNetworkCore implements INetworkCore { return this.getApi().writeDiscv5Profile(durationMs, dirpath); } - private getApi(): NetworkWorkerApi { - return this.modules.workerApi; + private getApi(): ModuleThread { + return this.modules.networkThreadApi; } } diff --git a/packages/beacon-node/src/network/core/types.ts b/packages/beacon-node/src/network/core/types.ts index d36d339e9a97..790c532aa2a4 100644 --- a/packages/beacon-node/src/network/core/types.ts +++ b/packages/beacon-node/src/network/core/types.ts @@ -87,6 +87,9 @@ export type NetworkWorkerData = { * API exposed by the libp2p worker */ export type NetworkWorkerApi = INetworkCorePublic & { + // To satisfy the constraint of `ModuleThread` type + // eslint-disable-next-line @typescript-eslint/no-explicit-any + [string: string]: (...args: any[]) => Promise | any; // Async method through worker boundary reportPeer(peer: PeerIdStr, action: PeerAction, actionName: string): Promise; reStatusPeers(peers: PeerIdStr[]): Promise; diff --git a/packages/beacon-node/src/util/workerEvents.ts b/packages/beacon-node/src/util/workerEvents.ts index 
8926b6d18cb4..cd61e6b95393 100644
--- a/packages/beacon-node/src/util/workerEvents.ts
+++ b/packages/beacon-node/src/util/workerEvents.ts
@@ -1,4 +1,7 @@
 import {MessagePort, Worker} from "node:worker_threads";
+import {Thread} from "@chainsafe/threads";
+import {Logger} from "@lodestar/logger";
+import {sleep} from "@lodestar/utils";
 import {StrictEventEmitterSingleArg} from "./strictEvents.js";
 
 export type WorkerBridgeEvent = {
@@ -85,3 +88,58 @@ export function wireEventsOnMainThread(
     }
   }
 }
+
+/**
+ * Terminate a worker thread, re-issuing the terminate request until the thread reports termination.
+ *
+ * Calls `Thread.terminate` up to `retryCount` times and, after each call, waits at most `retryMs`
+ * for a "termination" event on the thread's event stream.
+ *
+ * @param worker - thread to terminate
+ * @param retryMs - maximum wait for the "termination" event after each attempt, in milliseconds
+ * @param retryCount - maximum number of terminate attempts
+ * @param logger - optional logger, warned before each retry
+ * @throws Error when no "termination" event is observed within the allowed attempts
+ */
+export async function terminateWorkerThread({
+  worker,
+  retryMs,
+  retryCount,
+  logger,
+}: {
+  worker: Thread;
+  retryMs: number;
+  retryCount: number;
+  logger?: Logger;
+}): Promise<void> {
+  // The executor runs synchronously, so the resolver is captured before subscribing below
+  let resolveTerminated: (value: boolean) => void = () => {};
+  const terminated = new Promise<boolean>((resolve) => {
+    resolveTerminated = resolve;
+  });
+  // Subscribe before the first terminate call so the event cannot be missed
+  const subscription = Thread.events(worker).subscribe((event) => {
+    if (event.type === "termination") {
+      resolveTerminated(true);
+    }
+  });
+
+  try {
+    for (let attempt = 1; attempt <= retryCount; attempt++) {
+      await Thread.terminate(worker);
+      const result = await Promise.race([terminated, sleep(retryMs).then(() => false)]);
+
+      if (result) return;
+
+      // Only announce a retry when another attempt will actually follow
+      if (attempt < retryCount) {
+        logger?.warn("Worker thread failed to terminate, retrying...");
+      }
+    }
+
+    throw new Error(`Worker thread failed to terminate in ${retryCount * retryMs}ms.`);
+  } finally {
+    // Release the event listener on every exit path (success, timeout, or a rejected terminate)
+    subscription.unsubscribe();
+  }
+}