From 0fba19b81a0800388d1841a2218d84f6e7556ca7 Mon Sep 17 00:00:00 2001
From: Kim De Mey
Date: Tue, 12 Apr 2022 15:49:19 +0200
Subject: [PATCH] Improvements to the propagation and seeding of data (#1058)

* Improvements to the propagation and seeding of data

- Use a lookup for nodes selection in neighborhoodGossip
- Rework populate db code and add `propagateBlockHistoryDb` call and
  portal_history_propagateBlock json-rpc call
- Small adjustment to blockwalk

* Avoid storing out-of-range data in the propagate db calls
---
 fluffy/network/wire/portal_protocol.nim       |  97 ++++-----
 fluffy/populate_db.nim                        | 191 +++++++++++-------
 .../rpc/rpc_calls/rpc_portal_debug_calls.nim  |   1 +
 fluffy/rpc/rpc_portal_debug_api.nim           |   8 +
 fluffy/scripts/test_portal_testnet.nim        |   2 +-
 fluffy/tools/blockwalk.nim                    |   9 +-
 6 files changed, 181 insertions(+), 127 deletions(-)

diff --git a/fluffy/network/wire/portal_protocol.nim b/fluffy/network/wire/portal_protocol.nim
index 16cc66e92..c3c44f861 100644
--- a/fluffy/network/wire/portal_protocol.nim
+++ b/fluffy/network/wire/portal_protocol.nim
@@ -743,51 +743,6 @@ proc offerWorker(p: PortalProtocol) {.async.} =
 proc offerQueueEmpty*(p: PortalProtocol): bool =
   p.offerQueue.empty()
 
-proc neighborhoodGossip*(p: PortalProtocol, contentKeys: ContentKeysList) {.async.} =
-  let contentKey = contentKeys[0] # for now only 1 item is considered
-  let contentIdOpt = p.toContentId(contentKey)
-  if contentIdOpt.isNone():
-    return
-
-  let contentId = contentIdOpt.get()
-  # gossip content to closest neighbours to target:
-  # Selected closest 6 now. Better is perhaps to select 16 closest and then
-  # select 6 random out of those.
-  # TODO: Might actually have to do here closest to the local node, else data
-  # will not propagate well over to nodes with "large" Radius?
-  let closestNodes = p.routingTable.neighbours(
-    NodeId(contentId), k = 6, seenOnly = false)
-
-  for node in closestNodes:
-    let req = OfferRequest(dst: node, kind: Database, contentKeys: contentKeys)
-    await p.offerQueue.addLast(req)
-
-proc processContent(
-    stream: PortalStream, contentKeys: ContentKeysList, content: seq[byte])
-    {.gcsafe, raises: [Defect].} =
-  let p = getUserData[PortalProtocol](stream)
-
-  # TODO:
-  # - Implement a way to discern different content items (e.g. length prefixed)
-  # - Check amount of content items according to ContentKeysList
-  # - The above could also live in `PortalStream`
-
-  # TODO: for now we only consider 1 item being offered
-  if contentKeys.len() == 1:
-    let contentKey = contentKeys[0]
-    if p.validateContent(content, contentKey):
-      let contentIdOpt = p.toContentId(contentKey)
-      if contentIdOpt.isNone():
-        return
-
-      let contentId = contentIdOpt.get()
-      # Store content, should we recheck radius?
-      p.contentDB.put(contentId, content)
-
-      asyncSpawn neighborhoodGossip(p, contentKeys)
-    else:
-      error "Received invalid content", contentKey
-
 proc lookupWorker(
     p: PortalProtocol, dst: Node, target: NodeId): Future[seq[Node]] {.async.} =
   let distances = lookupDistances(target, dst.id)
@@ -1025,6 +980,58 @@ proc queryRandom*(p: PortalProtocol): Future[seq[Node]] =
   ## Perform a query for a random target, return all nodes discovered.
   p.query(NodeId.random(p.baseProtocol.rng[]))
 
+proc neighborhoodGossip*(
+    p: PortalProtocol, contentKeys: ContentKeysList, content: seq[byte])
+    {.async.} =
+  let
+    # for now only 1 item is considered
+    contentInfo = ContentInfo(contentKey: contentKeys[0], content: content)
+    contentList = List[ContentInfo, contentKeysLimit].init(@[contentInfo])
+    contentIdOpt = p.toContentId(contentInfo.contentKey)
+
+  if contentIdOpt.isNone():
+    return
+
+  let contentId = contentIdOpt.get()
+
+  # Doing a lookup over the network to get the nodes closest to the content,
+  # instead of only looking at our own routing table. This should give a
+  # higher success rate in case the content is not yet well known, and avoids
+  # the data getting stuck in its propagation. However, it might cause the
+  # data to be propagated over a wider id range.
+  let closestNodes = await p.lookup(NodeId(contentId))
+
+  for node in closestNodes[0..7]: # selecting closest 8 nodes
+    # Note: opportunistically not checking if the radius of the node is known
+    # and thus if the node is in radius with the content.
+    let req = OfferRequest(dst: node, kind: Direct, contentList: contentList)
+    await p.offerQueue.addLast(req)
+
+proc processContent(
+    stream: PortalStream, contentKeys: ContentKeysList, content: seq[byte])
+    {.gcsafe, raises: [Defect].} =
+  let p = getUserData[PortalProtocol](stream)
+
+  # TODO:
+  # - Implement a way to discern different content items (e.g. length prefixed)
+  # - Check amount of content items according to ContentKeysList
+  # - The above could also live in `PortalStream`
+  # For now we only consider 1 item being offered
+  if contentKeys.len() == 1:
+    let contentKey = contentKeys[0]
+    if p.validateContent(content, contentKey):
+      let contentIdOpt = p.toContentId(contentKey)
+      if contentIdOpt.isNone():
+        return
+
+      let contentId = contentIdOpt.get()
+      # Store content, should we recheck radius?
+      p.contentDB.put(contentId, content)
+
+      asyncSpawn neighborhoodGossip(p, contentKeys, content)
+    else:
+      error "Received invalid content", contentKey
+
 proc seedTable*(p: PortalProtocol) =
   ## Seed the table with specifically provided Portal bootstrap nodes. These are
   ## nodes that must support the wire protocol for the specific content network.
diff --git a/fluffy/populate_db.nim b/fluffy/populate_db.nim
index 2cdef716e..58a0944a0 100644
--- a/fluffy/populate_db.nim
+++ b/fluffy/populate_db.nim
@@ -30,7 +30,7 @@ type
 
   BlockDataTable* = Table[string, BlockData]
 
-proc readBlockData*(dataFile: string): Result[BlockDataTable, string] =
+proc readBlockDataTable*(dataFile: string): Result[BlockDataTable, string] =
   let blockData = readAllFile(dataFile)
   if blockData.isErr(): # TODO: map errors
     return err("Failed reading data-file")
@@ -54,84 +54,92 @@ iterator blockHashes*(blockData: BlockDataTable): BlockHash =
 
     yield blockHash
 
+proc readBlockData(
+    hash: string, blockData: BlockData, verify = false):
+    Result[seq[(ContentKey, seq[byte])], string] =
+  var res: seq[(ContentKey, seq[byte])]
+
+  var rlp =
+    try:
+      rlpFromHex(blockData.rlp)
+    except ValueError as e:
+      return err("Invalid hex for rlp block data, number " &
+        $blockData.number & ": " & e.msg)
+
+  # The data is currently formatted as an rlp encoded `EthBlock`, thus
+  # containing header, txs and uncles: [header, txs, uncles]. No receipts are
+  # available.
+  # TODO: Change the format to rlp data as it gets stored and sent over the
+  # network. I.e. [header, [txs, uncles], receipts]
+  if rlp.enterList():
+    var blockHash: BlockHash
+    try:
+      blockHash.data = hexToByteArray[sizeof(BlockHash)](hash)
+    except ValueError as e:
+      return err("Invalid hex for blockhash, number " &
+        $blockData.number & ": " & e.msg)
+
+    let contentKeyType =
+      ContentKeyType(chainId: 1'u16, blockHash: blockHash)
+
+    try:
+      # If wanted, the hash for the corresponding header can be verified
+      if verify:
+        if keccak256.digest(rlp.rawData()) != blockHash:
+          return err("Data is not matching hash, number " & $blockData.number)
+
+      block:
+        let contentKey = ContentKey(
+          contentType: blockHeader,
+          blockHeaderKey: contentKeyType)
+
+        res.add((contentKey, @(rlp.rawData())))
+        rlp.skipElem()
+
+      block:
+        let contentKey = ContentKey(
+          contentType: blockBody,
+          blockBodyKey: contentKeyType)
+
+        # Note: Temporary until the data format gets changed.
+        let blockBody = BlockBody(
+          transactions: rlp.read(seq[Transaction]),
+          uncles: rlp.read(seq[BlockHeader]))
+        let rlpdata = encode(blockBody)
+
+        res.add((contentKey, rlpdata))
+        # res.add((contentKey, @(rlp.rawData())))
+        # rlp.skipElem()
+
+      # Note: No receipts yet in the data set
+      # block:
+      #   let contentKey = ContentKey(
+      #     contentType: receipts,
+      #     receiptsKey: contentKeyType)
+
+      #   res.add((contentKey, @(rlp.rawData())))
+      #   rlp.skipElem()
+
+    except RlpError as e:
+      return err("Invalid rlp data, number " & $blockData.number & ": " & e.msg)
+
+    ok(res)
+  else:
+    err("Item is not a valid rlp list, number " & $blockData.number)
+
 iterator blocks*(
     blockData: BlockDataTable, verify = false): seq[(ContentKey, seq[byte])] =
   for k,v in blockData:
-    var res: seq[(ContentKey, seq[byte])]
+    let res = readBlockData(k, v, verify)
 
-    var rlp =
-      try:
-        rlpFromHex(v.rlp)
-      except ValueError as e:
-        error "Invalid hex for rlp data", error = e.msg, number = v.number
-        continue
-
-    # The data is currently formatted as an rlp encoded `EthBlock`, thus
-    # containing header, txs and uncles: [header, txs, uncles]. No receipts are
-    # available.
-    # TODO: Change to format to rlp data as it gets stored and send over the
-    # network over the network. I.e. [header, [txs, uncles], receipts]
-    if rlp.enterList():
-      var blockHash: BlockHash
-      try:
-        blockHash.data = hexToByteArray[sizeof(BlockHash)](k)
-      except ValueError as e:
-        error "Invalid hex for block hash", error = e.msg, number = v.number
-        continue
-
-      let contentKeyType =
-        ContentKeyType(chainId: 1'u16, blockHash: blockHash)
-
-      try:
-        # If wanted the hash for the corresponding header can be verified
-        if verify:
-          if keccak256.digest(rlp.rawData()) != blockHash:
-            error "Data is not matching hash, skipping", number = v.number
-            continue
-
-        block:
-          let contentKey = ContentKey(
-            contentType: blockHeader,
-            blockHeaderKey: contentKeyType)
-
-          res.add((contentKey, @(rlp.rawData())))
-          rlp.skipElem()
-
-        block:
-          let contentKey = ContentKey(
-            contentType: blockBody,
-            blockBodyKey: contentKeyType)
-
-          # Note: Temporary until the data format gets changed.
-          let blockBody = BlockBody(
-            transactions: rlp.read(seq[Transaction]),
-            uncles: rlp.read(seq[BlockHeader]))
-          let rlpdata = encode(blockBody)
-
-          res.add((contentKey, rlpdata))
-          # res.add((contentKey, @(rlp.rawData())))
-          # rlp.skipElem()
-
-        # Note: No receipts yet in the data set
-        # block:
-        #   let contentKey = ContentKey(
-        #     contentType: receipts,
-        #     receiptsKey: contentKeyType)
-
-        #   res.add((contentKey, @(rlp.rawData())))
-        #   rlp.skipElem()
-
-      except RlpError as e:
-        error "Invalid rlp data", number = v.number, error = e.msg
-        continue
-
-      yield res
+    if res.isOk():
+      yield res.get()
     else:
-      error "Item is not a valid rlp list", number = v.number
+      error "Failed reading block from block data", error = res.error
 
 proc populateHistoryDb*(
     db: ContentDB, dataFile: string, verify = false): Result[void, string] =
-  let blockData = ? readBlockData(dataFile)
+  let blockData = ? readBlockDataTable(dataFile)
 
   for b in blocks(blockData, verify):
     for value in b:
@@ -143,22 +151,51 @@ proc populateHistoryDb*(
 proc propagateHistoryDb*(
     p: PortalProtocol, dataFile: string, verify = false):
     Future[Result[void, string]] {.async.} =
-  let blockData = readBlockData(dataFile)
+  let blockData = readBlockDataTable(dataFile)
 
   if blockData.isOk():
     for b in blocks(blockData.get(), verify):
       for value in b:
         # Note: This is the slowest part due to the hashing that takes place.
-        p.contentDB.put(history_content.toContentId(value[0]), value[1])
+        let contentId = history_content.toContentId(value[0])
+        if p.inRange(contentId):
+          p.contentDB.put(contentId, value[1])
 
-      # TODO: This call will get the content we just stored in the db, so it
-      # might be an improvement to directly pass it.
-      await p.neighborhoodGossip(ContentKeysList(@[encode(value[0])]))
+        await p.neighborhoodGossip(
+          ContentKeysList(@[encode(value[0])]), value[1])
     # Need to be sure that all offers where started. TODO: this is not great.
     while not p.offerQueueEmpty():
-      error "WAITING FOR OFFER QUEUE EMPTY"
       await sleepAsync(500.milliseconds)
 
     return ok()
   else:
     return err(blockData.error)
+
+proc propagateBlockHistoryDb*(
+    p: PortalProtocol, dataFile: string, blockHash: string, verify = false):
+    Future[Result[void, string]] {.async.} =
+  let blockDataTable = readBlockDataTable(dataFile)
+
+  if blockDataTable.isOk():
+    let b =
+      try:
+        blockDataTable.get()[blockHash]
+      except KeyError:
+        return err("Block hash not found in block data file")
+
+    let blockDataRes = readBlockData(blockHash, b)
+    if blockDataRes.isErr:
+      return err(blockDataRes.error)
+
+    let blockData = blockDataRes.get()
+
+    for value in blockData:
+      let contentId = history_content.toContentId(value[0])
+      if p.inRange(contentId):
+        p.contentDB.put(contentId, value[1])
+
+      await p.neighborhoodGossip(ContentKeysList(@[encode(value[0])]), value[1])
+
+    return ok()
+  else:
+    return err(blockDataTable.error)
diff --git a/fluffy/rpc/rpc_calls/rpc_portal_debug_calls.nim b/fluffy/rpc/rpc_calls/rpc_portal_debug_calls.nim
index 263eda6c6..bf1b19408 100644
--- a/fluffy/rpc/rpc_calls/rpc_portal_debug_calls.nim
+++ b/fluffy/rpc/rpc_calls/rpc_portal_debug_calls.nim
@@ -1,3 +1,4 @@
 ## Portal History Network json-rpc debug & testing calls
 proc portal_history_store(contentId: string, content: string): bool
 proc portal_history_propagate(dataFile: string): bool
+proc portal_history_propagateBlock(dataFile: string, blockHash: string): bool
diff --git a/fluffy/rpc/rpc_portal_debug_api.nim b/fluffy/rpc/rpc_portal_debug_api.nim
index ff71a7598..03d8fa0c7 100644
--- a/fluffy/rpc/rpc_portal_debug_api.nim
+++ b/fluffy/rpc/rpc_portal_debug_api.nim
@@ -34,3 +34,11 @@ proc installPortalDebugApiHandlers*(
       return true
     else:
       raise newException(ValueError, $res.error)
+
+  rpcServer.rpc("portal_" & network & "_propagateBlock") do(
+      dataFile: string, blockHash: string) -> bool:
+    let res = await p.propagateBlockHistoryDb(dataFile, blockHash)
+    if res.isOk():
+      return true
+    else:
+      raise newException(ValueError, $res.error)
diff --git a/fluffy/scripts/test_portal_testnet.nim b/fluffy/scripts/test_portal_testnet.nim
index 1942309e9..0319be528 100644
--- a/fluffy/scripts/test_portal_testnet.nim
+++ b/fluffy/scripts/test_portal_testnet.nim
@@ -188,7 +188,7 @@ procSuite "Portal testnet tests":
     check (await clients[0].portal_history_propagate(dataFile))
     await clients[0].close()
 
-    let blockData = readBlockData(dataFile)
+    let blockData = readBlockDataTable(dataFile)
     check blockData.isOk()
 
     for client in clients:
diff --git a/fluffy/tools/blockwalk.nim b/fluffy/tools/blockwalk.nim
index ad7b9a7d7..5561cd068 100644
--- a/fluffy/tools/blockwalk.nim
+++ b/fluffy/tools/blockwalk.nim
@@ -11,6 +11,7 @@
 {.push raises: [Defect].}
 
 import
+  std/strutils,
   confutils, chronicles, chronicles/topics_registry, stew/byteutils,
   eth/common/eth_types,
   ../../nimbus/rpc/[hexstrings, rpc_types], ../../nimbus/errors,
@@ -60,12 +61,12 @@ proc walkBlocks(client: RpcClient, startHash: Hash256) {.async.} =
     let parentBlockOpt =
       try:
         await client.eth_getBlockByHash(parentHash.ethHashStr(), false)
-      except ValidationError as e:
+      except RpcPostError as e:
         # RpcPostError when for example timing out on the request. Could retry
         # in this case.
fatal "Error occured on JSON-RPC request", error = e.msg quit 1 - except RpcPostError as e: + except ValidationError as e: # ValidationError from buildBlockObject, should not occur with proper # blocks fatal "Error occured on JSON-RPC request", error = e.msg @@ -89,8 +90,8 @@ proc walkBlocks(client: RpcClient, startHash: Hash256) {.async.} = blockNumber = parentBlock.number.get().string parentHash = parentBlock.parentHash - echo "Block number: " & blockNumber - echo "Block hash: " & "0x" & parentBlock.hash.get().data.toHex() + echo "Block " & $blockNumber.parseHexInt() & ": " & + parentBlock.hash.get().data.toHex() proc run(config: BlockWalkConf) {.async.} = let client = newRpcHttpClient()