Skip to content

Commit

Permalink
feat: client support send bytes method
Browse files Browse the repository at this point in the history
Signed-off-by: ZhangJian He <shoothzj@gmail.com>
  • Loading branch information
shoothzj committed Sep 15, 2024
1 parent 46afb9b commit bc8ba24
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 8 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module opcua-go
go 1.21

require (
github.com/shoothzj/gox v0.0.3-0.20240914162741-a7541b79fa6f
github.com/shoothzj/gox v0.0.3-0.20240915030409-0ff3ac4e657c
github.com/stretchr/testify v1.9.0
)

Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/shoothzj/gox v0.0.3-0.20240914162741-a7541b79fa6f h1:a70I1OcxVdGGIN7hnt4p/22afJ16tFvOkjlTfV9aK8o=
github.com/shoothzj/gox v0.0.3-0.20240914162741-a7541b79fa6f/go.mod h1:W8vthsaC2LWvu1zy/B9znKuOOSrFUqWCMHkdbqZJE04=
github.com/shoothzj/gox v0.0.3-0.20240915030409-0ff3ac4e657c h1:bGa8KCyniEJwV1IF6BKSwjKlexk5If8cXp3M17JOfa4=
github.com/shoothzj/gox v0.0.3-0.20240915030409-0ff3ac4e657c/go.mod h1:W8vthsaC2LWvu1zy/B9znKuOOSrFUqWCMHkdbqZJE04=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
Expand Down
166 changes: 161 additions & 5 deletions opcua/client.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,176 @@
package opcua

import "log/slog"
import (
"crypto/tls"
"fmt"
"github.com/shoothzj/gox/buffer"
"github.com/shoothzj/gox/netx"
"log/slog"
"net"
"sync"
)

type ClientConfig struct {
logger *slog.Logger
Address netx.Address
BufferMax int
SendQueueSize int
PendingQueueSize int
TlsConfig *tls.Config

Logger *slog.Logger
}

type sendRequest struct {
bytes []byte
callback func([]byte, error)
}

type Client struct {
config *ClientConfig
logger *slog.Logger

conn net.Conn
eventsChan chan *sendRequest
pendingQueue chan *sendRequest
buffer *buffer.Buffer
closeCh chan struct{}
}

func (c *Client) Send(bytes []byte) ([]byte, error) {
wg := sync.WaitGroup{}
wg.Add(1)
var result []byte
var err error
c.sendAsync(bytes, func(resp []byte, e error) {
result = resp
err = e
wg.Done()
})
wg.Wait()
if err != nil {
return nil, err
}
return result, nil
}

func (c *Client) sendAsync(bytes []byte, callback func([]byte, error)) {
sr := &sendRequest{
bytes: bytes,
callback: callback,
}
c.eventsChan <- sr
}

func (c *Client) read() {
for {
select {
case req := <-c.pendingQueue:
n, err := c.conn.Read(c.buffer.WritableSlice())
if err != nil {
req.callback(nil, err)
c.closeCh <- struct{}{}
break
}
err = c.buffer.AdjustWriteCursor(n)
if err != nil {
req.callback(nil, err)
c.closeCh <- struct{}{}
break
}
if c.buffer.Size() < 4 {
continue
}
bytes := make([]byte, 4)
err = c.buffer.ReadExactly(bytes)
c.buffer.Compact()
if err != nil {
req.callback(nil, err)
c.closeCh <- struct{}{}
break
}
length := int(bytes[3]) | int(bytes[2])<<8 | int(bytes[1])<<16 | int(bytes[0])<<24
if c.buffer.Size() < length {
continue
}
// in case ddos attack
if length > c.buffer.Capacity() {
req.callback(nil, fmt.Errorf("response length %d is too large", length))
c.closeCh <- struct{}{}
break
}
data := make([]byte, length)
err = c.buffer.ReadExactly(data)
if err != nil {
req.callback(nil, err)
c.closeCh <- struct{}{}
break
}
c.buffer.Compact()
req.callback(data, nil)
case <-c.closeCh:
return
}
}
}

func (c *Client) write() {
for {
select {
case req := <-c.eventsChan:
n, err := c.conn.Write(req.bytes)
if err != nil {
req.callback(nil, err)
c.closeCh <- struct{}{}
break
}
if n != len(req.bytes) {
req.callback(nil, fmt.Errorf("write %d bytes, but expect %d bytes", n, len(req.bytes)))
c.closeCh <- struct{}{}
break
}
c.pendingQueue <- req
case <-c.closeCh:
return
}
}
}

func NewClient(config *ClientConfig) *Client {
func (c *Client) Close() {
_ = c.conn.Close()
c.closeCh <- struct{}{}
}

func NewClient(config *ClientConfig) (*Client, error) {
conn, err := netx.Dial(config.Address, config.TlsConfig)

if err != nil {
return nil, err
}
if config.SendQueueSize == 0 {
config.SendQueueSize = 1000
}
if config.PendingQueueSize == 0 {
config.PendingQueueSize = 1000
}
if config.BufferMax == 0 {
config.BufferMax = 512 * 1024
}

client := &Client{
config: config,
logger: config.logger,
logger: config.Logger,

conn: conn,
eventsChan: make(chan *sendRequest, config.SendQueueSize),
pendingQueue: make(chan *sendRequest, config.PendingQueueSize),
buffer: buffer.NewBuffer(config.BufferMax),
closeCh: make(chan struct{}),
}
return client
go func() {
client.read()
}()
go func() {
client.write()
}()
return client, nil
}

0 comments on commit bc8ba24

Please sign in to comment.