243 lines
6.6 KiB
Nim
243 lines
6.6 KiB
Nim
## Nim-LibP2P
|
|
## Copyright (c) 2019 Status Research & Development GmbH
|
|
## Licensed under either of
|
|
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
|
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
|
## at your option.
|
|
## This file may not be copied, modified, or distributed except according to
|
|
## those terms.
|
|
|
|
{.push raises: [Defect].}
|
|
|
|
import std/[oids, sequtils]
|
|
import chronos, chronicles
|
|
import transport,
|
|
../errors,
|
|
../wire,
|
|
../multicodec,
|
|
../multistream,
|
|
../connmanager,
|
|
../multiaddress,
|
|
../stream/connection,
|
|
../stream/chronosstream,
|
|
../upgrademngrs/upgrade
|
|
|
|
logScope:
|
|
topics = "libp2p tcptransport"
|
|
|
|
export transport
|
|
|
|
const
|
|
TcpTransportTrackerName* = "libp2p.tcptransport"
|
|
|
|
type
|
|
TcpTransport* = ref object of Transport
|
|
server*: StreamServer
|
|
clients: array[Direction, seq[StreamTransport]]
|
|
flags: set[ServerFlags]
|
|
|
|
TcpTransportTracker* = ref object of TrackerBase
|
|
opened*: uint64
|
|
closed*: uint64
|
|
|
|
proc setupTcpTransportTracker(): TcpTransportTracker {.gcsafe, raises: [Defect].}
|
|
|
|
proc getTcpTransportTracker(): TcpTransportTracker {.gcsafe.} =
|
|
result = cast[TcpTransportTracker](getTracker(TcpTransportTrackerName))
|
|
if isNil(result):
|
|
result = setupTcpTransportTracker()
|
|
|
|
proc dumpTracking(): string {.gcsafe.} =
|
|
var tracker = getTcpTransportTracker()
|
|
result = "Opened tcp transports: " & $tracker.opened & "\n" &
|
|
"Closed tcp transports: " & $tracker.closed
|
|
|
|
proc leakTransport(): bool {.gcsafe.} =
|
|
var tracker = getTcpTransportTracker()
|
|
result = (tracker.opened != tracker.closed)
|
|
|
|
proc setupTcpTransportTracker(): TcpTransportTracker =
|
|
result = new TcpTransportTracker
|
|
result.opened = 0
|
|
result.closed = 0
|
|
result.dump = dumpTracking
|
|
result.isLeaked = leakTransport
|
|
addTracker(TcpTransportTrackerName, result)
|
|
|
|
proc connHandler*(self: TcpTransport,
|
|
client: StreamTransport,
|
|
dir: Direction): Future[Connection] {.async.} =
|
|
var observedAddr: MultiAddress = MultiAddress()
|
|
try:
|
|
observedAddr = MultiAddress.init(client.remoteAddress).tryGet()
|
|
except CatchableError as exc:
|
|
trace "Connection setup failed", exc = exc.msg
|
|
if not(isNil(client) and client.closed):
|
|
await client.closeWait()
|
|
raise exc
|
|
|
|
trace "Handling tcp connection", address = $observedAddr,
|
|
dir = $dir,
|
|
clients = self.clients[Direction.In].len +
|
|
self.clients[Direction.Out].len
|
|
|
|
let conn = Connection(
|
|
ChronosStream.init(
|
|
client = client,
|
|
dir = dir,
|
|
observedAddr = observedAddr
|
|
))
|
|
|
|
proc onClose() {.async.} =
|
|
try:
|
|
let futs = @[client.join(), conn.join()]
|
|
await futs[0] or futs[1]
|
|
for f in futs:
|
|
if not f.finished: await f.cancelAndWait() # cancel outstanding join()
|
|
|
|
trace "Cleaning up client", addrs = $client.remoteAddress,
|
|
conn
|
|
|
|
self.clients[dir].keepItIf( it != client )
|
|
await allFuturesThrowing(
|
|
conn.close(), client.closeWait())
|
|
|
|
trace "Cleaned up client", addrs = $client.remoteAddress,
|
|
conn
|
|
|
|
except CatchableError as exc:
|
|
let useExc {.used.} = exc
|
|
debug "Error cleaning up client", errMsg = exc.msg, conn
|
|
|
|
self.clients[dir].add(client)
|
|
asyncSpawn onClose()
|
|
|
|
return conn
|
|
|
|
func init*(
|
|
T: type TcpTransport,
|
|
flags: set[ServerFlags] = {},
|
|
upgrade: Upgrade): T =
|
|
|
|
result = T(
|
|
flags: flags,
|
|
upgrader: upgrade
|
|
)
|
|
|
|
result.initTransport()
|
|
|
|
method initTransport*(self: TcpTransport) =
|
|
self.multicodec = multiCodec("tcp")
|
|
inc getTcpTransportTracker().opened
|
|
|
|
method start*(
|
|
self: TcpTransport,
|
|
ma: MultiAddress) {.async.} =
|
|
## listen on the transport
|
|
##
|
|
|
|
if self.running:
|
|
trace "TCP transport already running"
|
|
return
|
|
|
|
await procCall Transport(self).start(ma)
|
|
trace "Starting TCP transport"
|
|
|
|
self.server = createStreamServer(
|
|
ma = self.ma,
|
|
flags = self.flags,
|
|
udata = self)
|
|
|
|
# always get the resolved address in case we're bound to 0.0.0.0:0
|
|
self.ma = MultiAddress.init(self.server.sock.getLocalAddress()).tryGet()
|
|
self.running = true
|
|
|
|
trace "Listening on", address = self.ma
|
|
|
|
method stop*(self: TcpTransport) {.async, gcsafe.} =
|
|
## stop the transport
|
|
##
|
|
|
|
self.running = false # mark stopped as soon as possible
|
|
|
|
try:
|
|
trace "Stopping TCP transport"
|
|
await procCall Transport(self).stop() # call base
|
|
|
|
checkFutures(
|
|
await allFinished(
|
|
self.clients[Direction.In].mapIt(it.closeWait()) &
|
|
self.clients[Direction.Out].mapIt(it.closeWait())))
|
|
|
|
# server can be nil
|
|
if not isNil(self.server):
|
|
await self.server.closeWait()
|
|
|
|
self.server = nil
|
|
trace "Transport stopped"
|
|
inc getTcpTransportTracker().closed
|
|
except CatchableError as exc:
|
|
trace "Error shutting down tcp transport", exc = exc.msg
|
|
|
|
method upgradeIncoming*(
|
|
self: TcpTransport,
|
|
conn: Connection): Future[void] {.gcsafe.} =
|
|
## base upgrade method that the transport uses to perform
|
|
## transport specific upgrades
|
|
##
|
|
|
|
self.upgrader.upgradeIncoming(conn)
|
|
|
|
method upgradeOutgoing*(
|
|
self: TcpTransport,
|
|
conn: Connection): Future[Connection] {.gcsafe.} =
|
|
## base upgrade method that the transport uses to perform
|
|
## transport specific upgrades
|
|
##
|
|
|
|
self.upgrader.upgradeOutgoing(conn)
|
|
|
|
method accept*(self: TcpTransport): Future[Connection] {.async, gcsafe.} =
|
|
## accept a new TCP connection
|
|
##
|
|
|
|
if not self.running:
|
|
raise newTransportClosedError()
|
|
|
|
try:
|
|
let transp = await self.server.accept()
|
|
return await self.connHandler(transp, Direction.In)
|
|
except TransportOsError as exc:
|
|
# TODO: it doesn't sound like all OS errors
|
|
# can be ignored, we should re-raise those
|
|
# that can'self.
|
|
debug "OS Error", exc = exc.msg
|
|
except TransportTooManyError as exc:
|
|
debug "Too many files opened", exc = exc.msg
|
|
except TransportUseClosedError as exc:
|
|
debug "Server was closed", exc = exc.msg
|
|
raise newTransportClosedError(exc)
|
|
except CatchableError as exc:
|
|
warn "Unexpected error creating connection", exc = exc.msg
|
|
raise exc
|
|
|
|
method dial*(
|
|
self: TcpTransport,
|
|
address: MultiAddress): Future[Connection] {.async, gcsafe.} =
|
|
## dial a peer
|
|
##
|
|
|
|
trace "Dialing remote peer", address = $address
|
|
|
|
let transp = await connect(address)
|
|
return await self.connHandler(transp, Direction.Out)
|
|
|
|
method handles*(t: TcpTransport, address: MultiAddress): bool {.gcsafe.} =
|
|
if procCall Transport(t).handles(address):
|
|
if address.protocols.isOk:
|
|
return address.protocols
|
|
.get()
|
|
.filterIt(
|
|
it == multiCodec("tcp")
|
|
).len > 0
|