Add user data to utp (#525)

This commit is contained in:
KonradStaniec 2022-08-12 15:09:59 +02:00 committed by GitHub
parent 883825aad7
commit 2556b090ea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 153 additions and 14 deletions

View File

@ -89,6 +89,7 @@ proc new*(
p: protocol.Protocol, p: protocol.Protocol,
subProtocolName: seq[byte], subProtocolName: seq[byte],
acceptConnectionCb: AcceptConnectionCallback[NodeAddress], acceptConnectionCb: AcceptConnectionCallback[NodeAddress],
udata: pointer = nil,
allowConnectionCb: AllowConnectionCallback[NodeAddress] = nil, allowConnectionCb: AllowConnectionCallback[NodeAddress] = nil,
socketConfig: SocketConfig = SocketConfig.init()): UtpDiscv5Protocol = socketConfig: SocketConfig = SocketConfig.init()): UtpDiscv5Protocol =
doAssert(not(isNil(acceptConnectionCb))) doAssert(not(isNil(acceptConnectionCb)))
@ -96,6 +97,7 @@ proc new*(
let router = UtpRouter[NodeAddress].new( let router = UtpRouter[NodeAddress].new(
acceptConnectionCb, acceptConnectionCb,
allowConnectionCb, allowConnectionCb,
udata,
socketConfig, socketConfig,
p.rng p.rng
) )
@ -112,6 +114,24 @@ proc new*(
) )
prot prot
proc new*(
T: type UtpDiscv5Protocol,
p: protocol.Protocol,
subProtocolName: seq[byte],
acceptConnectionCb: AcceptConnectionCallback[NodeAddress],
udata: ref,
allowConnectionCb: AllowConnectionCallback[NodeAddress] = nil,
socketConfig: SocketConfig = SocketConfig.init()): UtpDiscv5Protocol =
GC_ref(udata)
UtpDiscv5Protocol.new(
p,
subProtocolName,
acceptConnectionCb,
cast[pointer](udata),
allowConnectionCb,
socketConfig
)
proc connectTo*(r: UtpDiscv5Protocol, address: NodeAddress): proc connectTo*(r: UtpDiscv5Protocol, address: NodeAddress):
Future[ConnectionResult[NodeAddress]] = Future[ConnectionResult[NodeAddress]] =
return r.router.connectTo(address) return r.router.connectTo(address)

View File

@ -75,19 +75,21 @@ proc initSendCallback(t: DatagramTransport): SendCallback[TransportAddress] =
) )
proc new*( proc new*(
T: type UtpProtocol, T: type UtpProtocol,
acceptConnectionCb: AcceptConnectionCallback[TransportAddress], acceptConnectionCb: AcceptConnectionCallback[TransportAddress],
address: TransportAddress, address: TransportAddress,
socketConfig: SocketConfig = SocketConfig.init(), udata: pointer = nil,
allowConnectionCb: AllowConnectionCallback[TransportAddress] = nil, socketConfig: SocketConfig = SocketConfig.init(),
sendCallbackBuilder: SendCallbackBuilder = nil, allowConnectionCb: AllowConnectionCallback[TransportAddress] = nil,
rng = newRng()): UtpProtocol {.raises: [Defect, CatchableError].} = sendCallbackBuilder: SendCallbackBuilder = nil,
rng = newRng()): UtpProtocol {.raises: [Defect, CatchableError].} =
doAssert(not(isNil(acceptConnectionCb))) doAssert(not(isNil(acceptConnectionCb)))
let router = UtpRouter[TransportAddress].new( let router = UtpRouter[TransportAddress].new(
acceptConnectionCb, acceptConnectionCb,
allowConnectionCb, allowConnectionCb,
udata,
socketConfig, socketConfig,
rng rng
) )
@ -101,6 +103,26 @@ proc new*(
UtpProtocol(transport: ta, utpRouter: router) UtpProtocol(transport: ta, utpRouter: router)
proc new*(
T: type UtpProtocol,
acceptConnectionCb: AcceptConnectionCallback[TransportAddress],
address: TransportAddress,
udata: ref,
socketConfig: SocketConfig = SocketConfig.init(),
allowConnectionCb: AllowConnectionCallback[TransportAddress] = nil,
sendCallbackBuilder: SendCallbackBuilder = nil,
rng = newRng()): UtpProtocol {.raises: [Defect, CatchableError].} =
GC_ref(udata)
UtpProtocol.new(
acceptConnectionCb,
address,
cast[pointer](udata),
socketConfig,
allowConnectionCb,
sendCallbackBuilder,
rng
)
proc shutdownWait*(p: UtpProtocol): Future[void] {.async.} = proc shutdownWait*(p: UtpProtocol): Future[void] {.async.} =
## closes all managed utp sockets and then underlying transport ## closes all managed utp sockets and then underlying transport
await p.utpRouter.shutdownWait() await p.utpRouter.shutdownWait()

View File

@ -56,6 +56,7 @@ type
closed: bool closed: bool
sendCb*: SendCallback[A] sendCb*: SendCallback[A]
allowConnection*: AllowConnectionCallback[A] allowConnection*: AllowConnectionCallback[A]
udata: pointer
rng*: ref HmacDrbgContext rng*: ref HmacDrbgContext
const const
@ -114,6 +115,7 @@ proc new*[A](
T: type UtpRouter[A], T: type UtpRouter[A],
acceptConnectionCb: AcceptConnectionCallback[A], acceptConnectionCb: AcceptConnectionCallback[A],
allowConnectionCb: AllowConnectionCallback[A], allowConnectionCb: AllowConnectionCallback[A],
udata: pointer,
socketConfig: SocketConfig = SocketConfig.init(), socketConfig: SocketConfig = SocketConfig.init(),
rng = newRng()): UtpRouter[A] = rng = newRng()): UtpRouter[A] =
doAssert(not(isNil(acceptConnectionCb))) doAssert(not(isNil(acceptConnectionCb)))
@ -122,6 +124,7 @@ proc new*[A](
acceptConnection: acceptConnectionCb, acceptConnection: acceptConnectionCb,
allowConnection: allowConnectionCb, allowConnection: allowConnectionCb,
socketConfig: socketConfig, socketConfig: socketConfig,
udata: udata,
rng: rng rng: rng
) )
@ -130,7 +133,30 @@ proc new*[A](
acceptConnectionCb: AcceptConnectionCallback[A], acceptConnectionCb: AcceptConnectionCallback[A],
socketConfig: SocketConfig = SocketConfig.init(), socketConfig: SocketConfig = SocketConfig.init(),
rng = newRng()): UtpRouter[A] = rng = newRng()): UtpRouter[A] =
UtpRouter[A].new(acceptConnectionCb, nil, socketConfig, rng) UtpRouter[A].new(acceptConnectionCb, nil, nil, socketConfig, rng)
proc new*[A](
T: type UtpRouter[A],
acceptConnectionCb: AcceptConnectionCallback[A],
allowConnectionCb: AllowConnectionCallback[A],
udata: ref,
socketConfig: SocketConfig = SocketConfig.init(),
rng = newRng()): UtpRouter[A] =
doAssert(not(isNil(acceptConnectionCb)))
GC_ref(udata)
UtpRouter[A].new(acceptConnectionCb, allowConnectionCb, cast[pointer](udata), socketConfig, rng)
proc new*[A](
T: type UtpRouter[A],
acceptConnectionCb: AcceptConnectionCallback[A],
udata: ref,
socketConfig: SocketConfig = SocketConfig.init(),
rng = newRng()): UtpRouter[A] =
UtpRouter[A].new(acceptConnectionCb, nil, udata, socketConfig, rng)
proc getUserData*[A, T](router: UtpRouter[A]): T =
## Obtain user data stored in ``router`` object.
cast[T](router.udata)
# There are different possibilities on how the connection got established, need # There are different possibilities on how the connection got established, need
# to check every case. # to check every case.

View File

@ -15,6 +15,7 @@ import
../../eth/p2p/discoveryv5/protocol as discv5_protocol, ../../eth/p2p/discoveryv5/protocol as discv5_protocol,
../../eth/utp/utp_discv5_protocol, ../../eth/utp/utp_discv5_protocol,
../../eth/keys, ../../eth/keys,
../../eth/utp/utp_router as rt,
../p2p/discv5_test_helper ../p2p/discv5_test_helper
procSuite "Utp protocol over discovery v5 tests": procSuite "Utp protocol over discovery v5 tests":
@ -65,6 +66,41 @@ procSuite "Utp protocol over discovery v5 tests":
await node1.closeWait() await node1.closeWait()
await node2.closeWait() await node2.closeWait()
proc cbUserData(server: UtpRouter[NodeAddress], client: UtpSocket[NodeAddress]): Future[void] =
let queue = rt.getUserData[NodeAddress, AsyncQueue[UtpSocket[NodeAddress]]](server)
queue.addLast(client)
asyncTest "Provide user data pointer and use it in callback":
let
queue = newAsyncQueue[UtpSocket[NodeAddress]]()
node1 = initDiscoveryNode(
rng, PrivateKey.random(rng[]), localAddress(20302))
node2 = initDiscoveryNode(
rng, PrivateKey.random(rng[]), localAddress(20303))
# constructor which uses connection callback and user data pointer as ref
utp1 = UtpDiscv5Protocol.new(node1, utpProtId, cbUserData, queue)
utp2 = UtpDiscv5Protocol.new(node2, utpProtId, cbUserData, queue)
# nodes must have session between each other
check:
(await node1.ping(node2.localNode)).isOk()
let clientSocketResult = await utp1.connectTo(NodeAddress.init(node2.localNode).unsafeGet())
let clientSocket = clientSocketResult.get()
let serverSocket = await queue.get()
check:
clientSocket.isConnected()
# in this test we do not configure the socket to be connected just after
# accepting incoming connection
not serverSocket.isConnected()
await clientSocket.destroyWait()
await serverSocket.destroyWait()
await node1.closeWait()
await node2.closeWait()
asyncTest "Success write data over packet size to remote host": asyncTest "Success write data over packet size to remote host":
let let
queue = newAsyncQueue[UtpSocket[NodeAddress]]() queue = newAsyncQueue[UtpSocket[NodeAddress]]()
@ -122,6 +158,7 @@ procSuite "Utp protocol over discovery v5 tests":
node2, node2,
utpProtId, utpProtId,
registerIncomingSocketCallback(queue), registerIncomingSocketCallback(queue),
nil,
allowOneIdCallback(allowedId), allowOneIdCallback(allowedId),
SocketConfig.init()) SocketConfig.init())

View File

@ -11,7 +11,7 @@ import
chronos, chronos,
testutils/unittests, testutils/unittests,
./test_utils, ./test_utils,
../../eth/utp/utp_router, ../../eth/utp/utp_router as rt,
../../eth/utp/utp_protocol, ../../eth/utp/utp_protocol,
../../eth/keys ../../eth/keys
@ -147,10 +147,43 @@ procSuite "Utp protocol over udp tests":
await utpProt1.shutdownWait() await utpProt1.shutdownWait()
await utpProt2.shutdownWait() await utpProt2.shutdownWait()
proc cbUserData(server: UtpRouter[TransportAddress], client: UtpSocket[TransportAddress]): Future[void] =
let q = rt.getUserData[TransportAddress, AsyncQueue[UtpSocket[TransportAddress]]](server)
q.addLast(client)
asyncTest "Provide user data pointer and use it in callback":
let incomingConnections = newAsyncQueue[UtpSocket[TransportAddress]]()
let address = initTAddress("127.0.0.1", 9079)
let utpProt1 = UtpProtocol.new(cbUserData, address, incomingConnections)
let address1 = initTAddress("127.0.0.1", 9080)
let utpProt2 = UtpProtocol.new(cbUserData, address1, incomingConnections)
let connResult = await utpProt1.connectTo(address1)
check:
connResult.isOk()
let clientSocket = connResult.get()
# this future will be completed when we called accepted connection callback
let serverSocket = await incomingConnections.get()
check:
clientSocket.isConnected()
# after successful connection outgoing buffer should be empty as syn packet
# should be correctly acked
clientSocket.numPacketsInOutGoingBuffer() == 0
not serverSocket.isConnected()
await utpProt1.shutdownWait()
await utpProt2.shutdownWait()
asyncTest "Fail to connect to offline remote host": asyncTest "Fail to connect to offline remote host":
let server1Called = newAsyncEvent() let server1Called = newAsyncEvent()
let address = initTAddress("127.0.0.1", 9079) let address = initTAddress("127.0.0.1", 9079)
let utpProt1 = UtpProtocol.new(setAcceptedCallback(server1Called), address , SocketConfig.init(milliseconds(200))) let utpProt1 = UtpProtocol.new(setAcceptedCallback(server1Called), address , nil, SocketConfig.init(milliseconds(200)))
let address1 = initTAddress("127.0.0.1", 9080) let address1 = initTAddress("127.0.0.1", 9080)
@ -174,7 +207,7 @@ procSuite "Utp protocol over udp tests":
asyncTest "Success connect to remote host which initialy was offline": asyncTest "Success connect to remote host which initialy was offline":
let server1Called = newAsyncEvent() let server1Called = newAsyncEvent()
let address = initTAddress("127.0.0.1", 9079) let address = initTAddress("127.0.0.1", 9079)
let utpProt1 = UtpProtocol.new(setAcceptedCallback(server1Called), address, SocketConfig.init(milliseconds(500))) let utpProt1 = UtpProtocol.new(setAcceptedCallback(server1Called), address, nil, SocketConfig.init(milliseconds(500)))
let address1 = initTAddress("127.0.0.1", 9080) let address1 = initTAddress("127.0.0.1", 9080)
@ -387,17 +420,18 @@ procSuite "Utp protocol over udp tests":
var server1Called = newAsyncEvent() var server1Called = newAsyncEvent()
let address1 = initTAddress("127.0.0.1", 9079) let address1 = initTAddress("127.0.0.1", 9079)
let utpProt1 = let utpProt1 =
UtpProtocol.new(setAcceptedCallback(server1Called), address1, SocketConfig.init(lowSynTimeout)) UtpProtocol.new(setAcceptedCallback(server1Called), address1, nil, SocketConfig.init(lowSynTimeout))
let address2 = initTAddress("127.0.0.1", 9080) let address2 = initTAddress("127.0.0.1", 9080)
let utpProt2 = let utpProt2 =
UtpProtocol.new(registerIncomingSocketCallback(serverSockets), address2, SocketConfig.init(lowSynTimeout)) UtpProtocol.new(registerIncomingSocketCallback(serverSockets), address2, nil, SocketConfig.init(lowSynTimeout))
let address3 = initTAddress("127.0.0.1", 9081) let address3 = initTAddress("127.0.0.1", 9081)
let utpProt3 = let utpProt3 =
UtpProtocol.new( UtpProtocol.new(
registerIncomingSocketCallback(serverSockets), registerIncomingSocketCallback(serverSockets),
address3, address3,
nil,
SocketConfig.init(), SocketConfig.init(),
allowOneIdCallback(allowedId) allowOneIdCallback(allowedId)
) )

View File

@ -87,7 +87,7 @@ procSuite "Utp router unit tests":
asyncTest "Router should create new incoming socket when receiving not known syn packet": asyncTest "Router should create new incoming socket when receiving not known syn packet":
let q = newAsyncQueue[UtpSocket[int]]() let q = newAsyncQueue[UtpSocket[int]]()
let router = UtpRouter[int].new(registerIncomingSocketCallback(q), SocketConfig.init(), rng) let router = UtpRouter[int].new(registerIncomingSocketCallback(q), nil, nil, SocketConfig.init(), rng)
router.sendCb = testSend router.sendCb = testSend
let encodedSyn = encodePacket(synPacket(10, 10, 10)) let encodedSyn = encodePacket(synPacket(10, 10, 10))