Ledbat congestion control (#443)

* Return delay to remote peer

* Initial ledbat window calculation

* Add tests for window grow and decay

* Add delay histograms

* Add calculation of clock drift
This commit is contained in:
KonradStaniec 2021-12-09 10:52:21 +01:00 committed by GitHub
parent ae0920d40d
commit b4066a5688
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 918 additions and 75 deletions

View File

@ -0,0 +1,91 @@
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [Defect].}
import
chronos
const
# how long do we collect samples before calculating average
averageTime = seconds(5)
# calculates 5s rolling average of incoming delays, which represent clock drift.
type ClockDriftCalculator* = object
# average of all delay samples compared to initial one. Average is done over
# 5s
averageDelay: int32
# sum of all recent delay samples. All samples are relative to first sample
# averageDelayBase
currentDelaySum: int64
# number if samples in sum
currentDelaySamples: int
# set to first sample, all further samples are taken in relative to this one
averageDelayBase: uint32
# next time we should average samples
averageSampleTime: Moment
# estimated clock drift in microseconds per 5 seconds
clockDrift*: int32
# last calculated drift
lastClockDrift*: int32
proc init*(T: type ClockDriftCalculator, currentTime: Moment): T =
T(
averageSampleTime: currentTime + averageTime
)
proc addSample*(c: var ClockDriftCalculator, actualDelay: uint32, currentTime: Moment) =
if (actualDelay == 0):
return
# this is our first sample, initialise our delay base
if c.averageDelayBase == 0:
c.averageDelayBase = actualDelay
let distDown = c.averageDelayBase - actualDelay
let distUp = actualDelay - c.averageDelayBase
let averageDelaySample =
if (distDown > distUp):
# averageDelayBase is smaller that actualDelay, sample should be positive
int64(distUp)
else:
# averageDelayBase is bigger or equal to actualDelay, sample should be negative
-int64(distDown)
c.currentDelaySum = c.currentDelaySum + averageDelaySample
inc c.currentDelaySamples
if (currentTime > c.averageSampleTime):
# it is time to average our samples
var prevAverageDelay = c.averageDelay
c.averageDelay = int32(c.currentDelaySum div c.currentDelaySamples)
c.averageSampleTime = c.averageSampleTime + averageTime
c.currentDelaySum = 0
c.currentDelaySamples = 0
# normalize average samples
let minSample = min(prevAverageDelay, c.averageDelay)
let maxSample = max(prevAverageDelay, c.averageDelay)
var adjust = 0
if (minSample > 0):
adjust = -minSample
elif (maxSample < 0):
adjust = -maxSample
if (adjust != 0):
c.averageDelayBase = c.averageDelayBase - uint32(adjust)
c.averageDelay = c.averageDelay + int32(adjust)
prevAverageDelay = prevAverageDelay + int32(adjust)
let drift = c.averageDelay - prevAverageDelay
# rolling average
c.clockDrift = int32((int64(c.clockDrift) * 7 + drift) div 8)
c.lastClockDrift = drift

View File

@ -0,0 +1,71 @@
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [Defect].}
import
chronos,
./utp_utils
const
currentDelaySize = 3
delayBaseHistory = 13
delayBaseUpdateInterval = minutes(1)
type
DelayHistogram* = object
delayBase*: uint32
currentDelayHistory: array[currentDelaySize, uint32]
currentDelyIdx: int
delayBaseHistory: array[delayBaseHistory, uint32]
delayBaseIdx: int
delayBaseTime: Moment
proc init*(T: type DelayHistogram, currentTime: Moment): T =
DelayHistogram(
delayBaseTime: currentTime
)
proc shift*(h: var DelayHistogram, offset: uint32) =
for sample in h.delayBaseHistory.mitems():
sample = sample + offset
h.delayBase = h.delayBase + offset
proc addSample*(h: var DelayHistogram, sample: uint32, currentTime: Moment) =
# if delay base is zero it means it is our first sample. Initialize necessary parts
if h.delayBase == 0:
h.delayBase = sample
for i in h.delayBaseHistory.mitems():
i = sample
if wrapCompareLess(sample, h.delayBaseHistory[h.delayBaseIdx]):
h.delayBaseHistory[h.delayBaseIdx] = sample
if wrapCompareLess(sample, h.delayBase):
h.delay_base = sample
let delay = sample - h.delayBase
h.currentDelayHistory[h.currentDelyIdx] = delay
h.currentDelyIdx = (h.currentDelyIdx + 1) mod currentDelaySize
if (currentTime - h.delayBaseTime > delayBaseUpdateInterval):
h.delayBaseTime = currentTime
h.delayBaseIdx = (h.delayBaseIdx + 1) mod delayBaseHistory
h.delayBaseHistory[h.delayBaseIdx] = sample
h.delayBase = h.delayBaseHistory[0]
for delaySample in h.delayBaseHistory.items():
if (wrapCompareLess(delaySample, h.delayBase)):
h.delayBase = delaySample
proc getValue*(h: DelayHistogram): Duration =
var value = uint32.high
# this will return zero if not all samples are colected
for sample in h.currentDelayHistory:
value = min(sample, value)
microseconds(value)

View File

@ -0,0 +1,98 @@
import
chronos,
./utp_utils
const targetDelay = milliseconds(100)
# explanation from reference impl:
# number of bytes to increase max window size by, per RTT. This is
# scaled down linearly proportional to off_target. i.e. if all packets
# in one window have 0 delay, window size will increase by this number.
# Typically it's less. TCP increases one MSS per RTT, which is 1500
const maxCwndIncreaseBytesPerRtt = 3000
const minWindowSize = 10
proc applyCongestionControl*(
currentMaxWindowSize: uint32,
currentSlowStart: bool,
currentSlowStartTreshold: uint32,
maxSndBufferSize: uint32,
currentPacketSize: uint32,
actualDelay: Duration,
numOfAckedBytes: uint32,
minRtt: Duration,
calculatedDelay: Duration,
clockDrift: int32
): (uint32, uint32, bool) =
if (actualDelay.isZero() or minRtt.isZero() or numOfAckedBytes == 0):
return (currentMaxWindowSize, currentSlowStartTreshold, currentSlowStart)
let ourDelay = min(minRtt, calculatedDelay)
let target = targetDelay
# Rationale from C reference impl:
# this is here to compensate for very large clock drift that affects
# the congestion controller into giving certain endpoints an unfair
# share of the bandwidth. We have an estimate of the clock drift
# (clock_drift). The unit of this is microseconds per 5 seconds.
# empirically, a reasonable cut-off appears to be about 200000
# (which is pretty high). The main purpose is to compensate for
# people trying to "cheat" uTP by making their clock run slower,
# and this definitely catches that without any risk of false positives
# if clock_drift < -200000 start applying a penalty delay proportional
# to how far beoynd -200000 the clock drift is
let clockDriftPenalty: int64 =
if (clockDrift < -200000):
let penalty = (-clockDrift - 200000) div 7
penalty
else:
0
let offTarget = target.microseconds() - (ourDelay.microseconds() + clockDriftPenalty)
# calculations from reference impl:
# double window_factor = (double)min(bytes_acked, max_window) / (double)max(max_window, bytes_acked);
# double delay_factor = off_target / target;
# double scaled_gain = MAX_CWND_INCREASE_BYTES_PER_RTT * window_factor * delay_factor;
let windowFactor = float64(min(numOfAckedBytes, currentMaxWindowSize)) / float64(max(currentMaxWindowSize, numOfAckedBytes))
let delayFactor = float64(offTarget) / float64(target.microseconds())
let scaledGain = maxCwndIncreaseBytesPerRtt * windowFactor * delayFactor
let scaledWindow = float64(currentMaxWindowSize) + scaledGain
let ledbatCwnd: uint32 =
if scaledWindow < minWindowSize:
uint32(minWindowSize)
else:
uint32(scaledWindow)
var newSlowStart = currentSlowStart
var newMaxWindowSize = currentMaxWindowSize
var newSlowStartTreshold = currentSlowStartTreshold
if currentSlowStart:
let slowStartCwnd = currentMaxWindowSize + uint32(windowFactor * float64(currentPacketSize))
if (slowStartCwnd > currentSlowStartTreshold):
newSlowStart = false
elif float64(ourDelay.microseconds()) > float64(target.microseconds()) * 0.9:
# we are just a litte under target delay, discontinute slows start
newSlowStart = false
newSlowStartTreshold = currentMaxWindowSize
else:
newMaxWindowSize = max(slowStartCwnd, ledbatCwnd)
else:
newMaxWindowSize = ledbatCwnd
newMaxWindowSize = clamp(newMaxWindowSize, minWindowSize, maxSndBufferSize)
(newMaxWindowSize, newSlowStartTreshold, newSlowStart)

View File

@ -56,6 +56,9 @@ type
# https://github.com/bittorrent/libutp/blob/master/utp_utils.cpp, to check all the # https://github.com/bittorrent/libutp/blob/master/utp_utils.cpp, to check all the
# timing assumptions on different platforms # timing assumptions on different platforms
proc getMonoTimeTimeStamp*(): uint32 = proc getMonoTimeTimeStamp*(): uint32 =
# this value is equivalent of:
# uint32((Moment.now() - Moment.init(0, Microseconds)).microseconds())
# on macOs
let time = getMonoTime() let time = getMonoTime()
cast[uint32](time.ticks() div 1000) cast[uint32](time.ticks() div 1000)
@ -154,7 +157,7 @@ proc synPacket*(seqNr: uint16, rcvConnectionId: uint16, bufferSize: uint32): Pac
Packet(header: h, payload: @[]) Packet(header: h, payload: @[])
proc ackPacket*(seqNr: uint16, sndConnectionId: uint16, ackNr: uint16, bufferSize: uint32): Packet = proc ackPacket*(seqNr: uint16, sndConnectionId: uint16, ackNr: uint16, bufferSize: uint32, timestampDiff: uint32): Packet =
let h = PacketHeaderV1( let h = PacketHeaderV1(
pType: ST_STATE, pType: ST_STATE,
version: protocolVersion, version: protocolVersion,
@ -162,9 +165,7 @@ proc ackPacket*(seqNr: uint16, sndConnectionId: uint16, ackNr: uint16, bufferSiz
extension: 0'u8, extension: 0'u8,
connectionId: sndConnectionId, connectionId: sndConnectionId,
timestamp: getMonoTimeTimeStamp(), timestamp: getMonoTimeTimeStamp(),
# TODO for not we are using 0, but this value should be calculated on socket timestampDiff: timestampDiff,
# level
timestampDiff: 0'u32,
wndSize: bufferSize, wndSize: bufferSize,
seqNr: seqNr, seqNr: seqNr,
ackNr: ackNr ackNr: ackNr
@ -172,7 +173,14 @@ proc ackPacket*(seqNr: uint16, sndConnectionId: uint16, ackNr: uint16, bufferSiz
Packet(header: h, payload: @[]) Packet(header: h, payload: @[])
proc dataPacket*(seqNr: uint16, sndConnectionId: uint16, ackNr: uint16, bufferSize: uint32, payload: seq[byte]): Packet = proc dataPacket*(
seqNr: uint16,
sndConnectionId: uint16,
ackNr: uint16,
bufferSize: uint32,
payload: seq[byte],
timestampDiff: uint32
): Packet =
let h = PacketHeaderV1( let h = PacketHeaderV1(
pType: ST_DATA, pType: ST_DATA,
version: protocolVersion, version: protocolVersion,
@ -180,9 +188,7 @@ proc dataPacket*(seqNr: uint16, sndConnectionId: uint16, ackNr: uint16, bufferSi
extension: 0'u8, extension: 0'u8,
connectionId: sndConnectionId, connectionId: sndConnectionId,
timestamp: getMonoTimeTimeStamp(), timestamp: getMonoTimeTimeStamp(),
# TODO for not we are using 0, but this value should be calculated on socket timestampDiff: timestampDiff,
# level
timestampDiff: 0'u32,
wndSize: bufferSize, wndSize: bufferSize,
seqNr: seqNr, seqNr: seqNr,
ackNr: ackNr ackNr: ackNr
@ -198,9 +204,9 @@ proc resetPacket*(seqNr: uint16, sndConnectionId: uint16, ackNr: uint16): Packet
extension: 0'u8, extension: 0'u8,
connectionId: sndConnectionId, connectionId: sndConnectionId,
timestamp: getMonoTimeTimeStamp(), timestamp: getMonoTimeTimeStamp(),
# TODO for not we are using 0, but this value should be calculated on socket # reset packet informs remote about lack of state for given connection, therefore
# level # we do not inform remote about its delay.
timestampDiff: 0'u32, timestampDiff: 0,
wndSize: 0, wndSize: 0,
seqNr: seqNr, seqNr: seqNr,
ackNr: ackNr ackNr: ackNr
@ -208,7 +214,13 @@ proc resetPacket*(seqNr: uint16, sndConnectionId: uint16, ackNr: uint16): Packet
Packet(header: h, payload: @[]) Packet(header: h, payload: @[])
proc finPacket*(seqNr: uint16, sndConnectionId: uint16, ackNr: uint16, bufferSize: uint32): Packet = proc finPacket*(
seqNr: uint16,
sndConnectionId: uint16,
ackNr: uint16,
bufferSize: uint32,
timestampDiff: uint32
): Packet =
let h = PacketHeaderV1( let h = PacketHeaderV1(
pType: ST_FIN, pType: ST_FIN,
version: protocolVersion, version: protocolVersion,
@ -216,9 +228,7 @@ proc finPacket*(seqNr: uint16, sndConnectionId: uint16, ackNr: uint16, bufferSiz
extension: 0'u8, extension: 0'u8,
connectionId: sndConnectionId, connectionId: sndConnectionId,
timestamp: getMonoTimeTimeStamp(), timestamp: getMonoTimeTimeStamp(),
# TODO for not we are using 0, but this value should be calculated on socket timestampDiff: timestampDiff,
# level
timestampDiff: 0'u32,
wndSize: bufferSize, wndSize: bufferSize,
seqNr: seqNr, seqNr: seqNr,
ackNr: ackNr ackNr: ackNr

View File

@ -21,6 +21,9 @@ type SendBufferTracker* = ref object
# remote receive window updated based on packed wndSize field # remote receive window updated based on packed wndSize field
maxRemoteWindow*: uint32 maxRemoteWindow*: uint32
# maximum window size, in bytes, calculated by local congestion controller
maxWindow*: uint32
# configuration option for maxium number of bytes in snd buffer # configuration option for maxium number of bytes in snd buffer
maxSndBufferSize*: uint32 maxSndBufferSize*: uint32
waiters: seq[(uint32, Future[void])] waiters: seq[(uint32, Future[void])]
@ -29,18 +32,20 @@ proc new*(
T: type SendBufferTracker, T: type SendBufferTracker,
currentWindow: uint32, currentWindow: uint32,
maxRemoteWindow: uint32, maxRemoteWindow: uint32,
maxSndBufferSize: uint32): T = maxSndBufferSize: uint32,
maxWindow: uint32): T =
return ( return (
SendBufferTracker( SendBufferTracker(
currentWindow: currentWindow, currentWindow: currentWindow,
maxRemoteWindow: maxRemoteWindow, maxRemoteWindow: maxRemoteWindow,
maxSndBufferSize: maxSndBufferSize, maxSndBufferSize: maxSndBufferSize,
maxWindow: maxWindow,
waiters: @[] waiters: @[]
) )
) )
proc currentFreeBytes*(t: SendBufferTracker): uint32 = proc currentFreeBytes*(t: SendBufferTracker): uint32 =
let maxSend = min(t.maxRemoteWindow, t.maxSndBufferSize) let maxSend = min(min(t.maxRemoteWindow, t.maxSndBufferSize), t.maxWindow)
if (maxSend <= t.currentWindow): if (maxSend <= t.currentWindow):
return 0 return 0
else: else:
@ -65,6 +70,11 @@ proc updateMaxRemote*(t: SendBufferTracker, newRemoteWindow: uint32) =
t.maxRemoteWindow = newRemoteWindow t.maxRemoteWindow = newRemoteWindow
t.checkWaiters() t.checkWaiters()
proc updateMaxWindowSize*(t: SendBufferTracker, newRemoteWindow: uint32, maxWindow: uint32) =
t.maxRemoteWindow = newRemoteWindow
t.maxWindow = maxWindow
t.checkWaiters()
proc decreaseCurrentWindow*(t: SendBufferTracker, value: uint32, notifyWaiters: bool) = proc decreaseCurrentWindow*(t: SendBufferTracker, value: uint32, notifyWaiters: bool) =
doAssert(t.currentWindow >= value) doAssert(t.currentWindow >= value)
t.currentWindow = t.currentWindow - value t.currentWindow = t.currentWindow - value

View File

@ -44,7 +44,11 @@ when isMainModule:
waitFor(sleepAsync(milliseconds(1000))) waitFor(sleepAsync(milliseconds(1000)))
discard waitFor soc.write(bytes) # discard waitFor soc.write(bytes)
# waitFor(sleepAsync(milliseconds(1000)))
# discard waitFor soc.write(bytes)
runForever() runForever()

View File

@ -12,7 +12,12 @@ import
stew/results, stew/results,
./send_buffer_tracker, ./send_buffer_tracker,
./growable_buffer, ./growable_buffer,
./packets ./packets,
./ledbat_congestion_control,
./delay_histogram,
./utp_utils,
./clock_drift_calculator
logScope: logScope:
topics = "utp_socket" topics = "utp_socket"
@ -186,6 +191,25 @@ type
zeroWindowTimer: Moment zeroWindowTimer: Moment
# last measured delay between current local timestamp, and remote sent
# timestamp. In microseconds
replayMicro: uint32
# indicator if we're in slow-start (exponential growth) phase
slowStart: bool
#the slow-start threshold, in bytes
slowStartTreshold: uint32
# history of our delays
ourHistogram: DelayHistogram
# history of remote delays
remoteHistogram: DelayHistogram
# calculator of drifiting between local and remote clocks
driftCalculator: ClockDriftCalculator
# socket identifier # socket identifier
socketKey*: UtpSocketKey[A] socketKey*: UtpSocketKey[A]
@ -255,6 +279,13 @@ const
# Reset period is configured in `SocketConfig` # Reset period is configured in `SocketConfig`
minimalRemoteWindow: uint32 = 1500 minimalRemoteWindow: uint32 = 1500
# Initial max window size. Reference implementation uses value which enables one packet
# to be transfered.
# We use value two times higher as we do not yet have proper mtu estimation, and
# our impl should work over udp and discovery v5 (where proper estmation may be harder
# as packets already have discvoveryv5 envelope)
startMaxWindow* = 2 * mtuSize
reorderBufferMaxSize = 1024 reorderBufferMaxSize = 1024
proc init*[A](T: type UtpSocketKey, remoteAddress: A, rcvId: uint16): T = proc init*[A](T: type UtpSocketKey, remoteAddress: A, rcvId: uint16): T =
@ -319,7 +350,8 @@ proc sendAck(socket: UtpSocket): Future[void] =
socket.seqNr, socket.seqNr,
socket.connectionIdSnd, socket.connectionIdSnd,
socket.ackNr, socket.ackNr,
socket.getRcvWindowSize() socket.getRcvWindowSize(),
socket.replayMicro
) )
socket.sendData(encodePacket(ackPacket)) socket.sendData(encodePacket(ackPacket))
@ -405,7 +437,28 @@ proc checkTimeouts(socket: UtpSocket) {.async.} =
socket.retransmitTimeout = newTimeout socket.retransmitTimeout = newTimeout
socket.rtoTimeout = currentTime + newTimeout socket.rtoTimeout = currentTime + newTimeout
# TODO Add handling of congestion control let currentPacketSize = uint32(socket.getPacketSize())
if (socket.curWindowPackets == 0 and socket.sendBufferTracker.maxWindow > currentPacketSize):
# there are no packets in flight even though there is place for more than whole packet
# this means connection is just idling. Reset window by 1/3'rd but no more
# than to fit at least one packet.
let oldMaxWindow = socket.sendBufferTracker.maxWindow
let newMaxWindow = max((oldMaxWindow * 2) div 3, currentPacketSize)
socket.sendBufferTracker.updateMaxWindowSize(
# maxRemote window does not change
socket.sendBufferTracker.maxRemoteWindow,
newMaxWindow
)
else:
# delay was so high that window has shrunk below one packet. Reset window
# to fit a least one packet and start with slow start
socket.sendBufferTracker.updateMaxWindowSize(
# maxRemote window does not change
socket.sendBufferTracker.maxRemoteWindow,
currentPacketSize
)
socket.slowStart = true
# This will have much more sense when we will add handling of selective acks # This will have much more sense when we will add handling of selective acks
# as then every selecivly acked packet restes timeout timer and removes packet # as then every selecivly acked packet restes timeout timer and removes packet
@ -472,7 +525,15 @@ proc handleDataWrite(socket: UtpSocket, data: seq[byte], writeFut: Future[WriteR
if socket.curWindowPackets == 0: if socket.curWindowPackets == 0:
socket.resetSendTimeout() socket.resetSendTimeout()
let dataPacket = dataPacket(socket.seqNr, socket.connectionIdSnd, socket.ackNr, wndSize, dataSlice) let dataPacket =
dataPacket(
socket.seqNr,
socket.connectionIdSnd,
socket.ackNr,
wndSize,
dataSlice,
socket.replayMicro
)
let outgoingPacket = OutgoingPacket.init(encodePacket(dataPacket), 1, false, payloadLength) let outgoingPacket = OutgoingPacket.init(encodePacket(dataPacket), 1, false, payloadLength)
socket.registerOutgoingPacket(outgoingPacket) socket.registerOutgoingPacket(outgoingPacket)
await socket.sendData(outgoingPacket.packetBytes) await socket.sendData(outgoingPacket.packetBytes)
@ -500,7 +561,16 @@ proc handleClose(socket: UtpSocket): Future[void] {.async.} =
if socket.curWindowPackets == 0: if socket.curWindowPackets == 0:
socket.resetSendTimeout() socket.resetSendTimeout()
let finEncoded = encodePacket(finPacket(socket.seqNr, socket.connectionIdSnd, socket.ackNr, socket.getRcvWindowSize())) let finEncoded =
encodePacket(
finPacket(
socket.seqNr,
socket.connectionIdSnd,
socket.ackNr,
socket.getRcvWindowSize(),
socket.replayMicro
)
)
socket.registerOutgoingPacket(OutgoingPacket.init(finEncoded, 1, true, 0)) socket.registerOutgoingPacket(OutgoingPacket.init(finEncoded, 1, true, 0))
await socket.sendData(finEncoded) await socket.sendData(finEncoded)
socket.finSent = true socket.finSent = true
@ -543,6 +613,7 @@ proc new[A](
initialAckNr: uint16, initialAckNr: uint16,
initialTimeout: Duration initialTimeout: Duration
): T = ): T =
let currentTime = Moment.now()
T( T(
remoteAddress: to, remoteAddress: to,
state: state, state: state,
@ -556,7 +627,7 @@ proc new[A](
outBuffer: GrowableCircularBuffer[OutgoingPacket].init(), outBuffer: GrowableCircularBuffer[OutgoingPacket].init(),
inBuffer: GrowableCircularBuffer[Packet].init(), inBuffer: GrowableCircularBuffer[Packet].init(),
retransmitTimeout: initialTimeout, retransmitTimeout: initialTimeout,
rtoTimeout: Moment.now() + initialTimeout, rtoTimeout: currentTime + initialTimeout,
# Initial timeout values taken from reference implemntation # Initial timeout values taken from reference implemntation
rtt: milliseconds(0), rtt: milliseconds(0),
rttVar: milliseconds(800), rttVar: milliseconds(800),
@ -565,11 +636,16 @@ proc new[A](
closeEvent: newAsyncEvent(), closeEvent: newAsyncEvent(),
closeCallbacks: newSeq[Future[void]](), closeCallbacks: newSeq[Future[void]](),
# start with 1mb assumption, field will be updated with first received packet # start with 1mb assumption, field will be updated with first received packet
sendBufferTracker: SendBufferTracker.new(0, 1024 * 1024, cfg.optSndBuffer), sendBufferTracker: SendBufferTracker.new(0, 1024 * 1024, cfg.optSndBuffer, startMaxWindow),
# queue with infinite size # queue with infinite size
writeQueue: newAsyncQueue[WriteRequest](), writeQueue: newAsyncQueue[WriteRequest](),
zeroWindowTimer: Moment.now() + cfg.remoteWindowResetTimeout, zeroWindowTimer: currentTime + cfg.remoteWindowResetTimeout,
socketKey: UtpSocketKey.init(to, rcvId), socketKey: UtpSocketKey.init(to, rcvId),
slowStart: true,
slowStartTreshold: cfg.optSndBuffer,
ourHistogram: DelayHistogram.init(currentTime),
remoteHistogram: DelayHistogram.init(currentTime),
driftCalculator: ClockDriftCalculator.init(currentTime),
send: snd send: snd
) )
@ -682,12 +758,6 @@ proc setCloseCallback(s: UtpSocket, cb: SocketCloseCallback) {.async.} =
proc registerCloseCallback*(s: UtpSocket, cb: SocketCloseCallback) = proc registerCloseCallback*(s: UtpSocket, cb: SocketCloseCallback) =
s.closeCallbacks.add(s.setCloseCallback(cb)) s.closeCallbacks.add(s.setCloseCallback(cb))
proc max(a, b: Duration): Duration =
if (a > b):
a
else:
b
proc updateTimeouts(socket: UtpSocket, timeSent: Moment, currentTime: Moment) = proc updateTimeouts(socket: UtpSocket, timeSent: Moment, currentTime: Moment) =
## Update timeouts according to spec: ## Update timeouts according to spec:
## delta = rtt - packet_rtt ## delta = rtt - packet_rtt
@ -769,20 +839,31 @@ proc ackPackets(socket: UtpSocket, nrPacketsToAck: uint16) =
inc i inc i
proc calculateAckedbytes(socket: UtpSocket, nrPacketsToAck: uint16, now: Moment): (uint32, Duration) =
var i: uint16 = 0
var ackedBytes: uint32 = 0
var minRtt: Duration = InfiniteDuration
while i < nrPacketsToack:
let seqNr = socket.seqNr - socket.curWindowPackets + i
let packetOpt = socket.outBuffer.get(seqNr)
if (packetOpt.isSome() and packetOpt.unsafeGet().transmissions > 0):
let packet = packetOpt.unsafeGet()
ackedBytes = ackedBytes + packet.payloadLength
# safety check in case clock is not monotonic
if packet.timeSent < now:
minRtt = min(minRtt, now - packet.timeSent)
else:
minRtt = min(minRtt, microseconds(50000))
inc i
(ackedBytes, minRtt)
proc initializeAckNr(socket: UtpSocket, packetSeqNr: uint16) = proc initializeAckNr(socket: UtpSocket, packetSeqNr: uint16) =
if (socket.state == SynSent): if (socket.state == SynSent):
socket.ackNr = packetSeqNr - 1 socket.ackNr = packetSeqNr - 1
# compare if lhs is less than rhs, taking wrapping
# into account. i.e high(lhs) < 0 == true
proc wrapCompareLess(lhs: uint16, rhs:uint16): bool =
let distDown = (lhs - rhs)
let distUp = (rhs - lhs)
# if the distance walking up is shorter, lhs
# is less than rhs. If the distance walking down
# is shorter, then rhs is less than lhs
return distUp < distDown
proc isAckNrInvalid(socket: UtpSocket, packet: Packet): bool = proc isAckNrInvalid(socket: UtpSocket, packet: Packet): bool =
let ackWindow = max(socket.curWindowPackets + allowedAckWindow, allowedAckWindow) let ackWindow = max(socket.curWindowPackets + allowedAckWindow, allowedAckWindow)
( (
@ -802,6 +883,7 @@ proc isAckNrInvalid(socket: UtpSocket, packet: Packet): bool =
# to scheduler which means there could be potentialy several processPacket procs # to scheduler which means there could be potentialy several processPacket procs
# running # running
proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = proc processPacket*(socket: UtpSocket, p: Packet) {.async.} =
let receiptTime = Moment.now()
if socket.isAckNrInvalid(p): if socket.isAckNrInvalid(p):
notice "Received packet with invalid ack nr" notice "Received packet with invalid ack nr"
@ -834,15 +916,74 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} =
notice "Received packet is totally of the mark" notice "Received packet is totally of the mark"
return return
# update remote window size var (ackedBytes, minRtt) = socket.calculateAckedbytes(acks, receiptTime)
socket.sendBufferTracker.updateMaxRemote(p.header.wndSize) # TODO caluclate bytes acked by selective acks here (if thats the case)
let receiptTimestamp = getMonoTimeTimeStamp()
let sentTimeRemote = p.header.timestamp
# we are using uint32 not a Duration, to wrap a round in case of
# sentTimeRemote > receipTimestamp. This can happen as local and remote
# clock can be not synchornized or even using different system clock.
# i.e this number itself does not tell anything and is only used to feedback it
# to remote peer with each sent packet
let remoteDelay =
if (sentTimeRemote == 0):
0'u32
else:
receiptTimestamp - sentTimeRemote
socket.replayMicro = remoteDelay
let prevRemoteDelayBase = socket.remoteHistogram.delayBase
if (remoteDelay != 0):
socket.remoteHistogram.addSample(remoteDelay, receiptTime)
# remote new delay base is less than previous
# shift our delay base in other direction to take clock skew into account
# but no more than 10ms
if (prevRemoteDelayBase != 0 and
wrapCompareLess(socket.remoteHistogram.delayBase, prevRemoteDelayBase) and
prevRemoteDelayBase - socket.remoteHistogram.delayBase <= 10000'u32):
socket.ourHistogram.shift(prevRemoteDelayBase - socket.remoteHistogram.delayBase)
let actualDelay = p.header.timestampDiff
if actualDelay != 0:
socket.ourHistogram.addSample(actualDelay, receiptTime)
socket.driftCalculator.addSample(actualDelay, receiptTime)
# adjust base delay if delay estimates exceeds rtt
if (socket.ourHistogram.getValue() > minRtt):
let diff = uint32((socket.ourHistogram.getValue() - minRtt).microseconds())
socket.ourHistogram.shift(diff)
let (newMaxWindow, newSlowStartTreshold, newSlowStart) =
applyCongestionControl(
socket.sendBufferTracker.maxWindow,
socket.slowStart,
socket.slowStartTreshold,
socket.socketConfig.optSndBuffer,
uint32(socket.getPacketSize()),
microseconds(actualDelay),
ackedBytes,
minRtt,
socket.ourHistogram.getValue(),
socket.driftCalculator.clockDrift
)
# update remote window size and max window
socket.sendBufferTracker.updateMaxWindowSize(p.header.wndSize, newMaxWindow)
socket.slowStart = newSlowStart
socket.slowStartTreshold = newSlowStartTreshold
if (socket.sendBufferTracker.maxRemoteWindow == 0): if (socket.sendBufferTracker.maxRemoteWindow == 0):
# when zeroWindowTimer will be hit and maxRemoteWindow still will be equal to 0 # when zeroWindowTimer will be hit and maxRemoteWindow still will be equal to 0
# then it will be reset to minimal value # then it will be reset to minimal value
socket.zeroWindowTimer = Moment.now() + socket.socketConfig.remoteWindowResetTimeout socket.zeroWindowTimer = Moment.now() + socket.socketConfig.remoteWindowResetTimeout
# socket.curWindowPackets == acks means that this packet acked all remaining packets # socket.curWindowPackets == acks means that this packet acked all remaining packets
# including the sent fin packets # including the sent fin packets
if (socket.finSent and socket.curWindowPackets == acks): if (socket.finSent and socket.curWindowPackets == acks):
@ -855,7 +996,7 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} =
socket.destroy() socket.destroy()
socket.ackPackets(acks) socket.ackPackets(acks)
case p.header.pType case p.header.pType
of ST_DATA, ST_FIN: of ST_DATA, ST_FIN:
# To avoid amplification attacks, server socket is in SynRecv state until # To avoid amplification attacks, server socket is in SynRecv state until
@ -1109,3 +1250,7 @@ proc connectionId*[A](socket: UtpSocket[A]): uint16 =
socket.connectionIdSnd socket.connectionIdSnd
of Outgoing: of Outgoing:
socket.connectionIdRcv socket.connectionIdRcv
# Check what is current available window size for this socket
proc currentMaxWindowSize*[A](socket: UtpSocket[A]): uint32 =
socket.sendBufferTracker.maxWindow

38
eth/utp/utp_utils.nim Normal file
View File

@ -0,0 +1,38 @@
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [Defect].}
import
chronos
# compare if lhs is less than rhs, taking wrapping
# into account. i.e high(lhs) < 0 == true
proc wrapCompareLess*(lhs: uint32, rhs: uint32): bool =
let distDown = (lhs - rhs)
let distUp = (rhs - lhs)
# if the distance walking up is shorter, lhs
# is less than rhs. If the distance walking down
# is shorter, then rhs is less than lhs
return distUp < distDown
proc wrapCompareLess*(lhs: uint16, rhs: uint16): bool =
let distDown = (lhs - rhs)
let distUp = (rhs - lhs)
return distUp < distDown
proc max*(a, b: Duration): Duration =
if (a > b):
a
else:
b
proc min*(a, b: Duration): Duration =
if (a < b):
a
else:
b

View File

@ -0,0 +1,71 @@
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.used.}
import
chronos,
unittest,
../../eth/utp/clock_drift_calculator
suite "Clock drift calculator":
test "Initial clock drift should be 0":
let currentTime = Moment.now()
let calculator = ClockDriftCalculator.init(currentTime)
check:
calculator.clockDrift == 0
test "Adding samples should not update averages if 5s did not pass":
let currentTime = Moment.now()
var calculator = ClockDriftCalculator.init(currentTime)
calculator.addSample(10, currentTime + seconds(1))
calculator.addSample(10, currentTime + seconds(2))
check:
calculator.clockDrift == 0
calculator.lastClockDrift == 0
test "Clock drift should be calculated in relation to first sample":
let currentTime = Moment.now()
var calculator = ClockDriftCalculator.init(currentTime)
# first sample which will be treated as a base sample
calculator.addSample(10, currentTime + seconds(3))
# second sample in the first inteval it will be treated in relation to first one
# so correct first drift should be: (50 - 10) / 2 == 20
calculator.addSample(50, currentTime + seconds(6))
check:
calculator.clockDrift == 2
calculator.lastClockDrift == 20
test "Clock drift should properly calcuated when clock drifts to two sides":
let currentTime = Moment.now()
var calculator1 = ClockDriftCalculator.init(currentTime)
var calculator2 = ClockDriftCalculator.init(currentTime)
# first sample which will be treated as a base sample
calculator1.addSample(10, currentTime + seconds(3))
# second sample in the first inteval it will be treated in relation to first one
# so correct first drift should be: (50 - 10) / 2 == 20
calculator1.addSample(50, currentTime + seconds(6))
# first sample which will be treated as a base sample
calculator2.addSample(50, currentTime + seconds(3))
# second sample in the first inteval it will be treated in relation to first one
# so correct first drift should be: (10 - 50) / 2 == -20
calculator2.addSample(10, currentTime + seconds(6))
check:
calculator1.clockDrift == -calculator2.clockDrift
calculator1.lastClockDrift == -calculator2.lastClockDrift

View File

@ -421,3 +421,74 @@ procSuite "Utp protocol over udp tests":
await utpProt1.shutdownWait() await utpProt1.shutdownWait()
await utpProt2.shutdownWait() await utpProt2.shutdownWait()
await utpProt3.shutdownWait() await utpProt3.shutdownWait()
asyncTest "Success data transfer of a lot of data should increase available window on sender side":
let s = await initClientServerScenario()
check:
s.clientSocket.isConnected()
# initially window has value equal to some pre configured constant
s.clientSocket.currentMaxWindowSize == startMaxWindow
# after successful connection outgoing buffer should be empty as syn packet
# should be correctly acked
s.clientSocket.numPacketsInOutGoingBuffer() == 0
(not s.serverSocket.isConnected())
# big transfer of 50kb
let bytesToTransfer = generateByteArray(rng[], 50000)
let bytesReceivedFromClient = await transferData(s.clientSocket, s.serverSocket, bytesToTransfer)
# ultimatly all send packets will acked, and outgoing buffer will be empty
await waitUntil(proc (): bool = s.clientSocket.numPacketsInOutGoingBuffer() == 0)
check:
# we can only assert that window has grown, becouse specific values depends on
# particual timings
s.clientSocket.currentMaxWindowSize > startMaxWindow
s.serverSocket.isConnected()
s.clientSocket.numPacketsInOutGoingBuffer() == 0
bytesReceivedFromClient == bytesToTransfer
await s.close()
asyncTest "Not used socket should decay its max send window":
let s = await initClientServerScenario()
check:
s.clientSocket.isConnected()
# initially window has value equal to some pre configured constant
s.clientSocket.currentMaxWindowSize == startMaxWindow
# after successful connection outgoing buffer should be empty as syn packet
# should be correctly acked
s.clientSocket.numPacketsInOutGoingBuffer() == 0
(not s.serverSocket.isConnected())
# big transfer of 50kb
let bytesToTransfer = generateByteArray(rng[], 50000)
let bytesReceivedFromClient = await transferData(s.clientSocket, s.serverSocket, bytesToTransfer)
# ultimatly all send packets will acked, and outgoing buffer will be empty
await waitUntil(proc (): bool = s.clientSocket.numPacketsInOutGoingBuffer() == 0)
let maximumMaxWindow = s.clientSocket.currentMaxWindowSize
check:
# we can only assert that window has grown, becouse specific values depends on
# particual timings
maximumMaxWindow > startMaxWindow
s.serverSocket.isConnected()
s.clientSocket.numPacketsInOutGoingBuffer() == 0
bytesReceivedFromClient == bytesToTransfer
# wait long enough to trigger timeout
await sleepAsync(seconds(5))
check:
# window should decay when idle
s.clientSocket.currentMaxWindowSize < maximumMaxWindow
await s.close()

View File

@ -60,7 +60,14 @@ procSuite "Utp router unit tests":
check: check:
initialPacket.header.pType == ST_SYN initialPacket.header.pType == ST_SYN
let responseAck = ackPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize) let responseAck =
ackPacket(
initialRemoteSeq,
initialPacket.header.connectionId,
initialPacket.header.seqNr,
testBufferSize,
0
)
await router.processIncomingBytes(encodePacket(responseAck), remote) await router.processIncomingBytes(encodePacket(responseAck), remote)
@ -167,7 +174,17 @@ procSuite "Utp router unit tests":
# socket is not configured to be connected until receiving data # socket is not configured to be connected until receiving data
not socket.isConnected() not socket.isConnected()
let encodedData = encodePacket(dataPacket(initSeq + 1, initConnId + 1, initialPacket.header.seqNr - 1, 10, dataToSend)) let encodedData =
encodePacket(
dataPacket(
initSeq + 1,
initConnId + 1,
initialPacket.header.seqNr - 1,
10,
dataToSend,
0
)
)
await router.processIncomingBytes(encodedData, testSender) await router.processIncomingBytes(encodedData, testSender)
@ -255,7 +272,14 @@ procSuite "Utp router unit tests":
# connection id of syn packet should be set to requested connection id # connection id of syn packet should be set to requested connection id
initialPacket.header.connectionId == requestedConnectionId initialPacket.header.connectionId == requestedConnectionId
let responseAck = ackPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize) let responseAck =
ackPacket(
initialRemoteSeq,
initialPacket.header.connectionId,
initialPacket.header.seqNr,
testBufferSize,
0
)
await router.processIncomingBytes(encodePacket(responseAck), testSender2) await router.processIncomingBytes(encodePacket(responseAck), testSender2)
@ -355,7 +379,7 @@ procSuite "Utp router unit tests":
router.sendCb = initTestSnd(pq) router.sendCb = initTestSnd(pq)
let sndId = 10'u16 let sndId = 10'u16
let dp = dataPacket(10'u16, sndId, 10'u16, 10'u32, @[1'u8]) let dp = dataPacket(10'u16, sndId, 10'u16, 10'u32, @[1'u8], 0)
await router.processIncomingBytes(encodePacket(dp), testSender2) await router.processIncomingBytes(encodePacket(dp), testSender2)

View File

@ -44,7 +44,8 @@ procSuite "Utp socket unit test":
connectionId, connectionId,
ackNr, ackNr,
testBufferSize, testBufferSize,
generateByteArray(rng, packetSize) generateByteArray(rng, packetSize),
0
) )
packets.add(packet) packets.add(packet)
@ -70,7 +71,14 @@ procSuite "Utp socket unit test":
check: check:
initialPacket.header.pType == ST_SYN initialPacket.header.pType == ST_SYN
let responseAck = ackPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr, remoteReceiveBuffer) let responseAck =
ackPacket(
initialRemoteSeq,
initialPacket.header.connectionId,
initialPacket.header.seqNr,
remoteReceiveBuffer,
0
)
await sock1.processPacket(responseAck) await sock1.processPacket(responseAck)
@ -151,7 +159,15 @@ procSuite "Utp socket unit test":
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initalRemoteSeqNr, q) let (outgoingSocket, initialPacket) = connectOutGoingSocket(initalRemoteSeqNr, q)
let dataP1 = dataPacket(initalRemoteSeqNr, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize, data) let dataP1 =
dataPacket(
initalRemoteSeqNr,
initialPacket.header.connectionId,
initialPacket.header.seqNr,
testBufferSize,
data,
0
)
await outgoingSocket.processPacket(dataP1) await outgoingSocket.processPacket(dataP1)
let ack1 = await q.get() let ack1 = await q.get()
@ -305,7 +321,14 @@ procSuite "Utp socket unit test":
# ackNr in state packet, is set to sentPacket.header.seqNr which means remote # ackNr in state packet, is set to sentPacket.header.seqNr which means remote
# side processed out packet # side processed out packet
let responseAck = ackPacket(initialRemoteSeq, initialPacket.header.connectionId, sentPacket.header.seqNr, testBufferSize) let responseAck =
ackPacket(
initialRemoteSeq,
initialPacket.header.connectionId,
sentPacket.header.seqNr,
testBufferSize,
0
)
await outgoingSocket.processPacket(responseAck) await outgoingSocket.processPacket(responseAck)
@ -365,7 +388,14 @@ procSuite "Utp socket unit test":
# user decided to cancel second write # user decided to cancel second write
await writeFut2.cancelAndWait() await writeFut2.cancelAndWait()
# remote increased wnd size enough for all writes # remote increased wnd size enough for all writes
let someAckFromRemote = ackPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr, 10) let someAckFromRemote =
ackPacket(
initialRemoteSeq,
initialPacket.header.connectionId,
initialPacket.header.seqNr,
10,
0
)
await outgoingSocket.processPacket(someAckFromRemote) await outgoingSocket.processPacket(someAckFromRemote)
@ -407,7 +437,14 @@ procSuite "Utp socket unit test":
check: check:
initialPacket.header.pType == ST_SYN initialPacket.header.pType == ST_SYN
let responseAck = ackPacket(initalRemoteSeqNr, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize) let responseAck =
ackPacket(
initalRemoteSeqNr,
initialPacket.header.connectionId,
initialPacket.header.seqNr,
testBufferSize,
0
)
await outgoingSocket.processPacket(responseAck) await outgoingSocket.processPacket(responseAck)
@ -460,7 +497,14 @@ procSuite "Utp socket unit test":
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q) let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q)
let finP = finPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize) let finP =
finPacket(
initialRemoteSeq,
initialPacket.header.connectionId,
initialPacket.header.seqNr,
testBufferSize,
0
)
await outgoingSocket.processPacket(finP) await outgoingSocket.processPacket(finP)
let ack1 = await q.get() let ack1 = await q.get()
@ -481,10 +525,34 @@ procSuite "Utp socket unit test":
let readF = outgoingSocket.read() let readF = outgoingSocket.read()
let dataP = dataPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize, data) let dataP =
let dataP1 = dataPacket(initialRemoteSeq + 1, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize, data1) dataPacket(
initialRemoteSeq,
initialPacket.header.connectionId,
initialPacket.header.seqNr,
testBufferSize,
data,
0
)
let finP = finPacket(initialRemoteSeq + 2, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize) let dataP1 =
dataPacket(
initialRemoteSeq + 1,
initialPacket.header.connectionId,
initialPacket.header.seqNr,
testBufferSize,
data1,
0
)
let finP =
finPacket(
initialRemoteSeq + 2,
initialPacket.header.connectionId,
initialPacket.header.seqNr,
testBufferSize,
0
)
await outgoingSocket.processPacket(finP) await outgoingSocket.processPacket(finP)
@ -519,13 +587,36 @@ procSuite "Utp socket unit test":
let readF = outgoingSocket.read() let readF = outgoingSocket.read()
let dataP = dataPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize, data) let dataP =
dataPacket(
initialRemoteSeq,
initialPacket.header.connectionId,
initialPacket.header.seqNr,
testBufferSize,
data,
0
)
let finP = finPacket(initialRemoteSeq + 1, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize) let finP =
finPacket(
initialRemoteSeq + 1,
initialPacket.header.connectionId,
initialPacket.header.seqNr,
testBufferSize,
0
)
# dataP1 has seqNr larger than fin, there fore it should be considered past eof and never passed # dataP1 has seqNr larger than fin, there fore it should be considered past eof and never passed
# to user of library # to user of library
let dataP1 = dataPacket(initialRemoteSeq + 2, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize, data1) let dataP1 =
dataPacket(
initialRemoteSeq + 2,
initialPacket.header.connectionId,
initialPacket.header.seqNr,
testBufferSize,
data1,
0
)
await outgoingSocket.processPacket(finP) await outgoingSocket.processPacket(finP)
@ -583,7 +674,14 @@ procSuite "Utp socket unit test":
check: check:
sendFin.header.pType == ST_FIN sendFin.header.pType == ST_FIN
let responseAck = ackPacket(initialRemoteSeq, initialPacket.header.connectionId, sendFin.header.seqNr, testBufferSize) let responseAck =
ackPacket(
initialRemoteSeq,
initialPacket.header.connectionId,
sendFin.header.seqNr,
testBufferSize,
0
)
await outgoingSocket.processPacket(responseAck) await outgoingSocket.processPacket(responseAck)
@ -639,7 +737,15 @@ procSuite "Utp socket unit test":
let sCfg = SocketConfig.init(optRcvBuffer = initialRcvBufferSize) let sCfg = SocketConfig.init(optRcvBuffer = initialRcvBufferSize)
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeqNr, q, testBufferSize, sCfg) let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeqNr, q, testBufferSize, sCfg)
let dataP1 = dataPacket(initialRemoteSeqNr, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize, data) let dataP1 =
dataPacket(
initialRemoteSeqNr,
initialPacket.header.connectionId,
initialPacket.header.seqNr,
testBufferSize,
data,
0
)
await outgoingSocket.processPacket(dataP1) await outgoingSocket.processPacket(dataP1)
@ -676,7 +782,15 @@ procSuite "Utp socket unit test":
let sCfg = SocketConfig.init(optRcvBuffer = initialRcvBufferSize) let sCfg = SocketConfig.init(optRcvBuffer = initialRcvBufferSize)
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initalRemoteSeqNr, q, testBufferSize, sCfg) let (outgoingSocket, initialPacket) = connectOutGoingSocket(initalRemoteSeqNr, q, testBufferSize, sCfg)
let dataP1 = dataPacket(initalRemoteSeqNr, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize, data) let dataP1 =
dataPacket(
initalRemoteSeqNr,
initialPacket.header.connectionId,
initialPacket.header.seqNr,
testBufferSize,
data,
0
)
await outgoingSocket.processPacket(dataP1) await outgoingSocket.processPacket(dataP1)
@ -714,11 +828,35 @@ procSuite "Utp socket unit test":
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q) let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q)
# data packet with ack nr set above our seq nr i.e packet from the future # data packet with ack nr set above our seq nr i.e packet from the future
let dataFuture = dataPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr + 1, testBufferSize, data1) let dataFuture =
dataPacket(
initialRemoteSeq,
initialPacket.header.connectionId,
initialPacket.header.seqNr + 1,
testBufferSize,
data1,
0
)
# data packet wth ack number set below out ack window i.e packet too old # data packet wth ack number set below out ack window i.e packet too old
let dataTooOld = dataPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr - allowedAckWindow - 1, testBufferSize, data2) let dataTooOld =
dataPacket(
initialRemoteSeq,
initialPacket.header.connectionId,
initialPacket.header.seqNr - allowedAckWindow - 1,
testBufferSize,
data2,
0
)
let dataOk = dataPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize, data3) let dataOk =
dataPacket(
initialRemoteSeq,
initialPacket.header.connectionId,
initialPacket.header.seqNr,
testBufferSize,
data3,
0
)
await outgoingSocket.processPacket(dataFuture) await outgoingSocket.processPacket(dataFuture)
await outgoingSocket.processPacket(dataTooOld) await outgoingSocket.processPacket(dataTooOld)
@ -773,7 +911,14 @@ procSuite "Utp socket unit test":
check: check:
int(outgoingSocket.numOfBytesInFlight) == len(dataToWrite) + len(dataToWrite) int(outgoingSocket.numOfBytesInFlight) == len(dataToWrite) + len(dataToWrite)
let responseAck = ackPacket(initialRemoteSeq, initialPacket.header.connectionId, sentPacket.header.seqNr, testBufferSize) let responseAck =
ackPacket(
initialRemoteSeq,
initialPacket.header.connectionId,
sentPacket.header.seqNr,
testBufferSize,
0
)
await outgoingSocket.processPacket(responseAck) await outgoingSocket.processPacket(responseAck)
@ -839,7 +984,14 @@ procSuite "Utp socket unit test":
check: check:
not writeFut.finished() not writeFut.finished()
let someAckFromRemote = ackPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr, uint32(len(dataToWrite))) let someAckFromRemote =
ackPacket(
initialRemoteSeq,
initialPacket.header.connectionId,
initialPacket.header.seqNr,
uint32(len(dataToWrite)),
0
)
await outgoingSocket.processPacket(someAckFromRemote) await outgoingSocket.processPacket(someAckFromRemote)
@ -862,7 +1014,14 @@ procSuite "Utp socket unit test":
# remote is initialized with buffer to small to handle whole payload # remote is initialized with buffer to small to handle whole payload
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q) let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q)
let remoteRcvWindowSize = uint32(outgoingSocket.getPacketSize()) let remoteRcvWindowSize = uint32(outgoingSocket.getPacketSize())
let someAckFromRemote = ackPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr, remoteRcvWindowSize) let someAckFromRemote =
ackPacket(
initialRemoteSeq,
initialPacket.header.connectionId,
initialPacket.header.seqNr,
remoteRcvWindowSize,
0
)
# we are using ack from remote to setup our snd window size to one packet size on one packet # we are using ack from remote to setup our snd window size to one packet size on one packet
await outgoingSocket.processPacket(someAckFromRemote) await outgoingSocket.processPacket(someAckFromRemote)
@ -890,7 +1049,14 @@ procSuite "Utp socket unit test":
# remote is initialized with buffer to small to handle whole payload # remote is initialized with buffer to small to handle whole payload
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q) let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q)
let remoteRcvWindowSize = uint32(outgoingSocket.getPacketSize()) let remoteRcvWindowSize = uint32(outgoingSocket.getPacketSize())
let someAckFromRemote = ackPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr, remoteRcvWindowSize) let someAckFromRemote =
ackPacket(
initialRemoteSeq,
initialPacket.header.connectionId,
initialPacket.header.seqNr,
remoteRcvWindowSize,
0
)
# we are using ack from remote to setup our snd window size to one packet size on one packet # we are using ack from remote to setup our snd window size to one packet size on one packet
await outgoingSocket.processPacket(someAckFromRemote) await outgoingSocket.processPacket(someAckFromRemote)
@ -899,7 +1065,14 @@ procSuite "Utp socket unit test":
let writeFut = outgoingSocket.write(twoPacketData) let writeFut = outgoingSocket.write(twoPacketData)
let firstAckFromRemote = ackPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr + 1, remoteRcvWindowSize) let firstAckFromRemote =
ackPacket(
initialRemoteSeq,
initialPacket.header.connectionId,
initialPacket.header.seqNr + 1,
remoteRcvWindowSize,
0
)
let packet = await q.get() let packet = await q.get()
@ -988,7 +1161,14 @@ procSuite "Utp socket unit test":
# this write still cannot progress as 1st write is not acked # this write still cannot progress as 1st write is not acked
not writeFut2.finished() not writeFut2.finished()
let someAckFromRemote = ackPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr + 1, 10) let someAckFromRemote =
ackPacket(
initialRemoteSeq,
initialPacket.header.connectionId,
initialPacket.header.seqNr + 1,
10,
0
)
# acks first write, so there is space in buffer for new data and second # acks first write, so there is space in buffer for new data and second
# write should progress # write should progress
@ -1004,3 +1184,33 @@ procSuite "Utp socket unit test":
secondPacket.payload == somedata2 secondPacket.payload == somedata2
await outgoingSocket.destroyWait() await outgoingSocket.destroyWait()
asyncTest "Socket should inform remote about its delay":
let q = newAsyncQueue[Packet]()
let initialRemoteSeq = 10'u16
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q)
let dataP1 =
dataPacket(
initialRemoteSeq,
initialPacket.header.connectionId,
initialPacket.header.seqNr,
testBufferSize,
@[1'u8],
0
)
check:
outgoingSocket.isConnected()
# necessary to avoid timestampDiff near 0 and flaky tests
await sleepAsync(milliseconds(50))
await outgoingSocket.processPacket(dataP1)
let socketAck = await q.get()
check:
socketAck.header.timestampDiff > 0
await outgoingSocket.destroyWait()