Skip to content

Commit

Permalink
Merge #1788
Browse files Browse the repository at this point in the history
1788: fix(self-shutdown): abort frozen ios when unsharing shutdown nexus r=tiagolobocastro a=tiagolobocastro

These frozen IOs prevent the nexus from shutting down. We don't have any hooks today to do this whilsts the target is stopping so we add a simple loop which tries a number of times. Ideally we should get some sort of callback to trigger this.

Co-authored-by: Tiago Castro <tiagolobocastro@gmail.com>
  • Loading branch information
mayastor-bors and tiagolobocastro committed Dec 19, 2024
2 parents 88d1cc3 + e77a824 commit 02e79b6
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 17 deletions.
12 changes: 12 additions & 0 deletions io-engine/src/bdev/nexus/nexus_bdev.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1114,6 +1114,18 @@ impl<'n> Nexus<'n> {
Ok(())
}

/// Aborts all frozen IOs of a shutdown Nexus.
/// # Warning
/// These aborts may translate into I/O errors for the initiator.
pub async fn abort_shutdown_frozen_ios(&self) {
if self.status() == NexusStatus::Shutdown {
self.traverse_io_channels_async((), |channel, _| {
channel.abort_frozen();
})
.await;
}
}

/// Suspend any incoming IO to the bdev pausing the controller allows us to
/// handle internal events and which is a protocol feature.
/// In case concurrent pause requests take place, the other callers
Expand Down
3 changes: 2 additions & 1 deletion io-engine/src/bdev/nexus/nexus_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ impl<'n> Debug for NexusChannel<'n> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{io} chan '{nex}' core:{core}({cur}) [R:{r} W:{w} D:{d} L:{l} C:{c}]",
"{io} chan '{nex}' core:{core}({cur}) [R:{r} W:{w} D:{d} L:{l} C:{c} F:{f}]",
io = if self.is_io_chan { "I/O" } else { "Aux" },
nex = self.nexus.nexus_name(),
core = self.core,
Expand All @@ -42,6 +42,7 @@ impl<'n> Debug for NexusChannel<'n> {
d = self.detached.len(),
l = self.io_logs.len(),
c = self.nexus.child_count(),
f = self.frozen_ios.len()
)
}
}
Expand Down
39 changes: 33 additions & 6 deletions io-engine/src/bdev/nexus/nexus_share.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use crate::bdev::PtplFileOps;
use super::{nexus_err, nexus_lookup, Error, NbdDisk, Nexus, NexusTarget};
use crate::{
bdev::PtplFileOps,
core::{NvmfShareProps, Protocol, PtplProps, Reactors, Share, UpdateProps},
sleep::mayastor_sleep,
};
use async_trait::async_trait;
use futures::{channel::oneshot, future::FusedFuture};
use snafu::ResultExt;
use std::pin::Pin;

use super::{nexus_err, Error, NbdDisk, Nexus, NexusTarget};

use crate::core::{NvmfShareProps, Protocol, PtplProps, Share, UpdateProps};
use std::{pin::Pin, time::Duration};

///
/// The sharing of the nexus is different compared to regular bdevs
Expand Down Expand Up @@ -78,6 +80,31 @@ impl<'n> Share for Nexus<'n> {
async fn unshare(mut self: Pin<&mut Self>) -> Result<(), Self::Error> {
info!("{:?}: unsharing nexus bdev...", self);

// TODO: we should not allow new initiator connections for a shutdown
// nexus!

// TODO: we may want to disable the freeze at this point instead,
// allowing new I/Os to fail "normally"
let name = self.name.clone();
let (_s, r) = oneshot::channel::<()>();
Reactors::master().send_future(async move {
for _ in 0 ..= 10 {
mayastor_sleep(Duration::from_secs(2)).await.ok();
if r.is_terminated() {
// This means the unshare is complete, so nothing to do here
return;
}
// If we're not unshared, then abort any I/Os that may have
// reached
if let Some(nexus) = nexus_lookup(&name) {
nexus.abort_shutdown_frozen_ios().await;
}
}
});

// Aborts frozen I/Os a priori
self.abort_shutdown_frozen_ios().await;

let name = self.name.clone();
self.as_mut().pin_bdev_mut().unshare().await.context(
nexus_err::UnshareNexus {
Expand Down
82 changes: 74 additions & 8 deletions io-engine/tests/persistence.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::common::fio_run_verify;
use crate::common::{dd_random_file, fio_run_verify};
use common::compose::{
rpc::v0::{
mayastor::{
Expand Down Expand Up @@ -347,11 +347,80 @@ async fn persistent_store_connection() {
assert!(get_nexus(ms1, nexus_uuid).await.is_some());
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn self_shutdown_destroy() {
let test = start_infrastructure_("self_shutdown_destroy", Some("1")).await;
let grpc = GrpcConnect::new(&test);
let ms1 = &mut grpc.grpc_handle("ms1").await.unwrap();
let ms2 = &mut grpc.grpc_handle("ms2").await.unwrap();
let ms3 = &mut grpc.grpc_handle("ms3").await.unwrap();

// Create bdevs and share over nvmf.
let child1 = create_and_share_bdevs(ms2, CHILD1_UUID).await;
let child2 = create_and_share_bdevs(ms3, CHILD2_UUID).await;

// Create and publish a nexus.
let nexus_uuid = "8272e9d3-3738-4e33-b8c3-769d8eed5771";
create_nexus(ms1, nexus_uuid, vec![child1.clone(), child2.clone()]).await;
let nexus_uri = publish_nexus(ms1, nexus_uuid).await;

// Create and connect NVMF target.
let target = libnvme_rs::NvmeTarget::try_from(nexus_uri.clone())
.unwrap()
.with_reconnect_delay(Some(1))
.ctrl_loss_timeout(Some(1))
.with_rand_hostnqn(true);
target.connect().unwrap();

// simulate node with child and etcd going down
test.stop("etcd").await.unwrap();
test.stop("ms3").await.unwrap();

// allow pstor to timeout and self shutdown
// todo: use wait loop
tokio::time::sleep(Duration::from_secs(2)).await;

let nexus = get_nexus(ms1, nexus_uuid).await.unwrap();
assert_eq!(nexus.state, NexusState::NexusShutdown as i32);

let devices = target.block_devices(2).unwrap();
let fio_hdl = tokio::spawn(async move {
dd_random_file(&devices[0].to_string(), 4096, 1)
});

test.start("etcd").await.unwrap();

ms1.mayastor
.destroy_nexus(DestroyNexusRequest {
uuid: nexus_uuid.to_string(),
})
.await
.expect("Failed to destroy nexus");

// Disconnect NVMF target
target.disconnect().unwrap();

fio_hdl.await.unwrap();
}

/// Start the containers for the tests.
async fn start_infrastructure(test_name: &str) -> ComposeTest {
start_infrastructure_(test_name, None).await
}

/// Start the containers for the tests.
async fn start_infrastructure_(
test_name: &str,
ps_retries: Option<&str>,
) -> ComposeTest {
common::composer_init();

let etcd_endpoint = format!("http://etcd.{test_name}:2379");
let mut args = vec!["-p", &etcd_endpoint];
if let Some(retries) = ps_retries {
args.extend(["--ps-retries", retries]);
}

Builder::new()
.name(test_name)
.add_container_spec(
Expand All @@ -371,20 +440,17 @@ async fn start_infrastructure(test_name: &str) -> ComposeTest {
)
.add_container_bin(
"ms1",
Binary::from_dbg("io-engine").with_args(vec!["-p", &etcd_endpoint]),
Binary::from_dbg("io-engine").with_args(args.clone()),
)
.add_container_bin(
"ms2",
Binary::from_dbg("io-engine").with_args(vec!["-p", &etcd_endpoint]),
Binary::from_dbg("io-engine").with_args(args.clone()),
)
.add_container_bin(
"ms3",
Binary::from_dbg("io-engine").with_args(vec!["-p", &etcd_endpoint]),
)
.add_container_bin(
"ms4",
Binary::from_dbg("io-engine").with_args(vec!["-p", &etcd_endpoint]),
Binary::from_dbg("io-engine").with_args(args.clone()),
)
.add_container_bin("ms4", Binary::from_dbg("io-engine").with_args(args))
.build()
.await
.unwrap()
Expand Down
23 changes: 21 additions & 2 deletions libnvme-rs/src/nvme_uri.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ pub struct NvmeTarget {
trtype: NvmeTransportType,
/// Auto-Generate random HostNqn.
hostnqn_autogen: bool,
/// The Reconnect Delay.
reconnect_delay: Option<u8>,
/// The Controller Loss Timeout.
ctrl_loss_timeout: Option<u32>,
}

impl TryFrom<String> for NvmeTarget {
Expand Down Expand Up @@ -117,6 +121,8 @@ impl TryFrom<&str> for NvmeTarget {
trsvcid: url.port().unwrap_or(4420),
subsysnqn: subnqn,
hostnqn_autogen: false,
reconnect_delay: None,
ctrl_loss_timeout: None,
})
}
}
Expand All @@ -128,6 +134,16 @@ impl NvmeTarget {
self.hostnqn_autogen = random;
self
}
/// With the reconnect delay.
pub fn with_reconnect_delay(mut self, delay: Option<u8>) -> Self {
self.reconnect_delay = delay;
self
}
/// With the ctrl loss timeout.
pub fn ctrl_loss_timeout(mut self, timeout: Option<u32>) -> Self {
self.ctrl_loss_timeout = timeout;
self
}
/// Connect to NVMe target
/// Returns Ok on successful connect
pub fn connect(&self) -> Result<(), NvmeError> {
Expand Down Expand Up @@ -184,8 +200,11 @@ impl NvmeTarget {
host_iface,
queue_size: 0,
nr_io_queues: 0,
reconnect_delay: 0,
ctrl_loss_tmo: crate::NVMF_DEF_CTRL_LOSS_TMO as i32,
reconnect_delay: self.reconnect_delay.unwrap_or(0) as i32,
ctrl_loss_tmo: self
.ctrl_loss_timeout
.unwrap_or(crate::NVMF_DEF_CTRL_LOSS_TMO)
as i32,
fast_io_fail_tmo: 0,
keep_alive_tmo: 0,
nr_write_queues: 0,
Expand Down

0 comments on commit 02e79b6

Please sign in to comment.