diff --git a/docs/GETTING_STARTED.md b/docs/GETTING_STARTED.md index 4d64f6c47..52ad72269 100644 --- a/docs/GETTING_STARTED.md +++ b/docs/GETTING_STARTED.md @@ -34,7 +34,7 @@ type method init(p: TestProto) {.gcsafe.} = # handle incoming connections in closure proc handle(conn: Connection, proto: string) {.async, gcsafe.} = - echo "Got from remote - ", cast[string](await conn.readLp()) + echo "Got from remote - ", cast[string](await conn.readLp(1024)) await conn.writeLp("Hello!") await conn.close() @@ -90,7 +90,7 @@ proc main() {.async, gcsafe.} = await conn.writeLp("Hello!") # writeLp send a length prefixed buffer over the wire # readLp reads length prefixed bytes and returns a buffer without the prefix - echo "Remote responded with - ", cast[string](await conn.readLp()) + echo "Remote responded with - ", cast[string](await conn.readLp(1024)) await allFutures(switch1.stop(), switch2.stop()) # close connections and shutdown all transports await allFutures(switch1Fut & switch2Fut) # wait for all transports to shutdown diff --git a/docs/tutorial/directchat/second.nim b/docs/tutorial/directchat/second.nim index 3addd81a3..d1858baae 100644 --- a/docs/tutorial/directchat/second.nim +++ b/docs/tutorial/directchat/second.nim @@ -2,23 +2,23 @@ when not(compileOption("threads")): {.fatal: "Please, compile this program with the --threads:on option!".} import tables, strformat, strutils -import chronos -import ../libp2p/[switch, - multistream, - crypto/crypto, - protocols/identify, - connection, - transports/transport, - transports/tcptransport, - multiaddress, - peerinfo, - peer, - protocols/protocol, - protocols/secure/secure, - protocols/secure/secio, - muxers/muxer, - muxers/mplex/mplex, - muxers/mplex/types] +import chronos +import ../libp2p/[switch, + multistream, + crypto/crypto, + protocols/identify, + connection, + transports/transport, + transports/tcptransport, + multiaddress, + peerinfo, + peer, + protocols/protocol, + protocols/secure/secure, + protocols/secure/secio, + muxers/muxer, + muxers/mplex/mplex, + muxers/mplex/types] const ChatCodec = "/nim-libp2p/chat/1.0.0" const DefaultAddr = "/ip4/127.0.0.1/tcp/55505" @@ -33,12 +33,12 @@ const Help = """ type ChatProto = ref object of LPProtocol switch: Switch # a single entry point for dialing and listening to peer - transp: StreamTransport # transport streams between read & write file descriptor + transp: StreamTransport # transport streams between read & write file descriptor conn: Connection # create and close read & write stream connected: bool # if the node is connected to another peer started: bool # if the node has started -# copied from https://github.com/status-im/nim-beacon-chain/blob/0ed657e953740a92458f23033d47483ffa17ccb0/beacon_chain/eth2_network.nim#L109-L115 +# copied from https://github.com/status-im/nim-beacon-chain/blob/0ed657e953740a92458f23033d47483ffa17ccb0/beacon_chain/eth2_network.nim#L109-L115 proc initAddress(T: type MultiAddress, str: string): T = let address = MultiAddress.init(str) if IPFS.match(address) and matchPartial(multiaddress.TCP, address): @@ -60,7 +60,7 @@ proc dialPeer(p: ChatProto, address: string) {.async.} = proc readAndPrint(p: ChatProto) {.async.} = while true: while p.connected: - echo cast[string](await p.conn.readLp()) + echo cast[string](await p.conn.readLp(1024)) await sleepAsync(100.millis) proc writeAndPrint(p: ChatProto) {.async.} = @@ -140,12 +140,11 @@ proc main() {.async.} = let (rfd, wfd) = createAsyncPipe() if rfd == asyncInvalidPipe or wfd == asyncInvalidPipe: raise newException(ValueError, "Could not initialize pipe!") - + var thread: Thread[AsyncFD] thread.createThread(readInput, wfd) - + await processInput(rfd) when isMainModule: # isMainModule = true when the module is compiled as the main file waitFor(main()) - \ No newline at end of file diff --git a/docs/tutorial/second.nim b/docs/tutorial/second.nim index 3addd81a3..d1858baae 100644 --- a/docs/tutorial/second.nim +++ b/docs/tutorial/second.nim @@ -2,23 +2,23 @@ when not(compileOption("threads")): {.fatal: "Please, compile this program with the --threads:on option!".} import tables, strformat, strutils -import chronos -import ../libp2p/[switch, - multistream, - crypto/crypto, - protocols/identify, - connection, - transports/transport, - transports/tcptransport, - multiaddress, - peerinfo, - peer, - protocols/protocol, - protocols/secure/secure, - protocols/secure/secio, - muxers/muxer, - muxers/mplex/mplex, - muxers/mplex/types] +import chronos +import ../libp2p/[switch, + multistream, + crypto/crypto, + protocols/identify, + connection, + transports/transport, + transports/tcptransport, + multiaddress, + peerinfo, + peer, + protocols/protocol, + protocols/secure/secure, + protocols/secure/secio, + muxers/muxer, + muxers/mplex/mplex, + muxers/mplex/types] const ChatCodec = "/nim-libp2p/chat/1.0.0" const DefaultAddr = "/ip4/127.0.0.1/tcp/55505" @@ -33,12 +33,12 @@ const Help = """ type ChatProto = ref object of LPProtocol switch: Switch # a single entry point for dialing and listening to peer - transp: StreamTransport # transport streams between read & write file descriptor + transp: StreamTransport # transport streams between read & write file descriptor conn: Connection # create and close read & write stream connected: bool # if the node is connected to another peer started: bool # if the node has started -# copied from https://github.com/status-im/nim-beacon-chain/blob/0ed657e953740a92458f23033d47483ffa17ccb0/beacon_chain/eth2_network.nim#L109-L115 +# copied from https://github.com/status-im/nim-beacon-chain/blob/0ed657e953740a92458f23033d47483ffa17ccb0/beacon_chain/eth2_network.nim#L109-L115 proc initAddress(T: type MultiAddress, str: string): T = let address = MultiAddress.init(str) if IPFS.match(address) and matchPartial(multiaddress.TCP, address): @@ -60,7 +60,7 @@ proc dialPeer(p: ChatProto, address: string) {.async.} = proc readAndPrint(p: ChatProto) {.async.} = while true: while p.connected: - echo cast[string](await p.conn.readLp()) + echo cast[string](await p.conn.readLp(1024)) await sleepAsync(100.millis) proc writeAndPrint(p: ChatProto) {.async.} = @@ -140,12 +140,11 @@ proc main() {.async.} = let (rfd, wfd) = createAsyncPipe() if rfd == asyncInvalidPipe or wfd == asyncInvalidPipe: raise newException(ValueError, "Could not initialize pipe!") - + var thread: Thread[AsyncFD] thread.createThread(readInput, wfd) - + await processInput(rfd) when isMainModule: # isMainModule = true when the module is compiled as the main file waitFor(main()) - \ No newline at end of file diff --git a/examples/directchat.nim b/examples/directchat.nim index 47bbae541..a0d7f9e43 100644 --- a/examples/directchat.nim +++ b/examples/directchat.nim @@ -12,14 +12,14 @@ import ../libp2p/[switch, # manage transports, a single entry transports/transport, # listen and dial to other peers using p2p protocol transports/tcptransport, # listen and dial to other peers using client-server protocol multiaddress, # encode different addressing schemes. For example, /ip4/7.7.7.7/tcp/6543 means it is using IPv4 protocol and TCP - peerinfo, # manage the information of a peer, such as peer ID and public / private key + peerinfo, # manage the information of a peer, such as peer ID and public / private key peer, # Implement how peers interact protocols/protocol, # define the protocol base type protocols/secure/secure, # define the protocol of secure connection protocols/secure/secio, # define the protocol of secure input / output, allows encrypted communication that uses public keys to validate signed messages instead of a certificate authority like in TLS muxers/muxer, # define an interface for stream multiplexing, allowing peers to offer many protocols over a single connection - muxers/mplex/mplex, # implement stream multiplexing - muxers/mplex/types] # define some contants and message types for stream multiplexing + muxers/mplex/mplex, # implement stream multiplexing + muxers/mplex/types] # define some contants and message types for stream multiplexing const ChatCodec = "/nim-libp2p/chat/1.0.0" const DefaultAddr = "/ip4/127.0.0.1/tcp/55505" @@ -34,7 +34,7 @@ const Help = """ type ChatProto = ref object of LPProtocol switch: Switch # a single entry point for dialing and listening to peer - transp: StreamTransport # transport streams between read & write file descriptor + transp: StreamTransport # transport streams between read & write file descriptor conn: Connection # create and close read & write stream connected: bool # if the node is connected to another peer started: bool # if the node has started @@ -63,7 +63,7 @@ proc readAndPrint(p: ChatProto) {.async.} = while p.connected: # TODO: echo &"{p.id} -> " - echo cast[string](await p.conn.readLp()) + echo cast[string](await p.conn.readLp(1024)) await sleepAsync(100.millis) proc writeAndPrint(p: ChatProto) {.async.} = diff --git a/libp2p/connection.nim b/libp2p/connection.nim index d30f223dc..f6230921d 100644 --- a/libp2p/connection.nim +++ b/libp2p/connection.nim @@ -23,7 +23,6 @@ logScope: topic = "Connection" const - DefaultReadSize* = 1 shl 20 ConnectionTrackerName* = "libp2p.connection" type @@ -35,9 +34,6 @@ type # (we got many actually :-)) readLoops*: seq[Future[void]] - InvalidVarintException = object of LPStreamError - InvalidVarintSizeException = object of LPStreamError - ConnectionTracker* = ref object of TrackerBase opened*: uint64 closed*: uint64 @@ -68,12 +64,6 @@ proc setupConnectionTracker(): ConnectionTracker = declareGauge libp2p_open_connection, "open Connection instances" -proc newInvalidVarintException*(): ref InvalidVarintException = - newException(InvalidVarintException, "Unable to parse varint") - -proc newInvalidVarintSizeException*(): ref InvalidVarintSizeException = - newException(InvalidVarintSizeException, "Wrong varint size") - proc bindStreamClose(conn: Connection) {.async.} = # bind stream's close event to connection's close # to ensure correct close propagation @@ -155,42 +145,6 @@ method close*(s: Connection) {.async, gcsafe.} = s.peerInfo.id else: "" libp2p_open_connection.dec() -proc readLp*(s: Connection): Future[seq[byte]] {.async, gcsafe.} = - ## read lenght prefixed msg - var - size: uint - length: int - res: VarintStatus - buff = newSeq[byte](10) - try: - for i in 0.. DefaultReadSize: - raise newInvalidVarintSizeException() - buff.setLen(size) - if size > 0.uint: - trace "reading exact bytes from stream", size = size - await s.readExactly(addr buff[0], int(size)) - return buff - except LPStreamIncompleteError as exc: - trace "remote connection ended unexpectedly", exc = exc.msg - raise exc - except LPStreamReadError as exc: - trace "couldn't read from stream", exc = exc.msg - raise exc - -proc writeLp*(s: Connection, msg: string | seq[byte]): Future[void] {.gcsafe.} = - ## write lenght prefixed - var buf = initVBuffer() - buf.writeSeq(msg) - buf.finish() - s.write(buf.buffer) - method getObservedAddrs*(c: Connection): Future[MultiAddress] {.base, async, gcsafe.} = ## get resolved multiaddresses for the connection result = c.observedAddrs diff --git a/libp2p/multistream.nim b/libp2p/multistream.nim index dbb555200..41ada6e48 100644 --- a/libp2p/multistream.nim +++ b/libp2p/multistream.nim @@ -58,7 +58,7 @@ proc select*(m: MultistreamSelect, trace "selecting proto", proto = proto await conn.writeLp((proto[0] & "\n")) # select proto - result = cast[string]((await conn.readLp())) # read ms header + result = cast[string]((await conn.readLp(1024))) # read ms header result.removeSuffix("\n") if result != Codec: error "handshake failed", codec = result.toHex() @@ -67,7 +67,7 @@ proc select*(m: MultistreamSelect, if proto.len() == 0: # no protocols, must be a handshake call return - result = string.fromBytes(await conn.readLp()) # read the first proto + result = string.fromBytes(await conn.readLp(1024)) # read the first proto trace "reading first requested proto" result.removeSuffix("\n") if result == proto[0]: @@ -78,7 +78,7 @@ proc select*(m: MultistreamSelect, trace "selecting one of several protos" for p in proto[1.. 0: list.add(s) @@ -115,7 +115,7 @@ proc handle*(m: MultistreamSelect, conn: Connection) {.async, gcsafe.} = trace "handle: starting multistream handling" tryAndWarn "multistream handle": while not conn.closed: - var ms = string.fromBytes(await conn.readLp()) + var ms = string.fromBytes(await conn.readLp(1024)) ms.removeSuffix("\n") trace "handle: got request for ", ms diff --git a/libp2p/muxers/mplex/coder.nim b/libp2p/muxers/mplex/coder.nim index d7a6040fc..f64391ad3 100644 --- a/libp2p/muxers/mplex/coder.nim +++ b/libp2p/muxers/mplex/coder.nim @@ -11,6 +11,7 @@ import chronos import nimcrypto/utils, chronicles import types, ../../connection, + ../../utility, ../../varint, ../../vbuffer, ../../stream/lpstream @@ -29,46 +30,18 @@ type proc newInvalidMplexMsgType*(): ref InvalidMplexMsgType = newException(InvalidMplexMsgType, "invalid message type") -proc readMplexVarint(conn: Connection): Future[uint64] {.async, gcsafe.} = - var - varint: uint - length: int - res: VarintStatus - buffer = newSeq[byte](10) - - try: - for i in 0.. DefaultReadSize: - raise newInvalidVarintSizeException() - - var data: seq[byte] = newSeq[byte](dataLenVarint.int) - if dataLenVarint.int > 0: - await conn.readExactly(addr data[0], dataLenVarint.int) - trace "read data", data = data.len + let data = await conn.readLp(MaxMsgSize) + trace "read data", dataLen = data.len, data = shortLog(data) let msgType = header and 0x7 if msgType.int > ord(MessageType.ResetOut): raise newInvalidMplexMsgType() - result = (uint64(header shr 3), MessageType(msgType), data) + result = (header shr 3, MessageType(msgType), data) proc writeMsg*(conn: Connection, id: uint64, diff --git a/libp2p/muxers/mplex/types.nim b/libp2p/muxers/mplex/types.nim index 559e20249..86709effc 100644 --- a/libp2p/muxers/mplex/types.nim +++ b/libp2p/muxers/mplex/types.nim @@ -9,6 +9,7 @@ import chronos +# https://github.com/libp2p/specs/tree/master/mplex#writing-to-a-stream const MaxMsgSize* = 1 shl 20 # 1mb const MaxChannels* = 1000 const MplexCodec* = "/mplex/6.7.0" diff --git a/libp2p/protocols/identify.nim b/libp2p/protocols/identify.nim index 7eae82146..d4a85e18c 100644 --- a/libp2p/protocols/identify.nim +++ b/libp2p/protocols/identify.nim @@ -124,7 +124,7 @@ proc identify*(p: Identify, conn: Connection, remotePeerInfo: PeerInfo): Future[IdentifyInfo] {.async, gcsafe.} = trace "initiating identify" - var message = await conn.readLp() + var message = await conn.readLp(64*1024) if len(message) == 0: trace "identify: Invalid or empty message received!" raise newException(IdentityInvalidMsgError, diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index ce5c156e4..3b761d9dd 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -56,7 +56,7 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} = try: while not conn.closed: trace "waiting for data", peer = p.id, closed = conn.closed - let data = await conn.readLp() + let data = await conn.readLp(64 * 1024) let hexData = data.toHex() trace "read data from peer", peer = p.id, data = data.shortLog if $hexData.hash in p.recvdRpcCache: diff --git a/libp2p/stream/lpstream.nim b/libp2p/stream/lpstream.nim index 1e934d429..cfb452437 100644 --- a/libp2p/stream/lpstream.nim +++ b/libp2p/stream/lpstream.nim @@ -8,7 +8,9 @@ ## those terms. import oids -import chronicles, chronos +import chronicles, chronos, strformat +import ../varint, + ../vbuffer type LPStream* = ref object of RootObj @@ -27,6 +29,9 @@ type par*: ref Exception LPStreamEOFError* = object of LPStreamError + InvalidVarintError* = object of LPStreamError + MaxSizeError* = object of LPStreamError + proc newLPStreamReadError*(p: ref Exception): ref Exception = var w = newException(LPStreamReadError, "Read stream failed") w.msg = w.msg & ", originated from [" & $p.name & "] " & p.msg @@ -109,6 +114,45 @@ proc readLine*(s: LPStream, limit = 0, sep = "\r\n"): Future[string] {.async, de except LPStreamIncompleteError, LPStreamReadError: discard # EOF, in which case we should return whatever we read so far.. +proc readVarint*(conn: LPStream): Future[uint64] {.async, gcsafe.} = + var + varint: uint64 + length: int + buffer: array[10, byte] + + for i in 0.. maxLen: + raise (ref MaxSizeError)(msg: "Message exceeds maximum length") + + if length == 0: + return + + var res = newSeq[byte](length) + await s.readExactly(addr res[0], res.len) + return res + +proc writeLp*(s: LPStream, msg: string | seq[byte]): Future[void] {.gcsafe.} = + ## write length prefixed + var buf = initVBuffer() + buf.writeSeq(msg) + buf.finish() + s.write(buf.buffer) + method write*(s: LPStream, msg: seq[byte]) {.base, async.} = doAssert(false, "not implemented!") diff --git a/tests/pubsub/testfloodsub.nim b/tests/pubsub/testfloodsub.nim index 1f3ab2352..ef9456b01 100644 --- a/tests/pubsub/testfloodsub.nim +++ b/tests/pubsub/testfloodsub.nim @@ -7,6 +7,8 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. +{.used.} + import unittest, sequtils, options, tables, sets import chronos import utils, diff --git a/tests/pubsub/testgossipinternal.nim b/tests/pubsub/testgossipinternal.nim index 06104bd90..a4b317040 100644 --- a/tests/pubsub/testgossipinternal.nim +++ b/tests/pubsub/testgossipinternal.nim @@ -1,5 +1,7 @@ include ../../libp2p/protocols/pubsub/gossipsub +{.used.} + import unittest import ../../libp2p/errors import ../../libp2p/stream/bufferstream diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 82c629964..05d5c490d 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -7,6 +7,8 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. +{.used.} + import unittest, sequtils, options, tables, sets import chronos import chronicles diff --git a/tests/pubsub/testmcache.nim b/tests/pubsub/testmcache.nim index 226063aad..7eec8d342 100644 --- a/tests/pubsub/testmcache.nim +++ b/tests/pubsub/testmcache.nim @@ -1,3 +1,5 @@ +{.used.} + import options, sets, sequtils import unittest import ../../libp2p/[peer, diff --git a/tests/testinterop.nim b/tests/testinterop.nim index 9c4c98d99..362d96fed 100644 --- a/tests/testinterop.nim +++ b/tests/testinterop.nim @@ -55,7 +55,7 @@ proc readLp*(s: StreamTransport): Future[seq[byte]] {.async, gcsafe.} = if res == VarintStatus.Success: break if res != VarintStatus.Success: - raise newInvalidVarintException() + raise (ref InvalidVarintError)() result.setLen(size) if size > 0.uint: await s.readExactly(addr result[0], int(size)) @@ -209,11 +209,11 @@ suite "Interop": daemonPeer.addresses), protos[0]) await conn.writeLp("test 1") - check "test 2" == cast[string]((await conn.readLp())) + check "test 2" == cast[string]((await conn.readLp(1024))) await sleepAsync(10.millis) await conn.writeLp("test 3") - check "test 4" == cast[string]((await conn.readLp())) + check "test 4" == cast[string]((await conn.readLp(1024))) await wait(testFuture, 10.secs) await nativeNode.stop() @@ -271,7 +271,7 @@ suite "Interop": var testFuture = newFuture[string]("test.future") proc nativeHandler(conn: Connection, proto: string) {.async.} = - var line = cast[string](await conn.readLp()) + var line = cast[string](await conn.readLp(1024)) check line == test testFuture.complete(line) await conn.close() @@ -306,10 +306,10 @@ suite "Interop": var testFuture = newFuture[void]("test.future") proc nativeHandler(conn: Connection, proto: string) {.async.} = - check "test 1" == cast[string](await conn.readLp()) + check "test 1" == cast[string](await conn.readLp(1024)) await conn.writeLp(cast[seq[byte]]("test 2")) - check "test 3" == cast[string](await conn.readLp()) + check "test 3" == cast[string](await conn.readLp(1024)) await conn.writeLp(cast[seq[byte]]("test 4")) testFuture.complete() @@ -355,7 +355,7 @@ suite "Interop": var testFuture = newFuture[int]("test.future") proc nativeHandler(conn: Connection, proto: string) {.async.} = while count < 10: - var line = cast[string](await conn.readLp()) + var line = cast[string](await conn.readLp(1024)) check line == test await conn.writeLp(cast[seq[byte]](test)) count.inc() diff --git a/tests/testmplex.nim b/tests/testmplex.nim index 83ac55860..c5c8f5154 100644 --- a/tests/testmplex.nim +++ b/tests/testmplex.nim @@ -153,7 +153,7 @@ suite "Mplex": proc connHandler(conn: Connection) {.async, gcsafe.} = proc handleMplexListen(stream: Connection) {.async, gcsafe.} = - let msg = await stream.readLp() + let msg = await stream.readLp(1024) check cast[string](msg) == "Hello from stream!" await stream.close() done.complete() @@ -200,7 +200,7 @@ suite "Mplex": proc connHandler(conn: Connection) {.async, gcsafe.} = proc handleMplexListen(stream: Connection) {.async, gcsafe.} = - let msg = await stream.readLp() + let msg = await stream.readLp(1024) check cast[string](msg) == "Hello from stream!" await stream.close() done.complete() @@ -251,7 +251,7 @@ suite "Mplex": proc handleMplexListen(stream: Connection) {.async, gcsafe.} = defer: await stream.close() - let msg = await stream.readLp() + let msg = await stream.readLp(MaxMsgSize) check msg == bigseq trace "Bigseq check passed!" listenJob.complete() @@ -312,7 +312,7 @@ suite "Mplex": let mplexDial = newMplex(conn) let dialFut = mplexDial.handle() let stream = await mplexDial.newStream("DIALER") - let msg = cast[string](await stream.readLp()) + let msg = cast[string](await stream.readLp(1024)) check msg == "Hello from stream!" # await dialFut @@ -339,7 +339,7 @@ suite "Mplex": var listenConn: Connection proc connHandler(conn: Connection) {.async, gcsafe.} = proc handleMplexListen(stream: Connection) {.async, gcsafe.} = - let msg = await stream.readLp() + let msg = await stream.readLp(1024) check cast[string](msg) == &"stream {count}!" count.inc await stream.close() @@ -386,7 +386,7 @@ suite "Mplex": proc connHandler(conn: Connection) {.async, gcsafe.} = listenConn = conn proc handleMplexListen(stream: Connection) {.async, gcsafe.} = - let msg = await stream.readLp() + let msg = await stream.readLp(1024) check cast[string](msg) == &"stream {count} from dialer!" await stream.writeLp(&"stream {count} from listener!") count.inc @@ -411,7 +411,7 @@ suite "Mplex": for i in 1..10: let stream = await mplexDial.newStream("dialer stream") await stream.writeLp(&"stream {i} from dialer!") - let msg = await stream.readLp() + let msg = await stream.readLp(1024) check cast[string](msg) == &"stream {i} from listener!" await stream.close() @@ -473,7 +473,7 @@ suite "Mplex": const MsgSize = 1024 proc connHandler(conn: Connection) {.async, gcsafe.} = proc handleMplexListen(stream: Connection) {.async, gcsafe.} = - let msg = await stream.readLp() + let msg = await stream.readLp(MsgSize) check msg.len == MsgSize await stream.close() complete.complete() @@ -542,7 +542,7 @@ suite "Mplex": const MsgSize = 512 proc connHandler(conn: Connection) {.async, gcsafe.} = proc handleMplexListen(stream: Connection) {.async, gcsafe.} = - let msg = await stream.readLp() + let msg = await stream.readLp(MsgSize) check msg.len == MsgSize await stream.close() complete.complete() diff --git a/tests/testmultistream.nim b/tests/testmultistream.nim index eaba30e29..38c4caf26 100644 --- a/tests/testmultistream.nim +++ b/tests/testmultistream.nim @@ -269,7 +269,7 @@ suite "Multistream select": check (await msDial.select(conn, "/test/proto/1.0.0")) == true - let hello = cast[string](await conn.readLp()) + let hello = cast[string](await conn.readLp(1024)) result = hello == "Hello!" await conn.close() @@ -358,7 +358,7 @@ suite "Multistream select": check (await msDial.select(conn, @["/test/proto/1.0.0", "/test/no/proto/1.0.0"])) == "/test/proto/1.0.0" - let hello = cast[string](await conn.readLp()) + let hello = cast[string](await conn.readLp(1024)) result = hello == "Hello!" await conn.close() @@ -397,7 +397,7 @@ suite "Multistream select": check (await msDial.select(conn, @["/test/proto2/1.0.0", "/test/proto1/1.0.0"])) == "/test/proto2/1.0.0" - result = cast[string](await conn.readLp()) == "Hello from /test/proto2/1.0.0!" + result = cast[string](await conn.readLp(1024)) == "Hello from /test/proto2/1.0.0!" await conn.close() await transport2.close() diff --git a/tests/testnoise.nim b/tests/testnoise.nim index 520cf5695..28c8f50a4 100644 --- a/tests/testnoise.nim +++ b/tests/testnoise.nim @@ -41,7 +41,7 @@ type method init(p: TestProto) {.gcsafe.} = proc handle(conn: Connection, proto: string) {.async, gcsafe.} = - let msg = cast[string](await conn.readLp()) + let msg = cast[string](await conn.readLp(1024)) check "Hello!" == msg await conn.writeLp("Hello!") await conn.close() @@ -166,7 +166,7 @@ suite "Noise": let sconn = await serverNoise.secure(conn, false) defer: await sconn.close() - let msg = await sconn.readLp() + let msg = await sconn.readLp(1024*1024) check msg == hugePayload readTask.complete() @@ -214,7 +214,7 @@ suite "Noise": awaiters.add(await switch2.start()) let conn = await switch2.dial(switch1.peerInfo, TestCodec) await conn.writeLp("Hello!") - let msg = cast[string](await conn.readLp()) + let msg = cast[string](await conn.readLp(1024)) check "Hello!" == msg await allFuturesThrowing(switch1.stop(), switch2.stop()) diff --git a/tests/testswitch.nim b/tests/testswitch.nim index 9f3ab7a16..a1d133c1b 100644 --- a/tests/testswitch.nim +++ b/tests/testswitch.nim @@ -68,7 +68,7 @@ suite "Switch": let done = newFuture[void]() proc handle(conn: Connection, proto: string) {.async, gcsafe.} = - let msg = cast[string](await conn.readLp()) + let msg = cast[string](await conn.readLp(1024)) check "Hello!" == msg await conn.writeLp("Hello!") await conn.close() @@ -87,7 +87,7 @@ suite "Switch": try: await conn.writeLp("Hello!") - let msg = cast[string](await conn.readLp()) + let msg = cast[string](await conn.readLp(1024)) check "Hello!" == msg result = true except LPStreamError: @@ -118,7 +118,7 @@ suite "Switch": (switch1, peerInfo1) = createSwitch(ma1) proc handle(conn: Connection, proto: string) {.async, gcsafe.} = - let msg = cast[string](await conn.readLp()) + let msg = cast[string](await conn.readLp(1024)) check "Hello!" == msg await conn.writeLp("Hello!") await conn.close() @@ -136,7 +136,7 @@ suite "Switch": try: await conn.writeLp("Hello!") - let msg = cast[string](await conn.readLp()) + let msg = cast[string](await conn.readLp(1024)) check "Hello!" == msg result = true except LPStreamError: