From 8ddc636fa7ded35dc55b09cd81622152ca8b4654 Mon Sep 17 00:00:00 2001 From: Ludovic Chenut Date: Wed, 14 Dec 2022 16:32:10 +0100 Subject: [PATCH] Multiple fixes --- libp2p/protocols/connectivity/autorelay.nim | 83 +++++++++++---------- libp2p/switch.nim | 10 ++- tests/testautorelay.nim | 3 +- tests/testnative.nim | 3 +- 4 files changed, 54 insertions(+), 45 deletions(-) diff --git a/libp2p/protocols/connectivity/autorelay.nim b/libp2p/protocols/connectivity/autorelay.nim index 5b1175cc9..2058befab 100644 --- a/libp2p/protocols/connectivity/autorelay.nim +++ b/libp2p/protocols/connectivity/autorelay.nim @@ -22,89 +22,96 @@ logScope: type OnReservationHandler = proc (ma: MultiAddress): Future[void] {.gcsafe, raises: [Defect].} - AutoRelay* = ref object of Service + AutoRelayService* = ref object of Service running: bool runner: Future[void] client: RelayClient - npeers: int - peers: Table[PeerId, Future[void]] + numRelays: int + relayPeers: Table[PeerId, Future[void]] + relayAddresses: Table[PeerId, MultiAddress] peerJoined: AsyncEvent onReservation: OnReservationHandler -proc reserveAndUpdate(self: AutoRelay, relayPid: PeerId, selfPid: PeerId) {.async.} = +proc reserveAndUpdate(self: AutoRelayService, relayPid: PeerId, selfPid: PeerId) {.async.} = while self.running: let rsvp = await self.client.reserve(relayPid).wait(chronos.seconds(5)) relayedAddr = MultiAddress.init($(rsvp.addrs[0]) & "/p2p-circuit/p2p/" & $selfPid).tryGet() - await self.onReservation(relayedAddr) + if not self.onReservation.isNil(): + await self.onReservation(relayedAddr) + self.relayAddresses[relayPid] = relayedAddr await sleepAsync chronos.seconds(rsvp.expire.int64 - times.now().utc.toTime.toUnix) -method setup*(self: AutoRelay, switch: Switch) {.async, gcsafe.} = - if self.inUse: - warn "Autorelay setup has already been called" - return - self.inUse = true - proc handlePeer(peerId: PeerId, event: PeerEvent) {.async.} = - if event.kind == Left and peerId in self.peers: - self.peers[peerId].cancel() - elif event.kind == Joined and self.peers.len < self.npeers: - self.peerJoined.fire() - switch.addPeerEventHandler(handlePeer, Joined) - switch.addPeerEventHandler(handlePeer, Left) +method setup*(self: AutoRelayService, switch: Switch): Future[bool] {.async, gcsafe.} = + let hasBeenSetUp = await procCall Service(self).setup(switch) + if hasBeenSetUp: + proc handlePeerJoined(peerId: PeerId, event: PeerEvent) {.async.} = + if self.relayPeers.len < self.numRelays: + self.peerJoined.fire() + proc handlePeerLeft(peerId: PeerId, event: PeerEvent) {.async.} = + if peerId in self.relayPeers: + self.relayPeers[peerId].cancel() + switch.addPeerEventHandler(handlePeerJoined, Joined) + switch.addPeerEventHandler(handlePeerLeft, Left) + return hasBeenSetUp -method innerRun(self: AutoRelay, switch: Switch) {.async, gcsafe.} = +method innerRun(self: AutoRelayService, switch: Switch) {.async, gcsafe.} = while true: - # Remove peers that failed + # Remove relayPeers that failed var peersToRemove: seq[PeerId] - for k, v in self.peers: + for k, v in self.relayPeers: if v.finished(): peersToRemove.add(k) for k in peersToRemove: - self.peers.del(k) + self.relayPeers.del(k) + self.relayAddresses.del(k) if peersToRemove.len() > 0: - await sleepAsync(500.millis) # To avoid ddosing our peers in certain condition + await sleepAsync(500.millis) # To avoid ddosing our relayPeers in certain condition - # Get all connected peers + # Get all connected relayPeers let rng = newRng() var connectedPeers = switch.connectedPeers(Direction.Out) connectedPeers.keepItIf(RelayV2HopCodec in switch.peerStore[ProtoBook][it] or - it notin self.peers.keys()) + it notin self.relayPeers) rng.shuffle(connectedPeers) for relayPid in connectedPeers: - if self.peers.len() >= self.npeers: + if self.relayPeers.len() >= self.numRelays: break if RelayV2HopCodec in switch.peerStore[ProtoBook][relayPid]: - self.peers[relayPid] = self.reserveAndUpdate(relayPid, switch.peerInfo.peerId) - let peersFutures = toSeq(self.peers.values()) + self.relayPeers[relayPid] = self.reserveAndUpdate(relayPid, switch.peerInfo.peerId) + let peersFutures = toSeq(self.relayPeers.values()) - if self.peers.len() < self.npeers: + if self.relayPeers.len() < self.numRelays: self.peerJoined.clear() await one(peersFutures) or self.peerJoined.wait() else: discard await one(peersFutures) -method run*(self: AutoRelay, switch: Switch) {.async, gcsafe.} = +method run*(self: AutoRelayService, switch: Switch) {.async, gcsafe.} = if self.running: trace "Autorelay is already running" return self.running = true self.runner = self.innerRun(switch) -method stop*(self: AutoRelay, switch: Switch) {.async, gcsafe.} = - if not self.inUse: - warn "service is already stopped" - self.inUse = false - self.running = false - self.runner.cancel() +method stop*(self: AutoRelayService, switch: Switch): Future[bool] {.async, gcsafe.} = + let hasBeenStopped = await procCall Service(self).stop(switch) + if hasBeenStopped: + self.running = false + self.runner.cancel() + return hasBeenStopped -proc new*(T: typedesc[AutoRelay], - npeers: int, +method getAddresses*(self: AutoRelayService): seq[MultiAddress] = + result = toSeq(self.relayAddresses.values) + +proc new*(T: typedesc[AutoRelayService], + numRelays: int, client: RelayClient, onReservation: OnReservationHandler): T = - T(npeers: npeers, + T(numRelays: numRelays, client: client, onReservation: onReservation, peerJoined: newAsyncEvent()) diff --git a/libp2p/switch.nim b/libp2p/switch.nim index c6fb0865b..aa92ab6de 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -79,20 +79,22 @@ type Service* = ref object of RootObj inUse*: bool -method setup*(self: Service, switch: Switch) {.base, async, gcsafe, public.} = +method setup*(self: Service, switch: Switch): Future[bool] {.base, async, gcsafe, public.} = if self.inUse: warn "service setup has already been called" - return + return false self.inUse = true + return true method run*(self: Service, switch: Switch) {.base, async, gcsafe, public.} = doAssert(false, "Not implemented!") -method stop*(self: Service, switch: Switch) {.base, async, gcsafe, public.} = +method stop*(self: Service, switch: Switch): Future[bool] {.base, async, gcsafe, public.} = if not self.inUse: warn "service is already stopped" - return + return false self.inUse = false + return true proc addConnEventHandler*(s: Switch, handler: ConnEventHandler, diff --git a/tests/testautorelay.nim b/tests/testautorelay.nim index 17ca02157..3f09165c3 100644 --- a/tests/testautorelay.nim +++ b/tests/testautorelay.nim @@ -35,7 +35,7 @@ suite "Autorelay": $relay.peerInfo.peerId & "/p2p-circuit/p2p/" & $switch.peerInfo.peerId).get() fut.complete() - let autorelay = AutoRelay.new(3, client, checkMA) + let autorelay = AutoRelayService.new(3, client, checkMA) switch.addService(autorelay) await switch.start() await relay.start() @@ -46,4 +46,3 @@ suite "Autorelay": await fut await switch.stop() await relay.stop() - diff --git a/tests/testnative.nim b/tests/testnative.nim index f35971b4b..27b7f6dda 100644 --- a/tests/testnative.nim +++ b/tests/testnative.nim @@ -40,4 +40,5 @@ import testtcptransport, testrendezvous, testdiscovery, testyamux, - testautonat + testautonat, + testautorelay