default channel size and pushTo timeout

This commit is contained in:
Dmitriy Ryajov 2020-08-20 20:41:02 -06:00
parent b9d03cf91b
commit 90ac1d21de
No known key found for this signature in database
GPG Key ID: DA8C680CE7C657A4
2 changed files with 98 additions and 16 deletions

View File

@ -25,6 +25,7 @@ logScope:
const
MaxChannelCount = 200
DefaultPushTimeout = 1.seconds
when defined(libp2p_expensive_metrics):
declareGauge(libp2p_mplex_channels,
@ -38,9 +39,11 @@ type
currentId: uint64
inChannTimeout: Duration
outChannTimeout: Duration
pushTimeout: Duration
isClosed: bool
oid*: Oid
maxChannCount: int
maxChanCount: int
chanSize: int
proc newTooManyChannels(): ref TooManyChannels =
newException(TooManyChannels, "max allowed channel count exceeded")
@ -80,7 +83,8 @@ proc newStreamInternal*(m: Mplex,
initiator,
name,
lazy = lazy,
timeout = timeout)
timeout = timeout,
size = m.chanSize)
result.peerInfo = m.connection.peerInfo
result.observedAddr = m.connection.observedAddr
@ -113,7 +117,8 @@ proc handleStream(m: Mplex, chann: LPChannel) {.async.} =
await chann.reset()
method handle*(m: Mplex) {.async, gcsafe.} =
logScope: moid = $m.oid
logScope:
mplexOid = $m.oid
trace "starting mplex main loop"
try:
@ -130,27 +135,26 @@ method handle*(m: Mplex) {.async, gcsafe.} =
logScope:
id = id
initiator = initiator
msgType = msgType
msgType = $msgType
size = data.len
trace "read message from connection", data = data.shortLog
var channel =
if MessageType(msgType) != MessageType.New:
let tmp = m.channels[initiator].getOrDefault(id, nil)
if tmp == nil:
trace "Channel not found, skipping"
continue
tmp
m.channels[initiator].getOrDefault(id, nil)
else:
if m.channels[false].len > m.maxChannCount - 1:
if m.channels[false].len > m.maxChanCount - 1:
warn "too many channels created by remote peer", allowedMax = MaxChannelCount
raise newTooManyChannels()
let name = string.fromBytes(data)
m.newStreamInternal(false, id, name, timeout = m.outChannTimeout)
if channel == nil:
trace "Channel not found, skipping"
continue
logScope:
name = channel.name
oid = $channel.oid
@ -168,9 +172,18 @@ method handle*(m: Mplex) {.async, gcsafe.} =
warn "attempting to send a packet larger than allowed", allowed = MaxMsgSize
raise newLPStreamLimitError()
trace "pushing data to channel"
await channel.pushTo(data)
trace "pushed data to channel"
try:
trace "pushing data to channel"
# The timeout on the pushTo bellow is to
# prevent slow readers from blocking the
# read loop and thus other readers that
# are wating for data.
await channel.pushTo(data).wait(m.pushTimeout)
trace "pushed data to channel"
except AsyncTimeoutError:
debug "slow reader detected, resetting channel"
await channel.reset()
continue
of MessageType.CloseIn, MessageType.CloseOut:
trace "closing channel"
@ -188,12 +201,16 @@ method handle*(m: Mplex) {.async, gcsafe.} =
proc init*(M: type Mplex,
conn: Connection,
inTimeout, outTimeout: Duration = DefaultChanTimeout,
maxChannCount: int = MaxChannelCount): Mplex =
maxChanCount: int = MaxChannelCount,
chanSize: int = DefaultChannelSize,
pushTimeout: Duration = DefaultPushTimeout): Mplex =
M(connection: conn,
inChannTimeout: inTimeout,
outChannTimeout: outTimeout,
pushTimeout: pushTimeout,
oid: genOid(),
maxChannCount: maxChannCount)
maxChanCount: maxChanCount,
chanSize: chanSize)
method newStream*(m: Mplex,
name: string = "",

View File

@ -673,3 +673,68 @@ suite "Mplex":
await listenFut
waitFor(test())
test "e2e - slow channels should reset":
proc test() {.async.} =
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let hello = "HELLO".toBytes
var done = newFuture[void]()
var fast = false
proc reader(wait: Duration, stream: Connection) {.async.} =
try:
var read: seq[byte]
await sleepAsync(wait)
read.add((await stream.readLp(hello.len))) # read byte by byte
check read == hello
except CatchableError as exc:
# echo exc.msg
discard
proc writer(stream: Connection): Future[seq[byte]] {.async.} =
await stream.writeLp(hello)
return await stream.readLp(hello.len)
proc connHandler(conn: Connection) {.async, gcsafe.} =
let mplexListen = Mplex.init(conn, chanSize = 2, pushTimeout = 10.millis)
mplexListen.streamHandler = proc(stream: Connection)
{.async, gcsafe.} =
if not fast:
fast = true
await reader(2.seconds, stream)
else:
await reader(0.millis, stream)
await stream.writeLp(hello)
await stream.close()
await mplexListen.handle()
await mplexListen.close()
let transport1: TcpTransport = TcpTransport.init()
let listenFut = await transport1.listen(ma, connHandler)
let transport2: TcpTransport = TcpTransport.init()
let conn = await transport2.dial(transport1.ma)
let mplexDial = Mplex.init(conn, chanSize = 2)
let mplexDialFut = mplexDial.handle()
let streamSlow = await mplexDial.newStream()
let streamFast = await mplexDial.newStream()
let reads = await allFinished(
writer(streamSlow),
writer(streamFast))
check (not reads[0].error.isNil)
check reads[1].read == hello
await streamSlow.close()
await streamFast.close()
await allFuturesThrowing(
transport1.close(),
transport2.close())
waitFor(test())