Yamux automatic window scaling

This commit is contained in:
Tanguy 2023-03-21 15:59:57 +01:00
parent 8d5ea43e2b
commit c8bb598e66
No known key found for this signature in database
GPG Key ID: 7DD8EC6B6CE6C45E
2 changed files with 31 additions and 3 deletions

View File

@ -146,6 +146,7 @@ type
recvWindow: int
sendWindow: int
maxRecvWindow: int
automaticWindowScaling: bool
conn: Connection
isSrc: bool
opened: bool
@ -232,6 +233,10 @@ proc updateRecvWindow(channel: YamuxChannel) {.async.} =
))
trace "increasing the recvWindow", delta
proc setMaxRecvWindow*(channel: YamuxChannel, maxRecvWindow: int) =
channel.automaticWindowScaling = false
channel.maxRecvWindow = maxRecvWindow
method readOnce*(
channel: YamuxChannel,
pbytes: pointer,
@ -247,6 +252,7 @@ method readOnce*(
newLPStreamConnDownError()
if channel.returnedEof:
raise newLPStreamRemoteClosedError()
if channel.recvQueue.len == 0:
channel.receivedData.clear()
await channel.closedRemotely or channel.receivedData.wait()
@ -260,6 +266,9 @@ method readOnce*(
toOpenArray(p, 0, nbytes - 1)[0..<toRead] = channel.recvQueue.toOpenArray(0, toRead - 1)
channel.recvQueue = channel.recvQueue[toRead..^1]
if nbytes > channel.maxRecvWindow and channel.automaticWindowScaling:
channel.maxRecvWindow = nbytes
# We made some room in the recv buffer let the peer know
await channel.updateRecvWindow()
channel.activity = true
@ -273,9 +282,6 @@ proc gotDataFromRemote(channel: YamuxChannel, b: seq[byte]) {.async.} =
libp2p_yamux_recv_queue.observe(channel.recvQueue.len.int64)
await channel.updateRecvWindow()
proc setMaxRecvWindow*(channel: YamuxChannel, maxRecvWindow: int) =
channel.maxRecvWindow = maxRecvWindow
proc trySend(channel: YamuxChannel) {.async.} =
if channel.isSending:
return
@ -384,6 +390,7 @@ proc createStream(m: Yamux, id: uint32, isSrc: bool): YamuxChannel =
maxRecvWindow: DefaultWindowSize,
recvWindow: DefaultWindowSize,
sendWindow: DefaultWindowSize,
automaticWindowScaling: true,
isSrc: isSrc,
conn: m.connection,
receivedData: newAsyncEvent(),

View File

@ -134,6 +134,27 @@ suite "Yamux":
# 1 for initial exhaustion + (142 / 20) = 9
check numberOfRead == 9
asyncTest "Automatic window size":
mSetup()
let writerBlocker = newFuture[void]()
var numberOfRead = 0
yamuxb.streamHandler = proc(conn: Connection) {.async.} =
var buffer: array[512000, byte]
while (await conn.readOnce(addr buffer[0], 512000)) > 0:
numberOfRead.inc()
writerBlocker.complete()
await conn.close()
let streamA = await yamuxa.newStream()
# Need to exhaust initial window first
await wait(streamA.write(newSeq[byte](256000)), 1.seconds) # shouldn't block
await streamA.write(newSeq[byte](256000 * 5))
await streamA.close()
await writerBlocker
check numberOfRead == 4
asyncTest "Saturate until reset":
mSetup()
let writerBlocker = newFuture[void]()