Ignore ECONNRESET and EPIPE errors on sending.

Add test for ECONNRESET error.
This commit is contained in:
cheatfate 2019-05-28 09:29:00 +03:00
parent 62da8212a4
commit 317ce3c797
No known key found for this signature in database
GPG Key ID: 46ADD633A7201F95
3 changed files with 101 additions and 13 deletions

View File

@ -109,6 +109,7 @@ type
WritePending, # Writer operation pending (Windows)
WritePaused, # Writer operations paused
WriteClosed, # Writer operations closed
WriteEof, # Remote peer disconnected
WriteError # Write error
var

View File

@ -245,6 +245,12 @@ proc setupStreamServerTracker(): StreamServerTracker {.gcsafe.} =
result.isLeaked = leakServer
addTracker(StreamServerTrackerName, result)
proc completePendingWriteQueue(queue: var Deque[StreamVector],
v: int) {.inline.} =
while len(queue) > 0:
var vector = queue.popFirst()
vector.writer.complete(v)
when defined(windows):
template zeroOvelappedOffset(t: untyped) =
@ -274,6 +280,11 @@ when defined(windows):
(t).wwsabuf.buf = cast[cstring](v.buf)
(t).wwsabuf.len = cast[int32](v.buflen)
proc isConnResetError(err: OSErrorCode): bool {.inline.} =
result = (err == OSErrorCode(WSAECONNRESET)) or
(err == OSErrorCode(WSAECONNABORTED)) or
(err == OSErrorCode(ERROR_PIPE_NOT_CONNECTED))
proc writeStreamLoop(udata: pointer) {.gcsafe, nimcall.} =
var bytesCount: int32
var ovl = cast[PtrCustomOverlapped](udata)
@ -318,6 +329,14 @@ when defined(windows):
break
else:
let v = transp.queue.popFirst()
if isConnResetError(err):
# Soft error happens which indicates that remote peer got
# disconnected, complete all pending writes in queue with 0.
transp.state.incl(WriteEof)
v.writer.complete(0)
completePendingWriteQueue(transp.queue, 0)
break
else:
transp.state.incl(WriteError)
v.writer.fail(getTransportOsError(err))
else:
@ -343,7 +362,15 @@ when defined(windows):
transp.queue.addFirst(vector)
else:
transp.state.excl(WritePending)
transp.state = transp.state + {WritePaused, WriteError}
if isConnResetError(err):
# Soft error happens which indicates that remote peer got
# disconnected, complete all pending writes in queue with 0.
transp.state.incl({WritePaused, WriteEof})
vector.writer.complete(0)
completePendingWriteQueue(transp.queue, 0)
break
else:
transp.state.incl({WritePaused, WriteError})
vector.writer.fail(getTransportOsError(err))
else:
transp.queue.addFirst(vector)
@ -372,7 +399,15 @@ when defined(windows):
transp.queue.addFirst(vector)
else:
transp.state.excl(WritePending)
transp.state = transp.state + {WritePaused, WriteError}
if isConnResetError(err):
# Soft error happens which indicates that remote peer got
# disconnected, complete all pending writes in queue with 0.
transp.state.incl({WritePaused, WriteEof})
vector.writer.complete(0)
completePendingWriteQueue(transp.queue, 0)
break
else:
transp.state.incl({WritePaused, WriteError})
vector.writer.fail(getTransportOsError(err))
else:
transp.queue.addFirst(vector)
@ -401,7 +436,15 @@ when defined(windows):
vector.writer.complete(0)
else:
transp.state.excl(WritePending)
transp.state = transp.state + {WritePaused, WriteError}
if isConnResetError(err):
# Soft error happens which indicates that remote peer got
# disconnected, complete all pending writes in queue with 0.
transp.state.incl({WritePaused, WriteEof})
vector.writer.complete(0)
completePendingWriteQueue(transp.queue, 0)
break
else:
transp.state.incl({WritePaused, WriteError})
vector.writer.fail(getTransportOsError(err))
else:
transp.queue.addFirst(vector)
@ -877,6 +920,10 @@ else:
(v).buflen = int(n)
(v).writer = (t)
proc isConnResetError(err: OSErrorCode): bool {.inline.} =
result = (err == OSErrorCode(ECONNRESET)) or
(err == OSErrorCode(EPIPE))
proc writeStreamLoop(udata: pointer) {.gcsafe.} =
var cdata = cast[ptr CompletionData](udata)
var transp = cast[StreamTransport](cdata.udata)
@ -903,6 +950,14 @@ else:
let err = osLastError()
if int(err) == EINTR:
continue
else:
if isConnResetError(err):
# Soft error happens which indicates that remote peer got
# disconnected, complete all pending writes in queue with 0.
transp.state.incl({WriteEof, WritePaused})
vector.writer.complete(0)
completePendingWriteQueue(transp.queue, 0)
transp.fd.removeWriter()
else:
vector.writer.fail(getTransportOsError(err))
else:
@ -922,6 +977,14 @@ else:
let err = osLastError()
if int(err) == EINTR:
continue
else:
if isConnResetError(err):
# Soft error happens which indicates that remote peer got
# disconnected, complete all pending writes in queue with 0.
transp.state.incl({WriteEof, WritePaused})
vector.writer.complete(0)
completePendingWriteQueue(transp.queue, 0)
transp.fd.removeWriter()
else:
vector.writer.fail(getTransportOsError(err))
break
@ -1475,7 +1538,7 @@ proc readUntil*(transp: StreamTransport, pbytes: pointer, nbytes: int,
## Read data from the transport ``transp`` until separator ``sep`` is found.
##
## On success, the data and separator will be removed from the internal
## buffer (consumed). Returned data will NOT include the separator at the end.
## buffer (consumed). Returned data will include the separator at the end.
##
## If EOF is received, and `sep` was not found, procedure will raise
## ``TransportIncompleteError``.
@ -1594,7 +1657,7 @@ proc read*(transp: StreamTransport, n = -1): Future[seq[byte]] {.async.} =
if transp.offset > 0:
let s = len(result)
let o = s + transp.offset
if n < 0:
if n <= 0:
# grabbing all incoming data, until EOF
result.setLen(o)
copyMem(cast[pointer](addr result[s]), addr(transp.buffer[0]),
@ -1637,7 +1700,7 @@ proc consume*(transp: StreamTransport, n = -1): Future[int] {.async.} =
break
if transp.offset > 0:
if n == -1:
if n <= 0:
# consume all incoming data, until EOF
result += transp.offset
transp.offset = 0

View File

@ -692,6 +692,28 @@ suite "Stream Transport test suite":
except:
discard
proc testWriteConnReset(address: TransportAddress): Future[int] {.async.} =
proc client(server: StreamServer, transp: StreamTransport) {.async.} =
await transp.closeWait()
var n = 10
var server = createStreamServer(address, client, {ReuseAddr})
server.start()
var msg = "HELLO"
var ntransp = await connect(address)
while true:
var res = await ntransp.write(msg)
if res == 0:
result = 1
break
else:
dec(n)
if n == 0:
break
server.stop()
await ntransp.closeWait()
await server.closeWait()
for i in 0..<len(addresses):
test prefixes[i] & "close(transport) test":
check waitFor(testCloseTransport(addresses[i])) == 1
@ -734,6 +756,8 @@ suite "Stream Transport test suite":
check waitFor(testConnectionRefused(address)) == true
test prefixes[i] & m16:
check waitFor(test16(addresses[i])) == 1
test prefixes[i] & "Connection reset test on send() only":
check waitFor(testWriteConnReset(addresses[i])) == 1
test "Servers leak test":
check getTracker("stream.server").isLeaked() == false