Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 46 additions & 10 deletions packages/cubejs-backend-native/src/python/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ enum PyScheduledFunResult {

pub struct PyRuntime {
sender: tokio::sync::mpsc::Sender<PyScheduledFun>,
js_channel: neon::event::Channel,
}

impl PyRuntime {
Expand All @@ -55,16 +56,49 @@ impl PyRuntime {
args: Vec<CLRepr>,
deferred: Deferred,
) {
let res = self.sender.blocking_send(PyScheduledFun {
// Try to reserve immediately for the fast path
let permit = match self.sender.try_reserve() {
Ok(permit) => permit,
Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
log::warn!("Python channel is full, this may cause performance issues. Consider increasing the channel size for PyRuntime.");

// Channel is full, use async reserve with blocking for efficiency
match futures::executor::block_on(self.sender.reserve()) {
Ok(permit) => permit,
Err(_) => {
// Channel was closed while waiting
deferred.settle_with(
&self.js_channel,
move |mut cx| -> NeonResult<Handle<JsError>> {
cx.throw_error(
"Unable to schedule python function call: channel is closed",
)
},
);
return;
}
}
}
Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
// Channel is closed, settle deferred with error
deferred.settle_with(
&self.js_channel,
move |mut cx| -> NeonResult<Handle<JsError>> {
cx.throw_error("Unable to schedule python function call: channel is closed")
},
);
return;
}
};

let scheduled_fun = PyScheduledFun {
fun,
args,
callback: PyScheduledCallback::NodeDeferred(deferred),
});
if let Err(err) = res {
// TODO: We need to return this error to deferred, but for now
// neon will handle this issue on Drop
error!("Unable to schedule python function call: {}", err)
}
};

// This should never fail since we have a permit
permit.send(scheduled_fun);
}

pub async fn call_async(
Expand Down Expand Up @@ -202,7 +236,9 @@ impl PyRuntime {

trace!("New Python runtime");

std::thread::spawn(|| {
let js_channel_clone = js_channel.clone();

std::thread::spawn(move || {
trace!("Initializing executor in a separate thread");

std::thread::spawn(|| {
Expand All @@ -216,7 +252,7 @@ impl PyRuntime {
if let Some(task) = receiver.recv().await {
trace!("New task");

if let Err(err) = Self::process_task(task, &js_channel) {
if let Err(err) = Self::process_task(task, &js_channel_clone) {
error!("Error while processing python task: {:?}", err)
};
}
Expand All @@ -229,7 +265,7 @@ impl PyRuntime {
}
});

Self { sender }
Self { sender, js_channel }
}
}

Expand Down
Loading