Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(manager): Make the manager responsible for saving the tasks. #417

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 21 additions & 5 deletions manager/src/state/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,27 @@ impl State for FileStore {
}
pending_file_path.push(task_id);

File::create(pending_file_path)
.context(StateErrorKind::PendingTaskWriteFailed)?
.write_all(&task_id.as_bytes())
.context(StateErrorKind::PendingTaskWriteFailed)?;
Ok(())
match task.get_status() {
TaskStatus::TASK_UNKNOWN => Ok(()),
TaskStatus::TASK_IN_PROGRESS => Ok(()),
TaskStatus::TASK_PENDING => {
File::create(pending_file_path)
.context(StateErrorKind::PendingTaskWriteFailed)?
.write_all(&task_id.as_bytes())
.context(StateErrorKind::PendingTaskWriteFailed)?;
Ok(())
}
TaskStatus::TASK_DONE => {
fs::remove_file(pending_file_path)
.context(StateErrorKind::PendingTaskRemoveFailed)?;
Ok(())
}
TaskStatus::TASK_FAILED => {
// TODO: Do we want to remove the pending file if the task
// fails or do we keep it?
Ok(())
}
}
}

fn pending_map_tasks(&self, job: &Job) -> Result<Vec<Task>, StateError> {
Expand Down
8 changes: 6 additions & 2 deletions manager/src/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@ use std::fmt::Display;
use failure::*;
use futures::Future;

use heracles_proto::datatypes::{Job, Task, TaskKind};
use heracles_proto::datatypes::{Job, Task, TaskKind, TaskStatus};

#[allow(doc_markdown)]
/// Interface for creating connections to state stores, such as etcd or TiKV etc.
pub trait State {
/// Serialize the job and save it in the state store so it can be loaded later.
fn save_job(&self, job: &Job) -> Result<(), StateError>;
/// Adds a task to the list of tasks and add it to pending
/// Adds a task to the list of tasks.
/// If the task has status of PENDING, it is added to pending tasks
/// If the task has status of DONE, the pending task is removed.
fn save_task(&self, task: &Task) -> Result<(), StateError>;
/// List of pending map tasks for a specific job.
fn pending_map_tasks(&self, job: &Job) -> Result<Vec<Task>, StateError>;
Expand Down Expand Up @@ -61,6 +63,8 @@ pub enum StateErrorKind {
TaskWriteFailed,
#[fail(display = "Failed to create pending task")]
PendingTaskWriteFailed,
#[fail(display = "Failed to remove pending task")]
PendingTaskRemoveFailed,
#[fail(display = "Failed operation.")]
OperationFailed,
}
Expand Down
2 changes: 1 addition & 1 deletion worker/settings/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func Init() error {
func setDefaults() {
settings.SetDefault("broker.queue_name", "heracles_tasks")
settings.SetDefault("broker.address", "")
settings.SetDefault("state.backend", "file")
settings.SetDefault("state.backend", "empty")
settings.SetDefault("state.location", "")
}

Expand Down
21 changes: 21 additions & 0 deletions worker/state/empty.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package state

import (
"github.com/cpssd/heracles/proto/datatypes"
log "github.com/golang/glog"
)

// EmptyStore implements State
type EmptyStore struct {
}

// NewEmptyStore returns a state store which does nothing.
func NewEmptyStore() (*EmptyStore, error) {
return &EmptyStore{}, nil
}

// SaveProgress implementation
func (f EmptyStore) SaveProgress(task *datatypes.Task) error {
log.V(1).Info("empty store saving task %s", task.GetId())
return nil
}
2 changes: 2 additions & 0 deletions worker/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ func New() (State, error) {
case "file":
location := settings.Get("state.location").(string)
return NewFileStore(location)
case "empty":
return NewEmptyStore()
}

return nil, errors.New("unknown state kind")
Expand Down