diff --git a/tests/all_tests_waku.nim b/tests/all_tests_waku.nim index 39ac57caf..3e847ae86 100644 --- a/tests/all_tests_waku.nim +++ b/tests/all_tests_waku.nim @@ -30,7 +30,7 @@ import const os* {.strdefine.} = "" when os == "Linux" and # GitHub only supports container actions on Linux - # and we need to start a postgress database in a docker container + # and we need to start a postgres database in a docker container defined(postgres): import ./waku_archive/test_driver_postgres_query, @@ -106,3 +106,4 @@ import import ./waku_rln_relay/test_all # Node Factory +import ./factory/test_config diff --git a/tests/factory/test_config.nim b/tests/factory/test_config.nim new file mode 100644 index 000000000..4a1f6d78f --- /dev/null +++ b/tests/factory/test_config.nim @@ -0,0 +1,157 @@ +{.used.} + +import + std/options, + testutils/unittests, + chronos, + libp2p/crypto/[crypto, secp], + libp2p/multiaddress, + nimcrypto/utils, + secp256k1, + confutils +import + ../../waku/factory/external_config, + ../../waku/factory/internal_config, + ../../waku/factory/networks_config, + ../../waku/common/logging + +suite "Waku config - apply preset": + test "Default preset is TWN": + ## Setup + let expectedConf = ClusterConf.TheWakuNetworkConf() + + ## Given + let preConfig = WakuNodeConf(cmd: noCommand, preset: "default") + + ## When + let res = applyPresetConfiguration(preConfig) + assert res.isOk(), $res.error + + ## Then + let conf = res.get() + assert conf.maxMessageSize == expectedConf.maxMessageSize + assert conf.clusterId == expectedConf.clusterId + assert conf.rlnRelay == expectedConf.rlnRelay + assert conf.rlnRelayEthContractAddress == expectedConf.rlnRelayEthContractAddress + assert conf.rlnRelayDynamic == expectedConf.rlnRelayDynamic + assert conf.rlnRelayChainId == expectedConf.rlnRelayChainId + assert conf.rlnRelayBandwidthThreshold == expectedConf.rlnRelayBandwidthThreshold + assert conf.rlnEpochSizeSec == expectedConf.rlnEpochSizeSec + assert conf.rlnRelayUserMessageLimit == expectedConf.rlnRelayUserMessageLimit + assert conf.numShardsInNetwork == expectedConf.numShardsInNetwork + assert conf.discv5BootstrapNodes == expectedConf.discv5BootstrapNodes + + test "Subscribes to all valid shards in default network": + ## Setup + let expectedConf = ClusterConf.TheWakuNetworkConf() + + ## Given + let shards: seq[uint16] = @[0, 1, 2, 3, 4, 5, 6, 7] + let preConfig = WakuNodeConf(cmd: noCommand, preset: "default", shards: shards) + + ## When + let res = applyPresetConfiguration(preConfig) + assert res.isOk(), $res.error + + ## Then + let conf = res.get() + assert conf.shards.len == expectedConf.numShardsInNetwork.int + + test "Subscribes to some valid shards in default network": + ## Setup + let expectedConf = ClusterConf.TheWakuNetworkConf() + + ## Given + let shards: seq[uint16] = @[0, 4, 7] + let preConfig = WakuNodeConf(cmd: noCommand, preset: "default", shards: shards) + + ## When + let resConf = applyPresetConfiguration(preConfig) + let res = validateShards(resConf.get()) + assert res.isOk(), $res.error + + ## Then + let conf = resConf.get() + assert conf.shards.len() == shards.len() + for index, shard in shards: + assert shard in conf.shards + + test "Subscribes to invalid shards in default network": + ## Setup + + ## Given + let shards: seq[uint16] = @[0, 4, 7, 10] + let preConfig = WakuNodeConf(cmd: noCommand, preset: "default", shards: shards) + let postConfig = applyPresetConfiguration(preConfig) + + ## When + let res = validateShards(postConfig.get()) + + ## Then + assert res.isErr(), "Invalid shard was accepted" + +suite "Waku config - node key": + test "Passed node key is used": + ## Setup + let nodeKeyStr = + "0011223344556677889900aabbccddeeff0011223344556677889900aabbccddeeff" + let nodekey = block: + let key = SkPrivateKey.init(utils.fromHex(nodeKeyStr)).tryGet() + crypto.PrivateKey(scheme: Secp256k1, skkey: key) + + ## Given + let config = WakuNodeConf.load(version = "", cmdLine = @["--nodekey=" & nodeKeyStr]) + + ## When + let res = getNodeKey(config) + assert res.isOk(), $res.error + + ## Then + let resKey = res.get() + assert utils.toHex(resKey.getRawBytes().get()) == + utils.toHex(nodekey.getRawBytes().get()) + +suite "Waku config - Shards": + test "Shards are valid": + ## Setup + + ## Given + let shards: seq[uint16] = @[0, 2, 4] + let numShardsInNetwork = 5.uint32 + let config = WakuNodeConf( + cmd: noCommand, shards: shards, numShardsInNetwork: numShardsInNetwork + ) + + ## When + let res = validateShards(config) + + ## Then + assert res.isOk(), $res.error + + test "Shards are not in range": + ## Setup + + ## Given + let shards: seq[uint16] = @[0, 2, 5] + let numShardsInNetwork = 5.uint32 + let config = WakuNodeConf( + cmd: noCommand, shards: shards, numShardsInNetwork: numShardsInNetwork + ) + + ## When + let res = validateShards(config) + + ## Then + assert res.isErr(), "Invalid shard was accepted" + + test "Shard is passed without num shards": + ## Setup + + ## Given + let config = WakuNodeConf.load(version = "", cmdLine = @["--shard=32"]) + + ## When + let res = validateShards(config) + + ## Then + assert res.isOk(), $res.error diff --git a/tools/rln_keystore_generator/rln_keystore_generator.nim b/tools/rln_keystore_generator/rln_keystore_generator.nim index 1bde9ae01..0d301dd58 100644 --- a/tools/rln_keystore_generator/rln_keystore_generator.nim +++ b/tools/rln_keystore_generator/rln_keystore_generator.nim @@ -55,6 +55,8 @@ proc doRlnKeystoreGenerator*(conf: WakuNodeConf) = quit(QuitFailure) # 4. initialize OnchainGroupManager + info "attempting to initialize OnchainGroupManager" + let groupManager = OnchainGroupManager( ethClientUrl: string(conf.rlnRelayethClientAddress), chainId: conf.rlnRelayChainId, diff --git a/waku.nimble b/waku.nimble index 6cf804098..9c0e819fb 100644 --- a/waku.nimble +++ b/waku.nimble @@ -163,14 +163,28 @@ task libwakuStatic, "Build the cbindings waku node library": let name = "libwaku" buildLibrary name, "library/", - """-d:chronicles_line_numbers -d:chronicles_runtime_filtering=on -d:chronicles_sinks="textlines,json" -d:chronicles_default_output_device=Dynamic -d:chronicles_disabled_topics="eth,dnsdisc.client" --warning:Deprecated:off --warning:UnusedImport:on """, + """-d:chronicles_line_numbers \ + -d:chronicles_runtime_filtering=on \ + -d:chronicles_sinks="textlines,json" \ + -d:chronicles_default_output_device=Dynamic \ + -d:chronicles_disabled_topics="eth,dnsdisc.client" \ + --warning:Deprecated:off \ + --warning:UnusedImport:on \ + -d:chronicles_log_level=TRACE """, "static" task libwakuDynamic, "Build the cbindings waku node library": let name = "libwaku" buildLibrary name, "library/", - """-d:chronicles_line_numbers -d:chronicles_runtime_filtering=on -d:chronicles_sinks="textlines,json" -d:chronicles_default_output_device=Dynamic -d:chronicles_disabled_topics="eth,dnsdisc.client" --warning:Deprecated:off --warning:UnusedImport:on """, + """-d:chronicles_line_numbers \ + -d:chronicles_runtime_filtering=on \ + -d:chronicles_sinks="textlines,json" \ + -d:chronicles_default_output_device=Dynamic \ + -d:chronicles_disabled_topics="eth,dnsdisc.client" \ + --warning:Deprecated:off \ + --warning:UnusedImport:on \ + -d:chronicles_log_level=TRACE """, "dynamic" ### Mobile Android diff --git a/waku/common/rate_limit/request_limiter.nim b/waku/common/rate_limit/request_limiter.nim index 70fcc4905..7f33d0348 100644 --- a/waku/common/rate_limit/request_limiter.nim +++ b/waku/common/rate_limit/request_limiter.nim @@ -22,6 +22,8 @@ import libp2p/stream/connection, libp2p/utility +import std/times except TimeInterval, Duration, seconds, minutes + import ./[single_token_limiter, service_metrics, timed_map] export token_bucket, setting, service_metrics @@ -76,8 +78,15 @@ template checkUsageLimit*( bodyWithinLimit, bodyRejected: untyped, ) = if t.checkUsage(proto, conn): + let requestStartTime = getTime().toUnixFloat() waku_service_requests.inc(labelValues = [proto, "served"]) + bodyWithinLimit + + let requestDurationSec = getTime().toUnixFloat() - requestStartTime + waku_service_request_handling_duration_seconds.observe( + requestDurationSec, labelValues = [proto] + ) else: waku_service_requests.inc(labelValues = [proto, "rejected"]) bodyRejected diff --git a/waku/common/rate_limit/service_metrics.nim b/waku/common/rate_limit/service_metrics.nim index 339bf7f38..7d24d9530 100644 --- a/waku/common/rate_limit/service_metrics.nim +++ b/waku/common/rate_limit/service_metrics.nim @@ -17,3 +17,6 @@ proc setServiceLimitMetric*(service: string, limit: Option[RateLimitSetting]) = waku_service_requests_limit.set( limit.get().calculateLimitPerSecond(), labelValues = [service] ) + +declarePublicHistogram waku_service_request_handling_duration_seconds, + "duration of non-relay service handling", ["service"] diff --git a/waku/common/rate_limit/single_token_limiter.nim b/waku/common/rate_limit/single_token_limiter.nim index 1b62114bf..da01f61bb 100644 --- a/waku/common/rate_limit/single_token_limiter.nim +++ b/waku/common/rate_limit/single_token_limiter.nim @@ -4,6 +4,8 @@ import std/[options], chronos/timer, libp2p/stream/connection, libp2p/utility +import std/times except TimeInterval, Duration + import ./[token_bucket, setting, service_metrics] export token_bucket, setting, service_metrics @@ -43,8 +45,15 @@ template checkUsageLimit*( bodyWithinLimit, bodyRejected: untyped, ) = if t.checkUsage(proto): + let requestStartTime = getTime().toUnixFloat() waku_service_requests.inc(labelValues = [proto, "served"]) + bodyWithinLimit + + let requestDurationSec = getTime().toUnixFloat() - requestStartTime + waku_service_request_handling_duration_seconds.observe( + requestDurationSec, labelValues = [proto] + ) else: waku_service_requests.inc(labelValues = [proto, "rejected"]) bodyRejected diff --git a/waku/factory/external_config.nim b/waku/factory/external_config.nim index 9bc073426..100d1b644 100644 --- a/waku/factory/external_config.nim +++ b/waku/factory/external_config.nim @@ -76,7 +76,7 @@ type WakuNodeConf* = object .}: EthRpcUrl rlnRelayEthContractAddress* {. - desc: "Address of membership contract on an Ethereum testnet", + desc: "Address of membership contract on an Ethereum testnet.", defaultValue: "", name: "rln-relay-eth-contract-address" .}: string @@ -100,6 +100,7 @@ type WakuNodeConf* = object name: "rln-relay-eth-private-key" .}: string + # TODO: Remove "Default is" when it's already visible on the CLI rlnRelayUserMessageLimit* {. desc: "Set a user message limit for the rln membership registration. Must be a positive integer. Default is 1.", @@ -145,6 +146,13 @@ type WakuNodeConf* = object .}: seq[ProtectedShard] ## General node config + preset* {. + desc: + "Network preset to use. 'twn' is The RLN-protected Waku Network (cluster 1).", + defaultValue: "", + name: "preset" + .}: string + clusterId* {. desc: "Cluster id that the node is running in. Node in a different cluster id is disconnected.", @@ -276,7 +284,7 @@ hence would have reachability issues.""", .}: bool rlnRelay* {. - desc: "Enable spam protection through rln-relay: true|false", + desc: "Enable spam protection through rln-relay: true|false.", defaultValue: false, name: "rln-relay" .}: bool @@ -287,7 +295,7 @@ hence would have reachability issues.""", .}: Option[uint] rlnRelayDynamic* {. - desc: "Enable waku-rln-relay with on-chain dynamic group management: true|false", + desc: "Enable waku-rln-relay with on-chain dynamic group management: true|false.", defaultValue: false, name: "rln-relay-dynamic" .}: bool @@ -311,7 +319,8 @@ hence would have reachability issues.""", .}: string rlnRelayBandwidthThreshold* {. - desc: "Message rate in bytes/sec after which verification of proofs should happen", + desc: + "Message rate in bytes/sec after which verification of proofs should happen.", defaultValue: 0, # to maintain backwards compatibility name: "rln-relay-bandwidth-threshold" .}: int @@ -327,6 +336,7 @@ hence would have reachability issues.""", name: "keep-alive" .}: bool + # TODO: This is trying to do too much, this should only be used for autosharding, which itself should be configurable # If numShardsInNetwork is not set, we use the number of shards configured as numShardsInNetwork numShardsInNetwork* {. desc: "Number of shards in the network", @@ -590,7 +600,7 @@ with the drawback of consuming some more bandwidth.""", ## Discovery v5 config discv5Discovery* {. - desc: "Enable discovering nodes via Node Discovery v5", + desc: "Enable discovering nodes via Node Discovery v5.", defaultValue: false, name: "discv5-discovery" .}: bool diff --git a/waku/factory/internal_config.nim b/waku/factory/internal_config.nim index 064a03acd..08f11f1c5 100644 --- a/waku/factory/internal_config.nim +++ b/waku/factory/internal_config.nim @@ -4,7 +4,7 @@ import libp2p/crypto/crypto, libp2p/multiaddress, libp2p/nameresolving/dnsresolver, - std/[options, sequtils, net], + std/[options, sequtils, strutils, net], results import ./external_config, @@ -12,7 +12,8 @@ import ../node/config, ../waku_enr/capabilities, ../waku_enr, - ../waku_core + ../waku_core, + ./networks_config proc enrConfiguration*( conf: WakuNodeConf, netConfig: NetConfig, key: crypto.PrivateKey @@ -157,3 +158,71 @@ proc networkConfiguration*(conf: WakuNodeConf, clientId: string): NetConfigResul ) return netConfigRes + +proc applyPresetConfiguration*(srcConf: WakuNodeConf): Result[WakuNodeConf, string] = + var resConf = srcConf + + if resConf.clusterId == 1: + warn( + "TWN - The Waku Network configuration will not be applied when `--cluster-id=1` is passed in future releases. Use `--preset=twn` instead." + ) + resConf.preset = "twn" + + case toLowerAscii(resConf.preset) + of "twn": + let twnClusterConf = ClusterConf.TheWakuNetworkConf() + + # Override configuration + resConf.maxMessageSize = twnClusterConf.maxMessageSize + resConf.clusterId = twnClusterConf.clusterId + resConf.rlnRelay = twnClusterConf.rlnRelay + resConf.rlnRelayEthContractAddress = twnClusterConf.rlnRelayEthContractAddress + resConf.rlnRelayChainId = twnClusterConf.rlnRelayChainId + resConf.rlnRelayDynamic = twnClusterConf.rlnRelayDynamic + resConf.rlnRelayBandwidthThreshold = twnClusterConf.rlnRelayBandwidthThreshold + resConf.discv5Discovery = twnClusterConf.discv5Discovery + resConf.discv5BootstrapNodes = + resConf.discv5BootstrapNodes & twnClusterConf.discv5BootstrapNodes + resConf.rlnEpochSizeSec = twnClusterConf.rlnEpochSizeSec + resConf.rlnRelayUserMessageLimit = twnClusterConf.rlnRelayUserMessageLimit + resConf.numShardsInNetwork = twnClusterConf.numShardsInNetwork + + if resConf.relay: + resConf.rlnRelay = twnClusterConf.rlnRelay + else: + discard + + return ok(resConf) + +# TODO: numShardsInNetwork should be mandatory with autosharding, and unneeded otherwise +proc getNumShardsInNetwork*(conf: WakuNodeConf): uint32 = + if conf.numShardsInNetwork != 0: + return conf.numShardsInNetwork + # If conf.numShardsInNetwork is not set, use 1024 - the maximum possible as per the static sharding spec + # https://github.com/waku-org/specs/blob/master/standards/core/relay-sharding.md#static-sharding + return uint32(MaxShardIndex + 1) + +proc validateShards*(conf: WakuNodeConf): Result[void, string] = + let numShardsInNetwork = getNumShardsInNetwork(conf) + + for shard in conf.shards: + if shard >= numShardsInNetwork: + let msg = + "validateShards invalid shard: " & $shard & " when numShardsInNetwork: " & + $numShardsInNetwork # fmt doesn't work + error "validateShards failed", error = msg + return err(msg) + + return ok() + +proc getNodeKey*( + conf: WakuNodeConf, rng: ref HmacDrbgContext = crypto.newRng() +): Result[PrivateKey, string] = + if conf.nodekey.isSome(): + return ok(conf.nodekey.get()) + + warn "missing node key, generating new set" + let key = crypto.PrivateKey.random(Secp256k1, rng[]).valueOr: + error "Failed to generate key", error = error + return err("Failed to generate key: " & $error) + return ok(key) diff --git a/waku/factory/networks_config.nim b/waku/factory/networks_config.nim index 681a6f68d..03890de55 100644 --- a/waku/factory/networks_config.nim +++ b/waku/factory/networks_config.nim @@ -30,6 +30,7 @@ proc TheWakuNetworkConf*(T: type ClusterConf): ClusterConf = rlnRelayUserMessageLimit: 20, numShardsInNetwork: 8, discv5Discovery: true, + # TODO: Why is this part of the conf? eg an edge node would not have this discv5BootstrapNodes: @[ "enr:-QESuED0qW1BCmF-oH_ARGPr97Nv767bl_43uoy70vrbah3EaCAdK3Q0iRQ6wkSTTpdrg_dU_NC2ydO8leSlRpBX4pxiAYJpZIJ2NIJpcIRA4VDAim11bHRpYWRkcnO4XAArNiZub2RlLTAxLmRvLWFtczMud2FrdS5zYW5kYm94LnN0YXR1cy5pbQZ2XwAtNiZub2RlLTAxLmRvLWFtczMud2FrdS5zYW5kYm94LnN0YXR1cy5pbQYfQN4DgnJzkwABCAAAAAEAAgADAAQABQAGAAeJc2VjcDI1NmsxoQOTd-h5owwj-cx7xrmbvQKU8CV3Fomfdvcv1MBc-67T5oN0Y3CCdl-DdWRwgiMohXdha3UyDw", diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index d2d6b1d99..3142ff766 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -137,13 +137,6 @@ proc initNode( ## Mount protocols -proc getNumShardsInNetwork*(conf: WakuNodeConf): uint32 = - if conf.numShardsInNetwork != 0: - return conf.numShardsInNetwork - # If conf.numShardsInNetwork is not set, use 1024 - the maximum possible as per the static sharding spec - # https://github.com/waku-org/specs/blob/master/standards/core/relay-sharding.md#static-sharding - return uint32(MaxShardIndex + 1) - proc getAutoshards*( node: WakuNode, contentTopics: seq[string] ): Result[seq[RelayShard], string] = @@ -265,6 +258,7 @@ proc setupProtocols( if conf.numShardsInNetwork == 0: warn "Number of shards in network not configured, setting it to", + # TODO: If not configured, it mounts 1024 shards! Make it a mandatory configuration instead numShardsInNetwork = $numShardsInNetwork node.mountSharding(conf.clusterId, numShardsInNetwork).isOkOr: diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index 546253176..854df8dde 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -99,19 +99,6 @@ proc logConfig(conf: WakuNodeConf) = func version*(waku: Waku): string = waku.version -proc validateShards(conf: WakuNodeConf): Result[void, string] = - let numShardsInNetwork = getNumShardsInNetwork(conf) - - for shard in conf.shards: - if shard >= numShardsInNetwork: - let msg = - "validateShards invalid shard: " & $shard & " when numShardsInNetwork: " & - $numShardsInNetwork # fmt doesn't work - error "validateShards failed", error = msg - return err(msg) - - return ok() - proc setupSwitchServices( waku: Waku, conf: WakuNodeConf, circuitRelay: Relay, rng: ref HmacDrbgContext ) = @@ -214,46 +201,28 @@ proc new*( shards.add(shard) confCopy.shards = shards - case confCopy.clusterId + # Why can't I replace this block with a concise `.valueOr`? + confCopy = block: + let res = applyPresetConfiguration(confCopy) + if res.isErr(): + error "Failed to complete the config", error = res.error + return err("Failed to complete the config:" & $res.error) + res.get() - # cluster-id=1 (aka The Waku Network) - of 1: - let twnClusterConf = ClusterConf.TheWakuNetworkConf() - - # Override configuration - confCopy.maxMessageSize = twnClusterConf.maxMessageSize - confCopy.clusterId = twnClusterConf.clusterId - confCopy.rlnRelayEthContractAddress = twnClusterConf.rlnRelayEthContractAddress - confCopy.rlnRelayChainId = twnClusterConf.rlnRelayChainId - confCopy.rlnRelayDynamic = twnClusterConf.rlnRelayDynamic - confCopy.rlnRelayBandwidthThreshold = twnClusterConf.rlnRelayBandwidthThreshold - confCopy.discv5Discovery = twnClusterConf.discv5Discovery - confCopy.discv5BootstrapNodes = - confCopy.discv5BootstrapNodes & twnClusterConf.discv5BootstrapNodes - confCopy.rlnEpochSizeSec = twnClusterConf.rlnEpochSizeSec - confCopy.rlnRelayUserMessageLimit = twnClusterConf.rlnRelayUserMessageLimit - confCopy.numShardsInNetwork = twnClusterConf.numShardsInNetwork - - # Only set rlnRelay to true if relay is configured - if confCopy.relay: - confCopy.rlnRelay = twnClusterConf.rlnRelay - else: - discard + logConfig(confCopy) info "Running nwaku node", version = git_version - logConfig(confCopy) let validateShardsRes = validateShards(confCopy) if validateShardsRes.isErr(): error "Failed validating shards", error = $validateShardsRes.error return err("Failed validating shards: " & $validateShardsRes.error) - if not confCopy.nodekey.isSome(): - let keyRes = crypto.PrivateKey.random(Secp256k1, rng[]) - if keyRes.isErr(): - error "Failed to generate key", error = $keyRes.error - return err("Failed to generate key: " & $keyRes.error) - confCopy.nodekey = some(keyRes.get()) + let keyRes = getNodeKey(confCopy, rng) + if keyRes.isErr(): + error "Failed to generate key", error = $keyRes.error + return err("Failed to generate key: " & $keyRes.error) + confCopy.nodeKey = some(keyRes.get()) var relay = newCircuitRelay(confCopy.isRelayClient) @@ -284,6 +253,7 @@ proc new*( var waku = Waku( version: git_version, + # TODO: WakuNodeConf is re-used for too many context, `conf` here should be a dedicated subtype conf: confCopy, rng: rng, key: confCopy.nodekey.get(), diff --git a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim index ec6b048a6..14cf96610 100644 --- a/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim +++ b/waku/waku_rln_relay/group_manager/on_chain/group_manager.nim @@ -138,31 +138,53 @@ proc toMerkleNode*(uint256Value: UInt256): MerkleNode = return merkleNode -proc updateRoots*(g: OnchainGroupManager, root: MerkleNode): bool = - if g.validRoots.len > 0 and g.validRoots[^1] == root: - return false - - let overflowCount = g.validRoots.len - AcceptableRootWindowSize + 1 - if overflowCount > 0: - for i in 0 ..< overflowCount: - discard g.validRoots.popFirst() - - g.validRoots.addLast(root) - return true - -proc slideRootQueue*(g: OnchainGroupManager) {.async.} = +proc updateRoots*(g: OnchainGroupManager): Future[bool] {.async.} = let rootRes = await g.fetchMerkleRoot() if rootRes.isErr(): - raise newException(ValueError, "failed to get merkle root: " & rootRes.error) + return false let merkleRoot = toMerkleNode(rootRes.get()) + if g.validRoots.len > 0 and g.validRoots[g.validRoots.len - 1] != merkleRoot: + let overflowCount = g.validRoots.len - AcceptableRootWindowSize + 1 + if overflowCount > 0: + for i in 0 ..< overflowCount: + discard g.validRoots.popFirst() - let overflowCount = g.validRoots.len - AcceptableRootWindowSize + 1 - if overflowCount > 0: - for i in 0 ..< overflowCount: - discard g.validRoots.popFirst() + g.validRoots.addLast(merkleRoot) + debug "~~~~~~~~~~~~~ Detected new Merkle root ~~~~~~~~~~~~~~~~", + root = merkleRoot.toHex, totalRoots = g.validRoots.len + return true + else: + debug "~~~~~~~~~~~~~ No new Merkle root ~~~~~~~~~~~~~~~~", + root = merkleRoot.toHex, totalRoots = g.validRoots.len + + return false + +proc trackRootChanges*(g: OnchainGroupManager): Future[void] {.async.} = + ## Continuously track changes to the Merkle root + initializedGuard(g) + + let ethRpc = g.ethRpc.get() + let wakuRlnContract = g.wakuRlnContract.get() + + # Set up the polling interval - more frequent to catch roots + const rpcDelay = 5.seconds + + info "Starting to track Merkle root changes" + + while true: + debug "starting to update roots" + let rootUpdated = await g.updateRoots() + + if rootUpdated: + let proofResult = await g.fetchMerkleProofElements() + if proofResult.isErr(): + error "Failed to fetch Merkle proof", error = proofResult.error + g.merkleProofCache = proofResult.get() + + debug "sleeping for 5 seconds" + await sleepAsync(rpcDelay) - g.validRoots.addLast(merkleRoot) method atomicBatch*( g: OnchainGroupManager, @@ -182,16 +204,7 @@ method atomicBatch*( membersSeq.add(member) await g.registerCb.get()(membersSeq) - let rootRes = await g.fetchMerkleRoot() - if rootRes.isErr(): - raise newException(ValueError, "failed to get merkle root: " & rootRes.error) - - let merkleRoot = toMerkleNode(rootRes.get()) - - let rootUpdated = g.updateRoots(merkleRoot) - if rootUpdated: - info "Detected new Merkle root", - root = merkleRoot.toHex, totalRoots = g.validRoots.len + discard await g.updateRoots() method register*( g: OnchainGroupManager, rateCommitment: RateCommitment @@ -404,43 +417,6 @@ method onRegister*(g: OnchainGroupManager, cb: OnRegisterCallback) {.gcsafe.} = method onWithdraw*(g: OnchainGroupManager, cb: OnWithdrawCallback) {.gcsafe.} = g.withdrawCb = some(cb) -proc trackRootChanges*(g: OnchainGroupManager): Future[void] {.async.} = - ## Continuously track changes to the Merkle root - initializedGuard(g) - - let ethRpc = g.ethRpc.get() - let wakuRlnContract = g.wakuRlnContract.get() - - # Set up the polling interval - more frequent to catch roots - const rpcDelay = 1.seconds - - info "Starting to track Merkle root changes" - - while true: - try: - let rootRes = await g.fetchMerkleRoot() - if rootRes.isErr(): - raise newException(ValueError, "failed to get merkle root: " & rootRes.error) - continue - - let merkleRoot = toMerkleNode(rootRes.get()) - - let rootUpdated = g.updateRoots(merkleRoot) - if rootUpdated: - info "Detected new Merkle root", - root = merkleRoot.toHex, totalRoots = g.validRoots.len - - let proofResult = await g.fetchMerkleProofElements() - if proofResult.isErr(): - error "Failed to fetch Merkle proof", error = proofResult.error - g.merkleProofCache = proofResult.get() - - await sleepAsync(rpcDelay) - except CatchableError as e: - error "Error while tracking Merkle root", error = e.msg - - await sleepAsync(rpcDelay) - method init*(g: OnchainGroupManager): Future[GroupManagerResult[void]] {.async.} = # check if the Ethereum client is reachable var ethRpc: Web3 @@ -540,7 +516,6 @@ method init*(g: OnchainGroupManager): Future[GroupManagerResult[void]] {.async.} waku_rln_number_registered_memberships.set(int64(g.rlnInstance.leavesSet())) g.initialized = true - return ok() method stop*(g: OnchainGroupManager): Future[void] {.async, gcsafe.} = diff --git a/waku/waku_rln_relay/rln_relay.nim b/waku/waku_rln_relay/rln_relay.nim index 04d197ed5..237ef2b44 100644 --- a/waku/waku_rln_relay/rln_relay.nim +++ b/waku/waku_rln_relay/rln_relay.nim @@ -479,6 +479,18 @@ proc mount( # Start epoch monitoring in the background wakuRlnRelay.epochMonitorFuture = monitorEpochs(wakuRlnRelay) + + # Start tracking root changes after successful initialization and registration + debug "~~~~~~~~~~~~~ Starting root tracking ~~~~~~~~~~~~~~~~" + debug "~~~~~~~~~~~~~ groupManager ~~~~~~~~~~~~~~~~" + + await sleepAsync(5 * 60 * 1000) + if conf.rlnRelayDynamic: + debug "~~~~~~~~~~~~~ groupManager is dynamic ~~~~~~~~~~~~~~~~" + let onchainGroupManager = cast[OnchainGroupManager](groupManager) + asyncSpawn onchainGroupManager.trackRootChanges() + await sleepAsync(5 * 60 * 1000) + return ok(wakuRlnRelay) proc isReady*(rlnPeer: WakuRLNRelay): Future[bool] {.async: (raises: [Exception]).} =