From 8ef6b13b1b3bb11d33b157d16cd86d14d99cf73a Mon Sep 17 00:00:00 2001 From: KonradStaniec Date: Thu, 24 Feb 2022 18:22:44 +0100 Subject: [PATCH] Add event loop to socket (#475) - add eventLoop to control all incoming events - change the semantics of write so that it asynchronously blocks only when the send buffer is full, and not when bytes do not fit into the send window - change handling of the receive buffer to start dropping packets if the reorder buffer and receive buffer are full. The old behaviour was to block asynchronously until space became available, which could lead to resource exhaustion attacks --- eth/utp/send_buffer_tracker.nim | 107 --- eth/utp/utp_router.nim | 8 +- eth/utp/utp_socket.nim | 947 ++++++++++++++---------- tests/utp/test_protocol_integration.nim | 13 +- tests/utp/test_utils.nim | 8 +- tests/utp/test_utp_router.nim | 6 +- tests/utp/test_utp_socket.nim | 94 ++- tests/utp/test_utp_socket_sack.nim | 11 +- 8 files changed, 637 insertions(+), 557 deletions(-) delete mode 100644 eth/utp/send_buffer_tracker.nim diff --git a/eth/utp/send_buffer_tracker.nim b/eth/utp/send_buffer_tracker.nim deleted file mode 100644 index 5c90b8e..0000000 --- a/eth/utp/send_buffer_tracker.nim +++ /dev/null @@ -1,107 +0,0 @@ -# Copyright (c) 2021 Status Research & Development GmbH -# Licensed and distributed under either of -# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). -# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). -# at your option. This file may not be copied, modified, or distributed except according to those terms. 
- -{.push raises: [Defect].} - -import - chronos - -# Internal Utp data structure to track send window and properly block when there is -# no free space when trying to send more bytes -type SendBufferTracker* = ref object - # number of payload bytes in-flight (i.e not counting header sizes) - # packets that have not yet been sent do not count, packets - # that are marked as needing to be re-sent (due to a timeout) - # don't count either - currentWindow*: uint32 - - # remote receive window updated based on packed wndSize field - maxRemoteWindow*: uint32 - - # maximum window size, in bytes, calculated by local congestion controller - maxWindow*: uint32 - - # configuration option for maxium number of bytes in snd buffer - maxSndBufferSize*: uint32 - waiters: seq[(uint32, Future[void])] - -proc new*( - T: type SendBufferTracker, - currentWindow: uint32, - maxRemoteWindow: uint32, - maxSndBufferSize: uint32, - maxWindow: uint32): T = - return ( - SendBufferTracker( - currentWindow: currentWindow, - maxRemoteWindow: maxRemoteWindow, - maxSndBufferSize: maxSndBufferSize, - maxWindow: maxWindow, - waiters: @[] - ) - ) - -proc currentFreeBytes*(t: SendBufferTracker): uint32 = - let maxSend = min(min(t.maxRemoteWindow, t.maxSndBufferSize), t.maxWindow) - if (maxSend <= t.currentWindow): - return 0 - else: - return maxSend - t.currentWindow - -proc notifyWaiters*(t: SendBufferTracker) = - var i = 0 - while i < len(t.waiters): - let freeSpace = t.currentFreeBytes() - let (required, fut) = t.waiters[i] - if (required <= freeSpace): - # in case future was cancelled - if (not fut.finished()): - t.currentWindow = t.currentWindow + required - fut.complete() - t.waiters.del(i) - else: - # we do not have place for next waiter, just finish processing - return - -proc updateMaxRemote*(t: SendBufferTracker, newRemoteWindow: uint32) = - t.maxRemoteWindow = newRemoteWindow - t.notifyWaiters() - -proc updateMaxWindow*(t: SendBufferTracker, maxWindow: uint32) = - t.maxWindow = maxWindow - 
t.notifyWaiters() - -proc updateMaxWindowSize*(t: SendBufferTracker, newRemoteWindow: uint32, maxWindow: uint32) = - t.maxRemoteWindow = newRemoteWindow - t.maxWindow = maxWindow - t.notifyWaiters() - -proc decreaseCurrentWindow*(t: SendBufferTracker, value: uint32) = - doAssert(t.currentWindow >= value) - t.currentWindow = t.currentWindow - value - -proc reserveNBytesWait*(t: SendBufferTracker, n: uint32): Future[void] = - let fut = newFuture[void]("SendBufferTracker.reserveNBytesWait") - let free = t.currentFreeBytes() - if (n <= free): - t.currentWindow = t.currentWindow + n - fut.complete() - else: - t.waiters.add((n, fut)) - fut - -proc reserveNBytes*(t: SendBufferTracker, n: uint32): bool = - let free = t.currentFreeBytes() - if (n <= free): - t.currentWindow = t.currentWindow + n - return true - else: - return false - -proc forceReserveNBytes*(t: SendBufferTracker, n: uint32) = - t.currentWindow = t.currentWindow + n - -proc currentBytesInFlight*(t: SendBufferTracker): uint32 = t.currentWindow diff --git a/eth/utp/utp_router.nim b/eth/utp/utp_router.nim index 3fa301f..2a5fb64 100644 --- a/eth/utp/utp_router.nim +++ b/eth/utp/utp_router.nim @@ -168,7 +168,7 @@ proc processPacket[A](r: UtpRouter[A], p: Packet, sender: A) {.async.}= # Initial ackNr is set to incoming packer seqNr let incomingSocket = newIncomingSocket[A](sender, r.sendCb, r.socketConfig ,p.header.connectionId, p.header.seqNr, r.rng[]) r.registerUtpSocket(incomingSocket) - await incomingSocket.startIncomingSocket() + incomingSocket.startIncomingSocket() # Based on configuration, socket is passed to upper layer either in SynRecv # or Connected state info "Accepting incoming connection", @@ -235,12 +235,6 @@ proc connect[A](s: UtpSocket[A]): Future[ConnectionResult[A]] {.async.}= to = s.socketKey s.destroy() return err(OutgoingConnectionError(kind: ConnectionTimedOut)) - except CatchableError as e: - info "Outgoing connection failed due to send error", - to = s.socketKey - s.destroy() - # this may 
only happen if user provided callback will for some reason fail - return err(OutgoingConnectionError(kind: ErrorWhileSendingSyn, error: e)) # Connect to provided address # Reference implementation: https://github.com/bittorrent/libutp/blob/master/utp_internal.cpp#L2732 diff --git a/eth/utp/utp_socket.nim b/eth/utp/utp_socket.nim index ea41d41..fa5bf20 100644 --- a/eth/utp/utp_socket.nim +++ b/eth/utp/utp_socket.nim @@ -7,10 +7,9 @@ {.push raises: [Defect].} import - std/sugar, + std/[sugar, deques], chronos, chronicles, bearssl, stew/[results, bitops2], - ./send_buffer_tracker, ./growable_buffer, ./packets, ./ledbat_congestion_control, @@ -78,6 +77,9 @@ type # to zero. i.e when we received a packet from remote peer with `wndSize` set to 0. remoteWindowResetTimeout*: Duration + # Size of reorder buffer calculated as fraction of optRcvBuffer + maxSizeOfReorderBuffer: uint32 + WriteErrorType* = enum SocketNotWriteable, FinSent @@ -102,6 +104,31 @@ type of Close: discard + SocketEventType = enum + NewPacket, CheckTimeouts, CloseReq, WriteReq, ReadReqType + + ReadReq = object + bytesToRead: int + bytesAvailable: seq[uint8] + reader: Future[seq[uint8]] + + ReadResult = enum + ReadCancelled, ReadFinished, ReadNotFinished, SocketAlreadyFinished + + SocketEvent = object + case kind: SocketEventType + of CheckTimeouts: + discard + of NewPacket: + packet: Packet + of CloseReq: + discard + of WriteReq: + data: seq[byte] + writer: Future[WriteResult] + of ReadReqType: + readReq: ReadReq + UtpSocket*[A] = ref object remoteAddress*: A state: ConnectionState @@ -129,9 +156,24 @@ type # out going buffer for all send packets outBuffer: GrowableCircularBuffer[OutgoingPacket] + # current number of bytes in send buffer + outBufferBytes: uint32 + + # current number of bytes in flight + currentWindow: uint32 + + # current max window broadcasted by remote peer + maxRemoteWindow: uint32 + + # current max window calculated by ledbat congestion controller + maxWindow: uint32 + # incoming 
buffer for out of order packets inBuffer: GrowableCircularBuffer[Packet] + # number of bytes in reorder buffer + inBufferBytes: uint32 + # Number of packets waiting in reorder buffer reorderCount: uint16 @@ -150,7 +192,13 @@ type rtoTimeout: Moment # rcvBuffer - buffer: AsyncBuffer + rcvBuffer: seq[byte] + + # current size of rcv buffer + offset: int + + # readers waiting for data + pendingReads: Deque[ReadReq] # loop called every 500ms to check for on going timeout status checkTimeoutsLoop: Future[void] @@ -185,11 +233,11 @@ type # sequence number of remoted fin packet eofPktNr: uint16 - sendBufferTracker: SendBufferTracker + pendingWrites: Deque[WriteRequest] - writeQueue: AsyncQueue[WriteRequest] + eventQueue: AsyncQueue[SocketEvent] - writeLoop: Future[void] + eventLoop: Future[void] # timer which is started when peer max window drops below current packet size zeroWindowTimer: Option[Moment] @@ -239,12 +287,10 @@ type ConnectionError* = object of CatchableError OutgoingConnectionErrorType* = enum - SocketAlreadyExists, ConnectionTimedOut, ErrorWhileSendingSyn + SocketAlreadyExists, ConnectionTimedOut OutgoingConnectionError* = object case kind*: OutgoingConnectionErrorType - of ErrorWhileSendingSyn: - error*: ref CatchableError of SocketAlreadyExists, ConnectionTimedOut: discard @@ -311,6 +357,16 @@ const # minimal time before subseqent window decays maxWindowDecay = milliseconds(100) + # Maximal size of reorder buffer as fraction of optRcvBuffer size following + # semantics apply bases on rcvBuffer set to 1000 bytes: + # if there are already 1000 bytes in rcv buffer no more bytes will be accepted to reorder buffer + # if there are already 500 bytes in reoreder buffer, no more bytes will be accepted + # to it, and only 500 bytes can be accepted to rcv buffer + # this way there is always a space in rcv buffer to fit new data if the reordering + # happens + maxReorderBufferSize = 0.5 + + proc init*[A](T: type UtpSocketKey, remoteAddress: A, rcvId: uint16): T = 
UtpSocketKey[A](remoteAddress: remoteAddress, rcvId: rcvId) @@ -338,17 +394,28 @@ proc init*( remoteWindowResetTimeout: Duration = defaultResetWindowTimeout, optSndBuffer: uint32 = defaultOptRcvBuffer ): T = + # TODO make sure optRcvBuffer is nicely divisible by maxReorderBufferSize + let reorderBufferSize = uint32(maxReorderBufferSize * float64(optRcvBuffer)) SocketConfig( initialSynTimeout: initialSynTimeout, dataResendsBeforeFailure: dataResendsBeforeFailure, optRcvBuffer: optRcvBuffer, optSndBuffer: optSndBuffer, incomingSocketReceiveTimeout: incomingSocketReceiveTimeout, - remoteWindowResetTimeout: remoteWindowResetTimeout + remoteWindowResetTimeout: remoteWindowResetTimeout, + maxSizeOfReorderBuffer: reorderBufferSize ) +# number of bytes which will fit in current send window +proc freeWindowBytes(socket: UtpSocket): uint32 = + let maxSend = min(socket.maxRemoteWindow, socket.maxWindow) + if (maxSend <= socket.currentWindow): + return 0 + else: + return maxSend - socket.currentWindow + proc getRcvWindowSize(socket: UtpSocket): uint32 = - let currentDataSize = socket.buffer.dataLen() + let currentDataSize = socket.offset if currentDataSize > int(socket.socketConfig.optRcvBuffer): 0'u32 else: @@ -358,42 +425,58 @@ proc registerOutgoingPacket(socket: UtpSocket, oPacket: OutgoingPacket) = ## Adds packet to outgoing buffer and updates all related fields socket.outBuffer.ensureSize(socket.seqNr, socket.curWindowPackets) socket.outBuffer.put(socket.seqNr, oPacket) + socket.outBufferBytes = socket.outBufferBytes + oPacket.payloadLength inc socket.seqNr inc socket.curWindowPackets -proc sendData(socket: UtpSocket, data: seq[byte]): Future[void] = +proc sendData(socket: UtpSocket, data: seq[byte]) = let f = socket.send(socket.remoteAddress, data) f.callback = proc(data: pointer) {.gcsafe.} = if f.failed: warn "UTP send failed", msg = f.readError.msg - return f -# Should be called before sending packet -proc setSend(s: UtpSocket, p: var OutgoingPacket): seq[byte] = - 
let timestampInfo = getMonoTimestamp() +proc sendPacket(socket: UtpSocket, seqNr: uint16) = + proc setSend(p: var OutgoingPacket): seq[byte] = + let timestampInfo = getMonoTimestamp() - inc p.transmissions - p.needResend = false - p.timeSent = timestampInfo.moment - # all bytearrays in outgoing buffer should be properly encoded utp packets - # so it is safe to directly modify fields - modifyTimeStampAndAckNr(p.packetBytes, timestampInfo.timestamp, s.ackNr) + if p.transmissions == 0 or p.needResend: + socket.currentWindow = socket.currentWindow + p.payloadLength - return p.packetBytes + inc p.transmissions + p.needResend = false + p.timeSent = timestampInfo.moment + # all bytearrays in outgoing buffer should be properly encoded utp packets + # so it is safe to directly modify fields + modifyTimeStampAndAckNr(p.packetBytes, timestampInfo.timestamp, socket.ackNr) -proc flushPackets(socket: UtpSocket) {.async.} = - var i: uint16 = socket.seqNr - socket.curWindowPackets + return p.packetBytes + + socket.sendData(setSend(socket.outBuffer[seqNr])) + +proc resetSendTimeout(socket: UtpSocket) = + socket.retransmitTimeout = socket.rto + socket.rtoTimeout = getMonoTimestamp().moment + socket.retransmitTimeout + +proc flushPackets(socket: UtpSocket) = + let oldestOutgoingPacketSeqNr = socket.seqNr - socket.curWindowPackets + var i: uint16 = oldestOutgoingPacketSeqNr while i != socket.seqNr: # sending only packet which were not transmitted yet or need a resend let shouldSendPacket = socket.outBuffer.exists(i, (p: OutgoingPacket) => (p.transmissions == 0 or p.needResend == true)) if (shouldSendPacket): - if socket.sendBufferTracker.reserveNBytes(socket.outBuffer[i].payloadLength): - debug "Resending packet during flush", + if (socket.freeWindowBytes() > 0): + # this our first send packet reset rto timeout + if i == oldestOutgoingPacketSeqNr and socket.curWindowPackets == 1 and socket.outBuffer[i].transmissions == 0: + socket.resetSendTimeout() + + debug "Flushing packet", 
pkSeqNr = i - let toSend = socket.setSend(socket.outBuffer[i]) - await socket.sendData(toSend) + socket.sendPacket(i) else: - debug "Should resend packet during flush but there is no place in send buffer", + debug "Should resend packet during flush but there is no place in send window", + currentBytesWindow = socket.currentWindow, + maxRemoteWindow = socket.maxRemoteWindow, + maxWindow = socket.maxWindow, pkSeqNr = i # there is no place in send buffer, stop flushing return @@ -409,10 +492,8 @@ proc markAllPacketAsLost(s: UtpSocket) = pkSeqNr = packetSeqNr s.outBuffer[packetSeqNr].needResend = true let packetPayloadLength = s.outBuffer[packetSeqNr].payloadLength - # lack of waiters notification in case of timeout effectivly means that - # we do not allow any new bytes to enter snd buffer in case of new free space - # due to timeout. - s.sendBufferTracker.decreaseCurrentWindow(packetPayloadLength) + doAssert(s.currentWindow >= packetPayloadLength, "Window should always be larger than packet length") + s.currentWindow = s.currentWindow - packetPayloadLength inc i @@ -427,44 +508,29 @@ proc shouldDisconnectFromFailedRemote(socket: UtpSocket): bool = (socket.state == SynSent and socket.retransmitCount >= 2) or (socket.retransmitCount >= socket.socketConfig.dataResendsBeforeFailure) -# Forces asynchronous re-send of packet waiting in outgoing buffer -proc forceResendPacket(socket: UtpSocket, pkSeqNr: uint16) = - doAssert( - socket.outBuffer.get(pkSeqNr).isSome(), - "Force resend should be called only on packet still in outgoing buffer" - ) - if socket.outBuffer[pkSeqNr].needResend: - # if needResend is set to true it means that packet payload was already - # removed from the bytes window and need to be re-added. 
- socket.sendBufferTracker.forceReserveNBytes(socket.outBuffer[pkSeqNr].payloadLength) - - let data = socket.setSend(socket.outBuffer[pkSeqNr]) - discard socket.sendData(data) - -proc checkTimeouts(socket: UtpSocket) {.async.} = +proc checkTimeouts(socket: UtpSocket) = let currentTime = getMonoTimestamp().moment # flush all packets which needs to be re-send if socket.state != Destroy: - await socket.flushPackets() + socket.flushPackets() if socket.isOpened(): let currentPacketSize = uint32(socket.getPacketSize()) if (socket.zeroWindowTimer.isSome() and currentTime > socket.zeroWindowTimer.unsafeGet()): - if socket.sendBufferTracker.maxRemoteWindow <= currentPacketSize: - socket.sendBufferTracker.updateMaxRemote(minimalRemoteWindow) + if socket.maxRemoteWindow <= currentPacketSize: + socket.maxRemoteWindow = minimalRemoteWindow socket.zeroWindowTimer = none[Moment]() debug "Reset remote window to minimal value", minRemote = minimalRemoteWindow - if (currentTime > socket.rtoTimeout): debug "CheckTimeouts rto timeout", socketKey = socket.socketKey, state = socket.state, - maxWindow = socket.sendBufferTracker.maxWindow, + maxWindow = socket.maxWindow, curWindowPackets = socket.curWindowPackets, - curWindowBytes = socket.sendBufferTracker.currentWindow + curWindowBytes = socket.currentWindow # TODO add handling of probe time outs. Reference implemenation has mechanism # of sending probes to determine mtu size. Probe timeouts do not count to standard @@ -494,39 +560,29 @@ proc checkTimeouts(socket: UtpSocket) {.async.} = # on timeout reset duplicate ack counter socket.duplicateAck = 0 - - - if (socket.curWindowPackets == 0 and socket.sendBufferTracker.maxWindow > currentPacketSize): + if (socket.curWindowPackets == 0 and socket.maxWindow > currentPacketSize): # there are no packets in flight even though there is place for more than whole packet # this means connection is just idling. Reset window by 1/3'rd but no more # than to fit at least one packet. 
- let oldMaxWindow = socket.sendBufferTracker.maxWindow + let oldMaxWindow = socket.maxWindow let newMaxWindow = max((oldMaxWindow * 2) div 3, currentPacketSize) debug "Decaying max window due to socket idling", oldMaxWindow = oldMaxWindow, newMaxWindow = newMaxWindow - - socket.sendBufferTracker.updateMaxWindowSize( - # maxRemote window does not change - socket.sendBufferTracker.maxRemoteWindow, - newMaxWindow - ) - elif (socket.sendBufferTracker.maxWindow < currentPacketSize): + + socket.maxWindow = newMaxWindow + elif (socket.maxWindow < currentPacketSize): # due to high delay window has shrunk below packet size # which means that we cannot send more data # reset it to fit at least one packet debug "Reseting window size do fit a least one packet", - oldWindowSize = socket.sendBufferTracker.maxWindow, + oldWindowSize = socket.maxWindow, newWindowSize = currentPacketSize # delay was so high that window has shrunk below one packet. Reset window # to fit a least one packet and start with slow start - socket.sendBufferTracker.updateMaxWindowSize( - # maxRemote window does not change - socket.sendBufferTracker.maxRemoteWindow, - currentPacketSize - ) + socket.maxWindow = currentPacketSize socket.slowStart = true # This will have much more sense when we will add handling of selective acks @@ -534,10 +590,9 @@ proc checkTimeouts(socket: UtpSocket) {.async.} = # from out buffer. 
markAllPacketAsLost(socket) - # resend oldest packet if there are some packets in flight - if (socket.curWindowPackets > 0): - let oldestPacketSeqNr = socket.seqNr - socket.curWindowPackets - + let oldestPacketSeqNr = socket.seqNr - socket.curWindowPackets + # resend oldest packet if there are some packets in flight, and oldestpacket was already sent + if (socket.curWindowPackets > 0 and socket.outBuffer[oldestPacketSeqNr].transmissions > 0): inc socket.retransmitCount socket.fastTimeout = true @@ -548,8 +603,8 @@ proc checkTimeouts(socket: UtpSocket) {.async.} = # Oldest packet should always be present, so it is safe to call force # resend - socket.forceResendPacket(oldestPacketSeqNr) - + socket.sendPacket(oldestPacketSeqNr) + # TODO add sending keep alives when necessary proc checkTimeoutsLoop(s: UtpSocket) {.async.} = @@ -557,7 +612,7 @@ proc checkTimeoutsLoop(s: UtpSocket) {.async.} = try: while true: await sleepAsync(checkTimeoutsLoopInterval) - await s.checkTimeouts() + await s.eventQueue.put(SocketEvent(kind: CheckTimeouts)) except CancelledError: trace "checkTimeoutsLoop canceled" @@ -568,16 +623,7 @@ proc getPacketSize*(socket: UtpSocket): int = # TODO currently returning constant, ultimatly it should be bases on mtu estimates mtuSize -proc resetSendTimeout(socket: UtpSocket) = - socket.retransmitTimeout = socket.rto - socket.rtoTimeout = getMonoTimestamp().moment + socket.retransmitTimeout - -proc handleDataWrite(socket: UtpSocket, data: seq[byte], writeFut: Future[WriteResult]): Future[void] {.async.} = - if writeFut.finished(): - # write future was cancelled befere we got chance to process it, short circuit - # processing and move to next loop iteration - return - +proc handleDataWrite(socket: UtpSocket, data: seq[byte]): int = let pSize = socket.getPacketSize() let endIndex = data.high() var i = 0 @@ -588,14 +634,9 @@ proc handleDataWrite(socket: UtpSocket, data: seq[byte], writeFut: Future[WriteR let lastOrEnd = min(lastIndex, endIndex) let dataSlice 
= data[i..lastOrEnd] let payloadLength = uint32(len(dataSlice)) - try: - await socket.sendBufferTracker.reserveNBytesWait(payloadLength) - - if socket.curWindowPackets == 0: - socket.resetSendTimeout() + if (socket.outBufferBytes + payloadLength <= socket.socketConfig.optSndBuffer): let wndSize = socket.getRcvWindowSize() - let dataPacket = dataPacket( socket.seqNr, @@ -605,202 +646,35 @@ proc handleDataWrite(socket: UtpSocket, data: seq[byte], writeFut: Future[WriteR dataSlice, socket.replayMicro ) - let outgoingPacket = OutgoingPacket.init(encodePacket(dataPacket), 1, false, payloadLength) + let outgoingPacket = OutgoingPacket.init(encodePacket(dataPacket), 0, false, payloadLength) socket.registerOutgoingPacket(outgoingPacket) - await socket.sendData(outgoingPacket.packetBytes) - except CancelledError as exc: - # write loop has been cancelled in the middle of processing due to the - # socket closing - # this approach can create partial write in when destroying the socket in the - # the middle of the write - doAssert(socket.state == Destroy) - if (not writeFut.finished()): - let res = Result[int, WriteError].err(WriteError(kind: SocketNotWriteable, currentState: socket.state)) - writeFut.complete(res) - # we need to re-raise exception so the outer loop will be properly cancelled too - raise exc - bytesWritten = bytesWritten + len(dataSlice) + bytesWritten = bytesWritten + len(dataSlice) + socket.flushPackets() + else: + debug "No more place in write buffer", + currentBufferSize = socket.outBufferBytes, + maxBufferSize = socket.socketConfig.optSndBuffer, + nexPacketSize = payloadLength + break + i = lastOrEnd + 1 - # Before completing the future with success (as all data was sent successfully) - # we need to check if user did not cancel write on his end - if (not writeFut.finished()): - writeFut.complete(Result[int, WriteError].ok(bytesWritten)) + return bytesWritten -proc handleClose(socket: UtpSocket): Future[void] {.async.} = - try: - if socket.curWindowPackets 
== 0: - socket.resetSendTimeout() - - let finEncoded = - encodePacket( - finPacket( - socket.seqNr, - socket.connectionIdSnd, - socket.ackNr, - socket.getRcvWindowSize(), - socket.replayMicro - ) +proc handleClose(socket: UtpSocket) = + let finEncoded = + encodePacket( + finPacket( + socket.seqNr, + socket.connectionIdSnd, + socket.ackNr, + socket.getRcvWindowSize(), + socket.replayMicro ) - socket.registerOutgoingPacket(OutgoingPacket.init(finEncoded, 1, true, 0)) - await socket.sendData(finEncoded) - socket.finSent = true - except CancelledError as exc: - raise exc - -proc writeLoop(socket: UtpSocket): Future[void] {.async.} = - ## Loop that processes writes on socket - try: - while true: - let req = await socket.writeQueue.get() - case req.kind - of Data: - await socket.handleDataWrite(req.data, req.writer) - info "Written data to remote", - to = socket.socketKey, - bytesWritten = len(req.data) - of Close: - await socket.handleClose() - info "Sent FIN to remote", - to = socket.socketKey - - except CancelledError: - doAssert(socket.state == Destroy) - for req in socket.writeQueue.items: - if (req.kind == Data and not req.writer.finished()): - let res = Result[int, WriteError].err(WriteError(kind: SocketNotWriteable, currentState: socket.state)) - req.writer.complete(res) - socket.writeQueue.clear() - trace "writeLoop canceled" - -proc startWriteLoop(s: UtpSocket) = - s.writeLoop = writeLoop(s) - -proc new[A]( - T: type UtpSocket[A], - to: A, - snd: SendCallback[A], - state: ConnectionState, - cfg: SocketConfig, - direction: ConnectionDirection, - rcvId: uint16, - sndId: uint16, - initialSeqNr: uint16, - initialAckNr: uint16, - initialTimeout: Duration -): T = - let currentTime = getMonoTimestamp().moment - T( - remoteAddress: to, - state: state, - direction: direction, - socketConfig: cfg, - connectionIdRcv: rcvId, - connectionIdSnd: sndId, - seqNr: initialSeqNr, - ackNr: initialAckNr, - connectionFuture: newFuture[void](), - outBuffer: 
GrowableCircularBuffer[OutgoingPacket].init(), - inBuffer: GrowableCircularBuffer[Packet].init(), - retransmitTimeout: initialTimeout, - rtoTimeout: currentTime + initialTimeout, - # Initial timeout values taken from reference implemntation - rtt: milliseconds(0), - rttVar: milliseconds(800), - rto: milliseconds(3000), - buffer: AsyncBuffer.init(int(cfg.optRcvBuffer)), - closeEvent: newAsyncEvent(), - closeCallbacks: newSeq[Future[void]](), - # start with 1mb assumption, field will be updated with first received packet - sendBufferTracker: SendBufferTracker.new(0, 1024 * 1024, cfg.optSndBuffer, startMaxWindow), - # queue with infinite size - writeQueue: newAsyncQueue[WriteRequest](), - zeroWindowTimer: none[Moment](), - socketKey: UtpSocketKey.init(to, rcvId), - slowStart: true, - fastTimeout: false, - fastResendSeqNr: initialSeqNr, - lastWindowDecay: currentTime - maxWindowDecay, - slowStartTreshold: cfg.optSndBuffer, - ourHistogram: DelayHistogram.init(currentTime), - remoteHistogram: DelayHistogram.init(currentTime), - driftCalculator: ClockDriftCalculator.init(currentTime), - send: snd - ) - -proc newOutgoingSocket*[A]( - to: A, - snd: SendCallback[A], - cfg: SocketConfig, - rcvConnectionId: uint16, - rng: var BrHmacDrbgContext -): UtpSocket[A] = - let sndConnectionId = rcvConnectionId + 1 - let initialSeqNr = randUint16(rng) - - UtpSocket[A].new( - to, - snd, - SynSent, - cfg, - Outgoing, - rcvConnectionId, - sndConnectionId, - initialSeqNr, - # Initialy ack nr is 0, as we do not know remote inital seqnr - 0, - cfg.initialSynTimeout - ) - -proc newIncomingSocket*[A]( - to: A, - snd: SendCallback[A], - cfg: SocketConfig, - connectionId: uint16, - ackNr: uint16, - rng: var BrHmacDrbgContext -): UtpSocket[A] = - let initialSeqNr = randUint16(rng) - - let (initialState, initialTimeout) = - if (cfg.incomingSocketReceiveTimeout.isNone()): - # it does not matter what timeout value we put here, as socket will be in - # connected state without outgoing packets in 
buffer so any timeout hit will - # just double rto without any penalties - # although we cannont use 0, as then timeout will be constantly re-set to 500ms - # and there will be a lot of not usefull work done - (Connected, defaultInitialSynTimeout) - else: - let timeout = cfg.incomingSocketReceiveTimeout.unsafeGet() - (SynRecv, timeout) - - UtpSocket[A].new( - to, - snd, - initialState, - cfg, - Incoming, - connectionId + 1, - connectionId, - initialSeqNr, - ackNr, - initialTimeout - ) - -proc startOutgoingSocket*(socket: UtpSocket): Future[void] {.async.} = - doAssert(socket.state == SynSent) - let packet = synPacket(socket.seqNr, socket.connectionIdRcv, socket.getRcvWindowSize()) - debug "Sending SYN packet", - seqNr = packet.header.seqNr, - connectionId = packet.header.connectionId - # set number of transmissions to 1 as syn packet will be send just after - # initiliazation - let outgoingPacket = OutgoingPacket.init(encodePacket(packet), 1, false, 0) - socket.registerOutgoingPacket(outgoingPacket) - socket.startWriteLoop() - socket.startTimeoutLoop() - await socket.sendData(outgoingPacket.packetBytes) - await socket.connectionFuture + ) + socket.finSent = true + socket.registerOutgoingPacket(OutgoingPacket.init(finEncoded, 0, false, 0)) + socket.flushPackets() proc isConnected*(socket: UtpSocket): bool = socket.state == Connected @@ -814,7 +688,7 @@ proc destroy*(s: UtpSocket) = ## Moves socket to destroy state and clean all reasources. 
## Remote is not notified in any way about socket end of life s.state = Destroy - s.writeLoop.cancel() + s.eventLoop.cancel() s.checkTimeoutsLoop.cancel() s.closeEvent.fire() @@ -897,10 +771,11 @@ proc ackPacket(socket: UtpSocket, seqNr: uint16, currentTime: Moment): AckResult # been considered timed-out, and is not included in # the cur_window anymore if (not packet.needResend): - socket.sendBufferTracker.decreaseCurrentWindow(packet.payloadLength) + doAssert(socket.currentWindow >= packet.payloadLength, "Window should always be larger than packet length") + socket.currentWindow = socket.currentWindow - packet.payloadLength - # we notify all waiters that there is possibly new space in send buffer - socket.sendBufferTracker.notifyWaiters() + # we removed packet from our out going buffer + socket.outBufferBytes = socket.outBufferBytes - packet.payloadLength socket.retransmitCount = 0 PacketAcked @@ -1003,13 +878,13 @@ proc calculateSelectiveAckBytes*(socket: UtpSocket, receivedPackedAckNr: uint16 proc tryDecayWindow(socket: UtpSocket, now: Moment) = if (now - socket.lastWindowDecay >= maxWindowDecay): socket.lastWindowDecay = now - let newMaxWindow = max(uint32(0.5 * float64(socket.sendBufferTracker.maxWindow)), uint32(minWindowSize)) + let newMaxWindow = max(uint32(0.5 * float64(socket.maxWindow)), uint32(minWindowSize)) debug "Decaying maxWindow", - oldWindow = socket.sendBufferTracker.maxWindow, + oldWindow = socket.maxWindow, newWindow = newMaxWindow - socket.sendBufferTracker.updateMaxWindow(newMaxWindow) + socket.maxWindow = newMaxWindow socket.slowStart = false socket.slowStartTreshold = newMaxWindow @@ -1094,7 +969,7 @@ proc selectiveAckPackets(socket: UtpSocket, receivedPackedAckNr: uint16, ext: S registerLoss = true # it is safe to call as we already checked that packet is in send buffer - socket.forceResendPacket(seqNrToResend) + socket.sendPacket(seqNrToResend) socket.fastResendSeqNr = seqNrToResend + 1 debug "Resent packet", @@ -1151,7 +1026,7 @@ proc 
generateAckPacket*(socket: UtpSocket): Packet = bitmask ) -proc sendAck(socket: UtpSocket): Future[void] = +proc sendAck(socket: UtpSocket) = ## Creates and sends ack, based on current socket state. Acks are different from ## other packets as we do not track them in outgoing buffet @@ -1164,18 +1039,9 @@ proc sendAck(socket: UtpSocket): Future[void] = socket.sendData(encodePacket(ackPacket)) -proc startIncomingSocket*(socket: UtpSocket) {.async.} = - # Make sure ack was flushed before moving forward - await socket.sendAck() - socket.startWriteLoop() - socket.startTimeoutLoop() - # TODO at socket level we should handle only FIN/DATA/ACK packets. Refactor to make # it enforcable by type system -# TODO re-think synchronization of this procedure, as each await inside gives control -# to scheduler which means there could be potentialy several processPacket procs -# running -proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = +proc processPacketInternal(socket: UtpSocket, p: Packet) = debug "Process packet", socketKey = socket.socketKey, @@ -1267,7 +1133,7 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = let isPossibleDuplicatedOldPacket = pastExpected >= (int(uint16.high) + 1) - reorderBufferMaxSize if (isPossibleDuplicatedOldPacket and p.header.pType != ST_STATE): - discard socket.sendAck() + socket.sendAck() debug "Got an invalid packet sequence number, too far off", pastExpected = pastExpected @@ -1326,7 +1192,7 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = let currentPacketSize = uint32(socket.getPacketSize()) let (newMaxWindow, newSlowStartTreshold, newSlowStart) = applyCongestionControl( - socket.sendBufferTracker.maxWindow, + socket.maxWindow, socket.slowStart, socket.slowStartTreshold, socket.socketConfig.optSndBuffer, @@ -1339,7 +1205,8 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = ) # update remote window size and max window - socket.sendBufferTracker.updateMaxWindowSize(p.header.wndSize, 
newMaxWindow) + socket.maxWindow = newMaxWindow + socket.maxRemoteWindow = p.header.wndSize socket.slowStart = newSlowStart socket.slowStartTreshold = newSlowStartTreshold @@ -1349,7 +1216,7 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = slowStartTreshold = newSlowStartTreshold, slowstart = newSlowStart - if (socket.zeroWindowTimer.isNone() and socket.sendBufferTracker.maxRemoteWindow <= currentPacketSize): + if (socket.zeroWindowTimer.isNone() and socket.maxRemoteWindow <= currentPacketSize): # when zeroWindowTimer will be hit and maxRemoteWindow still will be equal to 0 # then it will be reset to minimal value socket.zeroWindowTimer = some(timestampInfo.moment + socket.socketConfig.remoteWindowResetTimeout) @@ -1407,7 +1274,7 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = # Is is safe to call force resend as we already checked shouldReSendPacket # condition - socket.forceResendPacket(oldestOutstandingPktSeqNr) + socket.sendPacket(oldestOutstandingPktSeqNr) if (p.eack.isSome()): socket.selectiveAckPackets(pkAckNr, p.eack.unsafeGet(), timestampInfo.moment) @@ -1432,11 +1299,28 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = # we got in order packet if (pastExpected == 0 and (not socket.reachedFin)): debug "Received in order packet" - if (len(p.payload) > 0 and (not socket.readShutdown)): + let payloadLength = len(p.payload) + if (payloadLength > 0 and (not socket.readShutdown)): + # we need to sum both rcv buffer and reorder buffer + if (uint32(socket.offset) + socket.inBufferBytes + uint32(payloadLength) > socket.socketConfig.optRcvBuffer): + # even though packet is in order and passes all the checks, it would + # overflow our receive buffer, it means that we are receiving data + # faster than we are reading it. 
Do not ack this packet, and drop received + # data + debug "Recevied packet would overflow receive buffer dropping it", + pkSeqNr = p.header.seqNr, + bytesReceived = payloadLength, + rcvbufferSize = socket.offset, + reorderBufferSize = socket.inBufferBytes + return + debug "Received data packet", - bytesReceived = len(p.payload) + bytesReceived = payloadLength # we are getting in order data packet, we can flush data directly to the incoming buffer - await upload(addr socket.buffer, unsafeAddr p.payload[0], p.payload.len()) + # await upload(addr socket.buffer, unsafeAddr p.payload[0], p.payload.len()) + moveMem(addr socket.rcvBuffer[socket.offset], unsafeAddr p.payload[0], payloadLength) + socket.offset = socket.offset + payloadLength + # Bytes have been passed to upper layer, we can increase number of last # acked packet inc socket.ackNr @@ -1462,9 +1346,6 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = # ignore following packets socket.reorderCount = 0 - # notify all readers we have reached eof - socket.buffer.forget() - if socket.reorderCount == 0: break @@ -1476,21 +1357,30 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = break let packet = maybePacket.unsafeGet() + let reorderPacketPayloadLength = len(packet.payload) - if (len(packet.payload) > 0 and (not socket.readShutdown)): + if (reorderPacketPayloadLength > 0 and (not socket.readShutdown)): debug "Got packet from reorder buffer", packetBytes = len(packet.payload), packetSeqNr = packet.header.seqNr, packetAckNr = packet.header.ackNr, socketSeqNr = socket.seqNr, - socekrAckNr = socket.ackNr + socektAckNr = socket.ackNr, + rcvbufferSize = socket.offset, + reorderBufferSize = socket.inBufferBytes + + # Rcv buffer and reorder buffer are sized that it is always possible to + # move data from reorder buffer to rcv buffer without overflow + moveMem(addr socket.rcvBuffer[socket.offset], unsafeAddr packet.payload[0], reorderPacketPayloadLength) + socket.offset = socket.offset + 
reorderPacketPayloadLength - await upload(addr socket.buffer, unsafeAddr packet.payload[0], packet.payload.len()) + debug "Deleting packet", + seqNr = nextPacketNum socket.inBuffer.delete(nextPacketNum) - inc socket.ackNr dec socket.reorderCount + socket.inBufferBytes = socket.inBufferBytes - uint32(reorderPacketPayloadLength) debug "Socket state after processing in order packet", socketKey = socket.socketKey, @@ -1502,7 +1392,7 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = # need improvement, as with this approach there is no direct control over # how many concurrent tasks there are and how to cancel them when socket # is closed - discard socket.sendAck() + socket.sendAck() # we got packet out of order else: @@ -1523,13 +1413,27 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = debug "Packet with seqNr already received", seqNr = pkSeqNr else: - socket.inBuffer.put(pkSeqNr, p) - inc socket.reorderCount - debug "added out of order packet to reorder buffer", - reorderCount = socket.reorderCount - # we send ack packet, as we reoreder count is > 0, so the eack bitmask will be - # generated - discard socket.sendAck() + let payloadLength = uint32(len(p.payload)) + if (socket.inBufferBytes + payloadLength <= socket.socketConfig.maxSizeOfReorderBuffer and + socket.inBufferBytes + uint32(socket.offset) + payloadLength <= socket.socketConfig.optRcvBuffer): + + debug "store packet in reorder buffer", + packetBytes = payloadLength, + packetSeqNr = p.header.seqNr, + packetAckNr = p.header.ackNr, + socketSeqNr = socket.seqNr, + socektAckNr = socket.ackNr, + rcvbufferSize = socket.offset, + reorderBufferSize = socket.inBufferBytes + + socket.inBuffer.put(pkSeqNr, p) + inc socket.reorderCount + socket.inBufferBytes = socket.inBufferBytes + payloadLength + debug "added out of order packet to reorder buffer", + reorderCount = socket.reorderCount + # we send ack packet, as we reoreder count is > 0, so the eack bitmask will be + # generated + 
socket.sendAck() of ST_STATE: if (socket.state == SynSent and (not socket.connectionFuture.finished())): @@ -1547,10 +1451,158 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = of ST_SYN: debug "Received ST_SYN on known socket, ignoring" +proc processPacket*(socket: UtpSocket, p: Packet): Future[void] = + socket.eventQueue.put(SocketEvent(kind: NewPacket, packet: p)) + +template shiftBuffer(t, c: untyped) = + if (t).offset > c: + if c > 0: + moveMem(addr((t).rcvBuffer[0]), addr((t).rcvBuffer[(c)]), (t).offset - (c)) + (t).offset = (t).offset - (c) + else: + (t).offset = 0 + +proc onRead(socket: UtpSocket, readReq: var ReadReq): ReadResult = + if readReq.reader.finished(): + return ReadCancelled + + if socket.atEof(): + # buffer is already empty and we reached remote fin, just finish read with whatever + # was already read + readReq.reader.complete(readReq.bytesAvailable) + return SocketAlreadyFinished + + if readReq.bytesToRead == 0: + # treat is as read till eof + readReq.bytesAvailable.add(socket.rcvBuffer.toOpenArray(0, socket.offset - 1)) + socket.shiftBuffer(socket.offset) + if (socket.atEof()): + readReq.reader.complete(readReq.bytesAvailable) + return ReadFinished + else: + return ReadNotFinished + else: + let bytesAlreadyRead = len(readReq.bytesAvailable) + let bytesLeftToRead = readReq.bytesToRead - bytesAlreadyRead + let count = min(socket.offset, bytesLeftToRead) + readReq.bytesAvailable.add(socket.rcvBuffer.toOpenArray(0, count - 1)) + socket.shiftBuffer(count) + if (len(readReq.bytesAvailable) == readReq.bytesToRead): + readReq.reader.complete(readReq.bytesAvailable) + return ReadFinished + else: + return ReadNotFinished + +proc eventLoop(socket: UtpSocket) {.async.} = + try: + while true: + let ev = await socket.eventQueue.get() + case ev.kind + of NewPacket: + socket.processPacketInternal(ev.packet) + + # we processed a packet and rcv buffer size is larger than 0, + # check if we can finish some pending readers + while 
socket.pendingReads.len() > 0 and socket.offset > 0:
+          let readResult = socket.onRead(socket.pendingReads[0])
+          case readResult
+          of ReadFinished:
+            discard socket.pendingReads.popFirst()
+          of ReadNotFinished:
+            # there was not enough bytes in buffer to finish this read request,
+            # stop processing further reads
+            break
+          else:
+            # read was cancelled or socket is already finished move on to next read
+            # request
+            discard socket.pendingReads.popFirst()
+
+        # we processed packet, so there could more place in the send buffer
+        while socket.pendingWrites.len() > 0:
+          let wr = socket.pendingWrites.popFirst()
+          case wr.kind
+          of Close:
+            socket.handleClose()
+            # close should be last packet send
+            break
+          of Data:
+            # check if writing was not cancelled in the mean time. This approach
+            # can create partial writes as part of the data could be written
+            # with WriteReq
+            if (not wr.writer.finished()):
+              let bytesWritten = socket.handleDataWrite(wr.data)
+              if (bytesWritten == len(wr.data)):
+                # all bytes were written we can finish external future
+                wr.writer.complete(Result[int, WriteError].ok(bytesWritten))
+              else:
+                # The remainder and the future both belong to the dequeued
+                # request `wr`. `ev` in this branch is the NewPacket event,
+                # which carries a packet and not write data, so the slice
+                # bound and the writer must not be taken from it.
+                let bytesLeft = wr.data[bytesWritten..wr.data.high]
+                # bytes partially written to buffer, schedule rest of data for later
+                socket.pendingWrites.addFirst(WriteRequest(kind: Data, data: bytesLeft, writer: wr.writer))
+                # there is no more place in the buffer break from the loop
+                break
+      of CheckTimeouts:
+        discard
+      of CloseReq:
+        if (socket.pendingWrites.len() > 0):
+          # there are still some unfinished writes, waiting to be finished
+          socket.pendingWrites.addLast(WriteRequest(kind: Close))
+        else:
+          socket.handleClose()
+      of WriteReq:
+        # check if the writer was not cancelled in mean time
+        if (not ev.writer.finished()):
+          if (socket.pendingWrites.len() > 0):
+            # there are still some unfinished writes, waiting to be finished schedule this batch for later
+            socket.pendingWrites.addLast(WriteRequest(kind: Data, data: ev.data, writer: ev.writer))
+          else:
+
let bytesWritten = socket.handleDataWrite(ev.data) + if (bytesWritten == len(ev.data)): + # all bytes were written we can finish external future + ev.writer.complete(Result[int, WriteError].ok(bytesWritten)) + else: + let bytesLeft = ev.data[bytesWritten..ev.data.high] + # bytes partially written to buffer, schedule rest of data for later + socket.pendingWrites.addLast(WriteRequest(kind: Data, data: bytesLeft, writer: ev.writer)) + of ReadReqType: + # check if the writer was not cancelled in mean time + if (not ev.readReq.reader.finished()): + if (socket.pendingReads.len() > 0): + # there is already pending unfininshed read request, schedule this one for + # later + socket.pendingReads.addLast(ev.readReq) + else: + var readReq = ev.readReq + let readResult = socket.onRead(readReq) + case readResult + of ReadNotFinished: + socket.pendingReads.addLast(readReq) + else: + # in any other case we do not need to do any thing + discard + + socket.checkTimeouts() + except CancelledError: + for w in socket.pendingWrites.items(): + if w.kind == Data and (not w.writer.finished()): + let res = Result[int, WriteError].err(WriteError(kind: SocketNotWriteable, currentState: socket.state)) + w.writer.complete(res) + for r in socket.pendingReads.items(): + # complete every reader with already read bytes + # TODO: it maybe better to refine read api to returl Future[Result[seq[byte], E]] + # and return erros for not finished reads + if (not r.reader.finished()): + r.reader.complete(r.bytesAvailable) + socket.pendingWrites.clear() + socket.pendingReads.clear() + trace "main socket event loop cancelled" + +proc startEventLoop(s: UtpSocket) = + s.eventLoop = eventLoop(s) + proc atEof*(socket: UtpSocket): bool = # socket is considered at eof when remote side sent us fin packet # and we have processed all packets up to fin - socket.buffer.dataLen() == 0 and socket.reachedFin + socket.offset == 0 and socket.reachedFin proc readingClosed(socket: UtpSocket): bool = socket.atEof() or 
socket.state == Destroy @@ -1569,7 +1621,7 @@ proc close*(socket: UtpSocket) = # with this approach, all pending writes will be executed before sending fin packet # we could also and method which places close request as first one to process # but it would complicate the write loop - socket.writeQueue.putNoWait(WriteRequest(kind: Close)) + socket.eventQueue.putNoWait(SocketEvent(kind: CloseReq)) except AsyncQueueFullError as e: # should not happen as our write queue is unbounded raiseAssert e.msg @@ -1616,65 +1668,62 @@ proc write*(socket: UtpSocket, data: seq[byte]): Future[WriteResult] = return retFuture try: - socket.writeQueue.putNoWait(WriteRequest(kind: Data, data: data, writer: retFuture)) + socket.eventQueue.putNoWait(SocketEvent(kind: WriteReq, data: data, writer: retFuture)) except AsyncQueueFullError as e: # this should not happen as out write queue is unbounded raiseAssert e.msg return retFuture -template readLoop(body: untyped): untyped = - while true: - let (consumed, done) = body - socket.buffer.shift(consumed) - if done: - break - else: - if not(socket.readingClosed()): - await socket.buffer.wait() - -proc read*(socket: UtpSocket, n: Natural): Future[seq[byte]] {.async.}= - ## Read all bytes `n` bytes from socket ``socket``. - ## - ## This procedure allocates buffer seq[byte] and return it as result. - var bytes = newSeq[byte]() - - if n == 0: - return bytes - - readLoop(): - if socket.readingClosed(): - (0, true) - else: - let count = min(socket.buffer.dataLen(), n - len(bytes)) - bytes.add(socket.buffer.buffer.toOpenArray(0, count - 1)) - (count, len(bytes) == n) - - debug "Read data ", - remote = socket.socketKey, - length = n - - return bytes - -proc read*(socket: UtpSocket): Future[seq[byte]] {.async.}= +proc read*(socket: UtpSocket, n: Natural): Future[seq[byte]] = ## Read all bytes from socket ``socket``. ## ## This procedure allocates buffer seq[byte] and return it as result. 
- var bytes = newSeq[byte]() + let fut = newFuture[seq[uint8]]() - readLoop(): - if socket.readingClosed(): - (0, true) - else: - let count = socket.buffer.dataLen() - bytes.add(socket.buffer.buffer.toOpenArray(0, count - 1)) - (count, false) + if socket.readingClosed(): + fut.complete(newSeq[uint8]()) + return fut - debug "Read data ", - remote = socket.socketKey, - length = len(bytes) + try: + socket.eventQueue.putNoWait( + SocketEvent( + kind:ReadReqType, + readReq: ReadReq( + bytesToRead: n, + bytesAvailable: newSeq[uint8](), + reader: fut)) + ) + except AsyncQueueFullError as e: + # should not happen as our write queue is unbounded + raiseAssert e.msg - return bytes + return fut + +proc read*(socket: UtpSocket): Future[seq[byte]] = + ## Read all bytes from socket ``socket``. + ## + ## This procedure allocates buffer seq[byte] and return it as result. + let fut = newFuture[seq[uint8]]() + + if socket.readingClosed(): + fut.complete(newSeq[uint8]()) + return fut + + try: + socket.eventQueue.putNoWait( + SocketEvent( + kind:ReadReqType, + readReq: ReadReq( + bytesToRead: 0, + bytesAvailable: newSeq[uint8](), + reader: fut)) + ) + except AsyncQueueFullError as e: + # should not happen as our write queue is unbounded + raiseAssert e.msg + + return fut # Check how many packets are still in the out going buffer, usefull for tests or # debugging. 
@@ -1686,10 +1735,10 @@ proc numPacketsInOutGoingBuffer*(socket: UtpSocket): int = num # Check how many payload bytes are still in flight -proc numOfBytesInFlight*(socket: UtpSocket): uint32 = socket.sendBufferTracker.currentBytesInFlight() +proc numOfBytesInFlight*(socket: UtpSocket): uint32 = socket.currentWindow # Check how many bytes are in incoming buffer -proc numOfBytesInIncomingBuffer*(socket: UtpSocket): uint32 = uint32(socket.buffer.dataLen()) +proc numOfBytesInIncomingBuffer*(socket: UtpSocket): uint32 = uint32(socket.offset) # Check how many packets are still in the reorder buffer, usefull for tests or # debugging. @@ -1702,6 +1751,8 @@ proc numPacketsInReordedBuffer*(socket: UtpSocket): int = doAssert(num == int(socket.reorderCount)) num +proc numOfEventsInEventQueue*(socket: UtpSocket): int = len(socket.eventQueue) + proc connectionId*[A](socket: UtpSocket[A]): uint16 = ## Connection id is id which is used in first SYN packet which establishes the connection ## so for Outgoing side it is actually its rcv_id, and for Incoming side it is @@ -1714,4 +1765,140 @@ proc connectionId*[A](socket: UtpSocket[A]): uint16 = # Check what is current available window size for this socket proc currentMaxWindowSize*[A](socket: UtpSocket[A]): uint32 = - socket.sendBufferTracker.maxWindow + socket.maxWindow + +proc new[A]( + T: type UtpSocket[A], + to: A, + snd: SendCallback[A], + state: ConnectionState, + cfg: SocketConfig, + direction: ConnectionDirection, + rcvId: uint16, + sndId: uint16, + initialSeqNr: uint16, + initialAckNr: uint16, + initialTimeout: Duration +): T = + let currentTime = getMonoTimestamp().moment + T( + remoteAddress: to, + state: state, + direction: direction, + socketConfig: cfg, + connectionIdRcv: rcvId, + connectionIdSnd: sndId, + seqNr: initialSeqNr, + ackNr: initialAckNr, + connectionFuture: newFuture[void](), + outBuffer: GrowableCircularBuffer[OutgoingPacket].init(), + outBufferBytes: 0, + currentWindow: 0, + # start with 1mb assumption, 
field will be updated with first received packet + maxRemoteWindow: 1024 * 1024, + maxWindow: startMaxWindow, + inBuffer: GrowableCircularBuffer[Packet].init(), + retransmitTimeout: initialTimeout, + rtoTimeout: currentTime + initialTimeout, + # Initial timeout values taken from reference implemntation + rtt: milliseconds(0), + rttVar: milliseconds(800), + rto: milliseconds(3000), + rcvBuffer: newSeq[uint8](int(cfg.optRcvBuffer)), + pendingReads: initDeque[ReadReq](), + closeEvent: newAsyncEvent(), + closeCallbacks: newSeq[Future[void]](), + pendingWrites: initDeque[WriteRequest](), + eventQueue: newAsyncQueue[SocketEvent](), + zeroWindowTimer: none[Moment](), + socketKey: UtpSocketKey.init(to, rcvId), + slowStart: true, + fastTimeout: false, + fastResendSeqNr: initialSeqNr, + lastWindowDecay: currentTime - maxWindowDecay, + slowStartTreshold: cfg.optSndBuffer, + ourHistogram: DelayHistogram.init(currentTime), + remoteHistogram: DelayHistogram.init(currentTime), + driftCalculator: ClockDriftCalculator.init(currentTime), + send: snd + ) + +proc newOutgoingSocket*[A]( + to: A, + snd: SendCallback[A], + cfg: SocketConfig, + rcvConnectionId: uint16, + rng: var BrHmacDrbgContext +): UtpSocket[A] = + let sndConnectionId = rcvConnectionId + 1 + let initialSeqNr = randUint16(rng) + + UtpSocket[A].new( + to, + snd, + SynSent, + cfg, + Outgoing, + rcvConnectionId, + sndConnectionId, + initialSeqNr, + # Initialy ack nr is 0, as we do not know remote inital seqnr + 0, + cfg.initialSynTimeout + ) + +proc newIncomingSocket*[A]( + to: A, + snd: SendCallback[A], + cfg: SocketConfig, + connectionId: uint16, + ackNr: uint16, + rng: var BrHmacDrbgContext +): UtpSocket[A] = + let initialSeqNr = randUint16(rng) + + let (initialState, initialTimeout) = + if (cfg.incomingSocketReceiveTimeout.isNone()): + # it does not matter what timeout value we put here, as socket will be in + # connected state without outgoing packets in buffer so any timeout hit will + # just double rto without any 
penalties + # although we cannont use 0, as then timeout will be constantly re-set to 500ms + # and there will be a lot of not usefull work done + (Connected, defaultInitialSynTimeout) + else: + let timeout = cfg.incomingSocketReceiveTimeout.unsafeGet() + (SynRecv, timeout) + + UtpSocket[A].new( + to, + snd, + initialState, + cfg, + Incoming, + connectionId + 1, + connectionId, + initialSeqNr, + ackNr, + initialTimeout + ) + +proc startIncomingSocket*(socket: UtpSocket) = + # Make sure ack was flushed before moving forward + socket.sendAck() + socket.startEventLoop() + socket.startTimeoutLoop() + +proc startOutgoingSocket*(socket: UtpSocket): Future[void] = + doAssert(socket.state == SynSent) + let packet = synPacket(socket.seqNr, socket.connectionIdRcv, socket.getRcvWindowSize()) + debug "Sending SYN packet", + seqNr = packet.header.seqNr, + connectionId = packet.header.connectionId + # set number of transmissions to 1 as syn packet will be send just after + # initiliazation + let outgoingPacket = OutgoingPacket.init(encodePacket(packet), 1, false, 0) + socket.registerOutgoingPacket(outgoingPacket) + socket.startEventLoop() + socket.startTimeoutLoop() + socket.sendData(outgoingPacket.packetBytes) + return socket.connectionFuture diff --git a/tests/utp/test_protocol_integration.nim b/tests/utp/test_protocol_integration.nim index 43e712f..d3121a9 100644 --- a/tests/utp/test_protocol_integration.nim +++ b/tests/utp/test_protocol_integration.nim @@ -123,11 +123,10 @@ procSuite "Utp protocol over udp tests with loss and delays": let testCases = @[ TestCase.init(45, 10, 40000), - TestCase.init(45, 15, 40000), - TestCase.init(50, 20, 20000), + TestCase.init(25, 15, 40000), # super small recv buffer which will be constantly on the brink of being full - TestCase.init(15, 5, 80000, SocketConfig.init(optRcvBuffer = uint32(2000), remoteWindowResetTimeout = seconds(5))), - TestCase.init(15, 10, 80000, SocketConfig.init(optRcvBuffer = uint32(2000), remoteWindowResetTimeout = 
seconds(5))) + TestCase.init(15, 5, 40000, SocketConfig.init(optRcvBuffer = uint32(6000), remoteWindowResetTimeout = seconds(5))), + TestCase.init(15, 10, 40000, SocketConfig.init(optRcvBuffer = uint32(6000), remoteWindowResetTimeout = seconds(5))) ] asyncTest "Write and Read large data in different network conditions": @@ -173,9 +172,9 @@ procSuite "Utp protocol over udp tests with loss and delays": let testCases1 = @[ # small buffers so it will fill up between reads - TestCase.init(15, 5, 60000, SocketConfig.init(optRcvBuffer = uint32(2000), remoteWindowResetTimeout = seconds(5)), 10000), - TestCase.init(15, 10, 60000, SocketConfig.init(optRcvBuffer = uint32(2000), remoteWindowResetTimeout = seconds(5)), 10000), - TestCase.init(15, 15, 60000, SocketConfig.init(optRcvBuffer = uint32(2000), remoteWindowResetTimeout = seconds(5)), 10000) + TestCase.init(15, 5, 40000, SocketConfig.init(optRcvBuffer = uint32(6000), remoteWindowResetTimeout = seconds(5)), 10000), + TestCase.init(15, 10, 40000, SocketConfig.init(optRcvBuffer = uint32(6000), remoteWindowResetTimeout = seconds(5)), 10000), + TestCase.init(15, 15, 40000, SocketConfig.init(optRcvBuffer = uint32(6000), remoteWindowResetTimeout = seconds(5)), 10000) ] proc readWithMultipleReads(s: UtpSocket[TransportAddress], numOfReads: int, bytesPerRead: int): Future[seq[byte]] {.async.}= diff --git a/tests/utp/test_utils.nim b/tests/utp/test_utils.nim index 30209ce..556b558 100644 --- a/tests/utp/test_utils.nim +++ b/tests/utp/test_utils.nim @@ -44,7 +44,7 @@ template connectOutGoingSocket*( ) await sock1.processPacket(responseAck) - + await waitUntil(proc (): bool = sock1.isConnected()) check: sock1.isConnected() @@ -72,12 +72,14 @@ template connectOutGoingSocketWithIncoming*( rng[] ) - await incomingSocket.startIncomingSocket() + incomingSocket.startIncomingSocket() let responseAck = await incomingQueue.get() await outgoingSocket.processPacket(responseAck) - + + await waitUntil(proc (): bool = 
outgoingSocket.isConnected()) + check: outgoingSocket.isConnected() diff --git a/tests/utp/test_utp_router.nim b/tests/utp/test_utp_router.nim index 11b8cc2..75b6816 100644 --- a/tests/utp/test_utp_router.nim +++ b/tests/utp/test_utp_router.nim @@ -188,6 +188,8 @@ procSuite "Utp router unit tests": await router.processIncomingBytes(encodedData, testSender) + await waitUntil(proc (): bool = socket.numOfEventsInEventQueue() == 0) + check: socket.isConnected() @@ -350,8 +352,8 @@ procSuite "Utp router unit tests": check: connectResult.isErr() - connectResult.error().kind == ErrorWhileSendingSyn - cast[TestError](connectResult.error().error) is TestError + # even though send is failing we will just finish with timeout, + connectResult.error().kind == ConnectionTimedOut router.len() == 0 asyncTest "Router should clear closed outgoing connections": diff --git a/tests/utp/test_utp_socket.nim b/tests/utp/test_utp_socket.nim index fb43436..fbae5f8 100644 --- a/tests/utp/test_utp_socket.nim +++ b/tests/utp/test_utp_socket.nim @@ -295,7 +295,6 @@ procSuite "Utp socket unit test": await outgoingSocket.destroyWait() asyncTest "Ignoring totally out of order packet": - # TODO test is valid until implementing selective acks let q = newAsyncQueue[Packet]() let initalRemoteSeqNr = 10'u16 @@ -305,11 +304,11 @@ procSuite "Utp socket unit test": await outgoingSocket.processPacket(packets[1024]) - check: - outgoingSocket.numPacketsInReordedBuffer() == 0 - await outgoingSocket.processPacket(packets[1023]) + # give some time to process those packets + await sleepAsync(milliseconds(500)) + check: outgoingSocket.numPacketsInReordedBuffer() == 1 @@ -349,6 +348,8 @@ procSuite "Utp socket unit test": await outgoingSocket.processPacket(responseAck) + await waitUntil(proc (): bool = outgoingSocket.numPacketsInOutGoingBuffer() == 0) + check: outgoingSocket.numPacketsInOutGoingBuffer() == 0 @@ -427,7 +428,7 @@ procSuite "Utp socket unit test": let dataToWrite1 = @[0'u8] let dataToWrite2 = @[1'u8] 
- let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q, 0) + let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q, cfg = SocketConfig.init(optSndBuffer = 0)) let writeFut1 = outgoingSocket.write(dataToWrite1) let writeFut2 = outgoingSocket.write(dataToWrite2) @@ -531,6 +532,8 @@ procSuite "Utp socket unit test": await outgoingSocket.processPacket(responseAck) + await waitUntil(proc (): bool = outgoingSocket.isConnected()) + check: outgoingSocket.isConnected() @@ -768,6 +771,8 @@ procSuite "Utp socket unit test": await outgoingSocket.processPacket(responseAck) + await waitUntil(proc (): bool = not outgoingSocket.isConnected()) + check: not outgoingSocket.isConnected() @@ -1005,6 +1010,8 @@ procSuite "Utp socket unit test": await outgoingSocket.processPacket(responseAck) + await waitUntil(proc (): bool = int(outgoingSocket.numOfBytesInFlight) == len(dataToWrite)) + check: # only first packet has been acked so there should still by 5 bytes left int(outgoingSocket.numOfBytesInFlight) == len(dataToWrite) @@ -1052,18 +1059,18 @@ procSuite "Utp socket unit test": let q = newAsyncQueue[Packet]() let initialRemoteSeq = 10'u16 - let dataToWrite = @[1'u8, 2, 3, 4, 5] + let dataToWrite = generateByteArray(rng[], 1001) # remote is initialized with buffer to small to handle whole payload - let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q, uint32(len(dataToWrite) - 1)) + let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q, cfg = SocketConfig.init(optSndBuffer = 1000)) let writeFut = outgoingSocket.write(dataToWrite) # wait some time to check future is not finished await sleepAsync(seconds(2)) - # write is not finished as future is blocked from progressing due to to small - # send window + # write is not finished as future is blocked from progressing due to to full + # send buffer check: not writeFut.finished() @@ -1071,20 +1078,18 @@ procSuite "Utp socket unit test": 
ackPacket( initialRemoteSeq, initialPacket.header.connectionId, - initialPacket.header.seqNr, - uint32(len(dataToWrite)), + initialPacket.header.seqNr + 1, + testBufferSize, 0 ) await outgoingSocket.processPacket(someAckFromRemote) - # after processing packet with increased buffer size write should complete and - # packet should be sent - let sentPacket = await q.get() + # only after processing ack write will progress + let writeResult = await writeFut check: - sentPacket.payload == dataToWrite - writeFut.finished() + writeResult.isOK() await outgoingSocket.destroyWait() @@ -1092,30 +1097,21 @@ procSuite "Utp socket unit test": let q = newAsyncQueue[Packet]() let initialRemoteSeq = 10'u16 - let dataToWrite = @[1'u8, 2, 3, 4, 5] - + let dataToWirte = 1160 # remote is initialized with buffer to small to handle whole payload - let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q) - let remoteRcvWindowSize = uint32(outgoingSocket.getPacketSize()) - let someAckFromRemote = - ackPacket( - initialRemoteSeq, - initialPacket.header.connectionId, - initialPacket.header.seqNr, - remoteRcvWindowSize, - 0 - ) + let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q, cfg = SocketConfig.init(optSndBuffer = 1160)) - # we are using ack from remote to setup our snd window size to one packet size on one packet - await outgoingSocket.processPacket(someAckFromRemote) + let twoPacketData = generateByteArray(rng[], int(dataToWirte)) - let twoPacketData = generateByteArray(rng[], int(2 * remoteRcvWindowSize)) + let writeResult = await outgoingSocket.write(twoPacketData) + check: + writeResult.isOk() + + # this write will not progress as snd buffer is full let writeFut = outgoingSocket.write(twoPacketData) - # after this time first packet will be send and will timeout, but the write should not - # finish, as timeouting packets do not notify writing about new space in snd - # buffer + # we wait for packets to timeout await 
sleepAsync(seconds(2)) check: @@ -1162,15 +1158,22 @@ procSuite "Utp socket unit test": check: packet.header.pType == ST_DATA uint32(len(packet.payload)) == remoteRcvWindowSize - not writeFut.finished + + let packet1Fut = q.get() + + await sleepAsync(milliseconds(500)) + + check: + not packet1Fut.finished() await outgoingSocket.processPacket(firstAckFromRemote) - let packet1 = await q.get() - let writeResult = await writeFut + # packet is sent only after first packet is acked + let packet1 = await packet1Fut check: packet1.header.pType == ST_DATA + packet1.header.seqNr == packet.header.seqNr + 1 writeFut.finished await outgoingSocket.destroyWait() @@ -1192,19 +1195,10 @@ procSuite "Utp socket unit test": check: outgoingSocket.isConnected() - let writeFut = outgoingSocket.write(someData) - - await sleepAsync(seconds(1)) - - check: - # Even after 1 second write is not finished as we did not receive any message - # so remote rcv window is still zero - not writeFut.finished() - - # Ultimately, after 3 second remote rcv window will be reseted to minimal value - # and write will be able to progress - let writeResult = await writeFut - + # write result will be successfull as send buffer has space + let writeResult = await outgoingSocket.write(someData) + + # this will finish in seconds(3) as only after this time window will be set to min value let p = await q.get() check: diff --git a/tests/utp/test_utp_socket_sack.nim b/tests/utp/test_utp_socket_sack.nim index 0433797..567d7e7 100644 --- a/tests/utp/test_utp_socket_sack.nim +++ b/tests/utp/test_utp_socket_sack.nim @@ -48,6 +48,8 @@ procSuite "Utp socket selective acks unit test": for p in dataPackets: await outgoingSocket.processPacket(p) + await waitUntil(proc (): bool = outgoingSocket.numOfEventsInEventQueue() == 0) + let extArray = outgoingSocket.generateSelectiveAckBitMask() await outgoingSocket.destroyWait() @@ -170,6 +172,7 @@ procSuite "Utp socket selective acks unit test": for toDeliver in 
testCase.packetsDelivered: await incomingSocket.processPacket(packets[toDeliver]) + await waitUntil(proc (): bool = incomingSocket.numOfEventsInEventQueue() == 0) return (outgoingSocket, incomingSocket, packets) @@ -248,8 +251,12 @@ procSuite "Utp socket selective acks unit test": await outgoingSocket.processPacket(finalAck) + let expectedPackets = testCase.numOfPackets - len(testCase.packetsDelivered) + + await waitUntil(proc (): bool = outgoingSocket.numPacketsInOutGoingBuffer() == expectedPackets) + check: - outgoingSocket.numPacketsInOutGoingBuffer() == testCase.numOfPackets - len(testCase.packetsDelivered) + outgoingSocket.numPacketsInOutGoingBuffer() == expectedPackets await outgoingSocket.destroyWait() await incomingSocket.destroyWait() @@ -299,6 +306,8 @@ procSuite "Utp socket selective acks unit test": await outgoingSocket.processPacket(finalAck) + await waitUntil(proc (): bool = outgoingSocket.numOfEventsInEventQueue() == 0) + for idx in testCase.packetsResent: let resentPacket = await outgoingQueue.get() check: