## Nim-LibP2P
## Copyright (c) 2019 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.

## This module implements an asynchronous buffer stream
## which emulates physical async IO.
##
## The stream is based on the standard library's `Deque`,
## which is itself based on a ring buffer.
##
## It works by exposing a regular LPStream interface and
## a method ``pushData`` to push data to the internal read
## buffer; as well as a handler that can be registered
## that gets triggered on every write to the stream. This
## allows using the buffered stream as a sort of proxy,
## which can be consumed as a regular LPStream but allows
## injecting data for reads and intercepting writes.
##
## Another notable feature is that the stream is fully
## ordered and asynchronous. Reads are queued up in order
## and are suspended when not enough data is available. This
## allows preserving backpressure while maintaining full
## asynchrony. Both writing to the internal buffer with
## ``pushData`` and reading with the ``read*`` methods
## will suspend until either the read queue has room
## again or more data becomes available.
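##
## A minimal usage sketch follows. It is illustrative only, not a reference
## implementation: it assumes the code runs inside a chronos ``async`` proc,
## that ``toBytes`` comes from ``stew/byteutils`` and that ``close`` is
## inherited from the stream base type. ``newBufferStream``, ``pushData``,
## ``pushEof`` and ``readOnce`` are the procs defined in this module.
##
## .. code-block:: nim
##   let stream = newBufferStream()
##   # Inject data for the reader; this suspends while the read queue is full.
##   await stream.pushData("hello".toBytes())
##   var buf = newSeq[byte](5)
##   # Copies at most `buf.len` of the pushed bytes into `buf`.
##   let n = await stream.readOnce(addr buf[0], buf.len)
##   # Signal that no more data will arrive, then release the stream.
##   await stream.pushEof()
##   await stream.close()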

# refactor bufferstream to use a queue (#346)
#
# This change modifies how the backpressure algorithm in bufferstream
# works - in particular, instead of working byte-by-byte, it will now work
# seq-by-seq.
#
# When data arrives, it usually does so in packets - in the current
# bufferstream, the packet is read then split into bytes which are fed one
# by one to the bufferstream. On the reading side, the bytes are popped off
# the bufferstream, again byte by byte, to satisfy `readOnce` requests -
# this introduces a lot of synchronization traffic because the checks for
# full buffer and for async event handling must be done for every byte.
#
# In this PR, a queue of length 1 is used instead - this means there will
# at most exist one "packet" in `pushTo`, one in the queue and one in the
# slush buffer that is used to store incomplete reads.
#
# * avoid byte-by-byte copy to buffer, with synchronization in-between
# * reuse AsyncQueue synchronization logic instead of rolling own
# * avoid writeHandler callback - implement `write` method instead
# * simplify EOF signalling by only setting EOF flag in queue reader (and
#   reset)
# * remove BufferStream pipes (unused)
# * fixes drainBuffer deadlock when drain is called from within read loop
#   and thus blocks draining
# * fix lpchannel init order

import std/strformat
import stew/byteutils
import chronos, chronicles, metrics
import ../stream/connection
import ./streamseq

when chronicles.enabledLogLevel == LogLevel.TRACE:
  import oids

export connection

logScope:
  topics = "bufferstream"

const
  BufferStreamTrackerName* = "libp2p.bufferstream"

type
  BufferStreamTracker* = ref object of TrackerBase
    opened*: uint64
    closed*: uint64

proc setupBufferStreamTracker(): BufferStreamTracker {.gcsafe.}

proc getBufferStreamTracker(): BufferStreamTracker {.gcsafe.} =
  result = cast[BufferStreamTracker](getTracker(BufferStreamTrackerName))
  if isNil(result):
    result = setupBufferStreamTracker()

proc dumpTracking(): string {.gcsafe.} =
  var tracker = getBufferStreamTracker()
  result = "Opened buffers: " & $tracker.opened & "\n" &
           "Closed buffers: " & $tracker.closed

proc leakTransport(): bool {.gcsafe.} =
  var tracker = getBufferStreamTracker()
  result = (tracker.opened != tracker.closed)

proc setupBufferStreamTracker(): BufferStreamTracker =
  result = new BufferStreamTracker
  result.opened = 0
  result.closed = 0
  result.dump = dumpTracking
  result.isLeaked = leakTransport
  addTracker(BufferStreamTrackerName, result)

type
  BufferStream* = ref object of Connection
    readQueue*: AsyncQueue[seq[byte]] # read queue for managing backpressure
    readBuf*: StreamSeq # overflow buffer for readOnce
    pushing*: int # number of ongoing push operations

    pushedEof*: bool

func shortLog*(s: BufferStream): auto =
  if s.isNil: "BufferStream(nil)"
  elif s.peerInfo.isNil: $s.oid
  else: &"{shortLog(s.peerInfo.peerId)}:{s.oid}"
chronicles.formatIt(BufferStream): shortLog(it)

proc len*(s: BufferStream): int =
  s.readBuf.len + (if s.readQueue.len > 0: s.readQueue[0].len() else: 0)

method initStream*(s: BufferStream) =
  if s.objName.len == 0:
    s.objName = "BufferStream"

  procCall Connection(s).initStream()

  s.readQueue = newAsyncQueue[seq[byte]](1)

  trace "BufferStream created", s
  inc getBufferStreamTracker().opened

proc newBufferStream*(timeout: Duration = DefaultConnectionTimeout): BufferStream =
  new result
  result.timeout = timeout
  result.initStream()

method pushData*(s: BufferStream, data: seq[byte]) {.base, async.} =
  ## Write bytes to the internal read buffer; use this to fill up the
  ## buffer with data.
  ##
  ## `pushData` will block if the queue is full, thus maintaining backpressure.
  ##
  if s.isClosed or s.pushedEof:
    raise newLPStreamEOFError()

  if data.len == 0:
    return # Don't push 0-length buffers, these signal EOF

  # We will block here if there is already data queued, until it has been
  # processed
  inc s.pushing
  try:
    trace "Pushing data", s, data = data.len
    await s.readQueue.addLast(data)
  finally:
    dec s.pushing

method pushEof*(s: BufferStream) {.base, async.} =
  if s.pushedEof:
    return
  s.pushedEof = true

  # We will block here if there is already data queued, until it has been
  # processed
  inc s.pushing
  try:
    trace "Pushing EOF", s
    await s.readQueue.addLast(@[])
  finally:
    dec s.pushing

method readOnce*(s: BufferStream,
                 pbytes: pointer,
                 nbytes: int):
                 Future[int] {.async.} =
  doAssert(nbytes > 0, "nbytes must be positive integer")
  if s.isEof and s.readBuf.len() == 0:
    raise newLPStreamEOFError()

  var
    p = cast[ptr UncheckedArray[byte]](pbytes)

  # First consume leftovers from previous read
  var rbytes = s.readBuf.consumeTo(toOpenArray(p, 0, nbytes - 1))

  if rbytes < nbytes:
    # There's space in the buffer - consume some data from the read queue
    trace "popping readQueue", s, rbytes, nbytes
    let buf = await s.readQueue.popFirst()

    if buf.len == 0 or s.isEof: # Another task might have set EOF!
      # No more data will arrive on read queue
      s.isEof = true
    else:
      let remaining = min(buf.len, nbytes - rbytes)
      toOpenArray(p, rbytes, nbytes - 1)[0..<remaining] =
        buf.toOpenArray(0, remaining - 1)
      rbytes += remaining

      if remaining < buf.len:
        trace "add leftovers", s, len = buf.len - remaining
        s.readBuf.add(buf.toOpenArray(remaining, buf.high))

  if s.isEof and s.readBuf.len() == 0:
    # We can clear the readBuf memory since it won't be used any more
    s.readBuf = StreamSeq()

  s.activity = true

  return rbytes

method closeImpl*(s: BufferStream): Future[void] =
|
2020-09-10 06:19:13 +00:00
|
|
|
## close the stream and clear the buffer
|
2020-09-21 17:48:19 +00:00
|
|
|
trace "Closing BufferStream", s, len = s.len
|
2019-12-04 04:44:54 +00:00
|
|
|
|
2020-09-21 17:48:19 +00:00
|
|
|
if not s.pushedEof: # Potentially wake up reader
|
|
|
|
asyncSpawn s.pushEof()
|
2020-09-10 06:19:13 +00:00
|
|
|
|
|
|
|
inc getBufferStreamTracker().closed
|
|
|
|
trace "Closed BufferStream", s
|
2020-09-21 17:48:19 +00:00
|
|
|
|
|
|
|
procCall Connection(s).closeImpl() # noraises, nocancels
|