Gossipsub: Rebalance mesh immediately when peer sub/unsub (#719)
This commit is contained in:
parent
543358b262
commit
3ffc03ed16
|
@ -435,6 +435,13 @@ method rpcHandler*(g: GossipSub,
|
|||
if rpcMsg.control.isSome():
|
||||
g.handleControl(peer, rpcMsg.control.unsafeGet())
|
||||
|
||||
# Now, check subscription to update the meshes if required
|
||||
for i in 0..<min(g.topicsHigh, rpcMsg.subscriptions.len):
|
||||
let topic = rpcMsg.subscriptions[i].topic
|
||||
if topic in g.topics and g.mesh.peers(topic) < g.parameters.dLow:
|
||||
# rebalance but don't update metrics here, we do that only in the heartbeat
|
||||
g.rebalanceMesh(topic, metrics = nil)
|
||||
|
||||
g.updateMetrics(rpcMsg)
|
||||
|
||||
method onTopicSubscription*(g: GossipSub, topic: string, subscribed: bool) =
|
||||
|
|
|
@ -32,18 +32,11 @@ proc `$`(peer: PubSubPeer): string = shortLog(peer)
|
|||
proc waitSub(sender, receiver: auto; key: string) {.async, gcsafe.} =
|
||||
if sender == receiver:
|
||||
return
|
||||
# turn things deterministic
|
||||
let timeout = Moment.now() + 5.seconds
|
||||
let fsub = GossipSub(sender)
|
||||
|
||||
# this is for testing purposes only
|
||||
# peers can be inside `mesh` and `fanout`, not just `gossipsub`
|
||||
var ceil = 15
|
||||
let fsub = GossipSub(sender)
|
||||
let ev = newAsyncEvent()
|
||||
fsub.heartbeatEvents.add(ev)
|
||||
|
||||
# await first heartbeat
|
||||
await ev.wait()
|
||||
ev.clear()
|
||||
|
||||
while (not fsub.gossipsub.hasKey(key) or
|
||||
not fsub.gossipsub.hasPeerId(key, receiver.peerInfo.peerId)) and
|
||||
(not fsub.mesh.hasKey(key) or
|
||||
|
@ -52,23 +45,19 @@ proc waitSub(sender, receiver: auto; key: string) {.async, gcsafe.} =
|
|||
not fsub.fanout.hasPeerId(key , receiver.peerInfo.peerId)):
|
||||
trace "waitSub sleeping..."
|
||||
|
||||
# await more heartbeats
|
||||
await ev.wait()
|
||||
ev.clear()
|
||||
# await
|
||||
await sleepAsync(5.milliseconds)
|
||||
doAssert Moment.now() < timeout, "waitSub timeout!"
|
||||
|
||||
dec ceil
|
||||
doAssert(ceil > 0, "waitSub timeout!")
|
||||
|
||||
template tryPublish(call: untyped, require: int, wait: Duration = 1.seconds, times: int = 10): untyped =
|
||||
template tryPublish(call: untyped, require: int, wait = 10.milliseconds, timeout = 5.seconds): untyped =
|
||||
var
|
||||
limit = times
|
||||
expiration = Moment.now() + timeout
|
||||
pubs = 0
|
||||
while pubs < require and limit > 0:
|
||||
while pubs < require and Moment.now() < expiration:
|
||||
pubs = pubs + call
|
||||
await sleepAsync(wait)
|
||||
limit.dec()
|
||||
if limit == 0:
|
||||
doAssert(false, "Failed to publish!")
|
||||
|
||||
doAssert pubs >= require, "Failed to publish!"
|
||||
|
||||
suite "GossipSub":
|
||||
teardown:
|
||||
|
@ -343,15 +332,15 @@ suite "GossipSub":
|
|||
await subscribeNodes(nodes)
|
||||
|
||||
nodes[1].subscribe("foobar", handler)
|
||||
await sleepAsync(10.seconds)
|
||||
|
||||
let gossip1 = GossipSub(nodes[0])
|
||||
let gossip2 = GossipSub(nodes[1])
|
||||
|
||||
check:
|
||||
"foobar" in gossip2.topics
|
||||
"foobar" in gossip1.gossipsub
|
||||
check await checkExpiring(
|
||||
"foobar" in gossip2.topics and
|
||||
"foobar" in gossip1.gossipsub and
|
||||
gossip1.gossipsub.hasPeerId("foobar", gossip2.peerInfo.peerId)
|
||||
)
|
||||
|
||||
await allFuturesThrowing(
|
||||
nodes[0].switch.stop(),
|
||||
|
@ -706,7 +695,7 @@ suite "GossipSub":
|
|||
tryPublish await wait(nodes[0].publish("foobar",
|
||||
toBytes("from node " &
|
||||
$nodes[0].peerInfo.peerId)),
|
||||
1.minutes), 1, 5.seconds
|
||||
1.minutes), 1
|
||||
|
||||
await wait(seenFut, 2.minutes)
|
||||
check: seen.len >= runs
|
||||
|
@ -756,7 +745,7 @@ suite "GossipSub":
|
|||
tryPublish await wait(nodes[0].publish("foobar",
|
||||
toBytes("from node " &
|
||||
$nodes[0].peerInfo.peerId)),
|
||||
1.minutes), 1, 5.seconds
|
||||
1.minutes), 1
|
||||
|
||||
await wait(seenFut, 5.minutes)
|
||||
check: seen.len >= runs
|
||||
|
|
|
@ -28,18 +28,11 @@ import ../helpers
|
|||
proc waitSub(sender, receiver: auto; key: string) {.async, gcsafe.} =
|
||||
if sender == receiver:
|
||||
return
|
||||
# turn things deterministic
|
||||
let timeout = Moment.now() + 5.seconds
|
||||
let fsub = GossipSub(sender)
|
||||
|
||||
# this is for testing purposes only
|
||||
# peers can be inside `mesh` and `fanout`, not just `gossipsub`
|
||||
var ceil = 15
|
||||
let fsub = GossipSub(sender)
|
||||
let ev = newAsyncEvent()
|
||||
fsub.heartbeatEvents.add(ev)
|
||||
|
||||
# await first heartbeat
|
||||
await ev.wait()
|
||||
ev.clear()
|
||||
|
||||
while (not fsub.gossipsub.hasKey(key) or
|
||||
not fsub.gossipsub.hasPeerId(key, receiver.peerInfo.peerId)) and
|
||||
(not fsub.mesh.hasKey(key) or
|
||||
|
@ -48,23 +41,19 @@ proc waitSub(sender, receiver: auto; key: string) {.async, gcsafe.} =
|
|||
not fsub.fanout.hasPeerId(key , receiver.peerInfo.peerId)):
|
||||
trace "waitSub sleeping..."
|
||||
|
||||
# await more heartbeats
|
||||
await ev.wait()
|
||||
ev.clear()
|
||||
# await
|
||||
await sleepAsync(5.milliseconds)
|
||||
doAssert Moment.now() < timeout, "waitSub timeout!"
|
||||
|
||||
dec ceil
|
||||
doAssert(ceil > 0, "waitSub timeout!")
|
||||
|
||||
template tryPublish(call: untyped, require: int, wait: Duration = 1.seconds, times: int = 10): untyped =
|
||||
template tryPublish(call: untyped, require: int, wait = 10.milliseconds, timeout = 10.seconds): untyped =
|
||||
var
|
||||
limit = times
|
||||
expiration = Moment.now() + timeout
|
||||
pubs = 0
|
||||
while pubs < require and limit > 0:
|
||||
while pubs < require and Moment.now() < expiration:
|
||||
pubs = pubs + call
|
||||
await sleepAsync(wait)
|
||||
limit.dec()
|
||||
if limit == 0:
|
||||
doAssert(false, "Failed to publish!")
|
||||
|
||||
doAssert pubs >= require, "Failed to publish!"
|
||||
|
||||
suite "GossipSub":
|
||||
teardown:
|
||||
|
@ -314,7 +303,7 @@ suite "GossipSub":
|
|||
tryPublish await wait(nodes[0].publish("foobar",
|
||||
toBytes("from node " &
|
||||
$nodes[0].peerInfo.peerId)),
|
||||
1.minutes), 1, 5.seconds
|
||||
1.minutes), 1, 5.seconds, 3.minutes
|
||||
|
||||
await wait(seenFut, 5.minutes)
|
||||
check: seen.len >= runs
|
||||
|
@ -337,10 +326,8 @@ suite "GossipSub":
|
|||
# Waiting 2 heartbeats
|
||||
|
||||
for _ in 0..1:
|
||||
for i in 0..<runs:
|
||||
if i mod 3 == 0:
|
||||
let evnt = newAsyncEvent()
|
||||
GossipSub(nodes[i]).heartbeatEvents &= evnt
|
||||
GossipSub(nodes[0]).heartbeatEvents &= evnt
|
||||
await evnt.wait()
|
||||
|
||||
# ensure peer stats are stored properly and kept properly
|
||||
|
@ -359,10 +346,8 @@ suite "GossipSub":
|
|||
# Waiting 2 heartbeats
|
||||
|
||||
for _ in 0..1:
|
||||
for i in 0..<runs:
|
||||
if i mod 3 == 0:
|
||||
let evnt = newAsyncEvent()
|
||||
GossipSub(nodes[i]).heartbeatEvents &= evnt
|
||||
GossipSub(nodes[0]).heartbeatEvents &= evnt
|
||||
await evnt.wait()
|
||||
|
||||
# ensure peer stats are stored properly and kept properly
|
||||
|
|
Loading…
Reference in New Issue