diff --git a/eth/utp/ledbat_congestion_control.nim b/eth/utp/ledbat_congestion_control.nim index 85ef201..f258c32 100644 --- a/eth/utp/ledbat_congestion_control.nim +++ b/eth/utp/ledbat_congestion_control.nim @@ -19,7 +19,7 @@ const targetDelay = milliseconds(100) # Typically it's less. TCP increases one MSS per RTT, which is 1500 const maxCwndIncreaseBytesPerRtt = 3000 -const minWindowSize = 10 +const minWindowSize* = 10 proc applyCongestionControl*( currentMaxWindowSize: uint32, diff --git a/eth/utp/send_buffer_tracker.nim b/eth/utp/send_buffer_tracker.nim index 6d9166b..5c90b8e 100644 --- a/eth/utp/send_buffer_tracker.nim +++ b/eth/utp/send_buffer_tracker.nim @@ -70,6 +70,10 @@ proc updateMaxRemote*(t: SendBufferTracker, newRemoteWindow: uint32) = t.maxRemoteWindow = newRemoteWindow t.notifyWaiters() +proc updateMaxWindow*(t: SendBufferTracker, maxWindow: uint32) = + t.maxWindow = maxWindow + t.notifyWaiters() + proc updateMaxWindowSize*(t: SendBufferTracker, newRemoteWindow: uint32, maxWindow: uint32) = t.maxRemoteWindow = newRemoteWindow t.maxWindow = maxWindow diff --git a/eth/utp/utp_socket.nim b/eth/utp/utp_socket.nim index 8a0ee9f..121d2b0 100644 --- a/eth/utp/utp_socket.nim +++ b/eth/utp/utp_socket.nim @@ -208,6 +208,12 @@ type # necessary to make sure we only fast resend once per packet fastResendSeqNr: uint16 + # last time we decreased max window + lastWindowDecay: Moment + + # counter of duplicate acks + duplicateAck: uint16 + #the slow-start threshold, in bytes slowStartTreshold: uint32 @@ -298,6 +304,11 @@ const reorderBufferMaxSize = 1024 + duplicateAcksBeforeResend = 3 + + # minimal time before subseqent window decays + maxWindowDecay = milliseconds(100) + proc init*[A](T: type UtpSocketKey, remoteAddress: A, rcvId: uint16): T = UtpSocketKey[A](remoteAddress: remoteAddress, rcvId: rcvId) @@ -473,6 +484,9 @@ proc checkTimeouts(socket: UtpSocket) {.async.} = socket.retransmitTimeout = newTimeout socket.rtoTimeout = currentTime + newTimeout + # on timeout reset duplicate ack counter + socket.duplicateAck = 0 + let currentPacketSize = uint32(socket.getPacketSize()) if (socket.curWindowPackets == 0 and socket.sendBufferTracker.maxWindow > currentPacketSize): @@ -697,6 +711,7 @@ proc new[A]( slowStart: true, fastTimeout: false, fastResendSeqNr: initialSeqNr, + lastWindowDecay: currentTime - maxWindowDecay, slowStartTreshold: cfg.optSndBuffer, ourHistogram: DelayHistogram.init(currentTime), remoteHistogram: DelayHistogram.init(currentTime), @@ -974,9 +989,24 @@ proc calculateSelectiveAckBytes*(socket: UtpSocket, receivedPackedAckNr: uint16 return ackedBytes +# decays maxWindow size by half if time is right i.e it is at least 100m since last +# window decay +proc tryDecayWindow(socket: UtpSocket, now: Moment) = + if (now - socket.lastWindowDecay >= maxWindowDecay): + socket.lastWindowDecay = now + let newMaxWindow = max(uint32(0.5 * float64(socket.sendBufferTracker.maxWindow)), uint32(minWindowSize)) + + debug "Decaying maxWindow", + oldWindow = socket.sendBufferTracker.maxWindow, + newWindow = newMaxWindow + + socket.sendBufferTracker.updateMaxWindow(newMaxWindow) + socket.slowStart = false + socket.slowStartTreshold = newMaxWindow + # ack packets (removes them from out going buffer) based on selective ack extension header proc selectiveAckPackets(socket: UtpSocket, receivedPackedAckNr: uint16, ext: SelectiveAckExtension, currentTime: Moment): void = - # we add 2, as the first bit in the mask therefore represents ackNr + 2 becouse + # we add 2, as the first bit in the mask therefore represents ackNr + 2 becouse # ackNr + 1 (i.e next expected packet) is considered lost. let base = receivedPackedAckNr + 2 @@ -985,12 +1015,25 @@ proc selectiveAckPackets(socket: UtpSocket, receivedPackedAckNr: uint16, ext: S var bits = (len(ext.acks)) * 8 - 1 + # number of packets acked by this selective acks, it also works as duplicate ack + # counter. + # from spec: Each packet that is acked in the selective ack message counts as one duplicate ack + var counter = 0 + + # sequence numbers of packets which should be resend + var resends: seq[uint16] = @[] + while bits >= 0: let v = base + uint16(bits) if (socket.seqNr - v - 1) >= socket.curWindowPackets - 1: dec bits continue + + let bitSet: bool = getBit(ext.acks, bits) + + if bitSet: + inc counter let maybePacket = socket.outBuffer.get(v) @@ -1000,12 +1043,68 @@ proc selectiveAckPackets(socket: UtpSocket, receivedPackedAckNr: uint16, ext: S let pkt = maybePacket.unsafeGet() - if (getBit(ext.acks, bits)): + if bitSet: + debug "Packet acked by selective ack", + pkSeqNr = v discard socket.ackPacket(v, currentTime) + dec bits + continue + + if counter >= duplicateAcksBeforeResend and (v - socket.fastResendSeqNr) <= reorderBufferMaxSize: + debug "No ack for packet", + pkAckNr = v, + dupAckCounter = counter, + fastResSeqNr = socket.fastResendSeqNr + resends.add(v) dec bits - # TODO Add handling of fast timeouts and duplicate acks counting + let nextExpectedPacketSeqNr = base - 1'u16 + # if we are about to start to resending first packet should be the first unacked packet + # ie. base - 1 + if counter >= duplicateAcksBeforeResend and (nextExpectedPacketSeqNr - socket.fastResendSeqNr) <= reorderBufferMaxSize: + debug "No ack for packet", + pkAckNr = nextExpectedPacketSeqNr, + dupAckCounter = counter, + fastResSeqNr = socket.fastResendSeqNr + resends.add(nextExpectedPacketSeqNr) + + var i = high(resends) + var registerLoss: bool = false + var packetsSent = 0 + while i >= 0: + let seqNrToResend: uint16 = resends[i] + + let maybePkt = socket.outBuffer.get(seqNrToResend) + + if maybePkt.isNone(): + # packet is no longer in send buffer ignore whole further processing + dec i + continue + + registerLoss = true + # it is safe to call as we already checked that packet is in send buffer + + socket.forceResendPacket(seqNrToResend) + socket.fastResendSeqNr = seqNrToResend + 1 + + debug "Resent packet", + pkSeqNr = seqNrToResend, + fastResendSeqNr = socket.fastResendSeqNr + + inc packetsSent + + # resend max 4 packets, this is not defined in spec but reference impl has + # that check + if packetsSent >= 4: + break + + dec i + + if registerLoss: + socket.tryDecayWindow(Moment.now()) + + socket.duplicateAck = uint16(counter) # Public mainly for test purposes # generates bit mask which indicates which packets are already in socket @@ -1110,6 +1209,35 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = # this case happens if the we already received this ack nr acks = 0 + # rationale from c reference impl: + # if we get the same ack_nr as in the last packet + # increase the duplicate_ack counter, otherwise reset + # it to 0. + # It's important to only count ACKs in ST_STATE packets. Any other + # packet (primarily ST_DATA) is likely to have been sent because of the + # other end having new outgoing data, not in response to incoming data. + # For instance, if we're receiving a steady stream of payload with no + # outgoing data, and we suddently have a few bytes of payload to send (say, + # a bittorrent HAVE message), we're very likely to see 3 duplicate ACKs + # immediately after sending our payload packet. This effectively disables + # the fast-resend on duplicate-ack logic for bi-directional connections + # (except in the case of a selective ACK). This is in line with BSD4.4 TCP + # implementation. + if socket.curWindowPackets > 0 and + pkAckNr == socket.seqNr - socket.curWindowPackets - 1 and + p.header.pType == ST_STATE: + inc socket.duplicateAck + + debug "Recevied duplicated ack", + pkAckNr = pkAckNr, + duplicatAckCounter = socket.duplicateAck + else: + socket.duplicateAck = 0 + # spec says that in case of duplicate ack counter larger that duplicateAcksBeforeResend + # we should re-send oldest packet, on the other hand refrence implementation + # has code path which does it commented out with todo. Currently to be as close + # to refrence impl we do not resend packets in that case + debug "Packet state variables", pastExpected = pastExpected, acks = acks diff --git a/tests/utp/test_utp_socket.nim b/tests/utp/test_utp_socket.nim index 895a597..fb43436 100644 --- a/tests/utp/test_utp_socket.nim +++ b/tests/utp/test_utp_socket.nim @@ -370,7 +370,7 @@ procSuite "Utp socket unit test": ) await socket.processPacket(ack) except CancelledError: - echo "foo" + discard asyncTest "Hitting RTO timeout with packets in flight should not decay window": let q = newAsyncQueue[Packet]() diff --git a/tests/utp/test_utp_socket_sack.nim b/tests/utp/test_utp_socket_sack.nim index a2294df..0433797 100644 --- a/tests/utp/test_utp_socket_sack.nim +++ b/tests/utp/test_utp_socket_sack.nim @@ -133,6 +133,9 @@ procSuite "Utp socket selective acks unit test": # indexes of packets which should be delivered to remote packetsDelivered: seq[int] + # indexes of packets which should be re-sent in resend testcases + packetsResent: seq[int] + let selectiveAckTestCases = @[ TestCase(numOfPackets: 2, packetsDelivered: @[1]), TestCase(numOfPackets: 10, packetsDelivered: @[1, 3, 5, 7, 9]), @@ -144,6 +147,32 @@ procSuite "Utp socket selective acks unit test": TestCase(numOfPackets: 33, packetsDelivered: toSeq(1..32)) ] + proc setupTestCase( + dataToWrite: seq[byte], + initialRemoteSeq: uint16, + outgoingQueue: AsyncQueue[Packet], + incomingQueue: AsyncQueue[Packet], + testCase: TestCase): Future[(UtpSocket[TransportAddress], UtpSocket[TransportAddress], seq[Packet])] {.async.} = + let (outgoingSocket, incomingSocket) = + connectOutGoingSocketWithIncoming( + initialRemoteSeq, + outgoingQueue, + incomingQueue + ) + + var packets: seq[Packet] = @[] + + for _ in 0.. initialBufferSize + else: + check: + # due to ledbat congestion control we cannot assert on precise end buffer size, + # but due to packet loss we are sure it shoul be smaller that at the beginning + # becouse of 0.5 muliplayer + endBufferSize < initialBufferSize