Style fixes and comment improvements on uTP code (#623)

This commit is contained in:
Kim De Mey 2023-06-22 13:31:30 +02:00 committed by GitHub
parent 6dacb2ca5c
commit d74dc40bee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 259 additions and 204 deletions

View File

@ -31,7 +31,8 @@ proc init*(T: type NodeAddress, nodeId: NodeId, address: Address): NodeAddress =
NodeAddress(nodeId: nodeId, address: address)
proc init*(T: type NodeAddress, node: Node): Option[NodeAddress] =
node.address.map((address: Address) => NodeAddress(nodeId: node.id, address: address))
node.address.map((address: Address) =>
NodeAddress(nodeId: node.id, address: address))
proc hash(x: NodeAddress): Hash =
var h = 0
@ -51,25 +52,31 @@ func `$`*(x: UtpSocketKey[NodeAddress]): string =
", rcvId: " & $x.rcvId &
")"
proc talkReqDirect(p: protocol.Protocol, n: NodeAddress, protocol, request: seq[byte]): Future[void] =
proc talkReqDirect(
p: protocol.Protocol, n: NodeAddress, protocol, request: seq[byte]):
Future[void] =
let
reqId = RequestId.init(p.rng[])
message = encodeMessage(TalkReqMessage(protocol: protocol, request: request), reqId)
message = encodeMessage(
TalkReqMessage(protocol: protocol, request: request), reqId)
(data, nonce) = encodeMessagePacket(
p.rng[], p.codec, n.nodeId, n.address, message)
(data, nonce) = encodeMessagePacket(p.rng[], p.codec, n.nodeId, n.address, message)
trace "Send message packet", dstId = n.nodeId, address = n.address, kind = MessageKind.talkreq
trace "Send message packet",
dstId = n.nodeId, address = n.address, kind = MessageKind.talkreq
p.send(n.address, data)
proc initSendCallback(
t: protocol.Protocol, subProtocolName: seq[byte]): SendCallback[NodeAddress] =
t: protocol.Protocol, subProtocolName: seq[byte]):
SendCallback[NodeAddress] =
return (
proc (to: NodeAddress, data: seq[byte]): Future[void] =
let fut = newFuture[void]()
# hidden assumption here is that nodes already have established discv5 session
# between each other. In our use case this should be true as opening stream
# is only done after successful OFFER/ACCEPT or FINDCONTENT/CONTENT exchange
# which forces nodes to establish session between each other.
# hidden assumption here is that nodes already have established discv5
# session between each other. In our use case this should be true as
# opening stream is only done after successful OFFER/ACCEPT or
# FINDCONTENT/CONTENT exchange which forces nodes to establish session
# between each other.
discard t.talkReqDirect(to, subProtocolName, data)
fut.complete()
return fut

View File

@ -249,7 +249,7 @@ proc processPacket[A](r: UtpRouter[A], p: Packet, sender: A) {.async.}=
else:
# TODO: add keeping track of recently send reset packets and do not send
# reset to peers which we recently send reset to.
debug "Received FIN/DATA/ACK on not known socket sending reset"
debug "Received FIN/DATA/ACK on unknown socket, sending reset"
let rstPacket = resetPacket(
randUint16(r.rng[]), p.header.connectionId, p.header.seqNr)
await r.sendCb(sender, encodePacket(rstPacket))

View File

@ -8,7 +8,7 @@
import
std/[sugar, deques],
chronos, chronicles,
chronos, chronicles, metrics,
stew/[results, bitops2],
./growable_buffer,
./packets,
@ -203,7 +203,7 @@ type
# peer
rto: Duration
# RTO timeout will happen when currenTime > rtoTimeout
# RTO timeout happens when currentTime > rtoTimeout
rtoTimeout: Moment
# rcvBuffer
@ -215,7 +215,7 @@ type
# readers waiting for data
pendingReads: Deque[ReadReq]
# loop called every 500ms to check for on going timeout status
# loop called every 500ms to check for timeouts
checkTimeoutsLoop: Future[void]
# number on consecutive re-transmissions
@ -257,15 +257,15 @@ type
# timer which is started when peer max window drops below current packet size
zeroWindowTimer: Option[Moment]
# last measured delay between current local timestamp, and remote sent
# timestamp. In microseconds
# last measured delay between current local timestamp and remote sent
# timestamp, in microseconds.
replayMicro: uint32
# indicator if we're in slow-start (exponential growth) phase
# indicator if socket is in in slow-start (exponential growth) phase
slowStart: bool
# indicator if we're in fast time out mode i.e we will resend
# oldest packet un-acked in case of newer packet arriving
# indicator if socket is in fast time-out mode, i.e will resend oldest
# not ACK'ed packet when newer packet is received.
fastTimeout: bool
# Sequence number of the next packet we are allowed to fast-resend. This is
@ -295,8 +295,8 @@ type
send: SendCallback[A]
# User driven call back to be called whenever socket is permanently closed i.e
# reaches destroy state
# User driven callback to be called whenever socket is permanently closed,
# i.e reaches the destroy state
SocketCloseCallback* = proc (): void {.gcsafe, raises: [].}
ConnectionError* = object of CatchableError
@ -543,13 +543,14 @@ proc checkTimeouts(socket: UtpSocket) =
if socket.isOpened():
let currentPacketSize = socket.getPacketSize()
if (socket.zeroWindowTimer.isSome() and currentTime > socket.zeroWindowTimer.unsafeGet()):
if (socket.zeroWindowTimer.isSome() and
currentTime > socket.zeroWindowTimer.unsafeGet()):
if socket.maxRemoteWindow <= currentPacketSize:
# Reset remote window, to minimal value which will fit at least two packet
let minimalRemoteWindow = 2 * socket.socketConfig.payloadSize
socket.maxRemoteWindow = minimalRemoteWindow
debug "Reset remote window to minimal value",
minRemote = minimalRemoteWindow
# Reset maxRemoteWindow to minimal value which will fit at least two
# packets
let remoteWindow = 2 * socket.socketConfig.payloadSize
socket.maxRemoteWindow = remoteWindow
debug "Reset remote window to minimal value", remoteWindow
socket.zeroWindowTimer = none[Moment]()
if (currentTime > socket.rtoTimeout):
@ -560,12 +561,14 @@ proc checkTimeouts(socket: UtpSocket) =
curWindowPackets = socket.curWindowPackets,
curWindowBytes = socket.currentWindow
# TODO add handling of probe time outs. Reference implementation has mechanism
# of sending probes to determine mtu size. Probe timeouts do not count to standard
# timeouts calculations
# TODO:
# Add handling of probing on timeouts. The reference implementation has
# a mechanism of sending probes to determine MTU size. Probe timeouts are
# not taking into account for the timeout calculation.
# client initiated connections, but did not send following data packet in rto
# time and our socket is configured to start in SynRecv state.
# For client initiated connections: SYN received but did not receive
# following data packet in rto time and the socket is configured to start
# in SynRecv state (to avoid amplifcation by IP spoofing).
if (socket.state == SynRecv):
socket.destroy()
return
@ -589,13 +592,13 @@ proc checkTimeouts(socket: UtpSocket) =
socket.retransmitTimeout = newTimeout
socket.rtoTimeout = currentTime + newTimeout
# on timeout reset duplicate ack counter
# on timeout, reset the duplicate ack counter
socket.duplicateAck = 0
if (socket.curWindowPackets == 0 and socket.maxWindow > currentPacketSize):
# there are no packets in flight even though there is place for more than whole packet
# this means connection is just idling. Reset window by 1/3'rd but no more
# than to fit at least one packet.
# There are no packets in flight even though there is space for more
# than a full packet. This means the connection is just idling.
# Reset window by 1/3'rd but no more than to fit at least one packet.
let oldMaxWindow = socket.maxWindow
let newMaxWindow = max((oldMaxWindow * 2) div 3, currentPacketSize)
@ -617,14 +620,15 @@ proc checkTimeouts(socket: UtpSocket) =
socket.maxWindow = currentPacketSize
socket.slowStart = true
# This will have much more sense when we will add handling of selective acks
# as then every selectively acked packet resets timeout timer and removes packet
# from out buffer.
# Note: with selective acks enabled, every selectively acked packet resets
# the timeout timer and removes te packet from the outBuffer.
markAllPacketAsLost(socket)
let oldestPacketSeqNr = socket.seqNr - socket.curWindowPackets
# resend oldest packet if there are some packets in flight, and oldestpacket was already sent
if (socket.curWindowPackets > 0 and socket.outBuffer[oldestPacketSeqNr].transmissions > 0):
# resend the oldest packet if there are some packets in flight and the
# oldest packet was already sent
if (socket.curWindowPackets > 0 and
socket.outBuffer[oldestPacketSeqNr].transmissions > 0):
inc socket.retransmitCount
socket.fastTimeout = true
@ -633,11 +637,10 @@ proc checkTimeouts(socket: UtpSocket) =
retransmitCount = socket.retransmitCount,
curWindowPackets = socket.curWindowPackets
# Oldest packet should always be present, so it is safe to call force
# resend
# Oldest packet should always be present, so it is safe to force resend
socket.sendPacket(oldestPacketSeqNr)
# TODO add sending keep alives when necessary
# TODO: add sending keep alives when necessary
proc checkTimeoutsLoop(s: UtpSocket) {.async.} =
## Loop that check timeouts in the socket.
@ -646,8 +649,8 @@ proc checkTimeoutsLoop(s: UtpSocket) {.async.} =
await sleepAsync(checkTimeoutsLoopInterval)
s.eventQueue.putNoWait(SocketEvent(kind: CheckTimeouts))
except CancelledError as exc:
# check timeouts loop is last running future managed by socket, if its
# cancelled we can fire closeEvent
# checkTimeoutsLoop is the last running future managed by the socket, when
# it's cancelled the closeEvent can be fired.
s.closeEvent.fire()
trace "checkTimeoutsLoop canceled"
raise exc
@ -721,12 +724,13 @@ proc isClosed*(socket: UtpSocket): bool =
proc isClosedAndCleanedUpAllResources*(socket: UtpSocket): bool =
## Test Api to check that all resources are properly cleaned up
socket.isClosed() and socket.eventLoop.cancelled() and socket.checkTimeoutsLoop.cancelled()
socket.isClosed() and socket.eventLoop.cancelled() and
socket.checkTimeoutsLoop.cancelled()
proc destroy*(s: UtpSocket) =
debug "Destroying socket", to = s.socketKey
## Moves socket to destroy state and clean all resources.
## Remote is not notified in any way about socket end of life
## Remote is not notified in any way about socket end of life.
s.state = Destroy
s.eventLoop.cancel()
# This procedure initiate cleanup process which goes like:
@ -737,9 +741,9 @@ proc destroy*(s: UtpSocket) =
# future shows as cancelled, but handler for CancelledError is not run
proc destroyWait*(s: UtpSocket) {.async.} =
## Moves socket to destroy state and clean all reasources and wait for all registered
## callback to fire
## Remote is not notified in any way about socket end of life
## Moves socket to destroy state and clean all resources and wait for all
## registered callbacks to fire,
## Remote is not notified in any way about socket end of life.
s.destroy()
await s.closeEvent.wait()
await allFutures(s.closeCallbacks)
@ -802,9 +806,10 @@ proc ackPacket(socket: UtpSocket, seqNr: uint16, currentTime: Moment): AckResult
pkTransmissions = packet.transmissions,
pkNeedResend = packet.needResend
# from spec: The rtt and rtt_var is only updated for packets that were sent only once.
# This avoids problems with figuring out which packet was acked, the first or the second one.
# it is standard solution to retransmission ambiguity problem
# from spec: The rtt and rtt_var is only updated for packets that were sent
# only once. This avoids the problem of figuring out which packet was acked,
# the first or the second one. It is standard solution to the retransmission
# ambiguity problem.
if packet.transmissions == 1:
socket.updateTimeouts(packet.timeSent, currentTime)
@ -815,10 +820,11 @@ proc ackPacket(socket: UtpSocket, seqNr: uint16, currentTime: Moment): AckResult
# been considered timed-out, and is not included in
# the cur_window anymore
if (not packet.needResend):
doAssert(socket.currentWindow >= packet.payloadLength, "Window should always be larger than packet length")
doAssert(socket.currentWindow >= packet.payloadLength,
"Window should always be larger than packet length")
socket.currentWindow = socket.currentWindow - packet.payloadLength
# we removed packet from our out going buffer
# recalculate as packet was removed from the outgoing buffer
socket.outBufferBytes = socket.outBufferBytes - packet.payloadLength
socket.retransmitCount = 0
@ -832,7 +838,8 @@ proc ackPackets(socket: UtpSocket, nrPacketsToAck: uint16, currentTime: Moment)
## Ack packets in outgoing buffer based on ack number in the received packet
var i = 0
while i < int(nrPacketsToAck):
let result = socket.ackPacket(socket.seqNr - socket.curWindowPackets, currentTime)
let result = socket.ackPacket(
socket.seqNr - socket.curWindowPackets, currentTime)
case result
of PacketAcked:
dec socket.curWindowPackets
@ -844,7 +851,9 @@ proc ackPackets(socket: UtpSocket, nrPacketsToAck: uint16, currentTime: Moment)
inc i
proc calculateAckedbytes(socket: UtpSocket, nrPacketsToAck: uint16, now: Moment): (uint32, Duration) =
proc calculateAckedbytes(
socket: UtpSocket, nrPacketsToAck: uint16, now: Moment):
(uint32, Duration) =
var i: uint16 = 0
var ackedBytes: uint32 = 0
var minRtt: Duration = InfiniteDuration
@ -891,9 +900,11 @@ proc isAckNrInvalid(socket: UtpSocket, packet: Packet): bool =
)
# counts the number of bytes acked by selective ack header
proc calculateSelectiveAckBytes*(socket: UtpSocket, receivedPackedAckNr: uint16, ext: SelectiveAckExtension): uint32 =
# we add 2, as the first bit in the mask therefore represents ackNr + 2 because
# ackNr + 1 (i.e next expected packet) is considered lost.
proc calculateSelectiveAckBytes*(
socket: UtpSocket, receivedPackedAckNr: uint16, ext: SelectiveAckExtension):
uint32 =
# Add 2, as the first bit in the mask represents ackNr + 2 because ackNr + 1
# (i.e next expected packet) is considered lost.
let base = receivedPackedAckNr + 2
if socket.curWindowPackets == 0:
@ -925,12 +936,13 @@ proc calculateSelectiveAckBytes*(socket: UtpSocket, receivedPackedAckNr: uint16
return ackedBytes
# decays maxWindow size by half if time is right i.e it is at least 100m since last
# window decay
# decays maxWindow size by half if time is right i.e it is at least 100m since
# last window decay
proc tryDecayWindow(socket: UtpSocket, now: Moment) =
if (now - socket.lastWindowDecay >= maxWindowDecay):
socket.lastWindowDecay = now
let newMaxWindow = max(uint32(0.5 * float64(socket.maxWindow)), uint32(minWindowSize))
let newMaxWindow =
max(uint32(0.5 * float64(socket.maxWindow)), uint32(minWindowSize))
debug "Decaying maxWindow",
oldWindow = socket.maxWindow,
@ -940,10 +952,13 @@ proc tryDecayWindow(socket: UtpSocket, now: Moment) =
socket.slowStart = false
socket.slowStartThreshold = newMaxWindow
# ack packets (removes them from out going buffer) based on selective ack extension header
proc selectiveAckPackets(socket: UtpSocket, receivedPackedAckNr: uint16, ext: SelectiveAckExtension, currentTime: Moment): void =
# we add 2, as the first bit in the mask therefore represents ackNr + 2 because
# ackNr + 1 (i.e next expected packet) is considered lost.
# ack packets (removes them from out going buffer) based on selective ack
# extension header
proc selectiveAckPackets(
socket: UtpSocket, receivedPackedAckNr: uint16, ext: SelectiveAckExtension,
currentTime: Moment): void =
# Add 2, as the first bit in the mask represents ackNr + 2 because ackNr + 1
# (i.e next expected packet) is considered lost.
let base = receivedPackedAckNr + 2
if socket.curWindowPackets == 0:
@ -951,9 +966,10 @@ proc selectiveAckPackets(socket: UtpSocket, receivedPackedAckNr: uint16, ext: S
var bits = (len(ext.acks)) * 8 - 1
# number of packets acked by this selective acks, it also works as duplicate ack
# counter.
# from spec: Each packet that is acked in the selective ack message counts as one duplicate ack
# number of packets acked by this selective ack, it also works as duplicate
# ack counter.
# from spec: Each packet that is acked in the selective ack message counts as
# one duplicate ack
var counter = 0
# sequence numbers of packets which should be resend
@ -986,7 +1002,8 @@ proc selectiveAckPackets(socket: UtpSocket, receivedPackedAckNr: uint16, ext: S
dec bits
continue
if counter >= duplicateAcksBeforeResend and (v - socket.fastResendSeqNr) <= reorderBufferMaxSize:
if counter >= duplicateAcksBeforeResend and
(v - socket.fastResendSeqNr) <= reorderBufferMaxSize:
debug "No ack for packet",
pkAckNr = v,
dupAckCounter = counter,
@ -995,10 +1012,11 @@ proc selectiveAckPackets(socket: UtpSocket, receivedPackedAckNr: uint16, ext: S
dec bits
let nextExpectedPacketSeqNr = base - 1'u16
# if we are about to start to resending first packet should be the first unacked packet
# When resending packets, the first packet should be the first unacked packet,
# ie. base - 1
if counter >= duplicateAcksBeforeResend and (nextExpectedPacketSeqNr - socket.fastResendSeqNr) <= reorderBufferMaxSize:
let nextExpectedPacketSeqNr = base - 1'u16
if counter >= duplicateAcksBeforeResend and
(nextExpectedPacketSeqNr - socket.fastResendSeqNr) <= reorderBufferMaxSize:
debug "No ack for packet",
pkAckNr = nextExpectedPacketSeqNr,
dupAckCounter = counter,
@ -1043,12 +1061,14 @@ proc selectiveAckPackets(socket: UtpSocket, receivedPackedAckNr: uint16, ext: S
socket.duplicateAck = uint16(counter)
# Public mainly for test purposes
# generates bit mask which indicates which packets are already in socket
# reorder buffer
# from speck:
# The bitmask has reverse byte order. The first byte represents packets [ack_nr + 2, ack_nr + 2 + 7] in reverse order
# The least significant bit in the byte represents ack_nr + 2, the most significant bit in the byte represents ack_nr + 2 + 7
# The next byte in the mask represents [ack_nr + 2 + 8, ack_nr + 2 + 15] in reverse order, and so on
# Generates bit mask which indicates which packets are already in socket
# reorder buffer.
# From spec:
# The bitmask has reverse byte order. The first byte represents packets
# [ack_nr + 2, ack_nr + 2 + 7] in reverse order.
# The least significant bit in the byte represents ack_nr + 2, the most
# significant bit in the byte represents ack_nr + 2 + 7. The next byte in the
# mask represents [ack_nr + 2 + 8, ack_nr + 2 + 15] in reverse order, and so on.
proc generateSelectiveAckBitMask*(socket: UtpSocket): array[4, byte] =
let window = min(32, socket.inBuffer.len())
var arr: array[4, uint8] = [0'u8, 0, 0, 0]
@ -1181,10 +1201,11 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) =
duplicateAckCounter = socket.duplicateAck
else:
socket.duplicateAck = 0
# spec says that in case of duplicate ack counter larger that duplicateAcksBeforeResend
# we should re-send oldest packet, on the other hand reference implementation
# has code path which does it commented out with todo. Currently to be as close
# to reference impl we do not resend packets in that case
# Spec states that in case of a duplicate ack counter larger than
# `duplicateAcksBeforeResend` the oldest packet should be resend. However, the
# reference implementation has the code path which does this commented out
# with a todo. Currently the reference implementation is follow and packets
# are not resend in this case.
debug "Packet state variables",
pastExpected = pastExpected,
@ -1192,15 +1213,16 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) =
# If packet is totally off the mark, short-circuit the processing
if pastExpected >= reorderBufferMaxSize:
# if `pastExpected` is really big number (for example: uint16.high) then most
# probably we are receiving packet which we already received
# example: we already received packet with `seqNr = 10` so our `socket.ackNr = 10`
# if we receive this packet once again then `pastExpected = 10 - 10 - 1` which
# equals (due to wrapping) 65535
# this means that remote most probably did not receive our ack, so we need to resend
# it. We are doing it for last `reorderBufferMaxSize` packets
let isPossibleDuplicatedOldPacket = pastExpected >= (int(uint16.high) + 1) - reorderBufferMaxSize
# if `pastExpected` is a really big number (for example: uint16.high) then
# most probably we are receiving packet which we already received.
# example: socket already received packet with `seqNr = 10` so the
# `socket.ackNr = 10`.
# Then when this packet is received once again then
# `pastExpected = 10 - 10 - 1` which equals (due to wrapping) 65535.
# This means that remote most probably did not receive our ack, so we need
# to resend it. We are doing it for last `reorderBufferMaxSize` packets.
let isPossibleDuplicatedOldPacket =
pastExpected >= (int(uint16.high) + 1) - reorderBufferMaxSize
if (isPossibleDuplicatedOldPacket and p.header.pType != ST_STATE):
socket.sendAck()
@ -1209,13 +1231,15 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) =
pastExpected = pastExpected
return
var (ackedBytes, minRtt) = socket.calculateAckedbytes(acks, timestampInfo.moment)
var (ackedBytes, minRtt) =
socket.calculateAckedbytes(acks, timestampInfo.moment)
debug "Bytes acked by classic ack",
bytesAcked = ackedBytes
if (p.eack.isSome()):
let selectiveAckedBytes = socket.calculateSelectiveAckBytes(pkAckNr, p.eack.unsafeGet())
let selectiveAckedBytes =
socket.calculateSelectiveAckBytes(pkAckNr, p.eack.unsafeGet())
debug "Bytes acked by selective ack",
bytesAcked = selectiveAckedBytes
ackedBytes = ackedBytes + selectiveAckedBytes
@ -1225,8 +1249,8 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) =
# we are using uint32 not a Duration, to wrap a round in case of
# sentTimeRemote > receipTimestamp. This can happen as local and remote
# clock can be not synchronized or even using different system clock.
# i.e this number itself does not tell anything and is only used to feedback it
# to remote peer with each sent packet
# i.e this number itself does not tell anything and is only used to feedback
# it to remote peer with each sent packet
let remoteDelay =
if (sentTimeRemote == 0):
0'u32
@ -1246,7 +1270,8 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) =
if (prevRemoteDelayBase != 0 and
wrapCompareLess(socket.remoteHistogram.delayBase, prevRemoteDelayBase) and
prevRemoteDelayBase - socket.remoteHistogram.delayBase <= 10000'u32):
socket.ourHistogram.shift(prevRemoteDelayBase - socket.remoteHistogram.delayBase)
socket.ourHistogram.shift(
prevRemoteDelayBase - socket.remoteHistogram.delayBase)
let actualDelay = p.header.timestampDiff
@ -1286,10 +1311,12 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) =
slowStartThreshold = newSlowStartThreshold,
slowstart = newSlowStart
if (socket.zeroWindowTimer.isNone() and socket.maxRemoteWindow <= currentPacketSize):
# when zeroWindowTimer will be hit and maxRemoteWindow still will be equal to 0
# then it will be reset to minimal value
socket.zeroWindowTimer = some(timestampInfo.moment + socket.socketConfig.remoteWindowResetTimeout)
if (socket.zeroWindowTimer.isNone() and
socket.maxRemoteWindow <= currentPacketSize):
# when zeroWindowTimer is hit and maxRemoteWindow still is equal
# to 0 then it will be reset to the minimal value
socket.zeroWindowTimer =
some(timestampInfo.moment + socket.socketConfig.remoteWindowResetTimeout)
debug "Remote window size dropped below packet size",
currentTime = timestampInfo.moment,
@ -1298,15 +1325,16 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) =
socket.tryfinalizeConnection(p)
# socket.curWindowPackets == acks means that this packet acked all remaining packets
# including the sent fin packets
# socket.curWindowPackets == acks means that this packet acked all remaining
# packets including the sent FIN packets
if (socket.finSent and socket.curWindowPackets == acks):
debug "FIN acked, destroying socket"
socket.finAcked = true
# this bit of utp spec is a bit under specified (i.e there is not specification at all)
# reference implementation moves socket to destroy state in case that our fin was acked
# and socket is considered closed for reading and writing.
# but in theory remote could stil write some data on this socket (or even its own fin)
# this part of the uTP spec is a bit under specified, i.e there is no
# specification at all. The reference implementation moves socket to destroy
# state in case that our FIN was acked and socket is considered closed for
# reading and writing. But in theory, the remote could still write some data
# on this socket (or even its own FIN).
socket.destroy()
# Update fast resend counter to avoid resending old packet twice
@ -1336,11 +1364,12 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) =
socket.ackPackets(acks, timestampInfo.moment)
# packets in front may have been acked by selective ack, decrease window until we hit
# a packet that is still waiting to be acked
while (socket.curWindowPackets > 0 and socket.outBuffer.get(socket.seqNr - socket.curWindowPackets).isNone()):
# packets in front may have been acked by selective ack, decrease window until
# we hit a packet that is still waiting to be acked.
while (socket.curWindowPackets > 0 and
socket.outBuffer.get(socket.seqNr - socket.curWindowPackets).isNone()):
dec socket.curWindowPackets
debug "Packet in front hase been acked by selective ack. Decrese window",
debug "Packet in front has been acked by selective ack. Decrease window",
windowPackets = socket.curWindowPackets
# fast timeout
@ -1354,19 +1383,20 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) =
if oldestOutstandingPktSeqNr != socket.fastResendSeqNr:
# fastResendSeqNr do not point to oldest unacked packet, we probably already resent
# packet that timed-out. Leave fast timeout mode
# fastResendSeqNr does not point to oldest unacked packet, we probably
# already resent the packet that timed-out. Leave on fast timeout mode.
socket.fastTimeout = false
else:
let shouldReSendPacket = socket.outBuffer.exists(oldestOutstandingPktSeqNr, (p: OutgoingPacket) => p.transmissions > 0)
let shouldReSendPacket = socket.outBuffer.exists(
oldestOutstandingPktSeqNr, (p: OutgoingPacket) => p.transmissions > 0)
if shouldReSendPacket:
debug "Packet fast timeout resend",
pkSeqNr = oldestOutstandingPktSeqNr
inc socket.fastResendSeqNr
# Is is safe to call force resend as we already checked shouldReSendPacket
# condition
# It is safe to call force resend as we already checked
# `shouldReSendPacket` condition.
socket.sendPacket(oldestOutstandingPktSeqNr)
if (p.eack.isSome()):
@ -1387,12 +1417,14 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) =
let payloadLength = len(p.payload)
if (payloadLength > 0 and (not socket.readShutdown)):
# we need to sum both rcv buffer and reorder buffer
if (uint32(socket.offset) + socket.inBufferBytes + uint32(payloadLength) > socket.socketConfig.optRcvBuffer):
let totalBufferSize =
uint32(socket.offset) + socket.inBufferBytes + uint32(payloadLength)
if (totalBufferSize > socket.socketConfig.optRcvBuffer):
# even though packet is in order and passes all the checks, it would
# overflow our receive buffer, it means that we are receiving data
# faster than we are reading it. Do not ack this packet, and drop received
# data
debug "Recevied packet would overflow receive buffer dropping it",
# faster than we are reading it. Do not ack this packet, and drop
# received data.
debug "Received packet would overflow receive buffer, dropping it",
pkSeqNr = p.header.seqNr,
bytesReceived = payloadLength,
rcvbufferSize = socket.offset,
@ -1401,34 +1433,38 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) =
debug "Received data packet",
bytesReceived = payloadLength
# we are getting in order data packet, we can flush data directly to the incoming buffer
# we are getting in order data packet, we can flush data directly to the
# incoming buffer.
# await upload(addr socket.buffer, unsafeAddr p.payload[0], p.payload.len())
moveMem(addr socket.rcvBuffer[socket.offset], unsafeAddr p.payload[0], payloadLength)
moveMem(
addr socket.rcvBuffer[socket.offset],
unsafeAddr p.payload[0], payloadLength)
socket.offset = socket.offset + payloadLength
# Bytes have been passed to upper layer, we can increase number of last
# acked packet
inc socket.ackNr
# check if the following packets are in reorder buffer
# check if the following packets are in re-order buffer
debug "Looking for packets in re-order buffer",
reorderCount = socket.reorderCount
while true:
# We are doing this in reorder loop, to handle the case when we already received
# fin but there were some gaps before eof
# we have reached remote eof, and should not receive more packets from remote
if ((not socket.reachedFin) and socket.gotFin and socket.eofPktNr == socket.ackNr):
# We are doing this in reorder loop, to handle the case when we already
# received FIN but there were some gaps before eof.
# we have reached remote eof and should not receive more packets from
# remote.
if ((not socket.reachedFin) and socket.gotFin and
socket.eofPktNr == socket.ackNr):
debug "Reached socket EOF"
# In case of reaching eof, it is up to user of library what to to with
# it. With the current implementation, the most appropriate way would be to
# destroy it (as with our implementation we know that remote is destroying its acked fin)
# as any other send will either generate timeout, or socket will be forcefully
# closed by reset
# In case of reaching eof, it is up to user of library what to do with
# it. With the current implementation, the most appropriate way would
# be to destroy it (as with our implementation we know that remote is
# destroying its acked fin) as any other send will either generate
# timeout, or socket will be forcefully closed by reset
socket.reachedFin = true
# this is not necessarily true, but as we have already reached eof we can
# ignore following packets
# this is not necessarily true, but as we have already reached eof we
# can ignore following packets
socket.reorderCount = 0
if socket.reorderCount == 0:
@ -1454,9 +1490,12 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) =
rcvbufferSize = socket.offset,
reorderBufferSize = socket.inBufferBytes
# Rcv buffer and reorder buffer are sized that it is always possible to
# move data from reorder buffer to rcv buffer without overflow
moveMem(addr socket.rcvBuffer[socket.offset], unsafeAddr packet.payload[0], reorderPacketPayloadLength)
# Rcv buffer and reorder buffer are sized such that it is always
# possible to move data from reorder buffer to rcv buffer without
# overflow.
moveMem(
addr socket.rcvBuffer[socket.offset],
unsafeAddr packet.payload[0], reorderPacketPayloadLength)
socket.offset = socket.offset + reorderPacketPayloadLength
debug "Deleting packet",
@ -1465,7 +1504,8 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) =
socket.inBuffer.delete(nextPacketNum)
inc socket.ackNr
dec socket.reorderCount
socket.inBufferBytes = socket.inBufferBytes - uint32(reorderPacketPayloadLength)
socket.inBufferBytes =
socket.inBufferBytes - uint32(reorderPacketPayloadLength)
debug "Socket state after processing in order packet",
socketKey = socket.socketKey,
@ -1498,9 +1538,13 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) =
debug "Packet with seqNr already received",
seqNr = pkSeqNr
else:
let payloadLength = uint32(len(p.payload))
if (socket.inBufferBytes + payloadLength <= socket.socketConfig.maxSizeOfReorderBuffer and
socket.inBufferBytes + uint32(socket.offset) + payloadLength <= socket.socketConfig.optRcvBuffer):
let
payloadLength = uint32(len(p.payload))
totalReorderSize = socket.inBufferBytes + payloadLength
totalBufferSize =
socket.inBufferBytes + uint32(socket.offset) + payloadLength
if (totalReorderSize <= socket.socketConfig.maxSizeOfReorderBuffer and
totalBufferSize <= socket.socketConfig.optRcvBuffer):
debug "store packet in reorder buffer",
packetBytes = payloadLength,
@ -1516,8 +1560,8 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) =
socket.inBufferBytes = socket.inBufferBytes + payloadLength
debug "added out of order packet to reorder buffer",
reorderCount = socket.reorderCount
# we send ack packet, as we reorder count is > 0, so the eack bitmask will be
# generated
# we send ack packet, as we reorder count is > 0, so the eack bitmask
# will be generated
socket.sendAck()
proc processPacket*(socket: UtpSocket, p: Packet): Future[void] =
@ -1542,8 +1586,8 @@ proc onRead(socket: UtpSocket, readReq: var ReadReq): ReadResult =
return ReadCancelled
if socket.atEof():
# buffer is already empty and we reached remote fin, just finish read with whatever
# was already read
# buffer is already empty and we reached remote fin, just finish read with
# whatever was already read
readReq.reader.complete(readReq.bytesAvailable)
return SocketAlreadyFinished
@ -1768,7 +1812,8 @@ proc write*(socket: UtpSocket, data: seq[byte]): Future[WriteResult] =
let retFuture = newFuture[WriteResult]("UtpSocket.write")
if (socket.state != Connected):
let res = WriteResult.err(WriteError(kind: SocketNotWriteable, currentState: socket.state))
let res = WriteResult.err(
WriteError(kind: SocketNotWriteable, currentState: socket.state))
retFuture.complete(res)
return retFuture
@ -1845,8 +1890,8 @@ proc read*(socket: UtpSocket): Future[seq[byte]] =
return fut
# Check how many packets are still in the out going buffer, usefully for tests or
# debugging.
# Check how many packets are still in the out going buffer, usefully for tests
# or debugging.
proc numPacketsInOutGoingBuffer*(socket: UtpSocket): int =
var num = 0
for e in socket.outBuffer.items():
@ -1858,11 +1903,12 @@ proc numPacketsInOutGoingBuffer*(socket: UtpSocket): int =
proc numOfBytesInFlight*(socket: UtpSocket): uint32 = socket.currentWindow
# Check how many bytes are in incoming buffer
proc numOfBytesInIncomingBuffer*(socket: UtpSocket): uint32 = uint32(socket.offset)
proc numOfBytesInIncomingBuffer*(socket: UtpSocket): uint32 =
uint32(socket.offset)
# Check how many packets are still in the reorder buffer, useful for tests or
# debugging.
# It throws assertion error when number of elements in buffer do not equal kept counter
# debugging. It throws assertion error when number of elements in buffer do not
# equal kept counter.
proc numPacketsInReorderedBuffer*(socket: UtpSocket): int =
var num = 0
for e in socket.inBuffer.items():
@ -1874,9 +1920,9 @@ proc numPacketsInReorderedBuffer*(socket: UtpSocket): int =
proc numOfEventsInEventQueue*(socket: UtpSocket): int = len(socket.eventQueue)
proc connectionId*[A](socket: UtpSocket[A]): uint16 =
## Connection id is id which is used in first SYN packet which establishes the connection
## so for Outgoing side it is actually its rcv_id, and for Incoming side it is
## its snd_id
## Connection id is the id which is used in first SYN packet which establishes
## the connection, so for `Outgoing` side it is actually its rcv_id, and for
## `Incoming` side it is its snd_id.
case socket.direction
of Incoming:
socket.connectionIdSnd
@ -1902,11 +1948,11 @@ proc new[A](
): T =
let currentTime = getMonoTimestamp().moment
# Initial max window size. Reference implementation uses value which enables one packet
# to be transferred.
# We use value two times higher as we do not yet have proper mtu estimation, and
# our impl should work over udp and discovery v5 (where proper estimation may be harder
# as packets already have discoveryv5 envelope)
# Initial max window size. Reference implementation uses value which allows
# one packet to be transferred.
# We use a value two times higher as we do not yet have proper mtu estimation,
# and our impl. should work over UDP and discovery v5 (where proper estimation
# may be harder as packets already have discovery v5 envelope).
let initMaxWindow = 2 * cfg.payloadSize
T(
remoteAddress: to,
@ -1987,10 +2033,10 @@ proc newIncomingSocket*[A](
let (initialState, initialTimeout) =
if (cfg.incomingSocketReceiveTimeout.isNone()):
# it does not matter what timeout value we put here, as socket will be in
# connected state without outgoing packets in buffer so any timeout hit will
# just double rto without any penalties
# although we cannot use 0, as then timeout will be constantly re-set to 500ms
# and there will be a lot of not useful work done
# connected state without outgoing packets in buffer so any timeout hit
# will just double rto without any penalties
# although we cannot use 0, as then timeout will be constantly re-set to
# 500ms and there will be a lot of not useful work done
(Connected, defaultInitialSynTimeout)
else:
let timeout = cfg.incomingSocketReceiveTimeout.unsafeGet()
@ -2020,7 +2066,8 @@ proc startIncomingSocket*(socket: UtpSocket) =
proc startOutgoingSocket*(socket: UtpSocket): Future[void] =
doAssert(socket.state == SynSent)
let packet = synPacket(socket.seqNr, socket.connectionIdRcv, socket.getRcvWindowSize())
let packet =
synPacket(socket.seqNr, socket.connectionIdRcv, socket.getRcvWindowSize())
debug "Sending SYN packet",
seqNr = packet.header.seqNr,
connectionId = packet.header.connectionId

View File

@ -1,4 +1,4 @@
# Copyright (c) 2020-2022 Status Research & Development GmbH
# Copyright (c) 2020-2023 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
@ -17,48 +17,49 @@ import
../../eth/keys,
../stubloglevel
procSuite "Utp socket unit test":
let rng = newRng()
let testAddress = initTAddress("127.0.0.1", 9079)
let testBufferSize = 1024'u32
let defaultRcvOutgoingId = 314'u16
procSuite "uTP socket tests":
let
rng = newRng()
testAddress = initTAddress("127.0.0.1", 9079)
testBufferSize = 1024'u32
defaultRcvOutgoingId = 314'u16
proc packetsToBytes(packets: seq[Packet]): seq[byte] =
var resultBytes = newSeq[byte]()
var bytes = newSeq[byte]()
for p in packets:
resultBytes.add(p.payload)
return resultBytes
bytes.add(p.payload)
return bytes
asyncTest "Starting outgoing socket should send Syn packet":
asyncTest "Outgoing socket must send SYN packet":
let q = newAsyncQueue[Packet]()
let defaultConfig = SocketConfig.init()
let sock1 = newOutgoingSocket[TransportAddress](
let socket = newOutgoingSocket[TransportAddress](
testAddress,
initTestSnd(q),
defaultConfig,
defaultRcvOutgoingId,
rng[]
)
let fut1 = sock1.startOutgoingSocket()
let fut = socket.startOutgoingSocket()
let initialPacket = await q.get()
check:
initialPacket.header.pType == ST_SYN
initialPacket.header.wndSize == defaultConfig.optRcvBuffer
await sock1.destroyWait()
fut1.cancel()
await socket.destroyWait()
fut.cancel()
asyncTest "Outgoing socket should re-send syn packet 2 times before declaring failure":
asyncTest "Outgoing socket should re-send SYN packet 2 times before declaring failure":
let q = newAsyncQueue[Packet]()
let sock1 = newOutgoingSocket[TransportAddress](
let socket = newOutgoingSocket[TransportAddress](
testAddress,
initTestSnd(q),
SocketConfig.init(milliseconds(100)),
defaultRcvOutgoingId,
rng[]
)
let fut1 = sock1.startOutgoingSocket()
let fut1 = socket.startOutgoingSocket()
let initialPacket = await q.get()
check:
@ -75,24 +76,24 @@ procSuite "Utp socket unit test":
resentSynPacket1.header.pType == ST_SYN
# next timeout will should disconnect socket
await waitUntil(proc (): bool = sock1.isConnected() == false)
await waitUntil(proc (): bool = socket.isConnected() == false)
check:
not sock1.isConnected()
not socket.isConnected()
await sock1.destroyWait()
await socket.destroyWait()
fut1.cancel()
asyncTest "Processing in order ack should make socket connected":
let q = newAsyncQueue[Packet]()
let initialRemoteSeq = 10'u16
let (sock1, packet) = connectOutGoingSocket(initialRemoteSeq, q)
let (socket, packet) = connectOutGoingSocket(initialRemoteSeq, q)
check:
sock1.isConnected()
socket.isConnected()
await sock1.destroyWait()
await socket.destroyWait()
asyncTest "Processing in order data packet should upload it to buffer and ack packet":
let q = newAsyncQueue[Packet]()
@ -1418,9 +1419,9 @@ procSuite "Utp socket unit test":
let dataDropped = @[1'u8]
let dataReceived = @[2'u8]
let sock1 = newOutgoingSocket[TransportAddress](testAddress, initTestSnd(q), cfg, defaultRcvOutgoingId, rng[])
let socket = newOutgoingSocket[TransportAddress](testAddress, initTestSnd(q), cfg, defaultRcvOutgoingId, rng[])
asyncSpawn sock1.startOutgoingSocket()
asyncSpawn socket.startOutgoingSocket()
let initialPacket = await q.get()
@ -1456,16 +1457,16 @@ procSuite "Utp socket unit test":
# even though @[1'u8] is received first, it should be dropped as socket is not
# yet in connected state
await sock1.processPacket(dpDropped)
await sock1.processPacket(responseAck)
await sock1.processPacket(dpReceived)
await socket.processPacket(dpDropped)
await socket.processPacket(responseAck)
await socket.processPacket(dpReceived)
let receivedData = await sock1.read(1)
let receivedData = await socket.read(1)
check:
receivedData == dataReceived
await sock1.destroyWait()
await socket.destroyWait()
asyncTest "Clean up all resources when closing due to timeout failure":
let q = newAsyncQueue[Packet]()