diff --git a/asyncdispatch2/asyncfutures2.nim b/asyncdispatch2/asyncfutures2.nim index 66b2a58..e16449f 100644 --- a/asyncdispatch2/asyncfutures2.nim +++ b/asyncdispatch2/asyncfutures2.nim @@ -258,7 +258,7 @@ proc `$`*(entries: seq[StackTraceEntry]): string = indent.inc(2) else: indent.dec(2) - result.add(spaces(indent)& "]#\n") + result.add(spaces(indent) & "]#\n") continue let left = "$#($#)" % [$entry.filename, $entry.line] @@ -349,6 +349,9 @@ proc asyncCheckProxy[T](udata: pointer) = injectStacktrace(future) raise future.error +proc spawnProxy[T](udata: pointer) = + discard + proc asyncCheck*[T](future: Future[T]) = ## Sets a callback on ``future`` which raises an exception if the future ## finished with an error. @@ -361,6 +364,10 @@ proc asyncCheck*[T](future: Future[T]) = # injectStacktrace(future) # raise future.error +proc spawn*[T](future: Future[T]) = + assert(not future.isNil, "Future is nil") + future.callback = spawnProxy[T] + proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] = ## Returns a future which will complete once both ``fut1`` and ``fut2`` ## complete. diff --git a/asyncdispatch2/transports/datagram.nim b/asyncdispatch2/transports/datagram.nim index 3ed4f92..af09456 100644 --- a/asyncdispatch2/transports/datagram.nim +++ b/asyncdispatch2/transports/datagram.nim @@ -156,12 +156,12 @@ when defined(windows): transp.state.incl(ReadEof) transp.state.incl(ReadPaused) fromSockAddr(transp.raddr, transp.ralen, raddr.address, raddr.port) - discard transp.function(transp, addr transp.buffer[0], bytesCount, + spawn transp.function(transp, addr transp.buffer[0], bytesCount, raddr, transp.udata) else: transp.setReadError(err) transp.state.incl(ReadPaused) - discard transp.function(transp, nil, 0, raddr, transp.udata) + spawn transp.function(transp, nil, 0, raddr, transp.udata) else: ## Initiation if (ReadEof notin transp.state) and (ReadClosed notin transp.state): @@ -184,7 +184,7 @@ when defined(windows): elif int(err) != ERROR_IO_PENDING: transp.state.excl(ReadPending) transp.setReadError(err) - discard transp.function(transp, nil, 0, raddr, transp.udata) + spawn transp.function(transp, nil, 0, raddr, transp.udata) break proc resumeRead(transp: DatagramTransport) {.inline.} = @@ -307,7 +307,7 @@ else: addr slen) if res >= 0: fromSockAddr(saddr, slen, raddr.address, raddr.port) - discard transp.function(transp, addr transp.buffer[0], res, + spawn transp.function(transp, addr transp.buffer[0], res, raddr, transp.udata) else: let err = osLastError() @@ -315,7 +315,7 @@ else: continue else: transp.setReadError(err) - discard transp.function(transp, nil, 0, raddr, transp.udata) + spawn transp.function(transp, nil, 0, raddr, transp.udata) break proc writeDatagramLoop(udata: pointer) = @@ -461,7 +461,7 @@ proc join*(transp: DatagramTransport) {.async.} = await transp.future proc send*(transp: DatagramTransport, pbytes: pointer, - nbytes: int): Future[void] {.async.} = + nbytes: int) {.async.} = checkClosed(transp) if transp.remote.port == Port(0): raise newException(TransportError, "Remote peer is not set!") @@ -481,7 +481,7 @@ proc send*(transp: DatagramTransport, pbytes: pointer, raise transp.getError() proc sendTo*(transp: DatagramTransport, pbytes: pointer, nbytes: int, - remote: TransportAddress): Future[void] {.async.} = + remote: TransportAddress) {.async.} = checkClosed(transp) var saddr: Sockaddr_storage var slen: SockLen diff --git a/asyncdispatch2/transports/stream.nim b/asyncdispatch2/transports/stream.nim index aa07834..3452c44 100644 --- a/asyncdispatch2/transports/stream.nim +++ b/asyncdispatch2/transports/stream.nim @@ -21,7 +21,7 @@ when defined(windows): type StreamVector = object kind: VectorKind # Writer vector source kind - dataBuf: TWSABuf # Writer vector buffer + dataBuf: ptr TWSABuf # Writer vector buffer offset: uint # Writer vector offset writer: Future[void] # Writer vector completion Future @@ -121,7 +121,6 @@ template checkPending(t: untyped) = template shiftBuffer(t, c: untyped) = if (t).offset > c: - echo "moveMem(" & $int((t).offset) & ", " & $int(c) & ")" moveMem(addr((t).buffer[0]), addr((t).buffer[(c)]), (t).offset - (c)) (t).offset = (t).offset - (c) else: @@ -176,18 +175,12 @@ when defined(windows): cast[uint](addr t.buffer[0]) + uint((t).roffset)) (t).wsabuf.len = int32(len((t).buffer) - (t).roffset) - template initBufferStreamVector(v, p, n, t: untyped) = - (v).kind = DataBuffer - (v).dataBuf.buf = cast[cstring]((p)) - (v).dataBuf.len = cast[int32](n) - (v).writer = (t) - - template initTransmitStreamVector(v, h, o, n, t: untyped) = - (v).kind = DataFile - (v).dataBuf.buf = cast[cstring]((n)) - (v).dataBuf.len = cast[int32]((h)) - (v).offset = cast[uint]((o)) - (v).writer = (t) + # template initTransmitStreamVector(v, h, o, n, t: untyped) = + # (v).kind = DataFile + # (v).dataBuf.buf = cast[cstring]((n)) + # (v).dataBuf.len = cast[int32]((h)) + # (v).offset = cast[uint]((o)) + # (v).writer = (t) proc writeStreamLoop(udata: pointer) {.gcsafe.} = var bytesCount: int32 @@ -220,12 +213,6 @@ when defined(windows): transp.queue.addFirst(vector) else: vector.writer.complete() - elif transp.kind in {TransportKind.Pipe, TransportKind.File}: - if bytesCount < vector.dataBuf.len: - vector.slideBuffer(bytesCount) - transp.queue.addFirst(vector) - else: - vector.writer.complete() else: transp.setWriteError(err) transp.finishWriter() @@ -234,20 +221,26 @@ when defined(windows): transp.state.incl(WritePending) if transp.kind == TransportKind.Socket: let sock = SocketHandle(transp.wovl.data.fd) - if transp.queue[0].kind == VectorKind.DataBuffer: + var vector = transp.queue.popFirst() + if vector.kind == VectorKind.DataBuffer: transp.wovl.zeroOvelappedOffset() - let ret = WSASend(sock, addr transp.queue[0].dataBuf, 1, + let ret = WSASend(sock, vector.dataBuf, 1, addr bytesCount, DWORD(0), cast[POVERLAPPED](addr transp.wovl), nil) if ret != 0: let err = osLastError() - if int32(err) != ERROR_IO_PENDING: + if int(err) == ERROR_OPERATION_ABORTED: + transp.state.incl(WritePaused) + elif int(err) == ERROR_IO_PENDING: + transp.queue.addFirst(vector) + else: transp.state.excl(WritePending) transp.setWriteError(err) transp.finishWriter() + else: + transp.queue.addFirst(vector) else: let loop = getGlobalDispatcher() - var vector = transp.queue[0] var size: int32 var flags: int32 @@ -257,33 +250,21 @@ when defined(windows): size = int32(getFileSize(vector)) transp.wovl.setOverlappedOffset(vector.offset) - var ret = loop.transmitFile(sock, getFileHandle(vector), size, 0, cast[POVERLAPPED](addr transp.wovl), nil, flags) if ret == 0: let err = osLastError() - if int32(err) != ERROR_IO_PENDING: + if int(err) == ERROR_OPERATION_ABORTED: + transp.state.incl(WritePaused) + elif int(err) == ERROR_IO_PENDING: + transp.queue.addFirst(vector) + else: transp.state.excl(WritePending) transp.setWriteError(err) transp.finishWriter() - elif transp.kind in {TransportKind.Pipe, TransportKind.File}: - let fd = Handle(transp.wovl.data.fd) - var vector = transp.queue[0] - - if transp.kind == TransportKind.File: - transp.wovl.setOverlappedOffset(vector.offset) - else: - transp.wovl.zeroOvelappedOffset() - - var ret = writeFile(fd, vector.dataBuf.buf, vector.dataBuf.len, nil, - cast[POVERLAPPED](addr transp.wovl)) - if ret == 0: - let err = osLastError() - if int32(err) != ERROR_IO_PENDING: - transp.state.excl(WritePending) - transp.setWriteError(err) - transp.finishWriter() + else: + transp.queue.addFirst(vector) break if len(transp.queue) == 0: @@ -501,7 +482,7 @@ when defined(windows): if not acceptFut.failed: var sock = acceptFut.read() if sock != asyncInvalidSocket: - discard server.function( + spawn server.function( newStreamSocketTransport(sock, server.bufferSize), server.udata) @@ -668,7 +649,7 @@ else: if int(res) > 0: let sock = wrapAsyncSocket(res) if sock != asyncInvalidSocket: - discard server.function( + spawn server.function( newStreamSocketTransport(sock, server.bufferSize), server.udata) break @@ -784,8 +765,13 @@ proc write*(transp: StreamTransport, pbytes: pointer, nbytes: int): Future[int] {.async.} = checkClosed(transp) var waitFuture = newFuture[void]("transport.write") - var vector: StreamVector - vector.initBufferStreamVector(pbytes, nbytes, waitFuture) + var vector = StreamVector(kind: DataBuffer, writer: waitFuture) + when defined(windows): + var wsabuf = TWSABuf(buf: cast[cstring](pbytes), len: cast[int32](nbytes)) + vector.dataBuf = addr wsabuf + else: + vector.buf = pbytes + vector.buflen = nbytes transp.queue.addLast(vector) if WritePaused in transp.state: transp.resumeWrite() diff --git a/tests/testdatagram.nim b/tests/testdatagram.nim index f709678..3ae5c90 100644 --- a/tests/testdatagram.nim +++ b/tests/testdatagram.nim @@ -3,7 +3,7 @@ import ../asyncdispatch2 const TestsCount = 5000 - ClientsCount = 2 + ClientsCount = 50 MessagesCount = 350 when defined(vcc): diff --git a/tests/teststream.nim b/tests/teststream.nim index 552361f..e6e0828 100644 --- a/tests/teststream.nim +++ b/tests/teststream.nim @@ -2,8 +2,8 @@ import strutils, net, unittest import ../asyncdispatch2 const - ClientsCount = 10 - MessagesCount = 100 + ClientsCount = 2 + MessagesCount = 1000 proc serveClient1(transp: StreamTransport, udata: pointer) {.async.} = echo "SERVER STARTING (0x" & toHex[uint](cast[uint](transp)) & ")"