diff --git a/floodsub.go b/floodsub.go index 4fe7669..66fcd73 100644 --- a/floodsub.go +++ b/floodsub.go @@ -67,6 +67,10 @@ func (fs *FloodSubRouter) EnoughPeers(topic string, suggested int) bool { return false } +func (fs *FloodSubRouter) AcceptFrom(peer.ID) bool { + return true +} + func (fs *FloodSubRouter) HandleRPC(rpc *RPC) {} func (fs *FloodSubRouter) Publish(msg *Message) { diff --git a/gossipsub.go b/gossipsub.go index f67f181..6122be2 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -74,10 +74,18 @@ func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, er } // WithPeerScore is a gossipsub router option that enables peer scoring. +// // gossipThreshold is the score threshold below which gossip propagation is supressed. +// // publishThreshold is the score threshold below which we shouldn't publish when using flood -// publishing (also applies to fanout and floodsub peers). -func WithPeerScore(params *PeerScoreParams, gossipThreshold float64, publishThreshold float64) Option { +// publishing (also applies to fanout and floodsub peers). +// +// graylistThreshold is the score threshold below which message processing is supressed altogether, +// implementing an effective graylist according to peer score. +// +// These thresholds should generally be negative, allowing some information to disseminate from low +// scoring peers. +func WithPeerScore(params *PeerScoreParams, gossipThreshold, publishThreshold, graylistThreshold float64) Option { return func(ps *PubSub) error { gs, ok := ps.rt.(*GossipSubRouter) if !ok { @@ -87,6 +95,7 @@ func WithPeerScore(params *PeerScoreParams, gossipThreshold float64, publishThre gs.score = newPeerScore(gs, params) gs.gossipThreshold = gossipThreshold gs.publishThreshold = publishThreshold + gs.graylistThreshold = graylistThreshold // hook the tracer if ps.tracer != nil { @@ -145,6 +154,9 @@ type GossipSubRouter struct { // when using flood publishing or the peer is a fanout or floodsub peer. publishThreshold float64 + // threshold for peer score before we graylist the peer and silently ignore its RPCs + graylistThreshold float64 + // whether to use flood publishing floodPublish bool } @@ -218,6 +230,10 @@ func (gs *GossipSubRouter) EnoughPeers(topic string, suggested int) bool { return false } +func (gs *GossipSubRouter) AcceptFrom(p peer.ID) bool { + return gs.score.Score(p) >= gs.graylistThreshold +} + func (gs *GossipSubRouter) HandleRPC(rpc *RPC) { ctl := rpc.GetControl() if ctl == nil { diff --git a/pubsub.go b/pubsub.go index 4977e6d..dabcdbc 100644 --- a/pubsub.go +++ b/pubsub.go @@ -151,6 +151,11 @@ type PubSubRouter interface { // EnoughPeers returns whether the router needs more peers before it's ready to publish new records. // Suggested (if greater than 0) is a suggested number of peers that the router should need. EnoughPeers(topic string, suggested int) bool + // AcceptFrom is invoked on any incoming message before pushing it to the validation pipeline + // or processing control information. + // Allows routers with internal scoring to vet peers before commiting any processing resources + // to the message and implement an affective graylist. + AcceptFrom(peer.ID) bool // HandleRPC is invoked to process control messages in the RPC envelope. // It is invoked after subscriptions and payload messages have been processed. HandleRPC(*RPC) @@ -779,6 +784,12 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) { } } + // ask the router to vet the peer before commiting any processing resources + if !p.rt.AcceptFrom(rpc.from) { + log.Warningf("received message from router graylisted peer %s. Dropping RPC", rpc.from) + return + } + for _, pmsg := range rpc.GetPublish() { if !p.subscribedToMsg(pmsg) { log.Warning("received message we didn't subscribe to. Dropping.") diff --git a/randomsub.go b/randomsub.go index 42bbd5c..9273b1a 100644 --- a/randomsub.go +++ b/randomsub.go @@ -86,6 +86,10 @@ func (rs *RandomSubRouter) EnoughPeers(topic string, suggested int) bool { return false } +func (rs *RandomSubRouter) AcceptFrom(peer.ID) bool { + return true +} + func (rs *RandomSubRouter) HandleRPC(rpc *RPC) {} func (rs *RandomSubRouter) Publish(msg *Message) {