chore: address review comments

This commit is contained in:
Prem Chaitanya Prathi 2025-04-21 12:29:40 +05:30
parent 7630d4bbfb
commit 60f99287cd
6 changed files with 35 additions and 47 deletions

View File

@ -31,9 +31,6 @@ import
./lightpush_publisher_mix_config, ./lightpush_publisher_mix_config,
./lightpush_publisher_mix_metrics ./lightpush_publisher_mix_metrics
proc now*(): Timestamp =
getNanosecondTime(getTime().toUnixFloat())
const clusterId = 66 const clusterId = 66
const shardId = @[0'u16] const shardId = @[0'u16]
@ -52,7 +49,7 @@ proc splitPeerIdAndAddr(maddr: string): (string, string) =
peerId = parts[1] peerId = parts[1]
return (address, peerId) return (address, peerId)
proc setupAndPublish(rng: ref HmacDrbgContext, conf: LPMixConf) {.async.} = proc setupAndPublish(rng: ref HmacDrbgContext, conf: LightPushMixConf) {.async.} =
# use notice to filter all waku messaging # use notice to filter all waku messaging
setupLog(logging.LogLevel.DEBUG, logging.LogFormat.TEXT) setupLog(logging.LogLevel.DEBUG, logging.LogFormat.TEXT)
@ -72,13 +69,10 @@ proc setupAndPublish(rng: ref HmacDrbgContext, conf: LPMixConf) {.async.} =
"Building ENR with relay sharding failed" "Building ENR with relay sharding failed"
) )
let recordRes = enrBuilder.build() let record = enrBuilder.build().valueOr:
let record = error "failed to create enr record", error = error
if recordRes.isErr(): quit(QuitFailure)
error "failed to create enr record", error = recordRes.error
quit(QuitFailure)
else:
recordRes.get()
setLogLevel(logging.LogLevel.TRACE) setLogLevel(logging.LogLevel.TRACE)
var builder = WakuNodeBuilder.init() var builder = WakuNodeBuilder.init()
builder.withNodeKey(nodeKey) builder.withNodeKey(nodeKey)
@ -92,14 +86,14 @@ proc setupAndPublish(rng: ref HmacDrbgContext, conf: LPMixConf) {.async.} =
try: try:
await node.mountPeerExchange(some(uint16(clusterId))) await node.mountPeerExchange(some(uint16(clusterId)))
except CatchableError: except CatchableError:
error "failed to mount waku peer-exchange protocol: ", error "failed to mount waku peer-exchange protocol",
errmsg = getCurrentExceptionMsg() error = getCurrentExceptionMsg()
return return
let (destPeerAddr, destPeerId) = splitPeerIdAndAddr(conf.destPeerAddr) let (destPeerAddr, destPeerId) = splitPeerIdAndAddr(conf.destPeerAddr)
let (pxPeerAddr, pxPeerId) = splitPeerIdAndAddr(conf.pxAddr) let (pxPeerAddr, pxPeerId) = splitPeerIdAndAddr(conf.pxAddr)
info "dest peer address: ", destPeerAddr = destPeerAddr, destPeerId = destPeerId info "dest peer address", destPeerAddr = destPeerAddr, destPeerId = destPeerId
info "peer exchange address: ", pxPeerAddr = pxPeerAddr, pxPeerId = pxPeerId info "peer exchange address", pxPeerAddr = pxPeerAddr, pxPeerId = pxPeerId
let pxPeerInfo = let pxPeerInfo =
RemotePeerInfo.init(destPeerId, @[MultiAddress.init(destPeerAddr).get()]) RemotePeerInfo.init(destPeerId, @[MultiAddress.init(destPeerAddr).get()])
node.peerManager.addServicePeer(pxPeerInfo, WakuPeerExchangeCodec) node.peerManager.addServicePeer(pxPeerInfo, WakuPeerExchangeCodec)
@ -108,11 +102,10 @@ proc setupAndPublish(rng: ref HmacDrbgContext, conf: LPMixConf) {.async.} =
RemotePeerInfo.init(pxPeerId, @[MultiAddress.init(pxPeerAddr).get()]) RemotePeerInfo.init(pxPeerId, @[MultiAddress.init(pxPeerAddr).get()])
node.peerManager.addServicePeer(pxPeerInfo1, WakuPeerExchangeCodec) node.peerManager.addServicePeer(pxPeerInfo1, WakuPeerExchangeCodec)
if not conf.withoutMix: if not conf.mixDisabled:
let keyPairResult = generateKeyPair() let (mixPrivKey, mixPubKey) = generateKeyPair().valueOr:
if keyPairResult.isErr: error "failed to generate mix key pair", error = error
return return
let (mixPrivKey, mixPubKey) = keyPairResult.get()
(await node.mountMix(clusterId, mixPrivKey)).isOkOr: (await node.mountMix(clusterId, mixPrivKey)).isOkOr:
error "failed to mount waku mix protocol: ", error = $error error "failed to mount waku mix protocol: ", error = $error
return return
@ -121,7 +114,7 @@ proc setupAndPublish(rng: ref HmacDrbgContext, conf: LPMixConf) {.async.} =
error "Failed to initialize PeerId", err = error error "Failed to initialize PeerId", err = error
return return
var conn: Connection var conn: Connection
if not conf.withoutMix: if not conf.mixDisabled:
conn = MixEntryConnection.newConn( conn = MixEntryConnection.newConn(
destPeerAddr, dPeerId, ProtocolType.fromString(WakuLightPushCodec), node.mix destPeerAddr, dPeerId, ProtocolType.fromString(WakuLightPushCodec), node.mix
) )
@ -136,7 +129,7 @@ proc setupAndPublish(rng: ref HmacDrbgContext, conf: LPMixConf) {.async.} =
(await node.fetchPeerExchangePeers()).isOkOr: (await node.fetchPeerExchangePeers()).isOkOr:
warn "Cannot fetch peers from peer exchange", cause = error warn "Cannot fetch peers from peer exchange", cause = error
if not conf.withoutMix: if not conf.mixDisabled:
while node.getMixNodePoolSize() < conf.minMixPoolSize: while node.getMixNodePoolSize() < conf.minMixPoolSize:
info "waiting for mix nodes to be discovered", info "waiting for mix nodes to be discovered",
currentpoolSize = node.getMixNodePoolSize() currentpoolSize = node.getMixNodePoolSize()
@ -146,10 +139,10 @@ proc setupAndPublish(rng: ref HmacDrbgContext, conf: LPMixConf) {.async.} =
var i = 0 var i = 0
while i < conf.numMsgs: while i < conf.numMsgs:
if conf.withoutMix: if conf.mixDisabled:
let connOpt = await node.peerManager.dialPeer(dPeerId, WakuLightPushCodec) let connOpt = await node.peerManager.dialPeer(dPeerId, WakuLightPushCodec)
if connOpt.isNone(): if connOpt.isNone():
error "failed to dial peer" error "failed to dial peer with WakuLightPushCodec", target_peer_id = dPeerId
return return
conn = connOpt.get() conn = connOpt.get()
i = i + 1 i = i + 1
@ -166,14 +159,14 @@ proc setupAndPublish(rng: ref HmacDrbgContext, conf: LPMixConf) {.async.} =
payload: toBytes(text), # content of the message payload: toBytes(text), # content of the message
contentTopic: LightpushContentTopic, # content topic to publish to contentTopic: LightpushContentTopic, # content topic to publish to
ephemeral: true, # tell store nodes to not store it ephemeral: true, # tell store nodes to not store it
timestamp: now(), timestamp: getNowInNanosecondTime(),
) # current timestamp ) # current timestamp
let res = await node.wakuLightpushClient.publishWithConn( let res = await node.wakuLightpushClient.publishWithConn(
LightpushPubsubTopic, message, conn, dPeerId LightpushPubsubTopic, message, conn, dPeerId
) )
if res.isOk: if res.isOk():
lp_mix_success.inc() lp_mix_success.inc()
notice "published message", notice "published message",
text = text, text = text,
@ -184,14 +177,14 @@ proc setupAndPublish(rng: ref HmacDrbgContext, conf: LPMixConf) {.async.} =
error "failed to publish message", error = res.error error "failed to publish message", error = res.error
lp_mix_failed.inc(labelValues = ["publish_error"]) lp_mix_failed.inc(labelValues = ["publish_error"])
if conf.withoutMix: if conf.mixDisabled:
await conn.close() await conn.close()
await sleepAsync(conf.msgInterval) await sleepAsync(conf.msgIntervalMilliseconds)
info "###########Sent all messages via mix" info "###########Sent all messages via mix"
quit(0) quit(0)
when isMainModule: when isMainModule:
let conf = LPMixConf.load() let conf = LightPushMixConf.load()
let rng = crypto.newRng() let rng = crypto.newRng()
asyncSpawn setupAndPublish(rng, conf) asyncSpawn setupAndPublish(rng, conf)
runForever() runForever()

View File

@ -1,6 +1,6 @@
import confutils/defs import confutils/defs
type LPMixConf* = object type LightPushMixConf* = object
destPeerAddr* {.desc: "Destination peer address with peerId.", name: "dp-addr".}: destPeerAddr* {.desc: "Destination peer address with peerId.", name: "dp-addr".}:
string string
@ -11,7 +11,7 @@ type LPMixConf* = object
numMsgs* {.desc: "Number of messages to send.", defaultValue: 1, name: "num-msgs".}: numMsgs* {.desc: "Number of messages to send.", defaultValue: 1, name: "num-msgs".}:
int int
msgInterval* {. msgIntervalMilliseconds* {.
desc: "Interval between messages in milliseconds.", desc: "Interval between messages in milliseconds.",
defaultValue: 1000, defaultValue: 1000,
name: "msg-interval" name: "msg-interval"
@ -23,6 +23,6 @@ type LPMixConf* = object
name: "min-mix-pool-size" name: "min-mix-pool-size"
.}: int .}: int
withoutMix* {. mixDisabled* {.
desc: "Do not use mix for publishing.", defaultValue: false, name: "without-mix" desc: "Do not use mix for publishing.", defaultValue: false, name: "without-mix"
.}: bool .}: bool

View File

@ -632,17 +632,13 @@ with the drawback of consuming some more bandwidth.""",
.}: bool .}: bool
#Mix config #Mix config
mix* {. mix* {.desc: "Enable mix protocol: true|false", defaultValue: false, name: "mix".}:
desc: "Enable mix protocol: true|false", bool
defaultValue: false,
name: "mix" mixkey* {.desc: "ED25519 private key as 64 char hex string.", name: "mixkey".}:
.}: bool Option[string]
mixkey* {.
desc: "ED25519 private key as 64 char hex string.",
name: "mixkey"
.}: Option[string]
#TODO: Temp config for simulations.Ideally need to get this info from bootstrap ENRs #TODO: Temp config for simulations.Ideally need to get this info from bootstrap ENRs
#[ mixBootstrapNodes* {. #[ mixBootstrapNodes* {.
desc: desc:
"Text-encoded data for mix bootstrap node. Encoded in the format Multiaddress:libp2pPubKey:MixPubKey. Argument may be repeated.", "Text-encoded data for mix bootstrap node. Encoded in the format Multiaddress:libp2pPubKey:MixPubKey. Argument may be repeated.",
name: "mix-bootstrap-node" name: "mix-bootstrap-node"

View File

@ -231,8 +231,7 @@ proc mountMix*(
let nodeAddr = localaddrStr & "/p2p/" & $node.peerId let nodeAddr = localaddrStr & "/p2p/" & $node.peerId
# TODO: Pass bootnodes from config, # TODO: Pass bootnodes from config,
let protoRes = let protoRes = WakuMix.new(nodeAddr, node.peerManager, clusterId, mixPrivKey)
WakuMix.new(nodeAddr, node.switch, node.peerManager, clusterId, mixPrivKey)
if protoRes.isErr: if protoRes.isErr:
error "Waku Mix protocol initialization failed", err = protoRes.error error "Waku Mix protocol initialization failed", err = protoRes.error
return return

View File

@ -119,6 +119,7 @@ proc publishToAny*(
return lightpushSuccessResult(publishedCount) return lightpushSuccessResult(publishedCount)
#TODO: Remove multiple publishX procs and use Option pattern instead
proc publishWithConn*( proc publishWithConn*(
wl: WakuLightPushClient, wl: WakuLightPushClient,
pubSubTopic: PubsubTopic, pubSubTopic: PubsubTopic,

View File

@ -131,7 +131,6 @@ proc startMixNodePoolMgr*(mix: WakuMix) {.async.} =
proc new*( proc new*(
T: type WakuMix, T: type WakuMix,
nodeAddr: string, nodeAddr: string,
switch: Switch,
peermgr: PeerManager, peermgr: PeerManager,
clusterId: uint16, clusterId: uint16,
mixPrivKey: Curve25519Key, mixPrivKey: Curve25519Key,
@ -140,13 +139,13 @@ proc new*(
info "mixPrivKey", mixPrivKey = mixPrivKey, mixPubKey = mixPubKey info "mixPrivKey", mixPrivKey = mixPrivKey, mixPubKey = mixPubKey
let localMixNodeInfo = initMixNodeInfo( let localMixNodeInfo = initMixNodeInfo(
nodeAddr, mixPubKey, mixPrivKey, switch.peerInfo.publicKey.skkey, nodeAddr, mixPubKey, mixPrivKey, peermgr.switch.peerInfo.publicKey.skkey,
switch.peerInfo.privateKey.skkey, peermgr.switch.peerInfo.privateKey.skkey,
) )
# TODO : ideally mix should not be marked ready until certain min pool of mixNodes are discovered # TODO : ideally mix should not be marked ready until certain min pool of mixNodes are discovered
var m = WakuMix(peerManager: peermgr, clusterId: clusterId) var m = WakuMix(peerManager: peermgr, clusterId: clusterId)
m.init(localMixNodeInfo, switch, initTable[PeerId, MixPubInfo]()) m.init(localMixNodeInfo, peermgr.switch, initTable[PeerId, MixPubInfo]())
procCall MixProtocol(m).init() procCall MixProtocol(m).init()
return ok(m) return ok(m)