diff --git a/libp2p.nimble b/libp2p.nimble index ef37b2dca..fbdb3f410 100644 --- a/libp2p.nimble +++ b/libp2p.nimble @@ -47,6 +47,7 @@ task testinterop, "Runs interop tests": runTest("testinterop") task testpubsub, "Runs pubsub tests": + runTest("pubsub/testgossipinternal", sign = false, verify = false, moreoptions = "-d:pubsub_internal_testing") runTest("pubsub/testpubsub") runTest("pubsub/testpubsub", sign = false, verify = false) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 293f741db..cebda5217 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -370,7 +370,8 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = var grafts, prunes, grafting: seq[PubSubPeer] - if g.mesh.peers(topic) < GossipSubDlo: + let npeers = g.mesh.peers(topic) + if npeers < GossipSubDlo: trace "replenishing mesh", peers = g.mesh.peers(topic) # replenish the mesh if we're below Dlo grafts = toSeq( @@ -409,15 +410,44 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = g.grafted(peer, topic) g.fanout.removePeer(topic, peer) grafting &= peer + elif npeers < g.parameters.dOut: + # TODO + discard if g.mesh.peers(topic) > GossipSubDhi: # prune peers if we've gone over Dhi prunes = toSeq(g.mesh[topic]) - shuffle(prunes) - prunes.setLen(prunes.len - GossipSubD) # .. down to D peers - trace "about to prune mesh", prunes = prunes.len + # we must try to keep outbound peers + # to keep an outbound mesh quota + # so we try to first prune inbound peers + # if none we add up some outbound + + var outbound: seq[PubSubPeer] + var inbound: seq[PubSubPeer] for peer in prunes: + if peer.outbound: + outbound &= peer + else: + inbound &= peer + + let pruneLen = inbound.len - GossipSubD + if pruneLen > 0: + # Ok we got some peers to prune, + # for this heartbeat let's prune those + shuffle(inbound) + inbound.setLen(pruneLen) + else: + # We could not find any inbound to prune + # Yet we are on Hi, so we need to cull outbound peers + let keepDOutLen = outbound.len - g.parameters.dOut + if keepDOutLen > 0: + shuffle(outbound) + outbound.setLen(keepDOutLen) + inbound &= outbound + + trace "about to prune mesh", prunes = inbound.len + for peer in inbound: g.pruned(peer, topic) g.mesh.removePeer(topic, peer) @@ -822,7 +852,7 @@ proc handleGraft(g: GossipSub, # If they send us a graft before they send us a subscribe, what should # we do? For now, we add them to mesh but don't add them to gossipsub. if topic in g.topics: - if g.mesh.peers(topic) < GossipSubDHi: + if g.mesh.peers(topic) < GossipSubDHi or peer.outbound: # In the spec, there's no mention of DHi here, but implicitly, a # peer will be removed from the mesh on next rebalance, so we don't want # this peer to push someone else out diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index cbdb1af27..2f9d3b4e7 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -93,7 +93,8 @@ proc send*( raise exc except CatchableError as exc: trace "exception sending pubsub message to peer", peer = $peer, msg = msg - p.unsubscribePeer(peer.peerId) + when not defined(pubsub_internal_testing): + p.unsubscribePeer(peer.peerId) raise exc proc broadcast*( @@ -207,6 +208,7 @@ method subscribePeer*(p: PubSub, peer: PeerID) {.base.} = ## let pubsubPeer = p.getOrCreatePeer(peer, p.codec) + pubsubPeer.outbound = true # flag as outbound if p.topics.len > 0: asyncCheck p.sendSubs(pubsubPeer, toSeq(p.topics.keys), true) diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 100968b10..c55f09d24 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -50,6 +50,7 @@ type score*: float64 iWantBudget*: int + outbound*: bool RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.} diff --git a/tests/pubsub/testpubsub.nim b/tests/pubsub/testpubsub.nim index 70f47b3c8..a4f72dd1b 100644 --- a/tests/pubsub/testpubsub.nim +++ b/tests/pubsub/testpubsub.nim @@ -1,7 +1,6 @@ {.used.} -import testgossipinternal, - testfloodsub, +import testfloodsub, testgossipsub, testmcache, testmessage