From 7029f8bc1e98e6b4360ffcfe37e1602cf25fd6c8 Mon Sep 17 00:00:00 2001 From: cheatfate Date: Wed, 28 Aug 2019 07:57:06 +0300 Subject: [PATCH] Fix behavior for write operations. --- chronos.nimble | 2 +- chronos/transports/stream.nim | 96 +++++++++++++++++++++++------------ 2 files changed, 65 insertions(+), 33 deletions(-) diff --git a/chronos.nimble b/chronos.nimble index 02840cd..5a9f608 100644 --- a/chronos.nimble +++ b/chronos.nimble @@ -1,5 +1,5 @@ packageName = "chronos" -version = "2.2.8" +version = "2.2.9" author = "Status Research & Development GmbH" description = "Chronos" license = "Apache License 2.0 or MIT" diff --git a/chronos/transports/stream.nim b/chronos/transports/stream.nim index 489a27f..041a3a0 100644 --- a/chronos/transports/stream.nim +++ b/chronos/transports/stream.nim @@ -252,6 +252,13 @@ proc completePendingWriteQueue(queue: var Deque[StreamVector], if not(vector.writer.finished()): vector.writer.complete(v) +proc failPendingWriteQueue(queue: var Deque[StreamVector], + error: ref Exception) {.inline.} = + while len(queue) > 0: + var vector = queue.popFirst() + if not(vector.writer.finished()): + vector.writer.fail(error) + when defined(windows): template zeroOvelappedOffset(t: untyped) = @@ -324,25 +331,29 @@ when defined(windows): vector.writer.complete(transp.wwsabuf.len) elif int(err) == ERROR_OPERATION_ABORTED: # CancelIO() interrupt - transp.state.incl(WritePaused) - let v = transp.queue.popFirst() - if not(v.writer.finished()): - v.writer.complete(0) + transp.state.incl({WritePaused, WriteEof}) + let vector = transp.queue.popFirst() + if not(vector.writer.finished()): + vector.writer.complete(0) + completePendingWriteQueue(transp.queue, 0) break else: - let v = transp.queue.popFirst() + let vector = transp.queue.popFirst() if isConnResetError(err): # Soft error happens which indicates that remote peer got # disconnected, complete all pending writes in queue with 0. - transp.state.incl(WriteEof) - if not(v.writer.finished()): - v.writer.complete(0) + transp.state.incl({WritePaused, WriteEof}) + if not(vector.writer.finished()): + vector.writer.complete(0) completePendingWriteQueue(transp.queue, 0) break else: - transp.state.incl(WriteError) - if not(v.writer.finished()): - v.writer.fail(getTransportOsError(err)) + transp.state.incl({WritePaused, WriteError}) + let error = getTransportOsError(err) + if not(vector.writer.finished()): + vector.writer.fail(error) + failPendingWriteQueue(transp.queue, error) + break else: ## Initiation transp.state.incl(WritePending) @@ -360,9 +371,11 @@ when defined(windows): if int(err) == ERROR_OPERATION_ABORTED: # CancelIO() interrupt transp.state.excl(WritePending) - transp.state.incl(WritePaused) + transp.state.incl({WritePaused, WriteEof}) if not(vector.writer.finished()): vector.writer.complete(0) + completePendingWriteQueue(transp.queue, 0) + break elif int(err) == ERROR_IO_PENDING: transp.queue.addFirst(vector) else: @@ -377,8 +390,11 @@ when defined(windows): break else: transp.state.incl({WritePaused, WriteError}) + let error = getTransportOsError(err) if not(vector.writer.finished()): - vector.writer.fail(getTransportOsError(err)) + vector.writer.fail(error) + failPendingWriteQueue(transp.queue, error) + break else: transp.queue.addFirst(vector) else: @@ -400,9 +416,11 @@ when defined(windows): if int(err) == ERROR_OPERATION_ABORTED: # CancelIO() interrupt transp.state.excl(WritePending) - transp.state.incl(WritePaused) + transp.state.incl({WritePaused, WriteEof}) if not(vector.writer.finished()): vector.writer.complete(0) + completePendingWriteQueue(transp.queue, 0) + break elif int(err) == ERROR_IO_PENDING: transp.queue.addFirst(vector) else: @@ -417,8 +435,11 @@ when defined(windows): break else: transp.state.incl({WritePaused, WriteError}) + let error = getTransportOsError(err) if not(vector.writer.finished()): - vector.writer.fail(getTransportOsError(err)) + vector.writer.fail(error) + failPendingWriteQueue(transp.queue, error) + break else: transp.queue.addFirst(vector) elif transp.kind == TransportKind.Pipe: @@ -432,20 +453,16 @@ when defined(windows): cast[POVERLAPPED](addr transp.wovl)) if ret == 0: let err = osLastError() - if int(err) == ERROR_OPERATION_ABORTED: + if int(err) in {ERROR_OPERATION_ABORTED, ERROR_NO_DATA}: # CancelIO() interrupt transp.state.excl(WritePending) - transp.state.incl(WritePaused) + transp.state.incl({WritePaused, WriteEof}) if not(vector.writer.finished()): vector.writer.complete(0) + completePendingWriteQueue(transp.queue, 0) + break elif int(err) == ERROR_IO_PENDING: transp.queue.addFirst(vector) - elif int(err) == ERROR_NO_DATA: - # The pipe is being closed. - transp.state.excl(WritePending) - transp.state.incl(WritePaused) - if not(vector.writer.finished()): - vector.writer.complete(0) else: transp.state.excl(WritePending) if isConnResetError(err): @@ -458,8 +475,11 @@ when defined(windows): break else: transp.state.incl({WritePaused, WriteError}) + let error = getTransportOsError(err) if not(vector.writer.finished()): - vector.writer.fail(getTransportOsError(err)) + vector.writer.fail(error) + failPendingWriteQueue(transp.queue, error) + break else: transp.queue.addFirst(vector) break @@ -974,6 +994,7 @@ else: if int(err) == EINTR: continue else: + transp.fd.removeWriter() if isConnResetError(err): # Soft error happens which indicates that remote peer got # disconnected, complete all pending writes in queue with 0. @@ -981,10 +1002,12 @@ else: if not(vector.writer.finished()): vector.writer.complete(0) completePendingWriteQueue(transp.queue, 0) - transp.fd.removeWriter() else: + transp.state.incl({WriteError, WritePaused}) + let error = getTransportOsError(err) if not(vector.writer.finished()): - vector.writer.fail(getTransportOsError(err)) + vector.writer.fail(error) + failPendingWriteQueue(transp.queue, error) else: var nbytes = cast[int](vector.buf) let res = sendfile(int(fd), cast[int](vector.buflen), @@ -1004,6 +1027,7 @@ else: if int(err) == EINTR: continue else: + transp.fd.removeWriter() if isConnResetError(err): # Soft error happens which indicates that remote peer got # disconnected, complete all pending writes in queue with 0. @@ -1011,10 +1035,12 @@ else: if not(vector.writer.finished()): vector.writer.complete(0) completePendingWriteQueue(transp.queue, 0) - transp.fd.removeWriter() else: + transp.state.incl({WriteError, WritePaused}) + let error = getTransportOsError(err) if not(vector.writer.finished()): - vector.writer.fail(getTransportOsError(err)) + vector.writer.fail(error) + failPendingWriteQueue(transp.queue, error) break elif transp.kind == TransportKind.Pipe: @@ -1032,6 +1058,7 @@ else: if int(err) == EINTR: continue else: + transp.fd.removeWriter() if isConnResetError(err): # Soft error happens which indicates that remote peer got # disconnected, complete all pending writes in queue with 0. @@ -1039,10 +1066,12 @@ else: if not(vector.writer.finished()): vector.writer.complete(0) completePendingWriteQueue(transp.queue, 0) - transp.fd.removeWriter() else: + transp.state.incl({WriteError, WritePaused}) + let error = getTransportOsError(err) if not(vector.writer.finished()): - vector.writer.fail(getTransportOsError(err)) + vector.writer.fail(error) + failPendingWriteQueue(transp.queue, error) else: var nbytes = cast[int](vector.buf) let res = sendfile(int(fd), cast[int](vector.buflen), @@ -1062,6 +1091,7 @@ else: if int(err) == EINTR: continue else: + transp.fd.removeWriter() if isConnResetError(err): # Soft error happens which indicates that remote peer got # disconnected, complete all pending writes in queue with 0. @@ -1069,10 +1099,12 @@ else: if not(vector.writer.finished()): vector.writer.complete(0) completePendingWriteQueue(transp.queue, 0) - transp.fd.removeWriter() else: + transp.state.incl({WriteError, WritePaused}) + let error = getTransportOsError(err) if not(vector.writer.finished()): - vector.writer.fail(getTransportOsError(err)) + vector.writer.fail(error) + failPendingWriteQueue(transp.queue, error) break else: transp.state.incl(WritePaused)