Skip to content

Commit

Permalink
fix(bindings/python): global runtime for blocking row iterator (#347)
Browse files Browse the repository at this point in the history
  • Loading branch information
everpcpc authored Feb 1, 2024
1 parent 6fb16c9 commit c24fcbd
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 21 deletions.
16 changes: 1 addition & 15 deletions bindings/python/src/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,7 @@
use pyo3::prelude::*;

use crate::types::{ConnectionInfo, DriverError, Row, RowIterator, ServerStats, VERSION};

#[ctor::ctor]
static RUNTIME: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();

/// Utility to collect rust futures with GIL released
fn wait_for_future<F: std::future::Future>(py: Python, f: F) -> F::Output
where
F: Send,
F::Output: Send,
{
py.allow_threads(|| RUNTIME.block_on(f))
}
use crate::utils::wait_for_future;

#[pyclass(module = "databend_driver")]
pub struct BlockingDatabendClient(databend_driver::Client);
Expand Down
1 change: 1 addition & 0 deletions bindings/python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
mod asyncio;
mod blocking;
mod types;
mod utils;

use pyo3::prelude::*;

Expand Down
12 changes: 6 additions & 6 deletions bindings/python/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ use pyo3_asyncio::tokio::future_into_py;
use tokio::sync::Mutex;
use tokio_stream::StreamExt;

use crate::utils::wait_for_future;

pub static VERSION: Lazy<String> = Lazy::new(|| {
let version = option_env!("CARGO_PKG_VERSION").unwrap_or("unknown");
version.to_string()
Expand Down Expand Up @@ -136,10 +138,9 @@ impl RowIterator {

#[pymethods]
impl RowIterator {
fn schema<'p>(&self) -> PyResult<Schema> {
fn schema<'p>(&self, py: Python) -> PyResult<Schema> {
let streamer = self.0.clone();
let rt = tokio::runtime::Runtime::new()?;
let ret = rt.block_on(async move {
let ret = wait_for_future(py, async move {
let schema = streamer.lock().await.schema();
schema
});
Expand All @@ -149,10 +150,9 @@ impl RowIterator {
fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
slf
}
fn __next__(&self) -> PyResult<Option<Row>> {
fn __next__(&self, py: Python) -> PyResult<Option<Row>> {
let streamer = self.0.clone();
let rt = tokio::runtime::Runtime::new()?;
let ret = rt.block_on(async move {
let ret = wait_for_future(py, async move {
match streamer.lock().await.next().await {
Some(val) => match val {
Err(e) => Err(PyException::new_err(format!("{}", e))),
Expand Down
30 changes: 30 additions & 0 deletions bindings/python/src/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use pyo3::prelude::*;

#[ctor::ctor]
pub(crate) static RUNTIME: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();

/// Utility to collect rust futures with GIL released
pub(crate) fn wait_for_future<F: std::future::Future>(py: Python, f: F) -> F::Output
where
F: Send,
F::Output: Send,
{
py.allow_threads(|| RUNTIME.block_on(f))
}

0 comments on commit c24fcbd

Please sign in to comment.