Prevent Discovery from timeout when registering topics (#944)

When the node registers topics, Discovery V5 should never be stopped as otherwise, no peer will be able to find the node.
This commit is contained in:
Adam Babik 2018-05-15 11:16:25 +02:00 committed by GitHub
parent 52cdcf8f0f
commit 9268f55d7d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 91 additions and 36 deletions

View File

@ -165,13 +165,13 @@ func (n *StatusNode) setupRPCClient() (err error) {
func (n *StatusNode) startPeerPool() error { func (n *StatusNode) startPeerPool() error {
n.register = peers.NewRegister(n.config.RegisterTopics...) n.register = peers.NewRegister(n.config.RegisterTopics...)
options := peers.NewDefaultOptions()
// TODO(dshulyak) consider adding a flag to define this behaviour // TODO(dshulyak) consider adding a flag to define this behaviour
stopOnMax := len(n.config.RegisterTopics) == 0 options.AllowStop = len(n.config.RegisterTopics) == 0
n.peerPool = peers.NewPeerPool(n.config.RequireTopics, n.peerPool = peers.NewPeerPool(
peers.DefaultFastSync, n.config.RequireTopics,
peers.DefaultSlowSync,
peers.NewCache(n.db), peers.NewCache(n.db),
stopOnMax, options,
) )
if err := n.register.Start(n.gethNode.Server()); err != nil { if err := n.register.Start(n.gethNode.Server()); err != nil {
return err return err

View File

@ -39,15 +39,23 @@ const (
DefaultTopicFastModeTimeout = 30 * time.Second DefaultTopicFastModeTimeout = 30 * time.Second
) )
// NewPeerPool creates instance of PeerPool // Options is a struct with PeerPool configuration.
func NewPeerPool(config map[discv5.Topic]params.Limits, fastSync, slowSync time.Duration, cache *Cache, stopOnMax bool) *PeerPool { type Options struct {
return &PeerPool{ FastSync time.Duration
config: config, SlowSync time.Duration
fastSync: fastSync, // After this time, Discovery is stopped even if max peers is not reached.
slowSync: slowSync, DiscServerTimeout time.Duration
discServerTimeout: DefaultDiscV5Timeout, // AllowStop allows stopping Discovery when reaching max peers or after timeout.
cache: cache, AllowStop bool
stopOnMax: stopOnMax, }
// NewDefaultOptions return a struct with default Options.
func NewDefaultOptions() *Options {
return &Options{
FastSync: DefaultFastSync,
SlowSync: DefaultSlowSync,
DiscServerTimeout: DefaultDiscV5Timeout,
AllowStop: false,
} }
} }
@ -62,13 +70,11 @@ type peerInfo struct {
// PeerPool manages discovered peers and connects them to p2p server // PeerPool manages discovered peers and connects them to p2p server
type PeerPool struct { type PeerPool struct {
opts *Options
// config can be set only once per pool life cycle // config can be set only once per pool life cycle
config map[discv5.Topic]params.Limits config map[discv5.Topic]params.Limits
fastSync time.Duration
slowSync time.Duration
discServerTimeout time.Duration
cache *Cache cache *Cache
stopOnMax bool
mu sync.RWMutex mu sync.RWMutex
topics []*TopicPool topics []*TopicPool
@ -79,26 +85,48 @@ type PeerPool struct {
timeout <-chan time.Time timeout <-chan time.Time
} }
// NewPeerPool creates instance of PeerPool
func NewPeerPool(config map[discv5.Topic]params.Limits, cache *Cache, options *Options) *PeerPool {
return &PeerPool{
opts: options,
config: config,
cache: cache,
}
}
func (p *PeerPool) setDiscoveryTimeout() {
if p.opts.AllowStop && p.opts.DiscServerTimeout > 0 {
p.timeout = time.After(p.opts.DiscServerTimeout)
}
}
// Start creates topic pool for each topic in config and subscribes to server events. // Start creates topic pool for each topic in config and subscribes to server events.
func (p *PeerPool) Start(server *p2p.Server) error { func (p *PeerPool) Start(server *p2p.Server) error {
if server.DiscV5 == nil { if server.DiscV5 == nil {
return ErrDiscv5NotRunning return ErrDiscv5NotRunning
} }
p.mu.Lock() p.mu.Lock()
defer p.mu.Unlock() defer p.mu.Unlock()
// init channels
p.quit = make(chan struct{}) p.quit = make(chan struct{})
p.timeout = time.After(p.discServerTimeout) p.setDiscoveryTimeout()
// collect topics and start searching for nodes
p.topics = make([]*TopicPool, 0, len(p.config)) p.topics = make([]*TopicPool, 0, len(p.config))
for topic, limits := range p.config { for topic, limits := range p.config {
topicPool := NewTopicPool(topic, limits, p.slowSync, p.fastSync) topicPool := NewTopicPool(topic, limits, p.opts.SlowSync, p.opts.FastSync)
if err := topicPool.StartSearch(server); err != nil { if err := topicPool.StartSearch(server); err != nil {
return err return err
} }
p.topics = append(p.topics, topicPool) p.topics = append(p.topics, topicPool)
} }
signal.SendDiscoveryStarted() // discovery must be already started when pool is started // discovery must be already started when pool is started
signal.SendDiscoveryStarted()
// subscribe to peer events
p.events = make(chan *p2p.PeerEvent, 20) p.events = make(chan *p2p.PeerEvent, 20)
p.serverSubscription = server.SubscribeEvents(p.events) p.serverSubscription = server.SubscribeEvents(p.events)
p.wg.Add(1) p.wg.Add(1)
@ -106,6 +134,7 @@ func (p *PeerPool) Start(server *p2p.Server) error {
p.handleServerPeers(server, p.events) p.handleServerPeers(server, p.events)
p.wg.Done() p.wg.Done()
}() }()
return nil return nil
} }
@ -118,9 +147,10 @@ func (p *PeerPool) startDiscovery(server *p2p.Server) error {
if err != nil { if err != nil {
return err return err
} }
p.mu.Lock() p.mu.Lock()
server.DiscV5 = ntab server.DiscV5 = ntab
p.timeout = time.After(p.discServerTimeout) p.setDiscoveryTimeout()
p.mu.Unlock() p.mu.Unlock()
signal.SendDiscoveryStarted() signal.SendDiscoveryStarted()
@ -185,10 +215,8 @@ func (p *PeerPool) handleServerPeers(server *p2p.Server, events <-chan *p2p.Peer
p.stopDiscovery(server) p.stopDiscovery(server)
return return
case <-timeout: case <-timeout:
if p.stopOnMax { log.Info("DiscV5 timed out", "server", server.Self())
log.Debug("DiscV5 timed out", "server", server.Self())
p.stopDiscovery(server) p.stopDiscovery(server)
}
case <-retryDiscv5: case <-retryDiscv5:
if err := p.restartDiscovery(server); err != nil { if err := p.restartDiscovery(server); err != nil {
retryDiscv5 = time.After(discoveryRestartTimeout) retryDiscv5 = time.After(discoveryRestartTimeout)
@ -203,7 +231,7 @@ func (p *PeerPool) handleServerPeers(server *p2p.Server, events <-chan *p2p.Peer
} }
case p2p.PeerEventTypeAdd: case p2p.PeerEventTypeAdd:
log.Debug("confirm peer added", "ID", event.Peer) log.Debug("confirm peer added", "ID", event.Peer)
if p.stopOnMax && p.handleAddedPeer(server, event.Peer) { if p.opts.AllowStop && p.handleAddedPeer(server, event.Peer) {
log.Debug("closing discv5 connection because all topics reached max limit", "server", server.Self()) log.Debug("closing discv5 connection because all topics reached max limit", "server", server.Self())
p.stopDiscovery(server) p.stopDiscovery(server)
} }
@ -222,7 +250,7 @@ func (p *PeerPool) handleAddedPeer(server *p2p.Server, nodeID discover.NodeID) (
all = true all = true
for _, t := range p.topics { for _, t := range p.topics {
t.ConfirmAdded(server, nodeID) t.ConfirmAdded(server, nodeID)
if p.stopOnMax && t.MaxReached() { if p.opts.AllowStop && t.MaxReached() {
t.StopSearch() t.StopSearch()
} else { } else {
all = false all = false

View File

@ -132,7 +132,8 @@ func (s *PeerPoolSimulationSuite) TestSingleTopicDiscoveryWithFailover() {
config := map[discv5.Topic]params.Limits{ config := map[discv5.Topic]params.Limits{
topic: params.NewLimits(1, 1), // limits are chosen for simplicity of the simulation topic: params.NewLimits(1, 1), // limits are chosen for simplicity of the simulation
} }
peerPool := NewPeerPool(config, 100*time.Millisecond, 100*time.Millisecond, nil, true) peerPoolOpts := &Options{100 * time.Millisecond, 100 * time.Millisecond, 0, true}
peerPool := NewPeerPool(config, nil, peerPoolOpts)
register := NewRegister(topic) register := NewRegister(topic)
s.Require().NoError(register.Start(s.peers[0])) s.Require().NoError(register.Start(s.peers[0]))
// need to wait for topic to get registered, discv5 can query same node // need to wait for topic to get registered, discv5 can query same node
@ -208,7 +209,8 @@ func TestPeerPoolMaxPeersOverflow(t *testing.T) {
defer peer.Stop() defer peer.Stop()
require.NotNil(t, peer.DiscV5) require.NotNil(t, peer.DiscV5)
pool := NewPeerPool(nil, DefaultFastSync, DefaultSlowSync, nil, true) poolOpts := &Options{DefaultFastSync, DefaultSlowSync, 0, true}
pool := NewPeerPool(nil, nil, poolOpts)
require.NoError(t, pool.Start(peer)) require.NoError(t, pool.Start(peer))
require.Equal(t, signal.EventDiscoveryStarted, <-signals) require.Equal(t, signal.EventDiscoveryStarted, <-signals)
// without config, it will stop the discovery because all topic pools are satisfied // without config, it will stop the discovery because all topic pools are satisfied
@ -256,8 +258,8 @@ func TestPeerPoolDiscV5Timeout(t *testing.T) {
require.NotNil(t, server.DiscV5) require.NotNil(t, server.DiscV5)
// start PeerPool // start PeerPool
pool := NewPeerPool(nil, DefaultFastSync, DefaultSlowSync, nil, true) poolOpts := &Options{DefaultFastSync, DefaultSlowSync, time.Millisecond * 100, true}
pool.discServerTimeout = time.Millisecond * 100 pool := NewPeerPool(nil, nil, poolOpts)
require.NoError(t, pool.Start(server)) require.NoError(t, pool.Start(server))
require.Equal(t, signal.EventDiscoveryStarted, <-signals) require.Equal(t, signal.EventDiscoveryStarted, <-signals)
@ -265,7 +267,7 @@ func TestPeerPoolDiscV5Timeout(t *testing.T) {
select { select {
case sig := <-signals: case sig := <-signals:
require.Equal(t, signal.EventDiscoveryStopped, sig) require.Equal(t, signal.EventDiscoveryStopped, sig)
case <-time.After(pool.discServerTimeout * 2): case <-time.After(pool.opts.DiscServerTimeout * 2):
t.Fatal("timed out") t.Fatal("timed out")
} }
require.Nil(t, server.DiscV5) require.Nil(t, server.DiscV5)
@ -278,8 +280,33 @@ func TestPeerPoolDiscV5Timeout(t *testing.T) {
select { select {
case sig := <-signals: case sig := <-signals:
require.Equal(t, signal.EventDiscoveryStopped, sig) require.Equal(t, signal.EventDiscoveryStopped, sig)
case <-time.After(pool.discServerTimeout * 2): case <-time.After(pool.opts.DiscServerTimeout * 2):
t.Fatal("timed out") t.Fatal("timed out")
} }
require.Nil(t, server.DiscV5) require.Nil(t, server.DiscV5)
} }
func TestPeerPoolNotAllowedStopping(t *testing.T) {
// create and start server
key, err := crypto.GenerateKey()
require.NoError(t, err)
server := &p2p.Server{
Config: p2p.Config{
PrivateKey: key,
DiscoveryV5: true,
NoDiscovery: true,
},
}
require.NoError(t, server.Start())
defer server.Stop()
require.NotNil(t, server.DiscV5)
// start PeerPool
poolOpts := &Options{DefaultFastSync, DefaultSlowSync, time.Millisecond * 100, false}
pool := NewPeerPool(nil, nil, poolOpts)
require.NoError(t, pool.Start(server))
// wait 2x timeout duration
<-time.After(pool.opts.DiscServerTimeout * 2)
require.NotNil(t, server.DiscV5)
}