Merging updates

stubbsta 2025-04-01 14:14:05 +02:00
commit aa33e48fe0
14 changed files with 354 additions and 128 deletions

View File

@@ -30,7 +30,7 @@ import
 const os* {.strdefine.} = ""
 when os == "Linux" and
     # GitHub only supports container actions on Linux
-    # and we need to start a postgress database in a docker container
+    # and we need to start a postgres database in a docker container
     defined(postgres):
   import
     ./waku_archive/test_driver_postgres_query,
@@ -106,3 +106,4 @@ import
 import ./waku_rln_relay/test_all
 # Node Factory
+import ./factory/test_config

View File

@@ -0,0 +1,157 @@
{.used.}

import
  std/options,
  testutils/unittests,
  chronos,
  libp2p/crypto/[crypto, secp],
  libp2p/multiaddress,
  nimcrypto/utils,
  secp256k1,
  confutils

import
  ../../waku/factory/external_config,
  ../../waku/factory/internal_config,
  ../../waku/factory/networks_config,
  ../../waku/common/logging

suite "Waku config - apply preset":
  test "Default preset is TWN":
    ## Setup
    let expectedConf = ClusterConf.TheWakuNetworkConf()

    ## Given
    let preConfig = WakuNodeConf(cmd: noCommand, preset: "default")

    ## When
    let res = applyPresetConfiguration(preConfig)
    assert res.isOk(), $res.error

    ## Then
    let conf = res.get()
    assert conf.maxMessageSize == expectedConf.maxMessageSize
    assert conf.clusterId == expectedConf.clusterId
    assert conf.rlnRelay == expectedConf.rlnRelay
    assert conf.rlnRelayEthContractAddress == expectedConf.rlnRelayEthContractAddress
    assert conf.rlnRelayDynamic == expectedConf.rlnRelayDynamic
    assert conf.rlnRelayChainId == expectedConf.rlnRelayChainId
    assert conf.rlnRelayBandwidthThreshold == expectedConf.rlnRelayBandwidthThreshold
    assert conf.rlnEpochSizeSec == expectedConf.rlnEpochSizeSec
    assert conf.rlnRelayUserMessageLimit == expectedConf.rlnRelayUserMessageLimit
    assert conf.numShardsInNetwork == expectedConf.numShardsInNetwork
    assert conf.discv5BootstrapNodes == expectedConf.discv5BootstrapNodes

  test "Subscribes to all valid shards in default network":
    ## Setup
    let expectedConf = ClusterConf.TheWakuNetworkConf()

    ## Given
    let shards: seq[uint16] = @[0, 1, 2, 3, 4, 5, 6, 7]
    let preConfig = WakuNodeConf(cmd: noCommand, preset: "default", shards: shards)

    ## When
    let res = applyPresetConfiguration(preConfig)
    assert res.isOk(), $res.error

    ## Then
    let conf = res.get()
    assert conf.shards.len == expectedConf.numShardsInNetwork.int

  test "Subscribes to some valid shards in default network":
    ## Setup
    let expectedConf = ClusterConf.TheWakuNetworkConf()

    ## Given
    let shards: seq[uint16] = @[0, 4, 7]
    let preConfig = WakuNodeConf(cmd: noCommand, preset: "default", shards: shards)

    ## When
    let resConf = applyPresetConfiguration(preConfig)
    let res = validateShards(resConf.get())
    assert res.isOk(), $res.error

    ## Then
    let conf = resConf.get()
    assert conf.shards.len() == shards.len()
    for index, shard in shards:
      assert shard in conf.shards

  test "Subscribes to invalid shards in default network":
    ## Setup
    ## Given
    let shards: seq[uint16] = @[0, 4, 7, 10]
    let preConfig = WakuNodeConf(cmd: noCommand, preset: "default", shards: shards)
    let postConfig = applyPresetConfiguration(preConfig)

    ## When
    let res = validateShards(postConfig.get())

    ## Then
    assert res.isErr(), "Invalid shard was accepted"

suite "Waku config - node key":
  test "Passed node key is used":
    ## Setup
    let nodeKeyStr =
      "0011223344556677889900aabbccddeeff0011223344556677889900aabbccddeeff"
    let nodekey = block:
      let key = SkPrivateKey.init(utils.fromHex(nodeKeyStr)).tryGet()
      crypto.PrivateKey(scheme: Secp256k1, skkey: key)

    ## Given
    let config = WakuNodeConf.load(version = "", cmdLine = @["--nodekey=" & nodeKeyStr])

    ## When
    let res = getNodeKey(config)
    assert res.isOk(), $res.error

    ## Then
    let resKey = res.get()
    assert utils.toHex(resKey.getRawBytes().get()) ==
      utils.toHex(nodekey.getRawBytes().get())

suite "Waku config - Shards":
  test "Shards are valid":
    ## Setup
    ## Given
    let shards: seq[uint16] = @[0, 2, 4]
    let numShardsInNetwork = 5.uint32
    let config = WakuNodeConf(
      cmd: noCommand, shards: shards, numShardsInNetwork: numShardsInNetwork
    )

    ## When
    let res = validateShards(config)

    ## Then
    assert res.isOk(), $res.error

  test "Shards are not in range":
    ## Setup
    ## Given
    let shards: seq[uint16] = @[0, 2, 5]
    let numShardsInNetwork = 5.uint32
    let config = WakuNodeConf(
      cmd: noCommand, shards: shards, numShardsInNetwork: numShardsInNetwork
    )

    ## When
    let res = validateShards(config)

    ## Then
    assert res.isErr(), "Invalid shard was accepted"

  test "Shard is passed without num shards":
    ## Setup
    ## Given
    let config = WakuNodeConf.load(version = "", cmdLine = @["--shard=32"])

    ## When
    let res = validateShards(config)

    ## Then
    assert res.isOk(), $res.error

View File

@@ -55,6 +55,8 @@ proc doRlnKeystoreGenerator*(conf: WakuNodeConf) =
     quit(QuitFailure)

   # 4. initialize OnchainGroupManager
+  info "attempting to initialize OnchainGroupManager"
   let groupManager = OnchainGroupManager(
     ethClientUrl: string(conf.rlnRelayethClientAddress),
     chainId: conf.rlnRelayChainId,

View File

@@ -163,14 +163,28 @@ task libwakuStatic, "Build the cbindings waku node library":
   let name = "libwaku"
   buildLibrary name,
     "library/",
-    """-d:chronicles_line_numbers -d:chronicles_runtime_filtering=on -d:chronicles_sinks="textlines,json" -d:chronicles_default_output_device=Dynamic -d:chronicles_disabled_topics="eth,dnsdisc.client" --warning:Deprecated:off --warning:UnusedImport:on """,
+    """-d:chronicles_line_numbers \
+    -d:chronicles_runtime_filtering=on \
+    -d:chronicles_sinks="textlines,json" \
+    -d:chronicles_default_output_device=Dynamic \
+    -d:chronicles_disabled_topics="eth,dnsdisc.client" \
+    --warning:Deprecated:off \
+    --warning:UnusedImport:on \
+    -d:chronicles_log_level=TRACE """,
     "static"

 task libwakuDynamic, "Build the cbindings waku node library":
   let name = "libwaku"
   buildLibrary name,
     "library/",
-    """-d:chronicles_line_numbers -d:chronicles_runtime_filtering=on -d:chronicles_sinks="textlines,json" -d:chronicles_default_output_device=Dynamic -d:chronicles_disabled_topics="eth,dnsdisc.client" --warning:Deprecated:off --warning:UnusedImport:on """,
+    """-d:chronicles_line_numbers \
+    -d:chronicles_runtime_filtering=on \
+    -d:chronicles_sinks="textlines,json" \
+    -d:chronicles_default_output_device=Dynamic \
+    -d:chronicles_disabled_topics="eth,dnsdisc.client" \
+    --warning:Deprecated:off \
+    --warning:UnusedImport:on \
+    -d:chronicles_log_level=TRACE """,
     "dynamic"

 ### Mobile Android

View File

@@ -22,6 +22,8 @@ import
   libp2p/stream/connection,
   libp2p/utility

+import std/times except TimeInterval, Duration, seconds, minutes
+
 import ./[single_token_limiter, service_metrics, timed_map]

 export token_bucket, setting, service_metrics
@@ -76,8 +78,15 @@ template checkUsageLimit*(
     bodyWithinLimit, bodyRejected: untyped,
 ) =
   if t.checkUsage(proto, conn):
+    let requestStartTime = getTime().toUnixFloat()
     waku_service_requests.inc(labelValues = [proto, "served"])
     bodyWithinLimit
+    let requestDurationSec = getTime().toUnixFloat() - requestStartTime
+    waku_service_request_handling_duration_seconds.observe(
+      requestDurationSec, labelValues = [proto]
+    )
   else:
     waku_service_requests.inc(labelValues = [proto, "rejected"])
     bodyRejected

View File

@@ -17,3 +17,6 @@ proc setServiceLimitMetric*(service: string, limit: Option[RateLimitSetting]) =
     waku_service_requests_limit.set(
       limit.get().calculateLimitPerSecond(), labelValues = [service]
     )
+
+declarePublicHistogram waku_service_request_handling_duration_seconds,
+  "duration of non-relay service handling", ["service"]

View File

@@ -4,6 +4,8 @@
 import std/[options], chronos/timer, libp2p/stream/connection, libp2p/utility
+import std/times except TimeInterval, Duration
+
 import ./[token_bucket, setting, service_metrics]

 export token_bucket, setting, service_metrics
@@ -43,8 +45,15 @@ template checkUsageLimit*(
     bodyWithinLimit, bodyRejected: untyped,
 ) =
   if t.checkUsage(proto):
+    let requestStartTime = getTime().toUnixFloat()
     waku_service_requests.inc(labelValues = [proto, "served"])
     bodyWithinLimit
+    let requestDurationSec = getTime().toUnixFloat() - requestStartTime
+    waku_service_request_handling_duration_seconds.observe(
+      requestDurationSec, labelValues = [proto]
+    )
   else:
     waku_service_requests.inc(labelValues = [proto, "rejected"])
     bodyRejected

View File

@@ -76,7 +76,7 @@ type WakuNodeConf* = object
   .}: EthRpcUrl

   rlnRelayEthContractAddress* {.
-    desc: "Address of membership contract on an Ethereum testnet",
+    desc: "Address of membership contract on an Ethereum testnet.",
     defaultValue: "",
     name: "rln-relay-eth-contract-address"
   .}: string
@@ -100,6 +100,7 @@ type WakuNodeConf* = object
     name: "rln-relay-eth-private-key"
   .}: string

+  # TODO: Remove "Default is" when it's already visible on the CLI
   rlnRelayUserMessageLimit* {.
     desc:
       "Set a user message limit for the rln membership registration. Must be a positive integer. Default is 1.",
@@ -145,6 +146,13 @@ type WakuNodeConf* = object
   .}: seq[ProtectedShard]

   ## General node config
+  preset* {.
+    desc:
+      "Network preset to use. 'twn' is The RLN-protected Waku Network (cluster 1).",
+    defaultValue: "",
+    name: "preset"
+  .}: string
+
   clusterId* {.
     desc:
       "Cluster id that the node is running in. Node in a different cluster id is disconnected.",
@@ -276,7 +284,7 @@ hence would have reachability issues.""",
   .}: bool

   rlnRelay* {.
-    desc: "Enable spam protection through rln-relay: true|false",
+    desc: "Enable spam protection through rln-relay: true|false.",
     defaultValue: false,
     name: "rln-relay"
   .}: bool
@@ -287,7 +295,7 @@ hence would have reachability issues.""",
   .}: Option[uint]

   rlnRelayDynamic* {.
-    desc: "Enable waku-rln-relay with on-chain dynamic group management: true|false",
+    desc: "Enable waku-rln-relay with on-chain dynamic group management: true|false.",
     defaultValue: false,
     name: "rln-relay-dynamic"
   .}: bool
@@ -311,7 +319,8 @@ hence would have reachability issues.""",
   .}: string

   rlnRelayBandwidthThreshold* {.
-    desc: "Message rate in bytes/sec after which verification of proofs should happen",
+    desc:
+      "Message rate in bytes/sec after which verification of proofs should happen.",
     defaultValue: 0, # to maintain backwards compatibility
     name: "rln-relay-bandwidth-threshold"
   .}: int
@@ -327,6 +336,7 @@ hence would have reachability issues.""",
     name: "keep-alive"
   .}: bool

+  # TODO: This is trying to do too much; it should only be used for autosharding, which itself should be configurable
   # If numShardsInNetwork is not set, we use the number of shards configured as numShardsInNetwork
   numShardsInNetwork* {.
     desc: "Number of shards in the network",
@@ -590,7 +600,7 @@ with the drawback of consuming some more bandwidth.""",
   ## Discovery v5 config

   discv5Discovery* {.
-    desc: "Enable discovering nodes via Node Discovery v5",
+    desc: "Enable discovering nodes via Node Discovery v5.",
     defaultValue: false,
     name: "discv5-discovery"
   .}: bool
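
The new option is exercised by the added tests/factory/test_config.nim above; a minimal sketch of both ways to set it (the construction form mirrors the tests, and the cmdLine form assumes confutils' standard flag parsing):

# Sketch only, mirroring the added tests:
let viaObject = WakuNodeConf(cmd: noCommand, preset: "twn")
let viaCli = WakuNodeConf.load(version = "", cmdLine = @["--preset=twn"])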

View File

@@ -4,7 +4,7 @@ import
   libp2p/crypto/crypto,
   libp2p/multiaddress,
   libp2p/nameresolving/dnsresolver,
-  std/[options, sequtils, net],
+  std/[options, sequtils, strutils, net],
   results

 import
   ./external_config,
@@ -12,7 +12,8 @@ import
   ../node/config,
   ../waku_enr/capabilities,
   ../waku_enr,
-  ../waku_core
+  ../waku_core,
+  ./networks_config

 proc enrConfiguration*(
     conf: WakuNodeConf, netConfig: NetConfig, key: crypto.PrivateKey
@@ -157,3 +158,71 @@ proc networkConfiguration*(conf: WakuNodeConf, clientId: string): NetConfigResult =
   )
   return netConfigRes
+
+proc applyPresetConfiguration*(srcConf: WakuNodeConf): Result[WakuNodeConf, string] =
+  var resConf = srcConf
+
+  if resConf.clusterId == 1:
+    warn(
+      "TWN - The Waku Network configuration will not be applied when `--cluster-id=1` is passed in future releases. Use `--preset=twn` instead."
+    )
+    resConf.preset = "twn"
+
+  case toLowerAscii(resConf.preset)
+  of "twn":
+    let twnClusterConf = ClusterConf.TheWakuNetworkConf()
+
+    # Override configuration
+    resConf.maxMessageSize = twnClusterConf.maxMessageSize
+    resConf.clusterId = twnClusterConf.clusterId
+    resConf.rlnRelay = twnClusterConf.rlnRelay
+    resConf.rlnRelayEthContractAddress = twnClusterConf.rlnRelayEthContractAddress
+    resConf.rlnRelayChainId = twnClusterConf.rlnRelayChainId
+    resConf.rlnRelayDynamic = twnClusterConf.rlnRelayDynamic
+    resConf.rlnRelayBandwidthThreshold = twnClusterConf.rlnRelayBandwidthThreshold
+    resConf.discv5Discovery = twnClusterConf.discv5Discovery
+    resConf.discv5BootstrapNodes =
+      resConf.discv5BootstrapNodes & twnClusterConf.discv5BootstrapNodes
+    resConf.rlnEpochSizeSec = twnClusterConf.rlnEpochSizeSec
+    resConf.rlnRelayUserMessageLimit = twnClusterConf.rlnRelayUserMessageLimit
+    resConf.numShardsInNetwork = twnClusterConf.numShardsInNetwork
+
+    # Only set rlnRelay to true if relay is configured
+    if resConf.relay:
+      resConf.rlnRelay = twnClusterConf.rlnRelay
+  else:
+    discard
+
+  return ok(resConf)
+
+# TODO: numShardsInNetwork should be mandatory with autosharding, and unneeded otherwise
+proc getNumShardsInNetwork*(conf: WakuNodeConf): uint32 =
+  if conf.numShardsInNetwork != 0:
+    return conf.numShardsInNetwork
+  # If conf.numShardsInNetwork is not set, use 1024 - the maximum possible as per
+  # the static sharding spec:
+  # https://github.com/waku-org/specs/blob/master/standards/core/relay-sharding.md#static-sharding
+  return uint32(MaxShardIndex + 1)
+
+proc validateShards*(conf: WakuNodeConf): Result[void, string] =
+  let numShardsInNetwork = getNumShardsInNetwork(conf)
+
+  for shard in conf.shards:
+    if shard >= numShardsInNetwork:
+      let msg =
+        "validateShards invalid shard: " & $shard & " when numShardsInNetwork: " &
+        $numShardsInNetwork # fmt doesn't work
+      error "validateShards failed", error = msg
+      return err(msg)
+
+  return ok()
+
+proc getNodeKey*(
+    conf: WakuNodeConf, rng: ref HmacDrbgContext = crypto.newRng()
+): Result[PrivateKey, string] =
+  if conf.nodekey.isSome():
+    return ok(conf.nodekey.get())
+
+  warn "missing node key, generating a new one"
+  let key = crypto.PrivateKey.random(Secp256k1, rng[]).valueOr:
+    error "Failed to generate key", error = error
+    return err("Failed to generate key: " & $error)
+  return ok(key)
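
Taken together, the helpers moved into this module give the factory a linear configuration pipeline. A condensed sketch of the intended call order (`resolveConf` is a hypothetical wrapper, using nim-results' `?` operator instead of waku.nim's explicit isErr checks; imports assume a sibling module in waku/factory):

import std/options, results
import ./external_config, ./internal_config

# Sketch under stated assumptions, not part of this commit.
proc resolveConf(conf: WakuNodeConf): Result[WakuNodeConf, string] =
  var c = ?applyPresetConfiguration(conf) # expand "twn" / legacy cluster-id=1 overrides
  ?validateShards(c) # every shard must be < numShardsInNetwork
  c.nodekey = some(?getNodeKey(c)) # reuse the configured key or generate a fresh one
  ok(c)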

View File

@@ -30,6 +30,7 @@ proc TheWakuNetworkConf*(T: type ClusterConf): ClusterConf =
     rlnRelayUserMessageLimit: 20,
     numShardsInNetwork: 8,
     discv5Discovery: true,
+    # TODO: Why is this part of the conf? E.g., an edge node would not have this
     discv5BootstrapNodes:
       @[
         "enr:-QESuED0qW1BCmF-oH_ARGPr97Nv767bl_43uoy70vrbah3EaCAdK3Q0iRQ6wkSTTpdrg_dU_NC2ydO8leSlRpBX4pxiAYJpZIJ2NIJpcIRA4VDAim11bHRpYWRkcnO4XAArNiZub2RlLTAxLmRvLWFtczMud2FrdS5zYW5kYm94LnN0YXR1cy5pbQZ2XwAtNiZub2RlLTAxLmRvLWFtczMud2FrdS5zYW5kYm94LnN0YXR1cy5pbQYfQN4DgnJzkwABCAAAAAEAAgADAAQABQAGAAeJc2VjcDI1NmsxoQOTd-h5owwj-cx7xrmbvQKU8CV3Fomfdvcv1MBc-67T5oN0Y3CCdl-DdWRwgiMohXdha3UyDw",

View File

@@ -137,13 +137,6 @@ proc initNode(

   ## Mount protocols

-proc getNumShardsInNetwork*(conf: WakuNodeConf): uint32 =
-  if conf.numShardsInNetwork != 0:
-    return conf.numShardsInNetwork
-  # If conf.numShardsInNetwork is not set, use 1024 - the maximum possible as per the static sharding spec
-  # https://github.com/waku-org/specs/blob/master/standards/core/relay-sharding.md#static-sharding
-  return uint32(MaxShardIndex + 1)
-
 proc getAutoshards*(
     node: WakuNode, contentTopics: seq[string]
 ): Result[seq[RelayShard], string] =
@@ -265,6 +258,7 @@ proc setupProtocols(
   if conf.numShardsInNetwork == 0:
     warn "Number of shards in network not configured, setting it to",
+      # TODO: If not configured, it mounts 1024 shards! Make it a mandatory configuration instead
       numShardsInNetwork = $numShardsInNetwork

   node.mountSharding(conf.clusterId, numShardsInNetwork).isOkOr:

View File

@@ -99,19 +99,6 @@ proc logConfig(conf: WakuNodeConf) =
 func version*(waku: Waku): string =
   waku.version

-proc validateShards(conf: WakuNodeConf): Result[void, string] =
-  let numShardsInNetwork = getNumShardsInNetwork(conf)
-  for shard in conf.shards:
-    if shard >= numShardsInNetwork:
-      let msg =
-        "validateShards invalid shard: " & $shard & " when numShardsInNetwork: " &
-        $numShardsInNetwork # fmt doesn't work
-      error "validateShards failed", error = msg
-      return err(msg)
-  return ok()
-
 proc setupSwitchServices(
     waku: Waku, conf: WakuNodeConf, circuitRelay: Relay, rng: ref HmacDrbgContext
 ) =
@@ -214,46 +201,28 @@ proc new*(
       shards.add(shard)
     confCopy.shards = shards

-  case confCopy.clusterId
-  # cluster-id=1 (aka The Waku Network)
-  of 1:
-    let twnClusterConf = ClusterConf.TheWakuNetworkConf()
-    # Override configuration
-    confCopy.maxMessageSize = twnClusterConf.maxMessageSize
-    confCopy.clusterId = twnClusterConf.clusterId
-    confCopy.rlnRelayEthContractAddress = twnClusterConf.rlnRelayEthContractAddress
-    confCopy.rlnRelayChainId = twnClusterConf.rlnRelayChainId
-    confCopy.rlnRelayDynamic = twnClusterConf.rlnRelayDynamic
-    confCopy.rlnRelayBandwidthThreshold = twnClusterConf.rlnRelayBandwidthThreshold
-    confCopy.discv5Discovery = twnClusterConf.discv5Discovery
-    confCopy.discv5BootstrapNodes =
-      confCopy.discv5BootstrapNodes & twnClusterConf.discv5BootstrapNodes
-    confCopy.rlnEpochSizeSec = twnClusterConf.rlnEpochSizeSec
-    confCopy.rlnRelayUserMessageLimit = twnClusterConf.rlnRelayUserMessageLimit
-    confCopy.numShardsInNetwork = twnClusterConf.numShardsInNetwork
-    # Only set rlnRelay to true if relay is configured
-    if confCopy.relay:
-      confCopy.rlnRelay = twnClusterConf.rlnRelay
-  else:
-    discard
+  # Why can't I replace this block with a concise `.valueOr`?
+  confCopy = block:
+    let res = applyPresetConfiguration(confCopy)
+    if res.isErr():
+      error "Failed to complete the config", error = res.error
+      return err("Failed to complete the config: " & $res.error)
+    res.get()
+
+  logConfig(confCopy)

   info "Running nwaku node", version = git_version
-  logConfig(confCopy)

   let validateShardsRes = validateShards(confCopy)
   if validateShardsRes.isErr():
     error "Failed validating shards", error = $validateShardsRes.error
     return err("Failed validating shards: " & $validateShardsRes.error)

-  if not confCopy.nodekey.isSome():
-    let keyRes = crypto.PrivateKey.random(Secp256k1, rng[])
-    if keyRes.isErr():
-      error "Failed to generate key", error = $keyRes.error
-      return err("Failed to generate key: " & $keyRes.error)
-    confCopy.nodekey = some(keyRes.get())
+  let keyRes = getNodeKey(confCopy, rng)
+  if keyRes.isErr():
+    error "Failed to generate key", error = $keyRes.error
+    return err("Failed to generate key: " & $keyRes.error)
+  confCopy.nodekey = some(keyRes.get())

   var relay = newCircuitRelay(confCopy.isRelayClient)
@@ -284,6 +253,7 @@ proc new*(
   var waku = Waku(
     version: git_version,
+    # TODO: WakuNodeConf is re-used in too many contexts; `conf` here should be a dedicated subtype
     conf: confCopy,
     rng: rng,
     key: confCopy.nodekey.get(),

View File

@@ -138,31 +138,53 @@ proc toMerkleNode*(uint256Value: UInt256): MerkleNode =
   return merkleNode

-proc updateRoots*(g: OnchainGroupManager, root: MerkleNode): bool =
-  if g.validRoots.len > 0 and g.validRoots[^1] == root:
-    return false
-  let overflowCount = g.validRoots.len - AcceptableRootWindowSize + 1
-  if overflowCount > 0:
-    for i in 0 ..< overflowCount:
-      discard g.validRoots.popFirst()
-  g.validRoots.addLast(root)
-  return true
-
-proc slideRootQueue*(g: OnchainGroupManager) {.async.} =
-  let rootRes = await g.fetchMerkleRoot()
-  if rootRes.isErr():
-    raise newException(ValueError, "failed to get merkle root: " & rootRes.error)
-  let merkleRoot = toMerkleNode(rootRes.get())
-  let overflowCount = g.validRoots.len - AcceptableRootWindowSize + 1
-  if overflowCount > 0:
-    for i in 0 ..< overflowCount:
-      discard g.validRoots.popFirst()
-  g.validRoots.addLast(merkleRoot)
+proc updateRoots*(g: OnchainGroupManager): Future[bool] {.async.} =
+  let rootRes = await g.fetchMerkleRoot()
+  if rootRes.isErr():
+    return false
+  let merkleRoot = toMerkleNode(rootRes.get())
+  if g.validRoots.len > 0 and g.validRoots[g.validRoots.len - 1] != merkleRoot:
+    let overflowCount = g.validRoots.len - AcceptableRootWindowSize + 1
+    if overflowCount > 0:
+      for i in 0 ..< overflowCount:
+        discard g.validRoots.popFirst()
+    g.validRoots.addLast(merkleRoot)
+    debug "~~~~~~~~~~~~~ Detected new Merkle root ~~~~~~~~~~~~~~~~",
+      root = merkleRoot.toHex, totalRoots = g.validRoots.len
+    return true
+  else:
+    debug "~~~~~~~~~~~~~ No new Merkle root ~~~~~~~~~~~~~~~~",
+      root = merkleRoot.toHex, totalRoots = g.validRoots.len
+    return false
+
+proc trackRootChanges*(g: OnchainGroupManager): Future[void] {.async.} =
+  ## Continuously track changes to the Merkle root
+  initializedGuard(g)
+
+  let ethRpc = g.ethRpc.get()
+  let wakuRlnContract = g.wakuRlnContract.get()
+
+  # Set up the polling interval - more frequent to catch roots
+  const rpcDelay = 5.seconds
+
+  info "Starting to track Merkle root changes"
+  while true:
+    debug "starting to update roots"
+    let rootUpdated = await g.updateRoots()
+    if rootUpdated:
+      let proofResult = await g.fetchMerkleProofElements()
+      if proofResult.isErr():
+        error "Failed to fetch Merkle proof", error = proofResult.error
+      else:
+        g.merkleProofCache = proofResult.get()
+    debug "sleeping for 5 seconds"
+    await sleepAsync(rpcDelay)

 method atomicBatch*(
     g: OnchainGroupManager,
@@ -182,16 +204,7 @@ method atomicBatch*(
     membersSeq.add(member)
   await g.registerCb.get()(membersSeq)

-  let rootRes = await g.fetchMerkleRoot()
-  if rootRes.isErr():
-    raise newException(ValueError, "failed to get merkle root: " & rootRes.error)
-  let merkleRoot = toMerkleNode(rootRes.get())
-  let rootUpdated = g.updateRoots(merkleRoot)
-  if rootUpdated:
-    info "Detected new Merkle root",
-      root = merkleRoot.toHex, totalRoots = g.validRoots.len
+  discard await g.updateRoots()

 method register*(
     g: OnchainGroupManager, rateCommitment: RateCommitment
@@ -404,43 +417,6 @@ method onRegister*(g: OnchainGroupManager, cb: OnRegisterCallback) {.gcsafe.} =
 method onWithdraw*(g: OnchainGroupManager, cb: OnWithdrawCallback) {.gcsafe.} =
   g.withdrawCb = some(cb)

-proc trackRootChanges*(g: OnchainGroupManager): Future[void] {.async.} =
-  ## Continuously track changes to the Merkle root
-  initializedGuard(g)
-
-  let ethRpc = g.ethRpc.get()
-  let wakuRlnContract = g.wakuRlnContract.get()
-
-  # Set up the polling interval - more frequent to catch roots
-  const rpcDelay = 1.seconds
-
-  info "Starting to track Merkle root changes"
-  while true:
-    try:
-      let rootRes = await g.fetchMerkleRoot()
-      if rootRes.isErr():
-        raise newException(ValueError, "failed to get merkle root: " & rootRes.error)
-        continue
-      let merkleRoot = toMerkleNode(rootRes.get())
-      let rootUpdated = g.updateRoots(merkleRoot)
-      if rootUpdated:
-        info "Detected new Merkle root",
-          root = merkleRoot.toHex, totalRoots = g.validRoots.len
-        let proofResult = await g.fetchMerkleProofElements()
-        if proofResult.isErr():
-          error "Failed to fetch Merkle proof", error = proofResult.error
-        g.merkleProofCache = proofResult.get()
-      await sleepAsync(rpcDelay)
-    except CatchableError as e:
-      error "Error while tracking Merkle root", error = e.msg
-      await sleepAsync(rpcDelay)

 method init*(g: OnchainGroupManager): Future[GroupManagerResult[void]] {.async.} =
   # check if the Ethereum client is reachable
   var ethRpc: Web3
@@ -540,7 +516,6 @@ method init*(g: OnchainGroupManager): Future[GroupManagerResult[void]] {.async.} =
   waku_rln_number_registered_memberships.set(int64(g.rlnInstance.leavesSet()))
   g.initialized = true
   return ok()
-
 method stop*(g: OnchainGroupManager): Future[void] {.async, gcsafe.} =

View File

@@ -479,6 +479,18 @@ proc mount(
   # Start epoch monitoring in the background
   wakuRlnRelay.epochMonitorFuture = monitorEpochs(wakuRlnRelay)

+  # Start tracking root changes after successful initialization and registration
+  debug "~~~~~~~~~~~~~ Starting root tracking ~~~~~~~~~~~~~~~~"
+  debug "~~~~~~~~~~~~~ groupManager ~~~~~~~~~~~~~~~~"
+  await sleepAsync(5 * 60 * 1000)
+  if conf.rlnRelayDynamic:
+    debug "~~~~~~~~~~~~~ groupManager is dynamic ~~~~~~~~~~~~~~~~"
+    let onchainGroupManager = cast[OnchainGroupManager](groupManager)
+    asyncSpawn onchainGroupManager.trackRootChanges()
+    await sleepAsync(5 * 60 * 1000)
+
   return ok(wakuRlnRelay)

 proc isReady*(rlnPeer: WakuRLNRelay): Future[bool] {.async: (raises: [Exception]).} =