mirror of
https://github.com/logos-messaging/logos-delivery.git
synced 2026-02-27 21:53:16 +00:00
Extend AutoSharding with capability to handle arbitrary set of shards, and enable using it with static sharded network configurations. This can fill the gap that new messaging API does not support static sharding at all, this enhacement let's such API just cooperate with any network shard configuration without compromising backward compatibility of kernel API.
This commit is contained in:
parent
dd8dc7429d
commit
3c4d32fb67
@ -24,7 +24,7 @@ suite "Autosharding":
|
||||
suite "getGenZeroShard":
|
||||
test "Generate Gen0 Shard":
|
||||
let sharding =
|
||||
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
|
||||
Sharding.init(ClusterId, GenerationZeroShardsCount)
|
||||
|
||||
# Given two valid topics
|
||||
let
|
||||
@ -68,7 +68,7 @@ suite "Autosharding":
|
||||
suite "getShard from NsContentTopic":
|
||||
test "Generate Gen0 Shard with topic.generation==none":
|
||||
let sharding =
|
||||
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
|
||||
Sharding.init(ClusterId, GenerationZeroShardsCount)
|
||||
|
||||
# When we get a shard from a topic without generation
|
||||
let shard = sharding.getShard(contentTopicShort)
|
||||
@ -79,7 +79,7 @@ suite "Autosharding":
|
||||
|
||||
test "Generate Gen0 Shard with topic.generation==0":
|
||||
let sharding =
|
||||
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
|
||||
Sharding.init(ClusterId, GenerationZeroShardsCount)
|
||||
# When we get a shard from a gen0 topic
|
||||
let shard = sharding.getShard(contentTopicFull)
|
||||
|
||||
@ -89,7 +89,7 @@ suite "Autosharding":
|
||||
|
||||
test "Generate Gen0 Shard with topic.generation==other":
|
||||
let sharding =
|
||||
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
|
||||
Sharding.init(ClusterId, GenerationZeroShardsCount)
|
||||
# When we get a shard from ain invalid content topic
|
||||
let shard = sharding.getShard(contentTopicInvalid)
|
||||
|
||||
@ -100,7 +100,7 @@ suite "Autosharding":
|
||||
suite "getShard from ContentTopic":
|
||||
test "Generate Gen0 Shard with topic.generation==none":
|
||||
let sharding =
|
||||
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
|
||||
Sharding.init(ClusterId, GenerationZeroShardsCount)
|
||||
# When we get a shard from it
|
||||
let shard = sharding.getShard(contentTopicShort)
|
||||
|
||||
@ -110,7 +110,7 @@ suite "Autosharding":
|
||||
|
||||
test "Generate Gen0 Shard with topic.generation==0":
|
||||
let sharding =
|
||||
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
|
||||
Sharding.init(ClusterId, GenerationZeroShardsCount)
|
||||
# When we get a shard from it
|
||||
let shard = sharding.getShard(contentTopicFull)
|
||||
|
||||
@ -120,7 +120,7 @@ suite "Autosharding":
|
||||
|
||||
test "Generate Gen0 Shard with topic.generation==other":
|
||||
let sharding =
|
||||
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
|
||||
Sharding.init(ClusterId, GenerationZeroShardsCount)
|
||||
# When we get a shard from it
|
||||
let shard = sharding.getShard(contentTopicInvalid)
|
||||
|
||||
@ -130,18 +130,18 @@ suite "Autosharding":
|
||||
|
||||
test "Generate Gen0 Shard invalid topic":
|
||||
let sharding =
|
||||
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
|
||||
Sharding.init(ClusterId, GenerationZeroShardsCount)
|
||||
# When we get a shard from it
|
||||
let shard = sharding.getShard("invalid")
|
||||
|
||||
# Then the generated shard is valid
|
||||
check:
|
||||
shard.error() == "invalid format: topic must start with slash"
|
||||
shard.error() == "invalid format: content-topic 'invalid' must start with slash"
|
||||
|
||||
suite "parseSharding":
|
||||
xsuite "parseSharding":
|
||||
test "contentTopics is ContentTopic":
|
||||
let sharding =
|
||||
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
|
||||
Sharding.init(ClusterId, GenerationZeroShardsCount)
|
||||
# When calling with contentTopic as string
|
||||
let topicMap = sharding.parseSharding(some(pubsubTopic04), contentTopicShort)
|
||||
|
||||
@ -151,7 +151,7 @@ suite "Autosharding":
|
||||
|
||||
test "contentTopics is seq[ContentTopic]":
|
||||
let sharding =
|
||||
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
|
||||
Sharding.init(ClusterId, GenerationZeroShardsCount)
|
||||
# When calling with contentTopic as string seq
|
||||
let topicMap = sharding.parseSharding(
|
||||
some(pubsubTopic04), @[contentTopicShort, "/0/foo/1/bar/proto"]
|
||||
@ -163,7 +163,7 @@ suite "Autosharding":
|
||||
|
||||
test "pubsubTopic is none":
|
||||
let sharding =
|
||||
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
|
||||
Sharding.init(ClusterId, GenerationZeroShardsCount)
|
||||
# When calling with pubsubTopic as none
|
||||
let topicMap = sharding.parseSharding(PubsubTopic.none(), contentTopicShort)
|
||||
|
||||
@ -173,7 +173,7 @@ suite "Autosharding":
|
||||
|
||||
test "content parse error":
|
||||
let sharding =
|
||||
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
|
||||
Sharding.init(ClusterId, GenerationZeroShardsCount)
|
||||
# When calling with pubsubTopic as none with invalid content
|
||||
let topicMap = sharding.parseSharding(PubsubTopic.none(), "invalid")
|
||||
|
||||
@ -184,7 +184,7 @@ suite "Autosharding":
|
||||
|
||||
test "pubsubTopic parse error":
|
||||
let sharding =
|
||||
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
|
||||
Sharding.init(ClusterId, GenerationZeroShardsCount)
|
||||
# When calling with pubsubTopic as none with invalid content
|
||||
let topicMap = sharding.parseSharding(some("invalid"), contentTopicShort)
|
||||
|
||||
@ -195,7 +195,7 @@ suite "Autosharding":
|
||||
|
||||
test "pubsubTopic getShard error":
|
||||
let sharding =
|
||||
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
|
||||
Sharding.init(ClusterId, GenerationZeroShardsCount)
|
||||
# When calling with pubsubTopic as none with invalid content
|
||||
let topicMap = sharding.parseSharding(PubsubTopic.none(), contentTopicInvalid)
|
||||
|
||||
@ -207,3 +207,113 @@ suite "Autosharding":
|
||||
xtest "catchable error on add to topicMap":
|
||||
# TODO: Trigger a CatchableError or mock
|
||||
discard
|
||||
|
||||
suite "Arbitrary sharder network, auto shard selection":
|
||||
const arbitraryShards = @[2'u16, 4, 8, 16, 32, 64, 128, 256]
|
||||
|
||||
test "Initialize with arbitrary shard list":
|
||||
# When we initialize sharding with a custom shard list
|
||||
let sharding = Sharding.init(ClusterId, arbitraryShards)
|
||||
|
||||
# Given valid content topics
|
||||
let
|
||||
nsContentTopic1 = NsContentTopic.parse(contentTopicShort).value()
|
||||
nsContentTopic2 = NsContentTopic.parse(contentTopicFull).value()
|
||||
nsContentTopic3 = NsContentTopic.parse(contentTopicShort2).value()
|
||||
nsContentTopic4 = NsContentTopic.parse(contentTopicFull2).value()
|
||||
nsContentTopic5 = NsContentTopic.parse(contentTopicShort3).value()
|
||||
nsContentTopic6 = NsContentTopic.parse(contentTopicFull4).value()
|
||||
|
||||
# When we generate shards from them
|
||||
let
|
||||
shard1 = sharding.getGenZeroShard(nsContentTopic1, arbitraryShards.len)
|
||||
shard2 = sharding.getGenZeroShard(nsContentTopic2, arbitraryShards.len)
|
||||
shard3 = sharding.getGenZeroShard(nsContentTopic3, arbitraryShards.len)
|
||||
shard4 = sharding.getGenZeroShard(nsContentTopic4, arbitraryShards.len)
|
||||
shard5 = sharding.getGenZeroShard(nsContentTopic5, arbitraryShards.len)
|
||||
shard6 = sharding.getGenZeroShard(nsContentTopic6, arbitraryShards.len)
|
||||
|
||||
# Then the generated shards use IDs from the arbitrary list
|
||||
check:
|
||||
shard1 == RelayShard(clusterId: ClusterId, shardId: 16)
|
||||
shard2 == RelayShard(clusterId: ClusterId, shardId: 16)
|
||||
shard3 == RelayShard(clusterId: ClusterId, shardId: 128)
|
||||
shard4 == RelayShard(clusterId: ClusterId, shardId: 128)
|
||||
shard5 == RelayShard(clusterId: ClusterId, shardId: 16)
|
||||
shard6 == RelayShard(clusterId: ClusterId, shardId: 256)
|
||||
|
||||
test "getShard with arbitrary shard list - generation none":
|
||||
# When we initialize sharding with a custom shard list
|
||||
let sharding = Sharding.init(ClusterId, arbitraryShards)
|
||||
|
||||
# When we get a shard from a topic without generation
|
||||
let shard = sharding.getShard(contentTopicShort)
|
||||
|
||||
# Then the generated shard uses an ID from the arbitrary list
|
||||
check:
|
||||
shard.value() == RelayShard(clusterId: ClusterId, shardId: 16)
|
||||
|
||||
test "getShard with arbitrary shard list - generation zero":
|
||||
# When we initialize sharding with a custom shard list
|
||||
let sharding = Sharding.init(ClusterId, arbitraryShards)
|
||||
|
||||
# When we get a shard from a gen0 topic
|
||||
let shard = sharding.getShard(contentTopicFull)
|
||||
|
||||
# Then the generated shard uses an ID from the arbitrary list
|
||||
check:
|
||||
shard.value() == RelayShard(clusterId: ClusterId, shardId: 16)
|
||||
|
||||
test "Multiple topics map to shards from arbitrary list":
|
||||
# When we initialize sharding with a custom shard list
|
||||
let sharding = Sharding.init(ClusterId, arbitraryShards)
|
||||
|
||||
# Given multiple content topics
|
||||
let contentTopics = @[
|
||||
contentTopicShort,
|
||||
contentTopicFull,
|
||||
contentTopicShort2,
|
||||
contentTopicFull2,
|
||||
contentTopicShort3,
|
||||
contentTopicFull3,
|
||||
]
|
||||
|
||||
# When we get shards for all topics
|
||||
let shards = @[
|
||||
sharding.getShard(contentTopicShort).value(),
|
||||
sharding.getShard(contentTopicFull).value(),
|
||||
sharding.getShard(contentTopicShort2).value(),
|
||||
sharding.getShard(contentTopicFull2).value(),
|
||||
sharding.getShard(contentTopicShort3).value(),
|
||||
sharding.getShard(contentTopicFull3).value(),
|
||||
]
|
||||
|
||||
# Then all shard IDs match expected values from the arbitrary list
|
||||
check:
|
||||
shards[0] == RelayShard(clusterId: ClusterId, shardId: 16)
|
||||
shards[1] == RelayShard(clusterId: ClusterId, shardId: 16)
|
||||
shards[2] == RelayShard(clusterId: ClusterId, shardId: 128)
|
||||
shards[3] == RelayShard(clusterId: ClusterId, shardId: 128)
|
||||
shards[4] == RelayShard(clusterId: ClusterId, shardId: 16)
|
||||
shards[5] == RelayShard(clusterId: ClusterId, shardId: 16)
|
||||
|
||||
test "Consistent shard mapping with arbitrary list":
|
||||
# When we initialize sharding with a custom shard list
|
||||
let sharding = Sharding.init(ClusterId, arbitraryShards)
|
||||
|
||||
# Given a content topic
|
||||
let topic = contentTopicShort
|
||||
|
||||
# When we get the shard multiple times
|
||||
let
|
||||
shard1 = sharding.getShard(topic)
|
||||
shard2 = sharding.getShard(topic)
|
||||
shard3 = sharding.getShard(topic)
|
||||
|
||||
# Then the shard is consistent
|
||||
check:
|
||||
shard1.isOk()
|
||||
shard2.isOk()
|
||||
shard3.isOk()
|
||||
shard1.value() == shard2.value()
|
||||
shard2.value() == shard3.value()
|
||||
|
||||
@ -18,7 +18,7 @@ proc newTestWakuLightpushNode*(
|
||||
): Future[WakuLightPush] {.async.} =
|
||||
let
|
||||
peerManager = PeerManager.new(switch)
|
||||
wakuAutoSharding = Sharding(clusterId: 1, shardCountGenZero: 8)
|
||||
wakuAutoSharding = Sharding.init(clusterId = 1, shardCount = 8)
|
||||
proto = WakuLightPush.new(
|
||||
peerManager, rng, handler, some(wakuAutoSharding), rateLimitSetting
|
||||
)
|
||||
|
||||
@ -273,7 +273,8 @@ proc setupProtocols(
|
||||
node.mountAutoSharding(conf.clusterId, conf.shardingConf.numShardsInCluster).isOkOr:
|
||||
return err("failed to mount waku auto sharding: " & error)
|
||||
else:
|
||||
warn("Auto sharding is disabled")
|
||||
node.mountAutoSharding(conf.clusterId, conf.subscribeShards).isOkOr:
|
||||
return err("failed to mount waku auto sharding: " & error)
|
||||
|
||||
# Mount relay on all nodes
|
||||
var peerExchangeHandler = none(RoutingRecordsHandler)
|
||||
|
||||
@ -275,8 +275,15 @@ proc mountAutoSharding*(
|
||||
node: WakuNode, clusterId: uint16, shardCount: uint32
|
||||
): Result[void, string] =
|
||||
info "mounting auto sharding", clusterId = clusterId, shardCount = shardCount
|
||||
node.wakuAutoSharding =
|
||||
some(Sharding(clusterId: clusterId, shardCountGenZero: shardCount))
|
||||
node.wakuAutoSharding = some(Sharding.init(clusterId, shardCount))
|
||||
|
||||
return ok()
|
||||
|
||||
proc mountAutoSharding*(
|
||||
node: WakuNode, clusterId: uint16, shards: seq[uint16]
|
||||
): Result[void, string] =
|
||||
info "mounting auto sharding", clusterId = clusterId, shards = shards
|
||||
node.wakuAutoSharding = some(Sharding.init(clusterId, shards))
|
||||
|
||||
return ok()
|
||||
|
||||
|
||||
@ -4,18 +4,30 @@
|
||||
|
||||
{.push raises: [].}
|
||||
|
||||
import nimcrypto, std/options, std/tables, stew/endians2, results, stew/byteutils
|
||||
import nimcrypto, std/[options, tables, sequtils], stew/[endians2, byteutils], results
|
||||
|
||||
import ./content_topic, ./pubsub_topic
|
||||
|
||||
# TODO: this is autosharding, not just "sharding"
|
||||
type Sharding* = object
|
||||
clusterId*: uint16
|
||||
clusterId: uint16
|
||||
# TODO: generations could be stored in a table here
|
||||
shardCountGenZero*: uint32
|
||||
shardCountGenZero: uint32
|
||||
supportedShards: seq[uint16]
|
||||
|
||||
proc new*(T: type Sharding, clusterId: uint16, shardCount: uint32): T =
|
||||
return Sharding(clusterId: clusterId, shardCountGenZero: shardCount)
|
||||
proc init*(T: typedesc[Sharding], clusterId: uint16, shardCount: uint32): T =
|
||||
return Sharding(
|
||||
clusterId: clusterId,
|
||||
shardCountGenZero: shardCount,
|
||||
supportedShards: toSeq(0'u16 ..< uint16(shardCount)),
|
||||
)
|
||||
|
||||
proc init*(T: typedesc[Sharding], clusterId: uint16, supportedShards: seq[uint16]): T =
|
||||
return Sharding(
|
||||
clusterId: clusterId,
|
||||
shardCountGenZero: uint32(supportedShards.len),
|
||||
supportedShards: supportedShards,
|
||||
)
|
||||
|
||||
proc getGenZeroShard*(s: Sharding, topic: NsContentTopic, count: int): RelayShard =
|
||||
let bytes = toBytes(topic.application) & toBytes(topic.version)
|
||||
@ -27,7 +39,7 @@ proc getGenZeroShard*(s: Sharding, topic: NsContentTopic, count: int): RelayShar
|
||||
|
||||
let shard = hashValue mod uint64(count)
|
||||
|
||||
RelayShard(clusterId: s.clusterId, shardId: uint16(shard))
|
||||
RelayShard(clusterId: s.clusterId, shardId: s.supportedShards[shard])
|
||||
|
||||
proc getShard*(s: Sharding, topic: NsContentTopic): Result[RelayShard, string] =
|
||||
## Compute the (pubsub topic) shard to use for this content topic.
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user