hook peer score into control message handlers

This commit is contained in:
vyzo 2020-03-02 15:05:24 +02:00
parent 7d928697a2
commit c13e9c07e4
2 changed files with 99 additions and 44 deletions

View File

@ -72,7 +72,7 @@ func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, er
}
// WithPeerScore is a gossipsub router option that enables peer scoring.
func WithPeerScore(params *PeerScoreParams) Option {
func WithPeerScore(params *PeerScoreParams, gossipThreshold float64) Option {
return func(ps *PubSub) error {
gs, ok := ps.rt.(*GossipSubRouter)
if !ok {
@ -80,6 +80,7 @@ func WithPeerScore(params *PeerScoreParams) Option {
}
gs.score = newPeerScore(gs, params)
gs.gossipThreshold = gossipThreshold
// hook the tracer
if ps.tracer != nil {
@ -112,6 +113,11 @@ type GossipSubRouter struct {
mcache *MessageCache
tracer *pubsubTracer
score *peerScore
// threshold for peer score to emit/accept gossip
// If the peer score is below this threshold, we won't emit or accept gossip from the peer.
// When there is no score, this value is 0.
gossipThreshold float64
}
type connectInfo struct {
@ -203,8 +209,14 @@ func (gs *GossipSubRouter) HandleRPC(rpc *RPC) {
}
func (gs *GossipSubRouter) handleIHave(p peer.ID, ctl *pb.ControlMessage) []*pb.ControlIWant {
iwant := make(map[string]struct{})
// we ignore IHAVE gossip from any peer whose score is below the gossip threshold
score := gs.score.Score(p)
if score < gs.gossipThreshold {
log.Debugf("IHAVE: ignoring peer %s with score below threshold [score = %f]", p, score)
return nil
}
iwant := make(map[string]struct{})
for _, ihave := range ctl.GetIhave() {
topic := ihave.GetTopicID()
_, ok := gs.mesh[topic]
@ -235,6 +247,15 @@ func (gs *GossipSubRouter) handleIHave(p peer.ID, ctl *pb.ControlMessage) []*pb.
}
func (gs *GossipSubRouter) handleIWant(p peer.ID, ctl *pb.ControlMessage) []*pb.Message {
// we don't respond to IWANT requests from any peer whose score is below the gossip threshold
score := gs.score.Score(p)
if score < gs.gossipThreshold {
log.Debugf("IWANT: ignorin peer %s with score below threshold [score = %f]", p, score)
return nil
}
// TODO: [spam hardening] only send back the same message to the same peer a limited number of times
ihave := make(map[string]*pb.Message)
for _, iwant := range ctl.GetIwant() {
for _, mid := range iwant.GetMessageIDs() {
@ -261,17 +282,36 @@ func (gs *GossipSubRouter) handleIWant(p peer.ID, ctl *pb.ControlMessage) []*pb.
func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.ControlPrune {
var prune []string
doPX := true
score := gs.score.Score(p)
for _, graft := range ctl.GetGraft() {
topic := graft.GetTopicID()
peers, ok := gs.mesh[topic]
if !ok {
prune = append(prune, topic)
} else {
log.Debugf("GRAFT: Add mesh link from %s in %s", p, topic)
gs.tracer.Graft(p, topic)
peers[p] = struct{}{}
gs.tagPeer(p, topic)
// don't do PX when there is an unknown topic to avoid leaking our peers
doPX = false
// spam harndening: ignore GRAFTs for unknown topics
continue
}
// check the score
if score < 0 {
// we don't GRAFT peers with negative score
log.Debugf("GRAFT: ignoring peer %s with negative score [score = %f, topic = %s]", p, score, topic)
// we do send them PRUNE however, because it's a matter of protocol correctness
prune = append(prune, topic)
// but we won't PX to them
doPX = false
continue
}
log.Debugf("GRAFT: Add mesh link from %s in %s", p, topic)
gs.tracer.Graft(p, topic)
peers[p] = struct{}{}
gs.tagPeer(p, topic)
}
if len(prune) == 0 {
@ -280,26 +320,37 @@ func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.
cprune := make([]*pb.ControlPrune, 0, len(prune))
for _, topic := range prune {
cprune = append(cprune, gs.makePrune(p, topic))
cprune = append(cprune, gs.makePrune(p, topic, doPX))
}
return cprune
}
func (gs *GossipSubRouter) handlePrune(p peer.ID, ctl *pb.ControlMessage) {
score := gs.score.Score(p)
for _, prune := range ctl.GetPrune() {
topic := prune.GetTopicID()
peers, ok := gs.mesh[topic]
if ok {
log.Debugf("PRUNE: Remove mesh link to %s in %s", p, topic)
gs.tracer.Prune(p, topic)
delete(peers, p)
gs.untagPeer(p, topic)
gs.addBackoff(p, topic)
px := prune.GetPeers()
if len(px) > 0 {
gs.pxConnect(px)
if !ok {
continue
}
log.Debugf("PRUNE: Remove mesh link to %s in %s", p, topic)
gs.tracer.Prune(p, topic)
delete(peers, p)
gs.untagPeer(p, topic)
gs.addBackoff(p, topic)
px := prune.GetPeers()
if len(px) > 0 {
// we ignore PX from peers with negative score
if score < 0 {
log.Debugf("PRUNE: ignoring PX from peer %s with negative score [score = %f, topic = %s]", p, score, topic)
continue
}
gs.pxConnect(px)
}
}
}
@ -511,7 +562,7 @@ func (gs *GossipSubRouter) sendGraft(p peer.ID, topic string) {
}
func (gs *GossipSubRouter) sendPrune(p peer.ID, topic string) {
prune := []*pb.ControlPrune{gs.makePrune(p, topic)}
prune := []*pb.ControlPrune{gs.makePrune(p, topic, true)}
out := rpcWithControl(nil, nil, nil, nil, prune)
gs.sendRPC(p, out)
}
@ -593,6 +644,7 @@ func (gs *GossipSubRouter) heartbeat() {
tograft := make(map[peer.ID][]string)
toprune := make(map[peer.ID][]string)
doPX := true
// clean up expired backoffs
gs.clearBackoff()
@ -681,7 +733,7 @@ func (gs *GossipSubRouter) heartbeat() {
}
// send coalesced GRAFT/PRUNE messages (will piggyback gossip)
gs.sendGraftPrune(tograft, toprune)
gs.sendGraftPrune(tograft, toprune, doPX)
// advance the message history window
gs.mcache.Shift()
@ -701,7 +753,7 @@ func (gs *GossipSubRouter) clearBackoff() {
}
}
func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string) {
func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string, doPX bool) {
for p, topics := range tograft {
graft := make([]*pb.ControlGraft, 0, len(topics))
for _, topic := range topics {
@ -714,7 +766,7 @@ func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string)
delete(toprune, p)
prune = make([]*pb.ControlPrune, 0, len(pruning))
for _, topic := range pruning {
prune = append(prune, gs.makePrune(p, topic))
prune = append(prune, gs.makePrune(p, topic, doPX))
}
}
@ -725,7 +777,7 @@ func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string)
for p, topics := range toprune {
prune := make([]*pb.ControlPrune, 0, len(topics))
for _, topic := range topics {
prune = append(prune, gs.makePrune(p, topic))
prune = append(prune, gs.makePrune(p, topic, doPX))
}
out := rpcWithControl(nil, nil, nil, nil, prune)
@ -843,35 +895,38 @@ func (gs *GossipSubRouter) piggybackControl(p peer.ID, out *RPC, ctl *pb.Control
}
}
func (gs *GossipSubRouter) makePrune(p peer.ID, topic string) *pb.ControlPrune {
func (gs *GossipSubRouter) makePrune(p peer.ID, topic string, doPX bool) *pb.ControlPrune {
if gs.peers[p] == GossipSubID_v10 {
// GossipSub v1.0 -- no peer exchange, the peer won't be able to parse it anyway
return &pb.ControlPrune{TopicID: &topic}
}
// select peers for Peer eXchange
peers := gs.getPeers(topic, GossipSubPrunePeers, func(xp peer.ID) bool {
return p != xp
})
var px []*pb.PeerInfo
if doPX {
// select peers for Peer eXchange
peers := gs.getPeers(topic, GossipSubPrunePeers, func(xp peer.ID) bool {
return p != xp
})
cab, ok := peerstore.GetCertifiedAddrBook(gs.p.host.Peerstore())
px := make([]*pb.PeerInfo, 0, len(peers))
for _, p := range peers {
// see if we have a signed peer record to send back; if we don't, just send
// the peer ID and let the pruned peer find them in the DHT -- we can't trust
// unsigned address records through px anyway.
var recordBytes []byte
if ok {
spr := cab.GetPeerRecord(p)
var err error
if spr != nil {
recordBytes, err = spr.Marshal()
if err != nil {
log.Warnf("error marshaling signed peer record for %s: %s", p, err)
cab, ok := peerstore.GetCertifiedAddrBook(gs.p.host.Peerstore())
px = make([]*pb.PeerInfo, 0, len(peers))
for _, p := range peers {
// see if we have a signed peer record to send back; if we don't, just send
// the peer ID and let the pruned peer find them in the DHT -- we can't trust
// unsigned address records through px anyway.
var recordBytes []byte
if ok {
spr := cab.GetPeerRecord(p)
var err error
if spr != nil {
recordBytes, err = spr.Marshal()
if err != nil {
log.Warningf("error marshaling signed peer record for %s: %s", p, err)
}
}
}
px = append(px, &pb.PeerInfo{PeerID: []byte(p), SignedPeerRecord: recordBytes})
}
px = append(px, &pb.PeerInfo{PeerID: []byte(p), SignedPeerRecord: recordBytes})
}
return &pb.ControlPrune{TopicID: &topic, Peers: px}

View File

@ -19,7 +19,7 @@ func newPeerScore(gs *GossipSubRouter, params *PeerScoreParams) *peerScore {
}
// router interface
func (ps *peerScore) Score(p peer.ID, topic string) float64 {
func (ps *peerScore) Score(p peer.ID) float64 {
return 0
}