diff --git a/geth/node/status_node.go b/geth/node/status_node.go index 666a64aa0..587c1449c 100644 --- a/geth/node/status_node.go +++ b/geth/node/status_node.go @@ -165,13 +165,13 @@ func (n *StatusNode) setupRPCClient() (err error) { func (n *StatusNode) startPeerPool() error { n.register = peers.NewRegister(n.config.RegisterTopics...) + options := peers.NewDefaultOptions() // TODO(dshulyak) consider adding a flag to define this behaviour - stopOnMax := len(n.config.RegisterTopics) == 0 - n.peerPool = peers.NewPeerPool(n.config.RequireTopics, - peers.DefaultFastSync, - peers.DefaultSlowSync, + options.AllowStop = len(n.config.RegisterTopics) == 0 + n.peerPool = peers.NewPeerPool( + n.config.RequireTopics, peers.NewCache(n.db), - stopOnMax, + options, ) if err := n.register.Start(n.gethNode.Server()); err != nil { return err diff --git a/geth/peers/peerpool.go b/geth/peers/peerpool.go index f91ce4aaa..3c53fffd5 100644 --- a/geth/peers/peerpool.go +++ b/geth/peers/peerpool.go @@ -39,15 +39,23 @@ const ( DefaultTopicFastModeTimeout = 30 * time.Second ) -// NewPeerPool creates instance of PeerPool -func NewPeerPool(config map[discv5.Topic]params.Limits, fastSync, slowSync time.Duration, cache *Cache, stopOnMax bool) *PeerPool { - return &PeerPool{ - config: config, - fastSync: fastSync, - slowSync: slowSync, - discServerTimeout: DefaultDiscV5Timeout, - cache: cache, - stopOnMax: stopOnMax, +// Options is a struct with PeerPool configuration. +type Options struct { + FastSync time.Duration + SlowSync time.Duration + // After this time, Discovery is stopped even if max peers is not reached. + DiscServerTimeout time.Duration + // AllowStop allows stopping Discovery when reaching max peers or after timeout. + AllowStop bool +} + +// NewDefaultOptions return a struct with default Options. +func NewDefaultOptions() *Options { + return &Options{ + FastSync: DefaultFastSync, + SlowSync: DefaultSlowSync, + DiscServerTimeout: DefaultDiscV5Timeout, + AllowStop: false, } } @@ -62,13 +70,11 @@ type peerInfo struct { // PeerPool manages discovered peers and connects them to p2p server type PeerPool struct { + opts *Options + // config can be set only once per pool life cycle - config map[discv5.Topic]params.Limits - fastSync time.Duration - slowSync time.Duration - discServerTimeout time.Duration - cache *Cache - stopOnMax bool + config map[discv5.Topic]params.Limits + cache *Cache mu sync.RWMutex topics []*TopicPool @@ -79,26 +85,48 @@ type PeerPool struct { timeout <-chan time.Time } +// NewPeerPool creates instance of PeerPool +func NewPeerPool(config map[discv5.Topic]params.Limits, cache *Cache, options *Options) *PeerPool { + return &PeerPool{ + opts: options, + config: config, + cache: cache, + } +} + +func (p *PeerPool) setDiscoveryTimeout() { + if p.opts.AllowStop && p.opts.DiscServerTimeout > 0 { + p.timeout = time.After(p.opts.DiscServerTimeout) + } +} + // Start creates topic pool for each topic in config and subscribes to server events. func (p *PeerPool) Start(server *p2p.Server) error { if server.DiscV5 == nil { return ErrDiscv5NotRunning } + p.mu.Lock() defer p.mu.Unlock() + + // init channels p.quit = make(chan struct{}) - p.timeout = time.After(p.discServerTimeout) + p.setDiscoveryTimeout() + + // collect topics and start searching for nodes p.topics = make([]*TopicPool, 0, len(p.config)) for topic, limits := range p.config { - topicPool := NewTopicPool(topic, limits, p.slowSync, p.fastSync) + topicPool := NewTopicPool(topic, limits, p.opts.SlowSync, p.opts.FastSync) if err := topicPool.StartSearch(server); err != nil { return err } p.topics = append(p.topics, topicPool) } - signal.SendDiscoveryStarted() // discovery must be already started when pool is started + // discovery must be already started when pool is started + signal.SendDiscoveryStarted() + // subscribe to peer events p.events = make(chan *p2p.PeerEvent, 20) p.serverSubscription = server.SubscribeEvents(p.events) p.wg.Add(1) @@ -106,6 +134,7 @@ func (p *PeerPool) Start(server *p2p.Server) error { p.handleServerPeers(server, p.events) p.wg.Done() }() + return nil } @@ -118,9 +147,10 @@ func (p *PeerPool) startDiscovery(server *p2p.Server) error { if err != nil { return err } + p.mu.Lock() server.DiscV5 = ntab - p.timeout = time.After(p.discServerTimeout) + p.setDiscoveryTimeout() p.mu.Unlock() signal.SendDiscoveryStarted() @@ -185,10 +215,8 @@ func (p *PeerPool) handleServerPeers(server *p2p.Server, events <-chan *p2p.Peer p.stopDiscovery(server) return case <-timeout: - if p.stopOnMax { - log.Debug("DiscV5 timed out", "server", server.Self()) - p.stopDiscovery(server) - } + log.Info("DiscV5 timed out", "server", server.Self()) + p.stopDiscovery(server) case <-retryDiscv5: if err := p.restartDiscovery(server); err != nil { retryDiscv5 = time.After(discoveryRestartTimeout) @@ -203,7 +231,7 @@ func (p *PeerPool) handleServerPeers(server *p2p.Server, events <-chan *p2p.Peer } case p2p.PeerEventTypeAdd: log.Debug("confirm peer added", "ID", event.Peer) - if p.stopOnMax && p.handleAddedPeer(server, event.Peer) { + if p.opts.AllowStop && p.handleAddedPeer(server, event.Peer) { log.Debug("closing discv5 connection because all topics reached max limit", "server", server.Self()) p.stopDiscovery(server) } @@ -222,7 +250,7 @@ func (p *PeerPool) handleAddedPeer(server *p2p.Server, nodeID discover.NodeID) ( all = true for _, t := range p.topics { t.ConfirmAdded(server, nodeID) - if p.stopOnMax && t.MaxReached() { + if p.opts.AllowStop && t.MaxReached() { t.StopSearch() } else { all = false diff --git a/geth/peers/peerpool_test.go b/geth/peers/peerpool_test.go index 60e23dfc7..547466927 100644 --- a/geth/peers/peerpool_test.go +++ b/geth/peers/peerpool_test.go @@ -132,7 +132,8 @@ func (s *PeerPoolSimulationSuite) TestSingleTopicDiscoveryWithFailover() { config := map[discv5.Topic]params.Limits{ topic: params.NewLimits(1, 1), // limits are chosen for simplicity of the simulation } - peerPool := NewPeerPool(config, 100*time.Millisecond, 100*time.Millisecond, nil, true) + peerPoolOpts := &Options{100 * time.Millisecond, 100 * time.Millisecond, 0, true} + peerPool := NewPeerPool(config, nil, peerPoolOpts) register := NewRegister(topic) s.Require().NoError(register.Start(s.peers[0])) // need to wait for topic to get registered, discv5 can query same node @@ -208,7 +209,8 @@ func TestPeerPoolMaxPeersOverflow(t *testing.T) { defer peer.Stop() require.NotNil(t, peer.DiscV5) - pool := NewPeerPool(nil, DefaultFastSync, DefaultSlowSync, nil, true) + poolOpts := &Options{DefaultFastSync, DefaultSlowSync, 0, true} + pool := NewPeerPool(nil, nil, poolOpts) require.NoError(t, pool.Start(peer)) require.Equal(t, signal.EventDiscoveryStarted, <-signals) // without config, it will stop the discovery because all topic pools are satisfied @@ -256,8 +258,8 @@ func TestPeerPoolDiscV5Timeout(t *testing.T) { require.NotNil(t, server.DiscV5) // start PeerPool - pool := NewPeerPool(nil, DefaultFastSync, DefaultSlowSync, nil, true) - pool.discServerTimeout = time.Millisecond * 100 + poolOpts := &Options{DefaultFastSync, DefaultSlowSync, time.Millisecond * 100, true} + pool := NewPeerPool(nil, nil, poolOpts) require.NoError(t, pool.Start(server)) require.Equal(t, signal.EventDiscoveryStarted, <-signals) @@ -265,7 +267,7 @@ func TestPeerPoolDiscV5Timeout(t *testing.T) { select { case sig := <-signals: require.Equal(t, signal.EventDiscoveryStopped, sig) - case <-time.After(pool.discServerTimeout * 2): + case <-time.After(pool.opts.DiscServerTimeout * 2): t.Fatal("timed out") } require.Nil(t, server.DiscV5) @@ -278,8 +280,33 @@ func TestPeerPoolDiscV5Timeout(t *testing.T) { select { case sig := <-signals: require.Equal(t, signal.EventDiscoveryStopped, sig) - case <-time.After(pool.discServerTimeout * 2): + case <-time.After(pool.opts.DiscServerTimeout * 2): t.Fatal("timed out") } require.Nil(t, server.DiscV5) } + +func TestPeerPoolNotAllowedStopping(t *testing.T) { + // create and start server + key, err := crypto.GenerateKey() + require.NoError(t, err) + server := &p2p.Server{ + Config: p2p.Config{ + PrivateKey: key, + DiscoveryV5: true, + NoDiscovery: true, + }, + } + require.NoError(t, server.Start()) + defer server.Stop() + require.NotNil(t, server.DiscV5) + + // start PeerPool + poolOpts := &Options{DefaultFastSync, DefaultSlowSync, time.Millisecond * 100, false} + pool := NewPeerPool(nil, nil, poolOpts) + require.NoError(t, pool.Start(server)) + + // wait 2x timeout duration + <-time.After(pool.opts.DiscServerTimeout * 2) + require.NotNil(t, server.DiscV5) +}