diff --git a/examples/server/server.go b/examples/server/server.go new file mode 100644 index 0000000..ed9dabd --- /dev/null +++ b/examples/server/server.go @@ -0,0 +1,44 @@ +package main + +import ( + "log/slog" + "opcua-go/opcua" + "os" + "os/signal" + "syscall" + "time" +) + +func main() { + logger := slog.Default() + + config := &opcua.ServerConfig{ + Host: "127.0.0.1", + Port: 4840, + ReceiverBufferSize: 1024, + ReadTimeout: 5 * time.Second, + Logger: logger, + } + + server := opcua.NewServer(config) + + port, err := server.Run() + if err != nil { + logger.Error("Failed to start server", slog.String("error", err.Error())) + os.Exit(1) + } + + logger.Info("Server running", slog.Int("port", port)) + + signalChan := make(chan os.Signal, 1) + signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM) + + <-signalChan + logger.Info("Shutting down server...") + + if err := server.Close(); err != nil { + logger.Error("Error closing server", slog.String("error", err.Error())) + } else { + logger.Info("Server stopped gracefully") + } +} diff --git a/go.mod b/go.mod index 692723c..322452e 100644 --- a/go.mod +++ b/go.mod @@ -2,7 +2,10 @@ module opcua-go go 1.21 -require github.com/stretchr/testify v1.9.0 +require ( + github.com/shoothzj/gox v0.0.3-0.20240910014233-77c917119f05 + github.com/stretchr/testify v1.9.0 +) require ( github.com/davecgh/go-spew v1.1.1 // indirect diff --git a/go.sum b/go.sum index 60ce688..a3237ad 100644 --- a/go.sum +++ b/go.sum @@ -2,6 +2,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/shoothzj/gox v0.0.3-0.20240910014233-77c917119f05 h1:1WadlKXqm14AU5fUFP29Zxdsn/oDwvVoz6CUhv5u7/M= +github.com/shoothzj/gox v0.0.3-0.20240910014233-77c917119f05/go.mod h1:W8vthsaC2LWvu1zy/B9znKuOOSrFUqWCMHkdbqZJE04= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= diff --git a/opcua/server.go b/opcua/server.go index cd99d5b..8c4dd2c 100644 --- a/opcua/server.go +++ b/opcua/server.go @@ -1,17 +1,24 @@ package opcua import ( + "encoding/binary" "fmt" + "github.com/shoothzj/gox/buffer" "log/slog" "net" "sync" + "time" ) type ServerConfig struct { Host string Port int - logger *slog.Logger + ReceiverBufferSize int + + ReadTimeout time.Duration + + Logger *slog.Logger } func (s *ServerConfig) addr() string { @@ -32,7 +39,7 @@ func NewServer(config *ServerConfig) *Server { server := &Server{ config: config, quit: make(chan bool), - logger: config.logger, + logger: config.Logger, } server.logger.Info("server initialized", slog.String("host", config.Host), slog.Int("port", config.Port)) return server @@ -80,12 +87,56 @@ func (s *Server) listenLoop() { } } go func() { - s.handleConn(netConn) + s.handleConn(&opcuaConn{ + conn: netConn, + buffer: buffer.NewBuffer(s.config.ReceiverBufferSize), + }) }() } } -func (s *Server) handleConn(conn net.Conn) { +type opcuaConn struct { + conn net.Conn + buffer *buffer.Buffer +} + +func (s *Server) handleConn(conn *opcuaConn) { + for { + if s.config.ReadTimeout > 0 { + _ = conn.conn.SetReadDeadline(time.Now().Add(s.config.ReadTimeout)) + } + readLen, err := conn.conn.Read(conn.buffer.WritableSlice()) + if err != nil { + break + } + err = conn.buffer.AdjustWriteCursor(readLen) + if err != nil { + break + } + for { + if conn.buffer.Size() < 8 { + break + } + + header := make([]byte, 8) + _, err = conn.buffer.Peek(header) + if err != nil { + break + } + + messageLen := int(binary.LittleEndian.Uint32(header[4:8])) + + if conn.buffer.Size() < messageLen { + break + } + + bytes := make([]byte, messageLen) + err = conn.buffer.ReadExactly(bytes) + if err != nil { + break + } + } + } } func (s *Server) Close() error { diff --git a/opcua/server_test.go b/opcua/server_test.go index 2e74862..e3e671a 100644 --- a/opcua/server_test.go +++ b/opcua/server_test.go @@ -11,7 +11,7 @@ func TestStartWithZeroPort(t *testing.T) { config := &ServerConfig{ Host: "localhost", Port: 0, - logger: slog.Default(), + Logger: slog.Default(), } server := NewServer(config)