Squashed snap-sync-preview patch (#1076)

* Squashed snap-sync-preview patch

why:
  Providing end results makes it easier to have an overview.

  Collected patch set comments are available as nimbus/sync/ChangeLog.md
  in chronological order, oldest first.

* Removed some cruft and obsolete imports, normalised logging
Jordan Hrycaj 2022-05-09 15:04:48 +01:00 committed by GitHub
parent 69366e1880
commit 58e0543920
22 changed files with 3677 additions and 78 deletions

View File

@ -8,7 +8,7 @@
# those terms.
import
std/[tables, strutils,sequtils, options, times],
std/[tables, strutils, options, times],
eth/[common, rlp, p2p], stint, stew/[byteutils],
nimcrypto/hash,
json_serialization, chronicles,

View File

@ -351,6 +351,11 @@ type
defaultValueDesc: $ProtocolFlag.Eth
name: "protocols" .}: seq[string]
newSync* {.
desc: "Enable experimental new sync algorithms"
defaultValue: false
name: "new-sync" .}: bool
case cmd* {.
command
defaultValue: NimbusCmd.noCommand }: NimbusCmd

View File

@ -29,7 +29,7 @@ import
./p2p/[chain, blockchain_sync],
./p2p/clique/[clique_desc, clique_sealer],
./rpc/[common, debug, engine_api, jwt_auth, p2p],
./sync/protocol_ethxx,
./sync/[protocol_ethxx, protocol_snapxx, newsync],
./utils/tx_pool
when defined(evmc_enabled):
@ -124,6 +124,7 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf,
# Add protocol capabilities based on protocol flags
if ProtocolFlag.Eth in protocols:
nimbus.ethNode.addCapability eth
nimbus.ethNode.addCapability snap1
if ProtocolFlag.Les in protocols:
nimbus.ethNode.addCapability les
@ -135,15 +136,20 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf,
nimbus.chainRef.extraValidation = 0 < verifyFrom
nimbus.chainRef.verifyFrom = verifyFrom
# Early-initialise "--new-sync" before starting any network connections.
if ProtocolFlag.Eth in protocols and conf.newSync:
newSyncEarly(nimbus.ethNode)
# Connect directly to the static nodes
let staticPeers = conf.getStaticPeers()
for enode in staticPeers:
asyncCheck nimbus.ethNode.peerPool.connectToNode(newNode(enode))
asyncSpawn nimbus.ethNode.peerPool.connectToNode(newNode(enode))
# Start Eth node
if conf.maxPeers > 0:
nimbus.networkLoop = nimbus.ethNode.connectToNetwork(
enableDiscovery = conf.discovery != DiscoveryType.None)
enableDiscovery = conf.discovery != DiscoveryType.None,
waitForPeers = not conf.newSync)
proc localServices(nimbus: NimbusNode, conf: NimbusConf,
chainDB: BaseChainDB, protocols: set[ProtocolFlag]) =
@ -319,7 +325,14 @@ proc start(nimbus: NimbusNode, conf: NimbusConf) =
localServices(nimbus, conf, chainDB, protocols)
if ProtocolFlag.Eth in protocols and conf.maxPeers > 0:
nimbus.syncLoop = nimbus.ethNode.fastBlockchainSync()
# previously: nimbus.syncLoop = nimbus.ethNode.fastBlockchainSync()
# TODO: temp code until the CLI/RPC interface is fleshed out
if not conf.newSync:
let status = waitFor nimbus.ethNode.fastBlockchainSync()
if status != syncSuccess:
debug "Block sync failed: ", status
else:
newSync()
if nimbus.state == Starting:
# it might have been set to "Stopping" with Ctrl+C

View File

@ -253,24 +253,25 @@ proc obtainBlocksFromPeer(syncCtx: SyncContext, peer: Peer) {.async.} =
var dataReceived = false
try:
tracePacket ">> Sending eth.GetBlockHeaders (0x03)", peer,
startBlock=request.startBlock.number, max=request.maxResults
startBlock=request.startBlock.number, max=request.maxResults,
step=traceStep(request)
let results = await peer.getBlockHeaders(request)
if results.isSome:
tracePacket "<< Got reply eth.BlockHeaders (0x04)", peer,
count=results.get.headers.len
count=results.get.headers.len, requested=request.maxResults
shallowCopy(workItem.headers, results.get.headers)
var bodies = newSeqOfCap[BlockBody](workItem.headers.len)
var hashes = newSeqOfCap[KeccakHash](maxBodiesFetch)
template fetchBodies() =
tracePacket ">> Sending eth.GetBlockBodies (0x05)", peer,
count=hashes.len
hashes=hashes.len
let b = await peer.getBlockBodies(hashes)
if b.isNone:
raise newException(CatchableError, "Was not able to get the block bodies")
let bodiesLen = b.get.blocks.len
tracePacket "<< Got reply eth.BlockBodies (0x06)", peer,
count=bodiesLen
count=bodiesLen, requested=hashes.len
if bodiesLen == 0:
raise newException(CatchableError, "Zero block bodies received for request")
elif bodiesLen < hashes.len:
@ -369,7 +370,7 @@ proc startSyncWithPeer(ctx: SyncContext, peer: Peer) {.async.} =
# We have enough trusted peers. Validate new peer against trusted
if await peersAgreeOnChain(peer, ctx.randomTrustedPeer()):
ctx.trustedPeers.incl(peer)
asyncCheck ctx.obtainBlocksFromPeer(peer)
asyncSpawn ctx.obtainBlocksFromPeer(peer)
elif ctx.trustedPeers.len == 0:
# Assume the peer is trusted, but don't start sync until we reevaluate
# it with more peers
@ -405,7 +406,7 @@ proc startSyncWithPeer(ctx: SyncContext, peer: Peer) {.async.} =
if ctx.trustedPeers.len == minPeersToStartSync:
for p in ctx.trustedPeers:
asyncCheck ctx.obtainBlocksFromPeer(p)
asyncSpawn ctx.obtainBlocksFromPeer(p)
proc onPeerConnected(ctx: SyncContext, peer: Peer) =

View File

@ -49,6 +49,8 @@ type
blockZeroHash: KeccakHash ##\
## Overload cache for `genesisHash()` method
blockZeroStateRoot: KeccakHash
extraValidation: bool ##\
## Trigger extra validation, currently within `persistBlocks()`
## function only.
@ -143,7 +145,9 @@ func calculateForkIds(c: ChainConfig,
proc setForkId(c: Chain)
{. raises: [Defect,CatchableError].} =
c.blockZeroHash = c.db.toGenesisHeader.blockHash
let blockZero = c.db.toGenesisHeader
c.blockZeroHash = blockZero.blockHash
c.blockZeroStateRoot = blockZero.stateRoot
let genesisCRC = crc32(0, c.blockZeroHash.data)
c.forkIds = calculateForkIds(c.db.config, genesisCRC)
@ -210,6 +214,10 @@ method genesisHash*(c: Chain): KeccakHash {.gcsafe.} =
## Getter: `AbstractChainDB` overload method
c.blockZeroHash
method genesisStateRoot*(c: Chain): KeccakHash {.gcsafe, base.} =
## Getter: `AbstractChainDB` overloadable base method
c.blockZeroStateRoot
method getBestBlockHeader*(c: Chain): BlockHeader
{.gcsafe, raises: [Defect,CatchableError].} =
## Getter: `AbstractChainDB` overload method

View File

@ -12,10 +12,10 @@ import
stew/[objects, results, byteutils],
json_rpc/[rpcserver, errors],
web3/[conversions, engine_api_types], chronicles,
eth/[trie, rlp, common, trie/db],
eth/[rlp, common],
".."/db/db_chain,
".."/p2p/chain/[chain_desc, persist_blocks],
".."/[sealer, utils, constants],
".."/[sealer, constants],
".."/merge/[mergetypes, mergeutils]
import eth/common/eth_types except BlockHeader

View File

@ -28,8 +28,7 @@ import
clique_sealer],
./p2p/[gaslimit, validate],
"."/[chain_config, utils, context],
"."/utils/tx_pool,
"."/merge/mergetypes
"."/utils/tx_pool
from web3/ethtypes as web3types import nil
from web3/engine_api_types import PayloadAttributesV1, ExecutionPayloadV1

nimbus/sync/ChangeLog.md (new file, 456 lines)
View File

@ -0,0 +1,456 @@
# Collected change log from Jamie's snap branch squash merge
The comments are collected in chronological order, oldest first (as opposed to
squash merge order, which is oldest last).
## Sync: Rapidly find and track peer canonical heads
First component of new sync approach.
This module fetches and tracks the canonical chain head of each connected
peer. (Or in future, each peer we care about; we won't poll them all so
often.)
This is for when we aren't sure of the block number of a peer's canonical
chain head. Most of the time, after finding which block, it quietly polls
to track small updates to the "best" block number and hash of each peer.
But sometimes that can get out of step: there may have been a reorg deeper
than our tracking window, a burst of more than a few new blocks, network
delays, downtime, or the peer may itself be syncing. Perhaps we stopped
Nimbus and restarted a while later, e.g. suspending a laptop or Control-Z. Then
this will catch up. It is even possible that the best hash the peer gave us
in the `Status` handshake has disappeared by the time we query for the
corresponding block number, so we start at zero.
The steps here perform a robust and efficient O(log N) search to rapidly
converge on the new best block if it's moved out of the polling window no
matter where it starts, confirm the peer's canonical chain head boundary,
then track the peer's chain head in real-time by polling. The method is
robust to peer state changes at any time.
The purpose is to:
- Help with finding a peer common chain prefix ("fast sync pivot") in a
consistent, fast and explicit way.
- Catch up quickly after any long pauses of network downtime, program not
running, or deep chain reorgs.
- Be able to display real-time peer states, so they are less mysterious.
- Tell the beam/snap/trie sync processes when to start and what blocks to
fetch, and keep those fetchers in the head-adjacent window of the
ever-changing chain.
- Help the sync process bootstrap usefully when we only have one peer,
speculatively fetching and validating what data we can before we have more
peers to corroborate the consensus.
- Help detect consensus failures in the network.
We cannot assume a peer's canonical chain stays the same or only gains new
blocks from one query to the next. There can be reorgs, including deep
reorgs. When a reorg happens, the best block number can decrease if the new
canonical chain is shorter than the old one, and the best block hash we
previously knew can become unavailable on the peer. So we must detect when
the current best block disappears and be able to reduce block number.
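
The convergence idea can be sketched independently of the peer and network
machinery. Below is a minimal, hedged illustration (plain `uint64` block
numbers and a `peerHasBlock` predicate standing in for a one-header
`GetBlockHeaders` query; neither name is the module's API): expand forward
exponentially until a block number is absent on the peer, then binary-search
the bracket, which converges in O(log N) of the distance moved.

```nim
proc findHead(peerHasBlock: proc(n: uint64): bool): uint64 =
  var lo = 0'u64              # known present: block zero always exists
  var step = 1'u64
  # Exponential expansion, cf. `syncHuntForwardExpandShift` in the module.
  while peerHasBlock(lo + step):
    lo += step
    step = step shl 4
  var hi = lo + step          # known absent
  # Quasi-binary convergence; invariant: `lo` present, `hi` absent.
  while hi - lo > 1:
    let mid = lo + (hi - lo) div 2
    if peerHasBlock(mid): lo = mid
    else: hi = mid
  result = lo                 # the peer's canonical head boundary

when isMainModule:
  let head = 1_234_567'u64
  echo findHead(proc(n: uint64): bool = n <= head)   # prints 1234567
```

The real module performs the same search with batched header queries and must
also cope with the head moving between polls.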
## Config: Add `--new-sync` option and use it
This option enables new blockchain sync and real-time consensus algorithms that
will eventually replace the old, very limited sync.
New sync is work in progress. It's included as an option rather than a code
branch, because it's more useful for testing this way, and must not conflict
anyway. It's off by default. Eventually this will become enabled by default
and the option will be removed.
## Tracing: New `traceGossips` category, tidy calls to other categories
- `traceGossips` has been added, because on some networks there are so many
transaction messages, it is difficult to see other activity.
- Adds a trace macro corresponding to each of the packet tracing categories
`traceTimeouts`, `traceNetworkErrors` and `tracePacketErrors`. Improves
readability of code using them, and API consistency.
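
As a rough illustration of the pattern (the flag and template names here are
assumptions, not the repo's definitions), each category pairs a switch with a
thin template that forwards to chronicles only when the switch is on:

```nim
import chronicles

const traceGossips = true   # stand-in for a build or config switch

template traceGossip(msg: static[string], args: varargs[untyped]) =
  if traceGossips:
    trace msg, args

# Usage at a call site might look like:
#   traceGossip "<< Got eth.Transactions", peer, count=transactions.len
```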
## Sync: Move `tracePacket` etc into sync_types.nim
Move the templates `tracePacket`, `traceGossip` , `traceTimeout`,
`traceNetworkError` and `tracePacketError` from protocol_eth65 to
sync_types.
The reason for moving is they are also needed for `snap` protocol calls.
## Sync: Chain head: Promote peer chain head updates to debug level
This way, you can see peer chain head updates at `--log-level:DEBUG` without
being flooded by trace messages.
These occur about once every 15 seconds from each good peer.
## Sync: Chain head: Rate limit "blocked overlapping" error states
Under some conditions when a peer is not responding (but stays connected),
these messages happen continuously. Don't output them and don't waste CPU
trying.
## Sync: Update protocol code to use `BlockHash`, `TxHash`, `NodeHash`
New hash type aliases added and used. They're not `distinct` because that
would be disruptive, but perhaps they will be eventually, when code is
harmonised around using them.
Changes:
- Use `BlockHash` more consistently, to match the rest of the sync code.
- Use `BlockNumber` where currently `uint64` is used in the protocol (and
`uint` was used before that, which was 32-bit on 32-bit targets).
- New alias `TxHash` is for transactions and is used in
`NewPooledTransactionHashes` and `GetPooledTransactions`.
- New alias `NodeHash` is for trie nodes (or contract bytecode)
and is used in `GetNodeData`.
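
For orientation, the aliases amount to something like the following sketch
(assuming `Hash256` from `eth/common` as the underlying type; the real
definitions live in the sync types module and, as noted, are not `distinct`):

```nim
import eth/common/eth_types

type
  BlockHash* = Hash256   # hash of a block, goes together with `BlockNumber`
  TxHash*    = Hash256   # hash of a transaction
  NodeHash*  = Hash256   # hash of a trie node or contract bytecode
```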
## Sync: Set and update `syncStateRoot` for each peer
State syncing requires the `stateRoot` value of the selected block to sync to.
The chain head tracker selects a block and uses `block.stateRoot`. State sync
reads that value to sync to. It can change at any time, but that's ok, the
state sync algorithm is designed around that idea.
Aside from getting an initial `stateRoot`, the regular updates are essential
because state sync is so slow.
On Mainnet, it is normal for the initial selected block to become too old
before state sync is complete, and then peers stop providing data in their
replies. The solution is for `stateRoot` to be updated by the chain head
tracker so it's always recent enough. (On Goerli, with a fast peer, we can fetch
the whole state just in time without this.)
There are a number of issues with the simple implementation here:
- The selected `stateRoot` block shouldn't be the most recent canonical head,
because it is prone to change due to small reorgs. It should be a more stable
block choice, slightly further back in time.
However, any block close to the head is reasonably harmless during the state
"snap" phase. Small block differences cause a small state delta, which are
patched automatically during "heal" traversals.
- During the state "heal" phase, `stateRoot` should not be updated on every
block change, because it disrupts the "heal" traversal when this happens.
It should be kept the same for longer, but not too long because the `snap/1`
protocol does not provide state older than 128 blocks ago.
So during "heal", `stateRoot` should be updated roughly every N blocks where
N is close to 128, except when the heal is disrupted due to chain reorgs
taking place or other loss of available state from the peer.
- During the state "heal" phase, `stateRoot` must be coordinated among all
the peers. This is because "heal" converges a patchwork of states from
different times into a unified point-in-time whole state, so that execution
can proceed using entirely local data from there.
## Sync: Add `genesisStateRoot` for state syncing
State syncing requires the `stateRoot` value of the selected block to sync to.
Normally the chain head tracker selects a block and uses `block.stateRoot`.
However, in some cases in test environments, the chain head tracker finds the
sync block is 0, the genesis block, without receiving that block from a peer.
Of course this only happens when connecting to peers that are on block 0
themselves, but it can happen and must be handled.
Perhaps we should not run state sync on block 0, and instead use the local trie.
But getting the correct "flat" or "snap sync" style representation from it
requires special code.
In order to exercise the state sync code and see how peers behave when block 0
is selected, and avoid special code, use the genesis `stateRoot` found locally,
and sync that state from peers like any other.
## Sync: New types `LeafPath`, `InteriorPath` and support functions
`InteriorPath` is a path to an interior node in an Ethereum hexary trie. This
is a sequence of 0 to 64 hex digits. 0 digits means the root node, and 64
digits means a leaf node whose path hasn't been converted to `LeafPath` yet.
`LeafPath` is a path to a leaf in an Ethereum hexary trie. Individually, each
leaf path is a hash, but rather than being the hash of the contents, it's the
hash of the item's address. Collectively, these hashes have some 256-bit
numerical properties: ordering, intervals and meaningful difference.
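
A hedged sketch of what such types can look like (representation and field
names are assumptions, not the module's actual layout):

```nim
import stint

type
  InteriorPath = object
    ## Path to an interior node: 0 to 64 hex digits. Zero digits is the
    ## root; 64 digits is a leaf not yet converted to `LeafPath`.
    nibbles: array[64, byte]   # one hex digit (0..15) per used entry
    depth: int                 # number of digits in use

  LeafPath = object
    ## Path to a leaf: the 256-bit hash of the item's address, so leaf
    ## paths can be ordered, subtracted and split into ranges.
    number: UInt256
```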
## Sync: Add `onGetNodeData`, `onNodeData` to `eth/65` protocol handler
These hooks allow new sync code to register to provide reply data or consume
incoming events without a circular import dependency involving `p2pProtocol`.
Without the hooks, the protocol file needs to import functions that consume
incoming network messages so the `p2pProtocol` can call them, and the functions
that produce outgoing network messages need to import the protocol file.
But related producer/consumer function pairs are typically located in the same
file because they are closely related. For example the producer of
`GetNodeData` and the consumer of `NodeData`.
In this specific case, we also need to break the `requestResponse` relationship
between `GetNodeData` and `NodeData` messages when pipelining.
There are other ways to accomplish this, but this way is most practical, and
it allows different protocol-using modules to coexist easily. When the hooks
aren't set, default behaviour is fine.
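
A rough sketch of the hooks' shape (type and field names are illustrative,
not the protocol handler's actual declarations): the protocol state carries
optional callbacks, and the `p2pProtocol` message handlers invoke them when
set, falling back to the default behaviour when they are `nil`.

```nim
import eth/common/eth_types   # Blob, Hash256

type
  NodeHash = Hash256
  GetNodeDataHook = proc(hashes: seq[NodeHash]): seq[Blob] {.gcsafe.}
  NodeDataHook = proc(data: seq[Blob]) {.gcsafe.}

  EthProtocolState = ref object
    onGetNodeData: GetNodeDataHook   # produces reply data; nil = default
    onNodeData: NodeDataHook         # consumes incoming data; nil = ignore
```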
## Sync: Robust support for `GetNodeData` network calls
This module provides an async function to call `GetNodeData`, a request in
the Ethereum DevP2P/ETH network protocol. Parallel requests can be issued,
maintaining a pipeline.
Given a list of hashes, it returns a list of trie nodes or contract
bytecodes matching those hashes. The returned nodes may be any subset of
those requested, including an empty list. The returned nodes are not
necessarily in the same order as the request, so a mapping from request
items to node items is included. On timeout or error, `nil` is returned.
Only data passing hash verification is returned, so hashes don't need to be
verified again. No exceptions are raised, and no detail is returned about
timeouts or errors, but systematically formatted trace messages are output
if enabled, and show in detail if various events occur such as timeouts,
bad hashes, mixed replies, network errors, etc.
This tracks queued requests and individual request hashes, verifies received
node data hashes, and matches them against requests. When a peer replies in
same order as requests are sent, and each reply contains nodes in the same
order as requested, the matching process is efficient. It avoids storing
request details in a hash table when possible. If replies or nodes are out
of order, the process is still efficient but has to do a little more work.
Empty replies:
Empty replies are matched with requests using a queue draining technique.
After an empty reply is received, we temporarily pause further requests and
wait for more replies. After we receive all outstanding replies, we know
which requests the empty replies were for, and can complete those requests.
Eth/66 protocol:
Although Eth/66 simplifies by matching replies to requests, replies can still
have data out of order or missing, so hashes still need to be verified and
looked up. Much of the code here is still required for Eth/66.
References:
- [Ethereum Wire Protocol (ETH)](https://github.com/ethereum/devp2p/blob/master/caps/eth.md)
- [`GetNodeData` (0x0d)](https://github.com/ethereum/devp2p/blob/master/caps/eth.md#getnodedata-0x0d)
- [`NodeData` (0x0e)](https://github.com/ethereum/devp2p/blob/master/caps/eth.md#nodedata-0x0e)
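
The pipelining aspect can be shown in isolation (a toy sketch with a dummy
request function; `fakeRequest` is not the module's API): requests go out
back to back without awaiting each reply, and the futures are then awaited
in turn, so the peer link stays busy.

```nim
import chronos

proc fakeRequest(id: int): Future[seq[int]] {.async.} =
  await sleepAsync(10.milliseconds)   # stands in for the network round trip
  return @[id]                        # stands in for hash-verified node data

proc pipelined() {.async.} =
  var pending: seq[Future[seq[int]]]
  for id in 0 ..< 4:
    pending.add fakeRequest(id)       # issue without awaiting: keep a pipeline
  for fut in pending:
    let reply = await fut             # consume replies in request order
    echo reply

waitFor pipelined()
```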
## Sync: Robustly parse trie nodes from network untrusted data
This module parses hexary trie nodes as used by Ethereum from data received
over the network. The data is untrusted, and a non-canonical RLP encoding
of the node must be rejected, so it is parsed carefully.
The caller provides bytes and context. Context includes node hash, trie
path, and a boolean saying if this trie node is child of an extension node.
The result of parsing is up to 16 child node hashes to follow, or up to 7
leaf nodes to decode.
The caller should ensure the bytes are verified against the hash before
calling this parser. Even though they pass the hash, they are still
untrusted bytes that must be parsed carefully, because the hash itself is
from an untrusted source.
`RlpError` exceptions may occur on some well-crafted adversarial input
due to the RLP reader implementation. They could be trapped and treated
like other parse errors, but they are not, to avoid the overhead of
`try..except` in the parser (which uses C `setjmp`). The caller should
put `try..except RlpError` outside its trie node parsing loop.
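
To make the last point concrete, here is a hedged sketch of the calling
pattern (the `parseTrieNode` stub is illustrative, not the real parser): the
`try..except RlpError` sits outside the per-node loop, so the hot loop itself
carries no `setjmp` overhead.

```nim
import eth/rlp   # for RlpError

proc parseTrieNode(nodeBytes: seq[byte]) {.raises: [RlpError].} =
  # Stand-in for the careful parser; it may raise RlpError on some
  # well-crafted adversarial input.
  if nodeBytes.len == 0:
    raise newException(RlpError, "empty trie node")

proc parseAll(nodes: seq[seq[byte]]) =
  try:
    for nodeBytes in nodes:        # hot loop: no try..except inside
      parseTrieNode(nodeBytes)
  except RlpError as e:
    # Treated like any other parse error from untrusted input.
    echo "bad trie node from peer: ", e.msg
```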
### Path range metadata benefits
Because data is handled in path ranges, this allows a compact metadata
representation of what data is stored locally and what isn't, compared with
the size of a representation of partially completed trie traversal with
`eth` `GetNodeData`. Due to the smaller metadata, after aborting a partial
sync and restarting, it is possible to resume quickly, without waiting for
the very slow local database scan associated with older versions of Geth.
However, Nimbus's sync method uses this principle as inspiration to
obtain similar metadata benefits whichever network protocol is used.
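
As a hedged illustration of the metadata idea (names and structure are
assumptions, not the repo's): the locally completed data can be summarised as
a short list of inclusive leaf-path ranges, which is cheap to persist and
makes resuming after a restart trivial compared with replaying a partial trie
traversal.

```nim
import stint

type
  PathRange = tuple[first, last: UInt256]   # inclusive leaf-path bounds

func isCovered(done: seq[PathRange], path: UInt256): bool =
  ## True if `path` lies inside a range that has already been fetched.
  for r in done:
    if path >= r.first and path <= r.last:
      return true
  # Falling through leaves `result` at its default, `false`.
```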
### Distributed hash table (DHT) building block
Although `snap` was designed for bootstrapping clients with the entire
Ethereum state, it is well suited to fetching only a subset of path ranges.
This may be useful for bootstrapping distributed hash tables (DHTs).
### Remote state and Beam sync benefits
`snap` was not intended for Beam sync, or "remote state on demand", used by
transactions executing locally that fetch state from the network instead of
local storage.
Even so, as a lucky accident `snap` allows individual states to be fetched
in fewer network round trips than `eth`. Often a single round trip,
compared with about 10 round trips per account query over `eth`. This is
because `eth` `GetNodeData` requires a trie traversal chasing hashes
sequentially, while `snap` `GetTrieNode` trie traversal can be done with
predictable paths.
Therefore `snap` can be used to accelerate remote states and Beam sync.
### Performance benefits
`snap` is used for much higher performance transfer of the entire Ethereum
execution state (accounts, storage, bytecode) compared with hexary trie
traversal using `eth` `GetNodeData`.
It improves both network and local storage performance. The benefits are
substantial, and summarised here:
- [Ethereum Snapshot Protocol (SNAP) - Expected results]
(https://github.com/ethereum/devp2p/blob/master/caps/snap.md)
- [Geth v1.10.0 - Snap sync]
(https://blog.ethereum.org/2021/03/03/geth-v1-10-0/#snap-sync)
In the Snap sync model, local storage benefits require clients to adopt a
different representation of Ethereum state than the trie storage that Geth
(and most clients) traditionally used, and still do in archive mode.
However, Nimbus's sync method obtains similar local storage benefits
whichever network protocol is used. Nimbus uses the `snap` protocol because
it is a more efficient network protocol.
## Sync: Changes to `snap/1` protocol to match Geth parameters
The `snap/1` specification doesn't match reality. If we implement the
protocol as specified, Geth drops the peer connection. We must do as Geth
expects.
- `GetAccountRanges` and `GetStorageRanges` take parameters `origin` and
`limit`, instead of a single `startingHash` parameter in the
specification. `origin` and `limit` are 256-bit paths representing the
starting hash and ending trie path, both inclusive.
- If the `snap/1` specification is followed (omitting `limit`), Geth 1.10
disconnects immediately so we must follow this deviation.
- Results from either call may include one item with path `>= limit`. Geth
fetches data from its internal database until it reaches this condition or
the bytes threshold, then replies with what it fetched. Usually there is
no item at the exact path `limit`, so there is one after.
## Sync: Ethereum Snapshot Protocol (SNAP), version 1
This patch adds the `snap/1` protocol, documented at:
- [Ethereum Snapshot Protocol (SNAP)]
(https://github.com/ethereum/devp2p/blob/master/caps/snap.md).
This is just the protocol handlers, not the sync algorithm.
## Sync: Changes to `snap/1` protocol to match Geth `GetStorageRanges`
The main part of this change is to add a comment documenting quirky behaviour of
`GetStorageRanges` with Geth, and workarounds for the missing right-side proof.
The code change is smaller, but it does change the type of parameters `origin`
and `limit` to `GetStorageRanges`. Trace messages are updated accordingly.
When calling a Geth peer with `GetStorageRanges`:
- Parameters `origin` and `limit` may each be empty blobs, which mean "all
zeros" (0x00000...) or "no limit" (0xfffff...) respectively.
(Blobs shorter than 32 bytes can also be given, and they are extended with
zero bytes; longer than 32 bytes can be given and are truncated, but this
is Geth being too accepting, and shouldn't be used.)
- In the `slots` reply, the last account's storage list may be empty even if
that account has non-empty storage.
This happens when the bytes threshold is reached just after finishing
storage for the previous account, or when `origin` is greater than the
first account's last storage slot. When either of these happens, `proof`
is non-empty. In the case of `origin` zero or empty, the non-empty proof
only contains the left-side boundary proof, because it meets the condition
for omitting the right-side proof described in the next point.
- In the `proof` reply, the right-side boundary proof is only included if
the last returned storage slot has non-zero path and `origin != 0`, or if
the result stops due to reaching the bytes threshold.
Because there's only one proof anyway if left-side and right-side are the
same path, this works out to mean the right-side proof is omitted in cases
where `origin == 0` and the result stops at a slot `>= limit` before
reaching the bytes threshold.
Although the specification doesn't say anything about `limit`, this is
against the spirit of the specification rule, which says the right-side
proof is always included if the last returned path differs from the
starting hash.
The omitted right-side proof can cause problems when using `limit`.
In other words, when doing range queries, or merging results from
pipelining where different `stateRoot` hashes are used as time progresses.
Workarounds:
- Fetch the proof using a second `GetStorageRanges` query with non-zero
`origin` (perhaps equal to `limit`; use `origin = 1` if `limit == 0`), as
sketched after this list.
- Avoid the condition by using `origin >= 1` when using `limit`.
- Use trie node traversal (`snap` `GetTrieNodes` or `eth` `GetNodeData`)
to obtain the omitted proof.
- When multiple accounts are requested with `origin > 0`, only one account's
storage is returned. There is no point requesting multiple accounts with
`origin > 0`. (It might be useful if it treated `origin` as applying to
only the first account, but it doesn't.)
- When multiple accounts are requested with non-default `limit` and
`origin == 0`, and the first account result stops at a slot `>= limit`
before reaching the bytes threshold, storage for the other accounts in the
request are returned as well. The other accounts are not limited by
`limit`, only the bytes threshold. The right-side proof is omitted from
`proof` when this happens, because this is the same condition as described
earlier for omitting the right-side proof. (It might be useful if it
treated `origin` as applying to only the first account and `limit` to only
the last account, but it doesn't.)
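
Referring to the first workaround above, the `origin` of the follow-up query
is easy to pin down (a tiny, hedged helper, not the repo's code):

```nim
import stint

func proofQueryOrigin(limit: UInt256): UInt256 =
  ## Non-zero `origin` for a second `GetStorageRanges` call that fetches
  ## the omitted right-side proof: use `limit`, or 1 when `limit == 0`.
  result = if limit.isZero: 1.u256 else: limit
```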

View File

@ -0,0 +1,545 @@
# Nimbus - Rapidly converge on and track the canonical chain head of each peer
#
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed except according to those terms.
## This module fetches and tracks the canonical chain head of each connected
## peer. (Or in future, each peer we care about; we won't poll them all so
## often.)
##
## This is for when we aren't sure of the block number of a peer's canonical
## chain head. Most of the time, after finding which block, it quietly polls
## to track small updates to the "best" block number and hash of each peer.
##
## But sometimes that can get out of step. If there has been a deeper reorg
## than our tracking window, or a burst of more than a few new blocks, network
## delays, downtime, or the peer is itself syncing. Perhaps we stopped Nimbus
## and restarted a while later, e.g. suspending a laptop or Control-Z. Then
## this will catch up. It is even possible that the best hash the peer gave us
## in the `Status` handshake has disappeared by the time we query for the
## corresponding block number, so we start at zero.
##
## The steps here perform a robust and efficient O(log N) search to rapidly
## converge on the new best block if it's moved out of the polling window no
## matter where it starts, confirm the peer's canonical chain head boundary,
## then track the peer's chain head in real-time by polling. The method is
## robust to peer state changes at any time.
##
## The purpose is to:
##
## - Help with finding a peer common chain prefix ("fast sync pivot") in a
## consistent, fast and explicit way.
##
## - Catch up quickly after any long pauses of network downtime, program not
## running, or deep chain reorgs.
##
## - Be able to display real-time peer states, so they are less mysterious.
##
## - Tell the beam/snap/trie sync processes when to start and what blocks to
## fetch, and keep those fetchers in the head-adjacent window of the
## ever-changing chain.
##
## - Help the sync process bootstrap usefully when we only have one peer,
## speculatively fetching and validating what data we can before we have more
## peers to corroborate the consensus.
##
## - Help detect consensus failures in the network.
##
## We cannot assume a peer's canonical chain stays the same or only gains new
## blocks from one query to the next. There can be reorgs, including deep
## reorgs. When a reorg happens, the best block number can decrease if the new
## canonical chain is shorter than the old one, and the best block hash we
## previously knew can become unavailable on the peer. So we must detect when
## the current best block disappears and be able to reduce block number.
{.push raises: [Defect].}
import
std/bitops,
chronos, stint, chronicles, stew/byteutils,
eth/[common/eth_types, rlp, p2p],
eth/p2p/[rlpx, private/p2p_types],
../p2p/chain/chain_desc,
"."/[sync_types, protocol_ethxx, pie/slicer]
const
syncLockedMinimumReply = 8
## Minimum number of headers we assume any peers will send if they have
## them in contiguous ascending queries. Fewer than this confirms we have
## found the peer's canonical chain head boundary. Must be at least 2, and
## at least `syncLockedQueryOverlap+2` to stay `SyncLocked` when the chain
## extends. Should not be large as that would be stretching assumptions
## about peer implementations. 8 is chosen as it allows 3-deep extensions
## and 3-deep reorgs to be followed in a single round trip.
syncLockedQueryOverlap = 4
## Number of headers to re-query on each poll when `SyncLocked` so that we
## get small reorg updates in one round trip. Must be no more than
## `syncLockedMinimumReply-1`, no more than `syncLockedMinimumReply-2` to
## stay `SyncLocked` when the chain extends, and not too large to avoid
## excessive duplicate fetching. 4 is chosen as it allows 3-deep reorgs
## to be followed in single round trip.
syncLockedQuerySize = 192
## Query size when polling `SyncLocked`. Must be at least
## `syncLockedMinimumReply`. Large is fine, if we get a large reply the
## values are almost always useful.
syncHuntQuerySize = 16
## Query size when hunting for canonical head boundary. Small is good
## because we don't want to keep most of the headers at hunt time.
syncHuntForwardExpandShift = 4
## Expansion factor during `SyncHuntForward` exponential search.
## 16 is chosen for rapid convergence when bootstrapping or catching up.
syncHuntBackwardExpandShift = 1
## Expansion factor during `SyncHuntBackward` exponential search.
## 2 is chosen for better convergence when tracking a chain reorg.
doAssert syncLockedMinimumReply >= 2
doAssert syncLockedMinimumReply >= syncLockedQueryOverlap + 2
doAssert syncLockedQuerySize <= maxHeadersFetch
doAssert syncHuntQuerySize >= 1 and syncHuntQuerySize <= maxHeadersFetch
doAssert syncHuntForwardExpandShift >= 1 and syncHuntForwardExpandShift <= 8
doAssert syncHuntBackwardExpandShift >= 1 and syncHuntBackwardExpandShift <= 8
proc clearSyncStateRoot(sp: SyncPeer) =
if sp.syncStateRoot.isSome:
debug "Sync: Stopping state sync from this peer", peer=sp
sp.syncStateRoot = none(TrieHash)
proc setSyncStateRoot(sp: SyncPeer, blockNumber: BlockNumber,
blockHash: BlockHash, stateRoot: TrieHash) =
if sp.syncStateRoot.isNone:
debug "Sync: Starting state sync from this peer", peer=sp,
`block`=blockNumber, blockHash=($blockHash), stateRoot=($stateRoot)
elif sp.syncStateRoot.unsafeGet != stateRoot:
trace "Sync: Adjusting state sync root from this peer", peer=sp,
`block`=blockNumber, blockHash=($blockHash), stateRoot=($stateRoot)
sp.syncStateRoot = some(stateRoot)
if not sp.startedFetch:
sp.startedFetch = true
trace "Sync: Starting to download block state", peer=sp,
`block`=blockNumber, blockHash=($blockHash), stateRoot=($stateRoot)
asyncSpawn sp.stateFetch()
proc traceSyncLocked(sp: SyncPeer, bestNumber: BlockNumber,
bestHash: BlockHash) =
## Trace messages when peer canonical head is confirmed or updated.
if sp.syncMode != SyncLocked:
debug "Sync: Now tracking chain head of peer",
`block`=bestNumber, blockHash=($bestHash), peer=sp
elif bestNumber > sp.bestBlockNumber:
if bestNumber == sp.bestBlockNumber + 1:
debug "Sync: Peer chain head advanced one block", peer=sp,
advance=1, `block`=bestNumber, blockHash=($bestHash)
else:
debug "Sync: Peer chain head advanced some blocks", peer=sp,
advance=(bestNumber - sp.bestBlockNumber),
`block`=bestNumber, blockHash=($bestHash)
elif bestNumber < sp.bestBlockNumber or bestHash != sp.bestBlockHash:
debug "Sync: Peer chain head reorg detected", peer=sp,
advance=(sp.bestBlockNumber - bestNumber),
`block`=bestNumber, blockHash=($bestHash)
proc setSyncLocked(sp: SyncPeer, bestNumber: BlockNumber,
bestHash: BlockHash) =
## Actions to take when peer canonical head is confirmed or updated.
sp.traceSyncLocked(bestNumber, bestHash)
sp.bestBlockNumber = bestNumber
sp.bestBlockHash = bestHash
sp.syncMode = SyncLocked
proc setHuntBackward(sp: SyncPeer, lowestAbsent: BlockNumber) =
## Start exponential search mode backward due to new uncertainty.
sp.syncMode = SyncHuntBackward
sp.huntStep = 0
# Block zero is always present.
sp.huntLow = 0.toBlockNumber
# Zero `lowestAbsent` is never correct, but an incorrect peer could send it.
sp.huntHigh = if lowestAbsent > 0: lowestAbsent else: 1.toBlockNumber
sp.clearSyncStateRoot()
proc setHuntForward(sp: SyncPeer, highestPresent: BlockNumber) =
## Start exponential search mode forward due to new uncertainty.
sp.syncMode = SyncHuntForward
sp.huntStep = 0
sp.huntLow = highestPresent
sp.huntHigh = high(BlockNumber)
sp.clearSyncStateRoot()
proc updateHuntAbsent(sp: SyncPeer, lowestAbsent: BlockNumber) =
## Converge uncertainty range backward.
if lowestAbsent < sp.huntHigh:
sp.huntHigh = lowestAbsent
# If uncertainty range has moved outside the search window, change to hunt
# backward to block zero. Note that empty uncertainty range is allowed
# (empty range is `huntLow + 1 == huntHigh`).
if sp.huntHigh <= sp.huntLow:
sp.setHuntBackward(lowestAbsent)
sp.clearSyncStateRoot()
proc updateHuntPresent(sp: SyncPeer, highestPresent: BlockNumber) =
## Converge uncertainty range forward.
if highestPresent > sp.huntLow:
sp.huntLow = highestPresent
# If uncertainty range has moved outside the search window, change to hunt
# forward to no upper limit. Note that empty uncertainty range is allowed
# (empty range is `huntLow + 1 == huntHigh`).
if sp.huntLow >= sp.huntHigh:
sp.setHuntForward(highestPresent)
sp.clearSyncStateRoot()
proc peerSyncChainEmptyReply(sp: SyncPeer, request: BlocksRequest) =
## Handle empty `GetBlockHeaders` reply. This means `request.startBlock` is
## absent on the peer. If it was `SyncLocked` there must have been a reorg
## and the previous canonical chain head has disappeared. If hunting, this
## updates the range of uncertainty.
# Treat empty response to a request starting from block 1 as equivalent to
# length 1 starting from block 0 in `peerSyncChainNonEmptyReply`. We treat
# every peer as if it would send genesis for block 0, without asking for it.
if request.skip == 0 and not request.reverse and
not request.startBlock.isHash and
request.startBlock.number == 1.toBlockNumber:
sp.setSyncLocked(0.toBlockNumber, sp.peer.network.chain.genesisHash)
sp.setSyncStateRoot(0.toBlockNumber, sp.peer.network.chain.genesisHash,
sp.peer.network.chain.Chain.genesisStateRoot)
return
if sp.syncMode == SyncLocked or sp.syncMode == SyncOnlyHash:
inc sp.stats.ok.reorgDetected
trace "Sync: Peer reorg detected, best block disappeared", peer=sp,
`block`=request.startBlock
let lowestAbsent = request.startBlock.number
case sp.syncMode:
of SyncLocked:
# If this message doesn't change our knowledge, ignore it.
if lowestAbsent > sp.bestBlockNumber:
return
# Due to a reorg, peer's canonical head has lower block number, outside
# our tracking window. Sync lock is no longer valid. Switch to hunt
# backward to find the new canonical head.
sp.setHuntBackward(lowestAbsent)
of SyncOnlyHash:
# Due to a reorg, peer doesn't have the block hash it originally gave us.
# Switch to hunt forward from block zero to find the canonical head.
sp.setHuntForward(0.toBlockNumber)
of SyncHuntForward, SyncHuntBackward, SyncHuntRange, SyncHuntRangeFinal:
# Update the hunt range.
sp.updateHuntAbsent(lowestAbsent)
# Update best block number. It is invalid except when `SyncLocked`, but
# still useful as a hint of what we knew recently, for example in displays.
if lowestAbsent <= sp.bestBlockNumber:
sp.bestBlockNumber = if lowestAbsent == 0.toBlockNumber: lowestAbsent
else: lowestAbsent - 1.toBlockNumber
sp.bestBlockHash = default(typeof(sp.bestBlockHash))
proc peerSyncChainNonEmptyReply(sp: SyncPeer, request: BlocksRequest,
headers: openArray[BlockHeader]) =
## Handle non-empty `GetBlockHeaders` reply. This means `request.startBlock`
## is present on the peer and in its canonical chain (unless the request was
## made with a hash). If it's a short, contiguous, ascending order reply, it
## reveals the abrupt transition at the end of the chain and we have learned
## or reconfirmed the real-time head block. If hunting, this updates the
## range of uncertainty.
let len = headers.len
let highestIndex = if request.reverse: 0 else: len - 1
# We assume a short enough reply means we've learned the peer's canonical
# head, because it would have replied with another header if not at the head.
# This is not justified when the request used a general hash, because the
# peer doesn't have to reply with its canonical chain in that case, except it
# is still justified if the hash was the known canonical head, which is
# the case in a `SyncOnlyHash` request.
if len < syncLockedMinimumReply and
request.skip == 0 and not request.reverse and
len.uint < request.maxResults:
let blockHash = headers[highestIndex].blockHash
sp.setSyncLocked(headers[highestIndex].blockNumber, blockHash)
sp.setSyncStateRoot(headers[highestIndex].blockNumber, blockHash,
headers[highestIndex].stateRoot)
return
# Be careful, this number is from externally supplied data and arithmetic
# in the upward direction could overflow.
let highestPresent = headers[highestIndex].blockNumber
# A reply that isn't short enough for the canonical head criterion above
# tells us headers up to some number, but it doesn't tell us if there are
# more after it in the peer's canonical chain. We have to request more
# headers to find out.
case sp.syncMode:
of SyncLocked:
# If this message doesn't change our knowledge, ignore it.
if highestPresent <= sp.bestBlockNumber:
return
# Sync lock is no longer valid as we don't have confirmed canonical head.
# Switch to hunt forward to find the new canonical head.
sp.setHuntForward(highestPresent)
of SyncOnlyHash:
# As `SyncLocked` but without the block number check.
sp.setHuntForward(highestPresent)
of SyncHuntForward, SyncHuntBackward, SyncHuntRange, SyncHuntRangeFinal:
# Update the hunt range.
sp.updateHuntPresent(highestPresent)
# Update best block number. It is invalid except when `SyncLocked`, but
# still useful as a hint of what we knew recently, for example in displays.
if highestPresent > sp.bestBlockNumber:
sp.bestBlockNumber = highestPresent
sp.bestBlockHash = headers[highestIndex].blockHash
proc peerSyncChainRequest(sp: SyncPeer, request: var BlocksRequest) =
## Choose `GetBlockHeaders` parameters when hunting or following the canonical
## chain of a peer.
request = BlocksRequest(
startBlock: HashOrNum(isHash: false),
skip: 0,
reverse: false
)
if sp.syncMode == SyncLocked:
# Stable and locked. This is just checking for changes including reorgs.
# `sp.bestBlockNumber` was recently the head of the peer's canonical
# chain. We must include this block number to detect when the canonical
# chain gets shorter versus no change.
request.startBlock.number =
if sp.bestBlockNumber <= syncLockedQueryOverlap:
# Every peer should send genesis for block 0, so don't ask for it.
# `peerSyncChainEmptyReply` has logic to handle this reply as if it
# was for block 0. Aside from saving bytes, this is more robust if
# some client doesn't do genesis reply correctly.
1.toBlockNumber
else:
min(sp.bestBlockNumber - syncLockedQueryOverlap.toBlockNumber,
high(BlockNumber) - (syncLockedQuerySize - 1).toBlockNumber)
request.maxResults = syncLockedQuerySize
return
if sp.syncMode == SyncOnlyHash:
# We only have the hash of the recent head of the peer's canonical chain.
# Like `SyncLocked`, query more than one item to detect when the
# canonical chain gets shorter, no change or longer.
request.startBlock = HashOrNum(isHash: true, hash: sp.bestBlockHash)
request.maxResults = syncLockedQuerySize
return
# Searching for the peer's canonical head. An ascending query is always
# used, regardless of search direction. This is because a descending query
# (`reverse = true` and `maxResults > 1`) is useless for searching: Either
# `startBlock` is present, in which case the extra descending results
# contribute no more information about the canonical head boundary, or
# `startBlock` is absent in which case there are zero results. It's not
# defined in the `eth` specification that there must be zero results (in
# principle peers could return the lower numbered blocks), but in practice
# peers stop at the first absent block in the sequence from `startBlock`.
#
# Guaranteeing O(log N) time convergence in all scenarios requires some
# properties to be true in both exponential search (expanding) and
# quasi-binary search (converging in a range). The most important is that
# the gap to `startBlock` after `huntLow` and also before `huntHigh` are
# proportional to the query step, where the query step is `huntStep`
# exponentially expanding each round, or `maxStep` approximately evenly
# distributed in the range.
#
# `huntLow+1` must not be used consistently as the start, even with a large
# enough query step size, as that will sometimes take O(N) to converge in
# both the exponential and quasi-binary searches. (Ending at `huntHigh-1`
# is fine if `syncHuntQuerySize > 1`. This asymmetry is due to ascending
# queries (see earlier comment), and non-empty truncated query reply being
# proof of presence before the truncation point, but not proof of absence
# after it. A reply can be truncated just because the peer decides to.)
#
# The proportional gap requirement is why we divide by query size here,
# instead of stretching to fit more strictly with `(range-1)/(size-1)`.
const syncHuntFinalSize = max(2, syncHuntQuerySize)
var maxStep: typeof(request.skip)
let fullRangeClamped =
if sp.huntHigh <= sp.huntLow: typeof(maxStep)(0)
else: min(high(typeof(maxStep)).toBlockNumber,
sp.huntHigh - sp.huntLow).truncate(typeof(maxStep)) - 1
if fullRangeClamped >= syncHuntFinalSize: # `SyncHuntRangeFinal` condition.
maxStep = if syncHuntQuerySize == 1:
fullRangeClamped
elif (syncHuntQuerySize and (syncHuntQuerySize-1)) == 0:
fullRangeClamped shr fastLog2(syncHuntQuerySize)
else:
fullRangeClamped div syncHuntQuerySize
doAssert syncHuntFinalSize >= syncHuntQuerySize
doAssert maxStep >= 1 # Ensured by the above assertion.
# Check for exponential search (expanding). Iterate `huntStep`. O(log N)
# requires `startBlock` to be offset from `huntLow`/`huntHigh`.
if sp.syncMode in {SyncHuntForward, SyncHuntBackward} and
fullRangeClamped >= syncHuntFinalSize:
let forward = sp.syncMode == SyncHuntForward
let expandShift = if forward: syncHuntForwardExpandShift
else: syncHuntBackwardExpandShift
# Switches to range search when this condition is no longer true.
if sp.huntStep < maxStep shr expandShift:
# The `if` above means the next line cannot overflow.
sp.huntStep = if sp.huntStep > 0: sp.huntStep shl expandShift else: 1
# Satisfy the O(log N) convergence conditions.
request.startBlock.number =
if forward: sp.huntLow + sp.huntStep.toBlockNumber
else: sp.huntHigh - (sp.huntStep * syncHuntQuerySize).toBlockNumber
request.maxResults = syncHuntQuerySize
request.skip = sp.huntStep - 1
return
# For tracing/display.
sp.huntStep = maxStep
sp.syncMode = SyncHuntRange
if maxStep > 0:
# Quasi-binary search (converging in a range). O(log N) requires
# `startBlock` to satisfy the constraints described above, with the
# proportionality from both ends of the range. The optimal information
# gathering position is tricky and doesn't make much difference, so don't
# bother. We'll centre the query in the range.
var offset = fullRangeClamped - maxStep * (syncHuntQuerySize-1)
# Rounding must bias towards end to ensure `offset >= 1` after this.
offset -= offset shr 1
request.startBlock.number = sp.huntLow + offset.toBlockNumber
request.maxResults = syncHuntQuerySize
request.skip = maxStep - 1
else:
# Small range, final step. At `fullRange == 0` we must query at least one
# block before and after the range to confirm the canonical head boundary,
# or find it has moved. This ensures progress without getting stuck. When
# `fullRange` is small this is also beneficial, to get `SyncLocked` in one
# round trip from here, and it simplifies the other search branches below.
# Ideally the query is similar to `SyncLocked`, enough to get `SyncLocked`
# in one round trip, and accommodate a small reorg or extension.
const afterSoftMax = syncLockedMinimumReply - syncLockedQueryOverlap
const beforeHardMax = syncLockedQueryOverlap
let extra = syncHuntFinalSize - fullRangeClamped
var before = (extra + 1) shr 1
before = max(before + afterSoftMax, extra) - afterSoftMax
before = min(before, beforeHardMax)
# See `SyncLocked` case.
request.startBlock.number =
if sp.bestBlockNumber <= before.toBlockNumber: 1.toBlockNumber
else: min(sp.bestBlockNumber - before.toBlockNumber,
high(BlockNumber) - (syncHuntFinalSize - 1).toBlockNumber)
request.maxResults = syncHuntFinalSize
sp.syncMode = SyncHuntRangeFinal
proc peerSyncChainTrace(sp: SyncPeer) =
## To be called after `peerSyncChainRequest` has updated state.
case sp.syncMode:
of SyncLocked:
trace "Sync: SyncLocked",
bestBlock=sp.bestBlockNumber, bestBlockHash=($sp.bestBlockHash)
of SyncOnlyHash:
trace "Sync: OnlyHash", bestBlockHash=($sp.bestBlockHash)
of SyncHuntForward:
template highMax(n: BlockNumber): string =
if n == high(BlockNumber): "max" else: $n
trace "Sync: HuntForward",
low=sp.huntLow, high=highMax(sp.huntHigh), step=sp.huntStep
of SyncHuntBackward:
trace "Sync: HuntBackward",
low=sp.huntLow, high=sp.huntHigh, step=sp.huntStep
of SyncHuntRange:
trace "Sync: HuntRange",
low=sp.huntLow, high=sp.huntHigh, step=sp.huntStep
of SyncHuntRangeFinal:
trace "Sync: HuntRangeFinal",
low=sp.huntLow, high=sp.huntHigh, step=1
proc peerHuntCanonical*(sp: SyncPeer) {.async.} =
## Query a peer to update our knowledge of its canonical chain and its best
## block, which is its canonical chain head. This can be called at any time
## after a peer has negotiated the connection.
##
## This function is called in an exponential then binary search style
## during initial sync to find the canonical head, real-time polling
## afterwards to check for updates.
##
## All replies to this query are part of the peer's canonical chain at the
## time the peer sends them.
# If we send multiple `GetBlockHeaders` requests, the replies can be out of
# order, and prior to eth/66 there is no request-id. We'll avoid this
# problem by never sending overlapping `GetBlockHeaders` to the same peer.
if sp.pendingGetBlockHeaders:
#trace ">| Blocked overlapping eth.GetBlockHeaders (0x03)", peer=sp
await sleepAsync(chronos.milliseconds(500))
return
sp.pendingGetBlockHeaders = true
var request {.noinit.}: BlocksRequest
sp.peerSyncChainRequest(request)
if tracePackets:
if request.maxResults == 1 and request.startBlock.isHash:
tracePacket ">> Sending eth.GetBlockHeaders/Hash (0x03)", peer=sp,
blockHash=($request.startBlock.hash), count=1
elif request.maxResults == 1:
tracePacket ">> Sending eth.GetBlockHeaders (0x03)", peer=sp,
`block`=request.startBlock, count=1
elif request.startBlock.isHash:
tracePacket ">> Sending eth.GetBlockHeaders/Hash (0x03)", peer=sp,
firstBlockHash=request.startBlock, count=request.maxResults,
step=traceStep(request)
else:
tracePacket ">> Sending eth.GetBlockHeaders (0x03)", peer=sp,
firstBlock=request.startBlock, count=request.maxResults,
step=traceStep(request)
inc sp.stats.ok.getBlockHeaders
var reply: typeof await sp.peer.getBlockHeaders(request)
try:
reply = await sp.peer.getBlockHeaders(request)
except CatchableError as e:
traceNetworkError "<< Error waiting for reply to eth.GetBlockHeaders (0x03)",
peer=sp, error=e.msg
inc sp.stats.major.networkErrors
sp.stopped = true
return
if reply.isNone:
traceTimeout "<< Timeout waiting for reply to eth.GetBlockHeaders (0x03)",
peer=sp
# TODO: Should disconnect?
inc sp.stats.minor.timeoutBlockHeaders
return
let len = reply.get.headers.len
if tracePackets:
if len == 0:
tracePacket "<< Got EMPTY reply eth.BlockHeaders (0x04)", peer=sp,
got=0, requested=request.maxResults
else:
let firstBlock = reply.get.headers[0].blockNumber
let lastBlock = reply.get.headers[len - 1].blockNumber
tracePacket "<< Got reply eth.BlockHeaders (0x04)", peer=sp,
got=len, requested=request.maxResults, firstBlock, lastBlock
sp.pendingGetBlockHeaders = false
if len > request.maxResults.int:
tracePacketError "<< Protocol violation, excess headers in eth.BlockHeaders (0x04)",
peer=sp, got=len, requested=request.maxResults
# TODO: Should disconnect.
inc sp.stats.major.excessBlockHeaders
return
if len > 0:
# TODO: Check this is not copying the `headers`.
sp.peerSyncChainNonEmptyReply(request, reply.get.headers)
else:
sp.peerSyncChainEmptyReply(request)

View File

@ -0,0 +1,508 @@
# Nimbus - Robust support for `GetNodeData` network calls
#
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## This module provides an async function to call `GetNodeData`, a request in
## the Ethereum DevP2P/ETH network protocol. Parallel requests can be issued,
## maintaining a pipeline.
##
## Given a list of hashes, it returns a list of trie nodes or contract
## bytecodes matching those hashes. The returned nodes may be any subset of
## those requested, including an empty list. The returned nodes are not
## necessarily in the same order as the request, so a mapping from request
## items to node items is included. On timeout or error, `nil` is returned.
##
## Only data passing hash verification is returned, so hashes don't need to be
## verified again. No exceptions are raised, and no detail is returned about
## timeouts or errors, but systematically formatted trace messages are output
## if enabled, and show in detail if various events occur such as timeouts,
## bad hashes, mixed replies, network errors, etc.
##
## This tracks queued requests and individual request hashes, verifies received
## node data hashes, and matches them against requests. When a peer replies in
## same order as requests are sent, and each reply contains nodes in the same
## order as requested, the matching process is efficient. It avoids storing
## request details in a hash table when possible. If replies or nodes are out
## of order, the process is still efficient but has to do a little more work.
##
## Empty replies:
##
## Empty replies are matched with requests using a queue draining technique.
## After an empty reply is received, we temporarily pause further requests and
## wait for more replies. After we receive all outstanding replies, we know
## which requests the empty replies were for, and can complete those requests.
##
## Eth/66 protocol:
##
## Although Eth/66 simplifies by matching replies to requests, replies can still
## have data out of order or missing, so hashes still need to be verified and
## looked up. Much of the code here is still required for Eth/66.
##
## References:
##
## - [Ethereum Wire Protocol (ETH)](https://github.com/ethereum/devp2p/blob/master/caps/eth.md)
## - [`GetNodeData` (0x0d)](https://github.com/ethereum/devp2p/blob/master/caps/eth.md#getnodedata-0x0d)
## - [`NodeData` (0x0e)](https://github.com/ethereum/devp2p/blob/master/caps/eth.md#nodedata-0x0e)
##
## Note:
##
## This should be made generic for other request types which need similar hash
## matching. Before this module was written, we tended to accept whatever came
## and assume a lot about replies. It often worked but wasn't robust enough.
{.push raises: [Defect].}
import
std/[sequtils, sets, tables, hashes],
chronos, stint, nimcrypto/keccak,
eth/[common/eth_types, rlp, p2p],
"."/[sync_types, protocol_ethxx]
type
NodeDataRequestQueue* = ref object of typeof SyncPeer().nodeDataRequestsBase
liveRequests*: HashSet[NodeDataRequest]
empties*: int
# `OrderedSet` was considered instead of `seq` here, but it has a slow
# implementation of `excl`, defeating the motivation for using it.
waitingOnEmpties*: seq[NodeDataRequest]
beforeFirstHash*: seq[NodeDataRequest]
beforeFullHash*: HashSet[NodeDataRequest]
# We need to be able to lookup requests by the hash of reply data.
# `ptr NodeHash` is used here so the table doesn't require an independent
# copy of the hash. The hash is part of the request object.
itemHash*: Table[ptr NodeHash, (NodeDataRequest, int)]
NodeDataRequest* = ref object
sp*: SyncPeer
hashes*: seq[NodeHash]
future*: Future[NodeDataReply]
timer*: TimerCallback
pathRange*: (InteriorPath, InteriorPath)
fullHashed*: bool
NodeDataReply* = ref object
reverseMap: seq[int] # Access with `reverseMap(i)` instead.
hashVerifiedData*: seq[Blob]
template reverseMap*(reply: NodeDataReply, index: int): int =
## Given an index into the request hash list, return index into the reply
## `hashVerifiedData`, or -1 if there is no data for that request hash.
if index < reply.reverseMap.len: reply.reverseMap[index] - 1
elif index < reply.hashVerifiedData.len: index
else: -1
template nodeDataRequests*(sp: SyncPeer): auto =
## Make `sp.nodeDataRequests` available with the real object type.
sync_types.nodeDataRequestsBase(sp).NodeDataRequestQueue
template nodeDataHash*(data: Blob): NodeHash = keccak256.digest(data).NodeHash
template pathRange(request: NodeDataRequest): string =
pathRange(request.pathRange[0], request.pathRange[1])
template `$`*(paths: (InteriorPath, InteriorPath)): string =
pathRange(paths[0], paths[1])
proc traceGetNodeDataSending(request: NodeDataRequest) =
tracePacket ">> Sending eth.GetNodeData (0x0d)", peer=request.sp,
hashes=request.hashes.len, pathRange=request.pathRange
proc traceGetNodeDataDelaying(request: NodeDataRequest) =
tracePacket ">> Delaying eth.GetNodeData (0x0d)", peer=request.sp,
hashes=request.hashes.len, pathRange=request.pathRange
proc traceGetNodeDataSendError(request: NodeDataRequest,
e: ref CatchableError) =
traceNetworkError ">> Error sending eth.GetNodeData (0x0d)",
peer=request.sp, error=e.msg,
hashes=request.hashes.len, pathRange=request.pathRange
proc traceNodeDataReplyError(request: NodeDataRequest,
e: ref CatchableError) =
traceNetworkError "<< Error waiting for reply to eth.GetNodeData (0x0d)",
peer=request.sp, error=e.msg,
hashes=request.hashes.len, pathRange=request.pathRange
proc traceNodeDataReplyTimeout(request: NodeDataRequest) =
traceTimeout "<< Timeout waiting for reply to eth.GetNodeData (0x0d)",
hashes=request.hashes.len, pathRange=request.pathRange, peer=request.sp
proc traceGetNodeDataDisconnected(request: NodeDataRequest) =
traceNetworkError "<< Peer disconnected, not sending eth.GetNodeData (0x0d)",
peer=request.sp, hashes=request.hashes.len, pathRange=request.pathRange
proc traceNodeDataReplyEmpty(sp: SyncPeer, request: NodeDataRequest) =
# `request` can be `nil` because we don't always know which request
# the empty reply goes with. Therefore `sp` must be included.
if request.isNil:
tracePacket "<< Got EMPTY eth.NodeData (0x0e)", peer=sp,
got=0
else:
tracePacket "<< Got eth.NodeData (0x0e)", peer=sp,
got=0, requested=request.hashes.len, pathRange=request.pathRange
proc traceNodeDataReplyUnmatched(sp: SyncPeer, got: int) =
# There is no request for this reply. Therefore `sp` must be included.
tracePacketError "<< Protocol violation, non-reply eth.NodeData (0x0e)",
peer=sp, got
debug "Sync: Warning: Unexpected non-reply eth.NodeData from peer"
proc traceNodeDataReply(request: NodeDataRequest,
got, use, unmatched, other, duplicates: int) =
if tracePackets:
logScope: got=got
logScope: requested=request.hashes.len
logScope: pathRange=request.pathRange
logScope: peer=request.sp
if got > request.hashes.len and (unmatched + other) == 0:
tracePacket "<< Got EXCESS reply eth.NodeData (0x0e)"
elif got == request.hashes.len or use != got:
tracePacket "<< Got reply eth.NodeData (0x0e)"
elif got < request.hashes.len:
tracePacket "<< Got TRUNCATED reply eth.NodeData (0x0e)"
if use != got:
logScope:
discarding=(got - use)
keeping=use
got=got
requested=request.hashes.len
pathRange=request.pathRange
peer=request.sp
if unmatched > 0:
tracePacketError "<< Protocol violation, incorrect hashes in eth.NodeData (0x0e)"
debug "Sync: Warning: eth.NodeData has nodes with incorrect hashes"
elif other > 0:
tracePacketError "<< Protocol violation, mixed request nodes in eth.NodeData (0x0e)"
debug "Sync: Warning: eth.NodeData has nodes from mixed requests"
elif got > request.hashes.len:
# Excess without unmatched/other is only possible with duplicates > 0.
tracePacketError "<< Protocol violation, excess nodes in eth.NodeData (0x0e)"
debug "Sync: Warning: eth.NodeData has more nodes than requested"
else:
tracePacketError "<< Protocol violation, duplicate nodes in eth.NodeData (0x0e)"
debug "Sync: Warning: eth.NodeData has duplicate nodes"
proc hash(hash: ptr Hash256): Hash = cast[ptr Hash](addr hash.data)[]
proc `==`(hash1, hash2: ptr Hash256): bool = hash1[] == hash2[]
proc hash(request: NodeDataRequest): Hash = hash(cast[pointer](request))
proc nodeDataMatchRequest(rq: NodeDataRequestQueue, data: openArray[Blob],
reverseMap: var seq[int],
use, unmatched, other, duplicates: var int
): NodeDataRequest =
## Verify hashes in the received node data and use them to find the matching
## request, and match individual nodes to indices in the request in case they
## are out of order, which is allowed. Note, even if we know which request
## (as with `eth/66` request ids), we still have to hash and match the indices.
##
## The matched request is returned or `nil` if no match.
## `reverseMap` is updated, and it should be empty initially.
## The caller is responsible for applying `reverseMap` to the data.
##
## `use`, `unmatched`, `other` or `duplicates` are incremented for each node.
## If the last three happen, the reply has errors, but the caller can decide
## what to do. Non-nil `request` may still be returned with those errors.
var request: NodeDataRequest = nil
# Iterate through reply data, hashing and efficiently finding what matches.
for i in 0 ..< data.len:
var itemRequest: NodeDataRequest
var index = 0
let hash = nodeDataHash(data[i])
if i == 0:
# Efficiently guess the request belongs to the oldest queued request and
# the items are in requested order. This lets us skip storing any item
# hashes in the global item hash table. `beforeFirstHash` is ordered to
# make sure we always find the oldest queued request first.
var j = 0
while j < rq.beforeFirstHash.len:
let hashRequest = rq.beforeFirstHash[j]
if hashRequest.hashes[0] == hash:
itemRequest = hashRequest
break
# Efficiently scan other requests, hashing the first item in each to
# speed up future scans. This lets us avoid storing all item hashes
# in the global request table when replies have items in requested
# order, even though replies themselves are out of order.
if j == 0:
(itemRequest, index) = rq.itemHash.getOrDefault(unsafeAddr hash)
if not itemRequest.isNil:
break
rq.itemHash[addr hashRequest.hashes[0]] = (hashRequest, 0)
rq.beforeFullHash.incl(hashRequest)
inc j
if j > 0:
rq.beforeFirstHash.delete(0, j-1)
elif not request.isNil:
# Efficiently guess the items are in requested order. This avoids
# having to store individual item hashes in the global request table.
if i < request.hashes.len and request.hashes[i] == hash:
(itemRequest, index) = (request, i)
# If hash-store avoiding heuristics didn't work, a full lookup is required.
# If this succeeds, the reply must have items out of requested order.
# If it fails, a peer sent a bad reply.
if itemRequest.isNil:
(itemRequest, index) = rq.itemHash.getOrDefault(unsafeAddr hash)
if itemRequest.isNil:
# Hash and search items in the current request first, if there is one.
if not request.isNil and not request.fullHashed:
request.fullHashed = true
for j in 0 ..< request.hashes.len:
rq.itemHash[addr request.hashes[j]] = (request, j)
(itemRequest, index) =
rq.itemHash.getOrDefault(unsafeAddr hash)
if itemRequest.isNil:
# Hash and search all items across all requests.
if rq.beforeFirstHash.len + rq.beforeFullHash.len > 0:
if rq.beforeFullHash.len > 0:
rq.beforeFirstHash.add(rq.beforeFullHash.toSeq)
rq.beforeFullHash.clear()
for hashRequest in rq.beforeFirstHash:
if not hashRequest.fullHashed:
hashRequest.fullHashed = true
for j in 0 ..< hashRequest.hashes.len:
rq.itemHash[addr hashRequest.hashes[j]] = (hashRequest, j)
rq.beforeFirstHash.setLen(0)
(itemRequest, index) = rq.itemHash.getOrDefault(unsafeAddr hash)
if itemRequest.isNil:
# Not found anywhere.
inc unmatched
continue
# Matched now in `itemRequest`. But is it the same request as before?
if not request.isNil and itemRequest != request:
inc other
continue
request = itemRequest
# Matched now in `request`. Is item in order? Is it a duplicate?
# Build `reverseMap` but efficiently skip doing so if items are in order.
if index == i and reverseMap.len == 0:
inc use
else:
if reverseMap.len == 0:
newSeq[int](reverseMap, request.hashes.len)
for j in 0 ..< i:
reverseMap[j] = j + 1
if reverseMap[index] > 0:
inc duplicates
continue
reverseMap[index] = i + 1
inc use
return request
proc nodeDataRequestEnqueue(rq: NodeDataRequestQueue,
request: NodeDataRequest) =
## Add `request` to the data structures in `rq: NodeDataRequest`.
doAssert not rq.liveRequests.containsOrIncl(request)
rq.beforeFirstHash.add(request)
proc nodeDataRequestDequeue(rq: NodeDataRequestQueue,
request: NodeDataRequest) =
## Remove `request` from the data structures in `rq: NodeDataRequest`.
doAssert not rq.liveRequests.missingOrExcl(request)
let index = rq.beforeFirstHash.find(request)
if index >= 0:
rq.beforeFirstHash.delete(index)
rq.beforeFullHash.excl(request)
for j in 0 ..< (if request.fullHashed: request.hashes.len else: 1):
rq.itemHash.del(addr request.hashes[j])
# Forward declarations.
proc nodeDataTryEmpties(rq: NodeDataRequestQueue)
proc nodeDataEnqueueAndSend(request: NodeDataRequest) {.async.}
proc nodeDataComplete(request: NodeDataRequest, reply: NodeDataReply,
insideTryEmpties = false) =
## Complete `request` with received data or other reply.
if request.future.finished:
# Subtle: Timer can trigger and its callback be added to Chronos run loop,
# then data event trigger and call `clearTimer()`. The timer callback
# will then run but it must be ignored.
debug "Sync: Warning: Resolved timer race over eth.NodeData reply"
else:
request.timer.clearTimer()
request.future.complete(reply)
let rq = request.sp.nodeDataRequests
trace "nodeDataRequestDequeue", addr=cast[pointer](request).repr
rq.nodeDataRequestDequeue(request)
# It may now be possible to match empty replies to earlier requests.
if not insideTryEmpties:
rq.nodeDataTryEmpties()
proc nodeDataTimeout(request: NodeDataRequest) =
## Complete `request` with timeout.
request.traceNodeDataReplyTimeout()
{.gcsafe.}: request.nodeDataComplete(nil)
proc nodeDataTryEmpties(rq: NodeDataRequestQueue) =
## See if we can match queued empty replies to earlier requests.
# TODO: This approach doesn't handle timeouts and errors correctly.
# The problem is it's ambiguous whether an empty reply after timed out
# request was intended by the peer for that request.
if rq.empties > 0 and rq.empties >= rq.liveRequests.len:
# Complete all live requests with empty results, now they are all matched.
if rq.liveRequests.len > 0:
# Careful: Use `.toSeq` below because we must not use the `HashSet`
# iterator while the set is being changed.
for request in rq.liveRequests.toSeq:
# Constructed reply object, because empty is different from timeout.
request.nodeDataComplete(NodeDataReply(), true)
# Move all temporarily delayed requests to the live state, and send them.
if rq.waitingOnEmpties.len > 0:
var tmpList: seq[NodeDataRequest]
swap(tmpList, rq.waitingOnEmpties)
for i in 0 ..< tmpList.len:
asyncSpawn nodeDataEnqueueAndSend(tmpList[i])
proc nodeDataNewRequest(sp: SyncPeer, hashes: seq[NodeHash],
pathFrom, pathTo: InteriorPath
): NodeDataRequest =
## Make a new `NodeDataRequest` to receive a reply or timeout in future. The
## caller is responsible for sending the `GetNodeData` request, and must do
## that after this setup (not before) to avoid race conditions.
let request = NodeDataRequest(sp: sp, hashes: hashes,
pathRange: (pathFrom, pathTo))
# TODO: Cache the time when making batches of requests, instead of calling
# `Moment.fromNow()` which always does a system call. `p2pProtocol` request
# timeouts have the same issue (and is where we got 10 seconds default).
# request.timer = setTimer(Moment.fromNow(10.seconds),
# nodeDataTimeout, cast[pointer](request))
request.timer = safeSetTimer(Moment.fromNow(10.seconds),
nodeDataTimeout, request)
request.future = newFuture[NodeDataReply]()
return request
proc nodeDataEnqueueAndSend(request: NodeDataRequest) {.async.} =
## Helper function to send an `eth.GetNodeData` request.
## But not when we're draining the in flight queue to match empty replies.
let sp = request.sp
if sp.stopped:
request.traceGetNodeDataDisconnected()
request.future.complete(nil)
return
let rq = sp.nodeDataRequests
if rq.empties > 0:
request.traceGetNodeDataDelaying()
rq.waitingOnEmpties.add(request)
return
request.traceGetNodeDataSending()
rq.nodeDataRequestEnqueue(request)
inc sp.stats.ok.getNodeData
try:
# TODO: What exactly does this `await` do, wait for space in send buffer?
# TODO: Check if this copies the hashes redundantly.
await sp.peer.getNodeData(request.hashes)
except CatchableError as e:
request.traceGetNodeDataSendError(e)
inc sp.stats.major.networkErrors
sp.stopped = true
request.future.fail(e)
proc onNodeData(sp: SyncPeer, data: openArray[Blob]) =
## Handle an incoming `eth.NodeData` reply.
## Practically, this is also where all the incoming packet trace messages go.
let rq = sp.nodeDataRequests
# Empty replies are meaningful, but we can only associate them with requests
# when there are enough empty replies to cover all outstanding requests. If
# not, queue the empty reply and block further requests. Existing other
# requests in flight can still receive data.
if data.len == 0:
# If there are no requests, don't queue, just let processing fall
# through until the "non-reply" protocol violation error.
if rq.liveRequests.len > 0:
sp.traceNodeDataReplyEmpty(if rq.liveRequests.len != 1: nil
else: rq.liveRequests.toSeq[0])
inc rq.empties
# It may now be possible to match empty replies to earlier requests.
rq.nodeDataTryEmpties()
return
let reply = NodeDataReply()
var (use, unmatched, other, duplicates) = (0, 0, 0, 0)
let request = nodeDataMatchRequest(rq, data, reply.reverseMap,
use, unmatched, other, duplicates)
if request.isNil:
sp.traceNodeDataReplyUnmatched(data.len)
return
request.traceNodeDataReply(data.len, use, unmatched, other, duplicates)
# TODO: Speed improvement possible here.
if reply.reverseMap.len == 0:
reply.hashVerifiedData = if use == data.len: @data
else: @data[0 .. (use-1)]
else:
reply.hashVerifiedData = newSeqOfCap[Blob](use)
var j = 0
for i in 0 ..< request.hashes.len:
let index = reply.reverseMap[i] - 1
if index >= 0:
reply.hashVerifiedData.add(data[index])
reply.reverseMap[i] = j + 1
inc j
doAssert reply.hashVerifiedData.len == use
request.nodeDataComplete(reply)
proc getNodeData*(sp: SyncPeer, hashes: seq[NodeHash],
pathFrom, pathTo: InteriorPath): Future[NodeDataReply] {.async.} =
## Async function to send a `GetNodeData` request to a peer, and when the
## peer replies, or on timeout or error, return `NodeDataReply`.
##
## The request is a list of hashes. The reply is a list of trie nodes or
## contract bytecodes matching those hashes, not necessarily in the same
## order as the request. The returned list may be any subset of the
## requested nodes, including an empty list. On timeout or error `nil` is
## returned. Use `reply.reverseMap(i)` to map request items to reply data.
##
## Only data passing hash verification is returned, so hashes don't need to
## be verified again. No exceptions are raised, and no detail is returned
## about timeouts or errors.
##
## `pathFrom` and `pathTo` are not used except for logging.
let request = sp.nodeDataNewRequest(hashes, pathFrom, pathTo)
# There is no "Sending..." trace message here, because it can be delayed
# by the empty reply logic in `nodeDataEnqueueAndSend`.
var reply: NodeDataReply = nil
try:
await request.nodeDataEnqueueAndSend()
reply = await request.future
except CatchableError as e:
request.traceNodeDataReplyError(e)
inc sp.stats.major.networkErrors
sp.stopped = true
return nil
# Timeout, packet and packet error trace messages are done in `onNodeData`
# and `nodeDataTimeout`, where there is more context than here. Here we
# always received just valid data with hashes already verified, or `nil`.
return reply
proc setupGetNodeData*(sp: SyncPeer) =
## Initialise `SyncPeer` to support `getNodeData` calls.
if sp.nodeDataRequests.isNil:
sp.nodeDataRequests = NodeDataRequestQueue()
sp.peer.state(eth).onNodeData =
proc (_: Peer, data: openArray[Blob]) =
{.gcsafe.}: onNodeData(sp, data)
sp.peer.state(eth).onGetNodeData =
proc (_: Peer, hashes: openArray[NodeHash], data: var seq[Blob]) =
# Return empty nodes result. This callback is installed to
# ensure we don't reply with nodes from the chainDb.
discard
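# A minimal usage sketch (added for illustration, not part of the patch):
# how a caller might drive `getNodeData` and use `reverseMap` to relate each
# requested hash to its verified reply data. `exampleFetchNodes` and its
# arguments are hypothetical.
proc exampleFetchNodes(sp: SyncPeer, hashes: seq[NodeHash],
                       pathFrom, pathTo: InteriorPath) {.async.} =
  let reply = await sp.getNodeData(hashes, pathFrom, pathTo)
  if reply.isNil:
    return                          # Timeout or network error, already traced.
  for i in 0 ..< hashes.len:
    let j = reply.reverseMap(i)     # -1 when the peer omitted this node.
    if j >= 0:
      let nodeBytes = reply.hashVerifiedData[j]
      discard nodeBytes             # Hash already verified against hashes[i].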

nimbus/sync/newsync.nim Normal file
@@ -0,0 +1,99 @@
# Nimbus - New sync approach - A fusion of snap, trie, beam and other methods
#
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
{.push raises: [Defect].}
import
chronicles,
chronos,
eth/[common/eth_types, p2p, rlp],
eth/p2p/[rlpx, peer_pool, private/p2p_types],
stint,
"."/[sync_types, protocol_ethxx, chain_head_tracker, get_nodedata]
proc syncPeerLoop(sp: SyncPeer) {.async.} =
# This basic loop just runs the head-hunter for each peer.
while not sp.stopped:
await sp.peerHuntCanonical()
if sp.stopped:
return
let delayMs = if sp.syncMode == SyncLocked: 1000 else: 50
await sleepAsync(chronos.milliseconds(delayMs))
proc syncPeerStart(sp: SyncPeer) =
asyncSpawn sp.syncPeerLoop()
proc syncPeerStop(sp: SyncPeer) =
sp.stopped = true
# TODO: Cancel SyncPeers that are running. We need clean cancellation for
# this. Doing so reliably will be addressed at a later time.
proc onPeerConnected(ns: NewSync, protocolPeer: Peer) =
let sp = SyncPeer(
ns: ns,
peer: protocolPeer,
stopped: false,
# Initial state: hunt forward, maximum uncertainty range.
syncMode: SyncHuntForward,
huntLow: 0.toBlockNumber,
huntHigh: high(BlockNumber),
huntStep: 0,
bestBlockNumber: 0.toBlockNumber
)
trace "Sync: Peer connected", peer=sp
sp.setupGetNodeData()
if protocolPeer.state(eth).initialized:
# We know the hash but not the block number.
sp.bestBlockHash = protocolPeer.state(eth).bestBlockHash
#TODO: Temporarily disabled because it's useful to test the head hunter.
#sp.syncMode = SyncOnlyHash
else:
trace "Sync: state(eth) not initialized!"
ns.syncPeers.add(sp)
sp.syncPeerStart()
proc onPeerDisconnected(ns: NewSync, protocolPeer: Peer) =
trace "Sync: Peer disconnected", peer=protocolPeer
# Find matching `sp` and remove from `ns.syncPeers`.
var sp: SyncPeer = nil
for i in 0 ..< ns.syncPeers.len:
if ns.syncPeers[i].peer == protocolPeer:
sp = ns.syncPeers[i]
ns.syncPeers.delete(i)
break
if sp.isNil:
debug "Sync: Unknown peer disconnected", peer=protocolPeer
return
sp.syncPeerStop()
proc newSyncEarly*(ethNode: EthereumNode) =
info "** Using --new-sync experimental new sync algorithms"
info "** Note that fetched data is not currently stored"
info "** It's used for timing, behaviour and interop tests"
let ns = NewSync()
var po = PeerObserver(
onPeerConnected:
proc(protocolPeer: Peer) {.gcsafe.} =
ns.onPeerConnected(protocolPeer),
onPeerDisconnected:
proc(protocolPeer: Peer) {.gcsafe.} =
ns.onPeerDisconnected(protocolPeer)
)
po.setProtocol(eth)
ethNode.peerPool.addObserver(ns, po)
proc newSync*() =
discard

nimbus/sync/pie/common.nim Normal file
@@ -0,0 +1,170 @@
# Nimbus - Fetch account and storage states from peers efficiently
#
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [Defect].}
import
std/[sets, tables, sequtils, strutils],
chronos,
eth/[common/eth_types, rlp, p2p],
stint,
../sync_types
type
LeafRange* = object
leafLow*, leafHigh*: LeafPath
SharedFetchState* = ref object of typeof SyncPeer().sharedFetchBase
## Account fetching state that is shared among all peers.
# Leaf path ranges not fetched or in progress on any peer.
leafRanges: seq[LeafRange]
countAccounts*: int64
countAccountBytes*: int64
countRange*: UInt256
countRangeStarted*: bool
countRangeSnap*: UInt256
countRangeSnapStarted*: bool
countRangeTrie*: UInt256
countRangeTrieStarted*: bool
displayTimer: TimerCallback
template sharedFetch*(sp: SyncPeer): auto =
sync_types.sharedFetchBase(sp).SharedFetchState
proc rangeFraction(value: UInt256, discriminator: bool): int =
## Format a value in the range 0..2^256 as a percentage, 0-100%. As the
## top of the range 2^256 cannot be represented in `UInt256` it actually
## has the value `0: UInt256`, and with that value, `discriminator` is
## consulted to decide between 0% and 100%. For other values, the value is
## constrained to be between slightly above 0% and slightly below 100%,
## so that the endpoints are distinctive when displayed.
const multiplier = 10000
var fraction: int = 0 # Fixed point, fraction 0.0-1.0 multiplied up.
if value == 0:
return if discriminator: multiplier else: 0 # Either 100.00% or 0.00%.
const shift = 8 * (sizeof(value) - sizeof(uint64))
const wordHigh: uint64 = (high(typeof(value)) shr shift).truncate(uint64)
# Divide `wordHigh+1` by `multiplier`, rounding up, avoiding overflow.
const wordDiv: uint64 = 1 + ((wordHigh shr 1) div (multiplier.uint64 shr 1))
let wordValue: uint64 = (value shr shift).truncate(uint64)
let divided: uint64 = wordValue div wordDiv
return if divided >= multiplier: multiplier - 1
elif divided <= 0: 1
else: divided.int
proc percent(value: UInt256, discriminator: bool): string =
var str = intToStr(rangeFraction(value, discriminator), 3)
str.insert(".", str.len - 2)
str.add('%')
return str
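# Worked example (added for illustration): `percent(0.u256, false)` renders
# "0.00%" and `percent(0.u256, true)` renders "100.00%", because the value 0
# stands for either end of the 0..2^256 range and only `discriminator`
# distinguishes the two ends.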
proc setDisplayTimer(sp: SyncPeer, at: Moment) {.gcsafe.}
proc displayUpdate(sp: SyncPeer) {.gcsafe.} =
let sharedFetch = sp.sharedFetch
doAssert not sharedFetch.isNil
info "State: Account sync progress",
percent=percent(sharedFetch.countRange, sharedFetch.countRangeStarted),
accounts=sharedFetch.countAccounts,
snap=percent(sharedFetch.countRangeSnap, sharedFetch.countRangeSnapStarted),
trie=percent(sharedFetch.countRangeTrie, sharedFetch.countRangeTrieStarted)
sp.setDisplayTimer(Moment.fromNow(1.seconds))
proc setDisplayTimer(sp: SyncPeer, at: Moment) =
sp.sharedFetch.displayTimer = safeSetTimer(at, displayUpdate, sp)
proc newSharedFetchState(sp: SyncPeer): SharedFetchState =
result = SharedFetchState(
leafRanges: @[LeafRange(leafLow: low(LeafPath), leafHigh: high(LeafPath))]
)
result.displayTimer = safeSetTimer(Moment.fromNow(100.milliseconds),
displayUpdate, sp)
proc hasSlice*(sp: SyncPeer): bool =
## Return `true` iff `getSlice` would return a free slice to work on.
if sp.sharedFetch.isNil:
sp.sharedFetch = newSharedFetchState(sp)
result = not sp.sharedFetch.isNil and sp.sharedFetch.leafRanges.len > 0
trace "Sync: hasSlice", peer=sp, hasSlice=result
proc getSlice*(sp: SyncPeer, leafLow, leafHigh: var LeafPath): bool =
## Claim a free slice to work on. If a slice was available, it's claimed,
## `leafLow` and `leafHigh` are set to the slice range and `true` is
## returned. Otherwise `false` is returned.
if sp.sharedFetch.isNil:
sp.sharedFetch = newSharedFetchState(sp)
let sharedFetch = sp.sharedFetch
template ranges: auto = sharedFetch.leafRanges
const leafMaxFetchRange = (high(LeafPath) - low(LeafPath)) div 1000
if ranges.len == 0:
trace "Sync: getSlice", leafRange="none"
return false
leafLow = ranges[0].leafLow
if ranges[0].leafHigh - ranges[0].leafLow <= leafMaxFetchRange:
leafHigh = ranges[0].leafHigh
ranges.delete(0)
else:
leafHigh = leafLow + leafMaxFetchRange
ranges[0].leafLow = leafHigh + 1
trace "Sync: getSlice", peer=sp, leafRange=pathRange(leafLow, leafHigh)
return true
proc putSlice*(sp: SyncPeer, leafLow, leafHigh: LeafPath) =
## Return a slice to the free list, merging with the rest of the list.
let sharedFetch = sp.sharedFetch
template ranges: auto = sharedFetch.leafRanges
trace "Sync: putSlice", leafRange=pathRange(leafLow, leafHigh), peer=sp
var i = 0
while i < ranges.len and leafLow > ranges[i].leafHigh:
inc i
if i > 0 and leafLow - 1 == ranges[i-1].leafHigh:
dec i
var j = i
while j < ranges.len and leafHigh >= ranges[j].leafLow:
inc j
if j < ranges.len and leafHigh + 1 == ranges[j].leafLow:
inc j
if j == i:
ranges.insert(LeafRange(leafLow: leafLow, leafHigh: leafHigh), i)
else:
if j-1 > i:
ranges[i].leafHigh = ranges[j-1].leafHigh
ranges.delete(i+1, j-1)
if leafLow < ranges[i].leafLow:
ranges[i].leafLow = leafLow
if leafHigh > ranges[i].leafHigh:
ranges[i].leafHigh = leafHigh
template getSlice*(sp: SyncPeer, leafRange: var LeafRange): bool =
sp.getSlice(leafRange.leafLow, leafRange.leafHigh)
template putSlice*(sp: SyncPeer, leafRange: LeafRange) =
sp.putSlice(leafRange.leafLow, leafRange.leafHigh)
proc countSlice*(sp: SyncPeer, leafLow, leafHigh: LeafPath, which: bool) =
doAssert leafLow <= leafHigh
sp.sharedFetch.countRange += leafHigh - leafLow + 1
sp.sharedFetch.countRangeStarted = true
if which:
sp.sharedFetch.countRangeSnap += leafHigh - leafLow + 1
sp.sharedFetch.countRangeSnapStarted = true
else:
sp.sharedFetch.countRangeTrie += leafHigh - leafLow + 1
sp.sharedFetch.countRangeTrieStarted = true
template countSlice*(sp: SyncPeer, leafRange: LeafRange, which: bool) =
sp.countSlice(leafRange.leafLow, leafRange.leafHigh, which)
proc countAccounts*(sp: SyncPeer, len: int) =
sp.sharedFetch.countAccounts += len
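# A minimal usage sketch (added for illustration, not part of the patch):
# a per-peer worker claims slices with `getSlice`, attempts a fetch, and
# hands failed slices back with `putSlice` so another peer can retry them.
# `ExampleFetcher` and `exampleSliceWorker` are hypothetical names.
type ExampleFetcher = proc (sp: SyncPeer, r: LeafRange): Future[bool] {.gcsafe.}

proc exampleSliceWorker(sp: SyncPeer, tryFetch: ExampleFetcher) {.async.} =
  var leafRange: LeafRange
  while sp.hasSlice() and sp.getSlice(leafRange):
    if not await tryFetch(sp, leafRange):
      # Failed or partial: merge the slice back into the shared free list.
      sp.putSlice(leafRange)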

@@ -0,0 +1,138 @@
# Nimbus - Fetch account and storage states from peers by snapshot traversal
#
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## This module fetches the Ethereum account state trie from network peers by
## traversing leaves of the trie in leaf path order, making network requests
## using the `snap` protocol.
##
## From the leaves it is possible to reconstruct parts of a full trie. With a
## separate trie traversal process it is possible to efficiently update the
## leaf states for related tries (new blocks), and merge partial data from
## different related tries (blocks at different times) together in a way that
## eventually becomes a full trie for a single block.
{.push raises: [Defect].}
import
std/[sets, tables],
chronos,
eth/[common/eth_types, rlp, p2p],
nimcrypto/keccak,
stint,
../sync_types,
../protocol_snapxx,
./common
const
snapRequestBytesLimit = 2 * 1024 * 1024
## Soft bytes limit to request in `snap` protocol calls.
proc snapFetch*(sp: SyncPeer, stateRoot: TrieHash,
leafRange: LeafRange) {.async.} =
var origin = leafRange.leafLow
var limit = leafRange.leafHigh
const responseBytes = 2 * 1024 * 1024
if sp.stopped:
traceNetworkError "<< Peer already disconnected, not sending snap.GetAccountRange (0x00)",
peer=sp, accountRange=pathRange(origin, limit),
stateRoot=($stateRoot), bytesLimit=snapRequestBytesLimit
sp.putSlice(leafRange)
return
if tracePackets:
tracePacket ">> Sending snap.GetAccountRange (0x00)",
accountRange=pathRange(origin, limit),
stateRoot=($stateRoot), bytesLimit=snapRequestBytesLimit, peer=sp
var reply: typeof await sp.peer.getAccountRange(stateRoot, origin, limit,
snapRequestBytesLimit)
try:
reply = await sp.peer.getAccountRange(stateRoot, origin, limit,
snapRequestBytesLimit)
except CatchableError as e:
traceNetworkError "<< Error waiting for reply to snap.GetAccountRange (0x00)",
peer=sp, error=e.msg
inc sp.stats.major.networkErrors
sp.stopped = true
sp.putSlice(leafRange)
return
if reply.isNone:
traceTimeout "<< Timeout waiting for reply to snap.GetAccountRange (0x00)",
peer=sp
sp.putSlice(leafRange)
return
# TODO: Unwanted copying here caused by `.get`. But the simple alternative
# where `reply.get` is used on every access, even just to get `.len`, results
# in more copying. TODO: Check if this `let` should be `var`.
let accountsAndProof = reply.get
template accounts: auto = accountsAndProof.accounts
# TODO: We're not currently verifying boundary proofs, but we do depend on
# whether there is a proof supplied. Unlike Snap sync, the Pie sync
# algorithm doesn't verify most boundary proofs at this stage.
template proof: auto = accountsAndProof.proof
let len = accounts.len
if len == 0:
# If there's no proof, this reply means the peer has no accounts available
# in the range for this query. But if there's a proof, this reply means
# there are no more accounts starting at path `origin` up to max path.
# This makes all the difference to terminating the fetch. For now we'll
# trust the mere existence of the proof rather than verifying it.
if proof.len == 0:
tracePacket "<< Got EMPTY reply snap.AccountRange (0x01)", peer=sp,
got=len, proofLen=proof.len, gotRange="-",
requestedRange=pathRange(origin, limit), stateRoot=($stateRoot)
sp.putSlice(leafRange)
# Don't keep retrying snap for this state.
sp.stopThisState = true
else:
tracePacket "<< Got END reply snap.AccountRange (0x01)", peer=sp,
got=len, proofLen=proof.len, gotRange=pathRange(origin, high(LeafPath)),
requestedRange=pathRange(origin, limit), stateRoot=($stateRoot)
# Current slicer can't accept more result data than was requested, so
# just leave the requested slice claimed and update statistics.
sp.countSlice(origin, limit, true)
return
var lastPath = accounts[len-1].accHash
tracePacket "<< Got reply snap.AccountRange (0x01)", peer=sp,
got=len, proofLen=proof.len, gotRange=pathRange(origin, lastPath),
requestedRange=pathRange(origin, limit), stateRoot=($stateRoot)
# Missing proof isn't allowed, unless `origin` is min path in which case
# there might be no proof if the result spans the entire range.
if proof.len == 0 and origin != low(LeafPath):
tracePacketError "<< Protocol violation, missing proof in snap.AccountRange (0x01)",
peer=sp, got=len, proofLen=proof.len, gotRange=pathRange(origin,lastPath),
requestedRange=pathRange(origin, limit), stateRoot=($stateRoot)
sp.putSlice(leafRange)
return
var keepAccounts = len
if lastPath < limit:
sp.countSlice(origin, lastPath, true)
sp.putSlice(lastPath + 1, limit)
else:
# Current slicer can't accept more result data than was requested.
# So truncate to limit before updating statistics.
sp.countSlice(origin, limit, true)
while lastPath > limit:
dec keepAccounts
if keepAccounts == 0:
break
lastPath = accounts[keepAccounts-1].accHash
sp.countAccounts(keepAccounts)
proc peerSupportsSnap*(sp: SyncPeer): bool =
not sp.stopped and sp.peer.supports(snap1)

@@ -0,0 +1,359 @@
# Nimbus - Fetch account and storage states from peers by trie traversal
#
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## This module fetches the Ethereum account state trie from network peers by
## traversing the trie from the root, making network requests.
##
## Requests are batched, pipelined, sorted, sliced and load-balanced.
##
## - Batching and pipelining improve network performance to each peer.
##
## - Load-balancing allows the available capacity of multiple peers to be used.
##
## - Sorting and slicing is a key part of the pie sync algorithm, which allows
## the entire Ethereum state to be fetched, even following hash trie
## pointers, without significant random access database I/O.
{.push raises: [Defect].}
import
std/[sets, tables, algorithm],
chronos,
eth/[common/eth_types, rlp, p2p],
stint,
".."/[sync_types, get_nodedata, validate_trienode],
./common
type
FetchState = ref object of typeof SyncPeer().fetchBase
## Account fetching state on a single peer.
sp: SyncPeer
nodeGetQueue: seq[SingleNodeRequest]
nodeGetsInFlight: int
scheduledBatch: bool
progressPrefix: string
progressCount: int
nodesInFlight: int
getNodeDataErrors: int
leafRange: LeafRange
unwindAccounts: int64
unwindAccountBytes: int64
finish: Future[void]
SingleNodeRequest = ref object
hash: NodeHash
path: InteriorPath
future: Future[Blob]
const
maxBatchGetNodeData = 384
## Maximum number of node hashes to batch per `GetNodeData` request.
maxParallelGetNodeData = 32
## Maximum number of `GetNodeData` requests in parallel to a single peer.
template fetch*(sp: SyncPeer): auto =
sync_types.fetchBase(sp).FetchState
# Forward declaration.
proc scheduleBatchGetNodeData(fetch: FetchState) {.gcsafe.}
proc wrapCallGetNodeData(fetch: FetchState, hashes: seq[NodeHash],
futures: seq[Future[Blob]],
pathFrom, pathTo: InteriorPath) {.async.} =
inc fetch.nodeGetsInFlight
let reply = await fetch.sp.getNodeData(hashes, pathFrom, pathTo)
# Timeout, packet and packet error trace messages are done in `get_nodedata`,
# where there is more context than here. Here we always received just valid
# data with hashes already verified, an empty list, or `nil`.
if reply.isNil:
# Timeout or error.
fetch.sp.stopThisState = true
for i in 0 ..< futures.len:
futures[i].complete(@[])
elif reply.hashVerifiedData.len == 0:
# Empty reply, matched to request.
# It means there are none of the nodes available, but it's not an error.
fetch.sp.stopThisState = true
for i in 0 ..< futures.len:
futures[i].complete(@[])
else:
# Non-empty reply.
for i in 0 ..< futures.len:
let index = reply.reverseMap(i)
if index >= 0:
futures[i].complete(reply.hashVerifiedData[index])
else:
futures[i].complete(@[])
dec fetch.nodeGetsInFlight
# Receiving a reply may allow more requests to be sent.
if fetch.nodeGetQueue.len > 0 and not fetch.scheduledBatch:
fetch.scheduleBatchGetNodeData()
proc batchGetNodeData(fetch: FetchState) =
var i = fetch.nodeGetQueue.len
if i == 0 or fetch.nodeGetsInFlight >= maxParallelGetNodeData:
return
# WIP NOTE: The algorithm below is out of date and is not the final algorithm
# (which doesn't need to sort). Below gets some of the job done, but it can
# exhaust memory in pathological cases depending how the peers respond, and
# it cannot track prograss or batch the leaves for storage. Its replacement
# is actually simpler but depends on a data structure which has yet to be
# merged. Also, it doesn't use `Future` so much.
#
# OLD NOTE: Sort individual requests in order of path. The sort is
# descending order but they are popped off the end of the sequence (for O(1)
# removal) so are processed in ascending order of path.
#
# For large state tries, this is important magic:
#
# - This sort and the bounded max nodes/requests limits together cause
# trie traversal to use depth-first order instead of breadth-first.
# - More precisely, it uses approximate depth-first order, but if that
# would result in any unused capacity in the network request pipelining,
# that capacity is used to fetch additional nodes instead of wasted.
# - Depth-first (approximate) has an important effect of limiting memory used.
# With breadth-first it would use a vast amount of memory on large tries.
#
# - The pipelining modification to depth-first completely changes network
# behaviour. It allows a pipeline to bootstrap until it's properly full.
# - Without the modification, strict depth-first would be much slower as
# every request would wait for a round-trip before starting the next.
#
# - The trie traversal is also in left-to-right path order (also approximate).
# - The leaves are fetched in left-to-right path order (also approximate).
#
# - The left-to-right order is very beneficial to the remote database reads.
# - Even though hashes for lookup are randomly ordered, and the remote must
# use them in lookups, many remote databases store the leaves in some way
# indexed by path. If so, this order will greatly improve lookup locality,
# which directly reduces the amount of storage I/O time and latency.
# - The left-to-right order is beneficial to the local database writes as well.
# - If the local database indexes by path, the left-to-right write order will
# increase storage density by a lot in a B-tree compared with random order.
# - If the local database doesn't index by path at all, but does use "rowid"
# internally (like SQLite by default), the left-to-right write order will
# improve read performance when other peers sync reading this local node.
proc cmpSingleNodeRequest(x, y: SingleNodeRequest): int =
# `x` and `y` are deliberately swapped to get descending order. See above.
cmp(y.path, x.path)
sort(fetch.nodeGetQueue, cmpSingleNodeRequest)
trace "Trie: Sort length", sortLen=i
# If stopped, abort all waiting nodes, so they clean up.
if fetch.sp.stopThisState or fetch.sp.stopped:
while i > 0:
dec i
fetch.nodeGetQueue[i].future.complete(@[])
fetch.nodeGetQueue.setLen(0)
return
var hashes = newSeqOfCap[NodeHash](maxBatchGetNodeData)
var futures = newSeqOfCap[Future[Blob]](maxBatchGetNodeData)
while i > 0 and fetch.nodeGetsInFlight < maxParallelGetNodeData:
var pathToIndex = -1
var pathFromIndex = -1
while i > 0 and futures.len < maxBatchGetNodeData:
dec i
if pathToIndex < 0 or
fetch.nodeGetQueue[i].path > fetch.nodeGetQueue[pathToIndex].path:
pathToIndex = i
if pathFromIndex < 0 or
fetch.nodeGetQueue[i].path < fetch.nodeGetQueue[pathFromIndex].path:
pathFromIndex = i
hashes.add(fetch.nodeGetQueue[i].hash)
futures.add(fetch.nodeGetQueue[i].future)
asyncSpawn fetch.wrapCallGetNodeData(hashes, futures,
fetch.nodeGetQueue[pathFromIndex].path,
fetch.nodeGetQueue[pathToIndex].path)
hashes.setLen(0)
futures.setLen(0)
fetch.nodeGetQueue.setLen(i)
proc scheduleBatchGetNodeData(fetch: FetchState) =
if not fetch.scheduledBatch:
fetch.scheduledBatch = true
proc batchGetNodeData(arg: pointer) =
let fetch = cast[FetchState](arg)
fetch.scheduledBatch = false
fetch.batchGetNodeData()
# We rely on `callSoon` scheduling for the _end_ of the current run list,
# after other async functions finish adding more single node requests.
callSoon(batchGetNodeData, cast[pointer](fetch))
proc getNodeData(fetch: FetchState,
hash: TrieHash, path: InteriorPath): Future[Blob] {.async.} =
## Request _one_ item of trie node data asynchronously. This function
## batches individual requests into larger `eth.GetNodeData` requests efficiently.
if traceIndividualNodes:
trace "> Fetching individual NodeData", peer=fetch.sp,
depth=path.depth, path, hash=($hash)
let future = newFuture[Blob]()
fetch.nodeGetQueue.add(SingleNodeRequest(
hash: hash,
path: path,
future: future
))
if not fetch.scheduledBatch:
fetch.scheduleBatchGetNodeData()
let nodeBytes = await future
if fetch.sp.stopThisState or fetch.sp.stopped:
return nodeBytes
if tracePackets:
doAssert nodeBytes.len == 0 or nodeDataHash(nodeBytes) == hash
if traceIndividualNodes:
if nodeBytes.len > 0:
trace "< Received individual NodeData", peer=fetch.sp,
depth=path.depth, path, hash=($hash),
nodeLen=nodeBytes.len, nodeBytes=nodeBytes.toHex
else:
trace "< Received EMPTY individual NodeData", peer=fetch.sp,
depth=path.depth, path, hash=($hash),
nodeLen=nodeBytes.len
return nodeBytes
proc pathInRange(fetch: FetchState, path: InteriorPath): bool =
# TODO: This method is ugly and unnecessarily slow.
var compare = fetch.leafRange.leafLow.toInteriorPath
while compare.depth > path.depth:
compare.pop()
if path < compare:
return false
compare = fetch.leafRange.leafHigh.toInteriorPath
while compare.depth > path.depth:
compare.pop()
if path > compare:
return false
return true
proc traverse(fetch: FetchState, hash: NodeHash, path: InteriorPath,
fromExtension: bool) {.async.} =
template errorReturn() =
fetch.sp.stopThisState = true
dec fetch.nodesInFlight
if fetch.nodesInFlight == 0:
fetch.finish.complete()
return
# If something triggered stop earlier, don't request, and clean up now.
if fetch.sp.stopThisState or fetch.sp.stopped:
errorReturn()
let nodeBytes = await fetch.getNodeData(hash, path)
# If something triggered stop, clean up now.
if fetch.sp.stopThisState or fetch.sp.stopped:
errorReturn()
# Don't keep emitting error messages after one error. We'll allow 10.
if fetch.getNodeDataErrors >= 10:
errorReturn()
var parseContext: TrieNodeParseContext # Default values are fine.
try:
fetch.sp.parseTrieNode(path, hash, nodeBytes, fromExtension, parseContext)
except RlpError as e:
fetch.sp.parseTrieNodeError(path, hash, nodeBytes, parseContext, e)
if parseContext.errors > 0:
debug "Aborting trie traversal due to errors", peer=fetch.sp
inc fetch.getNodeDataErrors
errorReturn()
if parseContext.childQueue.len > 0:
for i in 0 ..< parseContext.childQueue.len:
let (nodePath, nodeHash, fromExtension) = parseContext.childQueue[i]
# Discard traversals outside the current leaf range.
if not fetch.pathInRange(nodePath):
continue
# Here, with `await` results in depth-first traversal and `asyncSpawn`
# results in breadth-first. Neither is what we really want. Depth-first
# is delayed by a round trip time for every node. It's far too slow.
# Pure breadth-first expands to an ever increasing pipeline of requests
# until it runs out of memory, although network timing means that is
# unlikely. That's far too risky. However the sorting in the request
# dispatcher left-biases the traversal so that the out of memory
# condition won't occur.
inc fetch.nodesInFlight
asyncSpawn fetch.traverse(nodeHash, nodePath, fromExtension)
if parseContext.leafQueue.len > 0:
for i in 0 ..< parseContext.leafQueue.len:
let leafPtr = addr parseContext.leafQueue[i]
template leafPath: auto = leafPtr[0]
# Discard leaves outside the current leaf range.
if leafPtr[0] < fetch.leafRange.leafLow or
leafPtr[0] > fetch.leafRange.leafHigh:
continue
template leafBytes: auto = leafPtr[2]
inc fetch.unwindAccounts
fetch.unwindAccountBytes += leafBytes.len
inc fetch.sp.sharedFetch.countAccounts
fetch.sp.sharedFetch.countAccountBytes += leafBytes.len
dec fetch.nodesInFlight
if fetch.nodesInFlight == 0:
fetch.finish.complete()
proc probeGetNodeData(sp: SyncPeer, stateRoot: TrieHash): Future[bool] {.async.} =
# Before doing real trie traversal on this peer, send a probe request for
# `stateRoot` to see if it's worth pursuing at all. We will avoid reserving
# a slice of leafspace, even temporarily, if no traversal will take place.
#
# Possible outcomes:
#
# - Trie root is returned. Peers supporting `GetNodeData` do this as long as
# `stateRoot` is still in their pruning window, and isn't on a superseded
# chain branch.
#
# - Empty reply, meaning "I don't have the data". Peers supporting
# `GetNodeData` do this when `stateRoot` is no longer available.
# OpenEthereum does this for all nodes from version 3.3.0-rc.8.
#
# - No reply at all (which is out of spec). Erigon does this. It should
# send an empty reply. We don't want to cut off a peer for other purposes
# such as a source of blocks and transactions, just because it doesn't
# reply to `GetNodeData`.
let reply = await sp.getNodeData(@[stateRoot],
rootInteriorPath, rootInteriorPath)
return not reply.isNil and reply.hashVerifiedData.len == 1
proc trieFetch*(sp: SyncPeer, stateRoot: TrieHash,
leafRange: LeafRange) {.async.} =
if sp.fetch.isNil:
sp.fetch = FetchState(sp: sp)
template fetch: auto = sp.fetch
fetch.leafRange = leafRange
fetch.finish = newFuture[void]()
fetch.unwindAccounts = 0
fetch.unwindAccountBytes = 0
inc fetch.nodesInFlight
await fetch.traverse(stateRoot.NodeHash, rootInteriorPath, false)
await fetch.finish
if fetch.getNodeDataErrors == 0:
sp.countSlice(leafRange, false)
else:
sp.sharedFetch.countAccounts -= fetch.unwindAccounts
sp.sharedFetch.countAccountBytes -= fetch.unwindAccountBytes
sp.putSlice(leafRange)
proc peerSupportsGetNodeData*(sp: SyncPeer): bool =
not sp.stopped and (sp.fetch.isNil or sp.fetch.getNodeDataErrors == 0)
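# A minimal usage sketch (added for illustration, not part of the patch):
# probe a peer with `probeGetNodeData` before claiming a slice, so leafspace
# is only reserved for peers that can actually serve `stateRoot`.
# `exampleProbeAndFetch` is hypothetical; `getSlice` comes from ./common.
proc exampleProbeAndFetch(sp: SyncPeer, stateRoot: TrieHash) {.async.} =
  if not await sp.probeGetNodeData(stateRoot):
    return                          # Peer cannot serve this state root.
  var leafRange: LeafRange
  if sp.getSlice(leafRange):
    await sp.trieFetch(stateRoot, leafRange)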

@@ -0,0 +1,81 @@
# Nimbus - Fetch account and storage states from peers efficiently
#
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
{.push raises: [Defect].}
import
std/[sets, tables, random],
chronos,
nimcrypto/keccak,
stint,
eth/[common/eth_types, p2p, rlp],
../sync_types,
"."/[common, fetch_trie, fetch_snap]
# Note: To test disabling snap (or trie), modify `peerSupportsGetNodeData` or
# `peerSupportsSnap` where those are defined.
proc stateFetch*(sp: SyncPeer) {.async.} =
var stateRoot = sp.syncStateRoot.get
trace "Sync: Syncing from stateRoot", peer=sp, stateRoot=($stateRoot)
while true:
if not sp.peerSupportsGetNodeData() and not sp.peerSupportsSnap():
trace "Sync: Cannot sync more from this peer", peer=sp
return
if not sp.hasSlice():
trace "Sync: Nothing more to sync from this peer", peer=sp
while not sp.hasSlice():
await sleepAsync(5.seconds) # TODO: Use an event trigger instead.
if sp.syncStateRoot.isNone:
trace "Sync: No current state root for this peer", peer=sp
while sp.syncStateRoot.isNone and
(sp.peerSupportsGetNodeData() or sp.peerSupportsSnap()) and
sp.hasSlice():
await sleepAsync(5.seconds) # TODO: Use an event trigger instead.
continue
if stateRoot != sp.syncStateRoot.get:
trace "Sync: Syncing from new stateRoot", peer=sp, stateRoot=($stateRoot)
stateRoot = sp.syncStateRoot.get
sp.stopThisState = false
if sp.stopThisState:
trace "Sync: Pausing sync until we get a new state root", peer=sp
while sp.syncStateRoot.isSome and stateRoot == sp.syncStateRoot.get and
(sp.peerSupportsGetNodeData() or sp.peerSupportsSnap()) and
sp.hasSlice():
await sleepAsync(5.seconds) # TODO: Use an event trigger instead.
continue
var leafRange: LeafRange
# Mix up different slice modes, because when connecting to static nodes one
# mode or the other tends to dominate, which makes the mix harder to test.
var allowSnap = true
if sp.peerSupportsSnap() and sp.peerSupportsGetNodeData():
if rand(99) < 50:
allowSnap = false
if sp.peerSupportsSnap() and allowSnap:
discard sp.getSlice(leafRange)
trace "Sync: snap.GetAccountRange segment", peer=sp,
leafRange=pathRange(leafRange.leafLow, leafRange.leafHigh),
stateRoot=($stateRoot)
await sp.snapFetch(stateRoot, leafRange)
elif sp.peerSupportsGetNodeData():
discard sp.getSlice(leafRange)
trace "Sync: eth.GetNodeData segment", peer=sp,
leafRange=pathRange(leafRange.leafLow, leafRange.leafHigh),
stateRoot=($stateRoot)
await sp.trieFetch(stateRoot, leafRange)

@@ -13,12 +13,20 @@
import
chronos, stint, chronicles, stew/byteutils, macros,
eth/[common/eth_types, rlp, p2p],
eth/p2p/[rlpx, private/p2p_types, blockchain_utils]
eth/p2p/[rlpx, private/p2p_types, blockchain_utils],
../sync_types
export
tracePackets, tracePacket,
traceGossips, traceGossip,
traceTimeouts, traceTimeout,
traceNetworkErrors, traceNetworkError,
tracePacketErrors, tracePacketError
type
NewBlockHashesAnnounce* = object
hash: KeccakHash
number: uint64 # Note: Was `uint`, wrong on 32-bit targets.
hash: BlockHash
number: BlockNumber
NewBlockAnnounce* = EthBlock
@@ -26,18 +34,16 @@ type
forkHash: array[4, byte] # The RLP encoding must be exactly 4 bytes.
forkNext: BlockNumber # The RLP encoding must be variable-length
NewBlockAndTotalDiff* = object
ethBlock: EthBlock
totalDifficulty: DifficultyInt
PeerState = ref object
initialized*: bool
bestBlockHash*: KeccakHash
bestBlockHash*: BlockHash
bestDifficulty*: DifficultyInt
const
tracePackets* = true # Set `true` or `false` to control packet traces.
traceHandshakes* = true
onGetNodeData*:
proc (peer: Peer, hashes: openArray[NodeHash],
data: var seq[Blob]) {.gcsafe.}
onNodeData*:
proc (peer: Peer, data: openArray[Blob]) {.gcsafe.}
const
maxStateFetch* = 384
@@ -46,11 +52,13 @@ const
maxHeadersFetch* = 192
ethVersion* = 66
func toHex*(x: KeccakHash): string = x.data.toHex
macro tracePacket*(msg: static[string], args: varargs[untyped]) =
quote do:
if tracePackets:
trace `msg`, `args`
func toHex*(hash: Hash256): string = hash.data.toHex
func traceStep*(request: BlocksRequest): string =
var str = if request.reverse: "-" else: "+"
if request.skip < high(typeof(request.skip)):
return str & $(request.skip + 1)
return static($(high(typeof(request.skip)).u256 + 1))
p2pProtocol eth(version = ethVersion,
peerState = PeerState,
@@ -64,7 +72,7 @@ p2pProtocol eth(version = ethVersion,
chainForkId = chain.getForkId(bestBlock.blockNumber)
forkId = ForkId(
forkHash: chainForkId.crc.toBytesBE,
forkNext: chainForkId.nextFork.u256)
forkNext: chainForkId.nextFork.toBlockNumber)
tracePacket ">> Sending eth.Status (0x00) [eth/" & $ethVersion & "]",
peer, td=bestBlock.difficulty,
@@ -111,8 +119,8 @@ p2pProtocol eth(version = ethVersion,
ethVersionArg: uint,
networkId: NetworkId,
totalDifficulty: DifficultyInt,
bestHash: KeccakHash,
genesisHash: KeccakHash,
bestHash: BlockHash,
genesisHash: BlockHash,
forkId: ForkId) =
tracePacket "<< Received eth.Status (0x00) [eth/" & $ethVersion & "]",
peer, td=totalDifficulty,
@@ -123,21 +131,37 @@ p2pProtocol eth(version = ethVersion,
# User message 0x01: NewBlockHashes.
proc newBlockHashes(peer: Peer, hashes: openArray[NewBlockHashesAnnounce]) =
tracePacket "<< Discarding eth.NewBlockHashes (0x01)",
peer, count=hashes.len
traceGossip "<< Discarding eth.NewBlockHashes (0x01)",
peer, hashes=hashes.len
discard
# User message 0x02: Transactions.
proc transactions(peer: Peer, transactions: openArray[Transaction]) =
tracePacket "<< Discarding eth.Transactions (0x02)",
peer, count=transactions.len
traceGossip "<< Discarding eth.Transactions (0x02)",
peer, transactions=transactions.len
discard
requestResponse:
# User message 0x03: GetBlockHeaders.
proc getBlockHeaders(peer: Peer, request: BlocksRequest) =
tracePacket "<< Received eth.GetBlockHeaders (0x03)",
peer, count=request.maxResults
if tracePackets:
if request.maxResults == 1 and request.startBlock.isHash:
tracePacket "<< Received eth.GetBlockHeaders/Hash (0x03)",
peer, blockHash=($request.startBlock.hash), count=1
elif request.maxResults == 1:
tracePacket "<< Received eth.GetBlockHeaders (0x03)",
peer, `block`=request.startBlock.number, count=1
elif request.startBlock.isHash:
tracePacket "<< Received eth.GetBlockHeaders/Hash (0x03)",
peer, firstBlockHash=($request.startBlock.hash),
count=request.maxResults,
step=traceStep(request)
else:
tracePacket "<< Received eth.GetBlockHeaders (0x03)",
peer, firstBlock=request.startBlock.number,
count=request.maxResults,
step=traceStep(request)
if request.maxResults > uint64(maxHeadersFetch):
debug "eth.GetBlockHeaders (0x03) requested too many headers",
peer, requested=request.maxResults, max=maxHeadersFetch
@@ -147,10 +171,10 @@ p2pProtocol eth(version = ethVersion,
let headers = peer.network.chain.getBlockHeaders(request)
if headers.len > 0:
tracePacket ">> Replying with eth.BlockHeaders (0x04)",
peer, count=headers.len
peer, sent=headers.len, requested=request.maxResults
else:
tracePacket ">> Replying EMPTY eth.BlockHeaders (0x04)",
peer, count=0
peer, sent=0, requested=request.maxResults
await response.send(headers)
@@ -159,9 +183,9 @@ p2pProtocol eth(version = ethVersion,
requestResponse:
# User message 0x05: GetBlockBodies.
proc getBlockBodies(peer: Peer, hashes: openArray[KeccakHash]) =
proc getBlockBodies(peer: Peer, hashes: openArray[BlockHash]) =
tracePacket "<< Received eth.GetBlockBodies (0x05)",
peer, count=hashes.len
peer, hashes=hashes.len
if hashes.len > maxBodiesFetch:
debug "eth.GetBlockBodies (0x05) requested too many bodies",
peer, requested=hashes.len, max=maxBodiesFetch
@@ -171,10 +195,10 @@ p2pProtocol eth(version = ethVersion,
let bodies = peer.network.chain.getBlockBodies(hashes)
if bodies.len > 0:
tracePacket ">> Replying with eth.BlockBodies (0x06)",
peer, count=bodies.len
peer, sent=bodies.len, requested=hashes.len
else:
tracePacket ">> Replying EMPTY eth.BlockBodies (0x06)",
peer, count=0
peer, sent=0, requested=hashes.len
await response.send(bodies)
@@ -185,26 +209,26 @@ p2pProtocol eth(version = ethVersion,
proc newBlock(peer: Peer, bh: EthBlock, totalDifficulty: DifficultyInt) =
# (Note, needs to use `EthBlock` instead of its alias `NewBlockAnnounce`
# because either `p2pProtocol` or RLPx doesn't work with an alias.)
tracePacket "<< Discarding eth.NewBlock (0x07)",
traceGossip "<< Discarding eth.NewBlock (0x07)",
peer, totalDifficulty,
blockNumber = bh.header.blockNumber,
blockDifficulty = bh.header.difficulty
discard
# User message 0x08: NewPooledTransactionHashes.
proc newPooledTransactionHashes(peer: Peer, hashes: openArray[KeccakHash]) =
tracePacket "<< Discarding eth.NewPooledTransactionHashes (0x08)",
peer, count=hashes.len
proc newPooledTransactionHashes(peer: Peer, hashes: openArray[TxHash]) =
traceGossip "<< Discarding eth.NewPooledTransactionHashes (0x08)",
peer, hashes=hashes.len
discard
requestResponse:
# User message 0x09: GetPooledTransactions.
proc getPooledTransactions(peer: Peer, hashes: openArray[KeccakHash]) =
proc getPooledTransactions(peer: Peer, hashes: openArray[TxHash]) =
tracePacket "<< Received eth.GetPooledTransactions (0x09)",
peer, count=hashes.len
peer, hashes=hashes.len
tracePacket ">> Replying EMPTY eth.PooledTransactions (0x10)",
peer, count=0
peer, sent=0, requested=hashes.len
await response.send([])
# User message 0x0a: PooledTransactions.
@@ -212,33 +236,44 @@ p2pProtocol eth(version = ethVersion,
nextId 0x0d
requestResponse:
# User message 0x0d: GetNodeData.
proc getNodeData(peer: Peer, hashes: openArray[KeccakHash]) =
tracePacket "<< Received eth.GetNodeData (0x0d)",
peer, count=hashes.len
# User message 0x0d: GetNodeData.
proc getNodeData(peer: Peer, hashes: openArray[NodeHash]) =
tracePacket "<< Received eth.GetNodeData (0x0d)", peer,
hashes=hashes.len
let blobs = peer.network.chain.getStorageNodes(hashes)
if blobs.len > 0:
tracePacket ">> Replying with eth.NodeData (0x0e)",
peer, count=blobs.len
else:
tracePacket ">> Replying EMPTY eth.NodeData (0x0e)",
peer, count=0
var data: seq[Blob]
if not peer.state.onGetNodeData.isNil:
peer.state.onGetNodeData(peer, hashes, data)
else:
data = peer.network.chain.getStorageNodes(hashes)
await response.send(blobs)
if data.len > 0:
tracePacket ">> Replying with eth.NodeData (0x0e)", peer,
sent=data.len, requested=hashes.len
else:
tracePacket ">> Replying EMPTY eth.NodeData (0x0e)", peer,
sent=0, requested=hashes.len
# User message 0x0e: NodeData.
proc nodeData(peer: Peer, data: openArray[Blob])
await peer.nodeData(data)
# User message 0x0e: NodeData.
proc nodeData(peer: Peer, data: openArray[Blob]) =
if not peer.state.onNodeData.isNil:
# The `onNodeData` should do its own `tracePacket`, because we don't
# know if this is a valid reply ("Got reply") or something else.
peer.state.onNodeData(peer, data)
else:
tracePacket "<< Discarding eth.NodeData (0x0e)", peer,
bytes=data.len
requestResponse:
# User message 0x0f: GetReceipts.
proc getReceipts(peer: Peer, hashes: openArray[KeccakHash]) =
proc getReceipts(peer: Peer, hashes: openArray[BlockHash]) =
tracePacket "<< Received eth.GetReceipts (0x0f)",
peer, count=hashes.len
peer, hashes=hashes.len
tracePacket ">> Replying EMPTY eth.Receipts (0x10)",
peer, count=0
peer, sent=0, requested=hashes.len
await response.send([])
# TODO: implement `getReceipts` and reactivate this code
# await response.send(peer.network.chain.getReceipts(hashes))

@@ -0,0 +1,375 @@
# Nimbus - Ethereum Snap Protocol (SNAP), version 1
#
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## This module implements Ethereum Snapshot Protocol (SNAP), `snap/1`, as
## specified at the reference below, but modified for Geth compatibility.
##
## - [Ethereum Snapshot Protocol (SNAP)]
## (https://github.com/ethereum/devp2p/blob/master/caps/snap.md)
##
## Note: The `snap/1` specification doesn't match reality. If we implement the
## protocol as specified, Geth drops the peer connection. We must do as Geth
## expects.
##
## Modifications for Geth compatibility
## ------------------------------------
##
## - `GetAccountRange` and `GetStorageRanges` take parameters `origin` and
## `limit`, instead of a single `startingHash` parameter in the
## specification. `origin` and `limit` are 256-bit paths representing the
## starting hash and ending trie path, both inclusive.
##
## - If the `snap/1` specification is followed (omitting `limit`), Geth 1.10
## disconnects immediately so we must follow this deviation.
##
## - Results from either call may include one item with path `>= limit`. Geth
## fetches data from its internal database until it reaches this condition or
## the bytes threshold, then replies with what it fetched. Usually there is
## no item at the exact path `limit`, so there is one after.
##
## - `GetAccountRange` parameters `origin` and `limit` must be 32 byte blobs.
## There is no obvious reason why an empty `limit` is not allowed here when it
## is allowed for `GetStorageRanges`; it just isn't.
##
## `GetStorageRanges` quirks for Geth compatibility
## ------------------------------------------------
##
## When calling a Geth peer with `GetStorageRanges`:
##
## - Parameters `origin` and `limit` may each be empty blobs, which mean "all
## zeros" (0x00000...) or "no limit" (0xfffff...) respectively.
##
## (Blobs shorter than 32 bytes can also be given, and they are extended with
## zero bytes; longer than 32 bytes can be given and are truncated, but this
## is Geth being too accepting, and shouldn't be used.)
##
## - In the `slots` reply, the last account's storage list may be empty even if
## that account has non-empty storage.
##
## This happens when the bytes threshold is reached just after finishing
## storage for the previous account, or when `origin` is greater than the
## first account's last storage slot. When either of these happens, `proof`
## is non-empty. In the case of `origin` zero or empty, the non-empty proof
## only contains the left-side boundary proof, because it meets the condition
## for omitting the right-side proof described in the next point.
##
## - In the `proof` reply, the right-side boundary proof is only included if
## the last returned storage slot has non-zero path and `origin != 0`, or if
## the result stops due to reaching the bytes threshold.
##
## Because there's only one proof anyway if left-side and right-side are the
## same path, this works out to mean the right-side proof is omitted in cases
## where `origin == 0` and the result stops at a slot `>= limit` before
## reaching the bytes threshold.
##
## Although the specification doesn't say anything about `limit`, this is
## against the spirit of the specification rule, which says the right-side
## proof is always included if the last returned path differs from the
## starting hash.
##
## The omitted right-side proof can cause problems when using `limit`, that
## is, when doing range queries, or when merging results from pipelining
## where different `stateRoot` hashes are used as time progresses.
## Workarounds:
##
## - Fetch the proof using a second `GetStorageRanges` query with non-zero
##   `origin` (perhaps equal to `limit`; use `origin = 1` if `limit == 0`).
##   A sketch of this follows at the end of this list.
##
## - Avoid the condition by using `origin >= 1` when using `limit`.
##
## - Use trie node traversal (`snap` `GetTrieNodes` or `eth` `GetNodeData`)
## to obtain the omitted proof.
##
## - When multiple accounts are requested with `origin > 0`, only one account's
## storage is returned. There is no point requesting multiple accounts with
## `origin > 0`. (It might be useful if it treated `origin` as applying to
## only the first account, but it doesn't.)
##
## - When multiple accounts are requested with non-default `limit` and
##   `origin == 0`, and the first account result stops at a slot `>= limit`
##   before reaching the bytes threshold, storage for the other accounts in
##   the request is returned as well. The other accounts are not limited by
##   `limit`, only by the bytes threshold. The right-side proof is omitted
##   from `proof` when this happens, because this is the same condition as
##   described earlier for omitting the right-side proof. (It might be useful
##   if it treated `origin` as applying to only the first account and `limit`
##   to only the last account, but it doesn't.)
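##
## As an illustration of the "second query" workaround above, here is a
## hedged caller sketch. It assumes an async context, a known `rootHash` and
## account path `accPath`, and that the reply object mirrors the
## `storageRange` response message defined below; none of these names come
## from this module's API except `getStorageRanges` itself:
##
##   let origin = if limit == low(LeafPath): low(LeafPath) + 1 else: limit
##   let reply = await peer.getStorageRanges(
##     rootHash, @[accPath], origin.toBytes, limit.toBytes, 0x10000'u64)
##   if reply.isSome:
##     # The right-side boundary proof should now be present because
##     # `origin` is non-zero.
##     let rightProof = reply.get.proof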
##
## Performance benefits
## --------------------
##
## `snap` is used for much higher performance transfer of the entire Ethereum
## execution state (accounts, storage, bytecode) compared with hexary trie
## traversal using `eth` `GetNodeData`.
##
## It improves both network and local storage performance. The benefits are
## substantial, and summarised here:
##
## - [Ethereum Snapshot Protocol (SNAP) - Expected results]
## (https://github.com/ethereum/devp2p/blob/master/caps/snap.md)
## - [Geth v1.10.0 - Snap sync]
## (https://blog.ethereum.org/2021/03/03/geth-v1-10-0/#snap-sync)
##
## In the Snap sync model, local storage benefits require clients to adopt a
## different representation of Ethereum state than the trie storage that Geth
## (and most clients) traditionally used, and still do in archive mode.
##
## However, Nimbus's sync method obtains similar local storage benefits
## whichever network protocol is used. Nimbus uses the `snap` protocol
## because it is a more efficient network protocol.
##
## Remote state and Beam sync benefits
## -----------------------------------
##
## `snap` was not intended for Beam sync, or "remote state on demand", used by
## transactions executing locally that fetch state from the network instead of
## local storage.
##
## Even so, as a lucky accident `snap` allows individual states to be fetched
## in fewer network round trips than `eth`, often in a single round trip,
## compared with about 10 round trips per account query over `eth`. This is
## because `eth` `GetNodeData` requires a trie traversal chasing hashes
## sequentially, while `snap` `GetTrieNodes` trie traversal can be done with
## predictable paths.
##
## Therefore `snap` can be used to accelerate remote states and Beam sync.
##
## Distributed hash table (DHT) building block
## -------------------------------------------
##
## Although `snap` was designed for bootstrapping clients with the entire
## Ethereum state, it is well suited to fetching only a subset of path ranges.
## This may be useful for bootstrapping distributed hash tables (DHTs).
##
## Path range metadata benefits
## ----------------------------
##
## Because data is handled in path ranges, what data is stored locally and
## what isn't can be recorded in compact metadata, much smaller than a
## representation of a partially completed trie traversal with `eth`
## `GetNodeData`. Due to the smaller metadata, after aborting a partial sync
## and restarting, it is possible to resume quickly, without waiting for the
## very slow local database scan associated with older versions of Geth.
##
## However, Nimbus's sync method uses this principle as inspiration to
## obtain similar metadata benefits whichever network protocol is used.
import
std/options,
chronicles,
chronos,
eth/[common/eth_types, rlp, p2p],
eth/p2p/[rlpx, private/p2p_types],
nimcrypto/hash,
stew/byteutils,
stint,
../sync_types
type
SnapAccount* = object
accHash*: LeafPath
accBody* {.rlpCustomSerialization.}: Account
SnapAccountProof* = seq[Blob]
SnapStorage* = object
slotHash*: LeafPath
slotData*: Blob
SnapStorageProof* = seq[Blob]
# The `snap` protocol represents `Account` differently from the regular RLP
# serialisation used in `eth` protocol as well as the canonical Merkle hash
# over all accounts. In `snap`, empty storage hash and empty code hash are
# each represented by an RLP zero-length string instead of the full hash. This
# avoids transmitting these hashes in about 90% of accounts. We need to
# recognise or set these hashes in `Account` when serialising RLP for `snap`.
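#
# For example, an externally owned account with empty storage and no code is
# encoded by the procs below as the RLP list [nonce, balance, "", ""], whereas
# the canonical `eth` encoding spells out both 32-byte constants in full.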
const EMPTY_STORAGE_HASH* =
"56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421".toDigest
const EMPTY_CODE_HASH* =
"c5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470".toDigest
proc read*(rlp: var Rlp, t: var SnapAccount, _: type Account): Account =
## RLP decoding for `SnapAccount`, which contains a path and account.
## The snap representation of the account differs from `Account` RLP.
## Empty storage hash and empty code hash are each represented by an
## RLP zero-length string instead of the full hash.
rlp.tryEnterList()
result.nonce = rlp.read(typeof(result.nonce))
result.balance = rlp.read(typeof(result.balance))
if rlp.blobLen != 0 or not rlp.isBlob:
result.storageRoot = rlp.read(typeof(result.storageRoot))
if result.storageRoot == EMPTY_STORAGE_HASH:
raise newException(RlpTypeMismatch,
"EMPTY_STORAGE_HASH not encoded as empty string in Snap protocol"
)
else:
rlp.skipElem()
result.storageRoot = EMPTY_STORAGE_HASH
if rlp.blobLen != 0 or not rlp.isBlob:
result.codeHash = rlp.read(typeof(result.codeHash))
if result.codeHash == EMPTY_CODE_HASH:
raise newException(RlpTypeMismatch,
"EMPTY_CODE_HASH not encoded as empty string in Snap protocol"
)
else:
rlp.skipElem()
result.codeHash = EMPTY_CODE_HASH
proc append*(rlpWriter: var RlpWriter, t: SnapAccount, account: Account) =
## RLP encoding for `SnapAccount`, which contains a path and account.
## The snap representation of the account differs from `Account` RLP.
## Empty storage hash and empty code hash are each represented by an
## RLP zero-length string instead of the full hash.
rlpWriter.append(account.nonce)
rlpWriter.append(account.balance)
if account.storageRoot == EMPTY_STORAGE_HASH:
rlpWriter.append("")
else:
rlpWriter.append(account.storageRoot)
if account.codeHash == EMPTY_CODE_HASH:
rlpWriter.append("")
else:
rlpWriter.append(account.codeHash)
# RLP serialisation for `LeafPath`.
template read*(rlp: var Rlp, _: type LeafPath): LeafPath =
rlp.read(array[sizeof(LeafPath().toBytes), byte]).toLeafPath
template append*(rlpWriter: var RlpWriter, leafPath: LeafPath) =
rlpWriter.append(leafPath.toBytes)
# TODO: Don't know why, but the `p2pProtocol` can't handle this type. It tries
# to serialise the `Option` as an object, looking at the internal fields, but
# then fails because they are private fields.
#
## RLP serialisation for `Option[SnapPath]`.
#
#proc read*(rlp: var Rlp, _: type Option[SnapPath]): Option[SnapPath] =
# if rlp.blobLen == 0 and rlp.isBlob:
# result = none(SnapPath)
# else:
# result = some(read(rlp, SnapPath))
#proc write*(rlpWriter: var RlpWriter, value: Option[SnapPath]) =
# if value.isNone:
# rlpWriter.append("")
# else:
# rlpWriter.append(value.unsafeGet)
p2pProtocol snap1(version = 1,
rlpxName = "snap",
useRequestIds = true):
requestResponse:
# User message 0x00: GetAccountRange.
# Note: `origin` and `limit` differ from the specification to match Geth.
proc getAccountRange(peer: Peer, rootHash: TrieHash,
# Next line differs from spec to match Geth.
origin: LeafPath, limit: LeafPath,
responseBytes: uint64) =
tracePacket "<< Received snap.GetAccountRange (0x00)", peer,
accountRange=pathRange(origin, limit),
stateRoot=($rootHash), responseBytes
tracePacket ">> Replying EMPTY snap.AccountRange (0x01)", peer, sent=0
await response.send(@[], @[])
# User message 0x01: AccountRange.
proc accountRange(peer: Peer, accounts: seq[SnapAccount],
proof: SnapAccountProof)
requestResponse:
# User message 0x02: GetStorageRanges.
# Note: `origin` and `limit` differ from the specification to match Geth.
proc getStorageRanges(peer: Peer, rootHash: TrieHash,
accounts: openArray[LeafPath],
# Next line differs from spec to match Geth.
origin: openArray[byte], limit: openArray[byte],
responseBytes: uint64) =
if tracePackets:
var definiteFullRange = ((origin.len == 32 or origin.len == 0) and
(limit.len == 32 or limit.len == 0))
if definiteFullRange:
for i in 0 ..< origin.len:
if origin[i] != 0x00:
definiteFullRange = false
break
if definiteFullRange:
for i in 0 ..< limit.len:
if limit[i] != 0xff:
definiteFullRange = false
break
template describe(value: openArray[byte]): string =
if value.len == 0: "(empty)"
elif value.len == 32: value.toHex
else: "(non-standard-len=" & $value.len & ')' & value.toHex
if definiteFullRange:
# Fetching storage for multiple accounts.
tracePacket "<< Received snap.GetStorageRanges/A (0x02)", peer,
accountPaths=accounts.len,
stateRoot=($rootHash), responseBytes
elif accounts.len == 1:
# Fetching partial storage for one account, aka. "large contract".
tracePacket "<< Received snap.GetStorageRanges/S (0x02)", peer,
accountPaths=1,
storageRange=(describe(origin) & '-' & describe(limit)),
stateRoot=($rootHash), responseBytes
else:
# This branch is separated out because such requests shouldn't occur.
# It's not really specified what happens when there are multiple accounts
# and a non-default path range.
tracePacket "<< Received snap.GetStorageRanges/AS?? (0x02)", peer,
accountPaths=accounts.len,
storageRange=(describe(origin) & '-' & describe(limit)),
stateRoot=($rootHash), responseBytes
tracePacket ">> Replying EMPTY snap.StorageRanges (0x03)", peer, sent=0
await response.send(@[], @[])
# User message 0x03: StorageRanges.
# Note: See comments in this file for a list of Geth quirks to expect.
proc storageRange(peer: Peer, slots: openArray[seq[SnapStorage]],
proof: SnapStorageProof)
# User message 0x04: GetByteCodes.
requestResponse:
proc getByteCodes(peer: Peer, hashes: openArray[NodeHash],
responseBytes: uint64) =
tracePacket "<< Received snap.GetByteCodes (0x04)", peer,
hashes=hashes.len, responseBytes
tracePacket ">> Replying EMPTY snap.ByteCodes (0x05)", peer, sent=0
await response.send(@[])
# User message 0x05: ByteCodes.
proc byteCodes(peer: Peer, codes: openArray[Blob])
# User message 0x06: GetTrieNodes.
requestResponse:
proc getTrieNodes(peer: Peer, rootHash: TrieHash,
paths: openArray[InteriorPath], responseBytes: uint64) =
tracePacket "<< Received snap.GetTrieNodes (0x06)", peer,
nodePaths=paths.len, stateRoot=($rootHash), responseBytes
tracePacket ">> Replying EMPTY snap.TrieNodes (0x07)", peer, sent=0
await response.send(@[])
# User message 0x07: TrieNodes.
proc trieNodes(peer: Peer, nodes: openArray[Blob])
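# A hedged client-side sketch (not part of this module): once `snap1` has been
# negotiated, the request procs generated by `p2pProtocol` above can be called
# on a peer. The surrounding async proc, `stateRoot`, the response-bytes limit
# and the shape of the reply object are assumptions here:
#
#   proc fetchFirstAccounts(peer: Peer, stateRoot: TrieHash) {.async.} =
#     let reply = await peer.getAccountRange(
#       stateRoot, low(LeafPath), high(LeafPath), 0x80000'u64)
#     if reply.isNone:
#       traceTimeout "<< Timeout waiting for snap.AccountRange", peer
#       return
#     tracePacket "<< Received snap.AccountRange (0x01)", peer,
#       got=reply.get.accounts.len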

View File

@ -8,5 +8,5 @@
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
import ./protocol_eth66
export protocol_eth66
import ./protocol/eth66
export eth66

View File

@ -0,0 +1,12 @@
# Nimbus
# Copyright (c) 2018-2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at
# https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at
# https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
import ./protocol/snap1
export snap1

nimbus/sync/sync_types.nim Normal file
View File

@ -0,0 +1,345 @@
# Nimbus - Types, data structures and shared utilities used in network sync
#
# Copyright (c) 2018-2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or
# distributed except according to those terms.
## Shared types, data structures and shared utilities used by the eth1
## network sync processes.
import
std/options,
stint, stew/byteutils, chronicles, chronos,
eth/[common/eth_types, p2p]
const
tracePackets* = true
## Whether to `trace` log each sync network message.
traceGossips* = false
## Whether to `trace` log each gossip network message.
traceHandshakes* = true
## Whether to `trace` log each network handshake message.
traceTimeouts* = true
## Whether to `trace` log each network request timeout.
traceNetworkErrors* = true
## Whether to `trace` log each network request error.
tracePacketErrors* = true
## Whether to `trace` log each message with invalid data.
traceIndividualNodes* = false
## Whether to `trace` log each trie node, account, storage, receipt, etc.
template tracePacket*(msg: static[string], args: varargs[untyped]) =
if tracePackets: trace `msg`, `args`
template traceGossip*(msg: static[string], args: varargs[untyped]) =
if traceGossips: trace `msg`, `args`
template traceTimeout*(msg: static[string], args: varargs[untyped]) =
if traceTimeouts: trace `msg`, `args`
template traceNetworkError*(msg: static[string], args: varargs[untyped]) =
if traceNetworkErrors: trace `msg`, `args`
template tracePacketError*(msg: static[string], args: varargs[untyped]) =
if tracePacketErrors: trace `msg`, `args`
type
NewSync* = ref object
## Shared state among all peers of a syncing node.
syncPeers*: seq[SyncPeer]
sharedFetch: SharedFetchState # Exported via templates.
SyncPeer* = ref object
## Peer state tracking.
ns*: NewSync
peer*: Peer # p2pProtocol(eth65).
stopped*: bool
pendingGetBlockHeaders*: bool
stats*: SyncPeerStats
# Peer canonical chain head ("best block") search state.
syncMode*: SyncPeerMode
bestBlockNumber*: BlockNumber
bestBlockHash*: BlockHash
huntLow*: BlockNumber # Recent highest known present block.
huntHigh*: BlockNumber # Recent lowest known absent block.
huntStep*: typeof(BlocksRequest.skip)
# State root to fetch state for.
# This changes during sync and is slightly different for each peer.
syncStateRoot*: Option[TrieHash]
nodeDataRequests: NodeDataRequestQueue # Exported via templates.
fetch: FetchState # Exported via templates.
startedFetch*: bool
stopThisState*: bool
SyncPeerMode* = enum
## The current state of tracking the peer's canonical chain head.
## `bestBlockNumber` is only valid when this is `SyncLocked`.
SyncLocked
SyncOnlyHash
SyncHuntForward
SyncHuntBackward
SyncHuntRange
SyncHuntRangeFinal
SyncPeerStats = object
## Statistics counters for events associated with this peer.
## These may be used to recognise errors and select good peers.
ok*: SyncPeerStatsOk
minor*: SyncPeerStatsMinor
major*: SyncPeerStatsMajor
SyncPeerStatsOk = object
reorgDetected*: Stat
getBlockHeaders*: Stat
getNodeData*: Stat
SyncPeerStatsMinor = object
timeoutBlockHeaders*: Stat
unexpectedBlockHash*: Stat
SyncPeerStatsMajor = object
networkErrors*: Stat
excessBlockHeaders*: Stat
wrongBlockHeader*: Stat
Stat = distinct int
BlockHash* = Hash256
## Hash of a block, goes with `BlockNumber`.
TxHash* = Hash256
## Hash of a transaction.
TrieHash* = Hash256
## Hash of a trie root: accounts, storage, receipts or transactions.
NodeHash* = Hash256
## Hash of a trie node or other blob carried over `eth.NodeData`:
## account trie nodes, storage trie nodes, contract code.
InteriorPath* = object
## Path to an interior node in an Ethereum hexary trie. This is a sequence
## of 0 to 64 hex digits. 0 digits means the root node, and 64 digits
## means a leaf node whose path hasn't been converted to `LeafPath` yet.
bytes: array[32, byte] # Access with `path.digit(i)` instead.
numDigits: byte # Access with `path.depth` instead.
LeafPath* = object
## Path to a leaf in an Ethereum hexary trie. Individually, each leaf path
## is a hash, but rather than being the hash of the contents, it's the hash
## of the item's address. Collectively, these hashes have some 256-bit
## numerical properties: ordering, intervals and meaningful difference.
number: UInt256
# Use `import get_nodedata` to access the real type's methods.
NodeDataRequestQueue {.inheritable, pure.} = ref object
# Use `import trie_fetch` to access the real type's methods.
SharedFetchState {.inheritable, pure.} = ref object
# Use `import trie_fetch` to access the real type's methods.
FetchState {.inheritable, pure.} = ref object
proc inc(stat: var Stat) {.borrow.}
template nodeDataRequestsBase*(sp: SyncPeer): auto =
sp.nodeDataRequests
template `nodeDataRequests=`*(sp: SyncPeer, value: auto) =
sp.nodeDataRequests = value
template sharedFetchBase*(sp: SyncPeer): auto =
sp.ns.sharedFetch
template `sharedFetch=`*(sp: SyncPeer, value: auto) =
sp.ns.sharedFetch = value
template fetchBase*(sp: SyncPeer): auto =
sp.fetch
template `fetch=`*(sp: SyncPeer, value: auto) =
sp.fetch = value
## `InteriorPath` methods.
template maxDepth*(_: InteriorPath | typedesc[InteriorPath]): int = 64
template rootInteriorPath*(): InteriorPath =
# Initialised to empty sequence.
InteriorPath()
template toInteriorPath*(interiorpath: InteriorPath): InteriorPath =
interiorPath
template toInteriorPath*(leafPath: LeafPath): InteriorPath =
doAssert sizeof(leafPath.number.toBytesBE) * 2 == InteriorPath.maxDepth
doAssert sizeof(leafPath.number.toBytesBE) == sizeof(InteriorPath().bytes)
InteriorPath(bytes: leafPath.number.toBytesBE,
numDigits: InteriorPath.maxDepth)
template depth*(path: InteriorPath): int =
path.numDigits.int
proc digit*(path: InteriorPath, index: int): int =
doAssert index >= 0 and index < path.numDigits.int
let b = path.bytes[index shr 1]
(if (index and 1) == 0: (b shr 4) else: (b and 0x0f)).int
proc add*(path: var InteriorPath, digit: byte) =
doAssert path.numDigits < InteriorPath.maxDepth
inc path.numDigits
if (path.numDigits and 1) != 0:
path.bytes[path.numDigits shr 1] = (digit shl 4)
else:
path.bytes[(path.numDigits shr 1) - 1] += (digit and 0x0f)
proc addPair*(path: var InteriorPath, digitPair: byte) =
doAssert path.numDigits < InteriorPath.maxDepth - 1
path.numDigits += 2
if (path.numDigits and 1) == 0:
path.bytes[(path.numDigits shr 1) - 1] = digitPair
else:
path.bytes[(path.numDigits shr 1) - 1] += (digitPair shr 4)
path.bytes[path.numDigits shr 1] = (digitPair shl 4)
proc pop*(path: var InteriorPath) =
doAssert path.numDigits >= 1
dec path.numDigits
path.bytes[path.numDigits shr 1] =
if (path.numDigits and 1) == 0: 0.byte
else: path.bytes[path.numDigits shr 1] and 0xf0
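# Worked example of the nibble packing above: starting from `rootInteriorPath()`
# and calling `add 0x3` then `addPair 0xab` gives depth 3 with
# `bytes[0] == 0x3a` and `bytes[1] == 0xb0`, so `digit(2) == 0xb` and
# `toHex(false)` renders the path as "3ab". A subsequent `pop` clears the
# trailing `b` nibble again.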
proc `==`*(path1, path2: InteriorPath): bool =
# Paths are zero-padded to the end of the array, so comparison is easy.
for i in 0 ..< (max(path1.numDigits, path2.numDigits).int + 1) shr 1:
if path1.bytes[i] != path2.bytes[i]:
return false
return true
proc `<=`*(path1, path2: InteriorPath): bool =
# Paths are zero-padded to the end of the array, so comparison is easy.
for i in 0 ..< (max(path1.numDigits, path2.numDigits).int + 1) shr 1:
if path1.bytes[i] != path2.bytes[i]:
return path1.bytes[i] <= path2.bytes[i]
return true
proc cmp*(path1, path2: InteriorPath): int =
# Paths are zero-padded to the end of the array, so comparison is easy.
for i in 0 ..< (max(path1.numDigits, path2.numDigits).int + 1) shr 1:
if path1.bytes[i] != path2.bytes[i]:
return path1.bytes[i].int - path2.bytes[i].int
return 0
template `!=`*(path1, path2: InteriorPath): auto = not(path1 == path2)
template `<`*(path1, path2: InteriorPath): auto = not(path2 <= path1)
template `>=`*(path1, path2: InteriorPath): auto = path2 <= path1
template `>`*(path1, path2: InteriorPath): auto = not(path1 <= path2)
## `LeafPath` methods.
template low*(_: LeafPath | type LeafPath): auto =
LeafPath(number: low(UInt256))
template high*(_: LeafPath | type LeafPath): auto =
LeafPath(number: high(UInt256))
const leafPathBytes = sizeof(LeafPath().number.toBytesBE)
template toLeafPath*(leafPath: LeafPath): LeafPath =
leafPath
template toLeafPath*(interiorPath: InteriorPath): LeafPath =
doAssert interiorPath.numDigits == InteriorPath.maxDepth
doAssert sizeof(interiorPath.bytes) * 2 == InteriorPath.maxDepth
doAssert sizeof(interiorPath.bytes) == leafPathBytes
LeafPath(number: UInt256.fromBytesBE(interiorPath.bytes))
template toLeafPath*(bytes: array[leafPathBytes, byte]): LeafPath =
doAssert sizeof(bytes) == leafPathBytes
LeafPath(number: UInt256.fromBytesBE(bytes))
template toBytes*(leafPath: LeafPath): array[leafPathBytes, byte] =
doAssert sizeof(LeafPath().number.toBytesBE) == leafPathBytes
leafPath.number.toBytesBE
# Note, `{.borrow.}` didn't work for these symbols (with Nim 1.2.12) when we
# defined `LeafPath = distinct UInt256`. The `==` didn't match any symbol to
# borrow from, and the auto-generated `<` failed to compile, with a peculiar
# type mismatch error.
template `==`*(path1, path2: LeafPath): auto = path1.number == path2.number
template `!=`*(path1, path2: LeafPath): auto = path1.number != path2.number
template `<`*(path1, path2: LeafPath): auto = path1.number < path2.number
template `<=`*(path1, path2: LeafPath): auto = path1.number <= path2.number
template `>`*(path1, path2: LeafPath): auto = path1.number > path2.number
template `>=`*(path1, path2: LeafPath): auto = path1.number >= path2.number
template cmp*(path1, path2: LeafPath): auto = cmp(path1.number, path2.number)
template `-`*(path1, path2: LeafPath): UInt256 =
path1.number - path2.number
template `+`*(base: LeafPath, step: Uint256 | SomeInteger): LeafPath =
LeafPath(number: base.number + step)
template `-`*(base: LeafPath, step: Uint256 | SomeInteger): LeafPath =
LeafPath(number: base.number - step)
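# A hedged sketch of one use of this arithmetic: splitting the leaf path space
# into `n` roughly equal ranges, e.g. for pipelined `GetAccountRange` requests.
# The helper name `splitLeafRanges` is hypothetical:
#
#   proc splitLeafRanges(n: int): seq[(LeafPath, LeafPath)] =
#     let step = (high(LeafPath) - low(LeafPath)) div n.u256
#     var start = low(LeafPath)
#     for i in 0 ..< n:
#       let stop = if i == n - 1: high(LeafPath) else: start + step
#       result.add((start, stop))
#       start = stop + 1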
## String output functions.
template `$`*(sp: SyncPeer): string = $sp.peer
template `$`*(hash: Hash256): string = hash.data.toHex
template `$`*(blob: Blob): string = blob.toHex
template `$`*(hashOrNum: HashOrNum): string =
# It's always obvious which one from the visible length of the string.
if hashOrNum.isHash: $hashOrNum.hash
else: $hashOrNum.number
proc toHex*(path: InteriorPath, withEllipsis = true): string =
const hexChars = "0123456789abcdef"
let digits = path.numDigits.int
if not withEllipsis:
result = newString(digits)
else:
result = newString(min(digits + 3, 64))
result[^3] = '.'
result[^2] = '.'
result[^1] = '.'
for i in 0 ..< digits:
result[i] = hexChars[path.digit(i)]
template `$`*(path: InteriorPath): string = path.toHex
proc pathRange*(path1, path2: InteriorPath): string =
path1.toHex(false) & '-' & path2.toHex(false)
template toHex*(path: LeafPath): string = path.number.toBytesBE.toHex
template `$`*(path: LeafPath): string = path.toHex
proc pathRange*(path1, path2: LeafPath): string =
path1.toHex & '-' & path2.toHex
export Blob, Hash256, toHex
# File names and line numbers clutter the more useful details when sync tracing is enabled.
publicLogScope: chroniclesLineNumbers=false
# Use `safeSetTimer` consistently, with a `ref T` argument if including one.
type
SafeCallbackFunc*[T] = proc (objectRef: ref T) {.gcsafe, raises: [Defect].}
SafeCallbackFuncVoid* = proc () {.gcsafe, raises: [Defect].}
proc safeSetTimer*[T](at: Moment, cb: SafeCallbackFunc[T],
objectRef: ref T = nil): TimerCallback =
## Like `setTimer` but takes a typed `ref T` argument, which is passed to the
## callback function correctly typed. Stores the `ref` in a closure to avoid
## garbage collection memory corruption issues that occur when the `setTimer`
## pointer argument is used.
proc chronosTimerSafeCb(udata: pointer) = cb(objectRef)
return setTimer(at, chronosTimerSafeCb)
proc safeSetTimer*[T](at: Moment, cb: SafeCallbackFuncVoid): TimerCallback =
## Like `setTimer` but takes no pointer argument. The callback function
## takes no arguments.
proc chronosTimerSafeCb(udata: pointer) = cb()
return setTimer(at, chronosTimerSafeCb)
proc setTimer*(at: Moment, cb: CallbackFunc, udata: pointer): TimerCallback
{.error: "Do not use setTimer with a `pointer` type argument".}
## `setTimer` with a non-nil pointer argument is dangerous because
## the pointed-to object is often freed or garbage collected before the
## timer callback runs. Call `setTimer` with a `ref` argument instead.
proc setTimer*(at: Moment, cb: CallbackFunc): TimerCallback =
chronos.setTimer(at, cb, nil)

View File

@ -0,0 +1,449 @@
# Nimbus - Robustly parse trie nodes from network untrusted data
#
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed except according to those terms.
## This module parses Ethereum hexary trie nodes from bytes received over the
## network. The data is untrusted, and a non-canonical RLP encoding of the
## node must be rejected, so this code parses carefully.
##
## The caller provides bytes and context. Context includes node hash, trie
## path, and a boolean saying if this trie node is child of an extension node.
##
## The result of parsing is up to 16 child nodes to visit (paths with hashes),
## or up to 7 leaf nodes to decode.
##
## This doesn't verify the node hash. The caller should ensure the bytes are
## verified against the hash separately from calling this parser. Even
## with the hash verified, the bytes are still untrusted and must be parsed
## carefully, because the hash itself is from an untrusted source.
##
## `RlpError` exceptions may occur on some well-crafted adversarial input due
## to the RLP reader implementation. They could be trapped and treated like
## other parse errors, but that's not done. Instead, the caller should put
## `try..except RlpError as e` outside its trie node parsing loop, and pass the
## exception to `parseTrieNodeError` if it occurs.
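##
## A hedged caller sketch. The node lookup `fetchNode` and the starting
## `rootHash` are assumptions, and the `try` can equally be lifted outside
## the loop as recommended above:
##
##   var ctx = TrieNodeParseContext()
##   ctx.childQueue.add((rootInteriorPath(), rootHash, false))
##   while ctx.childQueue.len > 0:
##     let (path, hash, fromExtension) = ctx.childQueue.pop()
##     let bytes = fetchNode(hash)   # must already be verified against `hash`
##     try:
##       sp.parseTrieNode(path, hash, bytes, fromExtension, ctx)
##     except RlpError as e:
##       sp.parseTrieNodeError(path, hash, bytes, ctx, e)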
{.push raises: [Defect].}
import
eth/[common/eth_types, rlp, p2p],
"."/[sync_types]
type
TrieNodeParseContext* = object
childQueue*: seq[(InteriorPath, NodeHash, bool)]
leafQueue*: seq[(LeafPath, NodeHash, Blob)]
errors*: int
template maybeHash(nodeHash: NodeHash, nodeBytes: Blob): string =
if nodeBytes.len >= 32: $nodeHash else: "selfEncoded"
proc combinePaths(nodePath, childPath: InteriorPath): string =
let nodeHex = nodePath.toHex(false)
let childHex = childPath.toHex(true)
nodeHex & "::" & childHex[nodeHex.len..^1]
template leafError(msg: string{lit}, more: varargs[untyped]) =
mixin sp, leafPath, nodePath, nodeHash, nodeBytes, context
debug "Trie leaf data error: " & msg, peer=sp,
depth=nodePath.depth, leafDepth=leafPath.depth, `more`,
path=combinePaths(nodePath, leafPath),
hash=maybeHash(nodeHash, nodeBytes),
nodeLen=nodeBytes.len, nodeBytes=nodeBytes.toHex
echo inspect(rlpFromBytes(nodeBytes))
inc context.errors
template childError(msg: string{lit}, more: varargs[untyped]) =
mixin sp, childPath, nodePath, nodeHash, nodeBytes, context
debug "Trie data error: " & msg, peer=sp,
depth=nodePath.depth, childDepth=childPath.depth, `more`,
path=combinePaths(nodePath, childPath),
hash=maybeHash(nodeHash, nodeBytes),
nodeLen=nodeBytes.len, nodeBytes=nodeBytes.toHex
echo inspect(rlpFromBytes(nodeBytes))
inc context.errors
template nodeError(msg: string{lit}, more: varargs[untyped]) =
mixin sp, nodePath, nodeHash, nodeBytes, context
debug "Trie data error: " & msg, peer=sp,
depth=nodePath.depth, `more`,
path=nodePath.toHex(true), hash=maybeHash(nodeHash, nodeBytes),
nodeLen=nodeBytes.len, nodeBytes=nodeBytes.toHex
echo inspect(rlpFromBytes(nodeBytes))
inc context.errors
proc parseLeafValue(sp: SyncPeer,
nodePath: InteriorPath, nodeHash: NodeHash, nodeBytes: Blob,
nodeRlp: var Rlp, leafPath: InteriorPath,
context: var TrieNodeParseContext
) {.raises: [Defect, RlpError].} =
## Parse the leaf value of a trie leaf node. The caller has already updated
## `leafPath`, which means the path length can't be above the maximum.
## But it hasn't checked other path length constraints.
# Check the leaf depth. Leaves are only allowed at the maximum depth.
if leafPath.depth != InteriorPath.maxDepth:
leafError "Leaf node at interior node depth",
requiredLeafDepth=InteriorPath.maxDepth
return
# Check and parse leaf RLP.
# List elements were already counted before calling here, so we don't
# need a `nodeRlp.hasData` "has no bytes" check.
if not nodeRlp.isBlob:
leafError "Leaf value (RLP element 1) is not a blob"
return
# TODO: `blobLen` can throw if there's an encoded number larger than our
# types can represent. This is untrusted data, so we must handle that case.
let leafLen = nodeRlp.blobLen
# A leaf cannot be zero-length.
if leafLen == 0:
leafError "Leaf value (RLP element 1) is zero length"
return
context.leafQueue.add((leafPath.toLeafPath, nodeHash, nodeRlp.toBytes))
if traceIndividualNodes:
let leafBytes = context.leafQueue[^1][2]
trace "Trie: Account leaf found", peer=sp,
path=combinePaths(nodePath, leafPath),
nodeHash=maybeHash(nodeHash, nodeBytes),
leafLen, leafBytes=leafBytes.toHex
# echo inspect(rlpFromBytes(leafBytes))
# Forward declaration, used for bounded, rare recursion.
proc parseTrieNode*(sp: SyncPeer,
nodePath: InteriorPath, nodeHash: NodeHash, nodeBytes: Blob,
fromExtension: bool,
context: var TrieNodeParseContext
) {.gcsafe, raises: [Defect, RlpError].}
proc parseExtensionChild(sp: SyncPeer,
nodePath: InteriorPath, nodeHash: NodeHash,
nodeBytes: Blob, nodeRlp: var Rlp,
childPath: InteriorPath,
context: var TrieNodeParseContext
) {.raises: [Defect, RlpError].} =
## Parse the child branch of a trie extension node. The caller has already
## updated `childPath`, which means the path length can't be above the
## maximum. But it hasn't checked other path length constraints.
# In the canonical trie encoding, extension nodes must have non-zero
# length prefix. Only leaf nodes can have zero length prefix.
#
# TODO: File a correction to the Yellow Paper, Ethereum formal specification.
# The Yellow Paper says (on 2021-11-01) "Extension: [...] a series of
# nibbles of size _greater_ than one". In the real world, on Mainnet and
# Goerli, there are extension nodes with prefix length one.
if childPath.depth == nodePath.depth:
childError "Extension node prefix does not increase depth",
prefixLen=(childPath.depth - nodePath.depth), minPrefixLen=1
return
# Check and parse child RLP.
# List elements were already counted before calling here, so we don't
# need a `nodeRlp.hasData` "has no bytes" check.
if not nodeRlp.isBlob:
childError "Extension node child (RLP element 1) is not a blob"
return
# TODO: `blobLen` can throw if there's an encoded number larger than our
# types can represent. This is untrusted data, so we must handle that case.
let childLen = nodeRlp.blobLen
if childLen == 32:
doAssert(sizeof(NodeHash) == 32)
context.childQueue.add((childPath, nodeRlp.read(NodeHash), true))
elif childLen == 0:
# The child of an extension node cannot be empty, unlike the children of a
# branch node. The child must be present.
childError "Extension node child (RLP element 1) is zero length"
elif childLen < 32:
# TODO: In rare cases, which are cryptographically unlikely, the child
# RLP can be < 32 bytes. Because this is hard to test, let's make < 32
# exit the program for now to see if any appear on Mainnet.
doAssert childLen == 32
sp.parseTrieNode(childPath, NodeHash(), nodeRlp.toBytes, true, context)
else:
childError "Extension node child (RLP element 1) has length > 32 bytes"
proc parseExtensionOrLeaf(sp: SyncPeer,
nodePath: InteriorPath, nodeHash: NodeHash,
nodeBytes: Blob, nodeRlp: var Rlp,
fromExtension: bool,
context: var TrieNodeParseContext
) {.raises: [Defect, RlpError].} =
## Parse a trie extension node or leaf node. The caller has already checked
## it is a list of 2 elements, but nothing else.
# Check and parse prefix digits RLP.
# List elements were already counted before calling here, so we don't
# need a `nodeRlp.hasData` "has no bytes" check.
if not nodeRlp.isBlob:
nodeError "Extension prefix or leaf node path suffix (RLP element 0) is not a blob"
return
# TODO: Prefix can be empty according to `hexPrefixDecode`. Check that.
# TODO: `blobLen` can throw if there's an encoded number larger than our
# types can represent. This is untrusted data, so we must handle that case.
let prefixBytesLen = nodeRlp.blobLen
if prefixBytesLen == 0:
nodeError "Extension or leaf node prefix (RLP element 0) is zero length"
return
let prefixBytes = nodeRlp.toBytes
let firstByte = prefixBytes[0]
let oddLen = (firstByte.int and 0x10) shr 4
let isLeaf = (firstByte and 0x20)
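# Worked examples of this hex-prefix decoding: prefix bytes 0x20 0x0f 0x1c
# denote a leaf with the even-length, 4-nibble path suffix "0f1c", while
# prefix bytes 0x3a 0xbc denote a leaf with the odd-length suffix "abc".
# Extension prefixes use first bytes 0x00/0x1x instead of 0x20/0x3x.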
# Check no bits are set that shouldn't be, to verify canonical encoding.
# Upper 2 bits are never set. If the prefix is even length, the extra
# nibble in the first byte must be zero.
if (firstByte and (if oddLen == 0: 0xcf else: 0xc0)) != 0:
if isLeaf != 0:
nodeError "Leaf node path suffix (RLP element 0) starts with invalid byte",
invalidByte=[firstByte].toHex
else:
nodeError "Extension node prefix (RLP element 0) starts with invalid byte",
invalidByte=[firstByte].toHex
return
# In the canonical trie encoding, an extension node's child is not allowed to
# be a leaf or another extension node. This check is done here, instead of
# earlier, to give a more informative message about leaf versus extension.
if fromExtension:
if isLeaf != 0:
nodeError "Leaf node is child of an extension node"
else:
nodeError "Extension node is child of another extension node"
return
# Check child depth before making `childPath`, as the type has limited length.
# The strange arithmetic where we don't just add `prefixLen + depth` is to
# rule out numeric overflow. This is also why we don't include `childDepth
# = prefixLen + depth` in the error messages.
let prefixLen = (prefixBytesLen * 2) - 2 + oddLen
if prefixLen > InteriorPath.maxDepth - nodePath.depth:
if isLeaf != 0:
nodeError "Leaf node path suffix takes depth past maximum",
suffixLen=prefixLen, maxDepth=InteriorPath.maxDepth
elif nodePath.depth >= InteriorPath.maxDepth:
nodeError "Extension node at maximum depth",
maxDepth=InteriorPath.maxDepth
else:
# In the canonical trie encoding, an extension node prefix is not allowed
# to take depth up to exactly the maximum either. That would mean the
# child would have to be a leaf, and a leaf cannot be the child of an
# extension node. So we could error on `>= max` here. But we choose `>`
# to allow the traversal to continue and get a more detailed error from
# the child node.
nodeError "Extension node prefix takes depth past maximum",
prefixLen, maxDepth=InteriorPath.maxDepth
return
var childPath = nodePath
if oddLen != 0:
childPath.add(firstByte and 0x0f)
for i in 1 ..< prefixBytesLen:
childPath.addPair(prefixBytes[i])
nodeRlp.skipElem()
if isLeaf != 0:
sp.parseLeafValue(nodePath, nodeHash, nodeBytes, nodeRlp,
childPath, context)
else:
sp.parseExtensionChild(nodePath, nodeHash, nodeBytes, nodeRlp,
childPath, context)
proc parseBranchNode(sp: SyncPeer,
nodePath: InteriorPath, nodeHash: NodeHash,
nodeBytes: Blob, nodeRlp: var Rlp,
context: var TrieNodeParseContext
) {.raises: [Defect, RlpError].} =
## Parse a trie branch node. The caller has already checked it is a list
## of 17 elements, but nothing else.
# Check the length before making `childPath`, as that has a maximum length.
if nodePath.depth >= InteriorPath.maxDepth:
nodeError "Branch node at maximum depth",
childDepth=(nodePath.depth + 1), maxDepth=InteriorPath.maxDepth
return
let queueLenBefore = context.childQueue.len
var childPath = nodePath
for i in 0 .. 15:
# List elements were already counted before this loop, so we don't
# need a `nodeRlp.hasData` "has no bytes" check.
if not nodeRlp.isBlob:
childPath.add(i.byte)
childError "Branch node child (RLP element i in 0..15) is not a blob", i
return
# TODO: `blobLen` can throw if there's an encoded number larger than our
# types can represent. This is untrusted data, so we must handle that case.
let childLen = nodeRlp.blobLen
if childLen == 0:
# Quick version of `nodeRlp.skipElem()` for zero-length blob.
inc nodeRlp.position
continue
childPath.add(i.byte)
if childLen == 32:
doAssert(sizeof(NodeHash) == 32)
context.childQueue.add((childPath, nodeRlp.read(NodeHash), false))
elif childLen < 32:
# TODO: In rare cases, which are cryptographically unlikely, the child
# RLP can be < 32 bytes. Because this is hard to test, let's make < 32
# exit the program for now to see if any appear on Mainnet.
doAssert childLen == 32
sp.parseTrieNode(childPath, NodeHash(), nodeRlp.toBytes, false, context)
nodeRlp.skipElem()
else:
childError "Branch node child (RLP element i in 0..15) has length > 32 bytes", i
return
childPath.pop()
# List elements were already counted before this loop, so we don't
# need a `nodeRlp.hasData` "has no bytes" check for the value item.
if not nodeRlp.isBlob:
nodeError "Branch node value (RLP element 16) is not a blob"
return
# TODO: `blobLen` can throw if there's an encoded number larger than our
# types can represent. This is untrusted data, so we must handle that case.
let valueLen = nodeRlp.blobLen
if valueLen != 0:
nodeError "Branch node value (RLP element 16) has non-zero length",
valueLen, valueMaxLen=0
return
# In the canonical trie encoding, there must be at least 2 child branches.
let branchCount = context.childQueue.len - queueLenBefore
if branchCount < 2:
# Undo what we queued, as this node has an error.
context.childQueue.setLen(queueLenBefore)
if branchCount == 0:
nodeError "Branch node has no child branches",
branches=branchCount, minBranches=2
else:
nodeError "Branch node has insufficient child branches",
branches=branchCount, minBranches=2
return
proc parseTrieNode*(sp: SyncPeer,
nodePath: InteriorPath, nodeHash: NodeHash, nodeBytes: Blob,
fromExtension: bool, context: var TrieNodeParseContext
) {.raises: [Defect, RlpError].} =
## Parse an Ethereum trie node of any kind. The caller provides bytes and
## context. Context includes trie path, node hash, and whether the parent
## node is an extension node.
##
## - Any child nodes to visit are added to `context.childQueue`, up to 16.
##
## - Any leaf nodes to decode are added to `context.leafQueue`, up to 7.
##
## - When multiple children or leaves are returned, they are guaranteed to
## be in increasing path order.
##
## - If there are any errors parsing or with basic trie structure, debug
## messages are logged and `context.errors` is incremented.
##
## - `nodePath` is used. Using the path, the correct depth of branch,
## extension and leaf nodes are checked by this parser. Child and leaf
## paths are calculated and stored in `childQueue` and `leafQueue`.
##
## - `nodeHash` is _not_ used to verify the bytes.
##
## The hash is only used for diagnostics. The caller should ensure the
## bytes are verified against the hash separately from calling this
## function, so there's no need to calculate the hash again here. Even
## with the hash verified, the bytes are still untrusted and must be parsed
## carefully, because the hash itself is from an untrusted source.
##
## - `fromExtension` is used. It is used to check that an extension node is
## not the parent of another extension node or a leaf, as this is not
## allowed in the canonical trie encoding. This value is `false` for the
## root node of a trie, otherwise it is the value stored in `childQueue`
## from parsing the parent node.
##
## - The `sp: SyncPeer` is like the hash, only used for diagnostics. When
## there is invalid data, it's useful to show where we got it from.
##
## - Some limited recursion is possible while parsing, because of how < 32
## byte nodes are encoded inside others. When recursion occurs, the number
## of child nodes will be 0, and the theoretical maximum number of leaf
## nodes is 7, although this is only possible in contrived test cases that
## disable path hashing. This is why `leafQueue` is a list.
##
## Recursion like this cannot occur with account nodes, because account
## leaves are too large for it in the canonical Ethereum encoding. It can
## occur with storage slots.
##
## - `RlpError` exceptions may occur on some well-crafted adversarial input
## due to the RLP reader implementation. They could be caught and treated
## like other parse errors, but they are not caught here, to avoid the
## overhead of `try..except` in the parser (which uses C `setjmp`). The
## caller should put `try..except RlpError` outside its trie node parsing
## loop, and call `parseTrieNodeError` when `RlpError` is caught.
##
## As a special case only used internally during recursion, if `nodeBytes` is
## shorter than 32, `nodeHash` is ignored, even for diagnostics. These
## internal nodes don't have a hash and can't be fetched over the network.
var nodeRlp = rlpFromBytes(nodeBytes)
if not nodeRlp.hasData:
nodeError "Trie node RLP has no bytes"
return
# This section is like calling `isList` and `listLen`, but it has more
# robust checks: It checks there are no extra bytes after the list, and
# the list header exactly matches the byte length of list contents.
# By using `enterList` it also sets up the sub-parsers to read `nodeRlp`.
var savePosition = nodeRlp.position
nodeRlp.skipElem()
let afterListPosition = nodeRlp.position
let hasExtraTrailingBytes = nodeRlp.hasData()
nodeRlp.position = savePosition
if not nodeRlp.enterList():
nodeError "Trie node RLP is not a list"
return
savePosition = nodeRlp.position
if hasExtraTrailingBytes:
nodeError "Trie node RLP has extra bytes after the list"
return
var nodeListLen = 0
while nodeRlp.position < afterListPosition:
inc nodeListLen
nodeRlp.skipElem()
if nodeRlp.position != afterListPosition:
nodeError "Trie node RLP list container has incorrect length for contents"
return
nodeRlp.position = savePosition
if nodeListLen == 2:
sp.parseExtensionOrLeaf(nodePath, nodeHash, nodeBytes, nodeRlp,
fromExtension, context)
elif nodeListLen == 17:
sp.parseBranchNode(nodePath, nodeHash, nodeBytes, nodeRlp, context)
else:
nodeError "Trie node RLP is not a list with 2 or 17 items",
listLen=nodeListLen
return
proc parseTrieNodeError*(sp: SyncPeer, nodePath: InteriorPath,
nodeHash: NodeHash, nodeBytes: Blob,
context: var TrieNodeParseContext,
exception: ref RlpError) =
## Handle an `RlpError` exception and update `context.errors`. This should
## be called if `parseTrieNode` raises any exception derived from `RlpError`.
## This is a separate function so that many `parseTrieNode` calls can be
## made in a loop, with the `try..except` lifted outside the loop.
try:
nodeError "Exception from RLP parser", exception=exception.msg
except RlpError as e:
# If we get `RlpError` from `nodeError` it means `inspect` failed.
# That should never happen, so raise `Defect` to terminate the program.
raise newException(Defect, "Exception from RLP inspector", e)

View File

@ -7,11 +7,12 @@
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import
std/times,
eth/common/eth_types, stint, options, stew/byteutils, chronicles,
std/[options, times],
chronicles,
eth/[common/eth_types, trie/db],
stint,
".."/[vm_types, vm_state, vm_gas_costs, forks, constants],
".."/[db/db_chain, db/accounts_cache, transaction], eth/trie/db,
".."/[chain_config, rpc/hexstrings],
".."/[db/db_chain, db/accounts_cache, chain_config],
./call_common
type