From 67ef25fae0f96804073faaff37cac994a805254a Mon Sep 17 00:00:00 2001 From: diegomrsantos Date: Fri, 16 Dec 2022 12:32:00 +0100 Subject: [PATCH] Autonat service (#814) Co-authored-by: Tanguy --- libp2p/builders.nim | 8 +- libp2p/connmanager.nim | 8 + libp2p/protocols/connectivity/autonat.nim | 47 ++++-- .../protocols/connectivity/relay/client.nim | 1 + libp2p/services/autonatservice.nim | 157 ++++++++++++++++++ libp2p/switch.nim | 38 ++++- tests/config.nims | 2 + tests/stubs/autonatstub.nim | 36 ++++ tests/{stubs.nim => stubs/torstub.nim} | 2 +- tests/testautonatservice.nim | 155 +++++++++++++++++ tests/testnative.nim | 3 +- tests/testtortransport.nim | 4 +- 12 files changed, 437 insertions(+), 24 deletions(-) create mode 100644 libp2p/services/autonatservice.nim create mode 100644 tests/stubs/autonatstub.nim rename tests/{stubs.nim => stubs/torstub.nim} (98%) create mode 100644 tests/testautonatservice.nim diff --git a/libp2p/builders.nim b/libp2p/builders.nim index fdff75ba0..5ff516970 100644 --- a/libp2p/builders.nim +++ b/libp2p/builders.nim @@ -61,6 +61,7 @@ type autonat: bool circuitRelay: Relay rdv: RendezVous + services: seq[Service] proc new*(T: type[SwitchBuilder]): T {.public.} = ## Creates a SwitchBuilder @@ -199,6 +200,10 @@ proc withRendezVous*(b: SwitchBuilder, rdv: RendezVous = RendezVous.new()): Swit b.rdv = rdv b +proc withServices*(b: SwitchBuilder, services: seq[Service]): SwitchBuilder = + b.services = services + b + proc build*(b: SwitchBuilder): Switch {.raises: [Defect, LPError], public.} = @@ -254,7 +259,8 @@ proc build*(b: SwitchBuilder): Switch connManager = connManager, ms = ms, nameResolver = b.nameResolver, - peerStore = peerStore) + peerStore = peerStore, + services = b.services) if b.autonat: let autonat = Autonat.new(switch) diff --git a/libp2p/connmanager.nim b/libp2p/connmanager.nim index b7ac3d87a..e411d40ad 100644 --- a/libp2p/connmanager.nim +++ b/libp2p/connmanager.nim @@ -110,6 +110,13 @@ proc new*(C: type ConnManager, proc connCount*(c: ConnManager, peerId: PeerId): int = c.conns.getOrDefault(peerId).len +proc connectedPeers*(c: ConnManager, dir: Direction): seq[PeerId] = + var peers = newSeq[PeerId]() + for peerId, conns in c.conns: + if conns.anyIt(it.dir == dir): + peers.add(peerId) + return peers + proc addConnEventHandler*(c: ConnManager, handler: ConnEventHandler, kind: ConnEventKind) = @@ -537,3 +544,4 @@ proc close*(c: ConnManager) {.async.} = await conn.close() trace "Closed ConnManager" + diff --git a/libp2p/protocols/connectivity/autonat.nim b/libp2p/protocols/connectivity/autonat.nim index 6408a7e04..a6d754da5 100644 --- a/libp2p/protocols/connectivity/autonat.nim +++ b/libp2p/protocols/connectivity/autonat.nim @@ -32,6 +32,7 @@ const type AutonatError* = object of LPError + AutonatUnreachableError* = object of LPError MsgType* = enum Dial = 0 @@ -203,25 +204,37 @@ type sem: AsyncSemaphore switch*: Switch -proc dialMe*(a: Autonat, pid: PeerId, ma: MultiAddress|seq[MultiAddress]): - Future[MultiAddress] {.async.} = - let addrs = when ma is MultiAddress: @[ma] else: ma - let conn = await a.switch.dial(pid, addrs, AutonatCodec) +method dialMe*(a: Autonat, pid: PeerId, addrs: seq[MultiAddress] = newSeq[MultiAddress]()): + Future[MultiAddress] {.base, async.} = + + proc getResponseOrRaise(autonatMsg: Option[AutonatMsg]): AutonatDialResponse {.raises: [UnpackError, AutonatError].} = + if autonatMsg.isNone() or + autonatMsg.get().msgType != DialResponse or + autonatMsg.get().response.isNone() or + autonatMsg.get().response.get().ma.isNone(): + raise newException(AutonatError, "Unexpected response") + else: + autonatMsg.get().response.get() + + let conn = + try: + if addrs.len == 0: + await a.switch.dial(pid, @[AutonatCodec]) + else: + await a.switch.dial(pid, addrs, AutonatCodec) + except CatchableError as err: + raise newException(AutonatError, "Unexpected error when dialling", err) + defer: await conn.close() await conn.sendDial(a.switch.peerInfo.peerId, a.switch.peerInfo.addrs) - let msgOpt = AutonatMsg.decode(await conn.readLp(1024)) - if msgOpt.isNone() or - msgOpt.get().msgType != DialResponse or - msgOpt.get().response.isNone(): - raise newException(AutonatError, "Unexpected response") - let response = msgOpt.get().response.get() - if response.status != ResponseStatus.Ok: - raise newException(AutonatError, "Bad status " & - $response.status & " " & - response.text.get("")) - if response.ma.isNone(): - raise newException(AutonatError, "Missing address") - return response.ma.get() + let response = getResponseOrRaise(AutonatMsg.decode(await conn.readLp(1024))) + return case response.status: + of ResponseStatus.Ok: + response.ma.get() + of ResponseStatus.DialError: + raise newException(AutonatUnreachableError, "Peer could not dial us back") + else: + raise newException(AutonatError, "Bad status " & $response.status & " " & response.text.get("")) proc tryDial(a: Autonat, conn: Connection, addrs: seq[MultiAddress]) {.async.} = try: diff --git a/libp2p/protocols/connectivity/relay/client.nim b/libp2p/protocols/connectivity/relay/client.nim index b12a728dc..e92d21b72 100644 --- a/libp2p/protocols/connectivity/relay/client.nim +++ b/libp2p/protocols/connectivity/relay/client.nim @@ -25,6 +25,7 @@ import ./relay, ../../../multiaddress, ../../../stream/connection +export options logScope: topics = "libp2p relay relay-client" diff --git a/libp2p/services/autonatservice.nim b/libp2p/services/autonatservice.nim new file mode 100644 index 000000000..a70aa0001 --- /dev/null +++ b/libp2p/services/autonatservice.nim @@ -0,0 +1,157 @@ +# Nim-LibP2P +# Copyright (c) 2022 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +# * MIT license ([LICENSE-MIT](LICENSE-MIT)) +# at your option. +# This file may not be copied, modified, or distributed except according to +# those terms. + +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import std/[options, deques, sequtils] +import chronos, metrics +import ../switch +import ../protocols/[connectivity/autonat] +import ../utils/heartbeat +import ../crypto/crypto + +declarePublicGauge(libp2p_autonat_reachability_confidence, "autonat reachability confidence", labels = ["reachability"]) + +type + AutonatService* = ref object of Service + newConnectedPeerHandler: PeerEventHandler + scheduleHandle: Future[void] + networkReachability: NetworkReachability + confidence: Option[float] + answers: Deque[NetworkReachability] + autonat: Autonat + statusAndConfidenceHandler: StatusAndConfidenceHandler + rng: ref HmacDrbgContext + scheduleInterval: Option[Duration] + askNewConnectedPeers: bool + numPeersToAsk: int + maxQueueSize: int + minConfidence: float + dialTimeout: Duration + + NetworkReachability* {.pure.} = enum + NotReachable, Reachable, Unknown + + StatusAndConfidenceHandler* = proc (networkReachability: NetworkReachability, confidence: Option[float]): Future[void] {.gcsafe, raises: [Defect].} + +proc new*( + T: typedesc[AutonatService], + autonat: Autonat, + rng: ref HmacDrbgContext, + scheduleInterval: Option[Duration] = none(Duration), + askNewConnectedPeers = true, + numPeersToAsk: int = 5, + maxQueueSize: int = 10, + minConfidence: float = 0.3, + dialTimeout = 5.seconds): T = + return T( + scheduleInterval: scheduleInterval, + networkReachability: Unknown, + confidence: none(float), + answers: initDeque[NetworkReachability](), + autonat: autonat, + rng: rng, + askNewConnectedPeers: askNewConnectedPeers, + numPeersToAsk: numPeersToAsk, + maxQueueSize: maxQueueSize, + minConfidence: minConfidence, + dialTimeout: dialTimeout) + +proc networkReachability*(self: AutonatService): NetworkReachability {.inline.} = + return self.networkReachability + +proc callHandler(self: AutonatService) {.async.} = + if not isNil(self.statusAndConfidenceHandler): + await self.statusAndConfidenceHandler(self.networkReachability, self.confidence) + +proc handleAnswer(self: AutonatService, ans: NetworkReachability) {.async.} = + + if self.answers.len == self.maxQueueSize: + self.answers.popFirst() + + self.answers.addLast(ans) + + self.networkReachability = Unknown + self.confidence = none(float) + const reachabilityPriority = [Reachable, NotReachable] + for reachability in reachabilityPriority: + let confidence = self.answers.countIt(it == reachability) / self.maxQueueSize + libp2p_autonat_reachability_confidence.set(value = confidence, labelValues = [$reachability]) + if self.confidence.isNone and confidence >= self.minConfidence: + self.networkReachability = reachability + self.confidence = some(confidence) + + trace "Current status", currentStats = $self.networkReachability, confidence = $self.confidence + +proc askPeer(self: AutonatService, s: Switch, peerId: PeerId): Future[NetworkReachability] {.async.} = + trace "Asking for reachability", peerId = $peerId + let ans = + try: + discard await self.autonat.dialMe(peerId).wait(self.dialTimeout) + Reachable + except AutonatUnreachableError: + trace "dialMe answer is not reachable", peerId = $peerId + NotReachable + except AsyncTimeoutError: + trace "dialMe timed out", peerId = $peerId + Unknown + except CatchableError as err: + trace "dialMe unexpected error", peerId = $peerId, errMsg = $err.msg + Unknown + await self.handleAnswer(ans) + if not isNil(self.statusAndConfidenceHandler): + await self.statusAndConfidenceHandler(self.networkReachability, self.confidence) + return ans + +proc askConnectedPeers(self: AutonatService, switch: Switch) {.async.} = + var peers = switch.connectedPeers(Direction.Out) + self.rng.shuffle(peers) + var answersFromPeers = 0 + for peer in peers: + if answersFromPeers >= self.numPeersToAsk: + break + elif (await askPeer(self, switch, peer)) != Unknown: + answersFromPeers.inc() + +proc schedule(service: AutonatService, switch: Switch, interval: Duration) {.async.} = + heartbeat "Schedule AutonatService run", interval: + await service.run(switch) + +method setup*(self: AutonatService, switch: Switch): Future[bool] {.async.} = + let hasBeenSetup = await procCall Service(self).setup(switch) + if hasBeenSetup: + if self.askNewConnectedPeers: + self.newConnectedPeerHandler = proc (peerId: PeerId, event: PeerEvent): Future[void] {.async.} = + discard askPeer(self, switch, peerId) + await self.callHandler() + switch.connManager.addPeerEventHandler(self.newConnectedPeerHandler, PeerEventKind.Joined) + if self.scheduleInterval.isSome(): + self.scheduleHandle = schedule(self, switch, self.scheduleInterval.get()) + return hasBeenSetup + +method run*(self: AutonatService, switch: Switch) {.async, public.} = + await askConnectedPeers(self, switch) + await self.callHandler() + + +method stop*(self: AutonatService, switch: Switch): Future[bool] {.async, public.} = + let hasBeenStopped = await procCall Service(self).stop(switch) + if hasBeenStopped: + if not isNil(self.scheduleHandle): + self.scheduleHandle.cancel() + self.scheduleHandle = nil + if not isNil(self.newConnectedPeerHandler): + switch.connManager.removePeerEventHandler(self.newConnectedPeerHandler, PeerEventKind.Joined) + return hasBeenStopped + +proc statusAndConfidenceHandler*(self: AutonatService, statusAndConfidenceHandler: StatusAndConfidenceHandler) = + self.statusAndConfidenceHandler = statusAndConfidenceHandler diff --git a/libp2p/switch.nim b/libp2p/switch.nim index 85fe583ef..964ddd7c3 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -74,6 +74,28 @@ type peerStore*: PeerStore nameResolver*: NameResolver started: bool + services*: seq[Service] + + Service* = ref object of RootObj + inUse: bool + + +method setup*(self: Service, switch: Switch): Future[bool] {.base, async, gcsafe.} = + if self.inUse: + warn "service setup has already been called" + return false + self.inUse = true + return true + +method run*(self: Service, switch: Switch) {.base, async, gcsafe.} = + doAssert(false, "Not implemented!") + +method stop*(self: Service, switch: Switch): Future[bool] {.base, async, gcsafe.} = + if not self.inUse: + warn "service is already stopped" + return false + self.inUse = false + return true proc addConnEventHandler*(s: Switch, handler: ConnEventHandler, @@ -108,6 +130,9 @@ method addTransport*(s: Switch, t: Transport) = s.transports &= t s.dialer.addTransport(t) +proc connectedPeers*(s: Switch, dir: Direction): seq[PeerId] = + s.connManager.connectedPeers(dir) + proc isConnected*(s: Switch, peerId: PeerId): bool {.public.} = ## returns true if the peer has one or more ## associated connections @@ -294,6 +319,9 @@ proc stop*(s: Switch) {.async, public.} = if not a.finished: a.cancel() + for service in s.services: + discard await service.stop(s) + await s.ms.stop() trace "Switch stopped" @@ -335,6 +363,9 @@ proc start*(s: Switch) {.async, gcsafe, public.} = await s.ms.start() + for service in s.services: + discard await service.setup(s) + s.started = true debug "Started libp2p node", peer = s.peerInfo @@ -346,7 +377,8 @@ proc newSwitch*(peerInfo: PeerInfo, connManager: ConnManager, ms: MultistreamSelect, nameResolver: NameResolver = nil, - peerStore = PeerStore.new()): Switch + peerStore = PeerStore.new(), + services = newSeq[Service]()): Switch {.raises: [Defect, LPError], public.} = if secureManagers.len == 0: raise newException(LPError, "Provide at least one secure manager") @@ -358,8 +390,10 @@ proc newSwitch*(peerInfo: PeerInfo, connManager: connManager, peerStore: peerStore, dialer: Dialer.new(peerInfo.peerId, connManager, transports, ms, nameResolver), - nameResolver: nameResolver) + nameResolver: nameResolver, + services: services) switch.connManager.peerStore = peerStore switch.mount(identity) + return switch diff --git a/tests/config.nims b/tests/config.nims index 3ec031e76..ab493670d 100644 --- a/tests/config.nims +++ b/tests/config.nims @@ -1,3 +1,5 @@ import ../config.nims --threads:on +--d:metrics +--d:withoutPCRE diff --git a/tests/stubs/autonatstub.nim b/tests/stubs/autonatstub.nim new file mode 100644 index 000000000..ce301f4bc --- /dev/null +++ b/tests/stubs/autonatstub.nim @@ -0,0 +1,36 @@ +{.used.} + +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import chronos +import ../../libp2p/protocols/connectivity/autonat +import ../../libp2p/peerid +import ../../libp2p/multiaddress + +type + AutonatStub* = ref object of Autonat + returnSuccess*: bool + dials: int + expectedDials: int + finished*: Future[void] + +proc new*(T: typedesc[AutonatStub], expectedDials: int): T = + return T(dials: 0, expectedDials: expectedDials, finished: newFuture[void]()) + +method dialMe*( + self: AutonatStub, + pid: PeerId, + addrs: seq[MultiAddress] = newSeq[MultiAddress]()): + Future[MultiAddress] {.async.} = + + self.dials += 1 + + if self.dials == self.expectedDials: + self.finished.complete() + if self.returnSuccess: + return MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet() + else: + raise newException(AutonatUnreachableError, "") diff --git a/tests/stubs.nim b/tests/stubs/torstub.nim similarity index 98% rename from tests/stubs.nim rename to tests/stubs/torstub.nim index aae661de4..ca5fe9776 100644 --- a/tests/stubs.nim +++ b/tests/stubs/torstub.nim @@ -7,7 +7,7 @@ else: import tables import chronos, stew/[byteutils, endians2, shims/net] -import ../libp2p/[stream/connection, +import ../../libp2p/[stream/connection, protocols/connectivity/relay/utils, transports/tcptransport, transports/tortransport, diff --git a/tests/testautonatservice.nim b/tests/testautonatservice.nim new file mode 100644 index 000000000..fd922c870 --- /dev/null +++ b/tests/testautonatservice.nim @@ -0,0 +1,155 @@ +# Nim-LibP2P +# Copyright (c) 2022 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +# * MIT license ([LICENSE-MIT](LICENSE-MIT)) +# at your option. +# This file may not be copied, modified, or distributed except according to +# those terms. + +import std/options +import chronos, metrics +import unittest2 +import ../libp2p/[builders, + switch, + services/autonatservice] +import ./helpers +import stubs/autonatstub + +proc createSwitch(autonatSvc: Service = nil): Switch = + var builder = SwitchBuilder.new() + .withRng(newRng()) + .withAddresses(@[ MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet() ]) + .withTcpTransport() + .withMplex() + .withAutonat() + .withNoise() + + if autonatSvc != nil: + builder = builder.withServices(@[autonatSvc]) + + return builder.build() + +suite "Autonat Service": + teardown: + checkTrackers() + + asyncTest "Autonat Service Private Reachability test": + + let autonatStub = AutonatStub.new(expectedDials = 3) + autonatStub.returnSuccess = false + + let autonatService = AutonatService.new(autonatStub, newRng()) + + let switch1 = createSwitch(autonatService) + let switch2 = createSwitch() + let switch3 = createSwitch() + let switch4 = createSwitch() + + check autonatService.networkReachability() == NetworkReachability.Unknown + + await switch1.start() + await switch2.start() + await switch3.start() + await switch4.start() + + await switch1.connect(switch2.peerInfo.peerId, switch2.peerInfo.addrs) + await switch1.connect(switch3.peerInfo.peerId, switch3.peerInfo.addrs) + await switch1.connect(switch4.peerInfo.peerId, switch4.peerInfo.addrs) + + await autonatStub.finished + + check autonatService.networkReachability() == NetworkReachability.NotReachable + check libp2p_autonat_reachability_confidence.value(["NotReachable"]) == 0.3 + + await allFuturesThrowing( + switch1.stop(), switch2.stop(), switch3.stop(), switch4.stop()) + + asyncTest "Autonat Service Public Reachability test": + + let autonatStub = AutonatStub.new(expectedDials = 3) + autonatStub.returnSuccess = true + + let autonatService = AutonatService.new(autonatStub, newRng(), some(1.seconds)) + + let switch1 = createSwitch(autonatService) + let switch2 = createSwitch() + let switch3 = createSwitch() + let switch4 = createSwitch() + + check autonatService.networkReachability() == NetworkReachability.Unknown + + await switch1.start() + await switch2.start() + await switch3.start() + await switch4.start() + + await switch1.connect(switch2.peerInfo.peerId, switch2.peerInfo.addrs) + await switch1.connect(switch3.peerInfo.peerId, switch3.peerInfo.addrs) + await switch1.connect(switch4.peerInfo.peerId, switch4.peerInfo.addrs) + + await autonatStub.finished + + check autonatService.networkReachability() == NetworkReachability.Reachable + check libp2p_autonat_reachability_confidence.value(["Reachable"]) == 0.3 + + await allFuturesThrowing( + switch1.stop(), switch2.stop(), switch3.stop(), switch4.stop()) + + asyncTest "Autonat Service Full Reachability test": + + let autonatStub = AutonatStub.new(expectedDials = 6) + autonatStub.returnSuccess = false + + let autonatService = AutonatService.new(autonatStub, newRng(), some(1.seconds)) + + let switch1 = createSwitch(autonatService) + let switch2 = createSwitch() + let switch3 = createSwitch() + let switch4 = createSwitch() + + let awaiter = newFuture[void]() + + proc statusAndConfidenceHandler(networkReachability: NetworkReachability, confidence: Option[float]) {.gcsafe, async.} = + if networkReachability == NetworkReachability.NotReachable and confidence.isSome() and confidence.get() >= 0.3: + if not awaiter.finished: + autonatStub.returnSuccess = true + awaiter.complete() + + check autonatService.networkReachability() == NetworkReachability.Unknown + + autonatService.statusAndConfidenceHandler(statusAndConfidenceHandler) + + await switch1.start() + await switch2.start() + await switch3.start() + await switch4.start() + + await switch1.connect(switch2.peerInfo.peerId, switch2.peerInfo.addrs) + await switch1.connect(switch3.peerInfo.peerId, switch3.peerInfo.addrs) + await switch1.connect(switch4.peerInfo.peerId, switch4.peerInfo.addrs) + + await awaiter + + check autonatService.networkReachability() == NetworkReachability.NotReachable + check libp2p_autonat_reachability_confidence.value(["NotReachable"]) == 0.3 + + await autonatStub.finished + + check autonatService.networkReachability() == NetworkReachability.Reachable + check libp2p_autonat_reachability_confidence.value(["Reachable"]) == 0.3 + + await allFuturesThrowing(switch1.stop(), switch2.stop(), switch3.stop(), switch4.stop()) + +asyncTest "Autonat Service setup and stop twice": + + let switch = createSwitch() + let autonatService = AutonatService.new(AutonatStub.new(expectedDials = 0), newRng(), some(1.seconds)) + + check (await autonatService.setup(switch)) == true + check (await autonatService.setup(switch)) == false + + check (await autonatService.stop(switch)) == true + check (await autonatService.stop(switch)) == false + + await allFuturesThrowing(switch.stop()) diff --git a/tests/testnative.nim b/tests/testnative.nim index cc04fab66..fb1f0de9d 100644 --- a/tests/testnative.nim +++ b/tests/testnative.nim @@ -41,4 +41,5 @@ import testtcptransport, testrendezvous, testdiscovery, testyamux, - testautonat + testautonat, + testautonatservice diff --git a/tests/testtortransport.nim b/tests/testtortransport.nim index de9050e5a..2431458e6 100644 --- a/tests/testtortransport.nim +++ b/tests/testtortransport.nim @@ -14,9 +14,9 @@ import ../libp2p/[stream/connection, multiaddress, builders] -import ./helpers, ./stubs, ./commontransport +import ./helpers, ./stubs/torstub, ./commontransport -const torServer = initTAddress("127.0.0.1", 9050.Port) +const torServer = initTAddress("127.0.0.1", 9050.Port) var stub: TorServerStub var startFut: Future[void] suite "Tor transport":