grafted and pruned procs

This commit is contained in:
Giovanni Petrantoni 2020-07-16 22:32:43 +09:00
parent 19d3d57d4c
commit 6f638259ee
2 changed files with 28 additions and 6 deletions

View File

@ -212,8 +212,9 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
for peer in newPeers: for peer in newPeers:
# send a graft message to the peer # send a graft message to the peer
grafts.add peer grafts.add(peer)
discard g.mesh.addPeer(topic, peer) if g.mesh.addPeer(topic, peer):
peer.grafted(topic)
trace "got peer", peer = $peer trace "got peer", peer = $peer
if g.mesh.peers(topic) > GossipSubDhi: if g.mesh.peers(topic) > GossipSubDhi:
@ -238,6 +239,7 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
trace "pruning peers", peers = g.mesh.peers(topic) trace "pruning peers", peers = g.mesh.peers(topic)
# send a graft message to the peer # send a graft message to the peer
peer.pruned(topic)
g.mesh.removePeer(topic, peer) g.mesh.removePeer(topic, peer)
prunes.add(peer) prunes.add(peer)
@ -436,6 +438,7 @@ proc handleGraft(g: GossipSub,
# peer will be removed from the mesh on next rebalance, so we don't want # peer will be removed from the mesh on next rebalance, so we don't want
# this peer to push someone else out # this peer to push someone else out
if g.mesh.addPeer(topic, peer): if g.mesh.addPeer(topic, peer):
peer.grafted(topic)
g.fanout.removePeer(topic, peer) g.fanout.removePeer(topic, peer)
else: else:
trace "Peer already in mesh", topic, peer = $peer trace "Peer already in mesh", topic, peer = $peer
@ -452,6 +455,7 @@ proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) =
trace "processing prune message", peer = $peer, trace "processing prune message", peer = $peer,
topicID = prune.topicID topicID = prune.topicID
peer.pruned(prune.topicID)
g.mesh.removePeer(prune.topicID, peer) g.mesh.removePeer(prune.topicID, peer)
libp2p_gossipsub_peers_per_topic_mesh libp2p_gossipsub_peers_per_topic_mesh
.set(g.mesh.peers(prune.topicID).int64, labelValues = [prune.topicID]) .set(g.mesh.peers(prune.topicID).int64, labelValues = [prune.topicID])
@ -560,6 +564,11 @@ method subscribe*(g: GossipSub,
topic: string, topic: string,
handler: TopicHandler) {.async.} = handler: TopicHandler) {.async.} =
await procCall PubSub(g).subscribe(topic, handler) await procCall PubSub(g).subscribe(topic, handler)
# if we have a fanout on this topic break it
if topic in g.fanout:
g.fanout.del(topic)
await g.rebalanceMesh(topic) await g.rebalanceMesh(topic)
method unsubscribe*(g: GossipSub, method unsubscribe*(g: GossipSub,
@ -573,6 +582,7 @@ method unsubscribe*(g: GossipSub,
g.mesh.del(topic) g.mesh.del(topic)
for peer in peers: for peer in peers:
peer.pruned(topic)
await peer.sendPrune(@[topic]) await peer.sendPrune(@[topic])
method publish*(g: GossipSub, method publish*(g: GossipSub,

View File

@ -31,6 +31,11 @@ type
onRecv*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].} onRecv*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].}
onSend*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].} onSend*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].}
TopicInfo = object
# gossip 1.1 related
graftTime*: Moment
meshTime*: Duration
PubSubPeer* = ref object of RootObj PubSubPeer* = ref object of RootObj
proto*: string # the protocol that this peer joined from proto*: string # the protocol that this peer joined from
sendConn: Connection sendConn: Connection
@ -42,10 +47,7 @@ type
onConnect*: AsyncEvent onConnect*: AsyncEvent
observers*: ref seq[PubSubObserver] # ref as in smart_ptr observers*: ref seq[PubSubObserver] # ref as in smart_ptr
refs: int # how many active connections this peer has refs: int # how many active connections this peer has
topicInfos: Table[string, TopicInfo]
# gossip 1.1 related
graftTime*: Moment
meshTime*: Duration
RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.} RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.}
@ -217,6 +219,16 @@ proc sendPrune*(p: PubSubPeer, topics: seq[string]): Future[void] =
proc `$`*(p: PubSubPeer): string = proc `$`*(p: PubSubPeer): string =
p.id p.id
proc grafted*(p: PubSubPeer, topic: string) =
var info = p.topicInfos.mgetOrPut(topic, TopicInfo())
info.graftTime = Moment.now()
info.meshTime = 0.seconds
proc pruned*(p: PubSubPeer, topic: string) =
var _ = p.topicInfos.mgetOrPut(topic, TopicInfo())
# TODO
proc newPubSubPeer*(peerInfo: PeerInfo, proc newPubSubPeer*(peerInfo: PeerInfo,
proto: string): PubSubPeer = proto: string): PubSubPeer =
new result new result