maintain mix node pool based on discovered nodes

This commit is contained in:
Prem Chaitanya Prathi 2025-03-13 13:24:07 +05:30
parent 1119ef0f64
commit dd713154b9
3 changed files with 108 additions and 9 deletions

View File

@ -14,6 +14,7 @@ import
eth/p2p/discoveryv5/enr,
libp2p/crypto/crypto,
libp2p/crypto/curve25519,
libp2p/[multiaddress, multicodec],
libp2p/protocols/ping,
libp2p/protocols/pubsub/gossipsub,
libp2p/protocols/pubsub/rpc/messages,
@ -27,7 +28,6 @@ import
../../vendor/mix/src/curve25519,
../../vendor/mix/src/protocol
import
../waku_core,
../waku_core/topics/sharding,
@ -56,7 +56,10 @@ import
../waku_rln_relay,
./net_config,
./peer_manager,
../common/rate_limit/setting
../common/rate_limit/setting,
../discovery/autonat_service,
../common/nimchronos,
../waku_enr/mix
declarePublicCounter waku_node_messages, "number of messages received", ["type"]
declarePublicHistogram waku_histogram_message_size,
@ -212,7 +215,7 @@ proc mountSharding*(
node.wakuSharding = Sharding(clusterId: clusterId, shardCountGenZero: shardCount)
return ok()
proc getBootStrapMixNodes(node: WakuNode, exceptPeerID: PeerId): Table[PeerId, MixPubInfo] =
#[ proc getBootStrapMixNodes(node: WakuNode, exceptPeerID: PeerId): Table[PeerId, MixPubInfo] =
var mixNodes = initTable[PeerId, MixPubInfo]()
# MixNode Multiaddrs and PublicKeys:
let bootNodesMultiaddrs = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o",
@ -239,10 +242,86 @@ proc getBootStrapMixNodes(node: WakuNode, exceptPeerID: PeerId): Table[PeerId, M
mixNodes[peerId] = mixNodePubInfo
info "using mix bootstrap nodes ", bootNodes = mixNodes
return mixNodes
]#
#TODO: Ideally these procs should be moved out into mix specific file, but keeping it here for now.
proc mixPoolFilter*(cluster: Option[uint16], peer: RemotePeerInfo): bool =
if peer.enr.isNone():
debug "peer has no ENR", peer = $peer
return false
# Mix Protocol
proc mountMix*(node: WakuNode, mixPrivKey: string): Future[Result[void, string]] {.async.} =
if cluster.isSome() and peer.enr.get().isClusterMismatched(cluster.get()):
debug "peer has mismatching cluster", peer = $peer
return false
#TODO: Filter if mix is enabled
return true
proc addPeerId*(multiaddr: MultiAddress, peerId: PeerId): MultiAddress =
if multiaddr.contains(multiCodec("p2p")).get():
return multiaddr
var maddrStr = multiaddr.toString().valueOr:
error "Failed to convert multiaddress to string.", err = error
return multiaddr
maddrStr.add("/p2p/" & $peerId)
var cleanAddr = MultiAddress.init(maddrStr).valueOr:
error "Failed to convert string to multiaddress.", err = error
return multiaddr
return cleanAddr
proc populateMixNodePool*(node: WakuNode) {.async.} =
var cluster: uint16
let enrRes = node.enr.toTyped()
if enrRes.isOk():
let shardingRes = enrRes.get().relaySharding()
if shardingRes.isSome():
let relayShard = shardingRes.get()
cluster = relayShard.clusterID
else:
error "could not get cluster from ENR", error = enrRes.error
# populate only peers that i) are reachable ii) share cluster iii) support mix
let remotePeers = node.peerManager.wakuPeerStore.getReachablePeers().filterIt(
mixPoolFilter(some(cluster), it)
)
var mixNodes = initTable[PeerId, MixPubInfo]()
for i in 0 ..< min(remotePeers.len, 100):
let remotePeerENR = remotePeers[i].enr.get()
# TODO: use the most exposed/external multiaddr of the peer, right now using the first
let maddrWithPeerId =
toString(addPeerId(remotePeers[i].addrs[0], remotePeers[i].peerId))
debug "remote peer ENR",
peerId = remotePeers[i].peerId, enr = remotePeerENR, maddr = maddrWithPeerId
let peerMixPubKey = mixKey(remotePeerENR).get()
let mixNodePubInfo =
createMixPubInfo(maddrWithPeerId.value, intoCurve25519Key(peerMixPubKey))
mixNodes[remotePeers[i].peerId] = mixNodePubInfo
# set the mix node pool
node.mix.setNodePool(mixNodes)
debug "mix node pool updated", poolSize = node.mix.getNodePoolSize()
return
proc startMixNodePoolMgr*(node: WakuNode) {.async.} =
# try more aggressively to populate the pool at startup
var attempts = 50
while node.mix.getNodePoolSize() < 100 and attempts > 0:
attempts -= 1
discard node.populateMixNodePool()
await sleepAsync(1.seconds)
heartbeat "Updating mix node pool", 10.minutes:
discard node.populateMixNodePool()
# Mix Protocol
proc mountMix*(
node: WakuNode, mixPrivKey: string
): Future[Result[void, string]] {.async.} =
info "mounting mix protocol", nodeId = node.info #TODO log the config used
info "mixPrivKey", mixPrivKey = mixPrivKey
@ -254,11 +333,16 @@ proc mountMix*(node: WakuNode, mixPrivKey: string): Future[Result[void, string]]
info "local addr", localaddr = localaddrStr
let localMixNodeInfo = initMixNodeInfo(
localaddrStr & "/p2p/" & $node.peerId, mixPubKey, mixKey, node.switch.peerInfo.publicKey.skkey,
localaddrStr & "/p2p/" & $node.peerId,
mixPubKey,
mixKey,
node.switch.peerInfo.publicKey.skkey,
node.switch.peerInfo.privateKey.skkey,
)
let protoRes = MixProtocol.initMix(localMixNodeInfo, node.switch, node.getBootStrapMixNodes(node.peerId))
# TODO: Pass bootnodes from config,
# TODO : ideally mix should not be marked ready until certain min pool of mixNodes are discovered
let protoRes =
MixProtocol.initMix(localMixNodeInfo, node.switch, initTable[PeerId, MixPubInfo]())
if protoRes.isErr:
error "Mix protocol initialization failed", err = protoRes.error
return
@ -269,6 +353,8 @@ proc mountMix*(node: WakuNode, mixPrivKey: string): Future[Result[void, string]]
if catchRes.isErr():
return err(catchRes.error.msg)
discard startMixNodePoolMgr(node)
return ok()
## Waku Sync

View File

@ -3,6 +3,7 @@
import
std/[options, bitops, sequtils, net, tables], results, eth/keys, libp2p/crypto/crypto
import ../common/enr, ../waku_core/codecs
import ../../vendor/mix/src/mix_protocol
const CapabilitiesEnrField* = "waku2"
@ -28,10 +29,12 @@ const capabilityToCodec = {
Capabilities.Filter: WakuFilterSubscribeCodec,
Capabilities.Lightpush: WakuLightPushCodec,
Capabilities.Sync: WakuReconciliationCodec,
Capabilities.Mix: MixProtocolID,
}.toTable
func init*(
T: type CapabilitiesBitfield, lightpush, filter, store, relay, sync, mix: bool = false
T: type CapabilitiesBitfield,
lightpush, filter, store, relay, sync, mix: bool = false,
): T =
## Creates an waku2 ENR flag bit field according to RFC 31 (https://rfc.vac.dev/spec/31/)
var bitfield: uint8

View File

@ -20,3 +20,13 @@ func mixKey*(record: TypedRecord): Option[seq[byte]] =
if field.isNone():
return none(seq[byte])
return field
func mixKey*(record: Record): Option[seq[byte]] =
let recordRes = record.toTyped()
if recordRes.isErr():
return none(seq[byte])
let field = recordRes.value.tryGet(MixKeyEnrField, seq[byte])
if field.isNone():
return none(seq[byte])
return field