From 7afd44d33e1657f7c2e2c65d5723811574b89497 Mon Sep 17 00:00:00 2001 From: KonradStaniec Date: Wed, 26 Jan 2022 09:49:34 +0100 Subject: [PATCH] Implement fast resend logic (#466) --- eth/utp/send_buffer_tracker.nim | 13 ++--- eth/utp/utp_socket.nim | 94 ++++++++++++++++++++++++--------- tests/utp/test_utp_socket.nim | 68 ++++++++++++++++++++++++ 3 files changed, 145 insertions(+), 30 deletions(-) diff --git a/eth/utp/send_buffer_tracker.nim b/eth/utp/send_buffer_tracker.nim index ed480ed..6d9166b 100644 --- a/eth/utp/send_buffer_tracker.nim +++ b/eth/utp/send_buffer_tracker.nim @@ -51,7 +51,7 @@ proc currentFreeBytes*(t: SendBufferTracker): uint32 = else: return maxSend - t.currentWindow -proc checkWaiters(t: SendBufferTracker) = +proc notifyWaiters*(t: SendBufferTracker) = var i = 0 while i < len(t.waiters): let freeSpace = t.currentFreeBytes() @@ -68,18 +68,16 @@ proc checkWaiters(t: SendBufferTracker) = proc updateMaxRemote*(t: SendBufferTracker, newRemoteWindow: uint32) = t.maxRemoteWindow = newRemoteWindow - t.checkWaiters() + t.notifyWaiters() proc updateMaxWindowSize*(t: SendBufferTracker, newRemoteWindow: uint32, maxWindow: uint32) = t.maxRemoteWindow = newRemoteWindow t.maxWindow = maxWindow - t.checkWaiters() + t.notifyWaiters() -proc decreaseCurrentWindow*(t: SendBufferTracker, value: uint32, notifyWaiters: bool) = +proc decreaseCurrentWindow*(t: SendBufferTracker, value: uint32) = doAssert(t.currentWindow >= value) t.currentWindow = t.currentWindow - value - if (notifyWaiters): - t.checkWaiters() proc reserveNBytesWait*(t: SendBufferTracker, n: uint32): Future[void] = let fut = newFuture[void]("SendBufferTracker.reserveNBytesWait") @@ -99,4 +97,7 @@ proc reserveNBytes*(t: SendBufferTracker, n: uint32): bool = else: return false +proc forceReserveNBytes*(t: SendBufferTracker, n: uint32) = + t.currentWindow = t.currentWindow + n + proc currentBytesInFlight*(t: SendBufferTracker): uint32 = t.currentWindow diff --git a/eth/utp/utp_socket.nim b/eth/utp/utp_socket.nim index 0680f05..8a0ee9f 100644 --- a/eth/utp/utp_socket.nim +++ b/eth/utp/utp_socket.nim @@ -200,6 +200,14 @@ type # indicator if we're in slow-start (exponential growth) phase slowStart: bool + # indiciator if we're in fast time out mode i.e we will resent + # oldest packet un-acket in case of newer packet arriving + fastTimeout: bool + + # Sequence number of the next packet we are allowed to fast-resend. This is + # necessary to make sure we only fast resend once per packet + fastResendSeqNr: uint16 + #the slow-start threshold, in bytes slowStartTreshold: uint32 @@ -391,7 +399,7 @@ proc markAllPacketAsLost(s: UtpSocket) = # lack of waiters notification in case of timeout effectivly means that # we do not allow any new bytes to enter snd buffer in case of new free space # due to timeout. - s.sendBufferTracker.decreaseCurrentWindow(packetPayloadLength, notifyWaiters = false) + s.sendBufferTracker.decreaseCurrentWindow(packetPayloadLength) inc i @@ -406,6 +414,20 @@ proc shouldDisconnectFromFailedRemote(socket: UtpSocket): bool = (socket.state == SynSent and socket.retransmitCount >= 2) or (socket.retransmitCount >= socket.socketConfig.dataResendsBeforeFailure) +# Forces asynchronous re-send of packet waiting in outgoing buffer +proc forceResendPacket(socket: UtpSocket, pkSeqNr: uint16) = + doAssert( + socket.outBuffer.get(pkSeqNr).isSome(), + "Force resend should be called only on packet still in outgoing buffer" + ) + if socket.outBuffer[pkSeqNr].needResend: + # if needResend is set to true it means that packet payload was already + # removed from the bytes window and need to be re-added. + socket.sendBufferTracker.forceReserveNBytes(socket.outBuffer[pkSeqNr].payloadLength) + + let data = socket.setSend(socket.outBuffer[pkSeqNr]) + discard socket.sendData(data) + proc checkTimeouts(socket: UtpSocket) {.async.} = let currentTime = getMonoTimestamp().moment # flush all packets which needs to be re-send @@ -493,30 +515,19 @@ proc checkTimeouts(socket: UtpSocket) {.async.} = # resend oldest packet if there are some packets in flight if (socket.curWindowPackets > 0): - inc socket.retransmitCount let oldestPacketSeqNr = socket.seqNr - socket.curWindowPackets - # TODO add handling of fast timeout - - doAssert( - socket.outBuffer.get(oldestPacketSeqNr).isSome(), - "oldest packet should always be available when there is data in flight" - ) - - let payloadLength = socket.outBuffer[oldestPacketSeqNr].payloadLength - if (socket.sendBufferTracker.reserveNBytes(payloadLength)): - debug "Resending oldest packet in outBuffer", - seqNr = oldestPacketSeqNr, - curWindowPackets = socket.curWindowPackets - - let dataToSend = socket.setSend(socket.outBuffer[oldestPacketSeqNr]) - await socket.sendData(dataToSend) - else: - # TODO Logs added here to check if we need to check for spcae in send buffer - # reference impl does not do it. - debug "Should resend oldest packet in outBuffer but there is no place for more bytes in send buffer", - seqNr = oldestPacketSeqNr, - curWindowPackets = socket.curWindowPackets + + inc socket.retransmitCount + socket.fastTimeout = true + + debug "Resending oldest packet", + pkSeqNr = oldestPacketSeqNr, + retransmitCount = socket.retransmitCount, + curWindowPackets = socket.curWindowPackets + # Oldest packet should always be present, so it is safe to call force + # resend + socket.forceResendPacket(oldestPacketSeqNr) # TODO add sending keep alives when necessary @@ -684,6 +695,8 @@ proc new[A]( zeroWindowTimer: currentTime + cfg.remoteWindowResetTimeout, socketKey: UtpSocketKey.init(to, rcvId), slowStart: true, + fastTimeout: false, + fastResendSeqNr: initialSeqNr, slowStartTreshold: cfg.optSndBuffer, ourHistogram: DelayHistogram.init(currentTime), remoteHistogram: DelayHistogram.init(currentTime), @@ -860,7 +873,10 @@ proc ackPacket(socket: UtpSocket, seqNr: uint16, currentTime: Moment): AckResult # been considered timed-out, and is not included in # the cur_window anymore if (not packet.needResend): - socket.sendBufferTracker.decreaseCurrentWindow(packet.payloadLength, notifyWaiters = true) + socket.sendBufferTracker.decreaseCurrentWindow(packet.payloadLength) + + # we notify all waiters that there is possibly new space in send buffer + socket.sendBufferTracker.notifyWaiters() socket.retransmitCount = 0 PacketAcked @@ -1212,6 +1228,10 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = # but in theory remote could stil write some data on this socket (or even its own fin) socket.destroy() + # Update fast resend counter to avoid resending old packet twice + if wrapCompareLess(socket.fastResendSeqNr, pkAckNr + 1): + socket.fastResendSeqNr = pkAckNr + 1 + socket.ackPackets(acks, timestampInfo.moment) # packets in front may have been acked by selective ack, decrease window until we hit @@ -1221,6 +1241,32 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = debug "Packet in front hase been acked by selective ack. Decrese window", windowPackets = socket.curWindowPackets + # fast timeout + if socket.fastTimeout: + let oldestOutstandingPktSeqNr = socket.seqNr - socket.curWindowPackets + + debug "Hit fast timeout re-send", + curWindowPackets = socket.curWindowPackets, + oldesPkSeqNr = oldestOutstandingPktSeqNr, + fastResendSeqNr = socket.fastResendSeqNr + + + if oldestOutstandingPktSeqNr != socket.fastResendSeqNr: + # fastResendSeqNr do not point to oldest unacked packet, we probably already resent + # packet that timed-out. Leave fast timeout mode + socket.fastTimeout = false + else: + let shouldReSendPacket = socket.outBuffer.exists(oldestOutstandingPktSeqNr, (p: OutgoingPacket) => p.transmissions > 0) + if shouldReSendPacket: + debug "Packet fast timeout resend", + pkSeqNr = oldestOutstandingPktSeqNr + + inc socket.fastResendSeqNr + + # Is is safe to call force resend as we already checked shouldReSendPacket + # condition + socket.forceResendPacket(oldestOutstandingPktSeqNr) + if (p.eack.isSome()): socket.selectiveAckPackets(pkAckNr, p.eack.unsafeGet(), timestampInfo.moment) diff --git a/tests/utp/test_utp_socket.nim b/tests/utp/test_utp_socket.nim index 9927318..895a597 100644 --- a/tests/utp/test_utp_socket.nim +++ b/tests/utp/test_utp_socket.nim @@ -1331,6 +1331,8 @@ procSuite "Utp socket unit test": await outgoingSocket.processPacket(dataP1) + let fastResend = await q.get() + let ack = await q.get() check: @@ -1345,3 +1347,69 @@ procSuite "Utp socket unit test": thirdSend.header.ackNr > secondSend.header.ackNr await outgoingSocket.destroyWait() + + asyncTest "Should support fast timeout ": + let q = newAsyncQueue[Packet]() + let initialRemoteSeq = 10'u16 + + # small writes to make sure it will be 3 different packets + let dataToWrite1 = @[1'u8] + let dataToWrite2 = @[1'u8] + let dataToWrite3 = @[1'u8] + + + let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q) + + let writeRes1 = await outgoingSocket.write(dataToWrite1) + let writeRes2 = await outgoingSocket.write(dataToWrite2) + let writeRes3 = await outgoingSocket.write(dataToWrite3) + + check: + writeRes1.isOk() + writeRes2.isOk() + writeRes3.isOk() + + # drain queue of all sent packets + let sent1 = await q.get() + let sent2 = await q.get() + let sent3 = await q.get() + + # wait for first timeout. Socket will enter fast timeout mode + let reSent1 = await q.get() + + check: + # check that re-sent packet is the oldest one + reSent1.payload == sent1.payload + reSent1.header.seqNr == sent1.header.seqNr + + # ack which will ack our re-sent packet + let responseAck = + ackPacket( + initialRemoteSeq, + initialPacket.header.connectionId, + reSent1.header.seqNr, + testBufferSize, + 0 + ) + + await outgoingSocket.processPacket(responseAck) + + let fastResentPacket = await q.get() + + check: + # second packet is now oldest unacked packet so it should be the one which + # is send during fast resend + fastResentPacket.payload == sent2.payload + fastResentPacket.header.seqNr == sent2.header.seqNr + + # duplicate ack, processing it should not fast-resend any packet + await outgoingSocket.processPacket(responseAck) + + let resent3 = await q.get() + + check: + # in next timeout cycle packet nr3 is the only one waiting for re-send + resent3.payload == sent3.payload + resent3.header.seqNr == sent3.header.seqNr + + await outgoingSocket.destroyWait()