Change AsyncStream close procedure from events to cancellation.

This commit is contained in:
cheatfate 2019-10-08 10:28:43 +03:00
parent b0fe8398e8
commit 3c928918a4
No known key found for this signature in database
GPG Key ID: 46ADD633A7201F95
2 changed files with 145 additions and 199 deletions

View File

@ -38,6 +38,7 @@ type
of String:
data3*: string
size*: int
offset*: int
future*: Future[void]
AsyncStreamState* = enum
@ -57,7 +58,6 @@ type
tsource*: StreamTransport
readerLoop*: StreamReaderLoop
state*: AsyncStreamState
exevent*: AsyncEvent
buffer*: AsyncBuffer
udata: pointer
error*: ref Exception
@ -68,7 +68,6 @@ type
tsource*: StreamTransport
writerLoop*: StreamWriterLoop
state*: AsyncStreamState
exevent*: AsyncEvent
queue*: AsyncQueue[WriteItem]
udata: pointer
future: Future[void]
@ -730,17 +729,23 @@ proc close*(rw: AsyncStreamRW) =
untrackAsyncStreamWriter(rw)
when rw is AsyncStreamReader:
if isNil(rw.rsource) or isNil(rw.readerLoop):
if isNil(rw.rsource) or isNil(rw.readerLoop) or isNil(rw.future):
callSoon(continuation)
else:
rw.exevent.fire()
rw.future.addCallback(continuation)
if rw.future.finished():
callSoon(continuation)
else:
rw.future.cancel()
rw.future.addCallback(continuation)
elif rw is AsyncStreamWriter:
if isNil(rw.wsource) or isNil(rw.writerLoop):
if isNil(rw.wsource) or isNil(rw.writerLoop) or isNil(rw.future):
callSoon(continuation)
else:
rw.exevent.fire()
rw.future.addCallback(continuation)
if rw.future.finished():
callSoon(continuation)
else:
rw.future.cancel()
rw.future.addCallback(continuation)
proc closeWait*(rw: AsyncStreamRW): Future[void] =
## Close and frees resources of stream ``rw``.
@ -768,7 +773,6 @@ proc init*(child, wsource: AsyncStreamWriter, loop: StreamWriterLoop,
child.writerLoop = loop
child.wsource = wsource
child.tsource = wsource.tsource
child.exevent = newAsyncEvent()
child.queue = newAsyncQueue[WriteItem](queueSize)
trackAsyncStreamWriter(child)
child.startWriter()
@ -780,7 +784,6 @@ proc init*[T](child, wsource: AsyncStreamWriter, loop: StreamWriterLoop,
child.writerLoop = loop
child.wsource = wsource
child.tsource = wsource.tsource
child.exevent = newAsyncEvent()
child.queue = newAsyncQueue[WriteItem](queueSize)
if not isNil(udata):
GC_ref(udata)
@ -795,7 +798,6 @@ proc init*(child, rsource: AsyncStreamReader, loop: StreamReaderLoop,
child.readerLoop = loop
child.rsource = rsource
child.tsource = rsource.tsource
child.exevent = newAsyncEvent()
child.buffer = AsyncBuffer.init(bufferSize)
trackAsyncStreamReader(child)
child.startReader()
@ -808,7 +810,6 @@ proc init*[T](child, rsource: AsyncStreamReader, loop: StreamReaderLoop,
child.readerLoop = loop
child.rsource = rsource
child.tsource = rsource.tsource
child.exevent = newAsyncEvent()
child.buffer = AsyncBuffer.init(bufferSize)
if not isNil(udata):
GC_ref(udata)

View File

@ -27,21 +27,6 @@ type
proc newProtocolError(): ref Exception {.inline.} =
newException(ChunkedStreamProtocolError, "Protocol error!")
proc oneOf*[A, B](fut1: Future[A], fut2: Future[B]): Future[void] =
## Wait for completion of `fut1` or `fut2`, resulting future do not care about
## error, so you need to check `fut1` and `fut2` for error.
var retFuture = newFuture[void]("chunked.oneOf()")
proc cb(data: pointer) {.gcsafe.} =
if not(retFuture.finished()):
if cast[pointer](fut1) == data:
fut2.removeCallback(cb)
elif cast[pointer](fut2) == data:
fut1.removeCallback(cb)
retFuture.complete()
fut1.callback = cb
fut2.callback = cb
return retFuture
proc getChunkSize(buffer: openarray[byte]): uint64 =
# We using `uint64` representation, but allow only 2^32 chunk size,
# ChunkHeaderSize.
@ -87,197 +72,157 @@ proc setChunkSize(buffer: var openarray[byte], length: int64): int =
proc chunkedReadLoop(stream: AsyncStreamReader) {.async.} =
var rstream = cast[ChunkedStreamReader](stream)
var exitFut = rstream.exevent.wait()
var buffer = newSeq[byte](1024)
rstream.state = AsyncStreamState.Running
while true:
# Reading chunk size
var ruFut1 = rstream.rsource.readUntil(addr buffer[0], 1024, CRLF)
await oneOf(ruFut1, exitFut)
if exitFut.finished():
rstream.state = AsyncStreamState.Stopped
break
if ruFut1.failed():
rstream.error = ruFut1.error
rstream.state = AsyncStreamState.Error
break
let length = ruFut1.read()
var chunksize = getChunkSize(buffer.toOpenArray(0, length - len(CRLF) - 1))
if chunksize == 0xFFFF_FFFF_FFFF_FFFF'u64:
rstream.error = newProtocolError()
rstream.state = AsyncStreamState.Error
break
elif chunksize > 0'u64:
while chunksize > 0'u64:
let toRead = min(int(chunksize), rstream.buffer.bufferLen())
var reFut2 = rstream.rsource.readExactly(rstream.buffer.getBuffer(),
toRead)
await oneOf(reFut2, exitFut)
if exitFut.finished():
rstream.state = AsyncStreamState.Stopped
break
if reFut2.failed():
rstream.error = reFut2.error
rstream.state = AsyncStreamState.Error
break
rstream.buffer.update(toRead)
await oneOf(rstream.buffer.transfer(), exitFut)
if exitFut.finished():
rstream.state = AsyncStreamState.Stopped
break
chunksize = chunksize - uint64(toRead)
if rstream.state != AsyncStreamState.Running:
break
# Reading chunk trailing CRLF
var reFut3 = rstream.rsource.readExactly(addr buffer[0], 2)
await oneOf(reFut3, exitFut)
if exitFut.finished():
rstream.state = AsyncStreamState.Stopped
break
if reFut3.failed():
rstream.error = reFut3.error
try:
while true:
# Reading chunk size
var ruFut1 = awaitne rstream.rsource.readUntil(addr buffer[0], 1024, CRLF)
if ruFut1.failed():
rstream.error = ruFut1.error
rstream.state = AsyncStreamState.Error
break
if buffer[0] != CRLF[0] or buffer[1] != CRLF[1]:
let length = ruFut1.read()
var chunksize = getChunkSize(buffer.toOpenArray(0,
length - len(CRLF) - 1))
if chunksize == 0xFFFF_FFFF_FFFF_FFFF'u64:
rstream.error = newProtocolError()
rstream.state = AsyncStreamState.Error
break
else:
# Reading trailing line for last chunk
var ruFut4 = rstream.rsource.readUntil(addr buffer[0], len(buffer), CRLF)
await oneOf(ruFut4, exitFut)
elif chunksize > 0'u64:
while chunksize > 0'u64:
let toRead = min(int(chunksize), rstream.buffer.bufferLen())
var reFut2 = awaitne rstream.rsource.readExactly(
rstream.buffer.getBuffer(), toRead)
if reFut2.failed():
rstream.error = reFut2.error
rstream.state = AsyncStreamState.Error
break
if exitFut.finished():
rstream.state = AsyncStreamState.Stopped
rstream.buffer.update(toRead)
await rstream.buffer.transfer()
chunksize = chunksize - uint64(toRead)
if rstream.state != AsyncStreamState.Running:
break
# Reading chunk trailing CRLF
var reFut3 = awaitne rstream.rsource.readExactly(addr buffer[0], 2)
if reFut3.failed():
rstream.error = reFut3.error
rstream.state = AsyncStreamState.Error
break
if buffer[0] != CRLF[0] or buffer[1] != CRLF[1]:
rstream.error = newProtocolError()
rstream.state = AsyncStreamState.Error
break
else:
# Reading trailing line for last chunk
var ruFut4 = awaitne rstream.rsource.readUntil(addr buffer[0],
len(buffer), CRLF)
if ruFut4.failed():
rstream.error = ruFut4.error
rstream.state = AsyncStreamState.Error
break
rstream.state = AsyncStreamState.Finished
await rstream.buffer.transfer()
break
if ruFut4.failed():
rstream.error = ruFut4.error
rstream.state = AsyncStreamState.Error
break
rstream.state = AsyncStreamState.Finished
await oneOf(rstream.buffer.transfer(), exitFut)
if exitFut.finished():
rstream.state = AsyncStreamState.Stopped
break
if rstream.state in {AsyncStreamState.Stopped, AsyncStreamState.Error}:
# We need to notify consumer about error/close, but we do not care about
# incoming data anymore.
rstream.buffer.forget()
except CancelledError:
rstream.state = AsyncStreamState.Stopped
finally:
if rstream.state in {AsyncStreamState.Stopped, AsyncStreamState.Error}:
# We need to notify consumer about error/close, but we do not care about
# incoming data anymore.
rstream.buffer.forget()
proc chunkedWriteLoop(stream: AsyncStreamWriter) {.async.} =
var wstream = cast[ChunkedStreamWriter](stream)
var exitFut = wstream.exevent.wait()
var buffer: array[16, byte]
var wFut1, wFut2: Future[void]
var error: ref Exception
wstream.state = AsyncStreamState.Running
while true:
# Getting new item from stream's queue.
var getFut = wstream.queue.get()
await oneOf(getFut, exitFut)
if exitFut.finished():
wstream.state = AsyncStreamState.Stopped
break
var item = getFut.read()
# `item.size == 0` is marker of stream finish, while `item.size != 0` is
# data's marker.
if item.size > 0:
let length = setChunkSize(buffer, int64(item.size))
# Writing chunk header <length>CRLF.
wFut1 = wstream.wsource.write(addr buffer[0], length)
await oneOf(wFut1, exitFut)
if exitFut.finished():
wstream.state = AsyncStreamState.Stopped
try:
while true:
# Getting new item from stream's queue.
var item = await wstream.queue.get()
# `item.size == 0` is marker of stream finish, while `item.size != 0` is
# data's marker.
if item.size > 0:
let length = setChunkSize(buffer, int64(item.size))
# Writing chunk header <length>CRLF.
wFut1 = awaitne wstream.wsource.write(addr buffer[0], length)
if wFut1.failed():
error = wFut1.error
item.future.fail(error)
continue
# Writing chunk data.
if item.kind == Pointer:
wFut2 = awaitne wstream.wsource.write(item.data1, item.size)
elif item.kind == Sequence:
wFut2 = awaitne wstream.wsource.write(addr item.data2[0], item.size)
elif item.kind == String:
wFut2 = awaitne wstream.wsource.write(addr item.data3[0], item.size)
if wFut2.failed():
error = wFut2.error
item.future.fail(error)
continue
# Writing chunk footer CRLF.
var wFut3 = awaitne wstream.wsource.write(CRLF)
if wFut3.failed():
error = wFut3.error
item.future.fail(error)
continue
# Everything is fine, completing queue item's future.
item.future.complete()
else:
let length = setChunkSize(buffer, 0'i64)
# Write finish chunk `0`.
wFut1 = awaitne wstream.wsource.write(addr buffer[0], length)
if wFut1.failed():
error = wFut1.error
item.future.fail(error)
# We break here, because this is last chunk
break
# Write trailing CRLF.
wFut2 = awaitne wstream.wsource.write(CRLF)
if wFut2.failed():
error = wFut2.error
item.future.fail(error)
# We break here, because this is last chunk
break
# Everything is fine, completing queue item's future.
item.future.complete()
# Set stream state to Finished.
wstream.state = AsyncStreamState.Finished
break
if wFut1.failed():
item.future.fail(wFut1.error)
continue
# Writing chunk data.
if item.kind == Pointer:
wFut2 = wstream.wsource.write(item.data1, item.size)
elif item.kind == Sequence:
wFut2 = wstream.wsource.write(addr item.data2[0], item.size)
elif item.kind == String:
wFut2 = wstream.wsource.write(addr item.data3[0], item.size)
await oneOf(wFut2, exitFut)
if exitFut.finished():
wstream.state = AsyncStreamState.Stopped
break
if wFut2.failed():
item.future.fail(wFut2.error)
continue
# Writing chunk footer CRLF.
var wFut3 = wstream.wsource.write(CRLF)
await oneOf(wFut3, exitFut)
if exitFut.finished():
wstream.state = AsyncStreamState.Stopped
break
if wFut3.failed():
item.future.fail(wFut3.error)
continue
# Everything is fine, completing queue item's future.
item.future.complete()
else:
let length = setChunkSize(buffer, 0'i64)
# Write finish chunk `0`.
wFut1 = wstream.wsource.write(addr buffer[0], length)
await oneOf(wFut1, exitFut)
if exitFut.finished():
wstream.state = AsyncStreamState.Stopped
break
if wFut1.failed():
item.future.fail(wFut1.error)
# We break here, because this is last chunk
break
# Write trailing CRLF.
wFut2 = wstream.wsource.write(CRLF)
await oneOf(wFut2, exitFut)
if exitFut.finished():
wstream.state = AsyncStreamState.Stopped
break
if wFut2.failed():
item.future.fail(wFut2.error)
# We break here, because this is last chunk
break
# Everything is fine, completing queue item's future.
item.future.complete()
# Set stream state to Finished.
wstream.state = AsyncStreamState.Finished
break
except CancelledError:
wstream.state = AsyncStreamState.Stopped
finally:
if wstream.state == AsyncStreamState.Stopped:
while len(wstream.queue) > 0:
let item = wstream.queue.popFirstNoWait()
if not(item.future.finished()):
item.future.complete()
elif wstream.state == AsyncStreamState.Error:
while len(wstream.queue) > 0:
let item = wstream.queue.popFirstNoWait()
if not(item.future.finished()):
if not isNil(error):
item.future.fail(error)
proc init*[T](child: ChunkedStreamReader, rsource: AsyncStreamReader,
bufferSize = ChunkBufferSize, udata: ref T) =