diff --git a/wakuv2/message_publishing.go b/wakuv2/message_publishing.go index 41f7aede8..14aed7f91 100644 --- a/wakuv2/message_publishing.go +++ b/wakuv2/message_publishing.go @@ -59,6 +59,7 @@ func (w *Waku) Send(pubsubTopic string, msg *pb.WakuMessage, priority *int) ([]b } func (w *Waku) broadcast() { + defer w.wg.Done() for { var envelope *protocol.Envelope diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 574381b5e..963375468 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -36,7 +36,6 @@ import ( "github.com/jellydator/ttlcache/v3" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" - "github.com/libp2p/go-libp2p/p2p/protocol/ping" "github.com/multiformats/go-multiaddr" "go.uber.org/zap" @@ -55,6 +54,7 @@ import ( "github.com/libp2p/go-libp2p" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/metrics" + "github.com/libp2p/go-libp2p/p2p/protocol/ping" filterapi "github.com/waku-org/go-waku/waku/v2/api/filter" "github.com/waku-org/go-waku/waku/v2/api/missing" @@ -500,7 +500,6 @@ func (w *Waku) connect(peerInfo peer.AddrInfo, enr *enode.Node, origin wps.Origi } func (w *Waku) telemetryBandwidthStats(telemetryServerURL string) { - w.wg.Add(1) defer w.wg.Done() if telemetryServerURL == "" { @@ -539,7 +538,6 @@ func (w *Waku) GetStats() types.StatsSummary { } func (w *Waku) runPeerExchangeLoop() { - w.wg.Add(1) defer w.wg.Done() if !w.cfg.EnablePeerExchangeClient { @@ -1092,9 +1090,12 @@ func (w *Waku) Start() error { } }() + w.wg.Add(1) go w.telemetryBandwidthStats(w.cfg.TelemetryServerURL) //TODO: commenting for now so that only fleet nodes are used. //Need to uncomment once filter peer scoring etc is implemented. + + w.wg.Add(1) go w.runPeerExchangeLoop() if w.cfg.EnableMissingMessageVerification { @@ -1143,9 +1144,11 @@ func (w *Waku) Start() error { numCPU := runtime.NumCPU() for i := 0; i < numCPU; i++ { + w.wg.Add(1) go w.processQueueLoop() } + w.wg.Add(1) go w.broadcast() go w.sendQueue.Start(w.ctx) @@ -1156,6 +1159,7 @@ func (w *Waku) Start() error { } // we should wait `seedBootnodesForDiscV5` shutdown smoothly before set w.ctx to nil within `w.Stop()` + w.wg.Add(1) go w.seedBootnodesForDiscV5() return nil @@ -1217,7 +1221,9 @@ func (w *Waku) startMessageSender() error { messageSentCheck := publish.NewMessageSentCheck(w.ctx, w.node.Store(), w.node.Timesource(), msgStoredChan, msgExpiredChan, w.logger) sender.WithMessageSentCheck(messageSentCheck) + w.wg.Add(1) go func() { + defer w.wg.Done() for { select { case <-w.ctx.Done(): @@ -1408,6 +1414,7 @@ func (w *Waku) postEvent(envelope *common.ReceivedMessage) { // processQueueLoop delivers the messages to the watchers during the lifetime of the waku node. func (w *Waku) processQueueLoop() { + defer w.wg.Done() if w.ctx == nil { return } @@ -1628,7 +1635,6 @@ func (w *Waku) ConnectionChanged(state connection.State) { // It backs off exponentially until maxRetries, at which point it restarts from 0 // It also restarts if there's a connection change signalled from the client func (w *Waku) seedBootnodesForDiscV5() { - w.wg.Add(1) defer w.wg.Done() if !w.cfg.EnableDiscV5 || w.node.DiscV5() == nil {