Skip to content

Commit

Permalink
WIP add example of a fully working service
Browse files Browse the repository at this point in the history
  • Loading branch information
toelo3 committed Nov 23, 2023
1 parent 41f8d8f commit 73f0dfb
Show file tree
Hide file tree
Showing 5 changed files with 227 additions and 0 deletions.
11 changes: 11 additions & 0 deletions service_example/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[package]
name = "dsh-rs-example"
version = "0.1.0"
description = "An example of DSH service using the dsh-sdk crate"
edition = "2021"

[dependencies]
# path needs to be changed to the path of the dsh_sdks crate
dsh_sdk = { git = "https://github.com/kpn-dsh/dsh-sdk-platform-rs.git" , features = ["rdkafka-ssl-vendored"] }
rdkafka = { version = "0.36", features = ["ssl-vendored", "cmake-build"] }
tokio = { version = "1", features = ["full"] }
43 changes: 43 additions & 0 deletions service_example/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
### builder
FROM rust:latest as builder

#cache
VOLUME /home/dsh/target

WORKDIR /home/dsh
COPY . .

# install dependencies (CMake required for rdkafka)
RUN apt-get update && apt-get install -y \
cmake

# Build the Rust service
RUN cargo install --path .

### runner
FROM debian:bookworm-slim

# args
ARG UID
RUN test -n "$UID" || (echo "UID argument not provided" && false)
ARG GID
RUN test -n "$GID" || (echo "GID argument not provided" && false)
ARG USER=dsh
ARG GROUP=dsh
ARG WORKDIR=/home/${USER}


RUN apt-get update && apt-get -y upgrade

# Add user to group
RUN groupadd --gid ${GID} ${GROUP}
RUN useradd -ms /bin/bash --uid ${UID} --gid ${GID} ${USER}

# Copy binary from builder (make sure the name is same as the one in Cargo.toml)
COPY --from=builder /usr/local/cargo/bin/dsh-rs-example /home/dsh/bin/dsh-rs-example

# Expose port for metrics
EXPOSE 9090

# Set the entry point or command to run the Rust application
ENTRYPOINT [ "/home/dsh/bin/ingester-db-kafka-service" ]
6 changes: 6 additions & 0 deletions service_example/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# DSH SDK Example Service

This is an example service that uses the DSH SDK Platform and will demonstrate a simple consumer that prints the messages to the console.

## Getting Started

52 changes: 52 additions & 0 deletions service_example/makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
TENANT=
TENANTUID=

DOCKER_REPO_URL=registry.cp.kpn-dsh.com/$(TENANT)
VERSION=example
tagname=dsh-sdk-example


image=$(DOCKER_REPO_URL)/$(tagname):$(VERSION)

help:
@echo "login - login to the relevant repository"
@echo "fix - run dos2unix on every file"
@echo "build - build the image"
@echo "build-m1 - build the image"
@echo "rebuild - build the image with the --no-cache flag"
@echo "no-docker-build - Cargo build on local machine"
@echo "all - shorthand for fix, build, push, show"
@echo "run - run the image in local docker"
@echo "run-m1 - run the image in local docker (m1)"
@echo "push - push the image to jfrog"
@echo "show - show the current make variables"
login:
docker login $(DOCKER_REPO_URL) -u $(TENANT)
fix:
find . -type f -print0 | xargs -0 dos2unix
build:
docker build -t $(tagname) -f Dockerfile --build-arg UID=$(TENANTUID) --build-arg GID=$(TENANTUID) --platform linux/amd64 .
docker tag $(tagname) $(image)
rebuild:
docker build --no-cache -t $(tagname) -f Dockerfile --build-arg UID=$(TENANTUID) --build-arg GID=$(TENANTUID) --platform linux/amd64 .
docker tag $(tagname) $(image)
no-docker-build:
cargo b
all:
make build
make push
make show
run:
docker run -u $(TENANTUID):$(TENANTUID) -it --entrypoint "/bin/sh" $(image)
fun:
@curl -s -H "Accept: application/json" https://icanhazdadjoke.com/
push:
docker push $(image)
show:
@echo "#make file configuration"
@echo "#URL :" $(DOCKER_REPO_URL)
@echo "#TENANT :" $(TENANT)
@echo "#TENANTUID :" $(TENANTUID)
@echo "#TAG :" $(tagname)
@echo "#version :" $(VERSION)
@echo "#image :" $(image)
115 changes: 115 additions & 0 deletions service_example/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
use dsh_sdk::bootstrap::Bootstrap;
use dsh_sdk::graceful_shutdown::Shutdown;

use rdkafka::consumer::CommitMode;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::message::BorrowedMessage;
use rdkafka::message::Message;

fn deserialize_and_print(msg: &BorrowedMessage) {
let payload = match msg.payload() {
Some(p) => std::string::String::from_utf8_lossy(p),
None => std::borrow::Cow::Borrowed(""),
};
let key = match msg.key() {
Some(p) => std::string::String::from_utf8_lossy(p),
None => std::borrow::Cow::Borrowed(""),
};

println!(
"Received message from topic {} partition {} offset {} with key {:?} and payload {}",
msg.topic(),
msg.partition(),
msg.offset(),
key,
payload
);
}

async fn consume(consumer: StreamConsumer, shutdown: Shutdown) {
loop {
tokio::select! {
Ok(msg) = consumer.recv() => {
deserialize_and_print(&msg);
// Commit the message
match consumer.commit_message(&msg, CommitMode::Sync)
{
Ok(_) => {}
Err(e) => println!("Error while committing message: {:?}", e),
}
},
_ = shutdown.recv() => {
println!("Shutdown requested, breaking out of consumer");
consumer.unsubscribe();
break;
}
}
}
}

#[tokio::main]
async fn main() {
// Create a new bootstrap instance (connects to the DSH server and fetches the kafka properties)
let bootstrap = match Bootstrap::new().await {
Ok(b) => b,
Err(e) => {
println!("Error bootstrap to DSH: {:?}", e);
return;
}
};

// Get the configured topics from env variable TOPICS (comma separated)
let topis_string = std::env::var("TOPICS").expect("TOPICS env variable not set");
let topics = topis_string.split(",").map(|s| s).collect::<Vec<&str>>();

// Validate your configured topic if it has read access (optional)
match bootstrap
.kafka_properties()
.verify_list_of_topics(&topics, dsh_sdk::bootstrap::ReadWriteAccess::Read)
{
Ok(_) => {}
Err(e) => {
println!("Error validating topics: {:?}", e);
return;
}
};

// Initialize the shutdown handler (This will handle SIGTERM and SIGINT signals and you can act on them)
let shutdown = Shutdown::new();

// Get the consumer config from the bootstrap instance
let mut consumer_client_config = bootstrap.consumer_rdkafka_config();

// Override some default values (optional)
consumer_client_config.set("auto.offset.reset", "latest");

// Create a new consumer instance
let consumer: StreamConsumer = consumer_client_config
.create()
.expect("Consumer creation failed");

// Subscribe to the configured topics
consumer
.subscribe(&topics)
.expect("Can't subscribe to specified topics");

// Create a future for consuming messages,
let consume_future = consume(consumer, shutdown.clone());
let consumer_handle = tokio::spawn(async move {
consume_future.await;
});

// Wait for shutdown signal or that the consumer has stopped
tokio::select! {
_ = shutdown.signal_listener() => {
println!("Shutdown signal received");
}
_ = consumer_handle => {
println!("Consumer stopped");
shutdown.start(); // Start the shutdown process (this will stop other potential running tasks, if you create them)
}
}

// Wait till the shutdown is complete
shutdown.complete().await;
}

0 comments on commit 73f0dfb

Please sign in to comment.