Transport hole punching (#873)

Co-authored-by: Tanguy <tanguy@status.im>
This commit is contained in:
diegomrsantos 2023-04-06 15:23:35 +02:00 committed by GitHub
parent 95e98e8c51
commit 0041ed4cf8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 68 additions and 13 deletions

View File

@ -1,6 +1,6 @@
bearssl;https://github.com/status-im/nim-bearssl@#acf9645e328bdcab481cfda1c158e07ecd46bd7b bearssl;https://github.com/status-im/nim-bearssl@#acf9645e328bdcab481cfda1c158e07ecd46bd7b
chronicles;https://github.com/status-im/nim-chronicles@#32ac8679680ea699f7dbc046e8e0131cac97d41a chronicles;https://github.com/status-im/nim-chronicles@#32ac8679680ea699f7dbc046e8e0131cac97d41a
chronos;https://github.com/status-im/nim-chronos@#f7835a192b45c37e97614d865141f21eea8c156e chronos;https://github.com/status-im/nim-chronos@#ab5a8c2e0f6941fe3debd61dff0293790079d1b0
dnsclient;https://github.com/ba0f3/dnsclient.nim@#fcd7443634b950eaea574e5eaa00a628ae029823 dnsclient;https://github.com/ba0f3/dnsclient.nim@#fcd7443634b950eaea574e5eaa00a628ae029823
faststreams;https://github.com/status-im/nim-faststreams@#814f8927e1f356f39219f37f069b83066bcc893a faststreams;https://github.com/status-im/nim-faststreams@#814f8927e1f356f39219f37f069b83066bcc893a
httputils;https://github.com/status-im/nim-http-utils@#a85bd52ae0a956983ca6b3267c72961d2ec0245f httputils;https://github.com/status-im/nim-http-utils@#a85bd52ae0a956983ca6b3267c72961d2ec0245f

View File

@ -58,6 +58,9 @@ type
dial*: Option[AutonatDial] dial*: Option[AutonatDial]
response*: Option[AutonatDialResponse] response*: Option[AutonatDialResponse]
NetworkReachability* {.pure.} = enum
Unknown, NotReachable, Reachable
proc encode(p: AutonatPeerInfo): ProtoBuffer = proc encode(p: AutonatPeerInfo): ProtoBuffer =
result = initProtoBuffer() result = initProtoBuffer()
if p.id.isSome(): if p.id.isSome():

View File

@ -44,9 +44,6 @@ type
dialTimeout: Duration dialTimeout: Duration
enableAddressMapper: bool enableAddressMapper: bool
NetworkReachability* {.pure.} = enum
NotReachable, Reachable, Unknown
StatusAndConfidenceHandler* = proc (networkReachability: NetworkReachability, confidence: Option[float]): Future[void] {.gcsafe, raises: [Defect].} StatusAndConfidenceHandler* = proc (networkReachability: NetworkReachability, confidence: Option[float]): Future[void] {.gcsafe, raises: [Defect].}
proc new*( proc new*(

View File

@ -42,13 +42,15 @@ type
servers*: seq[StreamServer] servers*: seq[StreamServer]
clients: array[Direction, seq[StreamTransport]] clients: array[Direction, seq[StreamTransport]]
flags: set[ServerFlags] flags: set[ServerFlags]
clientFlags: set[TransportFlags] clientFlags: set[SocketFlags]
acceptFuts: seq[Future[StreamTransport]] acceptFuts: seq[Future[StreamTransport]]
TcpTransportTracker* = ref object of TrackerBase TcpTransportTracker* = ref object of TrackerBase
opened*: uint64 opened*: uint64
closed*: uint64 closed*: uint64
TcpTransportError* = object of transport.TransportError
proc setupTcpTransportTracker(): TcpTransportTracker {.gcsafe, raises: [Defect].} proc setupTcpTransportTracker(): TcpTransportTracker {.gcsafe, raises: [Defect].}
proc getTcpTransportTracker(): TcpTransportTracker {.gcsafe.} = proc getTcpTransportTracker(): TcpTransportTracker {.gcsafe.} =
@ -136,13 +138,14 @@ proc new*(
clientFlags: clientFlags:
if ServerFlags.TcpNoDelay in flags: if ServerFlags.TcpNoDelay in flags:
compilesOr: compilesOr:
{TransportFlags.TcpNoDelay} {SocketFlags.TcpNoDelay}
do: do:
doAssert(false) doAssert(false)
default(set[TransportFlags]) default(set[SocketFlags])
else: else:
default(set[TransportFlags]), default(set[SocketFlags]),
upgrader: upgrade) upgrader: upgrade,
networkReachability: NetworkReachability.Unknown)
return transport return transport
@ -165,6 +168,7 @@ method start*(
trace "Invalid address detected, skipping!", address = ma trace "Invalid address detected, skipping!", address = ma
continue continue
self.flags.incl(ServerFlags.ReusePort)
let server = createStreamServer( let server = createStreamServer(
ma = ma, ma = ma,
flags = self.flags, flags = self.flags,
@ -263,8 +267,13 @@ method dial*(
## ##
trace "Dialing remote peer", address = $address trace "Dialing remote peer", address = $address
let transp =
if self.networkReachability == NetworkReachability.NotReachable and self.addrs.len > 0:
self.clientFlags.incl(SocketFlags.ReusePort)
await connect(address, flags = self.clientFlags, localAddress = Opt.some(self.addrs[0]))
else:
await connect(address, flags = self.clientFlags)
let transp = await connect(address, flags = self.clientFlags)
try: try:
let observedAddr = await getObservedAddr(transp) let observedAddr = await getObservedAddr(transp)
return await self.connHandler(transp, Opt.some(observedAddr), Direction.Out) return await self.connHandler(transp, Opt.some(observedAddr), Direction.Out)

View File

@ -19,7 +19,10 @@ import ../stream/connection,
../multiaddress, ../multiaddress,
../multicodec, ../multicodec,
../muxers/muxer, ../muxers/muxer,
../upgrademngrs/upgrade ../upgrademngrs/upgrade,
../protocols/connectivity/autonat/core
export core.NetworkReachability
logScope: logScope:
topics = "libp2p transport" topics = "libp2p transport"
@ -33,6 +36,7 @@ type
addrs*: seq[MultiAddress] addrs*: seq[MultiAddress]
running*: bool running*: bool
upgrader*: Upgrade upgrader*: Upgrade
networkReachability*: NetworkReachability
proc newTransportClosedError*(parent: ref Exception = nil): ref LPError = proc newTransportClosedError*(parent: ref Exception = nil): ref LPError =
newException(TransportClosedError, newException(TransportClosedError,

View File

@ -77,7 +77,8 @@ proc connect*(
ma: MultiAddress, ma: MultiAddress,
bufferSize = DefaultStreamBufferSize, bufferSize = DefaultStreamBufferSize,
child: StreamTransport = nil, child: StreamTransport = nil,
flags = default(set[TransportFlags])): Future[StreamTransport] flags = default(set[SocketFlags]),
localAddress: Opt[MultiAddress] = Opt.none(MultiAddress)): Future[StreamTransport]
{.raises: [Defect, LPError, MaInvalidAddress].} = {.raises: [Defect, LPError, MaInvalidAddress].} =
## Open new connection to remote peer with address ``ma`` and create ## Open new connection to remote peer with address ``ma`` and create
## new transport object ``StreamTransport`` for established connection. ## new transport object ``StreamTransport`` for established connection.
@ -90,7 +91,9 @@ proc connect*(
let transportAddress = initTAddress(ma).tryGet() let transportAddress = initTAddress(ma).tryGet()
compilesOr: compilesOr:
return connect(transportAddress, bufferSize, child, flags) return connect(transportAddress, bufferSize, child,
if localAddress.isSome(): initTAddress(localAddress.get()).tryGet() else : TransportAddress(),
flags)
do: do:
# support for older chronos versions # support for older chronos versions
return connect(transportAddress, bufferSize, child) return connect(transportAddress, bufferSize, child)

View File

@ -7,6 +7,7 @@ import ../libp2p/[stream/connection,
transports/tcptransport, transports/tcptransport,
upgrademngrs/upgrade, upgrademngrs/upgrade,
multiaddress, multiaddress,
multicodec,
errors, errors,
wire] wire]
@ -125,6 +126,44 @@ suite "TCP transport":
server.close() server.close()
await server.join() await server.join()
asyncTest "Starting with duplicate but zero ports addresses must NOT fail":
let ma = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet(),
MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
let transport: TcpTransport = TcpTransport.new(upgrade = Upgrade())
await transport.start(ma)
await transport.stop()
asyncTest "Bind to listening port when not reachable":
let ma = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
let transport: TcpTransport = TcpTransport.new(upgrade = Upgrade())
await transport.start(ma)
let ma2 = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
await transport2.start(ma2)
let ma3 = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
let transport3: TcpTransport = TcpTransport.new(upgrade = Upgrade())
await transport3.start(ma3)
let listeningPort = transport.addrs[0][multiCodec("tcp")].get()
let conn = await transport.dial(transport2.addrs[0])
let acceptedConn = await transport2.accept()
let acceptedPort = acceptedConn.observedAddr.get()[multiCodec("tcp")].get()
check listeningPort != acceptedPort
transport.networkReachability = NetworkReachability.NotReachable
let conn2 = await transport.dial(transport3.addrs[0])
let acceptedConn2 = await transport3.accept()
let acceptedPort2 = acceptedConn2.observedAddr.get()[multiCodec("tcp")].get()
check listeningPort == acceptedPort2
await allFutures(transport.stop(), transport2.stop(), transport3.stop())
proc transProvider(): Transport = TcpTransport.new(upgrade = Upgrade()) proc transProvider(): Transport = TcpTransport.new(upgrade = Upgrade())
commonTransportTest( commonTransportTest(