Fix compilation warning at asyncfutures2.nim

Add spawn procedure
Add usage of spawn procedure in datagram.nim
Some fixes at stream.nim
Increase number of clients in testdatagram.nim
This commit is contained in:
cheatfate 2018-05-16 18:28:23 +03:00
parent bb36fc98a4
commit 868ae64ae0
5 changed files with 51 additions and 58 deletions

View File

@ -258,7 +258,7 @@ proc `$`*(entries: seq[StackTraceEntry]): string =
indent.inc(2) indent.inc(2)
else: else:
indent.dec(2) indent.dec(2)
result.add(spaces(indent)& "]#\n") result.add(spaces(indent) & "]#\n")
continue continue
let left = "$#($#)" % [$entry.filename, $entry.line] let left = "$#($#)" % [$entry.filename, $entry.line]
@ -349,6 +349,9 @@ proc asyncCheckProxy[T](udata: pointer) =
injectStacktrace(future) injectStacktrace(future)
raise future.error raise future.error
proc spawnProxy[T](udata: pointer) =
discard
proc asyncCheck*[T](future: Future[T]) = proc asyncCheck*[T](future: Future[T]) =
## Sets a callback on ``future`` which raises an exception if the future ## Sets a callback on ``future`` which raises an exception if the future
## finished with an error. ## finished with an error.
@ -361,6 +364,10 @@ proc asyncCheck*[T](future: Future[T]) =
# injectStacktrace(future) # injectStacktrace(future)
# raise future.error # raise future.error
proc spawn*[T](future: Future[T]) =
assert(not future.isNil, "Future is nil")
future.callback = spawnProxy[T]
proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] = proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] =
## Returns a future which will complete once both ``fut1`` and ``fut2`` ## Returns a future which will complete once both ``fut1`` and ``fut2``
## complete. ## complete.

View File

@ -156,12 +156,12 @@ when defined(windows):
transp.state.incl(ReadEof) transp.state.incl(ReadEof)
transp.state.incl(ReadPaused) transp.state.incl(ReadPaused)
fromSockAddr(transp.raddr, transp.ralen, raddr.address, raddr.port) fromSockAddr(transp.raddr, transp.ralen, raddr.address, raddr.port)
discard transp.function(transp, addr transp.buffer[0], bytesCount, spawn transp.function(transp, addr transp.buffer[0], bytesCount,
raddr, transp.udata) raddr, transp.udata)
else: else:
transp.setReadError(err) transp.setReadError(err)
transp.state.incl(ReadPaused) transp.state.incl(ReadPaused)
discard transp.function(transp, nil, 0, raddr, transp.udata) spawn transp.function(transp, nil, 0, raddr, transp.udata)
else: else:
## Initiation ## Initiation
if (ReadEof notin transp.state) and (ReadClosed notin transp.state): if (ReadEof notin transp.state) and (ReadClosed notin transp.state):
@ -184,7 +184,7 @@ when defined(windows):
elif int(err) != ERROR_IO_PENDING: elif int(err) != ERROR_IO_PENDING:
transp.state.excl(ReadPending) transp.state.excl(ReadPending)
transp.setReadError(err) transp.setReadError(err)
discard transp.function(transp, nil, 0, raddr, transp.udata) spawn transp.function(transp, nil, 0, raddr, transp.udata)
break break
proc resumeRead(transp: DatagramTransport) {.inline.} = proc resumeRead(transp: DatagramTransport) {.inline.} =
@ -307,7 +307,7 @@ else:
addr slen) addr slen)
if res >= 0: if res >= 0:
fromSockAddr(saddr, slen, raddr.address, raddr.port) fromSockAddr(saddr, slen, raddr.address, raddr.port)
discard transp.function(transp, addr transp.buffer[0], res, spawn transp.function(transp, addr transp.buffer[0], res,
raddr, transp.udata) raddr, transp.udata)
else: else:
let err = osLastError() let err = osLastError()
@ -315,7 +315,7 @@ else:
continue continue
else: else:
transp.setReadError(err) transp.setReadError(err)
discard transp.function(transp, nil, 0, raddr, transp.udata) spawn transp.function(transp, nil, 0, raddr, transp.udata)
break break
proc writeDatagramLoop(udata: pointer) = proc writeDatagramLoop(udata: pointer) =
@ -461,7 +461,7 @@ proc join*(transp: DatagramTransport) {.async.} =
await transp.future await transp.future
proc send*(transp: DatagramTransport, pbytes: pointer, proc send*(transp: DatagramTransport, pbytes: pointer,
nbytes: int): Future[void] {.async.} = nbytes: int) {.async.} =
checkClosed(transp) checkClosed(transp)
if transp.remote.port == Port(0): if transp.remote.port == Port(0):
raise newException(TransportError, "Remote peer is not set!") raise newException(TransportError, "Remote peer is not set!")
@ -481,7 +481,7 @@ proc send*(transp: DatagramTransport, pbytes: pointer,
raise transp.getError() raise transp.getError()
proc sendTo*(transp: DatagramTransport, pbytes: pointer, nbytes: int, proc sendTo*(transp: DatagramTransport, pbytes: pointer, nbytes: int,
remote: TransportAddress): Future[void] {.async.} = remote: TransportAddress) {.async.} =
checkClosed(transp) checkClosed(transp)
var saddr: Sockaddr_storage var saddr: Sockaddr_storage
var slen: SockLen var slen: SockLen

View File

@ -21,7 +21,7 @@ when defined(windows):
type type
StreamVector = object StreamVector = object
kind: VectorKind # Writer vector source kind kind: VectorKind # Writer vector source kind
dataBuf: TWSABuf # Writer vector buffer dataBuf: ptr TWSABuf # Writer vector buffer
offset: uint # Writer vector offset offset: uint # Writer vector offset
writer: Future[void] # Writer vector completion Future writer: Future[void] # Writer vector completion Future
@ -121,7 +121,6 @@ template checkPending(t: untyped) =
template shiftBuffer(t, c: untyped) = template shiftBuffer(t, c: untyped) =
if (t).offset > c: if (t).offset > c:
echo "moveMem(" & $int((t).offset) & ", " & $int(c) & ")"
moveMem(addr((t).buffer[0]), addr((t).buffer[(c)]), (t).offset - (c)) moveMem(addr((t).buffer[0]), addr((t).buffer[(c)]), (t).offset - (c))
(t).offset = (t).offset - (c) (t).offset = (t).offset - (c)
else: else:
@ -176,18 +175,12 @@ when defined(windows):
cast[uint](addr t.buffer[0]) + uint((t).roffset)) cast[uint](addr t.buffer[0]) + uint((t).roffset))
(t).wsabuf.len = int32(len((t).buffer) - (t).roffset) (t).wsabuf.len = int32(len((t).buffer) - (t).roffset)
template initBufferStreamVector(v, p, n, t: untyped) = # template initTransmitStreamVector(v, h, o, n, t: untyped) =
(v).kind = DataBuffer # (v).kind = DataFile
(v).dataBuf.buf = cast[cstring]((p)) # (v).dataBuf.buf = cast[cstring]((n))
(v).dataBuf.len = cast[int32](n) # (v).dataBuf.len = cast[int32]((h))
(v).writer = (t) # (v).offset = cast[uint]((o))
# (v).writer = (t)
template initTransmitStreamVector(v, h, o, n, t: untyped) =
(v).kind = DataFile
(v).dataBuf.buf = cast[cstring]((n))
(v).dataBuf.len = cast[int32]((h))
(v).offset = cast[uint]((o))
(v).writer = (t)
proc writeStreamLoop(udata: pointer) {.gcsafe.} = proc writeStreamLoop(udata: pointer) {.gcsafe.} =
var bytesCount: int32 var bytesCount: int32
@ -220,12 +213,6 @@ when defined(windows):
transp.queue.addFirst(vector) transp.queue.addFirst(vector)
else: else:
vector.writer.complete() vector.writer.complete()
elif transp.kind in {TransportKind.Pipe, TransportKind.File}:
if bytesCount < vector.dataBuf.len:
vector.slideBuffer(bytesCount)
transp.queue.addFirst(vector)
else:
vector.writer.complete()
else: else:
transp.setWriteError(err) transp.setWriteError(err)
transp.finishWriter() transp.finishWriter()
@ -234,20 +221,26 @@ when defined(windows):
transp.state.incl(WritePending) transp.state.incl(WritePending)
if transp.kind == TransportKind.Socket: if transp.kind == TransportKind.Socket:
let sock = SocketHandle(transp.wovl.data.fd) let sock = SocketHandle(transp.wovl.data.fd)
if transp.queue[0].kind == VectorKind.DataBuffer: var vector = transp.queue.popFirst()
if vector.kind == VectorKind.DataBuffer:
transp.wovl.zeroOvelappedOffset() transp.wovl.zeroOvelappedOffset()
let ret = WSASend(sock, addr transp.queue[0].dataBuf, 1, let ret = WSASend(sock, vector.dataBuf, 1,
addr bytesCount, DWORD(0), addr bytesCount, DWORD(0),
cast[POVERLAPPED](addr transp.wovl), nil) cast[POVERLAPPED](addr transp.wovl), nil)
if ret != 0: if ret != 0:
let err = osLastError() let err = osLastError()
if int32(err) != ERROR_IO_PENDING: if int(err) == ERROR_OPERATION_ABORTED:
transp.state.incl(WritePaused)
elif int(err) == ERROR_IO_PENDING:
transp.queue.addFirst(vector)
else:
transp.state.excl(WritePending) transp.state.excl(WritePending)
transp.setWriteError(err) transp.setWriteError(err)
transp.finishWriter() transp.finishWriter()
else:
transp.queue.addFirst(vector)
else: else:
let loop = getGlobalDispatcher() let loop = getGlobalDispatcher()
var vector = transp.queue[0]
var size: int32 var size: int32
var flags: int32 var flags: int32
@ -257,33 +250,21 @@ when defined(windows):
size = int32(getFileSize(vector)) size = int32(getFileSize(vector))
transp.wovl.setOverlappedOffset(vector.offset) transp.wovl.setOverlappedOffset(vector.offset)
var ret = loop.transmitFile(sock, getFileHandle(vector), size, 0, var ret = loop.transmitFile(sock, getFileHandle(vector), size, 0,
cast[POVERLAPPED](addr transp.wovl), cast[POVERLAPPED](addr transp.wovl),
nil, flags) nil, flags)
if ret == 0: if ret == 0:
let err = osLastError() let err = osLastError()
if int32(err) != ERROR_IO_PENDING: if int(err) == ERROR_OPERATION_ABORTED:
transp.state.incl(WritePaused)
elif int(err) == ERROR_IO_PENDING:
transp.queue.addFirst(vector)
else:
transp.state.excl(WritePending) transp.state.excl(WritePending)
transp.setWriteError(err) transp.setWriteError(err)
transp.finishWriter() transp.finishWriter()
elif transp.kind in {TransportKind.Pipe, TransportKind.File}: else:
let fd = Handle(transp.wovl.data.fd) transp.queue.addFirst(vector)
var vector = transp.queue[0]
if transp.kind == TransportKind.File:
transp.wovl.setOverlappedOffset(vector.offset)
else:
transp.wovl.zeroOvelappedOffset()
var ret = writeFile(fd, vector.dataBuf.buf, vector.dataBuf.len, nil,
cast[POVERLAPPED](addr transp.wovl))
if ret == 0:
let err = osLastError()
if int32(err) != ERROR_IO_PENDING:
transp.state.excl(WritePending)
transp.setWriteError(err)
transp.finishWriter()
break break
if len(transp.queue) == 0: if len(transp.queue) == 0:
@ -501,7 +482,7 @@ when defined(windows):
if not acceptFut.failed: if not acceptFut.failed:
var sock = acceptFut.read() var sock = acceptFut.read()
if sock != asyncInvalidSocket: if sock != asyncInvalidSocket:
discard server.function( spawn server.function(
newStreamSocketTransport(sock, server.bufferSize), newStreamSocketTransport(sock, server.bufferSize),
server.udata) server.udata)
@ -668,7 +649,7 @@ else:
if int(res) > 0: if int(res) > 0:
let sock = wrapAsyncSocket(res) let sock = wrapAsyncSocket(res)
if sock != asyncInvalidSocket: if sock != asyncInvalidSocket:
discard server.function( spawn server.function(
newStreamSocketTransport(sock, server.bufferSize), newStreamSocketTransport(sock, server.bufferSize),
server.udata) server.udata)
break break
@ -784,8 +765,13 @@ proc write*(transp: StreamTransport, pbytes: pointer,
nbytes: int): Future[int] {.async.} = nbytes: int): Future[int] {.async.} =
checkClosed(transp) checkClosed(transp)
var waitFuture = newFuture[void]("transport.write") var waitFuture = newFuture[void]("transport.write")
var vector: StreamVector var vector = StreamVector(kind: DataBuffer, writer: waitFuture)
vector.initBufferStreamVector(pbytes, nbytes, waitFuture) when defined(windows):
var wsabuf = TWSABuf(buf: cast[cstring](pbytes), len: cast[int32](nbytes))
vector.dataBuf = addr wsabuf
else:
vector.buf = pbytes
vector.buflen = nbytes
transp.queue.addLast(vector) transp.queue.addLast(vector)
if WritePaused in transp.state: if WritePaused in transp.state:
transp.resumeWrite() transp.resumeWrite()

View File

@ -3,7 +3,7 @@ import ../asyncdispatch2
const const
TestsCount = 5000 TestsCount = 5000
ClientsCount = 2 ClientsCount = 50
MessagesCount = 350 MessagesCount = 350
when defined(vcc): when defined(vcc):

View File

@ -2,8 +2,8 @@ import strutils, net, unittest
import ../asyncdispatch2 import ../asyncdispatch2
const const
ClientsCount = 10 ClientsCount = 2
MessagesCount = 100 MessagesCount = 1000
proc serveClient1(transp: StreamTransport, udata: pointer) {.async.} = proc serveClient1(transp: StreamTransport, udata: pointer) {.async.} =
echo "SERVER STARTING (0x" & toHex[uint](cast[uint](transp)) & ")" echo "SERVER STARTING (0x" & toHex[uint](cast[uint](transp)) & ")"