Address #320 issue. (#372)

This commit is contained in:
Eugene Kabanov 2023-03-24 17:34:45 +02:00 committed by GitHub
parent 0688d2ef8f
commit b0af576c7c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 106 additions and 88 deletions

View File

@ -1065,60 +1065,69 @@ when defined(windows):
return retFuture
proc continuationSocket(udata: pointer) {.gcsafe.} =
var ovl = cast[PtrCustomOverlapped](udata)
var server = cast[StreamServer](ovl.data.udata)
if retFuture.finished():
# `retFuture` could become finished in 2 cases:
# 1. OS sends IOCP notification about failure, but we already failed
# `retFuture` with proper error.
# 2. `accept()` call has been cancelled. Cancellation callback closed
# accepting socket, so OS sends IOCP notification with an
# `ERROR_OPERATION_ABORTED` error.
return
var
ovl = cast[PtrCustomOverlapped](udata)
server = cast[StreamServer](ovl.data.udata)
server.apending = false
if not(retFuture.finished()):
if server.status in {ServerStatus.Stopped, ServerStatus.Closed}:
retFuture.fail(getServerUseClosedError())
if server.status in {ServerStatus.Stopped, ServerStatus.Closed}:
retFuture.fail(getServerUseClosedError())
server.asock.closeSocket()
server.clean()
else:
case ovl.data.errCode
of OSErrorCode(-1):
if setsockopt(SocketHandle(server.asock), cint(osdefs.SOL_SOCKET),
cint(osdefs.SO_UPDATE_ACCEPT_CONTEXT),
addr server.sock,
SockLen(sizeof(SocketHandle))) != 0'i32:
let err = osLastError()
server.asock.closeSocket()
if err == osdefs.WSAENOTSOCK:
# This can be happened when server get closed, but continuation
# was already scheduled, so we failing it not with OS error.
retFuture.fail(getServerUseClosedError())
else:
let errorMsg = osErrorMsg(err)
retFuture.fail(getConnectionAbortedError(errorMsg))
else:
var ntransp: StreamTransport
if not(isNil(server.init)):
let transp = server.init(server, server.asock)
ntransp = newStreamSocketTransport(server.asock,
server.bufferSize,
transp)
else:
ntransp = newStreamSocketTransport(server.asock,
server.bufferSize, nil)
# Start tracking transport
trackStream(ntransp)
retFuture.complete(ntransp)
of OSErrorCode(osdefs.ERROR_OPERATION_ABORTED):
# CancelIO() interrupt or close.
server.asock.closeSocket()
retFuture.fail(getServerUseClosedError())
server.clean()
of OSErrorCode(osdefs.WSAENETDOWN),
OSErrorCode(osdefs.WSAENETRESET),
OSErrorCode(osdefs.WSAECONNABORTED),
OSErrorCode(osdefs.WSAECONNRESET),
OSErrorCode(osdefs.WSAETIMEDOUT):
server.asock.closeSocket()
retFuture.fail(getConnectionAbortedError(int(ovl.data.errCode)))
server.clean()
else:
case ovl.data.errCode
of OSErrorCode(-1):
if setsockopt(SocketHandle(server.asock), cint(osdefs.SOL_SOCKET),
cint(osdefs.SO_UPDATE_ACCEPT_CONTEXT),
addr server.sock,
SockLen(sizeof(SocketHandle))) != 0'i32:
let err = osLastError()
server.asock.closeSocket()
if err == osdefs.WSAENOTSOCK:
# This can be happened when server get closed, but continuation
# was already scheduled, so we failing it not with OS error.
retFuture.fail(getServerUseClosedError())
else:
let errorMsg = osErrorMsg(err)
retFuture.fail(getConnectionAbortedError(errorMsg))
else:
var ntransp: StreamTransport
if not(isNil(server.init)):
let transp = server.init(server, server.asock)
ntransp = newStreamSocketTransport(server.asock,
server.bufferSize,
transp)
else:
ntransp = newStreamSocketTransport(server.asock,
server.bufferSize, nil)
# Start tracking transport
trackStream(ntransp)
retFuture.complete(ntransp)
of OSErrorCode(osdefs.ERROR_OPERATION_ABORTED):
# CancelIO() interrupt or close.
server.asock.closeSocket()
retFuture.fail(getServerUseClosedError())
server.clean()
of OSErrorCode(osdefs.WSAENETDOWN),
OSErrorCode(osdefs.WSAENETRESET),
OSErrorCode(osdefs.WSAECONNABORTED),
OSErrorCode(osdefs.WSAECONNRESET),
OSErrorCode(osdefs.WSAETIMEDOUT):
server.asock.closeSocket()
retFuture.fail(getConnectionAbortedError(int(ovl.data.errCode)))
server.clean()
else:
server.asock.closeSocket()
retFuture.fail(getTransportOsError(ovl.data.errCode))
server.asock.closeSocket()
retFuture.fail(getTransportOsError(ovl.data.errCode))
proc cancellationSocket(udata: pointer) {.gcsafe.} =
if server.apending:
@ -1126,50 +1135,59 @@ when defined(windows):
server.asock.closeSocket()
proc continuationPipe(udata: pointer) {.gcsafe.} =
var ovl = cast[PtrCustomOverlapped](udata)
var server = cast[StreamServer](ovl.data.udata)
if retFuture.finished():
# `retFuture` could become finished in 2 cases:
# 1. OS sends IOCP notification about failure, but we already failed
# `retFuture` with proper error.
# 2. `accept()` call has been cancelled. Cancellation callback closed
# accepting socket, so OS sends IOCP notification with an
# `ERROR_OPERATION_ABORTED` error.
return
var
ovl = cast[PtrCustomOverlapped](udata)
server = cast[StreamServer](ovl.data.udata)
server.apending = false
if not(retFuture.finished()):
if server.status in {ServerStatus.Stopped, ServerStatus.Closed}:
if server.status in {ServerStatus.Stopped, ServerStatus.Closed}:
retFuture.fail(getServerUseClosedError())
server.sock.closeHandle()
server.clean()
else:
if ovl.data.errCode == OSErrorCode(-1):
var ntransp: StreamTransport
var flags = {WinServerPipe}
if NoPipeFlash in server.flags:
flags.incl(WinNoPipeFlash)
if not(isNil(server.init)):
var transp = server.init(server, server.sock)
ntransp = newStreamPipeTransport(server.sock, server.bufferSize,
transp, flags)
else:
ntransp = newStreamPipeTransport(server.sock, server.bufferSize,
nil, flags)
server.sock = server.createAcceptPipe().valueOr:
server.sock = asyncInvalidSocket
server.errorCode = error
retFuture.fail(getTransportOsError(error))
return
trackStream(ntransp)
retFuture.complete(ntransp)
elif int32(ovl.data.errCode) in {osdefs.ERROR_OPERATION_ABORTED,
osdefs.ERROR_PIPE_NOT_CONNECTED}:
# CancelIO() interrupt or close call.
retFuture.fail(getServerUseClosedError())
server.sock.closeHandle()
server.clean()
else:
if ovl.data.errCode == OSErrorCode(-1):
var ntransp: StreamTransport
var flags = {WinServerPipe}
if NoPipeFlash in server.flags:
flags.incl(WinNoPipeFlash)
if not(isNil(server.init)):
var transp = server.init(server, server.sock)
ntransp = newStreamPipeTransport(server.sock, server.bufferSize,
transp, flags)
else:
ntransp = newStreamPipeTransport(server.sock, server.bufferSize,
nil, flags)
server.sock = server.createAcceptPipe().valueOr:
server.sock = asyncInvalidSocket
server.errorCode = error
retFuture.fail(getTransportOsError(error))
return
trackStream(ntransp)
retFuture.complete(ntransp)
elif int32(ovl.data.errCode) in {osdefs.ERROR_OPERATION_ABORTED,
osdefs.ERROR_PIPE_NOT_CONNECTED}:
# CancelIO() interrupt or close call.
retFuture.fail(getServerUseClosedError())
server.clean()
else:
discard closeHandle(HANDLE(server.sock))
server.sock = server.createAcceptPipe().valueOr:
server.sock = asyncInvalidSocket
server.errorCode = error
retFuture.fail(getTransportOsError(error))
return
retFuture.fail(getTransportOsError(ovl.data.errCode))
discard closeHandle(HANDLE(server.sock))
server.sock = server.createAcceptPipe().valueOr:
server.sock = asyncInvalidSocket
server.errorCode = error
retFuture.fail(getTransportOsError(error))
return
retFuture.fail(getTransportOsError(ovl.data.errCode))
proc cancellationPipe(udata: pointer) {.gcsafe.} =
if server.apending: