make writeHandler async

This commit is contained in:
Dmitriy Ryajov 2019-09-02 14:46:22 -06:00
parent d63e0c003b
commit ad1eaffdd6

View File

@ -35,14 +35,14 @@ import chronos
import ../stream/lpstream import ../stream/lpstream
type type
WriteHandler* = proc (data: seq[byte]) {.gcsafe.} # TODO: figure out how to make this generic to avoid casts WriteHandler* = proc (data: seq[byte]): Future[void] {.gcsafe.} # TODO: figure out how to make this generic to avoid casts
BufferStream* = ref object of LPStream BufferStream* = ref object of LPStream
maxSize*: int maxSize*: int
readBuf: Deque[byte] # a deque is based on a ring buffer readBuf: Deque[byte] # a deque is based on a ring buffer
readReqs: Deque[Future[int]] # use dequeue to fire reads in order readReqs: Deque[Future[int]] # use dequeue to fire reads in order
dataReadEvent: AsyncEvent dataReadEvent: AsyncEvent
writeHandler: WriteHandler writeHandler*: WriteHandler
proc requestReadBytes(s: BufferStream): Future[int] = proc requestReadBytes(s: BufferStream): Future[int] =
## create a future that will complete when more ## create a future that will complete when more
@ -236,7 +236,7 @@ method write*(s: BufferStream,
## Return number of bytes actually consumed (discarded). ## Return number of bytes actually consumed (discarded).
var buf: seq[byte] = newSeq[byte](nbytes) var buf: seq[byte] = newSeq[byte](nbytes)
copyMem(addr buf[0], pbytes, nbytes) copyMem(addr buf[0], pbytes, nbytes)
s.writeHandler(buf) result = s.writeHandler(buf)
method write*(s: BufferStream, method write*(s: BufferStream,
msg: string, msg: string,
@ -251,7 +251,7 @@ method write*(s: BufferStream,
## stream. ## stream.
var buf = "" var buf = ""
shallowCopy(buf, if msglen > 0: msg[0..<msglen] else: msg) shallowCopy(buf, if msglen > 0: msg[0..<msglen] else: msg)
s.writeHandler(cast[seq[byte]](toSeq(buf.items))) result = s.writeHandler(cast[seq[byte]](toSeq(buf.items)))
method write*(s: BufferStream, method write*(s: BufferStream,
msg: seq[byte], msg: seq[byte],
@ -267,7 +267,7 @@ method write*(s: BufferStream,
## stream. ## stream.
var buf: seq[byte] var buf: seq[byte]
shallowCopy(buf, if msglen > 0: msg[0..<msglen] else: msg) shallowCopy(buf, if msglen > 0: msg[0..<msglen] else: msg)
s.writeHandler(buf) result = s.writeHandler(buf)
method close*(s: BufferStream) {.async, gcsafe.} = method close*(s: BufferStream) {.async, gcsafe.} =
## close the stream and clear the buffer ## close the stream and clear the buffer