forked from influxdata/telegraf
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhandler.go
135 lines (114 loc) · 3.03 KB
/
handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package zipkin
import (
"compress/gzip"
"io/ioutil"
"net/http"
"strings"
"sync"
"github.com/apache/thrift/lib/go/thrift"
"github.com/gorilla/mux"
"github.com/openzipkin/zipkin-go-opentracing/_thrift/gen-go/zipkincore"
)
// SpanHandler is an implementation of a Handler which accepts zipkin thrift
// span data and sends it to the recorder
type SpanHandler struct {
// Path is the URL path that Register mounts the span endpoint on.
Path string
// recorder receives every decoded trace (Record) and every error hit while
// serving a request (Error). It is assigned by Register.
recorder Recorder
// waitGroup is not read or written anywhere in the visible part of this file.
// NOTE(review): possibly vestigial — confirm against the rest of the package
// before removing.
waitGroup *sync.WaitGroup
}
// NewSpanHandler builds a SpanHandler that will serve spans at path. Only Path
// is populated here; the recorder stays unset until Register is called.
func NewSpanHandler(path string) *SpanHandler {
	h := new(SpanHandler)
	h.Path = path
	return h
}
// cors wraps next with permissive CORS handling.
//
// Every response carries "Vary: Origin": the CORS headers below are emitted
// only when the request has an Origin header and echo its value, so a shared
// cache must not reuse a response generated for one origin (or for no origin)
// when answering a request from another.
//
// When the request has a non-empty Origin header, that origin is echoed back
// in Access-Control-Allow-Origin, together with the allowed methods (OPTIONS,
// POST), the allowed request headers and the exposed response headers.
//
// OPTIONS requests are answered here without invoking next; no status is
// written explicitly, so net/http replies with its default 200 OK. Every
// other request is passed through to next.
func cors(next http.HandlerFunc) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		header := w.Header()

		// Add (not Set) so a Vary value installed by an outer middleware
		// is preserved.
		header.Add(`Vary`, `Origin`)

		if origin := r.Header.Get("Origin"); origin != "" {
			header.Set(`Access-Control-Allow-Origin`, origin)
			header.Set(`Access-Control-Allow-Methods`, strings.Join([]string{
				`OPTIONS`,
				`POST`,
			}, ", "))
			header.Set(`Access-Control-Allow-Headers`, strings.Join([]string{
				`Accept`,
				`Accept-Encoding`,
				`Content-Length`,
				`Content-Type`,
			}, ", "))
			header.Set(`Access-Control-Expose-Headers`, strings.Join([]string{
				`Date`,
			}, ", "))
		}

		// A preflight is fully answered by the headers above.
		if r.Method == "OPTIONS" {
			return
		}

		next.ServeHTTP(w, r)
	}
}
// Register implements the Service interface. It mounts the CORS-wrapped Spans
// handler on router at s.Path for POST and OPTIONS requests, then stores
// recorder as the destination for decoded traces and request errors. The
// returned error is always nil.
func (s *SpanHandler) Register(router *mux.Router, recorder Recorder) error {
	var spans http.HandlerFunc = s.Spans
	router.
		Handle(s.Path, cors(spans)).
		Methods("POST", "OPTIONS")

	s.recorder = recorder
	return nil
}
// Spans is the HTTP handler for zipkin thrift span uploads. It reads the
// request body (gunzipping it when Content-Encoding is exactly "gzip"),
// decodes it with unmarshalThrift, wraps the spans in a trace via NewTrace
// and hands that trace to the recorder.
//
// On success it replies 204 No Content. Any failure along the way is
// reported through the recorder's Error method and answered with
// 500 Internal Server Error.
func (s *SpanHandler) Spans(w http.ResponseWriter, r *http.Request) {
	defer r.Body.Close()

	// fail is the single failure path shared by every step below.
	fail := func(cause error) {
		s.recorder.Error(cause)
		w.WriteHeader(http.StatusInternalServerError)
	}

	// Read from the raw body unless the client compressed it.
	payload := r.Body
	if r.Header.Get("Content-Encoding") == "gzip" {
		zr, err := gzip.NewReader(r.Body)
		if err != nil {
			fail(err)
			return
		}
		defer zr.Close()
		payload = zr
	}

	raw, err := ioutil.ReadAll(payload)
	if err != nil {
		fail(err)
		return
	}

	spans, err := unmarshalThrift(raw)
	if err != nil {
		fail(err)
		return
	}

	if err := s.recorder.Record(NewTrace(spans)); err != nil {
		fail(err)
		return
	}

	w.WriteHeader(http.StatusNoContent)
}
// unmarshalThrift decodes body as a thrift binary-protocol list of zipkin
// spans and returns the spans in the order they were read.
//
// The element count in the list header comes straight from the request and
// is therefore untrusted. It only bounds the decode loop; it is never used
// as an allocation size on its own, so a tiny body that claims billions of
// elements fails on the first short read instead of forcing a huge
// allocation up front.
//
// It returns the first error reported while buffering body, reading the list
// header, reading a span, or reading the list end; the span slice is nil in
// that case.
func unmarshalThrift(body []byte) ([]*zipkincore.Span, error) {
	buffer := thrift.NewTMemoryBuffer()
	if _, err := buffer.Write(body); err != nil {
		return nil, err
	}

	protocol := thrift.NewTBinaryProtocolTransport(buffer)
	_, size, err := protocol.ReadListBegin()
	if err != nil {
		return nil, err
	}

	// Preallocate for the declared count, but never for more entries than
	// body has bytes: an encoded struct takes at least one byte (its stop
	// marker), so a larger count cannot be satisfied anyway. This is only a
	// capacity hint; append grows the slice if it ever proves too small.
	capacity := size
	if capacity > len(body) {
		capacity = len(body)
	}
	if capacity < 0 {
		// Defensive: a negative capacity would make the make call panic.
		capacity = 0
	}

	spans := make([]*zipkincore.Span, 0, capacity)
	for i := 0; i < size; i++ {
		zs := &zipkincore.Span{}
		if err = zs.Read(protocol); err != nil {
			return nil, err
		}
		spans = append(spans, zs)
	}

	if err = protocol.ReadListEnd(); err != nil {
		return nil, err
	}
	return spans, nil
}