Skip to content

Commit

Permalink
added pool conn usage
Browse files Browse the repository at this point in the history
  • Loading branch information
0xluk committed Mar 1, 2024
1 parent 476b6cf commit 885fda8
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 21 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ go 1.20

require (
github.com/ardanlabs/conf v1.5.0
github.com/buraksezer/connpool v0.6.0
github.com/cloudflare/circl v1.3.7
github.com/cockroachdb/pebble v1.1.0
github.com/google/go-cmp v0.6.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1
github.com/pkg/errors v0.9.1
github.com/qubic/go-node-connector v0.2.1
github.com/qubic/go-node-connector v0.3.0
github.com/stretchr/testify v1.8.4
go.uber.org/zap v1.26.0
google.golang.org/grpc v1.61.1
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/buraksezer/connpool v0.6.0 h1:NnTWkd3OH3BAn4qbeI+Ks1XDzU0DQRgOfF+SxsUMdtU=
github.com/buraksezer/connpool v0.6.0/go.mod h1:qPiG7gKXo+EjrwG/yqn2StZM4ek6gcYnnGgFIVKN6b0=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
Expand Down Expand Up @@ -220,8 +222,8 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU=
github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/qubic/go-node-connector v0.2.1 h1:zvJ5L2/fVjqgZwFuKd+Tn0vIpXRhXsfpXq4dnGc9EN0=
github.com/qubic/go-node-connector v0.2.1/go.mod h1:/fXS8g/hUgkkDcxb0NvFeArV61zWfaW1ImUZ9eK+nhg=
github.com/qubic/go-node-connector v0.3.0 h1:jmdge7CK5CGoz7iC7SzKQhN23oLKzo8oLlPzqCMYy9o=
github.com/qubic/go-node-connector v0.3.0/go.mod h1:y0eMsGPY1DFEzz2JovUgEIwBZ/27zoJ8G81nX9yt8xI=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
Expand Down
64 changes: 46 additions & 18 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"github.com/ardanlabs/conf"
"github.com/buraksezer/connpool"
"github.com/cockroachdb/pebble"
"github.com/pkg/errors"
"github.com/qubic/go-archiver/rpc"
Expand All @@ -13,6 +14,7 @@ import (
"io"
"log"
"math/rand"
"net"
"net/http"
"os"
"os/signal"
Expand Down Expand Up @@ -86,45 +88,58 @@ func run() error {
signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM)

ticker := time.NewTicker(5 * time.Second)
peer, err := getNewRandomPeer("http://127.0.0.1:8080/peers")

cf := &connectionFactory{nodeFetcherHost: "http://127.0.0.1:8080/peers"}
p, err := connpool.NewChannelPool(5, 30, cf.connect)
if err != nil {
return errors.Wrap(err, "getting new random peer")
return errors.Wrap(err, "creating new connection pool")
}

fmt.Printf("Got new peer: %s\n", peer)

for {
select {
case <-shutdown:
log.Fatalf("Shutting down")
case <-ticker.C:
err = validateMultiple(peer, cfg.Qubic.NodePort, cfg.Qubic.FallbackTick, cfg.Qubic.BatchSize, ps)
err := do(p, cfg.Qubic.FallbackTick, cfg.Qubic.BatchSize, ps)
if err != nil {
log.Printf("Error running batch. Retrying...: %s", err.Error())
newPeer, err := getNewRandomPeer("http://127.0.0.1:8080/peers")
if err != nil {
continue
}
fmt.Printf("Got new peer: %s\n", newPeer)
peer = newPeer
log.Printf("do err: %s", err.Error())
}
log.Printf("Batch completed, continuing to next one")
}
}
}

func do(pool connpool.Pool, fallbackTick, batchSize uint64, ps *store.PebbleStore) error {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

conn, err := pool.Get(ctx)
if err != nil {
return errors.Wrap(err, "getting initial connection")
}

err = validateMultiple(conn, fallbackTick, batchSize, ps)
if err != nil {
conn.Close()
return errors.Wrap(err, "validating multiple")
}
log.Printf("Batch completed, continuing to next one")

return nil
}

func validateMultiple(nodeIP string, nodePort string, fallbackStartTick uint64, batchSize uint64, ps *store.PebbleStore) error {
client, err := qubic.NewConnection(context.Background(), nodeIP, nodePort)
func validateMultiple(conn net.Conn, fallbackStartTick uint64, batchSize uint64, ps *store.PebbleStore) error {
defer conn.Close()

ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
defer cancel()

client, err := qubic.NewClientWithConn(ctx, conn)
if err != nil {
return errors.Wrap(err, "creating qubic client")
}
defer client.Close()

val := validator.NewValidator(client, ps)
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

tickInfo, err := client.GetTickInfo(ctx)
if err != nil {
return errors.Wrap(err, "getting tick info")
Expand Down Expand Up @@ -175,6 +190,19 @@ func getNextProcessingTick(ctx context.Context, fallBackTick uint64, ps *store.P
return lastTick + 1, nil
}

type connectionFactory struct {
nodeFetcherHost string
}

func (cf *connectionFactory) connect() (net.Conn, error) {
peer, err := getNewRandomPeer(cf.nodeFetcherHost)
if err != nil {
return nil, errors.Wrap(err, "getting new random peer")
}
fmt.Printf("connecting to: %s\n", peer)
return net.Dial("tcp", net.JoinHostPort(peer, "21841"))
}

type response struct {
Peers []string `json:"peers"`
Length int `json:"length"`
Expand Down

0 comments on commit 885fda8

Please sign in to comment.