Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Improved support for KeyboardInterrupts #20961

Merged
merged 13 commits into from
Jan 29, 2025
Merged
Prev Previous commit
Next Next commit
wip
orlp committed Jan 28, 2025
commit bf97a8414559f77f337a9e66428b25ca24f686a0
27 changes: 11 additions & 16 deletions crates/polars-python/src/catalog/mod.rs
Original file line number Diff line number Diff line change
@@ -9,7 +9,7 @@ use pyo3::{pyclass, pymethods, Bound, PyObject, PyResult, Python};

use crate::lazyframe::PyLazyFrame;
use crate::prelude::parse_cloud_options;
use crate::utils::to_py_err;
use crate::utils::{to_py_err, EnterPolarsExt};

macro_rules! pydict_insert_keys {
($dict:expr, {$a:expr}) => {
@@ -47,10 +47,9 @@ impl PyCatalogClient {

pub fn list_catalogs(&self, py: Python) -> PyResult<PyObject> {
let v = py
.allow_threads(|| {
.enter_polars(|| {
pl_async::get_runtime().block_on_potential_spawn(self.client().list_catalogs())
})
.map_err(to_py_err)?;
})?;

PyList::new(
py,
@@ -71,11 +70,10 @@ impl PyCatalogClient {
#[pyo3(signature = (catalog_name))]
pub fn list_schemas(&self, py: Python, catalog_name: &str) -> PyResult<PyObject> {
let v = py
.allow_threads(|| {
.enter_polars(|| {
pl_async::get_runtime()
.block_on_potential_spawn(self.client().list_schemas(catalog_name))
})
.map_err(to_py_err)?;
})?;

PyList::new(
py,
@@ -101,11 +99,10 @@ impl PyCatalogClient {
schema_name: &str,
) -> PyResult<PyObject> {
let v = py
.allow_threads(|| {
.enter_polars(|| {
pl_async::get_runtime()
.block_on_potential_spawn(self.client().list_tables(catalog_name, schema_name))
})
.map_err(to_py_err)?;
})?;

PyList::new(
py,
@@ -124,14 +121,13 @@ impl PyCatalogClient {
table_name: &str,
) -> PyResult<PyObject> {
let table_entry = py
.allow_threads(|| {
.enter_polars(|| {
pl_async::get_runtime().block_on_potential_spawn(self.client().get_table_info(
catalog_name,
schema_name,
table_name,
))
})
.map_err(to_py_err)?;
})?;

Ok(table_entry_to_pydict(py, table_entry).into())
}
@@ -148,14 +144,13 @@ impl PyCatalogClient {
retries: usize,
) -> PyResult<PyLazyFrame> {
let table_info = py
.allow_threads(|| {
.enter_polars(|| {
pl_async::get_runtime().block_on_potential_spawn(self.client().get_table_info(
catalog_name,
schema_name,
table_name,
))
})
.map_err(to_py_err)?;
})?;

let Some(storage_location) = table_info.storage_location.as_deref() else {
return Err(PyValueError::new_err(
5 changes: 3 additions & 2 deletions crates/polars-python/src/dataframe/construction.rs
Original file line number Diff line number Diff line change
@@ -7,6 +7,7 @@ use super::PyDataFrame;
use crate::conversion::any_value::py_object_to_any_value;
use crate::conversion::{vec_extract_wrapped, Wrap};
use crate::error::PyPolarsErr;
use crate::utils::EnterPolarsExt;
use crate::interop;

#[pymethods]
@@ -21,7 +22,7 @@ impl PyDataFrame {
) -> PyResult<Self> {
let data = vec_extract_wrapped(data);
let schema = schema.map(|wrap| wrap.0);
py.allow_threads(move || finish_from_rows(data, schema, None, infer_schema_length))
py.enter_polars(move || finish_from_rows(data, schema, None, infer_schema_length))
}

#[staticmethod]
@@ -46,7 +47,7 @@ impl PyDataFrame {
))
});

py.allow_threads(move || {
py.enter_polars(move || {
finish_from_rows(rows, schema, schema_overrides, infer_schema_length)
})
}
7 changes: 4 additions & 3 deletions crates/polars-python/src/dataframe/export.rs
Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@ use pyo3::IntoPyObjectExt;
use super::PyDataFrame;
use crate::conversion::{ObjectValue, Wrap};
use crate::error::PyPolarsErr;
use crate::utils::EnterPolarsExt;
use crate::interop;
use crate::interop::arrow::to_py::dataframe_to_stream;
use crate::prelude::PyCompatLevel;
@@ -74,7 +75,7 @@ impl PyDataFrame {

#[allow(clippy::wrong_self_convention)]
pub fn to_arrow(&mut self, py: Python, compat_level: PyCompatLevel) -> PyResult<Vec<PyObject>> {
py.allow_threads(|| self.df.align_chunks_par());
py.enter_polars_ok(|| self.df.align_chunks_par())?;
let pyarrow = py.import("pyarrow")?;

let rbs = self
@@ -92,7 +93,7 @@ impl PyDataFrame {
/// code should make sure these are not included.
#[allow(clippy::wrong_self_convention)]
pub fn to_pandas(&mut self, py: Python) -> PyResult<Vec<PyObject>> {
py.allow_threads(|| self.df.as_single_chunk_par());
py.enter_polars_ok(|| self.df.as_single_chunk_par())?;
Python::with_gil(|py| {
let pyarrow = py.import("pyarrow")?;
let cat_columns = self
@@ -163,7 +164,7 @@ impl PyDataFrame {
py: Python<'py>,
requested_schema: Option<PyObject>,
) -> PyResult<Bound<'py, PyCapsule>> {
py.allow_threads(|| self.df.align_chunks_par());
py.enter_polars_ok(|| self.df.align_chunks_par())?;
dataframe_to_stream(&self.df, py)
}
}
Loading