From 09eb8ed19b40b409de1081d2011b2551905e40ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?rich=CE=9Brd?= Date: Mon, 28 Aug 2023 01:45:26 -0400 Subject: [PATCH] fix(discv5): threadsafe peerCh (#687) --- waku/v2/discv5/discover.go | 81 +++++++++++++++++++++++++++++++------- 1 file changed, 66 insertions(+), 15 deletions(-) diff --git a/waku/v2/discv5/discover.go b/waku/v2/discv5/discover.go index 6ea17f72..0dee3dee 100644 --- a/waku/v2/discv5/discover.go +++ b/waku/v2/discv5/discover.go @@ -34,16 +34,16 @@ type PeerConnector interface { } type DiscoveryV5 struct { - params *discV5Parameters - host host.Host - config discover.Config - udpAddr *net.UDPAddr - listener *discover.UDPv5 - localnode *enode.LocalNode - metrics Metrics + params *discV5Parameters + host host.Host + config discover.Config + udpAddr *net.UDPAddr + listener *discover.UDPv5 + localnode *enode.LocalNode + metrics Metrics + peerChannel *peerChannel peerConnector PeerConnector - peerCh chan peermanager.PeerData NAT nat.Interface log *zap.Logger @@ -134,6 +134,7 @@ func NewDiscoveryV5(priv *ecdsa.PrivateKey, localnode *enode.LocalNode, peerConn peerConnector: peerConnector, NAT: NAT, wg: &sync.WaitGroup{}, + peerChannel: &peerChannel{}, localnode: localnode, metrics: newMetrics(reg), config: discover.Config{ @@ -194,6 +195,50 @@ func (d *DiscoveryV5) SetHost(h host.Host) { d.host = h } +type peerChannel struct { + mutex sync.Mutex + channel chan peermanager.PeerData + started bool + ctx context.Context +} + +func (p *peerChannel) Start(ctx context.Context) { + p.mutex.Lock() + defer p.mutex.Unlock() + p.started = true + p.ctx = ctx + p.channel = make(chan peermanager.PeerData) +} + +func (p *peerChannel) Stop() { + p.mutex.Lock() + defer p.mutex.Unlock() + if !p.started { + return + } + p.started = false + close(p.channel) +} + +func (p *peerChannel) Subscribe() chan peermanager.PeerData { + return p.channel +} + +func (p *peerChannel) Publish(peer peermanager.PeerData) bool { + p.mutex.Lock() + defer p.mutex.Unlock() + if !p.started { + return false + } + select { + case p.channel <- peer: + case <-p.ctx.Done(): + return false + + } + return true +} + // only works if the discovery v5 hasn't been started yet. func (d *DiscoveryV5) Start(ctx context.Context) error { // compare and swap sets the discovery v5 to `started` state @@ -205,8 +250,8 @@ func (d *DiscoveryV5) Start(ctx context.Context) error { ctx, cancel := context.WithCancel(ctx) d.cancel = cancel - d.peerCh = make(chan peermanager.PeerData) - d.peerConnector.Subscribe(ctx, d.peerCh) + d.peerChannel.Start(ctx) + d.peerConnector.Subscribe(ctx, d.peerChannel.Subscribe()) err := d.listen(ctx) if err != nil { @@ -249,7 +294,13 @@ func (d *DiscoveryV5) Stop() { d.wg.Wait() - close(d.peerCh) + defer func() { + if r := recover(); r != nil { + d.log.Info("recovering from panic and quitting") + } + }() + + d.peerChannel.Stop() } /* @@ -433,10 +484,10 @@ func (d *DiscoveryV5) peerLoop(ctx context.Context) error { ENR: n, } - select { - case d.peerCh <- peer: - case <-ctx.Done(): - return nil + if d.peerChannel.Publish(peer) { + d.log.Debug("published peer into peer channel", logging.HostID("peerID", peer.AddrInfo.ID)) + } else { + d.log.Debug("could not publish peer into peer channel", logging.HostID("peerID", peer.AddrInfo.ID)) } return nil