improve handling of IHAVE floods

This commit is contained in:
vyzo 2020-04-20 15:02:58 +03:00
parent 78bbe13b49
commit 8150843cf3
2 changed files with 41 additions and 30 deletions

View File

@ -87,16 +87,17 @@ var (
// NewGossipSub returns a new PubSub object using GossipSubRouter as the router. // NewGossipSub returns a new PubSub object using GossipSubRouter as the router.
func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error) { func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error) {
rt := &GossipSubRouter{ rt := &GossipSubRouter{
peers: make(map[peer.ID]protocol.ID), peers: make(map[peer.ID]protocol.ID),
mesh: make(map[string]map[peer.ID]struct{}), mesh: make(map[string]map[peer.ID]struct{}),
fanout: make(map[string]map[peer.ID]struct{}), fanout: make(map[string]map[peer.ID]struct{}),
lastpub: make(map[string]int64), lastpub: make(map[string]int64),
gossip: make(map[peer.ID][]*pb.ControlIHave), gossip: make(map[peer.ID][]*pb.ControlIHave),
control: make(map[peer.ID]*pb.ControlMessage), control: make(map[peer.ID]*pb.ControlMessage),
backoff: make(map[string]map[peer.ID]time.Time), backoff: make(map[string]map[peer.ID]time.Time),
iasked: make(map[peer.ID]int), peerhave: make(map[peer.ID]int),
connect: make(chan connectInfo, GossipSubMaxPendingConnections), iasked: make(map[peer.ID]int),
mcache: NewMessageCache(GossipSubHistoryGossip, GossipSubHistoryLength), connect: make(chan connectInfo, GossipSubMaxPendingConnections),
mcache: NewMessageCache(GossipSubHistoryGossip, GossipSubHistoryLength),
} }
return NewPubSub(ctx, h, rt, opts...) return NewPubSub(ctx, h, rt, opts...)
} }
@ -203,20 +204,21 @@ func WithDirectPeers(pis []peer.AddrInfo) Option {
// is the fanout map. Fanout peer lists are expired if we don't publish any // is the fanout map. Fanout peer lists are expired if we don't publish any
// messages to their topic for GossipSubFanoutTTL. // messages to their topic for GossipSubFanoutTTL.
type GossipSubRouter struct { type GossipSubRouter struct {
p *PubSub p *PubSub
peers map[peer.ID]protocol.ID // peer protocols peers map[peer.ID]protocol.ID // peer protocols
direct map[peer.ID]struct{} // direct peers direct map[peer.ID]struct{} // direct peers
mesh map[string]map[peer.ID]struct{} // topic meshes mesh map[string]map[peer.ID]struct{} // topic meshes
fanout map[string]map[peer.ID]struct{} // topic fanout fanout map[string]map[peer.ID]struct{} // topic fanout
lastpub map[string]int64 // last publish time for fanout topics lastpub map[string]int64 // last publish time for fanout topics
gossip map[peer.ID][]*pb.ControlIHave // pending gossip gossip map[peer.ID][]*pb.ControlIHave // pending gossip
control map[peer.ID]*pb.ControlMessage // pending control messages control map[peer.ID]*pb.ControlMessage // pending control messages
iasked map[peer.ID]int // messages we have asked for in the last heartbeat peerhave map[peer.ID]int // number of IHAVEs received from peer in the last heartbeat
backoff map[string]map[peer.ID]time.Time // prune backoff iasked map[peer.ID]int // number of messages we have asked from peer in the last heartbeat
connect chan connectInfo // px connection requests backoff map[string]map[peer.ID]time.Time // prune backoff
mcache *MessageCache connect chan connectInfo // px connection requests
tracer *pubsubTracer mcache *MessageCache
score *peerScore tracer *pubsubTracer
score *peerScore
// whether PX is enabled; this should be enabled in bootstrappers and other well connected/trusted // whether PX is enabled; this should be enabled in bootstrappers and other well connected/trusted
// nodes. // nodes.
@ -374,6 +376,12 @@ func (gs *GossipSubRouter) handleIHave(p peer.ID, ctl *pb.ControlMessage) []*pb.
} }
// IHAVE flood protection // IHAVE flood protection
if gs.peerhave[p] > 2 {
log.Debugf("IHAVE: peer %s has advertised too many times within this heartbeat interval; ignoring", p)
return nil
}
gs.peerhave[p]++
if gs.iasked[p] >= GossipSubMaxIHaveLength { if gs.iasked[p] >= GossipSubMaxIHaveLength {
log.Debugf("IHAVE: peer %s has already advertised too many messages; ignoring", p) log.Debugf("IHAVE: peer %s has already advertised too many messages; ignoring", p)
return nil return nil
@ -892,7 +900,7 @@ func (gs *GossipSubRouter) heartbeat() {
gs.clearBackoff() gs.clearBackoff()
// clean up iasked counters // clean up iasked counters
gs.clearIasked() gs.clearIHaveCounters()
// ensure direct peers are connected // ensure direct peers are connected
gs.directConnect() gs.directConnect()
@ -1066,7 +1074,12 @@ func (gs *GossipSubRouter) heartbeat() {
gs.mcache.Shift() gs.mcache.Shift()
} }
func (gs *GossipSubRouter) clearIasked() { func (gs *GossipSubRouter) clearIHaveCounters() {
if len(gs.peerhave) > 0 {
// throw away the old map and make a new one
gs.peerhave = make(map[peer.ID]int)
}
if len(gs.iasked) > 0 { if len(gs.iasked) > 0 {
// throw away the old map and make a new one // throw away the old map and make a new one
gs.iasked = make(map[peer.ID]int) gs.iasked = make(map[peer.ID]int)

View File

@ -215,13 +215,11 @@ func TestGossipsubAttackSpamIHAVE(t *testing.T) {
// Should have sent more IWANTs after the heartbeat // Should have sent more IWANTs after the heartbeat
iwc = getIWantCount() iwc = getIWantCount()
if iwc <= GossipSubMaxIHaveLength { if iwc == firstBatchCount {
t.Fatal("Expecting to receive more IWANTs after heartbeat but did not") t.Fatal("Expecting to receive more IWANTs after heartbeat but did not")
} }
// Should not be more than the maximum per heartbeat // Should not be more than the maximum per heartbeat
// note that we multiply by 2 because things may come in the middle of the heartbeat which if iwc-firstBatchCount > GossipSubMaxIHaveLength {
// results in a reset of the heartbeat counter (has been observed in travis)
if iwc-firstBatchCount > 2*GossipSubMaxIHaveLength {
t.Fatalf("Expecting max %d IWANTs per heartbeat but received %d", GossipSubMaxIHaveLength, iwc-firstBatchCount) t.Fatalf("Expecting max %d IWANTs per heartbeat but received %d", GossipSubMaxIHaveLength, iwc-firstBatchCount)
} }
}() }()