From 3ffc03ed161d3a7741388d3ff302f7885fdf854b Mon Sep 17 00:00:00 2001 From: Tanguy Date: Fri, 2 Sep 2022 10:24:54 +0200 Subject: [PATCH] Gossipsub: Rebalance mesh immediately when peer sub/unsub (#719) --- libp2p/protocols/pubsub/gossipsub.nim | 7 ++++ tests/pubsub/testgossipsub.nim | 45 +++++++++-------------- tests/pubsub/testgossipsub2.nim | 51 ++++++++++----------------- 3 files changed, 42 insertions(+), 61 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 9d816795e..fb1873c8c 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -435,6 +435,13 @@ method rpcHandler*(g: GossipSub, if rpcMsg.control.isSome(): g.handleControl(peer, rpcMsg.control.unsafeGet()) + # Now, check subscription to update the meshes if required + for i in 0.. 0, "waitSub timeout!") - -template tryPublish(call: untyped, require: int, wait: Duration = 1.seconds, times: int = 10): untyped = +template tryPublish(call: untyped, require: int, wait = 10.milliseconds, timeout = 5.seconds): untyped = var - limit = times + expiration = Moment.now() + timeout pubs = 0 - while pubs < require and limit > 0: + while pubs < require and Moment.now() < expiration: pubs = pubs + call await sleepAsync(wait) - limit.dec() - if limit == 0: - doAssert(false, "Failed to publish!") + + doAssert pubs >= require, "Failed to publish!" suite "GossipSub": teardown: @@ -343,15 +332,15 @@ suite "GossipSub": await subscribeNodes(nodes) nodes[1].subscribe("foobar", handler) - await sleepAsync(10.seconds) let gossip1 = GossipSub(nodes[0]) let gossip2 = GossipSub(nodes[1]) - check: - "foobar" in gossip2.topics - "foobar" in gossip1.gossipsub + check await checkExpiring( + "foobar" in gossip2.topics and + "foobar" in gossip1.gossipsub and gossip1.gossipsub.hasPeerId("foobar", gossip2.peerInfo.peerId) + ) await allFuturesThrowing( nodes[0].switch.stop(), @@ -706,7 +695,7 @@ suite "GossipSub": tryPublish await wait(nodes[0].publish("foobar", toBytes("from node " & $nodes[0].peerInfo.peerId)), - 1.minutes), 1, 5.seconds + 1.minutes), 1 await wait(seenFut, 2.minutes) check: seen.len >= runs @@ -756,7 +745,7 @@ suite "GossipSub": tryPublish await wait(nodes[0].publish("foobar", toBytes("from node " & $nodes[0].peerInfo.peerId)), - 1.minutes), 1, 5.seconds + 1.minutes), 1 await wait(seenFut, 5.minutes) check: seen.len >= runs diff --git a/tests/pubsub/testgossipsub2.nim b/tests/pubsub/testgossipsub2.nim index fdb663cfc..3f21196e0 100644 --- a/tests/pubsub/testgossipsub2.nim +++ b/tests/pubsub/testgossipsub2.nim @@ -28,18 +28,11 @@ import ../helpers proc waitSub(sender, receiver: auto; key: string) {.async, gcsafe.} = if sender == receiver: return - # turn things deterministic + let timeout = Moment.now() + 5.seconds + let fsub = GossipSub(sender) + # this is for testing purposes only # peers can be inside `mesh` and `fanout`, not just `gossipsub` - var ceil = 15 - let fsub = GossipSub(sender) - let ev = newAsyncEvent() - fsub.heartbeatEvents.add(ev) - - # await first heartbeat - await ev.wait() - ev.clear() - while (not fsub.gossipsub.hasKey(key) or not fsub.gossipsub.hasPeerId(key, receiver.peerInfo.peerId)) and (not fsub.mesh.hasKey(key) or @@ -48,23 +41,19 @@ proc waitSub(sender, receiver: auto; key: string) {.async, gcsafe.} = not fsub.fanout.hasPeerId(key , receiver.peerInfo.peerId)): trace "waitSub sleeping..." - # await more heartbeats - await ev.wait() - ev.clear() + # await + await sleepAsync(5.milliseconds) + doAssert Moment.now() < timeout, "waitSub timeout!" - dec ceil - doAssert(ceil > 0, "waitSub timeout!") - -template tryPublish(call: untyped, require: int, wait: Duration = 1.seconds, times: int = 10): untyped = +template tryPublish(call: untyped, require: int, wait = 10.milliseconds, timeout = 10.seconds): untyped = var - limit = times + expiration = Moment.now() + timeout pubs = 0 - while pubs < require and limit > 0: + while pubs < require and Moment.now() < expiration: pubs = pubs + call await sleepAsync(wait) - limit.dec() - if limit == 0: - doAssert(false, "Failed to publish!") + + doAssert pubs >= require, "Failed to publish!" suite "GossipSub": teardown: @@ -314,7 +303,7 @@ suite "GossipSub": tryPublish await wait(nodes[0].publish("foobar", toBytes("from node " & $nodes[0].peerInfo.peerId)), - 1.minutes), 1, 5.seconds + 1.minutes), 1, 5.seconds, 3.minutes await wait(seenFut, 5.minutes) check: seen.len >= runs @@ -337,11 +326,9 @@ suite "GossipSub": # Waiting 2 heartbeats for _ in 0..1: - for i in 0..