diff --git a/libp2p/connection.nim b/libp2p/connection.nim index ae47d0439..64b3066fe 100644 --- a/libp2p/connection.nim +++ b/libp2p/connection.nim @@ -15,7 +15,7 @@ import peerinfo, varint, vbuffer -const DefaultReadSize*: uint = 1 shl 20 # 1mb, in order to fit mplex spec +const DefaultReadSize* = 1 shl 20 type Connection* = ref object of LPStream @@ -24,10 +24,14 @@ type observedAddrs*: Multiaddress InvalidVarintException = object of LPStreamError + InvalidVarintSizeException = object of LPStreamError proc newInvalidVarintException*(): ref InvalidVarintException = newException(InvalidVarintException, "unable to parse varint") +proc newInvalidVarintSizeException*(): ref InvalidVarintSizeException = + newException(InvalidVarintSizeException, "wrong varint size") + proc init*[T: Connection](self: var T, stream: LPStream) = ## create a new Connection for the specified async reader/writer new self @@ -124,8 +128,8 @@ proc readLp*(s: Connection): Future[seq[byte]] {.async, gcsafe.} = break if res != VarintStatus.Success: raise newInvalidVarintException() - if size > DefaultReadSize: - raise newLPStreamLimitError() + if size.int > DefaultReadSize: + raise newInvalidVarintSizeException() buff.setLen(size) if size > 0.uint: trace "reading exact bytes from stream", size = size diff --git a/libp2p/muxers/mplex/coder.nim b/libp2p/muxers/mplex/coder.nim index 0451805d4..9f8ee9ee3 100644 --- a/libp2p/muxers/mplex/coder.nim +++ b/libp2p/muxers/mplex/coder.nim @@ -7,7 +7,7 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import chronos, options +import chronos import nimcrypto/utils, chronicles import types, ../../connection, @@ -18,30 +18,35 @@ import types, logScope: topic = "MplexCoder" +const DefaultChannelSize* = 1 shr 20 + type Msg* = tuple id: uint msgType: MessageType data: seq[byte] -proc readMplexVarint(conn: Connection): Future[Option[uint]] {.async, gcsafe.} = +proc readMplexVarint(conn: Connection): Future[uint] {.async, gcsafe.} = var varint: uint length: int res: VarintStatus buffer = newSeq[byte](10) - result = none(uint) try: for i in 0.. DefaultReadSize: + raise newInvalidVarintSizeException() + return varint except LPStreamIncompleteError as exc: trace "unable to read varint", exc = exc.msg + raise exc proc readMsg*(conn: Connection): Future[Msg] {.async, gcsafe.} = let headerVarint = await conn.readMplexVarint()