Skip to content

Commit

Permalink
learning the server code
Browse files Browse the repository at this point in the history
  • Loading branch information
davidnewhall committed May 29, 2023
1 parent 388ff87 commit 02427ae
Show file tree
Hide file tree
Showing 11 changed files with 274 additions and 303 deletions.
5 changes: 1 addition & 4 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
.PHONY: build server client test
.PHONY: build server client

build: server client

Expand All @@ -7,6 +7,3 @@ server:

client:
go build ./cmd/wsp_client

test:
go run ./examples/test_api/main.go
28 changes: 21 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,27 @@
WS PROXY
========
Mulery
======

This is a reverse HTTP proxy over websockets.
The aim is to securely make call to internal APIs from outside.
**This project is under development and not yet ready for consumption.**

The detailed design is described at the series of [Reverse HTTP proxy over WebSocket in Go](https://dev.to/hgsgtk/reverse-http-proxy-over-websocket-in-go-part-1-13n4) on dev.to.
The idea behind what's beind built here is a server that can handle thousands of simultaneous client connections.
The server is essentially a command and control center for the clients. A simple server is provided, as a library
and a binary. The server is only useful with the client library that is not provided as a binary. You need to instrument
your own logic. The primary use case is to proxy http requests back into the http server running on the client.

How does it works
-----------------
You run the server. Then you run 5000 clients that make a persistent connect to the server and register themselves with
a configurable id. You can now send http requests to the server with a special header that contains the client's registered
ID. The server proxies the web request back into the client through the previously-established presistent connection.

We use this so our clients do not have to open a port (port forward) for our server to communicate with the software they
deployed on premesis. It allows our servers to distribute load and reliably reach back into the running end-user application.

How
---

Websockets and some reverse-proxy engineering.

Old Readme
----------

a WSP client runs in the internal network ( alongside the APIs )
and connects to a remote WSP server with HTTP websockets.
Expand Down
13 changes: 6 additions & 7 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@ type Client struct {

// NewClient creates a new Client.
func NewClient(config *Config) *Client {
client := new(Client)
client.Config = config
client.client = &http.Client{}
client.dialer = &websocket.Dialer{}
client.pools = make(map[string]*Pool)

return client
return &Client{
Config: config,
client: &http.Client{},
dialer: &websocket.Dialer{},
pools: make(map[string]*Pool),
}
}

// Start the Proxy.
Expand Down
34 changes: 16 additions & 18 deletions client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"gopkg.in/yaml.v2"
)

// Config configures an Proxy.
// Config is the required data to initialize a client proxy connection.
type Config struct {
ID string
Targets []string
Expand All @@ -18,28 +18,17 @@ type Config struct {
}

// NewConfig creates a new ProxyConfig.
func NewConfig() (*Config, error) {
config := new(Config)

id, err := uuid.NewV4()
if err != nil {
return nil, fmt.Errorf("unable to get unique id: %w", err)
func NewConfig() *Config {
return &Config{
Targets: []string{"ws://127.0.0.1:8080/register"},
PoolIdleSize: 10,
PoolMaxSize: 100,
}

config.ID = id.String()
config.Targets = []string{"ws://127.0.0.1:8080/register"}
config.PoolIdleSize = 10
config.PoolMaxSize = 100

return config, nil
}

// LoadConfiguration loads configuration from a YAML file.
func LoadConfiguration(path string) (*Config, error) {
config, err := NewConfig()
if err != nil {
return nil, err
}
config := NewConfig()

bytes, err := os.ReadFile(path)
if err != nil {
Expand All @@ -51,5 +40,14 @@ func LoadConfiguration(path string) (*Config, error) {
return nil, fmt.Errorf("failed to parse configuration: %w", err)
}

if config.ID == "" {
id, err := uuid.NewV4()
if err != nil {
return nil, fmt.Errorf("unable to get unique id: %w", err)
}

config.ID = id.String()
}

return config, nil
}
13 changes: 6 additions & 7 deletions client/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,14 @@ func (connection *Connection) Connect(ctx context.Context) error {
return nil
}

// the main loop it :
// - wait to receive HTTP requests from the Server
// - execute HTTP requests
// - send HTTP response back to the Server
// serve is the main loop, it:
// - Waits to receive HTTP requests from the Server.
// - Executes HTTP requests.
// - Sends HTTP responses back to the Server.
//
// As in the server code there is no buffering of HTTP request/response body.
// As in the server if any error occurs the connection is closed/throwed.
// As in the server if any error occurs the connection is closed/thrown.
func (connection *Connection) serve(ctx context.Context) {
// If there's any
defer connection.Close()

// Keep connection alive. This go routine may leak.
Expand Down Expand Up @@ -188,7 +187,7 @@ func (connection *Connection) serve(ctx context.Context) {
// Returns true if there's an error.
func (connection *Connection) error(msg string) bool {
resp := wsp.NewHTTPResponse()
resp.StatusCode = 527
resp.StatusCode = wsp.ClientErrorCode

log.Println(msg)

Expand Down
57 changes: 0 additions & 57 deletions examples/test_api/main.go

This file was deleted.

12 changes: 5 additions & 7 deletions response.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package wsp

import (
"fmt"
"log"
"net/http"
)
Expand All @@ -13,7 +12,11 @@ type HTTPResponse struct {
ContentLength int64 `json:"contentLength"`
}

const ProxyErrorCode = 526
// Custom HTTP error codes shared by client and server.
const (
ProxyErrorCode = 526
ClientErrorCode = 527
)

// SerializeHTTPResponse create a new HTTPResponse from a http.Response.
func SerializeHTTPResponse(resp *http.Response) *HTTPResponse {
Expand All @@ -36,8 +39,3 @@ func ProxyError(w http.ResponseWriter, err error) {
log.Println(err)
http.Error(w, err.Error(), ProxyErrorCode)
}

// ProxyErrorf log error and return a HTTP 526 error with the message.
func ProxyErrorf(w http.ResponseWriter, format string, args ...interface{}) {
ProxyError(w, fmt.Errorf(format, args...))
}
22 changes: 9 additions & 13 deletions server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package server

import (
"fmt"
"net/http"
"os"
"strconv"
"time"

"gopkg.in/yaml.v2"
Expand All @@ -16,22 +16,18 @@ type Config struct {
Timeout time.Duration
IdleTimeout time.Duration
SecretKey string
}

// GetAddr returns the address to specify a HTTP server address.
func (c Config) GetAddr() string {
return c.Host + ":" + strconv.Itoa(c.Port)
// If a KeyValidator method is provided, then Secretkey is ignored.
KeyValidator func(http.Header) error
}

// NewConfig creates a new ProxyConfig.
func NewConfig() *Config {
config := new(Config)
config.Host = "127.0.0.1"
config.Port = 8080
config.Timeout = 1000 // millisecond
config.IdleTimeout = 60000

return config
return &Config{
Host: "127.0.0.1",
Port: 8080,
Timeout: time.Second,
IdleTimeout: time.Minute,
}
}

// LoadConfiguration loads configuration from a YAML file.
Expand Down
43 changes: 18 additions & 25 deletions server/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const (
// Connection manages a single websocket connection from the peer.
// wsp supports multiple connections from a single peer at the same time.
type Connection struct {
pool *Pool
pool *Pool // the pool this connection belongs to.
ws *websocket.Conn
status ConnectionStatus
idleSince time.Time
Expand All @@ -51,16 +51,17 @@ type Connection struct {
// NewConnection returns a new Connection.
func NewConnection(pool *Pool, ws *websocket.Conn) *Connection {
// Initialize a new Connection
conn := new(Connection)
conn.pool = pool
conn.ws = ws
conn.nextResponse = make(chan chan io.Reader)
conn.status = Idle
conn := &Connection{
pool: pool,
ws: ws,
nextResponse: make(chan chan io.Reader),
status: Idle,
}

// Mark that this connection is ready to use for relay
// Mark that this connection is ready to use for relay.
conn.Release()

// Start to listen to incoming messages over the WebSocket connection
// Start to listen to incoming messages over the WebSocket connection.
go conn.read()

return conn
Expand Down Expand Up @@ -96,18 +97,18 @@ func (connection *Connection) read() {
}

if connection.status != Busy {
// We received a wild unexpected message
// We received a wild unexpected message, but we're goin to silently ignore it.
break
}

// When it gets here, it is expected to be either a HttpResponse or a HttpResponseBody has been returned.
//
// Next, it waits to receive the value from the Connection.proxyRequest function
// Next, it waits to receive the value from the Connection.proxyRequest function.
// that is invoked in the "server" thread.
// https://github.com/hgsgtk/wsp/blob/29cc73bbd67de18f1df295809166a7a5ef52e9fa/server/connection.go#L157
resp := <-connection.nextResponse
if resp == nil {
// We have been unlocked by Close()
// We have been unlocked by Close().
break
}

Expand All @@ -129,13 +130,6 @@ func (connection *Connection) proxyRequest(w http.ResponseWriter, r *http.Reques
if err != nil {
return fmt.Errorf("unable to serialize request: %w", err)
}
// i.e.
// {
// "Method":"GET",
// "URL":"http://localhost:8081/hello",
// "Header":{"Accept":["*/*"],"User-Agent":["curl/7.77.0"],"X-Proxy-Destination":["http://localhost:8081/hello"]},
// "ContentLength":0
// }

// [2]: Send the HTTP request to the peer
// Send the serialized HTTP request to the peer
Expand All @@ -154,7 +148,7 @@ func (connection *Connection) proxyRequest(w http.ResponseWriter, r *http.Reques
}

if err := bodyWriter.Close(); err != nil {
return fmt.Errorf("unable to pipe request body (close): %w", err)
return fmt.Errorf("unable to close request body pipe: %w", err)
}

// [3]: Wait the HTTP response is ready
Expand Down Expand Up @@ -212,14 +206,14 @@ func (connection *Connection) proxyRequest(w http.ResponseWriter, r *http.Reques
close(responseChannel)
}

return fmt.Errorf("unable to get http response body reader : %w", err)
return fmt.Errorf("unable to get http response body reader: %w", err)
}

// [6]: Read the HTTP response body from the peer
// Pipe the HTTP response body right from the remote Proxy to the client
if _, err := io.Copy(w, responseBodyReader); err != nil {
close(responseBodyChannel)
return fmt.Errorf("unable to pipe response body : %w", err)
return fmt.Errorf("unable to pipe response body: %w", err)
}

// Notify read() that we are done reading the response body
Expand Down Expand Up @@ -279,12 +273,11 @@ func (connection *Connection) close() {

log.Printf("Closing connection from %s", connection.pool.id)

// This one will be executed *before* lock.Unlock()
defer func() { connection.status = Closed }()

// Unlock a possible read() wild message
close(connection.nextResponse)

// Close the underlying TCP connection
connection.ws.Close()

// This must be executed *before* lock.Unlock()
connection.status = Closed
}
Loading

0 comments on commit 02427ae

Please sign in to comment.