From e52f5fac0af42fbec5476f89cca8e35eaff3b2f9 Mon Sep 17 00:00:00 2001 From: Kim De Mey Date: Tue, 16 Jan 2024 18:10:26 +0100 Subject: [PATCH] Clean-up, correct and clarify utp_protocol tests (#660) --- tests/utp/test_protocol.nim | 450 +++++++++++++++++++----------------- 1 file changed, 234 insertions(+), 216 deletions(-) diff --git a/tests/utp/test_protocol.nim b/tests/utp/test_protocol.nim index 31d63ae..939176a 100644 --- a/tests/utp/test_protocol.nim +++ b/tests/utp/test_protocol.nim @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2021 Status Research & Development GmbH +# Copyright (c) 2020-2024 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). @@ -11,12 +11,13 @@ import chronos, testutils/unittests, ./test_utils, - ../../eth/utp/utp_router as rt, - ../../eth/utp/utp_protocol, ../../eth/keys, + ../../eth/utp/[utp_router, utp_protocol], ../stubloglevel -proc setAcceptedCallback(event: AsyncEvent): AcceptConnectionCallback[TransportAddress] = +proc setAcceptedCallback( + event: AsyncEvent + ): AcceptConnectionCallback[TransportAddress] = return ( proc(server: UtpRouter[TransportAddress], client: UtpSocket[TransportAddress]): Future[void] = let fut = newFuture[void]() @@ -25,21 +26,31 @@ proc setAcceptedCallback(event: AsyncEvent): AcceptConnectionCallback[TransportA fut ) -proc registerIncomingSocketCallback(serverSockets: AsyncQueue): AcceptConnectionCallback[TransportAddress] = +proc registerIncomingSocketCallback( + serverSockets: AsyncQueue + ): AcceptConnectionCallback[TransportAddress] = return ( proc(server: UtpRouter[TransportAddress], client: UtpSocket[TransportAddress]): Future[void] = serverSockets.addLast(client) ) -proc allowOneIdCallback(allowedId: uint16): AllowConnectionCallback[TransportAddress] = +proc allowOneIdCallback( + allowedId: uint16 + ): AllowConnectionCallback[TransportAddress] = return ( proc(r: UtpRouter[TransportAddress], remoteAddress: TransportAddress, connectionId: uint16): bool = connectionId == allowedId ) -proc transferData(sender: UtpSocket[TransportAddress], receiver: UtpSocket[TransportAddress], data: seq[byte]): Future[seq[byte]] {.async.} = +proc transferData( + sender: UtpSocket[TransportAddress], + receiver: UtpSocket[TransportAddress], + data: seq[byte] + ): Future[seq[byte]] {.async.} = let bytesWritten = await sender.write(data) - doAssert bytesWritten.get() == len(data) + check: + bytesWritten.isOk() + bytesWritten.value() == len(data) let received = await receiver.read(len(data)) return received @@ -54,26 +65,27 @@ type utp1: UtpProtocol utp2: UtpProtocol utp3: UtpProtocol - clientSocket1: UtpSocket[TransportAddress] - clientSocket2: UtpSocket[TransportAddress] + client1Socket: UtpSocket[TransportAddress] + client2Socket: UtpSocket[TransportAddress] serverSocket1: UtpSocket[TransportAddress] serverSocket2: UtpSocket[TransportAddress] proc initClientServerScenario(): Future[ClientServerScenario] {.async.} = - let q = newAsyncQueue[UtpSocket[TransportAddress]]() - var server1Called = newAsyncEvent() - let address = initTAddress("127.0.0.1", 9079) - let utpProt1 = UtpProtocol.new(setAcceptedCallback(server1Called), address) + let + server1Incoming = newAsyncEvent() # Not used + address1 = initTAddress("127.0.0.1", 9079) + utpProto1 = UtpProtocol.new(setAcceptedCallback(server1Incoming), address1) - let address1 = initTAddress("127.0.0.1", 9080) - let utpProt2 = UtpProtocol.new(registerIncomingSocketCallback(q), address1) - let clientSocket = await utpProt1.connectTo(address1) - # this future will be completed when we called accepted connection callback - let serverSocket = await q.popFirst() + utpProto2Sockets = newAsyncQueue[UtpSocket[TransportAddress]]() + address2 = initTAddress("127.0.0.1", 9080) + utpProto2 = UtpProtocol.new(registerIncomingSocketCallback(utpProto2Sockets), address2) + + clientSocket = await utpProto1.connectTo(address2) + serverSocket = await utpProto2Sockets.popFirst() return ClientServerScenario( - utp1: utpProt1, - utp2: utpProt2, + utp1: utpProto1, + utp2: utpProto2, clientSocket: clientSocket.get(), serverSocket: serverSocket ) @@ -84,35 +96,33 @@ proc close(s: ClientServerScenario) {.async.} = await s.utp1.shutdownWait() await s.utp2.shutdownWait() -proc init2ClientsServerScenario(): Future[TwoClientsServerScenario] {.async.} = - var serverSockets = newAsyncQueue[UtpSocket[TransportAddress]]() - var server1Called = newAsyncEvent() - let address1 = initTAddress("127.0.0.1", 9079) - let utpProt1 = UtpProtocol.new(setAcceptedCallback(server1Called), address1) +proc initTwoClientsOneServerScenario(): Future[TwoClientsServerScenario] {.async.} = + let + server1Incoming = newAsyncEvent() # not used + address1 = initTAddress("127.0.0.1", 9079) + utpProto1 = UtpProtocol.new(setAcceptedCallback(server1Incoming), address1) - let address2 = initTAddress("127.0.0.1", 9080) - let utpProt2 = UtpProtocol.new(registerIncomingSocketCallback(serverSockets), address2) + server2Incoming = newAsyncEvent() # not used + address2 = initTAddress("127.0.0.1", 9080) + utpProto2 = UtpProtocol.new(setAcceptedCallback(server2Incoming), address2) - let address3 = initTAddress("127.0.0.1", 9081) - let utpProt3 = UtpProtocol.new(registerIncomingSocketCallback(serverSockets), address3) + server3Sockets = newAsyncQueue[UtpSocket[TransportAddress]]() + address3 = initTAddress("127.0.0.1", 9081) + utpProto3 = UtpProtocol.new(registerIncomingSocketCallback(server3Sockets), address3) - let clientSocket1 = await utpProt1.connectTo(address2) - let clientSocket2 = await utpProt1.connectTo(address3) + client1Socket = await utpProto1.connectTo(address3) + client2Socket = await utpProto2.connectTo(address3) - await waitUntil(proc (): bool = len(serverSockets) == 2) - - # this future will be completed when we called accepted connection callback - let serverSocket1 = serverSockets[0] - let serverSocket2 = serverSockets[1] + await waitUntil(proc (): bool = len(server3Sockets) == 2) return TwoClientsServerScenario( - utp1: utpProt1, - utp2: utpProt2, - utp3: utpProt3, - clientSocket1: clientSocket1.get(), - clientSocket2: clientSocket2.get(), - serverSocket1: serverSocket1, - serverSocket2: serverSocket2 + utp1: utpProto1, + utp2: utpProto2, + utp3: utpProto3, + client1Socket: client1Socket.get(), + client2Socket: client2Socket.get(), + serverSocket1: server3Sockets[0], + serverSocket2: server3Sockets[1] ) proc close(s: TwoClientsServerScenario) {.async.} = @@ -120,161 +130,170 @@ proc close(s: TwoClientsServerScenario) {.async.} = await s.utp2.shutdownWait() await s.utp3.shutdownWait() -procSuite "Utp protocol over udp tests": +procSuite "uTP over UDP protocol tests": let rng = newRng() - asyncTest "Success connect to remote host": - let server1Called = newAsyncEvent() - let address = initTAddress("127.0.0.1", 9079) - let utpProt1 = UtpProtocol.new(setAcceptedCallback(server1Called), address) + asyncTest "Connect to remote host: test connection callback": + let + server1Incoming = newAsyncEvent() + address1 = initTAddress("127.0.0.1", 9079) + utpProto1 = UtpProtocol.new(setAcceptedCallback(server1Incoming), address1) - var server2Called = newAsyncEvent() - let address1 = initTAddress("127.0.0.1", 9080) - let utpProt2 = UtpProtocol.new(setAcceptedCallback(server2Called), address1) + server2Incoming = newAsyncEvent() + address2 = initTAddress("127.0.0.1", 9080) + utpProto2 = UtpProtocol.new(setAcceptedCallback(server2Incoming), address2) - let sockResult = await utpProt1.connectTo(address1) - let sock = sockResult.get() - # this future will be completed when we called accepted connection callback - await server2Called.wait() + socketResult = await utpProto1.connectTo(address2) + + check socketResult.isOk() + let socket = socketResult.value() + + # This future will complete when the accepted connection callback is called + await server2Incoming.wait() check: - sock.isConnected() - # after successful connection outgoing buffer should be empty as syn packet - # should be correctly acked - sock.numPacketsInOutGoingBuffer() == 0 + socket.isConnected() + # after a successful connection the outgoing buffer should be empty as + # the SYN packet should have been acked + socket.numPacketsInOutGoingBuffer() == 0 - server2Called.isSet() + server2Incoming.isSet() - await utpProt1.shutdownWait() - await utpProt2.shutdownWait() + await utpProto1.shutdownWait() + await utpProto2.shutdownWait() + asyncTest "Connect to remote host: test udata pointer and use it in callback": + proc cbUserData( + server: UtpRouter[TransportAddress], + client: UtpSocket[TransportAddress]): Future[void] = + let q = getUserData[TransportAddress, AsyncQueue[UtpSocket[TransportAddress]]](server) + q.addLast(client) - proc cbUserData(server: UtpRouter[TransportAddress], client: UtpSocket[TransportAddress]): Future[void] = - let q = rt.getUserData[TransportAddress, AsyncQueue[UtpSocket[TransportAddress]]](server) - q.addLast(client) + let + incomingConnections1 = newAsyncQueue[UtpSocket[TransportAddress]]() + address1 = initTAddress("127.0.0.1", 9079) + utpProto1 = UtpProtocol.new(cbUserData, address1, incomingConnections1) - asyncTest "Provide user data pointer and use it in callback": - let incomingConnections = newAsyncQueue[UtpSocket[TransportAddress]]() - let address = initTAddress("127.0.0.1", 9079) - let utpProt1 = UtpProtocol.new(cbUserData, address, incomingConnections) + incomingConnections2 = newAsyncQueue[UtpSocket[TransportAddress]]() + address2 = initTAddress("127.0.0.1", 9080) + utpProto2 = UtpProtocol.new(cbUserData, address2, incomingConnections2) - let address1 = initTAddress("127.0.0.1", 9080) - let utpProt2 = UtpProtocol.new(cbUserData, address1, incomingConnections) + socketResult = await utpProto1.connectTo(address2) - let connResult = await utpProt1.connectTo(address1) + check socketResult.isOk() - check: - connResult.isOk() - - let clientSocket = connResult.get() - # this future will be completed when we called accepted connection callback - let serverSocket = await incomingConnections.get() + let clientSocket = socketResult.get() + # This future will complete when the accepted connection callback is called + let serverSocket = await incomingConnections2.get() check: clientSocket.isConnected() - # after successful connection outgoing buffer should be empty as syn packet - # should be correctly acked + # after a successful connection the outgoing buffer should be empty as + # the SYN packet should have been acked clientSocket.numPacketsInOutGoingBuffer() == 0 + # Server socket is not in connected state until first data transfer not serverSocket.isConnected() - await utpProt1.shutdownWait() - await utpProt2.shutdownWait() + await utpProto1.shutdownWait() + await utpProto2.shutdownWait() - asyncTest "Fail to connect to offline remote host": - let server1Called = newAsyncEvent() - let address = initTAddress("127.0.0.1", 9079) - let utpProt1 = UtpProtocol.new(setAcceptedCallback(server1Called), address , nil, SocketConfig.init(milliseconds(200))) + asyncTest "Connect to offline remote server host": + let + server1Incoming = newAsyncEvent() + address1 = initTAddress("127.0.0.1", 9079) + utpProto1 = UtpProtocol.new( + setAcceptedCallback(server1Incoming), address1 , nil, + SocketConfig.init(milliseconds(200))) - let address1 = initTAddress("127.0.0.1", 9080) + address2 = initTAddress("127.0.0.1", 9080) - let connectionResult = await utpProt1.connectTo(address1) + socketResult = await utpProto1.connectTo(address2) - check: - connectionResult.isErr() + check socketResult.isErr() + let connectionError = socketResult.error() + check connectionError.kind == ConnectionTimedOut - let connectionError = connectionResult.error() + await waitUntil(proc (): bool = utpProto1.openSockets() == 0) - check: - connectionError.kind == ConnectionTimedOut + check utpProto1.openSockets() == 0 - await waitUntil(proc (): bool = utpProt1.openSockets() == 0) + await utpProto1.shutdownWait() - check: - utpProt1.openSockets() == 0 + asyncTest "Connect to remote host which was initially offline": + let + server1Incoming = newAsyncEvent() + address1 = initTAddress("127.0.0.1", 9079) + utpProto1 = UtpProtocol.new( + setAcceptedCallback(server1Incoming), address1, nil, + # Sets initial SYN timeout to 500ms + SocketConfig.init(milliseconds(500))) - await utpProt1.shutdownWait() + address2 = initTAddress("127.0.0.1", 9080) - asyncTest "Success connect to remote host which initially was offline": - let server1Called = newAsyncEvent() - let address = initTAddress("127.0.0.1", 9079) - let utpProt1 = UtpProtocol.new(setAcceptedCallback(server1Called), address, nil, SocketConfig.init(milliseconds(500))) - - let address1 = initTAddress("127.0.0.1", 9080) - - let futSock = utpProt1.connectTo(address1) + futSock = utpProto1.connectTo(address2) # waiting 400 millisecond will trigger at least one re-send await sleepAsync(milliseconds(400)) - var server2Called = newAsyncEvent() - let utpProt2 = UtpProtocol.new(setAcceptedCallback(server2Called), address1) + var server2Incoming = newAsyncEvent() + let utpProto2 = UtpProtocol.new(setAcceptedCallback(server2Incoming), address2) - # this future will be completed when we called accepted connection callback - await server2Called.wait() + # This future will complete when the accepted connection callback is called + await server2Incoming.wait() discard (await futSock) check: futSock.finished() and (not futSock.failed()) and (not futSock.cancelled()) - server2Called.isSet() + server2Incoming.isSet() - await utpProt1.shutdownWait() - await utpProt2.shutdownWait() + await utpProto1.shutdownWait() + await utpProto2.shutdownWait() - asyncTest "Success data transfer when data fits into one packet": + asyncTest "Data transfer where data fits into one packet": let s = await initClientServerScenario() check: s.clientSocket.isConnected() - # after successful connection outgoing buffer should be empty as syn packet - # should be correctly acked + # after a successful connection the outgoing buffer should be empty as + # the SYN packet should have been acked s.clientSocket.numPacketsInOutGoingBuffer() == 0 - - # Server socket is not in connected state, until first data transfer + # Server socket is not in connected state until first data transfer (not s.serverSocket.isConnected()) let bytesToTransfer = rng[].generateBytes(100) - let bytesReceivedFromClient = await transferData(s.clientSocket, s.serverSocket, bytesToTransfer) + let bytesReceivedFromClient = await transferData( + s.clientSocket, s.serverSocket, bytesToTransfer) check: bytesToTransfer == bytesReceivedFromClient s.serverSocket.isConnected() - let bytesReceivedFromServer = await transferData(s.serverSocket, s.clientSocket, bytesToTransfer) + let bytesReceivedFromServer = await transferData( + s.serverSocket, s.clientSocket, bytesToTransfer) check: bytesToTransfer == bytesReceivedFromServer await s.close() - asyncTest "Success data transfer when data need to be sliced into multiple packets": + asyncTest "Data transfer where data need to be sliced into multiple packets": let s = await initClientServerScenario() check: s.clientSocket.isConnected() - # after successful connection outgoing buffer should be empty as syn packet - # should be correctly acked s.clientSocket.numPacketsInOutGoingBuffer() == 0 - (not s.serverSocket.isConnected()) # 20_000 bytes is way over maximal packet size let bytesToTransfer = rng[].generateBytes(20_000) - let bytesReceivedFromClient = await transferData(s.clientSocket, s.serverSocket, bytesToTransfer) - let bytesReceivedFromServer = await transferData(s.serverSocket, s.clientSocket, bytesToTransfer) + let bytesReceivedFromClient = await transferData( + s.clientSocket, s.serverSocket, bytesToTransfer) + let bytesReceivedFromServer = await transferData( + s.serverSocket, s.clientSocket, bytesToTransfer) # ultimately all send packets will acked, and outgoing buffer will be empty await waitUntil(proc (): bool = s.clientSocket.numPacketsInOutGoingBuffer() == 0) @@ -289,81 +308,74 @@ procSuite "Utp protocol over udp tests": await s.close() - asyncTest "Success multiple data transfers when data need to be sliced into multiple packets": + asyncTest "Multiple data transfers where data need to be sliced into multiple packets": let s = await initClientServerScenario() check: s.clientSocket.isConnected() - # after successful connection outgoing buffer should be empty as syn packet - # should be correctly acked s.clientSocket.numPacketsInOutGoingBuffer() == 0 + const + amountOftransfers = 3 + amountOfBytes = 5000 - # 5000 bytes is over maximal packet size - let bytesToTransfer = rng[].generateBytes(5000) + var totalBytesToTransfer: seq[byte] + for i in 0.. startMaxWindow s.serverSocket.isConnected() s.clientSocket.numPacketsInOutGoingBuffer() == 0 @@ -488,16 +507,14 @@ procSuite "Utp protocol over udp tests": await s.close() - asyncTest "Not used socket should decay its max send window": + asyncTest "Unused socket should decay its max send window": let s = await initClientServerScenario() let startMaxWindow = 2 * s.clientSocket.getSocketConfig().payloadSize check: s.clientSocket.isConnected() - # initially window has value equal to some pre configured constant + # initially the window has value equal to a pre-configured constant s.clientSocket.currentMaxWindowSize == startMaxWindow - # after successful connection outgoing buffer should be empty as syn packet - # should be correctly acked s.clientSocket.numPacketsInOutGoingBuffer() == 0 (not s.serverSocket.isConnected()) @@ -505,16 +522,17 @@ procSuite "Utp protocol over udp tests": # big transfer of 50kb let bytesToTransfer = rng[].generateBytes(50000) - let bytesReceivedFromClient = await transferData(s.clientSocket, s.serverSocket, bytesToTransfer) + let bytesReceivedFromClient = await transferData( + s.clientSocket, s.serverSocket, bytesToTransfer) - # ultimately all send packets will acked, and outgoing buffer will be empty + # ultimately all send packets will be acked and the outgoing buffer will be empty await waitUntil(proc (): bool = s.clientSocket.numPacketsInOutGoingBuffer() == 0) let maximumMaxWindow = s.clientSocket.currentMaxWindowSize check: - # we can only assert that window has grown, because specific values depends on - # particular timings + # we can only assess that the window has grown, because the specific value + # depends on particular timings maximumMaxWindow > startMaxWindow s.serverSocket.isConnected() s.clientSocket.numPacketsInOutGoingBuffer() == 0