Skip to content

Commit

Permalink
Merge pull request #31 from Intreecom/feature/docs-update
Browse files Browse the repository at this point in the history
  • Loading branch information
s3rius authored Sep 25, 2023
2 parents 59bb6b6 + 6e33675 commit 4b52042
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 36 deletions.
88 changes: 83 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,26 +160,91 @@ async def run_batch(scylla: Scylla, num_queries: int) -> None:
await scylla.batch(batch, [{"id": 1}]) # Will raise an error!
```

## Pagination

Sometimes you want to query lots of data. For such cases it's better not to
fetch all results at once, but fetch them using pagination. It reduces load
not only on your application, but also on a cluster.

To execute a query with pagination, simply pass `paged=True` to the `execute` method.
After doing so, the `execute` method will return an `IterableQueryResult` instead of a `QueryResult`.
Instances of `IterableQueryResult` can be iterated with `async for` statements.
You, as a client, won't see any information about pages; it's all handled internally by the driver.

Please note that paginated queries are slower to fetch all rows, but much more memory efficient for large datasets.

```python
result = await scylla.execute("SELECT * FROM table", paged=True)
async for row in result:
print(row)

```

Of course, you can change how results are returned to you by using either `scalars` or
`as_cls`. For example:

```python
async def func(scylla: Scylla) -> None:
rows = await scylla.execute("SELECT id FROM table", paged=True)
# Will print ids of each returned row.
async for test_id in rows.scalars():
print(test_id)

```

```python
from dataclasses import dataclass

@dataclass
class MyDTO:
id: int
val: int

async def func(scylla: Scylla) -> None:
rows = await scylla.execute("SELECT * FROM table", paged=True)
# Will print the id and val of each returned row.
async for my_dto in rows.as_cls(MyDTO):
print(my_dto.id, my_dto.val)

```

## Execution profiles

You can define profiles using `ExecutionProfile` class. After that the
profile can be used while creating a cluster or when defining queries.

```python
from scyllapy import Consistency, ExecutionProfile, Query, Scylla, SerialConsistency
from scyllapy.load_balancing import LoadBalancingPolicy, LatencyAwareness

default_profile = ExecutionProfile(
consistency=Consistency.LOCAL_QUORUM,
serial_consistency=SerialConsistency.LOCAL_SERIAL,
request_timeout=2,
)
query_profile = ExecutionProfile(
consistency=Consistency.ALL,
serial_consistency=SerialConsistency.SERIAL,
)


async def main():
query_profile = ExecutionProfile(
consistency=Consistency.ALL,
serial_consistency=SerialConsistency.SERIAL,
# Load balancing cannot be constructed without a running event loop.
# If you don't do it inside an async function, it will result in an error.
load_balancing_policy=await LoadBalancingPolicy.build(
token_aware=True,
prefer_rack="rack1",
prefer_datacenter="dc1",
permit_dc_failover=True,
shuffling_replicas=True,
latency_awareness=LatencyAwareness(
minimum_measurements=10,
retry_period=1000,
exclusion_threshold=1.4,
update_rate=1000,
scale=2,
),
),
)

scylla = Scylla(
["192.168.32.4"],
default_execution_profile=default_profile,
Expand Down Expand Up @@ -324,4 +389,17 @@ async def execute_batch(scylla: Scylla) -> None:
Insert("users").set("id", i).set("name", "test").add_to_batch(batch)
await scylla.batch(batch)

```

## Paging

Queries that were built with QueryBuilder also support paged returns.
However, paging is supported only for `Select`, because update, delete and insert
queries should not return anything, so implementing it for them makes no sense.
To make a built `Select` query return a paginated iterator, pass the `paged` parameter to the `execute` method.

```python
rows = await Select("test").execute(scylla, paged=True)
async for row in rows:
print(row['id'])
```
16 changes: 11 additions & 5 deletions python/scyllapy/_internal/load_balancing.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,29 @@ class LoadBalancingPolicy:
Can be applied to profiles.
"""

def __init__(
self,
@classmethod
async def build(
cls,
*,
token_aware: bool | None = None,
prefer_rack: str | None = None,
prefer_datacenter: str | None = None,
permit_dc_failover: bool | None = None,
shuffling_replicas: bool | None = None,
latency_awareness: LatencyAwareness | None = None,
) -> None: ...
) -> LoadBalancingPolicy: ...
"""
Construct load balancing policy.
It has to be async, because it needs to start a background task.
:param token_aware: Whether to use token aware routing.
:param prefer_rack: Name of the rack to prefer.
:param prefer_datacenter: Name of the datacenter to prefer.
:param permit_dc_failover: Whether to allow datacenter failover.
:param shuffling_replicas: Whether to shuffle replicas.
:param latency_awareness: Latency awareness policy.
"""

async def with_latency_awareness(
self,
latency_awareness: LatencyAwareness,
) -> None: ...
61 changes: 35 additions & 26 deletions src/load_balancing.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
use std::sync::Arc;

use pyo3::{pyclass, pymethods, types::PyModule, PyResult, Python};
use pyo3::{
pyclass, pymethods,
types::{PyModule, PyType},
PyAny, PyResult, Python,
};
use scylla::load_balancing::{DefaultPolicy, LatencyAwarenessBuilder, LoadBalancingPolicy};
use std::time::Duration;

use crate::{exceptions::rust_err::ScyllaPyResult, utils::scyllapy_future};

#[pyclass(name = "LoadBalancingPolicy")]
#[derive(Clone, Debug)]
pub struct ScyllaPyLoadBalancingPolicy {
Expand All @@ -12,7 +18,7 @@ pub struct ScyllaPyLoadBalancingPolicy {

#[pymethods]
impl ScyllaPyLoadBalancingPolicy {
#[new]
#[classmethod]
#[pyo3(signature = (
*,
token_aware = None,
Expand All @@ -23,36 +29,39 @@ impl ScyllaPyLoadBalancingPolicy {
latency_awareness = None,
)
)]
fn new(
fn build(
cls: &PyType,
token_aware: Option<bool>,
prefer_rack: Option<String>,
prefer_datacenter: Option<String>,
permit_dc_failover: Option<bool>,
shuffling_replicas: Option<bool>,
latency_awareness: Option<ScyllaPyLatencyAwareness>,
) -> Self {
let mut policy_builer = DefaultPolicy::builder();
if let Some(permit) = permit_dc_failover {
policy_builer = policy_builer.permit_dc_failover(permit);
}
if let Some(token) = token_aware {
policy_builer = policy_builer.token_aware(token);
}
if let Some(rack) = prefer_rack {
policy_builer = policy_builer.prefer_rack(rack);
}
if let Some(dc) = prefer_datacenter {
policy_builer = policy_builer.prefer_datacenter(dc);
}
if let Some(shufle) = shuffling_replicas {
policy_builer = policy_builer.enable_shuffling_replicas(shufle);
}
if let Some(latency_awareness) = latency_awareness {
policy_builer = policy_builer.latency_awareness(latency_awareness.into());
}
Self {
inner: policy_builer.build(),
}
) -> ScyllaPyResult<&PyAny> {
scyllapy_future(cls.py(), async move {
let mut policy_builer = DefaultPolicy::builder();
if let Some(permit) = permit_dc_failover {
policy_builer = policy_builer.permit_dc_failover(permit);
}
if let Some(token) = token_aware {
policy_builer = policy_builer.token_aware(token);
}
if let Some(rack) = prefer_rack {
policy_builer = policy_builer.prefer_rack(rack);
}
if let Some(dc) = prefer_datacenter {
policy_builer = policy_builer.prefer_datacenter(dc);
}
if let Some(shufle) = shuffling_replicas {
policy_builer = policy_builer.enable_shuffling_replicas(shufle);
}
if let Some(latency_awareness) = latency_awareness {
policy_builer = policy_builer.latency_awareness(latency_awareness.into());
}
Ok(Self {
inner: policy_builer.build(),
})
})
}
}

Expand Down

0 comments on commit 4b52042

Please sign in to comment.