Fix chronos related deprecated warnings in uTP code part II (#669)

This also makes the uTP SendCallBack not returning a Future any
more as it is not used in sendData anyhow. And in case of uTP
over discv5, discv5 send call is already not async.

This gives quite a noticable throughput benchmark improvement over
with uTP over UDP, and a slightly noticable with uTP over discv5
This commit is contained in:
Kim De Mey 2024-01-23 18:41:38 +01:00 committed by GitHub
parent b89d712339
commit 19965bab95
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 71 additions and 42 deletions

View File

@ -70,16 +70,13 @@ proc initSendCallback(
t: protocol.Protocol, subProtocolName: seq[byte]): t: protocol.Protocol, subProtocolName: seq[byte]):
SendCallback[NodeAddress] = SendCallback[NodeAddress] =
return ( return (
proc (to: NodeAddress, data: seq[byte]): Future[void] = proc (to: NodeAddress, data: seq[byte]){.raises: [], gcsafe.} =
let fut = newFuture[void]()
# hidden assumption here is that nodes already have established discv5 # hidden assumption here is that nodes already have established discv5
# session between each other. In our use case this should be true as # session between each other. In our use case this should be true as
# opening stream is only done after successful OFFER/ACCEPT or # opening stream is only done after successful OFFER/ACCEPT or
# FINDCONTENT/CONTENT exchange which forces nodes to establish session # FINDCONTENT/CONTENT exchange which forces nodes to establish session
# between each other. # between each other.
discard t.talkReqDirect(to, subProtocolName, data) discard t.talkReqDirect(to, subProtocolName, data)
fut.complete()
return fut
) )
proc messageHandler( proc messageHandler(

View File

@ -63,7 +63,7 @@ proc hash(x: UtpSocketKey[TransportAddress]): Hash =
!$h !$h
proc processDatagram(transp: DatagramTransport, raddr: TransportAddress): proc processDatagram(transp: DatagramTransport, raddr: TransportAddress):
Future[void] {.async.} = Future[void] {.async: (raises: []).} =
let router = getUserData[UtpRouter[TransportAddress]](transp) let router = getUserData[UtpRouter[TransportAddress]](transp)
# TODO: should we use `peekMessage()` to avoid allocation? # TODO: should we use `peekMessage()` to avoid allocation?
let buf = try: transp.getMessage() let buf = try: transp.getMessage()
@ -71,12 +71,21 @@ proc processDatagram(transp: DatagramTransport, raddr: TransportAddress):
trace "Error reading datagram msg: ", error = e.msg trace "Error reading datagram msg: ", error = e.msg
# This is likely to be local network connection issues. # This is likely to be local network connection issues.
return return
await processIncomingBytes[TransportAddress](router, buf, raddr) try:
await processIncomingBytes[TransportAddress](router, buf, raddr)
except CancelledError:
debug "processIncomingBytes canceled"
proc initSendCallback(t: DatagramTransport): SendCallback[TransportAddress] = proc initSendCallback(t: DatagramTransport): SendCallback[TransportAddress] =
return ( return (
proc (to: TransportAddress, data: seq[byte]): Future[void] = proc (
t.sendTo(to, data) to: TransportAddress, data: seq[byte]
) {.raises: [], gcsafe.} =
let fut = t.sendTo(to, data)
let cb = proc(data: pointer) {.gcsafe.} =
if fut.failed:
debug "uTP send failed", msg = fut.readError.msg
fut.addCallback cb
) )
proc new*( proc new*(

View File

@ -187,7 +187,9 @@ proc shouldAllowConnection[A](
else: else:
r.allowConnection(r, remoteAddress, connectionId) r.allowConnection(r, remoteAddress, connectionId)
proc processPacket[A](r: UtpRouter[A], p: Packet, sender: A) {.async.}= proc processPacket[A](
r: UtpRouter[A], p: Packet, sender: A
) {.async: (raises: [CancelledError]).}=
debug "Received packet ", debug "Received packet ",
sender = sender, sender = sender,
packetType = p.header.pType packetType = p.header.pType
@ -252,10 +254,11 @@ proc processPacket[A](r: UtpRouter[A], p: Packet, sender: A) {.async.}=
debug "Received FIN/DATA/ACK on unknown socket, sending reset" debug "Received FIN/DATA/ACK on unknown socket, sending reset"
let rstPacket = resetPacket( let rstPacket = resetPacket(
randUint16(r.rng[]), p.header.connectionId, p.header.seqNr) randUint16(r.rng[]), p.header.connectionId, p.header.seqNr)
await r.sendCb(sender, encodePacket(rstPacket)) r.sendCb(sender, encodePacket(rstPacket))
proc processIncomingBytes*[A]( proc processIncomingBytes*[A](
r: UtpRouter[A], bytes: seq[byte], sender: A) {.async.} = r: UtpRouter[A], bytes: seq[byte], sender: A
) {.async: (raises:[CancelledError]).} =
if (not r.closed): if (not r.closed):
let decoded = decodePacket(bytes) let decoded = decodePacket(bytes)
if (decoded.isOk()): if (decoded.isOk()):

View File

@ -49,7 +49,9 @@ type
# Socket callback to send data to remote peer # Socket callback to send data to remote peer
SendCallback*[A] = SendCallback*[A] =
proc (to: A, data: seq[byte]): Future[void] {.gcsafe, raises: []} proc (
to: A, data: seq[byte]
) {.gcsafe, raises: [].}
SocketConfig* = object SocketConfig* = object
# This is configurable (in contrast to reference impl), as with standard 2 # This is configurable (in contrast to reference impl), as with standard 2
@ -452,11 +454,8 @@ proc registerOutgoingPacket(socket: UtpSocket, oPacket: OutgoingPacket) =
inc socket.seqNr inc socket.seqNr
inc socket.curWindowPackets inc socket.curWindowPackets
proc sendData(socket: UtpSocket, data: seq[byte]): Future[void] {.async.} = proc sendData(socket: UtpSocket, data: seq[byte]) =
try: socket.send(socket.remoteAddress, data)
await socket.send(socket.remoteAddress, data)
except CatchableError as e:
warn "UTP send failed", msg = e.msg
proc sendPacket(socket: UtpSocket, seqNr: uint16) = proc sendPacket(socket: UtpSocket, seqNr: uint16) =
proc setSend(p: var OutgoingPacket): seq[byte] = proc setSend(p: var OutgoingPacket): seq[byte] =
@ -474,7 +473,7 @@ proc sendPacket(socket: UtpSocket, seqNr: uint16) =
return p.packetBytes return p.packetBytes
asyncSpawn socket.sendData(setSend(socket.outBuffer[seqNr])) socket.sendData(setSend(socket.outBuffer[seqNr]))
proc resetSendTimeout(socket: UtpSocket) = proc resetSendTimeout(socket: UtpSocket) =
socket.retransmitTimeout = socket.rto socket.retransmitTimeout = socket.rto
@ -1112,7 +1111,7 @@ proc sendAck(socket: UtpSocket) =
pkAckNr = ackPacket.header.ackNr, pkAckNr = ackPacket.header.ackNr,
gotEACK = ackPacket.eack.isSome() gotEACK = ackPacket.eack.isSome()
asyncSpawn socket.sendData(encodePacket(ackPacket)) socket.sendData(encodePacket(ackPacket))
proc tryfinalizeConnection(socket: UtpSocket, p: Packet) = proc tryfinalizeConnection(socket: UtpSocket, p: Packet) =
@ -1568,7 +1567,9 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) =
# will be generated # will be generated
socket.sendAck() socket.sendAck()
proc processPacket*(socket: UtpSocket, p: Packet): Future[void] = proc processPacket*(
socket: UtpSocket, p: Packet
): Future[void] {.async: (raw: true, raises: [CancelledError]).} =
socket.eventQueue.put(SocketEvent(kind: NewPacket, packet: p)) socket.eventQueue.put(SocketEvent(kind: NewPacket, packet: p))
template shiftBuffer(t, c: untyped) = template shiftBuffer(t, c: untyped) =
@ -2081,5 +2082,5 @@ proc startOutgoingSocket*(socket: UtpSocket): Future[void] =
socket.registerOutgoingPacket(outgoingPacket) socket.registerOutgoingPacket(outgoingPacket)
socket.startEventLoop() socket.startEventLoop()
socket.startTimeoutLoop() socket.startTimeoutLoop()
asyncSpawn socket.sendData(outgoingPacket.packetBytes) socket.sendData(outgoingPacket.packetBytes)
return socket.connectionFuture return socket.connectionFuture

View File

@ -54,16 +54,29 @@ proc getServerSocket(
procSuite "uTP over UDP protocol with loss and delays": procSuite "uTP over UDP protocol with loss and delays":
let rng = newRng() let rng = newRng()
proc simulatedSend(
d: DatagramTransport, to: TransportAddress, data: seq[byte],
maxDelay: int, packetDropRate: int
) {.async: (raises: []).} =
let i = rand(rng[], 99)
if i >= packetDropRate:
let delay = milliseconds(rand(rng[], maxDelay))
try:
await sleepAsync(delay)
await d.sendTo(to, data)
except TransportError:
# ignore
return
except CancelledError:
# ignore
return
proc sendBuilder(maxDelay: int, packetDropRate: int): SendCallbackBuilder = proc sendBuilder(maxDelay: int, packetDropRate: int): SendCallbackBuilder =
return ( return (
proc (d: DatagramTransport): SendCallback[TransportAddress] = proc (d: DatagramTransport): SendCallback[TransportAddress] =
return ( return (
proc (to: TransportAddress, data: seq[byte]): Future[void] {.async.} = proc (to: TransportAddress, data: seq[byte]) =
let i = rand(rng[], 99) asyncSpawn simulatedSend(d, to, data, maxDelay, packetDropRate)
if i >= packetDropRate:
let delay = milliseconds(rand(rng[], maxDelay))
await sleepAsync(delay)
await d.sendTo(to, data)
) )
) )

View File

@ -106,8 +106,13 @@ proc generateDataPackets*(
packets packets
proc initTestSnd*(q: AsyncQueue[Packet]): SendCallback[TransportAddress]= proc initTestSnd*(q: AsyncQueue[Packet]): SendCallback[TransportAddress]=
return ( return (
proc (to: TransportAddress, bytes: seq[byte]): Future[void] = proc (
to: TransportAddress, bytes: seq[byte]
) {.raises: [], gcsafe.} =
let p = decodePacket(bytes).get() let p = decodePacket(bytes).get()
q.addLast(p) try:
q.addLastNoWait(p)
except AsyncQueueFullError:
raiseAssert "Should not occur as unlimited queue"
) )

View File

@ -37,16 +37,17 @@ procSuite "uTP router unit":
serverSockets.addLast(client) serverSockets.addLast(client)
) )
proc testSend(to: int, bytes: seq[byte]): Future[void] = proc testSend(to: int, bytes: seq[byte]) =
let f = newFuture[void]() discard
f.complete()
f
proc initTestSnd(q: AsyncQueue[(Packet, int)]): SendCallback[int]= proc initTestSnd(q: AsyncQueue[(Packet, int)]): SendCallback[int]=
return ( return (
proc (to: int, bytes: seq[byte]): Future[void] = proc (to: int, bytes: seq[byte]) {.raises: [], gcsafe.} =
let p = decodePacket(bytes).get() let p = decodePacket(bytes).get()
q.addLast((p, to)) try:
q.addLastNoWait((p, to))
except AsyncQueueFullError:
raiseAssert "Should not occur as unlimited queue"
) )
template connectOutgoing( template connectOutgoing(
@ -359,14 +360,14 @@ procSuite "uTP router unit":
check: check:
router.len() == 0 router.len() == 0
asyncTest "Router should clear all resources and handle error while sending syn packet": asyncTest "Router should clear all resources and handle error when sending syn packet fails":
let q = newAsyncQueue[UtpSocket[int]]() let q = newAsyncQueue[UtpSocket[int]]()
let router = UtpRouter[int].new(registerIncomingSocketCallback(q), SocketConfig.init(milliseconds(500)), rng) let router = UtpRouter[int].new(registerIncomingSocketCallback(q), SocketConfig.init(milliseconds(500)), rng)
router.sendCb = router.sendCb =
proc (to: int, data: seq[byte]): Future[void] = proc (to: int, data: seq[byte]) =
let f = newFuture[void]() # Can just discard here not to send anything as the send callback does
f.fail(newException(TestError, "failed")) # not forward errors anyhow
return f discard
let connectResult = await router.connectTo(testSender2) let connectResult = await router.connectTo(testSender2)