mplex fixes (#356)

* close the right connection when channel send fails
* don't crash on channel id that is not unique
This commit is contained in:
Jacek Sieka 2020-09-08 08:24:28 +02:00 committed by GitHub
parent 2b72d485a3
commit 82c179db9e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 32 additions and 23 deletions

View File

@ -76,11 +76,12 @@ proc contains*(c: ConnManager, muxer: Muxer): bool =
return muxer == c.muxed[conn].muxer return muxer == c.muxed[conn].muxer
proc closeMuxerHolder(muxerHolder: MuxerHolder) {.async.} = proc closeMuxerHolder(muxerHolder: MuxerHolder) {.async.} =
trace "cleaning up muxer for peer" trace "Cleaning up muxer", m = muxerHolder.muxer
await muxerHolder.muxer.close() await muxerHolder.muxer.close()
if not(isNil(muxerHolder.handle)): if not(isNil(muxerHolder.handle)):
await muxerHolder.handle # TODO noraises? await muxerHolder.handle # TODO noraises?
trace "Cleaned up muxer", m = muxerHolder.muxer
proc delConn(c: ConnManager, conn: Connection) = proc delConn(c: ConnManager, conn: Connection) =
let peerId = conn.peerInfo.peerId let peerId = conn.peerInfo.peerId
@ -91,6 +92,8 @@ proc delConn(c: ConnManager, conn: Connection) =
c.conns.del(peerId) c.conns.del(peerId)
libp2p_peers.set(c.conns.len.int64) libp2p_peers.set(c.conns.len.int64)
trace "Removed connection", conn
proc cleanupConn(c: ConnManager, conn: Connection) {.async.} = proc cleanupConn(c: ConnManager, conn: Connection) {.async.} =
## clean connection's resources such as muxers and streams ## clean connection's resources such as muxers and streams
@ -113,7 +116,7 @@ proc cleanupConn(c: ConnManager, conn: Connection) {.async.} =
finally: finally:
await conn.close() await conn.close()
trace "connection cleaned up", conn trace "Connection cleaned up", conn
proc onClose(c: ConnManager, conn: Connection) {.async.} = proc onClose(c: ConnManager, conn: Connection) {.async.} =
## connection close even handler ## connection close even handler
@ -122,15 +125,15 @@ proc onClose(c: ConnManager, conn: Connection) {.async.} =
## ##
try: try:
await conn.join() await conn.join()
trace "triggering connection cleanup", conn trace "Connection closed, cleaning up", conn
await c.cleanupConn(conn) await c.cleanupConn(conn)
except CancelledError: except CancelledError:
# This is top-level procedure which will work as separate task, so it # This is top-level procedure which will work as separate task, so it
# do not need to propogate CancelledError. # do not need to propogate CancelledError.
trace "Unexpected cancellation in connection manager's cleanup" debug "Unexpected cancellation in connection manager's cleanup", conn
except CatchableError as exc: except CatchableError as exc:
trace "Unexpected exception in connection manager's cleanup", debug "Unexpected exception in connection manager's cleanup",
errMsg = exc.msg errMsg = exc.msg, conn
proc selectConn*(c: ConnManager, proc selectConn*(c: ConnManager,
peerId: PeerID, peerId: PeerID,
@ -181,7 +184,7 @@ proc storeConn*(c: ConnManager, conn: Connection) =
let peerId = conn.peerInfo.peerId let peerId = conn.peerInfo.peerId
if c.conns.getOrDefault(peerId).len > c.maxConns: if c.conns.getOrDefault(peerId).len > c.maxConns:
trace "too many connections", peer = $peerId, debug "too many connections", peer = conn,
conns = c.conns.getOrDefault(peerId).len conns = c.conns.getOrDefault(peerId).len
raise newTooManyConnections() raise newTooManyConnections()
@ -196,7 +199,8 @@ proc storeConn*(c: ConnManager, conn: Connection) =
asyncSpawn c.onClose(conn) asyncSpawn c.onClose(conn)
libp2p_peers.set(c.conns.len.int64) libp2p_peers.set(c.conns.len.int64)
trace "stored connection", connections = c.conns.len, conn trace "Stored connection",
connections = c.conns.len, conn, direction = $conn.dir
proc storeOutgoing*(c: ConnManager, conn: Connection) = proc storeOutgoing*(c: ConnManager, conn: Connection) =
conn.dir = Direction.Out conn.dir = Direction.Out
@ -222,7 +226,7 @@ proc storeMuxer*(c: ConnManager,
muxer: muxer, muxer: muxer,
handle: handle) handle: handle)
trace "stored muxer", connections = c.conns.len, muxer trace "Stored muxer", connections = c.conns.len, muxer
proc getMuxedStream*(c: ConnManager, proc getMuxedStream*(c: ConnManager,
peerId: PeerID, peerId: PeerID,
@ -256,8 +260,10 @@ proc getMuxedStream*(c: ConnManager,
proc dropPeer*(c: ConnManager, peerId: PeerID) {.async.} = proc dropPeer*(c: ConnManager, peerId: PeerID) {.async.} =
## drop connections and cleanup resources for peer ## drop connections and cleanup resources for peer
## ##
trace "Dropping peer", peerId
let conns = c.conns.getOrDefault(peerId) let conns = c.conns.getOrDefault(peerId)
for conn in conns: for conn in conns:
trace "Removing connection", conn
delConn(c, conn) delConn(c, conn)
var muxers: seq[MuxerHolder] var muxers: seq[MuxerHolder]
@ -271,6 +277,7 @@ proc dropPeer*(c: ConnManager, peerId: PeerID) {.async.} =
for conn in conns: for conn in conns:
await conn.close() await conn.close()
trace "Dropped peer", peerId
proc close*(c: ConnManager) {.async.} = proc close*(c: ConnManager) {.async.} =
## cleanup resources for the connection ## cleanup resources for the connection

View File

@ -90,7 +90,7 @@ proc resetMessage(s: LPChannel) {.async.} =
except CancelledError: except CancelledError:
# This procedure is called from one place and never awaited, so there no # This procedure is called from one place and never awaited, so there no
# need to re-raise CancelledError. # need to re-raise CancelledError.
trace "Unexpected cancellation while resetting channel" debug "Unexpected cancellation while resetting channel", s
except LPStreamEOFError as exc: except LPStreamEOFError as exc:
trace "muxed connection EOF", exc = exc.msg, s trace "muxed connection EOF", exc = exc.msg, s
except LPStreamClosedError as exc: except LPStreamClosedError as exc:
@ -217,14 +217,14 @@ proc init*(
await chann.open() await chann.open()
# writes should happen in sequence # writes should happen in sequence
trace "sending data", len = data.len, chann trace "sending data", len = data.len, conn, chann
await conn.writeMsg(chann.id, await conn.writeMsg(chann.id,
chann.msgCode, chann.msgCode,
data) data)
except CatchableError as exc: except CatchableError as exc:
trace "exception in lpchannel write handler", exc = exc.msg, chann trace "exception in lpchannel write handler", exc = exc.msg, chann
await chann.reset() asyncSpawn conn.close()
raise exc raise exc
chann.initBufferStream(writeHandler, size) chann.initBufferStream(writeHandler, size)

View File

@ -32,6 +32,7 @@ when defined(libp2p_expensive_metrics):
type type
TooManyChannels* = object of CatchableError TooManyChannels* = object of CatchableError
InvalidChannelIdError* = object of CatchableError
Mplex* = ref object of Muxer Mplex* = ref object of Muxer
channels: array[bool, Table[uint64, LPChannel]] channels: array[bool, Table[uint64, LPChannel]]
@ -48,13 +49,16 @@ chronicles.formatIt(Mplex): shortLog(it)
proc newTooManyChannels(): ref TooManyChannels = proc newTooManyChannels(): ref TooManyChannels =
newException(TooManyChannels, "max allowed channel count exceeded") newException(TooManyChannels, "max allowed channel count exceeded")
proc newInvalidChannelIdError(): ref InvalidChannelIdError =
newException(InvalidChannelIdError, "max allowed channel count exceeded")
proc cleanupChann(m: Mplex, chann: LPChannel) {.async, inline.} = proc cleanupChann(m: Mplex, chann: LPChannel) {.async, inline.} =
## remove the local channel from the internal tables ## remove the local channel from the internal tables
## ##
try: try:
await chann.join() await chann.join()
m.channels[chann.initiator].del(chann.id) m.channels[chann.initiator].del(chann.id)
trace "cleaned up channel", m, chann debug "cleaned up channel", m, chann
when defined(libp2p_expensive_metrics): when defined(libp2p_expensive_metrics):
libp2p_mplex_channels.set( libp2p_mplex_channels.set(
@ -63,10 +67,10 @@ proc cleanupChann(m: Mplex, chann: LPChannel) {.async, inline.} =
except CancelledError: except CancelledError:
# This is top-level procedure which will work as separate task, so it # This is top-level procedure which will work as separate task, so it
# do not need to propogate CancelledError. # do not need to propogate CancelledError.
trace "Unexpected cancellation in mplex channel cleanup", debug "Unexpected cancellation in mplex channel cleanup",
m, chann m, chann
except CatchableError as exc: except CatchableError as exc:
trace "error cleaning up mplex channel", exc = exc.msg, m, chann debug "error cleaning up mplex channel", exc = exc.msg, m, chann
proc newStreamInternal*(m: Mplex, proc newStreamInternal*(m: Mplex,
initiator: bool = true, initiator: bool = true,
@ -81,10 +85,9 @@ proc newStreamInternal*(m: Mplex,
m.currentId.inc(); m.currentId m.currentId.inc(); m.currentId
else: chanId else: chanId
trace "creating new channel", id, if id in m.channels[initiator]:
initiator, raise newInvalidChannelIdError()
name,
m
result = LPChannel.init( result = LPChannel.init(
id, id,
m.connection, m.connection,
@ -96,8 +99,7 @@ proc newStreamInternal*(m: Mplex,
result.peerInfo = m.connection.peerInfo result.peerInfo = m.connection.peerInfo
result.observedAddr = m.connection.observedAddr result.observedAddr = m.connection.observedAddr
doAssert(id notin m.channels[initiator], trace "Creating new channel", id, initiator, name, m, channel = result
"channel slot already taken!")
m.channels[initiator][id] = result m.channels[initiator][id] = result
@ -127,7 +129,7 @@ proc handleStream(m: Mplex, chann: LPChannel) {.async.} =
await chann.reset() await chann.reset()
method handle*(m: Mplex) {.async, gcsafe.} = method handle*(m: Mplex) {.async, gcsafe.} =
trace "starting mplex main loop", m, peer = m.connection.peerInfo.peerId trace "Starting mplex main loop", m
try: try:
while not m.connection.atEof: while not m.connection.atEof:
trace "waiting for data", m trace "waiting for data", m
@ -186,7 +188,7 @@ method handle*(m: Mplex) {.async, gcsafe.} =
except CancelledError: except CancelledError:
# This procedure is spawned as task and it is not part of public API, so # This procedure is spawned as task and it is not part of public API, so
# there no way for this procedure to be cancelled implicitely. # there no way for this procedure to be cancelled implicitely.
trace "Unexpected cancellation in mplex handler" debug "Unexpected cancellation in mplex handler", m
except CatchableError as exc: except CatchableError as exc:
trace "Exception occurred", exception = exc.msg, m trace "Exception occurred", exception = exc.msg, m
finally: finally: