improvement(yamux): make the window size configurable (#987)

Co-authored-by: Diego <diego@status.im>
This commit is contained in:
Ludovic Chenut 2023-12-15 16:30:50 +01:00 committed by GitHub
parent 3011ba4326
commit d2c98bd87d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 104 additions and 22 deletions

View File

@ -122,8 +122,8 @@ proc withMplex*(
b.muxers.add(MuxerProvider.new(newMuxer, MplexCodec))
b
proc withYamux*(b: SwitchBuilder): SwitchBuilder =
proc newMuxer(conn: Connection): Muxer = Yamux.new(conn)
proc withYamux*(b: SwitchBuilder, windowSize: int = YamuxDefaultWindowSize): SwitchBuilder =
proc newMuxer(conn: Connection): Muxer = Yamux.new(conn, windowSize)
assert b.muxers.countIt(it.codec == YamuxCodec) == 0, "Yamux build multiple times"
b.muxers.add(MuxerProvider.new(newMuxer, YamuxCodec))

View File

@ -22,7 +22,8 @@ logScope:
const
YamuxCodec* = "/yamux/1.0.0"
YamuxVersion = 0.uint8
DefaultWindowSize = 256000
YamuxDefaultWindowSize* = 256000
MaxSendQueueSize = 256000
MaxChannelCount = 200
when defined(libp2p_yamux_metrics):
@ -143,6 +144,7 @@ type
recvWindow: int
sendWindow: int
maxRecvWindow: int
maxSendQueueSize: int
conn: Connection
isSrc: bool
opened: bool
@ -169,9 +171,14 @@ proc `$`(channel: YamuxChannel): string =
if s.len > 0:
result &= " {" & s.foldl(if a != "": a & ", " & b else: b, "") & "}"
proc sendQueueBytes(channel: YamuxChannel, limit: bool = false): int =
for (elem, sent, _) in channel.sendQueue:
result.inc(min(elem.len - sent, if limit: channel.maxRecvWindow div 3 else: elem.len - sent))
proc lengthSendQueue(channel: YamuxChannel): int =
channel.sendQueue.foldl(a + b.data.len - b.sent, 0)
proc lengthSendQueueWithLimit(channel: YamuxChannel): int =
# For leniency, limit big messages size to the third of maxSendQueueSize
# This value is arbitrary, it's not in the specs
# It permits to store up to 3 big messages if the peer is stalling.
channel.sendQueue.foldl(a + min(b.data.len - b.sent, channel.maxSendQueueSize div 3), 0)
proc actuallyClose(channel: YamuxChannel) {.async.} =
if channel.closedLocally and channel.sendQueue.len == 0 and
@ -284,9 +291,9 @@ proc trySend(channel: YamuxChannel) {.async.} =
channel.sendQueue.keepItIf(not (it.fut.cancelled() and it.sent == 0))
if channel.sendWindow == 0:
trace "send window empty"
if channel.sendQueueBytes(true) > channel.maxRecvWindow:
debug "channel send queue too big, resetting", maxSendWindow=channel.maxRecvWindow,
currentQueueSize = channel.sendQueueBytes(true)
if channel.lengthSendQueueWithLimit() > channel.maxSendQueueSize:
debug "channel send queue too big, resetting", maxSendQueueSize=channel.maxSendQueueSize,
currentQueueSize = channel.lengthSendQueueWithLimit()
try:
await channel.reset(true)
except CatchableError as exc:
@ -294,7 +301,7 @@ proc trySend(channel: YamuxChannel) {.async.} =
break
let
bytesAvailable = channel.sendQueueBytes()
bytesAvailable = channel.lengthSendQueue()
toSend = min(channel.sendWindow, bytesAvailable)
var
sendBuffer = newSeqUninitialized[byte](toSend + 12)
@ -345,7 +352,7 @@ method write*(channel: YamuxChannel, msg: seq[byte]): Future[void] =
return result
channel.sendQueue.add((msg, 0, result))
when defined(libp2p_yamux_metrics):
libp2p_yamux_recv_queue.observe(channel.sendQueueBytes().int64)
libp2p_yamux_recv_queue.observe(channel.lengthSendQueue().int64)
asyncSpawn channel.trySend()
proc open*(channel: YamuxChannel) {.async.} =
@ -353,7 +360,10 @@ proc open*(channel: YamuxChannel) {.async.} =
trace "Try to open channel twice"
return
channel.opened = true
await channel.conn.write(YamuxHeader.data(channel.id, 0, {if channel.isSrc: Syn else: Ack}))
await channel.conn.write(YamuxHeader.windowUpdate(
channel.id,
uint32(max(channel.maxRecvWindow - YamuxDefaultWindowSize, 0)),
{if channel.isSrc: Syn else: Ack}))
method getWrapped*(channel: YamuxChannel): Connection = channel.conn
@ -364,6 +374,8 @@ type
currentId: uint32
isClosed: bool
maxChannCount: int
windowSize: int
maxSendQueueSize: int
proc lenBySrc(m: Yamux, isSrc: bool): int =
for v in m.channels.values():
@ -377,12 +389,19 @@ proc cleanupChann(m: Yamux, channel: YamuxChannel) {.async.} =
if channel.isReset and channel.recvWindow > 0:
m.flushed[channel.id] = channel.recvWindow
proc createStream(m: Yamux, id: uint32, isSrc: bool): YamuxChannel =
proc createStream(m: Yamux, id: uint32, isSrc: bool,
recvWindow: int, maxSendQueueSize: int): YamuxChannel =
# As you can see, during initialization, recvWindow can be larger than maxRecvWindow.
# This is because the peer we're connected to will always assume
# that the initial recvWindow is 256k.
# To solve this contradiction, no updateWindow will be sent until recvWindow is less
# than maxRecvWindow
result = YamuxChannel(
id: id,
maxRecvWindow: DefaultWindowSize,
recvWindow: DefaultWindowSize,
sendWindow: DefaultWindowSize,
maxRecvWindow: recvWindow,
recvWindow: if recvWindow > YamuxDefaultWindowSize: recvWindow else: YamuxDefaultWindowSize,
sendWindow: YamuxDefaultWindowSize,
maxSendQueueSize: maxSendQueueSize,
isSrc: isSrc,
conn: m.connection,
receivedData: newAsyncEvent(),
@ -458,7 +477,7 @@ method handle*(m: Yamux) {.async.} =
if header.streamId mod 2 == m.currentId mod 2:
debug "Peer used our reserved stream id, skipping", id=header.streamId, currentId=m.currentId, peerId=m.connection.peerId
raise newException(YamuxError, "Peer used our reserved stream id")
let newStream = m.createStream(header.streamId, false)
let newStream = m.createStream(header.streamId, false, m.windowSize, m.maxSendQueueSize)
if m.channels.len >= m.maxChannCount:
await newStream.reset()
continue
@ -518,15 +537,20 @@ method newStream*(
if m.channels.len > m.maxChannCount - 1:
raise newException(TooManyChannels, "max allowed channel count exceeded")
let stream = m.createStream(m.currentId, true)
let stream = m.createStream(m.currentId, true, m.windowSize, m.maxSendQueueSize)
m.currentId += 2
if not lazy:
await stream.open()
return stream
proc new*(T: type[Yamux], conn: Connection, maxChannCount: int = MaxChannelCount): T =
proc new*(T: type[Yamux], conn: Connection,
maxChannCount: int = MaxChannelCount,
windowSize: int = YamuxDefaultWindowSize,
maxSendQueueSize: int = MaxSendQueueSize): T =
T(
connection: conn,
currentId: if conn.dir == Out: 1 else: 2,
maxChannCount: maxChannCount
maxChannCount: maxChannCount,
windowSize: windowSize,
maxSendQueueSize: maxSendQueueSize
)

View File

@ -22,11 +22,12 @@ suite "Yamux":
teardown:
checkTrackers()
template mSetup {.inject.} =
template mSetup(ws: int = YamuxDefaultWindowSize) {.inject.} =
#TODO in a template to avoid threadvar
let
(conna {.inject.}, connb {.inject.}) = bridgedConnections()
(yamuxa {.inject.}, yamuxb {.inject.}) = (Yamux.new(conna), Yamux.new(connb))
yamuxa {.inject.} = Yamux.new(conna, windowSize = ws)
yamuxb {.inject.} = Yamux.new(connb, windowSize = ws)
(handlera, handlerb) = (yamuxa.handle(), yamuxb.handle())
defer:
@ -179,6 +180,63 @@ suite "Yamux":
writerBlocker.complete()
await streamA.close()
asyncTest "Increase window size":
mSetup(512000)
let readerBlocker = newFuture[void]()
yamuxb.streamHandler = proc(conn: Connection) {.async.} =
await readerBlocker
var buffer: array[260000, byte]
discard await conn.readOnce(addr buffer[0], 260000)
await conn.close()
let streamA = await yamuxa.newStream()
check streamA == yamuxa.getStreams()[0]
await wait(streamA.write(newSeq[byte](512000)), 1.seconds) # shouldn't block
let secondWriter = streamA.write(newSeq[byte](10000))
await sleepAsync(10.milliseconds)
check: not secondWriter.finished()
readerBlocker.complete()
await wait(secondWriter, 1.seconds)
await streamA.close()
asyncTest "Reduce window size":
mSetup(64000)
let readerBlocker1 = newFuture[void]()
let readerBlocker2 = newFuture[void]()
yamuxb.streamHandler = proc(conn: Connection) {.async.} =
await readerBlocker1
var buffer: array[256000, byte]
# For the first roundtrip, the send window size is assumed to be 256k
discard await conn.readOnce(addr buffer[0], 256000)
await readerBlocker2
discard await conn.readOnce(addr buffer[0], 40000)
await conn.close()
let streamA = await yamuxa.newStream()
check streamA == yamuxa.getStreams()[0]
await wait(streamA.write(newSeq[byte](256000)), 1.seconds) # shouldn't block
let secondWriter = streamA.write(newSeq[byte](64000))
await sleepAsync(10.milliseconds)
check: not secondWriter.finished()
readerBlocker1.complete()
await wait(secondWriter, 1.seconds)
let thirdWriter = streamA.write(newSeq[byte](10))
await sleepAsync(10.milliseconds)
check: not thirdWriter.finished()
readerBlocker2.complete()
await wait(thirdWriter, 1.seconds)
await streamA.close()
suite "Exception testing":
asyncTest "Local & Remote close":
mSetup()