From fc7caa4a63cb3e12a127b79ff3a25d1b90a2c3e4 Mon Sep 17 00:00:00 2001 From: fryorcraken Date: Sat, 12 Apr 2025 11:43:08 +1000 Subject: [PATCH] fix autosharding/sharding config --- waku/factory/external_config.nim | 26 ++----------- waku/factory/networks_config.nim | 6 +-- waku/factory/waku_conf.nim | 39 ++++++++++--------- waku/factory/waku_conf_builder.nim | 62 +++++++++++++++++++++--------- 4 files changed, 72 insertions(+), 61 deletions(-) diff --git a/waku/factory/external_config.nim b/waku/factory/external_config.nim index 228eee993..e1623ad43 100644 --- a/waku/factory/external_config.nim +++ b/waku/factory/external_config.nim @@ -213,12 +213,6 @@ type WakuNodeConf* = object name: "max-connections" .}: int - maxRelayPeers* {. - desc: - "Deprecated. Use relay-service-ratio instead. It represents the maximum allowed number of relay peers.", - name: "max-relay-peers" - .}: Option[int] - relayServiceRatio* {. desc: "This percentage ratio represents the relay peers to service peers. For example, 60:40, tells that 60% of the max-connections will be used for relay protocol and the other 40% of max-connections will be reserved for other service protocols (e.g., filter, lightpush, store, metadata, etc.)", @@ -323,30 +317,18 @@ hence would have reachability issues.""", name: "keep-alive" .}: bool - # TODO: This is trying to do too much, this should only be used for autosharding, which itself should be configurable - # If numShardsInNetwork is not set, we use the number of shards configured as numShardsInNetwork numShardsInNetwork* {. - desc: "Number of shards in the network", + desc: + "Enable autosharding by specifying the total number of shards in the network", defaultValue: 0, name: "num-shards-in-network" .}: uint32 shards* {. desc: - "Shards index to subscribe to [0..NUM_SHARDS_IN_NETWORK-1]. 
Argument may be repeated.", - defaultValue: - @[ - uint16(0), - uint16(1), - uint16(2), - uint16(3), - uint16(4), - uint16(5), - uint16(6), - uint16(7), - ], + "Shard indexes for relay to subscribe to. Argument may be repeated. If autosharding is enabled, it subscribes to all shards by default.", name: "shard" - .}: seq[uint16] + .}: Option[seq[uint16]] contentTopics* {. desc: "Default content topic to subscribe to. Argument may be repeated.", diff --git a/waku/factory/networks_config.nim b/waku/factory/networks_config.nim index 812a4f17a..ab83eeece 100644 --- a/waku/factory/networks_config.nim +++ b/waku/factory/networks_config.nim @@ -1,4 +1,5 @@ {.push raises: [].} +import std/options # TODO: Rename this type to match file name @@ -11,8 +12,7 @@ type ClusterConf* = object rlnRelayDynamic*: bool rlnEpochSizeSec*: uint64 rlnRelayUserMessageLimit*: uint64 - # TODO: should be uint16 like the `shards` parameter - numShardsInNetwork*: uint32 + numShardsInNetwork*: Option[uint16] # If some, it means autosharding is enabled discv5Discovery*: bool discv5BootstrapNodes*: seq[string] @@ -29,7 +29,7 @@ proc TheWakuNetworkConf*(T: type ClusterConf): ClusterConf = rlnRelayChainId: 11155111, rlnEpochSizeSec: 600, rlnRelayUserMessageLimit: 100, - numShardsInNetwork: 8, + numShardsInNetwork: some(uint16(8)), discv5Discovery: true, discv5BootstrapNodes: @[ diff --git a/waku/factory/waku_conf.nim b/waku/factory/waku_conf.nim index 6bdbd68ce..1bb2ec86a 100644 --- a/waku/factory/waku_conf.nim +++ b/waku/factory/waku_conf.nim @@ -65,6 +65,15 @@ type NetworkConfig* = object # TODO: make enum extMultiAddrs*: seq[MultiAddress] extMultiAddrsOnly*: bool +type RelayConf* {.requiresInit.} = object + shards*: seq[uint16] + relayShardedPeerManagement*: bool + relayServiceRatio*: string + +type AutoShardingConf* {.requiresInit.} = object + numShardsInNetwork*: uint16 + contentTopics*: seq[string] + ## `WakuConf` is a valid configuration for a Waku node ## All information needed by a waku node should 
be contained ## In this object. A convenient `validate` method enables doing @@ -75,14 +84,10 @@ type WakuConf* {.requiresInit.} = ref object nodeKey*: crypto.PrivateKey clusterId*: uint16 - shards*: seq[uint16] protectedShards*: seq[ProtectedShard] - # TODO: move to an autoShardingConf - numShardsInNetwork*: uint32 - contentTopics*: seq[string] + autoShardingConf*: Option[AutoShardingConf] - relay*: bool lightPush*: bool peerExchange*: bool @@ -92,6 +97,7 @@ type WakuConf* {.requiresInit.} = ref object circuitRelayClient*: bool keepAlive*: bool + relayConf*: Option[RelayConf] discv5Conf*: Option[Discv5Conf] dnsDiscoveryConf*: Option[DnsDiscoveryConf] filterServiceConf*: Option[FilterServiceConf] @@ -132,12 +138,6 @@ type WakuConf* {.requiresInit.} = ref object # TODO: use proper type rateLimits*: seq[string] - # TODO: those could be in a relay conf object - maxRelayPeers*: Option[int] - relayShardedPeerManagement*: bool - # TODO: use proper type - relayServiceRatio*: string - p2pReliability*: bool proc logConf*(conf: WakuConf) = @@ -175,7 +175,10 @@ proc validateNodeKey(wakuConf: WakuConf): Result[void, string] = return ok() proc validateShards(wakuConf: WakuConf): Result[void, string] = - let numShardsInNetwork = wakuConf.numShardsInNetwork + if wakuConf.autoShardingConf.isNone(): + return ok() + + let numShardsInNetwork = wakuConf.autoShardingConf.get().numShardsInNetwork for shard in wakuConf.shards: if shard >= numShardsInNetwork: @@ -188,7 +191,7 @@ proc validateShards(wakuConf: WakuConf): Result[void, string] = return ok() proc validateNoEmptyStrings(wakuConf: WakuConf): Result[void, string] = - if wakuConf.networkConf.dns4DomainName.isSome and + if wakuConf.networkConf.dns4DomainName.isSome() and isEmptyOrWhiteSpace(wakuConf.networkConf.dns4DomainName.get().string): return err("dns4DomainName is an empty string, set it to none(string) instead") @@ -199,25 +202,25 @@ proc validateNoEmptyStrings(wakuConf: WakuConf): Result[void, string] = if 
isEmptyOrWhiteSpace(sn): return err("staticNodes contain an empty string") - if wakuConf.remoteStoreNode.isSome and + if wakuConf.remoteStoreNode.isSome() and isEmptyOrWhiteSpace(wakuConf.remoteStoreNode.get()): return err("remoteStoreNode is an empty string, set it to none(string) instead") - if wakuConf.remoteLightPushNode.isSome and + if wakuConf.remoteLightPushNode.isSome() and isEmptyOrWhiteSpace(wakuConf.remoteLightPushNode.get()): return err("remoteLightPushNode is an empty string, set it to none(string) instead") - if wakuConf.remotePeerExchangeNode.isSome and + if wakuConf.remotePeerExchangeNode.isSome() and isEmptyOrWhiteSpace(wakuConf.remotePeerExchangeNode.get()): return err("remotePeerExchangeNode is an empty string, set it to none(string) instead") - if wakuConf.remoteFilterNode.isSome and + if wakuConf.remoteFilterNode.isSome() and isEmptyOrWhiteSpace(wakuConf.remoteFilterNode.get()): return err("remotePeerExchangeNode is an empty string, set it to none(string) instead") - if wakuConf.dnsDiscoveryConf.isSome and + if wakuConf.dnsDiscoveryConf.isSome() and isEmptyOrWhiteSpace(wakuConf.dnsDiscoveryConf.get().enrTreeUrl): return err ("dnsDiscoveryConf.enrTreeUrl is an empty string") diff --git a/waku/factory/waku_conf_builder.nim b/waku/factory/waku_conf_builder.nim index f78d9ac56..14b01e6b4 100644 --- a/waku/factory/waku_conf_builder.nim +++ b/waku/factory/waku_conf_builder.nim @@ -362,6 +362,34 @@ proc build(b: DnsDiscoveryConfBuilder): Result[Option[DnsDiscoveryConf], string] some(DnsDiscoveryConf(nameServers: b.nameServers, enrTreeUrl: b.enrTreeUrl.get())) ) +################################# +## AutoSharding Config Builder ## +################################# +type AutoShardingConfBuilder = object + enabled: Option[bool] + numShardsInNetwork*: Option[uint16] + contentTopics*: seq[string] + +proc init(T: type AutoShardingConfBuilder): AutoShardingConfBuilder = + AutoShardingConfBuilder() + +with(AutoShardingConfBuilder, enabled, bool) + +proc 
build(b: AutoShardingConfBuilder): Result[Option[AutoShardingConf], string] = + if not b.enabled.get(false): + return ok(none(AutoShardingConf)) + + if b.numShardsInNetwork.isNone: + return err("autoSharding.numShardsInNetwork is not specified") + + return ok( + some( + AutoShardingConf( + numShardsInNetwork: b.numShardsInNetwork.get(), contentTopics: b.contentTopics + ) + ) + ) + ########################### ## Discv5 Config Builder ## ########################### @@ -529,12 +557,13 @@ type WakuConfBuilder* = object nodeKey: Option[PrivateKey] clusterId: Option[uint16] - numShardsInNetwork: Option[uint32] + shards: Option[seq[uint16]] protectedShards: Option[seq[ProtectedShard]] contentTopics: Option[seq[string]] # Conf builders + autoShardingConf*: AutoShardingConfBuilder dnsDiscoveryConf*: DnsDiscoveryConfBuilder discv5Conf*: Discv5ConfBuilder filterServiceConf*: FilterServiceConfBuilder @@ -597,6 +626,7 @@ type WakuConfBuilder* = object proc init*(T: type WakuConfBuilder): WakuConfBuilder = WakuConfBuilder( + autoShardingConf: AutoShardingConfBuilder.init(), dnsDiscoveryConf: DnsDiscoveryConfBuilder.init(), discv5Conf: Discv5ConfBuilder.init(), filterServiceConf: FilterServiceConfBuilder.init(), @@ -610,10 +640,8 @@ proc init*(T: type WakuConfBuilder): WakuConfBuilder = with(WakuConfBuilder, clusterConf, ClusterConf) with(WakuConfBuilder, nodeKey, PrivateKey) with(WakuConfBuilder, clusterId, uint16) -with(WakuConfBuilder, numShardsInNetwork, uint32) with(WakuConfBuilder, shards, seq[uint16]) with(WakuConfBuilder, protectedShards, seq[ProtectedShard]) -with(WakuConfBuilder, contentTopics, seq[string]) with(WakuConfBuilder, relay, bool) with(WakuConfBuilder, lightPush, bool) with(WakuConfBuilder, storeSync, bool) @@ -736,11 +764,12 @@ proc applyClusterConf(builder: var WakuConfBuilder) = warn "Max Message Size was manually provided alongside a cluster conf", used = $builder.maxMessageSize, discarded = clusterConf.maxMessageSize - if 
builder.numShardsInNetwork.isNone: - builder.numShardsInNetwork = some(clusterConf.numShardsInNetwork) + if builder.autoShardingConf.numShardsInNetwork.isNone: + builder.autoShardingConf.numShardsInNetwork = clusterConf.numShardsInNetwork else: warn "Num Shards In Network was manually provided alongside a cluster conf", - used = builder.numShardsInNetwork, discarded = clusterConf.numShardsInNetwork + used = builder.autoShardingConf.numShardsInNetwork, + discarded = clusterConf.numShardsInNetwork if clusterConf.discv5Discovery: if builder.discv5Conf.enabled.isNone: @@ -802,21 +831,15 @@ proc build*( if builder.clusterId.isNone(): return err("Cluster Id was not specified") - let numShardsInNetwork = - if builder.numShardsInNetwork.isSome(): - builder.numShardsInNetwork.get() - else: - warn "Number of shards in network not specified, defaulting to one shard" - 1 - let shards = if builder.shards.isSome(): builder.shards.get() - else: - warn "shards not specified, defaulting to all shards in network" - # TODO: conversion should not be needed - let upperShard: uint16 = uint16(numShardsInNetwork - 1) + elif builder.autoShardingConf.numShardsInNetwork.get(0'u16) > 0: + info "shards not specified and auto-sharding is enabled via numShardsInNetwork, subscribing to all shards in network" + let upperShard: uint16 = builder.autoShardingConf.numShardsInNetwork.get() - 1 toSeq(0.uint16 .. upperShard) + else: + return err("shards not specified and autosharding is not enabled, specify either shards or a non-zero numShardsInNetwork") let protectedShards = builder.protectedShards.get(@[]) @@ -832,7 +855,10 @@ proc build*( let contentTopics = builder.contentTopics.get(@[]) - # Build sub-configsdnsDiscoveryConf + # Build sub-configs + let autoShardingConf = builder.autoShardingConf.build().valueOr: + return err("AutoSharding Conf building failed: " & $error) + let discv5Conf = builder.discv5Conf.build().valueOr: return err("Discv5 Conf building failed: " & $error)