diff --git a/apps/wakubridge/message_compat.nim b/apps/wakubridge/message_compat.nim index b1c75cb54..5b773deea 100644 --- a/apps/wakubridge/message_compat.nim +++ b/apps/wakubridge/message_compat.nim @@ -4,6 +4,7 @@ else: {.push raises: [].} import + std/options, stew/[byteutils, results], libp2p/crypto/crypto import @@ -35,6 +36,8 @@ proc toV2ContentTopic*(v1Topic: waku_protocol.Topic): ContentTopic = ## should be prefixed with `0x` var namespacedTopic = NsContentTopic() + namespacedTopic.generation = none(int) + namespacedTopic.bias = Unbiased namespacedTopic.application = ContentTopicApplication namespacedTopic.version = ContentTopicAppVersion namespacedTopic.name = v1Topic.to0xHex() diff --git a/tests/v2/test_utils_compat.nim b/tests/v2/test_utils_compat.nim index 99c91231c..5ae6a90cf 100644 --- a/tests/v2/test_utils_compat.nim +++ b/tests/v2/test_utils_compat.nim @@ -3,7 +3,8 @@ import testutils/unittests import - ../../waku/v2/waku_core, + ../../waku/v2/waku_core/message, + ../../waku/v2/waku_core/time, ../../waku/v2/utils/compat, ./testlib/common diff --git a/tests/v2/waku_core/test_namespaced_topics.nim b/tests/v2/waku_core/test_namespaced_topics.nim index 08920dfaf..d4fdd5820 100644 --- a/tests/v2/waku_core/test_namespaced_topics.nim +++ b/tests/v2/waku_core/test_namespaced_topics.nim @@ -1,6 +1,7 @@ {.used.} import + std/options, stew/results, testutils/unittests import @@ -11,6 +12,8 @@ suite "Waku Message - Content topics namespacing": test "Stringify namespaced content topic": ## Given var ns = NsContentTopic() + ns.generation = none(int) + ns.bias = Unbiased ns.application = "toychat" ns.version = "2" ns.name = "huilong" @@ -31,10 +34,31 @@ suite "Waku Message - Content topics namespacing": let nsRes = NsContentTopic.parse(topic) ## Then - check nsRes.isOk() + assert nsRes.isOk(), $nsRes.error let ns = nsRes.get() check: + ns.generation == none(int) + ns.bias == Unbiased + ns.application == "toychat" + ns.version == "2" + ns.name == "huilong" + 
ns.encoding == "proto" + + test "Parse content topic string - Valid string with sharding": + ## Given + let topic = "/0/lower20/toychat/2/huilong/proto" + + ## When + let nsRes = NsContentTopic.parse(topic) + + ## Then + assert nsRes.isOk(), $nsRes.error + + let ns = nsRes.get() + check: + ns.generation == some(0) + ns.bias == Lower20 ns.application == "toychat" ns.version == "2" ns.name == "huilong" @@ -48,7 +72,8 @@ suite "Waku Message - Content topics namespacing": let ns = NsContentTopic.parse(topic) ## Then - check ns.isErr() + assert ns.isErr(), $ns.get() + let err = ns.tryError() check: err.kind == ParsingErrorKind.InvalidFormat @@ -62,13 +87,13 @@ suite "Waku Message - Content topics namespacing": let ns = NsContentTopic.parse(topic) ## Then - check ns.isErr() + assert ns.isErr(), $ns.get() + let err = ns.tryError() check: err.kind == ParsingErrorKind.InvalidFormat err.cause == "invalid topic structure" - test "Parse content topic string - Invalid string: missing encoding part": ## Given let topic = "/toychat/2/huilong" @@ -77,13 +102,14 @@ suite "Waku Message - Content topics namespacing": let ns = NsContentTopic.parse(topic) ## Then - check ns.isErr() + assert ns.isErr(), $ns.get() + let err = ns.tryError() check: err.kind == ParsingErrorKind.InvalidFormat err.cause == "invalid topic structure" - test "Parse content topic string - Invalid string: too many parts": + test "Parse content topic string - Invalid string: wrong extra parts": ## Given let topic = "/toychat/2/huilong/proto/33" @@ -91,12 +117,42 @@ suite "Waku Message - Content topics namespacing": let ns = NsContentTopic.parse(topic) ## Then - check ns.isErr() + assert ns.isErr(), $ns.get() + let err = ns.tryError() check: err.kind == ParsingErrorKind.InvalidFormat err.cause == "invalid topic structure" + test "Parse content topic string - Invalid string: non numeric generation": + ## Given + let topic = "/first/unbiased/toychat/2/huilong/proto" + + ## When + let ns = NsContentTopic.parse(topic) + 
+ ## Then + assert ns.isErr(), $ns.get() + + let err = ns.tryError() + check: + err.kind == ParsingErrorKind.InvalidFormat + err.cause == "generation should be a numeric value" + + test "Parse content topic string - Invalid string: invalid bias": + ## Given + let topic = "/0/no/toychat/2/huilong/proto" + + ## When + let ns = NsContentTopic.parse(topic) + + ## Then + assert ns.isErr(), $ns.get() + + let err = ns.tryError() + check: + err.kind == ParsingErrorKind.InvalidFormat + err.cause == "bias should be one of; unbiased, lower20 or higher80" suite "Waku Message - Pub-sub topics namespacing": @@ -178,7 +234,6 @@ suite "Waku Message - Pub-sub topics namespacing": err.kind == ParsingErrorKind.MissingPart err.part == "shard_cluster_index" - test "Parse static sharding pub-sub topic string - Invalid string: cluster value": ## Given let topic = "/waku/2/rs/xx/77" diff --git a/tests/v2/waku_core/test_sharding.nim b/tests/v2/waku_core/test_sharding.nim new file mode 100644 index 000000000..0eced8534 --- /dev/null +++ b/tests/v2/waku_core/test_sharding.nim @@ -0,0 +1,178 @@ +{.used.} + +import + std/options, + std/strutils, + std/sugar, + std/algorithm, + std/random, + stew/results, + testutils/unittests +import + ../../../waku/v2/waku_core/topics + +suite "Waku Sharding": + + randomize() + + const WordLength = 5 + + proc randomContentTopic(): NsContentTopic = + var app = "" + + for n in 0.. 
0 are not supported yet" + + test "Weigths bias": + ## Given + let count = 5 + + ## When + let anonWeigths = biasedWeights(count, ShardingBias.Lower20) + let speedWeigths = biasedWeights(count, ShardingBias.Higher80) + + ## Then + check: + anonWeigths[0] == 2.0 + anonWeigths[1] == 1.0 + anonWeigths[2] == 1.0 + anonWeigths[3] == 1.0 + anonWeigths[4] == 1.0 + + speedWeigths[0] == 1.0 + speedWeigths[1] == 2.0 + speedWeigths[2] == 2.0 + speedWeigths[3] == 2.0 + speedWeigths[4] == 2.0 + + test "Sorted shard list": + ## Given + let topic = "/0/unbiased/toychat/2/huilong/proto" + + ## When + let contentTopic = NsContentTopic.parse(topic).expect("Parsing") + let count = shardCount(contentTopic).expect("Valid parameters") + let weights = biasedWeights(count, contentTopic.bias) + + let shardsRes = weightedShardList(contentTopic, count, weights) + + ## Then + assert shardsRes.isOk(), shardsRes.error + + let shards = shardsRes.get() + check: + shards.len == count + isSorted(shards, hashOrder) + + test "Shard Choice Reproducibility": + ## Given + let topic = "/toychat/2/huilong/proto" + + ## When + let contentTopic = NsContentTopic.parse(topic).expect("Parsing") + + let res = singleHighestWeigthShard(contentTopic) + + ## Then + assert res.isOk(), res.error + + let pubsubTopic = res.get() + + check: + pubsubTopic == NsPubsubTopic.staticSharding(ClusterIndex, 3) + + test "Shard Choice Simulation": + ## Given + let topics = collect: + for i in 0..<100000: + randomContentTopic() + + var counts = newSeq[0](GenerationZeroShardsCount) + + ## When + for topic in topics: + let pubsub = singleHighestWeigthShard(topic).expect("Valid Topic") + counts[pubsub.shard] += 1 + + ## Then + for i in 1..= (float64(counts[i]) * 0.95) + float64(counts[i]) >= (float64(counts[i - 1]) * 0.95) + + #echo counts + + + + + + + + + + diff --git a/waku/v2/waku_core/topics.nim b/waku/v2/waku_core/topics.nim index 1d2720274..08519ce6f 100644 --- a/waku/v2/waku_core/topics.nim +++ b/waku/v2/waku_core/topics.nim 
@@ -1,7 +1,9 @@ import ./topics/content_topic, - ./topics/pubsub_topic + ./topics/pubsub_topic, + ./topics/sharding export content_topic, - pubsub_topic + pubsub_topic, + sharding diff --git a/waku/v2/waku_core/topics/content_topic.nim b/waku/v2/waku_core/topics/content_topic.nim index ef2e03464..96cdd84bc 100644 --- a/waku/v2/waku_core/topics/content_topic.nim +++ b/waku/v2/waku_core/topics/content_topic.nim @@ -8,6 +8,7 @@ else: {.push raises: [].} import + std/options, std/strutils, stew/results import @@ -25,63 +26,115 @@ const DefaultContentTopic* = ContentTopic("/waku/2/default-content/proto") ## Namespaced content topic +type ShardingBias* = enum + Unbiased = "unbiased" + Lower20 = "lower20" + Higher80 = "higher80" + type NsContentTopic* = object + generation*: Option[int] + bias*: ShardingBias application*: string version*: string name*: string encoding*: string -proc init*(T: type NsContentTopic, application, version, name, encoding: string): T = +proc init*(T: type NsContentTopic, generation: Option[int], bias: ShardingBias, + application: string, version: string, name: string, encoding: string): T = NsContentTopic( + generation: generation, + bias: bias, application: application, version: version, name: name, encoding: encoding ) - # Serialization proc `$`*(topic: NsContentTopic): string = ## Returns a string representation of a namespaced topic ## in the format `/<application>/<version>/<topic-name>/<encoding>` - "/" & topic.application & "/" & topic.version & "/" & topic.name & "/" & topic.encoding + ## Autosharding adds 2 optional prefixes `/<generation>/bias + var formatted = "" + + if topic.generation.isSome(): + formatted = formatted & "/" & $topic.generation.get() + + if topic.bias != ShardingBias.Unbiased: + formatted = formatted & "/" & $topic.bias + + formatted & "/" & topic.application & "/" & topic.version & "/" & topic.name & "/" & topic.encoding # Deserialization proc parse*(T: type NsContentTopic, topic: ContentTopic|string): ParsingResult[NsContentTopic] = ## Splits a namespaced topic string
into its constituent parts. ## The topic string has to be in the format `/<application>/<version>/<topic-name>/<encoding>` ## Autosharding adds 2 optional prefixes `/<generation>/bias if not topic.startsWith("/"): return err(ParsingError.invalidFormat("topic must start with slash")) let parts = topic[1.. 0 are not supported yet") + + ok((shardCount)) + +proc biasedWeights*(shardCount: int, bias: ShardingBias): seq[float64] = + var weights = repeat(1.0, shardCount) + + case bias: + of Unbiased: + return weights + of Lower20: + # we choose the lower 20% of shards and double their weights + let index = shardCount div 5 + for i in (0..