diff --git a/eth/utp/utp_socket.nim b/eth/utp/utp_socket.nim index dfc6cd0..4364261 100644 --- a/eth/utp/utp_socket.nim +++ b/eth/utp/utp_socket.nim @@ -80,6 +80,13 @@ type # Size of reorder buffer calculated as fraction of optRcvBuffer maxSizeOfReorderBuffer: uint32 + # Maximal number of payload bytes per data packet. Total packet size will be equal to + # payloadSize + 20 (size of header of data packet) + # TODO for now we enable only static configuration of packet sizes. In the future + # it would be nice to add option which enables automatic packet size discovery + # based on traffic + payloadSize*: uint32 + WriteErrorType* = enum SocketNotWriteable, FinSent @@ -297,11 +304,16 @@ type ConnectionResult*[A] = Result[UtpSocket[A], OutgoingConnectionError] const - # Maximal number of payload bytes per packet. Total packet size will be equal to - # mtuSize + sizeof(header) = 600 bytes - # TODO for now it is just some random value. Ultimatly this value should be dynamically - # adjusted based on traffic. - mtuSize = 580 + # Default maximum size of the data packet payload. With such configuration + # data packets will have 508 bytes (488 + 20 header). + # 508 bytes of udp payload can translate into 576 bytes udp packet i.e + # 508bytes paylaod + 60bytes (max ip header) + 8bytes (udp header) = 576bytes. + # 576bytes is defined as minimum reassembly buffer size i.e + # the minimum datagram size that we are guaranteed any implementation must support. + # from RFC791: All hosts must be prepared + # to accept datagrams of up to 576 octets (whether they arrive whole + # or in fragments). + defaultPayloadSize = 488 # How often each socket check its different on going timers checkTimeoutsLoopInterval = milliseconds(500) @@ -338,18 +350,6 @@ const # from remote peer with `wndSize` set to number <= current packet size defaultResetWindowTimeout = seconds(15) - # If remote peer window drops to zero, then after some time we will reset it - # to this value even if we do not receive any more messages from remote peers. - # Reset period is configured in `SocketConfig` - minimalRemoteWindow: uint32 = 1500 - - # Initial max window size. Reference implementation uses value which enables one packet - # to be transfered. - # We use value two times higher as we do not yet have proper mtu estimation, and - # our impl should work over udp and discovery v5 (where proper estmation may be harder - # as packets already have discvoveryv5 envelope) - startMaxWindow* = 2 * mtuSize - reorderBufferMaxSize = 1024 duplicateAcksBeforeResend = 3 @@ -366,7 +366,6 @@ const # happens maxReorderBufferSize = 0.5 - proc init*[A](T: type UtpSocketKey, remoteAddress: A, rcvId: uint16): T = UtpSocketKey[A](remoteAddress: remoteAddress, rcvId: rcvId) @@ -392,8 +391,12 @@ proc init*( optRcvBuffer: uint32 = defaultOptRcvBuffer, incomingSocketReceiveTimeout: Option[Duration] = some(defaultRcvRetransmitTimeout), remoteWindowResetTimeout: Duration = defaultResetWindowTimeout, - optSndBuffer: uint32 = defaultOptRcvBuffer + optSndBuffer: uint32 = defaultOptRcvBuffer, + payloadSize: uint32 = defaultPayloadSize ): T = + # make sure there is always some payload in data packets, and that packets are not to large. + # with 1480 packet boundary, data packets will have 1500 bytes which seems reasonable + doAssert(payloadSize > 0 and payloadSize <= 1480, "payloadSize should always be positive number <= 1480") # TODO make sure optRcvBuffer is nicely divisible by maxReorderBufferSize let reorderBufferSize = uint32(maxReorderBufferSize * float64(optRcvBuffer)) SocketConfig( @@ -403,7 +406,8 @@ proc init*( optSndBuffer: optSndBuffer, incomingSocketReceiveTimeout: incomingSocketReceiveTimeout, remoteWindowResetTimeout: remoteWindowResetTimeout, - maxSizeOfReorderBuffer: reorderBufferSize + maxSizeOfReorderBuffer: reorderBufferSize, + payloadSize: payloadSize ) # number of bytes which will fit in current send window @@ -515,14 +519,16 @@ proc checkTimeouts(socket: UtpSocket) = socket.flushPackets() if socket.isOpened(): - let currentPacketSize = uint32(socket.getPacketSize()) + let currentPacketSize = socket.getPacketSize() if (socket.zeroWindowTimer.isSome() and currentTime > socket.zeroWindowTimer.unsafeGet()): if socket.maxRemoteWindow <= currentPacketSize: + # Reset remote window, to minimal value which will fit at least two packet + let minimalRemoteWindow = 2 * socket.socketConfig.payloadSize socket.maxRemoteWindow = minimalRemoteWindow + debug "Reset remote window to minimal value", + minRemote = minimalRemoteWindow socket.zeroWindowTimer = none[Moment]() - debug "Reset remote window to minimal value", - minRemote = minimalRemoteWindow if (currentTime > socket.rtoTimeout): debug "CheckTimeouts rto timeout", @@ -623,12 +629,11 @@ proc checkTimeoutsLoop(s: UtpSocket) {.async.} = proc startTimeoutLoop(s: UtpSocket) = s.checkTimeoutsLoop = checkTimeoutsLoop(s) -proc getPacketSize*(socket: UtpSocket): int = - # TODO currently returning constant, ultimatly it should be bases on mtu estimates - mtuSize +proc getPacketSize*(socket: UtpSocket): uint32 = + socket.socketConfig.payloadSize proc handleDataWrite(socket: UtpSocket, data: seq[byte]): int = - let pSize = socket.getPacketSize() + let pSize = int(socket.getPacketSize()) let endIndex = data.high() var i = 0 var bytesWritten = 0 @@ -1221,7 +1226,7 @@ proc processPacketInternal(socket: UtpSocket, p: Packet) = let diff = uint32((socket.ourHistogram.getValue() - minRtt).microseconds()) socket.ourHistogram.shift(diff) - let currentPacketSize = uint32(socket.getPacketSize()) + let currentPacketSize = socket.getPacketSize() let (newMaxWindow, newSlowStartTreshold, newSlowStart) = applyCongestionControl( socket.maxWindow, @@ -1825,6 +1830,13 @@ proc new[A]( initialTimeout: Duration ): T = let currentTime = getMonoTimestamp().moment + + # Initial max window size. Reference implementation uses value which enables one packet + # to be transfered. + # We use value two times higher as we do not yet have proper mtu estimation, and + # our impl should work over udp and discovery v5 (where proper estmation may be harder + # as packets already have discoveryv5 envelope) + let initMaxWindow = 2 * cfg.payloadSize T( remoteAddress: to, state: state, @@ -1840,7 +1852,7 @@ proc new[A]( currentWindow: 0, # start with 1mb assumption, field will be updated with first received packet maxRemoteWindow: 1024 * 1024, - maxWindow: startMaxWindow, + maxWindow: initMaxWindow, inBuffer: GrowableCircularBuffer[Packet].init(), retransmitTimeout: initialTimeout, rtoTimeout: currentTime + initialTimeout, @@ -1926,6 +1938,9 @@ proc newIncomingSocket*[A]( initialTimeout ) +proc getSocketConfig*(socket: UtpSocket): SocketConfig = + socket.socketConfig + proc startIncomingSocket*(socket: UtpSocket) = # Make sure ack was flushed before moving forward socket.sendAck() diff --git a/tests/utp/test_protocol.nim b/tests/utp/test_protocol.nim index d39207f..a262d01 100644 --- a/tests/utp/test_protocol.nim +++ b/tests/utp/test_protocol.nim @@ -424,7 +424,7 @@ procSuite "Utp protocol over udp tests": asyncTest "Success data transfer of a lot of data should increase available window on sender side": let s = await initClientServerScenario() - + let startMaxWindow = 2 * s.clientSocket.getSocketConfig().payloadSize check: s.clientSocket.isConnected() # initially window has value equal to some pre configured constant @@ -455,6 +455,7 @@ procSuite "Utp protocol over udp tests": asyncTest "Not used socket should decay its max send window": let s = await initClientServerScenario() + let startMaxWindow = 2 * s.clientSocket.getSocketConfig().payloadSize check: s.clientSocket.isConnected() diff --git a/tests/utp/test_utp_socket.nim b/tests/utp/test_utp_socket.nim index 53027d0..8d3cf60 100644 --- a/tests/utp/test_utp_socket.nim +++ b/tests/utp/test_utp_socket.nim @@ -1490,7 +1490,6 @@ procSuite "Utp socket unit test": writeFut.finished() await outgoingSocket.destroyWait() - asyncTest "Receiving ack for fin packet should destroy socket and clean up all resources": let q = newAsyncQueue[Packet]() let initialRemoteSeq = 10'u16 @@ -1526,3 +1525,29 @@ procSuite "Utp socket unit test": outgoingSocket.isClosedAndCleanedUpAllResources() await outgoingSocket.destroyWait() + + asyncTest "Maximum payload size should be configurable": + let q = newAsyncQueue[Packet]() + let initalRemoteSeqNr = 10'u16 + let d = generateByteArray(rng[], 5000) + let maxPayloadSize = 800'u32 + let config = SocketConfig.init(payloadSize = maxPayloadSize) + + let (outgoingSocket, initialPacket) = connectOutGoingSocket(initalRemoteSeqNr, q, cfg = config) + + let wr = await outgoingSocket.write(d) + + check: + wr.isOk() + + # Initial max window should allow for at least 2 packets to go through + let dp1 = await q.get() + let dp2 = await q.get() + + check: + dp1.header.pType == ST_DATA + len(dp1.payload) == int(maxPayloadSize) + dp2.header.pType == ST_DATA + len(dp2.payload) == int(maxPayloadSize) + + await outgoingSocket.destroyWait()