From 5c6c723cb9da061aa886766f48db17899a91f41f Mon Sep 17 00:00:00 2001 From: Your Name Date: Fri, 18 May 2018 02:40:42 +0300 Subject: [PATCH] Fix nasty GC misuse bug --- asyncdispatch2/transports/datagram.nim | 19 +++--- tests/testdatagram.nim | 84 ++++++++++++++++++++------ 2 files changed, 74 insertions(+), 29 deletions(-) diff --git a/asyncdispatch2/transports/datagram.nim b/asyncdispatch2/transports/datagram.nim index 45ff6d0..c148e1c 100644 --- a/asyncdispatch2/transports/datagram.nim +++ b/asyncdispatch2/transports/datagram.nim @@ -87,8 +87,6 @@ when defined(windows): proc writeDatagramLoop(udata: pointer) = var bytesCount: int32 - if isNil(udata): - return var ovl = cast[PCustomOverlapped](udata) var transp = cast[WindowsDatagramTransport](ovl.data.udata) while len(transp.queue) > 0: @@ -97,8 +95,7 @@ when defined(windows): transp.state.excl(WritePending) let err = transp.wovl.data.errCode if err == OSErrorCode(-1): - var vector = transp.queue.popFirst() - vector.writer.complete() + transp.finishWriter() else: transp.setWriteError(err) transp.finishWriter() @@ -126,7 +123,7 @@ when defined(windows): else: transp.state.excl(WritePending) transp.setWriteError(err) - transp.finishWriter() + vector.writer.complete() else: transp.queue.addFirst(vector) break @@ -138,8 +135,6 @@ when defined(windows): var bytesCount: int32 raddr: TransportAddress - if isNil(udata): - return var ovl = cast[PCustomOverlapped](udata) var transp = cast[WindowsDatagramTransport](ovl.data.udata) while true: @@ -156,7 +151,7 @@ when defined(windows): transp.state.incl(ReadPaused) fromSockAddr(transp.raddr, transp.ralen, raddr.address, raddr.port) discard transp.function(transp, addr transp.buffer[0], bytesCount, - raddr, transp.udata) + raddr, transp.udata) else: transp.setReadError(err) transp.state.incl(ReadPaused) @@ -280,6 +275,7 @@ when defined(windows): udata: cast[pointer](wresult)) wresult.wsabuf = TWSABuf(buf: cast[cstring](addr wresult.buffer[0]), len: int32(len(wresult.buffer))) + GC_ref(wresult) result = cast[DatagramTransport](wresult) result.resumeRead() @@ -291,6 +287,8 @@ when defined(windows): transp.state.incl(WriteClosed) transp.state.incl(ReadClosed) transp.future.complete() + var wresult = cast[WindowsDatagramTransport](transp) + GC_unref(wresult) else: @@ -430,6 +428,7 @@ else: result.udata = udata result.state = {WritePaused} result.future = newFuture[void]("datagram.transport") + GC_ref(result) result.resumeRead() proc close*(transp: DatagramTransport) = @@ -439,6 +438,7 @@ else: transp.state.incl(WriteClosed) transp.state.incl(ReadClosed) transp.future.complete() + GC_unref(transp) proc newDatagramTransport*(cbproc: DatagramCallback, remote: TransportAddress = AnyAddress, @@ -463,7 +463,8 @@ proc newDatagramTransport6*(cbproc: DatagramCallback, flags, udata, bufSize) proc join*(transp: DatagramTransport) {.async.} = - await transp.future + if not transp.future.finished: + await transp.future proc send*(transp: DatagramTransport, pbytes: pointer, nbytes: int) {.async.} = diff --git a/tests/testdatagram.nim b/tests/testdatagram.nim index 8e654ea..8165d69 100644 --- a/tests/testdatagram.nim +++ b/tests/testdatagram.nim @@ -1,10 +1,18 @@ +# Asyncdispatch2 +# (c) Copyright 2018 +# Status Research & Development GmbH +# +# Licensed under either of +# Apache License, version 2.0, (LICENSE-APACHEv2) +# MIT license (LICENSE-MIT) + import strutils, net, unittest import ../asyncdispatch2 const - TestsCount = 5000 - ClientsCount = 2 - MessagesCount = 1000 + TestsCount = 10000 + ClientsCount = 100 + MessagesCount = 100 proc client1(transp: DatagramTransport, pbytes: pointer, nbytes: int, raddr: TransportAddress, udata: pointer): Future[void] {.async.} = @@ -22,7 +30,6 @@ proc client1(transp: DatagramTransport, pbytes: pointer, nbytes: int, await transp.sendTo(addr err[0], len(err), raddr) else: ## Read operation failed with error - echo "SERVER ERROR HAPPENS QUITING" var counterPtr = cast[ptr int](udata) counterPtr[] = -1 transp.close() @@ -91,19 +98,43 @@ proc client4(transp: DatagramTransport, pbytes: pointer, nbytes: int, transp.close() else: var req = "REQUEST" & $counterPtr[] - echo $counterPtr[] & "-SEND" await transp.send(addr req[0], len(req)) else: - echo "ERROR1 [" & $data & "]" var counterPtr = cast[ptr int](udata) counterPtr[] = -1 transp.close() else: ## Read operation failed with error - echo "ERROR2" var counterPtr = cast[ptr int](udata) counterPtr[] = -1 - transp.close() + transp.close() + +proc client5(transp: DatagramTransport, pbytes: pointer, nbytes: int, + raddr: TransportAddress, udata: pointer): Future[void] {.async.} = + if not isNil(pbytes): + var data = newString(nbytes + 1) + copyMem(addr data[0], pbytes, nbytes) + data.setLen(nbytes) + if data.startsWith("ANSWER"): + var counterPtr = cast[ptr int](udata) + counterPtr[] = counterPtr[] + 1 + if counterPtr[] == MessagesCount: + transp.close() + else: + var ta: TransportAddress + ta.address = parseIpAddress("127.0.0.1") + ta.port = Port(33337) + var req = "REQUEST" & $counterPtr[] + await transp.sendTo(addr req[0], len(req), ta) + else: + var counterPtr = cast[ptr int](udata) + counterPtr[] = -1 + transp.close() + else: + ## Read operation failed with error + var counterPtr = cast[ptr int](udata) + counterPtr[] = -1 + transp.close() proc test1(): Future[int] {.async.} = var ta: TransportAddress @@ -142,7 +173,7 @@ proc waitAll(futs: seq[Future[void]]): Future[void] = fut.addCallback(cb) return retFuture -proc test3(): Future[int] {.async.} = +proc test3(bounded: bool): Future[int] {.async.} = var ta: TransportAddress ta.address = parseIpAddress("127.0.0.1") ta.port = Port(33337) @@ -150,14 +181,18 @@ proc test3(): Future[int] {.async.} = var dgram1 = newDatagramTransport(client1, udata = addr counter, local = ta) var clients = newSeq[Future[void]](ClientsCount) var counters = newSeq[int](ClientsCount) + var dgram: DatagramTransport for i in 0..