mirror of
https://github.com/status-im/nim-chronos.git
synced 2025-01-31 13:35:11 +00:00
Merge pull request #54 from status-im/astreamcancel
Change AsyncStream close procedure from events to cancellation.
This commit is contained in:
commit
ae128b0f65
@ -38,6 +38,7 @@ type
|
|||||||
of String:
|
of String:
|
||||||
data3*: string
|
data3*: string
|
||||||
size*: int
|
size*: int
|
||||||
|
offset*: int
|
||||||
future*: Future[void]
|
future*: Future[void]
|
||||||
|
|
||||||
AsyncStreamState* = enum
|
AsyncStreamState* = enum
|
||||||
@ -57,7 +58,6 @@ type
|
|||||||
tsource*: StreamTransport
|
tsource*: StreamTransport
|
||||||
readerLoop*: StreamReaderLoop
|
readerLoop*: StreamReaderLoop
|
||||||
state*: AsyncStreamState
|
state*: AsyncStreamState
|
||||||
exevent*: AsyncEvent
|
|
||||||
buffer*: AsyncBuffer
|
buffer*: AsyncBuffer
|
||||||
udata: pointer
|
udata: pointer
|
||||||
error*: ref Exception
|
error*: ref Exception
|
||||||
@ -68,7 +68,6 @@ type
|
|||||||
tsource*: StreamTransport
|
tsource*: StreamTransport
|
||||||
writerLoop*: StreamWriterLoop
|
writerLoop*: StreamWriterLoop
|
||||||
state*: AsyncStreamState
|
state*: AsyncStreamState
|
||||||
exevent*: AsyncEvent
|
|
||||||
queue*: AsyncQueue[WriteItem]
|
queue*: AsyncQueue[WriteItem]
|
||||||
udata: pointer
|
udata: pointer
|
||||||
future: Future[void]
|
future: Future[void]
|
||||||
@ -730,17 +729,23 @@ proc close*(rw: AsyncStreamRW) =
|
|||||||
untrackAsyncStreamWriter(rw)
|
untrackAsyncStreamWriter(rw)
|
||||||
|
|
||||||
when rw is AsyncStreamReader:
|
when rw is AsyncStreamReader:
|
||||||
if isNil(rw.rsource) or isNil(rw.readerLoop):
|
if isNil(rw.rsource) or isNil(rw.readerLoop) or isNil(rw.future):
|
||||||
callSoon(continuation)
|
callSoon(continuation)
|
||||||
else:
|
else:
|
||||||
rw.exevent.fire()
|
if rw.future.finished():
|
||||||
rw.future.addCallback(continuation)
|
callSoon(continuation)
|
||||||
|
else:
|
||||||
|
rw.future.cancel()
|
||||||
|
rw.future.addCallback(continuation)
|
||||||
elif rw is AsyncStreamWriter:
|
elif rw is AsyncStreamWriter:
|
||||||
if isNil(rw.wsource) or isNil(rw.writerLoop):
|
if isNil(rw.wsource) or isNil(rw.writerLoop) or isNil(rw.future):
|
||||||
callSoon(continuation)
|
callSoon(continuation)
|
||||||
else:
|
else:
|
||||||
rw.exevent.fire()
|
if rw.future.finished():
|
||||||
rw.future.addCallback(continuation)
|
callSoon(continuation)
|
||||||
|
else:
|
||||||
|
rw.future.cancel()
|
||||||
|
rw.future.addCallback(continuation)
|
||||||
|
|
||||||
proc closeWait*(rw: AsyncStreamRW): Future[void] =
|
proc closeWait*(rw: AsyncStreamRW): Future[void] =
|
||||||
## Close and frees resources of stream ``rw``.
|
## Close and frees resources of stream ``rw``.
|
||||||
@ -768,7 +773,6 @@ proc init*(child, wsource: AsyncStreamWriter, loop: StreamWriterLoop,
|
|||||||
child.writerLoop = loop
|
child.writerLoop = loop
|
||||||
child.wsource = wsource
|
child.wsource = wsource
|
||||||
child.tsource = wsource.tsource
|
child.tsource = wsource.tsource
|
||||||
child.exevent = newAsyncEvent()
|
|
||||||
child.queue = newAsyncQueue[WriteItem](queueSize)
|
child.queue = newAsyncQueue[WriteItem](queueSize)
|
||||||
trackAsyncStreamWriter(child)
|
trackAsyncStreamWriter(child)
|
||||||
child.startWriter()
|
child.startWriter()
|
||||||
@ -780,7 +784,6 @@ proc init*[T](child, wsource: AsyncStreamWriter, loop: StreamWriterLoop,
|
|||||||
child.writerLoop = loop
|
child.writerLoop = loop
|
||||||
child.wsource = wsource
|
child.wsource = wsource
|
||||||
child.tsource = wsource.tsource
|
child.tsource = wsource.tsource
|
||||||
child.exevent = newAsyncEvent()
|
|
||||||
child.queue = newAsyncQueue[WriteItem](queueSize)
|
child.queue = newAsyncQueue[WriteItem](queueSize)
|
||||||
if not isNil(udata):
|
if not isNil(udata):
|
||||||
GC_ref(udata)
|
GC_ref(udata)
|
||||||
@ -795,7 +798,6 @@ proc init*(child, rsource: AsyncStreamReader, loop: StreamReaderLoop,
|
|||||||
child.readerLoop = loop
|
child.readerLoop = loop
|
||||||
child.rsource = rsource
|
child.rsource = rsource
|
||||||
child.tsource = rsource.tsource
|
child.tsource = rsource.tsource
|
||||||
child.exevent = newAsyncEvent()
|
|
||||||
child.buffer = AsyncBuffer.init(bufferSize)
|
child.buffer = AsyncBuffer.init(bufferSize)
|
||||||
trackAsyncStreamReader(child)
|
trackAsyncStreamReader(child)
|
||||||
child.startReader()
|
child.startReader()
|
||||||
@ -808,7 +810,6 @@ proc init*[T](child, rsource: AsyncStreamReader, loop: StreamReaderLoop,
|
|||||||
child.readerLoop = loop
|
child.readerLoop = loop
|
||||||
child.rsource = rsource
|
child.rsource = rsource
|
||||||
child.tsource = rsource.tsource
|
child.tsource = rsource.tsource
|
||||||
child.exevent = newAsyncEvent()
|
|
||||||
child.buffer = AsyncBuffer.init(bufferSize)
|
child.buffer = AsyncBuffer.init(bufferSize)
|
||||||
if not isNil(udata):
|
if not isNil(udata):
|
||||||
GC_ref(udata)
|
GC_ref(udata)
|
||||||
|
@ -27,21 +27,6 @@ type
|
|||||||
proc newProtocolError(): ref Exception {.inline.} =
|
proc newProtocolError(): ref Exception {.inline.} =
|
||||||
newException(ChunkedStreamProtocolError, "Protocol error!")
|
newException(ChunkedStreamProtocolError, "Protocol error!")
|
||||||
|
|
||||||
proc oneOf*[A, B](fut1: Future[A], fut2: Future[B]): Future[void] =
|
|
||||||
## Wait for completion of `fut1` or `fut2`, resulting future do not care about
|
|
||||||
## error, so you need to check `fut1` and `fut2` for error.
|
|
||||||
var retFuture = newFuture[void]("chunked.oneOf()")
|
|
||||||
proc cb(data: pointer) {.gcsafe.} =
|
|
||||||
if not(retFuture.finished()):
|
|
||||||
if cast[pointer](fut1) == data:
|
|
||||||
fut2.removeCallback(cb)
|
|
||||||
elif cast[pointer](fut2) == data:
|
|
||||||
fut1.removeCallback(cb)
|
|
||||||
retFuture.complete()
|
|
||||||
fut1.callback = cb
|
|
||||||
fut2.callback = cb
|
|
||||||
return retFuture
|
|
||||||
|
|
||||||
proc getChunkSize(buffer: openarray[byte]): uint64 =
|
proc getChunkSize(buffer: openarray[byte]): uint64 =
|
||||||
# We using `uint64` representation, but allow only 2^32 chunk size,
|
# We using `uint64` representation, but allow only 2^32 chunk size,
|
||||||
# ChunkHeaderSize.
|
# ChunkHeaderSize.
|
||||||
@ -87,197 +72,157 @@ proc setChunkSize(buffer: var openarray[byte], length: int64): int =
|
|||||||
|
|
||||||
proc chunkedReadLoop(stream: AsyncStreamReader) {.async.} =
|
proc chunkedReadLoop(stream: AsyncStreamReader) {.async.} =
|
||||||
var rstream = cast[ChunkedStreamReader](stream)
|
var rstream = cast[ChunkedStreamReader](stream)
|
||||||
var exitFut = rstream.exevent.wait()
|
|
||||||
var buffer = newSeq[byte](1024)
|
var buffer = newSeq[byte](1024)
|
||||||
|
rstream.state = AsyncStreamState.Running
|
||||||
|
|
||||||
while true:
|
try:
|
||||||
# Reading chunk size
|
while true:
|
||||||
var ruFut1 = rstream.rsource.readUntil(addr buffer[0], 1024, CRLF)
|
# Reading chunk size
|
||||||
await oneOf(ruFut1, exitFut)
|
var ruFut1 = awaitne rstream.rsource.readUntil(addr buffer[0], 1024, CRLF)
|
||||||
|
if ruFut1.failed():
|
||||||
if exitFut.finished():
|
rstream.error = ruFut1.error
|
||||||
rstream.state = AsyncStreamState.Stopped
|
|
||||||
break
|
|
||||||
|
|
||||||
if ruFut1.failed():
|
|
||||||
rstream.error = ruFut1.error
|
|
||||||
rstream.state = AsyncStreamState.Error
|
|
||||||
break
|
|
||||||
|
|
||||||
let length = ruFut1.read()
|
|
||||||
var chunksize = getChunkSize(buffer.toOpenArray(0, length - len(CRLF) - 1))
|
|
||||||
|
|
||||||
if chunksize == 0xFFFF_FFFF_FFFF_FFFF'u64:
|
|
||||||
rstream.error = newProtocolError()
|
|
||||||
rstream.state = AsyncStreamState.Error
|
|
||||||
break
|
|
||||||
elif chunksize > 0'u64:
|
|
||||||
while chunksize > 0'u64:
|
|
||||||
let toRead = min(int(chunksize), rstream.buffer.bufferLen())
|
|
||||||
var reFut2 = rstream.rsource.readExactly(rstream.buffer.getBuffer(),
|
|
||||||
toRead)
|
|
||||||
await oneOf(reFut2, exitFut)
|
|
||||||
|
|
||||||
if exitFut.finished():
|
|
||||||
rstream.state = AsyncStreamState.Stopped
|
|
||||||
break
|
|
||||||
|
|
||||||
if reFut2.failed():
|
|
||||||
rstream.error = reFut2.error
|
|
||||||
rstream.state = AsyncStreamState.Error
|
|
||||||
break
|
|
||||||
|
|
||||||
rstream.buffer.update(toRead)
|
|
||||||
|
|
||||||
await oneOf(rstream.buffer.transfer(), exitFut)
|
|
||||||
|
|
||||||
if exitFut.finished():
|
|
||||||
rstream.state = AsyncStreamState.Stopped
|
|
||||||
break
|
|
||||||
|
|
||||||
chunksize = chunksize - uint64(toRead)
|
|
||||||
|
|
||||||
if rstream.state != AsyncStreamState.Running:
|
|
||||||
break
|
|
||||||
|
|
||||||
# Reading chunk trailing CRLF
|
|
||||||
var reFut3 = rstream.rsource.readExactly(addr buffer[0], 2)
|
|
||||||
await oneOf(reFut3, exitFut)
|
|
||||||
if exitFut.finished():
|
|
||||||
rstream.state = AsyncStreamState.Stopped
|
|
||||||
break
|
|
||||||
|
|
||||||
if reFut3.failed():
|
|
||||||
rstream.error = reFut3.error
|
|
||||||
rstream.state = AsyncStreamState.Error
|
rstream.state = AsyncStreamState.Error
|
||||||
break
|
break
|
||||||
|
|
||||||
if buffer[0] != CRLF[0] or buffer[1] != CRLF[1]:
|
let length = ruFut1.read()
|
||||||
|
var chunksize = getChunkSize(buffer.toOpenArray(0,
|
||||||
|
length - len(CRLF) - 1))
|
||||||
|
|
||||||
|
if chunksize == 0xFFFF_FFFF_FFFF_FFFF'u64:
|
||||||
rstream.error = newProtocolError()
|
rstream.error = newProtocolError()
|
||||||
rstream.state = AsyncStreamState.Error
|
rstream.state = AsyncStreamState.Error
|
||||||
break
|
break
|
||||||
else:
|
elif chunksize > 0'u64:
|
||||||
# Reading trailing line for last chunk
|
while chunksize > 0'u64:
|
||||||
var ruFut4 = rstream.rsource.readUntil(addr buffer[0], len(buffer), CRLF)
|
let toRead = min(int(chunksize), rstream.buffer.bufferLen())
|
||||||
await oneOf(ruFut4, exitFut)
|
var reFut2 = awaitne rstream.rsource.readExactly(
|
||||||
|
rstream.buffer.getBuffer(), toRead)
|
||||||
|
if reFut2.failed():
|
||||||
|
rstream.error = reFut2.error
|
||||||
|
rstream.state = AsyncStreamState.Error
|
||||||
|
break
|
||||||
|
|
||||||
if exitFut.finished():
|
rstream.buffer.update(toRead)
|
||||||
rstream.state = AsyncStreamState.Stopped
|
await rstream.buffer.transfer()
|
||||||
|
chunksize = chunksize - uint64(toRead)
|
||||||
|
|
||||||
|
if rstream.state != AsyncStreamState.Running:
|
||||||
|
break
|
||||||
|
|
||||||
|
# Reading chunk trailing CRLF
|
||||||
|
var reFut3 = awaitne rstream.rsource.readExactly(addr buffer[0], 2)
|
||||||
|
if reFut3.failed():
|
||||||
|
rstream.error = reFut3.error
|
||||||
|
rstream.state = AsyncStreamState.Error
|
||||||
|
break
|
||||||
|
|
||||||
|
if buffer[0] != CRLF[0] or buffer[1] != CRLF[1]:
|
||||||
|
rstream.error = newProtocolError()
|
||||||
|
rstream.state = AsyncStreamState.Error
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
# Reading trailing line for last chunk
|
||||||
|
var ruFut4 = awaitne rstream.rsource.readUntil(addr buffer[0],
|
||||||
|
len(buffer), CRLF)
|
||||||
|
if ruFut4.failed():
|
||||||
|
rstream.error = ruFut4.error
|
||||||
|
rstream.state = AsyncStreamState.Error
|
||||||
|
break
|
||||||
|
|
||||||
|
rstream.state = AsyncStreamState.Finished
|
||||||
|
await rstream.buffer.transfer()
|
||||||
break
|
break
|
||||||
|
|
||||||
if ruFut4.failed():
|
except CancelledError:
|
||||||
rstream.error = ruFut4.error
|
rstream.state = AsyncStreamState.Stopped
|
||||||
rstream.state = AsyncStreamState.Error
|
finally:
|
||||||
break
|
if rstream.state in {AsyncStreamState.Stopped, AsyncStreamState.Error}:
|
||||||
|
# We need to notify consumer about error/close, but we do not care about
|
||||||
rstream.state = AsyncStreamState.Finished
|
# incoming data anymore.
|
||||||
|
rstream.buffer.forget()
|
||||||
await oneOf(rstream.buffer.transfer(), exitFut)
|
|
||||||
|
|
||||||
if exitFut.finished():
|
|
||||||
rstream.state = AsyncStreamState.Stopped
|
|
||||||
break
|
|
||||||
|
|
||||||
if rstream.state in {AsyncStreamState.Stopped, AsyncStreamState.Error}:
|
|
||||||
# We need to notify consumer about error/close, but we do not care about
|
|
||||||
# incoming data anymore.
|
|
||||||
rstream.buffer.forget()
|
|
||||||
|
|
||||||
proc chunkedWriteLoop(stream: AsyncStreamWriter) {.async.} =
|
proc chunkedWriteLoop(stream: AsyncStreamWriter) {.async.} =
|
||||||
var wstream = cast[ChunkedStreamWriter](stream)
|
var wstream = cast[ChunkedStreamWriter](stream)
|
||||||
var exitFut = wstream.exevent.wait()
|
|
||||||
var buffer: array[16, byte]
|
var buffer: array[16, byte]
|
||||||
var wFut1, wFut2: Future[void]
|
var wFut1, wFut2: Future[void]
|
||||||
|
var error: ref Exception
|
||||||
wstream.state = AsyncStreamState.Running
|
wstream.state = AsyncStreamState.Running
|
||||||
while true:
|
|
||||||
# Getting new item from stream's queue.
|
|
||||||
var getFut = wstream.queue.get()
|
|
||||||
await oneOf(getFut, exitFut)
|
|
||||||
if exitFut.finished():
|
|
||||||
wstream.state = AsyncStreamState.Stopped
|
|
||||||
break
|
|
||||||
var item = getFut.read()
|
|
||||||
# `item.size == 0` is marker of stream finish, while `item.size != 0` is
|
|
||||||
# data's marker.
|
|
||||||
if item.size > 0:
|
|
||||||
let length = setChunkSize(buffer, int64(item.size))
|
|
||||||
# Writing chunk header <length>CRLF.
|
|
||||||
wFut1 = wstream.wsource.write(addr buffer[0], length)
|
|
||||||
await oneOf(wFut1, exitFut)
|
|
||||||
|
|
||||||
if exitFut.finished():
|
try:
|
||||||
wstream.state = AsyncStreamState.Stopped
|
while true:
|
||||||
|
# Getting new item from stream's queue.
|
||||||
|
var item = await wstream.queue.get()
|
||||||
|
# `item.size == 0` is marker of stream finish, while `item.size != 0` is
|
||||||
|
# data's marker.
|
||||||
|
if item.size > 0:
|
||||||
|
let length = setChunkSize(buffer, int64(item.size))
|
||||||
|
# Writing chunk header <length>CRLF.
|
||||||
|
wFut1 = awaitne wstream.wsource.write(addr buffer[0], length)
|
||||||
|
if wFut1.failed():
|
||||||
|
error = wFut1.error
|
||||||
|
item.future.fail(error)
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Writing chunk data.
|
||||||
|
if item.kind == Pointer:
|
||||||
|
wFut2 = awaitne wstream.wsource.write(item.data1, item.size)
|
||||||
|
elif item.kind == Sequence:
|
||||||
|
wFut2 = awaitne wstream.wsource.write(addr item.data2[0], item.size)
|
||||||
|
elif item.kind == String:
|
||||||
|
wFut2 = awaitne wstream.wsource.write(addr item.data3[0], item.size)
|
||||||
|
if wFut2.failed():
|
||||||
|
error = wFut2.error
|
||||||
|
item.future.fail(error)
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Writing chunk footer CRLF.
|
||||||
|
var wFut3 = awaitne wstream.wsource.write(CRLF)
|
||||||
|
if wFut3.failed():
|
||||||
|
error = wFut3.error
|
||||||
|
item.future.fail(error)
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Everything is fine, completing queue item's future.
|
||||||
|
item.future.complete()
|
||||||
|
else:
|
||||||
|
let length = setChunkSize(buffer, 0'i64)
|
||||||
|
|
||||||
|
# Write finish chunk `0`.
|
||||||
|
wFut1 = awaitne wstream.wsource.write(addr buffer[0], length)
|
||||||
|
if wFut1.failed():
|
||||||
|
error = wFut1.error
|
||||||
|
item.future.fail(error)
|
||||||
|
# We break here, because this is last chunk
|
||||||
|
break
|
||||||
|
|
||||||
|
# Write trailing CRLF.
|
||||||
|
wFut2 = awaitne wstream.wsource.write(CRLF)
|
||||||
|
if wFut2.failed():
|
||||||
|
error = wFut2.error
|
||||||
|
item.future.fail(error)
|
||||||
|
# We break here, because this is last chunk
|
||||||
|
break
|
||||||
|
|
||||||
|
# Everything is fine, completing queue item's future.
|
||||||
|
item.future.complete()
|
||||||
|
|
||||||
|
# Set stream state to Finished.
|
||||||
|
wstream.state = AsyncStreamState.Finished
|
||||||
break
|
break
|
||||||
|
except CancelledError:
|
||||||
if wFut1.failed():
|
wstream.state = AsyncStreamState.Stopped
|
||||||
item.future.fail(wFut1.error)
|
finally:
|
||||||
continue
|
if wstream.state == AsyncStreamState.Stopped:
|
||||||
|
while len(wstream.queue) > 0:
|
||||||
# Writing chunk data.
|
let item = wstream.queue.popFirstNoWait()
|
||||||
if item.kind == Pointer:
|
if not(item.future.finished()):
|
||||||
wFut2 = wstream.wsource.write(item.data1, item.size)
|
item.future.complete()
|
||||||
elif item.kind == Sequence:
|
elif wstream.state == AsyncStreamState.Error:
|
||||||
wFut2 = wstream.wsource.write(addr item.data2[0], item.size)
|
while len(wstream.queue) > 0:
|
||||||
elif item.kind == String:
|
let item = wstream.queue.popFirstNoWait()
|
||||||
wFut2 = wstream.wsource.write(addr item.data3[0], item.size)
|
if not(item.future.finished()):
|
||||||
|
if not isNil(error):
|
||||||
await oneOf(wFut2, exitFut)
|
item.future.fail(error)
|
||||||
if exitFut.finished():
|
|
||||||
wstream.state = AsyncStreamState.Stopped
|
|
||||||
break
|
|
||||||
|
|
||||||
if wFut2.failed():
|
|
||||||
item.future.fail(wFut2.error)
|
|
||||||
continue
|
|
||||||
|
|
||||||
# Writing chunk footer CRLF.
|
|
||||||
var wFut3 = wstream.wsource.write(CRLF)
|
|
||||||
await oneOf(wFut3, exitFut)
|
|
||||||
if exitFut.finished():
|
|
||||||
wstream.state = AsyncStreamState.Stopped
|
|
||||||
break
|
|
||||||
|
|
||||||
if wFut3.failed():
|
|
||||||
item.future.fail(wFut3.error)
|
|
||||||
continue
|
|
||||||
|
|
||||||
# Everything is fine, completing queue item's future.
|
|
||||||
item.future.complete()
|
|
||||||
else:
|
|
||||||
let length = setChunkSize(buffer, 0'i64)
|
|
||||||
|
|
||||||
# Write finish chunk `0`.
|
|
||||||
wFut1 = wstream.wsource.write(addr buffer[0], length)
|
|
||||||
await oneOf(wFut1, exitFut)
|
|
||||||
if exitFut.finished():
|
|
||||||
wstream.state = AsyncStreamState.Stopped
|
|
||||||
break
|
|
||||||
|
|
||||||
if wFut1.failed():
|
|
||||||
item.future.fail(wFut1.error)
|
|
||||||
# We break here, because this is last chunk
|
|
||||||
break
|
|
||||||
|
|
||||||
# Write trailing CRLF.
|
|
||||||
wFut2 = wstream.wsource.write(CRLF)
|
|
||||||
await oneOf(wFut2, exitFut)
|
|
||||||
|
|
||||||
if exitFut.finished():
|
|
||||||
wstream.state = AsyncStreamState.Stopped
|
|
||||||
break
|
|
||||||
|
|
||||||
if wFut2.failed():
|
|
||||||
item.future.fail(wFut2.error)
|
|
||||||
# We break here, because this is last chunk
|
|
||||||
break
|
|
||||||
|
|
||||||
# Everything is fine, completing queue item's future.
|
|
||||||
item.future.complete()
|
|
||||||
|
|
||||||
# Set stream state to Finished.
|
|
||||||
wstream.state = AsyncStreamState.Finished
|
|
||||||
break
|
|
||||||
|
|
||||||
proc init*[T](child: ChunkedStreamReader, rsource: AsyncStreamReader,
|
proc init*[T](child: ChunkedStreamReader, rsource: AsyncStreamReader,
|
||||||
bufferSize = ChunkBufferSize, udata: ref T) =
|
bufferSize = ChunkBufferSize, udata: ref T) =
|
||||||
|
Loading…
x
Reference in New Issue
Block a user