Skip to content

Commit

Permalink
feat: implement Redis option for cursor persistence (#790)
Browse files Browse the repository at this point in the history
  • Loading branch information
gonzalezzfelipe committed May 23, 2024
1 parent 8dde4f8 commit 3b205b0
Show file tree
Hide file tree
Showing 6 changed files with 245 additions and 3 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions examples/redis_cursor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Redis cursor

This example shows you how to persits the cursor information on a Redis cluster.

The `daemon.toml` includes the `[cursor]` section that has `type` set to `Redis`, `key` is the key on the redis cluster where to dump the information, and `url` the connection string to connect with Redis.

To run the example:

* Set up [Demeter CLI](https://docs.demeter.run/cli).
* On a different terminal but same path run `dmtr ports tunnel`.
Chose the `node` option, followed by the `preprod` network and the `stable` version. Finally, mount the socket on `./socket`.
* `docker compose up -d` to spin up the Redis instance.
* ```sh
cargo run --bin oura --features redis daemon --config daemon.toml
```

In order to see cursor information on the Redis you can do the following.

```sh
$ docker exec -it redis redis-cli
127.0.0.1:6379> GET key
```
18 changes: 18 additions & 0 deletions examples/redis_cursor/daemon.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[source]
type = "N2C"
socket_path = "./socket"

[intersect]
type = "Origin"

[chain]
type = "preprod"

[cursor]
type = "Redis"
key = "key"
url = "redis://localhost:6379"
flush_interval = 1

[sink]
type = "Stdout"
18 changes: 18 additions & 0 deletions examples/redis_cursor/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
version: "3"
services:
redis:
image: redis
container_name: redis
ports:
- "6379:6379"
networks:
- redis-network
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 5s
timeout: 5s
retries: 5

networks:
redis-network:
driver: bridge
21 changes: 21 additions & 0 deletions src/cursor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,37 @@ use crate::framework::*;
pub mod file;
pub mod memory;

#[cfg(feature = "redis")]
mod redis;

pub type MaxBreadcrums = usize;

pub enum Bootstrapper {
Memory(memory::Stage),
File(file::Stage),

#[cfg(feature = "redis")]
Redis(redis::Stage),
}

impl Bootstrapper {
pub fn borrow_track(&mut self) -> &mut InputPort<Point> {
match self {
Bootstrapper::Memory(x) => &mut x.track,
Bootstrapper::File(x) => &mut x.track,

#[cfg(feature = "redis")]
Bootstrapper::Redis(x) => &mut x.track,
}
}

pub fn spawn(self, policy: gasket::runtime::Policy) -> Tether {
match self {
Bootstrapper::Memory(x) => gasket::runtime::spawn_stage(x, policy),
Bootstrapper::File(x) => gasket::runtime::spawn_stage(x, policy),

#[cfg(feature = "redis")]
Bootstrapper::Redis(x) => gasket::runtime::spawn_stage(x, policy),
}
}
}
Expand All @@ -35,20 +47,29 @@ impl Bootstrapper {
pub enum Config {
Memory(memory::Config),
File(file::Config),

#[cfg(feature = "redis")]
Redis(redis::Config),
}

impl Config {
pub fn initial_load(&self) -> Result<Breadcrumbs, Error> {
match self {
Config::Memory(x) => x.initial_load(),
Config::File(x) => x.initial_load(),

#[cfg(feature = "redis")]
Config::Redis(x) => x.initial_load(),
}
}

pub fn bootstrapper(self, ctx: &Context) -> Result<Bootstrapper, Error> {
match self {
Config::Memory(c) => Ok(Bootstrapper::Memory(c.bootstrapper(ctx)?)),
Config::File(c) => Ok(Bootstrapper::File(c.bootstrapper(ctx)?)),

#[cfg(feature = "redis")]
Config::Redis(c) => Ok(Bootstrapper::Redis(c.bootstrapper(ctx)?)),
}
}
}
Expand Down
163 changes: 163 additions & 0 deletions src/cursor/redis.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
use gasket::framework::*;
use pallas::network::miniprotocols::Point;
use r2d2_redis::{
r2d2::{self, Pool},
redis::{self, Commands},
RedisConnectionManager,
};
use serde::Deserialize;
use tokio::select;
use tracing::debug;

use crate::framework::*;

fn breadcrumbs_to_data(crumbs: &Breadcrumbs) -> Vec<(u64, String)> {
crumbs
.points()
.into_iter()
.filter_map(|p| match p {
Point::Origin => None,
Point::Specific(slot, hash) => Some((slot, hex::encode(hash))),
})
.collect()
}

fn breadcrumbs_from_data(data: Vec<(u64, String)>, max: usize) -> Result<Breadcrumbs, Error> {
let points: Vec<_> = data
.into_iter()
.map::<Result<_, Error>, _>(|(slot, hash)| {
let hash = hex::decode(hash).map_err(Error::custom)?;
Ok(Point::Specific(slot, hash))
})
.collect::<Result<_, _>>()?;

Ok(Breadcrumbs::from_points(points, max))
}

pub enum Unit {
Track(Point),
Flush,
}

pub struct Worker {
pool: Pool<RedisConnectionManager>,
key: String,
}

#[async_trait::async_trait(?Send)]
impl gasket::framework::Worker<Stage> for Worker {
async fn bootstrap(stage: &Stage) -> Result<Self, WorkerError> {
let manager = RedisConnectionManager::new(stage.url.clone()).or_panic()?;
let pool = r2d2::Pool::builder().build(manager).or_panic()?;

Ok(Self {
pool,
key: stage.key.clone(),
})
}

async fn schedule(&mut self, stage: &mut Stage) -> Result<WorkSchedule<Unit>, WorkerError> {
select! {
msg = stage.track.recv() => {
let msg = msg.or_panic()?;
Ok(WorkSchedule::Unit(Unit::Track(msg.payload)))
}
msg = stage.flush.recv() => {
msg.or_panic()?;
Ok(WorkSchedule::Unit(Unit::Flush))
}
}
}

async fn execute(&mut self, unit: &Unit, stage: &mut Stage) -> Result<(), WorkerError> {
match unit {
Unit::Track(x) => stage.breadcrumbs.track(x.clone()),
Unit::Flush => {
let data = breadcrumbs_to_data(&stage.breadcrumbs);
let mut conn = self.pool.get().or_restart()?;

let data_to_write = serde_json::to_string(&data).or_panic()?;
conn.set(&self.key, &data_to_write)
.map_err(Error::custom)
.or_panic()?;
}
}

Ok(())
}
}

#[derive(Stage)]
#[stage(name = "cursor", unit = "Unit", worker = "Worker")]
pub struct Stage {
key: String,
url: String,

breadcrumbs: Breadcrumbs,

pub track: gasket::messaging::InputPort<Point>,

pub flush: gasket::messaging::TimerPort,

#[metric]
tracked_slot: gasket::metrics::Gauge,

#[metric]
flush_count: gasket::metrics::Counter,
}

const DEFAULT_MAX_BREADCRUMBS: usize = 10;
const DEFAULT_FLUSH_INTERVAL: usize = 10;

#[derive(Default, Debug, Deserialize)]
pub struct Config {
pub key: String,
pub url: String,
pub max_breadcrumbs: Option<usize>,
pub flush_interval: Option<u64>,
}

impl Config {
pub fn initial_load(&self) -> Result<Breadcrumbs, Error> {
let client = redis::Client::open(self.url.clone())
.map_err(|err| Error::Custom(format!("Unable to connect to Redis: {}", err)))?;
let mut conn = client
.get_connection()
.map_err(|err| Error::Custom(format!("Unable to establish connection: {}", err)))?;

let max_breadcrumbs = self.max_breadcrumbs.unwrap_or(DEFAULT_MAX_BREADCRUMBS);

let result: redis::RedisResult<Option<String>> = conn.get(&self.key);

match result {
Ok(Some(data_as_string)) => {
debug!("Retrieving cursor information from redis.");
let data: Vec<(u64, String)> =
serde_json::from_str(&data_as_string).map_err(Error::custom)?;
let crumbs = breadcrumbs_from_data(data, max_breadcrumbs)?;
Ok(crumbs)
}
Ok(None) => {
debug!("No cursor information found on redis cluster.");
Ok(Breadcrumbs::new(max_breadcrumbs))
}
Err(err) => Err(Error::custom(err)),
}
}

pub fn bootstrapper(self, ctx: &Context) -> Result<Stage, Error> {
let flush_interval = self.flush_interval.unwrap_or(DEFAULT_FLUSH_INTERVAL as u64);

let stage = Stage {
key: self.key.clone(),
url: self.url.clone(),
breadcrumbs: ctx.breadcrumbs.clone(),
tracked_slot: Default::default(),
flush_count: Default::default(),
track: Default::default(),
flush: gasket::messaging::TimerPort::from_secs(flush_interval),
};

Ok(stage)
}
}

0 comments on commit 3b205b0

Please sign in to comment.