diff --git a/Cargo.lock b/Cargo.lock index 20d9d1c11cc..d9bb6a27c3d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1909,6 +1909,7 @@ dependencies = [ "anyhow", "napi", "napi-build", + "once_cell", "serde", ] diff --git a/crates/node-bindings/src/parcel/parcel.rs b/crates/node-bindings/src/parcel/parcel.rs index 528f91c646a..0b27436133a 100644 --- a/crates/node-bindings/src/parcel/parcel.rs +++ b/crates/node-bindings/src/parcel/parcel.rs @@ -1,15 +1,21 @@ +use std::sync::mpsc::channel; +use std::sync::mpsc::Sender; use std::sync::Arc; use std::thread; use napi::Env; +use napi::JsFunction; use napi::JsObject; +use napi::JsUnknown; use napi_derive::napi; use parcel::file_system::FileSystemRef; +use parcel::rpc::nodejs::NodejsWorker; use parcel::rpc::nodejs::RpcHostNodejs; use parcel::rpc::RpcHostRef; use parcel::Parcel; use parcel_core::types::ParcelOptions; +use parcel_napi_helpers::JsTransferable; use parcel_package_manager::PackageManagerRef; use crate::file_system::FileSystemNapi; @@ -19,7 +25,9 @@ use super::tracer::Tracer; use super::tracer::TracerMode; #[napi(object)] -pub struct ParcelNapiBuildOptions {} +pub struct ParcelNapiBuildOptions { + pub register_worker: JsFunction, +} #[napi(object)] pub struct ParcelNapiBuildResult {} @@ -42,6 +50,7 @@ pub struct ParcelNapi { package_manager: Option, rpc: Option, tracer: Tracer, + tx_worker: Sender, } #[napi] @@ -82,7 +91,8 @@ impl ParcelNapi { .map(|w| w as usize) .unwrap_or_else(|| threads); - let rpc_host_nodejs = RpcHostNodejs::new(node_worker_count)?; + let (tx_worker, rx_worker) = channel::(); + let rpc_host_nodejs = RpcHostNodejs::new(node_worker_count, rx_worker)?; let rpc = Some::(Arc::new(rpc_host_nodejs)); Ok(Self { @@ -92,14 +102,22 @@ impl ParcelNapi { package_manager, rpc, tracer, + tx_worker, }) } #[napi] - pub fn build(&self, env: Env, _options: ParcelNapiBuildOptions) -> napi::Result { + pub fn build(&self, env: Env, options: ParcelNapiBuildOptions) -> napi::Result { let (deferred, promise) = env.create_deferred()?; - // Both the parcel initialisation and build must be run a dedicated system thread so that + for _ in 0..self.node_worker_count { + let transferable = JsTransferable::new(self.tx_worker.clone()); + options + .register_worker + .call1::>, JsUnknown>(transferable)?; + } + + // Both the parcel initialization and build must be run a dedicated system thread so that // the napi threadsafe functions do not panic thread::spawn({ let fs = self.fs.clone(); diff --git a/crates/node-bindings/src/parcel/worker.rs b/crates/node-bindings/src/parcel/worker.rs index 1ec22dc6e28..71e9a82fa2f 100644 --- a/crates/node-bindings/src/parcel/worker.rs +++ b/crates/node-bindings/src/parcel/worker.rs @@ -1,10 +1,40 @@ +use std::sync::mpsc::Sender; + use napi::{Env, JsObject, JsUndefined}; use napi_derive::napi; use parcel::rpc::nodejs::NodejsWorker; +use parcel_napi_helpers::JsTransferable; +/// This function is run in the Nodejs worker context upon initialization +/// to notify the main thread that a Nodejs worker thread has started +/// +/// A Rust channel is transferred to the worker via JavaScript `worker.postMessage`. +/// The worker then calls `register_worker`, supplying it with an object containing +/// callbacks. +/// +/// The callbacks are later called from the main thread to send work to the worker. +/// +/// |-------------| --- Init channel ----> |-------------------| +/// | Main Thread | | Worker Thread (n) | +/// |-------------| <-- Worker wrapper --- |-------------------| +/// +/// **Later During Build** +/// +/// -- Resolver.resolve --> +/// <- DependencyResult --- +/// +/// -- Transf.transform --> +/// <--- Array ----- #[napi] -pub fn register_worker(env: Env, worker: JsObject) -> napi::Result { +pub fn register_worker( + env: Env, + channel: JsTransferable>, + worker: JsObject, +) -> napi::Result { let worker = NodejsWorker::new(worker)?; - NodejsWorker::register_worker(worker); + let tx_worker = channel.take()?; + if tx_worker.send(worker).is_err() { + return Err(napi::Error::from_reason("Unable to register worker")); + } env.get_undefined() } diff --git a/crates/parcel_napi_helpers/Cargo.toml b/crates/parcel_napi_helpers/Cargo.toml index 98af85c4cfc..49b5c285e8f 100644 --- a/crates/parcel_napi_helpers/Cargo.toml +++ b/crates/parcel_napi_helpers/Cargo.toml @@ -10,6 +10,7 @@ edition = "2021" serde = "1" anyhow = "1" napi = { version = "2", features = ["serde-json", "napi4", "napi5", "async"] } +once_cell = { version = "1" } [target.'cfg(target_arch = "wasm32")'.dependencies] napi = { version = "2", features = ["serde-json"] } diff --git a/crates/parcel_napi_helpers/src/lib.rs b/crates/parcel_napi_helpers/src/lib.rs index 4949086c839..e38d2c7c085 100644 --- a/crates/parcel_napi_helpers/src/lib.rs +++ b/crates/parcel_napi_helpers/src/lib.rs @@ -5,8 +5,10 @@ mod anyhow; mod call_method; mod console_log; mod get_function; +mod transferable; pub use self::anyhow::*; pub use self::call_method::*; pub use self::console_log::*; pub use self::get_function::*; +pub use self::transferable::*; diff --git a/crates/parcel_napi_helpers/src/transferable.rs b/crates/parcel_napi_helpers/src/transferable.rs new file mode 100644 index 00000000000..50825a82e5f --- /dev/null +++ b/crates/parcel_napi_helpers/src/transferable.rs @@ -0,0 +1,89 @@ +use std::any; +use std::any::Any; +use std::collections::HashMap; +use std::marker::PhantomData; +use std::sync::atomic::AtomicI32; +use std::sync::atomic::Ordering; +use std::sync::Mutex; + +use napi::bindgen_prelude::FromNapiValue; +use napi::bindgen_prelude::ToNapiValue; +use napi::Env; +use napi::JsNumber; +use napi::NapiRaw; +use once_cell::sync::Lazy; + +static COUNTER: AtomicI32 = AtomicI32::new(0); +static VALUES: Lazy>>> = + Lazy::new(|| Default::default()); + +/// Creates an external reference to a Rust value and +/// makes it transferable across Nodejs workers +/// +/// This is to get around the limitations of what can be transferred +/// between workers in Nodejs +/// +/// https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Transferable_objects +pub struct JsTransferable { + id: i32, + _value: PhantomData, +} + +impl JsTransferable { + /// Put a Rust value into a Transferable container to allow + /// sending values to Nodejs workers via postMessage or workerData + pub fn new(value: T) -> Self { + let id = COUNTER.fetch_add(1, Ordering::Relaxed); + + VALUES.lock().unwrap().insert(id.clone(), Box::new(value)); + Self { + id, + _value: Default::default(), + } + } + + /// Take the value out of Transferable, so it can no longer be accessed + pub fn take(self) -> napi::Result { + let Some(value) = VALUES.lock().unwrap().remove(&self.id) else { + return Err(napi::Error::from_reason(format!( + "JsTransferableError::NotExists: id({})", + self.id + ))); + }; + let Ok(val) = value.downcast::() else { + return Err(napi::Error::from_reason(format!( + "JsTransferableError::InvalidDowncast: id({}) type({})", + self.id, + any::type_name::() + ))); + }; + Ok(*val) + } +} + +/// Allows Transferable to be returned from a Napi functions +impl ToNapiValue for JsTransferable { + unsafe fn to_napi_value( + env: napi::sys::napi_env, + val: Self, + ) -> napi::Result { + let env = Env::from_raw(env); + let pointer = env.create_int32(val.id.clone())?; + Ok(pointer.raw()) + } +} + +/// Allows Transferable to be accepted as an argument for a Napi function +impl FromNapiValue for JsTransferable { + unsafe fn from_napi_value( + env: napi::sys::napi_env, + napi_val: napi::sys::napi_value, + ) -> napi::Result { + let pointer = JsNumber::from_napi_value(env, napi_val)?; + let id = pointer.get_int32()?; + Ok(Self { + id, + _value: Default::default(), + }) + } +} diff --git a/crates/parcel_plugin_rpc/src/nodejs/mod.rs b/crates/parcel_plugin_rpc/src/nodejs/mod.rs index d78c80c78b1..282e9faaf33 100644 --- a/crates/parcel_plugin_rpc/src/nodejs/mod.rs +++ b/crates/parcel_plugin_rpc/src/nodejs/mod.rs @@ -1,7 +1,6 @@ mod rpc_conn_nodejs; mod rpc_conns_nodejs; mod rpc_host_nodejs; -mod worker_init; pub use rpc_conn_nodejs::*; pub use rpc_conns_nodejs::*; diff --git a/crates/parcel_plugin_rpc/src/nodejs/rpc_conn_nodejs.rs b/crates/parcel_plugin_rpc/src/nodejs/rpc_conn_nodejs.rs index fa114c551fe..9ca35fa18ac 100644 --- a/crates/parcel_plugin_rpc/src/nodejs/rpc_conn_nodejs.rs +++ b/crates/parcel_plugin_rpc/src/nodejs/rpc_conn_nodejs.rs @@ -4,8 +4,6 @@ use parcel_napi_helpers::js_callable::JsCallable; use crate::RpcWorker; -use super::worker_init::register_worker; - /// RpcConnectionNodejs wraps the communication with a /// single Nodejs worker thread pub struct NodejsWorker { @@ -18,10 +16,6 @@ impl NodejsWorker { ping_fn: JsCallable::new_from_object_prop_bound("ping", &delegate)?, }) } - - pub fn register_worker(worker: Self) { - register_worker(worker) - } } impl RpcWorker for NodejsWorker { diff --git a/crates/parcel_plugin_rpc/src/nodejs/rpc_host_nodejs.rs b/crates/parcel_plugin_rpc/src/nodejs/rpc_host_nodejs.rs index 8eec97a0443..e40bcf57a68 100644 --- a/crates/parcel_plugin_rpc/src/nodejs/rpc_host_nodejs.rs +++ b/crates/parcel_plugin_rpc/src/nodejs/rpc_host_nodejs.rs @@ -1,28 +1,40 @@ +use std::sync::mpsc::Receiver; use std::sync::Arc; +use anyhow::anyhow; +use parking_lot::Mutex; + use crate::RpcHost; use crate::RpcWorkerRef; -use super::worker_init::get_worker; +use super::NodejsWorker; use super::NodejsWorkerFarm; pub struct RpcHostNodejs { node_workers: usize, + rx_worker: Mutex>, } impl RpcHostNodejs { - pub fn new(node_workers: usize) -> napi::Result { - Ok(Self { node_workers }) + pub fn new(node_workers: usize, rx_worker: Receiver) -> napi::Result { + Ok(Self { + node_workers, + rx_worker: Mutex::new(rx_worker), + }) } } // Forward events to Nodejs impl RpcHost for RpcHostNodejs { fn start(&self) -> anyhow::Result { + let rx_worker = self.rx_worker.lock(); let mut connections = vec![]; for _ in 0..self.node_workers { - connections.push(get_worker()) + let Ok(worker) = rx_worker.recv() else { + return Err(anyhow!("Unable to receive NodejsWorker")); + }; + connections.push(worker) } Ok(Arc::new(NodejsWorkerFarm::new(connections))) diff --git a/crates/parcel_plugin_rpc/src/nodejs/worker_init.rs b/crates/parcel_plugin_rpc/src/nodejs/worker_init.rs deleted file mode 100644 index 369a1cd67ec..00000000000 --- a/crates/parcel_plugin_rpc/src/nodejs/worker_init.rs +++ /dev/null @@ -1,55 +0,0 @@ -use std::{ - sync::mpsc::{channel, Sender}, - thread, -}; - -use once_cell::sync::Lazy; - -use super::NodejsWorker; - -enum WorkerInitMessage { - Subscribe(Sender), - Register(NodejsWorker), -} - -static WORKER_INIT: Lazy> = Lazy::new(|| { - let (tx_subscribe, rx_subscribe) = channel::(); - - thread::spawn(move || { - let mut subscribers = Vec::>::new(); - let mut workers = Vec::::new(); - - while let Ok(msg) = rx_subscribe.recv() { - match msg { - WorkerInitMessage::Subscribe(subscriber) => { - if let Some(rx_rpc) = workers.pop() { - subscriber.send(rx_rpc).unwrap(); - } else { - subscribers.push(subscriber); - } - } - WorkerInitMessage::Register(worker) => { - if let Some(subscriber) = subscribers.pop() { - subscriber.send(worker).unwrap(); - } else { - workers.push(worker); - } - } - } - } - }); - - tx_subscribe -}); - -pub fn get_worker() -> NodejsWorker { - let (tx, rx) = channel(); - WORKER_INIT.send(WorkerInitMessage::Subscribe(tx)).unwrap(); - rx.recv().unwrap() -} - -pub fn register_worker(worker: NodejsWorker) { - WORKER_INIT - .send(WorkerInitMessage::Register(worker)) - .unwrap(); -} diff --git a/packages/core/core/src/parcel-v3/ParcelV3.js b/packages/core/core/src/parcel-v3/ParcelV3.js index b8bd54ad536..a15b3f2bc26 100644 --- a/packages/core/core/src/parcel-v3/ParcelV3.js +++ b/packages/core/core/src/parcel-v3/ParcelV3.js @@ -4,6 +4,8 @@ import path from 'path'; import {Worker} from 'worker_threads'; import {ParcelNapi, type ParcelNapiOptions} from '@parcel/rust'; +const WORKER_PATH = path.join(__dirname, 'worker', 'index.js'); + export type ParcelV3Options = {| fs?: ParcelNapiOptions['fs'], nodeWorkers?: number, @@ -37,11 +39,16 @@ export class ParcelV3 { async build(): Promise { const workers = []; - for (let i = 0; i < this._internal.nodeWorkerCount; i++) { - workers.push(new Worker(path.join(__dirname, 'worker', 'index.js'))); - } - - let result = await this._internal.build(); + let result = await this._internal.build({ + registerWorker: tx_worker => { + let worker = new Worker(WORKER_PATH, { + workerData: { + tx_worker, + }, + }); + workers.push(worker); + }, + }); for (const worker of workers) worker.terminate(); return result; diff --git a/packages/core/core/src/parcel-v3/worker/worker.js b/packages/core/core/src/parcel-v3/worker/worker.js index 3ca4c6c3989..40a1031ac6e 100644 --- a/packages/core/core/src/parcel-v3/worker/worker.js +++ b/packages/core/core/src/parcel-v3/worker/worker.js @@ -1,5 +1,6 @@ // @flow import * as napi from '@parcel/rust'; +import {workerData} from 'worker_threads'; import type {ResolverNapi} from '../plugins/Resolver'; export class ParcelWorker { @@ -10,4 +11,4 @@ export class ParcelWorker { } } -napi.registerWorker(new ParcelWorker()); +napi.registerWorker(workerData.tx_worker, new ParcelWorker()); diff --git a/packages/core/rust/index.js.flow b/packages/core/rust/index.js.flow index 6e10c43978e..ddfebd0c993 100644 --- a/packages/core/rust/index.js.flow +++ b/packages/core/rust/index.js.flow @@ -9,6 +9,8 @@ import type { declare export var init: void | (() => void); +export type Transferable = {||}; + export type ProjectPath = any; export interface ConfigRequest { id: string; @@ -58,10 +60,14 @@ export type ParcelNapiOptions = {| |}, |}; +export type ParcelBuildOptions = {| + registerWorker: (channel: Transferable) => void | Promise, +|}; + declare export class ParcelNapi { nodeWorkerCount: number; constructor(options: ParcelNapiOptions): ParcelNapi; - build(): Promise; + build(options: ParcelBuildOptions): Promise; static defaultThreadCount(): number; testingTempFsReadToString(path: string): string; testingTempFsIsDir(path: string): boolean; @@ -69,7 +75,10 @@ declare export class ParcelNapi { testingRpcPing(): void; } -declare export function registerWorker(worker: any): void; +declare export function registerWorker( + channel: Transferable, + worker: any, +): void; declare export function initSentry(): void; declare export function closeSentry(): void;