From df58643ea31213ef4cc65313005e935a79fdc0df Mon Sep 17 00:00:00 2001 From: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> Date: Mon, 7 Apr 2025 12:24:03 +0200 Subject: [PATCH] chore: retrieve protocols in new added peer from discv5 (#3354) * add new unit test to validate that any peer can be retrieved * add new discv5 test and better peer store management * wakuPeerStore -> switch.peerStore * simplify waku_peer_store, better logs and peer_manager enhancements --- .../diagnose_connections.nim | 10 +- .../service_peer_management.nim | 6 +- apps/wakucanary/wakucanary.nim | 2 +- examples/publisher.nim | 2 +- examples/subscriber.nim | 2 +- .../requests/peer_manager_request.nim | 6 +- tests/all_tests_waku.nim | 3 +- tests/node/peer_manager/peer_store/utils.nim | 3 - tests/node/test_wakunode_peer_exchange.nim | 26 +- tests/node/test_wakunode_peer_manager.nim | 34 +- tests/test_peer_manager.nim | 304 ++++++++++-------- tests/test_peer_store_extended.nim | 4 +- tests/test_waku_dnsdisc.nim | 12 +- tests/waku_discv5/test_all.nim | 1 - tests/waku_discv5/test_waku_discv5.nim | 84 ++++- waku/node/peer_manager/peer_manager.nim | 148 ++++----- waku/node/peer_manager/waku_peer_store.nim | 210 ++++++------ waku/node/waku_node.nim | 4 +- waku/waku_api/rest/admin/handlers.nim | 16 +- waku/waku_filter_v2/protocol.nim | 2 +- waku/waku_peer_exchange/protocol.nim | 2 +- 21 files changed, 485 insertions(+), 396 deletions(-) delete mode 100644 tests/waku_discv5/test_all.nim diff --git a/apps/liteprotocoltester/diagnose_connections.nim b/apps/liteprotocoltester/diagnose_connections.nim index 788f83c68..a4007d59c 100644 --- a/apps/liteprotocoltester/diagnose_connections.nim +++ b/apps/liteprotocoltester/diagnose_connections.nim @@ -42,7 +42,7 @@ proc `$`*(cap: Capabilities): string = proc allPeers(pm: PeerManager): string = var allStr: string = "" - for idx, peer in pm.wakuPeerStore.peers(): + for idx, peer in pm.switch.peerStore.peers(): allStr.add( " " & $idx & ". | " & constructMultiaddrStr(peer) & " | agent: " & peer.getAgent() & " | protos: " & $peer.protocols & " | caps: " & @@ -51,10 +51,10 @@ proc allPeers(pm: PeerManager): string = return allStr proc logSelfPeers*(pm: PeerManager) = - let selfLighpushPeers = pm.wakuPeerStore.getPeersByProtocol(WakuLightPushCodec) - let selfRelayPeers = pm.wakuPeerStore.getPeersByProtocol(WakuRelayCodec) - let selfFilterPeers = pm.wakuPeerStore.getPeersByProtocol(WakuFilterSubscribeCodec) - let selfPxPeers = pm.wakuPeerStore.getPeersByProtocol(WakuPeerExchangeCodec) + let selfLighpushPeers = pm.switch.peerStore.getPeersByProtocol(WakuLightPushCodec) + let selfRelayPeers = pm.switch.peerStore.getPeersByProtocol(WakuRelayCodec) + let selfFilterPeers = pm.switch.peerStore.getPeersByProtocol(WakuFilterSubscribeCodec) + let selfPxPeers = pm.switch.peerStore.getPeersByProtocol(WakuPeerExchangeCodec) let printable = catch: """*------------------------------------------------------------------------------------------* diff --git a/apps/liteprotocoltester/service_peer_management.nim b/apps/liteprotocoltester/service_peer_management.nim index 8fd6de973..83216ae3b 100644 --- a/apps/liteprotocoltester/service_peer_management.nim +++ b/apps/liteprotocoltester/service_peer_management.nim @@ -61,7 +61,7 @@ proc selectRandomCapablePeer*( elif codec.contains("filter"): cap = Capabilities.Filter - var supportivePeers = pm.wakuPeerStore.getPeersByCapability(cap) + var supportivePeers = pm.switch.peerStore.getPeersByCapability(cap) trace "Found supportive peers count", count = supportivePeers.len() trace "Found supportive peers", supportivePeers = $supportivePeers @@ -102,7 +102,7 @@ proc tryCallAllPxPeers*( elif codec.contains("filter"): capability = Capabilities.Filter - var supportivePeers = pm.wakuPeerStore.getPeersByCapability(capability) + var supportivePeers = pm.switch.peerStore.getPeersByCapability(capability) lpt_px_peers.set(supportivePeers.len) debug "Found supportive peers count", count = supportivePeers.len() @@ -215,7 +215,7 @@ proc selectRandomServicePeer*( if actualPeer.isSome(): alreadyUsedServicePeers.add(actualPeer.get()) - let supportivePeers = pm.wakuPeerStore.getPeersByProtocol(codec).filterIt( + let supportivePeers = pm.switch.peerStore.getPeersByProtocol(codec).filterIt( it notin alreadyUsedServicePeers ) if supportivePeers.len == 0: diff --git a/apps/wakucanary/wakucanary.nim b/apps/wakucanary/wakucanary.nim index 914d76e70..ea5220248 100644 --- a/apps/wakucanary/wakucanary.nim +++ b/apps/wakucanary/wakucanary.nim @@ -246,7 +246,7 @@ proc main(rng: ref HmacDrbgContext): Future[int] {.async.} = return 1 let lp2pPeerStore = node.switch.peerStore - let conStatus = node.peerManager.wakuPeerStore[ConnectionBook][peer.peerId] + let conStatus = node.peerManager.switch.peerStore[ConnectionBook][peer.peerId] if conf.ping: discard await pingFut diff --git a/examples/publisher.nim b/examples/publisher.nim index 654f40601..5b1ca9f18 100644 --- a/examples/publisher.nim +++ b/examples/publisher.nim @@ -95,7 +95,7 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} = # wait for a minimum of peers to be connected, otherwise messages wont be gossiped while true: - let numConnectedPeers = node.peerManager.wakuPeerStore[ConnectionBook].book + let numConnectedPeers = node.peerManager.switch.peerStore[ConnectionBook].book .values() .countIt(it == Connected) if numConnectedPeers >= 6: diff --git a/examples/subscriber.nim b/examples/subscriber.nim index 0dd22f469..90440aabc 100644 --- a/examples/subscriber.nim +++ b/examples/subscriber.nim @@ -93,7 +93,7 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} = # wait for a minimum of peers to be connected, otherwise messages wont be gossiped while true: - let numConnectedPeers = node.peerManager.wakuPeerStore[ConnectionBook].book + let numConnectedPeers = node.peerManager.switch.peerStore[ConnectionBook].book .values() .countIt(it == Connected) if numConnectedPeers >= 6: diff --git a/library/waku_thread/inter_thread_communication/requests/peer_manager_request.nim b/library/waku_thread/inter_thread_communication/requests/peer_manager_request.nim index d8a0a57af..1e5202891 100644 --- a/library/waku_thread/inter_thread_communication/requests/peer_manager_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/peer_manager_request.nim @@ -86,13 +86,13 @@ proc process*( of GET_ALL_PEER_IDS: ## returns a comma-separated string of peerIDs let peerIDs = - waku.node.peerManager.wakuPeerStore.peers().mapIt($it.peerId).join(",") + waku.node.peerManager.switch.peerStore.peers().mapIt($it.peerId).join(",") return ok(peerIDs) of GET_CONNECTED_PEERS_INFO: ## returns a JSON string mapping peerIDs to objects with protocols and addresses var peersMap = initTable[string, PeerInfo]() - let peers = waku.node.peerManager.wakuPeerStore.peers().filterIt( + let peers = waku.node.peerManager.switch.peerStore.peers().filterIt( it.connectedness == Connected ) @@ -108,7 +108,7 @@ proc process*( return ok(jsonStr) of GET_PEER_IDS_BY_PROTOCOL: ## returns a comma-separated string of peerIDs that mount the given protocol - let connectedPeers = waku.node.peerManager.wakuPeerStore + let connectedPeers = waku.node.peerManager.switch.peerStore .peers($self[].protocol) .filterIt(it.connectedness == Connected) .mapIt($it.peerId) diff --git a/tests/all_tests_waku.nim b/tests/all_tests_waku.nim index 3e847ae86..f23f4249c 100644 --- a/tests/all_tests_waku.nim +++ b/tests/all_tests_waku.nim @@ -85,7 +85,8 @@ import ./test_waku_noise_sessions, ./test_waku_netconfig, ./test_waku_switch, - ./test_waku_rendezvous + ./test_waku_rendezvous, + ./waku_discv5/test_waku_discv5 # Waku Keystore test suite import ./test_waku_keystore_keyfile, ./test_waku_keystore diff --git a/tests/node/peer_manager/peer_store/utils.nim b/tests/node/peer_manager/peer_store/utils.nim index 1d5dc6e22..b087dc471 100644 --- a/tests/node/peer_manager/peer_store/utils.nim +++ b/tests/node/peer_manager/peer_store/utils.nim @@ -7,6 +7,3 @@ import proc newTestWakuPeerStorage*(path: Option[string] = string.none()): WakuPeerStorage = let db = newSqliteDatabase(path) WakuPeerStorage.new(db).value() - -proc peerExists*(peerStore: PeerStore, peerId: PeerId): bool = - return peerStore[AddressBook].contains(peerId) diff --git a/tests/node/test_wakunode_peer_exchange.nim b/tests/node/test_wakunode_peer_exchange.nim index edb262b0e..afd808a2c 100644 --- a/tests/node/test_wakunode_peer_exchange.nim +++ b/tests/node/test_wakunode_peer_exchange.nim @@ -83,7 +83,7 @@ suite "Waku Peer Exchange": # Then no peers are fetched check: - node.peerManager.wakuPeerStore.peers.len == 0 + node.peerManager.switch.peerStore.peers.len == 0 res.error.status_code == SERVICE_UNAVAILABLE res.error.status_desc == some("PeerExchange is not mounted") @@ -98,12 +98,12 @@ suite "Waku Peer Exchange": res.error.status_desc == some("peer_not_found_failure") # Then no peers are fetched - check node.peerManager.wakuPeerStore.peers.len == 0 + check node.peerManager.switch.peerStore.peers.len == 0 asyncTest "Node succesfully exchanges px peers with faked discv5": # Given both nodes mount peer exchange await allFutures([node.mountPeerExchange(), node2.mountPeerExchange()]) - check node.peerManager.wakuPeerStore.peers.len == 0 + check node.peerManager.switch.peerStore.peers.len == 0 # Mock that we discovered a node (to avoid running discv5) var enr = enr.Record() @@ -124,8 +124,8 @@ suite "Waku Peer Exchange": # Check that the peer ended up in the peerstore let rpInfo = enr.toRemotePeerInfo.get() check: - node.peerManager.wakuPeerStore.peers.anyIt(it.peerId == rpInfo.peerId) - node.peerManager.wakuPeerStore.peers.anyIt(it.addrs == rpInfo.addrs) + node.peerManager.switch.peerStore.peers.anyIt(it.peerId == rpInfo.peerId) + node.peerManager.switch.peerStore.peers.anyIt(it.addrs == rpInfo.addrs) suite "setPeerExchangePeer": var node2 {.threadvar.}: WakuNode @@ -142,7 +142,7 @@ suite "Waku Peer Exchange": asyncTest "peer set successfully": # Given a node with peer exchange mounted await node.mountPeerExchange() - let initialPeers = node.peerManager.wakuPeerStore.peers.len + let initialPeers = node.peerManager.switch.peerStore.peers.len # And a valid peer info let remotePeerInfo2 = node2.peerInfo.toRemotePeerInfo() @@ -152,12 +152,12 @@ suite "Waku Peer Exchange": # Then the peer is added to the peer store check: - node.peerManager.wakuPeerStore.peers.len == (initialPeers + 1) + node.peerManager.switch.peerStore.peers.len == (initialPeers + 1) asyncTest "peer exchange not mounted": # Given a node without peer exchange mounted check node.wakuPeerExchange == nil - let initialPeers = node.peerManager.wakuPeerStore.peers.len + let initialPeers = node.peerManager.switch.peerStore.peers.len # And a valid peer info let invalidMultiAddress = MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet() @@ -167,12 +167,12 @@ suite "Waku Peer Exchange": # Then no peer is added to the peer store check: - node.peerManager.wakuPeerStore.peers.len == initialPeers + node.peerManager.switch.peerStore.peers.len == initialPeers asyncTest "peer info parse error": # Given a node with peer exchange mounted await node.mountPeerExchange() - let initialPeers = node.peerManager.wakuPeerStore.peers.len + let initialPeers = node.peerManager.switch.peerStore.peers.len # And given a peer info with an invalid peer id var remotePeerInfo2 = node2.peerInfo.toRemotePeerInfo() @@ -183,7 +183,7 @@ suite "Waku Peer Exchange": # Then no peer is added to the peer store check: - node.peerManager.wakuPeerStore.peers.len == initialPeers + node.peerManager.switch.peerStore.peers.len == initialPeers suite "Waku Peer Exchange with discv5": asyncTest "Node successfully exchanges px peers with real discv5": @@ -286,13 +286,13 @@ suite "Waku Peer Exchange with discv5": let requestPeers = 1 - currentPeers = node3.peerManager.wakuPeerStore.peers.len + currentPeers = node3.peerManager.switch.peerStore.peers.len let res = await node3.fetchPeerExchangePeers(1) check res.tryGet() == 1 # Then node3 has received 1 peer from node1 check: - node3.peerManager.wakuPeerStore.peers.len == currentPeers + requestPeers + node3.peerManager.switch.peerStore.peers.len == currentPeers + requestPeers await allFutures( [node1.stop(), node2.stop(), node3.stop(), disc1.stop(), disc2.stop()] diff --git a/tests/node/test_wakunode_peer_manager.nim b/tests/node/test_wakunode_peer_manager.nim index 0fd80271b..6b8fb2fa6 100644 --- a/tests/node/test_wakunode_peer_manager.nim +++ b/tests/node/test_wakunode_peer_manager.nim @@ -45,9 +45,9 @@ suite "Peer Manager": var server {.threadvar.}: WakuNode - serverPeerStore {.threadvar.}: WakuPeerStore + serverPeerStore {.threadvar.}: PeerStore client {.threadvar.}: WakuNode - clientPeerStore {.threadvar.}: WakuPeerStore + clientPeerStore {.threadvar.}: PeerStore var serverRemotePeerInfo {.threadvar.}: RemotePeerInfo @@ -64,9 +64,9 @@ suite "Peer Manager": clientKey = generateSecp256k1Key() server = newTestWakuNode(serverKey, listenIp, Port(3000)) - serverPeerStore = server.peerManager.wakuPeerStore + serverPeerStore = server.peerManager.switch.peerStore client = newTestWakuNode(clientKey, listenIp, Port(3001)) - clientPeerStore = client.peerManager.wakuPeerStore + clientPeerStore = client.peerManager.switch.peerStore await allFutures(server.start(), client.start()) @@ -140,7 +140,7 @@ suite "Peer Manager": clientPeerStore.peers().len == 1 # Given the server is marked as CannotConnect - client.peerManager.wakuPeerStore[ConnectionBook].book[serverPeerId] = + client.peerManager.switch.peerStore[ConnectionBook].book[serverPeerId] = CannotConnect # When pruning the client's store @@ -177,7 +177,7 @@ suite "Peer Manager": clientPeerStore.peers().len == 1 # Given the server is marked as having 1 failed connection - client.peerManager.wakuPeerStore[NumberFailedConnBook].book[serverPeerId] = 1 + client.peerManager.switch.peerStore[NumberFailedConnBook].book[serverPeerId] = 1 # When pruning the client's store client.peerManager.prunePeerStore() @@ -196,7 +196,7 @@ suite "Peer Manager": clientPeerStore.peers().len == 1 # Given the server is marked as not connected - client.peerManager.wakuPeerStore[ConnectionBook].book[serverPeerId] = + client.peerManager.switch.peerStore[ConnectionBook].book[serverPeerId] = CannotConnect # When pruning the client's store @@ -220,7 +220,7 @@ suite "Peer Manager": # Given the server is marked as not connected # (There's only one shard in the ENR so avg shards will be the same as the shard count; hence it will be purged.) - client.peerManager.wakuPeerStore[ConnectionBook].book[serverPeerId] = + client.peerManager.switch.peerStore[ConnectionBook].book[serverPeerId] = CannotConnect # When pruning the client's store @@ -714,8 +714,8 @@ suite "Persistence Check": client = newTestWakuNode( clientKey, listenIp, listenPort, peerStorage = clientPeerStorage ) - serverPeerStore = server.peerManager.wakuPeerStore - clientPeerStore = client.peerManager.wakuPeerStore + serverPeerStore = server.peerManager.switch.peerStore + clientPeerStore = client.peerManager.switch.peerStore await allFutures(server.start(), client.start()) @@ -731,7 +731,7 @@ suite "Persistence Check": newClient = newTestWakuNode( clientKey, listenIp, listenPort, peerStorage = newClientPeerStorage ) - newClientPeerStore = newClient.peerManager.wakuPeerStore + newClientPeerStore = newClient.peerManager.switch.peerStore await newClient.start() @@ -756,8 +756,8 @@ suite "Persistence Check": client = newTestWakuNode( clientKey, listenIp, listenPort, peerStorage = clientPeerStorage ) - serverPeerStore = server.peerManager.wakuPeerStore - clientPeerStore = client.peerManager.wakuPeerStore + serverPeerStore = server.peerManager.switch.peerStore + clientPeerStore = client.peerManager.switch.peerStore await allFutures(server.start(), client.start()) @@ -776,8 +776,8 @@ suite "Persistence Check": clientKey = generateSecp256k1Key() server = newTestWakuNode(serverKey, listenIp, listenPort) client = newTestWakuNode(clientKey, listenIp, listenPort) - serverPeerStore = server.peerManager.wakuPeerStore - clientPeerStore = client.peerManager.wakuPeerStore + serverPeerStore = server.peerManager.switch.peerStore + clientPeerStore = client.peerManager.switch.peerStore await allFutures(server.start(), client.start()) @@ -792,13 +792,13 @@ suite "Mount Order": var client {.threadvar.}: WakuNode clientRemotePeerInfo {.threadvar.}: RemotePeerInfo - clientPeerStore {.threadvar.}: WakuPeerStore + clientPeerStore {.threadvar.}: PeerStore asyncSetup: let clientKey = generateSecp256k1Key() client = newTestWakuNode(clientKey, listenIp, listenPort) - clientPeerStore = client.peerManager.wakuPeerStore + clientPeerStore = client.peerManager.switch.peerStore await client.start() diff --git a/tests/test_peer_manager.nim b/tests/test_peer_manager.nim index 4fd148b81..4ca08e46f 100644 --- a/tests/test_peer_manager.nim +++ b/tests/test_peer_manager.nim @@ -50,10 +50,10 @@ procSuite "Peer Manager": check: connOk == true - nodes[0].peerManager.wakuPeerStore.peers().anyIt( + nodes[0].peerManager.switch.peerStore.peers().anyIt( it.peerId == nodes[1].peerInfo.peerId ) - nodes[0].peerManager.wakuPeerStore.connectedness(nodes[1].peerInfo.peerId) == + nodes[0].peerManager.switch.peerStore.connectedness(nodes[1].peerInfo.peerId) == Connectedness.Connected asyncTest "dialPeer() works": @@ -80,13 +80,13 @@ procSuite "Peer Manager": # Check that node2 is being managed in node1 check: - nodes[0].peerManager.wakuPeerStore.peers().anyIt( + nodes[0].peerManager.switch.peerStore.peers().anyIt( it.peerId == nodes[1].peerInfo.peerId ) # Check connectedness check: - nodes[0].peerManager.wakuPeerStore.connectedness(nodes[1].peerInfo.peerId) == + nodes[0].peerManager.switch.peerStore.connectedness(nodes[1].peerInfo.peerId) == Connectedness.Connected await allFutures(nodes.mapIt(it.stop())) @@ -141,12 +141,12 @@ procSuite "Peer Manager": # Check peers were successfully added to peer manager check: - node.peerManager.wakuPeerStore.peers().len == 2 - node.peerManager.wakuPeerStore.peers(WakuFilterSubscribeCodec).allIt( + node.peerManager.switch.peerStore.peers().len == 2 + node.peerManager.switch.peerStore.peers(WakuFilterSubscribeCodec).allIt( it.peerId == filterPeer.peerId and it.addrs.contains(filterLoc) and it.protocols.contains(WakuFilterSubscribeCodec) ) - node.peerManager.wakuPeerStore.peers(WakuStoreCodec).allIt( + node.peerManager.switch.peerStore.peers(WakuStoreCodec).allIt( it.peerId == storePeer.peerId and it.addrs.contains(storeLoc) and it.protocols.contains(WakuStoreCodec) ) @@ -166,7 +166,7 @@ procSuite "Peer Manager": nodes[0].peerManager.addPeer(nodes[1].peerInfo.toRemotePeerInfo()) check: # No information about node2's connectedness - nodes[0].peerManager.wakuPeerStore.connectedness(nodes[1].peerInfo.peerId) == + nodes[0].peerManager.switch.peerStore.connectedness(nodes[1].peerInfo.peerId) == NotConnected # Failed connection @@ -183,7 +183,7 @@ procSuite "Peer Manager": check: # Cannot connect to node2 - nodes[0].peerManager.wakuPeerStore.connectedness(nonExistentPeer.peerId) == + nodes[0].peerManager.switch.peerStore.connectedness(nonExistentPeer.peerId) == CannotConnect # Successful connection @@ -194,14 +194,14 @@ procSuite "Peer Manager": check: # Currently connected to node2 - nodes[0].peerManager.wakuPeerStore.connectedness(nodes[1].peerInfo.peerId) == + nodes[0].peerManager.switch.peerStore.connectedness(nodes[1].peerInfo.peerId) == Connected # Stop node. Gracefully disconnect from all peers. await nodes[0].stop() check: # Not currently connected to node2, but had recent, successful connection. - nodes[0].peerManager.wakuPeerStore.connectedness(nodes[1].peerInfo.peerId) == + nodes[0].peerManager.switch.peerStore.connectedness(nodes[1].peerInfo.peerId) == CanConnect await nodes[1].stop() @@ -232,12 +232,13 @@ procSuite "Peer Manager": let conn1Ok = await nodes[0].peerManager.connectPeer(nonExistentPeer) check: # Cannot connect to node2 - nodes[0].peerManager.wakuPeerStore.connectedness(nonExistentPeer.peerId) == + nodes[0].peerManager.switch.peerStore.connectedness(nonExistentPeer.peerId) == CannotConnect - nodes[0].peerManager.wakuPeerStore[ConnectionBook][nonExistentPeer.peerId] == + nodes[0].peerManager.switch.peerStore[ConnectionBook][nonExistentPeer.peerId] == CannotConnect - nodes[0].peerManager.wakuPeerStore[NumberFailedConnBook][nonExistentPeer.peerId] == - 1 + nodes[0].peerManager.switch.peerStore[NumberFailedConnBook][ + nonExistentPeer.peerId + ] == 1 # Connection attempt failed conn1Ok == false @@ -253,14 +254,17 @@ procSuite "Peer Manager": nodes[0].peerManager.canBeConnected(nodes[1].peerInfo.peerId) == true # After a successful connection, the number of failed connections is reset - nodes[0].peerManager.wakuPeerStore[NumberFailedConnBook][nodes[1].peerInfo.peerId] = - 4 + + nodes[0].peerManager.switch.peerStore[NumberFailedConnBook][ + nodes[1].peerInfo.peerId + ] = 4 let conn2Ok = await nodes[0].peerManager.connectPeer(nodes[1].peerInfo.toRemotePeerInfo()) check: conn2Ok == true - nodes[0].peerManager.wakuPeerStore[NumberFailedConnBook][nodes[1].peerInfo.peerId] == - 0 + nodes[0].peerManager.switch.peerStore[NumberFailedConnBook][ + nodes[1].peerInfo.peerId + ] == 0 await allFutures(nodes.mapIt(it.stop())) @@ -290,7 +294,7 @@ procSuite "Peer Manager": assert is12Connected == true, "Node 1 and 2 not connected" check: - node1.peerManager.wakuPeerStore[AddressBook][remotePeerInfo2.peerId] == + node1.peerManager.switch.peerStore[AddressBook][remotePeerInfo2.peerId] == remotePeerInfo2.addrs # wait for the peer store update @@ -298,9 +302,9 @@ procSuite "Peer Manager": check: # Currently connected to node2 - node1.peerManager.wakuPeerStore.peers().len == 1 - node1.peerManager.wakuPeerStore.peers().anyIt(it.peerId == peerInfo2.peerId) - node1.peerManager.wakuPeerStore.connectedness(peerInfo2.peerId) == Connected + node1.peerManager.switch.peerStore.peers().len == 1 + node1.peerManager.switch.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId) + node1.peerManager.switch.peerStore.connectedness(peerInfo2.peerId) == Connected # Simulate restart by initialising a new node using the same storage let node3 = newTestWakuNode( @@ -316,9 +320,9 @@ procSuite "Peer Manager": check: # Node2 has been loaded after "restart", but we have not yet reconnected - node3.peerManager.wakuPeerStore.peers().len == 1 - node3.peerManager.wakuPeerStore.peers().anyIt(it.peerId == peerInfo2.peerId) - node3.peerManager.wakuPeerStore.connectedness(peerInfo2.peerId) == NotConnected + node3.peerManager.switch.peerStore.peers().len == 1 + node3.peerManager.switch.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId) + node3.peerManager.switch.peerStore.connectedness(peerInfo2.peerId) == NotConnected await node3.mountRelay() @@ -328,9 +332,9 @@ procSuite "Peer Manager": check: # Reconnected to node2 after "restart" - node3.peerManager.wakuPeerStore.peers().len == 1 - node3.peerManager.wakuPeerStore.peers().anyIt(it.peerId == peerInfo2.peerId) - node3.peerManager.wakuPeerStore.connectedness(peerInfo2.peerId) == Connected + node3.peerManager.switch.peerStore.peers().len == 1 + node3.peerManager.switch.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId) + node3.peerManager.switch.peerStore.connectedness(peerInfo2.peerId) == Connected await allFutures([node1.stop(), node2.stop(), node3.stop()]) @@ -360,7 +364,7 @@ procSuite "Peer Manager": assert is12Connected == true, "Node 1 and 2 not connected" check: - node1.peerManager.wakuPeerStore[AddressBook][remotePeerInfo2.peerId] == + node1.peerManager.switch.peerStore[AddressBook][remotePeerInfo2.peerId] == remotePeerInfo2.addrs # wait for the peer store update @@ -368,9 +372,9 @@ procSuite "Peer Manager": check: # Currently connected to node2 - node1.peerManager.wakuPeerStore.peers().len == 1 - node1.peerManager.wakuPeerStore.peers().anyIt(it.peerId == peerInfo2.peerId) - node1.peerManager.wakuPeerStore.connectedness(peerInfo2.peerId) == Connected + node1.peerManager.switch.peerStore.peers().len == 1 + node1.peerManager.switch.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId) + node1.peerManager.switch.peerStore.connectedness(peerInfo2.peerId) == Connected # Simulate restart by initialising a new node using the same storage let node3 = newTestWakuNode( @@ -386,9 +390,9 @@ procSuite "Peer Manager": check: # Node2 has been loaded after "restart", but we have not yet reconnected - node3.peerManager.wakuPeerStore.peers().len == 1 - node3.peerManager.wakuPeerStore.peers().anyIt(it.peerId == peerInfo2.peerId) - node3.peerManager.wakuPeerStore.connectedness(peerInfo2.peerId) == NotConnected + node3.peerManager.switch.peerStore.peers().len == 1 + node3.peerManager.switch.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId) + node3.peerManager.switch.peerStore.connectedness(peerInfo2.peerId) == NotConnected await node3.mountRelay() @@ -398,9 +402,9 @@ procSuite "Peer Manager": check: # Reconnected to node2 after "restart" - node3.peerManager.wakuPeerStore.peers().len == 1 - node3.peerManager.wakuPeerStore.peers().anyIt(it.peerId == peerInfo2.peerId) - node3.peerManager.wakuPeerStore.connectedness(peerInfo2.peerId) == Connected + node3.peerManager.switch.peerStore.peers().len == 1 + node3.peerManager.switch.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId) + node3.peerManager.switch.peerStore.connectedness(peerInfo2.peerId) == Connected await allFutures([node1.stop(), node2.stop(), node3.stop()]) @@ -488,12 +492,12 @@ procSuite "Peer Manager": (await node1.peerManager.connectPeer(peerInfo2.toRemotePeerInfo())) == true check: # Currently connected to node2 - node1.peerManager.wakuPeerStore.peers().len == 1 - node1.peerManager.wakuPeerStore.peers().anyIt(it.peerId == peerInfo2.peerId) - node1.peerManager.wakuPeerStore.peers().anyIt( + node1.peerManager.switch.peerStore.peers().len == 1 + node1.peerManager.switch.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId) + node1.peerManager.switch.peerStore.peers().anyIt( it.protocols.contains(node2.wakuRelay.codec) ) - node1.peerManager.wakuPeerStore.connectedness(peerInfo2.peerId) == Connected + node1.peerManager.switch.peerStore.connectedness(peerInfo2.peerId) == Connected # Simulate restart by initialising a new node using the same storage let node3 = newTestWakuNode( @@ -510,20 +514,22 @@ procSuite "Peer Manager": node2.wakuRelay.codec == betaCodec node3.wakuRelay.codec == stableCodec # Node2 has been loaded after "restart", but we have not yet reconnected - node3.peerManager.wakuPeerStore.peers().len == 1 - node3.peerManager.wakuPeerStore.peers().anyIt(it.peerId == peerInfo2.peerId) - node3.peerManager.wakuPeerStore.peers().anyIt(it.protocols.contains(betaCodec)) - node3.peerManager.wakuPeerStore.connectedness(peerInfo2.peerId) == NotConnected + node3.peerManager.switch.peerStore.peers().len == 1 + node3.peerManager.switch.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId) + node3.peerManager.switch.peerStore.peers().anyIt(it.protocols.contains(betaCodec)) + node3.peerManager.switch.peerStore.connectedness(peerInfo2.peerId) == NotConnected await node3.start() # This should trigger a reconnect check: # Reconnected to node2 after "restart" - node3.peerManager.wakuPeerStore.peers().len == 1 - node3.peerManager.wakuPeerStore.peers().anyIt(it.peerId == peerInfo2.peerId) - node3.peerManager.wakuPeerStore.peers().anyIt(it.protocols.contains(betaCodec)) - node3.peerManager.wakuPeerStore.peers().anyIt(it.protocols.contains(stableCodec)) - node3.peerManager.wakuPeerStore.connectedness(peerInfo2.peerId) == Connected + node3.peerManager.switch.peerStore.peers().len == 1 + node3.peerManager.switch.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId) + node3.peerManager.switch.peerStore.peers().anyIt(it.protocols.contains(betaCodec)) + node3.peerManager.switch.peerStore.peers().anyIt( + it.protocols.contains(stableCodec) + ) + node3.peerManager.switch.peerStore.connectedness(peerInfo2.peerId) == Connected await allFutures([node1.stop(), node2.stop(), node3.stop()]) @@ -560,38 +566,38 @@ procSuite "Peer Manager": check: # Peerstore track all three peers - nodes[0].peerManager.wakuPeerStore.peers().len == 3 + nodes[0].peerManager.switch.peerStore.peers().len == 3 # All peer ids are correct - nodes[0].peerManager.wakuPeerStore.peers().anyIt( + nodes[0].peerManager.switch.peerStore.peers().anyIt( it.peerId == nodes[1].switch.peerInfo.peerId ) - nodes[0].peerManager.wakuPeerStore.peers().anyIt( + nodes[0].peerManager.switch.peerStore.peers().anyIt( it.peerId == nodes[2].switch.peerInfo.peerId ) - nodes[0].peerManager.wakuPeerStore.peers().anyIt( + nodes[0].peerManager.switch.peerStore.peers().anyIt( it.peerId == nodes[3].switch.peerInfo.peerId ) # All peers support the relay protocol - nodes[0].peerManager.wakuPeerStore[ProtoBook][nodes[1].switch.peerInfo.peerId].contains( + nodes[0].peerManager.switch.peerStore[ProtoBook][nodes[1].switch.peerInfo.peerId].contains( WakuRelayCodec ) - nodes[0].peerManager.wakuPeerStore[ProtoBook][nodes[2].switch.peerInfo.peerId].contains( + nodes[0].peerManager.switch.peerStore[ProtoBook][nodes[2].switch.peerInfo.peerId].contains( WakuRelayCodec ) - nodes[0].peerManager.wakuPeerStore[ProtoBook][nodes[3].switch.peerInfo.peerId].contains( + nodes[0].peerManager.switch.peerStore[ProtoBook][nodes[3].switch.peerInfo.peerId].contains( WakuRelayCodec ) # All peers are connected - nodes[0].peerManager.wakuPeerStore[ConnectionBook][ + nodes[0].peerManager.switch.peerStore[ConnectionBook][ nodes[1].switch.peerInfo.peerId ] == Connected - nodes[0].peerManager.wakuPeerStore[ConnectionBook][ + nodes[0].peerManager.switch.peerStore[ConnectionBook][ nodes[2].switch.peerInfo.peerId ] == Connected - nodes[0].peerManager.wakuPeerStore[ConnectionBook][ + nodes[0].peerManager.switch.peerStore[ConnectionBook][ nodes[3].switch.peerInfo.peerId ] == Connected @@ -630,38 +636,38 @@ procSuite "Peer Manager": check: # Peerstore track all three peers - nodes[0].peerManager.wakuPeerStore.peers().len == 3 + nodes[0].peerManager.switch.peerStore.peers().len == 3 # All peer ids are correct - nodes[0].peerManager.wakuPeerStore.peers().anyIt( + nodes[0].peerManager.switch.peerStore.peers().anyIt( it.peerId == nodes[1].switch.peerInfo.peerId ) - nodes[0].peerManager.wakuPeerStore.peers().anyIt( + nodes[0].peerManager.switch.peerStore.peers().anyIt( it.peerId == nodes[2].switch.peerInfo.peerId ) - nodes[0].peerManager.wakuPeerStore.peers().anyIt( + nodes[0].peerManager.switch.peerStore.peers().anyIt( it.peerId == nodes[3].switch.peerInfo.peerId ) # All peers support the relay protocol - nodes[0].peerManager.wakuPeerStore[ProtoBook][nodes[1].switch.peerInfo.peerId].contains( + nodes[0].peerManager.switch.peerStore[ProtoBook][nodes[1].switch.peerInfo.peerId].contains( WakuRelayCodec ) - nodes[0].peerManager.wakuPeerStore[ProtoBook][nodes[2].switch.peerInfo.peerId].contains( + nodes[0].peerManager.switch.peerStore[ProtoBook][nodes[2].switch.peerInfo.peerId].contains( WakuRelayCodec ) - nodes[0].peerManager.wakuPeerStore[ProtoBook][nodes[3].switch.peerInfo.peerId].contains( + nodes[0].peerManager.switch.peerStore[ProtoBook][nodes[3].switch.peerInfo.peerId].contains( WakuRelayCodec ) # All peers are connected - nodes[0].peerManager.wakuPeerStore[ConnectionBook][ + nodes[0].peerManager.switch.peerStore[ConnectionBook][ nodes[1].switch.peerInfo.peerId ] == Connected - nodes[0].peerManager.wakuPeerStore[ConnectionBook][ + nodes[0].peerManager.switch.peerStore[ConnectionBook][ nodes[2].switch.peerInfo.peerId ] == Connected - nodes[0].peerManager.wakuPeerStore[ConnectionBook][ + nodes[0].peerManager.switch.peerStore[ConnectionBook][ nodes[3].switch.peerInfo.peerId ] == Connected @@ -690,66 +696,72 @@ procSuite "Peer Manager": check: # Peerstore track all three peers - nodes[0].peerManager.wakuPeerStore.peers().len == 3 + nodes[0].peerManager.switch.peerStore.peers().len == 3 # Inbound/Outbound number of peers match - nodes[0].peerManager.wakuPeerStore.getPeersByDirection(Inbound).len == 3 - nodes[0].peerManager.wakuPeerStore.getPeersByDirection(Outbound).len == 0 - nodes[1].peerManager.wakuPeerStore.getPeersByDirection(Inbound).len == 0 - nodes[1].peerManager.wakuPeerStore.getPeersByDirection(Outbound).len == 1 - nodes[2].peerManager.wakuPeerStore.getPeersByDirection(Inbound).len == 0 - nodes[2].peerManager.wakuPeerStore.getPeersByDirection(Outbound).len == 1 - nodes[3].peerManager.wakuPeerStore.getPeersByDirection(Inbound).len == 0 - nodes[3].peerManager.wakuPeerStore.getPeersByDirection(Outbound).len == 1 + nodes[0].peerManager.switch.peerStore.getPeersByDirection(Inbound).len == 3 + nodes[0].peerManager.switch.peerStore.getPeersByDirection(Outbound).len == 0 + nodes[1].peerManager.switch.peerStore.getPeersByDirection(Inbound).len == 0 + nodes[1].peerManager.switch.peerStore.getPeersByDirection(Outbound).len == 1 + nodes[2].peerManager.switch.peerStore.getPeersByDirection(Inbound).len == 0 + nodes[2].peerManager.switch.peerStore.getPeersByDirection(Outbound).len == 1 + nodes[3].peerManager.switch.peerStore.getPeersByDirection(Inbound).len == 0 + nodes[3].peerManager.switch.peerStore.getPeersByDirection(Outbound).len == 1 # All peer ids are correct - nodes[0].peerManager.wakuPeerStore.peers().anyIt( + nodes[0].peerManager.switch.peerStore.peers().anyIt( it.peerId == nodes[1].switch.peerInfo.peerId ) - nodes[0].peerManager.wakuPeerStore.peers().anyIt( + nodes[0].peerManager.switch.peerStore.peers().anyIt( it.peerId == nodes[2].switch.peerInfo.peerId ) - nodes[0].peerManager.wakuPeerStore.peers().anyIt( + nodes[0].peerManager.switch.peerStore.peers().anyIt( it.peerId == nodes[3].switch.peerInfo.peerId ) # All peers support the relay protocol - nodes[0].peerManager.wakuPeerStore[ProtoBook][nodes[1].switch.peerInfo.peerId].contains( + nodes[0].peerManager.switch.peerStore[ProtoBook][nodes[1].switch.peerInfo.peerId].contains( WakuRelayCodec ) - nodes[0].peerManager.wakuPeerStore[ProtoBook][nodes[2].switch.peerInfo.peerId].contains( + nodes[0].peerManager.switch.peerStore[ProtoBook][nodes[2].switch.peerInfo.peerId].contains( WakuRelayCodec ) - nodes[0].peerManager.wakuPeerStore[ProtoBook][nodes[3].switch.peerInfo.peerId].contains( + nodes[0].peerManager.switch.peerStore[ProtoBook][nodes[3].switch.peerInfo.peerId].contains( WakuRelayCodec ) # All peers are connected - nodes[0].peerManager.wakuPeerStore[ConnectionBook][ + nodes[0].peerManager.switch.peerStore[ConnectionBook][ nodes[1].switch.peerInfo.peerId ] == Connected - nodes[0].peerManager.wakuPeerStore[ConnectionBook][ + nodes[0].peerManager.switch.peerStore[ConnectionBook][ nodes[2].switch.peerInfo.peerId ] == Connected - nodes[0].peerManager.wakuPeerStore[ConnectionBook][ + nodes[0].peerManager.switch.peerStore[ConnectionBook][ nodes[3].switch.peerInfo.peerId ] == Connected # All peers are Inbound in peer 0 - nodes[0].peerManager.wakuPeerStore[DirectionBook][nodes[1].switch.peerInfo.peerId] == - Inbound - nodes[0].peerManager.wakuPeerStore[DirectionBook][nodes[2].switch.peerInfo.peerId] == - Inbound - nodes[0].peerManager.wakuPeerStore[DirectionBook][nodes[3].switch.peerInfo.peerId] == - Inbound + nodes[0].peerManager.switch.peerStore[DirectionBook][ + nodes[1].switch.peerInfo.peerId + ] == Inbound + nodes[0].peerManager.switch.peerStore[DirectionBook][ + nodes[2].switch.peerInfo.peerId + ] == Inbound + nodes[0].peerManager.switch.peerStore[DirectionBook][ + nodes[3].switch.peerInfo.peerId + ] == Inbound # All peers have an Outbound connection with peer 0 - nodes[1].peerManager.wakuPeerStore[DirectionBook][nodes[0].switch.peerInfo.peerId] == - Outbound - nodes[2].peerManager.wakuPeerStore[DirectionBook][nodes[0].switch.peerInfo.peerId] == - Outbound - nodes[3].peerManager.wakuPeerStore[DirectionBook][nodes[0].switch.peerInfo.peerId] == - Outbound + nodes[1].peerManager.switch.peerStore[DirectionBook][ + nodes[0].switch.peerInfo.peerId + ] == Outbound + nodes[2].peerManager.switch.peerStore[DirectionBook][ + nodes[0].switch.peerInfo.peerId + ] == Outbound + nodes[3].peerManager.switch.peerStore[DirectionBook][ + nodes[0].switch.peerInfo.peerId + ] == Outbound await allFutures(nodes.mapIt(it.stop())) @@ -778,12 +790,13 @@ procSuite "Peer Manager": # all peers are stored in the peerstore check: - node.peerManager.wakuPeerStore.peers().anyIt(it.peerId == peers[0].peerId) - node.peerManager.wakuPeerStore.peers().anyIt(it.peerId == peers[1].peerId) - node.peerManager.wakuPeerStore.peers().anyIt(it.peerId == peers[2].peerId) + node.peerManager.switch.peerStore.peers().anyIt(it.peerId == peers[0].peerId) + node.peerManager.switch.peerStore.peers().anyIt(it.peerId == peers[1].peerId) + node.peerManager.switch.peerStore.peers().anyIt(it.peerId == peers[2].peerId) # but the relay peer is not - node.peerManager.wakuPeerStore.peers().anyIt(it.peerId == peers[3].peerId) == false + node.peerManager.switch.peerStore.peers().anyIt(it.peerId == peers[3].peerId) == + false # all service peers are added to its service slot check: @@ -900,8 +913,8 @@ procSuite "Peer Manager": peers.len == 3 # Add a peer[0] to the peerstore - pm.wakuPeerStore[AddressBook][peers[0].peerId] = peers[0].addrs - pm.wakuPeerStore[ProtoBook][peers[0].peerId] = + pm.switch.peerStore[AddressBook][peers[0].peerId] = peers[0].addrs + pm.switch.peerStore[ProtoBook][peers[0].peerId] = @[WakuRelayCodec, WakuStoreCodec, WakuFilterSubscribeCodec] # When no service peers, we get one from the peerstore @@ -979,44 +992,44 @@ procSuite "Peer Manager": # Check that we have 30 peers in the peerstore check: - pm.wakuPeerStore.peers.len == 30 + pm.switch.peerStore.peers.len == 30 # fake that some peers failed to connected - pm.wakuPeerStore[NumberFailedConnBook][peers[0].peerId] = 2 - pm.wakuPeerStore[NumberFailedConnBook][peers[1].peerId] = 2 - pm.wakuPeerStore[NumberFailedConnBook][peers[2].peerId] = 2 - pm.wakuPeerStore[NumberFailedConnBook][peers[3].peerId] = 2 - pm.wakuPeerStore[NumberFailedConnBook][peers[4].peerId] = 2 + pm.switch.peerStore[NumberFailedConnBook][peers[0].peerId] = 2 + pm.switch.peerStore[NumberFailedConnBook][peers[1].peerId] = 2 + pm.switch.peerStore[NumberFailedConnBook][peers[2].peerId] = 2 + pm.switch.peerStore[NumberFailedConnBook][peers[3].peerId] = 2 + pm.switch.peerStore[NumberFailedConnBook][peers[4].peerId] = 2 # fake that some peers are connected - pm.wakuPeerStore[ConnectionBook][peers[5].peerId] = Connected - pm.wakuPeerStore[ConnectionBook][peers[8].peerId] = Connected - pm.wakuPeerStore[ConnectionBook][peers[15].peerId] = Connected - pm.wakuPeerStore[ConnectionBook][peers[18].peerId] = Connected - pm.wakuPeerStore[ConnectionBook][peers[24].peerId] = Connected - pm.wakuPeerStore[ConnectionBook][peers[29].peerId] = Connected + pm.switch.peerStore[ConnectionBook][peers[5].peerId] = Connected + pm.switch.peerStore[ConnectionBook][peers[8].peerId] = Connected + pm.switch.peerStore[ConnectionBook][peers[15].peerId] = Connected + pm.switch.peerStore[ConnectionBook][peers[18].peerId] = Connected + pm.switch.peerStore[ConnectionBook][peers[24].peerId] = Connected + pm.switch.peerStore[ConnectionBook][peers[29].peerId] = Connected # Prune the peerstore (current=30, target=25) pm.prunePeerStore() check: # ensure peerstore was pruned - pm.wakuPeerStore.peers.len == 25 + pm.switch.peerStore.peers.len == 25 # ensure connected peers were not pruned - pm.wakuPeerStore.peers.anyIt(it.peerId == peers[5].peerId) - pm.wakuPeerStore.peers.anyIt(it.peerId == peers[8].peerId) - pm.wakuPeerStore.peers.anyIt(it.peerId == peers[15].peerId) - pm.wakuPeerStore.peers.anyIt(it.peerId == peers[18].peerId) - pm.wakuPeerStore.peers.anyIt(it.peerId == peers[24].peerId) - pm.wakuPeerStore.peers.anyIt(it.peerId == peers[29].peerId) + pm.switch.peerStore.peers.anyIt(it.peerId == peers[5].peerId) + pm.switch.peerStore.peers.anyIt(it.peerId == peers[8].peerId) + pm.switch.peerStore.peers.anyIt(it.peerId == peers[15].peerId) + pm.switch.peerStore.peers.anyIt(it.peerId == peers[18].peerId) + pm.switch.peerStore.peers.anyIt(it.peerId == peers[24].peerId) + pm.switch.peerStore.peers.anyIt(it.peerId == peers[29].peerId) # ensure peers that failed were the first to be pruned - not pm.wakuPeerStore.peers.anyIt(it.peerId == peers[0].peerId) - not pm.wakuPeerStore.peers.anyIt(it.peerId == peers[1].peerId) - not pm.wakuPeerStore.peers.anyIt(it.peerId == peers[2].peerId) - not pm.wakuPeerStore.peers.anyIt(it.peerId == peers[3].peerId) - not pm.wakuPeerStore.peers.anyIt(it.peerId == peers[4].peerId) + not pm.switch.peerStore.peers.anyIt(it.peerId == peers[0].peerId) + not pm.switch.peerStore.peers.anyIt(it.peerId == peers[1].peerId) + not pm.switch.peerStore.peers.anyIt(it.peerId == peers[2].peerId) + not pm.switch.peerStore.peers.anyIt(it.peerId == peers[3].peerId) + not pm.switch.peerStore.peers.anyIt(it.peerId == peers[4].peerId) asyncTest "canBeConnected() returns correct value": let pm = PeerManager.new( @@ -1042,8 +1055,8 @@ procSuite "Peer Manager": pm.canBeConnected(p1) == true # peer with ONE error that just failed - pm.wakuPeerStore[NumberFailedConnBook][p1] = 1 - pm.wakuPeerStore[LastFailedConnBook][p1] = Moment.init(getTime().toUnix, Second) + pm.switch.peerStore[NumberFailedConnBook][p1] = 1 + pm.switch.peerStore[LastFailedConnBook][p1] = Moment.init(getTime().toUnix, Second) # we cant connect right now check: pm.canBeConnected(p1) == false @@ -1054,8 +1067,8 @@ procSuite "Peer Manager": pm.canBeConnected(p1) == true # peer with TWO errors, we can connect until 2 seconds have passed - pm.wakuPeerStore[NumberFailedConnBook][p1] = 2 - pm.wakuPeerStore[LastFailedConnBook][p1] = Moment.init(getTime().toUnix, Second) + pm.switch.peerStore[NumberFailedConnBook][p1] = 2 + pm.switch.peerStore[LastFailedConnBook][p1] = Moment.init(getTime().toUnix, Second) # cant be connected after 1 second await sleepAsync(chronos.milliseconds(1000)) @@ -1152,6 +1165,23 @@ procSuite "Peer Manager": check: nodes[0].peerManager.ipTable["127.0.0.1"].len == 1 nodes[0].peerManager.switch.connManager.getConnections().len == 1 - nodes[0].peerManager.wakuPeerStore.peers().len == 1 + nodes[0].peerManager.switch.peerStore.peers().len == 1 await allFutures(nodes.mapIt(it.stop())) + + asyncTest "Retrieve peer that mounted peer exchange": + let + node1 = newTestWakuNode(generateSecp256k1Key(), getPrimaryIPAddr(), Port(55048)) + node2 = newTestWakuNode(generateSecp256k1Key(), getPrimaryIPAddr(), Port(55023)) + + await allFutures(node1.start(), node2.start()) + await allFutures(node1.mountRelay(), node2.mountRelay()) + await allFutures(node1.mountPeerExchange(), node2.mountPeerExchange()) + + await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) + + var r = node1.peerManager.selectPeer(WakuRelayCodec) + assert r.isSome(), "could not retrieve peer mounting WakuRelayCodec" + + r = node1.peerManager.selectPeer(WakuPeerExchangeCodec) + assert r.isSome(), "could not retrieve peer mounting WakuPeerExchangeCodec" diff --git a/tests/test_peer_store_extended.nim b/tests/test_peer_store_extended.nim index ef03fc69a..aa5947181 100644 --- a/tests/test_peer_store_extended.nim +++ b/tests/test_peer_store_extended.nim @@ -25,7 +25,7 @@ suite "Extended nim-libp2p Peer Store": setup: # Setup a nim-libp2p peerstore with some peers - let peerStore = WakuPeerStore.new(nil, capacity = 50) + let peerStore = PeerStore.new(nil, capacity = 50) var p1, p2, p3, p4, p5, p6: PeerId # create five peers basePeerId + [1-5] @@ -320,7 +320,7 @@ suite "Extended nim-libp2p Peer Store": test "del() successfully deletes waku custom books": # Given - let peerStore = WakuPeerStore.new(nil, capacity = 5) + let peerStore = PeerStore.new(nil, capacity = 5) var p1: PeerId require p1.init("QmeuZJbXrszW2jdT7GdduSjQskPU3S7vvGWKtKgDfkDvW1") diff --git a/tests/test_waku_dnsdisc.nim b/tests/test_waku_dnsdisc.nim index 228fa5542..cf0fd4007 100644 --- a/tests/test_waku_dnsdisc.nim +++ b/tests/test_waku_dnsdisc.nim @@ -94,20 +94,20 @@ suite "Waku DNS Discovery": check: # We have successfully connected to all discovered nodes - node4.peerManager.wakuPeerStore.peers().anyIt( + node4.peerManager.switch.peerStore.peers().anyIt( it.peerId == node1.switch.peerInfo.peerId ) - node4.peerManager.wakuPeerStore.connectedness(node1.switch.peerInfo.peerId) == + node4.peerManager.switch.peerStore.connectedness(node1.switch.peerInfo.peerId) == Connected - node4.peerManager.wakuPeerStore.peers().anyIt( + node4.peerManager.switch.peerStore.peers().anyIt( it.peerId == node2.switch.peerInfo.peerId ) - node4.peerManager.wakuPeerStore.connectedness(node2.switch.peerInfo.peerId) == + node4.peerManager.switch.peerStore.connectedness(node2.switch.peerInfo.peerId) == Connected - node4.peerManager.wakuPeerStore.peers().anyIt( + node4.peerManager.switch.peerStore.peers().anyIt( it.peerId == node3.switch.peerInfo.peerId ) - node4.peerManager.wakuPeerStore.connectedness(node3.switch.peerInfo.peerId) == + node4.peerManager.switch.peerStore.connectedness(node3.switch.peerInfo.peerId) == Connected await allFutures([node1.stop(), node2.stop(), node3.stop(), node4.stop()]) diff --git a/tests/waku_discv5/test_all.nim b/tests/waku_discv5/test_all.nim deleted file mode 100644 index a6d2c22c4..000000000 --- a/tests/waku_discv5/test_all.nim +++ /dev/null @@ -1 +0,0 @@ -import ./test_waku_discv5 diff --git a/tests/waku_discv5/test_waku_discv5.nim b/tests/waku_discv5/test_waku_discv5.nim index c4696d658..3d66136e8 100644 --- a/tests/waku_discv5/test_waku_discv5.nim +++ b/tests/waku_discv5/test_waku_discv5.nim @@ -8,13 +8,15 @@ import chronicles, testutils/unittests, libp2p/crypto/crypto as libp2p_keys, - eth/keys as eth_keys + eth/keys as eth_keys, + libp2p/crypto/secp, + libp2p/protocols/rendezvous import - waku/[waku_core/topics, waku_enr, discovery/waku_discv5, common/enr], + waku/[waku_core/topics, waku_enr, discovery/waku_discv5, waku_enr/capabilities], ../testlib/[wakucore, testasync, assertions, futures, wakunode], ../waku_enr/utils, - ./utils + ./utils as discv5_utils import eth/p2p/discoveryv5/enr as ethEnr @@ -53,7 +55,7 @@ suite "Waku Discovery v5": var builder = EnrBuilder.init(enrPrivKey, seqNum = enrSeqNum) require builder.withWakuRelaySharding(shardsTopics).isOk() - builder.withWakuCapabilities(Relay) + builder.withWakuCapabilities(Capabilities.Relay) let recordRes = builder.build() require recordRes.isOk() @@ -73,7 +75,7 @@ suite "Waku Discovery v5": var builder = EnrBuilder.init(enrPrivKey, seqNum = enrSeqNum) require builder.withWakuRelaySharding(shardsTopics).isOk() - builder.withWakuCapabilities(Relay) + builder.withWakuCapabilities(Capabilities.Relay) let recordRes = builder.build() require recordRes.isOk() @@ -93,7 +95,7 @@ suite "Waku Discovery v5": var builder = EnrBuilder.init(enrPrivKey, seqNum = enrSeqNum) require builder.withWakuRelaySharding(shardsTopics).isOk() - builder.withWakuCapabilities(Relay) + builder.withWakuCapabilities(Capabilities.Relay) let recordRes = builder.build() require recordRes.isOk() @@ -187,7 +189,7 @@ suite "Waku Discovery v5": indices = indices, flags = recordFlags, ) - node = newTestDiscv5( + node = discv5_utils.newTestDiscv5( privKey = privKey, bindIp = bindIp, tcpPort = tcpPort, @@ -342,7 +344,8 @@ suite "Waku Discovery v5": let res4 = await node4.start() assertResultOk res4 - await sleepAsync(FUTURE_TIMEOUT) + ## leave some time for discv5 to act + await sleepAsync(chronos.seconds(10)) ## When let peers = await node1.findRandomPeers() @@ -407,12 +410,69 @@ suite "Waku Discovery v5": enrs.len == 0 suite "waku discv5 initialization": + asyncTest "Start waku and check discv5 discovered peers": + let myRng = crypto.newRng() + var conf = defaultTestWakuNodeConf() + + conf.nodekey = some(crypto.PrivateKey.random(Secp256k1, myRng[])[]) + conf.discv5Discovery = true + conf.discv5UdpPort = Port(9000) + + let waku0 = Waku.new(conf).valueOr: + raiseAssert error + (waitFor startWaku(addr waku0)).isOkOr: + raiseAssert error + + conf.nodekey = some(crypto.PrivateKey.random(Secp256k1, myRng[])[]) + conf.discv5BootstrapNodes = @[waku0.node.enr.toURI()] + conf.discv5Discovery = true + conf.discv5UdpPort = Port(9001) + conf.tcpPort = Port(60001) + + let waku1 = Waku.new(conf).valueOr: + raiseAssert error + (waitFor startWaku(addr waku1)).isOkOr: + raiseAssert error + + await waku1.node.mountPeerExchange() + await waku1.node.mountRendezvous() + + var conf2 = conf + conf2.discv5BootstrapNodes = @[waku1.node.enr.toURI()] + conf2.discv5Discovery = true + conf2.tcpPort = Port(60003) + conf2.discv5UdpPort = Port(9003) + conf2.nodekey = some(crypto.PrivateKey.random(Secp256k1, myRng[])[]) + + let waku2 = Waku.new(conf2).valueOr: + raiseAssert error + (waitFor startWaku(addr waku2)).isOkOr: + raiseAssert error + + # leave some time for discv5 to act + await sleepAsync(chronos.seconds(10)) + + var r = waku0.node.peerManager.selectPeer(WakuPeerExchangeCodec) + assert r.isSome(), "could not retrieve peer mounting WakuPeerExchangeCodec" + + r = waku1.node.peerManager.selectPeer(WakuRelayCodec) + assert r.isSome(), "could not retrieve peer mounting WakuRelayCodec" + + r = waku1.node.peerManager.selectPeer(WakuPeerExchangeCodec) + assert r.isNone(), "should not retrieve peer mounting WakuPeerExchangeCodec" + + r = waku2.node.peerManager.selectPeer(WakuPeerExchangeCodec) + assert r.isSome(), "could not retrieve peer mounting WakuPeerExchangeCodec" + + r = waku2.node.peerManager.selectPeer(RendezVousCodec) + assert r.isSome(), "could not retrieve peer mounting RendezVousCodec" + asyncTest "Discv5 bootstrap nodes should be added to the peer store": var conf = defaultTestWakuNodeConf() conf.discv5BootstrapNodes = @[validEnr] - let waku = Waku.init(conf).valueOr: + let waku = Waku.new(conf).valueOr: raiseAssert error discard setupDiscoveryV5( @@ -421,7 +481,7 @@ suite "Waku Discovery v5": ) check: - waku.node.peerManager.wakuPeerStore.peers().anyIt( + waku.node.peerManager.switch.peerStore.peers().anyIt( it.enr.isSome() and it.enr.get().toUri() == validEnr ) @@ -432,7 +492,7 @@ suite "Waku Discovery v5": conf.discv5BootstrapNodes = @[invalidEnr] - let waku = Waku.init(conf).valueOr: + let waku = Waku.new(conf).valueOr: raiseAssert error discard setupDiscoveryV5( @@ -441,6 +501,6 @@ suite "Waku Discovery v5": ) check: - not waku.node.peerManager.wakuPeerStore.peers().anyIt( + not waku.node.peerManager.switch.peerStore.peers().anyIt( it.enr.isSome() and it.enr.get().toUri() == invalidEnr ) diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim index ba04b6b00..39baeea3e 100644 --- a/waku/node/peer_manager/peer_manager.nim +++ b/waku/node/peer_manager/peer_manager.nim @@ -79,7 +79,6 @@ type ConnectionChangeHandler* = proc( type PeerManager* = ref object of RootObj switch*: Switch - wakuPeerStore*: WakuPeerStore wakuMetadata*: WakuMetadata initialBackoffInSec*: int backoffFactor*: int @@ -138,38 +137,13 @@ proc addPeer*( trace "skipping to manage our unmanageable self" return - if pm.wakuPeerStore[AddressBook][remotePeerInfo.peerId] == remotePeerInfo.addrs and - pm.wakuPeerStore[KeyBook][remotePeerInfo.peerId] == remotePeerInfo.publicKey and - pm.wakuPeerStore[ENRBook][remotePeerInfo.peerId].raw.len > 0: - let incomingEnr = remotePeerInfo.enr.valueOr: - trace "peer already managed and incoming ENR is empty", - remote_peer_id = $remotePeerInfo.peerId - return - - if pm.wakuPeerStore[ENRBook][remotePeerInfo.peerId].raw == incomingEnr.raw or - pm.wakuPeerStore[ENRBook][remotePeerInfo.peerId].seqNum > incomingEnr.seqNum: - trace "peer already managed and ENR info is already saved", - remote_peer_id = $remotePeerInfo.peerId - return + pm.switch.peerStore.addPeer(remotePeerInfo, origin) trace "Adding peer to manager", - peerId = remotePeerInfo.peerId, addresses = remotePeerInfo.addrs + peerId = remotePeerInfo.peerId, addresses = remotePeerInfo.addrs, origin waku_total_unique_peers.inc() - pm.wakuPeerStore[AddressBook][remotePeerInfo.peerId] = remotePeerInfo.addrs - pm.wakuPeerStore[KeyBook][remotePeerInfo.peerId] = remotePeerInfo.publicKey - pm.wakuPeerStore[SourceBook][remotePeerInfo.peerId] = origin - pm.wakuPeerStore[ProtoVersionBook][remotePeerInfo.peerId] = - remotePeerInfo.protoVersion - pm.wakuPeerStore[AgentBook][remotePeerInfo.peerId] = remotePeerInfo.agent - - if remotePeerInfo.protocols.len > 0: - pm.wakuPeerStore[ProtoBook][remotePeerInfo.peerId] = remotePeerInfo.protocols - - if remotePeerInfo.enr.isSome(): - pm.wakuPeerStore[ENRBook][remotePeerInfo.peerId] = remotePeerInfo.enr.get() - # Add peer to storage. Entry will subsequently be updated with connectedness information if not pm.storage.isNil: # Reading from the db (pm.storage) is only done on startup, hence you need to connect to all saved peers. @@ -180,6 +154,9 @@ proc addPeer*( pm.storage.insertOrReplace(remotePeerInfo) +proc getPeer(pm: PeerManager, peerId: PeerId): RemotePeerInfo = + return pm.switch.peerStore.getPeer(peerId) + proc loadFromStorage(pm: PeerManager) {.gcsafe.} = ## Load peers from storage, if available @@ -202,19 +179,20 @@ proc loadFromStorage(pm: PeerManager) {.gcsafe.} = version = remotePeerInfo.protoVersion # nim-libp2p books - pm.wakuPeerStore[AddressBook][peerId] = remotePeerInfo.addrs - pm.wakuPeerStore[ProtoBook][peerId] = remotePeerInfo.protocols - pm.wakuPeerStore[KeyBook][peerId] = remotePeerInfo.publicKey - pm.wakuPeerStore[AgentBook][peerId] = remotePeerInfo.agent - pm.wakuPeerStore[ProtoVersionBook][peerId] = remotePeerInfo.protoVersion + pm.switch.peerStore[AddressBook][peerId] = remotePeerInfo.addrs + pm.switch.peerStore[ProtoBook][peerId] = remotePeerInfo.protocols + pm.switch.peerStore[KeyBook][peerId] = remotePeerInfo.publicKey + pm.switch.peerStore[AgentBook][peerId] = remotePeerInfo.agent + pm.switch.peerStore[ProtoVersionBook][peerId] = remotePeerInfo.protoVersion # custom books - pm.wakuPeerStore[ConnectionBook][peerId] = NotConnected # Reset connectedness state - pm.wakuPeerStore[DisconnectBook][peerId] = remotePeerInfo.disconnectTime - pm.wakuPeerStore[SourceBook][peerId] = remotePeerInfo.origin + pm.switch.peerStore[ConnectionBook][peerId] = NotConnected + # Reset connectedness state + pm.switch.peerStore[DisconnectBook][peerId] = remotePeerInfo.disconnectTime + pm.switch.peerStore[SourceBook][peerId] = remotePeerInfo.origin if remotePeerInfo.enr.isSome(): - pm.wakuPeerStore[ENRBook][peerId] = remotePeerInfo.enr.get() + pm.switch.peerStore[ENRBook][peerId] = remotePeerInfo.enr.get() amount.inc() @@ -228,10 +206,11 @@ proc loadFromStorage(pm: PeerManager) {.gcsafe.} = proc selectPeer*( pm: PeerManager, proto: string, shard: Option[PubsubTopic] = none(PubsubTopic) ): Option[RemotePeerInfo] = - trace "Selecting peer from peerstore", protocol = proto - # Selects the best peer for a given protocol - var peers = pm.wakuPeerStore.getPeersByProtocol(proto) + + var peers = pm.switch.peerStore.getPeersByProtocol(proto) + trace "Selecting peer from peerstore", + protocol = proto, peers, address = cast[uint](pm.switch.peerStore) if shard.isSome(): peers.keepItIf((it.enr.isSome() and it.enr.get().containsShard(shard.get()))) @@ -302,14 +281,16 @@ proc connectPeer*( ): Future[bool] {.async.} = let peerId = peer.peerId + var peerStore = pm.switch.peerStore + # Do not attempt to dial self if peerId == pm.switch.peerInfo.peerId: return false - if not pm.wakuPeerStore.peerExists(peerId): + if not peerStore.peerExists(peerId): pm.addPeer(peer) - let failedAttempts = pm.wakuPeerStore[NumberFailedConnBook][peerId] + let failedAttempts = peerStore[NumberFailedConnBook][peerId] trace "Connecting to peer", wireAddr = peer.addrs, peerId = peerId, failedAttempts = failedAttempts @@ -333,20 +314,19 @@ proc connectPeer*( waku_peers_dials.inc(labelValues = ["successful"]) waku_node_conns_initiated.inc(labelValues = [source]) - pm.wakuPeerStore[NumberFailedConnBook][peerId] = 0 + peerStore[NumberFailedConnBook][peerId] = 0 return true # Dial failed - pm.wakuPeerStore[NumberFailedConnBook][peerId] = - pm.wakuPeerStore[NumberFailedConnBook][peerId] + 1 - pm.wakuPeerStore[LastFailedConnBook][peerId] = Moment.init(getTime().toUnix, Second) - pm.wakuPeerStore[ConnectionBook][peerId] = CannotConnect + peerStore[NumberFailedConnBook][peerId] = peerStore[NumberFailedConnBook][peerId] + 1 + peerStore[LastFailedConnBook][peerId] = Moment.init(getTime().toUnix, Second) + peerStore[ConnectionBook][peerId] = CannotConnect trace "Connecting peer failed", peerId = peerId, reason = reasonFailed, - failedAttempts = pm.wakuPeerStore[NumberFailedConnBook][peerId] + failedAttempts = peerStore[NumberFailedConnBook][peerId] waku_peers_dials.inc(labelValues = [reasonFailed]) return false @@ -453,7 +433,7 @@ proc dialPeer*( # First add dialed peer info to peer store, if it does not exist yet.. # TODO: nim libp2p peerstore already adds them - if not pm.wakuPeerStore.hasPeer(remotePeerInfo.peerId, proto): + if not pm.switch.peerStore.hasPeer(remotePeerInfo.peerId, proto): trace "Adding newly dialed peer to manager", peerId = $remotePeerInfo.peerId, address = $remotePeerInfo.addrs[0], proto = proto pm.addPeer(remotePeerInfo) @@ -479,7 +459,8 @@ proc canBeConnected*(pm: PeerManager, peerId: PeerId): bool = # Returns if we can try to connect to this peer, based on past failed attempts # It uses an exponential backoff. Each connection attempt makes us # wait more before trying again. - let failedAttempts = pm.wakuPeerStore[NumberFailedConnBook][peerId] + let peerStore = pm.switch.peerStore + let failedAttempts = peerStore[NumberFailedConnBook][peerId] # if it never errored, we can try to connect if failedAttempts == 0: @@ -492,7 +473,7 @@ proc canBeConnected*(pm: PeerManager, peerId: PeerId): bool = # If it errored we wait an exponential backoff from last connection # the more failed attempts, the greater the backoff since last attempt let now = Moment.init(getTime().toUnix, Second) - let lastFailed = pm.wakuPeerStore[LastFailedConnBook][peerId] + let lastFailed = peerStore[LastFailedConnBook][peerId] let backoff = calculateBackoff(pm.initialBackoffInSec, pm.backoffFactor, failedAttempts) @@ -564,7 +545,7 @@ proc connectToRelayPeers*(pm: PeerManager) {.async.} = if outRelayPeers.len >= pm.outRelayPeersTarget: return - let notConnectedPeers = pm.wakuPeerStore.getDisconnectedPeers() + let notConnectedPeers = pm.switch.peerStore.getDisconnectedPeers() var outsideBackoffPeers = notConnectedPeers.filterIt(pm.canBeConnected(it.peerId)) @@ -593,7 +574,7 @@ proc reconnectPeers*( debug "Reconnecting peers", proto = proto # Proto is not persisted, we need to iterate over all peers. - for peerInfo in pm.wakuPeerStore.peers(protocolMatcher(proto)): + for peerInfo in pm.switch.peerStore.peers(protocolMatcher(proto)): # Check that the peer can be connected if peerInfo.connectedness == CannotConnect: error "Not reconnecting to unreachable or non-existing peer", @@ -666,7 +647,7 @@ proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} = break guardClauses if ( - pm.wakuPeerStore.hasPeer(peerId, WakuRelayCodec) and + pm.switch.peerStore.hasPeer(peerId, WakuRelayCodec) and not metadata.shards.anyIt(pm.wakuMetadata.shards.contains(it)) ): let myShardsString = "[ " & toSeq(pm.wakuMetadata.shards).join(", ") & " ]" @@ -680,13 +661,14 @@ proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} = info "disconnecting from peer", peerId = peerId, reason = reason asyncSpawn(pm.switch.disconnect(peerId)) - pm.wakuPeerStore.delete(peerId) + pm.switch.peerStore.delete(peerId) # called when a peer i) first connects to us ii) disconnects all connections from us proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} = if not pm.wakuMetadata.isNil() and event.kind == PeerEventKind.Joined: await pm.onPeerMetadata(peerId) + var peerStore = pm.switch.peerStore var direction: PeerDirection var connectedness: Connectedness @@ -698,7 +680,7 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} = ## Check max allowed in-relay peers let inRelayPeers = pm.connectedPeers(WakuRelayCodec)[0] if inRelayPeers.len > pm.inRelayPeersTarget and - pm.wakuPeerStore.hasPeer(peerId, WakuRelayCodec): + peerStore.hasPeer(peerId, WakuRelayCodec): debug "disconnecting relay peer because reached max num in-relay peers", peerId = peerId, inRelayPeers = inRelayPeers.len, @@ -717,7 +699,7 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} = for peerId in peersBehindIp[0 ..< (peersBehindIp.len - pm.colocationLimit)]: debug "Pruning connection due to ip colocation", peerId = peerId, ip = ip asyncSpawn(pm.switch.disconnect(peerId)) - pm.wakuPeerStore.delete(peerId) + peerStore.delete(peerId) if not pm.onConnectionChange.isNil(): # we don't want to await for the callback to finish asyncSpawn pm.onConnectionChange(peerId, Joined) @@ -738,11 +720,11 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} = of Identified: debug "event identified", peerId = peerId - pm.wakuPeerStore[ConnectionBook][peerId] = connectedness - pm.wakuPeerStore[DirectionBook][peerId] = direction + peerStore[ConnectionBook][peerId] = connectedness + peerStore[DirectionBook][peerId] = direction if not pm.storage.isNil: - var remotePeerInfo = pm.wakuPeerStore.getPeer(peerId) + var remotePeerInfo = peerStore.getPeer(peerId) if event.kind == PeerEventKind.Left: remotePeerInfo.disconnectTime = getTime().toUnix @@ -755,12 +737,12 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} = proc logAndMetrics(pm: PeerManager) {.async.} = heartbeat "Scheduling log and metrics run", LogAndMetricsInterval: + var peerStore = pm.switch.peerStore # log metrics let (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec) let maxConnections = pm.switch.connManager.inSema.size - let notConnectedPeers = pm.wakuPeerStore.getDisconnectedPeers().mapIt( - RemotePeerInfo.init(it.peerId, it.addrs) - ) + let notConnectedPeers = + peerStore.getDisconnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs)) let outsideBackoffPeers = notConnectedPeers.filterIt(pm.canBeConnected(it.peerId)) let totalConnections = pm.switch.connManager.getConnections().len @@ -772,7 +754,7 @@ proc logAndMetrics(pm: PeerManager) {.async.} = outsideBackoffPeers = outsideBackoffPeers.len # update prometheus metrics - for proto in pm.wakuPeerStore.getWakuProtos(): + for proto in peerStore.getWakuProtos(): let (protoConnsIn, protoConnsOut) = pm.connectedPeers(proto) let (protoStreamsIn, protoStreamsOut) = pm.getNumStreams(proto) waku_connected_peers.set( @@ -806,14 +788,16 @@ proc manageRelayPeers*(pm: PeerManager) {.async.} = let inTarget = pm.inRelayPeersTarget div pm.wakuMetadata.shards.len let outTarget = pm.outRelayPeersTarget div pm.wakuMetadata.shards.len + var peerStore = pm.switch.peerStore + for shard in pm.wakuMetadata.shards.items: # Filter out peer not on this shard let connectedInPeers = inPeers.filterIt( - pm.wakuPeerStore.hasShard(it, uint16(pm.wakuMetadata.clusterId), uint16(shard)) + peerStore.hasShard(it, uint16(pm.wakuMetadata.clusterId), uint16(shard)) ) let connectedOutPeers = outPeers.filterIt( - pm.wakuPeerStore.hasShard(it, uint16(pm.wakuMetadata.clusterId), uint16(shard)) + peerStore.hasShard(it, uint16(pm.wakuMetadata.clusterId), uint16(shard)) ) # Calculate the difference between current values and targets @@ -828,17 +812,17 @@ proc manageRelayPeers*(pm: PeerManager) {.async.} = # Get all peers for this shard var connectablePeers = - pm.wakuPeerStore.getPeersByShard(uint16(pm.wakuMetadata.clusterId), uint16(shard)) + peerStore.getPeersByShard(uint16(pm.wakuMetadata.clusterId), uint16(shard)) let shardCount = connectablePeers.len connectablePeers.keepItIf( - not pm.wakuPeerStore.isConnected(it.peerId) and pm.canBeConnected(it.peerId) + not peerStore.isConnected(it.peerId) and pm.canBeConnected(it.peerId) ) let connectableCount = connectablePeers.len - connectablePeers.keepItIf(pm.wakuPeerStore.hasCapability(it.peerId, Relay)) + connectablePeers.keepItIf(peerStore.hasCapability(it.peerId, Relay)) let relayCount = connectablePeers.len @@ -862,7 +846,7 @@ proc manageRelayPeers*(pm: PeerManager) {.async.} = if peersToConnect.len == 0: return - let uniquePeers = toSeq(peersToConnect).mapIt(pm.wakuPeerStore.getPeer(it)) + let uniquePeers = toSeq(peersToConnect).mapIt(peerStore.getPeer(it)) # Connect to all nodes for i in countup(0, uniquePeers.len, MaxParallelDials): @@ -871,8 +855,9 @@ proc manageRelayPeers*(pm: PeerManager) {.async.} = await pm.connectToNodes(uniquePeers[i ..< stop]) proc prunePeerStore*(pm: PeerManager) = - let numPeers = pm.wakuPeerStore[AddressBook].book.len - let capacity = pm.wakuPeerStore.getCapacity() + let peerStore = pm.switch.peerStore + let numPeers = peerStore[AddressBook].book.len + let capacity = peerStore.getCapacity() if numPeers <= capacity: return @@ -881,7 +866,7 @@ proc prunePeerStore*(pm: PeerManager) = var peersToPrune: HashSet[PeerId] # prune failed connections - for peerId, count in pm.wakuPeerStore[NumberFailedConnBook].book.pairs: + for peerId, count in peerStore[NumberFailedConnBook].book.pairs: if count < pm.maxFailedAttempts: continue @@ -890,7 +875,7 @@ proc prunePeerStore*(pm: PeerManager) = peersToPrune.incl(peerId) - var notConnected = pm.wakuPeerStore.getDisconnectedPeers().mapIt(it.peerId) + var notConnected = peerStore.getDisconnectedPeers().mapIt(it.peerId) # Always pick random non-connected peers shuffle(notConnected) @@ -899,11 +884,11 @@ proc prunePeerStore*(pm: PeerManager) = var peersByShard = initTable[uint16, seq[PeerId]]() for peer in notConnected: - if not pm.wakuPeerStore[ENRBook].contains(peer): + if not peerStore[ENRBook].contains(peer): shardlessPeers.add(peer) continue - let record = pm.wakuPeerStore[ENRBook][peer] + let record = peerStore[ENRBook][peer] let rec = record.toTyped().valueOr: shardlessPeers.add(peer) @@ -937,9 +922,9 @@ proc prunePeerStore*(pm: PeerManager) = peersToPrune.incl(peer) for peer in peersToPrune: - pm.wakuPeerStore.delete(peer) + peerStore.delete(peer) - let afterNumPeers = pm.wakuPeerStore[AddressBook].book.len + let afterNumPeers = peerStore[AddressBook].book.len trace "Finished pruning peer store", beforeNumPeers = numPeers, @@ -1060,7 +1045,6 @@ proc new*( let pm = PeerManager( switch: switch, wakuMetadata: wakuMetadata, - wakuPeerStore: createWakuPeerStore(switch.peerStore), storage: storage, initialBackoffInSec: initialBackoffInSec, backoffFactor: backoffFactor, @@ -1076,14 +1060,16 @@ proc new*( proc peerHook(peerId: PeerId, event: PeerEvent): Future[void] {.gcsafe.} = onPeerEvent(pm, peerId, event) + var peerStore = pm.switch.peerStore + proc peerStoreChanged(peerId: PeerId) {.gcsafe.} = - waku_peer_store_size.set(toSeq(pm.wakuPeerStore[AddressBook].book.keys).len.int64) + waku_peer_store_size.set(toSeq(peerStore[AddressBook].book.keys).len.int64) pm.switch.addPeerEventHandler(peerHook, PeerEventKind.Joined) pm.switch.addPeerEventHandler(peerHook, PeerEventKind.Left) # called every time the peerstore is updated - pm.wakuPeerStore[AddressBook].addHandler(peerStoreChanged) + peerStore[AddressBook].addHandler(peerStoreChanged) pm.serviceSlots = initTable[string, RemotePeerInfo]() pm.ipTable = initTable[string, seq[PeerId]]() diff --git a/waku/node/peer_manager/waku_peer_store.nim b/waku/node/peer_manager/waku_peer_store.nim index 027a1823f..777e4f2be 100644 --- a/waku/node/peer_manager/waku_peer_store.nim +++ b/waku/node/peer_manager/waku_peer_store.nim @@ -3,6 +3,7 @@ import std/[tables, sequtils, sets, options, strutils], chronos, + chronicles, eth/p2p/discoveryv5/enr, libp2p/builders, libp2p/peerstore @@ -11,14 +12,12 @@ import ../../waku_core, ../../waku_enr/sharding, ../../waku_enr/capabilities, - ../../common/utils/sequence + ../../common/utils/sequence, + ../../waku_core/peers export peerstore, builders type - WakuPeerStore* = ref object - peerStore: PeerStore - # Keeps track of the Connectedness state of a peer ConnectionBook* = ref object of PeerBook[Connectedness] @@ -40,137 +39,152 @@ type # Keeps track of the ENR (Ethereum Node Record) of a peer ENRBook* = ref object of PeerBook[enr.Record] -# Constructor -proc new*(T: type WakuPeerStore, identify: Identify, capacity = 1000): WakuPeerStore = - let peerStore = PeerStore.new(identify, capacity) - WakuPeerStore(peerStore: peerStore) - -proc createWakuPeerStore*(peerStore: PeerStore): WakuPeerStore = - WakuPeerStore(peerStore: peerStore) - -# Core functionality -proc `[]`*(wps: WakuPeerStore, T: typedesc): T = - wps.peerStore[T] - -proc getPeer*(wps: WakuPeerStore, peerId: PeerId): RemotePeerInfo = +proc getPeer*(peerStore: PeerStore, peerId: PeerId): RemotePeerInfo = RemotePeerInfo( peerId: peerId, - addrs: wps[AddressBook][peerId], + addrs: peerStore[AddressBook][peerId], enr: - if wps[ENRBook][peerId] != default(enr.Record): - some(wps[ENRBook][peerId]) + if peerStore[ENRBook][peerId] != default(enr.Record): + some(peerStore[ENRBook][peerId]) else: none(enr.Record), - protocols: wps[ProtoBook][peerId], - agent: wps[AgentBook][peerId], - protoVersion: wps[ProtoVersionBook][peerId], - publicKey: wps[KeyBook][peerId], - connectedness: wps[ConnectionBook][peerId], - disconnectTime: wps[DisconnectBook][peerId], - origin: wps[SourceBook][peerId], - direction: wps[DirectionBook][peerId], - lastFailedConn: wps[LastFailedConnBook][peerId], - numberFailedConn: wps[NumberFailedConnBook][peerId], + protocols: peerStore[ProtoBook][peerId], + agent: peerStore[AgentBook][peerId], + protoVersion: peerStore[ProtoVersionBook][peerId], + publicKey: peerStore[KeyBook][peerId], + connectedness: peerStore[ConnectionBook][peerId], + disconnectTime: peerStore[DisconnectBook][peerId], + origin: peerStore[SourceBook][peerId], + direction: peerStore[DirectionBook][peerId], + lastFailedConn: peerStore[LastFailedConnBook][peerId], + numberFailedConn: peerStore[NumberFailedConnBook][peerId], ) -proc addPeer*(wps: WakuPeerStore, peer: RemotePeerInfo) = - ## Only used in tests - wps[AddressBook][peer.peerId] = peer.addrs - wps[ProtoBook][peer.peerId] = peer.protocols - wps[AgentBook][peer.peerId] = peer.agent - wps[ProtoVersionBook][peer.peerId] = peer.protoVersion - wps[KeyBook][peer.peerId] = peer.publicKey - wps[ConnectionBook][peer.peerId] = peer.connectedness - wps[DisconnectBook][peer.peerId] = peer.disconnectTime - wps[SourceBook][peer.peerId] = peer.origin - wps[DirectionBook][peer.peerId] = peer.direction - wps[LastFailedConnBook][peer.peerId] = peer.lastFailedConn - wps[NumberFailedConnBook][peer.peerId] = peer.numberFailedConn - if peer.enr.isSome(): - wps[ENRBook][peer.peerId] = peer.enr.get() - -proc delete*(wps: WakuPeerStore, peerId: PeerId) = +proc delete*(peerStore: PeerStore, peerId: PeerId) = # Delete all the information of a given peer. - wps.peerStore.del(peerId) + peerStore.del(peerId) -# TODO: Rename peers() to getPeersByProtocol() -proc peers*(wps: WakuPeerStore): seq[RemotePeerInfo] = +proc peers*(peerStore: PeerStore): seq[RemotePeerInfo] = let allKeys = concat( - toSeq(wps[AddressBook].book.keys()), - toSeq(wps[ProtoBook].book.keys()), - toSeq(wps[KeyBook].book.keys()), + toSeq(peerStore[AddressBook].book.keys()), + toSeq(peerStore[ProtoBook].book.keys()), + toSeq(peerStore[KeyBook].book.keys()), ) .toHashSet() - return allKeys.mapIt(wps.getPeer(it)) + return allKeys.mapIt(peerStore.getPeer(it)) -proc peers*(wps: WakuPeerStore, proto: string): seq[RemotePeerInfo] = - wps.peers().filterIt(it.protocols.contains(proto)) +proc addPeer*(peerStore: PeerStore, peer: RemotePeerInfo, origin = UnknownOrigin) = + ## Notice that the origin parameter is used to manually override the given peer origin. + ## At the time of writing, this is used in waku_discv5 or waku_node (peer exchange.) + if peerStore[AddressBook][peer.peerId] == peer.addrs and + peerStore[KeyBook][peer.peerId] == peer.publicKey and + peerStore[ENRBook][peer.peerId].raw.len > 0: + let incomingEnr = peer.enr.valueOr: + trace "peer already managed and incoming ENR is empty", + remote_peer_id = $peer.peerId + return -proc peers*(wps: WakuPeerStore, protocolMatcher: Matcher): seq[RemotePeerInfo] = - wps.peers().filterIt(it.protocols.anyIt(protocolMatcher(it))) + if peerStore[ENRBook][peer.peerId].raw == incomingEnr.raw or + peerStore[ENRBook][peer.peerId].seqNum > incomingEnr.seqNum: + trace "peer already managed and ENR info is already saved", + remote_peer_id = $peer.peerId + return -proc connectedness*(wps: WakuPeerStore, peerId: PeerId): Connectedness = - wps[ConnectionBook].book.getOrDefault(peerId, NotConnected) + peerStore[AddressBook][peer.peerId] = peer.addrs -proc hasShard*(wps: WakuPeerStore, peerId: PeerID, cluster, shard: uint16): bool = - wps[ENRBook].book.getOrDefault(peerId).containsShard(cluster, shard) + var protos = peerStore[ProtoBook][peer.peerId] + for new_proto in peer.protocols: + ## append new discovered protocols to the current known protocols set + if not protos.contains(new_proto): + protos.add($new_proto) + peerStore[ProtoBook][peer.peerId] = protos -proc hasCapability*(wps: WakuPeerStore, peerId: PeerID, cap: Capabilities): bool = - wps[ENRBook].book.getOrDefault(peerId).supportsCapability(cap) + peerStore[AgentBook][peer.peerId] = peer.agent + peerStore[ProtoVersionBook][peer.peerId] = peer.protoVersion + peerStore[KeyBook][peer.peerId] = peer.publicKey + peerStore[ConnectionBook][peer.peerId] = peer.connectedness + peerStore[DisconnectBook][peer.peerId] = peer.disconnectTime + peerStore[SourceBook][peer.peerId] = + if origin != UnknownOrigin: origin else: peer.origin + peerStore[DirectionBook][peer.peerId] = peer.direction + peerStore[LastFailedConnBook][peer.peerId] = peer.lastFailedConn + peerStore[NumberFailedConnBook][peer.peerId] = peer.numberFailedConn + if peer.enr.isSome(): + peerStore[ENRBook][peer.peerId] = peer.enr.get() -proc peerExists*(wps: WakuPeerStore, peerId: PeerId): bool = - wps[AddressBook].contains(peerId) +proc peers*(peerStore: PeerStore, proto: string): seq[RemotePeerInfo] = + peerStore.peers().filterIt(it.protocols.contains(proto)) -proc isConnected*(wps: WakuPeerStore, peerId: PeerID): bool = +proc peers*(peerStore: PeerStore, protocolMatcher: Matcher): seq[RemotePeerInfo] = + peerStore.peers().filterIt(it.protocols.anyIt(protocolMatcher(it))) + +proc connectedness*(peerStore: PeerStore, peerId: PeerId): Connectedness = + peerStore[ConnectionBook].book.getOrDefault(peerId, NotConnected) + +proc hasShard*(peerStore: PeerStore, peerId: PeerID, cluster, shard: uint16): bool = + peerStore[ENRBook].book.getOrDefault(peerId).containsShard(cluster, shard) + +proc hasCapability*(peerStore: PeerStore, peerId: PeerID, cap: Capabilities): bool = + peerStore[ENRBook].book.getOrDefault(peerId).supportsCapability(cap) + +proc peerExists*(peerStore: PeerStore, peerId: PeerId): bool = + peerStore[AddressBook].contains(peerId) + +proc isConnected*(peerStore: PeerStore, peerId: PeerID): bool = # Returns `true` if the peer is connected - wps.connectedness(peerId) == Connected + peerStore.connectedness(peerId) == Connected -proc hasPeer*(wps: WakuPeerStore, peerId: PeerID, proto: string): bool = +proc hasPeer*(peerStore: PeerStore, peerId: PeerID, proto: string): bool = # Returns `true` if peer is included in manager for the specified protocol - # TODO: What if peer does not exist in the wps? - wps.getPeer(peerId).protocols.contains(proto) + # TODO: What if peer does not exist in the peerStore? + peerStore.getPeer(peerId).protocols.contains(proto) -proc hasPeers*(wps: WakuPeerStore, proto: string): bool = +proc hasPeers*(peerStore: PeerStore, proto: string): bool = # Returns `true` if the peerstore has any peer for the specified protocol - toSeq(wps[ProtoBook].book.values()).anyIt(it.anyIt(it == proto)) + toSeq(peerStore[ProtoBook].book.values()).anyIt(it.anyIt(it == proto)) -proc hasPeers*(wps: WakuPeerStore, protocolMatcher: Matcher): bool = +proc hasPeers*(peerStore: PeerStore, protocolMatcher: Matcher): bool = # Returns `true` if the peerstore has any peer matching the protocolMatcher - toSeq(wps[ProtoBook].book.values()).anyIt(it.anyIt(protocolMatcher(it))) + toSeq(peerStore[ProtoBook].book.values()).anyIt(it.anyIt(protocolMatcher(it))) -proc getCapacity*(wps: WakuPeerStore): int = - wps.peerStore.capacity +proc getCapacity*(peerStore: PeerStore): int = + peerStore.capacity -proc setCapacity*(wps: WakuPeerStore, capacity: int) = - wps.peerStore.capacity = capacity +proc setCapacity*(peerStore: PeerStore, capacity: int) = + peerStore.capacity = capacity -proc getWakuProtos*(wps: WakuPeerStore): seq[string] = - toSeq(wps[ProtoBook].book.values()).flatten().deduplicate().filterIt( +proc getWakuProtos*(peerStore: PeerStore): seq[string] = + toSeq(peerStore[ProtoBook].book.values()).flatten().deduplicate().filterIt( it.startsWith("/vac/waku") ) proc getPeersByDirection*( - wps: WakuPeerStore, direction: PeerDirection + peerStore: PeerStore, direction: PeerDirection ): seq[RemotePeerInfo] = - return wps.peers.filterIt(it.direction == direction) + return peerStore.peers.filterIt(it.direction == direction) -proc getDisconnectedPeers*(wps: WakuPeerStore): seq[RemotePeerInfo] = - return wps.peers.filterIt(it.connectedness != Connected) +proc getDisconnectedPeers*(peerStore: PeerStore): seq[RemotePeerInfo] = + return peerStore.peers.filterIt(it.connectedness != Connected) -proc getConnectedPeers*(wps: WakuPeerStore): seq[RemotePeerInfo] = - return wps.peers.filterIt(it.connectedness == Connected) +proc getConnectedPeers*(peerStore: PeerStore): seq[RemotePeerInfo] = + return peerStore.peers.filterIt(it.connectedness == Connected) -proc getPeersByProtocol*(wps: WakuPeerStore, proto: string): seq[RemotePeerInfo] = - return wps.peers.filterIt(it.protocols.contains(proto)) +proc getPeersByProtocol*(peerStore: PeerStore, proto: string): seq[RemotePeerInfo] = + return peerStore.peers.filterIt(it.protocols.contains(proto)) -proc getReachablePeers*(wps: WakuPeerStore): seq[RemotePeerInfo] = +proc getReachablePeers*(peerStore: PeerStore): seq[RemotePeerInfo] = + return peerStore.peers.filterIt(it.connectedness != CannotConnect) + +proc getPeersByShard*( + peerStore: PeerStore, cluster, shard: uint16 +): seq[RemotePeerInfo] = + return peerStore.peers.filterIt( + it.enr.isSome() and it.enr.get().containsShard(cluster, shard) + ) + +proc getPeersByCapability*( + peerStore: PeerStore, cap: Capabilities +): seq[RemotePeerInfo] = return - wps.peers.filterIt(it.connectedness == CanConnect or it.connectedness == Connected) - -proc getPeersByShard*(wps: WakuPeerStore, cluster, shard: uint16): seq[RemotePeerInfo] = - return - wps.peers.filterIt(it.enr.isSome() and it.enr.get().containsShard(cluster, shard)) - -proc getPeersByCapability*(wps: WakuPeerStore, cap: Capabilities): seq[RemotePeerInfo] = - return wps.peers.filterIt(it.enr.isSome() and it.enr.get().supportsCapability(cap)) + peerStore.peers.filterIt(it.enr.isSome() and it.enr.get().supportsCapability(cap)) diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index cb712befd..ae08b503a 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -417,7 +417,7 @@ proc startRelay*(node: WakuNode) {.async.} = ## Setup relay protocol # Resume previous relay connections - if node.peerManager.wakuPeerStore.hasPeers(protocolMatcher(WakuRelayCodec)): + if node.peerManager.switch.peerStore.hasPeers(protocolMatcher(WakuRelayCodec)): info "Found previous WakuRelay peers. Reconnecting." # Reconnect to previous relay peers. This will respect a backoff period, if necessary @@ -1260,7 +1260,7 @@ proc fetchPeerExchangePeers*( ) ) - info "Retrieving peer info via peer exchange protocol" + info "Retrieving peer info via peer exchange protocol", amount let pxPeersRes = await node.wakuPeerExchange.request(amount) if pxPeersRes.isOk: var validPeers = 0 diff --git a/waku/waku_api/rest/admin/handlers.nim b/waku/waku_api/rest/admin/handlers.nim index c140c46d6..f2eb4a8ba 100644 --- a/waku/waku_api/rest/admin/handlers.nim +++ b/waku/waku_api/rest/admin/handlers.nim @@ -41,7 +41,7 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) = router.api(MethodGet, ROUTE_ADMIN_V1_PEERS) do() -> RestApiResponse: var peers: WakuPeers = @[] - let relayPeers = node.peerManager.wakuPeerStore.peers(WakuRelayCodec).mapIt( + let relayPeers = node.peerManager.switch.peerStore.peers(WakuRelayCodec).mapIt( ( multiaddr: constructMultiaddrStr(it), protocol: WakuRelayCodec, @@ -51,7 +51,7 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) = ) tuplesToWakuPeers(peers, relayPeers) - let filterV2Peers = node.peerManager.wakuPeerStore + let filterV2Peers = node.peerManager.switch.peerStore .peers(WakuFilterSubscribeCodec) .mapIt( ( @@ -63,7 +63,7 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) = ) tuplesToWakuPeers(peers, filterV2Peers) - let storePeers = node.peerManager.wakuPeerStore.peers(WakuStoreCodec).mapIt( + let storePeers = node.peerManager.switch.peerStore.peers(WakuStoreCodec).mapIt( ( multiaddr: constructMultiaddrStr(it), protocol: WakuStoreCodec, @@ -73,7 +73,7 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) = ) tuplesToWakuPeers(peers, storePeers) - let legacyStorePeers = node.peerManager.wakuPeerStore + let legacyStorePeers = node.peerManager.switch.peerStore .peers(WakuLegacyStoreCodec) .mapIt( ( @@ -85,7 +85,7 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) = ) tuplesToWakuPeers(peers, legacyStorePeers) - let legacyLightpushPeers = node.peerManager.wakuPeerStore + let legacyLightpushPeers = node.peerManager.switch.peerStore .peers(WakuLegacyLightPushCodec) .mapIt( ( @@ -97,7 +97,9 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) = ) tuplesToWakuPeers(peers, legacyLightpushPeers) - let lightpushPeers = node.peerManager.wakuPeerStore.peers(WakuLightPushCodec).mapIt( + let lightpushPeers = node.peerManager.switch.peerStore + .peers(WakuLightPushCodec) + .mapIt( ( multiaddr: constructMultiaddrStr(it), protocol: WakuLightPushCodec, @@ -107,7 +109,7 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) = ) tuplesToWakuPeers(peers, lightpushPeers) - let pxPeers = node.peerManager.wakuPeerStore.peers(WakuPeerExchangeCodec).mapIt( + let pxPeers = node.peerManager.switch.peerStore.peers(WakuPeerExchangeCodec).mapIt( ( multiaddr: constructMultiaddrStr(it), protocol: WakuPeerExchangeCodec, diff --git a/waku/waku_filter_v2/protocol.nim b/waku/waku_filter_v2/protocol.nim index d8b79ab67..c3a4683f7 100644 --- a/waku/waku_filter_v2/protocol.nim +++ b/waku/waku_filter_v2/protocol.nim @@ -225,7 +225,7 @@ proc maintainSubscriptions*(wf: WakuFilter) {.async.} = ## Remove subscriptions for peers that have been removed from peer store var peersToRemove: seq[PeerId] for peerId in wf.subscriptions.peersSubscribed.keys: - if not wf.peerManager.wakuPeerStore.hasPeer(peerId, WakuFilterPushCodec): + if not wf.peerManager.switch.peerStore.hasPeer(peerId, WakuFilterPushCodec): debug "peer has been removed from peer store, we will remove subscription", peerId = peerId peersToRemove.add(peerId) diff --git a/waku/waku_peer_exchange/protocol.nim b/waku/waku_peer_exchange/protocol.nim index 7c9005215..2732cb1c1 100644 --- a/waku/waku_peer_exchange/protocol.nim +++ b/waku/waku_peer_exchange/protocol.nim @@ -218,7 +218,7 @@ proc poolFilter*(cluster: Option[uint16], peer: RemotePeerInfo): bool = proc populateEnrCache(wpx: WakuPeerExchange) = # share only peers that i) are reachable ii) come from discv5 iii) share cluster - let withEnr = wpx.peerManager.wakuPeerStore.getReachablePeers().filterIt( + let withEnr = wpx.peerManager.switch.peerStore.getReachablePeers().filterIt( poolFilter(wpx.cluster, it) )