Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/liteclient_clone_conns'
Browse files Browse the repository at this point in the history
  • Loading branch information
mr-tron committed Dec 17, 2024
2 parents e4c72ef + 15d2e41 commit c167355
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 16 deletions.
13 changes: 11 additions & 2 deletions liteapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ type Options struct {
LiteServers []config.LiteServer
Timeout time.Duration
// MaxConnections specifies a number of connections to lite servers for a connections pool.
MaxConnections int
MaxConnections int
WorkersPerConnection int
// InitCtx is used when opening a new connection to lite servers during the initialization.
InitCtx context.Context
// ProofPolicy specifies a policy for proof checks.
Expand Down Expand Up @@ -114,6 +115,13 @@ func WithMaxConnectionsNumber(maxConns int) Option {
}
}

func WithWorkersPerConnection(workersNum int) Option {
return func(o *Options) error {
o.WorkersPerConnection = workersNum
return nil
}
}

func WithAsyncConnectionsInit() Option {
return func(o *Options) error {
o.SyncConnectionsInitialization = false
Expand Down Expand Up @@ -266,6 +274,7 @@ func NewClient(options ...Option) (*Client, error) {
DetectArchiveNodes: false,
SyncConnectionsInitialization: true,
PoolStrategy: pool.BestPingStrategy,
WorkersPerConnection: 1,
}
for _, o := range options {
if err := o(opts); err != nil {
Expand All @@ -276,7 +285,7 @@ func NewClient(options ...Option) (*Client, error) {
return nil, fmt.Errorf("server list empty")
}
connPool := pool.New(opts.PoolStrategy)
initCh := connPool.InitializeConnections(opts.InitCtx, opts.Timeout, opts.MaxConnections, opts.DetectArchiveNodes, opts.LiteServers)
initCh := connPool.InitializeConnections(opts.InitCtx, opts.Timeout, opts.MaxConnections, opts.WorkersPerConnection, opts.DetectArchiveNodes, opts.LiteServers)
if opts.SyncConnectionsInitialization {
if err := <-initCh; err != nil {
return nil, err
Expand Down
8 changes: 4 additions & 4 deletions liteapi/pool/conn_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,13 @@ func New(strategy Strategy) *ConnPool {
}
}

func (p *ConnPool) InitializeConnections(ctx context.Context, timeout time.Duration, maxConnections int, detectArchiveNodes bool, servers []config.LiteServer) chan error {
func (p *ConnPool) InitializeConnections(ctx context.Context, timeout time.Duration, maxConnections int, workersPerConnection int, detectArchiveNodes bool, servers []config.LiteServer) chan error {
ch := make(chan error, 1)
go func() {
clientsCh := make(chan clientWrapper, len(servers))
for connID, server := range servers {
go func(connID int, server config.LiteServer) {
cli, _ := connect(ctx, timeout, server)
cli, _ := connect(ctx, timeout, server, workersPerConnection)
// TODO: log error
clientsCh <- clientWrapper{
connID: connID,
Expand Down Expand Up @@ -119,7 +119,7 @@ func (p *ConnPool) InitializeConnections(ctx context.Context, timeout time.Durat
return ch
}

func connect(ctx context.Context, timeout time.Duration, server config.LiteServer) (*liteclient.Client, error) {
func connect(ctx context.Context, timeout time.Duration, server config.LiteServer, n int) (*liteclient.Client, error) {
serverPubkey, err := base64.StdEncoding.DecodeString(server.Key)
if err != nil {
return nil, err
Expand All @@ -128,7 +128,7 @@ func connect(ctx context.Context, timeout time.Duration, server config.LiteServe
if err != nil {
return nil, err
}
cli := liteclient.NewClient(c, liteclient.OptionTimeout(timeout))
cli := liteclient.NewClient(c, liteclient.OptionTimeout(timeout), liteclient.OptionWorkersPerConnection(n))
if _, err := cli.LiteServerGetMasterchainInfo(ctx); err != nil {
return nil, err
}
Expand Down
56 changes: 46 additions & 10 deletions liteclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ type Client struct {
// if such a method makes several calls to a lite server,
// the total time is bounded by the timeout.
timeout time.Duration
connection *Connection
connections []*Connection
nextConn int
connMutex sync.Mutex
queries map[queryID]chan []byte
queriesMutex sync.Mutex
}
Expand All @@ -44,22 +46,47 @@ func OptionTimeout(t time.Duration) Options {
}
}

func OptionWorkersPerConnection(n int) Options {
return func(c *Client) {
if n < 1 {
n = 1
}
connFirst := c.connections[0]
for i := 0; i < n-1; i++ {
conn, err := NewConnection(context.Background(), connFirst.peerPublicKey, connFirst.host)
if err != nil {
slog.Warn("liteclient clone connection error", err)
continue
}
c.connections = append(c.connections, conn)
}
}
}

func NewClient(c *Connection, opts ...Options) *Client {
c2 := &Client{
timeout: defaultTimeout,
connection: c,
queries: make(map[queryID]chan []byte),
timeout: defaultTimeout,
connections: []*Connection{c},
queries: make(map[queryID]chan []byte),
}
for _, f := range opts {
f(c2)
}
go c2.reader()

for _, conn := range c2.connections {
go c2.reader(conn)
}
return c2
}

// IsOK returns true if there is no problems with this client and its underlying connection to a lite server.
func (c *Client) IsOK() bool {
return c.connection.Status() == Connected
for _, conn := range c.connections {
if conn.Status() == Connected {
return true
}
}
return false
}

// Request sends q as query in adnl.message.query and receives answer from adnl.message.answer
Expand All @@ -83,7 +110,12 @@ func (c *Client) Request(ctx context.Context, q []byte) ([]byte, error) {
resp := c.registerCallback(id)
defer c.unregisterCallback(id)

err = c.connection.Send(p)
c.connMutex.Lock()
conn := c.connections[c.nextConn]
c.nextConn = (c.nextConn + 1) % len(c.connections)
c.connMutex.Unlock()

err = conn.Send(p)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -142,8 +174,8 @@ func decodeLength(b []byte) (int, []byte, error) {
return int(i) >> 8, b[4:], nil
}

func (c *Client) reader() {
for p := range c.connection.Responses() {
func (c *Client) reader(conn *Connection) {
for p := range conn.Responses() {
if p.MagicType() != magicADNLAnswer {
continue
}
Expand Down Expand Up @@ -225,7 +257,11 @@ func (c *Client) WaitMasterchainSeqno(ctx context.Context, seqno uint32, timeout
}

func (c *Client) AverageRoundTrip() time.Duration {
return c.connection.AverageRoundTrip()
var total time.Duration
for _, conn := range c.connections {
total += conn.AverageRoundTrip()
}
return total / time.Duration(len(c.connections))
}

func (c *Client) WaitMasterchainBlock(ctx context.Context, seqno uint32, timeout uint32) (res LiteServerBlockHeaderC, err error) {
Expand Down

0 comments on commit c167355

Please sign in to comment.