From aac457ec5ba24fa7d553d1456f6364ee8f6605e9 Mon Sep 17 00:00:00 2001 From: Alejandro Cabeza Romero Date: Wed, 4 Sep 2024 18:35:28 +0200 Subject: [PATCH] Implement flood publish with score test --- tests/pubsub/testgossipsub.nim | 51 ++++++++++++++++++++++++++++++++++ tests/pubsub/utils.nim | 3 +- 2 files changed, 53 insertions(+), 1 deletion(-) diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 02fb0e0c5..6abe6707e 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -1422,3 +1422,54 @@ suite "Gossipsub Parameters": (await handlerFuture2.waitForResult()).isOk await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop()))) + + asyncTest "flood publish to all peers with score above threshold, regardless of subscription": + # Given 3 nodes + let + numberOfNodes = 3 + topic = "foobar" + nodes = generateNodes(numberOfNodes, gossip = true, floodPublish = true) + nodesFut = nodes.mapIt(it.switch.start()) + g0 = GossipSub(nodes[0]) + + # Nodes 1 and 2 are connected to node 0 + await nodes[0].switch.connect(nodes[1].peerInfo.peerId, nodes[1].peerInfo.addrs) + await nodes[0].switch.connect(nodes[2].peerInfo.peerId, nodes[2].peerInfo.addrs) + + # Given 2 handlers + var + handlerFut1 = newFuture[bool]() + handlerFut2 = newFuture[bool]() + + proc handler1(topic: string, data: seq[byte]) {.async.} = + handlerFut1.complete(true) + + proc handler2(topic: string, data: seq[byte]) {.async.} = + handlerFut2.complete(true) + + # Nodes are subscribed to the same topic + nodes[1].subscribe(topic, handler1) + nodes[2].subscribe(topic, handler2) + await sleepAsync(1.seconds) + + # Given node 2's score is below the threshold + for peer in g0.gossipsub.getOrDefault(topic): + if peer.peerId == nodes[2].peerInfo.peerId: + peer.score = (g0.parameters.publishThreshold - 1) + + # When node 0 publishes a message to topic "foo" + let message = "Hello!".toBytes() + check (await nodes[0].publish(topic, message)) > 0 + await sleepAsync(3.seconds) + + # Then only node 1 should receive the message + let + result1 = await handlerFut1.waitForResult(DURATION_TIMEOUT) + result2 = await handlerFut2.waitForResult(DURATION_TIMEOUT) + check: + result1.isOk and result1.get == true + result2.isErr + + # Cleanup + await allFuturesThrowing(nodes.mapIt(it.switch.stop())) + await allFuturesThrowing(nodesFut) diff --git a/tests/pubsub/utils.nim b/tests/pubsub/utils.nim index ba517bde6..662ea204a 100644 --- a/tests/pubsub/utils.nim +++ b/tests/pubsub/utils.nim @@ -80,6 +80,7 @@ proc generateNodes*( overheadRateLimit: Opt[tuple[bytes: int, interval: Duration]] = Opt.none(tuple[bytes: int, interval: Duration]), gossipSubVersion: string = "", + floodPublish: bool = false, dValues: Option[DValues] = DValues.none(), ): seq[PubSub] = for i in 0 ..< num: @@ -98,7 +99,7 @@ proc generateNodes*( maxMessageSize = maxMessageSize, parameters = ( var p = GossipSubParams.init() - p.floodPublish = false + p.floodPublish = floodPublish p.historyLength = 20 p.historyGossip = 20 p.unsubscribeBackoff = unsubscribeBackoff