diff --git a/packages/cubejs-backend-native/src/python/runtime.rs b/packages/cubejs-backend-native/src/python/runtime.rs index 3a0308243a33e..ef62860f4d2d2 100644 --- a/packages/cubejs-backend-native/src/python/runtime.rs +++ b/packages/cubejs-backend-native/src/python/runtime.rs @@ -46,6 +46,7 @@ enum PyScheduledFunResult { pub struct PyRuntime { sender: tokio::sync::mpsc::Sender, + js_channel: neon::event::Channel, } impl PyRuntime { @@ -55,16 +56,49 @@ impl PyRuntime { args: Vec, deferred: Deferred, ) { - let res = self.sender.blocking_send(PyScheduledFun { + // Try to reserve immediately for the fast path + let permit = match self.sender.try_reserve() { + Ok(permit) => permit, + Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => { + log::warn!("Python channel is full, this may cause performance issues. Consider increasing the channel size for PyRuntime."); + + // Channel is full, use async reserve with blocking for efficiency + match futures::executor::block_on(self.sender.reserve()) { + Ok(permit) => permit, + Err(_) => { + // Channel was closed while waiting + deferred.settle_with( + &self.js_channel, + move |mut cx| -> NeonResult> { + cx.throw_error( + "Unable to schedule python function call: channel is closed", + ) + }, + ); + return; + } + } + } + Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => { + // Channel is closed, settle deferred with error + deferred.settle_with( + &self.js_channel, + move |mut cx| -> NeonResult> { + cx.throw_error("Unable to schedule python function call: channel is closed") + }, + ); + return; + } + }; + + let scheduled_fun = PyScheduledFun { fun, args, callback: PyScheduledCallback::NodeDeferred(deferred), - }); - if let Err(err) = res { - // TODO: We need to return this error to deferred, but for now - // neon will handle this issue on Drop - error!("Unable to schedule python function call: {}", err) - } + }; + + // This should never fail since we have a permit + permit.send(scheduled_fun); } pub async fn call_async( @@ -202,7 +236,9 @@ impl PyRuntime { trace!("New Python runtime"); - std::thread::spawn(|| { + let js_channel_clone = js_channel.clone(); + + std::thread::spawn(move || { trace!("Initializing executor in a separate thread"); std::thread::spawn(|| { @@ -216,7 +252,7 @@ impl PyRuntime { if let Some(task) = receiver.recv().await { trace!("New task"); - if let Err(err) = Self::process_task(task, &js_channel) { + if let Err(err) = Self::process_task(task, &js_channel_clone) { error!("Error while processing python task: {:?}", err) }; } @@ -229,7 +265,7 @@ impl PyRuntime { } }); - Self { sender } + Self { sender, js_channel } } }