Add ability to create copies of AsyncStreams.

This commit is contained in:
cheatfate 2019-06-18 20:11:13 +03:00
parent c113aa1be7
commit 31f4dc3096
No known key found for this signature in database
GPG Key ID: 46ADD633A7201F95
1 changed files with 280 additions and 180 deletions

View File

@ -276,23 +276,26 @@ proc readExactly*(rstream: AsyncStreamReader, pbytes: pointer,
except:
raise newAsyncStreamReadError(getCurrentException())
else:
var index = 0
while true:
let datalen = rstream.buffer.dataLen()
if rstream.state == Error:
raise newAsyncStreamReadError(rstream.error)
if datalen == 0 and rstream.atEof():
raise newAsyncStreamIncompleteError()
if isNil(rstream.readerLoop):
await readExactly(rstream.rsource, pbytes, nbytes)
else:
var index = 0
while true:
let datalen = rstream.buffer.dataLen()
if rstream.state == Error:
raise newAsyncStreamReadError(rstream.error)
if datalen == 0 and rstream.atEof():
raise newAsyncStreamIncompleteError()
if datalen >= (nbytes - index):
rstream.buffer.copyData(pbytes, index, nbytes - index)
rstream.buffer.shift(nbytes - index)
break
else:
rstream.buffer.copyData(pbytes, index, datalen)
index += datalen
rstream.buffer.shift(datalen)
await rstream.buffer.wait()
if datalen >= (nbytes - index):
rstream.buffer.copyData(pbytes, index, nbytes - index)
rstream.buffer.shift(nbytes - index)
break
else:
rstream.buffer.copyData(pbytes, index, datalen)
index += datalen
rstream.buffer.shift(datalen)
await rstream.buffer.wait()
proc readOnce*(rstream: AsyncStreamReader, pbytes: pointer,
nbytes: int): Future[int] {.async.} =
@ -309,21 +312,24 @@ proc readOnce*(rstream: AsyncStreamReader, pbytes: pointer,
except:
raise newAsyncStreamReadError(getCurrentException())
else:
while true:
let datalen = rstream.buffer.dataLen()
if rstream.state == Error:
raise newAsyncStreamReadError(rstream.error)
if datalen == 0:
if rstream.atEof():
result = 0
if isNil(rstream.rsource):
result = await readOnce(rstream.rsource, pbytes, nbytes)
else:
while true:
let datalen = rstream.buffer.dataLen()
if rstream.state == Error:
raise newAsyncStreamReadError(rstream.error)
if datalen == 0:
if rstream.atEof():
result = 0
break
await rstream.buffer.wait()
else:
let size = min(datalen, nbytes)
rstream.buffer.copyData(pbytes, 0, size)
rstream.buffer.shift(size)
result = size
break
await rstream.buffer.wait()
else:
let size = min(datalen, nbytes)
rstream.buffer.copyData(pbytes, 0, size)
rstream.buffer.shift(size)
result = size
break
proc readUntil*(rstream: AsyncStreamReader, pbytes: pointer, nbytes: int,
sep: seq[byte]): Future[int] {.async.} =
@ -353,41 +359,44 @@ proc readUntil*(rstream: AsyncStreamReader, pbytes: pointer, nbytes: int,
except:
raise newAsyncStreamReadError(getCurrentException())
else:
var
dest = cast[ptr UncheckedArray[byte]](pbytes)
state = 0
k = 0
if isNil(rstream.readerLoop):
result = await readUntil(rstream.rsource, pbytes, nbytes, sep)
else:
var
dest = cast[ptr UncheckedArray[byte]](pbytes)
state = 0
k = 0
while true:
let datalen = rstream.buffer.dataLen()
if rstream.state == Error:
raise newAsyncStreamReadError(rstream.error)
if datalen == 0 and rstream.atEof():
raise newAsyncStreamIncompleteError()
while true:
let datalen = rstream.buffer.dataLen()
if rstream.state == Error:
raise newAsyncStreamReadError(rstream.error)
if datalen == 0 and rstream.atEof():
raise newAsyncStreamIncompleteError()
var index = 0
while index < datalen:
let ch = rstream.buffer[index]
if sep[state] == ch:
inc(state)
else:
state = 0
if k < nbytes:
dest[k] = ch
inc(k)
else:
raise newAsyncStreamLimitError()
if state == len(sep):
break
inc(index)
var index = 0
while index < datalen:
let ch = rstream.buffer[index]
if sep[state] == ch:
inc(state)
else:
state = 0
if k < nbytes:
dest[k] = ch
inc(k)
else:
raise newAsyncStreamLimitError()
if state == len(sep):
rstream.buffer.shift(index + 1)
result = k
break
inc(index)
if state == len(sep):
rstream.buffer.shift(index + 1)
result = k
break
else:
rstream.buffer.shift(datalen)
await rstream.buffer.wait()
else:
rstream.buffer.shift(datalen)
await rstream.buffer.wait()
proc readLine*(rstream: AsyncStreamReader, limit = 0,
sep = "\r\n"): Future[string] {.async.} =
@ -411,41 +420,44 @@ proc readLine*(rstream: AsyncStreamReader, limit = 0,
except:
raise newAsyncStreamReadError(getCurrentException())
else:
var res = ""
var
lim = if limit <= 0: -1 else: limit
state = 0
if isNil(rstream.readerLoop):
result = await readLine(rstream.rsource, limit, sep)
else:
var res = ""
var
lim = if limit <= 0: -1 else: limit
state = 0
while true:
let datalen = rstream.buffer.dataLen()
if rstream.state == Error:
raise newAsyncStreamReadError(rstream.error)
if datalen == 0 and rstream.atEof():
result = res
break
while true:
let datalen = rstream.buffer.dataLen()
if rstream.state == Error:
raise newAsyncStreamReadError(rstream.error)
if datalen == 0 and rstream.atEof():
result = res
break
var index = 0
while index < datalen:
let ch = char(rstream.buffer[index])
if sep[state] == ch:
inc(state)
if state == len(sep) or len(res) == lim:
rstream.buffer.shift(index + 1)
break
var index = 0
while index < datalen:
let ch = char(rstream.buffer[index])
if sep[state] == ch:
inc(state)
if state == len(sep) or len(res) == lim:
rstream.buffer.shift(index + 1)
break
else:
state = 0
res.add(ch)
if len(res) == lim:
rstream.buffer.shift(index + 1)
break
inc(index)
if state == len(sep) or (lim == len(res)):
result = res
break
else:
state = 0
res.add(ch)
if len(res) == lim:
rstream.buffer.shift(index + 1)
break
inc(index)
if state == len(sep) or (lim == len(res)):
result = res
break
else:
rstream.buffer.shift(datalen)
await rstream.buffer.wait()
rstream.buffer.shift(datalen)
await rstream.buffer.wait()
proc read*(rstream: AsyncStreamReader, n = 0): Future[seq[byte]] {.async.} =
## Read all bytes (n <= 0) or exactly `n` bytes from read-only stream
@ -461,36 +473,39 @@ proc read*(rstream: AsyncStreamReader, n = 0): Future[seq[byte]] {.async.} =
except:
raise newAsyncStreamReadError(getCurrentException())
else:
var res = newSeq[byte]()
while true:
let datalen = rstream.buffer.dataLen()
if rstream.state == Error:
raise newAsyncStreamReadError(rstream.error)
if datalen == 0 and rstream.atEof():
result = res
break
if isNil(rstream.readerLoop):
result = await read(rstream.rsource, n)
else:
var res = newSeq[byte]()
while true:
let datalen = rstream.buffer.dataLen()
if rstream.state == Error:
raise newAsyncStreamReadError(rstream.error)
if datalen == 0 and rstream.atEof():
result = res
break
if datalen > 0:
let s = len(res)
let o = s + datalen
if n <= 0:
res.setLen(o)
rstream.buffer.copyData(addr res[s], 0, datalen)
rstream.buffer.shift(datalen)
else:
let left = n - s
if datalen >= left:
res.setLen(n)
rstream.buffer.copyData(addr res[s], 0, left)
rstream.buffer.shift(left)
result = res
break
else:
if datalen > 0:
let s = len(res)
let o = s + datalen
if n <= 0:
res.setLen(o)
rstream.buffer.copyData(addr res[s], 0, datalen)
rstream.buffer.shift(datalen)
else:
let left = n - s
if datalen >= left:
res.setLen(n)
rstream.buffer.copyData(addr res[s], 0, left)
rstream.buffer.shift(left)
result = res
break
else:
res.setLen(o)
rstream.buffer.copyData(addr res[s], 0, datalen)
rstream.buffer.shift(datalen)
await rstream.buffer.wait()
await rstream.buffer.wait()
proc consume*(rstream: AsyncStreamReader, n = -1): Future[int] {.async.} =
## Consume (discard) all bytes (n <= 0) or ``n`` bytes from read-only stream
@ -508,34 +523,37 @@ proc consume*(rstream: AsyncStreamReader, n = -1): Future[int] {.async.} =
except:
raise newAsyncStreamReadError(getCurrentException())
else:
var res = 0
while true:
let datalen = rstream.buffer.dataLen()
if rstream.state == Error:
raise newAsyncStreamReadError(rstream.error)
if datalen == 0:
if rstream.atEof():
if n <= 0:
result = res
break
else:
raise newAsyncStreamLimitError()
else:
if n <= 0:
res += datalen
rstream.buffer.shift(datalen)
if isNil(rstream.readerLoop):
result = await consume(rstream.rsource, n)
else:
var res = 0
while true:
let datalen = rstream.buffer.dataLen()
if rstream.state == Error:
raise newAsyncStreamReadError(rstream.error)
if datalen == 0:
if rstream.atEof():
if n <= 0:
result = res
break
else:
raise newAsyncStreamLimitError()
else:
let left = n - res
if datalen >= left:
res += left
rstream.buffer.shift(left)
result = res
break
else:
if n <= 0:
res += datalen
rstream.buffer.shift(datalen)
else:
let left = n - res
if datalen >= left:
res += left
rstream.buffer.shift(left)
result = res
break
else:
res += datalen
rstream.buffer.shift(datalen)
await rstream.buffer.wait()
await rstream.buffer.wait()
proc write*(wstream: AsyncStreamWriter, pbytes: pointer,
nbytes: int) {.async.} =
@ -556,14 +574,17 @@ proc write*(wstream: AsyncStreamWriter, pbytes: pointer,
if resFut.read() != nbytes:
raise newAsyncStreamIncompleteError()
else:
var item = WriteItem(kind: Pointer)
item.data1 = pbytes
item.size = nbytes
item.future = newFuture[void]("async.stream.write(pointer)")
await wstream.queue.put(item)
yield item.future
if item.future.failed:
raise newAsyncStreamWriteError(item.future.error)
if isNil(wstream.writerLoop):
await write(wstream.wsource, pbytes, nbytes)
else:
var item = WriteItem(kind: Pointer)
item.data1 = pbytes
item.size = nbytes
item.future = newFuture[void]("async.stream.write(pointer)")
await wstream.queue.put(item)
yield item.future
if item.future.failed:
raise newAsyncStreamWriteError(item.future.error)
proc write*(wstream: AsyncStreamWriter, sbytes: seq[byte],
msglen = -1) {.async.} =
@ -583,24 +604,27 @@ proc write*(wstream: AsyncStreamWriter, sbytes: seq[byte],
raise newAsyncStreamIncorrectError("Zero length message")
if isNil(wstream.wsource):
var resFut = write(wstream.tsource, sbytes)
var resFut = write(wstream.tsource, sbytes, msglen)
yield resFut
if resFut.failed:
raise newAsyncStreamWriteError(resFut.error)
if resFut.read() != length:
raise newAsyncStreamIncompleteError()
else:
var item = WriteItem(kind: Sequence)
if not isLiteral(sbytes):
shallowCopy(item.data2, sbytes)
if isNil(wstream.writerLoop):
await write(wstream.wsource, sbytes, msglen)
else:
item.data2 = sbytes
item.size = length
item.future = newFuture[void]("async.stream.write(seq)")
await wstream.queue.put(item)
yield item.future
if item.future.failed:
raise newAsyncStreamWriteError(item.future.error)
var item = WriteItem(kind: Sequence)
if not isLiteral(sbytes):
shallowCopy(item.data2, sbytes)
else:
item.data2 = sbytes
item.size = length
item.future = newFuture[void]("async.stream.write(seq)")
await wstream.queue.put(item)
yield item.future
if item.future.failed:
raise newAsyncStreamWriteError(item.future.error)
proc write*(wstream: AsyncStreamWriter, sbytes: string,
msglen = -1) {.async.} =
@ -626,17 +650,20 @@ proc write*(wstream: AsyncStreamWriter, sbytes: string,
if resFut.read() != length:
raise newAsyncStreamIncompleteError()
else:
var item = WriteItem(kind: String)
if not isLiteral(sbytes):
shallowCopy(item.data3, sbytes)
if isNil(wstream.writerLoop):
await write(wstream.wsource, sbytes, msglen)
else:
item.data3 = sbytes
item.size = length
item.future = newFuture[void]("async.stream.write(string)")
await wstream.queue.put(item)
yield item.future
if item.future.failed:
raise newAsyncStreamWriteError(item.future.error)
var item = WriteItem(kind: String)
if not isLiteral(sbytes):
shallowCopy(item.data3, sbytes)
else:
item.data3 = sbytes
item.size = length
item.future = newFuture[void]("async.stream.write(string)")
await wstream.queue.put(item)
yield item.future
if item.future.failed:
raise newAsyncStreamWriteError(item.future.error)
proc finish*(wstream: AsyncStreamWriter) {.async.} =
## Finish write stream ``wstream``.
@ -644,13 +671,16 @@ proc finish*(wstream: AsyncStreamWriter) {.async.} =
raise newAsyncStreamIncorrectError("Incorrect stream state")
if not isNil(wstream.wsource):
var item = WriteItem(kind: Pointer)
item.size = 0
item.future = newFuture[void]("async.stream.finish")
await wstream.queue.put(item)
yield item.future
if item.future.failed:
raise newAsyncStreamWriteError(item.future.error)
if isNil(wstream.writerLoop):
await wstream.wsource.finish()
else:
var item = WriteItem(kind: Pointer)
item.size = 0
item.future = newFuture[void]("async.stream.finish")
await wstream.queue.put(item)
yield item.future
if item.future.failed:
raise newAsyncStreamWriteError(item.future.error)
proc join*(rw: AsyncStreamRW): Future[void] =
## Get Future[void] which will be completed when stream become finished or
@ -684,12 +714,12 @@ proc close*(rw: AsyncStreamRW) =
untrackAsyncStreamWriter(rw)
when rw is AsyncStreamReader:
if isNil(rw.rsource):
if isNil(rw.rsource) or isNil(rw.readerLoop):
callSoon(continuation)
else:
rw.exevent.fire()
elif rw is AsyncStreamWriter:
if isNil(rw.wsource):
if isNil(rw.wsource) or isNil(rw.writerLoop):
callSoon(continuation)
else:
rw.exevent.fire()
@ -787,6 +817,27 @@ proc init*[T](child: AsyncStreamWriter, tsource: StreamTransport,
trackAsyncStreamWriter(child)
child.startWriter()
proc init*(child, wsource: AsyncStreamWriter) =
## Initialize newly allocated object ``child`` with AsyncStreamWriter
## parameters.
child.writerLoop = nil
child.wsource = wsource
child.tsource = wsource.tsource
trackAsyncStreamWriter(child)
child.startWriter()
proc init*[T](child, wsource: AsyncStreamWriter, udata: ref T) =
## Initialize newly allocated object ``child`` with AsyncStreamWriter
## parameters.
child.writerLoop = nil
child.wsource = wsource
child.tsource = wsource.tsource
if not isNil(udata):
GC_ref(udata)
child.udata = cast[pointer](udata)
trackAsyncStreamWriter(child)
child.startWriter()
proc init*(child: AsyncStreamReader, tsource: StreamTransport) =
## Initialize newly allocated object ``child`` with AsyncStreamReader
## parameters.
@ -809,6 +860,27 @@ proc init*[T](child: AsyncStreamReader, tsource: StreamTransport,
trackAsyncStreamReader(child)
child.startReader()
proc init*(child, rsource: AsyncStreamReader) =
## Initialize newly allocated object ``child`` with AsyncStreamReader
## parameters.
child.readerLoop = nil
child.rsource = rsource
child.tsource = rsource.tsource
trackAsyncStreamReader(child)
child.startReader()
proc init*[T](child, rsource: AsyncStreamReader, udata: ref T) =
## Initialize newly allocated object ``child`` with AsyncStreamReader
## parameters.
child.readerLoop = nil
child.rsource = rsource
child.tsource = rsource.tsource
if not isNil(udata):
GC_ref(udata)
child.udata = cast[pointer](udata)
trackAsyncStreamReader(child)
child.startReader()
proc newAsyncStreamReader*[T](rsource: AsyncStreamReader,
loop: StreamReaderLoop,
bufferSize = AsyncStreamDefaultBufferSize,
@ -899,6 +971,34 @@ proc newAsyncStreamWriter*(tsource: StreamTransport): AsyncStreamWriter =
result = new AsyncStreamWriter
result.init(tsource)
proc newAsyncStreamWriter*[T](wsource: AsyncStreamWriter,
udata: ref T): AsyncStreamWriter =
## Create copy of AsyncStreamWriter object ``wsource``.
##
## ``udata`` - user object which will be associated with new AsyncStreamWriter
## object.
result = new AsyncStreamWriter
result.init(wsource, udata)
proc newAsyncStreamWriter*(wsource: AsyncStreamWriter): AsyncStreamWriter =
## Create copy of AsyncStreamWriter object ``wsource``.
result = new AsyncStreamWriter
result.init(wsource)
proc newAsyncStreamReader*[T](rsource: AsyncStreamWriter,
udata: ref T): AsyncStreamWriter =
## Create copy of AsyncStreamReader object ``rsource``.
##
## ``udata`` - user object which will be associated with new AsyncStreamReader
## object.
result = new AsyncStreamReader
result.init(rsource, udata)
proc newAsyncStreamReader*(rsource: AsyncStreamReader): AsyncStreamReader =
## Create copy of AsyncStreamReader object ``rsource``.
result = new AsyncStreamReader
result.init(rsource)
proc getUserData*[T](rw: AsyncStreamRW): T {.inline.} =
## Obtain user data associated with AsyncStreamReader or AsyncStreamWriter
## object ``rw``.