2019-08-22 15:35:47 -06:00
|
|
|
## Nim-LibP2P
|
2019-09-24 11:48:23 -06:00
|
|
|
## Copyright (c) 2019 Status Research & Development GmbH
|
2019-08-22 15:35:47 -06:00
|
|
|
## Licensed under either of
|
|
|
|
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
|
|
|
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
|
|
|
## at your option.
|
|
|
|
## This file may not be copied, modified, or distributed except according to
|
|
|
|
## those terms.
|
|
|
|
|
2020-04-15 12:12:40 -06:00
|
|
|
# {.push raises: [].}
|
|
|
|
|
2019-09-11 20:10:38 -06:00
|
|
|
import chronos, chronicles
|
2020-04-15 12:12:40 -06:00
|
|
|
import stream
|
2019-08-22 15:35:47 -06:00
|
|
|
|
2019-09-11 20:10:38 -06:00
|
|
|
# Attaches the property `topic = "ChronosStream"` to the chronicles log
# statements in this module's scope.
# NOTE(review): chronicles reserves the key `topics` for topic filtering;
# confirm that the singular `topic` is what is intended here.
logScope:
  topic = "ChronosStream"
|
|
|
|
|
2020-04-12 22:26:42 -06:00
|
|
|
const
  DefaultChunkSize* = 1024 * 1024 ## 1 MiB; default `maxChunkSize` for `init`
|
|
|
|
|
2020-04-14 17:14:27 -06:00
|
|
|
type
  ChronosStream* = ref object of Stream
    ## `Stream` implementation backed by a chronos `StreamTransport`.
    reader: AsyncStreamReader ## read side, built from `client` in `init`
    writer: AsyncStreamWriter ## write side, built from `client` in `init`
    server: StreamServer      ## server handle passed to `init`
    client: StreamTransport   ## underlying transport; shut down by `close`
    buffer: seq[byte]         ## scratch space filled by `internalRead`
    maxChunkSize: int         ## length `buffer` is allocated with in `init`
|
|
|
|
|
2020-04-14 11:51:26 -06:00
|
|
|
proc init*(C: type[ChronosStream],
           server: StreamServer,
           client: StreamTransport,
           maxChunkSize = DefaultChunkSize): C =
  ## Creates a `ChronosStream` for `client`.
  ##
  ## An async reader and an async writer are created on top of `client`, and
  ## a buffer of `maxChunkSize` bytes is allocated up front for reads.
  let
    rd = newAsyncStreamReader(client)
    wr = newAsyncStreamWriter(client)

  result = ChronosStream(
    server: server,
    client: client,
    reader: rd,
    writer: wr,
    maxChunkSize: maxChunkSize,
    buffer: newSeq[byte](maxChunkSize))
|
|
|
|
|
|
|
|
proc internalRead(c: ChronosStream): Future[seq[byte]] {.async.} =
  ## Performs one `readOnce` into `c.buffer` and returns a copy of the bytes
  ## that were received.
  ##
  ## When `readOnce` reports zero or fewer bytes, `c.eof` is set and the
  ## default (empty) result is returned.
  let received = await c.reader.readOnce(addr c.buffer[0], c.buffer.len)
  if received > 0:
    # Slicing copies, so the caller's data survives the next read into the
    # shared buffer.
    result = c.buffer[0..<received]
  else:
    c.eof = true
|
2020-04-14 11:51:26 -06:00
|
|
|
|
2020-04-14 17:14:27 -06:00
|
|
|
method source*(c: ChronosStream): Source[seq[byte]] =
  ## Returns an iterator that yields one pending `internalRead` future per
  ## step, for as long as `c.atEof()` is false.
  return iterator(): Future[seq[byte]] =
    while not c.atEof():
      # NOTE: `internalRead` sets `eof` only after its read has completed, so
      # resuming this iterator before awaiting the previous future starts
      # another read into the same buffer.
      yield c.internalRead()
|
2020-04-14 11:51:26 -06:00
|
|
|
|
2020-04-14 17:14:27 -06:00
|
|
|
method sink*(c: ChronosStream): Sink[seq[byte]] =
  ## Returns an async consumer for a `Source[seq[byte]]`.
  ##
  ## Each future produced by the source is awaited and its bytes are written
  ## to `c.writer`. Consumption stops once `c.closed` is seen to be true at
  ## the start of an iteration.
  return proc(i: Source[seq[byte]]) {.async.} =
    for pending in i:
      if c.closed:
        break

      # The await is kept as a separate statement: sadly
      # `await c.writer.write((await chunk))` breaks.
      let data = await pending
      await c.writer.write(data)
|
2020-04-12 22:26:42 -06:00
|
|
|
|
2020-04-15 12:12:40 -06:00
|
|
|
proc close*(c: ChronosStream) {.async.} =
  ## Shuts the stream down and marks it closed.
  ##
  ## The writer, the reader and the client transport are closed in that
  ## order; any of them that already reports `closed()` is skipped. Calling
  ## this on a stream whose `closed` flag is already set does nothing.
  if c.closed:
    return

  trace "shutting chronos stream", address = $c.client.remoteAddress()

  if not c.writer.closed():
    await c.writer.closeWait()

  if not c.reader.closed():
    await c.reader.closeWait()

  if not c.client.closed():
    await c.client.closeWait()

  # The flag is raised only after every part has finished closing.
  c.closed = true
|
2020-04-13 12:06:37 -06:00
|
|
|
|
2020-04-15 12:12:40 -06:00
|
|
|
proc atEof*(c: ChronosStream): bool =
  ## Reports whether the stream's `eof` flag has been set.
  return c.eof
|
|
|
|
|
|
|
|
# {.pop.}
|