From 91e5c7bc1303a9b2dacc50fd0504c8224da408eb Mon Sep 17 00:00:00 2001 From: Simon-Pierre Vivier Date: Fri, 14 Mar 2025 08:49:06 -0400 Subject: [PATCH] chore: less logs for rendezvous (#3319) Co-authored-by: fryorcraken <110212804+fryorcraken@users.noreply.github.com> --- waku/waku_rendezvous/common.nim | 1 + waku/waku_rendezvous/protocol.nim | 37 ++++++++++++++++++++++--------- 2 files changed, 27 insertions(+), 11 deletions(-) diff --git a/waku/waku_rendezvous/common.nim b/waku/waku_rendezvous/common.nim index 9722347cf..f5ce4b942 100644 --- a/waku/waku_rendezvous/common.nim +++ b/waku/waku_rendezvous/common.nim @@ -7,6 +7,7 @@ import ../common/enr, ../waku_enr/capabilities, ../waku_enr/sharding const DiscoverLimit* = 1000 const DefaultRegistrationTTL* = 60.seconds const DefaultRegistrationInterval* = 10.seconds +const MaxRegistrationInterval* = 5.minutes const PeersRequestedCount* = 12 proc computeNamespace*(clusterId: uint16, shard: uint16): string = diff --git a/waku/waku_rendezvous/protocol.nim b/waku/waku_rendezvous/protocol.nim index 221321c95..9f1aa69cb 100644 --- a/waku/waku_rendezvous/protocol.nim +++ b/waku/waku_rendezvous/protocol.nim @@ -32,6 +32,7 @@ type WakuRendezVous* = ref object relayShard: RelayShards capabilities: seq[Capabilities] + registrationInterval: timer.Duration periodicRegistrationFut: Future[void] proc batchAdvertise*( @@ -42,7 +43,7 @@ proc batchAdvertise*( ): Future[Result[void, string]] {.async: (raises: []).} = ## Register with all rendezvous peers under a namespace - # rendezvous.advertise except already opened connections + # rendezvous.advertise expects already opened connections # must dial first var futs = collect(newSeq): for peerId in peers: @@ -62,7 +63,7 @@ proc batchAdvertise*( fut.read() if catchable.isErr(): - error "rendezvous dial failed", error = catchable.error.msg + warn "a rendezvous dial failed", cause = catchable.error.msg continue let connOpt = catchable.get() @@ -91,7 +92,7 @@ proc batchRequest*( ): Future[Result[seq[PeerRecord], string]] {.async: (raises: []).} = ## Request all records from all rendezvous peers matching a namespace - # rendezvous.request except already opened connections + # rendezvous.request expects already opened connections # must dial first var futs = collect(newSeq): for peerId in peers: @@ -111,7 +112,7 @@ proc batchRequest*( fut.read() if catchable.isErr(): - error "rendezvous dial failed", error = catchable.error.msg + warn "a rendezvous dial failed", cause = catchable.error.msg continue let connOpt = catchable.get() @@ -143,7 +144,6 @@ proc advertiseAll( for pubsubTopic in pubsubTopics: # Get a random RDV peer for that shard let rpi = self.peerManager.selectPeer(RendezVousCodec, some($pubsubTopic)).valueOr: - error "could not get a peer supporting RendezVousCodec" continue let namespace = computeNamespace(pubsubTopic.clusterId, pubsubTopic.shardId) @@ -151,6 +151,9 @@ proc advertiseAll( # Advertise yourself on that peer self.batchAdvertise(namespace, DefaultRegistrationTTL, @[rpi.peerId]) + if futs.len < 1: + return err("could not get a peer supporting RendezVousCodec") + let catchable = catch: await allFinished(futs) @@ -159,7 +162,7 @@ proc advertiseAll( for fut in catchable.get(): if fut.failed(): - error "rendezvous advertisement failed", error = fut.error.msg + warn "a rendezvous advertisement failed", cause = fut.error.msg debug "waku rendezvous advertisements finished" @@ -178,12 +181,14 @@ proc initialRequestAll*( # Get a random RDV peer for that shard let rpi = self.peerManager.selectPeer(RendezVousCodec, some($pubsubTopic)).valueOr: - error "could not get a peer supporting RendezVousCodec" continue # Ask for peer records for that shard self.batchRequest(namespace, PeersRequestedCount, @[rpi.peerId]) + if futs.len < 1: + return err("could not get a peer supporting RendezVousCodec") + let catchable = catch: await allFinished(futs) @@ -192,12 +197,13 @@ proc initialRequestAll*( for fut in catchable.get(): if fut.failed(): - error "rendezvous request failed", error = fut.error.msg + warn "a rendezvous request failed", cause = fut.error.msg elif fut.finished(): let res = fut.value() let records = res.valueOr: - return err($res.error) + warn "a rendezvous request failed", cause = $res.error + continue for record in records: rendezvousPeerFoundTotal.inc() @@ -209,15 +215,23 @@ proc initialRequestAll*( proc periodicRegistration(self: WakuRendezVous) {.async.} = debug "waku rendezvous periodic registration started", - interval = DefaultRegistrationInterval + interval = self.registrationInterval # infinite loop while true: - await sleepAsync(DefaultRegistrationInterval) + await sleepAsync(self.registrationInterval) (await self.advertiseAll()).isOkOr: debug "waku rendezvous advertisements failed", error = error + if self.registrationInterval > MaxRegistrationInterval: + self.registrationInterval = MaxRegistrationInterval + else: + self.registrationInterval += self.registrationInterval + + # Back to normal interval if no errors + self.registrationInterval = DefaultRegistrationInterval + proc new*( T: type WakuRendezVous, switch: Switch, peerManager: PeerManager, enr: Record ): Result[T, string] {.raises: [].} = @@ -246,6 +260,7 @@ proc new*( wrv.peerManager = peerManager wrv.relayshard = relayshard wrv.capabilities = capabilities + wrv.registrationInterval = DefaultRegistrationInterval debug "waku rendezvous initialized", cluster = relayshard.clusterId,