From 68e10a6eced5a6a0c443fe2c7231dd3e8daedb3f Mon Sep 17 00:00:00 2001
From: Arne Beer
Date: Sat, 1 Feb 2025 22:50:32 +0100
Subject: [PATCH 1/5] refactor: Remove dead code

---
 pueue/src/daemon/state_helper.rs |  2 +-
 pueue_lib/src/state.rs           |  7 -------
 pueue_lib/src/task.rs            | 30 ------------------------------
 3 files changed, 1 insertion(+), 38 deletions(-)

diff --git a/pueue/src/daemon/state_helper.rs b/pueue/src/daemon/state_helper.rs
index e37e132a1..29c5fc7c0 100644
--- a/pueue/src/daemon/state_helper.rs
+++ b/pueue/src/daemon/state_helper.rs
@@ -177,7 +177,7 @@ pub fn restore_state(pueue_directory: &Path) -> Result<Option<State>> {
         let group = match state.groups.get_mut(&task.group) {
             Some(group) => group,
             None => {
-                task.set_default_group();
+                task.group = PUEUE_DEFAULT_GROUP.into();
                 state
                     .groups
                     .entry(PUEUE_DEFAULT_GROUP.into())
diff --git a/pueue_lib/src/state.rs b/pueue_lib/src/state.rs
index 4fcb052fb..4a3e29373 100644
--- a/pueue_lib/src/state.rs
+++ b/pueue_lib/src/state.rs
@@ -157,13 +157,6 @@ impl State {
         self.groups.remove(group);
 
-        // Reset all tasks with removed group to the default.
-        for (_, task) in self.tasks.iter_mut() {
-            if task.group.eq(group) {
-                task.set_default_group();
-            }
-        }
-
         Ok(())
     }
 
diff --git a/pueue_lib/src/task.rs b/pueue_lib/src/task.rs
index a32154ae7..deda1b50a 100644
--- a/pueue_lib/src/task.rs
+++ b/pueue_lib/src/task.rs
@@ -4,8 +4,6 @@ use chrono::prelude::*;
 use serde::{Deserialize, Serialize};
 use strum::Display;
 
-use crate::state::PUEUE_DEFAULT_GROUP;
-
 /// This enum represents the status of the internal task handling of Pueue.
 /// They basically represent the internal task life-cycle.
 #[derive(PartialEq, Eq, Clone, Debug, Display, Serialize, Deserialize)]
@@ -98,25 +96,6 @@ impl Task {
         }
     }
 
-    /// A convenience function used to duplicate a task.
-    pub fn from_task(task: &Task) -> Task {
-        Task {
-            id: 0,
-            created_at: Local::now(),
-            original_command: task.original_command.clone(),
-            command: task.command.clone(),
-            path: task.path.clone(),
-            envs: task.envs.clone(),
-            group: task.group.clone(),
-            dependencies: Vec::new(),
-            priority: 0,
-            label: task.label.clone(),
-            status: TaskStatus::Queued {
-                enqueued_at: Local::now(),
-            },
-        }
-    }
-
     pub fn start_and_end(&self) -> (Option<DateTime<Local>>, Option<DateTime<Local>>) {
         match self.status {
             TaskStatus::Running { start, .. } => (Some(start), None),
@@ -170,15 +149,6 @@ impl Task {
             }
         )
     }
-
-    /// Small convenience function to set the task's group to the default group.
-    pub fn set_default_group(&mut self) {
-        self.group = String::from(PUEUE_DEFAULT_GROUP);
-    }
-
-    pub fn is_in_default_group(&self) -> bool {
-        self.group.eq(PUEUE_DEFAULT_GROUP)
-    }
 }
 
 /// We use a custom `Debug` implementation for [Task], as the `envs` field just has too much
From 812f44633677d74852cda23ba99b59636cbcc4b5 Mon Sep 17 00:00:00 2001
From: Arne Beer
Date: Sat, 1 Feb 2025 22:52:30 +0100
Subject: [PATCH 2/5] test: Generate completions to stdout

---
 pueue/tests/client/integration/completions.rs | 33 +++++++++++++++++--
 1 file changed, 31 insertions(+), 2 deletions(-)

diff --git a/pueue/tests/client/integration/completions.rs b/pueue/tests/client/integration/completions.rs
index 8735b00af..583eabf6c 100644
--- a/pueue/tests/client/integration/completions.rs
+++ b/pueue/tests/client/integration/completions.rs
@@ -4,7 +4,8 @@
 use anyhow::{Context, Result};
 use assert_cmd::prelude::*;
 use rstest::rstest;
 
-/// Make sure that the daemon's environment variables don't bleed into the spawned subprocesses.
+/// Make sure completion generation for all shells works as expected.
+/// This test checks writing the completions to a file.
 #[rstest]
 #[case("zsh")]
 #[case("elvish")]
 #[case("bash")]
 #[case("fish")]
 #[case("power-shell")]
 #[case("nushell")]
 #[test]
-fn autocompletion_generation(#[case] shell: &'static str) -> Result<()> {
+fn autocompletion_generation_to_file(#[case] shell: &'static str) -> Result<()> {
     let output = Command::cargo_bin("pueue")?
         .arg("completions")
         .arg(shell)
@@ -31,3 +32,31 @@ fn autocompletion_generation(#[case] shell: &'static str) -> Result<()> {
 
     Ok(())
 }
+
+/// Make sure completion generation for all shells works as expected.
+/// This test checks writing the completions to stdout.
+#[rstest]
+#[case("zsh")]
+#[case("elvish")]
+#[case("bash")]
+#[case("fish")]
+#[case("power-shell")]
+#[case("nushell")]
+#[test]
+fn autocompletion_generation_to_stdout(#[case] shell: &'static str) -> Result<()> {
+    let output = Command::cargo_bin("pueue")?
+        .arg("completions")
+        .arg(shell)
+        .stdout(Stdio::piped())
+        .stderr(Stdio::piped())
+        .current_dir(env!("CARGO_TARGET_TMPDIR"))
+        .output()
+        .context(format!("Failed to run completion generation for {shell}:"))?;
+
+    assert!(
+        output.status.success(),
+        "Completion for {shell} didn't finish successfully."
+    );
+
+    Ok(())
+}
From d5ac7585fca9e1751db083e7a977000c66674d54 Mon Sep 17 00:00:00 2001
From: Arne Beer
Date: Sat, 1 Feb 2025 23:22:17 +0100
Subject: [PATCH 3/5] test: Task dependencies

---
 .../tests/daemon/integration/dependencies.rs | 77 +++++++++++++++++++
 pueue/tests/daemon/integration/mod.rs        |  1 +
 pueue/tests/helper/factories/task.rs         | 20 ++++-
 pueue/tests/helper/fixtures.rs               |  1 -
 pueue/tests/helper/lockfile.rs               | 37 +++++++++
 pueue/tests/helper/mod.rs                    |  2 +
 pueue/tests/helper/task.rs                   |  4 +-
 7 files changed, 138 insertions(+), 4 deletions(-)
 create mode 100644 pueue/tests/daemon/integration/dependencies.rs
 create mode 100644 pueue/tests/helper/lockfile.rs

diff --git a/pueue/tests/daemon/integration/dependencies.rs b/pueue/tests/daemon/integration/dependencies.rs
new file mode 100644
index 000000000..c205da1a9
--- /dev/null
+++ b/pueue/tests/daemon/integration/dependencies.rs
@@ -0,0 +1,77 @@
+use anyhow::Result;
+
+use pueue_lib::{
+    network::message::{KillMessage, TaskSelection},
+    task::*,
+};
+
+use crate::helper::*;
+
+/// Test that a task with a dependency only starts once that dependency has finished.
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn test_dependency() -> Result<()> {
+    let (daemon, lockfile) = daemon_with_lockfile().await?;
+    let shared = &daemon.settings.shared;
+
+    // Add a task that waits until the lockfile is removed. It's added to a non-default group.
+    add_group_with_slots(shared, "testgroup_3", 3).await?;
+    assert_success(add_task_to_group(shared, lockfile_command(&lockfile), "testgroup_3").await?);
+
+    // This task now has to wait for task 0, even though it's added to the default group and could
+    // start right away.
+    assert_success(add_task_with_dependencies(shared, "ls", vec![0]).await?);
+
+    // Wait for a bit, the second task should still be queued.
+    sleep_ms(500).await;
+
+    // Clear the lock, the first task should now finish.
+    clear_lock(&lockfile)?;
+    wait_for_task_condition(shared, 0, Task::is_done).await?;
+
+    // The second one should start and finish right away.
+    wait_for_task_condition(shared, 1, Task::is_done).await?;
+
+    Ok(())
+}
+
+/// Test that a task is marked as failed if one of its dependencies fails.
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn test_failing_dependency() -> Result<()> {
+    let (daemon, lockfile) = daemon_with_lockfile().await?;
+    let shared = &daemon.settings.shared;
+
+    // Add a task that waits until the lockfile is removed. It's added to a non-default group.
+    add_group_with_slots(shared, "testgroup_3", 3).await?;
+    assert_success(add_task_to_group(shared, lockfile_command(&lockfile), "testgroup_3").await?);
+
+    // This task now has to wait for task 0, even though it's added to the default group and could
+    // start right away.
+    assert_success(add_task_with_dependencies(shared, "ls", vec![0]).await?);
+    // Wait for a bit, the second task should still be queued.
+    sleep_ms(500).await;
+
+    // Now we kill the first task.
+    // This should result in the second task failing.
+    send_message(
+        shared,
+        KillMessage {
+            tasks: TaskSelection::TaskIds(vec![0]),
+            signal: None,
+        },
+    )
+    .await?;
+    wait_for_task_condition(shared, 0, Task::failed).await?;
+
+    // Now wait until the second task finishes and make sure it failed because of its failing
+    // dependency.
+    let task = wait_for_task_condition(shared, 1, Task::failed).await?;
+    assert!(matches!(
+        task.status,
+        TaskStatus::Done {
+            result: TaskResult::DependencyFailed,
+            ..
+        }
+    ));
+
+    Ok(())
+}
diff --git a/pueue/tests/daemon/integration/mod.rs b/pueue/tests/daemon/integration/mod.rs
index 509d6b5da..860870d00 100644
--- a/pueue/tests/daemon/integration/mod.rs
+++ b/pueue/tests/daemon/integration/mod.rs
@@ -2,6 +2,7 @@ mod add;
 mod aliases;
 mod callback;
 mod clean;
+mod dependencies;
 mod edit;
 mod environment_variables;
 mod group;
diff --git a/pueue/tests/helper/factories/task.rs b/pueue/tests/helper/factories/task.rs
index 3d1047caa..f448b706e 100644
--- a/pueue/tests/helper/factories/task.rs
+++ b/pueue/tests/helper/factories/task.rs
@@ -36,8 +36,26 @@ pub async fn add_task_with_priority(
         .context("Failed to to add task.")
 }
 
+/// Adds a task with a dependency to the test daemon.
+pub async fn add_task_with_dependencies(
+    shared: &Shared,
+    command: &str,
+    dependencies: Vec<usize>,
+) -> Result<Message> {
+    let mut message = create_add_message(shared, command);
+    message.dependencies = dependencies;
+
+    send_message(shared, message)
+        .await
+        .context("Failed to add task.")
+}
+
 /// Adds a task to a specific group of the test daemon.
-pub async fn add_task_to_group(shared: &Shared, command: &str, group: &str) -> Result<Message> {
+pub async fn add_task_to_group<C: ToString, G: ToString>(
+    shared: &Shared,
+    command: C,
+    group: G,
+) -> Result<Message> {
     let mut message = create_add_message(shared, command);
     message.group = group.to_string();
 
diff --git a/pueue/tests/helper/fixtures.rs b/pueue/tests/helper/fixtures.rs
index e90057733..2e471d0cf 100644
--- a/pueue/tests/helper/fixtures.rs
+++ b/pueue/tests/helper/fixtures.rs
@@ -42,7 +42,6 @@ impl Drop for PueueDaemon {
 /// This is done in 90% of our tests, thereby this convenience helper.
 pub async fn daemon() -> Result<PueueDaemon> {
     let (settings, tempdir) = daemon_base_setup()?;
-
     daemon_with_settings(settings, tempdir).await
 }
 
diff --git a/pueue/tests/helper/lockfile.rs b/pueue/tests/helper/lockfile.rs
new file mode 100644
index 000000000..fce0a7f12
--- /dev/null
+++ b/pueue/tests/helper/lockfile.rs
@@ -0,0 +1,37 @@
+use std::{
+    fs::{remove_file, File},
+    path::{Path, PathBuf},
+};
+
+use anyhow::{Context, Result};
+
+use super::{daemon_base_setup, daemon_with_settings, PueueDaemon};
+
+/// A helper wrapper around [`daemon`] that also creates a lockfile that can be listened to from a
+/// task via `inotifywait -e delete_self "$FILE"`.
+/// This is super useful as it allows us to have a proper lock that we may release at any point in
+/// time when working with timing-specific issues in tests.
+///
+/// E.g. task1 depends on task0 and we want to make sure that task1 isn't started before task0
+/// ends. This mechanism ensures that task0 only finishes when we allow it to do so.
+pub async fn daemon_with_lockfile() -> Result<(PueueDaemon, PathBuf)> {
+    let (settings, tempdir) = daemon_base_setup()?;
+    let tempdir_path = tempdir.path().to_owned();
+
+    let daemon = daemon_with_settings(settings, tempdir).await?;
+
+    let lockfile = tempdir_path.join("file.lock");
+    File::create(&lockfile)?;
+
+    Ok((daemon, lockfile))
+}
+
+pub fn lockfile_command(path: &Path) -> String {
+    format!("inotifywait -e delete_self \"{}\"", path.to_string_lossy())
+}
+
+pub fn clear_lock(path: &Path) -> Result<()> {
+    remove_file(path).context("Failed to clear lock file")?;
+
+    Ok(())
+}
diff --git a/pueue/tests/helper/mod.rs b/pueue/tests/helper/mod.rs
index 85cfa0532..1f7277dd8 100644
--- a/pueue/tests/helper/mod.rs
+++ b/pueue/tests/helper/mod.rs
@@ -10,6 +10,7 @@ mod asserts;
 mod daemon;
 mod factories;
 mod fixtures;
+mod lockfile;
 mod log;
 mod network;
 mod state;
@@ -21,6 +22,7 @@ pub use asserts::*;
 pub use daemon::*;
 pub use factories::*;
 pub use fixtures::*;
+pub use lockfile::*;
 pub use network::*;
 pub use state::*;
 pub use task::*;
diff --git a/pueue/tests/helper/task.rs b/pueue/tests/helper/task.rs
index 4cca395a9..8b76c1ecf 100644
--- a/pueue/tests/helper/task.rs
+++ b/pueue/tests/helper/task.rs
@@ -12,9 +12,9 @@ use crate::helper::*;
 
 /// Create a bare AddMessage for testing.
 /// This is just here to minimize boilerplate code.
-pub fn create_add_message(shared: &Shared, command: &str) -> AddMessage {
+pub fn create_add_message<C: ToString>(shared: &Shared, command: C) -> AddMessage {
     AddMessage {
-        command: command.into(),
+        command: command.to_string(),
         path: shared.pueue_directory(),
         envs: HashMap::from_iter(vars()),
         start_immediately: false,
From 3f9547814089bef6568175db0f91d27c96ed535b Mon Sep 17 00:00:00 2001
From: Arne Beer
Date: Sat, 1 Feb 2025 23:26:35 +0100
Subject: [PATCH 4/5] refactor: Clean up some tests

---
 pueue/tests/daemon/integration/restart.rs | 14 ++++++--------
 1 file changed, 6 insertions(+), 8 deletions(-)

diff --git a/pueue/tests/daemon/integration/restart.rs b/pueue/tests/daemon/integration/restart.rs
index 256ed9c38..c4ee39084 100644
--- a/pueue/tests/daemon/integration/restart.rs
+++ b/pueue/tests/daemon/integration/restart.rs
@@ -13,12 +13,12 @@ async fn test_restart_in_place() -> Result<()> {
     let shared = &daemon.settings.shared;
 
     // Add a single task that instantly finishes.
-    assert_success(add_task(shared, "sleep 0.1").await?);
+    assert_success(add_and_start_task(shared, "ls").await?);
 
     // Wait for task 0 to finish.
     let original_task = wait_for_task_condition(shared, 0, Task::is_done).await?;
 
-    // Restart task 0 with an extended sleep command with a different path.
+    // Restart task 0 with a sleep command and a different path.
     let restart_message = RestartMessage {
         tasks: vec![TaskToRestart {
             task_id: 0,
@@ -60,13 +60,11 @@ async fn test_cannot_restart_running() -> Result<()> {
     let daemon = daemon().await?;
     let shared = &daemon.settings.shared;
 
-    // Add a single task that instantly finishes.
-    assert_success(add_task(shared, "sleep 60").await?);
-
-    // Wait for task 0 to finish.
-    let task = wait_for_task_condition(shared, 0, Task::is_running).await?;
+    // Add a long running task that starts immediately.
+    assert_success(add_and_start_task(shared, "sleep 60").await?);
+    let task = get_task(shared, 0).await?;
 
-    // Restart task 0 with an extended sleep command.
+    // Try to restart task 0.
     let restart_message = RestartMessage {
         tasks: vec![TaskToRestart {
             task_id: 0,
From 17672ea43e308412d4c32b0f94355fa3ec8262bb Mon Sep 17 00:00:00 2001
From: Arne Beer
Date: Sat, 1 Feb 2025 23:30:42 +0100
Subject: [PATCH 5/5] refactor: Remove dead code

---
 pueue/src/daemon/state_helper.rs | 78 ++------------------------------
 1 file changed, 5 insertions(+), 73 deletions(-)

diff --git a/pueue/src/daemon/state_helper.rs b/pueue/src/daemon/state_helper.rs
index 29c5fc7c0..ca2bd25df 100644
--- a/pueue/src/daemon/state_helper.rs
+++ b/pueue/src/daemon/state_helper.rs
@@ -1,8 +1,6 @@
-use std::collections::BTreeMap;
 use std::fs;
-use std::path::{Path, PathBuf};
+use std::path::Path;
 use std::sync::MutexGuard;
-use std::time::SystemTime;
 
 use anyhow::{Context, Result};
 use chrono::prelude::*;
@@ -63,52 +61,19 @@ pub fn pause_on_failure(state: &mut LockedState, settings: &Settings, group: &st
     }
 }
 
-/// Do a full reset of the state.
-/// This doesn't reset any processes!
-pub fn reset_state(state: &mut LockedState, settings: &Settings) -> Result<()> {
-    backup_state(state, settings)?;
-    state.tasks = BTreeMap::new();
-    state.set_status_for_all_groups(GroupStatus::Running);
-
-    save_state(state, settings)
-}
-
-/// Convenience wrapper around save_to_file.
-pub fn save_state(state: &State, settings: &Settings) -> Result<()> {
-    save_state_to_file(state, settings, false)
-}
-
-/// Save the current current state in a file with a timestamp.
-/// At the same time remove old state logs from the log directory.
-/// This function is called, when large changes to the state are applied, e.g. clean/reset.
-pub fn backup_state(state: &LockedState, settings: &Settings) -> Result<()> {
-    save_state_to_file(state, settings, true)?;
-    rotate_state(settings).context("Failed to rotate old log files")?;
-    Ok(())
-}
-
 /// Save the current state to disk. \
 /// We do this to restore in case of a crash. \
 /// If log == true, the file will be saved with a time stamp.
 ///
 /// In comparison to the daemon -> client communication, the state is saved
 /// as JSON for readability and debugging purposes.
-fn save_state_to_file(state: &State, settings: &Settings, log: bool) -> Result<()> {
+pub fn save_state(state: &State, settings: &Settings) -> Result<()> {
     let serialized = serde_json::to_string(&state).context("Failed to serialize state:");
     let serialized = serialized.unwrap();
 
     let path = settings.shared.pueue_directory();
-    let (temp, real) = if log {
-        let path = path.join("log");
-        let now: DateTime<Utc> = Utc::now();
-        let time = now.format("%Y-%m-%d_%H-%M-%S");
-        (
-            path.join(format!("{time}_state.json.partial")),
-            path.join(format!("{time}_state.json")),
-        )
-    } else {
-        (path.join("state.json.partial"), path.join("state.json"))
-    };
+    let temp = path.join("state.json.partial");
+    let real = path.join("state.json");
 
     // Write to temporary log file first, to prevent loss due to crashes.
     fs::write(&temp, serialized).context("Failed to write temp file while saving state.")?;
@@ -116,11 +81,7 @@ fn save_state_to_file(state: &State, settings: &Settings, log: bool) -> Result<(
 
     // Overwrite the original with the temp file, if everything went fine.
     fs::rename(&temp, &real).context("Failed to overwrite old state while saving state")?;
 
-    if log {
-        debug!("State backup created at: {real:?}");
-    } else {
-        debug!("State saved at: {real:?}");
-    }
+    debug!("State saved at: {real:?}");
 
     Ok(())
 }
@@ -201,32 +162,3 @@ pub fn restore_state(pueue_directory: &Path) -> Result<Option<State>> {
 
     Ok(Some(state))
 }
-
-/// Remove old logs that aren't needed any longer.
-fn rotate_state(settings: &Settings) -> Result<()> {
-    let path = settings.shared.pueue_directory().join("log");
-
-    // Get all log files in the directory with their respective system time.
-    let mut entries: BTreeMap<SystemTime, PathBuf> = BTreeMap::new();
-    let mut directory_list = fs::read_dir(path)?;
-    while let Some(Ok(entry)) = directory_list.next() {
-        let path = entry.path();
-
-        let metadata = entry.metadata()?;
-        let time = metadata.modified()?;
-        entries.insert(time, path);
-    }
-
-    // Remove all files above the threshold.
-    // Old files are removed first (implicitly by the BTree order).
-    let mut number_entries = entries.len();
-    let mut iter = entries.iter();
-    while number_entries > 10 {
-        if let Some((_, path)) = iter.next() {
-            fs::remove_file(path)?;
-            number_entries -= 1;
-        }
-    }
-
-    Ok(())
-}
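
The `daemon_with_lockfile` helper introduced in PATCH 3/5 builds its lock on `inotifywait -e delete_self`: a task spawned with `lockfile_command` blocks until the lock file is deleted, and `clear_lock` releases it. Below is a rough, standalone sketch of that mechanism. It is not part of the patches above; it assumes a Linux host with inotify-tools installed, and the file name and timing are made up for illustration.

// Standalone sketch of the delete_self lock used by `daemon_with_lockfile`.
// Assumes Linux with `inotifywait` (inotify-tools) available on the PATH.
use std::fs::{remove_file, File};
use std::process::Command;
use std::thread::sleep;
use std::time::Duration;

fn main() -> std::io::Result<()> {
    // Create the lock file, just like `daemon_with_lockfile` does.
    let lockfile = std::env::temp_dir().join("lockfile_sketch.lock");
    File::create(&lockfile)?;

    // Mirrors the command built by `lockfile_command`:
    // `inotifywait -e delete_self <FILE>` blocks until the file is deleted.
    let mut child = Command::new("inotifywait")
        .arg("-e")
        .arg("delete_self")
        .arg(&lockfile)
        .spawn()?;

    // While the lock file exists, the watcher (our stand-in for a task) keeps running.
    sleep(Duration::from_millis(500));
    assert!(child.try_wait()?.is_none(), "watcher should still be blocked");

    // Releasing the lock (what `clear_lock` does) lets the watcher exit.
    remove_file(&lockfile)?;
    let status = child.wait()?;
    assert!(status.success());

    Ok(())
}

Deleting the watched file is exactly what `clear_lock` does in the test helper, which is why removing the lock lets the blocked task, and everything that depends on it, proceed.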