From c417c7edf8816c8af7e50af60736573b8a59203e Mon Sep 17 00:00:00 2001 From: Simon-Pierre Vivier Date: Thu, 18 Jun 2026 12:51:27 -0400 Subject: [PATCH] feat: integrate service discovery (#3947) --------- Co-authored-by: Fabiana Cecin --- apps/chat2mix/chat2mix.nim | 45 +- .../waku/discovery/waku_kademlia.nim | 415 +++++++++--------- .../waku/events/discovery_events.nim | 15 + logos_delivery/waku/events/events.nim | 10 +- .../kademlia_discovery_conf_builder.nim | 37 +- logos_delivery/waku/factory/node_factory.nim | 50 +-- logos_delivery/waku/factory/waku_conf.nim | 12 +- logos_delivery/waku/node/waku_node.nim | 34 +- tests/all_tests_waku.nim | 3 +- tests/waku_kademlia/test_waku_kademlia.nim | 203 +++++++++ tests/waku_kademlia/utils.nim | 50 +++ tools/confutils/cli_args.nim | 21 + 12 files changed, 622 insertions(+), 273 deletions(-) create mode 100644 logos_delivery/waku/events/discovery_events.nim create mode 100644 tests/waku_kademlia/test_waku_kademlia.nim create mode 100644 tests/waku_kademlia/utils.nim diff --git a/apps/chat2mix/chat2mix.nim b/apps/chat2mix/chat2mix.nim index 8e17dcf25..9bf66ebe5 100644 --- a/apps/chat2mix/chat2mix.nim +++ b/apps/chat2mix/chat2mix.nim @@ -29,6 +29,8 @@ import # manage the information of a peer, such as peer ID and public / private key peerid, # Implement how peers interact protobuf/minprotobuf, # message serialisation/deserialisation from and to protobufs + protocols/kademlia/types, + protocols/service_discovery/types as sd_types, nameresolving/dnsresolver, ] # define DNS resolution # libp2p_mix has been extracted into its own package; import from there. @@ -37,6 +39,7 @@ import logos_delivery/waku/[ waku_core, waku_core/topics/sharding, + waku_core/peers, waku_lightpush/common, waku_lightpush/rpc, waku_enr, @@ -45,6 +48,7 @@ import waku_node, node/waku_metrics, node/peer_manager, + factory/waku_conf, factory/builder, common/utils/nat, waku_store/common, @@ -567,30 +571,27 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = if conf.kadBootstrapNodes.len > 0: var kadBootstrapPeers: seq[(PeerId, seq[MultiAddress])] for nodeStr in conf.kadBootstrapNodes: - let (peerId, ma) = parseFullAddress(nodeStr).valueOr: - error "Failed to parse kademlia bootstrap node", node = nodeStr, error = error - continue + let (peerId, ma) = block: + parseFullAddress(nodeStr).isOkOr: + error "Failed to parse kademlia bootstrap node", node = nodeStr, error + continue + kadBootstrapPeers.add((peerId, @[ma])) if kadBootstrapPeers.len > 0: - node.wakuKademlia = WakuKademlia.new( - node.switch, - ExtendedServiceDiscoveryParams( + node.mountKademlia( + KademliaDiscoveryConf( bootstrapNodes: kadBootstrapPeers, - mixPubKey: some(mixPubKey), - advertiseMix: false, - ), - node.peerManager, - rng = libp2p_rng.newBearSslRng(node.rng), - getMixNodePoolSize = proc(): int {.gcsafe, raises: [].} = - if node.wakuMix.isNil(): - 0 - else: - node.getMixNodePoolSize(), - isNodeStarted = proc(): bool {.gcsafe, raises: [].} = - node.started, - ).valueOr: - error "failed to setup kademlia discovery", error = error + servicesToDiscover: @[MixProtocolID], + randomLookupInterval: chronos.seconds(60), + serviceLookupInterval: chronos.seconds(60), + kadDhtConfig: KadDHTConfig.new(), + discoConfig: sd_types.ServiceDiscoveryConfig.new(), + clientMode: false, + xprPublishing: true, + ) + ).isOkOr: + error "failed to setup service discovery", error = error quit(QuitFailure) #await node.mountRendezvousClient(conf.clusterId) @@ -602,10 +603,6 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = await node.start() node.peerManager.start() - if not node.wakuKademlia.isNil(): - (await node.wakuKademlia.start(minMixPeers = MinMixNodePoolSize)).isOkOr: - error "failed to start kademlia discovery", error = error - quit(QuitFailure) await node.mountLibp2pPing() #await node.mountPeerExchangeClient() diff --git a/logos_delivery/waku/discovery/waku_kademlia.nim b/logos_delivery/waku/discovery/waku_kademlia.nim index a659b9847..9b0e43abc 100644 --- a/logos_delivery/waku/discovery/waku_kademlia.nim +++ b/logos_delivery/waku/discovery/waku_kademlia.nim @@ -1,152 +1,94 @@ +import logos_delivery/waku/compat/option_valueor {.push raises: [].} -import logos_delivery/waku/common/option_shims - -import std/[options, sequtils] +import std/[options, sequtils, sets] import chronos, chronicles, results, stew/byteutils, - libp2p/[peerid, multiaddress, switch], + libp2p/[peerid, multiaddress, switch, extended_peer_record], libp2p/extended_peer_record, + libp2p/crypto/crypto, + libp2p/crypto/rng, libp2p/crypto/curve25519, - libp2p/protocols/[kademlia, service_discovery], - libp2p/protocols/service_discovery/types as kad_types, - libp2p/crypto/rng as libp2p_rng, - libp2p_mix/mix_protocol + libp2p/protocols/service_discovery, + libp2p/protocols/service_discovery/types, + libp2p/protocols/kademlia/types, + libp2p_mix/mix_protocol, + libp2p_mix/curve25519 -import logos_delivery/waku/waku_core, logos_delivery/waku/node/peer_manager +import + logos_delivery/waku/waku_core, + logos_delivery/waku/node/peer_manager, + logos_delivery/waku/events/discovery_events logScope: - topics = "waku extended kademlia discovery" + topics = "waku service discovery" const - DefaultExtendedServiceDiscoveryInterval* = chronos.seconds(5) - ExtendedServiceDiscoveryStartupDelay* = chronos.seconds(5) + DefaultServiceDiscoveryInterval* = chronos.seconds(60) + DefaultRandomDiscoveryInterval* = chronos.seconds(60) -type - MixNodePoolSizeProvider* = proc(): int {.gcsafe, raises: [].} - NodeStartedProvider* = proc(): bool {.gcsafe, raises: [].} +type WakuKademlia* = ref object + protocol*: ServiceDiscovery + peerManager: PeerManager + randomLookupLoop: Future[void] + serviceLookupLoop: Future[void] + randomLookupInterval: Duration + serviceLookupInterval: Duration + servicesToDiscover: HashSet[string] + servicesToAdvertise: HashSet[ServiceInfo] - ExtendedServiceDiscoveryParams* = object - bootstrapNodes*: seq[(PeerId, seq[MultiAddress])] - mixPubKey*: Option[Curve25519Key] - advertiseMix*: bool = false +type KademliaDiscoveryConf* = object + bootstrapNodes*: seq[(PeerId, seq[MultiAddress])] + servicesToAdvertise*: HashSet[ServiceInfo] + servicesToDiscover*: HashSet[string] + randomLookupInterval*: Duration + serviceLookupInterval*: Duration + kadDhtConfig*: KadDHTConfig + discoConfig*: ServiceDiscoveryConfig + clientMode*: bool + xprPublishing*: bool - WakuKademlia* = ref object - protocol*: ServiceDiscovery - peerManager: PeerManager - discoveryLoop: Future[void] - running*: bool - getMixNodePoolSize: MixNodePoolSizeProvider - isNodeStarted: NodeStartedProvider - -proc new*( - T: type WakuKademlia, - switch: Switch, - params: ExtendedServiceDiscoveryParams, - peerManager: PeerManager, - rng: libp2p_rng.Rng, - getMixNodePoolSize: MixNodePoolSizeProvider = nil, - isNodeStarted: NodeStartedProvider = nil, -): Result[T, string] = - if params.bootstrapNodes.len == 0: - info "creating kademlia discovery as seed node (no bootstrap nodes)" - - # libp2p 1.15.3: ServiceDiscovery.new requires `rng: Rng` (libp2p type). - let kademlia = ServiceDiscovery.new( - switch, - bootstrapNodes = params.bootstrapNodes, - config = KadDHTConfig.new( - validator = kad_types.ExtEntryValidator(), selector = kad_types.ExtEntrySelector() - ), - rng = rng, - codec = ExtendedServiceDiscoveryCodec, - ) - - try: - switch.mount(kademlia) - except CatchableError: - return err("failed to mount kademlia discovery: " & getCurrentExceptionMsg()) - - # Register services BEFORE starting kademlia so they are included in the - # initial self-signed peer record published to the DHT - if params.advertiseMix: - if params.mixPubKey.isSome(): - kademlia.startAdvertising( - ServiceInfo(id: MixProtocolID, data: @(params.mixPubKey.get())) - ) - debug "extended kademlia advertising mix service", - keyHex = byteutils.toHex(params.mixPubKey.get()), - bootstrapNodes = params.bootstrapNodes.len - else: - warn "mix advertising enabled but no key provided" - - info "kademlia discovery created", - bootstrapNodes = params.bootstrapNodes.len, advertiseMix = params.advertiseMix - - return ok( - WakuKademlia( - protocol: kademlia, - peerManager: peerManager, - running: false, - getMixNodePoolSize: getMixNodePoolSize, - isNodeStarted: isNodeStarted, - ) - ) - -proc extractMixPubKey(service: ServiceInfo): Option[Curve25519Key] = +proc extractMixPubKey*(service: ServiceInfo): Option[Curve25519Key] = if service.id != MixProtocolID: - trace "service is not mix protocol", - serviceId = service.id, mixProtocolId = MixProtocolID return none(Curve25519Key) if service.data.len != Curve25519KeySize: - warn "invalid mix pub key length from kademlia record", + trace "invalid mix pub key length", expected = Curve25519KeySize, actual = service.data.len, dataHex = byteutils.toHex(service.data) return none(Curve25519Key) - debug "found mix protocol service", - dataLen = service.data.len, expectedLen = Curve25519KeySize - let key = intoCurve25519Key(service.data) - debug "successfully extracted mix pub key", keyHex = byteutils.toHex(key) + return some(key) -proc remotePeerInfoFrom(record: ExtendedPeerRecord): Option[RemotePeerInfo] = - debug "processing kademlia record", - peerId = record.peerId, - numAddresses = record.addresses.len, - numServices = record.services.len, - serviceIds = record.services.mapIt(it.id) - +proc remotePeerInfoFrom*(record: ExtendedPeerRecord): Option[RemotePeerInfo] = if record.addresses.len == 0: - trace "kademlia record missing addresses", peerId = record.peerId + trace "missing addresses", peerId = record.peerId return none(RemotePeerInfo) let addrs = record.addresses.mapIt(it.address) if addrs.len == 0: - trace "kademlia record produced no dialable addresses", peerId = record.peerId + trace "no dialable addresses", peerId = record.peerId return none(RemotePeerInfo) let protocols = record.services.mapIt(it.id) - var mixPubKey = none(Curve25519Key) + var mixPubKey: Option[Curve25519Key] = none(Curve25519Key) for service in record.services: - debug "checking service", - peerId = record.peerId, serviceId = service.id, dataLen = service.data.len - mixPubKey = extractMixPubKey(service) - if mixPubKey.isSome(): - debug "extracted mix public key from service", peerId = record.peerId - break + let key = extractMixPubKey(service).valueOr: + continue + mixPubKey = some(key) + + trace "successfully extracted mix pub key", + peerId = record.peerId, keyHex = byteutils.toHex(mixPubKey.get()) + + break - if record.services.len > 0 and mixPubKey.isNone(): - debug "record has services but no valid mix key", - peerId = record.peerId, services = record.services.mapIt(it.id) - return none(RemotePeerInfo) return some( RemotePeerInfo.init( record.peerId, @@ -157,131 +99,182 @@ proc remotePeerInfoFrom(record: ExtendedPeerRecord): Option[RemotePeerInfo] = ) ) -proc lookupMixPeers*( - wk: WakuKademlia -): Future[Result[int, string]] {.async: (raises: []).} = - ## Lookup mix peers via kademlia and add them to the peer store. - ## Returns the number of mix peers found and added. - if wk.protocol.isNil(): - return err("cannot lookup mix peers: kademlia not mounted") - - let mixService = ServiceInfo(id: MixProtocolID, data: @[]) - let advertisements = - try: - (await wk.protocol.lookup(mixService)).valueOr: - return err("mix peer lookup failed: " & error) - except CatchableError: - return err("mix peer lookup failed: " & getCurrentExceptionMsg()) - - debug "mix peer lookup returned records", numRecords = advertisements.len - - var added = 0 - for ad in advertisements: - let peerOpt = remotePeerInfoFrom(ad.data) - if peerOpt.isNone(): +proc processRecords( + self: WakuKademlia, records: seq[ExtendedPeerRecord], source: string +): seq[RemotePeerInfo] = + var discovered: seq[RemotePeerInfo] + for record in records: + let peerInfo = remotePeerInfoFrom(record).valueOr: continue - let peerInfo = peerOpt.get() - if peerInfo.mixPubKey.isNone(): + self.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia) + + debug "peer added via service discovery", + source, + peerId = $peerInfo.peerId, + addresses = peerInfo.addrs.mapIt($it), + protocols = peerInfo.protocols + + discovered.add(peerInfo) + + return discovered + +proc lookupServicePeers*( + self: WakuKademlia, service: string +): Future[Result[seq[RemotePeerInfo], string]] {.async: (raises: []).} = + if self.protocol.isNil(): + return err("cannot lookup service peers: service discovery not mounted") + + let serviceId = service.hashServiceId() + + let lookupCatch = catch: + (await self.protocol.lookup(serviceId)) + + let lookupResult = lookupCatch.valueOr: + return err("service peer lookup failed: " & error.msg) + + let advertisements = lookupResult.valueOr: + return err("service peer lookup failed: " & lookupResult.error) + + let records = advertisements.mapIt(it.data) + + let discovered = self.processRecords(records, "service lookup") + + debug "service lookup complete", service, found = discovered.len + + return ok(discovered) + +proc runRandomLookupLoop(self: WakuKademlia) {.async: (raises: [CancelledError]).} = + debug "periodic random lookup started", interval = $self.randomLookupInterval + + while true: + await sleepAsync(self.randomLookupInterval) + + let recordsRes = catch: + (await self.protocol.lookupRandom()) + + let records = recordsRes.valueOr: + error "random lookup failed", error continue - wk.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia) - info "mix peer added via kademlia lookup", - peerId = $peerInfo.peerId, mixPubKey = byteutils.toHex(peerInfo.mixPubKey.get()) - added.inc() + let discovered = self.processRecords(records, "random walk") - info "mix peer lookup complete", found = added - return ok(added) + if discovered.len > 0: + PeersDiscoveredEvent.emit(peers = discovered) -proc runDiscoveryLoop( - wk: WakuKademlia, interval: Duration, minMixPeers: int -) {.async: (raises: []).} = - info "extended kademlia discovery loop started", interval = interval + debug "random lookup complete", found = discovered.len - try: - while wk.running: - # Wait for node to be started - if not wk.isNodeStarted.isNil() and not wk.isNodeStarted(): - await sleepAsync(ExtendedServiceDiscoveryStartupDelay) +proc runServiceLookupLoop(self: WakuKademlia) {.async: (raises: [CancelledError]).} = + debug "periodic service lookup started", + interval = $self.serviceLookupInterval, services = self.servicesToDiscover + + while true: + await sleepAsync(self.serviceLookupInterval) + + let futs = self.servicesToDiscover.mapIt(self.lookupServicePeers(it)) + + let finishedFuts = await allFinished(futs) + + var discovered: seq[RemotePeerInfo] + for fut in finishedFuts: + let catchRes = catch: + fut.read() + + let res = catchRes.valueOr: + error "service lookup failed", error continue - var records: seq[ExtendedPeerRecord] - try: - records = await wk.protocol.lookupRandom() - except CatchableError as e: - warn "extended kademlia discovery failed", error = e.msg - await sleepAsync(interval) + let peerInfos = res.valueOr: + error "service lookup failed", error continue - debug "received random records from kademlia", numRecords = records.len + for peerInfo in peerInfos: + discovered.add(peerInfo) - var added = 0 - for record in records: - let peerOpt = remotePeerInfoFrom(record) - if peerOpt.isNone(): - continue + if discovered.len > 0: + PeersDiscoveredEvent.emit(peers = discovered) - let peerInfo = peerOpt.get() - wk.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia) - debug "peer added via extended kademlia discovery", - peerId = $peerInfo.peerId, - addresses = peerInfo.addrs.mapIt($it), - protocols = peerInfo.protocols, - hasMixPubKey = peerInfo.mixPubKey.isSome() - added.inc() +proc new*( + T: type WakuKademlia, + switch: Switch, + peerManager: PeerManager, + bootstrapNodes: seq[(PeerId, seq[MultiAddress])], + servicesToAdvertise: HashSet[ServiceInfo], + servicesToDiscover: HashSet[string], + randomLookupInterval: Duration = DefaultRandomDiscoveryInterval, + serviceLookupInterval: Duration = DefaultServiceDiscoveryInterval, + rng: Rng, + kadDhtConfig: KadDHTConfig = KadDHTConfig.new(), + discoConfig: ServiceDiscoveryConfig = ServiceDiscoveryConfig.new(), + clientMode: bool = false, + xprPublishing: bool = true, +): Result[T, string] = + if bootstrapNodes.len == 0: + debug "creating service discovery as seed node (no bootstrap nodes)" - if added > 0: - info "added peers from extended kademlia discovery", count = added + let protocol = ServiceDiscovery.new( + switch, + bootstrapNodes = bootstrapNodes, + config = kadDhtConfig, + rng = rng, + client = clientMode, + services = servicesToAdvertise.toSeq(), + discoConfig = discoConfig, + xprPublishing = xprPublishing, + ) - # Targeted mix peer lookup when pool is low - if minMixPeers > 0 and not wk.getMixNodePoolSize.isNil() and - wk.getMixNodePoolSize() < minMixPeers: - debug "mix node pool below threshold, performing targeted lookup", - currentPoolSize = wk.getMixNodePoolSize(), threshold = minMixPeers - let found = (await wk.lookupMixPeers()).valueOr: - warn "targeted mix peer lookup failed", error = error - 0 - if found > 0: - info "found mix peers via targeted kademlia lookup", count = found + let self = WakuKademlia( + protocol: protocol, + peerManager: peerManager, + randomLookupInterval: randomLookupInterval, + serviceLookupInterval: serviceLookupInterval, + servicesToDiscover: servicesToDiscover, + servicesToAdvertise: servicesToAdvertise, + ) - await sleepAsync(interval) - except CancelledError as e: - debug "extended kademlia discovery loop cancelled", error = e.msg - except CatchableError as e: - error "extended kademlia discovery loop failed", error = e.msg + return ok(self) -proc start*( - wk: WakuKademlia, - interval: Duration = DefaultExtendedServiceDiscoveryInterval, - minMixPeers: int = 0, -): Future[Result[void, string]] {.async: (raises: []).} = - if wk.running: - return err("already running") +proc start*(self: WakuKademlia) {.async: (raises: []).} = + for serviceId in self.servicesToDiscover: + discard self.protocol.registerInterest(serviceId) - try: - await wk.protocol.start() - except CatchableError as e: - return err("failed to start kademlia discovery: " & e.msg) + if self.randomLookupLoop.isNil(): + self.randomLookupLoop = self.runRandomLookupLoop() - wk.running = true - - wk.discoveryLoop = wk.runDiscoveryLoop(interval, minMixPeers) + if self.serviceLookupLoop.isNil(): + self.serviceLookupLoop = self.runServiceLookupLoop() info "kademlia discovery started" - return ok() -proc stop*(wk: WakuKademlia) {.async: (raises: []).} = - if not wk.running: - return +proc stop*(self: WakuKademlia) {.async: (raises: []).} = + if not self.serviceLookupLoop.isNil(): + await self.serviceLookupLoop.cancelAndWait() + self.serviceLookupLoop = nil - info "Stopping kademlia discovery" + if not self.randomLookupLoop.isNil(): + await self.randomLookupLoop.cancelAndWait() + self.randomLookupLoop = nil - wk.running = false + info "kademlia discovery stopped" - if not wk.discoveryLoop.isNil(): - await wk.discoveryLoop.cancelAndWait() - wk.discoveryLoop = nil +proc addServiceToDiscover*(self: WakuKademlia, service: string) = + if not self.servicesToDiscover.containsOrIncl(service): + discard self.protocol.registerInterest(service) + debug "added service to discover", service - if not wk.protocol.isNil(): - await wk.protocol.stop() - info "Successfully stopped kademlia discovery" +proc addServiceToAdvertise*(self: WakuKademlia, service: ServiceInfo) = + if not self.servicesToAdvertise.containsOrIncl(service): + self.protocol.startAdvertising(service) + debug "added service to advertise", service = service.id + +proc removeServiceToDiscover*(self: WakuKademlia, service: string) = + if not self.servicesToDiscover.missingOrExcl(service): + self.protocol.unregisterInterest(service) + debug "removed service to discover", service + +proc removeServiceToAdvertise*( + self: WakuKademlia, service: ServiceInfo +) {.async: (raises: [CancelledError]).} = + if not self.servicesToAdvertise.missingOrExcl(service): + await self.protocol.stopAdvertising(service.id) + debug "removed service to advertise", service = service.id diff --git a/logos_delivery/waku/events/discovery_events.nim b/logos_delivery/waku/events/discovery_events.nim new file mode 100644 index 000000000..3baa06ec1 --- /dev/null +++ b/logos_delivery/waku/events/discovery_events.nim @@ -0,0 +1,15 @@ +import libp2p/peerinfo, brokers/[event_broker, request_broker] +import logos_delivery/waku/waku_core + +EventBroker: + # Event emitted when peers are discovered via random or service lookup + type PeersDiscoveredEvent* = object + peers*: seq[RemotePeerInfo] + +RequestBroker: + # Request broker for on-demand service peer lookup + type ServicePeersRequest* = object + serviceId*: string + peers*: seq[RemotePeerInfo] + + proc signature*(serviceId: string): Future[Result[ServicePeersRequest, string]] diff --git a/logos_delivery/waku/events/events.nim b/logos_delivery/waku/events/events.nim index 5a3c0c748..130d7c018 100644 --- a/logos_delivery/waku/events/events.nim +++ b/logos_delivery/waku/events/events.nim @@ -1,3 +1,9 @@ -import ./[message_events, delivery_events, health_events, peer_events, lifecycle_events] +import + ./[ + message_events, delivery_events, health_events, peer_events, lifecycle_events, + discovery_events, + ] -export message_events, delivery_events, health_events, peer_events, lifecycle_events +export + message_events, delivery_events, health_events, peer_events, lifecycle_events, + discovery_events diff --git a/logos_delivery/waku/factory/conf_builder/kademlia_discovery_conf_builder.nim b/logos_delivery/waku/factory/conf_builder/kademlia_discovery_conf_builder.nim index 576b6b7d9..fa1ff4c94 100644 --- a/logos_delivery/waku/factory/conf_builder/kademlia_discovery_conf_builder.nim +++ b/logos_delivery/waku/factory/conf_builder/kademlia_discovery_conf_builder.nim @@ -1,6 +1,9 @@ import chronicles, std/options, results +import logos_delivery/waku/discovery/waku_kademlia +import chronos import libp2p/[peerid, multiaddress, peerinfo] -import logos_delivery/waku/factory/waku_conf +import libp2p/protocols/kademlia/types +import libp2p/protocols/service_discovery/types as sd_types logScope: topics = "waku conf builder kademlia discovery" @@ -8,9 +11,17 @@ logScope: ####################################### ## Kademlia Discovery Config Builder ## ####################################### + +const + DefaultKadEnabled*: bool = false + DefaultRandomLookupInterval* = chronos.seconds(60) + DefaultServiceLookupInterval* = chronos.seconds(60) + type KademliaDiscoveryConfBuilder* = object enabled*: bool bootstrapNodes*: seq[string] + randomLookupInterval*: Option[Duration] + serviceLookupInterval*: Option[Duration] proc init*(T: type KademliaDiscoveryConfBuilder): KademliaDiscoveryConfBuilder = KademliaDiscoveryConfBuilder() @@ -23,6 +34,16 @@ proc withBootstrapNodes*( ) = b.bootstrapNodes = bootstrapNodes +proc withRandomLookupInterval*( + b: var KademliaDiscoveryConfBuilder, interval: Duration +) = + b.randomLookupInterval = some(interval) + +proc withServiceLookupInterval*( + b: var KademliaDiscoveryConfBuilder, interval: Duration +) = + b.serviceLookupInterval = some(interval) + proc build*( b: KademliaDiscoveryConfBuilder ): Result[Option[KademliaDiscoveryConf], string] = @@ -37,4 +58,16 @@ proc build*( return err("Failed to parse kademlia bootstrap node: " & error) parsedNodes.add((peerId, @[ma])) - return ok(some(KademliaDiscoveryConf(bootstrapNodes: parsedNodes))) + return ok( + some( + KademliaDiscoveryConf( + bootstrapNodes: parsedNodes, + randomLookupInterval: b.randomLookupInterval.get(DefaultRandomLookupInterval), + serviceLookupInterval: b.serviceLookupInterval.get(DefaultServiceLookupInterval), + kadDhtConfig: KadDHTConfig.new(), + discoConfig: sd_types.ServiceDiscoveryConfig.new(), + clientMode: false, + xprPublishing: true, + ) + ) + ) diff --git a/logos_delivery/waku/factory/node_factory.nim b/logos_delivery/waku/factory/node_factory.nim index 3dc8b0337..8de1436a7 100644 --- a/logos_delivery/waku/factory/node_factory.nim +++ b/logos_delivery/waku/factory/node_factory.nim @@ -10,6 +10,8 @@ import libp2p/crypto/crypto, libp2p/crypto/curve25519, libp2p/crypto/rng as libp2p_rng, + libp2p/extended_peer_record, + libp2p_mix/mix_protocol, bearssl/rand import @@ -44,7 +46,8 @@ import ../node/peer_manager/peer_store/waku_peer_storage, ../node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations, ../waku_lightpush_legacy/common, - ../common/rate_limit/setting + ../common/rate_limit/setting, + ../events/discovery_events ## Peer persistence @@ -405,32 +408,29 @@ proc setupProtocols( if mixConf.gifterNode.len > 0: info "Gifter client mode: registration deferred to startNode()" - # Setup extended kademlia discovery + # Setup service discovery if conf.kademliaDiscoveryConf.isSome(): - let mixPubKey = - if conf.mixConf.isSome(): - some(conf.mixConf.get().mixPubKey) - else: - none(Curve25519Key) + var kadConf = conf.kademliaDiscoveryConf.get() - node.wakuKademlia = WakuKademlia.new( - node.switch, - ExtendedServiceDiscoveryParams( - bootstrapNodes: conf.kademliaDiscoveryConf.get().bootstrapNodes, - mixPubKey: mixPubKey, - advertiseMix: conf.mixConf.isSome(), - ), - node.peerManager, - rng = libp2p_rng.newBearSslRng(node.rng), - getMixNodePoolSize = proc(): int {.gcsafe, raises: [].} = - if node.wakuMix.isNil(): - 0 - else: - node.getMixNodePoolSize(), - isNodeStarted = proc(): bool {.gcsafe, raises: [].} = - node.started, - ).valueOr: - return err("failed to setup kademlia discovery: " & error) + if conf.mixConf.isSome(): + let mixService = + ServiceInfo(id: MixProtocolID, data: @(conf.mixConf.get().mixPubKey)) + kadConf.servicesToAdvertise.incl(mixService) + kadConf.servicesToDiscover.incl(mixService.id) + + node.mountKademlia(kadConf).isOkOr: + return err("failed to setup service discovery: " & error) + + # Register ServicePeersRequest provider + ServicePeersRequest.setProvider( + node.brokerCtx, + proc(serviceId: string): Future[Result[ServicePeersRequest, string]] {.async.} = + let peers = (await node.wakuKademlia.lookupServicePeers(serviceId)).valueOr: + return err("failed call to lookupServicePeers: " & error) + return ok(ServicePeersRequest(serviceId: serviceId, peers: peers)), + ).isOkOr: + error "Can't set provider for ServicePeersRequest", error = error + return err("Can't set provider for ServicePeersRequest: " & error) if conf.storeServiceConf.isSome(): let storeServiceConf = conf.storeServiceConf.get() diff --git a/logos_delivery/waku/factory/waku_conf.nim b/logos_delivery/waku/factory/waku_conf.nim index 1274302ed..60e50a941 100644 --- a/logos_delivery/waku/factory/waku_conf.nim +++ b/logos_delivery/waku/factory/waku_conf.nim @@ -1,10 +1,14 @@ import std/[net, options, strutils], chronicles, + chronos, libp2p/crypto/crypto, libp2p/multiaddress, libp2p/crypto/curve25519, libp2p/peerid, + libp2p/extended_peer_record, + libp2p/protocols/kademlia/types, + libp2p/protocols/service_discovery/types as sd_types, secp256k1, results @@ -12,12 +16,14 @@ import ../waku_rln_relay/rln_relay, ../rest_api/endpoint/builder, ../discovery/waku_discv5, + ../discovery/waku_kademlia, ../node/waku_metrics, ../common/logging, ../common/rate_limit/setting, ../waku_enr/capabilities, ./networks_config, - ../waku_mix + ../waku_mix, + ./conf_builder/kademlia_discovery_conf_builder export RlnRelayConf, RlnRelayCreds, RestServerConf, Discv5Conf, MetricsServerConf @@ -60,10 +66,6 @@ type MixConf* = ref object gifterAllowlist*: string gifterAuthKey*: string -type KademliaDiscoveryConf* = object - bootstrapNodes*: seq[(PeerId, seq[MultiAddress])] - ## Bootstrap nodes for extended kademlia discovery. - type StoreServiceConf* {.requiresInit.} = object dbMigration*: bool dbURl*: string diff --git a/logos_delivery/waku/node/waku_node.nim b/logos_delivery/waku/node/waku_node.nim index 0c3fdf0a4..c1f98f3a4 100644 --- a/logos_delivery/waku/node/waku_node.nim +++ b/logos_delivery/waku/node/waku_node.nim @@ -384,6 +384,31 @@ proc mountMix*( return ok() +proc mountKademlia*( + node: WakuNode, config: KademliaDiscoveryConf +): Result[void, string] = + if not node.wakuKademlia.isNil(): + return err("WakuKademlia already mounted, skipping") + + let wk = WakuKademlia.new( + node.switch, node.peerManager, config.bootstrapNodes, config.servicesToAdvertise, + config.servicesToDiscover, config.randomLookupInterval, + config.serviceLookupInterval, node.rng, config.kadDhtConfig, config.discoConfig, + config.clientMode, config.xprPublishing, + ).valueOr: + return err("failed to create service discovery: " & error) + + node.wakuKademlia = wk + + let mountRes = catch: + node.switch.mount(wk.protocol) + mountRes.isOkOr: + return err("failed to mount service discovery: " & error.msg) + + return ok() + +## Waku Sync + proc mountStoreSync*( node: WakuNode, cluster: uint16, @@ -707,6 +732,9 @@ proc start*(node: WakuNode) {.async.} = node.started = true + if not node.wakuKademlia.isNil(): + await node.wakuKademlia.start() + if not node.wakuFilterClient.isNil(): node.wakuFilterClient.registerPushHandler( proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = @@ -733,6 +761,9 @@ proc stop*(node: WakuNode) {.async.} = node.stopProvidersAndListeners() + if not node.wakuKademlia.isNil(): + await node.wakuKademlia.stop() + ## NOTE: This will dispatch gossipsub stop to the WakuRelay.stop method override await node.switch.stop() @@ -755,9 +786,6 @@ proc stop*(node: WakuNode) {.async.} = not node.wakuPeerExchangeClient.pxLoopHandle.isNil(): await node.wakuPeerExchangeClient.pxLoopHandle.cancelAndWait() - if not node.wakuKademlia.isNil(): - await node.wakuKademlia.stop() - if not node.wakuRendezvousClient.isNil(): await node.wakuRendezvousClient.stopWait() diff --git a/tests/all_tests_waku.nim b/tests/all_tests_waku.nim index dc6bed2b8..16ae76ef5 100644 --- a/tests/all_tests_waku.nim +++ b/tests/all_tests_waku.nim @@ -72,7 +72,8 @@ import ./test_waku_switch, ./test_waku_rendezvous, ./test_waku_metadata, - ./waku_discv5/test_waku_discv5 + ./waku_discv5/test_waku_discv5, + ./waku_kademlia/test_waku_kademlia # Waku Keystore test suite import ./test_waku_keystore_keyfile, ./test_waku_keystore diff --git a/tests/waku_kademlia/test_waku_kademlia.nim b/tests/waku_kademlia/test_waku_kademlia.nim new file mode 100644 index 000000000..f4b84bf41 --- /dev/null +++ b/tests/waku_kademlia/test_waku_kademlia.nim @@ -0,0 +1,203 @@ +import + std/[options, sequtils, strutils], + results, + chronos, + chronicles, + testutils/unittests, + libp2p/crypto/crypto as libp2p_keys, + libp2p/crypto/curve25519, + libp2p/[peerid, multiaddress, switch, extended_peer_record], + libp2p/extended_peer_record, + libp2p/protocols/service_discovery/types as sd_types, + libp2p_mix/mix_protocol + +import + logos_delivery/waku/discovery/waku_kademlia, + logos_delivery/waku/waku_core/peers, + logos_delivery/waku/node/peer_manager/waku_peer_store +import ../testlib/[wakucore, testasync, assertions, futures, testutils] +import ./utils as kad_utils + +suite "Waku Kademlia service discovery": + asyncTest "seed node starts with no bootstrap nodes": + let + switch = newTestSwitch() + wk = kad_utils.newTestKademlia( + switch, servicesToAdvertise = @[ServiceInfo(id: "/seed/svc/1.0.0", data: @[])] + ) + await switch.start() + await wk.start() + + await sleepAsync(FUTURE_TIMEOUT) + + check: + not wk.protocol.isNil() + + await wk.stop() + await switch.stop() + + suite "extractMixPubKey": + proc validKeyBytes(): seq[byte] = + var b = newSeq[byte](Curve25519KeySize) + for i in 0 ..< Curve25519KeySize: + b[i] = byte(i) + b + + test "non-mix service returns none": + let svc = ServiceInfo(id: "/foo/1.0.0", data: validKeyBytes()) + check: + extractMixPubKey(svc).isNone() + + test "mix service with wrong data length returns none": + let svc = ServiceInfo(id: MixProtocolID, data: @[0u8, 1u8, 2u8]) + check: + extractMixPubKey(svc).isNone() + + test "mix service with correct length returns some": + let bytes = validKeyBytes() + let svc = ServiceInfo(id: MixProtocolID, data: bytes) + let res = extractMixPubKey(svc) + require: + res.isSome() + let key = res.get() + check: + key.getBytes() == bytes + + test "round-trip matches intoCurve25519Key on raw bytes": + let bytes = validKeyBytes() + let svc = ServiceInfo(id: MixProtocolID, data: bytes) + let extracted = extractMixPubKey(svc).get() + let direct = intoCurve25519Key(bytes) + check: + extracted.getBytes() == direct.getBytes() + + suite "remotePeerInfoFrom": + proc randomPeerId(): PeerId = + PeerId.init(generateSecp256k1Key()).tryGet() + + proc testAddr(port: uint16): MultiAddress = + MultiAddress.init("/ip4/127.0.0.1/tcp/" & $port).tryGet() + + proc mixService(data: seq[byte]): ServiceInfo = + ServiceInfo(id: MixProtocolID, data: data) + + proc validMixService(): ServiceInfo = + var b = newSeq[byte](Curve25519KeySize) + for i in 0 ..< Curve25519KeySize: + b[i] = byte(i) + mixService(b) + + test "empty addresses returns none": + let record = buildExtendedPeerRecord(randomPeerId(), @[]) + check: + remotePeerInfoFrom(record).isNone() + + test "origin set to PeerOrigin.Kademlia": + let + pid = randomPeerId() + record = buildExtendedPeerRecord(pid, @[testAddr(61600)]) + res = remotePeerInfoFrom(record) + require: + res.isSome() + let peerInfo = res.get() + check: + peerInfo.origin == PeerOrigin.Kademlia + peerInfo.peerId == pid + + test "mixPubKey extracted from first mix service": + let + pid = randomPeerId() + svc = validMixService() + record = buildExtendedPeerRecord(pid, @[testAddr(61600)], @[svc]) + res = remotePeerInfoFrom(record) + require: + res.isSome() + let peerInfo = res.get() + check: + peerInfo.mixPubKey.isSome() + peerInfo.mixPubKey.get().getBytes() == svc.data + + test "mixPubKey stays none when no mix service present": + let + pid = randomPeerId() + svc = ServiceInfo(id: "/other/1.0.0", data: @[1u8]) + record = buildExtendedPeerRecord(pid, @[testAddr(61600)], @[svc]) + res = remotePeerInfoFrom(record) + require: + res.isSome() + check: + res.get().mixPubKey.isNone() + + test "addresses mapped correctly": + let + pid = randomPeerId() + addrs = @[testAddr(61600), testAddr(61601), testAddr(61602)] + record = buildExtendedPeerRecord(pid, addrs) + res = remotePeerInfoFrom(record) + require: + res.isSome() + let peerInfo = res.get() + check: + peerInfo.addrs.len == 3 + peerInfo.addrs == addrs + + test "multiple mix services, first one wins": + let + pid = randomPeerId() + firstBytes = block: + var b = newSeq[byte](Curve25519KeySize) + for i in 0 ..< Curve25519KeySize: + b[i] = byte(i) + b + secondBytes = block: + var b = newSeq[byte](Curve25519KeySize) + for i in 0 ..< Curve25519KeySize: + b[i] = byte(i + 100) + b + record = buildExtendedPeerRecord( + pid, @[testAddr(61600)], @[mixService(firstBytes), mixService(secondBytes)] + ) + res = remotePeerInfoFrom(record) + require: + res.isSome() + check: + res.get().mixPubKey.get().getBytes() == firstBytes + + test "mix service with bad key length is skipped silently": + let + pid = randomPeerId() + badSvc = mixService(@[0u8, 1u8, 2u8]) + record = buildExtendedPeerRecord(pid, @[testAddr(61600)], @[badSvc]) + res = remotePeerInfoFrom(record) + require: + res.isSome() + let peerInfo = res.get() + check: + peerInfo.peerId == pid + peerInfo.mixPubKey.isNone() + + suite "lookupServicePeers": + asyncTest "returns err when protocol is nil": + let + switch = newTestSwitch() + wk = kad_utils.newTestKademlia(switch) + wk.protocol = nil + let res = await wk.lookupServicePeers("/some/service/1.0.0") + check: + res.isErr() + res.error.contains("service discovery not mounted") + + asyncTest "returns ok with empty seq when no advertisements": + let + switch = newTestSwitch() + wk = kad_utils.newTestKademlia(switch) + await switch.start() + await wk.start() + + let res = await wk.lookupServicePeers("/nonexistent/service/1.0.0") + check: + res.isOk() + res.value.len == 0 + + await wk.stop() + await switch.stop() diff --git a/tests/waku_kademlia/utils.nim b/tests/waku_kademlia/utils.nim new file mode 100644 index 000000000..1057872d8 --- /dev/null +++ b/tests/waku_kademlia/utils.nim @@ -0,0 +1,50 @@ +{.used.} + +import std/[options, sets] +import chronos, chronicles, results +import libp2p/[peerid, multiaddress, switch] +import libp2p/extended_peer_record +import libp2p/protocols/service_discovery/types as sd_types +import libp2p/crypto/crypto as libp2p_keys + +import + logos_delivery/waku/discovery/waku_kademlia, + logos_delivery/waku/node/peer_manager/peer_manager +import ../testlib/[wakucore, common] + +export wakucore, common, peerid, multiaddress, switch, extended_peer_record, sd_types + +proc newTestKademlia*( + switch: Switch, + bootstrapNodes: seq[(PeerId, seq[MultiAddress])] = @[], + servicesToAdvertise: seq[ServiceInfo] = @[], + servicesToDiscover: seq[string] = @[], + randomLookupInterval: Duration = 100.milliseconds, + serviceLookupInterval: Duration = 100.milliseconds, + clientMode: bool = false, + xprPublishing: bool = true, +): WakuKademlia = + let peerManager = PeerManager.new(switch) + + let wk = WakuKademlia + .new( + switch = switch, + peerManager = peerManager, + bootstrapNodes = bootstrapNodes, + servicesToAdvertise = toHashSet(servicesToAdvertise), + servicesToDiscover = toHashSet(servicesToDiscover), + randomLookupInterval = randomLookupInterval, + serviceLookupInterval = serviceLookupInterval, + rng = rng(), + clientMode = clientMode, + xprPublishing = xprPublishing, + ) + .tryGet() + + switch.mount(wk.protocol) + wk + +proc buildExtendedPeerRecord*( + peerId: PeerId, addrs: seq[MultiAddress], services: seq[ServiceInfo] = @[] +): ExtendedPeerRecord = + ExtendedPeerRecord.init(peerId = peerId, addresses = addrs, services = services) diff --git a/tools/confutils/cli_args.nim b/tools/confutils/cli_args.nim index 73b694ade..95c6c7f2a 100644 --- a/tools/confutils/cli_args.nim +++ b/tools/confutils/cli_args.nim @@ -707,6 +707,18 @@ with the drawback of consuming some more bandwidth.""", name: "kad-bootstrap-node" .}: seq[string] + kadRandomLookupIntervalSec* {. + desc: "Interval seconds between random kademlia lookups.", + defaultValue: 60, + name: "kad-random-lookup-interval" + .}: uint32 + + kadServiceLookupIntervalSec* {. + desc: "Interval seconds between service-specific kademlia lookups.", + defaultValue: 60, + name: "kad-service-lookup-interval" + .}: uint32 + ## websocket config websocketSupport* {. desc: "Enable websocket: true|false", @@ -1192,6 +1204,15 @@ proc toWakuConf*(n: WakuNodeConf): ConfResult[WakuConf] = b.kademliaDiscoveryConf.withEnabled(n.enableKadDiscovery) b.kademliaDiscoveryConf.withBootstrapNodes(n.kadBootstrapNodes) + if n.kadRandomLookupIntervalSec > 0: + b.kademliaDiscoveryConf.withRandomLookupInterval( + chronos.seconds(n.kadRandomLookupIntervalSec.int64) + ) + if n.kadServiceLookupIntervalSec > 0: + b.kademliaDiscoveryConf.withServiceLookupInterval( + chronos.seconds(n.kadServiceLookupIntervalSec.int64) + ) + # Mode-driven configuration overrides case n.mode of WakuMode.Core: