diff --git a/geth/peers/peerpool.go b/geth/peers/peerpool.go index 82f22698a..1ef9e551a 100644 --- a/geth/peers/peerpool.go +++ b/geth/peers/peerpool.go @@ -37,6 +37,8 @@ const ( DefaultDiscV5Timeout = 3 * time.Minute // DefaultTopicFastModeTimeout is a timeout after which sync mode is switched to slow mode. DefaultTopicFastModeTimeout = 30 * time.Second + // DefaultTopicStopSearchDelay is the default delay when stopping a topic search. + DefaultTopicStopSearchDelay = 10 * time.Second ) // Options is a struct with PeerPool configuration. @@ -47,15 +49,19 @@ type Options struct { DiscServerTimeout time.Duration // AllowStop allows stopping Discovery when reaching max peers or after timeout. AllowStop bool + // TopicStopSearchDelay is the time stopSearch will wait for the max cached peers to be + // filled before really stopping the search. + TopicStopSearchDelay time.Duration } -// NewDefaultOptions return a struct with default Options. +// NewDefaultOptions returns a struct with default Options. func NewDefaultOptions() *Options { return &Options{ - FastSync: DefaultFastSync, - SlowSync: DefaultSlowSync, - DiscServerTimeout: DefaultDiscV5Timeout, - AllowStop: false, + FastSync: DefaultFastSync, + SlowSync: DefaultSlowSync, + DiscServerTimeout: DefaultDiscV5Timeout, + AllowStop: false, + TopicStopSearchDelay: DefaultTopicStopSearchDelay, } } @@ -203,6 +209,7 @@ func (p *PeerPool) restartDiscovery(server *p2p.Server) error { // simplify the whole logic and allow to remove `timeout` field from `PeerPool`. 
func (p *PeerPool) handleServerPeers(server *p2p.Server, events <-chan *p2p.PeerEvent) { var retryDiscv5 <-chan time.Time + var stopDiscv5 <-chan time.Time for { p.mu.RLock() @@ -222,6 +229,8 @@ func (p *PeerPool) handleServerPeers(server *p2p.Server, events <-chan *p2p.Peer retryDiscv5 = time.After(discoveryRestartTimeout) log.Error("starting discv5 failed", "error", err, "retry", discoveryRestartTimeout) } + case <-stopDiscv5: + p.handleStopTopics(server) case event := <-events: switch event.Type { case p2p.PeerEventTypeDrop: @@ -229,13 +238,11 @@ func (p *PeerPool) handleServerPeers(server *p2p.Server, events <-chan *p2p.Peer if p.handleDroppedPeer(server, event.Peer) { retryDiscv5 = time.After(0) } - case p2p.PeerEventTypeAdd: + case p2p.PeerEventTypeAdd: log.Debug("confirm peer added", "ID", event.Peer) - if p.opts.AllowStop && p.handleAddedPeer(server, event.Peer) { - log.Debug("closing discv5 connection because all topics reached max limit", "server", server.Self()) - p.stopDiscovery(server) - } - default: // skip other events + p.handleAddedPeer(server, event.Peer) + stopDiscv5 = time.After(p.opts.TopicStopSearchDelay) + default: // skip other events continue } SendDiscoverySummary(server.PeersInfo()) @@ -243,16 +250,44 @@ func (p *PeerPool) handleServerPeers(server *p2p.Server, events <-chan *p2p.Peer } } -// handleAddedPeer notifies all topics about added peer and return true if all topics has max limit of connections -func (p *PeerPool) handleAddedPeer(server *p2p.Server, nodeID discover.NodeID) (all bool) { +// handleAddedPeer notifies all topics about an added peer. 
+func (p *PeerPool) handleAddedPeer(server *p2p.Server, nodeID discover.NodeID) { p.mu.Lock() defer p.mu.Unlock() - all = true for _, t := range p.topics { t.ConfirmAdded(server, nodeID) if p.opts.AllowStop && t.MaxReached() { + t.setStopSearchTimeout(p.opts.TopicStopSearchDelay) + } + } +} + +// handleStopTopics stops the search on any topic that has reached its max cached +// limit or whose stop delay has expired; additionally it will stop discovery if all +// topics are stopped. +func (p *PeerPool) handleStopTopics(server *p2p.Server) { + if !p.opts.AllowStop { + return + } + for _, t := range p.topics { + if t.readyToStopSearch() { t.StopSearch() - } else { + } + } + if p.allTopicsStopped() { + log.Debug("closing discv5 connection because all topics reached max limit", "server", server.Self()) + p.stopDiscovery(server) + } +} + +// allTopicsStopped returns true if all topics are stopped. +func (p *PeerPool) allTopicsStopped() (all bool) { + if !p.opts.AllowStop { + return false + } + all = true + for _, t := range p.topics { + if !t.isStopped() { all = false } } diff --git a/geth/peers/peerpool_test.go b/geth/peers/peerpool_test.go index b0b404093..3ae5e10a2 100644 --- a/geth/peers/peerpool_test.go +++ b/geth/peers/peerpool_test.go @@ -121,7 +121,7 @@ func (s *PeerPoolSimulationSuite) TestPeerPoolCache() { config := map[discv5.Topic]params.Limits{ topic: params.NewLimits(1, 1), } - peerPoolOpts := &Options{100 * time.Millisecond, 100 * time.Millisecond, 0, true} + peerPoolOpts := &Options{100 * time.Millisecond, 100 * time.Millisecond, 0, true, 100 * time.Millisecond} cache, err := newInMemoryCache() s.Require().NoError(err) peerPool := NewPeerPool(config, cache, peerPoolOpts) @@ -168,7 +168,7 @@ func (s *PeerPoolSimulationSuite) TestSingleTopicDiscoveryWithFailover() { config := map[discv5.Topic]params.Limits{ topic: params.NewLimits(1, 1), // limits are chosen for simplicity of the simulation } - peerPoolOpts := &Options{100 * time.Millisecond, 100 * 
time.Millisecond, 0, true} + peerPoolOpts := &Options{100 * time.Millisecond, 100 * time.Millisecond, 0, true, 0} cache, err := newInMemoryCache() s.Require().NoError(err) peerPool := NewPeerPool(config, cache, peerPoolOpts) @@ -192,8 +192,8 @@ func (s *PeerPoolSimulationSuite) TestSingleTopicDiscoveryWithFailover() { connectedPeer := s.getPeerFromEvent(events, p2p.PeerEventTypeAdd) s.Equal(s.peers[0].Self().ID, connectedPeer) // as the upper limit was reached, Discovery should be stoped - s.Equal(signal.EventDiscoveryStopped, s.getPoolEvent(poolEvents)) s.Equal(signal.EventDiscoverySummary, s.getPoolEvent(poolEvents)) + s.Equal(signal.EventDiscoveryStopped, s.getPoolEvent(poolEvents)) s.Len(<-summaries, 1) // stop topic register and the connected peer @@ -213,8 +213,8 @@ func (s *PeerPoolSimulationSuite) TestSingleTopicDiscoveryWithFailover() { defer register.Stop() s.Equal(s.peers[2].Self().ID, s.getPeerFromEvent(events, p2p.PeerEventTypeAdd)) // Discovery can be stopped again. - s.Equal(signal.EventDiscoveryStopped, s.getPoolEvent(poolEvents)) s.Require().Equal(signal.EventDiscoverySummary, s.getPoolEvent(poolEvents)) + s.Equal(signal.EventDiscoveryStopped, s.getPoolEvent(poolEvents)) s.Len(<-summaries, 1) } @@ -225,6 +225,7 @@ func (s *PeerPoolSimulationSuite) TestSingleTopicDiscoveryWithFailover() { // - process peer B // - panic because discv5 is nil!!! 
func TestPeerPoolMaxPeersOverflow(t *testing.T) { + maxCachedPeersMultiplier = 0 signals := make(chan string, 10) signal.SetDefaultNodeNotificationHandler(func(jsonEvent string) { var envelope struct { @@ -248,12 +249,13 @@ func TestPeerPoolMaxPeersOverflow(t *testing.T) { defer peer.Stop() require.NotNil(t, peer.DiscV5) - poolOpts := &Options{DefaultFastSync, DefaultSlowSync, 0, true} + poolOpts := &Options{DefaultFastSync, DefaultSlowSync, 0, true, 100 * time.Millisecond} pool := NewPeerPool(nil, nil, poolOpts) require.NoError(t, pool.Start(peer)) require.Equal(t, signal.EventDiscoveryStarted, <-signals) // without config, it will stop the discovery because all topic pools are satisfied pool.events <- &p2p.PeerEvent{Type: p2p.PeerEventTypeAdd} + require.Equal(t, signal.EventDiscoverySummary, <-signals) require.Equal(t, signal.EventDiscoveryStopped, <-signals) require.Nil(t, peer.DiscV5) // another peer added after discovery is stopped should not panic @@ -297,7 +299,7 @@ func TestPeerPoolDiscV5Timeout(t *testing.T) { require.NotNil(t, server.DiscV5) // start PeerPool - poolOpts := &Options{DefaultFastSync, DefaultSlowSync, time.Millisecond * 100, true} + poolOpts := &Options{DefaultFastSync, DefaultSlowSync, time.Millisecond * 100, true, 100 * time.Millisecond} pool := NewPeerPool(nil, nil, poolOpts) require.NoError(t, pool.Start(server)) require.Equal(t, signal.EventDiscoveryStarted, <-signals) @@ -341,7 +343,7 @@ func TestPeerPoolNotAllowedStopping(t *testing.T) { require.NotNil(t, server.DiscV5) // start PeerPool - poolOpts := &Options{DefaultFastSync, DefaultSlowSync, time.Millisecond * 100, false} + poolOpts := &Options{DefaultFastSync, DefaultSlowSync, time.Millisecond * 100, false, 100 * time.Millisecond} pool := NewPeerPool(nil, nil, poolOpts) require.NoError(t, pool.Start(server)) diff --git a/geth/peers/topicpool.go b/geth/peers/topicpool.go index 59896248f..430ef3fe1 100644 --- a/geth/peers/topicpool.go +++ b/geth/peers/topicpool.go @@ -19,6 +19,10 @@ 
const ( notQueuedIndex = -1 ) +// maxCachedPeersMultiplier peers max limit will be multiplied by this number +// to get the maximum number of cached peers allowed. +var maxCachedPeersMultiplier = 2 + // NewTopicPool returns instance of TopicPool func NewTopicPool(topic discv5.Topic, limits params.Limits, slowMode, fastMode time.Duration, cache *Cache) *TopicPool { pool := TopicPool{ @@ -31,8 +35,8 @@ func NewTopicPool(topic discv5.Topic, limits params.Limits, slowMode, fastMode t discoveredPeersQueue: make(peerPriorityQueue, 0), connectedPeers: make(map[discv5.NodeID]*peerInfo), cache: cache, + maxCachedPeers: limits.Max * maxCachedPeersMultiplier, } - heap.Init(&pool.discoveredPeersQueue) return &pool @@ -62,7 +66,10 @@ type TopicPool struct { discoveredPeersQueue peerPriorityQueue // priority queue to find the most recently discovered peers; does not containt peers requested to connect connectedPeers map[discv5.NodeID]*peerInfo // currently connected peers - cache *Cache + stopSearchTimeout *time.Time + + maxCachedPeers int + cache *Cache } func (t *TopicPool) addToPendingPeers(peer *peerInfo) { @@ -141,6 +148,40 @@ func (t *TopicPool) BelowMin() bool { return len(t.connectedPeers) < t.limits.Min } +// maxCachedPeersReached returns true if max number of cached peers is reached. +func (t *TopicPool) maxCachedPeersReached() bool { + if t.maxCachedPeers == 0 { + return true + } + peers := t.cache.GetPeersRange(t.topic, t.maxCachedPeers) + + return len(peers) >= t.maxCachedPeers +} + +// setStopSearchTimeout sets the timeout to stop current topic search if it's not +// been stopped before. +func (t *TopicPool) setStopSearchTimeout(delay time.Duration) { + if t.stopSearchTimeout != nil { + return + } + now := time.Now().Add(delay) + t.stopSearchTimeout = &now +} + +// isStopSearchDelayExpired returns true if the timeout to stop current topic +// search has been accomplished. 
+func (t *TopicPool) isStopSearchDelayExpired() bool { + if t.stopSearchTimeout == nil { + return false + } + return t.stopSearchTimeout.Before(time.Now()) +} + +// readyToStopSearch return true if all conditions to stop search are ok. +func (t *TopicPool) readyToStopSearch() bool { + return t.isStopSearchDelayExpired() || t.maxCachedPeersReached() +} + // updateSyncMode changes the sync mode depending on the current number // of connected peers and limits. func (t *TopicPool) updateSyncMode() { @@ -319,6 +360,7 @@ func (t *TopicPool) StartSearch(server *p2p.Server) error { defer t.mu.Unlock() t.quit = make(chan struct{}) + t.stopSearchTimeout = nil // `period` is used to notify about the current sync mode. t.period = make(chan time.Duration, 2) @@ -390,7 +432,7 @@ func (t *TopicPool) processFoundNode(server *p2p.Server, node *discv5.Node) { } // the upper limit is not reached, so let's add this peer - if len(t.connectedPeers) < t.limits.Max { + if len(t.connectedPeers) < t.maxCachedPeers { t.addServerPeer(server, t.pendingPeers[node.ID].peerInfo) } else { t.addToQueue(t.pendingPeers[node.ID].peerInfo) @@ -415,6 +457,12 @@ func (t *TopicPool) removeServerPeer(server *p2p.Server, info *peerInfo) { )) } +func (t *TopicPool) isStopped() bool { + t.mu.Lock() + defer t.mu.Unlock() + return t.currentMode == 0 +} + // StopSearch stops the closes stop func (t *TopicPool) StopSearch() { if !atomic.CompareAndSwapInt32(&t.running, 1, 0) { diff --git a/geth/peers/topicpool_test.go b/geth/peers/topicpool_test.go index 255d9a7d6..eee0679a5 100644 --- a/geth/peers/topicpool_test.go +++ b/geth/peers/topicpool_test.go @@ -25,6 +25,7 @@ func TestTopicPoolSuite(t *testing.T) { } func (s *TopicPoolSuite) SetupTest() { + maxCachedPeersMultiplier = 1 key, _ := crypto.GenerateKey() name := common.MakeName("peer", "1.0") s.peer = &p2p.Server{ @@ -65,6 +66,7 @@ func (s *TopicPoolSuite) AssertConsumed(channel <-chan time.Duration, expected t func (s *TopicPoolSuite) TestUsingCache() { 
s.topicPool.limits = params.NewLimits(1, 1) + s.topicPool.maxCachedPeers = 1 peer1 := discv5.NewNode(discv5.NodeID{1}, s.peer.Self().IP, 32311, 32311) s.topicPool.processFoundNode(s.peer, peer1) @@ -177,6 +179,7 @@ func (s *TopicPoolSuite) TestRequestedDoesntRemove() { // max limit is 1 because we test that 2nd peer will stay in local table // when we request to drop it s.topicPool.limits = params.NewLimits(1, 1) + s.topicPool.maxCachedPeers = 1 peer1 := discv5.NewNode(discv5.NodeID{1}, s.peer.Self().IP, 32311, 32311) peer2 := discv5.NewNode(discv5.NodeID{2}, s.peer.Self().IP, 32311, 32311) s.topicPool.processFoundNode(s.peer, peer1) @@ -195,6 +198,7 @@ func (s *TopicPoolSuite) TestRequestedDoesntRemove() { func (s *TopicPoolSuite) TestTheMostRecentPeerIsSelected() { s.topicPool.limits = params.NewLimits(1, 1) + s.topicPool.maxCachedPeers = 1 peer1 := discv5.NewNode(discv5.NodeID{1}, s.peer.Self().IP, 32311, 32311) peer2 := discv5.NewNode(discv5.NodeID{2}, s.peer.Self().IP, 32311, 32311) @@ -220,6 +224,7 @@ func (s *TopicPoolSuite) TestTheMostRecentPeerIsSelected() { func (s *TopicPoolSuite) TestSelectPeerAfterMaxLimit() { s.topicPool.limits = params.NewLimits(1, 1) + s.topicPool.maxCachedPeers = 1 peer1 := discv5.NewNode(discv5.NodeID{1}, s.peer.Self().IP, 32311, 32311) peer2 := discv5.NewNode(discv5.NodeID{2}, s.peer.Self().IP, 32311, 32311) @@ -240,6 +245,7 @@ func (s *TopicPoolSuite) TestSelectPeerAfterMaxLimit() { func (s *TopicPoolSuite) TestReplacementPeerIsCounted() { s.topicPool.limits = params.NewLimits(1, 1) + s.topicPool.maxCachedPeers = 1 peer1 := discv5.NewNode(discv5.NodeID{1}, s.peer.Self().IP, 32311, 32311) peer2 := discv5.NewNode(discv5.NodeID{2}, s.peer.Self().IP, 32311, 32311) @@ -259,6 +265,7 @@ func (s *TopicPoolSuite) TestReplacementPeerIsCounted() { func (s *TopicPoolSuite) TestPeerDontAddTwice() { s.topicPool.limits = params.NewLimits(1, 1) + s.topicPool.maxCachedPeers = 1 peer1 := discv5.NewNode(discv5.NodeID{1}, s.peer.Self().IP, 32311, 
32311) peer2 := discv5.NewNode(discv5.NodeID{2}, s.peer.Self().IP, 32311, 32311) @@ -268,3 +275,42 @@ func (s *TopicPoolSuite) TestPeerDontAddTwice() { // peer2 already added to p2p server no reason to add it again s.Nil(s.topicPool.AddPeerFromTable(s.peer)) } + +func (s *TopicPoolSuite) TestMaxCachedPeers() { + s.topicPool.limits = params.NewLimits(1, 1) + s.topicPool.maxCachedPeers = 3 + peer1 := discv5.NewNode(discv5.NodeID{1}, s.peer.Self().IP, 32311, 32311) + peer2 := discv5.NewNode(discv5.NodeID{2}, s.peer.Self().IP, 32311, 32311) + peer3 := discv5.NewNode(discv5.NodeID{3}, s.peer.Self().IP, 32311, 32311) + s.topicPool.processFoundNode(s.peer, peer1) + s.topicPool.processFoundNode(s.peer, peer2) + s.topicPool.processFoundNode(s.peer, peer3) + s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer1.ID)) + s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer2.ID)) + s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer3.ID)) + + s.Equal(3, len(s.topicPool.connectedPeers)) + s.False(s.topicPool.connectedPeers[peer1.ID].dismissed) + s.True(s.topicPool.connectedPeers[peer2.ID].dismissed) + s.True(s.topicPool.connectedPeers[peer3.ID].dismissed) + + cached := s.topicPool.cache.GetPeersRange(s.topicPool.topic, 5) + s.Equal(3, len(cached)) + + s.topicPool.ConfirmDropped(s.peer, discover.NodeID(peer2.ID)) + s.topicPool.ConfirmDropped(s.peer, discover.NodeID(peer3.ID)) + + s.Equal(peer1.ID, cached[0].ID) + s.Equal(peer2.ID, cached[1].ID) + s.Equal(peer3.ID, cached[2].ID) + s.Contains(s.topicPool.connectedPeers, peer1.ID) + s.NotContains(s.topicPool.connectedPeers, peer2.ID) + s.NotContains(s.topicPool.connectedPeers, peer3.ID) + s.NotContains(s.topicPool.pendingPeers, peer1.ID) + s.Contains(s.topicPool.pendingPeers, peer2.ID) + s.Contains(s.topicPool.pendingPeers, peer3.ID) + + s.True(s.topicPool.maxCachedPeersReached()) + cached = s.topicPool.cache.GetPeersRange(s.topicPool.topic, 5) + s.Equal(3, len(cached)) +}