mirror of
https://github.com/status-im/nimbus-eth1.git
synced 2025-01-13 05:44:40 +00:00
parent
296bf3156d
commit
a8fdf8ec36
50
fluffy/tools/utp_testing/utp_rpc_types.nim
Normal file
50
fluffy/tools/utp_testing/utp_rpc_types.nim
Normal file
@ -0,0 +1,50 @@
|
|||||||
|
# Copyright (c) 2022 Status Research & Development GmbH
|
||||||
|
# Licensed and distributed under either of
|
||||||
|
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
||||||
|
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
||||||
|
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
||||||
|
|
||||||
|
{.push raises: [Defect].}
|
||||||
|
|
||||||
|
|
||||||
|
import
|
||||||
|
std/hashes,
|
||||||
|
json_rpc/jsonmarshal,
|
||||||
|
stew/byteutils,
|
||||||
|
eth/p2p/discoveryv5/node,
|
||||||
|
eth/utp/[utp_discv5_protocol, utp_router]
|
||||||
|
|
||||||
|
export jsonmarshal
|
||||||
|
|
||||||
|
type SKey* = object
|
||||||
|
id*: uint16
|
||||||
|
nodeId*: NodeId
|
||||||
|
|
||||||
|
proc `%`*(value: SKey): JsonNode =
|
||||||
|
let hex = value.nodeId.toBytesBE().toHex()
|
||||||
|
let numId = value.id.toBytesBE().toHex()
|
||||||
|
let finalStr = hex & numId
|
||||||
|
newJString(finalStr)
|
||||||
|
|
||||||
|
proc fromJson*(n: JsonNode, argName: string, result: var SKey)
|
||||||
|
{.raises: [Defect, ValueError].} =
|
||||||
|
n.kind.expect(JString, argName)
|
||||||
|
let str = n.getStr()
|
||||||
|
let strLen = len(str)
|
||||||
|
if (strLen >= 64):
|
||||||
|
let nodeIdStr = str.substr(0, 63)
|
||||||
|
let connIdStr = str.substr(64)
|
||||||
|
let nodeId = NodeId.fromHex(nodeIdStr)
|
||||||
|
let connId = uint16.fromBytesBE(connIdStr.hexToSeqByte())
|
||||||
|
result = SKey(nodeId: nodeId, id: connId)
|
||||||
|
else:
|
||||||
|
raise newException(ValueError, "Too short string")
|
||||||
|
|
||||||
|
proc hash*(x: SKey): Hash =
|
||||||
|
var h = 0
|
||||||
|
h = h !& x.id.hash
|
||||||
|
h = h !& x.nodeId.hash
|
||||||
|
!$h
|
||||||
|
|
||||||
|
func toSKey*(k: UtpSocketKey[NodeAddress]): SKey =
|
||||||
|
SKey(id: k.rcvId, nodeId: k.remoteAddress.nodeId)
|
@ -6,9 +6,11 @@
|
|||||||
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
||||||
|
|
||||||
import
|
import
|
||||||
|
std/[options, sequtils, sugar],
|
||||||
unittest2, testutils, chronos,
|
unittest2, testutils, chronos,
|
||||||
json_rpc/rpcclient, stew/byteutils,
|
json_rpc/rpcclient, stew/byteutils,
|
||||||
eth/keys
|
eth/keys,
|
||||||
|
./utp_test_client
|
||||||
|
|
||||||
proc generateByteSeq(rng: var BrHmacDrbgContext, length: int): seq[byte] =
|
proc generateByteSeq(rng: var BrHmacDrbgContext, length: int): seq[byte] =
|
||||||
var bytes = newSeq[byte](length)
|
var bytes = newSeq[byte](length)
|
||||||
@ -34,47 +36,61 @@ procSuite "Utp integration tests":
|
|||||||
let serverContainerAddress = "127.0.0.1"
|
let serverContainerAddress = "127.0.0.1"
|
||||||
let serverContainerPort = Port(9041)
|
let serverContainerPort = Port(9041)
|
||||||
|
|
||||||
# to avoid influencing uTP tests by discv5 sessions negotiation, at least one ping
|
type FutureCallback[A] = proc (): Future[A] {.gcsafe, raises: [Defect].}
|
||||||
# should be successful
|
|
||||||
proc pingTillSuccess(client: RpcHttpClient, enr: JsonNode): Future[void] {.async.}=
|
# combinator which repeatadly calls passed closure until returned future is
|
||||||
var failed = true
|
# successfull
|
||||||
while failed:
|
proc repeatTillSuccess[A](f: FutureCallback[A]): Future[A] {.async.}=
|
||||||
let pingRes = (await client.call("ping", %[enr])).getBool()
|
while true:
|
||||||
if pingRes:
|
try:
|
||||||
failed = false
|
let res = await f()
|
||||||
|
return res
|
||||||
|
except CatchableError:
|
||||||
|
continue
|
||||||
|
except CancelledError as canc:
|
||||||
|
raise canc
|
||||||
|
|
||||||
|
proc findServerConnection(
|
||||||
|
connections: openArray[SKey],
|
||||||
|
clientId: NodeId,
|
||||||
|
clientConnectionId: uint16): Option[Skey] =
|
||||||
|
let conns: seq[SKey] =
|
||||||
|
connections.filter((key:Skey) => key.id == (clientConnectionId + 1) and key.nodeId == clientId)
|
||||||
|
if len(conns) == 0:
|
||||||
|
none[Skey]()
|
||||||
|
else:
|
||||||
|
some[Skey](conns[0])
|
||||||
|
|
||||||
# TODO add more scenarios
|
# TODO add more scenarios
|
||||||
asyncTest "Transfer 5000B of data over utp stream":
|
asyncTest "Transfer 100k bytes of data over utp stream":
|
||||||
let client = newRpcHttpClient()
|
let client = newRpcHttpClient()
|
||||||
let server = newRpcHttpClient()
|
let server = newRpcHttpClient()
|
||||||
|
let numOfBytes = 100000
|
||||||
|
|
||||||
await client.connect(clientContainerAddress, clientContainerPort, false)
|
await client.connect(clientContainerAddress, clientContainerPort, false)
|
||||||
await server.connect(serverContainerAddress, serverContainerPort, false)
|
await server.connect(serverContainerAddress, serverContainerPort, false)
|
||||||
|
|
||||||
# TODO add file to generate nice api calls
|
let clientInfo = await client.discv5_nodeInfo()
|
||||||
let clientEnr = await client.call("get_record", %[])
|
let serverInfo = await server.discv5_nodeInfo()
|
||||||
let serverEnr = await server.call("get_record", %[])
|
|
||||||
|
|
||||||
let serverAddRes = await server.call("add_record", %[clientEnr])
|
# nodes need to have established session before the utp try
|
||||||
|
discard await repeatTillSuccess(() => client.discv5_ping(serverInfo.nodeEnr))
|
||||||
# we need to have successfull ping to have proper session on both sides, otherwise
|
|
||||||
# whoareyou packet exchange may influence testing of utp
|
|
||||||
await client.pingTillSuccess(serverEnr)
|
|
||||||
|
|
||||||
let connectRes = await client.call("connect", %[serverEnr])
|
|
||||||
|
|
||||||
let srvConns = (await server.call("get_connections", %[])).getElems()
|
|
||||||
|
|
||||||
check:
|
|
||||||
len(srvConns) == 1
|
|
||||||
|
|
||||||
let
|
let
|
||||||
clientKey = srvConns[0]
|
clientConnectionKey = await repeatTillSuccess(() => client.utp_connect(serverInfo.nodeEnr))
|
||||||
numBytes = 5000
|
serverConnections = await repeatTillSuccess(() => server.utp_get_connections())
|
||||||
bytes = generateByteSeqHex(rng[], numBytes)
|
maybeServerConnectionKey = serverConnections.findServerConnection(clientInfo.nodeId, clientConnectionKey.id)
|
||||||
writeRes = await client.call("write", %[connectRes, %bytes])
|
|
||||||
readRes = await server.call("read", %[clientKey, %numBytes])
|
|
||||||
bytesReceived = readRes.getStr()
|
|
||||||
|
|
||||||
check:
|
check:
|
||||||
bytes == bytesReceived
|
maybeServerConnectionKey.isSome()
|
||||||
|
|
||||||
|
let serverConnectionKey = maybeServerConnectionKey.unsafeGet()
|
||||||
|
|
||||||
|
let
|
||||||
|
bytesToWrite = generateByteSeqHex(rng[], numOfBytes)
|
||||||
|
writeRes = await client.utp_write(clientConnectionKey, bytesToWrite)
|
||||||
|
readData = await server.utp_read(serverConnectionKey, numOfBytes)
|
||||||
|
|
||||||
|
check:
|
||||||
|
writeRes == true
|
||||||
|
readData == bytesToWrite
|
||||||
|
@ -16,7 +16,8 @@ import
|
|||||||
eth/p2p/discoveryv5/protocol,
|
eth/p2p/discoveryv5/protocol,
|
||||||
eth/p2p/discoveryv5/enr,
|
eth/p2p/discoveryv5/enr,
|
||||||
eth/utp/[utp_discv5_protocol, utp_router],
|
eth/utp/[utp_discv5_protocol, utp_router],
|
||||||
eth/keys
|
eth/keys,
|
||||||
|
../../rpc/rpc_discovery_api
|
||||||
|
|
||||||
const
|
const
|
||||||
defaultListenAddress* = (static ValidIpAddress.init("127.0.0.1"))
|
defaultListenAddress* = (static ValidIpAddress.init("127.0.0.1"))
|
||||||
@ -86,36 +87,13 @@ proc hash(x: SKey): Hash =
|
|||||||
func toSKey(k: UtpSocketKey[NodeAddress]): SKey =
|
func toSKey(k: UtpSocketKey[NodeAddress]): SKey =
|
||||||
SKey(id: k.rcvId, nodeId: k.remoteAddress.nodeId)
|
SKey(id: k.rcvId, nodeId: k.remoteAddress.nodeId)
|
||||||
|
|
||||||
proc installHandlers(
|
proc installUtpHandlers(
|
||||||
srv: RpcHttpServer,
|
srv: RpcHttpServer,
|
||||||
d: protocol.Protocol,
|
d: protocol.Protocol,
|
||||||
s: UtpDiscv5Protocol,
|
s: UtpDiscv5Protocol,
|
||||||
t: ref Table[SKey, UtpSocket[NodeAddress]]) {.raises: [Defect, CatchableError].} =
|
t: ref Table[SKey, UtpSocket[NodeAddress]]) {.raises: [Defect, CatchableError].} =
|
||||||
|
|
||||||
srv.rpc("get_record") do() -> enr.Record:
|
srv.rpc("utp_connect") do(r: enr.Record) -> SKey:
|
||||||
return d.getRecord()
|
|
||||||
|
|
||||||
srv.rpc("add_record") do(r: enr.Record) -> bool:
|
|
||||||
let node = newNode(r)
|
|
||||||
|
|
||||||
if node.isOk():
|
|
||||||
return d.addNode(node.get())
|
|
||||||
else:
|
|
||||||
raise newException(ValueError, "Bad enr")
|
|
||||||
|
|
||||||
srv.rpc("ping") do(r: enr.Record) -> bool:
|
|
||||||
let nodeRes = newNode(r)
|
|
||||||
|
|
||||||
if nodeRes.isOk():
|
|
||||||
let node = nodeRes.get()
|
|
||||||
discard d.addNode(nodeRes.get())
|
|
||||||
let pingRes = await d.ping(node)
|
|
||||||
|
|
||||||
return pingRes.isOk()
|
|
||||||
else:
|
|
||||||
raise newException(ValueError, "Bad enr")
|
|
||||||
|
|
||||||
srv.rpc("connect") do(r: enr.Record) -> SKey:
|
|
||||||
let
|
let
|
||||||
nodeRes = newNode(r)
|
nodeRes = newNode(r)
|
||||||
|
|
||||||
@ -134,10 +112,11 @@ proc installHandlers(
|
|||||||
else:
|
else:
|
||||||
raise newException(ValueError, "Bad enr")
|
raise newException(ValueError, "Bad enr")
|
||||||
|
|
||||||
srv.rpc("write") do(k: SKey, b: string) -> bool:
|
srv.rpc("utp_write") do(k: SKey, b: string) -> bool:
|
||||||
let sock = t.getOrDefault(k)
|
let sock = t.getOrDefault(k)
|
||||||
let bytes = hexToSeqByte(b)
|
let bytes = hexToSeqByte(b)
|
||||||
if sock != nil:
|
if sock != nil:
|
||||||
|
# TODO consider doing it async to avoid json-rpc timeouts in case of large writes
|
||||||
let res = await sock.write(bytes)
|
let res = await sock.write(bytes)
|
||||||
if res.isOk():
|
if res.isOk():
|
||||||
return true
|
return true
|
||||||
@ -147,7 +126,7 @@ proc installHandlers(
|
|||||||
else:
|
else:
|
||||||
raise newException(ValueError, "Socket with provided key is missing")
|
raise newException(ValueError, "Socket with provided key is missing")
|
||||||
|
|
||||||
srv.rpc("get_connections") do() -> seq[SKey]:
|
srv.rpc("utp_get_connections") do() -> seq[SKey]:
|
||||||
var keys = newSeq[SKey]()
|
var keys = newSeq[SKey]()
|
||||||
|
|
||||||
for k in t.keys:
|
for k in t.keys:
|
||||||
@ -155,7 +134,7 @@ proc installHandlers(
|
|||||||
|
|
||||||
return keys
|
return keys
|
||||||
|
|
||||||
srv.rpc("read") do(k: SKey, n: int) -> string:
|
srv.rpc("utp_read") do(k: SKey, n: int) -> string:
|
||||||
let sock = t.getOrDefault(k)
|
let sock = t.getOrDefault(k)
|
||||||
if sock != nil:
|
if sock != nil:
|
||||||
let res = await sock.read(n)
|
let res = await sock.read(n)
|
||||||
@ -164,7 +143,7 @@ proc installHandlers(
|
|||||||
else:
|
else:
|
||||||
raise newException(ValueError, "Socket with provided key is missing")
|
raise newException(ValueError, "Socket with provided key is missing")
|
||||||
|
|
||||||
srv.rpc("close") do(k: SKey) -> bool:
|
srv.rpc("utp_close") do(k: SKey) -> bool:
|
||||||
let sock = t.getOrDefault(k)
|
let sock = t.getOrDefault(k)
|
||||||
if sock != nil:
|
if sock != nil:
|
||||||
await sock.closeWait()
|
await sock.closeWait()
|
||||||
@ -214,6 +193,9 @@ when isMainModule:
|
|||||||
cfg = SocketConfig.init(incomingSocketReceiveTimeout = none[Duration]())
|
cfg = SocketConfig.init(incomingSocketReceiveTimeout = none[Duration]())
|
||||||
utp = UtpDiscv5Protocol.new(d, protName, buildAcceptConnection(table), socketConfig = cfg)
|
utp = UtpDiscv5Protocol.new(d, protName, buildAcceptConnection(table), socketConfig = cfg)
|
||||||
|
|
||||||
srv.installHandlers(d, utp, table)
|
# needed for some of the discovery api: nodeInfo, setEnr, ping
|
||||||
|
srv.installDiscoveryApiHandlers(d)
|
||||||
|
|
||||||
|
srv.installUtpHandlers(d, utp, table)
|
||||||
|
|
||||||
runForever()
|
runForever()
|
||||||
|
17
fluffy/tools/utp_testing/utp_test_client.nim
Normal file
17
fluffy/tools/utp_testing/utp_test_client.nim
Normal file
@ -0,0 +1,17 @@
|
|||||||
|
# Nimbus
|
||||||
|
# Copyright (c) 2022 Status Research & Development GmbH
|
||||||
|
# Licensed and distributed under either of
|
||||||
|
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
||||||
|
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
||||||
|
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
||||||
|
|
||||||
|
import
|
||||||
|
std/os,
|
||||||
|
json_rpc/rpcclient,
|
||||||
|
./utp_rpc_types,
|
||||||
|
../../rpc/[rpc_types, rpc_discovery_api]
|
||||||
|
|
||||||
|
export utp_rpc_types, rpc_types
|
||||||
|
|
||||||
|
createRpcSigs(RpcClient, currentSourcePath.parentDir / "utp_test_rpc_calls.nim")
|
||||||
|
createRpcSigs(RpcClient, currentSourcePath.parentDir /../ "" /../ "rpc" / "rpc_calls" / "rpc_discovery_calls.nim")
|
6
fluffy/tools/utp_testing/utp_test_rpc_calls.nim
Normal file
6
fluffy/tools/utp_testing/utp_test_rpc_calls.nim
Normal file
@ -0,0 +1,6 @@
|
|||||||
|
|
||||||
|
proc utp_connect(enr: Record): SKey
|
||||||
|
proc utp_write(k: SKey, b: string): bool
|
||||||
|
proc utp_read(k: SKey, n: int): string
|
||||||
|
proc utp_get_connections(): seq[SKey]
|
||||||
|
proc utp_close(k: SKey): bool
|
Loading…
x
Reference in New Issue
Block a user