Skip to content

Execute concurrently a set of tasks assigned to multiple different workers

License

Notifications You must be signed in to change notification settings

mmbros/taskengine

Repository files navigation

taskengine

Package taskengine concurrently execute a set of tasks assigned to multiple different workers.

Each worker can works all or a subset of the tasks.

When a worker is ready, the next task to execute is dynamically choosen considering the current status of the tasks so to maximize the thoughput of the tasks successfully executed.

After the first success result is found the remaining jobs for same task are cancelled.

The main types defined by the package are:

  • Engine
  • Task
  • Worker
  • WorkerTasks
  • Event

Engine

NewEngine

The NewEngine function initialize a new Engine object given the list of workers and the tasks of each worker.

func NewEngine(ws []*Worker, wts WorkerTasks) (*Engine, error)

Execute

The Execute method returns a chan that receives the workers results after each task execution.

func (eng *Engine) Execute(ctx context.Context, mode Mode) (chan Result, error)

The Execute method uses internally the ExecuteEvents method and filters the returned results based on the Mode parameter.

The Mode enum type represents the mode of execution:

  • AllResults: for each task returns the results of all the workers: success, error or canceled. Multiple success results might be returned if they happen almost at the same time.

  • SuccessOrErrorResults: for each task returns the success or error results. The canceled results are not returned. Multiple success results might be returned if they happen almost at the same time.

  • ResultsUntilFirstSuccess: for each task returns the results preceding the first success (included). At most one success is returned.

  • FirstSuccessOrLastResult: For each task returns only one result: the first success or the last result. At most one success is returned.

ExecuteEvents

The ExecuteEvents method returns a chan that receives all the Events generated by each task execution. For each (worker, task) pair, it is emitted a Start event followed by a final event that can be Success, Canceled or Error.

func (eng *Engine) ExecuteEvents(ctx context.Context) (chan *Event, error)

This method is useful to track the execution of the tasks: while Execute can only return the result on completion of execution, the ExecuteEvents method returns also the Start event at the beginning of execution (with a nil result).

Task

A Task represents a unit of work to be executed. Each task can be assigned to one or more workers. Two tasks are considered equivalent if they have the same TaskID.

NOTE: tasks with the same TaskID can be different object with different information; this allows a task object assigned to a worker to contain information specific to that worker.

type Task interface {
    TaskID() TaskID // Unique ID of the task
}

Worker

Each Worker has a WorkFunc that performs the task. Multiple instances of the same worker can be used in order to execute concurrently different tasks assign to the worker.

type Worker struct {
    WorkerID  WorkerID   // Unique ID of the worker
    Instances int        // Number of worker instances
    Work      WorkFunc   // The work function
}

The WorkFunc receives in input a context, the *Worker and the instance number of the worker and the Task, and returns an object that meets the Result interface.

type WorkFunc func(context.Context, *Worker, int, Task) Result

The Result interface has only the Error method.

type Result interface {
    Error() error
}

The Error method is used to determine the status of the task execution based on the error returned:

  • Success: if error is nil
  • Canceled: if error is context.Canceled
  • Error: otherwise

WorkerTasks

WorkerTasks type is a map that contains the tasks list of each WorkerID.

type WorkerTasks map[WorkerID]Tasks

Event

Event type contains the informations to track a task execution. Events objects are emitted by the engine.ExecuteEvents method. For each (worker, task) pair, it is emitted a Start event followeb by a final event that can be Success, Canceled or Error.

type Event struct {
    Result     Result // nil for Start event
    WorkerID   WorkerID
    WorkerInst int
    Task       Task
    TaskStat   TaskStat
    TimeStart  time.Time
    TimeEnd    time.Time // same as TimeStart for Start event
}

Type

The Type method returns the EventType of the event.

const (
    EventNil EventType = iota
    EventStart
    EventSuccess
    EventError
    EventCanceled
)

About

Execute concurrently a set of tasks assigned to multiple different workers

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages