modified lp example config

This commit is contained in:
Prem Chaitanya Prathi 2025-04-02 15:23:12 +05:30
parent f396a7a341
commit 8b8fafab49
2 changed files with 43 additions and 67 deletions

View File

@ -1,5 +1,5 @@
import import
std/[tables, times, sequtils], std/[tables, times, sequtils, strutils],
stew/byteutils, stew/byteutils,
stew/shims/net, stew/shims/net,
chronicles, chronicles,
@ -26,12 +26,11 @@ import
waku_enr, waku_enr,
discovery/waku_discv5, discovery/waku_discv5,
factory/builder, factory/builder,
waku_lightpush/client waku_lightpush/client,
], ],
./lightpush_publisher_mix_config, ./lightpush_publisher_mix_config,
./lightpush_publisher_mix_metrics ./lightpush_publisher_mix_metrics
proc now*(): Timestamp = proc now*(): Timestamp =
getNanosecondTime(getTime().toUnixFloat()) getNanosecondTime(getTime().toUnixFloat())
@ -42,6 +41,17 @@ const
LightpushPubsubTopic = PubsubTopic("/waku/2/rs/66/0") LightpushPubsubTopic = PubsubTopic("/waku/2/rs/66/0")
LightpushContentTopic = ContentTopic("/examples/1/light-pubsub-mix-example/proto") LightpushContentTopic = ContentTopic("/examples/1/light-pubsub-mix-example/proto")
proc splitPeerIdAndAddr(maddr: string): (string, string) =
let parts = maddr.split("/p2p/")
if parts.len != 2:
error "Invalid multiaddress format", parts = parts
return
let
address = parts[0]
peerId = parts[1]
return (address, peerId)
proc setupAndPublish(rng: ref HmacDrbgContext, conf: LPMixConf) {.async.} = proc setupAndPublish(rng: ref HmacDrbgContext, conf: LPMixConf) {.async.} =
# use notice to filter all waku messaging # use notice to filter all waku messaging
setupLog(logging.LogLevel.DEBUG, logging.LogFormat.TEXT) setupLog(logging.LogLevel.DEBUG, logging.LogFormat.TEXT)
@ -86,17 +96,15 @@ proc setupAndPublish(rng: ref HmacDrbgContext, conf: LPMixConf) {.async.} =
errmsg = getCurrentExceptionMsg() errmsg = getCurrentExceptionMsg()
return return
let pxPeerInfo = RemotePeerInfo.init( let (destPeerAddr, destPeerId) = splitPeerIdAndAddr(conf.destPeerAddr)
conf.destPeerId, let (pxPeerAddr, pxPeerId) = splitPeerIdAndAddr(conf.pxAddr)
@[MultiAddress.init(conf.destPeerAddr).get()],
) let pxPeerInfo =
RemotePeerInfo.init(destPeerId, @[MultiAddress.init(destPeerAddr).get()])
node.peerManager.addServicePeer(pxPeerInfo, WakuPeerExchangeCodec) node.peerManager.addServicePeer(pxPeerInfo, WakuPeerExchangeCodec)
let pxPeerInfo1 =
let pxPeerInfo1 = RemotePeerInfo.init( RemotePeerInfo.init(pxPeerId, @[MultiAddress.init(pxPeerAddr).get()])
conf.pxId,
@[MultiAddress.init(conf.pxAddr).get()],
)
node.peerManager.addServicePeer(pxPeerInfo1, WakuPeerExchangeCodec) node.peerManager.addServicePeer(pxPeerInfo1, WakuPeerExchangeCodec)
let keyPairResult = generateKeyPair() let keyPairResult = generateKeyPair()
@ -104,21 +112,15 @@ proc setupAndPublish(rng: ref HmacDrbgContext, conf: LPMixConf) {.async.} =
return return
let (mixPrivKey, mixPubKey) = keyPairResult.get() let (mixPrivKey, mixPubKey) = keyPairResult.get()
( (await node.mountMix(mixPrivKey)).isOkOr:
await node.mountMix(mixPrivKey)
).isOkOr:
error "failed to mount waku mix protocol: ", error = $error error "failed to mount waku mix protocol: ", error = $error
return return
let dPeerId = PeerId.init(destPeerId).valueOr:
let destPeerId = PeerId.init(conf.destPeerId).valueOr:
error "Failed to initialize PeerId", err = error error "Failed to initialize PeerId", err = error
return return
let conn = MixEntryConnection.newConn( let conn = MixEntryConnection.newConn(
conf.destPeerAddr, destPeerAddr, dPeerId, ProtocolType.fromString(WakuLightPushCodec), node.mix
destPeerId,
ProtocolType.fromString(WakuLightPushCodec),
node.mix,
) )
await node.start() await node.start()
@ -136,7 +138,8 @@ proc setupAndPublish(rng: ref HmacDrbgContext, conf: LPMixConf) {.async.} =
currentpoolSize = node.getMixNodePoolSize() currentpoolSize = node.getMixNodePoolSize()
await sleepAsync(1000) await sleepAsync(1000)
notice "publisher service started with mix node pool size ", currentpoolSize = node.getMixNodePoolSize() notice "publisher service started with mix node pool size ",
currentpoolSize = node.getMixNodePoolSize()
var i = 0 var i = 0
while i < conf.numMsgs: while i < conf.numMsgs:
i = i + 1 i = i + 1

View File

@ -1,51 +1,24 @@
import import confutils/defs
confutils/defs
type LPMixConf* = object
destPeerAddr* {.desc: "Destination peer address with peerId.", name: "dp-addr".}:
string
type pxAddr* {.desc: "Peer exchange address with peerId.", name: "px-addr".}: string
LPMixConf* = object
destPeerAddr* {. port* {.desc: "Port to listen on.", defaultValue: 50000, name: "port".}: int
desc: "Destination peer address.",
name: "dp-addr",
}: string
destPeerId* {. numMsgs* {.desc: "Number of messages to send.", defaultValue: 1, name: "num-msgs".}:
desc: "Destination peer ID.", int
name: "dp-id",
}: string
pxAddr* {. msgInterval* {.
desc: "Peer exchange address.", desc: "Interval between messages in milliseconds.",
defaultValue: "localhost:50001", defaultValue: 1000,
name: "px-addr", name: "msg-interval"
}: string .}: int
pxId* {.
desc: "Peer exchange ID.",
defaultValue: "waku-v2-peer-exchange",
name: "px-id",
}: string
port* {. minMixPoolSize* {.
desc: "Port to listen on.", desc: "Number of messages to wait for before sending.",
defaultValue: 50000, defaultValue: 3,
name: "port", name: "min-mix-pool-size"
}: int .}: int
numMsgs* {.
desc: "Number of messages to send.",
defaultValue: 1,
name: "num-msgs",
}: int
msgInterval*{.
desc: "Interval between messages in milliseconds.",
defaultValue: 1000,
name: "msg-interval",
}: int
minMixPoolSize* {.
desc: "Number of messages to wait for before sending.",
defaultValue: 3,
name: "min-mix-pool-size",
}: int