Skip to content

Commit

Permalink
Merge #1775
Browse files Browse the repository at this point in the history
1775: Backport fixes for release/2.7 r=tiagolobocastro a=tiagolobocastro

    test(bdd): make nvme controller usage more robust
    
    Caters for when the device is /dev/nvmeX but X not the same as the controller!
    
    Signed-off-by: Tiago Castro <tiagolobocastro@gmail.com>

---

    Merge #1755
    
    1755: Reuse Rebuild IO handles r=tiagolobocastro a=tiagolobocastro
    
        fix(rebuild): reuse rebuild IO handles
    
        Reuses the rebuild IO handles, rather than attempting to allocate
        them per rebuild task.
        The main issue with handle allocation on the fly is that the target
        may have not cleaned up a previous IO qpair connection, and so the
        connect may fail. We started seeing this more on CI because we forgot
        to cherry-pick a commit increasing the retry delay.
        However, after inspecting a bunch of user support bundles I see that
        we still have occasional connect errors. Rather than increasing the
        timeout, we attempt here to reuse the handles, thus avoid the
        problem almost entirely.
    
        Signed-off-by: Tiago Castro <tiagolobocastro@gmail.com>
    
    ---
    
        refactor(rebuild): rebuild completion is not an error
    
        When the rebuild has been complete, if we wait for it this fails because
        the channels are not longer available.
        Instead, simply return the rebuild state, since this is what we want anyway.
    
        Signed-off-by: Tiago Castro <tiagolobocastro@gmail.com>
    
    Co-authored-by: Tiago Castro <tiagolobocastro@gmail.com>

---

    fix: check valid sct and sc combinations for pi error
    
    Signed-off-by: Diwakar Sharma <diwakar.sharma@datacore.com>

---

    fix: use auto-detected sector size for blockdev
    
    This fixes the behaviour where we pass 512 as sector size if the
    disk uri doesn't contain blk_size parameter. This causes pool creation
    failure if the underlying disk has a different sector size e.g. 4096.
    Instead of passing 512, we now pass 0 which lets spdk detect the
    device's sector size and use that value.
    
    Signed-off-by: Diwakar Sharma <diwakar.sharma@datacore.com>


Co-authored-by: Diwakar Sharma <diwakar.sharma@datacore.com>
Co-authored-by: mayastor-bors <mayastor-bors@noreply.github.com>
Co-authored-by: Tiago Castro <tiagolobocastro@gmail.com>
  • Loading branch information
3 people committed Nov 27, 2024
2 parents 40572c9 + ece9dd0 commit 0d8ac9c
Show file tree
Hide file tree
Showing 18 changed files with 321 additions and 79 deletions.
2 changes: 1 addition & 1 deletion .gitmodules
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[submodule "spdk-rs"]
path = spdk-rs
url = https://github.com/openebs/spdk-rs
url = ../spdk-rs.git
branch = release/2.7
[submodule "utils/dependencies"]
path = utils/dependencies
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions io-engine-tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,28 @@ pub fn truncate_file_bytes(path: &str, size: u64) {
assert!(output.status.success());
}

/// Automatically assign a loopdev to path
pub fn setup_loopdev_file(path: &str, sector_size: Option<u64>) -> String {
let log_sec = sector_size.unwrap_or(512);

let output = Command::new("losetup")
.args(["-f", "--show", "-b", &format!("{log_sec}"), path])
.output()
.expect("failed exec losetup");
assert!(output.status.success());
// return the assigned loop device
String::from_utf8(output.stdout).unwrap().trim().to_string()
}

/// Detach the provided loop device.
pub fn detach_loopdev(dev: &str) {
let output = Command::new("losetup")
.args(["-d", dev])
.output()
.expect("failed exec losetup");
assert!(output.status.success());
}

pub fn fscheck(device: &str) {
let output = Command::new("fsck")
.args([device, "-n"])
Expand Down
5 changes: 3 additions & 2 deletions io-engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ libc = "0.2.149"
log = "0.4.20"
md5 = "0.7.0"
merge = "0.1.0"
nix = { version = "0.27.1", default-features = false, features = [ "hostname", "net", "socket", "ioctl" ] }
nix = { version = "0.27.1", default-features = false, features = ["hostname", "net", "socket", "ioctl"] }
once_cell = "1.18.0"
parking_lot = "0.12.1"
pin-utils = "0.1.0"
Expand Down Expand Up @@ -98,9 +98,10 @@ async-process = { version = "1.8.1" }
rstack = { version = "0.3.3" }
tokio-stream = "0.1.14"
rustls = "0.21.12"
either = "1.9.0"

devinfo = { path = "../utils/dependencies/devinfo" }
jsonrpc = { path = "../jsonrpc"}
jsonrpc = { path = "../jsonrpc" }
io-engine-api = { path = "../utils/dependencies/apis/io-engine" }
spdk-rs = { path = "../spdk-rs" }
sysfs = { path = "../sysfs" }
Expand Down
16 changes: 13 additions & 3 deletions io-engine/src/bdev/aio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{
convert::TryFrom,
ffi::CString,
fmt::{Debug, Formatter},
os::unix::fs::FileTypeExt,
};

use async_trait::async_trait;
Expand All @@ -29,7 +30,7 @@ pub(super) struct Aio {

impl Debug for Aio {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "Aio '{}'", self.name)
write!(f, "Aio '{}', 'blk_size: {}'", self.name, self.blk_size)
}
}

Expand All @@ -47,6 +48,10 @@ impl TryFrom<&Url> for Aio {
});
}

let path_is_blockdev = std::fs::metadata(url.path())
.ok()
.map_or(false, |meta| meta.file_type().is_block_device());

let mut parameters: HashMap<String, String> =
url.query_pairs().into_owned().collect();

Expand All @@ -58,9 +63,14 @@ impl TryFrom<&Url> for Aio {
value: value.clone(),
})?
}
None => 512,
None => {
if path_is_blockdev {
0
} else {
512
}
}
};

let uuid = uri::uuid(parameters.remove("uuid")).context(
bdev_api::UuidParamParseFailed {
uri: url.to_string(),
Expand Down
9 changes: 5 additions & 4 deletions io-engine/src/bdev/nexus/nexus_bdev_children.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1152,10 +1152,11 @@ impl<'n> Nexus<'n> {
// Cancel rebuild job for this child, if any.
if let Some(job) = child.rebuild_job() {
debug!("{self:?}: retire: stopping rebuild job...");
let terminated = job.force_fail();
Reactors::master().send_future(async move {
terminated.await.ok();
});
if let either::Either::Left(terminated) = job.force_fail() {
Reactors::master().send_future(async move {
terminated.await.ok();
});
}
}

debug!("{child:?}: retire: enqueuing device '{dev}' to retire");
Expand Down
22 changes: 14 additions & 8 deletions io-engine/src/bdev/nexus/nexus_bdev_rebuild.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,15 +247,18 @@ impl<'n> Nexus<'n> {
async fn terminate_rebuild(&self, child_uri: &str) {
// If a rebuild job is not found that's ok
// as we were just going to remove it anyway.
if let Ok(rj) = self.rebuild_job_mut(child_uri) {
let ch = rj.force_stop();
if let Err(e) = ch.await {
error!(
"Failed to wait on rebuild job for child {child_uri} \
let Ok(rj) = self.rebuild_job_mut(child_uri) else {
return;
};
let either::Either::Left(ch) = rj.force_stop() else {
return;
};
if let Err(e) = ch.await {
error!(
"Failed to wait on rebuild job for child {child_uri} \
to terminate with error {}",
e.verbose()
);
}
e.verbose()
);
}
}

Expand Down Expand Up @@ -355,6 +358,9 @@ impl<'n> Nexus<'n> {

// wait for the jobs to complete terminating
for job in terminated_jobs {
let either::Either::Left(job) = job else {
continue;
};
if let Err(e) = job.await {
error!(
"{:?}: error when waiting for the rebuild job \
Expand Down
11 changes: 6 additions & 5 deletions io-engine/src/bdev/nvmx/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ pub enum NvmeAerInfoNvmCommandSet {

/// Check if the Completion Queue Entry indicates abnormal termination of
/// request due to any of the following conditions:
/// - Any media specific errors that occur in the NVM or data integrity type
/// errors.
/// - An Status Code Type(SCT) of media specific errors that occur in the NVM
/// or data integrity type errors, AND a Status Code(SC) value pertaining to
/// one of the below:
/// - The command was aborted due to an end-to-end guard check failure.
/// - The command was aborted due to an end-to-end application tag check
/// failure.
Expand All @@ -59,9 +60,9 @@ pub(crate) fn nvme_cpl_is_pi_error(cpl: *const spdk_nvme_cpl) -> bool {
}

sct == NvmeStatusCodeType::MediaError as u16
|| sc == NvmeMediaErrorStatusCode::Guard as u16
|| sc == NvmeMediaErrorStatusCode::ApplicationTag as u16
|| sc == NvmeMediaErrorStatusCode::ReferenceTag as u16
&& (sc == NvmeMediaErrorStatusCode::Guard as u16
|| sc == NvmeMediaErrorStatusCode::ApplicationTag as u16
|| sc == NvmeMediaErrorStatusCode::ReferenceTag as u16)
}

#[inline]
Expand Down
19 changes: 17 additions & 2 deletions io-engine/src/bdev/uring.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
use std::{collections::HashMap, convert::TryFrom, ffi::CString};
use std::{
collections::HashMap,
convert::TryFrom,
ffi::CString,
os::unix::fs::FileTypeExt,
};

use async_trait::async_trait;
use futures::channel::oneshot;
Expand Down Expand Up @@ -36,6 +41,10 @@ impl TryFrom<&Url> for Uring {
});
}

let path_is_blockdev = std::fs::metadata(url.path())
.ok()
.map_or(false, |meta| meta.file_type().is_block_device());

let mut parameters: HashMap<String, String> =
url.query_pairs().into_owned().collect();

Expand All @@ -47,7 +56,13 @@ impl TryFrom<&Url> for Uring {
value: value.clone(),
})?
}
None => 512,
None => {
if path_is_blockdev {
0
} else {
512
}
}
};

let uuid = uri::uuid(parameters.remove("uuid")).context(
Expand Down
5 changes: 4 additions & 1 deletion io-engine/src/grpc/v1/snapshot_rebuild.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,10 @@ impl SnapshotRebuildRpc for SnapshotRebuildService {
let Ok(job) = SnapshotRebuildJob::lookup(&args.uuid) else {
return Err(tonic::Status::not_found(""));
};
let rx = job.force_stop().await.ok();
let rx = match job.force_stop() {
either::Either::Left(chan) => chan.await,
either::Either::Right(stopped) => Ok(stopped),
};
info!("Snapshot Rebuild stopped: {rx:?}");
job.destroy();
Ok(())
Expand Down
5 changes: 4 additions & 1 deletion io-engine/src/rebuild/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@ impl WithinRange<u64> for std::ops::Range<u64> {
/// Shutdown all pending snapshot rebuilds.
pub(crate) async fn shutdown_snapshot_rebuilds() {
let jobs = SnapshotRebuildJob::list().into_iter();
for recv in jobs.map(|job| job.force_stop()).collect::<Vec<_>>() {
for recv in jobs
.flat_map(|job| job.force_stop().left())
.collect::<Vec<_>>()
{
recv.await.ok();
}
}
Expand Down
29 changes: 12 additions & 17 deletions io-engine/src/rebuild/rebuild_descriptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,11 @@ pub(super) struct RebuildDescriptor {
/// Pre-opened descriptor for the source block device.
#[allow(clippy::non_send_fields_in_send_ty)]
pub(super) src_descriptor: Box<dyn BlockDeviceDescriptor>,
pub(super) src_handle: Box<dyn BlockDeviceHandle>,
/// Pre-opened descriptor for destination block device.
#[allow(clippy::non_send_fields_in_send_ty)]
pub(super) dst_descriptor: Box<dyn BlockDeviceDescriptor>,
pub(super) dst_handle: Box<dyn BlockDeviceHandle>,
/// Start time of this rebuild.
pub(super) start_time: DateTime<Utc>,
}
Expand Down Expand Up @@ -90,9 +92,8 @@ impl RebuildDescriptor {
});
}

let source_hdl = RebuildDescriptor::io_handle(&*src_descriptor).await?;
let destination_hdl =
RebuildDescriptor::io_handle(&*dst_descriptor).await?;
let src_handle = RebuildDescriptor::io_handle(&*src_descriptor).await?;
let dst_handle = RebuildDescriptor::io_handle(&*dst_descriptor).await?;

let range = match range {
None => {
Expand All @@ -105,8 +106,8 @@ impl RebuildDescriptor {
};

if !Self::validate(
source_hdl.get_device(),
destination_hdl.get_device(),
src_handle.get_device(),
dst_handle.get_device(),
&range,
) {
return Err(RebuildError::InvalidSrcDstRange {});
Expand All @@ -123,7 +124,9 @@ impl RebuildDescriptor {
block_size,
segment_size_blks,
src_descriptor,
src_handle,
dst_descriptor,
dst_handle,
start_time: Utc::now(),
})
}
Expand Down Expand Up @@ -173,18 +176,14 @@ impl RebuildDescriptor {

/// Get a `BlockDeviceHandle` for the source.
#[inline(always)]
pub(super) async fn src_io_handle(
&self,
) -> Result<Box<dyn BlockDeviceHandle>, RebuildError> {
Self::io_handle(&*self.src_descriptor).await
pub(super) fn src_io_handle(&self) -> &dyn BlockDeviceHandle {
self.src_handle.as_ref()
}

/// Get a `BlockDeviceHandle` for the destination.
#[inline(always)]
pub(super) async fn dst_io_handle(
&self,
) -> Result<Box<dyn BlockDeviceHandle>, RebuildError> {
Self::io_handle(&*self.dst_descriptor).await
pub(super) fn dst_io_handle(&self) -> &dyn BlockDeviceHandle {
self.dst_handle.as_ref()
}

/// Get a `BlockDeviceHandle` for the given block device descriptor.
Expand Down Expand Up @@ -231,7 +230,6 @@ impl RebuildDescriptor {
) -> Result<bool, RebuildError> {
match self
.src_io_handle()
.await?
.readv_blocks_async(
iovs,
offset_blk,
Expand Down Expand Up @@ -269,7 +267,6 @@ impl RebuildDescriptor {
iovs: &[IoVec],
) -> Result<(), RebuildError> {
self.dst_io_handle()
.await?
.writev_blocks_async(
iovs,
offset_blk,
Expand All @@ -291,7 +288,6 @@ impl RebuildDescriptor {
) -> Result<(), RebuildError> {
// Read the source again.
self.src_io_handle()
.await?
.readv_blocks_async(
iovs,
offset_blk,
Expand All @@ -306,7 +302,6 @@ impl RebuildDescriptor {

match self
.dst_io_handle()
.await?
.comparev_blocks_async(
iovs,
offset_blk,
Expand Down
17 changes: 12 additions & 5 deletions io-engine/src/rebuild/rebuild_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,17 @@ impl RebuildJob {

/// Forcefully stops the job, overriding any pending client operation
/// returns an async channel which can be used to await for termination.
pub(crate) fn force_stop(&self) -> oneshot::Receiver<RebuildState> {
pub(crate) fn force_stop(
&self,
) -> either::Either<oneshot::Receiver<RebuildState>, RebuildState> {
self.force_terminate(RebuildOperation::Stop)
}

/// Forcefully fails the job, overriding any pending client operation
/// returns an async channel which can be used to await for termination.
pub(crate) fn force_fail(&self) -> oneshot::Receiver<RebuildState> {
pub(crate) fn force_fail(
&self,
) -> either::Either<oneshot::Receiver<RebuildState>, RebuildState> {
self.force_terminate(RebuildOperation::Fail)
}

Expand All @@ -179,10 +183,13 @@ impl RebuildJob {
fn force_terminate(
&self,
op: RebuildOperation,
) -> oneshot::Receiver<RebuildState> {
) -> either::Either<oneshot::Receiver<RebuildState>, RebuildState> {
self.exec_internal_op(op).ok();
self.add_completion_listener()
.unwrap_or_else(|_| oneshot::channel().1)

match self.add_completion_listener() {
Ok(chan) => either::Either::Left(chan),
Err(_) => either::Either::Right(self.state()),
}
}

/// Get the rebuild stats.
Expand Down
Loading

0 comments on commit 0d8ac9c

Please sign in to comment.