2023-08-01 09:05:16 -04:00
## Waku autosharding utils
## See 51/WAKU2-RELAY-SHARDING RFC: https://rfc.vac.dev/spec/51/#automatic-sharding
# Enable strict exception tracking for every routine declared below.
# Compilers older than Nim 1.4 get `Defect` listed explicitly in the allowed
# raises list; newer compilers get an empty list. Exactly one of the two
# pragmas must be pushed, hence the `else` branch.
when (NimMajor, NimMinor) < (1, 4):
  {.push raises: [Defect].}
else:
  {.push raises: [].}
2023-08-17 08:11:18 -04:00
2023-08-01 09:05:16 -04:00
## For index allocation and other magic numbers, refer to RFC 51
2023-08-17 08:11:18 -04:00
const
  ClusterIndex* = 1 ## Cluster value handed to `staticSharding` below.
  GenerationZeroShardsCount* = 8 ## Shard count used for generation zero.
proc getGenZeroShard*(topic: NsContentTopic, count: int): NsPubsubTopic =
  ## Maps a content topic onto one of `count` generation zero shards.
  ##
  ## The shard index is derived from the SHA-256 digest of the topic's
  ## `application` and `version` fields, reduced modulo `count`. The result is
  ## built with `NsPubsubTopic.staticSharding` using `ClusterIndex`.
  ##
  ## `count` must be positive. The index is narrowed to `uint16`, so very
  ## large counts may produce an index that does not fit.
  doAssert count > 0, "shard count must be positive"

  let bytes = toBytes(topic.application) & toBytes(topic.version)
  let hash = sha256.digest(bytes)

  # Only the last 64 bits of the hash are used, as having more shards is
  # unlikely.
  let hashValue = uint64.fromBytesBE(hash.data[24..31])

  # A real modulo is used instead of the `and (count - 1)` bitmask: the mask is
  # only equivalent to modulo when `count` is a power of two, and would leave
  # some shards unreachable for any other count.
  let shard = hashValue mod uint64(count)

  NsPubsubTopic.staticSharding(ClusterIndex, uint16(shard))
proc getShard*(topic: NsContentTopic): Result[NsPubsubTopic, string] =
  ## Resolves the pubsub topic (shard) for the given content topic.
  ##
  ## A content topic without an explicit generation is handled as generation
  ## zero. Any other generation yields an error.
  let explicitGen = topic.generation

  # Guard clause: only generation zero (explicit or implied) is handled.
  if explicitGen.isSome() and explicitGen.get() != 0:
    return err("Generation > 0 are not supported yet")

  ok(getGenZeroShard(topic, GenerationZeroShardsCount))
2023-08-31 16:13:45 -04:00
proc getShard*(topic: ContentTopic): Result[PubsubTopic, string] =
  ## Computes the shard for a raw content topic.
  ##
  ## The topic is parsed with `NsContentTopic.parse` and then passed to the
  ## `NsContentTopic` overload of `getShard`. A parse failure or an
  ## autosharding failure is returned as an error string.
  let parsedTopic = NsContentTopic.parse(topic).valueOr:
    return err($error)

  let shard = ?getShard(parsedTopic)

  # The computed shard must actually be returned; without this the proc ends on
  # a `let` and the shard is discarded.
  # NOTE(review): assumes `$` on NsPubsubTopic yields its PubsubTopic string
  # form - confirm against the pubsub topic module.
  ok($shard)
2023-08-17 08:11:18 -04:00
proc parseSharding*(
    pubsubTopic: Option[PubsubTopic],
    contentTopics: ContentTopic|seq[ContentTopic]
): Result[Table[NsPubsubTopic, seq[NsContentTopic]], string] =
  ## Groups content topics by the pubsub topic (shard) they belong to.
  ##
  ## `contentTopics` may be a single content topic or a sequence of them.
  ## When `pubsubTopic` is set, every content topic is filed under that
  ## (parsed) pubsub topic; otherwise the shard is computed per content topic
  ## with `getShard`.
  ##
  ## Returns an error string when a content topic or the pubsub topic cannot
  ## be parsed, or when autosharding a content topic fails.

  # Normalise the argument so a single topic and a list share one code path.
  let topics =
    when contentTopics is seq[ContentTopic]:
      contentTopics
    else:
      @[contentTopics]

  var topicMap = initTable[NsPubsubTopic, seq[NsContentTopic]]()

  for contentTopic in topics:
    let content = NsContentTopic.parse(contentTopic).valueOr:
      return err("Cannot parse content topic: " & $error)

    # The pubsub topic is resolved inside the loop on purpose: an empty input
    # never touches `pubsubTopic`, and content topic errors are reported first.
    let pubsub =
      if pubsubTopic.isSome():
        let explicitTopic = NsPubsubTopic.parse(pubsubTopic.get()).valueOr:
          return err("Cannot parse pubsub topic: " & $error)
        explicitTopic
      else:
        let autoTopic = getShard(content).valueOr:
          return err("Cannot autoshard content topic: " & $error)
        autoTopic

    # `mgetOrPut` inserts an empty list on first sight of a pubsub topic and
    # returns a mutable reference, so no separate `hasKey` check or KeyError
    # handling is needed.
    topicMap.mgetOrPut(pubsub, @[]).add(content)

  ok(topicMap)
#type ShardsPriority = seq[tuple[topic: NsPubsubTopic, value: float64]]
#[ proc shardCount*(topic: NsContentTopic): Result[int, string] =
## Returns the total shard count from the content topic.
2023-08-01 09:05:16 -04:00
let shardCount =
if topic.generation.isNone():
## Implicit generation # is 0 for all content topic
case topic.generation.get():
of 0:
return err("Generation > 0 are not supported yet")
2023-08-17 08:11:18 -04:00
ok((shardCount)) ]#
2023-08-01 09:05:16 -04:00
2023-08-17 08:11:18 -04:00
#[ proc applyWeight(hashValue: uint64, weight: float64): float64 =
(-weight) / math.ln(float64(hashValue) / float64(high(uint64))) ]#
2023-08-01 09:05:16 -04:00
2023-08-17 08:11:18 -04:00
#[ proc hashOrder*(x, y: (NsPubsubTopic, float64)): int =
cmp(x[1], y[1]) ]#
2023-08-01 09:05:16 -04:00
2023-08-17 08:11:18 -04:00
#[ proc weightedShardList*(topic: NsContentTopic, shardCount: int, weightList: seq[float64]): Result[ShardsPriority, string] =
2023-08-01 09:05:16 -04:00
## Returns the ordered list of shards and their priority values.
if weightList.len < shardCount:
return err("Must provide weights for every shards")
let shardsNWeights = zip(toSeq(0..shardCount), weightList)
var list = newSeq[(NsPubsubTopic, float64)](shardCount)
for (shard, weight) in shardsNWeights:
let pubsub = NsPubsubTopic.staticSharding(ClusterIndex, uint16(shard))
let clusterBytes = toBytesBE(uint16(ClusterIndex))
let shardBytes = toBytesBE(uint16(shard))
let bytes = toBytes(topic.application) & toBytes(topic.version) & @clusterBytes & @shardBytes
let hash = sha256.digest(bytes)
let hashValue = uint64.fromBytesBE(hash.data)
let value = applyWeight(hashValue, weight)
list[shard] = (pubsub, value)
2023-08-17 08:11:18 -04:00
ok(list) ]#
2023-08-01 09:05:16 -04:00
2023-08-17 08:11:18 -04:00
#[ proc singleHighestWeigthShard*(topic: NsContentTopic): Result[NsPubsubTopic, string] =
2023-08-01 09:05:16 -04:00
let count = ? shardCount(topic)
2023-08-17 08:11:18 -04:00
let weights = repeat(1.0, count)
2023-08-01 09:05:16 -04:00
let list = ? weightedShardList(topic, count, weights)
let (pubsub, _) = list[list.len - 1]
2023-08-17 08:11:18 -04:00
ok(pubsub) ]#