fix_: wakuv2 waitgroups (#5814)

* fix(wakuv2)_: usage of waitgroup

* fix_: revert changes in ConnectionChanged
This commit is contained in:
Igor Sirotin 2024-09-12 15:04:37 +01:00 committed by GitHub
parent 9038c66819
commit 13ab5b6f24
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 11 additions and 4 deletions

View File

@ -59,6 +59,7 @@ func (w *Waku) Send(pubsubTopic string, msg *pb.WakuMessage, priority *int) ([]b
} }
func (w *Waku) broadcast() { func (w *Waku) broadcast() {
defer w.wg.Done()
for { for {
var envelope *protocol.Envelope var envelope *protocol.Envelope

View File

@ -36,7 +36,6 @@ import (
"github.com/jellydator/ttlcache/v3" "github.com/jellydator/ttlcache/v3"
"github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
"github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multiaddr"
"go.uber.org/zap" "go.uber.org/zap"
@ -55,6 +54,7 @@ import (
"github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p"
pubsub "github.com/libp2p/go-libp2p-pubsub" pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/metrics" "github.com/libp2p/go-libp2p/core/metrics"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
filterapi "github.com/waku-org/go-waku/waku/v2/api/filter" filterapi "github.com/waku-org/go-waku/waku/v2/api/filter"
"github.com/waku-org/go-waku/waku/v2/api/missing" "github.com/waku-org/go-waku/waku/v2/api/missing"
@ -500,7 +500,6 @@ func (w *Waku) connect(peerInfo peer.AddrInfo, enr *enode.Node, origin wps.Origi
} }
func (w *Waku) telemetryBandwidthStats(telemetryServerURL string) { func (w *Waku) telemetryBandwidthStats(telemetryServerURL string) {
w.wg.Add(1)
defer w.wg.Done() defer w.wg.Done()
if telemetryServerURL == "" { if telemetryServerURL == "" {
@ -539,7 +538,6 @@ func (w *Waku) GetStats() types.StatsSummary {
} }
func (w *Waku) runPeerExchangeLoop() { func (w *Waku) runPeerExchangeLoop() {
w.wg.Add(1)
defer w.wg.Done() defer w.wg.Done()
if !w.cfg.EnablePeerExchangeClient { if !w.cfg.EnablePeerExchangeClient {
@ -1092,9 +1090,12 @@ func (w *Waku) Start() error {
} }
}() }()
w.wg.Add(1)
go w.telemetryBandwidthStats(w.cfg.TelemetryServerURL) go w.telemetryBandwidthStats(w.cfg.TelemetryServerURL)
//TODO: commenting for now so that only fleet nodes are used. //TODO: commenting for now so that only fleet nodes are used.
//Need to uncomment once filter peer scoring etc is implemented. //Need to uncomment once filter peer scoring etc is implemented.
w.wg.Add(1)
go w.runPeerExchangeLoop() go w.runPeerExchangeLoop()
if w.cfg.EnableMissingMessageVerification { if w.cfg.EnableMissingMessageVerification {
@ -1143,9 +1144,11 @@ func (w *Waku) Start() error {
numCPU := runtime.NumCPU() numCPU := runtime.NumCPU()
for i := 0; i < numCPU; i++ { for i := 0; i < numCPU; i++ {
w.wg.Add(1)
go w.processQueueLoop() go w.processQueueLoop()
} }
w.wg.Add(1)
go w.broadcast() go w.broadcast()
go w.sendQueue.Start(w.ctx) go w.sendQueue.Start(w.ctx)
@ -1156,6 +1159,7 @@ func (w *Waku) Start() error {
} }
// we should wait `seedBootnodesForDiscV5` shutdown smoothly before set w.ctx to nil within `w.Stop()` // we should wait `seedBootnodesForDiscV5` shutdown smoothly before set w.ctx to nil within `w.Stop()`
w.wg.Add(1)
go w.seedBootnodesForDiscV5() go w.seedBootnodesForDiscV5()
return nil return nil
@ -1217,7 +1221,9 @@ func (w *Waku) startMessageSender() error {
messageSentCheck := publish.NewMessageSentCheck(w.ctx, w.node.Store(), w.node.Timesource(), msgStoredChan, msgExpiredChan, w.logger) messageSentCheck := publish.NewMessageSentCheck(w.ctx, w.node.Store(), w.node.Timesource(), msgStoredChan, msgExpiredChan, w.logger)
sender.WithMessageSentCheck(messageSentCheck) sender.WithMessageSentCheck(messageSentCheck)
w.wg.Add(1)
go func() { go func() {
defer w.wg.Done()
for { for {
select { select {
case <-w.ctx.Done(): case <-w.ctx.Done():
@ -1408,6 +1414,7 @@ func (w *Waku) postEvent(envelope *common.ReceivedMessage) {
// processQueueLoop delivers the messages to the watchers during the lifetime of the waku node. // processQueueLoop delivers the messages to the watchers during the lifetime of the waku node.
func (w *Waku) processQueueLoop() { func (w *Waku) processQueueLoop() {
defer w.wg.Done()
if w.ctx == nil { if w.ctx == nil {
return return
} }
@ -1628,7 +1635,6 @@ func (w *Waku) ConnectionChanged(state connection.State) {
// It backs off exponentially until maxRetries, at which point it restarts from 0 // It backs off exponentially until maxRetries, at which point it restarts from 0
// It also restarts if there's a connection change signalled from the client // It also restarts if there's a connection change signalled from the client
func (w *Waku) seedBootnodesForDiscV5() { func (w *Waku) seedBootnodesForDiscV5() {
w.wg.Add(1)
defer w.wg.Done() defer w.wg.Done()
if !w.cfg.EnableDiscV5 || w.node.DiscV5() == nil { if !w.cfg.EnableDiscV5 || w.node.DiscV5() == nil {