diff --git a/tests/v2/test_jsonrpc_waku.nim b/tests/v2/test_jsonrpc_waku.nim index f46ccf231..11f835b37 100644 --- a/tests/v2/test_jsonrpc_waku.nim +++ b/tests/v2/test_jsonrpc_waku.nim @@ -18,6 +18,7 @@ import admin_api, private_api], ../../waku/v2/protocol/message_notifier, + ../../waku/v2/protocol/waku_relay, ../../waku/v2/protocol/waku_store/waku_store, ../../waku/v2/protocol/waku_swap/waku_swap, ../../waku/v2/protocol/waku_filter/waku_filter, @@ -366,7 +367,59 @@ procSuite "Waku v2 JSON-RPC API": server.close() waitfor node.stop() - asyncTest "Admin API: get peer information": + asyncTest "Admin API: get managed peer information": + # Create a couple of nodes + let + nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"), + Port(60000)) + nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"), + Port(60002)) + peerInfo2 = node2.peerInfo + nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node3 = WakuNode.init(nodeKey3, ValidIpAddress.init("0.0.0.0"), + Port(60004)) + peerInfo3 = node3.peerInfo + + await allFutures([node1.start(), node2.start(), node3.start()]) + + node1.mountRelay() + node2.mountRelay() + node3.mountRelay() + + # Dial nodes 2 and 3 from node1 + await node1.dialPeer(constructMultiaddrStr(peerInfo2)) + await node1.dialPeer(constructMultiaddrStr(peerInfo3)) + + # RPC server setup + let + rpcPort = Port(8545) + ta = initTAddress(bindIp, rpcPort) + server = newRpcHttpServer([ta]) + + installAdminApiHandlers(node1, server) + server.start() + + let client = newRpcHttpClient() + await client.connect("127.0.0.1", rpcPort) + + let response = await client.get_waku_v2_admin_v1_peers() + + check: + response.len == 2 + # Check peer 2 + response.anyIt(it.protocol == WakuRelayCodec and + it.multiaddr == constructMultiaddrStr(peerInfo2)) + # Check peer 3 + response.anyIt(it.protocol == WakuRelayCodec and + it.multiaddr == constructMultiaddrStr(peerInfo3)) + + server.stop() + server.close() + await allFutures([node1.stop(), node2.stop(), node3.stop()]) + + asyncTest "Admin API: get unmanaged peer information": const cTopic = ContentTopic(1) waitFor node.start() diff --git a/tests/v2/test_peer_manager.nim b/tests/v2/test_peer_manager.nim index 57c06e45b..1ad4527d4 100644 --- a/tests/v2/test_peer_manager.nim +++ b/tests/v2/test_peer_manager.nim @@ -15,7 +15,7 @@ import ../test_helpers procSuite "Peer Manager": - asyncTest "Peer dialing": + asyncTest "Peer dialing works": let nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"), @@ -31,7 +31,7 @@ procSuite "Peer Manager": node2.mountRelay() # Dial node2 from node1 - let conn = await node1.peerManager.dialPeer(peerInfo2, WakuRelayCodec) + let conn = (await node1.peerManager.dialPeer(peerInfo2, WakuRelayCodec)).get() # Check connection check: @@ -45,4 +45,30 @@ procSuite "Peer Manager": # Check connectedness check: node1.peerManager.connectedness(peerInfo2.peerId) + + await allFutures([node1.stop(), node2.stop()]) + + asyncTest "Dialing fails gracefully": + let + nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"), + Port(60000)) + nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"), + Port(60002)) + peerInfo2 = node2.peerInfo + + await node1.start() + # Purposefully don't start node2 + node1.mountRelay() + node2.mountRelay() + + # Dial node2 from node1 + let connOpt = await node1.peerManager.dialPeer(peerInfo2, WakuRelayCodec, 2.seconds) + + # Check connection failed gracefully + check: + connOpt.isNone() + + await node1.stop() diff --git a/waku/v2/node/jsonrpc/admin_api.nim b/waku/v2/node/jsonrpc/admin_api.nim index ee8745aa5..ebc482071 100644 --- a/waku/v2/node/jsonrpc/admin_api.nim +++ b/waku/v2/node/jsonrpc/admin_api.nim @@ -1,20 +1,25 @@ {.push raises: [Exception, Defect].} import - std/[options,sequtils], + std/[options, sequtils, sets], json_rpc/rpcserver, libp2p/[peerinfo, switch], ../../protocol/waku_store/[waku_store_types, waku_store], ../../protocol/waku_swap/[waku_swap_types, waku_swap], ../../protocol/waku_filter/[waku_filter_types, waku_filter], ../wakunode2, + ../peer_manager, ./jsonrpc_types export jsonrpc_types +proc constructMultiaddrStr*(wireaddr: MultiAddress, peerId: PeerId): string = + # Constructs a multiaddress with both wire address and p2p identity + $wireaddr & "/p2p/" & $peerId + proc constructMultiaddrStr*(peerInfo: PeerInfo): string = - # Constructs a multiaddress with both location address and p2p identity - $peerInfo.addrs[0] & "/p2p/" & $peerInfo.peerId + # Constructs a multiaddress with both location (wire) address and p2p identity + constructMultiaddrStr(peerInfo.addrs[0], peerInfo.peerId) proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = @@ -29,6 +34,18 @@ proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = var wPeers: seq[WakuPeer] = @[] + ## Managed peers + + if not node.wakuRelay.isNil: + # Map all managed peers to WakuPeers and add to return list + wPeers.insert(node.peerManager.peers.mapIt(WakuPeer(multiaddr: constructMultiaddrStr(toSeq(it.addrs.items)[0], it.peerId), + protocol: toSeq(it.protos.items)[0], + connected: node.peerManager.connectedness(it.peerId))), + wPeers.len) # Append to the end of the sequence + + ## Unmanaged peers + ## @TODO add these peers to peer manager + if not node.wakuSwap.isNil: # Map WakuSwap peers to WakuPeers and add to return list wPeers.insert(node.wakuSwap.peers.mapIt(WakuPeer(multiaddr: constructMultiaddrStr(it.peerInfo), diff --git a/waku/v2/node/peer_manager.nim b/waku/v2/node/peer_manager.nim index ccad7d3fb..c77ef2982 100644 --- a/waku/v2/node/peer_manager.nim +++ b/waku/v2/node/peer_manager.nim @@ -1,15 +1,26 @@ {.push raises: [Defect, Exception].} import - chronos, chronicles, + std/options, + chronos, chronicles, metrics, libp2p/standard_setup, libp2p/peerstore +export peerstore, standard_setup + +declareCounter waku_peers_dials, "Number of peer dials", ["outcome"] + +logScope: + topics = "wakupeers" + type PeerManager* = ref object of RootObj switch*: Switch peerStore*: PeerStore +const + defaultDialTimeout = 1.minutes # @TODO should this be made configurable? + proc new*(T: type PeerManager, switch: Switch): PeerManager = T(switch: switch, peerStore: PeerStore.new()) @@ -18,7 +29,7 @@ proc new*(T: type PeerManager, switch: Switch): PeerManager = # Dialer interface # #################### -proc dialPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string): Future[Connection] {.async.} = +proc dialPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string, dialTimeout = defaultDialTimeout): Future[Option[Connection]] {.async.} = # Dial a given peer and add it to the list of known peers # @TODO check peer validity, duplicates and score before continuing. Limit number of peers to be managed. @@ -43,7 +54,25 @@ proc dialPeer*(pm: PeerManager, peerInfo: PeerInfo, proto: string): Future[Conne # Dial Peer # @TODO Keep track of conn and connected state in peer store - return await pm.switch.dial(peerInfo.peerId, peerInfo.addrs, proto) + let dialFut = pm.switch.dial(peerInfo.peerId, peerInfo.addrs, proto) + + try: + # Attempt to dial remote peer + if (await dialFut.withTimeout(dialTimeout)): + waku_peers_dials.inc(labelValues = ["successful"]) + return some(dialFut.read()) + else: + # @TODO any redial attempts? + # @TODO indicate CannotConnect on peer metadata + debug "Dialing remote peer timed out" + waku_peers_dials.inc(labelValues = ["timeout"]) + return none(Connection) + except CatchableError as e: + # @TODO any redial attempts? + # @TODO indicate CannotConnect on peer metadata + debug "Dialing remote peer failed", msg = e.msg + waku_peers_dials.inc(labelValues = ["failed"]) + return none(Connection) ##################### # Manager interface #