Merge pull request #10 from status-im/racefix

Fix race condition, when close transport happens while writing queue …
This commit is contained in:
Eugene Kabanov 2018-10-02 23:25:15 +03:00 committed by GitHub
commit 610b508a3e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 35 additions and 22 deletions

View File

@ -1,5 +1,5 @@
packageName = "asyncdispatch2"
version = "2.1.2"
version = "2.1.3"
author = "Status Research & Development GmbH"
description = "Asyncdispatch2"
license = "Apache License 2.0 or MIT"

View File

@ -400,11 +400,11 @@ when defined(windows) or defined(nimdoc):
loop.transmitFile = cast[WSAPROC_TRANSMITFILE](funcPointer)
close(sock)
proc closeSocket*(socket: AsyncFD, aftercb: CallbackFunc = nil) =
proc closeSocket*(fd: AsyncFD, aftercb: CallbackFunc = nil) =
## Closes a socket and ensures that it is unregistered.
let loop = getGlobalDispatcher()
socket.SocketHandle.close()
loop.handles.excl(socket)
loop.handles.excl(fd)
close(SocketHandle(fd))
if not isNil(aftercb):
var acb = AsyncCallback(function: aftercb)
loop.callbacks.addLast(acb)
@ -551,19 +551,30 @@ else:
let loop = getGlobalDispatcher()
proc continuation(udata: pointer) =
aftercb(nil)
unregister(fd)
close(SocketHandle(fd))
if not isNil(aftercb):
aftercb(nil)
withData(loop.selector, int(fd), adata) do:
# We are scheduling reader and writer callbacks to be called
# explicitly, so they can get an error and continue work.
if not isNil(adata.reader.function):
loop.callbacks.addLast(adata.reader)
if not adata.reader.deleted:
loop.callbacks.addLast(adata.reader)
if not isNil(adata.writer.function):
loop.callbacks.addLast(adata.writer)
if not adata.writer.deleted:
loop.callbacks.addLast(adata.writer)
# Mark callbacks as deleted, we don't need to get REAL notifications
# from system queue for this reader and writer.
adata.reader.deleted = true
adata.writer.deleted = true
if not isNil(aftercb):
var acb = AsyncCallback(function: continuation)
loop.callbacks.addLast(acb)
# We can't unregister file descriptor from system queue here, because
# in such case processing queue will stuck on poll() call, because there
# can be no file descriptors registered in system queue.
var acb = AsyncCallback(function: continuation)
loop.callbacks.addLast(acb)
when ioselSupportedPlatform:
proc addSignal*(signal: int, cb: CallbackFunc,
@ -608,17 +619,21 @@ else:
withData(loop.selector, fd, adata) do:
if Event.Read in events or events == {Event.Error}:
loop.callbacks.addLast(adata.reader)
if not adata.reader.deleted:
loop.callbacks.addLast(adata.reader)
if Event.Write in events or events == {Event.Error}:
loop.callbacks.addLast(adata.writer)
if not adata.writer.deleted:
loop.callbacks.addLast(adata.writer)
if Event.User in events:
loop.callbacks.addLast(adata.reader)
if not adata.reader.deleted:
loop.callbacks.addLast(adata.reader)
when ioselSupportedPlatform:
if customSet * events != {}:
loop.callbacks.addLast(adata.reader)
if not adata.reader.deleted:
loop.callbacks.addLast(adata.reader)
# Moving expired timers to `loop.callbacks`.
loop.processTimers()

View File

@ -439,20 +439,18 @@ proc test3(bounded: bool): Future[int] {.async.} =
for i in 0..<ClientsCount:
result += counters[i]
proc client20(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = 1
transp.close()
proc testConnReset(): Future[bool] {.async.} =
var ta = initTAddress("127.0.0.1:65000")
var counter = 0
proc clientMark(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
counter = 1
transp.close()
var dgram1 = newDatagramTransport(client1, local = ta)
dgram1.close()
var dgram2 = newDatagramTransport(client20, udata = addr counter)
var dgram2 = newDatagramTransport(clientMark)
var data = "MESSAGE"
discard dgram2.sendTo(ta, data)
asyncCheck dgram2.sendTo(ta, data)
await sleepAsync(1000)
result = (counter == 0)