Gossipsub refactor pt2 (#495)

* add sub/unsub test

* remove unused variable from gossip
This commit is contained in:
Giovanni Petrantoni 2020-12-20 00:45:34 +09:00 committed by GitHub
parent f970155d3b
commit 4858e0ab15
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 44 additions and 8 deletions

View File

@ -166,9 +166,6 @@ type
heartbeatEvents*: seq[AsyncEvent] heartbeatEvents*: seq[AsyncEvent]
when not defined(release):
prunedPeers: HashSet[PubSubPeer]
when defined(libp2p_expensive_metrics): when defined(libp2p_expensive_metrics):
declareGauge(libp2p_gossipsub_peers_per_topic_mesh, declareGauge(libp2p_gossipsub_peers_per_topic_mesh,
"gossipsub peers per topic in mesh", "gossipsub peers per topic in mesh",
@ -339,9 +336,6 @@ proc grafted(g: GossipSub, p: PubSubPeer, topic: string) =
proc pruned(g: GossipSub, p: PubSubPeer, topic: string) = proc pruned(g: GossipSub, p: PubSubPeer, topic: string) =
g.peerStats.withValue(p.peerId, stats): g.peerStats.withValue(p.peerId, stats):
when not defined(release):
g.prunedPeers.incl(p)
if topic in stats.topicInfos: if topic in stats.topicInfos:
var info = stats.topicInfos[topic] var info = stats.topicInfos[topic]
let topicParams = g.topicParams.mgetOrPut(topic, TopicParams.init()) let topicParams = g.topicParams.mgetOrPut(topic, TopicParams.init())
@ -1270,12 +1264,12 @@ method unsubscribeAll*(g: GossipSub, topic: string) =
# send to peers NOT in mesh first # send to peers NOT in mesh first
g.broadcast(toSeq(gpeers), msg) g.broadcast(toSeq(gpeers), msg)
g.mesh.del(topic)
for peer in mpeers: for peer in mpeers:
trace "pruning unsubscribeAll call peer", peer, score = peer.score trace "pruning unsubscribeAll call peer", peer, score = peer.score
g.pruned(peer, topic) g.pruned(peer, topic)
g.mesh.del(topic)
msg.control = msg.control =
some(ControlMessage(prune: some(ControlMessage(prune:
@[ControlPrune(topicID: topic, @[ControlPrune(topicID: topic,

View File

@ -30,6 +30,48 @@ suite "GossipSub internal":
teardown: teardown:
checkTrackers() checkTrackers()
asyncTest "subscribe/unsubscribeAll":
let gossipSub = TestGossipSub.init(newStandardSwitch())
proc handler(topic: string, data: seq[byte]): Future[void] {.gcsafe.} =
discard
let topic = "foobar"
gossipSub.mesh[topic] = initHashSet[PubSubPeer]()
gossipSub.topicParams[topic] = TopicParams.init()
var conns = newSeq[Connection]()
gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]()
for i in 0..<15:
let conn = newBufferStream(noop)
conns &= conn
let peerInfo = randomPeerInfo()
conn.peerInfo = peerInfo
let peer = gossipSub.getPubSubPeer(peerInfo.peerId)
peer.sendConn = conn
gossipSub.onNewPeer(peer)
gossipSub.peers[peerInfo.peerId] = peer
gossipSub.gossipsub[topic].incl(peer)
# test via dynamic dispatch
gossipSub.PubSub.subscribe(topic, handler)
check:
gossipSub.topics.contains(topic)
gossipSub.gossipsub[topic].len() > 0
gossipSub.mesh[topic].len() > 0
# test via dynamic dispatch
gossipSub.PubSub.unsubscribeAll(topic)
check:
topic notin gossipSub.topics # not in local topics
topic notin gossipSub.mesh # not in mesh
topic in gossipSub.gossipsub # but still in gossipsub table (for fanning out)
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()
asyncTest "topic params": asyncTest "topic params":
let params = TopicParams.init() let params = TopicParams.init()
params.validateParameters().tryGet() params.validateParameters().tryGet()