Skip to content

Commit a345bac

Browse files
jokemanfiremxpv
authored andcommitted
OpenOptions in blocking mod is a time-consuming operation.
1.Prevent it from blocking a tokio thread. Change sync to async. 2.Add pipe unit test which I found error in Pipe new. Signed-off-by: jokemanfire <hu.dingyang@zte.com.cn>
1 parent 41d2ded commit a345bac

File tree

6 files changed

+332
-39
lines changed

6 files changed

+332
-39
lines changed

crates/runc/src/asynchronous/io.rs

Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
/*
2+
Copyright The containerd Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
use std::{fmt::Debug, io::Result, os::unix::io::AsRawFd, process::Stdio};
18+
19+
use async_trait::async_trait;
20+
use nix::unistd::{Gid, Uid};
21+
use tokio::fs::OpenOptions;
22+
23+
pub use crate::Io;
24+
use crate::{Command, Pipe, PipedIo};
25+
26+
#[derive(Debug, Clone)]
27+
pub struct IOOption {
28+
pub open_stdin: bool,
29+
pub open_stdout: bool,
30+
pub open_stderr: bool,
31+
}
32+
33+
impl Default for IOOption {
34+
fn default() -> Self {
35+
Self {
36+
open_stdin: true,
37+
open_stdout: true,
38+
open_stderr: true,
39+
}
40+
}
41+
}
42+
43+
impl PipedIo {
44+
pub fn new(uid: u32, gid: u32, opts: &IOOption) -> std::io::Result<Self> {
45+
Ok(Self {
46+
stdin: if opts.open_stdin {
47+
Self::create_pipe(uid, gid, true)?
48+
} else {
49+
None
50+
},
51+
stdout: if opts.open_stdout {
52+
Self::create_pipe(uid, gid, true)?
53+
} else {
54+
None
55+
},
56+
stderr: if opts.open_stderr {
57+
Self::create_pipe(uid, gid, true)?
58+
} else {
59+
None
60+
},
61+
})
62+
}
63+
64+
fn create_pipe(uid: u32, gid: u32, stdin: bool) -> std::io::Result<Option<Pipe>> {
65+
let pipe = Pipe::new()?;
66+
let uid = Some(Uid::from_raw(uid));
67+
let gid = Some(Gid::from_raw(gid));
68+
if stdin {
69+
let rd = pipe.rd.try_clone()?;
70+
nix::unistd::fchown(rd.as_raw_fd(), uid, gid)?;
71+
} else {
72+
let wr = pipe.wr.try_clone()?;
73+
nix::unistd::fchown(wr.as_raw_fd(), uid, gid)?;
74+
}
75+
Ok(Some(pipe))
76+
}
77+
}
78+
79+
/// IO driver to direct output/error messages to /dev/null.
80+
///
81+
/// With this Io driver, all methods of [crate::Runc] can't capture the output/error messages.
82+
#[derive(Debug)]
83+
pub struct NullIo {
84+
dev_null: std::sync::Mutex<Option<std::fs::File>>,
85+
}
86+
87+
impl NullIo {
88+
pub fn new() -> std::io::Result<Self> {
89+
let f = std::fs::OpenOptions::new().read(true).open("/dev/null")?;
90+
let dev_null = std::sync::Mutex::new(Some(f));
91+
Ok(Self { dev_null })
92+
}
93+
}
94+
95+
#[async_trait]
96+
impl Io for NullIo {
97+
async fn set(&self, cmd: &mut Command) -> std::io::Result<()> {
98+
if let Some(null) = self.dev_null.lock().unwrap().as_ref() {
99+
cmd.stdout(null.try_clone()?);
100+
cmd.stderr(null.try_clone()?);
101+
}
102+
Ok(())
103+
}
104+
105+
async fn close_after_start(&self) {
106+
let mut m = self.dev_null.lock().unwrap();
107+
let _ = m.take();
108+
}
109+
}
110+
111+
/// Io driver based on Stdio::inherited(), to direct outputs/errors to stdio.
112+
///
113+
/// With this Io driver, all methods of [crate::Runc] can't capture the output/error messages.
114+
#[derive(Debug)]
115+
pub struct InheritedStdIo {}
116+
117+
impl InheritedStdIo {
118+
pub fn new() -> std::io::Result<Self> {
119+
Ok(InheritedStdIo {})
120+
}
121+
}
122+
123+
#[async_trait]
124+
impl Io for InheritedStdIo {
125+
async fn set(&self, cmd: &mut Command) -> std::io::Result<()> {
126+
cmd.stdin(Stdio::null())
127+
.stdout(Stdio::inherit())
128+
.stderr(Stdio::inherit());
129+
Ok(())
130+
}
131+
132+
async fn close_after_start(&self) {}
133+
}
134+
135+
/// Io driver based on Stdio::piped(), to capture outputs/errors from runC.
136+
///
137+
/// With this Io driver, methods of [crate::Runc] may capture the output/error messages.
138+
#[derive(Debug)]
139+
pub struct PipedStdIo {}
140+
141+
impl PipedStdIo {
142+
pub fn new() -> std::io::Result<Self> {
143+
Ok(PipedStdIo {})
144+
}
145+
}
146+
#[async_trait]
147+
impl Io for PipedStdIo {
148+
async fn set(&self, cmd: &mut Command) -> std::io::Result<()> {
149+
cmd.stdin(Stdio::null())
150+
.stdout(Stdio::piped())
151+
.stderr(Stdio::piped());
152+
Ok(())
153+
}
154+
155+
async fn close_after_start(&self) {}
156+
}
157+
158+
/// FIFO for the scenario that set FIFO for command Io.
159+
#[derive(Debug)]
160+
pub struct FIFO {
161+
pub stdin: Option<String>,
162+
pub stdout: Option<String>,
163+
pub stderr: Option<String>,
164+
}
165+
#[async_trait]
166+
impl Io for FIFO {
167+
async fn set(&self, cmd: &mut Command) -> Result<()> {
168+
if let Some(path) = self.stdin.as_ref() {
169+
let stdin = OpenOptions::new()
170+
.read(true)
171+
.custom_flags(libc::O_NONBLOCK)
172+
.open(path)
173+
.await?;
174+
cmd.stdin(stdin.into_std().await);
175+
}
176+
177+
if let Some(path) = self.stdout.as_ref() {
178+
let stdout = OpenOptions::new().write(true).open(path).await?;
179+
cmd.stdout(stdout.into_std().await);
180+
}
181+
182+
if let Some(path) = self.stderr.as_ref() {
183+
let stderr = OpenOptions::new().write(true).open(path).await?;
184+
cmd.stderr(stderr.into_std().await);
185+
}
186+
187+
Ok(())
188+
}
189+
190+
async fn close_after_start(&self) {}
191+
}
192+
193+
#[cfg(test)]
194+
mod tests {
195+
use super::*;
196+
197+
#[cfg(not(target_os = "macos"))]
198+
#[test]
199+
fn test_io_option() {
200+
let opts = IOOption {
201+
open_stdin: false,
202+
open_stdout: false,
203+
open_stderr: false,
204+
};
205+
let io = PipedIo::new(1000, 1000, &opts).unwrap();
206+
207+
assert!(io.stdin().is_none());
208+
assert!(io.stdout().is_none());
209+
assert!(io.stderr().is_none());
210+
}
211+
212+
#[tokio::test]
213+
async fn test_null_io() {
214+
let io = NullIo::new().unwrap();
215+
assert!(io.stdin().is_none());
216+
assert!(io.stdout().is_none());
217+
assert!(io.stderr().is_none());
218+
}
219+
}

crates/runc/src/asynchronous/mod.rs

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,41 +13,36 @@
1313
See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
16-
16+
pub mod io;
1717
mod pipe;
1818
use std::{fmt::Debug, io::Result, os::fd::AsRawFd};
1919

20+
use async_trait::async_trait;
2021
use log::debug;
2122
pub use pipe::Pipe;
2223
use tokio::io::{AsyncRead, AsyncWrite};
2324

2425
use crate::Command;
25-
26+
#[async_trait]
2627
pub trait Io: Debug + Send + Sync {
27-
/// Return write side of stdin
28-
#[cfg(feature = "async")]
2928
fn stdin(&self) -> Option<Box<dyn AsyncWrite + Send + Sync + Unpin>> {
3029
None
3130
}
3231

33-
/// Return read side of stdout
34-
#[cfg(feature = "async")]
3532
fn stdout(&self) -> Option<Box<dyn AsyncRead + Send + Sync + Unpin>> {
3633
None
3734
}
3835

39-
/// Return read side of stderr
40-
#[cfg(feature = "async")]
4136
fn stderr(&self) -> Option<Box<dyn AsyncRead + Send + Sync + Unpin>> {
4237
None
4338
}
4439

4540
/// Set IO for passed command.
4641
/// Read side of stdin, write side of stdout and write side of stderr should be provided to command.
47-
fn set(&self, cmd: &mut Command) -> Result<()>;
42+
async fn set(&self, cmd: &mut Command) -> Result<()>;
4843

4944
/// Only close write side (should be stdout/err "from" runc process)
50-
fn close_after_start(&self);
45+
async fn close_after_start(&self);
5146
}
5247

5348
#[derive(Debug)]
@@ -56,7 +51,7 @@ pub struct PipedIo {
5651
pub stdout: Option<Pipe>,
5752
pub stderr: Option<Pipe>,
5853
}
59-
54+
#[async_trait]
6055
impl Io for PipedIo {
6156
fn stdin(&self) -> Option<Box<dyn AsyncWrite + Send + Sync + Unpin>> {
6257
self.stdin.as_ref().and_then(|pipe| {
@@ -87,7 +82,7 @@ impl Io for PipedIo {
8782

8883
// Note that this internally use [`std::fs::File`]'s `try_clone()`.
8984
// Thus, the files passed to commands will be not closed after command exit.
90-
fn set(&self, cmd: &mut Command) -> std::io::Result<()> {
85+
async fn set(&self, cmd: &mut Command) -> std::io::Result<()> {
9186
if let Some(p) = self.stdin.as_ref() {
9287
let pr = p.rd.try_clone()?;
9388
cmd.stdin(pr);
@@ -106,7 +101,7 @@ impl Io for PipedIo {
106101
Ok(())
107102
}
108103

109-
fn close_after_start(&self) {
104+
async fn close_after_start(&self) {
110105
if let Some(p) = self.stdout.as_ref() {
111106
nix::unistd::close(p.wr.as_raw_fd()).unwrap_or_else(|e| debug!("close stdout: {}", e));
112107
}

crates/runc/src/asynchronous/pipe.rs

Lines changed: 74 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,80 @@ pub struct Pipe {
3131
impl Pipe {
3232
pub fn new() -> std::io::Result<Self> {
3333
let (tx, rx) = pipe::pipe()?;
34-
let rd = tx.into_blocking_fd()?;
35-
let wr = rx.into_blocking_fd()?;
34+
let rd = rx.into_blocking_fd()?;
35+
let wr = tx.into_blocking_fd()?;
3636
Ok(Self { rd, wr })
3737
}
3838
}
39+
40+
#[cfg(test)]
41+
mod tests {
42+
use std::os::fd::IntoRawFd;
43+
44+
use tokio::io::{AsyncReadExt, AsyncWriteExt};
45+
46+
use super::*;
47+
48+
#[tokio::test]
49+
async fn test_pipe_creation() {
50+
let pipe = Pipe::new().expect("Failed to create pipe");
51+
assert!(
52+
pipe.rd.into_raw_fd() >= 0,
53+
"Read file descriptor is invalid"
54+
);
55+
assert!(
56+
pipe.wr.into_raw_fd() >= 0,
57+
"Write file descriptor is invalid"
58+
);
59+
}
60+
61+
#[tokio::test]
62+
async fn test_pipe_write_read() {
63+
let pipe = Pipe::new().expect("Failed to create pipe");
64+
let mut read_end = pipe::Receiver::from_owned_fd(pipe.rd).unwrap();
65+
let mut write_end = pipe::Sender::from_owned_fd(pipe.wr).unwrap();
66+
let write_data = b"hello";
67+
68+
write_end
69+
.write_all(write_data)
70+
.await
71+
.expect("Failed to write to pipe");
72+
73+
let mut read_data = vec![0; write_data.len()];
74+
read_end
75+
.read_exact(&mut read_data)
76+
.await
77+
.expect("Failed to read from pipe");
78+
79+
assert_eq!(
80+
read_data, write_data,
81+
"Data read from pipe does not match data written"
82+
);
83+
}
84+
85+
#[tokio::test]
86+
async fn test_pipe_async_write_read() {
87+
let pipe = Pipe::new().expect("Failed to create pipe");
88+
let mut read_end = pipe::Receiver::from_owned_fd(pipe.rd).unwrap();
89+
let mut write_end = pipe::Sender::from_owned_fd(pipe.wr).unwrap();
90+
91+
let write_data = b"hello";
92+
tokio::spawn(async move {
93+
write_end
94+
.write_all(write_data)
95+
.await
96+
.expect("Failed to write to pipe");
97+
});
98+
99+
let mut read_data = vec![0; write_data.len()];
100+
read_end
101+
.read_exact(&mut read_data)
102+
.await
103+
.expect("Failed to read from pipe");
104+
105+
assert_eq!(
106+
&read_data, write_data,
107+
"Data read from pipe does not match data written"
108+
);
109+
}
110+
}

0 commit comments

Comments
 (0)