diff --git a/libp2p/errors.nim b/libp2p/errors.nim index ed541fb..0292641 100644 --- a/libp2p/errors.nim +++ b/libp2p/errors.nim @@ -19,8 +19,8 @@ macro checkFutures*[T](futs: seq[Future[T]], exclude: untyped = []): untyped = if res.failed: let exc = res.readError() # We still don't abort but warn - warn "A future has failed, enable trace logging for details", error=exc.name - trace "Exception message", msg=exc.msg + warn "A future has failed, enable trace logging for details", error = exc.name + trace "Exception message", msg= exc.msg, stack = getStackTrace() else: quote do: for res in `futs`: diff --git a/libp2p/muxers/mplex/lpchannel.nim b/libp2p/muxers/mplex/lpchannel.nim index abe7faf..33b6b96 100644 --- a/libp2p/muxers/mplex/lpchannel.nim +++ b/libp2p/muxers/mplex/lpchannel.nim @@ -46,8 +46,6 @@ logScope: type LPChannel* = ref object of BufferStream id*: uint64 # channel id - timeout: Duration # channel timeout if no activity - activity: bool # reset every time data is sent or received name*: string # name of the channel (for debugging) conn*: Connection # wrapped connection used to for writing initiator*: bool # initiated remotely or locally flag @@ -57,7 +55,6 @@ type msgCode*: MessageType # cached in/out message code closeCode*: MessageType # cached in/out close code resetCode*: MessageType # cached in/out reset code - timerTaskFut: Future[void] # the current timer instanse proc open*(s: LPChannel) {.async, gcsafe.} @@ -79,11 +76,6 @@ template withEOFExceptions(body: untyped): untyped = except LPStreamIncompleteError as exc: trace "incomplete message", exc = exc.msg -proc cleanupTimer(s: LPChannel) {.async.} = - ## cleanup timers - if not isNil(s.timerTaskFut) and not s.timerTaskFut.finished: - s.timerTaskFut.cancel() - proc closeMessage(s: LPChannel) {.async.} = logScope: id = s.id @@ -150,7 +142,6 @@ proc closeRemote*(s: LPChannel) {.async.} = # call to avoid leaks await procCall BufferStream(s).close() # close parent bufferstream - await s.cleanupTimer() trace "channel closed on EOF" except CancelledError as exc: @@ -193,8 +184,6 @@ method reset*(s: LPChannel) {.base, async, gcsafe.} = s.isEof = true s.closedLocal = true - await s.cleanupTimer() - except CancelledError as exc: raise exc except CatchableError as exc: @@ -222,7 +211,6 @@ method close*(s: LPChannel) {.async, gcsafe.} = await s.closeMessage().wait(2.minutes) if s.atEof: # already closed by remote close parent buffer immediately await procCall BufferStream(s).close() - await s.cleanupTimer() except CancelledError as exc: await s.reset() raise exc @@ -235,60 +223,16 @@ method close*(s: LPChannel) {.async, gcsafe.} = s.closedLocal = true asyncCheck closeInternal() -proc timeoutMonitor(s: LPChannel) {.async.} = - ## monitor the channel for innactivity - ## - ## if the timeout was hit, it means that - ## neither incoming nor outgoing activity - ## has been detected and the channel will - ## be reset - ## - - logScope: - id = s.id - initiator = s.initiator - name = s.name - oid = $s.oid - peer = $s.conn.peerInfo - - try: - while true: - await sleepAsync(s.timeout) - - if s.closed or s.atEof: - return - - if s.activity: - s.activity = false - continue - - break - - # reset channel on innactivity timeout - trace "channel timed out, resetting" - await s.reset() - except CancelledError as exc: - raise exc - except CatchableError as exc: - trace "exception in timeout", exc = exc.msg - method initStream*(s: LPChannel) = if s.objName.len == 0: s.objName = "LPChannel" + s.timeoutHandler = proc() {.async, gcsafe.} = + trace "idle timeout expired, resetting LPChannel" + await s.reset() + procCall BufferStream(s).initStream() -method readOnce*(s: LPChannel, - pbytes: pointer, - nbytes: int): - Future[int] = - s.activity = true - procCall BufferStream(s).readOnce(pbytes, nbytes) - -method write*(s: LPChannel, msg: seq[byte]): Future[void] = - s.activity = true - procCall BufferStream(s).write(msg) - proc init*( L: type LPChannel, id: uint64, @@ -339,7 +283,6 @@ proc init*( when chronicles.enabledLogLevel == LogLevel.TRACE: chann.name = if chann.name.len > 0: chann.name else: $chann.oid - chann.timerTaskFut = chann.timeoutMonitor() trace "created new lpchannel" return chann diff --git a/libp2p/muxers/muxer.nim b/libp2p/muxers/muxer.nim index a225766..baa76a2 100644 --- a/libp2p/muxers/muxer.nim +++ b/libp2p/muxers/muxer.nim @@ -16,7 +16,7 @@ logScope: topics = "muxer" const - DefaultChanTimeout* = 1.minutes + DefaultChanTimeout* = 5.minutes type StreamHandler* = proc(conn: Connection): Future[void] {.gcsafe.} diff --git a/libp2p/stream/bufferstream.nim b/libp2p/stream/bufferstream.nim index 06d289d..71562b8 100644 --- a/libp2p/stream/bufferstream.nim +++ b/libp2p/stream/bufferstream.nim @@ -220,6 +220,8 @@ method readOnce*(s: BufferStream, var index = 0 var size = min(nbytes, s.len) let output = cast[ptr UncheckedArray[byte]](pbytes) + + s.activity = true # reset activity flag while s.len() > 0 and index < size: output[index] = s.popFirst() inc(index) @@ -243,6 +245,7 @@ method write*(s: BufferStream, msg: seq[byte]) {.async.} = if isNil(s.writeHandler): raise newNotWritableError() + s.activity = true # reset activity flag await s.writeHandler(msg) # TODO: move pipe routines out diff --git a/libp2p/stream/chronosstream.nim b/libp2p/stream/chronosstream.nim index 33c17b3..b536c9a 100644 --- a/libp2p/stream/chronosstream.nim +++ b/libp2p/stream/chronosstream.nim @@ -14,18 +14,28 @@ import connection logScope: topics = "chronosstream" -type ChronosStream* = ref object of Connection +const + DefaultChronosStreamTimeout = 10.minutes + +type + ChronosStream* = ref object of Connection client: StreamTransport method initStream*(s: ChronosStream) = if s.objName.len == 0: s.objName = "ChronosStream" + s.timeoutHandler = proc() {.async, gcsafe.} = + trace "idle timeout expired, closing ChronosStream" + await s.close() + procCall Connection(s).initStream() -proc newChronosStream*(client: StreamTransport): ChronosStream = - new result - result.client = client +proc init*(C: type ChronosStream, + client: StreamTransport, + timeout = DefaultChronosStreamTimeout): ChronosStream = + result = C(client: client, + timeout: timeout) result.initStream() template withExceptions(body: untyped) = @@ -48,6 +58,7 @@ method readOnce*(s: ChronosStream, pbytes: pointer, nbytes: int): Future[int] {. withExceptions: result = await s.client.readOnce(pbytes, nbytes) + s.activity = true # reset activity flag method write*(s: ChronosStream, msg: seq[byte]) {.async.} = if s.closed: @@ -60,6 +71,7 @@ method write*(s: ChronosStream, msg: seq[byte]) {.async.} = var written = 0 while not s.client.closed and written < msg.len: written += await s.client.write(msg[written..