# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
#  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
#  * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.

## TCP transport implementation
|
2019-08-20 10:18:15 -06:00
|
|
|
|
2023-06-07 13:12:49 +02:00
|
|
|
{.push raises: [].}
|
2021-05-21 10:27:01 -06:00
|
|
|
|
2023-05-18 10:24:17 +02:00
|
|
|
import std/[sequtils]
|
2022-09-22 21:55:59 +02:00
|
|
|
import stew/results
|
2020-11-18 20:06:42 -06:00
|
|
|
import chronos, chronicles
|
2019-09-24 10:16:39 -06:00
|
|
|
import transport,
|
2020-04-21 10:24:42 +09:00
|
|
|
../errors,
|
2019-09-24 10:16:39 -06:00
|
|
|
../wire,
|
|
|
|
../multicodec,
|
2021-03-18 09:20:36 -06:00
|
|
|
../connmanager,
|
|
|
|
../multiaddress,
|
2020-06-19 11:29:43 -06:00
|
|
|
../stream/connection,
|
2021-03-18 09:20:36 -06:00
|
|
|
../stream/chronosstream,
|
2022-07-01 20:19:57 +02:00
|
|
|
../upgrademngrs/upgrade,
|
|
|
|
../utility
|
2019-08-20 10:18:15 -06:00
|
|
|
|
2019-09-11 18:15:04 -06:00
|
|
|
# Log statements in this module are tagged with the topic below.
logScope:
  topics = "libp2p tcptransport"

# Re-export the `transport` and `results` modules to importers of this module.
export transport, results

const
  # Name under which the TCP transport tracker is registered and looked up.
  TcpTransportTrackerName* = "libp2p.tcptransport"
|
|
|
|
|
|
|
|
type
  TcpTransport* = ref object of Transport
    ## Transport that listens for and dials connections over TCP.
    servers*: seq[StreamServer]
      ## Listening servers; `start` appends one per handled address.
    clients: array[Direction, seq[StreamTransport]]
      ## Stream transports currently tracked, bucketed by direction.
    flags: set[ServerFlags]
      ## Flags passed to `createStreamServer` when listening.
    clientFlags: set[SocketFlags]
      ## Flags passed to `connect` when dialing.
    acceptFuts: seq[Future[StreamTransport]]
      ## Outstanding accept futures, index-aligned with `servers`.
    connectionsTimeout: Duration
      ## Timeout handed to each `ChronosStream` created for a client.

  TcpTransportTracker* = ref object of TrackerBase
    ## Counters of started and stopped TCP transports, used to spot leaks.
    opened*: uint64
    closed*: uint64

  TcpTransportError* = object of transport.TransportError
    ## Exception type derived from `transport.TransportError`.
|
|
|
|
|
2023-06-07 13:12:49 +02:00
|
|
|
# Forward declaration: the tracker callbacks below need the getter, and the
# getter needs the setup proc that installs those callbacks.
proc setupTcpTransportTracker(): TcpTransportTracker {.gcsafe, raises: [].}

proc getTcpTransportTracker(): TcpTransportTracker {.gcsafe.} =
  ## Returns the tracker registered under `TcpTransportTrackerName`,
  ## creating and registering it first when none exists yet.
  let existing =
    cast[TcpTransportTracker](getTracker(TcpTransportTrackerName))
  if existing.isNil:
    setupTcpTransportTracker()
  else:
    existing
|
|
|
|
|
|
|
|
proc dumpTracking(): string {.gcsafe.} =
  ## Renders the opened/closed transport counters as a two-line report.
  let
    tracker = getTcpTransportTracker()
    openedLine = "Opened tcp transports: " & $tracker.opened
    closedLine = "Closed tcp transports: " & $tracker.closed
  openedLine & "\n" & closedLine
|
2020-04-21 10:24:42 +09:00
|
|
|
|
|
|
|
proc leakTransport(): bool {.gcsafe.} =
  ## Reports a leak when the opened and closed counters disagree.
  let tracker = getTcpTransportTracker()
  tracker.opened != tracker.closed
|
|
|
|
|
|
|
|
proc setupTcpTransportTracker(): TcpTransportTracker =
  ## Creates a zeroed tracker, wires its dump/leak callbacks and registers it
  ## under `TcpTransportTrackerName`.
  let tracker = TcpTransportTracker(opened: 0, closed: 0)
  tracker.dump = dumpTracking
  tracker.isLeaked = leakTransport
  addTracker(TcpTransportTrackerName, tracker)
  tracker
|
2019-08-20 10:18:15 -06:00
|
|
|
|
2022-09-22 21:55:59 +02:00
|
|
|
proc getObservedAddr(client: StreamTransport): Future[MultiAddress] {.async.} =
  ## Builds a `MultiAddress` from the remote endpoint of `client`.
  ##
  ## When that fails with a catchable error, the client is closed (unless it
  ## is nil or already closed) and the original error is re-raised.
  try:
    return MultiAddress.init(client.remoteAddress).tryGet()
  except CatchableError as exc:
    trace "Failed to create observedAddr", exc = exc.msg
    # Close only a live, non-nil transport. The former guard,
    # `not(isNil(client) and client.closed)`, evaluated `client.closed` on a
    # nil client and also re-closed transports that were already closed.
    if not(isNil(client)) and not(client.closed):
      await client.closeWait()
    raise exc
|
2020-11-25 13:34:48 -06:00
|
|
|
|
2022-09-22 21:55:59 +02:00
|
|
|
proc connHandler*(self: TcpTransport,
                  client: StreamTransport,
                  observedAddr: Opt[MultiAddress],
                  dir: Direction): Future[Connection] {.async.} =
  ## Wraps `client` in a `Connection`, records the client under `dir`, and
  ## spawns a background task that cleans both up once either one finishes.

  trace "Handling tcp connection", address = $observedAddr,
                                   dir = $dir,
                                   clients = self.clients[Direction.In].len +
                                             self.clients[Direction.Out].len

  let
    stream = ChronosStream.init(
      client = client,
      dir = dir,
      observedAddr = observedAddr,
      timeout = self.connectionsTimeout)
    conn = Connection(stream)

  proc onClose() {.async.} =
    try:
      let
        clientJoin = client.join()
        connJoin = conn.join()

      # Wake up as soon as either side is done ...
      await clientJoin or connJoin

      # ... then cancel whichever join() is still outstanding.
      if not clientJoin.finished:
        await clientJoin.cancelAndWait()
      if not connJoin.finished:
        await connJoin.cancelAndWait()

      trace "Cleaning up client", addrs = $client.remoteAddress,
                                  conn

      # Stop tracking the client before closing both ends.
      self.clients[dir].keepItIf(it != client)
      await allFuturesThrowing(
        conn.close(),
        client.closeWait())

      trace "Cleaned up client", addrs = $client.remoteAddress,
                                 conn

    except CatchableError as exc:
      let useExc {.used.} = exc
      debug "Error cleaning up client", errMsg = exc.msg, conn

  self.clients[dir].add(client)
  asyncSpawn onClose()

  return conn
|
|
|
|
|
2021-06-30 10:59:30 +02:00
|
|
|
proc new*(
  T: typedesc[TcpTransport],
  flags: set[ServerFlags] = {},
  upgrade: Upgrade,
  connectionsTimeout = 10.minutes): T {.public.} =
  ## Creates a TCP transport.
  ##
  ## `flags` are stored as the server flags; when they contain
  ## `ServerFlags.TcpNoDelay` the client socket flags get
  ## `SocketFlags.TcpNoDelay` as well. `upgrade` becomes the upgrader and
  ## `connectionsTimeout` is kept for the streams created later.

  # Derive the dial-side socket flags from the server flags.
  let socketFlags =
    if ServerFlags.TcpNoDelay in flags:
      compilesOr:
        {SocketFlags.TcpNoDelay}
      do:
        doAssert(false)
        default(set[SocketFlags])
    else:
      default(set[SocketFlags])

  T(
    flags: flags,
    clientFlags: socketFlags,
    upgrader: upgrade,
    networkReachability: NetworkReachability.Unknown,
    connectionsTimeout: connectionsTimeout)
|
2020-04-21 10:24:42 +09:00
|
|
|
|
2021-03-18 09:20:36 -06:00
|
|
|
method start*(
  self: TcpTransport,
  addrs: seq[MultiAddress]) {.async.} =
  ## Starts listening on every address in `addrs` that this transport
  ## handles; other addresses are skipped with a trace message.
  ##
  ## Only logs a warning and returns when the transport is already running.

  if self.running:
    warn "TCP transport already running"
    return

  await procCall Transport(self).start(addrs)
  trace "Starting TCP transport"
  inc getTcpTransportTracker().opened

  for idx, listenAddr in addrs:
    if self.handles(listenAddr):
      self.flags.incl(ServerFlags.ReusePort)
      let server = createStreamServer(
        ma = listenAddr,
        flags = self.flags,
        udata = self)

      # Record the address the socket actually bound to, in case the
      # requested one was a wildcard such as 0.0.0.0:0.
      let bound = MultiAddress.init(server.sock.getLocalAddress()).tryGet()
      self.addrs[idx] = bound

      self.servers.add(server)

      trace "Listening on", address = listenAddr
    else:
      trace "Invalid address detected, skipping!", address = listenAddr
|
2020-11-18 20:06:42 -06:00
|
|
|
|
2021-03-18 09:20:36 -06:00
|
|
|
method stop*(self: TcpTransport) {.async, gcsafe.} =
  ## Closes every tracked client transport and then, if the transport is
  ## running, stops the base transport, cancels pending accepts and closes
  ## the servers.
  ##
  ## Catchable errors are logged at trace level and not propagated.
  try:
    trace "Stopping TCP transport"

    # Clients are closed before the running check, so this happens even
    # when the listener side was never started.
    let closing =
      self.clients[Direction.In].mapIt(it.closeWait()) &
      self.clients[Direction.Out].mapIt(it.closeWait())
    checkFutures(await allFinished(closing))

    if not self.running:
      warn "TCP transport already stopped"
      return

    await procCall Transport(self).stop() # call base

    var pending: seq[Future[void]]
    for acceptFut in self.acceptFuts:
      if not acceptFut.finished:
        pending.add(acceptFut.cancelAndWait())
      elif acceptFut.done:
        # The accept completed but was never consumed: close that client.
        pending.add(acceptFut.read().closeWait())

    for server in self.servers:
      server.stop()
      pending.add(server.closeWait())

    await allFutures(pending)

    self.servers = @[]

    trace "Transport stopped"
    inc getTcpTransportTracker().closed
  except CatchableError as exc:
    trace "Error shutting down tcp transport", exc = exc.msg
|
2020-04-21 10:24:42 +09:00
|
|
|
|
2021-03-18 09:20:36 -06:00
|
|
|
method accept*(self: TcpTransport): Future[Connection] {.async, gcsafe.} =
  ## Waits for the next inbound TCP connection on any server and wraps it
  ## via `connHandler`.
  ##
  ## Raises a transport-closed error when the transport is not running or a
  ## server was closed. "Too many files" and aborted-connection errors are
  ## only logged, so the future then completes without a connection value.
  ## Cancellation, OS errors and any other catchable error are re-raised.

  if not self.running:
    raise newTransportClosedError()

  try:
    if self.acceptFuts.len <= 0:
      # Arm one accept per server; with no servers there is nothing to do.
      self.acceptFuts = self.servers.mapIt(it.accept())
      if self.acceptFuts.len <= 0:
        return

    let
      ready = await one(self.acceptFuts)
      slot = self.acceptFuts.find(ready)

    # Re-arm the server whose accept just completed before consuming it.
    self.acceptFuts[slot] = self.servers[slot].accept()

    let
      transp = await ready
      observedAddr = await getObservedAddr(transp)
    return await self.connHandler(
      transp, Opt.some(observedAddr), Direction.In)
  except TransportTooManyError as exc:
    debug "Too many files opened", exc = exc.msg
  except TransportAbortedError as exc:
    debug "Connection aborted", exc = exc.msg
  except TransportUseClosedError as exc:
    debug "Server was closed", exc = exc.msg
    raise newTransportClosedError(exc)
  except CancelledError as exc:
    raise exc
  except TransportOsError as exc:
    info "OS Error", exc = exc.msg
    raise exc
  except CatchableError as exc:
    info "Unexpected error accepting connection", exc = exc.msg
    raise exc
|
2019-08-20 10:18:15 -06:00
|
|
|
|
2021-03-18 09:20:36 -06:00
|
|
|
method dial*(
  self: TcpTransport,
  hostname: string,
  address: MultiAddress,
  peerId: Opt[PeerId] = Opt.none(PeerId)): Future[Connection] {.async, gcsafe.} =
  ## Opens an outbound TCP connection to `address` and wraps it via
  ## `connHandler`.
  ##
  ## `hostname` and `peerId` are not read by this implementation. If wrapping
  ## the connected transport fails, it is closed and the error is re-raised.

  trace "Dialing remote peer", address = $address

  let bindToOwnAddr =
    self.networkReachability == NetworkReachability.NotReachable and
    self.addrs.len > 0

  let transp =
    if bindToOwnAddr:
      # Bind the outgoing socket to the first of `self.addrs`, with
      # ReusePort added to the client flags.
      self.clientFlags.incl(SocketFlags.ReusePort)
      await connect(
        address,
        flags = self.clientFlags,
        localAddress = Opt.some(self.addrs[0]))
    else:
      await connect(address, flags = self.clientFlags)

  try:
    let observedAddr = await getObservedAddr(transp)
    return await self.connHandler(
      transp, Opt.some(observedAddr), Direction.Out)
  except CatchableError as err:
    await transp.closeWait()
    raise err
|
2019-08-21 18:23:13 -06:00
|
|
|
|
2019-12-10 14:50:35 -06:00
|
|
|
method handles*(t: TcpTransport, address: MultiAddress): bool {.gcsafe.} =
  ## True when the base transport accepts `address`, its protocol list
  ## is available, and it matches the `TCP` address pattern.
  let baseAccepts = procCall Transport(t).handles(address)
  if not baseAccepts:
    return false
  if not address.protocols.isOk:
    return false
  return TCP.match(address)
|