Implement fast resend logic for selective acks (#468)

* Implement fast resend logic for selective acks
This commit is contained in:
KonradStaniec 2022-01-27 11:07:40 +01:00 committed by GitHub
parent 7afd44d33e
commit 5791afccc3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 244 additions and 37 deletions

View File

@ -19,7 +19,7 @@ const targetDelay = milliseconds(100)
# Typically it's less. TCP increases one MSS per RTT, which is 1500 # Typically it's less. TCP increases one MSS per RTT, which is 1500
const maxCwndIncreaseBytesPerRtt = 3000 const maxCwndIncreaseBytesPerRtt = 3000
const minWindowSize = 10 const minWindowSize* = 10
proc applyCongestionControl*( proc applyCongestionControl*(
currentMaxWindowSize: uint32, currentMaxWindowSize: uint32,

View File

@ -70,6 +70,10 @@ proc updateMaxRemote*(t: SendBufferTracker, newRemoteWindow: uint32) =
t.maxRemoteWindow = newRemoteWindow t.maxRemoteWindow = newRemoteWindow
t.notifyWaiters() t.notifyWaiters()
proc updateMaxWindow*(t: SendBufferTracker, maxWindow: uint32) =
t.maxWindow = maxWindow
t.notifyWaiters()
proc updateMaxWindowSize*(t: SendBufferTracker, newRemoteWindow: uint32, maxWindow: uint32) = proc updateMaxWindowSize*(t: SendBufferTracker, newRemoteWindow: uint32, maxWindow: uint32) =
t.maxRemoteWindow = newRemoteWindow t.maxRemoteWindow = newRemoteWindow
t.maxWindow = maxWindow t.maxWindow = maxWindow

View File

@ -208,6 +208,12 @@ type
# necessary to make sure we only fast resend once per packet # necessary to make sure we only fast resend once per packet
fastResendSeqNr: uint16 fastResendSeqNr: uint16
# last time we decreased max window
lastWindowDecay: Moment
# counter of duplicate acks
duplicateAck: uint16
#the slow-start threshold, in bytes #the slow-start threshold, in bytes
slowStartTreshold: uint32 slowStartTreshold: uint32
@ -298,6 +304,11 @@ const
reorderBufferMaxSize = 1024 reorderBufferMaxSize = 1024
duplicateAcksBeforeResend = 3
# minimal time before subseqent window decays
maxWindowDecay = milliseconds(100)
proc init*[A](T: type UtpSocketKey, remoteAddress: A, rcvId: uint16): T = proc init*[A](T: type UtpSocketKey, remoteAddress: A, rcvId: uint16): T =
UtpSocketKey[A](remoteAddress: remoteAddress, rcvId: rcvId) UtpSocketKey[A](remoteAddress: remoteAddress, rcvId: rcvId)
@ -473,6 +484,9 @@ proc checkTimeouts(socket: UtpSocket) {.async.} =
socket.retransmitTimeout = newTimeout socket.retransmitTimeout = newTimeout
socket.rtoTimeout = currentTime + newTimeout socket.rtoTimeout = currentTime + newTimeout
# on timeout reset duplicate ack counter
socket.duplicateAck = 0
let currentPacketSize = uint32(socket.getPacketSize()) let currentPacketSize = uint32(socket.getPacketSize())
if (socket.curWindowPackets == 0 and socket.sendBufferTracker.maxWindow > currentPacketSize): if (socket.curWindowPackets == 0 and socket.sendBufferTracker.maxWindow > currentPacketSize):
@ -697,6 +711,7 @@ proc new[A](
slowStart: true, slowStart: true,
fastTimeout: false, fastTimeout: false,
fastResendSeqNr: initialSeqNr, fastResendSeqNr: initialSeqNr,
lastWindowDecay: currentTime - maxWindowDecay,
slowStartTreshold: cfg.optSndBuffer, slowStartTreshold: cfg.optSndBuffer,
ourHistogram: DelayHistogram.init(currentTime), ourHistogram: DelayHistogram.init(currentTime),
remoteHistogram: DelayHistogram.init(currentTime), remoteHistogram: DelayHistogram.init(currentTime),
@ -974,6 +989,21 @@ proc calculateSelectiveAckBytes*(socket: UtpSocket, receivedPackedAckNr: uint16
return ackedBytes return ackedBytes
# decays maxWindow size by half if time is right i.e it is at least 100m since last
# window decay
proc tryDecayWindow(socket: UtpSocket, now: Moment) =
if (now - socket.lastWindowDecay >= maxWindowDecay):
socket.lastWindowDecay = now
let newMaxWindow = max(uint32(0.5 * float64(socket.sendBufferTracker.maxWindow)), uint32(minWindowSize))
debug "Decaying maxWindow",
oldWindow = socket.sendBufferTracker.maxWindow,
newWindow = newMaxWindow
socket.sendBufferTracker.updateMaxWindow(newMaxWindow)
socket.slowStart = false
socket.slowStartTreshold = newMaxWindow
# ack packets (removes them from out going buffer) based on selective ack extension header # ack packets (removes them from out going buffer) based on selective ack extension header
proc selectiveAckPackets(socket: UtpSocket, receivedPackedAckNr: uint16, ext: SelectiveAckExtension, currentTime: Moment): void = proc selectiveAckPackets(socket: UtpSocket, receivedPackedAckNr: uint16, ext: SelectiveAckExtension, currentTime: Moment): void =
# we add 2, as the first bit in the mask therefore represents ackNr + 2 becouse # we add 2, as the first bit in the mask therefore represents ackNr + 2 becouse
@ -985,6 +1015,14 @@ proc selectiveAckPackets(socket: UtpSocket, receivedPackedAckNr: uint16, ext: S
var bits = (len(ext.acks)) * 8 - 1 var bits = (len(ext.acks)) * 8 - 1
# number of packets acked by this selective acks, it also works as duplicate ack
# counter.
# from spec: Each packet that is acked in the selective ack message counts as one duplicate ack
var counter = 0
# sequence numbers of packets which should be resend
var resends: seq[uint16] = @[]
while bits >= 0: while bits >= 0:
let v = base + uint16(bits) let v = base + uint16(bits)
@ -992,6 +1030,11 @@ proc selectiveAckPackets(socket: UtpSocket, receivedPackedAckNr: uint16, ext: S
dec bits dec bits
continue continue
let bitSet: bool = getBit(ext.acks, bits)
if bitSet:
inc counter
let maybePacket = socket.outBuffer.get(v) let maybePacket = socket.outBuffer.get(v)
if (maybePacket.isNone() or maybePacket.unsafeGet().transmissions == 0): if (maybePacket.isNone() or maybePacket.unsafeGet().transmissions == 0):
@ -1000,12 +1043,68 @@ proc selectiveAckPackets(socket: UtpSocket, receivedPackedAckNr: uint16, ext: S
let pkt = maybePacket.unsafeGet() let pkt = maybePacket.unsafeGet()
if (getBit(ext.acks, bits)): if bitSet:
debug "Packet acked by selective ack",
pkSeqNr = v
discard socket.ackPacket(v, currentTime) discard socket.ackPacket(v, currentTime)
dec bits
continue
if counter >= duplicateAcksBeforeResend and (v - socket.fastResendSeqNr) <= reorderBufferMaxSize:
debug "No ack for packet",
pkAckNr = v,
dupAckCounter = counter,
fastResSeqNr = socket.fastResendSeqNr
resends.add(v)
dec bits dec bits
# TODO Add handling of fast timeouts and duplicate acks counting let nextExpectedPacketSeqNr = base - 1'u16
# if we are about to start to resending first packet should be the first unacked packet
# ie. base - 1
if counter >= duplicateAcksBeforeResend and (nextExpectedPacketSeqNr - socket.fastResendSeqNr) <= reorderBufferMaxSize:
debug "No ack for packet",
pkAckNr = nextExpectedPacketSeqNr,
dupAckCounter = counter,
fastResSeqNr = socket.fastResendSeqNr
resends.add(nextExpectedPacketSeqNr)
var i = high(resends)
var registerLoss: bool = false
var packetsSent = 0
while i >= 0:
let seqNrToResend: uint16 = resends[i]
let maybePkt = socket.outBuffer.get(seqNrToResend)
if maybePkt.isNone():
# packet is no longer in send buffer ignore whole further processing
dec i
continue
registerLoss = true
# it is safe to call as we already checked that packet is in send buffer
socket.forceResendPacket(seqNrToResend)
socket.fastResendSeqNr = seqNrToResend + 1
debug "Resent packet",
pkSeqNr = seqNrToResend,
fastResendSeqNr = socket.fastResendSeqNr
inc packetsSent
# resend max 4 packets, this is not defined in spec but reference impl has
# that check
if packetsSent >= 4:
break
dec i
if registerLoss:
socket.tryDecayWindow(Moment.now())
socket.duplicateAck = uint16(counter)
# Public mainly for test purposes # Public mainly for test purposes
# generates bit mask which indicates which packets are already in socket # generates bit mask which indicates which packets are already in socket
@ -1110,6 +1209,35 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} =
# this case happens if the we already received this ack nr # this case happens if the we already received this ack nr
acks = 0 acks = 0
# rationale from c reference impl:
# if we get the same ack_nr as in the last packet
# increase the duplicate_ack counter, otherwise reset
# it to 0.
# It's important to only count ACKs in ST_STATE packets. Any other
# packet (primarily ST_DATA) is likely to have been sent because of the
# other end having new outgoing data, not in response to incoming data.
# For instance, if we're receiving a steady stream of payload with no
# outgoing data, and we suddently have a few bytes of payload to send (say,
# a bittorrent HAVE message), we're very likely to see 3 duplicate ACKs
# immediately after sending our payload packet. This effectively disables
# the fast-resend on duplicate-ack logic for bi-directional connections
# (except in the case of a selective ACK). This is in line with BSD4.4 TCP
# implementation.
if socket.curWindowPackets > 0 and
pkAckNr == socket.seqNr - socket.curWindowPackets - 1 and
p.header.pType == ST_STATE:
inc socket.duplicateAck
debug "Recevied duplicated ack",
pkAckNr = pkAckNr,
duplicatAckCounter = socket.duplicateAck
else:
socket.duplicateAck = 0
# spec says that in case of duplicate ack counter larger that duplicateAcksBeforeResend
# we should re-send oldest packet, on the other hand refrence implementation
# has code path which does it commented out with todo. Currently to be as close
# to refrence impl we do not resend packets in that case
debug "Packet state variables", debug "Packet state variables",
pastExpected = pastExpected, pastExpected = pastExpected,
acks = acks acks = acks

View File

@ -370,7 +370,7 @@ procSuite "Utp socket unit test":
) )
await socket.processPacket(ack) await socket.processPacket(ack)
except CancelledError: except CancelledError:
echo "foo" discard
asyncTest "Hitting RTO timeout with packets in flight should not decay window": asyncTest "Hitting RTO timeout with packets in flight should not decay window":
let q = newAsyncQueue[Packet]() let q = newAsyncQueue[Packet]()

View File

@ -133,6 +133,9 @@ procSuite "Utp socket selective acks unit test":
# indexes of packets which should be delivered to remote # indexes of packets which should be delivered to remote
packetsDelivered: seq[int] packetsDelivered: seq[int]
# indexes of packets which should be re-sent in resend testcases
packetsResent: seq[int]
let selectiveAckTestCases = @[ let selectiveAckTestCases = @[
TestCase(numOfPackets: 2, packetsDelivered: @[1]), TestCase(numOfPackets: 2, packetsDelivered: @[1]),
TestCase(numOfPackets: 10, packetsDelivered: @[1, 3, 5, 7, 9]), TestCase(numOfPackets: 10, packetsDelivered: @[1, 3, 5, 7, 9]),
@ -144,15 +147,12 @@ procSuite "Utp socket selective acks unit test":
TestCase(numOfPackets: 33, packetsDelivered: toSeq(1..32)) TestCase(numOfPackets: 33, packetsDelivered: toSeq(1..32))
] ]
asyncTest "Socket should calculate number of bytes acked by selective acks": proc setupTestCase(
let dataSize = 10 dataToWrite: seq[byte],
let initialRemoteSeq = 10'u16 initialRemoteSeq: uint16,
let smallData = generateByteArray(rng[], 10) outgoingQueue: AsyncQueue[Packet],
incomingQueue: AsyncQueue[Packet],
for testCase in selectiveAckTestCases: testCase: TestCase): Future[(UtpSocket[TransportAddress], UtpSocket[TransportAddress], seq[Packet])] {.async.} =
let outgoingQueue = newAsyncQueue[Packet]()
let incomingQueue = newAsyncQueue[Packet]()
let (outgoingSocket, incomingSocket) = let (outgoingSocket, incomingSocket) =
connectOutGoingSocketWithIncoming( connectOutGoingSocketWithIncoming(
initialRemoteSeq, initialRemoteSeq,
@ -163,13 +163,33 @@ procSuite "Utp socket selective acks unit test":
var packets: seq[Packet] = @[] var packets: seq[Packet] = @[]
for _ in 0..<testCase.numOfPackets: for _ in 0..<testCase.numOfPackets:
discard await outgoingSocket.write(smallData) discard await outgoingSocket.write(dataToWrite)
let packet = await outgoingQueue.get() let packet = await outgoingQueue.get()
packets.add(packet) packets.add(packet)
for toDeliver in testCase.packetsDelivered: for toDeliver in testCase.packetsDelivered:
await incomingSocket.processPacket(packets[toDeliver]) await incomingSocket.processPacket(packets[toDeliver])
return (outgoingSocket, incomingSocket, packets)
asyncTest "Socket should calculate number of bytes acked by selective acks":
let dataSize = 10
let initialRemoteSeq = 10'u16
let smallData = generateByteArray(rng[], 10)
for testCase in selectiveAckTestCases:
let outgoingQueue = newAsyncQueue[Packet]()
let incomingQueue = newAsyncQueue[Packet]()
let (outgoingSocket, incomingSocket, _) = await setupTestCase(
smallData,
initialRemoteSeq,
outgoingQueue,
incomingQueue,
testCase
)
let finalAck = incomingSocket.generateAckPacket() let finalAck = incomingSocket.generateAckPacket()
check: check:
@ -201,23 +221,14 @@ procSuite "Utp socket selective acks unit test":
let outgoingQueue = newAsyncQueue[Packet]() let outgoingQueue = newAsyncQueue[Packet]()
let incomingQueue = newAsyncQueue[Packet]() let incomingQueue = newAsyncQueue[Packet]()
let (outgoingSocket, incomingSocket) = let (outgoingSocket, incomingSocket, _) = await setupTestCase(
connectOutGoingSocketWithIncoming( smallData,
initialRemoteSeq, initialRemoteSeq,
outgoingQueue, outgoingQueue,
incomingQueue incomingQueue,
testCase
) )
var packets: seq[Packet] = @[]
for _ in 0..<testCase.numOfPackets:
discard await outgoingSocket.write(smallData)
let packet = await outgoingQueue.get()
packets.add(packet)
for toDeliver in testCase.packetsDelivered:
await incomingSocket.processPacket(packets[toDeliver])
let finalAck = incomingSocket.generateAckPacket() let finalAck = incomingSocket.generateAckPacket()
check: check:
@ -242,3 +253,67 @@ procSuite "Utp socket selective acks unit test":
await outgoingSocket.destroyWait() await outgoingSocket.destroyWait()
await incomingSocket.destroyWait() await incomingSocket.destroyWait()
let packetResendTestCases = @[
TestCase(numOfPackets: 4, packetsDelivered: @[2, 3], packetsResent: @[]),
TestCase(numOfPackets: 4, packetsDelivered: @[1, 2, 3], packetsResent: @[0]),
TestCase(numOfPackets: 5, packetsDelivered: @[2, 3, 4], packetsResent: @[0, 1]),
TestCase(numOfPackets: 6, packetsDelivered: @[3, 4, 5], packetsResent: @[0, 1, 2]),
TestCase(numOfPackets: 7, packetsDelivered: @[4, 5, 6], packetsResent: @[0, 1, 2, 3]),
TestCase(numOfPackets: 8, packetsDelivered: @[5, 6, 7], packetsResent: @[0, 1, 2, 3]),
TestCase(numOfPackets: 10, packetsDelivered: @[3, 7, 8], packetsResent: @[0, 1, 2]),
TestCase(numOfPackets: 10, packetsDelivered: @[1, 2, 3, 7, 8, 9], packetsResent: @[0, 4, 5, 6]),
TestCase(numOfPackets: 10, packetsDelivered: @[1, 8, 9], packetsResent: @[0])
]
asyncTest "Socket should re-send packets when there are at least 3 packets acked ahead":
let dataSize = 10
let initialRemoteSeq = 10'u16
let smallData = generateByteArray(rng[], 10)
for testCase in packetResendTestCases:
let outgoingQueue = newAsyncQueue[Packet]()
let incomingQueue = newAsyncQueue[Packet]()
let (outgoingSocket, incomingSocket, initialPackets) = await setupTestCase(
smallData,
initialRemoteSeq,
outgoingQueue,
incomingQueue,
testCase
)
let initialBufferSize = outgoingSocket.currentMaxWindowSize()
let finalAck = incomingSocket.generateAckPacket()
check:
finalAck.eack.isSome()
let mask = finalAck.eack.unsafeGet().acks
let numOfDeliveredPackets = len(testCase.packetsDelivered)
check:
numOfSetBits(mask) == numOfDeliveredPackets
await outgoingSocket.processPacket(finalAck)
for idx in testCase.packetsResent:
let resentPacket = await outgoingQueue.get()
check:
resentPacket.header.seqNr == initialPackets[idx].header.seqNr
let endBufferSize = outgoingSocket.currentMaxWindowSize()
if len(testCase.packetsResent) == 0:
check:
# when there is no packet loss (no resent packets), buffer size increases
# due to packets acked by selective ack
endBufferSize > initialBufferSize
else:
check:
# due to ledbat congestion control we cannot assert on precise end buffer size,
# but due to packet loss we are sure it shoul be smaller that at the beginning
# becouse of 0.5 muliplayer
endBufferSize < initialBufferSize