Add writing tests to test socket

This commit is contained in:
KonradStaniec 2021-11-02 12:16:19 +01:00
parent b35cf5cb48
commit c3cec420bd
No known key found for this signature in database
GPG Key ID: B3831D2703380AF7
4 changed files with 139 additions and 14 deletions

View File

@ -83,7 +83,7 @@ proc processPacket[A](r: UtpRouter[A], p: Packet, sender: A) {.async.}=
else:
notice "Received SYN for not known connection. Initiating incoming connection"
# Initial ackNr is set to incoming packer seqNr
let incomingSocket = initIncomingSocket[A](sender, r.sendCb, p.header.connectionId, p.header.seqNr, r.rng[])
let incomingSocket = initIncomingSocket[A](sender, r.sendCb, r.socketConfig ,p.header.connectionId, p.header.seqNr, r.rng[])
r.registerUtpSocket(incomingSocket)
await incomingSocket.startIncomingSocket()
# TODO By default (when we have utp over udp) socket here is passed to upper layer

View File

@ -24,6 +24,9 @@ type
Reset,
Destroy
ConnectionDirection = enum
Outgoing, Incoming
UtpSocketKey*[A] = object
remoteAddress*: A
rcvId*: uint16
@ -40,9 +43,22 @@ type
# Socket callback to send data to remote peer
SendCallback*[A] = proc (to: A, data: seq[byte]): Future[void] {.gcsafe, raises: [Defect]}
SocketConfig* = object
# This is configurable (in contrast to reference impl), as with standard 2 syn resends
# default timeout set to 3seconds and doubling of timeout with each re-send, it
# means that initial connection would timeout after 21s, which seems rather long
initialSynTimeout*: Duration
# Number of resend re-tries of each data packet, before daclaring connection
# failed
dataResendsBeforeFailure*: uint16
UtpSocket*[A] = ref object
remoteAddress*: A
state: ConnectionState
direction: ConnectionDirection
socketConfig: SocketConfig
# Connection id for packets we receive
connectionIdRcv: uint16
# Connection id for packets we send
@ -104,12 +120,6 @@ type
send: SendCallback[A]
SocketConfig* = object
# This is configurable (in contrast to reference impl), as with standard 2 syn resends
# default timeout set to 3seconds and doubling of timeout with each re-send, it
# means that initial connection would timeout after 21s, which seems rather long
initialSynTimeout*: Duration
# User driven call back to be called whenever socket is permanently closed i.e
# reaches destroy state
SocketCloseCallback* = proc (): void {.gcsafe, raises: [Defect].}
@ -133,6 +143,10 @@ const
# packet. (TODO it should only be set when working over udp)
initialRcvRetransmitTimeout = milliseconds(10000)
# Number of times each data packet will be resend before declaring connection
# dead. 4 is taken from reference implementation
defaultDataResendsBeforeFailure = 4'u16
reorderBufferMaxSize = 1024
proc init*[A](T: type UtpSocketKey, remoteAddress: A, rcvId: uint16): T =
@ -146,8 +160,15 @@ proc init(T: type OutgoingPacket, packetBytes: seq[byte], transmissions: uint16,
timeSent: timeSent
)
proc init*(T: type SocketConfig, initialSynTimeout: Duration = defaultInitialSynTimeout): T =
SocketConfig(initialSynTimeout: initialSynTimeout)
proc init*(
T: type SocketConfig,
initialSynTimeout: Duration = defaultInitialSynTimeout,
dataResendsBeforeFailure: uint16 = defaultDataResendsBeforeFailure
): T =
SocketConfig(
initialSynTimeout: initialSynTimeout,
dataResendsBeforeFailure: dataResendsBeforeFailure
)
proc registerOutgoingPacket(socket: UtpSocket, oPacket: OutgoingPacket) =
## Adds packet to outgoing buffer and updates all related fields
@ -213,6 +234,10 @@ proc isOpened(socket:UtpSocket): bool =
socket.state == ConnectedFull
)
proc shouldDisconnectFromFailedRemote(socket: UtpSocket): bool =
(socket.state == SynSent and socket.retransmitCount >= 2) or
(socket.retransmitCount >= socket.socketConfig.dataResendsBeforeFailure)
proc checkTimeouts(socket: UtpSocket) {.async.} =
let currentTime = Moment.now()
# flush all packets which needs to be re-send
@ -233,7 +258,7 @@ proc checkTimeouts(socket: UtpSocket) {.async.} =
socket.closeEvent.fire()
return
if (socket.state == SynSent and socket.retransmitCount >= 2) or (socket.retransmitCount >= 4):
if socket.shouldDisconnectFromFailedRemote():
if socket.state == SynSent and (not socket.connectionFuture.finished()):
# TODO standard stream interface result in failed future in case of failed connections,
# but maybe it would be more clean to use result
@ -287,15 +312,24 @@ proc new[A](
to: A,
snd: SendCallback[A],
state: ConnectionState,
initialTimeout: Duration,
cfg: SocketConfig,
direction: ConnectionDirection,
rcvId: uint16,
sndId: uint16,
initialSeqNr: uint16,
initialAckNr: uint16
): T =
let initialTimeout =
if direction == Outgoing:
cfg.initialSynTimeout
else :
initialRcvRetransmitTimeout
T(
remoteAddress: to,
state: state,
direction: direction,
socketConfig: cfg,
connectionIdRcv: rcvId,
connectionIdSnd: sndId,
seqNr: initialSeqNr,
@ -333,7 +367,8 @@ proc initOutgoingSocket*[A](
to,
snd,
SynSent,
cfg.initialSynTimeout,
cfg,
Outgoing,
rcvConnectionId,
sndConnectionId,
initialSeqNr,
@ -344,6 +379,7 @@ proc initOutgoingSocket*[A](
proc initIncomingSocket*[A](
to: A,
snd: SendCallback[A],
cfg: SocketConfig,
connectionId: uint16,
ackNr: uint16,
rng: var BrHmacDrbgContext
@ -354,7 +390,8 @@ proc initIncomingSocket*[A](
to,
snd,
SynRecv,
initialRcvRetransmitTimeout,
cfg,
Incoming,
connectionId + 1,
connectionId,
initialSeqNr,

View File

@ -78,6 +78,7 @@ procSuite "Utp protocol over discovery v5 tests":
check:
clientSocket.isConnected()
clientSocket.close()
await node1.closeWait()
await node2.closeWait()
@ -112,6 +113,7 @@ procSuite "Utp protocol over discovery v5 tests":
clientSocket.isConnected()
serverSocket.isConnected()
clientSocket.close()
serverSocket.close()
await node1.closeWait()
await node2.closeWait()

View File

@ -240,3 +240,89 @@ procSuite "Utp socket unit test":
check:
outgoingSocket.numPacketsInReordedBuffer() == 1
asyncTest "Writing small enough data should produce 1 data packet":
let q = newAsyncQueue[Packet]()
let initialRemoteSeq = 10'u16
let dataToWrite = @[1'u8]
let (outgoingSocket, initialPacket) = connectOutGoingSocket(initialRemoteSeq, q)
let bytesWritten = await outgoingSocket.write(dataToWrite)
check:
bytesWritten == len(dataToWrite)
let sentPacket = await q.get()
check:
outgoingSocket.numPacketsInOutGoingBuffer() == 1
sentPacket.header.pType == ST_DATA
sentPacket.header.seqNr == initialPacket.header.seqNr + 1
sentPacket.payload == dataToWrite
# ackNr in state packet, is set to sentPacket.header.seqNr which means remote
# side processed out packet
let responseAck = ackPacket(initialRemoteSeq, initialPacket.header.connectionId, sentPacket.header.seqNr, testBufferSize)
await outgoingSocket.processPacket(responseAck)
check:
outgoingSocket.numPacketsInOutGoingBuffer() == 0
asyncTest "Socket should re-send data packet configurable number of times before declaring failure":
let q = newAsyncQueue[Packet]()
let initalRemoteSeqNr = 10'u16
let outgoingSocket = initOutgoingSocket[TransportAddress](testAddress, initTestSnd(q), SocketConfig.init(milliseconds(50), 2), rng[])
await outgoingSocket.startOutgoingSocket()
let initialPacket = await q.get()
check:
initialPacket.header.pType == ST_SYN
let responseAck = ackPacket(initalRemoteSeqNr, initialPacket.header.connectionId, initialPacket.header.seqNr, testBufferSize)
await outgoingSocket.processPacket(responseAck)
check:
outgoingSocket.isConnected()
let dataToWrite = @[1'u8]
let bytesWritten = await outgoingSocket.write(dataToWrite)
check:
bytesWritten == len(dataToWrite)
let sentPacket = await q.get()
check:
outgoingSocket.numPacketsInOutGoingBuffer() == 1
sentPacket.header.pType == ST_DATA
sentPacket.header.seqNr == initialPacket.header.seqNr + 1
sentPacket.payload == dataToWrite
let reSend1 = await q.get()
check:
outgoingSocket.numPacketsInOutGoingBuffer() == 1
reSend1.header.pType == ST_DATA
reSend1.header.seqNr == initialPacket.header.seqNr + 1
reSend1.payload == dataToWrite
let reSend2 = await q.get()
check:
outgoingSocket.numPacketsInOutGoingBuffer() == 1
reSend2.header.pType == ST_DATA
reSend2.header.seqNr == initialPacket.header.seqNr + 1
reSend2.payload == dataToWrite
# next timeout will should disconnect socket
await waitUntil(proc (): bool = outgoingSocket.isConnected() == false)
check:
not outgoingSocket.isConnected()
len(q) == 0