chore: deprecating pubsub topic (#2997)

This commit is contained in:
gabrielmer 2024-09-10 15:07:12 -06:00 committed by GitHub
parent f34a044ccf
commit 43bea3c476
35 changed files with 322 additions and 270 deletions

View File

@ -6,7 +6,7 @@ when not (compileOption("threads")):
{.push raises: [].}
import std/[strformat, strutils, times, options, random]
import std/[strformat, strutils, times, options, random, sequtils]
import
confutils,
chronicles,
@ -379,7 +379,9 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
raise newException(ConfigurationError, "rln-relay-cred-path MUST be passed")
if conf.relay:
await node.mountRelay(conf.topics.split(" "))
let shards =
conf.shards.mapIt(RelayShard(clusterId: conf.clusterId, shardId: uint16(it)))
await node.mountRelay(shards)
await node.mountLibp2pPing()

View File

@ -83,11 +83,19 @@ type
name: "keep-alive"
.}: bool
topics* {.
desc: "Default topics to subscribe to (space separated list).",
defaultValue: "/waku/2/rs/0/0",
name: "topics"
.}: string
clusterId* {.
desc:
"Cluster id that the node is running in. Node in a different cluster id is disconnected.",
defaultValue: 0,
name: "cluster-id"
.}: uint16
shards* {.
desc:
"Shards index to subscribe to [0..NUM_SHARDS_IN_NETWORK-1]. Argument may be repeated.",
defaultValue: @[uint16(0)],
name: "shard"
.}: seq[uint16]
## Store config
store* {.

View File

@ -67,12 +67,6 @@ type Chat2MatterbridgeConf* = object
name: "nodekey"
.}: crypto.PrivateKey
topics* {.
desc: "Default topics to subscribe to (space separated list)",
defaultValue: "/waku/2/rs/0/0",
name: "topics"
.}: string
store* {.
desc: "Flag whether to start store protocol", defaultValue: true, name: "store"
.}: bool

View File

@ -98,7 +98,7 @@ type LiteProtocolTesterConf* = object
## TODO: extend lite protocol tester configuration based on testing needs
# shards* {.
# desc: "Shards index to subscribe to [0..MAX_SHARDS-1]. Argument may be repeated.",
# desc: "Shards index to subscribe to [0..NUM_SHARDS_IN_NETWORK-1]. Argument may be repeated.",
# defaultValue: @[],
# name: "shard"
# .}: seq[uint16]

View File

@ -441,10 +441,12 @@ proc initAndStartApp(
ipAddr = some(extIp), tcpPort = some(nodeTcpPort), udpPort = some(nodeUdpPort)
)
builder.withWakuCapabilities(flags)
let addShardedTopics = builder.withShardedTopics(conf.pubsubTopics)
if addShardedTopics.isErr():
error "failed to add sharded topics to ENR", error = addShardedTopics.error
return err($addShardedTopics.error)
builder.withWakuRelaySharding(
RelayShards(clusterId: conf.clusterId, shardIds: conf.shards)
).isOkOr:
error "failed to add sharded topics to ENR", error = error
return err("failed to add sharded topics to ENR: " & $error)
let recordRes = builder.build()
let record =
@ -561,11 +563,14 @@ when isMainModule:
let twnClusterConf = ClusterConf.TheWakuNetworkConf()
conf.bootstrapNodes = twnClusterConf.discv5BootstrapNodes
conf.pubsubTopics = twnClusterConf.pubsubTopics
conf.rlnRelayDynamic = twnClusterConf.rlnRelayDynamic
conf.rlnRelayEthContractAddress = twnClusterConf.rlnRelayEthContractAddress
conf.rlnEpochSizeSec = twnClusterConf.rlnEpochSizeSec
conf.rlnRelayUserMessageLimit = twnClusterConf.rlnRelayUserMessageLimit
conf.numShardsInNetwork = twnClusterConf.numShardsInNetwork
if conf.shards.len == 0:
conf.shards = toSeq(uint16(0) .. uint16(twnClusterConf.numShardsInNetwork - 1))
if conf.logLevel != LogLevel.NONE:
setLogLevel(conf.logLevel)
@ -631,9 +636,11 @@ when isMainModule:
error "failed to mount waku metadata protocol: ", err = error
quit 1
for pubsubTopic in conf.pubsubTopics:
# Subscribe the node to the default pubsubtopic, to count messages
subscribeAndHandleMessages(node, pubsubTopic, msgPerContentTopic)
for shard in conf.shards:
# Subscribe the node to the shards, to count messages
subscribeAndHandleMessages(
node, $RelayShard(shardId: shard, clusterId: conf.clusterId), msgPerContentTopic
)
# spawn the routine that crawls the network
# TODO: split into 3 routines (discovery, connections, ip2location)

View File

@ -38,10 +38,15 @@ type NetworkMonitorConf* = object
name: "dns-discovery-url"
.}: string
pubsubTopics* {.
desc: "Default pubsub topic to subscribe to. Argument may be repeated.",
name: "pubsub-topic"
.}: seq[string]
shards* {.
desc:
"Shards index to subscribe to [0..NUM_SHARDS_IN_NETWORK-1]. Argument may be repeated.",
name: "shard"
.}: seq[uint16]
numShardsInNetwork* {.
desc: "Number of shards in the network", name: "num-shards-in-network"
.}: uint32
refreshInterval* {.
desc: "How often new peers are discovered and connected to (in seconds)",
@ -55,7 +60,7 @@ type NetworkMonitorConf* = object
"Cluster id that the node is running in. Node in a different cluster id is disconnected.",
defaultValue: 1,
name: "cluster-id"
.}: uint32
.}: uint16
rlnRelay* {.
desc: "Enable spam protection through rln-relay: true|false",

View File

@ -81,7 +81,8 @@ type WakuCanaryConf* = object
.}: bool
shards* {.
desc: "Shards index to subscribe to [0..MAX_SHARDS-1]. Argument may be repeated.",
desc:
"Shards index to subscribe to [0..NUM_SHARDS_IN_NETWORK-1]. Argument may be repeated.",
defaultValue: @[],
name: "shard",
abbr: "s"

View File

@ -18,7 +18,7 @@ By default a nwaku node will:
See [this tutorial](./configure-key.md) if you want to generate and configure a persistent private key.
- listen for incoming libp2p connections on the default TCP port (`60000`)
- enable `relay` protocol
- subscribe to the default pubsub topic, namely `/waku/2/rs/0/0`
- subscribe to the default clusterId (0) and shard (0)
- enable `store` protocol, but only as a client.
This implies that the nwaku node will not persist any historical messages itself,
but can query `store` service peers who do so.

View File

@ -20,8 +20,6 @@ suite "Peer Manager":
serverKey {.threadvar.}: PrivateKey
clientKey {.threadvar.}: PrivateKey
clusterId {.threadvar.}: uint64
shardTopic0 {.threadvar.}: string
shardTopic1 {.threadvar.}: string
asyncSetup:
listenPort = Port(0)
@ -29,17 +27,15 @@ suite "Peer Manager":
serverKey = generateSecp256k1Key()
clientKey = generateSecp256k1Key()
clusterId = 1
shardTopic0 = "/waku/2/rs/" & $clusterId & "/0"
shardTopic1 = "/waku/2/rs/" & $clusterId & "/1"
asyncTest "light client is not disconnected":
# Given two nodes with the same shardId
let
server = newTestWakuNode(
serverKey, listenAddress, listenPort, pubsubTopics = @[shardTopic0]
serverKey, listenAddress, listenPort, clusterId = clusterId, shards = @[0]
)
client = newTestWakuNode(
clientKey, listenAddress, listenPort, pubsubTopics = @[shardTopic1]
clientKey, listenAddress, listenPort, clusterId = clusterId, shards = @[1]
)
# And both mount metadata and filter
@ -71,10 +67,10 @@ suite "Peer Manager":
# Given two nodes with the same shardId
let
server = newTestWakuNode(
serverKey, listenAddress, listenPort, pubsubTopics = @[shardTopic0]
serverKey, listenAddress, listenPort, clusterId = clusterId, shards = @[0]
)
client = newTestWakuNode(
clientKey, listenAddress, listenPort, pubsubTopics = @[shardTopic0]
clientKey, listenAddress, listenPort, clusterId = clusterId, shards = @[1]
)
# And both mount metadata and relay
@ -104,10 +100,10 @@ suite "Peer Manager":
# Given two nodes with different shardIds
let
server = newTestWakuNode(
serverKey, listenAddress, listenPort, pubsubTopics = @[shardTopic0]
serverKey, listenAddress, listenPort, clusterId = clusterId, shards = @[0]
)
client = newTestWakuNode(
clientKey, listenAddress, listenPort, pubsubTopics = @[shardTopic1]
clientKey, listenAddress, listenPort, clusterId = clusterId, shards = @[1]
)
# And both mount metadata and relay

View File

@ -96,9 +96,9 @@ proc getWakuRlnConfigOnChain*(
)
proc setupRelayWithOnChainRln*(
node: WakuNode, pubsubTopics: seq[string], wakuRlnConfig: WakuRlnConfig
node: WakuNode, shards: seq[RelayShard], wakuRlnConfig: WakuRlnConfig
) {.async.} =
await node.mountRelay(pubsubTopics)
await node.mountRelay(shards)
await node.mountRlnRelay(wakuRlnConfig)
suite "Waku RlnRelay - End to End - Static":
@ -223,7 +223,7 @@ suite "Waku RlnRelay - End to End - Static":
nodekey = generateSecp256k1Key()
node = newTestWakuNode(nodekey, parseIpAddress("0.0.0.0"), Port(0))
await node.mountRelay(@[DefaultPubsubTopic])
await node.mountRelay(@[DefaultRelayShard])
let contractAddress = await uploadRLNContract(EthClient)
let wakuRlnConfig = WakuRlnConfig(

View File

@ -418,7 +418,8 @@ procSuite "Peer Manager":
generateSecp256k1Key(),
ValidIpAddress.init("0.0.0.0"),
port,
pubsubTopics = @["/waku/2/rs/3/0"],
clusterId = 3,
shards = @[uint16(0)],
)
# same network
@ -426,13 +427,15 @@ procSuite "Peer Manager":
generateSecp256k1Key(),
ValidIpAddress.init("0.0.0.0"),
port,
pubsubTopics = @["/waku/2/rs/4/0"],
clusterId = 4,
shards = @[uint16(0)],
)
node3 = newTestWakuNode(
generateSecp256k1Key(),
ValidIpAddress.init("0.0.0.0"),
port,
pubsubTopics = @["/waku/2/rs/4/0"],
clusterId = 4,
shards = @[uint16(0)],
)
node1.mountMetadata(3).expect("Mounted Waku Metadata")

View File

@ -25,8 +25,8 @@ procSuite "Relay (GossipSub) Peer Exchange":
newTestWakuNode(node2Key, listenAddress, port, sendSignedPeerRecord = true)
# When both client and server mount relay without a handler
await node1.mountRelay(@[DefaultPubsubTopic])
await node2.mountRelay(@[DefaultPubsubTopic], none(RoutingRecordsHandler))
await node1.mountRelay(@[DefaultRelayShard])
await node2.mountRelay(@[DefaultRelayShard], none(RoutingRecordsHandler))
# Then the relays are mounted without a handler
check:
@ -75,9 +75,9 @@ procSuite "Relay (GossipSub) Peer Exchange":
peerExchangeHandle: RoutingRecordsHandler = peerExchangeHandler
# Givem the nodes mount relay with a peer exchange handler
await node1.mountRelay(@[DefaultPubsubTopic], some(emptyPeerExchangeHandle))
await node2.mountRelay(@[DefaultPubsubTopic], some(emptyPeerExchangeHandle))
await node3.mountRelay(@[DefaultPubsubTopic], some(peerExchangeHandle))
await node1.mountRelay(@[DefaultRelayShard], some(emptyPeerExchangeHandle))
await node2.mountRelay(@[DefaultRelayShard], some(emptyPeerExchangeHandle))
await node3.mountRelay(@[DefaultRelayShard], some(peerExchangeHandle))
# Ensure that node1 prunes all peers after the first connection
node1.wakuRelay.parameters.dHigh = 1

View File

@ -280,7 +280,7 @@ suite "Waku ENR - Relay static sharding":
clusterId: uint16 = 22
shardId: uint16 = 1
let shard = RelayShard.staticSharding(clusterId, shardId)
let shard = RelayShard(clusterId: clusterId, shardId: shardId)
## When
let shardsTopics = RelayShards.init(clusterId, shardId).expect("Valid Shards")

View File

@ -28,7 +28,7 @@ suite "WakuNode":
node1 = newTestWakuNode(nodeKey1, parseIpAddress("0.0.0.0"), Port(61000))
nodeKey2 = generateSecp256k1Key()
node2 = newTestWakuNode(nodeKey2, parseIpAddress("0.0.0.0"), Port(61002))
pubSubTopic = "/waku/2/rs/0/0"
shard = DefaultRelayShard
contentTopic = ContentTopic("/waku/2/default-content/proto")
payload = "hello world".toBytes()
message = WakuMessage(payload: payload, contentTopic: contentTopic)
@ -36,13 +36,13 @@ suite "WakuNode":
# Setup node 1 with stable codec "/vac/waku/relay/2.0.0"
await node1.start()
await node1.mountRelay(@[pubSubTopic])
await node1.mountRelay(@[shard])
node1.wakuRelay.codec = "/vac/waku/relay/2.0.0"
# Setup node 2 with beta codec "/vac/waku/relay/2.0.0-beta2"
await node2.start()
await node2.mountRelay(@[pubSubTopic])
await node2.mountRelay(@[shard])
node2.wakuRelay.codec = "/vac/waku/relay/2.0.0-beta2"
check:
@ -58,15 +58,15 @@ suite "WakuNode":
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
check:
topic == pubSubTopic
topic == $shard
msg.contentTopic == contentTopic
msg.payload == payload
completionFut.complete(true)
node2.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))
node2.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler))
await sleepAsync(2000.millis)
var res = await node1.publish(some(pubSubTopic), message)
var res = await node1.publish(some($shard), message)
assert res.isOk(), $res.error
await sleepAsync(2000.millis)

View File

@ -19,8 +19,8 @@ suite "WakuNode - Lightpush":
await allFutures(destNode.start(), bridgeNode.start(), lightNode.start())
await destNode.mountRelay(@[DefaultPubsubTopic])
await bridgeNode.mountRelay(@[DefaultPubsubTopic])
await destNode.mountRelay(@[DefaultRelayShard])
await bridgeNode.mountRelay(@[DefaultRelayShard])
await bridgeNode.mountLightPush()
lightNode.mountLightPushClient()

View File

@ -37,8 +37,8 @@ proc defaultTestWakuNodeConf*(): WakuNodeConf =
nat: "any",
maxConnections: 50,
maxMessageSize: "1024 KiB",
clusterId: 0,
pubsubTopics: @["/waku/2/rs/0/0"],
clusterId: DefaultClusterId,
shards: @[DefaultShardId],
relay: true,
storeMessageDbUrl: "sqlite://store.sqlite3",
)
@ -63,8 +63,9 @@ proc newTestWakuNode*(
dns4DomainName = none(string),
discv5UdpPort = none(Port),
agentString = none(string),
pubsubTopics: seq[string] = @["/waku/2/rs/1/0"],
peerStoreCapacity = none(int),
clusterId = DefaultClusterId,
shards = @[DefaultShardId],
): WakuNode =
var resolvedExtIp = extIp
@ -77,14 +78,8 @@ proc newTestWakuNode*(
var conf = defaultTestWakuNodeConf()
let clusterId =
if pubsubTopics.len() > 0:
RelayShard.parse(pubsubTopics[0]).get().clusterId
else:
1.uint16
conf.clusterId = clusterId
conf.pubsubTopics = pubsubTopics
conf.shards = shards
if dns4DomainName.isSome() and extIp.isNone():
# If there's an error resolving the IP, an exception is thrown and test fails
@ -95,7 +90,7 @@ proc newTestWakuNode*(
let netConf = NetConfig.init(
bindIp = bindIp,
clusterId = clusterId,
clusterId = conf.clusterId,
bindPort = bindPort,
extIp = resolvedExtIp,
extPort = extPort,
@ -111,8 +106,10 @@ proc newTestWakuNode*(
var enrBuilder = EnrBuilder.init(nodeKey)
enrBuilder.withShardedTopics(pubsubTopics).isOkOr:
raise newException(Defect, "Invalid record: " & error)
enrBuilder.withWakuRelaySharding(
RelayShards(clusterId: conf.clusterId, shardIds: conf.shards)
).isOkOr:
raise newException(Defect, "Invalid record: " & $error)
enrBuilder.withIpAddressAndPorts(
ipAddr = netConf.enrIp, tcpPort = netConf.enrPort, udpPort = netConf.discv5UdpPort

View File

@ -136,7 +136,7 @@ suite "Waku Message - Content topics namespacing":
suite "Waku Message - Pub-sub topics namespacing":
test "Stringify static sharding pub-sub topic":
## Given
var shard = RelayShard.staticSharding(clusterId = 0, shardId = 2)
var shard = RelayShard(clusterId: 0, shardId: 2)
## When
let topic = $shard

View File

@ -10,10 +10,10 @@ suite "Static Sharding Functionality":
check:
shard.clusterId == 0
shard.shardId == 1
shard == RelayShard.staticSharding(0, 1)
shard == RelayShard(clusterId: 0, shardId: 1)
test "Pubsub Topic Naming Compliance":
let shard = RelayShard.staticSharding(0, 1)
let shard = RelayShard(clusterId: 0, shardId: 1)
check:
shard.clusterId == 0
shard.shardId == 1

View File

@ -54,16 +54,16 @@ suite "Autosharding":
# Then the generated shards are valid
check:
shard1 == RelayShard.staticSharding(ClusterId, 3)
shard2 == RelayShard.staticSharding(ClusterId, 3)
shard3 == RelayShard.staticSharding(ClusterId, 6)
shard4 == RelayShard.staticSharding(ClusterId, 6)
shard5 == RelayShard.staticSharding(ClusterId, 3)
shard6 == RelayShard.staticSharding(ClusterId, 3)
shard7 == RelayShard.staticSharding(ClusterId, 3)
shard8 == RelayShard.staticSharding(ClusterId, 3)
shard9 == RelayShard.staticSharding(ClusterId, 7)
shard10 == RelayShard.staticSharding(ClusterId, 3)
shard1 == RelayShard(clusterId: ClusterId, shardId: 3)
shard2 == RelayShard(clusterId: ClusterId, shardId: 3)
shard3 == RelayShard(clusterId: ClusterId, shardId: 6)
shard4 == RelayShard(clusterId: ClusterId, shardId: 6)
shard5 == RelayShard(clusterId: ClusterId, shardId: 3)
shard6 == RelayShard(clusterId: ClusterId, shardId: 3)
shard7 == RelayShard(clusterId: ClusterId, shardId: 3)
shard8 == RelayShard(clusterId: ClusterId, shardId: 3)
shard9 == RelayShard(clusterId: ClusterId, shardId: 7)
shard10 == RelayShard(clusterId: ClusterId, shardId: 3)
suite "getShard from NsContentTopic":
test "Generate Gen0 Shard with topic.generation==none":
@ -75,7 +75,7 @@ suite "Autosharding":
# Then the generated shard is valid
check:
shard.value() == RelayShard.staticSharding(ClusterId, 3)
shard.value() == RelayShard(clusterId: ClusterId, shardId: 3)
test "Generate Gen0 Shard with topic.generation==0":
let sharding =
@ -85,7 +85,7 @@ suite "Autosharding":
# Then the generated shard is valid
check:
shard.value() == RelayShard.staticSharding(ClusterId, 3)
shard.value() == RelayShard(clusterId: ClusterId, shardId: 3)
test "Generate Gen0 Shard with topic.generation==other":
let sharding =
@ -106,7 +106,7 @@ suite "Autosharding":
# Then the generated shard is valid
check:
shard.value() == RelayShard.staticSharding(ClusterId, 3)
shard.value() == RelayShard(clusterId: ClusterId, shardId: 3)
test "Generate Gen0 Shard with topic.generation==0":
let sharding =
@ -116,7 +116,7 @@ suite "Autosharding":
# Then the generated shard is valid
check:
shard.value() == RelayShard.staticSharding(ClusterId, 3)
shard.value() == RelayShard(clusterId: ClusterId, shardId: 3)
test "Generate Gen0 Shard with topic.generation==other":
let sharding =

View File

@ -63,19 +63,19 @@ suite "WakuNode - Relay":
node2 = newTestWakuNode(nodeKey2, parseIpAddress("0.0.0.0"), Port(0))
nodeKey3 = generateSecp256k1Key()
node3 = newTestWakuNode(nodeKey3, parseIpAddress("0.0.0.0"), Port(0))
pubSubTopic = "test"
shard = DefaultRelayShard
contentTopic = ContentTopic("/waku/2/default-content/proto")
payload = "hello world".toBytes()
message = WakuMessage(payload: payload, contentTopic: contentTopic)
await node1.start()
await node1.mountRelay(@[pubSubTopic])
await node1.mountRelay(@[shard])
await node2.start()
await node2.mountRelay(@[pubSubTopic])
await node2.mountRelay(@[shard])
await node3.start()
await node3.mountRelay(@[pubSubTopic])
await node3.mountRelay(@[shard])
await allFutures(
node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]),
@ -87,15 +87,15 @@ suite "WakuNode - Relay":
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
check:
topic == pubSubTopic
topic == $shard
msg.contentTopic == contentTopic
msg.payload == payload
completionFut.complete(true)
node3.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))
node3.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler))
await sleepAsync(500.millis)
var res = await node1.publish(some(pubSubTopic), message)
var res = await node1.publish(some($shard), message)
assert res.isOk(), $res.error
## Then
@ -124,7 +124,7 @@ suite "WakuNode - Relay":
nodeKey3 = generateSecp256k1Key()
node3 = newTestWakuNode(nodeKey3, parseIpAddress("0.0.0.0"), Port(0))
pubSubTopic = "test"
shard = DefaultRelayShard
contentTopic1 = ContentTopic("/waku/2/default-content/proto")
payload = "hello world".toBytes()
message1 = WakuMessage(payload: payload, contentTopic: contentTopic1)
@ -135,13 +135,13 @@ suite "WakuNode - Relay":
# start all the nodes
await node1.start()
await node1.mountRelay(@[pubSubTopic])
await node1.mountRelay(@[shard])
await node2.start()
await node2.mountRelay(@[pubSubTopic])
await node2.mountRelay(@[shard])
await node3.start()
await node3.mountRelay(@[pubSubTopic])
await node3.mountRelay(@[shard])
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
await node3.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
@ -155,7 +155,7 @@ suite "WakuNode - Relay":
): Future[ValidationResult] {.async.} =
## the validator that only allows messages with contentTopic1 to be relayed
check:
topic == pubSubTopic
topic == $shard
# only relay messages with contentTopic1
if msg.contentTopic != contentTopic1:
@ -172,22 +172,22 @@ suite "WakuNode - Relay":
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
check:
topic == pubSubTopic
topic == $shard
# check that only messages with contentTopic1 is relayed (but not contentTopic2)
msg.contentTopic == contentTopic1
# relay handler is called
completionFut.complete(true)
node3.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))
node3.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler))
await sleepAsync(500.millis)
var res = await node1.publish(some(pubSubTopic), message1)
var res = await node1.publish(some($shard), message1)
assert res.isOk(), $res.error
await sleepAsync(500.millis)
# message2 never gets relayed because of the validator
res = await node1.publish(some(pubSubTopic), message2)
res = await node1.publish(some($shard), message2)
assert res.isOk(), $res.error
await sleepAsync(500.millis)
@ -258,16 +258,16 @@ suite "WakuNode - Relay":
wsBindPort = Port(0),
wsEnabled = true,
)
pubSubTopic = "test"
shard = DefaultRelayShard
contentTopic = ContentTopic("/waku/2/default-content/proto")
payload = "hello world".toBytes()
message = WakuMessage(payload: payload, contentTopic: contentTopic)
await node1.start()
await node1.mountRelay(@[pubSubTopic])
await node1.mountRelay(@[shard])
await node2.start()
await node2.mountRelay(@[pubSubTopic])
await node2.mountRelay(@[shard])
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
@ -276,15 +276,15 @@ suite "WakuNode - Relay":
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
check:
topic == pubSubTopic
topic == $shard
msg.contentTopic == contentTopic
msg.payload == payload
completionFut.complete(true)
node1.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))
node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler))
await sleepAsync(500.millis)
let res = await node2.publish(some(pubSubTopic), message)
let res = await node2.publish(some($shard), message)
assert res.isOk(), $res.error
await sleepAsync(500.millis)
@ -306,16 +306,16 @@ suite "WakuNode - Relay":
)
nodeKey2 = generateSecp256k1Key()
node2 = newTestWakuNode(nodeKey2, parseIpAddress("0.0.0.0"), bindPort = Port(0))
pubSubTopic = "test"
shard = DefaultRelayShard
contentTopic = ContentTopic("/waku/2/default-content/proto")
payload = "hello world".toBytes()
message = WakuMessage(payload: payload, contentTopic: contentTopic)
await node1.start()
await node1.mountRelay(@[pubSubTopic])
await node1.mountRelay(@[shard])
await node2.start()
await node2.mountRelay(@[pubSubTopic])
await node2.mountRelay(@[shard])
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
@ -324,15 +324,15 @@ suite "WakuNode - Relay":
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
check:
topic == pubSubTopic
topic == $shard
msg.contentTopic == contentTopic
msg.payload == payload
completionFut.complete(true)
node1.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))
node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler))
await sleepAsync(500.millis)
let res = await node2.publish(some(pubSubTopic), message)
let res = await node2.publish(some($shard), message)
assert res.isOk(), $res.error
await sleepAsync(500.millis)
@ -354,16 +354,16 @@ suite "WakuNode - Relay":
wsBindPort = Port(0),
wsEnabled = true,
)
pubSubTopic = "test"
shard = DefaultRelayShard
contentTopic = ContentTopic("/waku/2/default-content/proto")
payload = "hello world".toBytes()
message = WakuMessage(payload: payload, contentTopic: contentTopic)
await node1.start()
await node1.mountRelay(@[pubSubTopic])
await node1.mountRelay(@[shard])
await node2.start()
await node2.mountRelay(@[pubSubTopic])
await node2.mountRelay(@[shard])
#delete websocket peer address
# TODO: a better way to find the index - this is too brittle
@ -376,15 +376,15 @@ suite "WakuNode - Relay":
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
check:
topic == pubSubTopic
topic == $shard
msg.contentTopic == contentTopic
msg.payload == payload
completionFut.complete(true)
node1.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))
node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler))
await sleepAsync(500.millis)
let res = await node2.publish(some(pubSubTopic), message)
let res = await node2.publish(some($shard), message)
assert res.isOk(), $res.error
await sleepAsync(500.millis)
@ -408,16 +408,16 @@ suite "WakuNode - Relay":
)
nodeKey2 = generateSecp256k1Key()
node2 = newTestWakuNode(nodeKey2, parseIpAddress("0.0.0.0"), bindPort = Port(0))
pubSubTopic = "test"
shard = DefaultRelayShard
contentTopic = ContentTopic("/waku/2/default-content/proto")
payload = "hello world".toBytes()
message = WakuMessage(payload: payload, contentTopic: contentTopic)
await node1.start()
await node1.mountRelay(@[pubSubTopic])
await node1.mountRelay(@[shard])
await node2.start()
await node2.mountRelay(@[pubSubTopic])
await node2.mountRelay(@[shard])
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
@ -426,15 +426,15 @@ suite "WakuNode - Relay":
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
check:
topic == pubSubTopic
topic == $shard
msg.contentTopic == contentTopic
msg.payload == payload
completionFut.complete(true)
node1.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))
node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler))
await sleepAsync(500.millis)
let res = await node2.publish(some(pubSubTopic), message)
let res = await node2.publish(some($shard), message)
assert res.isOk(), $res.error
await sleepAsync(500.millis)
@ -466,16 +466,16 @@ suite "WakuNode - Relay":
)
let
pubSubTopic = "test"
shard = DefaultRelayShard
contentTopic = ContentTopic("/waku/2/default-content/proto")
payload = "hello world".toBytes()
message = WakuMessage(payload: payload, contentTopic: contentTopic)
await node1.start()
await node1.mountRelay(@[pubSubTopic])
await node1.mountRelay(@[shard])
await node2.start()
await node2.mountRelay(@[pubSubTopic])
await node2.mountRelay(@[shard])
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
@ -484,15 +484,15 @@ suite "WakuNode - Relay":
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
check:
topic == pubSubTopic
topic == $shard
msg.contentTopic == contentTopic
msg.payload == payload
completionFut.complete(true)
node1.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))
node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler))
await sleepAsync(500.millis)
let res = await node2.publish(some(pubSubTopic), message)
let res = await node2.publish(some($shard), message)
assert res.isOk(), $res.error
await sleepAsync(500.millis)

View File

@ -53,9 +53,9 @@ proc setupRln*(node: WakuNode, identifier: uint) {.async.} =
)
proc setupRelayWithRln*(
node: WakuNode, identifier: uint, pubsubTopics: seq[string]
node: WakuNode, identifier: uint, shards: seq[RelayShard]
) {.async.} =
await node.mountRelay(pubsubTopics)
await node.mountRelay(shards)
await setupRln(node, identifier)
proc subscribeToContentTopicWithHandler*(

View File

@ -50,7 +50,7 @@ procSuite "WakuNode - RLN relay":
# set up three nodes
# node1
await node1.mountRelay(@[DefaultPubsubTopic])
await node1.mountRelay(@[DefaultRelayShard])
# mount rlnrelay in off-chain mode
let wakuRlnConfig1 = WakuRlnConfig(
@ -66,7 +66,7 @@ procSuite "WakuNode - RLN relay":
await node1.start()
# node 2
await node2.mountRelay(@[DefaultPubsubTopic])
await node2.mountRelay(@[DefaultRelayShard])
# mount rlnrelay in off-chain mode
let wakuRlnConfig2 = WakuRlnConfig(
rlnRelayDynamic: false,
@ -81,7 +81,7 @@ procSuite "WakuNode - RLN relay":
await node2.start()
# node 3
await node3.mountRelay(@[DefaultPubsubTopic])
await node3.mountRelay(@[DefaultRelayShard])
let wakuRlnConfig3 = WakuRlnConfig(
rlnRelayDynamic: false,
@ -131,18 +131,15 @@ procSuite "WakuNode - RLN relay":
await node2.stop()
await node3.stop()
asyncTest "testing rln-relay is applied in all rln pubsub/content topics":
asyncTest "testing rln-relay is applied in all rln shards/content topics":
# create 3 nodes
let nodes = toSeq(0 ..< 3).mapIt(
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
)
await allFutures(nodes.mapIt(it.start()))
let pubsubTopics =
@[
PubsubTopic("/waku/2/pubsubtopic-a/proto"),
PubsubTopic("/waku/2/pubsubtopic-b/proto"),
]
let shards =
@[RelayShard(clusterId: 0, shardId: 0), RelayShard(clusterId: 0, shardId: 1)]
let contentTopics =
@[
ContentTopic("/waku/2/content-topic-a/proto"),
@ -150,7 +147,7 @@ procSuite "WakuNode - RLN relay":
]
# set up three nodes
await allFutures(nodes.mapIt(it.mountRelay(pubsubTopics)))
await allFutures(nodes.mapIt(it.mountRelay(shards)))
# mount rlnrelay in off-chain mode
for index, node in nodes:
@ -177,14 +174,14 @@ procSuite "WakuNode - RLN relay":
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
info "relayHandler. The received topic:", topic
if topic == pubsubTopics[0]:
if topic == $shards[0]:
rxMessagesTopic1 = rxMessagesTopic1 + 1
elif topic == pubsubTopics[1]:
elif topic == $shards[1]:
rxMessagesTopic2 = rxMessagesTopic2 + 1
# mount the relay handlers
nodes[2].subscribe((kind: PubsubSub, topic: pubsubTopics[0]), some(relayHandler))
nodes[2].subscribe((kind: PubsubSub, topic: pubsubTopics[1]), some(relayHandler))
nodes[2].subscribe((kind: PubsubSub, topic: $shards[0]), some(relayHandler))
nodes[2].subscribe((kind: PubsubSub, topic: $shards[1]), some(relayHandler))
await sleepAsync(1000.millis)
# generate some messages with rln proofs first. generating
@ -214,9 +211,9 @@ procSuite "WakuNode - RLN relay":
# publish 3 messages from node[0] (last 2 are spam, window is 10 secs)
# publish 3 messages from node[1] (last 2 are spam, window is 10 secs)
for msg in messages1:
discard await nodes[0].publish(some(pubsubTopics[0]), msg)
discard await nodes[0].publish(some($shards[0]), msg)
for msg in messages2:
discard await nodes[1].publish(some(pubsubTopics[1]), msg)
discard await nodes[1].publish(some($shards[1]), msg)
# wait for gossip to propagate
await sleepAsync(5000.millis)
@ -245,7 +242,7 @@ procSuite "WakuNode - RLN relay":
# set up three nodes
# node1
await node1.mountRelay(@[DefaultPubsubTopic])
await node1.mountRelay(@[DefaultRelayShard])
# mount rlnrelay in off-chain mode
let wakuRlnConfig1 = WakuRlnConfig(
@ -261,7 +258,7 @@ procSuite "WakuNode - RLN relay":
await node1.start()
# node 2
await node2.mountRelay(@[DefaultPubsubTopic])
await node2.mountRelay(@[DefaultRelayShard])
# mount rlnrelay in off-chain mode
let wakuRlnConfig2 = WakuRlnConfig(
rlnRelayDynamic: false,
@ -276,7 +273,7 @@ procSuite "WakuNode - RLN relay":
await node2.start()
# node 3
await node3.mountRelay(@[DefaultPubsubTopic])
await node3.mountRelay(@[DefaultRelayShard])
let wakuRlnConfig3 = WakuRlnConfig(
rlnRelayDynamic: false,
@ -361,7 +358,7 @@ procSuite "WakuNode - RLN relay":
# set up three nodes
# node1
await node1.mountRelay(@[DefaultPubsubTopic])
await node1.mountRelay(@[DefaultRelayShard])
# mount rlnrelay in off-chain mode
let wakuRlnConfig1 = WakuRlnConfig(
@ -377,7 +374,7 @@ procSuite "WakuNode - RLN relay":
await node1.start()
# node 2
await node2.mountRelay(@[DefaultPubsubTopic])
await node2.mountRelay(@[DefaultRelayShard])
# mount rlnrelay in off-chain mode
let wakuRlnConfig2 = WakuRlnConfig(
@ -392,7 +389,7 @@ procSuite "WakuNode - RLN relay":
await node2.start()
# node 3
await node3.mountRelay(@[DefaultPubsubTopic])
await node3.mountRelay(@[DefaultRelayShard])
# mount rlnrelay in off-chain mode
let wakuRlnConfig3 = WakuRlnConfig(
@ -485,7 +482,7 @@ procSuite "WakuNode - RLN relay":
# Given two nodes
let
contentTopic = ContentTopic("/waku/2/default-content/proto")
pubsubTopicSeq = @[DefaultPubsubTopic]
shardSeq = @[DefaultRelayShard]
nodeKey1 = generateSecp256k1Key()
node1 = newTestWakuNode(nodeKey1, parseIpAddress("0.0.0.0"), Port(0))
nodeKey2 = generateSecp256k1Key()
@ -493,12 +490,12 @@ procSuite "WakuNode - RLN relay":
epochSizeSec: uint64 = 5 # This means rlnMaxEpochGap = 4
# Given both nodes mount relay and rlnrelay
await node1.mountRelay(pubsubTopicSeq)
await node1.mountRelay(shardSeq)
let wakuRlnConfig1 = buildWakuRlnConfig(1, epochSizeSec, "wakunode_10")
await node1.mountRlnRelay(wakuRlnConfig1)
# Mount rlnrelay in node2 in off-chain mode
await node2.mountRelay(@[DefaultPubsubTopic])
await node2.mountRelay(@[DefaultRelayShard])
let wakuRlnConfig2 = buildWakuRlnConfig(2, epochSizeSec, "wakunode_11")
await node2.mountRlnRelay(wakuRlnConfig2)
@ -613,7 +610,7 @@ procSuite "WakuNode - RLN relay":
# Given two nodes
let
contentTopic = ContentTopic("/waku/2/default-content/proto")
pubsubTopicSeq = @[DefaultPubsubTopic]
shardSeq = @[DefaultRelayShard]
nodeKey1 = generateSecp256k1Key()
node1 = newTestWakuNode(nodeKey1, parseIpAddress("0.0.0.0"), Port(0))
nodeKey2 = generateSecp256k1Key()
@ -622,12 +619,12 @@ procSuite "WakuNode - RLN relay":
# Given both nodes mount relay and rlnrelay
# Mount rlnrelay in node1 in off-chain mode
await node1.mountRelay(pubsubTopicSeq)
await node1.mountRelay(shardSeq)
let wakuRlnConfig1 = buildWakuRlnConfig(1, epochSizeSec, "wakunode_10")
await node1.mountRlnRelay(wakuRlnConfig1)
# Mount rlnrelay in node2 in off-chain mode
await node2.mountRelay(@[DefaultPubsubTopic])
await node2.mountRelay(@[DefaultRelayShard])
let wakuRlnConfig2 = buildWakuRlnConfig(2, epochSizeSec, "wakunode_11")
await node2.mountRlnRelay(wakuRlnConfig2)

View File

@ -32,9 +32,9 @@ proc setupStaticRln*(
)
proc setupRelayWithStaticRln*(
node: WakuNode, identifier: uint, pubsubTopics: seq[string]
node: WakuNode, identifier: uint, shards: seq[RelayShard]
) {.async.} =
await node.mountRelay(pubsubTopics)
await node.mountRelay(shards)
await setupStaticRln(node, identifier)
proc subscribeCompletionHandler*(node: WakuNode, pubsubTopic: string): Future[bool] =

View File

@ -54,16 +54,16 @@ suite "Waku v2 Rest API - Relay":
installRelayApiHandlers(restServer.router, node, cache)
restServer.start()
let pubSubTopics =
@[
PubSubTopic("pubsub-topic-1"),
PubSubTopic("pubsub-topic-2"),
PubSubTopic("pubsub-topic-3"),
]
let
shard0 = RelayShard(clusterId: DefaultClusterId, shardId: 0)
shard1 = RelayShard(clusterId: DefaultClusterId, shardId: 1)
shard2 = RelayShard(clusterId: DefaultClusterId, shardId: 2)
let shards = @[$shard0, $shard1, $shard2]
# When
let client = newRestHttpClient(initTAddress(restAddress, restPort))
let response = await client.relayPostSubscriptionsV1(pubSubTopics)
let response = await client.relayPostSubscriptionsV1(shards)
# Then
check:
@ -72,12 +72,12 @@ suite "Waku v2 Rest API - Relay":
response.data == "OK"
check:
cache.isPubsubSubscribed("pubsub-topic-1")
cache.isPubsubSubscribed("pubsub-topic-2")
cache.isPubsubSubscribed("pubsub-topic-3")
cache.isPubsubSubscribed($shard0)
cache.isPubsubSubscribed($shard1)
cache.isPubsubSubscribed($shard2)
check:
toSeq(node.wakuRelay.subscribedTopics).len == pubSubTopics.len
toSeq(node.wakuRelay.subscribedTopics).len == shards.len
await restServer.stop()
await restServer.closeWait()
@ -87,9 +87,15 @@ suite "Waku v2 Rest API - Relay":
# Given
let node = testWakuNode()
await node.start()
await node.mountRelay(
@["pubsub-topic-1", "pubsub-topic-2", "pubsub-topic-3", "pubsub-topic-x"]
)
let
shard0 = RelayShard(clusterId: DefaultClusterId, shardId: 0)
shard1 = RelayShard(clusterId: DefaultClusterId, shardId: 1)
shard2 = RelayShard(clusterId: DefaultClusterId, shardId: 2)
shard3 = RelayShard(clusterId: DefaultClusterId, shardId: 3)
shard4 = RelayShard(clusterId: DefaultClusterId, shardId: 4)
await node.mountRelay(@[shard0, shard1, shard2, shard3])
var restPort = Port(0)
let restAddress = parseIpAddress("0.0.0.0")
@ -98,25 +104,19 @@ suite "Waku v2 Rest API - Relay":
restPort = restServer.httpServer.address.port # update with bound port for client use
let cache = MessageCache.init()
cache.pubsubSubscribe("pubsub-topic-1")
cache.pubsubSubscribe("pubsub-topic-2")
cache.pubsubSubscribe("pubsub-topic-3")
cache.pubsubSubscribe("pubsub-topic-x")
cache.pubsubSubscribe($shard0)
cache.pubsubSubscribe($shard1)
cache.pubsubSubscribe($shard2)
cache.pubsubSubscribe($shard3)
installRelayApiHandlers(restServer.router, node, cache)
restServer.start()
let pubSubTopics =
@[
PubSubTopic("pubsub-topic-1"),
PubSubTopic("pubsub-topic-2"),
PubSubTopic("pubsub-topic-3"),
PubSubTopic("pubsub-topic-y"),
]
let shards = @[$shard0, $shard1, $shard2, $shard4]
# When
let client = newRestHttpClient(initTAddress(restAddress, restPort))
let response = await client.relayDeleteSubscriptionsV1(pubSubTopics)
let response = await client.relayDeleteSubscriptionsV1(shards)
# Then
check:
@ -125,16 +125,16 @@ suite "Waku v2 Rest API - Relay":
response.data == "OK"
check:
not cache.isPubsubSubscribed("pubsub-topic-1")
not node.wakuRelay.isSubscribed("pubsub-topic-1")
not cache.isPubsubSubscribed("pubsub-topic-2")
not node.wakuRelay.isSubscribed("pubsub-topic-2")
not cache.isPubsubSubscribed("pubsub-topic-3")
not node.wakuRelay.isSubscribed("pubsub-topic-3")
cache.isPubsubSubscribed("pubsub-topic-x")
node.wakuRelay.isSubscribed("pubsub-topic-x")
not cache.isPubsubSubscribed("pubsub-topic-y")
not node.wakuRelay.isSubscribed("pubsub-topic-y")
not cache.isPubsubSubscribed($shard0)
not node.wakuRelay.isSubscribed($shard0)
not cache.isPubsubSubscribed($shard1)
not node.wakuRelay.isSubscribed($shard1)
not cache.isPubsubSubscribed($shard2)
not node.wakuRelay.isSubscribed($shard2)
cache.isPubsubSubscribed($shard3)
node.wakuRelay.isSubscribed($shard3)
not cache.isPubsubSubscribed($shard4)
not node.wakuRelay.isSubscribed($shard4)
await restServer.stop()
await restServer.closeWait()

View File

@ -312,14 +312,23 @@ type WakuNodeConf* = object
name: "keep-alive"
.}: bool
# If numShardsInNetwork is not set, we use the number of shards configured as numShardsInNetwork
numShardsInNetwork* {.
desc: "Number of shards in the network",
defaultValue: 0,
name: "num-shards-in-network"
.}: uint32
pubsubTopics* {.
desc: "Default pubsub topic to subscribe to. Argument may be repeated.",
desc:
"Deprecated. Default pubsub topic to subscribe to. Argument may be repeated.",
defaultValue: @[],
name: "pubsub-topic"
.}: seq[string]
shards* {.
desc: "Shards index to subscribe to [0..MAX_SHARDS-1]. Argument may be repeated.",
desc:
"Shards index to subscribe to [0..NUM_SHARDS_IN_NETWORK-1]. Argument may be repeated.",
defaultValue:
@[
uint16(0),

View File

@ -28,29 +28,8 @@ proc enrConfiguration*(
enrBuilder.withMultiaddrs(netConfig.enrMultiaddrs)
var shards = newSeq[uint16]()
let shardsOpt = topicsToRelayShards(conf.pubsubTopics).valueOr:
error "failed to parse pubsub topic, please format according to static shard specification",
error = $error
return err("failed to parse pubsub topic: " & $error)
if shardsOpt.isSome():
let relayShards = shardsOpt.get()
if relayShards.clusterid != conf.clusterId:
error "pubsub topic corresponds to different shard than configured",
nodeCluster = conf.clusterId, pubsubCluster = relayShards.clusterid
return err("pubsub topic corresponds to different shard than configured")
shards = relayShards.shardIds
elif conf.shards.len > 0:
shards = toSeq(conf.shards.mapIt(uint16(it)))
else:
info "no pubsub topics specified"
enrBuilder.withWakuRelaySharding(
RelayShards(clusterId: conf.clusterId, shardIds: shards)
RelayShards(clusterId: conf.clusterId, shardIds: conf.shards)
).isOkOr:
return err("could not initialize ENR with shards")

View File

@ -10,7 +10,7 @@ type ClusterConf* = object
rlnRelayBandwidthThreshold*: int
rlnEpochSizeSec*: uint64
rlnRelayUserMessageLimit*: uint64
pubsubTopics*: seq[string]
numShardsInNetwork*: uint32
discv5Discovery*: bool
discv5BootstrapNodes*: seq[string]
@ -28,11 +28,7 @@ proc TheWakuNetworkConf*(T: type ClusterConf): ClusterConf =
rlnRelayBandwidthThreshold: 0,
rlnEpochSizeSec: 600,
rlnRelayUserMessageLimit: 100,
pubsubTopics:
@[
"/waku/2/rs/1/0", "/waku/2/rs/1/1", "/waku/2/rs/1/2", "/waku/2/rs/1/3",
"/waku/2/rs/1/4", "/waku/2/rs/1/5", "/waku/2/rs/1/6", "/waku/2/rs/1/7",
],
numShardsInNetwork: 8,
discv5Discovery: true,
discv5BootstrapNodes:
@[

View File

@ -113,6 +113,13 @@ proc initNode(
## Mount protocols
proc getNumShardsInNetwork*(conf: WakuNodeConf): uint32 =
if conf.numShardsInNetwork != 0:
return conf.numShardsInNetwork
# If conf.numShardsInNetwork is not set, use 1024 - the maximum possible as per the static sharding spec
# https://github.com/waku-org/specs/blob/master/standards/core/relay-sharding.md#static-sharding
return uint32(MaxShardIndex + 1)
proc setupProtocols(
node: WakuNode, conf: WakuNodeConf, nodeKey: crypto.PrivateKey
): Future[Result[void, string]] {.async.} =
@ -127,7 +134,14 @@ proc setupProtocols(
node.mountMetadata(conf.clusterId).isOkOr:
return err("failed to mount waku metadata protocol: " & error)
node.mountSharding(conf.clusterId, uint32(conf.pubsubTopics.len)).isOkOr:
# If conf.numShardsInNetwork is not set, use the number of shards configured as numShardsInNetwork
let numShardsInNetwork = getNumShardsInNetwork(conf)
if conf.numShardsInNetwork == 0:
warn "Number of shards in network not configured, setting it to",
numShardsInNetwork = $numShardsInNetwork
node.mountSharding(conf.clusterId, numShardsInNetwork).isOkOr:
return err("failed to mount waku sharding: " & error)
# Mount relay on all nodes
@ -151,14 +165,20 @@ proc setupProtocols(
peerExchangeHandler = some(handlePeerExchange)
let shards =
conf.contentTopics.mapIt(node.wakuSharding.getShard(it).expect("Valid Shard"))
var autoShards: seq[RelayShard]
for contentTopic in conf.contentTopics:
let shard = node.wakuSharding.getShard(contentTopic).valueOr:
return err("Could not parse content topic: " & error)
autoShards.add(shard)
debug "Shards created from content topics",
contentTopics = conf.contentTopics, shards = shards
contentTopics = conf.contentTopics, shards = autoShards
let confShards =
conf.shards.mapIt(RelayShard(clusterId: conf.clusterId, shardId: uint16(it)))
let shards = confShards & autoShards
if conf.relay:
let pubsubTopics = conf.pubsubTopics & shards
let parsedMaxMsgSize = parseMsgSize(conf.maxMessageSize).valueOr:
return err("failed to parse 'max-num-bytes-msg-size' param: " & $error)
@ -166,10 +186,7 @@ proc setupProtocols(
try:
await mountRelay(
node,
pubsubTopics,
peerExchangeHandler = peerExchangeHandler,
int(parsedMaxMsgSize),
node, shards, peerExchangeHandler = peerExchangeHandler, int(parsedMaxMsgSize)
)
except CatchableError:
return err("failed to mount waku relay protocol: " & getCurrentExceptionMsg())

View File

@ -68,7 +68,7 @@ proc logConfig(conf: WakuNodeConf) =
info "Configuration. Network", cluster = conf.clusterId, maxPeers = conf.maxRelayPeers
for shard in conf.pubsubTopics:
for shard in conf.shards:
info "Configuration. Shards", shard = shard
for i in conf.discv5BootstrapNodes:
@ -86,6 +86,19 @@ proc logConfig(conf: WakuNodeConf) =
func version*(waku: Waku): string =
waku.version
proc validateShards(conf: WakuNodeConf): Result[void, string] =
let numShardsInNetwork = getNumShardsInNetwork(conf)
for shard in conf.shards:
if shard >= numShardsInNetwork:
let msg =
"validateShards invalid shard: " & $shard & " when numShardsInNetwork: " &
$numShardsInNetwork # fmt doesn't work
error "validateShards failed", error = msg
return err(msg)
return ok()
## Initialisation
proc init*(T: type Waku, conf: WakuNodeConf): Result[Waku, string] =
@ -94,16 +107,35 @@ proc init*(T: type Waku, conf: WakuNodeConf): Result[Waku, string] =
logging.setupLog(conf.logLevel, conf.logFormat)
# TODO: remove after pubsubtopic config gets removed
var shards = newSeq[uint16]()
if conf.pubsubTopics.len > 0:
let shardsRes = topicsToRelayShards(conf.pubsubTopics)
if shardsRes.isErr():
error "failed to parse pubsub topic, please format according to static shard specification",
error = shardsRes.error
return err("failed to parse pubsub topic: " & $shardsRes.error)
let shardsOpt = shardsRes.get()
if shardsOpt.isSome():
let relayShards = shardsOpt.get()
if relayShards.clusterId != conf.clusterId:
error "clusterId of the pubsub topic should match the node's cluster. e.g. --pubsub-topic=/waku/2/rs/22/1 and --cluster-id=22",
nodeCluster = conf.clusterId, pubsubCluster = relayShards.clusterId
return err(
"clusterId of the pubsub topic should match the node's cluster. e.g. --pubsub-topic=/waku/2/rs/22/1 and --cluster-id=22"
)
for shard in relayShards.shardIds:
shards.add(shard)
confCopy.shards = shards
case confCopy.clusterId
# cluster-id=1 (aka The Waku Network)
of 1:
let twnClusterConf = ClusterConf.TheWakuNetworkConf()
if len(confCopy.shards) != 0:
confCopy.pubsubTopics =
confCopy.shards.mapIt(twnClusterConf.pubsubTopics[it.uint16])
else:
confCopy.pubsubTopics = twnClusterConf.pubsubTopics
# Override configuration
confCopy.maxMessageSize = twnClusterConf.maxMessageSize
@ -117,6 +149,7 @@ proc init*(T: type Waku, conf: WakuNodeConf): Result[Waku, string] =
confCopy.discv5BootstrapNodes & twnClusterConf.discv5BootstrapNodes
confCopy.rlnEpochSizeSec = twnClusterConf.rlnEpochSizeSec
confCopy.rlnRelayUserMessageLimit = twnClusterConf.rlnRelayUserMessageLimit
confCopy.numShardsInNetwork = twnClusterConf.numShardsInNetwork
# Only set rlnRelay to true if relay is configured
if confCopy.relay:
@ -127,6 +160,11 @@ proc init*(T: type Waku, conf: WakuNodeConf): Result[Waku, string] =
info "Running nwaku node", version = git_version
logConfig(confCopy)
let validateShardsRes = validateShards(confCopy)
if validateShardsRes.isErr():
error "Failed validating shards", error = $validateShardsRes.error
return err("Failed validating shards: " & $validateShardsRes.error)
if not confCopy.nodekey.isSome():
let keyRes = crypto.PrivateKey.random(Secp256k1, rng[])
if keyRes.isErr():

View File

@ -321,7 +321,7 @@ proc subscribe*(
error "Autosharding error", error = error
return
(shard, some(subscription.topic))
($shard, some(subscription.topic))
of PubsubSub:
(subscription.topic, none(ContentTopic))
else:
@ -356,7 +356,7 @@ proc unsubscribe*(node: WakuNode, subscription: SubscriptionEvent) =
error "Autosharding error", error = error
return
(shard, some(subscription.topic))
($shard, some(subscription.topic))
of PubsubUnsub:
(subscription.topic, none(ContentTopic))
else:
@ -437,7 +437,7 @@ proc startRelay*(node: WakuNode) {.async.} =
proc mountRelay*(
node: WakuNode,
pubsubTopics: seq[string] = @[],
shards: seq[RelayShard] = @[],
peerExchangeHandler = none(RoutingRecordsHandler),
maxMessageSize = int(DefaultMaxWakuMessageSize),
) {.async, gcsafe.} =
@ -466,11 +466,11 @@ proc mountRelay*(
node.switch.mount(node.wakuRelay, protocolMatcher(WakuRelayCodec))
info "relay mounted successfully", pubsubTopics = pubsubTopics
info "relay mounted successfully", shards = shards
# Subscribe to topics
for pubsubTopic in pubsubTopics:
node.subscribe((kind: PubsubSub, topic: pubsubTopic))
# Subscribe to shards
for shard in shards:
node.subscribe((kind: PubsubSub, topic: $shard))
## Waku filter

View File

@ -132,7 +132,8 @@ proc startRestServerProtocolSupport*(
let handler = messageCacheHandler(cache)
for pubsubTopic in conf.pubsubTopics:
for shard in conf.shards:
let pubsubTopic = $RelayShard(clusterId: conf.clusterId, shardId: shard)
cache.pubsubSubscribe(pubsubTopic)
node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(handler))

View File

@ -282,7 +282,7 @@ proc installRelayApiHandlers*(
debug "Publishing message",
contentTopic = message.contentTopic, rln = not node.wakuRlnRelay.isNil()
var publishFut = node.publish(some(pubsubTopic), message)
var publishFut = node.publish(some($pubsubTopic), message)
if not await publishFut.withTimeout(futTimeout):
return RestApiResponse.internalServerError("Failed to publish: timedout")

View File

@ -13,16 +13,16 @@ export parsing
type PubsubTopic* = string
const DefaultPubsubTopic* = PubsubTopic("/waku/2/rs/0/0")
## Namespaced pub-sub topic
## Relay Shard
type RelayShard* = object
clusterId*: uint16
shardId*: uint16
proc staticSharding*(T: type RelayShard, clusterId, shardId: uint16): T =
return RelayShard(clusterId: clusterId, shardId: shardId)
const DefaultShardId* = uint16(0)
const DefaultClusterId* = uint16(0)
const DefaultRelayShard* =
RelayShard(clusterId: DefaultClusterId, shardId: DefaultShardId)
# Serialization
@ -31,6 +31,8 @@ proc `$`*(topic: RelayShard): string =
## in the format `/waku/2/rs/<cluster-id>/<shard-id>
return "/waku/2/rs/" & $topic.clusterId & "/" & $topic.shardId
const DefaultPubsubTopic* = $DefaultRelayShard
# Deserialization
const
@ -67,7 +69,7 @@ proc parseStaticSharding*(
ParsingError.invalidFormat($err)
)
ok(RelayShard.staticSharding(clusterId, shardId))
ok(RelayShard(clusterId: clusterId, shardId: shardId))
proc parse*(T: type RelayShard, topic: PubsubTopic): ParsingResult[RelayShard] =
## Splits a namespaced topic string into its constituent parts.

View File

@ -27,7 +27,7 @@ proc getGenZeroShard*(s: Sharding, topic: NsContentTopic, count: int): RelayShar
# This is equilavent to modulo shard count but faster
let shard = hashValue and uint64((count - 1))
RelayShard.staticSharding(s.clusterId, uint16(shard))
RelayShard(clusterId: s.clusterId, shardId: uint16(shard))
proc getShard*(s: Sharding, topic: NsContentTopic): Result[RelayShard, string] =
## Compute the (pubsub topic) shard to use for this content topic.
@ -42,13 +42,13 @@ proc getShard*(s: Sharding, topic: NsContentTopic): Result[RelayShard, string] =
else:
return err("Generation > 0 are not supported yet")
proc getShard*(s: Sharding, topic: ContentTopic): Result[PubsubTopic, string] =
proc getShard*(s: Sharding, topic: ContentTopic): Result[RelayShard, string] =
let parsedTopic = NsContentTopic.parse(topic).valueOr:
return err($error)
let shard = ?s.getShard(parsedTopic)
ok($shard)
ok(shard)
proc parseSharding*(
s: Sharding,
@ -130,7 +130,7 @@ proc parseSharding*(
var list = newSeq[(RelayShard, float64)](shardCount)
for (shard, weight) in shardsNWeights:
let pubsub = RelayShard.staticSharding(ClusterId, uint16(shard))
let pubsub = RelayShard(clusterId: ClusterId, shardId: uint16(shard))
let clusterBytes = toBytesBE(uint16(ClusterId))
let shardBytes = toBytesBE(uint16(shard))

View File

@ -13,7 +13,7 @@ import ../common/enr, ../waku_core
logScope:
topics = "waku enr sharding"
const MaxShardIndex: uint16 = 1023
const MaxShardIndex*: uint16 = 1023
const
ShardingIndicesListEnrField* = "rs"
@ -25,7 +25,7 @@ type RelayShards* = object
shardIds*: seq[uint16]
func topics*(rs: RelayShards): seq[RelayShard] =
rs.shardIds.mapIt(RelayShard.staticSharding(rs.clusterId, it))
rs.shardIds.mapIt(RelayShard(clusterId: rs.clusterId, shardId: it))
func init*(T: type RelayShards, clusterId, shardId: uint16): Result[T, string] =
if shardId > MaxShardIndex: