Fix race condition, when close transport happens while writing queue is not empty.

This commit is contained in:
cheatfate 2018-10-02 13:50:14 +03:00
parent 756f85576e
commit 3859aab5c6
3 changed files with 35 additions and 22 deletions

View File

@ -1,5 +1,5 @@
packageName = "asyncdispatch2" packageName = "asyncdispatch2"
version = "2.1.2" version = "2.1.3"
author = "Status Research & Development GmbH" author = "Status Research & Development GmbH"
description = "Asyncdispatch2" description = "Asyncdispatch2"
license = "Apache License 2.0 or MIT" license = "Apache License 2.0 or MIT"

View File

@ -400,11 +400,11 @@ when defined(windows) or defined(nimdoc):
loop.transmitFile = cast[WSAPROC_TRANSMITFILE](funcPointer) loop.transmitFile = cast[WSAPROC_TRANSMITFILE](funcPointer)
close(sock) close(sock)
proc closeSocket*(socket: AsyncFD, aftercb: CallbackFunc = nil) = proc closeSocket*(fd: AsyncFD, aftercb: CallbackFunc = nil) =
## Closes a socket and ensures that it is unregistered. ## Closes a socket and ensures that it is unregistered.
let loop = getGlobalDispatcher() let loop = getGlobalDispatcher()
socket.SocketHandle.close() loop.handles.excl(fd)
loop.handles.excl(socket) close(SocketHandle(fd))
if not isNil(aftercb): if not isNil(aftercb):
var acb = AsyncCallback(function: aftercb) var acb = AsyncCallback(function: aftercb)
loop.callbacks.addLast(acb) loop.callbacks.addLast(acb)
@ -551,17 +551,28 @@ else:
let loop = getGlobalDispatcher() let loop = getGlobalDispatcher()
proc continuation(udata: pointer) = proc continuation(udata: pointer) =
aftercb(nil)
unregister(fd) unregister(fd)
close(SocketHandle(fd)) close(SocketHandle(fd))
if not isNil(aftercb):
aftercb(nil)
withData(loop.selector, int(fd), adata) do: withData(loop.selector, int(fd), adata) do:
# We are scheduling reader and writer callbacks to be called
# explicitly, so they can get an error and continue work.
if not isNil(adata.reader.function): if not isNil(adata.reader.function):
if not adata.reader.deleted:
loop.callbacks.addLast(adata.reader) loop.callbacks.addLast(adata.reader)
if not isNil(adata.writer.function): if not isNil(adata.writer.function):
if not adata.writer.deleted:
loop.callbacks.addLast(adata.writer) loop.callbacks.addLast(adata.writer)
# Mark callbacks as deleted, we don't need to get REAL notifications
# from system queue for this reader and writer.
adata.reader.deleted = true
adata.writer.deleted = true
if not isNil(aftercb): # We can't unregister file descriptor from system queue here, because
# in such case processing queue will stuck on poll() call, because there
# can be no file descriptors registered in system queue.
var acb = AsyncCallback(function: continuation) var acb = AsyncCallback(function: continuation)
loop.callbacks.addLast(acb) loop.callbacks.addLast(acb)
@ -608,16 +619,20 @@ else:
withData(loop.selector, fd, adata) do: withData(loop.selector, fd, adata) do:
if Event.Read in events or events == {Event.Error}: if Event.Read in events or events == {Event.Error}:
if not adata.reader.deleted:
loop.callbacks.addLast(adata.reader) loop.callbacks.addLast(adata.reader)
if Event.Write in events or events == {Event.Error}: if Event.Write in events or events == {Event.Error}:
if not adata.writer.deleted:
loop.callbacks.addLast(adata.writer) loop.callbacks.addLast(adata.writer)
if Event.User in events: if Event.User in events:
if not adata.reader.deleted:
loop.callbacks.addLast(adata.reader) loop.callbacks.addLast(adata.reader)
when ioselSupportedPlatform: when ioselSupportedPlatform:
if customSet * events != {}: if customSet * events != {}:
if not adata.reader.deleted:
loop.callbacks.addLast(adata.reader) loop.callbacks.addLast(adata.reader)
# Moving expired timers to `loop.callbacks`. # Moving expired timers to `loop.callbacks`.

View File

@ -439,20 +439,18 @@ proc test3(bounded: bool): Future[int] {.async.} =
for i in 0..<ClientsCount: for i in 0..<ClientsCount:
result += counters[i] result += counters[i]
proc client20(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = 1
transp.close()
proc testConnReset(): Future[bool] {.async.} = proc testConnReset(): Future[bool] {.async.} =
var ta = initTAddress("127.0.0.1:65000") var ta = initTAddress("127.0.0.1:65000")
var counter = 0 var counter = 0
proc clientMark(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
counter = 1
transp.close()
var dgram1 = newDatagramTransport(client1, local = ta) var dgram1 = newDatagramTransport(client1, local = ta)
dgram1.close() dgram1.close()
var dgram2 = newDatagramTransport(client20, udata = addr counter) var dgram2 = newDatagramTransport(clientMark)
var data = "MESSAGE" var data = "MESSAGE"
discard dgram2.sendTo(ta, data) asyncCheck dgram2.sendTo(ta, data)
await sleepAsync(1000) await sleepAsync(1000)
result = (counter == 0) result = (counter == 0)