Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prep for dual network support #62

Merged
merged 6 commits into from
Oct 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions config.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2018-2021 The Decred developers
// Copyright (c) 2018-2023 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.

Expand All @@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"net"
"net/netip"
"os"
"path/filepath"
"strings"
Expand All @@ -33,10 +34,13 @@ var (
//
// See loadConfig for details on the configuration load process.
type config struct {
Listen string `long:"httplisten" description:"HTTP listen on address:port"`
Seeder string `short:"s" description:"IP address of a working node"`
TestNet bool `long:"testnet" description:"Use testnet"`
Listen string `long:"httplisten" description:"HTTP listen on address:port"`
Seeder string `short:"s" description:"IP address of a working node"`
TestNet bool `long:"testnet" description:"Use testnet"`

netParams *chaincfg.Params
seederIP netip.AddrPort
dataDir string
}

func loadConfig() (*config, error) {
Expand Down Expand Up @@ -103,6 +107,8 @@ func loadConfig() (*config, error) {
cfg.netParams = chaincfg.MainNetParams()
}

cfg.dataDir = filepath.Join(defaultHomeDir, cfg.netParams.Name)

if cfg.Listen == "" {
return nil, fmt.Errorf("no listeners specified")
}
Expand All @@ -113,6 +119,11 @@ func loadConfig() (*config, error) {
}
cfg.Seeder = normalizeAddress(cfg.Seeder, cfg.netParams.DefaultPort)

cfg.seederIP, err = netip.ParseAddrPort(cfg.Seeder)
if err != nil {
return nil, fmt.Errorf("invalid seeder ip: %v", err)
}

return &cfg, nil
}

Expand Down
81 changes: 52 additions & 29 deletions decred.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"net"
"net/netip"
"os"
"path/filepath"
"sync"
"time"

Expand All @@ -28,15 +27,27 @@ const (
defaultNodeTimeout = time.Second * 3
)

var amgr *Manager
type crawler struct {
params *chaincfg.Params
amgr *Manager
log *log.Logger
}

func newCrawler(params *chaincfg.Params, amgr *Manager, log *log.Logger) *crawler {
return &crawler{
params: params,
amgr: amgr,
log: log,
}
}

func testPeer(ctx context.Context, ip netip.AddrPort, netParams *chaincfg.Params) {
func (c *crawler) testPeer(ctx context.Context, ip netip.AddrPort) {
onaddr := make(chan struct{}, 1)
verack := make(chan struct{}, 1)
config := peer.Config{
UserAgentName: appName,
UserAgentVersion: "0.0.1",
Net: netParams.Net,
Net: c.params.Net,
DisableRelayTx: true,

Listeners: peer.MessageListeners{
Expand All @@ -48,13 +59,13 @@ func testPeer(ctx context.Context, ip netip.AddrPort, netParams *chaincfg.Params
n = append(n, addrPort)
}
}
added := amgr.AddAddresses(n)
log.Printf("Peer %v sent %v addresses, %d new",
added := c.amgr.AddAddresses(n)
c.log.Printf("Peer %v sent %v addresses, %d new",
p.Addr(), len(msg.AddrList), added)
onaddr <- struct{}{}
},
OnVerAck: func(p *peer.Peer, msg *wire.MsgVerAck) {
log.Printf("Adding peer %v with services %v pver %d",
c.log.Printf("Adding peer %v with services %v pver %d",
p.NA().IP.String(), p.Services(), p.ProtocolVersion())
verack <- struct{}{}
},
Expand All @@ -64,13 +75,13 @@ func testPeer(ctx context.Context, ip netip.AddrPort, netParams *chaincfg.Params
host := ip.String()
p, err := peer.NewOutboundPeer(&config, host)
if err != nil {
log.Printf("NewOutboundPeer on %v: %v", host, err)
c.log.Printf("NewOutboundPeer on %v: %v", host, err)
return
}

// Time stamp the attempt after disconnect or dial error so we don't prune
// this peer before or during its test.
defer amgr.Attempt(ip)
defer c.amgr.Attempt(ip)

ctxTimeout, cancel := context.WithTimeout(ctx, defaultNodeTimeout)
defer cancel()
Expand All @@ -86,13 +97,13 @@ func testPeer(ctx context.Context, ip netip.AddrPort, netParams *chaincfg.Params
select {
case <-verack:
// Mark this peer as a good node.
amgr.Good(ip, p.Services(), p.ProtocolVersion())
c.amgr.Good(ip, p.Services(), p.ProtocolVersion())

// Ask peer for some addresses.
p.QueueMessage(wire.NewMsgGetAddr(), nil)

case <-time.After(defaultNodeTimeout):
log.Printf("verack timeout on peer %v", p.Addr())
c.log.Printf("verack timeout on peer %v", p.Addr())
return
case <-ctx.Done():
return
Expand All @@ -101,20 +112,20 @@ func testPeer(ctx context.Context, ip netip.AddrPort, netParams *chaincfg.Params
select {
case <-onaddr:
case <-time.After(defaultNodeTimeout):
log.Printf("getaddr timeout on peer %v", p.Addr())
c.log.Printf("getaddr timeout on peer %v", p.Addr())
case <-ctx.Done():
}
}

func creep(ctx context.Context, netParams *chaincfg.Params) {
func (c *crawler) run(ctx context.Context) {
for {
if ctx.Err() != nil {
return
}

ips := amgr.Addresses()
ips := c.amgr.Addresses()
if len(ips) == 0 {
log.Printf("No stale addresses -- sleeping for %v", defaultAddressTimeout)
c.log.Printf("No stale addresses -- sleeping for %v", defaultAddressTimeout)
select {
case <-time.After(defaultAddressTimeout):
case <-ctx.Done():
Expand All @@ -128,62 +139,74 @@ func creep(ctx context.Context, netParams *chaincfg.Params) {
for _, ip := range ips {
go func(ip netip.AddrPort) {
defer wg.Done()
testPeer(ctx, ip, netParams)
c.testPeer(ctx, ip)
}(ip)
}
wg.Wait()
}
}

func main() {
os.Exit(run())
}

// run is the real main function for dcrseeder. It is necessary to work around
// the fact that deferred functions do not run when os.Exit() is called.
func run() int {
ctx := shutdownListener()

cfg, err := loadConfig()
if err != nil {
fmt.Fprintf(os.Stderr, "loadConfig: %v\n", err)
os.Exit(1)
return 1
}

dataDir := filepath.Join(defaultHomeDir, cfg.netParams.Name)
amgr, err = NewManager(dataDir)
// Prefix log lines with current network, e.g. "[mainnet]" or "[testnet]".
logPrefix := fmt.Sprintf("[%.7s] ", cfg.netParams.Name)
log := log.New(os.Stdout, logPrefix, log.LstdFlags|log.Lmsgprefix)

amgr, err := NewManager(cfg.dataDir, log)
if err != nil {
fmt.Fprintf(os.Stderr, "NewManager: %v\n", err)
os.Exit(1)
return 1
}

seeder, err := netip.ParseAddrPort(cfg.Seeder)
amgr.AddAddresses([]netip.AddrPort{cfg.seederIP})

c := newCrawler(cfg.netParams, amgr, log)

server, err := newServer(cfg.Listen, amgr, log)
if err != nil {
fmt.Fprintf(os.Stderr, "Invalid seeder ip: %v\n", err)
os.Exit(1)
fmt.Fprint(os.Stderr, err)
return 1
}
amgr.AddAddresses([]netip.AddrPort{seeder})

var wg sync.WaitGroup

wg.Add(1)
go func() {
defer wg.Done()
amgr.run(ctx) // only returns on context cancellation
amgr.run(ctx) // Only returns on context cancellation.
log.Print("Address manager done.")
}()

wg.Add(1)
go func() {
defer wg.Done()
creep(ctx, cfg.netParams) // only returns on context cancellation
c.run(ctx) // Only returns on context cancellation.
log.Print("Crawler done.")
}()

wg.Add(1)
go func() {
defer wg.Done()
if err := serveHTTP(ctx, cfg.Listen); err != nil {
log.Fatal(err)
}
server.run(ctx) // Only returns on context cancellation.
log.Print("HTTP server done.")
}()

// Wait for crawler and http server, then stop address manager.
wg.Wait()
log.Print("Bye!")

return 0
}
57 changes: 39 additions & 18 deletions http.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2018-2021 The Decred developers
// Copyright (c) 2018-2023 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.

Expand All @@ -8,7 +8,6 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"net"
"net/http"
Expand All @@ -22,7 +21,7 @@ import (

const defaultHTTPTimeout = 10 * time.Second

func httpGetAddrs(w http.ResponseWriter, r *http.Request) {
func httpGetAddrs(w http.ResponseWriter, r *http.Request, amgr *Manager, log *log.Logger) {
var wantedIP uint32
var wantedPV uint32
var wantedSF wire.ServiceFlag
Expand Down Expand Up @@ -81,39 +80,61 @@ func httpGetAddrs(w http.ResponseWriter, r *http.Request) {
}
}

func serveHTTP(ctx context.Context, addr string) error {
type server struct {
srv *http.Server
listener net.Listener
log *log.Logger
}

func newServer(addr string, amgr *Manager, log *log.Logger) (*server, error) {
listener, err := net.Listen("tcp", addr)
if err != nil {
return fmt.Errorf("can't listen on %s. web server quitting: %w", addr, err)
return nil, err
}

mux := http.NewServeMux()
mux.HandleFunc(api.GetAddrsPath, httpGetAddrs)
mux.HandleFunc(api.GetAddrsPath, func(w http.ResponseWriter, r *http.Request) {
httpGetAddrs(w, r, amgr, log)
})

srv := &http.Server{
Handler: mux,
ReadTimeout: defaultHTTPTimeout, // slow requests should not hold connections opened
WriteTimeout: defaultHTTPTimeout, // request to response time
}

// Shutdown the server on context cancellation.
return &server{
srv: srv,
listener: listener,
log: log,
}, nil
}

func (h *server) run(ctx context.Context) {
var wg sync.WaitGroup

// Add the graceful shutdown to the waitgroup.
wg.Add(1)
go func() {
defer wg.Done()
// Wait until context is canceled before shutting down the server.
<-ctx.Done()
ctxShutdown, cancel := context.WithTimeout(context.Background(), defaultHTTPTimeout)
defer cancel()
err := srv.Shutdown(ctxShutdown)
if err != nil {
log.Printf("Trouble shutting down HTTP server: %v", err)
_ = h.srv.Shutdown(ctx)
}()

// Start webserver.
wg.Add(1)
go func() {
defer wg.Done()

h.log.Printf("Listening on %s", h.listener.Addr())
err := h.srv.Serve(h.listener)
// ErrServerClosed is expected from a graceful server shutdown, it can
// be ignored. Anything else should be logged.
if err != nil && !errors.Is(err, http.ErrServerClosed) {
h.log.Printf("unexpected (http.Server).Serve error: %v", err)
}
}()
defer wg.Wait()

err = srv.Serve(listener) // blocking
if !errors.Is(err, http.ErrServerClosed) {
return fmt.Errorf("unexpected (http.Server).Serve error: %w", err)
}
return nil // Shutdown called
wg.Wait()
}
Loading
Loading