diff --git a/libp2p/muxers/yamux/yamux.nim b/libp2p/muxers/yamux/yamux.nim index ce800110c..7f9a33f8d 100644 --- a/libp2p/muxers/yamux/yamux.nim +++ b/libp2p/muxers/yamux/yamux.nim @@ -172,12 +172,16 @@ proc `$`(channel: YamuxChannel): string = result &= " {" & s.foldl(if a != "": a & ", " & b else: b, "") & "}" proc lengthSendQueue(channel: YamuxChannel): int = + ## Returns the length of what remains to be sent + ## channel.sendQueue.foldl(a + b.data.len - b.sent, 0) proc lengthSendQueueWithLimit(channel: YamuxChannel): int = + ## Returns the length of what remains to be sent, but limit the size of big messages. + ## # For leniency, limit big messages size to the third of maxSendQueueSize - # This value is arbitrary, it's not in the specs - # It permits to store up to 3 big messages if the peer is stalling. + # This value is arbitrary, it's not in the specs, it permits to store up to + # 3 big messages if the peer is stalling. channel.sendQueue.foldl(a + min(b.data.len - b.sent, channel.maxSendQueueSize div 3), 0) proc actuallyClose(channel: YamuxChannel) {.async.} = @@ -200,6 +204,9 @@ method closeImpl*(channel: YamuxChannel) {.async.} = await channel.actuallyClose() proc reset(channel: YamuxChannel, isLocal: bool = false) {.async.} = + # If we reset locally, we want to flush up to a maximum of recvWindow + # bytes. It's because the peer we're connected to can send us data before + # it receives the reset. if channel.isReset: return trace "Reset channel" @@ -220,11 +227,14 @@ proc reset(channel: YamuxChannel, isLocal: bool = false) {.async.} = await channel.remoteClosed() channel.receivedData.fire() if not isLocal: - # If we reset locally, we want to flush up to a maximum of recvWindow - # bytes. We use the recvWindow in the proc cleanupChann. + # If the reset is remote, there's no reason to flush anything. channel.recvWindow = 0 proc updateRecvWindow(channel: YamuxChannel) {.async.} = + ## Send to the peer a window update when the recvWindow is empty enough + ## + # In order to avoid spamming a window update everytime a byte is read, + # we send it everytime half of the maxRecvWindow is read. let inWindow = channel.recvWindow + channel.recvQueue.len if inWindow > channel.maxRecvWindow div 2: return @@ -242,6 +252,7 @@ method readOnce*( pbytes: pointer, nbytes: int): Future[int] {.async.} = + ## Read from a yamux channel if channel.isReset: raise if channel.remoteReset: @@ -287,17 +298,18 @@ proc trySend(channel: YamuxChannel) {.async.} = return channel.isSending = true defer: channel.isSending = false + while channel.sendQueue.len != 0: channel.sendQueue.keepItIf(not (it.fut.cancelled() and it.sent == 0)) if channel.sendWindow == 0: - trace "send window empty" + trace "trying to send while the sendWindow is empty" if channel.lengthSendQueueWithLimit() > channel.maxSendQueueSize: - debug "channel send queue too big, resetting", maxSendQueueSize=channel.maxSendQueueSize, + trace "channel send queue too big, resetting", maxSendQueueSize=channel.maxSendQueueSize, currentQueueSize = channel.lengthSendQueueWithLimit() try: await channel.reset(true) except CatchableError as exc: - debug "failed to reset", msg=exc.msg + warn "failed to reset", msg=exc.msg break let @@ -316,20 +328,24 @@ proc trySend(channel: YamuxChannel) {.async.} = var futures: seq[Future[void]] while inBuffer < toSend: + # concatenate the different message we try to send into one buffer let (data, sent, fut) = channel.sendQueue[0] let bufferToSend = min(data.len - sent, toSend - inBuffer) sendBuffer.toOpenArray(12, 12 + toSend - 1)[inBuffer..<(inBuffer+bufferToSend)] = channel.sendQueue[0].data.toOpenArray(sent, sent + bufferToSend - 1) channel.sendQueue[0].sent.inc(bufferToSend) if channel.sendQueue[0].sent >= data.len: + # if every byte of the message is in the buffer, add the write future to the + # sequence of futures to be completed (or failed) when the buffer is sent futures.add(fut) channel.sendQueue.delete(0) inBuffer.inc(bufferToSend) - trace "build send buffer", h = $header, msg=string.fromBytes(sendBuffer[12..^1]) + trace "try to send the buffer", h = $header channel.sendWindow.dec(toSend) try: await channel.conn.write(sendBuffer) except CatchableError as exc: + trace "failed to send the buffer" let connDown = newLPStreamConnDownError(exc) for fut in futures.items(): fut.fail(connDown) @@ -340,6 +356,8 @@ proc trySend(channel: YamuxChannel) {.async.} = channel.activity = true method write*(channel: YamuxChannel, msg: seq[byte]): Future[void] = + ## Write to yamux channel + ## result = newFuture[void]("Yamux Send") if channel.remoteReset: result.fail(newLPStreamResetError()) @@ -355,7 +373,9 @@ method write*(channel: YamuxChannel, msg: seq[byte]): Future[void] = libp2p_yamux_recv_queue.observe(channel.lengthSendQueue().int64) asyncSpawn channel.trySend() -proc open*(channel: YamuxChannel) {.async.} = +proc open(channel: YamuxChannel) {.async.} = + ## Open a yamux channel by sending a window update with Syn or Ack flag + ## if channel.opened: trace "Try to open channel twice" return @@ -381,7 +401,7 @@ proc lenBySrc(m: Yamux, isSrc: bool): int = for v in m.channels.values(): if v.isSrc == isSrc: result += 1 -proc cleanupChann(m: Yamux, channel: YamuxChannel) {.async.} = +proc cleanupChannel(m: Yamux, channel: YamuxChannel) {.async.} = await channel.join() m.channels.del(channel.id) when defined(libp2p_yamux_metrics): @@ -419,7 +439,7 @@ proc createStream(m: Yamux, id: uint32, isSrc: bool, when defined(libp2p_agents_metrics): result.shortAgent = m.connection.shortAgent m.channels[id] = result - asyncSpawn m.cleanupChann(result) + asyncSpawn m.cleanupChannel(result) trace "created channel", id, pid=m.connection.peerId when defined(libp2p_yamux_metrics): libp2p_yamux_channels.set(m.lenBySrc(isSrc).int64, [$isSrc, $result.peerId]) @@ -440,7 +460,7 @@ method close*(m: Yamux) {.async.} = trace "Closed yamux" proc handleStream(m: Yamux, channel: YamuxChannel) {.async.} = - ## call the muxer stream handler for this channel + ## Call the muxer stream handler for this channel ## try: await m.streamHandler(channel) @@ -474,6 +494,7 @@ method handle*(m: Yamux) {.async.} = else: if header.streamId in m.flushed: m.flushed.del(header.streamId) + if header.streamId mod 2 == m.currentId mod 2: debug "Peer used our reserved stream id, skipping", id=header.streamId, currentId=m.currentId, peerId=m.connection.peerId raise newException(YamuxError, "Peer used our reserved stream id") diff --git a/tests/testyamux.nim b/tests/testyamux.nim index 7351db1d6..06cc19b0d 100644 --- a/tests/testyamux.nim +++ b/tests/testyamux.nim @@ -36,8 +36,8 @@ suite "Yamux": yamuxa.close(), yamuxb.close(), handlera, handlerb) - suite "Basic": - asyncTest "Simple test": + suite "Simple Reading/Writing yamux messages": + asyncTest "Roundtrip of small messages": mSetup() yamuxb.streamHandler = proc(conn: Connection) {.async.} =