diff --git a/gossipsub.go b/gossipsub.go index 2decff3..f837401 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -14,7 +14,7 @@ import ( ) const ( - GossipSubID = protocol.ID("/meshsub/1.0.0") + GossipSubID_v10 = protocol.ID("/meshsub/1.0.0") GossipSubID_v11 = protocol.ID("/meshsub/1.1.0") ) @@ -34,6 +34,9 @@ var ( // fanout ttl GossipSubFanoutTTL = 60 * time.Second + + // number of peers to include in prune Peer eXchange + GossipSubPrunePeers = 16 ) // NewGossipSub returns a new PubSub object using GossipSubRouter as the router. @@ -70,7 +73,7 @@ type GossipSubRouter struct { } func (gs *GossipSubRouter) Protocols() []protocol.ID { - return []protocol.ID{GossipSubID_v11, GossipSubID, FloodSubID} + return []protocol.ID{GossipSubID_v11, GossipSubID_v10, FloodSubID} } func (gs *GossipSubRouter) Attach(p *PubSub) { @@ -227,7 +230,7 @@ func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb. cprune := make([]*pb.ControlPrune, 0, len(prune)) for _, topic := range prune { - cprune = append(cprune, &pb.ControlPrune{TopicID: &topic}) + cprune = append(cprune, gs.makePrune(p, topic)) } return cprune @@ -361,7 +364,7 @@ func (gs *GossipSubRouter) sendGraft(p peer.ID, topic string) { } func (gs *GossipSubRouter) sendPrune(p peer.ID, topic string) { - prune := []*pb.ControlPrune{&pb.ControlPrune{TopicID: &topic}} + prune := []*pb.ControlPrune{gs.makePrune(p, topic)} out := rpcWithControl(nil, nil, nil, nil, prune) gs.sendRPC(p, out) } @@ -545,7 +548,7 @@ func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string) delete(toprune, p) prune = make([]*pb.ControlPrune, 0, len(pruning)) for _, topic := range pruning { - prune = append(prune, &pb.ControlPrune{TopicID: &topic}) + prune = append(prune, gs.makePrune(p, topic)) } } @@ -556,7 +559,7 @@ func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string) for p, topics := range toprune { prune := make([]*pb.ControlPrune, 0, len(topics)) for _, topic := range topics { - prune = append(prune, &pb.ControlPrune{TopicID: &topic}) + prune = append(prune, gs.makePrune(p, topic)) } out := rpcWithControl(nil, nil, nil, nil, prune) @@ -674,6 +677,37 @@ func (gs *GossipSubRouter) piggybackControl(p peer.ID, out *RPC, ctl *pb.Control } } +func (gs *GossipSubRouter) makePrune(p peer.ID, topic string) *pb.ControlPrune { + if gs.peers[p] == GossipSubID_v10 { + // GossipSub v1.0 -- no peer exchange, the peer won't be able to parse it anyway + return &pb.ControlPrune{TopicID: &topic} + } + + // select peers for Peer eXchange + peers := gs.getPeers(topic, GossipSubPrunePeers, func(xp peer.ID) bool { + return p != xp + }) + + px := make([]*pb.PeerInfo, 0, len(peers)) + for _, p := range peers { + // see if we have a signed address record to send back; if we don't, just send + // the peer ID and let the pruned peer find them in the DHT -- we can't trust + // unsigned address records through px anyway. + srr := gs.p.host.Peerstore().SignedRoutingState(p) + var saddrs []byte + var err error + if srr != nil { + saddrs, err = srr.Marshal() + if err != nil { + log.Warningf("error marshaling signed routing state for %s: %s", p, err) + } + } + px = append(px, &pb.PeerInfo{PeerID: []byte(p), SignedAddrs: saddrs}) + } + + return &pb.ControlPrune{TopicID: &topic, Peers: px} +} + func (gs *GossipSubRouter) getPeers(topic string, count int, filter func(peer.ID) bool) []peer.ID { tmap, ok := gs.p.topics[topic] if !ok { @@ -682,7 +716,7 @@ func (gs *GossipSubRouter) getPeers(topic string, count int, filter func(peer.ID peers := make([]peer.ID, 0, len(tmap)) for p := range tmap { - if (gs.peers[p] == GossipSubID || gs.peers[p] == GossipSubID_v11) && filter(p) { + if (gs.peers[p] == GossipSubID_v10 || gs.peers[p] == GossipSubID_v11) && filter(p) { peers = append(peers, p) } }