602 lines
17 KiB
Go
602 lines
17 KiB
Go
package peers
|
|
|
|
import (
|
|
"container/heap"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/ethereum/go-ethereum/p2p"
|
|
"github.com/ethereum/go-ethereum/p2p/discv5"
|
|
"github.com/ethereum/go-ethereum/p2p/enode"
|
|
|
|
"github.com/status-im/status-go/common"
|
|
"github.com/status-im/status-go/discovery"
|
|
"github.com/status-im/status-go/logutils"
|
|
"github.com/status-im/status-go/params"
|
|
)
|
|
|
|
// notQueuedIndex is the heap index stored on an item that is not currently
// held in the discovered peers priority queue.
const notQueuedIndex = -1
|
|
|
|
var (
	// maxCachedPeersMultiplier is the factor applied to the peers max limit
	// to obtain the maximum number of cached peers allowed.
	maxCachedPeersMultiplier = 1

	// maxPendingPeersMultiplier is the factor applied to the peers max limit
	// to obtain the maximum number of pending peers allowed.
	maxPendingPeersMultiplier = 2
)
|
|
|
|
// TopicPoolInterface the TopicPool interface.
|
|
type TopicPoolInterface interface {
|
|
StopSearch(server *p2p.Server)
|
|
BelowMin() bool
|
|
SearchRunning() bool
|
|
StartSearch(server *p2p.Server) error
|
|
ConfirmDropped(server *p2p.Server, nodeID enode.ID) bool
|
|
AddPeerFromTable(server *p2p.Server) *discv5.Node
|
|
MaxReached() bool
|
|
ConfirmAdded(server *p2p.Server, nodeID enode.ID)
|
|
isStopped() bool
|
|
Topic() discv5.Topic
|
|
SetLimits(limits params.Limits)
|
|
setStopSearchTimeout(delay time.Duration)
|
|
readyToStopSearch() bool
|
|
}
|
|
|
|
// Clock abstracts the source of the current time so that it can be
// substituted by another implementation.
type Clock interface {
	// Now returns the current time as seen by this clock.
	Now() time.Time
}
|
|
|
|
// realClock reports the time obtained from time.Now.
type realClock struct{}

// Now returns the current wall-clock time.
func (realClock) Now() time.Time {
	return time.Now()
}
|
|
|
|
// newTopicPool returns instance of TopicPool.
|
|
func newTopicPool(discovery discovery.Discovery, topic discv5.Topic, limits params.Limits, slowMode, fastMode time.Duration, cache *Cache) *TopicPool {
|
|
pool := TopicPool{
|
|
discovery: discovery,
|
|
topic: topic,
|
|
limits: limits,
|
|
fastMode: fastMode,
|
|
slowMode: slowMode,
|
|
fastModeTimeout: DefaultTopicFastModeTimeout,
|
|
pendingPeers: make(map[enode.ID]*peerInfoItem),
|
|
discoveredPeersQueue: make(peerPriorityQueue, 0),
|
|
discoveredPeers: make(map[enode.ID]bool),
|
|
connectedPeers: make(map[enode.ID]*peerInfo),
|
|
cache: cache,
|
|
maxCachedPeers: limits.Max * maxCachedPeersMultiplier,
|
|
maxPendingPeers: limits.Max * maxPendingPeersMultiplier,
|
|
clock: realClock{},
|
|
}
|
|
heap.Init(&pool.discoveredPeersQueue)
|
|
|
|
return &pool
|
|
}
|
|
|
|
// TopicPool manages peers for topic.
|
|
type TopicPool struct {
|
|
discovery discovery.Discovery
|
|
|
|
// configuration
|
|
topic discv5.Topic
|
|
limits params.Limits
|
|
fastMode time.Duration
|
|
slowMode time.Duration
|
|
fastModeTimeout time.Duration
|
|
|
|
mu sync.RWMutex
|
|
discWG sync.WaitGroup
|
|
poolWG sync.WaitGroup
|
|
quit chan struct{}
|
|
|
|
running int32
|
|
|
|
currentMode time.Duration
|
|
period chan time.Duration
|
|
fastModeTimeoutCancel chan struct{}
|
|
|
|
pendingPeers map[enode.ID]*peerInfoItem // contains found and requested to be connected peers but not confirmed
|
|
discoveredPeersQueue peerPriorityQueue // priority queue to find the most recently discovered peers; does not containt peers requested to connect
|
|
discoveredPeers map[enode.ID]bool // remembers which peers have already been discovered and are enqueued
|
|
connectedPeers map[enode.ID]*peerInfo // currently connected peers
|
|
|
|
stopSearchTimeout *time.Time
|
|
|
|
maxPendingPeers int
|
|
maxCachedPeers int
|
|
cache *Cache
|
|
|
|
clock Clock
|
|
}
|
|
|
|
func (t *TopicPool) addToPendingPeers(peer *peerInfo) {
|
|
if _, ok := t.pendingPeers[peer.NodeID()]; ok {
|
|
return
|
|
}
|
|
t.pendingPeers[peer.NodeID()] = &peerInfoItem{
|
|
peerInfo: peer,
|
|
index: notQueuedIndex,
|
|
}
|
|
|
|
// maxPendingPeers = 0 means no limits.
|
|
if t.maxPendingPeers == 0 || t.maxPendingPeers >= len(t.pendingPeers) {
|
|
return
|
|
}
|
|
|
|
var oldestPeer *peerInfo
|
|
for _, i := range t.pendingPeers {
|
|
if oldestPeer != nil && oldestPeer.discoveredTime.Before(i.peerInfo.discoveredTime) {
|
|
continue
|
|
}
|
|
|
|
oldestPeer = i.peerInfo
|
|
}
|
|
|
|
t.removeFromPendingPeers(oldestPeer.NodeID())
|
|
}
|
|
|
|
// addToQueue adds the passed peer to the queue if it is already pending.
|
|
func (t *TopicPool) addToQueue(peer *peerInfo) {
|
|
if p, ok := t.pendingPeers[peer.NodeID()]; ok {
|
|
if _, ok := t.discoveredPeers[peer.NodeID()]; ok {
|
|
return
|
|
}
|
|
|
|
heap.Push(&t.discoveredPeersQueue, p)
|
|
t.discoveredPeers[peer.NodeID()] = true
|
|
}
|
|
}
|
|
|
|
func (t *TopicPool) popFromQueue() *peerInfo {
|
|
if t.discoveredPeersQueue.Len() == 0 {
|
|
return nil
|
|
}
|
|
item := heap.Pop(&t.discoveredPeersQueue).(*peerInfoItem)
|
|
item.index = notQueuedIndex
|
|
delete(t.discoveredPeers, item.peerInfo.NodeID())
|
|
return item.peerInfo
|
|
}
|
|
|
|
func (t *TopicPool) removeFromPendingPeers(nodeID enode.ID) {
|
|
peer, ok := t.pendingPeers[nodeID]
|
|
if !ok {
|
|
return
|
|
}
|
|
delete(t.pendingPeers, nodeID)
|
|
if peer.index != notQueuedIndex {
|
|
heap.Remove(&t.discoveredPeersQueue, peer.index)
|
|
delete(t.discoveredPeers, nodeID)
|
|
}
|
|
}
|
|
|
|
func (t *TopicPool) updatePendingPeer(nodeID enode.ID) {
|
|
peer, ok := t.pendingPeers[nodeID]
|
|
if !ok {
|
|
return
|
|
}
|
|
peer.discoveredTime = t.clock.Now()
|
|
if peer.index != notQueuedIndex {
|
|
heap.Fix(&t.discoveredPeersQueue, peer.index)
|
|
}
|
|
}
|
|
|
|
func (t *TopicPool) movePeerFromPoolToConnected(nodeID enode.ID) {
|
|
peer, ok := t.pendingPeers[nodeID]
|
|
if !ok {
|
|
return
|
|
}
|
|
t.removeFromPendingPeers(nodeID)
|
|
t.connectedPeers[nodeID] = peer.peerInfo
|
|
}
|
|
|
|
// SearchRunning returns true if search is running
|
|
func (t *TopicPool) SearchRunning() bool {
|
|
return atomic.LoadInt32(&t.running) == 1
|
|
}
|
|
|
|
// MaxReached returns true if we connected with max number of peers.
|
|
func (t *TopicPool) MaxReached() bool {
|
|
t.mu.RLock()
|
|
defer t.mu.RUnlock()
|
|
return len(t.connectedPeers) == t.limits.Max
|
|
}
|
|
|
|
// BelowMin returns true if current number of peers is below min limit.
|
|
func (t *TopicPool) BelowMin() bool {
|
|
t.mu.RLock()
|
|
defer t.mu.RUnlock()
|
|
return len(t.connectedPeers) < t.limits.Min
|
|
}
|
|
|
|
// maxCachedPeersReached returns true if max number of cached peers is reached.
|
|
func (t *TopicPool) maxCachedPeersReached() bool {
|
|
if t.maxCachedPeers == 0 {
|
|
return true
|
|
}
|
|
peers := t.cache.GetPeersRange(t.topic, t.maxCachedPeers)
|
|
|
|
return len(peers) >= t.maxCachedPeers
|
|
}
|
|
|
|
// setStopSearchTimeout sets the timeout to stop current topic search if it's not
|
|
// been stopped before.
|
|
func (t *TopicPool) setStopSearchTimeout(delay time.Duration) {
|
|
if t.stopSearchTimeout != nil {
|
|
return
|
|
}
|
|
now := t.clock.Now().Add(delay)
|
|
t.stopSearchTimeout = &now
|
|
}
|
|
|
|
// isStopSearchDelayExpired returns true if the timeout to stop current topic
|
|
// search has been accomplished.
|
|
func (t *TopicPool) isStopSearchDelayExpired() bool {
|
|
if t.stopSearchTimeout == nil {
|
|
return false
|
|
}
|
|
return t.stopSearchTimeout.Before(t.clock.Now())
|
|
}
|
|
|
|
// readyToStopSearch return true if all conditions to stop search are ok.
|
|
func (t *TopicPool) readyToStopSearch() bool {
|
|
return t.isStopSearchDelayExpired() || t.maxCachedPeersReached()
|
|
}
|
|
|
|
// updateSyncMode changes the sync mode depending on the current number
|
|
// of connected peers and limits.
|
|
func (t *TopicPool) updateSyncMode() {
|
|
newMode := t.slowMode
|
|
if len(t.connectedPeers) < t.limits.Min {
|
|
newMode = t.fastMode
|
|
}
|
|
t.setSyncMode(newMode)
|
|
}
|
|
|
|
func (t *TopicPool) setSyncMode(mode time.Duration) {
|
|
if mode == t.currentMode {
|
|
return
|
|
}
|
|
|
|
t.period <- mode
|
|
t.currentMode = mode
|
|
|
|
// if selected mode is fast mode and fast mode timeout was not set yet,
|
|
// do it now
|
|
if mode == t.fastMode && t.fastModeTimeoutCancel == nil {
|
|
t.fastModeTimeoutCancel = t.limitFastMode(t.fastModeTimeout)
|
|
}
|
|
// remove fast mode timeout as slow mode is selected now
|
|
if mode == t.slowMode && t.fastModeTimeoutCancel != nil {
|
|
close(t.fastModeTimeoutCancel)
|
|
t.fastModeTimeoutCancel = nil
|
|
}
|
|
}
|
|
|
|
func (t *TopicPool) limitFastMode(timeout time.Duration) chan struct{} {
|
|
if timeout == 0 {
|
|
return nil
|
|
}
|
|
|
|
cancel := make(chan struct{})
|
|
|
|
t.poolWG.Add(1)
|
|
go func() {
|
|
defer common.LogOnPanic()
|
|
defer t.poolWG.Done()
|
|
|
|
select {
|
|
case <-time.After(timeout):
|
|
t.mu.Lock()
|
|
t.setSyncMode(t.slowMode)
|
|
t.mu.Unlock()
|
|
case <-cancel:
|
|
return
|
|
}
|
|
}()
|
|
|
|
return cancel
|
|
}
|
|
|
|
// ConfirmAdded called when peer was added by p2p Server.
|
|
// 1. Skip a peer if it not in our peer table
|
|
// 2. Add a peer to a cache.
|
|
// 3. Disconnect a peer if it was connected after we reached max limit of peers.
|
|
// (we can't know in advance if peer will be connected, thats why we allow
|
|
// to overflow for short duration)
|
|
// 4. Switch search to slow mode if it is running.
|
|
func (t *TopicPool) ConfirmAdded(server *p2p.Server, nodeID enode.ID) {
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
|
|
peerInfoItem, ok := t.pendingPeers[nodeID]
|
|
inbound := !ok || !peerInfoItem.added
|
|
|
|
logutils.ZapLogger().Debug("peer added event", zap.Stringer("peer", nodeID), zap.Bool("inbound", inbound))
|
|
|
|
if inbound {
|
|
return
|
|
}
|
|
|
|
peer := peerInfoItem.peerInfo // get explicit reference
|
|
|
|
// established connection means that the node
|
|
// is a viable candidate for a connection and can be cached
|
|
if err := t.cache.AddPeer(peer.node, t.topic); err != nil {
|
|
logutils.ZapLogger().Error("failed to persist a peer", zap.Error(err))
|
|
}
|
|
|
|
t.movePeerFromPoolToConnected(nodeID)
|
|
// if the upper limit is already reached, drop this peer
|
|
if len(t.connectedPeers) > t.limits.Max {
|
|
logutils.ZapLogger().Debug("max limit is reached drop the peer", zap.Stringer("ID", nodeID), zap.String("topic", string(t.topic)))
|
|
peer.dismissed = true
|
|
t.removeServerPeer(server, peer)
|
|
return
|
|
}
|
|
|
|
// make sure `dismissed` is reset
|
|
peer.dismissed = false
|
|
|
|
// A peer was added so check if we can switch to slow mode.
|
|
if t.SearchRunning() {
|
|
t.updateSyncMode()
|
|
}
|
|
}
|
|
|
|
// ConfirmDropped called when server receives drop event.
|
|
// 1. Skip peer if it is not in our peer table.
|
|
// 2. If disconnect request - we could drop that peer ourselves.
|
|
// 3. If connected number will drop below min limit - switch to fast mode.
|
|
// 4. Delete a peer from cache and peer table.
|
|
// Returns false if peer is not in our table or we requested removal of this peer.
|
|
// Otherwise peer is removed and true is returned.
|
|
func (t *TopicPool) ConfirmDropped(server *p2p.Server, nodeID enode.ID) bool {
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
|
|
// either inbound or connected from another topic
|
|
peer, exist := t.connectedPeers[nodeID]
|
|
if !exist {
|
|
return false
|
|
}
|
|
|
|
logutils.ZapLogger().Debug("disconnect", zap.Stringer("ID", nodeID), zap.Bool("dismissed", peer.dismissed))
|
|
|
|
delete(t.connectedPeers, nodeID)
|
|
// Peer was removed by us because exceeded the limit.
|
|
// Add it back to the pool as it can be useful in the future.
|
|
if peer.dismissed {
|
|
t.addToPendingPeers(peer)
|
|
// use queue for peers that weren't added to p2p server
|
|
t.addToQueue(peer)
|
|
return false
|
|
}
|
|
|
|
// If there was a network error, this event will be received
|
|
// but the peer won't be removed from the static nodes set.
|
|
// That's why we need to call `removeServerPeer` manually.
|
|
t.removeServerPeer(server, peer)
|
|
|
|
if err := t.cache.RemovePeer(nodeID, t.topic); err != nil {
|
|
logutils.ZapLogger().Error("failed to remove peer from cache", zap.Error(err))
|
|
}
|
|
|
|
// As we removed a peer, update a sync strategy if needed.
|
|
if t.SearchRunning() {
|
|
t.updateSyncMode()
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
// AddPeerFromTable checks if there is a valid peer in local table and adds it to a server.
|
|
func (t *TopicPool) AddPeerFromTable(server *p2p.Server) *discv5.Node {
|
|
t.mu.RLock()
|
|
defer t.mu.RUnlock()
|
|
|
|
// The most recently added peer is removed from the queue.
|
|
// If it did not expire yet, it will be added to the server.
|
|
// TODO(adam): investigate if it's worth to keep the peer in the queue
|
|
// until the server confirms it is added and in the meanwhile only adjust its priority.
|
|
peer := t.popFromQueue()
|
|
if peer != nil && t.clock.Now().Before(peer.discoveredTime.Add(expirationPeriod)) {
|
|
t.addServerPeer(server, peer)
|
|
return peer.node
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// StartSearch creates discv5 queries and runs a loop to consume found peers.
|
|
func (t *TopicPool) StartSearch(server *p2p.Server) error {
|
|
if atomic.LoadInt32(&t.running) == 1 {
|
|
return nil
|
|
}
|
|
if !t.discovery.Running() {
|
|
return ErrDiscv5NotRunning
|
|
}
|
|
atomic.StoreInt32(&t.running, 1)
|
|
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
|
|
t.quit = make(chan struct{})
|
|
t.stopSearchTimeout = nil
|
|
|
|
// `period` is used to notify about the current sync mode.
|
|
t.period = make(chan time.Duration, 2)
|
|
// use fast sync mode at the beginning
|
|
t.setSyncMode(t.fastMode)
|
|
|
|
// peers management
|
|
found := make(chan *discv5.Node, 5) // 5 reasonable number for concurrently found nodes
|
|
lookup := make(chan bool, 10) // sufficiently buffered channel, just prevents blocking because of lookup
|
|
|
|
for _, peer := range t.cache.GetPeersRange(t.topic, 5) {
|
|
logutils.ZapLogger().Debug("adding a peer from cache", zap.Stringer("peer", peer))
|
|
found <- peer
|
|
}
|
|
|
|
t.discWG.Add(1)
|
|
go func() {
|
|
defer common.LogOnPanic()
|
|
if err := t.discovery.Discover(string(t.topic), t.period, found, lookup); err != nil {
|
|
logutils.ZapLogger().Error("error searching foro", zap.String("topic", string(t.topic)), zap.Error(err))
|
|
}
|
|
t.discWG.Done()
|
|
}()
|
|
t.poolWG.Add(1)
|
|
go func() {
|
|
defer common.LogOnPanic()
|
|
t.handleFoundPeers(server, found, lookup)
|
|
t.poolWG.Done()
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (t *TopicPool) handleFoundPeers(server *p2p.Server, found <-chan *discv5.Node, lookup <-chan bool) {
|
|
selfID := discv5.PubkeyID(server.Self().Pubkey())
|
|
for {
|
|
select {
|
|
case <-t.quit:
|
|
return
|
|
case <-lookup:
|
|
case node := <-found:
|
|
if node.ID == selfID {
|
|
continue
|
|
}
|
|
if err := t.processFoundNode(server, node); err != nil {
|
|
logutils.ZapLogger().Error("failed to process found node", zap.Stringer("node", node), zap.Error(err))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// processFoundNode called when node is discovered by kademlia search query
|
|
// 2 important conditions
|
|
// 1. every time when node is processed we need to update discoveredTime.
|
|
// peer will be considered as valid later only if it was discovered < 60m ago
|
|
// 2. if peer is connected or if max limit is reached we are not a adding peer to p2p server
|
|
func (t *TopicPool) processFoundNode(server *p2p.Server, node *discv5.Node) error {
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
|
|
pk, err := node.ID.Pubkey()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
nodeID := enode.PubkeyToIDV4(pk)
|
|
|
|
logutils.ZapLogger().Debug("peer found", zap.Stringer("ID", nodeID), zap.String("topic", string(t.topic)))
|
|
|
|
// peer is already connected so update only discoveredTime
|
|
if peer, ok := t.connectedPeers[nodeID]; ok {
|
|
peer.discoveredTime = t.clock.Now()
|
|
return nil
|
|
}
|
|
|
|
if _, ok := t.pendingPeers[nodeID]; ok {
|
|
t.updatePendingPeer(nodeID)
|
|
} else {
|
|
t.addToPendingPeers(&peerInfo{
|
|
discoveredTime: t.clock.Now(),
|
|
node: node,
|
|
publicKey: pk,
|
|
})
|
|
}
|
|
logutils.ZapLogger().Debug(
|
|
"adding peer to a server", zap.Stringer("peer", node.ID),
|
|
zap.Int("connected", len(t.connectedPeers)), zap.Int("max", t.maxCachedPeers))
|
|
|
|
// This can happen when the monotonic clock is not precise enough and
|
|
// multiple peers gets added at the same clock time, resulting in all
|
|
// of them having the same discoveredTime.
|
|
// At which point a random peer will be removed, sometimes being the
|
|
// peer we just added.
|
|
// We could make sure that the latest added peer is not removed,
|
|
// but this is simpler, and peers will be fresh enough as resolution
|
|
// should be quite high (ms at least).
|
|
// This has been reported on windows builds
|
|
// only https://github.com/status-im/nim-status-client/issues/522
|
|
if t.pendingPeers[nodeID] == nil {
|
|
logutils.ZapLogger().Debug("peer added has just been removed", zap.Stringer("peer", nodeID))
|
|
return nil
|
|
}
|
|
|
|
// the upper limit is not reached, so let's add this peer
|
|
if len(t.connectedPeers) < t.maxCachedPeers {
|
|
t.addServerPeer(server, t.pendingPeers[nodeID].peerInfo)
|
|
} else {
|
|
t.addToQueue(t.pendingPeers[nodeID].peerInfo)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (t *TopicPool) addServerPeer(server *p2p.Server, info *peerInfo) {
|
|
info.added = true
|
|
n := enode.NewV4(info.publicKey, info.node.IP, int(info.node.TCP), int(info.node.UDP))
|
|
server.AddPeer(n)
|
|
}
|
|
|
|
func (t *TopicPool) removeServerPeer(server *p2p.Server, info *peerInfo) {
|
|
info.added = false
|
|
n := enode.NewV4(info.publicKey, info.node.IP, int(info.node.TCP), int(info.node.UDP))
|
|
server.RemovePeer(n)
|
|
}
|
|
|
|
func (t *TopicPool) isStopped() bool {
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
return t.currentMode == 0
|
|
}
|
|
|
|
// StopSearch stops the closes stop
|
|
func (t *TopicPool) StopSearch(server *p2p.Server) {
|
|
if !atomic.CompareAndSwapInt32(&t.running, 1, 0) {
|
|
return
|
|
}
|
|
if t.quit == nil {
|
|
return
|
|
}
|
|
select {
|
|
case <-t.quit:
|
|
return
|
|
default:
|
|
}
|
|
logutils.ZapLogger().Debug("stoping search", zap.String("topic", string(t.topic)))
|
|
close(t.quit)
|
|
t.mu.Lock()
|
|
if t.fastModeTimeoutCancel != nil {
|
|
close(t.fastModeTimeoutCancel)
|
|
t.fastModeTimeoutCancel = nil
|
|
}
|
|
t.currentMode = 0
|
|
t.mu.Unlock()
|
|
// wait for poolWG to exit because it writes to period channel
|
|
t.poolWG.Wait()
|
|
close(t.period)
|
|
t.discWG.Wait()
|
|
}
|
|
|
|
// Topic exposes the internal discovery topic.
|
|
func (t *TopicPool) Topic() discv5.Topic {
|
|
return t.topic
|
|
}
|
|
|
|
// SetLimits set the limits for the current TopicPool.
|
|
func (t *TopicPool) SetLimits(limits params.Limits) {
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
|
|
t.limits = limits
|
|
}
|