Some gossip fixes (#467)
* fix some missing rpc in rebalanceMesh * clarify some variable names and lifetime * further improvements
This commit is contained in:
parent
94e672ead0
commit
b4738d723c
|
@ -411,40 +411,40 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
|
|||
# create a mesh topic that we're subscribing to
|
||||
|
||||
var
|
||||
grafts, prunes, grafting: seq[PubSubPeer]
|
||||
prunes, grafts: seq[PubSubPeer]
|
||||
|
||||
let npeers = g.mesh.peers(topic)
|
||||
if npeers < g.parameters.dLow:
|
||||
trace "replenishing mesh", peers = g.mesh.peers(topic)
|
||||
trace "replenishing mesh", peers = npeers
|
||||
# replenish the mesh if we're below Dlo
|
||||
grafts = toSeq(
|
||||
var candidates = toSeq(
|
||||
g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()) -
|
||||
g.mesh.getOrDefault(topic, initHashSet[PubSubPeer]())
|
||||
).filterIt(it.connected)
|
||||
|
||||
grafts.keepIf do (x: PubSubPeer) -> bool:
|
||||
).filterIt(
|
||||
it.connected and
|
||||
# avoid negative score peers
|
||||
x.score >= 0.0 and
|
||||
it.score >= 0.0 and
|
||||
# don't pick explicit peers
|
||||
x.peerId notin g.parameters.directPeers and
|
||||
it.peerId notin g.parameters.directPeers and
|
||||
# and avoid peers we are backing off
|
||||
x.peerId notin g.backingOff
|
||||
it.peerId notin g.backingOff
|
||||
)
|
||||
|
||||
# shuffle anyway, score might be not used
|
||||
shuffle(grafts)
|
||||
shuffle(candidates)
|
||||
|
||||
# sort peers by score, high score first since we graft
|
||||
grafts.sort(byScore, SortOrder.Descending)
|
||||
candidates.sort(byScore, SortOrder.Descending)
|
||||
|
||||
# Graft peers so we reach a count of D
|
||||
grafts.setLen(min(grafts.len, g.parameters.d - g.mesh.peers(topic)))
|
||||
candidates.setLen(min(candidates.len, g.parameters.d - npeers))
|
||||
|
||||
trace "grafting", grafts = grafts.len
|
||||
for peer in grafts:
|
||||
trace "grafting", grafting = candidates.len
|
||||
for peer in candidates:
|
||||
if g.mesh.addPeer(topic, peer):
|
||||
g.grafted(peer, topic)
|
||||
g.fanout.removePeer(topic, peer)
|
||||
grafting &= peer
|
||||
grafts &= peer
|
||||
|
||||
else:
|
||||
var meshPeers = toSeq(g.mesh.getOrDefault(topic, initHashSet[PubSubPeer]()))
|
||||
|
@ -452,46 +452,45 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
|
|||
if meshPeers.len < g.parameters.dOut:
|
||||
trace "replenishing mesh outbound quota", peers = g.mesh.peers(topic)
|
||||
|
||||
grafts = toSeq(
|
||||
var candidates = toSeq(
|
||||
g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()) -
|
||||
g.mesh.getOrDefault(topic, initHashSet[PubSubPeer]())
|
||||
).filterIt(
|
||||
it.connected and
|
||||
# get only outbound ones
|
||||
it.outbound and
|
||||
# avoid negative score peers
|
||||
it.score >= 0.0 and
|
||||
# don't pick explicit peers
|
||||
it.peerId notin g.parameters.directPeers and
|
||||
# and avoid peers we are backing off
|
||||
it.peerId notin g.backingOff
|
||||
)
|
||||
|
||||
grafts.keepIf do (x: PubSubPeer) -> bool:
|
||||
# get only outbound ones
|
||||
x.outbound and
|
||||
# avoid negative score peers
|
||||
x.score >= 0.0 and
|
||||
# don't pick explicit peers
|
||||
x.peerId notin g.parameters.directPeers and
|
||||
# and avoid peers we are backing off
|
||||
x.peerId notin g.backingOff
|
||||
|
||||
# shuffle anyway, score might be not used
|
||||
shuffle(grafts)
|
||||
shuffle(candidates)
|
||||
|
||||
# sort peers by score, high score first, we are grafting
|
||||
grafts.sort(byScore, SortOrder.Descending)
|
||||
candidates.sort(byScore, SortOrder.Descending)
|
||||
|
||||
# Graft peers so we reach a count of D
|
||||
grafts.setLen(min(grafts.len, g.parameters.dOut))
|
||||
candidates.setLen(min(candidates.len, g.parameters.dOut))
|
||||
|
||||
trace "grafting outbound peers", topic, peers = grafts.len
|
||||
trace "grafting outbound peers", topic, peers = candidates.len
|
||||
|
||||
for peer in grafts:
|
||||
for peer in candidates:
|
||||
if g.mesh.addPeer(topic, peer):
|
||||
g.grafted(peer, topic)
|
||||
g.fanout.removePeer(topic, peer)
|
||||
grafting &= peer
|
||||
grafts &= peer
|
||||
|
||||
|
||||
if g.mesh.peers(topic) > g.parameters.dHigh:
|
||||
# prune peers if we've gone over Dhi
|
||||
prunes = toSeq(g.mesh[topic])
|
||||
# avoid pruning peers we are currently grafting in this heartbeat
|
||||
prunes.keepIf do (x: PubSubPeer) -> bool: x notin grafting
|
||||
let mesh = prunes
|
||||
|
||||
prunes.keepIf do (x: PubSubPeer) -> bool: x notin grafts
|
||||
|
||||
# shuffle anyway, score might be not used
|
||||
shuffle(prunes)
|
||||
|
||||
|
@ -511,28 +510,25 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
|
|||
else:
|
||||
inbound &= peer
|
||||
|
||||
let
|
||||
meshOutbound = prunes.countIt(it.outbound)
|
||||
maxOutboundPrunes = meshOutbound - g.parameters.dOut
|
||||
|
||||
# ensure that there are at least D_out peers first and rebalance to g.d after that
|
||||
let maxOutboundPrunes =
|
||||
block:
|
||||
var count = 0
|
||||
for peer in mesh:
|
||||
if peer.outbound:
|
||||
inc count
|
||||
count - g.parameters.dOut
|
||||
outbound.setLen(min(outbound.len, max(0, maxOutboundPrunes)))
|
||||
|
||||
# concat remaining outbound peers
|
||||
inbound &= outbound
|
||||
prunes = inbound & outbound
|
||||
|
||||
let pruneLen = inbound.len - g.parameters.d
|
||||
let pruneLen = prunes.len - g.parameters.d
|
||||
if pruneLen > 0:
|
||||
# Ok we got some peers to prune,
|
||||
# for this heartbeat let's prune those
|
||||
shuffle(inbound)
|
||||
inbound.setLen(pruneLen)
|
||||
shuffle(prunes)
|
||||
prunes.setLen(pruneLen)
|
||||
|
||||
trace "pruning", prunes = inbound.len
|
||||
for peer in inbound:
|
||||
trace "pruning", prunes = prunes.len
|
||||
for peer in prunes:
|
||||
g.pruned(peer, topic)
|
||||
g.mesh.removePeer(topic, peer)
|
||||
|
||||
|
@ -565,7 +561,7 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
|
|||
for peer in avail:
|
||||
if g.mesh.addPeer(topic, peer):
|
||||
g.grafted(peer, topic)
|
||||
grafting &= peer
|
||||
grafts &= peer
|
||||
trace "opportunistic grafting", peer
|
||||
|
||||
when defined(libp2p_expensive_metrics):
|
||||
|
|
Loading…
Reference in New Issue