mirror of https://github.com/waku-org/nwaku.git
added sharded peer store pruning (#2167)
This commit is contained in:
parent
dba9820c1f
commit
281c13a429
|
@ -5,7 +5,7 @@ else:
|
|||
|
||||
|
||||
import
|
||||
std/[options, sugar, sets, sequtils, times, strutils, math],
|
||||
std/[options, sets, tables, sequtils, times, strutils, math],
|
||||
chronos,
|
||||
chronicles,
|
||||
metrics,
|
||||
|
@ -14,13 +14,11 @@ import
|
|||
libp2p/nameresolving/nameresolver
|
||||
import
|
||||
../../common/nimchronos,
|
||||
../../common/enr,
|
||||
../../waku_core,
|
||||
../../waku_relay,
|
||||
../../waku_enr/sharding,
|
||||
../../waku_enr/capabilities,
|
||||
../../waku_store/common,
|
||||
../../waku_filter_v2/common,
|
||||
../../waku_lightpush/common,
|
||||
../../waku_metadata,
|
||||
./peer_store/peer_storage,
|
||||
./waku_peer_store
|
||||
|
@ -728,38 +726,77 @@ proc manageRelayPeers*(pm: PeerManager) {.async.} =
|
|||
await pm.connectToNodes(uniquePeers[i..<stop])
|
||||
|
||||
proc prunePeerStore*(pm: PeerManager) =
|
||||
let numPeers = toSeq(pm.peerStore[AddressBook].book.keys).len
|
||||
let numPeers = pm.peerStore[AddressBook].book.len
|
||||
let capacity = pm.peerStore.capacity
|
||||
if numPeers < capacity:
|
||||
if numPeers <= capacity:
|
||||
return
|
||||
|
||||
debug "Peer store capacity exceeded", numPeers = numPeers, capacity = capacity
|
||||
let peersToPrune = numPeers - capacity
|
||||
let pruningCount = numPeers - capacity
|
||||
var peersToPrune: HashSet[PeerId]
|
||||
|
||||
# prune peers with too many failed attempts
|
||||
var pruned = 0
|
||||
# copy to avoid modifying the book while iterating
|
||||
let peerKeys = toSeq(pm.peerStore[NumberFailedConnBook].book.keys)
|
||||
for peerId in peerKeys:
|
||||
if peersToPrune - pruned == 0:
|
||||
# prune failed connections
|
||||
for peerId, count in pm.peerStore[NumberFailedConnBook].book.pairs:
|
||||
if count < pm.maxFailedAttempts:
|
||||
continue
|
||||
|
||||
if peersToPrune.len >= pruningCount:
|
||||
break
|
||||
if pm.peerStore[NumberFailedConnBook][peerId] >= pm.maxFailedAttempts:
|
||||
pm.peerStore.del(peerId)
|
||||
pruned += 1
|
||||
|
||||
# if we still need to prune, prune peers that are not connected
|
||||
peersToPrune.incl(peerId)
|
||||
|
||||
let notConnected = pm.peerStore.getNotConnectedPeers().mapIt(it.peerId)
|
||||
for peerId in notConnected:
|
||||
if peersToPrune - pruned == 0:
|
||||
break
|
||||
pm.peerStore.del(peerId)
|
||||
pruned += 1
|
||||
|
||||
let afterNumPeers = toSeq(pm.peerStore[AddressBook].book.keys).len
|
||||
var shardlessPeers: seq[PeerId]
|
||||
var peersByShard = initTable[uint16, seq[PeerId]]()
|
||||
|
||||
for peer in notConnected:
|
||||
if not pm.peerStore[ENRBook].contains(peer):
|
||||
shardlessPeers.add(peer)
|
||||
continue
|
||||
|
||||
let record = pm.peerStore[ENRBook][peer]
|
||||
|
||||
let rec = record.toTyped().valueOr:
|
||||
shardlessPeers.add(peer)
|
||||
continue
|
||||
|
||||
let rs = rec.relaySharding().valueOr:
|
||||
shardlessPeers.add(peer)
|
||||
continue
|
||||
|
||||
for shard in rs.shardIds:
|
||||
peersByShard.mgetOrPut(shard, @[peer]).add(peer)
|
||||
|
||||
# prune not connected peers without shard
|
||||
for peer in shardlessPeers:
|
||||
if peersToPrune.len >= pruningCount:
|
||||
break
|
||||
|
||||
peersToPrune.incl(peer)
|
||||
|
||||
# calculate the avg peers per shard
|
||||
let total = sum(toSeq(peersByShard.values).mapIt(it.len))
|
||||
let avg = min(1, total div max(1, peersByShard.len))
|
||||
|
||||
# prune peers from shard with higher than avg count
|
||||
for shard, peers in peersByShard.pairs:
|
||||
let count = max(peers.len - avg, 0)
|
||||
for peer in peers[0..count]:
|
||||
if peersToPrune.len >= pruningCount:
|
||||
break
|
||||
|
||||
peersToPrune.incl(peer)
|
||||
|
||||
for peer in peersToPrune:
|
||||
pm.peerStore.delete(peer)
|
||||
|
||||
let afterNumPeers = pm.peerStore[AddressBook].book.len
|
||||
|
||||
debug "Finished pruning peer store", beforeNumPeers = numPeers,
|
||||
afterNumPeers = afterNumPeers,
|
||||
capacity = capacity,
|
||||
pruned = pruned
|
||||
pruned = peersToPrune.len
|
||||
|
||||
proc selectPeer*(pm: PeerManager, proto: string, shard: Option[PubsubTopic] = none(PubsubTopic)): Option[RemotePeerInfo] =
|
||||
debug "Selecting peer from peerstore", protocol=proto
|
||||
|
|
Loading…
Reference in New Issue