diff --git a/libp2p/stream/connection.nim b/libp2p/stream/connection.nim index 0acdeeb..f83bdaf 100644 --- a/libp2p/stream/connection.nim +++ b/libp2p/stream/connection.nim @@ -63,6 +63,7 @@ method closeImpl*(s: Connection): Future[void] = trace "Closing connection", s if not isNil(s.timerTaskFut) and not s.timerTaskFut.finished: s.timerTaskFut.cancel() + s.timerTaskFut = nil trace "Closed connection", s @@ -71,6 +72,32 @@ method closeImpl*(s: Connection): Future[void] = func hash*(p: Connection): Hash = cast[pointer](p).hash +proc pollActivity(s: Connection): Future[bool] {.async.} = + if s.closed and s.atEof: + return false # Done, no more monitoring + + if s.activity: + s.activity = false + return true + + # Inactivity timeout happened, call timeout monitor + + trace "Connection timed out", s + if not(isNil(s.timeoutHandler)): + trace "Calling timeout handler", s + + try: + await s.timeoutHandler() + except CancelledError: + # timeoutHandler is expected to be fast, but it's still possible that + # cancellation will happen here - no need to warn about it - we do want to + # stop the polling however + debug "Timeout handler cancelled", s + except CatchableError as exc: # Shouldn't happen + warn "exception in timeout handler", s, exc = exc.msg + + return false + proc timeoutMonitor(s: Connection) {.async, gcsafe.} = ## monitor the channel for inactivity ## @@ -80,31 +107,14 @@ proc timeoutMonitor(s: Connection) {.async, gcsafe.} = ## be reset ## - try: - while true: + while true: + try: # Sleep at least once! await sleepAsync(s.timeout) + except CancelledError: + return - if s.closed and s.atEof: - return - - if s.activity: - s.activity = false - continue - - break - - # reset channel on inactivity timeout - trace "Connection timed out", s - if not(isNil(s.timeoutHandler)): - trace "Calling timeout handler", s - await s.timeoutHandler() - - except CancelledError as exc: - raise exc - except CatchableError as exc: # Shouldn't happen - warn "exception in timeout", s, exc = exc.msg - finally: - s.timerTaskFut = nil + if not await s.pollActivity(): + return proc init*(C: type Connection, peerInfo: PeerInfo,