-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmirror.go
More file actions
293 lines (249 loc) · 8.11 KB
/
mirror.go
File metadata and controls
293 lines (249 loc) · 8.11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
package httpmirror
import (
"context"
"errors"
"io"
"net"
"net/http"
"strings"
"sync"
"time"
"github.com/OpenCIDN/cidn/pkg/clientset/versioned"
informers "github.com/OpenCIDN/cidn/pkg/informers/externalversions/task/v1alpha1"
"github.com/wzshiming/sss"
"golang.org/x/sync/singleflight"
)
// MirrorHandler is the main HTTP handler that processes requests and manages caching.
//
// It acts as a reverse proxy for GET and HEAD requests: each incoming request
// is rewritten in place into an https request to an upstream host. When
// RemoteCache is configured, requests go through the caching path (files are
// cached in the storage backend and clients are redirected to signed URLs).
// When RemoteCache is nil, requests are proxied directly to the source.
//
// Supported features include:
//   - Cloud storage caching via RemoteCache
//   - CIDN-based distributed blob management
//   - Content freshness checking with CheckSyncTimeout
//   - Host extraction from the URL path with HostFromFirstPath
//   - File suffix blocking via BlockSuffix
//
// A MirrorHandler must not be copied after first use, because it contains a
// sync.Map (and a singleflight.Group); always use it through a pointer.
type MirrorHandler struct {
	// RemoteCache is the cache of the remote file system.
	// When set, files are cached in the storage backend and clients
	// are redirected to signed URLs. When nil, requests are proxied directly.
	RemoteCache *sss.SSS

	// LinkExpires is the expiration duration for signed URLs.
	// Only used when RemoteCache is configured.
	// A value of 24 hours is recommended. NOTE(review): no default is applied
	// in the code visible here — confirm how a zero value is handled.
	LinkExpires time.Duration

	// BaseDomain is a host-name suffix used to filter and rewrite requests.
	// If set, only requests whose host ends with this suffix are processed;
	// other hosts receive the NotFound response. The suffix is then removed
	// verbatim to obtain the upstream host, so it normally includes the
	// leading dot (e.g. ".mirror.example.com" maps
	// "github.com.mirror.example.com" to "github.com").
	// Leave empty to process every host that contains a dot.
	BaseDomain string

	// Client is the HTTP client used for requests to source servers.
	// If nil, a client whose transport dials through ProxyDial is created
	// on demand.
	Client *http.Client

	// ProxyDial optionally specifies the dial function for establishing
	// transport connections to source servers. It is used by the default
	// client created when Client is nil. If ProxyDial is nil, a zero-value
	// net.Dialer is used instead.
	ProxyDial func(context.Context, string, string) (net.Conn, error)

	// NotFound is the handler for requests that don't match any proxy rules
	// (empty or directory paths, hosts without a dot, hosts outside
	// BaseDomain). If nil, http.NotFound is used.
	NotFound http.Handler

	// Logger is used for error and informational logging.
	// If nil, no logging is performed.
	Logger Logger

	// CheckSyncTimeout is the timeout for checking if cached content
	// is synchronized with the source. When > 0, the handler verifies
	// that cached files match the source size before serving.
	// Set to 0 to disable sync checking.
	CheckSyncTimeout time.Duration

	// Host, when non-empty, replaces the incoming request's Host header on
	// every request, so all traffic is forwarded to this upstream host
	// (subject to BaseDomain stripping). When HostFromFirstPath is enabled,
	// the first path segment still takes precedence.
	Host string

	// HostFromFirstPath enables extracting the target host from the first
	// path segment instead of the Host header.
	// When true, URLs like /example.com/path/file become requests to
	// https://example.com/path/file. A path consisting only of a host
	// segment receives the NotFound response.
	HostFromFirstPath bool

	// BlockSuffix is a list of file suffixes to block.
	// Requests whose cleaned URL path ends with any of these suffixes
	// return 403 Forbidden. The check runs on the full path, before any
	// host segment is extracted by HostFromFirstPath.
	// Example: []string{".exe", ".msi"}
	BlockSuffix []string

	// NoRedirect disables HTTP redirects to signed URLs for cached content.
	// When true, the handler serves cached content directly instead of
	// redirecting clients to signed URLs from RemoteCache.
	// This is useful for clients that don't handle redirects well or when
	// you want the proxy to serve all traffic directly.
	NoRedirect bool

	// group suppresses duplicate concurrent calls (singleflight).
	// NOTE(review): not used in the code visible here; presumably used by
	// the caching path to deduplicate uploads of the same object — confirm.
	group singleflight.Group

	// TeeResponse is used to tee the response body for caching while serving.
	// When true, the handler will write the response to a tee while
	// caching it in RemoteCache. This allows for streaming responses to clients
	// while simultaneously caching them.
	TeeResponse bool

	// teeCache is internal state for the TeeResponse path.
	// NOTE(review): not used in the code visible here; presumably holds
	// in-flight tee'd responses shared between requests — confirm.
	teeCache sync.Map

	// CIDNClient is the Kubernetes client for CIDN integration.
	// When set along with RemoteCache, enables distributed blob management.
	CIDNClient versioned.Interface

	// CIDNBlobInformer watches for CIDN Blob resource changes.
	// Used to monitor blob sync status when CIDN is enabled.
	CIDNBlobInformer informers.BlobInformer

	// CIDNDestination is the destination name for CIDN blobs.
	// Typically set to the storage backend scheme (e.g., "s3").
	CIDNDestination string

	// CIDNMaximumRunning is the maximum number of running CIDN blob sync operations.
	CIDNMaximumRunning int64

	// CIDNMinimumChunkSize is the minimum chunk size for CIDN blob sync operations.
	CIDNMinimumChunkSize int64
}
// Logger is the minimal logging dependency of MirrorHandler.
//
// Any value with a fmt.Println-style method satisfies it; for example,
// *log.Logger from the standard library can be used directly.
type Logger interface {
	// Println records a single log entry built from args.
	Println(args ...interface{})
}
// ServeHTTP implements the http.Handler interface.
// It processes HTTP GET and HEAD requests, optionally caching responses.
//
// The incoming request is rewritten in place (Host, URL and RequestURI) and
// then reused as the outbound request to the upstream server.
//
// Request processing:
//  1. Validates the request method (only GET and HEAD are allowed).
//  2. Normalizes the path with cleanPath and rejects empty or
//     "/"-terminated paths.
//  3. Rejects paths ending with any BlockSuffix entry.
//  4. Chooses the target host: Host if set, otherwise the request Host
//     header, or the first path segment when HostFromFirstPath is enabled.
//  5. When BaseDomain is set, requires the host to end with it and strips
//     it. The resulting upstream host must be non-empty and contain a dot.
//  6. Forces the https scheme, drops the query string and routes to
//     cacheResponse if RemoteCache is set, otherwise directResponse.
//
// Statuses produced directly by ServeHTTP:
//   - 405 Method Not Allowed for non-GET/HEAD requests
//   - 403 Forbidden for blocked suffixes
//   - the NotFound response (404 by default) for invalid paths or hosts
//
// All other statuses (the forwarded upstream status, 500 on upstream
// errors, or a redirect to a signed URL for cached files) come from
// directResponse and cacheResponse.
func (m *MirrorHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet && r.Method != http.MethodHead {
		http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
		return
	}

	r.URL.Path = cleanPath(r.URL.Path)
	urlpath := r.URL.Path
	if len(urlpath) == 0 || strings.HasSuffix(urlpath, "/") {
		m.notFoundResponse(w, r)
		return
	}

	for _, suffix := range m.BlockSuffix {
		if strings.HasSuffix(urlpath, suffix) {
			http.Error(w, "Forbidden", http.StatusForbidden)
			return
		}
	}

	if m.Host != "" {
		r.Host = m.Host
	}
	host := r.Host
	if m.HostFromFirstPath {
		// "/example.com/a/b" -> host "example.com", path "/a/b".
		paths := strings.Split(strings.TrimPrefix(urlpath, "/"), "/")
		host = paths[0]
		urlpath = "/" + strings.Join(paths[1:], "/")
		if urlpath == "/" {
			// Only a host segment was given; there is no file to fetch.
			m.notFoundResponse(w, r)
			return
		}
		r.Host = host
		r.URL.Path = urlpath
	}

	if m.BaseDomain != "" {
		if !strings.HasSuffix(host, m.BaseDomain) {
			m.notFoundResponse(w, r)
			return
		}
		// Strip from host itself (not r.Host) so this stays correct no
		// matter how host was derived above.
		host = strings.TrimSuffix(host, m.BaseDomain)
	}

	// Validate the final upstream host. Checking after BaseDomain stripping
	// also rejects an empty host (a request for BaseDomain itself, which
	// would otherwise fail later as a 500) and single-label names such as
	// "foo" from "foo"+BaseDomain, which would otherwise be dialed as-is.
	if !strings.Contains(host, ".") {
		m.notFoundResponse(w, r)
		return
	}

	// http.Client refuses requests whose RequestURI is set, so clear it
	// before reusing r as the outbound request.
	r.RequestURI = ""
	r.URL.Host = host
	r.URL.Scheme = "https"
	r.URL.RawQuery = ""
	r.URL.ForceQuery = false

	if m.Logger != nil {
		m.Logger.Println("Request", r.URL)
	}

	if m.RemoteCache == nil {
		m.directResponse(w, r)
		return
	}
	m.cacheResponse(w, r)
}
// directResponse forwards r to the upstream server and streams the reply
// back to w without any caching.
//
// Upstream headers are copied except those listed in ignoreHeader, and a
// non-200 upstream status is written explicitly (a 200 is implied by the
// first body write). For GET requests the body is streamed to w, capped at
// the upstream Content-Length when it is positive; HEAD requests receive
// only the status and headers. Failing to reach the upstream is reported via
// errorResponse; errors while streaming the body are only logged, since the
// response may already be partially written.
func (m *MirrorHandler) directResponse(w http.ResponseWriter, r *http.Request) {
	resp, err := m.client().Do(r)
	if err != nil {
		m.errorResponse(w, r, err)
		return
	}
	defer resp.Body.Close()

	dst := w.Header()
	for name, values := range resp.Header {
		if _, skip := ignoreHeader[name]; !skip {
			dst[name] = values
		}
	}

	if resp.StatusCode != http.StatusOK {
		w.WriteHeader(resp.StatusCode)
	}

	if r.Method != http.MethodGet {
		return
	}

	size := resp.ContentLength
	var src io.Reader = resp.Body
	if size > 0 {
		src = io.LimitReader(src, size)
	}
	if m.Logger != nil {
		m.Logger.Println("Response", r.URL, size)
	}

	if _, copyErr := io.Copy(w, src); copyErr != nil && !errors.Is(copyErr, io.EOF) && m.Logger != nil {
		m.Logger.Println("Copy error", r.URL, copyErr)
	}
}
// errorResponse answers the request with 500 Internal Server Error, using
// err's text as the response body, and logs the same text first when a
// Logger is configured.
//
// NOTE(review): the raw error text (which may include the upstream URL or
// dial details) is exposed to the client.
func (m *MirrorHandler) errorResponse(w http.ResponseWriter, r *http.Request, err error) {
	text := err.Error()
	if logger := m.Logger; logger != nil {
		logger.Println(text)
	}
	http.Error(w, text, http.StatusInternalServerError)
}
// notFoundResponse delegates to the configured NotFound handler, falling
// back to http.NotFound (a plain 404) when none is set.
func (m *MirrorHandler) notFoundResponse(w http.ResponseWriter, r *http.Request) {
	handler := m.NotFound
	if handler == nil {
		handler = http.HandlerFunc(http.NotFound)
	}
	handler.ServeHTTP(w, r)
}
// ignoreHeader lists upstream response headers that are never copied to the
// client.
//
// "Server" is dropped to hide the upstream's identity. The remaining entries
// are the hop-by-hop headers that net/http/httputil.ReverseProxy also strips:
// they describe the upstream connection, not the one between this proxy and
// its client, so forwarding them (e.g. an upstream "Keep-Alive: timeout=5")
// would give the client wrong connection semantics.
//
// Keys must be in canonical form (see http.CanonicalHeaderKey) because
// http.Header keys are canonicalized.
var ignoreHeader = map[string]struct{}{
	"Connection":         {},
	"Keep-Alive":         {},
	"Proxy-Authenticate": {},
	"Proxy-Connection":   {},
	"Server":             {},
	"Trailer":            {},
	"Transfer-Encoding":  {},
	"Upgrade":            {},
}
// client returns the HTTP client used for upstream requests.
//
// A caller-supplied Client is returned unchanged. Otherwise a new client is
// built whose transport dials through proxyDial (ProxyDial, or a zero-value
// net.Dialer when that is nil).
//
// That fallback transport is created per call and then discarded, so it
// disables keep-alives. An http.Transport caches idle connections, and with
// IdleConnTimeout unset it keeps them without limit. On a throwaway transport
// those pooled connections could never be reused and would only accumulate
// sockets until the upstream closed them. Set Client to get connection pooling.
func (m *MirrorHandler) client() *http.Client {
	if m.Client != nil {
		return m.Client
	}
	return &http.Client{
		Transport: &http.Transport{
			DialContext:       m.proxyDial,
			DisableKeepAlives: true,
		},
	}
}
// proxyDial opens a connection to address over network, using ProxyDial
// when it is set and a zero-value net.Dialer otherwise. Its signature
// matches http.Transport.DialContext.
func (m *MirrorHandler) proxyDial(ctx context.Context, network, address string) (net.Conn, error) {
	if dial := m.ProxyDial; dial != nil {
		return dial(ctx, network, address)
	}
	var d net.Dialer
	return d.DialContext(ctx, network, address)
}