diff --git a/eth/utp/send_buffer_tracker.nim b/eth/utp/send_buffer_tracker.nim new file mode 100644 index 0000000..f15cfbd --- /dev/null +++ b/eth/utp/send_buffer_tracker.nim @@ -0,0 +1,78 @@ +# Copyright (c) 2021 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.push raises: [Defect].} + +import + chronos + +# Internal Utp data structure to track send window and properly block when there is +# no free space when trying to send more bytes +type SendBufferTracker* = ref object + # number of payload bytes in-flight (i.e not counting header sizes) + # packets that have not yet been sent do not count, packets + # that are marked as needing to be re-sent (due to a timeout) + # don't count either + currentWindow*: uint32 + + # remote receive window updated based on packed wndSize field + maxRemoteWindow*: uint32 + waiters: seq[(uint32, Future[void])] + +proc new*(T: type SendBufferTracker, currentWindow: uint32, maxRemoteWindow: uint32): T = + return SendBufferTracker(currentWindow: currentWindow, maxRemoteWindow: maxRemoteWindow, waiters: @[]) + +proc currentFreeBytes*(t: SendBufferTracker): uint32 = + let maxSend = t.maxRemoteWindow + if (maxSend <= t.currentWindow): + return 0 + else: + return maxSend - t.currentWindow + +proc checkWaiters(t: SendBufferTracker) = + var i = 0 + while i < len(t.waiters): + let freeSpace = t.currentFreeBytes() + let (required, fut) = t.waiters[i] + if (required <= freeSpace): + # in case future was cancelled + if (not fut.finished()): + t.currentWindow = t.currentWindow + required + fut.complete() + t.waiters.del(i) + else: + # we do not have place for next waiter, just finish processing + return + +proc updateMaxRemote*(t: SendBufferTracker, newRemoteWindow: uint32) = + t.maxRemoteWindow = newRemoteWindow + t.checkWaiters() + +proc decreaseCurrentWindow*(t: SendBufferTracker, value: uint32, notifyWaiters: bool) = + doAssert(t.currentWindow >= value) + t.currentWindow = t.currentWindow - value + if (notifyWaiters): + t.checkWaiters() + +proc reserveNBytesWait*(t: SendBufferTracker, n: uint32): Future[void] = + let fut = newFuture[void]("SendBufferTracker.reserveNBytesWait") + let free = t.currentFreeBytes() + if (n <= free): + t.currentWindow = t.currentWindow + n + fut.complete() + else: + t.waiters.add((n, fut)) + fut + +proc reserveNBytes*(t: SendBufferTracker, n: uint32): bool = + let free = t.currentFreeBytes() + if (n <= free): + t.currentWindow = t.currentWindow + n + return true + else: + return false + +proc currentBytesInFlight*(t: SendBufferTracker): uint32 = t.currentWindow diff --git a/eth/utp/utp_socket.nim b/eth/utp/utp_socket.nim index e3c525a..010cdb0 100644 --- a/eth/utp/utp_socket.nim +++ b/eth/utp/utp_socket.nim @@ -10,6 +10,7 @@ import std/sugar, chronos, chronicles, bearssl, stew/results, + ./send_buffer_tracker, ./growable_buffer, ./packets @@ -21,8 +22,6 @@ type SynSent, SynRecv, Connected, - ConnectedFull, - Reset, Destroy ConnectionDirection = enum @@ -65,6 +64,34 @@ type # state and will be able to transfer data. incomingSocketReceiveTimeout*: Option[Duration] + # Timeout after which the send window will be reset to its minimal value after it dropped + # to zero. i.e when we received a packet from remote peer with `wndSize` set to 0. + remoteWindowResetTimeout*: Duration + + WriteErrorType* = enum + SocketNotWriteable, + FinSent + + WriteError* = object + case kind*: WriteErrorType + of SocketNotWriteable: + currentState*: ConnectionState + of FinSent: + discard + + WriteResult* = Result[int, WriteError] + + WriteRequestType = enum + Data, Close + + WriteRequest = object + case kind: WriteRequestType + of Data: + data: seq[byte] + writer: Future[WriteResult] + of Close: + discard + UtpSocket*[A] = ref object remoteAddress*: A state: ConnectionState @@ -133,6 +160,9 @@ type # we sent out fin packet finSent: bool + # we requested to close the socket by sending fin packet + sendFinRequested: bool + # have our fin been acked finAcked: bool @@ -145,11 +175,13 @@ type # sequence number of remoted fin packet eofPktNr: uint16 - # number payload bytes in-flight (i.e not countig header sizes) - # packets that have not yet been sent do not count, packets - # that are marked as needing to be re-sent (due to a timeout) - # don't count either - currentWindow: uint32 + sendBufferTracker: SendBufferTracker + + writeQueue: AsyncQueue[WriteRequest] + + writeLoop: Future[void] + + zeroWindowTimer: Moment # socket identifier socketKey*: UtpSocketKey[A] @@ -162,19 +194,6 @@ type ConnectionError* = object of CatchableError - WriteErrorType* = enum - SocketNotWriteable, - FinSent - - WriteError* = object - case kind*: WriteErrorType - of SocketNotWriteable: - currentState*: ConnectionState - of FinSent: - discard - - WriteResult* = Result[int, WriteError] - OutgoingConnectionErrorType* = enum SocketAlreadyExists, ConnectionTimedOut, ErrorWhileSendingSyn @@ -224,6 +243,15 @@ const # considered suspicious and ignored allowedAckWindow*: uint16 = 3 + # Timeout after which the send window will be reset to its minimal value after it dropped + # to zero. i.e when we received a packet from remote peer with `wndSize` set to 0. + defaultResetWindowTimeout = seconds(15) + + # If remote peer window drops to zero, then after some time we will reset it + # to this value even if we do not receive any more messages from remote peers. + # Reset period is configured in `SocketConfig` + minimalRemoteWindow: uint32 = 1500 + reorderBufferMaxSize = 1024 proc init*[A](T: type UtpSocketKey, remoteAddress: A, rcvId: uint16): T = @@ -249,13 +277,15 @@ proc init*( initialSynTimeout: Duration = defaultInitialSynTimeout, dataResendsBeforeFailure: uint16 = defaultDataResendsBeforeFailure, optRcvBuffer: uint32 = defaultOptRcvBuffer, - incomingSocketReceiveTimeout: Option[Duration] = some(defaultRcvRetransmitTimeout) + incomingSocketReceiveTimeout: Option[Duration] = some(defaultRcvRetransmitTimeout), + remoteWindowResetTimeout: Duration = defaultResetWindowTimeout ): T = SocketConfig( initialSynTimeout: initialSynTimeout, dataResendsBeforeFailure: dataResendsBeforeFailure, optRcvBuffer: optRcvBuffer, - incomingSocketReceiveTimeout: incomingSocketReceiveTimeout + incomingSocketReceiveTimeout: incomingSocketReceiveTimeout, + remoteWindowResetTimeout: remoteWindowResetTimeout ) proc getRcvWindowSize(socket: UtpSocket): uint32 = @@ -290,10 +320,6 @@ proc sendAck(socket: UtpSocket): Future[void] = # Should be called before sending packet proc setSend(s: UtpSocket, p: var OutgoingPacket): seq[byte] = - - if (p.transmissions == 0 or p.needResend): - s.currentWindow = s.currentWindow + p.payloadLength - inc p.transmissions p.needResend = false p.timeSent = Moment.now() @@ -305,8 +331,12 @@ proc flushPackets(socket: UtpSocket) {.async.} = # sending only packet which were not transmitted yet or need a resend let shouldSendPacket = socket.outBuffer.exists(i, (p: OutgoingPacket) => (p.transmissions == 0 or p.needResend == true)) if (shouldSendPacket): - let toSend = socket.setSend(socket.outBuffer[i]) - await socket.sendData(toSend) + if socket.sendBufferTracker.reserveNBytes(socket.outBuffer[i].payloadLength): + let toSend = socket.setSend(socket.outBuffer[i]) + await socket.sendData(toSend) + else: + # there is no place in send buffer, stop flushing + return inc i proc markAllPacketAsLost(s: UtpSocket) = @@ -314,11 +344,13 @@ proc markAllPacketAsLost(s: UtpSocket) = while i < s.curWindowPackets: let packetSeqNr = s.seqNr - 1 - i - if (s.outBuffer.exists(packetSeqNr, (p: OutgoingPacket) => p. transmissions > 0 and p.needResend == false)): + if (s.outBuffer.exists(packetSeqNr, (p: OutgoingPacket) => p.transmissions > 0 and p.needResend == false)): s.outBuffer[packetSeqNr].needResend = true let packetPayloadLength = s.outBuffer[packetSeqNr].payloadLength - doAssert(s.currentWindow >= packetPayloadLength) - s.currentWindow = s.currentWindow - packetPayloadLength + # lack of waiters notification in case of timeout effectivly means that + # we do not allow any new bytes to enter snd buffer in case of new free space + # due to timeout. + s.sendBufferTracker.decreaseCurrentWindow(packetPayloadLength, notifyWaiters = false) inc i @@ -326,8 +358,7 @@ proc isOpened(socket:UtpSocket): bool = return ( socket.state == SynRecv or socket.state == SynSent or - socket.state == Connected or - socket.state == ConnectedFull + socket.state == Connected ) proc shouldDisconnectFromFailedRemote(socket: UtpSocket): bool = @@ -341,6 +372,11 @@ proc checkTimeouts(socket: UtpSocket) {.async.} = await socket.flushPackets() if socket.isOpened(): + + if (socket.sendBufferTracker.maxRemoteWindow == 0 and currentTime > socket.zeroWindowTimer): + debug "Reset remote window to minimal value" + socket.sendBufferTracker.updateMaxRemote(minimalRemoteWindow) + if (currentTime > socket.rtoTimeout): # TODO add handling of probe time outs. Reference implemenation has mechanism @@ -382,8 +418,10 @@ proc checkTimeouts(socket: UtpSocket) {.async.} = socket.outBuffer.get(oldestPacketSeqNr).isSome(), "oldest packet should always be available when there is data in flight" ) - let dataToSend = socket.setSend(socket.outBuffer[oldestPacketSeqNr]) - await socket.sendData(dataToSend) + let payloadLength = socket.outBuffer[oldestPacketSeqNr].payloadLength + if (socket.sendBufferTracker.reserveNBytes(payloadLength)): + let dataToSend = socket.setSend(socket.outBuffer[oldestPacketSeqNr]) + await socket.sendData(dataToSend) # TODO add sending keep alives when necessary @@ -399,6 +437,94 @@ proc checkTimeoutsLoop(s: UtpSocket) {.async.} = proc startTimeoutLoop(s: UtpSocket) = s.checkTimeoutsLoop = checkTimeoutsLoop(s) +proc getPacketSize*(socket: UtpSocket): int = + # TODO currently returning constant, ultimatly it should be bases on mtu estimates + mtuSize + +proc resetSendTimeout(socket: UtpSocket) = + socket.retransmitTimeout = socket.rto + socket.rtoTimeout = Moment.now() + socket.retransmitTimeout + +proc handleDataWrite(socket: UtpSocket, data: seq[byte], writeFut: Future[WriteResult]): Future[void] {.async.} = + if writeFut.finished(): + # write future was cancelled befere we got chance to process it, short circuit + # processing and move to next loop iteration + return + + let pSize = socket.getPacketSize() + let endIndex = data.high() + var i = 0 + var bytesWritten = 0 + let wndSize = socket.getRcvWindowSize() + + while i <= endIndex: + let lastIndex = i + pSize - 1 + let lastOrEnd = min(lastIndex, endIndex) + let dataSlice = data[i..lastOrEnd] + let payloadLength = uint32(len(dataSlice)) + try: + await socket.sendBufferTracker.reserveNBytesWait(payloadLength) + if socket.curWindowPackets == 0: + socket.resetSendTimeout() + + let dataPacket = dataPacket(socket.seqNr, socket.connectionIdSnd, socket.ackNr, wndSize, dataSlice) + let outgoingPacket = OutgoingPacket.init(encodePacket(dataPacket), 1, false, payloadLength) + socket.registerOutgoingPacket(outgoingPacket) + await socket.sendData(outgoingPacket.packetBytes) + except CancelledError as exc: + # write loop has been cancelled in the middle of processing due to the + # socket closing + # this approach can create partial write in case destroyin socket in the + # the middle of the write + doAssert(socket.state == Destroy) + if (not writeFut.finished()): + let res = Result[int, WriteError].err(WriteError(kind: SocketNotWriteable, currentState: socket.state)) + writeFut.complete(res) + # we need to re-raise exception so the outer loop will be properly cancelled too + raise exc + bytesWritten = bytesWritten + len(dataSlice) + i = lastOrEnd + 1 + + # Before completeing future with success (as all data was sent sucessfuly) + # we need to check if user did not cancel write on his end + if (not writeFut.finished()): + writeFut.complete(Result[int, WriteError].ok(bytesWritten)) + +proc handleClose(socket: UtpSocket): Future[void] {.async.} = + try: + if socket.curWindowPackets == 0: + socket.resetSendTimeout() + + let finEncoded = encodePacket(finPacket(socket.seqNr, socket.connectionIdSnd, socket.ackNr, socket.getRcvWindowSize())) + socket.registerOutgoingPacket(OutgoingPacket.init(finEncoded, 1, true, 0)) + await socket.sendData(finEncoded) + socket.finSent = true + except CancelledError as exc: + raise exc + +proc writeLoop(socket: UtpSocket): Future[void] {.async.} = + ## Loop that processes writes on socket + try: + while true: + let req = await socket.writeQueue.get() + case req.kind + of Data: + await socket.handleDataWrite(req.data, req.writer) + of Close: + await socket.handleClose() + + except CancelledError: + doAssert(socket.state == Destroy) + for req in socket.writeQueue.items: + if (req.kind == Data and not req.writer.finished()): + let res = Result[int, WriteError].err(WriteError(kind: SocketNotWriteable, currentState: socket.state)) + req.writer.complete(res) + socket.writeQueue.clear() + trace "writeLoop canceled" + +proc startWriteLoop(s: UtpSocket) = + s.writeLoop = writeLoop(s) + proc new[A]( T: type UtpSocket[A], to: A, @@ -433,6 +559,11 @@ proc new[A]( buffer: AsyncBuffer.init(int(cfg.optRcvBuffer)), closeEvent: newAsyncEvent(), closeCallbacks: newSeq[Future[void]](), + # start with 1mb assumption, field will be updated with first received packet + sendBufferTracker: SendBufferTracker.new(0, 1024 * 1024), + # queue with infinite size + writeQueue: newAsyncQueue[WriteRequest](), + zeroWindowTimer: Moment.now() + cfg.remoteWindowResetTimeout, socketKey: UtpSocketKey.init(to, rcvId), send: snd ) @@ -502,6 +633,7 @@ proc startOutgoingSocket*(socket: UtpSocket): Future[void] {.async.} = # initiliazation let outgoingPacket = OutgoingPacket.init(encodePacket(packet), 1, false, 0) socket.registerOutgoingPacket(outgoingPacket) + socket.startWriteLoop() socket.startTimeoutLoop() await socket.sendData(outgoingPacket.packetBytes) await socket.connectionFuture @@ -509,10 +641,11 @@ proc startOutgoingSocket*(socket: UtpSocket): Future[void] {.async.} = proc startIncomingSocket*(socket: UtpSocket) {.async.} = # Make sure ack was flushed before moving forward await socket.sendAck() + socket.startWriteLoop() socket.startTimeoutLoop() proc isConnected*(socket: UtpSocket): bool = - socket.state == Connected or socket.state == ConnectedFull + socket.state == Connected proc isClosed*(socket: UtpSocket): bool = socket.state == Destroy and socket.closeEvent.isSet() @@ -521,6 +654,7 @@ proc destroy*(s: UtpSocket) = ## Moves socket to destroy state and clean all reasources. ## Remote is not notified in any way about socket end of life s.state = Destroy + s.writeLoop.cancel() s.checkTimeoutsLoop.cancel() s.closeEvent.fire() @@ -606,8 +740,7 @@ proc ackPacket(socket: UtpSocket, seqNr: uint16): AckResult = # been considered timed-out, and is not included in # the cur_window anymore if (not packet.needResend): - doAssert(socket.currentWindow >= packet.payloadLength) - socket.currentWindow = socket.currentWindow - packet.payloadLength + socket.sendBufferTracker.decreaseCurrentWindow(packet.payloadLength, notifyWaiters = true) socket.retransmitCount = 0 PacketAcked @@ -696,6 +829,15 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = notice "Received packet is totally of the mark" return + # update remote window size + socket.sendBufferTracker.updateMaxRemote(p.header.wndSize) + + if (socket.sendBufferTracker.maxRemoteWindow == 0): + # when zeroWindowTimer will be hit and maxRemoteWindow still will be equal to 0 + # then it will be reset to minimal value + socket.zeroWindowTimer = Moment.now() + socket.socketConfig.remoteWindowResetTimeout + + # socket.curWindowPackets == acks means that this packet acked all remaining packets # including the sent fin packets if (socket.finSent and socket.curWindowPackets == acks): @@ -821,29 +963,24 @@ proc atEof*(socket: UtpSocket): bool = proc readingClosed(socket: UtpSocket): bool = socket.atEof() or socket.state == Destroy -proc getPacketSize(socket: UtpSocket): int = - # TODO currently returning constant, ultimatly it should be bases on mtu estimates - mtuSize - -proc resetSendTimeout(socket: UtpSocket) = - socket.retransmitTimeout = socket.rto - socket.rtoTimeout = Moment.now() + socket.retransmitTimeout - -proc close*(socket: UtpSocket) {.async.} = +proc close*(socket: UtpSocket) = ## Gracefully closes conneciton (send FIN) if socket is in connected state ## does not wait for socket to close if socket.state != Destroy: case socket.state - of Connected, ConnectedFull: + of Connected: socket.readShutdown = true - if (not socket.finSent): - if socket.curWindowPackets == 0: - socket.resetSendTimeout() - - let finEncoded = encodePacket(finPacket(socket.seqNr, socket.connectionIdSnd, socket.ackNr, socket.getRcvWindowSize())) - socket.registerOutgoingPacket(OutgoingPacket.init(finEncoded, 1, true, 0)) - socket.finSent = true - await socket.sendData(finEncoded) + if (not socket.sendFinRequested): + try: + # with this approach, all pending writes will be executed before sending fin packet + # we could also and method which places close request as first one to process + # but it would complicate the write loop + socket.writeQueue.putNoWait(WriteRequest(kind: Close)) + except AsyncQueueFullError as e: + # should not happen as our write queue is unbounded + raiseAssert e.msg + + socket.sendFinRequested = true else: # In any other case like connection is not established so sending fin make # no sense, we can just out right close it @@ -855,46 +992,38 @@ proc closeWait*(socket: UtpSocket) {.async.} = ## Warning: if FIN packet for some reason will be lost, then socket will be closed ## due to retransmission failure which may take some time. ## default is 4 retransmissions with doubling of rto between each retranssmision - await socket.close() + socket.close() await socket.closeEvent.wait() -proc write*(socket: UtpSocket, data: seq[byte]): Future[WriteResult] {.async.} = - +proc write*(socket: UtpSocket, data: seq[byte]): Future[WriteResult] = + let retFuture = newFuture[WriteResult]("UtpSocket.write") + if (socket.state != Connected): - return err(WriteError(kind: SocketNotWriteable, currentState: socket.state)) + let res = Result[int, WriteError].err(WriteError(kind: SocketNotWriteable, currentState: socket.state)) + retFuture.complete(res) + return retFuture # fin should be last packet received by remote side, therefore trying to write # after sending fin is considered error - if socket.finSent: - return err(WriteError(kind: FinSent)) + if socket.sendFinRequested or socket.finSent: + let res = Result[int, WriteError].err(WriteError(kind: FinSent)) + retFuture.complete(res) + return retFuture var bytesWritten = 0 - # TODO - # Handle growing of send window - if len(data) == 0: - return ok(bytesWritten) + let res = Result[int, WriteError].ok(bytesWritten) + retFuture.complete(res) + return retFuture + + try: + socket.writeQueue.putNoWait(WriteRequest(kind: Data, data: data, writer: retFuture)) + except AsyncQueueFullError as e: + # this should not happen as out write queue is unbounded + raiseAssert e.msg - if socket.curWindowPackets == 0: - socket.resetSendTimeout() - - let pSize = socket.getPacketSize() - let endIndex = data.high() - var i = 0 - let wndSize = socket.getRcvWindowSize() - while i <= data.high: - let lastIndex = i + pSize - 1 - let lastOrEnd = min(lastIndex, endIndex) - let dataSlice = data[i..lastOrEnd] - let dataPacket = dataPacket(socket.seqNr, socket.connectionIdSnd, socket.ackNr, wndSize, dataSlice) - let payloadLength = uint32(len(dataSlice)) - socket.registerOutgoingPacket(OutgoingPacket.init(encodePacket(dataPacket), 0, false, payloadLength)) - bytesWritten = bytesWritten + len(dataSlice) - i = lastOrEnd + 1 - await socket.flushPackets() - - return ok(bytesWritten) + return retFuture template readLoop(body: untyped): untyped = while true: @@ -953,7 +1082,7 @@ proc numPacketsInOutGoingBuffer*(socket: UtpSocket): int = num # Check how many payload bytes are still in flight -proc numOfBytesInFlight*(socket: UtpSocket): uint32 = socket.currentWindow +proc numOfBytesInFlight*(socket: UtpSocket): uint32 = socket.sendBufferTracker.currentBytesInFlight() # Check how many packets are still in the reorder buffer, usefull for tests or # debugging. diff --git a/tests/utp/test_utp_socket.nim b/tests/utp/test_utp_socket.nim index 3377a52..75d1295 100644 --- a/tests/utp/test_utp_socket.nim +++ b/tests/utp/test_utp_socket.nim @@ -61,6 +61,7 @@ procSuite "Utp socket unit test": template connectOutGoingSocket( initialRemoteSeq: uint16, q: AsyncQueue[Packet], + remoteReceiveBuffer: uint32 = testBufferSize, cfg: SocketConfig = SocketConfig.init()): (UtpSocket[TransportAddress], Packet) = let sock1 = newOutgoingSocket[TransportAddress](testAddress, initTestSnd(q), cfg, defaultRcvOutgoingId, rng[]) asyncSpawn sock1.startOutgoingSocket() @@ -69,7 +70,7 @@ procSuite "Utp socket unit test": check: initialPacket.header.pType == ST_SYN - let responseAck = ackPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize) + let responseAck = ackPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr, remoteReceiveBuffer) await sock1.processPacket(responseAck) @@ -313,6 +314,80 @@ procSuite "Utp socket unit test": await outgoingSocket.destroyWait() + asyncTest "Blocked writing futures should be properly finished when socket is closed": + let q = newAsyncQueue[Packet]() + let initialRemoteSeq = 10'u16 + + let dataToWrite1 = @[0'u8] + let dataToWrite2 = @[1'u8] + + let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q, 0) + + let writeFut1 = outgoingSocket.write(dataToWrite1) + let writeFut2 = outgoingSocket.write(dataToWrite2) + + # wait a little to show that futures are not progressing + await sleepAsync(seconds(1)) + + check: + not writeFut1.finished() + not writeFut2.finished() + + outgoingSocket.destroy() + + yield writeFut1 + yield writeFut2 + + check: + writeFut1.completed() + writeFut2.completed() + writeFut1.read().isErr() + writeFut2.read().isErr() + + await outgoingSocket.destroyWait() + + asyncTest "Cancelled write futures should not be processed if cancelled before processing": + let q = newAsyncQueue[Packet]() + let initialRemoteSeq = 10'u16 + + let dataToWrite1 = @[0'u8] + let dataToWrite2 = @[1'u8] + let dataToWrite3 = @[2'u8] + + let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q, 0) + + # only writeFut1 will progress as to processing stage, writeFut2 and writeFut3 + # will be blocked in queue + let writeFut1 = outgoingSocket.write(dataToWrite1) + let writeFut2 = outgoingSocket.write(dataToWrite2) + let writeFut3 = outgoingSocket.write(dataToWrite3) + + # user decided to cancel second write + await writeFut2.cancelAndWait() + # remote increased wnd size enough for all writes + let someAckFromRemote = ackPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr, 10) + + await outgoingSocket.processPacket(someAckFromRemote) + + yield writeFut1 + yield writeFut2 + yield writeFut3 + + check: + writeFut1.completed() + writeFut2.cancelled() + writeFut3.completed() + + let p1 = await q.get() + let p2 = await q.get + + check: + # we produce only two data packets as write with dataToWrite2 was cancelled + p1.payload == dataToWrite1 + p2.payload == dataToWrite3 + + await outgoingSocket.destroyWait() + asyncTest "Socket should re-send data packet configurable number of times before declaring failure": let q = newAsyncQueue[Packet]() let initalRemoteSeqNr = 10'u16 @@ -486,7 +561,7 @@ procSuite "Utp socket unit test": let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q) - await outgoingSocket.close() + outgoingSocket.close() let sendFin = await q.get() @@ -501,7 +576,7 @@ procSuite "Utp socket unit test": let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q) - let closeF = outgoingSocket.close() + outgoingSocket.close() let sendFin = await q.get() @@ -512,8 +587,6 @@ procSuite "Utp socket unit test": await outgoingSocket.processPacket(responseAck) - await closeF - check: not outgoingSocket.isConnected() @@ -544,7 +617,7 @@ procSuite "Utp socket unit test": let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q) - await outgoingSocket.close() + outgoingSocket.close() let writeResult = await outgoingSocket.write(@[1'u8]) @@ -564,7 +637,7 @@ procSuite "Utp socket unit test": let initialRcvBufferSize = 10'u32 let data = @[1'u8, 2'u8, 3'u8] let sCfg = SocketConfig.init(optRcvBuffer = initialRcvBufferSize) - let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeqNr, q, sCfg) + let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeqNr, q, testBufferSize, sCfg) let dataP1 = dataPacket(initialRemoteSeqNr, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize, data) @@ -585,7 +658,7 @@ procSuite "Utp socket unit test": sentData.header.pType == ST_DATA sentData.header.wndSize == initialRcvBufferSize - uint32(len(data)) - await outgoingSocket.close() + outgoingSocket.close() let sentFin = await q.get() @@ -601,7 +674,7 @@ procSuite "Utp socket unit test": let initialRcvBufferSize = 10'u32 let data = @[1'u8, 2'u8, 3'u8] let sCfg = SocketConfig.init(optRcvBuffer = initialRcvBufferSize) - let (outgoingSocket, initialPacket) = connectOutGoingSocket(initalRemoteSeqNr, q, sCfg) + let (outgoingSocket, initialPacket) = connectOutGoingSocket(initalRemoteSeqNr, q, testBufferSize, sCfg) let dataP1 = dataPacket(initalRemoteSeqNr, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize, data) @@ -746,3 +819,140 @@ procSuite "Utp socket unit test": int(outgoingSocket.numOfBytesInFlight) == len(dataToWrite) await outgoingSocket.destroyWait() + + asyncTest "Writing data should asynchronously block until there is enough space in snd buffer": + let q = newAsyncQueue[Packet]() + let initialRemoteSeq = 10'u16 + + let dataToWrite = @[1'u8, 2, 3, 4, 5] + + # remote is initialized with buffer to small to handle whole payload + let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q, uint32(len(dataToWrite) - 1)) + + let writeFut = outgoingSocket.write(dataToWrite) + + # wait some time to check future is not finished + await sleepAsync(seconds(2)) + + # write is not finished as future is blocked from progressing due to to small + # send window + check: + not writeFut.finished() + + let someAckFromRemote = ackPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr, uint32(len(dataToWrite))) + + await outgoingSocket.processPacket(someAckFromRemote) + + # after processing packet with increased buffer size write should complete and + # packet should be sent + let sentPacket = await q.get() + + check: + sentPacket.payload == dataToWrite + writeFut.finished() + + await outgoingSocket.destroyWait() + + asyncTest "Writing data should not progress in case of timeouting packets and small snd window": + let q = newAsyncQueue[Packet]() + let initialRemoteSeq = 10'u16 + + let dataToWrite = @[1'u8, 2, 3, 4, 5] + + # remote is initialized with buffer to small to handle whole payload + let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q) + let remoteRcvWindowSize = uint32(outgoingSocket.getPacketSize()) + let someAckFromRemote = ackPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr, remoteRcvWindowSize) + + # we are using ack from remote to setup our snd window size to one packet size on one packet + await outgoingSocket.processPacket(someAckFromRemote) + + let twoPacketData = generateByteArray(rng[], int(2 * remoteRcvWindowSize)) + + let writeFut = outgoingSocket.write(twoPacketData) + + # after this time first packet will be send and will timeout, but the write should not + # finish, as timeouting packets do not notify writing about new space in snd + # buffer + await sleepAsync(seconds(2)) + + check: + not writeFut.finished() + + await outgoingSocket.destroyWait() + + asyncTest "Writing data should respect remote rcv window size": + let q = newAsyncQueue[Packet]() + let initialRemoteSeq = 10'u16 + + let dataToWrite = @[1'u8, 2, 3, 4, 5] + + # remote is initialized with buffer to small to handle whole payload + let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q) + let remoteRcvWindowSize = uint32(outgoingSocket.getPacketSize()) + let someAckFromRemote = ackPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr, remoteRcvWindowSize) + + # we are using ack from remote to setup our snd window size to one packet size on one packet + await outgoingSocket.processPacket(someAckFromRemote) + + let twoPacketData = generateByteArray(rng[], int(2 * remoteRcvWindowSize)) + + let writeFut = outgoingSocket.write(twoPacketData) + + let firstAckFromRemote = ackPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr + 1, remoteRcvWindowSize) + + let packet = await q.get() + + check: + packet.header.pType == ST_DATA + uint32(len(packet.payload)) == remoteRcvWindowSize + not writeFut.finished + + await outgoingSocket.processPacket(firstAckFromRemote) + + let packet1 = await q.get() + let writeResult = await writeFut + + check: + packet1.header.pType == ST_DATA + writeFut.finished + + await outgoingSocket.destroyWait() + + asyncTest "Remote window should be reseted to minimal value after configured amount of time": + let q = newAsyncQueue[Packet]() + let initialRemoteSeq = 10'u16 + let someData = @[1'u8] + let (outgoingSocket, packet) = + connectOutGoingSocket( + initialRemoteSeq, + q, + remoteReceiveBuffer = 0, + cfg = SocketConfig.init( + remoteWindowResetTimeout = seconds(3) + ) + ) + + check: + outgoingSocket.isConnected() + + let writeFut = outgoingSocket.write(someData) + + await sleepAsync(seconds(1)) + + check: + # Even after 1 second write is not finished as we did not receive any message + # so remote rcv window is still zero + not writeFut.finished() + + # Ultimately, after 3 second remote rcv window will be reseted to minimal value + # and write will be able to progress + let writeResult = await writeFut + + let p = await q.get() + + check: + writeResult.isOk() + p.payload == someData + + await outgoingSocket.destroyWait()