Make packet size configurable (#497)

* Make packet size configurable
This commit is contained in:
KonradStaniec 2022-04-04 13:44:32 +02:00 committed by GitHub
parent ea03e66485
commit eb785207ae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 72 additions and 31 deletions

View File

@ -80,6 +80,13 @@ type
# Size of reorder buffer calculated as fraction of optRcvBuffer # Size of reorder buffer calculated as fraction of optRcvBuffer
maxSizeOfReorderBuffer: uint32 maxSizeOfReorderBuffer: uint32
# Maximal number of payload bytes per data packet. Total packet size will be equal to
# payloadSize + 20 (size of header of data packet)
# TODO for now we enable only static configuration of packet sizes. In the future
# it would be nice to add option which enables automatic packet size discovery
# based on traffic
payloadSize*: uint32
WriteErrorType* = enum WriteErrorType* = enum
SocketNotWriteable, SocketNotWriteable,
FinSent FinSent
@ -297,11 +304,16 @@ type
ConnectionResult*[A] = Result[UtpSocket[A], OutgoingConnectionError] ConnectionResult*[A] = Result[UtpSocket[A], OutgoingConnectionError]
const const
# Maximal number of payload bytes per packet. Total packet size will be equal to # Default maximum size of the data packet payload. With such configuration
# mtuSize + sizeof(header) = 600 bytes # data packets will have 508 bytes (488 + 20 header).
# TODO for now it is just some random value. Ultimatly this value should be dynamically # 508 bytes of udp payload can translate into 576 bytes udp packet i.e
# adjusted based on traffic. # 508bytes paylaod + 60bytes (max ip header) + 8bytes (udp header) = 576bytes.
mtuSize = 580 # 576bytes is defined as minimum reassembly buffer size i.e
# the minimum datagram size that we are guaranteed any implementation must support.
# from RFC791: All hosts must be prepared
# to accept datagrams of up to 576 octets (whether they arrive whole
# or in fragments).
defaultPayloadSize = 488
# How often each socket check its different on going timers # How often each socket check its different on going timers
checkTimeoutsLoopInterval = milliseconds(500) checkTimeoutsLoopInterval = milliseconds(500)
@ -338,18 +350,6 @@ const
# from remote peer with `wndSize` set to number <= current packet size # from remote peer with `wndSize` set to number <= current packet size
defaultResetWindowTimeout = seconds(15) defaultResetWindowTimeout = seconds(15)
# If remote peer window drops to zero, then after some time we will reset it
# to this value even if we do not receive any more messages from remote peers.
# Reset period is configured in `SocketConfig`
minimalRemoteWindow: uint32 = 1500
# Initial max window size. Reference implementation uses value which enables one packet
# to be transfered.
# We use value two times higher as we do not yet have proper mtu estimation, and
# our impl should work over udp and discovery v5 (where proper estmation may be harder
# as packets already have discvoveryv5 envelope)
startMaxWindow* = 2 * mtuSize
reorderBufferMaxSize = 1024 reorderBufferMaxSize = 1024
duplicateAcksBeforeResend = 3 duplicateAcksBeforeResend = 3
@ -366,7 +366,6 @@ const
# happens # happens
maxReorderBufferSize = 0.5 maxReorderBufferSize = 0.5
proc init*[A](T: type UtpSocketKey, remoteAddress: A, rcvId: uint16): T = proc init*[A](T: type UtpSocketKey, remoteAddress: A, rcvId: uint16): T =
UtpSocketKey[A](remoteAddress: remoteAddress, rcvId: rcvId) UtpSocketKey[A](remoteAddress: remoteAddress, rcvId: rcvId)
@ -392,8 +391,12 @@ proc init*(
optRcvBuffer: uint32 = defaultOptRcvBuffer, optRcvBuffer: uint32 = defaultOptRcvBuffer,
incomingSocketReceiveTimeout: Option[Duration] = some(defaultRcvRetransmitTimeout), incomingSocketReceiveTimeout: Option[Duration] = some(defaultRcvRetransmitTimeout),
remoteWindowResetTimeout: Duration = defaultResetWindowTimeout, remoteWindowResetTimeout: Duration = defaultResetWindowTimeout,
optSndBuffer: uint32 = defaultOptRcvBuffer optSndBuffer: uint32 = defaultOptRcvBuffer,
payloadSize: uint32 = defaultPayloadSize
): T = ): T =
# make sure there is always some payload in data packets, and that packets are not to large.
# with 1480 packet boundary, data packets will have 1500 bytes which seems reasonable
doAssert(payloadSize > 0 and payloadSize <= 1480, "payloadSize should always be positive number <= 1480")
# TODO make sure optRcvBuffer is nicely divisible by maxReorderBufferSize # TODO make sure optRcvBuffer is nicely divisible by maxReorderBufferSize
let reorderBufferSize = uint32(maxReorderBufferSize * float64(optRcvBuffer)) let reorderBufferSize = uint32(maxReorderBufferSize * float64(optRcvBuffer))
SocketConfig( SocketConfig(
@ -403,7 +406,8 @@ proc init*(
optSndBuffer: optSndBuffer, optSndBuffer: optSndBuffer,
incomingSocketReceiveTimeout: incomingSocketReceiveTimeout, incomingSocketReceiveTimeout: incomingSocketReceiveTimeout,
remoteWindowResetTimeout: remoteWindowResetTimeout, remoteWindowResetTimeout: remoteWindowResetTimeout,
maxSizeOfReorderBuffer: reorderBufferSize maxSizeOfReorderBuffer: reorderBufferSize,
payloadSize: payloadSize
) )
# number of bytes which will fit in current send window # number of bytes which will fit in current send window
@ -515,14 +519,16 @@ proc checkTimeouts(socket: UtpSocket) =
socket.flushPackets() socket.flushPackets()
if socket.isOpened(): if socket.isOpened():
let currentPacketSize = uint32(socket.getPacketSize()) let currentPacketSize = socket.getPacketSize()
if (socket.zeroWindowTimer.isSome() and currentTime > socket.zeroWindowTimer.unsafeGet()): if (socket.zeroWindowTimer.isSome() and currentTime > socket.zeroWindowTimer.unsafeGet()):
if socket.maxRemoteWindow <= currentPacketSize: if socket.maxRemoteWindow <= currentPacketSize:
# Reset remote window, to minimal value which will fit at least two packet
let minimalRemoteWindow = 2 * socket.socketConfig.payloadSize
socket.maxRemoteWindow = minimalRemoteWindow socket.maxRemoteWindow = minimalRemoteWindow
socket.zeroWindowTimer = none[Moment]()
debug "Reset remote window to minimal value", debug "Reset remote window to minimal value",
minRemote = minimalRemoteWindow minRemote = minimalRemoteWindow
socket.zeroWindowTimer = none[Moment]()
if (currentTime > socket.rtoTimeout): if (currentTime > socket.rtoTimeout):
debug "CheckTimeouts rto timeout", debug "CheckTimeouts rto timeout",
@ -623,12 +629,11 @@ proc checkTimeoutsLoop(s: UtpSocket) {.async.} =
proc startTimeoutLoop(s: UtpSocket) = proc startTimeoutLoop(s: UtpSocket) =
s.checkTimeoutsLoop = checkTimeoutsLoop(s) s.checkTimeoutsLoop = checkTimeoutsLoop(s)
proc getPacketSize*(socket: UtpSocket): int = proc getPacketSize*(socket: UtpSocket): uint32 =
# TODO currently returning constant, ultimatly it should be bases on mtu estimates socket.socketConfig.payloadSize
mtuSize
proc handleDataWrite(socket: UtpSocket, data: seq[byte]): int = proc handleDataWrite(socket: UtpSocket, data: seq[byte]): int =
let pSize = socket.getPacketSize() let pSize = int(socket.getPacketSize())
let endIndex = data.high() let endIndex = data.high()
var i = 0 var i = 0
var bytesWritten = 0 var bytesWritten = 0
@ -1221,7 +1226,7 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) =
let diff = uint32((socket.ourHistogram.getValue() - minRtt).microseconds()) let diff = uint32((socket.ourHistogram.getValue() - minRtt).microseconds())
socket.ourHistogram.shift(diff) socket.ourHistogram.shift(diff)
let currentPacketSize = uint32(socket.getPacketSize()) let currentPacketSize = socket.getPacketSize()
let (newMaxWindow, newSlowStartTreshold, newSlowStart) = let (newMaxWindow, newSlowStartTreshold, newSlowStart) =
applyCongestionControl( applyCongestionControl(
socket.maxWindow, socket.maxWindow,
@ -1825,6 +1830,13 @@ proc new[A](
initialTimeout: Duration initialTimeout: Duration
): T = ): T =
let currentTime = getMonoTimestamp().moment let currentTime = getMonoTimestamp().moment
# Initial max window size. Reference implementation uses value which enables one packet
# to be transfered.
# We use value two times higher as we do not yet have proper mtu estimation, and
# our impl should work over udp and discovery v5 (where proper estmation may be harder
# as packets already have discoveryv5 envelope)
let initMaxWindow = 2 * cfg.payloadSize
T( T(
remoteAddress: to, remoteAddress: to,
state: state, state: state,
@ -1840,7 +1852,7 @@ proc new[A](
currentWindow: 0, currentWindow: 0,
# start with 1mb assumption, field will be updated with first received packet # start with 1mb assumption, field will be updated with first received packet
maxRemoteWindow: 1024 * 1024, maxRemoteWindow: 1024 * 1024,
maxWindow: startMaxWindow, maxWindow: initMaxWindow,
inBuffer: GrowableCircularBuffer[Packet].init(), inBuffer: GrowableCircularBuffer[Packet].init(),
retransmitTimeout: initialTimeout, retransmitTimeout: initialTimeout,
rtoTimeout: currentTime + initialTimeout, rtoTimeout: currentTime + initialTimeout,
@ -1926,6 +1938,9 @@ proc newIncomingSocket*[A](
initialTimeout initialTimeout
) )
proc getSocketConfig*(socket: UtpSocket): SocketConfig =
socket.socketConfig
proc startIncomingSocket*(socket: UtpSocket) = proc startIncomingSocket*(socket: UtpSocket) =
# Make sure ack was flushed before moving forward # Make sure ack was flushed before moving forward
socket.sendAck() socket.sendAck()

View File

@ -424,7 +424,7 @@ procSuite "Utp protocol over udp tests":
asyncTest "Success data transfer of a lot of data should increase available window on sender side": asyncTest "Success data transfer of a lot of data should increase available window on sender side":
let s = await initClientServerScenario() let s = await initClientServerScenario()
let startMaxWindow = 2 * s.clientSocket.getSocketConfig().payloadSize
check: check:
s.clientSocket.isConnected() s.clientSocket.isConnected()
# initially window has value equal to some pre configured constant # initially window has value equal to some pre configured constant
@ -455,6 +455,7 @@ procSuite "Utp protocol over udp tests":
asyncTest "Not used socket should decay its max send window": asyncTest "Not used socket should decay its max send window":
let s = await initClientServerScenario() let s = await initClientServerScenario()
let startMaxWindow = 2 * s.clientSocket.getSocketConfig().payloadSize
check: check:
s.clientSocket.isConnected() s.clientSocket.isConnected()

View File

@ -1490,7 +1490,6 @@ procSuite "Utp socket unit test":
writeFut.finished() writeFut.finished()
await outgoingSocket.destroyWait() await outgoingSocket.destroyWait()
asyncTest "Receiving ack for fin packet should destroy socket and clean up all resources": asyncTest "Receiving ack for fin packet should destroy socket and clean up all resources":
let q = newAsyncQueue[Packet]() let q = newAsyncQueue[Packet]()
let initialRemoteSeq = 10'u16 let initialRemoteSeq = 10'u16
@ -1526,3 +1525,29 @@ procSuite "Utp socket unit test":
outgoingSocket.isClosedAndCleanedUpAllResources() outgoingSocket.isClosedAndCleanedUpAllResources()
await outgoingSocket.destroyWait() await outgoingSocket.destroyWait()
asyncTest "Maximum payload size should be configurable":
let q = newAsyncQueue[Packet]()
let initalRemoteSeqNr = 10'u16
let d = generateByteArray(rng[], 5000)
let maxPayloadSize = 800'u32
let config = SocketConfig.init(payloadSize = maxPayloadSize)
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initalRemoteSeqNr, q, cfg = config)
let wr = await outgoingSocket.write(d)
check:
wr.isOk()
# Initial max window should allow for at least 2 packets to go through
let dp1 = await q.get()
let dp2 = await q.get()
check:
dp1.header.pType == ST_DATA
len(dp1.payload) == int(maxPayloadSize)
dp2.header.pType == ST_DATA
len(dp2.payload) == int(maxPayloadSize)
await outgoingSocket.destroyWait()