Merge branch 'master' into gossip-one-one

This commit is contained in:
Giovanni Petrantoni 2020-07-20 15:57:54 +09:00
commit 81f7413e87
2 changed files with 71 additions and 20 deletions

View File

@ -80,6 +80,14 @@ template withEOFExceptions(body: untyped): untyped =
except LPStreamIncompleteError as exc:
trace "incomplete message", exc = exc.msg
proc cleanupTimer(s: LPChannel) {.async.} =
## cleanup timers
##
if not(isNil(s.timerFut)) and
not(s.timerFut.finished):
s.timerFut.cancel()
await s.timerTaskFut
proc closeMessage(s: LPChannel) {.async.} =
logScope:
id = s.id
@ -146,6 +154,8 @@ proc closeRemote*(s: LPChannel) {.async.} =
# call to avoid leaks
await procCall BufferStream(s).close() # close parent bufferstream
await s.cleanupTimer()
trace "channel closed on EOF"
except CancelledError as exc:
raise exc
@ -187,6 +197,8 @@ method reset*(s: LPChannel) {.base, async, gcsafe.} =
s.isEof = true
s.closedLocal = true
await s.cleanupTimer()
except CancelledError as exc:
raise exc
except CatchableError as exc:
@ -214,6 +226,7 @@ method close*(s: LPChannel) {.async, gcsafe.} =
await s.closeMessage().wait(2.minutes)
if s.atEof: # already closed by remote close parent buffer immediately
await procCall BufferStream(s).close()
await s.cleanupTimer()
except CancelledError as exc:
await s.reset()
raise exc
@ -226,18 +239,6 @@ method close*(s: LPChannel) {.async, gcsafe.} =
s.closedLocal = true
asyncCheck closeInternal()
proc cleanupOnClose(s: LPChannel) {.async.} =
## await this stream's close event
## to cleanup timers and other resources
##
await s.closeEvent.wait()
if not(isNil(s.timerFut)) and
not(s.timerFut.finished):
s.timerFut.cancel()
await s.timerTaskFut
proc timeoutMonitor(s: LPChannel) {.async.} =
## monitor the channel for innactivity
##
@ -337,10 +338,6 @@ proc init*(
when chronicles.enabledLogLevel == LogLevel.TRACE:
chann.name = if chann.name.len > 0: chann.name else: $chann.oid
# launch task to cancel and cleanup
# timer on stream close
asyncCheck chann.cleanupOnClose()
chann.timerTaskFut = chann.timeoutMonitor()
trace "created new lpchannel"

View File

@ -87,7 +87,15 @@ suite "GossipSub":
nodes[1].addValidator("foobar", validator)
tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1
result = (await validatorFut) and (await handlerFut)
let gossip1 = GossipSub(nodes[0].pubSub.get())
let gossip2 = GossipSub(nodes[1].pubSub.get())
check:
gossip1.mesh["foobar"].len == 1 and "foobar" notin gossip1.fanout
gossip2.mesh["foobar"].len == 1 and "foobar" notin gossip2.fanout
await allFuturesThrowing(
nodes[0].stop(),
nodes[1].stop())
@ -108,6 +116,7 @@ suite "GossipSub":
await subscribeNodes(nodes)
await nodes[0].subscribe("foobar", handler)
await nodes[1].subscribe("foobar", handler)
var validatorFut = newFuture[bool]()
@ -121,6 +130,13 @@ suite "GossipSub":
tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1
result = await validatorFut
let gossip1 = GossipSub(nodes[0].pubSub.get())
let gossip2 = GossipSub(nodes[1].pubSub.get())
check:
gossip1.mesh["foobar"].len == 1 and "foobar" notin gossip1.fanout
gossip2.mesh["foobar"].len == 1 and "foobar" notin gossip2.fanout
await allFuturesThrowing(
nodes[0].stop(),
nodes[1].stop())
@ -161,6 +177,15 @@ suite "GossipSub":
tryPublish await nodes[0].publish("bar", "Hello!".toBytes()), 1
result = ((await passed) and (await failed) and (await handlerFut))
let gossip1 = GossipSub(nodes[0].pubSub.get())
let gossip2 = GossipSub(nodes[1].pubSub.get())
check:
"foo" notin gossip1.mesh and gossip1.fanout["foo"].len == 1
"foo" notin gossip2.mesh and "foo" notin gossip2.fanout
"bar" notin gossip1.mesh and gossip1.fanout["bar"].len == 1
"bar" notin gossip2.mesh and "bar" notin gossip2.fanout
await allFuturesThrowing(
nodes[0].stop(),
nodes[1].stop())
@ -281,10 +306,13 @@ suite "GossipSub":
tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1
var gossipSub1: GossipSub = GossipSub(nodes[0].pubSub.get())
var gossip1: GossipSub = GossipSub(nodes[0].pubSub.get())
var gossip2: GossipSub = GossipSub(nodes[1].pubSub.get())
check:
"foobar" in gossipSub1.gossipsub
"foobar" in gossip1.gossipsub
gossip1.fanout.hasPeerID("foobar", gossip2.peerInfo.id)
not gossip1.mesh.hasPeerID("foobar", gossip2.peerInfo.id)
await passed.wait(2.seconds)
@ -318,6 +346,7 @@ suite "GossipSub":
await subscribeNodes(nodes)
await nodes[0].subscribe("foobar", handler)
await nodes[1].subscribe("foobar", handler)
await waitSub(nodes[0], nodes[1], "foobar")
@ -325,6 +354,17 @@ suite "GossipSub":
result = await passed
var gossip1: GossipSub = GossipSub(nodes[0].pubSub.get())
var gossip2: GossipSub = GossipSub(nodes[1].pubSub.get())
check:
"foobar" in gossip1.gossipsub
"foobar" in gossip2.gossipsub
gossip1.mesh.hasPeerID("foobar", gossip2.peerInfo.id)
not gossip1.fanout.hasPeerID("foobar", gossip2.peerInfo.id)
gossip2.mesh.hasPeerID("foobar", gossip1.peerInfo.id)
not gossip2.fanout.hasPeerID("foobar", gossip1.peerInfo.id)
await nodes[0].stop()
await nodes[1].stop()
await allFuturesThrowing(wait)
@ -375,6 +415,13 @@ suite "GossipSub":
for k, v in seen.pairs:
check: v >= 1
for node in nodes:
var gossip: GossipSub = GossipSub(node.pubSub.get())
check:
"foobar" in gossip.gossipsub
gossip.fanout.len == 0
gossip.mesh["foobar"].len > 0
await allFuturesThrowing(nodes.mapIt(it.stop()))
await allFuturesThrowing(awaitters)
result = true
@ -394,7 +441,7 @@ suite "GossipSub":
secureManagers = [SecureProtocol.Secio])
awaitters.add((await nodes[i].start()))
await subscribeSparseNodes(nodes, 1) # TODO: figure out better sparse mesh
await subscribeSparseNodes(nodes)
var seen: Table[string, int]
var subs: seq[Future[void]]
@ -419,13 +466,20 @@ suite "GossipSub":
tryPublish await wait(nodes[0].publish("foobar",
cast[seq[byte]]("from node " &
nodes[1].peerInfo.id)),
1.minutes), 3, 5.seconds
1.minutes), 2, 5.seconds
await wait(seenFut, 5.minutes)
check: seen.len >= runs
for k, v in seen.pairs:
check: v >= 1
for node in nodes:
var gossip: GossipSub = GossipSub(node.pubSub.get())
check:
"foobar" in gossip.gossipsub
gossip.fanout.len == 0
gossip.mesh["foobar"].len > 0
await allFuturesThrowing(nodes.mapIt(it.stop()))
await allFuturesThrowing(awaitters)
result = true