diff --git a/chronos.nimble b/chronos.nimble index a55a193..80920a0 100644 --- a/chronos.nimble +++ b/chronos.nimble @@ -1,5 +1,5 @@ packageName = "chronos" -version = "3.0.9" +version = "3.0.10" author = "Status Research & Development GmbH" description = "Chronos" license = "Apache License 2.0 or MIT" diff --git a/chronos/sendfile.nim b/chronos/sendfile.nim index f983278..a4c6d0a 100644 --- a/chronos/sendfile.nim +++ b/chronos/sendfile.nim @@ -29,7 +29,8 @@ when defined(nimdoc): ## ## ``count`` is the number of bytes to copy between the file descriptors. ## On exit ``count`` will hold number of bytes actually transferred between - ## file descriptors. + ## file descriptors. May be >0 even in the case of error return, if some + ## bytes were sent before the error occurred. ## ## If the transfer was successful, the number of bytes written to ``outfd`` ## is stored in ``count``, and ``0`` returned. Note that a successful call @@ -45,10 +46,13 @@ when defined(linux) or defined(android): proc sendfile*(outfd, infd: int, offset: int, count: var int): int = var o = offset - result = osSendFile(cint(outfd), cint(infd), addr o, count) - if result >= 0: - count = result - result = 0 + let res = osSendFile(cint(outfd), cint(infd), addr o, count) + if res >= 0: + count = res + 0 + else: + count = 0 + -1 elif defined(freebsd) or defined(openbsd) or defined(netbsd) or defined(dragonflybsd): @@ -69,18 +73,17 @@ elif defined(freebsd) or defined(openbsd) or defined(netbsd) or proc sendfile*(outfd, infd: int, offset: int, count: var int): int = var o = 0'u - result = osSendFile(cint(infd), cint(outfd), uint(offset), uint(count), nil, + let res = osSendFile(cint(infd), cint(outfd), uint(offset), uint(count), nil, addr o, 0) - if result >= 0: + if res >= 0: count = int(o) - result = 0 + 0 else: let err = osLastError() - if int(err) == EAGAIN: - count = int(o) - result = 0 - else: - result = -1 + count = + if int(err) == EAGAIN: int(o) + else: 0 + -1 elif defined(macosx): import posix, os @@ -100,14 +103,13 @@ elif defined(macosx): proc sendfile*(outfd, infd: int, offset: int, count: var int): int = var o = count - result = osSendFile(cint(infd), cint(outfd), offset, addr o, nil, 0) - if result >= 0: + let res = osSendFile(cint(infd), cint(outfd), offset, addr o, nil, 0) + if res >= 0: count = int(o) - result = 0 + 0 else: let err = osLastError() - if int(err) == EAGAIN: - count = int(o) - result = 0 - else: - result = -1 + count = + if int(err) == EAGAIN: int(o) + else: 0 + -1 diff --git a/chronos/transports/stream.nim b/chronos/transports/stream.nim index 7b8520f..1e6611f 100644 --- a/chronos/transports/stream.nim +++ b/chronos/transports/stream.nim @@ -191,11 +191,11 @@ template shiftBuffer(t, c: untyped) = else: (t).offset = 0 -template shiftVectorBuffer(v, o: untyped) = +template shiftVectorBuffer(v: var StreamVector, o: untyped) = (v).buf = cast[pointer](cast[uint]((v).buf) + uint(o)) (v).buflen -= int(o) -template shiftVectorFile(v, o: untyped) = +template shiftVectorFile(v: var StreamVector, o: untyped) = (v).buf = cast[pointer](cast[uint]((v).buf) - cast[uint](o)) (v).offset += cast[uint]((o)) @@ -1239,8 +1239,17 @@ else: result = (err == OSErrorCode(ECONNRESET)) or (err == OSErrorCode(EPIPE)) + proc removeWriter(transp: StreamTransport) = + try: + transp.fd.removeWriter() + # For debugging, record that we're no longer getting write notifications + transp.state.incl WritePaused + except IOSelectorsException as exc: + raiseAsDefect exc, "removeWriter" + except ValueError as exc: + raiseAsDefect exc, "removeWriter" + proc writeStreamLoop(udata: pointer) = - # TODO fix Defect raises - they "shouldn't" happen var cdata = cast[ptr CompletionData](udata) var transp = cast[StreamTransport](cdata.udata) let fd = SocketHandle(cdata.fd) @@ -1251,171 +1260,91 @@ else: return if WriteClosed in transp.state: - transp.state.incl({WritePaused}) - let error = getTransportUseClosedError() - failPendingWriteQueue(transp.queue, error) - else: - if len(transp.queue) > 0: - var vector = transp.queue.popFirst() - while true: - if transp.kind == TransportKind.Socket: - if vector.kind == VectorKind.DataBuffer: - let res = posix.send(fd, vector.buf, vector.buflen, MSG_NOSIGNAL) - if res >= 0: - if vector.buflen - res == 0: - if not(vector.writer.finished()): - vector.writer.complete(vector.size) - else: - vector.shiftVectorBuffer(res) - transp.queue.addFirst(vector) - else: - let err = osLastError() - if int(err) == EINTR: - continue - else: - try: - transp.fd.removeWriter() - except IOSelectorsException as exc: - raiseAsDefect exc, "removeWriter" - except ValueError as exc: - raiseAsDefect exc, "removeWriter" + if transp.queue.len > 0: + transp.removeWriter() - if isConnResetError(err): - # Soft error happens which indicates that remote peer got - # disconnected, complete all pending writes in queue with 0. - transp.state.incl({WriteEof, WritePaused}) - if not(vector.writer.finished()): - vector.writer.complete(0) - completePendingWriteQueue(transp.queue, 0) - else: - transp.state.incl({WriteError, WritePaused}) - let error = getTransportOsError(err) - if not(vector.writer.finished()): - vector.writer.fail(error) - failPendingWriteQueue(transp.queue, error) - else: - var nbytes = cast[int](vector.buf) - let res = sendfile(int(fd), cast[int](vector.buflen), - int(vector.offset), - nbytes) - if res >= 0: - if cast[int](vector.buf) - nbytes == 0: - vector.size += nbytes - if not(vector.writer.finished()): - vector.writer.complete(vector.size) - else: - vector.size += nbytes - vector.shiftVectorFile(nbytes) - transp.queue.addFirst(vector) - else: - let err = osLastError() - if int(err) == EINTR: - continue - else: - try: - transp.fd.removeWriter() - except IOSelectorsException as exc: - raiseAsDefect exc, "removeWriter" - except ValueError as exc: - raiseAsDefect exc, "removeWriter" + let error = getTransportUseClosedError() + failPendingWriteQueue(transp.queue, error) + return - if isConnResetError(err): - # Soft error happens which indicates that remote peer got - # disconnected, complete all pending writes in queue with 0. - transp.state.incl({WriteEof, WritePaused}) - if not(vector.writer.finished()): - vector.writer.complete(0) - completePendingWriteQueue(transp.queue, 0) - else: - transp.state.incl({WriteError, WritePaused}) - let error = getTransportOsError(err) - if not(vector.writer.finished()): - vector.writer.fail(error) - failPendingWriteQueue(transp.queue, error) - break + # We exit this loop in two ways: + # * The queue is empty: we call removeWriter to disable further callbacks + # * EWOULDBLOCK is returned and we need to wait for a new notification - elif transp.kind == TransportKind.Pipe: - if vector.kind == VectorKind.DataBuffer: - let res = posix.write(cint(fd), vector.buf, vector.buflen) - if res >= 0: - if vector.buflen - res == 0: - if not(vector.writer.finished()): - vector.writer.complete(vector.size) - else: - vector.shiftVectorBuffer(res) - transp.queue.addFirst(vector) - else: - let err = osLastError() - if int(err) == EINTR: - continue - else: - try: - transp.fd.removeWriter() - except IOSelectorsException as exc: - raiseAsDefect exc, "removeWriter" - except ValueError as exc: - raiseAsDefect exc, "removeWriter" + while len(transp.queue) > 0: + template handleError() = + let err = osLastError() - if isConnResetError(err): - # Soft error happens which indicates that remote peer got - # disconnected, complete all pending writes in queue with 0. - transp.state.incl({WriteEof, WritePaused}) - if not(vector.writer.finished()): - vector.writer.complete(0) - completePendingWriteQueue(transp.queue, 0) - else: - transp.state.incl({WriteError, WritePaused}) - let error = getTransportOsError(err) - if not(vector.writer.finished()): - vector.writer.fail(error) - failPendingWriteQueue(transp.queue, error) - else: - var nbytes = cast[int](vector.buf) - let res = sendfile(int(fd), cast[int](vector.buflen), - int(vector.offset), - nbytes) - if res >= 0: - if cast[int](vector.buf) - nbytes == 0: - vector.size += nbytes - if not(vector.writer.finished()): - vector.writer.complete(vector.size) - else: - vector.size += nbytes - vector.shiftVectorFile(nbytes) - transp.queue.addFirst(vector) - else: - let err = osLastError() - if int(err) == EINTR: - continue - else: - try: - transp.fd.removeWriter() - except IOSelectorsException as exc: - raiseAsDefect exc, "removeWriter" - except ValueError as exc: - raiseAsDefect exc, "removeWriter" - if isConnResetError(err): - # Soft error happens which indicates that remote peer got - # disconnected, complete all pending writes in queue with 0. - transp.state.incl({WriteEof, WritePaused}) - if not(vector.writer.finished()): - vector.writer.complete(0) - completePendingWriteQueue(transp.queue, 0) - else: - transp.state.incl({WriteError, WritePaused}) - let error = getTransportOsError(err) - if not(vector.writer.finished()): - vector.writer.fail(error) - failPendingWriteQueue(transp.queue, error) - break - else: - transp.state.incl(WritePaused) - try: - transp.fd.removeWriter() - except IOSelectorsException as exc: - raiseAsDefect exc, "removeWriter" - except ValueError as exc: - raiseAsDefect exc, "removeWriter" + if cint(err) == EINTR: + # Signal happened while writing - try again with all data + transp.queue.addFirst(vector) + continue + + if cint(err) in [EWOULDBLOCK, EAGAIN]: + # Socket buffer is full - wait until next write notification - in + # particular, ensure removeWriter is not called + transp.queue.addFirst(vector) + return + + # The errors below will clear the write queue, meaning we'll exit the + # loop + if isConnResetError(err): + # Soft error happens which indicates that remote peer got + # disconnected, complete all pending writes in queue with 0. + transp.state.incl({WriteEof}) + if not(vector.writer.finished()): + vector.writer.complete(0) + completePendingWriteQueue(transp.queue, 0) + else: + transp.state.incl({WriteError}) + let error = getTransportOsError(err) + if not(vector.writer.finished()): + vector.writer.fail(error) + failPendingWriteQueue(transp.queue, error) + + var vector = transp.queue.popFirst() + case vector.kind + of VectorKind.DataBuffer: + let res = + case transp.kind + of TransportKind.Socket: + posix.send(fd, vector.buf, vector.buflen, MSG_NOSIGNAL) + of TransportKind.Pipe: + posix.write(cint(fd), vector.buf, vector.buflen) + else: raiseAssert "Unsupported transport kind: " & $transp.kind + + if res >= 0: + if vector.buflen == res: + if not(vector.writer.finished()): + vector.writer.complete(vector.size) + else: + vector.shiftVectorBuffer(res) + transp.queue.addFirst(vector) # Try again with rest of data + else: + handleError() + + of VectorKind.DataFile: + var nbytes = cast[int](vector.buf) + let res = sendfile(int(fd), cast[int](vector.buflen), + int(vector.offset), nbytes) + + # In case of some errors on some systems, some bytes may have been + # written (see sendfile.nim) + vector.size += nbytes + + if res >= 0: + if cast[int](vector.buf) == nbytes: + if not(vector.writer.finished()): + vector.writer.complete(vector.size) + else: + vector.shiftVectorFile(nbytes) + transp.queue.addFirst(vector) + else: + vector.shiftVectorFile(nbytes) + handleError() + + # Nothing left in the queue - no need for further write notifications + transp.removeWriter() proc readStreamLoop(udata: pointer) = # TODO fix Defect raises - they "shouldn't" happen @@ -1700,11 +1629,17 @@ else: raiseAsDefect exc, "addReader" proc resumeWrite(transp: StreamTransport) {.inline.} = - if WritePaused in transp.state: - transp.state.excl(WritePaused) - # TODO reset flag on exception?? + if transp.queue.len() == 1: + # writeStreamLoop keeps writing until queue is empty - we should not call + # resumeWrite under any other condition than when the items are + # added to a queue - if the flag is not set here, it means that the socket + # was not removed from write notifications at the right time, and this + # would mean an imbalance in registration and deregistration + doAssert WritePaused in transp.state try: addWriter(transp.fd, writeStreamLoop, cast[pointer](transp)) + + transp.state.excl WritePaused except IOSelectorsException as exc: raiseAsDefect exc, "addWriter" except ValueError as exc: @@ -2101,6 +2036,40 @@ proc getUserData*[T](server: StreamServer): T {.inline.} = ## Obtain user data stored in ``server`` object. result = cast[T](server.udata) +template fastWrite(fd: auto, pbytes: var ptr byte, rbytes: var int, nbytes: int) = + # On windows, the write could be initiated here if there is no other write + # ongoing, but the queue is still needed due to the mechanics of iocp + + when not defined(windows): + if transp.queue.len == 0: + while rbytes > 0: + let res = posix.send(SocketHandle(fd), pbytes, rbytes, MSG_NOSIGNAL) + if res > 0: + pbytes = cast[ptr byte](cast[uint](pbytes) + cast[uint](res)) + rbytes -= res + + if rbytes == 0: + retFuture.complete(nbytes) + return retFuture + # Not all bytes written - keep going + else: + let err = osLastError() + if cint(err) in [EAGAIN, EWOULDBLOCK]: + break # No bytes written, add to queue + + if cint(err) == EINTR: + continue + + if isConnResetError(err): + transp.state.incl({WriteEof}) + retFuture.complete(0) + return retFuture + else: + transp.state.incl({WriteError}) + let error = getTransportOsError(err) + retFuture.fail(error) + return retFuture + proc write*(transp: StreamTransport, pbytes: pointer, nbytes: int): Future[int] = ## Write data from buffer ``pbytes`` with size ``nbytes`` using transport @@ -2108,8 +2077,15 @@ proc write*(transp: StreamTransport, pbytes: pointer, var retFuture = newFuture[int]("stream.transport.write(pointer)") transp.checkClosed(retFuture) transp.checkWriteEof(retFuture) + + var + pbytes = cast[ptr byte](pbytes) + rbytes = nbytes # Remaining bytes + + fastWrite(transp.fd, pbytes, rbytes, nbytes) + var vector = StreamVector(kind: DataBuffer, writer: retFuture, - buf: pbytes, buflen: nbytes, size: nbytes) + buf: pbytes, buflen: rbytes, size: nbytes) transp.queue.addLast(vector) transp.resumeWrite() return retFuture @@ -2119,15 +2095,28 @@ proc write*(transp: StreamTransport, msg: string, msglen = -1): Future[int] = var retFuture = newFutureStr[int]("stream.transport.write(string)") transp.checkClosed(retFuture) transp.checkWriteEof(retFuture) - if not(isLiteral(msg)): + + let + nbytes = if msglen <= 0: len(msg) else: msglen + + var + pbytes = cast[ptr byte](unsafeAddr msg[0]) + rbytes = nbytes + + fastWrite(transp.fd, pbytes, rbytes, nbytes) + + let + written = nbytes - rbytes # In case fastWrite wrote some + + pbytes = if not(isLiteral(msg)): shallowCopy(retFuture.gcholder, msg) + cast[ptr byte](addr retFuture.gcholder[written]) else: - retFuture.gcholder = msg - let length = if msglen <= 0: len(msg) else: msglen - var vector = StreamVector(kind: DataBuffer, - writer: cast[Future[int]](retFuture), - buf: addr retFuture.gcholder[0], buflen: length, - size: length) + retFuture.gcholder = msg[written.. 65*1024: + # We've queued 64mb on the socket and it still allows writing, + # something is wrong - we'll break here which will cause the test + # to fail + break await transp.closeWait() await sleepAsync(100.milliseconds) for i in 0 ..< len(futs): + # writes may complete via fast write if futs[i].failed() and (futs[i].error of TransportUseClosedError): inc(res) @@ -1244,29 +1238,34 @@ suite "Stream Transport test suite": for i in 0..