From 489c115132f74f2d95ac59053b069092f2fd6e8b Mon Sep 17 00:00:00 2001 From: Ludovic Chenut Date: Tue, 17 Jan 2023 16:18:38 +0100 Subject: [PATCH] Autorelay service (#819) --- libp2p/services/autorelayservice.nim | 133 +++++++++++++++++++++++++++ libp2p/switch.nim | 4 + tests/testautorelay.nim | 122 ++++++++++++++++++++++++ tests/testnative.nim | 3 +- 4 files changed, 261 insertions(+), 1 deletion(-) create mode 100644 libp2p/services/autorelayservice.nim create mode 100644 tests/testautorelay.nim diff --git a/libp2p/services/autorelayservice.nim b/libp2p/services/autorelayservice.nim new file mode 100644 index 000000000..b0c27c6be --- /dev/null +++ b/libp2p/services/autorelayservice.nim @@ -0,0 +1,133 @@ +# Nim-LibP2P +# Copyright (c) 2023 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +# * MIT license ([LICENSE-MIT](LICENSE-MIT)) +# at your option. +# This file may not be copied, modified, or distributed except according to +# those terms. + +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import chronos, chronicles, times, tables, sequtils, options +import ../switch, + ../protocols/connectivity/relay/[client, utils] + +logScope: + topics = "libp2p autorelay" + +type + OnReservationHandler = proc (addresses: seq[MultiAddress]) {.gcsafe, raises: [Defect].} + + AutoRelayService* = ref object of Service + running: bool + runner: Future[void] + client: RelayClient + numRelays: int + relayPeers: Table[PeerId, Future[void]] + relayAddresses: Table[PeerId, seq[MultiAddress]] + backingOff: seq[PeerId] + peerAvailable: AsyncEvent + onReservation: OnReservationHandler + rng: ref HmacDrbgContext + +proc reserveAndUpdate(self: AutoRelayService, relayPid: PeerId, selfPid: PeerId) {.async.} = + while self.running: + let + rsvp = await self.client.reserve(relayPid).wait(chronos.seconds(5)) + relayedAddr = rsvp.addrs.mapIt( + MultiAddress.init($it & "/p2p-circuit/p2p/" & $selfPid).tryGet()) + ttl = rsvp.expire.int64 - times.now().utc.toTime.toUnix + if ttl <= 60: + # A reservation under a minute is basically useless + break + if relayPid notin self.relayAddresses or self.relayAddresses[relayPid] != relayedAddr: + self.relayAddresses[relayPid] = relayedAddr + if not self.onReservation.isNil(): + self.onReservation(concat(toSeq(self.relayAddresses.values))) + await sleepAsync chronos.seconds(ttl - 30) + +method setup*(self: AutoRelayService, switch: Switch): Future[bool] {.async, gcsafe.} = + let hasBeenSetUp = await procCall Service(self).setup(switch) + if hasBeenSetUp: + proc handlePeerJoined(peerId: PeerId, event: PeerEvent) {.async.} = + trace "Peer Joined", peerId + if self.relayPeers.len < self.numRelays: + self.peerAvailable.fire() + proc handlePeerLeft(peerId: PeerId, event: PeerEvent) {.async.} = + trace "Peer Left", peerId + self.relayPeers.withValue(peerId, future): + future[].cancel() + switch.addPeerEventHandler(handlePeerJoined, Joined) + switch.addPeerEventHandler(handlePeerLeft, Left) + await self.run(switch) + return hasBeenSetUp + +proc manageBackedOff(self: AutoRelayService, pid: PeerId) {.async.} = + await sleepAsync(chronos.seconds(5)) + self.backingOff.keepItIf(it != pid) + self.peerAvailable.fire() + +proc innerRun(self: AutoRelayService, switch: Switch) {.async, gcsafe.} = + while true: + # Remove relayPeers that failed + let peers = toSeq(self.relayPeers.keys()) + for k in peers: + if self.relayPeers[k].finished(): + self.relayPeers.del(k) + self.relayAddresses.del(k) + if not self.onReservation.isNil(): + self.onReservation(concat(toSeq(self.relayAddresses.values))) + # To avoid ddosing our peers in certain conditions + self.backingOff.add(k) + asyncSpawn self.manageBackedOff(k) + + # Get all connected relayPeers + self.peerAvailable.clear() + var connectedPeers = switch.connectedPeers(Direction.Out) + connectedPeers.keepItIf(RelayV2HopCodec in switch.peerStore[ProtoBook][it] and + it notin self.relayPeers and + it notin self.backingOff) + self.rng.shuffle(connectedPeers) + + for relayPid in connectedPeers: + if self.relayPeers.len() >= self.numRelays: + break + self.relayPeers[relayPid] = self.reserveAndUpdate(relayPid, switch.peerInfo.peerId) + + if self.relayPeers.len() > 0: + await one(toSeq(self.relayPeers.values())) or self.peerAvailable.wait() + else: + await self.peerAvailable.wait() + await sleepAsync(200.millis) + +method run*(self: AutoRelayService, switch: Switch) {.async, gcsafe.} = + if self.running: + trace "Autorelay is already running" + return + self.running = true + self.runner = self.innerRun(switch) + +method stop*(self: AutoRelayService, switch: Switch): Future[bool] {.async, gcsafe.} = + let hasBeenStopped = await procCall Service(self).stop(switch) + if hasBeenStopped: + self.running = false + self.runner.cancel() + return hasBeenStopped + +proc getAddresses*(self: AutoRelayService): seq[MultiAddress] = + result = concat(toSeq(self.relayAddresses.values)) + +proc new*(T: typedesc[AutoRelayService], + numRelays: int, + client: RelayClient, + onReservation: OnReservationHandler, + rng: ref HmacDrbgContext): T = + T(numRelays: numRelays, + client: client, + onReservation: onReservation, + peerAvailable: newAsyncEvent(), + rng: rng) diff --git a/libp2p/switch.nim b/libp2p/switch.nim index 964ddd7c3..6844dc515 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -296,6 +296,10 @@ proc stop*(s: Switch) {.async, public.} = trace "Stopping switch" s.started = false + + for service in s.services: + discard await service.stop(s) + # close and cleanup all connections await s.connManager.close() diff --git a/tests/testautorelay.nim b/tests/testautorelay.nim new file mode 100644 index 000000000..61062b512 --- /dev/null +++ b/tests/testautorelay.nim @@ -0,0 +1,122 @@ +{.used.} + +import chronos, options +import ../libp2p +import ../libp2p/[crypto/crypto, + protocols/connectivity/relay/relay, + protocols/connectivity/relay/messages, + protocols/connectivity/relay/utils, + protocols/connectivity/relay/client, + services/autorelayservice] +import ./helpers +import stew/byteutils + +proc createSwitch(r: Relay, autorelay: Service = nil): Switch = + var builder = SwitchBuilder.new() + .withRng(newRng()) + .withAddresses(@[ MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet() ]) + .withTcpTransport() + .withMplex() + .withNoise() + .withCircuitRelay(r) + if not autorelay.isNil(): + builder = builder.withServices(@[autorelay]) + builder.build() + + +proc buildRelayMA(switchRelay: Switch, switchClient: Switch): MultiAddress = + MultiAddress.init($switchRelay.peerInfo.addrs[0] & "/p2p/" & + $switchRelay.peerInfo.peerId & "/p2p-circuit/p2p/" & + $switchClient.peerInfo.peerId).get() + +suite "Autorelay": + asyncTeardown: + checkTrackers() + + var + switchRelay {.threadvar.}: Switch + switchClient {.threadvar.}: Switch + relayClient {.threadvar.}: RelayClient + autorelay {.threadvar.}: AutoRelayService + + asyncTest "Simple test": + switchRelay = createSwitch(Relay.new()) + relayClient = RelayClient.new() + let fut = newFuture[void]() + proc checkMA(addresses: seq[MultiAddress]) = + check: addresses[0] == buildRelayMA(switchRelay, switchClient) + check: addresses.len() == 1 + fut.complete() + autorelay = AutoRelayService.new(3, relayClient, checkMA, newRng()) + switchClient = createSwitch(relayClient, autorelay) + await allFutures(switchClient.start(), switchRelay.start()) + await switchClient.connect(switchRelay.peerInfo.peerId, switchRelay.peerInfo.addrs) + await fut.wait(1.seconds) + let addresses = autorelay.getAddresses() + check: + addresses[0] == buildRelayMA(switchRelay, switchClient) + addresses.len() == 1 + await allFutures(switchClient.stop(), switchRelay.stop()) + + asyncTest "Connect after starting switches": + switchRelay = createSwitch(Relay.new()) + relayClient = RelayClient.new() + let fut = newFuture[void]() + proc checkMA(address: seq[MultiAddress]) = + check: address[0] == buildRelayMA(switchRelay, switchClient) + fut.complete() + let autorelay = AutoRelayService.new(3, relayClient, checkMA, newRng()) + switchClient = createSwitch(relayClient, autorelay) + await allFutures(switchClient.start(), switchRelay.start()) + await sleepAsync(500.millis) + await switchClient.connect(switchRelay.peerInfo.peerId, switchRelay.peerInfo.addrs) + await fut.wait(1.seconds) + let addresses = autorelay.getAddresses() + check: + addresses[0] == buildRelayMA(switchRelay, switchClient) + addresses.len() == 1 + await allFutures(switchClient.stop(), switchRelay.stop()) + + asyncTest "Three relays connections": + var state = 0 + let + rel1 = createSwitch(Relay.new()) + rel2 = createSwitch(Relay.new()) + rel3 = createSwitch(Relay.new()) + fut = newFuture[void]() + relayClient = RelayClient.new() + proc checkMA(addresses: seq[MultiAddress]) = + if state == 0 or state == 2: + check: + buildRelayMA(rel1, switchClient) in addresses + addresses.len() == 1 + state += 1 + elif state == 1: + check: + buildRelayMA(rel1, switchClient) in addresses + buildRelayMA(rel2, switchClient) in addresses + addresses.len() == 2 + state += 1 + elif state == 3: + check: + buildRelayMA(rel1, switchClient) in addresses + buildRelayMA(rel3, switchClient) in addresses + addresses.len() == 2 + state += 1 + fut.complete() + let autorelay = AutoRelayService.new(2, relayClient, checkMA, newRng()) + switchClient = createSwitch(relayClient, autorelay) + await allFutures(switchClient.start(), rel1.start(), rel2.start(), rel3.start()) + await switchClient.connect(rel1.peerInfo.peerId, rel1.peerInfo.addrs) + await sleepAsync(500.millis) + await switchClient.connect(rel2.peerInfo.peerId, rel2.peerInfo.addrs) + await switchClient.connect(rel3.peerInfo.peerId, rel3.peerInfo.addrs) + await sleepAsync(500.millis) + await rel2.stop() + await fut.wait(1.seconds) + let addresses = autorelay.getAddresses() + check: + buildRelayMA(rel1, switchClient) in addresses + buildRelayMA(rel3, switchClient) in addresses + addresses.len() == 2 + await allFutures(switchClient.stop(), rel1.stop(), rel3.stop()) diff --git a/tests/testnative.nim b/tests/testnative.nim index fb1f0de9d..71d122015 100644 --- a/tests/testnative.nim +++ b/tests/testnative.nim @@ -42,4 +42,5 @@ import testtcptransport, testdiscovery, testyamux, testautonat, - testautonatservice + testautonatservice, + testautorelay