Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 88 additions & 122 deletions contribs/gnodev/pkg/proxy/path_interceptor.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
package proxy

import (
"bufio"
"bytes"
"encoding/json"
"errors"
"fmt"
"go/parser"
"go/token"
"io"
"log/slog"
"net"
"net/http"
"net/http/httputil"
gopath "path"
"strconv"
"strings"
Expand All @@ -28,10 +27,11 @@
type PathInterceptor struct {
proxyAddr, targetAddr net.Addr

logger *slog.Logger
listener net.Listener
handlers []PathHandler
muHandlers sync.RWMutex
logger *slog.Logger
server *http.Server
reverseProxy *httputil.ReverseProxy
handlers []PathHandler
muHandlers sync.RWMutex
}

// NewPathInterceptor creates a new path proxy interceptor.
Expand All @@ -51,14 +51,31 @@
// Immediately close this listener after proxy initialization
defer targetListener.Close()

targetHost := proxyAddr.String()

proxy := &PathInterceptor{
listener: proxyListener,
logger: logger,
targetAddr: target,
proxyAddr: proxyAddr,
reverseProxy: &httputil.ReverseProxy{
Director: func(req *http.Request) {
req.URL.Scheme = "http"
req.URL.Host = targetHost
req.Host = targetHost
},
// Disable keep-alive so each request gets a fresh connection.
// The target node may restart at any time (during lazy-load reload),
// which would leave pooled connections dead.
// Cost is negligible since the target is always localhost.
Transport: &http.Transport{DisableKeepAlives: true},
},
}

proxy.server = &http.Server{

Check failure on line 74 in contribs/gnodev/pkg/proxy/path_interceptor.go

View workflow job for this annotation

GitHub Actions / Run Main (gnodev) / Go Lint / lint

G112: Potential Slowloris Attack because ReadHeaderTimeout is not configured in the http.Server (gosec)
Handler: proxy,
}

go proxy.handleConnections()
go proxy.server.Serve(proxyListener)

return proxy, nil
}
Expand All @@ -80,133 +97,84 @@
return fmt.Sprintf("%s://%s", proxy.targetAddr.Network(), proxy.targetAddr.String())
}

// handleConnections manages incoming connections to the proxy.
func (proxy *PathInterceptor) handleConnections() {
defer proxy.listener.Close()

for {
conn, err := proxy.listener.Accept()
if err != nil {
if !errors.Is(err, net.ErrClosed) {
proxy.logger.Debug("failed to accept connection", "error", err)
}
// ServeHTTP intercepts HTTP requests, extracts package paths, and forwards to the target.
func (proxy *PathInterceptor) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Handle WebSocket upgrades via raw TCP pipe
if isWebSocket(r) {
proxy.handleWebSocket(w, r)
return
}

return
}
// Read body for path interception
body, err := io.ReadAll(r.Body)
r.Body.Close()
if err != nil {
proxy.logger.Debug("body read failed", "error", err)
http.Error(w, "failed to read body", http.StatusBadGateway)
return
}

proxy.logger.Debug("new connection", "remote", conn.RemoteAddr())
go proxy.handleConnection(conn)
// Intercept paths — this may trigger a synchronous node reload
if err := proxy.handleRequest(body); err != nil {
proxy.logger.Debug("request handler warning", "error", err)
}

// Restore body for forwarding
r.Body = io.NopCloser(bytes.NewReader(body))
r.ContentLength = int64(len(body))

// Forward to the target node (fresh connection per request)
proxy.reverseProxy.ServeHTTP(w, r)
}

// handleConnection processes a single connection between client and target.
func (proxy *PathInterceptor) handleConnection(inConn net.Conn) {
logger := proxy.logger.With(slog.String("in", inConn.RemoteAddr().String()))
// handleWebSocket hijacks the client connection and pipes data to the target.
func (proxy *PathInterceptor) handleWebSocket(w http.ResponseWriter, r *http.Request) {
// Dial the target
targetConn, err := net.Dial(proxy.proxyAddr.Network(), proxy.proxyAddr.String())
if err != nil {
proxy.logger.Debug("websocket upstream dial failed", "error", err)
http.Error(w, "upstream dial failed", http.StatusBadGateway)
return
}

// Hijack the client connection
hijacker, ok := w.(http.Hijacker)
if !ok {
proxy.logger.Debug("hijacking not supported")
http.Error(w, "hijacking not supported", http.StatusInternalServerError)
targetConn.Close()
return
}

// Establish a connection to the target
outConn, err := net.Dial(proxy.proxyAddr.Network(), proxy.proxyAddr.String())
clientConn, _, err := hijacker.Hijack()
if err != nil {
logger.Error("target connection failed", "target", proxy.proxyAddr.String(), "error", err)
inConn.Close()
proxy.logger.Debug("hijack failed", "error", err)
http.Error(w, "hijack failed", http.StatusInternalServerError)
targetConn.Close()
return
}
logger = logger.With(slog.String("out", outConn.RemoteAddr().String()))

// Coordinate connection closure
var closeOnce sync.Once
closeConnections := func() {
inConn.Close()
outConn.Close()
// Forward the original upgrade request to the target
if err := r.Write(targetConn); err != nil {
clientConn.Close()
targetConn.Close()
return
}

// Setup bidirectional copying
// Bidirectional copy
var wg sync.WaitGroup
wg.Add(2)

// Response path (target -> client)
go func() {
defer wg.Done()
defer closeOnce.Do(closeConnections)

_, err := io.Copy(inConn, outConn)
if err == nil || errors.Is(err, net.ErrClosed) || errors.Is(err, io.EOF) {
return // Connection has been closed
}

logger.Debug("response copy error", "error", err)
io.Copy(targetConn, clientConn)
targetConn.Close()
}()

// Request path (client -> target)
go func() {
defer wg.Done()
defer closeOnce.Do(closeConnections)

var buffer bytes.Buffer
tee := io.TeeReader(inConn, &buffer)
reader := bufio.NewReader(tee)

// Process HTTP requests
if err := proxy.processHTTPRequests(reader, &buffer, outConn); err != nil {
if errors.Is(err, net.ErrClosed) || errors.Is(err, io.EOF) {
return // Connection has been closed
}

if _, isNetError := err.(net.Error); isNetError {
logger.Debug("request processing error", "error", err)
return
}

// Continue processing the connection if not a network error
}

// Forward remaining data after HTTP processing
if buffer.Len() > 0 {
if _, err := outConn.Write(buffer.Bytes()); err != nil {
logger.Debug("buffer flush failed", "error", err)
}
}

// Directly pipe remaining traffic
if _, err := io.Copy(outConn, inConn); err != nil && !errors.Is(err, net.ErrClosed) {
logger.Debug("raw copy failed", "error", err)
}
io.Copy(clientConn, targetConn)
clientConn.Close()
}()

wg.Wait()
logger.Debug("connection closed")
}

// processHTTPRequests handles the HTTP request/response cycle.
func (proxy *PathInterceptor) processHTTPRequests(reader *bufio.Reader, buffer *bytes.Buffer, outConn net.Conn) error {
for {
request, err := http.ReadRequest(reader)
if err != nil {
return fmt.Errorf("read request failed: %w", err)
}

// Check for websocket upgrade
if isWebSocket(request) {
return errors.New("websocket upgrade requested")
}

// Read and process the request body
body, err := io.ReadAll(request.Body)
request.Body.Close()
if err != nil {
return fmt.Errorf("body read failed: %w", err)
}

if err := proxy.handleRequest(body); err != nil {
proxy.logger.Debug("request handler warning", "error", err)
}

// Forward the original request bytes
if _, err := outConn.Write(buffer.Bytes()); err != nil {
return fmt.Errorf("request forward failed: %w", err)
}

buffer.Reset() // Prepare for the next request
}
}

func isWebSocket(req *http.Request) bool {
Expand All @@ -223,7 +191,6 @@
return paths
}

// addPath adds the given path, after cleanup, to the set of unique paths.
func (upaths uniqPaths) addPath(path string) {
path = cleanupPath(path)
upaths[path] = struct{}{}
Expand Down Expand Up @@ -272,9 +239,9 @@
return nil
}

// Close closes the proxy listener.
// Close closes the proxy server and listener.
func (proxy *PathInterceptor) Close() error {
return proxy.listener.Close()
return proxy.server.Close()
}

// parseRPCRequest unmarshals and processes RPC requests, returning paths.
Expand Down Expand Up @@ -342,17 +309,16 @@
switch path {
case ".app/simulate":
return handleTx(data, upaths)

case "vm/qrender", "vm/qfile", "vm/qfuncs", "vm/qeval":
path, _, _ := strings.Cut(string(data), ":") // Cut arguments out
upaths.addPath(path)
return nil

case "vm/qpkg_json":
upaths.addPath(string(data))
case "vm/qobject", "vm/qobject_json", "vm/qtype_json": // operate on already-loaded state
default:
return fmt.Errorf("unhandled: %q", path)
}

// XXX: handle more cases
return nil
}

func cleanupPath(path string) string {
Expand Down
87 changes: 87 additions & 0 deletions contribs/gnodev/pkg/proxy/path_interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/gnolang/gno/contribs/gnodev/pkg/proxy"
"github.com/gnolang/gno/gno.land/pkg/gnoland"
"github.com/gnolang/gno/gno.land/pkg/gnoland/ugnot"
"github.com/gnolang/gno/gno.land/pkg/integration"
"github.com/gnolang/gno/gno.land/pkg/sdk/vm"
Expand Down Expand Up @@ -250,3 +251,89 @@ func Render(_ string) string { return foo.Render("bar") }`,
}
})
}

// TestProxyRestart reproduces the lazy loading bug:
// when a PathHandler triggers a node reload (stop old node + start new node),
// the query that triggered it must still get a valid response.
// With the old TCP proxy, the persistent outConn died during reload
// and the response was lost.
func TestProxyRestart(t *testing.T) {
	const targetPath = "gno.land/r/target/foo"

	pkg := std.MemPackage{
		Name: "foo",
		Path: targetPath,
		Files: []*std.MemFile{
			{
				Name: "foo.gno",
				Body: `package foo

func Render(_ string) string { return "foo" }
`,
			},
		},
	}
	pkg.SetFile("gnomod.toml", gnolang.GenGnoModLatest(pkg.Path))
	pkg.Sort()

	rootdir := gnoenv.RootDir()
	cfg := integration.TestingMinimalNodeConfig(rootdir)
	logger := log.NewTestingLogger(t)

	tmp := t.TempDir()
	sock := filepath.Join(tmp, "node.sock")
	addr, err := net.ResolveUnixAddr("unix", sock)
	require.NoError(t, err)

	// Create proxy
	interceptor, err := proxy.NewPathInterceptor(logger, addr)
	require.NoError(t, err)
	defer interceptor.Close()
	cfg.TMConfig.RPC.ListenAddress = interceptor.ProxyAddress()
	cfg.SkipGenesisSigVerification = true

	// Setup genesis
	privKey := secp256k1.GenPrivKey()
	cfg.Genesis.AppState = integration.GenerateTestingGenesisState(privKey, pkg)

	// Start the initial node
	node, _ := integration.TestingInMemoryNode(t, logger, cfg)

	// Register a handler that restarts the node (simulating devNode.Reload).
	//
	// NOTE: this callback runs on the proxy's serving goroutine, not the test
	// goroutine. require.* and t.Fatal call FailNow, which the testing package
	// documents as valid only from the test goroutine, so the handler reports
	// failures with t.Error* and returns early instead.
	restarted := make(chan struct{}, 1)
	interceptor.HandlePath(func(paths ...string) {
		// Stop the current node — this kills the RPC server.
		if err := node.Stop(); err != nil {
			t.Errorf("stopping node: %v", err)
			return
		}

		// Start a fresh node on the same address (same cfg).
		newNode, err := gnoland.NewInMemoryNode(logger, cfg)
		if err != nil {
			t.Errorf("creating replacement node: %v", err)
			return
		}
		if err := newNode.Start(); err != nil {
			t.Errorf("starting replacement node: %v", err)
			return
		}
		select {
		case <-newNode.Ready():
		case <-time.After(10 * time.Second):
			t.Error("node didn't become ready after restart")
			return
		}
		node = newNode
		t.Cleanup(func() { newNode.Stop() })

		restarted <- struct{}{}
	})

	cli, err := client.NewHTTPClient(interceptor.TargetAddress())
	require.NoError(t, err)

	// This query triggers the handler which restarts the node mid-request.
	// With the HTTP reverse proxy, the forward happens AFTER the restart,
	// so it connects to the new node and succeeds.
	res, err := cli.ABCIQuery(context.Background(), "vm/qrender", []byte(targetPath+":\n"))
	require.NoError(t, err, "query must succeed even after node restart")
	assert.Nil(t, res.Response.Error)

	select {
	case <-restarted:
		// Good — handler restarted the node before the query was forwarded.
	default:
		t.Fatal("handler was not called")
	}
}
Loading
Loading