Add custom talkreq implementation (#464)

* Add custom talkreq implementation to utp over discv5
This commit is contained in:
KonradStaniec 2022-01-24 11:58:35 +01:00 committed by GitHub
parent 4e2b340af6
commit 9a7b1afe9b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 78 additions and 63 deletions

View File

@ -224,7 +224,7 @@ proc updateRecord*(
# TODO: Would it make sense to actively ping ("broadcast") to all the peers # TODO: Would it make sense to actively ping ("broadcast") to all the peers
# we stored a handshake with in order to get that ENR updated? # we stored a handshake with in order to get that ENR updated?
proc send(d: Protocol, a: Address, data: seq[byte]) = proc send*(d: Protocol, a: Address, data: seq[byte]) =
let ta = initTAddress(a.ip, a.port) let ta = initTAddress(a.ip, a.port)
let f = d.transp.sendTo(ta, data) let f = d.transp.sendTo(ta, data)
f.callback = proc(data: pointer) {.gcsafe.} = f.callback = proc(data: pointer) {.gcsafe.} =

View File

@ -7,9 +7,9 @@
{.push raises: [Defect].} {.push raises: [Defect].}
import import
std/[hashes], std/[hashes, sugar],
chronos, chronicles, chronos, chronicles,
../p2p/discoveryv5/protocol, ../p2p/discoveryv5/[protocol, messages, encoding],
./utp_router, ./utp_router,
../keys ../keys
@ -18,64 +18,82 @@ export utp_router, protocol, chronicles
logScope: logScope:
topics = "utp_discv5_protocol" topics = "utp_discv5_protocol"
type UtpDiscv5Protocol* = ref object of TalkProtocol type
prot: protocol.Protocol NodeAddress* = object
router: UtpRouter[Node] nodeId*: NodeId
address*: Address
proc hash(x: UtpSocketKey[Node]): Hash = UtpDiscv5Protocol* = ref object of TalkProtocol
prot: protocol.Protocol
router: UtpRouter[NodeAddress]
proc init*(T: type NodeAddress, nodeId: NodeId, address: Address): NodeAddress =
NodeAddress(nodeId: nodeId, address: address)
proc init*(T: type NodeAddress, node: Node): Option[NodeAddress] =
node.address.map((address: Address) => NodeAddress(nodeId: node.id, address: address))
proc hash(x: NodeAddress): Hash =
var h = 0
h = h !& x.nodeId.hash
h = h !& x.address.hash
!$h
proc hash(x: UtpSocketKey[NodeAddress]): Hash =
var h = 0 var h = 0
h = h !& x.remoteAddress.hash h = h !& x.remoteAddress.hash
h = h !& x.rcvId.hash h = h !& x.rcvId.hash
!$h !$h
func `$`*(x: UtpSocketKey[Node]): string = func `$`*(x: UtpSocketKey[NodeAddress]): string =
"(remoteId: " & $x.remoteAddress.id & "(remoteId: " & $x.remoteAddress.nodeId &
", remoteAddress: " & $x.remoteAddress.address & ", remoteAddress: " & $x.remoteAddress.address &
", rcvId: "& $x.rcvId & ", rcvId: "& $x.rcvId &
")" ")"
proc talkReqDirect(p: protocol.Protocol, n: NodeAddress, protocol, request: seq[byte]): Future[void] =
let
reqId = RequestId.init(p.rng[])
message = encodeMessage(TalkReqMessage(protocol: protocol, request: request), reqId)
(data, nonce) = encodeMessagePacket(p.rng[], p.codec, n.nodeId, n.address, message)
trace "Send message packet", dstId = n.nodeId, address = n.address, kind = MessageKind.talkreq
p.send(n.address, data)
proc initSendCallback( proc initSendCallback(
t: protocol.Protocol, subProtocolName: seq[byte]): SendCallback[Node] = t: protocol.Protocol, subProtocolName: seq[byte]): SendCallback[NodeAddress] =
return ( return (
proc (to: Node, data: seq[byte]): Future[void] = proc (to: NodeAddress, data: seq[byte]): Future[void] =
let fut = newFuture[void]() let fut = newFuture[void]()
# TODO: In discovery v5 each talkreq waits for a talkresp, but here we # hidden assumption here is that nodes already have established discv5 session
# would really like the fire and forget semantics (similar to udp). # between each other. In our use case this should be true as openning stream
# For now start talkreq/talkresp in background, and discard its result. # is only done after succesful OFFER/ACCEPT or FINDCONTENT/CONTENT exchange
# That way we also lose information about any possible errors. # which forces nodes to establish session between each other.
# Consider adding talkreq proc which does not wait for the response. discard t.talkReqDirect(to, subProtocolName, data)
discard t.talkreq(to, subProtocolName, data)
fut.complete() fut.complete()
return fut return fut
) )
proc messageHandler(protocol: TalkProtocol, request: seq[byte], proc messageHandler(protocol: TalkProtocol, request: seq[byte],
srcId: NodeId, srcUdpAddress: Address): seq[byte] = srcId: NodeId, srcUdpAddress: Address): seq[byte] =
let p = UtpDiscv5Protocol(protocol) let
let maybeSender = p.prot.getNode(srcId) p = UtpDiscv5Protocol(protocol)
nodeAddress = NodeAddress.init(srcId, srcUdpAddress)
if maybeSender.isSome(): debug "Received utp payload from known node. Start processing",
debug "Received utp payload from known node. Start processing" nodeId = nodeAddress.nodeId, address = nodeAddress.address
let sender = maybeSender.unsafeGet() asyncSpawn p.router.processIncomingBytes(request, nodeAddress)
# processIncomingBytes may respond to remote by using talkreq requests
asyncSpawn p.router.processIncomingBytes(request, sender)
# We always send empty responses as discv5 spec requires that talkreq
# always receives a talkresp.
@[]
else:
debug "Received utp payload from unknown node. Ignore"
@[]
proc new*( proc new*(
T: type UtpDiscv5Protocol, T: type UtpDiscv5Protocol,
p: protocol.Protocol, p: protocol.Protocol,
subProtocolName: seq[byte], subProtocolName: seq[byte],
acceptConnectionCb: AcceptConnectionCallback[Node], acceptConnectionCb: AcceptConnectionCallback[NodeAddress],
allowConnectionCb: AllowConnectionCallback[Node] = nil, allowConnectionCb: AllowConnectionCallback[NodeAddress] = nil,
socketConfig: SocketConfig = SocketConfig.init()): UtpDiscv5Protocol = socketConfig: SocketConfig = SocketConfig.init()): UtpDiscv5Protocol =
doAssert(not(isNil(acceptConnectionCb))) doAssert(not(isNil(acceptConnectionCb)))
let router = UtpRouter[Node].new( let router = UtpRouter[NodeAddress].new(
acceptConnectionCb, acceptConnectionCb,
allowConnectionCb, allowConnectionCb,
socketConfig, socketConfig,
@ -94,12 +112,12 @@ proc new*(
) )
prot prot
proc connectTo*(r: UtpDiscv5Protocol, address: Node): proc connectTo*(r: UtpDiscv5Protocol, address: NodeAddress):
Future[ConnectionResult[Node]] = Future[ConnectionResult[NodeAddress]] =
return r.router.connectTo(address) return r.router.connectTo(address)
proc connectTo*(r: UtpDiscv5Protocol, address: Node, connectionId: uint16): proc connectTo*(r: UtpDiscv5Protocol, address: NodeAddress, connectionId: uint16):
Future[ConnectionResult[Node]] = Future[ConnectionResult[NodeAddress]] =
return r.router.connectTo(address, connectionId) return r.router.connectTo(address, connectionId)
proc shutdown*(r: UtpDiscv5Protocol) = proc shutdown*(r: UtpDiscv5Protocol) =

View File

@ -7,6 +7,7 @@
{.used.} {.used.}
import import
std/options,
chronos, bearssl, chronos, bearssl,
stew/shims/net, stew/byteutils, stew/shims/net, stew/byteutils,
testutils/unittests, testutils/unittests,
@ -48,15 +49,15 @@ procSuite "Utp protocol over discovery v5 tests":
let rng = newRng() let rng = newRng()
let utpProtId = "test-utp".toBytes() let utpProtId = "test-utp".toBytes()
proc registerIncomingSocketCallback(serverSockets: AsyncQueue): AcceptConnectionCallback[Node] = proc registerIncomingSocketCallback(serverSockets: AsyncQueue): AcceptConnectionCallback[NodeAddress] =
return ( return (
proc(server: UtpRouter[Node], client: UtpSocket[Node]): Future[void] = proc(server: UtpRouter[NodeAddress], client: UtpSocket[NodeAddress]): Future[void] =
serverSockets.addLast(client) serverSockets.addLast(client)
) )
proc allowOneIdCallback(allowedId: uint16): AllowConnectionCallback[Node] = proc allowOneIdCallback(allowedId: uint16): AllowConnectionCallback[NodeAddress] =
return ( return (
proc(r: UtpRouter[Node], remoteAddress: Node, connectionId: uint16): bool = proc(r: UtpRouter[NodeAddress], remoteAddress: NodeAddress, connectionId: uint16): bool =
connectionId == allowedId connectionId == allowedId
) )
@ -64,7 +65,7 @@ procSuite "Utp protocol over discovery v5 tests":
# from standard utp case # from standard utp case
asyncTest "Success connect to remote host": asyncTest "Success connect to remote host":
let let
queue = newAsyncQueue[UtpSocket[Node]]() queue = newAsyncQueue[UtpSocket[NodeAddress]]()
node1 = initDiscoveryNode( node1 = initDiscoveryNode(
rng, PrivateKey.random(rng[]), localAddress(20302)) rng, PrivateKey.random(rng[]), localAddress(20302))
node2 = initDiscoveryNode( node2 = initDiscoveryNode(
@ -73,12 +74,11 @@ procSuite "Utp protocol over discovery v5 tests":
utp1 = UtpDiscv5Protocol.new(node1, utpProtId, registerIncomingSocketCallback(queue)) utp1 = UtpDiscv5Protocol.new(node1, utpProtId, registerIncomingSocketCallback(queue))
utp2 = UtpDiscv5Protocol.new(node2, utpProtId, registerIncomingSocketCallback(queue)) utp2 = UtpDiscv5Protocol.new(node2, utpProtId, registerIncomingSocketCallback(queue))
# nodes must know about each other # nodes must have session between each other
check: check:
node1.addNode(node2.localNode) (await node1.ping(node2.localNode)).isOk()
node2.addNode(node1.localNode)
let clientSocketResult = await utp1.connectTo(node2.localNode) let clientSocketResult = await utp1.connectTo(NodeAddress.init(node2.localNode).unsafeGet())
let clientSocket = clientSocketResult.get() let clientSocket = clientSocketResult.get()
let serverSocket = await queue.get() let serverSocket = await queue.get()
@ -95,7 +95,7 @@ procSuite "Utp protocol over discovery v5 tests":
asyncTest "Success write data over packet size to remote host": asyncTest "Success write data over packet size to remote host":
let let
queue = newAsyncQueue[UtpSocket[Node]]() queue = newAsyncQueue[UtpSocket[NodeAddress]]()
node1 = initDiscoveryNode( node1 = initDiscoveryNode(
rng, PrivateKey.random(rng[]), localAddress(20302)) rng, PrivateKey.random(rng[]), localAddress(20302))
node2 = initDiscoveryNode( node2 = initDiscoveryNode(
@ -104,13 +104,12 @@ procSuite "Utp protocol over discovery v5 tests":
utp1 = UtpDiscv5Protocol.new(node1, utpProtId, registerIncomingSocketCallback(queue)) utp1 = UtpDiscv5Protocol.new(node1, utpProtId, registerIncomingSocketCallback(queue))
utp2 = UtpDiscv5Protocol.new(node2, utpProtId, registerIncomingSocketCallback(queue)) utp2 = UtpDiscv5Protocol.new(node2, utpProtId, registerIncomingSocketCallback(queue))
# nodes must know about each other # nodes must have session between each other
check: check:
node1.addNode(node2.localNode) (await node1.ping(node2.localNode)).isOk()
node2.addNode(node1.localNode)
let numOfBytes = 5000 let numOfBytes = 5000
let clientSocketResult = await utp1.connectTo(node2.localNode) let clientSocketResult = await utp1.connectTo(NodeAddress.init(node2.localNode).unsafeGet())
let clientSocket = clientSocketResult.get() let clientSocket = clientSocketResult.get()
let serverSocket = await queue.get() let serverSocket = await queue.get()
@ -135,7 +134,7 @@ procSuite "Utp protocol over discovery v5 tests":
let let
allowedId: uint16 = 10 allowedId: uint16 = 10
lowSynTimeout = milliseconds(500) lowSynTimeout = milliseconds(500)
queue = newAsyncQueue[UtpSocket[Node]]() queue = newAsyncQueue[UtpSocket[NodeAddress]]()
node1 = initDiscoveryNode( node1 = initDiscoveryNode(
rng, PrivateKey.random(rng[]), localAddress(20302)) rng, PrivateKey.random(rng[]), localAddress(20302))
node2 = initDiscoveryNode( node2 = initDiscoveryNode(
@ -154,13 +153,12 @@ procSuite "Utp protocol over discovery v5 tests":
allowOneIdCallback(allowedId), allowOneIdCallback(allowedId),
SocketConfig.init()) SocketConfig.init())
# nodes must know about each other # nodes must have session between each other
check: check:
node1.addNode(node2.localNode) (await node1.ping(node2.localNode)).isOk()
node2.addNode(node1.localNode)
let clientSocketResult1 = await utp1.connectTo(node2.localNode, allowedId) let clientSocketResult1 = await utp1.connectTo(NodeAddress.init(node2.localNode).unsafeGet(), allowedId)
let clientSocketResult2 = await utp1.connectTo(node2.localNode, allowedId + 1) let clientSocketResult2 = await utp1.connectTo(NodeAddress.init(node2.localNode).unsafeGet(), allowedId + 1)
check: check:
clientSocketResult1.isOk() clientSocketResult1.isOk()
@ -180,7 +178,7 @@ procSuite "Utp protocol over discovery v5 tests":
asyncTest "Configure incoming connections to be in connected state": asyncTest "Configure incoming connections to be in connected state":
let let
queue = newAsyncQueue[UtpSocket[Node]]() queue = newAsyncQueue[UtpSocket[NodeAddress]]()
node1 = initDiscoveryNode( node1 = initDiscoveryNode(
rng, PrivateKey.random(rng[]), localAddress(20302)) rng, PrivateKey.random(rng[]), localAddress(20302))
node2 = initDiscoveryNode( node2 = initDiscoveryNode(
@ -194,12 +192,11 @@ procSuite "Utp protocol over discovery v5 tests":
socketConfig = SocketConfig.init(incomingSocketReceiveTimeout = none[Duration]()) socketConfig = SocketConfig.init(incomingSocketReceiveTimeout = none[Duration]())
) )
# nodes must know about each other # nodes must have session between each other
check: check:
node1.addNode(node2.localNode) (await node1.ping(node2.localNode)).isOk()
node2.addNode(node1.localNode)
let clientSocketResult = await utp1.connectTo(node2.localNode) let clientSocketResult = await utp1.connectTo(NodeAddress.init(node2.localNode).unsafeGet())
let clientSocket = clientSocketResult.get() let clientSocket = clientSocketResult.get()
let serverSocket = await queue.get() let serverSocket = await queue.get()