diff --git a/tests/all_tests_v2.nim b/tests/all_tests_v2.nim index dbe2c1f0f..3c138777b 100644 --- a/tests/all_tests_v2.nim +++ b/tests/all_tests_v2.nim @@ -21,7 +21,8 @@ import ./v2/test_waku_discv5, ./v2/test_enr_utils, ./v2/test_waku_store_queue, - ./v2/test_pagination_utils + ./v2/test_pagination_utils, + ./v2/test_peer_exchange when defined(rln): import ./v2/test_waku_rln_relay diff --git a/tests/v2/test_peer_exchange.nim b/tests/v2/test_peer_exchange.nim new file mode 100644 index 000000000..904041741 --- /dev/null +++ b/tests/v2/test_peer_exchange.nim @@ -0,0 +1,64 @@ +{.used.} + +import + std/[sequtils, options], + chronicles, + chronos, + libp2p/crypto/crypto, + libp2p/protocols/pubsub/gossipsub, + stew/shims/net, + testutils/unittests, + ../../waku/v2/node/wakunode2, + ../test_helpers + +procSuite "Peer Exchange": + asyncTest "GossipSub (relay) peer exchange": + ## Tests peer exchange + + # Create nodes and ENR. These will be added to the discoverable list + let + bindIp = ValidIpAddress.init("0.0.0.0") + nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node1 = WakuNode.new(nodeKey1, bindIp, Port(60000)) + nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node2 = WakuNode.new(nodeKey2, bindIp, Port(60002), sendSignedPeerRecord = true) + nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node3 = WakuNode.new(nodeKey3, bindIp, Port(60003), sendSignedPeerRecord = true) + + var + peerExchangeHandler: RoutingRecordsHandler + completionFut = newFuture[bool]() + + proc handlePeerExchange(peer: PeerId, topic: string, + peers: seq[RoutingRecordsPair]) {.gcsafe, raises: [Defect].} = + ## Handle peers received via gossipsub peer exchange + let peerRecords = peers.mapIt(it.record.get()) + + check: + # Node 3 is informed of node 2 via peer exchange + peer == node1.switch.peerInfo.peerId + topic == defaultTopic + peerRecords.countIt(it.peerId == node2.switch.peerInfo.peerId) == 1 + + if (not completionFut.completed()): + completionFut.complete(true) + + peerExchangeHandler = handlePeerExchange + + node1.mountRelay() + node2.mountRelay() + node3.mountRelay(peerExchangeHandler = some(peerExchangeHandler)) + + # Ensure that node1 prunes all peers after the first connection + node1.wakuRelay.parameters.dHigh = 1 + + await allFutures([node1.start(), node2.start(), node3.start()]) + + await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) + + await node3.connectToNodes(@[node1.switch.peerInfo.toRemotePeerInfo()]) + + check: + (await completionFut.withTimeout(5.seconds)) == true + + await allFutures([node1.stop(), node2.stop(), node3.stop()]) diff --git a/waku/v2/node/config.nim b/waku/v2/node/config.nim index db66872d9..bbb6dd6c1 100644 --- a/waku/v2/node/config.nim +++ b/waku/v2/node/config.nim @@ -89,6 +89,11 @@ type defaultValue: true name: "relay" }: bool + relayPeerExchange* {. + desc: "Enable gossipsub peer exchange in relay protocol: true|false", + defaultValue: true + name: "relay-peer-exchange" }: bool + rlnRelay* {. desc: "Enable spam protection through rln-relay: true|false", defaultValue: false diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index c6eeee4a6..217f22981 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -53,7 +53,7 @@ logScope: const clientId* = "Nimbus Waku v2 node" # Default topic -const defaultTopic = "/waku/2/default-waku/proto" +const defaultTopic* = "/waku/2/default-waku/proto" # Default Waku Filter Timeout const WakuFilterTimeout: Duration = 1.days @@ -170,6 +170,7 @@ proc new*(T: type WakuNode, nodeKey: crypto.PrivateKey, secureCert: string = "", wakuFlags = none(WakuEnrBitfield), nameResolver: NameResolver = nil, + sendSignedPeerRecord = false, dns4DomainName = none(string) ): T {.raises: [Defect, LPError, IOError, TLSStreamProtocolError].} = @@ -242,7 +243,8 @@ proc new*(T: type WakuNode, nodeKey: crypto.PrivateKey, wssEnabled = wssEnabled, secureKeyPath = secureKey, secureCertPath = secureCert, - nameResolver = nameResolver) + nameResolver = nameResolver, + sendSignedPeerRecord = sendSignedPeerRecord) let wakuNode = WakuNode( peerManager: PeerManager.new(switch, peerStorage), @@ -673,7 +675,8 @@ proc startRelay*(node: WakuNode) {.async.} = proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string](), relayMessages = true, - triggerSelf = true) + triggerSelf = true, + peerExchangeHandler = none(RoutingRecordsHandler)) # @TODO: Better error handling: CatchableError is raised by `waitFor` {.gcsafe, raises: [Defect, InitializationError, LPError, CatchableError].} = @@ -699,6 +702,10 @@ proc mountRelay*(node: WakuNode, ## all configured topics plus the hard-coded defaultTopic(s) wakuRelay.defaultTopics = concat(@[defaultTopic], topics) + ## Add peer exchange handler + if peerExchangeHandler.isSome(): + wakuRelay.routingRecordsHandler.add(peerExchangeHandler.get()) + node.switch.mount(wakuRelay, protocolMatcher(WakuRelayCodec)) if relayMessages: @@ -1107,6 +1114,7 @@ when isMainModule: conf.websocketSecureCertPath, some(wakuFlags), dnsResolver, + conf.relayPeerExchange, # We send our own signed peer record when peer exchange enabled dns4DomainName ) @@ -1147,7 +1155,7 @@ when isMainModule: ok(node) # 4/7 Mount and initialize configured protocols - proc setupProtocols(node: var WakuNode, + proc setupProtocols(node: WakuNode, conf: WakuNodeConf, mStorage: WakuMessageStore = nil): SetupResult[bool] = @@ -1156,10 +1164,26 @@ when isMainModule: ## No protocols are started yet. # Mount relay on all nodes + var peerExchangeHandler = none(RoutingRecordsHandler) + if conf.relayPeerExchange: + proc handlePeerExchange(peer: PeerId, topic: string, + peers: seq[RoutingRecordsPair]) {.gcsafe, raises: [Defect].} = + ## Handle peers received via gossipsub peer exchange + # TODO: Only consider peers on pubsub topics we subscribe to + let exchangedPeers = peers.filterIt(it.record.isSome()) # only peers with populated records + .mapIt(toRemotePeerInfo(it.record.get())) + + debug "connecting to exchanged peers", src=peer, topic=topic, numPeers=exchangedPeers.len + + # asyncSpawn, as we don't want to block here + asyncSpawn node.connectToNodes(exchangedPeers, "peer exchange") + + peerExchangeHandler = some(handlePeerExchange) + mountRelay(node, - conf.topics.split(" "), - relayMessages = conf.relay, # Indicates if node is capable to relay messages - ) + conf.topics.split(" "), + relayMessages = conf.relay, # Indicates if node is capable to relay messages + peerExchangeHandler = peerExchangeHandler) # Keepalive mounted on all nodes mountLibp2pPing(node) diff --git a/waku/v2/utils/peers.nim b/waku/v2/utils/peers.nim index 855cd30c5..c91b6222d 100644 --- a/waku/v2/utils/peers.nim +++ b/waku/v2/utils/peers.nim @@ -2,7 +2,7 @@ # Collection of utilities related to Waku peers import - std/[options, strutils], + std/[options, sequtils, strutils], stew/results, stew/shims/net, eth/keys, @@ -11,7 +11,8 @@ import libp2p/[errors, multiaddress, peerid, - peerinfo] + peerinfo, + routing_record] type RemotePeerInfo* = ref object of RootObj @@ -152,6 +153,12 @@ proc toRemotePeerInfo*(enr: enr.Record): Result[RemotePeerInfo, cstring] = return ok(RemotePeerInfo.init(peerId, addrs, some(enr))) +## Converts peer records to dialable RemotePeerInfo +## Useful if signed peer records have been received in an exchange +proc toRemotePeerInfo*(peerRecord: PeerRecord): RemotePeerInfo = + RemotePeerInfo.init(peerRecord.peerId, + peerRecord.addresses.mapIt(it.address)) + ## Converts the local peerInfo to dialable RemotePeerInfo ## Useful for testing or internal connections proc toRemotePeerInfo*(peerInfo: PeerInfo): RemotePeerInfo = diff --git a/waku/v2/utils/wakuswitch.nim b/waku/v2/utils/wakuswitch.nim index 65b9bac79..5339d9810 100644 --- a/waku/v2/utils/wakuswitch.nim +++ b/waku/v2/utils/wakuswitch.nim @@ -61,6 +61,7 @@ proc newWakuSwitch*( maxOut = -1, maxConnsPerPeer = MaxConnectionsPerPeer, nameResolver: NameResolver = nil, + sendSignedPeerRecord = false, wssEnabled: bool = false, secureKeyPath: string = "", secureCertPath: string = ""): Switch @@ -77,6 +78,8 @@ proc newWakuSwitch*( .withNoise() .withTcpTransport(transportFlags) .withNameResolver(nameResolver) + .withSignedPeerRecord(sendSignedPeerRecord) + if privKey.isSome(): b = b.withPrivateKey(privKey.get()) if wsAddress.isSome():