From bc8ba24a35aa58ed506a4348edf58c86134d24d7 Mon Sep 17 00:00:00 2001 From: ZhangJian He Date: Sun, 15 Sep 2024 10:56:48 +0800 Subject: [PATCH] feat: client support send bytes method Signed-off-by: ZhangJian He --- go.mod | 2 +- go.sum | 4 +- opcua/client.go | 166 ++++++++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 164 insertions(+), 8 deletions(-) diff --git a/go.mod b/go.mod index 3c8bc79..3f71120 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module opcua-go go 1.21 require ( - github.com/shoothzj/gox v0.0.3-0.20240914162741-a7541b79fa6f + github.com/shoothzj/gox v0.0.3-0.20240915030409-0ff3ac4e657c github.com/stretchr/testify v1.9.0 ) diff --git a/go.sum b/go.sum index 8170fd8..3aa2062 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/shoothzj/gox v0.0.3-0.20240914162741-a7541b79fa6f h1:a70I1OcxVdGGIN7hnt4p/22afJ16tFvOkjlTfV9aK8o= -github.com/shoothzj/gox v0.0.3-0.20240914162741-a7541b79fa6f/go.mod h1:W8vthsaC2LWvu1zy/B9znKuOOSrFUqWCMHkdbqZJE04= +github.com/shoothzj/gox v0.0.3-0.20240915030409-0ff3ac4e657c h1:bGa8KCyniEJwV1IF6BKSwjKlexk5If8cXp3M17JOfa4= +github.com/shoothzj/gox v0.0.3-0.20240915030409-0ff3ac4e657c/go.mod h1:W8vthsaC2LWvu1zy/B9znKuOOSrFUqWCMHkdbqZJE04= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= diff --git a/opcua/client.go b/opcua/client.go index 0767d7e..7a1edce 100644 --- a/opcua/client.go +++ b/opcua/client.go @@ -1,20 +1,176 @@ package opcua -import "log/slog" +import ( + "crypto/tls" + "fmt" + "github.com/shoothzj/gox/buffer" + "github.com/shoothzj/gox/netx" + "log/slog" + "net" + "sync" +) type ClientConfig struct { - logger *slog.Logger + Address netx.Address + BufferMax int + SendQueueSize int + PendingQueueSize int + TlsConfig *tls.Config + + Logger *slog.Logger +} + +type sendRequest struct { + bytes []byte + callback func([]byte, error) } type Client struct { config *ClientConfig logger *slog.Logger + + conn net.Conn + eventsChan chan *sendRequest + pendingQueue chan *sendRequest + buffer *buffer.Buffer + closeCh chan struct{} +} + +func (c *Client) Send(bytes []byte) ([]byte, error) { + wg := sync.WaitGroup{} + wg.Add(1) + var result []byte + var err error + c.sendAsync(bytes, func(resp []byte, e error) { + result = resp + err = e + wg.Done() + }) + wg.Wait() + if err != nil { + return nil, err + } + return result, nil +} + +func (c *Client) sendAsync(bytes []byte, callback func([]byte, error)) { + sr := &sendRequest{ + bytes: bytes, + callback: callback, + } + c.eventsChan <- sr +} + +func (c *Client) read() { + for { + select { + case req := <-c.pendingQueue: + n, err := c.conn.Read(c.buffer.WritableSlice()) + if err != nil { + req.callback(nil, err) + c.closeCh <- struct{}{} + break + } + err = c.buffer.AdjustWriteCursor(n) + if err != nil { + req.callback(nil, err) + c.closeCh <- struct{}{} + break + } + if c.buffer.Size() < 4 { + continue + } + bytes := make([]byte, 4) + err = c.buffer.ReadExactly(bytes) + c.buffer.Compact() + if err != nil { + req.callback(nil, err) + c.closeCh <- struct{}{} + break + } + length := int(bytes[3]) | int(bytes[2])<<8 | int(bytes[1])<<16 | int(bytes[0])<<24 + if c.buffer.Size() < length { + continue + } + // in case ddos attack + if length > c.buffer.Capacity() { + req.callback(nil, fmt.Errorf("response length %d is too large", length)) + c.closeCh <- struct{}{} + break + } + data := make([]byte, length) + err = c.buffer.ReadExactly(data) + if err != nil { + req.callback(nil, err) + c.closeCh <- struct{}{} + break + } + c.buffer.Compact() + req.callback(data, nil) + case <-c.closeCh: + return + } + } +} + +func (c *Client) write() { + for { + select { + case req := <-c.eventsChan: + n, err := c.conn.Write(req.bytes) + if err != nil { + req.callback(nil, err) + c.closeCh <- struct{}{} + break + } + if n != len(req.bytes) { + req.callback(nil, fmt.Errorf("write %d bytes, but expect %d bytes", n, len(req.bytes))) + c.closeCh <- struct{}{} + break + } + c.pendingQueue <- req + case <-c.closeCh: + return + } + } } -func NewClient(config *ClientConfig) *Client { +func (c *Client) Close() { + _ = c.conn.Close() + c.closeCh <- struct{}{} +} + +func NewClient(config *ClientConfig) (*Client, error) { + conn, err := netx.Dial(config.Address, config.TlsConfig) + + if err != nil { + return nil, err + } + if config.SendQueueSize == 0 { + config.SendQueueSize = 1000 + } + if config.PendingQueueSize == 0 { + config.PendingQueueSize = 1000 + } + if config.BufferMax == 0 { + config.BufferMax = 512 * 1024 + } + client := &Client{ config: config, - logger: config.logger, + logger: config.Logger, + + conn: conn, + eventsChan: make(chan *sendRequest, config.SendQueueSize), + pendingQueue: make(chan *sendRequest, config.PendingQueueSize), + buffer: buffer.NewBuffer(config.BufferMax), + closeCh: make(chan struct{}), } - return client + go func() { + client.read() + }() + go func() { + client.write() + }() + return client, nil }