diff --git a/gossipsub.go b/gossipsub.go index 4ff1da3..9608bec 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -75,6 +75,13 @@ var ( // then there is no PRUNE response emitted. This protects against GRAFT floods and should be // less than GossipSubPruneBackoff. GossipSubGraftFloodThreshold = 10 * time.Second + + // Maximum number of messages to include in an IHAVE message. Also controls the maximum + // number of IHAVE ids we will accept and request with IWANT from a peer within a heartbeat, + // to protect from IHAVE floods. You should adjust this value from the default if your + // system is pushing more than 5000 messages in GossipSubHistoryGossip heartbeats; with the + // defaults this is 1666 messages/s. + GossipSubMaxIHaveLength = 5000 ) // NewGossipSub returns a new PubSub object using GossipSubRouter as the router. @@ -87,6 +94,7 @@ func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, er gossip: make(map[peer.ID][]*pb.ControlIHave), control: make(map[peer.ID]*pb.ControlMessage), backoff: make(map[string]map[peer.ID]time.Time), + iasked: make(map[peer.ID]int), connect: make(chan connectInfo, GossipSubMaxPendingConnections), mcache: NewMessageCache(GossipSubHistoryGossip, GossipSubHistoryLength), } @@ -203,6 +211,7 @@ type GossipSubRouter struct { lastpub map[string]int64 // last publish time for fanout topics gossip map[peer.ID][]*pb.ControlIHave // pending gossip control map[peer.ID]*pb.ControlMessage // pending control messages + iasked map[peer.ID]int // messages we have asked for in the last heartbeat backoff map[string]map[peer.ID]time.Time // prune backoff connect chan connectInfo // px connection requests mcache *MessageCache @@ -364,6 +373,12 @@ func (gs *GossipSubRouter) handleIHave(p peer.ID, ctl *pb.ControlMessage) []*pb. return nil } + // IHAVE flood protection + if gs.iasked[p] >= GossipSubMaxIHaveLength { + log.Debugf("IHAVE: peer %s has already advertised too many messages; ignoring", p) + return nil + } + iwant := make(map[string]struct{}) for _, ihave := range ctl.GetIhave() { topic := ihave.GetTopicID() @@ -384,7 +399,12 @@ func (gs *GossipSubRouter) handleIHave(p peer.ID, ctl *pb.ControlMessage) []*pb. return nil } - log.Debugf("IHAVE: Asking for %d messages from %s", len(iwant), p) + iask := len(iwant) + if iask+gs.iasked[p] > GossipSubMaxIHaveLength { + iask = GossipSubMaxIHaveLength - gs.iasked[p] + } + + log.Debugf("IHAVE: Asking for %d messages from %s", iask, p) iwantlst := make([]string, 0, len(iwant)) for mid := range iwant { @@ -394,6 +414,10 @@ func (gs *GossipSubRouter) handleIHave(p peer.ID, ctl *pb.ControlMessage) []*pb. // ask in random order shuffleStrings(iwantlst) + // truncate to the messages we are actually asking for and update the iasked counter + iwantlst = iwantlst[:iask] + gs.iasked[p] += iask + return []*pb.ControlIWant{&pb.ControlIWant{MessageIDs: iwantlst}} } @@ -867,6 +891,9 @@ func (gs *GossipSubRouter) heartbeat() { // clean up expired backoffs gs.clearBackoff() + // clean up iasked counters + gs.clearIasked() + // ensure direct peers are connected gs.directConnect() @@ -1039,6 +1066,13 @@ func (gs *GossipSubRouter) heartbeat() { gs.mcache.Shift() } +func (gs *GossipSubRouter) clearIasked() { + if len(gs.iasked) > 0 { + // throw away the old map and make a new one + gs.iasked = make(map[peer.ID]int) + } +} + func (gs *GossipSubRouter) clearBackoff() { // we only clear once every 15 ticks to avoid iterating over the map(s) too much if gs.heartbeatTicks%15 != 0 { @@ -1123,6 +1157,15 @@ func (gs *GossipSubRouter) emitGossip(topic string, exclude map[peer.ID]struct{} return } + // shuffle to emit in random order + shuffleStrings(mids) + + // if we are emitting more than GossipSubMaxIHaveLength mids, truncate the list + if len(mids) > GossipSubMaxIHaveLength { + log.Debugf("too many messages for gossip; truncating IHAVE list (%d messages)", len(mids)) + mids = mids[:GossipSubMaxIHaveLength] + } + // Send gossip to GossipFactor peers above threshold, with a minimum of D_lazy. // First we collect the peers above gossipThreshold that are not in the exclude set // and then randomly select from that set.