don't hang reset on concurrent push and reads

This commit is contained in:
Dmitriy Ryajov 2020-11-19 22:39:30 -06:00
parent c42009d56e
commit f358e6a40c
No known key found for this signature in database
GPG Key ID: DA8C680CE7C657A4
2 changed files with 48 additions and 2 deletions

View File

@ -95,6 +95,24 @@ proc reset*(s: LPChannel) {.async, gcsafe.} =
# Make sure to drain any ongoing pushes - there's already at least one item
# more in the queue already so any ongoing reads shouldn't interfere
# Notably, popFirst is not fair - which reader/writer gets woken up depends
# if no reader are attached, attempt to drain
# the queue, otherwise, wait for the reader
# to complete and drain whatever left
# afterwards.
if s.reading:
# If a reader is attached, schedule a future to be
# completed as soon as possible, but after the ongoing
# read (either at the end of this poll or sometime during
# the next poll()), otherwise we'll schedule a pop after
# the read which will never complete because the reader
# steals it from under us.
var w = newFuture[void]()
proc wakeup(p: pointer = nil) = w.complete()
callSoon(wakeup)
await w
continue
discard await s.readQueue.popFirst()
if s.readQueue.len == 0 and s.reading:

View File

@ -111,7 +111,7 @@ suite "Mplex":
var data = newSeq[byte](6)
await chann.close() # closing channel
# should be able to read on local clsoe
# should be able to read on local close
await chann.readExactly(addr data[0], 3)
# closing remote end
let closeFut = chann.pushEof()
@ -300,6 +300,34 @@ suite "Mplex":
check await allFutures(rfut, rfut2, wfut, wfut2).withTimeout(100.millis)
await conn.close()
asyncTest "should not hang on concurrent push/reads":
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
let
conn = newBufferStream(writeHandler)
chann = LPChannel.init(1, conn, true)
var data = newSeq[byte](1)
proc writer() {.async.} =
await chann.pushData(@[0'u8])
await chann.pushData(@[0'u8])
await chann.pushData(@[0'u8])
proc reader() {.async.} =
await chann.readExactly(addr data[0], 1)
await chann.readExactly(addr data[0], 1)
let rw = @[writer(), reader()] # order of execution is important!!!
await chann.close()
check await chann.reset() # this would hang
.withTimeout(100.millis)
check await allFuturesThrowing(
allFinished(rw))
.withTimeout(100.millis)
await conn.close()
asyncTest "channel should fail writing":
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
let
@ -861,7 +889,7 @@ suite "Mplex":
transport2.stop())
await acceptFut
asyncTest "canceling listening connection should close both ends":
asyncTest "closing listening connection should close both ends":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let transport1 = TcpTransport.init()