From 38eb36efaee09551e0cd6c1d4530c9abfe9cb322 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Sat, 18 Jul 2020 11:00:44 -0600 Subject: [PATCH 1/2] don't use close event to stop timer (#280) --- libp2p/muxers/mplex/lpchannel.nim | 29 +++++++++++++---------------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/libp2p/muxers/mplex/lpchannel.nim b/libp2p/muxers/mplex/lpchannel.nim index 43ec0a21a..6fb6221d1 100644 --- a/libp2p/muxers/mplex/lpchannel.nim +++ b/libp2p/muxers/mplex/lpchannel.nim @@ -80,6 +80,14 @@ template withEOFExceptions(body: untyped): untyped = except LPStreamIncompleteError as exc: trace "incomplete message", exc = exc.msg +proc cleanupTimer(s: LPChannel) {.async.} = + ## cleanup timers + ## + if not(isNil(s.timerFut)) and + not(s.timerFut.finished): + s.timerFut.cancel() + await s.timerTaskFut + proc closeMessage(s: LPChannel) {.async.} = logScope: id = s.id @@ -146,6 +154,8 @@ proc closeRemote*(s: LPChannel) {.async.} = # call to avoid leaks await procCall BufferStream(s).close() # close parent bufferstream + await s.cleanupTimer() + trace "channel closed on EOF" except CancelledError as exc: raise exc @@ -187,6 +197,8 @@ method reset*(s: LPChannel) {.base, async, gcsafe.} = s.isEof = true s.closedLocal = true + await s.cleanupTimer() + except CancelledError as exc: raise exc except CatchableError as exc: @@ -214,6 +226,7 @@ method close*(s: LPChannel) {.async, gcsafe.} = await s.closeMessage().wait(2.minutes) if s.atEof: # already closed by remote close parent buffer immediately await procCall BufferStream(s).close() + await s.cleanupTimer() except CancelledError as exc: await s.reset() raise exc @@ -226,18 +239,6 @@ method close*(s: LPChannel) {.async, gcsafe.} = s.closedLocal = true asyncCheck closeInternal() -proc cleanupOnClose(s: LPChannel) {.async.} = - ## await this stream's close event - ## to cleanup timers and other resources - ## - - await s.closeEvent.wait() - - if not(isNil(s.timerFut)) and - not(s.timerFut.finished): - s.timerFut.cancel() - await s.timerTaskFut - proc timeoutMonitor(s: LPChannel) {.async.} = ## monitor the channel for innactivity ## @@ -337,10 +338,6 @@ proc init*( when chronicles.enabledLogLevel == LogLevel.TRACE: chann.name = if chann.name.len > 0: chann.name else: $chann.oid - # launch task to cancel and cleanup - # timer on stream close - asyncCheck chann.cleanupOnClose() - chann.timerTaskFut = chann.timeoutMonitor() trace "created new lpchannel" From c3af7659b09385aacace2941edfceed15abc6231 Mon Sep 17 00:00:00 2001 From: Giovanni Petrantoni Date: Mon, 20 Jul 2020 15:55:00 +0900 Subject: [PATCH 2/2] Add more checks and fix some issues in gossip tests (#281) --- tests/pubsub/testgossipsub.nim | 62 +++++++++++++++++++++++++++++++--- 1 file changed, 58 insertions(+), 4 deletions(-) diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 5e7a80c1e..651aa2273 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -87,7 +87,15 @@ suite "GossipSub": nodes[1].addValidator("foobar", validator) tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1 + result = (await validatorFut) and (await handlerFut) + + let gossip1 = GossipSub(nodes[0].pubSub.get()) + let gossip2 = GossipSub(nodes[1].pubSub.get()) + check: + gossip1.mesh["foobar"].len == 1 and "foobar" notin gossip1.fanout + gossip2.mesh["foobar"].len == 1 and "foobar" notin gossip2.fanout + await allFuturesThrowing( nodes[0].stop(), nodes[1].stop()) @@ -108,6 +116,7 @@ suite "GossipSub": await subscribeNodes(nodes) + await nodes[0].subscribe("foobar", handler) await nodes[1].subscribe("foobar", handler) var validatorFut = newFuture[bool]() @@ -121,6 +130,13 @@ suite "GossipSub": tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1 result = await validatorFut + + let gossip1 = GossipSub(nodes[0].pubSub.get()) + let gossip2 = GossipSub(nodes[1].pubSub.get()) + check: + gossip1.mesh["foobar"].len == 1 and "foobar" notin gossip1.fanout + gossip2.mesh["foobar"].len == 1 and "foobar" notin gossip2.fanout + await allFuturesThrowing( nodes[0].stop(), nodes[1].stop()) @@ -161,6 +177,15 @@ suite "GossipSub": tryPublish await nodes[0].publish("bar", "Hello!".toBytes()), 1 result = ((await passed) and (await failed) and (await handlerFut)) + + let gossip1 = GossipSub(nodes[0].pubSub.get()) + let gossip2 = GossipSub(nodes[1].pubSub.get()) + check: + "foo" notin gossip1.mesh and gossip1.fanout["foo"].len == 1 + "foo" notin gossip2.mesh and "foo" notin gossip2.fanout + "bar" notin gossip1.mesh and gossip1.fanout["bar"].len == 1 + "bar" notin gossip2.mesh and "bar" notin gossip2.fanout + await allFuturesThrowing( nodes[0].stop(), nodes[1].stop()) @@ -281,10 +306,13 @@ suite "GossipSub": tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1 - var gossipSub1: GossipSub = GossipSub(nodes[0].pubSub.get()) + var gossip1: GossipSub = GossipSub(nodes[0].pubSub.get()) + var gossip2: GossipSub = GossipSub(nodes[1].pubSub.get()) check: - "foobar" in gossipSub1.gossipsub + "foobar" in gossip1.gossipsub + gossip1.fanout.hasPeerID("foobar", gossip2.peerInfo.id) + not gossip1.mesh.hasPeerID("foobar", gossip2.peerInfo.id) await passed.wait(2.seconds) @@ -314,6 +342,7 @@ suite "GossipSub": await subscribeNodes(nodes) + await nodes[0].subscribe("foobar", handler) await nodes[1].subscribe("foobar", handler) await waitSub(nodes[0], nodes[1], "foobar") @@ -321,6 +350,17 @@ suite "GossipSub": result = await passed + var gossip1: GossipSub = GossipSub(nodes[0].pubSub.get()) + var gossip2: GossipSub = GossipSub(nodes[1].pubSub.get()) + + check: + "foobar" in gossip1.gossipsub + "foobar" in gossip2.gossipsub + gossip1.mesh.hasPeerID("foobar", gossip2.peerInfo.id) + not gossip1.fanout.hasPeerID("foobar", gossip2.peerInfo.id) + gossip2.mesh.hasPeerID("foobar", gossip1.peerInfo.id) + not gossip2.fanout.hasPeerID("foobar", gossip1.peerInfo.id) + await nodes[0].stop() await nodes[1].stop() await allFuturesThrowing(wait) @@ -371,6 +411,13 @@ suite "GossipSub": for k, v in seen.pairs: check: v >= 1 + for node in nodes: + var gossip: GossipSub = GossipSub(node.pubSub.get()) + check: + "foobar" in gossip.gossipsub + gossip.fanout.len == 0 + gossip.mesh["foobar"].len > 0 + await allFuturesThrowing(nodes.mapIt(it.stop())) await allFuturesThrowing(awaitters) result = true @@ -390,7 +437,7 @@ suite "GossipSub": secureManagers = [SecureProtocol.Secio]) awaitters.add((await nodes[i].start())) - await subscribeSparseNodes(nodes, 1) # TODO: figure out better sparse mesh + await subscribeSparseNodes(nodes) var seen: Table[string, int] var subs: seq[Future[void]] @@ -415,12 +462,19 @@ suite "GossipSub": tryPublish await wait(nodes[0].publish("foobar", cast[seq[byte]]("from node " & nodes[1].peerInfo.id)), - 1.minutes), 3, 5.seconds + 1.minutes), 2, 5.seconds await wait(seenFut, 5.minutes) check: seen.len >= runs for k, v in seen.pairs: check: v >= 1 + + for node in nodes: + var gossip: GossipSub = GossipSub(node.pubSub.get()) + check: + "foobar" in gossip.gossipsub + gossip.fanout.len == 0 + gossip.mesh["foobar"].len > 0 await allFuturesThrowing(nodes.mapIt(it.stop())) await allFuturesThrowing(awaitters)