feat(networking): prune peers from peerstore exceeding capacity (#1513)

* feat(networking): prune peers from peerstore

* chore: add comments

* feat(networking): fix comments

* Add tests
Author: Alvaro Revuelta, 2023-01-31 13:24:49 +01:00 (committed by GitHub)
Parent: 77c64043f9
Commit: da7592bcc7
2 changed files with 119 additions and 11 deletions
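
In short: the peer manager now keeps its peer store within the configured capacity. A new prunePeerStore() evicts peers that exceeded maxFailedAttempts first and not-connected peers second, a background loop runs it every 5 minutes, and PeerManager.new() raises a Defect when maxConnections exceeds the peer store capacity. A minimal, self-contained sketch of that eviction order (hypothetical Peer type, for illustration only; the real implementation is in peer_manager.nim below):

  type Peer = object
    failedAttempts: int
    connected: bool

  proc pruneOrder(peers: seq[Peer], capacity, maxFailedAttempts: int): seq[Peer] =
    ## Sketch of the eviction policy: drop repeat-failure peers first,
    ## then not-connected peers, until the set fits `capacity`.
    var toPrune = max(0, peers.len - capacity)
    var survivors: seq[Peer] = @[]
    # phase 1: evict peers with too many failed connection attempts
    for p in peers:
      if toPrune > 0 and p.failedAttempts >= maxFailedAttempts:
        dec toPrune
      else:
        survivors.add p
    # phase 2: evict peers that are not currently connected
    result = @[]
    for p in survivors:
      if toPrune > 0 and not p.connected:
        dec toPrune
      else:
        result.add p

Note that connected peers are never evicted, so the store can stay above capacity if everything left is connected.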

tests/v2/test_peer_manager.nim

@@ -16,7 +16,7 @@ import
  libp2p/crypto/crypto,
  libp2p/protocols/pubsub/pubsub,
  libp2p/protocols/pubsub/rpc/message,
  libp2p/builders
  libp2p/peerid
import
  ../../waku/common/sqlite,
  ../../waku/v2/node/peer_manager/peer_manager,
@@ -494,3 +494,63 @@ procSuite "Peer Manager":
    check:
      selectedPeer5.isSome() == true
      selectedPeer5.get().peerId == peers[2].peerId
test "peer manager cant have more max connections than peerstore size":
# Peerstore size can't be smaller than max connections
let peerStoreSize = 5
let maxConnections = 10
expect(Defect):
let pm = PeerManager.new(
switch = SwitchBuilder.new().withRng(rng).withMplex().withNoise()
.withPeerStore(peerStoreSize)
.withMaxConnections(maxConnections)
.build(),
storage = nil)
test "prunePeerStore() correctly removes peers to match max quota":
# Create peer manager
let pm = PeerManager.new(
switch = SwitchBuilder.new().withRng(rng).withMplex().withNoise()
.withPeerStore(10)
.withMaxConnections(5)
.build(),
maxFailedAttempts = 1,
storage = nil)
# Create 15 peers and add them to the peerstore
let peers = toSeq(1..15).mapIt(parseRemotePeerInfo("/ip4/0.0.0.0/tcp/0/p2p/" & $PeerId.random().get()))
for p in peers: pm.addPeer(p, "")
# Check that we have 15 peers in the peerstore
check:
pm.peerStore.peers.len == 15
# fake that some peers failed to connected
pm.peerStore[NumberFailedConnBook][peers[0].peerId] = 2
pm.peerStore[NumberFailedConnBook][peers[1].peerId] = 2
pm.peerStore[NumberFailedConnBook][peers[2].peerId] = 2
# fake that some peers are connected
pm.peerStore[ConnectionBook][peers[5].peerId] = Connected
pm.peerStore[ConnectionBook][peers[8].peerId] = Connected
pm.peerStore[ConnectionBook][peers[10].peerId] = Connected
pm.peerStore[ConnectionBook][peers[12].peerId] = Connected
# Prune the peerstore
pm.prunePeerStore()
check:
# ensure peerstore was pruned
pm.peerStore.peers.len == 10
# ensure connected peers were not pruned
pm.peerStore.peers.anyIt(it.peerId == peers[5].peerId)
pm.peerStore.peers.anyIt(it.peerId == peers[8].peerId)
pm.peerStore.peers.anyIt(it.peerId == peers[10].peerId)
pm.peerStore.peers.anyIt(it.peerId == peers[12].peerId)
# ensure peers that failed were the first to be pruned
not pm.peerStore.peers.anyIt(it.peerId == peers[0].peerId)
not pm.peerStore.peers.anyIt(it.peerId == peers[1].peerId)
not pm.peerStore.peers.anyIt(it.peerId == peers[2].peerId)
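
For reference, the quota arithmetic this test exercises (plain Nim, independent of the waku modules):

  let numPeers = 15                        # peers added to the store
  let capacity = 10                        # withPeerStore(10)
  let peersToPrune = numPeers - capacity   # 5 evictions needed
  let failedFirst = 3                      # peers[0..2] exceed maxFailedAttempts = 1
  let notConnectedNext = peersToPrune - failedFirst  # 2 more come from the not-connected set
  assert peersToPrune == 5 and notConnectedNext == 2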

waku/v2/node/peer_manager/peer_manager.nim

@@ -23,6 +23,7 @@ declareCounter waku_peers_dials, "Number of peer dials", ["outcome"]
declarePublicCounter waku_node_conns_initiated, "Number of connections initiated", ["source"]
declarePublicGauge waku_peers_errors, "Number of peer manager errors", ["type"]
declarePublicGauge waku_connected_peers, "Number of connected peers per direction: inbound|outbound", ["direction"]
declarePublicGauge waku_peer_store_size, "Number of peers managed by the peer store"

logScope:
  topics = "waku node peer_manager"
@@ -46,6 +47,9 @@ const
  # Delay between consecutive relayConnectivityLoop runs
  ConnectivityLoopInterval = chronos.seconds(30)

  # How often the peer store is pruned
  PrunePeerStoreInterval = chronos.minutes(5)

type
  PeerManager* = ref object of RootObj
    switch*: Switch
@@ -119,16 +123,6 @@ proc dialPeer(pm: PeerManager, peerId: PeerID,
    return none(Connection)

# TODO: To be addressed in nwaku/pull/1473. Do not prune service peers
# TODO: Currently unused
proc prunePeerStore(pm: PeerManager) =
  # iterate peers in peerstore
  # skip service peers
  #if pm.peerStore[NumberFailedConnBook][peerId] >= pm.maxFailedAttempts:
  #  debug "Removing peer from peer store", peerId = peerId, failedAttempts=failedAttempts
  #  pm.peerStore.del(peerId)
  doAssert(false, "Not implemented!")

proc loadFromStorage(pm: PeerManager) =
  debug "loading peers from storage"
  # Load peers from storage, if available
@@ -191,6 +185,14 @@ proc new*(T: type PeerManager,
          backoffFactor = BackoffFactor,
          maxFailedAttempts = MaxFailedAttempts,): PeerManager =

  let capacity = switch.peerStore.capacity
  let maxConnections = switch.connManager.inSema.size
  if maxConnections > capacity:
    error "Max number of connections can't be greater than PeerManager capacity",
      capacity = capacity,
      maxConnections = maxConnections
    raise newException(Defect, "Max number of connections can't be greater than PeerManager capacity")

  let pm = PeerManager(switch: switch,
                       peerStore: switch.peerStore,
                       storage: storage,
@@ -200,9 +202,15 @@
  proc peerHook(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} =
    onConnEvent(pm, peerId, event)

  proc peerStoreChanged(peerId: PeerId) {.gcsafe.} =
    waku_peer_store_size.set(toSeq(pm.peerStore[AddressBook].book.keys).len.int64)

  pm.switch.addConnEventHandler(peerHook, ConnEventKind.Connected)
  pm.switch.addConnEventHandler(peerHook, ConnEventKind.Disconnected)

  # called every time the peerstore is updated
  pm.peerStore[AddressBook].addHandler(peerStoreChanged)

  pm.serviceSlots = initTable[string, RemotePeerInfo]()

  if not storage.isNil():
@@ -386,6 +394,45 @@ proc relayConnectivityLoop*(pm: PeerManager) {.async.} =
    await sleepAsync(ConnectivityLoopInterval)

proc prunePeerStore*(pm: PeerManager) =
  let numPeers = toSeq(pm.peerStore[AddressBook].book.keys).len
  let capacity = pm.peerStore.capacity
  if numPeers < capacity:
    return

  debug "Peer store capacity exceeded", numPeers = numPeers, capacity = capacity
  let peersToPrune = numPeers - capacity

  # prune peers with too many failed attempts
  # (snapshot the keys with toSeq: del() mutates the book being iterated)
  var pruned = 0
  for peerId in toSeq(pm.peerStore[NumberFailedConnBook].book.keys):
    if peersToPrune - pruned == 0:
      break
    if pm.peerStore[NumberFailedConnBook][peerId] >= pm.maxFailedAttempts:
      pm.peerStore.del(peerId)
      pruned += 1

  # if we still need to prune, prune peers that are not connected
  let notConnected = pm.peerStore.getNotConnectedPeers().mapIt(it.peerId)
  for peerId in notConnected:
    if peersToPrune - pruned == 0:
      break
    pm.peerStore.del(peerId)
    pruned += 1

  let afterNumPeers = toSeq(pm.peerStore[AddressBook].book.keys).len
  debug "Finished pruning peer store", beforeNumPeers = numPeers,
                                       afterNumPeers = afterNumPeers,
                                       capacity = capacity,
                                       pruned = pruned

proc prunePeerStoreLoop(pm: PeerManager) {.async.} =
  while pm.started:
    pm.prunePeerStore()
    await sleepAsync(PrunePeerStoreInterval)

proc selectPeer*(pm: PeerManager, proto: string): Option[RemotePeerInfo] =
  debug "Selecting peer from peerstore", protocol=proto
@@ -416,6 +463,7 @@ proc selectPeer*(pm: PeerManager, proto: string): Option[RemotePeerInfo] =
proc start*(pm: PeerManager) =
  pm.started = true
  asyncSpawn pm.relayConnectivityLoop()
  asyncSpawn pm.prunePeerStoreLoop()

proc stop*(pm: PeerManager) =
  pm.started = false
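
A hedged usage sketch of the new lifecycle (the Switch setup mirrors the tests above; rng and the builder chain are assumptions taken from the test file, not part of this diff):

  let pm = PeerManager.new(
    switch = SwitchBuilder.new().withRng(rng).withMplex().withNoise()
      .withPeerStore(100)        # peer store capacity...
      .withMaxConnections(50)    # ...must not be smaller, or new() raises Defect
      .build(),
    storage = nil)

  pm.start()  # spawns relayConnectivityLoop and prunePeerStoreLoop
  # ... the peer store is pruned every PrunePeerStoreInterval (5 minutes) ...
  pm.stop()   # loops observe pm.started == false and exit after the current sleep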