chore!: make sharding configuration explicit (#3468)

* Reserve the `networkconfig` name for Waku network related settings

* Rename cluster conf to network conf

 A `NetworkConf` is a Waku network configuration.

# Conflicts:
#	tests/factory/test_waku_conf.nim

* Improve sharding configuration

 A smarter data type simplifies the logic.
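
For reference, the data type in question is the object variant introduced in `networks_config.nim` further down in this diff; the shard count only exists when auto-sharding is selected:

```nim
type
  ShardingConfKind* = enum
    AutoSharding
    StaticSharding

  ShardingConf* = object
    case kind*: ShardingConfKind
    of AutoSharding:
      numShardsInCluster*: uint16
    of StaticSharding:
      discard
```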

* Fixing tests

* fixup! rename to endpointConf

* wip: autosharding is a specific configuration state; treat it as such

# Conflicts:
#	waku/factory/external_config.nim

* refactor lightpush handler

some metrics error reporting was missing

# Conflicts:
#	waku/waku_lightpush/protocol.nim

* test_node_factory tests pass

* remove warnings

* fix tests

* Revert previous eager replace-all command

* fix up build tools compilation

* metadata is used to store cluster id

* Mount relay routes in static sharding

* Rename activeRelayShards to subscribeShards

To make it clearer that these are the shards the node will subscribe to.

* Remove unused msg var

* Improve error handling

* Set autosharding as default, with 1 shard in network

Also defaults the subscribed shards to all shards in auto-sharding, and to
none in static sharding.
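
Concretely, the new `test_waku_conf` cases below pin these defaults down. A minimal sketch using the test helpers (`defaultWakuNodeConf` is a helper from the test suite):

```nim
# The default external config resolves to auto-sharding with a single
# shard, subscribed to shard 0 (i.e. all shards in the cluster).
let conf = defaultWakuNodeConf().get().toWakuConf().get()
assert conf.shardingConf.kind == AutoSharding
assert conf.shardingConf.numShardsInCluster == 1'u16
assert conf.subscribeShards == @[0'u16]
# Setting numShardsInNetwork to 0 selects static sharding instead,
# which defaults to no shard subscriptions.
```
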
fryorcraken committed 2025-07-04 17:10:53 +10:00 (via GitHub)
commit 16bda047e1 (parent 19b67b0581)
29 changed files with 578 additions and 412 deletions


@ -570,17 +570,18 @@ when isMainModule:
info "cli flags", conf = conf
if conf.clusterId == 1:
let twnClusterConf = ClusterConf.TheWakuNetworkConf()
let twnNetworkConf = NetworkConf.TheWakuNetworkConf()
conf.bootstrapNodes = twnClusterConf.discv5BootstrapNodes
conf.rlnRelayDynamic = twnClusterConf.rlnRelayDynamic
conf.rlnRelayEthContractAddress = twnClusterConf.rlnRelayEthContractAddress
conf.rlnEpochSizeSec = twnClusterConf.rlnEpochSizeSec
conf.rlnRelayUserMessageLimit = twnClusterConf.rlnRelayUserMessageLimit
conf.numShardsInNetwork = twnClusterConf.numShardsInNetwork
conf.bootstrapNodes = twnNetworkConf.discv5BootstrapNodes
conf.rlnRelayDynamic = twnNetworkConf.rlnRelayDynamic
conf.rlnRelayEthContractAddress = twnNetworkConf.rlnRelayEthContractAddress
conf.rlnEpochSizeSec = twnNetworkConf.rlnEpochSizeSec
conf.rlnRelayUserMessageLimit = twnNetworkConf.rlnRelayUserMessageLimit
conf.numShardsInNetwork = twnNetworkConf.shardingConf.numShardsInCluster
if conf.shards.len == 0:
conf.shards = toSeq(uint16(0) .. uint16(twnClusterConf.numShardsInNetwork - 1))
conf.shards =
toSeq(uint16(0) .. uint16(twnNetworkConf.shardingConf.numShardsInCluster - 1))
if conf.logLevel != LogLevel.NONE:
setLogLevel(conf.logLevel)


@ -24,26 +24,26 @@ proc setup*(): Waku =
var conf = confRes.get()
let twnClusterConf = ClusterConf.TheWakuNetworkConf()
let twnNetworkConf = NetworkConf.TheWakuNetworkConf()
if len(conf.shards) != 0:
conf.pubsubTopics = conf.shards.mapIt(twnClusterConf.pubsubTopics[it.uint16])
conf.pubsubTopics = conf.shards.mapIt(twnNetworkConf.pubsubTopics[it.uint16])
else:
conf.pubsubTopics = twnClusterConf.pubsubTopics
conf.pubsubTopics = twnNetworkConf.pubsubTopics
# Override configuration
conf.maxMessageSize = twnClusterConf.maxMessageSize
conf.clusterId = twnClusterConf.clusterId
conf.rlnRelayEthContractAddress = twnClusterConf.rlnRelayEthContractAddress
conf.rlnRelayDynamic = twnClusterConf.rlnRelayDynamic
conf.discv5Discovery = twnClusterConf.discv5Discovery
conf.maxMessageSize = twnNetworkConf.maxMessageSize
conf.clusterId = twnNetworkConf.clusterId
conf.rlnRelayEthContractAddress = twnNetworkConf.rlnRelayEthContractAddress
conf.rlnRelayDynamic = twnNetworkConf.rlnRelayDynamic
conf.discv5Discovery = twnNetworkConf.discv5Discovery
conf.discv5BootstrapNodes =
conf.discv5BootstrapNodes & twnClusterConf.discv5BootstrapNodes
conf.rlnEpochSizeSec = twnClusterConf.rlnEpochSizeSec
conf.rlnRelayUserMessageLimit = twnClusterConf.rlnRelayUserMessageLimit
conf.discv5BootstrapNodes & twnNetworkConf.discv5BootstrapNodes
conf.rlnEpochSizeSec = twnNetworkConf.rlnEpochSizeSec
conf.rlnRelayUserMessageLimit = twnNetworkConf.rlnRelayUserMessageLimit
# Only set rlnRelay to true if relay is configured
if conf.relay:
conf.rlnRelay = twnClusterConf.rlnRelay
conf.rlnRelay = twnNetworkConf.rlnRelay
debug "Starting node"
var waku = Waku.new(conf).valueOr:


@ -17,10 +17,46 @@ import
../../waku/common/logging,
../../waku/common/utils/parse_size_units
suite "Waku config - apply preset":
test "Default preset is TWN":
suite "Waku external config - default values":
test "Default sharding value":
## Setup
let expectedConf = ClusterConf.TheWakuNetworkConf()
let defaultShardingMode = AutoSharding
let defaultNumShardsInCluster = 1.uint16
let defaultSubscribeShards = @[0.uint16]
## Given
let preConfig = defaultWakuNodeConf().get()
## When
let res = preConfig.toWakuConf()
assert res.isOk(), $res.error
## Then
let conf = res.get()
check conf.shardingConf.kind == defaultShardingMode
check conf.shardingConf.numShardsInCluster == defaultNumShardsInCluster
check conf.subscribeShards == defaultSubscribeShards
test "Default shards value in static sharding":
## Setup
let defaultSubscribeShards: seq[uint16] = @[]
## Given
var preConfig = defaultWakuNodeConf().get()
preConfig.numShardsInNetwork = 0.uint16
## When
let res = preConfig.toWakuConf()
assert res.isOk(), $res.error
## Then
let conf = res.get()
check conf.subscribeShards == defaultSubscribeShards
suite "Waku external config - apply preset":
test "Preset is TWN":
## Setup
let expectedConf = NetworkConf.TheWakuNetworkConf()
## Given
let preConfig = WakuNodeConf(
@ -48,7 +84,9 @@ suite "Waku config - apply preset":
check rlnRelayConf.chainId == expectedConf.rlnRelayChainId
check rlnRelayConf.epochSizeSec == expectedConf.rlnEpochSizeSec
check rlnRelayConf.userMessageLimit == expectedConf.rlnRelayUserMessageLimit
check conf.numShardsInNetwork == expectedConf.numShardsInNetwork
check conf.shardingConf.kind == expectedConf.shardingConf.kind
check conf.shardingConf.numShardsInCluster ==
expectedConf.shardingConf.numShardsInCluster
check conf.discv5Conf.isSome() == expectedConf.discv5Discovery
if conf.discv5Conf.isSome():
let discv5Conf = conf.discv5Conf.get()
@ -56,7 +94,7 @@ suite "Waku config - apply preset":
test "Subscribes to all valid shards in twn":
## Setup
let expectedConf = ClusterConf.TheWakuNetworkConf()
let expectedConf = NetworkConf.TheWakuNetworkConf()
## Given
let shards: seq[uint16] = @[0, 1, 2, 3, 4, 5, 6, 7]
@ -68,11 +106,11 @@ suite "Waku config - apply preset":
## Then
let conf = res.get()
check conf.shards.len == expectedConf.numShardsInNetwork.int
check conf.subscribeShards.len == expectedConf.shardingConf.numShardsInCluster.int
test "Subscribes to some valid shards in twn":
## Setup
let expectedConf = ClusterConf.TheWakuNetworkConf()
let expectedConf = NetworkConf.TheWakuNetworkConf()
## Given
let shards: seq[uint16] = @[0, 4, 7]
@ -84,9 +122,9 @@ suite "Waku config - apply preset":
## Then
let conf = resConf.get()
assert conf.shards.len() == shards.len()
assert conf.subscribeShards.len() == shards.len()
for index, shard in shards:
assert shard in conf.shards
assert shard in conf.subscribeShards
test "Subscribes to invalid shards in twn":
## Setup
@ -103,7 +141,7 @@ suite "Waku config - apply preset":
test "Apply TWN preset when cluster id = 1":
## Setup
let expectedConf = ClusterConf.TheWakuNetworkConf()
let expectedConf = NetworkConf.TheWakuNetworkConf()
## Given
let preConfig = WakuNodeConf(
@ -131,13 +169,15 @@ suite "Waku config - apply preset":
check rlnRelayConf.chainId == expectedConf.rlnRelayChainId
check rlnRelayConf.epochSizeSec == expectedConf.rlnEpochSizeSec
check rlnRelayConf.userMessageLimit == expectedConf.rlnRelayUserMessageLimit
check conf.numShardsInNetwork == expectedConf.numShardsInNetwork
check conf.shardingConf.kind == expectedConf.shardingConf.kind
check conf.shardingConf.numShardsInCluster ==
expectedConf.shardingConf.numShardsInCluster
check conf.discv5Conf.isSome() == expectedConf.discv5Discovery
if conf.discv5Conf.isSome():
let discv5Conf = conf.discv5Conf.get()
check discv5Conf.bootstrapNodes == expectedConf.discv5BootstrapNodes
suite "Waku config - node key":
suite "Waku external config - node key":
test "Passed node key is used":
## Setup
let nodeKeyStr =
@ -158,13 +198,13 @@ suite "Waku config - node key":
assert utils.toHex(resKey.getRawBytes().get()) ==
utils.toHex(nodekey.getRawBytes().get())
suite "Waku config - Shards":
suite "Waku external config - Shards":
test "Shards are valid":
## Setup
## Given
let shards: seq[uint16] = @[0, 2, 4]
let numShardsInNetwork = 5.uint32
let numShardsInNetwork = 5.uint16
let wakuNodeConf = WakuNodeConf(
cmd: noCommand, shards: shards, numShardsInNetwork: numShardsInNetwork
)
@ -183,7 +223,7 @@ suite "Waku config - Shards":
## Given
let shards: seq[uint16] = @[0, 2, 5]
let numShardsInNetwork = 5.uint32
let numShardsInNetwork = 5.uint16
let wakuNodeConf = WakuNodeConf(
cmd: noCommand, shards: shards, numShardsInNetwork: numShardsInNetwork
)
@ -198,7 +238,7 @@ suite "Waku config - Shards":
## Setup
## Given
let wakuNodeConf = WakuNodeConf.load(version = "", cmdLine = @["--shard=32"])
let wakuNodeConf = WakuNodeConf.load(version = "", cmdLine = @["--shard=0"])
## When
let res = wakuNodeConf.toWakuConf()
@ -207,3 +247,15 @@ suite "Waku config - Shards":
let wakuConf = res.get()
let vRes = wakuConf.validate()
assert vRes.isOk(), $vRes.error
test "Imvalid shard is passed without num shards":
## Setup
## Given
let wakuNodeConf = WakuNodeConf.load(version = "", cmdLine = @["--shard=32"])
## When
let res = wakuNodeConf.toWakuConf()
## Then
assert res.isErr(), "Invalid shard was accepted"


@ -16,7 +16,7 @@ import
suite "Waku Conf - build with cluster conf":
test "Cluster Conf is passed and relay is enabled":
## Setup
let clusterConf = ClusterConf.TheWakuNetworkConf()
let networkConf = NetworkConf.TheWakuNetworkConf()
var builder = WakuConfBuilder.init()
builder.discv5Conf.withUdpPort(9000)
builder.withRelayServiceRatio("50:50")
@ -25,7 +25,7 @@ suite "Waku Conf - build with cluster conf":
## Given
builder.rlnRelayConf.withEthClientUrls(@["https://my_eth_rpc_url/"])
builder.withClusterConf(clusterConf)
builder.withNetworkConf(networkConf)
builder.withRelay(true)
builder.rlnRelayConf.withTreePath("/tmp/test-tree-path")
@ -37,27 +37,29 @@ suite "Waku Conf - build with cluster conf":
## Then
let resValidate = conf.validate()
assert resValidate.isOk(), $resValidate.error
check conf.clusterId == clusterConf.clusterId
check conf.numShardsInNetwork == clusterConf.numShardsInNetwork
check conf.shards == expectedShards
check conf.clusterId == networkConf.clusterId
check conf.shardingConf.kind == networkConf.shardingConf.kind
check conf.shardingConf.numShardsInCluster ==
networkConf.shardingConf.numShardsInCluster
check conf.subscribeShards == expectedShards
check conf.maxMessageSizeBytes ==
uint64(parseCorrectMsgSize(clusterConf.maxMessageSize))
check conf.discv5Conf.get().bootstrapNodes == clusterConf.discv5BootstrapNodes
uint64(parseCorrectMsgSize(networkConf.maxMessageSize))
check conf.discv5Conf.get().bootstrapNodes == networkConf.discv5BootstrapNodes
if clusterConf.rlnRelay:
if networkConf.rlnRelay:
assert conf.rlnRelayConf.isSome(), "RLN Relay conf is disabled"
let rlnRelayConf = conf.rlnRelayConf.get()
check rlnRelayConf.ethContractAddress.string ==
clusterConf.rlnRelayEthContractAddress
check rlnRelayConf.dynamic == clusterConf.rlnRelayDynamic
check rlnRelayConf.chainId == clusterConf.rlnRelayChainId
check rlnRelayConf.epochSizeSec == clusterConf.rlnEpochSizeSec
check rlnRelayConf.userMessageLimit == clusterConf.rlnRelayUserMessageLimit
networkConf.rlnRelayEthContractAddress
check rlnRelayConf.dynamic == networkConf.rlnRelayDynamic
check rlnRelayConf.chainId == networkConf.rlnRelayChainId
check rlnRelayConf.epochSizeSec == networkConf.rlnEpochSizeSec
check rlnRelayConf.userMessageLimit == networkConf.rlnRelayUserMessageLimit
test "Cluster Conf is passed, but relay is disabled":
## Setup
let clusterConf = ClusterConf.TheWakuNetworkConf()
let networkConf = NetworkConf.TheWakuNetworkConf()
var builder = WakuConfBuilder.init()
builder.withRelayServiceRatio("50:50")
builder.discv5Conf.withUdpPort(9000)
@ -66,7 +68,7 @@ suite "Waku Conf - build with cluster conf":
## Given
builder.rlnRelayConf.withEthClientUrls(@["https://my_eth_rpc_url/"])
builder.withClusterConf(clusterConf)
builder.withNetworkConf(networkConf)
builder.withRelay(false)
## When
@ -77,18 +79,20 @@ suite "Waku Conf - build with cluster conf":
## Then
let resValidate = conf.validate()
assert resValidate.isOk(), $resValidate.error
check conf.clusterId == clusterConf.clusterId
check conf.numShardsInNetwork == clusterConf.numShardsInNetwork
check conf.shards == expectedShards
check conf.clusterId == networkConf.clusterId
check conf.shardingConf.kind == networkConf.shardingConf.kind
check conf.shardingConf.numShardsInCluster ==
networkConf.shardingConf.numShardsInCluster
check conf.subscribeShards == expectedShards
check conf.maxMessageSizeBytes ==
uint64(parseCorrectMsgSize(clusterConf.maxMessageSize))
check conf.discv5Conf.get().bootstrapNodes == clusterConf.discv5BootstrapNodes
uint64(parseCorrectMsgSize(networkConf.maxMessageSize))
check conf.discv5Conf.get().bootstrapNodes == networkConf.discv5BootstrapNodes
assert conf.rlnRelayConf.isNone
test "Cluster Conf is passed, but rln relay is disabled":
## Setup
let clusterConf = ClusterConf.TheWakuNetworkConf()
let networkConf = NetworkConf.TheWakuNetworkConf()
var builder = WakuConfBuilder.init()
let # Mount all shards in network
@ -96,7 +100,7 @@ suite "Waku Conf - build with cluster conf":
## Given
builder.rlnRelayConf.withEthClientUrls(@["https://my_eth_rpc_url/"])
builder.withClusterConf(clusterConf)
builder.withNetworkConf(networkConf)
builder.rlnRelayConf.withEnabled(false)
## When
@ -107,24 +111,26 @@ suite "Waku Conf - build with cluster conf":
## Then
let resValidate = conf.validate()
assert resValidate.isOk(), $resValidate.error
check conf.clusterId == clusterConf.clusterId
check conf.numShardsInNetwork == clusterConf.numShardsInNetwork
check conf.shards == expectedShards
check conf.clusterId == networkConf.clusterId
check conf.shardingConf.kind == networkConf.shardingConf.kind
check conf.shardingConf.numShardsInCluster ==
networkConf.shardingConf.numShardsInCluster
check conf.subscribeShards == expectedShards
check conf.maxMessageSizeBytes ==
uint64(parseCorrectMsgSize(clusterConf.maxMessageSize))
check conf.discv5Conf.get().bootstrapNodes == clusterConf.discv5BootstrapNodes
uint64(parseCorrectMsgSize(networkConf.maxMessageSize))
check conf.discv5Conf.get().bootstrapNodes == networkConf.discv5BootstrapNodes
assert conf.rlnRelayConf.isNone
test "Cluster Conf is passed and valid shards are specified":
## Setup
let clusterConf = ClusterConf.TheWakuNetworkConf()
let networkConf = NetworkConf.TheWakuNetworkConf()
var builder = WakuConfBuilder.init()
let shards = @[2.uint16, 3.uint16]
## Given
builder.rlnRelayConf.withEthClientUrls(@["https://my_eth_rpc_url/"])
builder.withClusterConf(clusterConf)
builder.withShards(shards)
builder.withNetworkConf(networkConf)
builder.withSubscribeShards(shards)
## When
let resConf = builder.build()
@ -134,23 +140,25 @@ suite "Waku Conf - build with cluster conf":
## Then
let resValidate = conf.validate()
assert resValidate.isOk(), $resValidate.error
check conf.clusterId == clusterConf.clusterId
check conf.numShardsInNetwork == clusterConf.numShardsInNetwork
check conf.shards == shards
check conf.clusterId == networkConf.clusterId
check conf.shardingConf.kind == networkConf.shardingConf.kind
check conf.shardingConf.numShardsInCluster ==
networkConf.shardingConf.numShardsInCluster
check conf.subscribeShards == shards
check conf.maxMessageSizeBytes ==
uint64(parseCorrectMsgSize(clusterConf.maxMessageSize))
check conf.discv5Conf.get().bootstrapNodes == clusterConf.discv5BootstrapNodes
uint64(parseCorrectMsgSize(networkConf.maxMessageSize))
check conf.discv5Conf.get().bootstrapNodes == networkConf.discv5BootstrapNodes
test "Cluster Conf is passed and invalid shards are specified":
## Setup
let clusterConf = ClusterConf.TheWakuNetworkConf()
let networkConf = NetworkConf.TheWakuNetworkConf()
var builder = WakuConfBuilder.init()
let shards = @[2.uint16, 10.uint16]
## Given
builder.rlnRelayConf.withEthClientUrls(@["https://my_eth_rpc_url/"])
builder.withClusterConf(clusterConf)
builder.withShards(shards)
builder.withNetworkConf(networkConf)
builder.withSubscribeShards(shards)
## When
let resConf = builder.build()
@ -160,7 +168,7 @@ suite "Waku Conf - build with cluster conf":
test "Cluster Conf is passed and RLN contract is **not** overridden":
## Setup
let clusterConf = ClusterConf.TheWakuNetworkConf()
let networkConf = NetworkConf.TheWakuNetworkConf()
var builder = WakuConfBuilder.init()
builder.rlnRelayConf.withEthClientUrls(@["https://my_eth_rpc_url/"])
@ -170,7 +178,7 @@ suite "Waku Conf - build with cluster conf":
## Given
builder.rlnRelayConf.withEthContractAddress(contractAddress)
builder.withClusterConf(clusterConf)
builder.withNetworkConf(networkConf)
builder.withRelay(true)
builder.rlnRelayConf.withTreePath("/tmp/test")
@ -182,24 +190,26 @@ suite "Waku Conf - build with cluster conf":
## Then
let resValidate = conf.validate()
assert resValidate.isOk(), $resValidate.error
check conf.clusterId == clusterConf.clusterId
check conf.numShardsInNetwork == clusterConf.numShardsInNetwork
check conf.shards == expectedShards
check conf.clusterId == networkConf.clusterId
check conf.shardingConf.kind == networkConf.shardingConf.kind
check conf.shardingConf.numShardsInCluster ==
networkConf.shardingConf.numShardsInCluster
check conf.subscribeShards == expectedShards
check conf.maxMessageSizeBytes ==
uint64(parseCorrectMsgSize(clusterConf.maxMessageSize))
check conf.discv5Conf.isSome == clusterConf.discv5Discovery
check conf.discv5Conf.get().bootstrapNodes == clusterConf.discv5BootstrapNodes
uint64(parseCorrectMsgSize(networkConf.maxMessageSize))
check conf.discv5Conf.isSome == networkConf.discv5Discovery
check conf.discv5Conf.get().bootstrapNodes == networkConf.discv5BootstrapNodes
if clusterConf.rlnRelay:
if networkConf.rlnRelay:
assert conf.rlnRelayConf.isSome
let rlnRelayConf = conf.rlnRelayConf.get()
check rlnRelayConf.ethContractAddress.string ==
clusterConf.rlnRelayEthContractAddress
check rlnRelayConf.dynamic == clusterConf.rlnRelayDynamic
check rlnRelayConf.chainId == clusterConf.rlnRelayChainId
check rlnRelayConf.epochSizeSec == clusterConf.rlnEpochSizeSec
check rlnRelayConf.userMessageLimit == clusterConf.rlnRelayUserMessageLimit
networkConf.rlnRelayEthContractAddress
check rlnRelayConf.dynamic == networkConf.rlnRelayDynamic
check rlnRelayConf.chainId == networkConf.rlnRelayChainId
check rlnRelayConf.epochSizeSec == networkConf.rlnEpochSizeSec
check rlnRelayConf.userMessageLimit == networkConf.rlnRelayUserMessageLimit
suite "Waku Conf - node key":
test "Node key is generated":
@ -264,8 +274,8 @@ suite "Waku Conf - extMultiaddrs":
## Then
let resValidate = conf.validate()
assert resValidate.isOk(), $resValidate.error
check multiaddrs.len == conf.networkConf.extMultiAddrs.len
let resMultiaddrs = conf.networkConf.extMultiAddrs.map(
check multiaddrs.len == conf.endpointConf.extMultiAddrs.len
let resMultiaddrs = conf.endpointConf.extMultiAddrs.map(
proc(m: MultiAddress): string =
$m
)


@ -420,7 +420,7 @@ procSuite "Peer Manager":
parseIpAddress("0.0.0.0"),
port,
clusterId = 3,
shards = @[uint16(0)],
subscribeShards = @[uint16(0)],
)
# same network
@ -429,14 +429,14 @@ procSuite "Peer Manager":
parseIpAddress("0.0.0.0"),
port,
clusterId = 4,
shards = @[uint16(0)],
subscribeShards = @[uint16(0)],
)
node3 = newTestWakuNode(
generateSecp256k1Key(),
parseIpAddress("0.0.0.0"),
port,
clusterId = 4,
shards = @[uint16(0)],
subscribeShards = @[uint16(0)],
)
node1.mountMetadata(3).expect("Mounted Waku Metadata")


@ -18,8 +18,8 @@ suite "Waku NetConfig":
let wakuFlags = defaultTestWakuFlags()
let netConfigRes = NetConfig.init(
bindIp = conf.networkConf.p2pListenAddress,
bindPort = conf.networkConf.p2pTcpPort,
bindIp = conf.endpointConf.p2pListenAddress,
bindPort = conf.endpointConf.p2pTcpPort,
extIp = none(IpAddress),
extPort = none(Port),
extMultiAddrs = @[],
@ -46,7 +46,8 @@ suite "Waku NetConfig":
let conf = defaultTestWakuConf()
let netConfigRes = NetConfig.init(
bindIp = conf.networkConf.p2pListenAddress, bindPort = conf.networkConf.p2pTcpPort
bindIp = conf.endpointConf.p2pListenAddress,
bindPort = conf.endpointConf.p2pTcpPort,
)
assert netConfigRes.isOk(), $netConfigRes.error
@ -57,7 +58,9 @@ suite "Waku NetConfig":
netConfig.announcedAddresses.len == 1 # Only bind address should be present
netConfig.announcedAddresses[0] ==
formatListenAddress(
ip4TcpEndPoint(conf.networkConf.p2pListenAddress, conf.networkConf.p2pTcpPort)
ip4TcpEndPoint(
conf.endpointConf.p2pListenAddress, conf.endpointConf.p2pTcpPort
)
)
asyncTest "AnnouncedAddresses contains external address if extIp/Port are provided":
@ -67,8 +70,8 @@ suite "Waku NetConfig":
extPort = Port(1234)
let netConfigRes = NetConfig.init(
bindIp = conf.networkConf.p2pListenAddress,
bindPort = conf.networkConf.p2pTcpPort,
bindIp = conf.endpointConf.p2pListenAddress,
bindPort = conf.endpointConf.p2pTcpPort,
extIp = some(extIp),
extPort = some(extPort),
)
@ -88,8 +91,8 @@ suite "Waku NetConfig":
extPort = Port(1234)
let netConfigRes = NetConfig.init(
bindIp = conf.networkConf.p2pListenAddress,
bindPort = conf.networkConf.p2pTcpPort,
bindIp = conf.endpointConf.p2pListenAddress,
bindPort = conf.endpointConf.p2pTcpPort,
dns4DomainName = some(dns4DomainName),
extPort = some(extPort),
)
@ -110,8 +113,8 @@ suite "Waku NetConfig":
extMultiAddrs = @[ip4TcpEndPoint(extIp, extPort)]
let netConfigRes = NetConfig.init(
bindIp = conf.networkConf.p2pListenAddress,
bindPort = conf.networkConf.p2pTcpPort,
bindIp = conf.endpointConf.p2pListenAddress,
bindPort = conf.endpointConf.p2pTcpPort,
extMultiAddrs = extMultiAddrs,
)
@ -131,8 +134,8 @@ suite "Waku NetConfig":
extPort = Port(1234)
let netConfigRes = NetConfig.init(
bindIp = conf.networkConf.p2pListenAddress,
bindPort = conf.networkConf.p2pTcpPort,
bindIp = conf.endpointConf.p2pListenAddress,
bindPort = conf.endpointConf.p2pTcpPort,
dns4DomainName = some(dns4DomainName),
extIp = some(extIp),
extPort = some(extPort),
@ -152,8 +155,8 @@ suite "Waku NetConfig":
wssEnabled = false
var netConfigRes = NetConfig.init(
bindIp = conf.networkConf.p2pListenAddress,
bindPort = conf.networkConf.p2pTcpPort,
bindIp = conf.endpointConf.p2pListenAddress,
bindPort = conf.endpointConf.p2pTcpPort,
wsEnabled = true,
wssEnabled = wssEnabled,
)
@ -165,8 +168,9 @@ suite "Waku NetConfig":
check:
netConfig.announcedAddresses.len == 2 # Bind address + wsHostAddress
netConfig.announcedAddresses[1] == (
ip4TcpEndPoint(conf.networkConf.p2pListenAddress, conf.webSocketConf.get().port) &
wsFlag(wssEnabled)
ip4TcpEndPoint(
conf.endpointConf.p2pListenAddress, conf.webSocketConf.get().port
) & wsFlag(wssEnabled)
)
## Now try the same for the case of wssEnabled = true
@ -174,8 +178,8 @@ suite "Waku NetConfig":
wssEnabled = true
netConfigRes = NetConfig.init(
bindIp = conf.networkConf.p2pListenAddress,
bindPort = conf.networkConf.p2pTcpPort,
bindIp = conf.endpointConf.p2pListenAddress,
bindPort = conf.endpointConf.p2pTcpPort,
wsEnabled = true,
wssEnabled = wssEnabled,
)
@ -187,8 +191,9 @@ suite "Waku NetConfig":
check:
netConfig.announcedAddresses.len == 2 # Bind address + wsHostAddress
netConfig.announcedAddresses[1] == (
ip4TcpEndPoint(conf.networkConf.p2pListenAddress, conf.websocketConf.get().port) &
wsFlag(wssEnabled)
ip4TcpEndPoint(
conf.endpointConf.p2pListenAddress, conf.websocketConf.get().port
) & wsFlag(wssEnabled)
)
asyncTest "Announced WebSocket address contains external IP if provided":
@ -199,8 +204,8 @@ suite "Waku NetConfig":
wssEnabled = false
let netConfigRes = NetConfig.init(
bindIp = conf.networkConf.p2pListenAddress,
bindPort = conf.networkConf.p2pTcpPort,
bindIp = conf.endpointConf.p2pListenAddress,
bindPort = conf.endpointConf.p2pTcpPort,
extIp = some(extIp),
extPort = some(extPort),
wsEnabled = true,
@ -224,8 +229,8 @@ suite "Waku NetConfig":
wssEnabled = false
let netConfigRes = NetConfig.init(
bindIp = conf.networkConf.p2pListenAddress,
bindPort = conf.networkConf.p2pTcpPort,
bindIp = conf.endpointConf.p2pListenAddress,
bindPort = conf.endpointConf.p2pTcpPort,
dns4DomainName = some(dns4DomainName),
extPort = some(extPort),
wsEnabled = true,
@ -252,8 +257,8 @@ suite "Waku NetConfig":
wssEnabled = false
let netConfigRes = NetConfig.init(
bindIp = conf.networkConf.p2pListenAddress,
bindPort = conf.networkConf.p2pTcpPort,
bindIp = conf.endpointConf.p2pListenAddress,
bindPort = conf.endpointConf.p2pTcpPort,
dns4DomainName = some(dns4DomainName),
extIp = some(extIp),
extPort = some(extPort),
@ -277,7 +282,8 @@ suite "Waku NetConfig":
let conf = defaultTestWakuConf()
let netConfigRes = NetConfig.init(
bindIp = conf.networkConf.p2pListenAddress, bindPort = conf.networkConf.p2pTcpPort
bindIp = conf.endpointConf.p2pListenAddress,
bindPort = conf.endpointConf.p2pTcpPort,
)
assert netConfigRes.isOk(), $netConfigRes.error
@ -285,8 +291,8 @@ suite "Waku NetConfig":
let netConfig = netConfigRes.get()
check:
netConfig.enrIp.get() == conf.networkConf.p2pListenAddress
netConfig.enrPort.get() == conf.networkConf.p2pTcpPort
netConfig.enrIp.get() == conf.endpointConf.p2pListenAddress
netConfig.enrPort.get() == conf.endpointConf.p2pTcpPort
asyncTest "ENR is set with extIp/Port if provided":
let
@ -295,8 +301,8 @@ suite "Waku NetConfig":
extPort = Port(1234)
let netConfigRes = NetConfig.init(
bindIp = conf.networkConf.p2pListenAddress,
bindPort = conf.networkConf.p2pTcpPort,
bindIp = conf.endpointConf.p2pListenAddress,
bindPort = conf.endpointConf.p2pTcpPort,
extIp = some(extIp),
extPort = some(extPort),
)
@ -316,8 +322,8 @@ suite "Waku NetConfig":
extPort = Port(1234)
let netConfigRes = NetConfig.init(
bindIp = conf.networkConf.p2pListenAddress,
bindPort = conf.networkConf.p2pTcpPort,
bindIp = conf.endpointConf.p2pListenAddress,
bindPort = conf.endpointConf.p2pTcpPort,
dns4DomainName = some(dns4DomainName),
extPort = some(extPort),
)
@ -339,8 +345,8 @@ suite "Waku NetConfig":
extMultiAddrs = @[(ip4TcpEndPoint(extAddIp, extAddPort) & wsFlag(wssEnabled))]
var netConfigRes = NetConfig.init(
bindIp = conf.networkConf.p2pListenAddress,
bindPort = conf.networkConf.p2pTcpPort,
bindIp = conf.endpointConf.p2pListenAddress,
bindPort = conf.endpointConf.p2pTcpPort,
extMultiAddrs = extMultiAddrs,
wsEnabled = wsEnabled,
)
@ -358,8 +364,8 @@ suite "Waku NetConfig":
extMultiAddrs = @[(ip4TcpEndPoint(extAddIp, extAddPort) & wsFlag(wssEnabled))]
netConfigRes = NetConfig.init(
bindIp = conf.networkConf.p2pListenAddress,
bindPort = conf.networkConf.p2pTcpPort,
bindIp = conf.endpointConf.p2pListenAddress,
bindPort = conf.endpointConf.p2pTcpPort,
extMultiAddrs = extMultiAddrs,
wssEnabled = wssEnabled,
)
@ -380,8 +386,8 @@ suite "Waku NetConfig":
extMultiAddrs = @[ip4TcpEndPoint(extAddIp, extAddPort)]
let netConfigRes = NetConfig.init(
bindIp = conf.networkConf.p2pListenAddress,
bindPort = conf.networkConf.p2pTcpPort,
bindIp = conf.endpointConf.p2pListenAddress,
bindPort = conf.endpointConf.p2pTcpPort,
extMultiAddrs = extMultiAddrs,
extMultiAddrsOnly = true,
)


@ -37,7 +37,7 @@ proc defaultTestWakuConfBuilder*(): WakuConfBuilder =
builder.withRelayServiceRatio("60:40")
builder.withMaxMessageSize("1024 KiB")
builder.withClusterId(DefaultClusterId)
builder.withShards(@[DefaultShardId])
builder.withSubscribeShards(@[DefaultShardId])
builder.withRelay(true)
builder.withRendezvous(true)
builder.storeServiceConf.withDbMigration(false)
@ -72,7 +72,7 @@ proc newTestWakuNode*(
agentString = none(string),
peerStoreCapacity = none(int),
clusterId = DefaultClusterId,
shards = @[DefaultShardId],
subscribeShards = @[DefaultShardId],
): WakuNode =
var resolvedExtIp = extIp
@ -86,7 +86,7 @@ proc newTestWakuNode*(
var conf = defaultTestWakuConf()
conf.clusterId = clusterId
conf.shards = shards
conf.subscribeShards = subscribeShards
if dns4DomainName.isSome() and extIp.isNone():
# If there's an error resolving the IP, an exception is thrown and test fails
@ -114,7 +114,7 @@ proc newTestWakuNode*(
var enrBuilder = EnrBuilder.init(nodeKey)
enrBuilder.withWakuRelaySharding(
RelayShards(clusterId: conf.clusterId, shardIds: conf.shards)
RelayShards(clusterId: conf.clusterId, shardIds: conf.subscribeShards)
).isOkOr:
raise newException(Defect, "Invalid record: " & $error)


@ -503,7 +503,7 @@ suite "Waku Discovery v5":
waku.dynamicBootstrapNodes,
waku.rng,
waku.conf.nodeKey,
waku.conf.networkConf.p2pListenAddress,
waku.conf.endpointConf.p2pListenAddress,
waku.conf.portsShift,
)
@ -534,7 +534,7 @@ suite "Waku Discovery v5":
waku.dynamicBootstrapNodes,
waku.rng,
waku.conf.nodeKey,
waku.conf.networkConf.p2pListenAddress,
waku.conf.endpointConf.p2pListenAddress,
waku.conf.portsShift,
)


@ -18,8 +18,10 @@ proc newTestWakuLightpushNode*(
): Future[WakuLightPush] {.async.} =
let
peerManager = PeerManager.new(switch)
wakuSharding = Sharding(clusterId: 1, shardCountGenZero: 8)
proto = WakuLightPush.new(peerManager, rng, handler, wakuSharding, rateLimitSetting)
wakuAutoSharding = Sharding(clusterId: 1, shardCountGenZero: 8)
proto = WakuLightPush.new(
peerManager, rng, handler, some(wakuAutoSharding), rateLimitSetting
)
await proto.start()
switch.mount(proto)


@ -657,7 +657,7 @@ suite "WakuNode - Relay":
await node.start()
(await node.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
require node.mountSharding(1, 1).isOk
require node.mountAutoSharding(1, 1).isOk
## Given
let
@ -670,11 +670,14 @@ suite "WakuNode - Relay":
): Future[void] {.gcsafe, raises: [Defect].} =
discard pubsubTopic
discard message
assert shard == node.wakuSharding.getShard(contentTopicA).expect("Valid Topic"),
assert shard ==
node.wakuAutoSharding.get().getShard(contentTopicA).expect("Valid Topic"),
"topic must use the same shard"
assert shard == node.wakuSharding.getShard(contentTopicB).expect("Valid Topic"),
assert shard ==
node.wakuAutoSharding.get().getShard(contentTopicB).expect("Valid Topic"),
"topic must use the same shard"
assert shard == node.wakuSharding.getShard(contentTopicC).expect("Valid Topic"),
assert shard ==
node.wakuAutoSharding.get().getShard(contentTopicC).expect("Valid Topic"),
"topic must use the same shard"
## When


@ -65,7 +65,7 @@ suite "Wakunode2 - Waku initialization":
test "app properly handles dynamic port configuration":
## Given
var conf = defaultTestWakuConf()
conf.networkConf.p2pTcpPort = Port(0)
conf.endpointConf.p2pTcpPort = Port(0)
## When
var waku = Waku.new(conf).valueOr:


@ -1,7 +1,7 @@
{.used.}
import
std/[sequtils, strformat, net],
std/[sequtils, net],
testutils/unittests,
presto,
presto/client as presto_client,
@ -42,6 +42,14 @@ suite "Waku v2 Rest API - Admin":
node2 = newTestWakuNode(generateSecp256k1Key(), getPrimaryIPAddr(), Port(60602))
node3 = newTestWakuNode(generateSecp256k1Key(), getPrimaryIPAddr(), Port(60604))
let clusterId = 1.uint16
node1.mountMetadata(clusterId).isOkOr:
assert false, "Failed to mount metadata: " & $error
node2.mountMetadata(clusterId).isOkOr:
assert false, "Failed to mount metadata: " & $error
node3.mountMetadata(clusterId).isOkOr:
assert false, "Failed to mount metadata: " & $error
await allFutures(node1.start(), node2.start(), node3.start())
await allFutures(
node1.mountRelay(),
@ -56,7 +64,7 @@ suite "Waku v2 Rest API - Admin":
): Future[void] {.async, gcsafe.} =
await sleepAsync(0.milliseconds)
let shard = RelayShard(clusterId: 1, shardId: 0)
let shard = RelayShard(clusterId: clusterId, shardId: 0)
node1.subscribe((kind: PubsubSub, topic: $shard), simpleHandler).isOkOr:
assert false, "Failed to subscribe to topic: " & $error
node2.subscribe((kind: PubsubSub, topic: $shard), simpleHandler).isOkOr:


@ -296,7 +296,7 @@ suite "Waku v2 Rest API - Relay":
await node.start()
(await node.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
require node.mountSharding(1, 8).isOk
require node.mountAutoSharding(1, 8).isOk
var restPort = Port(0)
let restAddress = parseIpAddress("0.0.0.0")
@ -346,6 +346,7 @@ suite "Waku v2 Rest API - Relay":
await node.start()
(await node.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
require node.mountAutoSharding(1, 8).isOk
var restPort = Port(0)
let restAddress = parseIpAddress("0.0.0.0")
@ -404,6 +405,7 @@ suite "Waku v2 Rest API - Relay":
await node.start()
(await node.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
require node.mountAutoSharding(1, 8).isOk
var restPort = Port(0)
let restAddress = parseIpAddress("0.0.0.0")
@ -469,6 +471,8 @@ suite "Waku v2 Rest API - Relay":
await node.start()
(await node.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
require node.mountAutoSharding(1, 8).isOk
let wakuRlnConfig = WakuRlnConfig(
dynamic: false,
credIndex: some(1.uint),
@ -528,6 +532,8 @@ suite "Waku v2 Rest API - Relay":
await node.start()
(await node.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
require node.mountAutoSharding(1, 8).isOk
let wakuRlnConfig = WakuRlnConfig(
dynamic: false,
credIndex: some(1.uint),
@ -641,6 +647,8 @@ suite "Waku v2 Rest API - Relay":
await node.start()
(await node.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
require node.mountAutoSharding(1, 8).isOk
let wakuRlnConfig = WakuRlnConfig(
dynamic: false,
credIndex: some(1.uint),


@ -59,8 +59,9 @@ type WakuConfBuilder* = object
nodeKey: Option[crypto.PrivateKey]
clusterId: Option[uint16]
numShardsInNetwork: Option[uint32]
shards: Option[seq[uint16]]
shardingConf: Option[ShardingConfKind]
numShardsInCluster: Option[uint16]
subscribeShards: Option[seq[uint16]]
protectedShards: Option[seq[ProtectedShard]]
contentTopics: Option[seq[string]]
@ -83,7 +84,7 @@ type WakuConfBuilder* = object
# TODO: move within a relayConf
rendezvous: Option[bool]
clusterConf: Option[ClusterConf]
networkConf: Option[NetworkConf]
staticNodes: seq[string]
@ -135,8 +136,8 @@ proc init*(T: type WakuConfBuilder): WakuConfBuilder =
webSocketConf: WebSocketConfBuilder.init(),
)
proc withClusterConf*(b: var WakuConfBuilder, clusterConf: ClusterConf) =
b.clusterConf = some(clusterConf)
proc withNetworkConf*(b: var WakuConfBuilder, networkConf: NetworkConf) =
b.networkConf = some(networkConf)
proc withNodeKey*(b: var WakuConfBuilder, nodeKey: crypto.PrivateKey) =
b.nodeKey = some(nodeKey)
@ -144,11 +145,14 @@ proc withNodeKey*(b: var WakuConfBuilder, nodeKey: crypto.PrivateKey) =
proc withClusterId*(b: var WakuConfBuilder, clusterId: uint16) =
b.clusterId = some(clusterId)
proc withNumShardsInNetwork*(b: var WakuConfBuilder, numShardsInNetwork: uint32) =
b.numShardsInNetwork = some(numShardsInNetwork)
proc withShardingConf*(b: var WakuConfBuilder, shardingConf: ShardingConfKind) =
b.shardingConf = some(shardingConf)
proc withShards*(b: var WakuConfBuilder, shards: seq[uint16]) =
b.shards = some(shards)
proc withNumShardsInCluster*(b: var WakuConfBuilder, numShardsInCluster: uint16) =
b.numShardsInCluster = some(numShardsInCluster)
proc withSubscribeShards*(b: var WakuConfBuilder, shards: seq[uint16]) =
b.subscribeShards = some(shards)
proc withProtectedShards*(
b: var WakuConfBuilder, protectedShards: seq[ProtectedShard]
@ -269,6 +273,8 @@ proc withMaxMessageSize*(builder: var WakuConfBuilder, maxMessageSize: string) =
proc withStaticNodes*(builder: var WakuConfBuilder, staticNodes: seq[string]) =
builder.staticNodes = concat(builder.staticNodes, staticNodes)
## Building
proc nodeKey(
builder: WakuConfBuilder, rng: ref HmacDrbgContext
): Result[crypto.PrivateKey, string] =
@ -281,77 +287,105 @@ proc nodeKey(
return err("Failed to generate key: " & $error)
return ok(nodeKey)
proc applyClusterConf(builder: var WakuConfBuilder) =
# Apply cluster conf, overrides most values passed individually
# If you want to tweak values, don't use clusterConf
if builder.clusterConf.isNone():
proc buildShardingConf(
bShardingConfKind: Option[ShardingConfKind],
bNumShardsInCluster: Option[uint16],
bSubscribeShards: Option[seq[uint16]],
): (ShardingConf, seq[uint16]) =
echo "bSubscribeShards: ", bSubscribeShards
case bShardingConfKind.get(AutoSharding)
of StaticSharding:
(ShardingConf(kind: StaticSharding), bSubscribeShards.get(@[]))
of AutoSharding:
let numShardsInCluster = bNumShardsInCluster.get(1)
let shardingConf =
ShardingConf(kind: AutoSharding, numShardsInCluster: numShardsInCluster)
let upperShard = uint16(numShardsInCluster - 1)
(shardingConf, bSubscribeShards.get(toSeq(0.uint16 .. upperShard)))
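
For illustration, this is how the helper above resolves unset options; a sketch only, using the same `Option` arguments the builder passes in:

```nim
# Nothing set: defaults to auto-sharding with 1 shard in the cluster,
# subscribing to every shard (here just shard 0).
let (autoConf, autoShards) =
  buildShardingConf(none(ShardingConfKind), none(uint16), none(seq[uint16]))
assert autoConf.kind == AutoSharding
assert autoShards == @[0'u16]

# Static sharding with no explicit shards: subscribe to nothing.
let (staticConf, staticShards) =
  buildShardingConf(some(StaticSharding), none(uint16), none(seq[uint16]))
assert staticConf.kind == StaticSharding
assert staticShards.len == 0
```
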
proc applyNetworkConf(builder: var WakuConfBuilder) =
# Apply network conf, overrides most values passed individually
# If you want to tweak values, don't use networkConf
# TODO: networkconf should be one field of the conf builder so that this function becomes unnecessary
if builder.networkConf.isNone():
return
let clusterConf = builder.clusterConf.get()
let networkConf = builder.networkConf.get()
if builder.clusterId.isSome():
warn "Cluster id was provided alongside a cluster conf",
used = clusterConf.clusterId, discarded = builder.clusterId.get()
builder.clusterId = some(clusterConf.clusterId)
warn "Cluster id was provided alongside a network conf",
used = networkConf.clusterId, discarded = builder.clusterId.get()
builder.clusterId = some(networkConf.clusterId)
# Apply relay parameters
if builder.relay.get(false) and clusterConf.rlnRelay:
if builder.relay.get(false) and networkConf.rlnRelay:
if builder.rlnRelayConf.enabled.isSome():
warn "RLN Relay was provided alongside a cluster conf",
used = clusterConf.rlnRelay, discarded = builder.rlnRelayConf.enabled
warn "RLN Relay was provided alongside a network conf",
used = networkConf.rlnRelay, discarded = builder.rlnRelayConf.enabled
builder.rlnRelayConf.withEnabled(true)
if builder.rlnRelayConf.ethContractAddress.get("") != "":
warn "RLN Relay ETH Contract Address was provided alongside a cluster conf",
used = clusterConf.rlnRelayEthContractAddress.string,
warn "RLN Relay ETH Contract Address was provided alongside a network conf",
used = networkConf.rlnRelayEthContractAddress.string,
discarded = builder.rlnRelayConf.ethContractAddress.get().string
builder.rlnRelayConf.withEthContractAddress(clusterConf.rlnRelayEthContractAddress)
builder.rlnRelayConf.withEthContractAddress(networkConf.rlnRelayEthContractAddress)
if builder.rlnRelayConf.chainId.isSome():
warn "RLN Relay Chain Id was provided alongside a cluster conf",
used = clusterConf.rlnRelayChainId, discarded = builder.rlnRelayConf.chainId
builder.rlnRelayConf.withChainId(clusterConf.rlnRelayChainId)
warn "RLN Relay Chain Id was provided alongside a network conf",
used = networkConf.rlnRelayChainId, discarded = builder.rlnRelayConf.chainId
builder.rlnRelayConf.withChainId(networkConf.rlnRelayChainId)
if builder.rlnRelayConf.dynamic.isSome():
warn "RLN Relay Dynamic was provided alongside a cluster conf",
used = clusterConf.rlnRelayDynamic, discarded = builder.rlnRelayConf.dynamic
builder.rlnRelayConf.withDynamic(clusterConf.rlnRelayDynamic)
warn "RLN Relay Dynamic was provided alongside a network conf",
used = networkConf.rlnRelayDynamic, discarded = builder.rlnRelayConf.dynamic
builder.rlnRelayConf.withDynamic(networkConf.rlnRelayDynamic)
if builder.rlnRelayConf.epochSizeSec.isSome():
warn "RLN Epoch Size in Seconds was provided alongside a cluster conf",
used = clusterConf.rlnEpochSizeSec,
warn "RLN Epoch Size in Seconds was provided alongside a network conf",
used = networkConf.rlnEpochSizeSec,
discarded = builder.rlnRelayConf.epochSizeSec
builder.rlnRelayConf.withEpochSizeSec(clusterConf.rlnEpochSizeSec)
builder.rlnRelayConf.withEpochSizeSec(networkConf.rlnEpochSizeSec)
if builder.rlnRelayConf.userMessageLimit.isSome():
warn "RLN Relay Dynamic was provided alongside a cluster conf",
used = clusterConf.rlnRelayUserMessageLimit,
warn "RLN Relay Dynamic was provided alongside a network conf",
used = networkConf.rlnRelayUserMessageLimit,
discarded = builder.rlnRelayConf.userMessageLimit
builder.rlnRelayConf.withUserMessageLimit(clusterConf.rlnRelayUserMessageLimit)
builder.rlnRelayConf.withUserMessageLimit(networkConf.rlnRelayUserMessageLimit)
# End Apply relay parameters
case builder.maxMessageSize.kind
of mmskNone:
discard
of mmskStr, mmskInt:
warn "Max Message Size was provided alongside a cluster conf",
used = clusterConf.maxMessageSize, discarded = $builder.maxMessageSize
builder.withMaxMessageSize(parseCorrectMsgSize(clusterConf.maxMessageSize))
warn "Max Message Size was provided alongside a network conf",
used = networkConf.maxMessageSize, discarded = $builder.maxMessageSize
builder.withMaxMessageSize(parseCorrectMsgSize(networkConf.maxMessageSize))
if builder.numShardsInNetwork.isSome():
warn "Num Shards In Network was provided alongside a cluster conf",
used = clusterConf.numShardsInNetwork, discarded = builder.numShardsInNetwork
builder.numShardsInNetwork = some(clusterConf.numShardsInNetwork)
if builder.shardingConf.isSome():
warn "Sharding Conf was provided alongside a network conf",
used = networkConf.shardingConf.kind, discarded = builder.shardingConf
if clusterConf.discv5Discovery:
if builder.numShardsInCluster.isSome():
warn "Num Shards In Cluster was provided alongside a network conf",
used = networkConf.shardingConf.numShardsInCluster,
discarded = builder.numShardsInCluster
case networkConf.shardingConf.kind
of StaticSharding:
builder.shardingConf = some(StaticSharding)
of AutoSharding:
builder.shardingConf = some(AutoSharding)
builder.numShardsInCluster = some(networkConf.shardingConf.numShardsInCluster)
if networkConf.discv5Discovery:
if builder.discv5Conf.enabled.isNone:
builder.discv5Conf.withEnabled(clusterConf.discv5Discovery)
builder.discv5Conf.withEnabled(networkConf.discv5Discovery)
if builder.discv5Conf.bootstrapNodes.len == 0 and
clusterConf.discv5BootstrapNodes.len > 0:
warn "Discv5 Boostrap nodes were provided alongside a cluster conf",
used = clusterConf.discv5BootstrapNodes,
networkConf.discv5BootstrapNodes.len > 0:
warn "Discv5 Bootstrap nodes were provided alongside a network conf",
used = networkConf.discv5BootstrapNodes,
discarded = builder.discv5Conf.bootstrapNodes
builder.discv5Conf.withBootstrapNodes(clusterConf.discv5BootstrapNodes)
builder.discv5Conf.withBootstrapNodes(networkConf.discv5BootstrapNodes)
proc build*(
builder: var WakuConfBuilder, rng: ref HmacDrbgContext = crypto.newRng()
@ -361,7 +395,7 @@ proc build*(
## of libwaku. It aims to be agnostic so it does not apply a
## default when it is opinionated.
applyClusterConf(builder)
applyNetworkConf(builder)
let relay =
if builder.relay.isSome():
@ -411,24 +445,14 @@ proc build*(
else:
builder.clusterId.get().uint16
let numShardsInNetwork =
if builder.numShardsInNetwork.isSome():
builder.numShardsInNetwork.get()
else:
warn "Number of shards in network not specified, defaulting to zero (improve is wip)"
0
let shards =
if builder.shards.isSome():
builder.shards.get()
else:
warn "shards not specified, defaulting to all shards in network"
# TODO: conversion should not be needed
let upperShard: uint16 = uint16(numShardsInNetwork - 1)
toSeq(0.uint16 .. upperShard)
let (shardingConf, subscribeShards) = buildShardingConf(
builder.shardingConf, builder.numShardsInCluster, builder.subscribeShards
)
let protectedShards = builder.protectedShards.get(@[])
info "Sharding configuration: ",
shardingConf = $shardingConf, subscribeShards = $subscribeShards
let maxMessageSizeBytes =
case builder.maxMessageSize.kind
of mmskInt:
@ -584,9 +608,9 @@ proc build*(
# end confs
nodeKey: nodeKey,
clusterId: clusterId,
numShardsInNetwork: numShardsInNetwork,
shardingConf: shardingConf,
contentTopics: contentTopics,
shards: shards,
subscribeShards: subscribeShards,
protectedShards: protectedShards,
relay: relay,
lightPush: lightPush,
@ -601,7 +625,7 @@ proc build*(
logLevel: logLevel,
logFormat: logFormat,
# TODO: Separate builders
networkConf: NetworkConfig(
endpointConf: EndpointConf(
natStrategy: natStrategy,
p2pTcpPort: p2pTcpPort,
dns4DomainName: dns4DomainName,


@ -1,5 +1,5 @@
import chronicles, std/[net, options], results
import ../networks_config
import waku/factory/waku_conf
logScope:
topics = "waku conf builder websocket"


@ -314,28 +314,16 @@ hence would have reachability issues.""",
name: "staticnode"
.}: seq[string]
# TODO: This is trying to do too much, this should only be used for autosharding, which itself should be configurable
# If numShardsInNetwork is not set, we use the number of shards configured as numShardsInNetwork
numShardsInNetwork* {.
desc: "Number of shards in the network",
defaultValue: 0,
desc:
"Enables autosharding and set number of shards in the cluster, set to `0` to use static sharding",
defaultValue: 1,
name: "num-shards-in-network"
.}: uint32
.}: uint16
shards* {.
desc:
"Shards index to subscribe to [0..NUM_SHARDS_IN_NETWORK-1]. Argument may be repeated.",
defaultValue:
@[
uint16(0),
uint16(1),
uint16(2),
uint16(3),
uint16(4),
uint16(5),
uint16(6),
uint16(7),
],
"Shards index to subscribe to [0..NUM_SHARDS_IN_NETWORK-1]. Argument may be repeated. Subscribes to all shards by default in auto-sharding, no shard for static sharding",
name: "shard"
.}: seq[uint16]
@ -858,9 +846,9 @@ proc toKeystoreGeneratorConf*(n: WakuNodeConf): RlnKeystoreGeneratorConf =
proc toInspectRlnDbConf*(n: WakuNodeConf): InspectRlnDbConf =
return InspectRlnDbConf(treePath: n.treePath)
proc toClusterConf(
proc toNetworkConf(
preset: string, clusterId: Option[uint16]
): ConfResult[Option[ClusterConf]] =
): ConfResult[Option[NetworkConf]] =
var lcPreset = toLowerAscii(preset)
if clusterId.isSome() and clusterId.get() == 1:
warn(
@ -870,9 +858,9 @@ proc toClusterConf(
case lcPreset
of "":
ok(none(ClusterConf))
ok(none(NetworkConf))
of "twn":
ok(some(ClusterConf.TheWakuNetworkConf()))
ok(some(NetworkConf.TheWakuNetworkConf()))
else:
err("Invalid --preset value passed: " & lcPreset)
@ -909,11 +897,11 @@ proc toWakuConf*(n: WakuNodeConf): ConfResult[WakuConf] =
b.withProtectedShards(n.protectedShards)
b.withClusterId(n.clusterId)
let clusterConf = toClusterConf(n.preset, some(n.clusterId)).valueOr:
let networkConf = toNetworkConf(n.preset, some(n.clusterId)).valueOr:
return err("Error determining cluster from preset: " & $error)
if clusterConf.isSome():
b.withClusterConf(clusterConf.get())
if networkConf.isSome():
b.withNetworkConf(networkConf.get())
b.withAgentString(n.agentString)
@ -948,9 +936,16 @@ proc toWakuConf*(n: WakuNodeConf): ConfResult[WakuConf] =
b.withStaticNodes(n.staticNodes)
if n.numShardsInNetwork != 0:
b.withNumShardsInNetwork(n.numShardsInNetwork)
b.withNumShardsInCluster(n.numShardsInNetwork)
b.withShardingConf(AutoSharding)
else:
b.withShardingConf(StaticSharding)
# It is not possible to pass an empty sequence on the CLI
# If this is empty, it means the user did not specify any shards
if n.shards.len != 0:
b.withSubscribeShards(n.shards)
b.withShards(n.shards)
b.withContentTopics(n.contentTopics)
b.storeServiceConf.withEnabled(n.store)
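
To see the mapping above end to end, a sketch along the lines of the new `test_waku_conf` cases (not part of this diff):

```nim
# Auto-sharding: 8 shards in the cluster, subscribe only to shard 2.
let autoNodeConf = WakuNodeConf.load(
  version = "", cmdLine = @["--num-shards-in-network=8", "--shard=2"]
)
let autoConf = autoNodeConf.toWakuConf().expect("valid conf")
assert autoConf.shardingConf.kind == AutoSharding
assert autoConf.subscribeShards == @[2'u16]

# num-shards-in-network=0 selects static sharding; any shard id is accepted.
let staticNodeConf = WakuNodeConf.load(
  version = "", cmdLine = @["--num-shards-in-network=0", "--shard=42"]
)
let staticConf = staticNodeConf.toWakuConf().expect("valid conf")
assert staticConf.shardingConf.kind == StaticSharding
assert staticConf.subscribeShards == @[42'u16]
```
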


@ -6,13 +6,7 @@ import
libp2p/nameresolving/dnsresolver,
std/[options, sequtils, net],
results
import
../common/utils/nat,
../node/net_config,
../waku_enr,
../waku_core,
./waku_conf,
./networks_config
import ../common/utils/nat, ../node/net_config, ../waku_enr, ../waku_core, ./waku_conf
proc enrConfiguration*(
conf: WakuConf, netConfig: NetConfig
@ -29,7 +23,7 @@ proc enrConfiguration*(
enrBuilder.withMultiaddrs(netConfig.enrMultiaddrs)
enrBuilder.withWakuRelaySharding(
RelayShards(clusterId: conf.clusterId, shardIds: conf.shards)
RelayShards(clusterId: conf.clusterId, shardIds: conf.subscribeShards)
).isOkOr:
return err("could not initialize ENR with shards")
@ -64,7 +58,7 @@ proc dnsResolve*(
# TODO: Reduce number of parameters, can be done once the same is done on Netconfig.init
proc networkConfiguration*(
clusterId: uint16,
conf: NetworkConfig,
conf: EndpointConf,
discv5Conf: Option[Discv5Conf],
webSocketConf: Option[WebSocketConf],
wakuFlags: CapabilitiesBitfield,
@ -143,11 +137,3 @@ proc networkConfiguration*(
)
return netConfigRes
# TODO: numShardsInNetwork should be mandatory with autosharding, and unneeded otherwise
proc getNumShardsInNetwork*(conf: WakuConf): uint32 =
if conf.numShardsInNetwork != 0:
return conf.numShardsInNetwork
# If conf.numShardsInNetwork is not set, use 1024 - the maximum possible as per the static sharding spec
# https://github.com/waku-org/specs/blob/master/standards/core/relay-sharding.md#static-sharding
return uint32(MaxShardIndex + 1)


@ -1,18 +1,23 @@
{.push raises: [].}
import stint, std/[nativesockets, options]
import chronicles, results, stint
type WebSocketSecureConf* {.requiresInit.} = object
keyPath*: string
certPath*: string
logScope:
topics = "waku networks conf"
type WebSocketConf* = object
port*: Port
secureConf*: Option[WebSocketSecureConf]
type
ShardingConfKind* = enum
AutoSharding
StaticSharding
# TODO: Rename this type to match file name
ShardingConf* = object
case kind*: ShardingConfKind
of AutoSharding:
numShardsInCluster*: uint16
of StaticSharding:
discard
type ClusterConf* = object
type NetworkConf* = object
maxMessageSize*: string # TODO: static convert to a uint64
clusterId*: uint16
rlnRelay*: bool
@ -21,17 +26,16 @@ type ClusterConf* = object
rlnRelayDynamic*: bool
rlnEpochSizeSec*: uint64
rlnRelayUserMessageLimit*: uint64
# TODO: should be uint16 like the `shards` parameter
numShardsInNetwork*: uint32
shardingConf*: ShardingConf
discv5Discovery*: bool
discv5BootstrapNodes*: seq[string]
# cluster-id=1 (aka The Waku Network)
# Cluster configuration corresponding to The Waku Network. Note that it
# overrides existing cli configuration
proc TheWakuNetworkConf*(T: type ClusterConf): ClusterConf =
proc TheWakuNetworkConf*(T: type NetworkConf): NetworkConf =
const RelayChainId = 59141'u256
return ClusterConf(
return NetworkConf(
maxMessageSize: "150KiB",
clusterId: 1,
rlnRelay: true,
@ -40,7 +44,7 @@ proc TheWakuNetworkConf*(T: type ClusterConf): ClusterConf =
rlnRelayChainId: RelayChainId,
rlnEpochSizeSec: 600,
rlnRelayUserMessageLimit: 100,
numShardsInNetwork: 8,
shardingConf: ShardingConf(kind: AutoSharding, numShardsInCluster: 8),
discv5Discovery: true,
discv5BootstrapNodes:
@[
@ -49,3 +53,21 @@ proc TheWakuNetworkConf*(T: type ClusterConf): ClusterConf =
"enr:-QEkuEBfEzJm_kigJ2HoSS_RBFJYhKHocGdkhhBr6jSUAWjLdFPp6Pj1l4yiTQp7TGHyu1kC6FyaU573VN8klLsEm-XuAYJpZIJ2NIJpcIQI2SVcim11bHRpYWRkcnO4bgA0Ni9ub2RlLTAxLmFjLWNuLWhvbmdrb25nLWMud2FrdS5zYW5kYm94LnN0YXR1cy5pbQZ2XwA2Ni9ub2RlLTAxLmFjLWNuLWhvbmdrb25nLWMud2FrdS5zYW5kYm94LnN0YXR1cy5pbQYfQN4DgnJzkwABCAAAAAEAAgADAAQABQAGAAeJc2VjcDI1NmsxoQOwsS69tgD7u1K50r5-qG5hweuTwa0W26aYPnvivpNlrYN0Y3CCdl-DdWRwgiMohXdha3UyDw",
],
)
proc validateShards*(
shardingConf: ShardingConf, shards: seq[uint16]
): Result[void, string] =
case shardingConf.kind
of StaticSharding:
return ok()
of AutoSharding:
let numShardsInCluster = shardingConf.numShardsInCluster
for shard in shards:
if shard >= numShardsInCluster:
let msg =
"validateShards invalid shard: " & $shard & " when numShardsInCluster: " &
$numShardsInCluster
error "validateShards failed", error = msg
return err(msg)
return ok()
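
A small usage sketch of the rule above: static sharding never rejects a shard id, while auto-sharding bounds-checks against the cluster size:

```nim
let autoConf = ShardingConf(kind: AutoSharding, numShardsInCluster: 8)
assert autoConf.validateShards(@[0'u16, 7'u16]).isOk()
assert autoConf.validateShards(@[8'u16]).isErr() # outside [0, 8)

# Static sharding accepts any shard id.
assert ShardingConf(kind: StaticSharding).validateShards(@[1023'u16]).isOk()
```
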


@ -10,6 +10,7 @@ import
import
./internal_config,
./networks_config,
./waku_conf,
./builder,
./validator_signed,
@ -137,10 +138,12 @@ proc initNode(
proc getAutoshards*(
node: WakuNode, contentTopics: seq[string]
): Result[seq[RelayShard], string] =
if node.wakuAutoSharding.isNone():
return err("Static sharding used, cannot get shards from content topics")
var autoShards: seq[RelayShard]
for contentTopic in contentTopics:
let shard = node.wakuSharding.getShard(contentTopic).valueOr:
return err("Could not parse content topic: " & error)
let shard = node.wakuAutoSharding.get().getShard(contentTopic).valueOr:
return err("Could not parse content topic: " & error)
autoShards.add(shard)
return ok(autoshards)
@ -258,16 +261,11 @@ proc setupProtocols(
if conf.storeServiceConf.isSome and conf.storeServiceConf.get().resume:
node.setupStoreResume()
# If conf.numShardsInNetwork is not set, use the number of shards configured as numShardsInNetwork
let numShardsInNetwork = getNumShardsInNetwork(conf)
if conf.numShardsInNetwork == 0:
warn "Number of shards in network not configured, setting it to",
# TODO: If not configured, it mounts 1024 shards! Make it a mandatory configuration instead
numShardsInNetwork = $numShardsInNetwork
node.mountSharding(conf.clusterId, numShardsInNetwork).isOkOr:
return err("failed to mount waku sharding: " & error)
if conf.shardingConf.kind == AutoSharding:
node.mountAutoSharding(conf.clusterId, conf.shardingConf.numShardsInCluster).isOkOr:
return err("failed to mount waku auto sharding: " & error)
else:
warn("Auto sharding is disabled")
# Mount relay on all nodes
var peerExchangeHandler = none(RoutingRecordsHandler)
@ -290,14 +288,22 @@ proc setupProtocols(
peerExchangeHandler = some(handlePeerExchange)
let autoShards = node.getAutoshards(conf.contentTopics).valueOr:
return err("Could not get autoshards: " & error)
# TODO: when using autosharding, the user should not be expected to pass any shards, but only content topics
# Hence, this joint logic should be removed in favour of an either/or logic:
# use passed shards (static) or deduce shards from content topics (auto)
let autoShards =
if node.wakuAutoSharding.isSome():
node.getAutoshards(conf.contentTopics).valueOr:
return err("Could not get autoshards: " & error)
else:
@[]
debug "Shards created from content topics",
contentTopics = conf.contentTopics, shards = autoShards
let confShards =
conf.shards.mapIt(RelayShard(clusterId: conf.clusterId, shardId: uint16(it)))
let confShards = conf.subscribeShards.mapIt(
RelayShard(clusterId: conf.clusterId, shardId: uint16(it))
)
let shards = confShards & autoShards
if conf.relay:
@ -313,7 +319,7 @@ proc setupProtocols(
# Add validation keys to protected topics
var subscribedProtectedShards: seq[ProtectedShard]
for shardKey in conf.protectedShards:
if shardKey.shard notin conf.shards:
if shardKey.shard notin conf.subscribeShards:
warn "protected shard not in subscribed shards, skipping adding validator",
protectedShard = shardKey.shard, subscribedShards = shards
continue
@ -472,7 +478,7 @@ proc setupNode*(
wakuConf: WakuConf, rng: ref HmacDrbgContext = crypto.newRng(), relay: Relay
): Result[WakuNode, string] =
let netConfig = networkConfiguration(
wakuConf.clusterId, wakuConf.networkConf, wakuConf.discv5Conf,
wakuConf.clusterId, wakuConf.endpointConf, wakuConf.discv5Conf,
wakuConf.webSocketConf, wakuConf.wakuFlags, wakuConf.dnsAddrsNameServers,
wakuConf.portsShift, clientId,
).valueOr:


@ -130,8 +130,9 @@ proc setupAppCallbacks(
let autoShards = node.getAutoshards(conf.contentTopics).valueOr:
return err("Could not get autoshards: " & error)
let confShards =
conf.shards.mapIt(RelayShard(clusterId: conf.clusterId, shardId: uint16(it)))
let confShards = conf.subscribeShards.mapIt(
RelayShard(clusterId: conf.clusterId, shardId: uint16(it))
)
let shards = confShards & autoShards
let uniqueShards = deduplicate(shards)
@ -249,14 +250,14 @@ proc getRunningNetConfig(waku: ptr Waku): Result[NetConfig, string] =
return err("Could not retrieve ports: " & error)
if tcpPort.isSome():
conf.networkConf.p2pTcpPort = tcpPort.get()
conf.endpointConf.p2pTcpPort = tcpPort.get()
if websocketPort.isSome() and conf.webSocketConf.isSome():
conf.webSocketConf.get().port = websocketPort.get()
# Rebuild NetConfig with bound port values
let netConf = networkConfiguration(
conf.clusterId, conf.networkConf, conf.discv5Conf, conf.webSocketConf,
conf.clusterId, conf.endpointConf, conf.discv5Conf, conf.webSocketConf,
conf.wakuFlags, conf.dnsAddrsNameServers, conf.portsShift, clientId,
).valueOr:
return err("Could not update NetConfig: " & error)
@ -306,7 +307,7 @@ proc updateAddressInENR(waku: ptr Waku): Result[void, string] =
proc updateWaku(waku: ptr Waku): Result[void, string] =
let conf = waku[].conf
if conf.networkConf.p2pTcpPort == Port(0) or
if conf.endpointConf.p2pTcpPort == Port(0) or
(conf.websocketConf.isSome() and conf.websocketConf.get.port == Port(0)):
updateEnr(waku).isOkOr:
return err("error calling updateEnr: " & $error)
@ -389,7 +390,7 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async.} =
waku.dynamicBootstrapNodes,
waku.rng,
conf.nodeKey,
conf.networkConf.p2pListenAddress,
conf.endpointConf.p2pListenAddress,
conf.portsShift,
)
@ -413,7 +414,7 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async.} =
conf.relay,
conf.lightPush,
conf.clusterId,
conf.shards,
conf.subscribeShards,
conf.contentTopics,
).isOkOr:
return err ("Starting protocols support REST server failed: " & $error)

View File

@ -20,6 +20,14 @@ export RlnRelayConf, RlnRelayCreds, RestServerConf, Discv5Conf, MetricsServerCon
logScope:
topics = "waku conf"
type WebSocketSecureConf* {.requiresInit.} = object
keyPath*: string
certPath*: string
type WebSocketConf* = object
port*: Port
secureConf*: Option[WebSocketSecureConf]
# TODO: should be defined in validator_signed.nim and imported here
type ProtectedShard* {.requiresInit.} = object
shard*: uint16
@ -50,7 +58,7 @@ type FilterServiceConf* {.requiresInit.} = object
subscriptionTimeout*: uint16
maxCriteria*: uint32
type NetworkConfig* = object # TODO: make enum
type EndpointConf* = object # TODO: make enum
natStrategy*: string
p2pTcpPort*: Port
dns4DomainName*: Option[string]
@ -68,11 +76,10 @@ type WakuConf* {.requiresInit.} = ref object
nodeKey*: crypto.PrivateKey
clusterId*: uint16
shards*: seq[uint16]
subscribeShards*: seq[uint16]
protectedShards*: seq[ProtectedShard]
# TODO: move to an autoShardingConf
numShardsInNetwork*: uint32
shardingConf*: ShardingConf
contentTopics*: seq[string]
relay*: bool
@ -95,7 +102,7 @@ type WakuConf* {.requiresInit.} = ref object
portsShift*: uint16
dnsAddrsNameServers*: seq[IpAddress]
networkConf*: NetworkConfig
endpointConf*: EndpointConf
wakuFlags*: CapabilitiesBitfield
# TODO: could probably make it a `PeerRemoteInfo`
@ -142,8 +149,8 @@ proc logConf*(conf: WakuConf) =
info "Configuration. Network", cluster = conf.clusterId
for shard in conf.shards:
info "Configuration. Shards", shard = shard
for shard in conf.subscribeShards:
info "Configuration. Active Relay Shards", shard = shard
if conf.discv5Conf.isSome():
for i in conf.discv5Conf.get().bootstrapNodes:
@ -165,26 +172,9 @@ proc validateNodeKey(wakuConf: WakuConf): Result[void, string] =
return err("nodekey param is invalid")
return ok()
proc validateShards(wakuConf: WakuConf): Result[void, string] =
let numShardsInNetwork = wakuConf.numShardsInNetwork
# TODO: fix up this behaviour
if numShardsInNetwork == 0:
return ok()
for shard in wakuConf.shards:
if shard >= numShardsInNetwork:
let msg =
"validateShards invalid shard: " & $shard & " when numShardsInNetwork: " &
$numShardsInNetwork # fmt doesn't work
error "validateShards failed", error = msg
return err(msg)
return ok()
proc validateNoEmptyStrings(wakuConf: WakuConf): Result[void, string] =
if wakuConf.networkConf.dns4DomainName.isSome() and
isEmptyOrWhiteSpace(wakuConf.networkConf.dns4DomainName.get().string):
if wakuConf.endpointConf.dns4DomainName.isSome() and
isEmptyOrWhiteSpace(wakuConf.endpointConf.dns4DomainName.get().string):
return err("dns4-domain-name is an empty string, set it to none(string) instead")
if isEmptyOrWhiteSpace(wakuConf.relayServiceRatio):
@ -236,6 +226,6 @@ proc validateNoEmptyStrings(wakuConf: WakuConf): Result[void, string] =
proc validate*(wakuConf: WakuConf): Result[void, string] =
?wakuConf.validateNodeKey()
?wakuConf.validateShards()
?wakuConf.shardingConf.validateShards(wakuConf.subscribeShards)
?wakuConf.validateNoEmptyStrings()
return ok()
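
For reference, a hypothetical sketch of what the relocated validator could look like, assuming `ShardingConf` exposes the `numShardsInCluster` field used elsewhere in this change; the actual definition lives in the sharding configuration module:

```nim
import results

type ShardingConf = object
  numShardsInCluster: uint32 # assumed field, as used elsewhere in this change

proc validateShards(
    conf: ShardingConf, subscribeShards: seq[uint16]
): Result[void, string] =
  # Reject any subscribed shard outside the cluster's shard range.
  for shard in subscribeShards:
    if uint32(shard) >= conf.numShardsInCluster:
      return err(
        "invalid shard: " & $shard & " with numShardsInCluster: " &
          $conf.numShardsInCluster
      )
  ok()
```
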

View File

@ -112,7 +112,7 @@ type
wakuLightpushClient*: WakuLightPushClient
wakuPeerExchange*: WakuPeerExchange
wakuMetadata*: WakuMetadata
wakuSharding*: Sharding
wakuAutoSharding*: Option[Sharding]
enr*: enr.Record
libp2pPing*: Ping
rng*: ref rand.HmacDrbgContext
@ -198,12 +198,13 @@ proc mountMetadata*(node: WakuNode, clusterId: uint32): Result[void, string] =
return ok()
## Waku Sharding
proc mountSharding*(
## Waku AutoSharding
proc mountAutoSharding*(
node: WakuNode, clusterId: uint16, shardCount: uint32
): Result[void, string] =
info "mounting sharding", clusterId = clusterId, shardCount = shardCount
node.wakuSharding = Sharding(clusterId: clusterId, shardCountGenZero: shardCount)
info "mounting auto sharding", clusterId = clusterId, shardCount = shardCount
node.wakuAutoSharding =
some(Sharding(clusterId: clusterId, shardCountGenZero: shardCount))
return ok()
## Waku Sync
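
A self-contained sketch (stand-in types, not the actual nwaku modules) of the invariant this introduces: autosharding only exists on the node if it is explicitly mounted, so every consumer must handle the `none` case:

```nim
import std/options
import results

type
  Sharding = object
    clusterId: uint16
    shardCountGenZero: uint32

  WakuNode = ref object
    wakuAutoSharding: Option[Sharding]

proc mountAutoSharding(
    node: WakuNode, clusterId: uint16, shardCount: uint32
): Result[void, string] =
  node.wakuAutoSharding =
    some(Sharding(clusterId: clusterId, shardCountGenZero: shardCount))
  ok()

when isMainModule:
  let node = WakuNode()
  doAssert node.wakuAutoSharding.isNone() # static sharding until mounted
  doAssert node.mountAutoSharding(1, 8).isOk()
  doAssert node.wakuAutoSharding.isSome()
```

A node that never calls `mountAutoSharding` keeps `wakuAutoSharding` at `none`, which is exactly what the static-sharding branches below test for.
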
@ -322,11 +323,15 @@ proc subscribe*(
let (pubsubTopic, contentTopicOp) =
case subscription.kind
of ContentSub:
let shard = node.wakuSharding.getShard((subscription.topic)).valueOr:
error "Autosharding error", error = error
return err("Autosharding error: " & error)
($shard, some(subscription.topic))
if node.wakuAutoSharding.isSome():
let shard = node.wakuAutoSharding.get().getShard((subscription.topic)).valueOr:
error "Autosharding error", error = error
return err("Autosharding error: " & error)
($shard, some(subscription.topic))
else:
return err(
"Static sharding is used, relay subscriptions must specify a pubsub topic"
)
of PubsubSub:
(subscription.topic, none(ContentTopic))
else:
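
A behavior sketch of the new dispatch, with stand-in types and a fixed shard for illustration: content-topic subscriptions resolve via autosharding when it is mounted, and fail fast under static sharding:

```nim
import std/options
import results

type Sharding = object
  clusterId: uint16

proc getShard(s: Sharding, contentTopic: string): Result[string, string] =
  # Fixed shard 0 for illustration; the real mapping hashes the topic.
  ok("/waku/2/rs/" & $s.clusterId & "/0")

proc resolvePubsubTopic(
    autoSharding: Option[Sharding], contentTopic: string
): Result[string, string] =
  if autoSharding.isSome():
    autoSharding.get().getShard(contentTopic)
  else:
    err("Static sharding is used, relay subscriptions must specify a pubsub topic")

when isMainModule:
  doAssert resolvePubsubTopic(some(Sharding(clusterId: 1)), "/app/1/chat/proto").isOk()
  doAssert resolvePubsubTopic(none(Sharding), "/app/1/chat/proto").isErr()
```
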
@ -353,11 +358,15 @@ proc unsubscribe*(
let (pubsubTopic, contentTopicOp) =
case subscription.kind
of ContentUnsub:
let shard = node.wakuSharding.getShard((subscription.topic)).valueOr:
error "Autosharding error", error = error
return err("Autosharding error: " & error)
($shard, some(subscription.topic))
if node.wakuAutoSharding.isSome():
let shard = node.wakuAutoSharding.get().getShard((subscription.topic)).valueOr:
error "Autosharding error", error = error
return err("Autosharding error: " & error)
($shard, some(subscription.topic))
else:
return err(
"Static sharding is used, relay subscriptions must specify a pubsub topic"
)
of PubsubUnsub:
(subscription.topic, none(ContentTopic))
else:
@ -388,9 +397,10 @@ proc publish*(
return err(msg)
let pubsubTopic = pubsubTopicOp.valueOr:
node.wakuSharding.getShard(message.contentTopic).valueOr:
if node.wakuAutoSharding.isNone():
return err("Pubsub topic must be specified when static sharding is enabled.")
node.wakuAutoSharding.get().getShard(message.contentTopic).valueOr:
let msg = "Autosharding error: " & error
error "publish error", err = msg
return err(msg)
#TODO instead of discard return error when 0 peers received the message
@ -564,8 +574,14 @@ proc filterSubscribe*(
waku_node_errors.inc(labelValues = ["subscribe_filter_failure"])
return subRes
elif node.wakuAutoSharding.isNone():
error "Failed filter subscription, pubsub topic must be specified with static sharding"
waku_node_errors.inc(labelValues = ["subscribe_filter_failure"])
else:
let topicMapRes = node.wakuSharding.parseSharding(pubsubTopic, contentTopics)
# No pubsub topic given: deduce it via autosharding,
# which requires the content topics to be well-formed
let topicMapRes =
node.wakuAutoSharding.get().getShardsFromContentTopics(contentTopics)
let topicMap =
if topicMapRes.isErr():
@ -575,11 +591,11 @@ proc filterSubscribe*(
topicMapRes.get()
var futures = collect(newSeq):
for pubsub, topics in topicMap.pairs:
for shard, topics in topicMap.pairs:
info "registering filter subscription to content",
pubsubTopic = pubsub, contentTopics = topics, peer = remotePeer.peerId
shard = shard, contentTopics = topics, peer = remotePeer.peerId
let content = topics.mapIt($it)
node.wakuFilterClient.subscribe(remotePeer, $pubsub, content)
node.wakuFilterClient.subscribe(remotePeer, $shard, content)
var subRes: FilterSubscribeResult = FilterSubscribeResult.ok()
try:
@ -643,8 +659,12 @@ proc filterUnsubscribe*(
waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"])
return unsubRes
elif node.wakuAutoSharding.isNone():
error "Failed filter un-subscription, pubsub topic must be specified with static sharding"
waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"])
else: # pubsubTopic.isNone
let topicMapRes = node.wakuSharding.parseSharding(pubsubTopic, contentTopics)
let topicMapRes =
node.wakuAutoSharding.get().getShardsFromContentTopics(contentTopics)
let topicMap =
if topicMapRes.isErr():
@ -654,11 +674,11 @@ proc filterUnsubscribe*(
topicMapRes.get()
var futures = collect(newSeq):
for pubsub, topics in topicMap.pairs:
for shard, topics in topicMap.pairs:
info "deregistering filter subscription to content",
pubsubTopic = pubsub, contentTopics = topics, peer = remotePeer.peerId
shard = shard, contentTopics = topics, peer = remotePeer.peerId
let content = topics.mapIt($it)
node.wakuFilterClient.unsubscribe(remotePeer, $pubsub, content)
node.wakuFilterClient.unsubscribe(remotePeer, $shard, content)
var unsubRes: FilterSubscribeResult = FilterSubscribeResult.ok()
try:
@ -1064,7 +1084,10 @@ proc legacyLightpushPublish*(
if pubsubTopic.isSome():
return await internalPublish(node, pubsubTopic.get(), message, peer)
let topicMapRes = node.wakuSharding.parseSharding(pubsubTopic, message.contentTopic)
if node.wakuAutoSharding.isNone():
return err("Pubsub topic must be specified when static sharding is enabled")
let topicMapRes =
node.wakuAutoSharding.get().getShardsFromContentTopics(message.contentTopic)
let topicMap =
if topicMapRes.isErr():
@ -1120,7 +1143,7 @@ proc mountLightPush*(
lightpush_protocol.getRelayPushHandler(node.wakuRelay, rlnPeer)
node.wakuLightPush = WakuLightPush.new(
node.peerManager, node.rng, pushHandler, node.wakuSharding, some(rateLimit)
node.peerManager, node.rng, pushHandler, node.wakuAutoSharding, some(rateLimit)
)
if node.started:
@ -1181,12 +1204,17 @@ proc lightpushPublish*(
return lighpushErrorResult(NO_PEERS_TO_RELAY, "no suitable remote peers")
let pubsubForPublish = pubSubTopic.valueOr:
if node.wakuAutoSharding.isNone():
let msg = "Pubsub topic must be specified when static sharding is enabled"
error "lightpush publish error", error = msg
return lighpushErrorResult(INVALID_MESSAGE_ERROR, msg)
let parsedTopic = NsContentTopic.parse(message.contentTopic).valueOr:
let msg = "Invalid content-topic:" & $error
error "lightpush request handling error", error = msg
return lighpushErrorResult(INVALID_MESSAGE_ERROR, msg)
node.wakuSharding.getShard(parsedTopic).valueOr:
node.wakuAutoSharding.get().getShard(parsedTopic).valueOr:
let msg = "Autosharding error: " & error
error "lightpush publish error", error = msg
return lighpushErrorResult(INTERNAL_SERVER_ERROR, msg)

View File

@ -241,13 +241,20 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) =
let shard = shardId.valueOr:
return RestApiResponse.badRequest(fmt("Invalid shardId: {error}"))
if node.wakuMetadata.isNil():
return RestApiResponse.serviceUnavailable(
"Error: Metadata Protocol is not mounted to the node"
)
if node.wakuRelay.isNil():
return RestApiResponse.serviceUnavailable(
"Error: Relay Protocol is not mounted to the node"
)
let topic =
toPubsubTopic(RelayShard(clusterId: node.wakuSharding.clusterId, shardId: shard))
# TODO: clusterId and shards should be uint16 across the whole codebase and should probably be defined as dedicated types (see the sketch after this hunk)
let topic = toPubsubTopic(
RelayShard(clusterId: node.wakuMetadata.clusterId.uint16, shardId: shard)
)
let pubsubPeers =
node.wakuRelay.getConnectedPubSubPeers(topic).get(initHashSet[PubSubPeer](0))
let relayPeer = PeersOfShard(
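
One hypothetical shape for the dedicated types the TODO above asks for; this is not part of the change:

```nim
# Hypothetical, not part of this change: distinct ids prevent accidental
# mixing of cluster and shard values while keeping the uint16 representation.
type
  ClusterId* = distinct uint16
  ShardId* = distinct uint16

proc `$`*(id: ClusterId): string {.borrow.}
proc `$`*(id: ShardId): string {.borrow.}
proc `==`*(a, b: ClusterId): bool {.borrow.}
proc `==`*(a, b: ShardId): bool {.borrow.}
```
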
@ -284,13 +291,19 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) =
let shard = shardId.valueOr:
return RestApiResponse.badRequest(fmt("Invalid shardId: {error}"))
if node.wakuMetadata.isNil():
return RestApiResponse.serviceUnavailable(
"Error: Metadata Protocol is not mounted to the node"
)
if node.wakuRelay.isNil():
return RestApiResponse.serviceUnavailable(
"Error: Relay Protocol is not mounted to the node"
)
let topic =
toPubsubTopic(RelayShard(clusterId: node.wakuSharding.clusterId, shardId: shard))
let topic = toPubsubTopic(
RelayShard(clusterId: node.wakuMetadata.clusterId.uint16, shardId: shard)
)
let peers =
node.wakuRelay.getPubSubPeersInMesh(topic).get(initHashSet[PubSubPeer](0))
let relayPeer = PeersOfShard(

View File

@ -151,17 +151,19 @@ proc startRestServerProtocolSupport*(
error "Could not subscribe", pubsubTopic, error
continue
for contentTopic in contentTopics:
cache.contentSubscribe(contentTopic)
if node.wakuAutoSharding.isSome():
# Pubsub topics to subscribe to are deduced from content topics only when autosharding is enabled
for contentTopic in contentTopics:
cache.contentSubscribe(contentTopic)
let shard = node.wakuSharding.getShard(contentTopic).valueOr:
error "Autosharding error in REST", error = error
continue
let pubsubTopic = $shard
let shard = node.wakuAutoSharding.get().getShard(contentTopic).valueOr:
error "Autosharding error in REST", error = error
continue
let pubsubTopic = $shard
node.subscribe((kind: PubsubSub, topic: pubsubTopic), handler).isOkOr:
error "Could not subscribe", pubsubTopic, error
continue
node.subscribe((kind: PubsubSub, topic: pubsubTopic), handler).isOkOr:
error "Could not subscribe", pubsubTopic, error
continue
installRelayApiHandlers(router, node, cache)
else:

View File

@ -272,11 +272,16 @@ proc installRelayApiHandlers*(
var message: WakuMessage = req.toWakuMessage(version = 0).valueOr:
return RestApiResponse.badRequest()
let pubsubTopic = node.wakuSharding.getShard(message.contentTopic).valueOr:
let msg = "Autosharding error: " & error
if node.wakuAutoSharding.isNone():
let msg = "Autosharding is disabled"
error "publish error", err = msg
return RestApiResponse.badRequest("Failed to publish. " & msg)
let pubsubTopic = node.wakuAutoSharding.get().getShard(message.contentTopic).valueOr:
let msg = "Autosharding error: " & error
error "publish error", err = msg
return RestApiResponse.badRequest("Failed to publish. " & msg)
# if RLN is mounted, append the proof to the message
if not node.wakuRlnRelay.isNil():
node.wakuRlnRelay.appendRLNProof(message, float64(getTime().toUnix())).isOkOr:

View File

@ -122,6 +122,18 @@ proc parse*(
"Invalid content topic structure. Expected either /<application>/<version>/<topic-name>/<encoding> or /<gen>/<application>/<version>/<topic-name>/<encoding>"
return err(ParsingError.invalidFormat(errMsg))
proc parse*(
T: type NsContentTopic, topics: seq[ContentTopic]
): ParsingResult[seq[NsContentTopic]] =
var res: seq[NsContentTopic] = @[]
for contentTopic in topics:
let parseRes = NsContentTopic.parse(contentTopic)
if parseRes.isErr():
let error: ParsingError = parseRes.error
return ParsingResult[seq[NsContentTopic]].err(error)
res.add(parseRes.value)
return ParsingResult[seq[NsContentTopic]].ok(res)
# Content topic compatibility
converter toContentTopic*(topic: NsContentTopic): ContentTopic =
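
A usage sketch for the new overload, assuming the content_topic module above is imported; parsing is all-or-nothing, so one malformed topic fails the whole batch:

```nim
# Assumes the content_topic module above is imported.
let parsed = NsContentTopic.parse(@["/app/1/chat/proto", "/app/1/files/proto"])
doAssert parsed.isOk() and parsed.get().len == 2

# One malformed topic fails the whole batch.
doAssert NsContentTopic.parse(@["not-a-topic"]).isErr()
```
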

View File

@ -8,6 +8,7 @@ import nimcrypto, std/options, std/tables, stew/endians2, results, stew/byteutil
import ./content_topic, ./pubsub_topic
# TODO: this is autosharding, not just "sharding"
type Sharding* = object
clusterId*: uint16
# TODO: generations could be stored in a table here
@ -50,48 +51,32 @@ proc getShard*(s: Sharding, topic: ContentTopic): Result[RelayShard, string] =
ok(shard)
proc parseSharding*(
s: Sharding,
pubsubTopic: Option[PubsubTopic],
contentTopics: ContentTopic | seq[ContentTopic],
proc getShardsFromContentTopics*(
s: Sharding, contentTopics: ContentTopic | seq[ContentTopic]
): Result[Table[RelayShard, seq[NsContentTopic]], string] =
var topics: seq[ContentTopic]
when contentTopics is seq[ContentTopic]:
topics = contentTopics
else:
topics = @[contentTopics]
let topics =
when contentTopics is seq[ContentTopic]:
contentTopics
else:
@[contentTopics]
let parseRes = NsContentTopic.parse(topics)
let nsContentTopics =
if parseRes.isErr():
return err("Cannot parse content topic: " & $parseRes.error)
else:
parseRes.get()
var topicMap = initTable[RelayShard, seq[NsContentTopic]]()
for contentTopic in topics:
let parseRes = NsContentTopic.parse(contentTopic)
for content in nsContentTopics:
let shard = s.getShard(content).valueOr:
return err("Cannot deduce shard from content topic: " & $error)
let content =
if parseRes.isErr():
return err("Cannot parse content topic: " & $parseRes.error)
else:
parseRes.get()
let pubsub =
if pubsubTopic.isSome():
let parseRes = RelayShard.parse(pubsubTopic.get())
if parseRes.isErr():
return err("Cannot parse pubsub topic: " & $parseRes.error)
else:
parseRes.get()
else:
let shardsRes = s.getShard(content)
if shardsRes.isErr():
return err("Cannot autoshard content topic: " & $shardsRes.error)
else:
shardsRes.get()
if not topicMap.hasKey(pubsub):
topicMap[pubsub] = @[]
if not topicMap.hasKey(shard):
topicMap[shard] = @[]
try:
topicMap[pubsub].add(content)
topicMap[shard].add(content)
except CatchableError:
return err(getCurrentExceptionMsg())
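
A usage sketch of the renamed helper, assuming waku_core's sharding and content_topic modules are imported; it groups content topics under their deduced shard:

```nim
import std/[sequtils, strutils, tables]
# Assumes waku_core's sharding and content_topic modules are imported.

let sharding = Sharding(clusterId: 1, shardCountGenZero: 8)
let topicMap = sharding.getShardsFromContentTopics(
  @["/app/1/chat/proto", "/app/1/files/proto"]
).valueOr:
  quit("sharding error: " & error)

for shard, contentTopics in topicMap.pairs:
  # Each key is the RelayShard deduced for the content topics grouped under it.
  echo $shard, " <- ", contentTopics.mapIt($it).join(", ")
```
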

View File

@ -26,12 +26,19 @@ type WakuLightPush* = ref object of LPProtocol
peerManager*: PeerManager
pushHandler*: PushMessageHandler
requestRateLimiter*: RequestRateLimiter
sharding: Sharding
autoSharding: Option[Sharding]
proc handleRequest(
wl: WakuLightPush, peerId: PeerId, pushRequest: LightPushRequest
wl: WakuLightPush, peerId: PeerId, pushRequest: LightpushRequest
): Future[WakuLightPushResult] {.async.} =
let pubsubTopic = pushRequest.pubSubTopic.valueOr:
if wl.autoSharding.isNone():
let msg = "Pubsub topic must be specified when static sharding is enabled"
error "lightpush request handling error", error = msg
return WakuLightPushResult.err(
(code: LightpushStatusCode.INVALID_MESSAGE_ERROR, desc: some(msg))
)
let parsedTopic = NsContentTopic.parse(pushRequest.message.contentTopic).valueOr:
let msg = "Invalid content-topic:" & $error
error "lightpush request handling error", error = msg
@ -39,8 +46,8 @@ proc handleRequest(
(code: LightPushStatusCode.INVALID_MESSAGE_ERROR, desc: some(msg))
)
wl.sharding.getShard(parsedTopic).valueOr:
let msg = "Sharding error: " & error
wl.autoSharding.get().getShard(parsedTopic).valueOr:
let msg = "Auto-sharding error: " & error
error "lightpush request handling error", error = msg
return WakuLightPushResult.err(
(code: LightPushStatusCode.INTERNAL_SERVER_ERROR, desc: some(msg))
@ -149,7 +156,7 @@ proc new*(
peerManager: PeerManager,
rng: ref rand.HmacDrbgContext,
pushHandler: PushMessageHandler,
sharding: Sharding,
autoSharding: Option[Sharding],
rateLimitSetting: Option[RateLimitSetting] = none[RateLimitSetting](),
): T =
let wl = WakuLightPush(
@ -157,7 +164,7 @@ proc new*(
peerManager: peerManager,
pushHandler: pushHandler,
requestRateLimiter: newRequestRateLimiter(rateLimitSetting),
sharding: sharding,
autoSharding: autoSharding,
)
wl.initProtocolHandler()
setServiceLimitMetric(WakuLightpushCodec, rateLimitSetting)
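
A construction sketch matching the `new` signature above (the surrounding nwaku symbols `peerManager`, `rng`, `pushHandler`, and `rateLimit` are assumed in scope): a service node on a static-sharding cluster now passes `none(Sharding)` explicitly:

```nim
let wakuLightPush = WakuLightPush.new(
  peerManager,
  rng,
  pushHandler,
  none(Sharding), # static-sharding cluster: requests must carry a pubsub topic
  some(rateLimit),
)
```
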

View File

@ -29,7 +29,7 @@ proc respond(
m: WakuMetadata, conn: Connection
): Future[Result[void, string]] {.async, gcsafe.} =
let response =
WakuMetadataResponse(clusterId: some(m.clusterId), shards: toSeq(m.shards))
WakuMetadataResponse(clusterId: some(m.clusterId.uint32), shards: toSeq(m.shards))
let res = catch:
await conn.writeLP(response.encode().buffer)