From 0041ed4cf8127cce4fc0a2345dce1030b6794355 Mon Sep 17 00:00:00 2001 From: diegomrsantos Date: Thu, 6 Apr 2023 15:23:35 +0200 Subject: [PATCH] Transport hole punching (#873) Co-authored-by: Tanguy --- .pinned | 2 +- .../protocols/connectivity/autonat/core.nim | 3 ++ .../connectivity/autonat/service.nim | 3 -- libp2p/transports/tcptransport.nim | 21 +++++++--- libp2p/transports/transport.nim | 6 ++- libp2p/wire.nim | 7 +++- tests/testtcptransport.nim | 39 +++++++++++++++++++ 7 files changed, 68 insertions(+), 13 deletions(-) diff --git a/.pinned b/.pinned index 110bc93fb..7df527b2c 100644 --- a/.pinned +++ b/.pinned @@ -1,6 +1,6 @@ bearssl;https://github.com/status-im/nim-bearssl@#acf9645e328bdcab481cfda1c158e07ecd46bd7b chronicles;https://github.com/status-im/nim-chronicles@#32ac8679680ea699f7dbc046e8e0131cac97d41a -chronos;https://github.com/status-im/nim-chronos@#f7835a192b45c37e97614d865141f21eea8c156e +chronos;https://github.com/status-im/nim-chronos@#ab5a8c2e0f6941fe3debd61dff0293790079d1b0 dnsclient;https://github.com/ba0f3/dnsclient.nim@#fcd7443634b950eaea574e5eaa00a628ae029823 faststreams;https://github.com/status-im/nim-faststreams@#814f8927e1f356f39219f37f069b83066bcc893a httputils;https://github.com/status-im/nim-http-utils@#a85bd52ae0a956983ca6b3267c72961d2ec0245f diff --git a/libp2p/protocols/connectivity/autonat/core.nim b/libp2p/protocols/connectivity/autonat/core.nim index ce76f7a28..703ff9120 100644 --- a/libp2p/protocols/connectivity/autonat/core.nim +++ b/libp2p/protocols/connectivity/autonat/core.nim @@ -58,6 +58,9 @@ type dial*: Option[AutonatDial] response*: Option[AutonatDialResponse] + NetworkReachability* {.pure.} = enum + Unknown, NotReachable, Reachable + proc encode(p: AutonatPeerInfo): ProtoBuffer = result = initProtoBuffer() if p.id.isSome(): diff --git a/libp2p/protocols/connectivity/autonat/service.nim b/libp2p/protocols/connectivity/autonat/service.nim index 65ae99ceb..d10f96c26 100644 --- a/libp2p/protocols/connectivity/autonat/service.nim +++ b/libp2p/protocols/connectivity/autonat/service.nim @@ -44,9 +44,6 @@ type dialTimeout: Duration enableAddressMapper: bool - NetworkReachability* {.pure.} = enum - NotReachable, Reachable, Unknown - StatusAndConfidenceHandler* = proc (networkReachability: NetworkReachability, confidence: Option[float]): Future[void] {.gcsafe, raises: [Defect].} proc new*( diff --git a/libp2p/transports/tcptransport.nim b/libp2p/transports/tcptransport.nim index 46aa0a259..4af207ee1 100644 --- a/libp2p/transports/tcptransport.nim +++ b/libp2p/transports/tcptransport.nim @@ -42,13 +42,15 @@ type servers*: seq[StreamServer] clients: array[Direction, seq[StreamTransport]] flags: set[ServerFlags] - clientFlags: set[TransportFlags] + clientFlags: set[SocketFlags] acceptFuts: seq[Future[StreamTransport]] TcpTransportTracker* = ref object of TrackerBase opened*: uint64 closed*: uint64 + TcpTransportError* = object of transport.TransportError + proc setupTcpTransportTracker(): TcpTransportTracker {.gcsafe, raises: [Defect].} proc getTcpTransportTracker(): TcpTransportTracker {.gcsafe.} = @@ -136,13 +138,14 @@ proc new*( clientFlags: if ServerFlags.TcpNoDelay in flags: compilesOr: - {TransportFlags.TcpNoDelay} + {SocketFlags.TcpNoDelay} do: doAssert(false) - default(set[TransportFlags]) + default(set[SocketFlags]) else: - default(set[TransportFlags]), - upgrader: upgrade) + default(set[SocketFlags]), + upgrader: upgrade, + networkReachability: NetworkReachability.Unknown) return transport @@ -165,6 +168,7 @@ method start*( trace "Invalid address detected, skipping!", address = ma continue + self.flags.incl(ServerFlags.ReusePort) let server = createStreamServer( ma = ma, flags = self.flags, @@ -263,8 +267,13 @@ method dial*( ## trace "Dialing remote peer", address = $address + let transp = + if self.networkReachability == NetworkReachability.NotReachable and self.addrs.len > 0: + self.clientFlags.incl(SocketFlags.ReusePort) + await connect(address, flags = self.clientFlags, localAddress = Opt.some(self.addrs[0])) + else: + await connect(address, flags = self.clientFlags) - let transp = await connect(address, flags = self.clientFlags) try: let observedAddr = await getObservedAddr(transp) return await self.connHandler(transp, Opt.some(observedAddr), Direction.Out) diff --git a/libp2p/transports/transport.nim b/libp2p/transports/transport.nim index a5a651d7e..9a06a66f5 100644 --- a/libp2p/transports/transport.nim +++ b/libp2p/transports/transport.nim @@ -19,7 +19,10 @@ import ../stream/connection, ../multiaddress, ../multicodec, ../muxers/muxer, - ../upgrademngrs/upgrade + ../upgrademngrs/upgrade, + ../protocols/connectivity/autonat/core + +export core.NetworkReachability logScope: topics = "libp2p transport" @@ -33,6 +36,7 @@ type addrs*: seq[MultiAddress] running*: bool upgrader*: Upgrade + networkReachability*: NetworkReachability proc newTransportClosedError*(parent: ref Exception = nil): ref LPError = newException(TransportClosedError, diff --git a/libp2p/wire.nim b/libp2p/wire.nim index 6c3671f2d..088814945 100644 --- a/libp2p/wire.nim +++ b/libp2p/wire.nim @@ -77,7 +77,8 @@ proc connect*( ma: MultiAddress, bufferSize = DefaultStreamBufferSize, child: StreamTransport = nil, - flags = default(set[TransportFlags])): Future[StreamTransport] + flags = default(set[SocketFlags]), + localAddress: Opt[MultiAddress] = Opt.none(MultiAddress)): Future[StreamTransport] {.raises: [Defect, LPError, MaInvalidAddress].} = ## Open new connection to remote peer with address ``ma`` and create ## new transport object ``StreamTransport`` for established connection. @@ -90,7 +91,9 @@ proc connect*( let transportAddress = initTAddress(ma).tryGet() compilesOr: - return connect(transportAddress, bufferSize, child, flags) + return connect(transportAddress, bufferSize, child, + if localAddress.isSome(): initTAddress(localAddress.get()).tryGet() else : TransportAddress(), + flags) do: # support for older chronos versions return connect(transportAddress, bufferSize, child) diff --git a/tests/testtcptransport.nim b/tests/testtcptransport.nim index d0666e1d3..d765bfa5c 100644 --- a/tests/testtcptransport.nim +++ b/tests/testtcptransport.nim @@ -7,6 +7,7 @@ import ../libp2p/[stream/connection, transports/tcptransport, upgrademngrs/upgrade, multiaddress, + multicodec, errors, wire] @@ -125,6 +126,44 @@ suite "TCP transport": server.close() await server.join() + asyncTest "Starting with duplicate but zero ports addresses must NOT fail": + let ma = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet(), + MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] + + let transport: TcpTransport = TcpTransport.new(upgrade = Upgrade()) + + await transport.start(ma) + await transport.stop() + + asyncTest "Bind to listening port when not reachable": + let ma = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] + let transport: TcpTransport = TcpTransport.new(upgrade = Upgrade()) + await transport.start(ma) + + let ma2 = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] + let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade()) + await transport2.start(ma2) + + let ma3 = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()] + let transport3: TcpTransport = TcpTransport.new(upgrade = Upgrade()) + await transport3.start(ma3) + + let listeningPort = transport.addrs[0][multiCodec("tcp")].get() + + let conn = await transport.dial(transport2.addrs[0]) + let acceptedConn = await transport2.accept() + let acceptedPort = acceptedConn.observedAddr.get()[multiCodec("tcp")].get() + check listeningPort != acceptedPort + + transport.networkReachability = NetworkReachability.NotReachable + + let conn2 = await transport.dial(transport3.addrs[0]) + let acceptedConn2 = await transport3.accept() + let acceptedPort2 = acceptedConn2.observedAddr.get()[multiCodec("tcp")].get() + check listeningPort == acceptedPort2 + + await allFutures(transport.stop(), transport2.stop(), transport3.stop()) + proc transProvider(): Transport = TcpTransport.new(upgrade = Upgrade()) commonTransportTest(