unifying clusterId to be uint16 (#2777)

gabrielmer 2024-06-05 15:32:35 +02:00 committed by GitHub
parent 16709d1dd9
commit 21a4b5bd4a
12 changed files with 25 additions and 45 deletions

View File

@@ -91,7 +91,7 @@ type LiteProtocolTesterConf* = object
# desc: "Shards index to subscribe to [0..MAX_SHARDS-1]. Argument may be repeated.",
# defaultValue: @[],
# name: "shard"
-# .}: seq[ShardIdx]
+# .}: seq[uint16]
contentTopics* {.
desc: "Default content topic to subscribe to. Argument may be repeated.",
defaultValue: @[LiteContentTopic],

View File

@@ -24,7 +24,7 @@ import
procSuite "Waku Metadata Protocol":
asyncTest "request() returns the supported metadata of the peer":
-let clusterId = 10.uint32
+let clusterId = 10.uint16
let
node1 = newTestWakuNode(
generateSecp256k1Key(),

View File

@@ -35,7 +35,7 @@ proc defaultTestWakuNodeConf*(): WakuNodeConf =
nat: "any",
maxConnections: 50,
maxMessageSize: "1024 KiB",
-clusterId: 0.uint32,
+clusterId: 0,
pubsubTopics: @["/waku/2/rs/1/0"],
relay: true,
storeMessageDbUrl: "sqlite://store.sqlite3",

View File

@@ -72,7 +72,12 @@ proc readValue*[T](r: var EnvvarReader, value: var T) {.raises: [SerializationEr
elif T is (seq or array):
when uTypeIsPrimitives(T):
let key = constructKey(r.prefix, r.key)
-getValue(key, value)
+try:
+  getValue(key, value)
+except ValueError:
+  raise newException(
+    SerializationError, "Couldn't get value: " & getCurrentExceptionMsg()
+  )
else:
let key = r.key[^1]
for i in 0 ..< value.len:
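
The new try/except exists because getValue can now raise ValueError (see the next file), which the envvar reader converts into its own SerializationError. A minimal standalone sketch of that wrapping pattern, using illustrative names (ConfigError and readShardValue are not part of nwaku):

import std/strutils

type ConfigError = object of CatchableError

proc readShardValue(raw: string): uint16 {.raises: [ConfigError].} =
  # Convert the low-level ValueError into the caller-facing error type,
  # preserving the original message, as the hunk above does with
  # SerializationError.
  try:
    uint16(parseUInt(raw))
  except ValueError:
    raise newException(ConfigError, "Couldn't get value: " & getCurrentExceptionMsg())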

View File

@@ -42,7 +42,9 @@ proc getValue*(key: string, outVal: var string) {.raises: [ValueError].} =
outVal.setLen(size)
decodePaddedHex(hex, cast[ptr UncheckedArray[byte]](outVal[0].addr), size)
-proc getValue*[T: SomePrimitives](key: string, outVal: var seq[T]) =
+proc getValue*[T: SomePrimitives](
+    key: string, outVal: var seq[T]
+) {.raises: [ValueError].} =
let hex = os.getEnv(key)
let byteSize = (hex.len div 2) + (hex.len and 0x01)
let size = (byteSize + sizeof(T) - 1) div sizeof(T)
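
The size arithmetic above rounds the hex string up to whole bytes, then up to whole elements of T. A worked example in plain Nim, with T taken to be uint16:

let hexLen = 5                                                  # e.g. "0ABCD"
let byteSize = (hexLen div 2) + (hexLen and 0x01)               # 2 + 1 = 3 bytes
let size = (byteSize + sizeof(uint16) - 1) div sizeof(uint16)   # ceil(3 / 2) = 2 elements
echo byteSize, " bytes -> ", size, " uint16 elements"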

View File

@@ -30,8 +30,6 @@ type ProtectedTopic* = object
topic*: string
key*: secp256k1.SkPublicKey
-type ShardIdx = distinct uint16
type EthRpcUrl* = distinct string
type StartUpCommand* = enum
@@ -140,7 +138,7 @@ type WakuNodeConf* = object
"Cluster id that the node is running in. Node in a different cluster id is disconnected.",
defaultValue: 0,
name: "cluster-id"
-.}: uint32
+.}: uint16
agentString* {.
defaultValue: "nwaku",
@@ -307,7 +305,7 @@ type WakuNodeConf* = object
desc: "Shards index to subscribe to [0..MAX_SHARDS-1]. Argument may be repeated.",
defaultValue: @[],
name: "shard"
-.}: seq[ShardIdx]
+.}: seq[uint16]
contentTopics* {.
desc: "Default content topic to subscribe to. Argument may be repeated.",
@@ -663,15 +661,6 @@ proc defaultColocationLimit*(): int =
proc completeCmdArg*(T: type Port, val: string): seq[string] =
return @[]
-proc completeCmdArg*(T: type ShardIdx, val: string): seq[ShardIdx] =
-  return @[]
-proc parseCmdArg*(T: type ShardIdx, p: string): T =
-  try:
-    ShardIdx(parseInt(p))
-  except CatchableError:
-    raise newException(ValueError, "Invalid shard index")
proc completeCmdArg*(T: type EthRpcUrl, val: string): seq[string] =
return @[]
@@ -732,22 +721,6 @@
except CatchableError:
raise newException(SerializationError, getCurrentExceptionMsg())
-proc readValue*(
-    r: var TomlReader, value: var ShardIdx
-) {.raises: [SerializationError].} =
-  try:
-    value = parseCmdArg(ShardIdx, r.readValue(string))
-  except CatchableError:
-    raise newException(SerializationError, getCurrentExceptionMsg())
-proc readValue*(
-    r: var EnvvarReader, value: var ShardIdx
-) {.raises: [SerializationError].} =
-  try:
-    value = parseCmdArg(ShardIdx, r.readValue(string))
-  except CatchableError:
-    raise newException(SerializationError, getCurrentExceptionMsg())
proc readValue*(
r: var TomlReader, value: var EthRpcUrl
) {.raises: [SerializationError].} =
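
With ShardIdx gone, the shard option is a plain seq[uint16], so the dedicated completeCmdArg, parseCmdArg and readValue overloads removed above are no longer needed; the existing generic integer handling covers uint16. A standalone sketch of the conversion friction the distinct type caused (ShardIdx and parseShard below are illustrative re-creations, not the confutils API):

import std/strutils

type ShardIdx = distinct uint16

proc parseShard(p: string): ShardIdx =
  # A distinct type does not borrow uint16's parsing, so every reader
  # needed a small wrapper like this one.
  ShardIdx(uint16(parseUInt(p)))

echo uint16(parseShard("7"))   # distinct type: explicit conversions at each use
echo uint16(parseUInt("7"))    # plain uint16: no wrapper needed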

View File

@@ -45,7 +45,7 @@ proc enrConfiguration*(
shards = toSeq(conf.shards.mapIt(uint16(it)))
enrBuilder.withWakuRelaySharding(
-RelayShards(clusterId: uint16(conf.clusterId), shardIds: shards)
+RelayShards(clusterId: conf.clusterId, shardIds: shards)
).isOkOr:
return err("could not initialize ENR with shards")

View File

@@ -5,7 +5,7 @@ else:
type ClusterConf* = object
maxMessageSize*: string
-clusterId*: uint32
+clusterId*: uint16
rlnRelay*: bool
rlnRelayEthContractAddress*: string
rlnRelayDynamic*: bool
@@ -21,7 +21,7 @@ type ClusterConf* = object
# overrides existing cli configuration
proc ClusterZeroConf*(T: type ClusterConf): ClusterConf =
return ClusterConf(
-clusterId: 0.uint32,
+clusterId: 0,
pubsubTopics:
@["/waku/2/default-waku/proto"] # TODO: Add more config such as bootstrap, etc
,
@@ -33,7 +33,7 @@ proc ClusterZeroConf*(T: type ClusterConf): ClusterConf =
proc TheWakuNetworkConf*(T: type ClusterConf): ClusterConf =
return ClusterConf(
maxMessageSize: "150KiB",
-clusterId: 1.uint32,
+clusterId: 1,
rlnRelay: true,
rlnRelayEthContractAddress: "0xF471d71E9b1455bBF4b85d475afb9BB0954A29c4",
rlnRelayDynamic: true,
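
A side effect of the narrower field type: the .uint32 suffixes on the literals could be dropped, since Nim implicitly converts an in-range integer literal to the field's uint16 type. A minimal standalone illustration (ClusterConfLike is a stand-in type, not the real ClusterConf):

type ClusterConfLike = object
  clusterId: uint16

# In-range literals need no conversion suffix; an out-of-range literal
# such as 70000 would be rejected at compile time.
let zeroConf = ClusterConfLike(clusterId: 0)
let oneConf = ClusterConfLike(clusterId: 1)
echo zeroConf.clusterId, " ", oneConf.clusterId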

View File

@@ -12,7 +12,7 @@ import ../waku_enr
type NetConfig* = object
hostAddress*: MultiAddress
-clusterId*: uint32
+clusterId*: uint16
wsHostAddress*: Option[MultiAddress]
hostExtAddress*: Option[MultiAddress]
wsExtAddress*: Option[MultiAddress]
@@ -78,7 +78,7 @@ proc init*(
wssEnabled: bool = false,
dns4DomainName = none(string),
discv5UdpPort = none(Port),
-clusterId: uint32 = 0,
+clusterId: uint16 = 0,
wakuFlags = none(CapabilitiesBitfield),
): NetConfigResult =
## Initialize and validate waku node network configuration
## Initialize and validate waku node network configuration

View File

@@ -212,7 +212,7 @@ proc mountMetadata*(node: WakuNode, clusterId: uint32): Result[void, string] =
## Waku Sharding
proc mountSharding*(
-node: WakuNode, clusterId: uint32, shardCount: uint32
+node: WakuNode, clusterId: uint16, shardCount: uint32
): Result[void, string] =
info "mounting sharding", clusterId = clusterId, shardCount = shardCount
node.wakuSharding = Sharding(clusterId: clusterId, shardCountGenZero: shardCount)

View File

@@ -12,11 +12,11 @@ import nimcrypto, std/options, std/tables, stew/endians2, stew/results, stew/byt
import ./content_topic, ./pubsub_topic
type Sharding* = object
-clusterId*: uint32
+clusterId*: uint16
# TODO: generations could be stored in a table here
shardCountGenZero*: uint32
-proc new*(T: type Sharding, clusterId: uint32, shardCount: uint32): T =
+proc new*(T: type Sharding, clusterId: uint16, shardCount: uint32): T =
return Sharding(clusterId: clusterId, shardCountGenZero: shardCount)
proc getGenZeroShard*(s: Sharding, topic: NsContentTopic, count: int): NsPubsubTopic =
@@ -30,7 +30,7 @@ proc getGenZeroShard*(s: Sharding, topic: NsContentTopic, count: int): NsPubsubT
# This is equilavent to modulo shard count but faster
let shard = hashValue and uint64((count - 1))
-NsPubsubTopic.staticSharding(uint16(s.clusterId), uint16(shard))
+NsPubsubTopic.staticSharding(s.clusterId, uint16(shard))
proc getShard*(s: Sharding, topic: NsContentTopic): Result[NsPubsubTopic, string] =
## Compute the (pubsub topic) shard to use for this content topic.
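
For context, the shard selection kept here relies on count being a power of two: hashValue and uint64(count - 1) equals hashValue mod count in that case, which is the "equivalent to modulo but faster" trick the comment refers to. A standalone check of that equivalence (the hash below is a stand-in; nwaku derives the real value from the content topic):

let
  hashValue = 0xDEADBEEF'u64          # stand-in for the content-topic hash
  count = 8                           # generation-zero shard count, a power of two
  viaAnd = hashValue and uint64(count - 1)
  viaMod = hashValue mod uint64(count)
assert viaAnd == viaMod
echo "shard index: ", viaAnd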

View File

@@ -260,7 +260,7 @@ proc containsShard*(r: Record, topic: PubsubTopic | string): bool =
containsShard(r, parseRes.value)
-proc isClusterMismatched*(record: Record, clusterId: uint32): bool =
+proc isClusterMismatched*(record: Record, clusterId: uint16): bool =
## Check the ENR sharding info for matching cluster id
if (let typedRecord = record.toTyped(); typedRecord.isOk()):
if (let relayShard = typedRecord.get().relaySharding(); relayShard.isSome()):
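
isClusterMismatched keeps the if (let value = expr; condition): form, which binds a value and tests it in a single condition, and only compares cluster ids when the record actually advertises sharding info. A standalone sketch of that shape, assuming a simplified record representation (a seq of key/value pairs rather than an ENR Record):

import std/options

# Simplified stand-in for "look up relay-sharding info in a record".
proc relayCluster(record: seq[(string, uint16)]): Option[uint16] =
  for (key, value) in record:
    if key == "rs":
      return some(value)
  none(uint16)

proc clusterMismatched(record: seq[(string, uint16)], clusterId: uint16): bool =
  # Same shape as isClusterMismatched: only records that advertise
  # sharding info can mismatch; anything else is treated as compatible.
  if (let cluster = relayCluster(record); cluster.isSome()):
    return cluster.get() != clusterId
  false

echo clusterMismatched(@[("rs", 1'u16)], 0'u16)   # true: record is on cluster 1
echo clusterMismatched(@[], 0'u16)                # false: no sharding info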