diff --git a/examples/lightpush_publisher_mix.nim b/examples/lightpush_publisher_mix.nim index 607c78e12..a634c3248 100644 --- a/examples/lightpush_publisher_mix.nim +++ b/examples/lightpush_publisher_mix.nim @@ -82,6 +82,7 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} = ).isOkOr: error "failed to mount waku mix protocol: ", error = $error return + discard node.setMixBootStrapNodes() let destPeerId = PeerId.init("16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o").valueOr: error "Failed to initialize PeerId", err = error @@ -97,8 +98,11 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} = node.peerManager.start() notice "publisher service started" - while true: - let text = "hi there i'm a publisher using mix" + var numMsgs = 2 + var i = 0 + while i < numMsgs: + i = i + 1 + let text = "hi there i'm a publisher using mix, this is msg number " & $i let message = WakuMessage( payload: toBytes(text), # content of the message contentTopic: LightpushContentTopic, # content topic to publish to @@ -118,8 +122,8 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} = else: error "failed to publish message", error = res.error - await sleepAsync(5000) - break + await sleepAsync(500) + when isMainModule: let rng = crypto.newRng() diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 3f0b4d8b9..06de59db9 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -219,7 +219,7 @@ proc mountSharding*( node.wakuSharding = Sharding(clusterId: clusterId, shardCountGenZero: shardCount) return ok() -#[ proc getBootStrapMixNodes(node: WakuNode, exceptPeerID: PeerId): Table[PeerId, MixPubInfo] = +proc getBootStrapMixNodes*(node: WakuNode): Table[PeerId, MixPubInfo] = var mixNodes = initTable[PeerId, MixPubInfo]() # MixNode Multiaddrs and PublicKeys: let bootNodesMultiaddrs = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o", @@ -239,20 +239,20 @@ proc mountSharding*( if peerIdRes.isErr: error "Failed to get peer id from multiaddress: " , error = peerIdRes.error let peerId = peerIdRes.get() - if peerID == exceptPeerID: - continue + #if (not peerID == nil) and peerID == exceptPeerID: + # continue let mixNodePubInfo = createMixPubInfo(mixNodeMultiaddr, intoCurve25519Key(ncrutils.fromHex(bootNodesMixPubKeys[index]))) mixNodes[peerId] = mixNodePubInfo info "using mix bootstrap nodes ", bootNodes = mixNodes return mixNodes - ]# + #TODO: Ideally these procs should be moved out into mix specific file, but keeping it here for now. proc mixPoolFilter*(cluster: Option[uint16], peer: RemotePeerInfo): bool = if peer.enr.isNone(): - debug "peer has no ENR", peer = $peer + trace "peer has no ENR", peer = $peer return false if cluster.isSome() and peer.enr.get().isClusterMismatched(cluster.get()): @@ -297,7 +297,7 @@ proc populateMixNodePool*(node: WakuNode){.async} = let remotePeerENR = remotePeers[i].enr.get() # TODO: use the most exposed/external multiaddr of the peer, right now using the first let maddrWithPeerId = toString(addPeerId(remotePeers[i].addrs[0],remotePeers[i].peerId)) - debug "remote peer ENR", peerId = remotePeers[i].peerId, enr = remotePeerENR, maddr = maddrWithPeerId + trace "remote peer ENR", peerId = remotePeers[i].peerId, enr = remotePeerENR, maddr = maddrWithPeerId let peerMixPubKey = mixKey(remotePeerENR).get() let mixNodePubInfo = createMixPubInfo(maddrWithPeerId.value, intoCurve25519Key(peerMixPubKey)) @@ -305,21 +305,26 @@ proc populateMixNodePool*(node: WakuNode){.async} = # set the mix node pool node.mix.setNodePool(mixNodes) - debug "mix node pool updated", poolSize = node.mix.getNodePoolSize() + trace "mix node pool updated", poolSize = node.mix.getNodePoolSize() return proc startMixNodePoolMgr*(node: WakuNode ){.async} = # try more aggressively to populate the pool at startup var attempts = 50 + # TODO: make initial pool size configurable while node.mix.getNodePoolSize() < 100 and attempts > 0: attempts -= 1 discard node.populateMixNodePool() await sleepAsync(1.seconds) + # TODO: make interval configurable heartbeat "Updating mix node pool", 10.minutes: discard node.populateMixNodePool() +proc setMixBootStrapNodes*(node: WakuNode,){.async}= + node.mix.setNodePool(node.getBootStrapMixNodes()) + # Mix Protocol proc mountMix*(node: WakuNode, mixPrivKey: string): Future[Result[void, string]] {.async.} = info "mounting mix protocol", nodeId = node.info #TODO log the config used