Package taskengine concurrently executes a set of tasks assigned to multiple workers.
Each worker can work on all or a subset of the tasks.
When a worker is ready, the next task to execute is chosen dynamically, based on the current status of the tasks, so as to maximize the throughput of successfully executed tasks.
After the first success result is found, the remaining jobs for the same task are canceled.
The main types defined by the package are:
- Engine
- Task
- Worker
- WorkerTasks
- Event
The NewEngine function initializes a new Engine object given the list of workers and the tasks of each worker.
func NewEngine(ws []*Worker, wts WorkerTasks) (*Engine, error)
The Execute method returns a chan that receives the workers' results after each task execution.
func (eng *Engine) Execute(ctx context.Context, mode Mode) (chan Result, error)
The Execute method internally uses the ExecuteEvents method and filters the returned results based on the Mode parameter.
The Mode enum type represents the mode of execution:
- AllResults: for each task, returns the results of all the workers: success, error or canceled. Multiple success results might be returned if they happen almost at the same time.
- SuccessOrErrorResults: for each task, returns the success or error results. The canceled results are not returned. Multiple success results might be returned if they happen almost at the same time.
- ResultsUntilFirstSuccess: for each task, returns the results preceding the first success (included). At most one success is returned.
- FirstSuccessOrLastResult: for each task, returns only one result: the first success or the last result. At most one success is returned.
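The effect of each mode can be pictured on the finished results of a single task. The following is only a sketch under stated assumptions: `keptIndexes` and `sampleErrs` are hypothetical names, not part of the package; the numeric values of the `Mode` constants are assumed; and the case where `ResultsUntilFirstSuccess` sees no success at all (here: every result is kept) is a guess, since the description above does not cover it. The real engine filters a live stream, so the "almost at the same time" caveat is not modeled.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Mode mirrors the enum described above; the numeric values are assumed.
type Mode int

const (
	AllResults Mode = iota
	SuccessOrErrorResults
	ResultsUntilFirstSuccess
	FirstSuccessOrLastResult
)

// sampleErrs holds the errors of one task's results in arrival order
// (nil means success): an error, a success, then a canceled job.
var sampleErrs = []error{errors.New("boom"), nil, context.Canceled}

// keptIndexes is a hypothetical helper, not part of taskengine. It returns
// the indexes of the results that the given mode would let through.
func keptIndexes(mode Mode, errs []error) []int {
	var keep []int
	for i, err := range errs {
		switch mode {
		case AllResults:
			keep = append(keep, i)
		case SuccessOrErrorResults:
			// Drop canceled results, keep successes and errors.
			if !errors.Is(err, context.Canceled) {
				keep = append(keep, i)
			}
		case ResultsUntilFirstSuccess:
			// Keep everything up to and including the first success.
			keep = append(keep, i)
			if err == nil {
				return keep
			}
		case FirstSuccessOrLastResult:
			// Exactly one result: the first success, else the last one.
			if err == nil || i == len(errs)-1 {
				return []int{i}
			}
		}
	}
	return keep
}

func main() {
	modes := []Mode{AllResults, SuccessOrErrorResults,
		ResultsUntilFirstSuccess, FirstSuccessOrLastResult}
	for _, m := range modes {
		fmt.Println(m, keptIndexes(m, sampleErrs))
	}
}
```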
The ExecuteEvents method returns a chan that receives all the Events generated by each task execution.
For each (worker, task) pair, a Start event is emitted, followed by a final event that can be Success, Canceled or Error.
func (eng *Engine) ExecuteEvents(ctx context.Context) (chan *Event, error)
This method is useful to track the execution of the tasks: while Execute can only return the result on completion of execution, the ExecuteEvents method also returns the Start event at the beginning of execution (with a nil result).
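As an illustration of what the Start events make possible, the sketch below measures how many (worker, task) executions were running at once. It does not call the package: `Event` is a reduced local stand-in with string IDs, `fakeEvents` is a hypothetical replacement for the channel returned by ExecuteEvents, and the loop assumes the channel is closed when execution ends.

```go
package main

import (
	"errors"
	"fmt"
)

// Result mirrors the interface documented by the package.
type Result interface{ Error() error }

// Event is a reduced stand-in for the package's Event type: only the
// fields needed here, with string IDs assumed for brevity.
type Event struct {
	Result   Result // nil for a Start event
	WorkerID string
	TaskID   string
}

// res is a hypothetical Result that just wraps an error.
type res struct{ err error }

func (r res) Error() error { return r.err }

// peakInFlight drains the channel and returns the highest number of
// executions that were started but not yet finished at the same time.
func peakInFlight(events <-chan *Event) int {
	running, peak := 0, 0
	for ev := range events {
		if ev.Result == nil { // Start event
			running++
			if running > peak {
				peak = running
			}
		} else { // final event: Success, Canceled or Error
			running--
		}
	}
	return peak
}

// fakeEvents simulates two workers racing on the same task.
func fakeEvents() <-chan *Event {
	ch := make(chan *Event, 4)
	ch <- &Event{WorkerID: "w1", TaskID: "t1"}
	ch <- &Event{WorkerID: "w2", TaskID: "t1"}
	ch <- &Event{Result: res{}, WorkerID: "w1", TaskID: "t1"}
	ch <- &Event{Result: res{errors.New("failed")}, WorkerID: "w2", TaskID: "t1"}
	close(ch)
	return ch
}

func main() {
	fmt.Println("peak concurrent executions:", peakInFlight(fakeEvents()))
}
```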
A Task represents a unit of work to be executed. Each task can be assigned to one or more workers.
Two tasks are considered equivalent if they have the same TaskID.
NOTE: tasks with the same TaskID can be different objects carrying different information; this allows a task object assigned to a worker to contain information specific to that worker.
type Task interface {
	TaskID() TaskID // Unique ID of the task
}
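A minimal sketch of a type satisfying this interface may help show the NOTE above in practice. `downloadTask` and `sameTask` are hypothetical names, and the underlying type of `TaskID` is assumed to be a string here; the package may define it differently.

```go
package main

import "fmt"

// TaskID and Task mirror the declarations above. The underlying type of
// TaskID is an assumption of this sketch.
type TaskID string

type Task interface {
	TaskID() TaskID
}

// downloadTask is a hypothetical task: fetch one resource from one mirror.
type downloadTask struct {
	resource  string // identifies the unit of work
	mirrorURL string // worker-specific detail, ignored for equivalence
}

func (t downloadTask) TaskID() TaskID { return TaskID(t.resource) }

// sameTask reports whether two tasks are equivalent, i.e. share a TaskID.
func sameTask(a, b Task) bool { return a.TaskID() == b.TaskID() }

func main() {
	a := downloadTask{resource: "report.pdf", mirrorURL: "https://mirror-a.example/report.pdf"}
	b := downloadTask{resource: "report.pdf", mirrorURL: "https://mirror-b.example/report.pdf"}
	// Different objects with different worker-specific data, same task.
	fmt.Println(sameTask(a, b))
}
```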
Each Worker has a WorkFunc that performs the task.
Multiple instances of the same worker can be used to concurrently execute different tasks assigned to the worker.
type Worker struct {
	WorkerID  WorkerID // Unique ID of the worker
	Instances int      // Number of worker instances
	Work      WorkFunc // The work function
}
The WorkFunc receives as input a context, the *Worker, the instance number of the worker, and the Task, and returns an object that satisfies the Result interface.
type WorkFunc func(context.Context, *Worker, int, Task) Result
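The sketch below shows one possible WorkFunc. It redeclares the documented types locally so that it compiles on its own; `echoTask`, `echoResult`, `echoWork` and `runEcho` are hypothetical, and the string underlying types of `TaskID` and `WorkerID` are assumptions. Checking `ctx.Err()` up front is a design choice of this example, not something the package is known to require.

```go
package main

import (
	"context"
	"fmt"
)

// Local stand-ins mirroring the declarations in this document.
type (
	TaskID   string // underlying type assumed
	WorkerID string // underlying type assumed
	Task     interface{ TaskID() TaskID }
	Result   interface{ Error() error }
	WorkFunc func(context.Context, *Worker, int, Task) Result
)

type Worker struct {
	WorkerID  WorkerID
	Instances int
	Work      WorkFunc
}

// echoTask and echoResult are hypothetical types for illustration.
type echoTask string

func (t echoTask) TaskID() TaskID { return TaskID(t) }

type echoResult struct {
	msg string
	err error
}

func (r echoResult) Error() error { return r.err }

// echoWork is a sample WorkFunc: it returns the context error if the
// context is already done, and otherwise reports which worker instance
// handled which task.
func echoWork(ctx context.Context, w *Worker, inst int, t Task) Result {
	if err := ctx.Err(); err != nil {
		return echoResult{err: err}
	}
	return echoResult{msg: fmt.Sprintf("%s#%d did %s", w.WorkerID, inst, t.TaskID())}
}

// runEcho calls echoWork once, optionally with an already-canceled context.
func runEcho(cancelFirst bool) echoResult {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if cancelFirst {
		cancel()
	}
	w := &Worker{WorkerID: "w1", Instances: 1, Work: echoWork}
	return w.Work(ctx, w, 0, echoTask("t1")).(echoResult)
}

func main() {
	fmt.Println(runEcho(false).msg)
	fmt.Println(runEcho(true).err)
}
```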
The Result interface has only the Error method.
type Result interface {
	Error() error
}
The Error method is used to determine the status of the task execution based on the error returned:
- Success: if error is nil
- Canceled: if error is context.Canceled
- Error: otherwise
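The three rules above can be sketched as a small classifier. `status`, `simpleResult` and `samples` are hypothetical names; comparing with `errors.Is` rather than `==` is an assumption of this sketch, and the package may perform the check differently.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Result mirrors the interface documented above.
type Result interface{ Error() error }

// simpleResult is a hypothetical Result that just wraps an error.
type simpleResult struct{ err error }

func (r simpleResult) Error() error { return r.err }

// samples covers the three cases: success, canceled, generic error.
var samples = []error{nil, context.Canceled, errors.New("boom")}

// status classifies a result following the three rules above.
// Using errors.Is (rather than ==) is an assumption of this sketch.
func status(r Result) string {
	switch err := r.Error(); {
	case err == nil:
		return "Success"
	case errors.Is(err, context.Canceled):
		return "Canceled"
	default:
		return "Error"
	}
}

func main() {
	for _, err := range samples {
		fmt.Printf("%v -> %s\n", err, status(simpleResult{err}))
	}
}
```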
The WorkerTasks type is a map that contains the task list of each WorkerID.
type WorkerTasks map[WorkerID]Tasks
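As a rough illustration, the sketch below builds such a map for two workers that share one task. `Tasks` is assumed to be a slice of `Task`, the ID types are assumed to be strings, and `quoteTask`, `buildWorkerTasks` and `distinctTasks` are hypothetical names introduced only for this example.

```go
package main

import "fmt"

// Local stand-ins mirroring the declarations in this document.
type (
	TaskID      string // underlying type assumed
	WorkerID    string // underlying type assumed
	Task        interface{ TaskID() TaskID }
	Tasks       []Task // assumed to be a slice of Task
	WorkerTasks map[WorkerID]Tasks
)

// quoteTask is a hypothetical task: fetch a quote for a symbol from a
// worker-specific URL.
type quoteTask struct {
	symbol string
	url    string
}

func (t quoteTask) TaskID() TaskID { return TaskID(t.symbol) }

// buildWorkerTasks assigns two tasks to siteA and one to siteB. The AAPL
// task appears under both workers, each with its own URL.
func buildWorkerTasks() WorkerTasks {
	return WorkerTasks{
		"siteA": {
			quoteTask{symbol: "AAPL", url: "https://a.example/AAPL"},
			quoteTask{symbol: "MSFT", url: "https://a.example/MSFT"},
		},
		"siteB": {
			quoteTask{symbol: "AAPL", url: "https://b.example/AAPL"},
		},
	}
}

// distinctTasks counts the tasks across all workers, treating tasks that
// share a TaskID as the same task.
func distinctTasks(wts WorkerTasks) int {
	seen := map[TaskID]bool{}
	for _, ts := range wts {
		for _, t := range ts {
			seen[t.TaskID()] = true
		}
	}
	return len(seen)
}

func main() {
	wts := buildWorkerTasks()
	fmt.Println("workers:", len(wts), "distinct tasks:", distinctTasks(wts))
}
```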
The Event type contains the information needed to track a task execution.
Event objects are emitted by the engine.ExecuteEvents method.
For each (worker, task) pair, a Start event is emitted, followed by a final event that can be Success, Canceled or Error.
type Event struct {
	Result     Result // nil for Start event
	WorkerID   WorkerID
	WorkerInst int
	Task       Task
	TaskStat   TaskStat
	TimeStart  time.Time
	TimeEnd    time.Time // same as TimeStart for Start event
}
The Type method returns the EventType of the event.
const (
	EventNil EventType = iota
	EventStart
	EventSuccess
	EventError
	EventCanceled
)