2022-07-04 13:19:21 +00:00
|
|
|
|
# Nim-LibP2P
|
2024-03-05 07:06:27 +00:00
|
|
|
|
# Copyright (c) 2023-2024 Status Research & Development GmbH
|
2022-07-04 13:19:21 +00:00
|
|
|
|
# Licensed under either of
|
|
|
|
|
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
|
|
|
|
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
|
|
|
|
# at your option.
|
|
|
|
|
# This file may not be copied, modified, or distributed except according to
|
|
|
|
|
# those terms.
|
|
|
|
|
|
2023-06-07 11:12:49 +00:00
|
|
|
|
{.push raises: [].}
|
2022-07-04 13:19:21 +00:00
|
|
|
|
|
|
|
|
|
import sequtils, std/[tables]
|
2022-08-01 12:52:42 +00:00
|
|
|
|
import chronos, chronicles, metrics, stew/[endians2, byteutils, objects]
|
2022-07-04 13:19:21 +00:00
|
|
|
|
import ../muxer, ../../stream/connection
|
|
|
|
|
|
|
|
|
|
export muxer
|
|
|
|
|
|
|
|
|
|
logScope:
|
|
|
|
|
topics = "libp2p yamux"
|
|
|
|
|
|
|
|
|
|
const
|
|
|
|
|
YamuxCodec* = "/yamux/1.0.0"
|
|
|
|
|
YamuxVersion = 0.uint8
|
2023-12-15 15:30:50 +00:00
|
|
|
|
YamuxDefaultWindowSize* = 256000
|
|
|
|
|
MaxSendQueueSize = 256000
|
2022-08-01 12:52:42 +00:00
|
|
|
|
MaxChannelCount = 200
|
|
|
|
|
|
|
|
|
|
when defined(libp2p_yamux_metrics):
|
2024-03-05 07:06:27 +00:00
|
|
|
|
declareGauge libp2p_yamux_channels, "yamux channels", labels = ["initiator", "peer"]
|
|
|
|
|
declareHistogram libp2p_yamux_send_queue,
|
|
|
|
|
"message send queue length (in byte)",
|
|
|
|
|
buckets = [0.0, 100.0, 250.0, 1000.0, 2000.0, 3200.0, 6400.0, 25600.0, 256000.0]
|
|
|
|
|
declareHistogram libp2p_yamux_recv_queue,
|
|
|
|
|
"message recv queue length (in byte)",
|
|
|
|
|
buckets = [0.0, 100.0, 250.0, 1000.0, 2000.0, 3200.0, 6400.0, 25600.0, 256000.0]
|
2022-07-04 13:19:21 +00:00
|
|
|
|
|
|
|
|
|
type
|
2024-03-04 18:26:27 +00:00
|
|
|
|
YamuxError* = object of MuxerError
|
2022-07-04 13:19:21 +00:00
|
|
|
|
|
|
|
|
|
MsgType = enum
|
|
|
|
|
Data = 0x0
|
|
|
|
|
WindowUpdate = 0x1
|
|
|
|
|
Ping = 0x2
|
|
|
|
|
GoAway = 0x3
|
|
|
|
|
|
2024-04-25 13:01:29 +00:00
|
|
|
|
MsgFlags {.size: 2.} = enum
|
2022-07-04 13:19:21 +00:00
|
|
|
|
Syn
|
|
|
|
|
Ack
|
|
|
|
|
Fin
|
|
|
|
|
Rst
|
|
|
|
|
|
|
|
|
|
GoAwayStatus = enum
|
|
|
|
|
NormalTermination = 0x0
|
|
|
|
|
ProtocolError = 0x1
|
|
|
|
|
InternalError = 0x2
|
|
|
|
|
|
2024-04-25 13:01:29 +00:00
|
|
|
|
YamuxHeader = object
|
2022-07-04 13:19:21 +00:00
|
|
|
|
version: uint8
|
|
|
|
|
msgType: MsgType
|
|
|
|
|
flags: set[MsgFlags]
|
|
|
|
|
streamId: uint32
|
|
|
|
|
length: uint32
|
|
|
|
|
|
2024-04-25 13:01:29 +00:00
|
|
|
|
proc readHeader(
|
2024-03-05 07:06:27 +00:00
|
|
|
|
conn: LPStream
|
|
|
|
|
): Future[YamuxHeader] {.async: (raises: [CancelledError, LPStreamError, MuxerError]).} =
|
2022-07-04 13:19:21 +00:00
|
|
|
|
var buffer: array[12, byte]
|
|
|
|
|
await conn.readExactly(addr buffer[0], 12)
|
|
|
|
|
|
|
|
|
|
result.version = buffer[0]
|
|
|
|
|
let flags = fromBytesBE(uint16, buffer[2 .. 3])
|
|
|
|
|
if not result.msgType.checkedEnumAssign(buffer[1]) or flags notin 0'u16 .. 15'u16:
|
|
|
|
|
raise newException(YamuxError, "Wrong header")
|
|
|
|
|
result.flags = cast[set[MsgFlags]](flags)
|
|
|
|
|
result.streamId = fromBytesBE(uint32, buffer[4 .. 7])
|
|
|
|
|
result.length = fromBytesBE(uint32, buffer[8 .. 11])
|
|
|
|
|
return result
|
|
|
|
|
|
|
|
|
|
proc `$`(header: YamuxHeader): string =
|
2024-03-05 07:06:27 +00:00
|
|
|
|
"{" & $header.msgType & ", " & "{" &
|
|
|
|
|
header.flags.foldl(
|
|
|
|
|
if a != "":
|
|
|
|
|
a & ", " & $b
|
|
|
|
|
else:
|
|
|
|
|
$b
|
|
|
|
|
,
|
|
|
|
|
"",
|
|
|
|
|
) & "}, " & "streamId: " & $header.streamId & ", " & "length: " & $header.length &
|
|
|
|
|
"}"
|
2022-07-04 13:19:21 +00:00
|
|
|
|
|
|
|
|
|
proc encode(header: YamuxHeader): array[12, byte] =
|
|
|
|
|
result[0] = header.version
|
|
|
|
|
result[1] = uint8(header.msgType)
|
2023-05-18 08:24:17 +00:00
|
|
|
|
result[2 .. 3] = toBytesBE(uint16(cast[uint8](header.flags)))
|
|
|
|
|
# workaround https://github.com/nim-lang/Nim/issues/21789
|
2022-07-04 13:19:21 +00:00
|
|
|
|
result[4 .. 7] = toBytesBE(header.streamId)
|
|
|
|
|
result[8 .. 11] = toBytesBE(header.length)
|
|
|
|
|
|
2024-04-25 13:01:29 +00:00
|
|
|
|
proc write(
|
2024-03-05 07:06:27 +00:00
|
|
|
|
conn: LPStream, header: YamuxHeader
|
|
|
|
|
): Future[void] {.async: (raises: [CancelledError, LPStreamError], raw: true).} =
|
2022-07-04 13:19:21 +00:00
|
|
|
|
trace "write directly on stream", h = $header
|
|
|
|
|
var buffer = header.encode()
|
2024-03-05 07:06:27 +00:00
|
|
|
|
conn.write(@buffer)
|
2022-07-04 13:19:21 +00:00
|
|
|
|
|
|
|
|
|
proc ping(T: type[YamuxHeader], flag: MsgFlags, pingData: uint32): T =
|
|
|
|
|
T(version: YamuxVersion, msgType: MsgType.Ping, flags: {flag}, length: pingData)
|
|
|
|
|
|
|
|
|
|
proc goAway(T: type[YamuxHeader], status: GoAwayStatus): T =
|
|
|
|
|
T(version: YamuxVersion, msgType: MsgType.GoAway, length: uint32(status))
|
|
|
|
|
|
2024-04-25 13:01:29 +00:00
|
|
|
|
proc data(
|
2024-03-05 07:06:27 +00:00
|
|
|
|
T: type[YamuxHeader],
|
|
|
|
|
streamId: uint32,
|
|
|
|
|
length: uint32 = 0,
|
|
|
|
|
flags: set[MsgFlags] = {},
|
|
|
|
|
): T =
|
2022-07-04 13:19:21 +00:00
|
|
|
|
T(
|
|
|
|
|
version: YamuxVersion,
|
|
|
|
|
msgType: MsgType.Data,
|
|
|
|
|
length: length,
|
|
|
|
|
flags: flags,
|
|
|
|
|
streamId: streamId,
|
|
|
|
|
)
|
|
|
|
|
|
2024-04-25 13:01:29 +00:00
|
|
|
|
proc windowUpdate(
|
2024-03-05 07:06:27 +00:00
|
|
|
|
T: type[YamuxHeader], streamId: uint32, delta: uint32, flags: set[MsgFlags] = {}
|
|
|
|
|
): T =
|
2022-07-04 13:19:21 +00:00
|
|
|
|
T(
|
|
|
|
|
version: YamuxVersion,
|
|
|
|
|
msgType: MsgType.WindowUpdate,
|
|
|
|
|
length: delta,
|
|
|
|
|
flags: flags,
|
|
|
|
|
streamId: streamId,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
type
|
|
|
|
|
ToSend =
|
|
|
|
|
tuple[
|
|
|
|
|
data: seq[byte],
|
|
|
|
|
sent: int,
|
2024-03-05 07:06:27 +00:00
|
|
|
|
fut: Future[void].Raising([CancelledError, LPStreamError]),
|
2024-06-11 15:18:06 +00:00
|
|
|
|
]
|
2022-07-04 13:19:21 +00:00
|
|
|
|
YamuxChannel* = ref object of Connection
|
2024-04-25 13:01:29 +00:00
|
|
|
|
id: uint32
|
2022-07-04 13:19:21 +00:00
|
|
|
|
recvWindow: int
|
|
|
|
|
sendWindow: int
|
|
|
|
|
maxRecvWindow: int
|
2023-12-15 15:30:50 +00:00
|
|
|
|
maxSendQueueSize: int
|
2024-04-25 13:01:29 +00:00
|
|
|
|
conn: Connection
|
2022-07-04 13:19:21 +00:00
|
|
|
|
isSrc: bool
|
|
|
|
|
opened: bool
|
|
|
|
|
isSending: bool
|
|
|
|
|
sendQueue: seq[ToSend]
|
|
|
|
|
recvQueue: seq[byte]
|
|
|
|
|
isReset: bool
|
2022-09-14 08:58:41 +00:00
|
|
|
|
remoteReset: bool
|
2024-06-21 10:11:18 +00:00
|
|
|
|
closedRemotely: AsyncEvent
|
2022-07-04 13:19:21 +00:00
|
|
|
|
closedLocally: bool
|
|
|
|
|
receivedData: AsyncEvent
|
|
|
|
|
|
|
|
|
|
proc `$`(channel: YamuxChannel): string =
|
|
|
|
|
result = if channel.conn.dir == Out: "=> " else: "<= "
|
|
|
|
|
result &= $channel.id
|
|
|
|
|
var s: seq[string] = @[]
|
2024-06-21 10:11:18 +00:00
|
|
|
|
if channel.closedRemotely.isSet():
|
2022-07-04 13:19:21 +00:00
|
|
|
|
s.add("ClosedRemotely")
|
|
|
|
|
if channel.closedLocally:
|
|
|
|
|
s.add("ClosedLocally")
|
|
|
|
|
if channel.isReset:
|
|
|
|
|
s.add("Reset")
|
|
|
|
|
if s.len > 0:
|
|
|
|
|
result &=
|
|
|
|
|
" {" &
|
|
|
|
|
s.foldl(
|
|
|
|
|
if a != "":
|
|
|
|
|
a & ", " & b
|
|
|
|
|
else:
|
|
|
|
|
b
|
2024-06-11 15:18:06 +00:00
|
|
|
|
,
|
2022-07-04 13:19:21 +00:00
|
|
|
|
"",
|
|
|
|
|
) & "}"
|
|
|
|
|
|
2023-12-15 15:30:50 +00:00
|
|
|
|
proc lengthSendQueue(channel: YamuxChannel): int =
|
2024-02-02 14:14:02 +00:00
|
|
|
|
## Returns the length of what remains to be sent
|
|
|
|
|
##
|
2023-12-15 15:30:50 +00:00
|
|
|
|
channel.sendQueue.foldl(a + b.data.len - b.sent, 0)
|
|
|
|
|
|
|
|
|
|
proc lengthSendQueueWithLimit(channel: YamuxChannel): int =
|
2024-02-02 14:14:02 +00:00
|
|
|
|
## Returns the length of what remains to be sent, but limit the size of big messages.
|
|
|
|
|
##
|
2023-12-15 15:30:50 +00:00
|
|
|
|
# For leniency, limit big messages size to the third of maxSendQueueSize
|
2024-02-02 14:14:02 +00:00
|
|
|
|
# This value is arbitrary, it's not in the specs, it permits to store up to
|
|
|
|
|
# 3 big messages if the peer is stalling.
|
2023-12-15 15:30:50 +00:00
|
|
|
|
channel.sendQueue.foldl(
|
|
|
|
|
a + min(b.data.len - b.sent, channel.maxSendQueueSize div 3), 0
|
|
|
|
|
)
|
2022-07-04 13:19:21 +00:00
|
|
|
|
|
2024-03-05 07:06:27 +00:00
|
|
|
|
proc actuallyClose(channel: YamuxChannel) {.async: (raises: []).} =
|
2022-07-04 13:19:21 +00:00
|
|
|
|
if channel.closedLocally and channel.sendQueue.len == 0 and
|
2024-06-21 10:11:18 +00:00
|
|
|
|
channel.closedRemotely.isSet():
|
2022-07-04 13:19:21 +00:00
|
|
|
|
await procCall Connection(channel).closeImpl()
|
|
|
|
|
|
2024-03-05 07:06:27 +00:00
|
|
|
|
proc remoteClosed(channel: YamuxChannel) {.async: (raises: []).} =
|
2024-06-21 10:11:18 +00:00
|
|
|
|
if not channel.closedRemotely.isSet():
|
|
|
|
|
channel.closedRemotely.fire()
|
2022-07-04 13:19:21 +00:00
|
|
|
|
await channel.actuallyClose()
|
|
|
|
|
|
2024-03-05 07:06:27 +00:00
|
|
|
|
method closeImpl*(channel: YamuxChannel) {.async: (raises: []).} =
|
2022-07-04 13:19:21 +00:00
|
|
|
|
if not channel.closedLocally:
|
2024-05-24 12:11:27 +00:00
|
|
|
|
trace "Closing yamux channel locally", streamId = channel.id, conn = channel.conn
|
2022-07-04 13:19:21 +00:00
|
|
|
|
channel.closedLocally = true
|
|
|
|
|
|
2024-03-03 23:05:59 +00:00
|
|
|
|
if not channel.isReset and channel.sendQueue.len == 0:
|
|
|
|
|
try:
|
|
|
|
|
await channel.conn.write(YamuxHeader.data(channel.id, 0, {Fin}))
|
|
|
|
|
except CancelledError, LPStreamError:
|
|
|
|
|
discard
|
2022-07-04 13:19:21 +00:00
|
|
|
|
await channel.actuallyClose()
|
|
|
|
|
|
2024-03-05 07:06:27 +00:00
|
|
|
|
proc reset(channel: YamuxChannel, isLocal: bool = false) {.async: (raises: []).} =
|
2024-02-02 14:14:02 +00:00
|
|
|
|
# If we reset locally, we want to flush up to a maximum of recvWindow
|
|
|
|
|
# bytes. It's because the peer we're connected to can send us data before
|
|
|
|
|
# it receives the reset.
|
2022-09-14 08:58:41 +00:00
|
|
|
|
if channel.isReset:
|
|
|
|
|
return
|
|
|
|
|
trace "Reset channel"
|
|
|
|
|
channel.isReset = true
|
|
|
|
|
channel.remoteReset = not isLocal
|
|
|
|
|
for (d, s, fut) in channel.sendQueue:
|
|
|
|
|
fut.fail(newLPStreamEOFError())
|
|
|
|
|
channel.sendQueue = @[]
|
|
|
|
|
channel.recvQueue = @[]
|
|
|
|
|
channel.sendWindow = 0
|
|
|
|
|
if not channel.closedLocally:
|
2024-03-03 23:23:42 +00:00
|
|
|
|
if isLocal and not channel.isSending:
|
2022-09-14 08:58:41 +00:00
|
|
|
|
try:
|
|
|
|
|
await channel.conn.write(YamuxHeader.data(channel.id, 0, {Rst}))
|
2024-03-05 07:06:27 +00:00
|
|
|
|
except CancelledError, LPStreamError:
|
|
|
|
|
discard
|
2022-09-14 08:58:41 +00:00
|
|
|
|
await channel.close()
|
2024-06-21 10:11:18 +00:00
|
|
|
|
if not channel.closedRemotely.isSet():
|
2022-09-14 08:58:41 +00:00
|
|
|
|
await channel.remoteClosed()
|
|
|
|
|
channel.receivedData.fire()
|
2022-07-04 13:19:21 +00:00
|
|
|
|
if not isLocal:
|
2024-02-02 14:14:02 +00:00
|
|
|
|
# If the reset is remote, there's no reason to flush anything.
|
2022-07-04 13:19:21 +00:00
|
|
|
|
channel.recvWindow = 0
|
|
|
|
|
|
2024-03-05 07:06:27 +00:00
|
|
|
|
proc updateRecvWindow(
|
|
|
|
|
channel: YamuxChannel
|
|
|
|
|
) {.async: (raises: [CancelledError, LPStreamError]).} =
|
2024-02-02 14:14:02 +00:00
|
|
|
|
## Send to the peer a window update when the recvWindow is empty enough
|
|
|
|
|
##
|
|
|
|
|
# In order to avoid spamming a window update everytime a byte is read,
|
|
|
|
|
# we send it everytime half of the maxRecvWindow is read.
|
2022-07-04 13:19:21 +00:00
|
|
|
|
let inWindow = channel.recvWindow + channel.recvQueue.len
|
|
|
|
|
if inWindow > channel.maxRecvWindow div 2:
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
let delta = channel.maxRecvWindow - inWindow
|
|
|
|
|
channel.recvWindow.inc(delta)
|
|
|
|
|
await channel.conn.write(YamuxHeader.windowUpdate(channel.id, delta.uint32))
|
|
|
|
|
trace "increasing the recvWindow", delta
|
|
|
|
|
|
|
|
|
|
method readOnce*(
|
2024-03-05 07:06:27 +00:00
|
|
|
|
channel: YamuxChannel, pbytes: pointer, nbytes: int
|
|
|
|
|
): Future[int] {.async: (raises: [CancelledError, LPStreamError]).} =
|
2024-02-02 14:14:02 +00:00
|
|
|
|
## Read from a yamux channel
|
2022-07-04 13:19:21 +00:00
|
|
|
|
|
2022-09-14 08:58:41 +00:00
|
|
|
|
if channel.isReset:
|
2024-03-05 07:06:27 +00:00
|
|
|
|
raise
|
|
|
|
|
if channel.remoteReset:
|
2022-09-14 08:58:41 +00:00
|
|
|
|
newLPStreamResetError()
|
|
|
|
|
elif channel.closedLocally:
|
|
|
|
|
newLPStreamClosedError()
|
|
|
|
|
else:
|
|
|
|
|
newLPStreamConnDownError()
|
2024-05-24 12:11:27 +00:00
|
|
|
|
if channel.isEof:
|
2022-09-14 08:58:41 +00:00
|
|
|
|
raise newLPStreamRemoteClosedError()
|
2022-07-04 13:19:21 +00:00
|
|
|
|
if channel.recvQueue.len == 0:
|
|
|
|
|
channel.receivedData.clear()
|
2024-08-12 17:21:16 +00:00
|
|
|
|
let
|
|
|
|
|
closedRemotelyFut = channel.closedRemotely.wait()
|
|
|
|
|
receivedDataFut = channel.receivedData.wait()
|
|
|
|
|
defer:
|
|
|
|
|
if not closedRemotelyFut.finished():
|
|
|
|
|
await closedRemotelyFut.cancelAndWait()
|
|
|
|
|
if not receivedDataFut.finished():
|
|
|
|
|
await receivedDataFut.cancelAndWait()
|
|
|
|
|
await closedRemotelyFut or receivedDataFut
|
2024-06-21 10:11:18 +00:00
|
|
|
|
if channel.closedRemotely.isSet() and channel.recvQueue.len == 0:
|
2023-11-21 15:03:29 +00:00
|
|
|
|
channel.isEof = true
|
2024-05-24 12:11:27 +00:00
|
|
|
|
return
|
|
|
|
|
0 # we return 0 to indicate that the channel is closed for reading from now on
|
2022-07-04 13:19:21 +00:00
|
|
|
|
|
|
|
|
|
let toRead = min(channel.recvQueue.len, nbytes)
|
|
|
|
|
|
|
|
|
|
var p = cast[ptr UncheckedArray[byte]](pbytes)
|
2024-03-05 07:06:27 +00:00
|
|
|
|
toOpenArray(p, 0, nbytes - 1)[0 ..< toRead] =
|
|
|
|
|
channel.recvQueue.toOpenArray(0, toRead - 1)
|
2022-07-04 13:19:21 +00:00
|
|
|
|
channel.recvQueue = channel.recvQueue[toRead ..^ 1]
|
|
|
|
|
|
|
|
|
|
# We made some room in the recv buffer let the peer know
|
|
|
|
|
await channel.updateRecvWindow()
|
|
|
|
|
channel.activity = true
|
|
|
|
|
return toRead
|
|
|
|
|
|
2024-03-05 07:06:27 +00:00
|
|
|
|
proc gotDataFromRemote(
|
|
|
|
|
channel: YamuxChannel, b: seq[byte]
|
|
|
|
|
) {.async: (raises: [CancelledError, LPStreamError]).} =
|
2022-07-04 13:19:21 +00:00
|
|
|
|
channel.recvWindow -= b.len
|
|
|
|
|
channel.recvQueue = channel.recvQueue.concat(b)
|
|
|
|
|
channel.receivedData.fire()
|
2022-08-01 12:52:42 +00:00
|
|
|
|
when defined(libp2p_yamux_metrics):
|
|
|
|
|
libp2p_yamux_recv_queue.observe(channel.recvQueue.len.int64)
|
2022-07-04 13:19:21 +00:00
|
|
|
|
await channel.updateRecvWindow()
|
|
|
|
|
|
|
|
|
|
proc setMaxRecvWindow*(channel: YamuxChannel, maxRecvWindow: int) =
|
|
|
|
|
channel.maxRecvWindow = maxRecvWindow
|
|
|
|
|
|
2024-03-05 07:06:27 +00:00
|
|
|
|
proc trySend(
|
|
|
|
|
channel: YamuxChannel
|
|
|
|
|
) {.async: (raises: [CancelledError, LPStreamError]).} =
|
2022-07-04 13:19:21 +00:00
|
|
|
|
if channel.isSending:
|
|
|
|
|
return
|
|
|
|
|
channel.isSending = true
|
|
|
|
|
defer:
|
|
|
|
|
channel.isSending = false
|
2024-02-02 14:14:02 +00:00
|
|
|
|
|
2022-07-04 13:19:21 +00:00
|
|
|
|
while channel.sendQueue.len != 0:
|
|
|
|
|
channel.sendQueue.keepItIf(not (it.fut.cancelled() and it.sent == 0))
|
|
|
|
|
if channel.sendWindow == 0:
|
2024-02-02 14:14:02 +00:00
|
|
|
|
trace "trying to send while the sendWindow is empty"
|
2023-12-15 15:30:50 +00:00
|
|
|
|
if channel.lengthSendQueueWithLimit() > channel.maxSendQueueSize:
|
2024-03-05 07:06:27 +00:00
|
|
|
|
trace "channel send queue too big, resetting",
|
|
|
|
|
maxSendQueueSize = channel.maxSendQueueSize,
|
2023-12-15 15:30:50 +00:00
|
|
|
|
currentQueueSize = channel.lengthSendQueueWithLimit()
|
2024-03-05 07:06:27 +00:00
|
|
|
|
await channel.reset(isLocal = true)
|
2022-07-04 13:19:21 +00:00
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
let
|
2023-12-15 15:30:50 +00:00
|
|
|
|
bytesAvailable = channel.lengthSendQueue()
|
2022-07-04 13:19:21 +00:00
|
|
|
|
toSend = min(channel.sendWindow, bytesAvailable)
|
|
|
|
|
var
|
|
|
|
|
sendBuffer = newSeqUninitialized[byte](toSend + 12)
|
|
|
|
|
header = YamuxHeader.data(channel.id, toSend.uint32)
|
|
|
|
|
inBuffer = 0
|
|
|
|
|
|
|
|
|
|
if toSend >= bytesAvailable and channel.closedLocally:
|
|
|
|
|
trace "last buffer we'll sent on this channel", toSend, bytesAvailable
|
|
|
|
|
header.flags.incl({Fin})
|
|
|
|
|
|
|
|
|
|
sendBuffer[0 ..< 12] = header.encode()
|
|
|
|
|
|
2024-03-05 07:06:27 +00:00
|
|
|
|
var futures: seq[Future[void].Raising([CancelledError, LPStreamError])]
|
2022-07-04 13:19:21 +00:00
|
|
|
|
while inBuffer < toSend:
|
2024-02-02 14:14:02 +00:00
|
|
|
|
# concatenate the different message we try to send into one buffer
|
2022-07-04 13:19:21 +00:00
|
|
|
|
let (data, sent, fut) = channel.sendQueue[0]
|
|
|
|
|
let bufferToSend = min(data.len - sent, toSend - inBuffer)
|
2024-06-11 15:18:06 +00:00
|
|
|
|
|
2022-07-04 13:19:21 +00:00
|
|
|
|
sendBuffer.toOpenArray(12, 12 + toSend - 1)[
|
|
|
|
|
inBuffer ..< (inBuffer + bufferToSend)
|
|
|
|
|
] = channel.sendQueue[0].data.toOpenArray(sent, sent + bufferToSend - 1)
|
|
|
|
|
channel.sendQueue[0].sent.inc(bufferToSend)
|
|
|
|
|
if channel.sendQueue[0].sent >= data.len:
|
2024-02-02 14:14:02 +00:00
|
|
|
|
# if every byte of the message is in the buffer, add the write future to the
|
|
|
|
|
# sequence of futures to be completed (or failed) when the buffer is sent
|
2022-07-04 13:19:21 +00:00
|
|
|
|
futures.add(fut)
|
|
|
|
|
channel.sendQueue.delete(0)
|
|
|
|
|
inBuffer.inc(bufferToSend)
|
|
|
|
|
|
2024-02-02 14:14:02 +00:00
|
|
|
|
trace "try to send the buffer", h = $header
|
2022-07-04 13:19:21 +00:00
|
|
|
|
channel.sendWindow.dec(toSend)
|
2024-03-05 07:06:27 +00:00
|
|
|
|
try:
|
|
|
|
|
await channel.conn.write(sendBuffer)
|
|
|
|
|
except CancelledError:
|
|
|
|
|
trace "cancelled sending the buffer"
|
|
|
|
|
for fut in futures.items():
|
|
|
|
|
fut.cancelSoon()
|
|
|
|
|
await channel.reset()
|
|
|
|
|
break
|
|
|
|
|
except LPStreamError as exc:
|
2024-02-02 14:14:02 +00:00
|
|
|
|
trace "failed to send the buffer"
|
2022-09-14 08:58:41 +00:00
|
|
|
|
let connDown = newLPStreamConnDownError(exc)
|
2022-07-04 13:19:21 +00:00
|
|
|
|
for fut in futures.items():
|
2022-09-14 08:58:41 +00:00
|
|
|
|
fut.fail(connDown)
|
2022-07-04 13:19:21 +00:00
|
|
|
|
await channel.reset()
|
|
|
|
|
break
|
|
|
|
|
for fut in futures.items():
|
|
|
|
|
fut.complete()
|
|
|
|
|
channel.activity = true
|
|
|
|
|
|
2024-03-05 07:06:27 +00:00
|
|
|
|
method write*(
|
|
|
|
|
channel: YamuxChannel, msg: seq[byte]
|
|
|
|
|
): Future[void] {.async: (raises: [CancelledError, LPStreamError], raw: true).} =
|
2024-02-02 14:14:02 +00:00
|
|
|
|
## Write to yamux channel
|
|
|
|
|
##
|
2022-07-04 13:19:21 +00:00
|
|
|
|
result = newFuture[void]("Yamux Send")
|
2022-09-14 08:58:41 +00:00
|
|
|
|
if channel.remoteReset:
|
|
|
|
|
result.fail(newLPStreamResetError())
|
|
|
|
|
return result
|
2022-07-04 13:19:21 +00:00
|
|
|
|
if channel.closedLocally or channel.isReset:
|
2022-09-14 08:58:41 +00:00
|
|
|
|
result.fail(newLPStreamClosedError())
|
2022-07-04 13:19:21 +00:00
|
|
|
|
return result
|
|
|
|
|
if msg.len == 0:
|
|
|
|
|
result.complete()
|
|
|
|
|
return result
|
|
|
|
|
channel.sendQueue.add((msg, 0, result))
|
2022-08-01 12:52:42 +00:00
|
|
|
|
when defined(libp2p_yamux_metrics):
|
2024-02-08 11:36:58 +00:00
|
|
|
|
libp2p_yamux_send_queue.observe(channel.lengthSendQueue().int64)
|
2022-07-04 13:19:21 +00:00
|
|
|
|
asyncSpawn channel.trySend()
|
|
|
|
|
|
2024-03-05 07:06:27 +00:00
|
|
|
|
proc open(channel: YamuxChannel) {.async: (raises: [CancelledError, LPStreamError]).} =
|
2024-02-02 14:14:02 +00:00
|
|
|
|
## Open a yamux channel by sending a window update with Syn or Ack flag
|
|
|
|
|
##
|
2022-07-04 13:19:21 +00:00
|
|
|
|
if channel.opened:
|
|
|
|
|
trace "Try to open channel twice"
|
|
|
|
|
return
|
|
|
|
|
channel.opened = true
|
2023-12-15 15:30:50 +00:00
|
|
|
|
await channel.conn.write(
|
|
|
|
|
YamuxHeader.windowUpdate(
|
|
|
|
|
channel.id,
|
|
|
|
|
uint32(max(channel.maxRecvWindow - YamuxDefaultWindowSize, 0)),
|
|
|
|
|
{if channel.isSrc: Syn else: Ack},
|
|
|
|
|
)
|
2024-06-11 15:18:06 +00:00
|
|
|
|
)
|
2022-07-04 13:19:21 +00:00
|
|
|
|
|
2023-03-08 11:30:19 +00:00
|
|
|
|
method getWrapped*(channel: YamuxChannel): Connection =
|
|
|
|
|
channel.conn
|
|
|
|
|
|
2022-07-04 13:19:21 +00:00
|
|
|
|
type Yamux* = ref object of Muxer
|
|
|
|
|
channels: Table[uint32, YamuxChannel]
|
|
|
|
|
flushed: Table[uint32, int]
|
|
|
|
|
currentId: uint32
|
|
|
|
|
isClosed: bool
|
2022-08-01 12:52:42 +00:00
|
|
|
|
maxChannCount: int
|
2023-12-15 15:30:50 +00:00
|
|
|
|
windowSize: int
|
|
|
|
|
maxSendQueueSize: int
|
2024-02-22 09:21:34 +00:00
|
|
|
|
inTimeout: Duration
|
|
|
|
|
outTimeout: Duration
|
2022-08-01 12:52:42 +00:00
|
|
|
|
|
|
|
|
|
proc lenBySrc(m: Yamux, isSrc: bool): int =
|
|
|
|
|
for v in m.channels.values():
|
|
|
|
|
if v.isSrc == isSrc:
|
|
|
|
|
result += 1
|
2022-07-04 13:19:21 +00:00
|
|
|
|
|
2024-03-05 07:06:27 +00:00
|
|
|
|
proc cleanupChannel(m: Yamux, channel: YamuxChannel) {.async: (raises: []).} =
|
|
|
|
|
try:
|
|
|
|
|
await channel.join()
|
|
|
|
|
except CancelledError:
|
|
|
|
|
discard
|
2022-07-04 13:19:21 +00:00
|
|
|
|
m.channels.del(channel.id)
|
2022-08-01 12:52:42 +00:00
|
|
|
|
when defined(libp2p_yamux_metrics):
|
2024-03-05 07:06:27 +00:00
|
|
|
|
libp2p_yamux_channels.set(
|
|
|
|
|
m.lenBySrc(channel.isSrc).int64, [$channel.isSrc, $channel.peerId]
|
|
|
|
|
)
|
2024-04-25 13:01:29 +00:00
|
|
|
|
if channel.isReset and channel.recvWindow > 0:
|
2022-07-04 13:19:21 +00:00
|
|
|
|
m.flushed[channel.id] = channel.recvWindow
|
|
|
|
|
|
2024-03-05 07:06:27 +00:00
|
|
|
|
proc createStream(
|
|
|
|
|
m: Yamux, id: uint32, isSrc: bool, recvWindow: int, maxSendQueueSize: int
|
|
|
|
|
): YamuxChannel =
|
|
|
|
|
# During initialization, recvWindow can be larger than maxRecvWindow.
|
2023-12-15 15:30:50 +00:00
|
|
|
|
# This is because the peer we're connected to will always assume
|
|
|
|
|
# that the initial recvWindow is 256k.
|
2024-03-05 07:06:27 +00:00
|
|
|
|
# To solve this contradiction, no updateWindow will be sent until
|
|
|
|
|
# recvWindow is less than maxRecvWindow
|
2024-02-22 09:21:34 +00:00
|
|
|
|
var stream = YamuxChannel(
|
2022-07-04 13:19:21 +00:00
|
|
|
|
id: id,
|
2023-12-15 15:30:50 +00:00
|
|
|
|
maxRecvWindow: recvWindow,
|
|
|
|
|
recvWindow:
|
|
|
|
|
if recvWindow > YamuxDefaultWindowSize: recvWindow else: YamuxDefaultWindowSize,
|
|
|
|
|
sendWindow: YamuxDefaultWindowSize,
|
|
|
|
|
maxSendQueueSize: maxSendQueueSize,
|
2022-07-04 13:19:21 +00:00
|
|
|
|
isSrc: isSrc,
|
|
|
|
|
conn: m.connection,
|
|
|
|
|
receivedData: newAsyncEvent(),
|
2024-06-21 10:11:18 +00:00
|
|
|
|
closedRemotely: newAsyncEvent(),
|
2022-07-04 13:19:21 +00:00
|
|
|
|
)
|
2024-02-22 09:21:34 +00:00
|
|
|
|
stream.objName = "YamuxStream"
|
|
|
|
|
if isSrc:
|
|
|
|
|
stream.dir = Direction.Out
|
|
|
|
|
stream.timeout = m.outTimeout
|
|
|
|
|
else:
|
|
|
|
|
stream.dir = Direction.In
|
|
|
|
|
stream.timeout = m.inTimeout
|
2024-03-05 07:06:27 +00:00
|
|
|
|
stream.timeoutHandler = proc(): Future[void] {.async: (raises: [], raw: true).} =
|
|
|
|
|
trace "Idle timeout expired, resetting YamuxChannel"
|
|
|
|
|
stream.reset(isLocal = true)
|
2024-02-22 09:21:34 +00:00
|
|
|
|
stream.initStream()
|
|
|
|
|
stream.peerId = m.connection.peerId
|
|
|
|
|
stream.observedAddr = m.connection.observedAddr
|
|
|
|
|
stream.transportDir = m.connection.transportDir
|
2022-07-04 13:19:21 +00:00
|
|
|
|
when defined(libp2p_agents_metrics):
|
2024-02-22 09:21:34 +00:00
|
|
|
|
stream.shortAgent = m.connection.shortAgent
|
|
|
|
|
m.channels[id] = stream
|
|
|
|
|
asyncSpawn m.cleanupChannel(stream)
|
2022-07-04 13:19:21 +00:00
|
|
|
|
trace "created channel", id, pid = m.connection.peerId
|
2022-08-01 12:52:42 +00:00
|
|
|
|
when defined(libp2p_yamux_metrics):
|
2024-02-22 09:21:34 +00:00
|
|
|
|
libp2p_yamux_channels.set(m.lenBySrc(isSrc).int64, [$isSrc, $stream.peerId])
|
|
|
|
|
return stream
|
2022-07-04 13:19:21 +00:00
|
|
|
|
|
2024-03-05 07:06:27 +00:00
|
|
|
|
method close*(m: Yamux) {.async: (raises: []).} =
|
2022-07-04 13:19:21 +00:00
|
|
|
|
if m.isClosed == true:
|
|
|
|
|
trace "Already closed"
|
|
|
|
|
return
|
|
|
|
|
m.isClosed = true
|
|
|
|
|
|
|
|
|
|
trace "Closing yamux"
|
2022-09-14 08:58:41 +00:00
|
|
|
|
let channels = toSeq(m.channels.values())
|
|
|
|
|
for channel in channels:
|
2024-03-05 07:06:27 +00:00
|
|
|
|
await channel.reset(isLocal = true)
|
2022-12-16 17:14:40 +00:00
|
|
|
|
try:
|
|
|
|
|
await m.connection.write(YamuxHeader.goAway(NormalTermination))
|
2024-03-05 07:06:27 +00:00
|
|
|
|
except CancelledError as exc:
|
2024-08-14 15:19:54 +00:00
|
|
|
|
trace "cancelled sending goAway", description = exc.msg
|
2024-03-05 07:06:27 +00:00
|
|
|
|
except LPStreamError as exc:
|
2024-08-14 15:19:54 +00:00
|
|
|
|
trace "failed to send goAway", description = exc.msg
|
2022-07-04 13:19:21 +00:00
|
|
|
|
await m.connection.close()
|
|
|
|
|
trace "Closed yamux"
|
|
|
|
|
|
2024-03-05 07:06:27 +00:00
|
|
|
|
proc handleStream(m: Yamux, channel: YamuxChannel) {.async: (raises: []).} =
|
2024-02-02 14:14:02 +00:00
|
|
|
|
## Call the muxer stream handler for this channel
|
2022-07-04 13:19:21 +00:00
|
|
|
|
##
|
2024-03-05 07:06:27 +00:00
|
|
|
|
await m.streamHandler(channel)
|
|
|
|
|
trace "finished handling stream"
|
|
|
|
|
doAssert(channel.isClosed, "connection not closed by handler!")
|
|
|
|
|
|
|
|
|
|
method handle*(m: Yamux) {.async: (raises: []).} =
|
2022-07-04 13:19:21 +00:00
|
|
|
|
trace "Starting yamux handler", pid = m.connection.peerId
|
|
|
|
|
try:
|
|
|
|
|
while not m.connection.atEof:
|
|
|
|
|
trace "waiting for header"
|
|
|
|
|
let header = await m.connection.readHeader()
|
|
|
|
|
trace "got message", h = $header
|
|
|
|
|
|
|
|
|
|
case header.msgType
|
|
|
|
|
of Ping:
|
|
|
|
|
if MsgFlags.Syn in header.flags:
|
|
|
|
|
await m.connection.write(YamuxHeader.ping(MsgFlags.Ack, header.length))
|
|
|
|
|
of GoAway:
|
|
|
|
|
var status: GoAwayStatus
|
|
|
|
|
if status.checkedEnumAssign(header.length):
|
|
|
|
|
trace "Received go away", status
|
|
|
|
|
else:
|
|
|
|
|
trace "Received unexpected error go away"
|
|
|
|
|
break
|
|
|
|
|
of Data, WindowUpdate:
|
|
|
|
|
if MsgFlags.Syn in header.flags:
|
|
|
|
|
if header.streamId in m.channels:
|
|
|
|
|
debug "Trying to create an existing channel, skipping", id = header.streamId
|
|
|
|
|
else:
|
|
|
|
|
if header.streamId in m.flushed:
|
|
|
|
|
m.flushed.del(header.streamId)
|
2024-02-02 14:14:02 +00:00
|
|
|
|
|
2022-07-04 13:19:21 +00:00
|
|
|
|
if header.streamId mod 2 == m.currentId mod 2:
|
2023-11-21 15:03:29 +00:00
|
|
|
|
debug "Peer used our reserved stream id, skipping",
|
|
|
|
|
id = header.streamId,
|
|
|
|
|
currentId = m.currentId,
|
|
|
|
|
peerId = m.connection.peerId
|
2022-07-04 13:19:21 +00:00
|
|
|
|
raise newException(YamuxError, "Peer used our reserved stream id")
|
2023-12-15 15:30:50 +00:00
|
|
|
|
let newStream =
|
|
|
|
|
m.createStream(header.streamId, false, m.windowSize, m.maxSendQueueSize)
|
2022-08-01 12:52:42 +00:00
|
|
|
|
if m.channels.len >= m.maxChannCount:
|
|
|
|
|
await newStream.reset()
|
|
|
|
|
continue
|
2022-07-04 13:19:21 +00:00
|
|
|
|
await newStream.open()
|
|
|
|
|
asyncSpawn m.handleStream(newStream)
|
|
|
|
|
elif header.streamId notin m.channels:
|
2024-03-03 23:27:13 +00:00
|
|
|
|
# Flush the data
|
|
|
|
|
m.flushed.withValue(header.streamId, flushed):
|
|
|
|
|
if header.msgType == Data:
|
|
|
|
|
flushed[].dec(int(header.length))
|
|
|
|
|
if flushed[] < 0:
|
|
|
|
|
raise
|
|
|
|
|
newException(YamuxError, "Peer exhausted the recvWindow after reset")
|
2024-03-20 12:35:44 +00:00
|
|
|
|
if header.length > 0:
|
|
|
|
|
var buffer = newSeqUninitialized[byte](header.length)
|
|
|
|
|
await m.connection.readExactly(addr buffer[0], int(header.length))
|
2024-03-03 23:27:13 +00:00
|
|
|
|
do:
|
|
|
|
|
raise newException(YamuxError, "Unknown stream ID: " & $header.streamId)
|
2022-07-04 13:19:21 +00:00
|
|
|
|
continue
|
|
|
|
|
|
2024-03-03 23:24:18 +00:00
|
|
|
|
let channel =
|
|
|
|
|
try:
|
|
|
|
|
m.channels[header.streamId]
|
|
|
|
|
except KeyError:
|
|
|
|
|
raise newException(
|
|
|
|
|
YamuxError,
|
|
|
|
|
"Stream was cleaned up before handling data: " & $header.streamId,
|
|
|
|
|
)
|
2022-07-04 13:19:21 +00:00
|
|
|
|
|
|
|
|
|
if header.msgType == WindowUpdate:
|
|
|
|
|
channel.sendWindow += int(header.length)
|
|
|
|
|
await channel.trySend()
|
|
|
|
|
else:
|
|
|
|
|
if header.length.int > channel.recvWindow.int:
|
|
|
|
|
# check before allocating the buffer
|
|
|
|
|
raise newException(YamuxError, "Peer exhausted the recvWindow")
|
|
|
|
|
|
|
|
|
|
if header.length > 0:
|
|
|
|
|
var buffer = newSeqUninitialized[byte](header.length)
|
|
|
|
|
await m.connection.readExactly(addr buffer[0], int(header.length))
|
2024-08-14 15:19:54 +00:00
|
|
|
|
trace "Msg Rcv", description = shortLog(buffer)
|
2022-07-04 13:19:21 +00:00
|
|
|
|
await channel.gotDataFromRemote(buffer)
|
|
|
|
|
|
|
|
|
|
if MsgFlags.Fin in header.flags:
|
|
|
|
|
trace "remote closed channel"
|
|
|
|
|
await channel.remoteClosed()
|
|
|
|
|
if MsgFlags.Rst in header.flags:
|
|
|
|
|
trace "remote reset channel"
|
|
|
|
|
await channel.reset()
|
2024-03-05 07:06:27 +00:00
|
|
|
|
except CancelledError as exc:
|
2024-08-14 15:19:54 +00:00
|
|
|
|
debug "Unexpected cancellation in yamux handler", description = exc.msg
|
2022-07-04 13:19:21 +00:00
|
|
|
|
except LPStreamEOFError as exc:
|
2024-08-14 15:19:54 +00:00
|
|
|
|
trace "Stream EOF", description = exc.msg
|
2024-03-05 07:06:27 +00:00
|
|
|
|
except LPStreamError as exc:
|
2024-08-14 15:19:54 +00:00
|
|
|
|
debug "Unexpected stream exception in yamux read loop", description = exc.msg
|
2022-07-04 13:19:21 +00:00
|
|
|
|
except YamuxError as exc:
|
2024-08-14 15:19:54 +00:00
|
|
|
|
trace "Closing yamux connection", description = exc.msg
|
2024-03-05 07:06:27 +00:00
|
|
|
|
try:
|
|
|
|
|
await m.connection.write(YamuxHeader.goAway(ProtocolError))
|
|
|
|
|
except CancelledError, LPStreamError:
|
|
|
|
|
discard
|
|
|
|
|
except MuxerError as exc:
|
2024-08-14 15:19:54 +00:00
|
|
|
|
debug "Unexpected muxer exception in yamux read loop", description = exc.msg
|
2024-03-05 07:06:27 +00:00
|
|
|
|
try:
|
|
|
|
|
await m.connection.write(YamuxHeader.goAway(ProtocolError))
|
|
|
|
|
except CancelledError, LPStreamError:
|
|
|
|
|
discard
|
2022-07-04 13:19:21 +00:00
|
|
|
|
finally:
|
|
|
|
|
await m.close()
|
|
|
|
|
trace "Stopped yamux handler"
|
|
|
|
|
|
2023-03-30 22:16:39 +00:00
|
|
|
|
method getStreams*(m: Yamux): seq[Connection] =
|
|
|
|
|
for c in m.channels.values:
|
|
|
|
|
result.add(c)
|
|
|
|
|
|
2022-07-04 13:19:21 +00:00
|
|
|
|
method newStream*(
|
2024-03-05 07:06:27 +00:00
|
|
|
|
m: Yamux, name: string = "", lazy: bool = false
|
|
|
|
|
): Future[Connection] {.async: (raises: [CancelledError, LPStreamError, MuxerError]).} =
|
2022-08-01 12:52:42 +00:00
|
|
|
|
if m.channels.len > m.maxChannCount - 1:
|
|
|
|
|
raise newException(TooManyChannels, "max allowed channel count exceeded")
|
2023-12-15 15:30:50 +00:00
|
|
|
|
let stream = m.createStream(m.currentId, true, m.windowSize, m.maxSendQueueSize)
|
2022-07-04 13:19:21 +00:00
|
|
|
|
m.currentId += 2
|
|
|
|
|
if not lazy:
|
|
|
|
|
await stream.open()
|
|
|
|
|
return stream
|
|
|
|
|
|
2024-03-05 07:06:27 +00:00
|
|
|
|
proc new*(
|
|
|
|
|
T: type[Yamux],
|
|
|
|
|
conn: Connection,
|
|
|
|
|
maxChannCount: int = MaxChannelCount,
|
|
|
|
|
windowSize: int = YamuxDefaultWindowSize,
|
|
|
|
|
maxSendQueueSize: int = MaxSendQueueSize,
|
|
|
|
|
inTimeout: Duration = 5.minutes,
|
|
|
|
|
outTimeout: Duration = 5.minutes,
|
|
|
|
|
): T =
|
2022-07-04 13:19:21 +00:00
|
|
|
|
T(
|
|
|
|
|
connection: conn,
|
2022-08-01 12:52:42 +00:00
|
|
|
|
currentId: if conn.dir == Out: 1 else: 2,
|
2023-12-15 15:30:50 +00:00
|
|
|
|
maxChannCount: maxChannCount,
|
|
|
|
|
windowSize: windowSize,
|
2024-02-22 09:21:34 +00:00
|
|
|
|
maxSendQueueSize: maxSendQueueSize,
|
|
|
|
|
inTimeout: inTimeout,
|
|
|
|
|
outTimeout: outTimeout,
|
2022-07-04 13:19:21 +00:00
|
|
|
|
)
|