Skip to content

Commit

Permalink
Worker thread initialization via JsTransferable (#9861)
Browse files Browse the repository at this point in the history
* Added JsTransferable and using it to init workers

Update crates/parcel_napi_helpers/src/transferable.rs

Co-authored-by: Monica <monica.j.olejniczak@gmail.com>

pr comments

small changes

don't need arc

* docs on register worker
  • Loading branch information
alshdavid authored Jul 30, 2024
1 parent 7fd97e4 commit 8cc5c8f
Show file tree
Hide file tree
Showing 13 changed files with 188 additions and 80 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 22 additions & 4 deletions crates/node-bindings/src/parcel/parcel.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
use std::sync::mpsc::channel;
use std::sync::mpsc::Sender;
use std::sync::Arc;
use std::thread;

use napi::Env;
use napi::JsFunction;
use napi::JsObject;
use napi::JsUnknown;
use napi_derive::napi;

use parcel::file_system::FileSystemRef;
use parcel::rpc::nodejs::NodejsWorker;
use parcel::rpc::nodejs::RpcHostNodejs;
use parcel::rpc::RpcHostRef;
use parcel::Parcel;
use parcel_core::types::ParcelOptions;
use parcel_napi_helpers::JsTransferable;
use parcel_package_manager::PackageManagerRef;

use crate::file_system::FileSystemNapi;
Expand All @@ -19,7 +25,9 @@ use super::tracer::Tracer;
use super::tracer::TracerMode;

#[napi(object)]
pub struct ParcelNapiBuildOptions {}
pub struct ParcelNapiBuildOptions {
pub register_worker: JsFunction,
}

#[napi(object)]
pub struct ParcelNapiBuildResult {}
Expand All @@ -42,6 +50,7 @@ pub struct ParcelNapi {
package_manager: Option<PackageManagerRef>,
rpc: Option<RpcHostRef>,
tracer: Tracer,
tx_worker: Sender<NodejsWorker>,
}

#[napi]
Expand Down Expand Up @@ -82,7 +91,8 @@ impl ParcelNapi {
.map(|w| w as usize)
.unwrap_or_else(|| threads);

let rpc_host_nodejs = RpcHostNodejs::new(node_worker_count)?;
let (tx_worker, rx_worker) = channel::<NodejsWorker>();
let rpc_host_nodejs = RpcHostNodejs::new(node_worker_count, rx_worker)?;
let rpc = Some::<RpcHostRef>(Arc::new(rpc_host_nodejs));

Ok(Self {
Expand All @@ -92,14 +102,22 @@ impl ParcelNapi {
package_manager,
rpc,
tracer,
tx_worker,
})
}

#[napi]
pub fn build(&self, env: Env, _options: ParcelNapiBuildOptions) -> napi::Result<JsObject> {
pub fn build(&self, env: Env, options: ParcelNapiBuildOptions) -> napi::Result<JsObject> {
let (deferred, promise) = env.create_deferred()?;

// Both the parcel initialisation and build must be run a dedicated system thread so that
for _ in 0..self.node_worker_count {
let transferable = JsTransferable::new(self.tx_worker.clone());
options
.register_worker
.call1::<JsTransferable<Sender<NodejsWorker>>, JsUnknown>(transferable)?;
}

// Both the parcel initialization and build must be run a dedicated system thread so that
// the napi threadsafe functions do not panic
thread::spawn({
let fs = self.fs.clone();
Expand Down
34 changes: 32 additions & 2 deletions crates/node-bindings/src/parcel/worker.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,40 @@
use std::sync::mpsc::Sender;

use napi::{Env, JsObject, JsUndefined};
use napi_derive::napi;
use parcel::rpc::nodejs::NodejsWorker;
use parcel_napi_helpers::JsTransferable;

/// This function is run in the Nodejs worker context upon initialization
/// to notify the main thread that a Nodejs worker thread has started
///
/// A Rust channel is transferred to the worker via JavaScript `worker.postMessage`.
/// The worker then calls `register_worker`, supplying it with an object containing
/// callbacks.
///
/// The callbacks are later called from the main thread to send work to the worker.
///
/// |-------------| --- Init channel ----> |-------------------|
/// | Main Thread | | Worker Thread (n) |
/// |-------------| <-- Worker wrapper --- |-------------------|
///
/// **Later During Build**
///
/// -- Resolver.resolve -->
/// <- DependencyResult ---
///
/// -- Transf.transform -->
/// <--- Array<Asset> -----
#[napi]
pub fn register_worker(env: Env, worker: JsObject) -> napi::Result<JsUndefined> {
pub fn register_worker(
env: Env,
channel: JsTransferable<Sender<NodejsWorker>>,
worker: JsObject,
) -> napi::Result<JsUndefined> {
let worker = NodejsWorker::new(worker)?;
NodejsWorker::register_worker(worker);
let tx_worker = channel.take()?;
if tx_worker.send(worker).is_err() {
return Err(napi::Error::from_reason("Unable to register worker"));
}
env.get_undefined()
}
1 change: 1 addition & 0 deletions crates/parcel_napi_helpers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ edition = "2021"
serde = "1"
anyhow = "1"
napi = { version = "2", features = ["serde-json", "napi4", "napi5", "async"] }
once_cell = { version = "1" }

[target.'cfg(target_arch = "wasm32")'.dependencies]
napi = { version = "2", features = ["serde-json"] }
Expand Down
2 changes: 2 additions & 0 deletions crates/parcel_napi_helpers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ mod anyhow;
mod call_method;
mod console_log;
mod get_function;
mod transferable;

pub use self::anyhow::*;
pub use self::call_method::*;
pub use self::console_log::*;
pub use self::get_function::*;
pub use self::transferable::*;
89 changes: 89 additions & 0 deletions crates/parcel_napi_helpers/src/transferable.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
use std::any;
use std::any::Any;
use std::collections::HashMap;
use std::marker::PhantomData;
use std::sync::atomic::AtomicI32;
use std::sync::atomic::Ordering;
use std::sync::Mutex;

use napi::bindgen_prelude::FromNapiValue;
use napi::bindgen_prelude::ToNapiValue;
use napi::Env;
use napi::JsNumber;
use napi::NapiRaw;
use once_cell::sync::Lazy;

static COUNTER: AtomicI32 = AtomicI32::new(0);
static VALUES: Lazy<Mutex<HashMap<i32, Box<dyn Any + Send + Sync>>>> =
Lazy::new(|| Default::default());

/// Creates an external reference to a Rust value and
/// makes it transferable across Nodejs workers
///
/// This is to get around the limitations of what can be transferred
/// between workers in Nodejs
///
/// https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Transferable_objects
pub struct JsTransferable<T> {
id: i32,
_value: PhantomData<T>,
}

impl<T: Send + Sync + 'static> JsTransferable<T> {
/// Put a Rust value into a Transferable container to allow
/// sending values to Nodejs workers via postMessage or workerData
pub fn new(value: T) -> Self {
let id = COUNTER.fetch_add(1, Ordering::Relaxed);

VALUES.lock().unwrap().insert(id.clone(), Box::new(value));
Self {
id,
_value: Default::default(),
}
}

/// Take the value out of Transferable, so it can no longer be accessed
pub fn take(self) -> napi::Result<T> {
let Some(value) = VALUES.lock().unwrap().remove(&self.id) else {
return Err(napi::Error::from_reason(format!(
"JsTransferableError::NotExists: id({})",
self.id
)));
};
let Ok(val) = value.downcast::<T>() else {
return Err(napi::Error::from_reason(format!(
"JsTransferableError::InvalidDowncast: id({}) type({})",
self.id,
any::type_name::<T>()
)));
};
Ok(*val)
}
}

/// Allows Transferable to be returned from a Napi functions
impl<T> ToNapiValue for JsTransferable<T> {
unsafe fn to_napi_value(
env: napi::sys::napi_env,
val: Self,
) -> napi::Result<napi::sys::napi_value> {
let env = Env::from_raw(env);
let pointer = env.create_int32(val.id.clone())?;
Ok(pointer.raw())
}
}

/// Allows Transferable to be accepted as an argument for a Napi function
impl<T> FromNapiValue for JsTransferable<T> {
unsafe fn from_napi_value(
env: napi::sys::napi_env,
napi_val: napi::sys::napi_value,
) -> napi::Result<Self> {
let pointer = JsNumber::from_napi_value(env, napi_val)?;
let id = pointer.get_int32()?;
Ok(Self {
id,
_value: Default::default(),
})
}
}
1 change: 0 additions & 1 deletion crates/parcel_plugin_rpc/src/nodejs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
mod rpc_conn_nodejs;
mod rpc_conns_nodejs;
mod rpc_host_nodejs;
mod worker_init;

pub use rpc_conn_nodejs::*;
pub use rpc_conns_nodejs::*;
Expand Down
6 changes: 0 additions & 6 deletions crates/parcel_plugin_rpc/src/nodejs/rpc_conn_nodejs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ use parcel_napi_helpers::js_callable::JsCallable;

use crate::RpcWorker;

use super::worker_init::register_worker;

/// RpcConnectionNodejs wraps the communication with a
/// single Nodejs worker thread
pub struct NodejsWorker {
Expand All @@ -18,10 +16,6 @@ impl NodejsWorker {
ping_fn: JsCallable::new_from_object_prop_bound("ping", &delegate)?,
})
}

pub fn register_worker(worker: Self) {
register_worker(worker)
}
}

impl RpcWorker for NodejsWorker {
Expand Down
20 changes: 16 additions & 4 deletions crates/parcel_plugin_rpc/src/nodejs/rpc_host_nodejs.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,40 @@
use std::sync::mpsc::Receiver;
use std::sync::Arc;

use anyhow::anyhow;
use parking_lot::Mutex;

use crate::RpcHost;
use crate::RpcWorkerRef;

use super::worker_init::get_worker;
use super::NodejsWorker;
use super::NodejsWorkerFarm;

pub struct RpcHostNodejs {
node_workers: usize,
rx_worker: Mutex<Receiver<NodejsWorker>>,
}

impl RpcHostNodejs {
pub fn new(node_workers: usize) -> napi::Result<Self> {
Ok(Self { node_workers })
pub fn new(node_workers: usize, rx_worker: Receiver<NodejsWorker>) -> napi::Result<Self> {
Ok(Self {
node_workers,
rx_worker: Mutex::new(rx_worker),
})
}
}

// Forward events to Nodejs
impl RpcHost for RpcHostNodejs {
fn start(&self) -> anyhow::Result<RpcWorkerRef> {
let rx_worker = self.rx_worker.lock();
let mut connections = vec![];

for _ in 0..self.node_workers {
connections.push(get_worker())
let Ok(worker) = rx_worker.recv() else {
return Err(anyhow!("Unable to receive NodejsWorker"));
};
connections.push(worker)
}

Ok(Arc::new(NodejsWorkerFarm::new(connections)))
Expand Down
55 changes: 0 additions & 55 deletions crates/parcel_plugin_rpc/src/nodejs/worker_init.rs

This file was deleted.

17 changes: 12 additions & 5 deletions packages/core/core/src/parcel-v3/ParcelV3.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import path from 'path';
import {Worker} from 'worker_threads';
import {ParcelNapi, type ParcelNapiOptions} from '@parcel/rust';

const WORKER_PATH = path.join(__dirname, 'worker', 'index.js');

export type ParcelV3Options = {|
fs?: ParcelNapiOptions['fs'],
nodeWorkers?: number,
Expand Down Expand Up @@ -37,11 +39,16 @@ export class ParcelV3 {
async build(): Promise<any> {
const workers = [];

for (let i = 0; i < this._internal.nodeWorkerCount; i++) {
workers.push(new Worker(path.join(__dirname, 'worker', 'index.js')));
}

let result = await this._internal.build();
let result = await this._internal.build({
registerWorker: tx_worker => {
let worker = new Worker(WORKER_PATH, {
workerData: {
tx_worker,
},
});
workers.push(worker);
},
});

for (const worker of workers) worker.terminate();
return result;
Expand Down
Loading

0 comments on commit 8cc5c8f

Please sign in to comment.