Add a priority queue to TopicPool (#840)

This commit is contained in:
Adam Babik 2018-04-19 17:18:49 +02:00 committed by GitHub
parent fa526e7444
commit a45b05969a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 302 additions and 60 deletions

View File

@ -53,10 +53,8 @@ func NewPeerPool(config map[discv5.Topic]params.Limits, fastSync, slowSync time.
type peerInfo struct {
// discoveredTime last time when node was found by v5
discoveredTime mclock.AbsTime
// connected is true if node is added as a static peer
connected bool
// requested is true when our node requested a disconnect
requested bool
// dismissed is true when our node requested a disconnect
dismissed bool
node *discv5.Node
}

View File

@ -0,0 +1,41 @@
package peers
import (
"container/heap"
)
type peerInfoItem struct {
*peerInfo
index int
}
type peerPriorityQueue []*peerInfoItem
var _ heap.Interface = (*peerPriorityQueue)(nil)
func (q peerPriorityQueue) Len() int { return len(q) }
func (q peerPriorityQueue) Less(i, j int) bool {
return q[i].discoveredTime > q[j].discoveredTime
}
func (q peerPriorityQueue) Swap(i, j int) {
q[i], q[j] = q[j], q[i]
q[i].index = i
q[j].index = j
}
func (q *peerPriorityQueue) Push(x interface{}) {
item := x.(*peerInfoItem)
item.index = len(*q)
*q = append(*q, item)
}
func (q *peerPriorityQueue) Pop() interface{} {
old := *q
n := len(old)
item := old[n-1]
item.index = -1
*q = old[0 : n-1]
return item
}

View File

@ -0,0 +1,79 @@
package peers
import (
"container/heap"
"math/rand"
"testing"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/common/mclock"
)
func TestPeerPriorityQueueSorting(t *testing.T) {
count := 5
discTimes := make([]mclock.AbsTime, count)
// generate a slice of monotonic times
for i := 0; i < count; i++ {
discTimes[i] = mclock.Now()
}
// shuffle discTimes
for i := range discTimes {
j := rand.Intn(i + 1)
discTimes[i], discTimes[j] = discTimes[j], discTimes[i]
}
// make a priority queue
q := make(peerPriorityQueue, count)
for i := 0; i < count; i++ {
q[i] = &peerInfoItem{
peerInfo: &peerInfo{
discoveredTime: discTimes[i],
},
}
}
heap.Init(&q)
// verify that the slice is sorted ascending by `discoveredTime`
var item *peerInfoItem
for q.Len() > 0 {
newItem := heap.Pop(&q).(*peerInfoItem)
if item != nil {
require.True(t, item.discoveredTime > newItem.discoveredTime)
}
item = newItem
}
}
func TestPeerPriorityQueueIndexUpdating(t *testing.T) {
q := make(peerPriorityQueue, 0)
heap.Init(&q)
item1 := &peerInfoItem{
index: -1,
peerInfo: &peerInfo{
discoveredTime: mclock.Now(),
},
}
item2 := &peerInfoItem{
index: -1,
peerInfo: &peerInfo{
discoveredTime: mclock.Now(),
},
}
// insert older item first
heap.Push(&q, item1)
require.Equal(t, item1.index, 0)
heap.Push(&q, item2)
require.Equal(t, item2.index, 0)
require.Equal(t, item1.index, 1)
// pop operation should reset index
poppedItem := heap.Pop(&q)
require.Equal(t, item2, poppedItem)
require.Equal(t, item2.index, -1)
require.Equal(t, item1.index, 0)
}

View File

@ -1,6 +1,7 @@
package peers
import (
"container/heap"
"sync"
"sync/atomic"
"time"
@ -15,13 +16,19 @@ import (
// NewTopicPool returns instance of TopicPool
func NewTopicPool(topic discv5.Topic, limits params.Limits, slowSync, fastSync time.Duration) *TopicPool {
return &TopicPool{
topic: topic,
limits: limits,
slowSync: slowSync,
fastSync: fastSync,
peers: map[discv5.NodeID]*peerInfo{},
pool := TopicPool{
topic: topic,
limits: limits,
slowSync: slowSync,
fastSync: fastSync,
peerPool: make(map[discv5.NodeID]*peerInfoItem),
peerPoolQueue: make(peerPriorityQueue, 0),
connectedPeers: make(map[discv5.NodeID]*peerInfo),
}
heap.Init(&pool.peerPoolQueue)
return &pool
}
// TopicPool manages peers for topic.
@ -37,13 +44,60 @@ type TopicPool struct {
mu sync.RWMutex
discWG sync.WaitGroup
consumerWG sync.WaitGroup
connected int
peers map[discv5.NodeID]*peerInfo
period chan time.Duration
peerPool map[discv5.NodeID]*peerInfoItem // found but not connected peers
peerPoolQueue peerPriorityQueue // priority queue to find the most recent peer
connectedPeers map[discv5.NodeID]*peerInfo // currently connected peers
cache *Cache
}
func (t *TopicPool) addToPeerPool(peer *peerInfo) {
if _, ok := t.peerPool[peer.node.ID]; ok {
return
}
item := &peerInfoItem{peerInfo: peer}
t.peerPool[peer.node.ID] = item
heap.Push(&t.peerPoolQueue, item)
}
func (t *TopicPool) removeFromPeerPool(nodeID discv5.NodeID) {
peer, ok := t.peerPool[nodeID]
if !ok {
return
}
delete(t.peerPool, nodeID)
heap.Remove(&t.peerPoolQueue, peer.index)
}
func (t *TopicPool) popFromPeerPool() *peerInfo {
if t.peerPoolQueue.Len() == 0 {
return nil
}
item := heap.Pop(&t.peerPoolQueue).(*peerInfoItem)
delete(t.peerPool, item.node.ID)
return item.peerInfo
}
func (t *TopicPool) updatePeerInPool(nodeID discv5.NodeID, time mclock.AbsTime) {
peer, ok := t.peerPool[nodeID]
if !ok {
return
}
peer.discoveredTime = mclock.Now()
heap.Fix(&t.peerPoolQueue, peer.index)
}
func (t *TopicPool) movePeerFromPoolToConnected(nodeID discv5.NodeID) {
peer, ok := t.peerPool[nodeID]
if !ok {
return
}
t.removeFromPeerPool(nodeID)
t.connectedPeers[nodeID] = peer.peerInfo
}
// SearchRunning returns true if search is running
func (t *TopicPool) SearchRunning() bool {
return atomic.LoadInt32(&t.running) == 1
@ -53,14 +107,14 @@ func (t *TopicPool) SearchRunning() bool {
func (t *TopicPool) MaxReached() bool {
t.mu.RLock()
defer t.mu.RUnlock()
return t.connected == t.limits[1]
return len(t.connectedPeers) == t.limits[1]
}
// BelowMin returns true if current number of peers is below min limit.
func (t *TopicPool) BelowMin() bool {
t.mu.RLock()
defer t.mu.RUnlock()
return t.connected < t.limits[0]
return len(t.connectedPeers) < t.limits[0]
}
// ConfirmAdded called when peer was added by p2p Server.
@ -73,31 +127,38 @@ func (t *TopicPool) BelowMin() bool {
func (t *TopicPool) ConfirmAdded(server *p2p.Server, nodeID discover.NodeID) {
t.mu.Lock()
defer t.mu.Unlock()
discV5NodeID := discv5.NodeID(nodeID)
// inbound connection
peer, exist := t.peers[discv5.NodeID(nodeID)]
peer, exist := t.peerPool[discV5NodeID]
if !exist {
return
}
// established connection means that the node is a viable candidate for a connection and can be cached
// established connection means that the node
// is a viable candidate for a connection and can be cached
if t.cache != nil {
if err := t.cache.AddPeer(peer.node, t.topic); err != nil {
log.Error("failed to persist a peer", "error", err)
}
}
// when max limit is reached drop every peer after
if t.connected == t.limits[1] {
// if the upper limit is already reached, drop this peer
if len(t.connectedPeers) == t.limits[1] {
log.Debug("max limit is reached drop the peer", "ID", nodeID, "topic", t.topic)
peer.requested = true
t.removePeer(server, peer)
peer.dismissed = true
t.removeServerPeer(server, peer.peerInfo)
return
}
// don't count same peer twice
if !peer.connected {
log.Debug("marking as connected", "ID", nodeID)
peer.connected = true
t.connected++
}
if t.SearchRunning() && t.connected == t.limits[0] {
// move peer from pool to connected peers
t.movePeerFromPoolToConnected(discV5NodeID)
// make sure `dismissed` is restarted
peer.dismissed = false
// when the lower limit is reached, we can switch to slow mode
if t.SearchRunning() && len(t.connectedPeers) == t.limits[0] {
t.period <- t.slowSync
}
}
@ -112,26 +173,44 @@ func (t *TopicPool) ConfirmAdded(server *p2p.Server, nodeID discover.NodeID) {
func (t *TopicPool) ConfirmDropped(server *p2p.Server, nodeID discover.NodeID) bool {
t.mu.Lock()
defer t.mu.Unlock()
discV5NodeID := discv5.NodeID(nodeID)
// either inbound or connected from another topic
peer, exist := t.peers[discv5.NodeID(nodeID)]
peer, exist := t.connectedPeers[discV5NodeID]
if !exist {
return false
}
log.Debug("disconnect", "ID", nodeID)
if peer.requested {
log.Debug("disconnect", "ID", nodeID, "dismissed", peer.dismissed)
// Peer was removed by us because exceeded the limit.
// Add it back to the pool as it can be useful in the future.
if peer.dismissed {
t.addToPeerPool(peer)
return false
}
if t.SearchRunning() && t.connected == t.limits[0] {
// switch to fast mode as the number of connected peers is about to drop
// below the lower limit
if t.SearchRunning() && len(t.connectedPeers) == t.limits[0] {
t.period <- t.fastSync
}
t.connected--
t.removePeer(server, peer)
delete(t.peers, discv5.NodeID(nodeID))
// If there was a network error, this event will be received
// but the peer won't be removed from the static nodes set.
// That's why we need to call `removeServerPeer` manually.
t.removeServerPeer(server, peer)
delete(t.connectedPeers, discV5NodeID)
// remove from cache only if the peer dropped by itself
if t.cache != nil {
if err := t.cache.RemovePeer(discv5.NodeID(nodeID), t.topic); err != nil {
if err := t.cache.RemovePeer(discV5NodeID, t.topic); err != nil {
log.Error("failed to remove peer from cache", "error", err)
}
}
return true
}
@ -139,13 +218,17 @@ func (t *TopicPool) ConfirmDropped(server *p2p.Server, nodeID discover.NodeID) b
func (t *TopicPool) AddPeerFromTable(server *p2p.Server) *discv5.Node {
t.mu.RLock()
defer t.mu.RUnlock()
// TODO use a heap queue and always get a peer that was discovered recently
for _, peer := range t.peers {
if !peer.connected && mclock.Now() < peer.discoveredTime+mclock.AbsTime(expirationPeriod) {
t.addPeer(server, peer)
return peer.node
}
// The most recently added peer is removed from the queue.
// If it did not expire yet, it will be added to the server.
// TODO(adam): investigate if it's worth to keep the peer in the queue
// until the server confirms it is added and in the meanwhile only adjust its priority.
peer := t.popFromPeerPool()
if peer != nil && mclock.Now() < peer.discoveredTime+mclock.AbsTime(expirationPeriod) {
t.addServerPeer(server, peer)
return peer.node
}
return nil
}
@ -184,7 +267,7 @@ func (t *TopicPool) StartSearch(server *p2p.Server) error {
}
func (t *TopicPool) handleFoundPeers(server *p2p.Server, found <-chan *discv5.Node, lookup <-chan bool) {
if t.connected >= t.limits[0] {
if len(t.connectedPeers) >= t.limits[0] {
t.period <- t.slowSync
} else {
t.period <- t.fastSync
@ -211,21 +294,31 @@ func (t *TopicPool) handleFoundPeers(server *p2p.Server, found <-chan *discv5.No
func (t *TopicPool) processFoundNode(server *p2p.Server, node *discv5.Node) {
t.mu.Lock()
defer t.mu.Unlock()
if info, exist := t.peers[node.ID]; exist {
info.discoveredTime = mclock.Now()
log.Debug("peer found", "ID", node.ID, "topic", t.topic)
// peer is already connected so update only discoveredTime
if peer, ok := t.connectedPeers[node.ID]; ok {
peer.discoveredTime = mclock.Now()
return
}
if _, ok := t.peerPool[node.ID]; ok {
t.updatePeerInPool(node.ID, mclock.Now())
} else {
t.peers[node.ID] = &peerInfo{
t.addToPeerPool(&peerInfo{
discoveredTime: mclock.Now(),
node: node,
}
})
}
if t.connected < t.limits[1] && !t.peers[node.ID].connected {
log.Debug("peer found", "ID", node.ID, "topic", t.topic)
t.addPeer(server, t.peers[node.ID])
// the upper limit is not reached, so let's add this peer
if len(t.connectedPeers) < t.limits[1] {
t.addServerPeer(server, t.peerPool[node.ID].peerInfo)
}
}
func (t *TopicPool) addPeer(server *p2p.Server, info *peerInfo) {
func (t *TopicPool) addServerPeer(server *p2p.Server, info *peerInfo) {
server.AddPeer(discover.NewNode(
discover.NodeID(info.node.ID),
info.node.IP,
@ -234,7 +327,7 @@ func (t *TopicPool) addPeer(server *p2p.Server, info *peerInfo) {
))
}
func (t *TopicPool) removePeer(server *p2p.Server, info *peerInfo) {
func (t *TopicPool) removeServerPeer(server *p2p.Server, info *peerInfo) {
server.RemovePeer(discover.NewNode(
discover.NodeID(info.node.ID),
info.node.IP,

View File

@ -62,7 +62,7 @@ func (s *TopicPoolSuite) TestSyncSwitches() {
s.topicPool.processFoundNode(s.peer, testPeer)
s.topicPool.ConfirmAdded(s.peer, discover.NodeID(testPeer.ID))
s.AssertConsumed(s.topicPool.period, s.topicPool.slowSync, time.Second)
s.True(s.topicPool.peers[testPeer.ID].connected)
s.NotNil(s.topicPool.connectedPeers[testPeer.ID])
s.topicPool.ConfirmDropped(s.peer, discover.NodeID(testPeer.ID))
s.AssertConsumed(s.topicPool.period, s.topicPool.fastSync, time.Second)
}
@ -75,15 +75,23 @@ func (s *TopicPoolSuite) TestNewPeerSelectedOnDrop() {
s.topicPool.processFoundNode(s.peer, peer1)
s.topicPool.processFoundNode(s.peer, peer2)
s.topicPool.processFoundNode(s.peer, peer3)
s.Len(s.topicPool.peerPool, 3)
s.Len(s.topicPool.peerPoolQueue, 3)
s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer1.ID))
s.True(s.topicPool.peers[peer1.ID].connected)
s.Contains(s.topicPool.connectedPeers, peer1.ID)
s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer2.ID))
s.True(s.topicPool.peers[peer2.ID].connected)
s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer3.ID))
s.False(s.topicPool.peers[peer3.ID].connected)
s.Contains(s.topicPool.connectedPeers, peer2.ID)
s.Len(s.topicPool.peerPool, 1)
s.Len(s.topicPool.peerPoolQueue, 1)
// drop peer1
s.True(s.topicPool.ConfirmDropped(s.peer, discover.NodeID(peer1.ID)))
s.NotContains(s.topicPool.connectedPeers, peer1.ID)
// add peer from the pool
s.Equal(peer3.ID, s.topicPool.AddPeerFromTable(s.peer).ID)
s.Len(s.topicPool.peerPool, 0)
s.Len(s.topicPool.peerPoolQueue, 0)
}
func (s *TopicPoolSuite) TestRequestedDoesntRemove() {
@ -96,10 +104,33 @@ func (s *TopicPoolSuite) TestRequestedDoesntRemove() {
s.topicPool.processFoundNode(s.peer, peer2)
s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer1.ID))
s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer2.ID))
s.False(s.topicPool.peers[peer1.ID].requested)
s.True(s.topicPool.peers[peer2.ID].requested)
s.False(s.topicPool.connectedPeers[peer1.ID].dismissed)
s.True(s.topicPool.peerPool[peer2.ID].dismissed)
s.topicPool.ConfirmDropped(s.peer, discover.NodeID(peer2.ID))
s.Contains(s.topicPool.peers, peer2.ID)
s.Contains(s.topicPool.peerPool, peer2.ID)
s.NotContains(s.topicPool.connectedPeers, peer2.ID)
s.topicPool.ConfirmDropped(s.peer, discover.NodeID(peer1.ID))
s.NotContains(s.topicPool.peers, peer1.ID)
s.NotContains(s.topicPool.peerPool, peer1.ID)
s.NotContains(s.topicPool.connectedPeers, peer1.ID)
}
func (s *TopicPoolSuite) TestTheMostRecentPeerIsSelected() {
s.topicPool.limits = params.Limits{1, 1}
peer1 := discv5.NewNode(discv5.NodeID{1}, s.peer.Self().IP, 32311, 32311)
peer2 := discv5.NewNode(discv5.NodeID{2}, s.peer.Self().IP, 32311, 32311)
peer3 := discv5.NewNode(discv5.NodeID{3}, s.peer.Self().IP, 32311, 32311)
// after these operations, peer1 is confirmed and peer3 and peer2
// was added to the pool; peer3 is the most recent one
s.topicPool.processFoundNode(s.peer, peer1)
s.topicPool.processFoundNode(s.peer, peer2)
s.topicPool.processFoundNode(s.peer, peer3)
s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer1.ID))
// peer1 has dropped
s.topicPool.ConfirmDropped(s.peer, discover.NodeID(peer1.ID))
// and peer3 is take from the pool as the most recent
s.True(s.topicPool.peerPool[peer2.ID].discoveredTime < s.topicPool.peerPool[peer3.ID].discoveredTime)
s.Equal(peer3.ID, s.topicPool.AddPeerFromTable(s.peer).ID)
}