From 90ac1d21de58ece0c3074418e4dce6fd39ab007a Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Thu, 20 Aug 2020 20:41:02 -0600 Subject: [PATCH] default channel size and pushTo timeout --- libp2p/muxers/mplex/mplex.nim | 49 +++++++++++++++++--------- tests/testmplex.nim | 65 +++++++++++++++++++++++++++++++++++ 2 files changed, 98 insertions(+), 16 deletions(-) diff --git a/libp2p/muxers/mplex/mplex.nim b/libp2p/muxers/mplex/mplex.nim index 5915f0042..cfb1baa4d 100644 --- a/libp2p/muxers/mplex/mplex.nim +++ b/libp2p/muxers/mplex/mplex.nim @@ -25,6 +25,7 @@ logScope: const MaxChannelCount = 200 + DefaultPushTimeout = 1.seconds when defined(libp2p_expensive_metrics): declareGauge(libp2p_mplex_channels, @@ -38,9 +39,11 @@ type currentId: uint64 inChannTimeout: Duration outChannTimeout: Duration + pushTimeout: Duration isClosed: bool oid*: Oid - maxChannCount: int + maxChanCount: int + chanSize: int proc newTooManyChannels(): ref TooManyChannels = newException(TooManyChannels, "max allowed channel count exceeded") @@ -80,7 +83,8 @@ proc newStreamInternal*(m: Mplex, initiator, name, lazy = lazy, - timeout = timeout) + timeout = timeout, + size = m.chanSize) result.peerInfo = m.connection.peerInfo result.observedAddr = m.connection.observedAddr @@ -113,7 +117,8 @@ proc handleStream(m: Mplex, chann: LPChannel) {.async.} = await chann.reset() method handle*(m: Mplex) {.async, gcsafe.} = - logScope: moid = $m.oid + logScope: + mplexOid = $m.oid trace "starting mplex main loop" try: @@ -130,27 +135,26 @@ method handle*(m: Mplex) {.async, gcsafe.} = logScope: id = id initiator = initiator - msgType = msgType + msgType = $msgType size = data.len trace "read message from connection", data = data.shortLog var channel = if MessageType(msgType) != MessageType.New: - let tmp = m.channels[initiator].getOrDefault(id, nil) - if tmp == nil: - trace "Channel not found, skipping" - continue - - tmp + m.channels[initiator].getOrDefault(id, nil) else: - if m.channels[false].len > m.maxChannCount - 1: + if m.channels[false].len > m.maxChanCount - 1: warn "too many channels created by remote peer", allowedMax = MaxChannelCount raise newTooManyChannels() let name = string.fromBytes(data) m.newStreamInternal(false, id, name, timeout = m.outChannTimeout) + if channel == nil: + trace "Channel not found, skipping" + continue + logScope: name = channel.name oid = $channel.oid @@ -168,9 +172,18 @@ method handle*(m: Mplex) {.async, gcsafe.} = warn "attempting to send a packet larger than allowed", allowed = MaxMsgSize raise newLPStreamLimitError() - trace "pushing data to channel" - await channel.pushTo(data) - trace "pushed data to channel" + try: + trace "pushing data to channel" + # The timeout on the pushTo bellow is to + # prevent slow readers from blocking the + # read loop and thus other readers that + # are wating for data. + await channel.pushTo(data).wait(m.pushTimeout) + trace "pushed data to channel" + except AsyncTimeoutError: + debug "slow reader detected, resetting channel" + await channel.reset() + continue of MessageType.CloseIn, MessageType.CloseOut: trace "closing channel" @@ -188,12 +201,16 @@ method handle*(m: Mplex) {.async, gcsafe.} = proc init*(M: type Mplex, conn: Connection, inTimeout, outTimeout: Duration = DefaultChanTimeout, - maxChannCount: int = MaxChannelCount): Mplex = + maxChanCount: int = MaxChannelCount, + chanSize: int = DefaultChannelSize, + pushTimeout: Duration = DefaultPushTimeout): Mplex = M(connection: conn, inChannTimeout: inTimeout, outChannTimeout: outTimeout, + pushTimeout: pushTimeout, oid: genOid(), - maxChannCount: maxChannCount) + maxChanCount: maxChanCount, + chanSize: chanSize) method newStream*(m: Mplex, name: string = "", diff --git a/tests/testmplex.nim b/tests/testmplex.nim index 0ea9131fd..51cdcf803 100644 --- a/tests/testmplex.nim +++ b/tests/testmplex.nim @@ -673,3 +673,68 @@ suite "Mplex": await listenFut waitFor(test()) + + test "e2e - slow channels should reset": + proc test() {.async.} = + let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet() + let hello = "HELLO".toBytes + + var done = newFuture[void]() + var fast = false + proc reader(wait: Duration, stream: Connection) {.async.} = + try: + var read: seq[byte] + await sleepAsync(wait) + read.add((await stream.readLp(hello.len))) # read byte by byte + check read == hello + except CatchableError as exc: + # echo exc.msg + discard + + proc writer(stream: Connection): Future[seq[byte]] {.async.} = + await stream.writeLp(hello) + return await stream.readLp(hello.len) + + proc connHandler(conn: Connection) {.async, gcsafe.} = + let mplexListen = Mplex.init(conn, chanSize = 2, pushTimeout = 10.millis) + mplexListen.streamHandler = proc(stream: Connection) + {.async, gcsafe.} = + + if not fast: + fast = true + await reader(2.seconds, stream) + else: + await reader(0.millis, stream) + + await stream.writeLp(hello) + await stream.close() + + await mplexListen.handle() + await mplexListen.close() + + let transport1: TcpTransport = TcpTransport.init() + let listenFut = await transport1.listen(ma, connHandler) + + let transport2: TcpTransport = TcpTransport.init() + let conn = await transport2.dial(transport1.ma) + + let mplexDial = Mplex.init(conn, chanSize = 2) + let mplexDialFut = mplexDial.handle() + let streamSlow = await mplexDial.newStream() + let streamFast = await mplexDial.newStream() + + let reads = await allFinished( + writer(streamSlow), + writer(streamFast)) + + check (not reads[0].error.isNil) + check reads[1].read == hello + + await streamSlow.close() + await streamFast.close() + + await allFuturesThrowing( + transport1.close(), + transport2.close()) + + waitFor(test())