From d2ba5149af28240e22479a846cc5067702b88749 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Thu, 5 Feb 2026 09:51:05 +0530 Subject: [PATCH] integrate extended kademlia discovery with xpr for mix --- apps/chat2mix/chat2mix.nim | 59 ++++- apps/chat2mix/config_chat2mix.nim | 11 +- tools/confutils/cli_args.nim | 17 ++ waku/discovery/waku_ext_kademlia.nim | 224 ++++++++++++++++++ waku/factory/conf_builder/conf_builder.nim | 6 +- .../kademlia_discovery_conf_builder.nim | 59 +++++ .../conf_builder/waku_conf_builder.nim | 9 +- waku/factory/node_factory.nim | 29 ++- waku/factory/waku_conf.nim | 6 + waku/node/waku_node.nim | 8 + waku/waku_core/peers.nim | 1 + 11 files changed, 413 insertions(+), 16 deletions(-) create mode 100644 waku/discovery/waku_ext_kademlia.nim create mode 100644 waku/factory/conf_builder/kademlia_discovery_conf_builder.nim diff --git a/apps/chat2mix/chat2mix.nim b/apps/chat2mix/chat2mix.nim index 45fd1fa2d..dda9acd79 100644 --- a/apps/chat2mix/chat2mix.nim +++ b/apps/chat2mix/chat2mix.nim @@ -24,12 +24,14 @@ import stream/connection, # create and close stream read / write connections multiaddress, # encode different addressing schemes. For example, /ip4/7.7.7.7/tcp/6543 means it is using IPv4 protocol and TCP + multicodec, peerinfo, # manage the information of a peer, such as peer ID and public / private key peerid, # Implement how peers interact protobuf/minprotobuf, # message serialisation/deserialisation from and to protobufs nameresolving/dnsresolver, protocols/mix/curve25519, + protocols/mix/mix_protocol, ] # define DNS resolution import waku/[ @@ -38,6 +40,7 @@ import waku_lightpush/rpc, waku_enr, discovery/waku_dnsdisc, + discovery/waku_ext_kademlia, waku_node, node/waku_metrics, node/peer_manager, @@ -84,6 +87,24 @@ type const MinMixNodePoolSize = 4 +proc parseKadBootstrapNode( + multiAddrStr: string +): Result[(PeerId, seq[MultiAddress]), string] = + ## Parse a multiaddr string that includes /p2p/ into (PeerId, seq[MultiAddress]) + let multiAddr = MultiAddress.init(multiAddrStr).valueOr: + return err("Invalid multiaddress: " & multiAddrStr) + + let peerIdPart = multiAddr.getPart(multiCodec("p2p")).valueOr: + return err("Multiaddress must include /p2p/: " & multiAddrStr) + + let peerIdBytes = peerIdPart.protoArgument().valueOr: + return err("Failed to extract peer ID from multiaddress: " & multiAddrStr) + + let peerId = PeerId.init(peerIdBytes).valueOr: + return err("Invalid peer ID in multiaddress: " & multiAddrStr) + + ok((peerId, @[multiAddr])) + ##################### ## chat2 protobufs ## ##################### @@ -453,14 +474,39 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = (await node.mountMix(conf.clusterId, mixPrivKey, conf.mixnodes)).isOkOr: error "failed to mount waku mix protocol: ", error = $error quit(QuitFailure) - await node.mountRendezvousClient(conf.clusterId) + + # Setup extended kademlia discovery if bootstrap nodes are provided + if conf.kadBootstrapNodes.len > 0: + var kadBootstrapPeers: seq[(PeerId, seq[MultiAddress])] + for nodeStr in conf.kadBootstrapNodes: + let parsed = parseKadBootstrapNode(nodeStr).valueOr: + error "Failed to parse kademlia bootstrap node", node = nodeStr, error = error + continue + kadBootstrapPeers.add(parsed) + + if kadBootstrapPeers.len > 0: + ( + await setupExtendedKademliaDiscovery( + node, + ExtendedKademliaDiscoveryParams( + bootstrapNodes: kadBootstrapPeers, + mixPubKey: some(mixPubKey), + advertiseMix: false, + ), + ) + ).isOkOr: + error "failed to setup kademlia discovery", error = error + quit(QuitFailure) + + #await node.mountRendezvousClient(conf.clusterId) await node.start() node.peerManager.start() + node.startExtendedKademliaDiscoveryLoop() await node.mountLibp2pPing() - await node.mountPeerExchangeClient() + #await node.mountPeerExchangeClient() let pubsubTopic = conf.getPubsubTopic(node, conf.contentTopic) echo "pubsub topic is: " & pubsubTopic let nick = await readNick(transp) @@ -601,15 +647,14 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = node, pubsubTopic, conf.contentTopic, servicePeerInfo, false ) echo "waiting for mix nodes to be discovered..." - while true: - if node.getMixNodePoolSize() >= MinMixNodePoolSize: - break - discard await node.fetchPeerExchangePeers() - await sleepAsync(1000) while node.getMixNodePoolSize() < MinMixNodePoolSize: info "waiting for mix nodes to be discovered", currentpoolSize = node.getMixNodePoolSize() + # Try to lookup mix peers via kademlia if pool is low + let found = await node.lookupMixPeers() + if found > 0: + info "found mix peers via kademlia lookup", count = found await sleepAsync(1000) notice "ready to publish with mix node pool size ", currentpoolSize = node.getMixNodePoolSize() diff --git a/apps/chat2mix/config_chat2mix.nim b/apps/chat2mix/config_chat2mix.nim index ddb7136cb..1875beaa2 100644 --- a/apps/chat2mix/config_chat2mix.nim +++ b/apps/chat2mix/config_chat2mix.nim @@ -203,7 +203,7 @@ type fleet* {. desc: "Select the fleet to connect to. This sets the DNS discovery URL to the selected fleet.", - defaultValue: Fleet.test, + defaultValue: Fleet.none, name: "fleet" .}: Fleet @@ -228,7 +228,14 @@ type desc: "WebSocket Secure Support.", defaultValue: false, name: "websocket-secure-support" - .}: bool ## rln-relay configuration + .}: bool + + ## Kademlia Discovery config + kadBootstrapNodes* {. + desc: + "Peer multiaddr for kademlia discovery bootstrap node (must include /p2p/). Argument may be repeated.", + name: "kad-bootstrap-node" + .}: seq[string] proc parseCmdArg*(T: type MixNodePubInfo, p: string): T = let elements = p.split(":") diff --git a/tools/confutils/cli_args.nim b/tools/confutils/cli_args.nim index 6811e335f..5e4adacb2 100644 --- a/tools/confutils/cli_args.nim +++ b/tools/confutils/cli_args.nim @@ -621,6 +621,20 @@ with the drawback of consuming some more bandwidth.""", name: "mixnode" .}: seq[MixNodePubInfo] + # Kademlia Discovery config + enableKadDiscovery* {. + desc: + "Enable extended kademlia discovery. Can be enabled without bootstrap nodes for the first node in the network.", + defaultValue: false, + name: "enable-kad-discovery" + .}: bool + + kadBootstrapNodes* {. + desc: + "Peer multiaddr for kademlia discovery bootstrap node (must include /p2p/). Argument may be repeated.", + name: "kad-bootstrap-node" + .}: seq[string] + ## websocket config websocketSupport* {. desc: "Enable websocket: true|false", @@ -1057,4 +1071,7 @@ proc toWakuConf*(n: WakuNodeConf): ConfResult[WakuConf] = b.rateLimitConf.withRateLimits(n.rateLimits) + b.kademliaDiscoveryConf.withEnabled(n.enableKadDiscovery) + b.kademliaDiscoveryConf.withBootstrapNodes(n.kadBootstrapNodes) + return b.build() diff --git a/waku/discovery/waku_ext_kademlia.nim b/waku/discovery/waku_ext_kademlia.nim new file mode 100644 index 000000000..27eb503d2 --- /dev/null +++ b/waku/discovery/waku_ext_kademlia.nim @@ -0,0 +1,224 @@ +{.push raises: [].} + +import + std/[options, sequtils], + chronos, + chronicles, + results, + libp2p/[peerid, multiaddress], + libp2p/extended_peer_record, + libp2p/crypto/curve25519, + libp2p/protocols/[kademlia, kad_disco], + libp2p/protocols/kademlia_discovery/types as kad_types, + libp2p/protocols/mix/mix_protocol + +import ../waku_core, ../node/waku_node, ../node/peer_manager + +logScope: + topics = "waku extended kademlia discovery" + +const + DefaultExtendedKademliaDiscoveryInterval* = chronos.seconds(5) + ExtendedKademliaDiscoveryStartupDelay* = chronos.seconds(5) + +type ExtendedKademliaDiscoveryParams* = object + bootstrapNodes*: seq[(PeerId, seq[MultiAddress])] + mixPubKey*: Option[Curve25519Key] + advertiseMix*: bool = false + +proc setupExtendedKademliaDiscovery*( + node: WakuNode, params: ExtendedKademliaDiscoveryParams +): Future[Result[void, string]] {.async.} = + if params.bootstrapNodes.len == 0: + info "starting kademlia discovery as seed node (no bootstrap nodes)" + + let kademlia = KademliaDiscovery.new( + node.switch, + bootstrapNodes = params.bootstrapNodes, + config = KadDHTConfig.new( + validator = kad_types.ExtEntryValidator(), selector = kad_types.ExtEntrySelector() + ), + codec = ExtendedKademliaDiscoveryCodec, + ) + + try: + node.switch.mount(kademlia) + except CatchableError: + return err("failed to mount kademlia discovery: " & getCurrentExceptionMsg()) + + # Register services BEFORE starting kademlia so they are included in the + # initial self-signed peer record published to the DHT + if params.advertiseMix: + if params.mixPubKey.isSome(): + discard kademlia.startAdvertising( + ServiceInfo(id: MixProtocolID, data: @(params.mixPubKey.get())) + ) + debug "extended kademlia advertising mix service", + keyHex = params.mixPubKey.get().toHex(), + bootstrapNodes = params.bootstrapNodes.len + else: + warn "mix advertising enabled but no key provided" + + try: + await kademlia.start() + except CatchableError: + return err("failed to start kademlia discovery: " & getCurrentExceptionMsg()) + + node.wakuKademlia = kademlia + + info "kademlia discovery started", + bootstrapNodes = params.bootstrapNodes.len, advertiseMix = params.advertiseMix + + ok() + +proc extractMixPubKey(service: ServiceInfo): Option[Curve25519Key] = + if service.id != MixProtocolID: + trace "service is not mix protocol", serviceId = service.id, mixProtocolId = MixProtocolID + return none(Curve25519Key) + + debug "found mix protocol service", dataLen = service.data.len, expectedLen = Curve25519KeySize + + if service.data.len != Curve25519KeySize: + warn "invalid mix pub key length from kademlia record", + expected = Curve25519KeySize, actual = service.data.len, dataHex = service.data.toHex() + return none(Curve25519Key) + + let key = intoCurve25519Key(service.data) + debug "successfully extracted mix pub key", keyHex = key.toHex() + some(key) + +proc remotePeerInfoFrom(record: ExtendedPeerRecord): Option[RemotePeerInfo] = + debug "processing kademlia record", + peerId = record.peerId, + numAddresses = record.addresses.len, + numServices = record.services.len, + serviceIds = record.services.mapIt(it.id) + + if record.addresses.len == 0: + trace "kademlia record missing addresses", peerId = record.peerId + return none(RemotePeerInfo) + + let addrs = record.addresses.mapIt(it.address) + if addrs.len == 0: + trace "kademlia record produced no dialable addresses", peerId = record.peerId + return none(RemotePeerInfo) + + let protocols = record.services.mapIt(it.id) + + var mixPubKey = none(Curve25519Key) + for service in record.services: + debug "checking service", peerId = record.peerId, serviceId = service.id, dataLen = service.data.len + mixPubKey = extractMixPubKey(service) + if mixPubKey.isSome(): + debug "extracted mix public key from service", peerId = record.peerId + break + + if record.services.len > 0 and mixPubKey.isNone(): + debug "record has services but no valid mix key", + peerId = record.peerId, + services = record.services.mapIt(it.id) + + some( + RemotePeerInfo.init( + record.peerId, + addrs = addrs, + protocols = protocols, + origin = PeerOrigin.Kademlia, + mixPubKey = mixPubKey, + ) + ) + +proc runExtendedKademliaDiscoveryLoop*( + node: WakuNode, interval = DefaultExtendedKademliaDiscoveryInterval +): Future[void] {.async.} = + info "extended kademlia discovery loop started", interval = interval + + while true: + if node.wakuKademlia.isNil(): + info "extended kademlia discovery loop stopping: protocol disabled" + return + + if not node.started: + await sleepAsync(ExtendedKademliaDiscoveryStartupDelay) + continue + + var records: seq[ExtendedPeerRecord] + try: + records = await node.wakuKademlia.randomRecords() + except CatchableError: + warn "extended kademlia discovery failed", error = getCurrentExceptionMsg() + await sleepAsync(interval) + continue + + debug "received random records from kademlia", numRecords = records.len + + var added = 0 + for record in records: + let peerOpt = remotePeerInfoFrom(record) + if peerOpt.isNone(): + continue + + let peerInfo = peerOpt.get() + node.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia) + debug "peer added via extended kademlia discovery", + peerId = $peerInfo.peerId, + addresses = peerInfo.addrs.mapIt($it), + protocols = peerInfo.protocols, + hasMixPubKey = peerInfo.mixPubKey.isSome() + added.inc() + + if added > 0: + info "added peers from extended kademlia discovery", count = added + + await sleepAsync(interval) + +proc startExtendedKademliaDiscoveryLoop*( + node: WakuNode, interval = DefaultExtendedKademliaDiscoveryInterval +) = + if node.wakuKademlia.isNil(): + trace "extended kademlia discovery not started: protocol not mounted" + return + + if not node.kademliaDiscoveryLoop.isNil(): + trace "extended kademlia discovery loop already running" + return + + node.kademliaDiscoveryLoop = node.runExtendedKademliaDiscoveryLoop(interval) + +proc lookupMixPeers*( + node: WakuNode +): Future[int] {.async.} = + ## Lookup mix peers via kademlia and add them to the peer store. + ## Returns the number of mix peers found and added. + if node.wakuKademlia.isNil(): + warn "cannot lookup mix peers: kademlia not mounted" + return 0 + + let mixService = ServiceInfo(id: MixProtocolID, data: @[]) + var records: seq[ExtendedPeerRecord] + try: + records = await node.wakuKademlia.lookup(mixService) + except CatchableError: + warn "mix peer lookup failed", error = getCurrentExceptionMsg() + return 0 + + debug "mix peer lookup returned records", numRecords = records.len + + var added = 0 + for record in records: + let peerOpt = remotePeerInfoFrom(record) + if peerOpt.isNone(): + continue + + let peerInfo = peerOpt.get() + if peerInfo.mixPubKey.isNone(): + continue + + node.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia) + info "mix peer added via kademlia lookup", + peerId = $peerInfo.peerId, + mixPubKey = peerInfo.mixPubKey.get().toHex() + added.inc() + + info "mix peer lookup complete", found = added + return added diff --git a/waku/factory/conf_builder/conf_builder.nim b/waku/factory/conf_builder/conf_builder.nim index 37cea76fe..b8d0316c3 100644 --- a/waku/factory/conf_builder/conf_builder.nim +++ b/waku/factory/conf_builder/conf_builder.nim @@ -10,10 +10,12 @@ import ./metrics_server_conf_builder, ./rate_limit_conf_builder, ./rln_relay_conf_builder, - ./mix_conf_builder + ./mix_conf_builder, + ./kademlia_discovery_conf_builder export waku_conf_builder, filter_service_conf_builder, store_sync_conf_builder, store_service_conf_builder, rest_server_conf_builder, dns_discovery_conf_builder, discv5_conf_builder, web_socket_conf_builder, metrics_server_conf_builder, - rate_limit_conf_builder, rln_relay_conf_builder, mix_conf_builder + rate_limit_conf_builder, rln_relay_conf_builder, mix_conf_builder, + kademlia_discovery_conf_builder diff --git a/waku/factory/conf_builder/kademlia_discovery_conf_builder.nim b/waku/factory/conf_builder/kademlia_discovery_conf_builder.nim new file mode 100644 index 000000000..ee0b3beb8 --- /dev/null +++ b/waku/factory/conf_builder/kademlia_discovery_conf_builder.nim @@ -0,0 +1,59 @@ +import chronicles, std/options, results +import libp2p/[peerid, multiaddress, multicodec] +import ../waku_conf + +logScope: + topics = "waku conf builder kademlia discovery" + +####################################### +## Kademlia Discovery Config Builder ## +####################################### +type KademliaDiscoveryConfBuilder* = object + enabled*: Option[bool] + bootstrapNodes*: seq[string] + +proc init*(T: type KademliaDiscoveryConfBuilder): KademliaDiscoveryConfBuilder = + KademliaDiscoveryConfBuilder() + +proc withEnabled*(b: var KademliaDiscoveryConfBuilder, enabled: bool) = + b.enabled = some(enabled) + +proc withBootstrapNodes*( + b: var KademliaDiscoveryConfBuilder, bootstrapNodes: seq[string] +) = + b.bootstrapNodes = bootstrapNodes + +proc parseBootstrapNode( + multiAddrStr: string +): Result[(PeerId, seq[MultiAddress]), string] = + ## Parse a multiaddr string that includes /p2p/ into (PeerId, seq[MultiAddress]) + let multiAddr = MultiAddress.init(multiAddrStr).valueOr: + return err("Invalid multiaddress: " & multiAddrStr) + + let peerIdPart = multiAddr.getPart(multiCodec("p2p")).valueOr: + return err("Multiaddress must include /p2p/: " & multiAddrStr) + + let peerIdBytes = peerIdPart.protoArgument().valueOr: + return err("Failed to extract peer ID from multiaddress: " & multiAddrStr) + + let peerId = PeerId.init(peerIdBytes).valueOr: + return err("Invalid peer ID in multiaddress: " & multiAddrStr) + + # Get the address without the p2p part for connecting + ok((peerId, @[multiAddr])) + +proc build*( + b: KademliaDiscoveryConfBuilder +): Result[Option[KademliaDiscoveryConf], string] = + # Kademlia is enabled if explicitly enabled OR if bootstrap nodes are provided + let enabled = b.enabled.get(false) or b.bootstrapNodes.len > 0 + if not enabled: + return ok(none(KademliaDiscoveryConf)) + + var parsedNodes: seq[(PeerId, seq[MultiAddress])] + for nodeStr in b.bootstrapNodes: + let parsed = parseBootstrapNode(nodeStr).valueOr: + return err("Failed to parse kademlia bootstrap node: " & error) + parsedNodes.add(parsed) + + return ok(some(KademliaDiscoveryConf(bootstrapNodes: parsedNodes))) diff --git a/waku/factory/conf_builder/waku_conf_builder.nim b/waku/factory/conf_builder/waku_conf_builder.nim index b952e711e..e51f02dbd 100644 --- a/waku/factory/conf_builder/waku_conf_builder.nim +++ b/waku/factory/conf_builder/waku_conf_builder.nim @@ -25,7 +25,8 @@ import ./metrics_server_conf_builder, ./rate_limit_conf_builder, ./rln_relay_conf_builder, - ./mix_conf_builder + ./mix_conf_builder, + ./kademlia_discovery_conf_builder logScope: topics = "waku conf builder" @@ -80,6 +81,7 @@ type WakuConfBuilder* = object mixConf*: MixConfBuilder webSocketConf*: WebSocketConfBuilder rateLimitConf*: RateLimitConfBuilder + kademliaDiscoveryConf*: KademliaDiscoveryConfBuilder # End conf builders relay: Option[bool] lightPush: Option[bool] @@ -140,6 +142,7 @@ proc init*(T: type WakuConfBuilder): WakuConfBuilder = storeServiceConf: StoreServiceConfBuilder.init(), webSocketConf: WebSocketConfBuilder.init(), rateLimitConf: RateLimitConfBuilder.init(), + kademliaDiscoveryConf: KademliaDiscoveryConfBuilder.init(), ) proc withNetworkConf*(b: var WakuConfBuilder, networkConf: NetworkConf) = @@ -506,6 +509,9 @@ proc build*( let rateLimit = builder.rateLimitConf.build().valueOr: return err("Rate limits Conf building failed: " & $error) + let kademliaDiscoveryConf = builder.kademliaDiscoveryConf.build().valueOr: + return err("Kademlia Discovery Conf building failed: " & $error) + # End - Build sub-configs let logLevel = @@ -628,6 +634,7 @@ proc build*( restServerConf: restServerConf, dnsDiscoveryConf: dnsDiscoveryConf, mixConf: mixConf, + kademliaDiscoveryConf: kademliaDiscoveryConf, # end confs nodeKey: nodeKey, clusterId: clusterId, diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index 2cdfdb0d2..00f0c0d1d 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -6,7 +6,8 @@ import libp2p/protocols/pubsub/gossipsub, libp2p/protocols/connectivity/relay/relay, libp2p/nameresolving/dnsresolver, - libp2p/crypto/crypto + libp2p/crypto/crypto, + libp2p/crypto/curve25519 import ./internal_config, @@ -32,6 +33,7 @@ import ../waku_store_legacy/common as legacy_common, ../waku_filter_v2, ../waku_peer_exchange, + ../discovery/waku_ext_kademlia, ../node/peer_manager, ../node/peer_manager/peer_store/waku_peer_storage, ../node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations, @@ -165,12 +167,29 @@ proc setupProtocols( #mount mix if conf.mixConf.isSome(): + let mixConf = conf.mixConf.get() + (await node.mountMix(conf.clusterId, mixConf.mixKey, mixConf.mixnodes)).isOkOr: + return err("failed to mount waku mix protocol: " & $error) + + # Setup extended kademlia discovery + if conf.kademliaDiscoveryConf.isSome(): + let mixPubKey = + if conf.mixConf.isSome(): + some(conf.mixConf.get().mixPubKey) + else: + none(Curve25519Key) + ( - await node.mountMix( - conf.clusterId, conf.mixConf.get().mixKey, conf.mixConf.get().mixnodes + await setupExtendedKademliaDiscovery( + node, + ExtendedKademliaDiscoveryParams( + bootstrapNodes: conf.kademliaDiscoveryConf.get().bootstrapNodes, + mixPubKey: mixPubKey, + advertiseMix: conf.mixConf.isSome(), + ), ) ).isOkOr: - return err("failed to mount waku mix protocol: " & $error) + return err("failed to setup kademlia discovery: " & error) if conf.storeServiceConf.isSome(): let storeServiceConf = conf.storeServiceConf.get() @@ -477,6 +496,8 @@ proc startNode*( if conf.relay: node.peerManager.start() + startExtendedKademliaDiscoveryLoop(node) + return ok() proc setupNode*( diff --git a/waku/factory/waku_conf.nim b/waku/factory/waku_conf.nim index 899008221..01574d067 100644 --- a/waku/factory/waku_conf.nim +++ b/waku/factory/waku_conf.nim @@ -4,6 +4,7 @@ import libp2p/crypto/crypto, libp2p/multiaddress, libp2p/crypto/curve25519, + libp2p/peerid, secp256k1, results @@ -51,6 +52,10 @@ type MixConf* = ref object mixPubKey*: Curve25519Key mixnodes*: seq[MixNodePubInfo] +type KademliaDiscoveryConf* = object + bootstrapNodes*: seq[(PeerId, seq[MultiAddress])] + ## Bootstrap nodes for extended kademlia discovery. + type StoreServiceConf* {.requiresInit.} = object dbMigration*: bool dbURl*: string @@ -109,6 +114,7 @@ type WakuConf* {.requiresInit.} = ref object metricsServerConf*: Option[MetricsServerConf] webSocketConf*: Option[WebSocketConf] mixConf*: Option[MixConf] + kademliaDiscoveryConf*: Option[KademliaDiscoveryConf] portsShift*: uint16 dnsAddrsNameServers*: seq[IpAddress] diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index ebe084080..a93e2a00d 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -17,6 +17,7 @@ import libp2p/protocols/ping, libp2p/protocols/pubsub/gossipsub, libp2p/protocols/pubsub/rpc/messages, + libp2p/protocols/kad_disco, libp2p/builders, libp2p/transports/transport, libp2p/transports/tcptransport, @@ -131,6 +132,8 @@ type topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent] rateLimitSettings*: ProtocolRateLimitSettings wakuMix*: WakuMix + wakuKademlia*: KademliaDiscovery + kademliaDiscoveryLoop*: Future[void] proc getShardsGetter(node: WakuNode): GetShards = return proc(): seq[uint16] {.closure, gcsafe, raises: [].} = @@ -534,6 +537,11 @@ proc stop*(node: WakuNode) {.async.} = if not node.wakuPeerExchangeClient.isNil() and not node.wakuPeerExchangeClient.pxLoopHandle.isNil(): await node.wakuPeerExchangeClient.pxLoopHandle.cancelAndWait() + node.wakuPeerExchangeClient.pxLoopHandle = nil + + if not node.kademliaDiscoveryLoop.isNil(): + await node.kademliaDiscoveryLoop.cancelAndWait() + node.kademliaDiscoveryLoop = nil if not node.wakuRendezvous.isNil(): await node.wakuRendezvous.stopWait() diff --git a/waku/waku_core/peers.nim b/waku/waku_core/peers.nim index 48c994403..51a8e1157 100644 --- a/waku/waku_core/peers.nim +++ b/waku/waku_core/peers.nim @@ -38,6 +38,7 @@ type Static PeerExchange Dns + Kademlia PeerDirection* = enum UnknownDirection