diff --git a/libp2p/protocols/pubsub/gossipsub/behavior.nim b/libp2p/protocols/pubsub/gossipsub/behavior.nim index f711c0c7a..c4ab0015f 100644 --- a/libp2p/protocols/pubsub/gossipsub/behavior.nim +++ b/libp2p/protocols/pubsub/gossipsub/behavior.nim @@ -164,7 +164,8 @@ proc handleGraft*(g: GossipSub, # If they send us a graft before they send us a subscribe, what should # we do? For now, we add them to mesh but don't add them to gossipsub. if topic in g.topics: - if g.mesh.peers(topic) < g.parameters.dHigh or peer.outbound: + if g.mesh.peers(topic) < g.parameters.dHigh or + (peer.outbound and g.mesh.outboundPeers(topic) < g.parameters.dOut): # In the spec, there's no mention of DHi here, but implicitly, a # peer will be removed from the mesh on next rebalance, so we don't want # this peer to push someone else out @@ -328,10 +329,11 @@ proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil) var prunes, grafts: seq[PubSubPeer] npeers = g.mesh.peers(topic) + nOutPeers = g.mesh.outboundPeers(topic) defaultMesh: HashSet[PubSubPeer] backingOff = g.backingOff.getOrDefault(topic) - if npeers < g.parameters.dLow: + if npeers < g.parameters.dLow: trace "replenishing mesh", peers = npeers # replenish the mesh if we're below Dlo @@ -370,7 +372,7 @@ proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil) g.fanout.removePeer(topic, peer) grafts &= peer - else: + elif nOutPeers < g.parameters.dOut: trace "replenishing mesh outbound quota", peers = g.mesh.peers(topic) var @@ -398,8 +400,8 @@ proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil) # sort peers by score, high score first, we are grafting candidates.sort(byScore, SortOrder.Descending) - # Graft peers so we reach a count of D - candidates.setLen(min(candidates.len, g.parameters.dOut)) + # Graft outgoing peers so we reach a count of dOut + candidates.setLen(min(candidates.len, g.parameters.dOut - nOutPeers)) trace "grafting outbound peers", topic, peers = candidates.len diff --git a/libp2p/protocols/pubsub/peertable.nim b/libp2p/protocols/pubsub/peertable.nim index 7818ded1c..2f19befaf 100644 --- a/libp2p/protocols/pubsub/peertable.nim +++ b/libp2p/protocols/pubsub/peertable.nim @@ -12,7 +12,7 @@ when (NimMajor, NimMinor) < (1, 4): else: {.push raises: [].} -import std/[tables, sets] +import std/[tables, sets, sequtils] import ./pubsubpeer, ../../peerid export tables, sets @@ -53,3 +53,10 @@ func peers*(table: PeerTable, topic: string): int = except KeyError: raiseAssert "checked with in" else: 0 + +func outboundPeers*(table: PeerTable, topic: string): int = + if topic in table: + try: table[topic].countIt(it.outbound) + except KeyError: raiseAssert "checked with in" + else: + 0