diff --git a/eth/utp/utp_discv5_protocol.nim b/eth/utp/utp_discv5_protocol.nim index 5319c29..76585d6 100644 --- a/eth/utp/utp_discv5_protocol.nim +++ b/eth/utp/utp_discv5_protocol.nim @@ -70,16 +70,13 @@ proc initSendCallback( t: protocol.Protocol, subProtocolName: seq[byte]): SendCallback[NodeAddress] = return ( - proc (to: NodeAddress, data: seq[byte]): Future[void] = - let fut = newFuture[void]() + proc (to: NodeAddress, data: seq[byte]){.raises: [], gcsafe.} = # hidden assumption here is that nodes already have established discv5 # session between each other. In our use case this should be true as # opening stream is only done after successful OFFER/ACCEPT or # FINDCONTENT/CONTENT exchange which forces nodes to establish session # between each other. discard t.talkReqDirect(to, subProtocolName, data) - fut.complete() - return fut ) proc messageHandler( diff --git a/eth/utp/utp_protocol.nim b/eth/utp/utp_protocol.nim index 77beddd..0eb4d7c 100644 --- a/eth/utp/utp_protocol.nim +++ b/eth/utp/utp_protocol.nim @@ -63,7 +63,7 @@ proc hash(x: UtpSocketKey[TransportAddress]): Hash = !$h proc processDatagram(transp: DatagramTransport, raddr: TransportAddress): - Future[void] {.async.} = + Future[void] {.async: (raises: []).} = let router = getUserData[UtpRouter[TransportAddress]](transp) # TODO: should we use `peekMessage()` to avoid allocation? let buf = try: transp.getMessage() @@ -71,12 +71,21 @@ proc processDatagram(transp: DatagramTransport, raddr: TransportAddress): trace "Error reading datagram msg: ", error = e.msg # This is likely to be local network connection issues. return - await processIncomingBytes[TransportAddress](router, buf, raddr) + try: + await processIncomingBytes[TransportAddress](router, buf, raddr) + except CancelledError: + debug "processIncomingBytes canceled" proc initSendCallback(t: DatagramTransport): SendCallback[TransportAddress] = return ( - proc (to: TransportAddress, data: seq[byte]): Future[void] = - t.sendTo(to, data) + proc ( + to: TransportAddress, data: seq[byte] + ) {.raises: [], gcsafe.} = + let fut = t.sendTo(to, data) + let cb = proc(data: pointer) {.gcsafe.} = + if fut.failed: + debug "uTP send failed", msg = fut.readError.msg + fut.addCallback cb ) proc new*( diff --git a/eth/utp/utp_router.nim b/eth/utp/utp_router.nim index 59708d6..9d345d4 100644 --- a/eth/utp/utp_router.nim +++ b/eth/utp/utp_router.nim @@ -187,7 +187,9 @@ proc shouldAllowConnection[A]( else: r.allowConnection(r, remoteAddress, connectionId) -proc processPacket[A](r: UtpRouter[A], p: Packet, sender: A) {.async.}= +proc processPacket[A]( + r: UtpRouter[A], p: Packet, sender: A + ) {.async: (raises: [CancelledError]).}= debug "Received packet ", sender = sender, packetType = p.header.pType @@ -252,10 +254,11 @@ proc processPacket[A](r: UtpRouter[A], p: Packet, sender: A) {.async.}= debug "Received FIN/DATA/ACK on unknown socket, sending reset" let rstPacket = resetPacket( randUint16(r.rng[]), p.header.connectionId, p.header.seqNr) - await r.sendCb(sender, encodePacket(rstPacket)) + r.sendCb(sender, encodePacket(rstPacket)) proc processIncomingBytes*[A]( - r: UtpRouter[A], bytes: seq[byte], sender: A) {.async.} = + r: UtpRouter[A], bytes: seq[byte], sender: A + ) {.async: (raises:[CancelledError]).} = if (not r.closed): let decoded = decodePacket(bytes) if (decoded.isOk()): diff --git a/eth/utp/utp_socket.nim b/eth/utp/utp_socket.nim index 352ab18..592d355 100644 --- a/eth/utp/utp_socket.nim +++ b/eth/utp/utp_socket.nim @@ -49,7 +49,9 @@ type # Socket callback to send data to remote peer SendCallback*[A] = - proc (to: A, data: seq[byte]): Future[void] {.gcsafe, raises: []} + proc ( + to: A, data: seq[byte] + ) {.gcsafe, raises: [].} SocketConfig* = object # This is configurable (in contrast to reference impl), as with standard 2 @@ -452,11 +454,8 @@ proc registerOutgoingPacket(socket: UtpSocket, oPacket: OutgoingPacket) = inc socket.seqNr inc socket.curWindowPackets -proc sendData(socket: UtpSocket, data: seq[byte]): Future[void] {.async.} = - try: - await socket.send(socket.remoteAddress, data) - except CatchableError as e: - warn "UTP send failed", msg = e.msg +proc sendData(socket: UtpSocket, data: seq[byte]) = + socket.send(socket.remoteAddress, data) proc sendPacket(socket: UtpSocket, seqNr: uint16) = proc setSend(p: var OutgoingPacket): seq[byte] = @@ -474,7 +473,7 @@ proc sendPacket(socket: UtpSocket, seqNr: uint16) = return p.packetBytes - asyncSpawn socket.sendData(setSend(socket.outBuffer[seqNr])) + socket.sendData(setSend(socket.outBuffer[seqNr])) proc resetSendTimeout(socket: UtpSocket) = socket.retransmitTimeout = socket.rto @@ -1112,7 +1111,7 @@ proc sendAck(socket: UtpSocket) = pkAckNr = ackPacket.header.ackNr, gotEACK = ackPacket.eack.isSome() - asyncSpawn socket.sendData(encodePacket(ackPacket)) + socket.sendData(encodePacket(ackPacket)) proc tryfinalizeConnection(socket: UtpSocket, p: Packet) = @@ -1568,7 +1567,9 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) = # will be generated socket.sendAck() -proc processPacket*(socket: UtpSocket, p: Packet): Future[void] = +proc processPacket*( + socket: UtpSocket, p: Packet + ): Future[void] {.async: (raw: true, raises: [CancelledError]).} = socket.eventQueue.put(SocketEvent(kind: NewPacket, packet: p)) template shiftBuffer(t, c: untyped) = @@ -2081,5 +2082,5 @@ proc startOutgoingSocket*(socket: UtpSocket): Future[void] = socket.registerOutgoingPacket(outgoingPacket) socket.startEventLoop() socket.startTimeoutLoop() - asyncSpawn socket.sendData(outgoingPacket.packetBytes) + socket.sendData(outgoingPacket.packetBytes) return socket.connectionFuture diff --git a/tests/utp/test_protocol_integration.nim b/tests/utp/test_protocol_integration.nim index 37980c9..229e371 100644 --- a/tests/utp/test_protocol_integration.nim +++ b/tests/utp/test_protocol_integration.nim @@ -54,16 +54,29 @@ proc getServerSocket( procSuite "uTP over UDP protocol with loss and delays": let rng = newRng() + proc simulatedSend( + d: DatagramTransport, to: TransportAddress, data: seq[byte], + maxDelay: int, packetDropRate: int + ) {.async: (raises: []).} = + let i = rand(rng[], 99) + if i >= packetDropRate: + let delay = milliseconds(rand(rng[], maxDelay)) + try: + await sleepAsync(delay) + await d.sendTo(to, data) + except TransportError: + # ignore + return + except CancelledError: + # ignore + return + proc sendBuilder(maxDelay: int, packetDropRate: int): SendCallbackBuilder = return ( proc (d: DatagramTransport): SendCallback[TransportAddress] = return ( - proc (to: TransportAddress, data: seq[byte]): Future[void] {.async.} = - let i = rand(rng[], 99) - if i >= packetDropRate: - let delay = milliseconds(rand(rng[], maxDelay)) - await sleepAsync(delay) - await d.sendTo(to, data) + proc (to: TransportAddress, data: seq[byte]) = + asyncSpawn simulatedSend(d, to, data, maxDelay, packetDropRate) ) ) diff --git a/tests/utp/test_utils.nim b/tests/utp/test_utils.nim index 9307581..456bc94 100644 --- a/tests/utp/test_utils.nim +++ b/tests/utp/test_utils.nim @@ -106,8 +106,13 @@ proc generateDataPackets*( packets proc initTestSnd*(q: AsyncQueue[Packet]): SendCallback[TransportAddress]= - return ( - proc (to: TransportAddress, bytes: seq[byte]): Future[void] = + return ( + proc ( + to: TransportAddress, bytes: seq[byte] + ) {.raises: [], gcsafe.} = let p = decodePacket(bytes).get() - q.addLast(p) + try: + q.addLastNoWait(p) + except AsyncQueueFullError: + raiseAssert "Should not occur as unlimited queue" ) diff --git a/tests/utp/test_utp_router.nim b/tests/utp/test_utp_router.nim index aaec8ce..2709f06 100644 --- a/tests/utp/test_utp_router.nim +++ b/tests/utp/test_utp_router.nim @@ -37,16 +37,17 @@ procSuite "uTP router unit": serverSockets.addLast(client) ) - proc testSend(to: int, bytes: seq[byte]): Future[void] = - let f = newFuture[void]() - f.complete() - f + proc testSend(to: int, bytes: seq[byte]) = + discard proc initTestSnd(q: AsyncQueue[(Packet, int)]): SendCallback[int]= - return ( - proc (to: int, bytes: seq[byte]): Future[void] = + return ( + proc (to: int, bytes: seq[byte]) {.raises: [], gcsafe.} = let p = decodePacket(bytes).get() - q.addLast((p, to)) + try: + q.addLastNoWait((p, to)) + except AsyncQueueFullError: + raiseAssert "Should not occur as unlimited queue" ) template connectOutgoing( @@ -359,14 +360,14 @@ procSuite "uTP router unit": check: router.len() == 0 - asyncTest "Router should clear all resources and handle error while sending syn packet": + asyncTest "Router should clear all resources and handle error when sending syn packet fails": let q = newAsyncQueue[UtpSocket[int]]() let router = UtpRouter[int].new(registerIncomingSocketCallback(q), SocketConfig.init(milliseconds(500)), rng) router.sendCb = - proc (to: int, data: seq[byte]): Future[void] = - let f = newFuture[void]() - f.fail(newException(TestError, "failed")) - return f + proc (to: int, data: seq[byte]) = + # Can just discard here not to send anything as the send callback does + # not forward errors anyhow + discard let connectResult = await router.connectTo(testSender2)