Attempts to fix #01

This commit is contained in:
Your Name 2018-05-17 11:45:18 +03:00
parent 868ae64ae0
commit 570467a306
4 changed files with 26 additions and 25 deletions

View File

@ -196,7 +196,7 @@ proc addCallback*[T](future: Future[T], cb: CallbackFunc) =
## Adds the callbacks proc to be called when the future completes. ## Adds the callbacks proc to be called when the future completes.
## ##
## If future has already completed then ``cb`` will be called immediately. ## If future has already completed then ``cb`` will be called immediately.
future.addCallback(cb, cast[pointer](unsafeAddr future)) future.addCallback(cb, cast[pointer](future))
proc removeCallback*(future: FutureBase, cb: CallbackFunc, proc removeCallback*(future: FutureBase, cb: CallbackFunc,
udata: pointer = nil) = udata: pointer = nil) =
@ -205,7 +205,7 @@ proc removeCallback*(future: FutureBase, cb: CallbackFunc,
future.callbacks.remove acb future.callbacks.remove acb
proc removeCallback*[T](future: Future[T], cb: CallbackFunc) = proc removeCallback*[T](future: Future[T], cb: CallbackFunc) =
future.removeCallback(cb, cast[pointer](unsafeAddr future)) future.removeCallback(cb, cast[pointer](future))
proc `callback=`*(future: FutureBase, cb: CallbackFunc, udata: pointer = nil) = proc `callback=`*(future: FutureBase, cb: CallbackFunc, udata: pointer = nil) =
## Clears the list of callbacks and sets the callback proc to be called when ## Clears the list of callbacks and sets the callback proc to be called when

View File

@ -8,7 +8,7 @@
# MIT license (LICENSE-MIT) # MIT license (LICENSE-MIT)
import net, nativesockets, os, deques, strutils import net, nativesockets, os, deques, strutils
import ../asyncloop, ../handles import ../asyncloop, ../handles, ../hexdump
import common import common
type type
@ -91,7 +91,6 @@ when defined(windows):
return return
var ovl = cast[PCustomOverlapped](udata) var ovl = cast[PCustomOverlapped](udata)
var transp = cast[WindowsDatagramTransport](ovl.data.udata) var transp = cast[WindowsDatagramTransport](ovl.data.udata)
echo "writeDatagramLoop(" & toHex(cast[uint](transp)) & ")"
while len(transp.queue) > 0: while len(transp.queue) > 0:
if WritePending in transp.state: if WritePending in transp.state:
## Continuation ## Continuation
@ -156,12 +155,12 @@ when defined(windows):
transp.state.incl(ReadEof) transp.state.incl(ReadEof)
transp.state.incl(ReadPaused) transp.state.incl(ReadPaused)
fromSockAddr(transp.raddr, transp.ralen, raddr.address, raddr.port) fromSockAddr(transp.raddr, transp.ralen, raddr.address, raddr.port)
spawn transp.function(transp, addr transp.buffer[0], bytesCount, discard transp.function(transp, addr transp.buffer[0], bytesCount,
raddr, transp.udata) raddr, transp.udata)
else: else:
transp.setReadError(err) transp.setReadError(err)
transp.state.incl(ReadPaused) transp.state.incl(ReadPaused)
spawn transp.function(transp, nil, 0, raddr, transp.udata) discard transp.function(transp, nil, 0, raddr, transp.udata)
else: else:
## Initiation ## Initiation
if (ReadEof notin transp.state) and (ReadClosed notin transp.state): if (ReadEof notin transp.state) and (ReadClosed notin transp.state):
@ -180,11 +179,17 @@ when defined(windows):
if ret != 0: if ret != 0:
let err = osLastError() let err = osLastError()
if int(err) == ERROR_OPERATION_ABORTED: if int(err) == ERROR_OPERATION_ABORTED:
transp.state.excl(ReadPending)
transp.state.incl(ReadPaused) transp.state.incl(ReadPaused)
elif int(err) != ERROR_IO_PENDING: elif int(err) == WSAECONNRESET:
transp.state.excl(ReadPending)
continue
elif int(err) == ERROR_IO_PENDING:
discard
else:
transp.state.excl(ReadPending) transp.state.excl(ReadPending)
transp.setReadError(err) transp.setReadError(err)
spawn transp.function(transp, nil, 0, raddr, transp.udata) discard transp.function(transp, nil, 0, raddr, transp.udata)
break break
proc resumeRead(transp: DatagramTransport) {.inline.} = proc resumeRead(transp: DatagramTransport) {.inline.} =
@ -307,7 +312,7 @@ else:
addr slen) addr slen)
if res >= 0: if res >= 0:
fromSockAddr(saddr, slen, raddr.address, raddr.port) fromSockAddr(saddr, slen, raddr.address, raddr.port)
spawn transp.function(transp, addr transp.buffer[0], res, discard transp.function(transp, addr transp.buffer[0], res,
raddr, transp.udata) raddr, transp.udata)
else: else:
let err = osLastError() let err = osLastError()
@ -315,7 +320,7 @@ else:
continue continue
else: else:
transp.setReadError(err) transp.setReadError(err)
spawn transp.function(transp, nil, 0, raddr, transp.udata) discard transp.function(transp, nil, 0, raddr, transp.udata)
break break
proc writeDatagramLoop(udata: pointer) = proc writeDatagramLoop(udata: pointer) =

View File

@ -3,11 +3,8 @@ import ../asyncdispatch2
const const
TestsCount = 5000 TestsCount = 5000
ClientsCount = 50 ClientsCount = 2
MessagesCount = 350 MessagesCount = 1000
when defined(vcc):
{.passC: "/Zi /FS".}
proc client1(transp: DatagramTransport, pbytes: pointer, nbytes: int, proc client1(transp: DatagramTransport, pbytes: pointer, nbytes: int,
raddr: TransportAddress, udata: pointer): Future[void] {.async.} = raddr: TransportAddress, udata: pointer): Future[void] {.async.} =
@ -25,6 +22,7 @@ proc client1(transp: DatagramTransport, pbytes: pointer, nbytes: int,
await transp.sendTo(addr err[0], len(err), raddr) await transp.sendTo(addr err[0], len(err), raddr)
else: else:
## Read operation failed with error ## Read operation failed with error
echo "SERVER ERROR HAPPENS QUITING"
var counterPtr = cast[ptr int](udata) var counterPtr = cast[ptr int](udata)
counterPtr[] = -1 counterPtr[] = -1
transp.close() transp.close()
@ -71,13 +69,11 @@ proc client3(transp: DatagramTransport, pbytes: pointer, nbytes: int,
var req = "REQUEST" & $counterPtr[] var req = "REQUEST" & $counterPtr[]
await transp.send(addr req[0], len(req)) await transp.send(addr req[0], len(req))
else: else:
echo "ERROR"
var counterPtr = cast[ptr int](udata) var counterPtr = cast[ptr int](udata)
counterPtr[] = -1 counterPtr[] = -1
transp.close() transp.close()
else: else:
## Read operation failed with error ## Read operation failed with error
echo "ERROR"
var counterPtr = cast[ptr int](udata) var counterPtr = cast[ptr int](udata)
counterPtr[] = -1 counterPtr[] = -1
transp.close() transp.close()
@ -98,13 +94,13 @@ proc client4(transp: DatagramTransport, pbytes: pointer, nbytes: int,
echo $counterPtr[] & "-SEND" echo $counterPtr[] & "-SEND"
await transp.send(addr req[0], len(req)) await transp.send(addr req[0], len(req))
else: else:
echo "ERROR" echo "ERROR1 [" & $data & "]"
var counterPtr = cast[ptr int](udata) var counterPtr = cast[ptr int](udata)
counterPtr[] = -1 counterPtr[] = -1
transp.close() transp.close()
else: else:
## Read operation failed with error ## Read operation failed with error
echo "ERROR" echo "ERROR2"
var counterPtr = cast[ptr int](udata) var counterPtr = cast[ptr int](udata)
counterPtr[] = -1 counterPtr[] = -1
transp.close() transp.close()
@ -161,7 +157,7 @@ proc test3(): Future[int] {.async.} =
var data = "REQUEST0" var data = "REQUEST0"
await dgram.sendTo(addr data[0], len(data), ta) await dgram.sendTo(addr data[0], len(data), ta)
clients[i] = dgram.join() clients[i] = dgram.join()
# await dgram1.join()
await waitAll(clients) await waitAll(clients)
dgram1.close() dgram1.close()
result = 0 result = 0
@ -174,5 +170,5 @@ when isMainModule:
# check waitFor(test1()) == TestsCount # check waitFor(test1()) == TestsCount
# test "Bound test (5000 times)": # test "Bound test (5000 times)":
# check waitFor(test2()) == TestsCount # check waitFor(test2()) == TestsCount
test "Multiple clients with messages (100 clients x 50 messages each)": test "Multiple clients with messages":
echo waitFor(test3()) echo waitFor(test3())

View File

@ -2,8 +2,8 @@ import strutils, net, unittest
import ../asyncdispatch2 import ../asyncdispatch2
const const
ClientsCount = 2 ClientsCount = 1
MessagesCount = 1000 MessagesCount = 100000
proc serveClient1(transp: StreamTransport, udata: pointer) {.async.} = proc serveClient1(transp: StreamTransport, udata: pointer) {.async.} =
echo "SERVER STARTING (0x" & toHex[uint](cast[uint](transp)) & ")" echo "SERVER STARTING (0x" & toHex[uint](cast[uint](transp)) & ")"