Improvements to the propagation and seeding of data (#1058)

* Improvements to the propagation and seeding of data

- Use a lookup for nodes selection in neighborhoodGossip
- Rework populate db code and add `propagateBlockHistoryDb` call
and portal_history__propagateBlock json-rpc call
- Small adjustment to blockwalk

* Avoid storing out-of-range data in the propagate db calls
This commit is contained in:
Kim De Mey 2022-04-12 15:49:19 +02:00 committed by GitHub
parent 0195d5830b
commit 0fba19b81a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 181 additions and 127 deletions

View File

@ -743,51 +743,6 @@ proc offerWorker(p: PortalProtocol) {.async.} =
proc offerQueueEmpty*(p: PortalProtocol): bool =
p.offerQueue.empty()
proc neighborhoodGossip*(p: PortalProtocol, contentKeys: ContentKeysList) {.async.} =
let contentKey = contentKeys[0] # for now only 1 item is considered
let contentIdOpt = p.toContentId(contentKey)
if contentIdOpt.isNone():
return
let contentId = contentIdOpt.get()
# gossip content to closest neighbours to target:
# Selected closest 6 now. Better is perhaps to select 16 closest and then
# select 6 random out of those.
# TODO: Might actually have to do here closest to the local node, else data
# will not propagate well over to nodes with "large" Radius?
let closestNodes = p.routingTable.neighbours(
NodeId(contentId), k = 6, seenOnly = false)
for node in closestNodes:
let req = OfferRequest(dst: node, kind: Database, contentKeys: contentKeys)
await p.offerQueue.addLast(req)
proc processContent(
stream: PortalStream, contentKeys: ContentKeysList, content: seq[byte])
{.gcsafe, raises: [Defect].} =
let p = getUserData[PortalProtocol](stream)
# TODO:
# - Implement a way to discern different content items (e.g. length prefixed)
# - Check amount of content items according to ContentKeysList
# - The above could also live in `PortalStream`
# TODO: for now we only consider 1 item being offered
if contentKeys.len() == 1:
let contentKey = contentKeys[0]
if p.validateContent(content, contentKey):
let contentIdOpt = p.toContentId(contentKey)
if contentIdOpt.isNone():
return
let contentId = contentIdOpt.get()
# Store content, should we recheck radius?
p.contentDB.put(contentId, content)
asyncSpawn neighborhoodGossip(p, contentKeys)
else:
error "Received invalid content", contentKey
proc lookupWorker(
p: PortalProtocol, dst: Node, target: NodeId): Future[seq[Node]] {.async.} =
let distances = lookupDistances(target, dst.id)
@ -1025,6 +980,58 @@ proc queryRandom*(p: PortalProtocol): Future[seq[Node]] =
## Perform a query for a random target, return all nodes discovered.
p.query(NodeId.random(p.baseProtocol.rng[]))
proc neighborhoodGossip*(
p: PortalProtocol, contentKeys: ContentKeysList, content: seq[byte])
{.async.} =
let
# for now only 1 item is considered
contentInfo = ContentInfo(contentKey: contentKeys[0], content: content)
contentList = List[ContentInfo, contentKeysLimit].init(@[contentInfo])
contentIdOpt = p.toContentId(contentInfo.contentKey)
if contentIdOpt.isNone():
return
let contentId = contentIdOpt.get()
# Doing an lookup over the network to get the very closest nodes to the
# content, instead of looking only at our own routing table. This should give
# a bigger rate of success in case the content is not known yet and avoid
# data being stopped in its propagation. However, perhaps this causes issues
# in data getting propagated in a wider id range.
let closestNodes = await p.lookup(NodeId(contentId))
for node in closestNodes[0..7]: # selecting closest 8 nodes
# Note: opportunistically not checking if the radius of the node is known
# and thus if the node is in radius with the content.
let req = OfferRequest(dst: node, kind: Direct, contentList: contentList)
await p.offerQueue.addLast(req)
proc processContent(
stream: PortalStream, contentKeys: ContentKeysList, content: seq[byte])
{.gcsafe, raises: [Defect].} =
let p = getUserData[PortalProtocol](stream)
# TODO:
# - Implement a way to discern different content items (e.g. length prefixed)
# - Check amount of content items according to ContentKeysList
# - The above could also live in `PortalStream`
# For now we only consider 1 item being offered
if contentKeys.len() == 1:
let contentKey = contentKeys[0]
if p.validateContent(content, contentKey):
let contentIdOpt = p.toContentId(contentKey)
if contentIdOpt.isNone():
return
let contentId = contentIdOpt.get()
# Store content, should we recheck radius?
p.contentDB.put(contentId, content)
asyncSpawn neighborhoodGossip(p, contentKeys, content)
else:
error "Received invalid content", contentKey
proc seedTable*(p: PortalProtocol) =
## Seed the table with specifically provided Portal bootstrap nodes. These are
## nodes that must support the wire protocol for the specific content network.

View File

@ -30,7 +30,7 @@ type
BlockDataTable* = Table[string, BlockData]
proc readBlockData*(dataFile: string): Result[BlockDataTable, string] =
proc readBlockDataTable*(dataFile: string): Result[BlockDataTable, string] =
let blockData = readAllFile(dataFile)
if blockData.isErr(): # TODO: map errors
return err("Failed reading data-file")
@ -54,84 +54,92 @@ iterator blockHashes*(blockData: BlockDataTable): BlockHash =
yield blockHash
proc readBlockData(
hash: string, blockData: BlockData, verify = false):
Result[seq[(ContentKey, seq[byte])], string] =
var res: seq[(ContentKey, seq[byte])]
var rlp =
try:
rlpFromHex(blockData.rlp)
except ValueError as e:
return err("Invalid hex for rlp block data, number " &
$blockData.number & ": " & e.msg)
# The data is currently formatted as an rlp encoded `EthBlock`, thus
# containing header, txs and uncles: [header, txs, uncles]. No receipts are
# available.
# TODO: Change to format to rlp data as it gets stored and send over the
# network over the network. I.e. [header, [txs, uncles], receipts]
if rlp.enterList():
var blockHash: BlockHash
try:
blockHash.data = hexToByteArray[sizeof(BlockHash)](hash)
except ValueError as e:
return err("Invalid hex for blockhash, number " &
$blockData.number & ": " & e.msg)
let contentKeyType =
ContentKeyType(chainId: 1'u16, blockHash: blockHash)
try:
# If wanted the hash for the corresponding header can be verified
if verify:
if keccak256.digest(rlp.rawData()) != blockHash:
return err("Data is not matching hash, number " & $blockData.number)
block:
let contentKey = ContentKey(
contentType: blockHeader,
blockHeaderKey: contentKeyType)
res.add((contentKey, @(rlp.rawData())))
rlp.skipElem()
block:
let contentKey = ContentKey(
contentType: blockBody,
blockBodyKey: contentKeyType)
# Note: Temporary until the data format gets changed.
let blockBody = BlockBody(
transactions: rlp.read(seq[Transaction]),
uncles: rlp.read(seq[BlockHeader]))
let rlpdata = encode(blockBody)
res.add((contentKey, rlpdata))
# res.add((contentKey, @(rlp.rawData())))
# rlp.skipElem()
# Note: No receipts yet in the data set
# block:
# let contentKey = ContentKey(
# contentType: receipts,
# receiptsKey: contentKeyType)
# res.add((contentKey, @(rlp.rawData())))
# rlp.skipElem()
except RlpError as e:
return err("Invalid rlp data, number " & $blockData.number & ": " & e.msg)
ok(res)
else:
err("Item is not a valid rlp list, number " & $blockData.number)
iterator blocks*(
blockData: BlockDataTable, verify = false): seq[(ContentKey, seq[byte])] =
for k,v in blockData:
var res: seq[(ContentKey, seq[byte])]
let res = readBlockData(k, v, verify)
var rlp =
try:
rlpFromHex(v.rlp)
except ValueError as e:
error "Invalid hex for rlp data", error = e.msg, number = v.number
continue
# The data is currently formatted as an rlp encoded `EthBlock`, thus
# containing header, txs and uncles: [header, txs, uncles]. No receipts are
# available.
# TODO: Change to format to rlp data as it gets stored and send over the
# network over the network. I.e. [header, [txs, uncles], receipts]
if rlp.enterList():
var blockHash: BlockHash
try:
blockHash.data = hexToByteArray[sizeof(BlockHash)](k)
except ValueError as e:
error "Invalid hex for block hash", error = e.msg, number = v.number
continue
let contentKeyType =
ContentKeyType(chainId: 1'u16, blockHash: blockHash)
try:
# If wanted the hash for the corresponding header can be verified
if verify:
if keccak256.digest(rlp.rawData()) != blockHash:
error "Data is not matching hash, skipping", number = v.number
continue
block:
let contentKey = ContentKey(
contentType: blockHeader,
blockHeaderKey: contentKeyType)
res.add((contentKey, @(rlp.rawData())))
rlp.skipElem()
block:
let contentKey = ContentKey(
contentType: blockBody,
blockBodyKey: contentKeyType)
# Note: Temporary until the data format gets changed.
let blockBody = BlockBody(
transactions: rlp.read(seq[Transaction]),
uncles: rlp.read(seq[BlockHeader]))
let rlpdata = encode(blockBody)
res.add((contentKey, rlpdata))
# res.add((contentKey, @(rlp.rawData())))
# rlp.skipElem()
# Note: No receipts yet in the data set
# block:
# let contentKey = ContentKey(
# contentType: receipts,
# receiptsKey: contentKeyType)
# res.add((contentKey, @(rlp.rawData())))
# rlp.skipElem()
except RlpError as e:
error "Invalid rlp data", number = v.number, error = e.msg
continue
yield res
if res.isOk():
yield res.get()
else:
error "Item is not a valid rlp list", number = v.number
error "Failed reading block from block data", error = res.error
proc populateHistoryDb*(
db: ContentDB, dataFile: string, verify = false): Result[void, string] =
let blockData = ? readBlockData(dataFile)
let blockData = ? readBlockDataTable(dataFile)
for b in blocks(blockData, verify):
for value in b:
@ -143,22 +151,51 @@ proc populateHistoryDb*(
proc propagateHistoryDb*(
p: PortalProtocol, dataFile: string, verify = false):
Future[Result[void, string]] {.async.} =
let blockData = readBlockData(dataFile)
let blockData = readBlockDataTable(dataFile)
if blockData.isOk():
for b in blocks(blockData.get(), verify):
for value in b:
# Note: This is the slowest part due to the hashing that takes place.
p.contentDB.put(history_content.toContentId(value[0]), value[1])
let contentId = history_content.toContentId(value[0])
if p.inRange(contentId):
p.contentDB.put(contentId, value[1])
# TODO: This call will get the content we just stored in the db, so it
# might be an improvement to directly pass it.
await p.neighborhoodGossip(ContentKeysList(@[encode(value[0])]))
await p.neighborhoodGossip(
ContentKeysList(@[encode(value[0])]), value[1])
# Need to be sure that all offers where started. TODO: this is not great.
while not p.offerQueueEmpty():
error "WAITING FOR OFFER QUEUE EMPTY"
await sleepAsync(500.milliseconds)
return ok()
else:
return err(blockData.error)
proc propagateBlockHistoryDb*(
p: PortalProtocol, dataFile: string, blockHash: string, verify = false):
Future[Result[void, string]] {.async.} =
let blockDataTable = readBlockDataTable(dataFile)
if blockDataTable.isOk():
let b =
try:
blockDataTable.get()[blockHash]
except KeyError:
return err("Block hash not found in block data file")
let blockDataRes = readBlockData(blockHash, b)
if blockDataRes.isErr:
return err(blockDataRes.error)
let blockData = blockDataRes.get()
for value in blockData:
let contentId = history_content.toContentId(value[0])
if p.inRange(contentId):
p.contentDB.put(contentId, value[1])
await p.neighborhoodGossip(ContentKeysList(@[encode(value[0])]), value[1])
return ok()
else:
return err(blockDataTable.error)

View File

@ -1,3 +1,4 @@
## Portal History Network json-rpc debug & testing calls
proc portal_history_store(contentId: string, content: string): bool
proc portal_history_propagate(dataFile: string): bool
proc portal_history_propagateBlock(dataFile: string, blockHash: string): bool

View File

@ -34,3 +34,11 @@ proc installPortalDebugApiHandlers*(
return true
else:
raise newException(ValueError, $res.error)
rpcServer.rpc("portal_" & network & "_propagateBlock") do(
dataFile: string, blockHash: string) -> bool:
let res = await p.propagateBlockHistoryDb(dataFile, blockHash)
if res.isOk():
return true
else:
raise newException(ValueError, $res.error)

View File

@ -188,7 +188,7 @@ procSuite "Portal testnet tests":
check (await clients[0].portal_history_propagate(dataFile))
await clients[0].close()
let blockData = readBlockData(dataFile)
let blockData = readBlockDataTable(dataFile)
check blockData.isOk()
for client in clients:

View File

@ -11,6 +11,7 @@
{.push raises: [Defect].}
import
std/strutils,
confutils, chronicles, chronicles/topics_registry, stew/byteutils,
eth/common/eth_types,
../../nimbus/rpc/[hexstrings, rpc_types], ../../nimbus/errors,
@ -60,12 +61,12 @@ proc walkBlocks(client: RpcClient, startHash: Hash256) {.async.} =
let parentBlockOpt =
try:
await client.eth_getBlockByHash(parentHash.ethHashStr(), false)
except ValidationError as e:
except RpcPostError as e:
# RpcPostError when for example timing out on the request. Could retry
# in this case.
fatal "Error occured on JSON-RPC request", error = e.msg
quit 1
except RpcPostError as e:
except ValidationError as e:
# ValidationError from buildBlockObject, should not occur with proper
# blocks
fatal "Error occured on JSON-RPC request", error = e.msg
@ -89,8 +90,8 @@ proc walkBlocks(client: RpcClient, startHash: Hash256) {.async.} =
blockNumber = parentBlock.number.get().string
parentHash = parentBlock.parentHash
echo "Block number: " & blockNumber
echo "Block hash: " & "0x" & parentBlock.hash.get().data.toHex()
echo "Block " & $blockNumber.parseHexInt() & ": " &
parentBlock.hash.get().data.toHex()
proc run(config: BlockWalkConf) {.async.} =
let client = newRpcHttpClient()