Skip to content

Commit

Permalink
Shutdown program on stream error or close (#213)
Browse files Browse the repository at this point in the history
  • Loading branch information
kespinola authored Jan 24, 2025
1 parent 1343e33 commit e678814
Showing 1 changed file with 106 additions and 80 deletions.
186 changes: 106 additions & 80 deletions grpc-ingest/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ pub async fn run(config: ConfigGrpc) -> anyhow::Result<()> {

let subscriptions = config.subscriptions.clone();

let (global_shutdown_tx, mut global_shutdown_rx) = oneshot::channel();
let global_shutdown_tx = Arc::new(Mutex::new(Some(global_shutdown_tx)));

let mut subscription_tasks = Vec::new();
for (label, subscription_config) in subscriptions {
let subscription = Subscription {
Expand All @@ -48,19 +51,27 @@ pub async fn run(config: ConfigGrpc) -> anyhow::Result<()> {
.config(Arc::clone(&config))
.connection(connection.clone())
.subscription(subscription)
.start()
.start(Arc::clone(&global_shutdown_tx))
.await?;

subscription_tasks.push(task);
}

if let Some(signal) = shutdown.next().await {
warn!(
target: "grpc2redis",
action = "shutdown_signal_received",
message = "Shutdown signal received, waiting for spawned tasks to complete",
signal = ?signal
);
tokio::select! {
_ = &mut global_shutdown_rx => {
warn!(
target: "grpc2redis",
action = "global_shutdown_signal_received",
message = "Global shutdown signal received, stopping all tasks"
);
}
_ = shutdown.next() => {
warn!(
target: "grpc2redis",
action = "shutdown_signal_received",
message = "Shutdown signal received, waiting for spawned tasks to complete"
);
}
}

futures::future::join_all(
Expand Down Expand Up @@ -108,7 +119,10 @@ impl SubscriptionTask {
self
}

pub async fn start(mut self) -> anyhow::Result<SubscriptionTaskStop> {
pub async fn start(
mut self,
global_shutdown_tx: Arc<Mutex<Option<oneshot::Sender<()>>>>,
) -> anyhow::Result<SubscriptionTaskStop> {
let config = Arc::clone(&self.config);
let connection = self
.connection
Expand Down Expand Up @@ -164,6 +178,7 @@ impl SubscriptionTask {
let (mut subscribe_tx, stream) = dragon_mouth_client
.subscribe_with_request(Some(request))
.await?;
let global_shutdown_tx = Arc::clone(&global_shutdown_tx);

let control = tokio::spawn({
async move {
Expand Down Expand Up @@ -215,89 +230,100 @@ impl SubscriptionTask {

loop {
tokio::select! {
Some(Ok(msg)) = stream.next() => {
match msg.update_oneof {
Some(UpdateOneof::Account(_)) | Some(UpdateOneof::Transaction(_)) => {
if tasks.len() >= stream_config.max_concurrency {
tasks.next().await;
}
grpc_tasks_total_inc(&label, &stream_config.name);

tasks.push(tokio::spawn({
let pipe = Arc::clone(&pipes[current_pipe_index]);
let label = label.clone();
let stream_config = Arc::clone(&stream_config);

async move {
let stream = stream_config.name.clone();
let stream_maxlen = stream_config.max_len;

let SubscribeUpdate { update_oneof, .. } = msg;

let mut pipe = pipe.lock().await;

if let Some(update) = update_oneof {
match update {
UpdateOneof::Account(account) => {
pipe.xadd_maxlen(
&stream.to_string(),
StreamMaxlen::Approx(stream_maxlen),
"*",
account.encode_to_vec(),
);
debug!(target: "grpc2redis", action = "process_account_update",label = ?label, stream = ?stream, maxlen = ?stream_maxlen);
event = stream.next() => {
match event {
Some(Ok(msg)) => {
match msg.update_oneof {
Some(UpdateOneof::Account(_)) | Some(UpdateOneof::Transaction(_)) => {
if tasks.len() >= stream_config.max_concurrency {
tasks.next().await;
}
grpc_tasks_total_inc(&label, &stream_config.name);

tasks.push(tokio::spawn({
let pipe = Arc::clone(&pipes[current_pipe_index]);
let label = label.clone();
let stream_config = Arc::clone(&stream_config);

async move {
let stream = stream_config.name.clone();
let stream_maxlen = stream_config.max_len;

let SubscribeUpdate { update_oneof, .. } = msg;

let mut pipe = pipe.lock().await;

if let Some(update) = update_oneof {
match update {
UpdateOneof::Account(account) => {
pipe.xadd_maxlen(
&stream.to_string(),
StreamMaxlen::Approx(stream_maxlen),
"*",
account.encode_to_vec(),
);
debug!(target: "grpc2redis", action = "process_account_update", label = ?label, stream = ?stream, maxlen = ?stream_maxlen);
}

UpdateOneof::Transaction(transaction) => {
pipe.xadd_maxlen(
&stream.to_string(),
StreamMaxlen::Approx(stream_maxlen),
"*",
transaction.encode_to_vec(),
);
debug!(target: "grpc2redis", action = "process_transaction_update", label = ?label, stream = ?stream, maxlen = ?stream_maxlen);
}
_ => {
warn!(target: "grpc2redis", action = "unknown_update_variant", label = ?label, message = "Unknown update variant");
}
}
}

UpdateOneof::Transaction(transaction) => {
pipe.xadd_maxlen(
&stream.to_string(),
StreamMaxlen::Approx(stream_maxlen),
"*",
transaction.encode_to_vec(),
);
debug!(target: "grpc2redis", action = "process_transaction_update",label = ?label, stream = ?stream, maxlen = ?stream_maxlen);
}
_ => {
warn!(target: "grpc2redis", action = "unknown_update_variant",label = ?label, message = "Unknown update variant")
}
grpc_tasks_total_dec(&label, &stream_config.name);
}
}

}));

grpc_tasks_total_dec(&label, &stream_config.name);
current_pipe_index = (current_pipe_index + 1) % pipes.len();
}
}));

current_pipe_index = (current_pipe_index + 1) % pipes.len();
}
Some(UpdateOneof::Ping(_)) => {
let ping = subscribe_tx
.send(SubscribeRequest {
ping: Some(SubscribeRequestPing { id: PING_ID }),
..Default::default()
})
.await;

match ping {
Ok(_) => {
debug!(target: "grpc2redis", action = "send_ping", message = "Ping sent successfully", id = PING_ID)
Some(UpdateOneof::Ping(_)) => {
let ping = subscribe_tx
.send(SubscribeRequest {
ping: Some(SubscribeRequestPing { id: PING_ID }),
..Default::default()
})
.await;

match ping {
Ok(_) => {
debug!(target: "grpc2redis", action = "send_ping", message = "Ping sent successfully", id = PING_ID);
}
Err(err) => {
warn!(target: "grpc2redis", action = "send_ping_failed", message = "Failed to send ping", ?err, id = PING_ID);
}
}
}
Err(err) => {
warn!(target: "grpc2redis", action = "send_ping_failed", message = "Failed to send ping", ?err, id = PING_ID)
Some(UpdateOneof::Pong(pong)) => {
if pong.id == PING_ID {
debug!(target: "grpc2redis", action = "receive_pong", message = "Pong received", id = PING_ID);
} else {
warn!(target: "grpc2redis", action = "receive_unknown_pong", message = "Unknown pong id received", id = pong.id);
}
}
_ => {
warn!(target: "grpc2redis", action = "unknown_update_variant", message = "Unknown update variant");
}
}
}
Some(UpdateOneof::Pong(pong)) => {
if pong.id == PING_ID {
debug!(target: "grpc2redis", action = "receive_pong", message = "Pong received", id = PING_ID);
} else {
warn!(target: "grpc2redis", action = "receive_unknown_pong", message = "Unknown pong id received", id = pong.id);
}
}
_ => {
warn!(target: "grpc2redis", action = "unknown_update_variant", message = "Unknown update variant", ?msg.update_oneof)
break;
}
}

let mut global_shutdown_tx = global_shutdown_tx.lock().await;
if let Some(global_shutdown_tx) = global_shutdown_tx.take() {
let _ = global_shutdown_tx.send(());
}
}
_ = &mut shutdown_rx => {
debug!(target: "grpc2redis", action = "shutdown_signal_received", message = "Shutdown signal received, stopping subscription task", ?label);
Expand Down

0 comments on commit e678814

Please sign in to comment.