Merge pull request #599 from Nukesor/cleanup
Cleanup
Nukesor authored Feb 1, 2025
2 parents fbca708 + 17672ea commit 096b796
Showing 12 changed files with 181 additions and 125 deletions.
80 changes: 6 additions & 74 deletions pueue/src/daemon/state_helper.rs
@@ -1,8 +1,6 @@
use std::collections::BTreeMap;
use std::fs;
use std::path::{Path, PathBuf};
use std::path::Path;
use std::sync::MutexGuard;
use std::time::SystemTime;

use anyhow::{Context, Result};
use chrono::prelude::*;
@@ -63,64 +61,27 @@ pub fn pause_on_failure(state: &mut LockedState, settings: &Settings, group: &st
}
}

/// Do a full reset of the state.
/// This doesn't reset any processes!
pub fn reset_state(state: &mut LockedState, settings: &Settings) -> Result<()> {
backup_state(state, settings)?;
state.tasks = BTreeMap::new();
state.set_status_for_all_groups(GroupStatus::Running);

save_state(state, settings)
}

/// Convenience wrapper around save_to_file.
pub fn save_state(state: &State, settings: &Settings) -> Result<()> {
save_state_to_file(state, settings, false)
}

/// Save the current current state in a file with a timestamp.
/// At the same time remove old state logs from the log directory.
/// This function is called, when large changes to the state are applied, e.g. clean/reset.
pub fn backup_state(state: &LockedState, settings: &Settings) -> Result<()> {
save_state_to_file(state, settings, true)?;
rotate_state(settings).context("Failed to rotate old log files")?;
Ok(())
}

/// Save the current state to disk. \
/// We do this to restore in case of a crash. \
/// If log == true, the file will be saved with a time stamp.
///
/// In comparison to the daemon -> client communication, the state is saved
/// as JSON for readability and debugging purposes.
fn save_state_to_file(state: &State, settings: &Settings, log: bool) -> Result<()> {
pub fn save_state(state: &State, settings: &Settings) -> Result<()> {
let serialized = serde_json::to_string(&state).context("Failed to serialize state:");

let serialized = serialized.unwrap();
let path = settings.shared.pueue_directory();
let (temp, real) = if log {
let path = path.join("log");
let now: DateTime<Utc> = Utc::now();
let time = now.format("%Y-%m-%d_%H-%M-%S");
(
path.join(format!("{time}_state.json.partial")),
path.join(format!("{time}_state.json")),
)
} else {
(path.join("state.json.partial"), path.join("state.json"))
};
let temp = path.join("state.json.partial");
let real = path.join("state.json");

// Write to temporary log file first, to prevent loss due to crashes.
fs::write(&temp, serialized).context("Failed to write temp file while saving state.")?;

// Overwrite the original with the temp file, if everything went fine.
fs::rename(&temp, &real).context("Failed to overwrite old state while saving state")?;

if log {
debug!("State backup created at: {real:?}");
} else {
debug!("State saved at: {real:?}");
}
debug!("State saved at: {real:?}");

Ok(())
}
@@ -177,7 +138,7 @@ pub fn restore_state(pueue_directory: &Path) -> Result<Option<State>> {
let group = match state.groups.get_mut(&task.group) {
Some(group) => group,
None => {
task.set_default_group();
task.group = PUEUE_DEFAULT_GROUP.into();
state
.groups
.entry(PUEUE_DEFAULT_GROUP.into())
@@ -201,32 +162,3 @@ pub fn restore_state(pueue_directory: &Path) -> Result<Option<State>> {

Ok(Some(state))
}

/// Remove old logs that aren't needed any longer.
fn rotate_state(settings: &Settings) -> Result<()> {
let path = settings.shared.pueue_directory().join("log");

// Get all log files in the directory with their respective system time.
let mut entries: BTreeMap<SystemTime, PathBuf> = BTreeMap::new();
let mut directory_list = fs::read_dir(path)?;
while let Some(Ok(entry)) = directory_list.next() {
let path = entry.path();

let metadata = entry.metadata()?;
let time = metadata.modified()?;
entries.insert(time, path);
}

// Remove all files above the threshold.
// Old files are removed first (implicitly by the BTree order).
let mut number_entries = entries.len();
let mut iter = entries.iter();
while number_entries > 10 {
if let Some((_, path)) = iter.next() {
fs::remove_file(path)?;
number_entries -= 1;
}
}

Ok(())
}
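
For orientation, here is a minimal, self-contained sketch of the write-to-temp-then-rename pattern that the simplified save_state above relies on; the function and file names are illustrative and not part of the pueue API.

use std::fs;
use std::io;
use std::path::Path;

// Illustrative sketch: write the serialized contents to a `.partial` sibling
// first, then rename it over the real file. A crash during the write leaves
// the previous file intact instead of a truncated one.
fn write_atomically(dir: &Path, file_name: &str, contents: &str) -> io::Result<()> {
    let temp = dir.join(format!("{file_name}.partial"));
    let real = dir.join(file_name);
    fs::write(&temp, contents)?;
    fs::rename(&temp, &real)?;
    Ok(())
}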
33 changes: 31 additions & 2 deletions pueue/tests/client/integration/completions.rs
@@ -4,7 +4,8 @@ use anyhow::{Context, Result};
use assert_cmd::prelude::*;
use rstest::rstest;

/// Make sure that the daemon's environment variables don't bleed into the spawned subprocesses.
/// Make sure completion for all shells work as expected.
/// This test tests writing to file.
#[rstest]
#[case("zsh")]
#[case("elvish")]
@@ -13,7 +14,7 @@ use rstest::rstest;
#[case("power-shell")]
#[case("nushell")]
#[test]
fn autocompletion_generation(#[case] shell: &'static str) -> Result<()> {
fn autocompletion_generation_to_file(#[case] shell: &'static str) -> Result<()> {
let output = Command::cargo_bin("pueue")?
.arg("completions")
.arg(shell)
@@ -31,3 +32,31 @@ fn autocompletion_generation(#[case] shell: &'static str) -> Result<()> {

Ok(())
}

/// Make sure completion for all shells work as expected.
/// This test tests writing to stdout.
#[rstest]
#[case("zsh")]
#[case("elvish")]
#[case("bash")]
#[case("fish")]
#[case("power-shell")]
#[case("nushell")]
#[test]
fn autocompletion_generation_to_stdout(#[case] shell: &'static str) -> Result<()> {
let output = Command::cargo_bin("pueue")?
.arg("completions")
.arg(shell)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.current_dir(env!("CARGO_TARGET_TMPDIR"))
.output()
.context(format!("Failed to run completion generation for {shell}:"))?;

assert!(
output.status.success(),
"Completion for {shell} didn't finish successfully."
);

Ok(())
}
77 changes: 77 additions & 0 deletions pueue/tests/daemon/integration/dependencies.rs
@@ -0,0 +1,77 @@
use anyhow::Result;

use pueue_lib::{
network::message::{KillMessage, TaskSelection},
task::*,
};

use crate::helper::*;

/// Test if adding a normal task works as intended.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_dependency() -> Result<()> {
let (daemon, lockfile) = daemon_with_lockfile().await?;
let shared = &daemon.settings.shared;

// Add a task that waits until the lockfile is removed. It's added to a non-default group.
add_group_with_slots(shared, "testgroup_3", 3).await?;
assert_success(add_task_to_group(shared, lockfile_command(&lockfile), "testgroup_3").await?);

// This task now has to wait for task 0, even though it's added to the default group and could
// start right away.
assert_success(add_task_with_dependencies(shared, "ls", vec![0]).await?);

// Wait for a bit, the second task should still be queued.
sleep_ms(500).await;

// Clear the lock, the first task should now finish.
clear_lock(&lockfile)?;
wait_for_task_condition(shared, 0, Task::is_done).await?;

// The second one should start and finish right away.
wait_for_task_condition(shared, 1, Task::is_done).await?;

Ok(())
}

/// Test if adding a normal task works as intended.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_failing_dependency() -> Result<()> {
let (daemon, lockfile) = daemon_with_lockfile().await?;
let shared = &daemon.settings.shared;

// Add a task that waits until the lockfile is removed. It's added to a non-default group.
add_group_with_slots(shared, "testgroup_3", 3).await?;
assert_success(add_task_to_group(shared, lockfile_command(&lockfile), "testgroup_3").await?);

// This task now has to wait for task 0, even though it's added to the default group and could
// start right away.
assert_success(add_task_with_dependencies(shared, "ls", vec![0]).await?);
// Wait for a bit, the second task should still be queued.
sleep_ms(500).await;

// Now we kill the first task.
// This should result in the second task failing.
send_message(
shared,
KillMessage {
tasks: TaskSelection::TaskIds(vec![0]),
signal: None,
},
)
.await?;
wait_for_task_condition(shared, 0, Task::failed).await?;

// Now wait until the first task finishes and make sure it failed because of a failing
// dependency.
let task = wait_for_task_condition(shared, 1, Task::failed).await?;
assert!(matches!(
task.status,
TaskStatus::Done {
result: TaskResult::DependencyFailed,
..
}
));

Ok(())
}
1 change: 1 addition & 0 deletions pueue/tests/daemon/integration/mod.rs
@@ -2,6 +2,7 @@ mod add;
mod aliases;
mod callback;
mod clean;
mod dependencies;
mod edit;
mod environment_variables;
mod group;
14 changes: 6 additions & 8 deletions pueue/tests/daemon/integration/restart.rs
@@ -13,12 +13,12 @@ async fn test_restart_in_place() -> Result<()> {
let shared = &daemon.settings.shared;

// Add a single task that instantly finishes.
assert_success(add_task(shared, "sleep 0.1").await?);
assert_success(add_and_start_task(shared, "ls").await?);

// Wait for task 0 to finish.
let original_task = wait_for_task_condition(shared, 0, Task::is_done).await?;

// Restart task 0 with an extended sleep command with a different path.
// Restart task 0 with an sleep command and with a different path.
let restart_message = RestartMessage {
tasks: vec![TaskToRestart {
task_id: 0,
@@ -60,13 +60,11 @@ async fn test_cannot_restart_running() -> Result<()> {
let daemon = daemon().await?;
let shared = &daemon.settings.shared;

// Add a single task that instantly finishes.
assert_success(add_task(shared, "sleep 60").await?);

// Wait for task 0 to finish.
let task = wait_for_task_condition(shared, 0, Task::is_running).await?;
// Add a long running task that starts immediately.
assert_success(add_and_start_task(shared, "sleep 60").await?);
let task = get_task(shared, 1).await?;

// Restart task 0 with an extended sleep command.
// Try to restart task 0
let restart_message = RestartMessage {
tasks: vec![TaskToRestart {
task_id: 0,
20 changes: 19 additions & 1 deletion pueue/tests/helper/factories/task.rs
@@ -36,8 +36,26 @@ pub async fn add_task_with_priority(
.context("Failed to to add task.")
}

/// Adds a task with a dependency to the test daemon.
pub async fn add_task_with_dependencies(
shared: &Shared,
command: &str,
dependencies: Vec<usize>,
) -> Result<Message> {
let mut message = create_add_message(shared, command);
message.dependencies = dependencies;

send_message(shared, message)
.await
.context("Failed to to add task.")
}

/// Adds a task to a specific group of the test daemon.
pub async fn add_task_to_group(shared: &Shared, command: &str, group: &str) -> Result<Message> {
pub async fn add_task_to_group<C: ToString, G: ToString>(
shared: &Shared,
command: C,
group: G,
) -> Result<Message> {
let mut message = create_add_message(shared, command);
message.group = group.to_string();

1 change: 0 additions & 1 deletion pueue/tests/helper/fixtures.rs
@@ -42,7 +42,6 @@ impl Drop for PueueDaemon {
/// This is done in 90% of our tests, thereby this convenience helper.
pub async fn daemon() -> Result<PueueDaemon> {
let (settings, tempdir) = daemon_base_setup()?;

daemon_with_settings(settings, tempdir).await
}

37 changes: 37 additions & 0 deletions pueue/tests/helper/lockfile.rs
@@ -0,0 +1,37 @@
use std::{
fs::{remove_file, File},
path::{Path, PathBuf},
};

use anyhow::{Context, Result};

use super::{daemon_base_setup, daemon_with_settings, PueueDaemon};

/// A helper wrapper around [`daemon`] that also creates a lockfile that can be listened to from a
/// task via `inotifywait -e delete_self "$FILE"`.
/// This is super useful as it allows us to have a proper lock that we may release at any point in
/// time when working with timing specific issues in tests.
///
/// E.g. task1 depends on task0 and we want to make sure that task1 isn't started before task0
/// ends. This mechanism can be ensured that task0 only finishes when we allow it to do so.
pub async fn daemon_with_lockfile() -> Result<(PueueDaemon, PathBuf)> {
let (settings, tempdir) = daemon_base_setup()?;
let tempdir_path = tempdir.path().to_owned();

let daemon = daemon_with_settings(settings, tempdir).await?;

let lockfile = tempdir_path.join("file.lock");
File::create(&lockfile)?;

Ok((daemon, lockfile))
}

pub fn lockfile_command(path: &Path) -> String {
format!("inotifywait -e delete_self \"{}\"", path.to_string_lossy())
}

pub fn clear_lock(path: &Path) -> Result<()> {
remove_file(path).context("Failed to clear lock file")?;

Ok(())
}
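
As a hedged usage sketch, this is roughly how the new lockfile helpers are meant to compose inside a test, mirroring the dependency tests above; the group name and task id are illustrative.

use anyhow::Result;
use pueue_lib::task::Task;

use crate::helper::*;

// Illustrative only: start a daemon with a lockfile, queue a task that blocks
// on it via inotifywait, then delete the lockfile once the test is ready to
// let the task finish.
async fn lockfile_roundtrip() -> Result<()> {
    let (daemon, lockfile) = daemon_with_lockfile().await?;
    let shared = &daemon.settings.shared;

    // The task only exits once the lockfile is deleted.
    add_group_with_slots(shared, "example_group", 1).await?;
    assert_success(add_task_to_group(shared, lockfile_command(&lockfile), "example_group").await?);

    // ... exercise whatever needs the task to still be running ...

    // Release the lock so the task can finish.
    clear_lock(&lockfile)?;
    wait_for_task_condition(shared, 0, Task::is_done).await?;

    Ok(())
}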
2 changes: 2 additions & 0 deletions pueue/tests/helper/mod.rs
@@ -10,6 +10,7 @@ mod asserts;
mod daemon;
mod factories;
mod fixtures;
mod lockfile;
mod log;
mod network;
mod state;
@@ -21,6 +22,7 @@ pub use asserts::*;
pub use daemon::*;
pub use factories::*;
pub use fixtures::*;
pub use lockfile::*;
pub use network::*;
pub use state::*;
pub use task::*;
4 changes: 2 additions & 2 deletions pueue/tests/helper/task.rs
@@ -12,9 +12,9 @@ use crate::helper::*;

/// Create a bare AddMessage for testing.
/// This is just here to minimize boilerplate code.
pub fn create_add_message(shared: &Shared, command: &str) -> AddMessage {
pub fn create_add_message<C: ToString>(shared: &Shared, command: C) -> AddMessage {
AddMessage {
command: command.into(),
command: command.to_string(),
path: shared.pueue_directory(),
envs: HashMap::from_iter(vars()),
start_immediately: false,
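
Finally, a small hedged illustration of what the ToString bounds on create_add_message and add_task_to_group buy: callers can now pass either a string literal or an owned String (such as the one produced by lockfile_command) without extra borrowing. The wrapper function is purely for demonstration, and the import paths for AddMessage and Shared are assumptions about pueue_lib's module layout.

use std::path::Path;

use pueue_lib::{network::message::AddMessage, settings::Shared};

use crate::helper::*;

// Illustrative: both a &str literal and an owned String satisfy `C: ToString`.
fn build_example_messages(shared: &Shared, lockfile: &Path) -> (AddMessage, AddMessage) {
    let from_literal = create_add_message(shared, "ls");
    let from_owned = create_add_message(shared, lockfile_command(lockfile));
    (from_literal, from_owned)
}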