outbound mesh quota, internal tests fixing
This commit is contained in:
parent
309a4998c8
commit
69ff05a4f4
|
@ -47,6 +47,7 @@ task testinterop, "Runs interop tests":
|
||||||
runTest("testinterop")
|
runTest("testinterop")
|
||||||
|
|
||||||
task testpubsub, "Runs pubsub tests":
|
task testpubsub, "Runs pubsub tests":
|
||||||
|
runTest("pubsub/testgossipinternal", sign = false, verify = false, moreoptions = "-d:pubsub_internal_testing")
|
||||||
runTest("pubsub/testpubsub")
|
runTest("pubsub/testpubsub")
|
||||||
runTest("pubsub/testpubsub", sign = false, verify = false)
|
runTest("pubsub/testpubsub", sign = false, verify = false)
|
||||||
|
|
||||||
|
|
|
@ -370,7 +370,8 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
|
||||||
var
|
var
|
||||||
grafts, prunes, grafting: seq[PubSubPeer]
|
grafts, prunes, grafting: seq[PubSubPeer]
|
||||||
|
|
||||||
if g.mesh.peers(topic) < GossipSubDlo:
|
let npeers = g.mesh.peers(topic)
|
||||||
|
if npeers < GossipSubDlo:
|
||||||
trace "replenishing mesh", peers = g.mesh.peers(topic)
|
trace "replenishing mesh", peers = g.mesh.peers(topic)
|
||||||
# replenish the mesh if we're below Dlo
|
# replenish the mesh if we're below Dlo
|
||||||
grafts = toSeq(
|
grafts = toSeq(
|
||||||
|
@ -409,15 +410,44 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
|
||||||
g.grafted(peer, topic)
|
g.grafted(peer, topic)
|
||||||
g.fanout.removePeer(topic, peer)
|
g.fanout.removePeer(topic, peer)
|
||||||
grafting &= peer
|
grafting &= peer
|
||||||
|
elif npeers < g.parameters.dOut:
|
||||||
|
# TODO
|
||||||
|
discard
|
||||||
|
|
||||||
if g.mesh.peers(topic) > GossipSubDhi:
|
if g.mesh.peers(topic) > GossipSubDhi:
|
||||||
# prune peers if we've gone over Dhi
|
# prune peers if we've gone over Dhi
|
||||||
prunes = toSeq(g.mesh[topic])
|
prunes = toSeq(g.mesh[topic])
|
||||||
shuffle(prunes)
|
|
||||||
prunes.setLen(prunes.len - GossipSubD) # .. down to D peers
|
|
||||||
|
|
||||||
trace "about to prune mesh", prunes = prunes.len
|
# we must try to keep outbound peers
|
||||||
|
# to keep an outbound mesh quota
|
||||||
|
# so we try to first prune inbound peers
|
||||||
|
# if none we add up some outbound
|
||||||
|
|
||||||
|
var outbound: seq[PubSubPeer]
|
||||||
|
var inbound: seq[PubSubPeer]
|
||||||
for peer in prunes:
|
for peer in prunes:
|
||||||
|
if peer.outbound:
|
||||||
|
outbound &= peer
|
||||||
|
else:
|
||||||
|
inbound &= peer
|
||||||
|
|
||||||
|
let pruneLen = inbound.len - GossipSubD
|
||||||
|
if pruneLen > 0:
|
||||||
|
# Ok we got some peers to prune,
|
||||||
|
# for this heartbeat let's prune those
|
||||||
|
shuffle(inbound)
|
||||||
|
inbound.setLen(pruneLen)
|
||||||
|
else:
|
||||||
|
# We could not find any inbound to prune
|
||||||
|
# Yet we are on Hi, so we need to cull outbound peers
|
||||||
|
let keepDOutLen = outbound.len - g.parameters.dOut
|
||||||
|
if keepDOutLen > 0:
|
||||||
|
shuffle(outbound)
|
||||||
|
outbound.setLen(keepDOutLen)
|
||||||
|
inbound &= outbound
|
||||||
|
|
||||||
|
trace "about to prune mesh", prunes = inbound.len
|
||||||
|
for peer in inbound:
|
||||||
g.pruned(peer, topic)
|
g.pruned(peer, topic)
|
||||||
g.mesh.removePeer(topic, peer)
|
g.mesh.removePeer(topic, peer)
|
||||||
|
|
||||||
|
@ -822,7 +852,7 @@ proc handleGraft(g: GossipSub,
|
||||||
# If they send us a graft before they send us a subscribe, what should
|
# If they send us a graft before they send us a subscribe, what should
|
||||||
# we do? For now, we add them to mesh but don't add them to gossipsub.
|
# we do? For now, we add them to mesh but don't add them to gossipsub.
|
||||||
if topic in g.topics:
|
if topic in g.topics:
|
||||||
if g.mesh.peers(topic) < GossipSubDHi:
|
if g.mesh.peers(topic) < GossipSubDHi or peer.outbound:
|
||||||
# In the spec, there's no mention of DHi here, but implicitly, a
|
# In the spec, there's no mention of DHi here, but implicitly, a
|
||||||
# peer will be removed from the mesh on next rebalance, so we don't want
|
# peer will be removed from the mesh on next rebalance, so we don't want
|
||||||
# this peer to push someone else out
|
# this peer to push someone else out
|
||||||
|
|
|
@ -93,6 +93,7 @@ proc send*(
|
||||||
raise exc
|
raise exc
|
||||||
except CatchableError as exc:
|
except CatchableError as exc:
|
||||||
trace "exception sending pubsub message to peer", peer = $peer, msg = msg
|
trace "exception sending pubsub message to peer", peer = $peer, msg = msg
|
||||||
|
when not defined(pubsub_internal_testing):
|
||||||
p.unsubscribePeer(peer.peerId)
|
p.unsubscribePeer(peer.peerId)
|
||||||
raise exc
|
raise exc
|
||||||
|
|
||||||
|
@ -207,6 +208,7 @@ method subscribePeer*(p: PubSub, peer: PeerID) {.base.} =
|
||||||
##
|
##
|
||||||
|
|
||||||
let pubsubPeer = p.getOrCreatePeer(peer, p.codec)
|
let pubsubPeer = p.getOrCreatePeer(peer, p.codec)
|
||||||
|
pubsubPeer.outbound = true # flag as outbound
|
||||||
if p.topics.len > 0:
|
if p.topics.len > 0:
|
||||||
asyncCheck p.sendSubs(pubsubPeer, toSeq(p.topics.keys), true)
|
asyncCheck p.sendSubs(pubsubPeer, toSeq(p.topics.keys), true)
|
||||||
|
|
||||||
|
|
|
@ -50,6 +50,7 @@ type
|
||||||
|
|
||||||
score*: float64
|
score*: float64
|
||||||
iWantBudget*: int
|
iWantBudget*: int
|
||||||
|
outbound*: bool
|
||||||
|
|
||||||
RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.}
|
RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.}
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
{.used.}
|
{.used.}
|
||||||
|
|
||||||
import testgossipinternal,
|
import testfloodsub,
|
||||||
testfloodsub,
|
|
||||||
testgossipsub,
|
testgossipsub,
|
||||||
testmcache,
|
testmcache,
|
||||||
testmessage
|
testmessage
|
||||||
|
|
Loading…
Reference in New Issue