Skip to content

Commit acb2971

Browse files
committed
chore add logging to query queue test
1 parent 613a23a commit acb2971

File tree

1 file changed

+79
-55
lines changed

1 file changed

+79
-55
lines changed

src/query/service/tests/it/sessions/queue_mgr.rs

Lines changed: 79 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,10 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::BTreeMap;
1516
use std::collections::HashMap;
1617
use std::sync::Arc;
18+
use std::sync::Once;
1719
use std::time::Duration;
1820
use std::time::Instant;
1921
use std::time::SystemTime;
@@ -34,6 +36,8 @@ use databend_common_grpc::RpcClientConf;
3436
use databend_common_meta_store::MetaStore;
3537
use databend_common_meta_store::MetaStoreProvider;
3638
use databend_common_sql::Planner;
39+
use databend_common_tracing::init_logging;
40+
use databend_common_tracing::Config;
3741
use databend_common_version::BUILD_INFO;
3842
use databend_query::interpreters::InterpreterFactory;
3943
use databend_query::sessions::QueryEntry;
@@ -220,72 +224,92 @@ async fn test_serial_acquire() -> Result<()> {
220224
}
221225

222226
#[tokio::test(flavor = "multi_thread")]
223-
async fn test_concurrent_acquire() -> Result<()> {
224-
for is_global in [true, false] {
225-
let metastore = create_meta_store().await?;
226-
let test_count = (SystemTime::now()
227-
.duration_since(UNIX_EPOCH)
228-
.unwrap()
229-
.as_nanos()
230-
% 5) as usize
231-
+ 5;
227+
async fn test_concurrent_acquire_local() -> Result<()> {
228+
do_test_concurrent_acquire(false).await
229+
}
232230

233-
let ctx = format!("count={test_count}");
231+
#[tokio::test(flavor = "multi_thread")]
232+
async fn test_concurrent_acquire_global() -> Result<()> {
233+
do_test_concurrent_acquire(true).await
234+
}
234235

235-
let barrier = Arc::new(tokio::sync::Barrier::new(test_count));
236-
let queue = QueueManager::<TestData>::create(2, metastore, is_global);
237-
let mut join_handles = Vec::with_capacity(test_count);
236+
fn setup_test() {
237+
static INIT: Once = Once::new();
238+
INIT.call_once(|| {
239+
let mut config = Config::new_testing();
240+
config.file.level = "DEBUG".to_string();
238241

239-
let instant = Instant::now();
240-
for index in 0..test_count {
241-
join_handles.push({
242-
let queue = queue.clone();
243-
let barrier = barrier.clone();
244-
databend_common_base::runtime::spawn(async move {
245-
barrier.wait().await;
242+
let guards = init_logging("query_unittests", &config, BTreeMap::new());
243+
Box::leak(Box::new(guards));
244+
});
245+
}
246246

247-
// Time based semaphore is sensitive to time accuracy.
248-
// Lower timestamp semaphore being inserted after higher timestamp semaphore results in both acquired.
249-
// Thus, we have to make the gap between timestamp large enough.
250-
tokio::time::sleep(Duration::from_millis(300 * index as u64)).await;
247+
async fn do_test_concurrent_acquire(is_global: bool) -> Result<()> {
248+
setup_test();
251249

252-
let _guard = queue
253-
.acquire(TestData::new(
254-
String::from("test_concurrent_acquire"),
255-
format!("TestData{}", index),
256-
))
257-
.await?;
250+
let metastore = create_meta_store().await?;
251+
let test_count = (SystemTime::now()
252+
.duration_since(UNIX_EPOCH)
253+
.unwrap()
254+
.as_nanos()
255+
% 5) as usize
256+
+ 5;
258257

259-
tokio::time::sleep(Duration::from_secs(1)).await;
260-
Result::<()>::Ok(())
261-
})
262-
})
263-
}
258+
let ctx = format!("count={test_count}");
264259

265-
for join_handle in join_handles {
266-
let _ = join_handle.await;
267-
}
260+
let barrier = Arc::new(tokio::sync::Barrier::new(test_count));
261+
let queue = QueueManager::<TestData>::create(2, metastore, is_global);
262+
let mut join_handles = Vec::with_capacity(test_count);
268263

269-
let elapsed = instant.elapsed();
270-
let total = Duration::from_secs((test_count) as u64);
271-
assert!(
272-
elapsed >= total / 2,
273-
"{ctx}: expect: elapsed: {:?} >= {:?}, ",
274-
elapsed,
275-
total / 2,
276-
);
277-
let delta = Duration::from_millis(300) * test_count as u32;
278-
assert!(
279-
elapsed < total + delta,
280-
"{ctx}: expect: elapsed: {:?} < {:?} + {:?}, ",
281-
elapsed,
282-
total,
283-
delta,
284-
);
264+
let instant = Instant::now();
265+
for index in 0..test_count {
266+
join_handles.push({
267+
let queue = queue.clone();
268+
let barrier = barrier.clone();
269+
databend_common_base::runtime::spawn(async move {
270+
barrier.wait().await;
271+
272+
// Time based semaphore is sensitive to time accuracy.
273+
// Lower timestamp semaphore being inserted after higher timestamp semaphore results in both acquired.
274+
// Thus, we have to make the gap between timestamp large enough.
275+
tokio::time::sleep(Duration::from_millis(300 * index as u64)).await;
276+
277+
let _guard = queue
278+
.acquire(TestData::new(
279+
String::from("test_concurrent_acquire"),
280+
format!("TestData{}", index),
281+
))
282+
.await?;
283+
284+
tokio::time::sleep(Duration::from_secs(1)).await;
285+
Result::<()>::Ok(())
286+
})
287+
})
288+
}
285289

286-
assert_eq!(queue.length(), 0);
290+
for join_handle in join_handles {
291+
let _ = join_handle.await;
287292
}
288293

294+
let elapsed = instant.elapsed();
295+
let total = Duration::from_secs((test_count) as u64);
296+
assert!(
297+
elapsed >= total / 2,
298+
"{ctx}: expect: elapsed: {:?} >= {:?}, ",
299+
elapsed,
300+
total / 2,
301+
);
302+
let delta = Duration::from_millis(300) * test_count as u32;
303+
assert!(
304+
elapsed < total + delta,
305+
"{ctx}: expect: elapsed: {:?} < {:?} + {:?}, ",
306+
elapsed,
307+
total,
308+
delta,
309+
);
310+
311+
assert_eq!(queue.length(), 0);
312+
289313
Ok(())
290314
}
291315

0 commit comments

Comments
 (0)