From c58d0f51e4fb58e6e8c7c6bed07f5eeedd3780cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?rich=CE=9Brd?= Date: Wed, 25 Oct 2023 21:25:56 -0400 Subject: [PATCH] fix: race condition in peer connector / manager interaction (#837) --- waku/v2/peermanager/peer_connector.go | 21 +++++++++++++++------ waku/v2/peermanager/peer_manager.go | 2 +- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/waku/v2/peermanager/peer_connector.go b/waku/v2/peermanager/peer_connector.go index 120a112f..a4dbd2c6 100644 --- a/waku/v2/peermanager/peer_connector.go +++ b/waku/v2/peermanager/peer_connector.go @@ -35,12 +35,17 @@ type PeerConnectionStrategy struct { paused atomic.Bool dialTimeout time.Duration *CommonDiscoveryService - subscriptions []<-chan PeerData + subscriptions []subscription backoff backoff.BackoffFactory logger *zap.Logger } +type subscription struct { + ctx context.Context + ch <-chan PeerData +} + // backoff describes the strategy used to decide how long to backoff after previously attempting to connect to a peer func getBackOff() backoff.BackoffFactory { rngSrc := rand.NewSource(rand.Int63()) @@ -85,7 +90,7 @@ func (c *PeerConnectionStrategy) Subscribe(ctx context.Context, ch <-chan PeerDa // if not running yet, store the subscription and return if err := c.ErrOnNotRunning(); err != nil { c.mux.Lock() - c.subscriptions = append(c.subscriptions, ch) + c.subscriptions = append(c.subscriptions, subscription{ctx, ch}) c.mux.Unlock() return } @@ -93,16 +98,18 @@ func (c *PeerConnectionStrategy) Subscribe(ctx context.Context, ch <-chan PeerDa c.WaitGroup().Add(1) go func() { defer c.WaitGroup().Done() - c.consumeSubscription(ch) + c.consumeSubscription(subscription{ctx, ch}) }() } -func (c *PeerConnectionStrategy) consumeSubscription(ch <-chan PeerData) { +func (c *PeerConnectionStrategy) consumeSubscription(s subscription) { for { // for returning from the loop when peerConnector is paused. select { case <-c.Context().Done(): return + case <-s.ctx.Done(): + return default: } // @@ -110,7 +117,9 @@ func (c *PeerConnectionStrategy) consumeSubscription(ch <-chan PeerData) { select { case <-c.Context().Done(): return - case p, ok := <-ch: + case <-s.ctx.Done(): + return + case p, ok := <-s.ch: if !ok { return } @@ -166,7 +175,7 @@ func (c *PeerConnectionStrategy) isPaused() bool { func (c *PeerConnectionStrategy) consumeSubscriptions() { for _, subs := range c.subscriptions { c.WaitGroup().Add(1) - go func(s <-chan PeerData) { + go func(s subscription) { defer c.WaitGroup().Done() c.consumeSubscription(s) }(subs) diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go index a99b2cfc..7fbd722a 100644 --- a/waku/v2/peermanager/peer_manager.go +++ b/waku/v2/peermanager/peer_manager.go @@ -339,7 +339,7 @@ func (pm *PeerManager) AddDiscoveredPeer(p PeerData, connectNow bool) { } } if connectNow { - pm.peerConnector.PushToChan(p) + go pm.peerConnector.PushToChan(p) } }