-
Notifications
You must be signed in to change notification settings - Fork 4
/
task-manager.go
127 lines (116 loc) · 3.57 KB
/
task-manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package main
import (
"fmt"
"log"
"os"
"strings"
"sync"
"time"
"github.com/ashwanthkumar/golang-utils/maps"
"github.com/indix/marathon-logger/mesos"
)
// LogFilesToMonitor is the task label key that the task manager looks up to
// decide which log files to monitor for a task. The label value is split on
// "," into individual file names; "stdout" is passed as the fallback value
// when looking the label up.
const LogFilesToMonitor = "logs.files"
// TaskInfo describes a single task together with the log files that should
// be monitored for it. The task manager builds it from an incoming task plus
// the executor details and publishes it for the log manager.
type TaskInfo struct {
	// App is the app identifier of the task (e.g. "/group/app").
	App string

	// Labels are the labels attached to the task.
	Labels map[string]string

	// TaskID is the identifier of the task.
	TaskID string

	// Hostname is the host the task was reported on.
	Hostname string

	// CWD is the current working directory of the task on the slave.
	CWD string

	// FileNames are the names of the files that need to be monitored for logs.
	FileNames []string

	// WorkDir is the marathon-logger working directory where the symlinks
	// are set up.
	WorkDir string
}

// CleanAppName returns the app name with its leading "/" removed and every
// remaining "/" replaced by ".", e.g. "/group/app" becomes "group.app".
//
// An empty App yields an empty string, and an App without a leading "/" is
// kept intact apart from the separator replacement (blindly slicing off the
// first byte would panic on "" and eat a real character otherwise).
func (t *TaskInfo) CleanAppName() string {
	name := strings.TrimPrefix(t.App, "/")
	return strings.Replace(name, "/", ".", -1)
}
// TaskManager - Enhances the Task with FileName and CWD info
// Message Flow: App Monitor -> Task Manager -> Log Manager
//
// Start allocates AddLogs, RemoveLogs, KnownTasks, Client and the stop
// channel, so those fields are only usable after Start has been called.
type TaskManager struct {
// Incoming tasks consumed by the run loop. Start does not allocate this
// channel, so it is presumably supplied by the caller - confirm.
InputTasksChannel chan Task
// A known task whose last heartbeat is older than this interval is removed
// from KnownTasks and its ID is sent on RemoveLogs.
MaxTasksHeartBeatInterval time.Duration
// Port used when building the slave's state.json URL.
SlavePort int
// Receives a TaskInfo for every task that was not previously known and whose
// executor could be found. Created by Start.
AddLogs chan TaskInfo
// Receives the ID of every task whose heartbeat has expired. Created by Start.
RemoveLogs chan string
// Task ID -> time of the last heartbeat seen for that task. Created by Start.
KnownTasks map[string]time.Time
// Client used to query the slave state. Set by Start via mesos.NewMesosClient.
Client mesos.Mesos
// Incremented by Start and decremented by Stop.
RunWaitGroup sync.WaitGroup
// Held by the run loop while it updates KnownTasks.
TasksMutex sync.Mutex
// Closed by Stop to make the run loop exit.
stopChannel chan bool
}
// Start prepares the TaskManager and launches its event loop.
//
// It allocates the bookkeeping map, the AddLogs/RemoveLogs output channels
// and the stop channel, creates the Mesos client, registers one unit on
// RunWaitGroup and then starts run on its own goroutine. Progress is reported
// on standard output.
func (t *TaskManager) Start() {
	fmt.Println("Starting Task Manager...")

	// State owned by the event loop.
	t.KnownTasks = make(map[string]time.Time)
	t.Client = mesos.NewMesosClient()

	// Channels used to talk to the outside world and to request shutdown.
	t.AddLogs, t.RemoveLogs = make(chan TaskInfo), make(chan string)
	t.stopChannel = make(chan bool)

	t.RunWaitGroup.Add(1)
	go t.run()

	fmt.Println("Task Manager Started.")
	fmt.Printf("Task Manager - Task's MaxHeartBeatInterval is %v\n", t.MaxTasksHeartBeatInterval)
}
// Stop the TaskManager
//
// Closes the stop channel, which the run loop selects on in order to exit,
// and then marks RunWaitGroup as done.
//
// NOTE(review): Stop must be called at most once and only after Start -
// closing a nil or already-closed channel panics.
func (t *TaskManager) Stop() {
fmt.Println("Stopping Task Manager...")
close(t.stopChannel)
// Done is signalled here, not by the run goroutine, so a Wait on
// RunWaitGroup may return before run has actually exited.
t.RunWaitGroup.Done()
}
// run is the TaskManager event loop; Start launches it on its own goroutine.
//
// It reacts to three events until the stop channel is closed:
//   - every 5 seconds, tasks whose heartbeat is older than
//     MaxTasksHeartBeatInterval are expired (see expireStaleTasks);
//   - every task received on InputTasksChannel is either registered or has
//     its heartbeat refreshed (see trackTask);
//   - a closed stop channel ends the loop.
//
// The process is terminated via log.Fatalf if the local hostname cannot be
// determined.
func (t *TaskManager) run() {
	hostname, err := os.Hostname()
	if err != nil {
		log.Fatalf("Error - %v\n", err)
	}
	log.Printf("Looking for tasks on %v", hostname)

	// One ticker for the whole loop. Calling time.After inside the select
	// would re-arm the 5s timer on every iteration, so a steady stream of
	// incoming tasks (more frequent than 5s) would postpone the sweep forever
	// and stale tasks would never be removed.
	sweep := time.NewTicker(5 * time.Second)
	defer sweep.Stop()

	for {
		select {
		case <-sweep.C:
			t.expireStaleTasks()
		case task := <-t.InputTasksChannel:
			t.trackTask(task, hostname)
		case <-t.stopChannel:
			return
		}
	}
}

// expireStaleTasks sends the ID of every task whose last heartbeat is older
// than MaxTasksHeartBeatInterval on RemoveLogs and forgets the task.
func (t *TaskManager) expireStaleTasks() {
	// Collect the stale IDs under the lock so the map is never read while it
	// may be written; the (blocking) channel sends happen outside of it.
	t.TasksMutex.Lock()
	var stale []string
	for taskID, lastHeartbeat := range t.KnownTasks {
		if time.Since(lastHeartbeat) > t.MaxTasksHeartBeatInterval {
			stale = append(stale, taskID)
		}
	}
	t.TasksMutex.Unlock()

	for _, taskID := range stale {
		t.RemoveLogs <- taskID
		t.TasksMutex.Lock()
		delete(t.KnownTasks, taskID)
		t.TasksMutex.Unlock()
	}
}

// trackTask refreshes the heartbeat of an already known task, or - for a new
// task - looks up its executor in the slave state reachable via hostname and
// publishes the resulting TaskInfo on AddLogs.
//
// A task whose slave state or executor cannot be obtained is not recorded, so
// the lookup is attempted again the next time the task is received.
func (t *TaskManager) trackTask(task Task, hostname string) {
	// TODO: comparing task.Hostname with the local hostname would be an
	// optimisation to avoid HTTP calls to the mesos slave. Hostnames are a
	// little messed up in the current setup, so the check is disabled for now.
	t.TasksMutex.Lock()
	defer t.TasksMutex.Unlock()

	if _, known := t.KnownTasks[task.TaskID]; known {
		// Already present - update the clock
		t.KnownTasks[task.TaskID] = time.Now()
		return
	}

	log.Printf("TaskID %s is not monitored, sending it to LogManager", task.TaskID)
	slaveState, err := t.Client.SlaveState(fmt.Sprintf("http://%s:%d/state.json", hostname, t.SlavePort))
	if err != nil {
		// Without a slave state there is nothing to search for the executor in.
		log.Printf("[WARN] Couldn't fetch the slave state for the task %s - %v", task.TaskID, err)
		return
	}

	executor := slaveState.FindExecutor(task.TaskID)
	if executor == nil {
		log.Printf("[WARN] Couldn't find the executor that spun up the task %s", task.TaskID)
		return
	}

	logFiles := strings.Split(maps.GetString(task.Labels, LogFilesToMonitor, "stdout"), ",")
	t.KnownTasks[task.TaskID] = time.Now()
	t.AddLogs <- TaskInfo{
		App:       task.App,
		Hostname:  task.Hostname,
		Labels:    task.Labels,
		TaskID:    task.TaskID,
		CWD:       executor.Directory,
		FileNames: logFiles,
	}
}