mirror of
https://github.com/status-im/nim-libp2p-dht.git
synced 2025-02-23 17:48:21 +00:00
add "traditional" targetID based findNode message
This patch adds a findNode message sending the actual target ID as in traditional Kademlia lookup. This in contrast to the actual findNode message that send information about the distance only, leading to more secure but slower lookups. Having both primitives allows us to select which to use per use case. Current naming is findNode for the distance based message and findNodeFast for the message added in this patch.
This commit is contained in:
parent
6408cf4e3f
commit
01488104e5
@ -16,6 +16,7 @@ import
|
|||||||
std/[hashes, net],
|
std/[hashes, net],
|
||||||
eth/[keys],
|
eth/[keys],
|
||||||
./spr,
|
./spr,
|
||||||
|
./node,
|
||||||
../../../../dht/providers_messages
|
../../../../dht/providers_messages
|
||||||
|
|
||||||
export providers_messages
|
export providers_messages
|
||||||
@ -40,6 +41,7 @@ type
|
|||||||
addProvider = 0x0B
|
addProvider = 0x0B
|
||||||
getProviders = 0x0C
|
getProviders = 0x0C
|
||||||
providers = 0x0D
|
providers = 0x0D
|
||||||
|
findNodeFast = 0x83
|
||||||
|
|
||||||
RequestId* = object
|
RequestId* = object
|
||||||
id*: seq[byte]
|
id*: seq[byte]
|
||||||
@ -55,6 +57,9 @@ type
|
|||||||
FindNodeMessage* = object
|
FindNodeMessage* = object
|
||||||
distances*: seq[uint16]
|
distances*: seq[uint16]
|
||||||
|
|
||||||
|
FindNodeFastMessage* = object
|
||||||
|
target*: NodeId
|
||||||
|
|
||||||
NodesMessage* = object
|
NodesMessage* = object
|
||||||
total*: uint32
|
total*: uint32
|
||||||
sprs*: seq[SignedPeerRecord]
|
sprs*: seq[SignedPeerRecord]
|
||||||
@ -74,7 +79,7 @@ type
|
|||||||
|
|
||||||
SomeMessage* = PingMessage or PongMessage or FindNodeMessage or NodesMessage or
|
SomeMessage* = PingMessage or PongMessage or FindNodeMessage or NodesMessage or
|
||||||
TalkReqMessage or TalkRespMessage or AddProviderMessage or GetProvidersMessage or
|
TalkReqMessage or TalkRespMessage or AddProviderMessage or GetProvidersMessage or
|
||||||
ProvidersMessage
|
ProvidersMessage or FindNodeFastMessage
|
||||||
|
|
||||||
Message* = object
|
Message* = object
|
||||||
reqId*: RequestId
|
reqId*: RequestId
|
||||||
@ -85,6 +90,8 @@ type
|
|||||||
pong*: PongMessage
|
pong*: PongMessage
|
||||||
of findNode:
|
of findNode:
|
||||||
findNode*: FindNodeMessage
|
findNode*: FindNodeMessage
|
||||||
|
of findNodeFast:
|
||||||
|
findNodeFast*: FindNodeFastMessage
|
||||||
of nodes:
|
of nodes:
|
||||||
nodes*: NodesMessage
|
nodes*: NodesMessage
|
||||||
of talkReq:
|
of talkReq:
|
||||||
@ -112,6 +119,7 @@ template messageKind*(T: typedesc[SomeMessage]): MessageKind =
|
|||||||
when T is PingMessage: ping
|
when T is PingMessage: ping
|
||||||
elif T is PongMessage: pong
|
elif T is PongMessage: pong
|
||||||
elif T is FindNodeMessage: findNode
|
elif T is FindNodeMessage: findNode
|
||||||
|
elif T is FindNodeFastMessage: findNodeFast
|
||||||
elif T is NodesMessage: nodes
|
elif T is NodesMessage: nodes
|
||||||
elif T is TalkReqMessage: talkReq
|
elif T is TalkReqMessage: talkReq
|
||||||
elif T is TalkRespMessage: talkResp
|
elif T is TalkRespMessage: talkResp
|
||||||
|
@ -15,7 +15,7 @@ import
|
|||||||
chronicles,
|
chronicles,
|
||||||
libp2p/routing_record,
|
libp2p/routing_record,
|
||||||
libp2p/signed_envelope,
|
libp2p/signed_envelope,
|
||||||
"."/[messages, spr],
|
"."/[messages, spr, node],
|
||||||
../../../../dht/providers_encoding
|
../../../../dht/providers_encoding
|
||||||
|
|
||||||
from stew/objects import checkedEnumAssign
|
from stew/objects import checkedEnumAssign
|
||||||
@ -60,6 +60,16 @@ proc append*(writer: var RlpWriter, ip: IpAddress) =
|
|||||||
writer.append(ip.address_v4)
|
writer.append(ip.address_v4)
|
||||||
of IpAddressFamily.IPv6: writer.append(ip.address_v6)
|
of IpAddressFamily.IPv6: writer.append(ip.address_v6)
|
||||||
|
|
||||||
|
proc read*(rlp: var Rlp, T: type NodeId): T
|
||||||
|
{.raises: [ValueError, RlpError, Defect].} =
|
||||||
|
mixin read
|
||||||
|
let nodeId = NodeId.fromBytesBE(rlp.toBytes())
|
||||||
|
rlp.skipElem()
|
||||||
|
nodeId
|
||||||
|
|
||||||
|
proc append*(writer: var RlpWriter, value: NodeId) =
|
||||||
|
writer.append(value.toBytesBE)
|
||||||
|
|
||||||
proc numFields(T: typedesc): int =
|
proc numFields(T: typedesc): int =
|
||||||
for k, v in fieldPairs(default(T)): inc result
|
for k, v in fieldPairs(default(T)): inc result
|
||||||
|
|
||||||
@ -117,6 +127,7 @@ proc decodeMessage*(body: openArray[byte]): DecodeResult[Message] =
|
|||||||
of ping: rlp.decode(message.ping)
|
of ping: rlp.decode(message.ping)
|
||||||
of pong: rlp.decode(message.pong)
|
of pong: rlp.decode(message.pong)
|
||||||
of findNode: rlp.decode(message.findNode)
|
of findNode: rlp.decode(message.findNode)
|
||||||
|
of findNodeFast: rlp.decode(message.findNodeFast)
|
||||||
of nodes: rlp.decode(message.nodes)
|
of nodes: rlp.decode(message.nodes)
|
||||||
of talkReq: rlp.decode(message.talkReq)
|
of talkReq: rlp.decode(message.talkReq)
|
||||||
of talkResp: rlp.decode(message.talkResp)
|
of talkResp: rlp.decode(message.talkResp)
|
||||||
|
@ -291,6 +291,12 @@ proc handleFindNode(d: Protocol, fromId: NodeId, fromAddr: Address,
|
|||||||
# with empty nodes.
|
# with empty nodes.
|
||||||
d.sendNodes(fromId, fromAddr, reqId, [])
|
d.sendNodes(fromId, fromAddr, reqId, [])
|
||||||
|
|
||||||
|
proc handleFindNodeFast(d: Protocol, fromId: NodeId, fromAddr: Address,
|
||||||
|
fnf: FindNodeFastMessage, reqId: RequestId) =
|
||||||
|
d.sendNodes(fromId, fromAddr, reqId,
|
||||||
|
d.routingTable.neighbours(fnf.target, seenOnly = true))
|
||||||
|
# TODO: if known, maybe we should add exact target even if not yet "seen"
|
||||||
|
|
||||||
proc handleTalkReq(d: Protocol, fromId: NodeId, fromAddr: Address,
|
proc handleTalkReq(d: Protocol, fromId: NodeId, fromAddr: Address,
|
||||||
talkreq: TalkReqMessage, reqId: RequestId) =
|
talkreq: TalkReqMessage, reqId: RequestId) =
|
||||||
let talkProtocol = d.talkProtocols.getOrDefault(talkreq.protocol)
|
let talkProtocol = d.talkProtocols.getOrDefault(talkreq.protocol)
|
||||||
@ -336,6 +342,9 @@ proc handleMessage(d: Protocol, srcId: NodeId, fromAddr: Address,
|
|||||||
of findNode:
|
of findNode:
|
||||||
discovery_message_requests_incoming.inc()
|
discovery_message_requests_incoming.inc()
|
||||||
d.handleFindNode(srcId, fromAddr, message.findNode, message.reqId)
|
d.handleFindNode(srcId, fromAddr, message.findNode, message.reqId)
|
||||||
|
of findNodeFast:
|
||||||
|
discovery_message_requests_incoming.inc()
|
||||||
|
d.handleFindNodeFast(srcId, fromAddr, message.findNodeFast, message.reqId)
|
||||||
of talkReq:
|
of talkReq:
|
||||||
discovery_message_requests_incoming.inc()
|
discovery_message_requests_incoming.inc()
|
||||||
d.handleTalkReq(srcId, fromAddr, message.talkReq, message.reqId)
|
d.handleTalkReq(srcId, fromAddr, message.talkReq, message.reqId)
|
||||||
@ -467,7 +476,7 @@ proc ping*(d: Protocol, toNode: Node):
|
|||||||
|
|
||||||
proc findNode*(d: Protocol, toNode: Node, distances: seq[uint16]):
|
proc findNode*(d: Protocol, toNode: Node, distances: seq[uint16]):
|
||||||
Future[DiscResult[seq[Node]]] {.async.} =
|
Future[DiscResult[seq[Node]]] {.async.} =
|
||||||
## Send a discovery findNode message.
|
## Send a getNeighbours message.
|
||||||
##
|
##
|
||||||
## Returns the received nodes or an error.
|
## Returns the received nodes or an error.
|
||||||
## Received SPRs are already validated and converted to `Node`.
|
## Received SPRs are already validated and converted to `Node`.
|
||||||
@ -482,6 +491,24 @@ proc findNode*(d: Protocol, toNode: Node, distances: seq[uint16]):
|
|||||||
d.replaceNode(toNode)
|
d.replaceNode(toNode)
|
||||||
return err(nodes.error)
|
return err(nodes.error)
|
||||||
|
|
||||||
|
proc findNodeFast*(d: Protocol, toNode: Node, target: NodeId):
|
||||||
|
Future[DiscResult[seq[Node]]] {.async.} =
|
||||||
|
## Send a findNode message.
|
||||||
|
##
|
||||||
|
## Returns the received nodes or an error.
|
||||||
|
## Received SPRs are already validated and converted to `Node`.
|
||||||
|
let reqId = d.sendRequest(toNode, FindNodeFastMessage(target: target))
|
||||||
|
let nodes = await d.waitNodes(toNode, reqId)
|
||||||
|
|
||||||
|
if nodes.isOk:
|
||||||
|
let res = verifyNodesRecords(nodes.get(), toNode, findNodeResultLimit)
|
||||||
|
d.routingTable.setJustSeen(toNode)
|
||||||
|
return ok(res)
|
||||||
|
else:
|
||||||
|
d.replaceNode(toNode)
|
||||||
|
return err(nodes.error)
|
||||||
|
|
||||||
|
|
||||||
proc talkReq*(d: Protocol, toNode: Node, protocol, request: seq[byte]):
|
proc talkReq*(d: Protocol, toNode: Node, protocol, request: seq[byte]):
|
||||||
Future[DiscResult[seq[byte]]] {.async.} =
|
Future[DiscResult[seq[byte]]] {.async.} =
|
||||||
## Send a discovery talkreq message.
|
## Send a discovery talkreq message.
|
||||||
@ -530,7 +557,19 @@ proc lookupWorker(d: Protocol, destNode: Node, target: NodeId):
|
|||||||
for n in result:
|
for n in result:
|
||||||
discard d.addNode(n)
|
discard d.addNode(n)
|
||||||
|
|
||||||
proc lookup*(d: Protocol, target: NodeId): Future[seq[Node]] {.async.} =
|
proc lookupWorkerFast(d: Protocol, destNode: Node, target: NodeId):
|
||||||
|
Future[seq[Node]] {.async.} =
|
||||||
|
## use terget NodeId based find_node
|
||||||
|
|
||||||
|
let r = await d.findNodeFast(destNode, target)
|
||||||
|
if r.isOk:
|
||||||
|
result.add(r[])
|
||||||
|
|
||||||
|
# Attempt to add all nodes discovered
|
||||||
|
for n in result:
|
||||||
|
discard d.addNode(n)
|
||||||
|
|
||||||
|
proc lookup*(d: Protocol, target: NodeId, fast: bool = false): Future[seq[Node]] {.async.} =
|
||||||
## Perform a lookup for the given target, return the closest n nodes to the
|
## Perform a lookup for the given target, return the closest n nodes to the
|
||||||
## target. Maximum value for n is `BUCKET_SIZE`.
|
## target. Maximum value for n is `BUCKET_SIZE`.
|
||||||
# `closestNodes` holds the k closest nodes to target found, sorted by distance
|
# `closestNodes` holds the k closest nodes to target found, sorted by distance
|
||||||
@ -553,7 +592,10 @@ proc lookup*(d: Protocol, target: NodeId): Future[seq[Node]] {.async.} =
|
|||||||
while i < closestNodes.len and pendingQueries.len < alpha:
|
while i < closestNodes.len and pendingQueries.len < alpha:
|
||||||
let n = closestNodes[i]
|
let n = closestNodes[i]
|
||||||
if not asked.containsOrIncl(n.id):
|
if not asked.containsOrIncl(n.id):
|
||||||
pendingQueries.add(d.lookupWorker(n, target))
|
if fast:
|
||||||
|
pendingQueries.add(d.lookupWorkerFast(n, target))
|
||||||
|
else:
|
||||||
|
pendingQueries.add(d.lookupWorker(n, target))
|
||||||
inc i
|
inc i
|
||||||
|
|
||||||
trace "discv5 pending queries", total = pendingQueries.len
|
trace "discv5 pending queries", total = pendingQueries.len
|
||||||
|
Loading…
x
Reference in New Issue
Block a user