mirror of
https://github.com/logos-messaging/logos-delivery.git
synced 2026-02-27 21:53:16 +00:00
Merge 3c4d32fb6707e2af9f27a63f1daa2b17d85a6f63 into ba85873f03a1da6ab04287949849815fd97b7bfd
This commit is contained in:
commit
0402542a23
@ -24,7 +24,7 @@ suite "Autosharding":
|
||||
suite "getGenZeroShard":
|
||||
test "Generate Gen0 Shard":
|
||||
let sharding =
|
||||
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
|
||||
Sharding.init(ClusterId, GenerationZeroShardsCount)
|
||||
|
||||
# Given two valid topics
|
||||
let
|
||||
@ -68,7 +68,7 @@ suite "Autosharding":
|
||||
suite "getShard from NsContentTopic":
|
||||
test "Generate Gen0 Shard with topic.generation==none":
|
||||
let sharding =
|
||||
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
|
||||
Sharding.init(ClusterId, GenerationZeroShardsCount)
|
||||
|
||||
# When we get a shard from a topic without generation
|
||||
let shard = sharding.getShard(contentTopicShort)
|
||||
@ -79,7 +79,7 @@ suite "Autosharding":
|
||||
|
||||
test "Generate Gen0 Shard with topic.generation==0":
|
||||
let sharding =
|
||||
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
|
||||
Sharding.init(ClusterId, GenerationZeroShardsCount)
|
||||
# When we get a shard from a gen0 topic
|
||||
let shard = sharding.getShard(contentTopicFull)
|
||||
|
||||
@ -89,7 +89,7 @@ suite "Autosharding":
|
||||
|
||||
test "Generate Gen0 Shard with topic.generation==other":
|
||||
let sharding =
|
||||
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
|
||||
Sharding.init(ClusterId, GenerationZeroShardsCount)
|
||||
# When we get a shard from ain invalid content topic
|
||||
let shard = sharding.getShard(contentTopicInvalid)
|
||||
|
||||
@ -100,7 +100,7 @@ suite "Autosharding":
|
||||
suite "getShard from ContentTopic":
|
||||
test "Generate Gen0 Shard with topic.generation==none":
|
||||
let sharding =
|
||||
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
|
||||
Sharding.init(ClusterId, GenerationZeroShardsCount)
|
||||
# When we get a shard from it
|
||||
let shard = sharding.getShard(contentTopicShort)
|
||||
|
||||
@ -110,7 +110,7 @@ suite "Autosharding":
|
||||
|
||||
test "Generate Gen0 Shard with topic.generation==0":
|
||||
let sharding =
|
||||
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
|
||||
Sharding.init(ClusterId, GenerationZeroShardsCount)
|
||||
# When we get a shard from it
|
||||
let shard = sharding.getShard(contentTopicFull)
|
||||
|
||||
@ -120,7 +120,7 @@ suite "Autosharding":
|
||||
|
||||
test "Generate Gen0 Shard with topic.generation==other":
|
||||
let sharding =
|
||||
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
|
||||
Sharding.init(ClusterId, GenerationZeroShardsCount)
|
||||
# When we get a shard from it
|
||||
let shard = sharding.getShard(contentTopicInvalid)
|
||||
|
||||
@ -130,18 +130,18 @@ suite "Autosharding":
|
||||
|
||||
test "Generate Gen0 Shard invalid topic":
|
||||
let sharding =
|
||||
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
|
||||
Sharding.init(ClusterId, GenerationZeroShardsCount)
|
||||
# When we get a shard from it
|
||||
let shard = sharding.getShard("invalid")
|
||||
|
||||
# Then the generated shard is valid
|
||||
check:
|
||||
shard.error() == "invalid format: topic must start with slash"
|
||||
shard.error() == "invalid format: content-topic 'invalid' must start with slash"
|
||||
|
||||
suite "parseSharding":
|
||||
xsuite "parseSharding":
|
||||
test "contentTopics is ContentTopic":
|
||||
let sharding =
|
||||
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
|
||||
Sharding.init(ClusterId, GenerationZeroShardsCount)
|
||||
# When calling with contentTopic as string
|
||||
let topicMap = sharding.parseSharding(some(pubsubTopic04), contentTopicShort)
|
||||
|
||||
@ -151,7 +151,7 @@ suite "Autosharding":
|
||||
|
||||
test "contentTopics is seq[ContentTopic]":
|
||||
let sharding =
|
||||
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
|
||||
Sharding.init(ClusterId, GenerationZeroShardsCount)
|
||||
# When calling with contentTopic as string seq
|
||||
let topicMap = sharding.parseSharding(
|
||||
some(pubsubTopic04), @[contentTopicShort, "/0/foo/1/bar/proto"]
|
||||
@ -163,7 +163,7 @@ suite "Autosharding":
|
||||
|
||||
test "pubsubTopic is none":
|
||||
let sharding =
|
||||
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
|
||||
Sharding.init(ClusterId, GenerationZeroShardsCount)
|
||||
# When calling with pubsubTopic as none
|
||||
let topicMap = sharding.parseSharding(PubsubTopic.none(), contentTopicShort)
|
||||
|
||||
@ -173,7 +173,7 @@ suite "Autosharding":
|
||||
|
||||
test "content parse error":
|
||||
let sharding =
|
||||
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
|
||||
Sharding.init(ClusterId, GenerationZeroShardsCount)
|
||||
# When calling with pubsubTopic as none with invalid content
|
||||
let topicMap = sharding.parseSharding(PubsubTopic.none(), "invalid")
|
||||
|
||||
@ -184,7 +184,7 @@ suite "Autosharding":
|
||||
|
||||
test "pubsubTopic parse error":
|
||||
let sharding =
|
||||
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
|
||||
Sharding.init(ClusterId, GenerationZeroShardsCount)
|
||||
# When calling with pubsubTopic as none with invalid content
|
||||
let topicMap = sharding.parseSharding(some("invalid"), contentTopicShort)
|
||||
|
||||
@ -195,7 +195,7 @@ suite "Autosharding":
|
||||
|
||||
test "pubsubTopic getShard error":
|
||||
let sharding =
|
||||
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
|
||||
Sharding.init(ClusterId, GenerationZeroShardsCount)
|
||||
# When calling with pubsubTopic as none with invalid content
|
||||
let topicMap = sharding.parseSharding(PubsubTopic.none(), contentTopicInvalid)
|
||||
|
||||
@ -207,3 +207,113 @@ suite "Autosharding":
|
||||
xtest "catchable error on add to topicMap":
|
||||
# TODO: Trigger a CatchableError or mock
|
||||
discard
|
||||
|
||||
suite "Arbitrary sharder network, auto shard selection":
|
||||
const arbitraryShards = @[2'u16, 4, 8, 16, 32, 64, 128, 256]
|
||||
|
||||
test "Initialize with arbitrary shard list":
|
||||
# When we initialize sharding with a custom shard list
|
||||
let sharding = Sharding.init(ClusterId, arbitraryShards)
|
||||
|
||||
# Given valid content topics
|
||||
let
|
||||
nsContentTopic1 = NsContentTopic.parse(contentTopicShort).value()
|
||||
nsContentTopic2 = NsContentTopic.parse(contentTopicFull).value()
|
||||
nsContentTopic3 = NsContentTopic.parse(contentTopicShort2).value()
|
||||
nsContentTopic4 = NsContentTopic.parse(contentTopicFull2).value()
|
||||
nsContentTopic5 = NsContentTopic.parse(contentTopicShort3).value()
|
||||
nsContentTopic6 = NsContentTopic.parse(contentTopicFull4).value()
|
||||
|
||||
# When we generate shards from them
|
||||
let
|
||||
shard1 = sharding.getGenZeroShard(nsContentTopic1, arbitraryShards.len)
|
||||
shard2 = sharding.getGenZeroShard(nsContentTopic2, arbitraryShards.len)
|
||||
shard3 = sharding.getGenZeroShard(nsContentTopic3, arbitraryShards.len)
|
||||
shard4 = sharding.getGenZeroShard(nsContentTopic4, arbitraryShards.len)
|
||||
shard5 = sharding.getGenZeroShard(nsContentTopic5, arbitraryShards.len)
|
||||
shard6 = sharding.getGenZeroShard(nsContentTopic6, arbitraryShards.len)
|
||||
|
||||
# Then the generated shards use IDs from the arbitrary list
|
||||
check:
|
||||
shard1 == RelayShard(clusterId: ClusterId, shardId: 16)
|
||||
shard2 == RelayShard(clusterId: ClusterId, shardId: 16)
|
||||
shard3 == RelayShard(clusterId: ClusterId, shardId: 128)
|
||||
shard4 == RelayShard(clusterId: ClusterId, shardId: 128)
|
||||
shard5 == RelayShard(clusterId: ClusterId, shardId: 16)
|
||||
shard6 == RelayShard(clusterId: ClusterId, shardId: 256)
|
||||
|
||||
test "getShard with arbitrary shard list - generation none":
|
||||
# When we initialize sharding with a custom shard list
|
||||
let sharding = Sharding.init(ClusterId, arbitraryShards)
|
||||
|
||||
# When we get a shard from a topic without generation
|
||||
let shard = sharding.getShard(contentTopicShort)
|
||||
|
||||
# Then the generated shard uses an ID from the arbitrary list
|
||||
check:
|
||||
shard.value() == RelayShard(clusterId: ClusterId, shardId: 16)
|
||||
|
||||
test "getShard with arbitrary shard list - generation zero":
|
||||
# When we initialize sharding with a custom shard list
|
||||
let sharding = Sharding.init(ClusterId, arbitraryShards)
|
||||
|
||||
# When we get a shard from a gen0 topic
|
||||
let shard = sharding.getShard(contentTopicFull)
|
||||
|
||||
# Then the generated shard uses an ID from the arbitrary list
|
||||
check:
|
||||
shard.value() == RelayShard(clusterId: ClusterId, shardId: 16)
|
||||
|
||||
test "Multiple topics map to shards from arbitrary list":
|
||||
# When we initialize sharding with a custom shard list
|
||||
let sharding = Sharding.init(ClusterId, arbitraryShards)
|
||||
|
||||
# Given multiple content topics
|
||||
let contentTopics = @[
|
||||
contentTopicShort,
|
||||
contentTopicFull,
|
||||
contentTopicShort2,
|
||||
contentTopicFull2,
|
||||
contentTopicShort3,
|
||||
contentTopicFull3,
|
||||
]
|
||||
|
||||
# When we get shards for all topics
|
||||
let shards = @[
|
||||
sharding.getShard(contentTopicShort).value(),
|
||||
sharding.getShard(contentTopicFull).value(),
|
||||
sharding.getShard(contentTopicShort2).value(),
|
||||
sharding.getShard(contentTopicFull2).value(),
|
||||
sharding.getShard(contentTopicShort3).value(),
|
||||
sharding.getShard(contentTopicFull3).value(),
|
||||
]
|
||||
|
||||
# Then all shard IDs match expected values from the arbitrary list
|
||||
check:
|
||||
shards[0] == RelayShard(clusterId: ClusterId, shardId: 16)
|
||||
shards[1] == RelayShard(clusterId: ClusterId, shardId: 16)
|
||||
shards[2] == RelayShard(clusterId: ClusterId, shardId: 128)
|
||||
shards[3] == RelayShard(clusterId: ClusterId, shardId: 128)
|
||||
shards[4] == RelayShard(clusterId: ClusterId, shardId: 16)
|
||||
shards[5] == RelayShard(clusterId: ClusterId, shardId: 16)
|
||||
|
||||
test "Consistent shard mapping with arbitrary list":
|
||||
# When we initialize sharding with a custom shard list
|
||||
let sharding = Sharding.init(ClusterId, arbitraryShards)
|
||||
|
||||
# Given a content topic
|
||||
let topic = contentTopicShort
|
||||
|
||||
# When we get the shard multiple times
|
||||
let
|
||||
shard1 = sharding.getShard(topic)
|
||||
shard2 = sharding.getShard(topic)
|
||||
shard3 = sharding.getShard(topic)
|
||||
|
||||
# Then the shard is consistent
|
||||
check:
|
||||
shard1.isOk()
|
||||
shard2.isOk()
|
||||
shard3.isOk()
|
||||
shard1.value() == shard2.value()
|
||||
shard2.value() == shard3.value()
|
||||
|
||||
@ -18,7 +18,7 @@ proc newTestWakuLightpushNode*(
|
||||
): Future[WakuLightPush] {.async.} =
|
||||
let
|
||||
peerManager = PeerManager.new(switch)
|
||||
wakuAutoSharding = Sharding(clusterId: 1, shardCountGenZero: 8)
|
||||
wakuAutoSharding = Sharding.init(clusterId = 1, shardCount = 8)
|
||||
proto = WakuLightPush.new(
|
||||
peerManager, rng, handler, some(wakuAutoSharding), rateLimitSetting
|
||||
)
|
||||
|
||||
@ -298,7 +298,8 @@ proc setupProtocols(
|
||||
node.mountAutoSharding(conf.clusterId, conf.shardingConf.numShardsInCluster).isOkOr:
|
||||
return err("failed to mount waku auto sharding: " & error)
|
||||
else:
|
||||
warn("Auto sharding is disabled")
|
||||
node.mountAutoSharding(conf.clusterId, conf.subscribeShards).isOkOr:
|
||||
return err("failed to mount waku auto sharding: " & error)
|
||||
|
||||
# Mount relay on all nodes
|
||||
var peerExchangeHandler = none(RoutingRecordsHandler)
|
||||
|
||||
@ -301,8 +301,15 @@ proc mountAutoSharding*(
|
||||
node: WakuNode, clusterId: uint16, shardCount: uint32
|
||||
): Result[void, string] =
|
||||
info "mounting auto sharding", clusterId = clusterId, shardCount = shardCount
|
||||
node.wakuAutoSharding =
|
||||
some(Sharding(clusterId: clusterId, shardCountGenZero: shardCount))
|
||||
node.wakuAutoSharding = some(Sharding.init(clusterId, shardCount))
|
||||
|
||||
return ok()
|
||||
|
||||
proc mountAutoSharding*(
|
||||
node: WakuNode, clusterId: uint16, shards: seq[uint16]
|
||||
): Result[void, string] =
|
||||
info "mounting auto sharding", clusterId = clusterId, shards = shards
|
||||
node.wakuAutoSharding = some(Sharding.init(clusterId, shards))
|
||||
|
||||
return ok()
|
||||
|
||||
|
||||
@ -4,18 +4,30 @@
|
||||
|
||||
{.push raises: [].}
|
||||
|
||||
import nimcrypto, std/options, std/tables, stew/endians2, results, stew/byteutils
|
||||
import nimcrypto, std/[options, tables, sequtils], stew/[endians2, byteutils], results
|
||||
|
||||
import ./content_topic, ./pubsub_topic
|
||||
|
||||
# TODO: this is autosharding, not just "sharding"
|
||||
type Sharding* = object
|
||||
clusterId*: uint16
|
||||
clusterId: uint16
|
||||
# TODO: generations could be stored in a table here
|
||||
shardCountGenZero*: uint32
|
||||
shardCountGenZero: uint32
|
||||
supportedShards: seq[uint16]
|
||||
|
||||
proc new*(T: type Sharding, clusterId: uint16, shardCount: uint32): T =
|
||||
return Sharding(clusterId: clusterId, shardCountGenZero: shardCount)
|
||||
proc init*(T: typedesc[Sharding], clusterId: uint16, shardCount: uint32): T =
|
||||
return Sharding(
|
||||
clusterId: clusterId,
|
||||
shardCountGenZero: shardCount,
|
||||
supportedShards: toSeq(0'u16 ..< uint16(shardCount)),
|
||||
)
|
||||
|
||||
proc init*(T: typedesc[Sharding], clusterId: uint16, supportedShards: seq[uint16]): T =
|
||||
return Sharding(
|
||||
clusterId: clusterId,
|
||||
shardCountGenZero: uint32(supportedShards.len),
|
||||
supportedShards: supportedShards,
|
||||
)
|
||||
|
||||
proc getGenZeroShard*(s: Sharding, topic: NsContentTopic, count: int): RelayShard =
|
||||
let bytes = toBytes(topic.application) & toBytes(topic.version)
|
||||
@ -27,7 +39,7 @@ proc getGenZeroShard*(s: Sharding, topic: NsContentTopic, count: int): RelayShar
|
||||
|
||||
let shard = hashValue mod uint64(count)
|
||||
|
||||
RelayShard(clusterId: s.clusterId, shardId: uint16(shard))
|
||||
RelayShard(clusterId: s.clusterId, shardId: s.supportedShards[shard])
|
||||
|
||||
proc getShard*(s: Sharding, topic: NsContentTopic): Result[RelayShard, string] =
|
||||
## Compute the (pubsub topic) shard to use for this content topic.
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user