From 16bda047e1a67124726bd17535c304a40154ab82 Mon Sep 17 00:00:00 2001 From: fryorcraken <110212804+fryorcraken@users.noreply.github.com> Date: Fri, 4 Jul 2025 17:10:53 +1000 Subject: [PATCH] chore!: make sharding configuration explicit (#3468) * Reserve `networkconfig` name to waku network related settings * Rename cluster conf to network conf A `NetworkConf` is a Waku network configuration. # Conflicts: # tests/factory/test_waku_conf.nim # Conflicts: # tests/factory/test_waku_conf.nim * Improve sharding configuration A smarter data types simplifies the logic. * Fixing tests * fixup! rename to endpointConf * wip: autosharding is a specific configuration state and treat it like it # Conflicts: # waku/factory/external_config.nim * refactor lightpush handler some metrics error reporting were missing # Conflicts: # waku/waku_lightpush/protocol.nim * test_node_factory tests pass * remove warnings * fix tests * Revert eager previous replace-all command * fix up build tools compilation * metadata is used to store cluster id * Mount relay routes in static sharding * Rename activeRelayShards to subscribeShards To make it clearer that these are the shards the node will subscribe to. * Remove unused msg var * Improve error handling * Set autosharding as default, with 1 shard in network Also makes shards to subscribe to all shards in auto sharding, none in static sharding. --- apps/networkmonitor/networkmonitor.nim | 17 +- examples/wakustealthcommitments/node_spec.nim | 24 +-- tests/factory/test_external_config.nim | 84 +++++++-- tests/factory/test_waku_conf.nim | 118 +++++++------ tests/test_peer_manager.nim | 6 +- tests/test_waku_netconfig.nim | 84 ++++----- tests/testlib/wakunode.nim | 8 +- tests/waku_discv5/test_waku_discv5.nim | 4 +- tests/waku_lightpush/lightpush_utils.nim | 6 +- tests/waku_relay/test_wakunode_relay.nim | 11 +- tests/wakunode2/test_app.nim | 2 +- tests/wakunode_rest/test_rest_admin.nim | 12 +- tests/wakunode_rest/test_rest_relay.nim | 10 +- .../conf_builder/waku_conf_builder.nim | 160 ++++++++++-------- .../conf_builder/web_socket_conf_builder.nim | 2 +- waku/factory/external_config.nim | 47 +++-- waku/factory/internal_config.nim | 20 +-- waku/factory/networks_config.nim | 50 ++++-- waku/factory/node_factory.nim | 42 +++-- waku/factory/waku.nim | 15 +- waku/factory/waku_conf.nim | 44 ++--- waku/node/waku_node.nim | 84 ++++++--- waku/waku_api/rest/admin/handlers.nim | 21 ++- waku/waku_api/rest/builder.nim | 20 ++- waku/waku_api/rest/relay/handlers.nim | 9 +- waku/waku_core/topics/content_topic.nim | 12 ++ waku/waku_core/topics/sharding.nim | 57 +++---- waku/waku_lightpush/protocol.nim | 19 ++- waku/waku_metadata/protocol.nim | 2 +- 29 files changed, 578 insertions(+), 412 deletions(-) diff --git a/apps/networkmonitor/networkmonitor.nim b/apps/networkmonitor/networkmonitor.nim index f8cde5281..f391b3d20 100644 --- a/apps/networkmonitor/networkmonitor.nim +++ b/apps/networkmonitor/networkmonitor.nim @@ -570,17 +570,18 @@ when isMainModule: info "cli flags", conf = conf if conf.clusterId == 1: - let twnClusterConf = ClusterConf.TheWakuNetworkConf() + let twnNetworkConf = NetworkConf.TheWakuNetworkConf() - conf.bootstrapNodes = twnClusterConf.discv5BootstrapNodes - conf.rlnRelayDynamic = twnClusterConf.rlnRelayDynamic - conf.rlnRelayEthContractAddress = twnClusterConf.rlnRelayEthContractAddress - conf.rlnEpochSizeSec = twnClusterConf.rlnEpochSizeSec - conf.rlnRelayUserMessageLimit = twnClusterConf.rlnRelayUserMessageLimit - conf.numShardsInNetwork = twnClusterConf.numShardsInNetwork + conf.bootstrapNodes = twnNetworkConf.discv5BootstrapNodes + conf.rlnRelayDynamic = twnNetworkConf.rlnRelayDynamic + conf.rlnRelayEthContractAddress = twnNetworkConf.rlnRelayEthContractAddress + conf.rlnEpochSizeSec = twnNetworkConf.rlnEpochSizeSec + conf.rlnRelayUserMessageLimit = twnNetworkConf.rlnRelayUserMessageLimit + conf.numShardsInNetwork = twnNetworkConf.shardingConf.numShardsInCluster if conf.shards.len == 0: - conf.shards = toSeq(uint16(0) .. uint16(twnClusterConf.numShardsInNetwork - 1)) + conf.shards = + toSeq(uint16(0) .. uint16(twnNetworkConf.shardingConf.numShardsInCluster - 1)) if conf.logLevel != LogLevel.NONE: setLogLevel(conf.logLevel) diff --git a/examples/wakustealthcommitments/node_spec.nim b/examples/wakustealthcommitments/node_spec.nim index fdcd36986..c3468ccde 100644 --- a/examples/wakustealthcommitments/node_spec.nim +++ b/examples/wakustealthcommitments/node_spec.nim @@ -24,26 +24,26 @@ proc setup*(): Waku = var conf = confRes.get() - let twnClusterConf = ClusterConf.TheWakuNetworkConf() + let twnNetworkConf = NetworkConf.TheWakuNetworkConf() if len(conf.shards) != 0: - conf.pubsubTopics = conf.shards.mapIt(twnClusterConf.pubsubTopics[it.uint16]) + conf.pubsubTopics = conf.shards.mapIt(twnNetworkConf.pubsubTopics[it.uint16]) else: - conf.pubsubTopics = twnClusterConf.pubsubTopics + conf.pubsubTopics = twnNetworkConf.pubsubTopics # Override configuration - conf.maxMessageSize = twnClusterConf.maxMessageSize - conf.clusterId = twnClusterConf.clusterId - conf.rlnRelayEthContractAddress = twnClusterConf.rlnRelayEthContractAddress - conf.rlnRelayDynamic = twnClusterConf.rlnRelayDynamic - conf.discv5Discovery = twnClusterConf.discv5Discovery + conf.maxMessageSize = twnNetworkConf.maxMessageSize + conf.clusterId = twnNetworkConf.clusterId + conf.rlnRelayEthContractAddress = twnNetworkConf.rlnRelayEthContractAddress + conf.rlnRelayDynamic = twnNetworkConf.rlnRelayDynamic + conf.discv5Discovery = twnNetworkConf.discv5Discovery conf.discv5BootstrapNodes = - conf.discv5BootstrapNodes & twnClusterConf.discv5BootstrapNodes - conf.rlnEpochSizeSec = twnClusterConf.rlnEpochSizeSec - conf.rlnRelayUserMessageLimit = twnClusterConf.rlnRelayUserMessageLimit + conf.discv5BootstrapNodes & twnNetworkConf.discv5BootstrapNodes + conf.rlnEpochSizeSec = twnNetworkConf.rlnEpochSizeSec + conf.rlnRelayUserMessageLimit = twnNetworkConf.rlnRelayUserMessageLimit # Only set rlnRelay to true if relay is configured if conf.relay: - conf.rlnRelay = twnClusterConf.rlnRelay + conf.rlnRelay = twnNetworkConf.rlnRelay debug "Starting node" var waku = Waku.new(conf).valueOr: diff --git a/tests/factory/test_external_config.nim b/tests/factory/test_external_config.nim index 927246b0d..ecd77826f 100644 --- a/tests/factory/test_external_config.nim +++ b/tests/factory/test_external_config.nim @@ -17,10 +17,46 @@ import ../../waku/common/logging, ../../waku/common/utils/parse_size_units -suite "Waku config - apply preset": - test "Default preset is TWN": +suite "Waku external config - default values": + test "Default sharding value": ## Setup - let expectedConf = ClusterConf.TheWakuNetworkConf() + let defaultShardingMode = AutoSharding + let defaultNumShardsInCluster = 1.uint16 + let defaultSubscribeShards = @[0.uint16] + + ## Given + let preConfig = defaultWakuNodeConf().get() + + ## When + let res = preConfig.toWakuConf() + assert res.isOk(), $res.error + + ## Then + let conf = res.get() + check conf.shardingConf.kind == defaultShardingMode + check conf.shardingConf.numShardsInCluster == defaultNumShardsInCluster + check conf.subscribeShards == defaultSubscribeShards + + test "Default shards value in static sharding": + ## Setup + let defaultSubscribeShards: seq[uint16] = @[] + + ## Given + var preConfig = defaultWakuNodeConf().get() + preConfig.numShardsInNetwork = 0.uint16 + + ## When + let res = preConfig.toWakuConf() + assert res.isOk(), $res.error + + ## Then + let conf = res.get() + check conf.subscribeShards == defaultSubscribeShards + +suite "Waku external config - apply preset": + test "Preset is TWN": + ## Setup + let expectedConf = NetworkConf.TheWakuNetworkConf() ## Given let preConfig = WakuNodeConf( @@ -48,7 +84,9 @@ suite "Waku config - apply preset": check rlnRelayConf.chainId == expectedConf.rlnRelayChainId check rlnRelayConf.epochSizeSec == expectedConf.rlnEpochSizeSec check rlnRelayConf.userMessageLimit == expectedConf.rlnRelayUserMessageLimit - check conf.numShardsInNetwork == expectedConf.numShardsInNetwork + check conf.shardingConf.kind == expectedConf.shardingConf.kind + check conf.shardingConf.numShardsInCluster == + expectedConf.shardingConf.numShardsInCluster check conf.discv5Conf.isSome() == expectedConf.discv5Discovery if conf.discv5Conf.isSome(): let discv5Conf = conf.discv5Conf.get() @@ -56,7 +94,7 @@ suite "Waku config - apply preset": test "Subscribes to all valid shards in twn": ## Setup - let expectedConf = ClusterConf.TheWakuNetworkConf() + let expectedConf = NetworkConf.TheWakuNetworkConf() ## Given let shards: seq[uint16] = @[0, 1, 2, 3, 4, 5, 6, 7] @@ -68,11 +106,11 @@ suite "Waku config - apply preset": ## Then let conf = res.get() - check conf.shards.len == expectedConf.numShardsInNetwork.int + check conf.subscribeShards.len == expectedConf.shardingConf.numShardsInCluster.int test "Subscribes to some valid shards in twn": ## Setup - let expectedConf = ClusterConf.TheWakuNetworkConf() + let expectedConf = NetworkConf.TheWakuNetworkConf() ## Given let shards: seq[uint16] = @[0, 4, 7] @@ -84,9 +122,9 @@ suite "Waku config - apply preset": ## Then let conf = resConf.get() - assert conf.shards.len() == shards.len() + assert conf.subscribeShards.len() == shards.len() for index, shard in shards: - assert shard in conf.shards + assert shard in conf.subscribeShards test "Subscribes to invalid shards in twn": ## Setup @@ -103,7 +141,7 @@ suite "Waku config - apply preset": test "Apply TWN preset when cluster id = 1": ## Setup - let expectedConf = ClusterConf.TheWakuNetworkConf() + let expectedConf = NetworkConf.TheWakuNetworkConf() ## Given let preConfig = WakuNodeConf( @@ -131,13 +169,15 @@ suite "Waku config - apply preset": check rlnRelayConf.chainId == expectedConf.rlnRelayChainId check rlnRelayConf.epochSizeSec == expectedConf.rlnEpochSizeSec check rlnRelayConf.userMessageLimit == expectedConf.rlnRelayUserMessageLimit - check conf.numShardsInNetwork == expectedConf.numShardsInNetwork + check conf.shardingConf.kind == expectedConf.shardingConf.kind + check conf.shardingConf.numShardsInCluster == + expectedConf.shardingConf.numShardsInCluster check conf.discv5Conf.isSome() == expectedConf.discv5Discovery if conf.discv5Conf.isSome(): let discv5Conf = conf.discv5Conf.get() check discv5Conf.bootstrapNodes == expectedConf.discv5BootstrapNodes -suite "Waku config - node key": +suite "Waku external config - node key": test "Passed node key is used": ## Setup let nodeKeyStr = @@ -158,13 +198,13 @@ suite "Waku config - node key": assert utils.toHex(resKey.getRawBytes().get()) == utils.toHex(nodekey.getRawBytes().get()) -suite "Waku config - Shards": +suite "Waku external config - Shards": test "Shards are valid": ## Setup ## Given let shards: seq[uint16] = @[0, 2, 4] - let numShardsInNetwork = 5.uint32 + let numShardsInNetwork = 5.uint16 let wakuNodeConf = WakuNodeConf( cmd: noCommand, shards: shards, numShardsInNetwork: numShardsInNetwork ) @@ -183,7 +223,7 @@ suite "Waku config - Shards": ## Given let shards: seq[uint16] = @[0, 2, 5] - let numShardsInNetwork = 5.uint32 + let numShardsInNetwork = 5.uint16 let wakuNodeConf = WakuNodeConf( cmd: noCommand, shards: shards, numShardsInNetwork: numShardsInNetwork ) @@ -198,7 +238,7 @@ suite "Waku config - Shards": ## Setup ## Given - let wakuNodeConf = WakuNodeConf.load(version = "", cmdLine = @["--shard=32"]) + let wakuNodeConf = WakuNodeConf.load(version = "", cmdLine = @["--shard=0"]) ## When let res = wakuNodeConf.toWakuConf() @@ -207,3 +247,15 @@ suite "Waku config - Shards": let wakuConf = res.get() let vRes = wakuConf.validate() assert vRes.isOk(), $vRes.error + + test "Imvalid shard is passed without num shards": + ## Setup + + ## Given + let wakuNodeConf = WakuNodeConf.load(version = "", cmdLine = @["--shard=32"]) + + ## When + let res = wakuNodeConf.toWakuConf() + + ## Then + assert res.isErr(), "Invalid shard was accepted" diff --git a/tests/factory/test_waku_conf.nim b/tests/factory/test_waku_conf.nim index c18a2c73c..436eb4e40 100644 --- a/tests/factory/test_waku_conf.nim +++ b/tests/factory/test_waku_conf.nim @@ -16,7 +16,7 @@ import suite "Waku Conf - build with cluster conf": test "Cluster Conf is passed and relay is enabled": ## Setup - let clusterConf = ClusterConf.TheWakuNetworkConf() + let networkConf = NetworkConf.TheWakuNetworkConf() var builder = WakuConfBuilder.init() builder.discv5Conf.withUdpPort(9000) builder.withRelayServiceRatio("50:50") @@ -25,7 +25,7 @@ suite "Waku Conf - build with cluster conf": ## Given builder.rlnRelayConf.withEthClientUrls(@["https://my_eth_rpc_url/"]) - builder.withClusterConf(clusterConf) + builder.withNetworkConf(networkConf) builder.withRelay(true) builder.rlnRelayConf.withTreePath("/tmp/test-tree-path") @@ -37,27 +37,29 @@ suite "Waku Conf - build with cluster conf": ## Then let resValidate = conf.validate() assert resValidate.isOk(), $resValidate.error - check conf.clusterId == clusterConf.clusterId - check conf.numShardsInNetwork == clusterConf.numShardsInNetwork - check conf.shards == expectedShards + check conf.clusterId == networkConf.clusterId + check conf.shardingConf.kind == networkConf.shardingConf.kind + check conf.shardingConf.numShardsInCluster == + networkConf.shardingConf.numShardsInCluster + check conf.subscribeShards == expectedShards check conf.maxMessageSizeBytes == - uint64(parseCorrectMsgSize(clusterConf.maxMessageSize)) - check conf.discv5Conf.get().bootstrapNodes == clusterConf.discv5BootstrapNodes + uint64(parseCorrectMsgSize(networkConf.maxMessageSize)) + check conf.discv5Conf.get().bootstrapNodes == networkConf.discv5BootstrapNodes - if clusterConf.rlnRelay: + if networkConf.rlnRelay: assert conf.rlnRelayConf.isSome(), "RLN Relay conf is disabled" let rlnRelayConf = conf.rlnRelayConf.get() check rlnRelayConf.ethContractAddress.string == - clusterConf.rlnRelayEthContractAddress - check rlnRelayConf.dynamic == clusterConf.rlnRelayDynamic - check rlnRelayConf.chainId == clusterConf.rlnRelayChainId - check rlnRelayConf.epochSizeSec == clusterConf.rlnEpochSizeSec - check rlnRelayConf.userMessageLimit == clusterConf.rlnRelayUserMessageLimit + networkConf.rlnRelayEthContractAddress + check rlnRelayConf.dynamic == networkConf.rlnRelayDynamic + check rlnRelayConf.chainId == networkConf.rlnRelayChainId + check rlnRelayConf.epochSizeSec == networkConf.rlnEpochSizeSec + check rlnRelayConf.userMessageLimit == networkConf.rlnRelayUserMessageLimit test "Cluster Conf is passed, but relay is disabled": ## Setup - let clusterConf = ClusterConf.TheWakuNetworkConf() + let networkConf = NetworkConf.TheWakuNetworkConf() var builder = WakuConfBuilder.init() builder.withRelayServiceRatio("50:50") builder.discv5Conf.withUdpPort(9000) @@ -66,7 +68,7 @@ suite "Waku Conf - build with cluster conf": ## Given builder.rlnRelayConf.withEthClientUrls(@["https://my_eth_rpc_url/"]) - builder.withClusterConf(clusterConf) + builder.withNetworkConf(networkConf) builder.withRelay(false) ## When @@ -77,18 +79,20 @@ suite "Waku Conf - build with cluster conf": ## Then let resValidate = conf.validate() assert resValidate.isOk(), $resValidate.error - check conf.clusterId == clusterConf.clusterId - check conf.numShardsInNetwork == clusterConf.numShardsInNetwork - check conf.shards == expectedShards + check conf.clusterId == networkConf.clusterId + check conf.shardingConf.kind == networkConf.shardingConf.kind + check conf.shardingConf.numShardsInCluster == + networkConf.shardingConf.numShardsInCluster + check conf.subscribeShards == expectedShards check conf.maxMessageSizeBytes == - uint64(parseCorrectMsgSize(clusterConf.maxMessageSize)) - check conf.discv5Conf.get().bootstrapNodes == clusterConf.discv5BootstrapNodes + uint64(parseCorrectMsgSize(networkConf.maxMessageSize)) + check conf.discv5Conf.get().bootstrapNodes == networkConf.discv5BootstrapNodes assert conf.rlnRelayConf.isNone test "Cluster Conf is passed, but rln relay is disabled": ## Setup - let clusterConf = ClusterConf.TheWakuNetworkConf() + let networkConf = NetworkConf.TheWakuNetworkConf() var builder = WakuConfBuilder.init() let # Mount all shards in network @@ -96,7 +100,7 @@ suite "Waku Conf - build with cluster conf": ## Given builder.rlnRelayConf.withEthClientUrls(@["https://my_eth_rpc_url/"]) - builder.withClusterConf(clusterConf) + builder.withNetworkConf(networkConf) builder.rlnRelayConf.withEnabled(false) ## When @@ -107,24 +111,26 @@ suite "Waku Conf - build with cluster conf": ## Then let resValidate = conf.validate() assert resValidate.isOk(), $resValidate.error - check conf.clusterId == clusterConf.clusterId - check conf.numShardsInNetwork == clusterConf.numShardsInNetwork - check conf.shards == expectedShards + check conf.clusterId == networkConf.clusterId + check conf.shardingConf.kind == networkConf.shardingConf.kind + check conf.shardingConf.numShardsInCluster == + networkConf.shardingConf.numShardsInCluster + check conf.subscribeShards == expectedShards check conf.maxMessageSizeBytes == - uint64(parseCorrectMsgSize(clusterConf.maxMessageSize)) - check conf.discv5Conf.get().bootstrapNodes == clusterConf.discv5BootstrapNodes + uint64(parseCorrectMsgSize(networkConf.maxMessageSize)) + check conf.discv5Conf.get().bootstrapNodes == networkConf.discv5BootstrapNodes assert conf.rlnRelayConf.isNone test "Cluster Conf is passed and valid shards are specified": ## Setup - let clusterConf = ClusterConf.TheWakuNetworkConf() + let networkConf = NetworkConf.TheWakuNetworkConf() var builder = WakuConfBuilder.init() let shards = @[2.uint16, 3.uint16] ## Given builder.rlnRelayConf.withEthClientUrls(@["https://my_eth_rpc_url/"]) - builder.withClusterConf(clusterConf) - builder.withShards(shards) + builder.withNetworkConf(networkConf) + builder.withSubscribeShards(shards) ## When let resConf = builder.build() @@ -134,23 +140,25 @@ suite "Waku Conf - build with cluster conf": ## Then let resValidate = conf.validate() assert resValidate.isOk(), $resValidate.error - check conf.clusterId == clusterConf.clusterId - check conf.numShardsInNetwork == clusterConf.numShardsInNetwork - check conf.shards == shards + check conf.clusterId == networkConf.clusterId + check conf.shardingConf.kind == networkConf.shardingConf.kind + check conf.shardingConf.numShardsInCluster == + networkConf.shardingConf.numShardsInCluster + check conf.subscribeShards == shards check conf.maxMessageSizeBytes == - uint64(parseCorrectMsgSize(clusterConf.maxMessageSize)) - check conf.discv5Conf.get().bootstrapNodes == clusterConf.discv5BootstrapNodes + uint64(parseCorrectMsgSize(networkConf.maxMessageSize)) + check conf.discv5Conf.get().bootstrapNodes == networkConf.discv5BootstrapNodes test "Cluster Conf is passed and invalid shards are specified": ## Setup - let clusterConf = ClusterConf.TheWakuNetworkConf() + let networkConf = NetworkConf.TheWakuNetworkConf() var builder = WakuConfBuilder.init() let shards = @[2.uint16, 10.uint16] ## Given builder.rlnRelayConf.withEthClientUrls(@["https://my_eth_rpc_url/"]) - builder.withClusterConf(clusterConf) - builder.withShards(shards) + builder.withNetworkConf(networkConf) + builder.withSubscribeShards(shards) ## When let resConf = builder.build() @@ -160,7 +168,7 @@ suite "Waku Conf - build with cluster conf": test "Cluster Conf is passed and RLN contract is **not** overridden": ## Setup - let clusterConf = ClusterConf.TheWakuNetworkConf() + let networkConf = NetworkConf.TheWakuNetworkConf() var builder = WakuConfBuilder.init() builder.rlnRelayConf.withEthClientUrls(@["https://my_eth_rpc_url/"]) @@ -170,7 +178,7 @@ suite "Waku Conf - build with cluster conf": ## Given builder.rlnRelayConf.withEthContractAddress(contractAddress) - builder.withClusterConf(clusterConf) + builder.withNetworkConf(networkConf) builder.withRelay(true) builder.rlnRelayConf.withTreePath("/tmp/test") @@ -182,24 +190,26 @@ suite "Waku Conf - build with cluster conf": ## Then let resValidate = conf.validate() assert resValidate.isOk(), $resValidate.error - check conf.clusterId == clusterConf.clusterId - check conf.numShardsInNetwork == clusterConf.numShardsInNetwork - check conf.shards == expectedShards + check conf.clusterId == networkConf.clusterId + check conf.shardingConf.kind == networkConf.shardingConf.kind + check conf.shardingConf.numShardsInCluster == + networkConf.shardingConf.numShardsInCluster + check conf.subscribeShards == expectedShards check conf.maxMessageSizeBytes == - uint64(parseCorrectMsgSize(clusterConf.maxMessageSize)) - check conf.discv5Conf.isSome == clusterConf.discv5Discovery - check conf.discv5Conf.get().bootstrapNodes == clusterConf.discv5BootstrapNodes + uint64(parseCorrectMsgSize(networkConf.maxMessageSize)) + check conf.discv5Conf.isSome == networkConf.discv5Discovery + check conf.discv5Conf.get().bootstrapNodes == networkConf.discv5BootstrapNodes - if clusterConf.rlnRelay: + if networkConf.rlnRelay: assert conf.rlnRelayConf.isSome let rlnRelayConf = conf.rlnRelayConf.get() check rlnRelayConf.ethContractAddress.string == - clusterConf.rlnRelayEthContractAddress - check rlnRelayConf.dynamic == clusterConf.rlnRelayDynamic - check rlnRelayConf.chainId == clusterConf.rlnRelayChainId - check rlnRelayConf.epochSizeSec == clusterConf.rlnEpochSizeSec - check rlnRelayConf.userMessageLimit == clusterConf.rlnRelayUserMessageLimit + networkConf.rlnRelayEthContractAddress + check rlnRelayConf.dynamic == networkConf.rlnRelayDynamic + check rlnRelayConf.chainId == networkConf.rlnRelayChainId + check rlnRelayConf.epochSizeSec == networkConf.rlnEpochSizeSec + check rlnRelayConf.userMessageLimit == networkConf.rlnRelayUserMessageLimit suite "Waku Conf - node key": test "Node key is generated": @@ -264,8 +274,8 @@ suite "Waku Conf - extMultiaddrs": ## Then let resValidate = conf.validate() assert resValidate.isOk(), $resValidate.error - check multiaddrs.len == conf.networkConf.extMultiAddrs.len - let resMultiaddrs = conf.networkConf.extMultiAddrs.map( + check multiaddrs.len == conf.endpointConf.extMultiAddrs.len + let resMultiaddrs = conf.endpointConf.extMultiAddrs.map( proc(m: MultiAddress): string = $m ) diff --git a/tests/test_peer_manager.nim b/tests/test_peer_manager.nim index 9ef5ddd90..889e397cc 100644 --- a/tests/test_peer_manager.nim +++ b/tests/test_peer_manager.nim @@ -420,7 +420,7 @@ procSuite "Peer Manager": parseIpAddress("0.0.0.0"), port, clusterId = 3, - shards = @[uint16(0)], + subscribeShards = @[uint16(0)], ) # same network @@ -429,14 +429,14 @@ procSuite "Peer Manager": parseIpAddress("0.0.0.0"), port, clusterId = 4, - shards = @[uint16(0)], + subscribeShards = @[uint16(0)], ) node3 = newTestWakuNode( generateSecp256k1Key(), parseIpAddress("0.0.0.0"), port, clusterId = 4, - shards = @[uint16(0)], + subscribeShards = @[uint16(0)], ) node1.mountMetadata(3).expect("Mounted Waku Metadata") diff --git a/tests/test_waku_netconfig.nim b/tests/test_waku_netconfig.nim index d2c9cc780..712fa4736 100644 --- a/tests/test_waku_netconfig.nim +++ b/tests/test_waku_netconfig.nim @@ -18,8 +18,8 @@ suite "Waku NetConfig": let wakuFlags = defaultTestWakuFlags() let netConfigRes = NetConfig.init( - bindIp = conf.networkConf.p2pListenAddress, - bindPort = conf.networkConf.p2pTcpPort, + bindIp = conf.endpointConf.p2pListenAddress, + bindPort = conf.endpointConf.p2pTcpPort, extIp = none(IpAddress), extPort = none(Port), extMultiAddrs = @[], @@ -46,7 +46,8 @@ suite "Waku NetConfig": let conf = defaultTestWakuConf() let netConfigRes = NetConfig.init( - bindIp = conf.networkConf.p2pListenAddress, bindPort = conf.networkConf.p2pTcpPort + bindIp = conf.endpointConf.p2pListenAddress, + bindPort = conf.endpointConf.p2pTcpPort, ) assert netConfigRes.isOk(), $netConfigRes.error @@ -57,7 +58,9 @@ suite "Waku NetConfig": netConfig.announcedAddresses.len == 1 # Only bind address should be present netConfig.announcedAddresses[0] == formatListenAddress( - ip4TcpEndPoint(conf.networkConf.p2pListenAddress, conf.networkConf.p2pTcpPort) + ip4TcpEndPoint( + conf.endpointConf.p2pListenAddress, conf.endpointConf.p2pTcpPort + ) ) asyncTest "AnnouncedAddresses contains external address if extIp/Port are provided": @@ -67,8 +70,8 @@ suite "Waku NetConfig": extPort = Port(1234) let netConfigRes = NetConfig.init( - bindIp = conf.networkConf.p2pListenAddress, - bindPort = conf.networkConf.p2pTcpPort, + bindIp = conf.endpointConf.p2pListenAddress, + bindPort = conf.endpointConf.p2pTcpPort, extIp = some(extIp), extPort = some(extPort), ) @@ -88,8 +91,8 @@ suite "Waku NetConfig": extPort = Port(1234) let netConfigRes = NetConfig.init( - bindIp = conf.networkConf.p2pListenAddress, - bindPort = conf.networkConf.p2pTcpPort, + bindIp = conf.endpointConf.p2pListenAddress, + bindPort = conf.endpointConf.p2pTcpPort, dns4DomainName = some(dns4DomainName), extPort = some(extPort), ) @@ -110,8 +113,8 @@ suite "Waku NetConfig": extMultiAddrs = @[ip4TcpEndPoint(extIp, extPort)] let netConfigRes = NetConfig.init( - bindIp = conf.networkConf.p2pListenAddress, - bindPort = conf.networkConf.p2pTcpPort, + bindIp = conf.endpointConf.p2pListenAddress, + bindPort = conf.endpointConf.p2pTcpPort, extMultiAddrs = extMultiAddrs, ) @@ -131,8 +134,8 @@ suite "Waku NetConfig": extPort = Port(1234) let netConfigRes = NetConfig.init( - bindIp = conf.networkConf.p2pListenAddress, - bindPort = conf.networkConf.p2pTcpPort, + bindIp = conf.endpointConf.p2pListenAddress, + bindPort = conf.endpointConf.p2pTcpPort, dns4DomainName = some(dns4DomainName), extIp = some(extIp), extPort = some(extPort), @@ -152,8 +155,8 @@ suite "Waku NetConfig": wssEnabled = false var netConfigRes = NetConfig.init( - bindIp = conf.networkConf.p2pListenAddress, - bindPort = conf.networkConf.p2pTcpPort, + bindIp = conf.endpointConf.p2pListenAddress, + bindPort = conf.endpointConf.p2pTcpPort, wsEnabled = true, wssEnabled = wssEnabled, ) @@ -165,8 +168,9 @@ suite "Waku NetConfig": check: netConfig.announcedAddresses.len == 2 # Bind address + wsHostAddress netConfig.announcedAddresses[1] == ( - ip4TcpEndPoint(conf.networkConf.p2pListenAddress, conf.webSocketConf.get().port) & - wsFlag(wssEnabled) + ip4TcpEndPoint( + conf.endpointConf.p2pListenAddress, conf.webSocketConf.get().port + ) & wsFlag(wssEnabled) ) ## Now try the same for the case of wssEnabled = true @@ -174,8 +178,8 @@ suite "Waku NetConfig": wssEnabled = true netConfigRes = NetConfig.init( - bindIp = conf.networkConf.p2pListenAddress, - bindPort = conf.networkConf.p2pTcpPort, + bindIp = conf.endpointConf.p2pListenAddress, + bindPort = conf.endpointConf.p2pTcpPort, wsEnabled = true, wssEnabled = wssEnabled, ) @@ -187,8 +191,9 @@ suite "Waku NetConfig": check: netConfig.announcedAddresses.len == 2 # Bind address + wsHostAddress netConfig.announcedAddresses[1] == ( - ip4TcpEndPoint(conf.networkConf.p2pListenAddress, conf.websocketConf.get().port) & - wsFlag(wssEnabled) + ip4TcpEndPoint( + conf.endpointConf.p2pListenAddress, conf.websocketConf.get().port + ) & wsFlag(wssEnabled) ) asyncTest "Announced WebSocket address contains external IP if provided": @@ -199,8 +204,8 @@ suite "Waku NetConfig": wssEnabled = false let netConfigRes = NetConfig.init( - bindIp = conf.networkConf.p2pListenAddress, - bindPort = conf.networkConf.p2pTcpPort, + bindIp = conf.endpointConf.p2pListenAddress, + bindPort = conf.endpointConf.p2pTcpPort, extIp = some(extIp), extPort = some(extPort), wsEnabled = true, @@ -224,8 +229,8 @@ suite "Waku NetConfig": wssEnabled = false let netConfigRes = NetConfig.init( - bindIp = conf.networkConf.p2pListenAddress, - bindPort = conf.networkConf.p2pTcpPort, + bindIp = conf.endpointConf.p2pListenAddress, + bindPort = conf.endpointConf.p2pTcpPort, dns4DomainName = some(dns4DomainName), extPort = some(extPort), wsEnabled = true, @@ -252,8 +257,8 @@ suite "Waku NetConfig": wssEnabled = false let netConfigRes = NetConfig.init( - bindIp = conf.networkConf.p2pListenAddress, - bindPort = conf.networkConf.p2pTcpPort, + bindIp = conf.endpointConf.p2pListenAddress, + bindPort = conf.endpointConf.p2pTcpPort, dns4DomainName = some(dns4DomainName), extIp = some(extIp), extPort = some(extPort), @@ -277,7 +282,8 @@ suite "Waku NetConfig": let conf = defaultTestWakuConf() let netConfigRes = NetConfig.init( - bindIp = conf.networkConf.p2pListenAddress, bindPort = conf.networkConf.p2pTcpPort + bindIp = conf.endpointConf.p2pListenAddress, + bindPort = conf.endpointConf.p2pTcpPort, ) assert netConfigRes.isOk(), $netConfigRes.error @@ -285,8 +291,8 @@ suite "Waku NetConfig": let netConfig = netConfigRes.get() check: - netConfig.enrIp.get() == conf.networkConf.p2pListenAddress - netConfig.enrPort.get() == conf.networkConf.p2pTcpPort + netConfig.enrIp.get() == conf.endpointConf.p2pListenAddress + netConfig.enrPort.get() == conf.endpointConf.p2pTcpPort asyncTest "ENR is set with extIp/Port if provided": let @@ -295,8 +301,8 @@ suite "Waku NetConfig": extPort = Port(1234) let netConfigRes = NetConfig.init( - bindIp = conf.networkConf.p2pListenAddress, - bindPort = conf.networkConf.p2pTcpPort, + bindIp = conf.endpointConf.p2pListenAddress, + bindPort = conf.endpointConf.p2pTcpPort, extIp = some(extIp), extPort = some(extPort), ) @@ -316,8 +322,8 @@ suite "Waku NetConfig": extPort = Port(1234) let netConfigRes = NetConfig.init( - bindIp = conf.networkConf.p2pListenAddress, - bindPort = conf.networkConf.p2pTcpPort, + bindIp = conf.endpointConf.p2pListenAddress, + bindPort = conf.endpointConf.p2pTcpPort, dns4DomainName = some(dns4DomainName), extPort = some(extPort), ) @@ -339,8 +345,8 @@ suite "Waku NetConfig": extMultiAddrs = @[(ip4TcpEndPoint(extAddIp, extAddPort) & wsFlag(wssEnabled))] var netConfigRes = NetConfig.init( - bindIp = conf.networkConf.p2pListenAddress, - bindPort = conf.networkConf.p2pTcpPort, + bindIp = conf.endpointConf.p2pListenAddress, + bindPort = conf.endpointConf.p2pTcpPort, extMultiAddrs = extMultiAddrs, wsEnabled = wsEnabled, ) @@ -358,8 +364,8 @@ suite "Waku NetConfig": extMultiAddrs = @[(ip4TcpEndPoint(extAddIp, extAddPort) & wsFlag(wssEnabled))] netConfigRes = NetConfig.init( - bindIp = conf.networkConf.p2pListenAddress, - bindPort = conf.networkConf.p2pTcpPort, + bindIp = conf.endpointConf.p2pListenAddress, + bindPort = conf.endpointConf.p2pTcpPort, extMultiAddrs = extMultiAddrs, wssEnabled = wssEnabled, ) @@ -380,8 +386,8 @@ suite "Waku NetConfig": extMultiAddrs = @[ip4TcpEndPoint(extAddIp, extAddPort)] let netConfigRes = NetConfig.init( - bindIp = conf.networkConf.p2pListenAddress, - bindPort = conf.networkConf.p2pTcpPort, + bindIp = conf.endpointConf.p2pListenAddress, + bindPort = conf.endpointConf.p2pTcpPort, extMultiAddrs = extMultiAddrs, extMultiAddrsOnly = true, ) diff --git a/tests/testlib/wakunode.nim b/tests/testlib/wakunode.nim index 54719aac1..fe040534e 100644 --- a/tests/testlib/wakunode.nim +++ b/tests/testlib/wakunode.nim @@ -37,7 +37,7 @@ proc defaultTestWakuConfBuilder*(): WakuConfBuilder = builder.withRelayServiceRatio("60:40") builder.withMaxMessageSize("1024 KiB") builder.withClusterId(DefaultClusterId) - builder.withShards(@[DefaultShardId]) + builder.withSubscribeShards(@[DefaultShardId]) builder.withRelay(true) builder.withRendezvous(true) builder.storeServiceConf.withDbMigration(false) @@ -72,7 +72,7 @@ proc newTestWakuNode*( agentString = none(string), peerStoreCapacity = none(int), clusterId = DefaultClusterId, - shards = @[DefaultShardId], + subscribeShards = @[DefaultShardId], ): WakuNode = var resolvedExtIp = extIp @@ -86,7 +86,7 @@ proc newTestWakuNode*( var conf = defaultTestWakuConf() conf.clusterId = clusterId - conf.shards = shards + conf.subscribeShards = subscribeShards if dns4DomainName.isSome() and extIp.isNone(): # If there's an error resolving the IP, an exception is thrown and test fails @@ -114,7 +114,7 @@ proc newTestWakuNode*( var enrBuilder = EnrBuilder.init(nodeKey) enrBuilder.withWakuRelaySharding( - RelayShards(clusterId: conf.clusterId, shardIds: conf.shards) + RelayShards(clusterId: conf.clusterId, shardIds: conf.subscribeShards) ).isOkOr: raise newException(Defect, "Invalid record: " & $error) diff --git a/tests/waku_discv5/test_waku_discv5.nim b/tests/waku_discv5/test_waku_discv5.nim index c5dd1c55e..79913ce92 100644 --- a/tests/waku_discv5/test_waku_discv5.nim +++ b/tests/waku_discv5/test_waku_discv5.nim @@ -503,7 +503,7 @@ suite "Waku Discovery v5": waku.dynamicBootstrapNodes, waku.rng, waku.conf.nodeKey, - waku.conf.networkConf.p2pListenAddress, + waku.conf.endpointConf.p2pListenAddress, waku.conf.portsShift, ) @@ -534,7 +534,7 @@ suite "Waku Discovery v5": waku.dynamicBootstrapNodes, waku.rng, waku.conf.nodeKey, - waku.conf.networkConf.p2pListenAddress, + waku.conf.endpointConf.p2pListenAddress, waku.conf.portsShift, ) diff --git a/tests/waku_lightpush/lightpush_utils.nim b/tests/waku_lightpush/lightpush_utils.nim index 9b867c707..7bd44a311 100644 --- a/tests/waku_lightpush/lightpush_utils.nim +++ b/tests/waku_lightpush/lightpush_utils.nim @@ -18,8 +18,10 @@ proc newTestWakuLightpushNode*( ): Future[WakuLightPush] {.async.} = let peerManager = PeerManager.new(switch) - wakuSharding = Sharding(clusterId: 1, shardCountGenZero: 8) - proto = WakuLightPush.new(peerManager, rng, handler, wakuSharding, rateLimitSetting) + wakuAutoSharding = Sharding(clusterId: 1, shardCountGenZero: 8) + proto = WakuLightPush.new( + peerManager, rng, handler, some(wakuAutoSharding), rateLimitSetting + ) await proto.start() switch.mount(proto) diff --git a/tests/waku_relay/test_wakunode_relay.nim b/tests/waku_relay/test_wakunode_relay.nim index ad8d83361..2b4f32617 100644 --- a/tests/waku_relay/test_wakunode_relay.nim +++ b/tests/waku_relay/test_wakunode_relay.nim @@ -657,7 +657,7 @@ suite "WakuNode - Relay": await node.start() (await node.mountRelay()).isOkOr: assert false, "Failed to mount relay" - require node.mountSharding(1, 1).isOk + require node.mountAutoSharding(1, 1).isOk ## Given let @@ -670,11 +670,14 @@ suite "WakuNode - Relay": ): Future[void] {.gcsafe, raises: [Defect].} = discard pubsubTopic discard message - assert shard == node.wakuSharding.getShard(contentTopicA).expect("Valid Topic"), + assert shard == + node.wakuAutoSharding.get().getShard(contentTopicA).expect("Valid Topic"), "topic must use the same shard" - assert shard == node.wakuSharding.getShard(contentTopicB).expect("Valid Topic"), + assert shard == + node.wakuAutoSharding.get().getShard(contentTopicB).expect("Valid Topic"), "topic must use the same shard" - assert shard == node.wakuSharding.getShard(contentTopicC).expect("Valid Topic"), + assert shard == + node.wakuAutoSharding.get().getShard(contentTopicC).expect("Valid Topic"), "topic must use the same shard" ## When diff --git a/tests/wakunode2/test_app.nim b/tests/wakunode2/test_app.nim index 4f52732da..2d62d4956 100644 --- a/tests/wakunode2/test_app.nim +++ b/tests/wakunode2/test_app.nim @@ -65,7 +65,7 @@ suite "Wakunode2 - Waku initialization": test "app properly handles dynamic port configuration": ## Given var conf = defaultTestWakuConf() - conf.networkConf.p2pTcpPort = Port(0) + conf.endpointConf.p2pTcpPort = Port(0) ## When var waku = Waku.new(conf).valueOr: diff --git a/tests/wakunode_rest/test_rest_admin.nim b/tests/wakunode_rest/test_rest_admin.nim index 4e59b0725..c928140e1 100644 --- a/tests/wakunode_rest/test_rest_admin.nim +++ b/tests/wakunode_rest/test_rest_admin.nim @@ -1,7 +1,7 @@ {.used.} import - std/[sequtils, strformat, net], + std/[sequtils, net], testutils/unittests, presto, presto/client as presto_client, @@ -42,6 +42,14 @@ suite "Waku v2 Rest API - Admin": node2 = newTestWakuNode(generateSecp256k1Key(), getPrimaryIPAddr(), Port(60602)) node3 = newTestWakuNode(generateSecp256k1Key(), getPrimaryIPAddr(), Port(60604)) + let clusterId = 1.uint16 + node1.mountMetadata(clusterId).isOkOr: + assert false, "Failed to mount metadata: " & $error + node2.mountMetadata(clusterId).isOkOr: + assert false, "Failed to mount metadata: " & $error + node3.mountMetadata(clusterId).isOkOr: + assert false, "Failed to mount metadata: " & $error + await allFutures(node1.start(), node2.start(), node3.start()) await allFutures( node1.mountRelay(), @@ -56,7 +64,7 @@ suite "Waku v2 Rest API - Admin": ): Future[void] {.async, gcsafe.} = await sleepAsync(0.milliseconds) - let shard = RelayShard(clusterId: 1, shardId: 0) + let shard = RelayShard(clusterId: clusterId, shardId: 0) node1.subscribe((kind: PubsubSub, topic: $shard), simpleHandler).isOkOr: assert false, "Failed to subscribe to topic: " & $error node2.subscribe((kind: PubsubSub, topic: $shard), simpleHandler).isOkOr: diff --git a/tests/wakunode_rest/test_rest_relay.nim b/tests/wakunode_rest/test_rest_relay.nim index 8ea7f2abe..147f6e68f 100644 --- a/tests/wakunode_rest/test_rest_relay.nim +++ b/tests/wakunode_rest/test_rest_relay.nim @@ -296,7 +296,7 @@ suite "Waku v2 Rest API - Relay": await node.start() (await node.mountRelay()).isOkOr: assert false, "Failed to mount relay" - require node.mountSharding(1, 8).isOk + require node.mountAutoSharding(1, 8).isOk var restPort = Port(0) let restAddress = parseIpAddress("0.0.0.0") @@ -346,6 +346,7 @@ suite "Waku v2 Rest API - Relay": await node.start() (await node.mountRelay()).isOkOr: assert false, "Failed to mount relay" + require node.mountAutoSharding(1, 8).isOk var restPort = Port(0) let restAddress = parseIpAddress("0.0.0.0") @@ -404,6 +405,7 @@ suite "Waku v2 Rest API - Relay": await node.start() (await node.mountRelay()).isOkOr: assert false, "Failed to mount relay" + require node.mountAutoSharding(1, 8).isOk var restPort = Port(0) let restAddress = parseIpAddress("0.0.0.0") @@ -469,6 +471,8 @@ suite "Waku v2 Rest API - Relay": await node.start() (await node.mountRelay()).isOkOr: assert false, "Failed to mount relay" + require node.mountAutoSharding(1, 8).isOk + let wakuRlnConfig = WakuRlnConfig( dynamic: false, credIndex: some(1.uint), @@ -528,6 +532,8 @@ suite "Waku v2 Rest API - Relay": await node.start() (await node.mountRelay()).isOkOr: assert false, "Failed to mount relay" + require node.mountAutoSharding(1, 8).isOk + let wakuRlnConfig = WakuRlnConfig( dynamic: false, credIndex: some(1.uint), @@ -641,6 +647,8 @@ suite "Waku v2 Rest API - Relay": await node.start() (await node.mountRelay()).isOkOr: assert false, "Failed to mount relay" + require node.mountAutoSharding(1, 8).isOk + let wakuRlnConfig = WakuRlnConfig( dynamic: false, credIndex: some(1.uint), diff --git a/waku/factory/conf_builder/waku_conf_builder.nim b/waku/factory/conf_builder/waku_conf_builder.nim index 0b9ca0d88..ee7ca1b8c 100644 --- a/waku/factory/conf_builder/waku_conf_builder.nim +++ b/waku/factory/conf_builder/waku_conf_builder.nim @@ -59,8 +59,9 @@ type WakuConfBuilder* = object nodeKey: Option[crypto.PrivateKey] clusterId: Option[uint16] - numShardsInNetwork: Option[uint32] - shards: Option[seq[uint16]] + shardingConf: Option[ShardingConfKind] + numShardsInCluster: Option[uint16] + subscribeShards: Option[seq[uint16]] protectedShards: Option[seq[ProtectedShard]] contentTopics: Option[seq[string]] @@ -83,7 +84,7 @@ type WakuConfBuilder* = object # TODO: move within a relayConf rendezvous: Option[bool] - clusterConf: Option[ClusterConf] + networkConf: Option[NetworkConf] staticNodes: seq[string] @@ -135,8 +136,8 @@ proc init*(T: type WakuConfBuilder): WakuConfBuilder = webSocketConf: WebSocketConfBuilder.init(), ) -proc withClusterConf*(b: var WakuConfBuilder, clusterConf: ClusterConf) = - b.clusterConf = some(clusterConf) +proc withNetworkConf*(b: var WakuConfBuilder, networkConf: NetworkConf) = + b.networkConf = some(networkConf) proc withNodeKey*(b: var WakuConfBuilder, nodeKey: crypto.PrivateKey) = b.nodeKey = some(nodeKey) @@ -144,11 +145,14 @@ proc withNodeKey*(b: var WakuConfBuilder, nodeKey: crypto.PrivateKey) = proc withClusterId*(b: var WakuConfBuilder, clusterId: uint16) = b.clusterId = some(clusterId) -proc withNumShardsInNetwork*(b: var WakuConfBuilder, numShardsInNetwork: uint32) = - b.numShardsInNetwork = some(numShardsInNetwork) +proc withShardingConf*(b: var WakuConfBuilder, shardingConf: ShardingConfKind) = + b.shardingConf = some(shardingConf) -proc withShards*(b: var WakuConfBuilder, shards: seq[uint16]) = - b.shards = some(shards) +proc withNumShardsInCluster*(b: var WakuConfBuilder, numShardsInCluster: uint16) = + b.numShardsInCluster = some(numShardsInCluster) + +proc withSubscribeShards*(b: var WakuConfBuilder, shards: seq[uint16]) = + b.subscribeShards = some(shards) proc withProtectedShards*( b: var WakuConfBuilder, protectedShards: seq[ProtectedShard] @@ -269,6 +273,8 @@ proc withMaxMessageSize*(builder: var WakuConfBuilder, maxMessageSize: string) = proc withStaticNodes*(builder: var WakuConfBuilder, staticNodes: seq[string]) = builder.staticNodes = concat(builder.staticNodes, staticNodes) +## Building + proc nodeKey( builder: WakuConfBuilder, rng: ref HmacDrbgContext ): Result[crypto.PrivateKey, string] = @@ -281,77 +287,105 @@ proc nodeKey( return err("Failed to generate key: " & $error) return ok(nodeKey) -proc applyClusterConf(builder: var WakuConfBuilder) = - # Apply cluster conf, overrides most values passed individually - # If you want to tweak values, don't use clusterConf - if builder.clusterConf.isNone(): +proc buildShardingConf( + bShardingConfKind: Option[ShardingConfKind], + bNumShardsInCluster: Option[uint16], + bSubscribeShards: Option[seq[uint16]], +): (ShardingConf, seq[uint16]) = + echo "bSubscribeShards: ", bSubscribeShards + case bShardingConfKind.get(AutoSharding) + of StaticSharding: + (ShardingConf(kind: StaticSharding), bSubscribeShards.get(@[])) + of AutoSharding: + let numShardsInCluster = bNumShardsInCluster.get(1) + let shardingConf = + ShardingConf(kind: AutoSharding, numShardsInCluster: numShardsInCluster) + let upperShard = uint16(numShardsInCluster - 1) + (shardingConf, bSubscribeShards.get(toSeq(0.uint16 .. upperShard))) + +proc applyNetworkConf(builder: var WakuConfBuilder) = + # Apply network conf, overrides most values passed individually + # If you want to tweak values, don't use networkConf + # TODO: networkconf should be one field of the conf builder so that this function becomes unnecessary + if builder.networkConf.isNone(): return - let clusterConf = builder.clusterConf.get() + let networkConf = builder.networkConf.get() if builder.clusterId.isSome(): - warn "Cluster id was provided alongside a cluster conf", - used = clusterConf.clusterId, discarded = builder.clusterId.get() - builder.clusterId = some(clusterConf.clusterId) + warn "Cluster id was provided alongside a network conf", + used = networkConf.clusterId, discarded = builder.clusterId.get() + builder.clusterId = some(networkConf.clusterId) # Apply relay parameters - if builder.relay.get(false) and clusterConf.rlnRelay: + if builder.relay.get(false) and networkConf.rlnRelay: if builder.rlnRelayConf.enabled.isSome(): - warn "RLN Relay was provided alongside a cluster conf", - used = clusterConf.rlnRelay, discarded = builder.rlnRelayConf.enabled + warn "RLN Relay was provided alongside a network conf", + used = networkConf.rlnRelay, discarded = builder.rlnRelayConf.enabled builder.rlnRelayConf.withEnabled(true) if builder.rlnRelayConf.ethContractAddress.get("") != "": - warn "RLN Relay ETH Contract Address was provided alongside a cluster conf", - used = clusterConf.rlnRelayEthContractAddress.string, + warn "RLN Relay ETH Contract Address was provided alongside a network conf", + used = networkConf.rlnRelayEthContractAddress.string, discarded = builder.rlnRelayConf.ethContractAddress.get().string - builder.rlnRelayConf.withEthContractAddress(clusterConf.rlnRelayEthContractAddress) + builder.rlnRelayConf.withEthContractAddress(networkConf.rlnRelayEthContractAddress) if builder.rlnRelayConf.chainId.isSome(): - warn "RLN Relay Chain Id was provided alongside a cluster conf", - used = clusterConf.rlnRelayChainId, discarded = builder.rlnRelayConf.chainId - builder.rlnRelayConf.withChainId(clusterConf.rlnRelayChainId) + warn "RLN Relay Chain Id was provided alongside a network conf", + used = networkConf.rlnRelayChainId, discarded = builder.rlnRelayConf.chainId + builder.rlnRelayConf.withChainId(networkConf.rlnRelayChainId) if builder.rlnRelayConf.dynamic.isSome(): - warn "RLN Relay Dynamic was provided alongside a cluster conf", - used = clusterConf.rlnRelayDynamic, discarded = builder.rlnRelayConf.dynamic - builder.rlnRelayConf.withDynamic(clusterConf.rlnRelayDynamic) + warn "RLN Relay Dynamic was provided alongside a network conf", + used = networkConf.rlnRelayDynamic, discarded = builder.rlnRelayConf.dynamic + builder.rlnRelayConf.withDynamic(networkConf.rlnRelayDynamic) if builder.rlnRelayConf.epochSizeSec.isSome(): - warn "RLN Epoch Size in Seconds was provided alongside a cluster conf", - used = clusterConf.rlnEpochSizeSec, + warn "RLN Epoch Size in Seconds was provided alongside a network conf", + used = networkConf.rlnEpochSizeSec, discarded = builder.rlnRelayConf.epochSizeSec - builder.rlnRelayConf.withEpochSizeSec(clusterConf.rlnEpochSizeSec) + builder.rlnRelayConf.withEpochSizeSec(networkConf.rlnEpochSizeSec) if builder.rlnRelayConf.userMessageLimit.isSome(): - warn "RLN Relay Dynamic was provided alongside a cluster conf", - used = clusterConf.rlnRelayUserMessageLimit, + warn "RLN Relay Dynamic was provided alongside a network conf", + used = networkConf.rlnRelayUserMessageLimit, discarded = builder.rlnRelayConf.userMessageLimit - builder.rlnRelayConf.withUserMessageLimit(clusterConf.rlnRelayUserMessageLimit) + builder.rlnRelayConf.withUserMessageLimit(networkConf.rlnRelayUserMessageLimit) # End Apply relay parameters case builder.maxMessageSize.kind of mmskNone: discard of mmskStr, mmskInt: - warn "Max Message Size was provided alongside a cluster conf", - used = clusterConf.maxMessageSize, discarded = $builder.maxMessageSize - builder.withMaxMessageSize(parseCorrectMsgSize(clusterConf.maxMessageSize)) + warn "Max Message Size was provided alongside a network conf", + used = networkConf.maxMessageSize, discarded = $builder.maxMessageSize + builder.withMaxMessageSize(parseCorrectMsgSize(networkConf.maxMessageSize)) - if builder.numShardsInNetwork.isSome(): - warn "Num Shards In Network was provided alongside a cluster conf", - used = clusterConf.numShardsInNetwork, discarded = builder.numShardsInNetwork - builder.numShardsInNetwork = some(clusterConf.numShardsInNetwork) + if builder.shardingConf.isSome(): + warn "Sharding Conf was provided alongside a network conf", + used = networkConf.shardingConf.kind, discarded = builder.shardingConf - if clusterConf.discv5Discovery: + if builder.numShardsInCluster.isSome(): + warn "Num Shards In Cluster was provided alongside a network conf", + used = networkConf.shardingConf.numShardsInCluster, + discarded = builder.numShardsInCluster + + case networkConf.shardingConf.kind + of StaticSharding: + builder.shardingConf = some(StaticSharding) + of AutoSharding: + builder.shardingConf = some(AutoSharding) + builder.numShardsInCluster = some(networkConf.shardingConf.numShardsInCluster) + + if networkConf.discv5Discovery: if builder.discv5Conf.enabled.isNone: - builder.discv5Conf.withEnabled(clusterConf.discv5Discovery) + builder.discv5Conf.withEnabled(networkConf.discv5Discovery) if builder.discv5Conf.bootstrapNodes.len == 0 and - clusterConf.discv5BootstrapNodes.len > 0: - warn "Discv5 Boostrap nodes were provided alongside a cluster conf", - used = clusterConf.discv5BootstrapNodes, + networkConf.discv5BootstrapNodes.len > 0: + warn "Discv5 Bootstrap nodes were provided alongside a network conf", + used = networkConf.discv5BootstrapNodes, discarded = builder.discv5Conf.bootstrapNodes - builder.discv5Conf.withBootstrapNodes(clusterConf.discv5BootstrapNodes) + builder.discv5Conf.withBootstrapNodes(networkConf.discv5BootstrapNodes) proc build*( builder: var WakuConfBuilder, rng: ref HmacDrbgContext = crypto.newRng() @@ -361,7 +395,7 @@ proc build*( ## of libwaku. It aims to be agnostic so it does not apply a ## default when it is opinionated. - applyClusterConf(builder) + applyNetworkConf(builder) let relay = if builder.relay.isSome(): @@ -411,24 +445,14 @@ proc build*( else: builder.clusterId.get().uint16 - let numShardsInNetwork = - if builder.numShardsInNetwork.isSome(): - builder.numShardsInNetwork.get() - else: - warn "Number of shards in network not specified, defaulting to zero (improve is wip)" - 0 - - let shards = - if builder.shards.isSome(): - builder.shards.get() - else: - warn "shards not specified, defaulting to all shards in network" - # TODO: conversion should not be needed - let upperShard: uint16 = uint16(numShardsInNetwork - 1) - toSeq(0.uint16 .. upperShard) - + let (shardingConf, subscribeShards) = buildShardingConf( + builder.shardingConf, builder.numShardsInCluster, builder.subscribeShards + ) let protectedShards = builder.protectedShards.get(@[]) + info "Sharding configuration: ", + shardingConf = $shardingConf, subscribeShards = $subscribeShards + let maxMessageSizeBytes = case builder.maxMessageSize.kind of mmskInt: @@ -584,9 +608,9 @@ proc build*( # end confs nodeKey: nodeKey, clusterId: clusterId, - numShardsInNetwork: numShardsInNetwork, + shardingConf: shardingConf, contentTopics: contentTopics, - shards: shards, + subscribeShards: subscribeShards, protectedShards: protectedShards, relay: relay, lightPush: lightPush, @@ -601,7 +625,7 @@ proc build*( logLevel: logLevel, logFormat: logFormat, # TODO: Separate builders - networkConf: NetworkConfig( + endpointConf: EndpointConf( natStrategy: natStrategy, p2pTcpPort: p2pTcpPort, dns4DomainName: dns4DomainName, diff --git a/waku/factory/conf_builder/web_socket_conf_builder.nim b/waku/factory/conf_builder/web_socket_conf_builder.nim index 5ed3d230a..88edc0941 100644 --- a/waku/factory/conf_builder/web_socket_conf_builder.nim +++ b/waku/factory/conf_builder/web_socket_conf_builder.nim @@ -1,5 +1,5 @@ import chronicles, std/[net, options], results -import ../networks_config +import waku/factory/waku_conf logScope: topics = "waku conf builder websocket" diff --git a/waku/factory/external_config.nim b/waku/factory/external_config.nim index 704c6d4e5..2d7205e87 100644 --- a/waku/factory/external_config.nim +++ b/waku/factory/external_config.nim @@ -314,28 +314,16 @@ hence would have reachability issues.""", name: "staticnode" .}: seq[string] - # TODO: This is trying to do too much, this should only be used for autosharding, which itself should be configurable - # If numShardsInNetwork is not set, we use the number of shards configured as numShardsInNetwork numShardsInNetwork* {. - desc: "Number of shards in the network", - defaultValue: 0, + desc: + "Enables autosharding and set number of shards in the cluster, set to `0` to use static sharding", + defaultValue: 1, name: "num-shards-in-network" - .}: uint32 + .}: uint16 shards* {. desc: - "Shards index to subscribe to [0..NUM_SHARDS_IN_NETWORK-1]. Argument may be repeated.", - defaultValue: - @[ - uint16(0), - uint16(1), - uint16(2), - uint16(3), - uint16(4), - uint16(5), - uint16(6), - uint16(7), - ], + "Shards index to subscribe to [0..NUM_SHARDS_IN_NETWORK-1]. Argument may be repeated. Subscribes to all shards by default in auto-sharding, no shard for static sharding", name: "shard" .}: seq[uint16] @@ -858,9 +846,9 @@ proc toKeystoreGeneratorConf*(n: WakuNodeConf): RlnKeystoreGeneratorConf = proc toInspectRlnDbConf*(n: WakuNodeConf): InspectRlnDbConf = return InspectRlnDbConf(treePath: n.treePath) -proc toClusterConf( +proc toNetworkConf( preset: string, clusterId: Option[uint16] -): ConfResult[Option[ClusterConf]] = +): ConfResult[Option[NetworkConf]] = var lcPreset = toLowerAscii(preset) if clusterId.isSome() and clusterId.get() == 1: warn( @@ -870,9 +858,9 @@ proc toClusterConf( case lcPreset of "": - ok(none(ClusterConf)) + ok(none(NetworkConf)) of "twn": - ok(some(ClusterConf.TheWakuNetworkConf())) + ok(some(NetworkConf.TheWakuNetworkConf())) else: err("Invalid --preset value passed: " & lcPreset) @@ -909,11 +897,11 @@ proc toWakuConf*(n: WakuNodeConf): ConfResult[WakuConf] = b.withProtectedShards(n.protectedShards) b.withClusterId(n.clusterId) - let clusterConf = toClusterConf(n.preset, some(n.clusterId)).valueOr: + let networkConf = toNetworkConf(n.preset, some(n.clusterId)).valueOr: return err("Error determining cluster from preset: " & $error) - if clusterConf.isSome(): - b.withClusterConf(clusterConf.get()) + if networkConf.isSome(): + b.withNetworkConf(networkConf.get()) b.withAgentString(n.agentString) @@ -948,9 +936,16 @@ proc toWakuConf*(n: WakuNodeConf): ConfResult[WakuConf] = b.withStaticNodes(n.staticNodes) if n.numShardsInNetwork != 0: - b.withNumShardsInNetwork(n.numShardsInNetwork) + b.withNumShardsInCluster(n.numShardsInNetwork) + b.withShardingConf(AutoSharding) + else: + b.withShardingConf(StaticSharding) + + # It is not possible to pass an empty sequence on the CLI + # If this is empty, it means the user did not specify any shards + if n.shards.len != 0: + b.withSubscribeShards(n.shards) - b.withShards(n.shards) b.withContentTopics(n.contentTopics) b.storeServiceConf.withEnabled(n.store) diff --git a/waku/factory/internal_config.nim b/waku/factory/internal_config.nim index 4f252fd00..9fc3602a0 100644 --- a/waku/factory/internal_config.nim +++ b/waku/factory/internal_config.nim @@ -6,13 +6,7 @@ import libp2p/nameresolving/dnsresolver, std/[options, sequtils, net], results -import - ../common/utils/nat, - ../node/net_config, - ../waku_enr, - ../waku_core, - ./waku_conf, - ./networks_config +import ../common/utils/nat, ../node/net_config, ../waku_enr, ../waku_core, ./waku_conf proc enrConfiguration*( conf: WakuConf, netConfig: NetConfig @@ -29,7 +23,7 @@ proc enrConfiguration*( enrBuilder.withMultiaddrs(netConfig.enrMultiaddrs) enrBuilder.withWakuRelaySharding( - RelayShards(clusterId: conf.clusterId, shardIds: conf.shards) + RelayShards(clusterId: conf.clusterId, shardIds: conf.subscribeShards) ).isOkOr: return err("could not initialize ENR with shards") @@ -64,7 +58,7 @@ proc dnsResolve*( # TODO: Reduce number of parameters, can be done once the same is done on Netconfig.init proc networkConfiguration*( clusterId: uint16, - conf: NetworkConfig, + conf: EndpointConf, discv5Conf: Option[Discv5Conf], webSocketConf: Option[WebSocketConf], wakuFlags: CapabilitiesBitfield, @@ -143,11 +137,3 @@ proc networkConfiguration*( ) return netConfigRes - -# TODO: numShardsInNetwork should be mandatory with autosharding, and unneeded otherwise -proc getNumShardsInNetwork*(conf: WakuConf): uint32 = - if conf.numShardsInNetwork != 0: - return conf.numShardsInNetwork - # If conf.numShardsInNetwork is not set, use 1024 - the maximum possible as per the static sharding spec - # https://github.com/waku-org/specs/blob/master/standards/core/relay-sharding.md#static-sharding - return uint32(MaxShardIndex + 1) diff --git a/waku/factory/networks_config.nim b/waku/factory/networks_config.nim index 9d1da0ace..c7193aa9c 100644 --- a/waku/factory/networks_config.nim +++ b/waku/factory/networks_config.nim @@ -1,18 +1,23 @@ {.push raises: [].} -import stint, std/[nativesockets, options] +import chronicles, results, stint -type WebSocketSecureConf* {.requiresInit.} = object - keyPath*: string - certPath*: string +logScope: + topics = "waku networks conf" -type WebSocketConf* = object - port*: Port - secureConf*: Option[WebSocketSecureConf] +type + ShardingConfKind* = enum + AutoSharding + StaticSharding -# TODO: Rename this type to match file name + ShardingConf* = object + case kind*: ShardingConfKind + of AutoSharding: + numShardsInCluster*: uint16 + of StaticSharding: + discard -type ClusterConf* = object +type NetworkConf* = object maxMessageSize*: string # TODO: static convert to a uint64 clusterId*: uint16 rlnRelay*: bool @@ -21,17 +26,16 @@ type ClusterConf* = object rlnRelayDynamic*: bool rlnEpochSizeSec*: uint64 rlnRelayUserMessageLimit*: uint64 - # TODO: should be uint16 like the `shards` parameter - numShardsInNetwork*: uint32 + shardingConf*: ShardingConf discv5Discovery*: bool discv5BootstrapNodes*: seq[string] # cluster-id=1 (aka The Waku Network) # Cluster configuration corresponding to The Waku Network. Note that it # overrides existing cli configuration -proc TheWakuNetworkConf*(T: type ClusterConf): ClusterConf = +proc TheWakuNetworkConf*(T: type NetworkConf): NetworkConf = const RelayChainId = 59141'u256 - return ClusterConf( + return NetworkConf( maxMessageSize: "150KiB", clusterId: 1, rlnRelay: true, @@ -40,7 +44,7 @@ proc TheWakuNetworkConf*(T: type ClusterConf): ClusterConf = rlnRelayChainId: RelayChainId, rlnEpochSizeSec: 600, rlnRelayUserMessageLimit: 100, - numShardsInNetwork: 8, + shardingConf: ShardingConf(kind: AutoSharding, numShardsInCluster: 8), discv5Discovery: true, discv5BootstrapNodes: @[ @@ -49,3 +53,21 @@ proc TheWakuNetworkConf*(T: type ClusterConf): ClusterConf = "enr:-QEkuEBfEzJm_kigJ2HoSS_RBFJYhKHocGdkhhBr6jSUAWjLdFPp6Pj1l4yiTQp7TGHyu1kC6FyaU573VN8klLsEm-XuAYJpZIJ2NIJpcIQI2SVcim11bHRpYWRkcnO4bgA0Ni9ub2RlLTAxLmFjLWNuLWhvbmdrb25nLWMud2FrdS5zYW5kYm94LnN0YXR1cy5pbQZ2XwA2Ni9ub2RlLTAxLmFjLWNuLWhvbmdrb25nLWMud2FrdS5zYW5kYm94LnN0YXR1cy5pbQYfQN4DgnJzkwABCAAAAAEAAgADAAQABQAGAAeJc2VjcDI1NmsxoQOwsS69tgD7u1K50r5-qG5hweuTwa0W26aYPnvivpNlrYN0Y3CCdl-DdWRwgiMohXdha3UyDw", ], ) + +proc validateShards*( + shardingConf: ShardingConf, shards: seq[uint16] +): Result[void, string] = + case shardingConf.kind + of StaticSharding: + return ok() + of AutoSharding: + let numShardsInCluster = shardingConf.numShardsInCluster + for shard in shards: + if shard >= numShardsInCluster: + let msg = + "validateShards invalid shard: " & $shard & " when numShardsInCluster: " & + $numShardsInCluster + error "validateShards failed", error = msg + return err(msg) + + return ok() diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index 5298fa2b9..5e038ee0d 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -10,6 +10,7 @@ import import ./internal_config, + ./networks_config, ./waku_conf, ./builder, ./validator_signed, @@ -137,10 +138,12 @@ proc initNode( proc getAutoshards*( node: WakuNode, contentTopics: seq[string] ): Result[seq[RelayShard], string] = + if node.wakuAutoSharding.isNone(): + return err("Static sharding used, cannot get shards from content topics") var autoShards: seq[RelayShard] for contentTopic in contentTopics: - let shard = node.wakuSharding.getShard(contentTopic).valueOr: - return err("Could not parse content topic: " & error) + let shard = node.wakuAutoSharding.get().getShard(contentTopic).valueOr: + return err("Could not parse content topic: " & error) autoShards.add(shard) return ok(autoshards) @@ -258,16 +261,11 @@ proc setupProtocols( if conf.storeServiceConf.isSome and conf.storeServiceConf.get().resume: node.setupStoreResume() - # If conf.numShardsInNetwork is not set, use the number of shards configured as numShardsInNetwork - let numShardsInNetwork = getNumShardsInNetwork(conf) - - if conf.numShardsInNetwork == 0: - warn "Number of shards in network not configured, setting it to", - # TODO: If not configured, it mounts 1024 shards! Make it a mandatory configuration instead - numShardsInNetwork = $numShardsInNetwork - - node.mountSharding(conf.clusterId, numShardsInNetwork).isOkOr: - return err("failed to mount waku sharding: " & error) + if conf.shardingConf.kind == AutoSharding: + node.mountAutoSharding(conf.clusterId, conf.shardingConf.numShardsInCluster).isOkOr: + return err("failed to mount waku auto sharding: " & error) + else: + warn("Auto sharding is disabled") # Mount relay on all nodes var peerExchangeHandler = none(RoutingRecordsHandler) @@ -290,14 +288,22 @@ proc setupProtocols( peerExchangeHandler = some(handlePeerExchange) - let autoShards = node.getAutoshards(conf.contentTopics).valueOr: - return err("Could not get autoshards: " & error) + # TODO: when using autosharding, the user should not be expected to pass any shards, but only content topics + # Hence, this joint logic should be removed in favour of an either logic: + # use passed shards (static) or deduce shards from content topics (auto) + let autoShards = + if node.wakuAutoSharding.isSome(): + node.getAutoshards(conf.contentTopics).valueOr: + return err("Could not get autoshards: " & error) + else: + @[] debug "Shards created from content topics", contentTopics = conf.contentTopics, shards = autoShards - let confShards = - conf.shards.mapIt(RelayShard(clusterId: conf.clusterId, shardId: uint16(it))) + let confShards = conf.subscribeShards.mapIt( + RelayShard(clusterId: conf.clusterId, shardId: uint16(it)) + ) let shards = confShards & autoShards if conf.relay: @@ -313,7 +319,7 @@ proc setupProtocols( # Add validation keys to protected topics var subscribedProtectedShards: seq[ProtectedShard] for shardKey in conf.protectedShards: - if shardKey.shard notin conf.shards: + if shardKey.shard notin conf.subscribeShards: warn "protected shard not in subscribed shards, skipping adding validator", protectedShard = shardKey.shard, subscribedShards = shards continue @@ -472,7 +478,7 @@ proc setupNode*( wakuConf: WakuConf, rng: ref HmacDrbgContext = crypto.newRng(), relay: Relay ): Result[WakuNode, string] = let netConfig = networkConfiguration( - wakuConf.clusterId, wakuConf.networkConf, wakuConf.discv5Conf, + wakuConf.clusterId, wakuConf.endpointConf, wakuConf.discv5Conf, wakuConf.webSocketConf, wakuConf.wakuFlags, wakuConf.dnsAddrsNameServers, wakuConf.portsShift, clientId, ).valueOr: diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index faca627a4..d733c6bf5 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -130,8 +130,9 @@ proc setupAppCallbacks( let autoShards = node.getAutoshards(conf.contentTopics).valueOr: return err("Could not get autoshards: " & error) - let confShards = - conf.shards.mapIt(RelayShard(clusterId: conf.clusterId, shardId: uint16(it))) + let confShards = conf.subscribeShards.mapIt( + RelayShard(clusterId: conf.clusterId, shardId: uint16(it)) + ) let shards = confShards & autoShards let uniqueShards = deduplicate(shards) @@ -249,14 +250,14 @@ proc getRunningNetConfig(waku: ptr Waku): Result[NetConfig, string] = return err("Could not retrieve ports: " & error) if tcpPort.isSome(): - conf.networkConf.p2pTcpPort = tcpPort.get() + conf.endpointConf.p2pTcpPort = tcpPort.get() if websocketPort.isSome() and conf.webSocketConf.isSome(): conf.webSocketConf.get().port = websocketPort.get() # Rebuild NetConfig with bound port values let netConf = networkConfiguration( - conf.clusterId, conf.networkConf, conf.discv5Conf, conf.webSocketConf, + conf.clusterId, conf.endpointConf, conf.discv5Conf, conf.webSocketConf, conf.wakuFlags, conf.dnsAddrsNameServers, conf.portsShift, clientId, ).valueOr: return err("Could not update NetConfig: " & error) @@ -306,7 +307,7 @@ proc updateAddressInENR(waku: ptr Waku): Result[void, string] = proc updateWaku(waku: ptr Waku): Result[void, string] = let conf = waku[].conf - if conf.networkConf.p2pTcpPort == Port(0) or + if conf.endpointConf.p2pTcpPort == Port(0) or (conf.websocketConf.isSome() and conf.websocketConf.get.port == Port(0)): updateEnr(waku).isOkOr: return err("error calling updateEnr: " & $error) @@ -389,7 +390,7 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async.} = waku.dynamicBootstrapNodes, waku.rng, conf.nodeKey, - conf.networkConf.p2pListenAddress, + conf.endpointConf.p2pListenAddress, conf.portsShift, ) @@ -413,7 +414,7 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async.} = conf.relay, conf.lightPush, conf.clusterId, - conf.shards, + conf.subscribeShards, conf.contentTopics, ).isOkOr: return err ("Starting protocols support REST server failed: " & $error) diff --git a/waku/factory/waku_conf.nim b/waku/factory/waku_conf.nim index 584c60f33..6ffda1c14 100644 --- a/waku/factory/waku_conf.nim +++ b/waku/factory/waku_conf.nim @@ -20,6 +20,14 @@ export RlnRelayConf, RlnRelayCreds, RestServerConf, Discv5Conf, MetricsServerCon logScope: topics = "waku conf" +type WebSocketSecureConf* {.requiresInit.} = object + keyPath*: string + certPath*: string + +type WebSocketConf* = object + port*: Port + secureConf*: Option[WebSocketSecureConf] + # TODO: should be defined in validator_signed.nim and imported here type ProtectedShard* {.requiresInit.} = object shard*: uint16 @@ -50,7 +58,7 @@ type FilterServiceConf* {.requiresInit.} = object subscriptionTimeout*: uint16 maxCriteria*: uint32 -type NetworkConfig* = object # TODO: make enum +type EndpointConf* = object # TODO: make enum natStrategy*: string p2pTcpPort*: Port dns4DomainName*: Option[string] @@ -68,11 +76,10 @@ type WakuConf* {.requiresInit.} = ref object nodeKey*: crypto.PrivateKey clusterId*: uint16 - shards*: seq[uint16] + subscribeShards*: seq[uint16] protectedShards*: seq[ProtectedShard] - # TODO: move to an autoShardingConf - numShardsInNetwork*: uint32 + shardingConf*: ShardingConf contentTopics*: seq[string] relay*: bool @@ -95,7 +102,7 @@ type WakuConf* {.requiresInit.} = ref object portsShift*: uint16 dnsAddrsNameServers*: seq[IpAddress] - networkConf*: NetworkConfig + endpointConf*: EndpointConf wakuFlags*: CapabilitiesBitfield # TODO: could probably make it a `PeerRemoteInfo` @@ -142,8 +149,8 @@ proc logConf*(conf: WakuConf) = info "Configuration. Network", cluster = conf.clusterId - for shard in conf.shards: - info "Configuration. Shards", shard = shard + for shard in conf.subscribeShards: + info "Configuration. Active Relay Shards", shard = shard if conf.discv5Conf.isSome(): for i in conf.discv5Conf.get().bootstrapNodes: @@ -165,26 +172,9 @@ proc validateNodeKey(wakuConf: WakuConf): Result[void, string] = return err("nodekey param is invalid") return ok() -proc validateShards(wakuConf: WakuConf): Result[void, string] = - let numShardsInNetwork = wakuConf.numShardsInNetwork - - # TODO: fix up this behaviour - if numShardsInNetwork == 0: - return ok() - - for shard in wakuConf.shards: - if shard >= numShardsInNetwork: - let msg = - "validateShards invalid shard: " & $shard & " when numShardsInNetwork: " & - $numShardsInNetwork # fmt doesn't work - error "validateShards failed", error = msg - return err(msg) - - return ok() - proc validateNoEmptyStrings(wakuConf: WakuConf): Result[void, string] = - if wakuConf.networkConf.dns4DomainName.isSome() and - isEmptyOrWhiteSpace(wakuConf.networkConf.dns4DomainName.get().string): + if wakuConf.endpointConf.dns4DomainName.isSome() and + isEmptyOrWhiteSpace(wakuConf.endpointConf.dns4DomainName.get().string): return err("dns4-domain-name is an empty string, set it to none(string) instead") if isEmptyOrWhiteSpace(wakuConf.relayServiceRatio): @@ -236,6 +226,6 @@ proc validateNoEmptyStrings(wakuConf: WakuConf): Result[void, string] = proc validate*(wakuConf: WakuConf): Result[void, string] = ?wakuConf.validateNodeKey() - ?wakuConf.validateShards() + ?wakuConf.shardingConf.validateShards(wakuConf.subscribeShards) ?wakuConf.validateNoEmptyStrings() return ok() diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 6a5c3fdb0..ccd62664f 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -112,7 +112,7 @@ type wakuLightpushClient*: WakuLightPushClient wakuPeerExchange*: WakuPeerExchange wakuMetadata*: WakuMetadata - wakuSharding*: Sharding + wakuAutoSharding*: Option[Sharding] enr*: enr.Record libp2pPing*: Ping rng*: ref rand.HmacDrbgContext @@ -198,12 +198,13 @@ proc mountMetadata*(node: WakuNode, clusterId: uint32): Result[void, string] = return ok() -## Waku Sharding -proc mountSharding*( +## Waku AutoSharding +proc mountAutoSharding*( node: WakuNode, clusterId: uint16, shardCount: uint32 ): Result[void, string] = - info "mounting sharding", clusterId = clusterId, shardCount = shardCount - node.wakuSharding = Sharding(clusterId: clusterId, shardCountGenZero: shardCount) + info "mounting auto sharding", clusterId = clusterId, shardCount = shardCount + node.wakuAutoSharding = + some(Sharding(clusterId: clusterId, shardCountGenZero: shardCount)) return ok() ## Waku Sync @@ -322,11 +323,15 @@ proc subscribe*( let (pubsubTopic, contentTopicOp) = case subscription.kind of ContentSub: - let shard = node.wakuSharding.getShard((subscription.topic)).valueOr: - error "Autosharding error", error = error - return err("Autosharding error: " & error) - - ($shard, some(subscription.topic)) + if node.wakuAutoSharding.isSome(): + let shard = node.wakuAutoSharding.get().getShard((subscription.topic)).valueOr: + error "Autosharding error", error = error + return err("Autosharding error: " & error) + ($shard, some(subscription.topic)) + else: + return err( + "Static sharding is used, relay subscriptions must specify a pubsub topic" + ) of PubsubSub: (subscription.topic, none(ContentTopic)) else: @@ -353,11 +358,15 @@ proc unsubscribe*( let (pubsubTopic, contentTopicOp) = case subscription.kind of ContentUnsub: - let shard = node.wakuSharding.getShard((subscription.topic)).valueOr: - error "Autosharding error", error = error - return err("Autosharding error: " & error) - - ($shard, some(subscription.topic)) + if node.wakuAutoSharding.isSome(): + let shard = node.wakuAutoSharding.get().getShard((subscription.topic)).valueOr: + error "Autosharding error", error = error + return err("Autosharding error: " & error) + ($shard, some(subscription.topic)) + else: + return err( + "Static sharding is used, relay subscriptions must specify a pubsub topic" + ) of PubsubUnsub: (subscription.topic, none(ContentTopic)) else: @@ -388,9 +397,10 @@ proc publish*( return err(msg) let pubsubTopic = pubsubTopicOp.valueOr: - node.wakuSharding.getShard(message.contentTopic).valueOr: + if node.wakuAutoSharding.isNone(): + return err("Pubsub topic must be specified when static sharding is enabled.") + node.wakuAutoSharding.get().getShard(message.contentTopic).valueOr: let msg = "Autosharding error: " & error - error "publish error", err = msg return err(msg) #TODO instead of discard return error when 0 peers received the message @@ -564,8 +574,14 @@ proc filterSubscribe*( waku_node_errors.inc(labelValues = ["subscribe_filter_failure"]) return subRes + elif node.wakuAutoSharding.isNone(): + error "Failed filter subscription, pubsub topic must be specified with static sharding" + waku_node_errors.inc(labelValues = ["subscribe_filter_failure"]) else: - let topicMapRes = node.wakuSharding.parseSharding(pubsubTopic, contentTopics) + # No pubsub topic, autosharding is used to deduce it + # but content topics must be well-formed for this + let topicMapRes = + node.wakuAutoSharding.get().getShardsFromContentTopics(contentTopics) let topicMap = if topicMapRes.isErr(): @@ -575,11 +591,11 @@ proc filterSubscribe*( topicMapRes.get() var futures = collect(newSeq): - for pubsub, topics in topicMap.pairs: + for shard, topics in topicMap.pairs: info "registering filter subscription to content", - pubsubTopic = pubsub, contentTopics = topics, peer = remotePeer.peerId + shard = shard, contentTopics = topics, peer = remotePeer.peerId let content = topics.mapIt($it) - node.wakuFilterClient.subscribe(remotePeer, $pubsub, content) + node.wakuFilterClient.subscribe(remotePeer, $shard, content) var subRes: FilterSubscribeResult = FilterSubscribeResult.ok() try: @@ -643,8 +659,12 @@ proc filterUnsubscribe*( waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"]) return unsubRes + elif node.wakuAutoSharding.isNone(): + error "Failed filter un-subscription, pubsub topic must be specified with static sharding" + waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"]) else: # pubsubTopic.isNone - let topicMapRes = node.wakuSharding.parseSharding(pubsubTopic, contentTopics) + let topicMapRes = + node.wakuAutoSharding.get().getShardsFromContentTopics(contentTopics) let topicMap = if topicMapRes.isErr(): @@ -654,11 +674,11 @@ proc filterUnsubscribe*( topicMapRes.get() var futures = collect(newSeq): - for pubsub, topics in topicMap.pairs: + for shard, topics in topicMap.pairs: info "deregistering filter subscription to content", - pubsubTopic = pubsub, contentTopics = topics, peer = remotePeer.peerId + shard = shard, contentTopics = topics, peer = remotePeer.peerId let content = topics.mapIt($it) - node.wakuFilterClient.unsubscribe(remotePeer, $pubsub, content) + node.wakuFilterClient.unsubscribe(remotePeer, $shard, content) var unsubRes: FilterSubscribeResult = FilterSubscribeResult.ok() try: @@ -1064,7 +1084,10 @@ proc legacyLightpushPublish*( if pubsubTopic.isSome(): return await internalPublish(node, pubsubTopic.get(), message, peer) - let topicMapRes = node.wakuSharding.parseSharding(pubsubTopic, message.contentTopic) + if node.wakuAutoSharding.isNone(): + return err("Pubsub topic must be specified when static sharding is enabled") + let topicMapRes = + node.wakuAutoSharding.get().getShardsFromContentTopics(message.contentTopic) let topicMap = if topicMapRes.isErr(): @@ -1120,7 +1143,7 @@ proc mountLightPush*( lightpush_protocol.getRelayPushHandler(node.wakuRelay, rlnPeer) node.wakuLightPush = WakuLightPush.new( - node.peerManager, node.rng, pushHandler, node.wakuSharding, some(rateLimit) + node.peerManager, node.rng, pushHandler, node.wakuAutoSharding, some(rateLimit) ) if node.started: @@ -1181,12 +1204,17 @@ proc lightpushPublish*( return lighpushErrorResult(NO_PEERS_TO_RELAY, "no suitable remote peers") let pubsubForPublish = pubSubTopic.valueOr: + if node.wakuAutoSharding.isNone(): + let msg = "Pubsub topic must be specified when static sharding is enabled" + error "lightpush publish error", error = msg + return lighpushErrorResult(INVALID_MESSAGE_ERROR, msg) + let parsedTopic = NsContentTopic.parse(message.contentTopic).valueOr: let msg = "Invalid content-topic:" & $error error "lightpush request handling error", error = msg return lighpushErrorResult(INVALID_MESSAGE_ERROR, msg) - node.wakuSharding.getShard(parsedTopic).valueOr: + node.wakuAutoSharding.get().getShard(parsedTopic).valueOr: let msg = "Autosharding error: " & error error "lightpush publish error", error = msg return lighpushErrorResult(INTERNAL_SERVER_ERROR, msg) diff --git a/waku/waku_api/rest/admin/handlers.nim b/waku/waku_api/rest/admin/handlers.nim index 6bf44e8a2..04cc31010 100644 --- a/waku/waku_api/rest/admin/handlers.nim +++ b/waku/waku_api/rest/admin/handlers.nim @@ -241,13 +241,20 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) = let shard = shardId.valueOr: return RestApiResponse.badRequest(fmt("Invalid shardId: {error}")) + if node.wakuMetadata.isNil(): + return RestApiResponse.serviceUnavailable( + "Error: Metadata Protocol is not mounted to the node" + ) + if node.wakuRelay.isNil(): return RestApiResponse.serviceUnavailable( "Error: Relay Protocol is not mounted to the node" ) - let topic = - toPubsubTopic(RelayShard(clusterId: node.wakuSharding.clusterId, shardId: shard)) + # TODO: clusterId and shards should be uint16 across all codebase and probably be defined as a type + let topic = toPubsubTopic( + RelayShard(clusterId: node.wakuMetadata.clusterId.uint16, shardId: shard) + ) let pubsubPeers = node.wakuRelay.getConnectedPubSubPeers(topic).get(initHashSet[PubSubPeer](0)) let relayPeer = PeersOfShard( @@ -284,13 +291,19 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) = let shard = shardId.valueOr: return RestApiResponse.badRequest(fmt("Invalid shardId: {error}")) + if node.wakuMetadata.isNil(): + return RestApiResponse.serviceUnavailable( + "Error: Metadata Protocol is not mounted to the node" + ) + if node.wakuRelay.isNil(): return RestApiResponse.serviceUnavailable( "Error: Relay Protocol is not mounted to the node" ) - let topic = - toPubsubTopic(RelayShard(clusterId: node.wakuSharding.clusterId, shardId: shard)) + let topic = toPubsubTopic( + RelayShard(clusterId: node.wakuMetadata.clusterId.uint16, shardId: shard) + ) let peers = node.wakuRelay.getPubSubPeersInMesh(topic).get(initHashSet[PubSubPeer](0)) let relayPeer = PeersOfShard( diff --git a/waku/waku_api/rest/builder.nim b/waku/waku_api/rest/builder.nim index 6725aaeec..eb514439f 100644 --- a/waku/waku_api/rest/builder.nim +++ b/waku/waku_api/rest/builder.nim @@ -151,17 +151,19 @@ proc startRestServerProtocolSupport*( error "Could not subscribe", pubsubTopic, error continue - for contentTopic in contentTopics: - cache.contentSubscribe(contentTopic) + if node.wakuAutoSharding.isSome(): + # Only deduce pubsub topics to subscribe to from content topics if autosharding is enabled + for contentTopic in contentTopics: + cache.contentSubscribe(contentTopic) - let shard = node.wakuSharding.getShard(contentTopic).valueOr: - error "Autosharding error in REST", error = error - continue - let pubsubTopic = $shard + let shard = node.wakuAutoSharding.get().getShard(contentTopic).valueOr: + error "Autosharding error in REST", error = error + continue + let pubsubTopic = $shard - node.subscribe((kind: PubsubSub, topic: pubsubTopic), handler).isOkOr: - error "Could not subscribe", pubsubTopic, error - continue + node.subscribe((kind: PubsubSub, topic: pubsubTopic), handler).isOkOr: + error "Could not subscribe", pubsubTopic, error + continue installRelayApiHandlers(router, node, cache) else: diff --git a/waku/waku_api/rest/relay/handlers.nim b/waku/waku_api/rest/relay/handlers.nim index 06bbc0c06..c268870d7 100644 --- a/waku/waku_api/rest/relay/handlers.nim +++ b/waku/waku_api/rest/relay/handlers.nim @@ -272,11 +272,16 @@ proc installRelayApiHandlers*( var message: WakuMessage = req.toWakuMessage(version = 0).valueOr: return RestApiResponse.badRequest() - let pubsubTopic = node.wakuSharding.getShard(message.contentTopic).valueOr: - let msg = "Autosharding error: " & error + if node.wakuAutoSharding.isNone(): + let msg = "Autosharding is disabled" error "publish error", err = msg return RestApiResponse.badRequest("Failed to publish. " & msg) + let pubsubTopic = node.wakuAutoSharding.get().getShard(message.contentTopic).valueOr: + let msg = "Autosharding error: " & error + error "publish error", err = msg + return RestApiResponse.badRequest("Failed to publish. " & msg) + # if RLN is mounted, append the proof to the message if not node.wakuRlnRelay.isNil(): node.wakuRlnRelay.appendRLNProof(message, float64(getTime().toUnix())).isOkOr: diff --git a/waku/waku_core/topics/content_topic.nim b/waku/waku_core/topics/content_topic.nim index b897c4c44..5984a760b 100644 --- a/waku/waku_core/topics/content_topic.nim +++ b/waku/waku_core/topics/content_topic.nim @@ -122,6 +122,18 @@ proc parse*( "Invalid content topic structure. Expected either //// or /////" return err(ParsingError.invalidFormat(errMsg)) +proc parse*( + T: type NsContentTopic, topics: seq[ContentTopic] +): ParsingResult[seq[NsContentTopic]] = + var res: seq[NsContentTopic] = @[] + for contentTopic in topics: + let parseRes = NsContentTopic.parse(contentTopic) + if parseRes.isErr(): + let error: ParsingError = parseRes.error + return ParsingResult[seq[NsContentTopic]].err(error) + res.add(parseRes.value) + return ParsingResult[seq[NsContentTopic]].ok(res) + # Content topic compatibility converter toContentTopic*(topic: NsContentTopic): ContentTopic = diff --git a/waku/waku_core/topics/sharding.nim b/waku/waku_core/topics/sharding.nim index 4a4af4cb5..d2f652161 100644 --- a/waku/waku_core/topics/sharding.nim +++ b/waku/waku_core/topics/sharding.nim @@ -8,6 +8,7 @@ import nimcrypto, std/options, std/tables, stew/endians2, results, stew/byteutil import ./content_topic, ./pubsub_topic +# TODO: this is autosharding, not just "sharding" type Sharding* = object clusterId*: uint16 # TODO: generations could be stored in a table here @@ -50,48 +51,32 @@ proc getShard*(s: Sharding, topic: ContentTopic): Result[RelayShard, string] = ok(shard) -proc parseSharding*( - s: Sharding, - pubsubTopic: Option[PubsubTopic], - contentTopics: ContentTopic | seq[ContentTopic], +proc getShardsFromContentTopics*( + s: Sharding, contentTopics: ContentTopic | seq[ContentTopic] ): Result[Table[RelayShard, seq[NsContentTopic]], string] = - var topics: seq[ContentTopic] - when contentTopics is seq[ContentTopic]: - topics = contentTopics - else: - topics = @[contentTopics] + let topics = + when contentTopics is seq[ContentTopic]: + contentTopics + else: + @[contentTopics] + + let parseRes = NsContentTopic.parse(topics) + let nsContentTopics = + if parseRes.isErr(): + return err("Cannot parse content topic: " & $parseRes.error) + else: + parseRes.get() var topicMap = initTable[RelayShard, seq[NsContentTopic]]() - for contentTopic in topics: - let parseRes = NsContentTopic.parse(contentTopic) + for content in nsContentTopics: + let shard = s.getShard(content).valueOr: + return err("Cannot deduce shard from content topic: " & $error) - let content = - if parseRes.isErr(): - return err("Cannot parse content topic: " & $parseRes.error) - else: - parseRes.get() - - let pubsub = - if pubsubTopic.isSome(): - let parseRes = RelayShard.parse(pubsubTopic.get()) - - if parseRes.isErr(): - return err("Cannot parse pubsub topic: " & $parseRes.error) - else: - parseRes.get() - else: - let shardsRes = s.getShard(content) - - if shardsRes.isErr(): - return err("Cannot autoshard content topic: " & $shardsRes.error) - else: - shardsRes.get() - - if not topicMap.hasKey(pubsub): - topicMap[pubsub] = @[] + if not topicMap.hasKey(shard): + topicMap[shard] = @[] try: - topicMap[pubsub].add(content) + topicMap[shard].add(content) except CatchableError: return err(getCurrentExceptionMsg()) diff --git a/waku/waku_lightpush/protocol.nim b/waku/waku_lightpush/protocol.nim index 7831fb20f..45dc7c3c1 100644 --- a/waku/waku_lightpush/protocol.nim +++ b/waku/waku_lightpush/protocol.nim @@ -26,12 +26,19 @@ type WakuLightPush* = ref object of LPProtocol peerManager*: PeerManager pushHandler*: PushMessageHandler requestRateLimiter*: RequestRateLimiter - sharding: Sharding + autoSharding: Option[Sharding] proc handleRequest( - wl: WakuLightPush, peerId: PeerId, pushRequest: LightPushRequest + wl: WakuLightPush, peerId: PeerId, pushRequest: LightpushRequest ): Future[WakuLightPushResult] {.async.} = let pubsubTopic = pushRequest.pubSubTopic.valueOr: + if wl.autoSharding.isNone(): + let msg = "Pubsub topic must be specified when static sharding is enabled" + error "lightpush request handling error", error = msg + return WakuLightPushResult.err( + (code: LightpushStatusCode.INVALID_MESSAGE_ERROR, desc: some(msg)) + ) + let parsedTopic = NsContentTopic.parse(pushRequest.message.contentTopic).valueOr: let msg = "Invalid content-topic:" & $error error "lightpush request handling error", error = msg @@ -39,8 +46,8 @@ proc handleRequest( (code: LightPushStatusCode.INVALID_MESSAGE_ERROR, desc: some(msg)) ) - wl.sharding.getShard(parsedTopic).valueOr: - let msg = "Sharding error: " & error + wl.autoSharding.get().getShard(parsedTopic).valueOr: + let msg = "Auto-sharding error: " & error error "lightpush request handling error", error = msg return WakuLightPushResult.err( (code: LightPushStatusCode.INTERNAL_SERVER_ERROR, desc: some(msg)) @@ -149,7 +156,7 @@ proc new*( peerManager: PeerManager, rng: ref rand.HmacDrbgContext, pushHandler: PushMessageHandler, - sharding: Sharding, + autoSharding: Option[Sharding], rateLimitSetting: Option[RateLimitSetting] = none[RateLimitSetting](), ): T = let wl = WakuLightPush( @@ -157,7 +164,7 @@ proc new*( peerManager: peerManager, pushHandler: pushHandler, requestRateLimiter: newRequestRateLimiter(rateLimitSetting), - sharding: sharding, + autoSharding: autoSharding, ) wl.initProtocolHandler() setServiceLimitMetric(WakuLightpushCodec, rateLimitSetting) diff --git a/waku/waku_metadata/protocol.nim b/waku/waku_metadata/protocol.nim index 13a2916b3..75f021dbe 100644 --- a/waku/waku_metadata/protocol.nim +++ b/waku/waku_metadata/protocol.nim @@ -29,7 +29,7 @@ proc respond( m: WakuMetadata, conn: Connection ): Future[Result[void, string]] {.async, gcsafe.} = let response = - WakuMetadataResponse(clusterId: some(m.clusterId), shards: toSeq(m.shards)) + WakuMetadataResponse(clusterId: some(m.clusterId.uint32), shards: toSeq(m.shards)) let res = catch: await conn.writeLP(response.encode().buffer)