nim-eth/tests/utp/test_utp_router.nim

458 lines
14 KiB
Nim

# Copyright (c) 2020-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.used.}
import
std/[hashes, options],
chronos, chronicles,
testutils/unittests,
./test_utils,
../../eth/utp/utp_router,
../../eth/utp/packets,
../../eth/keys,
../stubloglevel
proc hash*(x: UtpSocketKey[int]): Hash =
var h = 0
h = h !& x.remoteAddress.hash
h = h !& x.rcvId.hash
!$h
type
TestError* = object of CatchableError
procSuite "uTP router unit":
let rng = newRng()
let testSender = 1
let testSender2 = 2
let testBufferSize = 1024'u32
proc registerIncomingSocketCallback(serverSockets: AsyncQueue): AcceptConnectionCallback[int] =
return (
proc(server: UtpRouter[int], client: UtpSocket[int]): Future[void] {.async: (raw: true, raises: []).} =
noCancel serverSockets.addLast(client)
)
proc testSend(to: int, bytes: seq[byte]) =
discard
proc initTestSnd(q: AsyncQueue[(Packet, int)]): SendCallback[int]=
return (
proc (to: int, bytes: seq[byte]) {.raises: [], gcsafe.} =
let p = decodePacket(bytes).get()
try:
q.addLastNoWait((p, to))
except AsyncQueueFullError:
raiseAssert "Should not occur as unlimited queue"
)
template connectOutgoing(
r: UtpRouter[int],
remote: int,
pq: AsyncQueue[(Packet, int)],
initialRemoteSeq: uint16): (UtpSocket[int], Packet)=
let connectFuture = router.connectTo(remote)
let (initialPacket, _) = await pq.get()
check:
initialPacket.header.pType == ST_SYN
let responseAck =
ackPacket(
initialRemoteSeq,
initialPacket.header.connectionId,
initialPacket.header.seqNr,
testBufferSize,
0
)
await router.processIncomingBytes(encodePacket(responseAck), remote)
let outgoingSocket = await connectFuture
(outgoingSocket.get(), initialPacket)
asyncTest "Router should ignore non utp packets":
let q = newAsyncQueue[UtpSocket[int]]()
let router = UtpRouter[int].new(registerIncomingSocketCallback(q), SocketConfig.init(), rng)
router.sendCb = testSend
await router.processIncomingBytes(@[1'u8, 2, 3], testSender)
check:
router.len() == 0
q.len() == 0
asyncTest "Router should create new incoming socket when receiving not known syn packet":
let q = newAsyncQueue[UtpSocket[int]]()
let router = UtpRouter[int].new(registerIncomingSocketCallback(q), nil, nil, SocketConfig.init(), rng)
router.sendCb = testSend
let encodedSyn = encodePacket(synPacket(10, 10, 10))
await router.processIncomingBytes(encodedSyn, testSender)
check:
router.len() == 1
asyncTest "Router should not create new incoming connections when hitting connections limit":
let q = newAsyncQueue[UtpSocket[int]]()
let connectionsLimit = 2
let customConfig = SocketConfig.init(maxNumberOfOpenConnections = connectionsLimit)
let router = UtpRouter[int].new(registerIncomingSocketCallback(q), customConfig, rng)
router.sendCb = testSend
var synPackets: seq[seq[byte]]
for i in 1..connectionsLimit+5:
let encodedSyn = encodePacket(synPacket(10, uint16(i), 10))
synPackets.add(encodedSyn)
for p in synPackets:
await router.processIncomingBytes(p, testSender)
check:
router.len() == connectionsLimit
asyncTest "Incoming connection should be closed when not receiving data for period of time when configured":
let q = newAsyncQueue[UtpSocket[int]]()
let router =
UtpRouter[int].new(
registerIncomingSocketCallback(q),
SocketConfig.init(incomingSocketReceiveTimeout = some(seconds(2))),
rng
)
router.sendCb = testSend
let encodedSyn = encodePacket(synPacket(10, 10, 10))
await router.processIncomingBytes(encodedSyn, testSender)
let socket = await q.get()
check:
router.len() == 1
# socket is not configured to be connected until receiving data
not socket.isConnected()
await waitUntil(proc (): bool = socket.isClosed())
check:
router.len() == 0
asyncTest "Incoming connection should be in connected state when configured":
let q = newAsyncQueue[UtpSocket[int]]()
let router =
UtpRouter[int].new(
registerIncomingSocketCallback(q),
SocketConfig.init(incomingSocketReceiveTimeout = none[Duration]()),
rng
)
router.sendCb = testSend
let encodedSyn = encodePacket(synPacket(10, 10, 10))
await router.processIncomingBytes(encodedSyn, testSender)
let socket = await q.get()
check:
router.len() == 1
socket.isConnected()
# wait a while to trigger timeout and check that socket is still connected
await sleepAsync(seconds(3))
check:
router.len() == 1
socket.isConnected()
asyncTest "Incoming connection should change state to connected when receiving data packet":
let q = newAsyncQueue[UtpSocket[int]]()
let pq = newAsyncQueue[(Packet, int)]()
let router =
UtpRouter[int].new(
registerIncomingSocketCallback(q),
SocketConfig.init(incomingSocketReceiveTimeout = some(seconds(3))),
rng
)
router.sendCb = initTestSnd(pq)
let dataToSend = @[1'u8]
let initSeq: uint16 = 10
let initConnId: uint16 = 10
let encodedSyn = encodePacket(synPacket(initSeq, initConnId, 10))
await router.processIncomingBytes(encodedSyn, testSender)
let (initialPacket, _) = await pq.get()
let socket = await q.get()
check:
router.len() == 1
# socket is not configured to be connected until receiving data
not socket.isConnected()
let encodedData =
encodePacket(
dataPacket(
initSeq + 1,
initConnId + 1,
initialPacket.header.seqNr - 1,
10,
dataToSend,
0
)
)
await router.processIncomingBytes(encodedData, testSender)
await waitUntil(proc (): bool = socket.numOfEventsInEventQueue() == 0)
check:
socket.isConnected()
asyncTest "Router should create new incoming socket when receiving same syn packet from different sender":
let q = newAsyncQueue[UtpSocket[int]]()
let router = UtpRouter[int].new(registerIncomingSocketCallback(q), SocketConfig.init(), rng)
router.sendCb = testSend
let encodedSyn = encodePacket(synPacket(10, 10, 10))
await router.processIncomingBytes(encodedSyn, testSender)
check:
router.len() == 1
await router.processIncomingBytes(encodedSyn, testSender2)
check:
router.len() == 2
asyncTest "Router should ignore duplicated syn packet":
let q = newAsyncQueue[UtpSocket[int]]()
let router = UtpRouter[int].new(registerIncomingSocketCallback(q), SocketConfig.init(), rng)
router.sendCb = testSend
let encodedSyn = encodePacket(synPacket(10, 10, 10))
await router.processIncomingBytes(encodedSyn, testSender)
check:
router.len() == 1
await router.processIncomingBytes(encodedSyn, testSender)
check:
router.len() == 1
asyncTest "Router should clear closed incoming sockets":
let q = newAsyncQueue[UtpSocket[int]]()
let router = UtpRouter[int].new(registerIncomingSocketCallback(q), SocketConfig.init(), rng)
router.sendCb = testSend
let encodedSyn = encodePacket(synPacket(10, 10, 10))
await router.processIncomingBytes(encodedSyn, testSender)
let socket = await q.get()
check:
router.len() == 1
await socket.destroyWait()
check:
not socket.isConnected()
router.len() == 0
asyncTest "Router should connect to out going peer":
let q = newAsyncQueue[UtpSocket[int]]()
let pq = newAsyncQueue[(Packet, int)]()
let router = UtpRouter[int].new(registerIncomingSocketCallback(q), SocketConfig.init(), rng)
router.sendCb = initTestSnd(pq)
let (outgoingSocket, _) = router.connectOutgoing(testSender2, pq, 30'u16)
check:
outgoingSocket.isConnected()
router.len() == 1
asyncTest "Router should fail to connect to the same peer with the same connection id":
let q = newAsyncQueue[UtpSocket[int]]()
let pq = newAsyncQueue[(Packet, int)]()
let initialRemoteSeq = 30'u16
let router = UtpRouter[int].new(registerIncomingSocketCallback(q), SocketConfig.init(), rng)
router.sendCb = initTestSnd(pq)
let requestedConnectionId = 1'u16
let connectFuture = router.connectTo(testSender2, requestedConnectionId)
let (initialPacket, _) = await pq.get()
check:
initialPacket.header.pType == ST_SYN
# connection id of syn packet should be set to requested connection id
initialPacket.header.connectionId == requestedConnectionId
let responseAck =
ackPacket(
initialRemoteSeq,
initialPacket.header.connectionId,
initialPacket.header.seqNr,
testBufferSize,
0
)
await router.processIncomingBytes(encodePacket(responseAck), testSender2)
let outgoingSocket = await connectFuture
check:
outgoingSocket.get().isConnected()
router.len() == 1
let duplicatedConnectionResult = await router.connectTo(testSender2, requestedConnectionId)
check:
duplicatedConnectionResult.isErr()
duplicatedConnectionResult.error() == SocketAlreadyExists
asyncTest "Router should fail connect when socket syn will not be acked":
let q = newAsyncQueue[UtpSocket[int]]()
let pq = newAsyncQueue[(Packet, int)]()
let router = UtpRouter[int].new(registerIncomingSocketCallback(q), SocketConfig.init(milliseconds(500)), rng)
router.sendCb = initTestSnd(pq)
let connectFuture = router.connectTo(testSender2)
let (initialPacket, _) = await pq.get()
check:
initialPacket.header.pType == ST_SYN
let connectResult = await connectFuture
await waitUntil(proc (): bool = router.len() == 0)
check:
connectResult.isErr()
connectResult.error() == ConnectionTimedOut
router.len() == 0
asyncTest "Router should clear all resources when connection future is cancelled":
let q = newAsyncQueue[UtpSocket[int]]()
let pq = newAsyncQueue[(Packet, int)]()
let router = UtpRouter[int].new(registerIncomingSocketCallback(q), SocketConfig.init(milliseconds(500)), rng)
router.sendCb = initTestSnd(pq)
let connectFuture = router.connectTo(testSender2)
let (initialPacket, _) = await pq.get()
check:
initialPacket.header.pType == ST_SYN
router.len() == 1
await connectFuture.cancelAndWait()
await waitUntil(proc (): bool = router.len() == 0)
check:
router.len() == 0
asyncTest "Router should clear all resources and handle error when sending syn packet fails":
let q = newAsyncQueue[UtpSocket[int]]()
let router = UtpRouter[int].new(registerIncomingSocketCallback(q), SocketConfig.init(milliseconds(500)), rng)
router.sendCb =
proc (to: int, data: seq[byte]) =
# Can just discard here not to send anything as the send callback does
# not forward errors anyhow
discard
let connectResult = await router.connectTo(testSender2)
await waitUntil(proc (): bool = router.len() == 0)
check:
connectResult.isErr()
# even though send is failing we will just finish with timeout,
connectResult.error() == ConnectionTimedOut
router.len() == 0
asyncTest "Router should clear closed outgoing connections":
let q = newAsyncQueue[UtpSocket[int]]()
let pq = newAsyncQueue[(Packet, int)]()
let router = UtpRouter[int].new(registerIncomingSocketCallback(q), SocketConfig.init(), rng)
router.sendCb = initTestSnd(pq)
let (outgoingSocket, _) = router.connectOutgoing(testSender2, pq, 30'u16)
check:
outgoingSocket.isConnected()
router.len() == 1
await outgoingSocket.destroyWait()
check:
not outgoingSocket.isConnected()
router.len() == 0
asyncTest "Router should respond with Reset when receiving packet for not known connection":
let q = newAsyncQueue[UtpSocket[int]]()
let pq = newAsyncQueue[(Packet, int)]()
let router = UtpRouter[int].new(registerIncomingSocketCallback(q), SocketConfig.init(), rng)
router.sendCb = initTestSnd(pq)
let sndId = 10'u16
let dp = dataPacket(10'u16, sndId, 10'u16, 10'u32, @[1'u8], 0)
await router.processIncomingBytes(encodePacket(dp), testSender2)
let (packet, sender) = await pq.get()
check:
packet.header.pType == ST_RESET
packet.header.connectionId == sndId
sender == testSender2
asyncTest "Router close incoming connection which receives reset":
let q = newAsyncQueue[UtpSocket[int]]()
let router = UtpRouter[int].new(registerIncomingSocketCallback(q), SocketConfig.init(), rng)
router.sendCb = testSend
let recvId = 10'u16
let encodedSyn = encodePacket(synPacket(10, recvId, 10))
await router.processIncomingBytes(encodedSyn, testSender)
check:
router.len() == 1
let rstPacket = resetPacket(10, recvId, 10)
await router.processIncomingBytes(encodePacket(rstPacket), testSender)
await waitUntil(proc (): bool = router.len() == 0)
check:
router.len() == 0
asyncTest "Router close outgoing connection which receives reset":
let q = newAsyncQueue[UtpSocket[int]]()
let pq = newAsyncQueue[(Packet, int)]()
let router = UtpRouter[int].new(registerIncomingSocketCallback(q), SocketConfig.init(), rng)
router.sendCb = initTestSnd(pq)
let (_, initialSyn) = router.connectOutgoing(testSender2, pq, 30'u16)
check:
router.len() == 1
# remote side sendId is syn.header.connectionId + 1
let rstPacket = resetPacket(10, initialSyn.header.connectionId + 1, 10)
await router.processIncomingBytes(encodePacket(rstPacket), testSender2)
await waitUntil(proc (): bool = router.len() == 0)
check:
router.len() == 0