From af5299f26c195540d3fbe0f1a20742658f8a6220 Mon Sep 17 00:00:00 2001 From: diegomrsantos Date: Fri, 24 Mar 2023 16:42:49 +0100 Subject: [PATCH] Create an ObservedAddrManager and add an AddressMapper in AutonatService and AutoRelayService (#871) Co-authored-by: Tanguy --- .pinned | 2 +- libp2p/observedaddrmanager.nim | 98 +++++++++++++++++++ libp2p/peerstore.nim | 5 +- .../connectivity/autonat/service.nim | 39 +++++++- libp2p/protocols/identify.nim | 41 ++++---- libp2p/services/autorelayservice.nim | 17 +++- tests/asyncunit.nim | 4 +- tests/testautonatservice.nim | 6 +- tests/testautorelay.nim | 5 +- tests/testmultiaddress.nim | 7 ++ tests/testnative.nim | 1 + tests/testobservedaddrmanager.nim | 64 ++++++++++++ 12 files changed, 261 insertions(+), 28 deletions(-) create mode 100644 libp2p/observedaddrmanager.nim create mode 100644 tests/testobservedaddrmanager.nim diff --git a/.pinned b/.pinned index a9b5e63f5..110bc93fb 100644 --- a/.pinned +++ b/.pinned @@ -1,6 +1,6 @@ bearssl;https://github.com/status-im/nim-bearssl@#acf9645e328bdcab481cfda1c158e07ecd46bd7b chronicles;https://github.com/status-im/nim-chronicles@#32ac8679680ea699f7dbc046e8e0131cac97d41a -chronos;https://github.com/status-im/nim-chronos@#5d3da66e563d21277b57a9b601744273c083a01b +chronos;https://github.com/status-im/nim-chronos@#f7835a192b45c37e97614d865141f21eea8c156e dnsclient;https://github.com/ba0f3/dnsclient.nim@#fcd7443634b950eaea574e5eaa00a628ae029823 faststreams;https://github.com/status-im/nim-faststreams@#814f8927e1f356f39219f37f069b83066bcc893a httputils;https://github.com/status-im/nim-http-utils@#a85bd52ae0a956983ca6b3267c72961d2ec0245f diff --git a/libp2p/observedaddrmanager.nim b/libp2p/observedaddrmanager.nim new file mode 100644 index 000000000..882e16ea7 --- /dev/null +++ b/libp2p/observedaddrmanager.nim @@ -0,0 +1,98 @@ +# Nim-LibP2P +# Copyright (c) 2023 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +# * MIT license ([LICENSE-MIT](LICENSE-MIT)) +# at your option. +# This file may not be copied, modified, or distributed except according to +# those terms. + +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/[sequtils, tables], + chronos, chronicles, + multiaddress, multicodec + +type + ## Manages observed MultiAddresses by reomte peers. It keeps track of the most observed IP and IP/Port. + ObservedAddrManager* = ref object of RootObj + observedIPsAndPorts: seq[MultiAddress] + maxSize: int + minCount: int + +proc addObservation*(self:ObservedAddrManager, observedAddr: MultiAddress): bool = + ## Adds a new observed MultiAddress. If the number of observations exceeds maxSize, the oldest one is removed. + if self.observedIPsAndPorts.len >= self.maxSize: + self.observedIPsAndPorts.del(0) + self.observedIPsAndPorts.add(observedAddr) + return true + +proc getProtocol(self: ObservedAddrManager, observations: seq[MultiAddress], multiCodec: MultiCodec): Opt[MultiAddress] = + var countTable = toCountTable(observations) + countTable.sort() + var orderedPairs = toSeq(countTable.pairs) + for (ma, count) in orderedPairs: + let maFirst = ma[0].get() + if maFirst.protoCode.get() == multiCodec and count >= self.minCount: + return Opt.some(ma) + return Opt.none(MultiAddress) + +proc getMostObservedProtocol(self: ObservedAddrManager, multiCodec: MultiCodec): Opt[MultiAddress] = + ## Returns the most observed IP address or none if the number of observations are less than minCount. + let observedIPs = self.observedIPsAndPorts.mapIt(it[0].get()) + return self.getProtocol(observedIPs, multiCodec) + +proc getMostObservedProtoAndPort(self: ObservedAddrManager, multiCodec: MultiCodec): Opt[MultiAddress] = + ## Returns the most observed IP/Port address or none if the number of observations are less than minCount. + return self.getProtocol(self.observedIPsAndPorts, multiCodec) + +proc getMostObservedProtosAndPorts*(self: ObservedAddrManager): seq[MultiAddress] = + ## Returns the most observed IP4/Port and IP6/Port address or an empty seq if the number of observations + ## are less than minCount. + var res: seq[MultiAddress] + let ip4 = self.getMostObservedProtoAndPort(multiCodec("ip4")) + if ip4.isSome(): + res.add(ip4.get()) + let ip6 = self.getMostObservedProtoAndPort(multiCodec("ip6")) + if ip6.isSome(): + res.add(ip6.get()) + return res + +proc guessDialableAddr*( + self: ObservedAddrManager, + ma: MultiAddress): MultiAddress = + ## Replaces the first proto valeu of each listen address by the corresponding (matching the proto code) most observed value. + ## If the most observed value is not available, the original MultiAddress is returned. + try: + let maFirst = ma[0] + let maRest = ma[1..^1] + if maRest.isErr(): + return ma + + let observedIP = self.getMostObservedProtocol(maFirst.get().protoCode().get()) + return + if observedIP.isNone() or maFirst.get() == observedIP.get(): + ma + else: + observedIP.get() & maRest.get() + except CatchableError as error: + debug "Error while handling manual port forwarding", msg = error.msg + return ma + +proc `$`*(self: ObservedAddrManager): string = + ## Returns a string representation of the ObservedAddrManager. + return "IPs and Ports: " & $self.observedIPsAndPorts + +proc new*( + T: typedesc[ObservedAddrManager], + maxSize = 10, + minCount = 3): T = + ## Creates a new ObservedAddrManager. + return T( + observedIPsAndPorts: newSeq[MultiAddress](), + maxSize: maxSize, + minCount: minCount) diff --git a/libp2p/peerstore.nim b/libp2p/peerstore.nim index 5cb1df44b..617a40ec6 100644 --- a/libp2p/peerstore.nim +++ b/libp2p/peerstore.nim @@ -28,7 +28,7 @@ else: import std/[tables, sets, options, macros], - chronos, + chronos, chronicles, ./crypto/crypto, ./protocols/identify, ./protocols/protocol, @@ -220,3 +220,6 @@ proc identify*( peerStore.updatePeerInfo(info) finally: await stream.closeWithEOF() + +proc guessDialableAddr*(self: PeerStore, ma: MultiAddress): MultiAddress = + return self.identify.observedAddrManager.guessDialableAddr(ma) diff --git a/libp2p/protocols/connectivity/autonat/service.nim b/libp2p/protocols/connectivity/autonat/service.nim index f75907d33..65ae99ceb 100644 --- a/libp2p/protocols/connectivity/autonat/service.nim +++ b/libp2p/protocols/connectivity/autonat/service.nim @@ -15,6 +15,7 @@ else: import std/[options, deques, sequtils] import chronos, metrics import ../../../switch +import ../../../wire import client import ../../../utils/heartbeat import ../../../crypto/crypto @@ -27,6 +28,7 @@ declarePublicGauge(libp2p_autonat_reachability_confidence, "autonat reachability type AutonatService* = ref object of Service newConnectedPeerHandler: PeerEventHandler + addressMapper: AddressMapper scheduleHandle: Future[void] networkReachability: NetworkReachability confidence: Option[float] @@ -40,6 +42,7 @@ type maxQueueSize: int minConfidence: float dialTimeout: Duration + enableAddressMapper: bool NetworkReachability* {.pure.} = enum NotReachable, Reachable, Unknown @@ -55,7 +58,8 @@ proc new*( numPeersToAsk: int = 5, maxQueueSize: int = 10, minConfidence: float = 0.3, - dialTimeout = 30.seconds): T = + dialTimeout = 30.seconds, + enableAddressMapper = true): T = return T( scheduleInterval: scheduleInterval, networkReachability: Unknown, @@ -67,7 +71,8 @@ proc new*( numPeersToAsk: numPeersToAsk, maxQueueSize: maxQueueSize, minConfidence: minConfidence, - dialTimeout: dialTimeout) + dialTimeout: dialTimeout, + enableAddressMapper: enableAddressMapper) proc networkReachability*(self: AutonatService): NetworkReachability {.inline.} = return self.networkReachability @@ -133,6 +138,7 @@ proc askPeer(self: AutonatService, switch: Switch, peerId: PeerId): Future[Netwo await self.handleAnswer(ans) if not isNil(self.statusAndConfidenceHandler): await self.statusAndConfidenceHandler(self.networkReachability, self.confidence) + await switch.peerInfo.update() return ans proc askConnectedPeers(self: AutonatService, switch: Switch) {.async.} = @@ -153,7 +159,30 @@ proc schedule(service: AutonatService, switch: Switch, interval: Duration) {.asy heartbeat "Scheduling AutonatService run", interval: await service.run(switch) +proc addressMapper( + self: AutonatService, + peerStore: PeerStore, + listenAddrs: seq[MultiAddress]): Future[seq[MultiAddress]] {.gcsafe, async.} = + + if self.networkReachability != NetworkReachability.Reachable: + return listenAddrs + + var addrs = newSeq[MultiAddress]() + for listenAddr in listenAddrs: + var processedMA = listenAddr + try: + let hostIP = initTAddress(listenAddr).get() + if not hostIP.isGlobal() and self.networkReachability == NetworkReachability.Reachable: + processedMA = peerStore.guessDialableAddr(listenAddr) # handle manual port forwarding + except CatchableError as exc: + debug "Error while handling address mapper", msg = exc.msg + addrs.add(processedMA) + return addrs + method setup*(self: AutonatService, switch: Switch): Future[bool] {.async.} = + self.addressMapper = proc (listenAddrs: seq[MultiAddress]): Future[seq[MultiAddress]] {.gcsafe, async.} = + return await addressMapper(self, switch.peerStore, listenAddrs) + info "Setting up AutonatService" let hasBeenSetup = await procCall Service(self).setup(switch) if hasBeenSetup: @@ -163,6 +192,8 @@ method setup*(self: AutonatService, switch: Switch): Future[bool] {.async.} = switch.connManager.addPeerEventHandler(self.newConnectedPeerHandler, PeerEventKind.Joined) if self.scheduleInterval.isSome(): self.scheduleHandle = schedule(self, switch, self.scheduleInterval.get()) + if self.enableAddressMapper: + switch.peerInfo.addressMappers.add(self.addressMapper) return hasBeenSetup method run*(self: AutonatService, switch: Switch) {.async, public.} = @@ -170,7 +201,6 @@ method run*(self: AutonatService, switch: Switch) {.async, public.} = await askConnectedPeers(self, switch) await self.callHandler() - method stop*(self: AutonatService, switch: Switch): Future[bool] {.async, public.} = info "Stopping AutonatService" let hasBeenStopped = await procCall Service(self).stop(switch) @@ -180,6 +210,9 @@ method stop*(self: AutonatService, switch: Switch): Future[bool] {.async, public self.scheduleHandle = nil if not isNil(self.newConnectedPeerHandler): switch.connManager.removePeerEventHandler(self.newConnectedPeerHandler, PeerEventKind.Joined) + if self.enableAddressMapper: + switch.peerInfo.addressMappers.keepItIf(it != self.addressMapper) + await switch.peerInfo.update() return hasBeenStopped proc statusAndConfidenceHandler*(self: AutonatService, statusAndConfidenceHandler: StatusAndConfidenceHandler) = diff --git a/libp2p/protocols/identify.nim b/libp2p/protocols/identify.nim index 61a8ddad1..85202d596 100644 --- a/libp2p/protocols/identify.nim +++ b/libp2p/protocols/identify.nim @@ -26,7 +26,10 @@ import ../protobuf/minprotobuf, ../multiaddress, ../protocols/protocol, ../utility, - ../errors + ../errors, + ../observedaddrmanager + +export observedaddrmanager logScope: topics = "libp2p identify" @@ -56,6 +59,7 @@ type Identify* = ref object of LPProtocol peerInfo*: PeerInfo sendSignedPeerRecord*: bool + observedAddrManager*: ObservedAddrManager IdentifyPushHandler* = proc ( peer: PeerId, @@ -160,7 +164,8 @@ proc new*( ): T = let identify = T( peerInfo: peerInfo, - sendSignedPeerRecord: sendSignedPeerRecord + sendSignedPeerRecord: sendSignedPeerRecord, + observedAddrManager: ObservedAddrManager.new(), ) identify.init() identify @@ -182,7 +187,7 @@ method init*(p: Identify) = p.handler = handle p.codec = IdentifyCodec -proc identify*(p: Identify, +proc identify*(self: Identify, conn: Connection, remotePeerId: PeerId): Future[IdentifyInfo] {.async, gcsafe.} = trace "initiating identify", conn @@ -194,23 +199,25 @@ proc identify*(p: Identify, let infoOpt = decodeMsg(message) if infoOpt.isNone(): raise newException(IdentityInvalidMsgError, "Incorrect message received!") - result = infoOpt.get() - if result.pubkey.isSome: - let peer = PeerId.init(result.pubkey.get()) - if peer.isErr: - raise newException(IdentityInvalidMsgError, $peer.error) - else: - result.peerId = peer.get() - if peer.get() != remotePeerId: - trace "Peer ids don't match", - remote = peer, - local = remotePeerId - - raise newException(IdentityNoMatchError, "Peer ids don't match") - else: + var info = infoOpt.get() + if info.pubkey.isNone(): raise newException(IdentityInvalidMsgError, "No pubkey in identify") + let peer = PeerId.init(info.pubkey.get()) + if peer.isErr: + raise newException(IdentityInvalidMsgError, $peer.error) + + if peer.get() != remotePeerId: + trace "Peer ids don't match", remote = peer, local = remotePeerId + raise newException(IdentityNoMatchError, "Peer ids don't match") + info.peerId = peer.get() + + if info.observedAddr.isSome: + if not self.observedAddrManager.addObservation(info.observedAddr.get()): + debug "Observed address is not valid", observedAddr = info.observedAddr.get() + return info + proc new*(T: typedesc[IdentifyPush], handler: IdentifyPushHandler = nil): T {.public.} = ## Create a IdentifyPush protocol. `handler` will be called every time ## a peer sends us new `PeerInfo` diff --git a/libp2p/services/autorelayservice.nim b/libp2p/services/autorelayservice.nim index 14f58c76a..defd42f17 100644 --- a/libp2p/services/autorelayservice.nim +++ b/libp2p/services/autorelayservice.nim @@ -32,9 +32,15 @@ type backingOff: seq[PeerId] peerAvailable: AsyncEvent onReservation: OnReservationHandler + addressMapper: AddressMapper rng: ref HmacDrbgContext -proc reserveAndUpdate(self: AutoRelayService, relayPid: PeerId, selfPid: PeerId) {.async.} = +proc addressMapper( + self: AutoRelayService, + listenAddrs: seq[MultiAddress]): Future[seq[MultiAddress]] {.gcsafe, async.} = + return concat(toSeq(self.relayAddresses.values)) + +proc reserveAndUpdate(self: AutoRelayService, relayPid: PeerId, switch: Switch) {.async.} = while self.running: let rsvp = await self.client.reserve(relayPid).wait(chronos.seconds(5)) @@ -46,11 +52,15 @@ proc reserveAndUpdate(self: AutoRelayService, relayPid: PeerId, selfPid: PeerId) break if relayPid notin self.relayAddresses or self.relayAddresses[relayPid] != relayedAddr: self.relayAddresses[relayPid] = relayedAddr + await switch.peerInfo.update() if not self.onReservation.isNil(): self.onReservation(concat(toSeq(self.relayAddresses.values))) await sleepAsync chronos.seconds(ttl - 30) method setup*(self: AutoRelayService, switch: Switch): Future[bool] {.async, gcsafe.} = + self.addressMapper = proc (listenAddrs: seq[MultiAddress]): Future[seq[MultiAddress]] {.gcsafe, async.} = + return await addressMapper(self, listenAddrs) + let hasBeenSetUp = await procCall Service(self).setup(switch) if hasBeenSetUp: proc handlePeerJoined(peerId: PeerId, event: PeerEvent) {.async.} = @@ -63,6 +73,7 @@ method setup*(self: AutoRelayService, switch: Switch): Future[bool] {.async, gcs future[].cancel() switch.addPeerEventHandler(handlePeerJoined, Joined) switch.addPeerEventHandler(handlePeerLeft, Left) + switch.peerInfo.addressMappers.add(self.addressMapper) await self.run(switch) return hasBeenSetUp @@ -96,7 +107,7 @@ proc innerRun(self: AutoRelayService, switch: Switch) {.async, gcsafe.} = for relayPid in connectedPeers: if self.relayPeers.len() >= self.numRelays: break - self.relayPeers[relayPid] = self.reserveAndUpdate(relayPid, switch.peerInfo.peerId) + self.relayPeers[relayPid] = self.reserveAndUpdate(relayPid, switch) if self.relayPeers.len() > 0: await one(toSeq(self.relayPeers.values())) or self.peerAvailable.wait() @@ -116,6 +127,8 @@ method stop*(self: AutoRelayService, switch: Switch): Future[bool] {.async, gcsa if hasBeenStopped: self.running = false self.runner.cancel() + switch.peerInfo.addressMappers.keepItIf(it != self.addressMapper) + await switch.peerInfo.update() return hasBeenStopped proc getAddresses*(self: AutoRelayService): seq[MultiAddress] = diff --git a/tests/asyncunit.nim b/tests/asyncunit.nim index fa10c9eb2..1589bf621 100644 --- a/tests/asyncunit.nim +++ b/tests/asyncunit.nim @@ -1,6 +1,6 @@ -import unittest2 +import unittest2, chronos -export unittest2 +export unittest2, chronos template asyncTeardown*(body: untyped): untyped = teardown: diff --git a/tests/testautonatservice.nim b/tests/testautonatservice.nim index 4a0d35d63..21636c74b 100644 --- a/tests/testautonatservice.nim +++ b/tests/testautonatservice.nim @@ -7,7 +7,7 @@ # This file may not be copied, modified, or distributed except according to # those terms. -import std/options +import std/[options, sequtils] import chronos, metrics import unittest2 import ../libp2p/[builders, @@ -104,9 +104,13 @@ suite "Autonat Service": check autonatService.networkReachability() == NetworkReachability.Reachable check libp2p_autonat_reachability_confidence.value(["Reachable"]) == 0.3 + check switch1.peerInfo.addrs == switch1.peerInfo.listenAddrs.mapIt(switch1.peerStore.guessDialableAddr(it)) + await allFuturesThrowing( switch1.stop(), switch2.stop(), switch3.stop(), switch4.stop()) + check switch1.peerInfo.addrs == switch1.peerInfo.listenAddrs + asyncTest "Peer must be not reachable and then reachable": let autonatClientStub = AutonatClientStub.new(expectedDials = 6) diff --git a/tests/testautorelay.nim b/tests/testautorelay.nim index 9fc58d96c..3117387ee 100644 --- a/tests/testautorelay.nim +++ b/tests/testautorelay.nim @@ -72,10 +72,13 @@ suite "Autorelay": await fut.wait(1.seconds) let addresses = autorelay.getAddresses() check: - addresses[0] == buildRelayMA(switchRelay, switchClient) + addresses == @[buildRelayMA(switchRelay, switchClient)] addresses.len() == 1 + addresses == switchClient.peerInfo.addrs await allFutures(switchClient.stop(), switchRelay.stop()) + check addresses != switchClient.peerInfo.addrs + asyncTest "Three relays connections": var state = 0 let diff --git a/tests/testmultiaddress.nim b/tests/testmultiaddress.nim index c180db7f2..3a0b9afc1 100644 --- a/tests/testmultiaddress.nim +++ b/tests/testmultiaddress.nim @@ -405,5 +405,12 @@ suite "MultiAddress test suite": let maWithTcp = MultiAddress.init(onionMAWithTcpStr).get() check $(maWithTcp[multiCodec("onion3")].tryGet()) == onionMAStr + test "matchPartial": + const + tcp = mapEq("tcp") + let ma = MultiAddress.init("/ip4/0.0.0.0/tcp/0").get() + + check not tcp.matchPartial(ma) + check IP4.matchPartial(ma) diff --git a/tests/testnative.nim b/tests/testnative.nim index b41508160..25dd7d0f0 100644 --- a/tests/testnative.nim +++ b/tests/testnative.nim @@ -28,6 +28,7 @@ import testtcptransport, testmultistream, testbufferstream, testidentify, + testobservedaddrmanager, testconnmngr, testswitch, testnoise, diff --git a/tests/testobservedaddrmanager.nim b/tests/testobservedaddrmanager.nim new file mode 100644 index 000000000..a5b60df70 --- /dev/null +++ b/tests/testobservedaddrmanager.nim @@ -0,0 +1,64 @@ +import unittest2, + ../libp2p/multiaddress, + ../libp2p/observedaddrmanager, + ./helpers + +suite "ObservedAddrManager": + teardown: + checkTrackers() + + asyncTest "Calculate the most oberserved IP correctly": + + let observedAddrManager = ObservedAddrManager.new(minCount = 3) + + # Calculate the most oberserved IP4 correctly + let mostObservedIP4AndPort = MultiAddress.init("/ip4/1.2.3.0/tcp/1").get() + let maIP4 = MultiAddress.init("/ip4/0.0.0.0/tcp/80").get() + + check: + observedAddrManager.addObservation(mostObservedIP4AndPort) + observedAddrManager.addObservation(mostObservedIP4AndPort) + + observedAddrManager.guessDialableAddr(maIP4) == maIP4 + + observedAddrManager.addObservation(MultiAddress.init("/ip4/1.2.3.0/tcp/2").get()) + observedAddrManager.addObservation(MultiAddress.init("/ip4/1.2.3.1/tcp/1").get()) + + observedAddrManager.guessDialableAddr(maIP4) == MultiAddress.init("/ip4/1.2.3.0/tcp/80").get() + observedAddrManager.getMostObservedProtosAndPorts().len == 0 + + observedAddrManager.addObservation(mostObservedIP4AndPort) + + observedAddrManager.getMostObservedProtosAndPorts() == @[mostObservedIP4AndPort] + + # Calculate the most oberserved IP6 correctly + let mostObservedIP6AndPort = MultiAddress.init("/ip6/::2/tcp/1").get() + let maIP6 = MultiAddress.init("/ip6/::1/tcp/80").get() + + check: + observedAddrManager.addObservation(mostObservedIP6AndPort) + observedAddrManager.addObservation(mostObservedIP6AndPort) + + observedAddrManager.guessDialableAddr(maIP6) == maIP6 + + observedAddrManager.addObservation(MultiAddress.init("/ip6/::2/tcp/2").get()) + observedAddrManager.addObservation(MultiAddress.init("/ip6/::3/tcp/1").get()) + + observedAddrManager.guessDialableAddr(maIP6) == MultiAddress.init("/ip6/::2/tcp/80").get() + observedAddrManager.getMostObservedProtosAndPorts().len == 1 + + observedAddrManager.addObservation(mostObservedIP6AndPort) + + observedAddrManager.getMostObservedProtosAndPorts() == @[mostObservedIP4AndPort, mostObservedIP6AndPort] + + asyncTest "replace first proto value by most observed when there is only one protocol": + let observedAddrManager = ObservedAddrManager.new(minCount = 3) + let mostObservedIP4AndPort = MultiAddress.init("/ip4/1.2.3.4/tcp/1").get() + + check: + observedAddrManager.addObservation(mostObservedIP4AndPort) + observedAddrManager.addObservation(mostObservedIP4AndPort) + observedAddrManager.addObservation(mostObservedIP4AndPort) + + observedAddrManager.guessDialableAddr( + MultiAddress.init("/ip4/0.0.0.0").get()) == MultiAddress.init("/ip4/1.2.3.4").get()