From 207244c2db5f1b55b23d82cf777cdf77f183a369 Mon Sep 17 00:00:00 2001 From: Kim De Mey Date: Wed, 12 Jun 2024 18:01:23 +0200 Subject: [PATCH] Add more missing async raises in uTP (#702) --- eth/utp/utp_router.nim | 2 +- eth/utp/utp_socket.nim | 24 ++++++++++++------------ tests/utp/test_discv5_protocol.nim | 20 ++++++++++++-------- tests/utp/test_protocol.nim | 24 ++++++++++++++---------- tests/utp/test_protocol_integration.nim | 10 ++++++---- tests/utp/test_utp_router.nim | 4 ++-- 6 files changed, 47 insertions(+), 37 deletions(-) diff --git a/eth/utp/utp_router.nim b/eth/utp/utp_router.nim index 91f6e17..f0b4940 100644 --- a/eth/utp/utp_router.nim +++ b/eth/utp/utp_router.nim @@ -38,7 +38,7 @@ type # ``server`` - UtpProtocol object. # ``client`` - accepted client utp socket. AcceptConnectionCallback*[A] = proc(server: UtpRouter[A], - client: UtpSocket[A]): Future[void] {.gcsafe, raises: [].} + client: UtpSocket[A]): Future[void] {.gcsafe, async: (raw: true, raises: []).} # Callback to act as firewall for incoming peers. Should return true if peer # is allowed to connect. diff --git a/eth/utp/utp_socket.nim b/eth/utp/utp_socket.nim index 2fe6be4..8223323 100644 --- a/eth/utp/utp_socket.nim +++ b/eth/utp/utp_socket.nim @@ -1810,10 +1810,10 @@ proc closeWait*(socket: UtpSocket) {.async: (raises: [CancelledError]).} = socket.close() await socket.closeEvent.wait() -proc write*(socket: UtpSocket, data: seq[byte]): Future[WriteResult] = +proc write*(socket: UtpSocket, data: seq[byte]): Future[WriteResult] {.async: (raw: true, raises: [CancelledError]).} = debug "Write data", dst = socket.socketKey, length = len(data) - let retFuture = newFuture[WriteResult]("UtpSocket.write") + let retFuture = Future[WriteResult].Raising([CancelledError]).init("utp_socket.write") if (socket.state != Connected): let res = WriteResult.err( @@ -1842,13 +1842,13 @@ proc write*(socket: UtpSocket, data: seq[byte]): Future[WriteResult] = # this should not happen as out write queue is unbounded raiseAssert e.msg - return retFuture + retFuture -proc read*(socket: UtpSocket, n: Natural): Future[seq[byte]] = - ## Read all bytes from socket ``socket``. +proc read*(socket: UtpSocket, n: Natural): Future[seq[byte]] {.async: (raw: true, raises: [CancelledError]).} = + ## Read n bytes from socket ``socket``. ## ## This procedure allocates buffer seq[byte] and return it as result. - let fut = newFuture[seq[uint8]]() + let fut = Future[seq[uint8]].Raising([CancelledError]).init("utp_socket.read") if socket.readingClosed(): fut.complete(newSeq[uint8]()) @@ -1864,16 +1864,16 @@ proc read*(socket: UtpSocket, n: Natural): Future[seq[byte]] = reader: fut)) ) except AsyncQueueFullError as e: - # should not happen as our write queue is unbounded - raiseAssert e.msg + # should not happen as our write queue is unbounded + raiseAssert e.msg - return fut + fut -proc read*(socket: UtpSocket): Future[seq[byte]] = +proc read*(socket: UtpSocket): Future[seq[byte]] {.async: (raw: true, raises: [CancelledError]).} = ## Read all bytes from socket ``socket``. ## ## This procedure allocates buffer seq[byte] and return it as result. - let fut = newFuture[seq[uint8]]() + let fut = Future[seq[uint8]].Raising([CancelledError]).init("utp_socket.read") if socket.readingClosed(): fut.complete(newSeq[uint8]()) @@ -1892,7 +1892,7 @@ proc read*(socket: UtpSocket): Future[seq[byte]] = # should not happen as our write queue is unbounded raiseAssert e.msg - return fut + fut # Check how many packets are still in the out going buffer, usefully for tests # or debugging. diff --git a/tests/utp/test_discv5_protocol.nim b/tests/utp/test_discv5_protocol.nim index a129bc3..3df0342 100644 --- a/tests/utp/test_discv5_protocol.nim +++ b/tests/utp/test_discv5_protocol.nim @@ -25,8 +25,10 @@ procSuite "uTP over discovery v5 protocol": proc registerIncomingSocketCallback(serverSockets: AsyncQueue): AcceptConnectionCallback[NodeAddress] = return ( - proc(server: UtpRouter[NodeAddress], client: UtpSocket[NodeAddress]): Future[void] = - serverSockets.addLast(client) + proc( + server: UtpRouter[NodeAddress], client: UtpSocket[NodeAddress] + ): Future[void] {.async: (raw: true, raises: []).} = + noCancel serverSockets.addLast(client) ) proc allowOneIdCallback(allowedId: uint16): AllowConnectionCallback[NodeAddress] = @@ -67,9 +69,11 @@ procSuite "uTP over discovery v5 protocol": await node1.closeWait() await node2.closeWait() - proc cbUserData(server: UtpRouter[NodeAddress], client: UtpSocket[NodeAddress]): Future[void] = + proc cbUserData( + server: UtpRouter[NodeAddress], client: UtpSocket[NodeAddress] + ): Future[void] {.async: (raw: true, raises: []).} = let queue = rt.getUserData[NodeAddress, AsyncQueue[UtpSocket[NodeAddress]]](server) - queue.addLast(client) + noCancel queue.addLast(client) asyncTest "Provide user data pointer and use it in callback": let @@ -249,20 +253,20 @@ procSuite "uTP over discovery v5 protocol": proc handleIncomingConnection( server: UtpRouter[NodeAddress], client: UtpSocket[NodeAddress] - ): Future[void] = + ): Future[void] {.async: (raw: true, raises: []).} = readFutures.add(client.readAndCheck()) var fut = newFuture[void]("test.AcceptConnectionCallback") fut.complete() - return fut + noCancel fut proc handleIncomingConnectionDummy( server: UtpRouter[NodeAddress], client: UtpSocket[NodeAddress] - ): Future[void] = + ): Future[void] {.async: (raw: true, raises: []).} = var fut = newFuture[void]("test.AcceptConnectionCallback") fut.complete() - return fut + noCancel fut let address1 = localAddress(20302) diff --git a/tests/utp/test_protocol.nim b/tests/utp/test_protocol.nim index aaf2b51..a6d26ed 100644 --- a/tests/utp/test_protocol.nim +++ b/tests/utp/test_protocol.nim @@ -19,8 +19,10 @@ proc setAcceptedCallback( event: AsyncEvent ): AcceptConnectionCallback[TransportAddress] = return ( - proc(server: UtpRouter[TransportAddress], client: UtpSocket[TransportAddress]): Future[void] = - let fut = newFuture[void]() + proc( + server: UtpRouter[TransportAddress], client: UtpSocket[TransportAddress] + ): Future[void] {.async: (raw: true, raises: []).} = + let fut = noCancel Future[void].Raising([CancelledError]).init("test.AcceptConnectionCallback") event.fire() fut.complete() fut @@ -30,8 +32,10 @@ proc registerIncomingSocketCallback( serverSockets: AsyncQueue ): AcceptConnectionCallback[TransportAddress] = return ( - proc(server: UtpRouter[TransportAddress], client: UtpSocket[TransportAddress]): Future[void] = - serverSockets.addLast(client) + proc( + server: UtpRouter[TransportAddress], client: UtpSocket[TransportAddress] + ): Future[void] {.async: (raw: true, raises: []).}= + noCancel serverSockets.addLast(client) ) proc allowOneIdCallback( @@ -165,9 +169,9 @@ procSuite "uTP over UDP protocol": asyncTest "Connect to remote host: test udata pointer and use it in callback": proc cbUserData( server: UtpRouter[TransportAddress], - client: UtpSocket[TransportAddress]): Future[void] = + client: UtpSocket[TransportAddress]): Future[void] {.async: (raw: true, raises: []).} = let q = getUserData[TransportAddress, AsyncQueue[UtpSocket[TransportAddress]]](server) - q.addLast(client) + noCancel q.addLast(client) let incomingConnections1 = newAsyncQueue[UtpSocket[TransportAddress]]() @@ -565,20 +569,20 @@ procSuite "uTP over UDP protocol": proc handleIncomingConnection( server: UtpRouter[TransportAddress], client: UtpSocket[TransportAddress] - ): Future[void] = + ): Future[void] {.async: (raw: true, raises: []).} = readFutures.add(client.readAndCheck()) var fut = newFuture[void]("test.AcceptConnectionCallback") fut.complete() - return fut + noCancel fut proc handleIncomingConnectionDummy( server: UtpRouter[TransportAddress], client: UtpSocket[TransportAddress] - ): Future[void] = + ): Future[void] {.async: (raw: true, raises: []).} = var fut = newFuture[void]("test.AcceptConnectionCallback") fut.complete() - return fut + noCancel fut let address1 = initTAddress("127.0.0.1", 9079) diff --git a/tests/utp/test_protocol_integration.nim b/tests/utp/test_protocol_integration.nim index 229e371..a90819c 100644 --- a/tests/utp/test_protocol_integration.nim +++ b/tests/utp/test_protocol_integration.nim @@ -1,4 +1,4 @@ -# Copyright (c) 2022 Status Research & Development GmbH +# Copyright (c) 2022-2024 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). @@ -32,12 +32,14 @@ proc buildAcceptConnection( t: ref Table[UtpSocketKey[TransportAddress], UtpSocket[TransportAddress]] ): AcceptConnectionCallback[TransportAddress] = return ( - proc (server: UtpRouter[TransportAddress], client: UtpSocket[TransportAddress]): Future[void] = - let fut = newFuture[void]() + proc ( + server: UtpRouter[TransportAddress], client: UtpSocket[TransportAddress] + ): Future[void] {.async: (raw: true, raises: []).} = + let fut = noCancel Future[void].Raising([CancelledError]).init("test.AcceptConnectionCallback") let key = client.socketKey t[key] = client fut.complete() - return fut + fut ) proc getServerSocket( diff --git a/tests/utp/test_utp_router.nim b/tests/utp/test_utp_router.nim index f9eef8e..7e8a5b7 100644 --- a/tests/utp/test_utp_router.nim +++ b/tests/utp/test_utp_router.nim @@ -33,8 +33,8 @@ procSuite "uTP router unit": proc registerIncomingSocketCallback(serverSockets: AsyncQueue): AcceptConnectionCallback[int] = return ( - proc(server: UtpRouter[int], client: UtpSocket[int]): Future[void] = - serverSockets.addLast(client) + proc(server: UtpRouter[int], client: UtpSocket[int]): Future[void] {.async: (raw: true, raises: []).} = + noCancel serverSockets.addLast(client) ) proc testSend(to: int, bytes: seq[byte]) =