mirror of
https://github.com/status-im/nim-libp2p.git
synced 2025-01-19 01:12:23 +00:00
rebase with block6Test
This commit is contained in:
commit
e10e4d058b
@ -33,46 +33,6 @@ suite "GossipSub internal":
|
|||||||
teardown:
|
teardown:
|
||||||
checkTrackers()
|
checkTrackers()
|
||||||
|
|
||||||
asyncTest "subscribe/unsubscribeAll":
|
|
||||||
let gossipSub = TestGossipSub.init(newStandardSwitch())
|
|
||||||
|
|
||||||
proc handler(topic: string, data: seq[byte]): Future[void] {.gcsafe.} =
|
|
||||||
discard
|
|
||||||
|
|
||||||
let topic = "foobar"
|
|
||||||
gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
|
|
||||||
gossipSub.topicParams[topic] = TopicParams.init()
|
|
||||||
|
|
||||||
var conns = newSeq[Connection]()
|
|
||||||
gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
|
|
||||||
for i in 0 ..< 15:
|
|
||||||
let conn = TestBufferStream.new(noop)
|
|
||||||
conns &= conn
|
|
||||||
let peerId = randomPeerId()
|
|
||||||
conn.peerId = peerId
|
|
||||||
let peer = gossipSub.getPubSubPeer(peerId)
|
|
||||||
peer.sendConn = conn
|
|
||||||
gossipSub.gossipsub[topic].incl(peer)
|
|
||||||
|
|
||||||
# test via dynamic dispatch
|
|
||||||
gossipSub.PubSub.subscribe(topic, handler)
|
|
||||||
|
|
||||||
check:
|
|
||||||
gossipSub.topics.contains(topic)
|
|
||||||
gossipSub.gossipsub[topic].len() > 0
|
|
||||||
gossipSub.mesh[topic].len() > 0
|
|
||||||
|
|
||||||
# test via dynamic dispatch
|
|
||||||
gossipSub.PubSub.unsubscribeAll(topic)
|
|
||||||
|
|
||||||
check:
|
|
||||||
topic notin gossipSub.topics # not in local topics
|
|
||||||
topic notin gossipSub.mesh # not in mesh
|
|
||||||
topic in gossipSub.gossipsub # but still in gossipsub table (for fanning out)
|
|
||||||
|
|
||||||
await allFuturesThrowing(conns.mapIt(it.close()))
|
|
||||||
await gossipSub.switch.stop()
|
|
||||||
|
|
||||||
asyncTest "topic params":
|
asyncTest "topic params":
|
||||||
let params = TopicParams.init()
|
let params = TopicParams.init()
|
||||||
params.validateParameters().tryGet()
|
params.validateParameters().tryGet()
|
||||||
|
@ -15,10 +15,9 @@ import ../../libp2p/builders
|
|||||||
import ../../libp2p/errors
|
import ../../libp2p/errors
|
||||||
import ../../libp2p/crypto/crypto
|
import ../../libp2p/crypto/crypto
|
||||||
import ../../libp2p/stream/bufferstream
|
import ../../libp2p/stream/bufferstream
|
||||||
import ../../libp2p/protocols/pubsub/[pubsub, gossipsub, mcache, peertable]
|
|
||||||
import ../../libp2p/protocols/pubsub/rpc/[message, messages, protobuf]
|
|
||||||
import ../../libp2p/switch
|
import ../../libp2p/switch
|
||||||
import ../../libp2p/muxers/muxer
|
import ../../libp2p/muxers/muxer
|
||||||
|
import ../../libp2p/protocols/pubsub/rpc/protobuf
|
||||||
import utils
|
import utils
|
||||||
import chronos
|
import chronos
|
||||||
import unittest2
|
import unittest2
|
||||||
@ -41,97 +40,36 @@ suite "GossipSub Topic Membership Tests":
|
|||||||
|
|
||||||
# Addition of Designed Test cases for 6. Topic Membership Tests: https://www.notion.so/Gossipsub-651e02d4d7894bb2ac1e4edb55f3192d
|
# Addition of Designed Test cases for 6. Topic Membership Tests: https://www.notion.so/Gossipsub-651e02d4d7894bb2ac1e4edb55f3192d
|
||||||
|
|
||||||
# Simulate the `SUBSCRIBE` event and check proper handling in the mesh and gossipsub structures
|
# Generalized setup function to initialize one or more topics
|
||||||
asyncTest "handle SUBSCRIBE event":
|
proc setupGossipSub(
|
||||||
|
topics: seq[string], numPeers: int
|
||||||
|
): (TestGossipSub, seq[Connection]) =
|
||||||
let gossipSub = TestGossipSub.init(newStandardSwitch())
|
let gossipSub = TestGossipSub.init(newStandardSwitch())
|
||||||
|
|
||||||
# Ensure topic is correctly initialized
|
|
||||||
let topic = "test-topic"
|
|
||||||
gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
|
|
||||||
gossipSub.topicParams[topic] = TopicParams.init()
|
|
||||||
gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
|
|
||||||
# Initialize gossipsub for the topic
|
|
||||||
|
|
||||||
var conns = newSeq[Connection]()
|
var conns = newSeq[Connection]()
|
||||||
for i in 0 ..< 5:
|
|
||||||
let conn = TestBufferStream.new(noop)
|
|
||||||
conns &= conn
|
|
||||||
let peerId = randomPeerId()
|
|
||||||
conn.peerId = peerId
|
|
||||||
let peer = gossipSub.getPubSubPeer(peerId)
|
|
||||||
peer.sendConn = conn
|
|
||||||
gossipSub.gossipsub[topic].incl(peer) # Ensure the topic is added to gossipsub
|
|
||||||
|
|
||||||
# Subscribe to the topic
|
|
||||||
gossipSub.PubSub.subscribe(
|
|
||||||
topic,
|
|
||||||
proc(topic: string, data: seq[byte]): Future[void] {.async.} =
|
|
||||||
discard
|
|
||||||
,
|
|
||||||
)
|
|
||||||
|
|
||||||
check gossipSub.topics.contains(topic) # Check if the topic is in topics
|
|
||||||
check gossipSub.gossipsub[topic].len() > 0 # Check if topic added to gossipsub
|
|
||||||
|
|
||||||
await allFuturesThrowing(conns.mapIt(it.close()))
|
|
||||||
await gossipSub.switch.stop()
|
|
||||||
|
|
||||||
# This test will simulate an UNSUBSCRIBE event and check if the topic is removed from the relevant data structures but remains in gossipsub
|
|
||||||
asyncTest "handle UNSUBSCRIBE event":
|
|
||||||
let gossipSub = TestGossipSub.init(newStandardSwitch())
|
|
||||||
|
|
||||||
# Ensure topic is initialized properly in all relevant data structures
|
|
||||||
let topic = "test-topic"
|
|
||||||
gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
|
|
||||||
gossipSub.topicParams[topic] = TopicParams.init()
|
|
||||||
gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
|
|
||||||
# Initialize gossipsub for the topic
|
|
||||||
|
|
||||||
var conns = newSeq[Connection]()
|
|
||||||
for i in 0 ..< 5:
|
|
||||||
let conn = TestBufferStream.new(noop)
|
|
||||||
conns &= conn
|
|
||||||
let peerId = randomPeerId()
|
|
||||||
conn.peerId = peerId
|
|
||||||
let peer = gossipSub.getPubSubPeer(peerId)
|
|
||||||
peer.sendConn = conn
|
|
||||||
gossipSub.gossipsub[topic].incl(peer)
|
|
||||||
# Ensure peers are added to gossipsub for the topic
|
|
||||||
|
|
||||||
# Subscribe to the topic first
|
|
||||||
gossipSub.PubSub.subscribe(
|
|
||||||
topic,
|
|
||||||
proc(topic: string, data: seq[byte]): Future[void] {.async.} =
|
|
||||||
discard
|
|
||||||
,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Now unsubscribe from the topic
|
|
||||||
gossipSub.PubSub.unsubscribeAll(topic)
|
|
||||||
|
|
||||||
# Verify the topic is removed from relevant structures
|
|
||||||
check topic notin gossipSub.topics # The topic should not be in topics
|
|
||||||
check topic notin gossipSub.mesh # The topic should be removed from the mesh
|
|
||||||
check topic in gossipSub.gossipsub
|
|
||||||
# The topic should remain in gossipsub (for fanout)
|
|
||||||
|
|
||||||
await allFuturesThrowing(conns.mapIt(it.close()))
|
|
||||||
await gossipSub.switch.stop()
|
|
||||||
|
|
||||||
# This test ensures that multiple topics can be subscribed to and unsubscribed from, with proper initialization of the topic structures.
|
|
||||||
asyncTest "handle multiple SUBSCRIBE and UNSUBSCRIBE events":
|
|
||||||
let gossipSub = TestGossipSub.init(newStandardSwitch())
|
|
||||||
|
|
||||||
let topics = ["topic1", "topic2", "topic3"]
|
|
||||||
|
|
||||||
var conns = newSeq[Connection]()
|
|
||||||
for topic in topics:
|
for topic in topics:
|
||||||
# Initialize all relevant structures before subscribing
|
|
||||||
gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
|
gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
|
||||||
gossipSub.topicParams[topic] = TopicParams.init()
|
gossipSub.topicParams[topic] = TopicParams.init()
|
||||||
gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
|
gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
|
||||||
# Initialize gossipsub for each topic
|
|
||||||
|
|
||||||
|
for i in 0 ..< numPeers:
|
||||||
|
let conn = TestBufferStream.new(noop)
|
||||||
|
conns &= conn
|
||||||
|
let peerId = randomPeerId()
|
||||||
|
conn.peerId = peerId
|
||||||
|
let peer = gossipSub.getPubSubPeer(peerId)
|
||||||
|
peer.sendConn = conn
|
||||||
|
gossipSub.gossipsub[topic].incl(peer)
|
||||||
|
|
||||||
|
return (gossipSub, conns)
|
||||||
|
|
||||||
|
# Wrapper function to initialize a single topic by converting it into a seq
|
||||||
|
proc setupGossipSub(topic: string, numPeers: int): (TestGossipSub, seq[Connection]) =
|
||||||
|
setupGossipSub(@[topic], numPeers)
|
||||||
|
|
||||||
|
# Helper function to subscribe to topics
|
||||||
|
proc subscribeToTopics(gossipSub: TestGossipSub, topics: seq[string]) =
|
||||||
|
for topic in topics:
|
||||||
gossipSub.PubSub.subscribe(
|
gossipSub.PubSub.subscribe(
|
||||||
topic,
|
topic,
|
||||||
proc(topic: string, data: seq[byte]): Future[void] {.async.} =
|
proc(topic: string, data: seq[byte]): Future[void] {.async.} =
|
||||||
@ -139,14 +77,74 @@ suite "GossipSub Topic Membership Tests":
|
|||||||
,
|
,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Helper function to unsubscribe to topics
|
||||||
|
proc unsubscribeFromTopics(gossipSub: TestGossipSub, topics: seq[string]) =
|
||||||
|
for topic in topics:
|
||||||
|
gossipSub.PubSub.unsubscribeAll(topic)
|
||||||
|
|
||||||
|
# Simulate the `SUBSCRIBE` to the topic and check proper handling in the mesh and gossipsub structures
|
||||||
|
asyncTest "handle SUBSCRIBE to the topic":
|
||||||
|
let topic = "test-topic"
|
||||||
|
let (gossipSub, conns) = setupGossipSub(topic, 5)
|
||||||
|
|
||||||
|
# Check if the topic is added to gossipsub and the peers list is not empty
|
||||||
|
check gossipSub.gossipsub[topic].len() > 0
|
||||||
|
|
||||||
|
# Subscribe to the topic
|
||||||
|
subscribeToTopics(gossipSub, @[topic])
|
||||||
|
|
||||||
|
# Check if the topic is present in the list of subscribed topics
|
||||||
|
check gossipSub.topics.contains(topic)
|
||||||
|
|
||||||
|
# Close all peer connections and verify that they are properly cleaned up
|
||||||
|
await allFuturesThrowing(conns.mapIt(it.close()))
|
||||||
|
|
||||||
|
# Stop the gossipSub switch and wait for it to stop completely
|
||||||
|
await gossipSub.switch.stop()
|
||||||
|
|
||||||
|
# Verify that connections have been closed and cleaned up after shutdown
|
||||||
|
for peer in gossipSub.peers.values:
|
||||||
|
check peer.sendConn == nil or peer.sendConn.closed()
|
||||||
|
|
||||||
|
# Ensure that the topic is removed from the mesh after stopping
|
||||||
|
check gossipSub.mesh[topic].len() == 0
|
||||||
|
|
||||||
|
# Simulate an UNSUBSCRIBE to the topic and check if the topic is removed from the relevant data structures but remains in gossipsub
|
||||||
|
asyncTest "handle UNSUBSCRIBE to the topic":
|
||||||
|
let topic = "test-topic"
|
||||||
|
let (gossipSub, conns) = setupGossipSub(topic, 5)
|
||||||
|
|
||||||
|
# Subscribe to the topic first
|
||||||
|
subscribeToTopics(gossipSub, @[topic])
|
||||||
|
|
||||||
|
# Now unsubscribe from the topic
|
||||||
|
unsubscribeFromTopics(gossipSub, @[topic])
|
||||||
|
|
||||||
|
# Verify the topic is removed from relevant structures
|
||||||
|
check topic notin gossipSub.topics
|
||||||
|
check topic notin gossipSub.mesh
|
||||||
|
check topic in gossipSub.gossipsub
|
||||||
|
|
||||||
|
# The topic should remain in gossipsub (for fanout)
|
||||||
|
|
||||||
|
await allFuturesThrowing(conns.mapIt(it.close()))
|
||||||
|
await gossipSub.switch.stop()
|
||||||
|
|
||||||
|
# Test subscribing and unsubscribing multiple topics
|
||||||
|
asyncTest "handle SUBSCRIBE and UNSUBSCRIBE multiple topics":
|
||||||
|
let topics = ["topic1", "topic2", "topic3"].toSeq()
|
||||||
|
let (gossipSub, conns) = setupGossipSub(topics, 5)
|
||||||
|
|
||||||
|
# Subscribe to multiple topics
|
||||||
|
subscribeToTopics(gossipSub, topics)
|
||||||
|
|
||||||
# Verify that all topics are added to the topics and gossipsub
|
# Verify that all topics are added to the topics and gossipsub
|
||||||
check gossipSub.topics.len == 3
|
check gossipSub.topics.len == 3
|
||||||
for topic in topics:
|
for topic in topics:
|
||||||
check gossipSub.gossipsub[topic].len() >= 0
|
check gossipSub.gossipsub[topic].len() >= 0
|
||||||
|
|
||||||
# Now unsubscribe from all topics
|
# Unsubscribe from all topics
|
||||||
for topic in topics:
|
unsubscribeFromTopics(gossipSub, topics)
|
||||||
gossipSub.PubSub.unsubscribeAll(topic)
|
|
||||||
|
|
||||||
# Ensure topics are removed from topics and mesh, but still present in gossipsub
|
# Ensure topics are removed from topics and mesh, but still present in gossipsub
|
||||||
for topic in topics:
|
for topic in topics:
|
||||||
@ -157,10 +155,10 @@ suite "GossipSub Topic Membership Tests":
|
|||||||
await allFuturesThrowing(conns.mapIt(it.close()))
|
await allFuturesThrowing(conns.mapIt(it.close()))
|
||||||
await gossipSub.switch.stop()
|
await gossipSub.switch.stop()
|
||||||
|
|
||||||
# This test ensures that the number of subscriptions does not exceed the limit set in the GossipSub parameters
|
# Test ensuring that the number of subscriptions does not exceed the limit set in the GossipSub parameters
|
||||||
asyncTest "subscription limit test":
|
asyncTest "subscription limit test":
|
||||||
let gossipSub = TestGossipSub.init(newStandardSwitch())
|
let gossipSub = TestGossipSub.init(newStandardSwitch())
|
||||||
gossipSub.topicsHigh = 10 # Set a limit for the number of subscriptions
|
gossipSub.topicsHigh = 10
|
||||||
|
|
||||||
var conns = newSeq[Connection]()
|
var conns = newSeq[Connection]()
|
||||||
for i in 0 .. gossipSub.topicsHigh + 5:
|
for i in 0 .. gossipSub.topicsHigh + 5:
|
||||||
@ -189,76 +187,40 @@ suite "GossipSub Topic Membership Tests":
|
|||||||
await gossipSub.switch.stop()
|
await gossipSub.switch.stop()
|
||||||
|
|
||||||
# Test for verifying peers joining a topic using `JOIN(topic)`
|
# Test for verifying peers joining a topic using `JOIN(topic)`
|
||||||
asyncTest "handle JOIN event":
|
asyncTest "handle JOIN topic and mesh is updated":
|
||||||
let gossipSub = TestGossipSub.init(newStandardSwitch())
|
|
||||||
|
|
||||||
let topic = "test-join-topic"
|
let topic = "test-join-topic"
|
||||||
|
|
||||||
# Initialize relevant data structures
|
# Initialize the GossipSub system and simulate peer connections
|
||||||
gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
|
let (gossipSub, conns) = setupGossipSub(topic, 5)
|
||||||
gossipSub.topicParams[topic] = TopicParams.init()
|
|
||||||
gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
|
|
||||||
|
|
||||||
var conns = newSeq[Connection]()
|
# Simulate peer joining the topic
|
||||||
|
subscribeToTopics(gossipSub, @[topic])
|
||||||
|
|
||||||
for i in 0 ..< 5:
|
# Check that peers are added to the mesh and the topic is tracked
|
||||||
let conn = TestBufferStream.new(noop)
|
check gossipSub.mesh[topic].len > 0
|
||||||
conns &= conn
|
check gossipSub.topics.contains(topic)
|
||||||
let peerId = randomPeerId()
|
|
||||||
conn.peerId = peerId
|
|
||||||
let peer = gossipSub.getPubSubPeer(peerId)
|
|
||||||
peer.sendConn = conn
|
|
||||||
gossipSub.gossipsub[topic].incl(peer)
|
|
||||||
|
|
||||||
# Simulate the peer joining the topic
|
|
||||||
gossipSub.PubSub.subscribe(
|
|
||||||
topic,
|
|
||||||
proc(topic: string, data: seq[byte]): Future[void] {.async.} =
|
|
||||||
discard
|
|
||||||
,
|
|
||||||
)
|
|
||||||
|
|
||||||
check gossipSub.mesh[topic].len > 0 # Ensure the peer is added to the mesh
|
|
||||||
check gossipSub.topics.contains(topic) # Ensure the topic is in `topics`
|
|
||||||
|
|
||||||
|
# Clean up by closing connections and stopping the gossipSub switch
|
||||||
await allFuturesThrowing(conns.mapIt(it.close()))
|
await allFuturesThrowing(conns.mapIt(it.close()))
|
||||||
await gossipSub.switch.stop()
|
await gossipSub.switch.stop()
|
||||||
|
|
||||||
# Test for verifying peers leaving a topic using `LEAVE(topic)`
|
# Test for verifying peers leaving a topic using `LEAVE(topic)`
|
||||||
asyncTest "handle LEAVE event":
|
asyncTest "handle LEAVE topic and mesh is updated":
|
||||||
let gossipSub = TestGossipSub.init(newStandardSwitch())
|
|
||||||
|
|
||||||
let topic = "test-leave-topic"
|
let topic = "test-leave-topic"
|
||||||
|
|
||||||
# Initialize relevant data structures
|
# Initialize the GossipSub system and simulate peer connections
|
||||||
gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
|
let (gossipSub, conns) = setupGossipSub(topic, 5)
|
||||||
gossipSub.topicParams[topic] = TopicParams.init()
|
|
||||||
gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
|
|
||||||
|
|
||||||
var conns = newSeq[Connection]()
|
|
||||||
|
|
||||||
for i in 0 ..< 5:
|
|
||||||
let conn = TestBufferStream.new(noop)
|
|
||||||
conns &= conn
|
|
||||||
let peerId = randomPeerId()
|
|
||||||
conn.peerId = peerId
|
|
||||||
let peer = gossipSub.getPubSubPeer(peerId)
|
|
||||||
peer.sendConn = conn
|
|
||||||
gossipSub.gossipsub[topic].incl(peer)
|
|
||||||
|
|
||||||
# Simulate peer joining the topic first
|
# Simulate peer joining the topic first
|
||||||
gossipSub.PubSub.subscribe(
|
subscribeToTopics(gossipSub, @[topic])
|
||||||
topic,
|
|
||||||
proc(topic: string, data: seq[byte]): Future[void] {.async.} =
|
|
||||||
discard
|
|
||||||
,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Now simulate peer leaving the topic
|
# Now simulate peer leaving the topic
|
||||||
gossipSub.PubSub.unsubscribeAll(topic)
|
unsubscribeFromTopics(gossipSub, @[topic])
|
||||||
|
|
||||||
check topic notin gossipSub.mesh # Ensure the peer is removed from the mesh
|
# Check that peers are removed from the mesh but the topic remains in gossipsub
|
||||||
check topic in gossipSub.gossipsub # Ensure the topic remains in `gossipsub`
|
check topic notin gossipSub.mesh
|
||||||
|
check topic in gossipSub.gossipsub
|
||||||
|
|
||||||
|
# Clean up by closing connections and stopping the gossipSub switch
|
||||||
await allFuturesThrowing(conns.mapIt(it.close()))
|
await allFuturesThrowing(conns.mapIt(it.close()))
|
||||||
await gossipSub.switch.stop()
|
await gossipSub.switch.stop()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user