diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 7584d2c6e..01a3f246a 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -212,8 +212,9 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = for peer in newPeers: # send a graft message to the peer - grafts.add peer - discard g.mesh.addPeer(topic, peer) + grafts.add(peer) + if g.mesh.addPeer(topic, peer): + peer.grafted(topic) trace "got peer", peer = $peer if g.mesh.peers(topic) > GossipSubDhi: @@ -238,6 +239,7 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = trace "pruning peers", peers = g.mesh.peers(topic) # send a graft message to the peer + peer.pruned(topic) g.mesh.removePeer(topic, peer) prunes.add(peer) @@ -436,6 +438,7 @@ proc handleGraft(g: GossipSub, # peer will be removed from the mesh on next rebalance, so we don't want # this peer to push someone else out if g.mesh.addPeer(topic, peer): + peer.grafted(topic) g.fanout.removePeer(topic, peer) else: trace "Peer already in mesh", topic, peer = $peer @@ -452,6 +455,7 @@ proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) = trace "processing prune message", peer = $peer, topicID = prune.topicID + peer.pruned(prune.topicID) g.mesh.removePeer(prune.topicID, peer) libp2p_gossipsub_peers_per_topic_mesh .set(g.mesh.peers(prune.topicID).int64, labelValues = [prune.topicID]) @@ -560,6 +564,11 @@ method subscribe*(g: GossipSub, topic: string, handler: TopicHandler) {.async.} = await procCall PubSub(g).subscribe(topic, handler) + + # if we have a fanout on this topic break it + if topic in g.fanout: + g.fanout.del(topic) + await g.rebalanceMesh(topic) method unsubscribe*(g: GossipSub, @@ -573,6 +582,7 @@ method unsubscribe*(g: GossipSub, g.mesh.del(topic) for peer in peers: + peer.pruned(topic) await peer.sendPrune(@[topic]) method publish*(g: GossipSub, diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 07a75dcef..b313d3cec 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -31,6 +31,11 @@ type onRecv*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].} onSend*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].} + TopicInfo = object + # gossip 1.1 related + graftTime*: Moment + meshTime*: Duration + PubSubPeer* = ref object of RootObj proto*: string # the protocol that this peer joined from sendConn: Connection @@ -42,10 +47,7 @@ type onConnect*: AsyncEvent observers*: ref seq[PubSubObserver] # ref as in smart_ptr refs: int # how many active connections this peer has - - # gossip 1.1 related - graftTime*: Moment - meshTime*: Duration + topicInfos: Table[string, TopicInfo] RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.} @@ -217,6 +219,16 @@ proc sendPrune*(p: PubSubPeer, topics: seq[string]): Future[void] = proc `$`*(p: PubSubPeer): string = p.id +proc grafted*(p: PubSubPeer, topic: string) = + var info = p.topicInfos.mgetOrPut(topic, TopicInfo()) + info.graftTime = Moment.now() + info.meshTime = 0.seconds + +proc pruned*(p: PubSubPeer, topic: string) = + var _ = p.topicInfos.mgetOrPut(topic, TopicInfo()) + # TODO + + proc newPubSubPeer*(peerInfo: PeerInfo, proto: string): PubSubPeer = new result