mirror of https://github.com/waku-org/nwaku.git
Merge branch 'master' into rlnv2-only
commit cfa6464166
@@ -44,4 +44,4 @@ Update `nwaku` "vendor" dependencies.
 - [ ] nim-web3
 - [ ] nim-websock
 - [ ] nim-zlib
-- [ ] zerokit ( this should be kept in version `v0.3.4` )
+- [ ] zerokit ( this should be kept in version `v0.5.1` )
@@ -91,7 +91,7 @@ type LiteProtocolTesterConf* = object
 #    desc: "Shards index to subscribe to [0..MAX_SHARDS-1]. Argument may be repeated.",
 #    defaultValue: @[],
 #    name: "shard"
-#  .}: seq[ShardIdx]
+#  .}: seq[uint16]
   contentTopics* {.
     desc: "Default content topic to subscribe to. Argument may be repeated.",
     defaultValue: @[LiteContentTopic],
@@ -0,0 +1,6 @@
+const ContentScriptVersion_5* =
+  """
+CREATE INDEX IF NOT EXISTS i_query_storedAt ON messages (storedAt, id);
+
+UPDATE version SET version = 5 WHERE version = 4;
+"""
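Worth noting: the new i_query_storedAt index on (storedAt, id) lines up with the composite cursor predicates used by the store queries further down in this diff (e.g. (storedAt, id) < ($3,$4) with ORDER BY storedAt), so cursor-based pagination can be served directly from the index.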
@@ -1,6 +1,6 @@
 import
   content_script_version_1, content_script_version_2, content_script_version_3,
-  content_script_version_4
+  content_script_version_4, content_script_version_5

 type MigrationScript* = object
   version*: int
@@ -15,6 +15,7 @@ const PgMigrationScripts* =
   MigrationScript(version: 2, scriptContent: ContentScriptVersion_2),
   MigrationScript(version: 3, scriptContent: ContentScriptVersion_3),
   MigrationScript(version: 4, scriptContent: ContentScriptVersion_4),
+  MigrationScript(version: 5, scriptContent: ContentScriptVersion_5),
 ]

 proc getMigrationScripts*(currentVersion: int64, targetVersion: int64): seq[string] =
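The body of getMigrationScripts is elided from this hunk. A minimal sketch of the selection rule implied by the registry above (scripts strictly newer than the current version, up to and including the target), assuming it simply filters PgMigrationScripts:

import std/sequtils

type MigrationScript = object
  version: int
  scriptContent: string

# Sketch: return the script bodies with currentVersion < version <= targetVersion.
proc selectScripts(
    scripts: seq[MigrationScript], currentVersion, targetVersion: int64
): seq[string] =
  scripts
    .filterIt(it.version > currentVersion and it.version <= targetVersion)
    .mapIt(it.scriptContent)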
@@ -17,7 +17,8 @@ import
   ./waku_archive/test_driver_sqlite_query,
   ./waku_archive/test_driver_sqlite,
   ./waku_archive/test_retention_policy,
-  ./waku_archive/test_waku_archive
+  ./waku_archive/test_waku_archive,
+  ./waku_archive/test_partition_manager

 const os* {.strdefine.} = ""
 when os == "Linux" and
@@ -24,7 +24,7 @@ import

 procSuite "Waku Metadata Protocol":
   asyncTest "request() returns the supported metadata of the peer":
-    let clusterId = 10.uint32
+    let clusterId = 10.uint16
     let
       node1 = newTestWakuNode(
         generateSecp256k1Key(),
@@ -35,7 +35,7 @@ proc defaultTestWakuNodeConf*(): WakuNodeConf =
     nat: "any",
     maxConnections: 50,
     maxMessageSize: "1024 KiB",
-    clusterId: 0.uint32,
+    clusterId: 0,
     pubsubTopics: @["/waku/2/rs/1/0"],
     relay: true,
     storeMessageDbUrl: "sqlite://store.sqlite3",
@@ -0,0 +1,16 @@
+{.used.}
+
+import testutils/unittests, chronos
+import
+  ../../../waku/waku_archive/driver/postgres_driver/partitions_manager,
+  ../../../waku/waku_core/time
+
+suite "Partition Manager":
+  test "Calculate end partition time":
+    # 1717372850 == Mon Jun 03 2024 00:00:50 GMT+0000
+    # 1717376400 == Mon Jun 03 2024 01:00:00 GMT+0000
+    check 1717376400 == partitions_manager.calcEndPartitionTime(Timestamp(1717372850))
+
+    # 1717372800 == Mon Jun 03 2024 00:00:00 GMT+0000
+    # 1717376400 == Mon Jun 03 2024 01:00:00 GMT+0000
+    check 1717376400 == partitions_manager.calcEndPartitionTime(Timestamp(1717372800))
@@ -72,7 +72,12 @@ proc readValue*[T](r: var EnvvarReader, value: var T) {.raises: [SerializationEr
   elif T is (seq or array):
     when uTypeIsPrimitives(T):
       let key = constructKey(r.prefix, r.key)
-      getValue(key, value)
+      try:
+        getValue(key, value)
+      except ValueError:
+        raise newException(
+          SerializationError, "Couldn't get value: " & getCurrentExceptionMsg()
+        )
     else:
       let key = r.key[^1]
       for i in 0 ..< value.len:
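The change narrows a low-level ValueError into the serialization framework's own error type. Since the same shape recurs, it could be factored into a small template; a sketch only, with a hypothetical wrapValueError helper that is not part of this diff:

# Hypothetical helper: run `body`, surfacing ValueError as SerializationError.
template wrapValueError(body: untyped) =
  try:
    body
  except ValueError:
    raise newException(
      SerializationError, "Couldn't get value: " & getCurrentExceptionMsg()
    )

# The call site above would then read:
#   wrapValueError:
#     getValue(key, value)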
@@ -42,7 +42,9 @@ proc getValue*(key: string, outVal: var string) {.raises: [ValueError].} =
   outVal.setLen(size)
   decodePaddedHex(hex, cast[ptr UncheckedArray[byte]](outVal[0].addr), size)

-proc getValue*[T: SomePrimitives](key: string, outVal: var seq[T]) =
+proc getValue*[T: SomePrimitives](
+    key: string, outVal: var seq[T]
+) {.raises: [ValueError].} =
   let hex = os.getEnv(key)
   let byteSize = (hex.len div 2) + (hex.len and 0x01)
   let size = (byteSize + sizeof(T) - 1) div sizeof(T)
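A quick worked example of the size arithmetic above, assuming T = uint16 and an environment value of "0A0B0C" (three bytes of padded hex): the byte count is derived from the hex length and then rounded up to whole elements.

let hex = "0A0B0C"
let byteSize = (hex.len div 2) + (hex.len and 0x01)  # 6 hex chars -> 3 bytes
let size = (byteSize + sizeof(uint16) - 1) div sizeof(uint16)  # ceil(3/2) -> 2
doAssert byteSize == 3 and size == 2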
@@ -30,8 +30,6 @@ type ProtectedTopic* = object
   topic*: string
   key*: secp256k1.SkPublicKey

-type ShardIdx = distinct uint16
-
 type EthRpcUrl* = distinct string

 type StartUpCommand* = enum
@@ -140,7 +138,7 @@ type WakuNodeConf* = object
       "Cluster id that the node is running in. Node in a different cluster id is disconnected.",
     defaultValue: 0,
     name: "cluster-id"
-  .}: uint32
+  .}: uint16

   agentString* {.
     defaultValue: "nwaku",
@@ -307,7 +305,7 @@ type WakuNodeConf* = object
     desc: "Shards index to subscribe to [0..MAX_SHARDS-1]. Argument may be repeated.",
     defaultValue: @[],
     name: "shard"
-  .}: seq[ShardIdx]
+  .}: seq[uint16]

   contentTopics* {.
     desc: "Default content topic to subscribe to. Argument may be repeated.",
@@ -663,15 +661,6 @@ proc defaultColocationLimit*(): int =
 proc completeCmdArg*(T: type Port, val: string): seq[string] =
   return @[]

-proc completeCmdArg*(T: type ShardIdx, val: string): seq[ShardIdx] =
-  return @[]
-
-proc parseCmdArg*(T: type ShardIdx, p: string): T =
-  try:
-    ShardIdx(parseInt(p))
-  except CatchableError:
-    raise newException(ValueError, "Invalid shard index")
-
 proc completeCmdArg*(T: type EthRpcUrl, val: string): seq[string] =
   return @[]

@@ -732,22 +721,6 @@ proc readValue*(
   except CatchableError:
     raise newException(SerializationError, getCurrentExceptionMsg())

-proc readValue*(
-    r: var TomlReader, value: var ShardIdx
-) {.raises: [SerializationError].} =
-  try:
-    value = parseCmdArg(ShardIdx, r.readValue(string))
-  except CatchableError:
-    raise newException(SerializationError, getCurrentExceptionMsg())
-
-proc readValue*(
-    r: var EnvvarReader, value: var ShardIdx
-) {.raises: [SerializationError].} =
-  try:
-    value = parseCmdArg(ShardIdx, r.readValue(string))
-  except CatchableError:
-    raise newException(SerializationError, getCurrentExceptionMsg())
-
 proc readValue*(
     r: var TomlReader, value: var EthRpcUrl
 ) {.raises: [SerializationError].} =
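With shards now plain uint16 values, the CLI, TOML, and envvar readers can fall back on their stock integer handling, so the distinct-type plumbing above becomes dead code. For reference, a sketch of the equivalent plain parse, with a hypothetical parseShard helper mirroring the removed parseCmdArg:

import std/strutils

proc parseShard(p: string): uint16 =
  try:
    uint16(parseInt(p))
  except CatchableError:
    raise newException(ValueError, "Invalid shard index")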
@@ -45,7 +45,7 @@ proc enrConfiguration*(
     shards = toSeq(conf.shards.mapIt(uint16(it)))

   enrBuilder.withWakuRelaySharding(
-    RelayShards(clusterId: uint16(conf.clusterId), shardIds: shards)
+    RelayShards(clusterId: conf.clusterId, shardIds: shards)
   ).isOkOr:
     return err("could not initialize ENR with shards")

@@ -5,7 +5,7 @@ else:

 type ClusterConf* = object
   maxMessageSize*: string
-  clusterId*: uint32
+  clusterId*: uint16
   rlnRelay*: bool
   rlnRelayEthContractAddress*: string
   rlnRelayDynamic*: bool
@@ -21,7 +21,7 @@ type ClusterConf* = object
 # overrides existing cli configuration
 proc ClusterZeroConf*(T: type ClusterConf): ClusterConf =
   return ClusterConf(
-    clusterId: 0.uint32,
+    clusterId: 0,
     pubsubTopics:
       @["/waku/2/default-waku/proto"] # TODO: Add more config such as bootstrap, etc
     ,
@@ -33,7 +33,7 @@ proc ClusterZeroConf*(T: type ClusterConf): ClusterConf =
 proc TheWakuNetworkConf*(T: type ClusterConf): ClusterConf =
   return ClusterConf(
     maxMessageSize: "150KiB",
-    clusterId: 1.uint32,
+    clusterId: 1,
     rlnRelay: true,
     rlnRelayEthContractAddress: "0xF471d71E9b1455bBF4b85d475afb9BB0954A29c4",
     rlnRelayDynamic: true,
@@ -12,7 +12,7 @@ import ../waku_enr

 type NetConfig* = object
   hostAddress*: MultiAddress
-  clusterId*: uint32
+  clusterId*: uint16
   wsHostAddress*: Option[MultiAddress]
   hostExtAddress*: Option[MultiAddress]
   wsExtAddress*: Option[MultiAddress]
@@ -78,7 +78,7 @@ proc init*(
     wssEnabled: bool = false,
     dns4DomainName = none(string),
     discv5UdpPort = none(Port),
-    clusterId: uint32 = 0,
+    clusterId: uint16 = 0,
     wakuFlags = none(CapabilitiesBitfield),
 ): NetConfigResult =
   ## Initialize and validate waku node network configuration
@@ -212,7 +212,7 @@ proc mountMetadata*(node: WakuNode, clusterId: uint32): Result[void, string] =

 ## Waku Sharding
 proc mountSharding*(
-    node: WakuNode, clusterId: uint32, shardCount: uint32
+    node: WakuNode, clusterId: uint16, shardCount: uint32
 ): Result[void, string] =
   info "mounting sharding", clusterId = clusterId, shardCount = shardCount
   node.wakuSharding = Sharding(clusterId: clusterId, shardCountGenZero: shardCount)
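A usage sketch against the new signature, assuming an already-initialized node: WakuNode inside a setup proc that returns Result[void, string]:

# clusterId is now uint16; shardCount stays uint32.
node.mountSharding(clusterId = 1'u16, shardCount = 8'u32).isOkOr:
  return err("failed to mount sharding: " & error)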
@@ -9,7 +9,7 @@ import
 logScope:
   topics = "waku archive migration"

-const SchemaVersion* = 4 # increase this when there is an update in the database schema
+const SchemaVersion* = 5 # increase this when there is an update in the database schema

 proc breakIntoStatements*(script: string): seq[string] =
   ## Given a full migration script, that can potentially contain a list
@@ -4,8 +4,9 @@
 ## The created partitions are referenced by the 'storedAt' field.
 ##

-import std/deques
+import std/[deques, times]
 import chronos, chronicles
+import ../../../waku_core/time

 logScope:
   topics = "waku archive partitions_manager"
@@ -95,6 +96,36 @@ proc containsMoment*(partition: Partition, time: int64): bool =

   return false

+proc calcEndPartitionTime*(startTime: Timestamp): Timestamp =
+  ## Each partition has an "startTime" and "end" time. This proc calculates the "end" time so that
+  ## it precisely matches the next o'clock time.
+  ## This considers that the partitions should be 1 hour long.
+  ## For example, if `startTime` == 14:28 , then the returned end time should be 15:00.
+  ## Notice both `startTime` and returned time are in seconds since Epoch.
+  ##
+  ## startTime - seconds from Epoch that represents the partition start time
+
+  let startDateTime: DateTime = times.fromUnix(startTime).utc()
+
+  let maxPartitionDuration: times.Duration = times.initDuration(hours = 1)
+  ## Max time range covered by each parition
+  ## It is max because we aim to make the partition times synced to
+  ## o'clock hours. i.e. each partition edge will have min == sec == nanosec == 0
+
+  let endDateTime = startDateTime + maxPartitionDuration
+  let endDateTimeOClock = times.dateTime(
+    year = endDateTime.year,
+    month = endDateTime.month,
+    monthday = endDateTime.monthday,
+    hour = endDateTime.hour,
+    minute = 0,
+    second = 0,
+    nanosecond = 0,
+    zone = utc(),
+  )
+
+  return Timestamp(endDateTimeOClock.toTime().toUnix())
+
 proc getName*(partition: Partition): string =
   return partition.name

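Because partition edges are aligned to whole hours, the DateTime round-trip above reduces to integer arithmetic on epoch seconds. A sketch assuming non-negative timestamps, checked against both vectors from test_partition_manager:

proc calcEndPartitionTimeArith(startTime: int64): int64 =
  (startTime div 3600 + 1) * 3600

doAssert calcEndPartitionTimeArith(1717372850) == 1717376400
doAssert calcEndPartitionTimeArith(1717372800) == 1717376400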
@@ -34,8 +34,11 @@ const InsertRowStmtDefinition = # TODO: get the sql queries from a file
   version, timestamp, meta) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, CASE WHEN $9 = '' THEN NULL ELSE $9 END) ON CONFLICT DO NOTHING;"""

 const SelectNoCursorAscStmtName = "SelectWithoutCursorAsc"
-const SelectClause = """SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash, meta FROM messages """
-const SelectNoCursorAscStmtDef = SelectClause & """
+const SelectClause =
+  """SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash, meta FROM messages """
+const SelectNoCursorAscStmtDef =
+  SelectClause &
+  """
   WHERE contentTopic IN ($1) AND
         messageHash IN ($2) AND
         pubsubTopic = $3 AND
@@ -44,7 +47,9 @@ const SelectNoCursorAscStmtDef = SelectClause & """
   ORDER BY storedAt ASC, messageHash ASC LIMIT $6;"""

 const SelectNoCursorDescStmtName = "SelectWithoutCursorDesc"
-const SelectNoCursorDescStmtDef = SelectClause & """
+const SelectNoCursorDescStmtDef =
+  SelectClause &
+  """
   WHERE contentTopic IN ($1) AND
         messageHash IN ($2) AND
         pubsubTopic = $3 AND
@@ -53,7 +58,9 @@ const SelectNoCursorDescStmtDef = SelectClause & """
   ORDER BY storedAt DESC, messageHash DESC LIMIT $6;"""

 const SelectWithCursorDescStmtName = "SelectWithCursorDesc"
-const SelectWithCursorDescStmtDef = SelectClause & """
+const SelectWithCursorDescStmtDef =
+  SelectClause &
+  """
   WHERE contentTopic IN ($1) AND
         messageHash IN ($2) AND
         pubsubTopic = $3 AND
@@ -63,7 +70,9 @@ const SelectWithCursorDescStmtDef = SelectClause & """
   ORDER BY storedAt DESC, messageHash DESC LIMIT $8;"""

 const SelectWithCursorAscStmtName = "SelectWithCursorAsc"
-const SelectWithCursorAscStmtDef = SelectClause & """
+const SelectWithCursorAscStmtDef =
+  SelectClause &
+  """
   WHERE contentTopic IN ($1) AND
         messageHash IN ($2) AND
         pubsubTopic = $3 AND
@@ -76,7 +85,9 @@ const SelectMessageByHashName = "SelectMessageByHash"
 const SelectMessageByHashDef = SelectClause & """WHERE messageHash = $1"""

 const SelectNoCursorV2AscStmtName = "SelectWithoutCursorV2Asc"
-const SelectNoCursorV2AscStmtDef = SelectClause & """
+const SelectNoCursorV2AscStmtDef =
+  SelectClause &
+  """
   WHERE contentTopic IN ($1) AND
         pubsubTopic = $2 AND
         storedAt >= $3 AND
@@ -84,7 +95,9 @@ const SelectNoCursorV2AscStmtDef = SelectClause & """
   ORDER BY storedAt ASC LIMIT $5;"""

 const SelectNoCursorV2DescStmtName = "SelectWithoutCursorV2Desc"
-const SelectNoCursorV2DescStmtDef = SelectClause & """
+const SelectNoCursorV2DescStmtDef =
+  SelectClause &
+  """
   WHERE contentTopic IN ($1) AND
         pubsubTopic = $2 AND
         storedAt >= $3 AND
@@ -92,7 +105,9 @@ const SelectNoCursorV2DescStmtDef = SelectClause & """
   ORDER BY storedAt DESC LIMIT $5;"""

 const SelectWithCursorV2DescStmtName = "SelectWithCursorV2Desc"
-const SelectWithCursorV2DescStmtDef = SelectClause & """
+const SelectWithCursorV2DescStmtDef =
+  SelectClause &
+  """
   WHERE contentTopic IN ($1) AND
         pubsubTopic = $2 AND
         (storedAt, id) < ($3,$4) AND
@@ -101,7 +116,9 @@ const SelectWithCursorV2DescStmtDef = SelectClause & """
   ORDER BY storedAt DESC LIMIT $7;"""

 const SelectWithCursorV2AscStmtName = "SelectWithCursorV2Asc"
-const SelectWithCursorV2AscStmtDef = SelectClause & """
+const SelectWithCursorV2AscStmtDef =
+  SelectClause &
+  """
   WHERE contentTopic IN ($1) AND
         pubsubTopic = $2 AND
         (storedAt, id) > ($3,$4) AND
@@ -892,14 +909,14 @@ proc performWriteQuery*(
   return ok()

 proc addPartition(
-    self: PostgresDriver, startTime: Timestamp, duration: timer.Duration
+    self: PostgresDriver, startTime: Timestamp
 ): Future[ArchiveDriverResult[void]] {.async.} =
   ## Creates a partition table that will store the messages that fall in the range
   ## `startTime` <= storedAt < `startTime + duration`.
   ## `startTime` is measured in seconds since epoch

   let beginning = startTime
-  let `end` = (startTime + duration.seconds)
+  let `end` = partitions_manager.calcEndPartitionTime(startTime)

   let fromInSec: string = $beginning
   let untilInSec: string = $`end`
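With the duration parameter gone, a partition's end is derived from its start, so every edge lands on an o'clock boundary no matter who calls addPartition. A sketch of the resulting bounds, reusing a vector from the test file above (Timestamp carries seconds here, as in the tests):

let beginning = Timestamp(1717372850)
let `end` = partitions_manager.calcEndPartitionTime(beginning)
doAssert int64(`end`) == 1717376400  # next o'clock edge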
@@ -914,6 +931,11 @@ proc addPartition(
     "messages FOR VALUES FROM ('" & fromInNanoSec & "') TO ('" & untilInNanoSec & "');"

   (await self.performWriteQuery(createPartitionQuery)).isOkOr:
+    if error.contains("already exists"):
+      debug "skip create new partition as it already exists: ", skipped_error = $error
+      return ok()
+
+    ## for any different error, just consider it
     return err(fmt"error adding partition [{partitionName}]: " & $error)

   debug "new partition added", query = createPartitionQuery
@@ -953,7 +975,6 @@ proc initializePartitionsInfo(
   return ok()

 const DefaultDatabasePartitionCheckTimeInterval = timer.minutes(10)
-const PartitionsRangeInterval = timer.hours(1) ## Time range covered by each parition

 proc loopPartitionFactory(
     self: PostgresDriver, onFatalError: OnFatalErrorHandler
@@ -963,11 +984,6 @@ proc loopPartitionFactory(

   debug "starting loopPartitionFactory"

-  if PartitionsRangeInterval < DefaultDatabasePartitionCheckTimeInterval:
-    onFatalError(
-      "partition factory partition range interval should be bigger than check interval"
-    )
-
   ## First of all, let's make the 'partition_manager' aware of the current partitions
   (await self.initializePartitionsInfo()).isOkOr:
     onFatalError("issue in loopPartitionFactory: " & $error)
@@ -979,7 +995,7 @@ proc loopPartitionFactory(

     if self.partitionMngr.isEmpty():
       debug "adding partition because now there aren't more partitions"
-      (await self.addPartition(now, PartitionsRangeInterval)).isOkOr:
+      (await self.addPartition(now)).isOkOr:
         onFatalError("error when creating a new partition from empty state: " & $error)
     else:
       let newestPartitionRes = self.partitionMngr.getNewestPartition()
@@ -992,18 +1008,14 @@ proc loopPartitionFactory(
         ## The current used partition is the last one that was created.
         ## Thus, let's create another partition for the future.

-        (
-          await self.addPartition(
-            newestPartition.getLastMoment(), PartitionsRangeInterval
-          )
-        ).isOkOr:
+        (await self.addPartition(newestPartition.getLastMoment())).isOkOr:
           onFatalError("could not add the next partition for 'now': " & $error)
       elif now >= newestPartition.getLastMoment():
         debug "creating a new partition to contain current messages"
         ## There is no partition to contain the current time.
         ## This happens if the node has been stopped for quite a long time.
         ## Then, let's create the needed partition to contain 'now'.
-        (await self.addPartition(now, PartitionsRangeInterval)).isOkOr:
+        (await self.addPartition(now)).isOkOr:
           onFatalError("could not add the next partition: " & $error)

     await sleepAsync(DefaultDatabasePartitionCheckTimeInterval)
@@ -12,11 +12,11 @@ import nimcrypto, std/options, std/tables, stew/endians2, stew/results, stew/byt
 import ./content_topic, ./pubsub_topic

 type Sharding* = object
-  clusterId*: uint32
+  clusterId*: uint16
   # TODO: generations could be stored in a table here
   shardCountGenZero*: uint32

-proc new*(T: type Sharding, clusterId: uint32, shardCount: uint32): T =
+proc new*(T: type Sharding, clusterId: uint16, shardCount: uint32): T =
   return Sharding(clusterId: clusterId, shardCountGenZero: shardCount)

 proc getGenZeroShard*(s: Sharding, topic: NsContentTopic, count: int): NsPubsubTopic =
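Usage sketch with the narrowed constructor (literal suffixes spell out the types):

let sharding = Sharding.new(clusterId = 1'u16, shardCount = 8'u32)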
@@ -30,7 +30,7 @@ proc getGenZeroShard*(s: Sharding, topic: NsContentTopic, count: int): NsPubsubT
   # This is equilavent to modulo shard count but faster
   let shard = hashValue and uint64((count - 1))

-  NsPubsubTopic.staticSharding(uint16(s.clusterId), uint16(shard))
+  NsPubsubTopic.staticSharding(s.clusterId, uint16(shard))

 proc getShard*(s: Sharding, topic: NsContentTopic): Result[NsPubsubTopic, string] =
   ## Compute the (pubsub topic) shard to use for this content topic.
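The mask in getGenZeroShard equals a modulo whenever the shard count is a power of two; a quick self-contained check:

let count = 8  # must be a power of two for the mask trick to hold
let hashValue = 0xDEADBEEF'u64
doAssert (hashValue and uint64(count - 1)) == hashValue mod uint64(count)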
@@ -260,7 +260,7 @@ proc containsShard*(r: Record, topic: PubsubTopic | string): bool =

   containsShard(r, parseRes.value)

-proc isClusterMismatched*(record: Record, clusterId: uint32): bool =
+proc isClusterMismatched*(record: Record, clusterId: uint16): bool =
   ## Check the ENR sharding info for matching cluster id
   if (let typedRecord = record.toTyped(); typedRecord.isOk()):
     if (let relayShard = typedRecord.get().relaySharding(); relayShard.isSome()):