Add more missing async raises in uTP (#702)

This commit is contained in:
Kim De Mey 2024-06-12 18:01:23 +02:00 committed by GitHub
parent 224048a1d5
commit 207244c2db
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 47 additions and 37 deletions

View File

@ -38,7 +38,7 @@ type
# ``server`` - UtpProtocol object. # ``server`` - UtpProtocol object.
# ``client`` - accepted client utp socket. # ``client`` - accepted client utp socket.
AcceptConnectionCallback*[A] = proc(server: UtpRouter[A], AcceptConnectionCallback*[A] = proc(server: UtpRouter[A],
client: UtpSocket[A]): Future[void] {.gcsafe, raises: [].} client: UtpSocket[A]): Future[void] {.gcsafe, async: (raw: true, raises: []).}
# Callback to act as firewall for incoming peers. Should return true if peer # Callback to act as firewall for incoming peers. Should return true if peer
# is allowed to connect. # is allowed to connect.

View File

@ -1810,10 +1810,10 @@ proc closeWait*(socket: UtpSocket) {.async: (raises: [CancelledError]).} =
socket.close() socket.close()
await socket.closeEvent.wait() await socket.closeEvent.wait()
proc write*(socket: UtpSocket, data: seq[byte]): Future[WriteResult] = proc write*(socket: UtpSocket, data: seq[byte]): Future[WriteResult] {.async: (raw: true, raises: [CancelledError]).} =
debug "Write data", dst = socket.socketKey, length = len(data) debug "Write data", dst = socket.socketKey, length = len(data)
let retFuture = newFuture[WriteResult]("UtpSocket.write") let retFuture = Future[WriteResult].Raising([CancelledError]).init("utp_socket.write")
if (socket.state != Connected): if (socket.state != Connected):
let res = WriteResult.err( let res = WriteResult.err(
@ -1842,13 +1842,13 @@ proc write*(socket: UtpSocket, data: seq[byte]): Future[WriteResult] =
# this should not happen as out write queue is unbounded # this should not happen as out write queue is unbounded
raiseAssert e.msg raiseAssert e.msg
return retFuture retFuture
proc read*(socket: UtpSocket, n: Natural): Future[seq[byte]] = proc read*(socket: UtpSocket, n: Natural): Future[seq[byte]] {.async: (raw: true, raises: [CancelledError]).} =
## Read all bytes from socket ``socket``. ## Read n bytes from socket ``socket``.
## ##
## This procedure allocates buffer seq[byte] and return it as result. ## This procedure allocates buffer seq[byte] and return it as result.
let fut = newFuture[seq[uint8]]() let fut = Future[seq[uint8]].Raising([CancelledError]).init("utp_socket.read")
if socket.readingClosed(): if socket.readingClosed():
fut.complete(newSeq[uint8]()) fut.complete(newSeq[uint8]())
@ -1867,13 +1867,13 @@ proc read*(socket: UtpSocket, n: Natural): Future[seq[byte]] =
# should not happen as our write queue is unbounded # should not happen as our write queue is unbounded
raiseAssert e.msg raiseAssert e.msg
return fut fut
proc read*(socket: UtpSocket): Future[seq[byte]] = proc read*(socket: UtpSocket): Future[seq[byte]] {.async: (raw: true, raises: [CancelledError]).} =
## Read all bytes from socket ``socket``. ## Read all bytes from socket ``socket``.
## ##
## This procedure allocates buffer seq[byte] and return it as result. ## This procedure allocates buffer seq[byte] and return it as result.
let fut = newFuture[seq[uint8]]() let fut = Future[seq[uint8]].Raising([CancelledError]).init("utp_socket.read")
if socket.readingClosed(): if socket.readingClosed():
fut.complete(newSeq[uint8]()) fut.complete(newSeq[uint8]())
@ -1892,7 +1892,7 @@ proc read*(socket: UtpSocket): Future[seq[byte]] =
# should not happen as our write queue is unbounded # should not happen as our write queue is unbounded
raiseAssert e.msg raiseAssert e.msg
return fut fut
# Check how many packets are still in the out going buffer, usefully for tests # Check how many packets are still in the out going buffer, usefully for tests
# or debugging. # or debugging.

View File

@ -25,8 +25,10 @@ procSuite "uTP over discovery v5 protocol":
proc registerIncomingSocketCallback(serverSockets: AsyncQueue): AcceptConnectionCallback[NodeAddress] = proc registerIncomingSocketCallback(serverSockets: AsyncQueue): AcceptConnectionCallback[NodeAddress] =
return ( return (
proc(server: UtpRouter[NodeAddress], client: UtpSocket[NodeAddress]): Future[void] = proc(
serverSockets.addLast(client) server: UtpRouter[NodeAddress], client: UtpSocket[NodeAddress]
): Future[void] {.async: (raw: true, raises: []).} =
noCancel serverSockets.addLast(client)
) )
proc allowOneIdCallback(allowedId: uint16): AllowConnectionCallback[NodeAddress] = proc allowOneIdCallback(allowedId: uint16): AllowConnectionCallback[NodeAddress] =
@ -67,9 +69,11 @@ procSuite "uTP over discovery v5 protocol":
await node1.closeWait() await node1.closeWait()
await node2.closeWait() await node2.closeWait()
proc cbUserData(server: UtpRouter[NodeAddress], client: UtpSocket[NodeAddress]): Future[void] = proc cbUserData(
server: UtpRouter[NodeAddress], client: UtpSocket[NodeAddress]
): Future[void] {.async: (raw: true, raises: []).} =
let queue = rt.getUserData[NodeAddress, AsyncQueue[UtpSocket[NodeAddress]]](server) let queue = rt.getUserData[NodeAddress, AsyncQueue[UtpSocket[NodeAddress]]](server)
queue.addLast(client) noCancel queue.addLast(client)
asyncTest "Provide user data pointer and use it in callback": asyncTest "Provide user data pointer and use it in callback":
let let
@ -249,20 +253,20 @@ procSuite "uTP over discovery v5 protocol":
proc handleIncomingConnection( proc handleIncomingConnection(
server: UtpRouter[NodeAddress], server: UtpRouter[NodeAddress],
client: UtpSocket[NodeAddress] client: UtpSocket[NodeAddress]
): Future[void] = ): Future[void] {.async: (raw: true, raises: []).} =
readFutures.add(client.readAndCheck()) readFutures.add(client.readAndCheck())
var fut = newFuture[void]("test.AcceptConnectionCallback") var fut = newFuture[void]("test.AcceptConnectionCallback")
fut.complete() fut.complete()
return fut noCancel fut
proc handleIncomingConnectionDummy( proc handleIncomingConnectionDummy(
server: UtpRouter[NodeAddress], server: UtpRouter[NodeAddress],
client: UtpSocket[NodeAddress] client: UtpSocket[NodeAddress]
): Future[void] = ): Future[void] {.async: (raw: true, raises: []).} =
var fut = newFuture[void]("test.AcceptConnectionCallback") var fut = newFuture[void]("test.AcceptConnectionCallback")
fut.complete() fut.complete()
return fut noCancel fut
let let
address1 = localAddress(20302) address1 = localAddress(20302)

View File

@ -19,8 +19,10 @@ proc setAcceptedCallback(
event: AsyncEvent event: AsyncEvent
): AcceptConnectionCallback[TransportAddress] = ): AcceptConnectionCallback[TransportAddress] =
return ( return (
proc(server: UtpRouter[TransportAddress], client: UtpSocket[TransportAddress]): Future[void] = proc(
let fut = newFuture[void]() server: UtpRouter[TransportAddress], client: UtpSocket[TransportAddress]
): Future[void] {.async: (raw: true, raises: []).} =
let fut = noCancel Future[void].Raising([CancelledError]).init("test.AcceptConnectionCallback")
event.fire() event.fire()
fut.complete() fut.complete()
fut fut
@ -30,8 +32,10 @@ proc registerIncomingSocketCallback(
serverSockets: AsyncQueue serverSockets: AsyncQueue
): AcceptConnectionCallback[TransportAddress] = ): AcceptConnectionCallback[TransportAddress] =
return ( return (
proc(server: UtpRouter[TransportAddress], client: UtpSocket[TransportAddress]): Future[void] = proc(
serverSockets.addLast(client) server: UtpRouter[TransportAddress], client: UtpSocket[TransportAddress]
): Future[void] {.async: (raw: true, raises: []).}=
noCancel serverSockets.addLast(client)
) )
proc allowOneIdCallback( proc allowOneIdCallback(
@ -165,9 +169,9 @@ procSuite "uTP over UDP protocol":
asyncTest "Connect to remote host: test udata pointer and use it in callback": asyncTest "Connect to remote host: test udata pointer and use it in callback":
proc cbUserData( proc cbUserData(
server: UtpRouter[TransportAddress], server: UtpRouter[TransportAddress],
client: UtpSocket[TransportAddress]): Future[void] = client: UtpSocket[TransportAddress]): Future[void] {.async: (raw: true, raises: []).} =
let q = getUserData[TransportAddress, AsyncQueue[UtpSocket[TransportAddress]]](server) let q = getUserData[TransportAddress, AsyncQueue[UtpSocket[TransportAddress]]](server)
q.addLast(client) noCancel q.addLast(client)
let let
incomingConnections1 = newAsyncQueue[UtpSocket[TransportAddress]]() incomingConnections1 = newAsyncQueue[UtpSocket[TransportAddress]]()
@ -565,20 +569,20 @@ procSuite "uTP over UDP protocol":
proc handleIncomingConnection( proc handleIncomingConnection(
server: UtpRouter[TransportAddress], server: UtpRouter[TransportAddress],
client: UtpSocket[TransportAddress] client: UtpSocket[TransportAddress]
): Future[void] = ): Future[void] {.async: (raw: true, raises: []).} =
readFutures.add(client.readAndCheck()) readFutures.add(client.readAndCheck())
var fut = newFuture[void]("test.AcceptConnectionCallback") var fut = newFuture[void]("test.AcceptConnectionCallback")
fut.complete() fut.complete()
return fut noCancel fut
proc handleIncomingConnectionDummy( proc handleIncomingConnectionDummy(
server: UtpRouter[TransportAddress], server: UtpRouter[TransportAddress],
client: UtpSocket[TransportAddress] client: UtpSocket[TransportAddress]
): Future[void] = ): Future[void] {.async: (raw: true, raises: []).} =
var fut = newFuture[void]("test.AcceptConnectionCallback") var fut = newFuture[void]("test.AcceptConnectionCallback")
fut.complete() fut.complete()
return fut noCancel fut
let let
address1 = initTAddress("127.0.0.1", 9079) address1 = initTAddress("127.0.0.1", 9079)

View File

@ -1,4 +1,4 @@
# Copyright (c) 2022 Status Research & Development GmbH # Copyright (c) 2022-2024 Status Research & Development GmbH
# Licensed and distributed under either of # Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
@ -32,12 +32,14 @@ proc buildAcceptConnection(
t: ref Table[UtpSocketKey[TransportAddress], UtpSocket[TransportAddress]] t: ref Table[UtpSocketKey[TransportAddress], UtpSocket[TransportAddress]]
): AcceptConnectionCallback[TransportAddress] = ): AcceptConnectionCallback[TransportAddress] =
return ( return (
proc (server: UtpRouter[TransportAddress], client: UtpSocket[TransportAddress]): Future[void] = proc (
let fut = newFuture[void]() server: UtpRouter[TransportAddress], client: UtpSocket[TransportAddress]
): Future[void] {.async: (raw: true, raises: []).} =
let fut = noCancel Future[void].Raising([CancelledError]).init("test.AcceptConnectionCallback")
let key = client.socketKey let key = client.socketKey
t[key] = client t[key] = client
fut.complete() fut.complete()
return fut fut
) )
proc getServerSocket( proc getServerSocket(

View File

@ -33,8 +33,8 @@ procSuite "uTP router unit":
proc registerIncomingSocketCallback(serverSockets: AsyncQueue): AcceptConnectionCallback[int] = proc registerIncomingSocketCallback(serverSockets: AsyncQueue): AcceptConnectionCallback[int] =
return ( return (
proc(server: UtpRouter[int], client: UtpSocket[int]): Future[void] = proc(server: UtpRouter[int], client: UtpSocket[int]): Future[void] {.async: (raw: true, raises: []).} =
serverSockets.addLast(client) noCancel serverSockets.addLast(client)
) )
proc testSend(to: int, bytes: seq[byte]) = proc testSend(to: int, bytes: seq[byte]) =