diff --git a/eth/utp/send_buffer_tracker.nim b/eth/utp/send_buffer_tracker.nim
deleted file mode 100644
index 5c90b8e..0000000
--- a/eth/utp/send_buffer_tracker.nim
+++ /dev/null
@@ -1,107 +0,0 @@
-# Copyright (c) 2021 Status Research & Development GmbH
-# Licensed and distributed under either of
-#  * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
-#  * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
-# at your option. This file may not be copied, modified, or distributed except according to those terms.
-
-{.push raises: [Defect].}
-
-import
-  chronos
-
-# Internal Utp data structure to track send window and properly block when there is
-# no free space when trying to send more bytes
-type SendBufferTracker* = ref object
-  # number of payload bytes in-flight (i.e not counting header sizes)
-  # packets that have not yet been sent do not count, packets
-  # that are marked as needing to be re-sent (due to a timeout)
-  # don't count either
-  currentWindow*: uint32
-
-  # remote receive window updated based on packed wndSize field
-  maxRemoteWindow*: uint32
-
-  # maximum window size, in bytes, calculated by local congestion controller
-  maxWindow*: uint32
-
-  # configuration option for maxium number of bytes in snd buffer
-  maxSndBufferSize*: uint32
-  waiters: seq[(uint32, Future[void])]
-
-proc new*(
-  T: type SendBufferTracker,
-  currentWindow: uint32,
-  maxRemoteWindow: uint32,
-  maxSndBufferSize: uint32,
-  maxWindow: uint32): T =
-  return (
-    SendBufferTracker(
-      currentWindow: currentWindow,
-      maxRemoteWindow: maxRemoteWindow,
-      maxSndBufferSize: maxSndBufferSize,
-      maxWindow: maxWindow,
-      waiters: @[]
-    )
-  )
-
-proc currentFreeBytes*(t: SendBufferTracker): uint32 =
-  let maxSend = min(min(t.maxRemoteWindow, t.maxSndBufferSize), t.maxWindow)
-  if (maxSend <= t.currentWindow):
-    return 0
-  else:
-    return maxSend - t.currentWindow
-
-proc notifyWaiters*(t: SendBufferTracker) =
-  var i = 0
-  while i < len(t.waiters):
-    let freeSpace = t.currentFreeBytes()
-    let (required, fut) = t.waiters[i]
-    if (required <= freeSpace):
-      # in case future was cancelled
-      if (not fut.finished()):
-        t.currentWindow = t.currentWindow + required
-        fut.complete()
-      t.waiters.del(i)
-    else:
-      # we do not have place for next waiter, just finish processing
-      return
-
-proc updateMaxRemote*(t: SendBufferTracker, newRemoteWindow: uint32) =
-  t.maxRemoteWindow = newRemoteWindow
-  t.notifyWaiters()
-
-proc updateMaxWindow*(t: SendBufferTracker, maxWindow: uint32) =
-  t.maxWindow = maxWindow
-  t.notifyWaiters()
-
-proc updateMaxWindowSize*(t: SendBufferTracker, newRemoteWindow: uint32, maxWindow: uint32) =
-  t.maxRemoteWindow = newRemoteWindow
-  t.maxWindow = maxWindow
-  t.notifyWaiters()
-
-proc decreaseCurrentWindow*(t: SendBufferTracker, value: uint32) =
-  doAssert(t.currentWindow >= value)
-  t.currentWindow = t.currentWindow - value
-
-proc reserveNBytesWait*(t: SendBufferTracker, n: uint32): Future[void] =
-  let fut = newFuture[void]("SendBufferTracker.reserveNBytesWait")
-  let free = t.currentFreeBytes()
-  if (n <= free):
-    t.currentWindow = t.currentWindow + n
-    fut.complete()
-  else:
-    t.waiters.add((n, fut))
-  fut
-
-proc reserveNBytes*(t: SendBufferTracker, n: uint32): bool =
-  let free = t.currentFreeBytes()
-  if (n <= free):
-    t.currentWindow = t.currentWindow + n
-    return true
-  else:
-    return false
-
-proc forceReserveNBytes*(t: SendBufferTracker, n: uint32) =
-  t.currentWindow = t.currentWindow + n
-
-proc currentBytesInFlight*(t: SendBufferTracker): uint32 = t.currentWindow diff --git a/eth/utp/utp_router.nim b/eth/utp/utp_router.nim index 3fa301f..2a5fb64 100644 --- a/eth/utp/utp_router.nim +++ b/eth/utp/utp_router.nim @@ -168,7 +168,7 @@ proc processPacket[A](r: UtpRouter[A], p: Packet, sender: A) {.async.}= # Initial ackNr is set to incoming packer seqNr let incomingSocket = newIncomingSocket[A](sender, r.sendCb, r.socketConfig ,p.header.connectionId, p.header.seqNr, r.rng[]) r.registerUtpSocket(incomingSocket) - await incomingSocket.startIncomingSocket() + incomingSocket.startIncomingSocket() # Based on configuration, socket is passed to upper layer either in SynRecv # or Connected state info "Accepting incoming connection", @@ -235,12 +235,6 @@ proc connect[A](s: UtpSocket[A]): Future[ConnectionResult[A]] {.async.}= to = s.socketKey s.destroy() return err(OutgoingConnectionError(kind: ConnectionTimedOut)) - except CatchableError as e: - info "Outgoing connection failed due to send error", - to = s.socketKey - s.destroy() - # this may only happen if user provided callback will for some reason fail - return err(OutgoingConnectionError(kind: ErrorWhileSendingSyn, error: e)) # Connect to provided address # Reference implementation: https://github.com/bittorrent/libutp/blob/master/utp_internal.cpp#L2732 diff --git a/eth/utp/utp_socket.nim b/eth/utp/utp_socket.nim index ea41d41..fa5bf20 100644 --- a/eth/utp/utp_socket.nim +++ b/eth/utp/utp_socket.nim @@ -7,10 +7,9 @@ {.push raises: [Defect].} import - std/sugar, + std/[sugar, deques], chronos, chronicles, bearssl, stew/[results, bitops2], - ./send_buffer_tracker, ./growable_buffer, ./packets, ./ledbat_congestion_control, @@ -78,6 +77,9 @@ type # to zero. i.e when we received a packet from remote peer with `wndSize` set to 0. 
remoteWindowResetTimeout*: Duration + # Size of reorder buffer calculated as fraction of optRcvBuffer + maxSizeOfReorderBuffer: uint32 + WriteErrorType* = enum SocketNotWriteable, FinSent @@ -102,6 +104,31 @@ type of Close: discard + SocketEventType = enum + NewPacket, CheckTimeouts, CloseReq, WriteReq, ReadReqType + + ReadReq = object + bytesToRead: int + bytesAvailable: seq[uint8] + reader: Future[seq[uint8]] + + ReadResult = enum + ReadCancelled, ReadFinished, ReadNotFinished, SocketAlreadyFinished + + SocketEvent = object + case kind: SocketEventType + of CheckTimeouts: + discard + of NewPacket: + packet: Packet + of CloseReq: + discard + of WriteReq: + data: seq[byte] + writer: Future[WriteResult] + of ReadReqType: + readReq: ReadReq + UtpSocket*[A] = ref object remoteAddress*: A state: ConnectionState @@ -129,9 +156,24 @@ type # out going buffer for all send packets outBuffer: GrowableCircularBuffer[OutgoingPacket] + # current number of bytes in send buffer + outBufferBytes: uint32 + + # current number of bytes in flight + currentWindow: uint32 + + # current max window broadcasted by remote peer + maxRemoteWindow: uint32 + + # current max window calculated by ledbat congestion controller + maxWindow: uint32 + # incoming buffer for out of order packets inBuffer: GrowableCircularBuffer[Packet] + # number of bytes in reorder buffer + inBufferBytes: uint32 + # Number of packets waiting in reorder buffer reorderCount: uint16 @@ -150,7 +192,13 @@ type rtoTimeout: Moment # rcvBuffer - buffer: AsyncBuffer + rcvBuffer: seq[byte] + + # current size of rcv buffer + offset: int + + # readers waiting for data + pendingReads: Deque[ReadReq] # loop called every 500ms to check for on going timeout status checkTimeoutsLoop: Future[void] @@ -185,11 +233,11 @@ type # sequence number of remoted fin packet eofPktNr: uint16 - sendBufferTracker: SendBufferTracker + pendingWrites: Deque[WriteRequest] - writeQueue: AsyncQueue[WriteRequest] + eventQueue: AsyncQueue[SocketEvent] - writeLoop: Future[void] + eventLoop: Future[void] # timer which is started when peer max window drops below current packet size zeroWindowTimer: Option[Moment] @@ -239,12 +287,10 @@ type ConnectionError* = object of CatchableError OutgoingConnectionErrorType* = enum - SocketAlreadyExists, ConnectionTimedOut, ErrorWhileSendingSyn + SocketAlreadyExists, ConnectionTimedOut OutgoingConnectionError* = object case kind*: OutgoingConnectionErrorType - of ErrorWhileSendingSyn: - error*: ref CatchableError of SocketAlreadyExists, ConnectionTimedOut: discard @@ -311,6 +357,16 @@ const # minimal time before subseqent window decays maxWindowDecay = milliseconds(100) + # Maximal size of reorder buffer as fraction of optRcvBuffer size following + # semantics apply bases on rcvBuffer set to 1000 bytes: + # if there are already 1000 bytes in rcv buffer no more bytes will be accepted to reorder buffer + # if there are already 500 bytes in reoreder buffer, no more bytes will be accepted + # to it, and only 500 bytes can be accepted to rcv buffer + # this way there is always a space in rcv buffer to fit new data if the reordering + # happens + maxReorderBufferSize = 0.5 + + proc init*[A](T: type UtpSocketKey, remoteAddress: A, rcvId: uint16): T = UtpSocketKey[A](remoteAddress: remoteAddress, rcvId: rcvId) @@ -338,17 +394,28 @@ proc init*( remoteWindowResetTimeout: Duration = defaultResetWindowTimeout, optSndBuffer: uint32 = defaultOptRcvBuffer ): T = + # TODO make sure optRcvBuffer is nicely divisible by maxReorderBufferSize + let reorderBufferSize = 
uint32(maxReorderBufferSize * float64(optRcvBuffer)) SocketConfig( initialSynTimeout: initialSynTimeout, dataResendsBeforeFailure: dataResendsBeforeFailure, optRcvBuffer: optRcvBuffer, optSndBuffer: optSndBuffer, incomingSocketReceiveTimeout: incomingSocketReceiveTimeout, - remoteWindowResetTimeout: remoteWindowResetTimeout + remoteWindowResetTimeout: remoteWindowResetTimeout, + maxSizeOfReorderBuffer: reorderBufferSize ) +# number of bytes which will fit in current send window +proc freeWindowBytes(socket: UtpSocket): uint32 = + let maxSend = min(socket.maxRemoteWindow, socket.maxWindow) + if (maxSend <= socket.currentWindow): + return 0 + else: + return maxSend - socket.currentWindow + proc getRcvWindowSize(socket: UtpSocket): uint32 = - let currentDataSize = socket.buffer.dataLen() + let currentDataSize = socket.offset if currentDataSize > int(socket.socketConfig.optRcvBuffer): 0'u32 else: @@ -358,42 +425,58 @@ proc registerOutgoingPacket(socket: UtpSocket, oPacket: OutgoingPacket) = ## Adds packet to outgoing buffer and updates all related fields socket.outBuffer.ensureSize(socket.seqNr, socket.curWindowPackets) socket.outBuffer.put(socket.seqNr, oPacket) + socket.outBufferBytes = socket.outBufferBytes + oPacket.payloadLength inc socket.seqNr inc socket.curWindowPackets -proc sendData(socket: UtpSocket, data: seq[byte]): Future[void] = +proc sendData(socket: UtpSocket, data: seq[byte]) = let f = socket.send(socket.remoteAddress, data) f.callback = proc(data: pointer) {.gcsafe.} = if f.failed: warn "UTP send failed", msg = f.readError.msg - return f -# Should be called before sending packet -proc setSend(s: UtpSocket, p: var OutgoingPacket): seq[byte] = - let timestampInfo = getMonoTimestamp() +proc sendPacket(socket: UtpSocket, seqNr: uint16) = + proc setSend(p: var OutgoingPacket): seq[byte] = + let timestampInfo = getMonoTimestamp() - inc p.transmissions - p.needResend = false - p.timeSent = timestampInfo.moment - # all bytearrays in outgoing buffer should be properly encoded utp packets - # so it is safe to directly modify fields - modifyTimeStampAndAckNr(p.packetBytes, timestampInfo.timestamp, s.ackNr) + if p.transmissions == 0 or p.needResend: + socket.currentWindow = socket.currentWindow + p.payloadLength - return p.packetBytes + inc p.transmissions + p.needResend = false + p.timeSent = timestampInfo.moment + # all bytearrays in outgoing buffer should be properly encoded utp packets + # so it is safe to directly modify fields + modifyTimeStampAndAckNr(p.packetBytes, timestampInfo.timestamp, socket.ackNr) -proc flushPackets(socket: UtpSocket) {.async.} = - var i: uint16 = socket.seqNr - socket.curWindowPackets + return p.packetBytes + + socket.sendData(setSend(socket.outBuffer[seqNr])) + +proc resetSendTimeout(socket: UtpSocket) = + socket.retransmitTimeout = socket.rto + socket.rtoTimeout = getMonoTimestamp().moment + socket.retransmitTimeout + +proc flushPackets(socket: UtpSocket) = + let oldestOutgoingPacketSeqNr = socket.seqNr - socket.curWindowPackets + var i: uint16 = oldestOutgoingPacketSeqNr while i != socket.seqNr: # sending only packet which were not transmitted yet or need a resend let shouldSendPacket = socket.outBuffer.exists(i, (p: OutgoingPacket) => (p.transmissions == 0 or p.needResend == true)) if (shouldSendPacket): - if socket.sendBufferTracker.reserveNBytes(socket.outBuffer[i].payloadLength): - debug "Resending packet during flush", + if (socket.freeWindowBytes() > 0): + # this our first send packet reset rto timeout + if i == oldestOutgoingPacketSeqNr and 
socket.curWindowPackets == 1 and socket.outBuffer[i].transmissions == 0: + socket.resetSendTimeout() + + debug "Flushing packet", pkSeqNr = i - let toSend = socket.setSend(socket.outBuffer[i]) - await socket.sendData(toSend) + socket.sendPacket(i) else: - debug "Should resend packet during flush but there is no place in send buffer", + debug "Should resend packet during flush but there is no place in send window", + currentBytesWindow = socket.currentWindow, + maxRemoteWindow = socket.maxRemoteWindow, + maxWindow = socket.maxWindow, pkSeqNr = i # there is no place in send buffer, stop flushing return @@ -409,10 +492,8 @@ proc markAllPacketAsLost(s: UtpSocket) = pkSeqNr = packetSeqNr s.outBuffer[packetSeqNr].needResend = true let packetPayloadLength = s.outBuffer[packetSeqNr].payloadLength - # lack of waiters notification in case of timeout effectivly means that - # we do not allow any new bytes to enter snd buffer in case of new free space - # due to timeout. - s.sendBufferTracker.decreaseCurrentWindow(packetPayloadLength) + doAssert(s.currentWindow >= packetPayloadLength, "Window should always be larger than packet length") + s.currentWindow = s.currentWindow - packetPayloadLength inc i @@ -427,44 +508,29 @@ proc shouldDisconnectFromFailedRemote(socket: UtpSocket): bool = (socket.state == SynSent and socket.retransmitCount >= 2) or (socket.retransmitCount >= socket.socketConfig.dataResendsBeforeFailure) -# Forces asynchronous re-send of packet waiting in outgoing buffer -proc forceResendPacket(socket: UtpSocket, pkSeqNr: uint16) = - doAssert( - socket.outBuffer.get(pkSeqNr).isSome(), - "Force resend should be called only on packet still in outgoing buffer" - ) - if socket.outBuffer[pkSeqNr].needResend: - # if needResend is set to true it means that packet payload was already - # removed from the bytes window and need to be re-added. - socket.sendBufferTracker.forceReserveNBytes(socket.outBuffer[pkSeqNr].payloadLength) - - let data = socket.setSend(socket.outBuffer[pkSeqNr]) - discard socket.sendData(data) - -proc checkTimeouts(socket: UtpSocket) {.async.} = +proc checkTimeouts(socket: UtpSocket) = let currentTime = getMonoTimestamp().moment # flush all packets which needs to be re-send if socket.state != Destroy: - await socket.flushPackets() + socket.flushPackets() if socket.isOpened(): let currentPacketSize = uint32(socket.getPacketSize()) if (socket.zeroWindowTimer.isSome() and currentTime > socket.zeroWindowTimer.unsafeGet()): - if socket.sendBufferTracker.maxRemoteWindow <= currentPacketSize: - socket.sendBufferTracker.updateMaxRemote(minimalRemoteWindow) + if socket.maxRemoteWindow <= currentPacketSize: + socket.maxRemoteWindow = minimalRemoteWindow socket.zeroWindowTimer = none[Moment]() debug "Reset remote window to minimal value", minRemote = minimalRemoteWindow - if (currentTime > socket.rtoTimeout): debug "CheckTimeouts rto timeout", socketKey = socket.socketKey, state = socket.state, - maxWindow = socket.sendBufferTracker.maxWindow, + maxWindow = socket.maxWindow, curWindowPackets = socket.curWindowPackets, - curWindowBytes = socket.sendBufferTracker.currentWindow + curWindowBytes = socket.currentWindow # TODO add handling of probe time outs. Reference implemenation has mechanism # of sending probes to determine mtu size. 
Probe timeouts do not count to standard @@ -494,39 +560,29 @@ proc checkTimeouts(socket: UtpSocket) {.async.} = # on timeout reset duplicate ack counter socket.duplicateAck = 0 - - - if (socket.curWindowPackets == 0 and socket.sendBufferTracker.maxWindow > currentPacketSize): + if (socket.curWindowPackets == 0 and socket.maxWindow > currentPacketSize): # there are no packets in flight even though there is place for more than whole packet # this means connection is just idling. Reset window by 1/3'rd but no more # than to fit at least one packet. - let oldMaxWindow = socket.sendBufferTracker.maxWindow + let oldMaxWindow = socket.maxWindow let newMaxWindow = max((oldMaxWindow * 2) div 3, currentPacketSize) debug "Decaying max window due to socket idling", oldMaxWindow = oldMaxWindow, newMaxWindow = newMaxWindow - - socket.sendBufferTracker.updateMaxWindowSize( - # maxRemote window does not change - socket.sendBufferTracker.maxRemoteWindow, - newMaxWindow - ) - elif (socket.sendBufferTracker.maxWindow < currentPacketSize): + + socket.maxWindow = newMaxWindow + elif (socket.maxWindow < currentPacketSize): # due to high delay window has shrunk below packet size # which means that we cannot send more data # reset it to fit at least one packet debug "Reseting window size do fit a least one packet", - oldWindowSize = socket.sendBufferTracker.maxWindow, + oldWindowSize = socket.maxWindow, newWindowSize = currentPacketSize # delay was so high that window has shrunk below one packet. Reset window # to fit a least one packet and start with slow start - socket.sendBufferTracker.updateMaxWindowSize( - # maxRemote window does not change - socket.sendBufferTracker.maxRemoteWindow, - currentPacketSize - ) + socket.maxWindow = currentPacketSize socket.slowStart = true # This will have much more sense when we will add handling of selective acks @@ -534,10 +590,9 @@ proc checkTimeouts(socket: UtpSocket) {.async.} = # from out buffer. 
markAllPacketAsLost(socket) - # resend oldest packet if there are some packets in flight - if (socket.curWindowPackets > 0): - let oldestPacketSeqNr = socket.seqNr - socket.curWindowPackets - + let oldestPacketSeqNr = socket.seqNr - socket.curWindowPackets + # resend oldest packet if there are some packets in flight, and oldestpacket was already sent + if (socket.curWindowPackets > 0 and socket.outBuffer[oldestPacketSeqNr].transmissions > 0): inc socket.retransmitCount socket.fastTimeout = true @@ -548,8 +603,8 @@ proc checkTimeouts(socket: UtpSocket) {.async.} = # Oldest packet should always be present, so it is safe to call force # resend - socket.forceResendPacket(oldestPacketSeqNr) - + socket.sendPacket(oldestPacketSeqNr) + # TODO add sending keep alives when necessary proc checkTimeoutsLoop(s: UtpSocket) {.async.} = @@ -557,7 +612,7 @@ proc checkTimeoutsLoop(s: UtpSocket) {.async.} = try: while true: await sleepAsync(checkTimeoutsLoopInterval) - await s.checkTimeouts() + await s.eventQueue.put(SocketEvent(kind: CheckTimeouts)) except CancelledError: trace "checkTimeoutsLoop canceled" @@ -568,16 +623,7 @@ proc getPacketSize*(socket: UtpSocket): int = # TODO currently returning constant, ultimatly it should be bases on mtu estimates mtuSize -proc resetSendTimeout(socket: UtpSocket) = - socket.retransmitTimeout = socket.rto - socket.rtoTimeout = getMonoTimestamp().moment + socket.retransmitTimeout - -proc handleDataWrite(socket: UtpSocket, data: seq[byte], writeFut: Future[WriteResult]): Future[void] {.async.} = - if writeFut.finished(): - # write future was cancelled befere we got chance to process it, short circuit - # processing and move to next loop iteration - return - +proc handleDataWrite(socket: UtpSocket, data: seq[byte]): int = let pSize = socket.getPacketSize() let endIndex = data.high() var i = 0 @@ -588,14 +634,9 @@ proc handleDataWrite(socket: UtpSocket, data: seq[byte], writeFut: Future[WriteR let lastOrEnd = min(lastIndex, endIndex) let dataSlice = data[i..lastOrEnd] let payloadLength = uint32(len(dataSlice)) - try: - await socket.sendBufferTracker.reserveNBytesWait(payloadLength) - - if socket.curWindowPackets == 0: - socket.resetSendTimeout() + if (socket.outBufferBytes + payloadLength <= socket.socketConfig.optSndBuffer): let wndSize = socket.getRcvWindowSize() - let dataPacket = dataPacket( socket.seqNr, @@ -605,202 +646,35 @@ proc handleDataWrite(socket: UtpSocket, data: seq[byte], writeFut: Future[WriteR dataSlice, socket.replayMicro ) - let outgoingPacket = OutgoingPacket.init(encodePacket(dataPacket), 1, false, payloadLength) + let outgoingPacket = OutgoingPacket.init(encodePacket(dataPacket), 0, false, payloadLength) socket.registerOutgoingPacket(outgoingPacket) - await socket.sendData(outgoingPacket.packetBytes) - except CancelledError as exc: - # write loop has been cancelled in the middle of processing due to the - # socket closing - # this approach can create partial write in when destroying the socket in the - # the middle of the write - doAssert(socket.state == Destroy) - if (not writeFut.finished()): - let res = Result[int, WriteError].err(WriteError(kind: SocketNotWriteable, currentState: socket.state)) - writeFut.complete(res) - # we need to re-raise exception so the outer loop will be properly cancelled too - raise exc - bytesWritten = bytesWritten + len(dataSlice) + bytesWritten = bytesWritten + len(dataSlice) + socket.flushPackets() + else: + debug "No more place in write buffer", + currentBufferSize = socket.outBufferBytes, + maxBufferSize = 
socket.socketConfig.optSndBuffer, + nexPacketSize = payloadLength + break + i = lastOrEnd + 1 - # Before completing the future with success (as all data was sent successfully) - # we need to check if user did not cancel write on his end - if (not writeFut.finished()): - writeFut.complete(Result[int, WriteError].ok(bytesWritten)) + return bytesWritten -proc handleClose(socket: UtpSocket): Future[void] {.async.} = - try: - if socket.curWindowPackets == 0: - socket.resetSendTimeout() - - let finEncoded = - encodePacket( - finPacket( - socket.seqNr, - socket.connectionIdSnd, - socket.ackNr, - socket.getRcvWindowSize(), - socket.replayMicro - ) +proc handleClose(socket: UtpSocket) = + let finEncoded = + encodePacket( + finPacket( + socket.seqNr, + socket.connectionIdSnd, + socket.ackNr, + socket.getRcvWindowSize(), + socket.replayMicro ) - socket.registerOutgoingPacket(OutgoingPacket.init(finEncoded, 1, true, 0)) - await socket.sendData(finEncoded) - socket.finSent = true - except CancelledError as exc: - raise exc - -proc writeLoop(socket: UtpSocket): Future[void] {.async.} = - ## Loop that processes writes on socket - try: - while true: - let req = await socket.writeQueue.get() - case req.kind - of Data: - await socket.handleDataWrite(req.data, req.writer) - info "Written data to remote", - to = socket.socketKey, - bytesWritten = len(req.data) - of Close: - await socket.handleClose() - info "Sent FIN to remote", - to = socket.socketKey - - except CancelledError: - doAssert(socket.state == Destroy) - for req in socket.writeQueue.items: - if (req.kind == Data and not req.writer.finished()): - let res = Result[int, WriteError].err(WriteError(kind: SocketNotWriteable, currentState: socket.state)) - req.writer.complete(res) - socket.writeQueue.clear() - trace "writeLoop canceled" - -proc startWriteLoop(s: UtpSocket) = - s.writeLoop = writeLoop(s) - -proc new[A]( - T: type UtpSocket[A], - to: A, - snd: SendCallback[A], - state: ConnectionState, - cfg: SocketConfig, - direction: ConnectionDirection, - rcvId: uint16, - sndId: uint16, - initialSeqNr: uint16, - initialAckNr: uint16, - initialTimeout: Duration -): T = - let currentTime = getMonoTimestamp().moment - T( - remoteAddress: to, - state: state, - direction: direction, - socketConfig: cfg, - connectionIdRcv: rcvId, - connectionIdSnd: sndId, - seqNr: initialSeqNr, - ackNr: initialAckNr, - connectionFuture: newFuture[void](), - outBuffer: GrowableCircularBuffer[OutgoingPacket].init(), - inBuffer: GrowableCircularBuffer[Packet].init(), - retransmitTimeout: initialTimeout, - rtoTimeout: currentTime + initialTimeout, - # Initial timeout values taken from reference implemntation - rtt: milliseconds(0), - rttVar: milliseconds(800), - rto: milliseconds(3000), - buffer: AsyncBuffer.init(int(cfg.optRcvBuffer)), - closeEvent: newAsyncEvent(), - closeCallbacks: newSeq[Future[void]](), - # start with 1mb assumption, field will be updated with first received packet - sendBufferTracker: SendBufferTracker.new(0, 1024 * 1024, cfg.optSndBuffer, startMaxWindow), - # queue with infinite size - writeQueue: newAsyncQueue[WriteRequest](), - zeroWindowTimer: none[Moment](), - socketKey: UtpSocketKey.init(to, rcvId), - slowStart: true, - fastTimeout: false, - fastResendSeqNr: initialSeqNr, - lastWindowDecay: currentTime - maxWindowDecay, - slowStartTreshold: cfg.optSndBuffer, - ourHistogram: DelayHistogram.init(currentTime), - remoteHistogram: DelayHistogram.init(currentTime), - driftCalculator: ClockDriftCalculator.init(currentTime), - send: snd - ) - -proc 
newOutgoingSocket*[A]( - to: A, - snd: SendCallback[A], - cfg: SocketConfig, - rcvConnectionId: uint16, - rng: var BrHmacDrbgContext -): UtpSocket[A] = - let sndConnectionId = rcvConnectionId + 1 - let initialSeqNr = randUint16(rng) - - UtpSocket[A].new( - to, - snd, - SynSent, - cfg, - Outgoing, - rcvConnectionId, - sndConnectionId, - initialSeqNr, - # Initialy ack nr is 0, as we do not know remote inital seqnr - 0, - cfg.initialSynTimeout - ) - -proc newIncomingSocket*[A]( - to: A, - snd: SendCallback[A], - cfg: SocketConfig, - connectionId: uint16, - ackNr: uint16, - rng: var BrHmacDrbgContext -): UtpSocket[A] = - let initialSeqNr = randUint16(rng) - - let (initialState, initialTimeout) = - if (cfg.incomingSocketReceiveTimeout.isNone()): - # it does not matter what timeout value we put here, as socket will be in - # connected state without outgoing packets in buffer so any timeout hit will - # just double rto without any penalties - # although we cannont use 0, as then timeout will be constantly re-set to 500ms - # and there will be a lot of not usefull work done - (Connected, defaultInitialSynTimeout) - else: - let timeout = cfg.incomingSocketReceiveTimeout.unsafeGet() - (SynRecv, timeout) - - UtpSocket[A].new( - to, - snd, - initialState, - cfg, - Incoming, - connectionId + 1, - connectionId, - initialSeqNr, - ackNr, - initialTimeout - ) - -proc startOutgoingSocket*(socket: UtpSocket): Future[void] {.async.} = - doAssert(socket.state == SynSent) - let packet = synPacket(socket.seqNr, socket.connectionIdRcv, socket.getRcvWindowSize()) - debug "Sending SYN packet", - seqNr = packet.header.seqNr, - connectionId = packet.header.connectionId - # set number of transmissions to 1 as syn packet will be send just after - # initiliazation - let outgoingPacket = OutgoingPacket.init(encodePacket(packet), 1, false, 0) - socket.registerOutgoingPacket(outgoingPacket) - socket.startWriteLoop() - socket.startTimeoutLoop() - await socket.sendData(outgoingPacket.packetBytes) - await socket.connectionFuture + ) + socket.finSent = true + socket.registerOutgoingPacket(OutgoingPacket.init(finEncoded, 0, false, 0)) + socket.flushPackets() proc isConnected*(socket: UtpSocket): bool = socket.state == Connected @@ -814,7 +688,7 @@ proc destroy*(s: UtpSocket) = ## Moves socket to destroy state and clean all reasources. 
## Remote is not notified in any way about socket end of life s.state = Destroy - s.writeLoop.cancel() + s.eventLoop.cancel() s.checkTimeoutsLoop.cancel() s.closeEvent.fire() @@ -897,10 +771,11 @@ proc ackPacket(socket: UtpSocket, seqNr: uint16, currentTime: Moment): AckResult # been considered timed-out, and is not included in # the cur_window anymore if (not packet.needResend): - socket.sendBufferTracker.decreaseCurrentWindow(packet.payloadLength) + doAssert(socket.currentWindow >= packet.payloadLength, "Window should always be larger than packet length") + socket.currentWindow = socket.currentWindow - packet.payloadLength - # we notify all waiters that there is possibly new space in send buffer - socket.sendBufferTracker.notifyWaiters() + # we removed packet from our out going buffer + socket.outBufferBytes = socket.outBufferBytes - packet.payloadLength socket.retransmitCount = 0 PacketAcked @@ -1003,13 +878,13 @@ proc calculateSelectiveAckBytes*(socket: UtpSocket, receivedPackedAckNr: uint16 proc tryDecayWindow(socket: UtpSocket, now: Moment) = if (now - socket.lastWindowDecay >= maxWindowDecay): socket.lastWindowDecay = now - let newMaxWindow = max(uint32(0.5 * float64(socket.sendBufferTracker.maxWindow)), uint32(minWindowSize)) + let newMaxWindow = max(uint32(0.5 * float64(socket.maxWindow)), uint32(minWindowSize)) debug "Decaying maxWindow", - oldWindow = socket.sendBufferTracker.maxWindow, + oldWindow = socket.maxWindow, newWindow = newMaxWindow - socket.sendBufferTracker.updateMaxWindow(newMaxWindow) + socket.maxWindow = newMaxWindow socket.slowStart = false socket.slowStartTreshold = newMaxWindow @@ -1094,7 +969,7 @@ proc selectiveAckPackets(socket: UtpSocket, receivedPackedAckNr: uint16, ext: S registerLoss = true # it is safe to call as we already checked that packet is in send buffer - socket.forceResendPacket(seqNrToResend) + socket.sendPacket(seqNrToResend) socket.fastResendSeqNr = seqNrToResend + 1 debug "Resent packet", @@ -1151,7 +1026,7 @@ proc generateAckPacket*(socket: UtpSocket): Packet = bitmask ) -proc sendAck(socket: UtpSocket): Future[void] = +proc sendAck(socket: UtpSocket) = ## Creates and sends ack, based on current socket state. Acks are different from ## other packets as we do not track them in outgoing buffet @@ -1164,18 +1039,9 @@ proc sendAck(socket: UtpSocket): Future[void] = socket.sendData(encodePacket(ackPacket)) -proc startIncomingSocket*(socket: UtpSocket) {.async.} = - # Make sure ack was flushed before moving forward - await socket.sendAck() - socket.startWriteLoop() - socket.startTimeoutLoop() - # TODO at socket level we should handle only FIN/DATA/ACK packets. 
Refactor to make # it enforcable by type system -# TODO re-think synchronization of this procedure, as each await inside gives control -# to scheduler which means there could be potentialy several processPacket procs -# running -proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = +proc processPacketInternal(socket: UtpSocket, p: Packet) = debug "Process packet", socketKey = socket.socketKey, @@ -1267,7 +1133,7 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = let isPossibleDuplicatedOldPacket = pastExpected >= (int(uint16.high) + 1) - reorderBufferMaxSize if (isPossibleDuplicatedOldPacket and p.header.pType != ST_STATE): - discard socket.sendAck() + socket.sendAck() debug "Got an invalid packet sequence number, too far off", pastExpected = pastExpected @@ -1326,7 +1192,7 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = let currentPacketSize = uint32(socket.getPacketSize()) let (newMaxWindow, newSlowStartTreshold, newSlowStart) = applyCongestionControl( - socket.sendBufferTracker.maxWindow, + socket.maxWindow, socket.slowStart, socket.slowStartTreshold, socket.socketConfig.optSndBuffer, @@ -1339,7 +1205,8 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = ) # update remote window size and max window - socket.sendBufferTracker.updateMaxWindowSize(p.header.wndSize, newMaxWindow) + socket.maxWindow = newMaxWindow + socket.maxRemoteWindow = p.header.wndSize socket.slowStart = newSlowStart socket.slowStartTreshold = newSlowStartTreshold @@ -1349,7 +1216,7 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = slowStartTreshold = newSlowStartTreshold, slowstart = newSlowStart - if (socket.zeroWindowTimer.isNone() and socket.sendBufferTracker.maxRemoteWindow <= currentPacketSize): + if (socket.zeroWindowTimer.isNone() and socket.maxRemoteWindow <= currentPacketSize): # when zeroWindowTimer will be hit and maxRemoteWindow still will be equal to 0 # then it will be reset to minimal value socket.zeroWindowTimer = some(timestampInfo.moment + socket.socketConfig.remoteWindowResetTimeout) @@ -1407,7 +1274,7 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = # Is is safe to call force resend as we already checked shouldReSendPacket # condition - socket.forceResendPacket(oldestOutstandingPktSeqNr) + socket.sendPacket(oldestOutstandingPktSeqNr) if (p.eack.isSome()): socket.selectiveAckPackets(pkAckNr, p.eack.unsafeGet(), timestampInfo.moment) @@ -1432,11 +1299,28 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = # we got in order packet if (pastExpected == 0 and (not socket.reachedFin)): debug "Received in order packet" - if (len(p.payload) > 0 and (not socket.readShutdown)): + let payloadLength = len(p.payload) + if (payloadLength > 0 and (not socket.readShutdown)): + # we need to sum both rcv buffer and reorder buffer + if (uint32(socket.offset) + socket.inBufferBytes + uint32(payloadLength) > socket.socketConfig.optRcvBuffer): + # even though packet is in order and passes all the checks, it would + # overflow our receive buffer, it means that we are receiving data + # faster than we are reading it. 
Do not ack this packet, and drop received + # data + debug "Recevied packet would overflow receive buffer dropping it", + pkSeqNr = p.header.seqNr, + bytesReceived = payloadLength, + rcvbufferSize = socket.offset, + reorderBufferSize = socket.inBufferBytes + return + debug "Received data packet", - bytesReceived = len(p.payload) + bytesReceived = payloadLength # we are getting in order data packet, we can flush data directly to the incoming buffer - await upload(addr socket.buffer, unsafeAddr p.payload[0], p.payload.len()) + # await upload(addr socket.buffer, unsafeAddr p.payload[0], p.payload.len()) + moveMem(addr socket.rcvBuffer[socket.offset], unsafeAddr p.payload[0], payloadLength) + socket.offset = socket.offset + payloadLength + # Bytes have been passed to upper layer, we can increase number of last # acked packet inc socket.ackNr @@ -1462,9 +1346,6 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = # ignore following packets socket.reorderCount = 0 - # notify all readers we have reached eof - socket.buffer.forget() - if socket.reorderCount == 0: break @@ -1476,21 +1357,30 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = break let packet = maybePacket.unsafeGet() + let reorderPacketPayloadLength = len(packet.payload) - if (len(packet.payload) > 0 and (not socket.readShutdown)): + if (reorderPacketPayloadLength > 0 and (not socket.readShutdown)): debug "Got packet from reorder buffer", packetBytes = len(packet.payload), packetSeqNr = packet.header.seqNr, packetAckNr = packet.header.ackNr, socketSeqNr = socket.seqNr, - socekrAckNr = socket.ackNr + socektAckNr = socket.ackNr, + rcvbufferSize = socket.offset, + reorderBufferSize = socket.inBufferBytes + + # Rcv buffer and reorder buffer are sized that it is always possible to + # move data from reorder buffer to rcv buffer without overflow + moveMem(addr socket.rcvBuffer[socket.offset], unsafeAddr packet.payload[0], reorderPacketPayloadLength) + socket.offset = socket.offset + reorderPacketPayloadLength - await upload(addr socket.buffer, unsafeAddr packet.payload[0], packet.payload.len()) + debug "Deleting packet", + seqNr = nextPacketNum socket.inBuffer.delete(nextPacketNum) - inc socket.ackNr dec socket.reorderCount + socket.inBufferBytes = socket.inBufferBytes - uint32(reorderPacketPayloadLength) debug "Socket state after processing in order packet", socketKey = socket.socketKey, @@ -1502,7 +1392,7 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = # need improvement, as with this approach there is no direct control over # how many concurrent tasks there are and how to cancel them when socket # is closed - discard socket.sendAck() + socket.sendAck() # we got packet out of order else: @@ -1523,13 +1413,27 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = debug "Packet with seqNr already received", seqNr = pkSeqNr else: - socket.inBuffer.put(pkSeqNr, p) - inc socket.reorderCount - debug "added out of order packet to reorder buffer", - reorderCount = socket.reorderCount - # we send ack packet, as we reoreder count is > 0, so the eack bitmask will be - # generated - discard socket.sendAck() + let payloadLength = uint32(len(p.payload)) + if (socket.inBufferBytes + payloadLength <= socket.socketConfig.maxSizeOfReorderBuffer and + socket.inBufferBytes + uint32(socket.offset) + payloadLength <= socket.socketConfig.optRcvBuffer): + + debug "store packet in reorder buffer", + packetBytes = payloadLength, + packetSeqNr = p.header.seqNr, + packetAckNr = p.header.ackNr, + socketSeqNr = 
socket.seqNr, + socektAckNr = socket.ackNr, + rcvbufferSize = socket.offset, + reorderBufferSize = socket.inBufferBytes + + socket.inBuffer.put(pkSeqNr, p) + inc socket.reorderCount + socket.inBufferBytes = socket.inBufferBytes + payloadLength + debug "added out of order packet to reorder buffer", + reorderCount = socket.reorderCount + # we send ack packet, as we reoreder count is > 0, so the eack bitmask will be + # generated + socket.sendAck() of ST_STATE: if (socket.state == SynSent and (not socket.connectionFuture.finished())): @@ -1547,10 +1451,158 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = of ST_SYN: debug "Received ST_SYN on known socket, ignoring" +proc processPacket*(socket: UtpSocket, p: Packet): Future[void] = + socket.eventQueue.put(SocketEvent(kind: NewPacket, packet: p)) + +template shiftBuffer(t, c: untyped) = + if (t).offset > c: + if c > 0: + moveMem(addr((t).rcvBuffer[0]), addr((t).rcvBuffer[(c)]), (t).offset - (c)) + (t).offset = (t).offset - (c) + else: + (t).offset = 0 + +proc onRead(socket: UtpSocket, readReq: var ReadReq): ReadResult = + if readReq.reader.finished(): + return ReadCancelled + + if socket.atEof(): + # buffer is already empty and we reached remote fin, just finish read with whatever + # was already read + readReq.reader.complete(readReq.bytesAvailable) + return SocketAlreadyFinished + + if readReq.bytesToRead == 0: + # treat is as read till eof + readReq.bytesAvailable.add(socket.rcvBuffer.toOpenArray(0, socket.offset - 1)) + socket.shiftBuffer(socket.offset) + if (socket.atEof()): + readReq.reader.complete(readReq.bytesAvailable) + return ReadFinished + else: + return ReadNotFinished + else: + let bytesAlreadyRead = len(readReq.bytesAvailable) + let bytesLeftToRead = readReq.bytesToRead - bytesAlreadyRead + let count = min(socket.offset, bytesLeftToRead) + readReq.bytesAvailable.add(socket.rcvBuffer.toOpenArray(0, count - 1)) + socket.shiftBuffer(count) + if (len(readReq.bytesAvailable) == readReq.bytesToRead): + readReq.reader.complete(readReq.bytesAvailable) + return ReadFinished + else: + return ReadNotFinished + +proc eventLoop(socket: UtpSocket) {.async.} = + try: + while true: + let ev = await socket.eventQueue.get() + case ev.kind + of NewPacket: + socket.processPacketInternal(ev.packet) + + # we processed a packet and rcv buffer size is larger than 0, + # check if we can finish some pending readers + while socket.pendingReads.len() > 0 and socket.offset > 0: + let readResult = socket.onRead(socket.pendingReads[0]) + case readResult + of ReadFinished: + discard socket.pendingReads.popFirst() + of ReadNotFinished: + # there was not enough bytes in buffer to finish this read request, + # stop processing fruther reeads + break + else: + # read was cancelled or socket is already finished move on to next read + # request + discard socket.pendingReads.popFirst() + + # we processed packet, so there could more place in the send buffer + while socket.pendingWrites.len() > 0: + let wr = socket.pendingWrites.popFirst() + case wr.kind + of Close: + socket.handleClose() + # close should be last packet send + break + of Data: + # check if writing was not cancelled in the mean time. 
This approach + # can create partial writes as part of the data could be written with + # with WriteReq + if (not wr.writer.finished()): + let bytesWritten = socket.handleDataWrite(wr.data) + if (bytesWritten == len(wr.data)): + # all bytes were written we can finish external future + wr.writer.complete(Result[int, WriteError].ok(bytesWritten)) + else: + let bytesLeft = wr.data[bytesWritten..ev.data.high] + # bytes partially written to buffer, schedule rest of data for later + socket.pendingWrites.addFirst(WriteRequest(kind: Data, data: bytesLeft, writer: ev.writer)) + # there is no more place in the buffer break from the loop + break + of CheckTimeouts: + discard + of CloseReq: + if (socket.pendingWrites.len() > 0): + # there are still some unfinished writes, waiting to be finished + socket.pendingWrites.addLast(WriteRequest(kind: Close)) + else: + socket.handleClose() + of WriteReq: + # check if the writer was not cancelled in mean time + if (not ev.writer.finished()): + if (socket.pendingWrites.len() > 0): + # there are still some unfinished writes, waiting to be finished schdule this batch for later + socket.pendingWrites.addLast(WriteRequest(kind: Data, data: ev.data, writer: ev.writer)) + else: + let bytesWritten = socket.handleDataWrite(ev.data) + if (bytesWritten == len(ev.data)): + # all bytes were written we can finish external future + ev.writer.complete(Result[int, WriteError].ok(bytesWritten)) + else: + let bytesLeft = ev.data[bytesWritten..ev.data.high] + # bytes partially written to buffer, schedule rest of data for later + socket.pendingWrites.addLast(WriteRequest(kind: Data, data: bytesLeft, writer: ev.writer)) + of ReadReqType: + # check if the writer was not cancelled in mean time + if (not ev.readReq.reader.finished()): + if (socket.pendingReads.len() > 0): + # there is already pending unfininshed read request, schedule this one for + # later + socket.pendingReads.addLast(ev.readReq) + else: + var readReq = ev.readReq + let readResult = socket.onRead(readReq) + case readResult + of ReadNotFinished: + socket.pendingReads.addLast(readReq) + else: + # in any other case we do not need to do any thing + discard + + socket.checkTimeouts() + except CancelledError: + for w in socket.pendingWrites.items(): + if w.kind == Data and (not w.writer.finished()): + let res = Result[int, WriteError].err(WriteError(kind: SocketNotWriteable, currentState: socket.state)) + w.writer.complete(res) + for r in socket.pendingReads.items(): + # complete every reader with already read bytes + # TODO: it maybe better to refine read api to returl Future[Result[seq[byte], E]] + # and return erros for not finished reads + if (not r.reader.finished()): + r.reader.complete(r.bytesAvailable) + socket.pendingWrites.clear() + socket.pendingReads.clear() + trace "main socket event loop cancelled" + +proc startEventLoop(s: UtpSocket) = + s.eventLoop = eventLoop(s) + proc atEof*(socket: UtpSocket): bool = # socket is considered at eof when remote side sent us fin packet # and we have processed all packets up to fin - socket.buffer.dataLen() == 0 and socket.reachedFin + socket.offset == 0 and socket.reachedFin proc readingClosed(socket: UtpSocket): bool = socket.atEof() or socket.state == Destroy @@ -1569,7 +1621,7 @@ proc close*(socket: UtpSocket) = # with this approach, all pending writes will be executed before sending fin packet # we could also and method which places close request as first one to process # but it would complicate the write loop - socket.writeQueue.putNoWait(WriteRequest(kind: Close)) + 
socket.eventQueue.putNoWait(SocketEvent(kind: CloseReq)) except AsyncQueueFullError as e: # should not happen as our write queue is unbounded raiseAssert e.msg @@ -1616,65 +1668,62 @@ proc write*(socket: UtpSocket, data: seq[byte]): Future[WriteResult] = return retFuture try: - socket.writeQueue.putNoWait(WriteRequest(kind: Data, data: data, writer: retFuture)) + socket.eventQueue.putNoWait(SocketEvent(kind: WriteReq, data: data, writer: retFuture)) except AsyncQueueFullError as e: # this should not happen as out write queue is unbounded raiseAssert e.msg return retFuture -template readLoop(body: untyped): untyped = - while true: - let (consumed, done) = body - socket.buffer.shift(consumed) - if done: - break - else: - if not(socket.readingClosed()): - await socket.buffer.wait() - -proc read*(socket: UtpSocket, n: Natural): Future[seq[byte]] {.async.}= - ## Read all bytes `n` bytes from socket ``socket``. - ## - ## This procedure allocates buffer seq[byte] and return it as result. - var bytes = newSeq[byte]() - - if n == 0: - return bytes - - readLoop(): - if socket.readingClosed(): - (0, true) - else: - let count = min(socket.buffer.dataLen(), n - len(bytes)) - bytes.add(socket.buffer.buffer.toOpenArray(0, count - 1)) - (count, len(bytes) == n) - - debug "Read data ", - remote = socket.socketKey, - length = n - - return bytes - -proc read*(socket: UtpSocket): Future[seq[byte]] {.async.}= +proc read*(socket: UtpSocket, n: Natural): Future[seq[byte]] = ## Read all bytes from socket ``socket``. ## ## This procedure allocates buffer seq[byte] and return it as result. - var bytes = newSeq[byte]() + let fut = newFuture[seq[uint8]]() - readLoop(): - if socket.readingClosed(): - (0, true) - else: - let count = socket.buffer.dataLen() - bytes.add(socket.buffer.buffer.toOpenArray(0, count - 1)) - (count, false) + if socket.readingClosed(): + fut.complete(newSeq[uint8]()) + return fut - debug "Read data ", - remote = socket.socketKey, - length = len(bytes) + try: + socket.eventQueue.putNoWait( + SocketEvent( + kind:ReadReqType, + readReq: ReadReq( + bytesToRead: n, + bytesAvailable: newSeq[uint8](), + reader: fut)) + ) + except AsyncQueueFullError as e: + # should not happen as our write queue is unbounded + raiseAssert e.msg - return bytes + return fut + +proc read*(socket: UtpSocket): Future[seq[byte]] = + ## Read all bytes from socket ``socket``. + ## + ## This procedure allocates buffer seq[byte] and return it as result. + let fut = newFuture[seq[uint8]]() + + if socket.readingClosed(): + fut.complete(newSeq[uint8]()) + return fut + + try: + socket.eventQueue.putNoWait( + SocketEvent( + kind:ReadReqType, + readReq: ReadReq( + bytesToRead: 0, + bytesAvailable: newSeq[uint8](), + reader: fut)) + ) + except AsyncQueueFullError as e: + # should not happen as our write queue is unbounded + raiseAssert e.msg + + return fut # Check how many packets are still in the out going buffer, usefull for tests or # debugging. 
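The reworked read and write procs above no longer mutate socket state directly: each call only enqueues a SocketEvent and returns a future that the event loop completes once the request can be satisfied. A minimal caller-side sketch of that flow; the helper name echoOnce, the 256-byte read size, and the import paths are illustrative assumptions, not part of this change:

import chronos, chronicles, stew/results
import eth/utp/utp_socket

proc echoOnce*(socket: UtpSocket[TransportAddress]): Future[void] {.async.} =
  # read(n) enqueues a ReadReqType event; the event loop completes the
  # returned future once n bytes have been shifted out of rcvBuffer
  let payload = await socket.read(256)
  # write enqueues a WriteReq event; when optSndBuffer is full the request
  # is parked in pendingWrites and finished after later acks free space
  let res = await socket.write(payload)
  if res.isErr():
    debug "write rejected", error = res.error().kind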
@@ -1686,10 +1735,10 @@ proc numPacketsInOutGoingBuffer*(socket: UtpSocket): int = num # Check how many payload bytes are still in flight -proc numOfBytesInFlight*(socket: UtpSocket): uint32 = socket.sendBufferTracker.currentBytesInFlight() +proc numOfBytesInFlight*(socket: UtpSocket): uint32 = socket.currentWindow # Check how many bytes are in incoming buffer -proc numOfBytesInIncomingBuffer*(socket: UtpSocket): uint32 = uint32(socket.buffer.dataLen()) +proc numOfBytesInIncomingBuffer*(socket: UtpSocket): uint32 = uint32(socket.offset) # Check how many packets are still in the reorder buffer, usefull for tests or # debugging. @@ -1702,6 +1751,8 @@ proc numPacketsInReordedBuffer*(socket: UtpSocket): int = doAssert(num == int(socket.reorderCount)) num +proc numOfEventsInEventQueue*(socket: UtpSocket): int = len(socket.eventQueue) + proc connectionId*[A](socket: UtpSocket[A]): uint16 = ## Connection id is id which is used in first SYN packet which establishes the connection ## so for Outgoing side it is actually its rcv_id, and for Incoming side it is @@ -1714,4 +1765,140 @@ proc connectionId*[A](socket: UtpSocket[A]): uint16 = # Check what is current available window size for this socket proc currentMaxWindowSize*[A](socket: UtpSocket[A]): uint32 = - socket.sendBufferTracker.maxWindow + socket.maxWindow + +proc new[A]( + T: type UtpSocket[A], + to: A, + snd: SendCallback[A], + state: ConnectionState, + cfg: SocketConfig, + direction: ConnectionDirection, + rcvId: uint16, + sndId: uint16, + initialSeqNr: uint16, + initialAckNr: uint16, + initialTimeout: Duration +): T = + let currentTime = getMonoTimestamp().moment + T( + remoteAddress: to, + state: state, + direction: direction, + socketConfig: cfg, + connectionIdRcv: rcvId, + connectionIdSnd: sndId, + seqNr: initialSeqNr, + ackNr: initialAckNr, + connectionFuture: newFuture[void](), + outBuffer: GrowableCircularBuffer[OutgoingPacket].init(), + outBufferBytes: 0, + currentWindow: 0, + # start with 1mb assumption, field will be updated with first received packet + maxRemoteWindow: 1024 * 1024, + maxWindow: startMaxWindow, + inBuffer: GrowableCircularBuffer[Packet].init(), + retransmitTimeout: initialTimeout, + rtoTimeout: currentTime + initialTimeout, + # Initial timeout values taken from reference implemntation + rtt: milliseconds(0), + rttVar: milliseconds(800), + rto: milliseconds(3000), + rcvBuffer: newSeq[uint8](int(cfg.optRcvBuffer)), + pendingReads: initDeque[ReadReq](), + closeEvent: newAsyncEvent(), + closeCallbacks: newSeq[Future[void]](), + pendingWrites: initDeque[WriteRequest](), + eventQueue: newAsyncQueue[SocketEvent](), + zeroWindowTimer: none[Moment](), + socketKey: UtpSocketKey.init(to, rcvId), + slowStart: true, + fastTimeout: false, + fastResendSeqNr: initialSeqNr, + lastWindowDecay: currentTime - maxWindowDecay, + slowStartTreshold: cfg.optSndBuffer, + ourHistogram: DelayHistogram.init(currentTime), + remoteHistogram: DelayHistogram.init(currentTime), + driftCalculator: ClockDriftCalculator.init(currentTime), + send: snd + ) + +proc newOutgoingSocket*[A]( + to: A, + snd: SendCallback[A], + cfg: SocketConfig, + rcvConnectionId: uint16, + rng: var BrHmacDrbgContext +): UtpSocket[A] = + let sndConnectionId = rcvConnectionId + 1 + let initialSeqNr = randUint16(rng) + + UtpSocket[A].new( + to, + snd, + SynSent, + cfg, + Outgoing, + rcvConnectionId, + sndConnectionId, + initialSeqNr, + # Initialy ack nr is 0, as we do not know remote inital seqnr + 0, + cfg.initialSynTimeout + ) + +proc newIncomingSocket*[A]( + to: A, + snd: 
SendCallback[A], + cfg: SocketConfig, + connectionId: uint16, + ackNr: uint16, + rng: var BrHmacDrbgContext +): UtpSocket[A] = + let initialSeqNr = randUint16(rng) + + let (initialState, initialTimeout) = + if (cfg.incomingSocketReceiveTimeout.isNone()): + # it does not matter what timeout value we put here, as socket will be in + # connected state without outgoing packets in buffer so any timeout hit will + # just double rto without any penalties + # although we cannont use 0, as then timeout will be constantly re-set to 500ms + # and there will be a lot of not usefull work done + (Connected, defaultInitialSynTimeout) + else: + let timeout = cfg.incomingSocketReceiveTimeout.unsafeGet() + (SynRecv, timeout) + + UtpSocket[A].new( + to, + snd, + initialState, + cfg, + Incoming, + connectionId + 1, + connectionId, + initialSeqNr, + ackNr, + initialTimeout + ) + +proc startIncomingSocket*(socket: UtpSocket) = + # Make sure ack was flushed before moving forward + socket.sendAck() + socket.startEventLoop() + socket.startTimeoutLoop() + +proc startOutgoingSocket*(socket: UtpSocket): Future[void] = + doAssert(socket.state == SynSent) + let packet = synPacket(socket.seqNr, socket.connectionIdRcv, socket.getRcvWindowSize()) + debug "Sending SYN packet", + seqNr = packet.header.seqNr, + connectionId = packet.header.connectionId + # set number of transmissions to 1 as syn packet will be send just after + # initiliazation + let outgoingPacket = OutgoingPacket.init(encodePacket(packet), 1, false, 0) + socket.registerOutgoingPacket(outgoingPacket) + socket.startEventLoop() + socket.startTimeoutLoop() + socket.sendData(outgoingPacket.packetBytes) + return socket.connectionFuture diff --git a/tests/utp/test_protocol_integration.nim b/tests/utp/test_protocol_integration.nim index 43e712f..d3121a9 100644 --- a/tests/utp/test_protocol_integration.nim +++ b/tests/utp/test_protocol_integration.nim @@ -123,11 +123,10 @@ procSuite "Utp protocol over udp tests with loss and delays": let testCases = @[ TestCase.init(45, 10, 40000), - TestCase.init(45, 15, 40000), - TestCase.init(50, 20, 20000), + TestCase.init(25, 15, 40000), # super small recv buffer which will be constantly on the brink of being full - TestCase.init(15, 5, 80000, SocketConfig.init(optRcvBuffer = uint32(2000), remoteWindowResetTimeout = seconds(5))), - TestCase.init(15, 10, 80000, SocketConfig.init(optRcvBuffer = uint32(2000), remoteWindowResetTimeout = seconds(5))) + TestCase.init(15, 5, 40000, SocketConfig.init(optRcvBuffer = uint32(6000), remoteWindowResetTimeout = seconds(5))), + TestCase.init(15, 10, 40000, SocketConfig.init(optRcvBuffer = uint32(6000), remoteWindowResetTimeout = seconds(5))) ] asyncTest "Write and Read large data in different network conditions": @@ -173,9 +172,9 @@ procSuite "Utp protocol over udp tests with loss and delays": let testCases1 = @[ # small buffers so it will fill up between reads - TestCase.init(15, 5, 60000, SocketConfig.init(optRcvBuffer = uint32(2000), remoteWindowResetTimeout = seconds(5)), 10000), - TestCase.init(15, 10, 60000, SocketConfig.init(optRcvBuffer = uint32(2000), remoteWindowResetTimeout = seconds(5)), 10000), - TestCase.init(15, 15, 60000, SocketConfig.init(optRcvBuffer = uint32(2000), remoteWindowResetTimeout = seconds(5)), 10000) + TestCase.init(15, 5, 40000, SocketConfig.init(optRcvBuffer = uint32(6000), remoteWindowResetTimeout = seconds(5)), 10000), + TestCase.init(15, 10, 40000, SocketConfig.init(optRcvBuffer = uint32(6000), remoteWindowResetTimeout = seconds(5)), 10000), + 
TestCase.init(15, 15, 40000, SocketConfig.init(optRcvBuffer = uint32(6000), remoteWindowResetTimeout = seconds(5)), 10000) ] proc readWithMultipleReads(s: UtpSocket[TransportAddress], numOfReads: int, bytesPerRead: int): Future[seq[byte]] {.async.}= diff --git a/tests/utp/test_utils.nim b/tests/utp/test_utils.nim index 30209ce..556b558 100644 --- a/tests/utp/test_utils.nim +++ b/tests/utp/test_utils.nim @@ -44,7 +44,7 @@ template connectOutGoingSocket*( ) await sock1.processPacket(responseAck) - + await waitUntil(proc (): bool = sock1.isConnected()) check: sock1.isConnected() @@ -72,12 +72,14 @@ template connectOutGoingSocketWithIncoming*( rng[] ) - await incomingSocket.startIncomingSocket() + incomingSocket.startIncomingSocket() let responseAck = await incomingQueue.get() await outgoingSocket.processPacket(responseAck) - + + await waitUntil(proc (): bool = outgoingSocket.isConnected()) + check: outgoingSocket.isConnected() diff --git a/tests/utp/test_utp_router.nim b/tests/utp/test_utp_router.nim index 11b8cc2..75b6816 100644 --- a/tests/utp/test_utp_router.nim +++ b/tests/utp/test_utp_router.nim @@ -188,6 +188,8 @@ procSuite "Utp router unit tests": await router.processIncomingBytes(encodedData, testSender) + await waitUntil(proc (): bool = socket.numOfEventsInEventQueue() == 0) + check: socket.isConnected() @@ -350,8 +352,8 @@ procSuite "Utp router unit tests": check: connectResult.isErr() - connectResult.error().kind == ErrorWhileSendingSyn - cast[TestError](connectResult.error().error) is TestError + # even though send is failing we will just finish with timeout, + connectResult.error().kind == ConnectionTimedOut router.len() == 0 asyncTest "Router should clear closed outgoing connections": diff --git a/tests/utp/test_utp_socket.nim b/tests/utp/test_utp_socket.nim index fb43436..fbae5f8 100644 --- a/tests/utp/test_utp_socket.nim +++ b/tests/utp/test_utp_socket.nim @@ -295,7 +295,6 @@ procSuite "Utp socket unit test": await outgoingSocket.destroyWait() asyncTest "Ignoring totally out of order packet": - # TODO test is valid until implementing selective acks let q = newAsyncQueue[Packet]() let initalRemoteSeqNr = 10'u16 @@ -305,11 +304,11 @@ procSuite "Utp socket unit test": await outgoingSocket.processPacket(packets[1024]) - check: - outgoingSocket.numPacketsInReordedBuffer() == 0 - await outgoingSocket.processPacket(packets[1023]) + # give some time to process those packets + await sleepAsync(milliseconds(500)) + check: outgoingSocket.numPacketsInReordedBuffer() == 1 @@ -349,6 +348,8 @@ procSuite "Utp socket unit test": await outgoingSocket.processPacket(responseAck) + await waitUntil(proc (): bool = outgoingSocket.numPacketsInOutGoingBuffer() == 0) + check: outgoingSocket.numPacketsInOutGoingBuffer() == 0 @@ -427,7 +428,7 @@ procSuite "Utp socket unit test": let dataToWrite1 = @[0'u8] let dataToWrite2 = @[1'u8] - let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q, 0) + let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q, cfg = SocketConfig.init(optSndBuffer = 0)) let writeFut1 = outgoingSocket.write(dataToWrite1) let writeFut2 = outgoingSocket.write(dataToWrite2) @@ -531,6 +532,8 @@ procSuite "Utp socket unit test": await outgoingSocket.processPacket(responseAck) + await waitUntil(proc (): bool = outgoingSocket.isConnected()) + check: outgoingSocket.isConnected() @@ -768,6 +771,8 @@ procSuite "Utp socket unit test": await outgoingSocket.processPacket(responseAck) + await waitUntil(proc (): bool = not 
outgoingSocket.isConnected()) + check: not outgoingSocket.isConnected() @@ -1005,6 +1010,8 @@ procSuite "Utp socket unit test": await outgoingSocket.processPacket(responseAck) + await waitUntil(proc (): bool = int(outgoingSocket.numOfBytesInFlight) == len(dataToWrite)) + check: # only first packet has been acked so there should still by 5 bytes left int(outgoingSocket.numOfBytesInFlight) == len(dataToWrite) @@ -1052,18 +1059,18 @@ procSuite "Utp socket unit test": let q = newAsyncQueue[Packet]() let initialRemoteSeq = 10'u16 - let dataToWrite = @[1'u8, 2, 3, 4, 5] + let dataToWrite = generateByteArray(rng[], 1001) # remote is initialized with buffer to small to handle whole payload - let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q, uint32(len(dataToWrite) - 1)) + let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q, cfg = SocketConfig.init(optSndBuffer = 1000)) let writeFut = outgoingSocket.write(dataToWrite) # wait some time to check future is not finished await sleepAsync(seconds(2)) - # write is not finished as future is blocked from progressing due to to small - # send window + # write is not finished as future is blocked from progressing due to to full + # send buffer check: not writeFut.finished() @@ -1071,20 +1078,18 @@ procSuite "Utp socket unit test": ackPacket( initialRemoteSeq, initialPacket.header.connectionId, - initialPacket.header.seqNr, - uint32(len(dataToWrite)), + initialPacket.header.seqNr + 1, + testBufferSize, 0 ) await outgoingSocket.processPacket(someAckFromRemote) - # after processing packet with increased buffer size write should complete and - # packet should be sent - let sentPacket = await q.get() + # only after processing ack write will progress + let writeResult = await writeFut check: - sentPacket.payload == dataToWrite - writeFut.finished() + writeResult.isOK() await outgoingSocket.destroyWait() @@ -1092,30 +1097,21 @@ procSuite "Utp socket unit test": let q = newAsyncQueue[Packet]() let initialRemoteSeq = 10'u16 - let dataToWrite = @[1'u8, 2, 3, 4, 5] - + let dataToWirte = 1160 # remote is initialized with buffer to small to handle whole payload - let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q) - let remoteRcvWindowSize = uint32(outgoingSocket.getPacketSize()) - let someAckFromRemote = - ackPacket( - initialRemoteSeq, - initialPacket.header.connectionId, - initialPacket.header.seqNr, - remoteRcvWindowSize, - 0 - ) + let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q, cfg = SocketConfig.init(optSndBuffer = 1160)) - # we are using ack from remote to setup our snd window size to one packet size on one packet - await outgoingSocket.processPacket(someAckFromRemote) + let twoPacketData = generateByteArray(rng[], int(dataToWirte)) - let twoPacketData = generateByteArray(rng[], int(2 * remoteRcvWindowSize)) + let writeResult = await outgoingSocket.write(twoPacketData) + check: + writeResult.isOk() + + # this write will not progress as snd buffer is full let writeFut = outgoingSocket.write(twoPacketData) - # after this time first packet will be send and will timeout, but the write should not - # finish, as timeouting packets do not notify writing about new space in snd - # buffer + # we wait for packets to timeout await sleepAsync(seconds(2)) check: @@ -1162,15 +1158,22 @@ procSuite "Utp socket unit test": check: packet.header.pType == ST_DATA uint32(len(packet.payload)) == remoteRcvWindowSize - not writeFut.finished + + let packet1Fut = q.get() + 
+ await sleepAsync(milliseconds(500)) + + check: + not packet1Fut.finished() await outgoingSocket.processPacket(firstAckFromRemote) - let packet1 = await q.get() - let writeResult = await writeFut + # packet is sent only after first packet is acked + let packet1 = await packet1Fut check: packet1.header.pType == ST_DATA + packet1.header.seqNr == packet.header.seqNr + 1 writeFut.finished await outgoingSocket.destroyWait() @@ -1192,19 +1195,10 @@ procSuite "Utp socket unit test": check: outgoingSocket.isConnected() - let writeFut = outgoingSocket.write(someData) - - await sleepAsync(seconds(1)) - - check: - # Even after 1 second write is not finished as we did not receive any message - # so remote rcv window is still zero - not writeFut.finished() - - # Ultimately, after 3 second remote rcv window will be reseted to minimal value - # and write will be able to progress - let writeResult = await writeFut - + # write result will be successfull as send buffer has space + let writeResult = await outgoingSocket.write(someData) + + # this will finish in seconds(3) as only after this time window will be set to min value let p = await q.get() check: diff --git a/tests/utp/test_utp_socket_sack.nim b/tests/utp/test_utp_socket_sack.nim index 0433797..567d7e7 100644 --- a/tests/utp/test_utp_socket_sack.nim +++ b/tests/utp/test_utp_socket_sack.nim @@ -48,6 +48,8 @@ procSuite "Utp socket selective acks unit test": for p in dataPackets: await outgoingSocket.processPacket(p) + await waitUntil(proc (): bool = outgoingSocket.numOfEventsInEventQueue() == 0) + let extArray = outgoingSocket.generateSelectiveAckBitMask() await outgoingSocket.destroyWait() @@ -170,6 +172,7 @@ procSuite "Utp socket selective acks unit test": for toDeliver in testCase.packetsDelivered: await incomingSocket.processPacket(packets[toDeliver]) + await waitUntil(proc (): bool = incomingSocket.numOfEventsInEventQueue() == 0) return (outgoingSocket, incomingSocket, packets) @@ -248,8 +251,12 @@ procSuite "Utp socket selective acks unit test": await outgoingSocket.processPacket(finalAck) + let expectedPackets = testCase.numOfPackets - len(testCase.packetsDelivered) + + await waitUntil(proc (): bool = outgoingSocket.numPacketsInOutGoingBuffer() == expectedPackets) + check: - outgoingSocket.numPacketsInOutGoingBuffer() == testCase.numOfPackets - len(testCase.packetsDelivered) + outgoingSocket.numPacketsInOutGoingBuffer() == expectedPackets await outgoingSocket.destroyWait() await incomingSocket.destroyWait() @@ -299,6 +306,8 @@ procSuite "Utp socket selective acks unit test": await outgoingSocket.processPacket(finalAck) + await waitUntil(proc (): bool = outgoingSocket.numOfEventsInEventQueue() == 0) + for idx in testCase.packetsResent: let resentPacket = await outgoingQueue.get() check:
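Several of the test changes above replace immediate assertions with await waitUntil(...) because processPacket now only enqueues a NewPacket event: the observable socket state changes only after the event loop has drained the queue. A sketch of such a polling helper, assuming the shared test utilities provide something of this shape (the 10 ms interval is an arbitrary choice):

import chronos

proc waitUntil*(f: proc (): bool {.gcsafe.}): Future[void] {.async.} =
  ## Poll the predicate until it holds, yielding to the dispatcher in between
  ## so queued SocketEvents get processed before the test asserts.
  while not f():
    await sleepAsync(milliseconds(10))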