Fix datagram: for send/sendTo (string, seq[T]) versions.

Fix stream: for write (string, seq[T]) versions
More tests for both datagram/stream.
This commit is contained in:
cheatfate 2018-06-05 23:21:07 +03:00
parent 2e6697d0d7
commit b8e8d96f3b
5 changed files with 407 additions and 136 deletions

View File

@ -8,7 +8,7 @@
# MIT license (LICENSE-MIT) # MIT license (LICENSE-MIT)
import net, nativesockets, strutils import net, nativesockets, strutils
import ../asyncloop, ../asyncsync import ../asyncloop
const const
DefaultStreamBufferSize* = 4096 ## Default buffer size for stream DefaultStreamBufferSize* = 4096 ## Default buffer size for stream
@ -38,14 +38,20 @@ type
Running, # Server running Running, # Server running
Closed # Server closed Closed # Server closed
FutureGCString*[T] = ref object of Future[T]
## Future to hold GC strings
gcholder*: string
FutureGCSeq*[A, B] = ref object of Future[A]
## Future to hold GC seqs
gcholder*: seq[B]
when defined(windows): when defined(windows):
type type
SocketServer* = ref object of RootRef SocketServer* = ref object of RootRef
## Socket server object ## Socket server object
sock*: AsyncFD # Socket sock*: AsyncFD # Socket
local*: TransportAddress # Address local*: TransportAddress # Address
# actEvent*: AsyncEvent # Activation event
# action*: ServerCommand # Activation command
status*: ServerStatus # Current server status status*: ServerStatus # Current server status
udata*: pointer # User-defined pointer udata*: pointer # User-defined pointer
flags*: set[ServerFlags] # Flags flags*: set[ServerFlags] # Flags
@ -62,8 +68,6 @@ else:
## Socket server object ## Socket server object
sock*: AsyncFD # Socket sock*: AsyncFD # Socket
local*: TransportAddress # Address local*: TransportAddress # Address
# actEvent*: AsyncEvent # Activation event
# action*: ServerCommand # Activation command
status*: ServerStatus # Current server status status*: ServerStatus # Current server status
udata*: pointer # User-defined pointer udata*: pointer # User-defined pointer
flags*: set[ServerFlags] # Flags flags*: set[ServerFlags] # Flags
@ -194,6 +198,11 @@ template checkClosed*(t: untyped) =
if (ReadClosed in (t).state) or (WriteClosed in (t).state): if (ReadClosed in (t).state) or (WriteClosed in (t).state):
raise newException(TransportError, "Transport is already closed!") raise newException(TransportError, "Transport is already closed!")
template checkClosed*(t: untyped, future: untyped) =
if (ReadClosed in (t).state) or (WriteClosed in (t).state):
future.fail(newException(TransportError, "Transport is already closed!"))
return future
template getError*(t: untyped): ref Exception = template getError*(t: untyped): ref Exception =
var err = (t).error var err = (t).error
(t).error = nil (t).error = nil

View File

@ -63,10 +63,6 @@ template setReadError(t, e: untyped) =
(t).state.incl(ReadError) (t).state.incl(ReadError)
(t).error = newException(TransportOsError, osErrorMsg((e))) (t).error = newException(TransportOsError, osErrorMsg((e)))
template setWriteError(t, e: untyped) =
(t).state.incl(WriteError)
(t).error = newException(TransportOsError, osErrorMsg((e)))
template setWriterWSABuffer(t, v: untyped) = template setWriterWSABuffer(t, v: untyped) =
(t).wwsabuf.buf = cast[cstring](v.buf) (t).wwsabuf.buf = cast[cstring](v.buf)
(t).wwsabuf.len = cast[int32](v.buflen) (t).wwsabuf.len = cast[int32](v.buflen)
@ -84,10 +80,6 @@ when defined(windows):
wlen: SockLen # Writer address length wlen: SockLen # Writer address length
wwsabuf: TWSABuf # Writer WSABUF structure wwsabuf: TWSABuf # Writer WSABUF structure
template finishWriter(t: untyped) =
var vv = (t).queue.popFirst()
vv.writer.complete()
proc writeDatagramLoop(udata: pointer) = proc writeDatagramLoop(udata: pointer) =
var bytesCount: int32 var bytesCount: int32
var ovl = cast[PtrCustomOverlapped](udata) var ovl = cast[PtrCustomOverlapped](udata)
@ -97,16 +89,16 @@ when defined(windows):
## Continuation ## Continuation
transp.state.excl(WritePending) transp.state.excl(WritePending)
let err = transp.wovl.data.errCode let err = transp.wovl.data.errCode
let vector = transp.queue.popFirst()
if err == OSErrorCode(-1): if err == OSErrorCode(-1):
discard vector.writer.complete()
elif int(err) == ERROR_OPERATION_ABORTED: elif int(err) == ERROR_OPERATION_ABORTED:
# CancelIO() interrupt # CancelIO() interrupt
transp.state.incl(WritePaused) transp.state.incl(WritePaused)
transp.finishWriter() vector.writer.complete()
break
else: else:
transp.setWriteError(err) transp.state = transp.state + {WritePaused, WriteError}
transp.finishWriter() vector.writer.fail(newException(TransportOsError, osErrorMsg(err)))
else: else:
## Initiation ## Initiation
transp.state.incl(WritePending) transp.state.incl(WritePending)
@ -129,12 +121,13 @@ when defined(windows):
if int(err) == ERROR_OPERATION_ABORTED: if int(err) == ERROR_OPERATION_ABORTED:
# CancelIO() interrupt # CancelIO() interrupt
transp.state.incl(WritePaused) transp.state.incl(WritePaused)
vector.writer.complete()
elif int(err) == ERROR_IO_PENDING: elif int(err) == ERROR_IO_PENDING:
transp.queue.addFirst(vector) transp.queue.addFirst(vector)
else: else:
transp.state.excl(WritePending) transp.state.excl(WritePending)
transp.setWriteError(err) transp.state = transp.state + {WritePaused, WriteError}
vector.writer.complete() vector.writer.fail(newException(TransportOsError, osErrorMsg(err)))
else: else:
transp.queue.addFirst(vector) transp.queue.addFirst(vector)
break break
@ -190,12 +183,14 @@ when defined(windows):
transp.state.excl(ReadPending) transp.state.excl(ReadPending)
transp.state.incl(ReadPaused) transp.state.incl(ReadPaused)
elif int(err) == WSAECONNRESET: elif int(err) == WSAECONNRESET:
transp.state = {ReadPaused, ReadEof} transp.state.excl(ReadPending)
transp.state = transp.state + {ReadPaused, ReadEof}
break break
elif int(err) == ERROR_IO_PENDING: elif int(err) == ERROR_IO_PENDING:
discard discard
else: else:
transp.state.excl(ReadPending) transp.state.excl(ReadPending)
transp.state.incl(ReadPaused)
transp.setReadError(err) transp.setReadError(err)
discard transp.function(transp, nil, 0, raddr, transp.udata) discard transp.function(transp, nil, 0, raddr, transp.udata)
break break
@ -325,7 +320,7 @@ else:
raddr: TransportAddress raddr: TransportAddress
var cdata = cast[ptr CompletionData](udata) var cdata = cast[ptr CompletionData](udata)
if not isNil(cdata) and int(cdata.fd) == 0: if not isNil(cdata) and (int(cdata.fd) == 0 or isNil(cdata.udata)):
# Transport was closed earlier, exiting # Transport was closed earlier, exiting
return return
var transp = cast[DatagramTransport](cdata.udata) var transp = cast[DatagramTransport](cdata.udata)
@ -380,8 +375,8 @@ else:
if int(err) == EINTR: if int(err) == EINTR:
continue continue
else: else:
transp.setWriteError(err) vector.writer.fail(newException(TransportOsError,
vector.writer.complete() osErrorMsg(err)))
break break
else: else:
transp.state.incl(WritePaused) transp.state.incl(WritePaused)
@ -525,61 +520,93 @@ proc join*(transp: DatagramTransport) {.async.} =
await transp.future await transp.future
proc send*(transp: DatagramTransport, pbytes: pointer, proc send*(transp: DatagramTransport, pbytes: pointer,
nbytes: int) {.async.} = nbytes: int): Future[void] =
## Send buffer with pointer ``pbytes`` and size ``nbytes`` using transport ## Send buffer with pointer ``pbytes`` and size ``nbytes`` using transport
## ``transp`` to remote destination address which was bounded on transport. ## ``transp`` to remote destination address which was bounded on transport.
checkClosed(transp) var retFuture = newFuture[void]()
transp.checkClosed(retFuture)
if transp.remote.port == Port(0): if transp.remote.port == Port(0):
raise newException(TransportError, "Remote peer is not set!") retFuture.fail(newException(TransportError, "Remote peer not set!"))
var waitFuture = newFuture[void]("datagram.transport.send") return retFuture
var vector = GramVector(kind: WithoutAddress, buf: pbytes, buflen: nbytes, var vector = GramVector(kind: WithoutAddress, buf: pbytes, buflen: nbytes,
writer: waitFuture) writer: retFuture)
transp.queue.addLast(vector) transp.queue.addLast(vector)
if WritePaused in transp.state: if WritePaused in transp.state:
transp.resumeWrite() transp.resumeWrite()
await vector.writer return retFuture
if WriteError in transp.state:
raise transp.getError() proc send*(transp: DatagramTransport, msg: string): Future[void] =
## Send string ``msg`` using transport ``transp`` to remote destination
## address which was bounded on transport.
var retFuture = FutureGCString[void]()
transp.checkClosed(retFuture)
shallowCopy(retFuture.gcholder, msg)
let vector = GramVector(kind: WithoutAddress, buf: unsafeAddr msg[0],
buflen: len(msg),
writer: cast[Future[void]](retFuture))
transp.queue.addLast(vector)
if WritePaused in transp.state:
transp.resumeWrite()
return retFuture
proc send*[T](transp: DatagramTransport, msg: seq[T]): Future[void] =
## Send string ``msg`` using transport ``transp`` to remote destination
## address which was bounded on transport.
var retFuture = FutureGCSeq[void, T]()
transp.checkClosed(retFuture)
shallowCopy(retFuture.gcholder, msg)
let vector = GramVector(kind: WithoutAddress, buf: unsafeAddr msg[0],
buflen: (len(msg) * sizeof(T)),
writer: cast[Future[void]](retFuture))
transp.queue.addLast(vector)
if WritePaused in transp.state:
transp.resumeWrite()
return retFuture
proc sendTo*(transp: DatagramTransport, pbytes: pointer, nbytes: int, proc sendTo*(transp: DatagramTransport, pbytes: pointer, nbytes: int,
remote: TransportAddress) {.async.} = remote: TransportAddress): Future[void] =
## Send buffer with pointer ``pbytes`` and size ``nbytes`` using transport ## Send buffer with pointer ``pbytes`` and size ``nbytes`` using transport
## ``transp`` to remote destination address ``remote``. ## ``transp`` to remote destination address ``remote``.
checkClosed(transp) var retFuture = newFuture[void]()
var saddr: Sockaddr_storage transp.checkClosed(retFuture)
var slen: SockLen let vector = GramVector(kind: WithAddress, buf: pbytes, buflen: nbytes,
toSockAddr(remote.address, remote.port, saddr, slen) writer: retFuture, address: remote)
var waitFuture = newFuture[void]("datagram.transport.sendto")
var vector = GramVector(kind: WithAddress, buf: pbytes, buflen: nbytes,
writer: waitFuture, address: remote)
transp.queue.addLast(vector) transp.queue.addLast(vector)
if WritePaused in transp.state: if WritePaused in transp.state:
transp.resumeWrite() transp.resumeWrite()
await vector.writer return retFuture
if WriteError in transp.state:
raise transp.getError()
template send*(transp: DatagramTransport, msg: var string): untyped = proc sendTo*(transp: DatagramTransport, msg: string,
## Send message ``msg`` using transport ``transp`` to remote destination remote: TransportAddress): Future[void] =
## address which was bounded on transport. ## Send string ``msg`` using transport ``transp`` to remote destination
send(transp, addr msg[0], len(msg)) ## address ``remote``.
var retFuture = FutureGCString[void]()
transp.checkClosed(retFuture)
shallowCopy(retFuture.gcholder, msg)
let vector = GramVector(kind: WithAddress, buf: unsafeAddr msg[0],
buflen: len(msg),
writer: cast[Future[void]](retFuture),
address: remote)
transp.queue.addLast(vector)
if WritePaused in transp.state:
transp.resumeWrite()
return retFuture
template send*(transp: DatagramTransport, msg: var seq[byte]): untyped = proc sendTo*[T](transp: DatagramTransport, msg: seq[T],
## Send message ``msg`` using transport ``transp`` to remote destination remote: TransportAddress): Future[void] =
## address which was bounded on transport. ## Send sequence ``msg`` using transport ``transp`` to remote destination
send(transp, addr msg[0], len(msg)) ## address ``remote``.
var retFuture = FutureGCSeq[void, T]()
template sendTo*(transp: DatagramTransport, msg: var string, transp.checkClosed(retFuture)
remote: TransportAddress): untyped = shallowCopy(retFuture.gcholder, msg)
## Send message ``msg`` using transport ``transp`` to remote let vector = GramVector(kind: WithAddress, buf: unsafeAddr msg[0],
## destination address ``remote``. buflen: (len(msg) * sizeof(T)),
sendTo(transp, addr msg[0], len(msg), remote) writer: cast[Future[void]](retFuture),
address: remote)
template sendTo*(transp: DatagramTransport, msg: var seq[byte], transp.queue.addLast(vector)
remote: TransportAddress): untyped = if WritePaused in transp.state:
## Send message ``msg`` using transport ``transp`` to remote transp.resumeWrite()
## destination address ``remote``. return retFuture
sendTo(transp, addr msg[0], len(msg), remote)
proc createDatagramServer*(host: TransportAddress, proc createDatagramServer*(host: TransportAddress,
cbproc: DatagramCallback, cbproc: DatagramCallback,

View File

@ -21,13 +21,12 @@ type
DataBuffer, # Simple buffer pointer/length DataBuffer, # Simple buffer pointer/length
DataFile # File handle for sendfile/TransmitFile DataFile # File handle for sendfile/TransmitFile
type
StreamVector = object StreamVector = object
kind: VectorKind # Writer vector source kind kind: VectorKind # Writer vector source kind
buf: pointer # Writer buffer pointer buf: pointer # Writer buffer pointer
buflen: int # Writer buffer size buflen: int # Writer buffer size
offset: uint # Writer vector offset offset: uint # Writer vector offset
writer: Future[void] # Writer vector completion Future writer: Future[int] # Writer vector completion Future
TransportKind* {.pure.} = enum TransportKind* {.pure.} = enum
Socket, # Socket transport Socket, # Socket transport
@ -102,9 +101,9 @@ template setReadError(t, e: untyped) =
(t).state.incl(ReadError) (t).state.incl(ReadError)
(t).error = newException(TransportOsError, osErrorMsg((e))) (t).error = newException(TransportOsError, osErrorMsg((e)))
template setWriteError(t, e: untyped) = # template setWriteError(t, e: untyped) =
(t).state.incl(WriteError) # (t).state.incl(WriteError)
(t).error = newException(TransportOsError, osErrorMsg((e))) # (t).error = newException(TransportOsError, osErrorMsg((e)))
template finishReader(t: untyped) = template finishReader(t: untyped) =
var reader = (t).reader var reader = (t).reader
@ -141,10 +140,6 @@ when defined(windows):
const SO_UPDATE_CONNECT_CONTEXT = 0x7010 const SO_UPDATE_CONNECT_CONTEXT = 0x7010
template finishWriter(t: untyped) =
var vv = (t).queue.popFirst()
vv.writer.complete()
template zeroOvelappedOffset(t: untyped) = template zeroOvelappedOffset(t: untyped) =
(t).offset = 0 (t).offset = 0
(t).offsetHigh = 0 (t).offsetHigh = 0
@ -186,7 +181,7 @@ when defined(windows):
bytesCount = transp.wovl.data.bytesCount bytesCount = transp.wovl.data.bytesCount
var vector = transp.queue.popFirst() var vector = transp.queue.popFirst()
if bytesCount == 0: if bytesCount == 0:
vector.writer.complete() vector.writer.complete(0)
else: else:
if transp.kind == TransportKind.Socket: if transp.kind == TransportKind.Socket:
if vector.kind == VectorKind.DataBuffer: if vector.kind == VectorKind.DataBuffer:
@ -194,19 +189,23 @@ when defined(windows):
vector.shiftVectorBuffer(bytesCount) vector.shiftVectorBuffer(bytesCount)
transp.queue.addFirst(vector) transp.queue.addFirst(vector)
else: else:
vector.writer.complete() vector.writer.complete(transp.wwsabuf.len)
else: else:
if uint(bytesCount) < getFileSize(vector): if uint(bytesCount) < getFileSize(vector):
vector.shiftVectorFile(bytesCount) vector.shiftVectorFile(bytesCount)
transp.queue.addFirst(vector) transp.queue.addFirst(vector)
else: else:
vector.writer.complete() vector.writer.complete(int(getFileSize(vector)))
elif int(err) == ERROR_OPERATION_ABORTED: elif int(err) == ERROR_OPERATION_ABORTED:
# CancelIO() interrupt # CancelIO() interrupt
transp.finishWriter() transp.state.incl(WritePaused)
let v = transp.queue.popFirst()
v.writer.complete(0)
break
else: else:
transp.setWriteError(err) let v = transp.queue.popFirst()
transp.finishWriter() transp.state.incl(WriteError)
v.writer.fail(newException(TransportOsError, osErrorMsg(err)))
else: else:
## Initiation ## Initiation
transp.state.incl(WritePending) transp.state.incl(WritePending)
@ -225,12 +224,14 @@ when defined(windows):
# CancelIO() interrupt # CancelIO() interrupt
transp.state.excl(WritePending) transp.state.excl(WritePending)
transp.state.incl(WritePaused) transp.state.incl(WritePaused)
vector.writer.complete(0)
elif int(err) == ERROR_IO_PENDING: elif int(err) == ERROR_IO_PENDING:
transp.queue.addFirst(vector) transp.queue.addFirst(vector)
else: else:
transp.state.excl(WritePending) transp.state.excl(WritePending)
transp.setWriteError(err) transp.state = transp.state + {WritePaused, WriteError}
vector.writer.complete() vector.writer.fail(newException(TransportOsError,
osErrorMsg(err)))
else: else:
transp.queue.addFirst(vector) transp.queue.addFirst(vector)
else: else:
@ -253,12 +254,14 @@ when defined(windows):
# CancelIO() interrupt # CancelIO() interrupt
transp.state.excl(WritePending) transp.state.excl(WritePending)
transp.state.incl(WritePaused) transp.state.incl(WritePaused)
vector.writer.complete(0)
elif int(err) == ERROR_IO_PENDING: elif int(err) == ERROR_IO_PENDING:
transp.queue.addFirst(vector) transp.queue.addFirst(vector)
else: else:
transp.state.excl(WritePending) transp.state.excl(WritePending)
transp.setWriteError(err) transp.state = transp.state + {WritePaused, WriteError}
vector.writer.complete() vector.writer.fail(newException(TransportOsError,
osErrorMsg(err)))
else: else:
transp.queue.addFirst(vector) transp.queue.addFirst(vector)
break break
@ -385,11 +388,11 @@ when defined(windows):
sock = createAsyncSocket(address.address.getDomain(), SockType.SOCK_STREAM, sock = createAsyncSocket(address.address.getDomain(), SockType.SOCK_STREAM,
Protocol.IPPROTO_TCP) Protocol.IPPROTO_TCP)
if sock == asyncInvalidSocket: if sock == asyncInvalidSocket:
result.fail(newException(OSError, osErrorMsg(osLastError()))) result.fail(newException(TransportOsError, osErrorMsg(osLastError())))
if not bindToDomain(sock, address.address.getDomain()): if not bindToDomain(sock, address.address.getDomain()):
sock.closeAsyncSocket() sock.closeAsyncSocket()
result.fail(newException(OSError, osErrorMsg(osLastError()))) result.fail(newException(TransportOsError, osErrorMsg(osLastError())))
proc continuation(udata: pointer) = proc continuation(udata: pointer) =
var ovl = cast[RefCustomOverlapped](udata) var ovl = cast[RefCustomOverlapped](udata)
@ -399,13 +402,15 @@ when defined(windows):
cint(SO_UPDATE_CONNECT_CONTEXT), nil, cint(SO_UPDATE_CONNECT_CONTEXT), nil,
SockLen(0)) != 0'i32: SockLen(0)) != 0'i32:
sock.closeAsyncSocket() sock.closeAsyncSocket()
retFuture.fail(newException(OSError, osErrorMsg(osLastError()))) retFuture.fail(newException(TransportOsError,
osErrorMsg(osLastError())))
else: else:
retFuture.complete(newStreamSocketTransport(povl.data.fd, retFuture.complete(newStreamSocketTransport(povl.data.fd,
bufferSize)) bufferSize))
else: else:
sock.closeAsyncSocket() sock.closeAsyncSocket()
retFuture.fail(newException(OSError, osErrorMsg(ovl.data.errCode))) retFuture.fail(newException(TransportOsError,
osErrorMsg(ovl.data.errCode)))
GC_unref(ovl) GC_unref(ovl)
povl = RefCustomOverlapped() povl = RefCustomOverlapped()
@ -421,7 +426,7 @@ when defined(windows):
if int32(err) != ERROR_IO_PENDING: if int32(err) != ERROR_IO_PENDING:
GC_unref(povl) GC_unref(povl)
sock.closeAsyncSocket() sock.closeAsyncSocket()
retFuture.fail(newException(OSError, osErrorMsg(err))) retFuture.fail(newException(TransportOsError, osErrorMsg(err)))
return retFuture return retFuture
proc acceptLoop(udata: pointer) {.gcsafe, nimcall.} = proc acceptLoop(udata: pointer) {.gcsafe, nimcall.} =
@ -442,7 +447,7 @@ when defined(windows):
addr server.sock, addr server.sock,
SockLen(sizeof(SocketHandle))) != 0'i32: SockLen(sizeof(SocketHandle))) != 0'i32:
server.asock.closeAsyncSocket() server.asock.closeAsyncSocket()
raiseOsError(osLastError()) raise newException(TransportOsError, osErrorMsg(osLastError()))
else: else:
discard server.function(server, discard server.function(server,
newStreamSocketTransport(server.asock, server.bufferSize), newStreamSocketTransport(server.asock, server.bufferSize),
@ -521,7 +526,7 @@ else:
proc writeStreamLoop(udata: pointer) {.gcsafe.} = proc writeStreamLoop(udata: pointer) {.gcsafe.} =
var cdata = cast[ptr CompletionData](udata) var cdata = cast[ptr CompletionData](udata)
if not isNil(cdata) and int(cdata.fd) == 0: if not isNil(cdata) and (int(cdata.fd) == 0 or isNil(cdata.udata)):
# Transport was closed earlier, exiting # Transport was closed earlier, exiting
return return
var transp = cast[UnixStreamTransport](cdata.udata) var transp = cast[UnixStreamTransport](cdata.udata)
@ -534,7 +539,7 @@ else:
let res = posix.send(fd, vector.buf, vector.buflen, MSG_NOSIGNAL) let res = posix.send(fd, vector.buf, vector.buflen, MSG_NOSIGNAL)
if res >= 0: if res >= 0:
if vector.buflen - res == 0: if vector.buflen - res == 0:
vector.writer.complete() vector.writer.complete(vector.buflen)
else: else:
vector.shiftVectorBuffer(res) vector.shiftVectorBuffer(res)
transp.queue.addFirst(vector) transp.queue.addFirst(vector)
@ -543,15 +548,15 @@ else:
if int(err) == EINTR: if int(err) == EINTR:
continue continue
else: else:
transp.setWriteError(err) vector.writer.fail(newException(TransportOsError,
vector.writer.complete() osErrorMsg(err)))
else: else:
let res = sendfile(int(fd), cast[int](vector.buflen), let res = sendfile(int(fd), cast[int](vector.buflen),
int(vector.offset), int(vector.offset),
cast[int](vector.buf)) cast[int](vector.buf))
if res >= 0: if res >= 0:
if cast[int](vector.buf) - res == 0: if cast[int](vector.buf) - res == 0:
vector.writer.complete() vector.writer.complete(cast[int](vector.buf))
else: else:
vector.shiftVectorFile(res) vector.shiftVectorFile(res)
transp.queue.addFirst(vector) transp.queue.addFirst(vector)
@ -560,16 +565,16 @@ else:
if int(err) == EINTR: if int(err) == EINTR:
continue continue
else: else:
transp.setWriteError(err) vector.writer.fail(newException(TransportOsError,
vector.writer.complete() osErrorMsg(err)))
break break
else: else:
transp.state.incl(WritePaused) transp.state.incl(WritePaused)
transp.fd.removeWriter() transp.fd.removeWriter()
proc readStreamLoop(udata: pointer) {.gcsafe.} = proc readStreamLoop(udata: pointer) {.gcsafe.} =
var cdata = cast[ptr CompletionData](udata) var cdata = cast[ptr CompletionData](udata)
if not isNil(cdata) and int(cdata.fd) == 0: if not isNil(cdata) and (int(cdata.fd) == 0 or isNil(cdata.udata)):
# Transport was closed earlier, exiting # Transport was closed earlier, exiting
return return
var transp = cast[UnixStreamTransport](cdata.udata) var transp = cast[UnixStreamTransport](cdata.udata)
@ -582,15 +587,13 @@ else:
if int(err) == EINTR: if int(err) == EINTR:
continue continue
elif int(err) in {ECONNRESET}: elif int(err) in {ECONNRESET}:
transp.state.incl(ReadEof) transp.state = transp.state + {ReadEof, ReadPaused}
transp.state.incl(ReadPaused)
cdata.fd.removeReader() cdata.fd.removeReader()
else: else:
transp.setReadError(err) transp.setReadError(err)
cdata.fd.removeReader() cdata.fd.removeReader()
elif res == 0: elif res == 0:
transp.state.incl(ReadEof) transp.state = transp.state + {ReadEof, ReadPaused}
transp.state.incl(ReadPaused)
cdata.fd.removeReader() cdata.fd.removeReader()
else: else:
transp.offset += res transp.offset += res
@ -796,49 +799,62 @@ proc createStreamServer*(host: TransportAddress,
result.resumeAccept() result.resumeAccept()
proc write*(transp: StreamTransport, pbytes: pointer, proc write*(transp: StreamTransport, pbytes: pointer,
nbytes: int): Future[int] {.async.} = nbytes: int): Future[int] =
## Write data from buffer ``pbytes`` with size ``nbytes`` using transport ## Write data from buffer ``pbytes`` with size ``nbytes`` using transport
## ``transp``. ## ``transp``.
checkClosed(transp) var retFuture = newFuture[int]()
var waitFuture = newFuture[void]("transport.write") transp.checkClosed(retFuture)
var vector = StreamVector(kind: DataBuffer, writer: waitFuture, var vector = StreamVector(kind: DataBuffer, writer: retFuture,
buf: pbytes, buflen: nbytes) buf: pbytes, buflen: nbytes)
transp.queue.addLast(vector) transp.queue.addLast(vector)
if WritePaused in transp.state: if WritePaused in transp.state:
transp.resumeWrite() transp.resumeWrite()
await vector.writer return retFuture
if WriteError in transp.state:
raise transp.getError()
result = nbytes
template write*(transp: StreamTransport, msg: var string): untyped = proc write*(transp: StreamTransport, msg: string): Future[int] =
## Write string ``msg`` using transport ``transp``. ## Write data from string ``msg`` using transport ``transp``.
write(transp, addr msg[0], len(msg)) var retFuture = FutureGCString[int]()
transp.checkClosed(retFuture)
shallowCopy(retFuture.gcholder, msg)
var vector = StreamVector(kind: DataBuffer,
writer: cast[Future[int]](retFuture),
buf: unsafeAddr msg[0], buflen: len(msg))
transp.queue.addLast(vector)
if WritePaused in transp.state:
transp.resumeWrite()
return retFuture
template write*(transp: StreamTransport, msg: var seq[byte]): untyped = proc write*[T](transp: StreamTransport, msg: seq[T]): Future[int] =
## Write seq[byte] ``msg`` using transport ``transp``. ## Write sequence ``msg`` using transport ``transp``.
write(transp, addr msg[0], len(msg)) var retFuture = FutureGCSeq[int, T]()
transp.checkClosed(retFuture)
shallowCopy(retFuture.gcholder, msg)
var vector = StreamVector(kind: DataBuffer,
writer: cast[Future[int]](retFuture),
buf: unsafeAddr msg[0],
buflen: len(msg) * sizeof(T))
transp.queue.addLast(vector)
if WritePaused in transp.state:
transp.resumeWrite()
return retFuture
proc writeFile*(transp: StreamTransport, handle: int, proc writeFile*(transp: StreamTransport, handle: int,
offset: uint = 0, offset: uint = 0,
size: int = 0): Future[void] {.async.} = size: int = 0): Future[int] =
## Write data from file descriptor ``handle`` to transport ``transp``. ## Write data from file descriptor ``handle`` to transport ``transp``.
## ##
## You can specify starting ``offset`` in opened file and number of bytes ## You can specify starting ``offset`` in opened file and number of bytes
## to transfer from file to transport via ``size``. ## to transfer from file to transport via ``size``.
if transp.kind != TransportKind.Socket: var retFuture = newFuture[int]("transport.writeFile")
raise newException(TransportError, "You can transmit files only to sockets")
checkClosed(transp) checkClosed(transp)
var waitFuture = newFuture[void]("transport.writeFile")
var vector = StreamVector(kind: DataFile, writer: waitFuture, var vector = StreamVector(kind: DataFile, writer: retFuture,
buf: cast[pointer](size), offset: offset, buf: cast[pointer](size), offset: offset,
buflen: handle) buflen: handle)
transp.queue.addLast(vector) transp.queue.addLast(vector)
if WritePaused in transp.state: if WritePaused in transp.state:
transp.resumeWrite() transp.resumeWrite()
await vector.writer return retFuture
if WriteError in transp.state:
raise transp.getError()
proc atEof*(transp: StreamTransport): bool {.inline.} = proc atEof*(transp: StreamTransport): bool {.inline.} =
## Returns ``true`` if ``transp`` is at EOF. ## Returns ``true`` if ``transp`` is at EOF.

View File

@ -132,7 +132,154 @@ proc client5(transp: DatagramTransport, pbytes: pointer, nbytes: int,
counterPtr[] = -1 counterPtr[] = -1
transp.close() transp.close()
proc test1(): Future[int] {.async.} = proc client6(transp: DatagramTransport, pbytes: pointer, nbytes: int,
raddr: TransportAddress, udata: pointer): Future[void] {.async.} =
if not isNil(pbytes):
var data = newString(nbytes + 1)
copyMem(addr data[0], pbytes, nbytes)
data.setLen(nbytes)
if data.startsWith("REQUEST"):
var numstr = data[7..^1]
var num = parseInt(numstr)
var ans = "ANSWER" & $num
await transp.sendTo(ans, raddr)
else:
var err = "ERROR"
await transp.sendTo(err, raddr)
else:
## Read operation failed with error
var counterPtr = cast[ptr int](udata)
counterPtr[] = -1
transp.close()
proc client7(transp: DatagramTransport, pbytes: pointer, nbytes: int,
raddr: TransportAddress, udata: pointer): Future[void] {.async.} =
if not isNil(pbytes):
var data = newString(nbytes + 1)
copyMem(addr data[0], pbytes, nbytes)
data.setLen(nbytes)
if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](udata)
counterPtr[] = counterPtr[] + 1
if counterPtr[] == TestsCount:
transp.close()
else:
var ta = initTAddress("127.0.0.1:33336")
var req = "REQUEST" & $counterPtr[]
await transp.sendTo(req, ta)
else:
var counterPtr = cast[ptr int](udata)
counterPtr[] = -1
transp.close()
else:
## Read operation failed with error
var counterPtr = cast[ptr int](udata)
counterPtr[] = -1
transp.close()
proc client8(transp: DatagramTransport, pbytes: pointer, nbytes: int,
raddr: TransportAddress, udata: pointer): Future[void] {.async.} =
if not isNil(pbytes):
var data = newString(nbytes + 1)
copyMem(addr data[0], pbytes, nbytes)
data.setLen(nbytes)
if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](udata)
counterPtr[] = counterPtr[] + 1
if counterPtr[] == TestsCount:
transp.close()
else:
var req = "REQUEST" & $counterPtr[]
await transp.send(req)
else:
var counterPtr = cast[ptr int](udata)
counterPtr[] = -1
transp.close()
else:
## Read operation failed with error
var counterPtr = cast[ptr int](udata)
counterPtr[] = -1
transp.close()
proc client9(transp: DatagramTransport, pbytes: pointer, nbytes: int,
raddr: TransportAddress, udata: pointer): Future[void] {.async.} =
if not isNil(pbytes):
var data = newString(nbytes + 1)
copyMem(addr data[0], pbytes, nbytes)
data.setLen(nbytes)
if data.startsWith("REQUEST"):
var numstr = data[7..^1]
var num = parseInt(numstr)
var ans = "ANSWER" & $num
var ansseq = newSeq[byte](len(ans))
copyMem(addr ansseq[0], addr ans[0], len(ans))
await transp.sendTo(ansseq, raddr)
else:
var err = "ERROR"
var errseq = newSeq[byte](len(err))
copyMem(addr errseq[0], addr err[0], len(err))
await transp.sendTo(errseq, raddr)
else:
## Read operation failed with error
var counterPtr = cast[ptr int](udata)
counterPtr[] = -1
transp.close()
proc client10(transp: DatagramTransport, pbytes: pointer, nbytes: int,
raddr: TransportAddress, udata: pointer): Future[void] {.async.} =
if not isNil(pbytes):
var data = newString(nbytes + 1)
copyMem(addr data[0], pbytes, nbytes)
data.setLen(nbytes)
if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](udata)
counterPtr[] = counterPtr[] + 1
if counterPtr[] == TestsCount:
transp.close()
else:
var ta = initTAddress("127.0.0.1:33336")
var req = "REQUEST" & $counterPtr[]
var reqseq = newSeq[byte](len(req))
copyMem(addr reqseq[0], addr req[0], len(req))
await transp.sendTo(reqseq, ta)
else:
var counterPtr = cast[ptr int](udata)
counterPtr[] = -1
transp.close()
else:
## Read operation failed with error
var counterPtr = cast[ptr int](udata)
counterPtr[] = -1
transp.close()
proc client11(transp: DatagramTransport, pbytes: pointer, nbytes: int,
raddr: TransportAddress, udata: pointer): Future[void] {.async.} =
if not isNil(pbytes):
var data = newString(nbytes + 1)
copyMem(addr data[0], pbytes, nbytes)
data.setLen(nbytes)
if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](udata)
counterPtr[] = counterPtr[] + 1
if counterPtr[] == TestsCount:
transp.close()
else:
var req = "REQUEST" & $counterPtr[]
var reqseq = newSeq[byte](len(req))
copyMem(addr reqseq[0], addr req[0], len(req))
await transp.send(reqseq)
else:
var counterPtr = cast[ptr int](udata)
counterPtr[] = -1
transp.close()
else:
## Read operation failed with error
var counterPtr = cast[ptr int](udata)
counterPtr[] = -1
transp.close()
proc testPointerSendTo(): Future[int] {.async.} =
## sendTo(pointer) test
var ta = initTAddress("127.0.0.1:33336") var ta = initTAddress("127.0.0.1:33336")
var counter = 0 var counter = 0
var dgram1 = newDatagramTransport(client1, udata = addr counter, local = ta) var dgram1 = newDatagramTransport(client1, udata = addr counter, local = ta)
@ -144,7 +291,8 @@ proc test1(): Future[int] {.async.} =
dgram2.close() dgram2.close()
result = counter result = counter
proc test2(): Future[int] {.async.} = proc testPointerSend(): Future[int] {.async.} =
## send(pointer) test
var ta = initTAddress("127.0.0.1:33337") var ta = initTAddress("127.0.0.1:33337")
var counter = 0 var counter = 0
var dgram1 = newDatagramTransport(client1, udata = addr counter, local = ta) var dgram1 = newDatagramTransport(client1, udata = addr counter, local = ta)
@ -156,6 +304,64 @@ proc test2(): Future[int] {.async.} =
dgram2.close() dgram2.close()
result = counter result = counter
proc testStringSendTo(): Future[int] {.async.} =
## sendTo(string) test
var ta = initTAddress("127.0.0.1:33336")
var counter = 0
var dgram1 = newDatagramTransport(client6, udata = addr counter, local = ta)
var dgram2 = newDatagramTransport(client7, udata = addr counter)
var data = "REQUEST0"
await dgram2.sendTo(data, ta)
await dgram2.join()
dgram1.close()
dgram2.close()
result = counter
proc testStringSend(): Future[int] {.async.} =
## send(string) test
var ta = initTAddress("127.0.0.1:33337")
var counter = 0
var dgram1 = newDatagramTransport(client6, udata = addr counter, local = ta)
var dgram2 = newDatagramTransport(client8, udata = addr counter, remote = ta)
var data = "REQUEST0"
await dgram2.send(data)
await dgram2.join()
dgram1.close()
dgram2.close()
result = counter
proc testSeqSendTo(): Future[int] {.async.} =
## sendTo(string) test
var ta = initTAddress("127.0.0.1:33336")
var counter = 0
var dgram1 = newDatagramTransport(client9, udata = addr counter, local = ta)
var dgram2 = newDatagramTransport(client10, udata = addr counter)
var data = "REQUEST0"
var dataseq = newSeq[byte](len(data))
copyMem(addr dataseq[0], addr data[0], len(data))
await dgram2.sendTo(dataseq, ta)
await dgram2.join()
dgram1.close()
dgram2.close()
result = counter
proc testSeqSend(): Future[int] {.async.} =
## send(string) test
var ta = initTAddress("127.0.0.1:33337")
var counter = 0
var dgram1 = newDatagramTransport(client9, udata = addr counter, local = ta)
var dgram2 = newDatagramTransport(client11, udata = addr counter, remote = ta)
var data = "REQUEST0"
var dataseq = newSeq[byte](len(data))
copyMem(addr dataseq[0], addr data[0], len(data))
await dgram2.send(data)
await dgram2.join()
dgram1.close()
dgram2.close()
result = counter
#
proc waitAll(futs: seq[Future[void]]): Future[void] = proc waitAll(futs: seq[Future[void]]): Future[void] =
var counter = len(futs) var counter = len(futs)
var retFuture = newFuture[void]("waitAll") var retFuture = newFuture[void]("waitAll")
@ -277,22 +483,34 @@ proc test4(): Future[int] {.async.} =
when isMainModule: when isMainModule:
const const
m1 = "Unbounded test (" & $TestsCount & " messages)" m1 = "sendTo(pointer) test (" & $TestsCount & " messages)"
m2 = "Bounded test (" & $TestsCount & " messages)" m2 = "send(pointer) test (" & $TestsCount & " messages)"
m3 = "Unbounded multiple clients with messages (" & $ClientsCount & m3 = "sendTo(string) test (" & $TestsCount & " messages)"
m4 = "send(string) test (" & $TestsCount & " messages)"
m5 = "sendTo(seq[byte]) test (" & $TestsCount & " messages)"
m6 = "send(seq[byte]) test (" & $TestsCount & " messages)"
m7 = "Unbounded multiple clients with messages (" & $ClientsCount &
" clients x " & $MessagesCount & " messages)" " clients x " & $MessagesCount & " messages)"
m4 = "Bounded multiple clients with messages (" & $ClientsCount & m8 = "Bounded multiple clients with messages (" & $ClientsCount &
" clients x " & $MessagesCount & " messages)" " clients x " & $MessagesCount & " messages)"
m5 = "DatagramServer multiple clients with messages (" & $ClientsCount & m9 = "DatagramServer multiple clients with messages (" & $ClientsCount &
" clients x " & $MessagesCount & " messages)" " clients x " & $MessagesCount & " messages)"
suite "Datagram Transport test suite": suite "Datagram Transport test suite":
test m1: test m1:
check waitFor(test1()) == TestsCount check waitFor(testPointerSendTo()) == TestsCount
test m2: test m2:
check waitFor(test2()) == TestsCount check waitFor(testPointerSend()) == TestsCount
test m3: test m3:
check waitFor(test3(false)) == ClientsCount * MessagesCount check waitFor(testStringSendTo()) == TestsCount
test m4: test m4:
check waitFor(testStringSend()) == TestsCount
test m5:
check waitFor(testSeqSendTo()) == TestsCount
test m6:
check waitFor(testSeqSend()) == TestsCount
test m7:
check waitFor(test3(false)) == ClientsCount * MessagesCount
test m8:
check waitFor(test3(true)) == ClientsCount * MessagesCount check waitFor(test3(true)) == ClientsCount * MessagesCount
# test m5: test m9:
# check waitFor(test4()) == ClientsCount * MessagesCount check waitFor(test4()) == ClientsCount * MessagesCount

View File

@ -253,7 +253,8 @@ proc swarmWorker4(address: TransportAddress): Future[int] {.async.} =
doAssert(res == len(name)) doAssert(res == len(name))
res = await transp.write(cast[pointer](addr ssize[0]), len(ssize)) res = await transp.write(cast[pointer](addr ssize[0]), len(ssize))
doAssert(res == len(ssize)) doAssert(res == len(ssize))
await transp.writeFile(handle, 0'u, size) var checksize = await transp.writeFile(handle, 0'u, size)
doAssert(checksize == size)
close(fhandle) close(fhandle)
var ans = await transp.readLine() var ans = await transp.readLine()
doAssert(ans == "OK") doAssert(ans == "OK")