fix(discv5): threadsafe peerCh (#687)

This commit is contained in:
richΛrd 2023-08-28 01:45:26 -04:00 committed by GitHub
parent 041dc4070a
commit 09eb8ed19b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -34,16 +34,16 @@ type PeerConnector interface {
}
type DiscoveryV5 struct {
params *discV5Parameters
host host.Host
config discover.Config
udpAddr *net.UDPAddr
listener *discover.UDPv5
localnode *enode.LocalNode
metrics Metrics
params *discV5Parameters
host host.Host
config discover.Config
udpAddr *net.UDPAddr
listener *discover.UDPv5
localnode *enode.LocalNode
metrics Metrics
peerChannel *peerChannel
peerConnector PeerConnector
peerCh chan peermanager.PeerData
NAT nat.Interface
log *zap.Logger
@ -134,6 +134,7 @@ func NewDiscoveryV5(priv *ecdsa.PrivateKey, localnode *enode.LocalNode, peerConn
peerConnector: peerConnector,
NAT: NAT,
wg: &sync.WaitGroup{},
peerChannel: &peerChannel{},
localnode: localnode,
metrics: newMetrics(reg),
config: discover.Config{
@ -194,6 +195,50 @@ func (d *DiscoveryV5) SetHost(h host.Host) {
d.host = h
}
type peerChannel struct {
mutex sync.Mutex
channel chan peermanager.PeerData
started bool
ctx context.Context
}
func (p *peerChannel) Start(ctx context.Context) {
p.mutex.Lock()
defer p.mutex.Unlock()
p.started = true
p.ctx = ctx
p.channel = make(chan peermanager.PeerData)
}
func (p *peerChannel) Stop() {
p.mutex.Lock()
defer p.mutex.Unlock()
if !p.started {
return
}
p.started = false
close(p.channel)
}
func (p *peerChannel) Subscribe() chan peermanager.PeerData {
return p.channel
}
func (p *peerChannel) Publish(peer peermanager.PeerData) bool {
p.mutex.Lock()
defer p.mutex.Unlock()
if !p.started {
return false
}
select {
case p.channel <- peer:
case <-p.ctx.Done():
return false
}
return true
}
// only works if the discovery v5 hasn't been started yet.
func (d *DiscoveryV5) Start(ctx context.Context) error {
// compare and swap sets the discovery v5 to `started` state
@ -205,8 +250,8 @@ func (d *DiscoveryV5) Start(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
d.cancel = cancel
d.peerCh = make(chan peermanager.PeerData)
d.peerConnector.Subscribe(ctx, d.peerCh)
d.peerChannel.Start(ctx)
d.peerConnector.Subscribe(ctx, d.peerChannel.Subscribe())
err := d.listen(ctx)
if err != nil {
@ -249,7 +294,13 @@ func (d *DiscoveryV5) Stop() {
d.wg.Wait()
close(d.peerCh)
defer func() {
if r := recover(); r != nil {
d.log.Info("recovering from panic and quitting")
}
}()
d.peerChannel.Stop()
}
/*
@ -433,10 +484,10 @@ func (d *DiscoveryV5) peerLoop(ctx context.Context) error {
ENR: n,
}
select {
case d.peerCh <- peer:
case <-ctx.Done():
return nil
if d.peerChannel.Publish(peer) {
d.log.Debug("published peer into peer channel", logging.HostID("peerID", peer.AddrInfo.ID))
} else {
d.log.Debug("could not publish peer into peer channel", logging.HostID("peerID", peer.AddrInfo.ID))
}
return nil