From 5bf4fa89533467353cc3d83a8e4a0fb0d72ea9ab Mon Sep 17 00:00:00 2001 From: Ludovic Chenut Date: Tue, 13 Dec 2022 15:39:31 +0100 Subject: [PATCH] Finish the autorelay & add a simple test --- libp2p/protocols/connectivity/autorelay.nim | 45 ++++++++++++------- tests/testautorelay.nim | 49 +++++++++++++++++++++ 2 files changed, 79 insertions(+), 15 deletions(-) create mode 100644 tests/testautorelay.nim diff --git a/libp2p/protocols/connectivity/autorelay.nim b/libp2p/protocols/connectivity/autorelay.nim index bd7b20504..631a12f28 100644 --- a/libp2p/protocols/connectivity/autorelay.nim +++ b/libp2p/protocols/connectivity/autorelay.nim @@ -28,15 +28,16 @@ type client: RelayClient npeers: int peers: Table[PeerId, Future[void]] + peerJoined: AsyncEvent onReservation: OnReservationHandler -proc reserveAndUpdate(self: AutoRelay, peerId: PeerId) {.async.} = +proc reserveAndUpdate(self: AutoRelay, relayPid: PeerId, selfPid: PeerId) {.async.} = while self.running: let - rsvp = await self.client.reserve(peerId).wait(chronos.seconds(5)) - relayedAddr = MultiAddress.init("").tryGet() # TODO -# relayedAddr = MultiAddress.init($(rsvp.addrs[0]) & "/p2p-circuit/p2p/" & -# $(switch.peerInfo.peerId).tryGet() + rsvp = await self.client.reserve(relayPid).wait(chronos.seconds(5)) + relayedAddr = MultiAddress.init($(rsvp.addrs[0]) & + "/p2p-circuit/p2p/" & + $selfPid).tryGet() await self.onReservation(relayedAddr) await sleepAsync chronos.seconds(rsvp.expire.int64 - times.now().utc.toTime.toUnix) @@ -48,16 +49,22 @@ method setup*(self: AutoRelay, switch: Switch) {.async, gcsafe.} = proc handlePeer(peerId: PeerId, event: PeerEvent) {.async.} = if event.kind == Left and peerId in self.peers: self.peers[peerId].cancel() + elif event.kind == Joined and self.peers.len < self.npeers: + self.peerJoined.fire() + switch.addPeerEventHandler(handlePeer, Joined) + switch.addPeerEventHandler(handlePeer, Left) method innerRun(self: AutoRelay, switch: Switch) {.async, gcsafe.} = while true: # Remove peers that failed var peersToRemove: seq[PeerId] for k, v in self.peers: - if v.failed(): + if v.failed() or v.cancelled(): peersToRemove.add(k) for k in peersToRemove: self.peers.del(k) + if peersToRemove.len() > 0: + await sleepAsync(500.millis) # To avoid ddosing our peers in certain condition # Get all connected peers let rng = newRng() @@ -65,13 +72,18 @@ method innerRun(self: AutoRelay, switch: Switch) {.async, gcsafe.} = connectedPeers.keepItIf(RelayV2HopCodec in switch.peerStore[ProtoBook][it]) rng.shuffle(connectedPeers) - for peerId in switch.connectedPeers(Direction.Out): + for relayPid in switch.connectedPeers(Direction.Out): if self.peers.len() >= self.npeers: break - if RelayV2HopCodec in switch.peerStore[ProtoBook][peerId]: - self.peers[peerId] = self.reserveAndUpdate(peerId) + if RelayV2HopCodec in switch.peerStore[ProtoBook][relayPid]: + self.peers[relayPid] = self.reserveAndUpdate(relayPid, switch.peerInfo.peerId) let peersFutures = toSeq(self.peers.values()) - # await race(peersFutures) TODO + + if self.peers.len() < self.npeers: + self.peerJoined.clear() + await one(peersFutures) or self.peerJoined.wait() + else: + discard await one(peersFutures) method run*(self: AutoRelay, switch: Switch) {.async, gcsafe.} = if self.running: @@ -87,8 +99,11 @@ method stop*(self: AutoRelay, switch: Switch) {.async, gcsafe.} = self.running = false self.runner.cancel() -method new(T: typedesc[AutoRelay], - npeers: int, - client: RelayClient, - onReservation: OnReservationHandler): T = - T(npeers: npeers, client: client, onReservation: onReservation) +proc new*(T: typedesc[AutoRelay], + npeers: int, + client: RelayClient, + onReservation: OnReservationHandler): T = + T(npeers: npeers, + client: client, + onReservation: onReservation, + peerJoined: newAsyncEvent()) diff --git a/tests/testautorelay.nim b/tests/testautorelay.nim new file mode 100644 index 000000000..17ca02157 --- /dev/null +++ b/tests/testautorelay.nim @@ -0,0 +1,49 @@ +{.used.} + +import bearssl, chronos, options +import ../libp2p +import ../libp2p/[protocols/connectivity/relay/relay, + protocols/connectivity/relay/messages, + protocols/connectivity/relay/utils, + protocols/connectivity/relay/client, + protocols/connectivity/autorelay] +import ./helpers +import std/times +import stew/byteutils + +proc createSwitch(r: Relay): Switch = + result = SwitchBuilder.new() + .withRng(newRng()) + .withAddresses(@[ MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet() ]) + .withTcpTransport() + .withMplex() + .withNoise() + .withCircuitRelay(r) + .build() + +suite "Autorelay": + asyncTeardown: + checkTrackers() + + asyncTest "Simple test": + let relay = createSwitch(Relay.new()) + let client = RelayClient.new() + let switch = createSwitch(client) + let fut = newFuture[void]() + proc checkMA(address: MultiAddress) {.async.} = + check: address == MultiAddress.init($relay.peerInfo.addrs[0] & "/p2p/" & + $relay.peerInfo.peerId & "/p2p-circuit/p2p/" & + $switch.peerInfo.peerId).get() + fut.complete() + let autorelay = AutoRelay.new(3, client, checkMA) + switch.addService(autorelay) + await switch.start() + await relay.start() + await switch.connect(relay.peerInfo.peerId, relay.peerInfo.addrs) + + discard autorelay.run(switch) + + await fut + await switch.stop() + await relay.stop() +