Fix behavior for write operations.

This commit is contained in:
cheatfate 2019-08-28 07:57:06 +03:00
parent d3eac1d643
commit 7029f8bc1e
No known key found for this signature in database
GPG Key ID: 46ADD633A7201F95
2 changed files with 65 additions and 33 deletions

View File

@ -1,5 +1,5 @@
packageName = "chronos" packageName = "chronos"
version = "2.2.8" version = "2.2.9"
author = "Status Research & Development GmbH" author = "Status Research & Development GmbH"
description = "Chronos" description = "Chronos"
license = "Apache License 2.0 or MIT" license = "Apache License 2.0 or MIT"

View File

@ -252,6 +252,13 @@ proc completePendingWriteQueue(queue: var Deque[StreamVector],
if not(vector.writer.finished()): if not(vector.writer.finished()):
vector.writer.complete(v) vector.writer.complete(v)
proc failPendingWriteQueue(queue: var Deque[StreamVector],
error: ref Exception) {.inline.} =
while len(queue) > 0:
var vector = queue.popFirst()
if not(vector.writer.finished()):
vector.writer.fail(error)
when defined(windows): when defined(windows):
template zeroOvelappedOffset(t: untyped) = template zeroOvelappedOffset(t: untyped) =
@ -324,25 +331,29 @@ when defined(windows):
vector.writer.complete(transp.wwsabuf.len) vector.writer.complete(transp.wwsabuf.len)
elif int(err) == ERROR_OPERATION_ABORTED: elif int(err) == ERROR_OPERATION_ABORTED:
# CancelIO() interrupt # CancelIO() interrupt
transp.state.incl(WritePaused) transp.state.incl({WritePaused, WriteEof})
let v = transp.queue.popFirst() let vector = transp.queue.popFirst()
if not(v.writer.finished()): if not(vector.writer.finished()):
v.writer.complete(0) vector.writer.complete(0)
completePendingWriteQueue(transp.queue, 0)
break break
else: else:
let v = transp.queue.popFirst() let vector = transp.queue.popFirst()
if isConnResetError(err): if isConnResetError(err):
# Soft error happens which indicates that remote peer got # Soft error happens which indicates that remote peer got
# disconnected, complete all pending writes in queue with 0. # disconnected, complete all pending writes in queue with 0.
transp.state.incl(WriteEof) transp.state.incl({WritePaused, WriteEof})
if not(v.writer.finished()): if not(vector.writer.finished()):
v.writer.complete(0) vector.writer.complete(0)
completePendingWriteQueue(transp.queue, 0) completePendingWriteQueue(transp.queue, 0)
break break
else: else:
transp.state.incl(WriteError) transp.state.incl({WritePaused, WriteError})
if not(v.writer.finished()): let error = getTransportOsError(err)
v.writer.fail(getTransportOsError(err)) if not(vector.writer.finished()):
vector.writer.fail(error)
failPendingWriteQueue(transp.queue, error)
break
else: else:
## Initiation ## Initiation
transp.state.incl(WritePending) transp.state.incl(WritePending)
@ -360,9 +371,11 @@ when defined(windows):
if int(err) == ERROR_OPERATION_ABORTED: if int(err) == ERROR_OPERATION_ABORTED:
# CancelIO() interrupt # CancelIO() interrupt
transp.state.excl(WritePending) transp.state.excl(WritePending)
transp.state.incl(WritePaused) transp.state.incl({WritePaused, WriteEof})
if not(vector.writer.finished()): if not(vector.writer.finished()):
vector.writer.complete(0) vector.writer.complete(0)
completePendingWriteQueue(transp.queue, 0)
break
elif int(err) == ERROR_IO_PENDING: elif int(err) == ERROR_IO_PENDING:
transp.queue.addFirst(vector) transp.queue.addFirst(vector)
else: else:
@ -377,8 +390,11 @@ when defined(windows):
break break
else: else:
transp.state.incl({WritePaused, WriteError}) transp.state.incl({WritePaused, WriteError})
let error = getTransportOsError(err)
if not(vector.writer.finished()): if not(vector.writer.finished()):
vector.writer.fail(getTransportOsError(err)) vector.writer.fail(error)
failPendingWriteQueue(transp.queue, error)
break
else: else:
transp.queue.addFirst(vector) transp.queue.addFirst(vector)
else: else:
@ -400,9 +416,11 @@ when defined(windows):
if int(err) == ERROR_OPERATION_ABORTED: if int(err) == ERROR_OPERATION_ABORTED:
# CancelIO() interrupt # CancelIO() interrupt
transp.state.excl(WritePending) transp.state.excl(WritePending)
transp.state.incl(WritePaused) transp.state.incl({WritePaused, WriteEof})
if not(vector.writer.finished()): if not(vector.writer.finished()):
vector.writer.complete(0) vector.writer.complete(0)
completePendingWriteQueue(transp.queue, 0)
break
elif int(err) == ERROR_IO_PENDING: elif int(err) == ERROR_IO_PENDING:
transp.queue.addFirst(vector) transp.queue.addFirst(vector)
else: else:
@ -417,8 +435,11 @@ when defined(windows):
break break
else: else:
transp.state.incl({WritePaused, WriteError}) transp.state.incl({WritePaused, WriteError})
let error = getTransportOsError(err)
if not(vector.writer.finished()): if not(vector.writer.finished()):
vector.writer.fail(getTransportOsError(err)) vector.writer.fail(error)
failPendingWriteQueue(transp.queue, error)
break
else: else:
transp.queue.addFirst(vector) transp.queue.addFirst(vector)
elif transp.kind == TransportKind.Pipe: elif transp.kind == TransportKind.Pipe:
@ -432,20 +453,16 @@ when defined(windows):
cast[POVERLAPPED](addr transp.wovl)) cast[POVERLAPPED](addr transp.wovl))
if ret == 0: if ret == 0:
let err = osLastError() let err = osLastError()
if int(err) == ERROR_OPERATION_ABORTED: if int(err) in {ERROR_OPERATION_ABORTED, ERROR_NO_DATA}:
# CancelIO() interrupt # CancelIO() interrupt
transp.state.excl(WritePending) transp.state.excl(WritePending)
transp.state.incl(WritePaused) transp.state.incl({WritePaused, WriteEof})
if not(vector.writer.finished()): if not(vector.writer.finished()):
vector.writer.complete(0) vector.writer.complete(0)
completePendingWriteQueue(transp.queue, 0)
break
elif int(err) == ERROR_IO_PENDING: elif int(err) == ERROR_IO_PENDING:
transp.queue.addFirst(vector) transp.queue.addFirst(vector)
elif int(err) == ERROR_NO_DATA:
# The pipe is being closed.
transp.state.excl(WritePending)
transp.state.incl(WritePaused)
if not(vector.writer.finished()):
vector.writer.complete(0)
else: else:
transp.state.excl(WritePending) transp.state.excl(WritePending)
if isConnResetError(err): if isConnResetError(err):
@ -458,8 +475,11 @@ when defined(windows):
break break
else: else:
transp.state.incl({WritePaused, WriteError}) transp.state.incl({WritePaused, WriteError})
let error = getTransportOsError(err)
if not(vector.writer.finished()): if not(vector.writer.finished()):
vector.writer.fail(getTransportOsError(err)) vector.writer.fail(error)
failPendingWriteQueue(transp.queue, error)
break
else: else:
transp.queue.addFirst(vector) transp.queue.addFirst(vector)
break break
@ -974,6 +994,7 @@ else:
if int(err) == EINTR: if int(err) == EINTR:
continue continue
else: else:
transp.fd.removeWriter()
if isConnResetError(err): if isConnResetError(err):
# Soft error happens which indicates that remote peer got # Soft error happens which indicates that remote peer got
# disconnected, complete all pending writes in queue with 0. # disconnected, complete all pending writes in queue with 0.
@ -981,10 +1002,12 @@ else:
if not(vector.writer.finished()): if not(vector.writer.finished()):
vector.writer.complete(0) vector.writer.complete(0)
completePendingWriteQueue(transp.queue, 0) completePendingWriteQueue(transp.queue, 0)
transp.fd.removeWriter()
else: else:
transp.state.incl({WriteError, WritePaused})
let error = getTransportOsError(err)
if not(vector.writer.finished()): if not(vector.writer.finished()):
vector.writer.fail(getTransportOsError(err)) vector.writer.fail(error)
failPendingWriteQueue(transp.queue, error)
else: else:
var nbytes = cast[int](vector.buf) var nbytes = cast[int](vector.buf)
let res = sendfile(int(fd), cast[int](vector.buflen), let res = sendfile(int(fd), cast[int](vector.buflen),
@ -1004,6 +1027,7 @@ else:
if int(err) == EINTR: if int(err) == EINTR:
continue continue
else: else:
transp.fd.removeWriter()
if isConnResetError(err): if isConnResetError(err):
# Soft error happens which indicates that remote peer got # Soft error happens which indicates that remote peer got
# disconnected, complete all pending writes in queue with 0. # disconnected, complete all pending writes in queue with 0.
@ -1011,10 +1035,12 @@ else:
if not(vector.writer.finished()): if not(vector.writer.finished()):
vector.writer.complete(0) vector.writer.complete(0)
completePendingWriteQueue(transp.queue, 0) completePendingWriteQueue(transp.queue, 0)
transp.fd.removeWriter()
else: else:
transp.state.incl({WriteError, WritePaused})
let error = getTransportOsError(err)
if not(vector.writer.finished()): if not(vector.writer.finished()):
vector.writer.fail(getTransportOsError(err)) vector.writer.fail(error)
failPendingWriteQueue(transp.queue, error)
break break
elif transp.kind == TransportKind.Pipe: elif transp.kind == TransportKind.Pipe:
@ -1032,6 +1058,7 @@ else:
if int(err) == EINTR: if int(err) == EINTR:
continue continue
else: else:
transp.fd.removeWriter()
if isConnResetError(err): if isConnResetError(err):
# Soft error happens which indicates that remote peer got # Soft error happens which indicates that remote peer got
# disconnected, complete all pending writes in queue with 0. # disconnected, complete all pending writes in queue with 0.
@ -1039,10 +1066,12 @@ else:
if not(vector.writer.finished()): if not(vector.writer.finished()):
vector.writer.complete(0) vector.writer.complete(0)
completePendingWriteQueue(transp.queue, 0) completePendingWriteQueue(transp.queue, 0)
transp.fd.removeWriter()
else: else:
transp.state.incl({WriteError, WritePaused})
let error = getTransportOsError(err)
if not(vector.writer.finished()): if not(vector.writer.finished()):
vector.writer.fail(getTransportOsError(err)) vector.writer.fail(error)
failPendingWriteQueue(transp.queue, error)
else: else:
var nbytes = cast[int](vector.buf) var nbytes = cast[int](vector.buf)
let res = sendfile(int(fd), cast[int](vector.buflen), let res = sendfile(int(fd), cast[int](vector.buflen),
@ -1062,6 +1091,7 @@ else:
if int(err) == EINTR: if int(err) == EINTR:
continue continue
else: else:
transp.fd.removeWriter()
if isConnResetError(err): if isConnResetError(err):
# Soft error happens which indicates that remote peer got # Soft error happens which indicates that remote peer got
# disconnected, complete all pending writes in queue with 0. # disconnected, complete all pending writes in queue with 0.
@ -1069,10 +1099,12 @@ else:
if not(vector.writer.finished()): if not(vector.writer.finished()):
vector.writer.complete(0) vector.writer.complete(0)
completePendingWriteQueue(transp.queue, 0) completePendingWriteQueue(transp.queue, 0)
transp.fd.removeWriter()
else: else:
transp.state.incl({WriteError, WritePaused})
let error = getTransportOsError(err)
if not(vector.writer.finished()): if not(vector.writer.finished()):
vector.writer.fail(getTransportOsError(err)) vector.writer.fail(error)
failPendingWriteQueue(transp.queue, error)
break break
else: else:
transp.state.incl(WritePaused) transp.state.incl(WritePaused)