mirror of
https://github.com/status-im/nimbus-eth1.git
synced 2025-01-12 21:34:33 +00:00
parent
a20917a547
commit
20068de838
@ -6,7 +6,7 @@
|
|||||||
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[options, sequtils, sugar],
|
std/[options, sequtils, sugar, strutils],
|
||||||
unittest2, testutils, chronos,
|
unittest2, testutils, chronos,
|
||||||
json_rpc/rpcclient, stew/byteutils,
|
json_rpc/rpcclient, stew/byteutils,
|
||||||
eth/keys,
|
eth/keys,
|
||||||
@ -36,12 +36,12 @@ procSuite "Utp integration tests":
|
|||||||
let serverContainerAddress = "127.0.0.1"
|
let serverContainerAddress = "127.0.0.1"
|
||||||
let serverContainerPort = Port(9041)
|
let serverContainerPort = Port(9041)
|
||||||
|
|
||||||
type FutureCallback[A] = proc (): Future[A] {.gcsafe, raises: [Defect].}
|
type
|
||||||
|
FutureCallback[A] = proc (): Future[A] {.gcsafe, raises: [Defect].}
|
||||||
# combinator which repeatadly calls passed closure until returned future is
|
# combinator which repeatadly calls passed closure until returned future is
|
||||||
# successfull
|
# successfull
|
||||||
# TODO: currently works only for non void types
|
# TODO: currently works only for non void types
|
||||||
proc repeatTillSuccess[A](f: FutureCallback[A], maxTries: int = 10): Future[A] {.async.} =
|
proc repeatTillSuccess[A](f: FutureCallback[A], maxTries: int = 20): Future[A] {.async.} =
|
||||||
var i = 0
|
var i = 0
|
||||||
while true:
|
while true:
|
||||||
try:
|
try:
|
||||||
@ -69,11 +69,9 @@ procSuite "Utp integration tests":
|
|||||||
else:
|
else:
|
||||||
some[Skey](conns[0])
|
some[Skey](conns[0])
|
||||||
|
|
||||||
# TODO add more scenarios
|
proc setupTest(): Future[(RpcHttpClient, NodeInfo, RpcHttpClient, NodeInfo)] {.async.} =
|
||||||
asyncTest "Transfer 100k bytes of data over utp stream":
|
|
||||||
let client = newRpcHttpClient()
|
let client = newRpcHttpClient()
|
||||||
let server = newRpcHttpClient()
|
let server = newRpcHttpClient()
|
||||||
let numOfBytes = 100000
|
|
||||||
|
|
||||||
await client.connect(clientContainerAddress, clientContainerPort, false)
|
await client.connect(clientContainerAddress, clientContainerPort, false)
|
||||||
await server.connect(serverContainerAddress, serverContainerPort, false)
|
await server.connect(serverContainerAddress, serverContainerPort, false)
|
||||||
@ -85,7 +83,12 @@ procSuite "Utp integration tests":
|
|||||||
|
|
||||||
# nodes need to have established session before the utp try
|
# nodes need to have established session before the utp try
|
||||||
discard await repeatTillSuccess(() => client.discv5_ping(serverInfo.nodeEnr))
|
discard await repeatTillSuccess(() => client.discv5_ping(serverInfo.nodeEnr))
|
||||||
|
|
||||||
|
return (client, clientInfo, server, serverInfo)
|
||||||
|
|
||||||
|
asyncTest "Transfer 100k bytes of data over utp stream from client to server":
|
||||||
|
let (client, clientInfo, server, serverInfo) = await setupTest()
|
||||||
|
let numOfBytes = 100000
|
||||||
let
|
let
|
||||||
clientConnectionKey = await repeatTillSuccess(() => client.utp_connect(serverInfo.nodeEnr))
|
clientConnectionKey = await repeatTillSuccess(() => client.utp_connect(serverInfo.nodeEnr))
|
||||||
serverConnections = await repeatTillSuccess(() => server.utp_get_connections())
|
serverConnections = await repeatTillSuccess(() => server.utp_get_connections())
|
||||||
@ -104,3 +107,103 @@ procSuite "Utp integration tests":
|
|||||||
check:
|
check:
|
||||||
writeRes == true
|
writeRes == true
|
||||||
readData == bytesToWrite
|
readData == bytesToWrite
|
||||||
|
|
||||||
|
asyncTest "Transfer 100k bytes of data over utp stream from server to client":
|
||||||
|
# In classic uTP this would not be possible, as when uTP works over udp
|
||||||
|
# client needs to transfer first, but when working over discv5 it should be possible
|
||||||
|
# to transfer data from server to client from the start
|
||||||
|
let (client, clientInfo, server, serverInfo) = await setupTest()
|
||||||
|
let numOfBytes = 100000
|
||||||
|
let
|
||||||
|
clientConnectionKey = await repeatTillSuccess(() => client.utp_connect(serverInfo.nodeEnr))
|
||||||
|
serverConnections = await repeatTillSuccess(() => server.utp_get_connections())
|
||||||
|
maybeServerConnectionKey = serverConnections.findServerConnection(clientInfo.nodeId, clientConnectionKey.id)
|
||||||
|
|
||||||
|
check:
|
||||||
|
maybeServerConnectionKey.isSome()
|
||||||
|
|
||||||
|
let serverConnectionKey = maybeServerConnectionKey.unsafeGet()
|
||||||
|
|
||||||
|
let
|
||||||
|
bytesToWrite = generateByteSeqHex(rng[], numOfBytes)
|
||||||
|
writeRes = await server.utp_write(serverConnectionKey, bytesToWrite)
|
||||||
|
readData = await client.utp_read(clientConnectionKey, numOfBytes)
|
||||||
|
|
||||||
|
check:
|
||||||
|
writeRes == true
|
||||||
|
readData == bytesToWrite
|
||||||
|
|
||||||
|
asyncTest "Multiple 10k bytes transfers over utp stream":
|
||||||
|
let (client, clientInfo, server, serverInfo) = await setupTest()
|
||||||
|
let numOfBytes = 10000
|
||||||
|
let
|
||||||
|
clientConnectionKey = await repeatTillSuccess(() => client.utp_connect(serverInfo.nodeEnr))
|
||||||
|
serverConnections = await repeatTillSuccess(() => server.utp_get_connections())
|
||||||
|
maybeServerConnectionKey = serverConnections.findServerConnection(clientInfo.nodeId, clientConnectionKey.id)
|
||||||
|
|
||||||
|
check:
|
||||||
|
maybeServerConnectionKey.isSome()
|
||||||
|
|
||||||
|
let serverConnectionKey = maybeServerConnectionKey.unsafeGet()
|
||||||
|
|
||||||
|
let
|
||||||
|
bytesToWrite = generateByteSeqHex(rng[], numOfBytes)
|
||||||
|
bytesToWrite1 = generateByteSeqHex(rng[], numOfBytes)
|
||||||
|
bytesToWrite2 = generateByteSeqHex(rng[], numOfBytes)
|
||||||
|
writeRes = await client.utp_write(clientConnectionKey, bytesToWrite)
|
||||||
|
writeRes1 = await client.utp_write(clientConnectionKey, bytesToWrite1)
|
||||||
|
writeRes2 = await client.utp_write(clientConnectionKey, bytesToWrite2)
|
||||||
|
readData = await server.utp_read(serverConnectionKey, numOfBytes * 3)
|
||||||
|
|
||||||
|
let writtenData = join(@[bytesToWrite, bytesToWrite1, bytesToWrite2])
|
||||||
|
|
||||||
|
check:
|
||||||
|
writeRes == true
|
||||||
|
writeRes1 == true
|
||||||
|
writeRes2 == true
|
||||||
|
readData == writtenData
|
||||||
|
|
||||||
|
asyncTest "Handle mulitplie sockets over one utp server instance ":
|
||||||
|
let (client, clientInfo, server, serverInfo) = await setupTest()
|
||||||
|
let numOfBytes = 10000
|
||||||
|
let
|
||||||
|
clientConnectionKey1 = await repeatTillSuccess(() => client.utp_connect(serverInfo.nodeEnr))
|
||||||
|
clientConnectionKey2 = await repeatTillSuccess(() => client.utp_connect(serverInfo.nodeEnr))
|
||||||
|
clientConnectionKey3 = await repeatTillSuccess(() => client.utp_connect(serverInfo.nodeEnr))
|
||||||
|
serverConnections = await repeatTillSuccess(() => server.utp_get_connections())
|
||||||
|
|
||||||
|
maybeServerConnectionKey1 = serverConnections.findServerConnection(clientInfo.nodeId, clientConnectionKey1.id)
|
||||||
|
maybeServerConnectionKey2 = serverConnections.findServerConnection(clientInfo.nodeId, clientConnectionKey2.id)
|
||||||
|
maybeServerConnectionKey3 = serverConnections.findServerConnection(clientInfo.nodeId, clientConnectionKey3.id)
|
||||||
|
|
||||||
|
check:
|
||||||
|
maybeServerConnectionKey1.isSome()
|
||||||
|
maybeServerConnectionKey2.isSome()
|
||||||
|
maybeServerConnectionKey3.isSome()
|
||||||
|
|
||||||
|
let serverConnectionKey1 = maybeServerConnectionKey1.unsafeGet()
|
||||||
|
let serverConnectionKey2 = maybeServerConnectionKey2.unsafeGet()
|
||||||
|
let serverConnectionKey3 = maybeServerConnectionKey3.unsafeGet()
|
||||||
|
|
||||||
|
let
|
||||||
|
bytesToWrite1 = generateByteSeqHex(rng[], numOfBytes)
|
||||||
|
bytesToWrite2 = generateByteSeqHex(rng[], numOfBytes)
|
||||||
|
bytesToWrite3 = generateByteSeqHex(rng[], numOfBytes)
|
||||||
|
|
||||||
|
writeRes1 = await client.utp_write(clientConnectionKey1, bytesToWrite1)
|
||||||
|
writeRes2 = await client.utp_write(clientConnectionKey2, bytesToWrite2)
|
||||||
|
writeRes3 = await client.utp_write(clientConnectionKey3, bytesToWrite3)
|
||||||
|
|
||||||
|
readData1 = await server.utp_read(serverConnectionKey1, numOfBytes)
|
||||||
|
readData2 = await server.utp_read(serverConnectionKey2, numOfBytes)
|
||||||
|
readData3 = await server.utp_read(serverConnectionKey3, numOfBytes)
|
||||||
|
|
||||||
|
check:
|
||||||
|
writeRes1 == true
|
||||||
|
writeRes2 == true
|
||||||
|
writeRes3 == true
|
||||||
|
|
||||||
|
# all data was delivered to correct sockets
|
||||||
|
readData1 == bytesToWrite1
|
||||||
|
readData2 == bytesToWrite2
|
||||||
|
readData3 == bytesToWrite3
|
||||||
|
Loading…
x
Reference in New Issue
Block a user