diff --git a/apps/chat2mix/chat2mix.nim b/apps/chat2mix/chat2mix.nim index dda9acd79..8bc1bcaa9 100644 --- a/apps/chat2mix/chat2mix.nim +++ b/apps/chat2mix/chat2mix.nim @@ -503,7 +503,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = await node.start() node.peerManager.start() - node.startExtendedKademliaDiscoveryLoop() + node.startExtendedKademliaDiscoveryLoop(minMixPeers = MinMixNodePoolSize) await node.mountLibp2pPing() #await node.mountPeerExchangeClient() @@ -651,10 +651,6 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = while node.getMixNodePoolSize() < MinMixNodePoolSize: info "waiting for mix nodes to be discovered", currentpoolSize = node.getMixNodePoolSize() - # Try to lookup mix peers via kademlia if pool is low - let found = await node.lookupMixPeers() - if found > 0: - info "found mix peers via kademlia lookup", count = found await sleepAsync(1000) notice "ready to publish with mix node pool size ", currentpoolSize = node.getMixNodePoolSize() diff --git a/waku/discovery/waku_ext_kademlia.nim b/waku/discovery/waku_ext_kademlia.nim index f72c46fda..adf2ed7ce 100644 --- a/waku/discovery/waku_ext_kademlia.nim +++ b/waku/discovery/waku_ext_kademlia.nim @@ -132,8 +132,45 @@ proc remotePeerInfoFrom(record: ExtendedPeerRecord): Option[RemotePeerInfo] = ) ) +proc lookupMixPeers*(node: WakuNode): Future[int] {.async.} = + ## Lookup mix peers via kademlia and add them to the peer store. + ## Returns the number of mix peers found and added. + if node.wakuKademlia.isNil(): + warn "cannot lookup mix peers: kademlia not mounted" + return 0 + + let mixService = ServiceInfo(id: MixProtocolID, data: @[]) + var records: seq[ExtendedPeerRecord] + try: + records = await node.wakuKademlia.lookup(mixService) + except CatchableError: + warn "mix peer lookup failed", error = getCurrentExceptionMsg() + return 0 + + debug "mix peer lookup returned records", numRecords = records.len + + var added = 0 + for record in records: + let peerOpt = remotePeerInfoFrom(record) + if peerOpt.isNone(): + continue + + let peerInfo = peerOpt.get() + if peerInfo.mixPubKey.isNone(): + continue + + node.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia) + info "mix peer added via kademlia lookup", + peerId = $peerInfo.peerId, mixPubKey = peerInfo.mixPubKey.get().toHex() + added.inc() + + info "mix peer lookup complete", found = added + return added + proc runExtendedKademliaDiscoveryLoop*( - node: WakuNode, interval = DefaultExtendedKademliaDiscoveryInterval + node: WakuNode, + interval = DefaultExtendedKademliaDiscoveryInterval, + minMixPeers: int = 0, ): Future[void] {.async.} = info "extended kademlia discovery loop started", interval = interval @@ -175,6 +212,14 @@ proc runExtendedKademliaDiscoveryLoop*( if added > 0: info "added peers from extended kademlia discovery", count = added + # Targeted mix peer lookup when pool is low + if minMixPeers > 0 and node.getMixNodePoolSize() < minMixPeers: + debug "mix node pool below threshold, performing targeted lookup", + currentPoolSize = node.getMixNodePoolSize(), threshold = minMixPeers + let found = await node.lookupMixPeers() + if found > 0: + info "found mix peers via targeted kademlia lookup", count = found + await sleepAsync(interval) except CancelledError: debug "extended kademlia discovery loop cancelled" @@ -182,7 +227,9 @@ proc runExtendedKademliaDiscoveryLoop*( error "extended kademlia discovery loop failed", error = e.msg proc startExtendedKademliaDiscoveryLoop*( - node: WakuNode, interval = DefaultExtendedKademliaDiscoveryInterval + node: WakuNode, + interval = DefaultExtendedKademliaDiscoveryInterval, + minMixPeers: int = 0, ) = if node.wakuKademlia.isNil(): trace "extended kademlia discovery not started: protocol not mounted" @@ -192,39 +239,4 @@ proc startExtendedKademliaDiscoveryLoop*( trace "extended kademlia discovery loop already running" return - node.kademliaDiscoveryLoop = node.runExtendedKademliaDiscoveryLoop(interval) - -proc lookupMixPeers*(node: WakuNode): Future[int] {.async.} = - ## Lookup mix peers via kademlia and add them to the peer store. - ## Returns the number of mix peers found and added. - if node.wakuKademlia.isNil(): - warn "cannot lookup mix peers: kademlia not mounted" - return 0 - - let mixService = ServiceInfo(id: MixProtocolID, data: @[]) - var records: seq[ExtendedPeerRecord] - try: - records = await node.wakuKademlia.lookup(mixService) - except CatchableError: - warn "mix peer lookup failed", error = getCurrentExceptionMsg() - return 0 - - debug "mix peer lookup returned records", numRecords = records.len - - var added = 0 - for record in records: - let peerOpt = remotePeerInfoFrom(record) - if peerOpt.isNone(): - continue - - let peerInfo = peerOpt.get() - if peerInfo.mixPubKey.isNone(): - continue - - node.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia) - info "mix peer added via kademlia lookup", - peerId = $peerInfo.peerId, mixPubKey = peerInfo.mixPubKey.get().toHex() - added.inc() - - info "mix peer lookup complete", found = added - return added + node.kademliaDiscoveryLoop = node.runExtendedKademliaDiscoveryLoop(interval, minMixPeers) diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index 00f0c0d1d..800b8ea4e 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -496,7 +496,10 @@ proc startNode*( if conf.relay: node.peerManager.start() - startExtendedKademliaDiscoveryLoop(node) + let minMixPeers = + if conf.mixConf.isSome(): 4 + else: 0 + startExtendedKademliaDiscoveryLoop(node, minMixPeers = minMixPeers) return ok()