Add network retries for fluffy block requests (#1153)

- Add retries when the network request failed on validation of
block header, body or receipts.
- Shuffle the first set of neighbours that the request is send to
in order to not always hit the same peer first for the same content
- Some general clean-up
This commit is contained in:
Kim De Mey 2022-07-05 14:42:55 +02:00 committed by GitHub
parent 2b4baff8ec
commit facd7342fb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 118 additions and 94 deletions

View File

@ -176,7 +176,7 @@ proc validateBlockBodyBytes*(
try: try:
SSZ.decode(bytes, BlockBodySSZ) SSZ.decode(bytes, BlockBodySSZ)
except SszError as e: except SszError as e:
return err("Failed to decode block body" & e.msg) return err("Failed to decode block body: " & e.msg)
? validateBlockBody(body, txRoot, ommersHash) ? validateBlockBody(body, txRoot, ommersHash)
@ -199,7 +199,7 @@ proc validateReceiptsBytes*(
try: try:
SSZ.decode(bytes, ReceiptsSSZ) SSZ.decode(bytes, ReceiptsSSZ)
except SszError as e: except SszError as e:
return err("Failed to decode receipts" & e.msg) return err("Failed to decode receipts: " & e.msg)
? validateReceipts(receipts, receiptsRoot) ? validateReceipts(receipts, receiptsRoot)
@ -262,6 +262,15 @@ proc getContentFromDb(
## Public API to get the history network specific types, either from database ## Public API to get the history network specific types, either from database
## or through a lookup on the Portal Network ## or through a lookup on the Portal Network
const requestRetries = 4
# TODO: Currently doing 4 retries on lookups but only when the validation fails.
# This is to avoid nodes that provide garbage from blocking us with getting the
# requested data. Might want to also do that on a failed lookup, as perhaps this
# could occur when being really unlucky with nodes timing out on requests.
# Additionally, more improvements could be done with the lookup, as currently
# ongoing requests are cancelled after the receival of the first response,
# however that response is not yet validated at that moment.
proc getBlockHeader*( proc getBlockHeader*(
h: HistoryNetwork, chainId: uint16, hash: BlockHash): h: HistoryNetwork, chainId: uint16, hash: BlockHash):
Future[Option[BlockHeader]] {.async.} = Future[Option[BlockHeader]] {.async.} =
@ -273,71 +282,72 @@ proc getBlockHeader*(
info "Fetched block header from database", hash info "Fetched block header from database", hash
return headerFromDb return headerFromDb
let headerContentLookup = for i in 0..<requestRetries:
await h.portalProtocol.contentLookup(keyEncoded, contentId) let headerContentLookup =
if headerContentLookup.isNone(): await h.portalProtocol.contentLookup(keyEncoded, contentId)
warn "Failed fetching block header from the network", hash if headerContentLookup.isNone():
return none(BlockHeader) warn "Failed fetching block header from the network", hash
return none(BlockHeader)
let headerContent = headerContentLookup.unsafeGet() let headerContent = headerContentLookup.unsafeGet()
let res = validateBlockHeaderBytes(headerContent.content, hash) let res = validateBlockHeaderBytes(headerContent.content, hash)
# TODO: If the validation fails, a new request could be done. if res.isOk():
if res.isOk(): info "Fetched block header from the network", hash
info "Fetched block header from the network", hash # Content is valid we can propagate it to interested peers
# Content is valid we can propagate it to interested peers h.portalProtocol.triggerPoke(
h.portalProtocol.triggerPoke( headerContent.nodesInterestedInContent,
headerContent.nodesInterestedInContent, keyEncoded,
keyEncoded, headerContent.content
headerContent.content )
)
h.portalProtocol.storeContent(contentId, headerContent.content) h.portalProtocol.storeContent(contentId, headerContent.content)
return some(res.get()) return some(res.get())
else: else:
return none(BlockHeader) warn "Validation of block header failed", err = res.error, hash
# Headers were requested `requestRetries` times and all failed on validation
return none(BlockHeader)
proc getBlockBody*( proc getBlockBody*(
h: HistoryNetwork, h: HistoryNetwork, chainId: uint16, hash: BlockHash, header: BlockHeader):
chainId: uint16, Future[Option[BlockBody]] {.async.} =
hash: BlockHash,
header: BlockHeader):Future[Option[BlockBody]] {.async.} =
let let
(keyEncoded, contentId) = getEncodedKeyForContent(blockBody, chainId, hash) (keyEncoded, contentId) = getEncodedKeyForContent(blockBody, chainId, hash)
bodyFromDb = h.getContentFromDb(BlockBody, contentId) bodyFromDb = h.getContentFromDb(BlockBody, contentId)
if bodyFromDb.isSome(): if bodyFromDb.isSome():
info "Fetched block body from database", hash info "Fetched block body from database", hash
return some(bodyFromDb.unsafeGet()) return bodyFromDb
let bodyContentLookup = for i in 0..<requestRetries:
await h.portalProtocol.contentLookup(keyEncoded, contentId) let bodyContentLookup =
if bodyContentLookup.isNone(): await h.portalProtocol.contentLookup(keyEncoded, contentId)
warn "Failed fetching block body from the network", hash if bodyContentLookup.isNone():
return none(BlockBody) warn "Failed fetching block body from the network", hash
let bodyContent = bodyContentLookup.unsafeGet() let bodyContent = bodyContentLookup.unsafeGet()
let res = validateBlockBodyBytes( let res = validateBlockBodyBytes(
bodyContent.content, header.txRoot, header.ommersHash) bodyContent.content, header.txRoot, header.ommersHash)
if res.isErr(): if res.isOk():
return none(BlockBody) info "Fetched block body from the network", hash
info "Fetched block body from the network", hash # body is valid, propagate it to interested peers
h.portalProtocol.triggerPoke(
bodyContent.nodesInterestedInContent,
keyEncoded,
bodyContent.content
)
let blockBody = res.get() h.portalProtocol.storeContent(contentId, bodyContent.content)
# body is valid, propagate it to interested peers return some(res.get())
h.portalProtocol.triggerPoke( else:
bodyContent.nodesInterestedInContent, warn "Validation of block body failed", err = res.error, hash
keyEncoded,
bodyContent.content
)
h.portalProtocol.storeContent(contentId, bodyContent.content) return none(BlockBody)
return some(blockBody)
proc getBlock*( proc getBlock*(
h: HistoryNetwork, chainId: uint16, hash: BlockHash): h: HistoryNetwork, chainId: uint16, hash: BlockHash):
@ -356,7 +366,7 @@ proc getBlock*(
let body = bodyOpt.unsafeGet() let body = bodyOpt.unsafeGet()
return some[Block]((header, body)) return some((header, body))
proc getReceipts*( proc getReceipts*(
h: HistoryNetwork, h: HistoryNetwork,
@ -364,7 +374,7 @@ proc getReceipts*(
hash: BlockHash, hash: BlockHash,
header: BlockHeader): Future[Option[seq[Receipt]]] {.async.} = header: BlockHeader): Future[Option[seq[Receipt]]] {.async.} =
if header.receiptRoot == BLANK_ROOT_HASH: if header.receiptRoot == BLANK_ROOT_HASH:
# The header has no receipts, return early with empty receipts # Short path for empty receipts indicated by receipts root
return some(newSeq[Receipt]()) return some(newSeq[Receipt]())
let (keyEncoded, contentId) = getEncodedKeyForContent(receipts, chainId, hash) let (keyEncoded, contentId) = getEncodedKeyForContent(receipts, chainId, hash)
@ -373,34 +383,37 @@ proc getReceipts*(
if receiptsFromDb.isSome(): if receiptsFromDb.isSome():
info "Fetched receipts from database", hash info "Fetched receipts from database", hash
return some(receiptsFromDb.unsafeGet()) return receiptsFromDb
let receiptsContentLookup = for i in 0..<requestRetries:
await h.portalProtocol.contentLookup(keyEncoded, contentId) let receiptsContentLookup =
if receiptsContentLookup.isNone(): await h.portalProtocol.contentLookup(keyEncoded, contentId)
warn "Failed fetching receipts from the network", hash if receiptsContentLookup.isNone():
return none[seq[Receipt]]() warn "Failed fetching receipts from the network", hash
return none(seq[Receipt])
let receiptsContent = receiptsContentLookup.unsafeGet() let receiptsContent = receiptsContentLookup.unsafeGet()
let res = validateReceiptsBytes(receiptsContent.content, header.receiptRoot) let res = validateReceiptsBytes(receiptsContent.content, header.receiptRoot)
if res.isErr(): if res.isOk():
return none[seq[Receipt]]() info "Fetched receipts from the network", hash
info "Fetched receipts from the network", hash let receipts = res.get()
let receipts = res.get() # receipts are valid, propagate it to interested peers
h.portalProtocol.triggerPoke(
receiptsContent.nodesInterestedInContent,
keyEncoded,
receiptsContent.content
)
# receips are valid, propagate it to interested peers h.portalProtocol.storeContent(contentId, receiptsContent.content)
h.portalProtocol.triggerPoke(
receiptsContent.nodesInterestedInContent,
keyEncoded,
receiptsContent.content
)
h.portalProtocol.storeContent(contentId, receiptsContent.content) return some(res.get())
else:
warn "Validation of receipts failed", err = res.error, hash
return some(receipts) return none(seq[Receipt])
func validateEpochAccumulator(bytes: openArray[byte]): bool = func validateEpochAccumulator(bytes: openArray[byte]): bool =
# For now just validate by checking if de-serialization works # For now just validate by checking if de-serialization works

View File

@ -433,7 +433,7 @@ proc fromLogRadius(T: type UInt256, logRadius: uint16): T =
# Get the max value of the logRadius range # Get the max value of the logRadius range
pow((2).stuint(256), logRadius) - 1 pow((2).stuint(256), logRadius) - 1
proc getInitialRadius(rc: RadiusConfig): UInt256 = proc getInitialRadius(rc: RadiusConfig): UInt256 =
case rc.kind case rc.kind
of Static: of Static:
return UInt256.fromLogRadius(rc.logRadius) return UInt256.fromLogRadius(rc.logRadius)
@ -946,8 +946,11 @@ proc contentLookup*(p: PortalProtocol, target: ByteList, targetId: UInt256):
## target. Maximum value for n is `BUCKET_SIZE`. ## target. Maximum value for n is `BUCKET_SIZE`.
# `closestNodes` holds the k closest nodes to target found, sorted by distance # `closestNodes` holds the k closest nodes to target found, sorted by distance
# Unvalidated nodes are used for requests as a form of validation. # Unvalidated nodes are used for requests as a form of validation.
var closestNodes = p.routingTable.neighbours(targetId, BUCKET_SIZE, var closestNodes = p.routingTable.neighbours(
seenOnly = false) targetId, BUCKET_SIZE, seenOnly = false)
# Shuffling the order of the nodes in order to not always hit the same node
# first for the same request.
p.baseProtocol.rng[].shuffle(closestNodes)
var asked, seen = initHashSet[NodeId]() var asked, seen = initHashSet[NodeId]()
asked.incl(p.baseProtocol.localNode.id) # No need to ask our own node asked.incl(p.baseProtocol.localNode.id) # No need to ask our own node

View File

@ -235,40 +235,48 @@ proc installEthApiHandlers*(
# rpcServerWithProxy.rpc("eth_getTransactionReceipt") do( # rpcServerWithProxy.rpc("eth_getTransactionReceipt") do(
# data: EthHashStr) -> Option[ReceiptObject]: # data: EthHashStr) -> Option[ReceiptObject]:
rpcServerWithProxy.rpc("eth_getLogs") do(filterOptions: FilterOptions) -> seq[FilterLog]: rpcServerWithProxy.rpc("eth_getLogs") do(
filterOptions: FilterOptions) -> seq[FilterLog]:
if filterOptions.blockhash.isNone(): if filterOptions.blockhash.isNone():
# currently only queries with provided blockhash are supported. To support # Currently only queries by blockhash are supported.
# range queries it would require Indicies network. # To support range queries the Indicies network is required.
raise newException(ValueError, "Unsupported query. Field `blockhash` needs to be provided") raise newException(ValueError,
"Unsupported query: Only `blockhash` queries are currently supported")
else: else:
let hash = filterOptions.blockHash.unsafeGet() let hash = filterOptions.blockHash.unsafeGet()
let maybeHeader = await historyNetwork.getBlockHeader(1'u16, hash) let headerOpt = await historyNetwork.getBlockHeader(1'u16, hash)
if headerOpt.isNone():
raise newException(ValueError,
"Could not find header with requested hash")
if maybeHeader.isNone(): let header = headerOpt.unsafeGet()
raise newException(ValueError, "Could not find header with requested hash")
let header = maybeHeader.unsafeGet()
if headerBloomFilter(header, filterOptions.address, filterOptions.topics): if headerBloomFilter(header, filterOptions.address, filterOptions.topics):
# TODO: These queries could be done concurrently, investigate if there # TODO: These queries could be done concurrently, investigate if there
# are no assumptions about usage of concurrent queries on portal # are no assumptions about usage of concurrent queries on portal
# wire protocol level # wire protocol level
let maybeBody = await historyNetwork.getBlockBody(1'u16, hash, header) let
let maybeReceipts = await historyNetwork.getReceipts(1'u16, hash, header) bodyOpt = await historyNetwork.getBlockBody(1'u16, hash, header)
receiptsOpt = await historyNetwork.getReceipts(1'u16, hash, header)
if bodyOpt.isSome() and receiptsOpt.isSome():
let
body = bodyOpt.unsafeGet()
receipts = receiptsOpt.unsafeGet()
logs = deriveLogs(header, body.transactions, receipts)
filteredLogs = filterLogs(
logs, filterOptions.address, filterOptions.topics)
if maybeBody.isSome() and maybeReceipts.isSome():
let body = maybeBody.unsafeGet()
let receipts = maybeReceipts.unsafeGet()
let logs = deriveLogs(header, body.transactions, receipts)
let filteredLogs = filterLogs(logs, filterOptions.address, filterOptions.topics)
return filteredLogs return filteredLogs
else: else:
if maybeBody.isNone(): if bodyOpt.isNone():
raise newException(ValueError, "Could not find body for requested hash") raise newException(ValueError,
"Could not find block body for requested hash")
else: else:
raise newException(ValueError, "Could not find receipts for requested hash") raise newException(ValueError,
"Could not find receipts for requested hash")
else: else:
# bloomfilter returned false, we do known that there is no logs matching # bloomfilter returned false, we do known that there are no logs
# given criteria # matching the given criteria
return @[] return @[]