Moving idle timeout to Connection to enable across all connection streams (#307)

* move idle timeout logic to connection

* more informative logs

* more informative logs
This commit is contained in:
Dmitriy Ryajov 2020-08-04 07:22:05 -06:00 committed by GitHub
parent 5f0637c49a
commit cf2b42b914
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 90 additions and 75 deletions

View File

@ -20,7 +20,7 @@ macro checkFutures*[T](futs: seq[Future[T]], exclude: untyped = []): untyped =
let exc = res.readError() let exc = res.readError()
# We still don't abort but warn # We still don't abort but warn
warn "A future has failed, enable trace logging for details", error = exc.name warn "A future has failed, enable trace logging for details", error = exc.name
trace "Exception message", msg=exc.msg trace "Exception message", msg= exc.msg, stack = getStackTrace()
else: else:
quote do: quote do:
for res in `futs`: for res in `futs`:

View File

@ -46,8 +46,6 @@ logScope:
type type
LPChannel* = ref object of BufferStream LPChannel* = ref object of BufferStream
id*: uint64 # channel id id*: uint64 # channel id
timeout: Duration # channel timeout if no activity
activity: bool # reset every time data is sent or received
name*: string # name of the channel (for debugging) name*: string # name of the channel (for debugging)
conn*: Connection # wrapped connection used to for writing conn*: Connection # wrapped connection used to for writing
initiator*: bool # initiated remotely or locally flag initiator*: bool # initiated remotely or locally flag
@ -57,7 +55,6 @@ type
msgCode*: MessageType # cached in/out message code msgCode*: MessageType # cached in/out message code
closeCode*: MessageType # cached in/out close code closeCode*: MessageType # cached in/out close code
resetCode*: MessageType # cached in/out reset code resetCode*: MessageType # cached in/out reset code
timerTaskFut: Future[void] # the current timer instanse
proc open*(s: LPChannel) {.async, gcsafe.} proc open*(s: LPChannel) {.async, gcsafe.}
@ -79,11 +76,6 @@ template withEOFExceptions(body: untyped): untyped =
except LPStreamIncompleteError as exc: except LPStreamIncompleteError as exc:
trace "incomplete message", exc = exc.msg trace "incomplete message", exc = exc.msg
proc cleanupTimer(s: LPChannel) {.async.} =
## cleanup timers
if not isNil(s.timerTaskFut) and not s.timerTaskFut.finished:
s.timerTaskFut.cancel()
proc closeMessage(s: LPChannel) {.async.} = proc closeMessage(s: LPChannel) {.async.} =
logScope: logScope:
id = s.id id = s.id
@ -150,7 +142,6 @@ proc closeRemote*(s: LPChannel) {.async.} =
# call to avoid leaks # call to avoid leaks
await procCall BufferStream(s).close() # close parent bufferstream await procCall BufferStream(s).close() # close parent bufferstream
await s.cleanupTimer()
trace "channel closed on EOF" trace "channel closed on EOF"
except CancelledError as exc: except CancelledError as exc:
@ -193,8 +184,6 @@ method reset*(s: LPChannel) {.base, async, gcsafe.} =
s.isEof = true s.isEof = true
s.closedLocal = true s.closedLocal = true
await s.cleanupTimer()
except CancelledError as exc: except CancelledError as exc:
raise exc raise exc
except CatchableError as exc: except CatchableError as exc:
@ -222,7 +211,6 @@ method close*(s: LPChannel) {.async, gcsafe.} =
await s.closeMessage().wait(2.minutes) await s.closeMessage().wait(2.minutes)
if s.atEof: # already closed by remote close parent buffer immediately if s.atEof: # already closed by remote close parent buffer immediately
await procCall BufferStream(s).close() await procCall BufferStream(s).close()
await s.cleanupTimer()
except CancelledError as exc: except CancelledError as exc:
await s.reset() await s.reset()
raise exc raise exc
@ -235,60 +223,16 @@ method close*(s: LPChannel) {.async, gcsafe.} =
s.closedLocal = true s.closedLocal = true
asyncCheck closeInternal() asyncCheck closeInternal()
proc timeoutMonitor(s: LPChannel) {.async.} =
## monitor the channel for innactivity
##
## if the timeout was hit, it means that
## neither incoming nor outgoing activity
## has been detected and the channel will
## be reset
##
logScope:
id = s.id
initiator = s.initiator
name = s.name
oid = $s.oid
peer = $s.conn.peerInfo
try:
while true:
await sleepAsync(s.timeout)
if s.closed or s.atEof:
return
if s.activity:
s.activity = false
continue
break
# reset channel on innactivity timeout
trace "channel timed out, resetting"
await s.reset()
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception in timeout", exc = exc.msg
method initStream*(s: LPChannel) = method initStream*(s: LPChannel) =
if s.objName.len == 0: if s.objName.len == 0:
s.objName = "LPChannel" s.objName = "LPChannel"
s.timeoutHandler = proc() {.async, gcsafe.} =
trace "idle timeout expired, resetting LPChannel"
await s.reset()
procCall BufferStream(s).initStream() procCall BufferStream(s).initStream()
method readOnce*(s: LPChannel,
pbytes: pointer,
nbytes: int):
Future[int] =
s.activity = true
procCall BufferStream(s).readOnce(pbytes, nbytes)
method write*(s: LPChannel, msg: seq[byte]): Future[void] =
s.activity = true
procCall BufferStream(s).write(msg)
proc init*( proc init*(
L: type LPChannel, L: type LPChannel,
id: uint64, id: uint64,
@ -339,7 +283,6 @@ proc init*(
when chronicles.enabledLogLevel == LogLevel.TRACE: when chronicles.enabledLogLevel == LogLevel.TRACE:
chann.name = if chann.name.len > 0: chann.name else: $chann.oid chann.name = if chann.name.len > 0: chann.name else: $chann.oid
chann.timerTaskFut = chann.timeoutMonitor()
trace "created new lpchannel" trace "created new lpchannel"
return chann return chann

View File

@ -16,7 +16,7 @@ logScope:
topics = "muxer" topics = "muxer"
const const
DefaultChanTimeout* = 1.minutes DefaultChanTimeout* = 5.minutes
type type
StreamHandler* = proc(conn: Connection): Future[void] {.gcsafe.} StreamHandler* = proc(conn: Connection): Future[void] {.gcsafe.}

View File

@ -220,6 +220,8 @@ method readOnce*(s: BufferStream,
var index = 0 var index = 0
var size = min(nbytes, s.len) var size = min(nbytes, s.len)
let output = cast[ptr UncheckedArray[byte]](pbytes) let output = cast[ptr UncheckedArray[byte]](pbytes)
s.activity = true # reset activity flag
while s.len() > 0 and index < size: while s.len() > 0 and index < size:
output[index] = s.popFirst() output[index] = s.popFirst()
inc(index) inc(index)
@ -243,6 +245,7 @@ method write*(s: BufferStream, msg: seq[byte]) {.async.} =
if isNil(s.writeHandler): if isNil(s.writeHandler):
raise newNotWritableError() raise newNotWritableError()
s.activity = true # reset activity flag
await s.writeHandler(msg) await s.writeHandler(msg)
# TODO: move pipe routines out # TODO: move pipe routines out

View File

@ -14,18 +14,28 @@ import connection
logScope: logScope:
topics = "chronosstream" topics = "chronosstream"
type ChronosStream* = ref object of Connection const
DefaultChronosStreamTimeout = 10.minutes
type
ChronosStream* = ref object of Connection
client: StreamTransport client: StreamTransport
method initStream*(s: ChronosStream) = method initStream*(s: ChronosStream) =
if s.objName.len == 0: if s.objName.len == 0:
s.objName = "ChronosStream" s.objName = "ChronosStream"
s.timeoutHandler = proc() {.async, gcsafe.} =
trace "idle timeout expired, closing ChronosStream"
await s.close()
procCall Connection(s).initStream() procCall Connection(s).initStream()
proc newChronosStream*(client: StreamTransport): ChronosStream = proc init*(C: type ChronosStream,
new result client: StreamTransport,
result.client = client timeout = DefaultChronosStreamTimeout): ChronosStream =
result = C(client: client,
timeout: timeout)
result.initStream() result.initStream()
template withExceptions(body: untyped) = template withExceptions(body: untyped) =
@ -48,6 +58,7 @@ method readOnce*(s: ChronosStream, pbytes: pointer, nbytes: int): Future[int] {.
withExceptions: withExceptions:
result = await s.client.readOnce(pbytes, nbytes) result = await s.client.readOnce(pbytes, nbytes)
s.activity = true # reset activity flag
method write*(s: ChronosStream, msg: seq[byte]) {.async.} = method write*(s: ChronosStream, msg: seq[byte]) {.async.} =
if s.closed: if s.closed:
@ -60,6 +71,7 @@ method write*(s: ChronosStream, msg: seq[byte]) {.async.} =
var written = 0 var written = 0
while not s.client.closed and written < msg.len: while not s.client.closed and written < msg.len:
written += await s.client.write(msg[written..<msg.len]) written += await s.client.write(msg[written..<msg.len])
s.activity = true # reset activity flag
if written < msg.len: if written < msg.len:
raise (ref LPStreamClosedError)(msg: "Write couldn't finish writing") raise (ref LPStreamClosedError)(msg: "Write couldn't finish writing")

View File

@ -20,12 +20,19 @@ logScope:
const const
ConnectionTrackerName* = "libp2p.connection" ConnectionTrackerName* = "libp2p.connection"
DefaultConnectionTimeout* = 1.minutes
type type
TimeoutHandler* = proc(): Future[void] {.gcsafe.}
Direction* {.pure.} = enum Direction* {.pure.} = enum
None, In, Out None, In, Out
Connection* = ref object of LPStream Connection* = ref object of LPStream
activity*: bool # reset every time data is sent or received
timeout*: Duration # channel timeout if no activity
timerTaskFut: Future[void] # the current timer instanse
timeoutHandler*: TimeoutHandler # timeout handler
peerInfo*: PeerInfo peerInfo*: PeerInfo
observedAddr*: Multiaddress observedAddr*: Multiaddress
dir*: Direction dir*: Direction
@ -35,6 +42,7 @@ type
closed*: uint64 closed*: uint64
proc setupConnectionTracker(): ConnectionTracker {.gcsafe.} proc setupConnectionTracker(): ConnectionTracker {.gcsafe.}
proc timeoutMonitor(s: Connection) {.async, gcsafe.}
proc getConnectionTracker*(): ConnectionTracker {.gcsafe.} = proc getConnectionTracker*(): ConnectionTracker {.gcsafe.} =
result = cast[ConnectionTracker](getTracker(ConnectionTrackerName)) result = cast[ConnectionTracker](getTracker(ConnectionTrackerName))
@ -58,21 +66,23 @@ proc setupConnectionTracker(): ConnectionTracker =
result.isLeaked = leakTransport result.isLeaked = leakTransport
addTracker(ConnectionTrackerName, result) addTracker(ConnectionTrackerName, result)
proc init*(C: type Connection,
peerInfo: PeerInfo,
dir: Direction): Connection =
result = C(peerInfo: peerInfo, dir: dir)
result.initStream()
method initStream*(s: Connection) = method initStream*(s: Connection) =
if s.objName.len == 0: if s.objName.len == 0:
s.objName = "Connection" s.objName = "Connection"
procCall LPStream(s).initStream() procCall LPStream(s).initStream()
s.closeEvent = newAsyncEvent() s.closeEvent = newAsyncEvent()
doAssert(isNil(s.timerTaskFut))
s.timerTaskFut = s.timeoutMonitor()
inc getConnectionTracker().opened inc getConnectionTracker().opened
method close*(s: Connection) {.async.} = method close*(s: Connection) {.async.} =
## cleanup timers
if not isNil(s.timerTaskFut) and not s.timerTaskFut.finished:
s.timerTaskFut.cancel()
if not s.isClosed: if not s.isClosed:
await procCall LPStream(s).close() await procCall LPStream(s).close()
inc getConnectionTracker().closed inc getConnectionTracker().closed
@ -83,3 +93,50 @@ proc `$`*(conn: Connection): string =
func hash*(p: Connection): Hash = func hash*(p: Connection): Hash =
cast[pointer](p).hash cast[pointer](p).hash
proc timeoutMonitor(s: Connection) {.async, gcsafe.} =
## monitor the channel for innactivity
##
## if the timeout was hit, it means that
## neither incoming nor outgoing activity
## has been detected and the channel will
## be reset
##
logScope:
oid = $s.oid
try:
while true:
await sleepAsync(s.timeout)
if s.closed or s.atEof:
return
if s.activity:
s.activity = false
continue
break
# reset channel on innactivity timeout
trace "Connection timed out"
if not(isNil(s.timeoutHandler)):
await s.timeoutHandler()
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception in timeout", exc = exc.msg
proc init*(C: type Connection,
peerInfo: PeerInfo,
dir: Direction,
timeout: Duration = DefaultConnectionTimeout,
timeoutHandler: TimeoutHandler = nil): Connection =
result = C(peerInfo: peerInfo,
dir: dir,
timeout: timeout,
timeoutHandler: timeoutHandler)
result.initStream()

View File

@ -63,7 +63,7 @@ proc connHandler*(t: TcpTransport,
client: StreamTransport, client: StreamTransport,
initiator: bool): Connection = initiator: bool): Connection =
trace "handling connection", address = $client.remoteAddress trace "handling connection", address = $client.remoteAddress
let conn: Connection = Connection(newChronosStream(client)) let conn: Connection = Connection(ChronosStream.init(client))
conn.observedAddr = MultiAddress.init(client.remoteAddress).tryGet() conn.observedAddr = MultiAddress.init(client.remoteAddress).tryGet()
if not initiator: if not initiator:
if not isNil(t.handler): if not isNil(t.handler):