nim-libp2p/libp2p/stream/connection.nim

154 lines
4.3 KiB
Nim

# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
{.push raises: [].}
import std/[hashes, oids, strformat]
import stew/results
import chronicles, chronos, metrics
import lpstream,
../multiaddress,
../peerinfo,
../errors
export lpstream, peerinfo, errors, results
# Attach the "libp2p connection" topic to the log statements of this module.
logScope:
  topics = "libp2p connection"
const
  # Fallback `objName` that `initStream` assigns when the stream has none.
  ConnectionTrackerName* = "Connection"
  # Inactivity timeout used by `Connection.new` when the caller passes none.
  DefaultConnectionTimeout* = 5.minutes
type
  # Callback awaited by the inactivity monitor once a full timeout period
  # passes without the `activity` flag being set.
  TimeoutHandler* = proc(): Future[void] {.gcsafe, raises: [].}

  Connection* = ref object of LPStream
    ## Stream carrying peer metadata and an optional inactivity timeout.
    ## When `timeout` is positive, `initStream` starts a monitor that calls
    ## `timeoutHandler` after a period without activity.
    activity*: bool                 # reset every time data is sent or received; cleared again by each monitor poll
    timeout*: Duration              # channel timeout if no activity; a non-positive value disables monitoring
    timerTaskFut: Future[void]      # the current timer instance (nil while no monitor is running)
    timeoutHandler*: TimeoutHandler # timeout handler; `initStream` installs a closing default when nil
    peerId*: PeerId
    observedAddr*: Opt[MultiAddress]
    protocol*: string               # protocol used by the connection, used as tag for metrics
    transportDir*: Direction        # The bottom level transport (generally the socket) direction
    when defined(libp2p_agents_metrics):
      shortAgent*: string           # only present with -d:libp2p_agents_metrics; written by `setShortAgent`
# Forward declaration: `initStream` starts the monitor, but its body is only
# defined further down in this module.
proc timeoutMonitor(s: Connection) {.async.}
func shortLog*(conn: Connection): string =
  ## Compact textual form of `conn` for log output: the short log of its
  ## peer id and its oid separated by a colon, or `"Connection(nil)"` when
  ## `conn` is nil.
  if isNil(conn):
    return "Connection(nil)"

  try:
    &"{shortLog(conn.peerId)}:{conn.oid}"
  except ValueError as exc:
    # A formatting failure is turned into an assertion failure so that no
    # catchable exception leaves this func.
    raiseAssert(exc.msg)
# Make chronicles render `Connection` log arguments through `shortLog`.
chronicles.formatIt(Connection): shortLog(it)
method initStream*(s: Connection) =
  ## Initialise the connection on top of the `LPStream` initialisation.
  ##
  ## An empty `objName` is replaced by `ConnectionTrackerName`. When
  ## `timeout` is positive, the inactivity monitor is started and, if no
  ## `timeoutHandler` was supplied, a default one that closes the connection
  ## is installed.
  if s.objName.len == 0:
    s.objName = ConnectionTrackerName

  procCall LPStream(s).initStream()

  # No monitor may be attached to this connection yet.
  doAssert(s.timerTaskFut.isNil)

  if s.timeout <= 0.millis:
    # Monitoring disabled: nothing else to set up.
    return

  trace "Monitoring for timeout", s, timeout = s.timeout
  s.timerTaskFut = s.timeoutMonitor()

  if s.timeoutHandler.isNil:
    # Default reaction to an idle timeout: close the connection.
    s.timeoutHandler = proc(): Future[void] =
      trace "Idle timeout expired, closing connection", s
      s.close()
method closeImpl*(s: Connection): Future[void] =
  ## Cancel the inactivity timer if it is still pending, then delegate to the
  ## `LPStream` close implementation and return its future.
  trace "Closing connection", s

  # Cleanup timeout timer
  let timer = s.timerTaskFut
  if not (timer.isNil or timer.finished):
    timer.cancel()
    s.timerTaskFut = nil

  trace "Closed connection", s

  procCall LPStream(s).closeImpl()
func hash*(p: Connection): Hash =
  ## Hash of a connection, derived from the pointer value of the reference
  ## rather than from its contents.
  hash(cast[pointer](p))
proc pollActivity(s: Connection): Future[bool] {.async.} =
  ## Run a single inactivity check on `s`.
  ##
  ## Completes with `true` when activity was flagged since the previous poll
  ## (the flag is cleared), meaning monitoring should continue. Completes
  ## with `false` when the stream is closed and at EOF, or after the timeout
  ## handler (if any) has been invoked.
  if s.closed and s.atEof:
    # Done, no more monitoring
    return false

  if s.activity:
    s.activity = false
    return true

  # Inactivity timeout happened, call the timeout handler
  trace "Connection timed out", s

  let handler = s.timeoutHandler
  if handler.isNil:
    return false

  trace "Calling timeout handler", s
  try:
    await handler()
  except CancelledError:
    # timeoutHandler is expected to be fast, but it's still possible that
    # cancellation will happen here - no need to warn about it - we do want to
    # stop the polling however
    debug "Timeout handler cancelled", s
  except CatchableError as exc: # Shouldn't happen
    warn "exception in timeout handler", s, exc = exc.msg

  return false
proc timeoutMonitor(s: Connection) {.async.} =
  ## Watch the connection for inactivity.
  ##
  ## Sleeps for `s.timeout` and then polls the activity flag, repeating for
  ## as long as `pollActivity` reports that monitoring should continue. The
  ## monitor ends when that poll returns `false` or when the sleep is
  ## cancelled.
  var monitoring = true
  while monitoring:
    try:
      # Sleep at least once!
      await sleepAsync(s.timeout)
    except CancelledError:
      return

    monitoring = await s.pollActivity()
method getWrapped*(s: Connection): Connection {.base.} =
  ## Base method returning the connection wrapped by `s`. This default
  ## implementation always fails an assertion.
  # NOTE(review): presumably overridden by the concrete connection types -
  # confirm against the implementations elsewhere in the project.
  doAssert false, "not implemented!"
when defined(libp2p_agents_metrics):
  proc setShortAgent*(s: Connection, shortAgent: string) =
    ## Store `shortAgent` on `s` and on every connection reached by following
    ## `getWrapped`, stopping once it yields nil.
    var current = s
    while not current.isNil:
      current.shortAgent = shortAgent
      current = current.getWrapped()
proc new*(C: type Connection,
          peerId: PeerId,
          dir: Direction,
          observedAddr: Opt[MultiAddress],
          timeout: Duration = DefaultConnectionTimeout,
          timeoutHandler: TimeoutHandler = nil): Connection =
  ## Create a connection for `peerId` with direction `dir` and the given
  ## observed address, then initialise it through `initStream`.
  ##
  ## `timeout` defaults to `DefaultConnectionTimeout`. A nil `timeoutHandler`
  ## is passed through unchanged to the new object.
  let conn = C(
    peerId: peerId,
    dir: dir,
    timeout: timeout,
    timeoutHandler: timeoutHandler,
    observedAddr: observedAddr)

  conn.initStream()
  conn