mirror of
https://github.com/codex-storage/nim-libp2p.git
synced 2025-01-12 12:04:08 +00:00
0959877b29
* master merge * wip * avoid deadlocks * tcp limits * expose client field in chronosstream * limit incoming connections * update with new listen api * fix release * don't override peerinfo in connection * rework transport with accept * use semaphore to track resource ussage * rework with new transport accept api * move events to conn manager (#373) * use semaphore to track resource ussage * merge master * expose api to acquire conn slots * don't fail expensive metrics * allow tracking and updating connections * set global connection limits to 80 * add per peer connection limits * make sure conn is closed if tracking failed * more descriptive naming for handle * rework with new transport accept api * add `getStream` hide `selectConn` * add TransportClosedError * make nil explicit * don't make unnecessary copies of message * logging * error handling * cleanup semaphore * track connections properly * throw `TooManyConnections` when tracking outgoing * use proper exception and handle conventions * check onCloseHandle for nil * revert internalConnect changes * adding upgraded flag * await stream before closing * simplify tracking * wip * logging * split connection limits into incoming and outgoing * further streamline connection limits split counts * don't use closeWithEOF * move peer and conn event triggers from switch * wip * wip * wip * merge master * handle nil connections properly * add clarifying comment * don't raise exc on nil * no finally * add proper min/max connections logic * rebase master * merge master * master merge * remove request timeout should be addressed in separate PR * merge master * share semaphore when in/out limits arent enforced * merge master * use import * pass semaphore to trackConn * don't close last conn * use storeConn * merge master * use storeConn
488 lines
16 KiB
Nim
488 lines
16 KiB
Nim
include ../../libp2p/protocols/pubsub/gossipsub
|
|
|
|
{.used.}
|
|
|
|
import options
|
|
import unittest, bearssl
|
|
import stew/byteutils
|
|
import ../../libp2p/standard_setup
|
|
import ../../libp2p/errors
|
|
import ../../libp2p/crypto/crypto
|
|
import ../../libp2p/stream/bufferstream
|
|
import ../../libp2p/switch
|
|
|
|
import ../helpers
|
|
|
|
# Test-local subtype of GossipSub; the tests in this file create their
# router instances through `TestGossipSub.init`.
type TestGossipSub = ref object of GossipSub
|
|
|
|
proc noop(data: seq[byte]) {.async, gcsafe.} =
  ## Async callback that ignores `data`; the tests pass it to
  ## `newBufferStream` when they create throw-away connections.
  discard
|
|
|
|
proc getPubSubPeer(p: TestGossipSub, peerId: PeerID): auto =
  ## Builds a pubsub peer for `peerId` via `newPubSubPeer`.
  ##
  ## The peer is handed a closure that dials `peerId` on `p.switch` with
  ## `GossipSubCodec`; the third constructor argument is passed as `nil`.
  proc dialPeer(): Future[Connection] =
    dial(p.switch, peerId, GossipSubCodec)

  newPubSubPeer(peerId, dialPeer, nil, GossipSubCodec)
|
|
|
|
proc randomPeerInfo(): PeerInfo =
  ## Returns a `PeerInfo` initialised from a freshly generated ECDSA private
  ## key drawn from `rng`.
  let key = PrivateKey.random(ECDSA, rng[]).get()
  result = PeerInfo.init(key)
|
|
|
|
suite "GossipSub internal":
  # These tests populate GossipSub's internal tables (`mesh`, `fanout`,
  # `gossipsub`, `peers`, ...) by hand and then call the routine under test
  # directly. Every test closes the buffer streams it created and stops the
  # switch before returning.

  teardown:
    checkTrackers()

  asyncTest "subscribe/unsubscribeAll":
    let gossipSub = TestGossipSub.init(newStandardSwitch())

    # Declared `async` so that calling it always yields a completed future;
    # a plain proc returning `Future[void]` with a `discard` body would hand
    # back a nil future instead.
    proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
      discard

    let topic = "foobar"
    gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
    gossipSub.topicParams[topic] = TopicParams.init()

    var conns = newSeq[Connection]()
    gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
    for i in 0..<15:
      let conn = newBufferStream(noop)
      conns &= conn
      let peerInfo = randomPeerInfo()
      conn.peerInfo = peerInfo
      let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
      peer.sendConn = conn
      gossipSub.onNewPeer(peer)
      gossipSub.peers[peerInfo.peerId] = peer
      gossipSub.gossipsub[topic].incl(peer)

    # test via dynamic dispatch
    gossipSub.PubSub.subscribe(topic, handler)

    check:
      gossipSub.topics.contains(topic)
      gossipSub.gossipsub[topic].len() > 0
      gossipSub.mesh[topic].len() > 0

    # test via dynamic dispatch
    gossipSub.PubSub.unsubscribeAll(topic)

    check:
      topic notin gossipSub.topics # not in local topics
      topic notin gossipSub.mesh # not in mesh
      topic in gossipSub.gossipsub # but still in gossipsub table (for fanning out)

    await allFuturesThrowing(conns.mapIt(it.close()))
    await gossipSub.switch.stop()

  asyncTest "topic params":
    # `tryGet` raises if validation of the default parameters reports an error.
    let params = TopicParams.init()
    params.validateParameters().tryGet()

  asyncTest "`rebalanceMesh` Degree Lo":
    let gossipSub = TestGossipSub.init(newStandardSwitch())

    let topic = "foobar"
    gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
    gossipSub.topicParams[topic] = TopicParams.init()

    # Start from an empty mesh with 15 known peers in the gossipsub table.
    var conns = newSeq[Connection]()
    gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
    for i in 0..<15:
      let conn = newBufferStream(noop)
      conns &= conn
      let peerInfo = randomPeerInfo()
      conn.peerInfo = peerInfo
      let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
      peer.sendConn = conn
      gossipSub.onNewPeer(peer)
      gossipSub.peers[peerInfo.peerId] = peer
      gossipSub.gossipsub[topic].incl(peer)

    check gossipSub.peers.len == 15
    gossipSub.rebalanceMesh(topic)
    check gossipSub.mesh[topic].len == gossipSub.parameters.d # + 2 # account opportunistic grafts

    await allFuturesThrowing(conns.mapIt(it.close()))
    await gossipSub.switch.stop()

  asyncTest "`rebalanceMesh` Degree Hi":
    let gossipSub = TestGossipSub.init(newStandardSwitch())

    let topic = "foobar"
    gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
    gossipSub.topicParams[topic] = TopicParams.init()

    # Start from a mesh that already holds 15 grafted peers.
    var conns = newSeq[Connection]()
    gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
    for i in 0..<15:
      let conn = newBufferStream(noop)
      conns &= conn
      let peerInfo = randomPeerInfo()
      conn.peerInfo = peerInfo
      let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
      gossipSub.onNewPeer(peer)
      gossipSub.grafted(peer, topic)
      gossipSub.peers[peerInfo.peerId] = peer
      gossipSub.mesh[topic].incl(peer)

    check gossipSub.mesh[topic].len == 15
    gossipSub.rebalanceMesh(topic)
    check gossipSub.mesh[topic].len == gossipSub.parameters.d + gossipSub.parameters.dScore

    await allFuturesThrowing(conns.mapIt(it.close()))
    await gossipSub.switch.stop()

  asyncTest "`replenishFanout` Degree Lo":
    let gossipSub = TestGossipSub.init(newStandardSwitch())

    proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
      discard

    let topic = "foobar"
    gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
    gossipSub.topicParams[topic] = TopicParams.init()

    var conns = newSeq[Connection]()
    for i in 0..<15:
      let conn = newBufferStream(noop)
      conns &= conn
      let peerInfo = randomPeerInfo()
      conn.peerInfo = peerInfo
      let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
      gossipSub.onNewPeer(peer)
      peer.handler = handler
      gossipSub.gossipsub[topic].incl(peer)

    check gossipSub.gossipsub[topic].len == 15
    gossipSub.replenishFanout(topic)
    check gossipSub.fanout[topic].len == gossipSub.parameters.d

    await allFuturesThrowing(conns.mapIt(it.close()))
    await gossipSub.switch.stop()

  asyncTest "`dropFanoutPeers` drop expired fanout topics":
    let gossipSub = TestGossipSub.init(newStandardSwitch())

    proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
      discard

    let topic = "foobar"
    gossipSub.topicParams[topic] = TopicParams.init()
    gossipSub.fanout[topic] = initHashSet[PubSubPeer]()
    gossipSub.lastFanoutPubSub[topic] = Moment.fromNow(1.millis)
    await sleepAsync(5.millis) # allow the topic to expire

    # Six peers: the check right after the loop expects this count to equal
    # `parameters.d`.
    var conns = newSeq[Connection]()
    for i in 0..<6:
      let conn = newBufferStream(noop)
      conns &= conn
      let peerInfo = randomPeerInfo()
      conn.peerInfo = peerInfo
      let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
      gossipSub.onNewPeer(peer)
      peer.handler = handler
      gossipSub.fanout[topic].incl(peer)

    check gossipSub.fanout[topic].len == gossipSub.parameters.d

    gossipSub.dropFanoutPeers()
    check topic notin gossipSub.fanout

    await allFuturesThrowing(conns.mapIt(it.close()))
    await gossipSub.switch.stop()

  asyncTest "`dropFanoutPeers` leave unexpired fanout topics":
    let gossipSub = TestGossipSub.init(newStandardSwitch())

    proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
      discard

    let topic1 = "foobar1"
    let topic2 = "foobar2"
    gossipSub.topicParams[topic1] = TopicParams.init()
    gossipSub.topicParams[topic2] = TopicParams.init()
    gossipSub.fanout[topic1] = initHashSet[PubSubPeer]()
    gossipSub.fanout[topic2] = initHashSet[PubSubPeer]()
    # topic1 gets a 1 ms deadline and topic2 a 1 minute deadline, so only
    # topic1 is past its deadline after the sleep below.
    gossipSub.lastFanoutPubSub[topic1] = Moment.fromNow(1.millis)
    gossipSub.lastFanoutPubSub[topic2] = Moment.fromNow(1.minutes)
    await sleepAsync(5.millis) # allow the topic to expire

    # Six peers: the checks right after the loop expect this count to equal
    # `parameters.d`.
    var conns = newSeq[Connection]()
    for i in 0..<6:
      let conn = newBufferStream(noop)
      conns &= conn
      let peerInfo = randomPeerInfo()
      conn.peerInfo = peerInfo
      let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
      gossipSub.onNewPeer(peer)
      peer.handler = handler
      gossipSub.fanout[topic1].incl(peer)
      gossipSub.fanout[topic2].incl(peer)

    check gossipSub.fanout[topic1].len == gossipSub.parameters.d
    check gossipSub.fanout[topic2].len == gossipSub.parameters.d

    gossipSub.dropFanoutPeers()
    check topic1 notin gossipSub.fanout
    check topic2 in gossipSub.fanout

    await allFuturesThrowing(conns.mapIt(it.close()))
    await gossipSub.switch.stop()

  asyncTest "`getGossipPeers` - should gather up to degree D non intersecting peers":
    let gossipSub = TestGossipSub.init(newStandardSwitch())

    proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
      discard

    let topic = "foobar"
    gossipSub.topicParams[topic] = TopicParams.init()
    gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
    gossipSub.fanout[topic] = initHashSet[PubSubPeer]()
    gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
    var conns = newSeq[Connection]()

    # generate mesh and fanout peers
    # (even iterations go to fanout, odd ones are grafted into the mesh)
    for i in 0..<30:
      let conn = newBufferStream(noop)
      conns &= conn
      let peerInfo = randomPeerInfo()
      conn.peerInfo = peerInfo
      let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
      gossipSub.onNewPeer(peer)
      peer.handler = handler
      if i mod 2 == 0:
        gossipSub.fanout[topic].incl(peer)
      else:
        gossipSub.grafted(peer, topic)
        gossipSub.mesh[topic].incl(peer)

    # generate gossipsub (free standing) peers
    for i in 0..<15:
      let conn = newBufferStream(noop)
      conns &= conn
      let peerInfo = randomPeerInfo()
      conn.peerInfo = peerInfo
      let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
      gossipSub.onNewPeer(peer)
      peer.handler = handler
      gossipSub.gossipsub[topic].incl(peer)

    # generate messages
    var seqno = 0'u64
    for i in 0..5:
      let conn = newBufferStream(noop)
      conns &= conn
      let peerInfo = randomPeerInfo()
      conn.peerInfo = peerInfo
      inc seqno
      let msg = Message.init(some(peerInfo), ("HELLO" & $i).toBytes(), topic, some(seqno), false)
      gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg)

    check gossipSub.fanout[topic].len == 15
    check gossipSub.mesh[topic].len == 15
    check gossipSub.gossipsub[topic].len == 15

    let peers = gossipSub.getGossipPeers()
    check peers.len == gossipSub.parameters.d
    # none of the selected peers may already be in fanout or mesh
    for p in peers.keys:
      check not gossipSub.fanout.hasPeerID(topic, p.peerId)
      check not gossipSub.mesh.hasPeerID(topic, p.peerId)

    await allFuturesThrowing(conns.mapIt(it.close()))
    await gossipSub.switch.stop()

  asyncTest "`getGossipPeers` - should not crash on missing topics in mesh":
    let gossipSub = TestGossipSub.init(newStandardSwitch())

    proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
      discard

    # Note: no `mesh[topic]` entry is created in this test.
    let topic = "foobar"
    gossipSub.topicParams[topic] = TopicParams.init()
    gossipSub.fanout[topic] = initHashSet[PubSubPeer]()
    gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
    var conns = newSeq[Connection]()
    for i in 0..<30:
      let conn = newBufferStream(noop)
      conns &= conn
      let peerInfo = randomPeerInfo()
      conn.peerInfo = peerInfo
      let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
      gossipSub.onNewPeer(peer)
      peer.handler = handler
      if i mod 2 == 0:
        gossipSub.fanout[topic].incl(peer)
      else:
        gossipSub.gossipsub[topic].incl(peer)

    # generate messages
    var seqno = 0'u64
    for i in 0..5:
      let conn = newBufferStream(noop)
      conns &= conn
      let peerInfo = randomPeerInfo()
      conn.peerInfo = peerInfo
      inc seqno
      let msg = Message.init(some(peerInfo), ("HELLO" & $i).toBytes(), topic, some(seqno), false)
      gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg)

    let peers = gossipSub.getGossipPeers()
    check peers.len == gossipSub.parameters.d

    await allFuturesThrowing(conns.mapIt(it.close()))
    await gossipSub.switch.stop()

  asyncTest "`getGossipPeers` - should not crash on missing topics in fanout":
    let gossipSub = TestGossipSub.init(newStandardSwitch())

    proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
      discard

    # Note: no `fanout[topic]` entry is created in this test.
    let topic = "foobar"
    gossipSub.topicParams[topic] = TopicParams.init()
    gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
    gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
    var conns = newSeq[Connection]()
    for i in 0..<30:
      let conn = newBufferStream(noop)
      conns &= conn
      let peerInfo = randomPeerInfo()
      conn.peerInfo = peerInfo
      let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
      gossipSub.onNewPeer(peer)
      peer.handler = handler
      if i mod 2 == 0:
        gossipSub.mesh[topic].incl(peer)
        gossipSub.grafted(peer, topic)
      else:
        gossipSub.gossipsub[topic].incl(peer)

    # generate messages
    var seqno = 0'u64
    for i in 0..5:
      let conn = newBufferStream(noop)
      conns &= conn
      let peerInfo = randomPeerInfo()
      conn.peerInfo = peerInfo
      inc seqno
      let msg = Message.init(some(peerInfo), ("HELLO" & $i).toBytes(), topic, some(seqno), false)
      gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg)

    let peers = gossipSub.getGossipPeers()
    check peers.len == gossipSub.parameters.d

    await allFuturesThrowing(conns.mapIt(it.close()))
    await gossipSub.switch.stop()

  asyncTest "`getGossipPeers` - should not crash on missing topics in gossip":
    let gossipSub = TestGossipSub.init(newStandardSwitch())

    proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
      discard

    # Note: no `gossipsub[topic]` entry is created in this test, and the
    # test expects `getGossipPeers` to return nothing.
    let topic = "foobar"
    gossipSub.topicParams[topic] = TopicParams.init()
    gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
    gossipSub.fanout[topic] = initHashSet[PubSubPeer]()
    var conns = newSeq[Connection]()
    for i in 0..<30:
      let conn = newBufferStream(noop)
      conns &= conn
      let peerInfo = randomPeerInfo()
      conn.peerInfo = peerInfo
      let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
      gossipSub.onNewPeer(peer)
      peer.handler = handler
      if i mod 2 == 0:
        gossipSub.mesh[topic].incl(peer)
        gossipSub.grafted(peer, topic)
      else:
        gossipSub.fanout[topic].incl(peer)

    # generate messages
    var seqno = 0'u64
    for i in 0..5:
      let conn = newBufferStream(noop)
      conns &= conn
      let peerInfo = randomPeerInfo()
      conn.peerInfo = peerInfo
      inc seqno
      let msg = Message.init(some(peerInfo), ("bar" & $i).toBytes(), topic, some(seqno), false)
      gossipSub.mcache.put(gossipSub.msgIdProvider(msg), msg)

    let peers = gossipSub.getGossipPeers()
    check peers.len == 0

    await allFuturesThrowing(conns.mapIt(it.close()))
    await gossipSub.switch.stop()

  asyncTest "Drop messages of topics without subscription":
    let gossipSub = TestGossipSub.init(newStandardSwitch())

    # Fails the test if it is ever invoked.
    proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
      check false

    let topic = "foobar"
    var conns = newSeq[Connection]()
    for i in 0..<30:
      let conn = newBufferStream(noop)
      conns &= conn
      let peerInfo = randomPeerInfo()
      conn.peerInfo = peerInfo
      let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
      gossipSub.onNewPeer(peer)
      peer.handler = handler

    # generate messages
    # (each one is fed straight into `rpcHandler`; the router was never
    # subscribed to `topic`)
    var seqno = 0'u64
    for i in 0..5:
      let conn = newBufferStream(noop)
      conns &= conn
      let peerInfo = randomPeerInfo()
      conn.peerInfo = peerInfo
      let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
      inc seqno
      let msg = Message.init(some(peerInfo), ("bar" & $i).toBytes(), topic, some(seqno), false)
      await gossipSub.rpcHandler(peer, RPCMsg(messages: @[msg]))

    check gossipSub.mcache.msgs.len == 0

    await allFuturesThrowing(conns.mapIt(it.close()))
    await gossipSub.switch.stop()

  asyncTest "Disconnect bad peers":
    let gossipSub = TestGossipSub.init(newStandardSwitch())
    gossipSub.parameters.disconnectBadPeers = true

    # Fails the test if it is ever invoked.
    proc handler(peer: PubSubPeer, msg: RPCMsg) {.async.} =
      check false

    let topic = "foobar"
    var conns = newSeq[Connection]()
    for i in 0..<30:
      let conn = newBufferStream(noop)
      conns &= conn
      let peerInfo = randomPeerInfo()
      conn.peerInfo = peerInfo
      let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
      gossipSub.onNewPeer(peer)
      peer.sendConn = conn
      peer.handler = handler
      # put every peer's score just below the graylist threshold
      peer.score = gossipSub.parameters.graylistThreshold - 1
      gossipSub.gossipsub.mgetOrPut(topic, initHashSet[PubSubPeer]()).incl(peer)
      gossipSub.peers[peerInfo.peerId] = peer
      gossipSub.switch.connManager.storeConn(conn)

    gossipSub.updateScores()

    # give the router time to act on the updated scores before checking
    await sleepAsync(100.millis)

    check:
      # test our disconnect mechanics
      gossipSub.gossipsub.peers(topic) == 0
      # also ensure we cleanup properly the peersInIP table
      gossipSub.peersInIP.len == 0

    await allFuturesThrowing(conns.mapIt(it.close()))
    await gossipSub.switch.stop()
|