diff --git a/tests/v2/test_peer_manager.nim b/tests/v2/test_peer_manager.nim
index efbb593a1..12f657896 100644
--- a/tests/v2/test_peer_manager.nim
+++ b/tests/v2/test_peer_manager.nim
@@ -16,7 +16,7 @@ import
   libp2p/crypto/crypto,
   libp2p/protocols/pubsub/pubsub,
   libp2p/protocols/pubsub/rpc/message,
-  libp2p/builders
+  libp2p/peerid
 import
   ../../waku/common/sqlite,
   ../../waku/v2/node/peer_manager/peer_manager,
@@ -494,3 +494,63 @@ procSuite "Peer Manager":
     check:
       selectedPeer5.isSome() == true
       selectedPeer5.get().peerId == peers[2].peerId
+
+  test "peer manager can't have more max connections than peerstore size":
+    # Peerstore size can't be smaller than max connections
+    let peerStoreSize = 5
+    let maxConnections = 10
+
+    expect(Defect):
+      let pm = PeerManager.new(
+        switch = SwitchBuilder.new().withRng(rng).withMplex().withNoise()
+          .withPeerStore(peerStoreSize)
+          .withMaxConnections(maxConnections)
+          .build(),
+        storage = nil)
+
+  test "prunePeerStore() correctly removes peers to match max quota":
+    # Create peer manager
+    let pm = PeerManager.new(
+      switch = SwitchBuilder.new().withRng(rng).withMplex().withNoise()
+        .withPeerStore(10)
+        .withMaxConnections(5)
+        .build(),
+      maxFailedAttempts = 1,
+      storage = nil)
+
+    # Create 15 peers and add them to the peerstore
+    let peers = toSeq(1..15).mapIt(parseRemotePeerInfo("/ip4/0.0.0.0/tcp/0/p2p/" & $PeerId.random().get()))
+    for p in peers: pm.addPeer(p, "")
+
+    # Check that we have 15 peers in the peerstore
+    check:
+      pm.peerStore.peers.len == 15
+
+    # Fake that some peers failed to connect
+    pm.peerStore[NumberFailedConnBook][peers[0].peerId] = 2
+    pm.peerStore[NumberFailedConnBook][peers[1].peerId] = 2
+    pm.peerStore[NumberFailedConnBook][peers[2].peerId] = 2
+
+    # Fake that some peers are connected
+    pm.peerStore[ConnectionBook][peers[5].peerId] = Connected
+    pm.peerStore[ConnectionBook][peers[8].peerId] = Connected
+    pm.peerStore[ConnectionBook][peers[10].peerId] = Connected
+    pm.peerStore[ConnectionBook][peers[12].peerId] = Connected
+
+    # Prune the peerstore
+    pm.prunePeerStore()
+
+    check:
+      # ensure peerstore was pruned
+      pm.peerStore.peers.len == 10
+
+      # ensure connected peers were not pruned
+      pm.peerStore.peers.anyIt(it.peerId == peers[5].peerId)
+      pm.peerStore.peers.anyIt(it.peerId == peers[8].peerId)
+      pm.peerStore.peers.anyIt(it.peerId == peers[10].peerId)
+      pm.peerStore.peers.anyIt(it.peerId == peers[12].peerId)
+
+      # ensure peers that failed were the first to be pruned
+      not pm.peerStore.peers.anyIt(it.peerId == peers[0].peerId)
+      not pm.peerStore.peers.anyIt(it.peerId == peers[1].peerId)
+      not pm.peerStore.peers.anyIt(it.peerId == peers[2].peerId)
diff --git a/waku/v2/node/peer_manager/peer_manager.nim b/waku/v2/node/peer_manager/peer_manager.nim
index 7a930258e..02c8265ea 100644
--- a/waku/v2/node/peer_manager/peer_manager.nim
+++ b/waku/v2/node/peer_manager/peer_manager.nim
@@ -23,6 +23,7 @@ declareCounter waku_peers_dials, "Number of peer dials", ["outcome"]
 declarePublicCounter waku_node_conns_initiated, "Number of connections initiated", ["source"]
 declarePublicGauge waku_peers_errors, "Number of peer manager errors", ["type"]
 declarePublicGauge waku_connected_peers, "Number of connected peers per direction: inbound|outbound", ["direction"]
+declarePublicGauge waku_peer_store_size, "Number of peers managed by the peer store"
 
 logScope:
   topics = "waku node peer_manager"
@@ -46,6 +47,9 @@ const
   # Delay between consecutive relayConnectivityLoop runs
   ConnectivityLoopInterval = chronos.seconds(30)
 
+  # How often the peer store is pruned
+  PrunePeerStoreInterval = chronos.minutes(5)
+
 type
   PeerManager* = ref object of RootObj
     switch*: Switch
@@ -119,16 +123,6 @@ proc dialPeer(pm: PeerManager, peerId: PeerID,
 
   return none(Connection)
 
-# TODO: To be addressed in nwaku/pull/1473. Do not prune service peers
-# TODO: Currently unused
-proc prunePeerStore(pm: PeerManager) =
-  # iterate peers in peerstore
-  # skip service peers
-  #if pm.peerStore[NumberFailedConnBook][peerId] >= pm.maxFailedAttempts:
-  #  debug "Removing peer from peer store", peerId = peerId, failedAttempts=failedAttempts
-  #  pm.peerStore.del(peerId)
-  doAssert(false, "Not implemented!")
-
 proc loadFromStorage(pm: PeerManager) =
   debug "loading peers from storage"
   # Load peers from storage, if available
@@ -191,6 +185,14 @@ proc new*(T: type PeerManager,
            backoffFactor = BackoffFactor,
            maxFailedAttempts = MaxFailedAttempts,): PeerManager =
 
+  let capacity = switch.peerStore.capacity
+  let maxConnections = switch.connManager.inSema.size
+  if maxConnections > capacity:
+    error "Max number of connections can't be greater than PeerManager capacity",
+      capacity = capacity,
+      maxConnections = maxConnections
+    raise newException(Defect, "Max number of connections can't be greater than PeerManager capacity")
+
   let pm = PeerManager(switch: switch,
                        peerStore: switch.peerStore,
                        storage: storage,
@@ -200,9 +202,15 @@ proc new*(T: type PeerManager,
   proc peerHook(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} =
     onConnEvent(pm, peerId, event)
 
+  proc peerStoreChanged(peerId: PeerId) {.gcsafe.} =
+    waku_peer_store_size.set(toSeq(pm.peerStore[AddressBook].book.keys).len.int64)
+
   pm.switch.addConnEventHandler(peerHook, ConnEventKind.Connected)
   pm.switch.addConnEventHandler(peerHook, ConnEventKind.Disconnected)
 
+  # called every time the peerstore is updated
+  pm.peerStore[AddressBook].addHandler(peerStoreChanged)
+
   pm.serviceSlots = initTable[string, RemotePeerInfo]()
 
   if not storage.isNil():
@@ -386,6 +394,45 @@ proc relayConnectivityLoop*(pm: PeerManager) {.async.} =
 
     await sleepAsync(ConnectivityLoopInterval)
 
+proc prunePeerStore*(pm: PeerManager) =
+  let numPeers = toSeq(pm.peerStore[AddressBook].book.keys).len
+  let capacity = pm.peerStore.capacity
+  if numPeers < capacity:
+    return
+
+  debug "Peer store capacity exceeded", numPeers = numPeers, capacity = capacity
+  let peersToPrune = numPeers - capacity
+
+  # prune peers with too many failed attempts
+  var pruned = 0
+  for peerId in pm.peerStore[NumberFailedConnBook].book.keys:
+    if peersToPrune - pruned == 0:
+      break
+    if pm.peerStore[NumberFailedConnBook][peerId] >= pm.maxFailedAttempts:
+      pm.peerStore.del(peerId)
+      pruned += 1
+
+  # if we still need to prune, prune peers that are not connected
+  let notConnected = pm.peerStore.getNotConnectedPeers().mapIt(it.peerId)
+  for peerId in notConnected:
+    if peersToPrune - pruned == 0:
+      break
+    pm.peerStore.del(peerId)
+    pruned += 1
+
+  let afterNumPeers = toSeq(pm.peerStore[AddressBook].book.keys).len
+  debug "Finished pruning peer store", beforeNumPeers = numPeers,
+                                       afterNumPeers = afterNumPeers,
+                                       capacity = capacity,
+                                       pruned = pruned
+
+
+proc prunePeerStoreLoop(pm: PeerManager) {.async.} =
+  while pm.started:
+    pm.prunePeerStore()
+    await sleepAsync(PrunePeerStoreInterval)
+
+
 proc selectPeer*(pm: PeerManager, proto: string): Option[RemotePeerInfo] =
   debug "Selecting peer from peerstore", protocol=proto
@@ -416,6 +463,7 @@ proc selectPeer*(pm: PeerManager, proto: string): Option[RemotePeerInfo] =
 proc start*(pm: PeerManager) =
   pm.started = true
   asyncSpawn pm.relayConnectivityLoop()
+  asyncSpawn pm.prunePeerStoreLoop()
 
 proc stop*(pm: PeerManager) =
   pm.started = false
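
A minimal usage sketch of the capacity check introduced in new*() above, mirroring the SwitchBuilder chain used in the new tests. The capacity and connection values below are illustrative, and the PeerManager import path is the one used from tests/v2/, not a prescribed configuration.

import
  libp2p/builders,
  libp2p/crypto/crypto,
  ../../waku/v2/node/peer_manager/peer_manager  # path as used from tests/v2/

let rng = crypto.newRng()

# The peer store capacity passed to withPeerStore() must be at least the
# limit passed to withMaxConnections(); otherwise PeerManager.new() raises
# a Defect (see the check added in new*() above).
let switch = SwitchBuilder.new()
  .withRng(rng)
  .withMplex()
  .withNoise()
  .withPeerStore(150)       # illustrative capacity
  .withMaxConnections(50)   # illustrative limit, must not exceed capacity
  .build()

let pm = PeerManager.new(switch = switch, storage = nil)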