diff --git a/eth/utp/clock_drift_calculator.nim b/eth/utp/clock_drift_calculator.nim new file mode 100644 index 0000000..2a9eec3 --- /dev/null +++ b/eth/utp/clock_drift_calculator.nim @@ -0,0 +1,91 @@ +# Copyright (c) 2021 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.push raises: [Defect].} + +import + chronos + +const + # how long do we collect samples before calculating average + averageTime = seconds(5) + +# calculates 5s rolling average of incoming delays, which represent clock drift. +type ClockDriftCalculator* = object + # average of all delay samples compared to initial one. Average is done over + # 5s + averageDelay: int32 + # sum of all recent delay samples. All samples are relative to first sample + # averageDelayBase + currentDelaySum: int64 + # number if samples in sum + currentDelaySamples: int + # set to first sample, all further samples are taken in relative to this one + averageDelayBase: uint32 + # next time we should average samples + averageSampleTime: Moment + # estimated clock drift in microseconds per 5 seconds + clockDrift*: int32 + + # last calculated drift + lastClockDrift*: int32 + +proc init*(T: type ClockDriftCalculator, currentTime: Moment): T = + T( + averageSampleTime: currentTime + averageTime + ) + +proc addSample*(c: var ClockDriftCalculator, actualDelay: uint32, currentTime: Moment) = + if (actualDelay == 0): + return + + # this is our first sample, initialise our delay base + if c.averageDelayBase == 0: + c.averageDelayBase = actualDelay + + let distDown = c.averageDelayBase - actualDelay + + let distUp = actualDelay - c.averageDelayBase + + let averageDelaySample = + if (distDown > distUp): + # averageDelayBase is smaller that actualDelay, sample should be positive + int64(distUp) + else: + # averageDelayBase is bigger or equal to actualDelay, sample should be negative + -int64(distDown) + + c.currentDelaySum = c.currentDelaySum + averageDelaySample + inc c.currentDelaySamples + + if (currentTime > c.averageSampleTime): + # it is time to average our samples + var prevAverageDelay = c.averageDelay + c.averageDelay = int32(c.currentDelaySum div c.currentDelaySamples) + c.averageSampleTime = c.averageSampleTime + averageTime + c.currentDelaySum = 0 + c.currentDelaySamples = 0 + + # normalize average samples + let minSample = min(prevAverageDelay, c.averageDelay) + let maxSample = max(prevAverageDelay, c.averageDelay) + + var adjust = 0 + + if (minSample > 0): + adjust = -minSample + elif (maxSample < 0): + adjust = -maxSample + + if (adjust != 0): + c.averageDelayBase = c.averageDelayBase - uint32(adjust) + c.averageDelay = c.averageDelay + int32(adjust) + prevAverageDelay = prevAverageDelay + int32(adjust) + + let drift = c.averageDelay - prevAverageDelay + # rolling average + c.clockDrift = int32((int64(c.clockDrift) * 7 + drift) div 8) + c.lastClockDrift = drift diff --git a/eth/utp/delay_histogram.nim b/eth/utp/delay_histogram.nim new file mode 100644 index 0000000..7a4875d --- /dev/null +++ b/eth/utp/delay_histogram.nim @@ -0,0 +1,71 @@ +# Copyright (c) 2021 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.push raises: [Defect].} + +import + chronos, + ./utp_utils + +const + currentDelaySize = 3 + delayBaseHistory = 13 + delayBaseUpdateInterval = minutes(1) + +type + DelayHistogram* = object + delayBase*: uint32 + currentDelayHistory: array[currentDelaySize, uint32] + currentDelyIdx: int + delayBaseHistory: array[delayBaseHistory, uint32] + delayBaseIdx: int + delayBaseTime: Moment + +proc init*(T: type DelayHistogram, currentTime: Moment): T = + DelayHistogram( + delayBaseTime: currentTime + ) + +proc shift*(h: var DelayHistogram, offset: uint32) = + for sample in h.delayBaseHistory.mitems(): + sample = sample + offset + h.delayBase = h.delayBase + offset + +proc addSample*(h: var DelayHistogram, sample: uint32, currentTime: Moment) = + # if delay base is zero it means it is our first sample. Initialize necessary parts + if h.delayBase == 0: + h.delayBase = sample + for i in h.delayBaseHistory.mitems(): + i = sample + + if wrapCompareLess(sample, h.delayBaseHistory[h.delayBaseIdx]): + h.delayBaseHistory[h.delayBaseIdx] = sample + + if wrapCompareLess(sample, h.delayBase): + h.delay_base = sample + + let delay = sample - h.delayBase + + h.currentDelayHistory[h.currentDelyIdx] = delay + h.currentDelyIdx = (h.currentDelyIdx + 1) mod currentDelaySize + + if (currentTime - h.delayBaseTime > delayBaseUpdateInterval): + h.delayBaseTime = currentTime + h.delayBaseIdx = (h.delayBaseIdx + 1) mod delayBaseHistory + h.delayBaseHistory[h.delayBaseIdx] = sample + h.delayBase = h.delayBaseHistory[0] + + for delaySample in h.delayBaseHistory.items(): + if (wrapCompareLess(delaySample, h.delayBase)): + h.delayBase = delaySample + +proc getValue*(h: DelayHistogram): Duration = + var value = uint32.high + # this will return zero if not all samples are colected + for sample in h.currentDelayHistory: + value = min(sample, value) + + microseconds(value) diff --git a/eth/utp/ledbat_congestion_control.nim b/eth/utp/ledbat_congestion_control.nim new file mode 100644 index 0000000..49baef4 --- /dev/null +++ b/eth/utp/ledbat_congestion_control.nim @@ -0,0 +1,98 @@ +import + chronos, + ./utp_utils + + +const targetDelay = milliseconds(100) + +# explanation from reference impl: +# number of bytes to increase max window size by, per RTT. This is +# scaled down linearly proportional to off_target. i.e. if all packets +# in one window have 0 delay, window size will increase by this number. +# Typically it's less. TCP increases one MSS per RTT, which is 1500 +const maxCwndIncreaseBytesPerRtt = 3000 + +const minWindowSize = 10 + +proc applyCongestionControl*( + currentMaxWindowSize: uint32, + currentSlowStart: bool, + currentSlowStartTreshold: uint32, + maxSndBufferSize: uint32, + currentPacketSize: uint32, + actualDelay: Duration, + numOfAckedBytes: uint32, + minRtt: Duration, + calculatedDelay: Duration, + clockDrift: int32 +): (uint32, uint32, bool) = + if (actualDelay.isZero() or minRtt.isZero() or numOfAckedBytes == 0): + return (currentMaxWindowSize, currentSlowStartTreshold, currentSlowStart) + + let ourDelay = min(minRtt, calculatedDelay) + + let target = targetDelay + + # Rationale from C reference impl: + # this is here to compensate for very large clock drift that affects + # the congestion controller into giving certain endpoints an unfair + # share of the bandwidth. We have an estimate of the clock drift + # (clock_drift). The unit of this is microseconds per 5 seconds. + # empirically, a reasonable cut-off appears to be about 200000 + # (which is pretty high). The main purpose is to compensate for + # people trying to "cheat" uTP by making their clock run slower, + # and this definitely catches that without any risk of false positives + # if clock_drift < -200000 start applying a penalty delay proportional + # to how far beoynd -200000 the clock drift is + let clockDriftPenalty: int64 = + if (clockDrift < -200000): + let penalty = (-clockDrift - 200000) div 7 + penalty + else: + 0 + + let offTarget = target.microseconds() - (ourDelay.microseconds() + clockDriftPenalty) + + # calculations from reference impl: + # double window_factor = (double)min(bytes_acked, max_window) / (double)max(max_window, bytes_acked); + # double delay_factor = off_target / target; + # double scaled_gain = MAX_CWND_INCREASE_BYTES_PER_RTT * window_factor * delay_factor; + + let windowFactor = float64(min(numOfAckedBytes, currentMaxWindowSize)) / float64(max(currentMaxWindowSize, numOfAckedBytes)) + + let delayFactor = float64(offTarget) / float64(target.microseconds()) + + let scaledGain = maxCwndIncreaseBytesPerRtt * windowFactor * delayFactor + + let scaledWindow = float64(currentMaxWindowSize) + scaledGain + + let ledbatCwnd: uint32 = + if scaledWindow < minWindowSize: + uint32(minWindowSize) + else: + uint32(scaledWindow) + + var newSlowStart = currentSlowStart + var newMaxWindowSize = currentMaxWindowSize + var newSlowStartTreshold = currentSlowStartTreshold + + if currentSlowStart: + let slowStartCwnd = currentMaxWindowSize + uint32(windowFactor * float64(currentPacketSize)) + + if (slowStartCwnd > currentSlowStartTreshold): + newSlowStart = false + elif float64(ourDelay.microseconds()) > float64(target.microseconds()) * 0.9: + # we are just a litte under target delay, discontinute slows start + newSlowStart = false + newSlowStartTreshold = currentMaxWindowSize + else: + newMaxWindowSize = max(slowStartCwnd, ledbatCwnd) + else: + newMaxWindowSize = ledbatCwnd + + newMaxWindowSize = clamp(newMaxWindowSize, minWindowSize, maxSndBufferSize) + + (newMaxWindowSize, newSlowStartTreshold, newSlowStart) + + + diff --git a/eth/utp/packets.nim b/eth/utp/packets.nim index fe6d534..30c9f85 100644 --- a/eth/utp/packets.nim +++ b/eth/utp/packets.nim @@ -56,6 +56,9 @@ type # https://github.com/bittorrent/libutp/blob/master/utp_utils.cpp, to check all the # timing assumptions on different platforms proc getMonoTimeTimeStamp*(): uint32 = + # this value is equivalent of: + # uint32((Moment.now() - Moment.init(0, Microseconds)).microseconds()) + # on macOs let time = getMonoTime() cast[uint32](time.ticks() div 1000) @@ -154,7 +157,7 @@ proc synPacket*(seqNr: uint16, rcvConnectionId: uint16, bufferSize: uint32): Pac Packet(header: h, payload: @[]) -proc ackPacket*(seqNr: uint16, sndConnectionId: uint16, ackNr: uint16, bufferSize: uint32): Packet = +proc ackPacket*(seqNr: uint16, sndConnectionId: uint16, ackNr: uint16, bufferSize: uint32, timestampDiff: uint32): Packet = let h = PacketHeaderV1( pType: ST_STATE, version: protocolVersion, @@ -162,9 +165,7 @@ proc ackPacket*(seqNr: uint16, sndConnectionId: uint16, ackNr: uint16, bufferSiz extension: 0'u8, connectionId: sndConnectionId, timestamp: getMonoTimeTimeStamp(), - # TODO for not we are using 0, but this value should be calculated on socket - # level - timestampDiff: 0'u32, + timestampDiff: timestampDiff, wndSize: bufferSize, seqNr: seqNr, ackNr: ackNr @@ -172,7 +173,14 @@ proc ackPacket*(seqNr: uint16, sndConnectionId: uint16, ackNr: uint16, bufferSiz Packet(header: h, payload: @[]) -proc dataPacket*(seqNr: uint16, sndConnectionId: uint16, ackNr: uint16, bufferSize: uint32, payload: seq[byte]): Packet = +proc dataPacket*( + seqNr: uint16, + sndConnectionId: uint16, + ackNr: uint16, + bufferSize: uint32, + payload: seq[byte], + timestampDiff: uint32 +): Packet = let h = PacketHeaderV1( pType: ST_DATA, version: protocolVersion, @@ -180,9 +188,7 @@ proc dataPacket*(seqNr: uint16, sndConnectionId: uint16, ackNr: uint16, bufferSi extension: 0'u8, connectionId: sndConnectionId, timestamp: getMonoTimeTimeStamp(), - # TODO for not we are using 0, but this value should be calculated on socket - # level - timestampDiff: 0'u32, + timestampDiff: timestampDiff, wndSize: bufferSize, seqNr: seqNr, ackNr: ackNr @@ -198,9 +204,9 @@ proc resetPacket*(seqNr: uint16, sndConnectionId: uint16, ackNr: uint16): Packet extension: 0'u8, connectionId: sndConnectionId, timestamp: getMonoTimeTimeStamp(), - # TODO for not we are using 0, but this value should be calculated on socket - # level - timestampDiff: 0'u32, + # reset packet informs remote about lack of state for given connection, therefore + # we do not inform remote about its delay. + timestampDiff: 0, wndSize: 0, seqNr: seqNr, ackNr: ackNr @@ -208,7 +214,13 @@ proc resetPacket*(seqNr: uint16, sndConnectionId: uint16, ackNr: uint16): Packet Packet(header: h, payload: @[]) -proc finPacket*(seqNr: uint16, sndConnectionId: uint16, ackNr: uint16, bufferSize: uint32): Packet = +proc finPacket*( + seqNr: uint16, + sndConnectionId: uint16, + ackNr: uint16, + bufferSize: uint32, + timestampDiff: uint32 +): Packet = let h = PacketHeaderV1( pType: ST_FIN, version: protocolVersion, @@ -216,9 +228,7 @@ proc finPacket*(seqNr: uint16, sndConnectionId: uint16, ackNr: uint16, bufferSiz extension: 0'u8, connectionId: sndConnectionId, timestamp: getMonoTimeTimeStamp(), - # TODO for not we are using 0, but this value should be calculated on socket - # level - timestampDiff: 0'u32, + timestampDiff: timestampDiff, wndSize: bufferSize, seqNr: seqNr, ackNr: ackNr diff --git a/eth/utp/send_buffer_tracker.nim b/eth/utp/send_buffer_tracker.nim index 16f333a..ce8268f 100644 --- a/eth/utp/send_buffer_tracker.nim +++ b/eth/utp/send_buffer_tracker.nim @@ -21,6 +21,9 @@ type SendBufferTracker* = ref object # remote receive window updated based on packed wndSize field maxRemoteWindow*: uint32 + # maximum window size, in bytes, calculated by local congestion controller + maxWindow*: uint32 + # configuration option for maxium number of bytes in snd buffer maxSndBufferSize*: uint32 waiters: seq[(uint32, Future[void])] @@ -29,18 +32,20 @@ proc new*( T: type SendBufferTracker, currentWindow: uint32, maxRemoteWindow: uint32, - maxSndBufferSize: uint32): T = + maxSndBufferSize: uint32, + maxWindow: uint32): T = return ( SendBufferTracker( currentWindow: currentWindow, maxRemoteWindow: maxRemoteWindow, maxSndBufferSize: maxSndBufferSize, + maxWindow: maxWindow, waiters: @[] ) ) proc currentFreeBytes*(t: SendBufferTracker): uint32 = - let maxSend = min(t.maxRemoteWindow, t.maxSndBufferSize) + let maxSend = min(min(t.maxRemoteWindow, t.maxSndBufferSize), t.maxWindow) if (maxSend <= t.currentWindow): return 0 else: @@ -65,6 +70,11 @@ proc updateMaxRemote*(t: SendBufferTracker, newRemoteWindow: uint32) = t.maxRemoteWindow = newRemoteWindow t.checkWaiters() +proc updateMaxWindowSize*(t: SendBufferTracker, newRemoteWindow: uint32, maxWindow: uint32) = + t.maxRemoteWindow = newRemoteWindow + t.maxWindow = maxWindow + t.checkWaiters() + proc decreaseCurrentWindow*(t: SendBufferTracker, value: uint32, notifyWaiters: bool) = doAssert(t.currentWindow >= value) t.currentWindow = t.currentWindow - value diff --git a/eth/utp/utp.nim b/eth/utp/utp.nim index 0ed768d..4564ea0 100644 --- a/eth/utp/utp.nim +++ b/eth/utp/utp.nim @@ -44,7 +44,11 @@ when isMainModule: waitFor(sleepAsync(milliseconds(1000))) - discard waitFor soc.write(bytes) + # discard waitFor soc.write(bytes) + + # waitFor(sleepAsync(milliseconds(1000))) + + # discard waitFor soc.write(bytes) runForever() diff --git a/eth/utp/utp_socket.nim b/eth/utp/utp_socket.nim index b683f52..f89e3c9 100644 --- a/eth/utp/utp_socket.nim +++ b/eth/utp/utp_socket.nim @@ -12,7 +12,12 @@ import stew/results, ./send_buffer_tracker, ./growable_buffer, - ./packets + ./packets, + ./ledbat_congestion_control, + ./delay_histogram, + ./utp_utils, + ./clock_drift_calculator + logScope: topics = "utp_socket" @@ -186,6 +191,25 @@ type zeroWindowTimer: Moment + # last measured delay between current local timestamp, and remote sent + # timestamp. In microseconds + replayMicro: uint32 + + # indicator if we're in slow-start (exponential growth) phase + slowStart: bool + + #the slow-start threshold, in bytes + slowStartTreshold: uint32 + + # history of our delays + ourHistogram: DelayHistogram + + # history of remote delays + remoteHistogram: DelayHistogram + + # calculator of drifiting between local and remote clocks + driftCalculator: ClockDriftCalculator + # socket identifier socketKey*: UtpSocketKey[A] @@ -255,6 +279,13 @@ const # Reset period is configured in `SocketConfig` minimalRemoteWindow: uint32 = 1500 + # Initial max window size. Reference implementation uses value which enables one packet + # to be transfered. + # We use value two times higher as we do not yet have proper mtu estimation, and + # our impl should work over udp and discovery v5 (where proper estmation may be harder + # as packets already have discvoveryv5 envelope) + startMaxWindow* = 2 * mtuSize + reorderBufferMaxSize = 1024 proc init*[A](T: type UtpSocketKey, remoteAddress: A, rcvId: uint16): T = @@ -319,7 +350,8 @@ proc sendAck(socket: UtpSocket): Future[void] = socket.seqNr, socket.connectionIdSnd, socket.ackNr, - socket.getRcvWindowSize() + socket.getRcvWindowSize(), + socket.replayMicro ) socket.sendData(encodePacket(ackPacket)) @@ -405,7 +437,28 @@ proc checkTimeouts(socket: UtpSocket) {.async.} = socket.retransmitTimeout = newTimeout socket.rtoTimeout = currentTime + newTimeout - # TODO Add handling of congestion control + let currentPacketSize = uint32(socket.getPacketSize()) + + if (socket.curWindowPackets == 0 and socket.sendBufferTracker.maxWindow > currentPacketSize): + # there are no packets in flight even though there is place for more than whole packet + # this means connection is just idling. Reset window by 1/3'rd but no more + # than to fit at least one packet. + let oldMaxWindow = socket.sendBufferTracker.maxWindow + let newMaxWindow = max((oldMaxWindow * 2) div 3, currentPacketSize) + socket.sendBufferTracker.updateMaxWindowSize( + # maxRemote window does not change + socket.sendBufferTracker.maxRemoteWindow, + newMaxWindow + ) + else: + # delay was so high that window has shrunk below one packet. Reset window + # to fit a least one packet and start with slow start + socket.sendBufferTracker.updateMaxWindowSize( + # maxRemote window does not change + socket.sendBufferTracker.maxRemoteWindow, + currentPacketSize + ) + socket.slowStart = true # This will have much more sense when we will add handling of selective acks # as then every selecivly acked packet restes timeout timer and removes packet @@ -472,7 +525,15 @@ proc handleDataWrite(socket: UtpSocket, data: seq[byte], writeFut: Future[WriteR if socket.curWindowPackets == 0: socket.resetSendTimeout() - let dataPacket = dataPacket(socket.seqNr, socket.connectionIdSnd, socket.ackNr, wndSize, dataSlice) + let dataPacket = + dataPacket( + socket.seqNr, + socket.connectionIdSnd, + socket.ackNr, + wndSize, + dataSlice, + socket.replayMicro + ) let outgoingPacket = OutgoingPacket.init(encodePacket(dataPacket), 1, false, payloadLength) socket.registerOutgoingPacket(outgoingPacket) await socket.sendData(outgoingPacket.packetBytes) @@ -500,7 +561,16 @@ proc handleClose(socket: UtpSocket): Future[void] {.async.} = if socket.curWindowPackets == 0: socket.resetSendTimeout() - let finEncoded = encodePacket(finPacket(socket.seqNr, socket.connectionIdSnd, socket.ackNr, socket.getRcvWindowSize())) + let finEncoded = + encodePacket( + finPacket( + socket.seqNr, + socket.connectionIdSnd, + socket.ackNr, + socket.getRcvWindowSize(), + socket.replayMicro + ) + ) socket.registerOutgoingPacket(OutgoingPacket.init(finEncoded, 1, true, 0)) await socket.sendData(finEncoded) socket.finSent = true @@ -543,6 +613,7 @@ proc new[A]( initialAckNr: uint16, initialTimeout: Duration ): T = + let currentTime = Moment.now() T( remoteAddress: to, state: state, @@ -556,7 +627,7 @@ proc new[A]( outBuffer: GrowableCircularBuffer[OutgoingPacket].init(), inBuffer: GrowableCircularBuffer[Packet].init(), retransmitTimeout: initialTimeout, - rtoTimeout: Moment.now() + initialTimeout, + rtoTimeout: currentTime + initialTimeout, # Initial timeout values taken from reference implemntation rtt: milliseconds(0), rttVar: milliseconds(800), @@ -565,11 +636,16 @@ proc new[A]( closeEvent: newAsyncEvent(), closeCallbacks: newSeq[Future[void]](), # start with 1mb assumption, field will be updated with first received packet - sendBufferTracker: SendBufferTracker.new(0, 1024 * 1024, cfg.optSndBuffer), + sendBufferTracker: SendBufferTracker.new(0, 1024 * 1024, cfg.optSndBuffer, startMaxWindow), # queue with infinite size writeQueue: newAsyncQueue[WriteRequest](), - zeroWindowTimer: Moment.now() + cfg.remoteWindowResetTimeout, + zeroWindowTimer: currentTime + cfg.remoteWindowResetTimeout, socketKey: UtpSocketKey.init(to, rcvId), + slowStart: true, + slowStartTreshold: cfg.optSndBuffer, + ourHistogram: DelayHistogram.init(currentTime), + remoteHistogram: DelayHistogram.init(currentTime), + driftCalculator: ClockDriftCalculator.init(currentTime), send: snd ) @@ -682,12 +758,6 @@ proc setCloseCallback(s: UtpSocket, cb: SocketCloseCallback) {.async.} = proc registerCloseCallback*(s: UtpSocket, cb: SocketCloseCallback) = s.closeCallbacks.add(s.setCloseCallback(cb)) -proc max(a, b: Duration): Duration = - if (a > b): - a - else: - b - proc updateTimeouts(socket: UtpSocket, timeSent: Moment, currentTime: Moment) = ## Update timeouts according to spec: ## delta = rtt - packet_rtt @@ -769,20 +839,31 @@ proc ackPackets(socket: UtpSocket, nrPacketsToAck: uint16) = inc i +proc calculateAckedbytes(socket: UtpSocket, nrPacketsToAck: uint16, now: Moment): (uint32, Duration) = + var i: uint16 = 0 + var ackedBytes: uint32 = 0 + var minRtt: Duration = InfiniteDuration + while i < nrPacketsToack: + let seqNr = socket.seqNr - socket.curWindowPackets + i + let packetOpt = socket.outBuffer.get(seqNr) + if (packetOpt.isSome() and packetOpt.unsafeGet().transmissions > 0): + let packet = packetOpt.unsafeGet() + + ackedBytes = ackedBytes + packet.payloadLength + + # safety check in case clock is not monotonic + if packet.timeSent < now: + minRtt = min(minRtt, now - packet.timeSent) + else: + minRtt = min(minRtt, microseconds(50000)) + + inc i + (ackedBytes, minRtt) + proc initializeAckNr(socket: UtpSocket, packetSeqNr: uint16) = if (socket.state == SynSent): socket.ackNr = packetSeqNr - 1 -# compare if lhs is less than rhs, taking wrapping -# into account. i.e high(lhs) < 0 == true -proc wrapCompareLess(lhs: uint16, rhs:uint16): bool = - let distDown = (lhs - rhs) - let distUp = (rhs - lhs) - # if the distance walking up is shorter, lhs - # is less than rhs. If the distance walking down - # is shorter, then rhs is less than lhs - return distUp < distDown - proc isAckNrInvalid(socket: UtpSocket, packet: Packet): bool = let ackWindow = max(socket.curWindowPackets + allowedAckWindow, allowedAckWindow) ( @@ -802,6 +883,7 @@ proc isAckNrInvalid(socket: UtpSocket, packet: Packet): bool = # to scheduler which means there could be potentialy several processPacket procs # running proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = + let receiptTime = Moment.now() if socket.isAckNrInvalid(p): notice "Received packet with invalid ack nr" @@ -834,15 +916,74 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = notice "Received packet is totally of the mark" return - # update remote window size - socket.sendBufferTracker.updateMaxRemote(p.header.wndSize) + var (ackedBytes, minRtt) = socket.calculateAckedbytes(acks, receiptTime) + # TODO caluclate bytes acked by selective acks here (if thats the case) + + let receiptTimestamp = getMonoTimeTimeStamp() + + let sentTimeRemote = p.header.timestamp + # we are using uint32 not a Duration, to wrap a round in case of + # sentTimeRemote > receipTimestamp. This can happen as local and remote + # clock can be not synchornized or even using different system clock. + # i.e this number itself does not tell anything and is only used to feedback it + # to remote peer with each sent packet + let remoteDelay = + if (sentTimeRemote == 0): + 0'u32 + else: + receiptTimestamp - sentTimeRemote + + socket.replayMicro = remoteDelay + + let prevRemoteDelayBase = socket.remoteHistogram.delayBase + + if (remoteDelay != 0): + socket.remoteHistogram.addSample(remoteDelay, receiptTime) + + # remote new delay base is less than previous + # shift our delay base in other direction to take clock skew into account + # but no more than 10ms + if (prevRemoteDelayBase != 0 and + wrapCompareLess(socket.remoteHistogram.delayBase, prevRemoteDelayBase) and + prevRemoteDelayBase - socket.remoteHistogram.delayBase <= 10000'u32): + socket.ourHistogram.shift(prevRemoteDelayBase - socket.remoteHistogram.delayBase) + + let actualDelay = p.header.timestampDiff + + if actualDelay != 0: + socket.ourHistogram.addSample(actualDelay, receiptTime) + socket.driftCalculator.addSample(actualDelay, receiptTime) + + # adjust base delay if delay estimates exceeds rtt + if (socket.ourHistogram.getValue() > minRtt): + let diff = uint32((socket.ourHistogram.getValue() - minRtt).microseconds()) + socket.ourHistogram.shift(diff) + + let (newMaxWindow, newSlowStartTreshold, newSlowStart) = + applyCongestionControl( + socket.sendBufferTracker.maxWindow, + socket.slowStart, + socket.slowStartTreshold, + socket.socketConfig.optSndBuffer, + uint32(socket.getPacketSize()), + microseconds(actualDelay), + ackedBytes, + minRtt, + socket.ourHistogram.getValue(), + socket.driftCalculator.clockDrift + ) + + # update remote window size and max window + socket.sendBufferTracker.updateMaxWindowSize(p.header.wndSize, newMaxWindow) + socket.slowStart = newSlowStart + socket.slowStartTreshold = newSlowStartTreshold + if (socket.sendBufferTracker.maxRemoteWindow == 0): # when zeroWindowTimer will be hit and maxRemoteWindow still will be equal to 0 # then it will be reset to minimal value socket.zeroWindowTimer = Moment.now() + socket.socketConfig.remoteWindowResetTimeout - # socket.curWindowPackets == acks means that this packet acked all remaining packets # including the sent fin packets if (socket.finSent and socket.curWindowPackets == acks): @@ -855,7 +996,7 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = socket.destroy() socket.ackPackets(acks) - + case p.header.pType of ST_DATA, ST_FIN: # To avoid amplification attacks, server socket is in SynRecv state until @@ -1109,3 +1250,7 @@ proc connectionId*[A](socket: UtpSocket[A]): uint16 = socket.connectionIdSnd of Outgoing: socket.connectionIdRcv + +# Check what is current available window size for this socket +proc currentMaxWindowSize*[A](socket: UtpSocket[A]): uint32 = + socket.sendBufferTracker.maxWindow diff --git a/eth/utp/utp_utils.nim b/eth/utp/utp_utils.nim new file mode 100644 index 0000000..c83da1b --- /dev/null +++ b/eth/utp/utp_utils.nim @@ -0,0 +1,38 @@ +# Copyright (c) 2021 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.push raises: [Defect].} + +import + chronos + +# compare if lhs is less than rhs, taking wrapping +# into account. i.e high(lhs) < 0 == true +proc wrapCompareLess*(lhs: uint32, rhs: uint32): bool = + let distDown = (lhs - rhs) + let distUp = (rhs - lhs) + # if the distance walking up is shorter, lhs + # is less than rhs. If the distance walking down + # is shorter, then rhs is less than lhs + return distUp < distDown + +proc wrapCompareLess*(lhs: uint16, rhs: uint16): bool = + let distDown = (lhs - rhs) + let distUp = (rhs - lhs) + + return distUp < distDown + +proc max*(a, b: Duration): Duration = + if (a > b): + a + else: + b + +proc min*(a, b: Duration): Duration = + if (a < b): + a + else: + b diff --git a/tests/utp/test_clock_drift_calculator.nim b/tests/utp/test_clock_drift_calculator.nim new file mode 100644 index 0000000..c797ec1 --- /dev/null +++ b/tests/utp/test_clock_drift_calculator.nim @@ -0,0 +1,71 @@ +# Copyright (c) 2021 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.used.} + +import + chronos, + unittest, + ../../eth/utp/clock_drift_calculator + +suite "Clock drift calculator": + + test "Initial clock drift should be 0": + let currentTime = Moment.now() + let calculator = ClockDriftCalculator.init(currentTime) + + check: + calculator.clockDrift == 0 + + test "Adding samples should not update averages if 5s did not pass": + let currentTime = Moment.now() + var calculator = ClockDriftCalculator.init(currentTime) + + calculator.addSample(10, currentTime + seconds(1)) + calculator.addSample(10, currentTime + seconds(2)) + + check: + calculator.clockDrift == 0 + calculator.lastClockDrift == 0 + + test "Clock drift should be calculated in relation to first sample": + let currentTime = Moment.now() + var calculator = ClockDriftCalculator.init(currentTime) + + # first sample which will be treated as a base sample + calculator.addSample(10, currentTime + seconds(3)) + + # second sample in the first inteval it will be treated in relation to first one + # so correct first drift should be: (50 - 10) / 2 == 20 + calculator.addSample(50, currentTime + seconds(6)) + + check: + calculator.clockDrift == 2 + calculator.lastClockDrift == 20 + + test "Clock drift should properly calcuated when clock drifts to two sides": + let currentTime = Moment.now() + var calculator1 = ClockDriftCalculator.init(currentTime) + var calculator2 = ClockDriftCalculator.init(currentTime) + + + # first sample which will be treated as a base sample + calculator1.addSample(10, currentTime + seconds(3)) + + # second sample in the first inteval it will be treated in relation to first one + # so correct first drift should be: (50 - 10) / 2 == 20 + calculator1.addSample(50, currentTime + seconds(6)) + + # first sample which will be treated as a base sample + calculator2.addSample(50, currentTime + seconds(3)) + + # second sample in the first inteval it will be treated in relation to first one + # so correct first drift should be: (10 - 50) / 2 == -20 + calculator2.addSample(10, currentTime + seconds(6)) + + check: + calculator1.clockDrift == -calculator2.clockDrift + calculator1.lastClockDrift == -calculator2.lastClockDrift diff --git a/tests/utp/test_protocol.nim b/tests/utp/test_protocol.nim index 1c24641..7df752e 100644 --- a/tests/utp/test_protocol.nim +++ b/tests/utp/test_protocol.nim @@ -421,3 +421,74 @@ procSuite "Utp protocol over udp tests": await utpProt1.shutdownWait() await utpProt2.shutdownWait() await utpProt3.shutdownWait() + + asyncTest "Success data transfer of a lot of data should increase available window on sender side": + let s = await initClientServerScenario() + + check: + s.clientSocket.isConnected() + # initially window has value equal to some pre configured constant + s.clientSocket.currentMaxWindowSize == startMaxWindow + # after successful connection outgoing buffer should be empty as syn packet + # should be correctly acked + s.clientSocket.numPacketsInOutGoingBuffer() == 0 + + (not s.serverSocket.isConnected()) + + # big transfer of 50kb + let bytesToTransfer = generateByteArray(rng[], 50000) + + let bytesReceivedFromClient = await transferData(s.clientSocket, s.serverSocket, bytesToTransfer) + + # ultimatly all send packets will acked, and outgoing buffer will be empty + await waitUntil(proc (): bool = s.clientSocket.numPacketsInOutGoingBuffer() == 0) + + check: + # we can only assert that window has grown, becouse specific values depends on + # particual timings + s.clientSocket.currentMaxWindowSize > startMaxWindow + s.serverSocket.isConnected() + s.clientSocket.numPacketsInOutGoingBuffer() == 0 + bytesReceivedFromClient == bytesToTransfer + + await s.close() + + asyncTest "Not used socket should decay its max send window": + let s = await initClientServerScenario() + + check: + s.clientSocket.isConnected() + # initially window has value equal to some pre configured constant + s.clientSocket.currentMaxWindowSize == startMaxWindow + # after successful connection outgoing buffer should be empty as syn packet + # should be correctly acked + s.clientSocket.numPacketsInOutGoingBuffer() == 0 + + (not s.serverSocket.isConnected()) + + # big transfer of 50kb + let bytesToTransfer = generateByteArray(rng[], 50000) + + let bytesReceivedFromClient = await transferData(s.clientSocket, s.serverSocket, bytesToTransfer) + + # ultimatly all send packets will acked, and outgoing buffer will be empty + await waitUntil(proc (): bool = s.clientSocket.numPacketsInOutGoingBuffer() == 0) + + let maximumMaxWindow = s.clientSocket.currentMaxWindowSize + + check: + # we can only assert that window has grown, becouse specific values depends on + # particual timings + maximumMaxWindow > startMaxWindow + s.serverSocket.isConnected() + s.clientSocket.numPacketsInOutGoingBuffer() == 0 + bytesReceivedFromClient == bytesToTransfer + + # wait long enough to trigger timeout + await sleepAsync(seconds(5)) + + check: + # window should decay when idle + s.clientSocket.currentMaxWindowSize < maximumMaxWindow + + await s.close() diff --git a/tests/utp/test_utp_router.nim b/tests/utp/test_utp_router.nim index 9b32f9f..5371bfa 100644 --- a/tests/utp/test_utp_router.nim +++ b/tests/utp/test_utp_router.nim @@ -60,7 +60,14 @@ procSuite "Utp router unit tests": check: initialPacket.header.pType == ST_SYN - let responseAck = ackPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize) + let responseAck = + ackPacket( + initialRemoteSeq, + initialPacket.header.connectionId, + initialPacket.header.seqNr, + testBufferSize, + 0 + ) await router.processIncomingBytes(encodePacket(responseAck), remote) @@ -167,7 +174,17 @@ procSuite "Utp router unit tests": # socket is not configured to be connected until receiving data not socket.isConnected() - let encodedData = encodePacket(dataPacket(initSeq + 1, initConnId + 1, initialPacket.header.seqNr - 1, 10, dataToSend)) + let encodedData = + encodePacket( + dataPacket( + initSeq + 1, + initConnId + 1, + initialPacket.header.seqNr - 1, + 10, + dataToSend, + 0 + ) + ) await router.processIncomingBytes(encodedData, testSender) @@ -255,7 +272,14 @@ procSuite "Utp router unit tests": # connection id of syn packet should be set to requested connection id initialPacket.header.connectionId == requestedConnectionId - let responseAck = ackPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize) + let responseAck = + ackPacket( + initialRemoteSeq, + initialPacket.header.connectionId, + initialPacket.header.seqNr, + testBufferSize, + 0 + ) await router.processIncomingBytes(encodePacket(responseAck), testSender2) @@ -355,7 +379,7 @@ procSuite "Utp router unit tests": router.sendCb = initTestSnd(pq) let sndId = 10'u16 - let dp = dataPacket(10'u16, sndId, 10'u16, 10'u32, @[1'u8]) + let dp = dataPacket(10'u16, sndId, 10'u16, 10'u32, @[1'u8], 0) await router.processIncomingBytes(encodePacket(dp), testSender2) diff --git a/tests/utp/test_utp_socket.nim b/tests/utp/test_utp_socket.nim index 805f20e..b89d1d8 100644 --- a/tests/utp/test_utp_socket.nim +++ b/tests/utp/test_utp_socket.nim @@ -44,7 +44,8 @@ procSuite "Utp socket unit test": connectionId, ackNr, testBufferSize, - generateByteArray(rng, packetSize) + generateByteArray(rng, packetSize), + 0 ) packets.add(packet) @@ -70,7 +71,14 @@ procSuite "Utp socket unit test": check: initialPacket.header.pType == ST_SYN - let responseAck = ackPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr, remoteReceiveBuffer) + let responseAck = + ackPacket( + initialRemoteSeq, + initialPacket.header.connectionId, + initialPacket.header.seqNr, + remoteReceiveBuffer, + 0 + ) await sock1.processPacket(responseAck) @@ -151,7 +159,15 @@ procSuite "Utp socket unit test": let (outgoingSocket, initialPacket) = connectOutGoingSocket(initalRemoteSeqNr, q) - let dataP1 = dataPacket(initalRemoteSeqNr, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize, data) + let dataP1 = + dataPacket( + initalRemoteSeqNr, + initialPacket.header.connectionId, + initialPacket.header.seqNr, + testBufferSize, + data, + 0 + ) await outgoingSocket.processPacket(dataP1) let ack1 = await q.get() @@ -305,7 +321,14 @@ procSuite "Utp socket unit test": # ackNr in state packet, is set to sentPacket.header.seqNr which means remote # side processed out packet - let responseAck = ackPacket(initialRemoteSeq, initialPacket.header.connectionId, sentPacket.header.seqNr, testBufferSize) + let responseAck = + ackPacket( + initialRemoteSeq, + initialPacket.header.connectionId, + sentPacket.header.seqNr, + testBufferSize, + 0 + ) await outgoingSocket.processPacket(responseAck) @@ -365,7 +388,14 @@ procSuite "Utp socket unit test": # user decided to cancel second write await writeFut2.cancelAndWait() # remote increased wnd size enough for all writes - let someAckFromRemote = ackPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr, 10) + let someAckFromRemote = + ackPacket( + initialRemoteSeq, + initialPacket.header.connectionId, + initialPacket.header.seqNr, + 10, + 0 + ) await outgoingSocket.processPacket(someAckFromRemote) @@ -407,7 +437,14 @@ procSuite "Utp socket unit test": check: initialPacket.header.pType == ST_SYN - let responseAck = ackPacket(initalRemoteSeqNr, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize) + let responseAck = + ackPacket( + initalRemoteSeqNr, + initialPacket.header.connectionId, + initialPacket.header.seqNr, + testBufferSize, + 0 + ) await outgoingSocket.processPacket(responseAck) @@ -460,7 +497,14 @@ procSuite "Utp socket unit test": let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q) - let finP = finPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize) + let finP = + finPacket( + initialRemoteSeq, + initialPacket.header.connectionId, + initialPacket.header.seqNr, + testBufferSize, + 0 + ) await outgoingSocket.processPacket(finP) let ack1 = await q.get() @@ -481,10 +525,34 @@ procSuite "Utp socket unit test": let readF = outgoingSocket.read() - let dataP = dataPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize, data) - let dataP1 = dataPacket(initialRemoteSeq + 1, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize, data1) + let dataP = + dataPacket( + initialRemoteSeq, + initialPacket.header.connectionId, + initialPacket.header.seqNr, + testBufferSize, + data, + 0 + ) - let finP = finPacket(initialRemoteSeq + 2, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize) + let dataP1 = + dataPacket( + initialRemoteSeq + 1, + initialPacket.header.connectionId, + initialPacket.header.seqNr, + testBufferSize, + data1, + 0 + ) + + let finP = + finPacket( + initialRemoteSeq + 2, + initialPacket.header.connectionId, + initialPacket.header.seqNr, + testBufferSize, + 0 + ) await outgoingSocket.processPacket(finP) @@ -519,13 +587,36 @@ procSuite "Utp socket unit test": let readF = outgoingSocket.read() - let dataP = dataPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize, data) + let dataP = + dataPacket( + initialRemoteSeq, + initialPacket.header.connectionId, + initialPacket.header.seqNr, + testBufferSize, + data, + 0 + ) - let finP = finPacket(initialRemoteSeq + 1, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize) + let finP = + finPacket( + initialRemoteSeq + 1, + initialPacket.header.connectionId, + initialPacket.header.seqNr, + testBufferSize, + 0 + ) # dataP1 has seqNr larger than fin, there fore it should be considered past eof and never passed # to user of library - let dataP1 = dataPacket(initialRemoteSeq + 2, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize, data1) + let dataP1 = + dataPacket( + initialRemoteSeq + 2, + initialPacket.header.connectionId, + initialPacket.header.seqNr, + testBufferSize, + data1, + 0 + ) await outgoingSocket.processPacket(finP) @@ -583,7 +674,14 @@ procSuite "Utp socket unit test": check: sendFin.header.pType == ST_FIN - let responseAck = ackPacket(initialRemoteSeq, initialPacket.header.connectionId, sendFin.header.seqNr, testBufferSize) + let responseAck = + ackPacket( + initialRemoteSeq, + initialPacket.header.connectionId, + sendFin.header.seqNr, + testBufferSize, + 0 + ) await outgoingSocket.processPacket(responseAck) @@ -639,7 +737,15 @@ procSuite "Utp socket unit test": let sCfg = SocketConfig.init(optRcvBuffer = initialRcvBufferSize) let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeqNr, q, testBufferSize, sCfg) - let dataP1 = dataPacket(initialRemoteSeqNr, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize, data) + let dataP1 = + dataPacket( + initialRemoteSeqNr, + initialPacket.header.connectionId, + initialPacket.header.seqNr, + testBufferSize, + data, + 0 + ) await outgoingSocket.processPacket(dataP1) @@ -676,7 +782,15 @@ procSuite "Utp socket unit test": let sCfg = SocketConfig.init(optRcvBuffer = initialRcvBufferSize) let (outgoingSocket, initialPacket) = connectOutGoingSocket(initalRemoteSeqNr, q, testBufferSize, sCfg) - let dataP1 = dataPacket(initalRemoteSeqNr, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize, data) + let dataP1 = + dataPacket( + initalRemoteSeqNr, + initialPacket.header.connectionId, + initialPacket.header.seqNr, + testBufferSize, + data, + 0 + ) await outgoingSocket.processPacket(dataP1) @@ -714,11 +828,35 @@ procSuite "Utp socket unit test": let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q) # data packet with ack nr set above our seq nr i.e packet from the future - let dataFuture = dataPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr + 1, testBufferSize, data1) + let dataFuture = + dataPacket( + initialRemoteSeq, + initialPacket.header.connectionId, + initialPacket.header.seqNr + 1, + testBufferSize, + data1, + 0 + ) # data packet wth ack number set below out ack window i.e packet too old - let dataTooOld = dataPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr - allowedAckWindow - 1, testBufferSize, data2) + let dataTooOld = + dataPacket( + initialRemoteSeq, + initialPacket.header.connectionId, + initialPacket.header.seqNr - allowedAckWindow - 1, + testBufferSize, + data2, + 0 + ) - let dataOk = dataPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize, data3) + let dataOk = + dataPacket( + initialRemoteSeq, + initialPacket.header.connectionId, + initialPacket.header.seqNr, + testBufferSize, + data3, + 0 + ) await outgoingSocket.processPacket(dataFuture) await outgoingSocket.processPacket(dataTooOld) @@ -773,7 +911,14 @@ procSuite "Utp socket unit test": check: int(outgoingSocket.numOfBytesInFlight) == len(dataToWrite) + len(dataToWrite) - let responseAck = ackPacket(initialRemoteSeq, initialPacket.header.connectionId, sentPacket.header.seqNr, testBufferSize) + let responseAck = + ackPacket( + initialRemoteSeq, + initialPacket.header.connectionId, + sentPacket.header.seqNr, + testBufferSize, + 0 + ) await outgoingSocket.processPacket(responseAck) @@ -839,7 +984,14 @@ procSuite "Utp socket unit test": check: not writeFut.finished() - let someAckFromRemote = ackPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr, uint32(len(dataToWrite))) + let someAckFromRemote = + ackPacket( + initialRemoteSeq, + initialPacket.header.connectionId, + initialPacket.header.seqNr, + uint32(len(dataToWrite)), + 0 + ) await outgoingSocket.processPacket(someAckFromRemote) @@ -862,7 +1014,14 @@ procSuite "Utp socket unit test": # remote is initialized with buffer to small to handle whole payload let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q) let remoteRcvWindowSize = uint32(outgoingSocket.getPacketSize()) - let someAckFromRemote = ackPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr, remoteRcvWindowSize) + let someAckFromRemote = + ackPacket( + initialRemoteSeq, + initialPacket.header.connectionId, + initialPacket.header.seqNr, + remoteRcvWindowSize, + 0 + ) # we are using ack from remote to setup our snd window size to one packet size on one packet await outgoingSocket.processPacket(someAckFromRemote) @@ -890,7 +1049,14 @@ procSuite "Utp socket unit test": # remote is initialized with buffer to small to handle whole payload let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q) let remoteRcvWindowSize = uint32(outgoingSocket.getPacketSize()) - let someAckFromRemote = ackPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr, remoteRcvWindowSize) + let someAckFromRemote = + ackPacket( + initialRemoteSeq, + initialPacket.header.connectionId, + initialPacket.header.seqNr, + remoteRcvWindowSize, + 0 + ) # we are using ack from remote to setup our snd window size to one packet size on one packet await outgoingSocket.processPacket(someAckFromRemote) @@ -899,7 +1065,14 @@ procSuite "Utp socket unit test": let writeFut = outgoingSocket.write(twoPacketData) - let firstAckFromRemote = ackPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr + 1, remoteRcvWindowSize) + let firstAckFromRemote = + ackPacket( + initialRemoteSeq, + initialPacket.header.connectionId, + initialPacket.header.seqNr + 1, + remoteRcvWindowSize, + 0 + ) let packet = await q.get() @@ -988,7 +1161,14 @@ procSuite "Utp socket unit test": # this write still cannot progress as 1st write is not acked not writeFut2.finished() - let someAckFromRemote = ackPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr + 1, 10) + let someAckFromRemote = + ackPacket( + initialRemoteSeq, + initialPacket.header.connectionId, + initialPacket.header.seqNr + 1, + 10, + 0 + ) # acks first write, so there is space in buffer for new data and second # write should progress @@ -1004,3 +1184,33 @@ procSuite "Utp socket unit test": secondPacket.payload == somedata2 await outgoingSocket.destroyWait() + + asyncTest "Socket should inform remote about its delay": + let q = newAsyncQueue[Packet]() + let initialRemoteSeq = 10'u16 + + let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q) + + let dataP1 = + dataPacket( + initialRemoteSeq, + initialPacket.header.connectionId, + initialPacket.header.seqNr, + testBufferSize, + @[1'u8], + 0 + ) + + check: + outgoingSocket.isConnected() + + # necessary to avoid timestampDiff near 0 and flaky tests + await sleepAsync(milliseconds(50)) + await outgoingSocket.processPacket(dataP1) + + let socketAck = await q.get() + + check: + socketAck.header.timestampDiff > 0 + + await outgoingSocket.destroyWait()