chore: less logs for rendezvous (#3319)

Co-authored-by: fryorcraken <110212804+fryorcraken@users.noreply.github.com>
This commit is contained in:
Simon-Pierre Vivier 2025-03-14 08:49:06 -04:00 committed by GitHub
parent 8f775cc638
commit 91e5c7bc13
2 changed files with 27 additions and 11 deletions

View File

@ -7,6 +7,7 @@ import ../common/enr, ../waku_enr/capabilities, ../waku_enr/sharding
const DiscoverLimit* = 1000 const DiscoverLimit* = 1000
const DefaultRegistrationTTL* = 60.seconds const DefaultRegistrationTTL* = 60.seconds
const DefaultRegistrationInterval* = 10.seconds const DefaultRegistrationInterval* = 10.seconds
const MaxRegistrationInterval* = 5.minutes
const PeersRequestedCount* = 12 const PeersRequestedCount* = 12
proc computeNamespace*(clusterId: uint16, shard: uint16): string = proc computeNamespace*(clusterId: uint16, shard: uint16): string =

View File

@ -32,6 +32,7 @@ type WakuRendezVous* = ref object
relayShard: RelayShards relayShard: RelayShards
capabilities: seq[Capabilities] capabilities: seq[Capabilities]
registrationInterval: timer.Duration
periodicRegistrationFut: Future[void] periodicRegistrationFut: Future[void]
proc batchAdvertise*( proc batchAdvertise*(
@ -42,7 +43,7 @@ proc batchAdvertise*(
): Future[Result[void, string]] {.async: (raises: []).} = ): Future[Result[void, string]] {.async: (raises: []).} =
## Register with all rendezvous peers under a namespace ## Register with all rendezvous peers under a namespace
# rendezvous.advertise except already opened connections # rendezvous.advertise expects already opened connections
# must dial first # must dial first
var futs = collect(newSeq): var futs = collect(newSeq):
for peerId in peers: for peerId in peers:
@ -62,7 +63,7 @@ proc batchAdvertise*(
fut.read() fut.read()
if catchable.isErr(): if catchable.isErr():
error "rendezvous dial failed", error = catchable.error.msg warn "a rendezvous dial failed", cause = catchable.error.msg
continue continue
let connOpt = catchable.get() let connOpt = catchable.get()
@ -91,7 +92,7 @@ proc batchRequest*(
): Future[Result[seq[PeerRecord], string]] {.async: (raises: []).} = ): Future[Result[seq[PeerRecord], string]] {.async: (raises: []).} =
## Request all records from all rendezvous peers matching a namespace ## Request all records from all rendezvous peers matching a namespace
# rendezvous.request except already opened connections # rendezvous.request expects already opened connections
# must dial first # must dial first
var futs = collect(newSeq): var futs = collect(newSeq):
for peerId in peers: for peerId in peers:
@ -111,7 +112,7 @@ proc batchRequest*(
fut.read() fut.read()
if catchable.isErr(): if catchable.isErr():
error "rendezvous dial failed", error = catchable.error.msg warn "a rendezvous dial failed", cause = catchable.error.msg
continue continue
let connOpt = catchable.get() let connOpt = catchable.get()
@ -143,7 +144,6 @@ proc advertiseAll(
for pubsubTopic in pubsubTopics: for pubsubTopic in pubsubTopics:
# Get a random RDV peer for that shard # Get a random RDV peer for that shard
let rpi = self.peerManager.selectPeer(RendezVousCodec, some($pubsubTopic)).valueOr: let rpi = self.peerManager.selectPeer(RendezVousCodec, some($pubsubTopic)).valueOr:
error "could not get a peer supporting RendezVousCodec"
continue continue
let namespace = computeNamespace(pubsubTopic.clusterId, pubsubTopic.shardId) let namespace = computeNamespace(pubsubTopic.clusterId, pubsubTopic.shardId)
@ -151,6 +151,9 @@ proc advertiseAll(
# Advertise yourself on that peer # Advertise yourself on that peer
self.batchAdvertise(namespace, DefaultRegistrationTTL, @[rpi.peerId]) self.batchAdvertise(namespace, DefaultRegistrationTTL, @[rpi.peerId])
if futs.len < 1:
return err("could not get a peer supporting RendezVousCodec")
let catchable = catch: let catchable = catch:
await allFinished(futs) await allFinished(futs)
@ -159,7 +162,7 @@ proc advertiseAll(
for fut in catchable.get(): for fut in catchable.get():
if fut.failed(): if fut.failed():
error "rendezvous advertisement failed", error = fut.error.msg warn "a rendezvous advertisement failed", cause = fut.error.msg
debug "waku rendezvous advertisements finished" debug "waku rendezvous advertisements finished"
@ -178,12 +181,14 @@ proc initialRequestAll*(
# Get a random RDV peer for that shard # Get a random RDV peer for that shard
let rpi = self.peerManager.selectPeer(RendezVousCodec, some($pubsubTopic)).valueOr: let rpi = self.peerManager.selectPeer(RendezVousCodec, some($pubsubTopic)).valueOr:
error "could not get a peer supporting RendezVousCodec"
continue continue
# Ask for peer records for that shard # Ask for peer records for that shard
self.batchRequest(namespace, PeersRequestedCount, @[rpi.peerId]) self.batchRequest(namespace, PeersRequestedCount, @[rpi.peerId])
if futs.len < 1:
return err("could not get a peer supporting RendezVousCodec")
let catchable = catch: let catchable = catch:
await allFinished(futs) await allFinished(futs)
@ -192,12 +197,13 @@ proc initialRequestAll*(
for fut in catchable.get(): for fut in catchable.get():
if fut.failed(): if fut.failed():
error "rendezvous request failed", error = fut.error.msg warn "a rendezvous request failed", cause = fut.error.msg
elif fut.finished(): elif fut.finished():
let res = fut.value() let res = fut.value()
let records = res.valueOr: let records = res.valueOr:
return err($res.error) warn "a rendezvous request failed", cause = $res.error
continue
for record in records: for record in records:
rendezvousPeerFoundTotal.inc() rendezvousPeerFoundTotal.inc()
@ -209,15 +215,23 @@ proc initialRequestAll*(
proc periodicRegistration(self: WakuRendezVous) {.async.} = proc periodicRegistration(self: WakuRendezVous) {.async.} =
debug "waku rendezvous periodic registration started", debug "waku rendezvous periodic registration started",
interval = DefaultRegistrationInterval interval = self.registrationInterval
# infinite loop # infinite loop
while true: while true:
await sleepAsync(DefaultRegistrationInterval) await sleepAsync(self.registrationInterval)
(await self.advertiseAll()).isOkOr: (await self.advertiseAll()).isOkOr:
debug "waku rendezvous advertisements failed", error = error debug "waku rendezvous advertisements failed", error = error
if self.registrationInterval > MaxRegistrationInterval:
self.registrationInterval = MaxRegistrationInterval
else:
self.registrationInterval += self.registrationInterval
# Back to normal interval if no errors
self.registrationInterval = DefaultRegistrationInterval
proc new*( proc new*(
T: type WakuRendezVous, switch: Switch, peerManager: PeerManager, enr: Record T: type WakuRendezVous, switch: Switch, peerManager: PeerManager, enr: Record
): Result[T, string] {.raises: [].} = ): Result[T, string] {.raises: [].} =
@ -246,6 +260,7 @@ proc new*(
wrv.peerManager = peerManager wrv.peerManager = peerManager
wrv.relayshard = relayshard wrv.relayshard = relayshard
wrv.capabilities = capabilities wrv.capabilities = capabilities
wrv.registrationInterval = DefaultRegistrationInterval
debug "waku rendezvous initialized", debug "waku rendezvous initialized",
cluster = relayshard.clusterId, cluster = relayshard.clusterId,