@@ -8,29 +8,55 @@ import (
8
8
"sync/atomic"
9
9
"time"
10
10
11
+ "go.uber.org/fx"
11
12
"go.uber.org/zap"
12
13
13
- "github.com/go-kit/kit/metrics "
14
+ "github.com/prometheus/client_golang/prometheus "
14
15
uuid "github.com/satori/go.uuid"
15
16
16
17
"github.com/xmidt-org/sallust"
17
18
"github.com/xmidt-org/webpa-common/v2/adapter"
18
19
"github.com/xmidt-org/wrp-go/v3"
19
20
)
20
21
22
+ type ServerHandlerIn struct {
23
+ fx.In
24
+ Logger * zap.Logger
25
+ Telemetry * HandlerTelemetry
26
+ }
27
+
28
+ type ServerHandlerOut struct {
29
+ fx.Out
30
+ Handler * ServerHandler
31
+ }
32
+
21
33
// Below is the struct that will implement our ServeHTTP method
22
34
type ServerHandler struct {
23
- * zap.Logger
24
- caduceusHandler RequestHandler
25
- errorRequests metrics.Counter
26
- emptyRequests metrics.Counter
27
- invalidCount metrics.Counter
28
- incomingQueueDepthMetric metrics.Gauge
29
- modifiedWRPCount metrics.Counter
30
- incomingQueueDepth int64
31
- maxOutstanding int64
32
- incomingQueueLatency metrics.Histogram
33
- now func () time.Time
35
+ log * zap.Logger
36
+ // caduceusHandler RequestHandler
37
+ telemetry * HandlerTelemetry
38
+ incomingQueueDepth int64
39
+ maxOutstanding int64
40
+
41
+ now func () time.Time
42
+ }
43
+
44
+ type HandlerTelemetryIn struct {
45
+ fx.In
46
+ ErrorRequests prometheus.Counter `name:"error_request_body_counter"`
47
+ EmptyRequests prometheus.Counter `name:"empty_request_boyd_counter"`
48
+ InvalidCount prometheus.Counter `name:"drops_due_to_invalid_payload"`
49
+ IncomingQueueDepthMetric prometheus.Gauge `name:"incoming_queue_depth"`
50
+ ModifiedWRPCount prometheus.CounterVec `name:"modified_wrp_count"`
51
+ IncomingQueueLatency prometheus.HistogramVec `name:"incoming_queue_latency_histogram_seconds"`
52
+ }
53
+ type HandlerTelemetry struct {
54
+ errorRequests prometheus.Counter
55
+ emptyRequests prometheus.Counter
56
+ invalidCount prometheus.Counter
57
+ incomingQueueDepthMetric prometheus.Gauge
58
+ modifiedWRPCount prometheus.CounterVec
59
+ incomingQueueLatency prometheus.HistogramVec
34
60
}
35
61
36
62
func (sh * ServerHandler ) ServeHTTP (response http.ResponseWriter , request * http.Request ) {
@@ -42,7 +68,7 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R
42
68
43
69
logger := sallust .Get (request .Context ())
44
70
if logger == adapter .DefaultLogger ().Logger {
45
- logger = sh .Logger
71
+ logger = sh .log
46
72
}
47
73
48
74
logger .Info ("Receiving incoming request..." )
@@ -66,19 +92,19 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R
66
92
return
67
93
}
68
94
69
- sh .incomingQueueDepthMetric .Add (1.0 )
70
- defer sh .incomingQueueDepthMetric .Add (- 1.0 )
95
+ sh .telemetry . incomingQueueDepthMetric .Add (1.0 )
96
+ defer sh .telemetry . incomingQueueDepthMetric .Add (- 1.0 )
71
97
72
98
payload , err := io .ReadAll (request .Body )
73
99
if err != nil {
74
- sh .errorRequests .Add (1.0 )
100
+ sh .telemetry . errorRequests .Add (1.0 )
75
101
logger .Error ("Unable to retrieve the request body." , zap .Error (err ))
76
102
response .WriteHeader (http .StatusBadRequest )
77
103
return
78
104
}
79
105
80
106
if len (payload ) == 0 {
81
- sh .emptyRequests .Add (1.0 )
107
+ sh .telemetry . emptyRequests .Add (1.0 )
82
108
logger .Error ("Empty payload." )
83
109
response .WriteHeader (http .StatusBadRequest )
84
110
response .Write ([]byte ("Empty payload.\n " ))
@@ -91,7 +117,7 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R
91
117
err = decoder .Decode (msg )
92
118
if err != nil || msg .MessageType () != 4 {
93
119
// return a 400
94
- sh .invalidCount .Add (1.0 )
120
+ sh .telemetry . invalidCount .Add (1.0 )
95
121
response .WriteHeader (http .StatusBadRequest )
96
122
if err != nil {
97
123
response .Write ([]byte ("Invalid payload format.\n " ))
@@ -106,15 +132,15 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R
106
132
err = wrp .UTF8 (msg )
107
133
if err != nil {
108
134
// return a 400
109
- sh .invalidCount .Add (1.0 )
135
+ sh .telemetry . invalidCount .Add (1.0 )
110
136
response .WriteHeader (http .StatusBadRequest )
111
137
response .Write ([]byte ("Strings must be UTF-8.\n " ))
112
138
logger .Debug ("Strings must be UTF-8." )
113
139
return
114
140
}
115
141
eventType = msg .FindEventStringSubMatch ()
116
142
117
- sh .caduceusHandler .HandleRequest (0 , sh .fixWrp (msg ))
143
+ // sh.caduceusHandler.HandleRequest(0, sh.fixWrp(msg))
118
144
119
145
// return a 202
120
146
response .WriteHeader (http .StatusAccepted )
@@ -125,7 +151,7 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R
125
151
126
152
func (sh * ServerHandler ) recordQueueLatencyToHistogram (startTime time.Time , eventType string ) {
127
153
endTime := sh .now ()
128
- sh .incomingQueueLatency .With ("event" , eventType ).Observe (endTime .Sub (startTime ).Seconds ())
154
+ sh .telemetry . incomingQueueLatency .With (prometheus. Labels { "event" : eventType } ).Observe (float64 ( endTime .Sub (startTime ).Seconds () ))
129
155
}
130
156
131
157
func (sh * ServerHandler ) fixWrp (msg * wrp.Message ) * wrp.Message {
@@ -134,13 +160,13 @@ func (sh *ServerHandler) fixWrp(msg *wrp.Message) *wrp.Message {
134
160
135
161
// Default to "application/json" if there is no content type, otherwise
136
162
// use the one the source specified.
137
- if "" == msg . ContentType {
163
+ if msg . ContentType == "" {
138
164
msg .ContentType = wrp .MimeTypeJson
139
165
reason = emptyContentTypeReason
140
166
}
141
167
142
168
// Ensure there is a transaction id even if we make one up
143
- if "" == msg . TransactionUUID {
169
+ if msg . TransactionUUID == "" {
144
170
msg .TransactionUUID = uuid .NewV4 ().String ()
145
171
if reason == "" {
146
172
reason = emptyUUIDReason
@@ -150,8 +176,40 @@ func (sh *ServerHandler) fixWrp(msg *wrp.Message) *wrp.Message {
150
176
}
151
177
152
178
if reason != "" {
153
- sh .modifiedWRPCount .With ("reason" , reason ).Add (1.0 )
179
+ sh .telemetry . modifiedWRPCount .With (prometheus. Labels { "reason" : reason } ).Add (1.0 )
154
180
}
155
181
156
182
return msg
157
183
}
184
+
185
+ var HandlerModule = fx .Module ("server" ,
186
+ fx .Provide (
187
+ func (in HandlerTelemetryIn ) * HandlerTelemetry {
188
+ return & HandlerTelemetry {
189
+ errorRequests : in .ErrorRequests ,
190
+ emptyRequests : in .EmptyRequests ,
191
+ invalidCount : in .InvalidCount ,
192
+ incomingQueueDepthMetric : in .IncomingQueueDepthMetric ,
193
+ modifiedWRPCount : in .ModifiedWRPCount ,
194
+ incomingQueueLatency : in .IncomingQueueLatency ,
195
+ }
196
+ }),
197
+ fx .Provide (
198
+ func (in ServerHandlerIn ) (ServerHandlerOut , error ) {
199
+ //Hard coding maxOutstanding and incomingQueueDepth for now
200
+ handler , err := New (in .Logger , in .Telemetry , 0.0 , 0.0 )
201
+ return ServerHandlerOut {
202
+ Handler : handler ,
203
+ }, err
204
+ },
205
+ ),
206
+ )
207
+
208
+ func New (log * zap.Logger , t * HandlerTelemetry , maxOutstanding , incomingQueueDepth int64 ) (* ServerHandler , error ) {
209
+ return & ServerHandler {
210
+ log : log ,
211
+ telemetry : t ,
212
+ maxOutstanding : maxOutstanding ,
213
+ incomingQueueDepth : incomingQueueDepth ,
214
+ }, nil
215
+ }
0 commit comments