update lightpush-mix example

This commit is contained in:
Prem Chaitanya Prathi 2025-03-13 16:05:51 +05:30
parent 4072fa7a50
commit 883db24132
2 changed files with 20 additions and 11 deletions

View File

@ -82,6 +82,7 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} =
).isOkOr:
error "failed to mount waku mix protocol: ", error = $error
return
discard node.setMixBootStrapNodes()
let destPeerId = PeerId.init("16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o").valueOr:
error "Failed to initialize PeerId", err = error
@ -97,8 +98,11 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} =
node.peerManager.start()
notice "publisher service started"
while true:
let text = "hi there i'm a publisher using mix"
var numMsgs = 2
var i = 0
while i < numMsgs:
i = i + 1
let text = "hi there i'm a publisher using mix, this is msg number " & $i
let message = WakuMessage(
payload: toBytes(text), # content of the message
contentTopic: LightpushContentTopic, # content topic to publish to
@ -118,8 +122,8 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} =
else:
error "failed to publish message", error = res.error
await sleepAsync(5000)
break
await sleepAsync(500)
when isMainModule:
let rng = crypto.newRng()

View File

@ -219,7 +219,7 @@ proc mountSharding*(
node.wakuSharding = Sharding(clusterId: clusterId, shardCountGenZero: shardCount)
return ok()
#[ proc getBootStrapMixNodes(node: WakuNode, exceptPeerID: PeerId): Table[PeerId, MixPubInfo] =
proc getBootStrapMixNodes*(node: WakuNode): Table[PeerId, MixPubInfo] =
var mixNodes = initTable[PeerId, MixPubInfo]()
# MixNode Multiaddrs and PublicKeys:
let bootNodesMultiaddrs = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o",
@ -239,20 +239,20 @@ proc mountSharding*(
if peerIdRes.isErr:
error "Failed to get peer id from multiaddress: " , error = peerIdRes.error
let peerId = peerIdRes.get()
if peerID == exceptPeerID:
continue
#if (not peerID == nil) and peerID == exceptPeerID:
# continue
let mixNodePubInfo = createMixPubInfo(mixNodeMultiaddr, intoCurve25519Key(ncrutils.fromHex(bootNodesMixPubKeys[index])))
mixNodes[peerId] = mixNodePubInfo
info "using mix bootstrap nodes ", bootNodes = mixNodes
return mixNodes
]#
#TODO: Ideally these procs should be moved out into mix specific file, but keeping it here for now.
proc mixPoolFilter*(cluster: Option[uint16], peer: RemotePeerInfo): bool =
if peer.enr.isNone():
debug "peer has no ENR", peer = $peer
trace "peer has no ENR", peer = $peer
return false
if cluster.isSome() and peer.enr.get().isClusterMismatched(cluster.get()):
@ -297,7 +297,7 @@ proc populateMixNodePool*(node: WakuNode){.async} =
let remotePeerENR = remotePeers[i].enr.get()
# TODO: use the most exposed/external multiaddr of the peer, right now using the first
let maddrWithPeerId = toString(addPeerId(remotePeers[i].addrs[0],remotePeers[i].peerId))
debug "remote peer ENR", peerId = remotePeers[i].peerId, enr = remotePeerENR, maddr = maddrWithPeerId
trace "remote peer ENR", peerId = remotePeers[i].peerId, enr = remotePeerENR, maddr = maddrWithPeerId
let peerMixPubKey = mixKey(remotePeerENR).get()
let mixNodePubInfo = createMixPubInfo(maddrWithPeerId.value, intoCurve25519Key(peerMixPubKey))
@ -305,21 +305,26 @@ proc populateMixNodePool*(node: WakuNode){.async} =
# set the mix node pool
node.mix.setNodePool(mixNodes)
debug "mix node pool updated", poolSize = node.mix.getNodePoolSize()
trace "mix node pool updated", poolSize = node.mix.getNodePoolSize()
return
proc startMixNodePoolMgr*(node: WakuNode ){.async} =
# try more aggressively to populate the pool at startup
var attempts = 50
# TODO: make initial pool size configurable
while node.mix.getNodePoolSize() < 100 and attempts > 0:
attempts -= 1
discard node.populateMixNodePool()
await sleepAsync(1.seconds)
# TODO: make interval configurable
heartbeat "Updating mix node pool", 10.minutes:
discard node.populateMixNodePool()
proc setMixBootStrapNodes*(node: WakuNode,){.async}=
node.mix.setNodePool(node.getBootStrapMixNodes())
# Mix Protocol
proc mountMix*(node: WakuNode, mixPrivKey: string): Future[Result[void, string]] {.async.} =
info "mounting mix protocol", nodeId = node.info #TODO log the config used