feat: adding rendezvous request interval (#3569)

This commit is contained in:
Simon-Pierre Vivier 2025-09-23 09:51:26 -04:00 committed by GitHub
parent 794c3a850d
commit cc7a6406f5
3 changed files with 32 additions and 9 deletions

View File

@ -1527,10 +1527,6 @@ proc mountRendezvous*(node: WakuNode, clusterId: uint16) {.async: (raises: []).}
error "initializing waku rendezvous failed", error = error error "initializing waku rendezvous failed", error = error
return return
# Always start discovering peers at startup
(await node.wakuRendezvous.initialRequestAll()).isOkOr:
error "rendezvous failed initial requests", error = error
if node.started: if node.started:
await node.wakuRendezvous.start() await node.wakuRendezvous.start()

View File

@ -7,6 +7,7 @@ import ../common/enr, ../waku_enr/capabilities, ../waku_enr/sharding
const DiscoverLimit* = 1000 const DiscoverLimit* = 1000
const DefaultRegistrationTTL* = 60.seconds const DefaultRegistrationTTL* = 60.seconds
const DefaultRegistrationInterval* = 10.seconds const DefaultRegistrationInterval* = 10.seconds
const DefaultRequestsInterval* = 1.minutes
const MaxRegistrationInterval* = 5.minutes const MaxRegistrationInterval* = 5.minutes
const PeersRequestedCount* = 12 const PeersRequestedCount* = 12

View File

@ -35,6 +35,9 @@ type WakuRendezVous* = ref object
registrationInterval: timer.Duration registrationInterval: timer.Duration
periodicRegistrationFut: Future[void] periodicRegistrationFut: Future[void]
requestInterval: timer.Duration
periodicRequestFut: Future[void]
proc batchAdvertise*( proc batchAdvertise*(
self: WakuRendezVous, self: WakuRendezVous,
namespace: string, namespace: string,
@ -143,10 +146,11 @@ proc advertiseAll(
let futs = collect(newSeq): let futs = collect(newSeq):
for shardId in shards: for shardId in shards:
# Get a random RDV peer for that shard # Get a random RDV peer for that shard
let rpi = self.peerManager.selectPeer(
RendezVousCodec, let pubsub =
some(toPubsubTopic(RelayShard(clusterId: self.clusterId, shardId: shardId))), toPubsubTopic(RelayShard(clusterId: self.clusterId, shardId: shardId))
).valueOr:
let rpi = self.peerManager.selectPeer(RendezVousCodec, some(pubsub)).valueOr:
continue continue
let namespace = computeNamespace(self.clusterId, shardId) let namespace = computeNamespace(self.clusterId, shardId)
@ -214,7 +218,7 @@ proc initialRequestAll*(
rendezvousPeerFoundTotal.inc() rendezvousPeerFoundTotal.inc()
self.peerManager.addPeer(record) self.peerManager.addPeer(record)
debug "waku rendezvous initial requests finished" debug "waku rendezvous initial request finished"
return ok() return ok()
@ -237,6 +241,22 @@ proc periodicRegistration(self: WakuRendezVous) {.async.} =
# Back to normal interval if no errors # Back to normal interval if no errors
self.registrationInterval = DefaultRegistrationInterval self.registrationInterval = DefaultRegistrationInterval
proc periodicRequests(self: WakuRendezVous) {.async.} =
debug "waku rendezvous periodic requests started", interval = self.requestInterval
# infinite loop
while true:
(await self.initialRequestAll()).isOkOr:
error "waku rendezvous requests failed", error = error
await sleepAsync(self.requestInterval)
# Exponential backoff
self.requestInterval += self.requestInterval
if self.requestInterval >= 1.days:
break
proc new*( proc new*(
T: type WakuRendezVous, T: type WakuRendezVous,
switch: Switch, switch: Switch,
@ -266,6 +286,7 @@ proc new*(
wrv.getShards = getShards wrv.getShards = getShards
wrv.getCapabilities = getCapabilities wrv.getCapabilities = getCapabilities
wrv.registrationInterval = DefaultRegistrationInterval wrv.registrationInterval = DefaultRegistrationInterval
wrv.requestInterval = DefaultRequestsInterval
debug "waku rendezvous initialized", debug "waku rendezvous initialized",
clusterId = clusterId, shards = getShards(), capabilities = getCapabilities() clusterId = clusterId, shards = getShards(), capabilities = getCapabilities()
@ -276,10 +297,15 @@ proc start*(self: WakuRendezVous) {.async: (raises: []).} =
# start registering forever # start registering forever
self.periodicRegistrationFut = self.periodicRegistration() self.periodicRegistrationFut = self.periodicRegistration()
self.periodicRequestFut = self.periodicRequests()
debug "waku rendezvous discovery started" debug "waku rendezvous discovery started"
proc stopWait*(self: WakuRendezVous) {.async: (raises: []).} = proc stopWait*(self: WakuRendezVous) {.async: (raises: []).} =
if not self.periodicRegistrationFut.isNil(): if not self.periodicRegistrationFut.isNil():
await self.periodicRegistrationFut.cancelAndWait() await self.periodicRegistrationFut.cancelAndWait()
if not self.periodicRequestFut.isNil():
await self.periodicRequestFut.cancelAndWait()
debug "waku rendezvous discovery stopped" debug "waku rendezvous discovery stopped"