A distributed task runner.
go get github.com/Tapfury/cogman
cd cogman
./build.sh
- Go
- RabbitMQ
- Redis
- MongoDB (optional)
- Task Priority
- Persistence
- Queue type
- Retries
- Multiple consumer & server
- Concurrency
- Redis and Mongo log
- Re-enqueue recovered task
- Handle reconnection
- UI
- Rest API
Cogman api config example.
cfg := &config.Config{
ConnectionTimeout: time.Minute * 10, // default value 10 minutes
RequestTimeout : time.Second * 5, // default value 5 second
AmqpURI : "amqp://localhost:5672", // required
RedisURI: "redis://localhost:6379/0", // required
MongoURI: "mongodb://root:secret@localhost:27017/", // optional
RedisTTL: time.Hour * 24 * 7, // default value 1 week
MongoTTL: time.Hour * 24 * 30, // default value 1 month
HighPriorityQueueCount: 2, // default value 1
LowPriorityQueueCount : 4, // default value 1
}
Client & Server have individual config file to use them separately.
This Congman api call will start a client and a server.
if err := cogman.StartBackground(cfg); err != nil {
log.Fatal(err)
}
Initiate Client & Server individually:
// Client
clnt, err := cogman.NewSession(cfg)
if err != nil {
log.Fatal(err)
}
if err := clnt.Connect(); err != nil {
log.Fatal(err)
}
// Server
srvr, err := cogman.NewServer(cfg)
if err != nil {
log.Fatal(err)
}
go func() {
defer srvr.Stop()
if err = srvr.Start(); err != nil {
log.Fatal(err)
}
}()
Tasks are grouped by two priority level. Based on that it will be assigned to a queue.
type Task struct {
TaskID string // unique. ID should be assigned by Cogman.
Name string // required. And Task name must be registered with a task handler
OriginalTaskID string // a retry task will carry it's parents ID.
PrimaryKey string // optional. Client can set any key to trace a task.
Retry int // default value 0.
Prefetch int // optional. Number of task fetch from queue by consumer at a time.
Payload []byte // required
Priority TaskPriority // required. High or Low
Status Status // current task status
FailError string // default empty. If Status is failed, it must have a value.
Duration *float64 // task execution time.
CreatedAt time.Time // create time.
UpdatedAt time.Time // last update time.
}
Any struct can be passed as a handler it implement below interface
:
type Handler interface {
Do(ctx context.Context, payload []byte) error
}
A function type HandlerFunc
also can pass as handler.
type HandlerFunc func(ctx context.Context, payload []byte) error
func (h HandlerFunc) Do(ctx context.Context, payload []byte) error {
return h(ctx, payload)
}
Sending task using Cogman API:
if err := cogman.SendTask(*task, handler); err != nil {
log.Fatal(err)
}
// If a task handler already registered for the task, task handler wont required further.
if err := cogman.SendTask(*task, nil); err != nil {
log.Fatal(err)
}
Sending task using Cogman Client/Server:
// Register task handler from Server side
server.Register(taskName, handler)
server.Register(taskName, handlerFunc)
// Sending task from client
if err := client.SendTask(task); err != nil {
return err
}
Cogman queue type:
- High_Priority_Queue [default Queue]
- Low_Priority_Queue [lazy Queue]
Two type queue Cogman maintain. Default & Lazy queue. All the high priority task will be push to default queue and low priority task will be push to lazy queue. The number of each type of queue can be set by client/server. And queue won't be lost after any sort of connection interruption.
Cogman Client & Server both handler reconnection. If Client loss connection, it can still take task and those will be processed immediate after Cogman client get back the connection. After Server reconnect, it will start to consume task without losing any task.
Re-enqueue feature to recover all the initiated task those are lost for connection error. If client somehow lost the amqp connection, Cogman can still take the task in offline. All offline task will be re-queue after connection re-established. Cogman fetch all the offline task from mongo logs, and re-initiate them. Mongo connection required here. For re-enqueuing, task retry count wont be changed.
Comparison among the other job/task process runner.
Feature | Cogman | Machinery |
---|---|---|
Backend | redis/mongo | redis |
Priorities | ✓ | ✓ |
Re-Enqueue | ✓ | |
Concurrency | ✓ | ✓ |
Re-Connection | ✓ | |
Delayed jobs | ✓ | |
Concurrent client/server | ✓ | |
Re-try | ✓ | ✓ |
Persistence | ✓ | ✓ |
UI | ||
Rest API | ✓ | |
Chain | ✓ | |
Chords | ✓ | |
Groups | ✓ |