diff --git a/gossipsub.go b/gossipsub.go index 75be76e..7eec5d3 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -28,6 +28,7 @@ var ( GossipSubDlo = 5 GossipSubDhi = 12 GossipSubDscore = 4 + GossipSubDout = 2 // gossip parameters GossipSubHistoryLength = 5 @@ -106,6 +107,7 @@ func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, er backoff: make(map[string]map[peer.ID]time.Time), peerhave: make(map[peer.ID]int), iasked: make(map[peer.ID]int), + outbound: make(map[peer.ID]bool), connect: make(chan connectInfo, GossipSubMaxPendingConnections), mcache: NewMessageCache(GossipSubHistoryGossip, GossipSubHistoryLength), @@ -114,6 +116,7 @@ func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, er Dlo: GossipSubDlo, Dhi: GossipSubDhi, Dscore: GossipSubDscore, + Dout: GossipSubDout, Dlazy: GossipSubDlazy, // these must be pulled in to resolve races in tests... sigh. @@ -243,6 +246,7 @@ type GossipSubRouter struct { control map[peer.ID]*pb.ControlMessage // pending control messages peerhave map[peer.ID]int // number of IHAVEs received from peer in the last heartbeat iasked map[peer.ID]int // number of messages we have asked from peer in the last heartbeat + outbound map[peer.ID]bool // connection direction cache, marks peers with outbound connections backoff map[string]map[peer.ID]time.Time // prune backoff connect chan connectInfo // px connection requests @@ -284,7 +288,7 @@ type GossipSubRouter struct { // overly parameter "constants" // these are pulled from their global value or else the race detector is angry on travis // it also allows us to change them per peer in tests, which is a plus - D, Dlo, Dhi, Dscore, Dlazy int + D, Dlo, Dhi, Dscore, Dout, Dlazy int // tick "constants" for triggering direct connect and opportunistic grafting // these are pulled from their global value or else the race detector is angry on travis @@ -348,6 +352,17 @@ func (gs *GossipSubRouter) AddPeer(p peer.ID, proto protocol.ID) { if direct { gs.p.host.ConnManager().TagPeer(p, "pubsub:direct", 1000) } + + // track the connection direction + outbound := false + conns := gs.p.host.Network().ConnsToPeer(p) + for _, c := range conns { + if c.Stat().Direction == network.DirOutbound { + outbound = true + break + } + } + gs.outbound[p] = outbound } func (gs *GossipSubRouter) RemovePeer(p peer.ID) { @@ -362,6 +377,7 @@ func (gs *GossipSubRouter) RemovePeer(p peer.ID) { } delete(gs.gossip, p) delete(gs.control, p) + delete(gs.outbound, p) } func (gs *GossipSubRouter) EnoughPeers(topic string, suggested int) bool { @@ -590,7 +606,7 @@ func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb. // check the number of mesh peers; if it is at (or over) Dhi, we only accept grafts // from peers with outbound connections; this is a defensive check to restrict potential // mesh takeover attacks combined with love bombing - if len(peers) >= gs.Dhi && !gs.isOutboundConnection(p) { + if len(peers) >= gs.Dhi && !gs.outbound[p] { prune = append(prune, topic) gs.addBackoff(p, topic) continue @@ -649,17 +665,6 @@ func (gs *GossipSubRouter) handlePrune(p peer.ID, ctl *pb.ControlMessage) { } } -func (gs *GossipSubRouter) isOutboundConnection(p peer.ID) bool { - conns := gs.p.host.Network().ConnsToPeer(p) - for _, c := range conns { - if c.Stat().Direction == network.DirOutbound { - return true - } - } - - return false -} - func (gs *GossipSubRouter) addBackoff(p peer.ID, topic string) { gs.doAddBackoff(p, topic, GossipSubPruneBackoff) }