
154 lines
4.3 KiB

# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
{.push raises: [].}
import std/[hashes, oids, strformat]
import stew/results
import chronicles, chronos, metrics
import lpstream,
export lpstream, peerinfo, errors, results
topics = "libp2p connection"
ConnectionTrackerName* = "Connection"
DefaultConnectionTimeout* = 5.minutes
TimeoutHandler* = proc(): Future[void] {.gcsafe, raises: [].}
Connection* = ref object of LPStream
activity*: bool # reset every time data is sent or received
timeout*: Duration # channel timeout if no activity
timerTaskFut: Future[void] # the current timer instance
timeoutHandler*: TimeoutHandler # timeout handler
peerId*: PeerId
observedAddr*: Opt[MultiAddress]
protocol*: string # protocol used by the connection, used as tag for metrics
transportDir*: Direction # The bottom level transport (generally the socket) direction
when defined(libp2p_agents_metrics):
shortAgent*: string
proc timeoutMonitor(s: Connection) {.async.}
func shortLog*(conn: Connection): string =
if conn.isNil: "Connection(nil)"
else: &"{shortLog(conn.peerId)}:{conn.oid}"
except ValueError as exc:
chronicles.formatIt(Connection): shortLog(it)
method initStream*(s: Connection) =
if s.objName.len == 0:
s.objName = ConnectionTrackerName
procCall LPStream(s).initStream()
if s.timeout > 0.millis:
trace "Monitoring for timeout", s, timeout = s.timeout
s.timerTaskFut = s.timeoutMonitor()
if isNil(s.timeoutHandler):
s.timeoutHandler = proc(): Future[void] =
trace "Idle timeout expired, closing connection", s
method closeImpl*(s: Connection): Future[void] =
# Cleanup timeout timer
trace "Closing connection", s
if not isNil(s.timerTaskFut) and not s.timerTaskFut.finished:
s.timerTaskFut = nil
trace "Closed connection", s
procCall LPStream(s).closeImpl()
func hash*(p: Connection): Hash =
proc pollActivity(s: Connection): Future[bool] {.async.} =
if s.closed and s.atEof:
return false # Done, no more monitoring
if s.activity:
s.activity = false
return true
# Inactivity timeout happened, call timeout monitor
trace "Connection timed out", s
if not(isNil(s.timeoutHandler)):
trace "Calling timeout handler", s
await s.timeoutHandler()
except CancelledError:
# timeoutHandler is expected to be fast, but it's still possible that
# cancellation will happen here - no need to warn about it - we do want to
# stop the polling however
debug "Timeout handler cancelled", s
except CatchableError as exc: # Shouldn't happen
warn "exception in timeout handler", s, exc = exc.msg
return false
proc timeoutMonitor(s: Connection) {.async.} =
## monitor the channel for inactivity
## if the timeout was hit, it means that
## neither incoming nor outgoing activity
## has been detected and the channel will
## be reset
while true:
try: # Sleep at least once!
await sleepAsync(s.timeout)
except CancelledError:
if not await s.pollActivity():
method getWrapped*(s: Connection): Connection {.base.} =
doAssert(false, "not implemented!")
when defined(libp2p_agents_metrics):
proc setShortAgent*(s: Connection, shortAgent: string) =
var conn = s
while not isNil(conn):
conn.shortAgent = shortAgent
conn = conn.getWrapped()
proc new*(C: type Connection,
peerId: PeerId,
dir: Direction,
observedAddr: Opt[MultiAddress],
timeout: Duration = DefaultConnectionTimeout,
timeoutHandler: TimeoutHandler = nil): Connection =
result = C(peerId: peerId,
dir: dir,
timeout: timeout,
timeoutHandler: timeoutHandler,
observedAddr: observedAddr)