Skip to content

Commit

Permalink
fix(self-shutdown): abort frozen ios when unsharing shutdown nexus
Browse files Browse the repository at this point in the history
These frozen IOs prevent the nexus from shutting down.
We don't have any hooks today to do this whilsts the target is stopping
so we add a simple loop which tries a number of times.
Ideally we should get some sort of callback to trigger this.

Signed-off-by: Tiago Castro <tiagolobocastro@gmail.com>
  • Loading branch information
tiagolobocastro committed Dec 18, 2024
1 parent 88d1cc3 commit e77a824
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 17 deletions.
12 changes: 12 additions & 0 deletions io-engine/src/bdev/nexus/nexus_bdev.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1114,6 +1114,18 @@ impl<'n> Nexus<'n> {
Ok(())
}

/// Aborts all frozen IOs of a shutdown Nexus.
/// # Warning
/// These aborts may translate into I/O errors for the initiator.
pub async fn abort_shutdown_frozen_ios(&self) {
if self.status() == NexusStatus::Shutdown {
self.traverse_io_channels_async((), |channel, _| {
channel.abort_frozen();
})
.await;
}
}

/// Suspend any incoming IO to the bdev pausing the controller allows us to
/// handle internal events and which is a protocol feature.
/// In case concurrent pause requests take place, the other callers
Expand Down
3 changes: 2 additions & 1 deletion io-engine/src/bdev/nexus/nexus_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ impl<'n> Debug for NexusChannel<'n> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{io} chan '{nex}' core:{core}({cur}) [R:{r} W:{w} D:{d} L:{l} C:{c}]",
"{io} chan '{nex}' core:{core}({cur}) [R:{r} W:{w} D:{d} L:{l} C:{c} F:{f}]",
io = if self.is_io_chan { "I/O" } else { "Aux" },
nex = self.nexus.nexus_name(),
core = self.core,
Expand All @@ -42,6 +42,7 @@ impl<'n> Debug for NexusChannel<'n> {
d = self.detached.len(),
l = self.io_logs.len(),
c = self.nexus.child_count(),
f = self.frozen_ios.len()
)
}
}
Expand Down
39 changes: 33 additions & 6 deletions io-engine/src/bdev/nexus/nexus_share.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use crate::bdev::PtplFileOps;
use super::{nexus_err, nexus_lookup, Error, NbdDisk, Nexus, NexusTarget};
use crate::{
bdev::PtplFileOps,
core::{NvmfShareProps, Protocol, PtplProps, Reactors, Share, UpdateProps},
sleep::mayastor_sleep,
};
use async_trait::async_trait;
use futures::{channel::oneshot, future::FusedFuture};
use snafu::ResultExt;
use std::pin::Pin;

use super::{nexus_err, Error, NbdDisk, Nexus, NexusTarget};

use crate::core::{NvmfShareProps, Protocol, PtplProps, Share, UpdateProps};
use std::{pin::Pin, time::Duration};

///
/// The sharing of the nexus is different compared to regular bdevs
Expand Down Expand Up @@ -78,6 +80,31 @@ impl<'n> Share for Nexus<'n> {
async fn unshare(mut self: Pin<&mut Self>) -> Result<(), Self::Error> {
info!("{:?}: unsharing nexus bdev...", self);

// TODO: we should not allow new initiator connections for a shutdown
// nexus!

// TODO: we may want to disable the freeze at this point instead,
// allowing new I/Os to fail "normally"
let name = self.name.clone();
let (_s, r) = oneshot::channel::<()>();
Reactors::master().send_future(async move {
for _ in 0 ..= 10 {
mayastor_sleep(Duration::from_secs(2)).await.ok();
if r.is_terminated() {
// This means the unshare is complete, so nothing to do here
return;
}
// If we're not unshared, then abort any I/Os that may have
// reached
if let Some(nexus) = nexus_lookup(&name) {
nexus.abort_shutdown_frozen_ios().await;
}
}
});

// Aborts frozen I/Os a priori
self.abort_shutdown_frozen_ios().await;

let name = self.name.clone();
self.as_mut().pin_bdev_mut().unshare().await.context(
nexus_err::UnshareNexus {
Expand Down
82 changes: 74 additions & 8 deletions io-engine/tests/persistence.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::common::fio_run_verify;
use crate::common::{dd_random_file, fio_run_verify};
use common::compose::{
rpc::v0::{
mayastor::{
Expand Down Expand Up @@ -347,11 +347,80 @@ async fn persistent_store_connection() {
assert!(get_nexus(ms1, nexus_uuid).await.is_some());
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn self_shutdown_destroy() {
let test = start_infrastructure_("self_shutdown_destroy", Some("1")).await;
let grpc = GrpcConnect::new(&test);
let ms1 = &mut grpc.grpc_handle("ms1").await.unwrap();
let ms2 = &mut grpc.grpc_handle("ms2").await.unwrap();
let ms3 = &mut grpc.grpc_handle("ms3").await.unwrap();

// Create bdevs and share over nvmf.
let child1 = create_and_share_bdevs(ms2, CHILD1_UUID).await;
let child2 = create_and_share_bdevs(ms3, CHILD2_UUID).await;

// Create and publish a nexus.
let nexus_uuid = "8272e9d3-3738-4e33-b8c3-769d8eed5771";
create_nexus(ms1, nexus_uuid, vec![child1.clone(), child2.clone()]).await;
let nexus_uri = publish_nexus(ms1, nexus_uuid).await;

// Create and connect NVMF target.
let target = libnvme_rs::NvmeTarget::try_from(nexus_uri.clone())
.unwrap()
.with_reconnect_delay(Some(1))
.ctrl_loss_timeout(Some(1))
.with_rand_hostnqn(true);
target.connect().unwrap();

// simulate node with child and etcd going down
test.stop("etcd").await.unwrap();
test.stop("ms3").await.unwrap();

// allow pstor to timeout and self shutdown
// todo: use wait loop
tokio::time::sleep(Duration::from_secs(2)).await;

let nexus = get_nexus(ms1, nexus_uuid).await.unwrap();
assert_eq!(nexus.state, NexusState::NexusShutdown as i32);

let devices = target.block_devices(2).unwrap();
let fio_hdl = tokio::spawn(async move {
dd_random_file(&devices[0].to_string(), 4096, 1)
});

test.start("etcd").await.unwrap();

ms1.mayastor
.destroy_nexus(DestroyNexusRequest {
uuid: nexus_uuid.to_string(),
})
.await
.expect("Failed to destroy nexus");

// Disconnect NVMF target
target.disconnect().unwrap();

fio_hdl.await.unwrap();
}

/// Start the containers for the tests.
async fn start_infrastructure(test_name: &str) -> ComposeTest {
start_infrastructure_(test_name, None).await
}

/// Start the containers for the tests.
async fn start_infrastructure_(
test_name: &str,
ps_retries: Option<&str>,
) -> ComposeTest {
common::composer_init();

let etcd_endpoint = format!("http://etcd.{test_name}:2379");
let mut args = vec!["-p", &etcd_endpoint];
if let Some(retries) = ps_retries {
args.extend(["--ps-retries", retries]);
}

Builder::new()
.name(test_name)
.add_container_spec(
Expand All @@ -371,20 +440,17 @@ async fn start_infrastructure(test_name: &str) -> ComposeTest {
)
.add_container_bin(
"ms1",
Binary::from_dbg("io-engine").with_args(vec!["-p", &etcd_endpoint]),
Binary::from_dbg("io-engine").with_args(args.clone()),
)
.add_container_bin(
"ms2",
Binary::from_dbg("io-engine").with_args(vec!["-p", &etcd_endpoint]),
Binary::from_dbg("io-engine").with_args(args.clone()),
)
.add_container_bin(
"ms3",
Binary::from_dbg("io-engine").with_args(vec!["-p", &etcd_endpoint]),
)
.add_container_bin(
"ms4",
Binary::from_dbg("io-engine").with_args(vec!["-p", &etcd_endpoint]),
Binary::from_dbg("io-engine").with_args(args.clone()),
)
.add_container_bin("ms4", Binary::from_dbg("io-engine").with_args(args))
.build()
.await
.unwrap()
Expand Down
23 changes: 21 additions & 2 deletions libnvme-rs/src/nvme_uri.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ pub struct NvmeTarget {
trtype: NvmeTransportType,
/// Auto-Generate random HostNqn.
hostnqn_autogen: bool,
/// The Reconnect Delay.
reconnect_delay: Option<u8>,
/// The Controller Loss Timeout.
ctrl_loss_timeout: Option<u32>,
}

impl TryFrom<String> for NvmeTarget {
Expand Down Expand Up @@ -117,6 +121,8 @@ impl TryFrom<&str> for NvmeTarget {
trsvcid: url.port().unwrap_or(4420),
subsysnqn: subnqn,
hostnqn_autogen: false,
reconnect_delay: None,
ctrl_loss_timeout: None,
})
}
}
Expand All @@ -128,6 +134,16 @@ impl NvmeTarget {
self.hostnqn_autogen = random;
self
}
/// With the reconnect delay.
pub fn with_reconnect_delay(mut self, delay: Option<u8>) -> Self {
self.reconnect_delay = delay;
self
}
/// With the ctrl loss timeout.
pub fn ctrl_loss_timeout(mut self, timeout: Option<u32>) -> Self {
self.ctrl_loss_timeout = timeout;
self
}
/// Connect to NVMe target
/// Returns Ok on successful connect
pub fn connect(&self) -> Result<(), NvmeError> {
Expand Down Expand Up @@ -184,8 +200,11 @@ impl NvmeTarget {
host_iface,
queue_size: 0,
nr_io_queues: 0,
reconnect_delay: 0,
ctrl_loss_tmo: crate::NVMF_DEF_CTRL_LOSS_TMO as i32,
reconnect_delay: self.reconnect_delay.unwrap_or(0) as i32,
ctrl_loss_tmo: self
.ctrl_loss_timeout
.unwrap_or(crate::NVMF_DEF_CTRL_LOSS_TMO)
as i32,
fast_io_fail_tmo: 0,
keep_alive_tmo: 0,
nr_write_queues: 0,
Expand Down

0 comments on commit e77a824

Please sign in to comment.