test(sharding): Implement sharding tests (#2603)

* Implement sharding tests.
This commit is contained in:
Álex Cabeza Romero 2024-05-13 17:43:14 +02:00 committed by GitHub
parent 49dd6c1989
commit 6c3ad50455
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 1325 additions and 80 deletions

View File

@ -16,71 +16,10 @@ import
[node/waku_node, node/peer_manager, waku_core, waku_node, waku_rln_relay], [node/waku_node, node/peer_manager, waku_core, waku_node, waku_rln_relay],
../waku_store/store_utils, ../waku_store/store_utils,
../waku_archive/archive_utils, ../waku_archive/archive_utils,
../waku_relay/utils,
../testlib/[wakucore, wakunode, testasync, futures], ../testlib/[wakucore, wakunode, testasync, futures],
../resources/payloads ../resources/payloads
proc setupRln(node: WakuNode, identifier: uint) {.async.} =
await node.mountRlnRelay(
WakuRlnConfig(
rlnRelayDynamic: false,
rlnRelayCredIndex: some(identifier),
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_" & $identifier),
rlnEpochSizeSec: 1,
)
)
proc setupRelayWithRln(
node: WakuNode, identifier: uint, pubsubTopics: seq[string]
) {.async.} =
await node.mountRelay(pubsubTopics)
await setupRln(node, identifier)
proc subscribeCompletionHandler(node: WakuNode, pubsubTopic: string): Future[bool] =
var completionFut = newFuture[bool]()
proc relayHandler(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
if topic == pubsubTopic:
completionFut.complete(true)
node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))
return completionFut
proc sendRlnMessage(
client: WakuNode,
pubsubTopic: string,
contentTopic: string,
completionFuture: Future[bool],
payload: seq[byte] = "Hello".toBytes(),
): Future[bool] {.async.} =
var message = WakuMessage(payload: payload, contentTopic: contentTopic)
doAssert(client.wakuRlnRelay.appendRLNProof(message, epochTime()).isOk())
discard await client.publish(some(pubsubTopic), message)
let isCompleted = await completionFuture.withTimeout(FUTURE_TIMEOUT)
return isCompleted
proc sendRlnMessageWithInvalidProof(
client: WakuNode,
pubsubTopic: string,
contentTopic: string,
completionFuture: Future[bool],
payload: seq[byte] = "Hello".toBytes(),
): Future[bool] {.async.} =
let
extraBytes: seq[byte] = @[byte(1), 2, 3]
rateLimitProofRes = client.wakuRlnRelay.groupManager.generateProof(
concat(payload, extraBytes),
# we add extra bytes to invalidate proof verification against original payload
client.wakuRlnRelay.getCurrentEpoch(),
)
rateLimitProof = rateLimitProofRes.get().encode().buffer
message =
WakuMessage(payload: @payload, contentTopic: contentTopic, proof: rateLimitProof)
discard await client.publish(some(pubsubTopic), message)
let isCompleted = await completionFuture.withTimeout(FUTURE_TIMEOUT)
return isCompleted
suite "Waku RlnRelay - End to End": suite "Waku RlnRelay - End to End":
var var
pubsubTopic {.threadvar.}: PubsubTopic pubsubTopic {.threadvar.}: PubsubTopic
@ -237,23 +176,29 @@ suite "Waku RlnRelay - End to End":
doAssert( doAssert(
client.wakuRlnRelay client.wakuRlnRelay
.appendRLNProof(message1b, epoch + client.wakuRlnRelay.rlnEpochSizeSec * 0) .appendRLNProof(
.isOk() message1b, epoch + float64(client.wakuRlnRelay.rlnEpochSizeSec * 0)
) )
doAssert(
client.wakuRlnRelay
.appendRLNProof(message1kib, epoch + client.wakuRlnRelay.rlnEpochSizeSec * 1)
.isOk()
)
doAssert(
client.wakuRlnRelay
.appendRLNProof(message150kib, epoch + client.wakuRlnRelay.rlnEpochSizeSec * 2)
.isOk() .isOk()
) )
doAssert( doAssert(
client.wakuRlnRelay client.wakuRlnRelay
.appendRLNProof( .appendRLNProof(
message151kibPlus, epoch + client.wakuRlnRelay.rlnEpochSizeSec * 3 message1kib, epoch + float64(client.wakuRlnRelay.rlnEpochSizeSec * 1)
)
.isOk()
)
doAssert(
client.wakuRlnRelay
.appendRLNProof(
message150kib, epoch + float64(client.wakuRlnRelay.rlnEpochSizeSec * 2)
)
.isOk()
)
doAssert(
client.wakuRlnRelay
.appendRLNProof(
message151kibPlus, epoch + float64(client.wakuRlnRelay.rlnEpochSizeSec * 3)
) )
.isOk() .isOk()
) )
@ -317,9 +262,11 @@ suite "Waku RlnRelay - End to End":
WakuMessage(payload: @payload150kibPlus, contentTopic: contentTopic) WakuMessage(payload: @payload150kibPlus, contentTopic: contentTopic)
doAssert( doAssert(
client.wakuRlnRelay.appendRLNProof( client.wakuRlnRelay
message151kibPlus, epoch + client.wakuRlnRelay.rlnEpochSizeSec * 3 .appendRLNProof(
message151kibPlus, epoch + float64(client.wakuRlnRelay.rlnEpochSizeSec * 3)
) )
.isOk()
) )
# When sending the 150KiB plus message # When sending the 150KiB plus message

File diff suppressed because it is too large Load Diff

View File

@ -1,4 +1,4 @@
import chronos import chronos
template assertResultOk*[T, E](result: Result[T, E]) = template assertResultOk*[T, E](result: Result[T, E]) =
assert result.isOk(), result.error() assert result.isOk(), $result.error()

View File

@ -41,3 +41,8 @@ proc waitForResult*[T](
): Future[Result[T, string]] {.async.} = ): Future[Result[T, string]] {.async.} =
discard await future.withTimeout(timeout) discard await future.withTimeout(timeout)
return future.toResult() return future.toResult()
proc reset*[T](future: Future[T]): void =
# Likely an incomplete reset, but good enough for testing purposes (for now)
future.internalError = nil
future.internalState = FutureState.Pending

View File

@ -0,0 +1,44 @@
{.used.}
import std/[options], testutils/unittests, results
import ../../../../waku/[waku_core/topics/pubsub_topic], ../../testlib/[wakucore]
suite "Static Sharding Functionality":
test "Shard Cluster Identification":
let topic = NsPubsubTopic.parseStaticSharding("/waku/2/rs/0/1").get()
check:
topic.clusterId == 0
topic.shardId == 1
topic == NsPubsubTopic.staticSharding(0, 1)
test "Pubsub Topic Naming Compliance":
let topic = NsPubsubTopic.staticSharding(0, 1)
check:
topic.clusterId == 0
topic.shardId == 1
topic == "/waku/2/rs/0/1"
suite "Automatic Sharding Mechanics":
test "Shard Selection Algorithm":
let
topic1 = NsPubsubTopic.parseNamedSharding("/waku/2/xxx").get()
topic2 = NsPubsubTopic.parseNamedSharding("/waku/2/123").get()
topic3 = NsPubsubTopic.parseNamedSharding("/waku/2/xxx123").get()
check:
# topic1.shardId == 1
# topic1.clusterId == 0
topic1 == NsPubsubTopic.staticSharding(0, 1)
# topic2.shardId == 1
# topic2.clusterId == 0
topic2 == NsPubsubTopic.staticSharding(0, 1)
# topic3.shardId == 1
# topic3.clusterId == 0
topic3 == NsPubsubTopic.staticSharding(0, 1)
test "Shard Selection Algorithm without topicName":
let topicResult = NsPubsubTopic.parseNamedSharding("/waku/2/")
check:
topicResult.isErr()

View File

@ -11,16 +11,33 @@ suite "Autosharding":
pubsubTopic13 = "/waku/2/rs/1/3" pubsubTopic13 = "/waku/2/rs/1/3"
contentTopicShort = "/toychat/2/huilong/proto" contentTopicShort = "/toychat/2/huilong/proto"
contentTopicFull = "/0/toychat/2/huilong/proto" contentTopicFull = "/0/toychat/2/huilong/proto"
contentTopicShort2 = "/toychat2/2/huilong/proto"
contentTopicFull2 = "/0/toychat2/2/huilong/proto"
contentTopicShort3 = "/toychat/2/huilong/proto2"
contentTopicFull3 = "/0/toychat/2/huilong/proto2"
contentTopicShort4 = "/toychat/4/huilong/proto2"
contentTopicFull4 = "/0/toychat/4/huilong/proto2"
contentTopicFull5 = "/1/toychat/2/huilong/proto"
contentTopicFull6 = "/1/toychat2/2/huilong/proto"
contentTopicInvalid = "/1/toychat/2/huilong/proto" contentTopicInvalid = "/1/toychat/2/huilong/proto"
suite "getGenZeroShard": suite "getGenZeroShard":
test "Generate Gen0 Shard": test "Generate Gen0 Shard":
let sharding = let sharding =
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount) Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
# Given two valid topics # Given two valid topics
let let
nsContentTopic1 = NsContentTopic.parse(contentTopicShort).value() nsContentTopic1 = NsContentTopic.parse(contentTopicShort).value()
nsContentTopic2 = NsContentTopic.parse(contentTopicFull).value() nsContentTopic2 = NsContentTopic.parse(contentTopicFull).value()
nsContentTopic3 = NsContentTopic.parse(contentTopicShort2).value()
nsContentTopic4 = NsContentTopic.parse(contentTopicFull2).value()
nsContentTopic5 = NsContentTopic.parse(contentTopicShort3).value()
nsContentTopic6 = NsContentTopic.parse(contentTopicFull3).value()
nsContentTopic7 = NsContentTopic.parse(contentTopicShort3).value()
nsContentTopic8 = NsContentTopic.parse(contentTopicFull3).value()
nsContentTopic9 = NsContentTopic.parse(contentTopicFull4).value()
nsContentTopic10 = NsContentTopic.parse(contentTopicFull5).value()
# When we generate a gen0 shard from them # When we generate a gen0 shard from them
let let
@ -28,11 +45,35 @@ suite "Autosharding":
sharding.getGenZeroShard(nsContentTopic1, GenerationZeroShardsCount) sharding.getGenZeroShard(nsContentTopic1, GenerationZeroShardsCount)
nsPubsubTopic2 = nsPubsubTopic2 =
sharding.getGenZeroShard(nsContentTopic2, GenerationZeroShardsCount) sharding.getGenZeroShard(nsContentTopic2, GenerationZeroShardsCount)
nsPubsubTopic3 =
sharding.getGenZeroShard(nsContentTopic3, GenerationZeroShardsCount)
nsPubsubTopic4 =
sharding.getGenZeroShard(nsContentTopic4, GenerationZeroShardsCount)
nsPubsubTopic5 =
sharding.getGenZeroShard(nsContentTopic5, GenerationZeroShardsCount)
nsPubsubTopic6 =
sharding.getGenZeroShard(nsContentTopic6, GenerationZeroShardsCount)
nsPubsubTopic7 =
sharding.getGenZeroShard(nsContentTopic7, GenerationZeroShardsCount)
nsPubsubTopic8 =
sharding.getGenZeroShard(nsContentTopic8, GenerationZeroShardsCount)
nsPubsubTopic9 =
sharding.getGenZeroShard(nsContentTopic9, GenerationZeroShardsCount)
nsPubsubTopic10 =
sharding.getGenZeroShard(nsContentTopic10, GenerationZeroShardsCount)
# Then the generated shards are valid # Then the generated shards are valid
check: check:
nsPubsubTopic1 == NsPubsubTopic.staticSharding(ClusterId, 3) nsPubsubTopic1 == NsPubsubTopic.staticSharding(ClusterId, 3)
nsPubsubTopic2 == NsPubsubTopic.staticSharding(ClusterId, 3) nsPubsubTopic2 == NsPubsubTopic.staticSharding(ClusterId, 3)
nsPubsubTopic3 == NsPubsubTopic.staticSharding(ClusterId, 6)
nsPubsubTopic4 == NsPubsubTopic.staticSharding(ClusterId, 6)
nsPubsubTopic5 == NsPubsubTopic.staticSharding(ClusterId, 3)
nsPubsubTopic6 == NsPubsubTopic.staticSharding(ClusterId, 3)
nsPubsubTopic7 == NsPubsubTopic.staticSharding(ClusterId, 3)
nsPubsubTopic8 == NsPubsubTopic.staticSharding(ClusterId, 3)
nsPubsubTopic9 == NsPubsubTopic.staticSharding(ClusterId, 7)
nsPubsubTopic10 == NsPubsubTopic.staticSharding(ClusterId, 3)
suite "getShard from NsContentTopic": suite "getShard from NsContentTopic":
test "Generate Gen0 Shard with topic.generation==none": test "Generate Gen0 Shard with topic.generation==none":

View File

@ -9,7 +9,7 @@ import
eth/keys as eth_keys eth/keys as eth_keys
import import
../../../waku/[waku_enr, discovery/waku_discv5, waku_core], ../../../waku/[waku_enr, discovery/waku_discv5, waku_core, common/enr],
../testlib/wakucore, ../testlib/wakucore,
../waku_discv5/utils, ../waku_discv5/utils,
./utils ./utils
@ -114,3 +114,55 @@ suite "Sharding":
## Cleanup ## Cleanup
await node.stop() await node.stop()
suite "Discovery Mechanisms for Shards":
test "Index List Representation":
# Given a valid index list and its representation
let
indicesList: seq[uint8] = @[0, 73, 2, 0, 1, 0, 10]
clusterId: uint16 = 73 # bitVector's clusterId
shardIds: seq[uint16] = @[1u16, 10u16] # bitVector's shardIds
let
enrSeqNum = 1u64
enrPrivKey = generatesecp256k1key()
# When building an ENR with the index list
var builder = EnrBuilder.init(enrPrivKey, enrSeqNum)
builder.addFieldPair(ShardingIndicesListEnrField, indicesList)
let
record = builder.build().tryGet()
relayShards = record.toTyped().tryGet().relayShardingIndicesList().get()
# Then the ENR should be correctly parsed
check:
relayShards == RelayShards.init(clusterId, shardIds).expect("Valid Shards")
test "Bit Vector Representation":
# Given a valid bit vector and its representation
let
bitVector: seq[byte] =
@[
0, 73, 2, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
]
clusterId: uint16 = 73 # bitVector's clusterId
shardIds: seq[uint16] = @[1u16, 10u16] # bitVector's shardIds
let
enrSeqNum = 1u64
enrPrivKey = generatesecp256k1key()
# When building an ENR with the bit vector
var builder = EnrBuilder.init(enrPrivKey, enrSeqNum)
builder.addFieldPair(ShardingBitVectorEnrField, bitVector)
let
record = builder.build().tryGet()
relayShards = record.toTyped().tryGet().relayShardingBitVector().get()
# Then the ENR should be correctly parsed
check:
relayShards == RelayShards.init(clusterId, shardIds).expect("Valid Shards")

View File

@ -1,8 +1,30 @@
{.used.} {.used.}
import std/[strutils], stew/shims/net as stewNet, chronos import
std/[strutils, sequtils, tempfiles],
stew/byteutils,
stew/shims/net as stewNet,
testutils/unittests,
chronos,
libp2p/switch,
libp2p/protocols/pubsub/pubsub
import ../../../waku/waku_relay, ../../../waku/waku_core, ../testlib/wakucore from std/times import epochTime
import
../../../waku/
[
waku_relay,
node/waku_node,
node/peer_manager,
waku_core,
waku_node,
waku_rln_relay,
],
../waku_store/store_utils,
../waku_archive/archive_utils,
../testlib/[wakucore, wakunode, testasync, futures],
../resources/payloads
proc noopRawHandler*(): WakuRelayHandler = proc noopRawHandler*(): WakuRelayHandler =
var handler: WakuRelayHandler var handler: WakuRelayHandler
@ -19,3 +41,105 @@ proc newTestWakuRelay*(switch = newTestSwitch()): Future[WakuRelay] {.async.} =
switch.mount(proto, protocolMatcher) switch.mount(proto, protocolMatcher)
return proto return proto
proc setupRln*(node: WakuNode, identifier: uint) {.async.} =
await node.mountRlnRelay(
WakuRlnConfig(
rlnRelayDynamic: false,
rlnRelayCredIndex: some(identifier),
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_" & $identifier),
rlnEpochSizeSec: 1,
)
)
proc setupRelayWithRln*(
node: WakuNode, identifier: uint, pubsubTopics: seq[string]
) {.async.} =
await node.mountRelay(pubsubTopics)
await setupRln(node, identifier)
proc subscribeToContentTopicWithHandler*(
node: WakuNode, contentTopic: string
): Future[bool] =
var completionFut = newFuture[bool]()
proc relayHandler(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
if topic == topic:
completionFut.complete(true)
node.subscribe((kind: ContentSub, topic: contentTopic), some(relayHandler))
return completionFut
proc subscribeCompletionHandler*(node: WakuNode, pubsubTopic: string): Future[bool] =
var completionFut = newFuture[bool]()
proc relayHandler(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
if topic == pubsubTopic:
completionFut.complete(true)
node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))
return completionFut
proc sendRlnMessage*(
client: WakuNode,
pubsubTopic: string,
contentTopic: string,
completionFuture: Future[bool],
payload: seq[byte] = "Hello".toBytes(),
): Future[bool] {.async.} =
var message = WakuMessage(payload: payload, contentTopic: contentTopic)
doAssert(client.wakuRlnRelay.appendRLNProof(message, epochTime()).isOk())
discard await client.publish(some(pubsubTopic), message)
let isCompleted = await completionFuture.withTimeout(FUTURE_TIMEOUT)
return isCompleted
when defined(rln_v2):
proc sendRlnMessageWithInvalidProof*(
client: WakuNode,
pubsubTopic: string,
contentTopic: string,
completionFuture: Future[bool],
payload: seq[byte] = "Hello".toBytes(),
): Future[bool] {.async.} =
let
extraBytes: seq[byte] = @[byte(1), 2, 3]
rateLimitProofRes = client.wakuRlnRelay.groupManager.generateProof(
concat(payload, extraBytes),
# we add extra bytes to invalidate proof verification against original payload
client.wakuRlnRelay.getCurrentEpoch(),
messageId = MessageId(0),
)
rateLimitProof = rateLimitProofRes.get().encode().buffer
message = WakuMessage(
payload: @payload, contentTopic: contentTopic, proof: rateLimitProof
)
discard await client.publish(some(pubsubTopic), message)
let isCompleted = await completionFuture.withTimeout(FUTURE_TIMEOUT)
return isCompleted
else:
proc sendRlnMessageWithInvalidProof*(
client: WakuNode,
pubsubTopic: string,
contentTopic: string,
completionFuture: Future[bool],
payload: seq[byte] = "Hello".toBytes(),
): Future[bool] {.async.} =
let
extraBytes: seq[byte] = @[byte(1), 2, 3]
rateLimitProofRes = client.wakuRlnRelay.groupManager.generateProof(
concat(payload, extraBytes),
# we add extra bytes to invalidate proof verification against original payload
client.wakuRlnRelay.getCurrentEpoch(),
)
rateLimitProof = rateLimitProofRes.get().encode().buffer
message = WakuMessage(
payload: @payload, contentTopic: contentTopic, proof: rateLimitProof
)
discard await client.publish(some(pubsubTopic), message)
let isCompleted = await completionFuture.withTimeout(FUTURE_TIMEOUT)
return isCompleted