mirror of
https://github.com/vacp2p/nim-libp2p.git
synced 2025-03-02 17:10:44 +00:00
chore: list raised exceptions in switch
services (#1251)
Co-authored-by: richΛrd <info@richardramos.me>
This commit is contained in:
parent
3e16ca724d
commit
86b6469e35
@ -22,7 +22,7 @@ type
|
||||
PeerInfoError* = object of LPError
|
||||
|
||||
AddressMapper* = proc(listenAddrs: seq[MultiAddress]): Future[seq[MultiAddress]] {.
|
||||
gcsafe, raises: []
|
||||
gcsafe, async: (raises: [CancelledError])
|
||||
.} ## A proc that expected to resolve the listen addresses into dialable addresses
|
||||
|
||||
PeerInfo* {.public.} = ref object
|
||||
@ -52,7 +52,7 @@ func shortLog*(p: PeerInfo): auto =
|
||||
chronicles.formatIt(PeerInfo):
|
||||
shortLog(it)
|
||||
|
||||
proc update*(p: PeerInfo) {.async.} =
|
||||
proc update*(p: PeerInfo) {.async: (raises: [CancelledError]).} =
|
||||
# p.addrs.len == 0 overrides addrs only if it is the first time update is being executed or if the field is empty.
|
||||
# p.addressMappers.len == 0 is for when all addressMappers have been removed,
|
||||
# and we wish to have addrs in its initial state, i.e., a copy of listenAddrs.
|
||||
|
@ -50,7 +50,7 @@ type
|
||||
|
||||
StatusAndConfidenceHandler* = proc(
|
||||
networkReachability: NetworkReachability, confidence: Opt[float]
|
||||
): Future[void] {.gcsafe, raises: [].}
|
||||
): Future[void] {.gcsafe, async: (raises: [CancelledError]).}
|
||||
|
||||
proc new*(
|
||||
T: typedesc[AutonatService],
|
||||
@ -79,7 +79,7 @@ proc new*(
|
||||
enableAddressMapper: enableAddressMapper,
|
||||
)
|
||||
|
||||
proc callHandler(self: AutonatService) {.async.} =
|
||||
proc callHandler(self: AutonatService) {.async: (raises: [CancelledError]).} =
|
||||
if not isNil(self.statusAndConfidenceHandler):
|
||||
await self.statusAndConfidenceHandler(self.networkReachability, self.confidence)
|
||||
|
||||
@ -92,7 +92,7 @@ proc doesPeerHaveIncomingConn(switch: Switch, peerId: PeerId): bool =
|
||||
|
||||
proc handleAnswer(
|
||||
self: AutonatService, ans: NetworkReachability
|
||||
): Future[bool] {.async.} =
|
||||
): Future[bool] {.async: (raises: [CancelledError]).} =
|
||||
if ans == Unknown:
|
||||
return
|
||||
|
||||
@ -127,7 +127,7 @@ proc handleAnswer(
|
||||
|
||||
proc askPeer(
|
||||
self: AutonatService, switch: Switch, peerId: PeerId
|
||||
): Future[NetworkReachability] {.async.} =
|
||||
): Future[NetworkReachability] {.async: (raises: [CancelledError]).} =
|
||||
logScope:
|
||||
peerId = $peerId
|
||||
|
||||
@ -160,7 +160,9 @@ proc askPeer(
|
||||
await switch.peerInfo.update()
|
||||
return ans
|
||||
|
||||
proc askConnectedPeers(self: AutonatService, switch: Switch) {.async.} =
|
||||
proc askConnectedPeers(
|
||||
self: AutonatService, switch: Switch
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
trace "Asking peers for reachability"
|
||||
var peers = switch.connectedPeers(Direction.Out)
|
||||
self.rng.shuffle(peers)
|
||||
@ -181,7 +183,7 @@ proc schedule(service: AutonatService, switch: Switch, interval: Duration) {.asy
|
||||
|
||||
proc addressMapper(
|
||||
self: AutonatService, peerStore: PeerStore, listenAddrs: seq[MultiAddress]
|
||||
): Future[seq[MultiAddress]] {.async.} =
|
||||
): Future[seq[MultiAddress]] {.async: (raises: [CancelledError]).} =
|
||||
if self.networkReachability != NetworkReachability.Reachable:
|
||||
return listenAddrs
|
||||
|
||||
@ -198,10 +200,12 @@ proc addressMapper(
|
||||
addrs.add(processedMA)
|
||||
return addrs
|
||||
|
||||
method setup*(self: AutonatService, switch: Switch): Future[bool] {.async.} =
|
||||
method setup*(
|
||||
self: AutonatService, switch: Switch
|
||||
): Future[bool] {.async: (raises: [CancelledError]).} =
|
||||
self.addressMapper = proc(
|
||||
listenAddrs: seq[MultiAddress]
|
||||
): Future[seq[MultiAddress]] {.async.} =
|
||||
): Future[seq[MultiAddress]] {.async: (raises: [CancelledError]).} =
|
||||
return await addressMapper(self, switch.peerStore, listenAddrs)
|
||||
|
||||
info "Setting up AutonatService"
|
||||
@ -221,11 +225,15 @@ method setup*(self: AutonatService, switch: Switch): Future[bool] {.async.} =
|
||||
switch.peerInfo.addressMappers.add(self.addressMapper)
|
||||
return hasBeenSetup
|
||||
|
||||
method run*(self: AutonatService, switch: Switch) {.async, public.} =
|
||||
method run*(
|
||||
self: AutonatService, switch: Switch
|
||||
) {.public, async: (raises: [CancelledError]).} =
|
||||
trace "Running AutonatService"
|
||||
await askConnectedPeers(self, switch)
|
||||
|
||||
method stop*(self: AutonatService, switch: Switch): Future[bool] {.async, public.} =
|
||||
method stop*(
|
||||
self: AutonatService, switch: Switch
|
||||
): Future[bool] {.public, async: (raises: [CancelledError]).} =
|
||||
info "Stopping AutonatService"
|
||||
let hasBeenStopped = await procCall Service(self).stop(switch)
|
||||
if hasBeenStopped:
|
||||
|
@ -36,7 +36,7 @@ proc isRunning*(self: AutoRelayService): bool =
|
||||
|
||||
proc addressMapper(
|
||||
self: AutoRelayService, listenAddrs: seq[MultiAddress]
|
||||
): Future[seq[MultiAddress]] {.async.} =
|
||||
): Future[seq[MultiAddress]] {.async: (raises: []).} =
|
||||
return concat(toSeq(self.relayAddresses.values)) & listenAddrs
|
||||
|
||||
proc reserveAndUpdate(
|
||||
@ -58,10 +58,12 @@ proc reserveAndUpdate(
|
||||
self.onReservation(concat(toSeq(self.relayAddresses.values)))
|
||||
await sleepAsync chronos.seconds(ttl - 30)
|
||||
|
||||
method setup*(self: AutoRelayService, switch: Switch): Future[bool] {.async.} =
|
||||
method setup*(
|
||||
self: AutoRelayService, switch: Switch
|
||||
): Future[bool] {.async: (raises: [CancelledError]).} =
|
||||
self.addressMapper = proc(
|
||||
listenAddrs: seq[MultiAddress]
|
||||
): Future[seq[MultiAddress]] {.async.} =
|
||||
): Future[seq[MultiAddress]] {.async: (raises: [CancelledError]).} =
|
||||
return await addressMapper(self, listenAddrs)
|
||||
|
||||
let hasBeenSetUp = await procCall Service(self).setup(switch)
|
||||
@ -87,19 +89,24 @@ proc manageBackedOff(self: AutoRelayService, pid: PeerId) {.async.} =
|
||||
self.backingOff.keepItIf(it != pid)
|
||||
self.peerAvailable.fire()
|
||||
|
||||
proc innerRun(self: AutoRelayService, switch: Switch) {.async.} =
|
||||
proc innerRun(
|
||||
self: AutoRelayService, switch: Switch
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
while true:
|
||||
# Remove relayPeers that failed
|
||||
let peers = toSeq(self.relayPeers.keys())
|
||||
for k in peers:
|
||||
if self.relayPeers[k].finished():
|
||||
self.relayPeers.del(k)
|
||||
self.relayAddresses.del(k)
|
||||
if not self.onReservation.isNil():
|
||||
self.onReservation(concat(toSeq(self.relayAddresses.values)))
|
||||
# To avoid ddosing our peers in certain conditions
|
||||
self.backingOff.add(k)
|
||||
asyncSpawn self.manageBackedOff(k)
|
||||
try:
|
||||
if self.relayPeers[k].finished():
|
||||
self.relayPeers.del(k)
|
||||
self.relayAddresses.del(k)
|
||||
if not self.onReservation.isNil():
|
||||
self.onReservation(concat(toSeq(self.relayAddresses.values)))
|
||||
# To avoid ddosing our peers in certain conditions
|
||||
self.backingOff.add(k)
|
||||
asyncSpawn self.manageBackedOff(k)
|
||||
except KeyError:
|
||||
raiseAssert "checked with in"
|
||||
|
||||
# Get all connected relayPeers
|
||||
self.peerAvailable.clear()
|
||||
@ -116,18 +123,25 @@ proc innerRun(self: AutoRelayService, switch: Switch) {.async.} =
|
||||
self.relayPeers[relayPid] = self.reserveAndUpdate(relayPid, switch)
|
||||
|
||||
if self.relayPeers.len() > 0:
|
||||
await one(toSeq(self.relayPeers.values())) or self.peerAvailable.wait()
|
||||
try:
|
||||
await one(toSeq(self.relayPeers.values())) or self.peerAvailable.wait()
|
||||
except ValueError:
|
||||
raiseAssert "checked with relayPeers.len()"
|
||||
else:
|
||||
await self.peerAvailable.wait()
|
||||
|
||||
method run*(self: AutoRelayService, switch: Switch) {.async.} =
|
||||
method run*(
|
||||
self: AutoRelayService, switch: Switch
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
if self.running:
|
||||
trace "Autorelay is already running"
|
||||
return
|
||||
self.running = true
|
||||
self.runner = self.innerRun(switch)
|
||||
|
||||
method stop*(self: AutoRelayService, switch: Switch): Future[bool] {.async.} =
|
||||
method stop*(
|
||||
self: AutoRelayService, switch: Switch
|
||||
): Future[bool] {.public, async: (raises: [CancelledError]).} =
|
||||
let hasBeenStopped = await procCall Service(self).stop(switch)
|
||||
if hasBeenStopped:
|
||||
self.running = false
|
||||
|
@ -89,13 +89,18 @@ proc newConnectedPeerHandler(
|
||||
except CatchableError as err:
|
||||
debug "Hole punching failed during dcutr", err = err.msg
|
||||
|
||||
method setup*(self: HPService, switch: Switch): Future[bool] {.async.} =
|
||||
method setup*(
|
||||
self: HPService, switch: Switch
|
||||
): Future[bool] {.async: (raises: [CancelledError]).} =
|
||||
var hasBeenSetup = await procCall Service(self).setup(switch)
|
||||
hasBeenSetup = hasBeenSetup and await self.autonatService.setup(switch)
|
||||
|
||||
if hasBeenSetup:
|
||||
let dcutrProto = Dcutr.new(switch)
|
||||
switch.mount(dcutrProto)
|
||||
try:
|
||||
let dcutrProto = Dcutr.new(switch)
|
||||
switch.mount(dcutrProto)
|
||||
except LPError as err:
|
||||
error "Failed to mount Dcutr", err = err.msg
|
||||
|
||||
self.newConnectedPeerHandler = proc(peerId: PeerId, event: PeerEvent) {.async.} =
|
||||
await newConnectedPeerHandler(self, switch, peerId, event)
|
||||
@ -106,7 +111,7 @@ method setup*(self: HPService, switch: Switch): Future[bool] {.async.} =
|
||||
|
||||
self.onNewStatusHandler = proc(
|
||||
networkReachability: NetworkReachability, confidence: Opt[float]
|
||||
) {.async.} =
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
if networkReachability == NetworkReachability.NotReachable and
|
||||
not self.autoRelayService.isRunning():
|
||||
discard await self.autoRelayService.setup(switch)
|
||||
@ -121,10 +126,14 @@ method setup*(self: HPService, switch: Switch): Future[bool] {.async.} =
|
||||
self.autonatService.statusAndConfidenceHandler(self.onNewStatusHandler)
|
||||
return hasBeenSetup
|
||||
|
||||
method run*(self: HPService, switch: Switch) {.async, public.} =
|
||||
method run*(
|
||||
self: HPService, switch: Switch
|
||||
) {.public, async: (raises: [CancelledError]).} =
|
||||
await self.autonatService.run(switch)
|
||||
|
||||
method stop*(self: HPService, switch: Switch): Future[bool] {.async, public.} =
|
||||
method stop*(
|
||||
self: HPService, switch: Switch
|
||||
): Future[bool] {.public, async: (raises: [CancelledError]).} =
|
||||
discard await self.autonatService.stop(switch)
|
||||
if not isNil(self.newConnectedPeerHandler):
|
||||
switch.connManager.removePeerEventHandler(
|
||||
|
@ -148,7 +148,7 @@ proc expandWildcardAddresses(
|
||||
|
||||
method setup*(
|
||||
self: WildcardAddressResolverService, switch: Switch
|
||||
): Future[bool] {.async.} =
|
||||
): Future[bool] {.async: (raises: [CancelledError]).} =
|
||||
## Sets up the `WildcardAddressResolverService`.
|
||||
##
|
||||
## This method adds the address mapper to the peer's list of address mappers.
|
||||
@ -161,7 +161,7 @@ method setup*(
|
||||
## - A `Future[bool]` that resolves to `true` if the setup was successful, otherwise `false`.
|
||||
self.addressMapper = proc(
|
||||
listenAddrs: seq[MultiAddress]
|
||||
): Future[seq[MultiAddress]] {.async.} =
|
||||
): Future[seq[MultiAddress]] {.async: (raises: [CancelledError]).} =
|
||||
return expandWildcardAddresses(self.networkInterfaceProvider, listenAddrs)
|
||||
|
||||
debug "Setting up WildcardAddressResolverService"
|
||||
@ -170,7 +170,9 @@ method setup*(
|
||||
switch.peerInfo.addressMappers.add(self.addressMapper)
|
||||
return hasBeenSetup
|
||||
|
||||
method run*(self: WildcardAddressResolverService, switch: Switch) {.async, public.} =
|
||||
method run*(
|
||||
self: WildcardAddressResolverService, switch: Switch
|
||||
) {.public, async: (raises: [CancelledError]).} =
|
||||
## Runs the WildcardAddressResolverService for a given switch.
|
||||
##
|
||||
## It updates the peer information for the provided switch by running the registered address mapper. Any other
|
||||
@ -181,7 +183,7 @@ method run*(self: WildcardAddressResolverService, switch: Switch) {.async, publi
|
||||
|
||||
method stop*(
|
||||
self: WildcardAddressResolverService, switch: Switch
|
||||
): Future[bool] {.async, public.} =
|
||||
): Future[bool] {.public, async: (raises: [CancelledError]).} =
|
||||
## Stops the WildcardAddressResolverService.
|
||||
##
|
||||
## Handles the shutdown process of the WildcardAddressResolverService for a given switch.
|
||||
|
@ -64,17 +64,21 @@ type
|
||||
Service* = ref object of RootObj
|
||||
inUse: bool
|
||||
|
||||
method setup*(self: Service, switch: Switch): Future[bool] {.base, async.} =
|
||||
method setup*(
|
||||
self: Service, switch: Switch
|
||||
): Future[bool] {.base, async: (raises: [CancelledError]).} =
|
||||
if self.inUse:
|
||||
warn "service setup has already been called"
|
||||
return false
|
||||
self.inUse = true
|
||||
return true
|
||||
|
||||
method run*(self: Service, switch: Switch) {.base, async.} =
|
||||
method run*(self: Service, switch: Switch) {.base, async: (raises: [CancelledError]).} =
|
||||
doAssert(false, "Not implemented!")
|
||||
|
||||
method stop*(self: Service, switch: Switch): Future[bool] {.base, async.} =
|
||||
method stop*(
|
||||
self: Service, switch: Switch
|
||||
): Future[bool] {.base, async: (raises: [CancelledError]).} =
|
||||
if not self.inUse:
|
||||
warn "service is already stopped"
|
||||
return false
|
||||
|
@ -99,7 +99,7 @@ suite "Autonat Service":
|
||||
|
||||
proc statusAndConfidenceHandler(
|
||||
networkReachability: NetworkReachability, confidence: Opt[float]
|
||||
) {.async.} =
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
if networkReachability == NetworkReachability.Reachable and confidence.isSome() and
|
||||
confidence.get() >= 0.3:
|
||||
if not awaiter.finished:
|
||||
@ -148,7 +148,7 @@ suite "Autonat Service":
|
||||
|
||||
proc statusAndConfidenceHandler(
|
||||
networkReachability: NetworkReachability, confidence: Opt[float]
|
||||
) {.async.} =
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
if networkReachability == NetworkReachability.NotReachable and confidence.isSome() and
|
||||
confidence.get() >= 0.3:
|
||||
if not awaiter.finished:
|
||||
@ -196,7 +196,7 @@ suite "Autonat Service":
|
||||
|
||||
proc statusAndConfidenceHandler(
|
||||
networkReachability: NetworkReachability, confidence: Opt[float]
|
||||
) {.async.} =
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
if networkReachability == NetworkReachability.Reachable and confidence.isSome() and
|
||||
confidence.get() == 1:
|
||||
if not awaiter.finished:
|
||||
@ -241,7 +241,7 @@ suite "Autonat Service":
|
||||
|
||||
proc statusAndConfidenceHandler(
|
||||
networkReachability: NetworkReachability, confidence: Opt[float]
|
||||
) {.async.} =
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
if networkReachability == NetworkReachability.NotReachable and confidence.isSome() and
|
||||
confidence.get() >= 0.3:
|
||||
if not awaiter.finished:
|
||||
@ -303,7 +303,7 @@ suite "Autonat Service":
|
||||
|
||||
proc statusAndConfidenceHandler(
|
||||
networkReachability: NetworkReachability, confidence: Opt[float]
|
||||
) {.async.} =
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
if networkReachability == NetworkReachability.Reachable and confidence.isSome() and
|
||||
confidence.get() == 1:
|
||||
if not awaiter.finished:
|
||||
@ -353,7 +353,7 @@ suite "Autonat Service":
|
||||
|
||||
proc statusAndConfidenceHandler1(
|
||||
networkReachability: NetworkReachability, confidence: Opt[float]
|
||||
) {.async.} =
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
if networkReachability == NetworkReachability.Reachable and confidence.isSome() and
|
||||
confidence.get() == 1:
|
||||
if not awaiter1.finished:
|
||||
@ -361,7 +361,7 @@ suite "Autonat Service":
|
||||
|
||||
proc statusAndConfidenceHandler2(
|
||||
networkReachability: NetworkReachability, confidence: Opt[float]
|
||||
) {.async.} =
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
if networkReachability == NetworkReachability.Reachable and confidence.isSome() and
|
||||
confidence.get() == 1:
|
||||
if not awaiter2.finished:
|
||||
@ -405,7 +405,7 @@ suite "Autonat Service":
|
||||
|
||||
proc statusAndConfidenceHandler1(
|
||||
networkReachability: NetworkReachability, confidence: Opt[float]
|
||||
) {.async.} =
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
if networkReachability == NetworkReachability.Reachable and confidence.isSome() and
|
||||
confidence.get() == 1:
|
||||
if not awaiter1.finished:
|
||||
@ -454,7 +454,7 @@ suite "Autonat Service":
|
||||
|
||||
proc statusAndConfidenceHandler(
|
||||
networkReachability: NetworkReachability, confidence: Opt[float]
|
||||
) {.async.} =
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
if networkReachability == NetworkReachability.Reachable and confidence.isSome() and
|
||||
confidence.get() == 1:
|
||||
if not awaiter.finished:
|
||||
@ -498,7 +498,7 @@ suite "Autonat Service":
|
||||
|
||||
proc statusAndConfidenceHandler(
|
||||
networkReachability: NetworkReachability, confidence: Opt[float]
|
||||
) {.async.} =
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
fail()
|
||||
|
||||
check autonatService.networkReachability == NetworkReachability.Unknown
|
||||
|
@ -74,7 +74,9 @@ suite "PeerInfo":
|
||||
]
|
||||
multiAddresses2 = @[MultiAddress.init("/ip4/8.8.8.8/tcp/33").tryGet()]
|
||||
|
||||
proc addressMapper(input: seq[MultiAddress]): Future[seq[MultiAddress]] {.async.} =
|
||||
proc addressMapper(
|
||||
input: seq[MultiAddress]
|
||||
): Future[seq[MultiAddress]] {.async: (raises: [CancelledError]).} =
|
||||
check input == multiAddresses
|
||||
await sleepAsync(0.seconds)
|
||||
return multiAddresses2
|
||||
|
Loading…
x
Reference in New Issue
Block a user