diff --git a/geth/peers/peerpool.go b/geth/peers/peerpool.go index c8ce998b9..6e3e6e043 100644 --- a/geth/peers/peerpool.go +++ b/geth/peers/peerpool.go @@ -20,6 +20,9 @@ var ( ErrDiscv5NotRunning = errors.New("Discovery v5 is not running") ) +// PoolEvent is a type used to for peer pool events. +type PoolEvent string + const ( // expirationPeriod is an amount of time while peer is considered as a connectable expirationPeriod = 60 * time.Minute @@ -29,6 +32,11 @@ const ( DefaultFastSync = 3 * time.Second // DefaultSlowSync is a recommended value for slow (background) peers search. DefaultSlowSync = 30 * time.Minute + + // Discv5Closed is sent when discv5 is closed + Discv5Closed PoolEvent = "discv5.closed" + // Discv5Started is sent when discv5 is started + Discv5Started PoolEvent = "discv5.started" ) // NewPeerPool creates instance of PeerPool @@ -68,6 +76,8 @@ type PeerPool struct { quit chan struct{} wg sync.WaitGroup + + feed event.Feed } // Start creates topic pool for each topic in config and subscribes to server events. @@ -107,6 +117,7 @@ func (p *PeerPool) restartDiscovery(server *p2p.Server) error { } log.Debug("restarted discovery from peer pool") server.DiscV5 = ntab + p.feed.Send(Discv5Started) } for _, t := range p.topics { if !t.BelowMin() || t.SearchRunning() { @@ -146,6 +157,7 @@ func (p *PeerPool) handleServerPeers(server *p2p.Server, events <-chan *p2p.Peer log.Debug("closing discv5 connection", "server", server.Self()) server.DiscV5.Close() server.DiscV5 = nil + p.feed.Send(Discv5Closed) } } } @@ -204,8 +216,10 @@ func (p *PeerPool) Stop() { close(p.quit) } p.serverSubscription.Unsubscribe() + // wait before closing topic pools, otherwise there is chance that + // they will be concurrently started while we are exiting. + p.wg.Wait() for _, t := range p.topics { t.StopSearch() } - p.wg.Wait() } diff --git a/geth/peers/peerpool_test.go b/geth/peers/peerpool_test.go index 37ce65792..28c21e757 100644 --- a/geth/peers/peerpool_test.go +++ b/geth/peers/peerpool_test.go @@ -81,6 +81,16 @@ func (s *PeerPoolSimulationSuite) getPeerFromEvent(events <-chan *p2p.PeerEvent, return } +func (s *PeerPoolSimulationSuite) getPoolEvent(events <-chan PoolEvent) PoolEvent { + select { + case ev := <-events: + return ev + case <-time.After(200 * time.Millisecond): + s.Fail("timed out waiting for a peer") + return "" + } +} + func (s *PeerPoolSimulationSuite) TestSingleTopicDiscoveryWithFailover() { topic := discv5.Topic("cap=test") // simulation should only rely on fast sync @@ -98,14 +108,17 @@ func (s *PeerPoolSimulationSuite) TestSingleTopicDiscoveryWithFailover() { defer subscription.Unsubscribe() s.NoError(peerPool.Start(s.peers[1])) defer peerPool.Stop() + poolEvents := make(chan PoolEvent) + poolSub := peerPool.feed.Subscribe(poolEvents) + defer poolSub.Unsubscribe() connected := s.getPeerFromEvent(events, p2p.PeerEventTypeAdd) s.Equal(s.peers[0].Self().ID, connected) - time.Sleep(100 * time.Millisecond) + s.Equal(Discv5Closed, s.getPoolEvent(poolEvents)) s.Require().Nil(s.peers[1].DiscV5) s.peers[0].Stop() disconnected := s.getPeerFromEvent(events, p2p.PeerEventTypeDrop) s.Equal(connected, disconnected) - time.Sleep(100 * time.Millisecond) + s.Equal(Discv5Started, s.getPoolEvent(poolEvents)) s.Require().NotNil(s.peers[1].DiscV5) register = NewRegister(topic) s.Require().NoError(register.Start(s.peers[2])) diff --git a/geth/peers/topicpool.go b/geth/peers/topicpool.go index ee22d430a..2c4ede8a4 100644 --- a/geth/peers/topicpool.go +++ b/geth/peers/topicpool.go @@ -245,7 +245,7 @@ func (t *TopicPool) removePeer(server *p2p.Server, info *peerInfo) { // StopSearch stops the closes stop func (t *TopicPool) StopSearch() { - if !t.SearchRunning() { + if !atomic.CompareAndSwapInt32(&t.running, 1, 0) { return } if t.quit == nil { @@ -259,7 +259,6 @@ func (t *TopicPool) StopSearch() { close(t.quit) } t.consumerWG.Wait() - atomic.StoreInt32(&t.running, 0) close(t.period) t.discWG.Wait() }