Mix usage of service discovery

This commit is contained in:
SionoiS 2026-03-23 13:28:45 -04:00
parent f070992723
commit 8bed6241b0
No known key found for this signature in database
GPG Key ID: C9458A8CB1852951
7 changed files with 150 additions and 150 deletions

View File

@ -266,7 +266,8 @@ type WakuNodeConf* = object
## Circuit-relay config
isRelayClient* {.
desc: """Set the node as a relay-client.
desc:
"""Set the node as a relay-client.
Set it to true for nodes that run behind a NAT or firewall and
hence would have reachability issues.""",
defaultValue: false,
@ -636,12 +637,6 @@ with the drawback of consuming some more bandwidth.""",
name: "mixkey"
.}: Option[string]
mixnodes* {.
desc:
"Multiaddress and mix-key of mix node to be statically specified in format multiaddr:mixPubKey. Argument may be repeated.",
name: "mixnode"
.}: seq[MixNodePubInfo]
# Kademlia Discovery config
enableKadDiscovery* {.
desc:
@ -735,22 +730,6 @@ proc isNumber(x: string): bool =
except ValueError:
result = false
proc parseCmdArg*(T: type MixNodePubInfo, p: string): T =
let elements = p.split(":")
if elements.len != 2:
raise newException(
ValueError, "Invalid format for mix node expected multiaddr:mixPublicKey"
)
let multiaddr = MultiAddress.init(elements[0]).valueOr:
raise newException(ValueError, "Invalid multiaddress format")
if not multiaddr.contains(multiCodec("ip4")).get():
raise newException(
ValueError, "Invalid format for ip address, expected a ipv4 multiaddress"
)
return MixNodePubInfo(
multiaddr: elements[0], pubKey: intoCurve25519Key(ncrutils.fromHex(elements[1]))
)
proc parseCmdArg*(T: type ProtectedShard, p: string): T =
let elements = p.split(":")
if elements.len != 2:
@ -834,22 +813,6 @@ proc readValue*(
except CatchableError:
raise newException(SerializationError, getCurrentExceptionMsg())
proc readValue*(
r: var TomlReader, value: var MixNodePubInfo
) {.raises: [SerializationError].} =
try:
value = parseCmdArg(MixNodePubInfo, r.readValue(string))
except CatchableError:
raise newException(SerializationError, getCurrentExceptionMsg())
proc readValue*(
r: var EnvvarReader, value: var MixNodePubInfo
) {.raises: [SerializationError].} =
try:
value = parseCmdArg(MixNodePubInfo, r.readValue(string))
except CatchableError:
raise newException(SerializationError, getCurrentExceptionMsg())
proc readValue*(
r: var TomlReader, value: var ProtectedShard
) {.raises: [SerializationError].} =
@ -1067,7 +1030,6 @@ proc toWakuConf*(n: WakuNodeConf): ConfResult[WakuConf] =
b.storeServiceConf.storeSyncConf.withRelayJitterSec(n.storeSyncRelayJitter)
b.mixConf.withEnabled(n.mix)
b.mixConf.withMixNodes(n.mixnodes)
b.withMix(n.mix)
if n.mixkey.isSome():
b.mixConf.withMixKey(n.mixkey.get())

View File

@ -1,11 +1,13 @@
{.push raises: [].}
import std/[options, sequtils, sugar]
import std/[options, sequtils]
import
chronos,
chronicles,
results,
stew/byteutils,
libp2p/crypto/curve25519,
libp2p/crypto/crypto,
libp2p/protocols/mix/mix_protocol,
libp2p/[peerid, multiaddress, switch],
libp2p/extended_peer_record,
libp2p/protocols/[kademlia, kad_disco],
@ -15,14 +17,14 @@ import
import waku/waku_core, waku/node/peer_manager
logScope:
topics = "waku kademlia discovery"
topics = "waku kademlia"
const DefaultKademliaDiscoveryInterval* = chronos.seconds(10)
type WakuKademlia* = ref object
protocol*: KademliaDiscovery
peerManager: PeerManager
intervalFut: Future[void]
walkIntervalFut: Future[void]
proc toRemotePeerInfo(record: ExtendedPeerRecord): Option[RemotePeerInfo] =
debug "processing kademlia record",
@ -42,22 +44,37 @@ proc toRemotePeerInfo(record: ExtendedPeerRecord): Option[RemotePeerInfo] =
let protocols = record.services.mapIt(it.id)
var mixPubKey = none(Curve25519Key)
for service in record.services:
if service.id != MixProtocolID:
continue
if service.data.len != Curve25519KeySize:
continue
mixPubKey = some(intoCurve25519Key(service.data))
break
return some(
RemotePeerInfo.init(
record.peerId, addrs = addrs, protocols = protocols, origin = PeerOrigin.Kademlia
record.peerId,
addrs = addrs,
protocols = protocols,
origin = PeerOrigin.Kademlia,
mixPubKey = mixPubKey,
)
)
proc runDiscoveryLoop(
wk: WakuKademlia, interval: Duration
self: WakuKademlia, interval: Duration
) {.async: (raises: [CancelledError]).} =
info "kademlia discovery loop started", interval = interval
debug "kademlia discovery loop started", interval = interval
while true:
await sleepAsync(interval)
let res = catch:
await wk.protocol.randomRecords()
await self.protocol.randomRecords()
let records = res.valueOr:
error "kademlia discovery lookup failed", error = res.error.msg
continue
@ -66,15 +83,13 @@ proc runDiscoveryLoop(
let peerInfo = toRemotePeerInfo(record).valueOr:
continue
wk.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia)
self.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia)
debug "peer added via kademlia discovery",
debug "peer added via random walk",
peerId = $peerInfo.peerId,
addresses = peerInfo.addrs.mapIt($it),
protocols = peerInfo.protocols
#TODO peer added metric
proc lookup*(
self: WakuKademlia, codec: string
): Future[seq[RemotePeerInfo]] {.async: (raises: []).} =
@ -97,13 +112,12 @@ proc lookup*(
self.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia)
debug "peer added via kademlia discovery",
debug "peer added via service discovery",
service = codec,
peerId = $peerInfo.peerId,
addresses = peerInfo.addrs.mapIt($it),
protocols = peerInfo.protocols
#TODO peer added metric
peerInfos.add(peerInfo)
return peerInfos
@ -116,7 +130,7 @@ proc new*(
providedServices: var seq[ServiceInfo],
): T =
if bootstrapNodes.len == 0:
info "creating kademlia discovery as seed node (no bootstrap nodes)"
debug "creating kademlia discovery as seed node (no bootstrap nodes)"
let kademlia = KademliaDiscovery.new(
switch,
@ -127,34 +141,34 @@ proc new*(
services = providedServices,
)
info "kademlia service discovery created", bootstrapNodes = bootstrapNodes.len
return WakuKademlia(protocol: kademlia, peerManager: peerManager)
proc start*(
self: WakuKademlia, interval: Duration = DefaultKademliaDiscoveryInterval
) {.async: (raises: [CancelledError]).} =
) {.async.} =
if self.protocol.started:
warn "Starting kad-disco twice"
warn "Starting waku kad twice"
return
info "Starting Waku Kademlia"
await self.protocol.start()
self.intervalFut = self.runDiscoveryLoop(interval)
self.walkIntervalFut = self.runDiscoveryLoop(interval)
info "kademlia discovery started"
info "Waku Kademlia Started"
proc stop*(self: WakuKademlia) {.async: (raises: []).} =
proc stop*(self: WakuKademlia) {.async.} =
if not self.protocol.started:
return
info "Stopping kademlia discovery"
info "Stopping Waku Kademlia"
if not self.intervalFut.isNil():
self.intervalFut.cancelSoon()
self.intervalFut = nil
if not self.walkIntervalFut.isNil():
self.walkIntervalFut.cancelSoon()
self.walkIntervalFut = nil
if not self.protocol.isNil():
await self.protocol.stop()
info "Successfully stopped kademlia discovery"
info "Successfully stopped Waku Kademlia"

View File

@ -11,7 +11,6 @@ logScope:
type MixConfBuilder* = object
enabled: Option[bool]
mixKey: Option[string]
mixNodes: seq[MixNodePubInfo]
proc init*(T: type MixConfBuilder): MixConfBuilder =
MixConfBuilder()
@ -22,9 +21,6 @@ proc withEnabled*(b: var MixConfBuilder, enabled: bool) =
proc withMixKey*(b: var MixConfBuilder, mixKey: string) =
b.mixKey = some(mixKey)
proc withMixNodes*(b: var MixConfBuilder, mixNodes: seq[MixNodePubInfo]) =
b.mixNodes = mixNodes
proc build*(b: MixConfBuilder): Result[Option[MixConf], string] =
if not b.enabled.get(false):
return ok(none[MixConf]())
@ -32,12 +28,8 @@ proc build*(b: MixConfBuilder): Result[Option[MixConf], string] =
if b.mixKey.isSome():
let mixPrivKey = intoCurve25519Key(ncrutils.fromHex(b.mixKey.get()))
let mixPubKey = public(mixPrivKey)
return ok(
some(MixConf(mixKey: mixPrivKey, mixPubKey: mixPubKey, mixNodes: b.mixNodes))
)
return ok(some(MixConf(mixKey: mixPrivKey, mixPubKey: mixPubKey)))
else:
let (mixPrivKey, mixPubKey) = generateKeyPair().valueOr:
return err("Generate key pair error: " & $error)
return ok(
some(MixConf(mixKey: mixPrivKey, mixPubKey: mixPubKey, mixNodes: b.mixNodes))
)
return ok(some(MixConf(mixKey: mixPrivKey, mixPubKey: mixPubKey)))

View File

@ -32,6 +32,7 @@ import
../waku_filter_v2,
../waku_peer_exchange,
../discovery/waku_kademlia,
../waku_mix/protocol,
../node/peer_manager,
../node/peer_manager/peer_store/waku_peer_storage,
../node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations,
@ -168,10 +169,10 @@ proc setupProtocols(
if conf.mixConf.isSome():
let mixConf = conf.mixConf.get()
let mixService = ServiceInfo(id: MixProtocolID, data: @(mixConf.mixKey))
let mixService = ServiceInfo(id: MixProtocolID, data: @(mixConf.mixPubKey))
providedServices.add(mixService)
(await node.mountMix(conf.clusterId, mixConf.mixKey, mixConf.mixnodes)).isOkOr:
(await node.mountMix(mixConf.mixKey)).isOkOr:
return err("failed to mount waku mix protocol: " & $error)
if conf.storeServiceConf.isSome():
@ -420,6 +421,10 @@ proc setupProtocols(
node.wakuKademlia = kademlia
# Connect kademlia to mix for peer discovery
if not node.wakuMix.isNil() and not node.wakuKademlia.isNil():
node.wakuMix.setKademlia(node.wakuKademlia)
return ok()
## Start node
@ -471,12 +476,6 @@ proc startNode*(
if conf.relay:
node.peerManager.start()
if not node.wakuKademlia.isNil():
let catchRes = catch:
await node.wakuKademlia.start()
if catchRes.isErr():
return err("failed to start kademlia discovery: " & catchRes.error.msg)
return ok()
proc setupNode*(

View File

@ -50,7 +50,6 @@ type StoreSyncConf* {.requiresInit.} = object
type MixConf* = ref object
mixKey*: Curve25519Key
mixPubKey*: Curve25519Key
mixnodes*: seq[MixNodePubInfo]
type KademliaDiscoveryConf* = object
bootstrapNodes*: seq[(PeerId, seq[MultiAddress])]

View File

@ -297,10 +297,7 @@ proc getMixNodePoolSize*(node: WakuNode): int =
return node.wakuMix.poolSize()
proc mountMix*(
node: WakuNode,
clusterId: uint16,
mixPrivKey: Curve25519Key,
mixnodes: seq[MixNodePubInfo],
node: WakuNode, mixPrivKey: Curve25519Key
): Future[Result[void, string]] {.async.} =
info "mounting mix protocol", nodeId = node.info #TODO log the config used
@ -312,10 +309,13 @@ proc mountMix*(
info "local addr", localaddr = localaddrStr
node.wakuMix = WakuMix.new(
localaddrStr, node.peerManager, clusterId, mixPrivKey, mixnodes
mixPrivKey = mixPrivKey,
nodeAddr = localaddrStr,
switch = node.switch,
wakuKademlia = node.wakuKademlia,
).valueOr:
error "Waku Mix protocol initialization failed", err = error
return
return err("Waku Mix protocol initialization failed: " & error)
#TODO: should we do the below only for exit node? Also, what if multiple protocols use mix?
node.wakuMix.registerDestReadBehavior(WakuLightPushCodec, readLp(int(-1)))
let catchRes = catch:
@ -638,6 +638,9 @@ proc stop*(node: WakuNode) {.async.} =
if not node.wakuRendezvousClient.isNil():
await node.wakuRendezvousClient.stopWait()
if not node.wakuKademlia.isNil():
await node.wakuKademlia.stop()
node.started = false
proc isReady*(node: WakuNode): Future[bool] {.async: (raises: [Exception]).} =

View File

@ -10,98 +10,129 @@ import
libp2p/protocols/mix/mix_protocol,
libp2p/protocols/mix/mix_metrics,
libp2p/protocols/mix/delay_strategy,
libp2p/[multiaddress, peerid],
libp2p/[multiaddress, peerid, switch],
libp2p/extended_peer_record,
eth/common/keys
import
waku/node/peer_manager,
waku/waku_core,
waku/waku_enr,
waku/node/peer_manager/waku_peer_store
waku/node/peer_manager/waku_peer_store,
waku/discovery/waku_kademlia
logScope:
topics = "waku mix"
const minMixPoolSize = 4
const
MinimumMixPoolSize = 4
DefaultMixPoolMaintenanceInterval = chronos.seconds(10)
type
WakuMix* = ref object of MixProtocol
peerManager*: PeerManager
clusterId: uint16
pubKey*: Curve25519Key
type WakuMix* = ref object of MixProtocol
pubKey*: Curve25519Key
targetMixPoolSize: int
currentMixPoolSize: int
maintenanceInterval: Duration
maintenanceIntervalFut: Future[void]
wakuKademlia: WakuKademlia
WakuMixResult*[T] = Result[T, string]
proc poolSize*(self: WakuMix): int =
if self.nodePool.isNil():
0
else:
self.nodePool.len()
MixNodePubInfo* = object
multiAddr*: string
pubKey*: Curve25519Key
proc mixPoolMaintenance(
self: WakuMix, interval: Duration
) {.async: (raises: [CancelledError]).} =
debug "mix pool maintenance loop started", interval = interval
proc processBootNodes(
bootnodes: seq[MixNodePubInfo], peermgr: PeerManager, mix: WakuMix
) =
var count = 0
for node in bootnodes:
let pInfo = parsePeerInfo(node.multiAddr).valueOr:
error "Failed to get peer id from multiaddress: ",
error = error, multiAddr = $node.multiAddr
continue
let peerId = pInfo.peerId
var peerPubKey: crypto.PublicKey
if not peerId.extractPublicKey(peerPubKey):
warn "Failed to extract public key from peerId, skipping node", peerId = peerId
while true:
await sleepAsync(interval)
self.currentMixPoolSize = self.poolSize()
mix_pool_size.set(self.currentMixPoolSize.int64)
if self.currentMixPoolSize >= self.targetMixPoolSize:
continue
if peerPubKey.scheme != PKScheme.Secp256k1:
warn "Peer public key is not Secp256k1, skipping node",
peerId = peerId, scheme = peerPubKey.scheme
# Skip discovery if kademlia not available
if self.wakuKademlia.isNil():
debug "kademlia not available for mix peer discovery"
continue
let multiAddr = MultiAddress.init(node.multiAddr).valueOr:
error "Failed to parse multiaddress", multiAddr = node.multiAddr, error = error
continue
trace "mix node pool below threshold, performing targeted lookup",
currentPoolSize = self.currentMixPoolSize, threshold = self.targetMixPoolSize
let mixPubInfo = MixPubInfo.init(peerId, multiAddr, node.pubKey, peerPubKey.skkey)
mix.nodePool.add(mixPubInfo)
count.inc()
let mixPeers = await self.wakuKademlia.lookup(MixProtocolID)
peermgr.addPeer(
RemotePeerInfo.init(peerId, @[multiAddr], mixPubKey = some(node.pubKey))
)
mix_pool_size.set(count)
info "using mix bootstrap nodes ", count = count
trace "mix peer discovery completed", discoveredPeers = mixPeers.len
proc new*(
T: typedesc[WakuMix],
nodeAddr: string,
peermgr: PeerManager,
clusterId: uint16,
mixPrivKey: Curve25519Key,
bootnodes: seq[MixNodePubInfo],
): WakuMixResult[T] =
nodeAddr: string,
switch: Switch,
targetMixPoolSize: int = MinimumMixPoolSize,
maintenanceInterval: Duration = DefaultMixPoolMaintenanceInterval,
wakuKademlia: WakuKademlia = nil,
): Result[T, string] =
let mixPubKey = public(mixPrivKey)
info "mixPubKey", mixPubKey = mixPubKey
debug "Mix Public Key", mixPubKey = mixPubKey
let nodeMultiAddr = MultiAddress.init(nodeAddr).valueOr:
return err("failed to parse mix node address: " & $nodeAddr & ", error: " & error)
let localMixNodeInfo = initMixNodeInfo(
peermgr.switch.peerInfo.peerId, nodeMultiAddr, mixPubKey, mixPrivKey,
peermgr.switch.peerInfo.publicKey.skkey, peermgr.switch.peerInfo.privateKey.skkey,
switch.peerInfo.peerId, nodeMultiAddr, mixPubKey, mixPrivKey,
switch.peerInfo.publicKey.skkey, switch.peerInfo.privateKey.skkey,
)
var m = WakuMix(peerManager: peermgr, clusterId: clusterId, pubKey: mixPubKey)
procCall MixProtocol(m).init(
let mix = WakuMix(
pubKey: mixPubKey,
targetMixPoolSize: targetMixPoolSize,
currentMixPoolSize: 0,
maintenanceInterval: maintenanceInterval,
maintenanceIntervalFut: nil,
wakuKademlia: wakuKademlia,
)
procCall MixProtocol(mix).init(
localMixNodeInfo,
peermgr.switch,
switch,
delayStrategy =
ExponentialDelayStrategy.new(meanDelayMs = 50, rng = crypto.newRng()),
)
processBootNodes(bootnodes, peermgr, m)
return ok(mix)
if m.nodePool.len < minMixPoolSize:
warn "publishing with mix won't work until atleast 3 mix nodes in node pool"
return ok(m)
proc setKademlia*(self: WakuMix, wakuKademlia: WakuKademlia) =
self.wakuKademlia = wakuKademlia
proc poolSize*(mix: WakuMix): int =
mix.nodePool.len
method start*(self: WakuMix) {.async.} =
if self.started:
warn "Starting Waku Mix twice"
return
# Mix Protocol
info "Starting Waku Mix"
await procCall start(MixProtocol(self))
self.maintenanceIntervalFut = self.mixPoolMaintenance(self.maintenanceInterval)
info "Waku Mix Started"
method stop*(self: WakuMix) {.async.} =
if not self.started:
return
info "Stopping Waku Mix"
if not self.maintenanceIntervalFut.isNil():
self.maintenanceIntervalFut.cancelSoon()
self.maintenanceIntervalFut = nil
await procCall stop(MixProtocol(self))
info "Successfully stopped Waku Mix"