From 5ffee89689143c7faac05f46c8a6bdb06de1df95 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Tue, 25 Mar 2025 11:35:00 +0530 Subject: [PATCH] added px capability for lightpush example, node pool related fixes --- examples/lightpush_publisher_mix.nim | 37 ++++++++++++++++++++++++---- vendor/mix | 2 +- waku/node/waku_node.nim | 8 ++++-- 3 files changed, 39 insertions(+), 8 deletions(-) diff --git a/examples/lightpush_publisher_mix.nim b/examples/lightpush_publisher_mix.nim index 9572d77b6..2f2ee14b7 100644 --- a/examples/lightpush_publisher_mix.nim +++ b/examples/lightpush_publisher_mix.nim @@ -7,6 +7,7 @@ import chronos, confutils, libp2p/crypto/crypto, + libp2p/multiaddress, eth/keys, eth/p2p/discoveryv5/enr @@ -37,7 +38,7 @@ const shardId = @[0'u16] const LightpushPeer = - "/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu" + "/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o" LightpushPubsubTopic = PubsubTopic("/waku/2/rs/2/0") LightpushContentTopic = ContentTopic("/examples/1/light-pubsub-mix-example/proto") @@ -73,16 +74,35 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} = builder.withNodeKey(nodeKey) builder.withRecord(record) builder.withNetworkConfigurationDetails(ip, Port(wakuPort)).tryGet() + let node = builder.build().tryGet() node.mountMetadata(clusterId).expect("failed to mount waku metadata protocol") node.mountLightPushClient() + try: + await node.mountPeerExchange( + some(uint16(clusterId)) + ) + except CatchableError: + error "failed to mount waku peer-exchange protocol: ", errmsg = getCurrentExceptionMsg() + return + + let pxPeerInfo = RemotePeerInfo.init( + "16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o", + @[MultiAddress.init("/ip4/127.0.0.1/tcp/60001").get()] + ) + node.peerManager.addServicePeer(pxPeerInfo, WakuPeerExchangeCodec) + let pxPeerInfo2 = RemotePeerInfo.init( + "16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu", + @[MultiAddress.init("/ip4/127.0.0.1/tcp/60005").get()] + ) + node.peerManager.addServicePeer(pxPeerInfo2, WakuPeerExchangeCodec) ( await node.mountMix("401dd1eb5582f6dc9488d424aa26ed1092becefcf8543172e6d92c17ed07265a") ).isOkOr: error "failed to mount waku mix protocol: ", error = $error return - discard node.setMixBootStrapNodes() + #discard node.setMixBootStrapNodes() let destPeerId = PeerId.init("16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o").valueOr: error "Failed to initialize PeerId", err = error @@ -96,9 +116,17 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} = await node.start() node.peerManager.start() + node.startPeerExchangeLoop() + + (await node.fetchPeerExchangePeers()).isOkOr: + warn "Cannot fetch peers from peer exchange", cause = error + + while node.getMixNodePoolSize() < 3: + info "waiting for mix nodes to be discovered", currentpoolSize = node.getMixNodePoolSize() + await sleepAsync(1000) notice "publisher service started" - var numMsgs = 1 + var numMsgs = 4 var i = 0 while i < numMsgs: i = i + 1 @@ -122,8 +150,7 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} = else: error "failed to publish message", error = res.error - await sleepAsync(5000) - break + await sleepAsync(1000) when isMainModule: diff --git a/vendor/mix b/vendor/mix index 709eb5a5e..230a5c495 160000 --- a/vendor/mix +++ b/vendor/mix @@ -1 +1 @@ -Subproject commit 709eb5a5e9cdecc113b03ce313cce3bb4f37c70e +Subproject commit 230a5c49583f8d82711f047e1ece21da9eff2b01 diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 351879b45..4caf1cbee 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -222,13 +222,13 @@ proc mountSharding*( proc getBootStrapMixNodes*(node: WakuNode): Table[PeerId, MixPubInfo] = var mixNodes = initTable[PeerId, MixPubInfo]() # MixNode Multiaddrs and PublicKeys: - let bootNodesMultiaddrs = [#"/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o", + let bootNodesMultiaddrs = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o", "/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF", "/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA", "/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f", "/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu", ] - let bootNodesMixPubKeys = [#"9d09ce624f76e8f606265edb9cca2b7de9b41772a6d784bddaf92ffa8fba7d2c", + let bootNodesMixPubKeys = ["9d09ce624f76e8f606265edb9cca2b7de9b41772a6d784bddaf92ffa8fba7d2c", "9231e86da6432502900a84f867004ce78632ab52cd8e30b1ec322cd795710c2a", "275cd6889e1f29ca48e5b9edb800d1a94f49f13d393a0ecf1a07af753506de6c", "e0ed594a8d506681be075e8e23723478388fb182477f7a469309a25e7076fc18", @@ -310,6 +310,7 @@ proc populateMixNodePool*(node: WakuNode){.async} = proc startMixNodePoolMgr*(node: WakuNode ){.async} = + info "starting mix node pool manager" # try more aggressively to populate the pool at startup var attempts = 50 # TODO: make initial pool size configurable @@ -322,6 +323,9 @@ proc startMixNodePoolMgr*(node: WakuNode ){.async} = heartbeat "Updating mix node pool", 10.minutes: discard node.populateMixNodePool() +proc getMixNodePoolSize*(node: WakuNode): int = + return node.mix.getNodePoolSize() + proc setMixBootStrapNodes*(node: WakuNode,){.async}= node.mix.setNodePool(node.getBootStrapMixNodes())