diff --git a/chronos/streams/asyncstream.nim b/chronos/streams/asyncstream.nim index ade48f4..c1a1554 100644 --- a/chronos/streams/asyncstream.nim +++ b/chronos/streams/asyncstream.nim @@ -141,6 +141,24 @@ proc copyData*(sb: AsyncBuffer, dest: pointer, offset, length: int) {.inline.} = copyMem(cast[pointer](cast[uint](dest) + cast[uint](offset)), unsafeAddr sb.buffer[0], length) +proc upload*(sb: ptr AsyncBuffer, pbytes: ptr byte, + nbytes: int): Future[void] {.async.} = + var length = nbytes + while length > 0: + let size = min(length, sb[].bufferLen()) + if size == 0: + # Internal buffer is full, we need to transfer data to consumer. + await sb[].transfer() + continue + else: + copyMem(addr sb[].buffer[sb.offset], pbytes, size) + sb[].offset = sb[].offset + size + length = length - size + + if length == 0: + # We notify consumers that new data is available. + sb[].forget() + template toDataOpenArray*(sb: AsyncBuffer): auto = toOpenArray(sb.buffer, 0, sb.offset - 1) diff --git a/chronos/streams/tlsstream.nim b/chronos/streams/tlsstream.nim index cbd5223..c25cbc2 100644 --- a/chronos/streams/tlsstream.nim +++ b/chronos/streams/tlsstream.nim @@ -11,7 +11,7 @@ import bearssl, bearssl/cacert import ../asyncloop, ../timer, ../asyncsync import asyncstream, ../transports/stream, ../transports/common -import strutils +import strutils, hexdump type TLSStreamKind {.pure.} = enum @@ -80,19 +80,19 @@ template newTlsStreamProtocolError[T](message: T): ref Exception = # proc raiseTlsStreamProtoError*[T](message: T) = # raise newTlsStreamProtocolError(message) -# proc getStringState*(state: cuint): string = -# var n = newSeq[string]() -# if (state and SSL_CLOSED) == SSL_CLOSED: -# n.add("Closed") -# if (state and SSL_SENDREC) == SSL_SENDREC: -# n.add("SendRec") -# if (state and SSL_RECVREC) == SSL_RECVREC: -# n.add("RecvRec") -# if (state and SSL_SENDAPP) == SSL_SENDAPP: -# n.add("SendApp") -# if (state and SSL_RECVAPP) == SSL_RECVAPP: -# n.add("RecvApp") -# result = "{" & n.join(", ") & "} number (" & $state & ")" +proc getStringState*(state: cuint): string = + var n = newSeq[string]() + if (state and SSL_CLOSED) == SSL_CLOSED: + n.add("Closed") + if (state and SSL_SENDREC) == SSL_SENDREC: + n.add("SendRec") + if (state and SSL_RECVREC) == SSL_RECVREC: + n.add("RecvRec") + if (state and SSL_SENDAPP) == SSL_SENDAPP: + n.add("SendApp") + if (state and SSL_RECVAPP) == SSL_RECVAPP: + n.add("RecvApp") + result = "{" & n.join(", ") & "} number (" & $state & ")" proc tlsWriteLoop(stream: AsyncStreamWriter) {.async.} = var wstream = cast[TlsStreamWriter](stream) @@ -110,15 +110,18 @@ proc tlsWriteLoop(stream: AsyncStreamWriter) {.async.} = var length: uint while true: var state = engine.sslEngineCurrentState() + echo "tlsWriteLoop() state = ", getStringState(state) if (state and SSL_CLOSED) == SSL_CLOSED: wstream.state = AsyncStreamState.Finished break if (state and (SSL_RECVREC or SSL_RECVAPP)) != 0: + echo "tlsWriteLoop() signal to tlsReadLoop()" wstream.switchToReader.fire() if (state and (SSL_SENDREC or SSL_SENDAPP)) == 0: + echo "tlsWriteLoop() waiting" await wstream.switchToWriter.wait() wstream.switchToWriter.clear() # We need to refresh `state` because we just returned from readerLoop. @@ -138,7 +141,9 @@ proc tlsWriteLoop(stream: AsyncStreamWriter) {.async.} = if (state and SSL_SENDAPP) == SSL_SENDAPP: # Application data can be sent over stream. + echo "tlsWriteLoop() waiting for an item" var item = await wstream.queue.get() + echo "tlsWriteLoop() obtained an item" if item.size > 0: length = 0'u var buf = sslEngineSendappBuf(engine, length) @@ -204,6 +209,8 @@ proc tlsReadLoop(stream: AsyncStreamReader) {.async.} = var length: uint while true: var state = engine.sslEngineCurrentState() + echo "tlsReadLoop() state = ", getStringState(state) + if (state and SSL_CLOSED) == SSL_CLOSED: let err = engine.sslEngineLastError() if err != 0: @@ -215,9 +222,11 @@ proc tlsReadLoop(stream: AsyncStreamReader) {.async.} = break if (state and (SSL_SENDREC or SSL_SENDAPP)) != 0: + echo "tlsReadLoop() signal to tlsWriteLoop()" rstream.switchToWriter.fire() if (state and (SSL_RECVREC or SSL_RECVAPP)) == 0: + echo "tlsReadLoop() waiting" await rstream.switchToReader.wait() rstream.switchToReader.clear() # We need to refresh `state` because we just returned from writerLoop. @@ -227,7 +236,9 @@ proc tlsReadLoop(stream: AsyncStreamReader) {.async.} = # TLS records required for further processing length = 0'u var buf = sslEngineRecvrecBuf(engine, length) + echo "tlsReadLoop() reading" var resFut = awaitne rstream.rsource.readOnce(buf, int(length)) + echo "tlsReadLoop() read completed" if resFut.failed(): rstream.error = resFut.error rstream.state = AsyncStreamState.Error @@ -244,11 +255,11 @@ proc tlsReadLoop(stream: AsyncStreamReader) {.async.} = # Application data can be recovered. length = 0'u var buf = sslEngineRecvappBuf(engine, length) - let toRead = min(int(length), rstream.buffer.bufferLen()) - copyMem(rstream.buffer.getBuffer(), buf, toRead) - rstream.buffer.update(toRead) - sslEngineRecvappAck(engine, uint(toRead)) - await rstream.buffer.transfer() + echo "tlsReadLoop(SSL_RECVAPP) received ", length, " bytes" + await upload(addr rstream.buffer, buf, int(length)) + echo dumpHex(buf, int(length)) + echo "tlsReadLoop(SSL_RECVAPP) uploaded ", length, " bytes to buffer" + sslEngineRecvappAck(engine, length) continue except CancelledError: