From c8bb598e665c4a40aca303a71344f64b03ae9581 Mon Sep 17 00:00:00 2001 From: Tanguy Date: Tue, 21 Mar 2023 15:59:57 +0100 Subject: [PATCH] Yamux automatic window scaling --- libp2p/muxers/yamux/yamux.nim | 13 ++++++++++--- tests/testyamux.nim | 21 +++++++++++++++++++++ 2 files changed, 31 insertions(+), 3 deletions(-) diff --git a/libp2p/muxers/yamux/yamux.nim b/libp2p/muxers/yamux/yamux.nim index 8d94b719d..aeaac19ff 100644 --- a/libp2p/muxers/yamux/yamux.nim +++ b/libp2p/muxers/yamux/yamux.nim @@ -146,6 +146,7 @@ type recvWindow: int sendWindow: int maxRecvWindow: int + automaticWindowScaling: bool conn: Connection isSrc: bool opened: bool @@ -232,6 +233,10 @@ proc updateRecvWindow(channel: YamuxChannel) {.async.} = )) trace "increasing the recvWindow", delta +proc setMaxRecvWindow*(channel: YamuxChannel, maxRecvWindow: int) = + channel.automaticWindowScaling = false + channel.maxRecvWindow = maxRecvWindow + method readOnce*( channel: YamuxChannel, pbytes: pointer, @@ -247,6 +252,7 @@ method readOnce*( newLPStreamConnDownError() if channel.returnedEof: raise newLPStreamRemoteClosedError() + if channel.recvQueue.len == 0: channel.receivedData.clear() await channel.closedRemotely or channel.receivedData.wait() @@ -260,6 +266,9 @@ method readOnce*( toOpenArray(p, 0, nbytes - 1)[0.. channel.maxRecvWindow and channel.automaticWindowScaling: + channel.maxRecvWindow = nbytes + # We made some room in the recv buffer let the peer know await channel.updateRecvWindow() channel.activity = true @@ -273,9 +282,6 @@ proc gotDataFromRemote(channel: YamuxChannel, b: seq[byte]) {.async.} = libp2p_yamux_recv_queue.observe(channel.recvQueue.len.int64) await channel.updateRecvWindow() -proc setMaxRecvWindow*(channel: YamuxChannel, maxRecvWindow: int) = - channel.maxRecvWindow = maxRecvWindow - proc trySend(channel: YamuxChannel) {.async.} = if channel.isSending: return @@ -384,6 +390,7 @@ proc createStream(m: Yamux, id: uint32, isSrc: bool): YamuxChannel = maxRecvWindow: DefaultWindowSize, recvWindow: DefaultWindowSize, sendWindow: DefaultWindowSize, + automaticWindowScaling: true, isSrc: isSrc, conn: m.connection, receivedData: newAsyncEvent(), diff --git a/tests/testyamux.nim b/tests/testyamux.nim index b9a4cf590..1d82440c3 100644 --- a/tests/testyamux.nim +++ b/tests/testyamux.nim @@ -134,6 +134,27 @@ suite "Yamux": # 1 for initial exhaustion + (142 / 20) = 9 check numberOfRead == 9 + asyncTest "Automatic window size": + mSetup() + + let writerBlocker = newFuture[void]() + var numberOfRead = 0 + yamuxb.streamHandler = proc(conn: Connection) {.async.} = + var buffer: array[512000, byte] + while (await conn.readOnce(addr buffer[0], 512000)) > 0: + numberOfRead.inc() + writerBlocker.complete() + await conn.close() + let streamA = await yamuxa.newStream() + # Need to exhaust initial window first + await wait(streamA.write(newSeq[byte](256000)), 1.seconds) # shouldn't block + await streamA.write(newSeq[byte](256000 * 5)) + await streamA.close() + + await writerBlocker + + check numberOfRead == 4 + asyncTest "Saturate until reset": mSetup() let writerBlocker = newFuture[void]()