From e6a06dcf329e62a75e195a6151f89148f88c364e Mon Sep 17 00:00:00 2001 From: Alejandro Cabeza Romero Date: Wed, 4 Sep 2024 22:05:16 +0200 Subject: [PATCH] Implement gossip adaptive dissemination tests. --- tests/pubsub/testgossipsub.nim | 162 ++++++++++++++++++++++++++++++++- tests/pubsub/utils.nim | 4 + 2 files changed, 162 insertions(+), 4 deletions(-) diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 6abe6707e..d62a3452a 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -1337,7 +1337,7 @@ suite "Gossipsub Parameters": nodesFut = await allFinished(nodes.mapIt(it.switch.start())) # All of them are checking for iHave messages - var receivedIHaves: seq[bool] = repeat(false, numberOfNodes) + var receivedIHaves: seq[int] = repeat(0, numberOfNodes) for i in 0 ..< numberOfNodes: var pubsubObserver: PubSubObserver capture i: @@ -1347,8 +1347,7 @@ suite "Gossipsub Parameters": if iHave.len > 0: for msg in iHave: if msg.topicID == topic: - receivedIHaves[i] = true - break + receivedIHaves[i] += 1 pubsubObserver = PubSubObserver(onRecv: checkForIhaves) @@ -1369,7 +1368,7 @@ suite "Gossipsub Parameters": # At least one of the nodes should have received an iHave message # The check is made this way because the mesh structure changes from run to run check: - anyIt(receivedIHaves, it == true) + anyIt(receivedIHaves, it > 0) await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop()))) @@ -1473,3 +1472,158 @@ suite "Gossipsub Parameters": # Cleanup await allFuturesThrowing(nodes.mapIt(it.switch.stop())) await allFuturesThrowing(nodesFut) + + asyncTest "adaptive gossip dissemination, dLazy and gossipFactor to 0": + # Given 20 nodes + let + numberOfNodes = 20 + topic = "foobar" + dValues = DValues( + dLow: some(2), dHigh: some(3), d: some(2), dOut: some(1), dLazy: some(0) + ) + nodes = generateNodes( + numberOfNodes, + gossip = true, + dValues = some(dValues), + gossipFactor = some(0.float), + ) + nodesFut = await allFinished(nodes.mapIt(it.switch.start())) + + # All of them are checking for iHave messages + var receivedIHaves: seq[int] = repeat(0, numberOfNodes) + for i in 0 ..< numberOfNodes: + var pubsubObserver: PubSubObserver + capture i: + let checkForIhaves = proc(peer: PubSubPeer, msgs: var RPCMsg) = + if msgs.control.isSome: + let iHave = msgs.control.get.ihave + if iHave.len > 0: + for msg in iHave: + if msg.topicID == topic: + receivedIHaves[i] += 1 + + pubsubObserver = PubSubObserver(onRecv: checkForIhaves) + + nodes[i].addObserver(pubsubObserver) + + # All of them are connected to node 0 + for i in 1 ..< numberOfNodes: + await nodes[0].switch.connect(nodes[i].peerInfo.peerId, nodes[i].peerInfo.addrs) + + # And subscribed to the same topic + for node in nodes: + node.subscribe(topic, voidTopicHandler) + await sleepAsync(DURATION_TIMEOUT) + + # When node 0 sends a message + discard nodes[0].publish(topic, "Hello!".toBytes()) + await sleepAsync(DURATION_TIMEOUT) + + # None of the nodes should have received an iHave message + check: + filterIt(receivedIHaves, it > 0).len == 0 + + await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop()))) + + asyncTest "adaptive gossip dissemination, with gossipFactor priority": + # Given 20 nodes + let + numberOfNodes = 20 + topic = "foobar" + dValues = DValues( + dLow: some(2), dHigh: some(3), d: some(2), dOut: some(1), dLazy: some(4) + ) + nodes = generateNodes( + numberOfNodes, gossip = true, dValues = some(dValues), gossipFactor = some(0.5) + ) + nodesFut = await allFinished(nodes.mapIt(it.switch.start())) + + # All of them are checking for iHave messages + var receivedIHaves: seq[int] = repeat(0, numberOfNodes) + for i in 0 ..< numberOfNodes: + var pubsubObserver: PubSubObserver + capture i: + let checkForIhaves = proc(peer: PubSubPeer, msgs: var RPCMsg) = + if msgs.control.isSome: + let iHave = msgs.control.get.ihave + if iHave.len > 0: + for msg in iHave: + if msg.topicID == topic: + receivedIHaves[i] += 1 + + pubsubObserver = PubSubObserver(onRecv: checkForIhaves) + + nodes[i].addObserver(pubsubObserver) + + # All of them are connected to node 0 + for i in 1 ..< numberOfNodes: + await nodes[0].switch.connect(nodes[i].peerInfo.peerId, nodes[i].peerInfo.addrs) + + # And subscribed to the same topic + for node in nodes: + node.subscribe(topic, voidTopicHandler) + await sleepAsync(DURATION_TIMEOUT) + + # When node 0 sends a message + discard nodes[0].publish(topic, "Hello!".toBytes()) + await sleepAsync(DURATION_TIMEOUT) + + # At least 8 of the nodes should have received an iHave message + # That's because the gossip factor is 0.5 over 16 available nodes + check: + filterIt(receivedIHaves, it > 0).len >= 8 + + await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop()))) + + asyncTest "adaptive gossip dissemination, with dLazy priority": + # Given 20 nodes + let + numberOfNodes = 20 + topic = "foobar" + dValues = DValues( + dLow: some(2), dHigh: some(3), d: some(2), dOut: some(1), dLazy: some(6) + ) + nodes = generateNodes( + numberOfNodes, + gossip = true, + dValues = some(dValues), + gossipFactor = some(0.float), + ) + nodesFut = await allFinished(nodes.mapIt(it.switch.start())) + + # All of them are checking for iHave messages + var receivedIHaves: seq[int] = repeat(0, numberOfNodes) + for i in 0 ..< numberOfNodes: + var pubsubObserver: PubSubObserver + capture i: + let checkForIhaves = proc(peer: PubSubPeer, msgs: var RPCMsg) = + if msgs.control.isSome: + let iHave = msgs.control.get.ihave + if iHave.len > 0: + for msg in iHave: + if msg.topicID == topic: + receivedIHaves[i] += 1 + + pubsubObserver = PubSubObserver(onRecv: checkForIhaves) + + nodes[i].addObserver(pubsubObserver) + + # All of them are connected to node 0 + for i in 1 ..< numberOfNodes: + await nodes[0].switch.connect(nodes[i].peerInfo.peerId, nodes[i].peerInfo.addrs) + + # And subscribed to the same topic + for node in nodes: + node.subscribe(topic, voidTopicHandler) + await sleepAsync(DURATION_TIMEOUT) + + # When node 0 sends a message + discard nodes[0].publish(topic, "Hello!".toBytes()) + await sleepAsync(DURATION_TIMEOUT) + + # At least 6 of the nodes should have received an iHave message + # That's because the dLazy is 6 + check: + filterIt(receivedIHaves, it > 0).len == 6 + + await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop()))) diff --git a/tests/pubsub/utils.nim b/tests/pubsub/utils.nim index 662ea204a..1da5a9b9c 100644 --- a/tests/pubsub/utils.nim +++ b/tests/pubsub/utils.nim @@ -82,6 +82,7 @@ proc generateNodes*( gossipSubVersion: string = "", floodPublish: bool = false, dValues: Option[DValues] = DValues.none(), + gossipFactor: Option[float] = float.none(), ): seq[PubSub] = for i in 0 ..< num: let switch = newStandardSwitch( @@ -106,6 +107,9 @@ proc generateNodes*( p.enablePX = enablePX p.overheadRateLimit = overheadRateLimit + if gossipFactor.isSome: + p.gossipFactor = gossipFactor.get + if dValues.isSome: let dValuesSome = dValues.get