Optimize rebalanceMesh (#708)

This commit is contained in:
Tanguy 2022-05-25 12:59:33 +02:00 committed by GitHub
parent 60becadcf9
commit 7323ecc9c4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -312,22 +312,29 @@ proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil)
var var
prunes, grafts: seq[PubSubPeer] prunes, grafts: seq[PubSubPeer]
npeers = g.mesh.peers(topic) npeers = g.mesh.peers(topic)
defaultMesh: HashSet[PubSubPeer]
backingOff = g.backingOff.getOrDefault(topic)
if npeers < g.parameters.dLow: if npeers < g.parameters.dLow:
trace "replenishing mesh", peers = npeers trace "replenishing mesh", peers = npeers
# replenish the mesh if we're below Dlo # replenish the mesh if we're below Dlo
var candidates = toSeq(
g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()) - var
g.mesh.getOrDefault(topic, initHashSet[PubSubPeer]()) candidates: seq[PubSubPeer]
).filterIt( currentMesh = addr defaultMesh
it.connected and g.mesh.withValue(topic, v): currentMesh = v
# avoid negative score peers g.gossipSub.withValue(topic, peerList):
it.score >= 0.0 and for it in peerList[]:
# don't pick explicit peers if
it.peerId notin g.parameters.directPeers and it.connected and
# and avoid peers we are backing off # avoid negative score peers
it.peerId notin g.backingOff.getOrDefault(topic) it.score >= 0.0 and
) it notin currentMesh[] and
# don't pick explicit peers
it.peerId notin g.parameters.directPeers and
# and avoid peers we are backing off
it.peerId notin backingOff:
candidates.add(it)
# shuffle anyway, score might be not used # shuffle anyway, score might be not used
g.rng.shuffle(candidates) g.rng.shuffle(candidates)
@ -348,39 +355,43 @@ proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil)
grafts &= peer grafts &= peer
else: else:
trace "replenishing mesh outbound quota", peers = g.mesh.peers(topic) trace "replenishing mesh outbound quota", peers = g.mesh.peers(topic)
var candidates = toSeq( var
g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()) - candidates: seq[PubSubPeer]
g.mesh.getOrDefault(topic, initHashSet[PubSubPeer]()) currentMesh = addr defaultMesh
).filterIt( g.mesh.withValue(topic, v): currentMesh = v
it.connected and g.gossipSub.withValue(topic, peerList):
# get only outbound ones for it in peerList[]:
it.outbound and if
# avoid negative score peers it.connected and
it.score >= 0.0 and # get only outbound ones
# don't pick explicit peers it.outbound and
it.peerId notin g.parameters.directPeers and it notin currentMesh[] and
# and avoid peers we are backing off # avoid negative score peers
it.peerId notin g.backingOff.getOrDefault(topic) it.score >= 0.0 and
) # don't pick explicit peers
it.peerId notin g.parameters.directPeers and
# and avoid peers we are backing off
it.peerId notin backingOff:
candidates.add(it)
# shuffle anyway, score might be not used # shuffle anyway, score might be not used
g.rng.shuffle(candidates) g.rng.shuffle(candidates)
# sort peers by score, high score first, we are grafting # sort peers by score, high score first, we are grafting
candidates.sort(byScore, SortOrder.Descending) candidates.sort(byScore, SortOrder.Descending)
# Graft peers so we reach a count of D # Graft peers so we reach a count of D
candidates.setLen(min(candidates.len, g.parameters.dOut)) candidates.setLen(min(candidates.len, g.parameters.dOut))
trace "grafting outbound peers", topic, peers = candidates.len trace "grafting outbound peers", topic, peers = candidates.len
for peer in candidates: for peer in candidates:
if g.mesh.addPeer(topic, peer): if g.mesh.addPeer(topic, peer):
g.grafted(peer, topic) g.grafted(peer, topic)
g.fanout.removePeer(topic, peer) g.fanout.removePeer(topic, peer)
grafts &= peer grafts &= peer
# get again npeers after possible grafts # get again npeers after possible grafts
@ -439,6 +450,8 @@ proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil)
g.pruned(peer, topic) g.pruned(peer, topic)
g.mesh.removePeer(topic, peer) g.mesh.removePeer(topic, peer)
backingOff = g.backingOff.getOrDefault(topic)
# opportunistic grafting, by spec mesh should not be empty... # opportunistic grafting, by spec mesh should not be empty...
if g.mesh.peers(topic) > 1: if g.mesh.peers(topic) > 1:
var peers = toSeq(try: g.mesh[topic] except KeyError: raiseAssert "have peers") var peers = toSeq(try: g.mesh[topic] except KeyError: raiseAssert "have peers")
@ -448,22 +461,26 @@ proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil)
let median = peers[medianIdx] let median = peers[medianIdx]
if median.score < g.parameters.opportunisticGraftThreshold: if median.score < g.parameters.opportunisticGraftThreshold:
trace "median score below opportunistic threshold", score = median.score trace "median score below opportunistic threshold", score = median.score
var avail = toSeq(
g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()) -
g.mesh.getOrDefault(topic, initHashSet[PubSubPeer]())
)
avail.keepIf do (x: PubSubPeer) -> bool: var
# avoid negative score peers avail: seq[PubSubPeer]
x.score >= median.score and currentMesh = addr defaultMesh
# don't pick explicit peers g.mesh.withValue(topic, v): currentMesh = v
x.peerId notin g.parameters.directPeers and g.gossipSub.withValue(topic, peerList):
# and avoid peers we are backing off for it in peerList[]:
x.peerId notin g.backingOff.getOrDefault(topic) if
# avoid negative score peers
it.score >= median.score and
it notin currentMesh[] and
# don't pick explicit peers
it.peerId notin g.parameters.directPeers and
# and avoid peers we are backing off
it.peerId notin backingOff:
avail.add(it)
# by spec, grab only 2 # by spec, grab only 2
if avail.len > 2: if avail.len > 1:
avail.setLen(2) break
for peer in avail: for peer in avail:
if g.mesh.addPeer(topic, peer): if g.mesh.addPeer(topic, peer):