diff --git a/eth/utp/utp_socket.nim b/eth/utp/utp_socket.nim index 867152e..0680f05 100644 --- a/eth/utp/utp_socket.nim +++ b/eth/utp/utp_socket.nim @@ -469,8 +469,10 @@ proc checkTimeouts(socket: UtpSocket) {.async.} = socket.sendBufferTracker.maxRemoteWindow, newMaxWindow ) - else: - + elif (socket.sendBufferTracker.maxWindow < currentPacketSize): + # due to high delay window has shrunk below packet size + # which means that we cannot send more data + # reset it to fit at least one packet debug "Reseting window size do fit a least one packet", oldWindowSize = socket.sendBufferTracker.maxWindow, newWindowSize = currentPacketSize diff --git a/tests/utp/test_utp_socket.nim b/tests/utp/test_utp_socket.nim index d27b449..9927318 100644 --- a/tests/utp/test_utp_socket.nim +++ b/tests/utp/test_utp_socket.nim @@ -354,6 +354,72 @@ procSuite "Utp socket unit test": await outgoingSocket.destroyWait() + proc ackAllPacket( + socket: UtpSocket[TransportAddress], + queue: AsyncQueue[Packet], + initialRemoteSeq: uint16): Future[void] {.async.} = + try: + while true: + let sentPacket = await queue.get() + let ack = ackPacket( + initialRemoteSeq, + sentPacket.header.connectionId, + sentPacket.header.seqNr, + 1024 * 1024, + 1000'u32 + ) + await socket.processPacket(ack) + except CancelledError: + echo "foo" + + asyncTest "Hitting RTO timeout with packets in flight should not decay window": + let q = newAsyncQueue[Packet]() + let initialRemoteSeq = 10'u16 + + # lot of data which will generate at least 5 packets + let bigDataTowWrite = generateByteArray(rng[], 10000) + let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q) + + let acker = outgoingSocket.ackAllPacket(q, initialRemoteSeq) + let bytesWritten = await outgoingSocket.write(bigDataTowWrite) + + check: + bytesWritten.get() == len(bigDataTowWrite) + + await waitUntil(proc (): bool = outgoingSocket.numPacketsInOutGoingBuffer() == 0) + + let maxWindowAfterSuccesfulSends = outgoingSocket.currentMaxWindowSize() + + check: + # after processing a lot of data, our window size should be a lot bigger than our packet size + maxWindowAfterSuccesfulSends > uint32(outgoingSocket.getPacketSize()) + + # cancel acking process, next writes will for sure timeout + await acker.cancelAndWait() + + # data which fits one packet and will timeout + let smallerData = generateByteArray(rng[], 100) + + let bytesWritten1 = await outgoingSocket.write(smallerData) + + # ignore standard sent packet + discard await q.get() + + check: + bytesWritten1.get() == len(smallerData) + + # ignore also first re-send + discard await q.get() + + let maxWindowAfterTimeout = outgoingSocket.currentMaxWindowSize() + + check: + # After standard timeout window should not decay and must be bigger than packet size + maxWindowAfterTimeout > uint32(outgoingSocket.getPacketSize()) + maxWindowAfterTimeout == maxWindowAfterSuccesfulSends + + await outgoingSocket.destroyWait() + asyncTest "Blocked writing futures should be properly finished when socket is closed": let q = newAsyncQueue[Packet]() let initialRemoteSeq = 10'u16