forked from krakend/krakend-ce
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathexecutor.go
148 lines (127 loc) · 4.49 KB
/
executor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package krakend
import (
"context"
"fmt"
"io"
"os"
krakendbf "github.com/devopsfaith/bloomfilter/krakend"
cel "github.com/devopsfaith/krakend-cel"
"github.com/devopsfaith/krakend-cobra"
gelf "github.com/devopsfaith/krakend-gelf"
"github.com/devopsfaith/krakend-gologging"
"github.com/devopsfaith/krakend-jose"
logstash "github.com/devopsfaith/krakend-logstash"
metrics "github.com/devopsfaith/krakend-metrics/gin"
"github.com/devopsfaith/krakend-opencensus"
_ "github.com/devopsfaith/krakend-opencensus/exporter/influxdb"
_ "github.com/devopsfaith/krakend-opencensus/exporter/jaeger"
_ "github.com/devopsfaith/krakend-opencensus/exporter/prometheus"
_ "github.com/devopsfaith/krakend-opencensus/exporter/stackdriver"
_ "github.com/devopsfaith/krakend-opencensus/exporter/xray"
_ "github.com/devopsfaith/krakend-opencensus/exporter/zipkin"
pubsub "github.com/devopsfaith/krakend-pubsub"
"github.com/devopsfaith/krakend-usage/client"
"github.com/devopsfaith/krakend/config"
"github.com/devopsfaith/krakend/logging"
krakendrouter "github.com/devopsfaith/krakend/router"
router "github.com/devopsfaith/krakend/router/gin"
"github.com/gin-gonic/gin"
"github.com/go-contrib/uuid"
"github.com/letgoapp/krakend-influx"
)
// NewExecutor returns the cmd.Executor that boots the gateway for a parsed
// service configuration. The returned function builds the logger, starts the
// usage reporter, registers the metrics, tracing and token-rejection
// components, and finally runs the gin-based router.
//
// ctx is handed to every component created here. The returned function comes
// back once the router's Run call returns, or early when no logger at all can
// be created (in which case the errors are written to stderr).
func NewExecutor(ctx context.Context) cmd.Executor {
	return func(cfg config.ServiceConfig) {
		// When a GELF writer can be built, log through it and reduce the
		// format pattern to the bare message for that writer only.
		var writers []io.Writer
		gelfWriter, gelfErr := gelf.NewWriter(cfg.ExtraConfig)
		if gelfErr == nil {
			writers = append(writers, gelfWriterWrapper{gelfWriter})
			gologging.SetFormatterSelector(func(w io.Writer) string {
				if _, isGelf := w.(gelfWriterWrapper); isGelf {
					return "%{message}"
				}
				return gologging.DefaultPattern
			})
		}

		// Logger preference: logstash, then gologging (fed with any GELF
		// writer), then a plain DEBUG logger on stdout.
		logger, loggerErr := logstash.NewLogger(cfg.ExtraConfig)
		if loggerErr != nil {
			logger, loggerErr = gologging.NewLogger(cfg.ExtraConfig, writers...)
		}
		if loggerErr != nil {
			fallback, err := logging.NewLogger("DEBUG", os.Stdout, "")
			if err != nil {
				// No logger exists to report through, so use stderr instead
				// of exiting without any diagnostic.
				fmt.Fprintln(os.Stderr, "unable to create the fallback logger:", err.Error(),
					"- previous logger error:", loggerErr.Error())
				return
			}
			logger = fallback
			logger.Error("unable to create the gologging logger:", loggerErr.Error())
		}

		// Reported only now because no logger existed when gelfErr was produced.
		if gelfErr != nil {
			logger.Error("unable to create the GELF writer:", gelfErr.Error())
		}

		logger.Info("Listening on port:", cfg.Port)

		startReporter(ctx, logger, cfg)

		reg := RegisterSubscriberFactories(ctx, cfg, logger)

		// create the metrics collector
		metricCollector := metrics.New(ctx, cfg.ExtraConfig, logger)

		// Failures in the optional integrations below are logged as warnings
		// and do not stop the start-up.
		if err := influxdb.New(ctx, cfg.ExtraConfig, metricCollector, logger); err != nil {
			logger.Warning(err.Error())
		}

		if err := opencensus.Register(ctx, cfg, append(opencensus.DefaultViews, pubsub.OpenCensusViews...)...); err != nil {
			logger.Warning("opencensus:", err.Error())
		}

		// NOTE(review): rejecter is used below even when Register reports an
		// error; this presumes Register always returns a usable value.
		// Confirm against the bloomfilter package.
		rejecter, err := krakendbf.Register(ctx, "krakend-bf", cfg, logger, reg)
		if err != nil {
			logger.Warning("bloomFilter:", err.Error())
		}

		// Chain the bloom filter rejecter with the per-endpoint CEL rejecter,
		// using a fixed `false` rejecter when no CEL rejecter is produced.
		tokenRejecterFactory := jose.ChainedRejecterFactory([]jose.RejecterFactory{
			jose.RejecterFactoryFunc(func(_ logging.Logger, _ *config.EndpointConfig) jose.Rejecter {
				return jose.RejecterFunc(rejecter.RejectToken)
			}),
			jose.RejecterFactoryFunc(func(l logging.Logger, endpointCfg *config.EndpointConfig) jose.Rejecter {
				if r := cel.NewRejecter(l, endpointCfg); r != nil {
					return r
				}
				return jose.FixedRejecter(false)
			}),
		})

		// setup the krakend router
		routerFactory := router.NewFactory(router.Config{
			Engine:         NewEngine(cfg, logger),
			ProxyFactory:   NewProxyFactory(logger, NewBackendFactoryWithContext(ctx, logger, metricCollector), metricCollector),
			Middlewares:    []gin.HandlerFunc{},
			Logger:         logger,
			HandlerFactory: NewHandlerFactory(logger, metricCollector, tokenRejecterFactory),
			RunServer:      krakendrouter.RunServer,
		})

		// start the engines
		routerFactory.NewWithContext(ctx).Run(cfg)
	}
}
// usageDisable is the name of the environment variable that, when set to "1",
// makes startReporter skip the usage report client.
const usageDisable = "USAGE_DISABLE"
// startReporter launches the usage report client in a background goroutine.
//
// Nothing is started when the environment variable named by usageDisable is
// set to "1", or when the service configuration cannot be hashed into a
// cluster ID; both cases are logged. A failure returned by the client itself
// is logged as a warning from within the goroutine.
func startReporter(ctx context.Context, logger logging.Logger, cfg config.ServiceConfig) {
	if disabled := os.Getenv(usageDisable) == "1"; disabled {
		logger.Info("usage report client disabled")
		return
	}

	// The hash of the configuration doubles as the cluster identifier.
	clusterID, hashErr := cfg.Hash()
	if hashErr != nil {
		logger.Warning("unable to hash the service configuration:", hashErr.Error())
		return
	}

	report := func() {
		// Each call to startReporter gets its own freshly generated server ID.
		opts := client.Options{
			ClusterID: clusterID,
			ServerID:  uuid.NewV4().String(),
		}

		logger.Info(fmt.Sprintf("registering usage stats for cluster ID '%s'", clusterID))

		reportErr := client.StartReporter(ctx, opts)
		if reportErr == nil {
			return
		}
		logger.Warning("unable to create the usage report client:", reportErr.Error())
	}

	go report()
}
// gelfWriterWrapper gives the GELF writer its own named type. Embedding the
// io.Writer keeps Write available on the wrapper, while the distinct type lets
// the formatter selector installed by NewExecutor recognise this writer and
// give it the bare "%{message}" pattern.
type gelfWriterWrapper struct {
	io.Writer
}