allow concurrent closeWithEOF (#466)

* allow concurrent closeWithEOF

* add dedicated closedWithEOF flag
This commit is contained in:
Dmitriy Ryajov 2020-12-01 02:44:21 -06:00 committed by GitHub
parent 5c2a54bdd9
commit 94e672ead0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 51 additions and 1 deletions

View File

@ -38,6 +38,7 @@ type
objName*: string objName*: string
oid*: Oid oid*: Oid
dir*: Direction dir*: Direction
closedWithEOF: bool # prevent concurrent calls
LPStreamError* = object of CatchableError LPStreamError* = object of CatchableError
LPStreamIncompleteError* = object of LPStreamError LPStreamIncompleteError* = object of LPStreamError
@ -280,6 +281,16 @@ proc closeWithEOF*(s: LPStream): Future[void] {.async.} =
## ##
## In particular, it must not be used when there is another concurrent read ## In particular, it must not be used when there is another concurrent read
## ongoing (which may be the case during cancellations)! ## ongoing (which may be the case during cancellations)!
##
trace "Closing with EOF", s
if s.closedWithEOF:
trace "Already closed"
return
# prevent any further calls to avoid triggering
# reading the stream twice (which should assert)
s.closedWithEOF = true
await s.close() await s.close()
if s.atEof(): if s.atEof():

View File

@ -212,7 +212,6 @@ suite "Switch":
check not switch1.isConnected(switch2.peerInfo) check not switch1.isConnected(switch2.peerInfo)
check not switch2.isConnected(switch1.peerInfo) check not switch2.isConnected(switch1.peerInfo)
asyncTest "e2e should not leak on peer disconnect": asyncTest "e2e should not leak on peer disconnect":
var awaiters: seq[Future[void]] var awaiters: seq[Future[void]]
@ -698,6 +697,46 @@ suite "Switch":
# this needs to go at end # this needs to go at end
await allFuturesThrowing(awaiters) await allFuturesThrowing(awaiters)
asyncTest "e2e calling closeWithEOF on the same stream should not assert":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
proc handle(conn: Connection, proto: string) {.async, gcsafe.} =
discard await conn.readLp(100)
let testProto = new TestProto
testProto.codec = TestCodec
testProto.handler = handle
let switch1 = newStandardSwitch(secureManagers = [SecureProtocol.Noise])
switch1.mount(testProto)
let switch2 = newStandardSwitch(secureManagers = [SecureProtocol.Noise])
var awaiters: seq[Future[void]]
awaiters.add(await switch1.start())
var peerId = PeerID.init(PrivateKey.random(ECDSA, rng[]).get()).get()
let conn = await switch2.dial(switch1.peerInfo, TestCodec)
proc closeReader() {.async.} =
await conn.closeWithEOF()
var readers: seq[Future[void]]
for i in 0..10:
readers.add(closeReader())
await allFuturesThrowing(readers)
checkTracker(LPChannelTrackerName)
checkTracker(SecureConnTrackerName)
checkTracker(ChronosStreamTrackerName)
await allFuturesThrowing(
switch1.stop(),
switch2.stop())
# this needs to go at end
await allFuturesThrowing(awaiters)
asyncTest "connect to inexistent peer": asyncTest "connect to inexistent peer":
let switch2 = newStandardSwitch(secureManagers = [SecureProtocol.Noise]) let switch2 = newStandardSwitch(secureManagers = [SecureProtocol.Noise])
let sfut = await switch2.start() let sfut = await switch2.start()