This commit is contained in:
Giovanni Petrantoni 2020-07-29 09:41:34 +09:00
parent a34eee7ed4
commit 7946743ae5
2 changed files with 52 additions and 6 deletions

View File

@ -138,6 +138,8 @@ type
parameters*: GossipSubParams parameters*: GossipSubParams
topicParams*: Table[string, TopicParams] topicParams*: Table[string, TopicParams]
heartbeatEvents*: seq[AsyncEvent]
when not defined(release): when not defined(release):
prunedPeers: HashSet[PubSubPeer] prunedPeers: HashSet[PubSubPeer]
@ -549,7 +551,7 @@ proc updateScores(g: GossipSub) = # avoid async
proc heartbeat(g: GossipSub) {.async.} = proc heartbeat(g: GossipSub) {.async.} =
while g.heartbeatRunning: while g.heartbeatRunning:
try: try:
trace "running heartbeat" trace "running heartbeat", instance = cast[int](g)
g.updateScores() g.updateScores()
@ -586,6 +588,10 @@ proc heartbeat(g: GossipSub) {.async.} =
warn "exception ocurred in gossipsub heartbeat", exc = exc.msg, trace = exc.getStackTrace() warn "exception ocurred in gossipsub heartbeat", exc = exc.msg, trace = exc.getStackTrace()
assert(false, "exception ocurred in gossipsub heartbeat") assert(false, "exception ocurred in gossipsub heartbeat")
for trigger in g.heartbeatEvents:
trace "firing heartbeat event", instance = cast[int](g)
trigger.fire()
await sleepAsync(GossipSubHeartbeatInterval) await sleepAsync(GossipSubHeartbeatInterval)
method handleDisconnect*(g: GossipSub, peer: PubSubPeer) = method handleDisconnect*(g: GossipSub, peer: PubSubPeer) =

View File

@ -32,6 +32,13 @@ proc waitSub(sender, receiver: auto; key: string) {.async, gcsafe.} =
# peers can be inside `mesh` and `fanout`, not just `gossipsub` # peers can be inside `mesh` and `fanout`, not just `gossipsub`
var ceil = 15 var ceil = 15
let fsub = GossipSub(sender.pubSub.get()) let fsub = GossipSub(sender.pubSub.get())
let ev = newAsyncEvent()
fsub.heartbeatEvents.add(ev)
# await first heartbeat
await ev.wait()
ev.clear()
while (not fsub.gossipsub.hasKey(key) or while (not fsub.gossipsub.hasKey(key) or
not fsub.gossipsub.hasPeerID(key, receiver.peerInfo.id)) and not fsub.gossipsub.hasPeerID(key, receiver.peerInfo.id)) and
(not fsub.mesh.hasKey(key) or (not fsub.mesh.hasKey(key) or
@ -39,7 +46,11 @@ proc waitSub(sender, receiver: auto; key: string) {.async, gcsafe.} =
(not fsub.fanout.hasKey(key) or (not fsub.fanout.hasKey(key) or
not fsub.fanout.hasPeerID(key , receiver.peerInfo.id)): not fsub.fanout.hasPeerID(key , receiver.peerInfo.id)):
trace "waitSub sleeping..." trace "waitSub sleeping..."
await sleepAsync(1.seconds)
# await more heartbeats
await ev.wait()
ev.clear()
dec ceil dec ceil
doAssert(ceil > 0, "waitSub timeout!") doAssert(ceil > 0, "waitSub timeout!")
@ -77,6 +88,12 @@ suite "GossipSub":
await nodes[0].subscribe("foobar", handler) await nodes[0].subscribe("foobar", handler)
await nodes[1].subscribe("foobar", handler) await nodes[1].subscribe("foobar", handler)
var subs: seq[Future[void]]
subs &= waitSub(nodes[1], nodes[0], "foobar")
subs &= waitSub(nodes[0], nodes[1], "foobar")
await allFuturesThrowing(subs)
var validatorFut = newFuture[bool]() var validatorFut = newFuture[bool]()
proc validator(topic: string, proc validator(topic: string,
message: Message): message: Message):
@ -121,6 +138,19 @@ suite "GossipSub":
await nodes[0].subscribe("foobar", handler) await nodes[0].subscribe("foobar", handler)
await nodes[1].subscribe("foobar", handler) await nodes[1].subscribe("foobar", handler)
var subs: seq[Future[void]]
subs &= waitSub(nodes[1], nodes[0], "foobar")
subs &= waitSub(nodes[0], nodes[1], "foobar")
await allFuturesThrowing(subs)
let gossip1 = GossipSub(nodes[0].pubSub.get())
let gossip2 = GossipSub(nodes[1].pubSub.get())
check:
gossip1.mesh["foobar"].len == 1 and "foobar" notin gossip1.fanout
gossip2.mesh["foobar"].len == 1 and "foobar" notin gossip2.fanout
var validatorFut = newFuture[bool]() var validatorFut = newFuture[bool]()
proc validator(topic: string, proc validator(topic: string,
message: Message): message: Message):
@ -133,11 +163,21 @@ suite "GossipSub":
result = await validatorFut result = await validatorFut
let gossip1 = GossipSub(nodes[0].pubSub.get()) # gossip 1.1, gossip1 peer with negative score will be pruned in gossip2,
let gossip2 = GossipSub(nodes[1].pubSub.get()) # and so mesh will be empty
# wait 2 heartbeats
let ev1 = newAsyncEvent()
gossip1.heartbeatEvents.add(ev1)
let ev2 = newAsyncEvent()
gossip2.heartbeatEvents.add(ev2)
for _ in 0..1:
await allFuturesThrowing(ev1.wait(), ev2.wait())
ev1.clear()
ev2.clear()
check: check:
gossip1.mesh["foobar"].len == 1 and "foobar" notin gossip1.fanout gossip1.mesh["foobar"].len == 1 and "foobar" notin gossip1.fanout
gossip2.mesh["foobar"].len == 1 and "foobar" notin gossip2.fanout "foobar" notin gossip2.mesh and "foobar" notin gossip2.fanout
await allFuturesThrowing( await allFuturesThrowing(
nodes[0].stop(), nodes[0].stop(),
@ -418,7 +458,7 @@ suite "GossipSub":
subs &= dialer.subscribe("foobar", handler) subs &= dialer.subscribe("foobar", handler)
await allFuturesThrowing(subs) await allFuturesThrowing(subs).wait(30.seconds)
tryPublish await wait(nodes[0].publish("foobar", tryPublish await wait(nodes[0].publish("foobar",
cast[seq[byte]]("from node " & cast[seq[byte]]("from node " &