readMsg branch by @arnetheduck with some changes. (#83)
Co-authored-by: Eugene Kabanov <ka@hardcore.kiev.ua>
This commit is contained in:
parent
56fbdff096
commit
d8f8e3d9fc
|
@ -98,6 +98,8 @@ type
|
|||
code*: OSErrorCode
|
||||
TransportNoSupport* = object of TransportError
|
||||
## Transport's capability not supported exception
|
||||
TransportUseClosedError* = object of TransportError
|
||||
## Usage after transport close exception
|
||||
|
||||
TransportState* = enum
|
||||
## Transport's state
|
||||
|
|
|
@ -60,6 +60,10 @@ type
|
|||
opened*: int64
|
||||
closed*: int64
|
||||
|
||||
ReadMessagePredicate* = proc (data: openarray[byte]): tuple[consumed: int,
|
||||
done: bool] {.
|
||||
gcsafe, raises: [].}
|
||||
|
||||
const
|
||||
StreamTransportTrackerName = "stream.transport"
|
||||
StreamServerTrackerName = "stream.server"
|
||||
|
@ -163,6 +167,11 @@ proc localAddress*(server: StreamServer): TransportAddress =
|
|||
## Returns ``server`` bound local socket address.
|
||||
result = server.local
|
||||
|
||||
template completeReader(stream: StreamTransport) =
|
||||
if not(isNil(transp.reader)) and not(transp.reader.finished()):
|
||||
transp.reader.complete()
|
||||
transp.reader = nil
|
||||
|
||||
template setReadError(t, e: untyped) =
|
||||
(t).state.incl(ReadError)
|
||||
(t).error = getTransportOsError(e)
|
||||
|
@ -173,8 +182,9 @@ template checkPending(t: untyped) =
|
|||
|
||||
template shiftBuffer(t, c: untyped) =
|
||||
if (t).offset > c:
|
||||
moveMem(addr((t).buffer[0]), addr((t).buffer[(c)]), (t).offset - (c))
|
||||
(t).offset = (t).offset - (c)
|
||||
if c > 0:
|
||||
moveMem(addr((t).buffer[0]), addr((t).buffer[(c)]), (t).offset - (c))
|
||||
(t).offset = (t).offset - (c)
|
||||
else:
|
||||
(t).offset = 0
|
||||
|
||||
|
@ -525,9 +535,7 @@ when defined(windows):
|
|||
else:
|
||||
transp.setReadError(err)
|
||||
|
||||
if not(isNil(transp.reader)) and not(transp.reader.finished()):
|
||||
transp.reader.complete()
|
||||
transp.reader = nil
|
||||
transp.completeReader()
|
||||
|
||||
if ReadClosed in transp.state:
|
||||
# Stop tracking transport
|
||||
|
@ -563,16 +571,12 @@ when defined(windows):
|
|||
elif int32(err) in {WSAECONNRESET, WSAENETRESET, WSAECONNABORTED}:
|
||||
transp.state.excl(ReadPending)
|
||||
transp.state.incl({ReadEof, ReadPaused})
|
||||
if not(isNil(transp.reader)) and not(transp.reader.finished()):
|
||||
transp.reader.complete()
|
||||
transp.reader = nil
|
||||
transp.completeReader()
|
||||
elif int32(err) != ERROR_IO_PENDING:
|
||||
transp.state.excl(ReadPending)
|
||||
transp.state.incl(ReadPaused)
|
||||
transp.setReadError(err)
|
||||
if not(isNil(transp.reader)) and not(transp.reader.finished()):
|
||||
transp.reader.complete()
|
||||
transp.reader = nil
|
||||
transp.completeReader()
|
||||
elif transp.kind == TransportKind.Pipe:
|
||||
let pipe = Handle(transp.rovl.data.fd)
|
||||
transp.roffset = transp.offset
|
||||
|
@ -589,21 +593,15 @@ when defined(windows):
|
|||
elif int32(err) in {ERROR_BROKEN_PIPE, ERROR_PIPE_NOT_CONNECTED}:
|
||||
transp.state.excl(ReadPending)
|
||||
transp.state.incl({ReadEof, ReadPaused})
|
||||
if not(isNil(transp.reader)) and not(transp.reader.finished()):
|
||||
transp.reader.complete()
|
||||
transp.reader = nil
|
||||
transp.completeReader()
|
||||
elif int32(err) != ERROR_IO_PENDING:
|
||||
transp.state.excl(ReadPending)
|
||||
transp.state.incl(ReadPaused)
|
||||
transp.setReadError(err)
|
||||
if not(isNil(transp.reader)) and not(transp.reader.finished()):
|
||||
transp.reader.complete()
|
||||
transp.reader = nil
|
||||
transp.completeReader()
|
||||
else:
|
||||
transp.state.incl(ReadPaused)
|
||||
if not(isNil(transp.reader)) and not(transp.reader.finished()):
|
||||
transp.reader.complete()
|
||||
transp.reader = nil
|
||||
transp.completeReader()
|
||||
# Transport close happens in callback, and we not started new
|
||||
# WSARecvFrom session.
|
||||
if ReadClosed in transp.state:
|
||||
|
@ -947,12 +945,14 @@ when defined(windows):
|
|||
GC_unref(server)
|
||||
|
||||
proc resumeRead(transp: StreamTransport) {.inline.} =
|
||||
transp.state.excl(ReadPaused)
|
||||
readStreamLoop(cast[pointer](addr transp.rovl))
|
||||
if ReadPaused in transp.state:
|
||||
transp.state.excl(ReadPaused)
|
||||
readStreamLoop(cast[pointer](addr transp.rovl))
|
||||
|
||||
proc resumeWrite(transp: StreamTransport) {.inline.} =
|
||||
transp.state.excl(WritePaused)
|
||||
writeStreamLoop(cast[pointer](addr transp.wovl))
|
||||
if WritePaused in transp.state:
|
||||
transp.state.excl(WritePaused)
|
||||
writeStreamLoop(cast[pointer](addr transp.wovl))
|
||||
|
||||
proc pauseAccept(server: StreamServer) {.inline.} =
|
||||
if server.apending:
|
||||
|
@ -1124,9 +1124,7 @@ else:
|
|||
|
||||
if ReadClosed in transp.state:
|
||||
transp.state.incl({ReadPaused})
|
||||
if not(isNil(transp.reader)) and not(transp.reader.finished()):
|
||||
transp.reader.complete()
|
||||
transp.reader = nil
|
||||
transp.completeReader()
|
||||
else:
|
||||
if transp.kind == TransportKind.Socket:
|
||||
while true:
|
||||
|
@ -1151,9 +1149,7 @@ else:
|
|||
if transp.offset == len(transp.buffer):
|
||||
transp.state.incl(ReadPaused)
|
||||
cdata.fd.removeReader()
|
||||
if not(isNil(transp.reader)) and not(transp.reader.finished()):
|
||||
transp.reader.complete()
|
||||
transp.reader = nil
|
||||
transp.completeReader()
|
||||
break
|
||||
elif transp.kind == TransportKind.Pipe:
|
||||
while true:
|
||||
|
@ -1175,9 +1171,7 @@ else:
|
|||
if transp.offset == len(transp.buffer):
|
||||
transp.state.incl(ReadPaused)
|
||||
cdata.fd.removeReader()
|
||||
if not(isNil(transp.reader)) and not(transp.reader.finished()):
|
||||
transp.reader.complete()
|
||||
transp.reader = nil
|
||||
transp.completeReader()
|
||||
break
|
||||
|
||||
proc newStreamSocketTransport(sock: AsyncFD, bufsize: int,
|
||||
|
@ -1321,12 +1315,14 @@ else:
|
|||
removeReader(server.sock)
|
||||
|
||||
proc resumeRead(transp: StreamTransport) {.inline.} =
|
||||
transp.state.excl(ReadPaused)
|
||||
addReader(transp.fd, readStreamLoop, cast[pointer](transp))
|
||||
if ReadPaused in transp.state:
|
||||
transp.state.excl(ReadPaused)
|
||||
addReader(transp.fd, readStreamLoop, cast[pointer](transp))
|
||||
|
||||
proc resumeWrite(transp: StreamTransport) {.inline.} =
|
||||
transp.state.excl(WritePaused)
|
||||
addWriter(transp.fd, writeStreamLoop, cast[pointer](transp))
|
||||
if WritePaused in transp.state:
|
||||
transp.state.excl(WritePaused)
|
||||
addWriter(transp.fd, writeStreamLoop, cast[pointer](transp))
|
||||
|
||||
proc start*(server: StreamServer) =
|
||||
## Starts ``server``.
|
||||
|
@ -1604,8 +1600,7 @@ proc write*(transp: StreamTransport, pbytes: pointer,
|
|||
var vector = StreamVector(kind: DataBuffer, writer: retFuture,
|
||||
buf: pbytes, buflen: nbytes, size: nbytes)
|
||||
transp.queue.addLast(vector)
|
||||
if WritePaused in transp.state:
|
||||
transp.resumeWrite()
|
||||
transp.resumeWrite()
|
||||
return retFuture
|
||||
|
||||
proc write*(transp: StreamTransport, msg: string, msglen = -1): Future[int] =
|
||||
|
@ -1623,8 +1618,7 @@ proc write*(transp: StreamTransport, msg: string, msglen = -1): Future[int] =
|
|||
buf: addr retFuture.gcholder[0], buflen: length,
|
||||
size: length)
|
||||
transp.queue.addLast(vector)
|
||||
if WritePaused in transp.state:
|
||||
transp.resumeWrite()
|
||||
transp.resumeWrite()
|
||||
return retFuture
|
||||
|
||||
proc write*[T](transp: StreamTransport, msg: seq[T], msglen = -1): Future[int] =
|
||||
|
@ -1642,8 +1636,7 @@ proc write*[T](transp: StreamTransport, msg: seq[T], msglen = -1): Future[int] =
|
|||
buf: addr retFuture.gcholder[0],
|
||||
buflen: length, size: length)
|
||||
transp.queue.addLast(vector)
|
||||
if WritePaused in transp.state:
|
||||
transp.resumeWrite()
|
||||
transp.resumeWrite()
|
||||
return retFuture
|
||||
|
||||
proc writeFile*(transp: StreamTransport, handle: int,
|
||||
|
@ -1662,8 +1655,7 @@ proc writeFile*(transp: StreamTransport, handle: int,
|
|||
buf: cast[pointer](size), offset: offset,
|
||||
buflen: handle)
|
||||
transp.queue.addLast(vector)
|
||||
if WritePaused in transp.state:
|
||||
transp.resumeWrite()
|
||||
transp.resumeWrite()
|
||||
return retFuture
|
||||
|
||||
proc atEof*(transp: StreamTransport): bool {.inline.} =
|
||||
|
@ -1671,41 +1663,54 @@ proc atEof*(transp: StreamTransport): bool {.inline.} =
|
|||
result = (transp.offset == 0) and (ReadEof in transp.state) and
|
||||
(ReadPaused in transp.state)
|
||||
|
||||
template prepareReader(transp: StreamTransport,
|
||||
name: static string): Future[void] =
|
||||
checkPending(transp)
|
||||
var fut = newFuture[void](name)
|
||||
transp.reader = fut
|
||||
resumeRead(transp)
|
||||
fut
|
||||
|
||||
template readLoop(name, body: untyped): untyped =
|
||||
# Read data until a predicate is satisfied - the body should return a tuple
|
||||
# signalling how many bytes have been processed and whether we're done reading
|
||||
checkClosed(transp)
|
||||
checkPending(transp)
|
||||
while true:
|
||||
if ReadClosed in transp.state:
|
||||
raise newException(TransportUseClosedError,
|
||||
"Attempt to read data from closed stream")
|
||||
if transp.offset == 0:
|
||||
# We going to raise an error, only if transport buffer is empty.
|
||||
if ReadError in transp.state:
|
||||
raise transp.getError()
|
||||
|
||||
let (consumed, done) = body
|
||||
transp.shiftBuffer(consumed)
|
||||
if done:
|
||||
break
|
||||
else:
|
||||
await transp.prepareReader(name)
|
||||
|
||||
proc readExactly*(transp: StreamTransport, pbytes: pointer,
|
||||
nbytes: int) {.async.} =
|
||||
## Read exactly ``nbytes`` bytes from transport ``transp`` and store it to
|
||||
## ``pbytes``.
|
||||
##
|
||||
## If EOF is received and ``nbytes`` is not yet readed, the procedure
|
||||
## will raise ``TransportIncompleteError``.
|
||||
checkClosed(transp)
|
||||
checkPending(transp)
|
||||
|
||||
## will raise ``TransportIncompleteError``, potentially with some bytes
|
||||
## already written.
|
||||
var index = 0
|
||||
while true:
|
||||
var pbuffer = cast[ptr UncheckedArray[byte]](pbytes)
|
||||
readLoop("stream.transport.readExactly"):
|
||||
if transp.offset == 0:
|
||||
if (ReadError in transp.state):
|
||||
raise transp.getError()
|
||||
if (ReadClosed in transp.state) or transp.atEof():
|
||||
if transp.atEof():
|
||||
raise newException(TransportIncompleteError, "Data incomplete!")
|
||||
|
||||
if transp.offset >= (nbytes - index):
|
||||
copyMem(cast[pointer](cast[uint](pbytes) + uint(index)),
|
||||
addr(transp.buffer[0]), nbytes - index)
|
||||
transp.shiftBuffer(nbytes - index)
|
||||
break
|
||||
else:
|
||||
if transp.offset != 0:
|
||||
copyMem(cast[pointer](cast[uint](pbytes) + uint(index)),
|
||||
addr(transp.buffer[0]), transp.offset)
|
||||
index += transp.offset
|
||||
|
||||
var fut = newFuture[void]("stream.transport.readExactly")
|
||||
transp.reader = fut
|
||||
transp.offset = 0
|
||||
if ReadPaused in transp.state:
|
||||
transp.resumeRead()
|
||||
await fut
|
||||
let count = min(nbytes - index, transp.offset)
|
||||
if count > 0:
|
||||
copyMem(addr pbuffer[index], addr(transp.buffer[0]), count)
|
||||
index += count
|
||||
(consumed: count, done: index == nbytes)
|
||||
|
||||
proc readOnce*(transp: StreamTransport, pbytes: pointer,
|
||||
nbytes: int): Future[int] {.async.} =
|
||||
|
@ -1713,31 +1718,15 @@ proc readOnce*(transp: StreamTransport, pbytes: pointer,
|
|||
##
|
||||
## If internal buffer is not empty, ``nbytes`` bytes will be transferred from
|
||||
## internal buffer, otherwise it will wait until some bytes will be received.
|
||||
checkClosed(transp)
|
||||
checkPending(transp)
|
||||
|
||||
while true:
|
||||
var count = 0
|
||||
readLoop("stream.transport.readOnce"):
|
||||
if transp.offset == 0:
|
||||
if (ReadError in transp.state):
|
||||
raise transp.getError()
|
||||
if (ReadClosed in transp.state) or transp.atEof():
|
||||
result = 0
|
||||
break
|
||||
var fut = newFuture[void]("stream.transport.readOnce")
|
||||
transp.reader = fut
|
||||
if ReadPaused in transp.state:
|
||||
transp.resumeRead()
|
||||
await fut
|
||||
(0, transp.atEof())
|
||||
else:
|
||||
if transp.offset > nbytes:
|
||||
copyMem(pbytes, addr(transp.buffer[0]), nbytes)
|
||||
transp.shiftBuffer(nbytes)
|
||||
result = nbytes
|
||||
else:
|
||||
copyMem(pbytes, addr(transp.buffer[0]), transp.offset)
|
||||
result = transp.offset
|
||||
transp.offset = 0
|
||||
break
|
||||
count = min(transp.offset, nbytes)
|
||||
copyMem(pbytes, addr(transp.buffer[0]), count)
|
||||
(count, true)
|
||||
return count
|
||||
|
||||
proc readUntil*(transp: StreamTransport, pbytes: pointer, nbytes: int,
|
||||
sep: seq[byte]): Future[int] {.async.} =
|
||||
|
@ -1753,47 +1742,36 @@ proc readUntil*(transp: StreamTransport, pbytes: pointer, nbytes: int,
|
|||
## will raise ``TransportLimitError``.
|
||||
##
|
||||
## Procedure returns actual number of bytes read.
|
||||
checkClosed(transp)
|
||||
checkPending(transp)
|
||||
|
||||
var dest = cast[ptr UncheckedArray[byte]](pbytes)
|
||||
var pbuffer = cast[ptr UncheckedArray[byte]](pbytes)
|
||||
var state = 0
|
||||
var k = 0
|
||||
var index = 0
|
||||
|
||||
while true:
|
||||
if ReadError in transp.state:
|
||||
raise transp.getError()
|
||||
if (ReadClosed in transp.state) or transp.atEof():
|
||||
readLoop("stream.transport.readUntil"):
|
||||
if transp.atEof():
|
||||
raise newException(TransportIncompleteError, "Data incomplete!")
|
||||
|
||||
index = 0
|
||||
var index = 0
|
||||
|
||||
while index < transp.offset:
|
||||
let ch = transp.buffer[index]
|
||||
if sep[state] == ch:
|
||||
inc(state)
|
||||
else:
|
||||
state = 0
|
||||
if k < nbytes:
|
||||
dest[k] = ch
|
||||
inc(k)
|
||||
else:
|
||||
if k >= nbytes:
|
||||
raise newException(TransportLimitError, "Limit reached!")
|
||||
if state == len(sep):
|
||||
break
|
||||
|
||||
let ch = transp.buffer[index]
|
||||
inc(index)
|
||||
|
||||
if state == len(sep):
|
||||
transp.shiftBuffer(index + 1)
|
||||
result = k
|
||||
break
|
||||
else:
|
||||
transp.shiftBuffer(transp.offset)
|
||||
var fut = newFuture[void]("stream.transport.readUntil")
|
||||
transp.reader = fut
|
||||
if ReadPaused in transp.state:
|
||||
transp.resumeRead()
|
||||
await fut
|
||||
pbuffer[k] = ch
|
||||
inc(k)
|
||||
|
||||
if sep[state] == ch:
|
||||
inc(state)
|
||||
if state == len(sep):
|
||||
break
|
||||
else:
|
||||
state = 0
|
||||
|
||||
(index, state == len(sep))
|
||||
|
||||
return k
|
||||
|
||||
proc readLine*(transp: StreamTransport, limit = 0,
|
||||
sep = "\r\n"): Future[string] {.async.} =
|
||||
|
@ -1807,128 +1785,113 @@ proc readLine*(transp: StreamTransport, limit = 0,
|
|||
## empty string.
|
||||
##
|
||||
## If ``limit`` more then 0, then read is limited to ``limit`` bytes.
|
||||
checkClosed(transp)
|
||||
checkPending(transp)
|
||||
|
||||
result = ""
|
||||
var lim = if limit <= 0: -1 else: limit
|
||||
let lim = if limit <= 0: -1 else: limit
|
||||
var state = 0
|
||||
var index = 0
|
||||
|
||||
while true:
|
||||
if (ReadError in transp.state):
|
||||
raise transp.getError()
|
||||
if (ReadClosed in transp.state) or transp.atEof():
|
||||
break
|
||||
|
||||
index = 0
|
||||
while index < transp.offset:
|
||||
let ch = char(transp.buffer[index])
|
||||
if sep[state] == ch:
|
||||
inc(state)
|
||||
if state == len(sep):
|
||||
transp.shiftBuffer(index + 1)
|
||||
break
|
||||
else:
|
||||
state = 0
|
||||
result.add(ch)
|
||||
if len(result) == lim:
|
||||
transp.shiftBuffer(index + 1)
|
||||
break
|
||||
inc(index)
|
||||
|
||||
if (state == len(sep)) or (lim == len(result)):
|
||||
break
|
||||
readLoop("stream.transport.readLine"):
|
||||
if transp.atEof():
|
||||
(0, true)
|
||||
else:
|
||||
transp.shiftBuffer(transp.offset)
|
||||
var fut = newFuture[void]("stream.transport.readLine")
|
||||
transp.reader = fut
|
||||
if ReadPaused in transp.state:
|
||||
transp.resumeRead()
|
||||
await fut
|
||||
var index = 0
|
||||
while index < transp.offset:
|
||||
let ch = char(transp.buffer[index])
|
||||
index += 1
|
||||
|
||||
proc read*(transp: StreamTransport, n = -1): Future[seq[byte]] {.async.} =
|
||||
if sep[state] == ch:
|
||||
inc(state)
|
||||
if state == len(sep):
|
||||
break
|
||||
else:
|
||||
if state != 0:
|
||||
if limit > 0:
|
||||
let missing = min(state, lim - len(result) - 1)
|
||||
result.add(sep[0 ..< missing])
|
||||
else:
|
||||
result.add(sep[0 ..< state])
|
||||
state = 0
|
||||
|
||||
result.add(ch)
|
||||
if len(result) == lim:
|
||||
break
|
||||
|
||||
(index, (state == len(sep)) or (lim == len(result)))
|
||||
|
||||
proc read*(transp: StreamTransport): Future[seq[byte]] {.async.} =
|
||||
## Read all bytes from transport ``transp``.
|
||||
##
|
||||
## This procedure allocates buffer seq[byte] and return it as result.
|
||||
readLoop("stream.transport.read"):
|
||||
if transp.atEof():
|
||||
(0, true)
|
||||
else:
|
||||
result.add(transp.buffer.toOpenArray(0, transp.offset - 1))
|
||||
(transp.offset, false)
|
||||
|
||||
proc read*(transp: StreamTransport, n: int): Future[seq[byte]] {.async.} =
|
||||
## Read all bytes (n <= 0) or exactly `n` bytes from transport ``transp``.
|
||||
##
|
||||
## This procedure allocates buffer seq[byte] and return it as result.
|
||||
checkClosed(transp)
|
||||
checkPending(transp)
|
||||
var res = newSeq[byte]()
|
||||
while true:
|
||||
if (ReadError in transp.state):
|
||||
raise transp.getError()
|
||||
if (ReadClosed in transp.state) or transp.atEof():
|
||||
result = res
|
||||
break
|
||||
|
||||
if transp.offset > 0:
|
||||
let s = len(res)
|
||||
let o = s + transp.offset
|
||||
if n <= 0:
|
||||
# grabbing all incoming data, until EOF
|
||||
res.setLen(o)
|
||||
copyMem(cast[pointer](addr res[s]), addr(transp.buffer[0]),
|
||||
transp.offset)
|
||||
transp.offset = 0
|
||||
if n <= 0:
|
||||
return await transp.read()
|
||||
else:
|
||||
readLoop("stream.transport.read"):
|
||||
if transp.atEof():
|
||||
(0, true)
|
||||
else:
|
||||
let left = n - s
|
||||
if transp.offset >= left:
|
||||
# size of buffer data is more then we need, grabbing only part
|
||||
res.setLen(n)
|
||||
copyMem(cast[pointer](addr res[s]), addr(transp.buffer[0]),
|
||||
left)
|
||||
transp.shiftBuffer(left)
|
||||
result = res
|
||||
break
|
||||
else:
|
||||
# there not enough data in buffer, grabbing all
|
||||
res.setLen(o)
|
||||
copyMem(cast[pointer](addr res[s]), addr(transp.buffer[0]),
|
||||
transp.offset)
|
||||
transp.offset = 0
|
||||
let count = min(transp.offset, n - len(result))
|
||||
result.add(transp.buffer.toOpenArray(0, count - 1))
|
||||
(count, len(result) == n)
|
||||
|
||||
var fut = newFuture[void]("stream.transport.read")
|
||||
transp.reader = fut
|
||||
if ReadPaused in transp.state:
|
||||
transp.resumeRead()
|
||||
await fut
|
||||
|
||||
proc consume*(transp: StreamTransport, n = -1): Future[int] {.async.} =
|
||||
## Consume all bytes (n == -1) or ``n`` bytes from transport ``transp``.
|
||||
proc consume*(transp: StreamTransport): Future[int] {.async.} =
|
||||
## Consume all bytes from transport ``transp`` and discard it.
|
||||
##
|
||||
## Return number of bytes actually consumed
|
||||
checkClosed(transp)
|
||||
checkPending(transp)
|
||||
## Return number of bytes actually consumed and discarded.
|
||||
readLoop("stream.transport.consume"):
|
||||
if transp.atEof():
|
||||
(0, true)
|
||||
else:
|
||||
result += transp.offset
|
||||
(transp.offset, false)
|
||||
|
||||
result = 0
|
||||
while true:
|
||||
if (ReadError in transp.state):
|
||||
raise transp.getError()
|
||||
if ReadClosed in transp.state or transp.atEof():
|
||||
break
|
||||
|
||||
if transp.offset > 0:
|
||||
if n <= 0:
|
||||
# consume all incoming data, until EOF
|
||||
result += transp.offset
|
||||
transp.offset = 0
|
||||
proc consume*(transp: StreamTransport, n: int): Future[int] {.async.} =
|
||||
## Consume all bytes (n <= 0) or ``n`` bytes from transport ``transp`` and
|
||||
## discard it.
|
||||
##
|
||||
## Return number of bytes actually consumed and discarded.
|
||||
if n <= 0:
|
||||
return await transp.consume()
|
||||
else:
|
||||
readLoop("stream.transport.consume"):
|
||||
if transp.atEof():
|
||||
(0, true)
|
||||
else:
|
||||
let left = n - result
|
||||
if transp.offset >= left:
|
||||
# size of buffer data is more then we need, consume only part
|
||||
result += left
|
||||
transp.shiftBuffer(left)
|
||||
break
|
||||
else:
|
||||
# there not enough data in buffer, consume all
|
||||
result += transp.offset
|
||||
transp.offset = 0
|
||||
let count = min(transp.offset, n - result)
|
||||
result += count
|
||||
(count, result == n)
|
||||
|
||||
var fut = newFuture[void]("stream.transport.consume")
|
||||
transp.reader = fut
|
||||
if ReadPaused in transp.state:
|
||||
transp.resumeRead()
|
||||
await fut
|
||||
proc readMessage*(transp: StreamTransport,
|
||||
predicate: ReadMessagePredicate) {.async.} =
|
||||
## Read all bytes from transport ``transp`` until ``predicate`` callback
|
||||
## will not be satisfied.
|
||||
##
|
||||
## ``predicate`` callback should return tuple ``(consumed, result)``, where
|
||||
## ``consumed`` is the number of bytes processed and ``result`` is a
|
||||
## completion flag (``true`` if readMessage() should stop reading data,
|
||||
## or ``false`` if readMessage() should continue to read data from transport).
|
||||
##
|
||||
## ``predicate`` callback must copy all the data from ``data`` array and
|
||||
## return number of bytes it is going to consume.
|
||||
## ``predicate`` callback will receive (zero-length) openarray, if transport
|
||||
## is at EOF.
|
||||
readLoop("stream.transport.readMessage"):
|
||||
if transp.offset == 0:
|
||||
if transp.atEof():
|
||||
predicate([])
|
||||
else:
|
||||
# Case, when transport's buffer is not yet filled with data.
|
||||
(0, false)
|
||||
else:
|
||||
predicate(transp.buffer.toOpenArray(0, transp.offset - 1))
|
||||
|
||||
proc join*(transp: StreamTransport): Future[void] =
|
||||
## Wait until ``transp`` will not be closed.
|
||||
|
|
|
@ -633,11 +633,14 @@ suite "Stream Transport test suite":
|
|||
server.start()
|
||||
var transp = await connect(address)
|
||||
var fut = swarmWorker(transp)
|
||||
transp.close()
|
||||
# We perfrom shutdown(SHUT_RD/SD_RECEIVE) for the socket, in such way its
|
||||
# possible to emulate socket's EOF.
|
||||
discard shutdown(SocketHandle(transp.fd), 0)
|
||||
await fut
|
||||
server.stop()
|
||||
server.close()
|
||||
await server.join()
|
||||
transp.close()
|
||||
await transp.join()
|
||||
result = subres
|
||||
|
||||
|
@ -794,6 +797,266 @@ suite "Stream Transport test suite":
|
|||
await server.join()
|
||||
result = flag
|
||||
|
||||
proc testReadLine(address: TransportAddress): Future[bool] {.async.} =
|
||||
proc serveClient(server: StreamServer, transp: StreamTransport) {.async.} =
|
||||
discard await transp.write("DATA\r\r\r\r\r\n")
|
||||
transp.close()
|
||||
await transp.join()
|
||||
|
||||
var server = createStreamServer(address, serveClient, {ReuseAddr})
|
||||
server.start()
|
||||
try:
|
||||
var r1, r2, r3, r4, r5: string
|
||||
var t1 = await connect(address)
|
||||
try:
|
||||
r1 = await t1.readLine(4)
|
||||
finally:
|
||||
await t1.closeWait()
|
||||
|
||||
var t2 = await connect(address)
|
||||
try:
|
||||
r2 = await t2.readLine(6)
|
||||
finally:
|
||||
await t2.closeWait()
|
||||
|
||||
var t3 = await connect(address)
|
||||
try:
|
||||
r3 = await t3.readLine(8)
|
||||
finally:
|
||||
await t3.closeWait()
|
||||
|
||||
var t4 = await connect(address)
|
||||
try:
|
||||
r4 = await t4.readLine(8)
|
||||
finally:
|
||||
await t4.closeWait()
|
||||
|
||||
var t5 = await connect(address)
|
||||
try:
|
||||
r5 = await t5.readLine()
|
||||
finally:
|
||||
await t5.closeWait()
|
||||
|
||||
doAssert(r1 == "DATA")
|
||||
doAssert(r2 == "DATA\r\r")
|
||||
doAssert(r3 == "DATA\r\r\r\r")
|
||||
doAssert(r4 == "DATA\r\r\r\r")
|
||||
doAssert(r5 == "DATA\r\r\r\r")
|
||||
|
||||
result = true
|
||||
finally:
|
||||
server.stop()
|
||||
server.close()
|
||||
await server.join()
|
||||
proc readLV(transp: StreamTransport,
|
||||
maxLen: int): Future[seq[byte]] {.async.} =
|
||||
# Read length-prefixed value where length is a 32-bit integer in native
|
||||
# endian (don't do this at home)
|
||||
var
|
||||
valueLen = 0'u32
|
||||
res: seq[byte]
|
||||
error: ref Exception
|
||||
|
||||
proc predicate(data: openarray[byte]): tuple[consumed: int, done: bool] =
|
||||
if len(data) == 0:
|
||||
# There will be no more data, length-value incomplete
|
||||
error = newException(TransportIncompleteError, "LV incomplete")
|
||||
return (0, true)
|
||||
|
||||
var consumed = 0
|
||||
|
||||
if valueLen == 0:
|
||||
if len(data) < 4:
|
||||
return (0, false)
|
||||
copyMem(addr valueLen, unsafeAddr data[0], sizeof(valueLen))
|
||||
if valueLen == 0:
|
||||
return (sizeof(valueLen), true)
|
||||
if int(valueLen) > maxLen:
|
||||
error = newException(ValueError, "Size is too big")
|
||||
return (sizeof(valueLen), true)
|
||||
consumed += sizeof(valueLen)
|
||||
|
||||
let
|
||||
dataLeft = len(data) - consumed
|
||||
count = min(dataLeft, int(valueLen) - len(res))
|
||||
|
||||
res.add(data.toOpenArray(consumed, count + consumed - 1))
|
||||
return (consumed + count, len(res) == int(valueLen))
|
||||
|
||||
await transp.readMessage(predicate)
|
||||
if not isNil(error):
|
||||
raise error
|
||||
else:
|
||||
return res
|
||||
|
||||
proc createMessage(size: uint32): seq[byte] =
|
||||
var message = "MESSAGE"
|
||||
result = newSeq[byte](int(size))
|
||||
for i in 0 ..< size:
|
||||
result[int(i)] = byte(message[int(i) mod len(message)])
|
||||
|
||||
proc createLVMessage(size: uint32): seq[byte] =
|
||||
var message = "MESSAGE"
|
||||
result = newSeq[byte](sizeof(size) + int(size))
|
||||
copyMem(addr result[0], unsafeAddr size, sizeof(size))
|
||||
for i in 0 ..< size:
|
||||
result[int(i) + sizeof(size)] = byte(message[int(i) mod len(message)])
|
||||
|
||||
proc testReadMessage(address: TransportAddress): Future[bool] {.async.} =
|
||||
var state = 0
|
||||
var c1, c2, c3, c4, c5, c6, c7: bool
|
||||
|
||||
proc serveClient(server: StreamServer, transp: StreamTransport) {.async.} =
|
||||
if state == 0:
|
||||
# EOF from the beginning.
|
||||
state = 1
|
||||
await transp.closeWait()
|
||||
elif state == 1:
|
||||
# Message has only zero-size header.
|
||||
var message = createLVMessage(0'u32)
|
||||
discard await transp.write(message)
|
||||
state = 2
|
||||
await transp.closeWait()
|
||||
elif state == 2:
|
||||
# Message has header, but do not have any data at all.
|
||||
var message = createLVMessage(4'u32)
|
||||
message.setLen(4)
|
||||
discard await transp.write(message)
|
||||
state = 3
|
||||
await transp.closeWait()
|
||||
elif state == 3:
|
||||
# Message do not have enough data for specified size in header.
|
||||
var message = createLVMessage(1024'u32)
|
||||
message.setLen(1024)
|
||||
discard await transp.write(message)
|
||||
state = 4
|
||||
await transp.closeWait()
|
||||
elif state == 4:
|
||||
# Good encoded message with oversize.
|
||||
var message = createLVMessage(1024'u32)
|
||||
discard await transp.write(message)
|
||||
state = 5
|
||||
await transp.closeWait()
|
||||
elif state == 5:
|
||||
# Good encoded message.
|
||||
var message = createLVMessage(1024'u32)
|
||||
discard await transp.write(message)
|
||||
state = 6
|
||||
await transp.closeWait()
|
||||
elif state == 6:
|
||||
# Good encoded message with additional data.
|
||||
var message = createLVMessage(1024'u32)
|
||||
discard await transp.write(message)
|
||||
discard await transp.write("DONE")
|
||||
state = 7
|
||||
await transp.closeWait()
|
||||
else:
|
||||
doAssert(false)
|
||||
|
||||
var server = createStreamServer(address, serveClient, {ReuseAddr})
|
||||
server.start()
|
||||
|
||||
var t1 = await connect(address)
|
||||
try:
|
||||
discard await t1.readLV(2000)
|
||||
except TransportIncompleteError:
|
||||
c1 = true
|
||||
finally:
|
||||
await t1.closeWait()
|
||||
|
||||
if not c1:
|
||||
server.stop()
|
||||
server.close()
|
||||
await server.join()
|
||||
return false
|
||||
|
||||
var t2 = await connect(address)
|
||||
try:
|
||||
var r2 = await t2.readLV(2000)
|
||||
c2 = (r2 == @[])
|
||||
finally:
|
||||
await t2.closeWait()
|
||||
|
||||
if not c2:
|
||||
server.stop()
|
||||
server.close()
|
||||
await server.join()
|
||||
return false
|
||||
|
||||
var t3 = await connect(address)
|
||||
try:
|
||||
discard await t3.readLV(2000)
|
||||
except TransportIncompleteError:
|
||||
c3 = true
|
||||
finally:
|
||||
await t3.closeWait()
|
||||
|
||||
if not c3:
|
||||
server.stop()
|
||||
server.close()
|
||||
await server.join()
|
||||
return false
|
||||
|
||||
var t4 = await connect(address)
|
||||
try:
|
||||
discard await t4.readLV(2000)
|
||||
except TransportIncompleteError:
|
||||
c4 = true
|
||||
finally:
|
||||
await t4.closeWait()
|
||||
|
||||
if not c4:
|
||||
server.stop()
|
||||
server.close()
|
||||
await server.join()
|
||||
return false
|
||||
|
||||
var t5 = await connect(address)
|
||||
try:
|
||||
discard await t5.readLV(1000)
|
||||
except ValueError:
|
||||
c5 = true
|
||||
finally:
|
||||
await t5.closeWait()
|
||||
|
||||
if not c5:
|
||||
server.stop()
|
||||
server.close()
|
||||
await server.join()
|
||||
return false
|
||||
|
||||
var t6 = await connect(address)
|
||||
try:
|
||||
var expectMsg = createMessage(1024)
|
||||
var r6 = await t6.readLV(2000)
|
||||
if len(r6) == 1024 and r6 == expectMsg:
|
||||
c6 = true
|
||||
finally:
|
||||
await t6.closeWait()
|
||||
|
||||
if not c6:
|
||||
server.stop()
|
||||
server.close()
|
||||
await server.join()
|
||||
return false
|
||||
|
||||
var t7 = await connect(address)
|
||||
try:
|
||||
var expectMsg = createMessage(1024)
|
||||
var expectDone = "DONE"
|
||||
var r7 = await t7.readLV(2000)
|
||||
if len(r7) == 1024 and r7 == expectMsg:
|
||||
var m = await t7.read(4)
|
||||
if len(m) == 4 and equalMem(addr m[0], addr expectDone[0], 4):
|
||||
c7 = true
|
||||
finally:
|
||||
await t7.closeWait()
|
||||
|
||||
server.stop()
|
||||
server.close()
|
||||
await server.join()
|
||||
result = c7
|
||||
|
||||
for i in 0..<len(addresses):
|
||||
test prefixes[i] & "close(transport) test":
|
||||
check waitFor(testCloseTransport(addresses[i])) == 1
|
||||
|
@ -845,6 +1108,10 @@ suite "Stream Transport test suite":
|
|||
skip()
|
||||
test prefixes[i] & "write() return value test (issue #73)":
|
||||
check waitFor(testWriteReturn(addresses[i])) == true
|
||||
test prefixes[i] & "readLine() partial separator test":
|
||||
check waitFor(testReadLine(addresses[i])) == true
|
||||
test prefixes[i] & "readMessage() test":
|
||||
check waitFor(testReadMessage(addresses[i])) == true
|
||||
|
||||
test "Servers leak test":
|
||||
check getTracker("stream.server").isLeaked() == false
|
||||
|
|
Loading…
Reference in New Issue