From 3859aab5c606ff0d09994fe1a315bf2f364691d7 Mon Sep 17 00:00:00 2001 From: cheatfate Date: Tue, 2 Oct 2018 13:50:14 +0300 Subject: [PATCH] Fix race condition, when close transport happens while writing queue is not empty. --- asyncdispatch2.nimble | 2 +- asyncdispatch2/asyncloop.nim | 41 ++++++++++++++++++++++++------------ tests/testdatagram.nim | 14 ++++++------ 3 files changed, 35 insertions(+), 22 deletions(-) diff --git a/asyncdispatch2.nimble b/asyncdispatch2.nimble index 9513171..a668452 100644 --- a/asyncdispatch2.nimble +++ b/asyncdispatch2.nimble @@ -1,5 +1,5 @@ packageName = "asyncdispatch2" -version = "2.1.2" +version = "2.1.3" author = "Status Research & Development GmbH" description = "Asyncdispatch2" license = "Apache License 2.0 or MIT" diff --git a/asyncdispatch2/asyncloop.nim b/asyncdispatch2/asyncloop.nim index fe35be7..e989d96 100644 --- a/asyncdispatch2/asyncloop.nim +++ b/asyncdispatch2/asyncloop.nim @@ -400,11 +400,11 @@ when defined(windows) or defined(nimdoc): loop.transmitFile = cast[WSAPROC_TRANSMITFILE](funcPointer) close(sock) - proc closeSocket*(socket: AsyncFD, aftercb: CallbackFunc = nil) = + proc closeSocket*(fd: AsyncFD, aftercb: CallbackFunc = nil) = ## Closes a socket and ensures that it is unregistered. let loop = getGlobalDispatcher() - socket.SocketHandle.close() - loop.handles.excl(socket) + loop.handles.excl(fd) + close(SocketHandle(fd)) if not isNil(aftercb): var acb = AsyncCallback(function: aftercb) loop.callbacks.addLast(acb) @@ -551,19 +551,30 @@ else: let loop = getGlobalDispatcher() proc continuation(udata: pointer) = - aftercb(nil) unregister(fd) close(SocketHandle(fd)) + if not isNil(aftercb): + aftercb(nil) withData(loop.selector, int(fd), adata) do: + # We are scheduling reader and writer callbacks to be called + # explicitly, so they can get an error and continue work. if not isNil(adata.reader.function): - loop.callbacks.addLast(adata.reader) + if not adata.reader.deleted: + loop.callbacks.addLast(adata.reader) if not isNil(adata.writer.function): - loop.callbacks.addLast(adata.writer) + if not adata.writer.deleted: + loop.callbacks.addLast(adata.writer) + # Mark callbacks as deleted, we don't need to get REAL notifications + # from system queue for this reader and writer. + adata.reader.deleted = true + adata.writer.deleted = true - if not isNil(aftercb): - var acb = AsyncCallback(function: continuation) - loop.callbacks.addLast(acb) + # We can't unregister file descriptor from system queue here, because + # in such case processing queue will stuck on poll() call, because there + # can be no file descriptors registered in system queue. + var acb = AsyncCallback(function: continuation) + loop.callbacks.addLast(acb) when ioselSupportedPlatform: proc addSignal*(signal: int, cb: CallbackFunc, @@ -608,17 +619,21 @@ else: withData(loop.selector, fd, adata) do: if Event.Read in events or events == {Event.Error}: - loop.callbacks.addLast(adata.reader) + if not adata.reader.deleted: + loop.callbacks.addLast(adata.reader) if Event.Write in events or events == {Event.Error}: - loop.callbacks.addLast(adata.writer) + if not adata.writer.deleted: + loop.callbacks.addLast(adata.writer) if Event.User in events: - loop.callbacks.addLast(adata.reader) + if not adata.reader.deleted: + loop.callbacks.addLast(adata.reader) when ioselSupportedPlatform: if customSet * events != {}: - loop.callbacks.addLast(adata.reader) + if not adata.reader.deleted: + loop.callbacks.addLast(adata.reader) # Moving expired timers to `loop.callbacks`. loop.processTimers() diff --git a/tests/testdatagram.nim b/tests/testdatagram.nim index d10c920..d008dc6 100644 --- a/tests/testdatagram.nim +++ b/tests/testdatagram.nim @@ -439,20 +439,18 @@ proc test3(bounded: bool): Future[int] {.async.} = for i in 0..