chore: specify raised exceptions in miscellaneous places (#1269)

vladopajic 2025-02-28 14:19:53 +01:00 committed by GitHub
parent f7daad91e6
commit adf2345adb
9 changed files with 83 additions and 39 deletions
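Broadly, this change applies one pattern: procs that previously relied on the default `{.async.}` effect list now carry an explicit `async: (raises: [...])` annotation, re-raise `CancelledError` unchanged, and wrap anything unexpected in a domain-specific error. A minimal sketch of that pattern, with hypothetical names rather than code from this commit:

```nim
import chronos

type SomeAppError = object of CatchableError # hypothetical error type

proc fetchValue(): Future[int] {.async: (raises: [CancelledError, SomeAppError]).} =
  try:
    await sleepAsync(10.milliseconds) # stand-in for real work
    return 42
  except CancelledError as e:
    raise e # cancellation always propagates unchanged
  except CatchableError as e:
    # anything else is wrapped, keeping the original exception as parent
    raise newException(SomeAppError, e.msg, e)
```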

View File

@@ -276,7 +276,7 @@ proc selectMuxer*(c: ConnManager, peerId: PeerId): Muxer =
trace "connection not found", peerId
return mux
proc storeMuxer*(c: ConnManager, muxer: Muxer) {.raises: [CatchableError].} =
proc storeMuxer*(c: ConnManager, muxer: Muxer) {.raises: [LPError].} =
## store the connection and muxer
##

View File

@@ -972,6 +972,7 @@ proc openStream*(
raise newException(DaemonLocalError, "Wrong message type!")
proc streamHandler(server: StreamServer, transp: StreamTransport) {.async.} =
# must not specify raised exceptions, as this is a StreamCallback from chronos
var api = getUserData[DaemonAPI](server)
var message = await transp.recvMessage()
var pb = initProtoBuffer(message)
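The comment above marks a deliberate exception: `streamHandler` must match the `StreamCallback` type that chronos defines, so it cannot declare its own raises list. A hedged sketch of that constraint with a hypothetical handler (not repository code): when a proc has to satisfy a callback type owned by another library, its effect list is fixed by that type, and failures are handled inside the body instead of being declared to callers.

```nim
import chronos

proc handleClient(server: StreamServer, transp: StreamTransport) {.async.} =
  # signature dictated by the server API, so no custom raises annotation here
  try:
    let line = await transp.readLine()
    echo "received: ", line
  except CatchableError as exc:
    echo "handler failed: ", exc.msg # contain the error instead of raising it
```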

View File

@@ -79,16 +79,20 @@ type
advertisementUpdated*: AsyncEvent
advertiseLoop*: Future[void]
method request*(self: DiscoveryInterface, pa: PeerAttributes) {.async, base.} =
doAssert(false, "Not implemented!")
method advertise*(self: DiscoveryInterface) {.async, base.} =
doAssert(false, "Not implemented!")
type
DiscoveryError* = object of LPError
DiscoveryFinished* = object of LPError
method request*(
self: DiscoveryInterface, pa: PeerAttributes
) {.base, async: (raises: [DiscoveryError, CancelledError]).} =
doAssert(false, "Not implemented!")
method advertise*(
self: DiscoveryInterface
) {.base, async: (raises: [CancelledError]).} =
doAssert(false, "Not implemented!")
type
DiscoveryQuery* = ref object
attr: PeerAttributes
peers: AsyncQueue[PeerAttributes]
@@ -137,7 +141,9 @@ template forEach*(query: DiscoveryQuery, code: untyped) =
## peer attributes are available through the variable
## `peer`
proc forEachInternal(q: DiscoveryQuery) {.async.} =
proc forEachInternal(
q: DiscoveryQuery
) {.async: (raises: [CancelledError, DiscoveryError]).} =
while true:
let peer {.inject.} =
try:
@@ -162,7 +168,11 @@ proc stop*(dm: DiscoveryManager) =
continue
i.advertiseLoop.cancel()
proc getPeer*(query: DiscoveryQuery): Future[PeerAttributes] {.async.} =
proc getPeer*(
query: DiscoveryQuery
): Future[PeerAttributes] {.
async: (raises: [CancelledError, DiscoveryError, DiscoveryFinished])
.} =
let getter = query.peers.popFirst()
try:
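With `getPeer` now declaring its failure modes, a consumer can handle each one explicitly. A usage sketch (hypothetical loop, assuming the exported `DiscoveryQuery` API shown above):

```nim
import chronos
import libp2p/discovery/discoverymngr

proc consume(query: DiscoveryQuery) {.async: (raises: [CancelledError]).} =
  while true:
    try:
      let peer = await query.getPeer()
      discard peer # hand the peer attributes to the application here
    except DiscoveryFinished:
      break # the query has delivered everything it will deliver
    except DiscoveryError as exc:
      echo "discovery failed: ", exc.msg
      break
```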

View File

@@ -23,7 +23,9 @@ type
proc `==`*(a, b: RdvNamespace): bool {.borrow.}
method request*(self: RendezVousInterface, pa: PeerAttributes) {.async.} =
method request*(
self: RendezVousInterface, pa: PeerAttributes
) {.async: (raises: [DiscoveryError, CancelledError]).} =
var namespace = ""
for attr in pa:
if attr.ofType(RdvNamespace):
@@ -48,7 +50,7 @@ method request*(self: RendezVousInterface, pa: PeerAttributes) {.async.} =
await sleepAsync(self.timeToRequest)
method advertise*(self: RendezVousInterface) {.async.} =
method advertise*(self: RendezVousInterface) {.async: (raises: [CancelledError]).} =
while true:
var toAdvertise: seq[string]
for attr in self.toAdvertise:

View File

@@ -188,7 +188,16 @@ proc cleanup*(peerStore: PeerStore, peerId: PeerId) =
peerStore.del(peerStore.toClean[0])
peerStore.toClean.delete(0)
proc identify*(peerStore: PeerStore, muxer: Muxer) {.async.} =
proc identify*(
peerStore: PeerStore, muxer: Muxer
) {.
async: (
raises: [
CancelledError, IdentityNoMatchError, IdentityInvalidMsgError, MultiStreamError,
LPStreamError, MuxerError,
]
)
.} =
# new stream for identify
var stream = await muxer.newStream()
if stream == nil:

View File

@@ -80,7 +80,7 @@ proc handleRelayedConnect(
proc reserve*(
cl: RelayClient, peerId: PeerId, addrs: seq[MultiAddress] = @[]
): Future[Rsvp] {.async.} =
): Future[Rsvp] {.async: (raises: [ReservationError, DialFailedError, CancelledError]).} =
let conn = await cl.switch.dial(peerId, addrs, RelayV2HopCodec)
defer:
await conn.close()

View File

@@ -20,7 +20,8 @@ import
../utils/heartbeat,
../stream/connection,
../utils/offsettedseq,
../utils/semaphore
../utils/semaphore,
../discovery/discoverymngr
export chronicles
@@ -295,7 +296,7 @@ proc decode(_: typedesc[Message], buf: seq[byte]): Opt[Message] =
Opt.some(msg)
type
RendezVousError* = object of LPError
RendezVousError* = object of DiscoveryError
RegisteredData = object
expiration: Moment
peerId: PeerId
@@ -555,7 +556,7 @@ proc requestLocally*(rdv: RendezVous, ns: string): seq[PeerRecord] =
proc request*(
rdv: RendezVous, ns: string, l: int = DiscoverLimit.int, peers: seq[PeerId]
): Future[seq[PeerRecord]] {.async.} =
): Future[seq[PeerRecord]] {.async: (raises: [DiscoveryError, CancelledError]).} =
var
s: Table[PeerId, (PeerRecord, Register)]
limit: uint64
@@ -634,7 +635,7 @@ proc request*(
proc request*(
rdv: RendezVous, ns: string, l: int = DiscoverLimit.int
): Future[seq[PeerRecord]] {.async.} =
): Future[seq[PeerRecord]] {.async: (raises: [DiscoveryError, CancelledError]).} =
await rdv.request(ns, l, rdv.peers)
proc unsubscribeLocally*(rdv: RendezVous, ns: string) =

View File

@@ -41,7 +41,9 @@ proc addressMapper(
proc reserveAndUpdate(
self: AutoRelayService, relayPid: PeerId, switch: Switch
) {.async.} =
) {.async: (raises: [CatchableError]).} =
# CatchableError is used to simplify the raised errors here: many different
# errors could be raised, and the caller doesn't really care about the cause
while self.running:
let
rsvp = await self.client.reserve(relayPid).wait(chronos.seconds(5))
@@ -86,7 +88,9 @@ method setup*(
await self.run(switch)
return hasBeenSetUp
proc manageBackedOff(self: AutoRelayService, pid: PeerId) {.async.} =
proc manageBackedOff(
self: AutoRelayService, pid: PeerId
) {.async: (raises: [CancelledError]).} =
await sleepAsync(chronos.seconds(5))
self.backingOff.keepItIf(it != pid)
self.peerAvailable.fire()
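The broad `CatchableError` annotation on `reserveAndUpdate` above trades error detail for a short raises list; its caller only needs to know that an iteration failed. A sketch of that trade-off with hypothetical names (not repository code):

```nim
import chronos

proc maintain() {.async: (raises: [CatchableError]).} =
  await sleepAsync(1.seconds) # stand-in for the reserve/update work

proc supervise() {.async: (raises: [CancelledError]).} =
  try:
    await maintain()
  except CancelledError as e:
    raise e
  except CatchableError as e:
    echo "maintenance failed, will retry later: ", e.msg # exact cause is irrelevant
```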

View File

@@ -27,6 +27,7 @@ import
protocols/secure/secure,
peerinfo,
utils/semaphore,
./muxers/muxer,
connmanager,
nameresolving/nameresolver,
peerid,
@@ -61,6 +62,8 @@ type
started: bool
services*: seq[Service]
UpgradeError* = object of LPError
Service* = ref object of RootObj
inUse: bool
@@ -216,30 +219,44 @@ proc mount*[T: LPProtocol](
s.ms.addHandler(proto.codecs, proto, matcher)
s.peerInfo.protocols.add(proto.codec)
proc upgrader(switch: Switch, trans: Transport, conn: Connection) {.async.} =
let muxed = await trans.upgrade(conn, Opt.none(PeerId))
switch.connManager.storeMuxer(muxed)
await switch.peerStore.identify(muxed)
await switch.connManager.triggerPeerEvents(
muxed.connection.peerId, PeerEvent(kind: PeerEventKind.Identified, initiator: false)
)
trace "Connection upgrade succeeded"
proc upgrader(
switch: Switch, trans: Transport, conn: Connection
) {.async: (raises: [CancelledError, UpgradeError]).} =
try:
let muxed = await trans.upgrade(conn, Opt.none(PeerId))
switch.connManager.storeMuxer(muxed)
await switch.peerStore.identify(muxed)
await switch.connManager.triggerPeerEvents(
muxed.connection.peerId,
PeerEvent(kind: PeerEventKind.Identified, initiator: false),
)
except CancelledError as e:
raise e
except CatchableError as e:
raise newException(UpgradeError, e.msg, e)
proc upgradeMonitor(
switch: Switch, trans: Transport, conn: Connection, upgrades: AsyncSemaphore
) {.async.} =
) {.async: (raises: []).} =
var upgradeSuccessful = false
try:
await switch.upgrader(trans, conn).wait(30.seconds)
except CatchableError as exc:
if exc isnot CancelledError:
libp2p_failed_upgrades_incoming.inc()
if not isNil(conn):
await conn.close()
trace "Exception awaiting connection upgrade", description = exc.msg, conn
trace "Connection upgrade succeeded"
upgradeSuccessful = true
except CancelledError:
trace "Connection upgrade cancelled", conn
except AsyncTimeoutError:
trace "Connection upgrade timeout", conn
libp2p_failed_upgrades_incoming.inc()
except UpgradeError as e:
trace "Connection upgrade failed", description = e.msg, conn
libp2p_failed_upgrades_incoming.inc()
finally:
if (not upgradeSuccessful) and (not isNil(conn)):
await conn.close()
upgrades.release()
proc accept(s: Switch, transport: Transport) {.async.} = # noraises
proc accept(s: Switch, transport: Transport) {.async: (raises: []).} =
## switch accept loop, ran for every transport
##
@@ -286,7 +303,7 @@ proc accept(s: Switch, transport: Transport) {.async.} = # noraises
await conn.close()
return
proc stop*(s: Switch) {.async, public.} =
proc stop*(s: Switch) {.public, async: (raises: [CancelledError]).} =
## Stop listening on every transport, and
## close every active connections
@@ -318,7 +335,7 @@ proc stop*(s: Switch) {.async, public.} =
trace "Switch stopped"
proc start*(s: Switch) {.async, public.} =
proc start*(s: Switch) {.public, async: (raises: [CancelledError, LPError]).} =
## Start listening on every transport
if s.started:
@@ -340,7 +357,7 @@ proc start*(s: Switch) {.async, public.} =
for fut in startFuts:
if fut.failed:
await s.stop()
raise fut.error
raise newException(LPError, "starting transports failed", fut.error)
for t in s.transports: # for each transport
if t.addrs.len > 0 or t.running: