Limit the upper bounds of how many pending peers we track (#1714)

This commit is contained in:
André Medeiros 2019-12-10 10:20:22 -05:00 committed by GitHub
parent baa0767c26
commit 58fc7e45db
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 91 additions and 2 deletions

View File

@ -24,6 +24,10 @@ const (
// to get the maximum number of cached peers allowed.
var maxCachedPeersMultiplier = 1
// maxPendingPeersMultiplier peers max limit will be multiplied by this number
// to get the maximum number of pending peers allowed.
var maxPendingPeersMultiplier = 2
// TopicPoolInterface the TopicPool interface.
type TopicPoolInterface interface {
StopSearch(server *p2p.Server)
@ -52,9 +56,11 @@ func newTopicPool(discovery discovery.Discovery, topic discv5.Topic, limits para
fastModeTimeout: DefaultTopicFastModeTimeout,
pendingPeers: make(map[enode.ID]*peerInfoItem),
discoveredPeersQueue: make(peerPriorityQueue, 0),
discoveredPeers: make(map[enode.ID]bool),
connectedPeers: make(map[enode.ID]*peerInfo),
cache: cache,
maxCachedPeers: limits.Max * maxCachedPeersMultiplier,
maxPendingPeers: limits.Max * maxPendingPeersMultiplier,
}
heap.Init(&pool.discoveredPeersQueue)
@ -85,12 +91,14 @@ type TopicPool struct {
pendingPeers map[enode.ID]*peerInfoItem // contains found and requested to be connected peers but not confirmed
discoveredPeersQueue peerPriorityQueue // priority queue to find the most recently discovered peers; does not containt peers requested to connect
discoveredPeers map[enode.ID]bool // remembers which peers have already been discovered and are enqueued
connectedPeers map[enode.ID]*peerInfo // currently connected peers
stopSearchTimeout *time.Time
maxCachedPeers int
cache *Cache
maxPendingPeers int
maxCachedPeers int
cache *Cache
}
func (t *TopicPool) addToPendingPeers(peer *peerInfo) {
@ -101,12 +109,33 @@ func (t *TopicPool) addToPendingPeers(peer *peerInfo) {
peerInfo: peer,
index: notQueuedIndex,
}
// maxPendingPeers = 0 means no limits.
if t.maxPendingPeers == 0 || t.maxPendingPeers >= len(t.pendingPeers) {
return
}
var oldestPeer *peerInfo
for _, i := range t.pendingPeers {
if oldestPeer != nil && oldestPeer.discoveredTime < i.peerInfo.discoveredTime {
continue
}
oldestPeer = i.peerInfo
}
t.removeFromPendingPeers(oldestPeer.NodeID())
}
// addToQueue adds the passed peer to the queue if it is already pending.
func (t *TopicPool) addToQueue(peer *peerInfo) {
if p, ok := t.pendingPeers[peer.NodeID()]; ok {
if _, ok := t.discoveredPeers[peer.NodeID()]; ok {
return
}
heap.Push(&t.discoveredPeersQueue, p)
t.discoveredPeers[peer.NodeID()] = true
}
}
@ -116,6 +145,7 @@ func (t *TopicPool) popFromQueue() *peerInfo {
}
item := heap.Pop(&t.discoveredPeersQueue).(*peerInfoItem)
item.index = notQueuedIndex
delete(t.discoveredPeers, item.peerInfo.NodeID())
return item.peerInfo
}
@ -127,6 +157,7 @@ func (t *TopicPool) removeFromPendingPeers(nodeID enode.ID) {
delete(t.pendingPeers, nodeID)
if peer.index != notQueuedIndex {
heap.Remove(&t.discoveredPeersQueue, peer.index)
delete(t.discoveredPeers, nodeID)
}
}

View File

@ -6,6 +6,7 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discv5"
@ -340,6 +341,63 @@ func (s *TopicPoolSuite) TestMaxCachedPeers() {
s.Equal(3, len(cached))
}
func (s *TopicPoolSuite) TestMaxPendingPeers() {
s.topicPool.maxPendingPeers = 2
nodeID1, peer1 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
nodeID2, peer2 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
nodeID3, peer3 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
pk1, _ := peer1.ID.Pubkey()
pk2, _ := peer2.ID.Pubkey()
pk3, _ := peer3.ID.Pubkey()
s.topicPool.addToPendingPeers(&peerInfo{discoveredTime: mclock.Now(), node: peer1, publicKey: pk1})
s.topicPool.addToPendingPeers(&peerInfo{discoveredTime: mclock.Now(), node: peer2, publicKey: pk2})
s.topicPool.addToPendingPeers(&peerInfo{discoveredTime: mclock.Now(), node: peer3, publicKey: pk3})
s.Equal(2, len(s.topicPool.pendingPeers))
s.Require().NotContains(s.topicPool.pendingPeers, nodeID1)
s.Require().Contains(s.topicPool.pendingPeers, nodeID2)
s.Require().Contains(s.topicPool.pendingPeers, nodeID3)
// maxPendingPeers = 0 means no limits.
s.topicPool.maxPendingPeers = 0
s.topicPool.addToPendingPeers(&peerInfo{discoveredTime: mclock.Now(), node: peer1, publicKey: pk1})
s.Equal(3, len(s.topicPool.pendingPeers))
s.Require().Contains(s.topicPool.pendingPeers, nodeID1)
s.Require().Contains(s.topicPool.pendingPeers, nodeID2)
s.Require().Contains(s.topicPool.pendingPeers, nodeID3)
}
func (s *TopicPoolSuite) TestQueueDuplicatePeers() {
_, peer1 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
_, peer2 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
pk1, _ := peer1.ID.Pubkey()
pk2, _ := peer2.ID.Pubkey()
peerInfo1 := &peerInfo{discoveredTime: mclock.Now(), node: peer1, publicKey: pk1}
peerInfo2 := &peerInfo{discoveredTime: mclock.Now(), node: peer2, publicKey: pk2}
s.topicPool.addToPendingPeers(&peerInfo{discoveredTime: mclock.Now(), node: peer1, publicKey: pk1})
s.topicPool.addToPendingPeers(&peerInfo{discoveredTime: mclock.Now(), node: peer2, publicKey: pk2})
s.topicPool.addToQueue(peerInfo1)
s.topicPool.addToQueue(peerInfo2)
s.Equal(2, len(s.topicPool.discoveredPeersQueue))
s.Equal(2, len(s.topicPool.discoveredPeers))
s.topicPool.addToQueue(peerInfo1)
s.Equal(2, len(s.topicPool.discoveredPeersQueue))
s.Equal(2, len(s.topicPool.discoveredPeers))
peer := s.topicPool.popFromQueue()
s.Equal(1, len(s.topicPool.discoveredPeersQueue))
s.Equal(1, len(s.topicPool.discoveredPeers))
s.Require().NotContains(s.topicPool.discoveredPeers, peer.NodeID())
}
func (s *TopicPoolSuite) TestNewTopicPoolInterface() {
limits := params.NewLimits(1, 2)
cache, err := newInMemoryCache()