diff --git a/gossipsub.go b/gossipsub.go index 9608bec..e793e69 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -87,16 +87,17 @@ var ( // NewGossipSub returns a new PubSub object using GossipSubRouter as the router. func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, error) { rt := &GossipSubRouter{ - peers: make(map[peer.ID]protocol.ID), - mesh: make(map[string]map[peer.ID]struct{}), - fanout: make(map[string]map[peer.ID]struct{}), - lastpub: make(map[string]int64), - gossip: make(map[peer.ID][]*pb.ControlIHave), - control: make(map[peer.ID]*pb.ControlMessage), - backoff: make(map[string]map[peer.ID]time.Time), - iasked: make(map[peer.ID]int), - connect: make(chan connectInfo, GossipSubMaxPendingConnections), - mcache: NewMessageCache(GossipSubHistoryGossip, GossipSubHistoryLength), + peers: make(map[peer.ID]protocol.ID), + mesh: make(map[string]map[peer.ID]struct{}), + fanout: make(map[string]map[peer.ID]struct{}), + lastpub: make(map[string]int64), + gossip: make(map[peer.ID][]*pb.ControlIHave), + control: make(map[peer.ID]*pb.ControlMessage), + backoff: make(map[string]map[peer.ID]time.Time), + peerhave: make(map[peer.ID]int), + iasked: make(map[peer.ID]int), + connect: make(chan connectInfo, GossipSubMaxPendingConnections), + mcache: NewMessageCache(GossipSubHistoryGossip, GossipSubHistoryLength), } return NewPubSub(ctx, h, rt, opts...) } @@ -203,20 +204,21 @@ func WithDirectPeers(pis []peer.AddrInfo) Option { // is the fanout map. Fanout peer lists are expired if we don't publish any // messages to their topic for GossipSubFanoutTTL. type GossipSubRouter struct { - p *PubSub - peers map[peer.ID]protocol.ID // peer protocols - direct map[peer.ID]struct{} // direct peers - mesh map[string]map[peer.ID]struct{} // topic meshes - fanout map[string]map[peer.ID]struct{} // topic fanout - lastpub map[string]int64 // last publish time for fanout topics - gossip map[peer.ID][]*pb.ControlIHave // pending gossip - control map[peer.ID]*pb.ControlMessage // pending control messages - iasked map[peer.ID]int // messages we have asked for in the last heartbeat - backoff map[string]map[peer.ID]time.Time // prune backoff - connect chan connectInfo // px connection requests - mcache *MessageCache - tracer *pubsubTracer - score *peerScore + p *PubSub + peers map[peer.ID]protocol.ID // peer protocols + direct map[peer.ID]struct{} // direct peers + mesh map[string]map[peer.ID]struct{} // topic meshes + fanout map[string]map[peer.ID]struct{} // topic fanout + lastpub map[string]int64 // last publish time for fanout topics + gossip map[peer.ID][]*pb.ControlIHave // pending gossip + control map[peer.ID]*pb.ControlMessage // pending control messages + peerhave map[peer.ID]int // number of IHAVEs received from peer in the last heartbeat + iasked map[peer.ID]int // number of messages we have asked from peer in the last heartbeat + backoff map[string]map[peer.ID]time.Time // prune backoff + connect chan connectInfo // px connection requests + mcache *MessageCache + tracer *pubsubTracer + score *peerScore // whether PX is enabled; this should be enabled in bootstrappers and other well connected/trusted // nodes. @@ -374,6 +376,12 @@ func (gs *GossipSubRouter) handleIHave(p peer.ID, ctl *pb.ControlMessage) []*pb. } // IHAVE flood protection + if gs.peerhave[p] > 2 { + log.Debugf("IHAVE: peer %s has advertised too many times within this heartbeat interval; ignoring", p) + return nil + } + gs.peerhave[p]++ + if gs.iasked[p] >= GossipSubMaxIHaveLength { log.Debugf("IHAVE: peer %s has already advertised too many messages; ignoring", p) return nil @@ -892,7 +900,7 @@ func (gs *GossipSubRouter) heartbeat() { gs.clearBackoff() // clean up iasked counters - gs.clearIasked() + gs.clearIHaveCounters() // ensure direct peers are connected gs.directConnect() @@ -1066,7 +1074,12 @@ func (gs *GossipSubRouter) heartbeat() { gs.mcache.Shift() } -func (gs *GossipSubRouter) clearIasked() { +func (gs *GossipSubRouter) clearIHaveCounters() { + if len(gs.peerhave) > 0 { + // throw away the old map and make a new one + gs.peerhave = make(map[peer.ID]int) + } + if len(gs.iasked) > 0 { // throw away the old map and make a new one gs.iasked = make(map[peer.ID]int) diff --git a/gossipsub_spam_test.go b/gossipsub_spam_test.go index b2f90c6..73b46ed 100644 --- a/gossipsub_spam_test.go +++ b/gossipsub_spam_test.go @@ -215,13 +215,11 @@ func TestGossipsubAttackSpamIHAVE(t *testing.T) { // Should have sent more IWANTs after the heartbeat iwc = getIWantCount() - if iwc <= GossipSubMaxIHaveLength { + if iwc == firstBatchCount { t.Fatal("Expecting to receive more IWANTs after heartbeat but did not") } // Should not be more than the maximum per heartbeat - // note that we multiply by 2 because things may come in the middle of the heartbeat which - // results in a reset of the heartbeat counter (has been observed in travis) - if iwc-firstBatchCount > 2*GossipSubMaxIHaveLength { + if iwc-firstBatchCount > GossipSubMaxIHaveLength { t.Fatalf("Expecting max %d IWANTs per heartbeat but received %d", GossipSubMaxIHaveLength, iwc-firstBatchCount) } }()