From ba071cafa61629d5631fb8d33c4625029a7c5cee Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Fri, 17 Jul 2020 12:44:41 -0600 Subject: [PATCH] Channel timeout (#278) * add support for channel timeouts * tests for channel timeout * add timeouts to standard switch * fix mplex init * cleanup timer on stream close * add comment for `isConnected` * move cleanup event --- examples/directchat.nim | 2 +- libp2p/muxers/mplex/lpchannel.nim | 212 +++++++++++++++++++++--------- libp2p/muxers/mplex/mplex.nim | 53 +++++--- libp2p/muxers/muxer.nim | 3 + libp2p/standard_setup.nim | 9 +- libp2p/switch.nim | 4 + tests/testinterop.nim | 34 ++++- tests/testmplex.nim | 57 +++++--- tests/testnoise.nim | 2 +- tests/testswitch.nim | 4 +- 10 files changed, 261 insertions(+), 119 deletions(-) diff --git a/examples/directchat.nim b/examples/directchat.nim index 6843bed45..fe1c35374 100644 --- a/examples/directchat.nim +++ b/examples/directchat.nim @@ -172,7 +172,7 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} = # a constructor for building different multiplexers under various connections proc createMplex(conn: Connection): Muxer = - result = newMplex(conn) + result = Mplex.init(conn) let mplexProvider = newMuxerProvider(createMplex, MplexCodec) let transports = @[Transport(TcpTransport.init())] diff --git a/libp2p/muxers/mplex/lpchannel.nim b/libp2p/muxers/mplex/lpchannel.nim index ff63946d3..43ec0a21a 100644 --- a/libp2p/muxers/mplex/lpchannel.nim +++ b/libp2p/muxers/mplex/lpchannel.nim @@ -11,6 +11,7 @@ import oids, deques import chronos, chronicles, metrics import types, coder, + ../muxer, nimcrypto/utils, ../../stream/connection, ../../stream/bufferstream, @@ -45,6 +46,8 @@ logScope: type LPChannel* = ref object of BufferStream id*: uint64 # channel id + timeout: Duration # channel timeout if no activity + activity: bool # reset every time data is sent or received name*: string # name of the channel (for debugging) conn*: Connection # wrapped connection used to for writing initiator*: bool # initiated remotely or locally flag @@ -54,6 +57,8 @@ type msgCode*: MessageType # cached in/out message code closeCode*: MessageType # cached in/out close code resetCode*: MessageType # cached in/out reset code + timerFut: Future[void] # the current timer instanse + timerTaskFut: Future[void] # the current timer instanse proc open*(s: LPChannel) {.async, gcsafe.} @@ -75,60 +80,6 @@ template withEOFExceptions(body: untyped): untyped = except LPStreamIncompleteError as exc: trace "incomplete message", exc = exc.msg -method reset*(s: LPChannel) {.base, async, gcsafe.} - -method initStream*(s: LPChannel) = - if s.objName.len == 0: - s.objName = "LPChannel" - - procCall BufferStream(s).initStream() - -proc newChannel*(id: uint64, - conn: Connection, - initiator: bool, - name: string = "", - size: int = DefaultBufferSize, - lazy: bool = false): LPChannel = - result = LPChannel(id: id, - name: name, - conn: conn, - initiator: initiator, - msgCode: if initiator: MessageType.MsgOut else: MessageType.MsgIn, - closeCode: if initiator: MessageType.CloseOut else: MessageType.CloseIn, - resetCode: if initiator: MessageType.ResetOut else: MessageType.ResetIn, - isLazy: lazy) - - let chan = result - logScope: - id = chan.id - initiator = chan.initiator - name = chan.name - oid = $chan.oid - peer = $chan.conn.peerInfo - # stack = getStackTrace() - - proc writeHandler(data: seq[byte]): Future[void] {.async, gcsafe.} = - try: - if chan.isLazy and not(chan.isOpen): - await chan.open() - - # writes should happen in sequence - trace "sending data" - - await conn.writeMsg(chan.id, - chan.msgCode, - data).wait(2.minutes) # write header - except CatchableError as exc: - trace "exception in lpchannel write handler", exc = exc.msg - await chan.reset() - raise exc - - result.initBufferStream(writeHandler, size) - when chronicles.enabledLogLevel == LogLevel.TRACE: - result.name = if result.name.len > 0: result.name else: $result.oid - - trace "created new lpchannel" - proc closeMessage(s: LPChannel) {.async.} = logScope: id = s.id @@ -187,14 +138,19 @@ proc closeRemote*(s: LPChannel) {.async.} = # stack = getStackTrace() trace "got EOF, closing channel" - await s.drainBuffer() + try: + await s.drainBuffer() - s.isEof = true # set EOF immediately to prevent further reads - await s.close() # close local end + s.isEof = true # set EOF immediately to prevent further reads + await s.close() # close local end - # call to avoid leaks - await procCall BufferStream(s).close() # close parent bufferstream - trace "channel closed on EOF" + # call to avoid leaks + await procCall BufferStream(s).close() # close parent bufferstream + trace "channel closed on EOF" + except CancelledError as exc: + raise exc + except CatchableError as exc: + trace "exception closing remote channel", exc = exc.msg method closed*(s: LPChannel): bool = ## this emulates half-closed behavior @@ -223,12 +179,18 @@ method reset*(s: LPChannel) {.base, async, gcsafe.} = # optimistic asyncCheck s.resetMessage() - # drain the buffer before closing - await s.drainBuffer() - await procCall BufferStream(s).close() + try: + # drain the buffer before closing + await s.drainBuffer() + await procCall BufferStream(s).close() - s.isEof = true - s.closedLocal = true + s.isEof = true + s.closedLocal = true + + except CancelledError as exc: + raise exc + except CatchableError as exc: + trace "exception in reset", exc = exc.msg trace "channel reset" @@ -263,3 +225,123 @@ method close*(s: LPChannel) {.async, gcsafe.} = s.closedLocal = true asyncCheck closeInternal() + +proc cleanupOnClose(s: LPChannel) {.async.} = + ## await this stream's close event + ## to cleanup timers and other resources + ## + + await s.closeEvent.wait() + + if not(isNil(s.timerFut)) and + not(s.timerFut.finished): + s.timerFut.cancel() + await s.timerTaskFut + +proc timeoutMonitor(s: LPChannel) {.async.} = + ## monitor the channel for innactivity + ## + ## if the timeout was hit, it means that + ## neither incoming nor outgoing activity + ## has been detected and the channel will + ## be reset + ## + + if not(isNil(s.timerFut)): + return + + try: + while true: + s.timerFut = sleepAsync(s.timeout) + await s.timerFut + if s.closed or s.atEof: + return + + if s.activity: + s.activity = false + continue + + break + + # reset channel on innactivity timeout + trace "channel timed out, resetting" + await s.reset() + except CatchableError as exc: + trace "exception in timeout", exc = exc.msg + +method initStream*(s: LPChannel) = + if s.objName.len == 0: + s.objName = "LPChannel" + + procCall BufferStream(s).initStream() + +method readOnce*(s: LPChannel, + pbytes: pointer, + nbytes: int): + Future[int] = + s.activity = true + procCall BufferStream(s).readOnce(pbytes, nbytes) + +method write*(s: LPChannel, msg: seq[byte]): Future[void] = + s.activity = true + procCall BufferStream(s).write(msg) + +proc init*( + L: type LPChannel, + id: uint64, + conn: Connection, + initiator: bool, + name: string = "", + size: int = DefaultBufferSize, + lazy: bool = false, + timeout: Duration = DefaultChanTimeout): LPChannel = + + let chann = L( + id: id, + name: name, + conn: conn, + initiator: initiator, + isLazy: lazy, + timeout: timeout, + msgCode: if initiator: MessageType.MsgOut else: MessageType.MsgIn, + closeCode: if initiator: MessageType.CloseOut else: MessageType.CloseIn, + resetCode: if initiator: MessageType.ResetOut else: MessageType.ResetIn, + dir: if initiator: Direction.Out else: Direction.In) + + logScope: + id = chann.id + initiator = chann.initiator + name = chann.name + oid = $chann.oid + peer = $chann.conn.peerInfo + # stack = getStackTrace() + + proc writeHandler(data: seq[byte]) {.async, gcsafe.} = + try: + if chann.isLazy and not(chann.isOpen): + await chann.open() + + # writes should happen in sequence + trace "sending data" + + await conn.writeMsg(chann.id, + chann.msgCode, + data) + + except CatchableError as exc: + trace "exception in lpchannel write handler", exc = exc.msg + await chann.reset() + raise exc + + chann.initBufferStream(writeHandler, size) + when chronicles.enabledLogLevel == LogLevel.TRACE: + chann.name = if chann.name.len > 0: chann.name else: $chann.oid + + # launch task to cancel and cleanup + # timer on stream close + asyncCheck chann.cleanupOnClose() + + chann.timerTaskFut = chann.timeoutMonitor() + trace "created new lpchannel" + + return chann diff --git a/libp2p/muxers/mplex/mplex.nim b/libp2p/muxers/mplex/mplex.nim index c11172da3..708a15b59 100644 --- a/libp2p/muxers/mplex/mplex.nim +++ b/libp2p/muxers/mplex/mplex.nim @@ -18,6 +18,8 @@ import ../muxer, types, lpchannel +export muxer + logScope: topics = "mplex" @@ -29,9 +31,10 @@ type local: Table[uint64, LPChannel] currentId*: uint64 maxChannels*: uint64 + inChannTimeout: Duration + outChannTimeout: Duration isClosed: bool - when chronicles.enabledLogLevel == LogLevel.TRACE: - oid*: Oid + oid*: Oid proc getChannelList(m: Mplex, initiator: bool): var Table[uint64, LPChannel] = if initiator: @@ -45,7 +48,8 @@ proc newStreamInternal*(m: Mplex, initiator: bool = true, chanId: uint64 = 0, name: string = "", - lazy: bool = false): + lazy: bool = false, + timeout: Duration): Future[LPChannel] {.async, gcsafe.} = ## create new channel/stream ## @@ -57,11 +61,13 @@ proc newStreamInternal*(m: Mplex, initiator = initiator, name = name, oid = $m.oid - result = newChannel(id, - m.connection, - initiator, - name, - lazy = lazy) + result = LPChannel.init( + id, + m.connection, + initiator, + name, + lazy = lazy, + timeout = timeout) result.peerInfo = m.connection.peerInfo result.observedAddr = m.connection.observedAddr @@ -142,7 +148,11 @@ method handle*(m: Mplex) {.async, gcsafe.} = case msgType: of MessageType.New: let name = string.fromBytes(data) - channel = await m.newStreamInternal(false, id, name) + channel = await m.newStreamInternal( + false, + id, + name, + timeout = m.outChannTimeout) trace "created channel", name = channel.name, oid = $channel.oid @@ -189,21 +199,24 @@ method handle*(m: Mplex) {.async, gcsafe.} = except CatchableError as exc: trace "Exception occurred", exception = exc.msg, oid = $m.oid -proc newMplex*(conn: Connection, - maxChanns: uint = MaxChannels): Mplex = - new result - result.connection = conn - result.maxChannels = maxChanns - result.remote = initTable[uint64, LPChannel]() - result.local = initTable[uint64, LPChannel]() - - when chronicles.enabledLogLevel == LogLevel.TRACE: - result.oid = genOid() +proc init*(M: type Mplex, + conn: Connection, + maxChanns: uint = MaxChannels, + inTimeout, outTimeout: Duration = DefaultChanTimeout): Mplex = + M(connection: conn, + maxChannels: maxChanns, + inChannTimeout: inTimeout, + outChannTimeout: outTimeout, + remote: initTable[uint64, LPChannel](), + local: initTable[uint64, LPChannel](), + oid: genOid()) method newStream*(m: Mplex, name: string = "", lazy: bool = false): Future[Connection] {.async, gcsafe.} = - let channel = await m.newStreamInternal(lazy = lazy) + let channel = await m.newStreamInternal( + lazy = lazy, timeout = m.inChannTimeout) + if not lazy: await channel.open() diff --git a/libp2p/muxers/muxer.nim b/libp2p/muxers/muxer.nim index 001fbc761..a22576630 100644 --- a/libp2p/muxers/muxer.nim +++ b/libp2p/muxers/muxer.nim @@ -15,6 +15,9 @@ import ../protocols/protocol, logScope: topics = "muxer" +const + DefaultChanTimeout* = 1.minutes + type StreamHandler* = proc(conn: Connection): Future[void] {.gcsafe.} MuxerHandler* = proc(muxer: Muxer): Future[void] {.gcsafe.} diff --git a/libp2p/standard_setup.nim b/libp2p/standard_setup.nim index fc9d2eeb2..702673f7a 100644 --- a/libp2p/standard_setup.nim +++ b/libp2p/standard_setup.nim @@ -37,9 +37,14 @@ proc newStandardSwitch*(privKey = none(PrivateKey), sign = libp2p_pubsub_sign, transportFlags: set[ServerFlags] = {}, msgIdProvider: MsgIdProvider = defaultMsgIdProvider, - rng = newRng()): Switch = + rng = newRng(), + inTimeout: Duration = 1.minutes, + outTimeout: Duration = 5.minutes): Switch = proc createMplex(conn: Connection): Muxer = - newMplex(conn) + Mplex.init( + conn, + inTimeout = inTimeout, + outTimeout = outTimeout) if rng == nil: # newRng could fail raise (ref CatchableError)(msg: "Cannot initialize RNG") diff --git a/libp2p/switch.nim b/libp2p/switch.nim index a45ce449e..ea40b8410 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -71,6 +71,10 @@ proc cleanupPubSubPeer(s: Switch, conn: Connection) {.async.} = await s.pubSub.get().unsubscribePeer(conn.peerInfo) proc isConnected*(s: Switch, peer: PeerInfo): bool = + ## returns true if the peer has one or more + ## associated connections (sockets) + ## + peer.peerId in s.connManager proc secure(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} = diff --git a/tests/testinterop.nim b/tests/testinterop.nim index fc3bef471..c0b4fd670 100644 --- a/tests/testinterop.nim +++ b/tests/testinterop.nim @@ -71,7 +71,11 @@ proc testPubSubDaemonPublish(gossip: bool = false, let daemonNode = await newDaemonApi(flags) let daemonPeer = await daemonNode.identity() - let nativeNode = newStandardSwitch(gossip = gossip, secureManagers = [SecureProtocol.Noise]) + let nativeNode = newStandardSwitch( + gossip = gossip, + secureManagers = [SecureProtocol.Noise], + outTimeout = 5.minutes) + let awaiters = nativeNode.start() let nativePeer = nativeNode.peerInfo @@ -122,7 +126,11 @@ proc testPubSubNodePublish(gossip: bool = false, let daemonNode = await newDaemonApi(flags) let daemonPeer = await daemonNode.identity() - let nativeNode = newStandardSwitch(gossip = gossip, secureManagers = [SecureProtocol.Secio]) + let nativeNode = newStandardSwitch( + gossip = gossip, + secureManagers = [SecureProtocol.Secio], + outTimeout = 5.minutes) + let awaiters = nativeNode.start() let nativePeer = nativeNode.peerInfo @@ -175,7 +183,10 @@ suite "Interop": proc runTests(): Future[bool] {.async.} = var protos = @["/test-stream"] - let nativeNode = newStandardSwitch(secureManagers = [SecureProtocol.Noise]) + let nativeNode = newStandardSwitch( + secureManagers = [SecureProtocol.Noise], + outTimeout = 5.minutes) + let awaiters = await nativeNode.start() let daemonNode = await newDaemonApi() let daemonPeer = await daemonNode.identity() @@ -228,7 +239,10 @@ suite "Interop": var expect = newString(len(buffer) - 2) copyMem(addr expect[0], addr buffer.buffer[0], len(expect)) - let nativeNode = newStandardSwitch(secureManagers = [SecureProtocol.Secio]) + let nativeNode = newStandardSwitch( + secureManagers = [SecureProtocol.Secio], + outTimeout = 5.minutes) + let awaiters = await nativeNode.start() let daemonNode = await newDaemonApi() @@ -275,7 +289,9 @@ suite "Interop": proto.handler = nativeHandler proto.codec = protos[0] # codec - let nativeNode = newStandardSwitch(secureManagers = [SecureProtocol.Noise]) + let nativeNode = newStandardSwitch( + secureManagers = [SecureProtocol.Noise], outTimeout = 5.minutes) + nativeNode.mount(proto) let awaiters = await nativeNode.start() @@ -317,7 +333,9 @@ suite "Interop": proto.handler = nativeHandler proto.codec = protos[0] # codec - let nativeNode = newStandardSwitch(secureManagers = [SecureProtocol.Secio]) + let nativeNode = newStandardSwitch( + secureManagers = [SecureProtocol.Secio], outTimeout = 5.minutes) + nativeNode.mount(proto) let awaiters = await nativeNode.start() @@ -366,7 +384,9 @@ suite "Interop": proto.handler = nativeHandler proto.codec = protos[0] # codec - let nativeNode = newStandardSwitch(secureManagers = [SecureProtocol.Noise]) + let nativeNode = newStandardSwitch( + secureManagers = [SecureProtocol.Noise], outTimeout = 5.minutes) + nativeNode.mount(proto) let awaiters = await nativeNode.start() diff --git a/tests/testmplex.nim b/tests/testmplex.nim index 987d8a734..82664ccfe 100644 --- a/tests/testmplex.nim +++ b/tests/testmplex.nim @@ -117,7 +117,7 @@ suite "Mplex": proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard let conn = newBufferStream(writeHandler) - chann = newChannel(1, conn, true) + chann = LPChannel.init(1, conn, true) await chann.close() try: await chann.write("Hello") @@ -137,7 +137,7 @@ suite "Mplex": proc (data: seq[byte]) {.gcsafe, async.} = result = nil ) - chann = newChannel(1, conn, true) + chann = LPChannel.init(1, conn, true) await chann.pushTo(("Hello!").toBytes) let closeFut = chann.closeRemote() @@ -161,7 +161,7 @@ suite "Mplex": proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard let conn = newBufferStream(writeHandler) - chann = newChannel(1, conn, true) + chann = LPChannel.init(1, conn, true) await chann.closeRemote() try: await chann.pushTo(@[byte(1)]) @@ -179,7 +179,7 @@ suite "Mplex": proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard let conn = newBufferStream(writeHandler) - chann = newChannel(1, conn, true) + chann = LPChannel.init(1, conn, true) await chann.reset() var data = newSeq[byte](1) @@ -199,7 +199,7 @@ suite "Mplex": proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard let conn = newBufferStream(writeHandler) - chann = newChannel(1, conn, true) + chann = LPChannel.init(1, conn, true) await chann.reset() try: await chann.write(("Hello!").toBytes) @@ -211,13 +211,28 @@ suite "Mplex": check: waitFor(testResetWrite()) == true + test "timeout, channel should reset": + proc testResetWrite(): Future[bool] {.async.} = + proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard + let + conn = newBufferStream(writeHandler) + chann = LPChannel.init( + 1, conn, true, timeout = 100.millis) + + await chann.closeEvent.wait() + await conn.close() + result = true + + check: + waitFor(testResetWrite()) + test "e2e - read/write receiver": proc testNewStream() {.async.} = let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() var done = newFuture[void]() proc connHandler(conn: Connection) {.async, gcsafe.} = - let mplexListen = newMplex(conn) + let mplexListen = Mplex.init(conn) mplexListen.streamHandler = proc(stream: Connection) {.async, gcsafe.} = let msg = await stream.readLp(1024) @@ -234,7 +249,7 @@ suite "Mplex": let transport2: TcpTransport = TcpTransport.init() let conn = await transport2.dial(transport1.ma) - let mplexDial = newMplex(conn) + let mplexDial = Mplex.init(conn) let mplexDialFut = mplexDial.handle() let stream = await mplexDial.newStream() await stream.writeLp("HELLO") @@ -257,7 +272,7 @@ suite "Mplex": var done = newFuture[void]() proc connHandler(conn: Connection) {.async, gcsafe.} = - let mplexListen = newMplex(conn) + let mplexListen = Mplex.init(conn) mplexListen.streamHandler = proc(stream: Connection) {.async, gcsafe.} = let msg = await stream.readLp(1024) @@ -274,7 +289,7 @@ suite "Mplex": let transport2: TcpTransport = TcpTransport.init() let conn = await transport2.dial(transport1.ma) - let mplexDial = newMplex(conn) + let mplexDial = Mplex.init(conn) let stream = await mplexDial.newStream(lazy = true) let mplexDialFut = mplexDial.handle() check not LPChannel(stream).isOpen # assert lazy @@ -303,7 +318,7 @@ suite "Mplex": bigseq.add(uint8(rand(uint('A')..uint('z')))) proc connHandler(conn: Connection) {.async, gcsafe.} = - let mplexListen = newMplex(conn) + let mplexListen = Mplex.init(conn) mplexListen.streamHandler = proc(stream: Connection) {.async, gcsafe.} = let msg = await stream.readLp(MaxMsgSize) @@ -321,7 +336,7 @@ suite "Mplex": let transport2: TcpTransport = TcpTransport.init() let conn = await transport2.dial(transport1.ma) - let mplexDial = newMplex(conn) + let mplexDial = Mplex.init(conn) let mplexDialFut = mplexDial.handle() let stream = await mplexDial.newStream() @@ -347,7 +362,7 @@ suite "Mplex": let done = newFuture[void]() proc connHandler(conn: Connection) {.async, gcsafe.} = - let mplexListen = newMplex(conn) + let mplexListen = Mplex.init(conn) mplexListen.streamHandler = proc(stream: Connection) {.async, gcsafe.} = await stream.writeLp("Hello from stream!") @@ -363,7 +378,7 @@ suite "Mplex": let transport2: TcpTransport = TcpTransport.init() let conn = await transport2.dial(transport1.ma) - let mplexDial = newMplex(conn) + let mplexDial = Mplex.init(conn) let mplexDialFut = mplexDial.handle() let stream = await mplexDial.newStream("DIALER") let msg = string.fromBytes(await stream.readLp(1024)) @@ -387,7 +402,7 @@ suite "Mplex": let done = newFuture[void]() proc connHandler(conn: Connection) {.async, gcsafe.} = var count = 1 - let mplexListen = newMplex(conn) + let mplexListen = Mplex.init(conn) mplexListen.streamHandler = proc(stream: Connection) {.async, gcsafe.} = let msg = await stream.readLp(1024) @@ -406,7 +421,7 @@ suite "Mplex": let transport2: TcpTransport = TcpTransport.init() let conn = await transport2.dial(transport1.ma) - let mplexDial = newMplex(conn) + let mplexDial = Mplex.init(conn) # TODO: Reenable once half-closed is working properly let mplexDialFut = mplexDial.handle() for i in 1..10: @@ -431,7 +446,7 @@ suite "Mplex": let done = newFuture[void]() proc connHandler(conn: Connection) {.async, gcsafe.} = var count = 1 - let mplexListen = newMplex(conn) + let mplexListen = Mplex.init(conn) mplexListen.streamHandler = proc(stream: Connection) {.async, gcsafe.} = let msg = await stream.readLp(1024) @@ -451,7 +466,7 @@ suite "Mplex": let transport2: TcpTransport = TcpTransport.init() let conn = await transport2.dial(transport1.ma) - let mplexDial = newMplex(conn) + let mplexDial = Mplex.init(conn) let mplexDialFut = mplexDial.handle() for i in 1..10: let stream = await mplexDial.newStream("dialer stream") @@ -477,7 +492,7 @@ suite "Mplex": var complete = newFuture[void]() const MsgSize = 1024 proc connHandler(conn: Connection) {.async, gcsafe.} = - let mplexListen = newMplex(conn) + let mplexListen = Mplex.init(conn) mplexListen.streamHandler = proc(stream: Connection) {.async, gcsafe.} = let msg = await stream.readLp(MsgSize) @@ -494,7 +509,7 @@ suite "Mplex": let transport2: TcpTransport = TcpTransport.init() let conn = await transport2.dial(transport1.ma) - let mplexDial = newMplex(conn) + let mplexDial = Mplex.init(conn) let mplexDialFut = mplexDial.handle() let stream = await mplexDial.newStream() var bigseq = newSeqOfCap[uint8](MaxMsgSize + 1) @@ -547,7 +562,7 @@ suite "Mplex": var complete = newFuture[void]() const MsgSize = 512 proc connHandler(conn: Connection) {.async, gcsafe.} = - let mplexListen = newMplex(conn) + let mplexListen = Mplex.init(conn) mplexListen.streamHandler = proc(stream: Connection) {.async, gcsafe.} = let msg = await stream.readLp(MsgSize) @@ -564,7 +579,7 @@ suite "Mplex": let transport2: TcpTransport = TcpTransport.init() let conn = await transport2.dial(transport1.ma) - let mplexDial = newMplex(conn) + let mplexDial = Mplex.init(conn) let stream = await mplexDial.newStream() let mplexDialFut = mplexDial.handle() var bigseq = newSeqOfCap[uint8](MsgSize + 1) diff --git a/tests/testnoise.nim b/tests/testnoise.nim index d9d057e22..6da7a1e8c 100644 --- a/tests/testnoise.nim +++ b/tests/testnoise.nim @@ -54,7 +54,7 @@ proc createSwitch(ma: MultiAddress; outgoing: bool): (Switch, PeerInfo) = let identify = newIdentify(peerInfo) proc createMplex(conn: Connection): Muxer = - result = newMplex(conn) + result = Mplex.init(conn) let mplexProvider = newMuxerProvider(createMplex, MplexCodec) let transports = @[Transport(TcpTransport.init())] diff --git a/tests/testswitch.nim b/tests/testswitch.nim index 030c12274..a237ffd4c 100644 --- a/tests/testswitch.nim +++ b/tests/testswitch.nim @@ -114,14 +114,14 @@ suite "Switch": await sleepAsync(2.seconds) # wait a little for cleanup to happen var bufferTracker = getTracker(BufferStreamTrackerName) - echo bufferTracker.dump() + # echo bufferTracker.dump() # plus 4 for the pubsub streams check (BufferStreamTracker(bufferTracker).opened == (BufferStreamTracker(bufferTracker).closed + 4.uint64)) var connTracker = getTracker(ConnectionTrackerName) - echo connTracker.dump() + # echo connTracker.dump() # plus 8 is for the secured connection and the socket # and the pubsub streams that won't clean up until