diff --git a/libp2p/protocols/pubsub/gossipsub/behavior.nim b/libp2p/protocols/pubsub/gossipsub/behavior.nim index cd69c5fdf..eda2e8cf9 100644 --- a/libp2p/protocols/pubsub/gossipsub/behavior.nim +++ b/libp2p/protocols/pubsub/gossipsub/behavior.nim @@ -312,22 +312,29 @@ proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil) var prunes, grafts: seq[PubSubPeer] npeers = g.mesh.peers(topic) + defaultMesh: HashSet[PubSubPeer] + backingOff = g.backingOff.getOrDefault(topic) if npeers < g.parameters.dLow: trace "replenishing mesh", peers = npeers # replenish the mesh if we're below Dlo - var candidates = toSeq( - g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()) - - g.mesh.getOrDefault(topic, initHashSet[PubSubPeer]()) - ).filterIt( - it.connected and - # avoid negative score peers - it.score >= 0.0 and - # don't pick explicit peers - it.peerId notin g.parameters.directPeers and - # and avoid peers we are backing off - it.peerId notin g.backingOff.getOrDefault(topic) - ) + + var + candidates: seq[PubSubPeer] + currentMesh = addr defaultMesh + g.mesh.withValue(topic, v): currentMesh = v + g.gossipSub.withValue(topic, peerList): + for it in peerList[]: + if + it.connected and + # avoid negative score peers + it.score >= 0.0 and + it notin currentMesh[] and + # don't pick explicit peers + it.peerId notin g.parameters.directPeers and + # and avoid peers we are backing off + it.peerId notin backingOff: + candidates.add(it) # shuffle anyway, score might be not used g.rng.shuffle(candidates) @@ -348,39 +355,43 @@ proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil) grafts &= peer else: - trace "replenishing mesh outbound quota", peers = g.mesh.peers(topic) + trace "replenishing mesh outbound quota", peers = g.mesh.peers(topic) - var candidates = toSeq( - g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()) - - g.mesh.getOrDefault(topic, initHashSet[PubSubPeer]()) - ).filterIt( - it.connected and - # get only outbound ones - it.outbound and - # avoid negative score peers - it.score >= 0.0 and - # don't pick explicit peers - it.peerId notin g.parameters.directPeers and - # and avoid peers we are backing off - it.peerId notin g.backingOff.getOrDefault(topic) - ) + var + candidates: seq[PubSubPeer] + currentMesh = addr defaultMesh + g.mesh.withValue(topic, v): currentMesh = v + g.gossipSub.withValue(topic, peerList): + for it in peerList[]: + if + it.connected and + # get only outbound ones + it.outbound and + it notin currentMesh[] and + # avoid negative score peers + it.score >= 0.0 and + # don't pick explicit peers + it.peerId notin g.parameters.directPeers and + # and avoid peers we are backing off + it.peerId notin backingOff: + candidates.add(it) - # shuffle anyway, score might be not used - g.rng.shuffle(candidates) + # shuffle anyway, score might be not used + g.rng.shuffle(candidates) - # sort peers by score, high score first, we are grafting - candidates.sort(byScore, SortOrder.Descending) + # sort peers by score, high score first, we are grafting + candidates.sort(byScore, SortOrder.Descending) - # Graft peers so we reach a count of D - candidates.setLen(min(candidates.len, g.parameters.dOut)) + # Graft peers so we reach a count of D + candidates.setLen(min(candidates.len, g.parameters.dOut)) - trace "grafting outbound peers", topic, peers = candidates.len + trace "grafting outbound peers", topic, peers = candidates.len - for peer in candidates: - if g.mesh.addPeer(topic, peer): - g.grafted(peer, topic) - g.fanout.removePeer(topic, peer) - grafts &= peer + for peer in candidates: + if g.mesh.addPeer(topic, peer): + g.grafted(peer, topic) + g.fanout.removePeer(topic, peer) + grafts &= peer # get again npeers after possible grafts @@ -439,6 +450,8 @@ proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil) g.pruned(peer, topic) g.mesh.removePeer(topic, peer) + backingOff = g.backingOff.getOrDefault(topic) + # opportunistic grafting, by spec mesh should not be empty... if g.mesh.peers(topic) > 1: var peers = toSeq(try: g.mesh[topic] except KeyError: raiseAssert "have peers") @@ -448,22 +461,26 @@ proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil) let median = peers[medianIdx] if median.score < g.parameters.opportunisticGraftThreshold: trace "median score below opportunistic threshold", score = median.score - var avail = toSeq( - g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()) - - g.mesh.getOrDefault(topic, initHashSet[PubSubPeer]()) - ) - avail.keepIf do (x: PubSubPeer) -> bool: - # avoid negative score peers - x.score >= median.score and - # don't pick explicit peers - x.peerId notin g.parameters.directPeers and - # and avoid peers we are backing off - x.peerId notin g.backingOff.getOrDefault(topic) + var + avail: seq[PubSubPeer] + currentMesh = addr defaultMesh + g.mesh.withValue(topic, v): currentMesh = v + g.gossipSub.withValue(topic, peerList): + for it in peerList[]: + if + # avoid negative score peers + it.score >= median.score and + it notin currentMesh[] and + # don't pick explicit peers + it.peerId notin g.parameters.directPeers and + # and avoid peers we are backing off + it.peerId notin backingOff: + avail.add(it) - # by spec, grab only 2 - if avail.len > 2: - avail.setLen(2) + # by spec, grab only 2 + if avail.len > 1: + break for peer in avail: if g.mesh.addPeer(topic, peer):