From 9022a3993ce9324441c1f37f1c24a4d1aad8d382 Mon Sep 17 00:00:00 2001
From: Daniel Sobol
Date: Mon, 30 Oct 2023 17:48:06 +0300
Subject: [PATCH] implement portal_historyTraceRecursiveFindContent (#1813)

implement portal_historyTraceRecursiveFindContent according to spec
---
 fluffy/network/wire/portal_protocol.nim | 206 ++++++++++++++++++++++++
 fluffy/rpc/rpc_portal_api.nim           |  23 +++
 2 files changed, 229 insertions(+)

diff --git a/fluffy/network/wire/portal_protocol.nim b/fluffy/network/wire/portal_protocol.nim
index b928fa7a7..310208530 100644
--- a/fluffy/network/wire/portal_protocol.nim
+++ b/fluffy/network/wire/portal_protocol.nim
@@ -195,6 +195,28 @@ type
     # content is in their range
     nodesInterestedInContent*: seq[Node]
 
+  TraceResponse* = object
+    durationMs*: int64
+    respondedWith*: seq[NodeId]
+
+  NodeMetadata* = object
+    enr*: Record
+    distance*: UInt256
+
+  TraceObject* = object
+    origin*: NodeId
+    targetId*: UInt256
+    receivedFrom*: NodeId
+    responses*: Table[string, TraceResponse]
+    metadata*: Table[string, NodeMetadata]
+    cancelled*: seq[NodeId]
+    startedAtMs*: int64
+
+  TraceContentLookupResult* = object
+    content*: seq[byte]
+    utpTransfer*: bool
+    trace*: TraceObject
+
 proc init*(
     T: type ContentKV,
     contentKey: ByteList,
@@ -1072,6 +1094,190 @@ proc contentLookup*(p: PortalProtocol, target: ByteList, targetId: UInt256):
   portal_lookup_content_failures.inc()
   return Opt.none(ContentLookupResult)
 
+proc traceContentLookup*(p: PortalProtocol, target: ByteList, targetId: UInt256):
+    Future[Opt[TraceContentLookupResult]] {.async.} =
+  ## Perform a lookup for the content of the given target while recording a
+  ## trace of the lookup. The trace holds, for each node that responded, the
+  ## time elapsed since the start of the lookup and the node ids it returned,
+  ## the ENR and distance to the target of every node involved, and the nodes
+  ## whose requests were still pending when the content was found.
+  ## Returns `Opt.none` when no queried node provided the content.
+  # `closestNodes` holds the k closest nodes to target found, sorted by distance
+  # Unvalidated nodes are used for requests as a form of validation.
+  var closestNodes = p.routingTable.neighbours(
+    targetId, BUCKET_SIZE, seenOnly = false)
+  # Shuffling the order of the nodes in order to not always hit the same node
+  # first for the same request.
+  p.baseProtocol.rng[].shuffle(closestNodes)
+
+  let ts = now(chronos.Moment)
+  var responses = initTable[string, TraceResponse]()
+  var metadata = initTable[string, NodeMetadata]()
+
+  var asked, seen = initHashSet[NodeId]()
+  asked.incl(p.baseProtocol.localNode.id) # No need to ask our own node
+  seen.incl(p.baseProtocol.localNode.id) # No need to discover our own node
+  for node in closestNodes:
+    seen.incl(node.id)
+
+  # Local node should be part of the responses
+  responses["0x" & $p.localNode.id] = TraceResponse(
+    durationMs: 0,
+    respondedWith: seen.toSeq()
+  )
+
+  metadata["0x" & $p.localNode.id] = NodeMetadata(
+    enr: p.localNode.record,
+    distance: p.routingTable.distance(p.localNode.id, targetId)
+  )
+
+  # We should also have metadata for all the closest nodes
+  # in order to be able to show cancelled requests
+  for cn in closestNodes:
+    metadata["0x" & $cn.id] = NodeMetadata(
+      enr: cn.record,
+      distance: p.routingTable.distance(cn.id, targetId)
+    )
+
+  var pendingQueries = newSeqOfCap[Future[PortalResult[FoundContent]]](alpha)
+  var pendingNodes = newSeq[Node]()
+  var requestAmount = 0'i64
+
+  # NOTE(review): collected below but never read in this proc - confirm
+  # whether it can be removed or should become part of the trace result.
+  var nodesWithoutContent: seq[Node] = newSeq[Node]()
+
+  while true:
+    var i = 0
+    # Doing `alpha` amount of requests at once as long as closer non queried
+    # nodes are discovered.
+    while i < closestNodes.len and pendingQueries.len < alpha:
+      let n = closestNodes[i]
+      if not asked.containsOrIncl(n.id):
+        pendingQueries.add(p.findContent(n, target))
+        pendingNodes.add(n)
+        requestAmount.inc()
+      inc i
+
+    trace "Pending lookup queries", total = pendingQueries.len
+
+    if pendingQueries.len == 0:
+      break
+
+    let query = await one(pendingQueries)
+    trace "Got lookup query response"
+
+    let index = pendingQueries.find(query)
+    if index != -1:
+      # Delete at the same index in both seqs so that `pendingNodes[i]` stays
+      # the node queried by `pendingQueries[i]`.
+      pendingQueries.del(index)
+      pendingNodes.del(index)
+    else:
+      error "Resulting query should have been in the pending queries"
+
+    let contentResult = query.read
+
+    if contentResult.isOk():
+      let content = contentResult.get()
+
+      case content.kind
+      of Nodes:
+        # Elapsed time since the start of the lookup (`ts`)
+        let duration = chronos.milliseconds(now(chronos.Moment) - ts)
+
+        let maybeRadius = p.radiusCache.get(content.src.id)
+        if maybeRadius.isSome() and
+            p.inRange(content.src.id, maybeRadius.unsafeGet(), targetId):
+          # Only return nodes which may be interested in content.
+          # No need to check for duplicates in nodesWithoutContent
+          # as requests are never made two times to the same node.
+          nodesWithoutContent.add(content.src)
+
+        var respondedWith = newSeq[NodeId]()
+
+        for n in content.nodes:
+          let dist = p.routingTable.distance(n.id, targetId)
+
+          metadata["0x" & $n.id] = NodeMetadata(
+            enr: n.record,
+            distance: dist,
+          )
+          respondedWith.add(n.id)
+
+          if not seen.containsOrIncl(n.id):
+            discard p.routingTable.addNode(n)
+            # If it wasn't seen before, insert node while remaining sorted
+            closestNodes.insert(n, closestNodes.lowerBound(n,
+              proc(x: Node, n: Node): int =
+                cmp(p.routingTable.distance(x.id, targetId), dist)
+            ))
+
+            if closestNodes.len > BUCKET_SIZE:
+              closestNodes.del(closestNodes.high())
+
+        let distance = p.routingTable.distance(content.src.id, targetId)
+
+        responses["0x" & $content.src.id] = TraceResponse(
+          durationMs: duration,
+          respondedWith: respondedWith,
+        )
+
+        metadata["0x" & $content.src.id] = NodeMetadata(
+          enr: content.src.record,
+          distance: distance,
+        )
+
+      of Content:
+        let duration = chronos.milliseconds(now(chronos.Moment) - ts)
+
+        # cancel any pending queries as the content has been found
+        for f in pendingQueries:
+          f.cancel()
+        portal_lookup_content_requests.observe(requestAmount)
+
+        let distance = p.routingTable.distance(content.src.id, targetId)
+
+        responses["0x" & $content.src.id] = TraceResponse(
+          durationMs: duration,
+          respondedWith: newSeq[NodeId](),
+        )
+
+        metadata["0x" & $content.src.id] = NodeMetadata(
+          enr: content.src.record,
+          distance: distance,
+        )
+
+        var pendingNodeIds = newSeq[NodeId]()
+
+        for pn in pendingNodes:
+          pendingNodeIds.add(pn.id)
+          metadata["0x" & $pn.id] = NodeMetadata(
+            enr: pn.record,
+            distance: p.routingTable.distance(pn.id, targetId)
+          )
+
+        return Opt.some(TraceContentLookupResult(
+          content: content.content,
+          utpTransfer: content.utpTransfer,
+          trace: TraceObject(
+            origin: p.localNode.id,
+            targetId: targetId,
+            receivedFrom: content.src.id,
+            responses: responses,
+            metadata: metadata,
+            cancelled: pendingNodeIds,
+            startedAtMs: chronos.epochNanoSeconds(ts) div 1_000_000 # nanoseconds to milliseconds
+          )
+        ))
+    else:
+      # TODO: Should we do something with the node that failed responding our
+      # query?
+      discard
+
+  portal_lookup_content_failures.inc()
+  return Opt.none(TraceContentLookupResult)
+
 proc query*(p: PortalProtocol, target: NodeId, k = BUCKET_SIZE):
     Future[seq[Node]] {.async.} =
   ## Query k nodes for the given target, returns all nodes found, including the
diff --git a/fluffy/rpc/rpc_portal_api.nim b/fluffy/rpc/rpc_portal_api.nim
index e69b7ca3d..c41374ae9 100644
--- a/fluffy/rpc/rpc_portal_api.nim
+++ b/fluffy/rpc/rpc_portal_api.nim
@@ -24,6 +24,12 @@ type
     content: string
     utpTransfer: bool
 
+  TraceContentInfo* = object
+    content*: string
+    utpTransfer: bool
+    trace*: TraceObject
+
+
 # Note:
 # Using a string for the network parameter will give an error in the rpc macro:
 # Error: Invalid node kind nnkInfix for macros.`$`
@@ -173,6 +179,23 @@ proc installPortalApiHandlers*(
       utpTransfer: contentResult.utpTransfer
     )
 
+  rpcServer.rpc("portal_" & network & "TraceRecursiveFindContent") do(
+      contentKey: string) -> TraceContentInfo:
+
+    let
+      key = ByteList.init(hexToSeqByte(contentKey))
+      contentId = p.toContentId(key).valueOr:
+        raise newException(ValueError, "Invalid content key")
+
+      contentResult = (await p.traceContentLookup(key, contentId)).valueOr:
+        return TraceContentInfo(content: "0x")
+
+    return TraceContentInfo(
+      content: contentResult.content.to0xHex(),
+      utpTransfer: contentResult.utpTransfer,
+      trace: contentResult.trace
+    )
+
   rpcServer.rpc("portal_" & network & "Store") do(
       contentKey: string, contentValue: string) -> bool:
     let key = ByteList.init(hexToSeqByte(contentKey))