Add async raises annotations for uTP code (#692)

* Add async raises annotations for uTP code

* Avoid compiler error + further clean-up
This commit is contained in:
Kim De Mey 2024-06-11 13:56:37 +02:00 committed by GitHub
parent 3d66c5b899
commit c3f9160fd2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 82 additions and 93 deletions

View File

@ -1,4 +1,4 @@
# Copyright (c) 2021-2023 Status Research & Development GmbH
# Copyright (c) 2021-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
@ -137,12 +137,14 @@ proc new*(
socketConfig
)
proc connectTo*(r: UtpDiscv5Protocol, address: NodeAddress):
Future[ConnectionResult[NodeAddress]] =
proc connectTo*(
r: UtpDiscv5Protocol, address: NodeAddress
): Future[ConnectionResult[NodeAddress]] {.async: (raw: true, raises: [CancelledError]).} =
return r.router.connectTo(address)
proc connectTo*(r: UtpDiscv5Protocol, address: NodeAddress, connectionId: uint16):
Future[ConnectionResult[NodeAddress]] =
proc connectTo*(
r: UtpDiscv5Protocol, address: NodeAddress, connectionId: uint16
): Future[ConnectionResult[NodeAddress]] {.async: (raw: true, raises: [CancelledError]).} =
return r.router.connectTo(address, connectionId)
proc shutdown*(r: UtpDiscv5Protocol) =
@ -150,7 +152,7 @@ proc shutdown*(r: UtpDiscv5Protocol) =
## this is up to user)
r.router.shutdown()
proc shutdownWait*(r: UtpDiscv5Protocol) {.async.} =
proc shutdownWait*(r: UtpDiscv5Protocol) {.async: (raises: []).} =
## Closes all managed utp connections in background (does not close discovery,
## this is up to user)
await r.router.shutdownWait()

View File

@ -1,4 +1,4 @@
# Copyright (c) 2021-2023 Status Research & Development GmbH
# Copyright (c) 2021-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
@ -137,16 +137,20 @@ proc new*(
rng
)
proc shutdownWait*(p: UtpProtocol): Future[void] {.async.} =
## closes all managed utp sockets and then underlying transport
proc shutdownWait*(p: UtpProtocol): Future[void] {.async: (raises: []).} =
## Closes all managed utp sockets and then underlying transport
await p.utpRouter.shutdownWait()
await p.transport.closeWait()
proc connectTo*(r: UtpProtocol, address: TransportAddress): Future[ConnectionResult[TransportAddress]] =
return r.utpRouter.connectTo(address)
proc connectTo*(
r: UtpProtocol, address: TransportAddress
): Future[ConnectionResult[TransportAddress]] {.async: (raw: true, raises: [CancelledError]).} =
r.utpRouter.connectTo(address)
proc connectTo*(r: UtpProtocol, address: TransportAddress, connectionId: uint16): Future[ConnectionResult[TransportAddress]] =
return r.utpRouter.connectTo(address, connectionId)
proc connectTo*(
r: UtpProtocol, address: TransportAddress, connectionId: uint16
): Future[ConnectionResult[TransportAddress]] {.async: (raw: true, raises: [CancelledError]).} =
r.utpRouter.connectTo(address, connectionId)
proc openSockets*(r: UtpProtocol): int =
len(r.utpRouter)

View File

@ -1,4 +1,4 @@
# Copyright (c) 2021-2023 Status Research & Development GmbH
# Copyright (c) 2021-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
@ -270,7 +270,7 @@ proc processIncomingBytes*[A](
warn "Failed to decode packet from address", address = sender, msg = err
proc generateNewUniqueSocket[A](
r: UtpRouter[A], address: A):Option[UtpSocket[A]] =
r: UtpRouter[A], address: A): Opt[UtpSocket[A]] =
## Try to generate unique socket, give up after maxSocketGenerationTries tries
var tryCount = 0
@ -280,80 +280,64 @@ proc generateNewUniqueSocket[A](
address, r.sendCb, r.socketConfig, rcvId, r.rng[])
if r.registerIfAbsent(socket):
return some(socket)
return Opt.some(socket)
inc tryCount
return none[UtpSocket[A]]()
return Opt.none(UtpSocket[A])
proc innerConnect[A](s: UtpSocket[A]): Future[ConnectionResult[A]] {.async.} =
try:
await s.startOutgoingSocket()
utp_success_outgoing.inc()
debug "Outgoing connection successful", dst = s.socketKey
return ok(s)
except ConnectionError:
utp_failed_outgoing.inc()
debug "Outgoing connection timed-out", dst = s.socketKey
s.destroy()
return err(OutgoingConnectionError(kind: ConnectionTimedOut))
except CancelledError as exc:
s.destroy()
debug "Connection cancelled", dst = s.socketKey
raise exc
proc connect[A](s: UtpSocket[A]): Future[ConnectionResult[A]] =
proc connect[A](s: UtpSocket[A]): Future[ConnectionResult[A]] {.async: (raises: [CancelledError]).} =
debug "Initiating connection", dst = s.socketKey
try:
await s.startOutgoingSocket()
utp_success_outgoing.inc()
debug "Outgoing connection successful", dst = s.socketKey
return ok(s)
except ConnectionError:
utp_failed_outgoing.inc()
debug "Outgoing connection timed-out", dst = s.socketKey
s.destroy()
return err(ConnectionTimedOut)
except CancelledError as exc:
s.destroy()
debug "Connection cancelled", dst = s.socketKey
raise exc
s.innerConnect()
proc socketAlreadyExists[A](): ConnectionResult[A] =
return err(OutgoingConnectionError(kind: SocketAlreadyExists))
proc socketAlreadyExistsFut[A](): Future[ConnectionResult[A]] =
let fut = newFuture[ConnectionResult[A]]()
fut.complete(socketAlreadyExists[A]())
return fut
# Connect to provided address
# Reference implementation:
# https://github.com/bittorrent/libutp/blob/master/utp_internal.cpp#L2732
proc connectTo*[A](
r: UtpRouter[A], address: A): Future[ConnectionResult[A]] =
let maybeSocket = r.generateNewUniqueSocket(address)
r: UtpRouter[A], address: A
): Future[ConnectionResult[A]] {.async: (raises: [CancelledError]).} =
## Connect to the provided address
## Reference implementation:
## https://github.com/bittorrent/libutp/blob/master/utp_internal.cpp#L2732
let socket = (r.generateNewUniqueSocket(address)).valueOr:
return err(SocketAlreadyExists)
if (maybeSocket.isNone()):
return socketAlreadyExistsFut[A]()
else:
let socket = maybeSocket.unsafeGet()
let connFut = socket.connect()
return connFut
await socket.connect()
# Connect to provided address with provided connection id. If the socket with
# this id and address already exists, return error
proc connectTo*[A](
r: UtpRouter[A], address: A, connectionId: uint16):
Future[ConnectionResult[A]] =
r: UtpRouter[A], address: A, connectionId: uint16
): Future[ConnectionResult[A]] {.async: (raises: [CancelledError]).} =
## Connect to address with provided connection id. If a socket with this id
## id and address already exists, return SocketAlreadyExists error.
let socket = newOutgoingSocket[A](
address, r.sendCb, r.socketConfig, connectionId, r.rng[])
if (r.registerIfAbsent(socket)):
let connFut = socket.connect()
return connFut
await socket.connect()
else:
return socketAlreadyExistsFut[A]()
err(SocketAlreadyExists)
proc shutdown*[A](r: UtpRouter[A]) =
# stop processing any new packets and close all sockets in background without
# notifying remote peers
## Stop processing any new packets and close all sockets in background without
## notifying remote peers.
r.closed = true
for s in r.allSockets():
s.destroy()
proc shutdownWait*[A](r: UtpRouter[A]) {.async.} =
proc shutdownWait*[A](r: UtpRouter[A]) {.async: (raises: []).} =
var activeSockets: seq[UtpSocket[A]] = @[]
# stop processing any new packets and close all sockets without
# notifying remote peers
## Stop processing any new packets and close all sockets without notifying
## remote peers.
r.closed = true
# Need to make a copy as calling socket.destroyWait() removes the socket from
@ -363,4 +347,4 @@ proc shutdownWait*[A](r: UtpRouter[A]) {.async.} =
activeSockets.add(s)
for s in activeSockets:
await s.destroyWait()
await noCancel(s.destroyWait())

View File

@ -33,6 +33,8 @@ type
ConnectionDirection = enum
Outgoing, Incoming
ConnectionError* = object of CatchableError
UtpSocketKey*[A] = object
remoteAddress*: A
rcvId*: uint16
@ -163,7 +165,7 @@ type
# Should be completed after successful connection to remote host or after
# timeout for the first SYN packet.
connectionFuture: Future[void]
connectionFuture: Future[void].Raising([ConnectionError, CancelledError])
# The number of packets in the send queue. Packets that haven't
# been sent yet and packets marked as needing to be resend count.
@ -301,16 +303,9 @@ type
# i.e reaches the destroy state
SocketCloseCallback* = proc (): void {.gcsafe, raises: [].}
ConnectionError* = object of CatchableError
OutgoingConnectionErrorType* = enum
OutgoingConnectionError* = enum
SocketAlreadyExists, ConnectionTimedOut
OutgoingConnectionError* = object
case kind*: OutgoingConnectionErrorType
of SocketAlreadyExists, ConnectionTimedOut:
discard
ConnectionResult*[A] = Result[UtpSocket[A], OutgoingConnectionError]
chronicles.formatIt(UtpSocketKey): $it
@ -586,8 +581,8 @@ proc checkTimeouts(socket: UtpSocket) =
if socket.state == SynSent and (not socket.connectionFuture.finished()):
# Note: The socket connect code will already call socket.destroy when
# ConnectionError gets raised, no need to do it here.
socket.connectionFuture.fail(newException(
ConnectionError, "Connection to peer timed out"))
socket.connectionFuture.fail(
(ref ConnectionError)(msg: "Connection to peer timed out"))
else:
socket.destroy()
@ -647,12 +642,16 @@ proc checkTimeouts(socket: UtpSocket) =
# TODO: add sending keep alives when necessary
proc checkTimeoutsLoop(s: UtpSocket) {.async.} =
proc checkTimeoutsLoop(s: UtpSocket) {.async: (raises: [CancelledError]).} =
## Loop that check timeouts in the socket.
try:
while true:
await sleepAsync(checkTimeoutsLoopInterval)
s.eventQueue.putNoWait(SocketEvent(kind: CheckTimeouts))
try:
s.eventQueue.putNoWait(SocketEvent(kind: CheckTimeouts))
except AsyncQueueFullError as e:
# this should not happen as the write queue is unbounded
raiseAssert e.msg
except CancelledError as exc:
# checkTimeoutsLoop is the last running future managed by the socket, when
# it's cancelled the closeEvent can be fired.
@ -745,7 +744,7 @@ proc destroy*(s: UtpSocket) =
# someone will try run `eventQueue.put`. Without `eventQueue.put` , eventLoop
# future shows as cancelled, but handler for CancelledError is not run
proc destroyWait*(s: UtpSocket) {.async.} =
proc destroyWait*(s: UtpSocket) {.async: (raises: [CancelledError]).} =
## Moves socket to destroy state and clean all resources and wait for all
## registered callbacks to fire,
## Remote is not notified in any way about socket end of life.
@ -753,7 +752,7 @@ proc destroyWait*(s: UtpSocket) {.async.} =
await s.closeEvent.wait()
await allFutures(s.closeCallbacks)
proc setCloseCallback(s: UtpSocket, cb: SocketCloseCallback) {.async.} =
proc setCloseCallback(s: UtpSocket, cb: SocketCloseCallback) {.async: (raises: []).} =
## Set callback which will be called whenever the socket is permanently closed
try:
await s.closeEvent.wait()
@ -1634,7 +1633,7 @@ proc onRead(socket: UtpSocket, readReq: var ReadReq): ReadResult =
return ReadNotFinished
proc eventLoop(socket: UtpSocket) {.async.} =
proc eventLoop(socket: UtpSocket) {.async: (raises: [CancelledError]).} =
try:
while true:
let socketEvent = await socket.eventQueue.get()
@ -1801,7 +1800,7 @@ proc close*(socket: UtpSocket) =
# destroy the socket.
socket.destroy()
proc closeWait*(socket: UtpSocket) {.async.} =
proc closeWait*(socket: UtpSocket) {.async: (raises: [CancelledError]).} =
## Gracefully close the connection (send FIN) if the socket is in the
## connected state and wait for the socket to be closed.
## Warning: if the FIN packet is lost, then the socket might get closed due to
@ -1968,7 +1967,7 @@ proc new[A](
connectionIdSnd: sndId,
seqNr: initialSeqNr,
ackNr: initialAckNr,
connectionFuture: newFuture[void](),
connectionFuture: Future[void].Raising([ConnectionError, CancelledError]).init(),
outBuffer: GrowableCircularBuffer[OutgoingPacket].init(),
outBufferBytes: 0,
currentWindow: 0,
@ -2069,7 +2068,7 @@ proc startIncomingSocket*(socket: UtpSocket) =
socket.startEventLoop()
socket.startTimeoutLoop()
proc startOutgoingSocket*(socket: UtpSocket): Future[void] =
proc startOutgoingSocket*(socket: UtpSocket): Future[void] {.async: (raw: true, raises: [ConnectionError, CancelledError]).} =
doAssert(socket.state == SynSent)
let packet =
synPacket(socket.seqNr, socket.connectionIdRcv, socket.getRcvWindowSize())

View File

@ -212,7 +212,7 @@ procSuite "uTP over UDP protocol":
check socketResult.isErr()
let connectionError = socketResult.error()
check connectionError.kind == ConnectionTimedOut
check connectionError == ConnectionTimedOut
await waitUntil(proc (): bool = utpProto1.openSockets() == 0)
@ -463,7 +463,7 @@ procSuite "uTP over UDP protocol":
allowedSocketRes.isOk()
notAllowedSocketRes.isErr()
# remote did not allow this connection and it timed out
notAllowedSocketRes.error().kind == ConnectionTimedOut
notAllowedSocketRes.error() == ConnectionTimedOut
let clientSocket = allowedSocketRes.get()
let serverSocket = await server3Sockets.get()

View File

@ -1,4 +1,4 @@
# Copyright (c) 2020-2021 Status Research & Development GmbH
# Copyright (c) 2020-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
@ -315,7 +315,7 @@ procSuite "uTP router unit":
check:
duplicatedConnectionResult.isErr()
duplicatedConnectionResult.error().kind == SocketAlreadyExists
duplicatedConnectionResult.error() == SocketAlreadyExists
asyncTest "Router should fail connect when socket syn will not be acked":
let q = newAsyncQueue[UtpSocket[int]]()
@ -336,7 +336,7 @@ procSuite "uTP router unit":
check:
connectResult.isErr()
connectResult.error().kind == ConnectionTimedOut
connectResult.error() == ConnectionTimedOut
router.len() == 0
asyncTest "Router should clear all resources when connection future is cancelled":
@ -376,7 +376,7 @@ procSuite "uTP router unit":
check:
connectResult.isErr()
# even though send is failing we will just finish with timeout,
connectResult.error().kind == ConnectionTimedOut
connectResult.error() == ConnectionTimedOut
router.len() == 0
asyncTest "Router should clear closed outgoing connections":