diff --git a/chronos/streams/asyncstream.nim b/chronos/streams/asyncstream.nim index dbf7220..e19b799 100644 --- a/chronos/streams/asyncstream.nim +++ b/chronos/streams/asyncstream.nim @@ -567,11 +567,12 @@ proc write*(wstream: AsyncStreamWriter, pbytes: pointer, raise newAsyncStreamIncorrectError("Zero length message") if isNil(wstream.wsource): - var resFut = write(wstream.tsource, pbytes, nbytes) - yield resFut - if resFut.failed(): - raise newAsyncStreamWriteError(resFut.error) - if resFut.read() != nbytes: + var res: int + try: + res = await write(wstream.tsource, pbytes, nbytes) + except: + raise newAsyncStreamWriteError(getCurrentException()) + if res != nbytes: raise newAsyncStreamIncompleteError() else: if isNil(wstream.writerLoop): @@ -582,8 +583,9 @@ proc write*(wstream: AsyncStreamWriter, pbytes: pointer, item.size = nbytes item.future = newFuture[void]("async.stream.write(pointer)") await wstream.queue.put(item) - yield item.future - if item.future.failed(): + try: + await item.future + except: raise newAsyncStreamWriteError(item.future.error) proc write*(wstream: AsyncStreamWriter, sbytes: seq[byte], @@ -604,11 +606,12 @@ proc write*(wstream: AsyncStreamWriter, sbytes: seq[byte], raise newAsyncStreamIncorrectError("Zero length message") if isNil(wstream.wsource): - var resFut = write(wstream.tsource, sbytes, msglen) - yield resFut - if resFut.failed(): - raise newAsyncStreamWriteError(resFut.error) - if resFut.read() != length: + var res: int + try: + res = await write(wstream.tsource, sbytes, msglen) + except: + raise newAsyncStreamWriteError(getCurrentException()) + if res != length: raise newAsyncStreamIncompleteError() else: if isNil(wstream.writerLoop): @@ -622,8 +625,9 @@ proc write*(wstream: AsyncStreamWriter, sbytes: seq[byte], item.size = length item.future = newFuture[void]("async.stream.write(seq)") await wstream.queue.put(item) - yield item.future - if item.future.failed(): + try: + await item.future + except: raise newAsyncStreamWriteError(item.future.error) proc write*(wstream: AsyncStreamWriter, sbytes: string, @@ -643,11 +647,12 @@ proc write*(wstream: AsyncStreamWriter, sbytes: string, raise newAsyncStreamIncorrectError("Zero length message") if isNil(wstream.wsource): - var resFut = write(wstream.tsource, sbytes, msglen) - yield resFut - if resFut.failed(): - raise newAsyncStreamWriteError(resFut.error) - if resFut.read() != length: + var res: int + try: + res = await write(wstream.tsource, sbytes, msglen) + except: + raise newAsyncStreamWriteError(getCurrentException()) + if res != length: raise newAsyncStreamIncompleteError() else: if isNil(wstream.writerLoop): @@ -661,8 +666,9 @@ proc write*(wstream: AsyncStreamWriter, sbytes: string, item.size = length item.future = newFuture[void]("async.stream.write(string)") await wstream.queue.put(item) - yield item.future - if item.future.failed(): + try: + await item.future + except: raise newAsyncStreamWriteError(item.future.error) proc finish*(wstream: AsyncStreamWriter) {.async.} = @@ -678,8 +684,9 @@ proc finish*(wstream: AsyncStreamWriter) {.async.} = item.size = 0 item.future = newFuture[void]("async.stream.finish") await wstream.queue.put(item) - yield item.future - if item.future.failed(): + try: + await item.future + except: raise newAsyncStreamWriteError(item.future.error) proc join*(rw: AsyncStreamRW): Future[void] =