mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-01-04 05:43:06 +00:00
pull overlay parameters into the GossipSubRouter struct
pacify the race detector in travis
This commit is contained in:
parent
caffc3bf2c
commit
5397bb5abc
40
gossipsub.go
40
gossipsub.go
@ -107,7 +107,14 @@ func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, er
|
|||||||
iasked: make(map[peer.ID]int),
|
iasked: make(map[peer.ID]int),
|
||||||
connect: make(chan connectInfo, GossipSubMaxPendingConnections),
|
connect: make(chan connectInfo, GossipSubMaxPendingConnections),
|
||||||
mcache: NewMessageCache(GossipSubHistoryGossip, GossipSubHistoryLength),
|
mcache: NewMessageCache(GossipSubHistoryGossip, GossipSubHistoryLength),
|
||||||
|
|
||||||
// these must be pulled in to resolve races in tests... sigh.
|
// these must be pulled in to resolve races in tests... sigh.
|
||||||
|
D: GossipSubD,
|
||||||
|
Dlo: GossipSubDlo,
|
||||||
|
Dhi: GossipSubDhi,
|
||||||
|
Dscore: GossipSubDscore,
|
||||||
|
Dlazy: GossipSubDlazy,
|
||||||
|
|
||||||
directConnectTicks: GossipSubDirectConnectTicks,
|
directConnectTicks: GossipSubDirectConnectTicks,
|
||||||
opportunisticGraftTicks: GossipSubOpportunisticGraftTicks,
|
opportunisticGraftTicks: GossipSubOpportunisticGraftTicks,
|
||||||
}
|
}
|
||||||
@ -262,6 +269,11 @@ type GossipSubRouter struct {
|
|||||||
// clean up -- eg backoff clean up.
|
// clean up -- eg backoff clean up.
|
||||||
heartbeatTicks uint64
|
heartbeatTicks uint64
|
||||||
|
|
||||||
|
// overly parameter "constants"
|
||||||
|
// these are pulled from their global value or else the race detector is angry on travis
|
||||||
|
// it also allows us to change them per peer in tests, which is a plus
|
||||||
|
D, Dlo, Dhi, Dscore, Dlazy int
|
||||||
|
|
||||||
// tick "constants" for triggering direct connect and opportunistic grafting
|
// tick "constants" for triggering direct connect and opportunistic grafting
|
||||||
// these are pulled from their global value or else the race detector is angry on travis
|
// these are pulled from their global value or else the race detector is angry on travis
|
||||||
directConnectTicks, opportunisticGraftTicks uint64
|
directConnectTicks, opportunisticGraftTicks uint64
|
||||||
@ -352,10 +364,10 @@ func (gs *GossipSubRouter) EnoughPeers(topic string, suggested int) bool {
|
|||||||
gsPeers = len(gs.mesh[topic])
|
gsPeers = len(gs.mesh[topic])
|
||||||
|
|
||||||
if suggested == 0 {
|
if suggested == 0 {
|
||||||
suggested = GossipSubDlo
|
suggested = gs.Dlo
|
||||||
}
|
}
|
||||||
|
|
||||||
if fsPeers+gsPeers >= suggested || gsPeers >= GossipSubDhi {
|
if fsPeers+gsPeers >= suggested || gsPeers >= gs.Dhi {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -741,7 +753,7 @@ func (gs *GossipSubRouter) Publish(msg *Message) {
|
|||||||
gmap, ok = gs.fanout[topic]
|
gmap, ok = gs.fanout[topic]
|
||||||
if !ok || len(gmap) == 0 {
|
if !ok || len(gmap) == 0 {
|
||||||
// we don't have any, pick some with score above the publish threshold
|
// we don't have any, pick some with score above the publish threshold
|
||||||
peers := gs.getPeers(topic, GossipSubD, func(p peer.ID) bool {
|
peers := gs.getPeers(topic, gs.D, func(p peer.ID) bool {
|
||||||
return gs.score.Score(p) >= gs.publishThreshold
|
return gs.score.Score(p) >= gs.publishThreshold
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -787,9 +799,9 @@ func (gs *GossipSubRouter) Join(topic string) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(gmap) < GossipSubD {
|
if len(gmap) < gs.D {
|
||||||
// we need more peers; eager, as this would get fixed in the next heartbeat
|
// we need more peers; eager, as this would get fixed in the next heartbeat
|
||||||
more := gs.getPeers(topic, GossipSubD-len(gmap), func(p peer.ID) bool {
|
more := gs.getPeers(topic, gs.D-len(gmap), func(p peer.ID) bool {
|
||||||
// filter our current peers, direct peers, and peers with negative scores
|
// filter our current peers, direct peers, and peers with negative scores
|
||||||
_, inMesh := gmap[p]
|
_, inMesh := gmap[p]
|
||||||
_, direct := gs.direct[p]
|
_, direct := gs.direct[p]
|
||||||
@ -803,7 +815,7 @@ func (gs *GossipSubRouter) Join(topic string) {
|
|||||||
delete(gs.fanout, topic)
|
delete(gs.fanout, topic)
|
||||||
delete(gs.lastpub, topic)
|
delete(gs.lastpub, topic)
|
||||||
} else {
|
} else {
|
||||||
peers := gs.getPeers(topic, GossipSubD, func(p peer.ID) bool {
|
peers := gs.getPeers(topic, gs.D, func(p peer.ID) bool {
|
||||||
// filter direct peers and peers with negative score
|
// filter direct peers and peers with negative score
|
||||||
_, direct := gs.direct[p]
|
_, direct := gs.direct[p]
|
||||||
return !direct && gs.score.Score(p) >= 0
|
return !direct && gs.score.Score(p) >= 0
|
||||||
@ -973,9 +985,9 @@ func (gs *GossipSubRouter) heartbeat() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// do we have enough peers?
|
// do we have enough peers?
|
||||||
if l := len(peers); l < GossipSubDlo {
|
if l := len(peers); l < gs.Dlo {
|
||||||
backoff := gs.backoff[topic]
|
backoff := gs.backoff[topic]
|
||||||
ineed := GossipSubD - l
|
ineed := gs.D - l
|
||||||
plst := gs.getPeers(topic, ineed, func(p peer.ID) bool {
|
plst := gs.getPeers(topic, ineed, func(p peer.ID) bool {
|
||||||
// filter our current and direct peers, peers we are backing off, and peers with negative score
|
// filter our current and direct peers, peers we are backing off, and peers with negative score
|
||||||
_, inMesh := peers[p]
|
_, inMesh := peers[p]
|
||||||
@ -990,7 +1002,7 @@ func (gs *GossipSubRouter) heartbeat() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// do we have too many peers?
|
// do we have too many peers?
|
||||||
if len(peers) > GossipSubDhi {
|
if len(peers) > gs.Dhi {
|
||||||
plst := peerMapToList(peers)
|
plst := peerMapToList(peers)
|
||||||
|
|
||||||
// sort by score (but shuffle first for the case we don't use the score)
|
// sort by score (but shuffle first for the case we don't use the score)
|
||||||
@ -1000,8 +1012,8 @@ func (gs *GossipSubRouter) heartbeat() {
|
|||||||
})
|
})
|
||||||
|
|
||||||
// We keep the first D_score peers by score and the remaining up to D_lo randomly
|
// We keep the first D_score peers by score and the remaining up to D_lo randomly
|
||||||
shufflePeers(plst[GossipSubDscore:])
|
shufflePeers(plst[gs.Dscore:])
|
||||||
for _, p := range plst[GossipSubD:] {
|
for _, p := range plst[gs.D:] {
|
||||||
log.Debugf("HEARTBEAT: Remove mesh link to %s in %s", p, topic)
|
log.Debugf("HEARTBEAT: Remove mesh link to %s in %s", p, topic)
|
||||||
prunePeer(p)
|
prunePeer(p)
|
||||||
}
|
}
|
||||||
@ -1075,8 +1087,8 @@ func (gs *GossipSubRouter) heartbeat() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// do we need more peers?
|
// do we need more peers?
|
||||||
if len(peers) < GossipSubD {
|
if len(peers) < gs.D {
|
||||||
ineed := GossipSubD - len(peers)
|
ineed := gs.D - len(peers)
|
||||||
plst := gs.getPeers(topic, ineed, func(p peer.ID) bool {
|
plst := gs.getPeers(topic, ineed, func(p peer.ID) bool {
|
||||||
// filter our current and direct peers and peers with score above the publish threshold
|
// filter our current and direct peers and peers with score above the publish threshold
|
||||||
_, inFanout := peers[p]
|
_, inFanout := peers[p]
|
||||||
@ -1222,7 +1234,7 @@ func (gs *GossipSubRouter) emitGossip(topic string, exclude map[peer.ID]struct{}
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
target := GossipSubDlazy
|
target := gs.Dlazy
|
||||||
factor := int(GossipSubGossipFactor * float64(len(peers)))
|
factor := int(GossipSubGossipFactor * float64(len(peers)))
|
||||||
if factor > target {
|
if factor > target {
|
||||||
target = factor
|
target = factor
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user