diff --git a/gossipsub.go b/gossipsub.go index 3875392..fcf278d 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -72,7 +72,7 @@ func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, er } // WithPeerScore is a gossipsub router option that enables peer scoring. -func WithPeerScore(params *PeerScoreParams) Option { +func WithPeerScore(params *PeerScoreParams, gossipThreshold float64) Option { return func(ps *PubSub) error { gs, ok := ps.rt.(*GossipSubRouter) if !ok { @@ -80,6 +80,7 @@ func WithPeerScore(params *PeerScoreParams) Option { } gs.score = newPeerScore(gs, params) + gs.gossipThreshold = gossipThreshold // hook the tracer if ps.tracer != nil { @@ -112,6 +113,11 @@ type GossipSubRouter struct { mcache *MessageCache tracer *pubsubTracer score *peerScore + + // threshold for peer score to emit/accept gossip + // If the peer score is below this threshold, we won't emit or accept gossip from the peer. + // When there is no score, this value is 0. + gossipThreshold float64 } type connectInfo struct { @@ -203,8 +209,14 @@ func (gs *GossipSubRouter) HandleRPC(rpc *RPC) { } func (gs *GossipSubRouter) handleIHave(p peer.ID, ctl *pb.ControlMessage) []*pb.ControlIWant { - iwant := make(map[string]struct{}) + // we ignore IHAVE gossip from any peer whose score is below the gossip threshold + score := gs.score.Score(p) + if score < gs.gossipThreshold { + log.Debugf("IHAVE: ignoring peer %s with score below threshold [score = %f]", p, score) + return nil + } + iwant := make(map[string]struct{}) for _, ihave := range ctl.GetIhave() { topic := ihave.GetTopicID() _, ok := gs.mesh[topic] @@ -235,6 +247,15 @@ func (gs *GossipSubRouter) handleIHave(p peer.ID, ctl *pb.ControlMessage) []*pb. } func (gs *GossipSubRouter) handleIWant(p peer.ID, ctl *pb.ControlMessage) []*pb.Message { + // we don't respond to IWANT requests from any peer whose score is below the gossip threshold + score := gs.score.Score(p) + if score < gs.gossipThreshold { + log.Debugf("IWANT: ignorin peer %s with score below threshold [score = %f]", p, score) + return nil + } + + // TODO: [spam hardening] only send back the same message to the same peer a limited number of times + ihave := make(map[string]*pb.Message) for _, iwant := range ctl.GetIwant() { for _, mid := range iwant.GetMessageIDs() { @@ -261,17 +282,36 @@ func (gs *GossipSubRouter) handleIWant(p peer.ID, ctl *pb.ControlMessage) []*pb. func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.ControlPrune { var prune []string + + doPX := true + score := gs.score.Score(p) + for _, graft := range ctl.GetGraft() { topic := graft.GetTopicID() peers, ok := gs.mesh[topic] if !ok { - prune = append(prune, topic) - } else { - log.Debugf("GRAFT: Add mesh link from %s in %s", p, topic) - gs.tracer.Graft(p, topic) - peers[p] = struct{}{} - gs.tagPeer(p, topic) + // don't do PX when there is an unknown topic to avoid leaking our peers + doPX = false + // spam harndening: ignore GRAFTs for unknown topics + continue } + + // check the score + if score < 0 { + // we don't GRAFT peers with negative score + log.Debugf("GRAFT: ignoring peer %s with negative score [score = %f, topic = %s]", p, score, topic) + // we do send them PRUNE however, because it's a matter of protocol correctness + prune = append(prune, topic) + // but we won't PX to them + doPX = false + continue + } + + log.Debugf("GRAFT: Add mesh link from %s in %s", p, topic) + gs.tracer.Graft(p, topic) + peers[p] = struct{}{} + gs.tagPeer(p, topic) + } if len(prune) == 0 { @@ -280,26 +320,37 @@ func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb. cprune := make([]*pb.ControlPrune, 0, len(prune)) for _, topic := range prune { - cprune = append(cprune, gs.makePrune(p, topic)) + cprune = append(cprune, gs.makePrune(p, topic, doPX)) } return cprune } func (gs *GossipSubRouter) handlePrune(p peer.ID, ctl *pb.ControlMessage) { + score := gs.score.Score(p) + for _, prune := range ctl.GetPrune() { topic := prune.GetTopicID() peers, ok := gs.mesh[topic] - if ok { - log.Debugf("PRUNE: Remove mesh link to %s in %s", p, topic) - gs.tracer.Prune(p, topic) - delete(peers, p) - gs.untagPeer(p, topic) - gs.addBackoff(p, topic) - px := prune.GetPeers() - if len(px) > 0 { - gs.pxConnect(px) + if !ok { + continue + } + + log.Debugf("PRUNE: Remove mesh link to %s in %s", p, topic) + gs.tracer.Prune(p, topic) + delete(peers, p) + gs.untagPeer(p, topic) + gs.addBackoff(p, topic) + + px := prune.GetPeers() + if len(px) > 0 { + // we ignore PX from peers with negative score + if score < 0 { + log.Debugf("PRUNE: ignoring PX from peer %s with negative score [score = %f, topic = %s]", p, score, topic) + continue } + + gs.pxConnect(px) } } } @@ -511,7 +562,7 @@ func (gs *GossipSubRouter) sendGraft(p peer.ID, topic string) { } func (gs *GossipSubRouter) sendPrune(p peer.ID, topic string) { - prune := []*pb.ControlPrune{gs.makePrune(p, topic)} + prune := []*pb.ControlPrune{gs.makePrune(p, topic, true)} out := rpcWithControl(nil, nil, nil, nil, prune) gs.sendRPC(p, out) } @@ -593,6 +644,7 @@ func (gs *GossipSubRouter) heartbeat() { tograft := make(map[peer.ID][]string) toprune := make(map[peer.ID][]string) + doPX := true // clean up expired backoffs gs.clearBackoff() @@ -681,7 +733,7 @@ func (gs *GossipSubRouter) heartbeat() { } // send coalesced GRAFT/PRUNE messages (will piggyback gossip) - gs.sendGraftPrune(tograft, toprune) + gs.sendGraftPrune(tograft, toprune, doPX) // advance the message history window gs.mcache.Shift() @@ -701,7 +753,7 @@ func (gs *GossipSubRouter) clearBackoff() { } } -func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string) { +func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string, doPX bool) { for p, topics := range tograft { graft := make([]*pb.ControlGraft, 0, len(topics)) for _, topic := range topics { @@ -714,7 +766,7 @@ func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string) delete(toprune, p) prune = make([]*pb.ControlPrune, 0, len(pruning)) for _, topic := range pruning { - prune = append(prune, gs.makePrune(p, topic)) + prune = append(prune, gs.makePrune(p, topic, doPX)) } } @@ -725,7 +777,7 @@ func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string) for p, topics := range toprune { prune := make([]*pb.ControlPrune, 0, len(topics)) for _, topic := range topics { - prune = append(prune, gs.makePrune(p, topic)) + prune = append(prune, gs.makePrune(p, topic, doPX)) } out := rpcWithControl(nil, nil, nil, nil, prune) @@ -843,35 +895,38 @@ func (gs *GossipSubRouter) piggybackControl(p peer.ID, out *RPC, ctl *pb.Control } } -func (gs *GossipSubRouter) makePrune(p peer.ID, topic string) *pb.ControlPrune { +func (gs *GossipSubRouter) makePrune(p peer.ID, topic string, doPX bool) *pb.ControlPrune { if gs.peers[p] == GossipSubID_v10 { // GossipSub v1.0 -- no peer exchange, the peer won't be able to parse it anyway return &pb.ControlPrune{TopicID: &topic} } - // select peers for Peer eXchange - peers := gs.getPeers(topic, GossipSubPrunePeers, func(xp peer.ID) bool { - return p != xp - }) + var px []*pb.PeerInfo + if doPX { + // select peers for Peer eXchange + peers := gs.getPeers(topic, GossipSubPrunePeers, func(xp peer.ID) bool { + return p != xp + }) - cab, ok := peerstore.GetCertifiedAddrBook(gs.p.host.Peerstore()) - px := make([]*pb.PeerInfo, 0, len(peers)) - for _, p := range peers { - // see if we have a signed peer record to send back; if we don't, just send - // the peer ID and let the pruned peer find them in the DHT -- we can't trust - // unsigned address records through px anyway. - var recordBytes []byte - if ok { - spr := cab.GetPeerRecord(p) - var err error - if spr != nil { - recordBytes, err = spr.Marshal() - if err != nil { - log.Warnf("error marshaling signed peer record for %s: %s", p, err) + cab, ok := peerstore.GetCertifiedAddrBook(gs.p.host.Peerstore()) + px = make([]*pb.PeerInfo, 0, len(peers)) + for _, p := range peers { + // see if we have a signed peer record to send back; if we don't, just send + // the peer ID and let the pruned peer find them in the DHT -- we can't trust + // unsigned address records through px anyway. + var recordBytes []byte + if ok { + spr := cab.GetPeerRecord(p) + var err error + if spr != nil { + recordBytes, err = spr.Marshal() + if err != nil { + log.Warningf("error marshaling signed peer record for %s: %s", p, err) + } } } + px = append(px, &pb.PeerInfo{PeerID: []byte(p), SignedPeerRecord: recordBytes}) } - px = append(px, &pb.PeerInfo{PeerID: []byte(p), SignedPeerRecord: recordBytes}) } return &pb.ControlPrune{TopicID: &topic, Peers: px} diff --git a/score.go b/score.go index 36839ee..7023436 100644 --- a/score.go +++ b/score.go @@ -19,7 +19,7 @@ func newPeerScore(gs *GossipSubRouter, params *PeerScoreParams) *peerScore { } // router interface -func (ps *peerScore) Score(p peer.ID, topic string) float64 { +func (ps *peerScore) Score(p peer.ID) float64 { return 0 }