diff --git a/eth/utp/utp_socket.nim b/eth/utp/utp_socket.nim index 0a18472..dfc6cd0 100644 --- a/eth/utp/utp_socket.nim +++ b/eth/utp/utp_socket.nim @@ -612,9 +612,13 @@ proc checkTimeoutsLoop(s: UtpSocket) {.async.} = try: while true: await sleepAsync(checkTimeoutsLoopInterval) - await s.eventQueue.put(SocketEvent(kind: CheckTimeouts)) - except CancelledError: + s.eventQueue.putNoWait(SocketEvent(kind: CheckTimeouts)) + except CancelledError as exc: + # check timeouts loop is last running future managed by socket, if its + # cancelled we can fire closeEvent + s.closeEvent.fire() trace "checkTimeoutsLoop canceled" + raise exc proc startTimeoutLoop(s: UtpSocket) = s.checkTimeoutsLoop = checkTimeoutsLoop(s) @@ -682,6 +686,10 @@ proc isConnected*(socket: UtpSocket): bool = proc isClosed*(socket: UtpSocket): bool = socket.state == Destroy and socket.closeEvent.isSet() +proc isClosedAndCleanedUpAllResources*(socket: UtpSocket): bool = + ## Test Api to check that all resources are properly cleaned up + socket.isClosed() and socket.eventLoop.cancelled() and socket.checkTimeoutsLoop.cancelled() + proc destroy*(s: UtpSocket) = info "Destroying socket", to = s.socketKey @@ -689,8 +697,12 @@ proc destroy*(s: UtpSocket) = ## Remote is not notified in any way about socket end of life s.state = Destroy s.eventLoop.cancel() - s.checkTimeoutsLoop.cancel() - s.closeEvent.fire() + # This procedure initiate cleanup process which goes like: + # Cancel EventLoop -> Cancel timeoutsLoop -> Fire closeEvent + # This is necessary due to how evenLoop look like i.e it has only one await + # point on `eventQueue.get` which trigger cancellation excepion only when + # someone will try run `eventQueue.put`. Without `eventQueue.put` , eventLoop + # future shows as cancelled, but handler for CancelledError is not run proc destroyWait*(s: UtpSocket) {.async.} = ## Moves socket to destroy state and clean all reasources and wait for all registered @@ -1609,9 +1621,8 @@ proc eventLoop(socket: UtpSocket) {.async.} = else: # in any other case we do not need to do any thing discard - socket.checkTimeouts() - except CancelledError: + except CancelledError as exc: for w in socket.pendingWrites.items(): if w.kind == Data and (not w.writer.finished()): let res = Result[int, WriteError].err(WriteError(kind: SocketNotWriteable, currentState: socket.state)) @@ -1624,7 +1635,10 @@ proc eventLoop(socket: UtpSocket) {.async.} = r.reader.complete(r.bytesAvailable) socket.pendingWrites.clear() socket.pendingReads.clear() + # main eventLoop has been cancelled, try to cancel check timeouts loop + socket.checkTimeoutsLoop.cancel() trace "main socket event loop cancelled" + raise exc proc startEventLoop(s: UtpSocket) = s.eventLoop = eventLoop(s) diff --git a/tests/utp/test_utp_router.nim b/tests/utp/test_utp_router.nim index 75b6816..5c4c76b 100644 --- a/tests/utp/test_utp_router.nim +++ b/tests/utp/test_utp_router.nim @@ -312,6 +312,8 @@ procSuite "Utp router unit tests": let connectResult = await connectFuture + await waitUntil(proc (): bool = router.len() == 0) + check: connectResult.isErr() connectResult.error().kind == ConnectionTimedOut diff --git a/tests/utp/test_utp_socket.nim b/tests/utp/test_utp_socket.nim index cf17e3c..53027d0 100644 --- a/tests/utp/test_utp_socket.nim +++ b/tests/utp/test_utp_socket.nim @@ -1465,3 +1465,64 @@ procSuite "Utp socket unit test": receivedData == dataRecived await sock1.destroyWait() + + asyncTest "Clean up all resources when closing due to timeout failure": + let q = newAsyncQueue[Packet]() + let initialRemoteSeq = 10'u16 + + let dataToWrite = @[1'u8] + let customCfg = SocketConfig.init(dataResendsBeforeFailure = 2, optSndBuffer = 1) + let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q, cfg = customCfg) + + let bytesWritten = await outgoingSocket.write(dataToWrite) + # this future will never finish as there is not place in write buffer + # although it should get properly clearead up when socket is closed + let writeFut = outgoingSocket.write(dataToWrite) + + check: + bytesWritten.get() == len(dataToWrite) + + let sentPacket = await q.get() + # wait for failure and cleanup of all resources + await waitUntil(proc (): bool = outgoingSocket.isClosedAndCleanedUpAllResources()) + check: + # main event loop handler should fire and clean up dangling future + writeFut.finished() + await outgoingSocket.destroyWait() + + + asyncTest "Receiving ack for fin packet should destroy socket and clean up all resources": + let q = newAsyncQueue[Packet]() + let initialRemoteSeq = 10'u16 + + let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q) + + outgoingSocket.close() + + let sendFin = await q.get() + + check: + sendFin.header.pType == ST_FIN + + let responseAck = + ackPacket( + initialRemoteSeq, + initialPacket.header.connectionId, + sendFin.header.seqNr, + testBufferSize, + 0 + ) + + await outgoingSocket.processPacket(responseAck) + + await waitUntil(proc (): bool = not outgoingSocket.isConnected()) + + check: + not outgoingSocket.isConnected() + + await waitUntil(proc (): bool = outgoingSocket.isClosedAndCleanedUpAllResources()) + + check: + outgoingSocket.isClosedAndCleanedUpAllResources() + + await outgoingSocket.destroyWait()