The remote server was unreachable.
+ +` + + // + // Make the connection to our proxied host. + // + d := net.Dialer{} + con, err := d.Dial("tcp", p.expose) + + // + // OK we have a default result saved, which shows an error-page. + // + // If we didn't actually get an error then save the real response. + // + if err == nil { + + // + // Make the request + // + con.Write(fetch) + + // + // Read the reply. + // + var reply bytes.Buffer + io.Copy(&reply, con) + + // + // Store. + // + result = string(reply.Bytes()) } - fmt.Printf("No known prefix found, using %s\n", defaultPrefix) - return defaultPrefix + str + + // + // Send the reply back to the MQ topic. + // + fmt.Printf("Returning response:\n%s\n", result) + token := client.Publish("clients/"+p.name, 0, false, "X-"+result) + token.Wait() } +// // Execute is the entry-point to this sub-command. +// +// 1. Connect to the tunnel-host. +// +// 2. Subscribe to MQ and await the reception of URLs to fetch. +// +// (When one is received it will be handled via onMessage.) +// func (p *clientCmd) Execute(_ context.Context, f *flag.FlagSet, _ ...interface{}) subcommands.ExitStatus { - p.mutex = &sync.Mutex{} - // // Ensure that we have setup variables // if p.expose == "" { - fmt.Printf("You must specify the host:port to expose.\n") + fmt.Printf("You must specify the local host:port to expose.\n") return 1 } if p.tunnel == "" { - fmt.Printf("You must specify the URL of the tunnel end-point.\n") + fmt.Printf("You must specify the tunnel end-point.\n") return 1 } + // + // This is optional, but useful. + // if p.name == "" { - // or error handling uid := uuid.NewV4() p.name = uid.String() } - // Not so clever hack to deal with malformed (schemaless) urls - // Schema must be set in the string to be parsed as url.Parse enforces correct url format - allowedPrefixes := []string{"ws://", "wss://"} - p.tunnel = checkUrlSchema(p.tunnel, "ws://", allowedPrefixes) - - // Parse url; - parsedUrl, err := url.Parse(p.tunnel) - if err != nil { - fmt.Printf("Cannot parse url %s: %s\n", p.tunnel, err) - } - // - // These are the details of the tunneller-server + // Create a channel so that we can be disconnected cleanly. // - u := url.URL{Scheme: parsedUrl.Scheme, Host: parsedUrl.Host, Path: "/" + p.name} - fmt.Printf("Connecting to %s\n", u.String()) + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) // - // connect to it + // Setup the server-address. // - tls_config := tls.Config{InsecureSkipVerify: p.insecure} - - ws_dial := websocket.Dialer{ - TLSClientConfig: &tls_config, - } - - c, resp, err := ws_dial.Dial(u.String(), nil) - if err != nil { - - if err == websocket.ErrBadHandshake { - fmt.Printf("\tHandshake failed with status %d\n", resp.StatusCode) - - defer resp.Body.Close() - var body []byte - body, err = ioutil.ReadAll(resp.Body) - if err == nil { - fmt.Printf("\t%s\n\n", body) - } - } - fmt.Printf("Connection failed: %s", err) - return 1 - } - defer c.Close() + opts := MQTT.NewClientOptions().AddBroker(fmt.Sprintf("tcp://%s:1883", p.tunnel)) // - // Connected now, show instructions + // Set our name. // - fmt.Printf("Visit http://%s.%s to see the local content from %s\n", - p.name, parsedUrl.Host, p.expose) + opts.SetClientID(p.name) - // Loop for messages - for { - p.mutex.Lock() - msgType, message, err := c.ReadMessage() - p.mutex.Unlock() + // + // Connected now, show instructions + // + fmt.Printf("tunneller client launched\n") + fmt.Printf("=========================\n") + fmt.Printf("Visit http://%s.%s/ to see the local content from %s\n", + p.name, p.tunnel, p.expose) - if err != nil { - fmt.Printf("Error reading the message from the socket: %s", err.Error()) - return 1 - } + // + // Once we're connected we will subscribe to the named topic. + // + opts.OnConnect = func(c MQTT.Client) { - if msgType == websocket.PingMessage { - fmt.Printf("Got pong-reply\n") - p.mutex.Lock() - c.WriteMessage(websocket.PongMessage, nil) - p.mutex.Unlock() + topic := "clients/" + p.name + if token := c.Subscribe(topic, 0, p.onMessage); token.Wait() && token.Error() != nil { + fmt.Printf("Failed to subscribe to the MQ-topic:%s\n", token.Error()) + os.Exit(1) } - if msgType == websocket.TextMessage { - - // - // At this point we've received a message. - // - // Show it - // - fmt.Printf("Received incoming request:\n%s\n", message) - - // - // Make the connection to our proxied host. - // - d := net.Dialer{} - con, err := d.Dial("tcp", p.expose) - if err != nil { - // - // Connection refused talking to the host - // - res := `HTTP 200 OK -Connection: close + } -Remote server was unreachable -` - safe := b64.StdEncoding.EncodeToString([]byte(res)) - - p.mutex.Lock() - err = c.WriteMessage(websocket.TextMessage, []byte(safe)) - if err != nil { - fmt.Printf("Error writing our error message to the socket:%s\n", err.Error()) - } - p.mutex.Unlock() - continue - } - con.Write(message) - - // - // Read the reply - // - var reply bytes.Buffer - io.Copy(&reply, con) - - // - // Send it back - // - safe := b64.StdEncoding.EncodeToString(reply.Bytes()) - p.mutex.Lock() - err = c.WriteMessage(websocket.TextMessage, []byte(safe)) - if err != nil { - fmt.Printf("Error writing our response to the socket:%s\n", err.Error()) - } - - p.mutex.Unlock() - fmt.Printf("Sent reply ..\n") - } + // + // Actually establish the MQ connection. + // + client := MQTT.NewClient(opts) + if token := client.Connect(); token.Wait() && token.Error() != nil { + fmt.Printf("Failed to connect to the MQ-host %s\n", token.Error()) + return 1 } + + // + // Wait until we're interrupted. + // + <-c + + // + // Not reached. + // + return 0 } diff --git a/cmd_server.go b/cmd_server.go index 64be153..c9b10cd 100644 --- a/cmd_server.go +++ b/cmd_server.go @@ -1,32 +1,35 @@ +// +// We present ourselves as a HTTP-server. +// +// We assume that *.tunnel.example.com will point to us, +// such that we receive requests for all names. +// +// When a request comes in for the host "foo.tunnel.example.com" +// +// 1. we squirt the incoming request down the MQ topic clients/foo. +// +// 2. We then await a reply, for up to 10 seconds. +// +// If we receive it great. +// +// Otherwise we return an error. +// + package main import ( "context" - b64 "encoding/base64" "flag" "fmt" "net/http" "net/http/httputil" "strings" - "sync" "time" + MQTT "github.com/eclipse/paho.mqtt.golang" "github.com/google/subcommands" - "github.com/gorilla/websocket" ) -// -// Each incoming websocket-connection will be allocated an instance of this -// because we want to ensure we read/write safely. -// -type connection struct { - // mutex for safety - mutex *sync.RWMutex - - // the socket to use to talk to the remote peer. - socket *websocket.Conn -} - // // serveCmd is the structure for this sub-command. // @@ -34,14 +37,11 @@ type serveCmd struct { // The host we bind upon bindHost string + // MQ conneciton + mq MQTT.Client + // the port we bind upon bindPort int - - // mutex for safety - assignedMutex *sync.RWMutex - - // keep track of name/connection pairs - assigned map[string]*connection } // Name returns the name of this sub-command. @@ -53,7 +53,7 @@ func (p *serveCmd) Synopsis() string { return "Launch the HTTP server." } // Usage returns details of this sub-command. func (p *serveCmd) Usage() string { return `serve [options]: - Launch the HTTP server for proxying via our clients + Launch the HTTP server for proxying via our MQ-connection to the clients. ` } @@ -63,58 +63,22 @@ func (p *serveCmd) SetFlags(f *flag.FlagSet) { f.StringVar(&p.bindHost, "host", "127.0.0.1", "The IP to listen upon.") } -// -// We want to make sure that we check the origin of any websocket-connections -// and bump the size of the buffers. -// -var upgrader = websocket.Upgrader{ - ReadBufferSize: 2048, - WriteBufferSize: 2048, - CheckOrigin: func(r *http.Request) bool { return true }, -} - // // HTTPHandler is the core of our server. // -// This function is invoked for all accesses. However it is complicated -// because it will be invoked in two different roles: -// -// http://foo.tunneller.example.com/blah -// -// -> Route the request to the host connected with name "foo". +// This function is invoked for all accesses. // -// ws://tunneller.example.com/foo -// -// -> Associate the name 'foo' with the long-lived web-socket connection -// -// We can decide at run-time if we're invoked with a HTTP-connection or -// a WS:// connection via the `Connection` header. +// If a request is made for our public-key that is handled, otherwise we +// defer to sending requests to connected clients via MQ. // func (p *serveCmd) HTTPHandler(w http.ResponseWriter, r *http.Request) { - // - // See if we're upgrading to a websocket connection. - // - con := r.Header.Get("Connection") - if strings.Contains(con, "Upgrade") { - p.HTTPHandlerWS(w, r) - } else { - p.HTTPHandlerHTTP(w, r) - } -} - -// -// HTTPHandlerHTTP is invoked to forward an incoming HTTP-request -// to the remote host which is tunnelling it. -// -func (p *serveCmd) HTTPHandlerHTTP(w http.ResponseWriter, r *http.Request) { - // // See which vhost the connection was sent to, we assume that // the variable part will be the start of the hostname, which will // be split by "." // - // i.e. "foo.tunneller.steve.fi" has a name of "foo". + // i.e. "foo.tunnel.steve.fi" has a name of "foo". // host := r.Host if strings.Contains(host, ".") { @@ -122,17 +86,6 @@ func (p *serveCmd) HTTPHandlerHTTP(w http.ResponseWriter, r *http.Request) { host = hsts[0] } - // - // Find the client to which to route the request. - // - p.assignedMutex.Lock() - sock := p.assigned[host] - p.assignedMutex.Unlock() - if sock == nil { - fmt.Fprintf(w, "The request cannot be made to '%s' as the host is offline!", host) - return - } - // // Dump the request to plain-text // @@ -145,57 +98,104 @@ func (p *serveCmd) HTTPHandlerHTTP(w http.ResponseWriter, r *http.Request) { } // - // Forward it on. + // Publish the request we've received to the topic that we + // believe the client will be listening upon. // - fmt.Printf("Locking mutex\n") - sock.mutex.Lock() - fmt.Printf("Locked mutex\n") - err = sock.socket.WriteMessage(websocket.TextMessage, []byte(requestDump)) - if err != nil { - fmt.Printf("Failed to send request down socket %s\n", err.Error()) - } - sock.mutex.Unlock() - fmt.Printf("\tRequest sent.\n") + token := p.mq.Publish("clients/"+host, 0, false, requestDump) + token.Wait() // - // Wait for the response from the client. + // The (complete) response from the client will be placed here. // response := "" - for len(response) == 0 { - fmt.Printf("Awaiting a reply ..\n") - - sock.mutex.Lock() - msgType, msg, error := sock.socket.ReadMessage() - sock.mutex.Unlock() - fmt.Printf("\tReceived something ..\n") + // + // Subscribe to the topic. + // + subToken := p.mq.Subscribe("clients/"+host, 0, func(client MQTT.Client, msg MQTT.Message) { - if error != nil { - fmt.Printf("\tError reading from websocket:%s\n", error.Error()) - fmt.Fprintf(w, "Error reading from websocket %s", error.Error()) - return - } - if msgType == websocket.TextMessage { - fmt.Printf("\tReply received.\n") - - var raw []byte - raw, err = b64.StdEncoding.DecodeString(string(msg)) - if err != nil { - fmt.Printf("Error decoding BASE64 from WS:%s\n", err.Error()) - fmt.Fprintf(w, "Error decoding BASE64 from WS:%s\n", err.Error()) - return - } - - response = string(raw) + // + // This function will be executed when a message is received + // + // To avoid loops we're making sure that the client publishes + // its response with a specific-prefix, so that it doesn't + // treat it as a request to be made. + // + // That means that we can identify it here too. + // + tmp := string(msg.Payload()) + if strings.HasPrefix(tmp, "X-") { + response = tmp[2:] } + }) + subToken.Wait() + if subToken.Error() != nil { + fmt.Printf("Error subscribing to clients/%s - %s\n", host, subToken.Error()) + fmt.Fprintf(w, "Error subscribing to clients/%s - %s\n", host, subToken.Error()) + return + } + + // + // We now busy-wait until we have a reply. + // + // We wait for up to ten seconds before deciding the client + // is either a) offline, or b) failing. + // + count := 0 + for len(response) == 0 && count < 10 { + + // + // Sleep 1 second; max count 10, result: 10 seconds. + // + fmt.Printf("Awaiting a reply ..\n") + time.Sleep(1 * time.Second) + count++ + } + + // + // Unsubscribe from the topic, regardless of whether we received + // a response or note. + // + // Just to cut down on resource-usage. + // + unsubToken := p.mq.Unsubscribe("clients/" + host) + unsubToken.Wait() + if unsubToken.Error() != nil { + fmt.Printf("Failed to unsubscribe from clients/%s - %s\n", + host, unsubToken.Error()) } // - // This is a hack. + // If the length is empty then that means either: + // + // 1. We didn't get a reply because the remote host was slow. + // + // 2. Nothing is listening on the topic, so the client is dead. + // + if len(response) == 0 { + + // + // Failure-response. + // + // NOTE: This is a "complete" response. + // + response = `HTTP/1.0 200 OK +Content-type: text/html; charset=UTF-8 +Connection: close + + + + +We didn't receive a reply from the remote host, despite waiting 10 seconds.
+ + +` + } + // // The response from the client will be: // - // HTTP 200 OK + // HTTP/1.0 200 OK // Header: blah // Date: blah // [newline] @@ -217,125 +217,28 @@ func (p *serveCmd) HTTPHandlerHTTP(w http.ResponseWriter, r *http.Request) { fmt.Printf("Error running hijack:%s", err.Error()) return } - // Don't forget to close the connection: - fmt.Fprintf(bufrw, "%s", response) - bufrw.Flush() - conn.Close() - -} - -// -// HTTPHandlerWS is invoked to handle an incoming websocket request. -// -// If a request is made for http://tunneller.example.com/blah we -// assign the name "blah" to the connection. -// -func (p *serveCmd) HTTPHandlerWS(w http.ResponseWriter, r *http.Request) { - - // - // At this point we've got a known-client. - // - // Record their ID in our connection - // - // The ID will be client-sent, for now. - // - cid := r.URL.Path[1:] - - // - // Ensure the name isn't already in-use. - // - p.assignedMutex.Lock() - tmp := p.assigned[cid] - p.assignedMutex.Unlock() - - if tmp != nil { - w.WriteHeader(http.StatusForbidden) - fmt.Fprintf(w, "The name you've chosen is already in use.") - return - - } // - // Upgrade, and handle any upgrade-errors. - // - conn, err := upgrader.Upgrade(w, r, nil) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - fmt.Fprintf(w, "Error upgrading the connection to a web-socket %s", err.Error()) - return - } - - // - // Store their name / connection in the map. - // - p.assignedMutex.Lock() - p.assigned[cid] = &connection{mutex: &sync.RWMutex{}, socket: conn} - p.assignedMutex.Unlock() - - // - // Now we're just going to busy-loop. - // - // Ensuring that we keep the client connection alive. + // Send the reply, and close the connection: // - go func() { - // - // We're connected. - // - connected := true - - // - // Get the structure, we just set. - // - p.assignedMutex.Lock() - connection := p.assigned[cid] - p.assignedMutex.Unlock() + fmt.Fprintf(bufrw, "%s", response) + bufrw.Flush() + conn.Close() - // - // Loop until we get a disconnection. - // - for connected { - - // - // Try to write .. - // - connection.mutex.Lock() - fmt.Printf("Keepalive..\n") - err := conn.WriteMessage(websocket.PingMessage, []byte("!")) - connection.mutex.Unlock() - - // - // If/when it failed .. - // - if err != nil { - - // - // Reap the client. - // - fmt.Printf("Client gone away - freeing the name '%s'\n", cid) - p.assignedMutex.Lock() - p.assigned[cid] = nil - p.assignedMutex.Unlock() - connected = false - continue - } - - // - // Otherwise wait for the future. - // - time.Sleep(5 * time.Second) - } - }() } // Execute is the entry-point to this sub-command. func (p *serveCmd) Execute(_ context.Context, f *flag.FlagSet, _ ...interface{}) subcommands.ExitStatus { // - // Setup a mapping between connections and handlers, and ensure - // that our mutex is ready. + // Connect to our MQ instance. // - p.assigned = make(map[string]*connection) - p.assignedMutex = &sync.RWMutex{} + opts := MQTT.NewClientOptions().AddBroker("tcp://localhost:1883") + p.mq = MQTT.NewClient(opts) + if token := p.mq.Connect(); token.Wait() && token.Error() != nil { + fmt.Printf("Failed to connect to MQ-server: %s\n", token.Error()) + return 1 + } // // We present a HTTP-server, and we handle all incoming @@ -354,7 +257,12 @@ func (p *serveCmd) Execute(_ context.Context, f *flag.FlagSet, _ ...interface{}) // a non-default http-server // srv := &http.Server{ - Addr: bind, + Addr: bind, + + // + // NOTE: These are a little generous, considering our + // proxy to the client will timeout after 10 seconds.. + // ReadTimeout: 300 * time.Second, WriteTimeout: 300 * time.Second, } diff --git a/go.mod b/go.mod index 0db7232..28a9bde 100644 --- a/go.mod +++ b/go.mod @@ -3,9 +3,11 @@ module github.com/skx/tunneller go 1.12 require ( + github.com/blevesearch/bleve v0.7.0 + github.com/eclipse/paho.mqtt.golang v1.2.0 github.com/google/subcommands v1.0.1 - github.com/gorilla/websocket v1.4.0 github.com/kr/pretty v0.1.0 // indirect github.com/satori/go.uuid v1.2.0 + golang.org/x/net v0.0.0-20190424024845-afe8014c977f // indirect gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect ) diff --git a/go.sum b/go.sum index d03c46b..08b5748 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,9 @@ +github.com/blevesearch/bleve v0.7.0 h1:znyZ3zjsh2Scr60vszs7rbF29TU6i1q9bfnZf1vh0Ac= +github.com/blevesearch/bleve v0.7.0/go.mod h1:Y2lmIkzV6mcNfAnAdOd+ZxHkHchhBfU/xroGIp61wfw= +github.com/eclipse/paho.mqtt.golang v1.2.0 h1:1F8mhG9+aO5/xpdtFkW4SxOJB67ukuDC3t2y2qayIX0= +github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts= github.com/google/subcommands v1.0.1 h1:/eqq+otEXm5vhfBrbREPCSVQbvofip6kIz+mX5TUH7k= github.com/google/subcommands v1.0.1/go.mod h1:ZjhPrFU+Olkh9WazFPsl27BQ4UPiG37m3yTrtFlrHVk= -github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q= -github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= @@ -9,5 +11,10 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/net v0.0.0-20190424024845-afe8014c977f h1:uALRiwYevCJtciRa4mKKFkrs5jY4F2OTf1D2sfi1swY= +golang.org/x/net v0.0.0-20190424024845-afe8014c977f/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/mq/README.md b/mq/README.md new file mode 100644 index 0000000..549fbe7 --- /dev/null +++ b/mq/README.md @@ -0,0 +1,36 @@ + +# Install mosquitto + +For a Debian/Ubuntu system: + + apt-get install mosquitto + + +# Configure Mosquitto + +Create `/etc/mosquitto/conf.d/acl.conf` with just the following contents: + + acl_file /etc/mosquitto/conf.d/acl.txt + +Now populate that with: + + topic readwrite clients/# + +The result of this will be that __any__ client can connect without any +username/password, and read/write to the topics beneath `clients`. + +For example client with the name `cake` can read/write to the topic +`clients/cake`. + + +## Test Subscription + +You should find that you can subscribe to the wildcard topic `clients/#` +via: + + $ mosquitto_sub -v -t clients/# + + +## Now you're good. + +Of course this does mean that clients can sniff on other user's traffic..