From 371016b04fc8467f6a4c09564b26d96d1acd0999 Mon Sep 17 00:00:00 2001 From: kaiserd <1684595+kaiserd@users.noreply.github.com> Date: Tue, 20 Sep 2022 13:03:34 +0200 Subject: [PATCH] feat: waku peer exchange (RFC34) (#1152) --- examples/v2/chat2.nim | 1 + tests/all_tests_v2.nim | 1 + tests/v2/test_waku_peer_exchange.nim | 146 ++++++++++++ waku/common/wakubridge.nim | 1 + waku/v2/node/config.nim | 11 + waku/v2/node/peer_manager/peer_manager.nim | 46 ++++ waku/v2/node/wakunode2.nim | 88 ++++---- waku/v2/node/wakunode2_setup_metrics.nim | 7 +- waku/v2/node/wakunode2_types.nim | 2 + waku/v2/protocol/waku_peer_exchange.nim | 13 ++ waku/v2/protocol/waku_peer_exchange/README.md | 3 + .../v2/protocol/waku_peer_exchange/client.nim | 9 + .../protocol/waku_peer_exchange/protocol.nim | 209 ++++++++++++++++++ waku/v2/protocol/waku_peer_exchange/rpc.nim | 13 ++ .../protocol/waku_peer_exchange/rpc_codec.nim | 91 ++++++++ 15 files changed, 592 insertions(+), 49 deletions(-) create mode 100644 tests/v2/test_waku_peer_exchange.nim create mode 100644 waku/v2/protocol/waku_peer_exchange.nim create mode 100644 waku/v2/protocol/waku_peer_exchange/README.md create mode 100644 waku/v2/protocol/waku_peer_exchange/client.nim create mode 100644 waku/v2/protocol/waku_peer_exchange/protocol.nim create mode 100644 waku/v2/protocol/waku_peer_exchange/rpc.nim create mode 100644 waku/v2/protocol/waku_peer_exchange/rpc_codec.nim diff --git a/examples/v2/chat2.nim b/examples/v2/chat2.nim index 26590df5c..ce7c31a2e 100644 --- a/examples/v2/chat2.nim +++ b/examples/v2/chat2.nim @@ -27,6 +27,7 @@ import ../../waku/v2/protocol/waku_message, ../../waku/v2/protocol/waku_store, ../../waku/v2/node/[wakunode2, waku_payload], ../../waku/v2/node/dnsdisc/waku_dnsdisc, + ../../waku/v2/node/peer_manager/peer_manager, ../../waku/v2/utils/[peers, time], ../../waku/common/utils/nat, ./config_chat2 diff --git a/tests/all_tests_v2.nim b/tests/all_tests_v2.nim index 9a8404001..c0d40d624 100644 --- a/tests/all_tests_v2.nim +++ b/tests/all_tests_v2.nim @@ -17,6 +17,7 @@ import # Waku Filter ./v2/test_waku_filter, ./v2/test_wakunode_filter, + ./v2/test_waku_peer_exchange, ./v2/test_waku_payload, ./v2/test_waku_swap, ./v2/test_utils_peers, diff --git a/tests/v2/test_waku_peer_exchange.nim b/tests/v2/test_waku_peer_exchange.nim new file mode 100644 index 000000000..a07438b5d --- /dev/null +++ b/tests/v2/test_waku_peer_exchange.nim @@ -0,0 +1,146 @@ +{.used.} + +import + std/[options, tables, sets], + testutils/unittests, + chronos, + chronicles, + stew/shims/net, + libp2p/switch, + libp2p/crypto/crypto, + libp2p/multistream, + eth/keys, + eth/p2p/discoveryv5/enr +import + ../../waku/v2/node/wakunode2, + ../../waku/v2/node/peer_manager/peer_manager, + ../../waku/v2/node/discv5/waku_discv5, + ../../waku/v2/protocol/waku_peer_exchange, + ../../waku/v2/protocol/waku_relay, + ../test_helpers, + ./utils + + +# TODO: Extend test coverage +procSuite "Waku Peer Exchange": + + asyncTest "encode and decode peer exchange response": + ## Setup + var + enr1 = enr.Record(seqNum: 0, raw: @[]) + enr2 = enr.Record(seqNum: 0, raw: @[]) + + discard enr1.fromUri("enr:-JK4QPmO-sE2ELiWr8qVFs1kaY4jQZQpNaHvSPRmKiKcaDoqYRdki2c1BKSliImsxFeOD_UHnkddNL2l0XT9wlsP0WEBgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQIMwKqlOl3zpwnrsKRKHuWPSuFzit1Cl6IZvL2uzBRe8oN0Y3CC6mKDdWRwgiMqhXdha3UyDw") + discard enr2.fromUri("enr:-Iu4QK_T7kzAmewG92u1pr7o6St3sBqXaiIaWIsFNW53_maJEaOtGLSN2FUbm6LmVxSfb1WfC7Eyk-nFYI7Gs3SlchwBgmlkgnY0gmlwhI5d6VKJc2VjcDI1NmsxoQLPYQDvrrFdCrhqw3JuFaGD71I8PtPfk6e7TJ3pg_vFQYN0Y3CC6mKDdWRwgiMq") + + let peerInfos = @[ + PeerExchangePeerInfo(enr: enr1.raw), + PeerExchangePeerInfo(enr: enr2.raw), + ] + + var rpc = PeerExchangeRpc( + response: PeerExchangeResponse( + peerInfos: peerInfos + ) + ) + + ## When + let + rpcBuffer: seq[byte] = rpc.encode().buffer + res = PeerExchangeRpc.init(rpcBuffer) + + ## Then + check: + res.isOk + res.get().response.peerInfos == peerInfos + + ## When + var + resEnr1 = enr.Record(seqNum: 0, raw: @[]) + resEnr2 = enr.Record(seqNum: 0, raw: @[]) + + discard resEnr1.fromBytes(res.get().response.peerInfos[0].enr) + discard resEnr2.fromBytes(res.get().response.peerInfos[1].enr) + + ## Then + check: + resEnr1 == enr1 + resEnr2 == enr2 + + asyncTest "retrieve and provide peer exchange peers from discv5": + ## Setup (copied from test_waku_discv5.nim) + let + bindIp = ValidIpAddress.init("0.0.0.0") + extIp = ValidIpAddress.init("127.0.0.1") + + nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] + nodeTcpPort1 = Port(60000) + nodeUdpPort1 = Port(9000) + node1 = WakuNode.new(nodeKey1, bindIp, nodeTcpPort1) + + nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] + nodeTcpPort2 = Port(60002) + nodeUdpPort2 = Port(9002) + node2 = WakuNode.new(nodeKey2, bindIp, nodeTcpPort2) + + nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[] + nodeTcpPort3 = Port(60004) + nodeUdpPort3 = Port(9004) + node3 = WakuNode.new(nodeKey3, bindIp, nodeTcpPort3) + + # todo: px flag + flags = initWakuFlags(lightpush = false, + filter = false, + store = false, + relay = true) + + # Mount discv5 + node1.wakuDiscv5 = WakuDiscoveryV5.new( + some(extIp), some(nodeTcpPort1), some(nodeUdpPort1), + bindIp, + nodeUdpPort1, + newSeq[string](), + false, + keys.PrivateKey(nodeKey1.skkey), + flags, + [], # Empty enr fields, for now + node1.rng + ) + + node2.wakuDiscv5 = WakuDiscoveryV5.new( + some(extIp), some(nodeTcpPort2), some(nodeUdpPort2), + bindIp, + nodeUdpPort2, + @[node1.wakuDiscv5.protocol.localNode.record.toURI()], # Bootstrap with node1 + false, + keys.PrivateKey(nodeKey2.skkey), + flags, + [], # Empty enr fields, for now + node2.rng + ) + + ## Given + await allFutures([node1.start(), node2.start(), node3.start()]) + await allFutures([node1.startDiscv5(), node2.startDiscv5()]) + + # Mount peer exchange + await node1.mountWakuPeerExchange() + await node3.mountWakuPeerExchange() + + await sleepAsync(3000.millis) # Give the algorithm some time to work its magic + + asyncSpawn node1.wakuPeerExchange.runPeerExchangeDiscv5Loop() + + node3.wakuPeerExchange.setPeer(node1.switch.peerInfo.toRemotePeerInfo()) + + ## When + discard waitFor node3.wakuPeerExchange.request(1) + + await sleepAsync(2000.millis) # Give the algorithm some time to work its magic + + ## Then + check: + node1.wakuDiscv5.protocol.nodesDiscovered > 0 + node3.switch.peerStore[AddressBook].contains(node2.switch.peerInfo.peerId) + + await allFutures([node1.stop(), node2.stop(), node3.stop()]) diff --git a/waku/common/wakubridge.nim b/waku/common/wakubridge.nim index bd1cbabdc..b0a01349a 100644 --- a/waku/common/wakubridge.nim +++ b/waku/common/wakubridge.nim @@ -20,6 +20,7 @@ import ../v2/utils/time, ../v2/protocol/waku_message, ../v2/node/wakunode2, + ../v2/node/peer_manager/peer_manager, # Common cli config ./config_bridge diff --git a/waku/v2/node/config.nim b/waku/v2/node/config.nim index 4cf1f94d8..c8a055739 100644 --- a/waku/v2/node/config.nim +++ b/waku/v2/node/config.nim @@ -390,6 +390,17 @@ type defaultValue: 1 name: "discv5-bits-per-hop" .}: int + ## waku peer exchange config + peerExchange* {. + desc: "Enable waku peer exchange protocol (responder side): true|false", + defaultValue: false + name: "peer-exchange" }: bool + + peerExchangeNode* {. + desc: "Peer multiaddr to send peer exchange requests to. (enables peer exchange protocol requester side)", + defaultValue: "" + name: "peer-exchange-node" }: string + ## websocket config websocketSupport* {. desc: "Enable websocket: true|false", diff --git a/waku/v2/node/peer_manager/peer_manager.nim b/waku/v2/node/peer_manager/peer_manager.nim index ac8a61115..2a50cd6a5 100644 --- a/waku/v2/node/peer_manager/peer_manager.nim +++ b/waku/v2/node/peer_manager/peer_manager.nim @@ -11,6 +11,7 @@ import export waku_peer_store, peer_storage, peers declareCounter waku_peers_dials, "Number of peer dials", ["outcome"] +declarePublicCounter waku_node_conns_initiated, "Number of connections initiated", ["source"] declarePublicGauge waku_peers_errors, "Number of peer manager errors", ["type"] logScope: @@ -292,3 +293,48 @@ proc dialPeer*(pm: PeerManager, peerId: PeerID, proto: string, dialTimeout = def let addrs = pm.switch.peerStore[AddressBook][peerId] return await pm.dialPeer(peerId, addrs, proto, dialTimeout) + +proc connectToNode(pm: PeerManager, remotePeer: RemotePeerInfo, proto: string, source = "api") {.async.} = + ## `source` indicates source of node addrs (static config, api call, discovery, etc) + info "Connecting to node", remotePeer = remotePeer, source = source + + info "Attempting dial", wireAddr = remotePeer.addrs[0], peerId = remotePeer.peerId + let connOpt = await pm.dialPeer(remotePeer, proto) + + if connOpt.isSome(): + info "Successfully connected to peer", wireAddr = remotePeer.addrs[0], peerId = remotePeer.peerId + waku_node_conns_initiated.inc(labelValues = [source]) + else: + error "Failed to connect to peer", wireAddr = remotePeer.addrs[0], peerId = remotePeer.peerId + waku_peers_errors.inc(labelValues = ["conn_init_failure"]) + +proc connectToNodes*(pm: PeerManager, nodes: seq[string], proto: string, source = "api") {.async.} = + ## `source` indicates source of node addrs (static config, api call, discovery, etc) + info "connectToNodes", len = nodes.len + + for nodeId in nodes: + await connectToNode(pm, parseRemotePeerInfo(nodeId), proto ,source) + + # The issue seems to be around peers not being fully connected when + # trying to subscribe. So what we do is sleep to guarantee nodes are + # fully connected. + # + # This issue was known to Dmitiry on nim-libp2p and may be resolvable + # later. + await sleepAsync(chronos.seconds(5)) + +proc connectToNodes*(pm: PeerManager, nodes: seq[RemotePeerInfo], proto: string, source = "api") {.async.} = + ## `source` indicates source of node addrs (static config, api call, discovery, etc) + info "connectToNodes", len = nodes.len + + for remotePeerInfo in nodes: + await connectToNode(pm, remotePeerInfo, proto, source) + + # The issue seems to be around peers not being fully connected when + # trying to subscribe. So what we do is sleep to guarantee nodes are + # fully connected. + # + # This issue was known to Dmitiry on nim-libp2p and may be resolvable + # later. + await sleepAsync(chronos.seconds(5)) + diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index 60d979db3..a07081499 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -20,7 +20,8 @@ import ../protocol/waku_swap/waku_swap, ../protocol/waku_filter, ../protocol/waku_lightpush, - ../protocol/waku_rln_relay/waku_rln_relay_types, + ../protocol/waku_rln_relay/waku_rln_relay_types, + ../protocol/waku_peer_exchange, ../utils/[peers, requests, wakuenr], ./peer_manager/peer_manager, ./storage/message/waku_store_queue, @@ -42,7 +43,6 @@ when defined(rln): declarePublicCounter waku_node_messages, "number of messages received", ["type"] declarePublicGauge waku_node_filters, "number of content filter subscriptions" declarePublicGauge waku_node_errors, "number of wakunode errors", ["type"] -declarePublicCounter waku_node_conns_initiated, "number of connections initiated by this node", ["source"] logScope: topics = "wakunode" @@ -564,6 +564,20 @@ proc mountLightPush*(node: WakuNode) {.async, raises: [Defect, LPError].} = node.switch.mount(node.wakuLightPush, protocolMatcher(WakuLightPushCodec)) +proc mountWakuPeerExchange*(node: WakuNode) {.async, raises: [Defect, LPError].} = + info "mounting waku peer exchange" + + var discv5Opt: Option[WakuDiscoveryV5] + if not node.wakuDiscV5.isNil(): + discv5Opt = some(node.wakuDiscV5) + node.wakuPeerExchange = WakuPeerExchange.init(node.peerManager, discv5Opt) + + if node.started: + # Node has started already. Let's start Waku peer exchange too. + await node.wakuPeerExchange.start() + + node.switch.mount(node.wakuPeerExchange, protocolMatcher(WakuPeerExchangeCodec)) + proc mountLibp2pPing*(node: WakuNode) {.async, raises: [Defect, LPError].} = info "mounting libp2p ping protocol" @@ -611,22 +625,6 @@ proc startKeepalive*(node: WakuNode) = asyncSpawn node.keepaliveLoop(defaultKeepalive) -## Helpers -proc connectToNode(n: WakuNode, remotePeer: RemotePeerInfo, source = "api") {.async.} = - ## `source` indicates source of node addrs (static config, api call, discovery, etc) - info "Connecting to node", remotePeer = remotePeer, source = source - - # NOTE This is dialing on WakuRelay protocol specifically - info "Attempting dial", wireAddr = remotePeer.addrs[0], peerId = remotePeer.peerId - let connOpt = await n.peerManager.dialPeer(remotePeer, WakuRelayCodec) - - if connOpt.isSome(): - info "Successfully connected to peer", wireAddr = remotePeer.addrs[0], peerId = remotePeer.peerId - waku_node_conns_initiated.inc(labelValues = [source]) - else: - error "Failed to connect to peer", wireAddr = remotePeer.addrs[0], peerId = remotePeer.peerId - waku_node_errors.inc(labelValues = ["conn_init_failure"]) - proc setStorePeer*(n: WakuNode, peer: RemotePeerInfo) = n.wakuStore.setPeer(peer) @@ -654,35 +652,18 @@ proc setLightPushPeer*(n: WakuNode, address: string) {.raises: [Defect, ValueErr let peer = parseRemotePeerInfo(address) n.wakuLightPush.setPeer(peer) -proc connectToNodes*(n: WakuNode, nodes: seq[string], source = "api") {.async.} = +proc setPeerExchangePeer*(n: WakuNode, address: string) {.raises: [Defect, ValueError, LPError].} = + info "Set peer exchange peer", address = address + + let remotePeer = parseRemotePeerInfo(address) + + n.wakuPeerExchange.setPeer(remotePeer) + +proc connectToNodes*(n: WakuNode, nodes: seq[RemotePeerInfo] | seq[string], source = "api") {.async.} = ## `source` indicates source of node addrs (static config, api call, discovery, etc) - info "connectToNodes", len = nodes.len - - for nodeId in nodes: - await connectToNode(n, parseRemotePeerInfo(nodeId), source) + # NOTE This is dialing on WakuRelay protocol specifically + await connectToNodes(n.peerManager, nodes, WakuRelayCodec, source) - # The issue seems to be around peers not being fully connected when - # trying to subscribe. So what we do is sleep to guarantee nodes are - # fully connected. - # - # This issue was known to Dmitiry on nim-libp2p and may be resolvable - # later. - await sleepAsync(5.seconds) - -proc connectToNodes*(n: WakuNode, nodes: seq[RemotePeerInfo], source = "api") {.async.} = - ## `source` indicates source of node addrs (static config, api call, discovery, etc) - info "connectToNodes", len = nodes.len - - for remotePeerInfo in nodes: - await connectToNode(n, remotePeerInfo, source) - - # The issue seems to be around peers not being fully connected when - # trying to subscribe. So what we do is sleep to guarantee nodes are - # fully connected. - # - # This issue was known to Dmitiry on nim-libp2p and may be resolvable - # later. - await sleepAsync(5.seconds) proc runDiscv5Loop(node: WakuNode) {.async.} = ## Continuously add newly discovered nodes @@ -933,7 +914,7 @@ when isMainModule: trace "resolving", domain=domain let resolved = await dnsResolver.resolveTxt(domain) return resolved[0] # Use only first answer - + var wakuDnsDiscovery = WakuDnsDiscovery.init(conf.dnsDiscoveryUrl, resolver) if wakuDnsDiscovery.isOk: @@ -1107,6 +1088,13 @@ when isMainModule: if conf.filternode != "": setFilterPeer(node, conf.filternode) + # waku peer exchange setup + if (conf.peerExchangeNode != "") or (conf.peerExchange): + waitFor mountWakuPeerExchange(node) + + if conf.peerExchangeNode != "": + setPeerExchangePeer(node, conf.peerExchangeNode) + ok(true) # Success # 5/7 Start node and mounted protocols @@ -1135,7 +1123,13 @@ when isMainModule: if dynamicBootstrapNodes.len > 0: info "Connecting to dynamic bootstrap peers" waitFor connectToNodes(node, dynamicBootstrapNodes, "dynamic bootstrap") - + + # retrieve and connect to peer exchange peers + if conf.peerExchangeNode != "": + info "Retrieving peer info via peer exchange protocol" + let desiredOutDegree = node.wakuRelay.parameters.d.uint64() + discard waitFor node.wakuPeerExchange.request(desiredOutDegree) + # Start keepalive, if enabled if conf.keepAlive: node.startKeepalive() diff --git a/waku/v2/node/wakunode2_setup_metrics.nim b/waku/v2/node/wakunode2_setup_metrics.nim index a08ceca7f..b649bd826 100644 --- a/waku/v2/node/wakunode2_setup_metrics.nim +++ b/waku/v2/node/wakunode2_setup_metrics.nim @@ -9,10 +9,12 @@ import metrics/chronos_httpserver, ./config, ./wakunode2, + ./peer_manager/peer_manager, ../protocol/waku_filter, ../protocol/waku_store, ../protocol/waku_lightpush, - ../protocol/waku_swap/waku_swap + ../protocol/waku_swap/waku_swap, + ../protocol/waku_peer_exchange logScope: topics = "wakunode.setup.metrics" @@ -64,10 +66,11 @@ proc startMetricsLog*() = info "Total filter peers", count = parseCollectorIntoF64(waku_filter_peers) info "Total store peers", count = parseCollectorIntoF64(waku_store_peers) info "Total lightpush peers", count = parseCollectorIntoF64(waku_lightpush_peers) + info "Total peer exchange peers", count = parseCollectorIntoF64(waku_px_peers) info "Total errors", count = freshErrorCount info "Total active filter subscriptions", count = parseCollectorIntoF64(waku_filter_subscribers) discard setTimer(Moment.fromNow(30.seconds), logMetrics) discard setTimer(Moment.fromNow(30.seconds), logMetrics) - \ No newline at end of file + diff --git a/waku/v2/node/wakunode2_types.nim b/waku/v2/node/wakunode2_types.nim index d1c1026f5..744dfc37c 100644 --- a/waku/v2/node/wakunode2_types.nim +++ b/waku/v2/node/wakunode2_types.nim @@ -8,6 +8,7 @@ import ../protocol/waku_swap/waku_swap, ../protocol/waku_filter, ../protocol/waku_lightpush, + ../protocol/waku_peer_exchange, ../protocol/waku_rln_relay/waku_rln_relay_types, ./peer_manager/peer_manager, ./discv5/waku_discv5 @@ -39,6 +40,7 @@ type wakuSwap*: WakuSwap wakuRlnRelay*: WakuRLNRelay wakuLightPush*: WakuLightPush + wakuPeerExchange*: WakuPeerExchange enr*: enr.Record libp2pPing*: Ping filters*: Filters diff --git a/waku/v2/protocol/waku_peer_exchange.nim b/waku/v2/protocol/waku_peer_exchange.nim new file mode 100644 index 000000000..3cb1f9af0 --- /dev/null +++ b/waku/v2/protocol/waku_peer_exchange.nim @@ -0,0 +1,13 @@ +{.push raises: [Defect].} + +import + ./waku_peer_exchange/rpc, + ./waku_peer_exchange/rpc_codec, + ./waku_peer_exchange/protocol, + ./waku_peer_exchange/client + +export + rpc, + rpc_codec, + protocol, + client diff --git a/waku/v2/protocol/waku_peer_exchange/README.md b/waku/v2/protocol/waku_peer_exchange/README.md new file mode 100644 index 000000000..627023d18 --- /dev/null +++ b/waku/v2/protocol/waku_peer_exchange/README.md @@ -0,0 +1,3 @@ +# Waku Peer Exchange + +Implementation of [34/WAKU2-PEER-EXCHANGE](https://rfc.vac.dev/spec/34/). diff --git a/waku/v2/protocol/waku_peer_exchange/client.nim b/waku/v2/protocol/waku_peer_exchange/client.nim new file mode 100644 index 000000000..6de8dad67 --- /dev/null +++ b/waku/v2/protocol/waku_peer_exchange/client.nim @@ -0,0 +1,9 @@ +{.push raises: [Defect].} + +import + std/[tables, sequtils], + chronicles +import + ../waku_message, + ./rpc + diff --git a/waku/v2/protocol/waku_peer_exchange/protocol.nim b/waku/v2/protocol/waku_peer_exchange/protocol.nim new file mode 100644 index 000000000..719097f12 --- /dev/null +++ b/waku/v2/protocol/waku_peer_exchange/protocol.nim @@ -0,0 +1,209 @@ +import + std/[options, sets, tables, sequtils, random], + stew/results, + chronicles, + chronos, + metrics, + libp2p/protocols/protocol, + libp2p/crypto/crypto, + eth/p2p/discoveryv5/enr +import + ../../node/peer_manager/peer_manager, + ../../node/discv5/waku_discv5, + ../../utils/requests, + ../waku_message, + ../waku_relay, + ./rpc, + ./rpc_codec + + +declarePublicGauge waku_px_peers, "number of peers (in the node's peerManager) supporting the peer exchange protocol" +declarePublicGauge waku_px_peers_received_total, "number of ENRs received via peer exchange" +declarePublicGauge waku_px_peers_received_unknown, "number of previously unknown ENRs received via peer exchange" +declarePublicGauge waku_px_peers_sent, "number of ENRs sent to peer exchange requesters" +declarePublicGauge waku_px_peers_cached, "number of peer exchange peer ENRs cached" +declarePublicGauge waku_px_errors, "number of peer exchange errors", ["type"] + +logScope: + topics = "wakupx" + + +const + # We add a 64kB safety buffer for protocol overhead. + # 10x-multiplier also for safety + MaxRpcSize = 10 * MaxWakuMessageSize + 64 * 1024 # TODO what is the expected size of a PX message? As currently specified, it can contain an arbitary number of ENRs... + MaxCacheSize = 1000 + CacheCleanWindow = 200 + + WakuPeerExchangeCodec* = "/vac/waku/peer-exchange/2.0.0-alpha1" + +# Error types (metric label values) +const + dialFailure = "dial_failure" + peerNotFoundFailure = "peer_not_found_failure" + decodeRpcFailure = "decode_rpc_failure" + retrievePeersDiscv5Error= "retrieve_peers_discv5_failure" + pxFailure = "px_failure" + +type + WakuPeerExchangeResult*[T] = Result[T, string] + + WakuPeerExchange* = ref object of LPProtocol + peerManager*: PeerManager + wakuDiscv5: Option[WakuDiscoveryV5] + enrCache: seq[enr.Record] # todo: next step: ring buffer; future: implement cache satisfying https://rfc.vac.dev/spec/34/ + +proc sendPeerExchangeRpcToPeer(wpx: WakuPeerExchange, rpc: PeerExchangeRpc, peer: RemotePeerInfo | PeerId): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} = + let connOpt = await wpx.peerManager.dialPeer(peer, WakuPeerExchangeCodec) + if connOpt.isNone(): + return err(dialFailure) + + let connection = connOpt.get() + + await connection.writeLP(rpc.encode().buffer) + + return ok() + +proc request(wpx: WakuPeerExchange, numPeers: uint64, peer: RemotePeerInfo): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} = + let rpc = PeerExchangeRpc( + request: PeerExchangeRequest( + numPeers: numPeers + ) + ) + + let res = await wpx.sendPeerExchangeRpcToPeer(rpc, peer) + if res.isErr(): + waku_px_errors.inc(labelValues = [res.error()]) + return err(res.error()) + + return ok() + +proc request*(wpx: WakuPeerExchange, numPeers: uint64): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} = + let peerOpt = wpx.peerManager.selectPeer(WakuPeerExchangeCodec) + if peerOpt.isNone(): + waku_px_errors.inc(labelValues = [peerNotFoundFailure]) + return err(peerNotFoundFailure) + + return await wpx.request(numPeers, peerOpt.get()) + +proc respond(wpx: WakuPeerExchange, enrs: seq[enr.Record], peer: RemotePeerInfo | PeerId): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} = + var peerInfos: seq[PeerExchangePeerInfo] = @[] + for e in enrs: + let pi = PeerExchangePeerInfo( + enr: e.raw + ) + peerInfos.add(pi) + + let rpc = PeerExchangeRpc( + response: PeerExchangeResponse( + peerInfos: peerInfos + ) + ) + + let res = await wpx.sendPeerExchangeRpcToPeer(rpc, peer) + if res.isErr(): + waku_px_errors.inc(labelValues = [res.error()]) + return err(res.error()) + + return ok() + +proc respond*(wpx: WakuPeerExchange, enrs: seq[enr.Record]): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} = + let peerOpt = wpx.peerManager.selectPeer(WakuPeerExchangeCodec) + if peerOpt.isNone(): + waku_px_errors.inc(labelValues = [peerNotFoundFailure]) + return err(peerNotFoundFailure) + + return await wpx.respond(enrs, peerOpt.get()) + +proc cleanCache(px: WakuPeerExchange) {.gcsafe.} = + px.enrCache.delete(0, CacheCleanWindow-1) + +proc runPeerExchangeDiscv5Loop*(px: WakuPeerExchange) {.async, gcsafe.} = + ## Runs a discv5 loop adding new peers to the px peer cache + if px.wakuDiscv5.isNone(): + warn "Trying to run discovery v5 (for PX) while it's disabled" + return + + info "Starting peer exchange discovery v5 loop" + + while px.wakuDiscv5.get().listening: + trace "Running px discv5 discovery loop" + let discoveredPeers = await px.wakuDiscv5.get().findRandomPeers() + info "Discovered px peers via discv5", count=discoveredPeers.get().len() + if discoveredPeers.isOk: + for dp in discoveredPeers.get(): + if dp.enr.isSome() and not px.enrCache.contains(dp.enr.get()): + px.enrCache.add(dp.enr.get()) + + if px.enrCache.len() >= MaxCacheSize: + px.cleanCache() + + ## This loop "competes" with the loop in wakunode2 + ## For the purpose of collecting px peers, 30 sec intervals should be enough + await sleepAsync(30.seconds) + +proc getEnrsFromCache(px: WakuPeerExchange, numPeers: uint64): seq[enr.Record] {.gcsafe.} = + randomize() + if px.enrCache.len() == 0: + debug "peer exchange ENR cache is empty" + return @[] + for i in 0.. 0: + waku_px_peers_received_unknown.inc(newPeers.len().int64()) + debug "Connecting to newly discovered peers", count=newPeers.len() + await px.peerManager.connectToNodes(newPeers, WakuRelayCodec, source = "peer exchange") + + px.handler = handler + px.codec = WakuPeerExchangeCodec + +proc init*(T: type WakuPeerExchange, + peerManager: PeerManager, + wakuDiscv5: Option[WakuDiscoveryV5] = none(WakuDiscoveryV5) + ): T = + let px = WakuPeerExchange( + peerManager: peerManager, + wakuDiscv5: wakuDiscv5 + ) + px.initProtocolHandler() + return px + +proc setPeer*(wpx: WakuPeerExchange, peer: RemotePeerInfo) = + wpx.peerManager.addPeer(peer, WakuPeerExchangeCodec) + waku_px_peers.inc() + diff --git a/waku/v2/protocol/waku_peer_exchange/rpc.nim b/waku/v2/protocol/waku_peer_exchange/rpc.nim new file mode 100644 index 000000000..0b248d935 --- /dev/null +++ b/waku/v2/protocol/waku_peer_exchange/rpc.nim @@ -0,0 +1,13 @@ +type + PeerExchangePeerInfo* = object + enr*: seq[byte] # RLP encoded ENR: https://eips.ethereum.org/EIPS/eip-778 + + PeerExchangeRequest* = object + numPeers*: uint64 + + PeerExchangeResponse* = object + peerInfos*: seq[PeerExchangePeerInfo] + + PeerExchangeRpc* = object + request*: PeerExchangeRequest + response*: PeerExchangeResponse diff --git a/waku/v2/protocol/waku_peer_exchange/rpc_codec.nim b/waku/v2/protocol/waku_peer_exchange/rpc_codec.nim new file mode 100644 index 000000000..9301204da --- /dev/null +++ b/waku/v2/protocol/waku_peer_exchange/rpc_codec.nim @@ -0,0 +1,91 @@ +{.push raises: [Defect].} + +import + libp2p/protobuf/minprotobuf, + libp2p/varint +import + ../../utils/protobuf, + ./rpc + +proc encode*(rpc: PeerExchangeRequest): ProtoBuffer = + var output = initProtoBuffer() + + output.write3(1, rpc.numPeers) + output.finish3() + + return output + +proc init*(T: type PeerExchangeRequest, buffer: seq[byte]): ProtoResult[T] = + let pb = initProtoBuffer(buffer) + + var rpc = PeerExchangeRequest(numPeers: 0) + + var numPeers: uint64 + if ?pb.getField(1, numPeers): + rpc.numPeers = numPeers + + return ok(rpc) + +proc encode*(rpc: PeerExchangePeerInfo): ProtoBuffer = + var output = initProtoBuffer() + + output.write3(1, rpc.enr) + output.finish3() + + return output + +proc init*(T: type PeerExchangePeerInfo, buffer: seq[byte]): ProtoResult[T] = + let pb = initProtoBuffer(buffer) + + var rpc = PeerExchangePeerInfo(enr: @[]) + + var peerInfoBuffer: seq[byte] + if ?pb.getField(1, peerInfoBuffer): + rpc.enr = peerInfoBuffer + + return ok(rpc) + +proc encode*(rpc: PeerExchangeResponse): ProtoBuffer = + var output = initProtoBuffer() + + for pi in rpc.peerInfos: + output.write3(1, pi.encode()) + output.finish3() + + return output + +proc init*(T: type PeerExchangeResponse, buffer: seq[byte]): ProtoResult[T] = + let pb = initProtoBuffer(buffer) + + var rpc = PeerExchangeResponse(peerInfos: @[]) + + var peerInfoBuffers: seq[seq[byte]] + if ?pb.getRepeatedField(1, peerInfoBuffers): + for pib in peerInfoBuffers: + rpc.peerInfos.add(?PeerExchangePeerInfo.init(pib)) + + return ok(rpc) + +proc encode*(rpc: PeerExchangeRpc): ProtoBuffer = + var output = initProtoBuffer() + output.write3(1, rpc.request.encode()) + output.write3(2, rpc.response.encode()) + output.finish3() + + return output + +proc init*(T: type PeerExchangeRpc, buffer: seq[byte]): ProtoResult[T] = + let pb = initProtoBuffer(buffer) + + var rpc = PeerExchangeRpc() + + var requestBuffer: seq[byte] + discard ?pb.getField(1, requestBuffer) + rpc.request = ?PeerExchangeRequest.init(requestBuffer) + + var responseBuffer: seq[byte] + discard ?pb.getField(2, responseBuffer) + rpc.response = ?PeerExchangeResponse.init(responseBuffer) + + return ok(rpc) +