mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-02 14:03:06 +00:00
fix autosharding/sharding config
This commit is contained in:
parent
4cc836e6fd
commit
fc7caa4a63
@ -213,12 +213,6 @@ type WakuNodeConf* = object
|
||||
name: "max-connections"
|
||||
.}: int
|
||||
|
||||
maxRelayPeers* {.
|
||||
desc:
|
||||
"Deprecated. Use relay-service-ratio instead. It represents the maximum allowed number of relay peers.",
|
||||
name: "max-relay-peers"
|
||||
.}: Option[int]
|
||||
|
||||
relayServiceRatio* {.
|
||||
desc:
|
||||
"This percentage ratio represents the relay peers to service peers. For example, 60:40, tells that 60% of the max-connections will be used for relay protocol and the other 40% of max-connections will be reserved for other service protocols (e.g., filter, lightpush, store, metadata, etc.)",
|
||||
@ -323,30 +317,18 @@ hence would have reachability issues.""",
|
||||
name: "keep-alive"
|
||||
.}: bool
|
||||
|
||||
# TODO: This is trying to do too much, this should only be used for autosharding, which itself should be configurable
|
||||
# If numShardsInNetwork is not set, we use the number of shards configured as numShardsInNetwork
|
||||
numShardsInNetwork* {.
|
||||
desc: "Number of shards in the network",
|
||||
desc:
|
||||
"Enable autosharding by specifying the total number of shards in the network",
|
||||
defaultValue: 0,
|
||||
name: "num-shards-in-network"
|
||||
.}: uint32
|
||||
|
||||
shards* {.
|
||||
desc:
|
||||
"Shards index to subscribe to [0..NUM_SHARDS_IN_NETWORK-1]. Argument may be repeated.",
|
||||
defaultValue:
|
||||
@[
|
||||
uint16(0),
|
||||
uint16(1),
|
||||
uint16(2),
|
||||
uint16(3),
|
||||
uint16(4),
|
||||
uint16(5),
|
||||
uint16(6),
|
||||
uint16(7),
|
||||
],
|
||||
"Shard indexes for relay to subscribe to. Argument may be repeated. If autosharding is enabled, it subscribes to all shards by default.",
|
||||
name: "shard"
|
||||
.}: seq[uint16]
|
||||
.}: Option[seq[uint16]]
|
||||
|
||||
contentTopics* {.
|
||||
desc: "Default content topic to subscribe to. Argument may be repeated.",
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
{.push raises: [].}
|
||||
import std/options
|
||||
|
||||
# TODO: Rename this type to match file name
|
||||
|
||||
@ -11,8 +12,7 @@ type ClusterConf* = object
|
||||
rlnRelayDynamic*: bool
|
||||
rlnEpochSizeSec*: uint64
|
||||
rlnRelayUserMessageLimit*: uint64
|
||||
# TODO: should be uint16 like the `shards` parameter
|
||||
numShardsInNetwork*: uint32
|
||||
numShardsInNetwork*: Option[uint16] # If some, it means autosharding is enabled
|
||||
discv5Discovery*: bool
|
||||
discv5BootstrapNodes*: seq[string]
|
||||
|
||||
@ -29,7 +29,7 @@ proc TheWakuNetworkConf*(T: type ClusterConf): ClusterConf =
|
||||
rlnRelayChainId: 11155111,
|
||||
rlnEpochSizeSec: 600,
|
||||
rlnRelayUserMessageLimit: 100,
|
||||
numShardsInNetwork: 8,
|
||||
numShardsInNetwork: some(8),
|
||||
discv5Discovery: true,
|
||||
discv5BootstrapNodes:
|
||||
@[
|
||||
|
||||
@ -65,6 +65,15 @@ type NetworkConfig* = object # TODO: make enum
|
||||
extMultiAddrs*: seq[MultiAddress]
|
||||
extMultiAddrsOnly*: bool
|
||||
|
||||
type RelayConf* {.requiresInit.} = object
|
||||
shards*: seq[uint16]
|
||||
relayShardedPeerManagement*: bool
|
||||
relayServiceRatio*: string
|
||||
|
||||
type AutoShardingConf* {.requiresInit.} = object
|
||||
numShardsInNetwork*: uint16
|
||||
contentTopics*: seq[string]
|
||||
|
||||
## `WakuConf` is a valid configuration for a Waku node
|
||||
## All information needed by a waku node should be contained
|
||||
## In this object. A convenient `validate` method enables doing
|
||||
@ -75,14 +84,10 @@ type WakuConf* {.requiresInit.} = ref object
|
||||
nodeKey*: crypto.PrivateKey
|
||||
|
||||
clusterId*: uint16
|
||||
shards*: seq[uint16]
|
||||
protectedShards*: seq[ProtectedShard]
|
||||
|
||||
# TODO: move to an autoShardingConf
|
||||
numShardsInNetwork*: uint32
|
||||
contentTopics*: seq[string]
|
||||
autoShardingConf*: Option[AutoShardingConf]
|
||||
|
||||
relay*: bool
|
||||
lightPush*: bool
|
||||
peerExchange*: bool
|
||||
|
||||
@ -92,6 +97,7 @@ type WakuConf* {.requiresInit.} = ref object
|
||||
circuitRelayClient*: bool
|
||||
keepAlive*: bool
|
||||
|
||||
relayConf*: Option[RelayConf]
|
||||
discv5Conf*: Option[Discv5Conf]
|
||||
dnsDiscoveryConf*: Option[DnsDiscoveryConf]
|
||||
filterServiceConf*: Option[FilterServiceConf]
|
||||
@ -132,12 +138,6 @@ type WakuConf* {.requiresInit.} = ref object
|
||||
# TODO: use proper type
|
||||
rateLimits*: seq[string]
|
||||
|
||||
# TODO: those could be in a relay conf object
|
||||
maxRelayPeers*: Option[int]
|
||||
relayShardedPeerManagement*: bool
|
||||
# TODO: use proper type
|
||||
relayServiceRatio*: string
|
||||
|
||||
p2pReliability*: bool
|
||||
|
||||
proc logConf*(conf: WakuConf) =
|
||||
@ -175,7 +175,10 @@ proc validateNodeKey(wakuConf: WakuConf): Result[void, string] =
|
||||
return ok()
|
||||
|
||||
proc validateShards(wakuConf: WakuConf): Result[void, string] =
|
||||
let numShardsInNetwork = wakuConf.numShardsInNetwork
|
||||
if wakuConf.autoShardingConf.isNone():
|
||||
return ok()
|
||||
|
||||
let numShardsInNetwork = wakuConf.autoShardingConf.get().numShardsInNetwork
|
||||
|
||||
for shard in wakuConf.shards:
|
||||
if shard >= numShardsInNetwork:
|
||||
@ -188,7 +191,7 @@ proc validateShards(wakuConf: WakuConf): Result[void, string] =
|
||||
return ok()
|
||||
|
||||
proc validateNoEmptyStrings(wakuConf: WakuConf): Result[void, string] =
|
||||
if wakuConf.networkConf.dns4DomainName.isSome and
|
||||
if wakuConf.networkConf.dns4DomainName.isSome() and
|
||||
isEmptyOrWhiteSpace(wakuConf.networkConf.dns4DomainName.get().string):
|
||||
return err("dns4DomainName is an empty string, set it to none(string) instead")
|
||||
|
||||
@ -199,25 +202,25 @@ proc validateNoEmptyStrings(wakuConf: WakuConf): Result[void, string] =
|
||||
if isEmptyOrWhiteSpace(sn):
|
||||
return err("staticNodes contain an empty string")
|
||||
|
||||
if wakuConf.remoteStoreNode.isSome and
|
||||
if wakuConf.remoteStoreNode.isSome() and
|
||||
isEmptyOrWhiteSpace(wakuConf.remoteStoreNode.get()):
|
||||
return err("remoteStoreNode is an empty string, set it to none(string) instead")
|
||||
|
||||
if wakuConf.remoteLightPushNode.isSome and
|
||||
if wakuConf.remoteLightPushNode.isSome() and
|
||||
isEmptyOrWhiteSpace(wakuConf.remoteLightPushNode.get()):
|
||||
return err("remoteLightPushNode is an empty string, set it to none(string) instead")
|
||||
|
||||
if wakuConf.remotePeerExchangeNode.isSome and
|
||||
if wakuConf.remotePeerExchangeNode.isSome() and
|
||||
isEmptyOrWhiteSpace(wakuConf.remotePeerExchangeNode.get()):
|
||||
return
|
||||
err("remotePeerExchangeNode is an empty string, set it to none(string) instead")
|
||||
|
||||
if wakuConf.remoteFilterNode.isSome and
|
||||
if wakuConf.remoteFilterNode.isSome() and
|
||||
isEmptyOrWhiteSpace(wakuConf.remoteFilterNode.get()):
|
||||
return
|
||||
err("remotePeerExchangeNode is an empty string, set it to none(string) instead")
|
||||
|
||||
if wakuConf.dnsDiscoveryConf.isSome and
|
||||
if wakuConf.dnsDiscoveryConf.isSome() and
|
||||
isEmptyOrWhiteSpace(wakuConf.dnsDiscoveryConf.get().enrTreeUrl):
|
||||
return err ("dnsDiscoveryConf.enrTreeUrl is an empty string")
|
||||
|
||||
|
||||
@ -362,6 +362,34 @@ proc build(b: DnsDiscoveryConfBuilder): Result[Option[DnsDiscoveryConf], string]
|
||||
some(DnsDiscoveryConf(nameServers: b.nameServers, enrTreeUrl: b.enrTreeUrl.get()))
|
||||
)
|
||||
|
||||
#################################
|
||||
## AutoSharding Config Builder ##
|
||||
#################################
|
||||
type AutoShardingConfBuilder = object
|
||||
enabled: Option[bool]
|
||||
numShardsInNetwork*: Option[uint16]
|
||||
contentTopics*: seq[string]
|
||||
|
||||
proc init(T: type AutoShardingConfBuilder): AutoShardingConfBuilder =
|
||||
AutoShardingConfBuilder()
|
||||
|
||||
with(AutoShardingConfBuilder, enabled, bool)
|
||||
|
||||
proc build(b: AutoShardingConfBuilder): Result[Option[AutoShardingConf], string] =
|
||||
if not b.enabled.get(false):
|
||||
return ok(none(AutoShardingConf))
|
||||
|
||||
if b.numShardsInNetwork.isNone:
|
||||
return err("autoSharding.numShardsInNetwork is not specified")
|
||||
|
||||
return ok(
|
||||
some(
|
||||
AutoShardingConf(
|
||||
numShardsInNetwork: b.numShardsInNetwork.get(), contentTopics: b.contentTopics
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
###########################
|
||||
## Discv5 Config Builder ##
|
||||
###########################
|
||||
@ -529,12 +557,13 @@ type WakuConfBuilder* = object
|
||||
nodeKey: Option[PrivateKey]
|
||||
|
||||
clusterId: Option[uint16]
|
||||
numShardsInNetwork: Option[uint32]
|
||||
|
||||
shards: Option[seq[uint16]]
|
||||
protectedShards: Option[seq[ProtectedShard]]
|
||||
contentTopics: Option[seq[string]]
|
||||
|
||||
# Conf builders
|
||||
autoShardingConf*: AutoShardingConfBuilder
|
||||
dnsDiscoveryConf*: DnsDiscoveryConfBuilder
|
||||
discv5Conf*: Discv5ConfBuilder
|
||||
filterServiceConf*: FilterServiceConfBuilder
|
||||
@ -597,6 +626,7 @@ type WakuConfBuilder* = object
|
||||
|
||||
proc init*(T: type WakuConfBuilder): WakuConfBuilder =
|
||||
WakuConfBuilder(
|
||||
autoShardingConf: AutoShardingConfBuilder.init(),
|
||||
dnsDiscoveryConf: DnsDiscoveryConfBuilder.init(),
|
||||
discv5Conf: Discv5ConfBuilder.init(),
|
||||
filterServiceConf: FilterServiceConfBuilder.init(),
|
||||
@ -610,10 +640,8 @@ proc init*(T: type WakuConfBuilder): WakuConfBuilder =
|
||||
with(WakuConfBuilder, clusterConf, ClusterConf)
|
||||
with(WakuConfBuilder, nodeKey, PrivateKey)
|
||||
with(WakuConfBuilder, clusterId, uint16)
|
||||
with(WakuConfBuilder, numShardsInNetwork, uint32)
|
||||
with(WakuConfBuilder, shards, seq[uint16])
|
||||
with(WakuConfBuilder, protectedShards, seq[ProtectedShard])
|
||||
with(WakuConfBuilder, contentTopics, seq[string])
|
||||
with(WakuConfBuilder, relay, bool)
|
||||
with(WakuConfBuilder, lightPush, bool)
|
||||
with(WakuConfBuilder, storeSync, bool)
|
||||
@ -736,11 +764,12 @@ proc applyClusterConf(builder: var WakuConfBuilder) =
|
||||
warn "Max Message Size was manually provided alongside a cluster conf",
|
||||
used = $builder.maxMessageSize, discarded = clusterConf.maxMessageSize
|
||||
|
||||
if builder.numShardsInNetwork.isNone:
|
||||
builder.numShardsInNetwork = some(clusterConf.numShardsInNetwork)
|
||||
if builder.autoShardingConf.numShardsInNetwork.isNone:
|
||||
builder.autoShardingConf.numShardsInNetwork = clusterConf.numShardsInNetwork
|
||||
else:
|
||||
warn "Num Shards In Network was manually provided alongside a cluster conf",
|
||||
used = builder.numShardsInNetwork, discarded = clusterConf.numShardsInNetwork
|
||||
used = builder.autoShardingConf.numShardsInNetwork,
|
||||
discarded = clusterConf.numShardsInNetwork
|
||||
|
||||
if clusterConf.discv5Discovery:
|
||||
if builder.discv5Conf.enabled.isNone:
|
||||
@ -802,21 +831,15 @@ proc build*(
|
||||
if builder.clusterId.isNone():
|
||||
return err("Cluster Id was not specified")
|
||||
|
||||
let numShardsInNetwork =
|
||||
if builder.numShardsInNetwork.isSome():
|
||||
builder.numShardsInNetwork.get()
|
||||
else:
|
||||
warn "Number of shards in network not specified, defaulting to one shard"
|
||||
1
|
||||
|
||||
let shards =
|
||||
if builder.shards.isSome():
|
||||
builder.shards.get()
|
||||
else:
|
||||
warn "shards not specified, defaulting to all shards in network"
|
||||
# TODO: conversion should not be needed
|
||||
let upperShard: uint16 = uint16(numShardsInNetwork - 1)
|
||||
elif builder.autoShardingConf.numShardsInNetwork.isSome():
|
||||
info "shards not specified and auto-sharding is enabled via numShardsInNetwork, subscribing to all shards in network"
|
||||
let upperShard: uint16 = builder.autoShardingConf.numShardsInNetwork.get() - 1
|
||||
toSeq(0.uint16 .. upperShard)
|
||||
else:
|
||||
return err("")
|
||||
|
||||
let protectedShards = builder.protectedShards.get(@[])
|
||||
|
||||
@ -832,7 +855,10 @@ proc build*(
|
||||
|
||||
let contentTopics = builder.contentTopics.get(@[])
|
||||
|
||||
# Build sub-configsdnsDiscoveryConf
|
||||
# Build sub-configs
|
||||
let autoShardingConf = builder.autoShardingConf.build().valueOr:
|
||||
return err("AutoSharding Conf building failed: " & $error)
|
||||
|
||||
let discv5Conf = builder.discv5Conf.build().valueOr:
|
||||
return err("Discv5 Conf building failed: " & $error)
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user