From 94e672ead0740c56adedfe6afecde0dfbb832bc9 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Tue, 1 Dec 2020 02:44:21 -0600 Subject: [PATCH] allow concurrent closeWithEOF (#466) * allow concurrent closeWithEOF * add dedicated closedWithEOF flag --- libp2p/stream/lpstream.nim | 11 ++++++++++ tests/testswitch.nim | 41 +++++++++++++++++++++++++++++++++++++- 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/libp2p/stream/lpstream.nim b/libp2p/stream/lpstream.nim index 89f199ff0..faebf473d 100644 --- a/libp2p/stream/lpstream.nim +++ b/libp2p/stream/lpstream.nim @@ -38,6 +38,7 @@ type objName*: string oid*: Oid dir*: Direction + closedWithEOF: bool # prevent concurrent calls LPStreamError* = object of CatchableError LPStreamIncompleteError* = object of LPStreamError @@ -280,6 +281,16 @@ proc closeWithEOF*(s: LPStream): Future[void] {.async.} = ## ## In particular, it must not be used when there is another concurrent read ## ongoing (which may be the case during cancellations)! + ## + + trace "Closing with EOF", s + if s.closedWithEOF: + trace "Already closed" + return + + # prevent any further calls to avoid triggering + # reading the stream twice (which should assert) + s.closedWithEOF = true await s.close() if s.atEof(): diff --git a/tests/testswitch.nim b/tests/testswitch.nim index 6e3ab23f4..4786f85ad 100644 --- a/tests/testswitch.nim +++ b/tests/testswitch.nim @@ -212,7 +212,6 @@ suite "Switch": check not switch1.isConnected(switch2.peerInfo) check not switch2.isConnected(switch1.peerInfo) - asyncTest "e2e should not leak on peer disconnect": var awaiters: seq[Future[void]] @@ -698,6 +697,46 @@ suite "Switch": # this needs to go at end await allFuturesThrowing(awaiters) + asyncTest "e2e calling closeWithEOF on the same stream should not assert": + let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() + + proc handle(conn: Connection, proto: string) {.async, gcsafe.} = + discard await conn.readLp(100) + + let testProto = new TestProto + testProto.codec = TestCodec + testProto.handler = handle + + let switch1 = newStandardSwitch(secureManagers = [SecureProtocol.Noise]) + switch1.mount(testProto) + + let switch2 = newStandardSwitch(secureManagers = [SecureProtocol.Noise]) + + var awaiters: seq[Future[void]] + awaiters.add(await switch1.start()) + + var peerId = PeerID.init(PrivateKey.random(ECDSA, rng[]).get()).get() + let conn = await switch2.dial(switch1.peerInfo, TestCodec) + + proc closeReader() {.async.} = + await conn.closeWithEOF() + + var readers: seq[Future[void]] + for i in 0..10: + readers.add(closeReader()) + + await allFuturesThrowing(readers) + checkTracker(LPChannelTrackerName) + checkTracker(SecureConnTrackerName) + checkTracker(ChronosStreamTrackerName) + + await allFuturesThrowing( + switch1.stop(), + switch2.stop()) + + # this needs to go at end + await allFuturesThrowing(awaiters) + asyncTest "connect to inexistent peer": let switch2 = newStandardSwitch(secureManagers = [SecureProtocol.Noise]) let sfut = await switch2.start()