From 585cfc5996d90a3eed4670e05d679224fcedfdfc Mon Sep 17 00:00:00 2001 From: Giovanni Petrantoni Date: Thu, 13 Aug 2020 20:41:54 +0900 Subject: [PATCH] implement peer exchange graft message --- libp2p/protocols/pubsub/gossipsub.nim | 41 +++++++++++++++++++++++---- 1 file changed, 35 insertions(+), 6 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 2e0d26c62..8141701a1 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -326,6 +326,13 @@ proc pruned(g: GossipSub, p: PubSubPeer, topic: string) = else: doAssert(false, "pruned: TopicInfo key not found for " & $p) +proc peerExchangeList(g: GossipSub, topic: string): seq[PeerInfoMsg] = + var peers = g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()).toSeq() + peers.keepIf do (x: PubSubPeer) -> bool: + x.score >= 0.0 + peers.map do (x: PubSubPeer) -> PeerInfoMsg: + PeerInfoMsg(peerID: x.peerId.getBytes()) + proc replenishFanout(g: GossipSub, topic: string) = ## get fanout peers for a topic trace "about to replenish fanout" @@ -451,7 +458,11 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = # Send changes to peers after table updates to avoid stale state let graft = RPCMsg(control: some(ControlMessage(graft: @[ControlGraft(topicID: topic)]))) - let prune = RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)]))) + let prune = RPCMsg(control: some(ControlMessage( + prune: @[ControlPrune( + topicID: topic, + peers: g.peerExchangeList(topic), + backoff: g.parameters.pruneBackoff.seconds.uint64)]))) discard await g.broadcast(grafting, graft, DefaultSendTimeout) discard await g.broadcast(prunes, prune, DefaultSendTimeout) @@ -615,7 +626,11 @@ proc heartbeat(g: GossipSub) {.async.} = g.pruned(peer, t) g.mesh.removePeer(t, peer) prunes &= peer - let prune = RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: t)]))) + let prune = RPCMsg(control: some(ControlMessage( + prune: @[ControlPrune( + topicID: t, + peers: g.peerExchangeList(t), + backoff: g.parameters.pruneBackoff.seconds.uint64)]))) discard await g.broadcast(prunes, prune, DefaultSendTimeout) await g.rebalanceMesh(t) @@ -777,10 +792,16 @@ proc handleGraft(g: GossipSub, else: trace "peer already in mesh" else: - result.add(ControlPrune(topicID: topic)) + result.add(ControlPrune( + topicID: topic, + peers: g.peerExchangeList(topic), + backoff: g.parameters.pruneBackoff.seconds.uint64)) else: trace "peer grafting topic we're not interested in" - result.add(ControlPrune(topicID: topic)) + result.add(ControlPrune( + topicID: topic, + peers: g.peerExchangeList(topic), + backoff: g.parameters.pruneBackoff.seconds.uint64)) when defined(libp2p_expensive_metrics): libp2p_gossipsub_peers_per_topic_mesh @@ -969,7 +990,11 @@ method unsubscribe*(g: GossipSub, g.mesh.del(topic) for peer in peers: g.pruned(peer, topic) - let prune = RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)]))) + let prune = RPCMsg(control: some(ControlMessage( + prune: @[ControlPrune( + topicID: topic, + peers: g.peerExchangeList(topic), + backoff: g.parameters.pruneBackoff.seconds.uint64)]))) discard await g.broadcast(toSeq(peers), prune, DefaultSendTimeout) method unsubscribeAll*(g: GossipSub, topic: string) {.async.} = @@ -980,7 +1005,11 @@ method unsubscribeAll*(g: GossipSub, topic: string) {.async.} = g.mesh.del(topic) for peer in peers: g.pruned(peer, topic) - let prune = RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)]))) + let prune = RPCMsg(control: some(ControlMessage( + prune: @[ControlPrune( + topicID: topic, + peers: g.peerExchangeList(topic), + backoff: g.parameters.pruneBackoff.seconds.uint64)]))) discard await g.broadcast(toSeq(peers), prune, DefaultSendTimeout) method publish*(g: GossipSub,