From fd4f78d1c0d23766d18140e8bc0eace2671e3aef Mon Sep 17 00:00:00 2001 From: KonradStaniec Date: Mon, 25 Oct 2021 09:58:13 +0200 Subject: [PATCH] Add timeout loop (#416) * Modify outbuffer Each element of outbuffer keeps encoded packet ,number of transmissions of givern packet and information if given packet needs to be re-send. * Add initial handling of timeouts * Add tests for syn re-sends --- eth/utp/growable_buffer.nim | 25 ++- eth/utp/utp.nim | 2 +- eth/utp/utp_protocol.nim | 389 +++++++++++++++++++++++++++++++----- tests/utp/test_buffer.nim | 38 ++++ tests/utp/test_protocol.nim | 48 +++++ 5 files changed, 449 insertions(+), 53 deletions(-) diff --git a/eth/utp/growable_buffer.nim b/eth/utp/growable_buffer.nim index 7ba4194..07c0fe6 100644 --- a/eth/utp/growable_buffer.nim +++ b/eth/utp/growable_buffer.nim @@ -1,5 +1,5 @@ import - std/[options, math] + std/[options, math, sugar] export options @@ -10,9 +10,10 @@ export options # utp implementation. # Another alternative would be to use standard deque from deques module, and caluclate # item indexes from their sequence numbers. -type GrowableCircularBuffer*[A] = object - items: seq[Option[A]] - mask: int +type + GrowableCircularBuffer*[A] = object + items: seq[Option[A]] + mask: int # provided size will always be adjusted to next power of two proc init*[A](T: type GrowableCircularBuffer[A], size: Natural = 16): T = @@ -34,6 +35,22 @@ proc put*[A](buff: var GrowableCircularBuffer[A], i: Natural, elem: A) = proc delete*[A](buff: var GrowableCircularBuffer[A], i: Natural) = buff.putImpl(i, none[A]()) +proc hasKey*[A](buff: GrowableCircularBuffer[A], i: Natural): bool = + buff.get(i).isSome() + +proc exists*[A](buff: GrowableCircularBuffer[A], i: Natural, check: proc (x: A): bool): bool = + let maybeElem = buff.get(i) + if (maybeElem.isSome()): + let elem = maybeElem.unsafeGet() + check(elem) + else: + false + +proc `[]`*[A](buff: var GrowableCircularBuffer[A], i: Natural): var A = + ## Returns contents of the `var GrowableCircularBuffer`. If it is not set, then an exception + ## is thrown. + buff.items[i and buff.mask].get() + proc len*[A](buff: GrowableCircularBuffer[A]): int = buff.mask + 1 diff --git a/eth/utp/utp.nim b/eth/utp/utp.nim index 9c4dd51..8875598 100644 --- a/eth/utp/utp.nim +++ b/eth/utp/utp.nim @@ -37,7 +37,7 @@ when isMainModule: let helloUtp = "Helllo from nim implementation" let bytes = helloUtp.toBytes() - waitFor soc.write(bytes) + discard waitFor soc.write(bytes) runForever() diff --git a/eth/utp/utp_protocol.nim b/eth/utp/utp_protocol.nim index 24898f3..7fbf8f3 100644 --- a/eth/utp/utp_protocol.nim +++ b/eth/utp/utp_protocol.nim @@ -7,7 +7,7 @@ {.push raises: [Defect].} import - std/[tables, options, hashes], + std/[tables, options, hashes, sugar, math], chronos, chronicles, bearssl, ./packets, ./growable_buffer, @@ -30,6 +30,12 @@ type UtpSocketKey = object remoteAddress: TransportAddress rcvId: uint16 + + OutgoingPacket = object + packetBytes: seq[byte] + transmissions: uint16 + needResend: bool + timeSent: Moment UtpSocket* = ref object remoteAddress*: TransportAddress @@ -43,10 +49,8 @@ type # All seq number up to this havve been correctly acked by us ackNr: uint16 - # Should be completed after succesful connection to remote host. - # TODO check if nim gc handles properly cyclic references, as this future will - # contain reference to socket which hold this future. - # If that is not the case, then this future will need to be hold independly + # Should be completed after succesful connection to remote host or after timeout + # for the first syn packet connectionFuture: Future[UtpSocket] # the number of packets in the send queue. Packets that haven't @@ -55,14 +59,40 @@ type curWindowPackets: uint16 # out going buffer for all send packets - outBuffer: GrowableCircularBuffer[Packet] + outBuffer: GrowableCircularBuffer[OutgoingPacket] # incoming buffer for out of order packets inBuffer: GrowableCircularBuffer[Packet] + # current retransmit Timeout used to calculate rtoTimeout + retransmitTimeout: Duration + + # calculated round trip time during communication with remote peer + rtt: Duration + # calculated round trip time variance + rttVar: Duration + # Round trip timeout dynamicaly updated based on acks received from remote + # peer + rto: Duration + + # RTO timeout will happen when currenTime > rtoTimeout + rtoTimeout: Moment + # rcvBuffer buffer: AsyncBuffer + # loop called every 500ms to check for on going timeout status + checkTimeoutsLoop: Future[void] + + # number on consecutive re-transsmisions + retransmitCount: uint32 + + # Event which will complete whenever socket gets in destory statate + closeEvent: AsyncEvent + + # All callback to be called whenever socket gets in destroy state + closeCallbacks: seq[Future[void]] + utpProt: UtpProtocol UtpSocketsContainerRef = ref object @@ -71,6 +101,12 @@ type AckResult = enum PacketAcked, PacketAlreadyAcked, PacketNotSentYet + SocketConfig* = object + # This is configurable (in contrast to reference impl), as with standard 2 syn resends + # default timeout set to 3seconds and doubling of timeout with each re-send, it + # means that initial connection would timeout after 21s, which seems rather long + initialSynTimeout*: Duration + # For now utp protocol is tied to udp transport, but ultimatly we would like to # abstract underlying transport to be able to run utp over udp, discoveryv5 or # maybe some test transport @@ -78,14 +114,21 @@ type transport: DatagramTransport activeSockets: UtpSocketsContainerRef acceptConnectionCb: AcceptConnectionCallback + socketConfig: SocketConfig rng*: ref BrHmacDrbgContext - ## New remote client connection callback - ## ``server`` - UtpProtocol object. - ## ``client`` - accepted client utp socket. + # New remote client connection callback + # ``server`` - UtpProtocol object. + # ``client`` - accepted client utp socket. AcceptConnectionCallback* = proc(server: UtpProtocol, client: UtpSocket): Future[void] {.gcsafe, raises: [Defect].} + + # Callback to be called whenever socket is closed + SocketCloseCallback = proc (): void {.gcsafe, raises: [Defect].} + + ConnectionError* = object of CatchableError + const # Maximal number of payload bytes per packet. Total packet size will be equal to # mtuSize + sizeof(header) = 600 bytes @@ -93,12 +136,33 @@ const # adjusted based on traffic. mtuSize = 580 + # How often each socket check its different on going timers + checkTimeoutsLoopInterval = milliseconds(500) + + # Defualt initial timeout for first Syn packet + defaultInitialSynTimeout = milliseconds(3000) + + # Initial timeout to receive first Data data packet after receiving initial Syn + # packet. (TODO it should only be set when working over udp) + initialRcvRetransmitTimeout = milliseconds(10000) + proc new(T: type UtpSocketsContainerRef): T = UtpSocketsContainerRef(sockets: initTable[UtpSocketKey, UtpSocket]()) proc init(T: type UtpSocketKey, remoteAddress: TransportAddress, rcvId: uint16): T = UtpSocketKey(remoteAddress: remoteAddress, rcvId: rcvId) +proc init(T: type OutgoingPacket, packetBytes: seq[byte], transmissions: uint16, needResend: bool, timeSent: Moment = Moment.now()): T = + OutgoingPacket( + packetBytes: packetBytes, + transmissions: transmissions, + needResend: needResend, + timeSent: timeSent + ) + +proc init*(T: type SocketConfig, initialSynTimeout: Duration = defaultInitialSynTimeout): T = + SocketConfig(initialSynTimeout: initialSynTimeout) + # This should probably be defined in TransportAddress module, as hash function should # be consitent with equality function # in nim zero arrays always have hash equal to 0, irrespectively of array size, to @@ -132,6 +196,17 @@ proc hash(x: UtpSocketKey): Hash = h = h !& x.rcvId.hash !$h +proc setCloseCallback(s: UtpSocket, cb: SocketCloseCallback) {.async.} = + ## Set callback which will be called whenever the socket is permanently closed + try: + await s.closeEvent.wait() + cb() + except CancelledError: + trace "closeCallback cancelled" + +proc registerCloseCallback*(s: UtpSocket, cb: SocketCloseCallback) = + s.closeCallbacks.add(s.setCloseCallback(cb)) + proc getUtpSocket(s: UtpSocketsContainerRef, k: UtpSocketKey): Option[UtpSocket] = let s = s.sockets.getOrDefault(k) if s == nil: @@ -143,11 +218,23 @@ proc registerUtpSocket(s: UtpSocketsContainerRef, k: UtpSocketKey, socket: UtpSo # TODO Handle duplicates s.sockets[k] = socket -proc initOutgoingSocket(to: TransportAddress, p: UtpProtocol, rng: var BrHmacDrbgContext): UtpSocket = +proc deRegisterUtpSocket(s: UtpSocketsContainerRef, k: UtpSocketKey) = + s.sockets.del(k) + +iterator allSockets(s: UtpSocketsContainerRef): UtpSocket = + for socket in s.sockets.values(): + yield socket + +proc len(s: UtpSocketsContainerRef): int = + len(s.sockets) + +# TODO extract similiar code between Outgoinhg and Incoming socket initialization +proc initOutgoingSocket(to: TransportAddress, p: UtpProtocol, cfg: SocketConfig, rng: var BrHmacDrbgContext): UtpSocket = # TODO handle possible clashes and overflows let rcvConnectionId = randUint16(rng) let sndConnectionId = rcvConnectionId + 1 let initialSeqNr = randUint16(rng) + UtpSocket( remoteAddress: to, state: SynSent, @@ -155,11 +242,19 @@ proc initOutgoingSocket(to: TransportAddress, p: UtpProtocol, rng: var BrHmacDrb connectionIdSnd: sndConnectionId, seqNr: initialSeqNr, connectionFuture: newFuture[UtpSocket](), - outBuffer: GrowableCircularBuffer[Packet].init(), + outBuffer: GrowableCircularBuffer[OutgoingPacket].init(), inBuffer: GrowableCircularBuffer[Packet].init(), + retransmitTimeout: cfg.initialSynTimeout, + rtoTimeout: Moment.now() + cfg.initialSynTimeout, + # Initial timeout values taken from reference implemntation + rtt: milliseconds(0), + rttVar: milliseconds(800), + rto: milliseconds(3000), # Default 1MB buffer # TODO add posibility to configure buffer size buffer: AsyncBuffer.init(1024 * 1024), + closeEvent: newAsyncEvent(), + closeCallbacks: newSeq[Future[void]](), utpProt: p ) @@ -173,11 +268,19 @@ proc initIncomingSocket(to: TransportAddress, p: UtpProtocol, connectionId: uin seqNr: initialSeqNr, ackNr: ackNr, connectionFuture: newFuture[UtpSocket](), - outBuffer: GrowableCircularBuffer[Packet].init(), + outBuffer: GrowableCircularBuffer[OutgoingPacket].init(), inBuffer: GrowableCircularBuffer[Packet].init(), + retransmitTimeout: initialRcvRetransmitTimeout, + rtoTimeout: Moment.now() + initialRcvRetransmitTimeout, + # Initial timeout values taken from reference implemntation + rtt: milliseconds(0), + rttVar: milliseconds(800), + rto: milliseconds(3000), # Default 1MB buffer # TODO add posibility to configure buffer size buffer: AsyncBuffer.init(1024 * 1024), + closeEvent: newAsyncEvent(), + closeCallbacks: newSeq[Future[void]](), utpProt: p ) @@ -185,16 +288,68 @@ proc createAckPacket(socket: UtpSocket): Packet = ## Creates ack packet based on the socket current state ackPacket(socket.seqNr, socket.connectionIdSnd, socket.ackNr, 1048576) +proc max(a, b: Duration): Duration = + if (a > b): + a + else: + b + +proc updateTimeouts(socket: UtpSocket, timeSent: Moment, currentTime: Moment) = + ## Update timeouts according to spec: + ## delta = rtt - packet_rtt + ## rtt_var += (abs(delta) - rtt_var) / 4; + ## rtt += (packet_rtt - rtt) / 8; + + let packetRtt = currentTime - timeSent + + if (socket.rtt.isZero): + socket.rtt = packetRtt + socket.rttVar = packetRtt div 2 + else: + let packetRttMicro = packetRtt.microseconds() + let rttVarMicro = socket.rttVar.microseconds() + let rttMicro = socket.rtt.microseconds() + + let delta = rttMicro - packetRttMicro + + let newVar = microseconds(rttVarMicro + (abs(delta) - rttVarMicro) div 4) + let newRtt = socket.rtt - (socket.rtt div 8) + (packetRtt div 8) + + socket.rttVar = newVar + socket.rtt = newRtt + + # according to spec it should be: timeout = max(rtt + rtt_var * 4, 500) + # but usually spec lags after implementation so milliseconds(1000) is used + socket.rto = max(socket.rtt + (socket.rttVar * 4), milliseconds(1000)) + proc ackPacket(socket: UtpSocket, seqNr: uint16): AckResult = let packetOpt = socket.outBuffer.get(seqNr) if packetOpt.isSome(): let packet = packetOpt.get() - # TODO Add number of transmision to each packet to track which packet was sent - # how many times, and handle here case when we try to ack packet which was not - # sent yet + + if packet.transmissions == 0: + # according to reference impl it can happen when we get an ack_nr that + # does not exceed what we have stuffed into the outgoing buffer, + # but does exceed what we have sent + # TODO analyze if this case can happen with our impl + return PacketNotSentYet + + let currentTime = Moment.now() + socket.outBuffer.delete(seqNr) - # TODO Update estimates about roundtrip time, when we are acking packed which - # acked without re sends + + # from spec: The rtt and rtt_var is only updated for packets that were sent only once. + # This avoids problems with figuring out which packet was acked, the first or the second one. + # it is standard solution to retransmission ambiguity problem + if packet.transmissions == 1: + socket.updateTimeouts(packet.timeSent, currentTime) + + socket.retransmitTimeout = socket.rto + socket.rtoTimeout = currentTime + socket.rto + + # TODO Add handlig of decreasing bytes window, whenadding handling of congestion control + + socket.retransmitCount = 0 PacketAcked else: # the packet has already been acked (or not sent) @@ -218,15 +373,6 @@ proc ackPackets(socket: UtpSocket, nrPacketsToAck: uint16) = proc getSocketKey(socket: UtpSocket): UtpSocketKey = UtpSocketKey.init(socket.remoteAddress, socket.connectionIdRcv) -proc initSynPacket(socket: UtpSocket): seq[byte] = - assert(socket.state == SynSent) - let packet = synPacket(socket.seqNr, socket.connectionIdRcv, 1048576) - socket.outBuffer.ensureSize(socket.seqNr, socket.curWindowPackets) - socket.outBuffer.put(socket.seqNr, packet) - inc socket.seqNr - inc socket.curWindowPackets - encodePacket(packet) - proc isConnected*(socket: UtpSocket): bool = socket.state == Connected @@ -258,21 +404,31 @@ proc sendData(socket: UtpSocket, data: seq[byte]): Future[void] = proc sendPacket(socket: UtpSocket, packet: Packet): Future[void] = socket.sendData(encodePacket(packet)) +# Should be called before flushing data onto the socket +proc setSend(p: var OutgoingPacket): seq[byte] = + inc p.transmissions + p.needResend = false + p.timeSent = Moment.now() + return p.packetBytes + proc flushPackets(socket: UtpSocket) {.async.} = var i: uint16 = socket.seqNr - socket.curWindowPackets while i != socket.seqNr: - let maybePacket = socket.outBuffer.get(i) - if (maybePacket.isSome()): - let p = maybePacket.get() - # TODO we should keep encoded packets in outgoing buffer to avoid, re-encoding - # them with each resend - await socket.sendData(encodePacket(p)) + # sending only packet which were not transmitted yet or need a resend + let shouldSendPacket = socket.outBuffer.exists(i, (p: OutgoingPacket) => (p.transmissions == 0 or p.needResend == true)) + if (shouldSendPacket): + let toSend = setSend(socket.outBuffer[i]) + await socket.sendData(toSend) inc i proc getPacketSize(socket: UtpSocket): int = # TODO currently returning constant, ultimatly it should be bases on mtu estimates mtuSize - + +proc resetSendTimeout(socket: UtpSocket) = + socket.retransmitTimeout = socket.rto + socket.rtoTimeout = Moment.now() + socket.retransmitTimeout + proc write*(socket: UtpSocket, data: seq[byte]): Future[int] {.async.} = var bytesWritten = 0 # TODO @@ -283,6 +439,9 @@ proc write*(socket: UtpSocket, data: seq[byte]): Future[int] {.async.} = if len(data) == 0: return bytesWritten + if socket.curWindowPackets == 0: + socket.resetSendTimeout() + let pSize = socket.getPacketSize() let endIndex = data.high() var i = 0 @@ -292,7 +451,7 @@ proc write*(socket: UtpSocket, data: seq[byte]): Future[int] {.async.} = let dataSlice = data[i..lastOrEnd] let dataPacket = dataPacket(socket.seqNr, socket.connectionIdSnd, socket.ackNr, 1048576, dataSlice) socket.outBuffer.ensureSize(socket.seqNr, socket.curWindowPackets) - socket.outBuffer.put(socket.seqNr, dataPacket) + socket.outBuffer.put(socket.seqNr, OutgoingPacket.init(encodePacket(dataPacket), 0, false)) inc socket.seqNr inc socket.curWindowPackets bytesWritten = bytesWritten + len(dataSlice) @@ -317,6 +476,95 @@ proc read*(socket: UtpSocket, n: Natural): Future[seq[byte]] {.async.}= return bytes +proc isOpened(socket:UtpSocket): bool = + return ( + socket.state == SynRecv or + socket.state == SynSent or + socket.state == Connected or + socket.state == ConnectedFull + ) + +proc markAllPacketAsLost(s: UtpSocket) = + var i = 0'u16 + while i < s.curWindowPackets: + + let packetSeqNr = s.seqNr - 1 - i + if (s.outBuffer.exists(packetSeqNr, (p: OutgoingPacket) => p. transmissions > 0 and p.needResend == false)): + s.outBuffer[packetSeqNr].needResend = true + # TODO here we should also decrease number of bytes in flight. This should be + # done when working on congestion control + + inc i + +proc checkTimeouts(socket: UtpSocket) {.async.} = + let currentTime = Moment.now() + # flush all packets which needs to be re-send + if socket.state != Destroy: + await socket.flushPackets() + + if socket.isOpened(): + if (currentTime > socket.rtoTimeout): + + # TODO add handling of probe time outs. Reference implemenation has mechanism + # of sending probes to determine mtu size. Probe timeouts do not count to standard + # timeouts calculations + + # client initiated connections, but did not send following data packet in rto + # time. TODO this should be configurable + if (socket.state == SynRecv): + socket.state = Destroy + socket.closeEvent.fire() + return + + if (socket.state == SynSent and socket.retransmitCount >= 2) or (socket.retransmitCount >= 4): + if socket.state == SynSent and (not socket.connectionFuture.finished()): + # TODO standard stream interface result in failed future in case of failed connections, + # but maybe it would be more clean to use result + socket.connectionFuture.fail(newException(ConnectionError, "Connection to peer timed out")) + + socket.state = Destroy + socket.closeEvent.fire() + return + + let newTimeout = socket.retransmitTimeout * 2 + socket.retransmitTimeout = newTimeout + socket.rtoTimeout = currentTime + newTimeout + + # TODO Add handling of congestion control + + # This will have much more sense when we will add handling of selective acks + # as then every selecivly acked packet restes timeout timer and removes packet + # from out buffer. + markAllPacketAsLost(socket) + + # resend oldest packet if there are some packets in flight + if (socket.curWindowPackets > 0): + notice "resending oldest packet in outBuffer" + inc socket.retransmitCount + let oldestPacketSeqNr = socket.seqNr - socket.curWindowPackets + # TODO add handling of fast timeout + + doAssert( + socket.outBuffer.get(oldestPacketSeqNr).isSome(), + "oldest packet should always be available when there is data in flight" + ) + let dataToSend = setSend(socket.outBuffer[oldestPacketSeqNr]) + await socket.sendData(dataToSend) + + # TODO add sending keep alives when necessary + +proc checkTimeoutsLoop(s: UtpSocket) {.async.} = + ## Loop that check timeoutsin the socket. + try: + while true: + await sleepAsync(checkTimeoutsLoopInterval) + await s.checkTimeouts() + except CancelledError: + trace "checkTimeoutsLoop canceled" + +proc startTimeoutLoop(s: UtpSocket) = + s.checkTimeoutsLoop = checkTimeoutsLoop(s) + proc processPacket(prot: UtpProtocol, p: Packet, sender: TransportAddress) {.async.}= notice "Received packet ", packet = p let socketKey = UtpSocketKey.init(sender, p.header.connectionId) @@ -364,7 +612,6 @@ proc processPacket(prot: UtpProtocol, p: Packet, sender: TransportAddress) {.asy else: # TODO handle out of order packets notice "Got out of order packet" - of ST_FIN: # TODO not implemented notice "Received ST_FIN on known socket" @@ -372,10 +619,15 @@ proc processPacket(prot: UtpProtocol, p: Packet, sender: TransportAddress) {.asy notice "Received ST_STATE on known socket" # acks is the number of packets that was acked, in normal case - no selective # acks, no losses, no resends, it will usually be equal to 1 - let acks = pkAckNr - (socket.seqNr - 1 - socket.curWindowPackets) + var acks = pkAckNr - (socket.seqNr - 1 - socket.curWindowPackets) + + if acks > socket.curWindowPackets: + # this case happens if the we already received this ack nr + acks = 0 + socket.ackPackets(acks) - if (socket.state == SynSent): + if (socket.state == SynSent and (not socket.connectionFuture.finished())): socket.state = Connected # TODO reference implementation sets ackNr (p.header.seqNr - 1), although # spec mention that it should be equal p.header.seqNr. For now follow the @@ -401,10 +653,14 @@ proc processPacket(prot: UtpProtocol, p: Packet, sender: TransportAddress) {.asy if (p.header.pType == ST_SYN): # Initial ackNr is set to incoming packer seqNr let incomingSocket = initIncomingSocket(sender, prot, p.header.connectionId, p.header.seqNr, prot.rng[]) - prot.activeSockets.registerUtpSocket(incomingSocket.getSocketKey(), incomingSocket) + let socketKey = incomingSocket.getSocketKey() + prot.activeSockets.registerUtpSocket(socketKey, incomingSocket) + # whenever socket get permanently closed, deregister it + incomingSocket.registerCloseCallback(proc () = prot.activeSockets.deRegisterUtpSocket(socketKey)) # Make sure ack was flushed onto datagram socket before passing connction # to upper layer await incomingSocket.sendPacket(incomingSocket.createAckPacket()) + incomingSocket.startTimeoutLoop() # TODO By default (when we have utp over udp) socket here is passed to upper layer # in SynRecv state, which is not writeable i.e user of socket cannot write # data to it unless some data will be received. This is counter measure to @@ -417,18 +673,45 @@ proc processPacket(prot: UtpProtocol, p: Packet, sender: TransportAddress) {.asy # TODO not implemented notice "Received not ST_SYN and socket is not know" +proc initSynPacket(socket: UtpSocket): OutgoingPacket = + ## creates syncPacket based on socket current state and put it in its outgoing + ## buffer + doAssert(socket.state == SynSent) + let packet = synPacket(socket.seqNr, socket.connectionIdRcv, 1048576) + # set number of transmissions to 1 as syn packet will be send just after + # initiliazation + let outgoingPacket = OutgoingPacket.init(encodePacket(packet), 1, false) + socket.outBuffer.ensureSize(socket.seqNr, socket.curWindowPackets) + socket.outBuffer.put(socket.seqNr, outgoingPacket) + inc socket.seqNr + inc socket.curWindowPackets + outgoingPacket + +proc openSockets*(p: UtpProtocol): int = + ## Returns number of currently active sockets + len(p.activeSockets) + +proc close*(s: UtpSocket) = + # TODO Rething all this when working on FIN and RESET packets and proper handling + # of resources + s.checkTimeoutsLoop.cancel() + s.closeEvent.fire() + # Connect to provided address # Reference implementation: https://github.com/bittorrent/libutp/blob/master/utp_internal.cpp#L2732 -# TODO not implemented proc connectTo*(p: UtpProtocol, address: TransportAddress): Future[UtpSocket] = - let socket = initOutgoingSocket(address, p, p.rng[]) - p.activeSockets.registerUtpSocket(socket.getSocketKey(), socket) - let synEncoded = socket.initSynPacket() - notice "Sending packet", packet = synEncoded + let socket = initOutgoingSocket(address, p, p.socketConfig, p.rng[]) + let socketKey = socket.getSocketKey() + p.activeSockets.registerUtpSocket(socketKey, socket) + # whenever socket get permanently closed, deregister it + socket.registerCloseCallback(proc () = p.activeSockets.deRegisterUtpSocket(socketKey)) + var outgoingSyn = socket.initSynPacket() + notice "Sending syn packet packet", packet = outgoingSyn # TODO add callback to handle errors and cancellation i.e unregister socket on # send error and finish connection future with failure # sending should be done from UtpSocketContext - discard socket.sendData(synEncoded) + discard socket.sendData(outgoingSyn.packetBytes) + socket.startTimeoutLoop() return socket.connectionFuture proc processDatagram(transp: DatagramTransport, raddr: TransportAddress): @@ -449,14 +732,24 @@ proc processDatagram(transp: DatagramTransport, raddr: TransportAddress): proc new*( T: type UtpProtocol, acceptConnectionCb: AcceptConnectionCallback, - address: TransportAddress, + address: TransportAddress, + socketConfig: SocketConfig = SocketConfig.init(), rng = newRng()): UtpProtocol {.raises: [Defect, CatchableError].} = doAssert(not(isNil(acceptConnectionCb))) let activeSockets = UtpSocketsContainerRef.new() - let utp = UtpProtocol(activeSockets: activeSockets, acceptConnectionCb: acceptConnectionCb, rng: rng) + let utp = UtpProtocol( + activeSockets: activeSockets, + acceptConnectionCb: acceptConnectionCb, + socketConfig: socketConfig, + rng: rng + ) let ta = newDatagramTransport(processDatagram, udata = utp, local = address) utp.transport = ta utp -proc closeWait*(p: UtpProtocol): Future[void] = - p.transport.closeWait() +proc closeWait*(p: UtpProtocol): Future[void] {.async.} = + # TODO Rething all this when working on FIN and RESET packets and proper handling + # of resources + await p.transport.closeWait() + for s in p.activeSockets.allSockets(): + s.close() diff --git a/tests/utp/test_buffer.nim b/tests/utp/test_buffer.nim index 0610036..bf942d1 100644 --- a/tests/utp/test_buffer.nim +++ b/tests/utp/test_buffer.nim @@ -7,9 +7,14 @@ {.used.} import + std/sugar, unittest, ../../eth/utp/growable_buffer + +type TestObj = object + foo: string + suite "Utp ring buffer": test "Empty buffer": let buff = GrowableCircularBuffer[int].init(size = 4) @@ -30,6 +35,39 @@ suite "Utp ring buffer": buff.get(13) == some(13) buff.get(14) == some(14) + test "Modifing existing element in buffer": + var buff = GrowableCircularBuffer[TestObj].init(size = 4) + let oldText = "test" + let newText = "testChanged" + + buff.put(11, TestObj(foo: oldText)) + + check: + buff.get(11).get() == TestObj(foo: oldText) + + buff[11].foo = newText + + check: + buff.get(11).get() == TestObj(foo: newText) + + test "Checking if element exists and has some properties": + var buff = GrowableCircularBuffer[TestObj].init(size = 4) + let text = "test" + let textIdx = 11 + + check: + not buff.exists(textIdx, x => x.foo == text) + + buff.put(textIdx, TestObj(foo: "old")) + + check: + not buff.exists(textIdx, x => x.foo == text) + + buff[textIdx].foo = text + + check: + buff.exists(textIdx, x => x.foo == text) + test "Deleting elements from buffer": var buff = GrowableCircularBuffer[int].init(size = 4) buff.put(11, 11) diff --git a/tests/utp/test_protocol.nim b/tests/utp/test_protocol.nim index c14b4ed..2784b1c 100644 --- a/tests/utp/test_protocol.nim +++ b/tests/utp/test_protocol.nim @@ -80,6 +80,54 @@ procSuite "Utp protocol tests": await utpProt1.closeWait() await utpProt2.closeWait() + asyncTest "Fail to connect to offline remote host": + let server1Called = newAsyncEvent() + let address = initTAddress("127.0.0.1", 9079) + let utpProt1 = UtpProtocol.new(setAcceptedCallback(server1Called), address , SocketConfig.init(milliseconds(200))) + + let address1 = initTAddress("127.0.0.1", 9080) + + let fut = utpProt1.connectTo(address1) + + yield fut + + check: + fut.failed() + + await waitUntil(proc (): bool = utpProt1.openSockets() == 0) + + check: + utpProt1.openSockets() == 0 + + await utpProt1.closeWait() + + asyncTest "Success connect to remote host which initialy was offline": + let server1Called = newAsyncEvent() + let address = initTAddress("127.0.0.1", 9079) + let utpProt1 = UtpProtocol.new(setAcceptedCallback(server1Called), address, SocketConfig.init(milliseconds(500))) + + let address1 = initTAddress("127.0.0.1", 9080) + + let futSock = utpProt1.connectTo(address1) + + # waiting 400 milisecond will trigger at least one re-send + await sleepAsync(milliseconds(400)) + + var server2Called = newAsyncEvent() + let utpProt2 = UtpProtocol.new(setAcceptedCallback(server2Called), address1) + + # this future will be completed when we called accepted connection callback + await server2Called.wait() + + yield futSock + + check: + futSock.finished() and (not futsock.failed()) and (not futsock.cancelled()) + server2Called.isSet() + + await utpProt1.closeWait() + await utpProt2.closeWait() + asyncTest "Success data transfer when data fits into one packet": var server1Called = newAsyncEvent() let address = initTAddress("127.0.0.1", 9079)