From b943ece443a5565359d78e10fa614b34cc30a0de Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Wed, 11 Feb 2026 19:18:56 +0530 Subject: [PATCH] address review comments --- apps/chat2mix/chat2mix.nim | 30 ++-- .../kernel_api/protocols/lightpush_api.nim | 4 +- ...aku_ext_kademlia.nim => waku_kademlia.nim} | 158 +++++++++++------- waku/factory/node_factory.nim | 31 ++-- waku/node/waku_node.nim | 12 +- waku/waku_mix/protocol.nim | 3 + 6 files changed, 142 insertions(+), 96 deletions(-) rename waku/discovery/{waku_ext_kademlia.nim => waku_kademlia.nim} (62%) diff --git a/apps/chat2mix/chat2mix.nim b/apps/chat2mix/chat2mix.nim index 8bc1bcaa9..13ff3530e 100644 --- a/apps/chat2mix/chat2mix.nim +++ b/apps/chat2mix/chat2mix.nim @@ -40,7 +40,7 @@ import waku_lightpush/rpc, waku_enr, discovery/waku_dnsdisc, - discovery/waku_ext_kademlia, + discovery/waku_kademlia, waku_node, node/waku_metrics, node/peer_manager, @@ -485,16 +485,19 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = kadBootstrapPeers.add(parsed) if kadBootstrapPeers.len > 0: - ( - await setupExtendedKademliaDiscovery( - node, - ExtendedKademliaDiscoveryParams( - bootstrapNodes: kadBootstrapPeers, - mixPubKey: some(mixPubKey), - advertiseMix: false, - ), - ) - ).isOkOr: + node.wakuKademlia = WakuKademlia.new( + node.switch, + ExtendedKademliaDiscoveryParams( + bootstrapNodes: kadBootstrapPeers, + mixPubKey: some(mixPubKey), + advertiseMix: false, + ), + node.peerManager, + getMixNodePoolSize = proc(): int {.gcsafe, raises: [].} = + if node.wakuMix.isNil(): 0 else: node.getMixNodePoolSize(), + isNodeStarted = proc(): bool {.gcsafe, raises: [].} = + node.started, + ).valueOr: error "failed to setup kademlia discovery", error = error quit(QuitFailure) @@ -503,7 +506,10 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = await node.start() node.peerManager.start() - node.startExtendedKademliaDiscoveryLoop(minMixPeers = MinMixNodePoolSize) + if not node.wakuKademlia.isNil(): + (await node.wakuKademlia.start(minMixPeers = MinMixNodePoolSize)).isOkOr: + error "failed to start kademlia discovery", error = error + quit(QuitFailure) await node.mountLibp2pPing() #await node.mountPeerExchangeClient() diff --git a/library/kernel_api/protocols/lightpush_api.nim b/library/kernel_api/protocols/lightpush_api.nim index 3484f9308..c8b517c17 100644 --- a/library/kernel_api/protocols/lightpush_api.nim +++ b/library/kernel_api/protocols/lightpush_api.nim @@ -45,9 +45,9 @@ proc waku_lightpush_publish( else: some(PubsubTopic($pubsubTopic)) - discard (await ctx.myLib[].node.lightpushPublish(topic, msg, peerOpt)).valueOr: + let messageHash = (await ctx.myLib[].node.lightpushPublish(topic, msg, peerOpt)).valueOr: let errorMsg = error.desc.get($error.code.int) error "PUBLISH failed", error = errorMsg return err(errorMsg) - return ok("") + return ok($messageHash) diff --git a/waku/discovery/waku_ext_kademlia.nim b/waku/discovery/waku_kademlia.nim similarity index 62% rename from waku/discovery/waku_ext_kademlia.nim rename to waku/discovery/waku_kademlia.nim index cd3598bd8..a3927ac60 100644 --- a/waku/discovery/waku_ext_kademlia.nim +++ b/waku/discovery/waku_kademlia.nim @@ -5,14 +5,15 @@ import chronos, chronicles, results, - libp2p/[peerid, multiaddress], + stew/byteutils, + libp2p/[peerid, multiaddress, switch], libp2p/extended_peer_record, libp2p/crypto/curve25519, libp2p/protocols/[kademlia, kad_disco], libp2p/protocols/kademlia_discovery/types as kad_types, libp2p/protocols/mix/mix_protocol -import ../waku_core, ../node/waku_node, ../node/peer_manager +import ../waku_core, ../node/peer_manager logScope: topics = "waku extended kademlia discovery" @@ -21,19 +22,36 @@ const DefaultExtendedKademliaDiscoveryInterval* = chronos.seconds(5) ExtendedKademliaDiscoveryStartupDelay* = chronos.seconds(5) -type ExtendedKademliaDiscoveryParams* = object - bootstrapNodes*: seq[(PeerId, seq[MultiAddress])] - mixPubKey*: Option[Curve25519Key] - advertiseMix*: bool = false +type + MixNodePoolSizeProvider* = proc(): int {.gcsafe, raises: [].} + NodeStartedProvider* = proc(): bool {.gcsafe, raises: [].} -proc setupExtendedKademliaDiscovery*( - node: WakuNode, params: ExtendedKademliaDiscoveryParams -): Future[Result[void, string]] {.async.} = + ExtendedKademliaDiscoveryParams* = object + bootstrapNodes*: seq[(PeerId, seq[MultiAddress])] + mixPubKey*: Option[Curve25519Key] + advertiseMix*: bool = false + + WakuKademlia* = ref object + protocol*: KademliaDiscovery + peerManager: PeerManager + discoveryLoop: Future[void] + running*: bool + getMixNodePoolSize: MixNodePoolSizeProvider + isNodeStarted: NodeStartedProvider + +proc new*( + T: type WakuKademlia, + switch: Switch, + params: ExtendedKademliaDiscoveryParams, + peerManager: PeerManager, + getMixNodePoolSize: MixNodePoolSizeProvider = nil, + isNodeStarted: NodeStartedProvider = nil, +): Result[T, string] = if params.bootstrapNodes.len == 0: - info "starting kademlia discovery as seed node (no bootstrap nodes)" + info "creating kademlia discovery as seed node (no bootstrap nodes)" let kademlia = KademliaDiscovery.new( - node.switch, + switch, bootstrapNodes = params.bootstrapNodes, config = KadDHTConfig.new( validator = kad_types.ExtEntryValidator(), selector = kad_types.ExtEntrySelector() @@ -42,7 +60,7 @@ proc setupExtendedKademliaDiscovery*( ) try: - node.switch.mount(kademlia) + switch.mount(kademlia) except CatchableError: return err("failed to mount kademlia discovery: " & getCurrentExceptionMsg()) @@ -50,26 +68,27 @@ proc setupExtendedKademliaDiscovery*( # initial self-signed peer record published to the DHT if params.advertiseMix: if params.mixPubKey.isSome(): - discard kademlia.startAdvertising( + let alreadyAdvertising = kademlia.startAdvertising( ServiceInfo(id: MixProtocolID, data: @(params.mixPubKey.get())) ) + if alreadyAdvertising: + warn "mix service was already being advertised" debug "extended kademlia advertising mix service", - keyHex = params.mixPubKey.get().toHex(), + keyHex = byteutils.toHex(params.mixPubKey.get()), bootstrapNodes = params.bootstrapNodes.len else: warn "mix advertising enabled but no key provided" - try: - await kademlia.start() - except CatchableError: - return err("failed to start kademlia discovery: " & getCurrentExceptionMsg()) - - node.wakuKademlia = kademlia - - info "kademlia discovery started", + info "kademlia discovery created", bootstrapNodes = params.bootstrapNodes.len, advertiseMix = params.advertiseMix - ok() + ok(WakuKademlia( + protocol: kademlia, + peerManager: peerManager, + running: false, + getMixNodePoolSize: getMixNodePoolSize, + isNodeStarted: isNodeStarted, + )) proc extractMixPubKey(service: ServiceInfo): Option[Curve25519Key] = if service.id != MixProtocolID: @@ -81,14 +100,14 @@ proc extractMixPubKey(service: ServiceInfo): Option[Curve25519Key] = warn "invalid mix pub key length from kademlia record", expected = Curve25519KeySize, actual = service.data.len, - dataHex = service.data.toHex() + dataHex = byteutils.toHex(service.data) return none(Curve25519Key) debug "found mix protocol service", dataLen = service.data.len, expectedLen = Curve25519KeySize let key = intoCurve25519Key(service.data) - debug "successfully extracted mix pub key", keyHex = key.toHex() + debug "successfully extracted mix pub key", keyHex = byteutils.toHex(key) some(key) proc remotePeerInfoFrom(record: ExtendedPeerRecord): Option[RemotePeerInfo] = @@ -132,20 +151,18 @@ proc remotePeerInfoFrom(record: ExtendedPeerRecord): Option[RemotePeerInfo] = ) ) -proc lookupMixPeers*(node: WakuNode): Future[int] {.async.} = +proc lookupMixPeers*(wk: WakuKademlia): Future[Result[int, string]] {.async: (raises: []).} = ## Lookup mix peers via kademlia and add them to the peer store. ## Returns the number of mix peers found and added. - if node.wakuKademlia.isNil(): - warn "cannot lookup mix peers: kademlia not mounted" - return 0 + if wk.protocol.isNil(): + return err("cannot lookup mix peers: kademlia not mounted") let mixService = ServiceInfo(id: MixProtocolID, data: @[]) var records: seq[ExtendedPeerRecord] try: - records = await node.wakuKademlia.lookup(mixService) + records = await wk.protocol.lookup(mixService) except CatchableError: - warn "mix peer lookup failed", error = getCurrentExceptionMsg() - return 0 + return err("mix peer lookup failed: " & getCurrentExceptionMsg()) debug "mix peer lookup returned records", numRecords = records.len @@ -159,34 +176,31 @@ proc lookupMixPeers*(node: WakuNode): Future[int] {.async.} = if peerInfo.mixPubKey.isNone(): continue - node.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia) + wk.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia) info "mix peer added via kademlia lookup", - peerId = $peerInfo.peerId, mixPubKey = peerInfo.mixPubKey.get().toHex() + peerId = $peerInfo.peerId, mixPubKey = byteutils.toHex(peerInfo.mixPubKey.get()) added.inc() info "mix peer lookup complete", found = added - return added + return ok(added) -proc runExtendedKademliaDiscoveryLoop( - node: WakuNode, - interval = DefaultExtendedKademliaDiscoveryInterval, - minMixPeers: int = 0, -): {.async.} = +proc runDiscoveryLoop( + wk: WakuKademlia, + interval: Duration, + minMixPeers: int, +) {.async: (raises: []).} = info "extended kademlia discovery loop started", interval = interval try: - while true: - if node.wakuKademlia.isNil(): - info "extended kademlia discovery loop stopping: protocol disabled" - return - - if not node.started: + while wk.running: + # Wait for node to be started + if not wk.isNodeStarted.isNil() and not wk.isNodeStarted(): await sleepAsync(ExtendedKademliaDiscoveryStartupDelay) continue var records: seq[ExtendedPeerRecord] try: - records = await node.wakuKademlia.randomRecords() + records = await wk.protocol.randomRecords() except CatchableError: warn "extended kademlia discovery failed", error = getCurrentExceptionMsg() await sleepAsync(interval) @@ -201,7 +215,7 @@ proc runExtendedKademliaDiscoveryLoop( continue let peerInfo = peerOpt.get() - node.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia) + wk.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia) debug "peer added via extended kademlia discovery", peerId = $peerInfo.peerId, addresses = peerInfo.addrs.mapIt($it), @@ -213,10 +227,13 @@ proc runExtendedKademliaDiscoveryLoop( info "added peers from extended kademlia discovery", count = added # Targeted mix peer lookup when pool is low - if minMixPeers > 0 and node.getMixNodePoolSize() < minMixPeers: + if minMixPeers > 0 and not wk.getMixNodePoolSize.isNil() and + wk.getMixNodePoolSize() < minMixPeers: debug "mix node pool below threshold, performing targeted lookup", - currentPoolSize = node.getMixNodePoolSize(), threshold = minMixPeers - let found = await node.lookupMixPeers() + currentPoolSize = wk.getMixNodePoolSize(), threshold = minMixPeers + let found = (await wk.lookupMixPeers()).valueOr: + warn "targeted mix peer lookup failed", error = error + 0 if found > 0: info "found mix peers via targeted kademlia lookup", count = found @@ -226,18 +243,35 @@ proc runExtendedKademliaDiscoveryLoop( except CatchableError as e: error "extended kademlia discovery loop failed", error = e.msg -proc startExtendedKademliaDiscoveryLoop*( - node: WakuNode, - interval = DefaultExtendedKademliaDiscoveryInterval, +proc start*( + wk: WakuKademlia, + interval: Duration = DefaultExtendedKademliaDiscoveryInterval, minMixPeers: int = 0, -) = - if node.wakuKademlia.isNil(): - trace "extended kademlia discovery not started: protocol not mounted" +): Future[Result[void, string]] {.async: (raises: []).} = + if wk.running: + return err("already running") + + try: + await wk.protocol.start() + except CatchableError: + return err("failed to start kademlia discovery: " & getCurrentExceptionMsg()) + + wk.running = true + wk.discoveryLoop = wk.runDiscoveryLoop(interval, minMixPeers) + + info "kademlia discovery started" + ok() + +proc stop*(wk: WakuKademlia): Future[void] {.async: (raises: []).} = + if not wk.running: return - if not node.kademliaDiscoveryLoop.isNil(): - trace "extended kademlia discovery loop already running" - return + info "Stopping kademlia discovery" - node.kademliaDiscoveryLoop = - node.runExtendedKademliaDiscoveryLoop(interval, minMixPeers) + wk.running = false + + if not wk.discoveryLoop.isNil(): + await wk.discoveryLoop.cancelAndWait() + wk.discoveryLoop = nil + + info "Successfully stopped kademlia discovery" diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index 50c0eb79b..54be25149 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -33,7 +33,7 @@ import ../waku_store_legacy/common as legacy_common, ../waku_filter_v2, ../waku_peer_exchange, - ../discovery/waku_ext_kademlia, + ../discovery/waku_kademlia, ../node/peer_manager, ../node/peer_manager/peer_store/waku_peer_storage, ../node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations, @@ -179,16 +179,19 @@ proc setupProtocols( else: none(Curve25519Key) - ( - await setupExtendedKademliaDiscovery( - node, - ExtendedKademliaDiscoveryParams( - bootstrapNodes: conf.kademliaDiscoveryConf.get().bootstrapNodes, - mixPubKey: mixPubKey, - advertiseMix: conf.mixConf.isSome(), - ), - ) - ).isOkOr: + node.wakuKademlia = WakuKademlia.new( + node.switch, + ExtendedKademliaDiscoveryParams( + bootstrapNodes: conf.kademliaDiscoveryConf.get().bootstrapNodes, + mixPubKey: mixPubKey, + advertiseMix: conf.mixConf.isSome(), + ), + node.peerManager, + getMixNodePoolSize = proc(): int {.gcsafe, raises: [].} = + if node.wakuMix.isNil(): 0 else: node.getMixNodePoolSize(), + isNodeStarted = proc(): bool {.gcsafe, raises: [].} = + node.started, + ).valueOr: return err("failed to setup kademlia discovery: " & error) if conf.storeServiceConf.isSome(): @@ -496,8 +499,10 @@ proc startNode*( if conf.relay: node.peerManager.start() - let minMixPeers = if conf.mixConf.isSome(): 4 else: 0 - startExtendedKademliaDiscoveryLoop(node, minMixPeers = minMixPeers) + if not node.wakuKademlia.isNil(): + let minMixPeers = if conf.mixConf.isSome(): 4 else: 0 + (await node.wakuKademlia.start(minMixPeers = minMixPeers)).isOkOr: + return err("failed to start kademlia discovery: " & error) return ok() diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 3b4aaea22..7dba66fea 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -17,7 +17,6 @@ import libp2p/protocols/ping, libp2p/protocols/pubsub/gossipsub, libp2p/protocols/pubsub/rpc/messages, - libp2p/protocols/kad_disco, libp2p/builders, libp2p/transports/transport, libp2p/transports/tcptransport, @@ -62,6 +61,7 @@ import requests/node_requests, common/broker/broker_context, ], + ../discovery/waku_kademlia, ./net_config, ./peer_manager @@ -137,8 +137,7 @@ type topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent] rateLimitSettings*: ProtocolRateLimitSettings wakuMix*: WakuMix - wakuKademlia*: KademliaDiscovery - kademliaDiscoveryLoop*: Future[void] + wakuKademlia*: WakuKademlia proc deduceRelayShard( node: WakuNode, @@ -285,7 +284,7 @@ proc mountAutoSharding*( return ok() proc getMixNodePoolSize*(node: WakuNode): int = - return node.wakuMix.nodePool.len + return node.wakuMix.poolSize proc mountMix*( node: WakuNode, @@ -585,9 +584,8 @@ proc stop*(node: WakuNode) {.async.} = await node.wakuPeerExchangeClient.pxLoopHandle.cancelAndWait() node.wakuPeerExchangeClient.pxLoopHandle = nil - if not node.kademliaDiscoveryLoop.isNil(): - await node.kademliaDiscoveryLoop.cancelAndWait() - node.kademliaDiscoveryLoop = nil + if not node.wakuKademlia.isNil(): + await node.wakuKademlia.stop() if not node.wakuRendezvous.isNil(): await node.wakuRendezvous.stopWait() diff --git a/waku/waku_mix/protocol.nim b/waku/waku_mix/protocol.nim index c77ec46d0..3078035b0 100644 --- a/waku/waku_mix/protocol.nim +++ b/waku/waku_mix/protocol.nim @@ -100,6 +100,9 @@ proc new*( warn "publishing with mix won't work until atleast 3 mix nodes in node pool" return ok(m) +proc poolSize*(mix: WakuMix): int = + mix.nodePool.len + method start*(mix: WakuMix) = info "starting waku mix protocol"