diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 4de07383e..2c75f21bd 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -138,6 +138,8 @@ type parameters*: GossipSubParams topicParams*: Table[string, TopicParams] + heartbeatEvents*: seq[AsyncEvent] + when not defined(release): prunedPeers: HashSet[PubSubPeer] @@ -549,7 +551,7 @@ proc updateScores(g: GossipSub) = # avoid async proc heartbeat(g: GossipSub) {.async.} = while g.heartbeatRunning: try: - trace "running heartbeat" + trace "running heartbeat", instance = cast[int](g) g.updateScores() @@ -586,6 +588,10 @@ proc heartbeat(g: GossipSub) {.async.} = warn "exception ocurred in gossipsub heartbeat", exc = exc.msg, trace = exc.getStackTrace() assert(false, "exception ocurred in gossipsub heartbeat") + for trigger in g.heartbeatEvents: + trace "firing heartbeat event", instance = cast[int](g) + trigger.fire() + await sleepAsync(GossipSubHeartbeatInterval) method handleDisconnect*(g: GossipSub, peer: PubSubPeer) = diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 6ef20f760..b918913bb 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -32,6 +32,13 @@ proc waitSub(sender, receiver: auto; key: string) {.async, gcsafe.} = # peers can be inside `mesh` and `fanout`, not just `gossipsub` var ceil = 15 let fsub = GossipSub(sender.pubSub.get()) + let ev = newAsyncEvent() + fsub.heartbeatEvents.add(ev) + + # await first heartbeat + await ev.wait() + ev.clear() + while (not fsub.gossipsub.hasKey(key) or not fsub.gossipsub.hasPeerID(key, receiver.peerInfo.id)) and (not fsub.mesh.hasKey(key) or @@ -39,7 +46,11 @@ proc waitSub(sender, receiver: auto; key: string) {.async, gcsafe.} = (not fsub.fanout.hasKey(key) or not fsub.fanout.hasPeerID(key , receiver.peerInfo.id)): trace "waitSub sleeping..." - await sleepAsync(1.seconds) + + # await more heartbeats + await ev.wait() + ev.clear() + dec ceil doAssert(ceil > 0, "waitSub timeout!") @@ -77,6 +88,12 @@ suite "GossipSub": await nodes[0].subscribe("foobar", handler) await nodes[1].subscribe("foobar", handler) + var subs: seq[Future[void]] + subs &= waitSub(nodes[1], nodes[0], "foobar") + subs &= waitSub(nodes[0], nodes[1], "foobar") + + await allFuturesThrowing(subs) + var validatorFut = newFuture[bool]() proc validator(topic: string, message: Message): @@ -121,6 +138,19 @@ suite "GossipSub": await nodes[0].subscribe("foobar", handler) await nodes[1].subscribe("foobar", handler) + var subs: seq[Future[void]] + subs &= waitSub(nodes[1], nodes[0], "foobar") + subs &= waitSub(nodes[0], nodes[1], "foobar") + + await allFuturesThrowing(subs) + + let gossip1 = GossipSub(nodes[0].pubSub.get()) + let gossip2 = GossipSub(nodes[1].pubSub.get()) + + check: + gossip1.mesh["foobar"].len == 1 and "foobar" notin gossip1.fanout + gossip2.mesh["foobar"].len == 1 and "foobar" notin gossip2.fanout + var validatorFut = newFuture[bool]() proc validator(topic: string, message: Message): @@ -133,11 +163,21 @@ suite "GossipSub": result = await validatorFut - let gossip1 = GossipSub(nodes[0].pubSub.get()) - let gossip2 = GossipSub(nodes[1].pubSub.get()) + # gossip 1.1, gossip1 peer with negative score will be pruned in gossip2, + # and so mesh will be empty + # wait 2 heartbeats + let ev1 = newAsyncEvent() + gossip1.heartbeatEvents.add(ev1) + let ev2 = newAsyncEvent() + gossip2.heartbeatEvents.add(ev2) + for _ in 0..1: + await allFuturesThrowing(ev1.wait(), ev2.wait()) + ev1.clear() + ev2.clear() + check: gossip1.mesh["foobar"].len == 1 and "foobar" notin gossip1.fanout - gossip2.mesh["foobar"].len == 1 and "foobar" notin gossip2.fanout + "foobar" notin gossip2.mesh and "foobar" notin gossip2.fanout await allFuturesThrowing( nodes[0].stop(), @@ -418,7 +458,7 @@ suite "GossipSub": subs &= dialer.subscribe("foobar", handler) - await allFuturesThrowing(subs) + await allFuturesThrowing(subs).wait(30.seconds) tryPublish await wait(nodes[0].publish("foobar", cast[seq[byte]]("from node " &