From d74dc40bee2f9f61f74b1f869ef3f063864e53d4 Mon Sep 17 00:00:00 2001 From: Kim De Mey Date: Thu, 22 Jun 2023 13:31:30 +0200 Subject: [PATCH] Style fixes and comment improvements on uTP code (#623) --- eth/utp/utp_discv5_protocol.nim | 29 ++- eth/utp/utp_router.nim | 2 +- eth/utp/utp_socket.nim | 371 ++++++++++++++++++-------------- tests/utp/test_utp_socket.nim | 61 +++--- 4 files changed, 259 insertions(+), 204 deletions(-) diff --git a/eth/utp/utp_discv5_protocol.nim b/eth/utp/utp_discv5_protocol.nim index 06a91e5..5c8d727 100644 --- a/eth/utp/utp_discv5_protocol.nim +++ b/eth/utp/utp_discv5_protocol.nim @@ -31,7 +31,8 @@ proc init*(T: type NodeAddress, nodeId: NodeId, address: Address): NodeAddress = NodeAddress(nodeId: nodeId, address: address) proc init*(T: type NodeAddress, node: Node): Option[NodeAddress] = - node.address.map((address: Address) => NodeAddress(nodeId: node.id, address: address)) + node.address.map((address: Address) => + NodeAddress(nodeId: node.id, address: address)) proc hash(x: NodeAddress): Hash = var h = 0 @@ -51,25 +52,31 @@ func `$`*(x: UtpSocketKey[NodeAddress]): string = ", rcvId: " & $x.rcvId & ")" -proc talkReqDirect(p: protocol.Protocol, n: NodeAddress, protocol, request: seq[byte]): Future[void] = +proc talkReqDirect( + p: protocol.Protocol, n: NodeAddress, protocol, request: seq[byte]): + Future[void] = let reqId = RequestId.init(p.rng[]) - message = encodeMessage(TalkReqMessage(protocol: protocol, request: request), reqId) + message = encodeMessage( + TalkReqMessage(protocol: protocol, request: request), reqId) + (data, nonce) = encodeMessagePacket( + p.rng[], p.codec, n.nodeId, n.address, message) - (data, nonce) = encodeMessagePacket(p.rng[], p.codec, n.nodeId, n.address, message) - - trace "Send message packet", dstId = n.nodeId, address = n.address, kind = MessageKind.talkreq + trace "Send message packet", + dstId = n.nodeId, address = n.address, kind = MessageKind.talkreq p.send(n.address, data) proc initSendCallback( - t: protocol.Protocol, subProtocolName: seq[byte]): SendCallback[NodeAddress] = + t: protocol.Protocol, subProtocolName: seq[byte]): + SendCallback[NodeAddress] = return ( proc (to: NodeAddress, data: seq[byte]): Future[void] = let fut = newFuture[void]() - # hidden assumption here is that nodes already have established discv5 session - # between each other. In our use case this should be true as opening stream - # is only done after successful OFFER/ACCEPT or FINDCONTENT/CONTENT exchange - # which forces nodes to establish session between each other. + # hidden assumption here is that nodes already have established discv5 + # session between each other. In our use case this should be true as + # opening stream is only done after successful OFFER/ACCEPT or + # FINDCONTENT/CONTENT exchange which forces nodes to establish session + # between each other. discard t.talkReqDirect(to, subProtocolName, data) fut.complete() return fut diff --git a/eth/utp/utp_router.nim b/eth/utp/utp_router.nim index 906663b..1c70739 100644 --- a/eth/utp/utp_router.nim +++ b/eth/utp/utp_router.nim @@ -249,7 +249,7 @@ proc processPacket[A](r: UtpRouter[A], p: Packet, sender: A) {.async.}= else: # TODO: add keeping track of recently send reset packets and do not send # reset to peers which we recently send reset to. - debug "Received FIN/DATA/ACK on not known socket sending reset" + debug "Received FIN/DATA/ACK on unknown socket, sending reset" let rstPacket = resetPacket( randUint16(r.rng[]), p.header.connectionId, p.header.seqNr) await r.sendCb(sender, encodePacket(rstPacket)) diff --git a/eth/utp/utp_socket.nim b/eth/utp/utp_socket.nim index 57c6770..da361bd 100644 --- a/eth/utp/utp_socket.nim +++ b/eth/utp/utp_socket.nim @@ -8,7 +8,7 @@ import std/[sugar, deques], - chronos, chronicles, + chronos, chronicles, metrics, stew/[results, bitops2], ./growable_buffer, ./packets, @@ -203,7 +203,7 @@ type # peer rto: Duration - # RTO timeout will happen when currenTime > rtoTimeout + # RTO timeout happens when currentTime > rtoTimeout rtoTimeout: Moment # rcvBuffer @@ -215,7 +215,7 @@ type # readers waiting for data pendingReads: Deque[ReadReq] - # loop called every 500ms to check for on going timeout status + # loop called every 500ms to check for timeouts checkTimeoutsLoop: Future[void] # number on consecutive re-transmissions @@ -257,15 +257,15 @@ type # timer which is started when peer max window drops below current packet size zeroWindowTimer: Option[Moment] - # last measured delay between current local timestamp, and remote sent - # timestamp. In microseconds + # last measured delay between current local timestamp and remote sent + # timestamp, in microseconds. replayMicro: uint32 - # indicator if we're in slow-start (exponential growth) phase + # indicator if socket is in in slow-start (exponential growth) phase slowStart: bool - # indicator if we're in fast time out mode i.e we will resend - # oldest packet un-acked in case of newer packet arriving + # indicator if socket is in fast time-out mode, i.e will resend oldest + # not ACK'ed packet when newer packet is received. fastTimeout: bool # Sequence number of the next packet we are allowed to fast-resend. This is @@ -278,7 +278,7 @@ type # counter of duplicate acks duplicateAck: uint16 - #the slow-start threshold, in bytes + # the slow-start threshold, in bytes slowStartThreshold: uint32 # history of our delays @@ -295,8 +295,8 @@ type send: SendCallback[A] - # User driven call back to be called whenever socket is permanently closed i.e - # reaches destroy state + # User driven callback to be called whenever socket is permanently closed, + # i.e reaches the destroy state SocketCloseCallback* = proc (): void {.gcsafe, raises: [].} ConnectionError* = object of CatchableError @@ -543,13 +543,14 @@ proc checkTimeouts(socket: UtpSocket) = if socket.isOpened(): let currentPacketSize = socket.getPacketSize() - if (socket.zeroWindowTimer.isSome() and currentTime > socket.zeroWindowTimer.unsafeGet()): + if (socket.zeroWindowTimer.isSome() and + currentTime > socket.zeroWindowTimer.unsafeGet()): if socket.maxRemoteWindow <= currentPacketSize: - # Reset remote window, to minimal value which will fit at least two packet - let minimalRemoteWindow = 2 * socket.socketConfig.payloadSize - socket.maxRemoteWindow = minimalRemoteWindow - debug "Reset remote window to minimal value", - minRemote = minimalRemoteWindow + # Reset maxRemoteWindow to minimal value which will fit at least two + # packets + let remoteWindow = 2 * socket.socketConfig.payloadSize + socket.maxRemoteWindow = remoteWindow + debug "Reset remote window to minimal value", remoteWindow socket.zeroWindowTimer = none[Moment]() if (currentTime > socket.rtoTimeout): @@ -560,12 +561,14 @@ proc checkTimeouts(socket: UtpSocket) = curWindowPackets = socket.curWindowPackets, curWindowBytes = socket.currentWindow - # TODO add handling of probe time outs. Reference implementation has mechanism - # of sending probes to determine mtu size. Probe timeouts do not count to standard - # timeouts calculations + # TODO: + # Add handling of probing on timeouts. The reference implementation has + # a mechanism of sending probes to determine MTU size. Probe timeouts are + # not taking into account for the timeout calculation. - # client initiated connections, but did not send following data packet in rto - # time and our socket is configured to start in SynRecv state. + # For client initiated connections: SYN received but did not receive + # following data packet in rto time and the socket is configured to start + # in SynRecv state (to avoid amplifcation by IP spoofing). if (socket.state == SynRecv): socket.destroy() return @@ -589,13 +592,13 @@ proc checkTimeouts(socket: UtpSocket) = socket.retransmitTimeout = newTimeout socket.rtoTimeout = currentTime + newTimeout - # on timeout reset duplicate ack counter + # on timeout, reset the duplicate ack counter socket.duplicateAck = 0 if (socket.curWindowPackets == 0 and socket.maxWindow > currentPacketSize): - # there are no packets in flight even though there is place for more than whole packet - # this means connection is just idling. Reset window by 1/3'rd but no more - # than to fit at least one packet. + # There are no packets in flight even though there is space for more + # than a full packet. This means the connection is just idling. + # Reset window by 1/3'rd but no more than to fit at least one packet. let oldMaxWindow = socket.maxWindow let newMaxWindow = max((oldMaxWindow * 2) div 3, currentPacketSize) @@ -617,14 +620,15 @@ proc checkTimeouts(socket: UtpSocket) = socket.maxWindow = currentPacketSize socket.slowStart = true - # This will have much more sense when we will add handling of selective acks - # as then every selectively acked packet resets timeout timer and removes packet - # from out buffer. + # Note: with selective acks enabled, every selectively acked packet resets + # the timeout timer and removes te packet from the outBuffer. markAllPacketAsLost(socket) let oldestPacketSeqNr = socket.seqNr - socket.curWindowPackets - # resend oldest packet if there are some packets in flight, and oldestpacket was already sent - if (socket.curWindowPackets > 0 and socket.outBuffer[oldestPacketSeqNr].transmissions > 0): + # resend the oldest packet if there are some packets in flight and the + # oldest packet was already sent + if (socket.curWindowPackets > 0 and + socket.outBuffer[oldestPacketSeqNr].transmissions > 0): inc socket.retransmitCount socket.fastTimeout = true @@ -633,11 +637,10 @@ proc checkTimeouts(socket: UtpSocket) = retransmitCount = socket.retransmitCount, curWindowPackets = socket.curWindowPackets - # Oldest packet should always be present, so it is safe to call force - # resend + # Oldest packet should always be present, so it is safe to force resend socket.sendPacket(oldestPacketSeqNr) - # TODO add sending keep alives when necessary + # TODO: add sending keep alives when necessary proc checkTimeoutsLoop(s: UtpSocket) {.async.} = ## Loop that check timeouts in the socket. @@ -646,8 +649,8 @@ proc checkTimeoutsLoop(s: UtpSocket) {.async.} = await sleepAsync(checkTimeoutsLoopInterval) s.eventQueue.putNoWait(SocketEvent(kind: CheckTimeouts)) except CancelledError as exc: - # check timeouts loop is last running future managed by socket, if its - # cancelled we can fire closeEvent + # checkTimeoutsLoop is the last running future managed by the socket, when + # it's cancelled the closeEvent can be fired. s.closeEvent.fire() trace "checkTimeoutsLoop canceled" raise exc @@ -721,12 +724,13 @@ proc isClosed*(socket: UtpSocket): bool = proc isClosedAndCleanedUpAllResources*(socket: UtpSocket): bool = ## Test Api to check that all resources are properly cleaned up - socket.isClosed() and socket.eventLoop.cancelled() and socket.checkTimeoutsLoop.cancelled() + socket.isClosed() and socket.eventLoop.cancelled() and + socket.checkTimeoutsLoop.cancelled() proc destroy*(s: UtpSocket) = debug "Destroying socket", to = s.socketKey ## Moves socket to destroy state and clean all resources. - ## Remote is not notified in any way about socket end of life + ## Remote is not notified in any way about socket end of life. s.state = Destroy s.eventLoop.cancel() # This procedure initiate cleanup process which goes like: @@ -737,9 +741,9 @@ proc destroy*(s: UtpSocket) = # future shows as cancelled, but handler for CancelledError is not run proc destroyWait*(s: UtpSocket) {.async.} = - ## Moves socket to destroy state and clean all reasources and wait for all registered - ## callback to fire - ## Remote is not notified in any way about socket end of life + ## Moves socket to destroy state and clean all resources and wait for all + ## registered callbacks to fire, + ## Remote is not notified in any way about socket end of life. s.destroy() await s.closeEvent.wait() await allFutures(s.closeCallbacks) @@ -802,9 +806,10 @@ proc ackPacket(socket: UtpSocket, seqNr: uint16, currentTime: Moment): AckResult pkTransmissions = packet.transmissions, pkNeedResend = packet.needResend - # from spec: The rtt and rtt_var is only updated for packets that were sent only once. - # This avoids problems with figuring out which packet was acked, the first or the second one. - # it is standard solution to retransmission ambiguity problem + # from spec: The rtt and rtt_var is only updated for packets that were sent + # only once. This avoids the problem of figuring out which packet was acked, + # the first or the second one. It is standard solution to the retransmission + # ambiguity problem. if packet.transmissions == 1: socket.updateTimeouts(packet.timeSent, currentTime) @@ -815,10 +820,11 @@ proc ackPacket(socket: UtpSocket, seqNr: uint16, currentTime: Moment): AckResult # been considered timed-out, and is not included in # the cur_window anymore if (not packet.needResend): - doAssert(socket.currentWindow >= packet.payloadLength, "Window should always be larger than packet length") + doAssert(socket.currentWindow >= packet.payloadLength, + "Window should always be larger than packet length") socket.currentWindow = socket.currentWindow - packet.payloadLength - # we removed packet from our out going buffer + # recalculate as packet was removed from the outgoing buffer socket.outBufferBytes = socket.outBufferBytes - packet.payloadLength socket.retransmitCount = 0 @@ -832,7 +838,8 @@ proc ackPackets(socket: UtpSocket, nrPacketsToAck: uint16, currentTime: Moment) ## Ack packets in outgoing buffer based on ack number in the received packet var i = 0 while i < int(nrPacketsToAck): - let result = socket.ackPacket(socket.seqNr - socket.curWindowPackets, currentTime) + let result = socket.ackPacket( + socket.seqNr - socket.curWindowPackets, currentTime) case result of PacketAcked: dec socket.curWindowPackets @@ -844,7 +851,9 @@ proc ackPackets(socket: UtpSocket, nrPacketsToAck: uint16, currentTime: Moment) inc i -proc calculateAckedbytes(socket: UtpSocket, nrPacketsToAck: uint16, now: Moment): (uint32, Duration) = +proc calculateAckedbytes( + socket: UtpSocket, nrPacketsToAck: uint16, now: Moment): + (uint32, Duration) = var i: uint16 = 0 var ackedBytes: uint32 = 0 var minRtt: Duration = InfiniteDuration @@ -891,9 +900,11 @@ proc isAckNrInvalid(socket: UtpSocket, packet: Packet): bool = ) # counts the number of bytes acked by selective ack header -proc calculateSelectiveAckBytes*(socket: UtpSocket, receivedPackedAckNr: uint16, ext: SelectiveAckExtension): uint32 = - # we add 2, as the first bit in the mask therefore represents ackNr + 2 because - # ackNr + 1 (i.e next expected packet) is considered lost. +proc calculateSelectiveAckBytes*( + socket: UtpSocket, receivedPackedAckNr: uint16, ext: SelectiveAckExtension): + uint32 = + # Add 2, as the first bit in the mask represents ackNr + 2 because ackNr + 1 + # (i.e next expected packet) is considered lost. let base = receivedPackedAckNr + 2 if socket.curWindowPackets == 0: @@ -925,12 +936,13 @@ proc calculateSelectiveAckBytes*(socket: UtpSocket, receivedPackedAckNr: uint16 return ackedBytes -# decays maxWindow size by half if time is right i.e it is at least 100m since last -# window decay +# decays maxWindow size by half if time is right i.e it is at least 100m since +# last window decay proc tryDecayWindow(socket: UtpSocket, now: Moment) = if (now - socket.lastWindowDecay >= maxWindowDecay): socket.lastWindowDecay = now - let newMaxWindow = max(uint32(0.5 * float64(socket.maxWindow)), uint32(minWindowSize)) + let newMaxWindow = + max(uint32(0.5 * float64(socket.maxWindow)), uint32(minWindowSize)) debug "Decaying maxWindow", oldWindow = socket.maxWindow, @@ -940,10 +952,13 @@ proc tryDecayWindow(socket: UtpSocket, now: Moment) = socket.slowStart = false socket.slowStartThreshold = newMaxWindow -# ack packets (removes them from out going buffer) based on selective ack extension header -proc selectiveAckPackets(socket: UtpSocket, receivedPackedAckNr: uint16, ext: SelectiveAckExtension, currentTime: Moment): void = - # we add 2, as the first bit in the mask therefore represents ackNr + 2 because - # ackNr + 1 (i.e next expected packet) is considered lost. +# ack packets (removes them from out going buffer) based on selective ack +# extension header +proc selectiveAckPackets( + socket: UtpSocket, receivedPackedAckNr: uint16, ext: SelectiveAckExtension, + currentTime: Moment): void = + # Add 2, as the first bit in the mask represents ackNr + 2 because ackNr + 1 + # (i.e next expected packet) is considered lost. let base = receivedPackedAckNr + 2 if socket.curWindowPackets == 0: @@ -951,9 +966,10 @@ proc selectiveAckPackets(socket: UtpSocket, receivedPackedAckNr: uint16, ext: S var bits = (len(ext.acks)) * 8 - 1 - # number of packets acked by this selective acks, it also works as duplicate ack - # counter. - # from spec: Each packet that is acked in the selective ack message counts as one duplicate ack + # number of packets acked by this selective ack, it also works as duplicate + # ack counter. + # from spec: Each packet that is acked in the selective ack message counts as + # one duplicate ack var counter = 0 # sequence numbers of packets which should be resend @@ -986,7 +1002,8 @@ proc selectiveAckPackets(socket: UtpSocket, receivedPackedAckNr: uint16, ext: S dec bits continue - if counter >= duplicateAcksBeforeResend and (v - socket.fastResendSeqNr) <= reorderBufferMaxSize: + if counter >= duplicateAcksBeforeResend and + (v - socket.fastResendSeqNr) <= reorderBufferMaxSize: debug "No ack for packet", pkAckNr = v, dupAckCounter = counter, @@ -995,15 +1012,16 @@ proc selectiveAckPackets(socket: UtpSocket, receivedPackedAckNr: uint16, ext: S dec bits - let nextExpectedPacketSeqNr = base - 1'u16 - # if we are about to start to resending first packet should be the first unacked packet + # When resending packets, the first packet should be the first unacked packet, # ie. base - 1 - if counter >= duplicateAcksBeforeResend and (nextExpectedPacketSeqNr - socket.fastResendSeqNr) <= reorderBufferMaxSize: - debug "No ack for packet", - pkAckNr = nextExpectedPacketSeqNr, - dupAckCounter = counter, - fastResSeqNr = socket.fastResendSeqNr - resends.add(nextExpectedPacketSeqNr) + let nextExpectedPacketSeqNr = base - 1'u16 + if counter >= duplicateAcksBeforeResend and + (nextExpectedPacketSeqNr - socket.fastResendSeqNr) <= reorderBufferMaxSize: + debug "No ack for packet", + pkAckNr = nextExpectedPacketSeqNr, + dupAckCounter = counter, + fastResSeqNr = socket.fastResendSeqNr + resends.add(nextExpectedPacketSeqNr) var i = high(resends) var registerLoss: bool = false @@ -1043,12 +1061,14 @@ proc selectiveAckPackets(socket: UtpSocket, receivedPackedAckNr: uint16, ext: S socket.duplicateAck = uint16(counter) # Public mainly for test purposes -# generates bit mask which indicates which packets are already in socket -# reorder buffer -# from speck: -# The bitmask has reverse byte order. The first byte represents packets [ack_nr + 2, ack_nr + 2 + 7] in reverse order -# The least significant bit in the byte represents ack_nr + 2, the most significant bit in the byte represents ack_nr + 2 + 7 -# The next byte in the mask represents [ack_nr + 2 + 8, ack_nr + 2 + 15] in reverse order, and so on +# Generates bit mask which indicates which packets are already in socket +# reorder buffer. +# From spec: +# The bitmask has reverse byte order. The first byte represents packets +# [ack_nr + 2, ack_nr + 2 + 7] in reverse order. +# The least significant bit in the byte represents ack_nr + 2, the most +# significant bit in the byte represents ack_nr + 2 + 7. The next byte in the +# mask represents [ack_nr + 2 + 8, ack_nr + 2 + 15] in reverse order, and so on. proc generateSelectiveAckBitMask*(socket: UtpSocket): array[4, byte] = let window = min(32, socket.inBuffer.len()) var arr: array[4, uint8] = [0'u8, 0, 0, 0] @@ -1181,10 +1201,11 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) = duplicateAckCounter = socket.duplicateAck else: socket.duplicateAck = 0 - # spec says that in case of duplicate ack counter larger that duplicateAcksBeforeResend - # we should re-send oldest packet, on the other hand reference implementation - # has code path which does it commented out with todo. Currently to be as close - # to reference impl we do not resend packets in that case + # Spec states that in case of a duplicate ack counter larger than + # `duplicateAcksBeforeResend` the oldest packet should be resend. However, the + # reference implementation has the code path which does this commented out + # with a todo. Currently the reference implementation is follow and packets + # are not resend in this case. debug "Packet state variables", pastExpected = pastExpected, @@ -1192,15 +1213,16 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) = # If packet is totally off the mark, short-circuit the processing if pastExpected >= reorderBufferMaxSize: - - # if `pastExpected` is really big number (for example: uint16.high) then most - # probably we are receiving packet which we already received - # example: we already received packet with `seqNr = 10` so our `socket.ackNr = 10` - # if we receive this packet once again then `pastExpected = 10 - 10 - 1` which - # equals (due to wrapping) 65535 - # this means that remote most probably did not receive our ack, so we need to resend - # it. We are doing it for last `reorderBufferMaxSize` packets - let isPossibleDuplicatedOldPacket = pastExpected >= (int(uint16.high) + 1) - reorderBufferMaxSize + # if `pastExpected` is a really big number (for example: uint16.high) then + # most probably we are receiving packet which we already received. + # example: socket already received packet with `seqNr = 10` so the + # `socket.ackNr = 10`. + # Then when this packet is received once again then + # `pastExpected = 10 - 10 - 1` which equals (due to wrapping) 65535. + # This means that remote most probably did not receive our ack, so we need + # to resend it. We are doing it for last `reorderBufferMaxSize` packets. + let isPossibleDuplicatedOldPacket = + pastExpected >= (int(uint16.high) + 1) - reorderBufferMaxSize if (isPossibleDuplicatedOldPacket and p.header.pType != ST_STATE): socket.sendAck() @@ -1209,13 +1231,15 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) = pastExpected = pastExpected return - var (ackedBytes, minRtt) = socket.calculateAckedbytes(acks, timestampInfo.moment) + var (ackedBytes, minRtt) = + socket.calculateAckedbytes(acks, timestampInfo.moment) debug "Bytes acked by classic ack", bytesAcked = ackedBytes if (p.eack.isSome()): - let selectiveAckedBytes = socket.calculateSelectiveAckBytes(pkAckNr, p.eack.unsafeGet()) + let selectiveAckedBytes = + socket.calculateSelectiveAckBytes(pkAckNr, p.eack.unsafeGet()) debug "Bytes acked by selective ack", bytesAcked = selectiveAckedBytes ackedBytes = ackedBytes + selectiveAckedBytes @@ -1225,8 +1249,8 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) = # we are using uint32 not a Duration, to wrap a round in case of # sentTimeRemote > receipTimestamp. This can happen as local and remote # clock can be not synchronized or even using different system clock. - # i.e this number itself does not tell anything and is only used to feedback it - # to remote peer with each sent packet + # i.e this number itself does not tell anything and is only used to feedback + # it to remote peer with each sent packet let remoteDelay = if (sentTimeRemote == 0): 0'u32 @@ -1246,7 +1270,8 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) = if (prevRemoteDelayBase != 0 and wrapCompareLess(socket.remoteHistogram.delayBase, prevRemoteDelayBase) and prevRemoteDelayBase - socket.remoteHistogram.delayBase <= 10000'u32): - socket.ourHistogram.shift(prevRemoteDelayBase - socket.remoteHistogram.delayBase) + socket.ourHistogram.shift( + prevRemoteDelayBase - socket.remoteHistogram.delayBase) let actualDelay = p.header.timestampDiff @@ -1286,10 +1311,12 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) = slowStartThreshold = newSlowStartThreshold, slowstart = newSlowStart - if (socket.zeroWindowTimer.isNone() and socket.maxRemoteWindow <= currentPacketSize): - # when zeroWindowTimer will be hit and maxRemoteWindow still will be equal to 0 - # then it will be reset to minimal value - socket.zeroWindowTimer = some(timestampInfo.moment + socket.socketConfig.remoteWindowResetTimeout) + if (socket.zeroWindowTimer.isNone() and + socket.maxRemoteWindow <= currentPacketSize): + # when zeroWindowTimer is hit and maxRemoteWindow still is equal + # to 0 then it will be reset to the minimal value + socket.zeroWindowTimer = + some(timestampInfo.moment + socket.socketConfig.remoteWindowResetTimeout) debug "Remote window size dropped below packet size", currentTime = timestampInfo.moment, @@ -1298,15 +1325,16 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) = socket.tryfinalizeConnection(p) - # socket.curWindowPackets == acks means that this packet acked all remaining packets - # including the sent fin packets + # socket.curWindowPackets == acks means that this packet acked all remaining + # packets including the sent FIN packets if (socket.finSent and socket.curWindowPackets == acks): debug "FIN acked, destroying socket" socket.finAcked = true - # this bit of utp spec is a bit under specified (i.e there is not specification at all) - # reference implementation moves socket to destroy state in case that our fin was acked - # and socket is considered closed for reading and writing. - # but in theory remote could stil write some data on this socket (or even its own fin) + # this part of the uTP spec is a bit under specified, i.e there is no + # specification at all. The reference implementation moves socket to destroy + # state in case that our FIN was acked and socket is considered closed for + # reading and writing. But in theory, the remote could still write some data + # on this socket (or even its own FIN). socket.destroy() # Update fast resend counter to avoid resending old packet twice @@ -1336,11 +1364,12 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) = socket.ackPackets(acks, timestampInfo.moment) - # packets in front may have been acked by selective ack, decrease window until we hit - # a packet that is still waiting to be acked - while (socket.curWindowPackets > 0 and socket.outBuffer.get(socket.seqNr - socket.curWindowPackets).isNone()): + # packets in front may have been acked by selective ack, decrease window until + # we hit a packet that is still waiting to be acked. + while (socket.curWindowPackets > 0 and + socket.outBuffer.get(socket.seqNr - socket.curWindowPackets).isNone()): dec socket.curWindowPackets - debug "Packet in front hase been acked by selective ack. Decrese window", + debug "Packet in front has been acked by selective ack. Decrease window", windowPackets = socket.curWindowPackets # fast timeout @@ -1354,19 +1383,20 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) = if oldestOutstandingPktSeqNr != socket.fastResendSeqNr: - # fastResendSeqNr do not point to oldest unacked packet, we probably already resent - # packet that timed-out. Leave fast timeout mode + # fastResendSeqNr does not point to oldest unacked packet, we probably + # already resent the packet that timed-out. Leave on fast timeout mode. socket.fastTimeout = false else: - let shouldReSendPacket = socket.outBuffer.exists(oldestOutstandingPktSeqNr, (p: OutgoingPacket) => p.transmissions > 0) + let shouldReSendPacket = socket.outBuffer.exists( + oldestOutstandingPktSeqNr, (p: OutgoingPacket) => p.transmissions > 0) if shouldReSendPacket: debug "Packet fast timeout resend", pkSeqNr = oldestOutstandingPktSeqNr inc socket.fastResendSeqNr - # Is is safe to call force resend as we already checked shouldReSendPacket - # condition + # It is safe to call force resend as we already checked + # `shouldReSendPacket` condition. socket.sendPacket(oldestOutstandingPktSeqNr) if (p.eack.isSome()): @@ -1387,12 +1417,14 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) = let payloadLength = len(p.payload) if (payloadLength > 0 and (not socket.readShutdown)): # we need to sum both rcv buffer and reorder buffer - if (uint32(socket.offset) + socket.inBufferBytes + uint32(payloadLength) > socket.socketConfig.optRcvBuffer): + let totalBufferSize = + uint32(socket.offset) + socket.inBufferBytes + uint32(payloadLength) + if (totalBufferSize > socket.socketConfig.optRcvBuffer): # even though packet is in order and passes all the checks, it would # overflow our receive buffer, it means that we are receiving data - # faster than we are reading it. Do not ack this packet, and drop received - # data - debug "Recevied packet would overflow receive buffer dropping it", + # faster than we are reading it. Do not ack this packet, and drop + # received data. + debug "Received packet would overflow receive buffer, dropping it", pkSeqNr = p.header.seqNr, bytesReceived = payloadLength, rcvbufferSize = socket.offset, @@ -1401,34 +1433,38 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) = debug "Received data packet", bytesReceived = payloadLength - # we are getting in order data packet, we can flush data directly to the incoming buffer + # we are getting in order data packet, we can flush data directly to the + # incoming buffer. # await upload(addr socket.buffer, unsafeAddr p.payload[0], p.payload.len()) - moveMem(addr socket.rcvBuffer[socket.offset], unsafeAddr p.payload[0], payloadLength) + moveMem( + addr socket.rcvBuffer[socket.offset], + unsafeAddr p.payload[0], payloadLength) socket.offset = socket.offset + payloadLength # Bytes have been passed to upper layer, we can increase number of last # acked packet inc socket.ackNr - # check if the following packets are in reorder buffer - + # check if the following packets are in re-order buffer debug "Looking for packets in re-order buffer", reorderCount = socket.reorderCount while true: - # We are doing this in reorder loop, to handle the case when we already received - # fin but there were some gaps before eof - # we have reached remote eof, and should not receive more packets from remote - if ((not socket.reachedFin) and socket.gotFin and socket.eofPktNr == socket.ackNr): + # We are doing this in reorder loop, to handle the case when we already + # received FIN but there were some gaps before eof. + # we have reached remote eof and should not receive more packets from + # remote. + if ((not socket.reachedFin) and socket.gotFin and + socket.eofPktNr == socket.ackNr): debug "Reached socket EOF" - # In case of reaching eof, it is up to user of library what to to with - # it. With the current implementation, the most appropriate way would be to - # destroy it (as with our implementation we know that remote is destroying its acked fin) - # as any other send will either generate timeout, or socket will be forcefully - # closed by reset + # In case of reaching eof, it is up to user of library what to do with + # it. With the current implementation, the most appropriate way would + # be to destroy it (as with our implementation we know that remote is + # destroying its acked fin) as any other send will either generate + # timeout, or socket will be forcefully closed by reset socket.reachedFin = true - # this is not necessarily true, but as we have already reached eof we can - # ignore following packets + # this is not necessarily true, but as we have already reached eof we + # can ignore following packets socket.reorderCount = 0 if socket.reorderCount == 0: @@ -1454,9 +1490,12 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) = rcvbufferSize = socket.offset, reorderBufferSize = socket.inBufferBytes - # Rcv buffer and reorder buffer are sized that it is always possible to - # move data from reorder buffer to rcv buffer without overflow - moveMem(addr socket.rcvBuffer[socket.offset], unsafeAddr packet.payload[0], reorderPacketPayloadLength) + # Rcv buffer and reorder buffer are sized such that it is always + # possible to move data from reorder buffer to rcv buffer without + # overflow. + moveMem( + addr socket.rcvBuffer[socket.offset], + unsafeAddr packet.payload[0], reorderPacketPayloadLength) socket.offset = socket.offset + reorderPacketPayloadLength debug "Deleting packet", @@ -1465,7 +1504,8 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) = socket.inBuffer.delete(nextPacketNum) inc socket.ackNr dec socket.reorderCount - socket.inBufferBytes = socket.inBufferBytes - uint32(reorderPacketPayloadLength) + socket.inBufferBytes = + socket.inBufferBytes - uint32(reorderPacketPayloadLength) debug "Socket state after processing in order packet", socketKey = socket.socketKey, @@ -1498,9 +1538,13 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) = debug "Packet with seqNr already received", seqNr = pkSeqNr else: - let payloadLength = uint32(len(p.payload)) - if (socket.inBufferBytes + payloadLength <= socket.socketConfig.maxSizeOfReorderBuffer and - socket.inBufferBytes + uint32(socket.offset) + payloadLength <= socket.socketConfig.optRcvBuffer): + let + payloadLength = uint32(len(p.payload)) + totalReorderSize = socket.inBufferBytes + payloadLength + totalBufferSize = + socket.inBufferBytes + uint32(socket.offset) + payloadLength + if (totalReorderSize <= socket.socketConfig.maxSizeOfReorderBuffer and + totalBufferSize <= socket.socketConfig.optRcvBuffer): debug "store packet in reorder buffer", packetBytes = payloadLength, @@ -1516,8 +1560,8 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) = socket.inBufferBytes = socket.inBufferBytes + payloadLength debug "added out of order packet to reorder buffer", reorderCount = socket.reorderCount - # we send ack packet, as we reorder count is > 0, so the eack bitmask will be - # generated + # we send ack packet, as we reorder count is > 0, so the eack bitmask + # will be generated socket.sendAck() proc processPacket*(socket: UtpSocket, p: Packet): Future[void] = @@ -1542,8 +1586,8 @@ proc onRead(socket: UtpSocket, readReq: var ReadReq): ReadResult = return ReadCancelled if socket.atEof(): - # buffer is already empty and we reached remote fin, just finish read with whatever - # was already read + # buffer is already empty and we reached remote fin, just finish read with + # whatever was already read readReq.reader.complete(readReq.bytesAvailable) return SocketAlreadyFinished @@ -1768,7 +1812,8 @@ proc write*(socket: UtpSocket, data: seq[byte]): Future[WriteResult] = let retFuture = newFuture[WriteResult]("UtpSocket.write") if (socket.state != Connected): - let res = WriteResult.err(WriteError(kind: SocketNotWriteable, currentState: socket.state)) + let res = WriteResult.err( + WriteError(kind: SocketNotWriteable, currentState: socket.state)) retFuture.complete(res) return retFuture @@ -1845,8 +1890,8 @@ proc read*(socket: UtpSocket): Future[seq[byte]] = return fut -# Check how many packets are still in the out going buffer, usefully for tests or -# debugging. +# Check how many packets are still in the out going buffer, usefully for tests +# or debugging. proc numPacketsInOutGoingBuffer*(socket: UtpSocket): int = var num = 0 for e in socket.outBuffer.items(): @@ -1858,11 +1903,12 @@ proc numPacketsInOutGoingBuffer*(socket: UtpSocket): int = proc numOfBytesInFlight*(socket: UtpSocket): uint32 = socket.currentWindow # Check how many bytes are in incoming buffer -proc numOfBytesInIncomingBuffer*(socket: UtpSocket): uint32 = uint32(socket.offset) +proc numOfBytesInIncomingBuffer*(socket: UtpSocket): uint32 = + uint32(socket.offset) # Check how many packets are still in the reorder buffer, useful for tests or -# debugging. -# It throws assertion error when number of elements in buffer do not equal kept counter +# debugging. It throws assertion error when number of elements in buffer do not +# equal kept counter. proc numPacketsInReorderedBuffer*(socket: UtpSocket): int = var num = 0 for e in socket.inBuffer.items(): @@ -1874,9 +1920,9 @@ proc numPacketsInReorderedBuffer*(socket: UtpSocket): int = proc numOfEventsInEventQueue*(socket: UtpSocket): int = len(socket.eventQueue) proc connectionId*[A](socket: UtpSocket[A]): uint16 = - ## Connection id is id which is used in first SYN packet which establishes the connection - ## so for Outgoing side it is actually its rcv_id, and for Incoming side it is - ## its snd_id + ## Connection id is the id which is used in first SYN packet which establishes + ## the connection, so for `Outgoing` side it is actually its rcv_id, and for + ## `Incoming` side it is its snd_id. case socket.direction of Incoming: socket.connectionIdSnd @@ -1902,11 +1948,11 @@ proc new[A]( ): T = let currentTime = getMonoTimestamp().moment - # Initial max window size. Reference implementation uses value which enables one packet - # to be transferred. - # We use value two times higher as we do not yet have proper mtu estimation, and - # our impl should work over udp and discovery v5 (where proper estimation may be harder - # as packets already have discoveryv5 envelope) + # Initial max window size. Reference implementation uses value which allows + # one packet to be transferred. + # We use a value two times higher as we do not yet have proper mtu estimation, + # and our impl. should work over UDP and discovery v5 (where proper estimation + # may be harder as packets already have discovery v5 envelope). let initMaxWindow = 2 * cfg.payloadSize T( remoteAddress: to, @@ -1987,10 +2033,10 @@ proc newIncomingSocket*[A]( let (initialState, initialTimeout) = if (cfg.incomingSocketReceiveTimeout.isNone()): # it does not matter what timeout value we put here, as socket will be in - # connected state without outgoing packets in buffer so any timeout hit will - # just double rto without any penalties - # although we cannot use 0, as then timeout will be constantly re-set to 500ms - # and there will be a lot of not useful work done + # connected state without outgoing packets in buffer so any timeout hit + # will just double rto without any penalties + # although we cannot use 0, as then timeout will be constantly re-set to + # 500ms and there will be a lot of not useful work done (Connected, defaultInitialSynTimeout) else: let timeout = cfg.incomingSocketReceiveTimeout.unsafeGet() @@ -2020,7 +2066,8 @@ proc startIncomingSocket*(socket: UtpSocket) = proc startOutgoingSocket*(socket: UtpSocket): Future[void] = doAssert(socket.state == SynSent) - let packet = synPacket(socket.seqNr, socket.connectionIdRcv, socket.getRcvWindowSize()) + let packet = + synPacket(socket.seqNr, socket.connectionIdRcv, socket.getRcvWindowSize()) debug "Sending SYN packet", seqNr = packet.header.seqNr, connectionId = packet.header.connectionId diff --git a/tests/utp/test_utp_socket.nim b/tests/utp/test_utp_socket.nim index 9cff6f3..d893e96 100644 --- a/tests/utp/test_utp_socket.nim +++ b/tests/utp/test_utp_socket.nim @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2022 Status Research & Development GmbH +# Copyright (c) 2020-2023 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). @@ -17,48 +17,49 @@ import ../../eth/keys, ../stubloglevel -procSuite "Utp socket unit test": - let rng = newRng() - let testAddress = initTAddress("127.0.0.1", 9079) - let testBufferSize = 1024'u32 - let defaultRcvOutgoingId = 314'u16 +procSuite "uTP socket tests": + let + rng = newRng() + testAddress = initTAddress("127.0.0.1", 9079) + testBufferSize = 1024'u32 + defaultRcvOutgoingId = 314'u16 proc packetsToBytes(packets: seq[Packet]): seq[byte] = - var resultBytes = newSeq[byte]() + var bytes = newSeq[byte]() for p in packets: - resultBytes.add(p.payload) - return resultBytes + bytes.add(p.payload) + return bytes - asyncTest "Starting outgoing socket should send Syn packet": + asyncTest "Outgoing socket must send SYN packet": let q = newAsyncQueue[Packet]() let defaultConfig = SocketConfig.init() - let sock1 = newOutgoingSocket[TransportAddress]( + let socket = newOutgoingSocket[TransportAddress]( testAddress, initTestSnd(q), defaultConfig, defaultRcvOutgoingId, rng[] ) - let fut1 = sock1.startOutgoingSocket() + let fut = socket.startOutgoingSocket() let initialPacket = await q.get() check: initialPacket.header.pType == ST_SYN initialPacket.header.wndSize == defaultConfig.optRcvBuffer - await sock1.destroyWait() - fut1.cancel() + await socket.destroyWait() + fut.cancel() - asyncTest "Outgoing socket should re-send syn packet 2 times before declaring failure": + asyncTest "Outgoing socket should re-send SYN packet 2 times before declaring failure": let q = newAsyncQueue[Packet]() - let sock1 = newOutgoingSocket[TransportAddress]( + let socket = newOutgoingSocket[TransportAddress]( testAddress, initTestSnd(q), SocketConfig.init(milliseconds(100)), defaultRcvOutgoingId, rng[] ) - let fut1 = sock1.startOutgoingSocket() + let fut1 = socket.startOutgoingSocket() let initialPacket = await q.get() check: @@ -75,24 +76,24 @@ procSuite "Utp socket unit test": resentSynPacket1.header.pType == ST_SYN # next timeout will should disconnect socket - await waitUntil(proc (): bool = sock1.isConnected() == false) + await waitUntil(proc (): bool = socket.isConnected() == false) check: - not sock1.isConnected() + not socket.isConnected() - await sock1.destroyWait() + await socket.destroyWait() fut1.cancel() asyncTest "Processing in order ack should make socket connected": let q = newAsyncQueue[Packet]() let initialRemoteSeq = 10'u16 - let (sock1, packet) = connectOutGoingSocket(initialRemoteSeq, q) + let (socket, packet) = connectOutGoingSocket(initialRemoteSeq, q) check: - sock1.isConnected() + socket.isConnected() - await sock1.destroyWait() + await socket.destroyWait() asyncTest "Processing in order data packet should upload it to buffer and ack packet": let q = newAsyncQueue[Packet]() @@ -1418,9 +1419,9 @@ procSuite "Utp socket unit test": let dataDropped = @[1'u8] let dataReceived = @[2'u8] - let sock1 = newOutgoingSocket[TransportAddress](testAddress, initTestSnd(q), cfg, defaultRcvOutgoingId, rng[]) + let socket = newOutgoingSocket[TransportAddress](testAddress, initTestSnd(q), cfg, defaultRcvOutgoingId, rng[]) - asyncSpawn sock1.startOutgoingSocket() + asyncSpawn socket.startOutgoingSocket() let initialPacket = await q.get() @@ -1456,16 +1457,16 @@ procSuite "Utp socket unit test": # even though @[1'u8] is received first, it should be dropped as socket is not # yet in connected state - await sock1.processPacket(dpDropped) - await sock1.processPacket(responseAck) - await sock1.processPacket(dpReceived) + await socket.processPacket(dpDropped) + await socket.processPacket(responseAck) + await socket.processPacket(dpReceived) - let receivedData = await sock1.read(1) + let receivedData = await socket.read(1) check: receivedData == dataReceived - await sock1.destroyWait() + await socket.destroyWait() asyncTest "Clean up all resources when closing due to timeout failure": let q = newAsyncQueue[Packet]()