diff --git a/gossipsub.go b/gossipsub.go index 2c11510..cf5f9b1 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -347,6 +347,7 @@ type GossipSubRouter struct { score *peerScore gossipTracer *gossipTracer tagTracer *tagTracer + gate *peerGater // whether PX is enabled; this should be enabled in bootstrappers and other well connected/trusted // nodes. @@ -515,8 +516,7 @@ func (gs *GossipSubRouter) AcceptFrom(p peer.ID) AcceptStatus { return AcceptNone } - // TODO throttle tracking and reaction - return AcceptAll + return gs.gate.AcceptFrom(p) } func (gs *GossipSubRouter) HandleRPC(rpc *RPC) { diff --git a/peer_gater.go b/peer_gater.go new file mode 100644 index 0000000..3bb6208 --- /dev/null +++ b/peer_gater.go @@ -0,0 +1,60 @@ +package pubsub + +import ( + "fmt" + + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/protocol" +) + +type peerGater struct { +} + +// WithPeerGater is a gossipsub router option that enables reactive validation queue +// management. +func WithPeerGater() Option { + return func(ps *PubSub) error { + gs, ok := ps.rt.(*GossipSubRouter) + if !ok { + return fmt.Errorf("pubsub router is not gossipsub") + } + + gs.gate = newPeerGater() + + // hook the tracer + if ps.tracer != nil { + ps.tracer.internal = append(ps.tracer.internal, gs.gate) + } else { + ps.tracer = &pubsubTracer{ + internal: []internalTracer{gs.gate}, + pid: ps.host.ID(), + msgID: ps.msgID, + } + } + + return nil + } +} + +func newPeerGater() *peerGater { + return &peerGater{} +} + +func (pg *peerGater) AcceptFrom(p peer.ID) AcceptStatus { + if pg == nil { + return AcceptAll + } + + return AcceptAll +} + +func (pg *peerGater) AddPeer(p peer.ID, proto protocol.ID) {} +func (pg *peerGater) RemovePeer(p peer.ID) {} +func (pg *peerGater) Join(topic string) {} +func (pg *peerGater) Leave(topic string) {} +func (pg *peerGater) Graft(p peer.ID, topic string) {} +func (pg *peerGater) Prune(p peer.ID, topic string) {} +func (pg *peerGater) ValidateMessage(msg *Message) {} +func (pg *peerGater) DeliverMessage(msg *Message) {} +func (pg *peerGater) RejectMessage(msg *Message, reason string) {} +func (pg *peerGater) DuplicateMessage(msg *Message) {}