From 24a30d7a7c4cf95c6d1a444277688b63ab8d69e0 Mon Sep 17 00:00:00 2001
From: Andrea Maria Piana
Date: Fri, 29 Jan 2021 13:44:25 +0100
Subject: [PATCH] Fix deadlock in peerpool

This is a bit complicated, so:

1) Peerpool was subscribing to `event.Feed`, which is a global event
   emitter for ethereum.

2) The p2p.Server was publishing on `event.Feed`, and this triggered,
   in the same routine, another publish on `event.Feed`.

3) Peerpool was listening to `event.Feed`, reacting to it, and in the
   same routine triggering code on p2p.Server that would publish on
   `event.Feed`.

This meant that if the channel was unbuffered, it would deadlock, as
peerPool would not be consuming while it was publishing (the same
goroutine effectively publishes and listens, through a lot of
indirection and unbuffered channels: p2p.Server -> event.Feed).

The channel, though, was buffered with size 10, which meant that most
of the time this was fine.

The issue is that peerpool is not the only producer on this channel,
so it's possible that while it was processing an event the buffer
would fill up, and it would hang trying to publish while nobody was
listening to the channel, hanging EVERYTHING.

At least that's what I think; it needs to be tested, but it's
definitely an issue.

I kept the code changes to a minimum; this code is a bit hairy, but
it's fairly critical, so I don't want to make too many changes.
---
 peers/peerpool.go | 72 ++++++++++++++++++++++++++++++++---------------
 1 file changed, 50 insertions(+), 22 deletions(-)

diff --git a/peers/peerpool.go b/peers/peerpool.go
index 01f1cf829..81b69213a 100644
--- a/peers/peerpool.go
+++ b/peers/peerpool.go
@@ -30,6 +30,7 @@ var (
 type PoolEvent string
 
 const (
+	immediately = 0 * time.Minute
 	// expirationPeriod is an amount of time while peer is considered as a connectable
 	expirationPeriod = 60 * time.Minute
 	// discoveryRestartTimeout defines how often loop will try to start discovery server
@@ -249,8 +250,29 @@ func (p *PeerPool) restartDiscovery(server *p2p.Server) error {
 // @TODO(adam): split it into peers and discovery management loops. This should
 // simplify the whole logic and allow to remove `timeout` field from `PeerPool`.
 func (p *PeerPool) handleServerPeers(server *p2p.Server, events <-chan *p2p.PeerEvent) {
-	var retryDiscv5 <-chan time.Time
-	var stopDiscv5 <-chan time.Time
+	retryDiscv5 := make(chan struct{}, 1)
+	stopDiscv5 := make(chan struct{}, 1)
+
+	queueRetry := func(d time.Duration) {
+		go func() {
+			time.Sleep(d)
+			select {
+			case retryDiscv5 <- struct{}{}:
+			default:
+			}
+		}()
+
+	}
+
+	queueStop := func() {
+		go func() {
+			select {
+			case stopDiscv5 <- struct{}{}:
+			default:
+			}
+		}()
+
+	}
 
 	for {
 		p.mu.RLock()
@@ -267,40 +289,48 @@ func (p *PeerPool) handleServerPeers(server *p2p.Server, events <-chan *p2p.Peer
 			p.stopDiscovery(server)
 		case <-retryDiscv5:
 			if err := p.restartDiscovery(server); err != nil {
-				retryDiscv5 = time.After(discoveryRestartTimeout)
 				log.Error("starting discv5 failed", "error", err, "retry", discoveryRestartTimeout)
+				queueRetry(discoveryRestartTimeout)
 			}
 		case <-stopDiscv5:
 			p.handleStopTopics(server)
 		case req := <-p.updateTopic:
 			if p.updateTopicLimits(server, req) == nil {
 				if !p.discovery.Running() {
-					retryDiscv5 = time.After(0)
+					queueRetry(immediately)
 				}
 			}
 		case event := <-events:
-			switch event.Type {
-			case p2p.PeerEventTypeDrop:
-				log.Debug("confirm peer dropped", "ID", event.Peer)
-				if p.handleDroppedPeer(server, event.Peer) {
-					retryDiscv5 = time.After(0)
-				}
-			case p2p.PeerEventTypeAdd: // skip other events
-				log.Debug("confirm peer added", "ID", event.Peer)
-				p.handleAddedPeer(server, event.Peer)
-				stopDiscv5 = time.After(p.opts.TopicStopSearchDelay)
-			default:
-				continue
-			}
-			SendDiscoverySummary(server.PeersInfo())
+			// NOTE: handlePeerEventType needs to be called asynchronously
+			// as it publishes on the <-events channel, leading to a deadlock
+			// if events channel is full.
+			go p.handlePeerEventType(server, event, queueRetry, queueStop)
 		}
 	}
 }
 
-// handleAddedPeer notifies all topics about added peer.
-func (p *PeerPool) handleAddedPeer(server *p2p.Server, nodeID enode.ID) {
+func (p *PeerPool) handlePeerEventType(server *p2p.Server, event *p2p.PeerEvent, queueRetry func(time.Duration), queueStop func()) {
 	p.mu.Lock()
 	defer p.mu.Unlock()
+
+	switch event.Type {
+	case p2p.PeerEventTypeDrop:
+		log.Debug("confirm peer dropped", "ID", event.Peer)
+		if p.handleDroppedPeer(server, event.Peer) {
+			queueRetry(immediately)
+		}
+	case p2p.PeerEventTypeAdd: // skip other events
+		log.Debug("confirm peer added", "ID", event.Peer)
+		p.handleAddedPeer(server, event.Peer)
+		queueStop()
+	default:
+		return
+	}
+	SendDiscoverySummary(server.PeersInfo())
+}
+
+// handleAddedPeer notifies all topics about added peer.
+func (p *PeerPool) handleAddedPeer(server *p2p.Server, nodeID enode.ID) {
 	for _, t := range p.topics {
 		t.ConfirmAdded(server, nodeID)
 		if p.opts.AllowStop && t.MaxReached() {
@@ -344,8 +374,6 @@ func (p *PeerPool) allTopicsStopped() (all bool) {
 // handleDroppedPeer notifies every topic about dropped peer and returns true if any peer have connections
 // below min limit
 func (p *PeerPool) handleDroppedPeer(server *p2p.Server, nodeID enode.ID) (any bool) {
-	p.mu.Lock()
-	defer p.mu.Unlock()
 	for _, t := range p.topics {
 		confirmed := t.ConfirmDropped(server, nodeID)
 		if confirmed {
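
To make the failure mode described in the message above concrete, here is a
minimal, self-contained sketch (not part of the patch) of the shape of the
problem: a single goroutine that is both the only consumer of a buffered
channel and, while handling an event, a producer on that same channel. The
channel and event names are invented for the example and are not the peerpool
API; the real channel was buffered with size 10 and had other producers, this
just shrinks the buffer so the hang shows up immediately.

package main

import "fmt"

func main() {
	// events stands in for the subscription channel between event.Feed
	// and the peer pool loop; a buffer of 2 keeps the example short.
	events := make(chan string, 2)
	events <- "peer added"

	// The loop goroutine is the only consumer, and while handling an
	// event it also publishes back onto the same channel.
	for e := range events {
		fmt.Println("handling:", e)
		// Handling one event produces two more. Once the buffer is
		// full, this send blocks; since this goroutine is the only
		// consumer, nothing ever drains the channel again.
		events <- "confirm " + e
		events <- "summary " + e
	}
}

Run on its own, the Go runtime reports "all goroutines are asleep - deadlock!"
after a couple of iterations; in a real node other goroutines keep running, so
the loop would just hang silently instead.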
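
And here is a minimal, standalone sketch of the pattern the fix relies on: a
size-1 buffered signal channel with a non-blocking send, so a producer never
blocks when the consumer is busy or a signal is already pending. This only
illustrates the technique used by queueRetry/queueStop above; the signal and
notify names are made up for the example and do not exist in peerpool.

package main

import (
	"fmt"
	"time"
)

// signal is a buffered channel of size 1 used purely as a "something
// happened" flag: sends coalesce instead of blocking.
type signal chan struct{}

// notify performs a non-blocking send. If a signal is already pending,
// the new one is dropped, which is fine because the consumer only needs
// to know that at least one event happened.
func (s signal) notify() {
	select {
	case s <- struct{}{}:
	default:
	}
}

func main() {
	retry := make(signal, 1)

	// Producer: queue a retry after a delay without ever blocking,
	// mirroring what queueRetry does in the patch.
	go func() {
		time.Sleep(10 * time.Millisecond)
		retry.notify()
		retry.notify() // coalesced: the buffer already holds one signal
	}()

	// Consumer: the event loop drains the signal when it is ready.
	select {
	case <-retry:
		fmt.Println("retrying discovery")
	case <-time.After(time.Second):
		fmt.Println("timed out waiting for retry signal")
	}
}

In the patch this matters because handlePeerEventType now runs in its own
goroutine and can no longer assign to the loop-local retryDiscv5/stopDiscv5
variables; the non-blocking sends let it signal the loop without ever being
able to block on it.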