766 lines
26 KiB
Nim
766 lines
26 KiB
Nim
# Fluffy
|
|
# Copyright (c) 2021-2024 Status Research & Development GmbH
|
|
# Licensed and distributed under either of
|
|
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
|
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
|
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
|
|
|
{.push raises: [].}
|
|
|
|
import
|
|
results,
|
|
chronos,
|
|
chronicles,
|
|
eth/trie/ordered_trie,
|
|
eth/common/[hashes, headers_rlp, blocks_rlp, receipts_rlp, transactions_rlp],
|
|
eth/p2p/discoveryv5/[protocol, enr],
|
|
../../common/common_types,
|
|
../../database/content_db,
|
|
../../network_metadata,
|
|
../wire/[portal_protocol, portal_stream, portal_protocol_config],
|
|
"."/[history_content, validation/historical_hashes_accumulator],
|
|
../beacon/beacon_chain_historical_roots,
|
|
./content/content_deprecated
|
|
|
|
from eth/common/eth_types_rlp import rlpHash
|
|
from eth/common/accounts import EMPTY_ROOT_HASH
|
|
|
|
logScope:
|
|
topics = "portal_hist"
|
|
|
|
export historical_hashes_accumulator, blocks_rlp
|
|
|
|
type
  HistoryNetwork* = ref object
    ## Runtime state of one Portal execution history network instance.

    # Wire protocol instance used for content lookups, storage and gossip.
    portalProtocol*: PortalProtocol
    # Content database handed to the protocol's get/store/radius handlers.
    contentDB*: ContentDB
    # Queue of offered content batches: optional source node id, the content
    # keys, and the raw content items. Drained by `processContentLoop`.
    contentQueue*: AsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])]
    # Accumulator that header proofs are verified against.
    accumulator*: FinishedHistoricalHashesAccumulator
    # Beacon chain historical roots, by default from `loadHistoricalRoots()`.
    historicalRoots*: HistoricalRoots
    # Background loop futures; assigned in `start`, cancelled in `stop`.
    processContentLoop: Future[void]
    statusLogLoop: Future[void]
    # Number of extra network lookups performed when fetched content fails
    # validation.
    contentRequestRetries: int

  # A block as a (header, body) pair.
  Block* = (Header, BlockBody)
|
|
|
|
func toContentIdHandler(contentKey: ContentKeyByteList): results.Opt[ContentId] =
  ## Content key to content id callback passed to `PortalProtocol.new`.
  ## Unconditionally returns the id derived by `toContentId`.
  ok(contentKey.toContentId())
|
|
|
|
## Calls to go from SSZ decoded Portal types to RLP fully decoded EL types
|
|
|
|
func fromPortalBlockBody*(
    T: type BlockBody, body: PortalBlockBodyLegacy
): Result[T, string] =
  ## Build the EL `BlockBody` from an SSZ-decoded `PortalBlockBodyLegacy`.
  ##
  ## Every transaction and the uncles list are RLP-decoded; the first
  ## `RlpError` is turned into an `err` carrying the decoder message.
  try:
    var txs: seq[Transaction]
    for encodedTx in body.transactions:
      txs.add(rlp.decode(encodedTx.asSeq(), Transaction))

    # Uncles are stored as one RLP-encoded list of headers.
    let decodedUncles = rlp.decode(body.uncles.asSeq(), seq[Header])

    ok(BlockBody(transactions: txs, uncles: decodedUncles))
  except RlpError as e:
    err("RLP decoding failed: " & e.msg)
|
|
|
|
func fromPortalBlockBody*(
    T: type BlockBody, body: PortalBlockBodyShanghai
): Result[T, string] =
  ## Build the EL `BlockBody` from an SSZ-decoded `PortalBlockBodyShanghai`.
  ##
  ## Transactions and withdrawals are RLP-decoded one by one; the first
  ## `RlpError` is turned into an `err` carrying the decoder message.
  try:
    var txs: seq[Transaction]
    for encodedTx in body.transactions:
      txs.add(rlp.decode(encodedTx.asSeq(), Transaction))

    var decodedWithdrawals: seq[Withdrawal]
    for encodedWithdrawal in body.withdrawals:
      decodedWithdrawals.add(rlp.decode(encodedWithdrawal.asSeq(), Withdrawal))

    let elBody = BlockBody(
      transactions: txs,
      # Uncles must be empty, this is verified in `validateBlockBody`
      uncles: @[],
      withdrawals: Opt.some(decodedWithdrawals),
    )
    ok(elBody)
  except RlpError as e:
    err("RLP decoding failed: " & e.msg)
|
|
|
|
func fromPortalBlockBodyOrRaise*(
    T: type BlockBody, body: PortalBlockBodyLegacy | PortalBlockBodyShanghai
): T =
  ## Build the EL `BlockBody` from one of the SSZ-decoded Portal block body
  ## types.
  ##
  ## Raises an assertion when the RLP payloads are invalid, so only use this
  ## if the data has been validated before.
  let decoded = BlockBody.fromPortalBlockBody(body)
  if decoded.isErr():
    raiseAssert(decoded.error)

  decoded.get()
|
|
|
|
func fromPortalReceipts*(
    T: type seq[Receipt], receipts: PortalReceipts
): Result[T, string] =
  ## Build the fully decoded EL `seq[Receipt]` from the SSZ-decoded
  ## `PortalReceipts`.
  ##
  ## Each entry is RLP-decoded; the first `RlpError` is turned into an `err`
  ## carrying the decoder message.
  try:
    var decoded: seq[Receipt]
    for encodedReceipt in receipts:
      decoded.add(rlp.decode(encodedReceipt.asSeq(), Receipt))

    ok(decoded)
  except RlpError as e:
    err("RLP decoding failed: " & e.msg)
|
|
|
|
## Calls to encode EL block types to the SSZ encoded Portal types.
|
|
|
|
# TODO: The fact that we have different Portal BlockBody types for the different
|
|
# forks but not for the EL BlockBody (usage of Option) does not play so well
|
|
# together.
|
|
|
|
func fromBlockBody*(T: type PortalBlockBodyLegacy, body: BlockBody): T =
  ## Build the SSZ `PortalBlockBodyLegacy` from an EL `BlockBody`.
  ##
  ## Each transaction is RLP-encoded into its own byte list; the uncles are
  ## RLP-encoded as one list. Any withdrawals in `body` are not used.
  for tx in body.transactions:
    # The result of `add` on the SSZ list is deliberately ignored.
    discard result.transactions.add(TransactionByteList(rlp.encode(tx)))

  result.uncles = Uncles(rlp.encode(body.uncles))
|
|
|
|
func fromBlockBody*(T: type PortalBlockBodyShanghai, body: BlockBody): T =
  ## Build the SSZ `PortalBlockBodyShanghai` from an EL `BlockBody`.
  ##
  ## Transactions and withdrawals are RLP-encoded into individual byte lists;
  ## the uncles are RLP-encoded as one list. Asserts that `body.withdrawals`
  ## is set.
  for tx in body.transactions:
    # The result of `add` on the SSZ list is deliberately ignored.
    discard result.transactions.add(TransactionByteList(rlp.encode(tx)))

  result.uncles = Uncles(rlp.encode(body.uncles))

  doAssert(body.withdrawals.isSome())

  for withdrawal in body.withdrawals.get():
    discard result.withdrawals.add(WithdrawalByteList(rlp.encode(withdrawal)))
|
|
|
|
func fromReceipts*(T: type PortalReceipts, receipts: seq[Receipt]): T =
  ## Build the SSZ `PortalReceipts` from EL receipts, RLP-encoding each
  ## receipt into its own byte list.
  for receipt in receipts:
    # The result of `add` on the SSZ list is deliberately ignored.
    discard result.add(ReceiptByteList(rlp.encode(receipt)))
|
|
|
|
func encode*(blockBody: BlockBody): seq[byte] =
  ## SSZ-encode an EL `BlockBody` as a Portal block body.
  ##
  ## The Shanghai layout is used when `withdrawals` is set, the legacy layout
  ## otherwise.
  if blockBody.withdrawals.isNone():
    SSZ.encode(PortalBlockBodyLegacy.fromBlockBody(blockBody))
  else:
    SSZ.encode(PortalBlockBodyShanghai.fromBlockBody(blockBody))
|
|
|
|
func encode*(receipts: seq[Receipt]): seq[byte] =
  ## SSZ-encode EL receipts as `PortalReceipts`.
  SSZ.encode(PortalReceipts.fromReceipts(receipts))
|
|
|
|
## Calls and helper calls to do validation of block header, body and receipts
|
|
# TODO: Failures on validation and perhaps deserialisation should be penalized
# if/when peer scoring/banning is added.
|
|
|
|
func validateBlockHeader*(header: Header, blockHash: Hash32): Result[void, string] =
  ## Check that the RLP hash of `header` equals `blockHash`.
  if header.rlpHash() == blockHash:
    ok()
  else:
    err("Block header hash does not match")
|
|
|
|
func validateBlockHeader*(header: Header, number: uint64): Result[void, string] =
  ## Check that `header.number` equals the expected block `number`.
  if header.number == number:
    ok()
  else:
    err("Block header number does not match")
|
|
|
|
func validateBlockHeaderBytes*(
    bytes: openArray[byte], id: uint64 | Hash32
): Result[Header, string] =
  ## RLP-decode `bytes` into a `Header` and check it against `id`, which is
  ## either the expected block hash or the expected block number.
  ##
  ## Returns the decoded header, or the decoding/validation error.
  let decoded = ?decodeRlp(bytes, Header)

  # Note:
  # Additional quick-checks could be done here, such as timestamp versus the
  # optional fields added in later forks (e.g. the Shanghai field, the Cancun
  # fields, a zero ommersHash, etc.).
  # However, the block hash comparison will obviously catch these, and it is
  # pretty trivial to provide a non-canonical valid header.
  # It might be somewhat more useful if just done (temporarily) for the
  # post-merge headers, which are currently provided without proof.
  # For comparison by number this is obviously not sufficient, as any other
  # field could be manipulated; because of this a block header proof will
  # always be needed.

  ?validateBlockHeader(decoded, id)

  ok(decoded)
|
|
|
|
template append*(w: var RlpWriter, v: TransactionByteList) =
  ## RLP writer hook for `TransactionByteList`: hands the list's underlying
  ## bytes to `appendRawBytes`.
  # NOTE(review): presumably picked up by `orderedTrieRoot` when hashing the
  # SSZ byte lists - confirm.
  w.appendRawBytes(v.asSeq())
|
|
|
|
template append*(w: var RlpWriter, v: WithdrawalByteList) =
  ## RLP writer hook for `WithdrawalByteList`: hands the list's underlying
  ## bytes to `appendRawBytes`.
  # NOTE(review): presumably picked up by `orderedTrieRoot` when hashing the
  # SSZ byte lists - confirm.
  w.appendRawBytes(v.asSeq())
|
|
|
|
template append*(w: var RlpWriter, v: ReceiptByteList) =
  ## RLP writer hook for `ReceiptByteList`: hands the list's underlying bytes
  ## to `appendRawBytes`.
  # NOTE(review): presumably picked up by `orderedTrieRoot` when hashing the
  # SSZ byte lists - confirm.
  w.appendRawBytes(v.asSeq())
|
|
|
|
proc validateBlockBody*(
    body: PortalBlockBodyLegacy, header: Header
): Result[void, string] =
  ## Validate a legacy block body against `ommersHash` and `txRoot` from the
  ## header.
  ##
  ## The ommers hash is the keccak256 of the still RLP-encoded uncles; the
  ## transactions root is computed over the encoded transactions.
  if keccak256(body.uncles.asSeq()) != header.ommersHash:
    return err("Invalid ommers hash")

  let txsRoot = orderedTrieRoot(body.transactions.asSeq)
  if txsRoot == header.txRoot:
    ok()
  else:
    err(
      "Invalid transactions root: expected " & $header.txRoot & " - got " & $txsRoot
    )
|
|
|
|
proc validateBlockBody*(
    body: PortalBlockBodyShanghai, header: Header
): Result[void, string] =
  ## Validate a Shanghai block body against `txRoot`, `ommersHash` and
  ## `withdrawalsRoot` from the header.
  ##
  ## Asserts that `header.withdrawalsRoot` is set; callers must check this
  ## beforehand.

  # Shortcut for the ommersHash calculation: the uncles must be an RLP
  # encoded empty list.
  if body.uncles.asSeq() != @[byte 0xc0]:
    return err("Invalid ommers hash, uncles list is not empty")

  let txsRoot = orderedTrieRoot(body.transactions.asSeq)
  if txsRoot != header.txRoot:
    return err(
      "Invalid transactions root: expected " & $header.txRoot & " - got " & $txsRoot
    )

  # TODO: This check is done higher up but perhaps this can become cleaner with
  # some refactor.
  doAssert(header.withdrawalsRoot.isSome())

  let
    withdrawalsRoot = orderedTrieRoot(body.withdrawals.asSeq)
    expectedWithdrawalsRoot = header.withdrawalsRoot.get()
  if withdrawalsRoot == expectedWithdrawalsRoot:
    ok()
  else:
    err(
      "Invalid withdrawals root: expected " & $expectedWithdrawalsRoot & " - got " &
        $withdrawalsRoot
    )
|
|
|
|
proc decodeBlockBodyBytes*(bytes: openArray[byte]): Result[BlockBody, string] =
  ## Decode SSZ-encoded Portal block body bytes without a header to pick the
  ## layout: the Shanghai layout is tried first, then the legacy layout.
  ##
  ## Returns an error when neither SSZ decoding succeeds, or when the RLP
  ## decoding of the first matching layout fails.
  let shanghaiBody = decodeSsz(bytes, PortalBlockBodyShanghai)
  if shanghaiBody.isOk():
    return BlockBody.fromPortalBlockBody(shanghaiBody.get())

  let legacyBody = decodeSsz(bytes, PortalBlockBodyLegacy)
  if legacyBody.isOk():
    return BlockBody.fromPortalBlockBody(legacyBody.get())

  err("All Portal block body decodings failed")
|
|
|
|
proc validateBlockBodyBytes*(
    bytes: openArray[byte], header: Header
): Result[BlockBody, string] =
  ## Fully decode the SSZ encoded Portal block body and validate it against
  ## the header.
  ##
  ## The body layout (Shanghai or legacy) is selected from the header
  ## timestamp and number. Returns the decoded EL `BlockBody` or the first
  ## header-consistency, decoding or validation error.
  ## TODO: improve this decoding in combination with the block body validation
  ## calls.
  let timestamp = Moment.init(header.timestamp.int64, Second)

  # TODO: The additional header checks are not needed as header is implicitly
  # verified by means of the accumulator? Except that we don't use this yet
  # post merge, so the checks are still useful, for now.
  if isShanghai(chainConfig, timestamp):
    if header.withdrawalsRoot.isNone():
      return err("Expected withdrawalsRoot for Shanghai block")
    if header.ommersHash != EMPTY_UNCLE_HASH:
      return err("Expected empty uncles for a Shanghai block")

    let shanghaiBody = ?decodeSsz(bytes, PortalBlockBodyShanghai)
    ?validateBlockBody(shanghaiBody, header)
    return BlockBody.fromPortalBlockBody(shanghaiBody)

  # Pre-Shanghai: PoS and PoW blocks share the legacy layout; PoS blocks must
  # additionally have an empty uncles hash.
  let posBlock = isPoSBlock(chainConfig, header.number)
  if header.withdrawalsRoot.isSome():
    return err("Expected no withdrawalsRoot for pre Shanghai block")
  if posBlock and header.ommersHash != EMPTY_UNCLE_HASH:
    return err("Expected empty uncles for a PoS block")

  let legacyBody = ?decodeSsz(bytes, PortalBlockBodyLegacy)
  ?validateBlockBody(legacyBody, header)
  BlockBody.fromPortalBlockBody(legacyBody)
|
|
|
|
proc validateReceipts*(
    receipts: PortalReceipts, receiptsRoot: Hash32
): Result[void, string] =
  ## Check that the ordered trie root over the encoded receipts equals
  ## `receiptsRoot`.
  if orderedTrieRoot(receipts.asSeq) == receiptsRoot:
    ok()
  else:
    err("Unexpected receipt root")
|
|
|
|
proc validateReceiptsBytes*(
    bytes: openArray[byte], receiptsRoot: Hash32
): Result[seq[Receipt], string] =
  ## Fully decode the SSZ encoded receipts and validate them against the
  ## header's receipts root.
  ##
  ## Returns the RLP-decoded receipts, or the first decoding/validation error.
  let portalReceipts = ?decodeSsz(bytes, PortalReceipts)

  ?validateReceipts(portalReceipts, receiptsRoot)

  seq[Receipt].fromPortalReceipts(portalReceipts)
|
|
|
|
## Content helper calls for specific history network types
|
|
|
|
proc getContent(
    n: HistoryNetwork,
    T: type Header,
    contentKey: ContentKeyByteList,
    contentId: ContentId,
): Opt[T] =
  ## Read a block header from local content.
  ##
  ## Returns none when the content is not available locally. The stored value
  ## is an SSZ `BlockHeaderWithProof`; failing to decode it (SSZ or RLP)
  ## raises an assertion.
  let stored = n.portalProtocol.getLocalContent(contentKey, contentId).valueOr:
    return Opt.none(T)

  let headerWithProof =
    try:
      SSZ.decode(stored, BlockHeaderWithProof)
    except SerializationError as e:
      raiseAssert(e.msg)

  let decoded = decodeRlp(headerWithProof.header.asSeq(), T)
  if decoded.isOk():
    Opt.some(decoded.get())
  else:
    raiseAssert(decoded.error)
|
|
|
|
proc getContent(
    n: HistoryNetwork,
    T: type BlockBody,
    contentKey: ContentKeyByteList,
    contentId: ContentId,
    header: Header,
): Opt[T] =
  ## Read a block body from local content.
  ##
  ## Returns none when the content is not available locally. `header` selects
  ## the SSZ layout to decode with; decoding uses the `...OrRaise` helpers, so
  ## invalid stored data is not returned as an error.
  let stored = n.portalProtocol.getLocalContent(contentKey, contentId).valueOr:
    return Opt.none(T)

  let timestamp = Moment.init(header.timestamp.int64, Second)

  if isShanghai(chainConfig, timestamp):
    Opt.some(
      BlockBody.fromPortalBlockBodyOrRaise(
        decodeSszOrRaise(stored, PortalBlockBodyShanghai)
      )
    )
  elif isPoSBlock(chainConfig, header.number):
    Opt.some(
      BlockBody.fromPortalBlockBodyOrRaise(
        decodeSszOrRaise(stored, PortalBlockBodyLegacy)
      )
    )
  else:
    Opt.some(
      BlockBody.fromPortalBlockBodyOrRaise(
        decodeSszOrRaise(stored, PortalBlockBodyLegacy)
      )
    )
|
|
|
|
proc getContent(
    n: HistoryNetwork,
    T: type seq[Receipt],
    contentKey: ContentKeyByteList,
    contentId: ContentId,
): Opt[T] =
  ## Read receipts from local content.
  ##
  ## Returns none when the content is not available locally. Failing to
  ## decode the stored value (SSZ or RLP) raises an assertion.
  let stored = n.portalProtocol.getLocalContent(contentKey, contentId).valueOr:
    return Opt.none(T)

  let portalReceipts =
    try:
      SSZ.decode(stored, PortalReceipts)
    except SerializationError:
      raiseAssert("Stored data should always be serialized correctly")

  let decoded = T.fromPortalReceipts(portalReceipts)
  if decoded.isOk():
    Opt.some(decoded.get())
  else:
    raiseAssert(decoded.error)
|
|
|
|
proc getContent(
    n: HistoryNetwork,
    T: type EpochRecord,
    contentKey: ContentKeyByteList,
    contentId: ContentId,
): Opt[T] =
  ## Read an `EpochRecord` from local content.
  ##
  ## Returns none when the content is not available locally. Failing to
  ## SSZ-decode the stored value raises an assertion.
  let stored = n.portalProtocol.getLocalContent(contentKey, contentId).valueOr:
    return Opt.none(T)

  let record =
    try:
      SSZ.decode(stored, T)
    except SerializationError:
      raiseAssert("Stored data should always be serialized correctly")

  Opt.some(record)
|
|
|
|
## Public API to get the history network specific types, either from database
|
|
## or through a lookup on the Portal Network
|
|
|
|
# TODO: Currently doing retries on lookups but only when the validation fails.
|
|
# This is to avoid nodes that provide garbage from blocking us with getting the
|
|
# requested data. Might want to also do that on a failed lookup, as perhaps this
|
|
# could occur when being really unlucky with nodes timing out on requests.
|
|
# Additionally, more improvements could be done with the lookup, as currently
|
|
# ongoing requests are cancelled after receipt of the first response,
|
|
# however that response is not yet validated at that moment.
|
|
|
|
func verifyHeader(
    n: HistoryNetwork, header: Header, proof: BlockHeaderProof
): Result[void, string] =
  ## Verify `header` with `proof` against this network's accumulator.
  n.accumulator.verifyHeader(header, proof)
|
|
|
|
proc getVerifiedBlockHeader*(
    n: HistoryNetwork, id: Hash32 | uint64
): Future[Opt[Header]] {.async: (raises: [CancelledError]).} =
  ## Get the block header for `id` (block hash or block number), from local
  ## content or else through a lookup on the network.
  ##
  ## Content from the network is decoded, checked against `id` and verified
  ## with its proof. A failed check triggers a new lookup, up to
  ## `1 + contentRequestRetries` attempts in total; a failed lookup returns
  ## none immediately. Valid content is stored and poked to the interested
  ## nodes reported by the lookup.
  let
    contentKey = blockHeaderContentKey(id).encode()
    contentId = history_content.toContentId(contentKey)

  logScope:
    id
    contentKey

  # Note: This still requests a BlockHeaderWithProof from the database, as that
  # is what is stored. But the proof doesn't need to be verified as it gets
  # verified before storing.
  let stored = n.getContent(Header, contentKey, contentId)
  if stored.isSome():
    info "Fetched block header from database"
    return stored

  let maxAttempts = 1 + n.contentRequestRetries
  for attempt in 0 ..< maxAttempts:
    let lookup = (await n.portalProtocol.contentLookup(contentKey, contentId)).valueOr:
      warn "Failed fetching block header with proof from the network"
      return Opt.none(Header)

    let headerWithProof = decodeSsz(lookup.content, BlockHeaderWithProof).valueOr:
      warn "Failed decoding header with proof", error = error
      continue

    let header = validateBlockHeaderBytes(headerWithProof.header.asSeq(), id).valueOr:
      warn "Validation of block header failed", error = error
      continue

    let verification = n.verifyHeader(header, headerWithProof.proof)
    if verification.isErr():
      warn "Verification of block header failed", error = verification.error
      continue

    info "Fetched valid block header from the network"
    # Content is valid, it can be stored and propagated to interested peers
    n.portalProtocol.storeContent(
      contentKey, contentId, lookup.content, cacheContent = true
    )
    n.portalProtocol.triggerPoke(
      lookup.nodesInterestedInContent, contentKey, lookup.content
    )

    return Opt.some(header)

  # Headers were requested `1 + requestRetries` times and all failed on validation
  return Opt.none(Header)
|
|
|
|
proc getBlockBody*(
    n: HistoryNetwork, blockHash: Hash32, header: Header
): Future[Opt[BlockBody]] {.async: (raises: [CancelledError]).} =
  ## Get the block body for `blockHash`, validated against `header`, from
  ## local content or else through a lookup on the network.
  ##
  ## Content from the network that fails validation triggers a new lookup, up
  ## to `1 + contentRequestRetries` attempts in total; a failed lookup returns
  ## none immediately. Valid content is stored and poked to the interested
  ## nodes reported by the lookup.
  if header.txRoot == EMPTY_ROOT_HASH and header.ommersHash == EMPTY_UNCLE_HASH:
    # Short path for an empty body indicated by txRoot and ommersHash.
    # Headers that carry a withdrawalsRoot also commit to the withdrawals, so
    # the short path may only be taken when there are none to fetch;
    # otherwise the returned body would not match the header.
    if header.withdrawalsRoot.isNone():
      return Opt.some(BlockBody(transactions: @[], uncles: @[]))
    if header.withdrawalsRoot.get() == EMPTY_ROOT_HASH:
      return Opt.some(
        BlockBody(
          transactions: @[], uncles: @[], withdrawals: Opt.some(newSeq[Withdrawal]())
        )
      )
    # Non-empty withdrawals: fall through and fetch the real body.

  let
    contentKey = blockBodyContentKey(blockHash).encode()
    contentId = contentKey.toContentId()

  logScope:
    blockHash
    contentKey

  let bodyFromDb = n.getContent(BlockBody, contentKey, contentId, header)
  if bodyFromDb.isSome():
    info "Fetched block body from database"
    return bodyFromDb

  for i in 0 ..< (1 + n.contentRequestRetries):
    let
      bodyContent = (await n.portalProtocol.contentLookup(contentKey, contentId)).valueOr:
        warn "Failed fetching block body from the network"
        return Opt.none(BlockBody)

      body = validateBlockBodyBytes(bodyContent.content, header).valueOr:
        warn "Validation of block body failed", error
        continue

    info "Fetched block body from the network"
    # Content is valid, it can be stored and propagated to interested peers
    n.portalProtocol.storeContent(
      contentKey, contentId, bodyContent.content, cacheContent = true
    )
    n.portalProtocol.triggerPoke(
      bodyContent.nodesInterestedInContent, contentKey, bodyContent.content
    )

    return Opt.some(body)

  # Bodies were requested `1 + requestRetries` times and all failed on validation
  return Opt.none(BlockBody)
|
|
|
|
proc getBlock*(
    n: HistoryNetwork, id: Hash32 | uint64
): Future[Opt[Block]] {.async: (raises: [CancelledError]).} =
  ## Get the full block (header and body) for `id`, which is either a block
  ## hash or a block number. Returns none when the header or the body cannot
  ## be retrieved.
  debug "Trying to retrieve block", id

  # Note: Using `getVerifiedBlockHeader` instead of getBlockHeader even though
  # proofs are not necessarily needed, in order to avoid having to inject
  # also the original type into the network.
  let header = (await n.getVerifiedBlockHeader(id)).valueOr:
    warn "Failed to get header when getting block", id
    return Opt.none(Block)

  # The body is keyed by block hash; derive it from the header when the block
  # was requested by number.
  let hash =
    when id is Hash32:
      id
    else:
      header.rlpHash()

  let body = (await n.getBlockBody(hash, header)).valueOr:
    warn "Failed to get body when getting block", hash
    return Opt.none(Block)

  return Opt.some((header, body))
|
|
|
|
proc getBlockHashByNumber*(
    n: HistoryNetwork, blockNumber: uint64
): Future[Result[Hash32, string]] {.async: (raises: [CancelledError]).} =
  ## Get the block hash for `blockNumber` by retrieving the verified header
  ## and hashing it. Returns an error when the header cannot be retrieved.
  let headerOpt = await n.getVerifiedBlockHeader(blockNumber)
  if headerOpt.isNone():
    return err("Cannot retrieve block header for given block number")

  ok(headerOpt.get().rlpHash())
|
|
|
|
proc getReceipts*(
    n: HistoryNetwork, blockHash: Hash32, header: Header
): Future[Opt[seq[Receipt]]] {.async: (raises: [CancelledError]).} =
  ## Get the receipts for `blockHash`, validated against
  ## `header.receiptsRoot`, from local content or else through a lookup on
  ## the network.
  ##
  ## Content from the network that fails validation triggers a new lookup, up
  ## to `1 + contentRequestRetries` attempts in total; a failed lookup returns
  ## none immediately. Valid content is stored and poked to the interested
  ## nodes reported by the lookup.
  if header.receiptsRoot == EMPTY_ROOT_HASH:
    # Short path for empty receipts indicated by receipts root
    return Opt.some(newSeq[Receipt]())

  let
    contentKey = receiptsContentKey(blockHash).encode()
    contentId = contentKey.toContentId()

  logScope:
    blockHash
    contentKey

  let stored = n.getContent(seq[Receipt], contentKey, contentId)
  if stored.isSome():
    info "Fetched receipts from database"
    return stored

  let maxAttempts = 1 + n.contentRequestRetries
  for attempt in 0 ..< maxAttempts:
    let lookup = (await n.portalProtocol.contentLookup(contentKey, contentId)).valueOr:
      warn "Failed fetching receipts from the network"
      return Opt.none(seq[Receipt])

    let validReceipts = validateReceiptsBytes(lookup.content, header.receiptsRoot).valueOr:
      warn "Validation of receipts failed", error
      continue

    info "Fetched receipts from the network"
    # Content is valid, it can be stored and propagated to interested peers
    n.portalProtocol.storeContent(
      contentKey, contentId, lookup.content, cacheContent = true
    )
    n.portalProtocol.triggerPoke(
      lookup.nodesInterestedInContent, contentKey, lookup.content
    )

    return Opt.some(validReceipts)

  # Receipts were requested `1 + requestRetries` times and all failed on validation
  return Opt.none(seq[Receipt])
|
|
|
|
proc validateOfferedHeader(
    n: HistoryNetwork, content: seq[byte], id: Hash32 | uint64
): bool =
  ## Validate an offered SSZ `BlockHeaderWithProof`: decode it, check the
  ## header against `id` (block hash or block number) and verify the proof
  ## against the accumulator. Logs a warning and returns false on the first
  ## failing step.
  let headerWithProof = decodeSsz(content, BlockHeaderWithProof).valueOr:
    warn "Failed decoding header with proof", error = error
    return false

  let header = validateBlockHeaderBytes(headerWithProof.header.asSeq(), id).valueOr:
    warn "Invalid block header offered", error = error
    return false

  let res = n.verifyHeader(header, headerWithProof.proof)
  if res.isErr():
    warn "Failed on check if header is part of canonical chain", error = res.error
    return false

  true

proc validateContent(
    n: HistoryNetwork, content: seq[byte], contentKey: ContentKeyByteList
): Future[bool] {.async: (raises: [CancelledError]).} =
  ## Validate one offered content item against its content key.
  ##
  ## Returns false when the key cannot be decoded or the content fails
  ## validation for its content type. Block bodies and receipts are validated
  ## against the verified header of their block, which may itself need to be
  ## fetched from the network.
  let key = contentKey.decode().valueOr:
    return false

  case key.contentType
  of blockHeader:
    return n.validateOfferedHeader(content, key.blockHeaderKey.blockHash)
  of blockBody:
    let header = (await n.getVerifiedBlockHeader(key.blockBodyKey.blockHash)).valueOr:
      warn "Failed getting canonical header for block"
      return false

    let res = validateBlockBodyBytes(content, header)
    if res.isErr():
      warn "Failed validating block body", error = res.error
      return false
    else:
      return true
  of receipts:
    let header = (await n.getVerifiedBlockHeader(key.receiptsKey.blockHash)).valueOr:
      warn "Failed getting canonical header for receipts"
      return false

    let res = validateReceiptsBytes(content, header.receiptsRoot)
    if res.isErr():
      warn "Failed validating receipts", error = res.error
      return false
    else:
      return true
  of blockNumber:
    return n.validateOfferedHeader(content, key.blockNumberKey.blockNumber)
|
|
|
|
proc new*(
    T: type HistoryNetwork,
    portalNetwork: PortalNetwork,
    baseProtocol: protocol.Protocol,
    contentDB: ContentDB,
    streamManager: StreamManager,
    accumulator: FinishedHistoricalHashesAccumulator,
    historicalRoots: HistoricalRoots = loadHistoricalRoots(),
    bootstrapRecords: openArray[Record] = [],
    portalConfig: PortalProtocolConfig = defaultPortalProtocolConfig,
    contentRequestRetries = 1,
): T =
  ## Create a `HistoryNetwork` with its own `PortalProtocol` instance.
  ##
  ## An offered-content queue with room for 50 batches is created and
  ## registered as a new stream with `streamManager`. The protocol's get,
  ## store and radius handlers are created on top of `contentDB`. The
  ## background loops are not started here; see `start`.
  let queue = newAsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])](50)

  # Register the stream before constructing the protocol that uses it.
  let contentStream = streamManager.registerNewStream(queue)

  let portal = PortalProtocol.new(
    baseProtocol,
    getProtocolId(portalNetwork, PortalSubnetwork.history),
    toContentIdHandler,
    createGetHandler(contentDB),
    createStoreHandler(contentDB, portalConfig.radiusConfig),
    createRadiusHandler(contentDB),
    contentStream,
    bootstrapRecords,
    config = portalConfig,
  )

  HistoryNetwork(
    portalProtocol: portal,
    contentDB: contentDB,
    contentQueue: queue,
    accumulator: accumulator,
    historicalRoots: historicalRoots,
    contentRequestRetries: contentRequestRetries,
  )
|
|
|
|
proc validateContent(
    n: HistoryNetwork, contentKeys: ContentKeysList, contentItems: seq[seq[byte]]
): Future[bool] {.async: (raises: [CancelledError]).} =
  ## Validate a batch of offered content items against their content keys and
  ## store every item that validates.
  ##
  ## Items are processed in order. Processing stops at the first failing
  ## item and false is returned; items validated before it stay stored.
  ## Returns true when all items validated.

  # Content passed here can have fewer items than contentKeys, but not more.
  # Reject the batch explicitly instead of indexing past the end of the keys.
  if contentItems.len > contentKeys.len:
    error "Received more offered content items than content keys",
      contentItems = contentItems.len, contentKeys = contentKeys.len
    return false

  for i, contentItem in contentItems:
    let contentKey = contentKeys[i]
    if await n.validateContent(contentItem, contentKey):
      let contentId = n.portalProtocol.toContentId(contentKey).valueOr:
        error "Received offered content with invalid content key", contentKey
        return false

      n.portalProtocol.storeContent(contentKey, contentId, contentItem)

      info "Received offered content validated successfully", contentKey
    else:
      error "Received offered content failed validation", contentKey
      return false

  return true
|
|
|
|
proc processContentLoop(n: HistoryNetwork) {.async: (raises: []).} =
  ## Background loop: pop offered content batches from `contentQueue`,
  ## validate them and gossip the batches that validated. Runs until it is
  ## cancelled.
  try:
    while true:
      let (srcNodeId, contentKeys, contentItems) = await n.contentQueue.popFirst()

      # When there is one invalid content item, all other content items are
      # dropped and not gossiped around.
      # TODO: Differentiate between failures due to invalid data and failures
      # due to missing network data for validation.
      let accepted = await n.validateContent(contentKeys, contentItems)
      if not accepted:
        continue

      asyncSpawn n.portalProtocol.neighborhoodGossipDiscardPeers(
        srcNodeId, contentKeys, contentItems
      )
  except CancelledError:
    trace "processContentLoop canceled"
|
|
|
|
proc statusLogLoop(n: HistoryNetwork) {.async: (raises: []).} =
  ## Background loop: log the routing table size, then sleep 60 seconds.
  ## Runs until it is cancelled.
  let logInterval = 60.seconds
  try:
    while true:
      info "History network status",
        routingTableNodes = n.portalProtocol.routingTable.len()

      await sleepAsync(logInterval)
  except CancelledError:
    trace "statusLogLoop canceled"
|
|
|
|
proc start*(n: HistoryNetwork) =
  ## Start the history network: start the Portal protocol, launch the
  ## content processing and status logging loops, then prune deprecated
  ## accumulator records from the content database.
  info "Starting Portal execution history network",
    protocolId = n.portalProtocol.protocolId,
    accumulatorRoot = hash_tree_root(n.accumulator)

  n.portalProtocol.start()

  # Keep the loop futures so that `stop` can cancel them.
  n.processContentLoop = processContentLoop(n)
  n.statusLogLoop = statusLogLoop(n)

  pruneDeprecatedAccumulatorRecords(n.accumulator, n.contentDB)
|
|
|
|
proc stop*(n: HistoryNetwork) {.async: (raises: []).} =
  ## Stop the history network: stop the Portal protocol and cancel the
  ## background loops, wait for all of them to finish, then clear the loop
  ## references.
  info "Stopping Portal execution history network"

  var pending: seq[Future[void]]
  pending.add(n.portalProtocol.stop())

  # The loops are only set once `start` has been called.
  for loop in [n.processContentLoop, n.statusLogLoop]:
    if not loop.isNil:
      pending.add(loop.cancelAndWait())

  await noCancel(allFutures(pending))

  n.processContentLoop = nil
  n.statusLogLoop = nil
|