From 50412d18801ca4004894b87f7018cb21f4edeb14 Mon Sep 17 00:00:00 2001 From: Simon-Pierre Vivier Date: Mon, 19 Jun 2023 18:16:25 -0400 Subject: [PATCH] sharded topic validation & ENR update (#1795) Add sharded topic validation and update the node ENR accordingly. --- apps/networkmonitor/networkmonitor.nim | 2 +- apps/wakunode2/app.nim | 4 ++- examples/v2/publisher.nim | 4 ++- examples/v2/subscriber.nim | 4 ++- tests/v2/test_waku_discv5.nim | 33 ++++++++++++++++++-- tests/v2/test_waku_peer_exchange.nim | 6 ++-- waku/v2/waku_discv5.nim | 42 +++++++++++++++++++++++++- 7 files changed, 86 insertions(+), 9 deletions(-) diff --git a/apps/networkmonitor/networkmonitor.nim b/apps/networkmonitor/networkmonitor.nim index 7e95f49a6..85fc95bab 100644 --- a/apps/networkmonitor/networkmonitor.nim +++ b/apps/networkmonitor/networkmonitor.nim @@ -274,7 +274,7 @@ proc initAndStartNode(conf: NetworkMonitorConf): Result[WakuNode, string] = node.wakuDiscv5 = WakuDiscoveryV5.new( some(extIp), some(nodeTcpPort), some(nodeUdpPort), bindIp, nodeUdpPort, discv5BootstrapEnrs, false, - keys.PrivateKey(nodeKey.skkey), flags, @[], node.rng) + keys.PrivateKey(nodeKey.skkey), flags, @[], node.rng, @[]) node.wakuDiscv5.protocol.open() return ok(node) diff --git a/apps/wakunode2/app.nim b/apps/wakunode2/app.nim index 137deac76..b5a010dd2 100644 --- a/apps/wakunode2/app.nim +++ b/apps/wakunode2/app.nim @@ -439,7 +439,8 @@ proc initNode(conf: WakuNodeConf, flags = netConfig.wakuFlags.get(), multiaddrs = netConfig.enrMultiaddrs, rng = rng, - discv5Config = discv5Config, + conf.topics, + discv5Config = discv5Config )) except CatchableError: return err("failed to create waku discv5 instance: " & getCurrentExceptionMsg()) @@ -512,6 +513,7 @@ proc setupProtocols(node: WakuNode, conf: WakuNodeConf, pubsubTopics = conf.topicsDeprecated.split(" ") else: pubsubTopics = conf.topics + try: await mountRelay(node, pubsubTopics, peerExchangeHandler = peerExchangeHandler) except CatchableError: diff --git a/examples/v2/publisher.nim b/examples/v2/publisher.nim index 99f9a8101..01ed4470d 100644 --- a/examples/v2/publisher.nim +++ b/examples/v2/publisher.nim @@ -61,7 +61,9 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} = bootstrapEnrs = @[bootstrapNodeEnr], privateKey = keys.PrivateKey(nodeKey.skkey), flags = flags, - rng = node.rng) + rng = node.rng, + topics = @[], + ) await node.start() await node.mountRelay() diff --git a/examples/v2/subscriber.nim b/examples/v2/subscriber.nim index 26c23e39a..542d0bc4d 100644 --- a/examples/v2/subscriber.nim +++ b/examples/v2/subscriber.nim @@ -56,7 +56,9 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} = bootstrapEnrs = @[bootstrapNodeEnr], privateKey = keys.PrivateKey(nodeKey.skkey), flags = flags, - rng = node.rng) + rng = node.rng, + topics = @[], + ) await node.start() await node.mountRelay() diff --git a/tests/v2/test_waku_discv5.nim b/tests/v2/test_waku_discv5.nim index 3a56c4576..7820fee0c 100644 --- a/tests/v2/test_waku_discv5.nim +++ b/tests/v2/test_waku_discv5.nim @@ -1,11 +1,10 @@ {.used.} import - std/[sequtils, sets], + std/[sequtils], stew/results, stew/shims/net, chronos, - chronicles, testutils/unittests, libp2p/crypto/crypto as libp2p_keys, eth/keys as eth_keys @@ -280,3 +279,33 @@ procSuite "Waku Discovery v5": # Cleanup await allFutures(node1.stop(), node2.stop(), node3.stop(), node4.stop()) + + asyncTest "get relayShards from topics": + ## Given + let mixedTopics = @["/waku/2/thisisatest", "/waku/2/rs/0/2", "/waku/2/rs/0/8"] + let shardedTopics = @["/waku/2/rs/0/2", "/waku/2/rs/0/4", "/waku/2/rs/0/8"] + let namedTopics = @["/waku/2/thisisatest", "/waku/2/atestthisis", "/waku/2/isthisatest"] + let gibberish = @["aedyttydcb/uioasduyio", "jhdfsjhlsdfjhk/sadjhk", "khfsd/hjfdsgjh/dfs"] + let empty: seq[string] = @[] + + let relayShards = RelayShards.init(0, @[uint16(2), uint16(4), uint16(8)]) + + ## When + + let mixedRes = topicsToRelayShards(mixedTopics) + let shardedRes = topicsToRelayShards(shardedTopics) + let namedRes = topicsToRelayShards(namedTopics) + let gibberishRes = topicsToRelayShards(gibberish) + let emptyRes = topicsToRelayShards(empty) + + ## Then + assert mixedRes.isErr(), $mixedRes.value + assert shardedRes.isOk(), shardedRes.error + assert shardedRes.value.isSome() + assert shardedRes.value.get() == relayShards, $shardedRes.value.get() + assert namedRes.isOk(), namedRes.error + assert namedRes.value.isNone(), $namedRes.value + assert gibberishRes.isErr(), $gibberishRes.value + assert emptyRes.isOk(), emptyRes.error + assert emptyRes.value.isNone(), $emptyRes.value + diff --git a/tests/v2/test_waku_peer_exchange.nim b/tests/v2/test_waku_peer_exchange.nim index 1e310fbf2..a03752bc9 100644 --- a/tests/v2/test_waku_peer_exchange.nim +++ b/tests/v2/test_waku_peer_exchange.nim @@ -110,7 +110,8 @@ procSuite "Waku Peer Exchange": keys.PrivateKey(nodeKey1.skkey), flags, newSeq[MultiAddress](), # Empty multiaddr fields, for now - node1.rng + node1.rng, + newSeq[string]() ) node2.wakuDiscv5 = WakuDiscoveryV5.new( @@ -122,7 +123,8 @@ procSuite "Waku Peer Exchange": keys.PrivateKey(nodeKey2.skkey), flags, newSeq[MultiAddress](), # Empty multiaddr fields, for now - node2.rng + node2.rng, + newSeq[string]() ) ## Given diff --git a/waku/v2/waku_discv5.nim b/waku/v2/waku_discv5.nim index 7616d7af4..41eb97913 100644 --- a/waku/v2/waku_discv5.nim +++ b/waku/v2/waku_discv5.nim @@ -48,6 +48,27 @@ type WakuDiscoveryV5* = ref object protocol*: protocol.Protocol listening*: bool +func topicsToRelayShards*(topics: seq[string]): Result[Option[RelayShards], string] = + if topics.len < 1: + return ok(none(RelayShards)) + + let parsedTopicsRes = topics.mapIt(NsPubsubTopic.parse(it)) + + for res in parsedTopicsRes: + if res.isErr(): + return err("failed to parse topic: " & $res.error) + + if parsedTopicsRes.allIt(it.get().kind == NsPubsubTopicKind.NamedSharding): + return ok(none(RelayShards)) + + if parsedTopicsRes.anyIt(it.get().kind == NsPubsubTopicKind.NamedSharding): + return err("use named topics OR sharded ones not both.") + + if parsedTopicsRes.anyIt(it.get().cluster != parsedTopicsRes[0].get().cluster): + return err("use sharded topics within the same cluster.") + + return ok(some(RelayShards.init(parsedTopicsRes[0].get().cluster, parsedTopicsRes.mapIt(it.get().shard)))) + proc new*(T: type WakuDiscoveryV5, rng: ref HmacDrbgContext, conf: WakuDiscoveryV5Config, record: Option[waku_enr.Record]): T = let protocol = newProtocol( rng = rng, @@ -77,9 +98,19 @@ proc new*(T: type WakuDiscoveryV5, flags: CapabilitiesBitfield, multiaddrs = newSeq[MultiAddress](), rng: ref HmacDrbgContext, - discv5Config: protocol.DiscoveryConfig = protocol.defaultDiscoveryConfig): T {. + topics: seq[string], + discv5Config: protocol.DiscoveryConfig = protocol.defaultDiscoveryConfig + ): T {. deprecated: "use the config and record proc variant instead".}= + let relayShardsRes = topicsToRelayShards(topics) + + let relayShard = + if relayShardsRes.isErr(): + debug "pubsub topic parsing error", reason = relayShardsRes.error + none(RelayShards) + else: relayShardsRes.get() + let record = block: var builder = EnrBuilder.init(privateKey) builder.withIpAddressAndPorts( @@ -89,6 +120,15 @@ proc new*(T: type WakuDiscoveryV5, ) builder.withWakuCapabilities(flags) builder.withMultiaddrs(multiaddrs) + + if relayShard.isSome(): + let res = builder.withWakuRelaySharding(relayShard.get()) + + if res.isErr(): + debug "building ENR with relay sharding failed", reason = res.error + else: + debug "building ENR with relay sharding information", cluster = $relayShard.get().cluster(), shards = $relayShard.get().indices() + builder.build().expect("Record within size limits") let conf = WakuDiscoveryV5Config(