diff --git a/tests/test_waku_rendezvous.nim b/tests/test_waku_rendezvous.nim index 65967b79a..fa2efbd47 100644 --- a/tests/test_waku_rendezvous.nim +++ b/tests/test_waku_rendezvous.nim @@ -33,7 +33,11 @@ procSuite "Waku Rendezvous": ) await allFutures( - [node1.mountRendezvous(), node2.mountRendezvous(), node3.mountRendezvous()] + [ + node1.mountRendezvous(clusterId), + node2.mountRendezvous(clusterId), + node3.mountRendezvous(clusterId), + ] ) await allFutures([node1.start(), node2.start(), node3.start()]) diff --git a/tests/waku_discv5/test_waku_discv5.nim b/tests/waku_discv5/test_waku_discv5.nim index 734ea4bf9..325e7a21d 100644 --- a/tests/waku_discv5/test_waku_discv5.nim +++ b/tests/waku_discv5/test_waku_discv5.nim @@ -450,7 +450,7 @@ suite "Waku Discovery v5": raiseAssert error await waku1.node.mountPeerExchange() - await waku1.node.mountRendezvous() + await waku1.node.mountRendezvous(conf.clusterId) confBuilder.discv5Conf.withBootstrapNodes(@[waku1.node.enr.toURI()]) confBuilder.withP2pTcpPort(60003.Port) diff --git a/waku/common/callbacks.nim b/waku/common/callbacks.nim index d1da48067..9b8590152 100644 --- a/waku/common/callbacks.nim +++ b/waku/common/callbacks.nim @@ -1 +1,5 @@ +import ../waku_enr/capabilities + type GetShards* = proc(): seq[uint16] {.closure, gcsafe, raises: [].} + +type GetCapabilities* = proc(): seq[Capabilities] {.closure, gcsafe, raises: [].} diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index 9380163f6..2410f5f98 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -338,7 +338,7 @@ proc setupProtocols( # Only relay nodes should be rendezvous points. if conf.rendezvous: - await node.mountRendezvous() + await node.mountRendezvous(conf.clusterId) # Keepalive mounted on all nodes try: diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 6fa50e33d..4ee8af3b0 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -149,6 +149,12 @@ proc getShardsGetter(node: WakuNode): GetShards = return shards return @[] +proc getCapabilitiesGetter(node: WakuNode): GetCapabilities = + return proc(): seq[Capabilities] {.closure, gcsafe, raises: [].} = + if node.wakuRelay.isNil(): + return @[] + return node.enr.getCapabilities() + proc new*( T: type WakuNode, netConfig: NetConfig, @@ -1513,10 +1519,16 @@ proc parallelPings*(node: WakuNode, peerIds: seq[PeerId]): Future[int] {.async.} return successCount -proc mountRendezvous*(node: WakuNode) {.async: (raises: []).} = +proc mountRendezvous*(node: WakuNode, clusterId: uint16) {.async: (raises: []).} = info "mounting rendezvous discovery protocol" - node.wakuRendezvous = WakuRendezVous.new(node.switch, node.peerManager, node.enr).valueOr: + node.wakuRendezvous = WakuRendezVous.new( + node.switch, + node.peerManager, + clusterId, + node.getShardsGetter(), + node.getCapabilitiesGetter(), + ).valueOr: error "initializing waku rendezvous failed", error = error return diff --git a/waku/waku_rendezvous/protocol.nim b/waku/waku_rendezvous/protocol.nim index a26eaca6f..5a254a5cc 100644 --- a/waku/waku_rendezvous/protocol.nim +++ b/waku/waku_rendezvous/protocol.nim @@ -12,11 +12,11 @@ import import ../node/peer_manager, - ../common/enr, + ../common/callbacks, ../waku_enr/capabilities, - ../waku_enr/sharding, ../waku_core/peers, ../waku_core/topics, + ../waku_core/topics/pubsub_topic, ./common logScope: @@ -28,9 +28,9 @@ declarePublicCounter rendezvousPeerFoundTotal, type WakuRendezVous* = ref object rendezvous: Rendezvous peerManager: PeerManager - - relayShard: RelayShards - capabilities: seq[Capabilities] + clusterId: uint16 + getShards: GetShards + getCapabilities: GetCapabilities registrationInterval: timer.Duration periodicRegistrationFut: Future[void] @@ -138,15 +138,18 @@ proc advertiseAll( ): Future[Result[void, string]] {.async: (raises: []).} = debug "waku rendezvous advertisements started" - let pubsubTopics = self.relayShard.topics() + let shards = self.getShards() let futs = collect(newSeq): - for pubsubTopic in pubsubTopics: + for shardId in shards: # Get a random RDV peer for that shard - let rpi = self.peerManager.selectPeer(RendezVousCodec, some($pubsubTopic)).valueOr: + let rpi = self.peerManager.selectPeer( + RendezVousCodec, + some(toPubsubTopic(RelayShard(clusterId: self.clusterId, shardId: shardId))), + ).valueOr: continue - let namespace = computeNamespace(pubsubTopic.clusterId, pubsubTopic.shardId) + let namespace = computeNamespace(self.clusterId, shardId) # Advertise yourself on that peer self.batchAdvertise(namespace, DefaultRegistrationTTL, @[rpi.peerId]) @@ -173,14 +176,16 @@ proc initialRequestAll*( ): Future[Result[void, string]] {.async: (raises: []).} = debug "waku rendezvous initial requests started" - let pubsubTopics = self.relayShard.topics() + let shards = self.getShards() let futs = collect(newSeq): - for pubsubTopic in pubsubTopics: - let namespace = computeNamespace(pubsubTopic.clusterId, pubsubTopic.shardId) - + for shardId in shards: + let namespace = computeNamespace(self.clusterId, shardId) # Get a random RDV peer for that shard - let rpi = self.peerManager.selectPeer(RendezVousCodec, some($pubsubTopic)).valueOr: + let rpi = self.peerManager.selectPeer( + RendezVousCodec, + some(toPubsubTopic(RelayShard(clusterId: self.clusterId, shardId: shardId))), + ).valueOr: continue # Ask for peer records for that shard @@ -233,14 +238,13 @@ proc periodicRegistration(self: WakuRendezVous) {.async.} = self.registrationInterval = DefaultRegistrationInterval proc new*( - T: type WakuRendezVous, switch: Switch, peerManager: PeerManager, enr: Record + T: type WakuRendezVous, + switch: Switch, + peerManager: PeerManager, + clusterId: uint16, + getShards: GetShards, + getCapabilities: GetCapabilities, ): Result[T, string] {.raises: [].} = - let relayshard = getRelayShards(enr).valueOr: - warn "Using default cluster id 0" - RelayShards(clusterID: 0, shardIds: @[]) - - let capabilities = enr.getCapabilities() - let rvCatchable = catch: RendezVous.new(switch = switch, minDuration = DefaultRegistrationTTL) @@ -258,14 +262,13 @@ proc new*( var wrv = WakuRendezVous() wrv.rendezvous = rv wrv.peerManager = peerManager - wrv.relayshard = relayshard - wrv.capabilities = capabilities + wrv.clusterId = clusterId + wrv.getShards = getShards + wrv.getCapabilities = getCapabilities wrv.registrationInterval = DefaultRegistrationInterval debug "waku rendezvous initialized", - cluster = relayshard.clusterId, - shards = relayshard.shardIds, - capabilities = capabilities + clusterId = clusterId, shards = getShards(), capabilities = getCapabilities() return ok(wrv)