peer exchange on prune

This commit is contained in:
vyzo 2019-11-22 00:09:37 +02:00
parent 6fc8050bac
commit 6c59beedb8

View File

@ -14,7 +14,7 @@ import (
)
const (
GossipSubID = protocol.ID("/meshsub/1.0.0")
GossipSubID_v10 = protocol.ID("/meshsub/1.0.0")
GossipSubID_v11 = protocol.ID("/meshsub/1.1.0")
)
@ -34,6 +34,9 @@ var (
// fanout ttl
GossipSubFanoutTTL = 60 * time.Second
// number of peers to include in prune Peer eXchange
GossipSubPrunePeers = 16
)
// NewGossipSub returns a new PubSub object using GossipSubRouter as the router.
@ -70,7 +73,7 @@ type GossipSubRouter struct {
}
func (gs *GossipSubRouter) Protocols() []protocol.ID {
return []protocol.ID{GossipSubID_v11, GossipSubID, FloodSubID}
return []protocol.ID{GossipSubID_v11, GossipSubID_v10, FloodSubID}
}
func (gs *GossipSubRouter) Attach(p *PubSub) {
@ -227,7 +230,7 @@ func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.
cprune := make([]*pb.ControlPrune, 0, len(prune))
for _, topic := range prune {
cprune = append(cprune, &pb.ControlPrune{TopicID: &topic})
cprune = append(cprune, gs.makePrune(p, topic))
}
return cprune
@ -361,7 +364,7 @@ func (gs *GossipSubRouter) sendGraft(p peer.ID, topic string) {
}
func (gs *GossipSubRouter) sendPrune(p peer.ID, topic string) {
prune := []*pb.ControlPrune{&pb.ControlPrune{TopicID: &topic}}
prune := []*pb.ControlPrune{gs.makePrune(p, topic)}
out := rpcWithControl(nil, nil, nil, nil, prune)
gs.sendRPC(p, out)
}
@ -545,7 +548,7 @@ func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string)
delete(toprune, p)
prune = make([]*pb.ControlPrune, 0, len(pruning))
for _, topic := range pruning {
prune = append(prune, &pb.ControlPrune{TopicID: &topic})
prune = append(prune, gs.makePrune(p, topic))
}
}
@ -556,7 +559,7 @@ func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string)
for p, topics := range toprune {
prune := make([]*pb.ControlPrune, 0, len(topics))
for _, topic := range topics {
prune = append(prune, &pb.ControlPrune{TopicID: &topic})
prune = append(prune, gs.makePrune(p, topic))
}
out := rpcWithControl(nil, nil, nil, nil, prune)
@ -674,6 +677,37 @@ func (gs *GossipSubRouter) piggybackControl(p peer.ID, out *RPC, ctl *pb.Control
}
}
func (gs *GossipSubRouter) makePrune(p peer.ID, topic string) *pb.ControlPrune {
if gs.peers[p] == GossipSubID_v10 {
// GossipSub v1.0 -- no peer exchange, the peer won't be able to parse it anyway
return &pb.ControlPrune{TopicID: &topic}
}
// select peers for Peer eXchange
peers := gs.getPeers(topic, GossipSubPrunePeers, func(xp peer.ID) bool {
return p != xp
})
px := make([]*pb.PeerInfo, 0, len(peers))
for _, p := range peers {
// see if we have a signed address record to send back; if we don't, just send
// the peer ID and let the pruned peer find them in the DHT -- we can't trust
// unsigned address records through px anyway.
srr := gs.p.host.Peerstore().SignedRoutingState(p)
var saddrs []byte
var err error
if srr != nil {
saddrs, err = srr.Marshal()
if err != nil {
log.Warningf("error marshaling signed routing state for %s: %s", p, err)
}
}
px = append(px, &pb.PeerInfo{PeerID: []byte(p), SignedAddrs: saddrs})
}
return &pb.ControlPrune{TopicID: &topic, Peers: px}
}
func (gs *GossipSubRouter) getPeers(topic string, count int, filter func(peer.ID) bool) []peer.ID {
tmap, ok := gs.p.topics[topic]
if !ok {
@ -682,7 +716,7 @@ func (gs *GossipSubRouter) getPeers(topic string, count int, filter func(peer.ID
peers := make([]peer.ID, 0, len(tmap))
for p := range tmap {
if (gs.peers[p] == GossipSubID || gs.peers[p] == GossipSubID_v11) && filter(p) {
if (gs.peers[p] == GossipSubID_v10 || gs.peers[p] == GossipSubID_v11) && filter(p) {
peers = append(peers, p)
}
}