mirror of https://github.com/status-im/nim-eth.git
458 lines
14 KiB
Nim
458 lines
14 KiB
Nim
# Copyright (c) 2020-2021 Status Research & Development GmbH
|
|
# Licensed and distributed under either of
|
|
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
|
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
|
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
|
|
|
{.used.}
|
|
|
|
import
|
|
std/[hashes, options],
|
|
chronos, chronicles,
|
|
testutils/unittests,
|
|
./test_utils,
|
|
../../eth/utp/utp_router,
|
|
../../eth/utp/packets,
|
|
../../eth/keys,
|
|
../stubloglevel
|
|
|
|
proc hash*(x: UtpSocketKey[int]): Hash =
  ## Hash for socket keys whose address type is `int`: the hash of
  ## `remoteAddress` is mixed first, then the hash of `rcvId`, and the mix is
  ## finalised with `!$`.
  let mixed = (Hash(0) !& hash(x.remoteAddress)) !& hash(x.rcvId)
  result = !$mixed
|
|
|
|
type TestError* = object of CatchableError ## Catchable error type exported by this test module.
|
|
|
|
procSuite "uTP router unit":
|
|
# Fixtures shared by the helpers and tests of this suite.
let
  rng = newRng()
  testSender = 1
  testSender2 = 2
  testBufferSize = 1024'u32
|
|
|
|
proc registerIncomingSocketCallback(serverSockets: AsyncQueue): AcceptConnectionCallback[int] =
  ## Returns an accept callback that appends every socket it is given to
  ## `serverSockets`; the future produced by `addLast` is the callback's result.
  result = proc(server: UtpRouter[int], client: UtpSocket[int]): Future[void] =
    serverSockets.addLast(client)
|
|
|
|
proc testSend(to: int, bytes: seq[byte]) =
  ## Send callback that does nothing; used by tests that never look at the
  ## packets the router sends.
  discard
|
|
|
|
proc initTestSnd(q: AsyncQueue[(Packet, int)]): SendCallback[int] =
  ## Returns a send callback that decodes each outgoing byte sequence into a
  ## `Packet` and enqueues it on `q` together with its destination, so a test
  ## can pull sent packets back out with `q.get()`.
  result = proc (to: int, bytes: seq[byte]) {.raises: [], gcsafe.} =
    # `get()` is called on the decode result unconditionally, before the try
    let decoded = decodePacket(bytes).get()
    try:
      q.addLastNoWait((decoded, to))
    except AsyncQueueFullError:
      raiseAssert "Should not occur as unlimited queue"
|
|
|
|
template connectOutgoing(
  r: UtpRouter[int],
  remote: int,
  pq: AsyncQueue[(Packet, int)],
  initialRemoteSeq: uint16): (UtpSocket[int], Packet)=
  # Completes an outgoing connection from router `r` to `remote`:
  #   1. starts `connectTo(remote)`,
  #   2. takes the first packet the router pushed onto `pq` and checks it is a SYN,
  #   3. feeds `r` an ACK built from that SYN, using `initialRemoteSeq` as the
  #      first argument of `ackPacket` and the suite-level `testBufferSize`,
  #   4. awaits the connect future.
  # Evaluates to `(connected socket, SYN packet)`.
  #
  # The router is taken from the `r` parameter. Previously the body referred to
  # an identifier named `router`, which only resolved because every call site
  # happened to have a local variable with that name.
  let connectFuture = r.connectTo(remote)

  let (initialPacket, _) = await pq.get()

  check:
    initialPacket.header.pType == ST_SYN

  let responseAck =
    ackPacket(
      initialRemoteSeq,
      initialPacket.header.connectionId,
      initialPacket.header.seqNr,
      testBufferSize,
      0
    )

  await r.processIncomingBytes(encodePacket(responseAck), remote)

  let outgoingSocket = await connectFuture
  (outgoingSocket.get(), initialPacket)
|
|
|
|
asyncTest "Router should ignore non utp packets":
  # Three arbitrary bytes are handed to the router; afterwards the router must
  # hold no sockets and the accept queue must be empty.
  let accepted = newAsyncQueue[UtpSocket[int]]()
  let router = UtpRouter[int].new(
    registerIncomingSocketCallback(accepted),
    SocketConfig.init(),
    rng
  )
  router.sendCb = testSend

  await router.processIncomingBytes(@[1'u8, 2, 3], testSender)

  check router.len() == 0
  check accepted.len() == 0
|
|
|
|
asyncTest "Router should create new incoming socket when receiving not known syn packet":
  # A single SYN from `testSender` must leave exactly one socket in the router.
  let accepted = newAsyncQueue[UtpSocket[int]]()
  # NOTE(review): unlike the other tests in this suite, this one calls the
  # five-argument form of `new` with two `nil` arguments — confirm intended.
  let router = UtpRouter[int].new(
    registerIncomingSocketCallback(accepted),
    nil,
    nil,
    SocketConfig.init(),
    rng
  )
  router.sendCb = testSend

  let syn = synPacket(10, 10, 10)
  await router.processIncomingBytes(encodePacket(syn), testSender)

  check router.len() == 1
|
|
|
|
asyncTest "Router should not create new incoming connections when hitting connections limit":
  let accepted = newAsyncQueue[UtpSocket[int]]()
  let connectionsLimit = 2
  let router = UtpRouter[int].new(
    registerIncomingSocketCallback(accepted),
    SocketConfig.init(maxNumberOfOpenConnections = connectionsLimit),
    rng
  )
  router.sendCb = testSend

  # Encode limit + 5 SYNs up front; only the second `synPacket` argument varies
  var encodedSyns: seq[seq[byte]]
  for idx in 1..connectionsLimit+5:
    encodedSyns.add(encodePacket(synPacket(10, uint16(idx), 10)))

  for syn in encodedSyns:
    await router.processIncomingBytes(syn, testSender)

  # the router must stop at the configured limit
  check router.len() == connectionsLimit
|
|
|
|
asyncTest "Incoming connection should be closed when not receiving data for period of time when configured":
  # Router configured with a 2 second incoming-socket receive timeout
  let accepted = newAsyncQueue[UtpSocket[int]]()
  let cfg = SocketConfig.init(incomingSocketReceiveTimeout = some(seconds(2)))
  let router = UtpRouter[int].new(registerIncomingSocketCallback(accepted), cfg, rng)
  router.sendCb = testSend

  await router.processIncomingBytes(encodePacket(synPacket(10, 10, 10)), testSender)

  let incoming = await accepted.get()

  check:
    router.len() == 1
    # socket is not configured to be connected until receiving data
    not incoming.isConnected()

  # no data is ever sent, so wait for the socket to report closed
  await waitUntil(proc (): bool = incoming.isClosed())

  check router.len() == 0
|
|
|
|
asyncTest "Incoming connection should be in connected state when configured":
  # Router configured without an incoming-socket receive timeout
  let accepted = newAsyncQueue[UtpSocket[int]]()
  let cfg = SocketConfig.init(incomingSocketReceiveTimeout = none[Duration]())
  let router = UtpRouter[int].new(registerIncomingSocketCallback(accepted), cfg, rng)
  router.sendCb = testSend

  await router.processIncomingBytes(encodePacket(synPacket(10, 10, 10)), testSender)

  let incoming = await accepted.get()

  check router.len() == 1
  check incoming.isConnected()

  # wait a while to trigger timeout and check that socket is still connected
  await sleepAsync(seconds(3))

  check router.len() == 1
  check incoming.isConnected()
|
|
|
|
asyncTest "Incoming connection should change state to connected when receiving data packet":
  let accepted = newAsyncQueue[UtpSocket[int]]()
  let sentPackets = newAsyncQueue[(Packet, int)]()
  let cfg = SocketConfig.init(incomingSocketReceiveTimeout = some(seconds(3)))
  let router = UtpRouter[int].new(registerIncomingSocketCallback(accepted), cfg, rng)
  router.sendCb = initTestSnd(sentPackets)

  let
    dataToSend = @[1'u8]
    initSeq: uint16 = 10
    initConnId: uint16 = 10

  await router.processIncomingBytes(
    encodePacket(synPacket(initSeq, initConnId, 10)), testSender)

  # first packet the router sent after processing the syn
  let (synReply, _) = await sentPackets.get()
  let incoming = await accepted.get()

  check:
    router.len() == 1
    # socket is not configured to be connected until receiving data
    not incoming.isConnected()

  let payloadPacket = dataPacket(
    initSeq + 1,
    initConnId + 1,
    synReply.header.seqNr - 1,
    10,
    dataToSend,
    0
  )
  await router.processIncomingBytes(encodePacket(payloadPacket), testSender)

  # let the socket drain its event queue before inspecting its state
  await waitUntil(proc (): bool = incoming.numOfEventsInEventQueue() == 0)

  check incoming.isConnected()
|
|
|
|
asyncTest "Router should create new incoming socket when receiving same syn packet from different sender":
  let accepted = newAsyncQueue[UtpSocket[int]]()
  let router = UtpRouter[int].new(
    registerIncomingSocketCallback(accepted),
    SocketConfig.init(),
    rng
  )
  router.sendCb = testSend
  let synBytes = encodePacket(synPacket(10, 10, 10))

  # identical bytes, first sender
  await router.processIncomingBytes(synBytes, testSender)
  check router.len() == 1

  # identical bytes, second sender: a second socket is expected
  await router.processIncomingBytes(synBytes, testSender2)
  check router.len() == 2
|
|
|
|
asyncTest "Router should ignore duplicated syn packet":
  let accepted = newAsyncQueue[UtpSocket[int]]()
  let router = UtpRouter[int].new(
    registerIncomingSocketCallback(accepted),
    SocketConfig.init(),
    rng
  )
  router.sendCb = testSend
  let synBytes = encodePacket(synPacket(10, 10, 10))

  await router.processIncomingBytes(synBytes, testSender)
  check router.len() == 1

  # same bytes from the same sender again: socket count must not change
  await router.processIncomingBytes(synBytes, testSender)
  check router.len() == 1
|
|
|
|
asyncTest "Router should clear closed incoming sockets":
  let accepted = newAsyncQueue[UtpSocket[int]]()
  let router = UtpRouter[int].new(
    registerIncomingSocketCallback(accepted),
    SocketConfig.init(),
    rng
  )
  router.sendCb = testSend

  await router.processIncomingBytes(encodePacket(synPacket(10, 10, 10)), testSender)

  let incoming = await accepted.get()
  check router.len() == 1

  # destroying the socket must also remove it from the router
  await incoming.destroyWait()

  check:
    not incoming.isConnected()
    router.len() == 0
|
|
|
|
asyncTest "Router should connect to out going peer":
  let accepted = newAsyncQueue[UtpSocket[int]]()
  let sentPackets = newAsyncQueue[(Packet, int)]()
  let router = UtpRouter[int].new(
    registerIncomingSocketCallback(accepted),
    SocketConfig.init(),
    rng
  )
  router.sendCb = initTestSnd(sentPackets)

  # the helper performs the syn / ack exchange and yields the connected socket
  let (connected, _) = router.connectOutgoing(testSender2, sentPackets, 30'u16)

  check connected.isConnected()
  check router.len() == 1
|
|
|
|
asyncTest "Router should fail to connect to the same peer with the same connection id":
  let accepted = newAsyncQueue[UtpSocket[int]]()
  let sentPackets = newAsyncQueue[(Packet, int)]()
  let initialRemoteSeq = 30'u16
  let router = UtpRouter[int].new(
    registerIncomingSocketCallback(accepted),
    SocketConfig.init(),
    rng
  )
  router.sendCb = initTestSnd(sentPackets)

  let requestedConnectionId = 1'u16
  let firstAttempt = router.connectTo(testSender2, requestedConnectionId)

  let (syn, _) = await sentPackets.get()

  check syn.header.pType == ST_SYN
  # connection id of syn packet should be set to requested connection id
  check syn.header.connectionId == requestedConnectionId

  let synAck = ackPacket(
    initialRemoteSeq,
    syn.header.connectionId,
    syn.header.seqNr,
    testBufferSize,
    0
  )
  await router.processIncomingBytes(encodePacket(synAck), testSender2)

  let firstResult = await firstAttempt

  check:
    firstResult.get().isConnected()
    router.len() == 1

  # second connect to the same peer with the same id must be rejected
  let secondResult = await router.connectTo(testSender2, requestedConnectionId)

  check:
    secondResult.isErr()
    secondResult.error().kind == SocketAlreadyExists
|
|
|
|
asyncTest "Router should fail connect when socket syn will not be acked":
  let accepted = newAsyncQueue[UtpSocket[int]]()
  let sentPackets = newAsyncQueue[(Packet, int)]()
  # NOTE(review): the positional 500 ms argument is presumably the syn timeout —
  # confirm against `SocketConfig.init`.
  let cfg = SocketConfig.init(milliseconds(500))
  let router = UtpRouter[int].new(registerIncomingSocketCallback(accepted), cfg, rng)
  router.sendCb = initTestSnd(sentPackets)

  let pendingConnect = router.connectTo(testSender2)

  let (syn, _) = await sentPackets.get()
  check syn.header.pType == ST_SYN

  # no ack is ever delivered to the router
  let outcome = await pendingConnect

  await waitUntil(proc (): bool = router.len() == 0)

  check:
    outcome.isErr()
    outcome.error().kind == ConnectionTimedOut
    router.len() == 0
|
|
|
|
asyncTest "Router should clear all resources when connection future is cancelled":
  let accepted = newAsyncQueue[UtpSocket[int]]()
  let sentPackets = newAsyncQueue[(Packet, int)]()
  let cfg = SocketConfig.init(milliseconds(500))
  let router = UtpRouter[int].new(registerIncomingSocketCallback(accepted), cfg, rng)
  router.sendCb = initTestSnd(sentPackets)

  let pendingConnect = router.connectTo(testSender2)

  let (syn, _) = await sentPackets.get()

  # the socket is registered as soon as the syn has been sent
  check syn.header.pType == ST_SYN
  check router.len() == 1

  await pendingConnect.cancelAndWait()

  await waitUntil(proc (): bool = router.len() == 0)

  check router.len() == 0
|
|
|
|
asyncTest "Router should clear all resources and handle error when sending syn packet fails":
  let accepted = newAsyncQueue[UtpSocket[int]]()
  let cfg = SocketConfig.init(milliseconds(500))
  let router = UtpRouter[int].new(registerIncomingSocketCallback(accepted), cfg, rng)
  router.sendCb =
    proc (to: int, data: seq[byte]) =
      # Can just discard here not to send anything as the send callback does
      # not forward errors anyhow
      discard

  let outcome = await router.connectTo(testSender2)

  await waitUntil(proc (): bool = router.len() == 0)

  check outcome.isErr()
  # even though send is failing we will just finish with timeout,
  check outcome.error().kind == ConnectionTimedOut
  check router.len() == 0
|
|
|
|
asyncTest "Router should clear closed outgoing connections":
  let accepted = newAsyncQueue[UtpSocket[int]]()
  let sentPackets = newAsyncQueue[(Packet, int)]()
  let router = UtpRouter[int].new(
    registerIncomingSocketCallback(accepted),
    SocketConfig.init(),
    rng
  )
  router.sendCb = initTestSnd(sentPackets)

  let (connected, _) = router.connectOutgoing(testSender2, sentPackets, 30'u16)

  check connected.isConnected()
  check router.len() == 1

  # destroying the socket must also remove it from the router
  await connected.destroyWait()

  check not connected.isConnected()
  check router.len() == 0
|
|
|
|
asyncTest "Router should respond with Reset when receiving packet for not known connection":
  let accepted = newAsyncQueue[UtpSocket[int]]()
  let sentPackets = newAsyncQueue[(Packet, int)]()
  let router = UtpRouter[int].new(
    registerIncomingSocketCallback(accepted),
    SocketConfig.init(),
    rng
  )
  router.sendCb = initTestSnd(sentPackets)

  # data packet sent to a router that has no sockets at all
  let sndId = 10'u16
  let stray = dataPacket(10'u16, sndId, 10'u16, 10'u32, @[1'u8], 0)

  await router.processIncomingBytes(encodePacket(stray), testSender2)

  let (reply, destination) = await sentPackets.get()
  check reply.header.pType == ST_RESET
  check reply.header.connectionId == sndId
  check destination == testSender2
|
|
|
|
asyncTest "Router close incoming connection which receives reset":
  let accepted = newAsyncQueue[UtpSocket[int]]()
  let router = UtpRouter[int].new(
    registerIncomingSocketCallback(accepted),
    SocketConfig.init(),
    rng
  )
  router.sendCb = testSend
  let recvId = 10'u16

  await router.processIncomingBytes(
    encodePacket(synPacket(10, recvId, 10)), testSender)

  check router.len() == 1

  # reset carrying the same id as the syn, from the same sender
  let rst = resetPacket(10, recvId, 10)
  await router.processIncomingBytes(encodePacket(rst), testSender)

  await waitUntil(proc (): bool = router.len() == 0)

  check router.len() == 0
|
|
|
|
asyncTest "Router close outgoing connection which receives reset":
  let accepted = newAsyncQueue[UtpSocket[int]]()
  let sentPackets = newAsyncQueue[(Packet, int)]()
  let router = UtpRouter[int].new(
    registerIncomingSocketCallback(accepted),
    SocketConfig.init(),
    rng
  )
  router.sendCb = initTestSnd(sentPackets)

  let (_, initialSyn) = router.connectOutgoing(testSender2, sentPackets, 30'u16)

  check router.len() == 1

  # remote side sendId is syn.header.connectionId + 1
  let rst = resetPacket(10, initialSyn.header.connectionId + 1, 10)
  await router.processIncomingBytes(encodePacket(rst), testSender2)

  await waitUntil(proc (): bool = router.len() == 0)

  check router.len() == 0
|