From 5014e4e3b0bde740d713e78433e11ad0e75b4726 Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Fri, 13 Apr 2018 11:34:30 +0300 Subject: [PATCH] Fix two race conditions in peerpool tests (#827) Multiple concurrent topic pool stops could result in closing an already-closed quit channel. Fixed by using an atomic compare-and-swap and closing only if the swap happened. Added an events feed to the peer pool for testing purposes; otherwise it is impossible to run the simulation with the -race flag enabled. In essence, the race happens because we are managing a global object, server.DiscV5, but unfortunately there is no way around it. --- geth/peers/peerpool.go | 16 +++++++++++++++- geth/peers/peerpool_test.go | 17 +++++++++++++++-- geth/peers/topicpool.go | 3 +-- 3 files changed, 31 insertions(+), 5 deletions(-) diff --git a/geth/peers/peerpool.go b/geth/peers/peerpool.go index c8ce998b9..6e3e6e043 100644 --- a/geth/peers/peerpool.go +++ b/geth/peers/peerpool.go @@ -20,6 +20,9 @@ var ( ErrDiscv5NotRunning = errors.New("Discovery v5 is not running") ) +// PoolEvent is a type used for peer pool events. +type PoolEvent string + const ( // expirationPeriod is an amount of time while peer is considered as a connectable expirationPeriod = 60 * time.Minute @@ -29,6 +32,11 @@ const ( DefaultFastSync = 3 * time.Second // DefaultSlowSync is a recommended value for slow (background) peers search. DefaultSlowSync = 30 * time.Minute + + // Discv5Closed is sent when discv5 is closed + Discv5Closed PoolEvent = "discv5.closed" + // Discv5Started is sent when discv5 is started + Discv5Started PoolEvent = "discv5.started" ) // NewPeerPool creates instance of PeerPool @@ -68,6 +76,8 @@ type PeerPool struct { quit chan struct{} wg sync.WaitGroup + + feed event.Feed } // Start creates topic pool for each topic in config and subscribes to server events. 
@@ -107,6 +117,7 @@ func (p *PeerPool) restartDiscovery(server *p2p.Server) error { } log.Debug("restarted discovery from peer pool") server.DiscV5 = ntab + p.feed.Send(Discv5Started) } for _, t := range p.topics { if !t.BelowMin() || t.SearchRunning() { @@ -146,6 +157,7 @@ func (p *PeerPool) handleServerPeers(server *p2p.Server, events <-chan *p2p.Peer log.Debug("closing discv5 connection", "server", server.Self()) server.DiscV5.Close() server.DiscV5 = nil + p.feed.Send(Discv5Closed) } } } @@ -204,8 +216,10 @@ func (p *PeerPool) Stop() { close(p.quit) } p.serverSubscription.Unsubscribe() + // wait before closing topic pools, otherwise there is a chance that + // they will be concurrently started while we are exiting. + p.wg.Wait() for _, t := range p.topics { t.StopSearch() } - p.wg.Wait() } diff --git a/geth/peers/peerpool_test.go b/geth/peers/peerpool_test.go index 37ce65792..28c21e757 100644 --- a/geth/peers/peerpool_test.go +++ b/geth/peers/peerpool_test.go @@ -81,6 +81,16 @@ func (s *PeerPoolSimulationSuite) getPeerFromEvent(events <-chan *p2p.PeerEvent, return } +func (s *PeerPoolSimulationSuite) getPoolEvent(events <-chan PoolEvent) PoolEvent { + select { + case ev := <-events: + return ev + case <-time.After(200 * time.Millisecond): + s.Fail("timed out waiting for a peer") + return "" + } +} + func (s *PeerPoolSimulationSuite) TestSingleTopicDiscoveryWithFailover() { topic := discv5.Topic("cap=test") // simulation should only rely on fast sync @@ -98,14 +108,17 @@ func (s *PeerPoolSimulationSuite) TestSingleTopicDiscoveryWithFailover() { defer subscription.Unsubscribe() s.NoError(peerPool.Start(s.peers[1])) defer peerPool.Stop() + poolEvents := make(chan PoolEvent) + poolSub := peerPool.feed.Subscribe(poolEvents) + defer poolSub.Unsubscribe() connected := s.getPeerFromEvent(events, p2p.PeerEventTypeAdd) s.Equal(s.peers[0].Self().ID, connected) - time.Sleep(100 * time.Millisecond) + s.Equal(Discv5Closed, s.getPoolEvent(poolEvents)) 
s.Require().Nil(s.peers[1].DiscV5) s.peers[0].Stop() disconnected := s.getPeerFromEvent(events, p2p.PeerEventTypeDrop) s.Equal(connected, disconnected) - time.Sleep(100 * time.Millisecond) + s.Equal(Discv5Started, s.getPoolEvent(poolEvents)) s.Require().NotNil(s.peers[1].DiscV5) register = NewRegister(topic) s.Require().NoError(register.Start(s.peers[2])) diff --git a/geth/peers/topicpool.go b/geth/peers/topicpool.go index ee22d430a..2c4ede8a4 100644 --- a/geth/peers/topicpool.go +++ b/geth/peers/topicpool.go @@ -245,7 +245,7 @@ func (t *TopicPool) removePeer(server *p2p.Server, info *peerInfo) { // StopSearch stops the closes stop func (t *TopicPool) StopSearch() { - if !t.SearchRunning() { + if !atomic.CompareAndSwapInt32(&t.running, 1, 0) { return } if t.quit == nil { @@ -259,7 +259,6 @@ func (t *TopicPool) StopSearch() { close(t.quit) } t.consumerWG.Wait() - atomic.StoreInt32(&t.running, 0) close(t.period) t.discWG.Wait() }