Fix accept defect (#206)
* Fix accept() FutureDefect on cancellation. * Do not close pipe twice. * Check for retFuture state before changing it state.
This commit is contained in:
parent
15137f71c3
commit
14ebf269e9
|
@ -1032,44 +1032,45 @@ when defined(windows):
|
|||
var server = cast[StreamServer](ovl.data.udata)
|
||||
|
||||
server.apending = false
|
||||
if server.status in {ServerStatus.Stopped, ServerStatus.Closed}:
|
||||
server.asock.closeSocket()
|
||||
retFuture.fail(getServerUseClosedError())
|
||||
server.clean()
|
||||
else:
|
||||
if ovl.data.errCode == OSErrorCode(-1):
|
||||
if setsockopt(SocketHandle(server.asock), cint(SOL_SOCKET),
|
||||
cint(SO_UPDATE_ACCEPT_CONTEXT), addr server.sock,
|
||||
SockLen(sizeof(SocketHandle))) != 0'i32:
|
||||
let err = OSErrorCode(wsaGetLastError())
|
||||
server.asock.closeSocket()
|
||||
if int32(err) == WSAENOTSOCK:
|
||||
# This can be happened when server get closed, but continuation
|
||||
# was already scheduled, so we failing it not with OS error.
|
||||
retFuture.fail(getServerUseClosedError())
|
||||
else:
|
||||
retFuture.fail(getTransportOsError(err))
|
||||
else:
|
||||
var ntransp: StreamTransport
|
||||
if not isNil(server.init):
|
||||
let transp = server.init(server, server.asock)
|
||||
ntransp = newStreamSocketTransport(server.asock,
|
||||
server.bufferSize,
|
||||
transp)
|
||||
else:
|
||||
ntransp = newStreamSocketTransport(server.asock,
|
||||
server.bufferSize, nil)
|
||||
# Start tracking transport
|
||||
trackStream(ntransp)
|
||||
retFuture.complete(ntransp)
|
||||
elif int32(ovl.data.errCode) == ERROR_OPERATION_ABORTED:
|
||||
# CancelIO() interrupt or close.
|
||||
server.asock.closeSocket()
|
||||
if not(retFuture.finished()):
|
||||
if server.status in {ServerStatus.Stopped, ServerStatus.Closed}:
|
||||
retFuture.fail(getServerUseClosedError())
|
||||
server.asock.closeSocket()
|
||||
server.clean()
|
||||
else:
|
||||
server.asock.closeSocket()
|
||||
retFuture.fail(getTransportOsError(ovl.data.errCode))
|
||||
if ovl.data.errCode == OSErrorCode(-1):
|
||||
if setsockopt(SocketHandle(server.asock), cint(SOL_SOCKET),
|
||||
cint(SO_UPDATE_ACCEPT_CONTEXT), addr server.sock,
|
||||
SockLen(sizeof(SocketHandle))) != 0'i32:
|
||||
let err = OSErrorCode(wsaGetLastError())
|
||||
server.asock.closeSocket()
|
||||
if int32(err) == WSAENOTSOCK:
|
||||
# This can be happened when server get closed, but continuation
|
||||
# was already scheduled, so we failing it not with OS error.
|
||||
retFuture.fail(getServerUseClosedError())
|
||||
else:
|
||||
retFuture.fail(getTransportOsError(err))
|
||||
else:
|
||||
var ntransp: StreamTransport
|
||||
if not isNil(server.init):
|
||||
let transp = server.init(server, server.asock)
|
||||
ntransp = newStreamSocketTransport(server.asock,
|
||||
server.bufferSize,
|
||||
transp)
|
||||
else:
|
||||
ntransp = newStreamSocketTransport(server.asock,
|
||||
server.bufferSize, nil)
|
||||
# Start tracking transport
|
||||
trackStream(ntransp)
|
||||
retFuture.complete(ntransp)
|
||||
elif int32(ovl.data.errCode) == ERROR_OPERATION_ABORTED:
|
||||
# CancelIO() interrupt or close.
|
||||
server.asock.closeSocket()
|
||||
retFuture.fail(getServerUseClosedError())
|
||||
server.clean()
|
||||
else:
|
||||
server.asock.closeSocket()
|
||||
retFuture.fail(getTransportOsError(ovl.data.errCode))
|
||||
|
||||
proc cancellationSocket(udata: pointer) {.gcsafe.} =
|
||||
server.asock.closeSocket()
|
||||
|
@ -1079,48 +1080,50 @@ when defined(windows):
|
|||
var server = cast[StreamServer](ovl.data.udata)
|
||||
|
||||
server.apending = false
|
||||
if server.status in {ServerStatus.Stopped, ServerStatus.Closed}:
|
||||
retFuture.fail(getServerUseClosedError())
|
||||
server.clean()
|
||||
else:
|
||||
if ovl.data.errCode == OSErrorCode(-1):
|
||||
var ntransp: StreamTransport
|
||||
var flags = {WinServerPipe}
|
||||
if NoPipeFlash in server.flags:
|
||||
flags.incl(WinNoPipeFlash)
|
||||
if not isNil(server.init):
|
||||
var transp = server.init(server, server.sock)
|
||||
ntransp = newStreamPipeTransport(server.sock, server.bufferSize,
|
||||
transp, flags)
|
||||
else:
|
||||
ntransp = newStreamPipeTransport(server.sock, server.bufferSize,
|
||||
nil, flags)
|
||||
# Start tracking transport
|
||||
try:
|
||||
server.createAcceptPipe()
|
||||
except CatchableError as exc:
|
||||
closeHandle(server.sock)
|
||||
retFuture.fail(exc)
|
||||
return
|
||||
trackStream(ntransp)
|
||||
retFuture.complete(ntransp)
|
||||
|
||||
elif int32(ovl.data.errCode) in {ERROR_OPERATION_ABORTED,
|
||||
ERROR_PIPE_NOT_CONNECTED}:
|
||||
# CancelIO() interrupt or close call.
|
||||
if not(retFuture.finished()):
|
||||
if server.status in {ServerStatus.Stopped, ServerStatus.Closed}:
|
||||
retFuture.fail(getServerUseClosedError())
|
||||
server.sock.closeHandle()
|
||||
server.clean()
|
||||
else:
|
||||
let sock = server.sock
|
||||
try:
|
||||
server.createAcceptPipe()
|
||||
except CatchableError as exc:
|
||||
closeHandle(sock)
|
||||
retFuture.fail(exc)
|
||||
return
|
||||
closeHandle(sock)
|
||||
if ovl.data.errCode == OSErrorCode(-1):
|
||||
var ntransp: StreamTransport
|
||||
var flags = {WinServerPipe}
|
||||
if NoPipeFlash in server.flags:
|
||||
flags.incl(WinNoPipeFlash)
|
||||
if not isNil(server.init):
|
||||
var transp = server.init(server, server.sock)
|
||||
ntransp = newStreamPipeTransport(server.sock, server.bufferSize,
|
||||
transp, flags)
|
||||
else:
|
||||
ntransp = newStreamPipeTransport(server.sock, server.bufferSize,
|
||||
nil, flags)
|
||||
# Start tracking transport
|
||||
try:
|
||||
server.createAcceptPipe()
|
||||
except CatchableError as exc:
|
||||
closeHandle(server.sock)
|
||||
retFuture.fail(exc)
|
||||
return
|
||||
trackStream(ntransp)
|
||||
retFuture.complete(ntransp)
|
||||
|
||||
retFuture.fail(getTransportOsError(ovl.data.errCode))
|
||||
elif int32(ovl.data.errCode) in {ERROR_OPERATION_ABORTED,
|
||||
ERROR_PIPE_NOT_CONNECTED}:
|
||||
# CancelIO() interrupt or close call.
|
||||
retFuture.fail(getServerUseClosedError())
|
||||
server.clean()
|
||||
else:
|
||||
let sock = server.sock
|
||||
try:
|
||||
server.createAcceptPipe()
|
||||
except CatchableError as exc:
|
||||
closeHandle(sock)
|
||||
retFuture.fail(exc)
|
||||
return
|
||||
closeHandle(sock)
|
||||
|
||||
retFuture.fail(getTransportOsError(ovl.data.errCode))
|
||||
|
||||
proc cancellationPipe(udata: pointer) {.gcsafe.} =
|
||||
server.sock.closeHandle()
|
||||
|
@ -1698,51 +1701,56 @@ else:
|
|||
saddr: Sockaddr_storage
|
||||
slen: SockLen
|
||||
|
||||
if server.status in {ServerStatus.Stopped, ServerStatus.Closed}:
|
||||
retFuture.fail(getServerUseClosedError())
|
||||
else:
|
||||
while true:
|
||||
let res = posix.accept(SocketHandle(server.sock),
|
||||
cast[ptr SockAddr](addr saddr), addr slen)
|
||||
if int(res) > 0:
|
||||
let sock = try: wrapAsyncSocket(res)
|
||||
except CatchableError as exc:
|
||||
close(res)
|
||||
retFuture.fail(exc)
|
||||
return
|
||||
if not(retFuture.finished()):
|
||||
if server.status in {ServerStatus.Stopped, ServerStatus.Closed}:
|
||||
retFuture.fail(getServerUseClosedError())
|
||||
else:
|
||||
while true:
|
||||
let res = posix.accept(SocketHandle(server.sock),
|
||||
cast[ptr SockAddr](addr saddr), addr slen)
|
||||
if int(res) > 0:
|
||||
let sock =
|
||||
try:
|
||||
wrapAsyncSocket(res)
|
||||
except CatchableError as exc:
|
||||
close(res)
|
||||
retFuture.fail(exc)
|
||||
return
|
||||
|
||||
if sock != asyncInvalidSocket:
|
||||
var ntransp: StreamTransport
|
||||
if not isNil(server.init):
|
||||
let transp = server.init(server, sock)
|
||||
ntransp = newStreamSocketTransport(sock, server.bufferSize,
|
||||
transp)
|
||||
if sock != asyncInvalidSocket:
|
||||
var ntransp: StreamTransport
|
||||
if not isNil(server.init):
|
||||
let transp = server.init(server, sock)
|
||||
ntransp = newStreamSocketTransport(sock, server.bufferSize,
|
||||
transp)
|
||||
else:
|
||||
ntransp = newStreamSocketTransport(sock, server.bufferSize,
|
||||
nil)
|
||||
# Start tracking transport
|
||||
trackStream(ntransp)
|
||||
retFuture.complete(ntransp)
|
||||
else:
|
||||
ntransp = newStreamSocketTransport(sock, server.bufferSize, nil)
|
||||
# Start tracking transport
|
||||
trackStream(ntransp)
|
||||
retFuture.complete(ntransp)
|
||||
retFuture.fail(getTransportOsError(osLastError()))
|
||||
else:
|
||||
retFuture.fail(getTransportOsError(osLastError()))
|
||||
else:
|
||||
let err = osLastError()
|
||||
if int(err) == EINTR:
|
||||
continue
|
||||
elif int(err) == EAGAIN:
|
||||
# This error appears only when server get closed, while accept()
|
||||
# continuation is already scheduled.
|
||||
retFuture.fail(getServerUseClosedError())
|
||||
elif int(err) == EMFILE:
|
||||
retFuture.fail(getTransportTooManyError())
|
||||
else:
|
||||
retFuture.fail(getTransportOsError(err))
|
||||
break
|
||||
try:
|
||||
removeReader(server.sock)
|
||||
except IOSelectorsException as exc:
|
||||
raiseAsDefect exc, "removeReader"
|
||||
except ValueError as exc:
|
||||
raiseAsDefect exc, "removeReader"
|
||||
let err = osLastError()
|
||||
if int(err) == EINTR:
|
||||
continue
|
||||
elif int(err) == EAGAIN:
|
||||
# This error appears only when server get closed, while accept()
|
||||
# continuation is already scheduled.
|
||||
retFuture.fail(getServerUseClosedError())
|
||||
elif int(err) == EMFILE:
|
||||
retFuture.fail(getTransportTooManyError())
|
||||
else:
|
||||
retFuture.fail(getTransportOsError(err))
|
||||
break
|
||||
|
||||
try:
|
||||
removeReader(server.sock)
|
||||
except IOSelectorsException as exc:
|
||||
raiseAsDefect exc, "removeReader"
|
||||
except ValueError as exc:
|
||||
raiseAsDefect exc, "removeReader"
|
||||
|
||||
proc cancellation(udata: pointer) =
|
||||
try:
|
||||
|
|
Loading…
Reference in New Issue