-
Notifications
You must be signed in to change notification settings - Fork 57
/
http.go
151 lines (130 loc) · 4.43 KB
/
http.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package typhon
import (
"io"
"net"
"net/http"
"net/textproto"
"os"
"strings"
"sync"
"syscall"
"github.com/monzo/slog"
"golang.org/x/net/http/httpguts"
)
const (
// chunkThreshold is a byte threshold above which request and response bodies that result from using buffered I/O
// within Typhon will be transferred with chunked encoding on the wire.
chunkThreshold = 5 * 1000000 // 5 megabytes
)
var httpChunkBufPool = sync.Pool{
New: func() interface{} {
buf := make([]byte, 32*1024) // size is the same as io.Copy uses internally
return &buf
}}
func isStreamingRsp(rsp Response) bool {
// Most straightforward: service may have set rsp.Body to a streamer
if s, ok := rsp.Body.(*streamer); ok && s != nil {
return true
}
// If the content length is unknown, it should stream
if rsp.ContentLength <= 0 {
return true
}
// If the response body is the same as the request body and the request is streaming, the response should be too
if rsp.Request != nil && rsp.Request.ContentLength <= 0 && rsp.Body == rsp.Request.Body {
return true
}
// Chunked transfer encoding (only in HTTP/1.1) gives us an additional clue
if !rsp.ProtoAtLeast(2, 0) {
if httpguts.HeaderValuesContainsToken(rsp.Header[textproto.CanonicalMIMEHeaderKey("Transfer-Encoding")], "chunked") {
return true
}
// Annoyingly, this can be removed from headers by net/http and promoted to its own field
for _, v := range rsp.TransferEncoding {
if v == "chunked" {
return true
}
}
}
return false
}
// copyErrSeverity returns a slog error severity that should be used to report an error from an io.Copy operation to
// send the response body to a client. This exists because these errors often do not indicate actual problems. For
// example, a client may disconnect before the response body is copied to it; this doesn't mean the server is
// misbehaving.
func copyErrSeverity(err error) slog.Severity {
switch {
case strings.HasSuffix(err.Error(), "read on closed response body"),
strings.HasSuffix(err.Error(), "connection reset by peer"),
strings.HasSuffix(err.Error(), "http2: stream closed"):
return slog.DebugSeverity
}
// Annoyingly, these errors can be deeply nested; &net.OpError{&os.SyscallError{syscall.Errno}}
switch err := err.(type) {
case syscall.Errno:
return copyErrnoSeverity(err) // platform-specific
case *os.SyscallError:
return copyErrSeverity(err.Err)
case *net.OpError:
return copyErrSeverity(err.Err)
default:
return slog.WarnSeverity
}
}
// bodyAllowedForStatus reports whether a given response status code
// permits a body. This is taken directly from the net/http package.
func bodyAllowedForStatus(status int) bool {
switch {
case status >= 100 && status <= 199:
return false
case status == 204:
return false
case status == 304:
return false
}
return true
}
// HttpHandler transforms the given Service into a standard library HTTP handler. It is one of the main "bridges"
// between Typhon and net/http.
func HttpHandler(svc Service) http.Handler {
return http.HandlerFunc(func(rw http.ResponseWriter, httpReq *http.Request) {
if httpReq.Body != nil {
defer httpReq.Body.Close()
}
req := Request{
Context: httpReq.Context(),
Request: *httpReq}
if h, ok := rw.(http.Hijacker); ok {
req.hijacker = h
}
rsp := svc(req)
// If the connection was hijacked, we should not attempt to write anything out
if rsp.hijacked {
return
}
rwHeader := rw.Header()
for k, v := range rsp.Header {
rwHeader[k] = v
}
rw.WriteHeader(rsp.StatusCode)
if rsp.Body != nil && bodyAllowedForStatus(rsp.StatusCode) {
defer rsp.Body.Close()
buf := *httpChunkBufPool.Get().(*[]byte)
defer httpChunkBufPool.Put(&buf)
if isStreamingRsp(rsp) {
// Streaming responses use copyChunked(), which takes care of flushing transparently
if _, err := copyChunked(rw, rsp.Body, buf); err != nil {
slog.Log(slog.Eventf(copyErrSeverity(err), req, "Couldn't send streaming response body", err))
// Prevent the client from accidentally consuming a truncated stream by aborting the response.
// The official way of interrupting an HTTP reply mid-stream is panic(http.ErrAbortHandler), which
// works for both HTTP/1.1 and HTTP.2. https://github.com/golang/go/issues/17790
panic(http.ErrAbortHandler)
}
} else {
if _, err := io.CopyBuffer(rw, rsp.Body, buf); err != nil {
slog.Log(slog.Eventf(copyErrSeverity(err), req, "Couldn't send response body", err))
}
}
}
})
}