From a45b05969a7e1a2f83e3a064e4b6b29d7bb2f04a Mon Sep 17 00:00:00 2001 From: Adam Babik Date: Thu, 19 Apr 2018 17:18:49 +0200 Subject: [PATCH] Add a priority queue to TopicPool (#840) --- geth/peers/peerpool.go | 6 +- geth/peers/topic_peer_queue.go | 41 ++++++ geth/peers/topic_peer_queue_test.go | 79 ++++++++++++ geth/peers/topicpool.go | 187 +++++++++++++++++++++------- geth/peers/topicpool_test.go | 49 ++++++-- 5 files changed, 302 insertions(+), 60 deletions(-) create mode 100644 geth/peers/topic_peer_queue.go create mode 100644 geth/peers/topic_peer_queue_test.go diff --git a/geth/peers/peerpool.go b/geth/peers/peerpool.go index 5826924ec..defdb5259 100644 --- a/geth/peers/peerpool.go +++ b/geth/peers/peerpool.go @@ -53,10 +53,8 @@ func NewPeerPool(config map[discv5.Topic]params.Limits, fastSync, slowSync time. type peerInfo struct { // discoveredTime last time when node was found by v5 discoveredTime mclock.AbsTime - // connected is true if node is added as a static peer - connected bool - // requested is true when our node requested a disconnect - requested bool + // dismissed is true when our node requested a disconnect + dismissed bool node *discv5.Node } diff --git a/geth/peers/topic_peer_queue.go b/geth/peers/topic_peer_queue.go new file mode 100644 index 000000000..3e1da5010 --- /dev/null +++ b/geth/peers/topic_peer_queue.go @@ -0,0 +1,41 @@ +package peers + +import ( + "container/heap" +) + +type peerInfoItem struct { + *peerInfo + index int +} + +type peerPriorityQueue []*peerInfoItem + +var _ heap.Interface = (*peerPriorityQueue)(nil) + +func (q peerPriorityQueue) Len() int { return len(q) } + +func (q peerPriorityQueue) Less(i, j int) bool { + return q[i].discoveredTime > q[j].discoveredTime +} + +func (q peerPriorityQueue) Swap(i, j int) { + q[i], q[j] = q[j], q[i] + q[i].index = i + q[j].index = j +} + +func (q *peerPriorityQueue) Push(x interface{}) { + item := x.(*peerInfoItem) + item.index = len(*q) + *q = append(*q, item) +} + +func (q *peerPriorityQueue) Pop() interface{} { + old := *q + n := len(old) + item := old[n-1] + item.index = -1 + *q = old[0 : n-1] + return item +} diff --git a/geth/peers/topic_peer_queue_test.go b/geth/peers/topic_peer_queue_test.go new file mode 100644 index 000000000..4fb6ab705 --- /dev/null +++ b/geth/peers/topic_peer_queue_test.go @@ -0,0 +1,79 @@ +package peers + +import ( + "container/heap" + "math/rand" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/ethereum/go-ethereum/common/mclock" +) + +func TestPeerPriorityQueueSorting(t *testing.T) { + count := 5 + discTimes := make([]mclock.AbsTime, count) + + // generate a slice of monotonic times + for i := 0; i < count; i++ { + discTimes[i] = mclock.Now() + } + + // shuffle discTimes + for i := range discTimes { + j := rand.Intn(i + 1) + discTimes[i], discTimes[j] = discTimes[j], discTimes[i] + } + + // make a priority queue + q := make(peerPriorityQueue, count) + for i := 0; i < count; i++ { + q[i] = &peerInfoItem{ + peerInfo: &peerInfo{ + discoveredTime: discTimes[i], + }, + } + } + heap.Init(&q) + + // verify that the slice is sorted ascending by `discoveredTime` + var item *peerInfoItem + for q.Len() > 0 { + newItem := heap.Pop(&q).(*peerInfoItem) + if item != nil { + require.True(t, item.discoveredTime > newItem.discoveredTime) + } + item = newItem + } +} + +func TestPeerPriorityQueueIndexUpdating(t *testing.T) { + q := make(peerPriorityQueue, 0) + heap.Init(&q) + + item1 := &peerInfoItem{ + index: -1, + peerInfo: &peerInfo{ + discoveredTime: mclock.Now(), + }, + } + item2 := &peerInfoItem{ + index: -1, + peerInfo: &peerInfo{ + discoveredTime: mclock.Now(), + }, + } + + // insert older item first + heap.Push(&q, item1) + require.Equal(t, item1.index, 0) + heap.Push(&q, item2) + require.Equal(t, item2.index, 0) + require.Equal(t, item1.index, 1) + + // pop operation should reset index + poppedItem := heap.Pop(&q) + require.Equal(t, item2, poppedItem) + require.Equal(t, item2.index, -1) + require.Equal(t, item1.index, 0) +} diff --git a/geth/peers/topicpool.go b/geth/peers/topicpool.go index 2c4ede8a4..438ec8601 100644 --- a/geth/peers/topicpool.go +++ b/geth/peers/topicpool.go @@ -1,6 +1,7 @@ package peers import ( + "container/heap" "sync" "sync/atomic" "time" @@ -15,13 +16,19 @@ import ( // NewTopicPool returns instance of TopicPool func NewTopicPool(topic discv5.Topic, limits params.Limits, slowSync, fastSync time.Duration) *TopicPool { - return &TopicPool{ - topic: topic, - limits: limits, - slowSync: slowSync, - fastSync: fastSync, - peers: map[discv5.NodeID]*peerInfo{}, + pool := TopicPool{ + topic: topic, + limits: limits, + slowSync: slowSync, + fastSync: fastSync, + peerPool: make(map[discv5.NodeID]*peerInfoItem), + peerPoolQueue: make(peerPriorityQueue, 0), + connectedPeers: make(map[discv5.NodeID]*peerInfo), } + + heap.Init(&pool.peerPoolQueue) + + return &pool } // TopicPool manages peers for topic. @@ -37,13 +44,60 @@ type TopicPool struct { mu sync.RWMutex discWG sync.WaitGroup consumerWG sync.WaitGroup - connected int - peers map[discv5.NodeID]*peerInfo period chan time.Duration + peerPool map[discv5.NodeID]*peerInfoItem // found but not connected peers + peerPoolQueue peerPriorityQueue // priority queue to find the most recent peer + connectedPeers map[discv5.NodeID]*peerInfo // currently connected peers + cache *Cache } +func (t *TopicPool) addToPeerPool(peer *peerInfo) { + if _, ok := t.peerPool[peer.node.ID]; ok { + return + } + item := &peerInfoItem{peerInfo: peer} + t.peerPool[peer.node.ID] = item + heap.Push(&t.peerPoolQueue, item) +} + +func (t *TopicPool) removeFromPeerPool(nodeID discv5.NodeID) { + peer, ok := t.peerPool[nodeID] + if !ok { + return + } + delete(t.peerPool, nodeID) + heap.Remove(&t.peerPoolQueue, peer.index) +} + +func (t *TopicPool) popFromPeerPool() *peerInfo { + if t.peerPoolQueue.Len() == 0 { + return nil + } + item := heap.Pop(&t.peerPoolQueue).(*peerInfoItem) + delete(t.peerPool, item.node.ID) + return item.peerInfo +} + +func (t *TopicPool) updatePeerInPool(nodeID discv5.NodeID, time mclock.AbsTime) { + peer, ok := t.peerPool[nodeID] + if !ok { + return + } + peer.discoveredTime = mclock.Now() + heap.Fix(&t.peerPoolQueue, peer.index) +} + +func (t *TopicPool) movePeerFromPoolToConnected(nodeID discv5.NodeID) { + peer, ok := t.peerPool[nodeID] + if !ok { + return + } + t.removeFromPeerPool(nodeID) + t.connectedPeers[nodeID] = peer.peerInfo +} + // SearchRunning returns true if search is running func (t *TopicPool) SearchRunning() bool { return atomic.LoadInt32(&t.running) == 1 @@ -53,14 +107,14 @@ func (t *TopicPool) SearchRunning() bool { func (t *TopicPool) MaxReached() bool { t.mu.RLock() defer t.mu.RUnlock() - return t.connected == t.limits[1] + return len(t.connectedPeers) == t.limits[1] } // BelowMin returns true if current number of peers is below min limit. func (t *TopicPool) BelowMin() bool { t.mu.RLock() defer t.mu.RUnlock() - return t.connected < t.limits[0] + return len(t.connectedPeers) < t.limits[0] } // ConfirmAdded called when peer was added by p2p Server. @@ -73,31 +127,38 @@ func (t *TopicPool) BelowMin() bool { func (t *TopicPool) ConfirmAdded(server *p2p.Server, nodeID discover.NodeID) { t.mu.Lock() defer t.mu.Unlock() + + discV5NodeID := discv5.NodeID(nodeID) + // inbound connection - peer, exist := t.peers[discv5.NodeID(nodeID)] + peer, exist := t.peerPool[discV5NodeID] if !exist { return } - // established connection means that the node is a viable candidate for a connection and can be cached + + // established connection means that the node + // is a viable candidate for a connection and can be cached if t.cache != nil { if err := t.cache.AddPeer(peer.node, t.topic); err != nil { log.Error("failed to persist a peer", "error", err) } } - // when max limit is reached drop every peer after - if t.connected == t.limits[1] { + + // if the upper limit is already reached, drop this peer + if len(t.connectedPeers) == t.limits[1] { log.Debug("max limit is reached drop the peer", "ID", nodeID, "topic", t.topic) - peer.requested = true - t.removePeer(server, peer) + peer.dismissed = true + t.removeServerPeer(server, peer.peerInfo) return } - // don't count same peer twice - if !peer.connected { - log.Debug("marking as connected", "ID", nodeID) - peer.connected = true - t.connected++ - } - if t.SearchRunning() && t.connected == t.limits[0] { + + // move peer from pool to connected peers + t.movePeerFromPoolToConnected(discV5NodeID) + // make sure `dismissed` is restarted + peer.dismissed = false + + // when the lower limit is reached, we can switch to slow mode + if t.SearchRunning() && len(t.connectedPeers) == t.limits[0] { t.period <- t.slowSync } } @@ -112,26 +173,44 @@ func (t *TopicPool) ConfirmAdded(server *p2p.Server, nodeID discover.NodeID) { func (t *TopicPool) ConfirmDropped(server *p2p.Server, nodeID discover.NodeID) bool { t.mu.Lock() defer t.mu.Unlock() + + discV5NodeID := discv5.NodeID(nodeID) + // either inbound or connected from another topic - peer, exist := t.peers[discv5.NodeID(nodeID)] + peer, exist := t.connectedPeers[discV5NodeID] if !exist { return false } - log.Debug("disconnect", "ID", nodeID) - if peer.requested { + + log.Debug("disconnect", "ID", nodeID, "dismissed", peer.dismissed) + + // Peer was removed by us because exceeded the limit. + // Add it back to the pool as it can be useful in the future. + if peer.dismissed { + t.addToPeerPool(peer) return false } - if t.SearchRunning() && t.connected == t.limits[0] { + + // switch to fast mode as the number of connected peers is about to drop + // below the lower limit + if t.SearchRunning() && len(t.connectedPeers) == t.limits[0] { t.period <- t.fastSync } - t.connected-- - t.removePeer(server, peer) - delete(t.peers, discv5.NodeID(nodeID)) + + // If there was a network error, this event will be received + // but the peer won't be removed from the static nodes set. + // That's why we need to call `removeServerPeer` manually. + t.removeServerPeer(server, peer) + + delete(t.connectedPeers, discV5NodeID) + + // remove from cache only if the peer dropped by itself if t.cache != nil { - if err := t.cache.RemovePeer(discv5.NodeID(nodeID), t.topic); err != nil { + if err := t.cache.RemovePeer(discV5NodeID, t.topic); err != nil { log.Error("failed to remove peer from cache", "error", err) } } + return true } @@ -139,13 +218,17 @@ func (t *TopicPool) ConfirmDropped(server *p2p.Server, nodeID discover.NodeID) b func (t *TopicPool) AddPeerFromTable(server *p2p.Server) *discv5.Node { t.mu.RLock() defer t.mu.RUnlock() - // TODO use a heap queue and always get a peer that was discovered recently - for _, peer := range t.peers { - if !peer.connected && mclock.Now() < peer.discoveredTime+mclock.AbsTime(expirationPeriod) { - t.addPeer(server, peer) - return peer.node - } + + // The most recently added peer is removed from the queue. + // If it did not expire yet, it will be added to the server. + // TODO(adam): investigate if it's worth to keep the peer in the queue + // until the server confirms it is added and in the meanwhile only adjust its priority. + peer := t.popFromPeerPool() + if peer != nil && mclock.Now() < peer.discoveredTime+mclock.AbsTime(expirationPeriod) { + t.addServerPeer(server, peer) + return peer.node } + return nil } @@ -184,7 +267,7 @@ func (t *TopicPool) StartSearch(server *p2p.Server) error { } func (t *TopicPool) handleFoundPeers(server *p2p.Server, found <-chan *discv5.Node, lookup <-chan bool) { - if t.connected >= t.limits[0] { + if len(t.connectedPeers) >= t.limits[0] { t.period <- t.slowSync } else { t.period <- t.fastSync @@ -211,21 +294,31 @@ func (t *TopicPool) handleFoundPeers(server *p2p.Server, found <-chan *discv5.No func (t *TopicPool) processFoundNode(server *p2p.Server, node *discv5.Node) { t.mu.Lock() defer t.mu.Unlock() - if info, exist := t.peers[node.ID]; exist { - info.discoveredTime = mclock.Now() + + log.Debug("peer found", "ID", node.ID, "topic", t.topic) + + // peer is already connected so update only discoveredTime + if peer, ok := t.connectedPeers[node.ID]; ok { + peer.discoveredTime = mclock.Now() + return + } + + if _, ok := t.peerPool[node.ID]; ok { + t.updatePeerInPool(node.ID, mclock.Now()) } else { - t.peers[node.ID] = &peerInfo{ + t.addToPeerPool(&peerInfo{ discoveredTime: mclock.Now(), node: node, - } + }) } - if t.connected < t.limits[1] && !t.peers[node.ID].connected { - log.Debug("peer found", "ID", node.ID, "topic", t.topic) - t.addPeer(server, t.peers[node.ID]) + + // the upper limit is not reached, so let's add this peer + if len(t.connectedPeers) < t.limits[1] { + t.addServerPeer(server, t.peerPool[node.ID].peerInfo) } } -func (t *TopicPool) addPeer(server *p2p.Server, info *peerInfo) { +func (t *TopicPool) addServerPeer(server *p2p.Server, info *peerInfo) { server.AddPeer(discover.NewNode( discover.NodeID(info.node.ID), info.node.IP, @@ -234,7 +327,7 @@ func (t *TopicPool) addPeer(server *p2p.Server, info *peerInfo) { )) } -func (t *TopicPool) removePeer(server *p2p.Server, info *peerInfo) { +func (t *TopicPool) removeServerPeer(server *p2p.Server, info *peerInfo) { server.RemovePeer(discover.NewNode( discover.NodeID(info.node.ID), info.node.IP, diff --git a/geth/peers/topicpool_test.go b/geth/peers/topicpool_test.go index 03fa4e5d5..2d1d8f844 100644 --- a/geth/peers/topicpool_test.go +++ b/geth/peers/topicpool_test.go @@ -62,7 +62,7 @@ func (s *TopicPoolSuite) TestSyncSwitches() { s.topicPool.processFoundNode(s.peer, testPeer) s.topicPool.ConfirmAdded(s.peer, discover.NodeID(testPeer.ID)) s.AssertConsumed(s.topicPool.period, s.topicPool.slowSync, time.Second) - s.True(s.topicPool.peers[testPeer.ID].connected) + s.NotNil(s.topicPool.connectedPeers[testPeer.ID]) s.topicPool.ConfirmDropped(s.peer, discover.NodeID(testPeer.ID)) s.AssertConsumed(s.topicPool.period, s.topicPool.fastSync, time.Second) } @@ -75,15 +75,23 @@ func (s *TopicPoolSuite) TestNewPeerSelectedOnDrop() { s.topicPool.processFoundNode(s.peer, peer1) s.topicPool.processFoundNode(s.peer, peer2) s.topicPool.processFoundNode(s.peer, peer3) + s.Len(s.topicPool.peerPool, 3) + s.Len(s.topicPool.peerPoolQueue, 3) s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer1.ID)) - s.True(s.topicPool.peers[peer1.ID].connected) + s.Contains(s.topicPool.connectedPeers, peer1.ID) s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer2.ID)) - s.True(s.topicPool.peers[peer2.ID].connected) - s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer3.ID)) - s.False(s.topicPool.peers[peer3.ID].connected) + s.Contains(s.topicPool.connectedPeers, peer2.ID) + s.Len(s.topicPool.peerPool, 1) + s.Len(s.topicPool.peerPoolQueue, 1) + // drop peer1 s.True(s.topicPool.ConfirmDropped(s.peer, discover.NodeID(peer1.ID))) + s.NotContains(s.topicPool.connectedPeers, peer1.ID) + + // add peer from the pool s.Equal(peer3.ID, s.topicPool.AddPeerFromTable(s.peer).ID) + s.Len(s.topicPool.peerPool, 0) + s.Len(s.topicPool.peerPoolQueue, 0) } func (s *TopicPoolSuite) TestRequestedDoesntRemove() { @@ -96,10 +104,33 @@ func (s *TopicPoolSuite) TestRequestedDoesntRemove() { s.topicPool.processFoundNode(s.peer, peer2) s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer1.ID)) s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer2.ID)) - s.False(s.topicPool.peers[peer1.ID].requested) - s.True(s.topicPool.peers[peer2.ID].requested) + s.False(s.topicPool.connectedPeers[peer1.ID].dismissed) + s.True(s.topicPool.peerPool[peer2.ID].dismissed) s.topicPool.ConfirmDropped(s.peer, discover.NodeID(peer2.ID)) - s.Contains(s.topicPool.peers, peer2.ID) + s.Contains(s.topicPool.peerPool, peer2.ID) + s.NotContains(s.topicPool.connectedPeers, peer2.ID) s.topicPool.ConfirmDropped(s.peer, discover.NodeID(peer1.ID)) - s.NotContains(s.topicPool.peers, peer1.ID) + s.NotContains(s.topicPool.peerPool, peer1.ID) + s.NotContains(s.topicPool.connectedPeers, peer1.ID) +} + +func (s *TopicPoolSuite) TestTheMostRecentPeerIsSelected() { + s.topicPool.limits = params.Limits{1, 1} + + peer1 := discv5.NewNode(discv5.NodeID{1}, s.peer.Self().IP, 32311, 32311) + peer2 := discv5.NewNode(discv5.NodeID{2}, s.peer.Self().IP, 32311, 32311) + peer3 := discv5.NewNode(discv5.NodeID{3}, s.peer.Self().IP, 32311, 32311) + + // after these operations, peer1 is confirmed and peer3 and peer2 + // was added to the pool; peer3 is the most recent one + s.topicPool.processFoundNode(s.peer, peer1) + s.topicPool.processFoundNode(s.peer, peer2) + s.topicPool.processFoundNode(s.peer, peer3) + s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer1.ID)) + + // peer1 has dropped + s.topicPool.ConfirmDropped(s.peer, discover.NodeID(peer1.ID)) + // and peer3 is take from the pool as the most recent + s.True(s.topicPool.peerPool[peer2.ID].discoveredTime < s.topicPool.peerPool[peer3.ID].discoveredTime) + s.Equal(peer3.ID, s.topicPool.AddPeerFromTable(s.peer).ID) }