diff --git a/config.go b/config.go index 4b1ec106..e66f8fce 100644 --- a/config.go +++ b/config.go @@ -1,4 +1,4 @@ -// Copyright (c) 2018-2021 The Decred developers +// Copyright (c) 2018-2023 The Decred developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. @@ -8,6 +8,7 @@ import ( "errors" "fmt" "net" + "net/netip" "os" "path/filepath" "strings" @@ -33,10 +34,13 @@ var ( // // See loadConfig for details on the configuration load process. type config struct { - Listen string `long:"httplisten" description:"HTTP listen on address:port"` - Seeder string `short:"s" description:"IP address of a working node"` - TestNet bool `long:"testnet" description:"Use testnet"` + Listen string `long:"httplisten" description:"HTTP listen on address:port"` + Seeder string `short:"s" description:"IP address of a working node"` + TestNet bool `long:"testnet" description:"Use testnet"` + netParams *chaincfg.Params + seederIP netip.AddrPort + dataDir string } func loadConfig() (*config, error) { @@ -103,6 +107,8 @@ func loadConfig() (*config, error) { cfg.netParams = chaincfg.MainNetParams() } + cfg.dataDir = filepath.Join(defaultHomeDir, cfg.netParams.Name) + if cfg.Listen == "" { return nil, fmt.Errorf("no listeners specified") } @@ -113,6 +119,11 @@ func loadConfig() (*config, error) { } cfg.Seeder = normalizeAddress(cfg.Seeder, cfg.netParams.DefaultPort) + cfg.seederIP, err = netip.ParseAddrPort(cfg.Seeder) + if err != nil { + return nil, fmt.Errorf("invalid seeder ip: %v", err) + } + return &cfg, nil } diff --git a/decred.go b/decred.go index 08cc4f1a..75ddcb24 100644 --- a/decred.go +++ b/decred.go @@ -11,7 +11,6 @@ import ( "net" "net/netip" "os" - "path/filepath" "sync" "time" @@ -28,15 +27,27 @@ const ( defaultNodeTimeout = time.Second * 3 ) -var amgr *Manager +type crawler struct { + params *chaincfg.Params + amgr *Manager + log *log.Logger +} + +func newCrawler(params *chaincfg.Params, amgr *Manager, log *log.Logger) *crawler { + return &crawler{ + params: params, + amgr: amgr, + log: log, + } +} -func testPeer(ctx context.Context, ip netip.AddrPort, netParams *chaincfg.Params) { +func (c *crawler) testPeer(ctx context.Context, ip netip.AddrPort) { onaddr := make(chan struct{}, 1) verack := make(chan struct{}, 1) config := peer.Config{ UserAgentName: appName, UserAgentVersion: "0.0.1", - Net: netParams.Net, + Net: c.params.Net, DisableRelayTx: true, Listeners: peer.MessageListeners{ @@ -48,13 +59,13 @@ func testPeer(ctx context.Context, ip netip.AddrPort, netParams *chaincfg.Params n = append(n, addrPort) } } - added := amgr.AddAddresses(n) - log.Printf("Peer %v sent %v addresses, %d new", + added := c.amgr.AddAddresses(n) + c.log.Printf("Peer %v sent %v addresses, %d new", p.Addr(), len(msg.AddrList), added) onaddr <- struct{}{} }, OnVerAck: func(p *peer.Peer, msg *wire.MsgVerAck) { - log.Printf("Adding peer %v with services %v pver %d", + c.log.Printf("Adding peer %v with services %v pver %d", p.NA().IP.String(), p.Services(), p.ProtocolVersion()) verack <- struct{}{} }, @@ -64,13 +75,13 @@ func testPeer(ctx context.Context, ip netip.AddrPort, netParams *chaincfg.Params host := ip.String() p, err := peer.NewOutboundPeer(&config, host) if err != nil { - log.Printf("NewOutboundPeer on %v: %v", host, err) + c.log.Printf("NewOutboundPeer on %v: %v", host, err) return } // Time stamp the attempt after disconnect or dial error so we don't prune // this peer before or during its test. - defer amgr.Attempt(ip) + defer c.amgr.Attempt(ip) ctxTimeout, cancel := context.WithTimeout(ctx, defaultNodeTimeout) defer cancel() @@ -86,13 +97,13 @@ func testPeer(ctx context.Context, ip netip.AddrPort, netParams *chaincfg.Params select { case <-verack: // Mark this peer as a good node. - amgr.Good(ip, p.Services(), p.ProtocolVersion()) + c.amgr.Good(ip, p.Services(), p.ProtocolVersion()) // Ask peer for some addresses. p.QueueMessage(wire.NewMsgGetAddr(), nil) case <-time.After(defaultNodeTimeout): - log.Printf("verack timeout on peer %v", p.Addr()) + c.log.Printf("verack timeout on peer %v", p.Addr()) return case <-ctx.Done(): return @@ -101,20 +112,20 @@ func testPeer(ctx context.Context, ip netip.AddrPort, netParams *chaincfg.Params select { case <-onaddr: case <-time.After(defaultNodeTimeout): - log.Printf("getaddr timeout on peer %v", p.Addr()) + c.log.Printf("getaddr timeout on peer %v", p.Addr()) case <-ctx.Done(): } } -func creep(ctx context.Context, netParams *chaincfg.Params) { +func (c *crawler) run(ctx context.Context) { for { if ctx.Err() != nil { return } - ips := amgr.Addresses() + ips := c.amgr.Addresses() if len(ips) == 0 { - log.Printf("No stale addresses -- sleeping for %v", defaultAddressTimeout) + c.log.Printf("No stale addresses -- sleeping for %v", defaultAddressTimeout) select { case <-time.After(defaultAddressTimeout): case <-ctx.Done(): @@ -128,7 +139,7 @@ func creep(ctx context.Context, netParams *chaincfg.Params) { for _, ip := range ips { go func(ip netip.AddrPort) { defer wg.Done() - testPeer(ctx, ip, netParams) + c.testPeer(ctx, ip) }(ip) } wg.Wait() @@ -136,54 +147,66 @@ func creep(ctx context.Context, netParams *chaincfg.Params) { } func main() { + os.Exit(run()) +} + +// run is the real main function for dcrseeder. It is necessary to work around +// the fact that deferred functions do not run when os.Exit() is called. +func run() int { ctx := shutdownListener() cfg, err := loadConfig() if err != nil { fmt.Fprintf(os.Stderr, "loadConfig: %v\n", err) - os.Exit(1) + return 1 } - dataDir := filepath.Join(defaultHomeDir, cfg.netParams.Name) - amgr, err = NewManager(dataDir) + // Prefix log lines with current network, e.g. "[mainnet]" or "[testnet]". + logPrefix := fmt.Sprintf("[%.7s] ", cfg.netParams.Name) + log := log.New(os.Stdout, logPrefix, log.LstdFlags|log.Lmsgprefix) + + amgr, err := NewManager(cfg.dataDir, log) if err != nil { fmt.Fprintf(os.Stderr, "NewManager: %v\n", err) - os.Exit(1) + return 1 } - seeder, err := netip.ParseAddrPort(cfg.Seeder) + amgr.AddAddresses([]netip.AddrPort{cfg.seederIP}) + + c := newCrawler(cfg.netParams, amgr, log) + + server, err := newServer(cfg.Listen, amgr, log) if err != nil { - fmt.Fprintf(os.Stderr, "Invalid seeder ip: %v\n", err) - os.Exit(1) + fmt.Fprint(os.Stderr, err) + return 1 } - amgr.AddAddresses([]netip.AddrPort{seeder}) var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() - amgr.run(ctx) // only returns on context cancellation + amgr.run(ctx) // Only returns on context cancellation. log.Print("Address manager done.") }() wg.Add(1) go func() { defer wg.Done() - creep(ctx, cfg.netParams) // only returns on context cancellation + c.run(ctx) // Only returns on context cancellation. log.Print("Crawler done.") }() wg.Add(1) go func() { defer wg.Done() - if err := serveHTTP(ctx, cfg.Listen); err != nil { - log.Fatal(err) - } + server.run(ctx) // Only returns on context cancellation. log.Print("HTTP server done.") }() // Wait for crawler and http server, then stop address manager. wg.Wait() log.Print("Bye!") + + return 0 } diff --git a/http.go b/http.go index c8ea105a..15a5e427 100644 --- a/http.go +++ b/http.go @@ -1,4 +1,4 @@ -// Copyright (c) 2018-2021 The Decred developers +// Copyright (c) 2018-2023 The Decred developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. @@ -8,7 +8,6 @@ import ( "context" "encoding/json" "errors" - "fmt" "log" "net" "net/http" @@ -22,7 +21,7 @@ import ( const defaultHTTPTimeout = 10 * time.Second -func httpGetAddrs(w http.ResponseWriter, r *http.Request) { +func httpGetAddrs(w http.ResponseWriter, r *http.Request, amgr *Manager, log *log.Logger) { var wantedIP uint32 var wantedPV uint32 var wantedSF wire.ServiceFlag @@ -81,14 +80,22 @@ func httpGetAddrs(w http.ResponseWriter, r *http.Request) { } } -func serveHTTP(ctx context.Context, addr string) error { +type server struct { + srv *http.Server + listener net.Listener + log *log.Logger +} + +func newServer(addr string, amgr *Manager, log *log.Logger) (*server, error) { listener, err := net.Listen("tcp", addr) if err != nil { - return fmt.Errorf("can't listen on %s. web server quitting: %w", addr, err) + return nil, err } mux := http.NewServeMux() - mux.HandleFunc(api.GetAddrsPath, httpGetAddrs) + mux.HandleFunc(api.GetAddrsPath, func(w http.ResponseWriter, r *http.Request) { + httpGetAddrs(w, r, amgr, log) + }) srv := &http.Server{ Handler: mux, @@ -96,24 +103,38 @@ func serveHTTP(ctx context.Context, addr string) error { WriteTimeout: defaultHTTPTimeout, // request to response time } - // Shutdown the server on context cancellation. + return &server{ + srv: srv, + listener: listener, + log: log, + }, nil +} + +func (h *server) run(ctx context.Context) { var wg sync.WaitGroup + + // Add the graceful shutdown to the waitgroup. wg.Add(1) go func() { defer wg.Done() + // Wait until context is canceled before shutting down the server. <-ctx.Done() - ctxShutdown, cancel := context.WithTimeout(context.Background(), defaultHTTPTimeout) - defer cancel() - err := srv.Shutdown(ctxShutdown) - if err != nil { - log.Printf("Trouble shutting down HTTP server: %v", err) + _ = h.srv.Shutdown(ctx) + }() + + // Start webserver. + wg.Add(1) + go func() { + defer wg.Done() + + h.log.Printf("Listening on %s", h.listener.Addr()) + err := h.srv.Serve(h.listener) + // ErrServerClosed is expected from a graceful server shutdown, it can + // be ignored. Anything else should be logged. + if err != nil && !errors.Is(err, http.ErrServerClosed) { + h.log.Printf("unexpected (http.Server).Serve error: %v", err) } }() - defer wg.Wait() - err = srv.Serve(listener) // blocking - if !errors.Is(err, http.ErrServerClosed) { - return fmt.Errorf("unexpected (http.Server).Serve error: %w", err) - } - return nil // Shutdown called + wg.Wait() } diff --git a/manager.go b/manager.go index 46d4d67b..9729e7e2 100644 --- a/manager.go +++ b/manager.go @@ -36,6 +36,7 @@ type Manager struct { nodes map[string]*Node peersFile string + log *log.Logger } const ( @@ -62,7 +63,7 @@ const ( pruneExpireTimeout = time.Hour * 24 ) -func NewManager(dataDir string) (*Manager, error) { +func NewManager(dataDir string, log *log.Logger) (*Manager, error) { err := os.MkdirAll(dataDir, 0o700) if err != nil { return nil, err @@ -71,6 +72,7 @@ func NewManager(dataDir string) (*Manager, error) { amgr := Manager{ nodes: make(map[string]*Node), peersFile: filepath.Join(dataDir, peersFilename), + log: log, } err = amgr.deserializePeers() @@ -276,7 +278,7 @@ func (m *Manager) prunePeers() { l := len(m.nodes) m.mtx.Unlock() - log.Printf("Pruned %d addresses: %d remaining", count, l) + m.log.Printf("Pruned %d addresses: %d remaining", count, l) } func (m *Manager) deserializePeers() error { @@ -304,7 +306,7 @@ func (m *Manager) deserializePeers() error { m.nodes = nodes m.mtx.Unlock() - log.Printf("%d nodes loaded from %s", l, filePath) + m.log.Printf("%d nodes loaded from %s", l, filePath) return nil } @@ -316,23 +318,23 @@ func (m *Manager) savePeers() { tmpfile := m.peersFile + ".new" w, err := os.Create(tmpfile) if err != nil { - log.Printf("Error opening file %s: %v", tmpfile, err) + m.log.Printf("Error opening file %s: %v", tmpfile, err) return } enc := json.NewEncoder(w) if err := enc.Encode(&m.nodes); err != nil { w.Close() - log.Printf("Failed to encode file %s: %v", tmpfile, err) + m.log.Printf("Failed to encode file %s: %v", tmpfile, err) return } if err := w.Close(); err != nil { - log.Printf("Error closing file %s: %v", tmpfile, err) + m.log.Printf("Error closing file %s: %v", tmpfile, err) return } if err := os.Rename(tmpfile, m.peersFile); err != nil { - log.Printf("Error writing file %s: %v", m.peersFile, err) + m.log.Printf("Error writing file %s: %v", m.peersFile, err) return } - log.Printf("%d nodes saved to %s", len(m.nodes), m.peersFile) + m.log.Printf("%d nodes saved to %s", len(m.nodes), m.peersFile) }