add `{.async: (raises).}` to `connmanager`

This commit is contained in:
Ludovic Chenut 2024-08-08 12:46:21 +02:00
parent 6ec038d29a
commit 97503c419c
No known key found for this signature in database
GPG Key ID: D9A59B1907F1D50C
3 changed files with 30 additions and 19 deletions

View File

@ -67,7 +67,8 @@ type
muxed: Table[PeerId, seq[Muxer]] muxed: Table[PeerId, seq[Muxer]]
connEvents: array[ConnEventKind, OrderedSet[ConnEventHandler]] connEvents: array[ConnEventKind, OrderedSet[ConnEventHandler]]
peerEvents: array[PeerEventKind, OrderedSet[PeerEventHandler]] peerEvents: array[PeerEventKind, OrderedSet[PeerEventHandler]]
expectedConnectionsOverLimit*: Table[(PeerId, Direction), Future[Muxer]] expectedConnectionsOverLimit*:
Table[(PeerId, Direction), Future[Muxer].Raising([CancelledError])]
peerStore*: PeerStore peerStore*: PeerStore
ConnectionSlot* = object ConnectionSlot* = object
@ -123,7 +124,9 @@ proc removeConnEventHandler*(
) = ) =
c.connEvents[kind].excl(handler) c.connEvents[kind].excl(handler)
proc triggerConnEvent*(c: ConnManager, peerId: PeerId, event: ConnEvent) {.async.} = proc triggerConnEvent*(
c: ConnManager, peerId: PeerId, event: ConnEvent
) {.async: (raises: [CancelledError]).} =
try: try:
trace "About to trigger connection events", peer = peerId trace "About to trigger connection events", peer = peerId
if c.connEvents[event.kind].len() > 0: if c.connEvents[event.kind].len() > 0:
@ -153,7 +156,9 @@ proc removePeerEventHandler*(
) = ) =
c.peerEvents[kind].excl(handler) c.peerEvents[kind].excl(handler)
proc triggerPeerEvents*(c: ConnManager, peerId: PeerId, event: PeerEvent) {.async.} = proc triggerPeerEvents*(
c: ConnManager, peerId: PeerId, event: PeerEvent
) {.async: (raises: [CancelledError]).} =
trace "About to trigger peer events", peer = peerId trace "About to trigger peer events", peer = peerId
if c.peerEvents[event.kind].len == 0: if c.peerEvents[event.kind].len == 0:
return return
@ -173,7 +178,7 @@ proc triggerPeerEvents*(c: ConnManager, peerId: PeerId, event: PeerEvent) {.asyn
proc expectConnection*( proc expectConnection*(
c: ConnManager, p: PeerId, dir: Direction c: ConnManager, p: PeerId, dir: Direction
): Future[Muxer] {.async.} = ): Future[Muxer] {.async: (raises: [AlreadyExpectingConnectionError, CancelledError]).} =
## Wait for a peer to connect to us. This will bypass the `MaxConnectionsPerPeer` ## Wait for a peer to connect to us. This will bypass the `MaxConnectionsPerPeer`
let key = (p, dir) let key = (p, dir)
if key in c.expectedConnectionsOverLimit: if key in c.expectedConnectionsOverLimit:
@ -182,7 +187,7 @@ proc expectConnection*(
"Already expecting an incoming connection from that peer", "Already expecting an incoming connection from that peer",
) )
let future = newFuture[Muxer]() let future = Future[Muxer].Raising([CancelledError]).init()
c.expectedConnectionsOverLimit[key] = future c.expectedConnectionsOverLimit[key] = future
try: try:
@ -204,7 +209,7 @@ proc contains*(c: ConnManager, muxer: Muxer): bool =
let conn = muxer.connection let conn = muxer.connection
return muxer in c.muxed.getOrDefault(conn.peerId) return muxer in c.muxed.getOrDefault(conn.peerId)
proc closeMuxer(muxer: Muxer) {.async.} = proc closeMuxer(muxer: Muxer) {.async: (raises: [CancelledError]).} =
trace "Cleaning up muxer", m = muxer trace "Cleaning up muxer", m = muxer
await muxer.close() await muxer.close()
@ -215,7 +220,7 @@ proc closeMuxer(muxer: Muxer) {.async.} =
trace "Exception in close muxer handler", exc = exc.msg trace "Exception in close muxer handler", exc = exc.msg
trace "Cleaned up muxer", m = muxer trace "Cleaned up muxer", m = muxer
proc muxCleanup(c: ConnManager, mux: Muxer) {.async.} = proc muxCleanup(c: ConnManager, mux: Muxer) {.async: (raises: [CancelledError]).} =
try: try:
trace "Triggering disconnect events", mux trace "Triggering disconnect events", mux
let peerId = mux.connection.peerId let peerId = mux.connection.peerId
@ -237,7 +242,7 @@ proc muxCleanup(c: ConnManager, mux: Muxer) {.async.} =
# do not need to propagate CancelledError and should handle other errors # do not need to propagate CancelledError and should handle other errors
warn "Unexpected exception peer cleanup handler", mux, msg = exc.msg warn "Unexpected exception peer cleanup handler", mux, msg = exc.msg
proc onClose(c: ConnManager, mux: Muxer) {.async.} = proc onClose(c: ConnManager, mux: Muxer) {.async: (raises: [CancelledError]).} =
## connection close even handler ## connection close even handler
## ##
## triggers the connections resource cleanup ## triggers the connections resource cleanup
@ -321,7 +326,9 @@ proc storeMuxer*(c: ConnManager, muxer: Muxer) {.raises: [CatchableError].} =
trace "Stored muxer", muxer, direction = $muxer.connection.dir, peers = c.muxed.len trace "Stored muxer", muxer, direction = $muxer.connection.dir, peers = c.muxed.len
proc getIncomingSlot*(c: ConnManager): Future[ConnectionSlot] {.async.} = proc getIncomingSlot*(
c: ConnManager
): Future[ConnectionSlot] {.async: (raises: [CancelledError]).} =
await c.inSema.acquire() await c.inSema.acquire()
return ConnectionSlot(connManager: c, direction: In) return ConnectionSlot(connManager: c, direction: In)
@ -354,7 +361,7 @@ proc trackConnection*(cs: ConnectionSlot, conn: Connection) =
cs.release() cs.release()
return return
proc semaphoreMonitor() {.async.} = proc semaphoreMonitor() {.async: (raises: [CancelledError]).} =
try: try:
await conn.join() await conn.join()
except CatchableError as exc: except CatchableError as exc:
@ -370,14 +377,18 @@ proc trackMuxer*(cs: ConnectionSlot, mux: Muxer) =
return return
cs.trackConnection(mux.connection) cs.trackConnection(mux.connection)
proc getStream*(c: ConnManager, muxer: Muxer): Future[Connection] {.async.} = proc getStream*(
c: ConnManager, muxer: Muxer
): Future[Connection] {.async: (raises: [LPStreamError, MuxerError, CancelledError]).} =
## get a muxed stream for the passed muxer ## get a muxed stream for the passed muxer
## ##
if not (isNil(muxer)): if not (isNil(muxer)):
return await muxer.newStream() return await muxer.newStream()
proc getStream*(c: ConnManager, peerId: PeerId): Future[Connection] {.async.} = proc getStream*(
c: ConnManager, peerId: PeerId
): Future[Connection] {.async: (raises: [LPStreamError, MuxerError, CancelledError]).} =
## get a muxed stream for the passed peer from any connection ## get a muxed stream for the passed peer from any connection
## ##
@ -385,13 +396,13 @@ proc getStream*(c: ConnManager, peerId: PeerId): Future[Connection] {.async.} =
proc getStream*( proc getStream*(
c: ConnManager, peerId: PeerId, dir: Direction c: ConnManager, peerId: PeerId, dir: Direction
): Future[Connection] {.async.} = ): Future[Connection] {.async: (raises: [LPStreamError, MuxerError, CancelledError]).} =
## get a muxed stream for the passed peer from a connection with `dir` ## get a muxed stream for the passed peer from a connection with `dir`
## ##
return await c.getStream(c.selectMuxer(peerId, dir)) return await c.getStream(c.selectMuxer(peerId, dir))
proc dropPeer*(c: ConnManager, peerId: PeerId) {.async.} = proc dropPeer*(c: ConnManager, peerId: PeerId) {.async: (raises: [CancelledError]).} =
## drop connections and cleanup resources for peer ## drop connections and cleanup resources for peer
## ##
trace "Dropping peer", peerId trace "Dropping peer", peerId
@ -402,7 +413,7 @@ proc dropPeer*(c: ConnManager, peerId: PeerId) {.async.} =
trace "Peer dropped", peerId trace "Peer dropped", peerId
proc close*(c: ConnManager) {.async.} = proc close*(c: ConnManager) {.async: (raises: [CancelledError]).} =
## cleanup resources for the connection ## cleanup resources for the connection
## manager ## manager
## ##

View File

@ -20,7 +20,7 @@ logScope:
type AsyncSemaphore* = ref object of RootObj type AsyncSemaphore* = ref object of RootObj
size*: int size*: int
count: int count: int
queue: seq[Future[void]] queue: seq[Future[void].Raising([CancelledError])]
proc newAsyncSemaphore*(size: int): AsyncSemaphore = proc newAsyncSemaphore*(size: int): AsyncSemaphore =
AsyncSemaphore(size: size, count: size) AsyncSemaphore(size: size, count: size)
@ -38,14 +38,14 @@ proc tryAcquire*(s: AsyncSemaphore): bool =
trace "Acquired slot", available = s.count, queue = s.queue.len trace "Acquired slot", available = s.count, queue = s.queue.len
return true return true
proc acquire*(s: AsyncSemaphore): Future[void] = proc acquire*(s: AsyncSemaphore): Future[void].Raising([CancelledError]) =
## Acquire a resource and decrement the resource ## Acquire a resource and decrement the resource
## counter. If no more resources are available, ## counter. If no more resources are available,
## the returned future will not complete until ## the returned future will not complete until
## the resource count goes above 0. ## the resource count goes above 0.
## ##
let fut = newFuture[void]("AsyncSemaphore.acquire") let fut = Future[void].Raising([CancelledError]).init("AsyncSemaphore.acquire")
if s.tryAcquire(): if s.tryAcquire():
fut.complete() fut.complete()
return fut return fut

View File

@ -14,7 +14,7 @@ import chronos
import ../libp2p/utils/semaphore import ../libp2p/utils/semaphore
import ./helpers import ./asyncunit
randomize() randomize()