mirror of https://github.com/waku-org/nwaku.git
add burst and auto-registration
This commit is contained in:
parent
8aa76dc537
commit
68869a9b47
|
@ -48,42 +48,71 @@ proc send(
|
||||||
|
|
||||||
return ok()
|
return ok()
|
||||||
|
|
||||||
|
proc burstPublish(
|
||||||
|
waku: Waku, conf: WakuNodeConf, contentTopic: ContentTopic
|
||||||
|
) {.async.} =
|
||||||
|
var futures: seq[Future[Result[void, string]]]
|
||||||
|
var i: uint64 = 0
|
||||||
|
var start = getTime().toUnixFloat()
|
||||||
|
|
||||||
|
while i < conf.rlnRelayUserMessageLimit:
|
||||||
|
futures.add(send(waku, contentTopic))
|
||||||
|
inc i
|
||||||
|
|
||||||
|
let results = await allFinished(futures)
|
||||||
|
|
||||||
|
var current = getTime().toUnixFloat()
|
||||||
|
var tillNextBurst =
|
||||||
|
int(int64(conf.rlnEpochSizeSec * 1000) - int64((current - start) * 1000))
|
||||||
|
info "Published messages",
|
||||||
|
sleep = tillNextBurst, msgCount = conf.rlnRelayUserMessageLimit
|
||||||
|
|
||||||
|
await sleepAsync(tillNextBurst)
|
||||||
|
|
||||||
|
proc iterativePublish(
|
||||||
|
waku: Waku, conf: WakuNodeConf, contentTopic: ContentTopic
|
||||||
|
) {.async.} =
|
||||||
|
var start = getTime().toUnixFloat()
|
||||||
|
|
||||||
|
(await send(waku, contentTopic)).isOkOr:
|
||||||
|
error "Failed to publish", err = error
|
||||||
|
|
||||||
|
#echo await (waku.node.isReady())
|
||||||
|
var current = getTime().toUnixFloat()
|
||||||
|
var tillNextMsg = int(int64(conf.spammerDelay) - int64((current - start) * 1000))
|
||||||
|
info "Published message", sleep = tillNextMsg
|
||||||
|
|
||||||
|
await sleepAsync(tillNextMsg)
|
||||||
|
|
||||||
proc runSpammer*(
|
proc runSpammer*(
|
||||||
waku: Waku, conf: WakuNodeConf, contentTopic: ContentTopic = "/spammer/0/test/plain"
|
waku: Waku, conf: WakuNodeConf, contentTopic: ContentTopic = "/spammer/0/test/plain"
|
||||||
) {.async.} =
|
) {.async.} =
|
||||||
if not conf.enable:
|
if not conf.spammerEnable:
|
||||||
return
|
return
|
||||||
|
|
||||||
if not conf.rlnRelay:
|
if not conf.rlnRelay:
|
||||||
error "RLN not configured!"
|
error "RLN not configured!"
|
||||||
quit(QuitFailure)
|
quit(QuitFailure)
|
||||||
|
|
||||||
var gotPeers = false
|
while true:
|
||||||
while not gotPeers:
|
|
||||||
var (inRelayPeers, outRelayPeers) =
|
var (inRelayPeers, outRelayPeers) =
|
||||||
waku.node.peerManager.connectedPeers(WakuRelayCodec)
|
waku.node.peerManager.connectedPeers(WakuRelayCodec)
|
||||||
|
|
||||||
var numPeers = len(inRelayPeers) + len(outRelayPeers)
|
var numPeers = len(inRelayPeers) + len(outRelayPeers)
|
||||||
if numPeers > 0:
|
if numPeers > 0:
|
||||||
gotPeers = true
|
break
|
||||||
info "Waiting for peers", numPeers = numPeers
|
info "Waiting for peers", numPeers = numPeers
|
||||||
await sleepAsync(1000)
|
await sleepAsync(1000)
|
||||||
|
|
||||||
#var rate = int(float(1000) / float(conf.msgRate))
|
#var rate = int(float(1000) / float(conf.msgRate))
|
||||||
#var delayBetweenMsg =
|
#var delayBetweenMsg =
|
||||||
# float(conf.rlnEpochSizeSec * 1000) /
|
# float(conf.rlnEpochSizeSec * 1000) /
|
||||||
# (float(conf.rlnRelayUserMessageLimit) * conf.msgRateMultiplier)
|
# (float(conf.rlnRelayUserMessageLimit) * conf.msgRateMultiplier)
|
||||||
|
|
||||||
info "Sending message with delay", delay = conf.delay
|
info "Sending message with delay", delay = conf.spammerDelay
|
||||||
|
|
||||||
while true:
|
while true:
|
||||||
var start = getTime().toUnix()
|
if conf.spammerBurst:
|
||||||
|
await burstPublish(waku, conf, contentTopic)
|
||||||
(await send(waku, contentTopic)).isOkOr:
|
else:
|
||||||
error "Failed to publish", err = error
|
await iterativePublish(waku, conf, contentTopic)
|
||||||
|
|
||||||
#echo await (waku.node.isReady())
|
|
||||||
var current = getTime().toUnix()
|
|
||||||
var tillNextMsg = int(int64(conf.delay) - (current - start))
|
|
||||||
info "Published messages", sleep = tillNextMsg
|
|
||||||
|
|
||||||
await sleepAsync(tillNextMsg)
|
|
||||||
|
|
|
@ -53,6 +53,8 @@ when isMainModule:
|
||||||
of inspectRlnDb:
|
of inspectRlnDb:
|
||||||
doInspectRlnDb(conf)
|
doInspectRlnDb(conf)
|
||||||
of noCommand:
|
of noCommand:
|
||||||
|
if conf.spammerEnable:
|
||||||
|
doRlnKeystoreGenerator(conf, false)
|
||||||
# NOTE: {.threadvar.} is used to make the global variable GC safe for the closure uses it
|
# NOTE: {.threadvar.} is used to make the global variable GC safe for the closure uses it
|
||||||
# It will always be called from main thread anyway.
|
# It will always be called from main thread anyway.
|
||||||
# Ref: https://nim-lang.org/docs/manual.html#threads-gc-safety
|
# Ref: https://nim-lang.org/docs/manual.html#threads-gc-safety
|
||||||
|
|
|
@ -15,7 +15,7 @@ import
|
||||||
logScope:
|
logScope:
|
||||||
topics = "rln_keystore_generator"
|
topics = "rln_keystore_generator"
|
||||||
|
|
||||||
proc doRlnKeystoreGenerator*(conf: WakuNodeConf) =
|
proc doRlnKeystoreGenerator*(conf: WakuNodeConf, quitOnSucces: bool = true) =
|
||||||
# 1. load configuration
|
# 1. load configuration
|
||||||
trace "configuration", conf = $conf
|
trace "configuration", conf = $conf
|
||||||
|
|
||||||
|
@ -42,7 +42,7 @@ proc doRlnKeystoreGenerator*(conf: WakuNodeConf) =
|
||||||
idSecretHash = credential.idSecretHash.inHex(),
|
idSecretHash = credential.idSecretHash.inHex(),
|
||||||
idCommitment = credential.idCommitment.inHex()
|
idCommitment = credential.idCommitment.inHex()
|
||||||
|
|
||||||
if not conf.execute:
|
if quitOnSucces and not conf.execute:
|
||||||
info "not executing, exiting"
|
info "not executing, exiting"
|
||||||
quit(0)
|
quit(0)
|
||||||
|
|
||||||
|
@ -91,7 +91,6 @@ proc doRlnKeystoreGenerator*(conf: WakuNodeConf) =
|
||||||
userMessageLimit: conf.rlnRelayUserMessageLimit,
|
userMessageLimit: conf.rlnRelayUserMessageLimit,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
let persistRes = addMembershipCredentials(
|
let persistRes = addMembershipCredentials(
|
||||||
conf.rlnRelayCredPath, keystoreCred, conf.rlnRelayCredPassword, RLNAppInfo
|
conf.rlnRelayCredPath, keystoreCred, conf.rlnRelayCredPassword, RLNAppInfo
|
||||||
)
|
)
|
||||||
|
@ -106,4 +105,6 @@ proc doRlnKeystoreGenerator*(conf: WakuNodeConf) =
|
||||||
except CatchableError:
|
except CatchableError:
|
||||||
error "failure while stopping OnchainGroupManager", error = getCurrentExceptionMsg()
|
error "failure while stopping OnchainGroupManager", error = getCurrentExceptionMsg()
|
||||||
quit(0) # 0 because we already registered on-chain
|
quit(0) # 0 because we already registered on-chain
|
||||||
quit(0)
|
|
||||||
|
if quitOnSucces:
|
||||||
|
quit(0)
|
||||||
|
|
|
@ -595,7 +595,8 @@ type WakuNodeConf* = object
|
||||||
name: "request-rate-period"
|
name: "request-rate-period"
|
||||||
.}: int64
|
.}: int64
|
||||||
|
|
||||||
enable* {.desc: "Enable spammer", defaultValue: false, name: "spammer".}: bool
|
spammerEnable* {.desc: "Enable spammer", defaultValue: false, name: "spammer".}:
|
||||||
|
bool
|
||||||
# msgRate* {.
|
# msgRate* {.
|
||||||
# desc: "Number of messages published per epoch",
|
# desc: "Number of messages published per epoch",
|
||||||
# defaultValue: 10,
|
# defaultValue: 10,
|
||||||
|
@ -607,12 +608,18 @@ type WakuNodeConf* = object
|
||||||
# defaultValue: 1,
|
# defaultValue: 1,
|
||||||
# name: "spammer-msg-multiplier"
|
# name: "spammer-msg-multiplier"
|
||||||
# .}: float
|
# .}: float
|
||||||
delay* {.
|
spammerDelay* {.
|
||||||
desc: "Delay between spawning a publish method (in miliseconds)",
|
desc: "Delay between spawning a publish method (in miliseconds)",
|
||||||
defaultValue: 0,
|
defaultValue: 0,
|
||||||
name: "spammer-delay-between-msg"
|
name: "spammer-delay-between-msg"
|
||||||
.}: int
|
.}: int
|
||||||
|
|
||||||
|
spammerBurst* {.
|
||||||
|
desc: "Send messages in burst instead of one by one",
|
||||||
|
defaultValue: false,
|
||||||
|
name: "spammer-burst"
|
||||||
|
.}: bool
|
||||||
|
|
||||||
## Parsing
|
## Parsing
|
||||||
|
|
||||||
# NOTE: Keys are different in nim-libp2p
|
# NOTE: Keys are different in nim-libp2p
|
||||||
|
|
Loading…
Reference in New Issue