diff --git a/fluffy/network/state/content.nim b/fluffy/network/state/content.nim index 79d3aaf01..29c772984 100644 --- a/fluffy/network/state/content.nim +++ b/fluffy/network/state/content.nim @@ -11,7 +11,7 @@ import std/[options, sugar], - nimcrypto/[sha2, hash], stew/objects, + nimcrypto/[sha2, hash], stew/objects, stint, eth/ssz/ssz_serialization, eth/trie/[hexary, db] export ssz_serialization @@ -81,6 +81,12 @@ func toContentId*(contentKey: ByteList): ContentId = # https://github.com/ethereum/stateless-ethereum-specs/blob/master/state-network.md#content sha2.sha_256.digest(contentKey.asSeq()) +func toContentId*(contentKey: ContentKey): ContentId = + toContentId(encodeKeyAsList(contentKey)) + +func contentIdAsUint256*(id: ContentId): Uint256 = + readUintBE[256](id.data) + type ContentStorage* = object # TODO: Quick implementation for now where we just use HexaryTrie, current diff --git a/fluffy/network/state/portal_protocol.nim b/fluffy/network/state/portal_protocol.nim index d49bdf427..1df1c4dd6 100644 --- a/fluffy/network/state/portal_protocol.nim +++ b/fluffy/network/state/portal_protocol.nim @@ -41,6 +41,15 @@ type PortalResult*[T] = Result[T, cstring] + LookupResultKind = enum + Nodes, Content + + LookupResult = object + case kind: LookupResultKind + of Nodes: + nodes: seq[Node] + of Content: + content: ByteList proc addNode*(p: PortalProtocol, node: Node): NodeStatus = p.routingTable.addNode(node) @@ -102,8 +111,7 @@ proc handleFindContent(p: PortalProtocol, fc: FindContentMessage): seq[byte] = NodeId(readUintBE[256](contentId.data)), seenOnly = true) payload = ByteList(@[]) # Empty payload when enrs are send enrs = - closestNodes.map(proc(x: Node): ByteList = ByteList(x.record.raw)) - + closestNodes.map(proc(x: Node): ByteList = ByteList(x.record.raw)) encodeMessage(FoundContentMessage( enrs: List[ByteList, 32](List(enrs)), payload: payload)) @@ -295,6 +303,112 @@ proc lookup*(p: PortalProtocol, target: NodeId): Future[seq[Node]] {.async.} = p.lastLookup = now(chronos.Moment) return closestNodes +proc handleFoundContentMessage(p: PortalProtocol, m: FoundContentMessage, dst: Node, nodes: var seq[Node]): LookupResult = + if (m.enrs.len() != 0 and m.payload.len() == 0): + let records = recordsFromBytes(m.enrs) + # TODO cannot use verifyNodesRecords(records, destNode, @[0'u16]) as it + # also verify logdistances distances, but with content query those are not + # used. + # Implement version of verifyNodesRecords wchich do not validate distances. + for r in records: + let node = newNode(r) + if node.isOk(): + let n = node.get() + nodes.add(n) + # Attempt to add all nodes discovered to routing table + discard p.routingTable.addNode(n) + + return LookupResult(kind: Nodes, nodes: nodes) + elif (m.payload.len() != 0 and m.enrs.len() == 0): + return LookupResult(kind: Content, content: m.payload) + elif ((m.payload.len() != 0 and m.enrs.len() != 0)): + # Both payload and enrs are filled, which means protocol breach. For now + # just logging offending node to quickly identify it + warn "Invalid foundcontent response form node ", uri = toURI(dst.record) + return LookupResult(kind: Nodes, nodes: nodes) + else: + return LookupResult(kind: Nodes, nodes: nodes) + +proc contentLookupWorker(p: PortalProtocol, destNode: Node, target: ContentKey): + Future[LookupResult] {.async.} = + var nodes: seq[Node] + + let contentMessageResponse = await p.findContent(destNode, target) + + if contentMessageResponse.isOk(): + return handleFoundContentMessage(p, contentMessageResponse.get(), destNode, nodes) + else: + return LookupResult(kind: Nodes, nodes: nodes) + +# TODO ContentLookup and Lookup look almost exactly the same, also lookups in other +# networks will probably be very similar. Extract lookup function to separate module +# and make it more generaic +proc contentLookup*(p: PortalProtocol, target: ContentKey): Future[Option[ByteList]] {.async.} = + let targetId = contentIdAsUint256(toContentId(target)) + ## Perform a lookup for the given target, return the closest n nodes to the + ## target. Maximum value for n is `BUCKET_SIZE`. + # `closestNodes` holds the k closest nodes to target found, sorted by distance + # Unvalidated nodes are used for requests as a form of validation. + var closestNodes = p.routingTable.neighbours(targetId, BUCKET_SIZE, + seenOnly = false) + + var asked, seen = initHashSet[NodeId]() + asked.incl(p.baseProtocol.localNode.id) # No need to ask our own node + seen.incl(p.baseProtocol.localNode.id) # No need to discover our own node + for node in closestNodes: + seen.incl(node.id) + + var pendingQueries = newSeqOfCap[Future[LookupResult]](Alpha) + + while true: + var i = 0 + # Doing `alpha` amount of requests at once as long as closer non queried + # nodes are discovered. + while i < closestNodes.len and pendingQueries.len < Alpha: + let n = closestNodes[i] + if not asked.containsOrIncl(n.id): + pendingQueries.add(p.contentLookupWorker(n, target)) + inc i + + trace "Pending lookup queries", total = pendingQueries.len + + if pendingQueries.len == 0: + break + + let query = await one(pendingQueries) + trace "Got lookup query response" + + let index = pendingQueries.find(query) + if index != -1: + pendingQueries.del(index) + else: + error "Resulting query should have been in the pending queries" + + let lookupResult = query.read + + # TODO: Remove node on timed-out query? To handle failure better, LookUpResult + # should have third enum option like failure. + case lookupResult.kind + of Nodes: + for n in lookupResult.nodes: + if not seen.containsOrIncl(n.id): + # If it wasn't seen before, insert node while remaining sorted + closestNodes.insert(n, closestNodes.lowerBound(n, + proc(x: Node, n: Node): int = + cmp(distanceTo(x, targetId), distanceTo(n, targetId)) + )) + + if closestNodes.len > BUCKET_SIZE: + closestNodes.del(closestNodes.high()) + of Content: + # cancel any pending queries as we have find the content + for f in pendingQueries: + f.cancel() + + return some(lookupResult.content) + + return none[ByteList]() + proc query*(p: PortalProtocol, target: NodeId, k = BUCKET_SIZE): Future[seq[Node]] {.async.} = ## Query k nodes for the given target, returns all nodes found, including the diff --git a/fluffy/tests/test_content_network.nim b/fluffy/tests/test_content_network.nim index 11ed0ca19..c229995bf 100644 --- a/fluffy/tests/test_content_network.nim +++ b/fluffy/tests/test_content_network.nim @@ -9,7 +9,7 @@ import std/os, testutils/unittests, eth/[keys, trie/db, trie/hexary, ssz/ssz_serialization], - eth/p2p/discoveryv5/protocol as discv5_protocol, + eth/p2p/discoveryv5/protocol as discv5_protocol, eth/p2p/discoveryv5/routing_table, ../../nimbus/[genesis, chain_config, db/db_chain], ../network/state/portal_protocol, ../network/state/content, ./test_helpers @@ -74,5 +74,58 @@ procSuite "Content Network": let hash = hexary.keccak(foundContent.get().payload.asSeq()) check hash.data == key + await node1.closeWait() await node2.closeWait() + + asyncTest "Find content in the network via content lookup": + let + node1 = initDiscoveryNode( + rng, PrivateKey.random(rng[]), localAddress(20302)) + node2 = initDiscoveryNode( + rng, PrivateKey.random(rng[]), localAddress(20303)) + node3 = initDiscoveryNode( + rng, PrivateKey.random(rng[]), localAddress(20304)) + + + proto1 = PortalProtocol.new(node1) + proto2 = PortalProtocol.new(node2) + proto3 = PortalProtocol.new(node3) + + let trie = + genesisToTrie("fluffy" / "tests" / "custom_genesis" / "chainid7.json") + + proto3.contentStorage = ContentStorage(trie: trie) + + # Node1 knows about Node2, and Node2 knows about Node3 which hold all content + check proto1.addNode(proto2.baseProtocol.localNode) == Added + check proto2.addNode(proto3.baseProtocol.localNode) == Added + + check (await proto2.ping(proto3.baseProtocol.localNode)).isOk() + + var keys: seq[seq[byte]] + for k, v in trie.replicate: + keys.add(k) + + # Get first key + var nodeHash: NodeHash + let firstKey = keys[0] + copyMem(nodeHash.data.addr, unsafeAddr firstKey[0], sizeof(nodeHash.data)) + + let contentKey = ContentKey( + networkId: 0'u16, + contentType: content.ContentType.Account, + nodeHash: nodeHash) + + let foundContent = await proto1.contentLookup(contentKey) + + check: + foundContent.isSome() + + let hash = hexary.keccak(foundContent.get().asSeq()) + + check hash.data == firstKey + + await node1.closeWait() + await node2.closeWait() + await node3.closeWait()