chore: specify raising exceptions in daemon module (#1249)

This commit is contained in:
vladopajic 2025-02-14 15:36:29 +01:00 committed by GitHub
parent a6e45d6157
commit be33ad6ac7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 273 additions and 74 deletions

View File

@ -158,7 +158,7 @@ type
key*: PublicKey key*: PublicKey
P2PStreamCallback* = proc(api: DaemonAPI, stream: P2PStream): Future[void] {. P2PStreamCallback* = proc(api: DaemonAPI, stream: P2PStream): Future[void] {.
gcsafe, raises: [CatchableError] gcsafe, async: (raises: [CatchableError])
.} .}
P2PPubSubCallback* = proc( P2PPubSubCallback* = proc(
api: DaemonAPI, ticket: PubsubTicket, message: PubSubMessage api: DaemonAPI, ticket: PubsubTicket, message: PubSubMessage
@ -485,7 +485,11 @@ proc getErrorMessage(pb: ProtoBuffer): string {.inline, raises: [DaemonLocalErro
if initProtoBuffer(error).getRequiredField(1, result).isErr(): if initProtoBuffer(error).getRequiredField(1, result).isErr():
raise newException(DaemonLocalError, "Error message is missing!") raise newException(DaemonLocalError, "Error message is missing!")
proc recvMessage(conn: StreamTransport): Future[seq[byte]] {.async.} = proc recvMessage(
conn: StreamTransport
): Future[seq[byte]] {.
async: (raises: [TransportIncompleteError, TransportError, CancelledError])
.} =
var var
size: uint size: uint
length: int length: int
@ -508,13 +512,19 @@ proc recvMessage(conn: StreamTransport): Future[seq[byte]] {.async.} =
result = buffer result = buffer
proc newConnection*(api: DaemonAPI): Future[StreamTransport] {.raises: [LPError].} = proc newConnection*(
result = connect(api.address) api: DaemonAPI
): Future[StreamTransport] {.
async: (raises: [MaInvalidAddress, TransportError, CancelledError, LPError])
.} =
await connect(api.address)
proc closeConnection*(api: DaemonAPI, transp: StreamTransport): Future[void] = proc closeConnection*(
result = transp.closeWait() api: DaemonAPI, transp: StreamTransport
): Future[void] {.async: (raises: [CancelledError]).} =
await transp.closeWait()
proc socketExists(address: MultiAddress): Future[bool] {.async.} = proc socketExists(address: MultiAddress): Future[bool] {.async: (raises: []).} =
try: try:
var transp = await connect(address) var transp = await connect(address)
await transp.closeWait() await transp.closeWait()
@ -534,7 +544,9 @@ else:
proc getProcessId(): int = proc getProcessId(): int =
result = int(posix.getpid()) result = int(posix.getpid())
proc getSocket(pattern: string, count: ptr int): Future[MultiAddress] {.async.} = proc getSocket(
pattern: string, count: ptr int
): Future[MultiAddress] {.async: (raises: [ValueError, LPError]).} =
var sockname = "" var sockname = ""
var pid = $getProcessId() var pid = $getProcessId()
sockname = pattern % [pid, $(count[])] sockname = pattern % [pid, $(count[])]
@ -562,7 +574,35 @@ proc getSocket(pattern: string, count: ptr int): Future[MultiAddress] {.async.}
closeSocket(sock) closeSocket(sock)
# This is forward declaration needed for newDaemonApi() # This is forward declaration needed for newDaemonApi()
proc listPeers*(api: DaemonAPI): Future[seq[PeerInfo]] {.async.} proc listPeers*(
api: DaemonAPI
): Future[seq[PeerInfo]] {.
async: (
raises: [
ValueError, DaemonLocalError, OSError, MaInvalidAddress, TransportError,
CancelledError, LPError,
]
)
.}
template exceptionToAssert(body: untyped): untyped =
block:
var res: type(body)
when defined(nimHasWarnBareExcept):
{.push warning[BareExcept]: off.}
try:
res = body
except OSError as exc:
raise exc
except IOError as exc:
raise exc
except Defect as exc:
raise exc
except Exception as exc:
raiseAssert exc.msg
when defined(nimHasWarnBareExcept):
{.pop.}
res
proc copyEnv(): StringTableRef = proc copyEnv(): StringTableRef =
## This procedure copy all environment variables into StringTable. ## This procedure copy all environment variables into StringTable.
@ -586,7 +626,14 @@ proc newDaemonApi*(
peersRequired = 2, peersRequired = 2,
logFile = "", logFile = "",
logLevel = IpfsLogLevel.Debug, logLevel = IpfsLogLevel.Debug,
): Future[DaemonAPI] {.async.} = ): Future[DaemonAPI] {.
async: (
raises: [
ValueError, DaemonLocalError, CancelledError, LPError, OSError, IOError,
AsyncError,
]
)
.} =
## Initialize connection to `go-libp2p-daemon` control socket. ## Initialize connection to `go-libp2p-daemon` control socket.
## ##
## ``flags`` - set of P2PDaemonFlags. ## ``flags`` - set of P2PDaemonFlags.
@ -780,7 +827,7 @@ proc newDaemonApi*(
result = api result = api
proc close*(stream: P2PStream) {.async.} = proc close*(stream: P2PStream) {.async: (raises: [DaemonLocalError]).} =
## Close ``stream``. ## Close ``stream``.
if P2PStreamFlags.Closed notin stream.flags: if P2PStreamFlags.Closed notin stream.flags:
await stream.transp.closeWait() await stream.transp.closeWait()
@ -789,7 +836,9 @@ proc close*(stream: P2PStream) {.async.} =
else: else:
raise newException(DaemonLocalError, "Stream is already closed!") raise newException(DaemonLocalError, "Stream is already closed!")
proc close*(api: DaemonAPI) {.async.} = proc close*(
api: DaemonAPI
) {.async: (raises: [TransportOsError, LPError, ValueError, OSError, CancelledError]).} =
## Shutdown connections to `go-libp2p-daemon` control socket. ## Shutdown connections to `go-libp2p-daemon` control socket.
# await api.pool.close() # await api.pool.close()
# Closing all pending servers. # Closing all pending servers.
@ -827,7 +876,9 @@ template withMessage(m, body: untyped): untyped =
proc transactMessage( proc transactMessage(
transp: StreamTransport, pb: ProtoBuffer transp: StreamTransport, pb: ProtoBuffer
): Future[ProtoBuffer] {.async.} = ): Future[ProtoBuffer] {.
async: (raises: [DaemonLocalError, TransportError, CancelledError])
.} =
let length = pb.getLen() let length = pb.getLen()
let res = await transp.write(pb.getPtr(), length) let res = await transp.write(pb.getPtr(), length)
if res != length: if res != length:
@ -845,7 +896,11 @@ proc getPeerInfo(pb: ProtoBuffer): PeerInfo {.raises: [DaemonLocalError].} =
discard pb.getRepeatedField(2, result.addresses) discard pb.getRepeatedField(2, result.addresses)
proc identity*(api: DaemonAPI): Future[PeerInfo] {.async.} = proc identity*(
api: DaemonAPI
): Future[PeerInfo] {.
async: (raises: [MaInvalidAddress, TransportError, CancelledError, LPError])
.} =
## Get Node identity information ## Get Node identity information
var transp = await api.newConnection() var transp = await api.newConnection()
try: try:
@ -860,7 +915,7 @@ proc identity*(api: DaemonAPI): Future[PeerInfo] {.async.} =
proc connect*( proc connect*(
api: DaemonAPI, peer: PeerId, addresses: seq[MultiAddress], timeout = 0 api: DaemonAPI, peer: PeerId, addresses: seq[MultiAddress], timeout = 0
) {.async.} = ) {.async: (raises: [MaInvalidAddress, TransportError, CancelledError, LPError]).} =
## Connect to remote peer with id ``peer`` and addresses ``addresses``. ## Connect to remote peer with id ``peer`` and addresses ``addresses``.
var transp = await api.newConnection() var transp = await api.newConnection()
try: try:
@ -870,7 +925,9 @@ proc connect*(
except CatchableError: except CatchableError:
await api.closeConnection(transp) await api.closeConnection(transp)
proc disconnect*(api: DaemonAPI, peer: PeerId) {.async.} = proc disconnect*(
api: DaemonAPI, peer: PeerId
) {.async: (raises: [MaInvalidAddress, TransportError, CancelledError, LPError]).} =
## Disconnect from remote peer with id ``peer``. ## Disconnect from remote peer with id ``peer``.
var transp = await api.newConnection() var transp = await api.newConnection()
try: try:
@ -882,7 +939,12 @@ proc disconnect*(api: DaemonAPI, peer: PeerId) {.async.} =
proc openStream*( proc openStream*(
api: DaemonAPI, peer: PeerId, protocols: seq[string], timeout = 0 api: DaemonAPI, peer: PeerId, protocols: seq[string], timeout = 0
): Future[P2PStream] {.async.} = ): Future[P2PStream] {.
async: (
raises:
[MaInvalidAddress, TransportError, CancelledError, LPError, DaemonLocalError]
)
.} =
## Open new stream to peer ``peer`` using one of the protocols in ## Open new stream to peer ``peer`` using one of the protocols in
## ``protocols``. Returns ``StreamTransport`` for the stream. ## ``protocols``. Returns ``StreamTransport`` for the stream.
var transp = await api.newConnection() var transp = await api.newConnection()
@ -903,9 +965,9 @@ proc openStream*(
stream.flags.incl(Outbound) stream.flags.incl(Outbound)
stream.transp = transp stream.transp = transp
result = stream result = stream
except CatchableError as exc: except ResultError[ProtoError]:
await api.closeConnection(transp) await api.closeConnection(transp)
raise exc raise newException(DaemonLocalError, "Wrong message type!")
proc streamHandler(server: StreamServer, transp: StreamTransport) {.async.} = proc streamHandler(server: StreamServer, transp: StreamTransport) {.async.} =
var api = getUserData[DaemonAPI](server) var api = getUserData[DaemonAPI](server)
@ -927,11 +989,28 @@ proc streamHandler(server: StreamServer, transp: StreamTransport) {.async.} =
proc addHandler*( proc addHandler*(
api: DaemonAPI, protocols: seq[string], handler: P2PStreamCallback api: DaemonAPI, protocols: seq[string], handler: P2PStreamCallback
) {.async, raises: [LPError].} = ) {.
async: (
raises: [
MaInvalidAddress, DaemonLocalError, TransportError, CancelledError, LPError,
ValueError,
]
)
.} =
## Add stream handler ``handler`` for set of protocols ``protocols``. ## Add stream handler ``handler`` for set of protocols ``protocols``.
var transp = await api.newConnection() var transp = await api.newConnection()
let maddress = await getSocket(api.pattern, addr api.ucounter) let maddress = await getSocket(api.pattern, addr api.ucounter)
var server = createStreamServer(maddress, streamHandler, udata = api) var server = createStreamServer(maddress, streamHandler, udata = api)
var removeHandler = proc(): Future[void] {.
async: (raises: [CancelledError, TransportError])
.} =
for item in protocols:
api.handlers.del(item)
server.stop()
server.close()
await server.join()
try: try:
for item in protocols: for item in protocols:
api.handlers[item] = handler api.handlers[item] = handler
@ -939,17 +1018,28 @@ proc addHandler*(
var pb = await transp.transactMessage(requestStreamHandler(maddress, protocols)) var pb = await transp.transactMessage(requestStreamHandler(maddress, protocols))
pb.withMessage: pb.withMessage:
api.servers.add(P2PServer(server: server, address: maddress)) api.servers.add(P2PServer(server: server, address: maddress))
except CatchableError as exc: except DaemonLocalError as e:
for item in protocols: await removeHandler()
api.handlers.del(item) raise e
server.stop() except TransportError as e:
server.close() await removeHandler()
await server.join() raise e
raise exc except CancelledError as e:
await removeHandler()
raise e
finally: finally:
await api.closeConnection(transp) await api.closeConnection(transp)
proc listPeers*(api: DaemonAPI): Future[seq[PeerInfo]] {.async.} = proc listPeers*(
api: DaemonAPI
): Future[seq[PeerInfo]] {.
async: (
raises: [
ValueError, DaemonLocalError, OSError, MaInvalidAddress, TransportError,
CancelledError, LPError,
]
)
.} =
## Get list of remote peers to which we are currently connected. ## Get list of remote peers to which we are currently connected.
var transp = await api.newConnection() var transp = await api.newConnection()
try: try:
@ -964,7 +1054,14 @@ proc listPeers*(api: DaemonAPI): Future[seq[PeerInfo]] {.async.} =
finally: finally:
await api.closeConnection(transp) await api.closeConnection(transp)
proc cmTagPeer*(api: DaemonAPI, peer: PeerId, tag: string, weight: int) {.async.} = proc cmTagPeer*(
api: DaemonAPI, peer: PeerId, tag: string, weight: int
) {.
async: (
raises:
[DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError]
)
.} =
## Tag peer with id ``peer`` using ``tag`` and ``weight``. ## Tag peer with id ``peer`` using ``tag`` and ``weight``.
var transp = await api.newConnection() var transp = await api.newConnection()
try: try:
@ -974,7 +1071,14 @@ proc cmTagPeer*(api: DaemonAPI, peer: PeerId, tag: string, weight: int) {.async.
finally: finally:
await api.closeConnection(transp) await api.closeConnection(transp)
proc cmUntagPeer*(api: DaemonAPI, peer: PeerId, tag: string) {.async.} = proc cmUntagPeer*(
api: DaemonAPI, peer: PeerId, tag: string
) {.
async: (
raises:
[DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError]
)
.} =
## Remove tag ``tag`` from peer with id ``peer``. ## Remove tag ``tag`` from peer with id ``peer``.
var transp = await api.newConnection() var transp = await api.newConnection()
try: try:
@ -984,7 +1088,14 @@ proc cmUntagPeer*(api: DaemonAPI, peer: PeerId, tag: string) {.async.} =
finally: finally:
await api.closeConnection(transp) await api.closeConnection(transp)
proc cmTrimPeers*(api: DaemonAPI) {.async.} = proc cmTrimPeers*(
api: DaemonAPI
) {.
async: (
raises:
[DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError]
)
.} =
## Trim all connections. ## Trim all connections.
var transp = await api.newConnection() var transp = await api.newConnection()
try: try:
@ -1058,7 +1169,12 @@ proc getDhtMessageType(
proc dhtFindPeer*( proc dhtFindPeer*(
api: DaemonAPI, peer: PeerId, timeout = 0 api: DaemonAPI, peer: PeerId, timeout = 0
): Future[PeerInfo] {.async.} = ): Future[PeerInfo] {.
async: (
raises:
[DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError]
)
.} =
## Find peer with id ``peer`` and return peer information ``PeerInfo``. ## Find peer with id ``peer`` and return peer information ``PeerInfo``.
## ##
## You can specify timeout for DHT request with ``timeout`` value. ``0`` value ## You can specify timeout for DHT request with ``timeout`` value. ``0`` value
@ -1073,7 +1189,12 @@ proc dhtFindPeer*(
proc dhtGetPublicKey*( proc dhtGetPublicKey*(
api: DaemonAPI, peer: PeerId, timeout = 0 api: DaemonAPI, peer: PeerId, timeout = 0
): Future[PublicKey] {.async.} = ): Future[PublicKey] {.
async: (
raises:
[DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError]
)
.} =
## Get peer's public key from peer with id ``peer``. ## Get peer's public key from peer with id ``peer``.
## ##
## You can specify timeout for DHT request with ``timeout`` value. ``0`` value ## You can specify timeout for DHT request with ``timeout`` value. ``0`` value
@ -1088,7 +1209,12 @@ proc dhtGetPublicKey*(
proc dhtGetValue*( proc dhtGetValue*(
api: DaemonAPI, key: string, timeout = 0 api: DaemonAPI, key: string, timeout = 0
): Future[seq[byte]] {.async.} = ): Future[seq[byte]] {.
async: (
raises:
[DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError]
)
.} =
## Get value associated with ``key``. ## Get value associated with ``key``.
## ##
## You can specify timeout for DHT request with ``timeout`` value. ``0`` value ## You can specify timeout for DHT request with ``timeout`` value. ``0`` value
@ -1103,7 +1229,12 @@ proc dhtGetValue*(
proc dhtPutValue*( proc dhtPutValue*(
api: DaemonAPI, key: string, value: seq[byte], timeout = 0 api: DaemonAPI, key: string, value: seq[byte], timeout = 0
) {.async.} = ) {.
async: (
raises:
[DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError]
)
.} =
## Associate ``value`` with ``key``. ## Associate ``value`` with ``key``.
## ##
## You can specify timeout for DHT request with ``timeout`` value. ``0`` value ## You can specify timeout for DHT request with ``timeout`` value. ``0`` value
@ -1116,7 +1247,14 @@ proc dhtPutValue*(
finally: finally:
await api.closeConnection(transp) await api.closeConnection(transp)
proc dhtProvide*(api: DaemonAPI, cid: Cid, timeout = 0) {.async.} = proc dhtProvide*(
api: DaemonAPI, cid: Cid, timeout = 0
) {.
async: (
raises:
[DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError]
)
.} =
## Provide content with id ``cid``. ## Provide content with id ``cid``.
## ##
## You can specify timeout for DHT request with ``timeout`` value. ``0`` value ## You can specify timeout for DHT request with ``timeout`` value. ``0`` value
@ -1131,7 +1269,12 @@ proc dhtProvide*(api: DaemonAPI, cid: Cid, timeout = 0) {.async.} =
proc dhtFindPeersConnectedToPeer*( proc dhtFindPeersConnectedToPeer*(
api: DaemonAPI, peer: PeerId, timeout = 0 api: DaemonAPI, peer: PeerId, timeout = 0
): Future[seq[PeerInfo]] {.async.} = ): Future[seq[PeerInfo]] {.
async: (
raises:
[DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError]
)
.} =
## Find peers which are connected to peer with id ``peer``. ## Find peers which are connected to peer with id ``peer``.
## ##
## You can specify timeout for DHT request with ``timeout`` value. ``0`` value ## You can specify timeout for DHT request with ``timeout`` value. ``0`` value
@ -1157,7 +1300,12 @@ proc dhtFindPeersConnectedToPeer*(
proc dhtGetClosestPeers*( proc dhtGetClosestPeers*(
api: DaemonAPI, key: string, timeout = 0 api: DaemonAPI, key: string, timeout = 0
): Future[seq[PeerId]] {.async.} = ): Future[seq[PeerId]] {.
async: (
raises:
[DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError]
)
.} =
## Get closest peers for ``key``. ## Get closest peers for ``key``.
## ##
## You can specify timeout for DHT request with ``timeout`` value. ``0`` value ## You can specify timeout for DHT request with ``timeout`` value. ``0`` value
@ -1183,7 +1331,12 @@ proc dhtGetClosestPeers*(
proc dhtFindProviders*( proc dhtFindProviders*(
api: DaemonAPI, cid: Cid, count: uint32, timeout = 0 api: DaemonAPI, cid: Cid, count: uint32, timeout = 0
): Future[seq[PeerInfo]] {.async.} = ): Future[seq[PeerInfo]] {.
async: (
raises:
[DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError]
)
.} =
## Get ``count`` providers for content with id ``cid``. ## Get ``count`` providers for content with id ``cid``.
## ##
## You can specify timeout for DHT request with ``timeout`` value. ``0`` value ## You can specify timeout for DHT request with ``timeout`` value. ``0`` value
@ -1209,7 +1362,12 @@ proc dhtFindProviders*(
proc dhtSearchValue*( proc dhtSearchValue*(
api: DaemonAPI, key: string, timeout = 0 api: DaemonAPI, key: string, timeout = 0
): Future[seq[seq[byte]]] {.async.} = ): Future[seq[seq[byte]]] {.
async: (
raises:
[DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError]
)
.} =
## Search for value with ``key``, return list of values found. ## Search for value with ``key``, return list of values found.
## ##
## You can specify timeout for DHT request with ``timeout`` value. ``0`` value ## You can specify timeout for DHT request with ``timeout`` value. ``0`` value
@ -1232,7 +1390,14 @@ proc dhtSearchValue*(
finally: finally:
await api.closeConnection(transp) await api.closeConnection(transp)
proc pubsubGetTopics*(api: DaemonAPI): Future[seq[string]] {.async.} = proc pubsubGetTopics*(
api: DaemonAPI
): Future[seq[string]] {.
async: (
raises:
[DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError]
)
.} =
## Get list of topics this node is subscribed to. ## Get list of topics this node is subscribed to.
var transp = await api.newConnection() var transp = await api.newConnection()
try: try:
@ -1245,7 +1410,14 @@ proc pubsubGetTopics*(api: DaemonAPI): Future[seq[string]] {.async.} =
finally: finally:
await api.closeConnection(transp) await api.closeConnection(transp)
proc pubsubListPeers*(api: DaemonAPI, topic: string): Future[seq[PeerId]] {.async.} = proc pubsubListPeers*(
api: DaemonAPI, topic: string
): Future[seq[PeerId]] {.
async: (
raises:
[DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError]
)
.} =
## Get list of peers we are connected to and which also subscribed to topic ## Get list of peers we are connected to and which also subscribed to topic
## ``topic``. ## ``topic``.
var transp = await api.newConnection() var transp = await api.newConnection()
@ -1260,7 +1432,14 @@ proc pubsubListPeers*(api: DaemonAPI, topic: string): Future[seq[PeerId]] {.asyn
finally: finally:
await api.closeConnection(transp) await api.closeConnection(transp)
proc pubsubPublish*(api: DaemonAPI, topic: string, value: seq[byte]) {.async.} = proc pubsubPublish*(
api: DaemonAPI, topic: string, value: seq[byte]
) {.
async: (
raises:
[DaemonLocalError, MaInvalidAddress, TransportError, CancelledError, LPError]
)
.} =
## Get list of peer identifiers which are subscribed to topic ``topic``. ## Get list of peer identifiers which are subscribed to topic ``topic``.
var transp = await api.newConnection() var transp = await api.newConnection()
try: try:
@ -1280,7 +1459,13 @@ proc getPubsubMessage*(pb: ProtoBuffer): PubSubMessage =
discard pb.getField(5, result.signature) discard pb.getField(5, result.signature)
discard pb.getField(6, result.key) discard pb.getField(6, result.key)
proc pubsubLoop(api: DaemonAPI, ticket: PubsubTicket) {.async.} = proc pubsubLoop(
api: DaemonAPI, ticket: PubsubTicket
) {.
async: (
raises: [TransportIncompleteError, TransportError, CancelledError, CatchableError]
)
.} =
while true: while true:
var pbmessage = await ticket.transp.recvMessage() var pbmessage = await ticket.transp.recvMessage()
if len(pbmessage) == 0: if len(pbmessage) == 0:
@ -1296,7 +1481,12 @@ proc pubsubLoop(api: DaemonAPI, ticket: PubsubTicket) {.async.} =
proc pubsubSubscribe*( proc pubsubSubscribe*(
api: DaemonAPI, topic: string, handler: P2PPubSubCallback api: DaemonAPI, topic: string, handler: P2PPubSubCallback
): Future[PubsubTicket] {.async: (raises: [CatchableError]).} = ): Future[PubsubTicket] {.
async: (
raises:
[MaInvalidAddress, TransportError, LPError, CancelledError, DaemonLocalError]
)
.} =
## Subscribe to topic ``topic``. ## Subscribe to topic ``topic``.
var transp = await api.newConnection() var transp = await api.newConnection()
try: try:
@ -1308,7 +1498,13 @@ proc pubsubSubscribe*(
ticket.transp = transp ticket.transp = transp
asyncSpawn pubsubLoop(api, ticket) asyncSpawn pubsubLoop(api, ticket)
result = ticket result = ticket
except CatchableError as exc: except DaemonLocalError as exc:
await api.closeConnection(transp)
raise exc
except TransportError as exc:
await api.closeConnection(transp)
raise exc
except CancelledError as exc:
await api.closeConnection(transp) await api.closeConnection(transp)
raise exc raise exc

View File

@ -55,7 +55,7 @@ proc newPool*(
address: TransportAddress, address: TransportAddress,
poolsize: int = DefaultPoolSize, poolsize: int = DefaultPoolSize,
bufferSize = DefaultStreamBufferSize, bufferSize = DefaultStreamBufferSize,
): Future[TransportPool] {.async.} = ): Future[TransportPool] {.async: (raises: [CancelledError]).} =
## Establish pool of connections to address ``address`` with size ## Establish pool of connections to address ``address`` with size
## ``poolsize``. ## ``poolsize``.
var pool = new TransportPool var pool = new TransportPool
@ -80,7 +80,9 @@ proc newPool*(
pool.state = Connected pool.state = Connected
result = pool result = pool
proc acquire*(pool: TransportPool): Future[StreamTransport] {.async.} = proc acquire*(
pool: TransportPool
): Future[StreamTransport] {.async: (raises: [CancelledError, TransportPoolError]).} =
## Acquire non-busy connection from pool ``pool``. ## Acquire non-busy connection from pool ``pool``.
var transp: StreamTransport var transp: StreamTransport
if pool.state in {Connected}: if pool.state in {Connected}:
@ -102,7 +104,9 @@ proc acquire*(pool: TransportPool): Future[StreamTransport] {.async.} =
raise newException(TransportPoolError, "Pool is not ready!") raise newException(TransportPoolError, "Pool is not ready!")
result = transp result = transp
proc release*(pool: TransportPool, transp: StreamTransport) = proc release*(
pool: TransportPool, transp: StreamTransport
) {.async: (raises: [TransportPoolError]).} =
## Release connection ``transp`` back to pool ``pool``. ## Release connection ``transp`` back to pool ``pool``.
if pool.state in {Connected, Closing}: if pool.state in {Connected, Closing}:
var found = false var found = false
@ -118,7 +122,9 @@ proc release*(pool: TransportPool, transp: StreamTransport) =
else: else:
raise newException(TransportPoolError, "Pool is not ready!") raise newException(TransportPoolError, "Pool is not ready!")
proc join*(pool: TransportPool) {.async.} = proc join*(
pool: TransportPool
) {.async: (raises: [TransportPoolError, CancelledError]).} =
## Waiting for all connection to become available. ## Waiting for all connection to become available.
if pool.state in {Connected, Closing}: if pool.state in {Connected, Closing}:
while true: while true:
@ -130,7 +136,9 @@ proc join*(pool: TransportPool) {.async.} =
elif pool.state == Connecting: elif pool.state == Connecting:
raise newException(TransportPoolError, "Pool is not ready!") raise newException(TransportPoolError, "Pool is not ready!")
proc close*(pool: TransportPool) {.async.} = proc close*(
pool: TransportPool
) {.async: (raises: [TransportPoolError, CancelledError]).} =
## Closes transports pool ``pool`` and release all resources. ## Closes transports pool ``pool`` and release all resources.
if pool.state == Connected: if pool.state == Connected:
pool.state = Closing pool.state = Closing

View File

@ -71,23 +71,6 @@ proc capLen*[T](s: var seq[T], length: Natural) =
if s.len > length: if s.len > length:
s.setLen(length) s.setLen(length)
template exceptionToAssert*(body: untyped): untyped =
block:
var res: type(body)
when defined(nimHasWarnBareExcept):
{.push warning[BareExcept]: off.}
try:
res = body
except CatchableError as exc:
raise exc
except Defect as exc:
raise exc
except Exception as exc:
raiseAssert exc.msg
when defined(nimHasWarnBareExcept):
{.pop.}
res
template withValue*[T](self: Opt[T] | Option[T], value, body: untyped): untyped = template withValue*[T](self: Opt[T] | Option[T], value, body: untyped): untyped =
## This template provides a convenient way to work with `Option` types in Nim. ## This template provides a convenient way to work with `Option` types in Nim.
## It allows you to execute a block of code (`body`) only when the `Option` is not empty. ## It allows you to execute a block of code (`body`) only when the `Option` is not empty.

View File

@ -67,7 +67,9 @@ proc connect*(
child: StreamTransport = nil, child: StreamTransport = nil,
flags = default(set[SocketFlags]), flags = default(set[SocketFlags]),
localAddress: Opt[MultiAddress] = Opt.none(MultiAddress), localAddress: Opt[MultiAddress] = Opt.none(MultiAddress),
): Future[StreamTransport] {.async.} = ): Future[StreamTransport] {.
async: (raises: [MaInvalidAddress, TransportError, CancelledError, LPError])
.} =
## Open new connection to remote peer with address ``ma`` and create ## Open new connection to remote peer with address ``ma`` and create
## new transport object ``StreamTransport`` for established connection. ## new transport object ``StreamTransport`` for established connection.
## ``bufferSize`` is size of internal buffer for transport. ## ``bufferSize`` is size of internal buffer for transport.

View File

@ -185,7 +185,9 @@ proc commonInteropTests*(name: string, swCreator: SwitchCreator) =
let daemonPeer = await daemonNode.identity() let daemonPeer = await daemonNode.identity()
var testFuture = newFuture[void]("test.future") var testFuture = newFuture[void]("test.future")
proc daemonHandler(api: DaemonAPI, stream: P2PStream) {.async.} = proc daemonHandler(
api: DaemonAPI, stream: P2PStream
) {.async: (raises: [CatchableError]).} =
check string.fromBytes(await stream.transp.readLp()) == "test 1" check string.fromBytes(await stream.transp.readLp()) == "test 1"
discard await stream.transp.writeLp("test 2") discard await stream.transp.writeLp("test 2")
check string.fromBytes(await stream.transp.readLp()) == "test 3" check string.fromBytes(await stream.transp.readLp()) == "test 3"
@ -227,7 +229,9 @@ proc commonInteropTests*(name: string, swCreator: SwitchCreator) =
let daemonPeer = await daemonNode.identity() let daemonPeer = await daemonNode.identity()
var testFuture = newFuture[string]("test.future") var testFuture = newFuture[string]("test.future")
proc daemonHandler(api: DaemonAPI, stream: P2PStream) {.async.} = proc daemonHandler(
api: DaemonAPI, stream: P2PStream
) {.async: (raises: [CatchableError]).} =
# We should perform `readLp()` instead of `readLine()`. `readLine()` # We should perform `readLp()` instead of `readLine()`. `readLine()`
# here reads actually length prefixed string. # here reads actually length prefixed string.
var line = await stream.transp.readLine() var line = await stream.transp.readLine()
@ -351,7 +355,9 @@ proc commonInteropTests*(name: string, swCreator: SwitchCreator) =
let daemonPeer = await daemonNode.identity() let daemonPeer = await daemonNode.identity()
var testFuture = newFuture[string]("test.future") var testFuture = newFuture[string]("test.future")
proc daemonHandler(api: DaemonAPI, stream: P2PStream) {.async.} = proc daemonHandler(
api: DaemonAPI, stream: P2PStream
) {.async: (raises: [CatchableError]).} =
# We should perform `readLp()` instead of `readLine()`. `readLine()` # We should perform `readLp()` instead of `readLine()`. `readLine()`
# here reads actually length prefixed string. # here reads actually length prefixed string.
var line = await stream.transp.readLine() var line = await stream.transp.readLine()
@ -485,7 +491,9 @@ proc relayInteropTests*(name: string, relayCreator: SwitchCreator) =
# TODO: This Future blocks the daemonHandler after sending the last message. # TODO: This Future blocks the daemonHandler after sending the last message.
# It exists because there's a strange behavior where stream.close sends # It exists because there's a strange behavior where stream.close sends
# a Rst instead of Fin. We should investigate this at some point. # a Rst instead of Fin. We should investigate this at some point.
proc daemonHandler(api: DaemonAPI, stream: P2PStream) {.async.} = proc daemonHandler(
api: DaemonAPI, stream: P2PStream
) {.async: (raises: [CatchableError]).} =
check "line1" == string.fromBytes(await stream.transp.readLp()) check "line1" == string.fromBytes(await stream.transp.readLp())
discard await stream.transp.writeLp("line2") discard await stream.transp.writeLp("line2")
check "line3" == string.fromBytes(await stream.transp.readLp()) check "line3" == string.fromBytes(await stream.transp.readLp())

View File

@ -28,7 +28,9 @@ proc connectStreamTest(): Future[bool] {.async.} =
var testFuture = newFuture[string]("test.future") var testFuture = newFuture[string]("test.future")
proc streamHandler(api: DaemonAPI, stream: P2PStream) {.async.} = proc streamHandler(
api: DaemonAPI, stream: P2PStream
) {.async: (raises: [CatchableError]).} =
var line = await stream.transp.readLine() var line = await stream.transp.readLine()
testFuture.complete(line) testFuture.complete(line)