
309 lines
9.6 KiB

# Nim-LibP2P
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
{.push raises: [].}
import std/[oids, strformat]
import pkg/[chronos, chronicles, metrics, nimcrypto/utils]
import ./coder,
../../stream/[bufferstream, connection, streamseq],
export connection
topics = "libp2p mplexchannel"
when defined(libp2p_mplex_metrics):
declareHistogram libp2p_mplex_qlen, "message queue length",
buckets = [0.0, 1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0, 512.0]
declareCounter libp2p_mplex_qlenclose, "closed because of max queuelen"
declareHistogram libp2p_mplex_qtime, "message queuing time"
when defined(libp2p_network_protocols_metrics):
declareCounter libp2p_protocols_bytes, "total sent or received bytes", ["protocol", "direction"]
## Channel half-closed states
## | State | Closed local | Closed remote
## |=============================================
## | Read | Yes (until EOF) | No
## | Write | No | Yes
## Channels are considered fully closed when both outgoing and incoming
## directions are closed and when the reader of the channel has read the
## EOF marker
MaxWrites = 1024 ##\
## Maximum number of in-flight writes - after this, we disconnect the peer
LPChannelTrackerName* = "LPChannel"
LPChannel* = ref object of BufferStream
id*: uint64 # channel id
name*: string # name of the channel (for debugging)
conn*: Connection # wrapped connection used to for writing
initiator*: bool # initiated remotely or locally flag
isOpen*: bool # has channel been opened
closedLocal*: bool # has channel been closed locally
remoteReset*: bool # has channel been remotely reset
localReset*: bool # has channel been reset locally
msgCode*: MessageType # cached in/out message code
closeCode*: MessageType # cached in/out close code
resetCode*: MessageType # cached in/out reset code
writes*: int # In-flight writes
func shortLog*(s: LPChannel): auto =
if s.isNil: "LPChannel(nil)"
elif != $s.oid and > 0:
else: &"{shortLog(s.conn.peerId)}:{s.oid}"
except ValueError as exc:
raise newException(Defect, exc.msg)
chronicles.formatIt(LPChannel): shortLog(it)
proc open*(s: LPChannel) {.async, gcsafe.} =
trace "Opening channel", s, conn = s.conn
if s.conn.isClosed:
await s.conn.writeMsg(, MessageType.New,
s.isOpen = true
except CancelledError as exc:
raise exc
except CatchableError as exc:
await s.conn.close()
raise exc
method closed*(s: LPChannel): bool =
proc closeUnderlying(s: LPChannel): Future[void] {.async.} =
## Channels may be closed for reading and writing in any order - we'll close
## the underlying bufferstream when both directions are closed
if s.closedLocal and s.atEof():
await procCall BufferStream(s).close()
proc reset*(s: LPChannel) {.async, gcsafe.} =
if s.isClosed:
trace "Already closed", s
s.isClosed = true
s.closedLocal = true
s.localReset = not s.remoteReset
trace "Resetting channel", s, len = s.len
if s.isOpen and not s.conn.isClosed:
# If the connection is still active, notify the other end
proc resetMessage() {.async.} =
trace "sending reset message", s, conn = s.conn
await s.conn.writeMsg(, s.resetCode) # write reset
except CatchableError as exc:
# No cancellations
await s.conn.close()
trace "Can't send reset message", s, conn = s.conn, msg = exc.msg
asyncSpawn resetMessage()
await s.closeImpl() # noraises, nocancels
trace "Channel reset", s
method close*(s: LPChannel) {.async, gcsafe.} =
## Close channel for writing - a message will be sent to the other peer
## informing them that the channel is closed and that we're waiting for
## their acknowledgement.
if s.closedLocal:
trace "Already closed", s
s.closedLocal = true
trace "Closing channel", s, conn = s.conn, len = s.len
if s.isOpen and not s.conn.isClosed:
await s.conn.writeMsg(, s.closeCode) # write close
except CancelledError as exc:
await s.conn.close()
raise exc
except CatchableError as exc:
# It's harmless that close message cannot be sent - the connection is
# likely down already
await s.conn.close()
trace "Cannot send close message", s, id =, msg = exc.msg
await s.closeUnderlying() # maybe already eofed
trace "Closed channel", s, len = s.len
method initStream*(s: LPChannel) =
if s.objName.len == 0:
s.objName = LPChannelTrackerName
s.timeoutHandler = proc(): Future[void] {.gcsafe.} =
trace "Idle timeout expired, resetting LPChannel", s
procCall BufferStream(s).initStream()
method readOnce*(s: LPChannel,
pbytes: pointer,
nbytes: int):
Future[int] {.async.} =
## Mplex relies on reading being done regularly from every channel, or all
## channels are blocked - in particular, this means that reading from one
## channel must not be done from within a callback / read handler of another
## or the reads will lock each other.
if s.remoteReset:
raise newLPStreamResetError()
if s.localReset:
raise newLPStreamClosedError()
if s.atEof():
raise newLPStreamRemoteClosedError()
if s.conn.closed:
raise newLPStreamConnDownError()
let bytes = await procCall BufferStream(s).readOnce(pbytes, nbytes)
when defined(libp2p_network_protocols_metrics):
if s.protocol.len > 0:, labelValues=[s.protocol, "in"])
trace "readOnce", s, bytes
if bytes == 0:
await s.closeUnderlying()
return bytes
except CatchableError as exc:
# readOnce in BufferStream generally raises on EOF or cancellation - for
# the former, resetting is harmless, for the latter it's necessary because
# data has been lost in s.readBuf and there's no way to gracefully recover /
# use the channel any more
await s.reset()
raise newLPStreamConnDownError(exc)
proc prepareWrite(s: LPChannel, msg: seq[byte]): Future[void] {.async.} =
# prepareWrite is the slow path of writing a message - see conditions in
# write
if s.remoteReset:
raise newLPStreamResetError()
if s.closedLocal:
raise newLPStreamClosedError()
if s.conn.closed:
raise newLPStreamConnDownError()
if msg.len == 0:
if s.writes >= MaxWrites:
debug "Closing connection, too many in-flight writes on channel",
s, conn = s.conn, writes = s.writes
when defined(libp2p_mplex_metrics):
await s.reset()
await s.conn.close()
if not s.isOpen:
await s.conn.writeMsg(, s.msgCode, msg)
proc completeWrite(
s: LPChannel, fut: Future[void], msgLen: int): Future[void] {.async.} =
s.writes += 1
when defined(libp2p_mplex_metrics):
libp2p_mplex_qlen.observe(s.writes.int64 - 1)
await fut
await fut
when defined(libp2p_network_protocol_metrics):
if s.protocol.len > 0:, labelValues=[s.protocol, "out"])
s.activity = true
except CancelledError as exc:
# Chronos may still send the data
raise exc
except LPStreamConnDownError as exc:
await s.reset()
await s.conn.close()
raise exc
except LPStreamEOFError as exc:
raise exc
except CatchableError as exc:
trace "exception in lpchannel write handler", s, msg = exc.msg
await s.reset()
await s.conn.close()
raise newLPStreamConnDownError(exc)
s.writes -= 1
method write*(s: LPChannel, msg: seq[byte]): Future[void] =
## Write to mplex channel - there may be up to MaxWrite concurrent writes
## pending after which the peer is disconnected
closed = s.closedLocal or s.conn.closed
let fut =
if (not closed) and msg.len > 0 and s.writes < MaxWrites and s.isOpen:
# Fast path: Avoid a copy of msg being kept in the closure created by
# `{.async.}` as this drives up memory usage - the conditions are laid out
# in prepareWrite
s.conn.writeMsg(, s.msgCode, msg)
prepareWrite(s, msg)
s.completeWrite(fut, msg.len)
method getWrapped*(s: LPChannel): Connection = s.conn
proc init*(
L: type LPChannel,
id: uint64,
conn: Connection,
initiator: bool,
name: string = "",
timeout: Duration = DefaultChanTimeout): LPChannel =
let chann = L(
id: id,
name: name,
conn: conn,
initiator: initiator,
timeout: timeout,
isOpen: if initiator: false else: true,
msgCode: if initiator: MessageType.MsgOut else: MessageType.MsgIn,
closeCode: if initiator: MessageType.CloseOut else: MessageType.CloseIn,
resetCode: if initiator: MessageType.ResetOut else: MessageType.ResetIn,
dir: if initiator: Direction.Out else: Direction.In)
when chronicles.enabledLogLevel == LogLevel.TRACE: = if > 0: else: $chann.oid
trace "Created new lpchannel", s = chann, id, initiator
return chann