From 2478eb9a87e2dee775ef66f7908266cf7ecdeea6 Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 26 Oct 2018 15:49:34 +0300 Subject: [PATCH] gossipsub: tag mesh peers to discourage pruning their connections --- gossipsub.go | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/gossipsub.go b/gossipsub.go index ab0a6d9..6f45333 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -2,6 +2,7 @@ package pubsub import ( "context" + "fmt" "math/rand" "time" @@ -179,6 +180,7 @@ func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb. } else { log.Debugf("GRAFT: Add mesh link from %s in %s", p, topic) peers[p] = struct{}{} + gs.tagPeer(p, topic) } } @@ -201,6 +203,7 @@ func (gs *GossipSubRouter) handlePrune(p peer.ID, ctl *pb.ControlMessage) { if ok { log.Debugf("PRUNE: Remove mesh link to %s in %s", p, topic) delete(peers, p) + gs.untagPeer(p, topic) } } } @@ -277,6 +280,7 @@ func (gs *GossipSubRouter) Join(topic string) { for p := range gmap { log.Debugf("JOIN: Add mesh link to %s in %s", p, topic) gs.sendGraft(p, topic) + gs.tagPeer(p, topic) } } @@ -293,6 +297,7 @@ func (gs *GossipSubRouter) Leave(topic string) { for p := range gmap { log.Debugf("LEAVE: Remove mesh link to %s in %s", p, topic) gs.sendPrune(p, topic) + gs.untagPeer(p, topic) } } @@ -399,6 +404,7 @@ func (gs *GossipSubRouter) heartbeat() { for _, p := range plst { log.Debugf("HEARTBEAT: Add mesh link to %s in %s", p, topic) peers[p] = struct{}{} + gs.tagPeer(p, topic) topics := tograft[p] tograft[p] = append(topics, topic) } @@ -413,6 +419,7 @@ func (gs *GossipSubRouter) heartbeat() { for _, p := range plst[:idontneed] { log.Debugf("HEARTBEAT: Remove mesh link to %s in %s", p, topic) delete(peers, p) + gs.untagPeer(p, topic) topics := toprune[p] toprune[p] = append(topics, topic) } @@ -624,6 +631,20 @@ func (gs *GossipSubRouter) getPeers(topic string, count int, filter func(peer.ID return peers } +func (gs *GossipSubRouter) tagPeer(p peer.ID, topic string) { + tag := topicTag(topic) + gs.p.host.ConnManager().TagPeer(p, tag, 2) +} + +func (gs *GossipSubRouter) untagPeer(p peer.ID, topic string) { + tag := topicTag(topic) + gs.p.host.ConnManager().UntagPeer(p, tag) +} + +func topicTag(topic string) string { + return fmt.Sprintf("pubsub:%s", topic) +} + func peerListToMap(peers []peer.ID) map[peer.ID]struct{} { pmap := make(map[peer.ID]struct{}) for _, p := range peers {