diff --git a/chronos/streams/asyncstream.nim b/chronos/streams/asyncstream.nim index a61874f..eceea21 100644 --- a/chronos/streams/asyncstream.nim +++ b/chronos/streams/asyncstream.nim @@ -276,23 +276,26 @@ proc readExactly*(rstream: AsyncStreamReader, pbytes: pointer, except: raise newAsyncStreamReadError(getCurrentException()) else: - var index = 0 - while true: - let datalen = rstream.buffer.dataLen() - if rstream.state == Error: - raise newAsyncStreamReadError(rstream.error) - if datalen == 0 and rstream.atEof(): - raise newAsyncStreamIncompleteError() + if isNil(rstream.readerLoop): + await readExactly(rstream.rsource, pbytes, nbytes) + else: + var index = 0 + while true: + let datalen = rstream.buffer.dataLen() + if rstream.state == Error: + raise newAsyncStreamReadError(rstream.error) + if datalen == 0 and rstream.atEof(): + raise newAsyncStreamIncompleteError() - if datalen >= (nbytes - index): - rstream.buffer.copyData(pbytes, index, nbytes - index) - rstream.buffer.shift(nbytes - index) - break - else: - rstream.buffer.copyData(pbytes, index, datalen) - index += datalen - rstream.buffer.shift(datalen) - await rstream.buffer.wait() + if datalen >= (nbytes - index): + rstream.buffer.copyData(pbytes, index, nbytes - index) + rstream.buffer.shift(nbytes - index) + break + else: + rstream.buffer.copyData(pbytes, index, datalen) + index += datalen + rstream.buffer.shift(datalen) + await rstream.buffer.wait() proc readOnce*(rstream: AsyncStreamReader, pbytes: pointer, nbytes: int): Future[int] {.async.} = @@ -309,21 +312,24 @@ proc readOnce*(rstream: AsyncStreamReader, pbytes: pointer, except: raise newAsyncStreamReadError(getCurrentException()) else: - while true: - let datalen = rstream.buffer.dataLen() - if rstream.state == Error: - raise newAsyncStreamReadError(rstream.error) - if datalen == 0: - if rstream.atEof(): - result = 0 + if isNil(rstream.rsource): + result = await readOnce(rstream.rsource, pbytes, nbytes) + else: + while true: + let datalen = rstream.buffer.dataLen() + if rstream.state == Error: + raise newAsyncStreamReadError(rstream.error) + if datalen == 0: + if rstream.atEof(): + result = 0 + break + await rstream.buffer.wait() + else: + let size = min(datalen, nbytes) + rstream.buffer.copyData(pbytes, 0, size) + rstream.buffer.shift(size) + result = size break - await rstream.buffer.wait() - else: - let size = min(datalen, nbytes) - rstream.buffer.copyData(pbytes, 0, size) - rstream.buffer.shift(size) - result = size - break proc readUntil*(rstream: AsyncStreamReader, pbytes: pointer, nbytes: int, sep: seq[byte]): Future[int] {.async.} = @@ -353,41 +359,44 @@ proc readUntil*(rstream: AsyncStreamReader, pbytes: pointer, nbytes: int, except: raise newAsyncStreamReadError(getCurrentException()) else: - var - dest = cast[ptr UncheckedArray[byte]](pbytes) - state = 0 - k = 0 + if isNil(rstream.readerLoop): + result = await readUntil(rstream.rsource, pbytes, nbytes, sep) + else: + var + dest = cast[ptr UncheckedArray[byte]](pbytes) + state = 0 + k = 0 - while true: - let datalen = rstream.buffer.dataLen() - if rstream.state == Error: - raise newAsyncStreamReadError(rstream.error) - if datalen == 0 and rstream.atEof(): - raise newAsyncStreamIncompleteError() + while true: + let datalen = rstream.buffer.dataLen() + if rstream.state == Error: + raise newAsyncStreamReadError(rstream.error) + if datalen == 0 and rstream.atEof(): + raise newAsyncStreamIncompleteError() + + var index = 0 + while index < datalen: + let ch = rstream.buffer[index] + if sep[state] == ch: + inc(state) + else: + state = 0 + if k < nbytes: + dest[k] = ch + inc(k) + else: + raise newAsyncStreamLimitError() + if state == len(sep): + break + inc(index) - var index = 0 - while index < datalen: - let ch = rstream.buffer[index] - if sep[state] == ch: - inc(state) - else: - state = 0 - if k < nbytes: - dest[k] = ch - inc(k) - else: - raise newAsyncStreamLimitError() if state == len(sep): + rstream.buffer.shift(index + 1) + result = k break - inc(index) - - if state == len(sep): - rstream.buffer.shift(index + 1) - result = k - break - else: - rstream.buffer.shift(datalen) - await rstream.buffer.wait() + else: + rstream.buffer.shift(datalen) + await rstream.buffer.wait() proc readLine*(rstream: AsyncStreamReader, limit = 0, sep = "\r\n"): Future[string] {.async.} = @@ -411,41 +420,44 @@ proc readLine*(rstream: AsyncStreamReader, limit = 0, except: raise newAsyncStreamReadError(getCurrentException()) else: - var res = "" - var - lim = if limit <= 0: -1 else: limit - state = 0 + if isNil(rstream.readerLoop): + result = await readLine(rstream.rsource, limit, sep) + else: + var res = "" + var + lim = if limit <= 0: -1 else: limit + state = 0 - while true: - let datalen = rstream.buffer.dataLen() - if rstream.state == Error: - raise newAsyncStreamReadError(rstream.error) - if datalen == 0 and rstream.atEof(): - result = res - break + while true: + let datalen = rstream.buffer.dataLen() + if rstream.state == Error: + raise newAsyncStreamReadError(rstream.error) + if datalen == 0 and rstream.atEof(): + result = res + break - var index = 0 - while index < datalen: - let ch = char(rstream.buffer[index]) - if sep[state] == ch: - inc(state) - if state == len(sep) or len(res) == lim: - rstream.buffer.shift(index + 1) - break + var index = 0 + while index < datalen: + let ch = char(rstream.buffer[index]) + if sep[state] == ch: + inc(state) + if state == len(sep) or len(res) == lim: + rstream.buffer.shift(index + 1) + break + else: + state = 0 + res.add(ch) + if len(res) == lim: + rstream.buffer.shift(index + 1) + break + inc(index) + + if state == len(sep) or (lim == len(res)): + result = res + break else: - state = 0 - res.add(ch) - if len(res) == lim: - rstream.buffer.shift(index + 1) - break - inc(index) - - if state == len(sep) or (lim == len(res)): - result = res - break - else: - rstream.buffer.shift(datalen) - await rstream.buffer.wait() + rstream.buffer.shift(datalen) + await rstream.buffer.wait() proc read*(rstream: AsyncStreamReader, n = 0): Future[seq[byte]] {.async.} = ## Read all bytes (n <= 0) or exactly `n` bytes from read-only stream @@ -461,36 +473,39 @@ proc read*(rstream: AsyncStreamReader, n = 0): Future[seq[byte]] {.async.} = except: raise newAsyncStreamReadError(getCurrentException()) else: - var res = newSeq[byte]() - while true: - let datalen = rstream.buffer.dataLen() - if rstream.state == Error: - raise newAsyncStreamReadError(rstream.error) - if datalen == 0 and rstream.atEof(): - result = res - break + if isNil(rstream.readerLoop): + result = await read(rstream.rsource, n) + else: + var res = newSeq[byte]() + while true: + let datalen = rstream.buffer.dataLen() + if rstream.state == Error: + raise newAsyncStreamReadError(rstream.error) + if datalen == 0 and rstream.atEof(): + result = res + break - if datalen > 0: - let s = len(res) - let o = s + datalen - if n <= 0: - res.setLen(o) - rstream.buffer.copyData(addr res[s], 0, datalen) - rstream.buffer.shift(datalen) - else: - let left = n - s - if datalen >= left: - res.setLen(n) - rstream.buffer.copyData(addr res[s], 0, left) - rstream.buffer.shift(left) - result = res - break - else: + if datalen > 0: + let s = len(res) + let o = s + datalen + if n <= 0: res.setLen(o) rstream.buffer.copyData(addr res[s], 0, datalen) rstream.buffer.shift(datalen) + else: + let left = n - s + if datalen >= left: + res.setLen(n) + rstream.buffer.copyData(addr res[s], 0, left) + rstream.buffer.shift(left) + result = res + break + else: + res.setLen(o) + rstream.buffer.copyData(addr res[s], 0, datalen) + rstream.buffer.shift(datalen) - await rstream.buffer.wait() + await rstream.buffer.wait() proc consume*(rstream: AsyncStreamReader, n = -1): Future[int] {.async.} = ## Consume (discard) all bytes (n <= 0) or ``n`` bytes from read-only stream @@ -508,34 +523,37 @@ proc consume*(rstream: AsyncStreamReader, n = -1): Future[int] {.async.} = except: raise newAsyncStreamReadError(getCurrentException()) else: - var res = 0 - while true: - let datalen = rstream.buffer.dataLen() - if rstream.state == Error: - raise newAsyncStreamReadError(rstream.error) - if datalen == 0: - if rstream.atEof(): - if n <= 0: - result = res - break - else: - raise newAsyncStreamLimitError() - else: - if n <= 0: - res += datalen - rstream.buffer.shift(datalen) + if isNil(rstream.readerLoop): + result = await consume(rstream.rsource, n) + else: + var res = 0 + while true: + let datalen = rstream.buffer.dataLen() + if rstream.state == Error: + raise newAsyncStreamReadError(rstream.error) + if datalen == 0: + if rstream.atEof(): + if n <= 0: + result = res + break + else: + raise newAsyncStreamLimitError() else: - let left = n - res - if datalen >= left: - res += left - rstream.buffer.shift(left) - result = res - break - else: + if n <= 0: res += datalen rstream.buffer.shift(datalen) + else: + let left = n - res + if datalen >= left: + res += left + rstream.buffer.shift(left) + result = res + break + else: + res += datalen + rstream.buffer.shift(datalen) - await rstream.buffer.wait() + await rstream.buffer.wait() proc write*(wstream: AsyncStreamWriter, pbytes: pointer, nbytes: int) {.async.} = @@ -556,14 +574,17 @@ proc write*(wstream: AsyncStreamWriter, pbytes: pointer, if resFut.read() != nbytes: raise newAsyncStreamIncompleteError() else: - var item = WriteItem(kind: Pointer) - item.data1 = pbytes - item.size = nbytes - item.future = newFuture[void]("async.stream.write(pointer)") - await wstream.queue.put(item) - yield item.future - if item.future.failed: - raise newAsyncStreamWriteError(item.future.error) + if isNil(wstream.writerLoop): + await write(wstream.wsource, pbytes, nbytes) + else: + var item = WriteItem(kind: Pointer) + item.data1 = pbytes + item.size = nbytes + item.future = newFuture[void]("async.stream.write(pointer)") + await wstream.queue.put(item) + yield item.future + if item.future.failed: + raise newAsyncStreamWriteError(item.future.error) proc write*(wstream: AsyncStreamWriter, sbytes: seq[byte], msglen = -1) {.async.} = @@ -583,24 +604,27 @@ proc write*(wstream: AsyncStreamWriter, sbytes: seq[byte], raise newAsyncStreamIncorrectError("Zero length message") if isNil(wstream.wsource): - var resFut = write(wstream.tsource, sbytes) + var resFut = write(wstream.tsource, sbytes, msglen) yield resFut if resFut.failed: raise newAsyncStreamWriteError(resFut.error) if resFut.read() != length: raise newAsyncStreamIncompleteError() else: - var item = WriteItem(kind: Sequence) - if not isLiteral(sbytes): - shallowCopy(item.data2, sbytes) + if isNil(wstream.writerLoop): + await write(wstream.wsource, sbytes, msglen) else: - item.data2 = sbytes - item.size = length - item.future = newFuture[void]("async.stream.write(seq)") - await wstream.queue.put(item) - yield item.future - if item.future.failed: - raise newAsyncStreamWriteError(item.future.error) + var item = WriteItem(kind: Sequence) + if not isLiteral(sbytes): + shallowCopy(item.data2, sbytes) + else: + item.data2 = sbytes + item.size = length + item.future = newFuture[void]("async.stream.write(seq)") + await wstream.queue.put(item) + yield item.future + if item.future.failed: + raise newAsyncStreamWriteError(item.future.error) proc write*(wstream: AsyncStreamWriter, sbytes: string, msglen = -1) {.async.} = @@ -626,17 +650,20 @@ proc write*(wstream: AsyncStreamWriter, sbytes: string, if resFut.read() != length: raise newAsyncStreamIncompleteError() else: - var item = WriteItem(kind: String) - if not isLiteral(sbytes): - shallowCopy(item.data3, sbytes) + if isNil(wstream.writerLoop): + await write(wstream.wsource, sbytes, msglen) else: - item.data3 = sbytes - item.size = length - item.future = newFuture[void]("async.stream.write(string)") - await wstream.queue.put(item) - yield item.future - if item.future.failed: - raise newAsyncStreamWriteError(item.future.error) + var item = WriteItem(kind: String) + if not isLiteral(sbytes): + shallowCopy(item.data3, sbytes) + else: + item.data3 = sbytes + item.size = length + item.future = newFuture[void]("async.stream.write(string)") + await wstream.queue.put(item) + yield item.future + if item.future.failed: + raise newAsyncStreamWriteError(item.future.error) proc finish*(wstream: AsyncStreamWriter) {.async.} = ## Finish write stream ``wstream``. @@ -644,13 +671,16 @@ proc finish*(wstream: AsyncStreamWriter) {.async.} = raise newAsyncStreamIncorrectError("Incorrect stream state") if not isNil(wstream.wsource): - var item = WriteItem(kind: Pointer) - item.size = 0 - item.future = newFuture[void]("async.stream.finish") - await wstream.queue.put(item) - yield item.future - if item.future.failed: - raise newAsyncStreamWriteError(item.future.error) + if isNil(wstream.writerLoop): + await wstream.wsource.finish() + else: + var item = WriteItem(kind: Pointer) + item.size = 0 + item.future = newFuture[void]("async.stream.finish") + await wstream.queue.put(item) + yield item.future + if item.future.failed: + raise newAsyncStreamWriteError(item.future.error) proc join*(rw: AsyncStreamRW): Future[void] = ## Get Future[void] which will be completed when stream become finished or @@ -684,12 +714,12 @@ proc close*(rw: AsyncStreamRW) = untrackAsyncStreamWriter(rw) when rw is AsyncStreamReader: - if isNil(rw.rsource): + if isNil(rw.rsource) or isNil(rw.readerLoop): callSoon(continuation) else: rw.exevent.fire() elif rw is AsyncStreamWriter: - if isNil(rw.wsource): + if isNil(rw.wsource) or isNil(rw.writerLoop): callSoon(continuation) else: rw.exevent.fire() @@ -787,6 +817,27 @@ proc init*[T](child: AsyncStreamWriter, tsource: StreamTransport, trackAsyncStreamWriter(child) child.startWriter() +proc init*(child, wsource: AsyncStreamWriter) = + ## Initialize newly allocated object ``child`` with AsyncStreamWriter + ## parameters. + child.writerLoop = nil + child.wsource = wsource + child.tsource = wsource.tsource + trackAsyncStreamWriter(child) + child.startWriter() + +proc init*[T](child, wsource: AsyncStreamWriter, udata: ref T) = + ## Initialize newly allocated object ``child`` with AsyncStreamWriter + ## parameters. + child.writerLoop = nil + child.wsource = wsource + child.tsource = wsource.tsource + if not isNil(udata): + GC_ref(udata) + child.udata = cast[pointer](udata) + trackAsyncStreamWriter(child) + child.startWriter() + proc init*(child: AsyncStreamReader, tsource: StreamTransport) = ## Initialize newly allocated object ``child`` with AsyncStreamReader ## parameters. @@ -809,6 +860,27 @@ proc init*[T](child: AsyncStreamReader, tsource: StreamTransport, trackAsyncStreamReader(child) child.startReader() +proc init*(child, rsource: AsyncStreamReader) = + ## Initialize newly allocated object ``child`` with AsyncStreamReader + ## parameters. + child.readerLoop = nil + child.rsource = rsource + child.tsource = rsource.tsource + trackAsyncStreamReader(child) + child.startReader() + +proc init*[T](child, rsource: AsyncStreamReader, udata: ref T) = + ## Initialize newly allocated object ``child`` with AsyncStreamReader + ## parameters. + child.readerLoop = nil + child.rsource = rsource + child.tsource = rsource.tsource + if not isNil(udata): + GC_ref(udata) + child.udata = cast[pointer](udata) + trackAsyncStreamReader(child) + child.startReader() + proc newAsyncStreamReader*[T](rsource: AsyncStreamReader, loop: StreamReaderLoop, bufferSize = AsyncStreamDefaultBufferSize, @@ -899,6 +971,34 @@ proc newAsyncStreamWriter*(tsource: StreamTransport): AsyncStreamWriter = result = new AsyncStreamWriter result.init(tsource) +proc newAsyncStreamWriter*[T](wsource: AsyncStreamWriter, + udata: ref T): AsyncStreamWriter = + ## Create copy of AsyncStreamWriter object ``wsource``. + ## + ## ``udata`` - user object which will be associated with new AsyncStreamWriter + ## object. + result = new AsyncStreamWriter + result.init(wsource, udata) + +proc newAsyncStreamWriter*(wsource: AsyncStreamWriter): AsyncStreamWriter = + ## Create copy of AsyncStreamWriter object ``wsource``. + result = new AsyncStreamWriter + result.init(wsource) + +proc newAsyncStreamReader*[T](rsource: AsyncStreamWriter, + udata: ref T): AsyncStreamWriter = + ## Create copy of AsyncStreamReader object ``rsource``. + ## + ## ``udata`` - user object which will be associated with new AsyncStreamReader + ## object. + result = new AsyncStreamReader + result.init(rsource, udata) + +proc newAsyncStreamReader*(rsource: AsyncStreamReader): AsyncStreamReader = + ## Create copy of AsyncStreamReader object ``rsource``. + result = new AsyncStreamReader + result.init(rsource) + proc getUserData*[T](rw: AsyncStreamRW): T {.inline.} = ## Obtain user data associated with AsyncStreamReader or AsyncStreamWriter ## object ``rw``.