mirror of
https://github.com/status-im/nim-libp2p.git
synced 2025-02-22 09:38:18 +00:00
chore(connmanager): specify raised exceptions (#1263)
This commit is contained in:
parent
ec43d0cb9f
commit
93dd5a6768
@ -123,7 +123,9 @@ proc removeConnEventHandler*(
|
|||||||
) =
|
) =
|
||||||
c.connEvents[kind].excl(handler)
|
c.connEvents[kind].excl(handler)
|
||||||
|
|
||||||
proc triggerConnEvent*(c: ConnManager, peerId: PeerId, event: ConnEvent) {.async.} =
|
proc triggerConnEvent*(
|
||||||
|
c: ConnManager, peerId: PeerId, event: ConnEvent
|
||||||
|
) {.async: (raises: [CancelledError]).} =
|
||||||
try:
|
try:
|
||||||
trace "About to trigger connection events", peer = peerId
|
trace "About to trigger connection events", peer = peerId
|
||||||
if c.connEvents[event.kind].len() > 0:
|
if c.connEvents[event.kind].len() > 0:
|
||||||
@ -154,7 +156,9 @@ proc removePeerEventHandler*(
|
|||||||
) =
|
) =
|
||||||
c.peerEvents[kind].excl(handler)
|
c.peerEvents[kind].excl(handler)
|
||||||
|
|
||||||
proc triggerPeerEvents*(c: ConnManager, peerId: PeerId, event: PeerEvent) {.async.} =
|
proc triggerPeerEvents*(
|
||||||
|
c: ConnManager, peerId: PeerId, event: PeerEvent
|
||||||
|
) {.async: (raises: [CancelledError]).} =
|
||||||
trace "About to trigger peer events", peer = peerId
|
trace "About to trigger peer events", peer = peerId
|
||||||
if c.peerEvents[event.kind].len == 0:
|
if c.peerEvents[event.kind].len == 0:
|
||||||
return
|
return
|
||||||
@ -174,7 +178,7 @@ proc triggerPeerEvents*(c: ConnManager, peerId: PeerId, event: PeerEvent) {.asyn
|
|||||||
|
|
||||||
proc expectConnection*(
|
proc expectConnection*(
|
||||||
c: ConnManager, p: PeerId, dir: Direction
|
c: ConnManager, p: PeerId, dir: Direction
|
||||||
): Future[Muxer] {.async.} =
|
): Future[Muxer] {.async: (raises: [AlreadyExpectingConnectionError, CancelledError]).} =
|
||||||
## Wait for a peer to connect to us. This will bypass the `MaxConnectionsPerPeer`
|
## Wait for a peer to connect to us. This will bypass the `MaxConnectionsPerPeer`
|
||||||
let key = (p, dir)
|
let key = (p, dir)
|
||||||
if key in c.expectedConnectionsOverLimit:
|
if key in c.expectedConnectionsOverLimit:
|
||||||
@ -183,7 +187,7 @@ proc expectConnection*(
|
|||||||
"Already expecting an incoming connection from that peer",
|
"Already expecting an incoming connection from that peer",
|
||||||
)
|
)
|
||||||
|
|
||||||
let future = newFuture[Muxer]()
|
let future = Future[Muxer].Raising([CancelledError]).init()
|
||||||
c.expectedConnectionsOverLimit[key] = future
|
c.expectedConnectionsOverLimit[key] = future
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@ -205,18 +209,18 @@ proc contains*(c: ConnManager, muxer: Muxer): bool =
|
|||||||
let conn = muxer.connection
|
let conn = muxer.connection
|
||||||
return muxer in c.muxed.getOrDefault(conn.peerId)
|
return muxer in c.muxed.getOrDefault(conn.peerId)
|
||||||
|
|
||||||
proc closeMuxer(muxer: Muxer) {.async.} =
|
proc closeMuxer(muxer: Muxer) {.async: (raises: [CancelledError]).} =
|
||||||
trace "Cleaning up muxer", m = muxer
|
trace "Cleaning up muxer", m = muxer
|
||||||
|
|
||||||
await muxer.close()
|
await muxer.close()
|
||||||
if not (isNil(muxer.handler)):
|
if not (isNil(muxer.handler)):
|
||||||
try:
|
try:
|
||||||
await muxer.handler # TODO noraises?
|
await muxer.handler
|
||||||
except CatchableError as exc:
|
except CatchableError as exc:
|
||||||
trace "Exception in close muxer handler", description = exc.msg
|
trace "Exception in close muxer handler", description = exc.msg
|
||||||
trace "Cleaned up muxer", m = muxer
|
trace "Cleaned up muxer", m = muxer
|
||||||
|
|
||||||
proc muxCleanup(c: ConnManager, mux: Muxer) {.async.} =
|
proc muxCleanup(c: ConnManager, mux: Muxer) {.async: (raises: []).} =
|
||||||
try:
|
try:
|
||||||
trace "Triggering disconnect events", mux
|
trace "Triggering disconnect events", mux
|
||||||
let peerId = mux.connection.peerId
|
let peerId = mux.connection.peerId
|
||||||
@ -238,7 +242,7 @@ proc muxCleanup(c: ConnManager, mux: Muxer) {.async.} =
|
|||||||
# do not need to propagate CancelledError and should handle other errors
|
# do not need to propagate CancelledError and should handle other errors
|
||||||
warn "Unexpected exception peer cleanup handler", mux, description = exc.msg
|
warn "Unexpected exception peer cleanup handler", mux, description = exc.msg
|
||||||
|
|
||||||
proc onClose(c: ConnManager, mux: Muxer) {.async.} =
|
proc onClose(c: ConnManager, mux: Muxer) {.async: (raises: []).} =
|
||||||
## connection close even handler
|
## connection close even handler
|
||||||
##
|
##
|
||||||
## triggers the connections resource cleanup
|
## triggers the connections resource cleanup
|
||||||
@ -324,7 +328,9 @@ proc storeMuxer*(c: ConnManager, muxer: Muxer) {.raises: [CatchableError].} =
|
|||||||
|
|
||||||
trace "Stored muxer", muxer, direction = $muxer.connection.dir, peers = c.muxed.len
|
trace "Stored muxer", muxer, direction = $muxer.connection.dir, peers = c.muxed.len
|
||||||
|
|
||||||
proc getIncomingSlot*(c: ConnManager): Future[ConnectionSlot] {.async.} =
|
proc getIncomingSlot*(
|
||||||
|
c: ConnManager
|
||||||
|
): Future[ConnectionSlot] {.async: (raises: [CancelledError]).} =
|
||||||
await c.inSema.acquire()
|
await c.inSema.acquire()
|
||||||
return ConnectionSlot(connManager: c, direction: In)
|
return ConnectionSlot(connManager: c, direction: In)
|
||||||
|
|
||||||
@ -339,25 +345,21 @@ proc getOutgoingSlot*(
|
|||||||
raise newTooManyConnectionsError()
|
raise newTooManyConnectionsError()
|
||||||
return ConnectionSlot(connManager: c, direction: Out)
|
return ConnectionSlot(connManager: c, direction: Out)
|
||||||
|
|
||||||
|
func semaphore(c: ConnManager, dir: Direction): AsyncSemaphore {.inline.} =
|
||||||
|
return if dir == In: c.inSema else: c.outSema
|
||||||
|
|
||||||
proc slotsAvailable*(c: ConnManager, dir: Direction): int =
|
proc slotsAvailable*(c: ConnManager, dir: Direction): int =
|
||||||
case dir
|
return semaphore(c, dir).count
|
||||||
of Direction.In:
|
|
||||||
return c.inSema.count
|
|
||||||
of Direction.Out:
|
|
||||||
return c.outSema.count
|
|
||||||
|
|
||||||
proc release*(cs: ConnectionSlot) =
|
proc release*(cs: ConnectionSlot) =
|
||||||
if cs.direction == In:
|
semaphore(cs.connManager, cs.direction).release()
|
||||||
cs.connManager.inSema.release()
|
|
||||||
else:
|
|
||||||
cs.connManager.outSema.release()
|
|
||||||
|
|
||||||
proc trackConnection*(cs: ConnectionSlot, conn: Connection) =
|
proc trackConnection*(cs: ConnectionSlot, conn: Connection) =
|
||||||
if isNil(conn):
|
if isNil(conn):
|
||||||
cs.release()
|
cs.release()
|
||||||
return
|
return
|
||||||
|
|
||||||
proc semaphoreMonitor() {.async.} =
|
proc semaphoreMonitor() {.async: (raises: [CancelledError]).} =
|
||||||
try:
|
try:
|
||||||
await conn.join()
|
await conn.join()
|
||||||
except CatchableError as exc:
|
except CatchableError as exc:
|
||||||
@ -373,14 +375,18 @@ proc trackMuxer*(cs: ConnectionSlot, mux: Muxer) =
|
|||||||
return
|
return
|
||||||
cs.trackConnection(mux.connection)
|
cs.trackConnection(mux.connection)
|
||||||
|
|
||||||
proc getStream*(c: ConnManager, muxer: Muxer): Future[Connection] {.async.} =
|
proc getStream*(
|
||||||
|
c: ConnManager, muxer: Muxer
|
||||||
|
): Future[Connection] {.async: (raises: [LPStreamError, MuxerError, CancelledError]).} =
|
||||||
## get a muxed stream for the passed muxer
|
## get a muxed stream for the passed muxer
|
||||||
##
|
##
|
||||||
|
|
||||||
if not (isNil(muxer)):
|
if not (isNil(muxer)):
|
||||||
return await muxer.newStream()
|
return await muxer.newStream()
|
||||||
|
|
||||||
proc getStream*(c: ConnManager, peerId: PeerId): Future[Connection] {.async.} =
|
proc getStream*(
|
||||||
|
c: ConnManager, peerId: PeerId
|
||||||
|
): Future[Connection] {.async: (raises: [LPStreamError, MuxerError, CancelledError]).} =
|
||||||
## get a muxed stream for the passed peer from any connection
|
## get a muxed stream for the passed peer from any connection
|
||||||
##
|
##
|
||||||
|
|
||||||
@ -388,13 +394,13 @@ proc getStream*(c: ConnManager, peerId: PeerId): Future[Connection] {.async.} =
|
|||||||
|
|
||||||
proc getStream*(
|
proc getStream*(
|
||||||
c: ConnManager, peerId: PeerId, dir: Direction
|
c: ConnManager, peerId: PeerId, dir: Direction
|
||||||
): Future[Connection] {.async.} =
|
): Future[Connection] {.async: (raises: [LPStreamError, MuxerError, CancelledError]).} =
|
||||||
## get a muxed stream for the passed peer from a connection with `dir`
|
## get a muxed stream for the passed peer from a connection with `dir`
|
||||||
##
|
##
|
||||||
|
|
||||||
return await c.getStream(c.selectMuxer(peerId, dir))
|
return await c.getStream(c.selectMuxer(peerId, dir))
|
||||||
|
|
||||||
proc dropPeer*(c: ConnManager, peerId: PeerId) {.async.} =
|
proc dropPeer*(c: ConnManager, peerId: PeerId) {.async: (raises: [CancelledError]).} =
|
||||||
## drop connections and cleanup resources for peer
|
## drop connections and cleanup resources for peer
|
||||||
##
|
##
|
||||||
trace "Dropping peer", peerId
|
trace "Dropping peer", peerId
|
||||||
@ -405,7 +411,7 @@ proc dropPeer*(c: ConnManager, peerId: PeerId) {.async.} =
|
|||||||
|
|
||||||
trace "Peer dropped", peerId
|
trace "Peer dropped", peerId
|
||||||
|
|
||||||
proc close*(c: ConnManager) {.async.} =
|
proc close*(c: ConnManager) {.async: (raises: [CancelledError]).} =
|
||||||
## cleanup resources for the connection
|
## cleanup resources for the connection
|
||||||
## manager
|
## manager
|
||||||
##
|
##
|
||||||
|
@ -12,14 +12,12 @@
|
|||||||
import sequtils
|
import sequtils
|
||||||
import chronos, chronicles
|
import chronos, chronicles
|
||||||
|
|
||||||
# TODO: this should probably go in chronos
|
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "libp2p semaphore"
|
topics = "libp2p semaphore"
|
||||||
|
|
||||||
type AsyncSemaphore* = ref object of RootObj
|
type AsyncSemaphore* = ref object of RootObj
|
||||||
size*: int
|
size*: int
|
||||||
count: int
|
count: int # count of available slots
|
||||||
queue: seq[Future[void]]
|
queue: seq[Future[void]]
|
||||||
|
|
||||||
proc newAsyncSemaphore*(size: int): AsyncSemaphore =
|
proc newAsyncSemaphore*(size: int): AsyncSemaphore =
|
||||||
@ -38,7 +36,9 @@ proc tryAcquire*(s: AsyncSemaphore): bool =
|
|||||||
trace "Acquired slot", available = s.count, queue = s.queue.len
|
trace "Acquired slot", available = s.count, queue = s.queue.len
|
||||||
return true
|
return true
|
||||||
|
|
||||||
proc acquire*(s: AsyncSemaphore): Future[void] =
|
proc acquire*(
|
||||||
|
s: AsyncSemaphore
|
||||||
|
): Future[void] {.async: (raises: [CancelledError], raw: true).} =
|
||||||
## Acquire a resource and decrement the resource
|
## Acquire a resource and decrement the resource
|
||||||
## counter. If no more resources are available,
|
## counter. If no more resources are available,
|
||||||
## the returned future will not complete until
|
## the returned future will not complete until
|
||||||
|
Loading…
x
Reference in New Issue
Block a user