feat: integrate service discovery (#3947)

---------

Co-authored-by: Fabiana Cecin <fabiana@waku.org>
This commit is contained in:
Simon-Pierre Vivier 2026-06-18 12:51:27 -04:00 committed by GitHub
parent 03efe6766c
commit a73035e28d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 618 additions and 276 deletions

View File

@ -28,6 +28,8 @@ import
# manage the information of a peer, such as peer ID and public / private key
peerid, # Implement how peers interact
protobuf/minprotobuf, # message serialisation/deserialisation from and to protobufs
protocols/kademlia/types,
protocols/service_discovery/types as sd_types,
nameresolving/dnsresolver,
protocols/mix/curve25519,
protocols/mix/mix_protocol,
@ -35,6 +37,7 @@ import
import
logos_delivery/waku/[
waku_core,
waku_core/peers,
waku_lightpush/common,
waku_lightpush/rpc,
waku_enr,
@ -43,6 +46,7 @@ import
waku_node,
node/waku_metrics,
node/peer_manager,
factory/waku_conf,
factory/builder,
common/utils/nat,
waku_store/common,
@ -459,29 +463,27 @@ proc processInput(rfd: AsyncFD, rng: crypto.Rng) {.async.} =
if conf.kadBootstrapNodes.len > 0:
var kadBootstrapPeers: seq[(PeerId, seq[MultiAddress])]
for nodeStr in conf.kadBootstrapNodes:
let (peerId, ma) = parseFullAddress(nodeStr).valueOr:
error "Failed to parse kademlia bootstrap node", node = nodeStr, error = error
continue
let (peerId, ma) = block:
parseFullAddress(nodeStr).isOkOr:
error "Failed to parse kademlia bootstrap node", node = nodeStr, error
continue
kadBootstrapPeers.add((peerId, @[ma]))
if kadBootstrapPeers.len > 0:
node.wakuKademlia = WakuKademlia.new(
node.switch,
ExtendedKademliaDiscoveryParams(
node.mountKademlia(
KademliaDiscoveryConf(
bootstrapNodes: kadBootstrapPeers,
mixPubKey: some(mixPubKey),
advertiseMix: false,
),
node.peerManager,
getMixNodePoolSize = proc(): int {.gcsafe, raises: [].} =
if node.wakuMix.isNil():
0
else:
node.getMixNodePoolSize(),
isNodeStarted = proc(): bool {.gcsafe, raises: [].} =
node.started,
).valueOr:
error "failed to setup kademlia discovery", error = error
servicesToDiscover: @[MixProtocolID],
randomLookupInterval: chronos.seconds(60),
serviceLookupInterval: chronos.seconds(60),
kadDhtConfig: KadDHTConfig.new(),
discoConfig: sd_types.ServiceDiscoveryConfig.new(),
clientMode: false,
xprPublishing: true,
)
).isOkOr:
error "failed to setup service discovery", error = error
quit(QuitFailure)
#await node.mountRendezvousClient(conf.clusterId)
@ -489,10 +491,6 @@ proc processInput(rfd: AsyncFD, rng: crypto.Rng) {.async.} =
await node.start()
node.peerManager.start()
if not node.wakuKademlia.isNil():
(await node.wakuKademlia.start(minMixPeers = MinMixNodePoolSize)).isOkOr:
error "failed to start kademlia discovery", error = error
quit(QuitFailure)
await node.mountLibp2pPing()
#await node.mountPeerExchangeClient()

View File

@ -1,149 +1,94 @@
import logos_delivery/waku/compat/option_valueor
{.push raises: [].}
import std/[options, sequtils]
import std/[options, sequtils, sets]
import
chronos,
chronicles,
results,
stew/byteutils,
libp2p/[peerid, multiaddress, switch],
libp2p/[peerid, multiaddress, switch, extended_peer_record],
libp2p/extended_peer_record,
libp2p/crypto/crypto,
libp2p/crypto/rng,
libp2p/crypto/curve25519,
libp2p/protocols/[kademlia, service_discovery],
libp2p/protocols/service_discovery/types as svdisc_types,
libp2p_mix/mix_protocol
libp2p/protocols/service_discovery,
libp2p/protocols/service_discovery/types,
libp2p/protocols/kademlia/types,
libp2p_mix/mix_protocol,
libp2p_mix/curve25519
import logos_delivery/waku/waku_core, logos_delivery/waku/node/peer_manager
import
logos_delivery/waku/waku_core,
logos_delivery/waku/node/peer_manager,
logos_delivery/waku/events/discovery_events
logScope:
topics = "waku extended kademlia discovery"
topics = "waku service discovery"
const
DefaultExtendedKademliaDiscoveryInterval* = chronos.seconds(5)
ExtendedKademliaDiscoveryStartupDelay* = chronos.seconds(5)
DefaultServiceDiscoveryInterval* = chronos.seconds(60)
DefaultRandomDiscoveryInterval* = chronos.seconds(60)
type
MixNodePoolSizeProvider* = proc(): int {.gcsafe, raises: [].}
NodeStartedProvider* = proc(): bool {.gcsafe, raises: [].}
type WakuKademlia* = ref object
protocol*: ServiceDiscovery
peerManager: PeerManager
randomLookupLoop: Future[void]
serviceLookupLoop: Future[void]
randomLookupInterval: Duration
serviceLookupInterval: Duration
servicesToDiscover: HashSet[string]
servicesToAdvertise: HashSet[ServiceInfo]
ExtendedKademliaDiscoveryParams* = object
bootstrapNodes*: seq[(PeerId, seq[MultiAddress])]
mixPubKey*: Option[Curve25519Key]
advertiseMix*: bool = false
type KademliaDiscoveryConf* = object
bootstrapNodes*: seq[(PeerId, seq[MultiAddress])]
servicesToAdvertise*: HashSet[ServiceInfo]
servicesToDiscover*: HashSet[string]
randomLookupInterval*: Duration
serviceLookupInterval*: Duration
kadDhtConfig*: KadDHTConfig
discoConfig*: ServiceDiscoveryConfig
clientMode*: bool
xprPublishing*: bool
WakuKademlia* = ref object
protocol*: ServiceDiscovery
peerManager: PeerManager
discoveryLoop: Future[void]
running*: bool
getMixNodePoolSize: MixNodePoolSizeProvider
isNodeStarted: NodeStartedProvider
proc new*(
T: type WakuKademlia,
switch: Switch,
params: ExtendedKademliaDiscoveryParams,
peerManager: PeerManager,
getMixNodePoolSize: MixNodePoolSizeProvider = nil,
isNodeStarted: NodeStartedProvider = nil,
): Result[T, string] =
if params.bootstrapNodes.len == 0:
info "creating kademlia discovery as seed node (no bootstrap nodes)"
let kademlia = ServiceDiscovery.new(
switch,
bootstrapNodes = params.bootstrapNodes,
config = KadDHTConfig.new(
validator = svdisc_types.ExtEntryValidator(),
selector = svdisc_types.ExtEntrySelector(),
),
rng = switch.rng,
codec = ExtendedServiceDiscoveryCodec,
)
try:
switch.mount(kademlia)
except CatchableError:
return err("failed to mount kademlia discovery: " & getCurrentExceptionMsg())
# Register services BEFORE starting kademlia so they are included in the
# initial self-signed peer record published to the DHT
if params.advertiseMix:
if params.mixPubKey.isSome():
kademlia.startAdvertising(
ServiceInfo(id: MixProtocolID, data: @(params.mixPubKey.get()))
)
debug "extended kademlia advertising mix service",
keyHex = byteutils.toHex(params.mixPubKey.get()),
bootstrapNodes = params.bootstrapNodes.len
else:
warn "mix advertising enabled but no key provided"
info "kademlia discovery created",
bootstrapNodes = params.bootstrapNodes.len, advertiseMix = params.advertiseMix
return ok(
WakuKademlia(
protocol: kademlia,
peerManager: peerManager,
running: false,
getMixNodePoolSize: getMixNodePoolSize,
isNodeStarted: isNodeStarted,
)
)
proc extractMixPubKey(service: ServiceInfo): Option[Curve25519Key] =
proc extractMixPubKey*(service: ServiceInfo): Option[Curve25519Key] =
if service.id != MixProtocolID:
trace "service is not mix protocol",
serviceId = service.id, mixProtocolId = MixProtocolID
return none(Curve25519Key)
if service.data.len != Curve25519KeySize:
warn "invalid mix pub key length from kademlia record",
trace "invalid mix pub key length",
expected = Curve25519KeySize,
actual = service.data.len,
dataHex = byteutils.toHex(service.data)
return none(Curve25519Key)
debug "found mix protocol service",
dataLen = service.data.len, expectedLen = Curve25519KeySize
let key = intoCurve25519Key(service.data)
debug "successfully extracted mix pub key", keyHex = byteutils.toHex(key)
return some(key)
proc remotePeerInfoFrom(record: ExtendedPeerRecord): Option[RemotePeerInfo] =
debug "processing kademlia record",
peerId = record.peerId,
numAddresses = record.addresses.len,
numServices = record.services.len,
serviceIds = record.services.mapIt(it.id)
proc remotePeerInfoFrom*(record: ExtendedPeerRecord): Option[RemotePeerInfo] =
if record.addresses.len == 0:
trace "kademlia record missing addresses", peerId = record.peerId
trace "missing addresses", peerId = record.peerId
return none(RemotePeerInfo)
let addrs = record.addresses.mapIt(it.address)
if addrs.len == 0:
trace "kademlia record produced no dialable addresses", peerId = record.peerId
trace "no dialable addresses", peerId = record.peerId
return none(RemotePeerInfo)
let protocols = record.services.mapIt(it.id)
var mixPubKey = none(Curve25519Key)
var mixPubKey: Option[Curve25519Key] = none(Curve25519Key)
for service in record.services:
debug "checking service",
peerId = record.peerId, serviceId = service.id, dataLen = service.data.len
mixPubKey = extractMixPubKey(service)
if mixPubKey.isSome():
debug "extracted mix public key from service", peerId = record.peerId
break
let key = extractMixPubKey(service).valueOr:
continue
mixPubKey = some(key)
trace "successfully extracted mix pub key",
peerId = record.peerId, keyHex = byteutils.toHex(mixPubKey.get())
break
if record.services.len > 0 and mixPubKey.isNone():
debug "record has services but no valid mix key",
peerId = record.peerId, services = record.services.mapIt(it.id)
return none(RemotePeerInfo)
return some(
RemotePeerInfo.init(
record.peerId,
@ -154,130 +99,182 @@ proc remotePeerInfoFrom(record: ExtendedPeerRecord): Option[RemotePeerInfo] =
)
)
proc lookupMixPeers*(
wk: WakuKademlia
): Future[Result[int, string]] {.async: (raises: []).} =
## Lookup mix peers via kademlia and add them to the peer store.
## Returns the number of mix peers found and added.
if wk.protocol.isNil():
return err("cannot lookup mix peers: kademlia not mounted")
let mixService = ServiceInfo(id: MixProtocolID, data: @[])
var records: seq[ExtendedPeerRecord]
try:
let advertisements = (await wk.protocol.lookup(mixService)).valueOr:
return err("mix peer lookup failed: " & $error)
records = advertisements.mapIt(it.data)
except CatchableError:
return err("mix peer lookup failed: " & getCurrentExceptionMsg())
debug "mix peer lookup returned records", numRecords = records.len
var added = 0
proc processRecords(
self: WakuKademlia, records: seq[ExtendedPeerRecord], source: string
): seq[RemotePeerInfo] =
var discovered: seq[RemotePeerInfo]
for record in records:
let peerOpt = remotePeerInfoFrom(record)
if peerOpt.isNone():
let peerInfo = remotePeerInfoFrom(record).valueOr:
continue
let peerInfo = peerOpt.get()
if peerInfo.mixPubKey.isNone():
self.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia)
debug "peer added via service discovery",
source,
peerId = $peerInfo.peerId,
addresses = peerInfo.addrs.mapIt($it),
protocols = peerInfo.protocols
discovered.add(peerInfo)
return discovered
proc lookupServicePeers*(
self: WakuKademlia, service: string
): Future[Result[seq[RemotePeerInfo], string]] {.async: (raises: []).} =
if self.protocol.isNil():
return err("cannot lookup service peers: service discovery not mounted")
let serviceId = service.hashServiceId()
let lookupCatch = catch:
(await self.protocol.lookup(serviceId))
let lookupResult = lookupCatch.valueOr:
return err("service peer lookup failed: " & error.msg)
let advertisements = lookupResult.valueOr:
return err("service peer lookup failed: " & lookupResult.error)
let records = advertisements.mapIt(it.data)
let discovered = self.processRecords(records, "service lookup")
debug "service lookup complete", service, found = discovered.len
return ok(discovered)
proc runRandomLookupLoop(self: WakuKademlia) {.async: (raises: [CancelledError]).} =
debug "periodic random lookup started", interval = $self.randomLookupInterval
while true:
await sleepAsync(self.randomLookupInterval)
let recordsRes = catch:
(await self.protocol.lookupRandom())
let records = recordsRes.valueOr:
error "random lookup failed", error
continue
wk.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia)
info "mix peer added via kademlia lookup",
peerId = $peerInfo.peerId, mixPubKey = byteutils.toHex(peerInfo.mixPubKey.get())
added.inc()
let discovered = self.processRecords(records, "random walk")
info "mix peer lookup complete", found = added
return ok(added)
if discovered.len > 0:
PeersDiscoveredEvent.emit(peers = discovered)
proc runDiscoveryLoop(
wk: WakuKademlia, interval: Duration, minMixPeers: int
) {.async: (raises: []).} =
info "extended kademlia discovery loop started", interval = interval
debug "random lookup complete", found = discovered.len
try:
while true:
# Wait for node to be started
if not wk.isNodeStarted.isNil() and not wk.isNodeStarted():
await sleepAsync(ExtendedKademliaDiscoveryStartupDelay)
proc runServiceLookupLoop(self: WakuKademlia) {.async: (raises: [CancelledError]).} =
debug "periodic service lookup started",
interval = $self.serviceLookupInterval, services = self.servicesToDiscover
while true:
await sleepAsync(self.serviceLookupInterval)
let futs = self.servicesToDiscover.mapIt(self.lookupServicePeers(it))
let finishedFuts = await allFinished(futs)
var discovered: seq[RemotePeerInfo]
for fut in finishedFuts:
let catchRes = catch:
fut.read()
let res = catchRes.valueOr:
error "service lookup failed", error
continue
var records: seq[ExtendedPeerRecord]
try:
records = await wk.protocol.lookupRandom()
except CatchableError as e:
warn "extended kademlia discovery failed", error = e.msg
await sleepAsync(interval)
let peerInfos = res.valueOr:
error "service lookup failed", error
continue
debug "received random records from kademlia", numRecords = records.len
for peerInfo in peerInfos:
discovered.add(peerInfo)
var added = 0
for record in records:
let peerOpt = remotePeerInfoFrom(record)
if peerOpt.isNone():
continue
if discovered.len > 0:
PeersDiscoveredEvent.emit(peers = discovered)
let peerInfo = peerOpt.get()
wk.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia)
debug "peer added via extended kademlia discovery",
peerId = $peerInfo.peerId,
addresses = peerInfo.addrs.mapIt($it),
protocols = peerInfo.protocols,
hasMixPubKey = peerInfo.mixPubKey.isSome()
added.inc()
proc new*(
T: type WakuKademlia,
switch: Switch,
peerManager: PeerManager,
bootstrapNodes: seq[(PeerId, seq[MultiAddress])],
servicesToAdvertise: HashSet[ServiceInfo],
servicesToDiscover: HashSet[string],
randomLookupInterval: Duration = DefaultRandomDiscoveryInterval,
serviceLookupInterval: Duration = DefaultServiceDiscoveryInterval,
rng: Rng,
kadDhtConfig: KadDHTConfig = KadDHTConfig.new(),
discoConfig: ServiceDiscoveryConfig = ServiceDiscoveryConfig.new(),
clientMode: bool = false,
xprPublishing: bool = true,
): Result[T, string] =
if bootstrapNodes.len == 0:
debug "creating service discovery as seed node (no bootstrap nodes)"
if added > 0:
info "added peers from extended kademlia discovery", count = added
let protocol = ServiceDiscovery.new(
switch,
bootstrapNodes = bootstrapNodes,
config = kadDhtConfig,
rng = rng,
client = clientMode,
services = servicesToAdvertise.toSeq(),
discoConfig = discoConfig,
xprPublishing = xprPublishing,
)
# Targeted mix peer lookup when pool is low
if minMixPeers > 0 and not wk.getMixNodePoolSize.isNil() and
wk.getMixNodePoolSize() < minMixPeers:
debug "mix node pool below threshold, performing targeted lookup",
currentPoolSize = wk.getMixNodePoolSize(), threshold = minMixPeers
let found = (await wk.lookupMixPeers()).valueOr:
warn "targeted mix peer lookup failed", error = error
0
if found > 0:
info "found mix peers via targeted kademlia lookup", count = found
let self = WakuKademlia(
protocol: protocol,
peerManager: peerManager,
randomLookupInterval: randomLookupInterval,
serviceLookupInterval: serviceLookupInterval,
servicesToDiscover: servicesToDiscover,
servicesToAdvertise: servicesToAdvertise,
)
await sleepAsync(interval)
except CancelledError as e:
debug "extended kademlia discovery loop cancelled", error = e.msg
except CatchableError as e:
error "extended kademlia discovery loop failed", error = e.msg
return ok(self)
proc start*(
wk: WakuKademlia,
interval: Duration = DefaultExtendedKademliaDiscoveryInterval,
minMixPeers: int = 0,
): Future[Result[void, string]] {.async: (raises: []).} =
if wk.running:
return err("already running")
proc start*(self: WakuKademlia) {.async: (raises: []).} =
for serviceId in self.servicesToDiscover:
discard self.protocol.registerInterest(serviceId)
try:
await wk.protocol.start()
except CatchableError as e:
return err("failed to start kademlia discovery: " & e.msg)
if self.randomLookupLoop.isNil():
self.randomLookupLoop = self.runRandomLookupLoop()
wk.discoveryLoop = wk.runDiscoveryLoop(interval, minMixPeers)
if self.serviceLookupLoop.isNil():
self.serviceLookupLoop = self.runServiceLookupLoop()
info "kademlia discovery started"
return ok()
proc stop*(wk: WakuKademlia) {.async: (raises: []).} =
if not wk.running:
return
proc stop*(self: WakuKademlia) {.async: (raises: []).} =
if not self.serviceLookupLoop.isNil():
await self.serviceLookupLoop.cancelAndWait()
self.serviceLookupLoop = nil
info "Stopping kademlia discovery"
if not self.randomLookupLoop.isNil():
await self.randomLookupLoop.cancelAndWait()
self.randomLookupLoop = nil
wk.running = false
info "kademlia discovery stopped"
if not wk.discoveryLoop.isNil():
await wk.discoveryLoop.cancelAndWait()
wk.discoveryLoop = nil
proc addServiceToDiscover*(self: WakuKademlia, service: string) =
if not self.servicesToDiscover.containsOrIncl(service):
discard self.protocol.registerInterest(service)
debug "added service to discover", service
if not wk.protocol.isNil():
await wk.protocol.stop()
info "Successfully stopped kademlia discovery"
proc addServiceToAdvertise*(self: WakuKademlia, service: ServiceInfo) =
if not self.servicesToAdvertise.containsOrIncl(service):
self.protocol.startAdvertising(service)
debug "added service to advertise", service = service.id
proc removeServiceToDiscover*(self: WakuKademlia, service: string) =
if not self.servicesToDiscover.missingOrExcl(service):
self.protocol.unregisterInterest(service)
debug "removed service to discover", service
proc removeServiceToAdvertise*(
self: WakuKademlia, service: ServiceInfo
) {.async: (raises: [CancelledError]).} =
if not self.servicesToAdvertise.missingOrExcl(service):
await self.protocol.stopAdvertising(service.id)
debug "removed service to advertise", service = service.id

View File

@ -0,0 +1,15 @@
import libp2p/peerinfo, brokers/[event_broker, request_broker]
import logos_delivery/waku/waku_core
EventBroker:
# Event emitted when peers are discovered via random or service lookup
type PeersDiscoveredEvent* = object
peers*: seq[RemotePeerInfo]
RequestBroker:
# Request broker for on-demand service peer lookup
type ServicePeersRequest* = object
serviceId*: string
peers*: seq[RemotePeerInfo]
proc signature*(serviceId: string): Future[Result[ServicePeersRequest, string]]

View File

@ -1,3 +1,9 @@
import ./[message_events, delivery_events, health_events, peer_events, lifecycle_events]
import
./[
message_events, delivery_events, health_events, peer_events, lifecycle_events,
discovery_events,
]
export message_events, delivery_events, health_events, peer_events, lifecycle_events
export
message_events, delivery_events, health_events, peer_events, lifecycle_events,
discovery_events

View File

@ -1,19 +1,24 @@
import logos_delivery/waku/compat/option_valueor
import chronicles, std/options, results
import logos_delivery/waku/discovery/waku_kademlia
import chronos
import libp2p/[peerid, multiaddress, peerinfo]
import logos_delivery/waku/factory/waku_conf
import libp2p/protocols/kademlia/types
import libp2p/protocols/service_discovery/types as sd_types
logScope:
topics = "waku conf builder kademlia discovery"
const DefaultKadEnabled*: bool = false
const
DefaultKadEnabled*: bool = false
DefaultRandomLookupInterval* = chronos.seconds(60)
DefaultServiceLookupInterval* = chronos.seconds(60)
#######################################
## Kademlia Discovery Config Builder ##
#######################################
type KademliaDiscoveryConfBuilder* = object
enabled*: Option[bool]
bootstrapNodes*: seq[string]
randomLookupInterval*: Option[Duration]
serviceLookupInterval*: Option[Duration]
proc init*(T: type KademliaDiscoveryConfBuilder): KademliaDiscoveryConfBuilder =
KademliaDiscoveryConfBuilder()
@ -26,6 +31,16 @@ proc withBootstrapNodes*(
) =
b.bootstrapNodes = bootstrapNodes
proc withRandomLookupInterval*(
b: var KademliaDiscoveryConfBuilder, interval: Duration
) =
b.randomLookupInterval = some(interval)
proc withServiceLookupInterval*(
b: var KademliaDiscoveryConfBuilder, interval: Duration
) =
b.serviceLookupInterval = some(interval)
proc build*(
b: KademliaDiscoveryConfBuilder
): Result[Option[KademliaDiscoveryConf], string] =
@ -35,10 +50,23 @@ proc build*(
# Otherwise enabled if config-enabled or any bootstrap nodes are provided.
if not b.enabled.get(DefaultKadEnabled) and b.bootstrapNodes.len == 0:
return ok(none(KademliaDiscoveryConf))
var parsedNodes: seq[(PeerId, seq[MultiAddress])]
for nodeStr in b.bootstrapNodes:
let (peerId, ma) = parseFullAddress(nodeStr).valueOr:
return err("Failed to parse kademlia bootstrap node: " & error)
parsedNodes.add((peerId, @[ma]))
return ok(some(KademliaDiscoveryConf(bootstrapNodes: parsedNodes)))
return ok(
some(
KademliaDiscoveryConf(
bootstrapNodes: parsedNodes,
randomLookupInterval: b.randomLookupInterval.get(DefaultRandomLookupInterval),
serviceLookupInterval: b.serviceLookupInterval.get(DefaultServiceLookupInterval),
kadDhtConfig: KadDHTConfig.new(),
discoConfig: sd_types.ServiceDiscoveryConfig.new(),
clientMode: false,
xprPublishing: true,
)
)
)

View File

@ -8,7 +8,9 @@ import
libp2p/protocols/connectivity/relay/relay,
libp2p/nameresolving/dnsresolver,
libp2p/crypto/crypto,
libp2p/crypto/curve25519
libp2p/crypto/curve25519,
libp2p/extended_peer_record,
libp2p_mix/mix_protocol
import
./internal_config,
@ -36,7 +38,8 @@ import
../node/peer_manager/peer_store/waku_peer_storage,
../node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations,
../waku_lightpush_legacy/common,
../common/rate_limit/setting
../common/rate_limit/setting,
../events/discovery_events
## Peer persistence
@ -167,31 +170,29 @@ proc setupProtocols(
(await node.mountMix(conf.clusterId, mixConf.mixKey, mixConf.mixnodes)).isOkOr:
return err("failed to mount waku mix protocol: " & $error)
# Setup extended kademlia discovery
# Setup service discovery
if conf.kademliaDiscoveryConf.isSome():
let mixPubKey =
if conf.mixConf.isSome():
some(conf.mixConf.get().mixPubKey)
else:
none(Curve25519Key)
var kadConf = conf.kademliaDiscoveryConf.get()
node.wakuKademlia = WakuKademlia.new(
node.switch,
ExtendedKademliaDiscoveryParams(
bootstrapNodes: conf.kademliaDiscoveryConf.get().bootstrapNodes,
mixPubKey: mixPubKey,
advertiseMix: conf.mixConf.isSome(),
),
node.peerManager,
getMixNodePoolSize = proc(): int {.gcsafe, raises: [].} =
if node.wakuMix.isNil():
0
else:
node.getMixNodePoolSize(),
isNodeStarted = proc(): bool {.gcsafe, raises: [].} =
node.started,
).valueOr:
return err("failed to setup kademlia discovery: " & error)
if conf.mixConf.isSome():
let mixService =
ServiceInfo(id: MixProtocolID, data: @(conf.mixConf.get().mixPubKey))
kadConf.servicesToAdvertise.incl(mixService)
kadConf.servicesToDiscover.incl(mixService.id)
node.mountKademlia(kadConf).isOkOr:
return err("failed to setup service discovery: " & error)
# Register ServicePeersRequest provider
ServicePeersRequest.setProvider(
node.brokerCtx,
proc(serviceId: string): Future[Result[ServicePeersRequest, string]] {.async.} =
let peers = (await node.wakuKademlia.lookupServicePeers(serviceId)).valueOr:
return err("failed call to lookupServicePeers: " & error)
return ok(ServicePeersRequest(serviceId: serviceId, peers: peers)),
).isOkOr:
error "Can't set provider for ServicePeersRequest", error = error
return err("Can't set provider for ServicePeersRequest: " & error)
if conf.storeServiceConf.isSome():
let storeServiceConf = conf.storeServiceConf.get()
@ -452,11 +453,6 @@ proc startNode*(
if conf.relay:
node.peerManager.start()
if not node.wakuKademlia.isNil():
let minMixPeers = if conf.mixConf.isSome(): 4 else: 0
(await node.wakuKademlia.start(minMixPeers = minMixPeers)).isOkOr:
return err("failed to start kademlia discovery: " & error)
return ok()
proc setupNode*(

View File

@ -1,10 +1,14 @@
import
std/[net, options, strutils],
chronicles,
chronos,
libp2p/crypto/crypto,
libp2p/multiaddress,
libp2p/crypto/curve25519,
libp2p/peerid,
libp2p/extended_peer_record,
libp2p/protocols/kademlia/types,
libp2p/protocols/service_discovery/types as sd_types,
secp256k1,
results
@ -12,12 +16,14 @@ import
../waku_rln_relay/rln_relay,
../rest_api/endpoint/builder,
../discovery/waku_discv5,
../discovery/waku_kademlia,
../node/waku_metrics,
../common/logging,
../common/rate_limit/setting,
../waku_enr/capabilities,
./networks_config,
../waku_mix
../waku_mix,
./conf_builder/kademlia_discovery_conf_builder
export RlnRelayConf, RlnRelayCreds, RestServerConf, Discv5Conf, MetricsServerConf
@ -55,10 +61,6 @@ type MixConf* = ref object
mixPubKey*: Curve25519Key
mixnodes*: seq[MixNodePubInfo]
type KademliaDiscoveryConf* = object
bootstrapNodes*: seq[(PeerId, seq[MultiAddress])]
## Bootstrap nodes for extended kademlia discovery.
type StoreServiceConf* {.requiresInit.} = object
dbMigration*: bool
dbURl*: string

View File

@ -134,7 +134,6 @@ type
## Kernel API Relay appHandlers (if any)
subscriptionManager*: SubscriptionManager
wakuMix*: WakuMix
kademliaDiscoveryLoop*: Future[void]
wakuKademlia*: WakuKademlia
ports*: BoundPorts
@ -337,6 +336,29 @@ proc mountMix*(
return err(error.msg)
return ok()
proc mountKademlia*(
node: WakuNode, config: KademliaDiscoveryConf
): Result[void, string] =
if not node.wakuKademlia.isNil():
return err("WakuKademlia already mounted, skipping")
let wk = WakuKademlia.new(
node.switch, node.peerManager, config.bootstrapNodes, config.servicesToAdvertise,
config.servicesToDiscover, config.randomLookupInterval,
config.serviceLookupInterval, node.rng, config.kadDhtConfig, config.discoConfig,
config.clientMode, config.xprPublishing,
).valueOr:
return err("failed to create service discovery: " & error)
node.wakuKademlia = wk
let mountRes = catch:
node.switch.mount(wk.protocol)
mountRes.isOkOr:
return err("failed to mount service discovery: " & error.msg)
return ok()
## Waku Sync
proc mountStoreSync*(
@ -603,6 +625,9 @@ proc start*(node: WakuNode) {.async.} =
node.started = true
if not node.wakuKademlia.isNil():
await node.wakuKademlia.start()
if not node.wakuFilterClient.isNil():
node.wakuFilterClient.registerPushHandler(
proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
@ -629,6 +654,9 @@ proc stop*(node: WakuNode) {.async.} =
node.stopProvidersAndListeners()
if not node.wakuKademlia.isNil():
await node.wakuKademlia.stop()
## NOTE: This will dispatch gossipsub stop to the WakuRelay.stop method override
await node.switch.stop()
@ -650,9 +678,6 @@ proc stop*(node: WakuNode) {.async.} =
not node.wakuPeerExchangeClient.pxLoopHandle.isNil():
await node.wakuPeerExchangeClient.pxLoopHandle.cancelAndWait()
if not node.wakuKademlia.isNil():
await node.wakuKademlia.stop()
if not node.wakuRendezvousClient.isNil():
await node.wakuRendezvousClient.stopWait()

View File

@ -68,7 +68,8 @@ import
./test_waku_switch,
./test_waku_rendezvous,
./test_waku_metadata,
./waku_discv5/test_waku_discv5
./waku_discv5/test_waku_discv5,
./waku_kademlia/test_waku_kademlia
# Waku Keystore test suite
import ./test_waku_keystore_keyfile, ./test_waku_keystore

View File

@ -0,0 +1,203 @@
import
std/[options, sequtils, strutils],
results,
chronos,
chronicles,
testutils/unittests,
libp2p/crypto/crypto as libp2p_keys,
libp2p/crypto/curve25519,
libp2p/[peerid, multiaddress, switch, extended_peer_record],
libp2p/extended_peer_record,
libp2p/protocols/service_discovery/types as sd_types,
libp2p_mix/mix_protocol
import
logos_delivery/waku/discovery/waku_kademlia,
logos_delivery/waku/waku_core/peers,
logos_delivery/waku/node/peer_manager/waku_peer_store
import ../testlib/[wakucore, testasync, assertions, futures, testutils]
import ./utils as kad_utils
suite "Waku Kademlia service discovery":
asyncTest "seed node starts with no bootstrap nodes":
let
switch = newTestSwitch()
wk = kad_utils.newTestKademlia(
switch, servicesToAdvertise = @[ServiceInfo(id: "/seed/svc/1.0.0", data: @[])]
)
await switch.start()
await wk.start()
await sleepAsync(FUTURE_TIMEOUT)
check:
not wk.protocol.isNil()
await wk.stop()
await switch.stop()
suite "extractMixPubKey":
proc validKeyBytes(): seq[byte] =
var b = newSeq[byte](Curve25519KeySize)
for i in 0 ..< Curve25519KeySize:
b[i] = byte(i)
b
test "non-mix service returns none":
let svc = ServiceInfo(id: "/foo/1.0.0", data: validKeyBytes())
check:
extractMixPubKey(svc).isNone()
test "mix service with wrong data length returns none":
let svc = ServiceInfo(id: MixProtocolID, data: @[0u8, 1u8, 2u8])
check:
extractMixPubKey(svc).isNone()
test "mix service with correct length returns some":
let bytes = validKeyBytes()
let svc = ServiceInfo(id: MixProtocolID, data: bytes)
let res = extractMixPubKey(svc)
require:
res.isSome()
let key = res.get()
check:
key.getBytes() == bytes
test "round-trip matches intoCurve25519Key on raw bytes":
let bytes = validKeyBytes()
let svc = ServiceInfo(id: MixProtocolID, data: bytes)
let extracted = extractMixPubKey(svc).get()
let direct = intoCurve25519Key(bytes)
check:
extracted.getBytes() == direct.getBytes()
suite "remotePeerInfoFrom":
proc randomPeerId(): PeerId =
PeerId.init(generateSecp256k1Key()).tryGet()
proc testAddr(port: uint16): MultiAddress =
MultiAddress.init("/ip4/127.0.0.1/tcp/" & $port).tryGet()
proc mixService(data: seq[byte]): ServiceInfo =
ServiceInfo(id: MixProtocolID, data: data)
proc validMixService(): ServiceInfo =
var b = newSeq[byte](Curve25519KeySize)
for i in 0 ..< Curve25519KeySize:
b[i] = byte(i)
mixService(b)
test "empty addresses returns none":
let record = buildExtendedPeerRecord(randomPeerId(), @[])
check:
remotePeerInfoFrom(record).isNone()
test "origin set to PeerOrigin.Kademlia":
let
pid = randomPeerId()
record = buildExtendedPeerRecord(pid, @[testAddr(61600)])
res = remotePeerInfoFrom(record)
require:
res.isSome()
let peerInfo = res.get()
check:
peerInfo.origin == PeerOrigin.Kademlia
peerInfo.peerId == pid
test "mixPubKey extracted from first mix service":
let
pid = randomPeerId()
svc = validMixService()
record = buildExtendedPeerRecord(pid, @[testAddr(61600)], @[svc])
res = remotePeerInfoFrom(record)
require:
res.isSome()
let peerInfo = res.get()
check:
peerInfo.mixPubKey.isSome()
peerInfo.mixPubKey.get().getBytes() == svc.data
test "mixPubKey stays none when no mix service present":
let
pid = randomPeerId()
svc = ServiceInfo(id: "/other/1.0.0", data: @[1u8])
record = buildExtendedPeerRecord(pid, @[testAddr(61600)], @[svc])
res = remotePeerInfoFrom(record)
require:
res.isSome()
check:
res.get().mixPubKey.isNone()
test "addresses mapped correctly":
let
pid = randomPeerId()
addrs = @[testAddr(61600), testAddr(61601), testAddr(61602)]
record = buildExtendedPeerRecord(pid, addrs)
res = remotePeerInfoFrom(record)
require:
res.isSome()
let peerInfo = res.get()
check:
peerInfo.addrs.len == 3
peerInfo.addrs == addrs
test "multiple mix services, first one wins":
let
pid = randomPeerId()
firstBytes = block:
var b = newSeq[byte](Curve25519KeySize)
for i in 0 ..< Curve25519KeySize:
b[i] = byte(i)
b
secondBytes = block:
var b = newSeq[byte](Curve25519KeySize)
for i in 0 ..< Curve25519KeySize:
b[i] = byte(i + 100)
b
record = buildExtendedPeerRecord(
pid, @[testAddr(61600)], @[mixService(firstBytes), mixService(secondBytes)]
)
res = remotePeerInfoFrom(record)
require:
res.isSome()
check:
res.get().mixPubKey.get().getBytes() == firstBytes
test "mix service with bad key length is skipped silently":
let
pid = randomPeerId()
badSvc = mixService(@[0u8, 1u8, 2u8])
record = buildExtendedPeerRecord(pid, @[testAddr(61600)], @[badSvc])
res = remotePeerInfoFrom(record)
require:
res.isSome()
let peerInfo = res.get()
check:
peerInfo.peerId == pid
peerInfo.mixPubKey.isNone()
suite "lookupServicePeers":
asyncTest "returns err when protocol is nil":
let
switch = newTestSwitch()
wk = kad_utils.newTestKademlia(switch)
wk.protocol = nil
let res = await wk.lookupServicePeers("/some/service/1.0.0")
check:
res.isErr()
res.error.contains("service discovery not mounted")
asyncTest "returns ok with empty seq when no advertisements":
let
switch = newTestSwitch()
wk = kad_utils.newTestKademlia(switch)
await switch.start()
await wk.start()
let res = await wk.lookupServicePeers("/nonexistent/service/1.0.0")
check:
res.isOk()
res.value.len == 0
await wk.stop()
await switch.stop()

View File

@ -0,0 +1,50 @@
{.used.}
import std/[options, sets]
import chronos, chronicles, results
import libp2p/[peerid, multiaddress, switch]
import libp2p/extended_peer_record
import libp2p/protocols/service_discovery/types as sd_types
import libp2p/crypto/crypto as libp2p_keys
import
logos_delivery/waku/discovery/waku_kademlia,
logos_delivery/waku/node/peer_manager/peer_manager
import ../testlib/[wakucore, common]
export wakucore, common, peerid, multiaddress, switch, extended_peer_record, sd_types
proc newTestKademlia*(
switch: Switch,
bootstrapNodes: seq[(PeerId, seq[MultiAddress])] = @[],
servicesToAdvertise: seq[ServiceInfo] = @[],
servicesToDiscover: seq[string] = @[],
randomLookupInterval: Duration = 100.milliseconds,
serviceLookupInterval: Duration = 100.milliseconds,
clientMode: bool = false,
xprPublishing: bool = true,
): WakuKademlia =
let peerManager = PeerManager.new(switch)
let wk = WakuKademlia
.new(
switch = switch,
peerManager = peerManager,
bootstrapNodes = bootstrapNodes,
servicesToAdvertise = toHashSet(servicesToAdvertise),
servicesToDiscover = toHashSet(servicesToDiscover),
randomLookupInterval = randomLookupInterval,
serviceLookupInterval = serviceLookupInterval,
rng = rng(),
clientMode = clientMode,
xprPublishing = xprPublishing,
)
.tryGet()
switch.mount(wk.protocol)
wk
proc buildExtendedPeerRecord*(
peerId: PeerId, addrs: seq[MultiAddress], services: seq[ServiceInfo] = @[]
): ExtendedPeerRecord =
ExtendedPeerRecord.init(peerId = peerId, addresses = addrs, services = services)

View File

@ -685,6 +685,18 @@ hence would have reachability issues.""",
name: "kad-bootstrap-node"
.}: seq[string]
kadRandomLookupIntervalSec* {.
desc: "Interval seconds between random kademlia lookups.",
defaultValue: 60,
name: "kad-random-lookup-interval"
.}: uint32
kadServiceLookupIntervalSec* {.
desc: "Interval seconds between service-specific kademlia lookups.",
defaultValue: 60,
name: "kad-service-lookup-interval"
.}: uint32
## websocket config
websocketSupport* {.
desc: "Enable websocket: true|false",
@ -1183,6 +1195,15 @@ proc toWakuConf*(n: WakuNodeConf): ConfResult[WakuConf] =
b.kademliaDiscoveryConf.withEnabled(n.enableKadDiscovery.get())
b.kademliaDiscoveryConf.withBootstrapNodes(n.kadBootstrapNodes)
if n.kadRandomLookupIntervalSec > 0:
b.kademliaDiscoveryConf.withRandomLookupInterval(
chronos.seconds(n.kadRandomLookupIntervalSec.int64)
)
if n.kadServiceLookupIntervalSec > 0:
b.kademliaDiscoveryConf.withServiceLookupInterval(
chronos.seconds(n.kadServiceLookupIntervalSec.int64)
)
# Mode-driven configuration overrides
case n.mode
of WakuMode.Core: