From cc7a6406f50786ff8b144b18f472b058b2dc8a0c Mon Sep 17 00:00:00 2001 From: Simon-Pierre Vivier Date: Tue, 23 Sep 2025 09:51:26 -0400 Subject: [PATCH] feat: adding rendezvous request interval (#3569) --- waku/node/waku_node.nim | 4 ---- waku/waku_rendezvous/common.nim | 1 + waku/waku_rendezvous/protocol.nim | 36 ++++++++++++++++++++++++++----- 3 files changed, 32 insertions(+), 9 deletions(-) diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 52b210558..bbda3f5ea 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -1527,10 +1527,6 @@ proc mountRendezvous*(node: WakuNode, clusterId: uint16) {.async: (raises: []).} error "initializing waku rendezvous failed", error = error return - # Always start discovering peers at startup - (await node.wakuRendezvous.initialRequestAll()).isOkOr: - error "rendezvous failed initial requests", error = error - if node.started: await node.wakuRendezvous.start() diff --git a/waku/waku_rendezvous/common.nim b/waku/waku_rendezvous/common.nim index f5ce4b942..6125ac860 100644 --- a/waku/waku_rendezvous/common.nim +++ b/waku/waku_rendezvous/common.nim @@ -7,6 +7,7 @@ import ../common/enr, ../waku_enr/capabilities, ../waku_enr/sharding const DiscoverLimit* = 1000 const DefaultRegistrationTTL* = 60.seconds const DefaultRegistrationInterval* = 10.seconds +const DefaultRequestsInterval* = 1.minutes const MaxRegistrationInterval* = 5.minutes const PeersRequestedCount* = 12 diff --git a/waku/waku_rendezvous/protocol.nim b/waku/waku_rendezvous/protocol.nim index 5a254a5cc..ae5732267 100644 --- a/waku/waku_rendezvous/protocol.nim +++ b/waku/waku_rendezvous/protocol.nim @@ -35,6 +35,9 @@ type WakuRendezVous* = ref object registrationInterval: timer.Duration periodicRegistrationFut: Future[void] + requestInterval: timer.Duration + periodicRequestFut: Future[void] + proc batchAdvertise*( self: WakuRendezVous, namespace: string, @@ -143,10 +146,11 @@ proc advertiseAll( let futs = collect(newSeq): for shardId in shards: # Get a random RDV peer for that shard - let rpi = self.peerManager.selectPeer( - RendezVousCodec, - some(toPubsubTopic(RelayShard(clusterId: self.clusterId, shardId: shardId))), - ).valueOr: + + let pubsub = + toPubsubTopic(RelayShard(clusterId: self.clusterId, shardId: shardId)) + + let rpi = self.peerManager.selectPeer(RendezVousCodec, some(pubsub)).valueOr: continue let namespace = computeNamespace(self.clusterId, shardId) @@ -214,7 +218,7 @@ proc initialRequestAll*( rendezvousPeerFoundTotal.inc() self.peerManager.addPeer(record) - debug "waku rendezvous initial requests finished" + debug "waku rendezvous initial request finished" return ok() @@ -237,6 +241,22 @@ proc periodicRegistration(self: WakuRendezVous) {.async.} = # Back to normal interval if no errors self.registrationInterval = DefaultRegistrationInterval +proc periodicRequests(self: WakuRendezVous) {.async.} = + debug "waku rendezvous periodic requests started", interval = self.requestInterval + + # infinite loop + while true: + (await self.initialRequestAll()).isOkOr: + error "waku rendezvous requests failed", error = error + + await sleepAsync(self.requestInterval) + + # Exponential backoff + self.requestInterval += self.requestInterval + + if self.requestInterval >= 1.days: + break + proc new*( T: type WakuRendezVous, switch: Switch, @@ -266,6 +286,7 @@ proc new*( wrv.getShards = getShards wrv.getCapabilities = getCapabilities wrv.registrationInterval = DefaultRegistrationInterval + wrv.requestInterval = DefaultRequestsInterval debug "waku rendezvous initialized", clusterId = clusterId, shards = getShards(), capabilities = getCapabilities() @@ -276,10 +297,15 @@ proc start*(self: WakuRendezVous) {.async: (raises: []).} = # start registering forever self.periodicRegistrationFut = self.periodicRegistration() + self.periodicRequestFut = self.periodicRequests() + debug "waku rendezvous discovery started" proc stopWait*(self: WakuRendezVous) {.async: (raises: []).} = if not self.periodicRegistrationFut.isNil(): await self.periodicRegistrationFut.cancelAndWait() + if not self.periodicRequestFut.isNil(): + await self.periodicRequestFut.cancelAndWait() + debug "waku rendezvous discovery stopped"