From e1ec7ba57a5303b88baa2b9f3364d177bd6ba25f Mon Sep 17 00:00:00 2001
From: SionoiS
Date: Fri, 20 Mar 2026 12:30:09 -0400
Subject: [PATCH] init discovery integration

Replace the mix-specific "extended kademlia" wrapper with a generic
kademlia service-discovery integration:

- WakuKademlia.new takes the bootstrap nodes and the list of services to
  advertise directly, returns the object instead of a Result, and no
  longer mounts the protocol; setupProtocols mounts it on the switch.
- runDiscoveryLoop only pulls random records and feeds them to the peer
  manager; the targeted mix lookup is replaced by a generic lookup(codec).
- vendor/nim-libp2p is moved to the feat--logos-capability-discovery
  branch.

Review fixes folded in:

- Advertise mixConf.mixPubKey, as the previous code did, instead of
  mixConf.mixKey, which is the key handed to mountMix (presumably the
  private key).
- Re-raise CancelledError in runDiscoveryLoop; `catch` would otherwise
  trap it and the loop would keep running after stop() cancels it.
- Check self.protocol for nil before reading `started` in stop().
---
 .gitmodules                      |   2 +-
 vendor/nim-libp2p                |   2 +-
 waku/discovery/waku_kademlia.nim | 315 ++++++++++---------------------
 waku/factory/node_factory.nim    |  69 ++++---
 4 files changed, 135 insertions(+), 253 deletions(-)

diff --git a/.gitmodules b/.gitmodules
index 6a63491e3..427dec9c4 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -12,7 +12,7 @@
 	path = vendor/nim-libp2p
 	url = https://github.com/vacp2p/nim-libp2p.git
 	ignore = dirty
-	branch = master
+	branch = feat--logos-capability-discovery
 [submodule "vendor/nim-stew"]
 	path = vendor/nim-stew
 	url = https://github.com/status-im/nim-stew.git
diff --git a/vendor/nim-libp2p b/vendor/nim-libp2p
index ff8d51857..687bbb64d 160000
--- a/vendor/nim-libp2p
+++ b/vendor/nim-libp2p
@@ -1 +1 @@
-Subproject commit ff8d51857b4b79a68468e7bcc27b2026cca02996
+Subproject commit 687bbb64d48b976fd63dc4f3df4cd7f562cff586
diff --git a/waku/discovery/waku_kademlia.nim b/waku/discovery/waku_kademlia.nim
index 94b63a321..16feb2964 100644
--- a/waku/discovery/waku_kademlia.nim
+++ b/waku/discovery/waku_kademlia.nim
@@ -1,6 +1,6 @@
 {.push raises: [].}
 
-import std/[options, sequtils]
+import std/[options, sequtils, sugar]
 import
   chronos,
   chronicles,
@@ -8,111 +8,23 @@ import
   stew/byteutils,
   libp2p/[peerid, multiaddress, switch],
   libp2p/extended_peer_record,
-  libp2p/crypto/curve25519,
   libp2p/protocols/[kademlia, kad_disco],
   libp2p/protocols/kademlia_discovery/types as kad_types,
-  libp2p/protocols/mix/mix_protocol
+  libp2p/protocols/service_discovery/types
 
 import waku/waku_core, waku/node/peer_manager
 
 logScope:
-  topics = "waku extended kademlia discovery"
+  topics = "waku kademlia discovery"
 
-const
-  DefaultExtendedKademliaDiscoveryInterval* = chronos.seconds(5)
-  ExtendedKademliaDiscoveryStartupDelay* = chronos.seconds(5)
+const DefaultKademliaDiscoveryInterval* = chronos.seconds(10)
 
-type
-  MixNodePoolSizeProvider* = proc(): int {.gcsafe, raises: [].}
-  NodeStartedProvider* = proc(): bool {.gcsafe, raises: [].}
+type WakuKademlia* = ref object
+  protocol*: KademliaDiscovery
+  peerManager: PeerManager
+  intervalFut: Future[void]
 
-  ExtendedKademliaDiscoveryParams* = object
-    bootstrapNodes*: seq[(PeerId, seq[MultiAddress])]
-    mixPubKey*: Option[Curve25519Key]
-    advertiseMix*: bool = false
-
-  WakuKademlia* = ref object
-    protocol*: KademliaDiscovery
-    peerManager: PeerManager
-    discoveryLoop: Future[void]
-    running*: bool
-    getMixNodePoolSize: MixNodePoolSizeProvider
-    isNodeStarted: NodeStartedProvider
-
-proc new*(
-    T: type WakuKademlia,
-    switch: Switch,
-    params: ExtendedKademliaDiscoveryParams,
-    peerManager: PeerManager,
-    getMixNodePoolSize: MixNodePoolSizeProvider = nil,
-    isNodeStarted: NodeStartedProvider = nil,
-): Result[T, string] =
-  if params.bootstrapNodes.len == 0:
-    info "creating kademlia discovery as seed node (no bootstrap nodes)"
-
-  let kademlia = KademliaDiscovery.new(
-    switch,
-    bootstrapNodes = params.bootstrapNodes,
-    config = KadDHTConfig.new(
-      validator = kad_types.ExtEntryValidator(), selector = kad_types.ExtEntrySelector()
-    ),
-    codec = ExtendedKademliaDiscoveryCodec,
-  )
-
-  try:
-    switch.mount(kademlia)
-  except CatchableError:
-    return err("failed to mount kademlia discovery: " & getCurrentExceptionMsg())
-
-  # Register services BEFORE starting kademlia so they are included in the
-  # initial self-signed peer record published to the DHT
-  if params.advertiseMix:
-    if params.mixPubKey.isSome():
-      let alreadyAdvertising = kademlia.startAdvertising(
-        ServiceInfo(id: MixProtocolID, data: @(params.mixPubKey.get()))
-      )
-      if alreadyAdvertising:
-        warn "mix service was already being advertised"
-      debug "extended kademlia advertising mix service",
-        keyHex = byteutils.toHex(params.mixPubKey.get()),
-        bootstrapNodes = params.bootstrapNodes.len
-    else:
-      warn "mix advertising enabled but no key provided"
-
-  info "kademlia discovery created",
-    bootstrapNodes = params.bootstrapNodes.len, advertiseMix = params.advertiseMix
-
-  return ok(
-    WakuKademlia(
-      protocol: kademlia,
-      peerManager: peerManager,
-      running: false,
-      getMixNodePoolSize: getMixNodePoolSize,
-      isNodeStarted: isNodeStarted,
-    )
-  )
-
-proc extractMixPubKey(service: ServiceInfo): Option[Curve25519Key] =
-  if service.id != MixProtocolID:
-    trace "service is not mix protocol",
-      serviceId = service.id, mixProtocolId = MixProtocolID
-    return none(Curve25519Key)
-
-  if service.data.len != Curve25519KeySize:
-    warn "invalid mix pub key length from kademlia record",
-      expected = Curve25519KeySize,
-      actual = service.data.len,
-      dataHex = byteutils.toHex(service.data)
-    return none(Curve25519Key)
-
-  debug "found mix protocol service",
-    dataLen = service.data.len, expectedLen = Curve25519KeySize
-
-  let key = intoCurve25519Key(service.data)
-  debug "successfully extracted mix pub key", keyHex = byteutils.toHex(key)
-  return some(key)
-
-proc remotePeerInfoFrom(record: ExtendedPeerRecord): Option[RemotePeerInfo] =
+proc toRemotePeerInfo(record: ExtendedPeerRecord): Option[RemotePeerInfo] =
   debug "processing kademlia record",
     peerId = record.peerId,
     numAddresses = record.addresses.len,
@@ -130,151 +42,122 @@ proc remotePeerInfoFrom(record: ExtendedPeerRecord): Option[RemotePeerInfo] =
 
   let protocols = record.services.mapIt(it.id)
 
-  var mixPubKey = none(Curve25519Key)
-  for service in record.services:
-    debug "checking service",
-      peerId = record.peerId, serviceId = service.id, dataLen = service.data.len
-    mixPubKey = extractMixPubKey(service)
-    if mixPubKey.isSome():
-      debug "extracted mix public key from service", peerId = record.peerId
-      break
-
-  if record.services.len > 0 and mixPubKey.isNone():
-    debug "record has services but no valid mix key",
-      peerId = record.peerId, services = record.services.mapIt(it.id)
-    return none(RemotePeerInfo)
   return some(
     RemotePeerInfo.init(
-      record.peerId,
-      addrs = addrs,
-      protocols = protocols,
-      origin = PeerOrigin.Kademlia,
-      mixPubKey = mixPubKey,
+      record.peerId, addrs = addrs, protocols = protocols, origin = PeerOrigin.Kademlia
     )
   )
 
-proc lookupMixPeers*(
-    wk: WakuKademlia
-): Future[Result[int, string]] {.async: (raises: []).} =
-  ## Lookup mix peers via kademlia and add them to the peer store.
-  ## Returns the number of mix peers found and added.
-  if wk.protocol.isNil():
-    return err("cannot lookup mix peers: kademlia not mounted")
-
-  let mixService = ServiceInfo(id: MixProtocolID, data: @[])
-  var records: seq[ExtendedPeerRecord]
-  try:
-    records = await wk.protocol.lookup(mixService)
-  except CatchableError:
-    return err("mix peer lookup failed: " & getCurrentExceptionMsg())
-
-  debug "mix peer lookup returned records", numRecords = records.len
-
-  var added = 0
-  for record in records:
-    let peerOpt = remotePeerInfoFrom(record)
-    if peerOpt.isNone():
-      continue
-
-    let peerInfo = peerOpt.get()
-    if peerInfo.mixPubKey.isNone():
-      continue
-
-    wk.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia)
-    info "mix peer added via kademlia lookup",
-      peerId = $peerInfo.peerId, mixPubKey = byteutils.toHex(peerInfo.mixPubKey.get())
-    added.inc()
-
-  info "mix peer lookup complete", found = added
-  return ok(added)
-
 proc runDiscoveryLoop(
-    wk: WakuKademlia, interval: Duration, minMixPeers: int
-) {.async: (raises: []).} =
-  info "extended kademlia discovery loop started", interval = interval
+    wk: WakuKademlia, interval: Duration
+) {.async: (raises: [CancelledError]).} =
+  info "kademlia discovery loop started", interval = interval
 
-  try:
-    while true:
-      # Wait for node to be started
-      if not wk.isNodeStarted.isNil() and not wk.isNodeStarted():
-        await sleepAsync(ExtendedKademliaDiscoveryStartupDelay)
+  while true:
+    await sleepAsync(interval)
+
+    let res = catch:
+      await wk.protocol.randomRecords()
+    let records = res.valueOr:
+      # `catch` also traps CancelledError: re-raise so cancellation ends the loop
+      if res.error of CancelledError:
+        raise (ref CancelledError)(res.error)
+      error "kademlia discovery lookup failed", error = res.error.msg
+      continue
+
+    for record in records:
+      let peerInfo = toRemotePeerInfo(record).valueOr:
         continue
 
-      var records: seq[ExtendedPeerRecord]
-      try:
-        records = await wk.protocol.randomRecords()
-      except CatchableError as e:
-        warn "extended kademlia discovery failed", error = e.msg
-        await sleepAsync(interval)
-        continue
+      wk.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia)
 
-      debug "received random records from kademlia", numRecords = records.len
+      debug "peer added via kademlia discovery",
+        peerId = $peerInfo.peerId,
+        addresses = peerInfo.addrs.mapIt($it),
+        protocols = peerInfo.protocols
 
-      var added = 0
-      for record in records:
-        let peerOpt = remotePeerInfoFrom(record)
-        if peerOpt.isNone():
-          continue
+      #TODO peer added metric
 
-        let peerInfo = peerOpt.get()
-        wk.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia)
-        debug "peer added via extended kademlia discovery",
-          peerId = $peerInfo.peerId,
-          addresses = peerInfo.addrs.mapIt($it),
-          protocols = peerInfo.protocols,
-          hasMixPubKey = peerInfo.mixPubKey.isSome()
-        added.inc()
+proc lookup*(
+    self: WakuKademlia, codec: string
+): Future[seq[RemotePeerInfo]] {.async: (raises: []).} =
+  let serviceId = hashServiceId(codec)
 
-      if added > 0:
-        info "added peers from extended kademlia discovery", count = added
+  let catchRes = catch:
+    await self.protocol.lookup(serviceId)
+  let lookupRes = catchRes.valueOr:
+    error "kademlia discovery lookup failed", error = catchRes.error.msg
+    return
 
-      # Targeted mix peer lookup when pool is low
-      if minMixPeers > 0 and not wk.getMixNodePoolSize.isNil() and
-          wk.getMixNodePoolSize() < minMixPeers:
-        debug "mix node pool below threshold, performing targeted lookup",
-          currentPoolSize = wk.getMixNodePoolSize(), threshold = minMixPeers
-        let found = (await wk.lookupMixPeers()).valueOr:
-          warn "targeted mix peer lookup failed", error = error
-          0
-        if found > 0:
-          info "found mix peers via targeted kademlia lookup", count = found
+  let ads = lookupRes.valueOr:
+    error "kademlia discovery lookup failed", error
+    return
 
-      await sleepAsync(interval)
-  except CancelledError as e:
-    debug "extended kademlia discovery loop cancelled", error = e.msg
-  except CatchableError as e:
-    error "extended kademlia discovery loop failed", error = e.msg
+  var peerInfos = newSeqOfCap[RemotePeerInfo](ads.len)
+  for ad in ads:
+    let peerInfo = toRemotePeerInfo(ad.data).valueOr:
+      continue
+
+    self.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia)
+
+    debug "peer added via kademlia discovery",
+      peerId = $peerInfo.peerId,
+      addresses = peerInfo.addrs.mapIt($it),
+      protocols = peerInfo.protocols
+
+    #TODO peer added metric
+
+    peerInfos.add(peerInfo)
+
+  return peerInfos
+
+proc new*(
+    T: type WakuKademlia,
+    switch: Switch,
+    peerManager: PeerManager,
+    bootstrapNodes: seq[(PeerId, seq[MultiAddress])],
+    providedServices: var seq[ServiceInfo],
+): T =
+  if bootstrapNodes.len == 0:
+    info "creating kademlia discovery as seed node (no bootstrap nodes)"
+
+  let kademlia = KademliaDiscovery.new(
+    switch,
+    bootstrapNodes = bootstrapNodes,
+    config = KadDHTConfig.new(
+      validator = kad_types.ExtEntryValidator(), selector = kad_types.ExtEntrySelector()
+    ),
+    services = providedServices,
+  )
+
+  info "kademlia service discovery created", bootstrapNodes = bootstrapNodes.len
+
+  return WakuKademlia(protocol: kademlia, peerManager: peerManager)
 
 proc start*(
-    wk: WakuKademlia,
-    interval: Duration = DefaultExtendedKademliaDiscoveryInterval,
-    minMixPeers: int = 0,
-): Future[Result[void, string]] {.async: (raises: []).} =
-  if wk.running:
-    return err("already running")
+    self: WakuKademlia, interval: Duration = DefaultKademliaDiscoveryInterval
+) {.async: (raises: [CancelledError]).} =
+  if self.protocol.started:
+    warn "Starting kad-disco twice"
+    return
 
-  try:
-    await wk.protocol.start()
-  except CatchableError as e:
-    return err("failed to start kademlia discovery: " & e.msg)
+  await self.protocol.start()
 
-  wk.discoveryLoop = wk.runDiscoveryLoop(interval, minMixPeers)
+  self.intervalFut = self.runDiscoveryLoop(interval)
 
   info "kademlia discovery started"
-  return ok()
 
-proc stop*(wk: WakuKademlia) {.async: (raises: []).} =
-  if not wk.running:
+proc stop*(self: WakuKademlia) {.async: (raises: []).} =
+  if self.protocol.isNil() or not self.protocol.started:
     return
 
   info "Stopping kademlia discovery"
 
-  wk.running = false
+  if not self.intervalFut.isNil():
+    self.intervalFut.cancelSoon()
+    self.intervalFut = nil
 
-  if not wk.discoveryLoop.isNil():
-    await wk.discoveryLoop.cancelAndWait()
-    wk.discoveryLoop = nil
+  if not self.protocol.isNil():
+    await self.protocol.stop()
 
-  if not wk.protocol.isNil():
-    await wk.protocol.stop()
   info "Successfully stopped kademlia discovery"
diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim
index 52b719b8f..0dc4c8f89 100644
--- a/waku/factory/node_factory.nim
+++ b/waku/factory/node_factory.nim
@@ -1,5 +1,5 @@
 import
-  std/[options, sequtils],
+  std/[options, sequtils, sugar],
   chronicles,
   chronos,
   libp2p/peerid,
@@ -7,7 +7,9 @@ import
   libp2p/protocols/connectivity/relay/relay,
   libp2p/nameresolving/dnsresolver,
   libp2p/crypto/crypto,
-  libp2p/crypto/curve25519
+  libp2p/crypto/curve25519,
+  libp2p/extended_peer_record,
+  libp2p/protocols/mix/mix_protocol
 
 import
   ./internal_config,
@@ -121,10 +123,11 @@ proc initNode(
   builder.withRateLimit(conf.rateLimit)
   builder.withCircuitRelay(relay)
 
-  let node = ?builder.build().mapErr(
-    proc(err: string): string =
-      "failed to create waku node instance: " & err
-  )
+  let node =
+    ?builder.build().mapErr(
+      proc(err: string): string =
+        "failed to create waku node instance: " & err
+    )
 
   ok(node)
 
@@ -159,38 +162,18 @@ proc setupProtocols(
     error "Unrecoverable error occurred", error = msg
     quit(QuitFailure)
 
+  var providedServices: seq[ServiceInfo]
+
   #mount mix
   if conf.mixConf.isSome():
     let mixConf = conf.mixConf.get()
+
+    let mixService = ServiceInfo(id: MixProtocolID, data: @(mixConf.mixPubKey))
+    providedServices.add(mixService)
+
     (await node.mountMix(conf.clusterId, mixConf.mixKey, mixConf.mixnodes)).isOkOr:
       return err("failed to mount waku mix protocol: " & $error)
 
-  # Setup extended kademlia discovery
-  if conf.kademliaDiscoveryConf.isSome():
-    let mixPubKey =
-      if conf.mixConf.isSome():
-        some(conf.mixConf.get().mixPubKey)
-      else:
-        none(Curve25519Key)
-
-    node.wakuKademlia = WakuKademlia.new(
-      node.switch,
-      ExtendedKademliaDiscoveryParams(
-        bootstrapNodes: conf.kademliaDiscoveryConf.get().bootstrapNodes,
-        mixPubKey: mixPubKey,
-        advertiseMix: conf.mixConf.isSome(),
-      ),
-      node.peerManager,
-      getMixNodePoolSize = proc(): int {.gcsafe, raises: [].} =
-        if node.wakuMix.isNil():
-          0
-        else:
-          node.getMixNodePoolSize(),
-      isNodeStarted = proc(): bool {.gcsafe, raises: [].} =
-        node.started,
-    ).valueOr:
-      return err("failed to setup kademlia discovery: " & error)
-
   if conf.storeServiceConf.isSome():
     let storeServiceConf = conf.storeServiceConf.get()
 
@@ -399,6 +382,21 @@ proc setupProtocols(
   if conf.peerExchangeDiscovery:
     await node.mountPeerExchangeClient()
 
+  if conf.kademliaDiscoveryConf.isSome():
+    let kademlia = WakuKademlia.new(
+      node.switch,
+      node.peerManager,
+      conf.kademliaDiscoveryConf.get().bootstrapNodes,
+      providedServices,
+    )
+
+    let catchRes = catch:
+      node.switch.mount(kademlia.protocol)
+    if catchRes.isErr():
+      return err("failed to mount kademlia discovery: " & catchRes.error.msg)
+
+    node.wakuKademlia = kademlia
+
   return ok()
 
 ## Start node
@@ -451,9 +449,10 @@ proc startNode*(
   node.peerManager.start()
 
   if not node.wakuKademlia.isNil():
-    let minMixPeers = if conf.mixConf.isSome(): 4 else: 0
-    (await node.wakuKademlia.start(minMixPeers = minMixPeers)).isOkOr:
-      return err("failed to start kademlia discovery: " & error)
+    let catchRes = catch:
+      await node.wakuKademlia.start()
+    if catchRes.isErr():
+      return err("failed to start kademlia discovery: " & catchRes.error.msg)
 
   return ok()
 