From 97503c419cd2331421375411f40b8716c119b4c9 Mon Sep 17 00:00:00 2001 From: Ludovic Chenut Date: Thu, 8 Aug 2024 12:46:21 +0200 Subject: [PATCH] add `{.async: (raises).}` to `connmanager` --- libp2p/connmanager.nim | 41 ++++++++++++++++++++++++-------------- libp2p/utils/semaphore.nim | 6 +++--- tests/testsemaphore.nim | 2 +- 3 files changed, 30 insertions(+), 19 deletions(-) diff --git a/libp2p/connmanager.nim b/libp2p/connmanager.nim index f70ca35e7..5d8db58a6 100644 --- a/libp2p/connmanager.nim +++ b/libp2p/connmanager.nim @@ -67,7 +67,8 @@ type muxed: Table[PeerId, seq[Muxer]] connEvents: array[ConnEventKind, OrderedSet[ConnEventHandler]] peerEvents: array[PeerEventKind, OrderedSet[PeerEventHandler]] - expectedConnectionsOverLimit*: Table[(PeerId, Direction), Future[Muxer]] + expectedConnectionsOverLimit*: + Table[(PeerId, Direction), Future[Muxer].Raising([CancelledError])] peerStore*: PeerStore ConnectionSlot* = object @@ -123,7 +124,9 @@ proc removeConnEventHandler*( ) = c.connEvents[kind].excl(handler) -proc triggerConnEvent*(c: ConnManager, peerId: PeerId, event: ConnEvent) {.async.} = +proc triggerConnEvent*( + c: ConnManager, peerId: PeerId, event: ConnEvent +) {.async: (raises: [CancelledError]).} = try: trace "About to trigger connection events", peer = peerId if c.connEvents[event.kind].len() > 0: @@ -153,7 +156,9 @@ proc removePeerEventHandler*( ) = c.peerEvents[kind].excl(handler) -proc triggerPeerEvents*(c: ConnManager, peerId: PeerId, event: PeerEvent) {.async.} = +proc triggerPeerEvents*( + c: ConnManager, peerId: PeerId, event: PeerEvent +) {.async: (raises: [CancelledError]).} = trace "About to trigger peer events", peer = peerId if c.peerEvents[event.kind].len == 0: return @@ -173,7 +178,7 @@ proc triggerPeerEvents*(c: ConnManager, peerId: PeerId, event: PeerEvent) {.asyn proc expectConnection*( c: ConnManager, p: PeerId, dir: Direction -): Future[Muxer] {.async.} = +): Future[Muxer] {.async: (raises: [AlreadyExpectingConnectionError, CancelledError]).} = ## Wait for a peer to connect to us. This will bypass the `MaxConnectionsPerPeer` let key = (p, dir) if key in c.expectedConnectionsOverLimit: @@ -182,7 +187,7 @@ proc expectConnection*( "Already expecting an incoming connection from that peer", ) - let future = newFuture[Muxer]() + let future = Future[Muxer].Raising([CancelledError]).init() c.expectedConnectionsOverLimit[key] = future try: @@ -204,7 +209,7 @@ proc contains*(c: ConnManager, muxer: Muxer): bool = let conn = muxer.connection return muxer in c.muxed.getOrDefault(conn.peerId) -proc closeMuxer(muxer: Muxer) {.async.} = +proc closeMuxer(muxer: Muxer) {.async: (raises: [CancelledError]).} = trace "Cleaning up muxer", m = muxer await muxer.close() @@ -215,7 +220,7 @@ proc closeMuxer(muxer: Muxer) {.async.} = trace "Exception in close muxer handler", exc = exc.msg trace "Cleaned up muxer", m = muxer -proc muxCleanup(c: ConnManager, mux: Muxer) {.async.} = +proc muxCleanup(c: ConnManager, mux: Muxer) {.async: (raises: [CancelledError]).} = try: trace "Triggering disconnect events", mux let peerId = mux.connection.peerId @@ -237,7 +242,7 @@ proc muxCleanup(c: ConnManager, mux: Muxer) {.async.} = # do not need to propagate CancelledError and should handle other errors warn "Unexpected exception peer cleanup handler", mux, msg = exc.msg -proc onClose(c: ConnManager, mux: Muxer) {.async.} = +proc onClose(c: ConnManager, mux: Muxer) {.async: (raises: [CancelledError]).} = ## connection close even handler ## ## triggers the connections resource cleanup @@ -321,7 +326,9 @@ proc storeMuxer*(c: ConnManager, muxer: Muxer) {.raises: [CatchableError].} = trace "Stored muxer", muxer, direction = $muxer.connection.dir, peers = c.muxed.len -proc getIncomingSlot*(c: ConnManager): Future[ConnectionSlot] {.async.} = +proc getIncomingSlot*( + c: ConnManager +): Future[ConnectionSlot] {.async: (raises: [CancelledError]).} = await c.inSema.acquire() return ConnectionSlot(connManager: c, direction: In) @@ -354,7 +361,7 @@ proc trackConnection*(cs: ConnectionSlot, conn: Connection) = cs.release() return - proc semaphoreMonitor() {.async.} = + proc semaphoreMonitor() {.async: (raises: [CancelledError]).} = try: await conn.join() except CatchableError as exc: @@ -370,14 +377,18 @@ proc trackMuxer*(cs: ConnectionSlot, mux: Muxer) = return cs.trackConnection(mux.connection) -proc getStream*(c: ConnManager, muxer: Muxer): Future[Connection] {.async.} = +proc getStream*( + c: ConnManager, muxer: Muxer +): Future[Connection] {.async: (raises: [LPStreamError, MuxerError, CancelledError]).} = ## get a muxed stream for the passed muxer ## if not (isNil(muxer)): return await muxer.newStream() -proc getStream*(c: ConnManager, peerId: PeerId): Future[Connection] {.async.} = +proc getStream*( + c: ConnManager, peerId: PeerId +): Future[Connection] {.async: (raises: [LPStreamError, MuxerError, CancelledError]).} = ## get a muxed stream for the passed peer from any connection ## @@ -385,13 +396,13 @@ proc getStream*(c: ConnManager, peerId: PeerId): Future[Connection] {.async.} = proc getStream*( c: ConnManager, peerId: PeerId, dir: Direction -): Future[Connection] {.async.} = +): Future[Connection] {.async: (raises: [LPStreamError, MuxerError, CancelledError]).} = ## get a muxed stream for the passed peer from a connection with `dir` ## return await c.getStream(c.selectMuxer(peerId, dir)) -proc dropPeer*(c: ConnManager, peerId: PeerId) {.async.} = +proc dropPeer*(c: ConnManager, peerId: PeerId) {.async: (raises: [CancelledError]).} = ## drop connections and cleanup resources for peer ## trace "Dropping peer", peerId @@ -402,7 +413,7 @@ proc dropPeer*(c: ConnManager, peerId: PeerId) {.async.} = trace "Peer dropped", peerId -proc close*(c: ConnManager) {.async.} = +proc close*(c: ConnManager) {.async: (raises: [CancelledError]).} = ## cleanup resources for the connection ## manager ## diff --git a/libp2p/utils/semaphore.nim b/libp2p/utils/semaphore.nim index 6c885ea99..feef60235 100644 --- a/libp2p/utils/semaphore.nim +++ b/libp2p/utils/semaphore.nim @@ -20,7 +20,7 @@ logScope: type AsyncSemaphore* = ref object of RootObj size*: int count: int - queue: seq[Future[void]] + queue: seq[Future[void].Raising([CancelledError])] proc newAsyncSemaphore*(size: int): AsyncSemaphore = AsyncSemaphore(size: size, count: size) @@ -38,14 +38,14 @@ proc tryAcquire*(s: AsyncSemaphore): bool = trace "Acquired slot", available = s.count, queue = s.queue.len return true -proc acquire*(s: AsyncSemaphore): Future[void] = +proc acquire*(s: AsyncSemaphore): Future[void].Raising([CancelledError]) = ## Acquire a resource and decrement the resource ## counter. If no more resources are available, ## the returned future will not complete until ## the resource count goes above 0. ## - let fut = newFuture[void]("AsyncSemaphore.acquire") + let fut = Future[void].Raising([CancelledError]).init("AsyncSemaphore.acquire") if s.tryAcquire(): fut.complete() return fut diff --git a/tests/testsemaphore.nim b/tests/testsemaphore.nim index 95e5b9080..a1e6daf93 100644 --- a/tests/testsemaphore.nim +++ b/tests/testsemaphore.nim @@ -14,7 +14,7 @@ import chronos import ../libp2p/utils/semaphore -import ./helpers +import ./asyncunit randomize()