Skip to content

Commit

Permalink
udpnat2: New synced udp nat service
Browse files Browse the repository at this point in the history
  • Loading branch information
nekohasekai committed Oct 21, 2024
1 parent b07fb48 commit 098ca21
Show file tree
Hide file tree
Showing 10 changed files with 312 additions and 31 deletions.
2 changes: 1 addition & 1 deletion common/network/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ type UDPHandler interface {
}

type UDPHandlerEx interface {
NewPacket(ctx context.Context, conn PacketConn, buffer *buf.Buffer, source M.Socksaddr, destination M.Socksaddr) error
NewPacketEx(buffer *buf.Buffer, source M.Socksaddr, destination M.Socksaddr)
}

// Deprecated: Use UDPConnectionHandlerEx instead.
Expand Down
22 changes: 17 additions & 5 deletions common/network/direct.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,27 @@ func (o ReadWaitOptions) NeedHeadroom() bool {
return o.FrontHeadroom > 0 || o.RearHeadroom > 0
}

func (o ReadWaitOptions) Copy(buffer *buf.Buffer) *buf.Buffer {
if o.FrontHeadroom > buffer.Start() ||
o.RearHeadroom > buffer.FreeLen() {
newBuffer := o.newBuffer(buf.UDPBufferSize, false)
newBuffer.Write(buffer.Bytes())
buffer.Release()
return newBuffer
} else {
return buffer
}
}

func (o ReadWaitOptions) NewBuffer() *buf.Buffer {
return o.newBuffer(buf.BufferSize)
return o.newBuffer(buf.BufferSize, true)
}

func (o ReadWaitOptions) NewPacketBuffer() *buf.Buffer {
return o.newBuffer(buf.UDPBufferSize)
return o.newBuffer(buf.UDPBufferSize, true)
}

func (o ReadWaitOptions) newBuffer(defaultBufferSize int) *buf.Buffer {
func (o ReadWaitOptions) newBuffer(defaultBufferSize int, reserve bool) *buf.Buffer {
var bufferSize int
if o.MTU > 0 {
bufferSize = o.MTU + o.FrontHeadroom + o.RearHeadroom
Expand All @@ -36,9 +48,9 @@ func (o ReadWaitOptions) newBuffer(defaultBufferSize int) *buf.Buffer {
}
buffer := buf.NewSize(bufferSize)
if o.FrontHeadroom > 0 {
buffer.Resize(o.FrontHeadroom, 0)
buffer.Advance(o.FrontHeadroom)
}
if o.RearHeadroom > 0 {
if o.RearHeadroom > 0 && reserve {
buffer.Reserve(o.RearHeadroom)
}
return buffer
Expand Down
6 changes: 0 additions & 6 deletions common/udpnat/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,6 @@ func (s *Service[T]) NewContextPacketEx(ctx context.Context, key T, buffer *buf.
s.nat.Delete(key)
}
}()
} else {
c.localAddr = source
}
if common.Done(c.ctx) {
s.nat.Delete(key)
Expand Down Expand Up @@ -215,10 +213,6 @@ func (c *conn) SetWriteDeadline(t time.Time) error {
return os.ErrInvalid
}

func (c *conn) NeedAdditionalReadDeadline() bool {
return true
}

func (c *conn) Upstream() any {
return c.source
}
90 changes: 90 additions & 0 deletions common/udpnat2/conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package udpnat

import (
"io"
"net"
"os"
"time"

"github.com/sagernet/sing/common/buf"
M "github.com/sagernet/sing/common/metadata"
N "github.com/sagernet/sing/common/network"
"github.com/sagernet/sing/common/pipe"
)

type natConn struct {
writer N.PacketWriter
localAddr M.Socksaddr
packetChan chan *Packet
doneChan chan struct{}
readDeadline pipe.Deadline
readWaitOptions N.ReadWaitOptions
}

func (c *natConn) ReadPacket(buffer *buf.Buffer) (addr M.Socksaddr, err error) {
select {
case p := <-c.packetChan:
_, err = buffer.ReadOnceFrom(p.Buffer)
destination := p.Destination
p.Buffer.Release()
PutPacket(p)
return destination, err

Check failure on line 31 in common/udpnat2/conn.go

View workflow job for this annotation

GitHub Actions / macOS

cannot use destination (variable of type netip.AddrPort) as metadata.Socksaddr value in return statement

Check failure on line 31 in common/udpnat2/conn.go

View workflow job for this annotation

GitHub Actions / Build

cannot use destination (variable of type netip.AddrPort) as metadata.Socksaddr value in return statement

Check failure on line 31 in common/udpnat2/conn.go

View workflow job for this annotation

GitHub Actions / Linux (Go 1.21)

cannot use destination (variable of type netip.AddrPort) as metadata.Socksaddr value in return statement

Check failure on line 31 in common/udpnat2/conn.go

View workflow job for this annotation

GitHub Actions / Linux (Go 1.22)

cannot use destination (variable of type netip.AddrPort) as metadata.Socksaddr value in return statement

Check failure on line 31 in common/udpnat2/conn.go

View workflow job for this annotation

GitHub Actions / Windows

cannot use destination (variable of type netip.AddrPort) as metadata.Socksaddr value in return statement

Check failure on line 31 in common/udpnat2/conn.go

View workflow job for this annotation

GitHub Actions / Linux

cannot use destination (variable of type netip.AddrPort) as metadata.Socksaddr value in return statement

Check failure on line 31 in common/udpnat2/conn.go

View workflow job for this annotation

GitHub Actions / Linux (Go 1.20)

cannot use destination (variable of type netip.AddrPort) as metadata.Socksaddr value in return statement
case <-c.doneChan:
return M.Socksaddr{}, io.ErrClosedPipe
case <-c.readDeadline.Wait():
return M.Socksaddr{}, os.ErrDeadlineExceeded
}
}

func (c *natConn) WritePacket(buffer *buf.Buffer, destination M.Socksaddr) error {
return c.writer.WritePacket(buffer, destination)
}

func (c *natConn) InitializeReadWaiter(options N.ReadWaitOptions) (needCopy bool) {
c.readWaitOptions = options
return false
}

func (c *natConn) WaitReadPacket() (buffer *buf.Buffer, destination M.Socksaddr, err error) {
select {
case packet := <-c.packetChan:
buffer = c.readWaitOptions.Copy(packet.Buffer)
destination = packet.Destination

Check failure on line 52 in common/udpnat2/conn.go

View workflow job for this annotation

GitHub Actions / macOS

cannot use packet.Destination (variable of type netip.AddrPort) as metadata.Socksaddr value in assignment

Check failure on line 52 in common/udpnat2/conn.go

View workflow job for this annotation

GitHub Actions / Build

cannot use packet.Destination (variable of type netip.AddrPort) as metadata.Socksaddr value in assignment (typecheck)

Check failure on line 52 in common/udpnat2/conn.go

View workflow job for this annotation

GitHub Actions / Linux (Go 1.21)

cannot use packet.Destination (variable of type netip.AddrPort) as metadata.Socksaddr value in assignment

Check failure on line 52 in common/udpnat2/conn.go

View workflow job for this annotation

GitHub Actions / Linux (Go 1.22)

cannot use packet.Destination (variable of type netip.AddrPort) as metadata.Socksaddr value in assignment

Check failure on line 52 in common/udpnat2/conn.go

View workflow job for this annotation

GitHub Actions / Windows

cannot use packet.Destination (variable of type netip.AddrPort) as metadata.Socksaddr value in assignment

Check failure on line 52 in common/udpnat2/conn.go

View workflow job for this annotation

GitHub Actions / Linux

cannot use packet.Destination (variable of type netip.AddrPort) as metadata.Socksaddr value in assignment

Check failure on line 52 in common/udpnat2/conn.go

View workflow job for this annotation

GitHub Actions / Linux (Go 1.20)

cannot use packet.Destination (variable of type netip.AddrPort) as metadata.Socksaddr value in assignment
PutPacket(packet)
return
case <-c.doneChan:
return nil, M.Socksaddr{}, io.ErrClosedPipe
case <-c.readDeadline.Wait():
return nil, M.Socksaddr{}, os.ErrDeadlineExceeded
}
}

func (c *natConn) Close() error {
select {
case <-c.doneChan:
default:
close(c.doneChan)
}
return nil
}

func (c *natConn) LocalAddr() net.Addr {
return c.localAddr
}

func (c *natConn) RemoteAddr() net.Addr {
return M.Socksaddr{}
}

func (c *natConn) SetDeadline(t time.Time) error {
return os.ErrInvalid
}

func (c *natConn) SetReadDeadline(t time.Time) error {
c.readDeadline.Set(t)
return nil
}

func (c *natConn) SetWriteDeadline(t time.Time) error {
return os.ErrInvalid
}
28 changes: 28 additions & 0 deletions common/udpnat2/packet.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package udpnat

import (
"net/netip"
"sync"

"github.com/sagernet/sing/common/buf"
)

var packetPool = sync.Pool{
New: func() any {
return new(Packet)
},
}

type Packet struct {
Buffer *buf.Buffer
Destination netip.AddrPort
}

func NewPacket() *Packet {
return packetPool.Get().(*Packet)
}

func PutPacket(packet *Packet) {
*packet = Packet{}
packetPool.Put(packet)
}
100 changes: 100 additions & 0 deletions common/udpnat2/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package udpnat

import (
"context"
"net/netip"
"time"

"github.com/sagernet/sing/common"
"github.com/sagernet/sing/common/buf"
M "github.com/sagernet/sing/common/metadata"
N "github.com/sagernet/sing/common/network"
"github.com/sagernet/sing/common/pipe"
"github.com/sagernet/sing/contrab/freelru"
"github.com/sagernet/sing/contrab/maphash"
)

type Service struct {
nat *freelru.LRU[netip.AddrPort, *natConn]
handler Handler
metrics Metrics
}

type Handler interface {
PreparePacketConnection(buffer *buf.Buffer, source netip.AddrPort, destination netip.AddrPort, userData any) (bool, context.Context, N.PacketWriter, N.CloseHandlerFunc)
N.UDPConnectionHandlerEx
}

type Metrics struct {
Creates uint64
Rejects uint64
Inputs uint64
Drops uint64
}

func New(handler Handler, timeout time.Duration) *Service {
nat := common.Must1(freelru.New[netip.AddrPort, *natConn](1024, maphash.NewHasher[netip.AddrPort]().Hash32))
nat.SetLifetime(timeout)
nat.SetHealthCheck(func(port netip.AddrPort, conn *natConn) bool {
select {
case <-conn.doneChan:
return false
default:
return true
}
})
nat.SetOnEvict(func(_ netip.AddrPort, conn *natConn) {
conn.Close()
})
return &Service{
nat: nat,
handler: handler,
}
}

func (s *Service) NewPacket(buffer *buf.Buffer, source netip.AddrPort, destination netip.AddrPort, userData any) {
conn, loaded := s.nat.Get(source)
if !loaded {
ok, ctx, writer, onClose := s.handler.PreparePacketConnection(buffer, source, destination, userData)
if !ok {
buffer.Release()
s.metrics.Rejects++
return
}
conn = &natConn{
writer: writer,
localAddr: M.SocksaddrFromNetIP(source),
packetChan: make(chan *Packet, 64),
doneChan: make(chan struct{}),
readDeadline: pipe.MakeDeadline(),
}
packet := NewPacket()
*packet = Packet{
Buffer: buffer,
Destination: destination,
}
conn.packetChan <- packet
s.nat.Add(source, conn)
s.handler.NewPacketConnectionEx(ctx, conn, M.SocksaddrFromNetIP(source), M.SocksaddrFromNetIP(destination), onClose)
s.metrics.Creates++
s.metrics.Inputs++
return
}
packet := NewPacket()
*packet = Packet{
Buffer: conn.readWaitOptions.Copy(buffer),
Destination: destination,
}
select {
case conn.packetChan <- packet:
s.metrics.Inputs++
default:
packet.Buffer.Release()
PutPacket(packet)
s.metrics.Drops++
}
}

func (s *Service) Metrics() Metrics {
return s.metrics
}
Loading

0 comments on commit 098ca21

Please sign in to comment.