diff --git a/eth/utp/send_buffer_tracker.nim b/eth/utp/send_buffer_tracker.nim index f15cfbd..16f333a 100644 --- a/eth/utp/send_buffer_tracker.nim +++ b/eth/utp/send_buffer_tracker.nim @@ -20,13 +20,27 @@ type SendBufferTracker* = ref object # remote receive window updated based on packed wndSize field maxRemoteWindow*: uint32 + + # configuration option for maxium number of bytes in snd buffer + maxSndBufferSize*: uint32 waiters: seq[(uint32, Future[void])] -proc new*(T: type SendBufferTracker, currentWindow: uint32, maxRemoteWindow: uint32): T = - return SendBufferTracker(currentWindow: currentWindow, maxRemoteWindow: maxRemoteWindow, waiters: @[]) +proc new*( + T: type SendBufferTracker, + currentWindow: uint32, + maxRemoteWindow: uint32, + maxSndBufferSize: uint32): T = + return ( + SendBufferTracker( + currentWindow: currentWindow, + maxRemoteWindow: maxRemoteWindow, + maxSndBufferSize: maxSndBufferSize, + waiters: @[] + ) + ) proc currentFreeBytes*(t: SendBufferTracker): uint32 = - let maxSend = t.maxRemoteWindow + let maxSend = min(t.maxRemoteWindow, t.maxSndBufferSize) if (maxSend <= t.currentWindow): return 0 else: diff --git a/eth/utp/utp_socket.nim b/eth/utp/utp_socket.nim index 010cdb0..b683f52 100644 --- a/eth/utp/utp_socket.nim +++ b/eth/utp/utp_socket.nim @@ -57,6 +57,9 @@ type # Maximnal size of receive buffer in bytes optRcvBuffer*: uint32 + # Maximnal size of send buffer in bytes + optSndBuffer*: uint32 + # If set to some(`Duration`), the incoming socket will be initialized in # `SynRecv` state and the remote peer will have `Duration` to transfer data # to move the socket in `Connected` state. @@ -278,12 +281,14 @@ proc init*( dataResendsBeforeFailure: uint16 = defaultDataResendsBeforeFailure, optRcvBuffer: uint32 = defaultOptRcvBuffer, incomingSocketReceiveTimeout: Option[Duration] = some(defaultRcvRetransmitTimeout), - remoteWindowResetTimeout: Duration = defaultResetWindowTimeout + remoteWindowResetTimeout: Duration = defaultResetWindowTimeout, + optSndBuffer: uint32 = defaultOptRcvBuffer ): T = SocketConfig( initialSynTimeout: initialSynTimeout, dataResendsBeforeFailure: dataResendsBeforeFailure, optRcvBuffer: optRcvBuffer, + optSndBuffer: optSndBuffer, incomingSocketReceiveTimeout: incomingSocketReceiveTimeout, remoteWindowResetTimeout: remoteWindowResetTimeout ) @@ -560,7 +565,7 @@ proc new[A]( closeEvent: newAsyncEvent(), closeCallbacks: newSeq[Future[void]](), # start with 1mb assumption, field will be updated with first received packet - sendBufferTracker: SendBufferTracker.new(0, 1024 * 1024), + sendBufferTracker: SendBufferTracker.new(0, 1024 * 1024, cfg.optSndBuffer), # queue with infinite size writeQueue: newAsyncQueue[WriteRequest](), zeroWindowTimer: Moment.now() + cfg.remoteWindowResetTimeout, diff --git a/tests/utp/test_utp_socket.nim b/tests/utp/test_utp_socket.nim index 75d1295..805f20e 100644 --- a/tests/utp/test_utp_socket.nim +++ b/tests/utp/test_utp_socket.nim @@ -956,3 +956,51 @@ procSuite "Utp socket unit test": p.payload == someData await outgoingSocket.destroyWait() + + asyncTest "Writing data should respect max snd buffer option": + let q = newAsyncQueue[Packet]() + let initialRemoteSeq = 10'u16 + let someData1 = @[1'u8] + let somedata2 = @[2'u8] + let (outgoingSocket, initialPacket) = + connectOutGoingSocket( + initialRemoteSeq, + q, + cfg = SocketConfig.init( + optSndBuffer = 1 + ) + ) + + check: + outgoingSocket.isConnected() + + # snd buffer got 1 byte of space so this future shold finish + let write1 = await outgoingSocket.write(someData1) + + let writeFut2 = outgoingSocket.write(someData2) + + # wait until 2 re-sends to check we do not accidently free buffer during re-sends + discard await q.get() + discard await q.get() + let firstPacket = await q.get() + + check: + # this write still cannot progress as 1st write is not acked + not writeFut2.finished() + + let someAckFromRemote = ackPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr + 1, 10) + + # acks first write, so there is space in buffer for new data and second + # write should progress + await outgoingSocket.processPacket(someAckFromRemote) + + yield writeFut2 + + let secondPacket = await q.get() + + check: + writeFut2.finished() + firstPacket.payload == someData1 + secondPacket.payload == somedata2 + + await outgoingSocket.destroyWait()