Clean-up, correct and clarify utp_protocol tests (#660)

This commit is contained in:
Kim De Mey 2024-01-16 18:10:26 +01:00 committed by GitHub
parent cbcd1fd307
commit e52f5fac0a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 234 additions and 216 deletions

View File

@ -1,4 +1,4 @@
# Copyright (c) 2020-2021 Status Research & Development GmbH # Copyright (c) 2020-2024 Status Research & Development GmbH
# Licensed and distributed under either of # Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
@ -11,12 +11,13 @@ import
chronos, chronos,
testutils/unittests, testutils/unittests,
./test_utils, ./test_utils,
../../eth/utp/utp_router as rt,
../../eth/utp/utp_protocol,
../../eth/keys, ../../eth/keys,
../../eth/utp/[utp_router, utp_protocol],
../stubloglevel ../stubloglevel
proc setAcceptedCallback(event: AsyncEvent): AcceptConnectionCallback[TransportAddress] = proc setAcceptedCallback(
event: AsyncEvent
): AcceptConnectionCallback[TransportAddress] =
return ( return (
proc(server: UtpRouter[TransportAddress], client: UtpSocket[TransportAddress]): Future[void] = proc(server: UtpRouter[TransportAddress], client: UtpSocket[TransportAddress]): Future[void] =
let fut = newFuture[void]() let fut = newFuture[void]()
@ -25,21 +26,31 @@ proc setAcceptedCallback(event: AsyncEvent): AcceptConnectionCallback[TransportA
fut fut
) )
proc registerIncomingSocketCallback(serverSockets: AsyncQueue): AcceptConnectionCallback[TransportAddress] = proc registerIncomingSocketCallback(
serverSockets: AsyncQueue
): AcceptConnectionCallback[TransportAddress] =
return ( return (
proc(server: UtpRouter[TransportAddress], client: UtpSocket[TransportAddress]): Future[void] = proc(server: UtpRouter[TransportAddress], client: UtpSocket[TransportAddress]): Future[void] =
serverSockets.addLast(client) serverSockets.addLast(client)
) )
proc allowOneIdCallback(allowedId: uint16): AllowConnectionCallback[TransportAddress] = proc allowOneIdCallback(
allowedId: uint16
): AllowConnectionCallback[TransportAddress] =
return ( return (
proc(r: UtpRouter[TransportAddress], remoteAddress: TransportAddress, connectionId: uint16): bool = proc(r: UtpRouter[TransportAddress], remoteAddress: TransportAddress, connectionId: uint16): bool =
connectionId == allowedId connectionId == allowedId
) )
proc transferData(sender: UtpSocket[TransportAddress], receiver: UtpSocket[TransportAddress], data: seq[byte]): Future[seq[byte]] {.async.} = proc transferData(
sender: UtpSocket[TransportAddress],
receiver: UtpSocket[TransportAddress],
data: seq[byte]
): Future[seq[byte]] {.async.} =
let bytesWritten = await sender.write(data) let bytesWritten = await sender.write(data)
doAssert bytesWritten.get() == len(data) check:
bytesWritten.isOk()
bytesWritten.value() == len(data)
let received = await receiver.read(len(data)) let received = await receiver.read(len(data))
return received return received
@ -54,26 +65,27 @@ type
utp1: UtpProtocol utp1: UtpProtocol
utp2: UtpProtocol utp2: UtpProtocol
utp3: UtpProtocol utp3: UtpProtocol
clientSocket1: UtpSocket[TransportAddress] client1Socket: UtpSocket[TransportAddress]
clientSocket2: UtpSocket[TransportAddress] client2Socket: UtpSocket[TransportAddress]
serverSocket1: UtpSocket[TransportAddress] serverSocket1: UtpSocket[TransportAddress]
serverSocket2: UtpSocket[TransportAddress] serverSocket2: UtpSocket[TransportAddress]
proc initClientServerScenario(): Future[ClientServerScenario] {.async.} = proc initClientServerScenario(): Future[ClientServerScenario] {.async.} =
let q = newAsyncQueue[UtpSocket[TransportAddress]]() let
var server1Called = newAsyncEvent() server1Incoming = newAsyncEvent() # Not used
let address = initTAddress("127.0.0.1", 9079) address1 = initTAddress("127.0.0.1", 9079)
let utpProt1 = UtpProtocol.new(setAcceptedCallback(server1Called), address) utpProto1 = UtpProtocol.new(setAcceptedCallback(server1Incoming), address1)
let address1 = initTAddress("127.0.0.1", 9080) utpProto2Sockets = newAsyncQueue[UtpSocket[TransportAddress]]()
let utpProt2 = UtpProtocol.new(registerIncomingSocketCallback(q), address1) address2 = initTAddress("127.0.0.1", 9080)
let clientSocket = await utpProt1.connectTo(address1) utpProto2 = UtpProtocol.new(registerIncomingSocketCallback(utpProto2Sockets), address2)
# this future will be completed when we called accepted connection callback
let serverSocket = await q.popFirst() clientSocket = await utpProto1.connectTo(address2)
serverSocket = await utpProto2Sockets.popFirst()
return ClientServerScenario( return ClientServerScenario(
utp1: utpProt1, utp1: utpProto1,
utp2: utpProt2, utp2: utpProto2,
clientSocket: clientSocket.get(), clientSocket: clientSocket.get(),
serverSocket: serverSocket serverSocket: serverSocket
) )
@ -84,35 +96,33 @@ proc close(s: ClientServerScenario) {.async.} =
await s.utp1.shutdownWait() await s.utp1.shutdownWait()
await s.utp2.shutdownWait() await s.utp2.shutdownWait()
proc init2ClientsServerScenario(): Future[TwoClientsServerScenario] {.async.} = proc initTwoClientsOneServerScenario(): Future[TwoClientsServerScenario] {.async.} =
var serverSockets = newAsyncQueue[UtpSocket[TransportAddress]]() let
var server1Called = newAsyncEvent() server1Incoming = newAsyncEvent() # not used
let address1 = initTAddress("127.0.0.1", 9079) address1 = initTAddress("127.0.0.1", 9079)
let utpProt1 = UtpProtocol.new(setAcceptedCallback(server1Called), address1) utpProto1 = UtpProtocol.new(setAcceptedCallback(server1Incoming), address1)
let address2 = initTAddress("127.0.0.1", 9080) server2Incoming = newAsyncEvent() # not used
let utpProt2 = UtpProtocol.new(registerIncomingSocketCallback(serverSockets), address2) address2 = initTAddress("127.0.0.1", 9080)
utpProto2 = UtpProtocol.new(setAcceptedCallback(server2Incoming), address2)
let address3 = initTAddress("127.0.0.1", 9081) server3Sockets = newAsyncQueue[UtpSocket[TransportAddress]]()
let utpProt3 = UtpProtocol.new(registerIncomingSocketCallback(serverSockets), address3) address3 = initTAddress("127.0.0.1", 9081)
utpProto3 = UtpProtocol.new(registerIncomingSocketCallback(server3Sockets), address3)
let clientSocket1 = await utpProt1.connectTo(address2) client1Socket = await utpProto1.connectTo(address3)
let clientSocket2 = await utpProt1.connectTo(address3) client2Socket = await utpProto2.connectTo(address3)
await waitUntil(proc (): bool = len(serverSockets) == 2) await waitUntil(proc (): bool = len(server3Sockets) == 2)
# this future will be completed when we called accepted connection callback
let serverSocket1 = serverSockets[0]
let serverSocket2 = serverSockets[1]
return TwoClientsServerScenario( return TwoClientsServerScenario(
utp1: utpProt1, utp1: utpProto1,
utp2: utpProt2, utp2: utpProto2,
utp3: utpProt3, utp3: utpProto3,
clientSocket1: clientSocket1.get(), client1Socket: client1Socket.get(),
clientSocket2: clientSocket2.get(), client2Socket: client2Socket.get(),
serverSocket1: serverSocket1, serverSocket1: server3Sockets[0],
serverSocket2: serverSocket2 serverSocket2: server3Sockets[1]
) )
proc close(s: TwoClientsServerScenario) {.async.} = proc close(s: TwoClientsServerScenario) {.async.} =
@ -120,161 +130,170 @@ proc close(s: TwoClientsServerScenario) {.async.} =
await s.utp2.shutdownWait() await s.utp2.shutdownWait()
await s.utp3.shutdownWait() await s.utp3.shutdownWait()
procSuite "Utp protocol over udp tests": procSuite "uTP over UDP protocol tests":
let rng = newRng() let rng = newRng()
asyncTest "Success connect to remote host": asyncTest "Connect to remote host: test connection callback":
let server1Called = newAsyncEvent() let
let address = initTAddress("127.0.0.1", 9079) server1Incoming = newAsyncEvent()
let utpProt1 = UtpProtocol.new(setAcceptedCallback(server1Called), address) address1 = initTAddress("127.0.0.1", 9079)
utpProto1 = UtpProtocol.new(setAcceptedCallback(server1Incoming), address1)
var server2Called = newAsyncEvent() server2Incoming = newAsyncEvent()
let address1 = initTAddress("127.0.0.1", 9080) address2 = initTAddress("127.0.0.1", 9080)
let utpProt2 = UtpProtocol.new(setAcceptedCallback(server2Called), address1) utpProto2 = UtpProtocol.new(setAcceptedCallback(server2Incoming), address2)
let sockResult = await utpProt1.connectTo(address1) socketResult = await utpProto1.connectTo(address2)
let sock = sockResult.get()
# this future will be completed when we called accepted connection callback check socketResult.isOk()
await server2Called.wait() let socket = socketResult.value()
# This future will complete when the accepted connection callback is called
await server2Incoming.wait()
check: check:
sock.isConnected() socket.isConnected()
# after successful connection outgoing buffer should be empty as syn packet # after a successful connection the outgoing buffer should be empty as
# should be correctly acked # the SYN packet should have been acked
sock.numPacketsInOutGoingBuffer() == 0 socket.numPacketsInOutGoingBuffer() == 0
server2Called.isSet() server2Incoming.isSet()
await utpProt1.shutdownWait() await utpProto1.shutdownWait()
await utpProt2.shutdownWait() await utpProto2.shutdownWait()
asyncTest "Connect to remote host: test udata pointer and use it in callback":
proc cbUserData(
server: UtpRouter[TransportAddress],
client: UtpSocket[TransportAddress]): Future[void] =
let q = getUserData[TransportAddress, AsyncQueue[UtpSocket[TransportAddress]]](server)
q.addLast(client)
proc cbUserData(server: UtpRouter[TransportAddress], client: UtpSocket[TransportAddress]): Future[void] = let
let q = rt.getUserData[TransportAddress, AsyncQueue[UtpSocket[TransportAddress]]](server) incomingConnections1 = newAsyncQueue[UtpSocket[TransportAddress]]()
q.addLast(client) address1 = initTAddress("127.0.0.1", 9079)
utpProto1 = UtpProtocol.new(cbUserData, address1, incomingConnections1)
asyncTest "Provide user data pointer and use it in callback": incomingConnections2 = newAsyncQueue[UtpSocket[TransportAddress]]()
let incomingConnections = newAsyncQueue[UtpSocket[TransportAddress]]() address2 = initTAddress("127.0.0.1", 9080)
let address = initTAddress("127.0.0.1", 9079) utpProto2 = UtpProtocol.new(cbUserData, address2, incomingConnections2)
let utpProt1 = UtpProtocol.new(cbUserData, address, incomingConnections)
let address1 = initTAddress("127.0.0.1", 9080) socketResult = await utpProto1.connectTo(address2)
let utpProt2 = UtpProtocol.new(cbUserData, address1, incomingConnections)
let connResult = await utpProt1.connectTo(address1) check socketResult.isOk()
check: let clientSocket = socketResult.get()
connResult.isOk() # This future will complete when the accepted connection callback is called
let serverSocket = await incomingConnections2.get()
let clientSocket = connResult.get()
# this future will be completed when we called accepted connection callback
let serverSocket = await incomingConnections.get()
check: check:
clientSocket.isConnected() clientSocket.isConnected()
# after successful connection outgoing buffer should be empty as syn packet # after a successful connection the outgoing buffer should be empty as
# should be correctly acked # the SYN packet should have been acked
clientSocket.numPacketsInOutGoingBuffer() == 0 clientSocket.numPacketsInOutGoingBuffer() == 0
# Server socket is not in connected state until first data transfer
not serverSocket.isConnected() not serverSocket.isConnected()
await utpProt1.shutdownWait() await utpProto1.shutdownWait()
await utpProt2.shutdownWait() await utpProto2.shutdownWait()
asyncTest "Fail to connect to offline remote host": asyncTest "Connect to offline remote server host":
let server1Called = newAsyncEvent() let
let address = initTAddress("127.0.0.1", 9079) server1Incoming = newAsyncEvent()
let utpProt1 = UtpProtocol.new(setAcceptedCallback(server1Called), address , nil, SocketConfig.init(milliseconds(200))) address1 = initTAddress("127.0.0.1", 9079)
utpProto1 = UtpProtocol.new(
setAcceptedCallback(server1Incoming), address1 , nil,
SocketConfig.init(milliseconds(200)))
let address1 = initTAddress("127.0.0.1", 9080) address2 = initTAddress("127.0.0.1", 9080)
let connectionResult = await utpProt1.connectTo(address1) socketResult = await utpProto1.connectTo(address2)
check: check socketResult.isErr()
connectionResult.isErr() let connectionError = socketResult.error()
check connectionError.kind == ConnectionTimedOut
let connectionError = connectionResult.error() await waitUntil(proc (): bool = utpProto1.openSockets() == 0)
check: check utpProto1.openSockets() == 0
connectionError.kind == ConnectionTimedOut
await waitUntil(proc (): bool = utpProt1.openSockets() == 0) await utpProto1.shutdownWait()
check: asyncTest "Connect to remote host which was initially offline":
utpProt1.openSockets() == 0 let
server1Incoming = newAsyncEvent()
address1 = initTAddress("127.0.0.1", 9079)
utpProto1 = UtpProtocol.new(
setAcceptedCallback(server1Incoming), address1, nil,
# Sets initial SYN timeout to 500ms
SocketConfig.init(milliseconds(500)))
await utpProt1.shutdownWait() address2 = initTAddress("127.0.0.1", 9080)
asyncTest "Success connect to remote host which initially was offline": futSock = utpProto1.connectTo(address2)
let server1Called = newAsyncEvent()
let address = initTAddress("127.0.0.1", 9079)
let utpProt1 = UtpProtocol.new(setAcceptedCallback(server1Called), address, nil, SocketConfig.init(milliseconds(500)))
let address1 = initTAddress("127.0.0.1", 9080)
let futSock = utpProt1.connectTo(address1)
# waiting 400 millisecond will trigger at least one re-send # waiting 400 millisecond will trigger at least one re-send
await sleepAsync(milliseconds(400)) await sleepAsync(milliseconds(400))
var server2Called = newAsyncEvent() var server2Incoming = newAsyncEvent()
let utpProt2 = UtpProtocol.new(setAcceptedCallback(server2Called), address1) let utpProto2 = UtpProtocol.new(setAcceptedCallback(server2Incoming), address2)
# this future will be completed when we called accepted connection callback # This future will complete when the accepted connection callback is called
await server2Called.wait() await server2Incoming.wait()
discard (await futSock) discard (await futSock)
check: check:
futSock.finished() and (not futSock.failed()) and (not futSock.cancelled()) futSock.finished() and (not futSock.failed()) and (not futSock.cancelled())
server2Called.isSet() server2Incoming.isSet()
await utpProt1.shutdownWait() await utpProto1.shutdownWait()
await utpProt2.shutdownWait() await utpProto2.shutdownWait()
asyncTest "Success data transfer when data fits into one packet": asyncTest "Data transfer where data fits into one packet":
let s = await initClientServerScenario() let s = await initClientServerScenario()
check: check:
s.clientSocket.isConnected() s.clientSocket.isConnected()
# after successful connection outgoing buffer should be empty as syn packet # after a successful connection the outgoing buffer should be empty as
# should be correctly acked # the SYN packet should have been acked
s.clientSocket.numPacketsInOutGoingBuffer() == 0 s.clientSocket.numPacketsInOutGoingBuffer() == 0
# Server socket is not in connected state until first data transfer
# Server socket is not in connected state, until first data transfer
(not s.serverSocket.isConnected()) (not s.serverSocket.isConnected())
let bytesToTransfer = rng[].generateBytes(100) let bytesToTransfer = rng[].generateBytes(100)
let bytesReceivedFromClient = await transferData(s.clientSocket, s.serverSocket, bytesToTransfer) let bytesReceivedFromClient = await transferData(
s.clientSocket, s.serverSocket, bytesToTransfer)
check: check:
bytesToTransfer == bytesReceivedFromClient bytesToTransfer == bytesReceivedFromClient
s.serverSocket.isConnected() s.serverSocket.isConnected()
let bytesReceivedFromServer = await transferData(s.serverSocket, s.clientSocket, bytesToTransfer) let bytesReceivedFromServer = await transferData(
s.serverSocket, s.clientSocket, bytesToTransfer)
check: check:
bytesToTransfer == bytesReceivedFromServer bytesToTransfer == bytesReceivedFromServer
await s.close() await s.close()
asyncTest "Success data transfer when data need to be sliced into multiple packets": asyncTest "Data transfer where data need to be sliced into multiple packets":
let s = await initClientServerScenario() let s = await initClientServerScenario()
check: check:
s.clientSocket.isConnected() s.clientSocket.isConnected()
# after successful connection outgoing buffer should be empty as syn packet
# should be correctly acked
s.clientSocket.numPacketsInOutGoingBuffer() == 0 s.clientSocket.numPacketsInOutGoingBuffer() == 0
(not s.serverSocket.isConnected()) (not s.serverSocket.isConnected())
# 20_000 bytes is way over maximal packet size # 20_000 bytes is way over maximal packet size
let bytesToTransfer = rng[].generateBytes(20_000) let bytesToTransfer = rng[].generateBytes(20_000)
let bytesReceivedFromClient = await transferData(s.clientSocket, s.serverSocket, bytesToTransfer) let bytesReceivedFromClient = await transferData(
let bytesReceivedFromServer = await transferData(s.serverSocket, s.clientSocket, bytesToTransfer) s.clientSocket, s.serverSocket, bytesToTransfer)
let bytesReceivedFromServer = await transferData(
s.serverSocket, s.clientSocket, bytesToTransfer)
# ultimately all send packets will acked, and outgoing buffer will be empty # ultimately all send packets will acked, and outgoing buffer will be empty
await waitUntil(proc (): bool = s.clientSocket.numPacketsInOutGoingBuffer() == 0) await waitUntil(proc (): bool = s.clientSocket.numPacketsInOutGoingBuffer() == 0)
@ -289,81 +308,74 @@ procSuite "Utp protocol over udp tests":
await s.close() await s.close()
asyncTest "Success multiple data transfers when data need to be sliced into multiple packets": asyncTest "Multiple data transfers where data need to be sliced into multiple packets":
let s = await initClientServerScenario() let s = await initClientServerScenario()
check: check:
s.clientSocket.isConnected() s.clientSocket.isConnected()
# after successful connection outgoing buffer should be empty as syn packet
# should be correctly acked
s.clientSocket.numPacketsInOutGoingBuffer() == 0 s.clientSocket.numPacketsInOutGoingBuffer() == 0
const
amountOftransfers = 3
amountOfBytes = 5000
# 5000 bytes is over maximal packet size var totalBytesToTransfer: seq[byte]
let bytesToTransfer = rng[].generateBytes(5000) for i in 0..<amountOftransfers:
let bytesToTransfer = rng[].generateBytes(amountOfBytes)
let written = await s.clientSocket.write(bytesToTransfer)
let written = await s.clientSocket.write(bytesToTransfer) check:
written.isOk()
written.value() == amountOfBytes
check: totalBytesToTransfer.add(bytesToTransfer)
written.get() == len(bytesToTransfer)
let bytesToTransfer1 = rng[].generateBytes(5000) let bytesReceived = await s.serverSocket.read(amountOfBytes * amountOftransfers)
let written1 = await s.clientSocket.write(bytesToTransfer1)
check:
written1.get() == len(bytesToTransfer)
let bytesReceived = await s.serverSocket.read(len(bytesToTransfer) + len(bytesToTransfer1))
# ultimately all send packets will acked, and outgoing buffer will be empty
await waitUntil(proc (): bool = s.clientSocket.numPacketsInOutGoingBuffer() == 0) await waitUntil(proc (): bool = s.clientSocket.numPacketsInOutGoingBuffer() == 0)
check: check:
s.clientSocket.numPacketsInOutGoingBuffer() == 0 s.clientSocket.numPacketsInOutGoingBuffer() == 0
bytesToTransfer.concat(bytesToTransfer1) == bytesReceived totalBytesToTransfer == bytesReceived
await s.close() await s.close()
asyncTest "Success data transfers from multiple clients": asyncTest "Data transfers from multiple clients to one server":
let s = await init2ClientsServerScenario() let s = await initTwoClientsOneServerScenario()
check: check:
s.clientSocket1.isConnected() s.client1Socket.isConnected()
s.clientSocket2.isConnected() s.client2Socket.isConnected()
s.clientSocket1.numPacketsInOutGoingBuffer() == 0 s.client1Socket.numPacketsInOutGoingBuffer() == 0
s.clientSocket2.numPacketsInOutGoingBuffer() == 0 s.client2Socket.numPacketsInOutGoingBuffer() == 0
let numBytesToTransfer = 5000 let
let client1Data = rng[].generateBytes(numBytesToTransfer) numBytesToTransfer = 5000
let client2Data = rng[].generateBytes(numBytesToTransfer) client1Data = rng[].generateBytes(numBytesToTransfer)
client2Data = rng[].generateBytes(numBytesToTransfer)
discard s.clientSocket1.write(client1Data) discard s.client1Socket.write(client1Data)
discard s.clientSocket2.write(client2Data) discard s.client2Socket.write(client2Data)
let server1ReadBytes = await s.serverSocket1.read(numBytesToTransfer) let serverReadBytes1 = await s.serverSocket1.read(numBytesToTransfer)
let server2ReadBytes = await s.serverSocket2.read(numBytesToTransfer) let serverReadBytes2 = await s.serverSocket2.read(numBytesToTransfer)
check: check:
client1Data == server1ReadBytes client1Data == serverReadBytes1
client2Data == server2ReadBytes client2Data == serverReadBytes2
await s.close() await s.close()
asyncTest "Gracefully stop of the socket": asyncTest "Graceful stop of the socket":
let s = await initClientServerScenario() let s = await initClientServerScenario()
check: check:
s.clientSocket.isConnected() s.clientSocket.isConnected()
# after successful connection outgoing buffer should be empty as syn packet
# should be correctly acked
s.clientSocket.numPacketsInOutGoingBuffer() == 0 s.clientSocket.numPacketsInOutGoingBuffer() == 0
# Server socket is not in connected state, until first data transfer
(not s.serverSocket.isConnected()) (not s.serverSocket.isConnected())
let bytesToTransfer = rng[].generateBytes(100) let bytesToTransfer = rng[].generateBytes(100)
let bytesReceivedFromClient = await transferData(s.clientSocket, s.serverSocket, bytesToTransfer) let bytesReceivedFromClient = await transferData(
s.clientSocket, s.serverSocket, bytesToTransfer)
check: check:
bytesToTransfer == bytesReceivedFromClient bytesToTransfer == bytesReceivedFromClient
@ -385,20 +397,17 @@ procSuite "Utp protocol over udp tests":
await s.close() await s.close()
asyncTest "Reading data until eof": asyncTest "Read data until eof":
let s = await initClientServerScenario() let s = await initClientServerScenario()
check: check:
s.clientSocket.isConnected() s.clientSocket.isConnected()
# after successful connection outgoing buffer should be empty as syn packet
# should be correctly acked
s.clientSocket.numPacketsInOutGoingBuffer() == 0 s.clientSocket.numPacketsInOutGoingBuffer() == 0
# Server socket is not in connected state, until first data transfer
(not s.serverSocket.isConnected()) (not s.serverSocket.isConnected())
let bytesToTransfer1 = rng[].generateBytes(1000) let
let bytesToTransfer2 = rng[].generateBytes(1000) bytesToTransfer1 = rng[].generateBytes(1000)
let bytesToTransfer3 = rng[].generateBytes(1000) bytesToTransfer2 = rng[].generateBytes(1000)
bytesToTransfer3 = rng[].generateBytes(1000)
discard await s.clientSocket.write(bytesToTransfer1) discard await s.clientSocket.write(bytesToTransfer1)
discard await s.clientSocket.write(bytesToTransfer2) discard await s.clientSocket.write(bytesToTransfer2)
@ -415,57 +424,66 @@ procSuite "Utp protocol over udp tests":
await s.close() await s.close()
asyncTest "Accept connection only from allowed peers": asyncTest "Accept connection only from allowed peers":
let allowedId: uint16 = 10 const
let lowSynTimeout = milliseconds(500) allowedId: uint16 = 10
var serverSockets = newAsyncQueue[UtpSocket[TransportAddress]]() lowSynTimeout = milliseconds(500)
var server1Called = newAsyncEvent()
let address1 = initTAddress("127.0.0.1", 9079)
let utpProt1 =
UtpProtocol.new(setAcceptedCallback(server1Called), address1, nil, SocketConfig.init(lowSynTimeout))
let address2 = initTAddress("127.0.0.1", 9080) let
let utpProt2 = server1Incoming = newAsyncEvent() # not used
UtpProtocol.new(registerIncomingSocketCallback(serverSockets), address2, nil, SocketConfig.init(lowSynTimeout)) address1 = initTAddress("127.0.0.1", 9079)
utpProto1 =
UtpProtocol.new(
setAcceptedCallback(server1Incoming), address1, nil,
SocketConfig.init(lowSynTimeout)
)
let address3 = initTAddress("127.0.0.1", 9081) server2Incoming = newAsyncEvent() # not used
let utpProt3 = address2 = initTAddress("127.0.0.1", 9080)
UtpProtocol.new( utpProto2 =
registerIncomingSocketCallback(serverSockets), UtpProtocol.new(
address3, setAcceptedCallback(server2Incoming), address2, nil,
nil, SocketConfig.init(lowSynTimeout)
SocketConfig.init(), )
allowOneIdCallback(allowedId)
)
let allowedSocketRes = await utpProt1.connectTo(address3, allowedId) server3Sockets = newAsyncQueue[UtpSocket[TransportAddress]]()
let notAllowedSocketRes = await utpProt2.connectTo(address3, allowedId + 1) address3 = initTAddress("127.0.0.1", 9081)
utpProto3 =
UtpProtocol.new(
registerIncomingSocketCallback(server3Sockets),
address3,
nil,
SocketConfig.init(),
allowOneIdCallback(allowedId)
)
let allowedSocketRes = await utpProto1.connectTo(address3, allowedId)
let notAllowedSocketRes = await utpProto2.connectTo(address3, allowedId + 1)
check: check:
allowedSocketRes.isOk() allowedSocketRes.isOk()
notAllowedSocketRes.isErr() notAllowedSocketRes.isErr()
# remote did not allow this connection, and ultimately it did time out # remote did not allow this connection and it timed out
notAllowedSocketRes.error().kind == ConnectionTimedOut notAllowedSocketRes.error().kind == ConnectionTimedOut
let clientSocket = allowedSocketRes.get() let clientSocket = allowedSocketRes.get()
let serverSocket = await serverSockets.get() let serverSocket = await server3Sockets.get()
check: check:
clientSocket.connectionId() == allowedId clientSocket.connectionId() == allowedId
serverSocket.connectionId() == allowedId serverSocket.connectionId() == allowedId
await utpProt1.shutdownWait() await utpProto1.shutdownWait()
await utpProt2.shutdownWait() await utpProto2.shutdownWait()
await utpProt3.shutdownWait() await utpProto3.shutdownWait()
asyncTest "Success data transfer of a lot of data should increase available window on sender side": asyncTest "Data transfer of a lot of data should increase window on sender side":
let s = await initClientServerScenario() let s = await initClientServerScenario()
let startMaxWindow = 2 * s.clientSocket.getSocketConfig().payloadSize let startMaxWindow = 2 * s.clientSocket.getSocketConfig().payloadSize
check: check:
s.clientSocket.isConnected() s.clientSocket.isConnected()
# initially window has value equal to some pre configured constant # initially the window has value equal to a pre-configured constant
s.clientSocket.currentMaxWindowSize == startMaxWindow s.clientSocket.currentMaxWindowSize == startMaxWindow
# after successful connection outgoing buffer should be empty as syn packet
# should be correctly acked
s.clientSocket.numPacketsInOutGoingBuffer() == 0 s.clientSocket.numPacketsInOutGoingBuffer() == 0
(not s.serverSocket.isConnected()) (not s.serverSocket.isConnected())
@ -473,14 +491,15 @@ procSuite "Utp protocol over udp tests":
# big transfer of 50kb # big transfer of 50kb
let bytesToTransfer = rng[].generateBytes(50000) let bytesToTransfer = rng[].generateBytes(50000)
let bytesReceivedFromClient = await transferData(s.clientSocket, s.serverSocket, bytesToTransfer) let bytesReceivedFromClient = await transferData(
s.clientSocket, s.serverSocket, bytesToTransfer)
# ultimately all send packets will acked, and outgoing buffer will be empty # ultimately all send packets will be acked and the outgoing buffer will be empty
await waitUntil(proc (): bool = s.clientSocket.numPacketsInOutGoingBuffer() == 0) await waitUntil(proc (): bool = s.clientSocket.numPacketsInOutGoingBuffer() == 0)
check: check:
# we can only assert that window has grown, because specific values depends on # we can only assess that the window has grown, because the specific value
# particular timings # depends on particular timings
s.clientSocket.currentMaxWindowSize > startMaxWindow s.clientSocket.currentMaxWindowSize > startMaxWindow
s.serverSocket.isConnected() s.serverSocket.isConnected()
s.clientSocket.numPacketsInOutGoingBuffer() == 0 s.clientSocket.numPacketsInOutGoingBuffer() == 0
@ -488,16 +507,14 @@ procSuite "Utp protocol over udp tests":
await s.close() await s.close()
asyncTest "Not used socket should decay its max send window": asyncTest "Unused socket should decay its max send window":
let s = await initClientServerScenario() let s = await initClientServerScenario()
let startMaxWindow = 2 * s.clientSocket.getSocketConfig().payloadSize let startMaxWindow = 2 * s.clientSocket.getSocketConfig().payloadSize
check: check:
s.clientSocket.isConnected() s.clientSocket.isConnected()
# initially window has value equal to some pre configured constant # initially the window has value equal to a pre-configured constant
s.clientSocket.currentMaxWindowSize == startMaxWindow s.clientSocket.currentMaxWindowSize == startMaxWindow
# after successful connection outgoing buffer should be empty as syn packet
# should be correctly acked
s.clientSocket.numPacketsInOutGoingBuffer() == 0 s.clientSocket.numPacketsInOutGoingBuffer() == 0
(not s.serverSocket.isConnected()) (not s.serverSocket.isConnected())
@ -505,16 +522,17 @@ procSuite "Utp protocol over udp tests":
# big transfer of 50kb # big transfer of 50kb
let bytesToTransfer = rng[].generateBytes(50000) let bytesToTransfer = rng[].generateBytes(50000)
let bytesReceivedFromClient = await transferData(s.clientSocket, s.serverSocket, bytesToTransfer) let bytesReceivedFromClient = await transferData(
s.clientSocket, s.serverSocket, bytesToTransfer)
# ultimately all send packets will acked, and outgoing buffer will be empty # ultimately all send packets will be acked and the outgoing buffer will be empty
await waitUntil(proc (): bool = s.clientSocket.numPacketsInOutGoingBuffer() == 0) await waitUntil(proc (): bool = s.clientSocket.numPacketsInOutGoingBuffer() == 0)
let maximumMaxWindow = s.clientSocket.currentMaxWindowSize let maximumMaxWindow = s.clientSocket.currentMaxWindowSize
check: check:
# we can only assert that window has grown, because specific values depends on # we can only assess that the window has grown, because the specific value
# particular timings # depends on particular timings
maximumMaxWindow > startMaxWindow maximumMaxWindow > startMaxWindow
s.serverSocket.isConnected() s.serverSocket.isConnected()
s.clientSocket.numPacketsInOutGoingBuffer() == 0 s.clientSocket.numPacketsInOutGoingBuffer() == 0