From 60f99287cd6517e9b898484e5ad33ec683025cd3 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Mon, 21 Apr 2025 12:29:40 +0530 Subject: [PATCH] chore: address review comments --- examples/lightpush_publisher_mix.nim | 49 +++++++++------------ examples/lightpush_publisher_mix_config.nim | 6 +-- waku/factory/external_config.nim | 16 +++---- waku/node/waku_node.nim | 3 +- waku/waku_lightpush/client.nim | 1 + waku/waku_mix/protocol.nim | 7 ++- 6 files changed, 35 insertions(+), 47 deletions(-) diff --git a/examples/lightpush_publisher_mix.nim b/examples/lightpush_publisher_mix.nim index 41f92189b..17c4606eb 100644 --- a/examples/lightpush_publisher_mix.nim +++ b/examples/lightpush_publisher_mix.nim @@ -31,9 +31,6 @@ import ./lightpush_publisher_mix_config, ./lightpush_publisher_mix_metrics -proc now*(): Timestamp = - getNanosecondTime(getTime().toUnixFloat()) - const clusterId = 66 const shardId = @[0'u16] @@ -52,7 +49,7 @@ proc splitPeerIdAndAddr(maddr: string): (string, string) = peerId = parts[1] return (address, peerId) -proc setupAndPublish(rng: ref HmacDrbgContext, conf: LPMixConf) {.async.} = +proc setupAndPublish(rng: ref HmacDrbgContext, conf: LightPushMixConf) {.async.} = # use notice to filter all waku messaging setupLog(logging.LogLevel.DEBUG, logging.LogFormat.TEXT) @@ -72,13 +69,10 @@ proc setupAndPublish(rng: ref HmacDrbgContext, conf: LPMixConf) {.async.} = "Building ENR with relay sharding failed" ) - let recordRes = enrBuilder.build() - let record = - if recordRes.isErr(): - error "failed to create enr record", error = recordRes.error - quit(QuitFailure) - else: - recordRes.get() + let record = enrBuilder.build().valueOr: + error "failed to create enr record", error = error + quit(QuitFailure) + setLogLevel(logging.LogLevel.TRACE) var builder = WakuNodeBuilder.init() builder.withNodeKey(nodeKey) @@ -92,14 +86,14 @@ proc setupAndPublish(rng: ref HmacDrbgContext, conf: LPMixConf) {.async.} = try: await node.mountPeerExchange(some(uint16(clusterId))) except CatchableError: - error "failed to mount waku peer-exchange protocol: ", - errmsg = getCurrentExceptionMsg() + error "failed to mount waku peer-exchange protocol", + error = getCurrentExceptionMsg() return let (destPeerAddr, destPeerId) = splitPeerIdAndAddr(conf.destPeerAddr) let (pxPeerAddr, pxPeerId) = splitPeerIdAndAddr(conf.pxAddr) - info "dest peer address: ", destPeerAddr = destPeerAddr, destPeerId = destPeerId - info "peer exchange address: ", pxPeerAddr = pxPeerAddr, pxPeerId = pxPeerId + info "dest peer address", destPeerAddr = destPeerAddr, destPeerId = destPeerId + info "peer exchange address", pxPeerAddr = pxPeerAddr, pxPeerId = pxPeerId let pxPeerInfo = RemotePeerInfo.init(destPeerId, @[MultiAddress.init(destPeerAddr).get()]) node.peerManager.addServicePeer(pxPeerInfo, WakuPeerExchangeCodec) @@ -108,11 +102,10 @@ proc setupAndPublish(rng: ref HmacDrbgContext, conf: LPMixConf) {.async.} = RemotePeerInfo.init(pxPeerId, @[MultiAddress.init(pxPeerAddr).get()]) node.peerManager.addServicePeer(pxPeerInfo1, WakuPeerExchangeCodec) - if not conf.withoutMix: - let keyPairResult = generateKeyPair() - if keyPairResult.isErr: + if not conf.mixDisabled: + let (mixPrivKey, mixPubKey) = generateKeyPair().valueOr: + error "failed to generate mix key pair", error = error return - let (mixPrivKey, mixPubKey) = keyPairResult.get() (await node.mountMix(clusterId, mixPrivKey)).isOkOr: error "failed to mount waku mix protocol: ", error = $error return @@ -121,7 +114,7 @@ proc setupAndPublish(rng: ref HmacDrbgContext, conf: LPMixConf) {.async.} = error "Failed to initialize PeerId", err = error return var conn: Connection - if not conf.withoutMix: + if not conf.mixDisabled: conn = MixEntryConnection.newConn( destPeerAddr, dPeerId, ProtocolType.fromString(WakuLightPushCodec), node.mix ) @@ -136,7 +129,7 @@ proc setupAndPublish(rng: ref HmacDrbgContext, conf: LPMixConf) {.async.} = (await node.fetchPeerExchangePeers()).isOkOr: warn "Cannot fetch peers from peer exchange", cause = error - if not conf.withoutMix: + if not conf.mixDisabled: while node.getMixNodePoolSize() < conf.minMixPoolSize: info "waiting for mix nodes to be discovered", currentpoolSize = node.getMixNodePoolSize() @@ -146,10 +139,10 @@ proc setupAndPublish(rng: ref HmacDrbgContext, conf: LPMixConf) {.async.} = var i = 0 while i < conf.numMsgs: - if conf.withoutMix: + if conf.mixDisabled: let connOpt = await node.peerManager.dialPeer(dPeerId, WakuLightPushCodec) if connOpt.isNone(): - error "failed to dial peer" + error "failed to dial peer with WakuLightPushCodec", target_peer_id = dPeerId return conn = connOpt.get() i = i + 1 @@ -166,14 +159,14 @@ proc setupAndPublish(rng: ref HmacDrbgContext, conf: LPMixConf) {.async.} = payload: toBytes(text), # content of the message contentTopic: LightpushContentTopic, # content topic to publish to ephemeral: true, # tell store nodes to not store it - timestamp: now(), + timestamp: getNowInNanosecondTime(), ) # current timestamp let res = await node.wakuLightpushClient.publishWithConn( LightpushPubsubTopic, message, conn, dPeerId ) - if res.isOk: + if res.isOk(): lp_mix_success.inc() notice "published message", text = text, @@ -184,14 +177,14 @@ proc setupAndPublish(rng: ref HmacDrbgContext, conf: LPMixConf) {.async.} = error "failed to publish message", error = res.error lp_mix_failed.inc(labelValues = ["publish_error"]) - if conf.withoutMix: + if conf.mixDisabled: await conn.close() - await sleepAsync(conf.msgInterval) + await sleepAsync(conf.msgIntervalMilliseconds) info "###########Sent all messages via mix" quit(0) when isMainModule: - let conf = LPMixConf.load() + let conf = LightPushMixConf.load() let rng = crypto.newRng() asyncSpawn setupAndPublish(rng, conf) runForever() diff --git a/examples/lightpush_publisher_mix_config.nim b/examples/lightpush_publisher_mix_config.nim index e6555e11b..22cc89239 100644 --- a/examples/lightpush_publisher_mix_config.nim +++ b/examples/lightpush_publisher_mix_config.nim @@ -1,6 +1,6 @@ import confutils/defs -type LPMixConf* = object +type LightPushMixConf* = object destPeerAddr* {.desc: "Destination peer address with peerId.", name: "dp-addr".}: string @@ -11,7 +11,7 @@ type LPMixConf* = object numMsgs* {.desc: "Number of messages to send.", defaultValue: 1, name: "num-msgs".}: int - msgInterval* {. + msgIntervalMilliseconds* {. desc: "Interval between messages in milliseconds.", defaultValue: 1000, name: "msg-interval" @@ -23,6 +23,6 @@ type LPMixConf* = object name: "min-mix-pool-size" .}: int - withoutMix* {. + mixDisabled* {. desc: "Do not use mix for publishing.", defaultValue: false, name: "without-mix" .}: bool diff --git a/waku/factory/external_config.nim b/waku/factory/external_config.nim index ccb90747d..fcdb8160c 100644 --- a/waku/factory/external_config.nim +++ b/waku/factory/external_config.nim @@ -632,17 +632,13 @@ with the drawback of consuming some more bandwidth.""", .}: bool #Mix config - mix* {. - desc: "Enable mix protocol: true|false", - defaultValue: false, - name: "mix" - .}: bool - mixkey* {. - desc: "ED25519 private key as 64 char hex string.", - name: "mixkey" - .}: Option[string] + mix* {.desc: "Enable mix protocol: true|false", defaultValue: false, name: "mix".}: + bool + + mixkey* {.desc: "ED25519 private key as 64 char hex string.", name: "mixkey".}: + Option[string] #TODO: Temp config for simulations.Ideally need to get this info from bootstrap ENRs -#[ mixBootstrapNodes* {. + #[ mixBootstrapNodes* {. desc: "Text-encoded data for mix bootstrap node. Encoded in the format Multiaddress:libp2pPubKey:MixPubKey. Argument may be repeated.", name: "mix-bootstrap-node" diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 4200da20e..349ef2942 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -231,8 +231,7 @@ proc mountMix*( let nodeAddr = localaddrStr & "/p2p/" & $node.peerId # TODO: Pass bootnodes from config, - let protoRes = - WakuMix.new(nodeAddr, node.switch, node.peerManager, clusterId, mixPrivKey) + let protoRes = WakuMix.new(nodeAddr, node.peerManager, clusterId, mixPrivKey) if protoRes.isErr: error "Waku Mix protocol initialization failed", err = protoRes.error return diff --git a/waku/waku_lightpush/client.nim b/waku/waku_lightpush/client.nim index 76cc22f4b..18b4cabd4 100644 --- a/waku/waku_lightpush/client.nim +++ b/waku/waku_lightpush/client.nim @@ -119,6 +119,7 @@ proc publishToAny*( return lightpushSuccessResult(publishedCount) +#TODO: Remove multiple publishX procs and use Option pattern instead proc publishWithConn*( wl: WakuLightPushClient, pubSubTopic: PubsubTopic, diff --git a/waku/waku_mix/protocol.nim b/waku/waku_mix/protocol.nim index f9ff15bf5..462fa43fc 100644 --- a/waku/waku_mix/protocol.nim +++ b/waku/waku_mix/protocol.nim @@ -131,7 +131,6 @@ proc startMixNodePoolMgr*(mix: WakuMix) {.async.} = proc new*( T: type WakuMix, nodeAddr: string, - switch: Switch, peermgr: PeerManager, clusterId: uint16, mixPrivKey: Curve25519Key, @@ -140,13 +139,13 @@ proc new*( info "mixPrivKey", mixPrivKey = mixPrivKey, mixPubKey = mixPubKey let localMixNodeInfo = initMixNodeInfo( - nodeAddr, mixPubKey, mixPrivKey, switch.peerInfo.publicKey.skkey, - switch.peerInfo.privateKey.skkey, + nodeAddr, mixPubKey, mixPrivKey, peermgr.switch.peerInfo.publicKey.skkey, + peermgr.switch.peerInfo.privateKey.skkey, ) # TODO : ideally mix should not be marked ready until certain min pool of mixNodes are discovered var m = WakuMix(peerManager: peermgr, clusterId: clusterId) - m.init(localMixNodeInfo, switch, initTable[PeerId, MixPubInfo]()) + m.init(localMixNodeInfo, peermgr.switch, initTable[PeerId, MixPubInfo]()) procCall MixProtocol(m).init() return ok(m)