From d2c98bd87d540ec2f2d1a7c244b60386dfda66d3 Mon Sep 17 00:00:00 2001 From: Ludovic Chenut Date: Fri, 15 Dec 2023 16:30:50 +0100 Subject: [PATCH] improvement(yamux): make the window size configurable (#987) Co-authored-by: Diego --- libp2p/builders.nim | 4 +-- libp2p/muxers/yamux/yamux.nim | 60 +++++++++++++++++++++++---------- tests/testyamux.nim | 62 +++++++++++++++++++++++++++++++++-- 3 files changed, 104 insertions(+), 22 deletions(-) diff --git a/libp2p/builders.nim b/libp2p/builders.nim index 63a216d46..9fd1a0693 100644 --- a/libp2p/builders.nim +++ b/libp2p/builders.nim @@ -122,8 +122,8 @@ proc withMplex*( b.muxers.add(MuxerProvider.new(newMuxer, MplexCodec)) b -proc withYamux*(b: SwitchBuilder): SwitchBuilder = - proc newMuxer(conn: Connection): Muxer = Yamux.new(conn) +proc withYamux*(b: SwitchBuilder, windowSize: int = YamuxDefaultWindowSize): SwitchBuilder = + proc newMuxer(conn: Connection): Muxer = Yamux.new(conn, windowSize) assert b.muxers.countIt(it.codec == YamuxCodec) == 0, "Yamux build multiple times" b.muxers.add(MuxerProvider.new(newMuxer, YamuxCodec)) diff --git a/libp2p/muxers/yamux/yamux.nim b/libp2p/muxers/yamux/yamux.nim index e86fc42ec..ce800110c 100644 --- a/libp2p/muxers/yamux/yamux.nim +++ b/libp2p/muxers/yamux/yamux.nim @@ -22,7 +22,8 @@ logScope: const YamuxCodec* = "/yamux/1.0.0" YamuxVersion = 0.uint8 - DefaultWindowSize = 256000 + YamuxDefaultWindowSize* = 256000 + MaxSendQueueSize = 256000 MaxChannelCount = 200 when defined(libp2p_yamux_metrics): @@ -143,6 +144,7 @@ type recvWindow: int sendWindow: int maxRecvWindow: int + maxSendQueueSize: int conn: Connection isSrc: bool opened: bool @@ -169,9 +171,14 @@ proc `$`(channel: YamuxChannel): string = if s.len > 0: result &= " {" & s.foldl(if a != "": a & ", " & b else: b, "") & "}" -proc sendQueueBytes(channel: YamuxChannel, limit: bool = false): int = - for (elem, sent, _) in channel.sendQueue: - result.inc(min(elem.len - sent, if limit: channel.maxRecvWindow div 3 else: elem.len - sent)) +proc lengthSendQueue(channel: YamuxChannel): int = + channel.sendQueue.foldl(a + b.data.len - b.sent, 0) + +proc lengthSendQueueWithLimit(channel: YamuxChannel): int = + # For leniency, limit big messages size to the third of maxSendQueueSize + # This value is arbitrary, it's not in the specs + # It permits to store up to 3 big messages if the peer is stalling. + channel.sendQueue.foldl(a + min(b.data.len - b.sent, channel.maxSendQueueSize div 3), 0) proc actuallyClose(channel: YamuxChannel) {.async.} = if channel.closedLocally and channel.sendQueue.len == 0 and @@ -284,9 +291,9 @@ proc trySend(channel: YamuxChannel) {.async.} = channel.sendQueue.keepItIf(not (it.fut.cancelled() and it.sent == 0)) if channel.sendWindow == 0: trace "send window empty" - if channel.sendQueueBytes(true) > channel.maxRecvWindow: - debug "channel send queue too big, resetting", maxSendWindow=channel.maxRecvWindow, - currentQueueSize = channel.sendQueueBytes(true) + if channel.lengthSendQueueWithLimit() > channel.maxSendQueueSize: + debug "channel send queue too big, resetting", maxSendQueueSize=channel.maxSendQueueSize, + currentQueueSize = channel.lengthSendQueueWithLimit() try: await channel.reset(true) except CatchableError as exc: @@ -294,7 +301,7 @@ proc trySend(channel: YamuxChannel) {.async.} = break let - bytesAvailable = channel.sendQueueBytes() + bytesAvailable = channel.lengthSendQueue() toSend = min(channel.sendWindow, bytesAvailable) var sendBuffer = newSeqUninitialized[byte](toSend + 12) @@ -345,7 +352,7 @@ method write*(channel: YamuxChannel, msg: seq[byte]): Future[void] = return result channel.sendQueue.add((msg, 0, result)) when defined(libp2p_yamux_metrics): - libp2p_yamux_recv_queue.observe(channel.sendQueueBytes().int64) + libp2p_yamux_recv_queue.observe(channel.lengthSendQueue().int64) asyncSpawn channel.trySend() proc open*(channel: YamuxChannel) {.async.} = @@ -353,7 +360,10 @@ proc open*(channel: YamuxChannel) {.async.} = trace "Try to open channel twice" return channel.opened = true - await channel.conn.write(YamuxHeader.data(channel.id, 0, {if channel.isSrc: Syn else: Ack})) + await channel.conn.write(YamuxHeader.windowUpdate( + channel.id, + uint32(max(channel.maxRecvWindow - YamuxDefaultWindowSize, 0)), + {if channel.isSrc: Syn else: Ack})) method getWrapped*(channel: YamuxChannel): Connection = channel.conn @@ -364,6 +374,8 @@ type currentId: uint32 isClosed: bool maxChannCount: int + windowSize: int + maxSendQueueSize: int proc lenBySrc(m: Yamux, isSrc: bool): int = for v in m.channels.values(): @@ -377,12 +389,19 @@ proc cleanupChann(m: Yamux, channel: YamuxChannel) {.async.} = if channel.isReset and channel.recvWindow > 0: m.flushed[channel.id] = channel.recvWindow -proc createStream(m: Yamux, id: uint32, isSrc: bool): YamuxChannel = +proc createStream(m: Yamux, id: uint32, isSrc: bool, + recvWindow: int, maxSendQueueSize: int): YamuxChannel = + # As you can see, during initialization, recvWindow can be larger than maxRecvWindow. + # This is because the peer we're connected to will always assume + # that the initial recvWindow is 256k. + # To solve this contradiction, no updateWindow will be sent until recvWindow is less + # than maxRecvWindow result = YamuxChannel( id: id, - maxRecvWindow: DefaultWindowSize, - recvWindow: DefaultWindowSize, - sendWindow: DefaultWindowSize, + maxRecvWindow: recvWindow, + recvWindow: if recvWindow > YamuxDefaultWindowSize: recvWindow else: YamuxDefaultWindowSize, + sendWindow: YamuxDefaultWindowSize, + maxSendQueueSize: maxSendQueueSize, isSrc: isSrc, conn: m.connection, receivedData: newAsyncEvent(), @@ -458,7 +477,7 @@ method handle*(m: Yamux) {.async.} = if header.streamId mod 2 == m.currentId mod 2: debug "Peer used our reserved stream id, skipping", id=header.streamId, currentId=m.currentId, peerId=m.connection.peerId raise newException(YamuxError, "Peer used our reserved stream id") - let newStream = m.createStream(header.streamId, false) + let newStream = m.createStream(header.streamId, false, m.windowSize, m.maxSendQueueSize) if m.channels.len >= m.maxChannCount: await newStream.reset() continue @@ -518,15 +537,20 @@ method newStream*( if m.channels.len > m.maxChannCount - 1: raise newException(TooManyChannels, "max allowed channel count exceeded") - let stream = m.createStream(m.currentId, true) + let stream = m.createStream(m.currentId, true, m.windowSize, m.maxSendQueueSize) m.currentId += 2 if not lazy: await stream.open() return stream -proc new*(T: type[Yamux], conn: Connection, maxChannCount: int = MaxChannelCount): T = +proc new*(T: type[Yamux], conn: Connection, + maxChannCount: int = MaxChannelCount, + windowSize: int = YamuxDefaultWindowSize, + maxSendQueueSize: int = MaxSendQueueSize): T = T( connection: conn, currentId: if conn.dir == Out: 1 else: 2, - maxChannCount: maxChannCount + maxChannCount: maxChannCount, + windowSize: windowSize, + maxSendQueueSize: maxSendQueueSize ) diff --git a/tests/testyamux.nim b/tests/testyamux.nim index c0c13624e..7351db1d6 100644 --- a/tests/testyamux.nim +++ b/tests/testyamux.nim @@ -22,11 +22,12 @@ suite "Yamux": teardown: checkTrackers() - template mSetup {.inject.} = + template mSetup(ws: int = YamuxDefaultWindowSize) {.inject.} = #TODO in a template to avoid threadvar let (conna {.inject.}, connb {.inject.}) = bridgedConnections() - (yamuxa {.inject.}, yamuxb {.inject.}) = (Yamux.new(conna), Yamux.new(connb)) + yamuxa {.inject.} = Yamux.new(conna, windowSize = ws) + yamuxb {.inject.} = Yamux.new(connb, windowSize = ws) (handlera, handlerb) = (yamuxa.handle(), yamuxb.handle()) defer: @@ -179,6 +180,63 @@ suite "Yamux": writerBlocker.complete() await streamA.close() + asyncTest "Increase window size": + mSetup(512000) + let readerBlocker = newFuture[void]() + yamuxb.streamHandler = proc(conn: Connection) {.async.} = + await readerBlocker + var buffer: array[260000, byte] + discard await conn.readOnce(addr buffer[0], 260000) + await conn.close() + + let streamA = await yamuxa.newStream() + check streamA == yamuxa.getStreams()[0] + + await wait(streamA.write(newSeq[byte](512000)), 1.seconds) # shouldn't block + + let secondWriter = streamA.write(newSeq[byte](10000)) + await sleepAsync(10.milliseconds) + check: not secondWriter.finished() + + readerBlocker.complete() + await wait(secondWriter, 1.seconds) + + await streamA.close() + + asyncTest "Reduce window size": + mSetup(64000) + let readerBlocker1 = newFuture[void]() + let readerBlocker2 = newFuture[void]() + yamuxb.streamHandler = proc(conn: Connection) {.async.} = + await readerBlocker1 + var buffer: array[256000, byte] + # For the first roundtrip, the send window size is assumed to be 256k + discard await conn.readOnce(addr buffer[0], 256000) + await readerBlocker2 + discard await conn.readOnce(addr buffer[0], 40000) + + await conn.close() + + let streamA = await yamuxa.newStream() + check streamA == yamuxa.getStreams()[0] + + await wait(streamA.write(newSeq[byte](256000)), 1.seconds) # shouldn't block + + let secondWriter = streamA.write(newSeq[byte](64000)) + await sleepAsync(10.milliseconds) + check: not secondWriter.finished() + + readerBlocker1.complete() + await wait(secondWriter, 1.seconds) + + let thirdWriter = streamA.write(newSeq[byte](10)) + await sleepAsync(10.milliseconds) + check: not thirdWriter.finished() + + readerBlocker2.complete() + await wait(thirdWriter, 1.seconds) + await streamA.close() + suite "Exception testing": asyncTest "Local & Remote close": mSetup()