nim-eth/tests/utp/test_protocol_integration.nim

268 lines
9.0 KiB
Nim

# Copyright (c) 2022-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.used.}
import
std/[tables, options],
chronos,
testutils/unittests,
../../eth/utp/utp_router,
../../eth/utp/utp_protocol,
../../eth/common/keys,
../../eth/p2p/discoveryv5/random2,
../stubloglevel
proc connectTillSuccess(p: UtpProtocol, to: TransportAddress, maxTries: int = 20): Future[UtpSocket[TransportAddress]] {.async.} =
var i = 0
while true:
let res = await p.connectTo(to)
if res.isOk():
return res.unsafeGet()
else:
inc i
if i >= maxTries:
raise newException(CatchableError, "Connection failed")
proc buildAcceptConnection(
t: ref Table[UtpSocketKey[TransportAddress], UtpSocket[TransportAddress]]
): AcceptConnectionCallback[TransportAddress] =
return (
proc (
server: UtpRouter[TransportAddress], client: UtpSocket[TransportAddress]
): Future[void] {.async: (raw: true, raises: []).} =
let fut = noCancel Future[void].Raising([CancelledError]).init("test.AcceptConnectionCallback")
let key = client.socketKey
t[key] = client
fut.complete()
fut
)
proc getServerSocket(
t: ref Table[UtpSocketKey[TransportAddress], UtpSocket[TransportAddress]],
clientAddress: TransportAddress,
clientConnectionId: uint16): Option[UtpSocket[TransportAddress]] =
let serverSocketKey = UtpSocketKey[TransportAddress](remoteAddress: clientAddress, rcvId: clientConnectionId + 1)
let srvSocket = t.getOrDefault(serverSocketKey)
if srvSocket == nil:
return none[UtpSocket[TransportAddress]]()
else:
return some(srvSocket)
procSuite "uTP over UDP protocol with loss and delays":
let rng = newRng()
proc simulatedSend(
d: DatagramTransport, to: TransportAddress, data: seq[byte],
maxDelay: int, packetDropRate: int
) {.async: (raises: []).} =
let i = rand(rng[], 99)
if i >= packetDropRate:
let delay = milliseconds(rand(rng[], maxDelay))
try:
await sleepAsync(delay)
await d.sendTo(to, data)
except TransportError:
# ignore
return
except CancelledError:
# ignore
return
proc sendBuilder(maxDelay: int, packetDropRate: int): SendCallbackBuilder =
return (
proc (d: DatagramTransport): SendCallback[TransportAddress] =
return (
proc (to: TransportAddress, data: seq[byte]) =
asyncSpawn simulatedSend(d, to, data, maxDelay, packetDropRate)
)
)
proc testScenario(maxDelay: int, dropRate: int, cfg: SocketConfig = SocketConfig.init()):
Future[(
UtpProtocol,
UtpSocket[TransportAddress],
UtpProtocol,
UtpSocket[TransportAddress])
] {.async.} =
var connections1 = newTable[UtpSocketKey[TransportAddress], UtpSocket[TransportAddress]]()
let address1 = initTAddress("127.0.0.1", 9080)
let utpProt1 =
UtpProtocol.new(
buildAcceptConnection(connections1),
address1,
socketConfig = cfg,
sendCallbackBuilder = sendBuilder(maxDelay, dropRate),
rng = rng)
var connections2 = newTable[UtpSocketKey[TransportAddress], UtpSocket[TransportAddress]]()
let address2 = initTAddress("127.0.0.1", 9081)
let utpProt2 =
UtpProtocol.new(
buildAcceptConnection(connections2),
address2,
socketConfig = cfg,
sendCallbackBuilder = sendBuilder(maxDelay, dropRate),
rng = rng)
let clientSocket = await utpProt1.connectTillSuccess(address2)
let maybeServerSocket = connections2.getServerSocket(address1, clientSocket.socketKey.rcvId)
let serverSocket = maybeServerSocket.unsafeGet()
return (utpProt1, clientSocket, utpProt2, serverSocket)
type TestCase = object
# in milliseconds
maxDelay: int
dropRate: int
bytesToTransfer: int
bytesPerRead: int
cfg: SocketConfig
proc init(
T: type TestCase,
maxDelay: int,
dropRate: int,
bytesToTransfer: int,
cfg: SocketConfig = SocketConfig.init(),
bytesPerRead: int = 0): TestCase =
TestCase(maxDelay: maxDelay, dropRate: dropRate, bytesToTransfer: bytesToTransfer, cfg: cfg, bytesPerRead: bytesPerRead)
let testCases = @[
TestCase.init(15, 3, 40000),
# super small recv buffer which will be constantly on the brink of being full
TestCase.init(10, 3, 40000, SocketConfig.init(optRcvBuffer = uint32(10000), remoteWindowResetTimeout = seconds(5))),
TestCase.init(10, 6, 40000, SocketConfig.init(optRcvBuffer = uint32(10000), remoteWindowResetTimeout = seconds(5)))
]
asyncTest "Write and Read large data in different network conditions":
for testCase in testCases:
let (
clientProtocol,
clientSocket,
serverProtocol,
serverSocket) = await testScenario(testCase.maxDelay, testCase.dropRate, testCase.cfg)
let smallBytes = 10
let smallBytesToTransfer = rng[].generateBytes(smallBytes)
# first transfer and read to make server socket connected
let write1 = await clientSocket.write(smallBytesToTransfer)
let read1 = await serverSocket.read(smallBytes)
check:
write1.isOk()
read1 == smallBytesToTransfer
let numBytes = testCase.bytesToTransfer
let bytesToTransfer = rng[].generateBytes(numBytes)
discard clientSocket.write(bytesToTransfer)
discard serverSocket.write(bytesToTransfer)
let serverReadFut = serverSocket.read(numBytes)
let clientReadFut = clientSocket.read(numBytes)
await allFutures(serverReadFut, clientReadFut)
let clientRead = clientReadFut.read()
let serverRead = serverReadFut.read()
check:
clientRead == bytesToTransfer
serverRead == bytesToTransfer
await clientProtocol.shutdownWait()
await serverProtocol.shutdownWait()
let testCases1 = @[
# small buffers so it will fill up between reads
TestCase.init(5, 3, 40000, SocketConfig.init(optRcvBuffer = uint32(10000), remoteWindowResetTimeout = seconds(5)), 10000),
TestCase.init(10, 6, 40000, SocketConfig.init(optRcvBuffer = uint32(10000), remoteWindowResetTimeout = seconds(5)), 10000),
TestCase.init(15, 6, 40000, SocketConfig.init(optRcvBuffer = uint32(10000), remoteWindowResetTimeout = seconds(5)), 10000)
]
proc readWithMultipleReads(s: UtpSocket[TransportAddress], numOfReads: int, bytesPerRead: int): Future[seq[byte]] {.async.}=
var i = 0
var res: seq[byte] = @[]
while i < numOfReads:
let bytes = await s.read(bytesPerRead)
res.add(bytes)
inc i
return res
asyncTest "Write and Read large data in different network conditions split over several reads":
for testCase in testCases1:
let (
clientProtocol,
clientSocket,
serverProtocol,
serverSocket) = await testScenario(testCase.maxDelay, testCase.dropRate, testCase.cfg)
let smallBytes = 10
let smallBytesToTransfer = rng[].generateBytes(smallBytes)
# first transfer and read to make server socket connected
discard await clientSocket.write(smallBytesToTransfer)
let read1 = await serverSocket.read(smallBytes)
check:
read1 == smallBytesToTransfer
let numBytes = testCase.bytesToTransfer
let bytesToTransfer = rng[].generateBytes(numBytes)
discard clientSocket.write(bytesToTransfer)
discard serverSocket.write(bytesToTransfer)
let numOfReads = int(testCase.bytesToTransfer / testCase.bytesPerRead)
let serverReadFut = serverSocket.readWithMultipleReads(numOfReads, testCase.bytesPerRead)
let clientReadFut = clientSocket.readWithMultipleReads(numOfReads, testCase.bytesPerRead)
await allFutures(serverReadFut, clientReadFut)
let clientRead = clientReadFut.read()
let serverRead = serverReadFut.read()
check:
clientRead == bytesToTransfer
serverRead == bytesToTransfer
await clientProtocol.shutdownWait()
await serverProtocol.shutdownWait()
let testCase2 = @[
TestCase.init(10, 0, 80000),
TestCase.init(10, 3, 40000),
TestCase.init(15, 6, 40000, SocketConfig.init(optRcvBuffer = uint32(10000), remoteWindowResetTimeout = seconds(5)))
]
asyncTest "Write large data and read till EOF":
for testCase in testCase2:
let (
clientProtocol,
clientSocket,
serverProtocol,
serverSocket) = await testScenario(testCase.maxDelay, testCase.dropRate, testCase.cfg)
let numBytes = testCase.bytesToTransfer
let bytesToTransfer = rng[].generateBytes(numBytes)
discard await clientSocket.write(bytesToTransfer)
clientSocket.close()
let read = await serverSocket.read()
check:
read == bytesToTransfer
await clientProtocol.shutdownWait()
await serverProtocol.shutdownWait()