Fix flaky TestSingleTopicDiscoveryWithFailover and others (#992)

Other changes:
* needed to patch the refresh loop in the Discovery V5 implementation vendored from go-ethereum (a sketch of the pattern follows; the full diff is below),
* fixed TestStatusNodeReconnectStaticPeers,
* fixed TestBackendAccountsConcurrently.
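
For context, here is a minimal, self-contained sketch of the debounce pattern the go-ethereum patch applies (the real diff is in the first file below). The refresh function is an illustrative stand-in for discv5's net.refresh; only the channel choreography mirrors the actual change:

// Sketch only: refresh stands in for discv5's net.refresh.
package main

import (
    "fmt"
    "time"
)

func refresh(done chan<- struct{}) {
    // Stand-in for net.refresh: do the lookup work, then signal completion.
    go func() {
        time.Sleep(10 * time.Millisecond)
        close(done)
    }()
}

func main() {
    for i := 0; i < 3; i++ {
        refreshDone := make(chan struct{})

        // Wait for the refresh itself to finish first...
        done := make(chan struct{})
        refresh(done)
        <-done

        // ...then delay the signal that re-arms the loop, so an empty
        // seed set cannot trigger refreshes at full speed.
        go func() {
            time.Sleep(100 * time.Millisecond)
            close(refreshDone)
        }()

        <-refreshDone
        fmt.Println("refresh cycle", i, "done")
    }
}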
Adam Babik 2018-05-26 09:37:13 +02:00 committed by GitHub
parent bc14e6faee
commit b5f05b1ab5
7 changed files with 155 additions and 58 deletions


@@ -0,0 +1,24 @@
+diff --git a/p2p/discv5/net.go b/p2p/discv5/net.go
+index d0eae28f..25e75d50 100644
+--- a/p2p/discv5/net.go
++++ b/p2p/discv5/net.go
+@@ -641,7 +641,18 @@ loop:
+ 			}()
+ 		} else {
+ 			refreshDone = make(chan struct{})
+-			net.refresh(refreshDone)
++
++			done := make(chan struct{})
++			net.refresh(done)
++			<-done
++
++			// Refresh again only if there are no seeds.
++			// Also, sleep for some time to prevent it from
++			// executing too often.
++			go func() {
++				time.Sleep(time.Millisecond * 100)
++				close(refreshDone)
++			}()
+ 		}
+ 	}
+ }


@@ -354,6 +354,9 @@ func (b *StatusBackend) AppStateChange(state string) {
 // Logout clears whisper identities.
 func (b *StatusBackend) Logout() error {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
 	// FIXME(oleg-raev): This method doesn't make stop, it rather resets its cells to an initial state
 	// and should be properly renamed, for example: ResetCells
 	b.jailManager.Stop()
@@ -398,6 +401,9 @@ func (b *StatusBackend) ReSelectAccount() error {
 // using provided password. Once verification is done, decrypted key is injected into Whisper (as a single identity,
 // all previous identities are removed).
 func (b *StatusBackend) SelectAccount(address, password string) error {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
 	// FIXME(oleg-raev): This method doesn't make stop, it rather resets its cells to an initial state
 	// and should be properly renamed, for example: ResetCells
 	b.jailManager.Stop()


@@ -238,7 +238,6 @@ func TestStatusNodeReconnectStaticPeers(t *testing.T) {
 	require.NoError(t, n.ReconnectStaticPeers())
 	// first check if a peer gets disconnected
 	require.NoError(t, <-errCh)
-	require.Equal(t, 0, n.PeerCount())
 	// it takes at least 30 seconds to bring back previously connected peer
 	errCh = waitForPeerAsync(n, peerURL, p2p.PeerEventTypeAdd, time.Second*60)
 	require.NoError(t, <-errCh)


@@ -113,6 +113,15 @@ func (p *PeerPool) Start(server *p2p.Server) error {
 	p.quit = make(chan struct{})
 	p.setDiscoveryTimeout()
 
+	// subscribe to peer events
+	p.events = make(chan *p2p.PeerEvent, 20)
+	p.serverSubscription = server.SubscribeEvents(p.events)
+	p.wg.Add(1)
+	go func() {
+		p.handleServerPeers(server, p.events)
+		p.wg.Done()
+	}()
+
 	// collect topics and start searching for nodes
 	p.topics = make([]*TopicPool, 0, len(p.config))
 	for topic, limits := range p.config {
@@ -126,15 +135,6 @@ func (p *PeerPool) Start(server *p2p.Server) error {
 	// discovery must be already started when pool is started
 	signal.SendDiscoveryStarted()
 
-	// subscribe to peer events
-	p.events = make(chan *p2p.PeerEvent, 20)
-	p.serverSubscription = server.SubscribeEvents(p.events)
-	p.wg.Add(1)
-	go func() {
-		p.handleServerPeers(server, p.events)
-		p.wg.Done()
-	}()
-
 	return nil
 }
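
The two hunks above move the peer-event subscription in PeerPool.Start ahead of the topic search. A standalone sketch of the ordering bug this prevents (none of the types below are status-go APIs; they only model a subscription registry): peer events emitted before the subscription exists are simply dropped.

package main

import "fmt"

type event struct{ name string }

// server models the subscription side of p2p.Server for this sketch only.
type server struct{ subs []chan event }

func (s *server) subscribe(ch chan event) { s.subs = append(s.subs, ch) }

func (s *server) emit(name string) {
    for _, ch := range s.subs {
        select {
        case ch <- event{name}: // delivered to a live subscriber
        default: // dropped when nobody can receive
        }
    }
}

func main() {
    srv := &server{}
    events := make(chan event, 20)

    srv.emit("peer added") // lost: discovery ran before anyone subscribed
    srv.subscribe(events)
    srv.emit("peer added") // delivered: the subscription was set up first

    close(events)
    for ev := range events {
        fmt.Println("got:", ev.name) // prints exactly once
    }
}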


@@ -19,6 +19,9 @@ import (
 	"github.com/status-im/status-go/geth/params"
 	"github.com/status-im/status-go/signal"
+
+	// to access logs in the test with `-log` flag
+	_ "github.com/status-im/status-go/t/utils"
 )
 
 type PeerPoolSimulationSuite struct {
@@ -26,14 +29,22 @@ type PeerPoolSimulationSuite struct {
 	bootnode *p2p.Server
 	peers    []*p2p.Server
+	port     uint16
 }
 
 func TestPeerPoolSimulationSuite(t *testing.T) {
-	suite.Run(t, new(PeerPoolSimulationSuite))
+	s := new(PeerPoolSimulationSuite)
+	s.port = 33731
+	suite.Run(t, s)
+}
+
+func (s *PeerPoolSimulationSuite) nextPort() uint16 {
+	s.port++
+	return s.port
 }
 
 func (s *PeerPoolSimulationSuite) SetupTest() {
-	port := 33731
+	bootnodePort := s.nextPort()
 	key, _ := crypto.GenerateKey()
 	name := common.MakeName("bootnode", "1.0")
 	// 127.0.0.1 is invalidated by discovery v5
@@ -41,15 +52,14 @@ func (s *PeerPoolSimulationSuite) SetupTest() {
 		Config: p2p.Config{
 			MaxPeers:    10,
 			Name:        name,
-			ListenAddr:  fmt.Sprintf("0.0.0.0:%d", 33731),
+			ListenAddr:  fmt.Sprintf("0.0.0.0:%d", bootnodePort),
 			PrivateKey:  key,
 			DiscoveryV5: true,
 			NoDiscovery: true,
 		},
 	}
-	port++
 	s.Require().NoError(s.bootnode.Start())
-	bootnodeV5 := discv5.NewNode(s.bootnode.DiscV5.Self().ID, net.ParseIP("127.0.0.1"), uint16(port), uint16(port))
+	bootnodeV5 := discv5.NewNode(s.bootnode.DiscV5.Self().ID, net.ParseIP("127.0.0.1"), bootnodePort, bootnodePort)
 
 	// 1 peer to initiate connection, 1 peer as a first candidate, 1 peer - for failover
 	s.peers = make([]*p2p.Server, 3)
@@ -60,7 +70,7 @@ func (s *PeerPoolSimulationSuite) SetupTest() {
 			Config: p2p.Config{
 				MaxPeers:    10,
 				Name:        common.MakeName("peer-"+strconv.Itoa(i), "1.0"),
-				ListenAddr:  fmt.Sprintf("0.0.0.0:%d", port),
+				ListenAddr:  fmt.Sprintf("0.0.0.0:%d", s.nextPort()),
 				PrivateKey:  key,
 				DiscoveryV5: true,
 				NoDiscovery: true,
@@ -68,7 +78,6 @@ func (s *PeerPoolSimulationSuite) SetupTest() {
 				Protocols:   whisper.Protocols(),
 			},
 		}
-		port++
 		s.NoError(peer.Start())
 		s.peers[i] = peer
 	}
@@ -87,7 +96,8 @@ func (s *PeerPoolSimulationSuite) getPeerFromEvent(events <-chan *p2p.PeerEvent,
 		if ev.Type == etype {
 			return ev.Peer
 		}
-	case <-time.After(5 * time.Second):
+		s.Failf("invalid event", "expected %s but got %s for peer %s", etype, ev.Type, ev.Peer)
+	case <-time.After(10 * time.Second):
 		s.Fail("timed out waiting for a peer")
 		return
 	}
@@ -98,25 +108,51 @@ func (s *PeerPoolSimulationSuite) getPoolEvent(events <-chan string) string {
 	select {
 	case ev := <-events:
 		return ev
-	case <-time.After(time.Second):
-		s.FailNow("timed out waiting for a peer")
+	case <-time.After(10 * time.Second):
+		s.FailNow("timed out waiting a pool event")
 		return ""
 	}
 }
 
+func (s *PeerPoolSimulationSuite) TestPeerPoolCache() {
+	var err error
+
+	topic := discv5.Topic("cap=test")
+	config := map[discv5.Topic]params.Limits{
+		topic: params.NewLimits(1, 1),
+	}
+	peerPoolOpts := &Options{100 * time.Millisecond, 100 * time.Millisecond, 0, true}
+	cache, err := newInMemoryCache()
+	s.Require().NoError(err)
+	peerPool := NewPeerPool(config, cache, peerPoolOpts)
+
+	// start peer pool
+	s.Require().NoError(peerPool.Start(s.peers[1]))
+	defer peerPool.Stop()
+
+	// check if cache is passed to topic pools
+	for _, topicPool := range peerPool.topics {
+		s.Equal(cache, topicPool.cache)
+	}
+}
+
 func (s *PeerPoolSimulationSuite) TestSingleTopicDiscoveryWithFailover() {
-	poolEvents := make(chan string, 1)
-	summaries := make(chan []*p2p.PeerInfo, 1)
+	var err error
+
+	// Buffered channels must be used because we expect the events
+	// to be in the same order. Use a buffer length greater than
+	// the expected number of events to avoid deadlock.
+	poolEvents := make(chan string, 10)
+	summaries := make(chan []*p2p.PeerInfo, 10)
 	signal.SetDefaultNodeNotificationHandler(func(jsonEvent string) {
 		var envelope struct {
 			Type  string
 			Event json.RawMessage
 		}
 		s.NoError(json.Unmarshal([]byte(jsonEvent), &envelope))
-		switch envelope.Type {
-		case signal.EventDiscoveryStarted:
-			poolEvents <- envelope.Type
-		case signal.EventDiscoveryStopped:
+
+		switch typ := envelope.Type; typ {
+		case signal.EventDiscoveryStarted, signal.EventDiscoveryStopped:
 			poolEvents <- envelope.Type
 		case signal.EventDiscoverySummary:
 			poolEvents <- envelope.Type
@@ -124,9 +160,9 @@ func (s *PeerPoolSimulationSuite) TestSingleTopicDiscoveryWithFailover() {
 			s.NoError(json.Unmarshal(envelope.Event, &summary))
 			summaries <- summary
 		}
-
 	})
 	defer signal.ResetDefaultNodeNotificationHandler()
+
 	topic := discv5.Topic("cap=test")
 	// simulation should only rely on fast sync
 	config := map[discv5.Topic]params.Limits{
@@ -136,50 +172,48 @@ func (s *PeerPoolSimulationSuite) TestSingleTopicDiscoveryWithFailover() {
 	cache, err := newInMemoryCache()
 	s.Require().NoError(err)
 	peerPool := NewPeerPool(config, cache, peerPoolOpts)
+
+	// create and start topic registry
 	register := NewRegister(topic)
-	s.Require().NoError(register.Start(s.peers[0]))
-	// need to wait for topic to get registered, discv5 can query same node
-	// for a topic only once a minute
+	err = register.Start(s.peers[0])
+	s.Require().NoError(err)
+
+	// subscribe for peer events before starting the peer pool
 	events := make(chan *p2p.PeerEvent, 20)
 	subscription := s.peers[1].SubscribeEvents(events)
 	defer subscription.Unsubscribe()
-	s.NoError(peerPool.Start(s.peers[1]))
+
+	// start the peer pool
+	s.Require().NoError(peerPool.Start(s.peers[1]))
 	defer peerPool.Stop()
 	s.Equal(signal.EventDiscoveryStarted, s.getPoolEvent(poolEvents))
-	connected := s.getPeerFromEvent(events, p2p.PeerEventTypeAdd)
-	s.Equal(s.peers[0].Self().ID, connected)
+	connectedPeer := s.getPeerFromEvent(events, p2p.PeerEventTypeAdd)
+	s.Equal(s.peers[0].Self().ID, connectedPeer)
+
+	// as the upper limit was reached, Discovery should be stopped
 	s.Equal(signal.EventDiscoveryStopped, s.getPoolEvent(poolEvents))
-	s.Require().Nil(s.peers[1].DiscV5)
-	s.Require().Equal(signal.EventDiscoverySummary, s.getPoolEvent(poolEvents))
-	summary := <-summaries
-	s.Len(summary, 1)
+	s.Equal(signal.EventDiscoverySummary, s.getPoolEvent(poolEvents))
+	s.Len(<-summaries, 1)
+
+	// stop topic register and the connected peer
 	register.Stop()
 	s.peers[0].Stop()
-	disconnected := s.getPeerFromEvent(events, p2p.PeerEventTypeDrop)
-	s.Equal(connected, disconnected)
-	s.Require().Equal(signal.EventDiscoverySummary, s.getPoolEvent(poolEvents))
-	summary = <-summaries
-	s.Len(summary, 0)
+	disconnectedPeer := s.getPeerFromEvent(events, p2p.PeerEventTypeDrop)
+	s.Equal(connectedPeer, disconnectedPeer)
+	s.Equal(signal.EventDiscoverySummary, s.getPoolEvent(poolEvents))
+	s.Len(<-summaries, 0)
+
+	// Discovery should be restarted because the number of peers dropped
+	// below the lower limit.
 	s.Equal(signal.EventDiscoveryStarted, s.getPoolEvent(poolEvents))
-	s.Require().NotNil(s.peers[1].DiscV5)
-	s.Require().NoError(register.Start(s.peers[2]))
+
+	// register the second peer
+	err = register.Start(s.peers[2])
+	s.Require().NoError(err)
 	defer register.Stop()
 	s.Equal(s.peers[2].Self().ID, s.getPeerFromEvent(events, p2p.PeerEventTypeAdd))
+
+	// Discovery can be stopped again.
 	s.Equal(signal.EventDiscoveryStopped, s.getPoolEvent(poolEvents))
 	s.Require().Equal(signal.EventDiscoverySummary, s.getPoolEvent(poolEvents))
-	summary = <-summaries
-	s.Len(summary, 1)
+	s.Len(<-summaries, 1)
+
+	// verify that we are actually using cache
+	cachedPeers := peerPool.cache.GetPeersRange(topic, 1)
+	s.Len(cachedPeers, 1)
+	s.Equal(s.peers[2].Self().ID, discover.NodeID(cachedPeers[0].ID))
 }
@@ -189,7 +223,7 @@ func (s *PeerPoolSimulationSuite) TestSingleTopicDiscoveryWithFailover() {
 // TestPeerPoolMaxPeersOverflow verifies that following scenario will not occur:
 // - process peer B
 // - panic because discv5 is nil!!!
 func TestPeerPoolMaxPeersOverflow(t *testing.T) {
-	signals := make(chan string, 1)
+	signals := make(chan string, 10)
 	signal.SetDefaultNodeNotificationHandler(func(jsonEvent string) {
 		var envelope struct {
 			Type string
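
As the comment in the failover test notes, the notification handler runs on the node's goroutine, so poolEvents and summaries need buffers larger than the expected number of events or a send could block the node. A standalone sketch of the deadlock this avoids (event names are illustrative strings, not the real signal constants):

package main

import "fmt"

func main() {
    producerDone := make(chan struct{})

    // Buffer sized above the expected number of events: sends never block.
    poolEvents := make(chan string, 10)

    // Stand-in for the node goroutine that invokes the notification handler.
    go func() {
        defer close(producerDone)
        for _, ev := range []string{"discovery.started", "discovery.summary", "discovery.stopped"} {
            poolEvents <- ev // with an unbuffered channel this could block forever
        }
    }()

    // The test may start consuming long after the events were produced.
    <-producerDone
    for i := 0; i < 3; i++ {
        fmt.Println("event:", <-poolEvents)
    }
}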


@@ -63,6 +63,29 @@ func (s *TopicPoolSuite) AssertConsumed(channel <-chan time.Duration, expected t
 	}
 }
 
+func (s *TopicPoolSuite) TestUsingCache() {
+	s.topicPool.limits = params.NewLimits(1, 1)
+
+	peer1 := discv5.NewNode(discv5.NodeID{1}, s.peer.Self().IP, 32311, 32311)
+	s.topicPool.processFoundNode(s.peer, peer1)
+	s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer1.ID))
+	s.Equal([]*discv5.Node{peer1}, s.topicPool.cache.GetPeersRange(s.topicPool.topic, 10))
+
+	// Add a new peer which exceeds the upper limit.
+	// It should still be added to the cache and
+	// not removed when dropped.
+	peer2 := discv5.NewNode(discv5.NodeID{2}, s.peer.Self().IP, 32311, 32311)
+	s.topicPool.processFoundNode(s.peer, peer2)
+	s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer2.ID))
+	s.Equal([]*discv5.Node{peer1, peer2}, s.topicPool.cache.GetPeersRange(s.topicPool.topic, 10))
+	s.topicPool.ConfirmDropped(s.peer, discover.NodeID(peer2.ID))
+	s.Equal([]*discv5.Node{peer1, peer2}, s.topicPool.cache.GetPeersRange(s.topicPool.topic, 10))
+
+	// A peer that drops by itself should be removed from the cache.
+	s.topicPool.ConfirmDropped(s.peer, discover.NodeID(peer1.ID))
+	s.Equal([]*discv5.Node{peer2}, s.topicPool.cache.GetPeersRange(s.topicPool.topic, 10))
+}
+
 func (s *TopicPoolSuite) TestSyncSwitches() {
 	testPeer := discv5.NewNode(discv5.NodeID{1}, s.peer.Self().IP, 32311, 32311)
 	s.topicPool.processFoundNode(s.peer, testPeer)


@@ -641,7 +641,18 @@ loop:
 			}()
 		} else {
 			refreshDone = make(chan struct{})
-			net.refresh(refreshDone)
+
+			done := make(chan struct{})
+			net.refresh(done)
+			<-done
+
+			// Refresh again only if there are no seeds.
+			// Also, sleep for some time to prevent it from
+			// executing too often.
+			go func() {
+				time.Sleep(time.Millisecond * 100)
+				close(refreshDone)
+			}()
 		}
 	}
 }