mirror of
https://github.com/status-im/nimbus-eth1.git
synced 2025-01-13 13:55:45 +00:00
Add content lookup function (#807)
* Add content lookup function * Pr comments Refactor contentLookup function Add additional logginf for protocol breaches in find content responses
This commit is contained in:
parent
14d2edcb26
commit
df3e7bb368
@ -11,7 +11,7 @@
|
||||
|
||||
import
|
||||
std/[options, sugar],
|
||||
nimcrypto/[sha2, hash], stew/objects,
|
||||
nimcrypto/[sha2, hash], stew/objects, stint,
|
||||
eth/ssz/ssz_serialization, eth/trie/[hexary, db]
|
||||
|
||||
export ssz_serialization
|
||||
@ -81,6 +81,12 @@ func toContentId*(contentKey: ByteList): ContentId =
|
||||
# https://github.com/ethereum/stateless-ethereum-specs/blob/master/state-network.md#content
|
||||
sha2.sha_256.digest(contentKey.asSeq())
|
||||
|
||||
func toContentId*(contentKey: ContentKey): ContentId =
|
||||
toContentId(encodeKeyAsList(contentKey))
|
||||
|
||||
func contentIdAsUint256*(id: ContentId): Uint256 =
|
||||
readUintBE[256](id.data)
|
||||
|
||||
type
|
||||
ContentStorage* = object
|
||||
# TODO: Quick implementation for now where we just use HexaryTrie, current
|
||||
|
@ -41,6 +41,15 @@ type
|
||||
|
||||
PortalResult*[T] = Result[T, cstring]
|
||||
|
||||
LookupResultKind = enum
|
||||
Nodes, Content
|
||||
|
||||
LookupResult = object
|
||||
case kind: LookupResultKind
|
||||
of Nodes:
|
||||
nodes: seq[Node]
|
||||
of Content:
|
||||
content: ByteList
|
||||
|
||||
proc addNode*(p: PortalProtocol, node: Node): NodeStatus =
|
||||
p.routingTable.addNode(node)
|
||||
@ -102,8 +111,7 @@ proc handleFindContent(p: PortalProtocol, fc: FindContentMessage): seq[byte] =
|
||||
NodeId(readUintBE[256](contentId.data)), seenOnly = true)
|
||||
payload = ByteList(@[]) # Empty payload when enrs are send
|
||||
enrs =
|
||||
closestNodes.map(proc(x: Node): ByteList = ByteList(x.record.raw))
|
||||
|
||||
closestNodes.map(proc(x: Node): ByteList = ByteList(x.record.raw))
|
||||
encodeMessage(FoundContentMessage(
|
||||
enrs: List[ByteList, 32](List(enrs)), payload: payload))
|
||||
|
||||
@ -295,6 +303,112 @@ proc lookup*(p: PortalProtocol, target: NodeId): Future[seq[Node]] {.async.} =
|
||||
p.lastLookup = now(chronos.Moment)
|
||||
return closestNodes
|
||||
|
||||
proc handleFoundContentMessage(p: PortalProtocol, m: FoundContentMessage, dst: Node, nodes: var seq[Node]): LookupResult =
|
||||
if (m.enrs.len() != 0 and m.payload.len() == 0):
|
||||
let records = recordsFromBytes(m.enrs)
|
||||
# TODO cannot use verifyNodesRecords(records, destNode, @[0'u16]) as it
|
||||
# also verify logdistances distances, but with content query those are not
|
||||
# used.
|
||||
# Implement version of verifyNodesRecords wchich do not validate distances.
|
||||
for r in records:
|
||||
let node = newNode(r)
|
||||
if node.isOk():
|
||||
let n = node.get()
|
||||
nodes.add(n)
|
||||
# Attempt to add all nodes discovered to routing table
|
||||
discard p.routingTable.addNode(n)
|
||||
|
||||
return LookupResult(kind: Nodes, nodes: nodes)
|
||||
elif (m.payload.len() != 0 and m.enrs.len() == 0):
|
||||
return LookupResult(kind: Content, content: m.payload)
|
||||
elif ((m.payload.len() != 0 and m.enrs.len() != 0)):
|
||||
# Both payload and enrs are filled, which means protocol breach. For now
|
||||
# just logging offending node to quickly identify it
|
||||
warn "Invalid foundcontent response form node ", uri = toURI(dst.record)
|
||||
return LookupResult(kind: Nodes, nodes: nodes)
|
||||
else:
|
||||
return LookupResult(kind: Nodes, nodes: nodes)
|
||||
|
||||
proc contentLookupWorker(p: PortalProtocol, destNode: Node, target: ContentKey):
|
||||
Future[LookupResult] {.async.} =
|
||||
var nodes: seq[Node]
|
||||
|
||||
let contentMessageResponse = await p.findContent(destNode, target)
|
||||
|
||||
if contentMessageResponse.isOk():
|
||||
return handleFoundContentMessage(p, contentMessageResponse.get(), destNode, nodes)
|
||||
else:
|
||||
return LookupResult(kind: Nodes, nodes: nodes)
|
||||
|
||||
# TODO ContentLookup and Lookup look almost exactly the same, also lookups in other
|
||||
# networks will probably be very similar. Extract lookup function to separate module
|
||||
# and make it more generaic
|
||||
proc contentLookup*(p: PortalProtocol, target: ContentKey): Future[Option[ByteList]] {.async.} =
|
||||
let targetId = contentIdAsUint256(toContentId(target))
|
||||
## Perform a lookup for the given target, return the closest n nodes to the
|
||||
## target. Maximum value for n is `BUCKET_SIZE`.
|
||||
# `closestNodes` holds the k closest nodes to target found, sorted by distance
|
||||
# Unvalidated nodes are used for requests as a form of validation.
|
||||
var closestNodes = p.routingTable.neighbours(targetId, BUCKET_SIZE,
|
||||
seenOnly = false)
|
||||
|
||||
var asked, seen = initHashSet[NodeId]()
|
||||
asked.incl(p.baseProtocol.localNode.id) # No need to ask our own node
|
||||
seen.incl(p.baseProtocol.localNode.id) # No need to discover our own node
|
||||
for node in closestNodes:
|
||||
seen.incl(node.id)
|
||||
|
||||
var pendingQueries = newSeqOfCap[Future[LookupResult]](Alpha)
|
||||
|
||||
while true:
|
||||
var i = 0
|
||||
# Doing `alpha` amount of requests at once as long as closer non queried
|
||||
# nodes are discovered.
|
||||
while i < closestNodes.len and pendingQueries.len < Alpha:
|
||||
let n = closestNodes[i]
|
||||
if not asked.containsOrIncl(n.id):
|
||||
pendingQueries.add(p.contentLookupWorker(n, target))
|
||||
inc i
|
||||
|
||||
trace "Pending lookup queries", total = pendingQueries.len
|
||||
|
||||
if pendingQueries.len == 0:
|
||||
break
|
||||
|
||||
let query = await one(pendingQueries)
|
||||
trace "Got lookup query response"
|
||||
|
||||
let index = pendingQueries.find(query)
|
||||
if index != -1:
|
||||
pendingQueries.del(index)
|
||||
else:
|
||||
error "Resulting query should have been in the pending queries"
|
||||
|
||||
let lookupResult = query.read
|
||||
|
||||
# TODO: Remove node on timed-out query? To handle failure better, LookUpResult
|
||||
# should have third enum option like failure.
|
||||
case lookupResult.kind
|
||||
of Nodes:
|
||||
for n in lookupResult.nodes:
|
||||
if not seen.containsOrIncl(n.id):
|
||||
# If it wasn't seen before, insert node while remaining sorted
|
||||
closestNodes.insert(n, closestNodes.lowerBound(n,
|
||||
proc(x: Node, n: Node): int =
|
||||
cmp(distanceTo(x, targetId), distanceTo(n, targetId))
|
||||
))
|
||||
|
||||
if closestNodes.len > BUCKET_SIZE:
|
||||
closestNodes.del(closestNodes.high())
|
||||
of Content:
|
||||
# cancel any pending queries as we have find the content
|
||||
for f in pendingQueries:
|
||||
f.cancel()
|
||||
|
||||
return some(lookupResult.content)
|
||||
|
||||
return none[ByteList]()
|
||||
|
||||
proc query*(p: PortalProtocol, target: NodeId, k = BUCKET_SIZE): Future[seq[Node]]
|
||||
{.async.} =
|
||||
## Query k nodes for the given target, returns all nodes found, including the
|
||||
|
@ -9,7 +9,7 @@ import
|
||||
std/os,
|
||||
testutils/unittests,
|
||||
eth/[keys, trie/db, trie/hexary, ssz/ssz_serialization],
|
||||
eth/p2p/discoveryv5/protocol as discv5_protocol,
|
||||
eth/p2p/discoveryv5/protocol as discv5_protocol, eth/p2p/discoveryv5/routing_table,
|
||||
../../nimbus/[genesis, chain_config, db/db_chain],
|
||||
../network/state/portal_protocol, ../network/state/content,
|
||||
./test_helpers
|
||||
@ -74,5 +74,58 @@ procSuite "Content Network":
|
||||
|
||||
let hash = hexary.keccak(foundContent.get().payload.asSeq())
|
||||
check hash.data == key
|
||||
|
||||
await node1.closeWait()
|
||||
await node2.closeWait()
|
||||
|
||||
asyncTest "Find content in the network via content lookup":
|
||||
let
|
||||
node1 = initDiscoveryNode(
|
||||
rng, PrivateKey.random(rng[]), localAddress(20302))
|
||||
node2 = initDiscoveryNode(
|
||||
rng, PrivateKey.random(rng[]), localAddress(20303))
|
||||
node3 = initDiscoveryNode(
|
||||
rng, PrivateKey.random(rng[]), localAddress(20304))
|
||||
|
||||
|
||||
proto1 = PortalProtocol.new(node1)
|
||||
proto2 = PortalProtocol.new(node2)
|
||||
proto3 = PortalProtocol.new(node3)
|
||||
|
||||
let trie =
|
||||
genesisToTrie("fluffy" / "tests" / "custom_genesis" / "chainid7.json")
|
||||
|
||||
proto3.contentStorage = ContentStorage(trie: trie)
|
||||
|
||||
# Node1 knows about Node2, and Node2 knows about Node3 which hold all content
|
||||
check proto1.addNode(proto2.baseProtocol.localNode) == Added
|
||||
check proto2.addNode(proto3.baseProtocol.localNode) == Added
|
||||
|
||||
check (await proto2.ping(proto3.baseProtocol.localNode)).isOk()
|
||||
|
||||
var keys: seq[seq[byte]]
|
||||
for k, v in trie.replicate:
|
||||
keys.add(k)
|
||||
|
||||
# Get first key
|
||||
var nodeHash: NodeHash
|
||||
let firstKey = keys[0]
|
||||
copyMem(nodeHash.data.addr, unsafeAddr firstKey[0], sizeof(nodeHash.data))
|
||||
|
||||
let contentKey = ContentKey(
|
||||
networkId: 0'u16,
|
||||
contentType: content.ContentType.Account,
|
||||
nodeHash: nodeHash)
|
||||
|
||||
let foundContent = await proto1.contentLookup(contentKey)
|
||||
|
||||
check:
|
||||
foundContent.isSome()
|
||||
|
||||
let hash = hexary.keccak(foundContent.get().asSeq())
|
||||
|
||||
check hash.data == firstKey
|
||||
|
||||
await node1.closeWait()
|
||||
await node2.closeWait()
|
||||
await node3.closeWait()
|
||||
|
Loading…
x
Reference in New Issue
Block a user