diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index faa4570..d810960 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -411,40 +411,40 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = # create a mesh topic that we're subscribing to var - grafts, prunes, grafting: seq[PubSubPeer] + prunes, grafts: seq[PubSubPeer] let npeers = g.mesh.peers(topic) if npeers < g.parameters.dLow: - trace "replenishing mesh", peers = g.mesh.peers(topic) + trace "replenishing mesh", peers = npeers # replenish the mesh if we're below Dlo - grafts = toSeq( + var candidates = toSeq( g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()) - g.mesh.getOrDefault(topic, initHashSet[PubSubPeer]()) - ).filterIt(it.connected) - - grafts.keepIf do (x: PubSubPeer) -> bool: + ).filterIt( + it.connected and # avoid negative score peers - x.score >= 0.0 and + it.score >= 0.0 and # don't pick explicit peers - x.peerId notin g.parameters.directPeers and + it.peerId notin g.parameters.directPeers and # and avoid peers we are backing off - x.peerId notin g.backingOff + it.peerId notin g.backingOff + ) # shuffle anyway, score might be not used - shuffle(grafts) + shuffle(candidates) # sort peers by score, high score first since we graft - grafts.sort(byScore, SortOrder.Descending) + candidates.sort(byScore, SortOrder.Descending) # Graft peers so we reach a count of D - grafts.setLen(min(grafts.len, g.parameters.d - g.mesh.peers(topic))) + candidates.setLen(min(candidates.len, g.parameters.d - npeers)) - trace "grafting", grafts = grafts.len - for peer in grafts: + trace "grafting", grafting = candidates.len + for peer in candidates: if g.mesh.addPeer(topic, peer): g.grafted(peer, topic) g.fanout.removePeer(topic, peer) - grafting &= peer + grafts &= peer else: var meshPeers = toSeq(g.mesh.getOrDefault(topic, initHashSet[PubSubPeer]())) @@ -452,46 +452,45 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = if meshPeers.len < g.parameters.dOut: trace "replenishing mesh outbound quota", peers = g.mesh.peers(topic) - grafts = toSeq( + var candidates = toSeq( g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()) - g.mesh.getOrDefault(topic, initHashSet[PubSubPeer]()) + ).filterIt( + it.connected and + # get only outbound ones + it.outbound and + # avoid negative score peers + it.score >= 0.0 and + # don't pick explicit peers + it.peerId notin g.parameters.directPeers and + # and avoid peers we are backing off + it.peerId notin g.backingOff ) - grafts.keepIf do (x: PubSubPeer) -> bool: - # get only outbound ones - x.outbound and - # avoid negative score peers - x.score >= 0.0 and - # don't pick explicit peers - x.peerId notin g.parameters.directPeers and - # and avoid peers we are backing off - x.peerId notin g.backingOff - # shuffle anyway, score might be not used - shuffle(grafts) + shuffle(candidates) # sort peers by score, high score first, we are grafting - grafts.sort(byScore, SortOrder.Descending) + candidates.sort(byScore, SortOrder.Descending) # Graft peers so we reach a count of D - grafts.setLen(min(grafts.len, g.parameters.dOut)) + candidates.setLen(min(candidates.len, g.parameters.dOut)) - trace "grafting outbound peers", topic, peers = grafts.len + trace "grafting outbound peers", topic, peers = candidates.len - for peer in grafts: + for peer in candidates: if g.mesh.addPeer(topic, peer): g.grafted(peer, topic) g.fanout.removePeer(topic, peer) - grafting &= peer + grafts &= peer if g.mesh.peers(topic) > g.parameters.dHigh: # prune peers if we've gone over Dhi prunes = toSeq(g.mesh[topic]) # avoid pruning peers we are currently grafting in this heartbeat - prunes.keepIf do (x: PubSubPeer) -> bool: x notin grafting - let mesh = prunes - + prunes.keepIf do (x: PubSubPeer) -> bool: x notin grafts + # shuffle anyway, score might be not used shuffle(prunes) @@ -511,28 +510,25 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = else: inbound &= peer + let + meshOutbound = prunes.countIt(it.outbound) + maxOutboundPrunes = meshOutbound - g.parameters.dOut + # ensure that there are at least D_out peers first and rebalance to g.d after that - let maxOutboundPrunes = - block: - var count = 0 - for peer in mesh: - if peer.outbound: - inc count - count - g.parameters.dOut outbound.setLen(min(outbound.len, max(0, maxOutboundPrunes))) # concat remaining outbound peers - inbound &= outbound + prunes = inbound & outbound - let pruneLen = inbound.len - g.parameters.d + let pruneLen = prunes.len - g.parameters.d if pruneLen > 0: # Ok we got some peers to prune, # for this heartbeat let's prune those - shuffle(inbound) - inbound.setLen(pruneLen) + shuffle(prunes) + prunes.setLen(pruneLen) - trace "pruning", prunes = inbound.len - for peer in inbound: + trace "pruning", prunes = prunes.len + for peer in prunes: g.pruned(peer, topic) g.mesh.removePeer(topic, peer) @@ -565,7 +561,7 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = for peer in avail: if g.mesh.addPeer(topic, peer): g.grafted(peer, topic) - grafting &= peer + grafts &= peer trace "opportunistic grafting", peer when defined(libp2p_expensive_metrics):