add protections from IHAVE floods

This commit is contained in:
vyzo 2020-04-20 12:15:59 +03:00
parent 1645516276
commit 34f2295c19
1 changed files with 44 additions and 1 deletions

View File

@ -75,6 +75,13 @@ var (
// then there is no PRUNE response emitted. This protects against GRAFT floods and should be // then there is no PRUNE response emitted. This protects against GRAFT floods and should be
// less than GossipSubPruneBackoff. // less than GossipSubPruneBackoff.
GossipSubGraftFloodThreshold = 10 * time.Second GossipSubGraftFloodThreshold = 10 * time.Second
// Maximum number of messages to include in an IHAVE message. Also controls the maximum
// number of IHAVE ids we will accept and request with IWANT from a peer within a heartbeat,
// to protect from IHAVE floods. You should adjust this value from the default if your
// system is pushing more than 5000 messages in GossipSubHistoryGossip heartbeats; with the
// defaults this is 1666 messages/s.
GossipSubMaxIHaveLength = 5000
) )
// NewGossipSub returns a new PubSub object using GossipSubRouter as the router. // NewGossipSub returns a new PubSub object using GossipSubRouter as the router.
@ -87,6 +94,7 @@ func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, er
gossip: make(map[peer.ID][]*pb.ControlIHave), gossip: make(map[peer.ID][]*pb.ControlIHave),
control: make(map[peer.ID]*pb.ControlMessage), control: make(map[peer.ID]*pb.ControlMessage),
backoff: make(map[string]map[peer.ID]time.Time), backoff: make(map[string]map[peer.ID]time.Time),
iasked: make(map[peer.ID]int),
connect: make(chan connectInfo, GossipSubMaxPendingConnections), connect: make(chan connectInfo, GossipSubMaxPendingConnections),
mcache: NewMessageCache(GossipSubHistoryGossip, GossipSubHistoryLength), mcache: NewMessageCache(GossipSubHistoryGossip, GossipSubHistoryLength),
} }
@ -203,6 +211,7 @@ type GossipSubRouter struct {
lastpub map[string]int64 // last publish time for fanout topics lastpub map[string]int64 // last publish time for fanout topics
gossip map[peer.ID][]*pb.ControlIHave // pending gossip gossip map[peer.ID][]*pb.ControlIHave // pending gossip
control map[peer.ID]*pb.ControlMessage // pending control messages control map[peer.ID]*pb.ControlMessage // pending control messages
iasked map[peer.ID]int // messages we have asked for in the last heartbeat
backoff map[string]map[peer.ID]time.Time // prune backoff backoff map[string]map[peer.ID]time.Time // prune backoff
connect chan connectInfo // px connection requests connect chan connectInfo // px connection requests
mcache *MessageCache mcache *MessageCache
@ -364,6 +373,12 @@ func (gs *GossipSubRouter) handleIHave(p peer.ID, ctl *pb.ControlMessage) []*pb.
return nil return nil
} }
// IHAVE flood protection
if gs.iasked[p] >= GossipSubMaxIHaveLength {
log.Debugf("IHAVE: peer %s has already advertised too many messages; ignoring", p)
return nil
}
iwant := make(map[string]struct{}) iwant := make(map[string]struct{})
for _, ihave := range ctl.GetIhave() { for _, ihave := range ctl.GetIhave() {
topic := ihave.GetTopicID() topic := ihave.GetTopicID()
@ -384,7 +399,12 @@ func (gs *GossipSubRouter) handleIHave(p peer.ID, ctl *pb.ControlMessage) []*pb.
return nil return nil
} }
log.Debugf("IHAVE: Asking for %d messages from %s", len(iwant), p) iask := len(iwant)
if iask+gs.iasked[p] > GossipSubMaxIHaveLength {
iask = GossipSubMaxIHaveLength - gs.iasked[p]
}
log.Debugf("IHAVE: Asking for %d messages from %s", iask, p)
iwantlst := make([]string, 0, len(iwant)) iwantlst := make([]string, 0, len(iwant))
for mid := range iwant { for mid := range iwant {
@ -394,6 +414,10 @@ func (gs *GossipSubRouter) handleIHave(p peer.ID, ctl *pb.ControlMessage) []*pb.
// ask in random order // ask in random order
shuffleStrings(iwantlst) shuffleStrings(iwantlst)
// truncate to the messages we are actually asking for and update the iasked counter
iwantlst = iwantlst[:iask]
gs.iasked[p] += iask
return []*pb.ControlIWant{&pb.ControlIWant{MessageIDs: iwantlst}} return []*pb.ControlIWant{&pb.ControlIWant{MessageIDs: iwantlst}}
} }
@ -867,6 +891,9 @@ func (gs *GossipSubRouter) heartbeat() {
// clean up expired backoffs // clean up expired backoffs
gs.clearBackoff() gs.clearBackoff()
// clean up iasked counters
gs.clearIasked()
// ensure direct peers are connected // ensure direct peers are connected
gs.directConnect() gs.directConnect()
@ -1039,6 +1066,13 @@ func (gs *GossipSubRouter) heartbeat() {
gs.mcache.Shift() gs.mcache.Shift()
} }
func (gs *GossipSubRouter) clearIasked() {
if len(gs.iasked) > 0 {
// throw away the old map and make a new one
gs.iasked = make(map[peer.ID]int)
}
}
func (gs *GossipSubRouter) clearBackoff() { func (gs *GossipSubRouter) clearBackoff() {
// we only clear once every 15 ticks to avoid iterating over the map(s) too much // we only clear once every 15 ticks to avoid iterating over the map(s) too much
if gs.heartbeatTicks%15 != 0 { if gs.heartbeatTicks%15 != 0 {
@ -1123,6 +1157,15 @@ func (gs *GossipSubRouter) emitGossip(topic string, exclude map[peer.ID]struct{}
return return
} }
// shuffle to emit in random order
shuffleStrings(mids)
// if we are emitting more than GossipSubMaxIHaveLength mids, truncate the list
if len(mids) > GossipSubMaxIHaveLength {
log.Debugf("too many messages for gossip; truncating IHAVE list (%d messages)", len(mids))
mids = mids[:GossipSubMaxIHaveLength]
}
// Send gossip to GossipFactor peers above threshold, with a minimum of D_lazy. // Send gossip to GossipFactor peers above threshold, with a minimum of D_lazy.
// First we collect the peers above gossipThreshold that are not in the exclude set // First we collect the peers above gossipThreshold that are not in the exclude set
// and then randomly select from that set. // and then randomly select from that set.