Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Improved support for KeyboardInterrupts #20961

Merged
merged 13 commits into from
Jan 29, 2025
Merged
Previous commit
Next commit
wip
orlp committed Jan 28, 2025
commit 2ac95ad3ee235cf2b0293761ed1d44511545da93
2 changes: 1 addition & 1 deletion crates/polars-error/src/signals.rs
Original file line number Diff line number Diff line change
@@ -84,7 +84,7 @@ fn unregister_catcher() {
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |state| {
let num_catchers = state >> 1;
if num_catchers > 1 {
Some(num_catchers - 2)
Some(state - 2)
} else {
// Last catcher, clear interrupt flag.
Some(0)
6 changes: 3 additions & 3 deletions crates/polars-python/src/batched_csv.rs
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@ use pyo3::pybacked::PyBackedStr;

use crate::error::PyPolarsErr;
use crate::{PyDataFrame, Wrap};
use crate::utils::EnterPolarsExt;

#[pyclass]
#[repr(transparent)]
@@ -136,12 +137,11 @@ impl PyBatchedCsv {

fn next_batches(&self, py: Python, n: usize) -> PyResult<Option<Vec<PyDataFrame>>> {
let reader = &self.reader;
let batches = py.allow_threads(move || {
let batches = py.enter_polars(move || {
reader
.lock()
.map_err(|e| PyPolarsErr::Other(e.to_string()))?
.unwrap()
.next_batches(n)
.map_err(PyPolarsErr::from)
})?;

// SAFETY: same memory layout
5 changes: 2 additions & 3 deletions crates/polars-python/src/cloud.rs
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@ use pyo3::types::{IntoPyDict, PyBytes};
use crate::error::PyPolarsErr;
use crate::lazyframe::visit::NodeTraverser;
use crate::{PyDataFrame, PyLazyFrame};
use crate::utils::EnterPolarsExt;

#[pyfunction]
pub fn prepare_cloud_plan(lf: PyLazyFrame, py: Python<'_>) -> PyResult<Bound<'_, PyBytes>> {
@@ -46,9 +47,7 @@ pub fn _execute_ir_plan_with_gpu(ir_plan_ser: Vec<u8>, py: Python) -> PyResult<P

// Execute the plan.
let mut state = ExecutionState::new();
let df = py.allow_threads(|| physical_plan.execute(&mut state).map_err(PyPolarsErr::from))?;

Ok(df.into())
py.enter_polars_df(|| physical_plan.execute(&mut state))
}

/// Prepare the IR for execution by the Polars GPU engine.
14 changes: 6 additions & 8 deletions crates/polars-python/src/dataframe/general.rs
Original file line number Diff line number Diff line change
@@ -298,11 +298,11 @@ impl PyDataFrame {
}

pub fn is_unique(&self, py: Python) -> PyResult<PySeries> {
py.enter_polars_series(|| Ok(self.df.is_unique()?.into_series()))
py.enter_polars_series(|| self.df.is_unique())
}

pub fn is_duplicated(&self, py: Python) -> PyResult<PySeries> {
py.enter_polars_series(|| Ok(self.df.is_duplicated()?.into_series()))
py.enter_polars_series(|| self.df.is_duplicated())
}

pub fn equals(&self, py: Python, other: &PyDataFrame, null_equal: bool) -> PyResult<bool> {
@@ -515,7 +515,7 @@ impl PyDataFrame {
k3: u64,
) -> PyResult<PySeries> {
let hb = PlRandomState::with_seeds(k0, k1, k2, k3);
py.enter_polars_series(|| Ok(self.df.hash_rows(Some(hb))?.into_series()))
py.enter_polars_series(|| self.df.hash_rows(Some(hb)))
}

#[pyo3(signature = (keep_names_as, column_names))]
@@ -569,11 +569,9 @@ impl PyDataFrame {
validity.set(i, false);
}
let ca = ca.rechunk();
Ok(ca
.with_outer_validity(Some(validity.freeze()))
.into_series())
Ok(ca.with_outer_validity(Some(validity.freeze())))
} else {
Ok(ca.into_series())
Ok(ca)
}
})
}
@@ -620,7 +618,7 @@ impl PyDataFrame {
)
}?;

Ok(ca.into_series())
Ok(ca)
})
}
}
126 changes: 40 additions & 86 deletions crates/polars-python/src/series/aggregation.rs
Original file line number Diff line number Diff line change
@@ -7,6 +7,10 @@ use crate::conversion::Wrap;
use crate::error::PyPolarsErr;
use crate::utils::EnterPolarsExt;

/// Converts a fallible [`Scalar`] into a Python object.
///
/// An `Err` in `scalar` is returned unchanged. On `Ok`, the scalar's
/// `AnyValue` view is wrapped in [`Wrap`] and turned into a Python object;
/// a failure of that conversion is propagated with `?`.
fn scalar_to_py<'py>(scalar: PyResult<Scalar>, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
    let scalar = scalar?;
    let wrapped = Wrap(scalar.as_any_value());
    Ok(wrapped.into_pyobject(py)?)
}

#[pymethods]
impl PySeries {
fn any(&self, py: Python, ignore_nulls: bool) -> PyResult<Option<bool>> {
@@ -39,70 +43,44 @@ impl PySeries {
py.enter_polars_ok(|| self.series.arg_min())
}

/// Reduces the series with `min_reduce` inside `enter_polars` and hands the
/// resulting scalar (or error) to `scalar_to_py` for conversion to a Python object.
fn min<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
    let reduced = py.enter_polars(|| self.series.min_reduce());
    scalar_to_py(reduced, py)
}

fn max<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
Wrap(
py.allow_threads(|| self.series.max_reduce().map_err(PyPolarsErr::from))?
.as_any_value(),
)
.into_pyobject(py)
scalar_to_py(py.enter_polars(|| self.series.max_reduce()), py)
}

fn mean<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
match self.series.dtype() {
Boolean => Wrap(
py.allow_threads(|| self.series.cast(&DataType::UInt8).unwrap().mean_reduce())
.as_any_value(),
)
.into_pyobject(py),
Boolean => scalar_to_py(
py.enter_polars_ok(|| self.series.cast(&DataType::UInt8).unwrap().mean_reduce()),
py,
),
// For non-numeric output types we require mean_reduce.
dt if dt.is_temporal() => Wrap(
py.allow_threads(|| self.series.mean_reduce())
.as_any_value(),
)
.into_pyobject(py),
_ => py
.allow_threads(|| self.series.mean())
.into_pyobject(py)
.map_err(Into::into),
dt if dt.is_temporal() => {
scalar_to_py(py.enter_polars_ok(|| self.series.mean_reduce()), py)
},
_ => Ok(self.series.mean().into_pyobject(py)?),
}
}

fn median<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
match self.series.dtype() {
Boolean => Wrap(
py.allow_threads(|| self.series.cast(&DataType::UInt8).unwrap().median_reduce())
.map_err(PyPolarsErr::from)?
.as_any_value(),
)
.into_pyobject(py),
Boolean => scalar_to_py(
py.enter_polars(|| self.series.cast(&DataType::UInt8).unwrap().median_reduce()),
py,
),
// For non-numeric output types we require median_reduce.
dt if dt.is_temporal() => Wrap(
py.allow_threads(|| self.series.median_reduce())
.map_err(PyPolarsErr::from)?
.as_any_value(),
)
.into_pyobject(py),
_ => py
.allow_threads(|| self.series.median())
.into_pyobject(py)
.map_err(Into::into),
dt if dt.is_temporal() => {
scalar_to_py(py.enter_polars(|| self.series.median_reduce()), py)
},
_ => Ok(self.series.median().into_pyobject(py)?),
}
}

fn min<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
Wrap(
py.allow_threads(|| self.series.min_reduce().map_err(PyPolarsErr::from))?
.as_any_value(),
)
.into_pyobject(py)
}

fn product<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
Wrap(
py.allow_threads(|| self.series.product().map_err(PyPolarsErr::from))?
.as_any_value(),
)
.into_pyobject(py)
scalar_to_py(py.enter_polars(|| self.series.product()), py)
}

fn quantile<'py>(
@@ -111,73 +89,49 @@ impl PySeries {
quantile: f64,
interpolation: Wrap<QuantileMethod>,
) -> PyResult<Bound<'py, PyAny>> {
let bind = py.allow_threads(|| self.series.quantile_reduce(quantile, interpolation.0));
let sc = bind.map_err(PyPolarsErr::from)?;

Wrap(sc.as_any_value()).into_pyobject(py)
scalar_to_py(
py.enter_polars(|| self.series.quantile_reduce(quantile, interpolation.0)),
py,
)
}

fn std<'py>(&self, py: Python<'py>, ddof: u8) -> PyResult<Bound<'py, PyAny>> {
Wrap(
py.allow_threads(|| self.series.std_reduce(ddof).map_err(PyPolarsErr::from))?
.as_any_value(),
)
.into_pyobject(py)
scalar_to_py(py.enter_polars(|| self.series.std_reduce(ddof)), py)
}

fn var<'py>(&self, py: Python<'py>, ddof: u8) -> PyResult<Bound<'py, PyAny>> {
Wrap(
py.allow_threads(|| self.series.var_reduce(ddof).map_err(PyPolarsErr::from))?
.as_any_value(),
)
.into_pyobject(py)
scalar_to_py(py.enter_polars(|| self.series.var_reduce(ddof)), py)
}

fn sum<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
Wrap(
py.allow_threads(|| self.series.sum_reduce().map_err(PyPolarsErr::from))?
.as_any_value(),
)
.into_pyobject(py)
scalar_to_py(py.enter_polars(|| self.series.sum_reduce()), py)
}

fn first<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
Wrap(py.allow_threads(|| self.series.first()).as_any_value()).into_pyobject(py)
scalar_to_py(py.enter_polars_ok(|| self.series.first()), py)
}

fn last<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
Wrap(py.allow_threads(|| self.series.last()).as_any_value()).into_pyobject(py)
scalar_to_py(py.enter_polars_ok(|| self.series.last()), py)
}

#[cfg(feature = "approx_unique")]
fn approx_n_unique(&self, py: Python) -> Result<IdxSize, PyPolarsErr> {
py.allow_threads(|| self.series.approx_n_unique().map_err(PyPolarsErr::from))
fn approx_n_unique(&self, py: Python) -> PyResult<IdxSize> {
py.enter_polars(|| self.series.approx_n_unique())
}

#[cfg(feature = "bitwise")]
fn bitwise_and<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
Wrap(
py.allow_threads(|| self.series.and_reduce().map_err(PyPolarsErr::from))?
.as_any_value(),
)
.into_pyobject(py)
scalar_to_py(py.enter_polars(|| self.series.and_reduce()), py)
}

#[cfg(feature = "bitwise")]
fn bitwise_or<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
Wrap(
py.allow_threads(|| self.series.or_reduce().map_err(PyPolarsErr::from))?
.as_any_value(),
)
.into_pyobject(py)
scalar_to_py(py.enter_polars(|| self.series.or_reduce()), py)
}

#[cfg(feature = "bitwise")]
fn bitwise_xor<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
Wrap(
py.allow_threads(|| self.series.xor_reduce().map_err(PyPolarsErr::from))?
.as_any_value(),
)
.into_pyobject(py)
scalar_to_py(py.enter_polars(|| self.series.xor_reduce()), py)
}
}
Loading