sharded topic validation & ENR update (#1795)
Add sharded topic validation and update the node ENR accordingly.
parent e9028618fd
commit 50412d1880
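As a rough orientation to the change (not part of the commit itself), the sketch below exercises the `topicsToRelayShards` helper added here. The topic strings mirror the new test further down; the import paths are assumptions about the local nwaku source layout.

# Hedged sketch only: illustrates the validation rules introduced by this commit.
# The waku_discv5 import path is an assumption; adjust to the local nwaku layout.
import std/options, stew/results
import ./waku/v2/waku_discv5

let sharded = @["/waku/2/rs/0/2", "/waku/2/rs/0/4", "/waku/2/rs/0/8"]
let named = @["/waku/2/thisisatest"]

# Static-sharded topics map to a RelayShards value (cluster 0, shards 2, 4, 8).
assert topicsToRelayShards(sharded).get().isSome()
# Purely named topics carry no shard info; no relay-sharding field is added to the ENR.
assert topicsToRelayShards(named).get().isNone()
# Mixing named and static-sharded topics is rejected with an error.
assert topicsToRelayShards(named & sharded).isErr()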
@@ -274,7 +274,7 @@ proc initAndStartNode(conf: NetworkMonitorConf): Result[WakuNode, string] =
  node.wakuDiscv5 = WakuDiscoveryV5.new(
    some(extIp), some(nodeTcpPort), some(nodeUdpPort),
    bindIp, nodeUdpPort, discv5BootstrapEnrs, false,
-   keys.PrivateKey(nodeKey.skkey), flags, @[], node.rng)
+   keys.PrivateKey(nodeKey.skkey), flags, @[], node.rng, @[])

  node.wakuDiscv5.protocol.open()
  return ok(node)

@@ -439,7 +439,8 @@ proc initNode(conf: WakuNodeConf,
      flags = netConfig.wakuFlags.get(),
      multiaddrs = netConfig.enrMultiaddrs,
      rng = rng,
-     discv5Config = discv5Config,
+     conf.topics,
+     discv5Config = discv5Config
    ))
  except CatchableError:
    return err("failed to create waku discv5 instance: " & getCurrentExceptionMsg())

@@ -512,6 +513,7 @@ proc setupProtocols(node: WakuNode, conf: WakuNodeConf,
      pubsubTopics = conf.topicsDeprecated.split(" ")
    else:
      pubsubTopics = conf.topics

  try:
    await mountRelay(node, pubsubTopics, peerExchangeHandler = peerExchangeHandler)
  except CatchableError:

@@ -61,7 +61,9 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} =
    bootstrapEnrs = @[bootstrapNodeEnr],
    privateKey = keys.PrivateKey(nodeKey.skkey),
    flags = flags,
-   rng = node.rng)
+   rng = node.rng,
+   topics = @[],
+   )

  await node.start()
  await node.mountRelay()

@@ -56,7 +56,9 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} =
    bootstrapEnrs = @[bootstrapNodeEnr],
    privateKey = keys.PrivateKey(nodeKey.skkey),
    flags = flags,
-   rng = node.rng)
+   rng = node.rng,
+   topics = @[],
+   )

  await node.start()
  await node.mountRelay()

@@ -1,11 +1,10 @@
{.used.}

import
-  std/[sequtils, sets],
+  std/[sequtils],
  stew/results,
  stew/shims/net,
  chronos,
  chronicles,
  testutils/unittests,
  libp2p/crypto/crypto as libp2p_keys,
  eth/keys as eth_keys

@@ -280,3 +279,33 @@ procSuite "Waku Discovery v5":
    # Cleanup
    await allFutures(node1.stop(), node2.stop(), node3.stop(), node4.stop())

+  asyncTest "get relayShards from topics":
+    ## Given
+    let mixedTopics = @["/waku/2/thisisatest", "/waku/2/rs/0/2", "/waku/2/rs/0/8"]
+    let shardedTopics = @["/waku/2/rs/0/2", "/waku/2/rs/0/4", "/waku/2/rs/0/8"]
+    let namedTopics = @["/waku/2/thisisatest", "/waku/2/atestthisis", "/waku/2/isthisatest"]
+    let gibberish = @["aedyttydcb/uioasduyio", "jhdfsjhlsdfjhk/sadjhk", "khfsd/hjfdsgjh/dfs"]
+    let empty: seq[string] = @[]
+
+    let relayShards = RelayShards.init(0, @[uint16(2), uint16(4), uint16(8)])
+
+    ## When
+
+    let mixedRes = topicsToRelayShards(mixedTopics)
+    let shardedRes = topicsToRelayShards(shardedTopics)
+    let namedRes = topicsToRelayShards(namedTopics)
+    let gibberishRes = topicsToRelayShards(gibberish)
+    let emptyRes = topicsToRelayShards(empty)
+
+    ## Then
+    assert mixedRes.isErr(), $mixedRes.value
+    assert shardedRes.isOk(), shardedRes.error
+    assert shardedRes.value.isSome()
+    assert shardedRes.value.get() == relayShards, $shardedRes.value.get()
+    assert namedRes.isOk(), namedRes.error
+    assert namedRes.value.isNone(), $namedRes.value
+    assert gibberishRes.isErr(), $gibberishRes.value
+    assert emptyRes.isOk(), emptyRes.error
+    assert emptyRes.value.isNone(), $emptyRes.value

@@ -110,7 +110,8 @@ procSuite "Waku Peer Exchange":
      keys.PrivateKey(nodeKey1.skkey),
      flags,
      newSeq[MultiAddress](), # Empty multiaddr fields, for now
-     node1.rng
+     node1.rng,
+     newSeq[string]()
    )

    node2.wakuDiscv5 = WakuDiscoveryV5.new(
@@ -122,7 +123,8 @@ procSuite "Waku Peer Exchange":
      keys.PrivateKey(nodeKey2.skkey),
      flags,
      newSeq[MultiAddress](), # Empty multiaddr fields, for now
-     node2.rng
+     node2.rng,
+     newSeq[string]()
    )

    ## Given

@@ -48,6 +48,27 @@ type WakuDiscoveryV5* = ref object
    protocol*: protocol.Protocol
    listening*: bool

+func topicsToRelayShards*(topics: seq[string]): Result[Option[RelayShards], string] =
+  if topics.len < 1:
+    return ok(none(RelayShards))
+
+  let parsedTopicsRes = topics.mapIt(NsPubsubTopic.parse(it))
+
+  for res in parsedTopicsRes:
+    if res.isErr():
+      return err("failed to parse topic: " & $res.error)
+
+  if parsedTopicsRes.allIt(it.get().kind == NsPubsubTopicKind.NamedSharding):
+    return ok(none(RelayShards))
+
+  if parsedTopicsRes.anyIt(it.get().kind == NsPubsubTopicKind.NamedSharding):
+    return err("use named topics OR sharded ones not both.")
+
+  if parsedTopicsRes.anyIt(it.get().cluster != parsedTopicsRes[0].get().cluster):
+    return err("use sharded topics within the same cluster.")
+
+  return ok(some(RelayShards.init(parsedTopicsRes[0].get().cluster, parsedTopicsRes.mapIt(it.get().shard))))
+
proc new*(T: type WakuDiscoveryV5, rng: ref HmacDrbgContext, conf: WakuDiscoveryV5Config, record: Option[waku_enr.Record]): T =
  let protocol = newProtocol(
    rng = rng,
@@ -77,9 +98,19 @@ proc new*(T: type WakuDiscoveryV5,
    flags: CapabilitiesBitfield,
    multiaddrs = newSeq[MultiAddress](),
    rng: ref HmacDrbgContext,
-   discv5Config: protocol.DiscoveryConfig = protocol.defaultDiscoveryConfig): T {.
+   topics: seq[string],
+   discv5Config: protocol.DiscoveryConfig = protocol.defaultDiscoveryConfig
+   ): T {.
  deprecated: "use the config and record proc variant instead".}=

+  let relayShardsRes = topicsToRelayShards(topics)
+
+  let relayShard =
+    if relayShardsRes.isErr():
+      debug "pubsub topic parsing error", reason = relayShardsRes.error
+      none(RelayShards)
+    else: relayShardsRes.get()
+
  let record = block:
    var builder = EnrBuilder.init(privateKey)
    builder.withIpAddressAndPorts(
@@ -89,6 +120,15 @@ proc new*(T: type WakuDiscoveryV5,
    )
    builder.withWakuCapabilities(flags)
    builder.withMultiaddrs(multiaddrs)
+
+   if relayShard.isSome():
+     let res = builder.withWakuRelaySharding(relayShard.get())
+
+     if res.isErr():
+       debug "building ENR with relay sharding failed", reason = res.error
+     else:
+       debug "building ENR with relay sharding information", cluster = $relayShard.get().cluster(), shards = $relayShard.get().indices()
+
    builder.build().expect("Record within size limits")

  let conf = WakuDiscoveryV5Config(