mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-01-07 15:23:08 +00:00
gossipsub: tag mesh peers to discourage pruning their connections
This commit is contained in:
parent
e1cd22937e
commit
2478eb9a87
21
gossipsub.go
21
gossipsub.go
@ -2,6 +2,7 @@ package pubsub
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -179,6 +180,7 @@ func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.
|
|||||||
} else {
|
} else {
|
||||||
log.Debugf("GRAFT: Add mesh link from %s in %s", p, topic)
|
log.Debugf("GRAFT: Add mesh link from %s in %s", p, topic)
|
||||||
peers[p] = struct{}{}
|
peers[p] = struct{}{}
|
||||||
|
gs.tagPeer(p, topic)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -201,6 +203,7 @@ func (gs *GossipSubRouter) handlePrune(p peer.ID, ctl *pb.ControlMessage) {
|
|||||||
if ok {
|
if ok {
|
||||||
log.Debugf("PRUNE: Remove mesh link to %s in %s", p, topic)
|
log.Debugf("PRUNE: Remove mesh link to %s in %s", p, topic)
|
||||||
delete(peers, p)
|
delete(peers, p)
|
||||||
|
gs.untagPeer(p, topic)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -277,6 +280,7 @@ func (gs *GossipSubRouter) Join(topic string) {
|
|||||||
for p := range gmap {
|
for p := range gmap {
|
||||||
log.Debugf("JOIN: Add mesh link to %s in %s", p, topic)
|
log.Debugf("JOIN: Add mesh link to %s in %s", p, topic)
|
||||||
gs.sendGraft(p, topic)
|
gs.sendGraft(p, topic)
|
||||||
|
gs.tagPeer(p, topic)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -293,6 +297,7 @@ func (gs *GossipSubRouter) Leave(topic string) {
|
|||||||
for p := range gmap {
|
for p := range gmap {
|
||||||
log.Debugf("LEAVE: Remove mesh link to %s in %s", p, topic)
|
log.Debugf("LEAVE: Remove mesh link to %s in %s", p, topic)
|
||||||
gs.sendPrune(p, topic)
|
gs.sendPrune(p, topic)
|
||||||
|
gs.untagPeer(p, topic)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -399,6 +404,7 @@ func (gs *GossipSubRouter) heartbeat() {
|
|||||||
for _, p := range plst {
|
for _, p := range plst {
|
||||||
log.Debugf("HEARTBEAT: Add mesh link to %s in %s", p, topic)
|
log.Debugf("HEARTBEAT: Add mesh link to %s in %s", p, topic)
|
||||||
peers[p] = struct{}{}
|
peers[p] = struct{}{}
|
||||||
|
gs.tagPeer(p, topic)
|
||||||
topics := tograft[p]
|
topics := tograft[p]
|
||||||
tograft[p] = append(topics, topic)
|
tograft[p] = append(topics, topic)
|
||||||
}
|
}
|
||||||
@ -413,6 +419,7 @@ func (gs *GossipSubRouter) heartbeat() {
|
|||||||
for _, p := range plst[:idontneed] {
|
for _, p := range plst[:idontneed] {
|
||||||
log.Debugf("HEARTBEAT: Remove mesh link to %s in %s", p, topic)
|
log.Debugf("HEARTBEAT: Remove mesh link to %s in %s", p, topic)
|
||||||
delete(peers, p)
|
delete(peers, p)
|
||||||
|
gs.untagPeer(p, topic)
|
||||||
topics := toprune[p]
|
topics := toprune[p]
|
||||||
toprune[p] = append(topics, topic)
|
toprune[p] = append(topics, topic)
|
||||||
}
|
}
|
||||||
@ -624,6 +631,20 @@ func (gs *GossipSubRouter) getPeers(topic string, count int, filter func(peer.ID
|
|||||||
return peers
|
return peers
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (gs *GossipSubRouter) tagPeer(p peer.ID, topic string) {
|
||||||
|
tag := topicTag(topic)
|
||||||
|
gs.p.host.ConnManager().TagPeer(p, tag, 2)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (gs *GossipSubRouter) untagPeer(p peer.ID, topic string) {
|
||||||
|
tag := topicTag(topic)
|
||||||
|
gs.p.host.ConnManager().UntagPeer(p, tag)
|
||||||
|
}
|
||||||
|
|
||||||
|
func topicTag(topic string) string {
|
||||||
|
return fmt.Sprintf("pubsub:%s", topic)
|
||||||
|
}
|
||||||
|
|
||||||
func peerListToMap(peers []peer.ID) map[peer.ID]struct{} {
|
func peerListToMap(peers []peer.ID) map[peer.ID]struct{} {
|
||||||
pmap := make(map[peer.ID]struct{})
|
pmap := make(map[peer.ID]struct{})
|
||||||
for _, p := range peers {
|
for _, p := range peers {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user