diff --git a/libp2p/muxers/mplex/mplex.nim b/libp2p/muxers/mplex/mplex.nim index 3645fc6..9d676a6 100644 --- a/libp2p/muxers/mplex/mplex.nim +++ b/libp2p/muxers/mplex/mplex.nim @@ -23,6 +23,8 @@ import ../muxer, logScope: topic = "Mplex" +const HandleTimeout = 30.seconds + type Mplex* = ref object of Muxer remote*: Table[uint, LPChannel] @@ -59,15 +61,20 @@ proc cleanupChann(m: Mplex, chann: LPChannel, initiator: bool) {.async, inline.} m.getChannelList(initiator).del(chann.id) trace "cleaned up channel", id = chann.id +proc messageTimeout(t: Duration): Future[Option[Msg]] {.async, inline.} = + await sleepAsync(t) + return Msg.none + method handle*(m: Mplex) {.async, gcsafe.} = trace "starting mplex main loop" try: while not m.connection.closed: trace "waiting for data" - let msg = await m.connection.readMsg() + let + res = await one(m.connection.readMsg(), messageTimeout(HandleTimeout)) + msg = res.read() if msg.isNone: trace "connection EOF" - # TODO: allow poll with timeout to avoid using `sleepAsync` await sleepAsync(1.millis) continue