-
-
Notifications
You must be signed in to change notification settings - Fork 41
/
cmd_server.go
350 lines (303 loc) · 7.94 KB
/
cmd_server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
//
// We present ourselves as an HTTP-server.
//
// We assume that *.tunnel.example.com will point to us,
// such that we receive requests for all names.
//
// When a request comes in for the host "foo.tunnel.example.com"
//
// 1. We squirt the incoming request down the MQ topic clients/foo.
//
// 2. We then await a reply, for up to 10 seconds.
//
// If we receive it, great.
// Otherwise we return an error.
//
package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"net"
"net/http"
"net/http/httputil"
"strings"
"time"
MQTT "github.com/eclipse/paho.mqtt.golang"
"github.com/google/subcommands"
)
//
// serveCmd is the structure for this sub-command.
//
// It holds the values of the command-line flags registered in SetFlags,
// plus the MQ client which is created in Execute and used by HTTPHandler.
//
type serveCmd struct {
// The host we bind upon; populated from the "host" flag.
bindHost string
// MQ connection; assigned in Execute once the client has been created.
mq MQTT.Client
// The port we bind upon; populated from the "port" flag.
bindPort int
// The port MQ listens upon; populated from the "mq-port" flag.
mqPort int
}
// Name returns the name of this sub-command, which is always "serve".
func (p *serveCmd) Name() string {
	const name = "serve"
	return name
}
// Synopsis returns the brief, one-line description of this sub-command.
func (p *serveCmd) Synopsis() string {
	const synopsis = "Launch the HTTP server."
	return synopsis
}
// Usage returns the multi-line help text for this sub-command: a heading
// line followed by a one-sentence description, each newline-terminated.
func (p *serveCmd) Usage() string {
	return "serve [options]:\n" +
		"Launch the HTTP server for proxying via our MQ-connection to the clients.\n"
}
// SetFlags configures the flags this sub-command accepts.
//
// Three flags are registered on f, each writing straight into the receiver:
//
//	-host     the IP to listen upon   (default "127.0.0.1")
//	-port     the port to bind upon   (default 8080)
//	-mq-port  the port of the MQ      (default 1883)
func (p *serveCmd) SetFlags(f *flag.FlagSet) {
	// Where our own HTTP listener binds.
	f.StringVar(&p.bindHost, "host", "127.0.0.1", "The IP to listen upon.")
	f.IntVar(&p.bindPort, "port", 8080, "The port to bind upon.")

	// Where the MQ is listening.
	f.IntVar(&p.mqPort, "mq-port", 1883, "The MQ port.")
}
//
// RemoteIP retrieves the remote IP address of the requesting HTTP-client.
//
// This is sent to the client, for logging purposes.
//
// If an X-Forwarded-For header is present its first (left-most) entry is
// used, otherwise the connection's RemoteAddr is used.  In both cases any
// port is removed, and both IPv4 and IPv6 addresses (bracketed or bare)
// are handled.
//
// NOTE: The X-Forwarded-For header is supplied by the caller and is not
// verified here, so the result must only be used for informational
// purposes such as logging.
//
func RemoteIP(request *http.Request) string {

	//
	// Default to the address of the peer which connected to us.
	//
	address := request.RemoteAddr

	//
	// Prefer the first entry of the X-Forwarded-For header, if present.
	//
	if forwarded := request.Header.Get("X-Forwarded-For"); forwarded != "" {
		address = strings.TrimSpace(strings.SplitN(forwarded, ",", 2)[0])
	}

	//
	// "host:port" and "[v6]:port" are split by the standard library.
	//
	if host, _, err := net.SplitHostPort(address); err == nil {
		return host
	}

	//
	// No port was present: this is a bare IPv4/IPv6 address, possibly
	// still wrapped in brackets ("[::1]").
	//
	return strings.Trim(address, "[]")
}
//
// HTTPHandler is the core of our server.
//
// This function is invoked for all accesses.
//
// The incoming request is dumped to plain-text, wrapped (together with the
// source IP) in a JSON message, and published to the MQ topic
// "clients/<name>", where <name> is the first label of the Host header.
//
// The remote client is expected to publish its complete HTTP response
// (status-line, headers, and body) to the same topic, prefixed with "X-".
// That response is written verbatim to the hijacked connection.  If no
// reply arrives within ten seconds a canned 503 response is sent instead.
//
// NOTE(review): the topic is shared by every in-flight request for the
// same name and replies carry no correlation ID, so concurrent requests
// for one name may still see each other's replies, or lose their
// subscription when another request finishes.  Solving that needs a
// protocol change - TODO confirm against the client implementation.
//
func (p *serveCmd) HTTPHandler(w http.ResponseWriter, r *http.Request) {

	// How long we wait for the remote client to answer.
	const replyTimeout = 10 * time.Second

	// The complete response sent when no reply arrived in time.
	const noReply = "HTTP/1.0 503 Service Unavailable\r\n" +
		"Content-type: text/html; charset=UTF-8\r\n" +
		"Connection: close\r\n" +
		"\r\n" +
		"<!DOCTYPE html>\n" +
		"<html>\n" +
		"<body>\n" +
		"<p>We didn't receive a reply from the remote host, despite waiting 10 seconds.</p>\n" +
		"</body>\n" +
		"</html>\n"

	//
	// See which vhost the connection was sent to, we assume that
	// the variable part will be the start of the hostname, which will
	// be split by "."
	//
	// i.e. "foo.tunnel.steve.fi" has a name of "foo".
	//
	// The Host header is caller-controlled and ends up in an MQ topic,
	// so anything which isn't a plain label is rejected.
	//
	host, ok := clientNameFromHost(r.Host)
	if !ok {
		fmt.Printf("Rejecting request with unusable Host header %q\n", r.Host)
		http.Error(w, "Invalid or missing Host header", http.StatusBadRequest)
		return
	}
	topic := "clients/" + host

	//
	// Dump the request to plain-text.
	//
	requestDump, err := httputil.DumpRequest(r, true)
	if err != nil {
		text := fmt.Sprintf("Error converting the incoming request to plain-text: %s", err.Error())
		fmt.Println(text)
		http.Error(w, text, http.StatusInternalServerError)
		return
	}

	//
	// This is the structure we'll send to the client: the actual
	// request, and the source-IP from which it was received.
	//
	var req Request
	req.Request = string(requestDump)
	req.Source = RemoteIP(r)

	//
	// Convert the structure to a JSON message, so we can send it down
	// the queue.
	//
	toSend, err := json.Marshal(req)
	if err != nil {
		text := fmt.Sprintf("Error encoding the request as JSON: %s", err.Error())
		fmt.Println(text)
		http.Error(w, text, http.StatusInternalServerError)
		return
	}

	//
	// Subscribe for the reply BEFORE publishing the request, so that
	// a fast client cannot answer before we are listening.
	//
	// The callback runs on one of the MQ library's goroutines, so the
	// reply is handed over via a channel rather than a shared variable.
	// The channel is buffered so the callback never blocks; only the
	// first reply is kept.
	//
	replies := make(chan string, 1)
	subToken := p.mq.Subscribe(topic, 0, func(_ MQTT.Client, msg MQTT.Message) {

		//
		// To avoid loops the client publishes its response with a
		// specific prefix, so that it doesn't treat it as a request
		// to be made.  That lets us identify it here too - and lets
		// us ignore our own request, which arrives on this topic.
		//
		payload := string(msg.Payload())
		if len(payload) <= 2 || !strings.HasPrefix(payload, "X-") {
			return
		}
		select {
		case replies <- payload[2:]:
		default:
		}
	})
	subToken.Wait()

	//
	// Did we get an error subscribing for the reply?
	//
	if err := subToken.Error(); err != nil {
		text := fmt.Sprintf("Error subscribing to clients/%s - %s", host, err)
		fmt.Println(text)
		http.Error(w, text, http.StatusBadGateway)
		return
	}

	//
	// Unsubscribe from the topic, regardless of whether we received
	// a response or not.
	//
	// Just to cut down on resource-usage.
	//
	defer func() {
		unsubToken := p.mq.Unsubscribe(topic)
		unsubToken.Wait()
		if err := unsubToken.Error(); err != nil {
			fmt.Printf("Failed to unsubscribe from clients/%s - %s\n",
				host, err)
		}
	}()

	//
	// Publish the JSON object to the topic that we believe the client
	// will be listening upon.
	//
	fmt.Printf("Sending request to remote name %s\n", host)
	pubToken := p.mq.Publish(topic, 0, false, string(toSend))
	pubToken.Wait()
	if err := pubToken.Error(); err != nil {
		text := fmt.Sprintf("Error publishing to clients/%s - %s", host, err)
		fmt.Println(text)
		http.Error(w, text, http.StatusBadGateway)
		return
	}

	//
	// Wait for the reply.
	//
	// If nothing arrives in time that means either:
	//
	//  1. We didn't get a reply because the remote host was slow.
	//
	//  2. Nothing is listening on the topic, so the client is dead.
	//
	// In both cases we fall back to the canned failure-response.
	//
	response := noReply
	select {
	case response = <-replies:
	case <-time.After(replyTimeout):
		fmt.Printf("No reply from clients/%s within %s\n", host, replyTimeout)
	case <-r.Context().Done():
		// The caller has gone away; there is nobody to answer.
		fmt.Printf("Request for clients/%s abandoned by the caller\n", host)
		return
	}

	//
	// The response from the client will be:
	//
	//   HTTP/1.0 200 OK
	//   Header: blah
	//   Date: blah
	//   [newline]
	//   <html>
	//   ..
	//
	// i.e. It will contain a full-response, headers, and body.
	// So we need to use hijacking to return that to the caller.
	//
	hj, ok := w.(http.Hijacker)
	if !ok {
		http.Error(w, "Webserver doesn't support hijacking", http.StatusInternalServerError)
		fmt.Printf("Webserver doesn't support hijacking\n")
		return
	}
	conn, bufrw, err := hj.Hijack()
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		fmt.Printf("Error running hijack:%s\n", err.Error())
		return
	}
	defer conn.Close()

	//
	// Send the reply; the connection is closed on return.
	//
	if _, err := bufrw.WriteString(response); err != nil {
		fmt.Printf("Error sending the response: %s\n", err.Error())
		return
	}
	if err := bufrw.Flush(); err != nil {
		fmt.Printf("Error sending the response: %s\n", err.Error())
	}
}

// clientNameFromHost extracts the tunnel-name from the value of a Host
// header: everything before the first "." or ":" (so any port is ignored).
//
// The name is used to build an MQ topic, so it is only accepted if it is
// non-empty and consists solely of ASCII letters, digits, "-" and "_".
// That excludes the topic separator and the MQ wildcard characters.
func clientNameFromHost(hostHeader string) (string, bool) {
	name := hostHeader
	if i := strings.IndexAny(name, ".:"); i >= 0 {
		name = name[:i]
	}
	if name == "" {
		return "", false
	}
	for _, c := range name {
		switch {
		case c >= 'a' && c <= 'z':
		case c >= 'A' && c <= 'Z':
		case c >= '0' && c <= '9':
		case c == '-' || c == '_':
		default:
			return "", false
		}
	}
	return name, true
}
// Execute is the entry-point to this sub-command.
//
// It connects to the MQ on localhost (using the configured MQ port),
// routes every HTTP path to HTTPHandler, and then serves HTTP on the
// configured host and port.  A failure to connect to the MQ, or to run
// the HTTP-server, is reported on stdout and results in a failure status.
func (p *serveCmd) Execute(_ context.Context, f *flag.FlagSet, _ ...interface{}) subcommands.ExitStatus {

	// Step one: connect to our MQ instance.
	brokerURL := fmt.Sprintf("tcp://localhost:%d", p.mqPort)
	fmt.Printf("Connecting to MQ %s\n", brokerURL)

	p.mq = MQTT.NewClient(MQTT.NewClientOptions().AddBroker(brokerURL))
	connToken := p.mq.Connect()
	if connToken.Wait() && connToken.Error() != nil {
		fmt.Printf("Failed to connect to MQ-server: %s\n", connToken.Error())
		return subcommands.ExitFailure
	}

	// Step two: a single catch-all route, so that we see every request
	// regardless of path or method.
	http.HandleFunc("/", p.HTTPHandler)

	// Step three: work out, and announce, where we will listen.
	listenAddr := fmt.Sprintf("%s:%d", p.bindHost, p.bindPort)
	fmt.Printf("Launching the server on http://%s\n", listenAddr)

	// Use an explicit server, rather than the package default, so that
	// timeouts are set.  They are generous given that our proxying to
	// the client gives up after ten seconds.
	const ioTimeout = 300 * time.Second
	server := &http.Server{
		Addr:         listenAddr,
		ReadTimeout:  ioTimeout,
		WriteTimeout: ioTimeout,
	}

	// Step four: serve until something goes wrong.
	if serveErr := server.ListenAndServe(); serveErr != nil {
		fmt.Printf("\nError launching our HTTP-server\n:%s\n",
			serveErr.Error())
		return subcommands.ExitFailure
	}

	// Not reached.
	return subcommands.ExitSuccess
}