diff --git a/liteapi/client.go b/liteapi/client.go index 736162e..c6d45b2 100644 --- a/liteapi/client.go +++ b/liteapi/client.go @@ -81,7 +81,8 @@ type Options struct { LiteServers []config.LiteServer Timeout time.Duration // MaxConnections specifies a number of connections to lite servers for a connections pool. - MaxConnections int + MaxConnections int + WorkersPerConnection int // InitCtx is used when opening a new connection to lite servers during the initialization. InitCtx context.Context // ProofPolicy specifies a policy for proof checks. @@ -114,6 +115,13 @@ func WithMaxConnectionsNumber(maxConns int) Option { } } +func WithWorkersPerConnection(workersNum int) Option { + return func(o *Options) error { + o.WorkersPerConnection = workersNum + return nil + } +} + func WithAsyncConnectionsInit() Option { return func(o *Options) error { o.SyncConnectionsInitialization = false @@ -266,6 +274,7 @@ func NewClient(options ...Option) (*Client, error) { DetectArchiveNodes: false, SyncConnectionsInitialization: true, PoolStrategy: pool.BestPingStrategy, + WorkersPerConnection: 1, } for _, o := range options { if err := o(opts); err != nil { @@ -276,7 +285,7 @@ func NewClient(options ...Option) (*Client, error) { return nil, fmt.Errorf("server list empty") } connPool := pool.New(opts.PoolStrategy) - initCh := connPool.InitializeConnections(opts.InitCtx, opts.Timeout, opts.MaxConnections, opts.DetectArchiveNodes, opts.LiteServers) + initCh := connPool.InitializeConnections(opts.InitCtx, opts.Timeout, opts.MaxConnections, opts.WorkersPerConnection, opts.DetectArchiveNodes, opts.LiteServers) if opts.SyncConnectionsInitialization { if err := <-initCh; err != nil { return nil, err diff --git a/liteapi/pool/conn_pool.go b/liteapi/pool/conn_pool.go index 5302856..5f6f486 100644 --- a/liteapi/pool/conn_pool.go +++ b/liteapi/pool/conn_pool.go @@ -71,13 +71,13 @@ func New(strategy Strategy) *ConnPool { } } -func (p *ConnPool) InitializeConnections(ctx context.Context, timeout time.Duration, maxConnections int, detectArchiveNodes bool, servers []config.LiteServer) chan error { +func (p *ConnPool) InitializeConnections(ctx context.Context, timeout time.Duration, maxConnections int, workersPerConnection int, detectArchiveNodes bool, servers []config.LiteServer) chan error { ch := make(chan error, 1) go func() { clientsCh := make(chan clientWrapper, len(servers)) for connID, server := range servers { go func(connID int, server config.LiteServer) { - cli, _ := connect(ctx, timeout, server) + cli, _ := connect(ctx, timeout, server, workersPerConnection) // TODO: log error clientsCh <- clientWrapper{ connID: connID, @@ -119,7 +119,7 @@ func (p *ConnPool) InitializeConnections(ctx context.Context, timeout time.Durat return ch } -func connect(ctx context.Context, timeout time.Duration, server config.LiteServer) (*liteclient.Client, error) { +func connect(ctx context.Context, timeout time.Duration, server config.LiteServer, n int) (*liteclient.Client, error) { serverPubkey, err := base64.StdEncoding.DecodeString(server.Key) if err != nil { return nil, err @@ -128,7 +128,7 @@ func connect(ctx context.Context, timeout time.Duration, server config.LiteServe if err != nil { return nil, err } - cli := liteclient.NewClient(c, liteclient.OptionTimeout(timeout)) + cli := liteclient.NewClient(c, liteclient.OptionTimeout(timeout), liteclient.OptionWorkersPerConnection(n)) if _, err := cli.LiteServerGetMasterchainInfo(ctx); err != nil { return nil, err } diff --git a/liteclient/client.go b/liteclient/client.go index f805d0f..d3bfcbb 100644 --- a/liteclient/client.go +++ b/liteclient/client.go @@ -31,7 +31,9 @@ type Client struct { // if such a method makes several calls to a lite server, // the total time is bounded by the timeout. timeout time.Duration - connection *Connection + connections []*Connection + nextConn int + connMutex sync.Mutex queries map[queryID]chan []byte queriesMutex sync.Mutex } @@ -44,22 +46,47 @@ func OptionTimeout(t time.Duration) Options { } } +func OptionWorkersPerConnection(n int) Options { + return func(c *Client) { + if n < 1 { + n = 1 + } + connFirst := c.connections[0] + for i := 0; i < n-1; i++ { + conn, err := NewConnection(context.Background(), connFirst.peerPublicKey, connFirst.host) + if err != nil { + slog.Warn("liteclient clone connection error", err) + continue + } + c.connections = append(c.connections, conn) + } + } +} + func NewClient(c *Connection, opts ...Options) *Client { c2 := &Client{ - timeout: defaultTimeout, - connection: c, - queries: make(map[queryID]chan []byte), + timeout: defaultTimeout, + connections: []*Connection{c}, + queries: make(map[queryID]chan []byte), } for _, f := range opts { f(c2) } - go c2.reader() + + for _, conn := range c2.connections { + go c2.reader(conn) + } return c2 } // IsOK returns true if there is no problems with this client and its underlying connection to a lite server. func (c *Client) IsOK() bool { - return c.connection.Status() == Connected + for _, conn := range c.connections { + if conn.Status() == Connected { + return true + } + } + return false } // Request sends q as query in adnl.message.query and receives answer from adnl.message.answer @@ -83,7 +110,12 @@ func (c *Client) Request(ctx context.Context, q []byte) ([]byte, error) { resp := c.registerCallback(id) defer c.unregisterCallback(id) - err = c.connection.Send(p) + c.connMutex.Lock() + conn := c.connections[c.nextConn] + c.nextConn = (c.nextConn + 1) % len(c.connections) + c.connMutex.Unlock() + + err = conn.Send(p) if err != nil { return nil, err } @@ -142,8 +174,8 @@ func decodeLength(b []byte) (int, []byte, error) { return int(i) >> 8, b[4:], nil } -func (c *Client) reader() { - for p := range c.connection.Responses() { +func (c *Client) reader(conn *Connection) { + for p := range conn.Responses() { if p.MagicType() != magicADNLAnswer { continue } @@ -225,7 +257,11 @@ func (c *Client) WaitMasterchainSeqno(ctx context.Context, seqno uint32, timeout } func (c *Client) AverageRoundTrip() time.Duration { - return c.connection.AverageRoundTrip() + var total time.Duration + for _, conn := range c.connections { + total += conn.AverageRoundTrip() + } + return total / time.Duration(len(c.connections)) } func (c *Client) WaitMasterchainBlock(ctx context.Context, seqno uint32, timeout uint32) (res LiteServerBlockHeaderC, err error) {