Fix deadlock for pending write() calls on transport close. (#139)

Add tests for read() and write() deadlocks.
This commit is contained in:
Eugene Kabanov 2020-11-18 11:30:33 +02:00 committed by GitHub
parent 879c917242
commit ac9b3e304f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 362 additions and 296 deletions

View File

@ -495,6 +495,9 @@ template getServerUseClosedError*(): ref TransportUseClosedError =
template getTransportTooManyError*(): ref TransportTooManyError =
newException(TransportTooManyError, "Too many open transports!")
template getTransportUseClosedError*(): ref TransportUseClosedError =
newException(TransportUseClosedError, "Transport is already closed!")
template getTransportOsError*(err: OSErrorCode): ref TransportOsError =
var msg = "(" & $int(err) & ") " & osErrorMsg(err)
var tre = newException(TransportOsError, msg)

View File

@ -322,198 +322,204 @@ when defined(windows):
var ovl = cast[PtrCustomOverlapped](udata)
var transp = cast[StreamTransport](ovl.data.udata)
while len(transp.queue) > 0:
if WritePending in transp.state:
## Continuation
transp.state.excl(WritePending)
let err = transp.wovl.data.errCode
if err == OSErrorCode(-1):
bytesCount = transp.wovl.data.bytesCount
var vector = transp.queue.popFirst()
if bytesCount == 0:
if not(vector.writer.finished()):
vector.writer.complete(0)
else:
if transp.kind == TransportKind.Socket:
if vector.kind == VectorKind.DataBuffer:
if bytesCount < transp.wwsabuf.len:
vector.shiftVectorBuffer(bytesCount)
transp.queue.addFirst(vector)
if WriteClosed in transp.state:
transp.state.excl(WritePending)
transp.state.incl({WritePaused})
let error = getTransportUseClosedError()
failPendingWriteQueue(transp.queue, error)
else:
while len(transp.queue) > 0:
if WritePending in transp.state:
## Continuation
transp.state.excl(WritePending)
let err = transp.wovl.data.errCode
if err == OSErrorCode(-1):
bytesCount = transp.wovl.data.bytesCount
var vector = transp.queue.popFirst()
if bytesCount == 0:
if not(vector.writer.finished()):
vector.writer.complete(0)
else:
if transp.kind == TransportKind.Socket:
if vector.kind == VectorKind.DataBuffer:
if bytesCount < transp.wwsabuf.len:
vector.shiftVectorBuffer(bytesCount)
transp.queue.addFirst(vector)
else:
if not(vector.writer.finished()):
vector.writer.complete(transp.wwsabuf.len)
else:
if not(vector.writer.finished()):
vector.writer.complete(transp.wwsabuf.len)
else:
if uint(bytesCount) < getFileSize(vector):
vector.shiftVectorFile(bytesCount)
transp.queue.addFirst(vector)
else:
if not(vector.writer.finished()):
vector.writer.complete(int(getFileSize(vector)))
elif transp.kind == TransportKind.Pipe:
if vector.kind == VectorKind.DataBuffer:
if bytesCount < transp.wwsabuf.len:
vector.shiftVectorBuffer(bytesCount)
transp.queue.addFirst(vector)
else:
if not(vector.writer.finished()):
vector.writer.complete(transp.wwsabuf.len)
elif int(err) == ERROR_OPERATION_ABORTED:
# CancelIO() interrupt
transp.state.incl({WritePaused, WriteEof})
let vector = transp.queue.popFirst()
if not(vector.writer.finished()):
vector.writer.complete(0)
completePendingWriteQueue(transp.queue, 0)
break
else:
let vector = transp.queue.popFirst()
if isConnResetError(err):
# Soft error happens which indicates that remote peer got
# disconnected, complete all pending writes in queue with 0.
if uint(bytesCount) < getFileSize(vector):
vector.shiftVectorFile(bytesCount)
transp.queue.addFirst(vector)
else:
if not(vector.writer.finished()):
vector.writer.complete(int(getFileSize(vector)))
elif transp.kind == TransportKind.Pipe:
if vector.kind == VectorKind.DataBuffer:
if bytesCount < transp.wwsabuf.len:
vector.shiftVectorBuffer(bytesCount)
transp.queue.addFirst(vector)
else:
if not(vector.writer.finished()):
vector.writer.complete(transp.wwsabuf.len)
elif int(err) == ERROR_OPERATION_ABORTED:
# CancelIO() interrupt
transp.state.incl({WritePaused, WriteEof})
let vector = transp.queue.popFirst()
if not(vector.writer.finished()):
vector.writer.complete(0)
completePendingWriteQueue(transp.queue, 0)
break
else:
transp.state.incl({WritePaused, WriteError})
let error = getTransportOsError(err)
if not(vector.writer.finished()):
vector.writer.fail(error)
failPendingWriteQueue(transp.queue, error)
break
else:
## Initiation
transp.state.incl(WritePending)
if transp.kind == TransportKind.Socket:
let sock = SocketHandle(transp.wovl.data.fd)
var vector = transp.queue.popFirst()
if vector.kind == VectorKind.DataBuffer:
transp.wovl.zeroOvelappedOffset()
transp.setWriterWSABuffer(vector)
let ret = WSASend(sock, addr transp.wwsabuf, 1,
addr bytesCount, DWORD(0),
cast[POVERLAPPED](addr transp.wovl), nil)
if ret != 0:
let err = osLastError()
if int(err) == ERROR_OPERATION_ABORTED:
# CancelIO() interrupt
transp.state.excl(WritePending)
transp.state.incl({WritePaused, WriteEof})
if not(vector.writer.finished()):
vector.writer.complete(0)
completePendingWriteQueue(transp.queue, 0)
break
elif int(err) == ERROR_IO_PENDING:
transp.queue.addFirst(vector)
else:
transp.state.excl(WritePending)
if isConnResetError(err):
# Soft error happens which indicates that remote peer got
# disconnected, complete all pending writes in queue with 0.
let vector = transp.queue.popFirst()
if isConnResetError(err):
# Soft error happens which indicates that remote peer got
# disconnected, complete all pending writes in queue with 0.
transp.state.incl({WritePaused, WriteEof})
if not(vector.writer.finished()):
vector.writer.complete(0)
completePendingWriteQueue(transp.queue, 0)
break
else:
transp.state.incl({WritePaused, WriteError})
let error = getTransportOsError(err)
if not(vector.writer.finished()):
vector.writer.fail(error)
failPendingWriteQueue(transp.queue, error)
break
else:
## Initiation
transp.state.incl(WritePending)
if transp.kind == TransportKind.Socket:
let sock = SocketHandle(transp.wovl.data.fd)
var vector = transp.queue.popFirst()
if vector.kind == VectorKind.DataBuffer:
transp.wovl.zeroOvelappedOffset()
transp.setWriterWSABuffer(vector)
let ret = WSASend(sock, addr transp.wwsabuf, 1,
addr bytesCount, DWORD(0),
cast[POVERLAPPED](addr transp.wovl), nil)
if ret != 0:
let err = osLastError()
if int(err) == ERROR_OPERATION_ABORTED:
# CancelIO() interrupt
transp.state.excl(WritePending)
transp.state.incl({WritePaused, WriteEof})
if not(vector.writer.finished()):
vector.writer.complete(0)
completePendingWriteQueue(transp.queue, 0)
break
elif int(err) == ERROR_IO_PENDING:
transp.queue.addFirst(vector)
else:
transp.state.incl({WritePaused, WriteError})
let error = getTransportOsError(err)
if not(vector.writer.finished()):
vector.writer.fail(error)
failPendingWriteQueue(transp.queue, error)
break
else:
transp.queue.addFirst(vector)
else:
let loop = getGlobalDispatcher()
var size: int32
var flags: int32
if getFileSize(vector) > 2_147_483_646'u:
size = 2_147_483_646
else:
size = int32(getFileSize(vector))
transp.wovl.setOverlappedOffset(vector.offset)
var ret = loop.transmitFile(sock, getFileHandle(vector), size, 0,
cast[POVERLAPPED](addr transp.wovl),
nil, flags)
if ret == 0:
let err = osLastError()
if int(err) == ERROR_OPERATION_ABORTED:
# CancelIO() interrupt
transp.state.excl(WritePending)
transp.state.incl({WritePaused, WriteEof})
if not(vector.writer.finished()):
vector.writer.complete(0)
completePendingWriteQueue(transp.queue, 0)
break
elif int(err) == ERROR_IO_PENDING:
transp.queue.addFirst(vector)
transp.state.excl(WritePending)
if isConnResetError(err):
# Soft error happens which indicates that remote peer got
# disconnected, complete all pending writes in queue with 0.
transp.state.incl({WritePaused, WriteEof})
if not(vector.writer.finished()):
vector.writer.complete(0)
completePendingWriteQueue(transp.queue, 0)
break
else:
transp.state.incl({WritePaused, WriteError})
let error = getTransportOsError(err)
if not(vector.writer.finished()):
vector.writer.fail(error)
failPendingWriteQueue(transp.queue, error)
break
else:
transp.state.excl(WritePending)
if isConnResetError(err):
# Soft error happens which indicates that remote peer got
# disconnected, complete all pending writes in queue with 0.
transp.queue.addFirst(vector)
else:
let loop = getGlobalDispatcher()
var size: int32
var flags: int32
if getFileSize(vector) > 2_147_483_646'u:
size = 2_147_483_646
else:
size = int32(getFileSize(vector))
transp.wovl.setOverlappedOffset(vector.offset)
var ret = loop.transmitFile(sock, getFileHandle(vector), size, 0,
cast[POVERLAPPED](addr transp.wovl),
nil, flags)
if ret == 0:
let err = osLastError()
if int(err) == ERROR_OPERATION_ABORTED:
# CancelIO() interrupt
transp.state.excl(WritePending)
transp.state.incl({WritePaused, WriteEof})
if not(vector.writer.finished()):
vector.writer.complete(0)
completePendingWriteQueue(transp.queue, 0)
break
elif int(err) == ERROR_IO_PENDING:
transp.queue.addFirst(vector)
else:
transp.state.incl({WritePaused, WriteError})
let error = getTransportOsError(err)
if not(vector.writer.finished()):
vector.writer.fail(error)
failPendingWriteQueue(transp.queue, error)
break
else:
transp.queue.addFirst(vector)
elif transp.kind == TransportKind.Pipe:
let pipe = Handle(transp.wovl.data.fd)
var vector = transp.queue.popFirst()
if vector.kind == VectorKind.DataBuffer:
transp.wovl.zeroOvelappedOffset()
transp.setWriterWSABuffer(vector)
let ret = writeFile(pipe, cast[pointer](transp.wwsabuf.buf),
DWORD(transp.wwsabuf.len), addr bytesCount,
cast[POVERLAPPED](addr transp.wovl))
if ret == 0:
let err = osLastError()
if int(err) in {ERROR_OPERATION_ABORTED, ERROR_NO_DATA}:
# CancelIO() interrupt
transp.state.excl(WritePending)
transp.state.incl({WritePaused, WriteEof})
if not(vector.writer.finished()):
vector.writer.complete(0)
completePendingWriteQueue(transp.queue, 0)
break
elif int(err) == ERROR_IO_PENDING:
transp.queue.addFirst(vector)
transp.state.excl(WritePending)
if isConnResetError(err):
# Soft error happens which indicates that remote peer got
# disconnected, complete all pending writes in queue with 0.
transp.state.incl({WritePaused, WriteEof})
if not(vector.writer.finished()):
vector.writer.complete(0)
completePendingWriteQueue(transp.queue, 0)
break
else:
transp.state.incl({WritePaused, WriteError})
let error = getTransportOsError(err)
if not(vector.writer.finished()):
vector.writer.fail(error)
failPendingWriteQueue(transp.queue, error)
break
else:
transp.state.excl(WritePending)
if isConnResetError(err):
# Soft error happens which indicates that remote peer got
# disconnected, complete all pending writes in queue with 0.
transp.queue.addFirst(vector)
elif transp.kind == TransportKind.Pipe:
let pipe = Handle(transp.wovl.data.fd)
var vector = transp.queue.popFirst()
if vector.kind == VectorKind.DataBuffer:
transp.wovl.zeroOvelappedOffset()
transp.setWriterWSABuffer(vector)
let ret = writeFile(pipe, cast[pointer](transp.wwsabuf.buf),
DWORD(transp.wwsabuf.len), addr bytesCount,
cast[POVERLAPPED](addr transp.wovl))
if ret == 0:
let err = osLastError()
if int(err) in {ERROR_OPERATION_ABORTED, ERROR_NO_DATA}:
# CancelIO() interrupt
transp.state.excl(WritePending)
transp.state.incl({WritePaused, WriteEof})
if not(vector.writer.finished()):
vector.writer.complete(0)
completePendingWriteQueue(transp.queue, 0)
break
elif int(err) == ERROR_IO_PENDING:
transp.queue.addFirst(vector)
else:
transp.state.incl({WritePaused, WriteError})
let error = getTransportOsError(err)
if not(vector.writer.finished()):
vector.writer.fail(error)
failPendingWriteQueue(transp.queue, error)
break
else:
transp.queue.addFirst(vector)
break
transp.state.excl(WritePending)
if isConnResetError(err):
# Soft error happens which indicates that remote peer got
# disconnected, complete all pending writes in queue with 0.
transp.state.incl({WritePaused, WriteEof})
if not(vector.writer.finished()):
vector.writer.complete(0)
completePendingWriteQueue(transp.queue, 0)
break
else:
transp.state.incl({WritePaused, WriteError})
let error = getTransportOsError(err)
if not(vector.writer.finished()):
vector.writer.fail(error)
failPendingWriteQueue(transp.queue, error)
break
else:
transp.queue.addFirst(vector)
break
if len(transp.queue) == 0:
transp.state.incl(WritePaused)
if len(transp.queue) == 0:
transp.state.incl(WritePaused)
proc readStreamLoop(udata: pointer) {.gcsafe, nimcall.} =
var ovl = cast[PtrCustomOverlapped](udata)
@ -1177,139 +1183,144 @@ else:
## after transport was closed.
return
if len(transp.queue) > 0:
var vector = transp.queue.popFirst()
while true:
if transp.kind == TransportKind.Socket:
if vector.kind == VectorKind.DataBuffer:
let res = posix.send(fd, vector.buf, vector.buflen, MSG_NOSIGNAL)
if res >= 0:
if vector.buflen - res == 0:
if not(vector.writer.finished()):
vector.writer.complete(vector.size)
else:
vector.shiftVectorBuffer(res)
transp.queue.addFirst(vector)
else:
let err = osLastError()
if int(err) == EINTR:
continue
else:
transp.fd.removeWriter()
if isConnResetError(err):
# Soft error happens which indicates that remote peer got
# disconnected, complete all pending writes in queue with 0.
transp.state.incl({WriteEof, WritePaused})
if not(vector.writer.finished()):
vector.writer.complete(0)
completePendingWriteQueue(transp.queue, 0)
else:
transp.state.incl({WriteError, WritePaused})
let error = getTransportOsError(err)
if not(vector.writer.finished()):
vector.writer.fail(error)
failPendingWriteQueue(transp.queue, error)
else:
var nbytes = cast[int](vector.buf)
let res = sendfile(int(fd), cast[int](vector.buflen),
int(vector.offset),
nbytes)
if res >= 0:
if cast[int](vector.buf) - nbytes == 0:
vector.size += nbytes
if not(vector.writer.finished()):
vector.writer.complete(vector.size)
else:
vector.size += nbytes
vector.shiftVectorFile(nbytes)
transp.queue.addFirst(vector)
else:
let err = osLastError()
if int(err) == EINTR:
continue
else:
transp.fd.removeWriter()
if isConnResetError(err):
# Soft error happens which indicates that remote peer got
# disconnected, complete all pending writes in queue with 0.
transp.state.incl({WriteEof, WritePaused})
if not(vector.writer.finished()):
vector.writer.complete(0)
completePendingWriteQueue(transp.queue, 0)
else:
transp.state.incl({WriteError, WritePaused})
let error = getTransportOsError(err)
if not(vector.writer.finished()):
vector.writer.fail(error)
failPendingWriteQueue(transp.queue, error)
break
elif transp.kind == TransportKind.Pipe:
if vector.kind == VectorKind.DataBuffer:
let res = posix.write(cint(fd), vector.buf, vector.buflen)
if res >= 0:
if vector.buflen - res == 0:
if not(vector.writer.finished()):
vector.writer.complete(vector.size)
else:
vector.shiftVectorBuffer(res)
transp.queue.addFirst(vector)
else:
let err = osLastError()
if int(err) == EINTR:
continue
else:
transp.fd.removeWriter()
if isConnResetError(err):
# Soft error happens which indicates that remote peer got
# disconnected, complete all pending writes in queue with 0.
transp.state.incl({WriteEof, WritePaused})
if not(vector.writer.finished()):
vector.writer.complete(0)
completePendingWriteQueue(transp.queue, 0)
else:
transp.state.incl({WriteError, WritePaused})
let error = getTransportOsError(err)
if not(vector.writer.finished()):
vector.writer.fail(error)
failPendingWriteQueue(transp.queue, error)
else:
var nbytes = cast[int](vector.buf)
let res = sendfile(int(fd), cast[int](vector.buflen),
int(vector.offset),
nbytes)
if res >= 0:
if cast[int](vector.buf) - nbytes == 0:
vector.size += nbytes
if not(vector.writer.finished()):
vector.writer.complete(vector.size)
else:
vector.size += nbytes
vector.shiftVectorFile(nbytes)
transp.queue.addFirst(vector)
else:
let err = osLastError()
if int(err) == EINTR:
continue
else:
transp.fd.removeWriter()
if isConnResetError(err):
# Soft error happens which indicates that remote peer got
# disconnected, complete all pending writes in queue with 0.
transp.state.incl({WriteEof, WritePaused})
if not(vector.writer.finished()):
vector.writer.complete(0)
completePendingWriteQueue(transp.queue, 0)
else:
transp.state.incl({WriteError, WritePaused})
let error = getTransportOsError(err)
if not(vector.writer.finished()):
vector.writer.fail(error)
failPendingWriteQueue(transp.queue, error)
break
if WriteClosed in transp.state:
transp.state.incl({WritePaused})
let error = getTransportUseClosedError()
failPendingWriteQueue(transp.queue, error)
else:
transp.state.incl(WritePaused)
transp.fd.removeWriter()
if len(transp.queue) > 0:
var vector = transp.queue.popFirst()
while true:
if transp.kind == TransportKind.Socket:
if vector.kind == VectorKind.DataBuffer:
let res = posix.send(fd, vector.buf, vector.buflen, MSG_NOSIGNAL)
if res >= 0:
if vector.buflen - res == 0:
if not(vector.writer.finished()):
vector.writer.complete(vector.size)
else:
vector.shiftVectorBuffer(res)
transp.queue.addFirst(vector)
else:
let err = osLastError()
if int(err) == EINTR:
continue
else:
transp.fd.removeWriter()
if isConnResetError(err):
# Soft error happens which indicates that remote peer got
# disconnected, complete all pending writes in queue with 0.
transp.state.incl({WriteEof, WritePaused})
if not(vector.writer.finished()):
vector.writer.complete(0)
completePendingWriteQueue(transp.queue, 0)
else:
transp.state.incl({WriteError, WritePaused})
let error = getTransportOsError(err)
if not(vector.writer.finished()):
vector.writer.fail(error)
failPendingWriteQueue(transp.queue, error)
else:
var nbytes = cast[int](vector.buf)
let res = sendfile(int(fd), cast[int](vector.buflen),
int(vector.offset),
nbytes)
if res >= 0:
if cast[int](vector.buf) - nbytes == 0:
vector.size += nbytes
if not(vector.writer.finished()):
vector.writer.complete(vector.size)
else:
vector.size += nbytes
vector.shiftVectorFile(nbytes)
transp.queue.addFirst(vector)
else:
let err = osLastError()
if int(err) == EINTR:
continue
else:
transp.fd.removeWriter()
if isConnResetError(err):
# Soft error happens which indicates that remote peer got
# disconnected, complete all pending writes in queue with 0.
transp.state.incl({WriteEof, WritePaused})
if not(vector.writer.finished()):
vector.writer.complete(0)
completePendingWriteQueue(transp.queue, 0)
else:
transp.state.incl({WriteError, WritePaused})
let error = getTransportOsError(err)
if not(vector.writer.finished()):
vector.writer.fail(error)
failPendingWriteQueue(transp.queue, error)
break
elif transp.kind == TransportKind.Pipe:
if vector.kind == VectorKind.DataBuffer:
let res = posix.write(cint(fd), vector.buf, vector.buflen)
if res >= 0:
if vector.buflen - res == 0:
if not(vector.writer.finished()):
vector.writer.complete(vector.size)
else:
vector.shiftVectorBuffer(res)
transp.queue.addFirst(vector)
else:
let err = osLastError()
if int(err) == EINTR:
continue
else:
transp.fd.removeWriter()
if isConnResetError(err):
# Soft error happens which indicates that remote peer got
# disconnected, complete all pending writes in queue with 0.
transp.state.incl({WriteEof, WritePaused})
if not(vector.writer.finished()):
vector.writer.complete(0)
completePendingWriteQueue(transp.queue, 0)
else:
transp.state.incl({WriteError, WritePaused})
let error = getTransportOsError(err)
if not(vector.writer.finished()):
vector.writer.fail(error)
failPendingWriteQueue(transp.queue, error)
else:
var nbytes = cast[int](vector.buf)
let res = sendfile(int(fd), cast[int](vector.buflen),
int(vector.offset),
nbytes)
if res >= 0:
if cast[int](vector.buf) - nbytes == 0:
vector.size += nbytes
if not(vector.writer.finished()):
vector.writer.complete(vector.size)
else:
vector.size += nbytes
vector.shiftVectorFile(nbytes)
transp.queue.addFirst(vector)
else:
let err = osLastError()
if int(err) == EINTR:
continue
else:
transp.fd.removeWriter()
if isConnResetError(err):
# Soft error happens which indicates that remote peer got
# disconnected, complete all pending writes in queue with 0.
transp.state.incl({WriteEof, WritePaused})
if not(vector.writer.finished()):
vector.writer.complete(0)
completePendingWriteQueue(transp.queue, 0)
else:
transp.state.incl({WriteError, WritePaused})
let error = getTransportOsError(err)
if not(vector.writer.finished()):
vector.writer.fail(error)
failPendingWriteQueue(transp.queue, error)
break
else:
transp.state.incl(WritePaused)
transp.fd.removeWriter()
proc readStreamLoop(udata: pointer) {.gcsafe.} =
var cdata = cast[ptr CompletionData](udata)

View File

@ -1152,6 +1152,54 @@ suite "Stream Transport test suite":
await server.closeWait()
setMaxOpenFiles(maxFiles)
proc testWriteOnClose(address: TransportAddress): Future[bool] {.async.} =
var server = createStreamServer(address, flags = {ReuseAddr, NoPipeFlash})
var res = 0
proc acceptTask(server: StreamServer) {.async.} =
let transp = await server.accept()
var futs = newSeq[Future[int]](TestsCount)
var msg = createBigMessage(1024)
for i in 0 ..< len(futs):
futs[i] = transp.write(msg)
await transp.closeWait()
await sleepAsync(100.milliseconds)
for i in 0 ..< len(futs):
if futs[i].failed() and (futs[i].error of TransportUseClosedError):
inc(res)
await server.closeWait()
var acceptFut = acceptTask(server)
var transp = await connect(address)
await server.join()
await transp.closeWait()
await acceptFut
return (res == TestsCount)
proc testReadOnClose(address: TransportAddress): Future[bool] {.async.} =
var server = createStreamServer(address, flags = {ReuseAddr, NoPipeFlash})
var res = false
proc acceptTask(server: StreamServer) {.async.} =
let transp = await server.accept()
var buffer = newSeq[byte](1024)
var fut = transp.readOnce(addr buffer[0], len(buffer))
await transp.closeWait()
await sleepAsync(100.milliseconds)
if fut.failed() and (fut.error of TransportUseClosedError):
res = true
await server.closeWait()
var acceptFut = acceptTask(server)
var transp = await connect(address)
await server.join()
await transp.closeWait()
await acceptFut
return res
markFD = getCurrentFD()
for i in 0..<len(addresses):
@ -1226,6 +1274,10 @@ suite "Stream Transport test suite":
skip()
else:
check waitFor(testAcceptTooMany(addresses[i])) == true
test prefixes[i] & "write() queue notification on close() test":
check waitFor(testWriteOnClose(addresses[i])) == true
test prefixes[i] & "read() notification on close() test":
check waitFor(testReadOnClose(addresses[i])) == true
test "Servers leak test":
check getTracker("stream.server").isLeaked() == false
test "Transports leak test":