diff --git a/asyncdispatch2/asyncfutures2.nim b/asyncdispatch2/asyncfutures2.nim index e16449fd..c570a817 100644 --- a/asyncdispatch2/asyncfutures2.nim +++ b/asyncdispatch2/asyncfutures2.nim @@ -196,7 +196,7 @@ proc addCallback*[T](future: Future[T], cb: CallbackFunc) = ## Adds the callbacks proc to be called when the future completes. ## ## If future has already completed then ``cb`` will be called immediately. - future.addCallback(cb, cast[pointer](unsafeAddr future)) + future.addCallback(cb, cast[pointer](future)) proc removeCallback*(future: FutureBase, cb: CallbackFunc, udata: pointer = nil) = @@ -205,7 +205,7 @@ proc removeCallback*(future: FutureBase, cb: CallbackFunc, future.callbacks.remove acb proc removeCallback*[T](future: Future[T], cb: CallbackFunc) = - future.removeCallback(cb, cast[pointer](unsafeAddr future)) + future.removeCallback(cb, cast[pointer](future)) proc `callback=`*(future: FutureBase, cb: CallbackFunc, udata: pointer = nil) = ## Clears the list of callbacks and sets the callback proc to be called when diff --git a/asyncdispatch2/transports/datagram.nim b/asyncdispatch2/transports/datagram.nim index af09456e..45ff6d07 100644 --- a/asyncdispatch2/transports/datagram.nim +++ b/asyncdispatch2/transports/datagram.nim @@ -8,7 +8,7 @@ # MIT license (LICENSE-MIT) import net, nativesockets, os, deques, strutils -import ../asyncloop, ../handles +import ../asyncloop, ../handles, ../hexdump import common type @@ -91,7 +91,6 @@ when defined(windows): return var ovl = cast[PCustomOverlapped](udata) var transp = cast[WindowsDatagramTransport](ovl.data.udata) - echo "writeDatagramLoop(" & toHex(cast[uint](transp)) & ")" while len(transp.queue) > 0: if WritePending in transp.state: ## Continuation @@ -156,12 +155,12 @@ when defined(windows): transp.state.incl(ReadEof) transp.state.incl(ReadPaused) fromSockAddr(transp.raddr, transp.ralen, raddr.address, raddr.port) - spawn transp.function(transp, addr transp.buffer[0], bytesCount, - raddr, transp.udata) + discard transp.function(transp, addr transp.buffer[0], bytesCount, + raddr, transp.udata) else: transp.setReadError(err) transp.state.incl(ReadPaused) - spawn transp.function(transp, nil, 0, raddr, transp.udata) + discard transp.function(transp, nil, 0, raddr, transp.udata) else: ## Initiation if (ReadEof notin transp.state) and (ReadClosed notin transp.state): @@ -180,11 +179,17 @@ when defined(windows): if ret != 0: let err = osLastError() if int(err) == ERROR_OPERATION_ABORTED: + transp.state.excl(ReadPending) transp.state.incl(ReadPaused) - elif int(err) != ERROR_IO_PENDING: + elif int(err) == WSAECONNRESET: + transp.state.excl(ReadPending) + continue + elif int(err) == ERROR_IO_PENDING: + discard + else: transp.state.excl(ReadPending) transp.setReadError(err) - spawn transp.function(transp, nil, 0, raddr, transp.udata) + discard transp.function(transp, nil, 0, raddr, transp.udata) break proc resumeRead(transp: DatagramTransport) {.inline.} = @@ -307,15 +312,15 @@ else: addr slen) if res >= 0: fromSockAddr(saddr, slen, raddr.address, raddr.port) - spawn transp.function(transp, addr transp.buffer[0], res, - raddr, transp.udata) + discard transp.function(transp, addr transp.buffer[0], res, + raddr, transp.udata) else: let err = osLastError() if int(err) == EINTR: continue else: transp.setReadError(err) - spawn transp.function(transp, nil, 0, raddr, transp.udata) + discard transp.function(transp, nil, 0, raddr, transp.udata) break proc writeDatagramLoop(udata: pointer) = diff --git a/tests/testdatagram.nim b/tests/testdatagram.nim index 3ae5c90f..8e654ea2 100644 --- a/tests/testdatagram.nim +++ b/tests/testdatagram.nim @@ -3,11 +3,8 @@ import ../asyncdispatch2 const TestsCount = 5000 - ClientsCount = 50 - MessagesCount = 350 - -when defined(vcc): - {.passC: "/Zi /FS".} + ClientsCount = 2 + MessagesCount = 1000 proc client1(transp: DatagramTransport, pbytes: pointer, nbytes: int, raddr: TransportAddress, udata: pointer): Future[void] {.async.} = @@ -25,6 +22,7 @@ proc client1(transp: DatagramTransport, pbytes: pointer, nbytes: int, await transp.sendTo(addr err[0], len(err), raddr) else: ## Read operation failed with error + echo "SERVER ERROR HAPPENS QUITING" var counterPtr = cast[ptr int](udata) counterPtr[] = -1 transp.close() @@ -71,13 +69,11 @@ proc client3(transp: DatagramTransport, pbytes: pointer, nbytes: int, var req = "REQUEST" & $counterPtr[] await transp.send(addr req[0], len(req)) else: - echo "ERROR" var counterPtr = cast[ptr int](udata) counterPtr[] = -1 transp.close() else: ## Read operation failed with error - echo "ERROR" var counterPtr = cast[ptr int](udata) counterPtr[] = -1 transp.close() @@ -98,13 +94,13 @@ proc client4(transp: DatagramTransport, pbytes: pointer, nbytes: int, echo $counterPtr[] & "-SEND" await transp.send(addr req[0], len(req)) else: - echo "ERROR" + echo "ERROR1 [" & $data & "]" var counterPtr = cast[ptr int](udata) counterPtr[] = -1 transp.close() else: ## Read operation failed with error - echo "ERROR" + echo "ERROR2" var counterPtr = cast[ptr int](udata) counterPtr[] = -1 transp.close() @@ -161,7 +157,7 @@ proc test3(): Future[int] {.async.} = var data = "REQUEST0" await dgram.sendTo(addr data[0], len(data), ta) clients[i] = dgram.join() - + # await dgram1.join() await waitAll(clients) dgram1.close() result = 0 @@ -174,5 +170,5 @@ when isMainModule: # check waitFor(test1()) == TestsCount # test "Bound test (5000 times)": # check waitFor(test2()) == TestsCount - test "Multiple clients with messages (100 clients x 50 messages each)": + test "Multiple clients with messages": echo waitFor(test3()) diff --git a/tests/teststream.nim b/tests/teststream.nim index e6e0828b..ddb7b0b5 100644 --- a/tests/teststream.nim +++ b/tests/teststream.nim @@ -2,8 +2,8 @@ import strutils, net, unittest import ../asyncdispatch2 const - ClientsCount = 2 - MessagesCount = 1000 + ClientsCount = 1 + MessagesCount = 100000 proc serveClient1(transp: StreamTransport, udata: pointer) {.async.} = echo "SERVER STARTING (0x" & toHex[uint](cast[uint](transp)) & ")"