diff --git a/tests/test_waku_discv5.nim b/tests/test_waku_discv5.nim index dd9e4fd04..bebdb5fe3 100644 --- a/tests/test_waku_discv5.nim +++ b/tests/test_waku_discv5.nim @@ -276,7 +276,7 @@ procSuite "Waku Discovery v5": # Cleanup await allFutures(node1.stop(), node2.stop(), node3.stop(), node4.stop()) - asyncTest "get relayShards from topics": + asyncTest "get shards from topics": ## Given let mixedTopics = @["/waku/2/thisisatest", "/waku/2/rs/0/2", "/waku/2/rs/0/8"] let shardedTopics = @["/waku/2/rs/0/2", "/waku/2/rs/0/4", "/waku/2/rs/0/8"] @@ -284,7 +284,7 @@ procSuite "Waku Discovery v5": let gibberish = @["aedyttydcb/uioasduyio", "jhdfsjhlsdfjhk/sadjhk", "khfsd/hjfdsgjh/dfs"] let empty: seq[string] = @[] - let relayShards = RelayShards.init(0, @[uint16(2), uint16(4), uint16(8)]).expect("Valid Shards") + let shardsTopics = RelayShards.init(0, @[uint16(2), uint16(4), uint16(8)]).expect("Valid shardIds") ## When @@ -298,7 +298,7 @@ procSuite "Waku Discovery v5": assert mixedRes.isErr(), $mixedRes.value assert shardedRes.isOk(), shardedRes.error assert shardedRes.value.isSome() - assert shardedRes.value.get() == relayShards, $shardedRes.value.get() + assert shardedRes.value.get() == shardsTopics, $shardedRes.value.get() assert namedRes.isOk(), namedRes.error assert namedRes.value.isNone(), $namedRes.value assert gibberishRes.isErr(), $gibberishRes.value @@ -313,13 +313,13 @@ procSuite "Waku Discovery v5": enrPrivKey = generatesecp256k1key() let - shardCluster: uint16 = 21 - shardIndices: seq[uint16] = @[1u16, 2u16, 5u16, 7u16, 9u16, 11u16] + clusterId: uint16 = 21 + shardIds: seq[uint16] = @[1u16, 2u16, 5u16, 7u16, 9u16, 11u16] - let shards = RelayShards.init(shardCluster, shardIndices).expect("Valid Shards") + let shardsTopics = RelayShards.init(clusterId, shardIds).expect("Valid shardIds") var builder = EnrBuilder.init(enrPrivKey, seqNum = enrSeqNum) - require builder.withWakuRelaySharding(shards).isOk() + require builder.withWakuRelaySharding(shardsTopics).isOk() let recordRes = builder.build() require recordRes.isOk() @@ -331,13 +331,13 @@ procSuite "Waku Discovery v5": enrPrivKey = generatesecp256k1key() let - shardCluster: uint16 = 22 - shardIndices: seq[uint16] = @[2u16, 4u16, 5u16, 8u16, 10u16, 12u16] + clusterId: uint16 = 22 + shardIds: seq[uint16] = @[2u16, 4u16, 5u16, 8u16, 10u16, 12u16] - let shards = RelayShards.init(shardCluster, shardIndices).expect("Valid Shards") + let shardsTopics = RelayShards.init(clusterId, shardIds).expect("Valid shardIds") var builder = EnrBuilder.init(enrPrivKey, seqNum = enrSeqNum) - require builder.withWakuRelaySharding(shards).isOk() + require builder.withWakuRelaySharding(shardsTopics).isOk() let recordRes = builder.build() require recordRes.isOk() @@ -349,13 +349,13 @@ procSuite "Waku Discovery v5": enrPrivKey = generatesecp256k1key() let - shardCluster: uint16 = 22 - shardIndices: seq[uint16] = @[1u16, 3u16, 6u16, 7u16, 9u16, 11u16] + clusterId: uint16 = 22 + shardIds: seq[uint16] = @[1u16, 3u16, 6u16, 7u16, 9u16, 11u16] - let shards = RelayShards.init(shardCluster, shardIndices).expect("Valid Shards") + let shardsTopics = RelayShards.init(clusterId, shardIds).expect("Valid shardIds") var builder = EnrBuilder.init(enrPrivKey, seqNum = enrSeqNum) - require builder.withWakuRelaySharding(shards).isOk() + require builder.withWakuRelaySharding(shardsTopics).isOk() let recordRes = builder.build() require recordRes.isOk() diff --git a/tests/test_waku_enr.nim b/tests/test_waku_enr.nim index aebd4e5ef..848fb64ea 100644 --- a/tests/test_waku_enr.nim +++ b/tests/test_waku_enr.nim @@ -255,71 +255,71 @@ suite "Waku ENR - Multiaddresses": suite "Waku ENR - Relay static sharding": - test "new relay shards field with single invalid index": + test "new relay shards object with single invalid shard id": ## Given let - shardCluster: uint16 = 22 - shardIndex: uint16 = 1024 + clusterId: uint16 = 22 + shard: uint16 = 1024 ## When - let res = RelayShards.init(shardCluster, shardIndex) + let shardsTopics = RelayShards.init(clusterId, shard) ## Then - assert res.isErr(), $res.get() + assert shardsTopics.isErr(), $shardsTopics.get() - test "new relay shards field with single invalid index in list": + test "new relay shards object with single invalid shard id in list": ## Given let - shardCluster: uint16 = 22 - shardIndices: seq[uint16] = @[1u16, 1u16, 2u16, 3u16, 5u16, 8u16, 1024u16] + clusterId: uint16 = 22 + shardIds: seq[uint16] = @[1u16, 1u16, 2u16, 3u16, 5u16, 8u16, 1024u16] ## When - let res = RelayShards.init(shardCluster, shardIndices) + let shardsTopics = RelayShards.init(clusterId, shardIds) ## Then - assert res.isErr(), $res.get() + assert shardsTopics.isErr(), $shardsTopics.get() - test "new relay shards field with single valid index": + test "new relay shards object with single valid shard id": ## Given let - shardCluster: uint16 = 22 - shardIndex: uint16 = 1 + clusterId: uint16 = 22 + shardId: uint16 = 1 - let topic = NsPubsubTopic.staticSharding(shardCluster, shardIndex) + let topic = NsPubsubTopic.staticSharding(clusterId, shardId) ## When - let shards = RelayShards.init(shardCluster, shardIndex).expect("Valid Shards") + let shardsTopics = RelayShards.init(clusterId, shardId).expect("Valid Shards") ## Then check: - shards.cluster == shardCluster - shards.indices == @[1u16] + shardsTopics.clusterId == clusterId + shardsTopics.shardIds == @[1u16] - let topics = shards.topics.mapIt($it) + let topics = shardsTopics.topics.mapIt($it) check: topics == @[$topic] check: - shards.contains(shardCluster, shardIndex) - not shards.contains(shardCluster, 33u16) - not shards.contains(20u16, 33u16) + shardsTopics.contains(clusterId, shardId) + not shardsTopics.contains(clusterId, 33u16) + not shardsTopics.contains(20u16, 33u16) - shards.contains(topic) - shards.contains("/waku/2/rs/22/1") + shardsTopics.contains(topic) + shardsTopics.contains("/waku/2/rs/22/1") - test "new relay shards field with repeated but valid indices": + test "new relay shards object with repeated but valid shard ids": ## Given let - shardCluster: uint16 = 22 - shardIndices: seq[uint16] = @[1u16, 2u16, 2u16, 3u16, 3u16, 3u16] + clusterId: uint16 = 22 + shardIds: seq[uint16] = @[1u16, 2u16, 2u16, 3u16, 3u16, 3u16] ## When - let shards = RelayShards.init(shardCluster, shardIndices).expect("Valid Shards") + let shardsTopics = RelayShards.init(clusterId, shardIds).expect("Valid Shards") ## Then check: - shards.cluster == shardCluster - shards.indices == @[1u16, 2u16, 3u16] + shardsTopics.clusterId == clusterId + shardsTopics.shardIds == @[1u16, 2u16, 3u16] test "cannot decode relay shards from record if not present": ## Given @@ -338,21 +338,21 @@ suite "Waku ENR - Relay static sharding": ## Then check fieldOpt.isNone() - test "encode and decode record with relay shards field (EnrBuilder ext - indices list)": + test "encode and decode record with relay shards field (EnrBuilder ext - shardIds list)": ## Given let enrSeqNum = 1u64 enrPrivKey = generatesecp256k1key() let - shardCluster: uint16 = 22 - shardIndices: seq[uint16] = @[1u16, 1u16, 2u16, 3u16, 5u16, 8u16] + clusterId: uint16 = 22 + shardIds: seq[uint16] = @[1u16, 1u16, 2u16, 3u16, 5u16, 8u16] - let shards = RelayShards.init(shardCluster, shardIndices).expect("Valid Shards") + let shardsTopics = RelayShards.init(clusterId, shardIds).expect("Valid Shards") ## When var builder = EnrBuilder.init(enrPrivKey, seqNum = enrSeqNum) - require builder.withWakuRelaySharding(shards).isOk() + require builder.withWakuRelaySharding(shardsTopics).isOk() let recordRes = builder.build() @@ -366,7 +366,7 @@ suite "Waku ENR - Relay static sharding": let shardsOpt = typedRecord.value.relaySharding check: shardsOpt.isSome() - shardsOpt.get() == shards + shardsOpt.get() == shardsTopics test "encode and decode record with relay shards field (EnrBuilder ext - bit vector)": ## Given @@ -374,10 +374,10 @@ suite "Waku ENR - Relay static sharding": enrSeqNum = 1u64 enrPrivKey = generatesecp256k1key() - let shards = RelayShards.init(33, toSeq(0u16 ..< 64u16)).expect("Valid Shards") + let shardsTopics = RelayShards.init(33, toSeq(0u16 ..< 64u16)).expect("Valid Shards") var builder = EnrBuilder.init(enrPrivKey, seqNum = enrSeqNum) - require builder.withWakuRelaySharding(shards).isOk() + require builder.withWakuRelaySharding(shardsTopics).isOk() let recordRes = builder.build() require recordRes.isOk() @@ -393,22 +393,22 @@ suite "Waku ENR - Relay static sharding": ## Then check: shardsOpt.isSome() - shardsOpt.get() == shards + shardsOpt.get() == shardsTopics - test "decode record with relay shards indices list and bit vector fields": + test "decode record with relay shards shard list and bit vector fields": ## Given let enrSeqNum = 1u64 enrPrivKey = generatesecp256k1key() let - shardsIndicesList = RelayShards.init(22, @[1u16, 1u16, 2u16, 3u16, 5u16, 8u16]).expect("Valid Shards") - shardsBitVector = RelayShards.init(33, @[13u16, 24u16, 37u16, 61u16, 98u16, 159u16]).expect("Valid Shards") + relayShardsIndicesList = RelayShards.init(22, @[1u16, 1u16, 2u16, 3u16, 5u16, 8u16]).expect("Valid Shards") + relayShardsBitVector = RelayShards.init(33, @[13u16, 24u16, 37u16, 61u16, 98u16, 159u16]).expect("Valid Shards") var builder = EnrBuilder.init(enrPrivKey, seqNum = enrSeqNum) - require builder.withWakuRelayShardingIndicesList(shardsIndicesList).isOk() - require builder.withWakuRelayShardingBitVector(shardsBitVector).isOk() + require builder.withWakuRelayShardingIndicesList(relayShardsIndicesList).isOk() + require builder.withWakuRelayShardingBitVector(relayShardsBitVector).isOk() let recordRes = builder.build() require recordRes.isOk() @@ -424,4 +424,4 @@ suite "Waku ENR - Relay static sharding": ## Then check: shardsOpt.isSome() - shardsOpt.get() == shardsIndicesList + shardsOpt.get() == relayShardsIndicesList diff --git a/tests/waku_core/test_namespaced_topics.nim b/tests/waku_core/test_namespaced_topics.nim index cf4ac3475..91f9b0250 100644 --- a/tests/waku_core/test_namespaced_topics.nim +++ b/tests/waku_core/test_namespaced_topics.nim @@ -151,7 +151,7 @@ suite "Waku Message - Pub-sub topics namespacing": test "Stringify static sharding pub-sub topic": ## Given - var ns = NsPubsubTopic.staticSharding(cluster=0, shard=2) + var ns = NsPubsubTopic.staticSharding(clusterId=0, shardId=2) ## When let topic = $ns @@ -186,8 +186,8 @@ suite "Waku Message - Pub-sub topics namespacing": let ns = nsRes.get() check: - ns.cluster == 16 - ns.shard == 42 + ns.clusterId == 16 + ns.shardId == 42 test "Parse pub-sub topic string - Invalid string: invalid protocol version": ## Given @@ -202,7 +202,7 @@ suite "Waku Message - Pub-sub topics namespacing": check: err.kind == ParsingErrorKind.InvalidFormat - test "Parse static sharding pub-sub topic string - Invalid string: empty shard value": + test "Parse static sharding pub-sub topic string - Invalid string: empty cluster id value": ## Given let topic = "/waku/2/rs//02" @@ -214,9 +214,9 @@ suite "Waku Message - Pub-sub topics namespacing": let err = ns.tryError() check: err.kind == ParsingErrorKind.MissingPart - err.part == "shard_cluster_index" + err.part == "cluster_id" - test "Parse static sharding pub-sub topic string - Invalid string: cluster value": + test "Parse static sharding pub-sub topic string - Invalid string: cluster id value": ## Given let topic = "/waku/2/rs/xx/77" diff --git a/tests/waku_core/test_sharding.nim b/tests/waku_core/test_sharding.nim index e67cb89d4..ea385f5db 100644 --- a/tests/waku_core/test_sharding.nim +++ b/tests/waku_core/test_sharding.nim @@ -109,7 +109,7 @@ suite "Waku Sharding": ## Then check: - pubsub == NsPubsubTopic.staticSharding(ClusterIndex, 3) + pubsub == NsPubsubTopic.staticSharding(ClusterId, 3) test "Shard Choice Simulation": ## Given diff --git a/waku/waku_core/topics/pubsub_topic.nim b/waku/waku_core/topics/pubsub_topic.nim index bdfb5e319..cd146ad7e 100644 --- a/waku/waku_core/topics/pubsub_topic.nim +++ b/waku/waku_core/topics/pubsub_topic.nim @@ -35,16 +35,16 @@ type NsPubsubTopic* = object case kind*: NsPubsubTopicKind of NsPubsubTopicKind.StaticSharding: - cluster*: uint16 - shard*: uint16 + clusterId*: uint16 + shardId*: uint16 of NsPubsubTopicKind.NamedSharding: name*: string -proc staticSharding*(T: type NsPubsubTopic, cluster, shard: uint16): T = +proc staticSharding*(T: type NsPubsubTopic, clusterId, shardId: uint16): T = NsPubsubTopic( kind: NsPubsubTopicKind.StaticSharding, - cluster: cluster, - shard: shard + clusterId: clusterId, + shardId: shardId ) proc named*(T: type NsPubsubTopic, name: string): T = @@ -63,7 +63,7 @@ proc `$`*(topic: NsPubsubTopic): string = of NsPubsubTopicKind.NamedSharding: "/waku/2/" & topic.name of NsPubsubTopicKind.StaticSharding: - "/waku/2/rs/" & $topic.cluster & "/" & $topic.shard + "/waku/2/rs/" & $topic.clusterId & "/" & $topic.shardId # Deserialization @@ -83,15 +83,15 @@ proc parseStaticSharding*(T: type NsPubsubTopic, topic: PubsubTopic|string): Par let clusterPart = parts[0] if clusterPart.len == 0: - return err(ParsingError.missingPart("shard_cluster_index")) - let cluster = ?Base10.decode(uint16, clusterPart).mapErr(proc(err: auto): auto = ParsingError.invalidFormat($err)) + return err(ParsingError.missingPart("cluster_id")) + let clusterId = ?Base10.decode(uint16, clusterPart).mapErr(proc(err: auto): auto = ParsingError.invalidFormat($err)) let shardPart = parts[1] if shardPart.len == 0: return err(ParsingError.missingPart("shard_number")) - let shard = ?Base10.decode(uint16, shardPart).mapErr(proc(err: auto): auto = ParsingError.invalidFormat($err)) + let shardId = ?Base10.decode(uint16, shardPart).mapErr(proc(err: auto): auto = ParsingError.invalidFormat($err)) - ok(NsPubsubTopic.staticSharding(cluster, shard)) + ok(NsPubsubTopic.staticSharding(clusterId, shardId)) proc parseNamedSharding*(T: type NsPubsubTopic, topic: PubsubTopic|string): ParsingResult[NsPubsubTopic] = if not topic.startsWith(Waku2PubsubTopicPrefix): @@ -123,10 +123,10 @@ proc `==`*[T: NsPubsubTopic](x, y: T): bool = if x.kind != NsPubsubTopicKind.StaticSharding: return false - if x.cluster != y.cluster: + if x.clusterId != y.clusterId: return false - if x.shard != y.shard: + if x.shardId != y.shardId: return false of NsPubsubTopicKind.NamedSharding: if x.kind != NsPubsubTopicKind.NamedSharding: diff --git a/waku/waku_core/topics/sharding.nim b/waku/waku_core/topics/sharding.nim index f616f54b7..ed4eef560 100644 --- a/waku/waku_core/topics/sharding.nim +++ b/waku/waku_core/topics/sharding.nim @@ -19,8 +19,8 @@ import ./content_topic, ./pubsub_topic -## For indices allocation and other magic numbers refer to RFC 51 -const ClusterIndex* = 1 +## For indices allocation and other magic numbers refer to RFC 64 +const ClusterId* = 1 const GenerationZeroShardsCount* = 8 proc getGenZeroShard*(topic: NsContentTopic, count: int): NsPubsubTopic = @@ -34,7 +34,7 @@ proc getGenZeroShard*(topic: NsContentTopic, count: int): NsPubsubTopic = # This is equilavent to modulo shard count but faster let shard = hashValue and uint64((count - 1)) - NsPubsubTopic.staticSharding(ClusterIndex, uint16(shard)) + NsPubsubTopic.staticSharding(ClusterId, uint16(shard)) proc getShard*(topic: NsContentTopic): Result[NsPubsubTopic, string] = ## Compute the (pubsub topic) shard to use for this content topic. @@ -128,9 +128,9 @@ proc parseSharding*(pubsubTopic: Option[PubsubTopic], contentTopics: ContentTopi var list = newSeq[(NsPubsubTopic, float64)](shardCount) for (shard, weight) in shardsNWeights: - let pubsub = NsPubsubTopic.staticSharding(ClusterIndex, uint16(shard)) + let pubsub = NsPubsubTopic.staticSharding(ClusterId, uint16(shard)) - let clusterBytes = toBytesBE(uint16(ClusterIndex)) + let clusterBytes = toBytesBE(uint16(ClusterId)) let shardBytes = toBytesBE(uint16(shard)) let bytes = toBytes(topic.application) & toBytes(topic.version) & @clusterBytes & @shardBytes let hash = sha256.digest(bytes) diff --git a/waku/waku_discv5.nim b/waku/waku_discv5.nim index c02371dbb..6a9fe094f 100644 --- a/waku/waku_discv5.nim +++ b/waku/waku_discv5.nim @@ -70,7 +70,7 @@ proc shardingPredicate*(record: Record): Option[WakuDiscv5Predicate] = debug "peer filtering updated" let predicate = proc(record: waku_enr.Record): bool = - nodeShard.indices.anyIt(record.containsShard(nodeShard.cluster, it)) + nodeShard.shardIds.anyIt(record.containsShard(nodeShard.clusterId, it)) return some(predicate) @@ -150,7 +150,7 @@ proc new*(T: type WakuDiscoveryV5, if res.isErr(): debug "building ENR with relay sharding failed", reason = res.error else: - debug "building ENR with relay sharding information", cluster = $relayShard.get().cluster(), shards = $relayShard.get().indices() + debug "building ENR with relay sharding information", clusterId = $relayShard.get().clusterId(), shards = $relayShard.get().shardIds() builder.build().expect("Record within size limits") @@ -190,18 +190,18 @@ proc updateENRShards(wd: WakuDiscoveryV5, if add and currentShardsOp.isSome(): let currentShard = currentShardsOp.get() - if currentShard.cluster != newShard.cluster: - return err("ENR are limited to one shard cluster") + if currentShard.clusterId != newShard.clusterId: + return err("ENR are limited to one clusterId id") - ?RelayShards.init(currentShard.cluster, currentShard.indices & newShard.indices) + ?RelayShards.init(currentShard.clusterId, currentShard.shardIds & newShard.shardIds) elif not add and currentShardsOp.isSome(): let currentShard = currentShardsOp.get() - if currentShard.cluster != newShard.cluster: - return err("ENR are limited to one shard cluster") + if currentShard.clusterId != newShard.clusterId: + return err("ENR are limited to one clusterId id") - let currentSet = toHashSet(currentShard.indices) - let newSet = toHashSet(newShard.indices) + let currentSet = toHashSet(currentShard.shardIds) + let newSet = toHashSet(newShard.shardIds) let indices = toSeq(currentSet - newSet) @@ -215,12 +215,12 @@ proc updateENRShards(wd: WakuDiscoveryV5, return ok() - ?RelayShards.init(currentShard.cluster, indices) + ?RelayShards.init(currentShard.clusterId, indices) elif add and currentShardsOp.isNone(): newShard else: return ok() let (field, value) = - if resultShard.indices.len >= ShardingIndicesListMaxLength: + if resultShard.shardIds.len >= ShardingIndicesListMaxLength: (ShardingBitVectorEnrField, resultShard.toBitVector()) else: let listRes = resultShard.toIndicesList() diff --git a/waku/waku_enr/sharding.nim b/waku/waku_enr/sharding.nim index 17954803c..7b5c9f6a8 100644 --- a/waku/waku_enr/sharding.nim +++ b/waku/waku_enr/sharding.nim @@ -18,7 +18,6 @@ import logScope: topics = "waku enr sharding" - const MaxShardIndex: uint16 = 1023 const @@ -26,48 +25,45 @@ const ShardingIndicesListMaxLength* = 64 ShardingBitVectorEnrField* = "rsv" - type RelayShards* = object - cluster: uint16 - indices: seq[uint16] + clusterId: uint16 + shardIds: seq[uint16] +func clusterId*(rs: RelayShards): uint16 = + rs.clusterId -func cluster*(rs: RelayShards): uint16 = - rs.cluster - -func indices*(rs: RelayShards): seq[uint16] = - rs.indices +func shardIds*(rs: RelayShards): seq[uint16] = + rs.shardIds func topics*(rs: RelayShards): seq[NsPubsubTopic] = - rs.indices.mapIt(NsPubsubTopic.staticSharding(rs.cluster, it)) + rs.shardIds.mapIt(NsPubsubTopic.staticSharding(rs.clusterId, it)) +func init*(T: type RelayShards, clusterId, shardId: uint16): Result[T, string] = + if shardId > MaxShardIndex: + return err("invalid shard Id") -func init*(T: type RelayShards, cluster, index: uint16): Result[T, string] = - if index > MaxShardIndex: - return err("invalid index") + ok(RelayShards(clusterId: clusterId, shardIds: @[shardId])) - ok(RelayShards(cluster: cluster, indices: @[index])) +func init*(T: type RelayShards, clusterId: uint16, shardIds: varargs[uint16]): Result[T, string] = + if toSeq(shardIds).anyIt(it > MaxShardIndex): + return err("invalid shard") -func init*(T: type RelayShards, cluster: uint16, indices: varargs[uint16]): Result[T, string] = - if toSeq(indices).anyIt(it > MaxShardIndex): - return err("invalid index") + let indicesSeq = deduplicate(@shardIds) + if shardIds.len < 1: + return err("invalid shard count") - let indicesSeq = deduplicate(@indices) - if indices.len < 1: - return err("invalid index count") + ok(RelayShards(clusterId: clusterId, shardIds: indicesSeq)) - ok(RelayShards(cluster: cluster, indices: indicesSeq)) +func init*(T: type RelayShards, clusterId: uint16, shardIds: seq[uint16]): Result[T, string] = + if shardIds.anyIt(it > MaxShardIndex): + return err("invalid shard") -func init*(T: type RelayShards, cluster: uint16, indices: seq[uint16]): Result[T, string] = - if indices.anyIt(it > MaxShardIndex): - return err("invalid index") + let indicesSeq = deduplicate(shardIds) + if shardIds.len < 1: + return err("invalid shard count") - let indicesSeq = deduplicate(indices) - if indices.len < 1: - return err("invalid index count") - - ok(RelayShards(cluster: cluster, indices: indicesSeq)) + ok(RelayShards(clusterId: clusterId, shardIds: indicesSeq)) func topicsToRelayShards*(topics: seq[string]): Result[Option[RelayShards], string] = if topics.len < 1: @@ -83,23 +79,23 @@ func topicsToRelayShards*(topics: seq[string]): Result[Option[RelayShards], stri return ok(none(RelayShards)) if parsedTopicsRes.anyIt(it.get().kind == NsPubsubTopicKind.NamedSharding): - return err("use named topics OR sharded ones not both.") + return err("use named (/waku/2/*) OR static (/waku/2/rs/*/*) shards not both.") - if parsedTopicsRes.anyIt(it.get().cluster != parsedTopicsRes[0].get().cluster): - return err("use sharded topics within the same cluster.") + if parsedTopicsRes.anyIt(it.get().clusterId != parsedTopicsRes[0].get().clusterId): + return err("use shards with the same cluster Id.") - let relayShard = ?RelayShards.init(parsedTopicsRes[0].get().cluster, parsedTopicsRes.mapIt(it.get().shard)) + let relayShard = ?RelayShards.init(parsedTopicsRes[0].get().clusterId, parsedTopicsRes.mapIt(it.get().shardId)) return ok(some(relayShard)) -func contains*(rs: RelayShards, cluster, index: uint16): bool = - rs.cluster == cluster and rs.indices.contains(index) +func contains*(rs: RelayShards, clusterId, shardId: uint16): bool = + rs.clusterId == clusterId and rs.shardIds.contains(shardId) func contains*(rs: RelayShards, topic: NsPubsubTopic): bool = if topic.kind != NsPubsubTopicKind.StaticSharding: return false - rs.contains(topic.cluster, topic.shard) + rs.contains(topic.clusterId, topic.shardId) func contains*(rs: RelayShards, topic: PubsubTopic|string): bool = let parseRes = NsPubsubTopic.parse(topic) @@ -108,19 +104,18 @@ func contains*(rs: RelayShards, topic: PubsubTopic|string): bool = rs.contains(parseRes.value) - # ENR builder extension func toIndicesList*(rs: RelayShards): EnrResult[seq[byte]] = - if rs.indices.len > high(uint8).int: - return err("indices list too long") + if rs.shardIds.len > high(uint8).int: + return err("shards list too long") var res: seq[byte] - res.add(rs.cluster.toBytesBE()) + res.add(rs.clusterId.toBytesBE()) - res.add(rs.indices.len.uint8) - for index in rs.indices: - res.add(index.toBytesBE()) + res.add(rs.shardIds.len.uint8) + for shardId in rs.shardIds: + res.add(shardId.toBytesBE()) ok(res) @@ -128,30 +123,30 @@ func fromIndicesList(buf: seq[byte]): Result[RelayShards, string] = if buf.len < 3: return err("insufficient data: expected at least 3 bytes, got " & $buf.len & " bytes") - let cluster = uint16.fromBytesBE(buf[0..1]) + let clusterId = uint16.fromBytesBE(buf[0..1]) let length = int(buf[2]) if buf.len != 3 + 2 * length: return err("invalid data: `length` field is " & $length & " but " & $buf.len & " bytes were provided") - var indices: seq[uint16] + var shardIds: seq[uint16] for i in 0..= ShardingIndicesListMaxLength: + if rs.shardIds.len >= ShardingIndicesListMaxLength: builder.withWakuRelayShardingBitVector(rs) else: builder.withWakuRelayShardingIndicesList(rs) @@ -193,79 +187,65 @@ func withWakuRelaySharding*(builder: var EnrBuilder, rs: RelayShards): EnrResult func withShardedTopics*(builder: var EnrBuilder, topics: seq[string]): Result[void, string] = - let relayShardsRes = topicsToRelayShards(topics) - let relayShardOp = - if relayShardsRes.isErr(): - return err("building ENR with relay sharding failed: " & - $relayShardsRes.error) - else: relayShardsRes.get() + let relayShardOp = topicsToRelayShards(topics).valueOr: + return err("building ENR with relay sharding failed: " & $error) - if relayShardOp.isNone(): + let relayShard = relayShardOp.valueOr: return ok() - let res = builder.withWakuRelaySharding(relayShardOp.get()) - - if res.isErr(): - return err($res.error) + builder.withWakuRelaySharding(relayShard).isOkOr: + return err($error) return ok() # ENR record accessors (e.g., Record, TypedRecord, etc.) proc relayShardingIndicesList*(record: TypedRecord): Option[RelayShards] = - let field = record.tryGet(ShardingIndicesListEnrField, seq[byte]) - if field.isNone(): + let field = record.tryGet(ShardingIndicesListEnrField, seq[byte]).valueOr: return none(RelayShards) - let indexList = fromIndicesList(field.get()) - if indexList.isErr(): - debug "invalid sharding indices list", error = indexList.error + let indexList = fromIndicesList(field).valueOr: + debug "invalid shards list", error = error return none(RelayShards) - some(indexList.value) + some(indexList) proc relayShardingBitVector*(record: TypedRecord): Option[RelayShards] = - let field = record.tryGet(ShardingBitVectorEnrField, seq[byte]) - if field.isNone(): + let field = record.tryGet(ShardingBitVectorEnrField, seq[byte]).valueOr: return none(RelayShards) - let bitVector = fromBitVector(field.get()) - if bitVector.isErr(): - debug "invalid sharding bit vector", error = bitVector.error + let bitVector = fromBitVector(field).valueOr: + debug "invalid shards bit vector", error = error return none(RelayShards) - some(bitVector.value) + some(bitVector) proc relaySharding*(record: TypedRecord): Option[RelayShards] = - let indexList = record.relayShardingIndicesList() - if indexList.isSome(): - return indexList - - record.relayShardingBitVector() + let indexList = record.relayShardingIndicesList().valueOr: + return record.relayShardingBitVector() + return some(indexList) ## Utils -proc containsShard*(r: Record, cluster, index: uint16): bool = - if index > MaxShardIndex: +proc containsShard*(r: Record, clusterId, shardId: uint16): bool = + if shardId > MaxShardIndex: return false - let recordRes = r.toTyped() - if recordRes.isErr(): - debug "invalid ENR record", error = recordRes.error + let record = r.toTyped().valueOr: + debug "invalid ENR record", error = error return false - let rs = recordRes.value.relaySharding() - if rs.isNone(): + let rs = record.relaySharding().valueOr: return false - rs.get().contains(cluster, index) + rs.contains(clusterId, shardId) proc containsShard*(r: Record, topic: NsPubsubTopic): bool = if topic.kind != NsPubsubTopicKind.StaticSharding: return false - containsShard(r, topic.cluster, topic.shard) + containsShard(r, topic.clusterId, topic.shardId) proc containsShard*(r: Record, topic: PubsubTopic|string): bool = let parseRes = NsPubsubTopic.parse(topic)