From 885fda8c65bd44369f74438c00a500aa0a7984b7 Mon Sep 17 00:00:00 2001 From: 0xluk Date: Fri, 1 Mar 2024 18:02:45 +0200 Subject: [PATCH] added pool conn usage --- go.mod | 3 ++- go.sum | 6 ++++-- main.go | 64 +++++++++++++++++++++++++++++++++++++++++---------------- 3 files changed, 52 insertions(+), 21 deletions(-) diff --git a/go.mod b/go.mod index 94002d0..97291f9 100644 --- a/go.mod +++ b/go.mod @@ -4,12 +4,13 @@ go 1.20 require ( github.com/ardanlabs/conf v1.5.0 + github.com/buraksezer/connpool v0.6.0 github.com/cloudflare/circl v1.3.7 github.com/cockroachdb/pebble v1.1.0 github.com/google/go-cmp v0.6.0 github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 github.com/pkg/errors v0.9.1 - github.com/qubic/go-node-connector v0.2.1 + github.com/qubic/go-node-connector v0.3.0 github.com/stretchr/testify v1.8.4 go.uber.org/zap v1.26.0 google.golang.org/grpc v1.61.1 diff --git a/go.sum b/go.sum index c0b0c10..b10f99a 100644 --- a/go.sum +++ b/go.sum @@ -46,6 +46,8 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24 github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/buraksezer/connpool v0.6.0 h1:NnTWkd3OH3BAn4qbeI+Ks1XDzU0DQRgOfF+SxsUMdtU= +github.com/buraksezer/connpool v0.6.0/go.mod h1:qPiG7gKXo+EjrwG/yqn2StZM4ek6gcYnnGgFIVKN6b0= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= @@ -220,8 +222,8 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU= github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= -github.com/qubic/go-node-connector v0.2.1 h1:zvJ5L2/fVjqgZwFuKd+Tn0vIpXRhXsfpXq4dnGc9EN0= -github.com/qubic/go-node-connector v0.2.1/go.mod h1:/fXS8g/hUgkkDcxb0NvFeArV61zWfaW1ImUZ9eK+nhg= +github.com/qubic/go-node-connector v0.3.0 h1:jmdge7CK5CGoz7iC7SzKQhN23oLKzo8oLlPzqCMYy9o= +github.com/qubic/go-node-connector v0.3.0/go.mod h1:y0eMsGPY1DFEzz2JovUgEIwBZ/27zoJ8G81nX9yt8xI= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= diff --git a/main.go b/main.go index 880dfd4..0d3358e 100644 --- a/main.go +++ b/main.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "github.com/ardanlabs/conf" + "github.com/buraksezer/connpool" "github.com/cockroachdb/pebble" "github.com/pkg/errors" "github.com/qubic/go-archiver/rpc" @@ -13,6 +14,7 @@ import ( "io" "log" "math/rand" + "net" "net/http" "os" "os/signal" @@ -86,45 +88,58 @@ func run() error { signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM) ticker := time.NewTicker(5 * time.Second) - peer, err := getNewRandomPeer("http://127.0.0.1:8080/peers") + + cf := &connectionFactory{nodeFetcherHost: "http://127.0.0.1:8080/peers"} + p, err := connpool.NewChannelPool(5, 30, cf.connect) if err != nil { - return errors.Wrap(err, "getting new random peer") + return errors.Wrap(err, "creating new connection pool") } - fmt.Printf("Got new peer: %s\n", peer) - for { select { case <-shutdown: log.Fatalf("Shutting down") case <-ticker.C: - err = validateMultiple(peer, cfg.Qubic.NodePort, cfg.Qubic.FallbackTick, cfg.Qubic.BatchSize, ps) + err := do(p, cfg.Qubic.FallbackTick, cfg.Qubic.BatchSize, ps) if err != nil { - log.Printf("Error running batch. Retrying...: %s", err.Error()) - newPeer, err := getNewRandomPeer("http://127.0.0.1:8080/peers") - if err != nil { - continue - } - fmt.Printf("Got new peer: %s\n", newPeer) - peer = newPeer + log.Printf("do err: %s", err.Error()) } - log.Printf("Batch completed, continuing to next one") } + } +} + +func do(pool connpool.Pool, fallbackTick, batchSize uint64, ps *store.PebbleStore) error { + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + conn, err := pool.Get(ctx) + if err != nil { + return errors.Wrap(err, "getting initial connection") + } + + err = validateMultiple(conn, fallbackTick, batchSize, ps) + if err != nil { + conn.Close() + return errors.Wrap(err, "validating multiple") } + log.Printf("Batch completed, continuing to next one") + + return nil } -func validateMultiple(nodeIP string, nodePort string, fallbackStartTick uint64, batchSize uint64, ps *store.PebbleStore) error { - client, err := qubic.NewConnection(context.Background(), nodeIP, nodePort) +func validateMultiple(conn net.Conn, fallbackStartTick uint64, batchSize uint64, ps *store.PebbleStore) error { + defer conn.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second) + defer cancel() + + client, err := qubic.NewClientWithConn(ctx, conn) if err != nil { return errors.Wrap(err, "creating qubic client") } defer client.Close() val := validator.NewValidator(client, ps) - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - defer cancel() - tickInfo, err := client.GetTickInfo(ctx) if err != nil { return errors.Wrap(err, "getting tick info") @@ -175,6 +190,19 @@ func getNextProcessingTick(ctx context.Context, fallBackTick uint64, ps *store.P return lastTick + 1, nil } +type connectionFactory struct { + nodeFetcherHost string +} + +func (cf *connectionFactory) connect() (net.Conn, error) { + peer, err := getNewRandomPeer(cf.nodeFetcherHost) + if err != nil { + return nil, errors.Wrap(err, "getting new random peer") + } + fmt.Printf("connecting to: %s\n", peer) + return net.Dial("tcp", net.JoinHostPort(peer, "21841")) +} + type response struct { Peers []string `json:"peers"` Length int `json:"length"`