another set of review comment fix

This commit is contained in:
shashankshampi 2024-10-24 19:16:13 +05:30
parent aa34c7f553
commit df674d586a

View File

@ -37,128 +37,118 @@ let DURATION_TIMEOUT = 500.milliseconds
suite "GossipSub Topic Membership Tests": suite "GossipSub Topic Membership Tests":
teardown: teardown:
checkTrackers() checkTrackers()
# Addition of Designed Test cases for 6. Topic Membership Tests: https://www.notion.so/Gossipsub-651e02d4d7894bb2ac1e4edb55f3192d # Addition of Designed Test cases for 6. Topic Membership Tests: https://www.notion.so/Gossipsub-651e02d4d7894bb2ac1e4edb55f3192d
# Generalized setup function to initialize one or more topics
proc setupGossipSub(
topics: seq[string], numPeers: int
): (TestGossipSub, seq[Connection]) =
let gossipSub = TestGossipSub.init(newStandardSwitch())
var conns = newSeq[Connection]()
for topic in topics:
gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
gossipSub.topicParams[topic] = TopicParams.init()
gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
for i in 0 ..< numPeers:
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = randomPeerId()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
peer.sendConn = conn
gossipSub.gossipsub[topic].incl(peer)
return (gossipSub, conns)
# Wrapper function to initialize a single topic by converting it into a seq
proc setupGossipSub(topic: string, numPeers: int): (TestGossipSub, seq[Connection]) =
setupGossipSub(@[topic], numPeers)
# Helper function to subscribe to topics
proc subscribeToTopics(gossipSub: TestGossipSub, topics: seq[string]) =
for topic in topics:
gossipSub.subscribe(
topic,
proc(topic: string, data: seq[byte]): Future[void] {.async.} =
discard
,
)
# Helper function to unsubscribe to topics
proc unsubscribeFromTopics(gossipSub: TestGossipSub, topics: seq[string]) =
for topic in topics:
gossipSub.unsubscribeAll(topic)
proc commonSubscribe(
nodes: seq[TestGossipSub],
topic: string,
handler: proc(topic: string, data: seq[byte]) {.async.},
) =
for node in nodes:
node.subscribe(topic, handler)
echo "Subscribed all nodes to the topic: ", topic
proc commonUnsubscribe(
nodes: seq[TestGossipSub],
topic: string,
handler: proc(topic: string, data: seq[byte]) {.async.},
) =
for node in nodes:
node.unsubscribe(topic, handler)
echo "Unsubscribed all nodes from the topic: ", topic
# Simulate the `SUBSCRIBE` to the topic and check proper handling in the mesh and gossipsub structures # Simulate the `SUBSCRIBE` to the topic and check proper handling in the mesh and gossipsub structures
asyncTest "handle SUBSCRIBE to the topic": asyncTest "handle SUBSCRIBE to the topic":
let topic = "test-topic" let
let (gossipSub, conns) = setupGossipSub(topic, 5) numberOfNodes = 5
topic = "test-topic"
nodes = generateNodes(numberOfNodes, gossip = true)
subscribeToTopics(gossipSub, @[topic]) await allFuturesThrowing(nodes.mapIt(it.switch.start()))
check gossipSub.topics.contains(topic) await subscribeNodes(nodes)
for node in nodes:
node.subscribe(topic, voidTopicHandler)
check gossipSub.gossipsub[topic].len() == 5 await sleepAsync(2 * DURATION_TIMEOUT)
await allFuturesThrowing(conns.mapIt(it.close())) for node in nodes:
await gossipSub.switch.stop() let currentGossip = GossipSub(node)
# Simulate an UNSUBSCRIBE to the topic and check if the topic is removed from the relevant data structures but remains in gossipsub check currentGossip.topics.contains(topic)
check currentGossip.gossipsub[topic].len() == numberOfNodes - 1
check currentGossip.mesh[topic].len() == numberOfNodes - 1
await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop())))
# Simulate an UNSUBSCRIBE to the topic and check if the topic is removed from all relevant data structures
asyncTest "handle UNSUBSCRIBE to the topic": asyncTest "handle UNSUBSCRIBE to the topic":
let topic = "test-topic" let
let (gossipSub, conns) = setupGossipSub(topic, 5) numberOfNodes = 5
topic = "test-topic"
nodes = generateNodes(numberOfNodes, gossip = true)
subscribeToTopics(gossipSub, @[topic]) await allFuturesThrowing(nodes.mapIt(it.switch.start()))
unsubscribeFromTopics(gossipSub, @[topic]) await subscribeNodes(nodes)
for node in nodes:
node.subscribe(topic, voidTopicHandler)
check topic notin gossipSub.topics await sleepAsync(2 * DURATION_TIMEOUT)
check topic notin gossipSub.mesh
check topic in gossipSub.gossipsub
await allFuturesThrowing(conns.mapIt(it.close())) for node in nodes:
await gossipSub.switch.stop() node.unsubscribe(topic, voidTopicHandler)
await sleepAsync(2 * DURATION_TIMEOUT)
for node in nodes:
let currentGossip = GossipSub(node)
check topic notin currentGossip.topics
if topic in currentGossip.mesh:
check currentGossip.mesh[topic].len == 0
else:
check topic notin currentGossip.mesh
if topic in currentGossip.gossipsub:
check currentGossip.gossipsub[topic].len == 0
else:
check topic notin currentGossip.gossipsub
await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop())))
# Test subscribing and unsubscribing multiple topics # Test subscribing and unsubscribing multiple topics
asyncTest "handle SUBSCRIBE and UNSUBSCRIBE multiple topics": asyncTest "handle SUBSCRIBE and UNSUBSCRIBE multiple topics":
let topics = ["topic1", "topic2", "topic3"].toSeq() let
let (gossipSub, conns) = setupGossipSub(topics, 5) numberOfNodes = 5
topics = ["topic1", "topic2", "topic3"].toSeq()
nodes = generateNodes(numberOfNodes, gossip = true)
subscribeToTopics(gossipSub, topics) await allFuturesThrowing(nodes.mapIt(it.switch.start()))
check gossipSub.topics.len == 3 await subscribeNodes(nodes)
for topic in topics: for node in nodes:
check gossipSub.gossipsub[topic].len() == 5 for topic in topics:
node.subscribe(topic, voidTopicHandler)
unsubscribeFromTopics(gossipSub, topics) await sleepAsync(2 * DURATION_TIMEOUT)
for topic in topics: for node in nodes:
check topic notin gossipSub.topics let currentGossip = GossipSub(node)
check topic notin gossipSub.mesh check currentGossip.topics.len == topics.len
check topic in gossipSub.gossipsub for topic in topics:
check currentGossip.gossipsub[topic].len == numberOfNodes - 1
await allFuturesThrowing(conns.mapIt(it.close())) for node in nodes:
await gossipSub.switch.stop() for topic in topics:
node.unsubscribe(topic, voidTopicHandler)
await sleepAsync(2 * DURATION_TIMEOUT)
for node in nodes:
let currentGossip = GossipSub(node)
for topic in topics:
check topic notin currentGossip.topics
check topic notin currentGossip.mesh
check topic notin currentGossip.gossipsub
await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop())))
# Test ensuring that the number of subscriptions does not exceed the limit set in the GossipSub parameters # Test ensuring that the number of subscriptions does not exceed the limit set in the GossipSub parameters
asyncTest "subscription limit test": asyncTest "subscription limit test":
let topicCount = 15 let
let gossipSubParams = 10 topicCount = 15
let topicNames = toSeq(mapIt(0 .. topicCount - 1, "topic" & $it)) gossipSubParams = 10
topicNames = toSeq(mapIt(0 .. topicCount - 1, "topic" & $it))
numberOfNodes = 1
nodes = generateNodes(numberOfNodes, gossip = true)
let (gossipSub, conns) = setupGossipSub(topicNames, 0) await allFuturesThrowing(nodes.mapIt(it.switch.start()))
let gossipSub = GossipSub(nodes[0])
gossipSub.topicsHigh = gossipSubParams gossipSub.topicsHigh = gossipSubParams
for topic in topicNames: for topic in topicNames:
@ -172,25 +162,31 @@ suite "GossipSub Topic Membership Tests":
else: else:
check gossipSub.topics.len == gossipSub.topicsHigh check gossipSub.topics.len == gossipSub.topicsHigh
check gossipSub.topics.len <= gossipSub.topicsHigh
check gossipSub.topics.len == gossipSub.topicsHigh check gossipSub.topics.len == gossipSub.topicsHigh
await allFuturesThrowing(conns.mapIt(it.close())) await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop())))
await gossipSub.switch.stop()
# Test for verifying peers joining a topic using `JOIN(topic)` # Test for verifying peers joining a topic using `JOIN(topic)`
asyncTest "handle JOIN topic and mesh is updated": asyncTest "handle JOIN topic and mesh is updated":
let topic = "test-join-topic" let
topic = "test-join-topic"
numberOfNodes = 5
nodes = generateNodes(numberOfNodes, gossip = true)
let (gossipSub, conns) = setupGossipSub(topic, 5) await allFuturesThrowing(nodes.mapIt(it.switch.start()))
commonSubscribe(@[gossipSub], topic, voidTopicHandler) await subscribeNodes(nodes)
for node in nodes:
node.subscribe(topic, voidTopicHandler)
check gossipSub.mesh[topic].len == 5 await sleepAsync(2 * DURATION_TIMEOUT)
check gossipSub.topics.contains(topic)
await allFuturesThrowing(conns.mapIt(it.close())) for node in nodes:
await gossipSub.switch.stop() let currentGossip = GossipSub(node)
check currentGossip.mesh[topic].len == numberOfNodes - 1
check currentGossip.topics.contains(topic)
await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop())))
# Test the behavior when multiple peers join and leave a topic simultaneously. # Test the behavior when multiple peers join and leave a topic simultaneously.
asyncTest "multiple peers join and leave topic simultaneously": asyncTest "multiple peers join and leave topic simultaneously":
@ -209,9 +205,8 @@ suite "GossipSub Topic Membership Tests":
for i in 0 ..< numberOfNodes: for i in 0 ..< numberOfNodes:
let currentGossip = GossipSub(nodes[i]) let currentGossip = GossipSub(nodes[i])
check currentGossip.gossipsub.hasKey(topic) check currentGossip.gossipsub.hasKey(topic)
check currentGossip.mesh.hasKey(topic)
for i in 0 ..< numberOfNodes: check currentGossip.topics.contains(topic)
let currentGossip = GossipSub(nodes[i])
for x in 0 ..< numberOfNodes: for x in 0 ..< numberOfNodes:
for y in 0 ..< numberOfNodes: for y in 0 ..< numberOfNodes:
@ -236,5 +231,7 @@ suite "GossipSub Topic Membership Tests":
await sleepAsync(3 * DURATION_TIMEOUT) await sleepAsync(3 * DURATION_TIMEOUT)
check firstNodeGossip.mesh.getOrDefault(topic).len == 3 check firstNodeGossip.mesh.getOrDefault(topic).len == 3
check firstNodeGossip.gossipsub[topic].len == 3
check topic in firstNodeGossip.topics
await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop()))) await allFuturesThrowing(nodes.mapIt(allFutures(it.switch.stop())))