precompute outbound peers on AddPeer, add Dout
This commit is contained in:
parent
7d320ac87a
commit
d74def1661
31
gossipsub.go
31
gossipsub.go
|
@ -28,6 +28,7 @@ var (
|
||||||
GossipSubDlo = 5
|
GossipSubDlo = 5
|
||||||
GossipSubDhi = 12
|
GossipSubDhi = 12
|
||||||
GossipSubDscore = 4
|
GossipSubDscore = 4
|
||||||
|
GossipSubDout = 2
|
||||||
|
|
||||||
// gossip parameters
|
// gossip parameters
|
||||||
GossipSubHistoryLength = 5
|
GossipSubHistoryLength = 5
|
||||||
|
@ -106,6 +107,7 @@ func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, er
|
||||||
backoff: make(map[string]map[peer.ID]time.Time),
|
backoff: make(map[string]map[peer.ID]time.Time),
|
||||||
peerhave: make(map[peer.ID]int),
|
peerhave: make(map[peer.ID]int),
|
||||||
iasked: make(map[peer.ID]int),
|
iasked: make(map[peer.ID]int),
|
||||||
|
outbound: make(map[peer.ID]bool),
|
||||||
connect: make(chan connectInfo, GossipSubMaxPendingConnections),
|
connect: make(chan connectInfo, GossipSubMaxPendingConnections),
|
||||||
mcache: NewMessageCache(GossipSubHistoryGossip, GossipSubHistoryLength),
|
mcache: NewMessageCache(GossipSubHistoryGossip, GossipSubHistoryLength),
|
||||||
|
|
||||||
|
@ -114,6 +116,7 @@ func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, er
|
||||||
Dlo: GossipSubDlo,
|
Dlo: GossipSubDlo,
|
||||||
Dhi: GossipSubDhi,
|
Dhi: GossipSubDhi,
|
||||||
Dscore: GossipSubDscore,
|
Dscore: GossipSubDscore,
|
||||||
|
Dout: GossipSubDout,
|
||||||
Dlazy: GossipSubDlazy,
|
Dlazy: GossipSubDlazy,
|
||||||
|
|
||||||
// these must be pulled in to resolve races in tests... sigh.
|
// these must be pulled in to resolve races in tests... sigh.
|
||||||
|
@ -243,6 +246,7 @@ type GossipSubRouter struct {
|
||||||
control map[peer.ID]*pb.ControlMessage // pending control messages
|
control map[peer.ID]*pb.ControlMessage // pending control messages
|
||||||
peerhave map[peer.ID]int // number of IHAVEs received from peer in the last heartbeat
|
peerhave map[peer.ID]int // number of IHAVEs received from peer in the last heartbeat
|
||||||
iasked map[peer.ID]int // number of messages we have asked from peer in the last heartbeat
|
iasked map[peer.ID]int // number of messages we have asked from peer in the last heartbeat
|
||||||
|
outbound map[peer.ID]bool // connection direction cache, marks peers with outbound connections
|
||||||
backoff map[string]map[peer.ID]time.Time // prune backoff
|
backoff map[string]map[peer.ID]time.Time // prune backoff
|
||||||
connect chan connectInfo // px connection requests
|
connect chan connectInfo // px connection requests
|
||||||
|
|
||||||
|
@ -284,7 +288,7 @@ type GossipSubRouter struct {
|
||||||
// overly parameter "constants"
|
// overly parameter "constants"
|
||||||
// these are pulled from their global value or else the race detector is angry on travis
|
// these are pulled from their global value or else the race detector is angry on travis
|
||||||
// it also allows us to change them per peer in tests, which is a plus
|
// it also allows us to change them per peer in tests, which is a plus
|
||||||
D, Dlo, Dhi, Dscore, Dlazy int
|
D, Dlo, Dhi, Dscore, Dout, Dlazy int
|
||||||
|
|
||||||
// tick "constants" for triggering direct connect and opportunistic grafting
|
// tick "constants" for triggering direct connect and opportunistic grafting
|
||||||
// these are pulled from their global value or else the race detector is angry on travis
|
// these are pulled from their global value or else the race detector is angry on travis
|
||||||
|
@ -348,6 +352,17 @@ func (gs *GossipSubRouter) AddPeer(p peer.ID, proto protocol.ID) {
|
||||||
if direct {
|
if direct {
|
||||||
gs.p.host.ConnManager().TagPeer(p, "pubsub:direct", 1000)
|
gs.p.host.ConnManager().TagPeer(p, "pubsub:direct", 1000)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// track the connection direction
|
||||||
|
outbound := false
|
||||||
|
conns := gs.p.host.Network().ConnsToPeer(p)
|
||||||
|
for _, c := range conns {
|
||||||
|
if c.Stat().Direction == network.DirOutbound {
|
||||||
|
outbound = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
gs.outbound[p] = outbound
|
||||||
}
|
}
|
||||||
|
|
||||||
func (gs *GossipSubRouter) RemovePeer(p peer.ID) {
|
func (gs *GossipSubRouter) RemovePeer(p peer.ID) {
|
||||||
|
@ -362,6 +377,7 @@ func (gs *GossipSubRouter) RemovePeer(p peer.ID) {
|
||||||
}
|
}
|
||||||
delete(gs.gossip, p)
|
delete(gs.gossip, p)
|
||||||
delete(gs.control, p)
|
delete(gs.control, p)
|
||||||
|
delete(gs.outbound, p)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (gs *GossipSubRouter) EnoughPeers(topic string, suggested int) bool {
|
func (gs *GossipSubRouter) EnoughPeers(topic string, suggested int) bool {
|
||||||
|
@ -590,7 +606,7 @@ func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.
|
||||||
// check the number of mesh peers; if it is at (or over) Dhi, we only accept grafts
|
// check the number of mesh peers; if it is at (or over) Dhi, we only accept grafts
|
||||||
// from peers with outbound connections; this is a defensive check to restrict potential
|
// from peers with outbound connections; this is a defensive check to restrict potential
|
||||||
// mesh takeover attacks combined with love bombing
|
// mesh takeover attacks combined with love bombing
|
||||||
if len(peers) >= gs.Dhi && !gs.isOutboundConnection(p) {
|
if len(peers) >= gs.Dhi && !gs.outbound[p] {
|
||||||
prune = append(prune, topic)
|
prune = append(prune, topic)
|
||||||
gs.addBackoff(p, topic)
|
gs.addBackoff(p, topic)
|
||||||
continue
|
continue
|
||||||
|
@ -649,17 +665,6 @@ func (gs *GossipSubRouter) handlePrune(p peer.ID, ctl *pb.ControlMessage) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (gs *GossipSubRouter) isOutboundConnection(p peer.ID) bool {
|
|
||||||
conns := gs.p.host.Network().ConnsToPeer(p)
|
|
||||||
for _, c := range conns {
|
|
||||||
if c.Stat().Direction == network.DirOutbound {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
func (gs *GossipSubRouter) addBackoff(p peer.ID, topic string) {
|
func (gs *GossipSubRouter) addBackoff(p peer.ID, topic string) {
|
||||||
gs.doAddBackoff(p, topic, GossipSubPruneBackoff)
|
gs.doAddBackoff(p, topic, GossipSubPruneBackoff)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue