Add handling of out of order packets (#418)

* Add handling of out of order packets
This commit is contained in:
KonradStaniec 2021-11-04 07:38:46 +01:00 committed by GitHub
parent 34bac6e703
commit d4cc42241d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 492 additions and 51 deletions

View File

@ -8,6 +8,7 @@
import import
chronos, stew/byteutils, chronos, stew/byteutils,
./utp_router,
./utp_socket, ./utp_socket,
./utp_protocol ./utp_protocol
@ -18,9 +19,9 @@ import
# 3. make # 3. make
# 4. ./ucat -ddddd -l -p 9078 - it will run utp server on port 9078 # 4. ./ucat -ddddd -l -p 9078 - it will run utp server on port 9078
when isMainModule: when isMainModule:
proc echoIncomingSocketCallBack(): AcceptConnectionCallback = proc echoIncomingSocketCallBack(): AcceptConnectionCallback[TransportAddress] =
return ( return (
proc (server: UtpProtocol, client: UtpSocket): Future[void] {.gcsafe, raises: [Defect].} = proc (server: UtpRouter[TransportAddress], client: UtpSocket[TransportAddress]): Future[void] {.gcsafe, raises: [Defect].} =
echo "received incoming connection" echo "received incoming connection"
let fakeFuture = newFuture[void]() let fakeFuture = newFuture[void]()
fakeFuture.complete() fakeFuture.complete()
@ -40,6 +41,10 @@ when isMainModule:
discard waitFor soc.write(bytes) discard waitFor soc.write(bytes)
waitFor(sleepAsync(milliseconds(1000)))
discard waitFor soc.write(bytes)
runForever() runForever()
waitFor utpProt.closeWait() waitFor utpProt.closeWait()

View File

@ -83,7 +83,7 @@ proc processPacket[A](r: UtpRouter[A], p: Packet, sender: A) {.async.}=
else: else:
notice "Received SYN for not known connection. Initiating incoming connection" notice "Received SYN for not known connection. Initiating incoming connection"
# Initial ackNr is set to incoming packer seqNr # Initial ackNr is set to incoming packer seqNr
let incomingSocket = initIncomingSocket[A](sender, r.sendCb, p.header.connectionId, p.header.seqNr, r.rng[]) let incomingSocket = initIncomingSocket[A](sender, r.sendCb, r.socketConfig ,p.header.connectionId, p.header.seqNr, r.rng[])
r.registerUtpSocket(incomingSocket) r.registerUtpSocket(incomingSocket)
await incomingSocket.startIncomingSocket() await incomingSocket.startIncomingSocket()
# TODO By default (when we have utp over udp) socket here is passed to upper layer # TODO By default (when we have utp over udp) socket here is passed to upper layer

View File

@ -24,6 +24,9 @@ type
Reset, Reset,
Destroy Destroy
ConnectionDirection = enum
Outgoing, Incoming
UtpSocketKey*[A] = object UtpSocketKey*[A] = object
remoteAddress*: A remoteAddress*: A
rcvId*: uint16 rcvId*: uint16
@ -40,9 +43,22 @@ type
# Socket callback to send data to remote peer # Socket callback to send data to remote peer
SendCallback*[A] = proc (to: A, data: seq[byte]): Future[void] {.gcsafe, raises: [Defect]} SendCallback*[A] = proc (to: A, data: seq[byte]): Future[void] {.gcsafe, raises: [Defect]}
SocketConfig* = object
# This is configurable (in contrast to reference impl), as with standard 2 syn resends
# default timeout set to 3seconds and doubling of timeout with each re-send, it
# means that initial connection would timeout after 21s, which seems rather long
initialSynTimeout*: Duration
# Number of resend re-tries of each data packet, before daclaring connection
# failed
dataResendsBeforeFailure*: uint16
UtpSocket*[A] = ref object UtpSocket*[A] = ref object
remoteAddress*: A remoteAddress*: A
state: ConnectionState state: ConnectionState
direction: ConnectionDirection
socketConfig: SocketConfig
# Connection id for packets we receive # Connection id for packets we receive
connectionIdRcv: uint16 connectionIdRcv: uint16
# Connection id for packets we send # Connection id for packets we send
@ -67,6 +83,9 @@ type
# incoming buffer for out of order packets # incoming buffer for out of order packets
inBuffer: GrowableCircularBuffer[Packet] inBuffer: GrowableCircularBuffer[Packet]
# Number of packets waiting in reorder buffer
reorderCount: uint16
# current retransmit Timeout used to calculate rtoTimeout # current retransmit Timeout used to calculate rtoTimeout
retransmitTimeout: Duration retransmitTimeout: Duration
@ -101,12 +120,6 @@ type
send: SendCallback[A] send: SendCallback[A]
SocketConfig* = object
# This is configurable (in contrast to reference impl), as with standard 2 syn resends
# default timeout set to 3seconds and doubling of timeout with each re-send, it
# means that initial connection would timeout after 21s, which seems rather long
initialSynTimeout*: Duration
# User driven call back to be called whenever socket is permanently closed i.e # User driven call back to be called whenever socket is permanently closed i.e
# reaches destroy state # reaches destroy state
SocketCloseCallback* = proc (): void {.gcsafe, raises: [Defect].} SocketCloseCallback* = proc (): void {.gcsafe, raises: [Defect].}
@ -130,6 +143,12 @@ const
# packet. (TODO it should only be set when working over udp) # packet. (TODO it should only be set when working over udp)
initialRcvRetransmitTimeout = milliseconds(10000) initialRcvRetransmitTimeout = milliseconds(10000)
# Number of times each data packet will be resend before declaring connection
# dead. 4 is taken from reference implementation
defaultDataResendsBeforeFailure = 4'u16
reorderBufferMaxSize = 1024
proc init*[A](T: type UtpSocketKey, remoteAddress: A, rcvId: uint16): T = proc init*[A](T: type UtpSocketKey, remoteAddress: A, rcvId: uint16): T =
UtpSocketKey[A](remoteAddress: remoteAddress, rcvId: rcvId) UtpSocketKey[A](remoteAddress: remoteAddress, rcvId: rcvId)
@ -141,8 +160,15 @@ proc init(T: type OutgoingPacket, packetBytes: seq[byte], transmissions: uint16,
timeSent: timeSent timeSent: timeSent
) )
proc init*(T: type SocketConfig, initialSynTimeout: Duration = defaultInitialSynTimeout): T = proc init*(
SocketConfig(initialSynTimeout: initialSynTimeout) T: type SocketConfig,
initialSynTimeout: Duration = defaultInitialSynTimeout,
dataResendsBeforeFailure: uint16 = defaultDataResendsBeforeFailure
): T =
SocketConfig(
initialSynTimeout: initialSynTimeout,
dataResendsBeforeFailure: dataResendsBeforeFailure
)
proc registerOutgoingPacket(socket: UtpSocket, oPacket: OutgoingPacket) = proc registerOutgoingPacket(socket: UtpSocket, oPacket: OutgoingPacket) =
## Adds packet to outgoing buffer and updates all related fields ## Adds packet to outgoing buffer and updates all related fields
@ -208,6 +234,10 @@ proc isOpened(socket:UtpSocket): bool =
socket.state == ConnectedFull socket.state == ConnectedFull
) )
proc shouldDisconnectFromFailedRemote(socket: UtpSocket): bool =
(socket.state == SynSent and socket.retransmitCount >= 2) or
(socket.retransmitCount >= socket.socketConfig.dataResendsBeforeFailure)
proc checkTimeouts(socket: UtpSocket) {.async.} = proc checkTimeouts(socket: UtpSocket) {.async.} =
let currentTime = Moment.now() let currentTime = Moment.now()
# flush all packets which needs to be re-send # flush all packets which needs to be re-send
@ -228,7 +258,7 @@ proc checkTimeouts(socket: UtpSocket) {.async.} =
socket.closeEvent.fire() socket.closeEvent.fire()
return return
if (socket.state == SynSent and socket.retransmitCount >= 2) or (socket.retransmitCount >= 4): if socket.shouldDisconnectFromFailedRemote():
if socket.state == SynSent and (not socket.connectionFuture.finished()): if socket.state == SynSent and (not socket.connectionFuture.finished()):
# TODO standard stream interface result in failed future in case of failed connections, # TODO standard stream interface result in failed future in case of failed connections,
# but maybe it would be more clean to use result # but maybe it would be more clean to use result
@ -282,15 +312,24 @@ proc new[A](
to: A, to: A,
snd: SendCallback[A], snd: SendCallback[A],
state: ConnectionState, state: ConnectionState,
initialTimeout: Duration, cfg: SocketConfig,
direction: ConnectionDirection,
rcvId: uint16, rcvId: uint16,
sndId: uint16, sndId: uint16,
initialSeqNr: uint16, initialSeqNr: uint16,
initialAckNr: uint16 initialAckNr: uint16
): T = ): T =
let initialTimeout =
if direction == Outgoing:
cfg.initialSynTimeout
else :
initialRcvRetransmitTimeout
T( T(
remoteAddress: to, remoteAddress: to,
state: state, state: state,
direction: direction,
socketConfig: cfg,
connectionIdRcv: rcvId, connectionIdRcv: rcvId,
connectionIdSnd: sndId, connectionIdSnd: sndId,
seqNr: initialSeqNr, seqNr: initialSeqNr,
@ -328,7 +367,8 @@ proc initOutgoingSocket*[A](
to, to,
snd, snd,
SynSent, SynSent,
cfg.initialSynTimeout, cfg,
Outgoing,
rcvConnectionId, rcvConnectionId,
sndConnectionId, sndConnectionId,
initialSeqNr, initialSeqNr,
@ -339,6 +379,7 @@ proc initOutgoingSocket*[A](
proc initIncomingSocket*[A]( proc initIncomingSocket*[A](
to: A, to: A,
snd: SendCallback[A], snd: SendCallback[A],
cfg: SocketConfig,
connectionId: uint16, connectionId: uint16,
ackNr: uint16, ackNr: uint16,
rng: var BrHmacDrbgContext rng: var BrHmacDrbgContext
@ -349,7 +390,8 @@ proc initIncomingSocket*[A](
to, to,
snd, snd,
SynRecv, SynRecv,
initialRcvRetransmitTimeout, cfg,
Incoming,
connectionId + 1, connectionId + 1,
connectionId, connectionId,
initialSeqNr, initialSeqNr,
@ -476,13 +518,45 @@ proc ackPackets(socket: UtpSocket, nrPacketsToAck: uint16) =
inc i inc i
proc initializeAckNr(socket: UtpSocket, packetSeqNr: uint16) =
if (socket.state == SynSent):
socket.ackNr = packetSeqNr - 1
# TODO at socket level we should handle only FIN/DATA/ACK packets. Refactor to make # TODO at socket level we should handle only FIN/DATA/ACK packets. Refactor to make
# it enforcable by type system # it enforcable by type system
# TODO re-think synchronization of this procedure, as each await inside gives control
# to scheduler which means there could be potentialy several processPacket procs
# running
proc processPacket*(socket: UtpSocket, p: Packet) {.async.} = proc processPacket*(socket: UtpSocket, p: Packet) {.async.} =
## Updates socket state based on received packet, and sends ack when necessary. ## Updates socket state based on received packet, and sends ack when necessary.
## Shoyuld be called in main packet receiving loop ## Shoyuld be called in main packet receiving loop
let pkSeqNr = p.header.seqNr let pkSeqNr = p.header.seqNr
let pkAckNr = p.header.ackNr let pkAckNr = p.header.ackNr
socket.initializeAckNr(pkSeqNr)
# number of packets past the expected
# ack_nr is the last acked, seq_nr is the
# current. Subtracring 1 makes 0 mean "this is the next expected packet"
let pastExpected = pkSeqNr - socket.ackNr - 1
# acks is the number of packets that was acked, in normal case - no selective
# acks, no losses, no resends, it will usually be equal to 1
# we can calculate it here and not only for ST_STATE packet, as each utp
# packet has info about remote side last acked packet.
var acks = pkAckNr - (socket.seqNr - 1 - socket.curWindowPackets)
if acks > socket.curWindowPackets:
# this case happens if the we already received this ack nr
acks = 0
# If packet is totally of the mark short circout the processing
if pastExpected >= reorderBufferMaxSize:
notice "Received packet is totally of the mark"
return
socket.ackPackets(acks)
case p.header.pType case p.header.pType
of ST_DATA: of ST_DATA:
# To avoid amplification attacks, server socket is in SynRecv state until # To avoid amplification attacks, server socket is in SynRecv state until
@ -493,46 +567,62 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} =
socket.state = Connected socket.state = Connected
notice "Received ST_DATA on known socket" notice "Received ST_DATA on known socket"
# number of packets past the expected
# ack_nr is the last acked, seq_nr is the
# current. Subtracring 1 makes 0 mean "this is the next expected packet"
let pastExpected = pkSeqNr - socket.ackNr - 1
if (pastExpected == 0): if (pastExpected == 0):
# we are getting in order data packet, we can flush data directly to the incoming buffer # we are getting in order data packet, we can flush data directly to the incoming buffer
await upload(addr socket.buffer, unsafeAddr p.payload[0], p.payload.len()) await upload(addr socket.buffer, unsafeAddr p.payload[0], p.payload.len())
# TODO handle the case when there may be some packets in incoming buffer which
# are direct extension of this packet and therefore we could pass also their
# content to upper layer. This may need to be done when handling selective
# acks.
# Bytes have been passed to upper layer, we can increase number of last # Bytes have been passed to upper layer, we can increase number of last
# acked packet # acked packet
inc socket.ackNr inc socket.ackNr
# check if the following packets are in reorder buffer
while true:
if socket.reorderCount == 0:
break
# TODO Handle case when we have reached eof becouse of fin packet
let nextPacketNum = socket.ackNr + 1
let maybePacket = socket.inBuffer.get(nextPacketNum)
if maybePacket.isNone():
break
let packet = maybePacket.unsafeGet()
await upload(addr socket.buffer, unsafeAddr packet.payload[0], packet.payload.len())
socket.inBuffer.delete(nextPacketNum)
inc socket.ackNr
dec socket.reorderCount
# TODO for now we just schedule concurrent task with ack sending. It may # TODO for now we just schedule concurrent task with ack sending. It may
# need improvement, as with this approach there is no direct control over # need improvement, as with this approach there is no direct control over
# how many concurrent tasks there are and how to cancel them when socket # how many concurrent tasks there are and how to cancel them when socket
# is closed # is closed
asyncSpawn socket.sendAck() asyncSpawn socket.sendAck()
else: else:
# TODO handle out of order packets # TODO Handle case when out of order is out of eof range
notice "Got out of order packet" notice "Got out of order packet"
# growing buffer before checking the packet is already there to avoid
# looking at older packet due to indices wrap aroud
socket.inBuffer.ensureSize(pkSeqNr + 1, pastExpected + 1)
if (socket.inBuffer.get(pkSeqNr).isSome()):
notice "packet already received"
else:
socket.inBuffer.put(pkSeqNr, p)
inc socket.reorderCount
notice "added out of order packet in reorder buffer"
# TODO for now we do not sent any ack as we do not handle selective acks
# add sending of selective acks
of ST_FIN: of ST_FIN:
# TODO not implemented # TODO not implemented
notice "Received ST_FIN on known socket" notice "Received ST_FIN on known socket"
of ST_STATE: of ST_STATE:
notice "Received ST_STATE on known socket" notice "Received ST_STATE on known socket"
# acks is the number of packets that was acked, in normal case - no selective
# acks, no losses, no resends, it will usually be equal to 1
var acks = pkAckNr - (socket.seqNr - 1 - socket.curWindowPackets)
if acks > socket.curWindowPackets:
# this case happens if the we already received this ack nr
acks = 0
socket.ackPackets(acks)
if (socket.state == SynSent and (not socket.connectionFuture.finished())): if (socket.state == SynSent and (not socket.connectionFuture.finished())):
socket.state = Connected socket.state = Connected
@ -628,3 +718,14 @@ proc numPacketsInOutGoingBuffer*(socket: UtpSocket): int =
inc num inc num
doAssert(num == int(socket.curWindowPackets)) doAssert(num == int(socket.curWindowPackets))
num num
# Check how many packets are still in the reorder buffer, usefull for tests or
# debugging.
# It throws assertion error when number of elements in buffer do not equal kept counter
proc numPacketsInReordedBuffer*(socket: UtpSocket): int =
var num = 0
for e in socket.inBUffer.items():
if e.isSome():
inc num
doAssert(num == int(socket.reorderCount))
num

View File

@ -10,4 +10,5 @@ import
./test_packets, ./test_packets,
./test_protocol, ./test_protocol,
./test_discv5_protocol, ./test_discv5_protocol,
./test_buffer ./test_buffer,
./test_utp_socket

View File

@ -78,6 +78,7 @@ procSuite "Utp protocol over discovery v5 tests":
check: check:
clientSocket.isConnected() clientSocket.isConnected()
clientSocket.close()
await node1.closeWait() await node1.closeWait()
await node2.closeWait() await node2.closeWait()
@ -112,6 +113,7 @@ procSuite "Utp protocol over discovery v5 tests":
clientSocket.isConnected() clientSocket.isConnected()
serverSocket.isConnected() serverSocket.isConnected()
clientSocket.close()
serverSocket.close()
await node1.closeWait() await node1.closeWait()
await node2.closeWait() await node2.closeWait()

View File

@ -10,16 +10,10 @@ import
sequtils, sequtils,
chronos, bearssl, chronos, bearssl,
testutils/unittests, testutils/unittests,
./test_utils,
../../eth/utp/utp_router, ../../eth/utp/utp_router,
../../eth/utp/utp_protocol, ../../eth/utp/utp_protocol,
../../eth/keys ../../eth/keys
proc generateByteArray(rng: var BrHmacDrbgContext, length: int): seq[byte] =
var bytes = newSeq[byte](length)
brHmacDrbgGenerate(rng, bytes)
return bytes
type AssertionCallback = proc(): bool {.gcsafe, raises: [Defect].}
proc setAcceptedCallback(event: AsyncEvent): AcceptConnectionCallback[TransportAddress] = proc setAcceptedCallback(event: AsyncEvent): AcceptConnectionCallback[TransportAddress] =
return ( return (
@ -36,14 +30,6 @@ proc registerIncomingSocketCallback(serverSockets: AsyncQueue): AcceptConnection
serverSockets.addLast(client) serverSockets.addLast(client)
) )
proc waitUntil(f: AssertionCallback): Future[void] {.async.} =
while true:
let res = f()
if res:
break
else:
await sleepAsync(milliseconds(50))
proc transferData(sender: UtpSocket[TransportAddress], receiver: UtpSocket[TransportAddress], data: seq[byte]): Future[seq[byte]] {.async.}= proc transferData(sender: UtpSocket[TransportAddress], receiver: UtpSocket[TransportAddress], data: seq[byte]): Future[seq[byte]] {.async.}=
let bytesWritten = await sender.write(data) let bytesWritten = await sender.write(data)
doAssert bytesWritten == len(data) doAssert bytesWritten == len(data)

18
tests/utp/test_utils.nim Normal file
View File

@ -0,0 +1,18 @@
import
chronos,
./../eth/keys
type AssertionCallback = proc(): bool {.gcsafe, raises: [Defect].}
proc generateByteArray*(rng: var BrHmacDrbgContext, length: int): seq[byte] =
var bytes = newSeq[byte](length)
brHmacDrbgGenerate(rng, bytes)
return bytes
proc waitUntil*(f: AssertionCallback): Future[void] {.async.} =
while true:
let res = f()
if res:
break
else:
await sleepAsync(milliseconds(50))

View File

@ -0,0 +1,328 @@
# Copyright (c) 2020-2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.used.}
import
std/[algorithm, random],
chronos, bearssl, chronicles,
testutils/unittests,
./test_utils,
../../eth/utp/utp_router,
../../eth/utp/packets,
../../eth/keys
procSuite "Utp socket unit test":
let rng = newRng()
let testAddress = initTAddress("127.0.0.1", 9079)
let testBufferSize = 1024'u32
proc initTestSnd(q: AsyncQueue[Packet]): SendCallback[TransportAddress]=
return (
proc (to: TransportAddress, bytes: seq[byte]): Future[void] =
let p = decodePacket(bytes).get()
q.addLast(p)
)
proc generateDataPackets(
numberOfPackets: uint16,
initialSeqNr: uint16,
connectionId: uint16,
ackNr: uint16,
rng: var BrHmacDrbgContext): seq[Packet] =
let packetSize = 100
var packets = newSeq[Packet]()
var i = 0'u16
while i < numberOfPackets:
let packet = dataPacket(
initialSeqNr + i,
connectionId,
ackNr,
testBufferSize,
generateByteArray(rng, packetSize)
)
packets.add(packet)
inc i
packets
proc packetsToBytes(packets: seq[Packet]): seq[byte] =
var resultBytes = newSeq[byte]()
for p in packets:
resultBytes.add(p.payload)
return resultBytes
template connectOutGoingSocket(initialRemoteSeq: uint16, q: AsyncQueue[Packet]): (UtpSocket[TransportAddress], Packet) =
let sock1 = initOutgoingSocket[TransportAddress](testAddress, initTestSnd(q), SocketConfig.init(), rng[])
await sock1.startOutgoingSocket()
let initialPacket = await q.get()
check:
initialPacket.header.pType == ST_SYN
let responseAck = ackPacket(initialRemoteSeq, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize)
await sock1.processPacket(responseAck)
check:
sock1.isConnected()
(sock1, initialPacket)
asyncTest "Starting outgoing socket should send Syn packet":
let q = newAsyncQueue[Packet]()
let sock1 = initOutgoingSocket[TransportAddress](testAddress, initTestSnd(q), SocketConfig.init(), rng[])
await sock1.startOutgoingSocket()
let initialPacket = await q.get()
check:
initialPacket.header.pType == ST_SYN
asyncTest "Outgoing socket should re-send syn packet 2 times before declaring failure":
let q = newAsyncQueue[Packet]()
let sock1 = initOutgoingSocket[TransportAddress](testAddress, initTestSnd(q), SocketConfig.init(milliseconds(100)), rng[])
await sock1.startOutgoingSocket()
let initialPacket = await q.get()
check:
initialPacket.header.pType == ST_SYN
let resentSynPacket = await q.get()
check:
resentSynPacket.header.pType == ST_SYN
let resentSynPacket1 = await q.get()
check:
resentSynPacket1.header.pType == ST_SYN
# next timeout will should disconnect socket
await waitUntil(proc (): bool = sock1.isConnected() == false)
check:
not sock1.isConnected()
asyncTest "Processing in order ack should make socket connected":
let q = newAsyncQueue[Packet]()
let initialRemoteSeq = 10'u16
discard connectOutGoingSocket(initialRemoteSeq, q)
asyncTest "Processing in order data packet should upload it to buffer and ack packet":
let q = newAsyncQueue[Packet]()
let initalRemoteSeqNr = 10'u16
let data = @[1'u8, 2'u8, 3'u8]
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initalRemoteSeqNr, q)
let dataP1 = dataPacket(initalRemoteSeqNr, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize, data)
await outgoingSocket.processPacket(dataP1)
let ack1 = await q.get()
check:
ack1.header.pType == ST_STATE
ack1.header.ackNr == initalRemoteSeqNr
let receivedBytes = await outgoingSocket.read(len(data))
check:
receivedBytes == data
asyncTest "Processing out of order data packet should buffer it until receiving in order one":
# TODO test is valid until implementing selective acks
let q = newAsyncQueue[Packet]()
let initalRemoteSeqNr = 10'u16
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initalRemoteSeqNr, q)
var packets = generateDataPackets(10, initalRemoteSeqNr, initialPacket.header.connectionId, initialPacket.header.seqNr, rng[])
let data = packetsToBytes(packets)
# start feeding packets from the last one
packets.reverse()
for p in packets:
await outgoingSocket.processPacket(p)
let ack2 = await q.get()
check:
ack2.header.pType == ST_STATE
# we are acking in one shot whole 10 packets
ack2.header.ackNr == initalRemoteSeqNr + uint16(len(packets) - 1)
let receivedData = await outgoingSocket.read(len(data))
check:
receivedData == data
asyncTest "Processing out of order data packet should ignore duplicated not ordered packets":
# TODO test is valid until implementing selective acks
let q = newAsyncQueue[Packet]()
let initalRemoteSeqNr = 10'u16
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initalRemoteSeqNr, q)
var packets = generateDataPackets(3, initalRemoteSeqNr, initialPacket.header.connectionId, initialPacket.header.seqNr, rng[])
let data = packetsToBytes(packets)
# start feeding packets from the last one
packets.reverse()
# Process last packet additional two times, it should be ignored by processing logic
await outgoingSocket.processPacket(packets[0])
await outgoingSocket.processPacket(packets[0])
for p in packets:
await outgoingSocket.processPacket(p)
let ack2 = await q.get()
check:
ack2.header.pType == ST_STATE
# we are acking in one shot whole 10 packets
ack2.header.ackNr == initalRemoteSeqNr + uint16(len(packets) - 1)
let receivedData = await outgoingSocket.read(len(data))
check:
receivedData == data
asyncTest "Processing packets in random order":
# TODO test is valid until implementing selective acks
let q = newAsyncQueue[Packet]()
let initalRemoteSeqNr = 10'u16
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initalRemoteSeqNr, q)
var packets = generateDataPackets(30, initalRemoteSeqNr, initialPacket.header.connectionId, initialPacket.header.seqNr, rng[])
let data = packetsToBytes(packets)
# start feeding packets from the last one
randomize()
packets.shuffle()
for p in packets:
await outgoingSocket.processPacket(p)
let receivedData = await outgoingSocket.read(len(data))
check:
# with packets totally out of order we cannont assert on acks
# as they can be fired at any point. What matters is that data is passed
# in same order as received.
receivedData == data
asyncTest "Ignoring totally out of order packet":
# TODO test is valid until implementing selective acks
let q = newAsyncQueue[Packet]()
let initalRemoteSeqNr = 10'u16
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initalRemoteSeqNr, q)
var packets = generateDataPackets(1025, initalRemoteSeqNr, initialPacket.header.connectionId, initialPacket.header.seqNr, rng[])
await outgoingSocket.processPacket(packets[1024])
check:
outgoingSocket.numPacketsInReordedBuffer() == 0
await outgoingSocket.processPacket(packets[1023])
check:
outgoingSocket.numPacketsInReordedBuffer() == 1
asyncTest "Writing small enough data should produce 1 data packet":
let q = newAsyncQueue[Packet]()
let initialRemoteSeq = 10'u16
let dataToWrite = @[1'u8]
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q)
let bytesWritten = await outgoingSocket.write(dataToWrite)
check:
bytesWritten == len(dataToWrite)
let sentPacket = await q.get()
check:
outgoingSocket.numPacketsInOutGoingBuffer() == 1
sentPacket.header.pType == ST_DATA
sentPacket.header.seqNr == initialPacket.header.seqNr + 1
sentPacket.payload == dataToWrite
# ackNr in state packet, is set to sentPacket.header.seqNr which means remote
# side processed out packet
let responseAck = ackPacket(initialRemoteSeq, initialPacket.header.connectionId, sentPacket.header.seqNr, testBufferSize)
await outgoingSocket.processPacket(responseAck)
check:
outgoingSocket.numPacketsInOutGoingBuffer() == 0
asyncTest "Socket should re-send data packet configurable number of times before declaring failure":
let q = newAsyncQueue[Packet]()
let initalRemoteSeqNr = 10'u16
let outgoingSocket = initOutgoingSocket[TransportAddress](testAddress, initTestSnd(q), SocketConfig.init(milliseconds(50), 2), rng[])
await outgoingSocket.startOutgoingSocket()
let initialPacket = await q.get()
check:
initialPacket.header.pType == ST_SYN
let responseAck = ackPacket(initalRemoteSeqNr, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize)
await outgoingSocket.processPacket(responseAck)
check:
outgoingSocket.isConnected()
let dataToWrite = @[1'u8]
let bytesWritten = await outgoingSocket.write(dataToWrite)
check:
bytesWritten == len(dataToWrite)
let sentPacket = await q.get()
check:
outgoingSocket.numPacketsInOutGoingBuffer() == 1
sentPacket.header.pType == ST_DATA
sentPacket.header.seqNr == initialPacket.header.seqNr + 1
sentPacket.payload == dataToWrite
let reSend1 = await q.get()
check:
outgoingSocket.numPacketsInOutGoingBuffer() == 1
reSend1.header.pType == ST_DATA
reSend1.header.seqNr == initialPacket.header.seqNr + 1
reSend1.payload == dataToWrite
let reSend2 = await q.get()
check:
outgoingSocket.numPacketsInOutGoingBuffer() == 1
reSend2.header.pType == ST_DATA
reSend2.header.seqNr == initialPacket.header.seqNr + 1
reSend2.payload == dataToWrite
# next timeout will should disconnect socket
await waitUntil(proc (): bool = outgoingSocket.isConnected() == false)
check:
not outgoingSocket.isConnected()
len(q) == 0