fix crash after reading from fd > 1024 (#267)
The socket selector holds a `seq` of per-descriptor data. When a reader is registered, a pointer to a seq item is stored - when the `seq` grows, this pointer becomes dangling and causes crashes like https://github.com/status-im/nimbus-eth2/issues/3521. It turns out that there already exist two mechanisms for passing user data around - this PR simply removes one of them, saving on memory usage and removing the need to store pointers to the `seq` data that become dangling on resize.
This commit is contained in:
parent
ae2a87778f
commit
bb4c3298f5
|
@ -299,7 +299,6 @@ when defined(windows):
|
|||
CompletionKey = ULONG_PTR
|
||||
|
||||
CompletionData* = object
|
||||
fd*: AsyncFD
|
||||
cb*: CallbackFunc
|
||||
errCode*: OSErrorCode
|
||||
bytesCount*: int32
|
||||
|
@ -500,17 +499,9 @@ elif unixPlatform:
|
|||
type
|
||||
AsyncFD* = distinct cint
|
||||
|
||||
CompletionData* = object
|
||||
fd*: AsyncFD
|
||||
udata*: pointer
|
||||
|
||||
PCompletionData* = ptr CompletionData
|
||||
|
||||
SelectorData* = object
|
||||
reader*: AsyncCallback
|
||||
rdata*: CompletionData
|
||||
writer*: AsyncCallback
|
||||
wdata*: CompletionData
|
||||
|
||||
PDispatcher* = ref object of PDispatcherBase
|
||||
selector: Selector[SelectorData]
|
||||
|
@ -555,8 +546,6 @@ elif unixPlatform:
|
|||
## Register file descriptor ``fd`` in thread's dispatcher.
|
||||
let loop = getThreadDispatcher()
|
||||
var data: SelectorData
|
||||
data.rdata.fd = fd
|
||||
data.wdata.fd = fd
|
||||
loop.selector.registerHandle(int(fd), {}, data)
|
||||
|
||||
proc unregister*(fd: AsyncFD) {.raises: [Defect, CatchableError].} =
|
||||
|
@ -574,9 +563,8 @@ elif unixPlatform:
|
|||
let loop = getThreadDispatcher()
|
||||
var newEvents = {Event.Read}
|
||||
withData(loop.selector, int(fd), adata) do:
|
||||
let acb = AsyncCallback(function: cb, udata: addr adata.rdata)
|
||||
let acb = AsyncCallback(function: cb, udata: udata)
|
||||
adata.reader = acb
|
||||
adata.rdata = CompletionData(fd: fd, udata: udata)
|
||||
newEvents.incl(Event.Read)
|
||||
if not(isNil(adata.writer.function)):
|
||||
newEvents.incl(Event.Write)
|
||||
|
@ -592,7 +580,6 @@ elif unixPlatform:
|
|||
withData(loop.selector, int(fd), adata) do:
|
||||
# We need to clear `reader` data, because `selectors` don't do it
|
||||
adata.reader = default(AsyncCallback)
|
||||
# adata.rdata = CompletionData()
|
||||
if not(isNil(adata.writer.function)):
|
||||
newEvents.incl(Event.Write)
|
||||
do:
|
||||
|
@ -606,9 +593,8 @@ elif unixPlatform:
|
|||
let loop = getThreadDispatcher()
|
||||
var newEvents = {Event.Write}
|
||||
withData(loop.selector, int(fd), adata) do:
|
||||
let acb = AsyncCallback(function: cb, udata: addr adata.wdata)
|
||||
let acb = AsyncCallback(function: cb, udata: udata)
|
||||
adata.writer = acb
|
||||
adata.wdata = CompletionData(fd: fd, udata: udata)
|
||||
newEvents.incl(Event.Write)
|
||||
if not(isNil(adata.reader.function)):
|
||||
newEvents.incl(Event.Read)
|
||||
|
@ -624,7 +610,6 @@ elif unixPlatform:
|
|||
withData(loop.selector, int(fd), adata) do:
|
||||
# We need to clear `writer` data, because `selectors` don't do it
|
||||
adata.writer = default(AsyncCallback)
|
||||
# adata.wdata = CompletionData()
|
||||
if not(isNil(adata.reader.function)):
|
||||
newEvents.incl(Event.Read)
|
||||
do:
|
||||
|
@ -692,9 +677,7 @@ elif unixPlatform:
|
|||
var data: SelectorData
|
||||
result = loop.selector.registerSignal(signal, data)
|
||||
withData(loop.selector, result, adata) do:
|
||||
adata.reader = AsyncCallback(function: cb, udata: addr adata.rdata)
|
||||
adata.rdata.fd = AsyncFD(result)
|
||||
adata.rdata.udata = udata
|
||||
adata.reader = AsyncCallback(function: cb, udata: udata)
|
||||
do:
|
||||
raise newException(ValueError, "File descriptor not registered.")
|
||||
|
||||
|
|
|
@ -185,7 +185,7 @@ elif defined(windows):
|
|||
else:
|
||||
## Initiation
|
||||
transp.state.incl(WritePending)
|
||||
let fd = SocketHandle(ovl.data.fd)
|
||||
let fd = SocketHandle(transp.fd)
|
||||
var vector = transp.queue.popFirst()
|
||||
transp.setWriterWSABuffer(vector)
|
||||
var ret: cint
|
||||
|
@ -258,7 +258,7 @@ elif defined(windows):
|
|||
## Initiation
|
||||
if transp.state * {ReadEof, ReadClosed, ReadError} == {}:
|
||||
transp.state.incl(ReadPending)
|
||||
let fd = SocketHandle(ovl.data.fd)
|
||||
let fd = SocketHandle(transp.fd)
|
||||
transp.rflag = 0
|
||||
transp.ralen = SockLen(sizeof(Sockaddr_storage))
|
||||
let ret = WSARecvFrom(fd, addr transp.rwsabuf, DWORD(1),
|
||||
|
@ -406,9 +406,9 @@ elif defined(windows):
|
|||
result.udata = udata
|
||||
result.state = {WritePaused}
|
||||
result.future = newFuture[void]("datagram.transport")
|
||||
result.rovl.data = CompletionData(fd: localSock, cb: readDatagramLoop,
|
||||
result.rovl.data = CompletionData(cb: readDatagramLoop,
|
||||
udata: cast[pointer](result))
|
||||
result.wovl.data = CompletionData(fd: localSock, cb: writeDatagramLoop,
|
||||
result.wovl.data = CompletionData(cb: writeDatagramLoop,
|
||||
udata: cast[pointer](result))
|
||||
result.rwsabuf = TWSABuf(buf: cast[cstring](addr result.buffer[0]),
|
||||
len: int32(len(result.buffer)))
|
||||
|
@ -426,9 +426,8 @@ else:
|
|||
proc readDatagramLoop(udata: pointer) {.raises: Defect.}=
|
||||
var raddr: TransportAddress
|
||||
doAssert(not isNil(udata))
|
||||
var cdata = cast[ptr CompletionData](udata)
|
||||
var transp = cast[DatagramTransport](cdata.udata)
|
||||
let fd = SocketHandle(cdata.fd)
|
||||
let transp = cast[DatagramTransport](udata)
|
||||
let fd = SocketHandle(transp.fd)
|
||||
if int(fd) == 0:
|
||||
## This situation can be happen, when there events present
|
||||
## after transport was closed.
|
||||
|
@ -459,9 +458,8 @@ else:
|
|||
proc writeDatagramLoop(udata: pointer) =
|
||||
var res: int
|
||||
doAssert(not isNil(udata))
|
||||
var cdata = cast[ptr CompletionData](udata)
|
||||
var transp = cast[DatagramTransport](cdata.udata)
|
||||
let fd = SocketHandle(cdata.fd)
|
||||
var transp = cast[DatagramTransport](udata)
|
||||
let fd = SocketHandle(transp.fd)
|
||||
if int(fd) == 0:
|
||||
## This situation can be happen, when there events present
|
||||
## after transport was closed.
|
||||
|
|
|
@ -407,7 +407,7 @@ elif defined(windows):
|
|||
## Initiation
|
||||
transp.state.incl(WritePending)
|
||||
if transp.kind == TransportKind.Socket:
|
||||
let sock = SocketHandle(transp.wovl.data.fd)
|
||||
let sock = SocketHandle(transp.fd)
|
||||
var vector = transp.queue.popFirst()
|
||||
if vector.kind == VectorKind.DataBuffer:
|
||||
transp.wovl.zeroOvelappedOffset()
|
||||
|
@ -492,7 +492,7 @@ elif defined(windows):
|
|||
else:
|
||||
transp.queue.addFirst(vector)
|
||||
elif transp.kind == TransportKind.Pipe:
|
||||
let pipe = Handle(transp.wovl.data.fd)
|
||||
let pipe = Handle(transp.fd)
|
||||
var vector = transp.queue.popFirst()
|
||||
if vector.kind == VectorKind.DataBuffer:
|
||||
transp.wovl.zeroOvelappedOffset()
|
||||
|
@ -587,7 +587,7 @@ elif defined(windows):
|
|||
transp.state.excl(ReadPaused)
|
||||
transp.state.incl(ReadPending)
|
||||
if transp.kind == TransportKind.Socket:
|
||||
let sock = SocketHandle(transp.rovl.data.fd)
|
||||
let sock = SocketHandle(transp.fd)
|
||||
transp.roffset = transp.offset
|
||||
transp.setReaderWSABuffer()
|
||||
let ret = WSARecv(sock, addr transp.rwsabuf, 1,
|
||||
|
@ -610,7 +610,7 @@ elif defined(windows):
|
|||
transp.setReadError(err)
|
||||
transp.completeReader()
|
||||
elif transp.kind == TransportKind.Pipe:
|
||||
let pipe = Handle(transp.rovl.data.fd)
|
||||
let pipe = Handle(transp.fd)
|
||||
transp.roffset = transp.offset
|
||||
transp.setReaderWSABuffer()
|
||||
let ret = readFile(pipe, cast[pointer](transp.rwsabuf.buf),
|
||||
|
@ -650,9 +650,9 @@ elif defined(windows):
|
|||
else:
|
||||
transp = StreamTransport(kind: TransportKind.Socket)
|
||||
transp.fd = sock
|
||||
transp.rovl.data = CompletionData(fd: sock, cb: readStreamLoop,
|
||||
transp.rovl.data = CompletionData(cb: readStreamLoop,
|
||||
udata: cast[pointer](transp))
|
||||
transp.wovl.data = CompletionData(fd: sock, cb: writeStreamLoop,
|
||||
transp.wovl.data = CompletionData(cb: writeStreamLoop,
|
||||
udata: cast[pointer](transp))
|
||||
transp.buffer = newSeq[byte](bufsize)
|
||||
transp.state = {ReadPaused, WritePaused}
|
||||
|
@ -670,9 +670,9 @@ elif defined(windows):
|
|||
else:
|
||||
transp = StreamTransport(kind: TransportKind.Pipe)
|
||||
transp.fd = fd
|
||||
transp.rovl.data = CompletionData(fd: fd, cb: readStreamLoop,
|
||||
transp.rovl.data = CompletionData(cb: readStreamLoop,
|
||||
udata: cast[pointer](transp))
|
||||
transp.wovl.data = CompletionData(fd: fd, cb: writeStreamLoop,
|
||||
transp.wovl.data = CompletionData(cb: writeStreamLoop,
|
||||
udata: cast[pointer](transp))
|
||||
transp.buffer = newSeq[byte](bufsize)
|
||||
transp.flags = flags
|
||||
|
@ -746,8 +746,7 @@ elif defined(windows):
|
|||
sock.closeSocket()
|
||||
retFuture.fail(getTransportOsError(err))
|
||||
else:
|
||||
let transp = newStreamSocketTransport(povl.data.fd, bufferSize,
|
||||
child)
|
||||
let transp = newStreamSocketTransport(sock, bufferSize, child)
|
||||
# Start tracking transport
|
||||
trackStream(transp)
|
||||
retFuture.complete(transp)
|
||||
|
@ -761,7 +760,7 @@ elif defined(windows):
|
|||
|
||||
povl = RefCustomOverlapped()
|
||||
GC_ref(povl)
|
||||
povl.data = CompletionData(fd: sock, cb: socketContinuation)
|
||||
povl.data = CompletionData(cb: socketContinuation)
|
||||
let res = loop.connectEx(SocketHandle(sock),
|
||||
cast[ptr SockAddr](addr saddr),
|
||||
DWORD(slen), nil, 0, nil,
|
||||
|
@ -895,7 +894,6 @@ elif defined(windows):
|
|||
if pipeHandle == INVALID_HANDLE_VALUE:
|
||||
raiseAssert osErrorMsg(osLastError())
|
||||
server.sock = AsyncFD(pipeHandle)
|
||||
server.aovl.data.fd = AsyncFD(pipeHandle)
|
||||
try: register(server.sock)
|
||||
except CatchableError as exc:
|
||||
raiseAsDefect exc, "register"
|
||||
|
@ -1177,8 +1175,7 @@ elif defined(windows):
|
|||
let dwLocalAddressLength = DWORD(sizeof(Sockaddr_in6) + 16)
|
||||
let dwRemoteAddressLength = DWORD(sizeof(Sockaddr_in6) + 16)
|
||||
|
||||
server.aovl.data = CompletionData(fd: server.sock,
|
||||
cb: continuationSocket,
|
||||
server.aovl.data = CompletionData(cb: continuationSocket,
|
||||
udata: cast[pointer](server))
|
||||
server.apending = true
|
||||
let res = loop.acceptEx(SocketHandle(server.sock),
|
||||
|
@ -1219,8 +1216,7 @@ elif defined(windows):
|
|||
retFuture.fail(getTransportOsError(err))
|
||||
return retFuture
|
||||
|
||||
server.aovl.data = CompletionData(fd: server.sock,
|
||||
cb: continuationPipe,
|
||||
server.aovl.data = CompletionData(cb: continuationPipe,
|
||||
udata: cast[pointer](server))
|
||||
server.apending = true
|
||||
let res = connectNamedPipe(Handle(server.sock),
|
||||
|
@ -1260,15 +1256,17 @@ else:
|
|||
raiseAsDefect exc, "removeWriter"
|
||||
|
||||
proc writeStreamLoop(udata: pointer) =
|
||||
var cdata = cast[ptr CompletionData](udata)
|
||||
var transp = cast[StreamTransport](cdata.udata)
|
||||
let fd = SocketHandle(cdata.fd)
|
||||
|
||||
if int(fd) == 0 or isNil(transp):
|
||||
## This situation can be happen, when there events present
|
||||
## after transport was closed.
|
||||
if isNil(udata):
|
||||
# TODO this is an if rather than an assert for historical reasons:
|
||||
# it should not happen unless there are race conditions - but if there
|
||||
# are race conditions, `transp` might be invalid even if it's not nil:
|
||||
# it could have been released
|
||||
return
|
||||
|
||||
let
|
||||
transp = cast[StreamTransport](udata)
|
||||
fd = SocketHandle(transp.fd)
|
||||
|
||||
if WriteClosed in transp.state:
|
||||
if transp.queue.len > 0:
|
||||
transp.removeWriter()
|
||||
|
@ -1357,15 +1355,17 @@ else:
|
|||
transp.removeWriter()
|
||||
|
||||
proc readStreamLoop(udata: pointer) =
|
||||
# TODO fix Defect raises - they "shouldn't" happen
|
||||
var cdata = cast[ptr CompletionData](udata)
|
||||
var transp = cast[StreamTransport](cdata.udata)
|
||||
let fd = SocketHandle(cdata.fd)
|
||||
if int(fd) == 0 or isNil(transp):
|
||||
## This situation can be happen, when there events present
|
||||
## after transport was closed.
|
||||
if isNil(udata):
|
||||
# TODO this is an if rather than an assert for historical reasons:
|
||||
# it should not happen unless there are race conditions - but if there
|
||||
# are race conditions, `transp` might be invalid even if it's not nil:
|
||||
# it could have been released
|
||||
return
|
||||
|
||||
let
|
||||
transp = cast[StreamTransport](udata)
|
||||
fd = SocketHandle(transp.fd)
|
||||
|
||||
if ReadClosed in transp.state:
|
||||
transp.state.incl({ReadPaused})
|
||||
transp.completeReader()
|
||||
|
@ -1381,7 +1381,7 @@ else:
|
|||
elif int(err) in {ECONNRESET}:
|
||||
transp.state.incl({ReadEof, ReadPaused})
|
||||
try:
|
||||
cdata.fd.removeReader()
|
||||
transp.fd.removeReader()
|
||||
except IOSelectorsException as exc:
|
||||
raiseAsDefect exc, "removeReader"
|
||||
except ValueError as exc:
|
||||
|
@ -1390,7 +1390,7 @@ else:
|
|||
transp.state.incl(ReadPaused)
|
||||
transp.setReadError(err)
|
||||
try:
|
||||
cdata.fd.removeReader()
|
||||
transp.fd.removeReader()
|
||||
except IOSelectorsException as exc:
|
||||
raiseAsDefect exc, "removeReader"
|
||||
except ValueError as exc:
|
||||
|
@ -1398,7 +1398,7 @@ else:
|
|||
elif res == 0:
|
||||
transp.state.incl({ReadEof, ReadPaused})
|
||||
try:
|
||||
cdata.fd.removeReader()
|
||||
transp.fd.removeReader()
|
||||
except IOSelectorsException as exc:
|
||||
raiseAsDefect exc, "removeReader"
|
||||
except ValueError as exc:
|
||||
|
@ -1408,7 +1408,7 @@ else:
|
|||
if transp.offset == len(transp.buffer):
|
||||
transp.state.incl(ReadPaused)
|
||||
try:
|
||||
cdata.fd.removeReader()
|
||||
transp.fd.removeReader()
|
||||
except IOSelectorsException as exc:
|
||||
raiseAsDefect exc, "removeReader"
|
||||
except ValueError as exc:
|
||||
|
@ -1427,7 +1427,7 @@ else:
|
|||
transp.state.incl(ReadPaused)
|
||||
transp.setReadError(err)
|
||||
try:
|
||||
cdata.fd.removeReader()
|
||||
transp.fd.removeReader()
|
||||
except IOSelectorsException as exc:
|
||||
raiseAsDefect exc, "removeReader"
|
||||
except ValueError as exc:
|
||||
|
@ -1435,7 +1435,7 @@ else:
|
|||
elif res == 0:
|
||||
transp.state.incl({ReadEof, ReadPaused})
|
||||
try:
|
||||
cdata.fd.removeReader()
|
||||
transp.fd.removeReader()
|
||||
except IOSelectorsException as exc:
|
||||
raiseAsDefect exc, "removeReader"
|
||||
except ValueError as exc:
|
||||
|
@ -1445,7 +1445,7 @@ else:
|
|||
if transp.offset == len(transp.buffer):
|
||||
transp.state.incl(ReadPaused)
|
||||
try:
|
||||
cdata.fd.removeReader()
|
||||
transp.fd.removeReader()
|
||||
except IOSelectorsException as exc:
|
||||
raiseAsDefect exc, "removeReader"
|
||||
except ValueError as exc:
|
||||
|
@ -1519,11 +1519,9 @@ else:
|
|||
|
||||
proc continuation(udata: pointer) =
|
||||
if not(retFuture.finished()):
|
||||
var data = cast[ptr CompletionData](udata)
|
||||
var err = 0
|
||||
let fd = data.fd
|
||||
try:
|
||||
fd.removeWriter()
|
||||
sock.removeWriter()
|
||||
except IOSelectorsException as exc:
|
||||
retFuture.fail(exc)
|
||||
return
|
||||
|
@ -1531,15 +1529,15 @@ else:
|
|||
retFuture.fail(exc)
|
||||
return
|
||||
|
||||
if not(fd.getSocketError(err)):
|
||||
closeSocket(fd)
|
||||
if not(sock.getSocketError(err)):
|
||||
closeSocket(sock)
|
||||
retFuture.fail(getTransportOsError(osLastError()))
|
||||
return
|
||||
if err != 0:
|
||||
closeSocket(fd)
|
||||
closeSocket(sock)
|
||||
retFuture.fail(getTransportOsError(OSErrorCode(err)))
|
||||
return
|
||||
let transp = newStreamSocketTransport(fd, bufferSize, child)
|
||||
let transp = newStreamSocketTransport(sock, bufferSize, child)
|
||||
# Start tracking transport
|
||||
trackStream(transp)
|
||||
retFuture.complete(transp)
|
||||
|
@ -1581,11 +1579,18 @@ else:
|
|||
break
|
||||
return retFuture
|
||||
|
||||
proc acceptLoop(udata: pointer) {.gcsafe.} =
|
||||
proc acceptLoop(udata: pointer) =
|
||||
if isNil(udata):
|
||||
# TODO this is an if rather than an assert for historical reasons:
|
||||
# it should not happen unless there are race conditions - but if there
|
||||
# are race conditions, `transp` might be invalid even if it's not nil:
|
||||
# it could have been released
|
||||
return
|
||||
|
||||
var
|
||||
saddr: Sockaddr_storage
|
||||
slen: SockLen
|
||||
var server = cast[StreamServer](cast[ptr CompletionData](udata).udata)
|
||||
let server = cast[StreamServer](udata)
|
||||
while true:
|
||||
if server.status in {ServerStatus.Stopped, ServerStatus.Closed}:
|
||||
break
|
||||
|
@ -1990,7 +1995,7 @@ proc createStreamServer*(host: TransportAddress,
|
|||
cb = acceptPipeLoop
|
||||
|
||||
if not(isNil(cbproc)):
|
||||
result.aovl.data = CompletionData(fd: serverSocket, cb: cb,
|
||||
result.aovl.data = CompletionData(cb: cb,
|
||||
udata: cast[pointer](result))
|
||||
else:
|
||||
if host.family == AddressFamily.Unix:
|
||||
|
|
|
@ -15,13 +15,14 @@ when not defined(windows):
|
|||
|
||||
suite "Signal handling test suite":
|
||||
when not defined(windows):
|
||||
var signalCounter = 0
|
||||
var
|
||||
signalCounter = 0
|
||||
sigfd = -1
|
||||
|
||||
proc signalProc(udata: pointer) =
|
||||
var cdata = cast[ptr CompletionData](udata)
|
||||
signalCounter = cast[int](cdata.udata)
|
||||
signalCounter = cast[int](udata)
|
||||
try:
|
||||
removeSignal(int(cdata.fd))
|
||||
removeSignal(sigfd)
|
||||
except Exception as exc:
|
||||
raiseAssert exc.msg
|
||||
|
||||
|
@ -30,7 +31,7 @@ suite "Signal handling test suite":
|
|||
|
||||
proc test(signal, value: int): bool =
|
||||
try:
|
||||
discard addSignal(signal, signalProc, cast[pointer](value))
|
||||
sigfd = addSignal(signal, signalProc, cast[pointer](value))
|
||||
except Exception as exc:
|
||||
raiseAssert exc.msg
|
||||
var fut = asyncProc()
|
||||
|
|
Loading…
Reference in New Issue