docs: add comments and improve yamux readability (#1006)

This commit is contained in:
Ludovic Chenut 2024-02-02 15:14:02 +01:00 committed by GitHub
parent 9bc5ec1566
commit eb0890cd6f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 35 additions and 14 deletions

View File

@ -172,12 +172,16 @@ proc `$`(channel: YamuxChannel): string =
result &= " {" & s.foldl(if a != "": a & ", " & b else: b, "") & "}" result &= " {" & s.foldl(if a != "": a & ", " & b else: b, "") & "}"
proc lengthSendQueue(channel: YamuxChannel): int = proc lengthSendQueue(channel: YamuxChannel): int =
## Returns the length of what remains to be sent
##
channel.sendQueue.foldl(a + b.data.len - b.sent, 0) channel.sendQueue.foldl(a + b.data.len - b.sent, 0)
proc lengthSendQueueWithLimit(channel: YamuxChannel): int = proc lengthSendQueueWithLimit(channel: YamuxChannel): int =
## Returns the length of what remains to be sent, but limit the size of big messages.
##
# For leniency, limit big messages size to the third of maxSendQueueSize # For leniency, limit big messages size to the third of maxSendQueueSize
# This value is arbitrary, it's not in the specs # This value is arbitrary, it's not in the specs, it permits to store up to
# It permits to store up to 3 big messages if the peer is stalling. # 3 big messages if the peer is stalling.
channel.sendQueue.foldl(a + min(b.data.len - b.sent, channel.maxSendQueueSize div 3), 0) channel.sendQueue.foldl(a + min(b.data.len - b.sent, channel.maxSendQueueSize div 3), 0)
proc actuallyClose(channel: YamuxChannel) {.async.} = proc actuallyClose(channel: YamuxChannel) {.async.} =
@ -200,6 +204,9 @@ method closeImpl*(channel: YamuxChannel) {.async.} =
await channel.actuallyClose() await channel.actuallyClose()
proc reset(channel: YamuxChannel, isLocal: bool = false) {.async.} = proc reset(channel: YamuxChannel, isLocal: bool = false) {.async.} =
# If we reset locally, we want to flush up to a maximum of recvWindow
# bytes. It's because the peer we're connected to can send us data before
# it receives the reset.
if channel.isReset: if channel.isReset:
return return
trace "Reset channel" trace "Reset channel"
@ -220,11 +227,14 @@ proc reset(channel: YamuxChannel, isLocal: bool = false) {.async.} =
await channel.remoteClosed() await channel.remoteClosed()
channel.receivedData.fire() channel.receivedData.fire()
if not isLocal: if not isLocal:
# If we reset locally, we want to flush up to a maximum of recvWindow # If the reset is remote, there's no reason to flush anything.
# bytes. We use the recvWindow in the proc cleanupChann.
channel.recvWindow = 0 channel.recvWindow = 0
proc updateRecvWindow(channel: YamuxChannel) {.async.} = proc updateRecvWindow(channel: YamuxChannel) {.async.} =
## Send to the peer a window update when the recvWindow is empty enough
##
# In order to avoid spamming a window update everytime a byte is read,
# we send it everytime half of the maxRecvWindow is read.
let inWindow = channel.recvWindow + channel.recvQueue.len let inWindow = channel.recvWindow + channel.recvQueue.len
if inWindow > channel.maxRecvWindow div 2: if inWindow > channel.maxRecvWindow div 2:
return return
@ -242,6 +252,7 @@ method readOnce*(
pbytes: pointer, pbytes: pointer,
nbytes: int): nbytes: int):
Future[int] {.async.} = Future[int] {.async.} =
## Read from a yamux channel
if channel.isReset: if channel.isReset:
raise if channel.remoteReset: raise if channel.remoteReset:
@ -287,17 +298,18 @@ proc trySend(channel: YamuxChannel) {.async.} =
return return
channel.isSending = true channel.isSending = true
defer: channel.isSending = false defer: channel.isSending = false
while channel.sendQueue.len != 0: while channel.sendQueue.len != 0:
channel.sendQueue.keepItIf(not (it.fut.cancelled() and it.sent == 0)) channel.sendQueue.keepItIf(not (it.fut.cancelled() and it.sent == 0))
if channel.sendWindow == 0: if channel.sendWindow == 0:
trace "send window empty" trace "trying to send while the sendWindow is empty"
if channel.lengthSendQueueWithLimit() > channel.maxSendQueueSize: if channel.lengthSendQueueWithLimit() > channel.maxSendQueueSize:
debug "channel send queue too big, resetting", maxSendQueueSize=channel.maxSendQueueSize, trace "channel send queue too big, resetting", maxSendQueueSize=channel.maxSendQueueSize,
currentQueueSize = channel.lengthSendQueueWithLimit() currentQueueSize = channel.lengthSendQueueWithLimit()
try: try:
await channel.reset(true) await channel.reset(true)
except CatchableError as exc: except CatchableError as exc:
debug "failed to reset", msg=exc.msg warn "failed to reset", msg=exc.msg
break break
let let
@ -316,20 +328,24 @@ proc trySend(channel: YamuxChannel) {.async.} =
var futures: seq[Future[void]] var futures: seq[Future[void]]
while inBuffer < toSend: while inBuffer < toSend:
# concatenate the different message we try to send into one buffer
let (data, sent, fut) = channel.sendQueue[0] let (data, sent, fut) = channel.sendQueue[0]
let bufferToSend = min(data.len - sent, toSend - inBuffer) let bufferToSend = min(data.len - sent, toSend - inBuffer)
sendBuffer.toOpenArray(12, 12 + toSend - 1)[inBuffer..<(inBuffer+bufferToSend)] = sendBuffer.toOpenArray(12, 12 + toSend - 1)[inBuffer..<(inBuffer+bufferToSend)] =
channel.sendQueue[0].data.toOpenArray(sent, sent + bufferToSend - 1) channel.sendQueue[0].data.toOpenArray(sent, sent + bufferToSend - 1)
channel.sendQueue[0].sent.inc(bufferToSend) channel.sendQueue[0].sent.inc(bufferToSend)
if channel.sendQueue[0].sent >= data.len: if channel.sendQueue[0].sent >= data.len:
# if every byte of the message is in the buffer, add the write future to the
# sequence of futures to be completed (or failed) when the buffer is sent
futures.add(fut) futures.add(fut)
channel.sendQueue.delete(0) channel.sendQueue.delete(0)
inBuffer.inc(bufferToSend) inBuffer.inc(bufferToSend)
trace "build send buffer", h = $header, msg=string.fromBytes(sendBuffer[12..^1]) trace "try to send the buffer", h = $header
channel.sendWindow.dec(toSend) channel.sendWindow.dec(toSend)
try: await channel.conn.write(sendBuffer) try: await channel.conn.write(sendBuffer)
except CatchableError as exc: except CatchableError as exc:
trace "failed to send the buffer"
let connDown = newLPStreamConnDownError(exc) let connDown = newLPStreamConnDownError(exc)
for fut in futures.items(): for fut in futures.items():
fut.fail(connDown) fut.fail(connDown)
@ -340,6 +356,8 @@ proc trySend(channel: YamuxChannel) {.async.} =
channel.activity = true channel.activity = true
method write*(channel: YamuxChannel, msg: seq[byte]): Future[void] = method write*(channel: YamuxChannel, msg: seq[byte]): Future[void] =
## Write to yamux channel
##
result = newFuture[void]("Yamux Send") result = newFuture[void]("Yamux Send")
if channel.remoteReset: if channel.remoteReset:
result.fail(newLPStreamResetError()) result.fail(newLPStreamResetError())
@ -355,7 +373,9 @@ method write*(channel: YamuxChannel, msg: seq[byte]): Future[void] =
libp2p_yamux_recv_queue.observe(channel.lengthSendQueue().int64) libp2p_yamux_recv_queue.observe(channel.lengthSendQueue().int64)
asyncSpawn channel.trySend() asyncSpawn channel.trySend()
proc open*(channel: YamuxChannel) {.async.} = proc open(channel: YamuxChannel) {.async.} =
## Open a yamux channel by sending a window update with Syn or Ack flag
##
if channel.opened: if channel.opened:
trace "Try to open channel twice" trace "Try to open channel twice"
return return
@ -381,7 +401,7 @@ proc lenBySrc(m: Yamux, isSrc: bool): int =
for v in m.channels.values(): for v in m.channels.values():
if v.isSrc == isSrc: result += 1 if v.isSrc == isSrc: result += 1
proc cleanupChann(m: Yamux, channel: YamuxChannel) {.async.} = proc cleanupChannel(m: Yamux, channel: YamuxChannel) {.async.} =
await channel.join() await channel.join()
m.channels.del(channel.id) m.channels.del(channel.id)
when defined(libp2p_yamux_metrics): when defined(libp2p_yamux_metrics):
@ -419,7 +439,7 @@ proc createStream(m: Yamux, id: uint32, isSrc: bool,
when defined(libp2p_agents_metrics): when defined(libp2p_agents_metrics):
result.shortAgent = m.connection.shortAgent result.shortAgent = m.connection.shortAgent
m.channels[id] = result m.channels[id] = result
asyncSpawn m.cleanupChann(result) asyncSpawn m.cleanupChannel(result)
trace "created channel", id, pid=m.connection.peerId trace "created channel", id, pid=m.connection.peerId
when defined(libp2p_yamux_metrics): when defined(libp2p_yamux_metrics):
libp2p_yamux_channels.set(m.lenBySrc(isSrc).int64, [$isSrc, $result.peerId]) libp2p_yamux_channels.set(m.lenBySrc(isSrc).int64, [$isSrc, $result.peerId])
@ -440,7 +460,7 @@ method close*(m: Yamux) {.async.} =
trace "Closed yamux" trace "Closed yamux"
proc handleStream(m: Yamux, channel: YamuxChannel) {.async.} = proc handleStream(m: Yamux, channel: YamuxChannel) {.async.} =
## call the muxer stream handler for this channel ## Call the muxer stream handler for this channel
## ##
try: try:
await m.streamHandler(channel) await m.streamHandler(channel)
@ -474,6 +494,7 @@ method handle*(m: Yamux) {.async.} =
else: else:
if header.streamId in m.flushed: if header.streamId in m.flushed:
m.flushed.del(header.streamId) m.flushed.del(header.streamId)
if header.streamId mod 2 == m.currentId mod 2: if header.streamId mod 2 == m.currentId mod 2:
debug "Peer used our reserved stream id, skipping", id=header.streamId, currentId=m.currentId, peerId=m.connection.peerId debug "Peer used our reserved stream id, skipping", id=header.streamId, currentId=m.currentId, peerId=m.connection.peerId
raise newException(YamuxError, "Peer used our reserved stream id") raise newException(YamuxError, "Peer used our reserved stream id")

View File

@ -36,8 +36,8 @@ suite "Yamux":
yamuxa.close(), yamuxb.close(), yamuxa.close(), yamuxb.close(),
handlera, handlerb) handlera, handlerb)
suite "Basic": suite "Simple Reading/Writing yamux messages":
asyncTest "Simple test": asyncTest "Roundtrip of small messages":
mSetup() mSetup()
yamuxb.streamHandler = proc(conn: Connection) {.async.} = yamuxb.streamHandler = proc(conn: Connection) {.async.} =