# Nim-LibP2P
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
#  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
#  * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.

{.push raises: [].}

import tables, sequtils, oids
import chronos, chronicles, stew/byteutils, metrics
import ../muxer,
       ../../stream/connection,
       ../../stream/bufferstream,
       ../../utility,
       ../../peerinfo,
       ./coder,
       ./lpchannel

export muxer

logScope:
  topics = "libp2p mplex"

const MplexCodec* = "/mplex/6.7.0"
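
# Framing sketch (per the mplex spec; `readMsg` from `coder` does the actual
# decoding below): every frame is a varint header `(id shl 3) or ord(msgType)`,
# followed by a varint payload length and the payload bytes. A hypothetical
# helper for the header word, for illustration only:
#
#   func headerWord(id: uint64, msgType: MessageType): uint64 =
#     (id shl 3) or uint64(ord(msgType))
#
#   doAssert headerWord(17, MessageType.MsgOut) == 138 # 17 * 8 + 2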

const
  MaxChannelCount = 200

when defined(libp2p_expensive_metrics):
  declareGauge(libp2p_mplex_channels,
    "mplex channels", labels = ["initiator", "peer"])

type
  InvalidChannelIdError* = object of MuxerError

  Mplex* = ref object of Muxer
    # per-direction channel tables, keyed by channel id and indexed by whether
    # the local peer initiated the channel; local and remote ids are
    # independent and may overlap numerically
    channels: array[bool, Table[uint64, LPChannel]]
    # id most recently assigned to a locally initiated channel
    currentId: uint64
    inChannTimeout: Duration
    outChannTimeout: Duration
    isClosed: bool
    oid*: Oid
    maxChannCount: int

func shortLog*(m: Mplex): auto =
  shortLog(m.connection)

chronicles.formatIt(Mplex): shortLog(it)

proc newTooManyChannels(): ref TooManyChannels =
  newException(TooManyChannels, "max allowed channel count exceeded")

proc newInvalidChannelIdError(): ref InvalidChannelIdError =
  newException(InvalidChannelIdError, "invalid channel id")

proc cleanupChann(m: Mplex, chann: LPChannel) {.async: (raises: []), inline.} =
  ## remove the local channel from the internal tables
  ##
  try:
    await chann.join()
    m.channels[chann.initiator].del(chann.id)
    trace "cleaned up channel", m, chann

    when defined(libp2p_expensive_metrics):
      libp2p_mplex_channels.set(
        m.channels[chann.initiator].len.int64,
        labelValues = [$chann.initiator, $m.connection.peerId])
  except CancelledError as exc:
    warn "Error cleaning up mplex channel", m, chann, msg = exc.msg

proc newStreamInternal*(
    m: Mplex,
    initiator: bool = true,
    chanId: uint64 = 0,
    name: string = "",
    timeout: Duration): LPChannel {.gcsafe, raises: [InvalidChannelIdError].} =
  ## create new channel/stream
  ##
  let id =
    if initiator: m.currentId.inc(); m.currentId
    else: chanId

  if id in m.channels[initiator]:
    raise newInvalidChannelIdError()

  result = LPChannel.init(
    id,
    m.connection,
    initiator,
    name,
    timeout = timeout)

  result.peerId = m.connection.peerId
  result.observedAddr = m.connection.observedAddr
  result.transportDir = m.connection.transportDir
  when defined(libp2p_agents_metrics):
    result.shortAgent = m.connection.shortAgent

  trace "Creating new channel", m, channel = result, id, initiator, name

  m.channels[initiator][id] = result

  # All the errors are handled inside `cleanupChann()` procedure.
  asyncSpawn m.cleanupChann(result)

  when defined(libp2p_expensive_metrics):
    libp2p_mplex_channels.set(
      m.channels[initiator].len.int64,
      labelValues = [$initiator, $m.connection.peerId])

proc handleStream(m: Mplex, chann: LPChannel) {.async: (raises: []).} =
  ## call the muxer stream handler for this channel
  ##
  await m.streamHandler(chann)
  trace "finished handling stream", m, chann
  doAssert(chann.closed, "connection not closed by handler!")

method handle*(m: Mplex) {.async: (raises: []).} =
trace "Starting mplex handler", m
|
2019-09-07 23:34:40 +00:00
|
|
|
try:
|
2020-08-03 05:20:11 +00:00
|
|
|
while not m.connection.atEof:
|
2020-09-06 08:31:47 +00:00
|
|
|
trace "waiting for data", m
|
2020-08-15 05:58:30 +00:00
|
|
|
let
|
|
|
|
(id, msgType, data) = await m.connection.readMsg()
|
|
|
|
initiator = bool(ord(msgType) and 1)
|
2020-06-29 15:15:31 +00:00
|
|
|
|
|
|
|
logScope:
|
|
|
|
id = id
|
|
|
|
initiator = initiator
|
|
|
|
msgType = msgType
|
|
|
|
size = data.len

      trace "read message from connection", m, data = data.shortLog

      var channel =
        if MessageType(msgType) != MessageType.New:
          let tmp = m.channels[initiator].getOrDefault(id, nil)
          if tmp == nil:
            trace "Channel not found, skipping", m
            continue

          tmp
        else:
          if m.channels[false].len > m.maxChannCount - 1:
warn "too many channels created by remote peer",
|
2024-03-05 07:06:27 +00:00
|
|
|
allowedMax = MaxChannelCount, m
            raise newTooManyChannels()

          let name = string.fromBytes(data)
          m.newStreamInternal(false, id, name, timeout = m.outChannTimeout)

      trace "Processing channel message", m, channel, data = data.shortLog

      case msgType:
      of MessageType.New:
        trace "created channel", m, channel

        if m.streamHandler != nil:
          # Launch handler task
          # All the errors are handled inside `handleStream()` procedure.
          asyncSpawn m.handleStream(channel)

      of MessageType.MsgIn, MessageType.MsgOut:
        if data.len > MaxMsgSize:
          warn "received a message larger than allowed",
            allowed = MaxMsgSize, channel
          raise newLPStreamLimitError()

        trace "pushing data to channel", m, channel, len = data.len
        try:
          await channel.pushData(data)
          trace "pushed data to channel", m, channel, len = data.len
        except LPStreamClosedError as exc:
          # Channel is being closed, but `cleanupChann` was not yet triggered.
          trace "pushing data to channel failed", m, channel, len = data.len,
            msg = exc.msg
          discard # Ignore message, same as if `cleanupChann` had completed.

      of MessageType.CloseIn, MessageType.CloseOut:
        await channel.pushEof()
      of MessageType.ResetIn, MessageType.ResetOut:
        channel.remoteReset = true
        await channel.reset()
  except CancelledError:
    debug "Unexpected cancellation in mplex handler", m
  except LPStreamEOFError as exc:
    trace "Stream EOF", m, msg = exc.msg
  except LPStreamError as exc:
    debug "Unexpected stream exception in mplex read loop", m, msg = exc.msg
  except MuxerError as exc:
    debug "Unexpected muxer exception in mplex read loop", m, msg = exc.msg
  finally:
    await m.close()
trace "Stopped mplex handler", m
|
2019-09-03 20:40:51 +00:00
|
|
|
|
2024-03-05 07:06:27 +00:00
|
|
|

proc new*(
    M: type Mplex,
    conn: Connection,
    inTimeout: Duration = DefaultChanTimeout,
    outTimeout: Duration = DefaultChanTimeout,
    maxChannCount: int = MaxChannelCount): Mplex =
  M(connection: conn,
    inChannTimeout: inTimeout,
    outChannTimeout: outTimeout,
    oid: genOid(),
    maxChannCount: maxChannCount)
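
# Example usage (illustrative sketch only, not part of this module): once a
# secured `Connection` is available, the muxer is typically driven as below.
# `secureConn` and `echoHandler` are hypothetical placeholders.
#
#   proc echoHandler(stream: Connection) {.async.} =
#     # Handlers must close their channel when done (see `handleStream`).
#     try:
#       while not stream.atEof:
#         let msg = await stream.readLp(1024)
#         await stream.writeLp(msg)
#     finally:
#       await stream.close()
#
#   let mplex = Mplex.new(secureConn)
#   mplex.streamHandler = echoHandler
#   asyncSpawn mplex.handle()                  # start the frame read loop
#   let stream = await mplex.newStream("demo") # open an outgoing channel
#   await stream.writeLp("hello".toBytes())
#   await mplex.close()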

method newStream*(
    m: Mplex,
    name: string = "",
    lazy: bool = false
): Future[Connection] {.async: (raises: [
    CancelledError, LPStreamError, MuxerError]).} =
  let channel = m.newStreamInternal(timeout = m.inChannTimeout)

  if not lazy:
    await channel.open()

  return Connection(channel)

method close*(m: Mplex) {.async: (raises: []).} =
  if m.isClosed:
trace "Already closed", m
|
2020-05-20 00:14:15 +00:00
|
|
|
return
|
2020-08-15 05:58:30 +00:00
|
|
|
m.isClosed = true
|
|
|
|
|
trace "Closing mplex", m
|
|
|
|
|
2020-09-21 17:48:19 +00:00
|
|
|
var channs = toSeq(m.channels[false].values) & toSeq(m.channels[true].values)
|
2020-06-29 15:15:31 +00:00
|
|
|
|
|
|
|
for chann in channs:
|
2020-09-21 17:48:19 +00:00
|
|
|
await chann.close()
|
2020-06-29 15:15:31 +00:00
|
|
|
|
|
|
|
await m.connection.close()
|
2020-08-15 05:58:30 +00:00
|
|
|
|
|
|
|
# TODO while we're resetting, new channels may be created that will not be
|
|
|
|
# closed properly
|
2020-09-21 17:48:19 +00:00
|
|
|
|
|
|
|
channs = toSeq(m.channels[false].values) & toSeq(m.channels[true].values)
|
|
|
|
|
|
|
|
for chann in channs:
|
|
|
|
await chann.reset()
|
|
|
|
|
2020-08-15 05:58:30 +00:00
|
|
|
m.channels[false].clear()
|
|
|
|
m.channels[true].clear()
trace "Closed mplex", m
|
2023-03-30 22:16:39 +00:00
|
|
|
|
|
|
|

method getStreams*(m: Mplex): seq[Connection] =
  for c in m.channels[false].values: result.add(c)
  for c in m.channels[true].values: result.add(c)
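
# Example (sketch): `getStreams` can be used to inspect or drain the channels
# still open on this muxer, e.g.:
#
#   for stream in mplex.getStreams():
#     debug "open mplex stream", stream = shortLog(stream)
#     await stream.close()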