diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index a5e7c997d..079b1184f 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -1799,3 +1799,96 @@ suite "Gossipsub Parameters": # Cleanup await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop()))) await allFuturesThrowing(nodesFut) + + asyncTest "Verify inclusion of backoff periods and peer lists in PRUNE messages for Peer Exchange": + # A, B & C are subscribed to something + # B unsubcribe from it, it should send + # PX to A & C + # + # C sent his SPR, not A + let + topic = "foobar" + nodes = + generateNodes(2, gossip = true, enablePX = true) & + generateNodes(1, gossip = true, sendSignedPeerRecord = true) + + # start switches + nodesFut = await allFinished( + nodes[0].switch.start(), nodes[1].switch.start(), nodes[2].switch.start() + ) + + var + gossip0 = GossipSub(nodes[0]) + gossip1 = GossipSub(nodes[1]) + gossip2 = GossipSub(nodes[2]) + + # Connect nodes + await subscribeNodes(nodes) + + # Subscribe nodes to the same topic + for node in nodes: + node.subscribe(topic, voidTopicHandler) + + for x in 0 ..< 3: + for y in 0 ..< 3: + if x != y: + await waitSub(nodes[x], nodes[y], topic) + + # Add an observer to node 2 for PRUNE messages + var receivedPrunes: seq[ControlPrune] = @[] + let checkForPrunes = proc(peer: PubSubPeer, msgs: var RPCMsg) = + if msgs.control.isSome: + let prunes = msgs.control.get.prune + for prune in prunes: + if prune.topicID == topic: + receivedPrunes.add(prune) + + gossip2.addObserver(PubSubObserver(onRecv: checkForPrunes)) + + # Setup record handlers for all nodes + var + passed0: Future[void] = newFuture[void]() + passed1: Future[void] = newFuture[void]() + passed2: Future[void] = newFuture[void]() + gossip0.routingRecordsHandler.add( + proc(peer: PeerId, tag: string, peers: seq[RoutingRecordsPair]) = + check: + tag == topic + peers.len == 2 + peers[0].record.isSome() xor peers[1].record.isSome() + passed0.complete() + ) + gossip1.routingRecordsHandler.add( + proc(peer: PeerId, tag: string, peers: seq[RoutingRecordsPair]) = + passed1.complete() + ) + gossip2.routingRecordsHandler.add( + proc(peer: PeerId, tag: string, peers: seq[RoutingRecordsPair]) = + check: + tag == topic + peers.len == 2 + peers[0].record.isSome() xor peers[1].record.isSome() + passed2.complete() + ) + + # Unsubscribe from the topic + nodes[1].unsubscribe(topic, handler) + + # Then verify what nodes receive the PX + check: + (await passed0.waitForResult()).isOk + not (await passed1.waitForResult()).isOk + (await passed2.waitForResult()).isOk + + # And verify node 2 received the PRUNE message and it contains the backoff period and peer list + check: + receivedPrunes.len == 1 + receivedPrunes[0].topicID == "foobar" + receivedPrunes[0].peers.len == 2 + receivedPrunes[0].backoff == 1 + + await allFuturesThrowing( + nodes[0].switch.stop(), nodes[1].switch.stop(), nodes[2].switch.stop() + ) + + await allFuturesThrowing(nodesFut.concat())