diff --git a/floodsub.go b/floodsub.go index 53fea34..5490efc 100644 --- a/floodsub.go +++ b/floodsub.go @@ -67,8 +67,8 @@ func (fs *FloodSubRouter) EnoughPeers(topic string, suggested int) bool { return false } -func (fs *FloodSubRouter) AcceptFrom(peer.ID) bool { - return true +func (fs *FloodSubRouter) AcceptFrom(peer.ID) AcceptStatus { + return AcceptAll } func (fs *FloodSubRouter) HandleRPC(rpc *RPC) {} diff --git a/gossipsub.go b/gossipsub.go index 6cf7164..2c11510 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -505,9 +505,18 @@ func (gs *GossipSubRouter) EnoughPeers(topic string, suggested int) bool { return false } -func (gs *GossipSubRouter) AcceptFrom(p peer.ID) bool { +func (gs *GossipSubRouter) AcceptFrom(p peer.ID) AcceptStatus { _, direct := gs.direct[p] - return direct || gs.score.Score(p) >= gs.graylistThreshold + if direct { + return AcceptAll + } + + if gs.score.Score(p) < gs.graylistThreshold { + return AcceptNone + } + + // TODO throttle tracking and reaction + return AcceptAll } func (gs *GossipSubRouter) HandleRPC(rpc *RPC) { diff --git a/pubsub.go b/pubsub.go index 9bb751a..3d13ac9 100644 --- a/pubsub.go +++ b/pubsub.go @@ -168,8 +168,8 @@ type PubSubRouter interface { // AcceptFrom is invoked on any incoming message before pushing it to the validation pipeline // or processing control information. // Allows routers with internal scoring to vet peers before committing any processing resources - // to the message and implement an effective graylist. - AcceptFrom(peer.ID) bool + // to the message and implement an effective graylist and react to validation queue overload. + AcceptFrom(peer.ID) AcceptStatus // HandleRPC is invoked to process control messages in the RPC envelope. // It is invoked after subscriptions and payload messages have been processed. HandleRPC(*RPC) @@ -183,6 +183,17 @@ type PubSubRouter interface { Leave(topic string) } +type AcceptStatus int + +const ( + // AcceptAll signals to accept the incoming RPC for full processing + AcceptNone AcceptStatus = iota + // AcceptControl signals to accept the incoming RPC only for control message processing + AcceptControl + // AcceptNone signals to drop the incoming RPC + AcceptAll +) + type Message struct { *pb.Message ReceivedFrom peer.ID @@ -923,19 +934,26 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) { } // ask the router to vet the peer before commiting any processing resources - if !p.rt.AcceptFrom(rpc.from) { - log.Debugf("received message from router graylisted peer %s. Dropping RPC", rpc.from) + switch p.rt.AcceptFrom(rpc.from) { + case AcceptNone: + log.Debugf("received message from router graylisted peer %s; dropping RPC", rpc.from) return - } - for _, pmsg := range rpc.GetPublish() { - if !(p.subscribedToMsg(pmsg) || p.canRelayMsg(pmsg)) { - log.Debug("received message we didn't subscribe to. Dropping.") - continue + case AcceptControl: + if len(rpc.GetPublish()) > 0 { + log.Debugf("ignoring payload in message from peer %s", rpc.from) } - msg := &Message{pmsg, rpc.from, nil} - p.pushMsg(msg) + case AcceptAll: + for _, pmsg := range rpc.GetPublish() { + if !(p.subscribedToMsg(pmsg) || p.canRelayMsg(pmsg)) { + log.Debug("received message we didn't subscribe to; ignoring payload message") + continue + } + + msg := &Message{pmsg, rpc.from, nil} + p.pushMsg(msg) + } } p.rt.HandleRPC(rpc) diff --git a/randomsub.go b/randomsub.go index 7726b9e..b98c9bb 100644 --- a/randomsub.go +++ b/randomsub.go @@ -90,8 +90,8 @@ func (rs *RandomSubRouter) EnoughPeers(topic string, suggested int) bool { return false } -func (rs *RandomSubRouter) AcceptFrom(peer.ID) bool { - return true +func (rs *RandomSubRouter) AcceptFrom(peer.ID) AcceptStatus { + return AcceptAll } func (rs *RandomSubRouter) HandleRPC(rpc *RPC) {} @@ -133,7 +133,7 @@ func (rs *RandomSubRouter) Publish(msg *Message) { } xpeers := peerMapToList(rspeers) shufflePeers(xpeers) - xpeers = xpeers[:RandomSubD] + xpeers = xpeers[:target] for _, p := range xpeers { tosend[p] = struct{}{} }