mirror of https://github.com/waku-org/nwaku.git
chore: rename NsPubsubTopic (#2974)
This commit is contained in:
parent
b8550c5550
commit
67439057fb
|
@ -186,11 +186,11 @@ suite "Sharding":
|
||||||
# Given a connected server and client subscribed to different content topics
|
# Given a connected server and client subscribed to different content topics
|
||||||
let
|
let
|
||||||
contentTopic1 = "/toychat/2/huilong/proto"
|
contentTopic1 = "/toychat/2/huilong/proto"
|
||||||
pubsubTopic1 = "/waku/2/rs/0/58355"
|
shard1 = "/waku/2/rs/0/58355"
|
||||||
pubsubTopic12 = NsPubsubTopic.parse(contentTopic1)
|
shard12 = RelayShard.parse(contentTopic1)
|
||||||
# Automatically generated from the contentTopic above
|
# Automatically generated from the contentTopic above
|
||||||
contentTopic2 = "/0/toychat2/2/huilong/proto"
|
contentTopic2 = "/0/toychat2/2/huilong/proto"
|
||||||
pubsubTopic2 = "/waku/2/rs/0/23286"
|
shard2 = "/waku/2/rs/0/23286"
|
||||||
# Automatically generated from the contentTopic above
|
# Automatically generated from the contentTopic above
|
||||||
|
|
||||||
let
|
let
|
||||||
|
@ -201,7 +201,7 @@ suite "Sharding":
|
||||||
|
|
||||||
# When the server publishes a message in the server's subscribed topic
|
# When the server publishes a message in the server's subscribed topic
|
||||||
discard await server.publish(
|
discard await server.publish(
|
||||||
some(pubsubTopic1),
|
some(shard1),
|
||||||
WakuMessage(payload: "message1".toBytes(), contentTopic: contentTopic1),
|
WakuMessage(payload: "message1".toBytes(), contentTopic: contentTopic1),
|
||||||
)
|
)
|
||||||
let
|
let
|
||||||
|
@ -216,7 +216,7 @@ suite "Sharding":
|
||||||
serverHandler.reset()
|
serverHandler.reset()
|
||||||
clientHandler.reset()
|
clientHandler.reset()
|
||||||
discard await client.publish(
|
discard await client.publish(
|
||||||
some(pubsubTopic2),
|
some(shard2),
|
||||||
WakuMessage(payload: "message2".toBytes(), contentTopic: contentTopic2),
|
WakuMessage(payload: "message2".toBytes(), contentTopic: contentTopic2),
|
||||||
)
|
)
|
||||||
let
|
let
|
||||||
|
|
|
@ -280,7 +280,7 @@ suite "Waku ENR - Relay static sharding":
|
||||||
clusterId: uint16 = 22
|
clusterId: uint16 = 22
|
||||||
shardId: uint16 = 1
|
shardId: uint16 = 1
|
||||||
|
|
||||||
let topic = NsPubsubTopic.staticSharding(clusterId, shardId)
|
let shard = RelayShard.staticSharding(clusterId, shardId)
|
||||||
|
|
||||||
## When
|
## When
|
||||||
let shardsTopics = RelayShards.init(clusterId, shardId).expect("Valid Shards")
|
let shardsTopics = RelayShards.init(clusterId, shardId).expect("Valid Shards")
|
||||||
|
@ -290,16 +290,16 @@ suite "Waku ENR - Relay static sharding":
|
||||||
shardsTopics.clusterId == clusterId
|
shardsTopics.clusterId == clusterId
|
||||||
shardsTopics.shardIds == @[1u16]
|
shardsTopics.shardIds == @[1u16]
|
||||||
|
|
||||||
let topics = shardsTopics.topics.mapIt($it)
|
let shards = shardsTopics.topics.mapIt($it)
|
||||||
check:
|
check:
|
||||||
topics == @[$topic]
|
shards == @[$shard]
|
||||||
|
|
||||||
check:
|
check:
|
||||||
shardsTopics.contains(clusterId, shardId)
|
shardsTopics.contains(clusterId, shardId)
|
||||||
not shardsTopics.contains(clusterId, 33u16)
|
not shardsTopics.contains(clusterId, 33u16)
|
||||||
not shardsTopics.contains(20u16, 33u16)
|
not shardsTopics.contains(20u16, 33u16)
|
||||||
|
|
||||||
shardsTopics.contains(topic)
|
shardsTopics.contains(shard)
|
||||||
shardsTopics.contains("/waku/2/rs/22/1")
|
shardsTopics.contains("/waku/2/rs/22/1")
|
||||||
|
|
||||||
test "new relay shards object with repeated but valid shard ids":
|
test "new relay shards object with repeated but valid shard ids":
|
||||||
|
|
|
@ -3,19 +3,19 @@ import std/[tables, sequtils, options]
|
||||||
import waku/waku_core/topics, ../testlib/wakucore
|
import waku/waku_core/topics, ../testlib/wakucore
|
||||||
|
|
||||||
proc `==`*(
|
proc `==`*(
|
||||||
table: Table[pubsub_topic.NsPubsubTopic, seq[NsContentTopic]],
|
table: Table[pubsub_topic.RelayShard, seq[NsContentTopic]],
|
||||||
other: array[0 .. 0, (string, seq[string])],
|
other: array[0 .. 0, (string, seq[string])],
|
||||||
): bool =
|
): bool =
|
||||||
let otherTyped = other.map(
|
let otherTyped = other.map(
|
||||||
proc(item: (string, seq[string])): (NsPubsubTopic, seq[NsContentTopic]) =
|
proc(item: (string, seq[string])): (RelayShard, seq[NsContentTopic]) =
|
||||||
let
|
let
|
||||||
(pubsubTopic, contentTopics) = item
|
(pubsubTopic, contentTopics) = item
|
||||||
nsPubsubTopic = NsPubsubTopic.parse(pubsubTopic).value()
|
shard = RelayShard.parse(pubsubTopic).value()
|
||||||
nsContentTopics = contentTopics.map(
|
nsContentTopics = contentTopics.map(
|
||||||
proc(contentTopic: string): NsContentTopic =
|
proc(contentTopic: string): NsContentTopic =
|
||||||
NsContentTopic.parse(contentTopic).value()
|
NsContentTopic.parse(contentTopic).value()
|
||||||
)
|
)
|
||||||
return (nsPubsubTopic, nsContentTopics)
|
return (shard, nsContentTopics)
|
||||||
)
|
)
|
||||||
|
|
||||||
table == otherTyped.toTable()
|
table == otherTyped.toTable()
|
||||||
|
|
|
@ -79,7 +79,7 @@ proc newTestWakuNode*(
|
||||||
|
|
||||||
let clusterId =
|
let clusterId =
|
||||||
if pubsubTopics.len() > 0:
|
if pubsubTopics.len() > 0:
|
||||||
NsPubsubTopic.parse(pubsubTopics[0]).get().clusterId
|
RelayShard.parse(pubsubTopics[0]).get().clusterId
|
||||||
else:
|
else:
|
||||||
1.uint16
|
1.uint16
|
||||||
|
|
||||||
|
|
|
@ -136,10 +136,10 @@ suite "Waku Message - Content topics namespacing":
|
||||||
suite "Waku Message - Pub-sub topics namespacing":
|
suite "Waku Message - Pub-sub topics namespacing":
|
||||||
test "Stringify static sharding pub-sub topic":
|
test "Stringify static sharding pub-sub topic":
|
||||||
## Given
|
## Given
|
||||||
var ns = NsPubsubTopic.staticSharding(clusterId = 0, shardId = 2)
|
var shard = RelayShard.staticSharding(clusterId = 0, shardId = 2)
|
||||||
|
|
||||||
## When
|
## When
|
||||||
let topic = $ns
|
let topic = $shard
|
||||||
|
|
||||||
## Then
|
## Then
|
||||||
check:
|
check:
|
||||||
|
@ -150,11 +150,11 @@ suite "Waku Message - Pub-sub topics namespacing":
|
||||||
let topic = "/waku/2/waku-dev"
|
let topic = "/waku/2/waku-dev"
|
||||||
|
|
||||||
## When
|
## When
|
||||||
let nsRes = NsPubsubTopic.parse(topic)
|
let shardRes = RelayShard.parse(topic)
|
||||||
|
|
||||||
## Then
|
## Then
|
||||||
check nsRes.isErr()
|
check shardRes.isErr()
|
||||||
let err = nsRes.tryError()
|
let err = shardRes.tryError()
|
||||||
check:
|
check:
|
||||||
err.kind == ParsingErrorKind.InvalidFormat
|
err.kind == ParsingErrorKind.InvalidFormat
|
||||||
|
|
||||||
|
@ -163,26 +163,26 @@ suite "Waku Message - Pub-sub topics namespacing":
|
||||||
let topic = "/waku/2/rs/16/42"
|
let topic = "/waku/2/rs/16/42"
|
||||||
|
|
||||||
## When
|
## When
|
||||||
let nsRes = NsPubsubTopic.parse(topic)
|
let shardRes = RelayShard.parse(topic)
|
||||||
|
|
||||||
## Then
|
## Then
|
||||||
check nsRes.isOk()
|
check shardRes.isOk()
|
||||||
|
|
||||||
let ns = nsRes.get()
|
let shard = shardRes.get()
|
||||||
check:
|
check:
|
||||||
ns.clusterId == 16
|
shard.clusterId == 16
|
||||||
ns.shardId == 42
|
shard.shardId == 42
|
||||||
|
|
||||||
test "Parse pub-sub topic string - Invalid string: invalid protocol version":
|
test "Parse pub-sub topic string - Invalid string: invalid protocol version":
|
||||||
## Given
|
## Given
|
||||||
let topic = "/waku/1/rs/16/42"
|
let topic = "/waku/1/rs/16/42"
|
||||||
|
|
||||||
## When
|
## When
|
||||||
let ns = NsPubsubTopic.parse(topic)
|
let shard = RelayShard.parse(topic)
|
||||||
|
|
||||||
## Then
|
## Then
|
||||||
check ns.isErr()
|
check shard.isErr()
|
||||||
let err = ns.tryError()
|
let err = shard.tryError()
|
||||||
check:
|
check:
|
||||||
err.kind == ParsingErrorKind.InvalidFormat
|
err.kind == ParsingErrorKind.InvalidFormat
|
||||||
|
|
||||||
|
@ -191,11 +191,11 @@ suite "Waku Message - Pub-sub topics namespacing":
|
||||||
let topic = "/waku/2/rs//02"
|
let topic = "/waku/2/rs//02"
|
||||||
|
|
||||||
## When
|
## When
|
||||||
let ns = NsPubsubTopic.parse(topic)
|
let shard = RelayShard.parse(topic)
|
||||||
|
|
||||||
## Then
|
## Then
|
||||||
check ns.isErr()
|
check shard.isErr()
|
||||||
let err = ns.tryError()
|
let err = shard.tryError()
|
||||||
check:
|
check:
|
||||||
err.kind == ParsingErrorKind.MissingPart
|
err.kind == ParsingErrorKind.MissingPart
|
||||||
err.part == "cluster_id"
|
err.part == "cluster_id"
|
||||||
|
@ -205,10 +205,10 @@ suite "Waku Message - Pub-sub topics namespacing":
|
||||||
let topic = "/waku/2/rs/xx/77"
|
let topic = "/waku/2/rs/xx/77"
|
||||||
|
|
||||||
## When
|
## When
|
||||||
let ns = NsPubsubTopic.parse(topic)
|
let shard = RelayShard.parse(topic)
|
||||||
|
|
||||||
## Then
|
## Then
|
||||||
check ns.isErr()
|
check shard.isErr()
|
||||||
let err = ns.tryError()
|
let err = shard.tryError()
|
||||||
check:
|
check:
|
||||||
err.kind == ParsingErrorKind.InvalidFormat
|
err.kind == ParsingErrorKind.InvalidFormat
|
||||||
|
|
|
@ -6,15 +6,15 @@ import waku/waku_core/topics/pubsub_topic, ../../testlib/[wakucore]
|
||||||
|
|
||||||
suite "Static Sharding Functionality":
|
suite "Static Sharding Functionality":
|
||||||
test "Shard Cluster Identification":
|
test "Shard Cluster Identification":
|
||||||
let topic = NsPubsubTopic.parseStaticSharding("/waku/2/rs/0/1").get()
|
let shard = RelayShard.parseStaticSharding("/waku/2/rs/0/1").get()
|
||||||
check:
|
check:
|
||||||
topic.clusterId == 0
|
shard.clusterId == 0
|
||||||
topic.shardId == 1
|
shard.shardId == 1
|
||||||
topic == NsPubsubTopic.staticSharding(0, 1)
|
shard == RelayShard.staticSharding(0, 1)
|
||||||
|
|
||||||
test "Pubsub Topic Naming Compliance":
|
test "Pubsub Topic Naming Compliance":
|
||||||
let topic = NsPubsubTopic.staticSharding(0, 1)
|
let shard = RelayShard.staticSharding(0, 1)
|
||||||
check:
|
check:
|
||||||
topic.clusterId == 0
|
shard.clusterId == 0
|
||||||
topic.shardId == 1
|
shard.shardId == 1
|
||||||
topic == "/waku/2/rs/0/1"
|
shard == "/waku/2/rs/0/1"
|
||||||
|
|
|
@ -41,39 +41,29 @@ suite "Autosharding":
|
||||||
|
|
||||||
# When we generate a gen0 shard from them
|
# When we generate a gen0 shard from them
|
||||||
let
|
let
|
||||||
nsPubsubTopic1 =
|
shard1 = sharding.getGenZeroShard(nsContentTopic1, GenerationZeroShardsCount)
|
||||||
sharding.getGenZeroShard(nsContentTopic1, GenerationZeroShardsCount)
|
shard2 = sharding.getGenZeroShard(nsContentTopic2, GenerationZeroShardsCount)
|
||||||
nsPubsubTopic2 =
|
shard3 = sharding.getGenZeroShard(nsContentTopic3, GenerationZeroShardsCount)
|
||||||
sharding.getGenZeroShard(nsContentTopic2, GenerationZeroShardsCount)
|
shard4 = sharding.getGenZeroShard(nsContentTopic4, GenerationZeroShardsCount)
|
||||||
nsPubsubTopic3 =
|
shard5 = sharding.getGenZeroShard(nsContentTopic5, GenerationZeroShardsCount)
|
||||||
sharding.getGenZeroShard(nsContentTopic3, GenerationZeroShardsCount)
|
shard6 = sharding.getGenZeroShard(nsContentTopic6, GenerationZeroShardsCount)
|
||||||
nsPubsubTopic4 =
|
shard7 = sharding.getGenZeroShard(nsContentTopic7, GenerationZeroShardsCount)
|
||||||
sharding.getGenZeroShard(nsContentTopic4, GenerationZeroShardsCount)
|
shard8 = sharding.getGenZeroShard(nsContentTopic8, GenerationZeroShardsCount)
|
||||||
nsPubsubTopic5 =
|
shard9 = sharding.getGenZeroShard(nsContentTopic9, GenerationZeroShardsCount)
|
||||||
sharding.getGenZeroShard(nsContentTopic5, GenerationZeroShardsCount)
|
shard10 = sharding.getGenZeroShard(nsContentTopic10, GenerationZeroShardsCount)
|
||||||
nsPubsubTopic6 =
|
|
||||||
sharding.getGenZeroShard(nsContentTopic6, GenerationZeroShardsCount)
|
|
||||||
nsPubsubTopic7 =
|
|
||||||
sharding.getGenZeroShard(nsContentTopic7, GenerationZeroShardsCount)
|
|
||||||
nsPubsubTopic8 =
|
|
||||||
sharding.getGenZeroShard(nsContentTopic8, GenerationZeroShardsCount)
|
|
||||||
nsPubsubTopic9 =
|
|
||||||
sharding.getGenZeroShard(nsContentTopic9, GenerationZeroShardsCount)
|
|
||||||
nsPubsubTopic10 =
|
|
||||||
sharding.getGenZeroShard(nsContentTopic10, GenerationZeroShardsCount)
|
|
||||||
|
|
||||||
# Then the generated shards are valid
|
# Then the generated shards are valid
|
||||||
check:
|
check:
|
||||||
nsPubsubTopic1 == NsPubsubTopic.staticSharding(ClusterId, 3)
|
shard1 == RelayShard.staticSharding(ClusterId, 3)
|
||||||
nsPubsubTopic2 == NsPubsubTopic.staticSharding(ClusterId, 3)
|
shard2 == RelayShard.staticSharding(ClusterId, 3)
|
||||||
nsPubsubTopic3 == NsPubsubTopic.staticSharding(ClusterId, 6)
|
shard3 == RelayShard.staticSharding(ClusterId, 6)
|
||||||
nsPubsubTopic4 == NsPubsubTopic.staticSharding(ClusterId, 6)
|
shard4 == RelayShard.staticSharding(ClusterId, 6)
|
||||||
nsPubsubTopic5 == NsPubsubTopic.staticSharding(ClusterId, 3)
|
shard5 == RelayShard.staticSharding(ClusterId, 3)
|
||||||
nsPubsubTopic6 == NsPubsubTopic.staticSharding(ClusterId, 3)
|
shard6 == RelayShard.staticSharding(ClusterId, 3)
|
||||||
nsPubsubTopic7 == NsPubsubTopic.staticSharding(ClusterId, 3)
|
shard7 == RelayShard.staticSharding(ClusterId, 3)
|
||||||
nsPubsubTopic8 == NsPubsubTopic.staticSharding(ClusterId, 3)
|
shard8 == RelayShard.staticSharding(ClusterId, 3)
|
||||||
nsPubsubTopic9 == NsPubsubTopic.staticSharding(ClusterId, 7)
|
shard9 == RelayShard.staticSharding(ClusterId, 7)
|
||||||
nsPubsubTopic10 == NsPubsubTopic.staticSharding(ClusterId, 3)
|
shard10 == RelayShard.staticSharding(ClusterId, 3)
|
||||||
|
|
||||||
suite "getShard from NsContentTopic":
|
suite "getShard from NsContentTopic":
|
||||||
test "Generate Gen0 Shard with topic.generation==none":
|
test "Generate Gen0 Shard with topic.generation==none":
|
||||||
|
@ -81,72 +71,72 @@ suite "Autosharding":
|
||||||
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
|
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
|
||||||
|
|
||||||
# When we get a shard from a topic without generation
|
# When we get a shard from a topic without generation
|
||||||
let nsPubsubTopic = sharding.getShard(contentTopicShort)
|
let shard = sharding.getShard(contentTopicShort)
|
||||||
|
|
||||||
# Then the generated shard is valid
|
# Then the generated shard is valid
|
||||||
check:
|
check:
|
||||||
nsPubsubTopic.value() == NsPubsubTopic.staticSharding(ClusterId, 3)
|
shard.value() == RelayShard.staticSharding(ClusterId, 3)
|
||||||
|
|
||||||
test "Generate Gen0 Shard with topic.generation==0":
|
test "Generate Gen0 Shard with topic.generation==0":
|
||||||
let sharding =
|
let sharding =
|
||||||
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
|
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
|
||||||
# When we get a shard from a gen0 topic
|
# When we get a shard from a gen0 topic
|
||||||
let nsPubsubTopic = sharding.getShard(contentTopicFull)
|
let shard = sharding.getShard(contentTopicFull)
|
||||||
|
|
||||||
# Then the generated shard is valid
|
# Then the generated shard is valid
|
||||||
check:
|
check:
|
||||||
nsPubsubTopic.value() == NsPubsubTopic.staticSharding(ClusterId, 3)
|
shard.value() == RelayShard.staticSharding(ClusterId, 3)
|
||||||
|
|
||||||
test "Generate Gen0 Shard with topic.generation==other":
|
test "Generate Gen0 Shard with topic.generation==other":
|
||||||
let sharding =
|
let sharding =
|
||||||
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
|
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
|
||||||
# When we get a shard from ain invalid content topic
|
# When we get a shard from ain invalid content topic
|
||||||
let nsPubsubTopic = sharding.getShard(contentTopicInvalid)
|
let shard = sharding.getShard(contentTopicInvalid)
|
||||||
|
|
||||||
# Then the generated shard is valid
|
# Then the generated shard is valid
|
||||||
check:
|
check:
|
||||||
nsPubsubTopic.error() == "Generation > 0 are not supported yet"
|
shard.error() == "Generation > 0 are not supported yet"
|
||||||
|
|
||||||
suite "getShard from ContentTopic":
|
suite "getShard from ContentTopic":
|
||||||
test "Generate Gen0 Shard with topic.generation==none":
|
test "Generate Gen0 Shard with topic.generation==none":
|
||||||
let sharding =
|
let sharding =
|
||||||
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
|
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
|
||||||
# When we get a shard from it
|
# When we get a shard from it
|
||||||
let nsPubsubTopic = sharding.getShard(contentTopicShort)
|
let shard = sharding.getShard(contentTopicShort)
|
||||||
|
|
||||||
# Then the generated shard is valid
|
# Then the generated shard is valid
|
||||||
check:
|
check:
|
||||||
nsPubsubTopic.value() == NsPubsubTopic.staticSharding(ClusterId, 3)
|
shard.value() == RelayShard.staticSharding(ClusterId, 3)
|
||||||
|
|
||||||
test "Generate Gen0 Shard with topic.generation==0":
|
test "Generate Gen0 Shard with topic.generation==0":
|
||||||
let sharding =
|
let sharding =
|
||||||
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
|
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
|
||||||
# When we get a shard from it
|
# When we get a shard from it
|
||||||
let nsPubsubTopic = sharding.getShard(contentTopicFull)
|
let shard = sharding.getShard(contentTopicFull)
|
||||||
|
|
||||||
# Then the generated shard is valid
|
# Then the generated shard is valid
|
||||||
check:
|
check:
|
||||||
nsPubsubTopic.value() == NsPubsubTopic.staticSharding(ClusterId, 3)
|
shard.value() == RelayShard.staticSharding(ClusterId, 3)
|
||||||
|
|
||||||
test "Generate Gen0 Shard with topic.generation==other":
|
test "Generate Gen0 Shard with topic.generation==other":
|
||||||
let sharding =
|
let sharding =
|
||||||
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
|
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
|
||||||
# When we get a shard from it
|
# When we get a shard from it
|
||||||
let nsPubsubTopic = sharding.getShard(contentTopicInvalid)
|
let shard = sharding.getShard(contentTopicInvalid)
|
||||||
|
|
||||||
# Then the generated shard is valid
|
# Then the generated shard is valid
|
||||||
check:
|
check:
|
||||||
nsPubsubTopic.error() == "Generation > 0 are not supported yet"
|
shard.error() == "Generation > 0 are not supported yet"
|
||||||
|
|
||||||
test "Generate Gen0 Shard invalid topic":
|
test "Generate Gen0 Shard invalid topic":
|
||||||
let sharding =
|
let sharding =
|
||||||
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
|
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
|
||||||
# When we get a shard from it
|
# When we get a shard from it
|
||||||
let nsPubsubTopic = sharding.getShard("invalid")
|
let shard = sharding.getShard("invalid")
|
||||||
|
|
||||||
# Then the generated shard is valid
|
# Then the generated shard is valid
|
||||||
check:
|
check:
|
||||||
nsPubsubTopic.error() == "invalid format: topic must start with slash"
|
shard.error() == "invalid format: topic must start with slash"
|
||||||
|
|
||||||
suite "parseSharding":
|
suite "parseSharding":
|
||||||
test "contentTopics is ContentTopic":
|
test "contentTopics is ContentTopic":
|
||||||
|
|
|
@ -17,16 +17,16 @@ const DefaultPubsubTopic* = PubsubTopic("/waku/2/rs/0/0")
|
||||||
|
|
||||||
## Namespaced pub-sub topic
|
## Namespaced pub-sub topic
|
||||||
|
|
||||||
type NsPubsubTopic* = object
|
type RelayShard* = object
|
||||||
clusterId*: uint16
|
clusterId*: uint16
|
||||||
shardId*: uint16
|
shardId*: uint16
|
||||||
|
|
||||||
proc staticSharding*(T: type NsPubsubTopic, clusterId, shardId: uint16): T =
|
proc staticSharding*(T: type RelayShard, clusterId, shardId: uint16): T =
|
||||||
return NsPubsubTopic(clusterId: clusterId, shardId: shardId)
|
return RelayShard(clusterId: clusterId, shardId: shardId)
|
||||||
|
|
||||||
# Serialization
|
# Serialization
|
||||||
|
|
||||||
proc `$`*(topic: NsPubsubTopic): string =
|
proc `$`*(topic: RelayShard): string =
|
||||||
## Returns a string representation of a namespaced topic
|
## Returns a string representation of a namespaced topic
|
||||||
## in the format `/waku/2/rs/<cluster-id>/<shard-id>
|
## in the format `/waku/2/rs/<cluster-id>/<shard-id>
|
||||||
return "/waku/2/rs/" & $topic.clusterId & "/" & $topic.shardId
|
return "/waku/2/rs/" & $topic.clusterId & "/" & $topic.shardId
|
||||||
|
@ -38,8 +38,8 @@ const
|
||||||
StaticShardingPubsubTopicPrefix = Waku2PubsubTopicPrefix & "/rs"
|
StaticShardingPubsubTopicPrefix = Waku2PubsubTopicPrefix & "/rs"
|
||||||
|
|
||||||
proc parseStaticSharding*(
|
proc parseStaticSharding*(
|
||||||
T: type NsPubsubTopic, topic: PubsubTopic
|
T: type RelayShard, topic: PubsubTopic
|
||||||
): ParsingResult[NsPubsubTopic] =
|
): ParsingResult[RelayShard] =
|
||||||
if not topic.startsWith(StaticShardingPubsubTopicPrefix):
|
if not topic.startsWith(StaticShardingPubsubTopicPrefix):
|
||||||
return err(
|
return err(
|
||||||
ParsingError.invalidFormat("must start with " & StaticShardingPubsubTopicPrefix)
|
ParsingError.invalidFormat("must start with " & StaticShardingPubsubTopicPrefix)
|
||||||
|
@ -67,19 +67,19 @@ proc parseStaticSharding*(
|
||||||
ParsingError.invalidFormat($err)
|
ParsingError.invalidFormat($err)
|
||||||
)
|
)
|
||||||
|
|
||||||
ok(NsPubsubTopic.staticSharding(clusterId, shardId))
|
ok(RelayShard.staticSharding(clusterId, shardId))
|
||||||
|
|
||||||
proc parse*(T: type NsPubsubTopic, topic: PubsubTopic): ParsingResult[NsPubsubTopic] =
|
proc parse*(T: type RelayShard, topic: PubsubTopic): ParsingResult[RelayShard] =
|
||||||
## Splits a namespaced topic string into its constituent parts.
|
## Splits a namespaced topic string into its constituent parts.
|
||||||
## The topic string has to be in the format `/<application>/<version>/<topic-name>/<encoding>`
|
## The topic string has to be in the format `/<application>/<version>/<topic-name>/<encoding>`
|
||||||
NsPubsubTopic.parseStaticSharding(topic)
|
RelayShard.parseStaticSharding(topic)
|
||||||
|
|
||||||
# Pubsub topic compatibility
|
# Pubsub topic compatibility
|
||||||
|
|
||||||
converter toPubsubTopic*(topic: NsPubsubTopic): PubsubTopic =
|
converter toPubsubTopic*(topic: RelayShard): PubsubTopic =
|
||||||
$topic
|
$topic
|
||||||
|
|
||||||
proc `==`*[T: NsPubsubTopic](x, y: T): bool =
|
proc `==`*[T: RelayShard](x, y: T): bool =
|
||||||
if x.clusterId != y.clusterId:
|
if x.clusterId != y.clusterId:
|
||||||
return false
|
return false
|
||||||
|
|
||||||
|
|
|
@ -16,7 +16,7 @@ type Sharding* = object
|
||||||
proc new*(T: type Sharding, clusterId: uint16, shardCount: uint32): T =
|
proc new*(T: type Sharding, clusterId: uint16, shardCount: uint32): T =
|
||||||
return Sharding(clusterId: clusterId, shardCountGenZero: shardCount)
|
return Sharding(clusterId: clusterId, shardCountGenZero: shardCount)
|
||||||
|
|
||||||
proc getGenZeroShard*(s: Sharding, topic: NsContentTopic, count: int): NsPubsubTopic =
|
proc getGenZeroShard*(s: Sharding, topic: NsContentTopic, count: int): RelayShard =
|
||||||
let bytes = toBytes(topic.application) & toBytes(topic.version)
|
let bytes = toBytes(topic.application) & toBytes(topic.version)
|
||||||
|
|
||||||
let hash = sha256.digest(bytes)
|
let hash = sha256.digest(bytes)
|
||||||
|
@ -27,9 +27,9 @@ proc getGenZeroShard*(s: Sharding, topic: NsContentTopic, count: int): NsPubsubT
|
||||||
# This is equilavent to modulo shard count but faster
|
# This is equilavent to modulo shard count but faster
|
||||||
let shard = hashValue and uint64((count - 1))
|
let shard = hashValue and uint64((count - 1))
|
||||||
|
|
||||||
NsPubsubTopic.staticSharding(s.clusterId, uint16(shard))
|
RelayShard.staticSharding(s.clusterId, uint16(shard))
|
||||||
|
|
||||||
proc getShard*(s: Sharding, topic: NsContentTopic): Result[NsPubsubTopic, string] =
|
proc getShard*(s: Sharding, topic: NsContentTopic): Result[RelayShard, string] =
|
||||||
## Compute the (pubsub topic) shard to use for this content topic.
|
## Compute the (pubsub topic) shard to use for this content topic.
|
||||||
|
|
||||||
if topic.generation.isNone():
|
if topic.generation.isNone():
|
||||||
|
@ -54,14 +54,14 @@ proc parseSharding*(
|
||||||
s: Sharding,
|
s: Sharding,
|
||||||
pubsubTopic: Option[PubsubTopic],
|
pubsubTopic: Option[PubsubTopic],
|
||||||
contentTopics: ContentTopic | seq[ContentTopic],
|
contentTopics: ContentTopic | seq[ContentTopic],
|
||||||
): Result[Table[NsPubsubTopic, seq[NsContentTopic]], string] =
|
): Result[Table[RelayShard, seq[NsContentTopic]], string] =
|
||||||
var topics: seq[ContentTopic]
|
var topics: seq[ContentTopic]
|
||||||
when contentTopics is seq[ContentTopic]:
|
when contentTopics is seq[ContentTopic]:
|
||||||
topics = contentTopics
|
topics = contentTopics
|
||||||
else:
|
else:
|
||||||
topics = @[contentTopics]
|
topics = @[contentTopics]
|
||||||
|
|
||||||
var topicMap = initTable[NsPubsubTopic, seq[NsContentTopic]]()
|
var topicMap = initTable[RelayShard, seq[NsContentTopic]]()
|
||||||
for contentTopic in topics:
|
for contentTopic in topics:
|
||||||
let parseRes = NsContentTopic.parse(contentTopic)
|
let parseRes = NsContentTopic.parse(contentTopic)
|
||||||
|
|
||||||
|
@ -73,7 +73,7 @@ proc parseSharding*(
|
||||||
|
|
||||||
let pubsub =
|
let pubsub =
|
||||||
if pubsubTopic.isSome():
|
if pubsubTopic.isSome():
|
||||||
let parseRes = NsPubsubTopic.parse(pubsubTopic.get())
|
let parseRes = RelayShard.parse(pubsubTopic.get())
|
||||||
|
|
||||||
if parseRes.isErr():
|
if parseRes.isErr():
|
||||||
return err("Cannot parse pubsub topic: " & $parseRes.error)
|
return err("Cannot parse pubsub topic: " & $parseRes.error)
|
||||||
|
@ -97,7 +97,7 @@ proc parseSharding*(
|
||||||
|
|
||||||
ok(topicMap)
|
ok(topicMap)
|
||||||
|
|
||||||
#type ShardsPriority = seq[tuple[topic: NsPubsubTopic, value: float64]]
|
#type ShardsPriority = seq[tuple[topic: RelayShard, value: float64]]
|
||||||
|
|
||||||
#[ proc shardCount*(topic: NsContentTopic): Result[int, string] =
|
#[ proc shardCount*(topic: NsContentTopic): Result[int, string] =
|
||||||
## Returns the total shard count from the content topic.
|
## Returns the total shard count from the content topic.
|
||||||
|
@ -117,7 +117,7 @@ proc parseSharding*(
|
||||||
#[ proc applyWeight(hashValue: uint64, weight: float64): float64 =
|
#[ proc applyWeight(hashValue: uint64, weight: float64): float64 =
|
||||||
(-weight) / math.ln(float64(hashValue) / float64(high(uint64))) ]#
|
(-weight) / math.ln(float64(hashValue) / float64(high(uint64))) ]#
|
||||||
|
|
||||||
#[ proc hashOrder*(x, y: (NsPubsubTopic, float64)): int =
|
#[ proc hashOrder*(x, y: (RelayShard, float64)): int =
|
||||||
cmp(x[1], y[1]) ]#
|
cmp(x[1], y[1]) ]#
|
||||||
|
|
||||||
#[ proc weightedShardList*(topic: NsContentTopic, shardCount: int, weightList: seq[float64]): Result[ShardsPriority, string] =
|
#[ proc weightedShardList*(topic: NsContentTopic, shardCount: int, weightList: seq[float64]): Result[ShardsPriority, string] =
|
||||||
|
@ -127,10 +127,10 @@ proc parseSharding*(
|
||||||
|
|
||||||
let shardsNWeights = zip(toSeq(0..shardCount), weightList)
|
let shardsNWeights = zip(toSeq(0..shardCount), weightList)
|
||||||
|
|
||||||
var list = newSeq[(NsPubsubTopic, float64)](shardCount)
|
var list = newSeq[(RelayShard, float64)](shardCount)
|
||||||
|
|
||||||
for (shard, weight) in shardsNWeights:
|
for (shard, weight) in shardsNWeights:
|
||||||
let pubsub = NsPubsubTopic.staticSharding(ClusterId, uint16(shard))
|
let pubsub = RelayShard.staticSharding(ClusterId, uint16(shard))
|
||||||
|
|
||||||
let clusterBytes = toBytesBE(uint16(ClusterId))
|
let clusterBytes = toBytesBE(uint16(ClusterId))
|
||||||
let shardBytes = toBytesBE(uint16(shard))
|
let shardBytes = toBytesBE(uint16(shard))
|
||||||
|
@ -145,7 +145,7 @@ proc parseSharding*(
|
||||||
|
|
||||||
ok(list) ]#
|
ok(list) ]#
|
||||||
|
|
||||||
#[ proc singleHighestWeigthShard*(topic: NsContentTopic): Result[NsPubsubTopic, string] =
|
#[ proc singleHighestWeigthShard*(topic: NsContentTopic): Result[RelayShard, string] =
|
||||||
let count = ? shardCount(topic)
|
let count = ? shardCount(topic)
|
||||||
|
|
||||||
let weights = repeat(1.0, count)
|
let weights = repeat(1.0, count)
|
||||||
|
|
|
@ -24,8 +24,8 @@ type RelayShards* = object
|
||||||
clusterId*: uint16
|
clusterId*: uint16
|
||||||
shardIds*: seq[uint16]
|
shardIds*: seq[uint16]
|
||||||
|
|
||||||
func topics*(rs: RelayShards): seq[NsPubsubTopic] =
|
func topics*(rs: RelayShards): seq[RelayShard] =
|
||||||
rs.shardIds.mapIt(NsPubsubTopic.staticSharding(rs.clusterId, it))
|
rs.shardIds.mapIt(RelayShard.staticSharding(rs.clusterId, it))
|
||||||
|
|
||||||
func init*(T: type RelayShards, clusterId, shardId: uint16): Result[T, string] =
|
func init*(T: type RelayShards, clusterId, shardId: uint16): Result[T, string] =
|
||||||
if shardId > MaxShardIndex:
|
if shardId > MaxShardIndex:
|
||||||
|
@ -61,7 +61,7 @@ func topicsToRelayShards*(topics: seq[string]): Result[Option[RelayShards], stri
|
||||||
if topics.len < 1:
|
if topics.len < 1:
|
||||||
return ok(none(RelayShards))
|
return ok(none(RelayShards))
|
||||||
|
|
||||||
let parsedTopicsRes = topics.mapIt(NsPubsubTopic.parse(it))
|
let parsedTopicsRes = topics.mapIt(RelayShard.parse(it))
|
||||||
|
|
||||||
for res in parsedTopicsRes:
|
for res in parsedTopicsRes:
|
||||||
if res.isErr():
|
if res.isErr():
|
||||||
|
@ -80,11 +80,11 @@ func topicsToRelayShards*(topics: seq[string]): Result[Option[RelayShards], stri
|
||||||
func contains*(rs: RelayShards, clusterId, shardId: uint16): bool =
|
func contains*(rs: RelayShards, clusterId, shardId: uint16): bool =
|
||||||
return rs.clusterId == clusterId and rs.shardIds.contains(shardId)
|
return rs.clusterId == clusterId and rs.shardIds.contains(shardId)
|
||||||
|
|
||||||
func contains*(rs: RelayShards, topic: NsPubsubTopic): bool =
|
func contains*(rs: RelayShards, shard: RelayShard): bool =
|
||||||
return rs.contains(topic.clusterId, topic.shardId)
|
return rs.contains(shard.clusterId, shard.shardId)
|
||||||
|
|
||||||
func contains*(rs: RelayShards, topic: PubsubTopic): bool =
|
func contains*(rs: RelayShards, topic: PubsubTopic): bool =
|
||||||
let parseRes = NsPubsubTopic.parse(topic)
|
let parseRes = RelayShard.parse(topic)
|
||||||
if parseRes.isErr():
|
if parseRes.isErr():
|
||||||
return false
|
return false
|
||||||
|
|
||||||
|
@ -235,11 +235,11 @@ proc containsShard*(r: Record, clusterId, shardId: uint16): bool =
|
||||||
|
|
||||||
rs.contains(clusterId, shardId)
|
rs.contains(clusterId, shardId)
|
||||||
|
|
||||||
proc containsShard*(r: Record, topic: NsPubsubTopic): bool =
|
proc containsShard*(r: Record, shard: RelayShard): bool =
|
||||||
return containsShard(r, topic.clusterId, topic.shardId)
|
return containsShard(r, shard.clusterId, shard.shardId)
|
||||||
|
|
||||||
proc containsShard*(r: Record, topic: PubsubTopic): bool =
|
proc containsShard*(r: Record, topic: PubsubTopic): bool =
|
||||||
let parseRes = NsPubsubTopic.parse(topic)
|
let parseRes = RelayShard.parse(topic)
|
||||||
if parseRes.isErr():
|
if parseRes.isErr():
|
||||||
debug "invalid static sharding topic", topic = topic, error = parseRes.error
|
debug "invalid static sharding topic", topic = topic, error = parseRes.error
|
||||||
return false
|
return false
|
||||||
|
|
|
@ -128,17 +128,17 @@ proc subscriptionsListener(wm: WakuMetadata) {.async.} =
|
||||||
let events = await wm.topicSubscriptionQueue.waitEvents(key)
|
let events = await wm.topicSubscriptionQueue.waitEvents(key)
|
||||||
|
|
||||||
for event in events:
|
for event in events:
|
||||||
let parsedTopic = NsPubsubTopic.parse(event.topic).valueOr:
|
let parsedShard = RelayShard.parse(event.topic).valueOr:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if parsedTopic.clusterId != wm.clusterId:
|
if parsedShard.clusterId != wm.clusterId:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
case event.kind
|
case event.kind
|
||||||
of PubsubSub:
|
of PubsubSub:
|
||||||
wm.shards.incl(parsedTopic.shardId)
|
wm.shards.incl(parsedShard.shardId)
|
||||||
of PubsubUnsub:
|
of PubsubUnsub:
|
||||||
wm.shards.excl(parsedTopic.shardId)
|
wm.shards.excl(parsedShard.shardId)
|
||||||
else:
|
else:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue