Fix two race conditions in peerpool tests (#827)

Multiple concurrent topic pool stops could result in closing an already-closed
quit channel. Fixed by using an atomic compare-and-swap and closing the channel
only if the swap succeeded.
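
Below is a minimal sketch of the close-once pattern, for illustration only; the `pool` type and `stop` method are hypothetical stand-ins for TopicPool and StopSearch:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// pool is a hypothetical stand-in for TopicPool: running is flipped with a
// compare-and-swap so that only one caller ever closes quit.
type pool struct {
	running int32
	quit    chan struct{}
}

// stop is safe to call from any number of goroutines concurrently: the CAS
// succeeds exactly once, so quit is closed exactly once.
func (p *pool) stop() {
	if !atomic.CompareAndSwapInt32(&p.running, 1, 0) {
		return // already stopped (or never started)
	}
	close(p.quit)
}

func main() {
	p := &pool{running: 1, quit: make(chan struct{})}
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			p.stop() // without the CAS this could panic: close of closed channel
		}()
	}
	wg.Wait()
	fmt.Println("stopped once, no panic")
}
```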

Added an events feed to the peer pool for test purposes. Without it, it is
impossible to run the simulation with the -race flag enabled. In essence this is
because we are managing a global object, server.DiscV5, and unfortunately there
is no way around it.
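
As an illustration only, here is a minimal sketch of how a test can wait on the feed instead of sleeping and reading shared state; it assumes go-ethereum's event.Feed (the type the pool embeds) and a hypothetical standalone main rather than the suite's helpers:

```go
package main

import (
	"fmt"
	"time"

	"github.com/ethereum/go-ethereum/event"
)

// PoolEvent mirrors the type added to the peer pool in this change.
type PoolEvent string

func main() {
	var feed event.Feed

	// A test subscribes to the feed instead of sleeping and then inspecting
	// server.DiscV5 directly, which would race with the pool's goroutines.
	events := make(chan PoolEvent)
	sub := feed.Subscribe(events)
	defer sub.Unsubscribe()

	// The pool would send these from its own goroutine, e.g. after closing
	// or restarting discv5.
	go feed.Send(PoolEvent("discv5.closed"))

	select {
	case ev := <-events:
		fmt.Println("got pool event:", ev)
	case <-time.After(time.Second):
		fmt.Println("timed out waiting for pool event")
	}
}
```
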
Dmitry Shulyak 2018-04-13 11:34:30 +03:00 committed by GitHub
parent 0d652c3851
commit 5014e4e3b0
3 changed files with 31 additions and 5 deletions


@@ -20,6 +20,9 @@ var (
ErrDiscv5NotRunning = errors.New("Discovery v5 is not running")
)
// PoolEvent is a type used for peer pool events.
type PoolEvent string
const (
// expirationPeriod is the amount of time a peer is considered connectable
expirationPeriod = 60 * time.Minute
@@ -29,6 +32,11 @@ const (
DefaultFastSync = 3 * time.Second
// DefaultSlowSync is a recommended value for slow (background) peers search.
DefaultSlowSync = 30 * time.Minute
// Discv5Closed is sent when discv5 is closed
Discv5Closed PoolEvent = "discv5.closed"
// Discv5Started is sent when discv5 is started
Discv5Started PoolEvent = "discv5.started"
)
// NewPeerPool creates instance of PeerPool
@@ -68,6 +76,8 @@ type PeerPool struct {
quit chan struct{}
wg sync.WaitGroup
feed event.Feed
}
// Start creates topic pool for each topic in config and subscribes to server events.
@@ -107,6 +117,7 @@ func (p *PeerPool) restartDiscovery(server *p2p.Server) error {
}
log.Debug("restarted discovery from peer pool")
server.DiscV5 = ntab
p.feed.Send(Discv5Started)
}
for _, t := range p.topics {
if !t.BelowMin() || t.SearchRunning() {
@@ -146,6 +157,7 @@ func (p *PeerPool) handleServerPeers(server *p2p.Server, events <-chan *p2p.Peer
log.Debug("closing discv5 connection", "server", server.Self())
server.DiscV5.Close()
server.DiscV5 = nil
p.feed.Send(Discv5Closed)
}
}
}
@@ -204,8 +216,10 @@ func (p *PeerPool) Stop() {
close(p.quit)
}
p.serverSubscription.Unsubscribe()
// wait before closing topic pools, otherwise there is a chance that
// they will be concurrently started while we are exiting.
p.wg.Wait()
for _, t := range p.topics {
t.StopSearch()
}
p.wg.Wait()
}


@@ -81,6 +81,16 @@ func (s *PeerPoolSimulationSuite) getPeerFromEvent(events <-chan *p2p.PeerEvent,
return
}
func (s *PeerPoolSimulationSuite) getPoolEvent(events <-chan PoolEvent) PoolEvent {
select {
case ev := <-events:
return ev
case <-time.After(200 * time.Millisecond):
s.Fail("timed out waiting for a peer")
return ""
}
}
func (s *PeerPoolSimulationSuite) TestSingleTopicDiscoveryWithFailover() {
topic := discv5.Topic("cap=test")
// simulation should only rely on fast sync
@@ -98,14 +108,17 @@ func (s *PeerPoolSimulationSuite) TestSingleTopicDiscoveryWithFailover() {
defer subscription.Unsubscribe()
s.NoError(peerPool.Start(s.peers[1]))
defer peerPool.Stop()
poolEvents := make(chan PoolEvent)
poolSub := peerPool.feed.Subscribe(poolEvents)
defer poolSub.Unsubscribe()
connected := s.getPeerFromEvent(events, p2p.PeerEventTypeAdd)
s.Equal(s.peers[0].Self().ID, connected)
time.Sleep(100 * time.Millisecond)
s.Equal(Discv5Closed, s.getPoolEvent(poolEvents))
s.Require().Nil(s.peers[1].DiscV5)
s.peers[0].Stop()
disconnected := s.getPeerFromEvent(events, p2p.PeerEventTypeDrop)
s.Equal(connected, disconnected)
time.Sleep(100 * time.Millisecond)
s.Equal(Discv5Started, s.getPoolEvent(poolEvents))
s.Require().NotNil(s.peers[1].DiscV5)
register = NewRegister(topic)
s.Require().NoError(register.Start(s.peers[2]))


@@ -245,7 +245,7 @@ func (t *TopicPool) removePeer(server *p2p.Server, info *peerInfo) {
// StopSearch stops the peers search and closes the quit channel
func (t *TopicPool) StopSearch() {
if !t.SearchRunning() {
if !atomic.CompareAndSwapInt32(&t.running, 1, 0) {
return
}
if t.quit == nil {
@@ -259,7 +259,6 @@ func (t *TopicPool) StopSearch() {
close(t.quit)
}
t.consumerWG.Wait()
atomic.StoreInt32(&t.running, 0)
close(t.period)
t.discWG.Wait()
}