Split dialer (#542)

* extracting dialing logic to dialer

* exposing upgrade methods on transport

* cleanup

* fixing tests to use new interfaces

* add comments
This commit is contained in:
Dmitriy Ryajov 2021-03-18 09:20:36 -06:00
parent 3da656687b
commit a3c00af945
No known key found for this signature in database
GPG Key ID: DA8C680CE7C657A4
14 changed files with 641 additions and 453 deletions

46
libp2p/dial.nim Normal file
View File

@ -0,0 +1,46 @@
## Nim-LibP2P
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import chronos
import peerid,
stream/connection
type
Dial* = ref object of RootObj
method connect*(
self: Dial,
peerId: PeerID,
addrs: seq[MultiAddress]) {.async, base.} =
## connect remote peer without negotiating
## a protocol
##
doAssert(false, "Not implemented!")
method dial*(
self: Dial,
peerId: PeerID,
protos: seq[string]): Future[Connection] {.async, base.} =
## create a protocol stream over an
## existing connection
##
doAssert(false, "Not implemented!")
method dial*(
self: Dial,
peerId: PeerID,
addrs: seq[MultiAddress],
protos: seq[string]): Future[Connection] {.async, base.} =
## create a protocol stream and establish
## a connection if one doesn't exist already
##
doAssert(false, "Not implemented!")

241
libp2p/dialer.nim Normal file
View File

@ -0,0 +1,241 @@
## Nim-LibP2P
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/[sugar, tables]
import pkg/[chronos,
chronicles,
metrics]
import dial,
peerid,
peerinfo,
multistream,
connmanager,
stream/connection,
transports/transport
export dial
logScope:
topics = "libp2p dialer"
declareCounter(libp2p_total_dial_attempts, "total attempted dials")
declareCounter(libp2p_successful_dials, "dialed successful peers")
declareCounter(libp2p_failed_dials, "failed dials")
declareCounter(libp2p_failed_upgrades_outgoing, "outgoing connections failed upgrades")
type
DialFailedError* = object of CatchableError
Dialer* = ref object of Dial
peerInfo*: PeerInfo
ms: MultistreamSelect
connManager: ConnManager
dialLock: Table[PeerID, AsyncLock]
transports: seq[Transport]
proc dialAndUpgrade(
self: Dialer,
peerId: PeerID,
addrs: seq[MultiAddress]):
Future[Connection] {.async.} =
debug "Dialing peer", peerId
# Avoid "cannot be captured as it would violate memory safety" errors in Nim-1.4.x.
var
transport: Transport
address: MultiAddress
for t in self.transports: # for each transport
transport = t
for a in addrs: # for each address
address = a
if t.handles(a): # check if it can dial it
trace "Dialing address", address = $a, peerId
let dialed = try:
libp2p_total_dial_attempts.inc()
# await a connection slot when the total
# connection count is equal to `maxConns`
await self.connManager.trackOutgoingConn(
() => transport.dial(address)
)
except TooManyConnectionsError as exc:
trace "Connection limit reached!"
raise exc
except CancelledError as exc:
debug "Dialing canceled", msg = exc.msg, peerId
raise exc
except CatchableError as exc:
debug "Dialing failed", msg = exc.msg, peerId
libp2p_failed_dials.inc()
continue # Try the next address
# make sure to assign the peer to the connection
dialed.peerInfo = PeerInfo.init(peerId, addrs)
# also keep track of the connection's bottom unsafe transport direction
# required by gossipsub scoring
dialed.transportDir = Direction.Out
libp2p_successful_dials.inc()
let conn = try:
await transport.upgradeOutgoing(dialed)
except CatchableError as exc:
# If we failed to establish the connection through one transport,
# we won't succeeded through another - no use in trying again
await dialed.close()
debug "Upgrade failed", msg = exc.msg, peerId
if exc isnot CancelledError:
libp2p_failed_upgrades_outgoing.inc()
raise exc
doAssert not isNil(conn), "connection died after upgradeOutgoing"
debug "Dial successful", conn, peerInfo = conn.peerInfo
return conn
proc internalConnect(
self: Dialer,
peerId: PeerID,
addrs: seq[MultiAddress]):
Future[Connection] {.async.} =
if self.peerInfo.peerId == peerId:
raise newException(CatchableError, "can't dial self!")
# Ensure there's only one in-flight attempt per peer
let lock = self.dialLock.mgetOrPut(peerId, newAsyncLock())
try:
await lock.acquire()
# Check if we have a connection already and try to reuse it
var conn = self.connManager.selectConn(peerId)
if conn != nil:
if conn.atEof or conn.closed:
# This connection should already have been removed from the connection
# manager - it's essentially a bug that we end up here - we'll fail
# for now, hoping that this will clean themselves up later...
warn "dead connection in connection manager", conn
await conn.close()
raise newException(DialFailedError, "Zombie connection encountered")
trace "Reusing existing connection", conn, direction = $conn.dir
return conn
conn = await self.dialAndUpgrade(peerId, addrs)
if isNil(conn): # None of the addresses connected
raise newException(DialFailedError, "Unable to establish outgoing link")
# We already check for this in Connection manager
# but a disconnect could have happened right after
# we've added the connection so we check again
# to prevent races due to that.
if conn.closed() or conn.atEof():
# This can happen when the other ends drops us
# before we get a chance to return the connection
# back to the dialer.
trace "Connection dead on arrival", conn
raise newLPStreamClosedError()
return conn
finally:
if lock.locked():
lock.release()
method connect*(
self: Dialer,
peerId: PeerID,
addrs: seq[MultiAddress]) {.async.} =
## connect remote peer without negotiating
## a protocol
##
if self.connManager.connCount(peerId) > 0:
return
discard await self.internalConnect(peerId, addrs)
proc negotiateStream(
self: Dialer,
conn: Connection,
protos: seq[string]): Future[Connection] {.async.} =
trace "Negotiating stream", conn, protos
let selected = await self.ms.select(conn, protos)
if not protos.contains(selected):
await conn.closeWithEOF()
raise newException(DialFailedError, "Unable to select sub-protocol " & $protos)
return conn
method dial*(
self: Dialer,
peerId: PeerID,
protos: seq[string]): Future[Connection] {.async.} =
## create a protocol stream over an
## existing connection
##
trace "Dialing (existing)", peerId, protos
let stream = await self.connManager.getStream(peerId)
if stream.isNil:
raise newException(DialFailedError, "Couldn't get muxed stream")
return await self.negotiateStream(stream, protos)
method dial*(
self: Dialer,
peerId: PeerID,
addrs: seq[MultiAddress],
protos: seq[string]): Future[Connection] {.async.} =
## create a protocol stream and establish
## a connection if one doesn't exist already
##
var
conn: Connection
stream: Connection
proc cleanup() {.async.} =
if not(isNil(stream)):
await stream.closeWithEOF()
if not(isNil(conn)):
await conn.close()
try:
trace "Dialing (new)", peerId, protos
conn = await self.internalConnect(peerId, addrs)
trace "Opening stream", conn
stream = await self.connManager.getStream(conn)
if isNil(stream):
raise newException(DialFailedError,
"Couldn't get muxed stream")
return await self.negotiateStream(stream, protos)
except CancelledError as exc:
trace "Dial canceled", conn
await cleanup()
raise exc
except CatchableError as exc:
debug "Error dialing", conn, msg = exc.msg
await cleanup()
raise exc
proc new*(
T: type Dialer,
peerInfo: PeerInfo,
connManager: ConnManager,
transports: seq[Transport],
ms: MultistreamSelect): Dialer =
T(peerInfo: peerInfo,
connManager: connManager,
transports: transports,
ms: ms)

71
libp2p/standard_setup.nim Normal file
View File

@ -0,0 +1,71 @@
import
options, tables, chronos, bearssl,
switch, peerid, peerinfo, stream/connection, multiaddress,
crypto/crypto, transports/[transport, tcptransport],
muxers/[muxer, mplex/mplex],
protocols/[identify, secure/secure, secure/noise],
upgrademngrs/[upgrade, muxedupgrade], connmanager
export
switch, peerid, peerinfo, connection, multiaddress, crypto
type
SecureProtocol* {.pure.} = enum
Noise,
Secio {.deprecated.}
proc newStandardSwitch*(privKey = none(PrivateKey),
address = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(),
secureManagers: openarray[SecureProtocol] = [
SecureProtocol.Noise,
],
transportFlags: set[ServerFlags] = {},
rng = newRng(),
inTimeout: Duration = 5.minutes,
outTimeout: Duration = 5.minutes,
maxConnections = MaxConnections,
maxIn = -1,
maxOut = -1,
maxConnsPerPeer = MaxConnectionsPerPeer): Switch =
proc createMplex(conn: Connection): Muxer =
Mplex.init(
conn,
inTimeout = inTimeout,
outTimeout = outTimeout)
if rng == nil: # newRng could fail
raise (ref CatchableError)(msg: "Cannot initialize RNG")
let
seckey = privKey.get(otherwise = PrivateKey.random(rng[]).tryGet())
peerInfo = PeerInfo.init(seckey, [address])
var
secureManagerInstances: seq[Secure]
for sec in secureManagers:
case sec
of SecureProtocol.Noise:
secureManagerInstances &= newNoise(rng, seckey).Secure
of SecureProtocol.Secio:
quit("Secio is deprecated!") # use of secio is unsafe
let
mplexProvider = newMuxerProvider(createMplex, MplexCodec)
ms = newMultistream()
identify = newIdentify(peerInfo)
muxers = {MplexCodec: mplexProvider}.toTable
connManager = ConnManager.init(maxConnsPerPeer, maxConnections, maxIn, maxOut)
muxedUpgrade = MuxedUpgrade.init(identify, muxers, secureManagerInstances, connManager, ms)
transports = @[Transport(TcpTransport.init(transportFlags, muxedUpgrade))]
let switch = newSwitch(
peerInfo,
transports,
identify,
muxers,
secureManagers = secureManagerInstances,
connManager = connManager,
ms = ms)
return switch

View File

@ -33,9 +33,10 @@ import stream/connection,
utils/semaphore,
connmanager,
peerid,
errors
errors,
dialer
export connmanager, upgrade
export connmanager, upgrade, dialer
logScope:
topics = "libp2p switch"
@ -46,26 +47,19 @@ logScope:
# and only if the channel has been secured (i.e. if a secure manager has been
# previously provided)
declareCounter(libp2p_total_dial_attempts, "total attempted dials")
declareCounter(libp2p_successful_dials, "dialed successful peers")
declareCounter(libp2p_failed_dials, "failed dials")
declareCounter(libp2p_failed_upgrades_incoming, "incoming connections failed upgrades")
declareCounter(libp2p_failed_upgrades_outgoing, "outgoing connections failed upgrades")
const
ConcurrentUpgrades* = 4
type
DialFailedError* = object of LPError
Switch* = ref object of RootObj
Switch* = ref object of Dial
peerInfo*: PeerInfo
connManager*: ConnManager
transports*: seq[Transport]
ms*: MultistreamSelect
dialLock: Table[PeerID, AsyncLock]
acceptFuts: seq[Future[void]]
upgrade: Upgrade
dialer*: Dial
proc addConnEventHandler*(s: Switch,
handler: ConnEventHandler,
@ -97,186 +91,36 @@ proc isConnected*(s: Switch, peerId: PeerID): bool =
proc disconnect*(s: Switch, peerId: PeerID): Future[void] {.gcsafe.} =
s.connManager.dropPeer(peerId)
proc dialAndUpgrade(s: Switch,
method connect*(
s: Switch,
peerId: PeerID,
addrs: seq[MultiAddress]):
Future[Connection] {.async.} =
debug "Dialing peer", peerId
addrs: seq[MultiAddress]): Future[void] =
s.dialer.connect(peerId, addrs)
# Avoid "cannot be captured as it would violate memory safety" errors in Nim-1.4.x.
var
transport: Transport
address: MultiAddress
for t in s.transports: # for each transport
transport = t
for a in addrs: # for each address
address = a
if t.handles(a): # check if it can dial it
trace "Dialing address", address = $a, peerId
let dialed = try:
libp2p_total_dial_attempts.inc()
# await a connection slot when the total
# connection count is equal to `maxConns`
await s.connManager.trackOutgoingConn(
() => transport.dial(address)
)
except TooManyConnectionsError as exc:
trace "Connection limit reached!"
raise exc
except CancelledError as exc:
debug "Dialing canceled", msg = exc.msg, peerId
raise exc
except CatchableError as exc:
debug "Dialing failed", msg = exc.msg, peerId
libp2p_failed_dials.inc()
continue # Try the next address
# make sure to assign the peer to the connection
dialed.peerInfo = PeerInfo.init(peerId, addrs)
# also keep track of the connection's bottom unsafe transport direction
# required by gossipsub scoring
dialed.transportDir = Direction.Out
libp2p_successful_dials.inc()
let conn = try:
await s.upgrade.upgradeOutgoing(dialed)
except CatchableError as exc:
# If we failed to establish the connection through one transport,
# we won't succeeded through another - no use in trying again
await dialed.close()
debug "Upgrade failed", msg = exc.msg, peerId
if exc isnot CancelledError:
libp2p_failed_upgrades_outgoing.inc()
raise exc
doAssert not isNil(conn), "connection died after upgradeOutgoing"
debug "Dial successful", conn, peerInfo = conn.peerInfo
return conn
proc internalConnect(s: Switch,
method dial*(
s: Switch,
peerId: PeerID,
addrs: seq[MultiAddress]):
Future[Connection] {.async.} =
if s.peerInfo.peerId == peerId:
raise newException(CatchableError, "can't dial self!")
# Ensure there's only one in-flight attempt per peer
let lock = s.dialLock.mgetOrPut(peerId, newAsyncLock())
try:
await lock.acquire()
# Check if we have a connection already and try to reuse it
var conn = s.connManager.selectConn(peerId)
if conn != nil:
if conn.atEof or conn.closed:
# This connection should already have been removed from the connection
# manager - it's essentially a bug that we end up here - we'll fail
# for now, hoping that this will clean themselves up later...
warn "dead connection in connection manager", conn
await conn.close()
raise newException(DialFailedError, "Zombie connection encountered")
trace "Reusing existing connection", conn, direction = $conn.dir
return conn
conn = await s.dialAndUpgrade(peerId, addrs)
if isNil(conn): # None of the addresses connected
raise newException(DialFailedError, "Unable to establish outgoing link")
# We already check for this in Connection manager
# but a disconnect could have happened right after
# we've added the connection so we check again
# to prevent races due to that.
if conn.closed() or conn.atEof():
# This can happen when the other ends drops us
# before we get a chance to return the connection
# back to the dialer.
trace "Connection dead on arrival", conn
raise newLPStreamClosedError()
return conn
finally:
if lock.locked():
lock.release()
proc connect*(s: Switch, peerId: PeerID, addrs: seq[MultiAddress]) {.async.} =
## attempt to create establish a connection
## with a remote peer
##
if s.connManager.connCount(peerId) > 0:
return
discard await s.internalConnect(peerId, addrs)
proc negotiateStream(s: Switch, conn: Connection, protos: seq[string]): Future[Connection] {.async.} =
trace "Negotiating stream", conn, protos
let selected = await s.ms.select(conn, protos)
if not protos.contains(selected):
await conn.closeWithEOF()
raise newException(DialFailedError, "Unable to select sub-protocol " & $protos)
return conn
proc dial*(s: Switch,
peerId: PeerID,
protos: seq[string]): Future[Connection] {.async.} =
trace "Dialing (existing)", peerId, protos
let stream = await s.connManager.getStream(peerId)
if stream.isNil:
raise newException(DialFailedError, "Couldn't get muxed stream")
return await s.negotiateStream(stream, protos)
protos: seq[string]): Future[Connection] =
s.dialer.dial(peerId, protos)
proc dial*(s: Switch,
peerId: PeerID,
proto: string): Future[Connection] =
dial(s, peerId, @[proto])
proc dial*(s: Switch,
method dial*(
s: Switch,
peerId: PeerID,
addrs: seq[MultiAddress],
protos: seq[string]):
Future[Connection] {.async.} =
var
conn: Connection
stream: Connection
protos: seq[string]): Future[Connection] =
s.dialer.dial(peerId, addrs, protos)
proc cleanup() {.async.} =
if not(isNil(stream)):
await stream.closeWithEOF()
if not(isNil(conn)):
await conn.close()
try:
trace "Dialing (new)", peerId, protos
conn = await s.internalConnect(peerId, addrs)
trace "Opening stream", conn
stream = await s.connManager.getStream(conn)
if isNil(stream):
raise newException(DialFailedError,
"Couldn't get muxed stream")
return await s.negotiateStream(stream, protos)
except CancelledError as exc:
trace "Dial canceled", conn
await cleanup()
raise exc
except CatchableError as exc:
debug "Error dialing", conn, msg = exc.msg
await cleanup()
raise exc
proc dial*(s: Switch,
proc dial*(
s: Switch,
peerId: PeerID,
addrs: seq[MultiAddress],
proto: string):
Future[Connection] = dial(s, peerId, addrs, @[proto])
proto: string): Future[Connection] =
dial(s, peerId, addrs, @[proto])
proc mount*[T: LPProtocol](s: Switch, proto: T, matcher: Matcher = nil)
{.gcsafe, raises: [Defect, LPError].} =
@ -346,7 +190,7 @@ proc accept(s: Switch, transport: Transport) {.async.} = # noraises
debug "Accepted an incoming connection", conn
asyncSpawn upgradeMonitor(conn, upgrades)
asyncSpawn s.upgrade.upgradeIncoming(conn)
asyncSpawn transport.upgradeIncoming(conn)
except CancelledError as exc:
trace "releasing semaphore on cancellation"
upgrades.release() # always release the slot
@ -404,26 +248,17 @@ proc newSwitch*(peerInfo: PeerInfo,
identity: Identify,
muxers: Table[string, MuxerProvider],
secureManagers: openarray[Secure] = [],
maxConnections = MaxConnections,
maxIn = -1,
maxOut = -1,
maxConnsPerPeer = MaxConnectionsPerPeer): Switch
{.raises: [Defect, LPError].} =
connManager: ConnManager,
ms: MultistreamSelect): Switch =
if secureManagers.len == 0:
raise (ref LPError)(msg: "Provide at least one secure manager")
let ms = newMultistream()
let connManager = ConnManager.init(maxConnsPerPeer, maxConnections, maxIn, maxOut)
let upgrade = MuxedUpgrade.init(identity, muxers, secureManagers, connManager, ms)
let switch = Switch(
peerInfo: peerInfo,
ms: ms,
transports: transports,
connManager: connManager,
upgrade: upgrade,
)
dialer: Dialer.new(peerInfo, connManager, transports, ms))
switch.mount(identity)
return switch

View File

@ -14,10 +14,13 @@ import chronos, chronicles
import transport,
../errors,
../wire,
../multiaddress,
../multicodec,
../multistream,
../connmanager,
../multiaddress,
../stream/connection,
../stream/chronosstream
../stream/chronosstream,
../upgrademngrs/upgrade
logScope:
topics = "libp2p tcptransport"
@ -61,7 +64,7 @@ proc setupTcpTransportTracker(): TcpTransportTracker =
result.isLeaked = leakTransport
addTracker(TcpTransportTrackerName, result)
proc connHandler*(t: TcpTransport,
proc connHandler*(self: TcpTransport,
client: StreamTransport,
dir: Direction): Future[Connection] {.async.} =
var observedAddr: MultiAddress = MultiAddress()
@ -75,8 +78,8 @@ proc connHandler*(t: TcpTransport,
trace "Handling tcp connection", address = $observedAddr,
dir = $dir,
clients = t.clients[Direction.In].len +
t.clients[Direction.Out].len
clients = self.clients[Direction.In].len +
self.clients[Direction.Out].len
let conn = Connection(
ChronosStream.init(
@ -95,7 +98,7 @@ proc connHandler*(t: TcpTransport,
trace "Cleaning up client", addrs = $client.remoteAddress,
conn
t.clients[dir].keepItIf( it != client )
self.clients[dir].keepItIf( it != client )
await allFuturesThrowing(
conn.close(), client.closeWait())
@ -106,82 +109,108 @@ proc connHandler*(t: TcpTransport,
let useExc {.used.} = exc
debug "Error cleaning up client", errMsg = exc.msg, conn
t.clients[dir].add(client)
self.clients[dir].add(client)
asyncSpawn onClose()
return conn
proc init*(T: type TcpTransport,
flags: set[ServerFlags] = {}): T =
result = T(flags: flags)
func init*(
T: type TcpTransport,
flags: set[ServerFlags] = {},
upgrade: Upgrade): T =
result = T(
flags: flags,
upgrader: upgrade
)
result.initTransport()
method initTransport*(t: TcpTransport) =
t.multicodec = multiCodec("tcp")
method initTransport*(self: TcpTransport) =
self.multicodec = multiCodec("tcp")
inc getTcpTransportTracker().opened
method start*(t: TcpTransport, ma: MultiAddress) {.async.} =
method start*(
self: TcpTransport,
ma: MultiAddress) {.async.} =
## listen on the transport
##
if t.running:
if self.running:
trace "TCP transport already running"
return
await procCall Transport(t).start(ma)
await procCall Transport(self).start(ma)
trace "Starting TCP transport"
t.server = createStreamServer(
ma = t.ma,
flags = t.flags,
udata = t)
self.server = createStreamServer(
ma = self.ma,
flags = self.flags,
udata = self)
# always get the resolved address in case we're bound to 0.0.0.0:0
t.ma = MultiAddress.init(t.server.sock.getLocalAddress()).tryGet()
t.running = true
self.ma = MultiAddress.init(self.server.sock.getLocalAddress()).tryGet()
self.running = true
trace "Listening on", address = t.ma
trace "Listening on", address = self.ma
method stop*(t: TcpTransport) {.async, gcsafe.} =
method stop*(self: TcpTransport) {.async, gcsafe.} =
## stop the transport
##
t.running = false # mark stopped as soon as possible
self.running = false # mark stopped as soon as possible
try:
trace "Stopping TCP transport"
await procCall Transport(t).stop() # call base
await procCall Transport(self).stop() # call base
checkFutures(
await allFinished(
t.clients[Direction.In].mapIt(it.closeWait()) &
t.clients[Direction.Out].mapIt(it.closeWait())))
self.clients[Direction.In].mapIt(it.closeWait()) &
self.clients[Direction.Out].mapIt(it.closeWait())))
# server can be nil
if not isNil(t.server):
await t.server.closeWait()
if not isNil(self.server):
await self.server.closeWait()
t.server = nil
self.server = nil
trace "Transport stopped"
inc getTcpTransportTracker().closed
except CatchableError as exc:
trace "Error shutting down tcp transport", exc = exc.msg
method accept*(t: TcpTransport): Future[Connection] {.async, gcsafe.} =
method upgradeIncoming*(
self: TcpTransport,
conn: Connection): Future[void] {.gcsafe.} =
## base upgrade method that the transport uses to perform
## transport specific upgrades
##
self.upgrader.upgradeIncoming(conn)
method upgradeOutgoing*(
self: TcpTransport,
conn: Connection): Future[Connection] {.gcsafe.} =
## base upgrade method that the transport uses to perform
## transport specific upgrades
##
self.upgrader.upgradeOutgoing(conn)
method accept*(self: TcpTransport): Future[Connection] {.async, gcsafe.} =
## accept a new TCP connection
##
if not t.running:
if not self.running:
raise newTransportClosedError()
try:
let transp = await t.server.accept()
return await t.connHandler(transp, Direction.In)
let transp = await self.server.accept()
return await self.connHandler(transp, Direction.In)
except TransportOsError as exc:
# TODO: it doesn't sound like all OS errors
# can be ignored, we should re-raise those
# that can't.
# that can'self.
debug "OS Error", exc = exc.msg
except TransportTooManyError as exc:
debug "Too many files opened", exc = exc.msg
@ -192,16 +221,16 @@ method accept*(t: TcpTransport): Future[Connection] {.async, gcsafe.} =
warn "Unexpected error creating connection", exc = exc.msg
raise exc
method dial*(t: TcpTransport,
address: MultiAddress):
Future[Connection] {.async, gcsafe.} =
method dial*(
self: TcpTransport,
address: MultiAddress): Future[Connection] {.async, gcsafe.} =
## dial a peer
##
trace "Dialing remote peer", address = $address
let transp = await connect(address)
return await t.connHandler(transp, Direction.Out)
return await self.connHandler(transp, Direction.Out)
method handles*(t: TcpTransport, address: MultiAddress): bool {.gcsafe.} =
if procCall Transport(t).handles(address):

View File

@ -14,7 +14,8 @@ import sequtils
import chronos, chronicles
import ../stream/connection,
../multiaddress,
../multicodec
../multicodec,
../upgrademngrs/upgrade
logScope:
topics = "libp2p transport"
@ -25,56 +26,72 @@ type
Transport* = ref object of RootObj
ma*: Multiaddress
multicodec*: MultiCodec
running*: bool
upgrader*: Upgrade
multicodec*: MultiCodec
proc newTransportClosedError*(parent: ref Exception = nil): ref LPError =
newException(TransportClosedError,
"Transport closed, no more connections!", parent)
method initTransport*(t: Transport) {.base, gcsafe, locks: "unknown".} =
method initTransport*(self: Transport) {.base, gcsafe, locks: "unknown".} =
## perform protocol initialization
##
discard
method start*(t: Transport, ma: MultiAddress) {.base, async.} =
method start*(
self: Transport,
ma: MultiAddress): Future[void] {.base, async.} =
## start the transport
##
t.ma = ma
self.ma = ma
trace "starting transport", address = $ma
method stop*(t: Transport) {.base, async.} =
method stop*(self: Transport): Future[void] {.base, async.} =
## stop and cleanup the transport
## including all outstanding connections
##
discard
method accept*(t: Transport): Future[Connection]
{.base, async, gcsafe.} =
method accept*(self: Transport): Future[Connection]
{.base, gcsafe.} =
## accept incoming connections
##
discard
method dial*(t: Transport,
address: MultiAddress): Future[Connection]
{.base, async, gcsafe.} =
method dial*(
self: Transport,
address: MultiAddress): Future[Connection] {.base, gcsafe.} =
## dial a peer
##
discard
method upgrade*(t: Transport) {.base, async, gcsafe.} =
method upgradeIncoming*(
self: Transport,
conn: Connection): Future[void] {.base, gcsafe.} =
## base upgrade method that the transport uses to perform
## transport specific upgrades
##
discard
doAssert(false, "Not implemented!")
method handles*(t: Transport, address: MultiAddress): bool {.base, gcsafe.} =
method upgradeOutgoing*(
self: Transport,
conn: Connection): Future[Connection] {.base, gcsafe.} =
## base upgrade method that the transport uses to perform
## transport specific upgrades
##
doAssert(false, "Not implemented!")
method handles*(
self: Transport,
address: MultiAddress): bool {.base, gcsafe.} =
## check if transport supports the multiaddress
##
@ -83,7 +100,7 @@ method handles*(t: Transport, address: MultiAddress): bool {.base, gcsafe.} =
if address.protocols.isOk:
return address.protocols.get().filterIt( it == multiCodec("p2p-circuit") ).len == 0
method localAddress*(t: Transport): MultiAddress {.base, gcsafe.} =
method localAddress*(self: Transport): MultiAddress {.base, gcsafe.} =
## get the local address of the transport in case started with 0.0.0.0:0
##

View File

@ -25,26 +25,30 @@ type
muxers*: Table[string, MuxerProvider]
streamHandler*: StreamHandler
proc identify*(u: MuxedUpgrade, muxer: Muxer) {.async, gcsafe.} =
proc identify*(
self: MuxedUpgrade,
muxer: Muxer) {.async, gcsafe.} =
# new stream for identify
var stream = await muxer.newStream()
if stream == nil:
return
try:
await u.identify(stream)
await self.identify(stream)
finally:
await stream.closeWithEOF()
proc mux*(u: MuxedUpgrade, conn: Connection): Future[Muxer] {.async, gcsafe.} =
proc mux*(
self: MuxedUpgrade,
conn: Connection): Future[Muxer] {.async, gcsafe.} =
## mux incoming connection
trace "Muxing connection", conn
if u.muxers.len == 0:
if self.muxers.len == 0:
warn "no muxers registered, skipping upgrade flow", conn
return
let muxerName = await u.ms.select(conn, toSeq(u.muxers.keys()))
let muxerName = await self.ms.select(conn, toSeq(self.muxers.keys()))
if muxerName.len == 0 or muxerName == "na":
debug "no muxer available, early exit", conn
return
@ -52,18 +56,18 @@ proc mux*(u: MuxedUpgrade, conn: Connection): Future[Muxer] {.async, gcsafe.} =
trace "Found a muxer", conn, muxerName
# create new muxer for connection
let muxer = u.muxers[muxerName].newMuxer(conn)
let muxer = self.muxers[muxerName].newMuxer(conn)
# install stream handler
muxer.streamHandler = u.streamHandler
muxer.streamHandler = self.streamHandler
u.connManager.storeConn(conn)
self.connManager.storeConn(conn)
# store it in muxed connections if we have a peer for it
u.connManager.storeMuxer(muxer, muxer.handle()) # store muxer and start read loop
self.connManager.storeMuxer(muxer, muxer.handle()) # store muxer and start read loop
try:
await u.identify(muxer)
await self.identify(muxer)
except CatchableError as exc:
# Identify is non-essential, though if it fails, it might indicate that
# the connection was closed already - this will be picked up by the read
@ -72,10 +76,12 @@ proc mux*(u: MuxedUpgrade, conn: Connection): Future[Muxer] {.async, gcsafe.} =
return muxer
method upgradeOutgoing*(u: MuxedUpgrade, conn: Connection): Future[Connection] {.async, gcsafe.} =
method upgradeOutgoing*(
self: MuxedUpgrade,
conn: Connection): Future[Connection] {.async, gcsafe.} =
trace "Upgrading outgoing connection", conn
let sconn = await u.secure(conn) # secure the connection
let sconn = await self.secure(conn) # secure the connection
if isNil(sconn):
raise newException(UpgradeFailedError,
"unable to secure connection, stopping upgrade")
@ -84,7 +90,7 @@ method upgradeOutgoing*(u: MuxedUpgrade, conn: Connection): Future[Connection] {
raise newException(UpgradeFailedError,
"current version of nim-libp2p requires that secure protocol negotiates peerid")
let muxer = await u.mux(sconn) # mux it if possible
let muxer = await self.mux(sconn) # mux it if possible
if muxer == nil:
# TODO this might be relaxed in the future
raise newException(UpgradeFailedError,
@ -99,7 +105,9 @@ method upgradeOutgoing*(u: MuxedUpgrade, conn: Connection): Future[Connection] {
return sconn
method upgradeIncoming*(u: MuxedUpgrade, incomingConn: Connection) {.async, gcsafe.} = # noraises
method upgradeIncoming*(
self: MuxedUpgrade,
incomingConn: Connection): Future[void] {.async, gcsafe.} = # noraises
trace "Upgrading incoming connection", incomingConn
let ms = newMultistream()
@ -108,7 +116,7 @@ method upgradeIncoming*(u: MuxedUpgrade, incomingConn: Connection) {.async, gcsa
proto: string)
{.async, gcsafe, closure.} =
trace "Starting secure handler", conn
let secure = u.secureManagers.filterIt(it.codec == proto)[0]
let secure = self.secureManagers.filterIt(it.codec == proto)[0]
var cconn = conn
try:
@ -118,7 +126,7 @@ method upgradeIncoming*(u: MuxedUpgrade, incomingConn: Connection) {.async, gcsa
cconn = sconn
# add the muxer
for muxer in u.muxers.values:
for muxer in self.muxers.values:
ms.addHandler(muxer.codecs, muxer)
# handle subsequent secure requests
@ -136,7 +144,7 @@ method upgradeIncoming*(u: MuxedUpgrade, incomingConn: Connection) {.async, gcsa
try:
if (await ms.select(incomingConn)): # just handshake
# add the secure handlers
for k in u.secureManagers:
for k in self.secureManagers:
ms.addHandler(k.codec, securedHandler)
# handle un-secured connections
@ -150,7 +158,9 @@ method upgradeIncoming*(u: MuxedUpgrade, incomingConn: Connection) {.async, gcsa
if not isNil(incomingConn):
await incomingConn.close()
proc muxerHandler(u: MuxedUpgrade, muxer: Muxer) {.async, gcsafe.} =
proc muxerHandler(
self: MuxedUpgrade,
muxer: Muxer) {.async, gcsafe.} =
let
conn = muxer.connection
@ -160,13 +170,13 @@ proc muxerHandler(u: MuxedUpgrade, muxer: Muxer) {.async, gcsafe.} =
return
# store incoming connection
u.connManager.storeConn(conn)
self.connManager.storeConn(conn)
# store muxer and muxed connection
u.connManager.storeMuxer(muxer)
self.connManager.storeMuxer(muxer)
try:
await u.identify(muxer)
await self.identify(muxer)
except IdentifyError as exc:
# Identify is non-essential, though if it fails, it might indicate that
# the connection was closed already - this will be picked up by the read
@ -198,17 +208,20 @@ proc init*(
connManager: connManager,
ms: ms)
upgrader.streamHandler = proc(conn: Connection) {.async, gcsafe.} = # noraises
trace "Starting stream handler", conn
proc streamHandler(conn: Connection) {.async, gcsafe.} = # noraises
# trace "Starting stream handler", conn
try:
await upgrader.ms.handle(conn) # handle incoming connection
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception in stream handler", conn, msg = exc.msg
# trace "exception in stream handler", conn, msg = exc.msg
discard
finally:
await conn.closeWithEOF()
trace "Stream handler done", conn
# trace "Stream handler done", conn
upgrader.streamHandler = streamHandler
for _, val in muxers:
val.streamHandler = upgrader.streamHandler

View File

@ -35,22 +35,28 @@ type
connManager*: ConnManager
secureManagers*: seq[Secure]
method upgradeIncoming*(u: Upgrade, conn: Connection): Future[void] {.base.} =
method upgradeIncoming*(
self: Upgrade,
conn: Connection): Future[void] {.base.} =
doAssert(false, "Not implemented!")
method upgradeOutgoing*(u: Upgrade, conn: Connection): Future[Connection] {.base.} =
method upgradeOutgoing*(
self: Upgrade,
conn: Connection): Future[Connection] {.base.} =
doAssert(false, "Not implemented!")
proc secure*(u: Upgrade, conn: Connection): Future[Connection] {.async, gcsafe.} =
if u.secureManagers.len <= 0:
proc secure*(
self: Upgrade,
conn: Connection): Future[Connection] {.async, gcsafe.} =
if self.secureManagers.len <= 0:
raise newException(UpgradeFailedError, "No secure managers registered!")
let codec = await u.ms.select(conn, u.secureManagers.mapIt(it.codec))
let codec = await self.ms.select(conn, self.secureManagers.mapIt(it.codec))
if codec.len == 0:
raise newException(UpgradeFailedError, "Unable to negotiate a secure channel!")
trace "Securing connection", conn, codec
let secureProtocol = u.secureManagers.filterIt(it.codec == codec)
let secureProtocol = self.secureManagers.filterIt(it.codec == codec)
# ms.select should deal with the correctness of this
# let's avoid duplicating checks but detect if it fails to do it properly
@ -58,11 +64,13 @@ proc secure*(u: Upgrade, conn: Connection): Future[Connection] {.async, gcsafe.}
return await secureProtocol[0].secure(conn, true)
proc identify*(u: Upgrade, conn: Connection) {.async, gcsafe.} =
proc identify*(
self: Upgrade,
conn: Connection) {.async, gcsafe.} =
## identify the connection
if (await u.ms.select(conn, u.identity.codec)):
let info = await u.identity.identify(conn, conn.peerInfo)
if (await self.ms.select(conn, self.identity.codec)):
let info = await self.identity.identify(conn, conn.peerInfo)
if info.pubKey.isNone and isNil(conn):
raise newException(UpgradeFailedError,

View File

@ -8,7 +8,8 @@ import ../libp2p/[protocols/identify,
multistream,
transports/transport,
transports/tcptransport,
crypto/crypto]
crypto/crypto,
upgrademngrs/upgrade]
import ./helpers
when defined(nimHasUsed): {.used.}
@ -38,8 +39,8 @@ suite "Identify":
remotePeerInfo = PeerInfo.init(
remoteSecKey, [ma], ["/test/proto1/1.0.0", "/test/proto2/1.0.0"])
transport1 = TcpTransport.init()
transport2 = TcpTransport.init()
transport1 = TcpTransport.init(upgrade = Upgrade())
transport2 = TcpTransport.init(upgrade = Upgrade())
identifyProto1 = newIdentify(remotePeerInfo)
identifyProto2 = newIdentify(remotePeerInfo)

View File

@ -9,6 +9,7 @@ import ../libp2p/[errors,
muxers/mplex/mplex,
muxers/mplex/coder,
muxers/mplex/lpchannel,
upgrademngrs/upgrade,
vbuffer,
varint]
@ -379,7 +380,7 @@ suite "Mplex":
asyncTest "read/write receiver":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let transport1: TcpTransport = TcpTransport.init()
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade())
let listenFut = transport1.start(ma)
proc acceptHandler() {.async, gcsafe.} =
@ -395,7 +396,7 @@ suite "Mplex":
await mplexListen.close()
let acceptFut = acceptHandler()
let transport2: TcpTransport = TcpTransport.init()
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma)
let mplexDial = Mplex.init(conn)
@ -416,7 +417,7 @@ suite "Mplex":
asyncTest "read/write receiver lazy":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let transport1: TcpTransport = TcpTransport.init()
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade())
let listenFut = transport1.start(ma)
proc acceptHandler() {.async, gcsafe.} =
@ -432,7 +433,7 @@ suite "Mplex":
await mplexListen.close()
let acceptFut = acceptHandler()
let transport2: TcpTransport = TcpTransport.init()
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma)
let mplexDial = Mplex.init(conn)
@ -460,7 +461,7 @@ suite "Mplex":
for _ in 0..<MaxMsgSize:
bigseq.add(uint8(rand(uint('A')..uint('z'))))
let transport1: TcpTransport = TcpTransport.init()
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade())
let listenFut = transport1.start(ma)
proc acceptHandler() {.async, gcsafe.} =
@ -484,7 +485,7 @@ suite "Mplex":
check false
let acceptFut = acceptHandler()
let transport2: TcpTransport = TcpTransport.init()
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma)
let mplexDial = Mplex.init(conn)
@ -507,7 +508,7 @@ suite "Mplex":
asyncTest "read/write initiator":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let transport1: TcpTransport = TcpTransport.init()
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade())
let listenFut = transport1.start(ma)
proc acceptHandler() {.async, gcsafe.} =
@ -521,7 +522,7 @@ suite "Mplex":
await mplexListen.handle()
await mplexListen.close()
let transport2: TcpTransport = TcpTransport.init()
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma)
let acceptFut = acceptHandler()
@ -543,7 +544,7 @@ suite "Mplex":
asyncTest "multiple streams":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let transport1 = TcpTransport.init()
let transport1 = TcpTransport.init(upgrade = Upgrade())
let listenFut = transport1.start(ma)
let done = newFuture[void]()
@ -563,7 +564,7 @@ suite "Mplex":
await mplexListen.handle()
await mplexListen.close()
let transport2: TcpTransport = TcpTransport.init()
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma)
let acceptFut = acceptHandler()
@ -587,7 +588,7 @@ suite "Mplex":
asyncTest "multiple read/write streams":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let transport1: TcpTransport = TcpTransport.init()
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade())
let listenFut = transport1.start(ma)
let done = newFuture[void]()
@ -608,7 +609,7 @@ suite "Mplex":
await mplexListen.handle()
await mplexListen.close()
let transport2: TcpTransport = TcpTransport.init()
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma)
let acceptFut = acceptHandler()
@ -634,7 +635,7 @@ suite "Mplex":
asyncTest "channel closes listener with EOF":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let transport1 = TcpTransport.init()
let transport1 = TcpTransport.init(upgrade = Upgrade())
var listenStreams: seq[Connection]
proc acceptHandler() {.async, gcsafe.} =
let conn = await transport1.accept()
@ -656,7 +657,7 @@ suite "Mplex":
await transport1.start(ma)
let acceptFut = acceptHandler()
let transport2: TcpTransport = TcpTransport.init()
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma)
let mplexDial = Mplex.init(conn)
@ -681,7 +682,7 @@ suite "Mplex":
asyncTest "channel closes dialer with EOF":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let transport1 = TcpTransport.init()
let transport1 = TcpTransport.init(upgrade = Upgrade())
var count = 0
var done = newFuture[void]()
@ -704,7 +705,7 @@ suite "Mplex":
await transport1.start(ma)
let acceptFut = acceptHandler()
let transport2: TcpTransport = TcpTransport.init()
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma)
let mplexDial = Mplex.init(conn)
@ -746,7 +747,7 @@ suite "Mplex":
asyncTest "dialing mplex closes both ends":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let transport1 = TcpTransport.init()
let transport1 = TcpTransport.init(upgrade = Upgrade())
var listenStreams: seq[Connection]
proc acceptHandler() {.async, gcsafe.} =
@ -763,7 +764,7 @@ suite "Mplex":
await transport1.start(ma)
let acceptFut = acceptHandler()
let transport2: TcpTransport = TcpTransport.init()
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma)
let mplexDial = Mplex.init(conn)
@ -788,7 +789,7 @@ suite "Mplex":
asyncTest "listening mplex closes both ends":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let transport1 = TcpTransport.init()
let transport1 = TcpTransport.init(upgrade = Upgrade())
var mplexListen: Mplex
var listenStreams: seq[Connection]
@ -806,7 +807,7 @@ suite "Mplex":
await transport1.start(ma)
let acceptFut = acceptHandler()
let transport2: TcpTransport = TcpTransport.init()
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma)
let mplexDial = Mplex.init(conn)
@ -831,7 +832,7 @@ suite "Mplex":
asyncTest "canceling mplex handler closes both ends":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let transport1 = TcpTransport.init()
let transport1 = TcpTransport.init(upgrade = Upgrade())
var mplexHandle: Future[void]
var listenStreams: seq[Connection]
@ -850,7 +851,7 @@ suite "Mplex":
await transport1.start(ma)
let acceptFut = acceptHandler()
let transport2: TcpTransport = TcpTransport.init()
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma)
let mplexDial = Mplex.init(conn)
@ -874,7 +875,7 @@ suite "Mplex":
asyncTest "closing dialing connection should close both ends":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let transport1 = TcpTransport.init()
let transport1 = TcpTransport.init(upgrade = Upgrade())
var listenStreams: seq[Connection]
proc acceptHandler() {.async, gcsafe.} =
@ -891,7 +892,7 @@ suite "Mplex":
await transport1.start(ma)
let acceptFut = acceptHandler()
let transport2: TcpTransport = TcpTransport.init()
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma)
let mplexDial = Mplex.init(conn)
@ -916,7 +917,7 @@ suite "Mplex":
asyncTest "canceling listening connection should close both ends":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let transport1 = TcpTransport.init()
let transport1 = TcpTransport.init(upgrade = Upgrade())
var listenConn: Connection
var listenStreams: seq[Connection]
@ -934,7 +935,7 @@ suite "Mplex":
await transport1.start(ma)
let acceptFut = acceptHandler()
let transport2: TcpTransport = TcpTransport.init()
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma)
let mplexDial = Mplex.init(conn)
@ -962,7 +963,7 @@ suite "Mplex":
asyncTest "channel should be able to handle erratic read/writes":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let transport1: TcpTransport = TcpTransport.init()
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade())
let listenFut = transport1.start(ma)
var complete = newFuture[void]()
@ -983,7 +984,7 @@ suite "Mplex":
await mplexListen.handle()
await mplexListen.close()
let transport2: TcpTransport = TcpTransport.init()
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma)
let acceptFut = acceptHandler()
@ -1034,7 +1035,7 @@ suite "Mplex":
asyncTest "channel should handle 1 byte read/write":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let transport1: TcpTransport = TcpTransport.init()
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade())
let listenFut = transport1.start(ma)
var complete = newFuture[void]()
@ -1052,7 +1053,7 @@ suite "Mplex":
await mplexListen.handle()
await mplexListen.close()
let transport2: TcpTransport = TcpTransport.init()
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma)
let acceptFut = acceptHandler()

View File

@ -7,7 +7,8 @@ import ../libp2p/errors,
../libp2p/multiaddress,
../libp2p/transports/transport,
../libp2p/transports/tcptransport,
../libp2p/protocols/protocol
../libp2p/protocols/protocol,
../libp2p/upgrademngrs/upgrade
import ./helpers
@ -247,7 +248,7 @@ suite "Multistream select":
let msListen = newMultistream()
msListen.addHandler("/test/proto/1.0.0", protocol)
let transport1: TcpTransport = TcpTransport.init()
let transport1 = TcpTransport.init(upgrade = Upgrade())
asyncCheck transport1.start(ma)
proc acceptHandler(): Future[void] {.async, gcsafe.} =
@ -258,7 +259,7 @@ suite "Multistream select":
let handlerWait = acceptHandler()
let msDial = newMultistream()
let transport2: TcpTransport = TcpTransport.init()
let transport2 = TcpTransport.init(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma)
check (await msDial.select(conn, "/test/proto/1.0.0")) == true
@ -294,7 +295,7 @@ suite "Multistream select":
msListen.addHandler("/test/proto1/1.0.0", protocol)
msListen.addHandler("/test/proto2/1.0.0", protocol)
let transport1: TcpTransport = TcpTransport.init()
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade())
let listenFut = transport1.start(ma)
proc acceptHandler(): Future[void] {.async, gcsafe.} =
@ -310,7 +311,7 @@ suite "Multistream select":
let acceptFut = acceptHandler()
let msDial = newMultistream()
let transport2: TcpTransport = TcpTransport.init()
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma)
let ls = await msDial.list(conn)
@ -339,7 +340,7 @@ suite "Multistream select":
let msListen = newMultistream()
msListen.addHandler("/test/proto/1.0.0", protocol)
let transport1: TcpTransport = TcpTransport.init()
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade())
asyncCheck transport1.start(ma)
proc acceptHandler(): Future[void] {.async, gcsafe.} =
@ -348,7 +349,7 @@ suite "Multistream select":
let acceptFut = acceptHandler()
let msDial = newMultistream()
let transport2: TcpTransport = TcpTransport.init()
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma)
check (await msDial.select(conn,
@ -377,7 +378,7 @@ suite "Multistream select":
msListen.addHandler("/test/proto1/1.0.0", protocol)
msListen.addHandler("/test/proto2/1.0.0", protocol)
let transport1: TcpTransport = TcpTransport.init()
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade())
asyncCheck transport1.start(ma)
proc acceptHandler(): Future[void] {.async, gcsafe.} =
@ -386,7 +387,7 @@ suite "Multistream select":
let acceptFut = acceptHandler()
let msDial = newMultistream()
let transport2: TcpTransport = TcpTransport.init()
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma)
check (await msDial.select(conn,

View File

@ -29,7 +29,9 @@ import ../libp2p/[switch,
muxers/mplex/mplex,
protocols/secure/noise,
protocols/secure/secio,
protocols/secure/secure]
protocols/secure/secure,
upgrademngrs/muxedupgrade,
connmanager]
import ./helpers
const
@ -51,23 +53,31 @@ method init(p: TestProto) {.gcsafe.} =
proc createSwitch(ma: MultiAddress; outgoing: bool, secio: bool = false): (Switch, PeerInfo) =
var peerInfo: PeerInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get())
peerInfo.addrs.add(ma)
let identify = newIdentify(peerInfo)
proc createMplex(conn: Connection): Muxer =
result = Mplex.init(conn)
let mplexProvider = newMuxerProvider(createMplex, MplexCodec)
let transports = @[Transport(TcpTransport.init())]
let muxers = [(MplexCodec, mplexProvider)].toTable()
let secureManagers = if secio:
let
identify = newIdentify(peerInfo)
mplexProvider = newMuxerProvider(createMplex, MplexCodec)
muxers = [(MplexCodec, mplexProvider)].toTable()
secureManagers = if secio:
[Secure(newSecio(rng, peerInfo.privateKey))]
else:
[Secure(newNoise(rng, peerInfo.privateKey, outgoing = outgoing))]
let switch = newSwitch(peerInfo,
connManager = ConnManager.init()
ms = newMultistream()
muxedUpgrade = MuxedUpgrade.init(identify, muxers, secureManagers, connManager, ms)
transports = @[Transport(TcpTransport.init(upgrade = muxedUpgrade))]
let switch = newSwitch(
peerInfo,
transports,
identify,
muxers,
secureManagers)
secureManagers,
connManager,
ms)
result = (switch, peerInfo)
suite "Noise":
@ -80,7 +90,7 @@ suite "Noise":
serverInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get(), [server])
serverNoise = newNoise(rng, serverInfo.privateKey, outgoing = false)
let transport1: TcpTransport = TcpTransport.init()
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade())
asyncCheck transport1.start(server)
proc acceptHandler() {.async.} =
@ -94,7 +104,7 @@ suite "Noise":
let
acceptFut = acceptHandler()
transport2: TcpTransport = TcpTransport.init()
transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
clientInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get(), [transport1.ma])
clientNoise = newNoise(rng, clientInfo.privateKey, outgoing = true)
conn = await transport2.dial(transport1.ma)
@ -118,7 +128,7 @@ suite "Noise":
serverNoise = newNoise(rng, serverInfo.privateKey, outgoing = false)
let
transport1: TcpTransport = TcpTransport.init()
transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade())
asyncCheck transport1.start(server)
@ -134,7 +144,7 @@ suite "Noise":
let
handlerWait = acceptHandler()
transport2: TcpTransport = TcpTransport.init()
transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
clientInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get(), [transport1.ma])
clientNoise = newNoise(rng, clientInfo.privateKey, outgoing = true, commonPrologue = @[1'u8, 2'u8, 3'u8])
conn = await transport2.dial(transport1.ma)
@ -154,7 +164,7 @@ suite "Noise":
serverNoise = newNoise(rng, serverInfo.privateKey, outgoing = false)
readTask = newFuture[void]()
let transport1: TcpTransport = TcpTransport.init()
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade())
asyncCheck transport1.start(server)
proc acceptHandler() {.async, gcsafe.} =
@ -170,7 +180,7 @@ suite "Noise":
let
acceptFut = acceptHandler()
transport2: TcpTransport = TcpTransport.init()
transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
clientInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get(), [transport1.ma])
clientNoise = newNoise(rng, clientInfo.privateKey, outgoing = true)
conn = await transport2.dial(transport1.ma)
@ -195,7 +205,7 @@ suite "Noise":
trace "Sending huge payload", size = hugePayload.len
let
transport1: TcpTransport = TcpTransport.init()
transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade())
listenFut = transport1.start(server)
proc acceptHandler() {.async, gcsafe.} =
@ -209,7 +219,7 @@ suite "Noise":
let
acceptFut = acceptHandler()
transport2: TcpTransport = TcpTransport.init()
transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
clientInfo = PeerInfo.init(PrivateKey.random(ECDSA, rng[]).get(), [transport1.ma])
clientNoise = newNoise(rng, clientInfo.privateKey, outgoing = true)
conn = await transport2.dial(transport1.ma)
@ -278,89 +288,3 @@ suite "Noise":
switch2.stop())
await allFuturesThrowing(awaiters)
# test "interop with rust noise":
# when true: # disable cos in CI we got no interop server/client
# proc testListenerDialer(): Future[bool] {.async.} =
# const
# proto = "/noise/xx/25519/chachapoly/sha256/0.1.0"
# let
# local = Multiaddress.init("/ip4/0.0.0.0/tcp/23456")
# info = PeerInfo.init(PrivateKey.random(ECDSA), [local])
# noise = newNoise(info.privateKey)
# ms = newMultistream()
# transport = TcpTransport.newTransport()
# proc connHandler(conn: Connection) {.async, gcsafe.} =
# try:
# await ms.handle(conn)
# trace "ms.handle exited"
# except:
# error getCurrentExceptionMsg()
# finally:
# await conn.close()
# ms.addHandler(proto, noise)
# let
# clientConn = await transport.listen(local, connHandler)
# await clientConn
# result = true
# check:
# waitFor(testListenerDialer()) == true
# test "interop with rust noise":
# when true: # disable cos in CI we got no interop server/client
# proc testListenerDialer(): Future[bool] {.async.} =
# const
# proto = "/noise/xx/25519/chachapoly/sha256/0.1.0"
# let
# local = Multiaddress.init("/ip4/0.0.0.0/tcp/0")
# remote = Multiaddress.init("/ip4/127.0.0.1/tcp/23456")
# info = PeerInfo.init(PrivateKey.random(ECDSA), [local])
# noise = newNoise(info.privateKey)
# ms = newMultistream()
# transport = TcpTransport.newTransport()
# conn = await transport.dial(remote)
# check ms.select(conn, @[proto]).await == proto
# let
# sconn = await noise.secure(conn, true)
# # use sconn
# result = true
# check:
# waitFor(testListenerDialer()) == true
# test "interop with go noise":
# when true: # disable cos in CI we got no interop server/client
# proc testListenerDialer(): Future[bool] {.async.} =
# let
# local = Multiaddress.init("/ip4/0.0.0.0/tcp/23456")
# info = PeerInfo.init(PrivateKey.random(ECDSA), [local])
# noise = newNoise(info.privateKey)
# ms = newMultistream()
# transport = TcpTransport.newTransport()
# proc connHandler(conn: Connection) {.async, gcsafe.} =
# try:
# let seconn = await noise.secure(conn, false)
# trace "ms.handle exited"
# finally:
# await conn.close()
# let
# clientConn = await transport.listen(local, connHandler)
# await clientConn
# result = true
# check:
# waitFor(testListenerDialer()) == true

View File

@ -47,10 +47,10 @@ suite "Switch":
testProto.codec = TestCodec
testProto.handler = handle
let switch1 = newStandardSwitch(secureManagers = [SecureProtocol.Noise])
let switch1 = newStandardSwitch()
switch1.mount(testProto)
let switch2 = newStandardSwitch(secureManagers = [SecureProtocol.Noise])
let switch2 = newStandardSwitch()
var awaiters: seq[Future[void]]
awaiters.add(await switch1.start())
awaiters.add(await switch2.start())
@ -620,7 +620,7 @@ suite "Switch":
asyncTest "e2e canceling dial should not leak":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let transport = TcpTransport.init()
let transport = TcpTransport.init(upgrade = Upgrade())
await transport.start(ma)
proc acceptHandler() {.async, gcsafe.} =
@ -656,7 +656,7 @@ suite "Switch":
asyncTest "e2e closing remote conn should not leak":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let transport = TcpTransport.init()
let transport = TcpTransport.init(upgrade = Upgrade())
await transport.start(ma)
proc acceptHandler() {.async, gcsafe.} =

View File

@ -5,6 +5,7 @@ import chronos, stew/byteutils
import ../libp2p/[stream/connection,
transports/transport,
transports/tcptransport,
upgrademngrs/upgrade,
multiaddress,
errors,
wire]
@ -17,7 +18,7 @@ suite "TCP transport":
asyncTest "test listener: handle write":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let transport: TcpTransport = TcpTransport.init()
let transport: TcpTransport = TcpTransport.init(upgrade = Upgrade())
asyncCheck transport.start(ma)
proc acceptHandler() {.async, gcsafe.} =
@ -39,7 +40,7 @@ suite "TCP transport":
asyncTest "test listener: handle read":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let transport: TcpTransport = TcpTransport.init()
let transport: TcpTransport = TcpTransport.init(upgrade = Upgrade())
asyncCheck transport.start(ma)
proc acceptHandler() {.async, gcsafe.} =
@ -77,7 +78,7 @@ suite "TCP transport":
server.start()
let ma: MultiAddress = MultiAddress.init(server.sock.getLocalAddress()).tryGet()
let transport: TcpTransport = TcpTransport.init()
let transport: TcpTransport = TcpTransport.init(upgrade = Upgrade())
let conn = await transport.dial(ma)
var msg = newSeq[byte](6)
await conn.readExactly(addr msg[0], 6)
@ -111,7 +112,7 @@ suite "TCP transport":
server.start()
let ma: MultiAddress = MultiAddress.init(server.sock.getLocalAddress()).tryGet()
let transport: TcpTransport = TcpTransport.init()
let transport: TcpTransport = TcpTransport.init(upgrade = Upgrade())
let conn = await transport.dial(ma)
await conn.write("Hello!")
@ -127,7 +128,7 @@ suite "TCP transport":
asyncTest "e2e: handle write":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let transport1: TcpTransport = TcpTransport.init()
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade())
await transport1.start(ma)
proc acceptHandler() {.async, gcsafe.} =
@ -137,7 +138,7 @@ suite "TCP transport":
let handlerWait = acceptHandler()
let transport2: TcpTransport = TcpTransport.init()
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma)
var msg = newSeq[byte](6)
await conn.readExactly(addr msg[0], 6)
@ -152,7 +153,7 @@ suite "TCP transport":
asyncTest "e2e: handle read":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let transport1: TcpTransport = TcpTransport.init()
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade())
asyncCheck transport1.start(ma)
proc acceptHandler() {.async, gcsafe.} =
@ -164,7 +165,7 @@ suite "TCP transport":
let handlerWait = acceptHandler()
let transport2: TcpTransport = TcpTransport.init()
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
let conn = await transport2.dial(transport1.ma)
await conn.write("Hello!")
@ -177,10 +178,10 @@ suite "TCP transport":
asyncTest "e2e: handle dial cancellation":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let transport1: TcpTransport = TcpTransport.init()
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade())
await transport1.start(ma)
let transport2: TcpTransport = TcpTransport.init()
let transport2: TcpTransport = TcpTransport.init(upgrade = Upgrade())
let cancellation = transport2.dial(transport1.ma)
await cancellation.cancelAndWait()
@ -192,7 +193,7 @@ suite "TCP transport":
asyncTest "e2e: handle accept cancellation":
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
let transport1: TcpTransport = TcpTransport.init()
let transport1: TcpTransport = TcpTransport.init(upgrade = Upgrade())
await transport1.start(ma)
let acceptHandler = transport1.accept()