From e53d04130573131e8d197a74eb4c188f3ceb842f Mon Sep 17 00:00:00 2001 From: Alejandro Cabeza Romero Date: Tue, 27 Aug 2024 18:54:38 +0200 Subject: [PATCH] Fix resub after unsub test. --- tests/pubsub/testgossipsub.nim | 105 +++++++++++++++++++++++---------- tests/pubsub/utils.nim | 1 - 2 files changed, 74 insertions(+), 32 deletions(-) diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 78a39717c..253b3582d 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -13,6 +13,7 @@ import sequtils, options, tables, sets, sugar import chronos, stew/byteutils, chronos/ratelimit import chronicles import metrics +import ../../libp2p/protocols/pubsub/gossipsub/behavior import utils, ../../libp2p/[ @@ -31,7 +32,7 @@ import protocols/pubsub/rpc/messages, ] import ../../libp2p/protocols/pubsub/errors as pubsub_errors -import ../helpers, ../utils/[async] +import ../helpers, ../utils/[async, futures, async, mock] proc `$`(peer: PubSubPeer): string = shortLog(peer) @@ -278,35 +279,24 @@ suite "GossipSub": await allFuturesThrowing(nodes[0].switch.stop(), nodes[1].switch.stop()) asyncTest "GossipSub unsub - resub faster than backoff": - var handlerFut = newFuture[bool]() - proc handler(topic: string, data: seq[byte]) {.async.} = + # Mock replenishFanout so it doesn't interfere with publishing, otherwise it's used as a backup mechanism during backoff period + let backup = behavior.replenishFanout + mock(behavior.replenishFanout): + proc mockedReplenishFanout(g: GossipSub) {.async.} = + discard + + mockedReplenishFanout + + # Instantiate handlers and validators + var handlerFut0 = newFuture[bool]() + proc handler0(topic: string, data: seq[byte]) {.async.} = check topic == "foobar" - handlerFut.complete(true) + handlerFut0.complete(true) - let - nodes = generateNodes(2, gossip = true) - - # start switches - nodesFut = await allFinished(nodes[0].switch.start(), nodes[1].switch.start()) - - await subscribeNodes(nodes) - - nodes[0].subscribe("foobar", handler) - nodes[1].subscribe("foobar", handler) - - var subs: seq[Future[void]] - subs &= waitSub(nodes[1], nodes[0], "foobar") - subs &= waitSub(nodes[0], nodes[1], "foobar") - - await allFuturesThrowing(subs) - - nodes[0].unsubscribe("foobar", handler) - nodes[0].subscribe("foobar", handler) - - # regular backoff is 60 seconds, so we must not wait that long - await ( - waitSub(nodes[0], nodes[1], "foobar") and waitSub(nodes[1], nodes[0], "foobar") - ).wait(30.seconds) + var handlerFut1 = newFuture[bool]() + proc handler1(topic: string, data: seq[byte]) {.async.} = + check topic == "foobar" + handlerFut1.complete(true) var validatorFut = newFuture[bool]() proc validator( @@ -316,13 +306,66 @@ suite "GossipSub": validatorFut.complete(true) result = ValidationResult.Accept + # Setup nodes and start switches + let + nodes = generateNodes(2, gossip = true, unsubscribeBackoff = 5.seconds) + nodesFut = await allFinished(nodes[0].switch.start(), nodes[1].switch.start()) + topic = "foobar" + + # Connect nodes + await subscribeNodes(nodes) + await sleepAsync(DURATION_TIMEOUT) + + # Subscribe both nodes to the topic and node1 (receiver) to the validator + nodes[0].subscribe(topic, handler0) + nodes[1].subscribe(topic, handler1) nodes[1].addValidator("foobar", validator) - tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1 + await sleepAsync(DURATION_TIMEOUT) - check (await validatorFut) and (await handlerFut) + # Wait for both nodes to verify others' subscription + var subs: seq[Future[void]] + subs &= waitSub(nodes[1], nodes[0], topic) + subs &= waitSub(nodes[0], nodes[1], topic) + await allFuturesThrowing(subs) + # When unsubscribing and resubscribing in a short time frame, the backoff period should be triggered + nodes[1].unsubscribe(topic, handler1) + await sleepAsync(DURATION_TIMEOUT) + nodes[1].subscribe(topic, handler1) + await sleepAsync(DURATION_TIMEOUT) + + # Backoff is set to 5 seconds, and the amount of sleeping time since the unsubsribe until now is 3-4s~ + # Meaning, the subscription shouldn't have been processed yet because it's still in backoff period + # When publishing under this condition + discard await nodes[0].publish("foobar", "Hello!".toBytes()) + await sleepAsync(DURATION_TIMEOUT) + + # Then the message should not be received: + check: + validatorFut.toResult().error() == "Future still not finished." + handlerFut1.toResult().error() == "Future still not finished." + handlerFut0.toResult().error() == "Future still not finished." + + validatorFut.reset() + handlerFut0.reset() + handlerFut1.reset() + + # If we wait backoff period to end, around 1-2s + await waitForMesh(nodes[0], nodes[1], topic, 3.seconds) + + discard await nodes[0].publish("foobar", "Hello!".toBytes()) + await sleepAsync(DURATION_TIMEOUT) + + # Then the message should be received + check: + validatorFut.toResult().isOk() + handlerFut1.toResult().isOk() + handlerFut0.toResult().error() == "Future still not finished." + + # Cleanup + mock(behavior.replenishFanout): + backup await allFuturesThrowing(nodes[0].switch.stop(), nodes[1].switch.stop()) - await allFuturesThrowing(nodesFut.concat()) asyncTest "e2e - GossipSub should add remote peer topic subscriptions": diff --git a/tests/pubsub/utils.nim b/tests/pubsub/utils.nim index 937951954..faa77cd5c 100644 --- a/tests/pubsub/utils.nim +++ b/tests/pubsub/utils.nim @@ -222,4 +222,3 @@ proc waitForMesh*( while not gossipsubSender.mesh.hasPeerId(key, receiverPeerId): trace "waitForMesh sleeping..." await activeWait(5.milliseconds, timeoutMoment, "waitForMesh timeout!") - echo "waitForMesh done!"