-
Notifications
You must be signed in to change notification settings - Fork 63
/
Copy pathrouter.go
58 lines (48 loc) · 1.05 KB
/
router.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package main
import (
"log"
"regexp"
"sync/atomic"
)
type Router struct {
inChan chan *PipelinePack
outChan map[*regexp.Regexp]chan *PipelinePack
}
func (self *Router) Init() {
self.outChan = make(map[*regexp.Regexp]chan *PipelinePack)
}
func (self *Router) AddOutChan(matchtag string, outChan chan *PipelinePack) error {
chunk, err := BuildRegexpFromGlobPattern(matchtag)
if err != nil {
return err
}
re, err := regexp.Compile(chunk)
if err != nil {
return err
}
self.outChan[re] = outChan
return nil
}
func (self *Router) AddInChan(inChan chan *PipelinePack) {
self.inChan = inChan
}
func (self *Router) Loop() {
for pack := range self.inChan {
for re, outChan := range self.outChan {
flag := re.MatchString(pack.Msg.Tag)
if flag == true {
atomic.AddInt32(&pack.RefCount, 1)
select {
case outChan <- pack:
default:
{
log.Println("outChan fulled, tag=", pack.Msg.Tag)
<-outChan
outChan <- pack
}
}
}
}
pack.Recycle()
}
}