implement peer exchange graft message

This commit is contained in:
Giovanni Petrantoni 2020-08-13 20:41:54 +09:00
parent 05463873a2
commit 585cfc5996

View File

@ -326,6 +326,13 @@ proc pruned(g: GossipSub, p: PubSubPeer, topic: string) =
else:
doAssert(false, "pruned: TopicInfo key not found for " & $p)
proc peerExchangeList(g: GossipSub, topic: string): seq[PeerInfoMsg] =
var peers = g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()).toSeq()
peers.keepIf do (x: PubSubPeer) -> bool:
x.score >= 0.0
peers.map do (x: PubSubPeer) -> PeerInfoMsg:
PeerInfoMsg(peerID: x.peerId.getBytes())
proc replenishFanout(g: GossipSub, topic: string) =
## get fanout peers for a topic
trace "about to replenish fanout"
@ -451,7 +458,11 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
# Send changes to peers after table updates to avoid stale state
let graft = RPCMsg(control: some(ControlMessage(graft: @[ControlGraft(topicID: topic)])))
let prune = RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)])))
let prune = RPCMsg(control: some(ControlMessage(
prune: @[ControlPrune(
topicID: topic,
peers: g.peerExchangeList(topic),
backoff: g.parameters.pruneBackoff.seconds.uint64)])))
discard await g.broadcast(grafting, graft, DefaultSendTimeout)
discard await g.broadcast(prunes, prune, DefaultSendTimeout)
@ -615,7 +626,11 @@ proc heartbeat(g: GossipSub) {.async.} =
g.pruned(peer, t)
g.mesh.removePeer(t, peer)
prunes &= peer
let prune = RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: t)])))
let prune = RPCMsg(control: some(ControlMessage(
prune: @[ControlPrune(
topicID: t,
peers: g.peerExchangeList(t),
backoff: g.parameters.pruneBackoff.seconds.uint64)])))
discard await g.broadcast(prunes, prune, DefaultSendTimeout)
await g.rebalanceMesh(t)
@ -777,10 +792,16 @@ proc handleGraft(g: GossipSub,
else:
trace "peer already in mesh"
else:
result.add(ControlPrune(topicID: topic))
result.add(ControlPrune(
topicID: topic,
peers: g.peerExchangeList(topic),
backoff: g.parameters.pruneBackoff.seconds.uint64))
else:
trace "peer grafting topic we're not interested in"
result.add(ControlPrune(topicID: topic))
result.add(ControlPrune(
topicID: topic,
peers: g.peerExchangeList(topic),
backoff: g.parameters.pruneBackoff.seconds.uint64))
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_peers_per_topic_mesh
@ -969,7 +990,11 @@ method unsubscribe*(g: GossipSub,
g.mesh.del(topic)
for peer in peers:
g.pruned(peer, topic)
let prune = RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)])))
let prune = RPCMsg(control: some(ControlMessage(
prune: @[ControlPrune(
topicID: topic,
peers: g.peerExchangeList(topic),
backoff: g.parameters.pruneBackoff.seconds.uint64)])))
discard await g.broadcast(toSeq(peers), prune, DefaultSendTimeout)
method unsubscribeAll*(g: GossipSub, topic: string) {.async.} =
@ -980,7 +1005,11 @@ method unsubscribeAll*(g: GossipSub, topic: string) {.async.} =
g.mesh.del(topic)
for peer in peers:
g.pruned(peer, topic)
let prune = RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)])))
let prune = RPCMsg(control: some(ControlMessage(
prune: @[ControlPrune(
topicID: topic,
peers: g.peerExchangeList(topic),
backoff: g.parameters.pruneBackoff.seconds.uint64)])))
discard await g.broadcast(toSeq(peers), prune, DefaultSendTimeout)
method publish*(g: GossipSub,