From 20424d235cc1849e08cc5e2f136526bde4af90f9 Mon Sep 17 00:00:00 2001 From: frank Date: Mon, 23 Sep 2024 17:06:35 +0800 Subject: [PATCH] feat_: log error and stacktrace when panic in goroutine --- library/filter.go | 2 ++ library/relay.go | 2 ++ waku/persistence/store.go | 3 +++ waku/v2/api/filter/filter.go | 5 +++++ waku/v2/api/filter/filter_manager.go | 3 +++ waku/v2/api/missing/missing_messages.go | 4 ++++ waku/v2/api/publish/message_check.go | 2 ++ waku/v2/api/publish/message_queue.go | 2 ++ waku/v2/discv5/discover.go | 2 ++ waku/v2/discv5/mock_peer_discoverer.go | 2 ++ waku/v2/node/connectedness.go | 2 ++ waku/v2/node/keepalive.go | 3 +++ waku/v2/node/localnode.go | 3 +++ waku/v2/node/wakunode2.go | 4 ++++ waku/v2/peermanager/connection_gater.go | 2 ++ waku/v2/peermanager/fastest_peer_selector.go | 3 +++ waku/v2/peermanager/peer_connector.go | 5 +++++ waku/v2/peermanager/peer_discovery.go | 2 ++ waku/v2/peermanager/peer_manager.go | 3 +++ waku/v2/peermanager/topic_event_handler.go | 2 ++ waku/v2/protocol/filter/client.go | 5 +++++ waku/v2/protocol/filter/filter_health_check.go | 3 +++ waku/v2/protocol/filter/server.go | 2 ++ waku/v2/protocol/filter/subscribers_map.go | 3 +++ waku/v2/protocol/legacy_store/waku_store_protocol.go | 3 +++ waku/v2/protocol/lightpush/waku_lightpush.go | 1 + waku/v2/protocol/metadata/waku_metadata.go | 2 ++ waku/v2/protocol/noise/pairing.go | 2 ++ waku/v2/protocol/noise/pairing_relay_messenger.go | 3 +++ waku/v2/protocol/peer_exchange/client.go | 2 ++ waku/v2/protocol/peer_exchange/protocol.go | 2 ++ waku/v2/protocol/relay/broadcast.go | 2 ++ waku/v2/protocol/relay/metrics.go | 2 ++ waku/v2/protocol/relay/topic_events.go | 2 ++ waku/v2/protocol/relay/waku_relay.go | 2 ++ .../rln/group_manager/dynamic/membership_fetcher.go | 2 ++ waku/v2/protocol/rln/nullifier_log.go | 2 ++ waku/v2/rendezvous/db.go | 2 ++ waku/v2/rendezvous/rendezvous.go | 3 +++ waku/v2/service/common_discovery_service.go | 2 ++ waku/v2/timesource/ntp.go | 3 +++ waku/v2/utils/logger.go | 8 ++++++++ 42 files changed, 114 insertions(+) diff --git a/library/filter.go b/library/filter.go index 59590eb94..ef1d8cc0d 100644 --- a/library/filter.go +++ b/library/filter.go @@ -10,6 +10,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter" "github.com/waku-org/go-waku/waku/v2/protocol/subscription" + "github.com/waku-org/go-waku/waku/v2/utils" ) type filterArgument struct { @@ -74,6 +75,7 @@ func FilterSubscribe(instance *WakuInstance, filterJSON string, peerID string, m for _, subscriptionDetails := range subscriptions { go func(subscriptionDetails *subscription.SubscriptionDetails) { + defer utils.LogOnPanic() for envelope := range subscriptionDetails.C { send(instance, "message", toSubscriptionMessage(envelope)) } diff --git a/library/relay.go b/library/relay.go index 666904e5f..6142a1bea 100644 --- a/library/relay.go +++ b/library/relay.go @@ -7,6 +7,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "github.com/waku-org/go-waku/waku/v2/utils" ) // RelayEnoughPeers determines if there are enough peers to publish a message on a topic @@ -66,6 +67,7 @@ func relaySubscribe(instance *WakuInstance, filterJSON string) error { for _, sub := range subscriptions { go func(subscription *relay.Subscription) { + defer utils.LogOnPanic() for envelope := range subscription.Ch { send(instance, "message", toSubscriptionMessage(envelope)) } diff --git a/waku/persistence/store.go b/waku/persistence/store.go index 10540c7ce..5ca38afbe 100644 --- a/waku/persistence/store.go +++ b/waku/persistence/store.go @@ -14,6 +14,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/pb" wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/timesource" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" "google.golang.org/protobuf/proto" ) @@ -186,6 +187,7 @@ func (d *DBStore) Start(ctx context.Context, timesource timesource.Timesource) e } func (d *DBStore) updateMetrics(ctx context.Context) { + defer utils.LogOnPanic() ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() defer d.wg.Done() @@ -251,6 +253,7 @@ func (d *DBStore) getDeleteOldRowsQuery() string { } func (d *DBStore) checkForOlderRecords(ctx context.Context, t time.Duration) { + defer utils.LogOnPanic() defer d.wg.Done() ticker := time.NewTicker(t) diff --git a/waku/v2/api/filter/filter.go b/waku/v2/api/filter/filter.go index b8cf14550..16f263367 100644 --- a/waku/v2/api/filter/filter.go +++ b/waku/v2/api/filter/filter.go @@ -11,6 +11,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter" "github.com/waku-org/go-waku/waku/v2/protocol/subscription" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -98,6 +99,7 @@ func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilte } func (apiSub *Sub) Unsubscribe(contentFilter protocol.ContentFilter) { + defer utils.LogOnPanic() _, err := apiSub.wf.Unsubscribe(apiSub.ctx, contentFilter) //Not reading result unless we want to do specific error handling? if err != nil { @@ -106,6 +108,7 @@ func (apiSub *Sub) Unsubscribe(contentFilter protocol.ContentFilter) { } func (apiSub *Sub) subscriptionLoop(batchInterval time.Duration) { + defer utils.LogOnPanic() ticker := time.NewTicker(batchInterval) defer ticker.Stop() for { @@ -213,12 +216,14 @@ func (apiSub *Sub) multiplex(subs []*subscription.SubscriptionDetails) { for _, subDetails := range subs { apiSub.subs[subDetails.ID] = subDetails go func(subDetails *subscription.SubscriptionDetails) { + defer utils.LogOnPanic() apiSub.log.Debug("new multiplex", zap.String("sub-id", subDetails.ID)) for env := range subDetails.C { apiSub.DataCh <- env } }(subDetails) go func(subDetails *subscription.SubscriptionDetails) { + defer utils.LogOnPanic() select { case <-apiSub.ctx.Done(): return diff --git a/waku/v2/api/filter/filter_manager.go b/waku/v2/api/filter/filter_manager.go index 84261882e..b4933a798 100644 --- a/waku/v2/api/filter/filter_manager.go +++ b/waku/v2/api/filter/filter_manager.go @@ -13,6 +13,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/onlinechecker" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter" + "github.com/waku-org/go-waku/waku/v2/utils" ) // Methods on FilterManager just aggregate filters from application and subscribe to them @@ -87,6 +88,7 @@ func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter } func (mgr *FilterManager) startFilterSubLoop() { + defer utils.LogOnPanic() ticker := time.NewTicker(mgr.filterSubBatchDuration) defer ticker.Stop() for { @@ -157,6 +159,7 @@ func (mgr *FilterManager) SubscribeFilter(filterID string, cf protocol.ContentFi } func (mgr *FilterManager) subscribeAndRunLoop(f filterConfig) { + defer utils.LogOnPanic() ctx, cancel := context.WithCancel(mgr.ctx) config := FilterConfig{MaxPeers: mgr.minPeersPerFilter} sub, err := Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger, mgr.params) diff --git a/waku/v2/api/missing/missing_messages.go b/waku/v2/api/missing/missing_messages.go index 058af9a48..ca8b63fb7 100644 --- a/waku/v2/api/missing/missing_messages.go +++ b/waku/v2/api/missing/missing_messages.go @@ -15,6 +15,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/waku-org/go-waku/waku/v2/timesource" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" "google.golang.org/protobuf/proto" ) @@ -102,6 +103,7 @@ func (m *MissingMessageVerifier) Start(ctx context.Context) { m.C = c go func() { + defer utils.LogOnPanic() t := time.NewTicker(m.params.interval) defer t.Stop() @@ -123,6 +125,7 @@ func (m *MissingMessageVerifier) Start(ctx context.Context) { default: semaphore <- struct{}{} go func(interest criteriaInterest) { + defer utils.LogOnPanic() m.fetchHistory(c, interest) <-semaphore }(interest) @@ -276,6 +279,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope, wg.Add(1) go func(messageHashes []pb.MessageHash) { + defer utils.LogOnPanic() defer wg.Wait() result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (*store.Result, error) { diff --git a/waku/v2/api/publish/message_check.go b/waku/v2/api/publish/message_check.go index 67a67c913..c6df0f29a 100644 --- a/waku/v2/api/publish/message_check.go +++ b/waku/v2/api/publish/message_check.go @@ -14,6 +14,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/waku-org/go-waku/waku/v2/timesource" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -145,6 +146,7 @@ func (m *MessageSentCheck) SetStorePeerID(peerID peer.ID) { // Start checks if the tracked outgoing messages are stored periodically func (m *MessageSentCheck) Start() { + defer utils.LogOnPanic() ticker := time.NewTicker(m.hashQueryInterval) defer ticker.Stop() for { diff --git a/waku/v2/api/publish/message_queue.go b/waku/v2/api/publish/message_queue.go index 03b7a16a6..50d3d75c3 100644 --- a/waku/v2/api/publish/message_queue.go +++ b/waku/v2/api/publish/message_queue.go @@ -6,6 +6,7 @@ import ( "sync" "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/utils" ) // MessagePriority determines the ordering for the message priority queue @@ -182,6 +183,7 @@ func (m *MessageQueue) Pop(ctx context.Context) <-chan *protocol.Envelope { ch := make(chan *protocol.Envelope) go func() { + defer utils.LogOnPanic() defer close(ch) select { diff --git a/waku/v2/discv5/discover.go b/waku/v2/discv5/discover.go index 582e46d24..a92b8861f 100644 --- a/waku/v2/discv5/discover.go +++ b/waku/v2/discv5/discover.go @@ -172,6 +172,7 @@ func (d *DiscoveryV5) listen(ctx context.Context) error { if d.NAT != nil && !d.udpAddr.IP.IsLoopback() { d.WaitGroup().Add(1) go func() { + defer utils.LogOnPanic() defer d.WaitGroup().Done() nat.Map(d.NAT, ctx.Done(), "udp", d.udpAddr.Port, d.udpAddr.Port, "go-waku discv5 discovery") }() @@ -217,6 +218,7 @@ func (d *DiscoveryV5) start() error { if d.params.autoFindPeers { d.WaitGroup().Add(1) go func() { + defer utils.LogOnPanic() defer d.WaitGroup().Done() d.runDiscoveryV5Loop(d.Context()) }() diff --git a/waku/v2/discv5/mock_peer_discoverer.go b/waku/v2/discv5/mock_peer_discoverer.go index 5bef85421..f997f028c 100644 --- a/waku/v2/discv5/mock_peer_discoverer.go +++ b/waku/v2/discv5/mock_peer_discoverer.go @@ -6,6 +6,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/waku/v2/service" + "github.com/waku-org/go-waku/waku/v2/utils" ) // TestPeerDiscoverer is mock peer discoverer for testing @@ -26,6 +27,7 @@ func NewTestPeerDiscoverer() *TestPeerDiscoverer { // Subscribe is for subscribing to peer discoverer func (t *TestPeerDiscoverer) Subscribe(ctx context.Context, ch <-chan service.PeerData) { go func() { + defer utils.LogOnPanic() for { select { case <-ctx.Done(): diff --git a/waku/v2/node/connectedness.go b/waku/v2/node/connectedness.go index 5a0f89fe2..3526b1d29 100644 --- a/waku/v2/node/connectedness.go +++ b/waku/v2/node/connectedness.go @@ -12,6 +12,7 @@ import ( "go.uber.org/zap" wps "github.com/waku-org/go-waku/waku/v2/peerstore" + "github.com/waku-org/go-waku/waku/v2/utils" ) // PeerStatis is a map of peer IDs to supported protocols @@ -101,6 +102,7 @@ func (c ConnectionNotifier) Close() { } func (w *WakuNode) connectednessListener(ctx context.Context) { + defer utils.LogOnPanic() defer w.wg.Done() for { diff --git a/waku/v2/node/keepalive.go b/waku/v2/node/keepalive.go index 94ebbb74a..416666f55 100644 --- a/waku/v2/node/keepalive.go +++ b/waku/v2/node/keepalive.go @@ -12,6 +12,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/p2p/protocol/ping" "github.com/waku-org/go-waku/logging" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" "golang.org/x/exp/maps" ) @@ -40,6 +41,7 @@ func disconnectAllPeers(host host.Host, logger *zap.Logger) { // This is necessary because TCP connections are automatically closed due to inactivity, // and doing a ping will avoid this (with a small bandwidth cost) func (w *WakuNode) startKeepAlive(ctx context.Context, randomPeersPingDuration time.Duration, allPeersPingDuration time.Duration) { + defer utils.LogOnPanic() defer w.wg.Done() if !w.opts.enableRelay { @@ -168,6 +170,7 @@ func (w *WakuNode) startKeepAlive(ctx context.Context, randomPeersPingDuration t } func (w *WakuNode) pingPeer(ctx context.Context, wg *sync.WaitGroup, peerID peer.ID, resultChan chan bool) { + defer utils.LogOnPanic() defer wg.Done() logger := w.log.With(logging.HostID("peer", peerID)) diff --git a/waku/v2/node/localnode.go b/waku/v2/node/localnode.go index 9de6c59fd..5b042734b 100644 --- a/waku/v2/node/localnode.go +++ b/waku/v2/node/localnode.go @@ -15,6 +15,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol" wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr" "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -358,6 +359,7 @@ func (w *WakuNode) watchTopicShards(ctx context.Context) error { } go func() { + defer utils.LogOnPanic() defer evtRelaySubscribed.Close() defer evtRelayUnsubscribed.Close() @@ -411,6 +413,7 @@ func (w *WakuNode) registerAndMonitorReachability(ctx context.Context) { } w.wg.Add(1) go func() { + defer utils.LogOnPanic() defer myEventSub.Close() defer w.wg.Done() diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 10153fd6d..5929dd57a 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -214,6 +214,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { func(ctx context.Context, numPeers int) <-chan peer.AddrInfo { r := make(chan peer.AddrInfo) go func() { + defer utils.LogOnPanic() defer close(r) for ; numPeers != 0; numPeers-- { select { @@ -308,6 +309,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { } func (w *WakuNode) watchMultiaddressChanges(ctx context.Context) { + defer utils.LogOnPanic() defer w.wg.Done() addrsSet := utils.MultiAddrSet(w.ListenAddresses()...) @@ -550,6 +552,7 @@ func (w *WakuNode) ID() string { } func (w *WakuNode) watchENRChanges(ctx context.Context) { + defer utils.LogOnPanic() defer w.wg.Done() var prevNodeVal string @@ -885,6 +888,7 @@ func (w *WakuNode) PeersByContentTopic(contentTopic string) peer.IDSlice { } func (w *WakuNode) findRelayNodes(ctx context.Context) { + defer utils.LogOnPanic() defer w.wg.Done() // Feed peers more often right after the bootstrap, then backoff diff --git a/waku/v2/peermanager/connection_gater.go b/waku/v2/peermanager/connection_gater.go index d08008139..3a326d6bf 100644 --- a/waku/v2/peermanager/connection_gater.go +++ b/waku/v2/peermanager/connection_gater.go @@ -9,6 +9,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -77,6 +78,7 @@ func (c *ConnectionGater) InterceptUpgraded(_ network.Conn) (allow bool, reason // NotifyDisconnect is called when a connection disconnects. func (c *ConnectionGater) NotifyDisconnect(addr multiaddr.Multiaddr) { + defer utils.LogOnPanic() ip, err := manet.ToIP(addr) if err != nil { return diff --git a/waku/v2/peermanager/fastest_peer_selector.go b/waku/v2/peermanager/fastest_peer_selector.go index 225d59402..7f2ce6ddb 100644 --- a/waku/v2/peermanager/fastest_peer_selector.go +++ b/waku/v2/peermanager/fastest_peer_selector.go @@ -12,6 +12,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/p2p/protocol/ping" "github.com/waku-org/go-waku/logging" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -69,9 +70,11 @@ func (r *FastestPeerSelector) FastestPeer(ctx context.Context, peers peer.IDSlic pinged := make(map[peer.ID]struct{}) go func() { + defer utils.LogOnPanic() // Ping any peer with no latency recorded for peerToPing := range pingCh { go func(p peer.ID) { + defer utils.LogOnPanic() defer wg.Done() rtt := time.Hour result, err := r.PingPeer(ctx, p) diff --git a/waku/v2/peermanager/peer_connector.go b/waku/v2/peermanager/peer_connector.go index bd844b20c..c1f2f70af 100644 --- a/waku/v2/peermanager/peer_connector.go +++ b/waku/v2/peermanager/peer_connector.go @@ -19,6 +19,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/onlinechecker" wps "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/service" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" @@ -104,6 +105,7 @@ func (c *PeerConnectionStrategy) Subscribe(ctx context.Context, ch <-chan servic // if running start a goroutine to consume the subscription c.WaitGroup().Add(1) go func() { + defer utils.LogOnPanic() defer c.WaitGroup().Done() c.consumeSubscription(subscription{ctx, ch}) }() @@ -187,6 +189,7 @@ func (c *PeerConnectionStrategy) consumeSubscriptions() { for _, subs := range c.subscriptions { c.WaitGroup().Add(1) go func(s subscription) { + defer utils.LogOnPanic() defer c.WaitGroup().Done() c.consumeSubscription(s) }(subs) @@ -234,6 +237,7 @@ func (c *PeerConnectionStrategy) addConnectionBackoff(peerID peer.ID) { } func (c *PeerConnectionStrategy) dialPeers() { + defer utils.LogOnPanic() defer c.WaitGroup().Done() maxGoRoutines := c.pm.OutPeersTarget @@ -273,6 +277,7 @@ func (c *PeerConnectionStrategy) dialPeers() { } func (c *PeerConnectionStrategy) dialPeer(pi peer.AddrInfo, sem chan struct{}) { + defer utils.LogOnPanic() defer c.WaitGroup().Done() ctx, cancel := context.WithTimeout(c.Context(), c.dialTimeout) defer cancel() diff --git a/waku/v2/peermanager/peer_discovery.go b/waku/v2/peermanager/peer_discovery.go index 8ab1c8beb..89868510b 100644 --- a/waku/v2/peermanager/peer_discovery.go +++ b/waku/v2/peermanager/peer_discovery.go @@ -11,6 +11,7 @@ import ( wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/service" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -103,6 +104,7 @@ func (pm *PeerManager) discoverOnDemand(cluster uint16, } func (pm *PeerManager) discoverPeersByPubsubTopics(pubsubTopics []string, proto protocol.ID, ctx context.Context, maxCount int) { + defer utils.LogOnPanic() shardsInfo, err := waku_proto.TopicsToRelayShards(pubsubTopics...) if err != nil { pm.logger.Error("failed to convert pubsub topic to shard", zap.Strings("topics", pubsubTopics), zap.Error(err)) diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go index 2ac489a04..45341a6a5 100644 --- a/waku/v2/peermanager/peer_manager.go +++ b/waku/v2/peermanager/peer_manager.go @@ -23,6 +23,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/metadata" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/service" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -252,6 +253,7 @@ func (pm *PeerManager) Start(ctx context.Context) { } func (pm *PeerManager) peerStoreLoop(ctx context.Context) { + defer utils.LogOnPanic() t := time.NewTicker(prunePeerStoreInterval) defer t.Stop() for { @@ -353,6 +355,7 @@ func (pm *PeerManager) prunePeerStore() { // This is a connectivity loop, which currently checks and prunes inbound connections. func (pm *PeerManager) connectivityLoop(ctx context.Context) { + defer utils.LogOnPanic() pm.connectToPeers() t := time.NewTicker(peerConnectivityLoopSecs * time.Second) defer t.Stop() diff --git a/waku/v2/peermanager/topic_event_handler.go b/waku/v2/peermanager/topic_event_handler.go index 1b965ef03..41a7760f6 100644 --- a/waku/v2/peermanager/topic_event_handler.go +++ b/waku/v2/peermanager/topic_event_handler.go @@ -12,6 +12,7 @@ import ( wps "github.com/waku-org/go-waku/waku/v2/peerstore" waku_proto "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" "golang.org/x/exp/maps" ) @@ -162,6 +163,7 @@ func (pm *PeerManager) handlerPeerTopicEvent(peerEvt relay.EvtPeerTopic) { } func (pm *PeerManager) peerEventLoop(ctx context.Context) { + defer utils.LogOnPanic() defer pm.sub.Close() for { select { diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index a9d2b496d..181a9a090 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -28,6 +28,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/subscription" "github.com/waku-org/go-waku/waku/v2/service" "github.com/waku-org/go-waku/waku/v2/timesource" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" "golang.org/x/exp/maps" "golang.org/x/exp/slices" @@ -127,6 +128,7 @@ func (wf *WakuFilterLightNode) Stop() { wf.h.RemoveStreamHandler(FilterPushID_v20beta1) if wf.subscriptions.Count() > 0 { go func() { + defer utils.LogOnPanic() defer func() { _ = recover() }() @@ -414,6 +416,7 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot for i, peerID := range selectedPeers { wg.Add(1) go func(index int, ID peer.ID) { + defer utils.LogOnPanic() defer wg.Done() err := wf.request( reqCtx, @@ -565,6 +568,7 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter pr // send unsubscribe request to all the peers for peerID := range peers { go func(peerID peer.ID) { + defer utils.LogOnPanic() defer func() { if params.wg != nil { params.wg.Done() @@ -687,6 +691,7 @@ func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...Filte } for peerId := range peers { go func(peerID peer.ID) { + defer utils.LogOnPanic() defer func() { if params.wg != nil { params.wg.Done() diff --git a/waku/v2/protocol/filter/filter_health_check.go b/waku/v2/protocol/filter/filter_health_check.go index a6b76a340..126090d99 100644 --- a/waku/v2/protocol/filter/filter_health_check.go +++ b/waku/v2/protocol/filter/filter_health_check.go @@ -5,6 +5,7 @@ import ( "time" "github.com/libp2p/go-libp2p/core/peer" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -19,6 +20,7 @@ func (wf *WakuFilterLightNode) PingPeers() { } func (wf *WakuFilterLightNode) PingPeer(peer peer.ID) { + defer utils.LogOnPanic() ctxWithTimeout, cancel := context.WithTimeout(wf.CommonService.Context(), PingTimeout) defer cancel() err := wf.Ping(ctxWithTimeout, peer) @@ -41,6 +43,7 @@ func (wf *WakuFilterLightNode) PingPeer(peer peer.ID) { } func (wf *WakuFilterLightNode) FilterHealthCheckLoop() { + defer utils.LogOnPanic() defer wf.WaitGroup().Done() ticker := time.NewTicker(wf.peerPingInterval) defer ticker.Stop() diff --git a/waku/v2/protocol/filter/server.go b/waku/v2/protocol/filter/server.go index 9a2e25d6b..99913a7c2 100644 --- a/waku/v2/protocol/filter/server.go +++ b/waku/v2/protocol/filter/server.go @@ -216,6 +216,7 @@ func (wf *WakuFilterFullNode) unsubscribeAll(ctx context.Context, stream network } func (wf *WakuFilterFullNode) filterListener(ctx context.Context) { + defer utils.LogOnPanic() defer wf.WaitGroup().Done() // This function is invoked for each message received @@ -237,6 +238,7 @@ func (wf *WakuFilterFullNode) filterListener(ctx context.Context) { logger.Debug("pushing message to light node") wf.WaitGroup().Add(1) go func(subscriber peer.ID) { + defer utils.LogOnPanic() defer wf.WaitGroup().Done() start := time.Now() err := wf.pushMessage(ctx, logger, subscriber, envelope) diff --git a/waku/v2/protocol/filter/subscribers_map.go b/waku/v2/protocol/filter/subscribers_map.go index faa5700ca..b539357e9 100644 --- a/waku/v2/protocol/filter/subscribers_map.go +++ b/waku/v2/protocol/filter/subscribers_map.go @@ -10,6 +10,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/utils" ) type PeerSet map[peer.ID]struct{} @@ -188,6 +189,7 @@ func (sub *SubscribersMap) Items(pubsubTopic string, contentTopic string) <-chan key := getKey(pubsubTopic, contentTopic) f := func() { + defer utils.LogOnPanic() sub.RLock() defer sub.RUnlock() @@ -236,6 +238,7 @@ func (sub *SubscribersMap) Refresh(peerID peer.ID) { } func (sub *SubscribersMap) cleanUp(ctx context.Context, cleanupInterval time.Duration) { + defer utils.LogOnPanic() t := time.NewTicker(cleanupInterval) defer t.Stop() diff --git a/waku/v2/protocol/legacy_store/waku_store_protocol.go b/waku/v2/protocol/legacy_store/waku_store_protocol.go index 16eabe8f0..90229b5bd 100644 --- a/waku/v2/protocol/legacy_store/waku_store_protocol.go +++ b/waku/v2/protocol/legacy_store/waku_store_protocol.go @@ -21,6 +21,7 @@ import ( wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/timesource" + "github.com/waku-org/go-waku/waku/v2/utils" ) func findMessages(query *pb.HistoryQuery, msgProvider MessageProvider) ([]*wpb.WakuMessage, *pb.PagingInfo, error) { @@ -159,9 +160,11 @@ func (store *WakuStore) storeMessage(env *protocol.Envelope) error { } func (store *WakuStore) storeIncomingMessages(ctx context.Context) { + defer utils.LogOnPanic() defer store.wg.Done() for envelope := range store.MsgC.Ch { go func(env *protocol.Envelope) { + defer utils.LogOnPanic() _ = store.storeMessage(env) }(envelope) } diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index 8200fddfc..65d941aed 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -335,6 +335,7 @@ func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *wpb.WakuMessa for i, peerID := range params.selectedPeers { wg.Add(1) go func(index int, id peer.ID) { + defer utils.LogOnPanic() paramsValue := *params paramsValue.requestID = protocol.GenerateRequestID() defer wg.Done() diff --git a/waku/v2/protocol/metadata/waku_metadata.go b/waku/v2/protocol/metadata/waku_metadata.go index 47cf088dc..343279809 100644 --- a/waku/v2/protocol/metadata/waku_metadata.go +++ b/waku/v2/protocol/metadata/waku_metadata.go @@ -20,6 +20,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/enr" "github.com/waku-org/go-waku/waku/v2/protocol/metadata/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -225,6 +226,7 @@ func (wakuM *WakuMetadata) disconnectPeer(peerID peer.ID, reason error) { // Connected is called when a connection is opened func (wakuM *WakuMetadata) Connected(n network.Network, cc network.Conn) { go func() { + defer utils.LogOnPanic() wakuM.log.Debug("peer connected", zap.Stringer("peer", cc.RemotePeer())) // Metadata verification is done only if a clusterID is specified if wakuM.clusterID == 0 { diff --git a/waku/v2/protocol/noise/pairing.go b/waku/v2/protocol/noise/pairing.go index a2e74fcfe..b092f7745 100644 --- a/waku/v2/protocol/noise/pairing.go +++ b/waku/v2/protocol/noise/pairing.go @@ -11,6 +11,7 @@ import ( n "github.com/waku-org/go-noise" "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -210,6 +211,7 @@ func (p *Pairing) initiatorHandshake(ctx context.Context, msgCh <-chan *pb.WakuM doneCh = make(chan error, 1) go func() { + defer utils.LogOnPanic() defer close(doneCh) // The handshake initiator writes a Waku2 payload v2 containing the handshake message // and the (encrypted) transport message diff --git a/waku/v2/protocol/noise/pairing_relay_messenger.go b/waku/v2/protocol/noise/pairing_relay_messenger.go index def8ba306..a57c8f97b 100644 --- a/waku/v2/protocol/noise/pairing_relay_messenger.go +++ b/waku/v2/protocol/noise/pairing_relay_messenger.go @@ -8,6 +8,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/timesource" + "github.com/waku-org/go-waku/waku/v2/utils" "google.golang.org/protobuf/proto" ) @@ -67,6 +68,7 @@ func NewWakuRelayMessenger(ctx context.Context, r *relay.WakuRelay, pubsubTopic } go func() { + defer utils.LogOnPanic() for { select { case <-ctx.Done(): @@ -97,6 +99,7 @@ func (r *NoiseWakuRelay) Subscribe(ctx context.Context, contentTopic string) <-c r.subscriptionChPerContentTopic[contentTopic] = subscriptionCh go func() { + defer utils.LogOnPanic() for { select { case <-ctx.Done(): diff --git a/waku/v2/protocol/peer_exchange/client.go b/waku/v2/protocol/peer_exchange/client.go index f901590d5..1e558bccc 100644 --- a/waku/v2/protocol/peer_exchange/client.go +++ b/waku/v2/protocol/peer_exchange/client.go @@ -16,6 +16,7 @@ import ( wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr" "github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/pb" "github.com/waku-org/go-waku/waku/v2/service" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -154,6 +155,7 @@ func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb wakuPX.log.Info("connecting to newly discovered peers", zap.Int("count", len(discoveredPeers))) wakuPX.WaitGroup().Add(1) go func() { + defer utils.LogOnPanic() defer wakuPX.WaitGroup().Done() peerCh := make(chan service.PeerData) diff --git a/waku/v2/protocol/peer_exchange/protocol.go b/waku/v2/protocol/peer_exchange/protocol.go index 3f33b2ec8..5a3821b96 100644 --- a/waku/v2/protocol/peer_exchange/protocol.go +++ b/waku/v2/protocol/peer_exchange/protocol.go @@ -21,6 +21,7 @@ import ( wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr" "github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/pb" "github.com/waku-org/go-waku/waku/v2/service" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" "golang.org/x/time/rate" ) @@ -223,6 +224,7 @@ func (wakuPX *WakuPeerExchange) iterate(ctx context.Context) error { } func (wakuPX *WakuPeerExchange) runPeerExchangeDiscv5Loop(ctx context.Context) { + defer utils.LogOnPanic() defer wakuPX.WaitGroup().Done() // Runs a discv5 loop adding new peers to the px peer cache diff --git a/waku/v2/protocol/relay/broadcast.go b/waku/v2/protocol/relay/broadcast.go index 36ca3e1e7..91ece5d8a 100644 --- a/waku/v2/protocol/relay/broadcast.go +++ b/waku/v2/protocol/relay/broadcast.go @@ -7,6 +7,7 @@ import ( "sync/atomic" "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/utils" ) type BroadcasterParameters struct { @@ -174,6 +175,7 @@ func (b *broadcaster) Start(ctx context.Context) error { } func (b *broadcaster) run(ctx context.Context) { + defer utils.LogOnPanic() for { select { case <-ctx.Done(): diff --git a/waku/v2/protocol/relay/metrics.go b/waku/v2/protocol/relay/metrics.go index 4a10a0a9a..a6b33cc99 100644 --- a/waku/v2/protocol/relay/metrics.go +++ b/waku/v2/protocol/relay/metrics.go @@ -5,6 +5,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/waku-org/go-waku/logging" waku_proto "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -56,6 +57,7 @@ func newMetrics(reg prometheus.Registerer, logger *zap.Logger) Metrics { // RecordMessage is used to increase the counter for the number of messages received via waku relay func (m *metricsImpl) RecordMessage(envelope *waku_proto.Envelope) { go func() { + defer utils.LogOnPanic() payloadSizeInBytes := len(envelope.Message().Payload) payloadSizeInKb := float64(payloadSizeInBytes) / 1000 messageSize.Observe(payloadSizeInKb) diff --git a/waku/v2/protocol/relay/topic_events.go b/waku/v2/protocol/relay/topic_events.go index 60261a3b3..0b828b867 100644 --- a/waku/v2/protocol/relay/topic_events.go +++ b/waku/v2/protocol/relay/topic_events.go @@ -8,6 +8,7 @@ import ( "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/logging" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -51,6 +52,7 @@ func (w *WakuRelay) addPeerTopicEventListener(topic *pubsub.Topic) (*pubsub.Topi } func (w *WakuRelay) topicEventPoll(topic string, handler *pubsub.TopicEventHandler) { + defer utils.LogOnPanic() defer w.WaitGroup().Done() for { evt, err := handler.NextPeerEvent(w.Context()) diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index 2ff8329af..24964dd18 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -439,6 +439,7 @@ func (w *WakuRelay) subscribe(ctx context.Context, contentFilter waku_proto.Cont subscriptions = append(subscriptions, subscription) go func() { + defer utils.LogOnPanic() <-ctx.Done() subscription.Unsubscribe() }() @@ -533,6 +534,7 @@ func (w *WakuRelay) unsubscribeFromPubsubTopic(topicData *pubsubTopicSubscriptio } func (w *WakuRelay) pubsubTopicMsgHandler(sub *pubsub.Subscription) { + defer utils.LogOnPanic() defer w.WaitGroup().Done() for { diff --git a/waku/v2/protocol/rln/group_manager/dynamic/membership_fetcher.go b/waku/v2/protocol/rln/group_manager/dynamic/membership_fetcher.go index 42c3b5f26..a73b91264 100644 --- a/waku/v2/protocol/rln/group_manager/dynamic/membership_fetcher.go +++ b/waku/v2/protocol/rln/group_manager/dynamic/membership_fetcher.go @@ -15,6 +15,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/rln/contracts" "github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager" "github.com/waku-org/go-waku/waku/v2/protocol/rln/web3" + "github.com/waku-org/go-waku/waku/v2/utils" "github.com/waku-org/go-zerokit-rln/rln" "go.uber.org/zap" ) @@ -120,6 +121,7 @@ func (mf *MembershipFetcher) loadOldEvents(ctx context.Context, fromBlock, toBlo } func (mf *MembershipFetcher) watchNewEvents(ctx context.Context, fromBlock uint64, handler RegistrationEventHandler, errCh chan<- error) { + defer utils.LogOnPanic() defer mf.wg.Done() // Watch for new events diff --git a/waku/v2/protocol/rln/nullifier_log.go b/waku/v2/protocol/rln/nullifier_log.go index 1bf6263ad..cc122a797 100644 --- a/waku/v2/protocol/rln/nullifier_log.go +++ b/waku/v2/protocol/rln/nullifier_log.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/waku-org/go-waku/waku/v2/utils" "github.com/waku-org/go-zerokit-rln/rln" "go.uber.org/zap" ) @@ -89,6 +90,7 @@ func (n *NullifierLog) HasDuplicate(proofMD rln.ProofMetadata) (bool, error) { // cleanup cleans up the log every time there are more than MaxEpochGap epochs stored in it func (n *NullifierLog) cleanup(ctx context.Context) { + defer utils.LogOnPanic() t := time.NewTicker(1 * time.Minute) // TODO: tune this defer t.Stop() diff --git a/waku/v2/rendezvous/db.go b/waku/v2/rendezvous/db.go index 1d751fede..f4d9ba196 100644 --- a/waku/v2/rendezvous/db.go +++ b/waku/v2/rendezvous/db.go @@ -13,6 +13,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" dbi "github.com/waku-org/go-libp2p-rendezvous/db" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -315,6 +316,7 @@ func (db *DB) ValidCookie(ns string, cookie []byte) bool { } func (db *DB) background(ctx context.Context) { + defer utils.LogOnPanic() for { db.cleanupExpired() diff --git a/waku/v2/rendezvous/rendezvous.go b/waku/v2/rendezvous/rendezvous.go index 76c63ff5c..69862f60e 100644 --- a/waku/v2/rendezvous/rendezvous.go +++ b/waku/v2/rendezvous/rendezvous.go @@ -11,6 +11,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/service" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -142,6 +143,7 @@ func (r *Rendezvous) Register(ctx context.Context, rendezvousPoints []*Rendezvou // RegisterShard registers the node in the rendezvous points using a shard as namespace func (r *Rendezvous) RegisterShard(ctx context.Context, cluster uint16, shard uint16, rendezvousPoints []*RendezvousPoint) { + defer utils.LogOnPanic() namespace := ShardToNamespace(cluster, shard) r.RegisterWithNamespace(ctx, namespace, rendezvousPoints) } @@ -158,6 +160,7 @@ func (r *Rendezvous) RegisterWithNamespace(ctx context.Context, namespace string for _, m := range rendezvousPoints { r.WaitGroup().Add(1) go func(m *RendezvousPoint) { + defer utils.LogOnPanic() r.WaitGroup().Done() rendezvousClient := rvs.NewRendezvousClient(r.host, m.id) diff --git a/waku/v2/service/common_discovery_service.go b/waku/v2/service/common_discovery_service.go index 0aa6b8528..a2a4b2215 100644 --- a/waku/v2/service/common_discovery_service.go +++ b/waku/v2/service/common_discovery_service.go @@ -7,6 +7,7 @@ import ( "github.com/ethereum/go-ethereum/p2p/enode" "github.com/libp2p/go-libp2p/core/peer" wps "github.com/waku-org/go-waku/waku/v2/peerstore" + "github.com/waku-org/go-waku/waku/v2/utils" ) // PeerData contains information about a peer useful in establishing connections with it. @@ -58,6 +59,7 @@ func (sp *CommonDiscoveryService) GetListeningChan() <-chan PeerData { return sp.channel } func (sp *CommonDiscoveryService) PushToChan(data PeerData) bool { + defer utils.LogOnPanic() if err := sp.ErrOnNotRunning(); err != nil { return false } diff --git a/waku/v2/timesource/ntp.go b/waku/v2/timesource/ntp.go index 3454631e3..a27d03fc8 100644 --- a/waku/v2/timesource/ntp.go +++ b/waku/v2/timesource/ntp.go @@ -9,6 +9,7 @@ import ( "time" "github.com/beevik/ntp" + "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -69,6 +70,7 @@ func computeOffset(timeQuery ntpQuery, servers []string, allowedFailures int) (t responses := make(chan queryResponse, len(servers)) for _, server := range servers { go func(server string) { + defer utils.LogOnPanic() response, err := timeQuery(server, ntp.QueryOptions{ Timeout: DefaultRPCTimeout, }) @@ -172,6 +174,7 @@ func (s *NTPTimeSource) runPeriodically(ctx context.Context, fn func() error) er // we try to do it synchronously so that user can have reliable messages right away s.wg.Add(1) go func() { + defer utils.LogOnPanic() for { select { case <-time.After(period): diff --git a/waku/v2/utils/logger.go b/waku/v2/utils/logger.go index 02b82eee4..beba67d87 100644 --- a/waku/v2/utils/logger.go +++ b/waku/v2/utils/logger.go @@ -1,6 +1,7 @@ package utils import ( + "runtime/debug" "strings" logging "github.com/ipfs/go-log/v2" @@ -81,3 +82,10 @@ func InitLogger(encoding string, output string, name string, level zapcore.Level log = logging.Logger(name).Desugar() } + +func LogOnPanic() { + if err := recover(); err != nil { + Logger().Error("panic in goroutine", zap.Any("error", err), zap.String("stacktrace", string(debug.Stack()))) + panic(err) + } +}