forked from skolodyazhnyy/amqp-cgi-bridge
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
125 lines (102 loc) · 2.42 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package main
import (
"context"
"flag"
"fmt"
"github.com/skolodyazhnyy/amqp-cgi-bridge/bridge"
"github.com/skolodyazhnyy/go-common/log"
"gopkg.in/yaml.v2"
"io/ioutil"
"os"
"os/signal"
"time"
)
var version = "unknown"
var commit = "unknown"
var config struct {
AMQPURL string `yaml:"amqp_url"`
Consumers []struct {
Queue string
Prefetch *int
Parallelism int
FailureTimeout time.Duration
Env map[string]string
FastCGI struct {
Net string
Addr string
ScriptName string `yaml:"script_name"`
}
}
}
func load(filename string, v interface{}) error {
data, err := ioutil.ReadFile(filename)
if err != nil {
return err
}
return yaml.Unmarshal(data, v)
}
func main() {
// parse flags
filename := flag.String("config", "config.yml", "Configuration")
logfmt := flag.String("log", "text", "Log format: json or text")
printVersion := flag.Bool("v", false, "Print version")
flag.Parse()
if *printVersion {
fmt.Println("Version", version)
fmt.Println("Commit", commit)
os.Exit(0)
}
logger := log.New(*logfmt, os.Stdout, log.DefaultTextFormat).With(log.R{
"app": "amqp-cgi-bridge",
"version": version,
})
if err := load(*filename, &config); err != nil {
logger.Fatal(err)
}
ctx := context.Background()
var queues []bridge.Queue
for _, c := range config.Consumers {
if c.FastCGI.Net == "" {
c.FastCGI.Net = "tcp"
}
if c.FastCGI.Addr == "" {
c.FastCGI.Addr = "127.0.0.1:9000"
}
if c.FastCGI.ScriptName == "" {
c.FastCGI.ScriptName = "index.php"
}
p := bridge.NewFastCGIProcessor(
c.FastCGI.Net,
c.FastCGI.Addr,
c.FastCGI.ScriptName,
logger.Channel("fastcgi").With(log.R{
"script_name": c.FastCGI.ScriptName,
}),
)
if c.Env != nil {
p = bridge.ProcessorWithEnv(p, c.Env)
}
if c.Parallelism <= 0 {
c.Parallelism = 1
}
if c.Prefetch == nil {
c.Prefetch = &c.Parallelism
}
if c.FailureTimeout == 0 {
c.FailureTimeout = 10 * time.Second
}
queues = append(queues, bridge.Queue{
Name: c.Queue,
Prefetch: *c.Prefetch,
Parallelism: c.Parallelism,
FailureTimeout: c.FailureTimeout,
Processor: p,
})
}
cons := bridge.NewAMQPConsumer(ctx, config.AMQPURL, queues, logger.Channel("amqp"))
signals := make(chan os.Signal)
signal.Notify(signals, os.Interrupt, os.Kill)
s := <-signals
logger.Infof("Signal %v received, stopping...", s)
cons.Stop()
}