From ab995c0a8e5a9b1cc781d15e9ae09a2aa57200e5 Mon Sep 17 00:00:00 2001 From: SionoiS Date: Fri, 17 Apr 2026 08:53:26 -0400 Subject: [PATCH] add service lookup & refactor --- waku/discovery/waku_kademlia.nim | 99 ++++++++++++++++++-------------- waku/factory/node_factory.nim | 3 +- waku/node/waku_node.nim | 4 ++ 3 files changed, 60 insertions(+), 46 deletions(-) diff --git a/waku/discovery/waku_kademlia.nim b/waku/discovery/waku_kademlia.nim index edebcddff..3ea24a31f 100644 --- a/waku/discovery/waku_kademlia.nim +++ b/waku/discovery/waku_kademlia.nim @@ -25,7 +25,8 @@ type WakuKademlia* = ref object protocol*: KademliaDiscovery peerManager: PeerManager loopInterval: Duration - walkIntervalFut: Future[void] + periodicWalkFut: Future[void] + periodicLookupFut: Future[void] proc toRemotePeerInfo(record: ExtendedPeerRecord): Option[RemotePeerInfo] = debug "processing kademlia record", @@ -66,30 +67,30 @@ proc toRemotePeerInfo(record: ExtendedPeerRecord): Option[RemotePeerInfo] = ) ) -proc runDiscoveryLoop( - self: WakuKademlia, interval: Duration -) {.async: (raises: [CancelledError]).} = - debug "kademlia discovery loop started", interval = interval +proc randomWalk*( + self: WakuKademlia +): Future[seq[RemotePeerInfo]] {.async: (raises: []).} = + let res = catch: + await self.protocol.randomRecords() + let records = res.valueOr: + error "kademlia discovery lookup failed", error = res.error.msg + return - while true: - await sleepAsync(interval) - - let res = catch: - await self.protocol.randomRecords() - let records = res.valueOr: - error "kademlia discovery lookup failed", error = res.error.msg + var peerInfos = newSeqOfCap[RemotePeerInfo](records.len) + for record in records: + let peerInfo = toRemotePeerInfo(record).valueOr: continue - for record in records: - let peerInfo = toRemotePeerInfo(record).valueOr: - continue + self.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia) - self.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia) + debug "peer added via random walk", + peerId = $peerInfo.peerId, + addresses = peerInfo.addrs.mapIt($it), + protocols = peerInfo.protocols - debug "peer added via random walk", - peerId = $peerInfo.peerId, - addresses = peerInfo.addrs.mapIt($it), - protocols = peerInfo.protocols + peerInfos.add(peerInfo) + + return peerInfos proc lookup*( self: WakuKademlia, codec: string @@ -123,6 +124,28 @@ proc lookup*( return peerInfos +proc periodicRandomWalk( + self: WakuKademlia, interval: Duration +) {.async: (raises: [CancelledError]).} = + debug "periodic random walk started", interval = interval + + while true: + await sleepAsync(interval) + + discard await self.randomWalk() + +proc periodicLookup( + self: WakuKademlia, interval: Duration +) {.async: (raises: [CancelledError]).} = + debug "periodic service lookup started", interval = interval + + while true: + await sleepAsync(interval) + + # For testing lets use only one hard-coded service + # Same as the advertised one + discard await self.lookup("delivery") + proc new*( T: type WakuKademlia, switch: Switch, @@ -149,30 +172,18 @@ proc new*( protocol: kademlia, peerManager: peerManager, loopInterval: loopInterval ) -proc start*(self: WakuKademlia) {.async.} = - if self.protocol.started: - warn "Starting waku kad twice" - return +proc start*(self: WakuKademlia) = + if self.periodicWalkFut.isNil(): + self.periodicWalkFut = self.periodicRandomWalk(self.loopInterval) - info "Starting Waku Kademlia" + if self.periodicLookupFut.isNil(): + self.periodicLookupFut = self.periodicLookup(self.loopInterval) - await self.protocol.start() +proc stop*(self: WakuKademlia) = + if not self.periodicWalkFut.isNil(): + self.periodicWalkFut.cancelSoon() + self.periodicWalkFut = nil - self.walkIntervalFut = self.runDiscoveryLoop(self.loopInterval) - - info "Waku Kademlia Started" - -proc stop*(self: WakuKademlia) {.async.} = - if not self.protocol.started: - return - - info "Stopping Waku Kademlia" - - if not self.walkIntervalFut.isNil(): - self.walkIntervalFut.cancelSoon() - self.walkIntervalFut = nil - - if not self.protocol.isNil(): - await self.protocol.stop() - - info "Successfully stopped Waku Kademlia" + if not self.periodicLookupFut.isNil(): + self.periodicLookupFut.cancelSoon() + self.periodicLookupFut = nil diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index 58eee4f29..dce6322f1 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -165,8 +165,7 @@ proc setupProtocols( var providedServices: seq[ServiceInfo] - # For testing lets use only one service - + # For testing lets use only one hard-coded service let deliveryService = ServiceInfo(id: "delivery", data: @[]) providedServices.add(deliveryService) diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 03b2f05fb..a1d243eec 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -573,6 +573,8 @@ proc start*(node: WakuNode) {.async.} = if not node.wakuRendezvousClient.isNil(): await node.wakuRendezvousClient.start() + node.wakuKademlia.start() + ## The switch uses this mapper to update peer info addrs ## with announced addrs after start let addressMapper = proc( @@ -616,6 +618,8 @@ proc stop*(node: WakuNode) {.async.} = node.peerManager.stop() + node.wakuKademlia.stop() + if not node.wakuRlnRelay.isNil(): try: await node.wakuRlnRelay.stop() ## this can raise an exception