rich router acceptance semantics
Allows us to ignore payload messages when the validation queue is under strain
This commit is contained in:
parent
06a12f17b7
commit
3b92bdc1e9
|
@ -67,8 +67,8 @@ func (fs *FloodSubRouter) EnoughPeers(topic string, suggested int) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
func (fs *FloodSubRouter) AcceptFrom(peer.ID) bool {
|
||||
return true
|
||||
func (fs *FloodSubRouter) AcceptFrom(peer.ID) AcceptStatus {
|
||||
return AcceptAll
|
||||
}
|
||||
|
||||
func (fs *FloodSubRouter) HandleRPC(rpc *RPC) {}
|
||||
|
|
13
gossipsub.go
13
gossipsub.go
|
@ -505,9 +505,18 @@ func (gs *GossipSubRouter) EnoughPeers(topic string, suggested int) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
func (gs *GossipSubRouter) AcceptFrom(p peer.ID) bool {
|
||||
func (gs *GossipSubRouter) AcceptFrom(p peer.ID) AcceptStatus {
|
||||
_, direct := gs.direct[p]
|
||||
return direct || gs.score.Score(p) >= gs.graylistThreshold
|
||||
if direct {
|
||||
return AcceptAll
|
||||
}
|
||||
|
||||
if gs.score.Score(p) < gs.graylistThreshold {
|
||||
return AcceptNone
|
||||
}
|
||||
|
||||
// TODO throttle tracking and reaction
|
||||
return AcceptAll
|
||||
}
|
||||
|
||||
func (gs *GossipSubRouter) HandleRPC(rpc *RPC) {
|
||||
|
|
40
pubsub.go
40
pubsub.go
|
@ -168,8 +168,8 @@ type PubSubRouter interface {
|
|||
// AcceptFrom is invoked on any incoming message before pushing it to the validation pipeline
|
||||
// or processing control information.
|
||||
// Allows routers with internal scoring to vet peers before committing any processing resources
|
||||
// to the message and implement an effective graylist.
|
||||
AcceptFrom(peer.ID) bool
|
||||
// to the message and implement an effective graylist and react to validation queue overload.
|
||||
AcceptFrom(peer.ID) AcceptStatus
|
||||
// HandleRPC is invoked to process control messages in the RPC envelope.
|
||||
// It is invoked after subscriptions and payload messages have been processed.
|
||||
HandleRPC(*RPC)
|
||||
|
@ -183,6 +183,17 @@ type PubSubRouter interface {
|
|||
Leave(topic string)
|
||||
}
|
||||
|
||||
type AcceptStatus int
|
||||
|
||||
const (
|
||||
// AcceptAll signals to accept the incoming RPC for full processing
|
||||
AcceptNone AcceptStatus = iota
|
||||
// AcceptControl signals to accept the incoming RPC only for control message processing
|
||||
AcceptControl
|
||||
// AcceptNone signals to drop the incoming RPC
|
||||
AcceptAll
|
||||
)
|
||||
|
||||
type Message struct {
|
||||
*pb.Message
|
||||
ReceivedFrom peer.ID
|
||||
|
@ -923,19 +934,26 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) {
|
|||
}
|
||||
|
||||
// ask the router to vet the peer before commiting any processing resources
|
||||
if !p.rt.AcceptFrom(rpc.from) {
|
||||
log.Debugf("received message from router graylisted peer %s. Dropping RPC", rpc.from)
|
||||
switch p.rt.AcceptFrom(rpc.from) {
|
||||
case AcceptNone:
|
||||
log.Debugf("received message from router graylisted peer %s; dropping RPC", rpc.from)
|
||||
return
|
||||
}
|
||||
|
||||
for _, pmsg := range rpc.GetPublish() {
|
||||
if !(p.subscribedToMsg(pmsg) || p.canRelayMsg(pmsg)) {
|
||||
log.Debug("received message we didn't subscribe to. Dropping.")
|
||||
continue
|
||||
case AcceptControl:
|
||||
if len(rpc.GetPublish()) > 0 {
|
||||
log.Debugf("ignoring payload in message from peer %s", rpc.from)
|
||||
}
|
||||
|
||||
msg := &Message{pmsg, rpc.from, nil}
|
||||
p.pushMsg(msg)
|
||||
case AcceptAll:
|
||||
for _, pmsg := range rpc.GetPublish() {
|
||||
if !(p.subscribedToMsg(pmsg) || p.canRelayMsg(pmsg)) {
|
||||
log.Debug("received message we didn't subscribe to; ignoring payload message")
|
||||
continue
|
||||
}
|
||||
|
||||
msg := &Message{pmsg, rpc.from, nil}
|
||||
p.pushMsg(msg)
|
||||
}
|
||||
}
|
||||
|
||||
p.rt.HandleRPC(rpc)
|
||||
|
|
|
@ -90,8 +90,8 @@ func (rs *RandomSubRouter) EnoughPeers(topic string, suggested int) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
func (rs *RandomSubRouter) AcceptFrom(peer.ID) bool {
|
||||
return true
|
||||
func (rs *RandomSubRouter) AcceptFrom(peer.ID) AcceptStatus {
|
||||
return AcceptAll
|
||||
}
|
||||
|
||||
func (rs *RandomSubRouter) HandleRPC(rpc *RPC) {}
|
||||
|
@ -133,7 +133,7 @@ func (rs *RandomSubRouter) Publish(msg *Message) {
|
|||
}
|
||||
xpeers := peerMapToList(rspeers)
|
||||
shufflePeers(xpeers)
|
||||
xpeers = xpeers[:RandomSubD]
|
||||
xpeers = xpeers[:target]
|
||||
for _, p := range xpeers {
|
||||
tosend[p] = struct{}{}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue