-
Notifications
You must be signed in to change notification settings - Fork 63
/
Copy pathpipeline_runner.go
106 lines (86 loc) · 2.28 KB
/
pipeline_runner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package main
import (
"log"
"sync/atomic"
)
// Message is the record carried through the pipeline inside a PipelinePack.
type Message struct {
	// Tag labels the message.
	// NOTE(review): presumably the router matches this against the tags
	// registered for outputs; confirm against Router.
	Tag string

	// Timestamp is an integer time value; the unit is not established in
	// this file (TODO confirm with the input plugins that set it).
	Timestamp int64

	// Data holds the message fields, keyed by field name.
	Data map[string]interface{}
}
// PipelinePack is the reusable container that carries one Message through the
// pipeline. Packs are reference counted and handed back on RecycleChan by
// Recycle once the count drops to zero.
type PipelinePack struct {
	// MsgBytes is a byte buffer owned by the pack; NewPipelinePack allocates
	// it with 100 bytes.
	MsgBytes []byte

	// Msg is the message currently held by the pack.
	Msg Message

	// RecycleChan is the channel the pack is sent on by Recycle when its
	// reference count reaches zero.
	RecycleChan chan *PipelinePack

	// RefCount is the number of outstanding references. Recycle decrements
	// it with sync/atomic; a fresh or zeroed pack starts at 1.
	RefCount int32
}
// NewPipelinePack builds a pack that returns itself on recycleChan when it is
// recycled. The pack starts with a 100-byte MsgBytes buffer, an empty Data
// map on its message, and a reference count of 1.
func NewPipelinePack(recycleChan chan *PipelinePack) (pack *PipelinePack) {
	pack = new(PipelinePack)
	pack.MsgBytes = make([]byte, 100)
	pack.Msg.Data = make(map[string]interface{})
	pack.RecycleChan = recycleChan
	pack.RefCount = 1
	return pack
}
// Zero resets the pack so it can be reused for a new message: MsgBytes is
// re-extended to its full capacity, the message is replaced with an empty one,
// and the reference count goes back to 1.
func (p *PipelinePack) Zero() {
	// Only the length is restored; the bytes already in the buffer are left
	// untouched.
	p.MsgBytes = p.MsgBytes[:cap(p.MsgBytes)]

	// Replace the whole message rather than just Data, so the Tag and
	// Timestamp of the previous message cannot leak into the next use of
	// this pack.
	p.Msg = Message{Data: make(map[string]interface{})}

	p.RefCount = 1
}
// Recycle releases one reference to the pack. The count is decremented
// atomically; the caller that brings it to exactly zero resets the pack with
// Zero and sends it on RecycleChan. Any other resulting count (including a
// negative one) leaves the pack alone.
func (p *PipelinePack) Recycle() {
	if atomic.AddInt32(&p.RefCount, -1) != 0 {
		return
	}
	p.Zero()
	// This send blocks until RecycleChan has room or a receiver is ready.
	p.RecycleChan <- p
}
// PipelineConfig holds everything Run needs to assemble a pipeline.
type PipelineConfig struct {
	// Gc is the global configuration; Run reads Gc.PoolSize from it.
	Gc *GlobalConfig

	// InputRunners holds one entry per "source" element found by LoadConfig.
	// Each entry is that element's attribute set, which Run asserts to be a
	// map[string]string.
	InputRunners []interface{}

	// OutputRunners holds one entry per "match" element found by LoadConfig,
	// in the same form as InputRunners plus a "tag" key.
	OutputRunners []interface{}

	// router connects the input side to the outputs; it is initialised by
	// NewPipeLineConfig.
	router Router
}
// NewPipeLineConfig returns a PipelineConfig bound to gc with its router
// initialised. The input and output lists start empty.
func NewPipeLineConfig(gc *GlobalConfig) *PipelineConfig {
	config := &PipelineConfig{Gc: gc}
	config.router.Init()
	return config
}
// LoadConfig parses the configuration file at path and records its top-level
// elements: the attributes of every "source" element are appended to
// InputRunners and those of every "match" element to OutputRunners. Elements
// with any other name are ignored.
//
// It returns the error reported by ParseConfig, if any; in that case the
// receiver is left unmodified.
func (c *PipelineConfig) LoadConfig(path string) error {
	configure, err := ParseConfig(nil, path)
	if err != nil {
		// Previously this error was discarded and the (likely nil) result
		// dereferenced below.
		return err
	}

	for _, v := range configure.Root.Elems {
		switch v.Name {
		case "source":
			c.InputRunners = append(c.InputRunners, v.Attrs)
		case "match":
			// Store the element's argument under "tag" so Run can register
			// the output with the router via cf["tag"].
			v.Attrs["tag"] = v.Args
			c.OutputRunners = append(c.OutputRunners, v.Attrs)
		}
	}
	return nil
}
// Run assembles the pipeline described by config and then calls the router's
// Loop method.
//
// One shared channel, registered with the router via AddInChan, is handed to
// every input runner. For each configured input, a private recycle channel is
// filled with Gc.PoolSize fresh packs and an input runner is started in its
// own goroutine. For each configured output, an output runner is created, its
// InChan is registered with the router under the entry's "tag", and the runner
// is started in its own goroutine.
//
// Run panics if an entry of InputRunners or OutputRunners is not a
// map[string]string.
func Run(config *PipelineConfig) {
	log.Println("Starting gofluent...")

	poolSize := config.Gc.PoolSize

	routerIn := make(chan *PipelinePack, poolSize)
	config.router.AddInChan(routerIn)

	for _, raw := range config.InputRunners {
		attrs := raw.(map[string]string)

		// Every input owns its own pool of packs, pre-filled to capacity.
		recycle := make(chan *PipelinePack, poolSize)
		for remaining := poolSize; remaining > 0; remaining-- {
			recycle <- NewPipelinePack(recycle)
		}

		runner := NewInputRunner(recycle, routerIn)
		go runner.Start(attrs)
	}

	for _, raw := range config.OutputRunners {
		attrs := raw.(map[string]string)

		runner := NewOutputRunner(make(chan *PipelinePack, poolSize))
		config.router.AddOutChan(attrs["tag"], runner.InChan())
		go runner.Start(attrs)
	}

	config.router.Loop()
}