diff --git a/libp2p/muxers/mplex/lpchannel.nim b/libp2p/muxers/mplex/lpchannel.nim index 43ec0a21a..6fb6221d1 100644 --- a/libp2p/muxers/mplex/lpchannel.nim +++ b/libp2p/muxers/mplex/lpchannel.nim @@ -80,6 +80,14 @@ template withEOFExceptions(body: untyped): untyped = except LPStreamIncompleteError as exc: trace "incomplete message", exc = exc.msg +proc cleanupTimer(s: LPChannel) {.async.} = + ## cleanup timers + ## + if not(isNil(s.timerFut)) and + not(s.timerFut.finished): + s.timerFut.cancel() + await s.timerTaskFut + proc closeMessage(s: LPChannel) {.async.} = logScope: id = s.id @@ -146,6 +154,8 @@ proc closeRemote*(s: LPChannel) {.async.} = # call to avoid leaks await procCall BufferStream(s).close() # close parent bufferstream + await s.cleanupTimer() + trace "channel closed on EOF" except CancelledError as exc: raise exc @@ -187,6 +197,8 @@ method reset*(s: LPChannel) {.base, async, gcsafe.} = s.isEof = true s.closedLocal = true + await s.cleanupTimer() + except CancelledError as exc: raise exc except CatchableError as exc: @@ -214,6 +226,7 @@ method close*(s: LPChannel) {.async, gcsafe.} = await s.closeMessage().wait(2.minutes) if s.atEof: # already closed by remote close parent buffer immediately await procCall BufferStream(s).close() + await s.cleanupTimer() except CancelledError as exc: await s.reset() raise exc @@ -226,18 +239,6 @@ method close*(s: LPChannel) {.async, gcsafe.} = s.closedLocal = true asyncCheck closeInternal() -proc cleanupOnClose(s: LPChannel) {.async.} = - ## await this stream's close event - ## to cleanup timers and other resources - ## - - await s.closeEvent.wait() - - if not(isNil(s.timerFut)) and - not(s.timerFut.finished): - s.timerFut.cancel() - await s.timerTaskFut - proc timeoutMonitor(s: LPChannel) {.async.} = ## monitor the channel for innactivity ## @@ -337,10 +338,6 @@ proc init*( when chronicles.enabledLogLevel == LogLevel.TRACE: chann.name = if chann.name.len > 0: chann.name else: $chann.oid - # launch task to cancel and cleanup - # timer on stream close - asyncCheck chann.cleanupOnClose() - chann.timerTaskFut = chann.timeoutMonitor() trace "created new lpchannel"