Pre-cache more peers than the configured max limit (#989)

Peers between maxPeers and maxCachedPeers are now added as server peers instead of being queued.
Adrià Cidre 2018-06-06 15:39:27 +02:00 committed by GitHub
parent c82d5635b6
commit 893b464597
4 changed files with 156 additions and 25 deletions
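The gist of the change, sketched from the topicpool.go hunks below (maxCachedPeers is limits.Max * maxCachedPeersMultiplier, i.e. twice the max peers limit by default): discovered peers are now connected as long as the cached-peers ceiling, not the connection limit, allows it.

// Simplified from processFoundNode below: connect while under maxCachedPeers,
// so extra peers get confirmed and written to the cache; queue only the rest.
if len(t.connectedPeers) < t.maxCachedPeers {
	t.addServerPeer(server, t.pendingPeers[node.ID].peerInfo)
} else {
	t.addToQueue(t.pendingPeers[node.ID].peerInfo)
}

As TestMaxCachedPeers below shows, peers confirmed beyond limits.Max are marked dismissed but remain cached, which is the pre-caching this PR is after.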

View File

@ -37,6 +37,8 @@ const (
DefaultDiscV5Timeout = 3 * time.Minute
// DefaultTopicFastModeTimeout is a timeout after which sync mode is switched to slow mode.
DefaultTopicFastModeTimeout = 30 * time.Second
// DefaultTopicStopSearchDelay is the default delay when stopping a topic search.
DefaultTopicStopSearchDelay = 10 * time.Second
)
// Options is a struct with PeerPool configuration.
@ -47,15 +49,19 @@ type Options struct {
DiscServerTimeout time.Duration
// AllowStop allows stopping Discovery when reaching max peers or after timeout.
AllowStop bool
// TopicStopSearchDelay is how long stopSearch waits for the max cached
// peers to be filled before actually stopping the search.
TopicStopSearchDelay time.Duration
}
-// NewDefaultOptions return a struct with default Options.
+// NewDefaultOptions returns a struct with default Options.
func NewDefaultOptions() *Options {
return &Options{
-FastSync:          DefaultFastSync,
-SlowSync:          DefaultSlowSync,
-DiscServerTimeout: DefaultDiscV5Timeout,
-AllowStop:         false,
+FastSync:             DefaultFastSync,
+SlowSync:             DefaultSlowSync,
+DiscServerTimeout:    DefaultDiscV5Timeout,
+AllowStop:            false,
+TopicStopSearchDelay: DefaultTopicStopSearchDelay,
}
}
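A caller that wants the delayed stop can override the new field before building the pool; a hypothetical setup (config and cache as constructed in the tests below):

opts := NewDefaultOptions()
opts.AllowStop = true // the defaults leave discovery running
opts.TopicStopSearchDelay = 5 * time.Second // wait up to 5s for the cache to fill
pool := NewPeerPool(config, cache, opts)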
@ -203,6 +209,7 @@ func (p *PeerPool) restartDiscovery(server *p2p.Server) error {
// simplify the whole logic and allow removing the `timeout` field from `PeerPool`.
func (p *PeerPool) handleServerPeers(server *p2p.Server, events <-chan *p2p.PeerEvent) {
var retryDiscv5 <-chan time.Time
var stopDiscv5 <-chan time.Time
for {
p.mu.RLock()
@ -222,6 +229,8 @@ func (p *PeerPool) handleServerPeers(server *p2p.Server, events <-chan *p2p.Peer
retryDiscv5 = time.After(discoveryRestartTimeout)
log.Error("starting discv5 failed", "error", err, "retry", discoveryRestartTimeout)
}
case <-stopDiscv5:
p.handleStopTopics(server)
case event := <-events:
switch event.Type {
case p2p.PeerEventTypeDrop:
@ -229,13 +238,11 @@ func (p *PeerPool) handleServerPeers(server *p2p.Server, events <-chan *p2p.Peer
if p.handleDroppedPeer(server, event.Peer) {
retryDiscv5 = time.After(0)
}
-case p2p.PeerEventTypeAdd:
+case p2p.PeerEventTypeAdd: // skip other events
log.Debug("confirm peer added", "ID", event.Peer)
-if p.opts.AllowStop && p.handleAddedPeer(server, event.Peer) {
-log.Debug("closing discv5 connection because all topics reached max limit", "server", server.Self())
-p.stopDiscovery(server)
-}
-default: // skip other events
+p.handleAddedPeer(server, event.Peer)
+stopDiscv5 = time.After(p.opts.TopicStopSearchDelay)
+default:
continue
}
SendDiscoverySummary(server.PeersInfo())
@ -243,16 +250,44 @@ func (p *PeerPool) handleServerPeers(server *p2p.Server, events <-chan *p2p.Peer
}
}
-// handleAddedPeer notifies all topics about added peer and return true if all topics has max limit of connections
-func (p *PeerPool) handleAddedPeer(server *p2p.Server, nodeID discover.NodeID) (all bool) {
+// handleAddedPeer notifies all topics about an added peer.
+func (p *PeerPool) handleAddedPeer(server *p2p.Server, nodeID discover.NodeID) {
p.mu.Lock()
defer p.mu.Unlock()
-all = true
for _, t := range p.topics {
t.ConfirmAdded(server, nodeID)
+if p.opts.AllowStop && t.MaxReached() {
+t.setStopSearchTimeout(p.opts.TopicStopSearchDelay)
+}
}
}
// handleStopTopics stops the search on any topic that has reached its max
// cached peers limit or whose stop-search delay has expired; it also stops
// discovery once all topics are stopped.
func (p *PeerPool) handleStopTopics(server *p2p.Server) {
if !p.opts.AllowStop {
return
}
for _, t := range p.topics {
if t.readyToStopSearch() {
t.StopSearch()
}
}
if p.allTopicsStopped() {
log.Debug("closing discv5 connection because all topics reached max limit", "server", server.Self())
p.stopDiscovery(server)
}
}
// allTopicsStopped returns true if all topics are stopped.
func (p *PeerPool) allTopicsStopped() (all bool) {
if !p.opts.AllowStop {
return false
}
all = true
for _, t := range p.topics {
if !t.isStopped() {
all = false
}
}
return all
}
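The event-loop change above follows a standard Go timer idiom: a nil channel in a select blocks forever, so stopDiscv5 is inert until an added peer arms it with time.After, and each new peer pushes the stop check further out. A minimal, self-contained sketch of the pattern (illustrative only, not status-go code):

package main

import (
	"fmt"
	"time"
)

func main() {
	events := make(chan string)
	go func() {
		events <- "peer added"
		events <- "peer added"
	}()

	var stop <-chan time.Time // nil: ignored by select until armed
	for {
		select {
		case e := <-events:
			fmt.Println("event:", e)
			stop = time.After(100 * time.Millisecond) // re-arm the delayed stop
		case <-stop:
			fmt.Println("delay expired: stop searches that are ready")
			return
		}
	}
}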

View File

@ -121,7 +121,7 @@ func (s *PeerPoolSimulationSuite) TestPeerPoolCache() {
config := map[discv5.Topic]params.Limits{
topic: params.NewLimits(1, 1),
}
-peerPoolOpts := &Options{100 * time.Millisecond, 100 * time.Millisecond, 0, true}
+peerPoolOpts := &Options{100 * time.Millisecond, 100 * time.Millisecond, 0, true, 100 * time.Millisecond}
cache, err := newInMemoryCache()
s.Require().NoError(err)
peerPool := NewPeerPool(config, cache, peerPoolOpts)
@ -168,7 +168,7 @@ func (s *PeerPoolSimulationSuite) TestSingleTopicDiscoveryWithFailover() {
config := map[discv5.Topic]params.Limits{
topic: params.NewLimits(1, 1), // limits are chosen for simplicity of the simulation
}
-peerPoolOpts := &Options{100 * time.Millisecond, 100 * time.Millisecond, 0, true}
+peerPoolOpts := &Options{100 * time.Millisecond, 100 * time.Millisecond, 0, true, 0}
cache, err := newInMemoryCache()
s.Require().NoError(err)
peerPool := NewPeerPool(config, cache, peerPoolOpts)
@ -192,8 +192,8 @@ func (s *PeerPoolSimulationSuite) TestSingleTopicDiscoveryWithFailover() {
connectedPeer := s.getPeerFromEvent(events, p2p.PeerEventTypeAdd)
s.Equal(s.peers[0].Self().ID, connectedPeer)
// as the upper limit was reached, Discovery should be stopped
-s.Equal(signal.EventDiscoveryStopped, s.getPoolEvent(poolEvents))
s.Equal(signal.EventDiscoverySummary, s.getPoolEvent(poolEvents))
+s.Equal(signal.EventDiscoveryStopped, s.getPoolEvent(poolEvents))
s.Len(<-summaries, 1)
// stop topic register and the connected peer
@ -213,8 +213,8 @@ func (s *PeerPoolSimulationSuite) TestSingleTopicDiscoveryWithFailover() {
defer register.Stop()
s.Equal(s.peers[2].Self().ID, s.getPeerFromEvent(events, p2p.PeerEventTypeAdd))
// Discovery can be stopped again.
-s.Equal(signal.EventDiscoveryStopped, s.getPoolEvent(poolEvents))
s.Require().Equal(signal.EventDiscoverySummary, s.getPoolEvent(poolEvents))
+s.Equal(signal.EventDiscoveryStopped, s.getPoolEvent(poolEvents))
s.Len(<-summaries, 1)
}
@ -225,6 +225,7 @@ func (s *PeerPoolSimulationSuite) TestSingleTopicDiscoveryWithFailover() {
// - process peer B
// - panic because discv5 is nil!!!
func TestPeerPoolMaxPeersOverflow(t *testing.T) {
maxCachedPeersMultiplier = 0
signals := make(chan string, 10)
signal.SetDefaultNodeNotificationHandler(func(jsonEvent string) {
var envelope struct {
@ -248,12 +249,13 @@ func TestPeerPoolMaxPeersOverflow(t *testing.T) {
defer peer.Stop()
require.NotNil(t, peer.DiscV5)
-poolOpts := &Options{DefaultFastSync, DefaultSlowSync, 0, true}
+poolOpts := &Options{DefaultFastSync, DefaultSlowSync, 0, true, 100 * time.Millisecond}
pool := NewPeerPool(nil, nil, poolOpts)
require.NoError(t, pool.Start(peer))
require.Equal(t, signal.EventDiscoveryStarted, <-signals)
// without config, it will stop the discovery because all topic pools are satisfied
pool.events <- &p2p.PeerEvent{Type: p2p.PeerEventTypeAdd}
require.Equal(t, signal.EventDiscoverySummary, <-signals)
require.Equal(t, signal.EventDiscoveryStopped, <-signals)
require.Nil(t, peer.DiscV5)
// another peer added after discovery is stopped should not panic
@ -297,7 +299,7 @@ func TestPeerPoolDiscV5Timeout(t *testing.T) {
require.NotNil(t, server.DiscV5)
// start PeerPool
-poolOpts := &Options{DefaultFastSync, DefaultSlowSync, time.Millisecond * 100, true}
+poolOpts := &Options{DefaultFastSync, DefaultSlowSync, time.Millisecond * 100, true, 100 * time.Millisecond}
pool := NewPeerPool(nil, nil, poolOpts)
require.NoError(t, pool.Start(server))
require.Equal(t, signal.EventDiscoveryStarted, <-signals)
@ -341,7 +343,7 @@ func TestPeerPoolNotAllowedStopping(t *testing.T) {
require.NotNil(t, server.DiscV5)
// start PeerPool
-poolOpts := &Options{DefaultFastSync, DefaultSlowSync, time.Millisecond * 100, false}
+poolOpts := &Options{DefaultFastSync, DefaultSlowSync, time.Millisecond * 100, false, 100 * time.Millisecond}
pool := NewPeerPool(nil, nil, poolOpts)
require.NoError(t, pool.Start(server))

View File

@ -19,6 +19,10 @@ const (
notQueuedIndex = -1
)
// maxCachedPeersMultiplier is applied to a topic's max peers limit to
// compute the maximum number of cached peers allowed.
var maxCachedPeersMultiplier = 2
// NewTopicPool returns instance of TopicPool
func NewTopicPool(topic discv5.Topic, limits params.Limits, slowMode, fastMode time.Duration, cache *Cache) *TopicPool {
pool := TopicPool{
@ -31,8 +35,8 @@ func NewTopicPool(topic discv5.Topic, limits params.Limits, slowMode, fastMode t
discoveredPeersQueue: make(peerPriorityQueue, 0),
connectedPeers: make(map[discv5.NodeID]*peerInfo),
cache: cache,
maxCachedPeers: limits.Max * maxCachedPeersMultiplier,
}
heap.Init(&pool.discoveredPeersQueue)
return &pool
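With the default multiplier of 2, a topic configured with params.NewLimits(1, 1) will pre-cache up to two peers; a hypothetical check (topic and cache assumed to be in scope):

pool := NewTopicPool(topic, params.NewLimits(1, 1), DefaultSlowSync, DefaultFastSync, cache)
fmt.Println(pool.maxCachedPeers) // 2: the second discovered peer is connected and cached rather than queued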
@ -62,7 +66,10 @@ type TopicPool struct {
discoveredPeersQueue peerPriorityQueue // priority queue to find the most recently discovered peers; does not contain peers requested to connect
connectedPeers map[discv5.NodeID]*peerInfo // currently connected peers
-cache *Cache
+stopSearchTimeout *time.Time
+maxCachedPeers int
+cache *Cache
}
func (t *TopicPool) addToPendingPeers(peer *peerInfo) {
@ -141,6 +148,40 @@ func (t *TopicPool) BelowMin() bool {
return len(t.connectedPeers) < t.limits.Min
}
// maxCachedPeersReached returns true if max number of cached peers is reached.
func (t *TopicPool) maxCachedPeersReached() bool {
if t.maxCachedPeers == 0 {
return true
}
peers := t.cache.GetPeersRange(t.topic, t.maxCachedPeers)
return len(peers) >= t.maxCachedPeers
}
// setStopSearchTimeout sets the timeout after which the current topic search
// will be stopped, unless a timeout has already been set.
func (t *TopicPool) setStopSearchTimeout(delay time.Duration) {
if t.stopSearchTimeout != nil {
return
}
now := time.Now().Add(delay)
t.stopSearchTimeout = &now
}
// isStopSearchDelayExpired returns true if the timeout to stop the current
// topic search has expired.
func (t *TopicPool) isStopSearchDelayExpired() bool {
if t.stopSearchTimeout == nil {
return false
}
return t.stopSearchTimeout.Before(time.Now())
}
// readyToStopSearch returns true if the search can be stopped: either the
// stop-search delay has expired or the max number of cached peers is reached.
func (t *TopicPool) readyToStopSearch() bool {
return t.isStopSearchDelayExpired() || t.maxCachedPeersReached()
}
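Together, these helpers give each topic two independent exits from searching: the cache fills up, or the delay runs out. A package-internal sketch (hypothetical, not part of the commit) of the short-circuit in readyToStopSearch, which never touches the cache once the delay has expired:

tp := &TopicPool{maxCachedPeers: 2} // other fields zero-valued; sketch only
_ = tp.isStopSearchDelayExpired() // false: no timeout set yet
tp.setStopSearchTimeout(-time.Second) // a negative delay expires immediately
_ = tp.isStopSearchDelayExpired() // true
_ = tp.readyToStopSearch() // true via the expired delay; maxCachedPeersReached is never called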
// updateSyncMode changes the sync mode depending on the current number
// of connected peers and limits.
func (t *TopicPool) updateSyncMode() {
@ -319,6 +360,7 @@ func (t *TopicPool) StartSearch(server *p2p.Server) error {
defer t.mu.Unlock()
t.quit = make(chan struct{})
t.stopSearchTimeout = nil
// `period` is used to notify about the current sync mode.
t.period = make(chan time.Duration, 2)
@ -390,7 +432,7 @@ func (t *TopicPool) processFoundNode(server *p2p.Server, node *discv5.Node) {
}
// the upper limit is not reached, so let's add this peer
-if len(t.connectedPeers) < t.limits.Max {
+if len(t.connectedPeers) < t.maxCachedPeers {
t.addServerPeer(server, t.pendingPeers[node.ID].peerInfo)
} else {
t.addToQueue(t.pendingPeers[node.ID].peerInfo)
@ -415,6 +457,12 @@ func (t *TopicPool) removeServerPeer(server *p2p.Server, info *peerInfo) {
))
}
func (t *TopicPool) isStopped() bool {
t.mu.Lock()
defer t.mu.Unlock()
return t.currentMode == 0
}
// StopSearch stops the current topic search.
func (t *TopicPool) StopSearch() {
if !atomic.CompareAndSwapInt32(&t.running, 1, 0) {

View File

@ -25,6 +25,7 @@ func TestTopicPoolSuite(t *testing.T) {
}
func (s *TopicPoolSuite) SetupTest() {
maxCachedPeersMultiplier = 1
key, _ := crypto.GenerateKey()
name := common.MakeName("peer", "1.0")
s.peer = &p2p.Server{
@ -65,6 +66,7 @@ func (s *TopicPoolSuite) AssertConsumed(channel <-chan time.Duration, expected t
func (s *TopicPoolSuite) TestUsingCache() {
s.topicPool.limits = params.NewLimits(1, 1)
s.topicPool.maxCachedPeers = 1
peer1 := discv5.NewNode(discv5.NodeID{1}, s.peer.Self().IP, 32311, 32311)
s.topicPool.processFoundNode(s.peer, peer1)
@ -177,6 +179,7 @@ func (s *TopicPoolSuite) TestRequestedDoesntRemove() {
// max limit is 1 because we test that 2nd peer will stay in local table
// when we request to drop it
s.topicPool.limits = params.NewLimits(1, 1)
s.topicPool.maxCachedPeers = 1
peer1 := discv5.NewNode(discv5.NodeID{1}, s.peer.Self().IP, 32311, 32311)
peer2 := discv5.NewNode(discv5.NodeID{2}, s.peer.Self().IP, 32311, 32311)
s.topicPool.processFoundNode(s.peer, peer1)
@ -195,6 +198,7 @@ func (s *TopicPoolSuite) TestRequestedDoesntRemove() {
func (s *TopicPoolSuite) TestTheMostRecentPeerIsSelected() {
s.topicPool.limits = params.NewLimits(1, 1)
s.topicPool.maxCachedPeers = 1
peer1 := discv5.NewNode(discv5.NodeID{1}, s.peer.Self().IP, 32311, 32311)
peer2 := discv5.NewNode(discv5.NodeID{2}, s.peer.Self().IP, 32311, 32311)
@ -220,6 +224,7 @@ func (s *TopicPoolSuite) TestTheMostRecentPeerIsSelected() {
func (s *TopicPoolSuite) TestSelectPeerAfterMaxLimit() {
s.topicPool.limits = params.NewLimits(1, 1)
s.topicPool.maxCachedPeers = 1
peer1 := discv5.NewNode(discv5.NodeID{1}, s.peer.Self().IP, 32311, 32311)
peer2 := discv5.NewNode(discv5.NodeID{2}, s.peer.Self().IP, 32311, 32311)
@ -240,6 +245,7 @@ func (s *TopicPoolSuite) TestSelectPeerAfterMaxLimit() {
func (s *TopicPoolSuite) TestReplacementPeerIsCounted() {
s.topicPool.limits = params.NewLimits(1, 1)
s.topicPool.maxCachedPeers = 1
peer1 := discv5.NewNode(discv5.NodeID{1}, s.peer.Self().IP, 32311, 32311)
peer2 := discv5.NewNode(discv5.NodeID{2}, s.peer.Self().IP, 32311, 32311)
@ -259,6 +265,7 @@ func (s *TopicPoolSuite) TestReplacementPeerIsCounted() {
func (s *TopicPoolSuite) TestPeerDontAddTwice() {
s.topicPool.limits = params.NewLimits(1, 1)
s.topicPool.maxCachedPeers = 1
peer1 := discv5.NewNode(discv5.NodeID{1}, s.peer.Self().IP, 32311, 32311)
peer2 := discv5.NewNode(discv5.NodeID{2}, s.peer.Self().IP, 32311, 32311)
@ -268,3 +275,42 @@ func (s *TopicPoolSuite) TestPeerDontAddTwice() {
// peer2 is already added to the p2p server, so there is no reason to add it again
s.Nil(s.topicPool.AddPeerFromTable(s.peer))
}
func (s *TopicPoolSuite) TestMaxCachedPeers() {
s.topicPool.limits = params.NewLimits(1, 1)
s.topicPool.maxCachedPeers = 3
peer1 := discv5.NewNode(discv5.NodeID{1}, s.peer.Self().IP, 32311, 32311)
peer2 := discv5.NewNode(discv5.NodeID{2}, s.peer.Self().IP, 32311, 32311)
peer3 := discv5.NewNode(discv5.NodeID{3}, s.peer.Self().IP, 32311, 32311)
s.topicPool.processFoundNode(s.peer, peer1)
s.topicPool.processFoundNode(s.peer, peer2)
s.topicPool.processFoundNode(s.peer, peer3)
s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer1.ID))
s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer2.ID))
s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer3.ID))
s.Equal(3, len(s.topicPool.connectedPeers))
s.False(s.topicPool.connectedPeers[peer1.ID].dismissed)
s.True(s.topicPool.connectedPeers[peer2.ID].dismissed)
s.True(s.topicPool.connectedPeers[peer3.ID].dismissed)
cached := s.topicPool.cache.GetPeersRange(s.topicPool.topic, 5)
s.Equal(3, len(cached))
s.topicPool.ConfirmDropped(s.peer, discover.NodeID(peer2.ID))
s.topicPool.ConfirmDropped(s.peer, discover.NodeID(peer3.ID))
s.Equal(peer1.ID, cached[0].ID)
s.Equal(peer2.ID, cached[1].ID)
s.Equal(peer3.ID, cached[2].ID)
s.Contains(s.topicPool.connectedPeers, peer1.ID)
s.NotContains(s.topicPool.connectedPeers, peer2.ID)
s.NotContains(s.topicPool.connectedPeers, peer3.ID)
s.NotContains(s.topicPool.pendingPeers, peer1.ID)
s.Contains(s.topicPool.pendingPeers, peer2.ID)
s.Contains(s.topicPool.pendingPeers, peer3.ID)
s.True(s.topicPool.maxCachedPeersReached())
cached = s.topicPool.cache.GetPeersRange(s.topicPool.topic, 5)
s.Equal(3, len(cached))
}