Skip to content

Shutdown program on stream error or close #213

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 24, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 106 additions & 80 deletions grpc-ingest/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ pub async fn run(config: ConfigGrpc) -> anyhow::Result<()> {

let subscriptions = config.subscriptions.clone();

let (global_shutdown_tx, mut global_shutdown_rx) = oneshot::channel();
let global_shutdown_tx = Arc::new(Mutex::new(Some(global_shutdown_tx)));

let mut subscription_tasks = Vec::new();
for (label, subscription_config) in subscriptions {
let subscription = Subscription {
Expand All @@ -48,19 +51,27 @@ pub async fn run(config: ConfigGrpc) -> anyhow::Result<()> {
.config(Arc::clone(&config))
.connection(connection.clone())
.subscription(subscription)
.start()
.start(Arc::clone(&global_shutdown_tx))
.await?;

subscription_tasks.push(task);
}

if let Some(signal) = shutdown.next().await {
warn!(
target: "grpc2redis",
action = "shutdown_signal_received",
message = "Shutdown signal received, waiting for spawned tasks to complete",
signal = ?signal
);
tokio::select! {
_ = &mut global_shutdown_rx => {
warn!(
target: "grpc2redis",
action = "global_shutdown_signal_received",
message = "Global shutdown signal received, stopping all tasks"
);
}
_ = shutdown.next() => {
warn!(
target: "grpc2redis",
action = "shutdown_signal_received",
message = "Shutdown signal received, waiting for spawned tasks to complete"
);
}
}

futures::future::join_all(
Expand Down Expand Up @@ -108,7 +119,10 @@ impl SubscriptionTask {
self
}

pub async fn start(mut self) -> anyhow::Result<SubscriptionTaskStop> {
pub async fn start(
mut self,
global_shutdown_tx: Arc<Mutex<Option<oneshot::Sender<()>>>>,
) -> anyhow::Result<SubscriptionTaskStop> {
let config = Arc::clone(&self.config);
let connection = self
.connection
Expand Down Expand Up @@ -164,6 +178,7 @@ impl SubscriptionTask {
let (mut subscribe_tx, stream) = dragon_mouth_client
.subscribe_with_request(Some(request))
.await?;
let global_shutdown_tx = Arc::clone(&global_shutdown_tx);

let control = tokio::spawn({
async move {
Expand Down Expand Up @@ -215,89 +230,100 @@ impl SubscriptionTask {

loop {
tokio::select! {
Some(Ok(msg)) = stream.next() => {
match msg.update_oneof {
Some(UpdateOneof::Account(_)) | Some(UpdateOneof::Transaction(_)) => {
if tasks.len() >= stream_config.max_concurrency {
tasks.next().await;
}
grpc_tasks_total_inc(&label, &stream_config.name);

tasks.push(tokio::spawn({
let pipe = Arc::clone(&pipes[current_pipe_index]);
let label = label.clone();
let stream_config = Arc::clone(&stream_config);

async move {
let stream = stream_config.name.clone();
let stream_maxlen = stream_config.max_len;

let SubscribeUpdate { update_oneof, .. } = msg;

let mut pipe = pipe.lock().await;

if let Some(update) = update_oneof {
match update {
UpdateOneof::Account(account) => {
pipe.xadd_maxlen(
&stream.to_string(),
StreamMaxlen::Approx(stream_maxlen),
"*",
account.encode_to_vec(),
);
debug!(target: "grpc2redis", action = "process_account_update",label = ?label, stream = ?stream, maxlen = ?stream_maxlen);
event = stream.next() => {
match event {
Some(Ok(msg)) => {
match msg.update_oneof {
Some(UpdateOneof::Account(_)) | Some(UpdateOneof::Transaction(_)) => {
if tasks.len() >= stream_config.max_concurrency {
tasks.next().await;
}
grpc_tasks_total_inc(&label, &stream_config.name);

tasks.push(tokio::spawn({
let pipe = Arc::clone(&pipes[current_pipe_index]);
let label = label.clone();
let stream_config = Arc::clone(&stream_config);

async move {
let stream = stream_config.name.clone();
let stream_maxlen = stream_config.max_len;

let SubscribeUpdate { update_oneof, .. } = msg;

let mut pipe = pipe.lock().await;

if let Some(update) = update_oneof {
match update {
UpdateOneof::Account(account) => {
pipe.xadd_maxlen(
&stream.to_string(),
StreamMaxlen::Approx(stream_maxlen),
"*",
account.encode_to_vec(),
);
debug!(target: "grpc2redis", action = "process_account_update", label = ?label, stream = ?stream, maxlen = ?stream_maxlen);
}

UpdateOneof::Transaction(transaction) => {
pipe.xadd_maxlen(
&stream.to_string(),
StreamMaxlen::Approx(stream_maxlen),
"*",
transaction.encode_to_vec(),
);
debug!(target: "grpc2redis", action = "process_transaction_update", label = ?label, stream = ?stream, maxlen = ?stream_maxlen);
}
_ => {
warn!(target: "grpc2redis", action = "unknown_update_variant", label = ?label, message = "Unknown update variant");
}
}
}

UpdateOneof::Transaction(transaction) => {
pipe.xadd_maxlen(
&stream.to_string(),
StreamMaxlen::Approx(stream_maxlen),
"*",
transaction.encode_to_vec(),
);
debug!(target: "grpc2redis", action = "process_transaction_update",label = ?label, stream = ?stream, maxlen = ?stream_maxlen);
}
_ => {
warn!(target: "grpc2redis", action = "unknown_update_variant",label = ?label, message = "Unknown update variant")
}
grpc_tasks_total_dec(&label, &stream_config.name);
}
}

}));

grpc_tasks_total_dec(&label, &stream_config.name);
current_pipe_index = (current_pipe_index + 1) % pipes.len();
}
}));

current_pipe_index = (current_pipe_index + 1) % pipes.len();
}
Some(UpdateOneof::Ping(_)) => {
let ping = subscribe_tx
.send(SubscribeRequest {
ping: Some(SubscribeRequestPing { id: PING_ID }),
..Default::default()
})
.await;

match ping {
Ok(_) => {
debug!(target: "grpc2redis", action = "send_ping", message = "Ping sent successfully", id = PING_ID)
Some(UpdateOneof::Ping(_)) => {
let ping = subscribe_tx
.send(SubscribeRequest {
ping: Some(SubscribeRequestPing { id: PING_ID }),
..Default::default()
})
.await;

match ping {
Ok(_) => {
debug!(target: "grpc2redis", action = "send_ping", message = "Ping sent successfully", id = PING_ID);
}
Err(err) => {
warn!(target: "grpc2redis", action = "send_ping_failed", message = "Failed to send ping", ?err, id = PING_ID);
}
}
}
Err(err) => {
warn!(target: "grpc2redis", action = "send_ping_failed", message = "Failed to send ping", ?err, id = PING_ID)
Some(UpdateOneof::Pong(pong)) => {
if pong.id == PING_ID {
debug!(target: "grpc2redis", action = "receive_pong", message = "Pong received", id = PING_ID);
} else {
warn!(target: "grpc2redis", action = "receive_unknown_pong", message = "Unknown pong id received", id = pong.id);
}
}
_ => {
warn!(target: "grpc2redis", action = "unknown_update_variant", message = "Unknown update variant");
}
}
}
Some(UpdateOneof::Pong(pong)) => {
if pong.id == PING_ID {
debug!(target: "grpc2redis", action = "receive_pong", message = "Pong received", id = PING_ID);
} else {
warn!(target: "grpc2redis", action = "receive_unknown_pong", message = "Unknown pong id received", id = pong.id);
}
}
_ => {
warn!(target: "grpc2redis", action = "unknown_update_variant", message = "Unknown update variant", ?msg.update_oneof)
break;
}
}

let mut global_shutdown_tx = global_shutdown_tx.lock().await;
if let Some(global_shutdown_tx) = global_shutdown_tx.take() {
let _ = global_shutdown_tx.send(());
}
}
_ = &mut shutdown_rx => {
debug!(target: "grpc2redis", action = "shutdown_signal_received", message = "Shutdown signal received, stopping subscription task", ?label);
Expand Down