nimbus-eth1/fluffy/tools/utp_testing/utp_test_app.nim

202 lines
5.7 KiB
Nim

# Copyright (c) 2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [Defect].}
import
std/[hashes, tables],
chronos, chronicles, confutils,
confutils/std/net as confNet,
stew/byteutils,
stew/shims/net,
json_rpc/servers/httpserver,
eth/p2p/discoveryv5/protocol,
eth/p2p/discoveryv5/enr,
eth/utp/[utp_discv5_protocol, utp_router],
eth/keys,
../../rpc/rpc_discovery_api
const
defaultListenAddress* = (static ValidIpAddress.init("127.0.0.1"))
type AppConf* = object
rpcPort* {.
defaultValue: 7041
desc: "Json rpc port"
name: "rpc-port" .}: Port
udpPort* {.
defaultValue: 7042
desc: "UDP listening port"
name: "udp-port" .}: Port
udpListenAddress* {.
defaultValue: defaultListenAddress
desc: "UDP listening address"
name: "udp-listen-address" .}: ValidIpAddress
rpcListenAddress* {.
defaultValue: defaultListenAddress
desc: "RPC listening address"
name: "rpc-listen-address" .}: ValidIpAddress
proc `%`*(value: enr.Record): JsonNode =
newJString(value.toURI())
proc fromJson*(n: JsonNode, argName: string, result: var Record)
{.raises: [Defect, ValueError].} =
n.kind.expect(JString, argName)
echo "ENr looks " & n.getStr()
if not fromURI(result, n.getStr()):
raise newException(ValueError, "Invalid ENR")
type SKey = object
id: uint16
nodeId: NodeId
proc `%`*(value: SKey): JsonNode =
let hex = value.nodeId.toBytesBE().toHex()
let numId = value.id.toBytesBE().toHex()
let finalStr = hex & numId
newJString(finalStr)
proc fromJson*(n: JsonNode, argName: string, result: var SKey)
{.raises: [Defect, ValueError].} =
n.kind.expect(JString, argName)
let str = n.getStr()
let strLen = len(str)
if (strLen >= 64):
let nodeIdStr = str.substr(0, 63)
let connIdStr = str.substr(64)
let nodeId = NodeId.fromHex(nodeIdStr)
let connId = uint16.fromBytesBE(connIdStr.hexToSeqByte())
result = SKey(nodeId: nodeId, id: connId)
else:
raise newException(ValueError, "Too short string")
proc hash(x: SKey): Hash =
var h = 0
h = h !& x.id.hash
h = h !& x.nodeId.hash
!$h
func toSKey(k: UtpSocketKey[NodeAddress]): SKey =
SKey(id: k.rcvId, nodeId: k.remoteAddress.nodeId)
proc installUtpHandlers(
srv: RpcHttpServer,
d: protocol.Protocol,
s: UtpDiscv5Protocol,
t: ref Table[SKey, UtpSocket[NodeAddress]]) {.raises: [Defect, CatchableError].} =
srv.rpc("utp_connect") do(r: enr.Record) -> SKey:
let
nodeRes = newNode(r)
if nodeRes.isOk():
let node = nodeRes.get()
let nodeAddress = NodeAddress.init(node).unsafeGet()
discard d.addNode(node)
let connResult = await s.connectTo(nodeAddress)
if (connresult.isOk()):
let socket = connresult.get()
let sKey = socket.socketKey.toSKey()
t[sKey] = socket
return sKey
else:
raise newException(ValueError, "Connection to node Failed.")
else:
raise newException(ValueError, "Bad enr")
srv.rpc("utp_write") do(k: SKey, b: string) -> bool:
let sock = t.getOrDefault(k)
let bytes = hexToSeqByte(b)
if sock != nil:
# TODO consider doing it async to avoid json-rpc timeouts in case of large writes
let res = await sock.write(bytes)
if res.isOk():
return true
else:
# TODO return correct errors instead of just true/false
return false
else:
raise newException(ValueError, "Socket with provided key is missing")
srv.rpc("utp_get_connections") do() -> seq[SKey]:
var keys = newSeq[SKey]()
for k in t.keys:
keys.add(k)
return keys
srv.rpc("utp_read") do(k: SKey, n: int) -> string:
let sock = t.getOrDefault(k)
if sock != nil:
let res = await sock.read(n)
let asHex = res.toHex()
return asHex
else:
raise newException(ValueError, "Socket with provided key is missing")
srv.rpc("utp_close") do(k: SKey) -> bool:
let sock = t.getOrDefault(k)
if sock != nil:
await sock.closeWait()
return true
else:
raise newException(ValueError, "Socket with provided key is missing")
proc buildAcceptConnection(t: ref Table[SKey, UtpSocket[NodeAddress]]): AcceptConnectionCallback[NodeAddress] =
return (
proc (server: UtpRouter[NodeAddress], client: UtpSocket[NodeAddress]): Future[void] =
let fut = newFuture[void]()
let key = client.socketKey.toSKey()
t[key] = client
fut.complete()
return fut
)
when isMainModule:
var table = newTable[SKey, UtpSocket[NodeAddress]]()
let rng = newRng()
{.pop.}
let conf = AppConf.load()
{.push raises: [Defect].}
let
protName = "test-utp".toBytes()
la = initTAddress(conf.rpcListenAddress, conf.rpcPort)
key = PrivateKey.random(rng[])
discAddress = conf.udpListenAddress
srv = newRpcHttpServer(@[la], RpcRouter.init())
srv.start()
let d = newProtocol(
key,
some(discAddress), none(Port), some(conf.udpPort),
bootstrapRecords = @[],
bindIp = discAddress, bindPort = conf.udpPort,
enrAutoUpdate = true,
rng = rng)
d.open()
let
cfg = SocketConfig.init(incomingSocketReceiveTimeout = none[Duration]())
utp = UtpDiscv5Protocol.new(d, protName, buildAcceptConnection(table), socketConfig = cfg)
# needed for some of the discovery api: nodeInfo, setEnr, ping
srv.installDiscoveryApiHandlers(d)
srv.installUtpHandlers(d, utp, table)
runForever()