Limit size of stream queue

Writing to streams is blocked when the maximum stream offset
is reached. Previously we extended this offset when data was
*added* to the read queue of the receiving peer. Now we only
extend it once data has been *removed* from the queue,
thereby limiting the queue size.
This commit is contained in:
Mark Spanbroek 2021-01-19 16:52:29 +01:00 committed by markspanbroek
parent 867a01a2eb
commit 087be3a1b9
3 changed files with 29 additions and 4 deletions

View File

@ -29,6 +29,12 @@ proc clearUserData(state: OpenStream) =
except Ngtcp2RecoverableError:
discard # stream already closed
proc allowMoreIncomingBytes(state: OpenStream, amount: uint64) =
let conn = state.connection.conn
checkResult conn.ngtcp2_conn_extend_max_stream_offset(state.stream.id, amount)
conn.ngtcp2_conn_extend_max_offset(amount)
state.connection.send()
method enter(state: OpenStream, stream: Stream) =
procCall enter(StreamState(state), stream)
state.stream = stream
@ -41,6 +47,7 @@ method leave(state: OpenStream) =
method read(state: OpenStream): Future[seq[byte]] {.async.} =
result = await state.incoming.get()
state.allowMoreIncomingBytes(result.len.uint64)
method write(state: OpenStream, bytes: seq[byte]): Future[void] =
state.connection.send(state.stream.id, bytes)

View File

@ -40,9 +40,6 @@ proc onReceiveStreamData(connection: ptr ngtcp2_conn,
var bytes = newSeqUninitialized[byte](datalen)
copyMem(bytes.toUnsafePtr, data, datalen)
state.receive(bytes)
checkResult:
connection.ngtcp2_conn_extend_max_stream_offset(stream_id, datalen)
connection.ngtcp2_conn_extend_max_offset(datalen)
proc installStreamCallbacks*(callbacks: var ngtcp2_conn_callbacks) =
callbacks.stream_open = onStreamOpen

View File

@ -81,7 +81,7 @@ suite "streams":
let stream = await client.openStream()
let message = repeat(42'u8, 100 * sizeof(Ngtcp2Connection.buffer))
await stream.write(message)
asyncSpawn stream.write(message)
let incoming = await server.incomingStream()
for _ in 0..<100:
@ -89,6 +89,27 @@ suite "streams":
await simulation.cancelAndWait()
test "halts sender until receiver has caught up":
let simulation = simulateNetwork(client, server)
let message = repeat(42'u8, sizeof(Ngtcp2Connection.buffer))
# send until blocked
let sender = await client.openStream()
while true:
if not await sender.write(message).withTimeout(100.milliseconds):
break
# receive until blocked
let receiver = await server.incomingStream()
while true:
if not await receiver.read().withTimeout(100.milliseconds):
break
# check that sender is unblocked
check await sender.write(message).withTimeout(100.milliseconds)
await simulation.cancelAndWait()
test "handles packet loss":
let simulation = simulateLossyNetwork(client, server)