Skip to content

Commit d60d65c

Browse files
committed
chore(test): don't use spinloop when testing
GitHub Actions doesn't allocate enough resources to run spin loops. Signed-off-by: Wataru Ishida <wataru.ishid@gmail.com>
1 parent 439a98d commit d60d65c

File tree

4 files changed

+82
-17
lines changed

4 files changed

+82
-17
lines changed

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ FROM optcast AS unittest
4848

4949
ENV RUST_LOG=info
5050
ENV NCCL_SOCKET_IFNAME=lo
51-
RUN cd reduction_server && cargo test --all -- --nocapture --test-threads=1
51+
RUN cd reduction_server && RUSTFLAGS="--cfg no_spinloop" cargo test --all -- --nocapture --test-threads=1
5252

5353
FROM nvcr.io/nvidia/cuda:12.3.1-devel-ubuntu22.04 AS final
5454

reduction_server/src/client.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
* See LICENSE for license information
55
*/
66

7+
use std::hint;
78
use std::io::{Read, Write};
89
use std::net::{TcpListener, TcpStream};
910
use std::sync::Arc;
@@ -60,6 +61,12 @@ fn do_client<T: Float>(args: &Args, comms: Vec<(Comm, Comm)>) {
6061
let start = std::time::Instant::now();
6162

6263
loop {
64+
if cfg!(no_spinloop) {
65+
std::thread::sleep(NO_SPINLOOP_INTERVAL);
66+
} else {
67+
hint::spin_loop();
68+
}
69+
6370
for (i, req, sbuf, rbuf, mhs) in reqs.iter_mut() {
6471
if req.is_none() && reqed < args.try_count {
6572
*req = Some(
@@ -72,6 +79,12 @@ fn do_client<T: Float>(args: &Args, comms: Vec<(Comm, Comm)>) {
7279
let mut rrequest: Option<Request> = None;
7380

7481
loop {
82+
if cfg!(no_spinloop) {
83+
std::thread::sleep(NO_SPINLOOP_INTERVAL);
84+
} else {
85+
hint::spin_loop();
86+
}
87+
7588
if srequest.is_none() {
7689
srequest = nccl_net::isend(
7790
scomm,

reduction_server/src/server.rs

Lines changed: 64 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use std::sync::atomic::AtomicUsize;
1212
use std::sync::Arc;
1313

1414
use half::{bf16, f16};
15-
use log::{info, trace, warn, error};
15+
use log::{error, info, trace, warn};
1616

1717
use crate::reduce::{Reduce, WorkingMemory};
1818
use crate::utils::*;
@@ -103,7 +103,11 @@ fn reduce_loop<T: Float>(
103103
trace!("rank({})/job({}) reduce wait recv", i, job_idx);
104104

105105
loop {
106-
hint::spin_loop();
106+
if cfg!(no_spinloop) {
107+
std::thread::sleep(NO_SPINLOOP_INTERVAL);
108+
} else {
109+
hint::spin_loop();
110+
}
107111
let send_ready = send_ready.load(std::sync::atomic::Ordering::Relaxed);
108112
let send_expect = (1 << args.send_threads) - 1;
109113
let recv_ready = recv_ready.load(std::sync::atomic::Ordering::Relaxed);
@@ -213,7 +217,11 @@ fn send_loop<T: Float>(
213217
for (idx, (readys, send)) in sends.iter().enumerate().cycle() {
214218
for ready in readys.iter() {
215219
loop {
216-
hint::spin_loop();
220+
if cfg!(no_spinloop) {
221+
std::thread::sleep(NO_SPINLOOP_INTERVAL);
222+
} else {
223+
hint::spin_loop();
224+
}
217225
let ready = ready.load(std::sync::atomic::Ordering::Relaxed);
218226
// trace!(
219227
// "[send] rank({})/job({}) send ready: 0b{:016b}",
@@ -235,7 +243,11 @@ fn send_loop<T: Float>(
235243

236244
let mut reqs = vec_of_none(send.len());
237245
loop {
238-
hint::spin_loop();
246+
if cfg!(no_spinloop) {
247+
std::thread::sleep(NO_SPINLOOP_INTERVAL);
248+
} else {
249+
hint::spin_loop();
250+
}
239251
if rank.load(std::sync::atomic::Ordering::Relaxed) != nrank {
240252
warn!("rank != nrank");
241253
warn!("send thread({}) exit.", i);
@@ -260,7 +272,12 @@ fn send_loop<T: Float>(
260272
let start = std::time::Instant::now();
261273

262274
loop {
263-
hint::spin_loop();
275+
if cfg!(no_spinloop) {
276+
std::thread::sleep(NO_SPINLOOP_INTERVAL);
277+
} else {
278+
hint::spin_loop();
279+
}
280+
264281
if rank.load(std::sync::atomic::Ordering::Relaxed) != nrank {
265282
warn!("rank != nrank");
266283
warn!("send thread({}) exit.", i);
@@ -360,7 +377,11 @@ fn recv_loop<T: Float>(
360377
for (job_idx, (readys, recv)) in recvs.iter_mut().enumerate() {
361378
for ready in readys.iter() {
362379
loop {
363-
hint::spin_loop();
380+
if cfg!(no_spinloop) {
381+
std::thread::sleep(NO_SPINLOOP_INTERVAL);
382+
} else {
383+
hint::spin_loop();
384+
}
364385
let ready = ready.load(std::sync::atomic::Ordering::Relaxed);
365386
// trace!(
366387
// "[recv] rank({})/job({}) recv ready: 0b{:016b}",
@@ -382,7 +403,11 @@ fn recv_loop<T: Float>(
382403

383404
let mut reqs = vec_of_none(recv.len());
384405
loop {
385-
hint::spin_loop();
406+
if cfg!(no_spinloop) {
407+
std::thread::sleep(NO_SPINLOOP_INTERVAL);
408+
} else {
409+
hint::spin_loop();
410+
}
386411
if rank.load(std::sync::atomic::Ordering::Relaxed) != nrank {
387412
warn!("rank != nrank");
388413
warn!("recv thread({}) exit.", i);
@@ -408,7 +433,12 @@ fn recv_loop<T: Float>(
408433
let start = std::time::Instant::now();
409434

410435
loop {
411-
hint::spin_loop();
436+
if cfg!(no_spinloop) {
437+
std::thread::sleep(NO_SPINLOOP_INTERVAL);
438+
} else {
439+
hint::spin_loop();
440+
}
441+
412442
if rank.load(std::sync::atomic::Ordering::Relaxed) != nrank {
413443
warn!("rank != nrank");
414444
warn!("recv thread({}) exit.", i);
@@ -522,7 +552,12 @@ fn upstream_loop<T: Float>(
522552
for (idx, (send_ready, reduce_readys, buf)) in jobs.iter_mut().enumerate() {
523553
for reduce_ready in reduce_readys.iter() {
524554
loop {
525-
hint::spin_loop();
555+
if cfg!(no_spinloop) {
556+
std::thread::sleep(NO_SPINLOOP_INTERVAL);
557+
} else {
558+
hint::spin_loop();
559+
}
560+
526561
let reduce_ready = reduce_ready.load(std::sync::atomic::Ordering::Relaxed);
527562
if reduce_ready == 0 {
528563
break;
@@ -536,7 +571,11 @@ fn upstream_loop<T: Float>(
536571
}
537572

538573
loop {
539-
hint::spin_loop();
574+
if cfg!(no_spinloop) {
575+
std::thread::sleep(NO_SPINLOOP_INTERVAL);
576+
} else {
577+
hint::spin_loop();
578+
}
540579
let send_ready = send_ready.load(std::sync::atomic::Ordering::Relaxed);
541580
let send_expect = (1 << args.send_threads) - 1;
542581
if send_ready == send_expect {
@@ -554,15 +593,20 @@ fn upstream_loop<T: Float>(
554593
let mut rrequest: Option<Request> = None;
555594

556595
loop {
596+
if cfg!(no_spinloop) {
597+
std::thread::sleep(NO_SPINLOOP_INTERVAL);
598+
} else {
599+
hint::spin_loop();
600+
}
601+
557602
if srequest.is_none() {
558603
srequest = nccl_net::isend(&scomm, send_mh, buf.lock().as_ref(), tag).unwrap();
559604
if srequest.is_some() {
560605
trace!("upstream send : idx: {} start", idx);
561606
}
562607
}
563608
if rrequest.is_none() {
564-
rrequest =
565-
nccl_net::irecv(&rcomm, recv_mh, buf.lock().as_mut(), tag).unwrap();
609+
rrequest = nccl_net::irecv(&rcomm, recv_mh, buf.lock().as_mut(), tag).unwrap();
566610
if srequest.is_some() {
567611
trace!("upstream recv : idx: {} start", idx);
568612
}
@@ -573,6 +617,12 @@ fn upstream_loop<T: Float>(
573617
}
574618

575619
loop {
620+
if cfg!(no_spinloop) {
621+
std::thread::sleep(NO_SPINLOOP_INTERVAL);
622+
} else {
623+
hint::spin_loop();
624+
}
625+
576626
if srequest.is_some() {
577627
match nccl_net::test(&srequest.as_ref().unwrap()) {
578628
Ok((send_done, _)) => {
@@ -583,7 +633,7 @@ fn upstream_loop<T: Float>(
583633
}
584634
Err(e) => {
585635
error!("upstream send : idx: {} error: {:?}", idx, e);
586-
return
636+
return;
587637
}
588638
}
589639
}
@@ -597,7 +647,7 @@ fn upstream_loop<T: Float>(
597647
}
598648
Err(e) => {
599649
error!("upstream recv : idx: {} error: {:?}", idx, e);
600-
return
650+
return;
601651
}
602652
}
603653
}

reduction_server/src/utils.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,12 @@ use std::fmt::Debug;
88
use std::time::Duration;
99

1010
use clap::{Parser, ValueEnum};
11-
use half::{f16, bf16};
11+
use half::{bf16, f16};
1212
use log::info;
1313
use num_traits::FromPrimitive;
1414

15+
/// Sleep duration (10 ms) that the polling loops use in place of
/// `hint::spin_loop()` when the crate is compiled with `--cfg no_spinloop`.
pub(crate) const NO_SPINLOOP_INTERVAL: std::time::Duration = std::time::Duration::from_millis(10);
16+
1517
pub(crate) fn transpose<T>(v: Vec<Vec<T>>) -> Vec<Vec<T>> {
1618
assert!(!v.is_empty());
1719
let len = v[0].len();
@@ -140,8 +142,8 @@ pub(crate) fn vec_of_none<T>(n: usize) -> Vec<Option<T>> {
140142

141143
#[cfg(test)]
142144
pub mod tests {
143-
use std::sync::Once;
144145
use crate::nccl_net;
146+
use std::sync::Once;
145147

146148
static INIT: Once = Once::new();
147149

0 commit comments

Comments
 (0)