chore: deprecating named sharding (#2723)

This commit is contained in:
gabrielmer 2024-07-09 18:36:12 +03:00 committed by GitHub
parent d0980eba4c
commit e1518cf9ff
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
31 changed files with 108 additions and 206 deletions

View File

@ -85,7 +85,7 @@ type
topics* {.
desc: "Default topics to subscribe to (space separated list).",
defaultValue: "/waku/2/default-waku/proto",
defaultValue: "/waku/2/rs/0/0",
name: "topics"
.}: string

View File

@ -69,7 +69,7 @@ type Chat2MatterbridgeConf* = object
topics* {.
desc: "Default topics to subscribe to (space separated list)",
defaultValue: "/waku/2/default-waku/proto",
defaultValue: "/waku/2/rs/0/0",
name: "topics"
.}: string

View File

@ -56,7 +56,7 @@ docker compose logs -f receivernode
| ---: | :--- | :--- |
| NUM_MESSAGES | Number of message to publish | 120 |
| DELAY_MESSAGES | Frequency of messages in milliseconds | 1000 |
| PUBSUB | Used pubsub_topic for testing | /waku/2/default-waku/proto |
| PUBSUB | Used pubsub_topic for testing | /waku/2/rs/0/0 |
| CONTENT_TOPIC | content_topic for testing | /tester/1/light-pubsub-example/proto |
### Lite Protocol Tester application cli options
@ -67,7 +67,7 @@ docker compose logs -f receivernode
| --service-node| Address of the service node to use for lightpush and/or filter service | - |
| --num-messages | Number of message to publish | 120 |
| --delay-messages | Frequency of messages in milliseconds | 1000 |
| --pubsub-topic | Used pubsub_topic for testing | /waku/2/default-waku/proto |
| --pubsub-topic | Used pubsub_topic for testing | /waku/2/rs/0/0 |
| --content_topic | content_topic for testing | /tester/1/light-pubsub-example/proto |
| --cluster-id | Cluster id for the test | 0 |
| --config-file | TOML configuration file to fine tune the light waku node<br>Note that some configurations (full node services) are not taken into account | - |

View File

@ -25,5 +25,5 @@ exec /usr/bin/wakunode\
--metrics-server-address=0.0.0.0\
--nodekey=e3f5e64568b3a612dee609f6e7c0203c501dab6131662922bdcbcabd474281d5\
--nat=extip:${IP}\
--pubsub-topic=/waku/2/default-waku/proto\
--pubsub-topic=/waku/2/rs/0/0\
--cluster-id=0

View File

@ -63,7 +63,7 @@ echo "Using service node: ${SERIVCE_NODE_ADDR}"
exec /usr/bin/liteprotocoltester\
--log-level=DEBUG\
--service-node="${SERIVCE_NODE_ADDR}"\
--pubsub-topic=/waku/2/default-waku/proto\
--pubsub-topic=/waku/2/rs/0/0\
--cluster-id=0\
--num-messages=${NUM_MESSAGES}\
--delay-messages=${DELAY_MESSAGES}\

View File

@ -26,7 +26,7 @@ import
export confTomlDefs, confTomlNet, confEnvvarDefs, confEnvvarNet
const
LitePubsubTopic* = PubsubTopic("/waku/2/default-waku/proto")
LitePubsubTopic* = PubsubTopic("/waku/2/rs/0/0")
LiteContentTopic* = ContentTopic("/tester/1/light-pubsub-example/proto")
type TesterFunctionality* = enum

View File

@ -18,7 +18,7 @@ By default a nwaku node will:
See [this tutorial](./configure-key.md) if you want to generate and configure a persistent private key.
- listen for incoming libp2p connections on the default TCP port (`60000`)
- enable `relay` protocol
- subscribe to the default pubsub topic, namely `/waku/2/default-waku/proto`
- subscribe to the default pubsub topic, namely `/waku/2/rs/0/0`
- enable `store` protocol, but only as a client.
This implies that the nwaku node will not persist any historical messages itself,
but can query `store` service peers who do so.
@ -107,7 +107,7 @@ enr=enr:-IO4QDxToTg86pPCK2KvMeVCXC2ADVZWrxXSvNZeaoa0JhShbM5qed69RQz1s1mWEEqJ3aok
## Typical configuration (relay node)
The typical configuration for a nwaku node is to run the `relay` protocol,
subscribed to the default pubsub topic `/waku/2/default-waku/proto`,
subscribed to the default pubsub topic `/waku/2/rs/0/0`,
and connecting to one or more existing peers.
We assume below that running nodes also participate in Discovery v5
to continually discover and connect to random peers for a more robust mesh.

View File

@ -19,7 +19,7 @@ At a high level, when a chat2 client is run with Waku-RLN-Relay mounted in on-ch
Under the hood, the chat2 client constantly listens to the membership contract and keeps itself updated with the latest state of the group.
In the following test setting, the chat2 clients are to be connected to the Waku test fleets as their first hop.
The test fleets will act as routers and are also set to run Waku-RLN-Relay over the same pubsub topic and content topic as chat2 clients i.e., the default pubsub topic of `/waku/2/default-waku/proto` and the content topic of `/toy-chat/3/mingde/proto`.
The test fleets will act as routers and are also set to run Waku-RLN-Relay over the same pubsub topic and content topic as chat2 clients i.e., the default pubsub topic of `/waku/2/rs/0/0` and the content topic of `/toy-chat/3/mingde/proto`.
Spam messages published on the said combination of topics will be caught by the test fleet nodes and will not be routed.
Note that spam protection does not rely on the presence of the test fleets.
In fact, all the chat2 clients are also capable of catching and dropping spam messages if they receive any.

View File

@ -14,7 +14,7 @@ As for the setup, please follow the tutorials below:
- [JS-chat](https://examples.waku.org/rln-js/)
Once you set up your chat client, it will be connected to the Waku v2 test fleets as its first hop.
Messages generated by the chat client are set to be published on a specific combination of pubsub and content topic i.e., the default pubsub topic of `/waku/2/default-waku/proto` and the content topic of `/toy-chat/3/mingde/proto`.
Messages generated by the chat client are set to be published on a specific combination of pubsub and content topic i.e., the default pubsub topic of `/waku/2/rs/0/0` and the content topic of `/toy-chat/3/mingde/proto`.
The test fleets also run Waku-RLN-Relay over the same pubsub topic and content topic.
Test fleets act as routers and enforce the message rate limit.
As such, any spam messages published by a chat client on the said combination of topics will be caught by the Waku v2 test fleet nodes and will not be routed.

View File

@ -317,7 +317,7 @@ int main(int argc, char** argv) {
userData) );
WAKU_CALL( waku_relay_subscribe(ctx,
"/waku/2/default-waku/proto",
"/waku/2/rs/0/0",
event_handler,
userData) );

View File

@ -8,7 +8,7 @@ const
FilterPeer =
"/ip4/34.16.1.67/tcp/30303/p2p/16Uiu2HAmDCp8XJ9z1ev18zuv8NHekAsjNyezAvmMfFEJkiharitG"
# node-01.gc-us-central1-a.waku.test.statusim.net on waku.test
FilterPubsubTopic = PubsubTopic("/waku/2/default-waku/proto")
FilterPubsubTopic = PubsubTopic("/waku/2/rs/0/0")
FilterContentTopic = ContentTopic("/examples/1/light-pubsub-example/proto")
proc unsubscribe(

View File

@ -8,7 +8,7 @@ const
LightpushPeer =
"/ip4/178.128.141.171/tcp/30303/p2p/16Uiu2HAkykgaECHswi3YKJ5dMLbq2kPVCo89fcyTd38UcQD6ej5W"
# node-01.do-ams3.waku.test.statusim.net on waku.test
LightpushPubsubTopic = PubsubTopic("/waku/2/default-waku/proto")
LightpushPubsubTopic = PubsubTopic("/waku/2/rs/0/0")
LightpushContentTopic = ContentTopic("/examples/1/light-pubsub-example/proto")
proc publishMessages(

View File

@ -107,7 +107,7 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} =
# Make sure it matches the publisher. Use default value
# see spec: https://rfc.vac.dev/spec/23/
let pubSubTopic = PubsubTopic("/waku/2/default-waku/proto")
let pubSubTopic = PubsubTopic("/waku/2/rs/0/0")
# any content topic can be chosen
let contentTopic = ContentTopic("/examples/1/pubsub-example/proto")

View File

@ -105,7 +105,7 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} =
# Make sure it matches the publisher. Use default value
# see spec: https://rfc.vac.dev/spec/23/
let pubSubTopic = PubsubTopic("/waku/2/default-waku/proto")
let pubSubTopic = PubsubTopic("/waku/2/rs/0/0")
# any content topic can be chosen. make sure it matches the publisher
let contentTopic = ContentTopic("/examples/1/pubsub-example/proto")

View File

@ -59,7 +59,7 @@ suite "Peer Manager":
# When making an operation that triggers onPeerMetadata
discard await client.filterSubscribe(
some("/waku/2/default-waku/proto"), "waku/lightpush/1", serverRemotePeerInfo
some("/waku/2/rs/0/0"), "waku/lightpush/1", serverRemotePeerInfo
)
await sleepAsync(FUTURE_TIMEOUT)

View File

@ -28,7 +28,7 @@ suite "WakuNode":
node1 = newTestWakuNode(nodeKey1, parseIpAddress("0.0.0.0"), Port(61000))
nodeKey2 = generateSecp256k1Key()
node2 = newTestWakuNode(nodeKey2, parseIpAddress("0.0.0.0"), Port(61002))
pubSubTopic = "/waku/2/default-waku/proto"
pubSubTopic = "/waku/2/rs/0/0"
contentTopic = ContentTopic("/waku/2/default-content/proto")
payload = "hello world".toBytes()
message = WakuMessage(payload: payload, contentTopic: contentTopic)

View File

@ -38,7 +38,7 @@ proc defaultTestWakuNodeConf*(): WakuNodeConf =
maxConnections: 50,
maxMessageSize: "1024 KiB",
clusterId: 0,
pubsubTopics: @["/waku/2/rs/1/0"],
pubsubTopics: @["/waku/2/rs/0/0"],
relay: true,
storeMessageDbUrl: "sqlite://store.sqlite3",
)

View File

@ -7,16 +7,16 @@ suite "Waku Message - Deterministic hashing":
test "digest computation - empty meta field":
## Test vector:
##
## pubsub_topic = 0x2f77616b752f322f64656661756c742d77616b752f70726f746f
## pubsub_topic = 2f77616b752f322f72732f302f30
## waku_message.payload = 0x010203045445535405060708
## waku_message.content_topic = 0x2f77616b752f322f64656661756c742d636f6e74656e742f70726f746f
## waku_message.meta = <empty>
## waku_message.ts = 0x175789bfa23f8400
##
## message_hash = 0xa2554498b31f5bcdfcbf7fa58ad1c2d45f0254f3f8110a85588ec3cf10720fd8
## message_hash = 0xcccab07fed94181c83937c8ca8340c9108492b7ede354a6d95421ad34141fd37
## Given
let pubsubTopic = DefaultPubsubTopic # /waku/2/default-waku/proto
let pubsubTopic = DefaultPubsubTopic # /waku/2/rs/0/0
let message = fakeWakuMessage(
contentTopic = DefaultContentTopic, # /waku/2/default-content/proto
payload = "\x01\x02\x03\x04TEST\x05\x06\x07\x08".toBytes(),
@ -29,29 +29,28 @@ suite "Waku Message - Deterministic hashing":
## Then
check:
byteutils.toHex(pubsubTopic.toBytes()) ==
"2f77616b752f322f64656661756c742d77616b752f70726f746f"
byteutils.toHex(pubsubTopic.toBytes()) == "2f77616b752f322f72732f302f30"
byteutils.toHex(message.contentTopic.toBytes()) ==
"2f77616b752f322f64656661756c742d636f6e74656e742f70726f746f"
byteutils.toHex(message.payload) == "010203045445535405060708"
byteutils.toHex(message.meta) == ""
byteutils.toHex(toBytesBE(uint64(message.timestamp))) == "175789bfa23f8400"
messageHash.toHex() ==
"a2554498b31f5bcdfcbf7fa58ad1c2d45f0254f3f8110a85588ec3cf10720fd8"
"cccab07fed94181c83937c8ca8340c9108492b7ede354a6d95421ad34141fd37"
test "digest computation - meta field (12 bytes)":
## Test vector:
##
## pubsub_topic = 0x2f77616b752f322f64656661756c742d77616b752f70726f746f
## pubsub_topic = 0x2f77616b752f322f72732f302f30
## waku_message.payload = 0x010203045445535405060708
## waku_message.content_topic = 0x2f77616b752f322f64656661756c742d636f6e74656e742f70726f746f
## waku_message.meta = 0x73757065722d736563726574
## waku_message.ts = 0x175789bfa23f8400
##
## message_hash = 0x64cce733fed134e83da02b02c6f689814872b1a0ac97ea56b76095c3c72bfe05
## message_hash = 0xb9b4852f9d8c489846e8bfc6c5ca6a1a8d460a40d28832a966e029eb39619199
## Given
let pubsubTopic = DefaultPubsubTopic # /waku/2/default-waku/proto
let pubsubTopic = DefaultPubsubTopic # /waku/2/rs/0/0
let message = fakeWakuMessage(
contentTopic = DefaultContentTopic, # /waku/2/default-content/proto
payload = "\x01\x02\x03\x04TEST\x05\x06\x07\x08".toBytes(),
@ -64,29 +63,28 @@ suite "Waku Message - Deterministic hashing":
## Then
check:
byteutils.toHex(pubsubTopic.toBytes()) ==
"2f77616b752f322f64656661756c742d77616b752f70726f746f"
byteutils.toHex(pubsubTopic.toBytes()) == "2f77616b752f322f72732f302f30"
byteutils.toHex(message.contentTopic.toBytes()) ==
"2f77616b752f322f64656661756c742d636f6e74656e742f70726f746f"
byteutils.toHex(message.payload) == "010203045445535405060708"
byteutils.toHex(message.meta) == "73757065722d736563726574"
byteutils.toHex(toBytesBE(uint64(message.timestamp))) == "175789bfa23f8400"
messageHash.toHex() ==
"64cce733fed134e83da02b02c6f689814872b1a0ac97ea56b76095c3c72bfe05"
"b9b4852f9d8c489846e8bfc6c5ca6a1a8d460a40d28832a966e029eb39619199"
test "digest computation - meta field (64 bytes)":
## Test vector:
##
## pubsub_topic = 0x2f77616b752f322f64656661756c742d77616b752f70726f746f
## pubsub_topic = 0x2f77616b752f322f72732f302f30
## waku_message.payload = 0x010203045445535405060708
## waku_message.content_topic = 0x2f77616b752f322f64656661756c742d636f6e74656e742f70726f746f
## waku_message.meta = 0x000102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f202122232425262728292a2b2c2d2e2f303132333435363738393a3b3c3d3e3f
## waku_message.ts = 0x175789bfa23f8400
##
## message_hash = 0x7158b6498753313368b9af8f6e0a0a05104f68f972981da42a43bc53fb0c1b27"
## message_hash = 0x653460d04f66c5b11814d235152f4f246e6f03ef80a305a825913636fbafd0ba
## Given
let pubsubTopic = DefaultPubsubTopic # /waku/2/default-waku/proto
let pubsubTopic = DefaultPubsubTopic # /waku/2/rs/0/0
let message = fakeWakuMessage(
contentTopic = DefaultContentTopic, # /waku/2/default-content/proto
payload = "\x01\x02\x03\x04TEST\x05\x06\x07\x08".toBytes(),
@ -99,8 +97,7 @@ suite "Waku Message - Deterministic hashing":
## Then
check:
byteutils.toHex(pubsubTopic.toBytes()) ==
"2f77616b752f322f64656661756c742d77616b752f70726f746f"
byteutils.toHex(pubsubTopic.toBytes()) == "2f77616b752f322f72732f302f30"
byteutils.toHex(message.contentTopic.toBytes()) ==
"2f77616b752f322f64656661756c742d636f6e74656e742f70726f746f"
byteutils.toHex(message.payload) == "010203045445535405060708"
@ -108,21 +105,21 @@ suite "Waku Message - Deterministic hashing":
"000102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f202122232425262728292a2b2c2d2e2f303132333435363738393a3b3c3d3e3f"
byteutils.toHex(toBytesBE(uint64(message.timestamp))) == "175789bfa23f8400"
messageHash.toHex() ==
"7158b6498753313368b9af8f6e0a0a05104f68f972981da42a43bc53fb0c1b27"
"653460d04f66c5b11814d235152f4f246e6f03ef80a305a825913636fbafd0ba"
test "digest computation - zero length payload":
## Test vector:
##
## pubsub_topic = 0x2f77616b752f322f64656661756c742d77616b752f70726f746f
## pubsub_topic = 0x2f77616b752f322f72732f302f30
## waku_message.payload = []
## waku_message.content_topic = 0x2f77616b752f322f64656661756c742d636f6e74656e742f70726f746f
## waku_message.meta = 0x73757065722d736563726574
## waku_message.ts = 0x175789bfa23f8400
##
## message_hash = 0x483ea950cb63f9b9d6926b262bb36194d3f40a0463ce8446228350bd44e96de4
## message_hash = 0x0f6448cc23b2db6c696aa6ab4b693eff4cf3549ff346fe1dbeb281697396a09f
## Given
let pubsubTopic = DefaultPubsubTopic # /waku/2/default-waku/proto
let pubsubTopic = DefaultPubsubTopic # /waku/2/rs/0/0
let message = fakeWakuMessage(
contentTopic = DefaultContentTopic, # /waku/2/default-content/proto
payload = newSeq[byte](),
@ -136,7 +133,7 @@ suite "Waku Message - Deterministic hashing":
## Then
check:
messageHash.toHex() ==
"483ea950cb63f9b9d6926b262bb36194d3f40a0463ce8446228350bd44e96de4"
"0f6448cc23b2db6c696aa6ab4b693eff4cf3549ff346fe1dbeb281697396a09f"
test "waku message - check meta size is enforced":
# create message with meta size > 64 bytes (invalid)

View File

@ -134,17 +134,6 @@ suite "Waku Message - Content topics namespacing":
err.cause == "generation should be a numeric value"
suite "Waku Message - Pub-sub topics namespacing":
test "Stringify named sharding pub-sub topic":
## Given
var ns = NsPubsubTopic.named("waku-dev")
## When
let topic = $ns
## Then
check:
topic == "/waku/2/waku-dev"
test "Stringify static sharding pub-sub topic":
## Given
var ns = NsPubsubTopic.staticSharding(clusterId = 0, shardId = 2)
@ -156,7 +145,7 @@ suite "Waku Message - Pub-sub topics namespacing":
check:
topic == "/waku/2/rs/0/2"
test "Parse named pub-sub topic string - Valid string":
test "Parse invalid pub-sub topic string":
## Given
let topic = "/waku/2/waku-dev"
@ -164,11 +153,10 @@ suite "Waku Message - Pub-sub topics namespacing":
let nsRes = NsPubsubTopic.parse(topic)
## Then
check nsRes.isOk()
let ns = nsRes.get()
check nsRes.isErr()
let err = nsRes.tryError()
check:
ns.name == "waku-dev"
err.kind == ParsingErrorKind.InvalidFormat
test "Parse static sharding pub-sub topic string - Valid string":
## Given

View File

@ -18,27 +18,3 @@ suite "Static Sharding Functionality":
topic.clusterId == 0
topic.shardId == 1
topic == "/waku/2/rs/0/1"
suite "Automatic Sharding Mechanics":
test "Shard Selection Algorithm":
let
topic1 = NsPubsubTopic.parseNamedSharding("/waku/2/xxx").get()
topic2 = NsPubsubTopic.parseNamedSharding("/waku/2/123").get()
topic3 = NsPubsubTopic.parseNamedSharding("/waku/2/xxx123").get()
check:
# topic1.shardId == 1
# topic1.clusterId == 0
topic1 == NsPubsubTopic.staticSharding(0, 1)
# topic2.shardId == 1
# topic2.clusterId == 0
topic2 == NsPubsubTopic.staticSharding(0, 1)
# topic3.shardId == 1
# topic3.clusterId == 0
topic3 == NsPubsubTopic.staticSharding(0, 1)
test "Shard Selection Algorithm without topicName":
let topicResult = NsPubsubTopic.parseNamedSharding("/waku/2/")
check:
topicResult.isErr()

View File

@ -1,7 +1,7 @@
{.used.}
import
std/[options, sequtils, strutils],
std/[options, sequtils, strutils, strformat],
stew/shims/net as stewNet,
testutils/unittests,
chronicles,
@ -430,8 +430,8 @@ suite "Waku Relay":
asyncTest "How multiple interconnected nodes work":
# Given two other pubsub topics
let
pubsubTopicB = "pubsub-topic-b"
pubsubTopicC = "pubsub-topic-c"
pubsubTopicB = "/waku/2/rs/0/1"
pubsubTopicC = "/waku/2/rs/0/2"
# Given two other nodes connected to the first one
let
@ -767,13 +767,17 @@ suite "Waku Relay":
asyncTest "Single Node with Multiple Pubsub Topics":
# Given other pubsub topic
let pubsubTopicB = "pubsub-topic-b"
let pubsubTopicB = "/waku/2/rs/0/1"
# Given a node subscribed to multiple pubsub topics
let
topicHandler = node.subscribe(pubsubTopic, simpleFutureHandler)
topicHandlerB = node.subscribe(pubsubTopicB, simpleFutureHandler)
check node.subscribedTopics == @[pubsubTopic, pubsubTopicB]
assert pubsubTopic in node.subscribedTopics,
fmt"Node is not subscribed to {pubsubTopic}"
assert pubsubTopicB in node.subscribedTopics,
fmt"Node is not subscribed to {pubsubTopicB}"
# When unsubscribing from one of the pubsub topics
node.unsubscribe(pubsubTopic, topicHandler)
@ -811,14 +815,17 @@ suite "Waku Relay":
asyncTest "Single Node with Multiple Pubsub Topics":
# Given other pubsub topic
let pubsubTopicB = "pubsub-topic-b"
let pubsubTopicB = "/waku/2/rs/0/1"
# Given a node subscribed to multiple pubsub topics
discard node.subscribe(pubsubTopic, simpleFutureHandler)
discard node.subscribe(pubsubTopic, simpleFutureHandler)
discard node.subscribe(pubsubTopicB, simpleFutureHandler)
check node.subscribedTopics == @[pubsubTopic, pubsubTopicB]
assert pubsubTopic in node.subscribedTopics,
fmt"Node is not subscribed to {pubsubTopic}"
assert pubsubTopicB in node.subscribedTopics,
fmt"Node is not subscribed to {pubsubTopicB}"
# When unsubscribing all handlers from pubsubTopic
node.unsubscribeAll(pubsubTopic)
@ -1042,11 +1049,11 @@ suite "Waku Relay":
) # 100KiB
msg4 = fakeWakuMessage(
contentTopic = contentTopic,
payload = getByteSequence(DefaultMaxWakuMessageSize - sizeEmptyMsg - 38),
payload = getByteSequence(DefaultMaxWakuMessageSize - sizeEmptyMsg - 26),
) # Max Size (Inclusive Limit)
msg5 = fakeWakuMessage(
contentTopic = contentTopic,
payload = getByteSequence(DefaultMaxWakuMessageSize - sizeEmptyMsg - 37),
payload = getByteSequence(DefaultMaxWakuMessageSize - sizeEmptyMsg - 25),
) # Max Size (Exclusive Limit)
msg6 = fakeWakuMessage(
contentTopic = contentTopic,

View File

@ -152,7 +152,7 @@ suite "Waku v2 Rest API - Relay":
restPort = restServer.httpServer.address.port # update with bound port for client use
let pubSubTopic = "/waku/2/default-waku/proto"
let pubSubTopic = "/waku/2/rs/0/0"
var messages =
@[

View File

@ -305,12 +305,23 @@ type WakuNodeConf* = object
pubsubTopics* {.
desc: "Default pubsub topic to subscribe to. Argument may be repeated.",
defaultValue: @[],
name: "pubsub-topic"
.}: seq[string]
shards* {.
desc: "Shards index to subscribe to [0..MAX_SHARDS-1]. Argument may be repeated.",
defaultValue: @[],
defaultValue:
@[
uint16(0),
uint16(1),
uint16(2),
uint16(3),
uint16(4),
uint16(5),
uint16(6),
uint16(7),
],
name: "shard"
.}: seq[uint16]

View File

@ -30,19 +30,24 @@ proc enrConfiguration*(
var shards = newSeq[uint16]()
# no shards configured
if conf.shards.len == 0:
let shardsOpt = topicsToRelayShards(conf.pubsubTopics).valueOr:
error "failed to parse pubsub topic, please format according to static shard specification",
error = $error
return err("failed to parse pubsub topic: " & $error)
if shardsOpt.isSome():
shards = shardsOpt.get().shardIds
else:
info "no pubsub topics specified or pubsubtopic is of type Named sharding "
# some shards configured
else:
let shardsOpt = topicsToRelayShards(conf.pubsubTopics).valueOr:
error "failed to parse pubsub topic, please format according to static shard specification",
error = $error
return err("failed to parse pubsub topic: " & $error)
if shardsOpt.isSome():
let relayShards = shardsOpt.get()
if relayShards.clusterid != conf.clusterId:
error "pubsub topic corresponds to different shard than configured",
nodeCluster = conf.clusterId, pubsubCluster = relayShards.clusterid
return err("pubsub topic corresponds to different shard than configured")
shards = relayShards.shardIds
elif conf.shards.len > 0:
shards = toSeq(conf.shards.mapIt(uint16(it)))
else:
info "no pubsub topics specified"
enrBuilder.withWakuRelaySharding(
RelayShards(clusterId: conf.clusterId, shardIds: shards)

View File

@ -14,17 +14,6 @@ type ClusterConf* = object
discv5Discovery*: bool
discv5BootstrapNodes*: seq[string]
# cluster-id=0
# Cluster configuration for the default pubsub topic. Note that it
# overrides existing cli configuration
proc ClusterZeroConf*(T: type ClusterConf): ClusterConf =
return ClusterConf(
clusterId: 0,
pubsubTopics:
@["/waku/2/default-waku/proto"] # TODO: Add more config such as bootstrap, etc
,
)
# cluster-id=1 (aka The Waku Network)
# Cluster configuration corresponding to The Waku Network. Note that it
# overrides existing cli configuration

View File

@ -93,12 +93,6 @@ proc init*(T: type Waku, conf: WakuNodeConf): Result[Waku, string] =
case confCopy.clusterId
# cluster-id=0
of 0:
let clusterZeroConf = ClusterConf.ClusterZeroConf()
confCopy.pubsubTopics = clusterZeroConf.pubsubTopics
# TODO: Write some template to "merge" the configs
# cluster-id=1 (aka The Waku Network)
of 1:
let twnClusterConf = ClusterConf.TheWakuNetworkConf()

View File

@ -356,11 +356,6 @@ proc onConnEvent(pm: PeerManager, peerId: PeerID, event: ConnEvent) {.async.} =
discard
proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} =
# To prevent metadata protocol from breaking prev nodes, by now we only
# disconnect if the clusterid is specified.
if pm.wakuMetadata.clusterId == 0:
return
let res = catch:
await pm.switch.dial(peerId, WakuMetadataCodec)

View File

@ -420,7 +420,7 @@ proc mountRelay*(
node.switch.mount(node.wakuRelay, protocolMatcher(WakuRelayCodec))
info "relay mounted successfully"
info "relay mounted successfully", pubsubTopics = pubsubTopics
# Subscribe to topics
for pubsubTopic in pubsubTopics:

View File

@ -13,40 +13,23 @@ export parsing
type PubsubTopic* = string
const DefaultPubsubTopic* = PubsubTopic("/waku/2/default-waku/proto")
const DefaultPubsubTopic* = PubsubTopic("/waku/2/rs/0/0")
## Namespaced pub-sub topic
type NsPubsubTopicKind* {.pure.} = enum
StaticSharding
NamedSharding
type NsPubsubTopic* = object
case kind*: NsPubsubTopicKind
of NsPubsubTopicKind.StaticSharding:
clusterId*: uint16
shardId*: uint16
of NsPubsubTopicKind.NamedSharding:
name*: string
clusterId*: uint16
shardId*: uint16
proc staticSharding*(T: type NsPubsubTopic, clusterId, shardId: uint16): T =
NsPubsubTopic(
kind: NsPubsubTopicKind.StaticSharding, clusterId: clusterId, shardId: shardId
)
proc named*(T: type NsPubsubTopic, name: string): T =
NsPubsubTopic(kind: NsPubsubTopicKind.NamedSharding, name: name)
return NsPubsubTopic(clusterId: clusterId, shardId: shardId)
# Serialization
proc `$`*(topic: NsPubsubTopic): string =
## Returns a string representation of a namespaced topic
## in the format `/waku/2/<raw-topic>
case topic.kind
of NsPubsubTopicKind.NamedSharding:
"/waku/2/" & topic.name
of NsPubsubTopicKind.StaticSharding:
"/waku/2/rs/" & $topic.clusterId & "/" & $topic.shardId
## in the format `/waku/2/rs/<cluster-id>/<shard-id>
return "/waku/2/rs/" & $topic.clusterId & "/" & $topic.shardId
# Deserialization
@ -55,7 +38,7 @@ const
StaticShardingPubsubTopicPrefix = Waku2PubsubTopicPrefix & "/rs"
proc parseStaticSharding*(
T: type NsPubsubTopic, topic: PubsubTopic | string
T: type NsPubsubTopic, topic: PubsubTopic
): ParsingResult[NsPubsubTopic] =
if not topic.startsWith(StaticShardingPubsubTopicPrefix):
return err(
@ -86,27 +69,10 @@ proc parseStaticSharding*(
ok(NsPubsubTopic.staticSharding(clusterId, shardId))
proc parseNamedSharding*(
T: type NsPubsubTopic, topic: PubsubTopic | string
): ParsingResult[NsPubsubTopic] =
if not topic.startsWith(Waku2PubsubTopicPrefix):
return err(ParsingError.invalidFormat("must start with " & Waku2PubsubTopicPrefix))
let raw = topic[8 ..< topic.len]
if raw.len == 0:
return err(ParsingError.missingPart("topic-name"))
ok(NsPubsubTopic.named(name = raw))
proc parse*(
T: type NsPubsubTopic, topic: PubsubTopic | string
): ParsingResult[NsPubsubTopic] =
proc parse*(T: type NsPubsubTopic, topic: PubsubTopic): ParsingResult[NsPubsubTopic] =
## Splits a namespaced topic string into its constituent parts.
## The topic string has to be in the format `/<application>/<version>/<topic-name>/<encoding>`
if topic.startsWith(StaticShardingPubsubTopicPrefix):
NsPubsubTopic.parseStaticSharding(topic)
else:
NsPubsubTopic.parseNamedSharding(topic)
NsPubsubTopic.parseStaticSharding(topic)
# Pubsub topic compatibility
@ -114,21 +80,10 @@ converter toPubsubTopic*(topic: NsPubsubTopic): PubsubTopic =
$topic
proc `==`*[T: NsPubsubTopic](x, y: T): bool =
case y.kind
of NsPubsubTopicKind.StaticSharding:
if x.kind != NsPubsubTopicKind.StaticSharding:
return false
if x.clusterId != y.clusterId:
return false
if x.clusterId != y.clusterId:
return false
if x.shardId != y.shardId:
return false
if x.shardId != y.shardId:
return false
of NsPubsubTopicKind.NamedSharding:
if x.kind != NsPubsubTopicKind.NamedSharding:
return false
if x.name != y.name:
return false
true
return true

View File

@ -67,12 +67,6 @@ func topicsToRelayShards*(topics: seq[string]): Result[Option[RelayShards], stri
if res.isErr():
return err("failed to parse topic: " & $res.error)
if parsedTopicsRes.allIt(it.get().kind == NsPubsubTopicKind.NamedSharding):
return ok(none(RelayShards))
if parsedTopicsRes.anyIt(it.get().kind == NsPubsubTopicKind.NamedSharding):
return err("use named (/waku/2/*) OR static (/waku/2/rs/*/*) shards not both.")
if parsedTopicsRes.anyIt(it.get().clusterId != parsedTopicsRes[0].get().clusterId):
return err("use shards with the same cluster Id.")
@ -84,15 +78,12 @@ func topicsToRelayShards*(topics: seq[string]): Result[Option[RelayShards], stri
return ok(some(relayShard))
func contains*(rs: RelayShards, clusterId, shardId: uint16): bool =
rs.clusterId == clusterId and rs.shardIds.contains(shardId)
return rs.clusterId == clusterId and rs.shardIds.contains(shardId)
func contains*(rs: RelayShards, topic: NsPubsubTopic): bool =
if topic.kind != NsPubsubTopicKind.StaticSharding:
return false
return rs.contains(topic.clusterId, topic.shardId)
rs.contains(topic.clusterId, topic.shardId)
func contains*(rs: RelayShards, topic: PubsubTopic | string): bool =
func contains*(rs: RelayShards, topic: PubsubTopic): bool =
let parseRes = NsPubsubTopic.parse(topic)
if parseRes.isErr():
return false
@ -245,12 +236,9 @@ proc containsShard*(r: Record, clusterId, shardId: uint16): bool =
rs.contains(clusterId, shardId)
proc containsShard*(r: Record, topic: NsPubsubTopic): bool =
if topic.kind != NsPubsubTopicKind.StaticSharding:
return false
return containsShard(r, topic.clusterId, topic.shardId)
containsShard(r, topic.clusterId, topic.shardId)
proc containsShard*(r: Record, topic: PubsubTopic | string): bool =
proc containsShard*(r: Record, topic: PubsubTopic): bool =
let parseRes = NsPubsubTopic.parse(topic)
if parseRes.isErr():
debug "invalid static sharding topic", topic = topic, error = parseRes.error

View File

@ -130,9 +130,6 @@ proc subscriptionsListener(wm: WakuMetadata) {.async.} =
let parsedTopic = NsPubsubTopic.parse(event.topic).valueOr:
continue
if parsedTopic.kind != NsPubsubTopicKind.StaticSharding:
continue
if parsedTopic.clusterId != wm.clusterId:
continue