diff --git a/manager/src/state/file.rs b/manager/src/state/file.rs index c24e12e..b855e20 100644 --- a/manager/src/state/file.rs +++ b/manager/src/state/file.rs @@ -131,11 +131,27 @@ impl State for FileStore { } pending_file_path.push(task_id); - File::create(pending_file_path) - .context(StateErrorKind::PendingTaskWriteFailed)? - .write_all(&task_id.as_bytes()) - .context(StateErrorKind::PendingTaskWriteFailed)?; - Ok(()) + match task.get_status() { + TaskStatus::TASK_UNKNOWN => Ok(()), + TaskStatus::TASK_IN_PROGRESS => Ok(()), + TaskStatus::TASK_PENDING => { + File::create(pending_file_path) + .context(StateErrorKind::PendingTaskWriteFailed)? + .write_all(&task_id.as_bytes()) + .context(StateErrorKind::PendingTaskWriteFailed)?; + Ok(()) + } + TaskStatus::TASK_DONE => { + fs::remove_file(pending_file_path) + .context(StateErrorKind::PendingTaskRemoveFailed)?; + Ok(()) + } + TaskStatus::TASK_FAILED => { + // TODO: Do we want to remove the pending file if the task + // fails or do we keep it? + Ok(()) + } + } } fn pending_map_tasks(&self, job: &Job) -> Result, StateError> { diff --git a/manager/src/state/mod.rs b/manager/src/state/mod.rs index 39d40bc..111b9ff 100644 --- a/manager/src/state/mod.rs +++ b/manager/src/state/mod.rs @@ -8,14 +8,16 @@ use std::fmt::Display; use failure::*; use futures::Future; -use heracles_proto::datatypes::{Job, Task, TaskKind}; +use heracles_proto::datatypes::{Job, Task, TaskKind, TaskStatus}; #[allow(doc_markdown)] /// Interface for creating connections to state stores, such as etcd or TiKV etc. pub trait State { /// Serialize the job and save it in the state store so it can be loaded later. fn save_job(&self, job: &Job) -> Result<(), StateError>; - /// Adds a task to the list of tasks and add it to pending + /// Adds a task to the list of tasks. + /// If the task has status of PENDING, it is added to pending tasks + /// If the task has status of DONE, the pending task is removed. fn save_task(&self, task: &Task) -> Result<(), StateError>; /// List of pending map tasks for a specific job. fn pending_map_tasks(&self, job: &Job) -> Result, StateError>; @@ -61,6 +63,8 @@ pub enum StateErrorKind { TaskWriteFailed, #[fail(display = "Failed to create pending task")] PendingTaskWriteFailed, + #[fail(display = "Failed to remove pending task")] + PendingTaskRemoveFailed, #[fail(display = "Failed operation.")] OperationFailed, } diff --git a/worker/settings/settings.go b/worker/settings/settings.go index 4978f5c..703d28b 100644 --- a/worker/settings/settings.go +++ b/worker/settings/settings.go @@ -33,7 +33,7 @@ func Init() error { func setDefaults() { settings.SetDefault("broker.queue_name", "heracles_tasks") settings.SetDefault("broker.address", "") - settings.SetDefault("state.backend", "file") + settings.SetDefault("state.backend", "empty") settings.SetDefault("state.location", "") } diff --git a/worker/state/empty.go b/worker/state/empty.go new file mode 100644 index 0000000..19c8fed --- /dev/null +++ b/worker/state/empty.go @@ -0,0 +1,21 @@ +package state + +import ( + "github.com/cpssd/heracles/proto/datatypes" + log "github.com/golang/glog" +) + +// EmptyStore implements State +type EmptyStore struct { +} + +// NewEmptyStore returns a state store which does nothing. +func NewEmptyStore() (*EmptyStore, error) { + return &EmptyStore{}, nil +} + +// SaveProgress implementation +func (f EmptyStore) SaveProgress(task *datatypes.Task) error { + log.V(1).Info("empty store saving task %s", task.GetId()) + return nil +} diff --git a/worker/state/state.go b/worker/state/state.go index a98b432..2445c94 100644 --- a/worker/state/state.go +++ b/worker/state/state.go @@ -20,6 +20,8 @@ func New() (State, error) { case "file": location := settings.Get("state.location").(string) return NewFileStore(location) + case "empty": + return NewEmptyStore() } return nil, errors.New("unknown state kind")