mirror of
https://github.com/status-im/nim-libp2p.git
synced 2025-02-18 15:59:22 +00:00
* use stream directly in chronosstream for now, chronos.AsyncStream is not used to provide any features on top of chronos.Stream, so in order to simplify the code, chronosstream can be used directly. In particular, the exception handling is broken in the current chronosstream - opening and closing the stream is simplified this way as well. A future implementation that actually takes advantage of the AsyncStream features would wrap AsyncStream instead as a separate lpstream implementation, leaving this one as-is. * work around chronos exception type issue
333 lines
11 KiB
Nim
333 lines
11 KiB
Nim
include ../../libp2p/protocols/pubsub/gossipsub
|
|
|
|
import unittest
|
|
import ../../libp2p/errors
|
|
import ../../libp2p/stream/bufferstream
|
|
|
|
import ../helpers
|
|
|
|
type
|
|
TestGossipSub = ref object of GossipSub
|
|
|
|
suite "GossipSub internal":
|
|
teardown:
|
|
for tracker in testTrackers():
|
|
check tracker.isLeaked() == false
|
|
|
|
test "`rebalanceMesh` Degree Lo":
|
|
proc testRun(): Future[bool] {.async.} =
|
|
let gossipSub = newPubSub(TestGossipSub,
|
|
PeerInfo.init(PrivateKey.random(RSA)))
|
|
|
|
let topic = "foobar"
|
|
gossipSub.mesh[topic] = initHashSet[string]()
|
|
|
|
var conns = newSeq[Connection]()
|
|
for i in 0..<15:
|
|
let conn = newConnection(newBufferStream())
|
|
conns &= conn
|
|
let peerInfo = PeerInfo.init(PrivateKey.random(RSA))
|
|
conn.peerInfo = peerInfo
|
|
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
|
|
gossipSub.peers[peerInfo.id].conn = conn
|
|
gossipSub.mesh[topic].incl(peerInfo.id)
|
|
|
|
check gossipSub.peers.len == 15
|
|
await gossipSub.rebalanceMesh(topic)
|
|
check gossipSub.mesh[topic].len == GossipSubD
|
|
|
|
await allFuturesThrowing(conns.mapIt(it.close()))
|
|
|
|
result = true
|
|
|
|
check:
|
|
waitFor(testRun()) == true
|
|
|
|
test "`rebalanceMesh` Degree Hi":
|
|
proc testRun(): Future[bool] {.async.} =
|
|
let gossipSub = newPubSub(TestGossipSub,
|
|
PeerInfo.init(PrivateKey.random(RSA)))
|
|
|
|
let topic = "foobar"
|
|
gossipSub.gossipsub[topic] = initHashSet[string]()
|
|
|
|
var conns = newSeq[Connection]()
|
|
for i in 0..<15:
|
|
let conn = newConnection(newBufferStream())
|
|
conns &= conn
|
|
let peerInfo = PeerInfo.init(PrivateKey.random(RSA))
|
|
conn.peerInfo = peerInfo
|
|
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
|
|
gossipSub.peers[peerInfo.id].conn = conn
|
|
gossipSub.gossipsub[topic].incl(peerInfo.id)
|
|
|
|
check gossipSub.gossipsub[topic].len == 15
|
|
await gossipSub.rebalanceMesh(topic)
|
|
check gossipSub.mesh[topic].len == GossipSubD
|
|
|
|
await allFuturesThrowing(conns.mapIt(it.close()))
|
|
|
|
result = true
|
|
|
|
check:
|
|
waitFor(testRun()) == true
|
|
|
|
test "`replenishFanout` Degree Lo":
|
|
proc testRun(): Future[bool] {.async.} =
|
|
let gossipSub = newPubSub(TestGossipSub,
|
|
PeerInfo.init(PrivateKey.random(RSA)))
|
|
|
|
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
|
|
discard
|
|
|
|
let topic = "foobar"
|
|
gossipSub.gossipsub[topic] = initHashSet[string]()
|
|
|
|
var conns = newSeq[Connection]()
|
|
for i in 0..<15:
|
|
let conn = newConnection(newBufferStream())
|
|
conns &= conn
|
|
var peerInfo = PeerInfo.init(PrivateKey.random(RSA))
|
|
conn.peerInfo = peerInfo
|
|
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
|
|
gossipSub.peers[peerInfo.id].handler = handler
|
|
gossipSub.gossipsub[topic].incl(peerInfo.id)
|
|
|
|
check gossipSub.gossipsub[topic].len == 15
|
|
await gossipSub.replenishFanout(topic)
|
|
check gossipSub.fanout[topic].len == GossipSubD
|
|
|
|
await allFuturesThrowing(conns.mapIt(it.close()))
|
|
|
|
result = true
|
|
|
|
check:
|
|
waitFor(testRun()) == true
|
|
|
|
test "`dropFanoutPeers` drop expired fanout topics":
|
|
proc testRun(): Future[bool] {.async.} =
|
|
let gossipSub = newPubSub(TestGossipSub,
|
|
PeerInfo.init(PrivateKey.random(RSA)))
|
|
|
|
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
|
|
discard
|
|
|
|
let topic = "foobar"
|
|
gossipSub.fanout[topic] = initHashSet[string]()
|
|
gossipSub.lastFanoutPubSub[topic] = Moment.fromNow(100.millis)
|
|
|
|
var conns = newSeq[Connection]()
|
|
for i in 0..<6:
|
|
let conn = newConnection(newBufferStream())
|
|
conns &= conn
|
|
let peerInfo = PeerInfo.init(PrivateKey.random(RSA))
|
|
conn.peerInfo = peerInfo
|
|
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
|
|
gossipSub.peers[peerInfo.id].handler = handler
|
|
gossipSub.fanout[topic].incl(peerInfo.id)
|
|
|
|
check gossipSub.fanout[topic].len == GossipSubD
|
|
|
|
await gossipSub.dropFanoutPeers()
|
|
check topic notin gossipSub.fanout
|
|
|
|
await allFuturesThrowing(conns.mapIt(it.close()))
|
|
|
|
result = true
|
|
|
|
check:
|
|
waitFor(testRun()) == true
|
|
|
|
test "`dropFanoutPeers` leave unexpired fanout topics":
|
|
proc testRun(): Future[bool] {.async.} =
|
|
let gossipSub = newPubSub(TestGossipSub,
|
|
PeerInfo.init(PrivateKey.random(RSA)))
|
|
|
|
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
|
|
discard
|
|
|
|
let topic1 = "foobar1"
|
|
let topic2 = "foobar2"
|
|
gossipSub.fanout[topic1] = initHashSet[string]()
|
|
gossipSub.fanout[topic2] = initHashSet[string]()
|
|
gossipSub.lastFanoutPubSub[topic1] = Moment.fromNow(100.millis)
|
|
gossipSub.lastFanoutPubSub[topic2] = Moment.fromNow(5.seconds)
|
|
|
|
var conns = newSeq[Connection]()
|
|
for i in 0..<6:
|
|
let conn = newConnection(newBufferStream())
|
|
conns &= conn
|
|
let peerInfo = PeerInfo.init(PrivateKey.random(RSA))
|
|
conn.peerInfo = peerInfo
|
|
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
|
|
gossipSub.peers[peerInfo.id].handler = handler
|
|
gossipSub.fanout[topic1].incl(peerInfo.id)
|
|
gossipSub.fanout[topic2].incl(peerInfo.id)
|
|
|
|
check gossipSub.fanout[topic1].len == GossipSubD
|
|
check gossipSub.fanout[topic2].len == GossipSubD
|
|
|
|
await gossipSub.dropFanoutPeers()
|
|
check topic1 notin gossipSub.fanout
|
|
check topic2 in gossipSub.fanout
|
|
|
|
await allFuturesThrowing(conns.mapIt(it.close()))
|
|
|
|
result = true
|
|
|
|
check:
|
|
waitFor(testRun()) == true
|
|
|
|
test "`getGossipPeers` - should gather up to degree D non intersecting peers":
|
|
proc testRun(): Future[bool] {.async.} =
|
|
let gossipSub = newPubSub(TestGossipSub,
|
|
PeerInfo.init(PrivateKey.random(RSA)))
|
|
|
|
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
|
|
discard
|
|
|
|
let topic = "foobar"
|
|
gossipSub.mesh[topic] = initHashSet[string]()
|
|
gossipSub.fanout[topic] = initHashSet[string]()
|
|
gossipSub.gossipsub[topic] = initHashSet[string]()
|
|
var conns = newSeq[Connection]()
|
|
for i in 0..<30:
|
|
let conn = newConnection(newBufferStream())
|
|
conns &= conn
|
|
let peerInfo = PeerInfo.init(PrivateKey.random(RSA))
|
|
conn.peerInfo = peerInfo
|
|
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
|
|
gossipSub.peers[peerInfo.id].handler = handler
|
|
if i mod 2 == 0:
|
|
gossipSub.fanout[topic].incl(peerInfo.id)
|
|
else:
|
|
gossipSub.mesh[topic].incl(peerInfo.id)
|
|
|
|
for i in 0..<15:
|
|
let conn = newConnection(newBufferStream())
|
|
conns &= conn
|
|
let peerInfo = PeerInfo.init(PrivateKey.random(RSA))
|
|
conn.peerInfo = peerInfo
|
|
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
|
|
gossipSub.peers[peerInfo.id].handler = handler
|
|
gossipSub.gossipsub[topic].incl(peerInfo.id)
|
|
|
|
check gossipSub.fanout[topic].len == 15
|
|
check gossipSub.fanout[topic].len == 15
|
|
check gossipSub.gossipsub[topic].len == 15
|
|
|
|
let peers = gossipSub.getGossipPeers()
|
|
check peers.len == GossipSubD
|
|
for p in peers.keys:
|
|
check p notin gossipSub.fanout[topic]
|
|
check p notin gossipSub.mesh[topic]
|
|
|
|
await allFuturesThrowing(conns.mapIt(it.close()))
|
|
|
|
result = true
|
|
|
|
check:
|
|
waitFor(testRun()) == true
|
|
|
|
test "`getGossipPeers` - should not crash on missing topics in mesh":
|
|
proc testRun(): Future[bool] {.async.} =
|
|
let gossipSub = newPubSub(TestGossipSub,
|
|
PeerInfo.init(PrivateKey.random(RSA)))
|
|
|
|
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
|
|
discard
|
|
|
|
let topic = "foobar"
|
|
gossipSub.fanout[topic] = initHashSet[string]()
|
|
gossipSub.gossipsub[topic] = initHashSet[string]()
|
|
var conns = newSeq[Connection]()
|
|
for i in 0..<30:
|
|
let conn = newConnection(newBufferStream())
|
|
conns &= conn
|
|
let peerInfo = PeerInfo.init(PrivateKey.random(RSA))
|
|
conn.peerInfo = peerInfo
|
|
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
|
|
gossipSub.peers[peerInfo.id].handler = handler
|
|
if i mod 2 == 0:
|
|
gossipSub.fanout[topic].incl(peerInfo.id)
|
|
else:
|
|
gossipSub.gossipsub[topic].incl(peerInfo.id)
|
|
|
|
let peers = gossipSub.getGossipPeers()
|
|
check peers.len == GossipSubD
|
|
|
|
await allFuturesThrowing(conns.mapIt(it.close()))
|
|
|
|
result = true
|
|
|
|
check:
|
|
waitFor(testRun()) == true
|
|
|
|
test "`getGossipPeers` - should not crash on missing topics in gossip":
|
|
proc testRun(): Future[bool] {.async.} =
|
|
let gossipSub = newPubSub(TestGossipSub,
|
|
PeerInfo.init(PrivateKey.random(RSA)))
|
|
|
|
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
|
|
discard
|
|
|
|
let topic = "foobar"
|
|
gossipSub.mesh[topic] = initHashSet[string]()
|
|
gossipSub.gossipsub[topic] = initHashSet[string]()
|
|
var conns = newSeq[Connection]()
|
|
for i in 0..<30:
|
|
let conn = newConnection(newBufferStream())
|
|
conns &= conn
|
|
let peerInfo = PeerInfo.init(PrivateKey.random(RSA))
|
|
conn.peerInfo = peerInfo
|
|
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
|
|
gossipSub.peers[peerInfo.id].handler = handler
|
|
if i mod 2 == 0:
|
|
gossipSub.mesh[topic].incl(peerInfo.id)
|
|
else:
|
|
gossipSub.gossipsub[topic].incl(peerInfo.id)
|
|
|
|
let peers = gossipSub.getGossipPeers()
|
|
check peers.len == GossipSubD
|
|
|
|
await allFuturesThrowing(conns.mapIt(it.close()))
|
|
|
|
result = true
|
|
|
|
check:
|
|
waitFor(testRun()) == true
|
|
|
|
test "`getGossipPeers` - should not crash on missing topics in gossip":
|
|
proc testRun(): Future[bool] {.async.} =
|
|
let gossipSub = newPubSub(TestGossipSub,
|
|
PeerInfo.init(PrivateKey.random(RSA)))
|
|
|
|
proc handler(peer: PubSubPeer, msg: seq[RPCMsg]) {.async.} =
|
|
discard
|
|
|
|
let topic = "foobar"
|
|
gossipSub.mesh[topic] = initHashSet[string]()
|
|
gossipSub.fanout[topic] = initHashSet[string]()
|
|
var conns = newSeq[Connection]()
|
|
for i in 0..<30:
|
|
let conn = newConnection(newBufferStream())
|
|
conns &= conn
|
|
let peerInfo = PeerInfo.init(PrivateKey.random(RSA))
|
|
conn.peerInfo = peerInfo
|
|
gossipSub.peers[peerInfo.id] = newPubSubPeer(peerInfo, GossipSubCodec)
|
|
gossipSub.peers[peerInfo.id].handler = handler
|
|
if i mod 2 == 0:
|
|
gossipSub.mesh[topic].incl(peerInfo.id)
|
|
else:
|
|
gossipSub.fanout[topic].incl(peerInfo.id)
|
|
|
|
let peers = gossipSub.getGossipPeers()
|
|
check peers.len == 0
|
|
|
|
await allFuturesThrowing(conns.mapIt(it.close()))
|
|
|
|
result = true
|
|
|
|
check:
|
|
waitFor(testRun()) == true
|