Heap queue stores only peers that were not added to p2p server (#979)

* Heap queue stores only peers that were not added to p2p server

The primary goal of this change is to keep whitelist of peers
that are managed by topic pool while also preventing same peer
from being selected from heap queue multiple times.
This commit is contained in:
Dmitry Shulyak 2018-05-22 16:11:21 +03:00 committed by GitHub
parent 4dfbef3867
commit 345b152a8b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 137 additions and 59 deletions

View File

@ -14,21 +14,26 @@ import (
"github.com/status-im/status-go/geth/params" "github.com/status-im/status-go/geth/params"
) )
const (
// notQueuedIndex used to define that item is not queued in the heap queue.
notQueuedIndex = -1
)
// NewTopicPool returns instance of TopicPool // NewTopicPool returns instance of TopicPool
func NewTopicPool(topic discv5.Topic, limits params.Limits, slowMode, fastMode time.Duration, cache *Cache) *TopicPool { func NewTopicPool(topic discv5.Topic, limits params.Limits, slowMode, fastMode time.Duration, cache *Cache) *TopicPool {
pool := TopicPool{ pool := TopicPool{
topic: topic, topic: topic,
limits: limits, limits: limits,
fastMode: fastMode, fastMode: fastMode,
slowMode: slowMode, slowMode: slowMode,
fastModeTimeout: DefaultTopicFastModeTimeout, fastModeTimeout: DefaultTopicFastModeTimeout,
peerPool: make(map[discv5.NodeID]*peerInfoItem), pendingPeers: make(map[discv5.NodeID]*peerInfoItem),
peerPoolQueue: make(peerPriorityQueue, 0), discoveredPeersQueue: make(peerPriorityQueue, 0),
connectedPeers: make(map[discv5.NodeID]*peerInfo), connectedPeers: make(map[discv5.NodeID]*peerInfo),
cache: cache, cache: cache,
} }
heap.Init(&pool.peerPoolQueue) heap.Init(&pool.discoveredPeersQueue)
return &pool return &pool
} }
@ -53,55 +58,67 @@ type TopicPool struct {
period chan time.Duration period chan time.Duration
fastModeTimeoutCancel chan struct{} fastModeTimeoutCancel chan struct{}
peerPool map[discv5.NodeID]*peerInfoItem // found but not connected peers pendingPeers map[discv5.NodeID]*peerInfoItem // contains found and requested to be connected peers but not confirmed
peerPoolQueue peerPriorityQueue // priority queue to find the most recent peer discoveredPeersQueue peerPriorityQueue // priority queue to find the most recently discovered peers; does not containt peers requested to connect
connectedPeers map[discv5.NodeID]*peerInfo // currently connected peers connectedPeers map[discv5.NodeID]*peerInfo // currently connected peers
cache *Cache cache *Cache
} }
func (t *TopicPool) addToPeerPool(peer *peerInfo) { func (t *TopicPool) addToPendingPeers(peer *peerInfo) {
if _, ok := t.peerPool[peer.node.ID]; ok { if _, ok := t.pendingPeers[peer.node.ID]; ok {
return return
} }
item := &peerInfoItem{peerInfo: peer} t.pendingPeers[peer.node.ID] = &peerInfoItem{
t.peerPool[peer.node.ID] = item peerInfo: peer,
heap.Push(&t.peerPoolQueue, item) index: notQueuedIndex,
}
func (t *TopicPool) removeFromPeerPool(nodeID discv5.NodeID) {
peer, ok := t.peerPool[nodeID]
if !ok {
return
} }
delete(t.peerPool, nodeID)
heap.Remove(&t.peerPoolQueue, peer.index)
} }
func (t *TopicPool) popFromPeerPool() *peerInfo { // addToQueue adds the passed peer to the queue if it is already pending.
if t.peerPoolQueue.Len() == 0 { func (t *TopicPool) addToQueue(peer *peerInfo) {
if p, ok := t.pendingPeers[peer.node.ID]; ok {
heap.Push(&t.discoveredPeersQueue, p)
}
}
func (t *TopicPool) popFromQueue() *peerInfo {
if t.discoveredPeersQueue.Len() == 0 {
return nil return nil
} }
item := heap.Pop(&t.peerPoolQueue).(*peerInfoItem) item := heap.Pop(&t.discoveredPeersQueue).(*peerInfoItem)
delete(t.peerPool, item.node.ID) item.index = notQueuedIndex
return item.peerInfo return item.peerInfo
} }
func (t *TopicPool) updatePeerInPool(nodeID discv5.NodeID, time mclock.AbsTime) { func (t *TopicPool) removeFromPendingPeers(nodeID discv5.NodeID) {
peer, ok := t.peerPool[nodeID] peer, ok := t.pendingPeers[nodeID]
if !ok {
return
}
delete(t.pendingPeers, nodeID)
if peer.index != notQueuedIndex {
heap.Remove(&t.discoveredPeersQueue, peer.index)
}
}
func (t *TopicPool) updatePendingPeer(nodeID discv5.NodeID, time mclock.AbsTime) {
peer, ok := t.pendingPeers[nodeID]
if !ok { if !ok {
return return
} }
peer.discoveredTime = mclock.Now() peer.discoveredTime = mclock.Now()
heap.Fix(&t.peerPoolQueue, peer.index) if peer.index != notQueuedIndex {
heap.Fix(&t.discoveredPeersQueue, peer.index)
}
} }
func (t *TopicPool) movePeerFromPoolToConnected(nodeID discv5.NodeID) { func (t *TopicPool) movePeerFromPoolToConnected(nodeID discv5.NodeID) {
peer, ok := t.peerPool[nodeID] peer, ok := t.pendingPeers[nodeID]
if !ok { if !ok {
return return
} }
t.removeFromPeerPool(nodeID) t.removeFromPendingPeers(nodeID)
t.connectedPeers[nodeID] = peer.peerInfo t.connectedPeers[nodeID] = peer.peerInfo
} }
@ -192,10 +209,11 @@ func (t *TopicPool) ConfirmAdded(server *p2p.Server, nodeID discover.NodeID) {
discV5NodeID := discv5.NodeID(nodeID) discV5NodeID := discv5.NodeID(nodeID)
// inbound connection // inbound connection
peer, exist := t.peerPool[discV5NodeID] peerInfoItem, ok := t.pendingPeers[discV5NodeID]
if !exist { if !ok {
return return
} }
peer := peerInfoItem.peerInfo // get explicit reference
// established connection means that the node // established connection means that the node
// is a viable candidate for a connection and can be cached // is a viable candidate for a connection and can be cached
@ -203,15 +221,15 @@ func (t *TopicPool) ConfirmAdded(server *p2p.Server, nodeID discover.NodeID) {
log.Error("failed to persist a peer", "error", err) log.Error("failed to persist a peer", "error", err)
} }
t.movePeerFromPoolToConnected(discV5NodeID)
// if the upper limit is already reached, drop this peer // if the upper limit is already reached, drop this peer
if len(t.connectedPeers) == t.limits.Max { if len(t.connectedPeers) > t.limits.Max {
log.Debug("max limit is reached drop the peer", "ID", nodeID, "topic", t.topic) log.Debug("max limit is reached drop the peer", "ID", nodeID, "topic", t.topic)
peer.dismissed = true peer.dismissed = true
t.removeServerPeer(server, peer.peerInfo) t.removeServerPeer(server, peer)
return return
} }
t.movePeerFromPoolToConnected(discV5NodeID)
// make sure `dismissed` is reset // make sure `dismissed` is reset
peer.dismissed = false peer.dismissed = false
@ -242,10 +260,13 @@ func (t *TopicPool) ConfirmDropped(server *p2p.Server, nodeID discover.NodeID) b
log.Debug("disconnect", "ID", nodeID, "dismissed", peer.dismissed) log.Debug("disconnect", "ID", nodeID, "dismissed", peer.dismissed)
delete(t.connectedPeers, discV5NodeID)
// Peer was removed by us because exceeded the limit. // Peer was removed by us because exceeded the limit.
// Add it back to the pool as it can be useful in the future. // Add it back to the pool as it can be useful in the future.
if peer.dismissed { if peer.dismissed {
t.addToPeerPool(peer) t.addToPendingPeers(peer)
// use queue for peers that weren't added to p2p server
t.addToQueue(peer)
return false return false
} }
@ -254,7 +275,6 @@ func (t *TopicPool) ConfirmDropped(server *p2p.Server, nodeID discover.NodeID) b
// That's why we need to call `removeServerPeer` manually. // That's why we need to call `removeServerPeer` manually.
t.removeServerPeer(server, peer) t.removeServerPeer(server, peer)
delete(t.connectedPeers, discV5NodeID)
if err := t.cache.RemovePeer(discV5NodeID, t.topic); err != nil { if err := t.cache.RemovePeer(discV5NodeID, t.topic); err != nil {
log.Error("failed to remove peer from cache", "error", err) log.Error("failed to remove peer from cache", "error", err)
} }
@ -276,7 +296,7 @@ func (t *TopicPool) AddPeerFromTable(server *p2p.Server) *discv5.Node {
// If it did not expire yet, it will be added to the server. // If it did not expire yet, it will be added to the server.
// TODO(adam): investigate if it's worth to keep the peer in the queue // TODO(adam): investigate if it's worth to keep the peer in the queue
// until the server confirms it is added and in the meanwhile only adjust its priority. // until the server confirms it is added and in the meanwhile only adjust its priority.
peer := t.popFromPeerPool() peer := t.popFromQueue()
if peer != nil && mclock.Now() < peer.discoveredTime+mclock.AbsTime(expirationPeriod) { if peer != nil && mclock.Now() < peer.discoveredTime+mclock.AbsTime(expirationPeriod) {
t.addServerPeer(server, peer) t.addServerPeer(server, peer)
return peer.node return peer.node
@ -360,10 +380,10 @@ func (t *TopicPool) processFoundNode(server *p2p.Server, node *discv5.Node) {
return return
} }
if _, ok := t.peerPool[node.ID]; ok { if _, ok := t.pendingPeers[node.ID]; ok {
t.updatePeerInPool(node.ID, mclock.Now()) t.updatePendingPeer(node.ID, mclock.Now())
} else { } else {
t.addToPeerPool(&peerInfo{ t.addToPendingPeers(&peerInfo{
discoveredTime: mclock.Now(), discoveredTime: mclock.Now(),
node: node, node: node,
}) })
@ -371,7 +391,9 @@ func (t *TopicPool) processFoundNode(server *p2p.Server, node *discv5.Node) {
// the upper limit is not reached, so let's add this peer // the upper limit is not reached, so let's add this peer
if len(t.connectedPeers) < t.limits.Max { if len(t.connectedPeers) < t.limits.Max {
t.addServerPeer(server, t.peerPool[node.ID].peerInfo) t.addServerPeer(server, t.pendingPeers[node.ID].peerInfo)
} else {
t.addToQueue(t.pendingPeers[node.ID].peerInfo)
} }
} }

View File

@ -130,23 +130,24 @@ func (s *TopicPoolSuite) TestNewPeerSelectedOnDrop() {
s.topicPool.processFoundNode(s.peer, peer1) s.topicPool.processFoundNode(s.peer, peer1)
s.topicPool.processFoundNode(s.peer, peer2) s.topicPool.processFoundNode(s.peer, peer2)
s.topicPool.processFoundNode(s.peer, peer3) s.topicPool.processFoundNode(s.peer, peer3)
s.Len(s.topicPool.peerPool, 3) s.Len(s.topicPool.pendingPeers, 3)
s.Len(s.topicPool.peerPoolQueue, 3) s.Len(s.topicPool.discoveredPeersQueue, 0)
s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer1.ID)) s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer1.ID))
s.Contains(s.topicPool.connectedPeers, peer1.ID) s.Contains(s.topicPool.connectedPeers, peer1.ID)
s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer2.ID)) s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer2.ID))
s.Contains(s.topicPool.connectedPeers, peer2.ID) s.Contains(s.topicPool.connectedPeers, peer2.ID)
s.Len(s.topicPool.peerPool, 1) s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer3.ID))
s.Len(s.topicPool.peerPoolQueue, 1) s.topicPool.ConfirmDropped(s.peer, discover.NodeID(peer3.ID))
s.Contains(s.topicPool.pendingPeers, peer3.ID)
s.Len(s.topicPool.pendingPeers, 1)
s.Len(s.topicPool.discoveredPeersQueue, 1)
// drop peer1 // drop peer1
s.True(s.topicPool.ConfirmDropped(s.peer, discover.NodeID(peer1.ID))) s.True(s.topicPool.ConfirmDropped(s.peer, discover.NodeID(peer1.ID)))
s.NotContains(s.topicPool.connectedPeers, peer1.ID) s.NotContains(s.topicPool.connectedPeers, peer1.ID)
// add peer from the pool // add peer from the pool
s.Equal(peer3.ID, s.topicPool.AddPeerFromTable(s.peer).ID) s.Equal(peer3.ID, s.topicPool.AddPeerFromTable(s.peer).ID)
s.Len(s.topicPool.peerPool, 0) s.Len(s.topicPool.pendingPeers, 1)
s.Len(s.topicPool.peerPoolQueue, 0) s.Len(s.topicPool.discoveredPeersQueue, 0)
} }
func (s *TopicPoolSuite) TestRequestedDoesntRemove() { func (s *TopicPoolSuite) TestRequestedDoesntRemove() {
@ -160,12 +161,12 @@ func (s *TopicPoolSuite) TestRequestedDoesntRemove() {
s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer1.ID)) s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer1.ID))
s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer2.ID)) s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer2.ID))
s.False(s.topicPool.connectedPeers[peer1.ID].dismissed) s.False(s.topicPool.connectedPeers[peer1.ID].dismissed)
s.True(s.topicPool.peerPool[peer2.ID].dismissed) s.True(s.topicPool.connectedPeers[peer2.ID].dismissed)
s.topicPool.ConfirmDropped(s.peer, discover.NodeID(peer2.ID)) s.topicPool.ConfirmDropped(s.peer, discover.NodeID(peer2.ID))
s.Contains(s.topicPool.peerPool, peer2.ID) s.Contains(s.topicPool.pendingPeers, peer2.ID)
s.NotContains(s.topicPool.connectedPeers, peer2.ID) s.NotContains(s.topicPool.connectedPeers, peer2.ID)
s.topicPool.ConfirmDropped(s.peer, discover.NodeID(peer1.ID)) s.topicPool.ConfirmDropped(s.peer, discover.NodeID(peer1.ID))
s.NotContains(s.topicPool.peerPool, peer1.ID) s.NotContains(s.topicPool.pendingPeers, peer1.ID)
s.NotContains(s.topicPool.connectedPeers, peer1.ID) s.NotContains(s.topicPool.connectedPeers, peer1.ID)
} }
@ -182,10 +183,65 @@ func (s *TopicPoolSuite) TestTheMostRecentPeerIsSelected() {
s.topicPool.processFoundNode(s.peer, peer2) s.topicPool.processFoundNode(s.peer, peer2)
s.topicPool.processFoundNode(s.peer, peer3) s.topicPool.processFoundNode(s.peer, peer3)
s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer1.ID)) s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer1.ID))
s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer2.ID))
s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer3.ID))
s.topicPool.ConfirmDropped(s.peer, discover.NodeID(peer2.ID))
s.topicPool.ConfirmDropped(s.peer, discover.NodeID(peer3.ID))
// peer1 has dropped // peer1 has dropped
s.topicPool.ConfirmDropped(s.peer, discover.NodeID(peer1.ID)) s.topicPool.ConfirmDropped(s.peer, discover.NodeID(peer1.ID))
// and peer3 is take from the pool as the most recent // and peer3 is take from the pool as the most recent
s.True(s.topicPool.peerPool[peer2.ID].discoveredTime < s.topicPool.peerPool[peer3.ID].discoveredTime) s.True(s.topicPool.pendingPeers[peer2.ID].discoveredTime < s.topicPool.pendingPeers[peer3.ID].discoveredTime)
s.Equal(peer3.ID, s.topicPool.AddPeerFromTable(s.peer).ID) s.Equal(peer3.ID, s.topicPool.AddPeerFromTable(s.peer).ID)
} }
func (s *TopicPoolSuite) TestSelectPeerAfterMaxLimit() {
s.topicPool.limits = params.NewLimits(1, 1)
peer1 := discv5.NewNode(discv5.NodeID{1}, s.peer.Self().IP, 32311, 32311)
peer2 := discv5.NewNode(discv5.NodeID{2}, s.peer.Self().IP, 32311, 32311)
peer3 := discv5.NewNode(discv5.NodeID{3}, s.peer.Self().IP, 32311, 32311)
s.topicPool.processFoundNode(s.peer, peer1)
s.topicPool.processFoundNode(s.peer, peer2)
s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer1.ID))
s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer2.ID))
s.topicPool.ConfirmDropped(s.peer, discover.NodeID(peer2.ID))
s.Len(s.topicPool.pendingPeers, 1)
s.Contains(s.topicPool.pendingPeers, peer2.ID)
s.topicPool.processFoundNode(s.peer, peer3)
s.Len(s.topicPool.pendingPeers, 2)
s.Contains(s.topicPool.pendingPeers, peer3.ID)
s.Equal(peer3, s.topicPool.AddPeerFromTable(s.peer))
}
func (s *TopicPoolSuite) TestReplacementPeerIsCounted() {
s.topicPool.limits = params.NewLimits(1, 1)
peer1 := discv5.NewNode(discv5.NodeID{1}, s.peer.Self().IP, 32311, 32311)
peer2 := discv5.NewNode(discv5.NodeID{2}, s.peer.Self().IP, 32311, 32311)
s.topicPool.processFoundNode(s.peer, peer1)
s.topicPool.processFoundNode(s.peer, peer2)
s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer1.ID))
s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer2.ID))
s.topicPool.ConfirmDropped(s.peer, discover.NodeID(peer2.ID))
s.topicPool.ConfirmDropped(s.peer, discover.NodeID(peer1.ID))
s.NotContains(s.topicPool.pendingPeers, peer1.ID)
s.NotContains(s.topicPool.connectedPeers, peer1.ID)
s.Contains(s.topicPool.pendingPeers, peer2.ID)
s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer2.ID))
s.True(s.topicPool.MaxReached())
}
func (s *TopicPoolSuite) TestPeerDontAddTwice() {
s.topicPool.limits = params.NewLimits(1, 1)
peer1 := discv5.NewNode(discv5.NodeID{1}, s.peer.Self().IP, 32311, 32311)
peer2 := discv5.NewNode(discv5.NodeID{2}, s.peer.Self().IP, 32311, 32311)
s.topicPool.processFoundNode(s.peer, peer1)
s.topicPool.processFoundNode(s.peer, peer2)
s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer1.ID))
// peer2 already added to p2p server no reason to add it again
s.Nil(s.topicPool.AddPeerFromTable(s.peer))
}