implement portal_historyTraceRecursiveFindContent (#1813)

implement portal_historyTraceRecursiveFindContent according to spec
Daniel Sobol 2023-10-30 17:48:06 +03:00 committed by GitHub
parent ddfaf2a4df
commit 9022a3993c
2 changed files with 222 additions and 0 deletions

View File

@@ -195,6 +195,28 @@ type
    # content is in their range
    nodesInterestedInContent*: seq[Node]

  TraceResponse* = object
    durationMs*: int64
    respondedWith*: seq[NodeId]

  NodeMetadata* = object
    enr*: Record
    distance*: UInt256

  TraceObject* = object
    origin*: NodeId
    targetId*: UInt256
    receivedFrom*: NodeId
    responses*: Table[string, TraceResponse]
    metadata*: Table[string, NodeMetadata]
    cancelled*: seq[NodeId]
    startedAtMs*: int64

  TraceContentLookupResult* = object
    content*: seq[byte]
    utpTransfer*: bool
    trace*: TraceObject

proc init*(
    T: type ContentKV,
    contentKey: ByteList,
@@ -1072,6 +1094,183 @@ proc contentLookup*(p: PortalProtocol, target: ByteList, targetId: UInt256):
  portal_lookup_content_failures.inc()
  return Opt.none(ContentLookupResult)

proc traceContentLookup*(p: PortalProtocol, target: ByteList, targetId: UInt256):
    Future[Opt[TraceContentLookupResult]] {.async.} =
  ## Perform a content lookup for the given target and return the content, if
  ## found, together with a trace of the lookup: which nodes were contacted,
  ## what they responded with and how long each request took.
  # `closestNodes` holds the k closest nodes to target found, sorted by distance
  # Unvalidated nodes are used for requests as a form of validation.
  var closestNodes = p.routingTable.neighbours(
    targetId, BUCKET_SIZE, seenOnly = false)

  # Shuffling the order of the nodes in order to not always hit the same node
  # first for the same request.
  p.baseProtocol.rng[].shuffle(closestNodes)

  let ts = now(chronos.Moment)
  var responses = initTable[string, TraceResponse]()
  var metadata = initTable[string, NodeMetadata]()

  var asked, seen = initHashSet[NodeId]()
  asked.incl(p.baseProtocol.localNode.id) # No need to ask our own node
  seen.incl(p.baseProtocol.localNode.id) # No need to discover our own node
  for node in closestNodes:
    seen.incl(node.id)

  # Local node should be part of the responses
  responses["0x" & $p.localNode.id] = TraceResponse(
    durationMs: 0,
    respondedWith: seen.toSeq()
  )

  metadata["0x" & $p.localNode.id] = NodeMetadata(
    enr: p.localNode.record,
    distance: p.routingTable.distance(p.localNode.id, targetId)
  )

  # We should also have metadata for all the closest nodes
  # in order to be able to show cancelled requests
  for cn in closestNodes:
    metadata["0x" & $cn.id] = NodeMetadata(
      enr: cn.record,
      distance: p.routingTable.distance(cn.id, targetId)
    )

  var pendingQueries = newSeqOfCap[Future[PortalResult[FoundContent]]](alpha)
  var pendingNodes = newSeq[Node]()
  var requestAmount = 0'i64

  var nodesWithoutContent: seq[Node] = newSeq[Node]()
  while true:
    var i = 0
    # Doing `alpha` amount of requests at once as long as closer non queried
    # nodes are discovered.
    while i < closestNodes.len and pendingQueries.len < alpha:
      let n = closestNodes[i]
      if not asked.containsOrIncl(n.id):
        pendingQueries.add(p.findContent(n, target))
        pendingNodes.add(n)
        requestAmount.inc()
      inc i

    trace "Pending lookup queries", total = pendingQueries.len

    if pendingQueries.len == 0:
      break

    let query = await one(pendingQueries)
    trace "Got lookup query response"

    let index = pendingQueries.find(query)
    if index != -1:
      pendingQueries.del(index)
      pendingNodes.del(index)
    else:
      error "Resulting query should have been in the pending queries"

    let contentResult = query.read

    if contentResult.isOk():
      let content = contentResult.get()

      case content.kind
      of Nodes:
        let duration = chronos.milliseconds(now(chronos.Moment) - ts)

        let maybeRadius = p.radiusCache.get(content.src.id)
        if maybeRadius.isSome() and
            p.inRange(content.src.id, maybeRadius.unsafeGet(), targetId):
          # Only return nodes which may be interested in content.
          # No need to check for duplicates in nodesWithoutContent
          # as requests are never made two times to the same node.
          nodesWithoutContent.add(content.src)

        var respondedWith = newSeq[NodeId]()

        for n in content.nodes:
          let dist = p.routingTable.distance(n.id, targetId)
          metadata["0x" & $n.id] = NodeMetadata(
            enr: n.record,
            distance: dist,
          )
          respondedWith.add(n.id)

          if not seen.containsOrIncl(n.id):
            discard p.routingTable.addNode(n)
            # If it wasn't seen before, insert node while remaining sorted
            closestNodes.insert(n, closestNodes.lowerBound(n,
              proc(x: Node, n: Node): int =
                cmp(p.routingTable.distance(x.id, targetId), dist)
            ))

            if closestNodes.len > BUCKET_SIZE:
              closestNodes.del(closestNodes.high())

        let distance = p.routingTable.distance(content.src.id, targetId)
        let address = content.src.address.get()

        responses["0x" & $content.src.id] = TraceResponse(
          durationMs: duration,
          respondedWith: respondedWith,
        )

        metadata["0x" & $content.src.id] = NodeMetadata(
          enr: content.src.record,
          distance: distance,
        )
      of Content:
        let duration = chronos.milliseconds(now(chronos.Moment) - ts)

        # cancel any pending queries as the content has been found
        for f in pendingQueries:
          f.cancel()
        portal_lookup_content_requests.observe(requestAmount)

        let distance = p.routingTable.distance(content.src.id, targetId)

        responses["0x" & $content.src.id] = TraceResponse(
          durationMs: duration,
          respondedWith: newSeq[NodeId](),
        )

        metadata["0x" & $content.src.id] = NodeMetadata(
          enr: content.src.record,
          distance: distance,
        )

        var pendingNodeIds = newSeq[NodeId]()
        for pn in pendingNodes:
          pendingNodeIds.add(pn.id)
          metadata["0x" & $pn.id] = NodeMetadata(
            enr: pn.record,
            distance: p.routingTable.distance(pn.id, targetId)
          )

        return Opt.some(TraceContentLookupResult(
          content: content.content,
          utpTransfer: content.utpTransfer,
          trace: TraceObject(
            origin: p.localNode.id,
            targetId: targetId,
            receivedFrom: content.src.id,
            responses: responses,
            metadata: metadata,
            cancelled: pendingNodeIds,
            startedAtMs: chronos.epochNanoSeconds(ts) div 1_000_000 # nanoseconds to milliseconds
          )
        ))
    else:
      # TODO: Should we do something with the node that failed to respond to
      # our query?
      discard

  portal_lookup_content_failures.inc()
  return Opt.none(TraceContentLookupResult)
proc query*(p: PortalProtocol, target: NodeId, k = BUCKET_SIZE): Future[seq[Node]]
    {.async.} =
  ## Query k nodes for the given target, returns all nodes found, including the
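
One detail in the lookup loop above is worth spelling out: closestNodes is kept sorted by distance to the target by pairing std/algorithm's lowerBound with a custom comparator before calling insert. The following is a minimal standalone sketch of that pattern (illustrative only; plain integers stand in for the UInt256 XOR distances used in the real code):

import std/algorithm

# Distances already held in ascending order, like closestNodes above.
var distances = @[1, 4, 9, 16]
let newDist = 7

# lowerBound returns the first index whose element does not compare less than
# the new value, so inserting at that index keeps the sequence sorted.
distances.insert(newDist, distances.lowerBound(newDist,
  proc(x, y: int): int = cmp(x, y)))

assert distances == @[1, 4, 7, 9, 16]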

View File

@@ -24,6 +24,12 @@ type
    content: string
    utpTransfer: bool

  TraceContentInfo* = object
    content*: string
    utpTransfer*: bool
    trace*: TraceObject

# Note:
# Using a string for the network parameter will give an error in the rpc macro:
# Error: Invalid node kind nnkInfix for macros.`$`
@@ -173,6 +179,23 @@ proc installPortalApiHandlers*(
        utpTransfer: contentResult.utpTransfer
      )

  rpcServer.rpc("portal_" & network & "TraceRecursiveFindContent") do(
      contentKey: string) -> TraceContentInfo:
    let
      key = ByteList.init(hexToSeqByte(contentKey))
      contentId = p.toContentId(key).valueOr:
        raise newException(ValueError, "Invalid content key")

      contentResult = (await p.traceContentLookup(key, contentId)).valueOr:
        return TraceContentInfo(content: "0x")

    return TraceContentInfo(
      content: contentResult.content.to0xHex(),
      utpTransfer: contentResult.utpTransfer,
      trace: contentResult.trace
    )

  rpcServer.rpc("portal_" & network & "Store") do(
      contentKey: string, contentValue: string) -> bool:
    let key = ByteList.init(hexToSeqByte(contentKey))
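
For reference, here is a minimal sketch of how the new endpoint might be called over HTTP JSON-RPC once a node exposes the API. It is illustrative only: the 127.0.0.1:8545 address, the placeholder content key and the use of std/httpclient are assumptions, not part of this commit.

import std/[httpclient, json]

# Build a JSON-RPC request for the new method. The content key below is a
# placeholder for any valid 0x-prefixed history content key.
let request = %*{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "portal_historyTraceRecursiveFindContent",
  "params": ["0x00..."]
}

let client = newHttpClient()
client.headers = newHttpHeaders({"Content-Type": "application/json"})
let response = client.post("http://127.0.0.1:8545", body = $request)

# On success the result mirrors TraceContentInfo above: "content",
# "utpTransfer" and a "trace" object carrying origin, targetId, receivedFrom,
# responses, metadata, cancelled and startedAtMs.
echo response.body.parseJson().pretty()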