diff --git a/README.md b/README.md index 73fb798..0d3a8d2 100644 --- a/README.md +++ b/README.md @@ -160,6 +160,54 @@ async def run_batch(scylla: Scylla, num_queries: int) -> None: await scylla.batch(batch, [{"id": 1}]) # Will rase an error! ``` +## Pagination + +Sometimes you want to query lots of data. For such cases it's better not to +fetch all results at once, but fetch them using pagination. It reduces load +not only on your application, but also on a cluster. + +To execute query with pagination, simply add `paged=True` in execute method. +After doing so, `execute` method will return `IterableQueryResult`, instead of `QueryResult`. +Instances of `IterableQueryResult` can be iterated with `async for` statements. +You, as a client, won't see any information about pages, it's all handeled internally within a driver. + +Please note, that paginated queries are slower to fetch all rows, but much more memory efficent for large datasets. + +```python + result = await scylla.execute("SELECT * FROM table", paged=True) + async for row in result: + print(row) + +``` + +Of course, you can change how results returned to you, by either using `scalars` or +`as_cls`. For example: + +```python +async def func(scylla: Scylla) -> None: + rows = await scylla.execute("SELECT id FROM table", paged=True) + # Will print ids of each returned row. + async for test_id in rows.scalars(): + print(test_id) + +``` + +```python +from dataclasses import dataclass + +@dataclass +class MyDTO: + id: int + val: int + +async def func(scylla: Scylla) -> None: + rows = await scylla.execute("SELECT * FROM table", paged=True) + # Will print ids of each returned row. + async for my_dto in rows.as_cls(MyDTO): + print(my_dto.id, my_dto.val) + +``` + ## Execution profiles You can define profiles using `ExecutionProfile` class. After that the @@ -167,19 +215,36 @@ profile can be used while creating a cluster or when defining queries. ```python from scyllapy import Consistency, ExecutionProfile, Query, Scylla, SerialConsistency +from scyllapy.load_balancing import LoadBalancingPolicy, LatencyAwareness default_profile = ExecutionProfile( consistency=Consistency.LOCAL_QUORUM, serial_consistency=SerialConsistency.LOCAL_SERIAL, request_timeout=2, ) -query_profile = ExecutionProfile( - consistency=Consistency.ALL, - serial_consistency=SerialConsistency.SERIAL, -) - async def main(): + query_profile = ExecutionProfile( + consistency=Consistency.ALL, + serial_consistency=SerialConsistency.SERIAL, + # Load balancing cannot be constructed without running event loop. + # If you won't do it inside async funcion, it will result in error. + load_balancing_policy=await LoadBalancingPolicy.build( + token_aware=True, + prefer_rack="rack1", + prefer_datacenter="dc1", + permit_dc_failover=True, + shuffling_replicas=True, + latency_awareness=LatencyAwareness( + minimum_measurements=10, + retry_period=1000, + exclusion_threshold=1.4, + update_rate=1000, + scale=2, + ), + ), + ) + scylla = Scylla( ["192.168.32.4"], default_execution_profile=default_profile, @@ -324,4 +389,17 @@ async def execute_batch(scylla: Scylla) -> None: Insert("users").set("id", i).set("name", "test").add_to_batch(batch) await scylla.batch(batch) +``` + +## Paging + +Queries that were built with QueryBuilder also support paged returns. +But it supported only for select, because update, delete and insert should +not return anything and it makes no sense implementing it. +To make built `Select` query return paginated iterator, add paged parameter in execute method. + +```python + rows = await Select("test").execute(scylla, paged=True) + async for row in rows: + print(row['id']) ``` \ No newline at end of file diff --git a/python/scyllapy/_internal/load_balancing.pyi b/python/scyllapy/_internal/load_balancing.pyi index cadc9d7..ac78df4 100644 --- a/python/scyllapy/_internal/load_balancing.pyi +++ b/python/scyllapy/_internal/load_balancing.pyi @@ -31,23 +31,29 @@ class LoadBalancingPolicy: Can be applied to profiles. """ - def __init__( - self, + @classmethod + async def build( + cls, *, token_aware: bool | None = None, prefer_rack: str | None = None, prefer_datacenter: str | None = None, permit_dc_failover: bool | None = None, shuffling_replicas: bool | None = None, - latency_awareness: LatencyAwareness | None = None, - ) -> None: ... + ) -> LoadBalancingPolicy: ... """ Construct load balancing policy. + It requires to be async, becausse it needs to start a background task. + :param token_aware: Whether to use token aware routing. :param prefer_rack: Name of the rack to prefer. :param prefer_datacenter: Name of the datacenter to prefer. :param permit_dc_failover: Whether to allow datacenter failover. :param shuffling_replicas: Whether to shuffle replicas. - :param latency_awareness: Latency awareness policy. """ + + async def with_latency_awareness( + self, + latency_awareness: LatencyAwareness, + ) -> None: ... diff --git a/src/load_balancing.rs b/src/load_balancing.rs index ea69a21..6fd30d0 100644 --- a/src/load_balancing.rs +++ b/src/load_balancing.rs @@ -1,9 +1,15 @@ use std::sync::Arc; -use pyo3::{pyclass, pymethods, types::PyModule, PyResult, Python}; +use pyo3::{ + pyclass, pymethods, + types::{PyModule, PyType}, + PyAny, PyResult, Python, +}; use scylla::load_balancing::{DefaultPolicy, LatencyAwarenessBuilder, LoadBalancingPolicy}; use std::time::Duration; +use crate::{exceptions::rust_err::ScyllaPyResult, utils::scyllapy_future}; + #[pyclass(name = "LoadBalancingPolicy")] #[derive(Clone, Debug)] pub struct ScyllaPyLoadBalancingPolicy { @@ -12,7 +18,7 @@ pub struct ScyllaPyLoadBalancingPolicy { #[pymethods] impl ScyllaPyLoadBalancingPolicy { - #[new] + #[classmethod] #[pyo3(signature = ( *, token_aware = None, @@ -23,36 +29,39 @@ impl ScyllaPyLoadBalancingPolicy { latency_awareness = None, ) )] - fn new( + fn build( + cls: &PyType, token_aware: Option, prefer_rack: Option, prefer_datacenter: Option, permit_dc_failover: Option, shuffling_replicas: Option, latency_awareness: Option, - ) -> Self { - let mut policy_builer = DefaultPolicy::builder(); - if let Some(permit) = permit_dc_failover { - policy_builer = policy_builer.permit_dc_failover(permit); - } - if let Some(token) = token_aware { - policy_builer = policy_builer.token_aware(token); - } - if let Some(rack) = prefer_rack { - policy_builer = policy_builer.prefer_rack(rack); - } - if let Some(dc) = prefer_datacenter { - policy_builer = policy_builer.prefer_datacenter(dc); - } - if let Some(shufle) = shuffling_replicas { - policy_builer = policy_builer.enable_shuffling_replicas(shufle); - } - if let Some(latency_awareness) = latency_awareness { - policy_builer = policy_builer.latency_awareness(latency_awareness.into()); - } - Self { - inner: policy_builer.build(), - } + ) -> ScyllaPyResult<&PyAny> { + scyllapy_future(cls.py(), async move { + let mut policy_builer = DefaultPolicy::builder(); + if let Some(permit) = permit_dc_failover { + policy_builer = policy_builer.permit_dc_failover(permit); + } + if let Some(token) = token_aware { + policy_builer = policy_builer.token_aware(token); + } + if let Some(rack) = prefer_rack { + policy_builer = policy_builer.prefer_rack(rack); + } + if let Some(dc) = prefer_datacenter { + policy_builer = policy_builer.prefer_datacenter(dc); + } + if let Some(shufle) = shuffling_replicas { + policy_builer = policy_builer.enable_shuffling_replicas(shufle); + } + if let Some(latency_awareness) = latency_awareness { + policy_builer = policy_builer.latency_awareness(latency_awareness.into()); + } + Ok(Self { + inner: policy_builer.build(), + }) + }) } }