From d4cc42241df92fc661fb9cdfe08048715db793cd Mon Sep 17 00:00:00 2001 From: KonradStaniec Date: Thu, 4 Nov 2021 07:38:46 +0100 Subject: [PATCH] Add handling of out of order packets (#418) * Add handling of out of order packets --- eth/utp/utp.nim | 9 +- eth/utp/utp_router.nim | 2 +- eth/utp/utp_socket.nim | 163 +++++++++++--- tests/utp/all_utp_tests.nim | 3 +- tests/utp/test_discv5_protocol.nim | 4 +- tests/utp/test_protocol.nim | 16 +- tests/utp/test_utils.nim | 18 ++ tests/utp/test_utp_socket.nim | 328 +++++++++++++++++++++++++++++ 8 files changed, 492 insertions(+), 51 deletions(-) create mode 100644 tests/utp/test_utils.nim create mode 100644 tests/utp/test_utp_socket.nim diff --git a/eth/utp/utp.nim b/eth/utp/utp.nim index 02004ec..6fa8e58 100644 --- a/eth/utp/utp.nim +++ b/eth/utp/utp.nim @@ -8,6 +8,7 @@ import chronos, stew/byteutils, + ./utp_router, ./utp_socket, ./utp_protocol @@ -18,9 +19,9 @@ import # 3. make # 4. ./ucat -ddddd -l -p 9078 - it will run utp server on port 9078 when isMainModule: - proc echoIncomingSocketCallBack(): AcceptConnectionCallback = + proc echoIncomingSocketCallBack(): AcceptConnectionCallback[TransportAddress] = return ( - proc (server: UtpProtocol, client: UtpSocket): Future[void] {.gcsafe, raises: [Defect].} = + proc (server: UtpRouter[TransportAddress], client: UtpSocket[TransportAddress]): Future[void] {.gcsafe, raises: [Defect].} = echo "received incoming connection" let fakeFuture = newFuture[void]() fakeFuture.complete() @@ -40,6 +41,10 @@ when isMainModule: discard waitFor soc.write(bytes) + waitFor(sleepAsync(milliseconds(1000))) + + discard waitFor soc.write(bytes) + runForever() waitFor utpProt.closeWait() diff --git a/eth/utp/utp_router.nim b/eth/utp/utp_router.nim index 3ff09fa..7fcb266 100644 --- a/eth/utp/utp_router.nim +++ b/eth/utp/utp_router.nim @@ -83,7 +83,7 @@ proc processPacket[A](r: UtpRouter[A], p: Packet, sender: A) {.async.}= else: notice "Received SYN for not known connection. Initiating incoming connection" # Initial ackNr is set to incoming packer seqNr - let incomingSocket = initIncomingSocket[A](sender, r.sendCb, p.header.connectionId, p.header.seqNr, r.rng[]) + let incomingSocket = initIncomingSocket[A](sender, r.sendCb, r.socketConfig ,p.header.connectionId, p.header.seqNr, r.rng[]) r.registerUtpSocket(incomingSocket) await incomingSocket.startIncomingSocket() # TODO By default (when we have utp over udp) socket here is passed to upper layer diff --git a/eth/utp/utp_socket.nim b/eth/utp/utp_socket.nim index c2e82db..75de7b6 100644 --- a/eth/utp/utp_socket.nim +++ b/eth/utp/utp_socket.nim @@ -24,6 +24,9 @@ type Reset, Destroy + ConnectionDirection = enum + Outgoing, Incoming + UtpSocketKey*[A] = object remoteAddress*: A rcvId*: uint16 @@ -40,9 +43,22 @@ type # Socket callback to send data to remote peer SendCallback*[A] = proc (to: A, data: seq[byte]): Future[void] {.gcsafe, raises: [Defect]} + SocketConfig* = object + # This is configurable (in contrast to reference impl), as with standard 2 syn resends + # default timeout set to 3seconds and doubling of timeout with each re-send, it + # means that initial connection would timeout after 21s, which seems rather long + initialSynTimeout*: Duration + + # Number of resend re-tries of each data packet, before daclaring connection + # failed + dataResendsBeforeFailure*: uint16 + UtpSocket*[A] = ref object remoteAddress*: A state: ConnectionState + direction: ConnectionDirection + socketConfig: SocketConfig + # Connection id for packets we receive connectionIdRcv: uint16 # Connection id for packets we send @@ -67,6 +83,9 @@ type # incoming buffer for out of order packets inBuffer: GrowableCircularBuffer[Packet] + # Number of packets waiting in reorder buffer + reorderCount: uint16 + # current retransmit Timeout used to calculate rtoTimeout retransmitTimeout: Duration @@ -101,12 +120,6 @@ type send: SendCallback[A] - SocketConfig* = object - # This is configurable (in contrast to reference impl), as with standard 2 syn resends - # default timeout set to 3seconds and doubling of timeout with each re-send, it - # means that initial connection would timeout after 21s, which seems rather long - initialSynTimeout*: Duration - # User driven call back to be called whenever socket is permanently closed i.e # reaches destroy state SocketCloseCallback* = proc (): void {.gcsafe, raises: [Defect].} @@ -130,6 +143,12 @@ const # packet. (TODO it should only be set when working over udp) initialRcvRetransmitTimeout = milliseconds(10000) + # Number of times each data packet will be resend before declaring connection + # dead. 4 is taken from reference implementation + defaultDataResendsBeforeFailure = 4'u16 + + reorderBufferMaxSize = 1024 + proc init*[A](T: type UtpSocketKey, remoteAddress: A, rcvId: uint16): T = UtpSocketKey[A](remoteAddress: remoteAddress, rcvId: rcvId) @@ -141,8 +160,15 @@ proc init(T: type OutgoingPacket, packetBytes: seq[byte], transmissions: uint16, timeSent: timeSent ) -proc init*(T: type SocketConfig, initialSynTimeout: Duration = defaultInitialSynTimeout): T = - SocketConfig(initialSynTimeout: initialSynTimeout) +proc init*( + T: type SocketConfig, + initialSynTimeout: Duration = defaultInitialSynTimeout, + dataResendsBeforeFailure: uint16 = defaultDataResendsBeforeFailure + ): T = + SocketConfig( + initialSynTimeout: initialSynTimeout, + dataResendsBeforeFailure: dataResendsBeforeFailure + ) proc registerOutgoingPacket(socket: UtpSocket, oPacket: OutgoingPacket) = ## Adds packet to outgoing buffer and updates all related fields @@ -208,6 +234,10 @@ proc isOpened(socket:UtpSocket): bool = socket.state == ConnectedFull ) +proc shouldDisconnectFromFailedRemote(socket: UtpSocket): bool = + (socket.state == SynSent and socket.retransmitCount >= 2) or + (socket.retransmitCount >= socket.socketConfig.dataResendsBeforeFailure) + proc checkTimeouts(socket: UtpSocket) {.async.} = let currentTime = Moment.now() # flush all packets which needs to be re-send @@ -228,7 +258,7 @@ proc checkTimeouts(socket: UtpSocket) {.async.} = socket.closeEvent.fire() return - if (socket.state == SynSent and socket.retransmitCount >= 2) or (socket.retransmitCount >= 4): + if socket.shouldDisconnectFromFailedRemote(): if socket.state == SynSent and (not socket.connectionFuture.finished()): # TODO standard stream interface result in failed future in case of failed connections, # but maybe it would be more clean to use result @@ -282,15 +312,24 @@ proc new[A]( to: A, snd: SendCallback[A], state: ConnectionState, - initialTimeout: Duration, + cfg: SocketConfig, + direction: ConnectionDirection, rcvId: uint16, sndId: uint16, initialSeqNr: uint16, initialAckNr: uint16 ): T = + let initialTimeout = + if direction == Outgoing: + cfg.initialSynTimeout + else : + initialRcvRetransmitTimeout + T( remoteAddress: to, state: state, + direction: direction, + socketConfig: cfg, connectionIdRcv: rcvId, connectionIdSnd: sndId, seqNr: initialSeqNr, @@ -328,7 +367,8 @@ proc initOutgoingSocket*[A]( to, snd, SynSent, - cfg.initialSynTimeout, + cfg, + Outgoing, rcvConnectionId, sndConnectionId, initialSeqNr, @@ -339,6 +379,7 @@ proc initOutgoingSocket*[A]( proc initIncomingSocket*[A]( to: A, snd: SendCallback[A], + cfg: SocketConfig, connectionId: uint16, ackNr: uint16, rng: var BrHmacDrbgContext @@ -349,7 +390,8 @@ proc initIncomingSocket*[A]( to, snd, SynRecv, - initialRcvRetransmitTimeout, + cfg, + Incoming, connectionId + 1, connectionId, initialSeqNr, @@ -476,13 +518,45 @@ proc ackPackets(socket: UtpSocket, nrPacketsToAck: uint16) = inc i +proc initializeAckNr(socket: UtpSocket, packetSeqNr: uint16) = + if (socket.state == SynSent): + socket.ackNr = packetSeqNr - 1 + # TODO at socket level we should handle only FIN/DATA/ACK packets. Refactor to make # it enforcable by type system +# TODO re-think synchronization of this procedure, as each await inside gives control +# to scheduler which means there could be potentialy several processPacket procs +# running proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = ## Updates socket state based on received packet, and sends ack when necessary. ## Shoyuld be called in main packet receiving loop let pkSeqNr = p.header.seqNr let pkAckNr = p.header.ackNr + + socket.initializeAckNr(pkSeqNr) + + # number of packets past the expected + # ack_nr is the last acked, seq_nr is the + # current. Subtracring 1 makes 0 mean "this is the next expected packet" + let pastExpected = pkSeqNr - socket.ackNr - 1 + + # acks is the number of packets that was acked, in normal case - no selective + # acks, no losses, no resends, it will usually be equal to 1 + # we can calculate it here and not only for ST_STATE packet, as each utp + # packet has info about remote side last acked packet. + var acks = pkAckNr - (socket.seqNr - 1 - socket.curWindowPackets) + + if acks > socket.curWindowPackets: + # this case happens if the we already received this ack nr + acks = 0 + + # If packet is totally of the mark short circout the processing + if pastExpected >= reorderBufferMaxSize: + notice "Received packet is totally of the mark" + return + + socket.ackPackets(acks) + case p.header.pType of ST_DATA: # To avoid amplification attacks, server socket is in SynRecv state until @@ -493,46 +567,62 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = socket.state = Connected notice "Received ST_DATA on known socket" - # number of packets past the expected - # ack_nr is the last acked, seq_nr is the - # current. Subtracring 1 makes 0 mean "this is the next expected packet" - let pastExpected = pkSeqNr - socket.ackNr - 1 if (pastExpected == 0): # we are getting in order data packet, we can flush data directly to the incoming buffer await upload(addr socket.buffer, unsafeAddr p.payload[0], p.payload.len()) - # TODO handle the case when there may be some packets in incoming buffer which - # are direct extension of this packet and therefore we could pass also their - # content to upper layer. This may need to be done when handling selective - # acks. - # Bytes have been passed to upper layer, we can increase number of last # acked packet inc socket.ackNr + # check if the following packets are in reorder buffer + while true: + if socket.reorderCount == 0: + break + + # TODO Handle case when we have reached eof becouse of fin packet + let nextPacketNum = socket.ackNr + 1 + let maybePacket = socket.inBuffer.get(nextPacketNum) + + if maybePacket.isNone(): + break + + let packet = maybePacket.unsafeGet() + + await upload(addr socket.buffer, unsafeAddr packet.payload[0], packet.payload.len()) + + socket.inBuffer.delete(nextPacketNum) + + inc socket.ackNr + dec socket.reorderCount + # TODO for now we just schedule concurrent task with ack sending. It may # need improvement, as with this approach there is no direct control over # how many concurrent tasks there are and how to cancel them when socket # is closed asyncSpawn socket.sendAck() else: - # TODO handle out of order packets + # TODO Handle case when out of order is out of eof range notice "Got out of order packet" + + # growing buffer before checking the packet is already there to avoid + # looking at older packet due to indices wrap aroud + socket.inBuffer.ensureSize(pkSeqNr + 1, pastExpected + 1) + + if (socket.inBuffer.get(pkSeqNr).isSome()): + notice "packet already received" + else: + socket.inBuffer.put(pkSeqNr, p) + inc socket.reorderCount + notice "added out of order packet in reorder buffer" + # TODO for now we do not sent any ack as we do not handle selective acks + # add sending of selective acks of ST_FIN: # TODO not implemented notice "Received ST_FIN on known socket" of ST_STATE: notice "Received ST_STATE on known socket" - # acks is the number of packets that was acked, in normal case - no selective - # acks, no losses, no resends, it will usually be equal to 1 - var acks = pkAckNr - (socket.seqNr - 1 - socket.curWindowPackets) - - if acks > socket.curWindowPackets: - # this case happens if the we already received this ack nr - acks = 0 - - socket.ackPackets(acks) if (socket.state == SynSent and (not socket.connectionFuture.finished())): socket.state = Connected @@ -628,3 +718,14 @@ proc numPacketsInOutGoingBuffer*(socket: UtpSocket): int = inc num doAssert(num == int(socket.curWindowPackets)) num + +# Check how many packets are still in the reorder buffer, usefull for tests or +# debugging. +# It throws assertion error when number of elements in buffer do not equal kept counter +proc numPacketsInReordedBuffer*(socket: UtpSocket): int = + var num = 0 + for e in socket.inBUffer.items(): + if e.isSome(): + inc num + doAssert(num == int(socket.reorderCount)) + num diff --git a/tests/utp/all_utp_tests.nim b/tests/utp/all_utp_tests.nim index a9ff4f2..d1a1feb 100644 --- a/tests/utp/all_utp_tests.nim +++ b/tests/utp/all_utp_tests.nim @@ -10,4 +10,5 @@ import ./test_packets, ./test_protocol, ./test_discv5_protocol, - ./test_buffer + ./test_buffer, + ./test_utp_socket diff --git a/tests/utp/test_discv5_protocol.nim b/tests/utp/test_discv5_protocol.nim index bb192ed..2200bbd 100644 --- a/tests/utp/test_discv5_protocol.nim +++ b/tests/utp/test_discv5_protocol.nim @@ -78,6 +78,7 @@ procSuite "Utp protocol over discovery v5 tests": check: clientSocket.isConnected() + clientSocket.close() await node1.closeWait() await node2.closeWait() @@ -112,6 +113,7 @@ procSuite "Utp protocol over discovery v5 tests": clientSocket.isConnected() serverSocket.isConnected() - + clientSocket.close() + serverSocket.close() await node1.closeWait() await node2.closeWait() diff --git a/tests/utp/test_protocol.nim b/tests/utp/test_protocol.nim index 4f15c8e..08cd279 100644 --- a/tests/utp/test_protocol.nim +++ b/tests/utp/test_protocol.nim @@ -10,16 +10,10 @@ import sequtils, chronos, bearssl, testutils/unittests, + ./test_utils, ../../eth/utp/utp_router, ../../eth/utp/utp_protocol, ../../eth/keys - -proc generateByteArray(rng: var BrHmacDrbgContext, length: int): seq[byte] = - var bytes = newSeq[byte](length) - brHmacDrbgGenerate(rng, bytes) - return bytes - -type AssertionCallback = proc(): bool {.gcsafe, raises: [Defect].} proc setAcceptedCallback(event: AsyncEvent): AcceptConnectionCallback[TransportAddress] = return ( @@ -36,14 +30,6 @@ proc registerIncomingSocketCallback(serverSockets: AsyncQueue): AcceptConnection serverSockets.addLast(client) ) -proc waitUntil(f: AssertionCallback): Future[void] {.async.} = - while true: - let res = f() - if res: - break - else: - await sleepAsync(milliseconds(50)) - proc transferData(sender: UtpSocket[TransportAddress], receiver: UtpSocket[TransportAddress], data: seq[byte]): Future[seq[byte]] {.async.}= let bytesWritten = await sender.write(data) doAssert bytesWritten == len(data) diff --git a/tests/utp/test_utils.nim b/tests/utp/test_utils.nim new file mode 100644 index 0000000..d20e2d7 --- /dev/null +++ b/tests/utp/test_utils.nim @@ -0,0 +1,18 @@ +import + chronos, + ./../eth/keys + +type AssertionCallback = proc(): bool {.gcsafe, raises: [Defect].} + +proc generateByteArray*(rng: var BrHmacDrbgContext, length: int): seq[byte] = + var bytes = newSeq[byte](length) + brHmacDrbgGenerate(rng, bytes) + return bytes + +proc waitUntil*(f: AssertionCallback): Future[void] {.async.} = + while true: + let res = f() + if res: + break + else: + await sleepAsync(milliseconds(50)) diff --git a/tests/utp/test_utp_socket.nim b/tests/utp/test_utp_socket.nim new file mode 100644 index 0000000..657bc52 --- /dev/null +++ b/tests/utp/test_utp_socket.nim @@ -0,0 +1,328 @@ +# Copyright (c) 2020-2021 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.used.} + +import + std/[algorithm, random], + chronos, bearssl, chronicles, + testutils/unittests, + ./test_utils, + ../../eth/utp/utp_router, + ../../eth/utp/packets, + ../../eth/keys + +procSuite "Utp socket unit test": + let rng = newRng() + let testAddress = initTAddress("127.0.0.1", 9079) + let testBufferSize = 1024'u32 + + proc initTestSnd(q: AsyncQueue[Packet]): SendCallback[TransportAddress]= + return ( + proc (to: TransportAddress, bytes: seq[byte]): Future[void] = + let p = decodePacket(bytes).get() + q.addLast(p) + ) + + proc generateDataPackets( + numberOfPackets: uint16, + initialSeqNr: uint16, + connectionId: uint16, + ackNr: uint16, + rng: var BrHmacDrbgContext): seq[Packet] = + let packetSize = 100 + var packets = newSeq[Packet]() + var i = 0'u16 + while i < numberOfPackets: + let packet = dataPacket( + initialSeqNr + i, + connectionId, + ackNr, + testBufferSize, + generateByteArray(rng, packetSize) + ) + packets.add(packet) + + inc i + + packets + + proc packetsToBytes(packets: seq[Packet]): seq[byte] = + var resultBytes = newSeq[byte]() + for p in packets: + resultBytes.add(p.payload) + return resultBytes + + template connectOutGoingSocket(initialRemoteSeq: uint16, q: AsyncQueue[Packet]): (UtpSocket[TransportAddress], Packet) = + let sock1 = initOutgoingSocket[TransportAddress](testAddress, initTestSnd(q), SocketConfig.init(), rng[]) + await sock1.startOutgoingSocket() + let initialPacket = await q.get() + + check: + initialPacket.header.pType == ST_SYN + + let responseAck = ackPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize) + + await sock1.processPacket(responseAck) + + check: + sock1.isConnected() + + (sock1, initialPacket) + + asyncTest "Starting outgoing socket should send Syn packet": + let q = newAsyncQueue[Packet]() + let sock1 = initOutgoingSocket[TransportAddress](testAddress, initTestSnd(q), SocketConfig.init(), rng[]) + await sock1.startOutgoingSocket() + let initialPacket = await q.get() + + check: + initialPacket.header.pType == ST_SYN + + asyncTest "Outgoing socket should re-send syn packet 2 times before declaring failure": + let q = newAsyncQueue[Packet]() + let sock1 = initOutgoingSocket[TransportAddress](testAddress, initTestSnd(q), SocketConfig.init(milliseconds(100)), rng[]) + await sock1.startOutgoingSocket() + let initialPacket = await q.get() + + check: + initialPacket.header.pType == ST_SYN + + let resentSynPacket = await q.get() + + check: + resentSynPacket.header.pType == ST_SYN + + let resentSynPacket1 = await q.get() + + check: + resentSynPacket1.header.pType == ST_SYN + + # next timeout will should disconnect socket + await waitUntil(proc (): bool = sock1.isConnected() == false) + + check: + not sock1.isConnected() + + asyncTest "Processing in order ack should make socket connected": + let q = newAsyncQueue[Packet]() + let initialRemoteSeq = 10'u16 + + discard connectOutGoingSocket(initialRemoteSeq, q) + + asyncTest "Processing in order data packet should upload it to buffer and ack packet": + let q = newAsyncQueue[Packet]() + let initalRemoteSeqNr = 10'u16 + let data = @[1'u8, 2'u8, 3'u8] + + let (outgoingSocket, initialPacket) = connectOutGoingSocket(initalRemoteSeqNr, q) + + let dataP1 = dataPacket(initalRemoteSeqNr, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize, data) + + await outgoingSocket.processPacket(dataP1) + let ack1 = await q.get() + + check: + ack1.header.pType == ST_STATE + ack1.header.ackNr == initalRemoteSeqNr + + let receivedBytes = await outgoingSocket.read(len(data)) + + check: + receivedBytes == data + + asyncTest "Processing out of order data packet should buffer it until receiving in order one": + # TODO test is valid until implementing selective acks + let q = newAsyncQueue[Packet]() + let initalRemoteSeqNr = 10'u16 + + let (outgoingSocket, initialPacket) = connectOutGoingSocket(initalRemoteSeqNr, q) + + var packets = generateDataPackets(10, initalRemoteSeqNr, initialPacket.header.connectionId, initialPacket.header.seqNr, rng[]) + + let data = packetsToBytes(packets) + + # start feeding packets from the last one + packets.reverse() + + for p in packets: + await outgoingSocket.processPacket(p) + + let ack2 = await q.get() + + check: + ack2.header.pType == ST_STATE + # we are acking in one shot whole 10 packets + ack2.header.ackNr == initalRemoteSeqNr + uint16(len(packets) - 1) + + let receivedData = await outgoingSocket.read(len(data)) + + check: + receivedData == data + + asyncTest "Processing out of order data packet should ignore duplicated not ordered packets": + # TODO test is valid until implementing selective acks + let q = newAsyncQueue[Packet]() + let initalRemoteSeqNr = 10'u16 + + let (outgoingSocket, initialPacket) = connectOutGoingSocket(initalRemoteSeqNr, q) + + var packets = generateDataPackets(3, initalRemoteSeqNr, initialPacket.header.connectionId, initialPacket.header.seqNr, rng[]) + + let data = packetsToBytes(packets) + + # start feeding packets from the last one + packets.reverse() + + # Process last packet additional two times, it should be ignored by processing logic + await outgoingSocket.processPacket(packets[0]) + await outgoingSocket.processPacket(packets[0]) + + for p in packets: + await outgoingSocket.processPacket(p) + + let ack2 = await q.get() + + check: + ack2.header.pType == ST_STATE + # we are acking in one shot whole 10 packets + ack2.header.ackNr == initalRemoteSeqNr + uint16(len(packets) - 1) + + let receivedData = await outgoingSocket.read(len(data)) + + check: + receivedData == data + + asyncTest "Processing packets in random order": + # TODO test is valid until implementing selective acks + let q = newAsyncQueue[Packet]() + let initalRemoteSeqNr = 10'u16 + + let (outgoingSocket, initialPacket) = connectOutGoingSocket(initalRemoteSeqNr, q) + + var packets = generateDataPackets(30, initalRemoteSeqNr, initialPacket.header.connectionId, initialPacket.header.seqNr, rng[]) + + let data = packetsToBytes(packets) + + # start feeding packets from the last one + randomize() + packets.shuffle() + + for p in packets: + await outgoingSocket.processPacket(p) + + let receivedData = await outgoingSocket.read(len(data)) + + check: + # with packets totally out of order we cannont assert on acks + # as they can be fired at any point. What matters is that data is passed + # in same order as received. + receivedData == data + + asyncTest "Ignoring totally out of order packet": + # TODO test is valid until implementing selective acks + let q = newAsyncQueue[Packet]() + let initalRemoteSeqNr = 10'u16 + + let (outgoingSocket, initialPacket) = connectOutGoingSocket(initalRemoteSeqNr, q) + + var packets = generateDataPackets(1025, initalRemoteSeqNr, initialPacket.header.connectionId, initialPacket.header.seqNr, rng[]) + + await outgoingSocket.processPacket(packets[1024]) + + check: + outgoingSocket.numPacketsInReordedBuffer() == 0 + + await outgoingSocket.processPacket(packets[1023]) + + check: + outgoingSocket.numPacketsInReordedBuffer() == 1 + + asyncTest "Writing small enough data should produce 1 data packet": + let q = newAsyncQueue[Packet]() + let initialRemoteSeq = 10'u16 + + let dataToWrite = @[1'u8] + + let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q) + + let bytesWritten = await outgoingSocket.write(dataToWrite) + + check: + bytesWritten == len(dataToWrite) + + let sentPacket = await q.get() + + check: + outgoingSocket.numPacketsInOutGoingBuffer() == 1 + sentPacket.header.pType == ST_DATA + sentPacket.header.seqNr == initialPacket.header.seqNr + 1 + sentPacket.payload == dataToWrite + + # ackNr in state packet, is set to sentPacket.header.seqNr which means remote + # side processed out packet + let responseAck = ackPacket(initialRemoteSeq, initialPacket.header.connectionId, sentPacket.header.seqNr, testBufferSize) + + await outgoingSocket.processPacket(responseAck) + + check: + outgoingSocket.numPacketsInOutGoingBuffer() == 0 + + asyncTest "Socket should re-send data packet configurable number of times before declaring failure": + let q = newAsyncQueue[Packet]() + let initalRemoteSeqNr = 10'u16 + + let outgoingSocket = initOutgoingSocket[TransportAddress](testAddress, initTestSnd(q), SocketConfig.init(milliseconds(50), 2), rng[]) + await outgoingSocket.startOutgoingSocket() + let initialPacket = await q.get() + + check: + initialPacket.header.pType == ST_SYN + + let responseAck = ackPacket(initalRemoteSeqNr, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize) + + await outgoingSocket.processPacket(responseAck) + + check: + outgoingSocket.isConnected() + + let dataToWrite = @[1'u8] + + let bytesWritten = await outgoingSocket.write(dataToWrite) + + check: + bytesWritten == len(dataToWrite) + + let sentPacket = await q.get() + + check: + outgoingSocket.numPacketsInOutGoingBuffer() == 1 + sentPacket.header.pType == ST_DATA + sentPacket.header.seqNr == initialPacket.header.seqNr + 1 + sentPacket.payload == dataToWrite + + let reSend1 = await q.get() + + check: + outgoingSocket.numPacketsInOutGoingBuffer() == 1 + reSend1.header.pType == ST_DATA + reSend1.header.seqNr == initialPacket.header.seqNr + 1 + reSend1.payload == dataToWrite + + let reSend2 = await q.get() + + check: + outgoingSocket.numPacketsInOutGoingBuffer() == 1 + reSend2.header.pType == ST_DATA + reSend2.header.seqNr == initialPacket.header.seqNr + 1 + reSend2.payload == dataToWrite + + # next timeout will should disconnect socket + await waitUntil(proc (): bool = outgoingSocket.isConnected() == false) + + check: + not outgoingSocket.isConnected() + len(q) == 0