add {.async: (raises).} to libp2p/stream modules (#1050)

Co-authored-by: Dmitriy Ryajov <dryajov@gmail.com>
Co-authored-by: Jacek Sieka <jacek@status.im>
This commit is contained in:
Etan Kissling 2024-03-05 08:06:27 +01:00 committed by GitHub
parent 8294d5b9df
commit 28609597d1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 1099 additions and 744 deletions

View File

@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@ -42,7 +42,10 @@ const MaxMsgSize* = 1 shl 20 # 1mb
proc newInvalidMplexMsgType*(): ref InvalidMplexMsgType =
newException(InvalidMplexMsgType, "invalid message type")
proc readMsg*(conn: Connection): Future[Msg] {.async.} =
proc readMsg*(
conn: Connection
): Future[Msg] {.async: (raises: [
CancelledError, LPStreamError, MuxerError]).} =
let header = await conn.readVarint()
trace "read header varint", varint = header, conn
@ -55,10 +58,13 @@ proc readMsg*(conn: Connection): Future[Msg] {.async.} =
return (header shr 3, MessageType(msgType), data)
proc writeMsg*(conn: Connection,
id: uint64,
msgType: MessageType,
data: seq[byte] = @[]): Future[void] =
proc writeMsg*(
conn: Connection,
id: uint64,
msgType: MessageType,
data: seq[byte] = @[]
): Future[void] {.async: (raises: [
CancelledError, LPStreamError], raw: true).} =
var
left = data.len
offset = 0
@ -84,8 +90,11 @@ proc writeMsg*(conn: Connection,
# message gets written before some of the chunks
conn.write(buf.buffer)
proc writeMsg*(conn: Connection,
id: uint64,
msgType: MessageType,
data: string): Future[void] =
proc writeMsg*(
conn: Connection,
id: uint64,
msgType: MessageType,
data: string
): Future[void] {.async: (raises: [
CancelledError, LPStreamError], raw: true).} =
conn.writeMsg(id, msgType, data.toBytes())

View File

@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@ -28,7 +28,8 @@ when defined(libp2p_mplex_metrics):
declareHistogram libp2p_mplex_qtime, "message queuing time"
when defined(libp2p_network_protocols_metrics):
declareCounter libp2p_protocols_bytes, "total sent or received bytes", ["protocol", "direction"]
declareCounter libp2p_protocols_bytes,
"total sent or received bytes", ["protocol", "direction"]
## Channel half-closed states
##
@ -64,16 +65,16 @@ type
func shortLog*(s: LPChannel): auto =
try:
if s.isNil: "LPChannel(nil)"
if s == nil: "LPChannel(nil)"
elif s.name != $s.oid and s.name.len > 0:
&"{shortLog(s.conn.peerId)}:{s.oid}:{s.name}"
else: &"{shortLog(s.conn.peerId)}:{s.oid}"
except ValueError as exc:
raise newException(Defect, exc.msg)
raiseAssert(exc.msg)
chronicles.formatIt(LPChannel): shortLog(it)
proc open*(s: LPChannel) {.async.} =
proc open*(s: LPChannel) {.async: (raises: [CancelledError, LPStreamError]).} =
trace "Opening channel", s, conn = s.conn
if s.conn.isClosed:
return
@ -82,20 +83,20 @@ proc open*(s: LPChannel) {.async.} =
s.isOpen = true
except CancelledError as exc:
raise exc
except CatchableError as exc:
except LPStreamError as exc:
await s.conn.close()
raise exc
method closed*(s: LPChannel): bool =
s.closedLocal
proc closeUnderlying(s: LPChannel): Future[void] {.async.} =
proc closeUnderlying(s: LPChannel): Future[void] {.async: (raises: []).} =
## Channels may be closed for reading and writing in any order - we'll close
## the underlying bufferstream when both directions are closed
if s.closedLocal and s.atEof():
await procCall BufferStream(s).close()
proc reset*(s: LPChannel) {.async.} =
proc reset*(s: LPChannel) {.async: (raises: []).} =
if s.isClosed:
trace "Already closed", s
return
@ -108,22 +109,21 @@ proc reset*(s: LPChannel) {.async.} =
if s.isOpen and not s.conn.isClosed:
# If the connection is still active, notify the other end
proc resetMessage() {.async.} =
proc resetMessage() {.async: (raises: []).} =
try:
trace "sending reset message", s, conn = s.conn
await s.conn.writeMsg(s.id, s.resetCode) # write reset
except CatchableError as exc:
# No cancellations
await s.conn.close()
await noCancel s.conn.writeMsg(s.id, s.resetCode) # write reset
except LPStreamError as exc:
trace "Can't send reset message", s, conn = s.conn, msg = exc.msg
await s.conn.close()
asyncSpawn resetMessage()
await s.closeImpl() # noraises, nocancels
await s.closeImpl()
trace "Channel reset", s
method close*(s: LPChannel) {.async.} =
method close*(s: LPChannel) {.async: (raises: []).} =
## Close channel for writing - a message will be sent to the other peer
## informing them that the channel is closed and that we're waiting for
## their acknowledgement.
@ -137,10 +137,9 @@ method close*(s: LPChannel) {.async.} =
if s.isOpen and not s.conn.isClosed:
try:
await s.conn.writeMsg(s.id, s.closeCode) # write close
except CancelledError as exc:
except CancelledError:
await s.conn.close()
raise exc
except CatchableError as exc:
except LPStreamError as exc:
# It's harmless that close message cannot be sent - the connection is
# likely down already
await s.conn.close()
@ -154,16 +153,17 @@ method initStream*(s: LPChannel) =
if s.objName.len == 0:
s.objName = LPChannelTrackerName
s.timeoutHandler = proc(): Future[void] {.gcsafe.} =
s.timeoutHandler = proc(): Future[void] {.async: (raises: [], raw: true).} =
trace "Idle timeout expired, resetting LPChannel", s
s.reset()
procCall BufferStream(s).initStream()
method readOnce*(s: LPChannel,
pbytes: pointer,
nbytes: int):
Future[int] {.async.} =
method readOnce*(
s: LPChannel,
pbytes: pointer,
nbytes: int
): Future[int] {.async: (raises: [CancelledError, LPStreamError]).} =
## Mplex relies on reading being done regularly from every channel, or all
## channels are blocked - in particular, this means that reading from one
## channel must not be done from within a callback / read handler of another
@ -186,15 +186,19 @@ method readOnce*(s: LPChannel,
if bytes == 0:
await s.closeUnderlying()
return bytes
except CatchableError as exc:
# readOnce in BufferStream generally raises on EOF or cancellation - for
# the former, resetting is harmless, for the latter it's necessary because
# data has been lost in s.readBuf and there's no way to gracefully recover /
# use the channel any more
except CancelledError as exc:
await s.reset()
raise exc
except LPStreamError as exc:
# Resetting is necessary because data has been lost in s.readBuf and
# there's no way to gracefully recover / use the channel any more
await s.reset()
raise newLPStreamConnDownError(exc)
proc prepareWrite(s: LPChannel, msg: seq[byte]): Future[void] {.async.} =
proc prepareWrite(
s: LPChannel,
msg: seq[byte]
): Future[void] {.async: (raises: [CancelledError, LPStreamError]).} =
# prepareWrite is the slow path of writing a message - see conditions in
# write
if s.remoteReset:
@ -222,7 +226,10 @@ proc prepareWrite(s: LPChannel, msg: seq[byte]): Future[void] {.async.} =
await s.conn.writeMsg(s.id, s.msgCode, msg)
proc completeWrite(
s: LPChannel, fut: Future[void], msgLen: int): Future[void] {.async.} =
s: LPChannel,
fut: Future[void].Raising([CancelledError, LPStreamError]),
msgLen: int
): Future[void] {.async: (raises: [CancelledError, LPStreamError]).} =
try:
s.writes += 1
@ -250,7 +257,7 @@ proc completeWrite(
raise exc
except LPStreamEOFError as exc:
raise exc
except CatchableError as exc:
except LPStreamError as exc:
trace "exception in lpchannel write handler", s, msg = exc.msg
await s.reset()
await s.conn.close()
@ -258,7 +265,11 @@ proc completeWrite(
finally:
s.writes -= 1
method write*(s: LPChannel, msg: seq[byte]): Future[void] =
method write*(
s: LPChannel,
msg: seq[byte]
): Future[void] {.async: (raises: [
CancelledError, LPStreamError], raw: true).} =
## Write to mplex channel - there may be up to MaxWrite concurrent writes
## pending after which the peer is disconnected
@ -279,13 +290,12 @@ method write*(s: LPChannel, msg: seq[byte]): Future[void] =
method getWrapped*(s: LPChannel): Connection = s.conn
proc init*(
L: type LPChannel,
id: uint64,
conn: Connection,
initiator: bool,
name: string = "",
timeout: Duration = DefaultChanTimeout): LPChannel =
L: type LPChannel,
id: uint64,
conn: Connection,
initiator: bool,
name: string = "",
timeout: Duration = DefaultChanTimeout): LPChannel =
let chann = L(
id: id,
name: name,

View File

@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@ -56,7 +56,7 @@ proc newTooManyChannels(): ref TooManyChannels =
proc newInvalidChannelIdError(): ref InvalidChannelIdError =
newException(InvalidChannelIdError, "max allowed channel count exceeded")
proc cleanupChann(m: Mplex, chann: LPChannel) {.async, inline.} =
proc cleanupChann(m: Mplex, chann: LPChannel) {.async: (raises: []), inline.} =
## remove the local channel from the internal tables
##
try:
@ -68,19 +68,19 @@ proc cleanupChann(m: Mplex, chann: LPChannel) {.async, inline.} =
libp2p_mplex_channels.set(
m.channels[chann.initiator].len.int64,
labelValues = [$chann.initiator, $m.connection.peerId])
except CatchableError as exc:
except CancelledError as exc:
warn "Error cleaning up mplex channel", m, chann, msg = exc.msg
proc newStreamInternal*(m: Mplex,
initiator: bool = true,
chanId: uint64 = 0,
name: string = "",
timeout: Duration): LPChannel
{.gcsafe, raises: [InvalidChannelIdError].} =
proc newStreamInternal*(
m: Mplex,
initiator: bool = true,
chanId: uint64 = 0,
name: string = "",
timeout: Duration): LPChannel {.gcsafe, raises: [InvalidChannelIdError].} =
## create new channel/stream
##
let id = if initiator:
m.currentId.inc(); m.currentId
let id =
if initiator: m.currentId.inc(); m.currentId
else: chanId
if id in m.channels[initiator]:
@ -111,18 +111,14 @@ proc newStreamInternal*(m: Mplex,
m.channels[initiator].len.int64,
labelValues = [$initiator, $m.connection.peerId])
proc handleStream(m: Mplex, chann: LPChannel) {.async.} =
proc handleStream(m: Mplex, chann: LPChannel) {.async: (raises: []).} =
## call the muxer stream handler for this channel
##
try:
await m.streamHandler(chann)
trace "finished handling stream", m, chann
doAssert(chann.closed, "connection not closed by handler!")
except CatchableError as exc:
trace "Exception in mplex stream handler", m, chann, msg = exc.msg
await chann.reset()
await m.streamHandler(chann)
trace "finished handling stream", m, chann
doAssert(chann.closed, "connection not closed by handler!")
method handle*(m: Mplex) {.async.} =
method handle*(m: Mplex) {.async: (raises: []).} =
trace "Starting mplex handler", m
try:
while not m.connection.atEof:
@ -150,7 +146,7 @@ method handle*(m: Mplex) {.async.} =
else:
if m.channels[false].len > m.maxChannCount - 1:
warn "too many channels created by remote peer",
allowedMax = MaxChannelCount, m
allowedMax = MaxChannelCount, m
raise newTooManyChannels()
let name = string.fromBytes(data)
@ -159,59 +155,65 @@ method handle*(m: Mplex) {.async.} =
trace "Processing channel message", m, channel, data = data.shortLog
case msgType:
of MessageType.New:
trace "created channel", m, channel
of MessageType.New:
trace "created channel", m, channel
if not isNil(m.streamHandler):
# Launch handler task
# All the errors are handled inside `handleStream()` procedure.
asyncSpawn m.handleStream(channel)
if m.streamHandler != nil:
# Launch handler task
# All the errors are handled inside `handleStream()` procedure.
asyncSpawn m.handleStream(channel)
of MessageType.MsgIn, MessageType.MsgOut:
if data.len > MaxMsgSize:
warn "attempting to send a packet larger than allowed",
allowed = MaxMsgSize, channel
raise newLPStreamLimitError()
of MessageType.MsgIn, MessageType.MsgOut:
if data.len > MaxMsgSize:
warn "attempting to send a packet larger than allowed",
allowed = MaxMsgSize, channel
raise newLPStreamLimitError()
trace "pushing data to channel", m, channel, len = data.len
try:
await channel.pushData(data)
trace "pushed data to channel", m, channel, len = data.len
except LPStreamClosedError as exc:
# Channel is being closed, but `cleanupChann` was not yet triggered.
trace "pushing data to channel failed", m, channel, len = data.len,
msg = exc.msg
discard # Ignore message, same as if `cleanupChann` had completed.
trace "pushing data to channel", m, channel, len = data.len
try:
await channel.pushData(data)
trace "pushed data to channel", m, channel, len = data.len
except LPStreamClosedError as exc:
# Channel is being closed, but `cleanupChann` was not yet triggered.
trace "pushing data to channel failed", m, channel, len = data.len,
msg = exc.msg
discard # Ignore message, same as if `cleanupChann` had completed.
of MessageType.CloseIn, MessageType.CloseOut:
await channel.pushEof()
of MessageType.ResetIn, MessageType.ResetOut:
channel.remoteReset = true
await channel.reset()
of MessageType.CloseIn, MessageType.CloseOut:
await channel.pushEof()
of MessageType.ResetIn, MessageType.ResetOut:
channel.remoteReset = true
await channel.reset()
except CancelledError:
debug "Unexpected cancellation in mplex handler", m
except LPStreamEOFError as exc:
trace "Stream EOF", m, msg = exc.msg
except CatchableError as exc:
debug "Unexpected exception in mplex read loop", m, msg = exc.msg
except LPStreamError as exc:
debug "Unexpected stream exception in mplex read loop", m, msg = exc.msg
except MuxerError as exc:
debug "Unexpected muxer exception in mplex read loop", m, msg = exc.msg
finally:
await m.close()
trace "Stopped mplex handler", m
proc new*(M: type Mplex,
conn: Connection,
inTimeout: Duration = DefaultChanTimeout,
outTimeout: Duration = DefaultChanTimeout,
maxChannCount: int = MaxChannelCount): Mplex =
proc new*(
M: type Mplex,
conn: Connection,
inTimeout: Duration = DefaultChanTimeout,
outTimeout: Duration = DefaultChanTimeout,
maxChannCount: int = MaxChannelCount): Mplex =
M(connection: conn,
inChannTimeout: inTimeout,
outChannTimeout: outTimeout,
oid: genOid(),
maxChannCount: maxChannCount)
method newStream*(m: Mplex,
name: string = "",
lazy: bool = false): Future[Connection] {.async.} =
method newStream*(
m: Mplex,
name: string = "",
lazy: bool = false
): Future[Connection] {.async: (raises: [
CancelledError, LPStreamError, MuxerError]).} =
let channel = m.newStreamInternal(timeout = m.inChannTimeout)
if not lazy:
@ -219,7 +221,7 @@ method newStream*(m: Mplex,
return Connection(channel)
method close*(m: Mplex) {.async.} =
method close*(m: Mplex) {.async: (raises: []).} =
if m.isClosed:
trace "Already closed", m
return

View File

@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@ -23,16 +23,17 @@ type
MuxerError* = object of LPError
TooManyChannels* = object of MuxerError
StreamHandler* = proc(conn: Connection): Future[void] {.gcsafe, raises: [].}
MuxerHandler* = proc(muxer: Muxer): Future[void] {.gcsafe, raises: [].}
StreamHandler* = proc(conn: Connection): Future[void] {.async: (raises: []).}
MuxerHandler* = proc(muxer: Muxer): Future[void] {.async: (raises: []).}
Muxer* = ref object of RootObj
streamHandler*: StreamHandler
handler*: Future[void]
handler*: Future[void].Raising([])
connection*: Connection
# user provider proc that returns a constructed Muxer
MuxerConstructor* = proc(conn: Connection): Muxer {.gcsafe, closure, raises: [].}
MuxerConstructor* =
proc(conn: Connection): Muxer {.gcsafe, closure, raises: [].}
# this wraps a creator proc that knows how to make muxers
MuxerProvider* = object
@ -40,24 +41,32 @@ type
codec*: string
func shortLog*(m: Muxer): auto =
if isNil(m): "nil"
if m == nil: "nil"
else: shortLog(m.connection)
chronicles.formatIt(Muxer): shortLog(it)
# muxer interface
method newStream*(m: Muxer, name: string = "", lazy: bool = false):
Future[Connection] {.base, async.} = discard
method close*(m: Muxer) {.base, async.} =
if not isNil(m.connection):
method newStream*(
m: Muxer,
name: string = "",
lazy: bool = false
): Future[Connection] {.base, async: (raises: [
CancelledError, LPStreamError, MuxerError], raw: true).} =
raiseAssert("Not implemented!")
method close*(m: Muxer) {.base, async: (raises: []).} =
if m.connection != nil:
await m.connection.close()
method handle*(m: Muxer): Future[void] {.base, async.} = discard
method handle*(m: Muxer): Future[void] {.base, async: (raises: []).} = discard
proc new*(
T: typedesc[MuxerProvider],
creator: MuxerConstructor,
codec: string): T {.gcsafe.} =
T: typedesc[MuxerProvider],
creator: MuxerConstructor,
codec: string): T {.gcsafe.} =
let muxerProvider = T(newMuxer: creator, codec: codec)
muxerProvider
method getStreams*(m: Muxer): seq[Connection] {.base.} = doAssert false, "not implemented"
method getStreams*(m: Muxer): seq[Connection] {.base.} =
raiseAssert("Not implemented!")

View File

@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@ -27,11 +27,14 @@ const
MaxChannelCount = 200
when defined(libp2p_yamux_metrics):
declareGauge(libp2p_yamux_channels, "yamux channels", labels = ["initiator", "peer"])
declareHistogram libp2p_yamux_send_queue, "message send queue length (in byte)",
buckets = [0.0, 100.0, 250.0, 1000.0, 2000.0, 3200.0, 6400.0, 25600.0, 256000.0]
declareHistogram libp2p_yamux_recv_queue, "message recv queue length (in byte)",
buckets = [0.0, 100.0, 250.0, 1000.0, 2000.0, 3200.0, 6400.0, 25600.0, 256000.0]
declareGauge libp2p_yamux_channels,
"yamux channels", labels = ["initiator", "peer"]
declareHistogram libp2p_yamux_send_queue,
"message send queue length (in byte)", buckets = [
0.0, 100.0, 250.0, 1000.0, 2000.0, 3200.0, 6400.0, 25600.0, 256000.0]
declareHistogram libp2p_yamux_recv_queue,
"message recv queue length (in byte)", buckets = [
0.0, 100.0, 250.0, 1000.0, 2000.0, 3200.0, 6400.0, 25600.0, 256000.0]
type
YamuxError* = object of MuxerError
@ -60,7 +63,10 @@ type
streamId: uint32
length: uint32
proc readHeader(conn: LPStream): Future[YamuxHeader] {.async.} =
proc readHeader(
conn: LPStream
): Future[YamuxHeader] {.async: (raises: [
CancelledError, LPStreamError, MuxerError]).} =
var buffer: array[12, byte]
await conn.readExactly(addr buffer[0], 12)
@ -74,10 +80,10 @@ proc readHeader(conn: LPStream): Future[YamuxHeader] {.async.} =
return result
proc `$`(header: YamuxHeader): string =
result = "{" & $header.msgType & ", "
result &= "{" & header.flags.foldl(if a != "": a & ", " & $b else: $b, "") & "}, "
result &= "streamId: " & $header.streamId & ", "
result &= "length: " & $header.length & "}"
"{" & $header.msgType & ", " &
"{" & header.flags.foldl(if a != "": a & ", " & $b else: $b, "") & "}, " &
"streamId: " & $header.streamId & ", " &
"length: " & $header.length & "}"
proc encode(header: YamuxHeader): array[12, byte] =
result[0] = header.version
@ -86,10 +92,14 @@ proc encode(header: YamuxHeader): array[12, byte] =
result[4..7] = toBytesBE(header.streamId)
result[8..11] = toBytesBE(header.length)
proc write(conn: LPStream, header: YamuxHeader): Future[void] {.gcsafe.} =
proc write(
conn: LPStream,
header: YamuxHeader
): Future[void] {.async: (raises: [
CancelledError, LPStreamError], raw: true).} =
trace "write directly on stream", h = $header
var buffer = header.encode()
return conn.write(@buffer)
conn.write(@buffer)
proc ping(T: type[YamuxHeader], flag: MsgFlags, pingData: uint32): T =
T(
@ -107,11 +117,10 @@ proc goAway(T: type[YamuxHeader], status: GoAwayStatus): T =
)
proc data(
T: type[YamuxHeader],
streamId: uint32,
length: uint32 = 0,
flags: set[MsgFlags] = {},
): T =
T: type[YamuxHeader],
streamId: uint32,
length: uint32 = 0,
flags: set[MsgFlags] = {}): T =
T(
version: YamuxVersion,
msgType: MsgType.Data,
@ -121,11 +130,10 @@ proc data(
)
proc windowUpdate(
T: type[YamuxHeader],
streamId: uint32,
delta: uint32,
flags: set[MsgFlags] = {},
): T =
T: type[YamuxHeader],
streamId: uint32,
delta: uint32,
flags: set[MsgFlags] = {}): T =
T(
version: YamuxVersion,
msgType: MsgType.WindowUpdate,
@ -138,7 +146,7 @@ type
ToSend = tuple
data: seq[byte]
sent: int
fut: Future[void]
fut: Future[void].Raising([CancelledError, LPStreamError])
YamuxChannel* = ref object of Connection
id: uint32
recvWindow: int
@ -153,7 +161,7 @@ type
recvQueue: seq[byte]
isReset: bool
remoteReset: bool
closedRemotely: Future[void]
closedRemotely: Future[void].Raising([])
closedLocally: bool
receivedData: AsyncEvent
returnedEof: bool
@ -162,7 +170,7 @@ proc `$`(channel: YamuxChannel): string =
result = if channel.conn.dir == Out: "=> " else: "<= "
result &= $channel.id
var s: seq[string] = @[]
if channel.closedRemotely.done():
if channel.closedRemotely.completed():
s.add("ClosedRemotely")
if channel.closedLocally:
s.add("ClosedLocally")
@ -184,17 +192,17 @@ proc lengthSendQueueWithLimit(channel: YamuxChannel): int =
# 3 big messages if the peer is stalling.
channel.sendQueue.foldl(a + min(b.data.len - b.sent, channel.maxSendQueueSize div 3), 0)
proc actuallyClose(channel: YamuxChannel) {.async.} =
proc actuallyClose(channel: YamuxChannel) {.async: (raises: []).} =
if channel.closedLocally and channel.sendQueue.len == 0 and
channel.closedRemotely.done():
channel.closedRemotely.completed():
await procCall Connection(channel).closeImpl()
proc remoteClosed(channel: YamuxChannel) {.async.} =
if not channel.closedRemotely.done():
proc remoteClosed(channel: YamuxChannel) {.async: (raises: []).} =
if not channel.closedRemotely.completed():
channel.closedRemotely.complete()
await channel.actuallyClose()
method closeImpl*(channel: YamuxChannel) {.async.} =
method closeImpl*(channel: YamuxChannel) {.async: (raises: []).} =
if not channel.closedLocally:
channel.closedLocally = true
channel.isEof = true
@ -204,7 +212,8 @@ method closeImpl*(channel: YamuxChannel) {.async.} =
except CancelledError, LPStreamError: discard
await channel.actuallyClose()
proc reset(channel: YamuxChannel, isLocal: bool = false) {.async.} =
proc reset(
channel: YamuxChannel, isLocal: bool = false) {.async: (raises: []).} =
# If we reset locally, we want to flush up to a maximum of recvWindow
# bytes. It's because the peer we're connected to can send us data before
# it receives the reset.
@ -221,17 +230,18 @@ proc reset(channel: YamuxChannel, isLocal: bool = false) {.async.} =
if not channel.closedLocally:
if isLocal and not channel.isSending:
try: await channel.conn.write(YamuxHeader.data(channel.id, 0, {Rst}))
except LPStreamEOFError as exc: discard
except LPStreamClosedError as exc: discard
except CancelledError, LPStreamError: discard
await channel.close()
if not channel.closedRemotely.done():
if not channel.closedRemotely.completed():
await channel.remoteClosed()
channel.receivedData.fire()
if not isLocal:
# If the reset is remote, there's no reason to flush anything.
channel.recvWindow = 0
proc updateRecvWindow(channel: YamuxChannel) {.async.} =
proc updateRecvWindow(
channel: YamuxChannel
) {.async: (raises: [CancelledError, LPStreamError]).} =
## Send to the peer a window update when the recvWindow is empty enough
##
# In order to avoid spamming a window update everytime a byte is read,
@ -249,14 +259,15 @@ proc updateRecvWindow(channel: YamuxChannel) {.async.} =
trace "increasing the recvWindow", delta
method readOnce*(
channel: YamuxChannel,
pbytes: pointer,
nbytes: int):
Future[int] {.async.} =
channel: YamuxChannel,
pbytes: pointer,
nbytes: int
): Future[int] {.async: (raises: [CancelledError, LPStreamError]).} =
## Read from a yamux channel
if channel.isReset:
raise if channel.remoteReset:
raise
if channel.remoteReset:
newLPStreamResetError()
elif channel.closedLocally:
newLPStreamClosedError()
@ -269,7 +280,7 @@ method readOnce*(
try: # https://github.com/status-im/nim-chronos/issues/516
discard await race(channel.closedRemotely, channel.receivedData.wait())
except ValueError: raiseAssert("Futures list is not empty")
if channel.closedRemotely.done() and channel.recvQueue.len == 0:
if channel.closedRemotely.completed() and channel.recvQueue.len == 0:
channel.returnedEof = true
channel.isEof = true
return 0
@ -277,7 +288,8 @@ method readOnce*(
let toRead = min(channel.recvQueue.len, nbytes)
var p = cast[ptr UncheckedArray[byte]](pbytes)
toOpenArray(p, 0, nbytes - 1)[0..<toRead] = channel.recvQueue.toOpenArray(0, toRead - 1)
toOpenArray(p, 0, nbytes - 1)[0..<toRead] =
channel.recvQueue.toOpenArray(0, toRead - 1)
channel.recvQueue = channel.recvQueue[toRead..^1]
# We made some room in the recv buffer let the peer know
@ -285,7 +297,9 @@ method readOnce*(
channel.activity = true
return toRead
proc gotDataFromRemote(channel: YamuxChannel, b: seq[byte]) {.async.} =
proc gotDataFromRemote(
channel: YamuxChannel,
b: seq[byte]) {.async: (raises: [CancelledError, LPStreamError]).} =
channel.recvWindow -= b.len
channel.recvQueue = channel.recvQueue.concat(b)
channel.receivedData.fire()
@ -296,7 +310,9 @@ proc gotDataFromRemote(channel: YamuxChannel, b: seq[byte]) {.async.} =
proc setMaxRecvWindow*(channel: YamuxChannel, maxRecvWindow: int) =
channel.maxRecvWindow = maxRecvWindow
proc trySend(channel: YamuxChannel) {.async.} =
proc trySend(
channel: YamuxChannel
) {.async: (raises: [CancelledError, LPStreamError]).} =
if channel.isSending:
return
channel.isSending = true
@ -307,12 +323,10 @@ proc trySend(channel: YamuxChannel) {.async.} =
if channel.sendWindow == 0:
trace "trying to send while the sendWindow is empty"
if channel.lengthSendQueueWithLimit() > channel.maxSendQueueSize:
trace "channel send queue too big, resetting", maxSendQueueSize=channel.maxSendQueueSize,
trace "channel send queue too big, resetting",
maxSendQueueSize = channel.maxSendQueueSize,
currentQueueSize = channel.lengthSendQueueWithLimit()
try:
await channel.reset(true)
except CatchableError as exc:
warn "failed to reset", msg=exc.msg
await channel.reset(isLocal = true)
break
let
@ -329,7 +343,7 @@ proc trySend(channel: YamuxChannel) {.async.} =
sendBuffer[0..<12] = header.encode()
var futures: seq[Future[void]]
var futures: seq[Future[void].Raising([CancelledError, LPStreamError])]
while inBuffer < toSend:
# concatenate the different message we try to send into one buffer
let (data, sent, fut) = channel.sendQueue[0]
@ -346,8 +360,15 @@ proc trySend(channel: YamuxChannel) {.async.} =
trace "try to send the buffer", h = $header
channel.sendWindow.dec(toSend)
try: await channel.conn.write(sendBuffer)
except CatchableError as exc:
try:
await channel.conn.write(sendBuffer)
except CancelledError:
trace "cancelled sending the buffer"
for fut in futures.items():
fut.cancelSoon()
await channel.reset()
break
except LPStreamError as exc:
trace "failed to send the buffer"
let connDown = newLPStreamConnDownError(exc)
for fut in futures.items():
@ -358,7 +379,11 @@ proc trySend(channel: YamuxChannel) {.async.} =
fut.complete()
channel.activity = true
method write*(channel: YamuxChannel, msg: seq[byte]): Future[void] =
method write*(
channel: YamuxChannel,
msg: seq[byte]
): Future[void] {.async: (raises: [
CancelledError, LPStreamError], raw: true).} =
## Write to yamux channel
##
result = newFuture[void]("Yamux Send")
@ -376,7 +401,9 @@ method write*(channel: YamuxChannel, msg: seq[byte]): Future[void] =
libp2p_yamux_send_queue.observe(channel.lengthSendQueue().int64)
asyncSpawn channel.trySend()
proc open(channel: YamuxChannel) {.async.} =
proc open(
channel: YamuxChannel
) {.async: (raises: [CancelledError, LPStreamError]).} =
## Open a yamux channel by sending a window update with Syn or Ack flag
##
if channel.opened:
@ -406,21 +433,28 @@ proc lenBySrc(m: Yamux, isSrc: bool): int =
for v in m.channels.values():
if v.isSrc == isSrc: result += 1
proc cleanupChannel(m: Yamux, channel: YamuxChannel) {.async.} =
await channel.join()
proc cleanupChannel(m: Yamux, channel: YamuxChannel) {.async: (raises: []).} =
try:
await channel.join()
except CancelledError:
discard
m.channels.del(channel.id)
when defined(libp2p_yamux_metrics):
libp2p_yamux_channels.set(m.lenBySrc(channel.isSrc).int64, [$channel.isSrc, $channel.peerId])
libp2p_yamux_channels.set(
m.lenBySrc(channel.isSrc).int64, [$channel.isSrc, $channel.peerId])
if channel.isReset and channel.recvWindow > 0:
m.flushed[channel.id] = channel.recvWindow
proc createStream(m: Yamux, id: uint32, isSrc: bool,
recvWindow: int, maxSendQueueSize: int): YamuxChannel =
# As you can see, during initialization, recvWindow can be larger than maxRecvWindow.
proc createStream(
m: Yamux, id: uint32, isSrc: bool,
recvWindow: int, maxSendQueueSize: int): YamuxChannel =
# During initialization, recvWindow can be larger than maxRecvWindow.
# This is because the peer we're connected to will always assume
# that the initial recvWindow is 256k.
# To solve this contradiction, no updateWindow will be sent until recvWindow is less
# than maxRecvWindow
# To solve this contradiction, no updateWindow will be sent until
# recvWindow is less than maxRecvWindow
proc newClosedRemotelyFut(): Future[void] {.async: (raises: [], raw: true).} =
newFuture[void]()
var stream = YamuxChannel(
id: id,
maxRecvWindow: recvWindow,
@ -430,7 +464,7 @@ proc createStream(m: Yamux, id: uint32, isSrc: bool,
isSrc: isSrc,
conn: m.connection,
receivedData: newAsyncEvent(),
closedRemotely: newFuture[void]()
closedRemotely: newClosedRemotelyFut()
)
stream.objName = "YamuxStream"
if isSrc:
@ -439,9 +473,10 @@ proc createStream(m: Yamux, id: uint32, isSrc: bool,
else:
stream.dir = Direction.In
stream.timeout = m.inTimeout
stream.timeoutHandler = proc(): Future[void] {.gcsafe.} =
trace "Idle timeout expired, resetting YamuxChannel"
stream.reset(true)
stream.timeoutHandler =
proc(): Future[void] {.async: (raises: [], raw: true).} =
trace "Idle timeout expired, resetting YamuxChannel"
stream.reset(isLocal = true)
stream.initStream()
stream.peerId = m.connection.peerId
stream.observedAddr = m.connection.observedAddr
@ -455,7 +490,7 @@ proc createStream(m: Yamux, id: uint32, isSrc: bool,
libp2p_yamux_channels.set(m.lenBySrc(isSrc).int64, [$isSrc, $stream.peerId])
return stream
method close*(m: Yamux) {.async.} =
method close*(m: Yamux) {.async: (raises: []).} =
if m.isClosed == true:
trace "Already closed"
return
@ -464,24 +499,21 @@ method close*(m: Yamux) {.async.} =
trace "Closing yamux"
let channels = toSeq(m.channels.values())
for channel in channels:
await channel.reset(true)
await channel.reset(isLocal = true)
try: await m.connection.write(YamuxHeader.goAway(NormalTermination))
except CatchableError as exc: trace "failed to send goAway", msg=exc.msg
except CancelledError as exc: trace "cancelled sending goAway", msg = exc.msg
except LPStreamError as exc: trace "failed to send goAway", msg = exc.msg
await m.connection.close()
trace "Closed yamux"
proc handleStream(m: Yamux, channel: YamuxChannel) {.async.} =
proc handleStream(m: Yamux, channel: YamuxChannel) {.async: (raises: []).} =
## Call the muxer stream handler for this channel
##
try:
await m.streamHandler(channel)
trace "finished handling stream"
doAssert(channel.isClosed, "connection not closed by handler!")
except CatchableError as exc:
trace "Exception in yamux stream handler", msg = exc.msg
await channel.reset()
await m.streamHandler(channel)
trace "finished handling stream"
doAssert(channel.isClosed, "connection not closed by handler!")
method handle*(m: Yamux) {.async.} =
method handle*(m: Yamux) {.async: (raises: []).} =
trace "Starting yamux handler", pid=m.connection.peerId
try:
while not m.connection.atEof:
@ -559,11 +591,24 @@ method handle*(m: Yamux) {.async.} =
if MsgFlags.Rst in header.flags:
trace "remote reset channel"
await channel.reset()
except CancelledError as exc:
debug "Unexpected cancellation in yamux handler", msg = exc.msg
except LPStreamEOFError as exc:
trace "Stream EOF", msg = exc.msg
except LPStreamError as exc:
debug "Unexpected stream exception in yamux read loop", msg = exc.msg
except YamuxError as exc:
trace "Closing yamux connection", error=exc.msg
await m.connection.write(YamuxHeader.goAway(ProtocolError))
try:
await m.connection.write(YamuxHeader.goAway(ProtocolError))
except CancelledError, LPStreamError:
discard
except MuxerError as exc:
debug "Unexpected muxer exception in yamux read loop", msg = exc.msg
try:
await m.connection.write(YamuxHeader.goAway(ProtocolError))
except CancelledError, LPStreamError:
discard
finally:
await m.close()
trace "Stopped yamux handler"
@ -572,10 +617,11 @@ method getStreams*(m: Yamux): seq[Connection] =
for c in m.channels.values: result.add(c)
method newStream*(
m: Yamux,
name: string = "",
lazy: bool = false): Future[Connection] {.async.} =
m: Yamux,
name: string = "",
lazy: bool = false
): Future[Connection] {.async: (raises: [
CancelledError, LPStreamError, MuxerError]).} =
if m.channels.len > m.maxChannCount - 1:
raise newException(TooManyChannels, "max allowed channel count exceeded")
let stream = m.createStream(m.currentId, true, m.windowSize, m.maxSendQueueSize)
@ -584,12 +630,13 @@ method newStream*(
await stream.open()
return stream
proc new*(T: type[Yamux], conn: Connection,
maxChannCount: int = MaxChannelCount,
windowSize: int = YamuxDefaultWindowSize,
maxSendQueueSize: int = MaxSendQueueSize,
inTimeout: Duration = 5.minutes,
outTimeout: Duration = 5.minutes): T =
proc new*(
T: type[Yamux], conn: Connection,
maxChannCount: int = MaxChannelCount,
windowSize: int = YamuxDefaultWindowSize,
maxSendQueueSize: int = MaxSendQueueSize,
inTimeout: Duration = 5.minutes,
outTimeout: Duration = 5.minutes): T =
T(
connection: conn,
currentId: if conn.dir == Out: 1 else: 2,

View File

@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@ -23,11 +23,15 @@ type
method readOnce*(
self: RelayConnection,
pbytes: pointer,
nbytes: int): Future[int] {.async.} =
nbytes: int
): Future[int] {.async: (raises: [CancelledError, LPStreamError], raw: true).} =
self.activity = true
return await self.conn.readOnce(pbytes, nbytes)
self.conn.readOnce(pbytes, nbytes)
method write*(self: RelayConnection, msg: seq[byte]): Future[void] {.async.} =
method write*(
self: RelayConnection,
msg: seq[byte]
): Future[void] {.async: (raises: [CancelledError, LPStreamError]).} =
self.dataSent.inc(msg.len)
if self.limitData != 0 and self.dataSent > self.limitData:
await self.close()
@ -35,25 +39,25 @@ method write*(self: RelayConnection, msg: seq[byte]): Future[void] {.async.} =
self.activity = true
await self.conn.write(msg)
method closeImpl*(self: RelayConnection): Future[void] {.async.} =
method closeImpl*(self: RelayConnection): Future[void] {.async: (raises: []).} =
await self.conn.close()
await procCall Connection(self).closeImpl()
method getWrapped*(self: RelayConnection): Connection = self.conn
proc new*(
T: typedesc[RelayConnection],
conn: Connection,
limitDuration: uint32,
limitData: uint64): T =
T: typedesc[RelayConnection],
conn: Connection,
limitDuration: uint32,
limitData: uint64): T =
let rc = T(conn: conn, limitDuration: limitDuration, limitData: limitData)
rc.dir = conn.dir
rc.initStream()
if limitDuration > 0:
proc checkDurationConnection() {.async.} =
let sleep = sleepAsync(limitDuration.seconds())
await sleep or conn.join()
if sleep.finished: await conn.close()
else: sleep.cancel()
proc checkDurationConnection() {.async: (raises: []).} =
try:
await noCancel conn.join().wait(limitDuration.seconds())
except AsyncTimeoutError:
await conn.close()
asyncSpawn checkDurationConnection()
return rc

View File

@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@ -99,10 +99,10 @@ type
func shortLog*(conn: NoiseConnection): auto =
try:
if conn.isNil: "NoiseConnection(nil)"
if conn == nil: "NoiseConnection(nil)"
else: &"{shortLog(conn.peerId)}:{conn.oid}"
except ValueError as exc:
raise newException(Defect, exc.msg)
raiseAssert(exc.msg)
chronicles.formatIt(NoiseConnection): shortLog(it)
@ -112,7 +112,7 @@ proc genKeyPair(rng: var HmacDrbgContext): KeyPair =
proc hashProtocol(name: string): MDigest[256] =
# If protocol_name is less than or equal to HASHLEN bytes in length,
# sets h equal to protocol_name with zero bytes appended to make HASHLEN bytes.
# sets h to protocol_name with zero bytes appended to make HASHLEN bytes.
# Otherwise sets h = HASH(protocol_name).
if name.len <= 32:
@ -301,7 +301,9 @@ template read_s: untyped =
msg.consume(rsLen)
proc readFrame(sconn: Connection): Future[seq[byte]] {.async.} =
proc readFrame(
sconn: Connection
): Future[seq[byte]] {.async: (raises: [CancelledError, LPStreamError]).} =
var besize {.noinit.}: array[2, byte]
await sconn.readExactly(addr besize[0], besize.len)
let size = uint16.fromBytesBE(besize).int
@ -426,7 +428,9 @@ proc handshakeXXInbound(
finally:
burnMem(hs)
method readMessage*(sconn: NoiseConnection): Future[seq[byte]] {.async.} =
method readMessage*(
sconn: NoiseConnection
): Future[seq[byte]] {.async: (raises: [CancelledError, LPStreamError]).} =
while true: # Discard 0-length payloads
let frame = await sconn.stream.readFrame()
sconn.activity = true
@ -458,7 +462,11 @@ proc encryptFrame(
cipherFrame[2 + src.len()..<cipherFrame.len] = tag
method write*(sconn: NoiseConnection, message: seq[byte]): Future[void] =
method write*(
sconn: NoiseConnection,
message: seq[byte]
): Future[void] {.async: (raises: [
CancelledError, LPStreamError], raw: true).} =
# Fast path: `{.async.}` would introduce a copy of `message`
const FramingSize = 2 + sizeof(ChaChaPolyTag)
@ -586,7 +594,7 @@ method handshake*(p: Noise, conn: Connection, initiator: bool, peerId: Opt[PeerI
return secure
method closeImpl*(s: NoiseConnection) {.async.} =
method closeImpl*(s: NoiseConnection) {.async: (raises: []).} =
await procCall SecureConn(s).closeImpl()
burnMem(s.readCs)
@ -597,15 +605,14 @@ method init*(p: Noise) {.gcsafe.} =
p.codec = NoiseCodec
proc new*(
T: typedesc[Noise],
rng: ref HmacDrbgContext,
privateKey: PrivateKey,
outgoing: bool = true,
commonPrologue: seq[byte] = @[]): T =
T: typedesc[Noise],
rng: ref HmacDrbgContext,
privateKey: PrivateKey,
outgoing: bool = true,
commonPrologue: seq[byte] = @[]): T =
let pkBytes = privateKey.getPublicKey()
.expect("Expected valid Private Key")
.getBytes().expect("Couldn't get public Key bytes")
.expect("Expected valid Private Key")
.getBytes().expect("Couldn't get public Key bytes")
var noise = Noise(
rng: rng,

View File

@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@ -77,10 +77,10 @@ type
func shortLog*(conn: SecioConn): auto =
try:
if conn.isNil: "SecioConn(nil)"
if conn == nil: "SecioConn(nil)"
else: &"{shortLog(conn.peerId)}:{conn.oid}"
except ValueError as exc:
raise newException(Defect, exc.msg)
raiseAssert(exc.msg)
chronicles.formatIt(SecioConn): shortLog(it)
@ -190,7 +190,9 @@ proc macCheckAndDecode(sconn: SecioConn, data: var seq[byte]): bool =
data.setLen(mark)
result = true
proc readRawMessage(conn: Connection): Future[seq[byte]] {.async.} =
proc readRawMessage(
conn: Connection
): Future[seq[byte]] {.async: (raises: [CancelledError, LPStreamError]).} =
while true: # Discard 0-length payloads
var lengthBuf: array[4, byte]
await conn.readExactly(addr lengthBuf[0], lengthBuf.len)
@ -211,7 +213,9 @@ proc readRawMessage(conn: Connection): Future[seq[byte]] {.async.} =
trace "Discarding 0-length payload", conn
method readMessage*(sconn: SecioConn): Future[seq[byte]] {.async.} =
method readMessage*(
sconn: SecioConn
): Future[seq[byte]] {.async: (raises: [CancelledError, LPStreamError]).} =
## Read message from channel secure connection ``sconn``.
when chronicles.enabledLogLevel == LogLevel.TRACE:
logScope:
@ -223,7 +227,9 @@ method readMessage*(sconn: SecioConn): Future[seq[byte]] {.async.} =
trace "Message MAC verification failed", buf = buf.shortLog
raise (ref SecioError)(msg: "message failed MAC verification")
method write*(sconn: SecioConn, message: seq[byte]) {.async.} =
method write*(
sconn: SecioConn,
message: seq[byte]) {.async: (raises: [CancelledError, LPStreamError]).} =
## Write message ``message`` to secure connection ``sconn``.
if message.len == 0:
return
@ -432,13 +438,13 @@ method init(s: Secio) {.gcsafe.} =
s.codec = SecioCodec
proc new*(
T: typedesc[Secio],
rng: ref HmacDrbgContext,
localPrivateKey: PrivateKey): T =
T: typedesc[Secio],
rng: ref HmacDrbgContext,
localPrivateKey: PrivateKey): T =
let secio = Secio(
rng: rng,
localPrivateKey: localPrivateKey,
localPublicKey: localPrivateKey.getPublicKey().expect("Invalid private key"),
localPublicKey: localPrivateKey.getPublicKey().expect("Invalid private key")
)
secio.init()
secio

View File

@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@ -37,18 +37,19 @@ type
func shortLog*(conn: SecureConn): auto =
try:
if conn.isNil: "SecureConn(nil)"
if conn == nil: "SecureConn(nil)"
else: &"{shortLog(conn.peerId)}:{conn.oid}"
except ValueError as exc:
raise newException(Defect, exc.msg)
raiseAssert(exc.msg)
chronicles.formatIt(SecureConn): shortLog(it)
proc new*(T: type SecureConn,
conn: Connection,
peerId: PeerId,
observedAddr: Opt[MultiAddress],
timeout: Duration = DefaultConnectionTimeout): T =
proc new*(
T: type SecureConn,
conn: Connection,
peerId: PeerId,
observedAddr: Opt[MultiAddress],
timeout: Duration = DefaultConnectionTimeout): T =
result = T(stream: conn,
peerId: peerId,
observedAddr: observedAddr,
@ -63,15 +64,18 @@ method initStream*(s: SecureConn) =
procCall Connection(s).initStream()
method closeImpl*(s: SecureConn) {.async.} =
method closeImpl*(s: SecureConn) {.async: (raises: []).} =
trace "Closing secure conn", s, dir = s.dir
if not(isNil(s.stream)):
if s.stream != nil:
await s.stream.close()
await procCall Connection(s).closeImpl()
method readMessage*(c: SecureConn): Future[seq[byte]] {.async, base.} =
doAssert(false, "Not implemented!")
method readMessage*(
c: SecureConn
): Future[seq[byte]] {.async: (raises: [
CancelledError, LPStreamError], raw: true), base.} =
raiseAssert("Not implemented!")
method getWrapped*(s: SecureConn): Connection = s.stream
@ -79,12 +83,12 @@ method handshake*(s: Secure,
conn: Connection,
initiator: bool,
peerId: Opt[PeerId]): Future[SecureConn] {.async, base.} =
doAssert(false, "Not implemented!")
raiseAssert("Not implemented!")
proc handleConn(s: Secure,
conn: Connection,
initiator: bool,
peerId: Opt[PeerId]): Future[Connection] {.async.} =
conn: Connection,
initiator: bool,
peerId: Opt[PeerId]): Future[Connection] {.async.} =
var sconn = await s.handshake(conn, initiator, peerId)
# mark connection bottom level transport direction
# this is the safest place to do this
@ -122,7 +126,7 @@ proc handleConn(s: Secure,
# do not need to propagate CancelledError.
discard
if not isNil(sconn):
if sconn != nil:
# All the errors are handled inside `cleanup()` procedure.
asyncSpawn cleanup()
@ -154,10 +158,11 @@ method secure*(s: Secure,
Future[Connection] {.base.} =
s.handleConn(conn, conn.dir == Direction.Out, peerId)
method readOnce*(s: SecureConn,
pbytes: pointer,
nbytes: int):
Future[int] {.async.} =
method readOnce*(
s: SecureConn,
pbytes: pointer,
nbytes: int
): Future[int] {.async: (raises: [CancelledError, LPStreamError]).} =
doAssert(nbytes > 0, "nbytes must be positive integer")
if s.isEof:
@ -174,7 +179,7 @@ method readOnce*(s: SecureConn,
raise err
except CancelledError as exc:
raise exc
except CatchableError as err:
except LPStreamError as err:
debug "Error while reading message from secure connection, closing.",
error = err.name,
message = err.msg,

View File

@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@ -34,10 +34,10 @@ type
func shortLog*(s: BufferStream): auto =
try:
if s.isNil: "BufferStream(nil)"
if s == nil: "BufferStream(nil)"
else: &"{shortLog(s.peerId)}:{s.oid}"
except ValueError as exc:
raise newException(Defect, exc.msg)
raiseAssert(exc.msg)
chronicles.formatIt(BufferStream): shortLog(it)
@ -55,14 +55,16 @@ method initStream*(s: BufferStream) =
trace "BufferStream created", s
proc new*(
T: typedesc[BufferStream],
timeout: Duration = DefaultConnectionTimeout): T =
T: typedesc[BufferStream],
timeout: Duration = DefaultConnectionTimeout): T =
let bufferStream = T(timeout: timeout)
bufferStream.initStream()
bufferStream
method pushData*(s: BufferStream, data: seq[byte]) {.base, async.} =
method pushData*(
s: BufferStream,
data: seq[byte]
) {.base, async: (raises: [CancelledError, LPStreamError]).} =
## Write bytes to internal read buffer, use this to fill up the
## buffer with data.
##
@ -70,7 +72,7 @@ method pushData*(s: BufferStream, data: seq[byte]) {.base, async.} =
##
doAssert(not s.pushing,
&"Only one concurrent push allowed for stream {s.shortLog()}")
"Only one concurrent push allowed for stream " & s.shortLog())
if s.isClosed or s.pushedEof:
raise newLPStreamClosedError()
@ -87,12 +89,14 @@ method pushData*(s: BufferStream, data: seq[byte]) {.base, async.} =
finally:
s.pushing = false
method pushEof*(s: BufferStream) {.base, async.} =
method pushEof*(
s: BufferStream
) {.base, async: (raises: [CancelledError, LPStreamError]).} =
if s.pushedEof:
return
doAssert(not s.pushing,
&"Only one concurrent push allowed for stream {s.shortLog()}")
"Only one concurrent push allowed for stream " & s.shortLog())
s.pushedEof = true
@ -108,13 +112,14 @@ method pushEof*(s: BufferStream) {.base, async.} =
method atEof*(s: BufferStream): bool =
s.isEof and s.readBuf.len == 0
method readOnce*(s: BufferStream,
pbytes: pointer,
nbytes: int):
Future[int] {.async.} =
method readOnce*(
s: BufferStream,
pbytes: pointer,
nbytes: int
): Future[int] {.async: (raises: [CancelledError, LPStreamError]).} =
doAssert(nbytes > 0, "nbytes must be positive integer")
doAssert(not s.reading,
&"Only one concurrent read allowed for stream {s.shortLog()}")
"Only one concurrent read allowed for stream " & s.shortLog())
if s.returnedEof:
raise newLPStreamEOFError()
@ -135,13 +140,6 @@ method readOnce*(s: BufferStream,
# Not very efficient, but shouldn't happen often
s.readBuf.assign(@(p.toOpenArray(0, rbytes - 1)) & @(s.readBuf.data))
raise exc
except CatchableError as exc:
# When an exception happens here, the Bufferstream is effectively
# broken and no more reads will be valid - for now, return EOF if it's
# called again, though this is not completely true - EOF represents an
# "orderly" shutdown and that's not what happened here..
s.returnedEof = true
raise exc
finally:
s.reading = false
@ -173,7 +171,8 @@ method readOnce*(s: BufferStream,
return rbytes
method closeImpl*(s: BufferStream): Future[void] =
method closeImpl*(
s: BufferStream): Future[void] {.async: (raises: [], raw: true).} =
## close the stream and clear the buffer
trace "Closing BufferStream", s, len = s.len
@ -209,8 +208,8 @@ method closeImpl*(s: BufferStream): Future[void] =
if not s.readQueue.empty():
discard s.readQueue.popFirstNoWait()
except AsyncQueueFullError, AsyncQueueEmptyError:
raise newException(Defect, getCurrentExceptionMsg())
raiseAssert(getCurrentExceptionMsg())
trace "Closed BufferStream", s
procCall Connection(s).closeImpl() # noraises, nocancels
procCall Connection(s).closeImpl()

View File

@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@ -31,18 +31,22 @@ type
tracked: bool
when defined(libp2p_agents_metrics):
declareGauge(libp2p_peers_identity, "peers identities", labels = ["agent"])
declareCounter(libp2p_peers_traffic_read, "incoming traffic", labels = ["agent"])
declareCounter(libp2p_peers_traffic_write, "outgoing traffic", labels = ["agent"])
declareGauge libp2p_peers_identity,
"peers identities", labels = ["agent"]
declareCounter libp2p_peers_traffic_read,
"incoming traffic", labels = ["agent"]
declareCounter libp2p_peers_traffic_write,
"outgoing traffic", labels = ["agent"]
declareCounter(libp2p_network_bytes, "total traffic", labels = ["direction"])
declareCounter libp2p_network_bytes,
"total traffic", labels = ["direction"]
func shortLog*(conn: ChronosStream): auto =
try:
if conn.isNil: "ChronosStream(nil)"
if conn == nil: "ChronosStream(nil)"
else: &"{shortLog(conn.peerId)}:{conn.oid}"
except ValueError as exc:
raise newException(Defect, exc.msg)
raiseAssert(exc.msg)
chronicles.formatIt(ChronosStream): shortLog(it)
@ -50,17 +54,18 @@ method initStream*(s: ChronosStream) =
if s.objName.len == 0:
s.objName = ChronosStreamTrackerName
s.timeoutHandler = proc() {.async.} =
s.timeoutHandler = proc(): Future[void] {.async: (raises: [], raw: true).} =
trace "Idle timeout expired, closing ChronosStream", s
await s.close()
s.close()
procCall Connection(s).initStream()
proc init*(C: type ChronosStream,
client: StreamTransport,
dir: Direction,
timeout = DefaultChronosStreamTimeout,
observedAddr: Opt[MultiAddress]): ChronosStream =
proc init*(
C: type ChronosStream,
client: StreamTransport,
dir: Direction,
timeout = DefaultChronosStreamTimeout,
observedAddr: Opt[MultiAddress]): ChronosStream =
result = C(client: client,
timeout: timeout,
dir: dir,
@ -94,7 +99,11 @@ when defined(libp2p_agents_metrics):
libp2p_peers_identity.dec(labelValues = [s.shortAgent])
s.tracked = false
method readOnce*(s: ChronosStream, pbytes: pointer, nbytes: int): Future[int] {.async.} =
method readOnce*(
s: ChronosStream,
pbytes: pointer,
nbytes: int
): Future[int] {.async: (raises: [CancelledError, LPStreamError]).} =
if s.atEof:
raise newLPStreamEOFError()
withExceptions:
@ -107,7 +116,10 @@ method readOnce*(s: ChronosStream, pbytes: pointer, nbytes: int): Future[int] {.
libp2p_peers_traffic_read.inc(result.int64, labelValues = [s.shortAgent])
proc completeWrite(
s: ChronosStream, fut: Future[int], msgLen: int): Future[void] {.async.} =
s: ChronosStream,
fut: Future[int].Raising([TransportError, CancelledError]),
msgLen: int
): Future[void] {.async: (raises: [CancelledError, LPStreamError]).} =
withExceptions:
# StreamTransport will only return written < msg.len on fatal failures where
# further writing is not possible - in such cases, we'll raise here,
@ -124,7 +136,11 @@ proc completeWrite(
if s.tracked:
libp2p_peers_traffic_write.inc(msgLen.int64, labelValues = [s.shortAgent])
method write*(s: ChronosStream, msg: seq[byte]): Future[void] =
method write*(
s: ChronosStream,
msg: seq[byte]
): Future[void] {.async: (raises: [
CancelledError, LPStreamError], raw: true).} =
# Avoid a copy of msg being kept in the closure created by `{.async.}` as this
# drives up memory usage
if msg.len == 0:
@ -145,19 +161,14 @@ method closed*(s: ChronosStream): bool =
method atEof*(s: ChronosStream): bool =
s.client.atEof()
method closeImpl*(s: ChronosStream) {.async.} =
try:
trace "Shutting down chronos stream", address = $s.client.remoteAddress(), s
method closeImpl*(
s: ChronosStream) {.async: (raises: []).} =
trace "Shutting down chronos stream", address = $s.client.remoteAddress(), s
if not s.client.closed():
await s.client.closeWait()
if not s.client.closed():
await s.client.closeWait()
trace "Shutdown chronos stream", address = $s.client.remoteAddress(), s
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "Error closing chronosstream", s, msg = exc.msg
trace "Shutdown chronos stream", address = $s.client.remoteAddress(), s
when defined(libp2p_agents_metrics):
# do this after closing!

View File

@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@ -27,25 +27,25 @@ const
DefaultConnectionTimeout* = 5.minutes
type
TimeoutHandler* = proc(): Future[void] {.gcsafe, raises: [].}
TimeoutHandler* = proc(): Future[void] {.async: (raises: []).}
Connection* = ref object of LPStream
activity*: bool # reset every time data is sent or received
timeout*: Duration # channel timeout if no activity
timerTaskFut: Future[void] # the current timer instance
activity*: bool # reset every time data is sent or received
timeout*: Duration # channel timeout if no activity
timerTaskFut: Future[void].Raising([]) # the current timer instance
timeoutHandler*: TimeoutHandler # timeout handler
peerId*: PeerId
observedAddr*: Opt[MultiAddress]
protocol*: string # protocol used by the connection, used as tag for metrics
transportDir*: Direction # The bottom level transport (generally the socket) direction
protocol*: string # protocol used by the connection, used as metrics tag
transportDir*: Direction # underlying transport (usually socket) direction
when defined(libp2p_agents_metrics):
shortAgent*: string
proc timeoutMonitor(s: Connection) {.async.}
proc timeoutMonitor(s: Connection) {.async: (raises: []).}
func shortLog*(conn: Connection): string =
try:
if conn.isNil: "Connection(nil)"
if conn == nil: "Connection(nil)"
else: &"{shortLog(conn.peerId)}:{conn.oid}"
except ValueError as exc:
raiseAssert(exc.msg)
@ -58,23 +58,28 @@ method initStream*(s: Connection) =
procCall LPStream(s).initStream()
doAssert(isNil(s.timerTaskFut))
doAssert(s.timerTaskFut == nil)
if s.timeout > 0.millis:
trace "Monitoring for timeout", s, timeout = s.timeout
s.timerTaskFut = s.timeoutMonitor()
if isNil(s.timeoutHandler):
s.timeoutHandler = proc(): Future[void] =
trace "Idle timeout expired, closing connection", s
s.close()
if s.timeoutHandler == nil:
s.timeoutHandler =
proc(): Future[void] {.async: (raises: [], raw: true).} =
trace "Idle timeout expired, closing connection", s
s.close()
method closeImpl*(s: Connection): Future[void] =
method closeImpl*(s: Connection): Future[void] {.async: (raises: []).} =
# Cleanup timeout timer
trace "Closing connection", s
if not isNil(s.timerTaskFut) and not s.timerTaskFut.finished:
s.timerTaskFut.cancel()
if s.timerTaskFut != nil and not s.timerTaskFut.finished:
# Don't `cancelAndWait` here to avoid risking deadlock in this scenario:
# - `pollActivity` is waiting for `s.timeoutHandler` to complete.
# - `s.timeoutHandler` may have triggered `closeImpl` and we are now here.
# In this situation, we have to return for `s.timerTaskFut` to complete.
s.timerTaskFut.cancelSoon()
s.timerTaskFut = nil
trace "Closed connection", s
@ -84,7 +89,7 @@ method closeImpl*(s: Connection): Future[void] =
func hash*(p: Connection): Hash =
cast[pointer](p).hash
proc pollActivity(s: Connection): Future[bool] {.async.} =
proc pollActivity(s: Connection): Future[bool] {.async: (raises: []).} =
if s.closed and s.atEof:
return false # Done, no more monitoring
@ -95,22 +100,13 @@ proc pollActivity(s: Connection): Future[bool] {.async.} =
# Inactivity timeout happened, call timeout monitor
trace "Connection timed out", s
if not(isNil(s.timeoutHandler)):
if s.timeoutHandler != nil:
trace "Calling timeout handler", s
try:
await s.timeoutHandler()
except CancelledError:
# timeoutHandler is expected to be fast, but it's still possible that
# cancellation will happen here - no need to warn about it - we do want to
# stop the polling however
debug "Timeout handler cancelled", s
except CatchableError as exc: # Shouldn't happen
warn "exception in timeout handler", s, exc = exc.msg
await s.timeoutHandler()
return false
proc timeoutMonitor(s: Connection) {.async.} =
proc timeoutMonitor(s: Connection) {.async: (raises: []).} =
## monitor the channel for inactivity
##
## if the timeout was hit, it means that
@ -129,21 +125,22 @@ proc timeoutMonitor(s: Connection) {.async.} =
return
method getWrapped*(s: Connection): Connection {.base.} =
doAssert(false, "not implemented!")
raiseAssert("Not implemented!")
when defined(libp2p_agents_metrics):
proc setShortAgent*(s: Connection, shortAgent: string) =
var conn = s
while not isNil(conn):
while conn != nil:
conn.shortAgent = shortAgent
conn = conn.getWrapped()
proc new*(C: type Connection,
peerId: PeerId,
dir: Direction,
observedAddr: Opt[MultiAddress],
timeout: Duration = DefaultConnectionTimeout,
timeoutHandler: TimeoutHandler = nil): Connection =
proc new*(
C: type Connection,
peerId: PeerId,
dir: Direction,
observedAddr: Opt[MultiAddress],
timeout: Duration = DefaultConnectionTimeout,
timeoutHandler: TimeoutHandler = nil): Connection =
result = C(peerId: peerId,
dir: dir,
timeout: timeout,

View File

@ -23,8 +23,8 @@ import ../varint,
export errors
declareGauge(libp2p_open_streams,
"open stream instances", labels = ["type", "dir"])
declareGauge libp2p_open_streams,
"open stream instances", labels = ["type", "dir"]
export oids
@ -98,8 +98,9 @@ proc newLPStreamConnDownError*(
parentException)
func shortLog*(s: LPStream): auto =
if s.isNil: "LPStream(nil)"
if s == nil: "LPStream(nil)"
else: $s.oid
chronicles.formatIt(LPStream): shortLog(it)
method initStream*(s: LPStream) {.base.} =
@ -126,19 +127,21 @@ method atEof*(s: LPStream): bool {.base, public.} =
s.isEof
method readOnce*(
s: LPStream,
pbytes: pointer,
nbytes: int):
Future[int] {.base, async, public.} =
s: LPStream,
pbytes: pointer,
nbytes: int
): Future[int] {.base, async: (raises: [
CancelledError, LPStreamError], raw: true), public.} =
## Reads whatever is available in the stream,
## up to `nbytes`. Will block if nothing is
## available
doAssert(false, "not implemented!")
raiseAssert("Not implemented!")
proc readExactly*(s: LPStream,
pbytes: pointer,
nbytes: int):
Future[void] {.async, public.} =
proc readExactly*(
s: LPStream,
pbytes: pointer,
nbytes: int
): Future[void] {.async: (raises: [CancelledError, LPStreamError]), public.} =
## Waits for `nbytes` to be available, then read
## them and return them
if s.atEof:
@ -172,10 +175,11 @@ proc readExactly*(s: LPStream,
trace "couldn't read all bytes, incomplete data", s, nbytes, read
raise newLPStreamIncompleteError()
proc readLine*(s: LPStream,
limit = 0,
sep = "\r\n"): Future[string]
{.async, public.} =
proc readLine*(
s: LPStream,
limit = 0,
sep = "\r\n"
): Future[string] {.async: (raises: [CancelledError, LPStreamError]), public.} =
## Reads up to `limit` bytes are read, or a `sep` is found
# TODO replace with something that exploits buffering better
var lim = if limit <= 0: -1 else: limit
@ -201,7 +205,9 @@ proc readLine*(s: LPStream,
if len(result) == lim:
break
proc readVarint*(conn: LPStream): Future[uint64] {.async, public.} =
proc readVarint*(
conn: LPStream
): Future[uint64] {.async: (raises: [CancelledError, LPStreamError]), public.} =
var
buffer: array[10, byte]
@ -219,7 +225,11 @@ proc readVarint*(conn: LPStream): Future[uint64] {.async, public.} =
if true: # can't end with a raise apparently
raise (ref InvalidVarintError)(msg: "Cannot parse varint")
proc readLp*(s: LPStream, maxSize: int): Future[seq[byte]] {.async, public.} =
proc readLp*(
s: LPStream,
maxSize: int
): Future[seq[byte]] {.async: (raises: [
CancelledError, LPStreamError]), public.} =
## read length prefixed msg, with the length encoded as a varint
let
length = await s.readVarint()
@ -233,13 +243,21 @@ proc readLp*(s: LPStream, maxSize: int): Future[seq[byte]] {.async, public.} =
var res = newSeqUninitialized[byte](length)
await s.readExactly(addr res[0], res.len)
return res
res
method write*(s: LPStream, msg: seq[byte]): Future[void] {.base, public.} =
method write*(
s: LPStream,
msg: seq[byte]
): Future[void] {.async: (raises: [
CancelledError, LPStreamError], raw: true), base, public.} =
# Write `msg` to stream, waiting for the write to be finished
doAssert(false, "not implemented!")
raiseAssert("Not implemented!")
proc writeLp*(s: LPStream, msg: openArray[byte]): Future[void] {.public.} =
proc writeLp*(
s: LPStream,
msg: openArray[byte]
): Future[void] {.async: (raises: [
CancelledError, LPStreamError], raw: true), public.} =
## Write `msg` with a varint-encoded length prefix
let vbytes = PB.toBytes(msg.len().uint64)
var buf = newSeqUninitialized[byte](msg.len() + vbytes.len)
@ -247,35 +265,53 @@ proc writeLp*(s: LPStream, msg: openArray[byte]): Future[void] {.public.} =
buf[vbytes.len..<buf.len] = msg
s.write(buf)
proc writeLp*(s: LPStream, msg: string): Future[void] {.public.} =
proc writeLp*(
s: LPStream,
msg: string
): Future[void] {.async: (raises: [
CancelledError, LPStreamError], raw: true), public.} =
writeLp(s, msg.toOpenArrayByte(0, msg.high))
proc write*(s: LPStream, msg: string): Future[void] {.public.} =
proc write*(
s: LPStream,
msg: string
): Future[void] {.async: (raises: [
CancelledError, LPStreamError], raw: true), public.} =
s.write(msg.toBytes())
method closeImpl*(s: LPStream): Future[void] {.async, base.} =
method closeImpl*(
s: LPStream
): Future[void] {.async: (raises: [], raw: true), base.} =
## Implementation of close - called only once
trace "Closing stream", s, objName = s.objName, dir = $s.dir
libp2p_open_streams.dec(labelValues = [s.objName, $s.dir])
untrackCounter(s.objName)
s.closeEvent.fire()
trace "Closed stream", s, objName = s.objName, dir = $s.dir
let fut = newFuture[void]()
fut.complete()
fut
method close*(s: LPStream): Future[void] {.base, async, public.} = # {.raises [Defect].}
method close*(
s: LPStream
): Future[void] {.async: (raises: [], raw: true), base, public.} =
## close the stream - this may block, but will not raise exceptions
##
if s.isClosed:
trace "Already closed", s
return
let fut = newFuture[void]()
fut.complete()
return fut
s.isClosed = true # Set flag before performing virtual close
# An separate implementation method is used so that even when derived types
# A separate implementation method is used so that even when derived types
# override `closeImpl`, it is called only once - anyone overriding `close`
# itself must implement this - once-only check as well, with their own field
await closeImpl(s)
closeImpl(s)
proc closeWithEOF*(s: LPStream): Future[void] {.async, public.} =
proc closeWithEOF*(
s: LPStream): Future[void] {.async: (raises: []), public.} =
## Close the stream and wait for EOF - use this with half-closed streams where
## an EOF is expected to arrive from the other end.
##
@ -304,9 +340,9 @@ proc closeWithEOF*(s: LPStream): Future[void] {.async, public.} =
var buf: array[8, byte]
if (await readOnce(s, addr buf[0], buf.len)) != 0:
debug "Unexpected bytes while waiting for EOF", s
except CancelledError:
discard
except LPStreamEOFError:
trace "Expected EOF came", s
except CancelledError as exc:
raise exc
except CatchableError as exc:
except LPStreamError as exc:
debug "Unexpected error while waiting for EOF", s, msg = exc.msg

View File

@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@ -44,11 +44,12 @@ method initStream*(s: WsStream) =
procCall Connection(s).initStream()
proc new*(T: type WsStream,
session: WSSession,
dir: Direction,
observedAddr: Opt[MultiAddress],
timeout = 10.minutes): T =
proc new*(
T: type WsStream,
session: WSSession,
dir: Direction,
observedAddr: Opt[MultiAddress],
timeout = 10.minutes): T =
let stream = T(
session: session,
@ -76,9 +77,10 @@ template mapExceptions(body: untyped) =
raise newLPStreamEOFError()
method readOnce*(
s: WsStream,
pbytes: pointer,
nbytes: int): Future[int] {.async.} =
s: WsStream,
pbytes: pointer,
nbytes: int
): Future[int] {.async: (raises: [CancelledError, LPStreamError]).} =
let res = mapExceptions(await s.session.recv(pbytes, nbytes))
if res == 0 and s.session.readyState == ReadyState.Closed:
@ -87,13 +89,17 @@ method readOnce*(
return res
method write*(
s: WsStream,
msg: seq[byte]): Future[void] {.async.} =
s: WsStream,
msg: seq[byte]
): Future[void] {.async: (raises: [CancelledError, LPStreamError]).} =
mapExceptions(await s.session.send(msg, Opcode.Binary))
s.activity = true # reset activity flag
method closeImpl*(s: WsStream): Future[void] {.async.} =
await s.session.close()
method closeImpl*(s: WsStream): Future[void] {.async: (raises: []).} =
try:
await s.session.close()
except CatchableError:
discard
await procCall Connection(s).closeImpl()
method getWrapped*(s: WsStream): Connection = nil
@ -140,7 +146,7 @@ method start*(
if WSS.match(ma):
if self.secure: true
else:
warn "Trying to listen on a WSS address without setting the certificate!"
warn "Trying to listen on a WSS address without setting certificate!"
false
else: false

View File

@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@ -31,8 +31,8 @@ proc getMuxerByCodec(self: MuxedUpgrade, muxerName: string): MuxerProvider =
return m
proc mux*(
self: MuxedUpgrade,
conn: Connection): Future[Muxer] {.async.} =
self: MuxedUpgrade,
conn: Connection): Future[Muxer] {.async.} =
## mux connection
trace "Muxing connection", conn
@ -59,13 +59,13 @@ proc mux*(
return muxer
method upgrade*(
self: MuxedUpgrade,
conn: Connection,
peerId: Opt[PeerId]): Future[Muxer] {.async.} =
self: MuxedUpgrade,
conn: Connection,
peerId: Opt[PeerId]): Future[Muxer] {.async.} =
trace "Upgrading connection", conn, direction = conn.dir
let sconn = await self.secure(conn, peerId) # secure the connection
if isNil(sconn):
if sconn == nil:
raise newException(UpgradeFailedError,
"unable to secure connection, stopping upgrade")
@ -86,22 +86,21 @@ method upgrade*(
return muxer
proc new*(
T: type MuxedUpgrade,
muxers: seq[MuxerProvider],
secureManagers: openArray[Secure] = [],
ms: MultistreamSelect): T =
T: type MuxedUpgrade,
muxers: seq[MuxerProvider],
secureManagers: openArray[Secure] = [],
ms: MultistreamSelect): T =
let upgrader = T(
muxers: muxers,
secureManagers: @secureManagers,
ms: ms)
upgrader.streamHandler = proc(conn: Connection) {.async.} =
upgrader.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
trace "Starting stream handler", conn
try:
await upgrader.ms.handle(conn) # handle incoming connection
except CancelledError as exc:
raise exc
return
except CatchableError as exc:
trace "exception in stream handler", conn, msg = exc.msg
finally:

View File

@ -1,27 +1,29 @@
import chronos
import
std/sequtils,
chronos
proc allFuturesThrowing*[F: FutureBase](args: varargs[F]): Future[void] =
proc allFuturesThrowing*(args: varargs[FutureBase]): Future[void] =
# This proc is only meant for use in tests / not suitable for general use.
# - Swallowing errors arbitrarily instead of aggregating them is bad design
# - It raises `CatchableError` instead of the union of the `futs` errors,
# inflating the caller's `raises` list unnecessarily. `macro` could fix it
var futs: seq[F]
for fut in args:
futs &= fut
proc call() {.async.} =
var first: ref CatchableError = nil
futs = await allFinished(futs)
let futs = @args
(proc() {.async: (raises: [CatchableError]).} =
await allFutures(futs)
var firstErr: ref CatchableError
for fut in futs:
if fut.failed:
let err = fut.readError()
if err of Defect:
let err = fut.error()
if err of CancelledError:
raise err
else:
if err of CancelledError:
raise err
if isNil(first):
first = err
if not isNil(first):
raise first
if firstErr == nil:
firstErr = err
if firstErr != nil:
raise firstErr)()
return call()
proc allFuturesThrowing*[T](futs: varargs[Future[T]]): Future[void] =
allFuturesThrowing(futs.mapIt(FutureBase(it)))
proc allFuturesThrowing*[T, E](
futs: varargs[InternalRaisesFuture[T, E]]): Future[void] =
allFuturesThrowing(futs.mapIt(FutureBase(it)))

View File

@ -76,11 +76,18 @@ template rng*(): ref HmacDrbgContext =
getRng()
type
WriteHandler* = proc(data: seq[byte]): Future[void] {.gcsafe, raises: [].}
WriteHandler* = proc(
data: seq[byte]
): Future[void] {.async: (raises: [CancelledError, LPStreamError]).}
TestBufferStream* = ref object of BufferStream
writeHandler*: WriteHandler
method write*(s: TestBufferStream, msg: seq[byte]): Future[void] =
method write*(
s: TestBufferStream,
msg: seq[byte]
): Future[void] {.async: (raises: [
CancelledError, LPStreamError], raw: true).} =
s.writeHandler(msg)
method getWrapped*(s: TestBufferStream): Connection = nil
@ -98,11 +105,15 @@ proc bridgedConnections*: (Connection, Connection) =
connB.dir = Direction.In
connA.initStream()
connB.initStream()
connA.writeHandler = proc(data: seq[byte]) {.async.} =
await connB.pushData(data)
connA.writeHandler =
proc(data: seq[byte]) {.async: (raises: [
CancelledError, LPStreamError], raw: true).} =
connB.pushData(data)
connB.writeHandler = proc(data: seq[byte]) {.async.} =
await connA.pushData(data)
connB.writeHandler =
proc(data: seq[byte]) {.async: (raises: [
CancelledError, LPStreamError], raw: true).} =
connA.pushData(data)
return (connA, connB)
macro checkUntilCustomTimeout*(timeout: Duration, code: untyped): untyped =

View File

@ -1,5 +1,5 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@ -24,7 +24,8 @@ import utils
import ../helpers
proc noop(data: seq[byte]) {.async.} = discard
proc noop(data: seq[byte]) {.async: (raises: [CancelledError, LPStreamError]).} =
discard
const MsgIdSuccess = "msg id gen success"

View File

@ -1,7 +1,7 @@
{.used.}
# Nim-Libp2p
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@ -29,11 +29,12 @@ type
peerId: PeerId
method newStream*(
m: TestMuxer,
name: string = "",
lazy: bool = false):
Future[Connection] {.async.} =
result = Connection.new(m.peerId, Direction.Out, Opt.none(MultiAddress))
m: TestMuxer,
name: string = "",
lazy: bool = false
): Future[Connection] {.async: (raises: [
CancelledError, LPStreamError, MuxerError]).} =
Connection.new(m.peerId, Direction.Out, Opt.none(MultiAddress))
suite "Connection Manager":
teardown:

View File

@ -1,7 +1,7 @@
{.used.}
# Nim-Libp2p
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@ -32,7 +32,8 @@ suite "Mplex":
suite "channel encoding":
asyncTest "encode header with channel id 0":
proc encHandler(msg: seq[byte]) {.async.} =
proc encHandler(
msg: seq[byte]) {.async: (raises: [CancelledError, LPStreamError]).} =
check msg == fromHex("000873747265616d2031")
let conn = TestBufferStream.new(encHandler)
@ -40,7 +41,8 @@ suite "Mplex":
await conn.close()
asyncTest "encode header with channel id other than 0":
proc encHandler(msg: seq[byte]) {.async.} =
proc encHandler(
msg: seq[byte]) {.async: (raises: [CancelledError, LPStreamError]).} =
check msg == fromHex("88010873747265616d2031")
let conn = TestBufferStream.new(encHandler)
@ -48,7 +50,8 @@ suite "Mplex":
await conn.close()
asyncTest "encode header and body with channel id 0":
proc encHandler(msg: seq[byte]) {.async.} =
proc encHandler(
msg: seq[byte]) {.async: (raises: [CancelledError, LPStreamError]).} =
check msg == fromHex("020873747265616d2031")
let conn = TestBufferStream.new(encHandler)
@ -56,7 +59,8 @@ suite "Mplex":
await conn.close()
asyncTest "encode header and body with channel id other than 0":
proc encHandler(msg: seq[byte]) {.async.} =
proc encHandler(
msg: seq[byte]) {.async: (raises: [CancelledError, LPStreamError]).} =
check msg == fromHex("8a010873747265616d2031")
let conn = TestBufferStream.new(encHandler)
@ -97,7 +101,10 @@ suite "Mplex":
suite "channel half-closed":
asyncTest "(local close) - should close for write":
proc writeHandler(data: seq[byte]) {.async.} = discard
proc writeHandler(
data: seq[byte]
) {.async: (raises: [CancelledError, LPStreamError]).} =
discard
let
conn = TestBufferStream.new(writeHandler)
chann = LPChannel.init(1, conn, true)
@ -112,7 +119,9 @@ suite "Mplex":
asyncTest "(local close) - should allow reads until remote closes":
let
conn = TestBufferStream.new(
proc (data: seq[byte]) {.async.} =
proc (
data: seq[byte]
) {.async: (raises: [CancelledError, LPStreamError]).} =
discard,
)
chann = LPChannel.init(1, conn, true)
@ -139,7 +148,9 @@ suite "Mplex":
asyncTest "(remote close) - channel should close for reading by remote":
let
conn = TestBufferStream.new(
proc (data: seq[byte]) {.async.} =
proc (
data: seq[byte]
) {.async: (raises: [CancelledError, LPStreamError]).} =
discard,
)
chann = LPChannel.init(1, conn, true)
@ -162,7 +173,9 @@ suite "Mplex":
let
testData = "Hello!".toBytes
conn = TestBufferStream.new(
proc (data: seq[byte]) {.async.} =
proc (
data: seq[byte]
) {.async: (raises: [CancelledError, LPStreamError]).} =
discard
)
chann = LPChannel.init(1, conn, true)
@ -175,7 +188,10 @@ suite "Mplex":
await conn.close()
asyncTest "should not allow pushing data to channel when remote end closed":
proc writeHandler(data: seq[byte]) {.async.} = discard
proc writeHandler(
data: seq[byte]
) {.async: (raises: [CancelledError, LPStreamError]).} =
discard
let
conn = TestBufferStream.new(writeHandler)
chann = LPChannel.init(1, conn, true)
@ -192,7 +208,10 @@ suite "Mplex":
suite "channel reset":
asyncTest "channel should fail reading":
proc writeHandler(data: seq[byte]) {.async.} = discard
proc writeHandler(
data: seq[byte]
) {.async: (raises: [CancelledError, LPStreamError]).} =
discard
let
conn = TestBufferStream.new(writeHandler)
chann = LPChannel.init(1, conn, true)
@ -205,7 +224,10 @@ suite "Mplex":
await conn.close()
asyncTest "reset should complete read":
proc writeHandler(data: seq[byte]) {.async.} = discard
proc writeHandler(
data: seq[byte]
) {.async: (raises: [CancelledError, LPStreamError]).} =
discard
let
conn = TestBufferStream.new(writeHandler)
chann = LPChannel.init(1, conn, true)
@ -220,7 +242,10 @@ suite "Mplex":
await conn.close()
asyncTest "reset should complete pushData":
proc writeHandler(data: seq[byte]) {.async.} = discard
proc writeHandler(
data: seq[byte]
) {.async: (raises: [CancelledError, LPStreamError]).} =
discard
let
conn = TestBufferStream.new(writeHandler)
chann = LPChannel.init(1, conn, true)
@ -239,7 +264,10 @@ suite "Mplex":
await conn.close()
asyncTest "reset should complete both read and push":
proc writeHandler(data: seq[byte]) {.async.} = discard
proc writeHandler(
data: seq[byte]
) {.async: (raises: [CancelledError, LPStreamError]).} =
discard
let
conn = TestBufferStream.new(writeHandler)
chann = LPChannel.init(1, conn, true)
@ -254,7 +282,10 @@ suite "Mplex":
await conn.close()
asyncTest "reset should complete both read and pushes":
proc writeHandler(data: seq[byte]) {.async.} = discard
proc writeHandler(
data: seq[byte]
) {.async: (raises: [CancelledError, LPStreamError]).} =
discard
let
conn = TestBufferStream.new(writeHandler)
chann = LPChannel.init(1, conn, true)
@ -279,7 +310,10 @@ suite "Mplex":
await conn.close()
asyncTest "reset should complete both read and push with cancel":
proc writeHandler(data: seq[byte]) {.async.} = discard
proc writeHandler(
data: seq[byte]
) {.async: (raises: [CancelledError, LPStreamError]).} =
discard
let
conn = TestBufferStream.new(writeHandler)
chann = LPChannel.init(1, conn, true)
@ -293,7 +327,10 @@ suite "Mplex":
await conn.close()
asyncTest "should complete both read and push after reset":
proc writeHandler(data: seq[byte]) {.async.} = discard
proc writeHandler(
data: seq[byte]
) {.async: (raises: [CancelledError, LPStreamError]).} =
discard
let
conn = TestBufferStream.new(writeHandler)
chann = LPChannel.init(1, conn, true)
@ -311,7 +348,10 @@ suite "Mplex":
await conn.close()
asyncTest "reset should complete ongoing push without reader":
proc writeHandler(data: seq[byte]) {.async.} = discard
proc writeHandler(
data: seq[byte]
) {.async: (raises: [CancelledError, LPStreamError]).} =
discard
let
conn = TestBufferStream.new(writeHandler)
chann = LPChannel.init(1, conn, true)
@ -323,7 +363,10 @@ suite "Mplex":
await conn.close()
asyncTest "reset should complete ongoing read without a push":
proc writeHandler(data: seq[byte]) {.async.} = discard
proc writeHandler(
data: seq[byte]
) {.async: (raises: [CancelledError, LPStreamError]).} =
discard
let
conn = TestBufferStream.new(writeHandler)
chann = LPChannel.init(1, conn, true)
@ -335,7 +378,10 @@ suite "Mplex":
await conn.close()
asyncTest "reset should allow all reads and pushes to complete":
proc writeHandler(data: seq[byte]) {.async.} = discard
proc writeHandler(
data: seq[byte]
) {.async: (raises: [CancelledError, LPStreamError]).} =
discard
let
conn = TestBufferStream.new(writeHandler)
chann = LPChannel.init(1, conn, true)
@ -364,7 +410,10 @@ suite "Mplex":
await conn.close()
asyncTest "channel should fail writing":
proc writeHandler(data: seq[byte]) {.async.} = discard
proc writeHandler(
data: seq[byte]
) {.async: (raises: [CancelledError, LPStreamError]).} =
discard
let
conn = TestBufferStream.new(writeHandler)
chann = LPChannel.init(1, conn, true)
@ -376,7 +425,10 @@ suite "Mplex":
await conn.close()
asyncTest "channel should reset on timeout":
proc writeHandler(data: seq[byte]) {.async.} = discard
proc writeHandler(
data: seq[byte]
) {.async: (raises: [CancelledError, LPStreamError]).} =
discard
let
conn = TestBufferStream.new(writeHandler)
chann = LPChannel.init(
@ -395,11 +447,15 @@ suite "Mplex":
proc acceptHandler() {.async.} =
let conn = await transport1.accept()
let mplexListen = Mplex.new(conn)
mplexListen.streamHandler = proc(stream: Connection)
{.async.} =
let msg = await stream.readLp(1024)
check string.fromBytes(msg) == "HELLO"
await stream.close()
mplexListen.streamHandler =
proc(stream: Connection) {.async: (raises: []).} =
try:
let msg = await stream.readLp(1024)
check string.fromBytes(msg) == "HELLO"
except CancelledError, LPStreamError:
return
finally:
await stream.close()
await mplexListen.handle()
await mplexListen.close()
@ -432,11 +488,15 @@ suite "Mplex":
proc acceptHandler() {.async.} =
let conn = await transport1.accept()
let mplexListen = Mplex.new(conn)
mplexListen.streamHandler = proc(stream: Connection)
{.async.} =
let msg = await stream.readLp(1024)
check string.fromBytes(msg) == "HELLO"
await stream.close()
mplexListen.streamHandler =
proc(stream: Connection) {.async: (raises: []).} =
try:
let msg = await stream.readLp(1024)
check string.fromBytes(msg) == "HELLO"
except CancelledError, LPStreamError:
return
finally:
await stream.close()
await mplexListen.handle()
await mplexListen.close()
@ -477,13 +537,17 @@ suite "Mplex":
try:
let conn = await transport1.accept()
let mplexListen = Mplex.new(conn)
mplexListen.streamHandler = proc(stream: Connection)
{.async.} =
let msg = await stream.readLp(MaxMsgSize)
check msg == bigseq
trace "Bigseq check passed!"
await stream.close()
listenJob.complete()
mplexListen.streamHandler =
proc(stream: Connection) {.async: (raises: []).} =
try:
let msg = await stream.readLp(MaxMsgSize)
check msg == bigseq
trace "Bigseq check passed!"
except CancelledError, LPStreamError:
return
finally:
await stream.close()
listenJob.complete()
await mplexListen.handle()
await sleepAsync(1.seconds) # give chronos some slack to process things
@ -523,10 +587,14 @@ suite "Mplex":
proc acceptHandler() {.async.} =
let conn = await transport1.accept()
let mplexListen = Mplex.new(conn)
mplexListen.streamHandler = proc(stream: Connection)
{.async.} =
await stream.writeLp("Hello from stream!")
await stream.close()
mplexListen.streamHandler =
proc(stream: Connection) {.async: (raises: []).} =
try:
await stream.writeLp("Hello from stream!")
except CancelledError, LPStreamError:
return
finally:
await stream.close()
await mplexListen.handle()
await mplexListen.close()
@ -561,14 +629,21 @@ suite "Mplex":
var count = 1
let conn = await transport1.accept()
let mplexListen = Mplex.new(conn)
mplexListen.streamHandler = proc(stream: Connection)
{.async.} =
let msg = await stream.readLp(1024)
check string.fromBytes(msg) == &"stream {count}!"
count.inc
if count == 11:
done.complete()
await stream.close()
mplexListen.streamHandler =
proc(stream: Connection) {.async: (raises: []).} =
try:
let msg = await stream.readLp(1024)
try:
check string.fromBytes(msg) == &"stream {count}!"
except ValueError as exc:
raiseAssert(exc.msg)
count.inc
if count == 11:
done.complete()
except CancelledError, LPStreamError:
return
finally:
await stream.close()
await mplexListen.handle()
await mplexListen.close()
@ -605,15 +680,22 @@ suite "Mplex":
var count = 1
let conn = await transport1.accept()
let mplexListen = Mplex.new(conn)
mplexListen.streamHandler = proc(stream: Connection)
{.async.} =
let msg = await stream.readLp(1024)
check string.fromBytes(msg) == &"stream {count} from dialer!"
await stream.writeLp(&"stream {count} from listener!")
count.inc
if count == 11:
done.complete()
await stream.close()
mplexListen.streamHandler =
proc(stream: Connection) {.async: (raises: []).} =
try:
let msg = await stream.readLp(1024)
try:
check string.fromBytes(msg) == &"stream {count} from dialer!"
await stream.writeLp(&"stream {count} from listener!")
except ValueError as exc:
raiseAssert(exc.msg)
count.inc
if count == 11:
done.complete()
except CancelledError, LPStreamError:
return
finally:
await stream.close()
await mplexListen.handle()
await mplexListen.close()
@ -650,16 +732,19 @@ suite "Mplex":
let conn = await transport1.accept()
let mplexListen = Mplex.new(conn)
mplexListen.streamHandler = proc(stream: Connection)
{.async.} =
listenStreams.add(stream)
try:
discard await stream.readLp(1024)
except LPStreamEOFError:
await stream.close()
return
mplexListen.streamHandler =
proc(stream: Connection) {.async: (raises: []).} =
listenStreams.add(stream)
try:
discard await stream.readLp(1024)
except LPStreamEOFError:
return
except CancelledError, LPStreamError:
return
finally:
await stream.close()
check false
check false
await mplexListen.handle()
await mplexListen.close()
@ -700,14 +785,14 @@ suite "Mplex":
proc acceptHandler() {.async.} =
let conn = await transport1.accept()
let mplexListen = Mplex.new(conn)
mplexListen.streamHandler = proc(stream: Connection)
{.async.} =
listenStreams.add(stream)
count.inc()
if count == 10:
done.complete()
mplexListen.streamHandler =
proc(stream: Connection) {.async: (raises: []).} =
listenStreams.add(stream)
count.inc()
if count == 10:
done.complete()
await stream.join()
await noCancel stream.join()
await mplexListen.handle()
await mplexListen.close()
@ -764,10 +849,10 @@ suite "Mplex":
proc acceptHandler() {.async.} =
let conn = await transport1.accept()
let mplexListen = Mplex.new(conn)
mplexListen.streamHandler = proc(stream: Connection)
{.async.} =
mplexListen.streamHandler =
proc(stream: Connection) {.async: (raises: []).} =
listenStreams.add(stream)
await stream.join()
await noCancel stream.join()
await mplexListen.handle()
await mplexListen.close()
@ -808,10 +893,10 @@ suite "Mplex":
proc acceptHandler() {.async.} =
let conn = await transport1.accept()
mplexListen = Mplex.new(conn)
mplexListen.streamHandler = proc(stream: Connection)
{.async.} =
mplexListen.streamHandler =
proc(stream: Connection) {.async: (raises: []).} =
listenStreams.add(stream)
await stream.join()
await noCancel stream.join()
await mplexListen.handle()
await mplexListen.close()
@ -854,10 +939,10 @@ suite "Mplex":
proc acceptHandler() {.async.} =
let conn = await transport1.accept()
let mplexListen = Mplex.new(conn)
mplexListen.streamHandler = proc(stream: Connection)
{.async.} =
mplexListen.streamHandler =
proc(stream: Connection) {.async: (raises: []).} =
listenStreams.add(stream)
await stream.join()
await noCancel stream.join()
mplexHandle = mplexListen.handle()
await mplexHandle
@ -899,10 +984,10 @@ suite "Mplex":
proc acceptHandler() {.async.} =
let conn = await transport1.accept()
let mplexListen = Mplex.new(conn)
mplexListen.streamHandler = proc(stream: Connection)
{.async.} =
mplexListen.streamHandler =
proc(stream: Connection) {.async: (raises: []).} =
listenStreams.add(stream)
await stream.join()
await noCancel stream.join()
await mplexListen.handle()
await mplexListen.close()
@ -946,10 +1031,10 @@ suite "Mplex":
proc acceptHandler() {.async.} =
listenConn = await transport1.accept()
let mplexListen = Mplex.new(listenConn)
mplexListen.streamHandler = proc(stream: Connection)
{.async.} =
mplexListen.streamHandler =
proc(stream: Connection) {.async: (raises: []).} =
listenStreams.add(stream)
await stream.join()
await noCancel stream.join()
await mplexListen.handle()
await mplexListen.close()
@ -995,15 +1080,17 @@ suite "Mplex":
proc acceptHandler() {.async.} =
let conn = await transport1.accept()
let mplexListen = Mplex.new(conn)
mplexListen.streamHandler = proc(stream: Connection)
{.async.} =
try:
let msg = await stream.readLp(MsgSize)
check msg.len == MsgSize
except CatchableError as e:
echo e.msg
await stream.close()
complete.complete()
mplexListen.streamHandler =
proc(stream: Connection) {.async: (raises: []).} =
try:
let msg = await stream.readLp(MsgSize)
check msg.len == MsgSize
except CancelledError as e:
echo e.msg
except LPStreamError as e:
echo e.msg
await stream.close()
complete.complete()
await mplexListen.handle()
await mplexListen.close()
@ -1067,12 +1154,16 @@ suite "Mplex":
proc acceptHandler() {.async.} =
let conn = await transport1.accept()
let mplexListen = Mplex.new(conn)
mplexListen.streamHandler = proc(stream: Connection)
{.async.} =
let msg = await stream.readLp(MsgSize)
check msg.len == MsgSize
await stream.close()
complete.complete()
mplexListen.streamHandler =
proc(stream: Connection) {.async: (raises: []).} =
try:
let msg = await stream.readLp(MsgSize)
check msg.len == MsgSize
except CancelledError, LPStreamError:
return
finally:
await stream.close()
complete.complete()
await mplexListen.handle()
await mplexListen.close()

View File

@ -1,7 +1,7 @@
{.used.}
# Nim-Libp2p
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@ -32,43 +32,57 @@ type
TestSelectStream = ref object of Connection
step*: int
method readOnce*(s: TestSelectStream,
pbytes: pointer,
nbytes: int): Future[int] {.async.} =
method readOnce*(
s: TestSelectStream,
pbytes: pointer,
nbytes: int
): Future[int] {.async: (raises: [CancelledError, LPStreamError], raw: true).} =
let fut = newFuture[int]()
case s.step:
of 1:
var buf = newSeq[byte](1)
buf[0] = 19
copyMem(pbytes, addr buf[0], buf.len())
s.step = 2
return buf.len
of 2:
var buf = "/multistream/1.0.0\n"
copyMem(pbytes, addr buf[0], buf.len())
s.step = 3
return buf.len
of 3:
var buf = newSeq[byte](1)
buf[0] = 18
copyMem(pbytes, addr buf[0], buf.len())
s.step = 4
return buf.len
of 4:
var buf = "/test/proto/1.0.0\n"
copyMem(pbytes, addr buf[0], buf.len())
return buf.len
else:
copyMem(pbytes,
cstring("\0x3na\n"),
"\0x3na\n".len())
of 1:
var buf = newSeq[byte](1)
buf[0] = 19
copyMem(pbytes, addr buf[0], buf.len())
s.step = 2
fut.complete(buf.len)
of 2:
var buf = "/multistream/1.0.0\n"
copyMem(pbytes, addr buf[0], buf.len())
s.step = 3
fut.complete(buf.len)
of 3:
var buf = newSeq[byte](1)
buf[0] = 18
copyMem(pbytes, addr buf[0], buf.len())
s.step = 4
fut.complete(buf.len)
of 4:
var buf = "/test/proto/1.0.0\n"
copyMem(pbytes, addr buf[0], buf.len())
fut.complete(buf.len)
else:
copyMem(pbytes,
cstring("\0x3na\n"),
"\0x3na\n".len())
return "\0x3na\n".len()
fut.complete("\0x3na\n".len())
fut
method write*(s: TestSelectStream, msg: seq[byte]) {.async.} = discard
method write*(
s: TestSelectStream,
msg: seq[byte]
): Future[void] {.async: (raises: [
CancelledError, LPStreamError], raw: true).} =
let fut = newFuture[void]()
fut.complete()
fut
method close(s: TestSelectStream) {.async.} =
method close(s: TestSelectStream) {.async: (raises: [], raw: true).} =
s.isClosed = true
s.isEof = true
let fut = newFuture[void]()
fut.complete()
fut
proc newTestSelectStream(): TestSelectStream =
new result
@ -76,50 +90,65 @@ proc newTestSelectStream(): TestSelectStream =
## Mock stream for handles `ls` test
type
LsHandler = proc(procs: seq[byte]): Future[void] {.gcsafe, raises: [].}
LsHandler = proc(
procs: seq[byte]
): Future[void] {.async: (raises: [CancelledError, LPStreamError]).}
TestLsStream = ref object of Connection
step*: int
ls*: LsHandler
method readOnce*(s: TestLsStream,
pbytes: pointer,
nbytes: int):
Future[int] {.async.} =
method readOnce*(
s: TestLsStream,
pbytes: pointer,
nbytes: int
): Future[int] {.async: (raises: [CancelledError, LPStreamError], raw: true).} =
let fut = newFuture[int]()
case s.step:
of 1:
var buf = newSeq[byte](1)
buf[0] = 19
copyMem(pbytes, addr buf[0], buf.len())
s.step = 2
return buf.len()
of 2:
var buf = "/multistream/1.0.0\n"
copyMem(pbytes, addr buf[0], buf.len())
s.step = 3
return buf.len()
of 3:
var buf = newSeq[byte](1)
buf[0] = 3
copyMem(pbytes, addr buf[0], buf.len())
s.step = 4
return buf.len()
of 4:
var buf = "ls\n"
copyMem(pbytes, addr buf[0], buf.len())
return buf.len()
else:
var buf = "na\n"
copyMem(pbytes, addr buf[0], buf.len())
return buf.len()
of 1:
var buf = newSeq[byte](1)
buf[0] = 19
copyMem(pbytes, addr buf[0], buf.len())
s.step = 2
fut.complete(buf.len())
of 2:
var buf = "/multistream/1.0.0\n"
copyMem(pbytes, addr buf[0], buf.len())
s.step = 3
fut.complete(buf.len())
of 3:
var buf = newSeq[byte](1)
buf[0] = 3
copyMem(pbytes, addr buf[0], buf.len())
s.step = 4
fut.complete(buf.len())
of 4:
var buf = "ls\n"
copyMem(pbytes, addr buf[0], buf.len())
fut.complete(buf.len())
else:
var buf = "na\n"
copyMem(pbytes, addr buf[0], buf.len())
fut.complete(buf.len())
fut
method write*(s: TestLsStream, msg: seq[byte]) {.async.} =
method write*(
s: TestLsStream,
msg: seq[byte]
): Future[void] {.async: (raises: [
CancelledError, LPStreamError], raw: true).} =
if s.step == 4:
await s.ls(msg)
return s.ls(msg)
let fut = newFuture[void]()
fut.complete()
fut
method close(s: TestLsStream) {.async.} =
method close(s: TestLsStream): Future[void] {.async: (raises: [], raw: true).} =
s.isClosed = true
s.isEof = true
let fut = newFuture[void]()
fut.complete()
fut
proc newTestLsStream(ls: LsHandler): TestLsStream {.gcsafe.} =
new result
@ -128,52 +157,67 @@ proc newTestLsStream(ls: LsHandler): TestLsStream {.gcsafe.} =
## Mock stream for handles `na` test
type
NaHandler = proc(procs: string): Future[void] {.gcsafe, raises: [].}
NaHandler = proc(
procs: string
): Future[void] {.async: (raises: [CancelledError, LPStreamError]).}
TestNaStream = ref object of Connection
step*: int
na*: NaHandler
method readOnce*(s: TestNaStream,
pbytes: pointer,
nbytes: int):
Future[int] {.async.} =
method readOnce*(
s: TestNaStream,
pbytes: pointer,
nbytes: int
): Future[int] {.async: (raises: [CancelledError, LPStreamError], raw: true).} =
let fut = newFuture[int]()
case s.step:
of 1:
var buf = newSeq[byte](1)
buf[0] = 19
copyMem(pbytes, addr buf[0], buf.len())
s.step = 2
return buf.len()
of 2:
var buf = "/multistream/1.0.0\n"
copyMem(pbytes, addr buf[0], buf.len())
s.step = 3
return buf.len()
of 3:
var buf = newSeq[byte](1)
buf[0] = 18
copyMem(pbytes, addr buf[0], buf.len())
s.step = 4
return buf.len()
of 4:
var buf = "/test/proto/1.0.0\n"
copyMem(pbytes, addr buf[0], buf.len())
return buf.len()
else:
copyMem(pbytes,
cstring("\0x3na\n"),
"\0x3na\n".len())
of 1:
var buf = newSeq[byte](1)
buf[0] = 19
copyMem(pbytes, addr buf[0], buf.len())
s.step = 2
fut.complete(buf.len())
of 2:
var buf = "/multistream/1.0.0\n"
copyMem(pbytes, addr buf[0], buf.len())
s.step = 3
fut.complete(buf.len())
of 3:
var buf = newSeq[byte](1)
buf[0] = 18
copyMem(pbytes, addr buf[0], buf.len())
s.step = 4
fut.complete(buf.len())
of 4:
var buf = "/test/proto/1.0.0\n"
copyMem(pbytes, addr buf[0], buf.len())
fut.complete(buf.len())
else:
copyMem(pbytes,
cstring("\0x3na\n"),
"\0x3na\n".len())
return "\0x3na\n".len()
fut.complete("\0x3na\n".len())
fut
method write*(s: TestNaStream, msg: seq[byte]) {.async.} =
method write*(
s: TestNaStream,
msg: seq[byte]
): Future[void] {.async: (raises: [
CancelledError, LPStreamError], raw: true).} =
if s.step == 4:
await s.na(string.fromBytes(msg))
return s.na(string.fromBytes(msg))
let fut = newFuture[void]()
fut.complete()
fut
method close(s: TestNaStream) {.async.} =
method close(s: TestNaStream): Future[void] {.async: (raises: [], raw: true).} =
s.isClosed = true
s.isEof = true
let fut = newFuture[void]()
fut.complete()
fut
proc newTestNaStream(na: NaHandler): TestNaStream =
new result
@ -210,7 +254,8 @@ suite "Multistream select":
var conn: Connection = nil
let done = newFuture[void]()
proc testLsHandler(proto: seq[byte]) {.async.} =
proc testLsHandler(
proto: seq[byte]) {.async: (raises: [CancelledError, LPStreamError]).} =
var strProto: string = string.fromBytes(proto)
check strProto == "\x26/test/proto1/1.0.0\n/test/proto2/1.0.0\n"
await conn.close()
@ -230,7 +275,9 @@ suite "Multistream select":
let ms = MultistreamSelect.new()
var conn: Connection = nil
proc testNaHandler(msg: string): Future[void] {.async.} =
proc testNaHandler(
msg: string
): Future[void] {.async: (raises: [CancelledError, LPStreamError]).} =
check msg == "\x03na\n"
await conn.close()
conn = newTestNaStream(testNaHandler)

View File

@ -1,7 +1,7 @@
{.used.}
# Nim-Libp2p
# Copyright (c) 2023 Status Research & Development GmbH
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@ -18,6 +18,9 @@ import
],
./helpers
proc newBlockerFut(): Future[void] {.async: (raises: [], raw: true).} =
newFuture[void]()
suite "Yamux":
teardown:
checkTrackers()
@ -42,10 +45,14 @@ suite "Yamux":
asyncTest "Roundtrip of small messages":
mSetup()
yamuxb.streamHandler = proc(conn: Connection) {.async.} =
check (await conn.readLp(100)) == fromHex("1234")
await conn.writeLp(fromHex("5678"))
await conn.close()
yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
try:
check (await conn.readLp(100)) == fromHex("1234")
await conn.writeLp(fromHex("5678"))
except CancelledError, LPStreamError:
return
finally:
await conn.close()
let streamA = await yamuxa.newStream()
check streamA == yamuxa.getStreams()[0]
@ -57,15 +64,19 @@ suite "Yamux":
asyncTest "Continuing read after close":
mSetup()
let
readerBlocker = newFuture[void]()
handlerBlocker = newFuture[void]()
readerBlocker = newBlockerFut()
handlerBlocker = newBlockerFut()
var numberOfRead = 0
yamuxb.streamHandler = proc(conn: Connection) {.async.} =
yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
await readerBlocker
var buffer: array[25600, byte]
while (await conn.readOnce(addr buffer[0], 25600)) > 0:
numberOfRead.inc()
await conn.close()
try:
var buffer: array[25600, byte]
while (await conn.readOnce(addr buffer[0], 25600)) > 0:
numberOfRead.inc()
except CancelledError, LPStreamError:
return
finally:
await conn.close()
handlerBlocker.complete()
let streamA = await yamuxa.newStream()
@ -80,12 +91,16 @@ suite "Yamux":
suite "Window exhaustion":
asyncTest "Basic exhaustion blocking":
mSetup()
let readerBlocker = newFuture[void]()
yamuxb.streamHandler = proc(conn: Connection) {.async.} =
let readerBlocker = newBlockerFut()
yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
await readerBlocker
var buffer: array[160000, byte]
discard await conn.readOnce(addr buffer[0], 160000)
await conn.close()
try:
var buffer: array[160000, byte]
discard await conn.readOnce(addr buffer[0], 160000)
except CancelledError, LPStreamError:
return
finally:
await conn.close()
let streamA = await yamuxa.newStream()
check streamA == yamuxa.getStreams()[0]
@ -103,12 +118,16 @@ suite "Yamux":
asyncTest "Exhaustion doesn't block other channels":
mSetup()
let readerBlocker = newFuture[void]()
yamuxb.streamHandler = proc(conn: Connection) {.async.} =
let readerBlocker = newBlockerFut()
yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
await readerBlocker
var buffer: array[160000, byte]
discard await conn.readOnce(addr buffer[0], 160000)
await conn.close()
try:
var buffer: array[160000, byte]
discard await conn.readOnce(addr buffer[0], 160000)
except CancelledError, LPStreamError:
return
finally:
await conn.close()
let streamA = await yamuxa.newStream()
check streamA == yamuxa.getStreams()[0]
@ -120,10 +139,14 @@ suite "Yamux":
# Now that the secondWriter is stuck, create a second stream
# and exchange some data
yamuxb.streamHandler = proc(conn: Connection) {.async.} =
check (await conn.readLp(100)) == fromHex("1234")
await conn.writeLp(fromHex("5678"))
await conn.close()
yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
try:
check (await conn.readLp(100)) == fromHex("1234")
await conn.writeLp(fromHex("5678"))
except CancelledError, LPStreamError:
return
finally:
await conn.close()
let streamB = await yamuxa.newStream()
await streamB.writeLp(fromHex("1234"))
@ -138,15 +161,19 @@ suite "Yamux":
asyncTest "Can set custom window size":
mSetup()
let writerBlocker = newFuture[void]()
let writerBlocker = newBlockerFut()
var numberOfRead = 0
yamuxb.streamHandler = proc(conn: Connection) {.async.} =
yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
YamuxChannel(conn).setMaxRecvWindow(20)
var buffer: array[256000, byte]
while (await conn.readOnce(addr buffer[0], 256000)) > 0:
numberOfRead.inc()
writerBlocker.complete()
await conn.close()
try:
var buffer: array[256000, byte]
while (await conn.readOnce(addr buffer[0], 256000)) > 0:
numberOfRead.inc()
writerBlocker.complete()
except CancelledError, LPStreamError:
return
finally:
await conn.close()
let streamA = await yamuxa.newStream()
check streamA == yamuxa.getStreams()[0]
@ -163,12 +190,16 @@ suite "Yamux":
asyncTest "Saturate until reset":
mSetup()
let writerBlocker = newFuture[void]()
yamuxb.streamHandler = proc(conn: Connection) {.async.} =
let writerBlocker = newBlockerFut()
yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
await writerBlocker
var buffer: array[256, byte]
check: (await conn.readOnce(addr buffer[0], 256)) == 0
await conn.close()
try:
var buffer: array[256, byte]
check: (await conn.readOnce(addr buffer[0], 256)) == 0
except CancelledError, LPStreamError:
return
finally:
await conn.close()
let streamA = await yamuxa.newStream()
check streamA == yamuxa.getStreams()[0]
@ -184,12 +215,16 @@ suite "Yamux":
asyncTest "Increase window size":
mSetup(512000)
let readerBlocker = newFuture[void]()
yamuxb.streamHandler = proc(conn: Connection) {.async.} =
let readerBlocker = newBlockerFut()
yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
await readerBlocker
var buffer: array[260000, byte]
discard await conn.readOnce(addr buffer[0], 260000)
await conn.close()
try:
var buffer: array[260000, byte]
discard await conn.readOnce(addr buffer[0], 260000)
except CancelledError, LPStreamError:
return
finally:
await conn.close()
let streamA = await yamuxa.newStream()
check streamA == yamuxa.getStreams()[0]
@ -207,17 +242,20 @@ suite "Yamux":
asyncTest "Reduce window size":
mSetup(64000)
let readerBlocker1 = newFuture[void]()
let readerBlocker2 = newFuture[void]()
yamuxb.streamHandler = proc(conn: Connection) {.async.} =
await readerBlocker1
var buffer: array[256000, byte]
# For the first roundtrip, the send window size is assumed to be 256k
discard await conn.readOnce(addr buffer[0], 256000)
await readerBlocker2
discard await conn.readOnce(addr buffer[0], 40000)
await conn.close()
let readerBlocker1 = newBlockerFut()
let readerBlocker2 = newBlockerFut()
yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
try:
await readerBlocker1
var buffer: array[256000, byte]
# For the first roundtrip, the send window size is assumed to be 256k
discard await conn.readOnce(addr buffer[0], 256000)
await readerBlocker2
discard await conn.readOnce(addr buffer[0], 40000)
except CancelledError, LPStreamError:
return
finally:
await conn.close()
let streamA = await yamuxa.newStream()
check streamA == yamuxa.getStreams()[0]
@ -242,15 +280,18 @@ suite "Yamux":
suite "Timeout testing":
asyncTest "Check if InTimeout close both streams correctly":
mSetup(inTo = 1.seconds)
let blocker = newFuture[void]()
let connBlocker = newFuture[void]()
let blocker = newBlockerFut()
let connBlocker = newBlockerFut()
yamuxb.streamHandler = proc(conn: Connection) {.async.} =
check (await conn.readLp(100)) == fromHex("1234")
await conn.writeLp(fromHex("5678"))
await blocker
check conn.isClosed
connBlocker.complete()
yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
try:
check (await conn.readLp(100)) == fromHex("1234")
await conn.writeLp(fromHex("5678"))
await blocker
check conn.isClosed
connBlocker.complete()
except CancelledError, LPStreamError:
await conn.close()
let streamA = await yamuxa.newStream()
check streamA == yamuxa.getStreams()[0]
@ -265,15 +306,18 @@ suite "Yamux":
asyncTest "Check if OutTimeout close both streams correctly":
mSetup(outTo = 1.seconds)
let blocker = newFuture[void]()
let connBlocker = newFuture[void]()
let blocker = newBlockerFut()
let connBlocker = newBlockerFut()
yamuxb.streamHandler = proc(conn: Connection) {.async.} =
check (await conn.readLp(100)) == fromHex("1234")
await conn.writeLp(fromHex("5678"))
await blocker
check conn.isClosed
connBlocker.complete()
yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
try:
check (await conn.readLp(100)) == fromHex("1234")
await conn.writeLp(fromHex("5678"))
await blocker
check conn.isClosed
connBlocker.complete()
except CancelledError, LPStreamError:
await conn.close()
let streamA = await yamuxa.newStream()
check streamA == yamuxa.getStreams()[0]
@ -290,11 +334,18 @@ suite "Yamux":
asyncTest "Local & Remote close":
mSetup()
yamuxb.streamHandler = proc(conn: Connection) {.async.} =
check (await conn.readLp(100)) == fromHex("1234")
await conn.close()
yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
try:
check (await conn.readLp(100)) == fromHex("1234")
except CancelledError, LPStreamError:
return
finally:
await conn.close()
expect LPStreamClosedError: await conn.writeLp(fromHex("102030"))
check (await conn.readLp(100)) == fromHex("5678")
try:
check (await conn.readLp(100)) == fromHex("5678")
except CancelledError, LPStreamError:
return
let streamA = await yamuxa.newStream()
check streamA == yamuxa.getStreams()[0]
@ -306,13 +357,17 @@ suite "Yamux":
asyncTest "Local & Remote reset":
mSetup()
let blocker = newFuture[void]()
let blocker = newBlockerFut()
yamuxb.streamHandler = proc(conn: Connection) {.async.} =
yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
await blocker
expect LPStreamResetError: discard await conn.readLp(100)
expect LPStreamResetError: await conn.writeLp(fromHex("1234"))
await conn.close()
try:
expect LPStreamResetError: discard await conn.readLp(100)
expect LPStreamResetError: await conn.writeLp(fromHex("1234"))
except CancelledError, LPStreamError:
return
finally:
await conn.close()
let streamA = await yamuxa.newStream()
check streamA == yamuxa.getStreams()[0]