From ac9b3e304f630a450efc996f47dc9e6133246a87 Mon Sep 17 00:00:00 2001 From: Eugene Kabanov Date: Wed, 18 Nov 2020 11:30:33 +0200 Subject: [PATCH] Fix deadlock for pending write() calls on transport close. (#139) Add tests for read() and write() deadlocks. --- chronos/transports/common.nim | 3 + chronos/transports/stream.nim | 603 +++++++++++++++++----------------- tests/teststream.nim | 52 +++ 3 files changed, 362 insertions(+), 296 deletions(-) diff --git a/chronos/transports/common.nim b/chronos/transports/common.nim index 0d87d91f..ad518f29 100644 --- a/chronos/transports/common.nim +++ b/chronos/transports/common.nim @@ -495,6 +495,9 @@ template getServerUseClosedError*(): ref TransportUseClosedError = template getTransportTooManyError*(): ref TransportTooManyError = newException(TransportTooManyError, "Too many open transports!") +template getTransportUseClosedError*(): ref TransportUseClosedError = + newException(TransportUseClosedError, "Transport is already closed!") + template getTransportOsError*(err: OSErrorCode): ref TransportOsError = var msg = "(" & $int(err) & ") " & osErrorMsg(err) var tre = newException(TransportOsError, msg) diff --git a/chronos/transports/stream.nim b/chronos/transports/stream.nim index 45073e11..5effeef3 100644 --- a/chronos/transports/stream.nim +++ b/chronos/transports/stream.nim @@ -322,198 +322,204 @@ when defined(windows): var ovl = cast[PtrCustomOverlapped](udata) var transp = cast[StreamTransport](ovl.data.udata) - while len(transp.queue) > 0: - if WritePending in transp.state: - ## Continuation - transp.state.excl(WritePending) - let err = transp.wovl.data.errCode - if err == OSErrorCode(-1): - bytesCount = transp.wovl.data.bytesCount - var vector = transp.queue.popFirst() - if bytesCount == 0: - if not(vector.writer.finished()): - vector.writer.complete(0) - else: - if transp.kind == TransportKind.Socket: - if vector.kind == VectorKind.DataBuffer: - if bytesCount < transp.wwsabuf.len: - vector.shiftVectorBuffer(bytesCount) - transp.queue.addFirst(vector) + if WriteClosed in transp.state: + transp.state.excl(WritePending) + transp.state.incl({WritePaused}) + let error = getTransportUseClosedError() + failPendingWriteQueue(transp.queue, error) + else: + while len(transp.queue) > 0: + if WritePending in transp.state: + ## Continuation + transp.state.excl(WritePending) + let err = transp.wovl.data.errCode + if err == OSErrorCode(-1): + bytesCount = transp.wovl.data.bytesCount + var vector = transp.queue.popFirst() + if bytesCount == 0: + if not(vector.writer.finished()): + vector.writer.complete(0) + else: + if transp.kind == TransportKind.Socket: + if vector.kind == VectorKind.DataBuffer: + if bytesCount < transp.wwsabuf.len: + vector.shiftVectorBuffer(bytesCount) + transp.queue.addFirst(vector) + else: + if not(vector.writer.finished()): + vector.writer.complete(transp.wwsabuf.len) else: - if not(vector.writer.finished()): - vector.writer.complete(transp.wwsabuf.len) - else: - if uint(bytesCount) < getFileSize(vector): - vector.shiftVectorFile(bytesCount) - transp.queue.addFirst(vector) - else: - if not(vector.writer.finished()): - vector.writer.complete(int(getFileSize(vector))) - elif transp.kind == TransportKind.Pipe: - if vector.kind == VectorKind.DataBuffer: - if bytesCount < transp.wwsabuf.len: - vector.shiftVectorBuffer(bytesCount) - transp.queue.addFirst(vector) - else: - if not(vector.writer.finished()): - vector.writer.complete(transp.wwsabuf.len) - elif int(err) == ERROR_OPERATION_ABORTED: - # CancelIO() interrupt - transp.state.incl({WritePaused, WriteEof}) - let vector = transp.queue.popFirst() - if not(vector.writer.finished()): - vector.writer.complete(0) - completePendingWriteQueue(transp.queue, 0) - break - else: - let vector = transp.queue.popFirst() - if isConnResetError(err): - # Soft error happens which indicates that remote peer got - # disconnected, complete all pending writes in queue with 0. + if uint(bytesCount) < getFileSize(vector): + vector.shiftVectorFile(bytesCount) + transp.queue.addFirst(vector) + else: + if not(vector.writer.finished()): + vector.writer.complete(int(getFileSize(vector))) + elif transp.kind == TransportKind.Pipe: + if vector.kind == VectorKind.DataBuffer: + if bytesCount < transp.wwsabuf.len: + vector.shiftVectorBuffer(bytesCount) + transp.queue.addFirst(vector) + else: + if not(vector.writer.finished()): + vector.writer.complete(transp.wwsabuf.len) + elif int(err) == ERROR_OPERATION_ABORTED: + # CancelIO() interrupt transp.state.incl({WritePaused, WriteEof}) + let vector = transp.queue.popFirst() if not(vector.writer.finished()): vector.writer.complete(0) completePendingWriteQueue(transp.queue, 0) break else: - transp.state.incl({WritePaused, WriteError}) - let error = getTransportOsError(err) - if not(vector.writer.finished()): - vector.writer.fail(error) - failPendingWriteQueue(transp.queue, error) - break - else: - ## Initiation - transp.state.incl(WritePending) - if transp.kind == TransportKind.Socket: - let sock = SocketHandle(transp.wovl.data.fd) - var vector = transp.queue.popFirst() - if vector.kind == VectorKind.DataBuffer: - transp.wovl.zeroOvelappedOffset() - transp.setWriterWSABuffer(vector) - let ret = WSASend(sock, addr transp.wwsabuf, 1, - addr bytesCount, DWORD(0), - cast[POVERLAPPED](addr transp.wovl), nil) - if ret != 0: - let err = osLastError() - if int(err) == ERROR_OPERATION_ABORTED: - # CancelIO() interrupt - transp.state.excl(WritePending) - transp.state.incl({WritePaused, WriteEof}) - if not(vector.writer.finished()): - vector.writer.complete(0) - completePendingWriteQueue(transp.queue, 0) - break - elif int(err) == ERROR_IO_PENDING: - transp.queue.addFirst(vector) - else: - transp.state.excl(WritePending) - if isConnResetError(err): - # Soft error happens which indicates that remote peer got - # disconnected, complete all pending writes in queue with 0. + let vector = transp.queue.popFirst() + if isConnResetError(err): + # Soft error happens which indicates that remote peer got + # disconnected, complete all pending writes in queue with 0. + transp.state.incl({WritePaused, WriteEof}) + if not(vector.writer.finished()): + vector.writer.complete(0) + completePendingWriteQueue(transp.queue, 0) + break + else: + transp.state.incl({WritePaused, WriteError}) + let error = getTransportOsError(err) + if not(vector.writer.finished()): + vector.writer.fail(error) + failPendingWriteQueue(transp.queue, error) + break + else: + ## Initiation + transp.state.incl(WritePending) + if transp.kind == TransportKind.Socket: + let sock = SocketHandle(transp.wovl.data.fd) + var vector = transp.queue.popFirst() + if vector.kind == VectorKind.DataBuffer: + transp.wovl.zeroOvelappedOffset() + transp.setWriterWSABuffer(vector) + let ret = WSASend(sock, addr transp.wwsabuf, 1, + addr bytesCount, DWORD(0), + cast[POVERLAPPED](addr transp.wovl), nil) + if ret != 0: + let err = osLastError() + if int(err) == ERROR_OPERATION_ABORTED: + # CancelIO() interrupt + transp.state.excl(WritePending) transp.state.incl({WritePaused, WriteEof}) if not(vector.writer.finished()): vector.writer.complete(0) completePendingWriteQueue(transp.queue, 0) break + elif int(err) == ERROR_IO_PENDING: + transp.queue.addFirst(vector) else: - transp.state.incl({WritePaused, WriteError}) - let error = getTransportOsError(err) - if not(vector.writer.finished()): - vector.writer.fail(error) - failPendingWriteQueue(transp.queue, error) - break - else: - transp.queue.addFirst(vector) - else: - let loop = getGlobalDispatcher() - var size: int32 - var flags: int32 - - if getFileSize(vector) > 2_147_483_646'u: - size = 2_147_483_646 - else: - size = int32(getFileSize(vector)) - - transp.wovl.setOverlappedOffset(vector.offset) - var ret = loop.transmitFile(sock, getFileHandle(vector), size, 0, - cast[POVERLAPPED](addr transp.wovl), - nil, flags) - if ret == 0: - let err = osLastError() - if int(err) == ERROR_OPERATION_ABORTED: - # CancelIO() interrupt - transp.state.excl(WritePending) - transp.state.incl({WritePaused, WriteEof}) - if not(vector.writer.finished()): - vector.writer.complete(0) - completePendingWriteQueue(transp.queue, 0) - break - elif int(err) == ERROR_IO_PENDING: - transp.queue.addFirst(vector) + transp.state.excl(WritePending) + if isConnResetError(err): + # Soft error happens which indicates that remote peer got + # disconnected, complete all pending writes in queue with 0. + transp.state.incl({WritePaused, WriteEof}) + if not(vector.writer.finished()): + vector.writer.complete(0) + completePendingWriteQueue(transp.queue, 0) + break + else: + transp.state.incl({WritePaused, WriteError}) + let error = getTransportOsError(err) + if not(vector.writer.finished()): + vector.writer.fail(error) + failPendingWriteQueue(transp.queue, error) + break else: - transp.state.excl(WritePending) - if isConnResetError(err): - # Soft error happens which indicates that remote peer got - # disconnected, complete all pending writes in queue with 0. + transp.queue.addFirst(vector) + else: + let loop = getGlobalDispatcher() + var size: int32 + var flags: int32 + + if getFileSize(vector) > 2_147_483_646'u: + size = 2_147_483_646 + else: + size = int32(getFileSize(vector)) + + transp.wovl.setOverlappedOffset(vector.offset) + var ret = loop.transmitFile(sock, getFileHandle(vector), size, 0, + cast[POVERLAPPED](addr transp.wovl), + nil, flags) + if ret == 0: + let err = osLastError() + if int(err) == ERROR_OPERATION_ABORTED: + # CancelIO() interrupt + transp.state.excl(WritePending) transp.state.incl({WritePaused, WriteEof}) if not(vector.writer.finished()): vector.writer.complete(0) completePendingWriteQueue(transp.queue, 0) break + elif int(err) == ERROR_IO_PENDING: + transp.queue.addFirst(vector) else: - transp.state.incl({WritePaused, WriteError}) - let error = getTransportOsError(err) - if not(vector.writer.finished()): - vector.writer.fail(error) - failPendingWriteQueue(transp.queue, error) - break - else: - transp.queue.addFirst(vector) - elif transp.kind == TransportKind.Pipe: - let pipe = Handle(transp.wovl.data.fd) - var vector = transp.queue.popFirst() - if vector.kind == VectorKind.DataBuffer: - transp.wovl.zeroOvelappedOffset() - transp.setWriterWSABuffer(vector) - let ret = writeFile(pipe, cast[pointer](transp.wwsabuf.buf), - DWORD(transp.wwsabuf.len), addr bytesCount, - cast[POVERLAPPED](addr transp.wovl)) - if ret == 0: - let err = osLastError() - if int(err) in {ERROR_OPERATION_ABORTED, ERROR_NO_DATA}: - # CancelIO() interrupt - transp.state.excl(WritePending) - transp.state.incl({WritePaused, WriteEof}) - if not(vector.writer.finished()): - vector.writer.complete(0) - completePendingWriteQueue(transp.queue, 0) - break - elif int(err) == ERROR_IO_PENDING: - transp.queue.addFirst(vector) + transp.state.excl(WritePending) + if isConnResetError(err): + # Soft error happens which indicates that remote peer got + # disconnected, complete all pending writes in queue with 0. + transp.state.incl({WritePaused, WriteEof}) + if not(vector.writer.finished()): + vector.writer.complete(0) + completePendingWriteQueue(transp.queue, 0) + break + else: + transp.state.incl({WritePaused, WriteError}) + let error = getTransportOsError(err) + if not(vector.writer.finished()): + vector.writer.fail(error) + failPendingWriteQueue(transp.queue, error) + break else: - transp.state.excl(WritePending) - if isConnResetError(err): - # Soft error happens which indicates that remote peer got - # disconnected, complete all pending writes in queue with 0. + transp.queue.addFirst(vector) + elif transp.kind == TransportKind.Pipe: + let pipe = Handle(transp.wovl.data.fd) + var vector = transp.queue.popFirst() + if vector.kind == VectorKind.DataBuffer: + transp.wovl.zeroOvelappedOffset() + transp.setWriterWSABuffer(vector) + let ret = writeFile(pipe, cast[pointer](transp.wwsabuf.buf), + DWORD(transp.wwsabuf.len), addr bytesCount, + cast[POVERLAPPED](addr transp.wovl)) + if ret == 0: + let err = osLastError() + if int(err) in {ERROR_OPERATION_ABORTED, ERROR_NO_DATA}: + # CancelIO() interrupt + transp.state.excl(WritePending) transp.state.incl({WritePaused, WriteEof}) if not(vector.writer.finished()): vector.writer.complete(0) completePendingWriteQueue(transp.queue, 0) break + elif int(err) == ERROR_IO_PENDING: + transp.queue.addFirst(vector) else: - transp.state.incl({WritePaused, WriteError}) - let error = getTransportOsError(err) - if not(vector.writer.finished()): - vector.writer.fail(error) - failPendingWriteQueue(transp.queue, error) - break - else: - transp.queue.addFirst(vector) - break + transp.state.excl(WritePending) + if isConnResetError(err): + # Soft error happens which indicates that remote peer got + # disconnected, complete all pending writes in queue with 0. + transp.state.incl({WritePaused, WriteEof}) + if not(vector.writer.finished()): + vector.writer.complete(0) + completePendingWriteQueue(transp.queue, 0) + break + else: + transp.state.incl({WritePaused, WriteError}) + let error = getTransportOsError(err) + if not(vector.writer.finished()): + vector.writer.fail(error) + failPendingWriteQueue(transp.queue, error) + break + else: + transp.queue.addFirst(vector) + break - if len(transp.queue) == 0: - transp.state.incl(WritePaused) + if len(transp.queue) == 0: + transp.state.incl(WritePaused) proc readStreamLoop(udata: pointer) {.gcsafe, nimcall.} = var ovl = cast[PtrCustomOverlapped](udata) @@ -1177,139 +1183,144 @@ else: ## after transport was closed. return - if len(transp.queue) > 0: - var vector = transp.queue.popFirst() - while true: - if transp.kind == TransportKind.Socket: - if vector.kind == VectorKind.DataBuffer: - let res = posix.send(fd, vector.buf, vector.buflen, MSG_NOSIGNAL) - if res >= 0: - if vector.buflen - res == 0: - if not(vector.writer.finished()): - vector.writer.complete(vector.size) - else: - vector.shiftVectorBuffer(res) - transp.queue.addFirst(vector) - else: - let err = osLastError() - if int(err) == EINTR: - continue - else: - transp.fd.removeWriter() - if isConnResetError(err): - # Soft error happens which indicates that remote peer got - # disconnected, complete all pending writes in queue with 0. - transp.state.incl({WriteEof, WritePaused}) - if not(vector.writer.finished()): - vector.writer.complete(0) - completePendingWriteQueue(transp.queue, 0) - else: - transp.state.incl({WriteError, WritePaused}) - let error = getTransportOsError(err) - if not(vector.writer.finished()): - vector.writer.fail(error) - failPendingWriteQueue(transp.queue, error) - else: - var nbytes = cast[int](vector.buf) - let res = sendfile(int(fd), cast[int](vector.buflen), - int(vector.offset), - nbytes) - if res >= 0: - if cast[int](vector.buf) - nbytes == 0: - vector.size += nbytes - if not(vector.writer.finished()): - vector.writer.complete(vector.size) - else: - vector.size += nbytes - vector.shiftVectorFile(nbytes) - transp.queue.addFirst(vector) - else: - let err = osLastError() - if int(err) == EINTR: - continue - else: - transp.fd.removeWriter() - if isConnResetError(err): - # Soft error happens which indicates that remote peer got - # disconnected, complete all pending writes in queue with 0. - transp.state.incl({WriteEof, WritePaused}) - if not(vector.writer.finished()): - vector.writer.complete(0) - completePendingWriteQueue(transp.queue, 0) - else: - transp.state.incl({WriteError, WritePaused}) - let error = getTransportOsError(err) - if not(vector.writer.finished()): - vector.writer.fail(error) - failPendingWriteQueue(transp.queue, error) - break - - elif transp.kind == TransportKind.Pipe: - if vector.kind == VectorKind.DataBuffer: - let res = posix.write(cint(fd), vector.buf, vector.buflen) - if res >= 0: - if vector.buflen - res == 0: - if not(vector.writer.finished()): - vector.writer.complete(vector.size) - else: - vector.shiftVectorBuffer(res) - transp.queue.addFirst(vector) - else: - let err = osLastError() - if int(err) == EINTR: - continue - else: - transp.fd.removeWriter() - if isConnResetError(err): - # Soft error happens which indicates that remote peer got - # disconnected, complete all pending writes in queue with 0. - transp.state.incl({WriteEof, WritePaused}) - if not(vector.writer.finished()): - vector.writer.complete(0) - completePendingWriteQueue(transp.queue, 0) - else: - transp.state.incl({WriteError, WritePaused}) - let error = getTransportOsError(err) - if not(vector.writer.finished()): - vector.writer.fail(error) - failPendingWriteQueue(transp.queue, error) - else: - var nbytes = cast[int](vector.buf) - let res = sendfile(int(fd), cast[int](vector.buflen), - int(vector.offset), - nbytes) - if res >= 0: - if cast[int](vector.buf) - nbytes == 0: - vector.size += nbytes - if not(vector.writer.finished()): - vector.writer.complete(vector.size) - else: - vector.size += nbytes - vector.shiftVectorFile(nbytes) - transp.queue.addFirst(vector) - else: - let err = osLastError() - if int(err) == EINTR: - continue - else: - transp.fd.removeWriter() - if isConnResetError(err): - # Soft error happens which indicates that remote peer got - # disconnected, complete all pending writes in queue with 0. - transp.state.incl({WriteEof, WritePaused}) - if not(vector.writer.finished()): - vector.writer.complete(0) - completePendingWriteQueue(transp.queue, 0) - else: - transp.state.incl({WriteError, WritePaused}) - let error = getTransportOsError(err) - if not(vector.writer.finished()): - vector.writer.fail(error) - failPendingWriteQueue(transp.queue, error) - break + if WriteClosed in transp.state: + transp.state.incl({WritePaused}) + let error = getTransportUseClosedError() + failPendingWriteQueue(transp.queue, error) else: - transp.state.incl(WritePaused) - transp.fd.removeWriter() + if len(transp.queue) > 0: + var vector = transp.queue.popFirst() + while true: + if transp.kind == TransportKind.Socket: + if vector.kind == VectorKind.DataBuffer: + let res = posix.send(fd, vector.buf, vector.buflen, MSG_NOSIGNAL) + if res >= 0: + if vector.buflen - res == 0: + if not(vector.writer.finished()): + vector.writer.complete(vector.size) + else: + vector.shiftVectorBuffer(res) + transp.queue.addFirst(vector) + else: + let err = osLastError() + if int(err) == EINTR: + continue + else: + transp.fd.removeWriter() + if isConnResetError(err): + # Soft error happens which indicates that remote peer got + # disconnected, complete all pending writes in queue with 0. + transp.state.incl({WriteEof, WritePaused}) + if not(vector.writer.finished()): + vector.writer.complete(0) + completePendingWriteQueue(transp.queue, 0) + else: + transp.state.incl({WriteError, WritePaused}) + let error = getTransportOsError(err) + if not(vector.writer.finished()): + vector.writer.fail(error) + failPendingWriteQueue(transp.queue, error) + else: + var nbytes = cast[int](vector.buf) + let res = sendfile(int(fd), cast[int](vector.buflen), + int(vector.offset), + nbytes) + if res >= 0: + if cast[int](vector.buf) - nbytes == 0: + vector.size += nbytes + if not(vector.writer.finished()): + vector.writer.complete(vector.size) + else: + vector.size += nbytes + vector.shiftVectorFile(nbytes) + transp.queue.addFirst(vector) + else: + let err = osLastError() + if int(err) == EINTR: + continue + else: + transp.fd.removeWriter() + if isConnResetError(err): + # Soft error happens which indicates that remote peer got + # disconnected, complete all pending writes in queue with 0. + transp.state.incl({WriteEof, WritePaused}) + if not(vector.writer.finished()): + vector.writer.complete(0) + completePendingWriteQueue(transp.queue, 0) + else: + transp.state.incl({WriteError, WritePaused}) + let error = getTransportOsError(err) + if not(vector.writer.finished()): + vector.writer.fail(error) + failPendingWriteQueue(transp.queue, error) + break + + elif transp.kind == TransportKind.Pipe: + if vector.kind == VectorKind.DataBuffer: + let res = posix.write(cint(fd), vector.buf, vector.buflen) + if res >= 0: + if vector.buflen - res == 0: + if not(vector.writer.finished()): + vector.writer.complete(vector.size) + else: + vector.shiftVectorBuffer(res) + transp.queue.addFirst(vector) + else: + let err = osLastError() + if int(err) == EINTR: + continue + else: + transp.fd.removeWriter() + if isConnResetError(err): + # Soft error happens which indicates that remote peer got + # disconnected, complete all pending writes in queue with 0. + transp.state.incl({WriteEof, WritePaused}) + if not(vector.writer.finished()): + vector.writer.complete(0) + completePendingWriteQueue(transp.queue, 0) + else: + transp.state.incl({WriteError, WritePaused}) + let error = getTransportOsError(err) + if not(vector.writer.finished()): + vector.writer.fail(error) + failPendingWriteQueue(transp.queue, error) + else: + var nbytes = cast[int](vector.buf) + let res = sendfile(int(fd), cast[int](vector.buflen), + int(vector.offset), + nbytes) + if res >= 0: + if cast[int](vector.buf) - nbytes == 0: + vector.size += nbytes + if not(vector.writer.finished()): + vector.writer.complete(vector.size) + else: + vector.size += nbytes + vector.shiftVectorFile(nbytes) + transp.queue.addFirst(vector) + else: + let err = osLastError() + if int(err) == EINTR: + continue + else: + transp.fd.removeWriter() + if isConnResetError(err): + # Soft error happens which indicates that remote peer got + # disconnected, complete all pending writes in queue with 0. + transp.state.incl({WriteEof, WritePaused}) + if not(vector.writer.finished()): + vector.writer.complete(0) + completePendingWriteQueue(transp.queue, 0) + else: + transp.state.incl({WriteError, WritePaused}) + let error = getTransportOsError(err) + if not(vector.writer.finished()): + vector.writer.fail(error) + failPendingWriteQueue(transp.queue, error) + break + else: + transp.state.incl(WritePaused) + transp.fd.removeWriter() proc readStreamLoop(udata: pointer) {.gcsafe.} = var cdata = cast[ptr CompletionData](udata) diff --git a/tests/teststream.nim b/tests/teststream.nim index 3e6ab302..b1b57b0f 100644 --- a/tests/teststream.nim +++ b/tests/teststream.nim @@ -1152,6 +1152,54 @@ suite "Stream Transport test suite": await server.closeWait() setMaxOpenFiles(maxFiles) + proc testWriteOnClose(address: TransportAddress): Future[bool] {.async.} = + var server = createStreamServer(address, flags = {ReuseAddr, NoPipeFlash}) + var res = 0 + + proc acceptTask(server: StreamServer) {.async.} = + let transp = await server.accept() + var futs = newSeq[Future[int]](TestsCount) + var msg = createBigMessage(1024) + for i in 0 ..< len(futs): + futs[i] = transp.write(msg) + + await transp.closeWait() + await sleepAsync(100.milliseconds) + + for i in 0 ..< len(futs): + if futs[i].failed() and (futs[i].error of TransportUseClosedError): + inc(res) + + await server.closeWait() + + var acceptFut = acceptTask(server) + var transp = await connect(address) + await server.join() + await transp.closeWait() + await acceptFut + return (res == TestsCount) + + proc testReadOnClose(address: TransportAddress): Future[bool] {.async.} = + var server = createStreamServer(address, flags = {ReuseAddr, NoPipeFlash}) + var res = false + + proc acceptTask(server: StreamServer) {.async.} = + let transp = await server.accept() + var buffer = newSeq[byte](1024) + var fut = transp.readOnce(addr buffer[0], len(buffer)) + await transp.closeWait() + await sleepAsync(100.milliseconds) + if fut.failed() and (fut.error of TransportUseClosedError): + res = true + await server.closeWait() + + var acceptFut = acceptTask(server) + var transp = await connect(address) + await server.join() + await transp.closeWait() + await acceptFut + return res + markFD = getCurrentFD() for i in 0..