Mix usage of service discovery

This commit is contained in:
SionoiS 2026-03-23 13:28:45 -04:00
parent a856ebc22a
commit ac1589e02c
No known key found for this signature in database
GPG Key ID: C9458A8CB1852951
7 changed files with 130 additions and 127 deletions

View File

@ -263,7 +263,8 @@ type WakuNodeConf* = object
## Circuit-relay config
isRelayClient* {.
desc: """Set the node as a relay-client.
desc:
"""Set the node as a relay-client.
Set it to true for nodes that run behind a NAT or firewall and
hence would have reachability issues.""",
defaultValue: false,
@ -631,12 +632,6 @@ with the drawback of consuming some more bandwidth.""",
name: "mixkey"
.}: Option[string]
mixnodes* {.
desc:
"Multiaddress and mix-key of mix node to be statically specified in format multiaddr:mixPubKey. Argument may be repeated.",
name: "mixnode"
.}: seq[MixNodePubInfo]
# Kademlia Discovery config
enableKadDiscovery* {.
desc:
@ -730,22 +725,6 @@ proc isNumber(x: string): bool =
except ValueError:
result = false
proc parseCmdArg*(T: type MixNodePubInfo, p: string): T =
let elements = p.split(":")
if elements.len != 2:
raise newException(
ValueError, "Invalid format for mix node expected multiaddr:mixPublicKey"
)
let multiaddr = MultiAddress.init(elements[0]).valueOr:
raise newException(ValueError, "Invalid multiaddress format")
if not multiaddr.contains(multiCodec("ip4")).get():
raise newException(
ValueError, "Invalid format for ip address, expected a ipv4 multiaddress"
)
return MixNodePubInfo(
multiaddr: elements[0], pubKey: intoCurve25519Key(ncrutils.fromHex(elements[1]))
)
proc parseCmdArg*(T: type ProtectedShard, p: string): T =
let elements = p.split(":")
if elements.len != 2:
@ -829,22 +808,6 @@ proc readValue*(
except CatchableError:
raise newException(SerializationError, getCurrentExceptionMsg())
proc readValue*(
r: var TomlReader, value: var MixNodePubInfo
) {.raises: [SerializationError].} =
try:
value = parseCmdArg(MixNodePubInfo, r.readValue(string))
except CatchableError:
raise newException(SerializationError, getCurrentExceptionMsg())
proc readValue*(
r: var EnvvarReader, value: var MixNodePubInfo
) {.raises: [SerializationError].} =
try:
value = parseCmdArg(MixNodePubInfo, r.readValue(string))
except CatchableError:
raise newException(SerializationError, getCurrentExceptionMsg())
proc readValue*(
r: var TomlReader, value: var ProtectedShard
) {.raises: [SerializationError].} =
@ -1062,7 +1025,6 @@ proc toWakuConf*(n: WakuNodeConf): ConfResult[WakuConf] =
b.storeServiceConf.storeSyncConf.withRelayJitterSec(n.storeSyncRelayJitter)
b.mixConf.withEnabled(n.mix)
b.mixConf.withMixNodes(n.mixnodes)
b.withMix(n.mix)
if n.mixkey.isSome():
b.mixConf.withMixKey(n.mixkey.get())

View File

@ -1,11 +1,13 @@
{.push raises: [].}
import std/[options, sequtils, sugar]
import std/[options, sequtils]
import
chronos,
chronicles,
results,
stew/byteutils,
libp2p/crypto/curve25519,
libp2p/crypto/crypto,
libp2p/protocols/mix/mix_protocol,
libp2p/[peerid, multiaddress, switch],
libp2p/extended_peer_record,
libp2p/protocols/[kademlia, kad_disco],
@ -42,9 +44,24 @@ proc toRemotePeerInfo(record: ExtendedPeerRecord): Option[RemotePeerInfo] =
let protocols = record.services.mapIt(it.id)
var mixPubKey = none(Curve25519Key)
for service in record.services:
if service.id != MixProtocolID:
continue
if service.data.len != Curve25519KeySize:
continue
mixPubKey = some(intoCurve25519Key(service.data))
break
return some(
RemotePeerInfo.init(
record.peerId, addrs = addrs, protocols = protocols, origin = PeerOrigin.Kademlia
record.peerId,
addrs = addrs,
protocols = protocols,
origin = PeerOrigin.Kademlia,
mixPubKey = mixPubKey,
)
)

View File

@ -11,7 +11,6 @@ logScope:
type MixConfBuilder* = object
enabled: Option[bool]
mixKey: Option[string]
mixNodes: seq[MixNodePubInfo]
proc init*(T: type MixConfBuilder): MixConfBuilder =
MixConfBuilder()
@ -22,9 +21,6 @@ proc withEnabled*(b: var MixConfBuilder, enabled: bool) =
proc withMixKey*(b: var MixConfBuilder, mixKey: string) =
b.mixKey = some(mixKey)
proc withMixNodes*(b: var MixConfBuilder, mixNodes: seq[MixNodePubInfo]) =
b.mixNodes = mixNodes
proc build*(b: MixConfBuilder): Result[Option[MixConf], string] =
if not b.enabled.get(false):
return ok(none[MixConf]())
@ -32,12 +28,8 @@ proc build*(b: MixConfBuilder): Result[Option[MixConf], string] =
if b.mixKey.isSome():
let mixPrivKey = intoCurve25519Key(ncrutils.fromHex(b.mixKey.get()))
let mixPubKey = public(mixPrivKey)
return ok(
some(MixConf(mixKey: mixPrivKey, mixPubKey: mixPubKey, mixNodes: b.mixNodes))
)
return ok(some(MixConf(mixKey: mixPrivKey, mixPubKey: mixPubKey)))
else:
let (mixPrivKey, mixPubKey) = generateKeyPair().valueOr:
return err("Generate key pair error: " & $error)
return ok(
some(MixConf(mixKey: mixPrivKey, mixPubKey: mixPubKey, mixNodes: b.mixNodes))
)
return ok(some(MixConf(mixKey: mixPrivKey, mixPubKey: mixPubKey)))

View File

@ -32,6 +32,7 @@ import
../waku_filter_v2,
../waku_peer_exchange,
../discovery/waku_kademlia,
../waku_mix/protocol,
../node/peer_manager,
../node/peer_manager/peer_store/waku_peer_storage,
../node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations,
@ -171,7 +172,7 @@ proc setupProtocols(
let mixService = ServiceInfo(id: MixProtocolID, data: @(mixConf.mixKey))
providedServices.add(mixService)
(await node.mountMix(conf.clusterId, mixConf.mixKey, mixConf.mixnodes)).isOkOr:
(await node.mountMix(mixConf.mixKey)).isOkOr:
return err("failed to mount waku mix protocol: " & $error)
if conf.storeServiceConf.isSome():
@ -420,6 +421,10 @@ proc setupProtocols(
node.wakuKademlia = kademlia
# Connect kademlia to mix for peer discovery
if not node.wakuMix.isNil() and not node.wakuKademlia.isNil():
node.wakuMix.setKademlia(node.wakuKademlia)
return ok()
## Start node

View File

@ -50,7 +50,6 @@ type StoreSyncConf* {.requiresInit.} = object
type MixConf* = ref object
mixKey*: Curve25519Key
mixPubKey*: Curve25519Key
mixnodes*: seq[MixNodePubInfo]
type KademliaDiscoveryConf* = object
bootstrapNodes*: seq[(PeerId, seq[MultiAddress])]

View File

@ -297,10 +297,7 @@ proc getMixNodePoolSize*(node: WakuNode): int =
return node.wakuMix.poolSize()
proc mountMix*(
node: WakuNode,
clusterId: uint16,
mixPrivKey: Curve25519Key,
mixnodes: seq[MixNodePubInfo],
node: WakuNode, mixPrivKey: Curve25519Key
): Future[Result[void, string]] {.async.} =
info "mounting mix protocol", nodeId = node.info #TODO log the config used
@ -312,10 +309,13 @@ proc mountMix*(
info "local addr", localaddr = localaddrStr
node.wakuMix = WakuMix.new(
localaddrStr, node.peerManager, clusterId, mixPrivKey, mixnodes
mixPrivKey = mixPrivKey,
nodeAddr = localaddrStr,
switch = node.switch,
wakuKademlia = node.wakuKademlia,
).valueOr:
error "Waku Mix protocol initialization failed", err = error
return
return err("Waku Mix protocol initialization failed: " & error)
#TODO: should we do the below only for exit node? Also, what if multiple protocols use mix?
node.wakuMix.registerDestReadBehavior(WakuLightPushCodec, readLp(int(-1)))
let catchRes = catch:
@ -341,11 +341,12 @@ proc mountStoreSync*(
let pubsubTopics = shards.mapIt($RelayShard(clusterId: cluster, shardId: it))
let recon = ?await SyncReconciliation.new(
pubsubTopics, contentTopics, node.peerManager, node.wakuArchive,
storeSyncRange.seconds, storeSyncInterval.seconds, storeSyncRelayJitter.seconds,
idsChannel, wantsChannel, needsChannel,
)
let recon =
?await SyncReconciliation.new(
pubsubTopics, contentTopics, node.peerManager, node.wakuArchive,
storeSyncRange.seconds, storeSyncInterval.seconds, storeSyncRelayJitter.seconds,
idsChannel, wantsChannel, needsChannel,
)
node.wakuStoreReconciliation = recon
@ -583,7 +584,7 @@ proc start*(node: WakuNode) {.async.} =
await node.startRelay()
if not node.wakuMix.isNil():
node.wakuMix.start()
await node.wakuMix.start()
if not node.wakuMetadata.isNil():
node.wakuMetadata.start()

View File

@ -10,104 +10,131 @@ import
libp2p/protocols/mix/mix_protocol,
libp2p/protocols/mix/mix_metrics,
libp2p/protocols/mix/delay_strategy,
libp2p/[multiaddress, peerid],
libp2p/[multiaddress, peerid, switch],
libp2p/extended_peer_record,
eth/common/keys
import
waku/node/peer_manager,
waku/waku_core,
waku/waku_enr,
waku/node/peer_manager/waku_peer_store
waku/node/peer_manager/waku_peer_store,
waku/discovery/waku_kademlia
logScope:
topics = "waku mix"
const minMixPoolSize = 4
const
MinimumMixPoolSize = 4
DefaultMixPoolMaintenanceInterval = chronos.seconds(10)
type
WakuMix* = ref object of MixProtocol
peerManager*: PeerManager
clusterId: uint16
pubKey*: Curve25519Key
type WakuMix* = ref object of MixProtocol
pubKey*: Curve25519Key
targetMixPoolSize: int
currentMixPoolSize: int
maintenanceInterval: Duration
maintenanceIntervalFut: Future[void]
wakuKademlia: WakuKademlia
WakuMixResult*[T] = Result[T, string]
proc mixPoolMaintenance(
self: WakuMix, interval: Duration
) {.async: (raises: [CancelledError]).} =
## Periodic maintenance of the mix pool
MixNodePubInfo* = object
multiAddr*: string
pubKey*: Curve25519Key
while true:
await sleepAsync(interval)
proc processBootNodes(
bootnodes: seq[MixNodePubInfo], peermgr: PeerManager, mix: WakuMix
) =
var count = 0
for node in bootnodes:
let pInfo = parsePeerInfo(node.multiAddr).valueOr:
error "Failed to get peer id from multiaddress: ",
error = error, multiAddr = $node.multiAddr
continue
let peerId = pInfo.peerId
var peerPubKey: crypto.PublicKey
if not peerId.extractPublicKey(peerPubKey):
warn "Failed to extract public key from peerId, skipping node", peerId = peerId
# Update current pool size from nodePool
self.currentMixPoolSize = self.nodePool.len()
mix_pool_size.set(self.currentMixPoolSize.int64)
if self.currentMixPoolSize >= self.targetMixPoolSize:
continue
if peerPubKey.scheme != PKScheme.Secp256k1:
warn "Peer public key is not Secp256k1, skipping node",
peerId = peerId, scheme = peerPubKey.scheme
# Skip discovery if kademlia not available
if self.wakuKademlia.isNil():
debug "kademlia not available for mix peer discovery"
continue
let multiAddr = MultiAddress.init(node.multiAddr).valueOr:
error "Failed to parse multiaddress", multiAddr = node.multiAddr, error = error
continue
debug "mix node pool below threshold, performing targeted lookup",
currentPoolSize = self.currentMixPoolSize, threshold = self.targetMixPoolSize
let mixPubInfo = MixPubInfo.init(peerId, multiAddr, node.pubKey, peerPubKey.skkey)
mix.nodePool.add(mixPubInfo)
count.inc()
let mixPeers = await self.wakuKademlia.lookup(MixProtocolID)
peermgr.addPeer(
RemotePeerInfo.init(peerId, @[multiAddr], mixPubKey = some(node.pubKey))
)
mix_pool_size.set(count)
info "using mix bootstrap nodes ", count = count
# Pool size will be updated on next iteration
info "mix peer discovery completed", discoveredPeers = mixPeers.len
proc new*(
T: typedesc[WakuMix],
nodeAddr: string,
peermgr: PeerManager,
clusterId: uint16,
mixPrivKey: Curve25519Key,
bootnodes: seq[MixNodePubInfo],
): WakuMixResult[T] =
nodeAddr: string,
switch: Switch,
targetMixPoolSize: int = MinimumMixPoolSize,
maintenanceInterval: Duration = DefaultMixPoolMaintenanceInterval,
wakuKademlia: WakuKademlia = nil,
): Result[T, string] =
let mixPubKey = public(mixPrivKey)
info "mixPubKey", mixPubKey = mixPubKey
let nodeMultiAddr = MultiAddress.init(nodeAddr).valueOr:
return err("failed to parse mix node address: " & $nodeAddr & ", error: " & error)
let localMixNodeInfo = initMixNodeInfo(
peermgr.switch.peerInfo.peerId, nodeMultiAddr, mixPubKey, mixPrivKey,
peermgr.switch.peerInfo.publicKey.skkey, peermgr.switch.peerInfo.privateKey.skkey,
switch.peerInfo.peerId, nodeMultiAddr, mixPubKey, mixPrivKey,
switch.peerInfo.publicKey.skkey, switch.peerInfo.privateKey.skkey,
)
var m = WakuMix(peerManager: peermgr, clusterId: clusterId, pubKey: mixPubKey)
procCall MixProtocol(m).init(
let mix = WakuMix(
pubKey: mixPubKey,
targetMixPoolSize: targetMixPoolSize,
currentMixPoolSize: 0,
maintenanceInterval: maintenanceInterval,
maintenanceIntervalFut: nil,
wakuKademlia: wakuKademlia,
)
procCall MixProtocol(mix).init(
localMixNodeInfo,
peermgr.switch,
switch,
delayStrategy =
ExponentialDelayStrategy.new(meanDelayMs = 50, rng = crypto.newRng()),
)
processBootNodes(bootnodes, peermgr, m)
return ok(mix)
if m.nodePool.len < minMixPoolSize:
warn "publishing with mix won't work until atleast 3 mix nodes in node pool"
return ok(m)
proc setKademlia*(self: WakuMix, wakuKademlia: WakuKademlia) =
self.wakuKademlia = wakuKademlia
proc poolSize*(mix: WakuMix): int =
mix.nodePool.len
proc poolSize*(self: WakuMix): int =
if self.nodePool.isNil():
0
else:
self.nodePool.len()
method start*(mix: WakuMix) =
info "starting waku mix protocol"
method start*(self: WakuMix) {.async.} =
if self.started:
warn "Starting Waku Mix twice"
return
method stop*(mix: WakuMix) {.async.} =
discard
info "Starting Waku Mix"
# Mix Protocol
await procCall start(MixProtocol(self))
self.maintenanceIntervalFut = self.mixPoolMaintenance(self.maintenanceInterval)
info "Waku Mix Started"
method stop*(self: WakuMix) {.async.} =
if not self.started:
return
info "Stopping Waku Mix"
if not self.maintenanceIntervalFut.isNil():
self.maintenanceIntervalFut.cancelSoon()
self.maintenanceIntervalFut = nil
await procCall stop(MixProtocol(self))
info "Successfully stopped Waku Mix"