Skip to content

Commit

Permalink
Merge branch 'main' into chore/update-pallas-v027
Browse files Browse the repository at this point in the history
  • Loading branch information
scarmuega committed Jun 1, 2024
2 parents 5febbb5 + 3b205b0 commit 9c11ab3
Show file tree
Hide file tree
Showing 9 changed files with 254 additions and 26 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/e2e.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ jobs:
role-session-name: Github-e2e-Rollout
role-duration-seconds: 1200

- uses: azure/setup-kubectl@v3.2
- uses: azure/setup-kubectl@v4
with:
version: "v1.23.6"

Expand Down Expand Up @@ -93,7 +93,7 @@ jobs:
role-session-name: Github-e2e-Rollout
role-duration-seconds: 3600

- uses: azure/setup-kubectl@v3.2
- uses: azure/setup-kubectl@v4
with:
version: "v1.23.6"

Expand Down
1 change: 1 addition & 0 deletions .github/workflows/testdrive.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
name: Testdrive

on:
workflow_dispatch: {}
push:
branches:
- testdrive
Expand Down
31 changes: 8 additions & 23 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ elasticsearch = { version = "8.5.0-alpha.1", optional = true }
murmur3 = { version = "0.5.2", optional = true }
openssl = { version = "0.10", optional = true, features = ["vendored"] }
lapin = { version = "2.2.1", optional = true }
kafka = { version = "0.9.0", optional = true }
kafka = { version = "0.10.0", optional = true }
google-cloud-pubsub = { version = "0.16.0", optional = true }
google-cloud-googleapis = { version = "0.10.0", optional = true }
google-cloud-default = { version = "0.4.0", optional = true, features = ["pubsub"] }
Expand Down
22 changes: 22 additions & 0 deletions examples/redis_cursor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Redis cursor

This example shows you how to persits the cursor information on a Redis cluster.

The `daemon.toml` includes the `[cursor]` section that has `type` set to `Redis`, `key` is the key on the redis cluster where to dump the information, and `url` the connection string to connect with Redis.

To run the example:

* Set up [Demeter CLI](https://docs.demeter.run/cli).
* On a different terminal but same path run `dmtr ports tunnel`.
Chose the `node` option, followed by the `preprod` network and the `stable` version. Finally, mount the socket on `./socket`.
* `docker compose up -d` to spin up the Redis instance.
* ```sh
cargo run --bin oura --features redis daemon --config daemon.toml
```

In order to see cursor information on the Redis you can do the following.

```sh
$ docker exec -it redis redis-cli
127.0.0.1:6379> GET key
```
18 changes: 18 additions & 0 deletions examples/redis_cursor/daemon.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[source]
type = "N2C"
socket_path = "./socket"

[intersect]
type = "Origin"

[chain]
type = "preprod"

[cursor]
type = "Redis"
key = "key"
url = "redis://localhost:6379"
flush_interval = 1

[sink]
type = "Stdout"
18 changes: 18 additions & 0 deletions examples/redis_cursor/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
version: "3"
services:
redis:
image: redis
container_name: redis
ports:
- "6379:6379"
networks:
- redis-network
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 5s
timeout: 5s
retries: 5

networks:
redis-network:
driver: bridge
21 changes: 21 additions & 0 deletions src/cursor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,37 @@ use crate::framework::*;
pub mod file;
pub mod memory;

#[cfg(feature = "redis")]
mod redis;

pub type MaxBreadcrums = usize;

pub enum Bootstrapper {
Memory(memory::Stage),
File(file::Stage),

#[cfg(feature = "redis")]
Redis(redis::Stage),
}

impl Bootstrapper {
pub fn borrow_track(&mut self) -> &mut InputPort<Point> {
match self {
Bootstrapper::Memory(x) => &mut x.track,
Bootstrapper::File(x) => &mut x.track,

#[cfg(feature = "redis")]
Bootstrapper::Redis(x) => &mut x.track,
}
}

pub fn spawn(self, policy: gasket::runtime::Policy) -> Tether {
match self {
Bootstrapper::Memory(x) => gasket::runtime::spawn_stage(x, policy),
Bootstrapper::File(x) => gasket::runtime::spawn_stage(x, policy),

#[cfg(feature = "redis")]
Bootstrapper::Redis(x) => gasket::runtime::spawn_stage(x, policy),
}
}
}
Expand All @@ -35,20 +47,29 @@ impl Bootstrapper {
pub enum Config {
Memory(memory::Config),
File(file::Config),

#[cfg(feature = "redis")]
Redis(redis::Config),
}

impl Config {
pub fn initial_load(&self) -> Result<Breadcrumbs, Error> {
match self {
Config::Memory(x) => x.initial_load(),
Config::File(x) => x.initial_load(),

#[cfg(feature = "redis")]
Config::Redis(x) => x.initial_load(),
}
}

pub fn bootstrapper(self, ctx: &Context) -> Result<Bootstrapper, Error> {
match self {
Config::Memory(c) => Ok(Bootstrapper::Memory(c.bootstrapper(ctx)?)),
Config::File(c) => Ok(Bootstrapper::File(c.bootstrapper(ctx)?)),

#[cfg(feature = "redis")]
Config::Redis(c) => Ok(Bootstrapper::Redis(c.bootstrapper(ctx)?)),
}
}
}
Expand Down
Loading

0 comments on commit 9c11ab3

Please sign in to comment.