add service lookup & refactor

This commit is contained in:
SionoiS 2026-04-17 08:53:26 -04:00
parent 19ea5fad91
commit ab995c0a8e
No known key found for this signature in database
GPG Key ID: C9458A8CB1852951
3 changed files with 60 additions and 46 deletions

View File

@ -25,7 +25,8 @@ type WakuKademlia* = ref object
protocol*: KademliaDiscovery
peerManager: PeerManager
loopInterval: Duration
walkIntervalFut: Future[void]
periodicWalkFut: Future[void]
periodicLookupFut: Future[void]
proc toRemotePeerInfo(record: ExtendedPeerRecord): Option[RemotePeerInfo] =
debug "processing kademlia record",
@ -66,30 +67,30 @@ proc toRemotePeerInfo(record: ExtendedPeerRecord): Option[RemotePeerInfo] =
)
)
proc runDiscoveryLoop(
self: WakuKademlia, interval: Duration
) {.async: (raises: [CancelledError]).} =
debug "kademlia discovery loop started", interval = interval
proc randomWalk*(
self: WakuKademlia
): Future[seq[RemotePeerInfo]] {.async: (raises: []).} =
let res = catch:
await self.protocol.randomRecords()
let records = res.valueOr:
error "kademlia discovery lookup failed", error = res.error.msg
return
while true:
await sleepAsync(interval)
let res = catch:
await self.protocol.randomRecords()
let records = res.valueOr:
error "kademlia discovery lookup failed", error = res.error.msg
var peerInfos = newSeqOfCap[RemotePeerInfo](records.len)
for record in records:
let peerInfo = toRemotePeerInfo(record).valueOr:
continue
for record in records:
let peerInfo = toRemotePeerInfo(record).valueOr:
continue
self.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia)
self.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia)
debug "peer added via random walk",
peerId = $peerInfo.peerId,
addresses = peerInfo.addrs.mapIt($it),
protocols = peerInfo.protocols
debug "peer added via random walk",
peerId = $peerInfo.peerId,
addresses = peerInfo.addrs.mapIt($it),
protocols = peerInfo.protocols
peerInfos.add(peerInfo)
return peerInfos
proc lookup*(
self: WakuKademlia, codec: string
@ -123,6 +124,28 @@ proc lookup*(
return peerInfos
proc periodicRandomWalk(
self: WakuKademlia, interval: Duration
) {.async: (raises: [CancelledError]).} =
debug "periodic random walk started", interval = interval
while true:
await sleepAsync(interval)
discard await self.randomWalk()
proc periodicLookup(
self: WakuKademlia, interval: Duration
) {.async: (raises: [CancelledError]).} =
debug "periodic service lookup started", interval = interval
while true:
await sleepAsync(interval)
# For testing lets use only one hard-coded service
# Same as the advertised one
discard await self.lookup("delivery")
proc new*(
T: type WakuKademlia,
switch: Switch,
@ -149,30 +172,18 @@ proc new*(
protocol: kademlia, peerManager: peerManager, loopInterval: loopInterval
)
proc start*(self: WakuKademlia) {.async.} =
if self.protocol.started:
warn "Starting waku kad twice"
return
proc start*(self: WakuKademlia) =
if self.periodicWalkFut.isNil():
self.periodicWalkFut = self.periodicRandomWalk(self.loopInterval)
info "Starting Waku Kademlia"
if self.periodicLookupFut.isNil():
self.periodicLookupFut = self.periodicLookup(self.loopInterval)
await self.protocol.start()
proc stop*(self: WakuKademlia) =
if not self.periodicWalkFut.isNil():
self.periodicWalkFut.cancelSoon()
self.periodicWalkFut = nil
self.walkIntervalFut = self.runDiscoveryLoop(self.loopInterval)
info "Waku Kademlia Started"
proc stop*(self: WakuKademlia) {.async.} =
if not self.protocol.started:
return
info "Stopping Waku Kademlia"
if not self.walkIntervalFut.isNil():
self.walkIntervalFut.cancelSoon()
self.walkIntervalFut = nil
if not self.protocol.isNil():
await self.protocol.stop()
info "Successfully stopped Waku Kademlia"
if not self.periodicLookupFut.isNil():
self.periodicLookupFut.cancelSoon()
self.periodicLookupFut = nil

View File

@ -165,8 +165,7 @@ proc setupProtocols(
var providedServices: seq[ServiceInfo]
# For testing lets use only one service
# For testing lets use only one hard-coded service
let deliveryService = ServiceInfo(id: "delivery", data: @[])
providedServices.add(deliveryService)

View File

@ -573,6 +573,8 @@ proc start*(node: WakuNode) {.async.} =
if not node.wakuRendezvousClient.isNil():
await node.wakuRendezvousClient.start()
node.wakuKademlia.start()
## The switch uses this mapper to update peer info addrs
## with announced addrs after start
let addressMapper = proc(
@ -616,6 +618,8 @@ proc stop*(node: WakuNode) {.async.} =
node.peerManager.stop()
node.wakuKademlia.stop()
if not node.wakuRlnRelay.isNil():
try:
await node.wakuRlnRelay.stop() ## this can raise an exception