Normalise sync handler prototypes (#1087)

* Use type name eth and snap (rather than snap1)

* Prettified snap/eth handler trace messages

* Regrouped sync sources

details:
  Snap storage related sources are moved to common directory.
  Option --new-sync renamed to --snap-sync

also:
  Normalised logging for secondary/non-protocol handlers.

* Merge protocol wrapper files => protocol.nim

details:
  Merge wrapper sync/protocol_ethxx.nim and sync/protocol_snapxx.nim
  into single file snap/protocol.nim

* Comments cosmetics

* Similar start logic for blockchain_sync.nim and sync/snap.nim

* Renamed p2p/blockchain_sync.nim -> sync/fast.nim
This commit is contained in:
Jordan Hrycaj 2022-05-13 17:30:10 +01:00 committed by GitHub
parent 4e8c5f292e
commit 62d31d6f1d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 493 additions and 716 deletions

View File

@ -18,7 +18,7 @@ import
db/db_chain, db/db_chain,
rpc/p2p, rpc/p2p,
rpc/engine_api, rpc/engine_api,
sync/protocol_ethxx, sync/protocol,
utils/tx_pool utils/tx_pool
], ],
../../../tests/test_helpers, ../../../tests/test_helpers,

View File

@ -10,7 +10,7 @@
import import
std/[os, json, times], std/[os, json, times],
eth/[p2p, trie/db], ../../../nimbus/db/db_chain, eth/[p2p, trie/db], ../../../nimbus/db/db_chain,
../../../nimbus/sync/protocol_ethxx, ../../../nimbus/sync/protocol,
../../../nimbus/[genesis, config, conf_utils, context], ../../../nimbus/[genesis, config, conf_utils, context],
../../../nimbus/graphql/ethapi, ../../../tests/test_helpers, ../../../nimbus/graphql/ethapi, ../../../tests/test_helpers,
../../../nimbus/utils/tx_pool, ../../../nimbus/utils/tx_pool,

View File

@ -15,7 +15,7 @@ import
stew/results, stew/results,
chronos, json_rpc/[rpcserver, rpcclient], chronos, json_rpc/[rpcserver, rpcclient],
../../../nimbus/db/db_chain, ../../../nimbus/db/db_chain,
../../../nimbus/sync/protocol_ethxx, ../../../nimbus/sync/protocol,
../../../nimbus/[config, context, genesis, utils/tx_pool], ../../../nimbus/[config, context, genesis, utils/tx_pool],
../../../nimbus/rpc/[common, p2p, debug], ../../../nimbus/rpc/[common, p2p, debug],
../../../tests/test_helpers, ../../../tests/test_helpers,

View File

@ -150,6 +150,11 @@ type
abbr : "p" abbr : "p"
name: "prune-mode" }: PruneMode name: "prune-mode" }: PruneMode
snapSync* {.
desc: "Enable experimental new sync algorithms"
defaultValue: false
name: "snap-sync" .}: bool
importKey* {. importKey* {.
desc: "Import unencrypted 32 bytes hex private key from a file" desc: "Import unencrypted 32 bytes hex private key from a file"
defaultValue: "" defaultValue: ""
@ -351,11 +356,6 @@ type
defaultValueDesc: $ProtocolFlag.Eth defaultValueDesc: $ProtocolFlag.Eth
name: "protocols" .}: seq[string] name: "protocols" .}: seq[string]
newSync* {.
desc: "Enable experimental new sync algorithms"
defaultValue: false
name: "new-sync" .}: bool
case cmd* {. case cmd* {.
command command
defaultValue: NimbusCmd.noCommand }: NimbusCmd defaultValue: NimbusCmd.noCommand }: NimbusCmd

View File

@ -26,10 +26,9 @@ import
"."/[conf_utils, config, constants, context, genesis, sealer, utils, version], "."/[conf_utils, config, constants, context, genesis, sealer, utils, version],
./db/[storage_types, db_chain, select_backend], ./db/[storage_types, db_chain, select_backend],
./graphql/ethapi, ./graphql/ethapi,
./p2p/[chain, blockchain_sync], ./p2p/[chain, clique/clique_desc, clique/clique_sealer],
./p2p/clique/[clique_desc, clique_sealer],
./rpc/[common, debug, engine_api, jwt_auth, p2p], ./rpc/[common, debug, engine_api, jwt_auth, p2p],
./sync/[protocol_ethxx, protocol_snapxx, newsync], ./sync/[fast, protocol, snap],
./utils/tx_pool ./utils/tx_pool
when defined(evmc_enabled): when defined(evmc_enabled):
@ -56,7 +55,6 @@ type
ctx: EthContext ctx: EthContext
chainRef: Chain chainRef: Chain
txPool: TxPoolRef txPool: TxPoolRef
syncLoop: Future[SyncStatus]
networkLoop: Future[void] networkLoop: Future[void]
proc importBlocks(conf: NimbusConf, chainDB: BaseChainDB) = proc importBlocks(conf: NimbusConf, chainDB: BaseChainDB) =
@ -123,8 +121,9 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf,
# Add protocol capabilities based on protocol flags # Add protocol capabilities based on protocol flags
if ProtocolFlag.Eth in protocols: if ProtocolFlag.Eth in protocols:
nimbus.ethNode.addCapability eth nimbus.ethNode.addCapability protocol.eth
nimbus.ethNode.addCapability snap1 if conf.snapSync:
nimbus.ethNode.addCapability protocol.snap
if ProtocolFlag.Les in protocols: if ProtocolFlag.Les in protocols:
nimbus.ethNode.addCapability les nimbus.ethNode.addCapability les
@ -136,9 +135,9 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf,
nimbus.chainRef.extraValidation = 0 < verifyFrom nimbus.chainRef.extraValidation = 0 < verifyFrom
nimbus.chainRef.verifyFrom = verifyFrom nimbus.chainRef.verifyFrom = verifyFrom
# Early-initialise "--new-sync" before starting any network connections. # Early-initialise "--snap-sync" before starting any network connections.
if ProtocolFlag.Eth in protocols and conf.newSync: if ProtocolFlag.Eth in protocols and conf.snapSync:
newSyncEarly(nimbus.ethNode) SnapSyncCtx.new(nimbus.ethNode).start
# Connect directly to the static nodes # Connect directly to the static nodes
let staticPeers = conf.getStaticPeers() let staticPeers = conf.getStaticPeers()
@ -149,7 +148,7 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf,
if conf.maxPeers > 0: if conf.maxPeers > 0:
nimbus.networkLoop = nimbus.ethNode.connectToNetwork( nimbus.networkLoop = nimbus.ethNode.connectToNetwork(
enableDiscovery = conf.discovery != DiscoveryType.None, enableDiscovery = conf.discovery != DiscoveryType.None,
waitForPeers = not conf.newSync) waitForPeers = not conf.snapSync)
proc localServices(nimbus: NimbusNode, conf: NimbusConf, proc localServices(nimbus: NimbusNode, conf: NimbusConf,
chainDB: BaseChainDB, protocols: set[ProtocolFlag]) = chainDB: BaseChainDB, protocols: set[ProtocolFlag]) =
@ -325,11 +324,8 @@ proc start(nimbus: NimbusNode, conf: NimbusConf) =
localServices(nimbus, conf, chainDB, protocols) localServices(nimbus, conf, chainDB, protocols)
if ProtocolFlag.Eth in protocols and conf.maxPeers > 0: if ProtocolFlag.Eth in protocols and conf.maxPeers > 0:
# TODO: temp code until the CLI/RPC interface is fleshed out if not conf.snapSync:
if conf.newSync: FastSyncCtx.new(nimbus.ethNode).start
newSync()
else:
nimbus.syncLoop = nimbus.ethNode.fastBlockchainSync()
if nimbus.state == Starting: if nimbus.state == Starting:
# it might have been set to "Stopping" with Ctrl+C # it might have been set to "Stopping" with Ctrl+C

View File

@ -3,99 +3,7 @@
The comments are collected in chronological order, oldest first (as opposed to The comments are collected in chronological order, oldest first (as opposed to
squash merge order which is oldest last.) squash merge order which is oldest last.)
## Sync: Rapidly find and track peer canonical heads If a similar comment is found in a source file it was deleted here.
First component of new sync approach.
This module fetches and tracks the canonical chain head of each connected
peer. (Or in future, each peer we care about; we won't poll them all so
often.)
This is for when we aren't sure of the block number of a peer's canonical
chain head. Most of the time, after finding which block, it quietly polls
to track small updates to the "best" block number and hash of each peer.
But sometimes that can get out of step. If there has been a deeper reorg
than our tracking window, or a burst of more than a few new blocks, network
delays, downtime, or the peer is itself syncing. Perhaps we stopped Nimbus
and restarted a while later, e.g. suspending a laptop or Control-Z. Then
this will catch up. It is even possible that the best hash the peer gave us
in the `Status` handshake has disappeared by the time we query for the
corresponding block number, so we start at zero.
The steps here perform a robust and efficient O(log N) search to rapidly
converge on the new best block if it's moved out of the polling window no
matter where it starts, confirm the peer's canonical chain head boundary,
then track the peer's chain head in real-time by polling. The method is
robust to peer state changes at any time.
The purpose is to:
- Help with finding a peer common chain prefix ("fast sync pivot") in a
consistent, fast and explicit way.
- Catch up quickly after any long pauses of network downtime, program not
running, or deep chain reorgs.
- Be able to display real-time peer states, so they are less mysterious.
- Tell the beam/snap/trie sync processes when to start and what blocks to
fetch, and keep those fetchers in the head-adjacent window of the
ever-changing chain.
- Help the sync process bootstrap usefully when we only have one peer,
speculatively fetching and validating what data we can before we have more
peers to corroborate the consensus.
- Help detect consensus failures in the network.
We cannot assume a peer's canonical chain stays the same or only gains new
blocks from one query to the next. There can be reorgs, including deep
reorgs. When a reorg happens, the best block number can decrease if the new
canonical chain is shorter than the old one, and the best block hash we
previously knew can become unavailable on the peer. So we must detect when
the current best block disappears and be able to reduce block number.
## Config: Add `--new-sync` option and use it
This option enables new blockchain sync and real-time consensus algorithms that
will eventually replace the old, very limited sync.
New sync is work in progress. It's included as an option rather than a code
branch, because it's more useful for testing this way, and must not conflict
anyway. It's off by default. Eventually this will become enabled by default
and the option will be removed.
## Tracing: New `traceGossips` category, tidy calls to other categories
- `traceGossips` has been added, because on some networks there are so many
transaction messages, it is difficult to see other activity.
- Adds a trace macro corresponding to each of the packet tracing categories
`traceTimeouts`, `traceNetworkErrors` and `tracePacketErrors`. Improves
readability of code using them, and API consistency.
## Sync: Move `tracePacket` etc into sync_types.nim
Move the templates `tracePacket`, `traceGossip` , `traceTimeout`,
`traceNetworkError` and `tracePacketError` from protocol_eth65 to
sync_types.
The reason for moving is they are also needed for `snap` protocol calls.
## Config: Add `--new-sync` option and use it
This option enables new blockchain sync and real-time consensus algorithms that
will eventually replace the old, very limited sync.
New sync is work in progress. It's included as an option rather than a code
branch, because it's more useful for testing this way, and must not conflict
anyway. It's off by default. Eventually this will become enabled by default
and the option will be removed.
## Sync: Chain head: Promote peer chain head updates to debug level ## Sync: Chain head: Promote peer chain head updates to debug level
@ -176,7 +84,6 @@ There are a number of issues with the simple implementation here:
can proceed using entirely local data from there. can proceed using entirely local data from there.
## Sync: Add `genesisStateRoot` for state syncing ## Sync: Add `genesisStateRoot` for state syncing
State syncing requires the `stateRoot` value of the selected block to sync to. State syncing requires the `stateRoot` value of the selected block to sync to.
@ -194,263 +101,3 @@ special code.
In order to exercise the state sync code and see how peers behave when block 0 In order to exercise the state sync code and see how peers behave when block 0
is selected, and avoid special code, use the genesis `stateRoot` found locally, is selected, and avoid special code, use the genesis `stateRoot` found locally,
and sync that state from peers like any other. and sync that state from peers like any other.
## Sync: New types `LeafPath`, `InteriorPath` and support functions
`InteriorPath` is a path to an interior node in an Ethereum hexary trie. This
is a sequence of 0 to 64 hex digits. 0 digits means the root node, and 64
digits means a leaf node whose path hasn't been converted to `LeafPath` yet.
`LeafPath` is a path to a leaf in an Ethereum hexary trie. Individually, each
leaf path is a hash, but rather than being the hash of the contents, it's the
hash of the item's address. Collectively, these hashes have some 256-bit
numerical properties: ordering, intervals and meaningful difference.
## Sync: Add `onGetNodeData`, `onNodeData` to `eth/65` protocol handler
These hooks allow new sync code to register to provide reply data or consume
incoming events without a circular import dependency involving `p2pProtocol`.
Without the hooks, the protocol file needs to import functions that consume
incoming network messages so the `p2pProtocol` can call them, and the functions
that produce outgoing network messages need to import the protocol file.
But related producer/consumer function pairs are typically located in the same
file because they are closely related. For example the producer of
`GetNodeData` and the consumer of `NodeData`.
In this specific case, we also need to break the `requestResponse` relationship
between `GetNodeData` and `NodeData` messages when pipelining.
There are other ways to accomplish this, but this way is most practical, and
it allows different protocol-using modules to coexist easily. When the hooks
aren't set, default behaviour is fine.
## Sync: Robust support for `GetNodeData` network calls
This module provides an async function to call `GetNodeData`, a request in
the Ethereum DevP2P/ETH network protocol. Parallel requests can be issued,
maintaining a pipeline.
Given a list of hashes, it returns a list of trie nodes or contract
bytecodes matching those hashes. The returned nodes may be any subset of
those requested, including an empty list. The returned nodes are not
necessarily in the same order as the request, so a mapping from request
items to node items is included. On timeout or error, `nil` is returned.
Only data passing hash verification is returned, so hashes don't need to be
verified again. No exceptions are raised, and no detail is returned about
timeouts or errors, but systematically formatted trace messages are output
if enabled, and show in detail if various events occur such as timeouts,
bad hashes, mixed replies, network errors, etc.
This tracks queued requests and individual request hashes, verifies received
node data hashes, and matches them against requests. When a peer replies in
same order as requests are sent, and each reply contains nodes in the same
order as requested, the matching process is efficient. It avoids storing
request details in a hash table when possible. If replies or nodes are out
of order, the process is still efficient but has to do a little more work.
Empty replies:
Empty replies are matched with requests using a queue draining technique.
After an empty reply is received, we temporarily pause further requests and
wait for more replies. After we receive all outstanding replies, we know
which requests the empty replies were for, and can complete those requests.
Eth/66 protocol:
Although Eth/66 simplifies by matching replies to requests, replies can still
have data out of order or missing, so hashes still need to be verified and
looked up. Much of the code here is still required for Eth/66.
References:
- [Ethereum Wire Protocol (ETH)](https://github.com/ethereum/devp2p/blob/master/caps/eth.md)
- [`GetNodeData` (0x0d)](https://github.com/ethereum/devp2p/blob/master/caps/eth.md#getnodedata-0x0d)
- [`NodeData` (0x0e)](https://github.com/ethereum/devp2p/blob/master/caps/eth.md#nodedata-0x0e)
## Sync: Robustly parse trie nodes from network untrusted data
This module parses hexary trie nodes as used by Ethereum from data received
over the network. The data is untrusted, and a non-canonical RLP encoding
of the node must be rejected, so it is parsed carefully.
The caller provides bytes and context. Context includes node hash, trie
path, and a boolean saying if this trie node is child of an extension node.
The result of parsing is up to 16 child node hashes to follow, or up to 7
leaf nodes to decode.
The caller should ensure the bytes are verified against the hash before
calling this parser. Even though they pass the hash, they are still
untrusted bytes that must be parsed carefully, because the hash itself is
from an untrusted source.
`RlpError` exceptions may occur on some well-crafted adversarial input
due to the RLP reader implementation. They could be trapped and treated
like other parse errors, but they are not, to avoid the overhead of
`try..except` in the parser (which uses C `setjmp`). The caller should
put `try..except RlpError` outside its trie node parsing loop.
### Path range metadata benefits
Because data is handled in path ranges, this allows a compact metadata
representation of what data is stored locally and what isn't, compared with
the size of a representation of partially completed trie traversal with
`eth` `GetNodeData`. Due to the smaller metadata, after aborting a partial
sync and restarting, it is possible to resume quickly, without waiting for
the very slow local database scan associated with older versions of Geth.
However, Nimbus's sync method uses this principle as inspiration to
obtain similar metadata benefits whichever network protocol is used.
### Distributed hash table (DHT) building block
Although `snap` was designed for bootstrapping clients with the entire
Ethereum state, it is well suited to fetching only a subset of path ranges.
This may be useful for bootstrapping distributed hash tables (DHTs).
### Remote state and Beam sync benefits
`snap` was not intended for Beam sync, or "remote state on demand", used by
transactions executing locally that fetch state from the network instead of
local storage.
Even so, as a lucky accident `snap` allows individual states to be fetched
in fewer network round trips than `eth`. Often a single round trip,
compared with about 10 round trips per account query over `eth`. This is
because `eth` `GetNodeData` requires a trie traversal chasing hashes
sequentially, while `snap` `GetTrieNode` trie traversal can be done with
predictable paths.
Therefore `snap` can be used to accelerate remote states and Beam sync.
### Performance benefits
`snap` is used for much higher performance transfer of the entire Ethereum
execution state (accounts, storage, bytecode) compared with hexary trie
traversal using `eth` `GetNodeData`.
It improves both network and local storage performance. The benefits are
substantial, and summarised here:
- [Ethereum Snapshot Protocol (SNAP) - Expected results]
(https://github.com/ethereum/devp2p/blob/master/caps/snap.md)
- [Geth v1.10.0 - Snap sync]
(https://blog.ethereum.org/2021/03/03/geth-v1-10-0/#snap-sync)
In the Snap sync model, local storage benefits require clients to adopt a
different representation of Ethereum state than the trie storage that Geth
(and most clients) traditionally used, and still do in archive mode,
However, Nimbus's sync method obtains similar local storage benefits
whichever network protocol is used. Nimbus uses `snap` protocol because
it is a more efficient network protocol.
## Sync: Changes to `snap/1` protocol to match Geth parameters
The `snap/1` specification doesn't match reality. If we implement the
protocol as specified, Geth drops the peer connection. We must do as Geth
expects.
- `GetAccountRanges` and `GetStorageRanges` take parameters `origin` and
`limit`, instead of a single `startingHash` parameter in the
specification. `origin` and `limit` are 256-bit paths representing the
starting hash and ending trie path, both inclusive.
- If the `snap/1` specification is followed (omitting `limit`), Geth 1.10
disconnects immediately so we must follow this deviation.
- Results from either call may include one item with path `>= limit`. Geth
fetches data from its internal database until it reaches this condition or
the bytes threshold, then replies with what it fetched. Usually there is
no item at the exact path `limit`, so there is one after.
## Sync: Ethereum Snapshot Protocol (SNAP), version 1
This patch adds the `snap/1` protocol, documented at:
- [Ethereum Snapshot Protocol (SNAP)]
(https://github.com/ethereum/devp2p/blob/master/caps/snap.md).
This is just the protocol handlers, not the sync algorithm.
## Sync: Changes to `snap/1` protocol to match Geth `GetStorageRanges`
The main part of this part is to add a comment documenting quirky behaviour of
`GetStorageRanges` with Geth, and workarounds for the missing right-side proof.
The code change is smaller, but it does change the type of parameters `origin`
and limit` to `GetStorageRanges`. Trace messages are updated accordingly.
When calling a Geth peer with `GetStorageRanges`:
- Parameters `origin` and `limit` may each be empty blobs, which mean "all
zeros" (0x00000...) or "no limit" (0xfffff...) respectively.
(Blobs shorter than 32 bytes can also be given, and they are extended with
zero bytes; longer than 32 bytes can be given and are truncated, but this
is Geth being too accepting, and shouldn't be used.)
- In the `slots` reply, the last account's storage list may be empty even if
that account has non-empty storage.
This happens when the bytes threshold is reached just after finishing
storage for the previous account, or when `origin` is greater than the
first account's last storage slot. When either of these happens, `proof`
is non-empty. In the case of `origin` zero or empty, the non-empty proof
only contains the left-side boundary proof, because it meets the condition
for omitting the right-side proof described in the next point.
- In the `proof` reply, the right-side boundary proof is only included if
the last returned storage slot has non-zero path and `origin != 0`, or if
the result stops due to reaching the bytes threshold.
Because there's only one proof anyway if left-side and right-side are the
same path, this works out to mean the right-side proof is omitted in cases
where `origin == 0` and the result stops at a slot `>= limit` before
reaching the bytes threshold.
Although the specification doesn't say anything about `limit`, this is
against the spirit of the specification rule, which says the right-side
proof is always included if the last returned path differs from the
starting hash.
The omitted right-side proof can cause problems when using `limit`.
In other words, when doing range queries, or merging results from
pipelining where different `stateRoot` hashes are used as time progresses.
Workarounds:
- Fetch the proof using a second `GetStorageRanges` query with non-zero
`origin` (perhaps equal to `limit`; use `origin = 1` if `limit == 0`).
- Avoid the condition by using `origin >= 1` when using `limit`.
- Use trie node traversal (`snap` `GetTrieNodes` or `eth` `GetNodeData`)
to obtain the omitted proof.
- When multiple accounts are requested with `origin > 0`, only one account's
storage is returned. There is no point requesting multiple accounts with
`origin > 0`. (It might be useful if it treated `origin` as applying to
only the first account, but it doesn't.)
- When multiple accounts are requested with non-default `limit` and
`origin == 0`, and the first account result stops at a slot `>= limit`
before reaching the bytes threshold, storage for the other accounts in the
request are returned as well. The other accounts are not limited by
`limit`, only the bytes threshold. The right-side proof is omitted from
`proof` when this happens, because this is the same condition as described
earlier for omitting the right-side proof. (It might be useful if it
treated `origin` as applying to only the first account and `limit` to only
the last account, but it doesn't.)

View File

@ -1,16 +1,21 @@
# nim-eth # nim-eth
# Copyright (c) 2018-2021 Status Research & Development GmbH # Copyright (c) 2018-2021 Status Research & Development GmbH
# Licensed and distributed under either of # Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * MIT license (license terms in the root directory or at
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). # https://opensource.org/licenses/MIT).
# at your option. This file may not be copied, modified, or distributed except according to those terms. # * Apache v2 license (license terms in the root directory or at
# https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
import import
std/[sets, options, random, hashes, sequtils], std/[sets, options, random, hashes, sequtils],
chronos, chronicles, chronicles,
../sync/protocol_ethxx, chronos,
eth/common/eth_types, eth/[common/eth_types, p2p],
eth/[p2p, p2p/private/p2p_types, p2p/rlpx, p2p/peer_pool] eth/p2p/[private/p2p_types, peer_pool],
stew/byteutils,
"."/[protocol, trace_helper]
{.push raises:[Defect].} {.push raises:[Defect].}
@ -19,10 +24,10 @@ const
# number of peers before syncing # number of peers before syncing
type type
SyncStatus* = enum #SyncStatus = enum
syncSuccess # syncSuccess
syncNotEnoughPeers # syncNotEnoughPeers
syncTimeOut # syncTimeOut
BlockchainSyncDefect* = object of Defect BlockchainSyncDefect* = object of Defect
## Catch and relay exception ## Catch and relay exception
@ -40,7 +45,7 @@ type
headers: seq[BlockHeader] headers: seq[BlockHeader]
bodies: seq[BlockBody] bodies: seq[BlockBody]
SyncContext = ref object FastSyncCtx* = ref object
workQueue: seq[WantedBlocks] workQueue: seq[WantedBlocks]
endBlockNumber: BlockNumber endBlockNumber: BlockNumber
finalizedBlock: BlockNumber # Block which was downloaded and verified finalizedBlock: BlockNumber # Block which was downloaded and verified
@ -66,7 +71,7 @@ proc endIndex(b: WantedBlocks): BlockNumber =
result = b.startIndex result = b.startIndex
result += (b.numBlocks - 1).toBlockNumber result += (b.numBlocks - 1).toBlockNumber
proc availableWorkItem(ctx: SyncContext): int = proc availableWorkItem(ctx: FastSyncCtx): int =
var maxPendingBlock = ctx.finalizedBlock # the last downloaded & processed var maxPendingBlock = ctx.finalizedBlock # the last downloaded & processed
trace "queue len", length = ctx.workQueue.len trace "queue len", length = ctx.workQueue.len
result = -1 result = -1
@ -110,7 +115,7 @@ proc availableWorkItem(ctx: SyncContext): int =
numBlocks = maxHeadersFetch numBlocks = maxHeadersFetch
ctx.workQueue[result] = WantedBlocks(startIndex: nextRequestedBlock, numBlocks: numBlocks.uint, state: Initial) ctx.workQueue[result] = WantedBlocks(startIndex: nextRequestedBlock, numBlocks: numBlocks.uint, state: Initial)
proc persistWorkItem(ctx: SyncContext, wi: var WantedBlocks): ValidationResult proc persistWorkItem(ctx: FastSyncCtx, wi: var WantedBlocks): ValidationResult
{.gcsafe, raises:[Defect,CatchableError].} = {.gcsafe, raises:[Defect,CatchableError].} =
catchException("persistBlocks"): catchException("persistBlocks"):
result = ctx.chain.persistBlocks(wi.headers, wi.bodies) result = ctx.chain.persistBlocks(wi.headers, wi.bodies)
@ -124,7 +129,7 @@ proc persistWorkItem(ctx: SyncContext, wi: var WantedBlocks): ValidationResult
wi.headers = @[] wi.headers = @[]
wi.bodies = @[] wi.bodies = @[]
proc persistPendingWorkItems(ctx: SyncContext): (int, ValidationResult) proc persistPendingWorkItems(ctx: FastSyncCtx): (int, ValidationResult)
{.gcsafe, raises:[Defect,CatchableError].} = {.gcsafe, raises:[Defect,CatchableError].} =
var nextStartIndex = ctx.finalizedBlock + 1 var nextStartIndex = ctx.finalizedBlock + 1
var keepRunning = true var keepRunning = true
@ -153,7 +158,7 @@ proc persistPendingWorkItems(ctx: SyncContext): (int, ValidationResult)
ctx.hasOutOfOrderBlocks = hasOutOfOrderBlocks ctx.hasOutOfOrderBlocks = hasOutOfOrderBlocks
proc returnWorkItem(ctx: SyncContext, workItem: int): ValidationResult proc returnWorkItem(ctx: FastSyncCtx, workItem: int): ValidationResult
{.gcsafe, raises:[Defect,CatchableError].} = {.gcsafe, raises:[Defect,CatchableError].} =
let wi = addr ctx.workQueue[workItem] let wi = addr ctx.workQueue[workItem]
let askedBlocks = wi.numBlocks.int let askedBlocks = wi.numBlocks.int
@ -192,15 +197,7 @@ proc returnWorkItem(ctx: SyncContext, workItem: int): ValidationResult
receivedBlocks receivedBlocks
return ValidationResult.Error return ValidationResult.Error
proc newSyncContext(chain: AbstractChainDB, peerPool: PeerPool): SyncContext proc handleLostPeer(ctx: FastSyncCtx) =
{.gcsafe, raises:[Defect,CatchableError].} =
new result
result.chain = chain
result.peerPool = peerPool
result.trustedPeers = initHashSet[Peer]()
result.finalizedBlock = chain.getBestBlockHeader().blockNumber
proc handleLostPeer(ctx: SyncContext) =
# TODO: ask the PeerPool for new connections and then call # TODO: ask the PeerPool for new connections and then call
# `obtainBlocksFromPeer` # `obtainBlocksFromPeer`
discard discard
@ -224,7 +221,7 @@ proc getBestBlockNumber(p: Peer): Future[BlockNumber] {.async.} =
count=latestBlock.get.headers.len, count=latestBlock.get.headers.len,
blockNumber=(if latestBlock.get.headers.len > 0: $result else: "missing") blockNumber=(if latestBlock.get.headers.len > 0: $result else: "missing")
proc obtainBlocksFromPeer(syncCtx: SyncContext, peer: Peer) {.async.} = proc obtainBlocksFromPeer(syncCtx: FastSyncCtx, peer: Peer) {.async.} =
# Update our best block number # Update our best block number
try: try:
let bestBlockNumber = await peer.getBestBlockNumber() let bestBlockNumber = await peer.getBestBlockNumber()
@ -356,7 +353,7 @@ proc peersAgreeOnChain(a, b: Peer): Future[bool] {.async.} =
tracePacket "<< Got reply eth.BlockHeaders (0x04)", peer=a, tracePacket "<< Got reply eth.BlockHeaders (0x04)", peer=a,
count=latestBlock.get.headers.len, blockNumber count=latestBlock.get.headers.len, blockNumber
proc randomTrustedPeer(ctx: SyncContext): Peer = proc randomTrustedPeer(ctx: FastSyncCtx): Peer =
var k = rand(ctx.trustedPeers.len - 1) var k = rand(ctx.trustedPeers.len - 1)
var i = 0 var i = 0
for p in ctx.trustedPeers: for p in ctx.trustedPeers:
@ -364,7 +361,7 @@ proc randomTrustedPeer(ctx: SyncContext): Peer =
if i == k: return if i == k: return
inc i inc i
proc startSyncWithPeer(ctx: SyncContext, peer: Peer) {.async.} = proc startSyncWithPeer(ctx: FastSyncCtx, peer: Peer) {.async.} =
trace "start sync", peer, trustedPeers = ctx.trustedPeers.len trace "start sync", peer, trustedPeers = ctx.trustedPeers.len
if ctx.trustedPeers.len >= minPeersToStartSync: if ctx.trustedPeers.len >= minPeersToStartSync:
# We have enough trusted peers. Validate new peer against trusted # We have enough trusted peers. Validate new peer against trusted
@ -409,7 +406,7 @@ proc startSyncWithPeer(ctx: SyncContext, peer: Peer) {.async.} =
asyncSpawn ctx.obtainBlocksFromPeer(p) asyncSpawn ctx.obtainBlocksFromPeer(p)
proc onPeerConnected(ctx: SyncContext, peer: Peer) = proc onPeerConnected(ctx: FastSyncCtx, peer: Peer) =
trace "New candidate for sync", peer trace "New candidate for sync", peer
try: try:
let f = ctx.startSyncWithPeer(peer) let f = ctx.startSyncWithPeer(peer)
@ -425,21 +422,10 @@ proc onPeerConnected(ctx: SyncContext, peer: Peer) =
debug "Exception in startSyncWithPeer()", exc = e.name, err = e.msg debug "Exception in startSyncWithPeer()", exc = e.name, err = e.msg
proc onPeerDisconnected(ctx: SyncContext, p: Peer) = proc onPeerDisconnected(ctx: FastSyncCtx, p: Peer) =
trace "peer disconnected ", peer = p trace "peer disconnected ", peer = p
ctx.trustedPeers.excl(p) ctx.trustedPeers.excl(p)
proc startSync(ctx: SyncContext) =
var po: PeerObserver
po.onPeerConnected = proc(p: Peer) {.gcsafe.} =
ctx.onPeerConnected(p)
po.onPeerDisconnected = proc(p: Peer) {.gcsafe.} =
ctx.onPeerDisconnected(p)
po.setProtocol eth
ctx.peerPool.addObserver(ctx, po)
proc findBestPeer(node: EthereumNode): (Peer, DifficultyInt) = proc findBestPeer(node: EthereumNode): (Peer, DifficultyInt) =
var var
bestBlockDifficulty: DifficultyInt = 0.stuint(256) bestBlockDifficulty: DifficultyInt = 0.stuint(256)
@ -454,12 +440,29 @@ proc findBestPeer(node: EthereumNode): (Peer, DifficultyInt) =
result = (bestPeer, bestBlockDifficulty) result = (bestPeer, bestBlockDifficulty)
proc fastBlockchainSync*(node: EthereumNode): Future[SyncStatus] {.async.} = proc new*(T: type FastSyncCtx; ethNode: EthereumNode): T
## Code for the fast blockchain sync procedure: {.gcsafe, raises:[Defect,CatchableError].} =
## https://github.com/ethereum/wiki/wiki/Parallel-Block-Downloads FastSyncCtx(
## https://github.com/ethereum/go-ethereum/pull/1889 # workQueue: n/a
# TODO: This needs a better interface. Consider removing this function and # endBlockNumber: n/a
# exposing SyncCtx # hasOutOfOrderBlocks: n/a
var syncCtx = newSyncContext(node.chain, node.peerPool) chain: ethNode.chain,
syncCtx.startSync() peerPool: ethNode.peerPool,
trustedPeers: initHashSet[Peer](),
finalizedBlock: ethNode.chain.getBestBlockHeader.blockNumber)
proc start*(ctx: FastSyncCtx) =
## Code for the fast blockchain sync procedure:
## <https://github.com/ethereum/wiki/wiki/Parallel-Block-Downloads>_
## <https://github.com/ethereum/go-ethereum/pull/1889__
var po = PeerObserver(
onPeerConnected:
proc(p: Peer) {.gcsafe.} =
ctx.onPeerConnected(p),
onPeerDisconnected:
proc(p: Peer) {.gcsafe.} =
ctx.onPeerDisconnected(p))
po.setProtocol eth
ctx.peerPool.addObserver(ctx, po)
# End

View File

@ -8,5 +8,16 @@
# at your option. This file may not be copied, modified, or distributed # at your option. This file may not be copied, modified, or distributed
# except according to those terms. # except according to those terms.
import ./protocol/eth66 import
export eth66 ./protocol/eth66 as proto_eth,
./protocol/snap1 as proto_snap
export
proto_eth,
proto_snap
type
eth* = eth66
snap* = snap1
# End

View File

@ -2,19 +2,45 @@
# #
# Copyright (c) 2018-2021 Status Research & Development GmbH # Copyright (c) 2018-2021 Status Research & Development GmbH
# Licensed under either of # Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0) # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT) # http://www.apache.org/licenses/LICENSE-2.0)
# at your option. This file may not be copied, modified, or distributed except according to those terms. # * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## This module implements Ethereum Wire Protocol version 66, `eth/66`. ## This module implements `eth/66`, the `Ethereum Wire Protocol version 66
## Specification: ## <https://github.com/ethereum/devp2p/blob/master/caps/eth.md>`_
## `eth/66 <https://github.com/ethereum/devp2p/blob/master/caps/eth.md>`_ ##
## Optional peply processor function hooks
## ---------------------------------------
##
## The `onGetNodeData` and `onNodeData` hooks allow new sync code to register
## for providing reply data or consume incoming events without a circular
## import dependency involving the `p2pProtocol`.
##
## Without the hooks, the protocol file needs to import functions that consume
## incoming network messages. So the `p2pProtocol` can call them, and the
## functions that produce outgoing network messages need to import the protocol
## file.
##
## But related producer/consumer function pairs are typically located in the
## very same file because they are closely related. For an example see the
## producer of `GetNodeData` and the consumer of `NodeData`.
##
## In this specific case, we need to split the `requestResponse` relationship
## between `GetNodeData` and `NodeData` messages when pipelining.
##
## Among others, this way is the most practical to acomplish the split
## implementation. It allows different protocol-using modules to coexist
## easily. When the hooks aren't set, default behaviour applies.
import import
chronos, stint, chronicles, stew/byteutils, macros, chronos, stint, chronicles, stew/byteutils, macros,
eth/[common/eth_types, rlp, p2p], eth/[common/eth_types, rlp, p2p],
eth/p2p/[rlpx, private/p2p_types, blockchain_utils], eth/p2p/[rlpx, private/p2p_types, blockchain_utils],
../sync_types ".."/[sync_types, trace_helper],
./pickeled_eth_tracers
export export
tracePackets, tracePacket, tracePackets, tracePacket,
@ -34,7 +60,7 @@ type
forkHash: array[4, byte] # The RLP encoding must be exactly 4 bytes. forkHash: array[4, byte] # The RLP encoding must be exactly 4 bytes.
forkNext: BlockNumber # The RLP encoding must be variable-length forkNext: BlockNumber # The RLP encoding must be variable-length
PeerState = ref object PeerState* = ref object
initialized*: bool initialized*: bool
bestBlockHash*: BlockHash bestBlockHash*: BlockHash
bestDifficulty*: DifficultyInt bestDifficulty*: DifficultyInt
@ -53,31 +79,9 @@ const
ethVersion* = 66 ethVersion* = 66
prettyEthProtoName* = "[eth/" & $ethVersion & "]" prettyEthProtoName* = "[eth/" & $ethVersion & "]"
func toHex*(hash: Hash256): string = hash.data.toHex
func traceStep*(request: BlocksRequest): string = p2pProtocol eth66(version = ethVersion,
var str = if request.reverse: "-" else: "+" rlpxName = "eth",
if request.skip < high(typeof(request.skip)):
return str & $(request.skip + 1)
return static($(high(typeof(request.skip)).u256 + 1))
# Shortcuts, print the protocol type as well (might be removed in future)
const protoInfo = prettyEthProtoName
template traceReceived(msg: static[string], args: varargs[untyped]) =
tracePacket "<< " & protoInfo & " Received " & msg, `args`
template traceDiscarding(msg: static[string], args: varargs[untyped]) =
tracePacket "<< " & protoInfo & " Discarding " & msg, `args`
template traceGossipDiscarding(msg: static[string], args: varargs[untyped]) =
traceGossip "<< " & protoInfo & " Discarding " & msg, `args`
template traceSending(msg: static[string], args: varargs[untyped]) =
tracePacket ">> " & protoInfo & " Sending " & msg, `args`
template traceReplying(msg: static[string], args: varargs[untyped]) =
tracePacket ">> " & protoInfo & " Replying " & msg, `args`
p2pProtocol eth(version = ethVersion,
peerState = PeerState, peerState = PeerState,
useRequestIds = true): useRequestIds = true):
@ -91,7 +95,7 @@ p2pProtocol eth(version = ethVersion,
forkHash: chainForkId.crc.toBytesBE, forkHash: chainForkId.crc.toBytesBE,
forkNext: chainForkId.nextFork.toBlockNumber) forkNext: chainForkId.nextFork.toBlockNumber)
traceSending "eth.Status (0x00) " & prettyEthProtoName, traceSending "Status (0x00) " & prettyEthProtoName,
peer, td=bestBlock.difficulty, peer, td=bestBlock.difficulty,
bestHash=bestBlock.blockHash.toHex, bestHash=bestBlock.blockHash.toHex,
networkId=network.networkId, networkId=network.networkId,
@ -139,7 +143,7 @@ p2pProtocol eth(version = ethVersion,
bestHash: BlockHash, bestHash: BlockHash,
genesisHash: BlockHash, genesisHash: BlockHash,
forkId: ForkId) = forkId: ForkId) =
traceReceived "eth.Status (0x00) [eth/" & $ethVersion & "]", traceReceived "Status (0x00)",
peer, td=totalDifficulty, peer, td=totalDifficulty,
bestHash=bestHash.toHex, bestHash=bestHash.toHex,
networkId, networkId,
@ -148,13 +152,13 @@ p2pProtocol eth(version = ethVersion,
# User message 0x01: NewBlockHashes. # User message 0x01: NewBlockHashes.
proc newBlockHashes(peer: Peer, hashes: openArray[NewBlockHashesAnnounce]) = proc newBlockHashes(peer: Peer, hashes: openArray[NewBlockHashesAnnounce]) =
traceGossipDiscarding "eth.NewBlockHashes (0x01)", traceGossipDiscarding "NewBlockHashes (0x01)",
peer, hashes=hashes.len peer, hashes=hashes.len
discard discard
# User message 0x02: Transactions. # User message 0x02: Transactions.
proc transactions(peer: Peer, transactions: openArray[Transaction]) = proc transactions(peer: Peer, transactions: openArray[Transaction]) =
traceGossipDiscarding "eth.Transactions (0x02)", traceGossipDiscarding "Transactions (0x02)",
peer, transactions=transactions.len peer, transactions=transactions.len
discard discard
@ -163,34 +167,34 @@ p2pProtocol eth(version = ethVersion,
proc getBlockHeaders(peer: Peer, request: BlocksRequest) = proc getBlockHeaders(peer: Peer, request: BlocksRequest) =
if tracePackets: if tracePackets:
if request.maxResults == 1 and request.startBlock.isHash: if request.maxResults == 1 and request.startBlock.isHash:
traceReceived "eth.GetBlockHeaders/Hash (0x03)", traceReceived "GetBlockHeaders/Hash (0x03)",
peer, blockHash=($request.startBlock.hash), count=1 peer, blockHash=($request.startBlock.hash), count=1
elif request.maxResults == 1: elif request.maxResults == 1:
traceReceived "eth.GetBlockHeaders (0x03)", traceReceived "GetBlockHeaders (0x03)",
peer, `block`=request.startBlock.number, count=1 peer, `block`=request.startBlock.number, count=1
elif request.startBlock.isHash: elif request.startBlock.isHash:
traceReceived "eth.GetBlockHeaders/Hash (0x03)", traceReceived "GetBlockHeaders/Hash (0x03)",
peer, firstBlockHash=($request.startBlock.hash), peer, firstBlockHash=($request.startBlock.hash),
count=request.maxResults, count=request.maxResults,
step=traceStep(request) step=traceStep(request)
else: else:
traceReceived "eth.GetBlockHeaders (0x03)", traceReceived "GetBlockHeaders (0x03)",
peer, firstBlock=request.startBlock.number, peer, firstBlock=request.startBlock.number,
count=request.maxResults, count=request.maxResults,
step=traceStep(request) step=traceStep(request)
if request.maxResults > uint64(maxHeadersFetch): if request.maxResults > uint64(maxHeadersFetch):
debug "eth.GetBlockHeaders (0x03) requested too many headers", debug "GetBlockHeaders (0x03) requested too many headers",
peer, requested=request.maxResults, max=maxHeadersFetch peer, requested=request.maxResults, max=maxHeadersFetch
await peer.disconnect(BreachOfProtocol) await peer.disconnect(BreachOfProtocol)
return return
let headers = peer.network.chain.getBlockHeaders(request) let headers = peer.network.chain.getBlockHeaders(request)
if headers.len > 0: if headers.len > 0:
traceReplying "with eth.BlockHeaders (0x04)", traceReplying "with BlockHeaders (0x04)",
peer, sent=headers.len, requested=request.maxResults peer, sent=headers.len, requested=request.maxResults
else: else:
traceReplying "EMPTY eth.BlockHeaders (0x04)", traceReplying "EMPTY BlockHeaders (0x04)",
peer, sent=0, requested=request.maxResults peer, sent=0, requested=request.maxResults
await response.send(headers) await response.send(headers)
@ -201,20 +205,20 @@ p2pProtocol eth(version = ethVersion,
requestResponse: requestResponse:
# User message 0x05: GetBlockBodies. # User message 0x05: GetBlockBodies.
proc getBlockBodies(peer: Peer, hashes: openArray[BlockHash]) = proc getBlockBodies(peer: Peer, hashes: openArray[BlockHash]) =
traceReceived "eth.GetBlockBodies (0x05)", traceReceived "GetBlockBodies (0x05)",
peer, hashes=hashes.len peer, hashes=hashes.len
if hashes.len > maxBodiesFetch: if hashes.len > maxBodiesFetch:
debug "eth.GetBlockBodies (0x05) requested too many bodies", debug "GetBlockBodies (0x05) requested too many bodies",
peer, requested=hashes.len, max=maxBodiesFetch peer, requested=hashes.len, max=maxBodiesFetch
await peer.disconnect(BreachOfProtocol) await peer.disconnect(BreachOfProtocol)
return return
let bodies = peer.network.chain.getBlockBodies(hashes) let bodies = peer.network.chain.getBlockBodies(hashes)
if bodies.len > 0: if bodies.len > 0:
traceReplying "with eth.BlockBodies (0x06)", traceReplying "with BlockBodies (0x06)",
peer, sent=bodies.len, requested=hashes.len peer, sent=bodies.len, requested=hashes.len
else: else:
traceReplying "EMPTY eth.BlockBodies (0x06)", traceReplying "EMPTY BlockBodies (0x06)",
peer, sent=0, requested=hashes.len peer, sent=0, requested=hashes.len
await response.send(bodies) await response.send(bodies)
@ -226,7 +230,7 @@ p2pProtocol eth(version = ethVersion,
proc newBlock(peer: Peer, bh: EthBlock, totalDifficulty: DifficultyInt) = proc newBlock(peer: Peer, bh: EthBlock, totalDifficulty: DifficultyInt) =
# (Note, needs to use `EthBlock` instead of its alias `NewBlockAnnounce` # (Note, needs to use `EthBlock` instead of its alias `NewBlockAnnounce`
# because either `p2pProtocol` or RLPx doesn't work with an alias.) # because either `p2pProtocol` or RLPx doesn't work with an alias.)
traceGossipDiscarding "eth.NewBlock (0x07)", traceGossipDiscarding "NewBlock (0x07)",
peer, totalDifficulty, peer, totalDifficulty,
blockNumber = bh.header.blockNumber, blockNumber = bh.header.blockNumber,
blockDifficulty = bh.header.difficulty blockDifficulty = bh.header.difficulty
@ -234,17 +238,17 @@ p2pProtocol eth(version = ethVersion,
# User message 0x08: NewPooledTransactionHashes. # User message 0x08: NewPooledTransactionHashes.
proc newPooledTransactionHashes(peer: Peer, hashes: openArray[TxHash]) = proc newPooledTransactionHashes(peer: Peer, hashes: openArray[TxHash]) =
traceGossipDiscarding "eth.NewPooledTransactionHashes (0x08)", traceGossipDiscarding "NewPooledTransactionHashes (0x08)",
peer, hashes=hashes.len peer, hashes=hashes.len
discard discard
requestResponse: requestResponse:
# User message 0x09: GetPooledTransactions. # User message 0x09: GetPooledTransactions.
proc getPooledTransactions(peer: Peer, hashes: openArray[TxHash]) = proc getPooledTransactions(peer: Peer, hashes: openArray[TxHash]) =
traceReceived "eth.GetPooledTransactions (0x09)", traceReceived "GetPooledTransactions (0x09)",
peer, hashes=hashes.len peer, hashes=hashes.len
traceReplying "EMPTY eth.PooledTransactions (0x10)", traceReplying "EMPTY PooledTransactions (0x10)",
peer, sent=0, requested=hashes.len peer, sent=0, requested=hashes.len
await response.send([]) await response.send([])
@ -255,7 +259,7 @@ p2pProtocol eth(version = ethVersion,
# User message 0x0d: GetNodeData. # User message 0x0d: GetNodeData.
proc getNodeData(peer: Peer, hashes: openArray[NodeHash]) = proc getNodeData(peer: Peer, hashes: openArray[NodeHash]) =
traceReceived "eth.GetNodeData (0x0d)", peer, traceReceived "GetNodeData (0x0d)", peer,
hashes=hashes.len hashes=hashes.len
var data: seq[Blob] var data: seq[Blob]
@ -265,10 +269,10 @@ p2pProtocol eth(version = ethVersion,
data = peer.network.chain.getStorageNodes(hashes) data = peer.network.chain.getStorageNodes(hashes)
if data.len > 0: if data.len > 0:
traceReplying "with eth.NodeData (0x0e)", peer, traceReplying "with NodeData (0x0e)", peer,
sent=data.len, requested=hashes.len sent=data.len, requested=hashes.len
else: else:
traceReplying "EMPTY eth.NodeData (0x0e)", peer, traceReplying "EMPTY NodeData (0x0e)", peer,
sent=0, requested=hashes.len sent=0, requested=hashes.len
await peer.nodeData(data) await peer.nodeData(data)
@ -280,16 +284,16 @@ p2pProtocol eth(version = ethVersion,
# know if this is a valid reply ("Got reply") or something else. # know if this is a valid reply ("Got reply") or something else.
peer.state.onNodeData(peer, data) peer.state.onNodeData(peer, data)
else: else:
traceDiscarding "eth.NodeData (0x0e)", peer, traceDiscarding "NodeData (0x0e)", peer,
bytes=data.len bytes=data.len
requestResponse: requestResponse:
# User message 0x0f: GetReceipts. # User message 0x0f: GetReceipts.
proc getReceipts(peer: Peer, hashes: openArray[BlockHash]) = proc getReceipts(peer: Peer, hashes: openArray[BlockHash]) =
traceReceived "eth.GetReceipts (0x0f)", traceReceived "GetReceipts (0x0f)",
peer, hashes=hashes.len peer, hashes=hashes.len
traceReplying "EMPTY eth.Receipts (0x10)", traceReplying "EMPTY Receipts (0x10)",
peer, sent=0, requested=hashes.len peer, sent=0, requested=hashes.len
await response.send([]) await response.send([])
# TODO: implement `getReceipts` and reactivate this code # TODO: implement `getReceipts` and reactivate this code

View File

@ -0,0 +1,52 @@
# Nimbus - Rapidly converge on and track the canonical chain head of each peer
#
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
template traceReceived*(msg: static[string], args: varargs[untyped]) =
tracePacket "<< " & prettyEthProtoName & " Received " & msg,
`args`
template traceGot*(msg: static[string], args: varargs[untyped]) =
tracePacket "<< " & prettyEthProtoName & " Got " & msg,
`args`
template traceProtocolViolation*(msg: static[string], args: varargs[untyped]) =
tracePacketError "<< " & prettyEthProtoName & " Protocol violation, " & msg,
`args`
template traceRecvError*(msg: static[string], args: varargs[untyped]) =
traceNetworkError "<< " & prettyEthProtoName & " Error " & msg,
`args`
template traceTimeoutWaiting*(msg: static[string], args: varargs[untyped]) =
traceTimeout "<< " & prettyEthProtoName & " Timeout waiting " & msg,
`args`
template traceSending*(msg: static[string], args: varargs[untyped]) =
tracePacket ">> " & prettyEthProtoName & " Sending " & msg,
`args`
template traceReplying*(msg: static[string], args: varargs[untyped]) =
tracePacket ">> " & prettyEthProtoName & " Replying " & msg,
`args`
template traceDelaying*(msg: static[string], args: varargs[untyped]) =
tracePacket ">>" & prettyEthProtoName & " Delaying " & msg,
`args`
template traceGossipDiscarding*(msg: static[string], args: varargs[untyped]) =
traceGossip "<< " & prettyEthProtoName & " Discarding " & msg,
`args`
template traceDiscarding*(msg: static[string], args: varargs[untyped]) =
tracePacket "<< " & prettyEthProtoName & " Discarding " & msg,
`args`
# End

View File

@ -0,0 +1,40 @@
# Nimbus - Rapidly converge on and track the canonical chain head of each peer
#
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
template traceReceived*(msg: static[string], args: varargs[untyped]) =
tracePacket "<< " & prettySnapProtoName & " Received " & msg,
`args`
template traceGot*(msg: static[string], args: varargs[untyped]) =
tracePacket "<< " & prettySnapProtoName & " Got " & msg,
`args`
template traceProtocolViolation*(msg: static[string], args: varargs[untyped]) =
tracePacketError "<< " & prettySnapProtoName & " Protocol violation, " & msg,
`args`
template traceRecvError*(msg: static[string], args: varargs[untyped]) =
traceNetworkError "<< " & prettySnapProtoName & " Error " & msg,
`args`
template traceTimeoutWaiting*(msg: static[string], args: varargs[untyped]) =
traceTimeout "<< " & prettySnapProtoName & " Timeout waiting " & msg,
`args`
template traceSending*(msg: static[string], args: varargs[untyped]) =
tracePacket ">> " & prettySnapProtoName & " Sending " & msg,
`args`
template traceReplying*(msg: static[string], args: varargs[untyped]) =
tracePacket ">> " & prettySnapProtoName & " Replying " & msg,
`args`
# End

View File

@ -9,47 +9,57 @@
# at your option. This file may not be copied, modified, or distributed # at your option. This file may not be copied, modified, or distributed
# except according to those terms. # except according to those terms.
## This module implements Ethereum Snapshot Protocol (SNAP), `snap/1`, as ## This module implements `snap/1`, the `Ethereum Snapshot Protocol (SNAP)
## specified at the reference below, but modified for Geth compatibility. ## <https://github.com/ethereum/devp2p/blob/master/caps/snap.md>`_.
## ##
## - [Ethereum Snapshot Protocol (SNAP)] ## Modifications for *Geth* compatibility
## (https://github.com/ethereum/devp2p/blob/master/caps/snap.md) ## --------------------------------------
## ##
## Note: The `snap/1` specification doesn't match reality. If we implement the ## `GetAccountRange` and `GetStorageRanges` take parameters `origin` and
## protocol as specified, Geth drops the peer connection. We must do as Geth
## expects.
##
## Modifications for Geth compatibility
## ------------------------------------
##
## - `GetAccountRanges` and `GetStorageRanges` take parameters `origin` and
## `limit`, instead of a single `startingHash` parameter in the ## `limit`, instead of a single `startingHash` parameter in the
## specification. `origin` and `limit` are 256-bit paths representing the ## specification. The parameters `origin` and `limit` are 256-bit paths
## starting hash and ending trie path, both inclusive. ## representing the starting hash and ending trie path, both inclusive.
## ##
## - If the `snap/1` specification is followed (omitting `limit`), Geth 1.10 ## The `snap/1` specification doesn't match reality. If the specification is
## disconnects immediately so we must follow this deviation. ## strictly followed omitting `limit`, *Geth 1.10* disconnects immediately so
## this implementation strives to meet the *Geth* behaviour.
## ##
## - Results from either call may include one item with path `>= limit`. Geth ## Results from either call may include one item with path `>= limit`. *Geth*
## fetches data from its internal database until it reaches this condition or ## fetches data from its internal database until it reaches this condition or
## the bytes threshold, then replies with what it fetched. Usually there is ## the bytes threshold, then replies with what it fetched. Usually there is
## no item at the exact path `limit`, so there is one after. ## no item at the exact path `limit`, so there is one after.
## ##
## - `GetAccountRanges` parameters `origin` and `limit` must be 32 byte blobs.
## There is no reason why empty limit is not allowed here when it is allowed
## for `GetStorageRanges`, it just isn't.
## ##
## `GetStorageRanges` quirks for Geth compatibility ## Modified `GetStorageRanges` (0x02) message syntax
## ------------------------------------------------ ## ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
## ##
## When calling a Geth peer with `GetStorageRanges`: ## As implementes here, the request message is encoded as
##
## `[reqID, rootHash, accountHashes, origin, limit, responseBytes]`
##
## It requests the storage slots of multiple accounts' storage tries. Since
## certain contracts have huge state, the method can also request storage
## slots from a single account, starting at a specific storage key hash.
## The intended purpose of this message is to fetch a large number of
## subsequent storage slots from a remote node and reconstruct a state
## subtrie locally.
##
## * `reqID`: Request ID to match up responses with
## * `rootHash`: 32 byte root hash of the account trie to serve
## * `accountHashes`: Array of 32 byte account hashes of the storage tries to serve
## * `origin`: Storage slot hash fragment of the first to retrieve (see below)
## * `limit`: Storage slot hash fragment after which to stop serving (see below)
## * `responseBytes`: 64 bit number soft limit at which to stop returning data
##
## Discussion of *Geth* `GetStorageRanges` behaviour
## ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
## ##
## - Parameters `origin` and `limit` may each be empty blobs, which mean "all ## - Parameters `origin` and `limit` may each be empty blobs, which mean "all
## zeros" (0x00000...) or "no limit" (0xfffff...) respectively. ## zeros" (0x00000...) or "no limit" (0xfffff...) respectively.
## ##
## (Blobs shorter than 32 bytes can also be given, and they are extended with ## (Blobs shorter than 32 bytes can also be given, and they are extended with
## zero bytes; longer than 32 bytes can be given and are truncated, but this ## zero bytes; longer than 32 bytes can be given and are truncated, but this
## is Geth being too accepting, and shouldn't be used.) ## is *Geth* being too accepting, and shouldn't be used.)
## ##
## - In the `slots` reply, the last account's storage list may be empty even if ## - In the `slots` reply, the last account's storage list may be empty even if
## that account has non-empty storage. ## that account has non-empty storage.
@ -80,12 +90,12 @@
## pipelining where different `stateRoot` hashes are used as time progresses. ## pipelining where different `stateRoot` hashes are used as time progresses.
## Workarounds: ## Workarounds:
## ##
## - Fetch the proof using a second `GetStorageRanges` query with non-zero ## * Fetch the proof using a second `GetStorageRanges` query with non-zero
## `origin` (perhaps equal to `limit`; use `origin = 1` if `limit == 0`). ## `origin` (perhaps equal to `limit`; use `origin = 1` if `limit == 0`).
## ##
## - Avoid the condition by using `origin >= 1` when using `limit`. ## * Avoid the condition by using `origin >= 1` when using `limit`.
## ##
## - Use trie node traversal (`snap` `GetTrieNodes` or `eth` `GetNodeData`) ## * Use trie node traversal (`snap` `GetTrieNodes` or `eth` `GetNodeData`)
## to obtain the omitted proof. ## to obtain the omitted proof.
## ##
## - When multiple accounts are requested with `origin > 0`, only one account's ## - When multiple accounts are requested with `origin > 0`, only one account's
@ -103,6 +113,30 @@
## treated `origin` as applying to only the first account and `limit` to only ## treated `origin` as applying to only the first account and `limit` to only
## the last account, but it doesn't.) ## the last account, but it doesn't.)
## ##
## Modified `GetAccountRange` (0x00) packet syntax
## ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
##
## As implementes here, the request message is encoded as
##
## `[reqID, rootHash, origin, limit, responseBytes]`
##
## It requests an unknown number of accounts from a given account trie, starting
## at the specified account hash and capped by the maximum allowed response
## size in bytes. The intended purpose of this message is to fetch a large
## number of subsequent accounts from a remote node and reconstruct a state
## subtrie locally.
##
## The `GetAccountRange` parameters `origin` and `limit` must be 32 byte
## blobs. There is no reason why empty limit is not allowed here when it is
## allowed for `GetStorageRanges`, it just isn't.
##
## * `reqID`: Request ID to match up responses with
## * `rootHash`: Root hash of the account trie to serve
## * `origin`: 32 byte storage slot hash of the first to retrieve
## * `limit`: 32 byte storage slot hash fragment after which to stop serving
## * `responseBytes`: 64 bit number soft limit at which to stop returning data
##
##
## Performance benefits ## Performance benefits
## -------------------- ## --------------------
## ##
@ -113,13 +147,13 @@
## It improves both network and local storage performance. The benefits are ## It improves both network and local storage performance. The benefits are
## substantial, and summarised here: ## substantial, and summarised here:
## ##
## - [Ethereum Snapshot Protocol (SNAP) - Expected results] ## - `Ethereum Snapshot Protocol (SNAP) - Expected results
## (https://github.com/ethereum/devp2p/blob/master/caps/snap.md) ## <https://github.com/ethereum/devp2p/blob/master/caps/snap.md>`_
## - [Geth v1.10.0 - Snap sync] ## - `Geth v1.10.0 - Snap sync
## (https://blog.ethereum.org/2021/03/03/geth-v1-10-0/#snap-sync) ## <https://blog.ethereum.org/2021/03/03/geth-v1-10-0/#snap-sync>`_
## ##
## In the Snap sync model, local storage benefits require clients to adopt a ## In the Snap sync model, local storage benefits require clients to adopt a
## different representation of Ethereum state than the trie storage that Geth ## different representation of Ethereum state than the trie storage that *Geth*
## (and most clients) traditionally used, and still do in archive mode, ## (and most clients) traditionally used, and still do in archive mode,
## ##
## However, Nimbus's sync method obtains similar local storage benefits ## However, Nimbus's sync method obtains similar local storage benefits
@ -157,7 +191,7 @@
## the size of a representation of partially completed trie traversal with ## the size of a representation of partially completed trie traversal with
## `eth` `GetNodeData`. Due to the smaller metadata, after aborting a partial ## `eth` `GetNodeData`. Due to the smaller metadata, after aborting a partial
## sync and restarting, it is possible to resume quickly, without waiting for ## sync and restarting, it is possible to resume quickly, without waiting for
## the very slow local database scan associated with older versions of Geth. ## the very slow local database scan associated with older versions of *Geth*.
## ##
## However, Nimbus's sync method uses this principle as inspiration to ## However, Nimbus's sync method uses this principle as inspiration to
## obtain similar metadata benefits whichever network protocol is used. ## obtain similar metadata benefits whichever network protocol is used.
@ -171,8 +205,9 @@ import
nimcrypto/hash, nimcrypto/hash,
stew/byteutils, stew/byteutils,
stint, stint,
../sync_types, ".."/[sync_types, trace_helper],
../../constants ../../constants,
./pickeled_snap_tracers
type type
SnapAccount* = object SnapAccount* = object
@ -198,7 +233,7 @@ const
# avoids transmitting these hashes in about 90% of accounts. We need to # avoids transmitting these hashes in about 90% of accounts. We need to
# recognise or set these hashes in `Account` when serialising RLP for `snap`. # recognise or set these hashes in `Account` when serialising RLP for `snap`.
proc read*(rlp: var Rlp, t: var SnapAccount, _: type Account): Account = proc read(rlp: var Rlp, t: var SnapAccount, _: type Account): Account =
## RLP decoding for `SnapAccount`, which contains a path and account. ## RLP decoding for `SnapAccount`, which contains a path and account.
## The snap representation of the account differs from `Account` RLP. ## The snap representation of the account differs from `Account` RLP.
## Empty storage hash and empty code hash are each represented by an ## Empty storage hash and empty code hash are each represented by an
@ -226,7 +261,7 @@ proc read*(rlp: var Rlp, t: var SnapAccount, _: type Account): Account =
rlp.skipElem() rlp.skipElem()
result.codeHash = EMPTY_SHA3 result.codeHash = EMPTY_SHA3
proc append*(rlpWriter: var RlpWriter, t: SnapAccount, account: Account) = proc append(rlpWriter: var RlpWriter, t: SnapAccount, account: Account) =
## RLP encoding for `SnapAccount`, which contains a path and account. ## RLP encoding for `SnapAccount`, which contains a path and account.
## The snap representation of the account differs from `Account` RLP. ## The snap representation of the account differs from `Account` RLP.
## Empty storage hash and empty code hash are each represented by an ## Empty storage hash and empty code hash are each represented by an
@ -246,39 +281,12 @@ proc append*(rlpWriter: var RlpWriter, t: SnapAccount, account: Account) =
# RLP serialisation for `LeafPath`. # RLP serialisation for `LeafPath`.
template read*(rlp: var Rlp, _: type LeafPath): LeafPath = template read(rlp: var Rlp, _: type LeafPath): LeafPath =
rlp.read(array[sizeof(LeafPath().toBytes), byte]).toLeafPath rlp.read(array[sizeof(LeafPath().toBytes), byte]).toLeafPath
template append*(rlpWriter: var RlpWriter, leafPath: LeafPath) = template append(rlpWriter: var RlpWriter, leafPath: LeafPath) =
rlpWriter.append(leafPath.toBytes) rlpWriter.append(leafPath.toBytes)
# Maybe cruft following?
# # TODO: Don't know why, but the `p2pProtocol` can't handle this type. It
# # tries to serialise the `Option` as an object, looking at the internal
# # fields. But then fails because they are private fields.
# #
# ## RLP serialisation for `Option[SnapPath]`.
# #
# #proc read*(rlp: var Rlp, _: type Option[SnapPath]): Option[SnapPath] =
# # if rlp.blobLen == 0 and rlp.isBlob:
# # result = none(SnapPath)
# # else:
# # result = some(read(rlp, SnapPath))
#
# #proc write*(rlpWriter: var RlpWriter, value: Option[SnapPath]) =
# # if value.isNone:
# # rlpWriter.append("")
# # else:
# # rlpWriter.append(value.unsafeGet)
# Shortcuts, print the protocol type as well (might be removed in future)
const protoInfo = prettySnapProtoName
template traceReceived(msg: static[string], args: varargs[untyped]) =
tracePacket "<< " & protoInfo & " Received " & msg, `args`
template traceReplying(msg: static[string], args: varargs[untyped]) =
tracePacket ">> " & protoInfo & " Replying " & msg, `args`
p2pProtocol snap1(version = 1, p2pProtocol snap1(version = 1,
rlpxName = "snap", rlpxName = "snap",
@ -291,11 +299,11 @@ p2pProtocol snap1(version = 1,
# Next line differs from spec to match Geth. # Next line differs from spec to match Geth.
origin: LeafPath, limit: LeafPath, origin: LeafPath, limit: LeafPath,
responseBytes: uint64) = responseBytes: uint64) =
traceReceived "snap.GetAccountRange (0x00)", peer, traceReceived "GetAccountRange (0x00)", peer,
accountRange=pathRange(origin, limit), accountRange=pathRange(origin, limit),
stateRoot=($rootHash), responseBytes stateRoot=($rootHash), responseBytes
traceReplying "EMPTY snap.AccountRange (0x01)", peer, sent=0 traceReplying "EMPTY AccountRange (0x01)", peer, sent=0
await response.send(@[], @[]) await response.send(@[], @[])
# User message 0x01: AccountRange. # User message 0x01: AccountRange.
@ -331,12 +339,12 @@ p2pProtocol snap1(version = 1,
if definiteFullRange: if definiteFullRange:
# Fetching storage for multiple accounts. # Fetching storage for multiple accounts.
traceReceived "snap.GetStorageRanges/A (0x02)", peer, traceReceived "GetStorageRanges/A (0x02)", peer,
accountPaths=accounts.len, accountPaths=accounts.len,
stateRoot=($rootHash), responseBytes stateRoot=($rootHash), responseBytes
elif accounts.len == 1: elif accounts.len == 1:
# Fetching partial storage for one account, aka. "large contract". # Fetching partial storage for one account, aka. "large contract".
traceReceived "snap.GetStorageRanges/S (0x02)", peer, traceReceived "GetStorageRanges/S (0x02)", peer,
accountPaths=1, accountPaths=1,
storageRange=(describe(origin) & '-' & describe(limit)), storageRange=(describe(origin) & '-' & describe(limit)),
stateRoot=($rootHash), responseBytes stateRoot=($rootHash), responseBytes
@ -344,12 +352,12 @@ p2pProtocol snap1(version = 1,
# This branch is separated because these shouldn't occur. It's not # This branch is separated because these shouldn't occur. It's not
# really specified what happens when there are multiple accounts and # really specified what happens when there are multiple accounts and
# non-default path range. # non-default path range.
traceReceived "snap.GetStorageRanges/AS?? (0x02)", peer, traceReceived "GetStorageRanges/AS?? (0x02)", peer,
accountPaths=accounts.len, accountPaths=accounts.len,
storageRange=(describe(origin) & '-' & describe(limit)), storageRange=(describe(origin) & '-' & describe(limit)),
stateRoot=($rootHash), responseBytes stateRoot=($rootHash), responseBytes
traceReplying "EMPTY snap.StorageRanges (0x03)", peer, sent=0 traceReplying "EMPTY StorageRanges (0x03)", peer, sent=0
await response.send(@[], @[]) await response.send(@[], @[])
# User message 0x03: StorageRanges. # User message 0x03: StorageRanges.
@ -361,10 +369,10 @@ p2pProtocol snap1(version = 1,
requestResponse: requestResponse:
proc getByteCodes(peer: Peer, hashes: openArray[NodeHash], proc getByteCodes(peer: Peer, hashes: openArray[NodeHash],
responseBytes: uint64) = responseBytes: uint64) =
traceReceived "snap.GetByteCodes (0x04)", peer, traceReceived "GetByteCodes (0x04)", peer,
hashes=hashes.len, responseBytes hashes=hashes.len, responseBytes
traceReplying "EMPTY snap.ByteCodes (0x05)", peer, sent=0 traceReplying "EMPTY ByteCodes (0x05)", peer, sent=0
await response.send(@[]) await response.send(@[])
# User message 0x05: ByteCodes. # User message 0x05: ByteCodes.
@ -374,10 +382,10 @@ p2pProtocol snap1(version = 1,
requestResponse: requestResponse:
proc getTrieNodes(peer: Peer, rootHash: TrieHash, proc getTrieNodes(peer: Peer, rootHash: TrieHash,
paths: openArray[InteriorPath], responseBytes: uint64) = paths: openArray[InteriorPath], responseBytes: uint64) =
traceReceived "snap.GetTrieNodes (0x06)", peer, traceReceived "GetTrieNodes (0x06)", peer,
nodePaths=paths.len, stateRoot=($rootHash), responseBytes nodePaths=paths.len, stateRoot=($rootHash), responseBytes
traceReplying "EMPTY snap.TrieNodes (0x07)", peer, sent=0 traceReplying "EMPTY TrieNodes (0x07)", peer, sent=0
await response.send(@[]) await response.send(@[])
# User message 0x07: TrieNodes. # User message 0x07: TrieNodes.

View File

@ -1,12 +0,0 @@
# Nimbus
# Copyright (c) 2018-2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at
# https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at
# https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
import ./protocol/snap1
export snap1

View File

@ -9,15 +9,24 @@
# at your option. This file may not be copied, modified, or distributed # at your option. This file may not be copied, modified, or distributed
# except according to those terms. # except according to those terms.
{.push raises: [Defect].}
import import
chronicles, chronicles,
chronos, chronos,
eth/[common/eth_types, p2p, rlp], eth/[common/eth_types, p2p, rlp],
eth/p2p/[rlpx, peer_pool, private/p2p_types], eth/p2p/[rlpx, peer_pool, private/p2p_types],
stint, stint,
"."/[chain_head_tracker, protocol_ethxx, protocol/get_nodedata, sync_types] "."/[protocol, sync_types],
./snap/[chain_head_tracker, get_nodedata]
{.push raises: [Defect].}
type
SnapSyncCtx* = ref object of SnapSync
peerPool: PeerPool
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
proc syncPeerLoop(sp: SyncPeer) {.async.} = proc syncPeerLoop(sp: SyncPeer) {.async.} =
# This basic loop just runs the head-hunter for each peer. # This basic loop just runs the head-hunter for each peer.
@ -36,7 +45,7 @@ proc syncPeerStop(sp: SyncPeer) =
# TODO: Cancel SyncPeers that are running. We need clean cancellation for # TODO: Cancel SyncPeers that are running. We need clean cancellation for
# this. Doing so reliably will be addressed at a later time. # this. Doing so reliably will be addressed at a later time.
proc onPeerConnected(ns: NewSync, protocolPeer: Peer) = proc onPeerConnected(ns: SnapSyncCtx, protocolPeer: Peer) =
let sp = SyncPeer( let sp = SyncPeer(
ns: ns, ns: ns,
peer: protocolPeer, peer: protocolPeer,
@ -46,8 +55,7 @@ proc onPeerConnected(ns: NewSync, protocolPeer: Peer) =
huntLow: 0.toBlockNumber, huntLow: 0.toBlockNumber,
huntHigh: high(BlockNumber), huntHigh: high(BlockNumber),
huntStep: 0, huntStep: 0,
bestBlockNumber: 0.toBlockNumber bestBlockNumber: 0.toBlockNumber)
)
trace "Sync: Peer connected", peer=sp trace "Sync: Peer connected", peer=sp
sp.setupGetNodeData() sp.setupGetNodeData()
@ -63,7 +71,7 @@ proc onPeerConnected(ns: NewSync, protocolPeer: Peer) =
ns.syncPeers.add(sp) ns.syncPeers.add(sp)
sp.syncPeerStart() sp.syncPeerStart()
proc onPeerDisconnected(ns: NewSync, protocolPeer: Peer) = proc onPeerDisconnected(ns: SnapSyncCtx, protocolPeer: Peer) =
trace "Sync: Peer disconnected", peer=protocolPeer trace "Sync: Peer disconnected", peer=protocolPeer
# Find matching `sp` and remove from `ns.syncPeers`. # Find matching `sp` and remove from `ns.syncPeers`.
var sp: SyncPeer = nil var sp: SyncPeer = nil
@ -78,22 +86,27 @@ proc onPeerDisconnected(ns: NewSync, protocolPeer: Peer) =
sp.syncPeerStop() sp.syncPeerStop()
proc newSyncEarly*(ethNode: EthereumNode) = # ------------------------------------------------------------------------------
info "** Using --new-sync experimental new sync algorithms" # Public functions
info "** Note that fetched data is not currently stored" # ------------------------------------------------------------------------------
info "** It's used for timing, behaviour and interop tests"
let ns = NewSync() proc new*(T: type SnapSyncCtx; ethNode: EthereumNode): T =
## Constructor
new result
result.peerPool = ethNode.peerPool
proc start*(ctx: SnapSyncCtx) =
## Set up syncing. This call should come early.
var po = PeerObserver( var po = PeerObserver(
onPeerConnected: onPeerConnected:
proc(protocolPeer: Peer) {.gcsafe.} = proc(p: Peer) {.gcsafe.} =
ns.onPeerConnected(protocolPeer), ctx.onPeerConnected(p),
onPeerDisconnected: onPeerDisconnected:
proc(protocolPeer: Peer) {.gcsafe.} = proc(p: Peer) {.gcsafe.} =
ns.onPeerDisconnected(protocolPeer) ctx.onPeerDisconnected(p))
) po.setProtocol eth
po.setProtocol(eth) ctx.peerPool.addObserver(ctx, po)
ethNode.peerPool.addObserver(ns, po)
proc newSync*() = # ------------------------------------------------------------------------------
discard # End
# ------------------------------------------------------------------------------

View File

@ -2,9 +2,12 @@
# #
# Copyright (c) 2021 Status Research & Development GmbH # Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of # Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0) # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT) # http://www.apache.org/licenses/LICENSE-2.0)
# at your option. This file may not be copied, modified, or distributed except according to those terms. # * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
## This module fetches and tracks the canonical chain head of each connected ## This module fetches and tracks the canonical chain head of each connected
## peer. (Or in future, each peer we care about; we won't poll them all so ## peer. (Or in future, each peer we care about; we won't poll them all so
@ -62,8 +65,9 @@ import
chronos, stint, chronicles, stew/byteutils, chronos, stint, chronicles, stew/byteutils,
eth/[common/eth_types, rlp, p2p], eth/[common/eth_types, rlp, p2p],
eth/p2p/[rlpx, private/p2p_types], eth/p2p/[rlpx, private/p2p_types],
../p2p/chain/chain_desc, ../../p2p/chain/chain_desc,
"."/[sync_types, protocol_ethxx, pie/slicer] ".."/[protocol, protocol/pickeled_eth_tracers, sync_types, trace_helper],
./pie/slicer
const const
syncLockedMinimumReply = 8 syncLockedMinimumReply = 8
@ -107,6 +111,7 @@ doAssert syncHuntQuerySize >= 1 and syncHuntQuerySize <= maxHeadersFetch
doAssert syncHuntForwardExpandShift >= 1 and syncHuntForwardExpandShift <= 8 doAssert syncHuntForwardExpandShift >= 1 and syncHuntForwardExpandShift <= 8
doAssert syncHuntBackwardExpandShift >= 1 and syncHuntBackwardExpandShift <= 8 doAssert syncHuntBackwardExpandShift >= 1 and syncHuntBackwardExpandShift <= 8
proc clearSyncStateRoot(sp: SyncPeer) = proc clearSyncStateRoot(sp: SyncPeer) =
if sp.syncStateRoot.isSome: if sp.syncStateRoot.isSome:
debug "Sync: Stopping state sync from this peer", peer=sp debug "Sync: Stopping state sync from this peer", peer=sp
@ -486,17 +491,17 @@ proc peerHuntCanonical*(sp: SyncPeer) {.async.} =
if tracePackets: if tracePackets:
if request.maxResults == 1 and request.startBlock.isHash: if request.maxResults == 1 and request.startBlock.isHash:
tracePacket ">> Sending eth.GetBlockHeaders/Hash (0x03)", peer=sp, traceSending "GetBlockHeaders/Hash", peer=sp,
blockHash=($request.startBlock.hash), count=1 blockHash=($request.startBlock.hash), count=1
elif request.maxResults == 1: elif request.maxResults == 1:
tracePacket ">> Sending eth.GetBlockHeaders (0x03)", peer=sp, traceSending "GetBlockHeaders", peer=sp,
`block`=request.startBlock, count=1 `block`=request.startBlock, count=1
elif request.startBlock.isHash: elif request.startBlock.isHash:
tracePacket ">> Sending eth.GetBlockHeaders/Hash (0x03)", peer=sp, traceSending "GetBlockHeaders/Hash", peer=sp,
firstBlockHash=request.startBlock, count=request.maxResults, firstBlockHash=request.startBlock, count=request.maxResults,
step=traceStep(request) step=traceStep(request)
else: else:
tracePacket ">> Sending eth.GetBlockHeaders (0x03)", peer=sp, traceSending "GetBlockHeaders", peer=sp,
firstBlock=request.startBlock, count=request.maxResults, firstBlock=request.startBlock, count=request.maxResults,
step=traceStep(request) step=traceStep(request)
@ -505,14 +510,14 @@ proc peerHuntCanonical*(sp: SyncPeer) {.async.} =
try: try:
reply = await sp.peer.getBlockHeaders(request) reply = await sp.peer.getBlockHeaders(request)
except CatchableError as e: except CatchableError as e:
traceNetworkError "<< Error waiting for reply to eth.GetBlockHeaders (0x03)", traceRecvError "waiting for reply to GetBlockHeaders",
peer=sp, error=e.msg peer=sp, error=e.msg
inc sp.stats.major.networkErrors inc sp.stats.major.networkErrors
sp.stopped = true sp.stopped = true
return return
if reply.isNone: if reply.isNone:
traceTimeout "<< Timeout waiting for reply to eth.GetBlockHeaders (0x03)", traceTimeoutWaiting "for reply to GetBlockHeaders",
peer=sp peer=sp
# TODO: Should disconnect? # TODO: Should disconnect?
inc sp.stats.minor.timeoutBlockHeaders inc sp.stats.minor.timeoutBlockHeaders
@ -521,18 +526,18 @@ proc peerHuntCanonical*(sp: SyncPeer) {.async.} =
let len = reply.get.headers.len let len = reply.get.headers.len
if tracePackets: if tracePackets:
if len == 0: if len == 0:
tracePacket "<< Got EMPTY reply eth.BlockHeaders (0x04)", peer=sp, traceGot "EMPTY reply BlockHeaders", peer=sp,
got=0, requested=request.maxResults got=0, requested=request.maxResults
else: else:
let firstBlock = reply.get.headers[0].blockNumber let firstBlock = reply.get.headers[0].blockNumber
let lastBlock = reply.get.headers[len - 1].blockNumber let lastBlock = reply.get.headers[len - 1].blockNumber
tracePacket "<< Got reply eth.BlockHeaders (0x04)", peer=sp, traceGot "reply BlockHeaders", peer=sp,
got=len, requested=request.maxResults, firstBlock, lastBlock got=len, requested=request.maxResults, firstBlock, lastBlock
sp.pendingGetBlockHeaders = false sp.pendingGetBlockHeaders = false
if len > request.maxResults.int: if len > request.maxResults.int:
tracePacketError "<< Protocol violation, excess headers in eth.BlockHeaders (0x04)", traceProtocolViolation "excess headers in BlockHeaders",
peer=sp, got=len, requested=request.maxResults peer=sp, got=len, requested=request.maxResults
# TODO: Should disconnect. # TODO: Should disconnect.
inc sp.stats.major.excessBlockHeaders inc sp.stats.major.excessBlockHeaders

View File

@ -47,9 +47,12 @@
## ##
## References: ## References:
## ##
## - [Ethereum Wire Protocol (ETH)](https://github.com/ethereum/devp2p/blob/master/caps/eth.md) ## - `Ethereum Wire Protocol (ETH)
## - [`GetNodeData` (0x0d)](https://github.com/ethereum/devp2p/blob/master/caps/eth.md#getnodedata-0x0d) ## <https://github.com/ethereum/devp2p/blob/master/caps/eth.md>`_
## - [`NodeData` (0x0e)](https://github.com/ethereum/devp2p/blob/master/caps/eth.md#nodedata-0x0e) ## - `GetNodeData (0x0d)
## <https://github.com/ethereum/devp2p/blob/master/caps/eth.md#getnodedata-0x0d>`_
## - `NodeData (0x0e)
## <https://github.com/ethereum/devp2p/blob/master/caps/eth.md#nodedata-0x0e>`_
## ##
## Note: ## Note:
## ##
@ -62,10 +65,10 @@
import import
std/[sequtils, sets, tables, hashes], std/[sequtils, sets, tables, hashes],
chronos, chronos,
eth/[common/eth_types, rlp, p2p],
nimcrypto/keccak,
stint, stint,
".."/[sync_types, protocol_ethxx] nimcrypto/keccak,
eth/[common/eth_types, rlp, p2p],
".."/[protocol, protocol/pickeled_eth_tracers, sync_types]
type type
NodeDataRequestQueue* = ref object of typeof SyncPeer().nodeDataRequestsBase NodeDataRequestQueue* = ref object of typeof SyncPeer().nodeDataRequestsBase
@ -93,27 +96,6 @@ type
reverseMap: seq[int] # Access with `reversMap(i)` instead. reverseMap: seq[int] # Access with `reversMap(i)` instead.
hashVerifiedData*: seq[Blob] hashVerifiedData*: seq[Blob]
# Shortcuts, print the protocol type as well (might be removed in future)
const protoInfo = prettyEthProtoName
template traceReceived(msg: static[string], args: varargs[untyped]) =
tracePacket "<< " & protoInfo & " Received " & msg, `args`
template traceWaitTimeout(msg: static[string], args: varargs[untyped]) =
traceTimeout "<< " & protoInfo & " Timeout waiting " & msg, `args`
template traceWaitError(msg: static[string], args: varargs[untyped]) =
traceNetworkError "<< " & protoInfo & " Error waiting " & msg, `args`
template traceProtoError(msg: static[string], args: varargs[untyped]) =
tracePacketError "<< " & protoInfo & " Protocol violation, " & msg, `args`
template traceDisconError(msg: static[string], args: varargs[untyped]) =
traceNetworkError "<< " & protoInfo & " Peer disconnected, " & msg, `args`
template traceSending(msg: static[string], args: varargs[untyped]) =
tracePacket ">> " & protoInfo & " Sending " & msg, `args`
template traceSendError(msg: static[string], args: varargs[untyped]) =
traceNetworkError ">> " & protoInfo & " Error sending " & msg, `args`
template traceDelaying(msg: static[string], args: varargs[untyped]) =
tracePacket ">> " & protoInfo & " Dlaying " & msg, `args`
template reverseMap*(reply: NodeDataReply, index: int): int = template reverseMap*(reply: NodeDataReply, index: int): int =
## Given an index into the request hash list, return index into the reply ## Given an index into the request hash list, return index into the reply
## `hashVerifiedData`, or -1 if there is no data for that request hash. ## `hashVerifiedData`, or -1 if there is no data for that request hash.
@ -133,46 +115,47 @@ template `$`*(paths: (InteriorPath, InteriorPath)): string =
pathRange(paths[0], paths[1]) pathRange(paths[0], paths[1])
proc traceGetNodeDataSending(request: NodeDataRequest) = proc traceGetNodeDataSending(request: NodeDataRequest) =
traceSending "eth.GetNodeData (0x0d)", peer=request.sp, traceSending "GetNodeData", peer=request.sp,
hashes=request.hashes.len, pathRange=request.pathRange hashes=request.hashes.len, pathRange=request.pathRange
proc traceGetNodeDataDelaying(request: NodeDataRequest) = proc traceGetNodeDataDelaying(request: NodeDataRequest) =
traceDelaying "eth.GetNodeData (0x0d)", peer=request.sp, traceDelaying "GetNodeData", peer=request.sp,
hashes=request.hashes.len, pathRange=request.pathRange hashes=request.hashes.len, pathRange=request.pathRange
proc traceGetNodeDataSendError(request: NodeDataRequest, proc traceGetNodeDataSendError(request: NodeDataRequest,
e: ref CatchableError) = e: ref CatchableError) =
traceSendError "eth.GetNodeData (0x0d)", traceRecvError "sending GetNodeData",
peer=request.sp, error=e.msg, peer=request.sp, error=e.msg,
hashes=request.hashes.len, pathRange=request.pathRange hashes=request.hashes.len, pathRange=request.pathRange
proc traceNodeDataReplyError(request: NodeDataRequest, proc traceNodeDataReplyError(request: NodeDataRequest,
e: ref CatchableError) = e: ref CatchableError) =
traceWaitError "for reply to eth.GetNodeData (0x0d)", traceRecvError "waiting for reply to GetNodeData",
peer=request.sp, error=e.msg, peer=request.sp, error=e.msg,
hashes=request.hashes.len, pathRange=request.pathRange hashes=request.hashes.len, pathRange=request.pathRange
proc traceNodeDataReplyTimeout(request: NodeDataRequest) = proc traceNodeDataReplyTimeout(request: NodeDataRequest) =
traceWaitTimeout "for reply to eth.GetNodeData (0x0d)", traceTimeoutWaiting "for reply to GetNodeData",
hashes=request.hashes.len, pathRange=request.pathRange, peer=request.sp hashes=request.hashes.len, pathRange=request.pathRange, peer=request.sp
proc traceGetNodeDataDisconnected(request: NodeDataRequest) = proc traceGetNodeDataDisconnected(request: NodeDataRequest) =
traceDisconError "not sending eth.GetNodeData (0x0d)", traceRecvError "peer disconnected, not sending GetNodeData",
peer=request.sp, hashes=request.hashes.len, pathRange=request.pathRange peer=request.sp, hashes=request.hashes.len, pathRange=request.pathRange
proc traceNodeDataReplyEmpty(sp: SyncPeer, request: NodeDataRequest) = proc traceNodeDataReplyEmpty(sp: SyncPeer, request: NodeDataRequest) =
# `request` can be `nil` because we don't always know which request # `request` can be `nil` because we don't always know which request
# the empty reply goes with. Therefore `sp` must be included. # the empty reply goes with. Therefore `sp` must be included.
if request.isNil: if request.isNil:
traceReceived "EMPTY eth.NodeData (0x0e)", peer=sp, got=0 traceGot "EMPTY NodeData", peer=sp,
got=0
else: else:
traceReceived "eth.NodeData (0x0e)", peer=sp, got=0, traceGot "NodeData", peer=sp,
requested=request.hashes.len, pathRange=request.pathRange got=0, requested=request.hashes.len, pathRange=request.pathRange
proc traceNodeDataReplyUnmatched(sp: SyncPeer, got: int) = proc traceNodeDataReplyUnmatched(sp: SyncPeer, got: int) =
# There is no request for this reply. Therefore `sp` must be included. # There is no request for this reply. Therefore `sp` must be included.
traceProtoError "non-reply eth.NodeData (0x0e)", peer=sp, got traceProtocolViolation "non-reply NodeData", peer=sp, got
debug "Sync: Warning: Unexpected non-reply eth.NodeData from peer" debug "Sync: Warning: Unexpected non-reply NodeData from peer"
proc traceNodeDataReply(request: NodeDataRequest, proc traceNodeDataReply(request: NodeDataRequest,
got, use, unmatched, other, duplicates: int) = got, use, unmatched, other, duplicates: int) =
@ -182,11 +165,11 @@ proc traceNodeDataReply(request: NodeDataRequest,
logScope: pathRange=request.pathRange logScope: pathRange=request.pathRange
logScope: peer=request.sp logScope: peer=request.sp
if got > request.hashes.len and (unmatched + other) == 0: if got > request.hashes.len and (unmatched + other) == 0:
traceReceived "EXCESS reply eth.NodeData (0x0e)" traceGot "EXCESS reply NodeData"
elif got == request.hashes.len or use != got: elif got == request.hashes.len or use != got:
traceReceived "reply eth.NodeData (0x0e)" traceGot "reply NodeData"
elif got < request.hashes.len: elif got < request.hashes.len:
traceReceived "TRUNCATED reply eth.NodeData (0x0e)" traceGot "TRUNCATED reply NodeData"
if use != got: if use != got:
logScope: logScope:
@ -197,18 +180,18 @@ proc traceNodeDataReply(request: NodeDataRequest,
pathRange=request.pathRange pathRange=request.pathRange
peer=request.sp peer=request.sp
if unmatched > 0: if unmatched > 0:
traceProtoError "incorrect hashes in eth.NodeData (0x0e)" traceProtocolViolation "incorrect hashes in NodeData"
debug "Sync: Warning: eth.NodeData has nodes with incorrect hashes" debug "Sync: Warning: NodeData has nodes with incorrect hashes"
elif other > 0: elif other > 0:
traceProtoError "mixed request nodes in eth.NodeData (0x0e)" traceProtocolViolation "mixed request nodes in NodeData"
debug "Sync: Warning: eth.NodeData has nodes from mixed requests" debug "Sync: Warning: NodeData has nodes from mixed requests"
elif got > request.hashes.len: elif got > request.hashes.len:
# Excess without unmatched/other is only possible with duplicates > 0. # Excess without unmatched/other is only possible with duplicates > 0.
traceProtoError "excess nodes in eth.NodeData (0x0e)" traceProtocolViolation "excess nodes in NodeData"
debug "Sync: Warning: eth.NodeData has more nodes than requested" debug "Sync: Warning: NodeData has more nodes than requested"
else: else:
traceProtoError "duplicate nodes in eth.NodeData (0x0e)" traceProtocolViolation "duplicate nodes in NodeData"
debug "Sync: Warning: eth.NodeData has duplicate nodes" debug "Sync: Warning: NodeData has duplicate nodes"
proc hash(hash: ptr Hash256): Hash = cast[ptr Hash](addr hash.data)[] proc hash(hash: ptr Hash256): Hash = cast[ptr Hash](addr hash.data)[]
proc `==`(hash1, hash2: ptr Hash256): bool = hash1[] == hash2[] proc `==`(hash1, hash2: ptr Hash256): bool = hash1[] == hash2[]
@ -349,7 +332,7 @@ proc nodeDataComplete(request: NodeDataRequest, reply: NodeDataReply,
# Subtle: Timer can trigger and its callback be added to Chronos run loop, # Subtle: Timer can trigger and its callback be added to Chronos run loop,
# then data event trigger and call `clearTimer()`. The timer callback # then data event trigger and call `clearTimer()`. The timer callback
# will then run but it must be ignored. # will then run but it must be ignored.
debug "Sync: Warning: Resolved timer race over eth.NodeData reply" debug "Sync: Warning: Resolved timer race over NodeData reply"
else: else:
request.timer.clearTimer() request.timer.clearTimer()
request.future.complete(reply) request.future.complete(reply)

View File

@ -9,11 +9,11 @@
{.push raises: [Defect].} {.push raises: [Defect].}
import import
std/[sets, tables, sequtils, strutils], std/[sets, sequtils, strutils],
chronos, chronos,
eth/[common/eth_types, rlp, p2p], eth/[common/eth_types, rlp, p2p],
stint, stint,
../sync_types ../../sync_types
type type
LeafRange* = object LeafRange* = object

View File

@ -22,13 +22,12 @@
{.push raises: [Defect].} {.push raises: [Defect].}
import import
std/[sets, tables], std/sets,
chronos, chronos,
eth/[common/eth_types, rlp, p2p], eth/[common/eth_types, rlp, p2p],
nimcrypto/keccak, nimcrypto/keccak,
stint, stint,
../sync_types, "../.."/[protocol, protocol/pickeled_snap_tracers, sync_types, trace_helper],
../protocol_snapxx,
./common ./common
const const
@ -42,13 +41,13 @@ proc snapFetch*(sp: SyncPeer, stateRoot: TrieHash,
const responseBytes = 2 * 1024 * 1024 const responseBytes = 2 * 1024 * 1024
if sp.stopped: if sp.stopped:
traceNetworkError "<< Peer already disconnected, not sending snap.GetAccountRange (0x00)", traceRecvError "peer already disconnected, not sending GetAccountRange",
peer=sp, accountRange=pathRange(origin, limit), peer=sp, accountRange=pathRange(origin, limit),
stateRoot=($stateRoot), bytesLimit=snapRequestBytesLimit stateRoot=($stateRoot), bytesLimit=snapRequestBytesLimit
sp.putSlice(leafRange) sp.putSlice(leafRange)
if tracePackets: if tracePackets:
tracePacket ">> Sending snap.GetAccountRange (0x00)", traceSending "GetAccountRange",
accountRange=pathRange(origin, limit), accountRange=pathRange(origin, limit),
stateRoot=($stateRoot), bytesLimit=snapRequestBytesLimit, peer=sp stateRoot=($stateRoot), bytesLimit=snapRequestBytesLimit, peer=sp
@ -58,7 +57,7 @@ proc snapFetch*(sp: SyncPeer, stateRoot: TrieHash,
reply = await sp.peer.getAccountRange(stateRoot, origin, limit, reply = await sp.peer.getAccountRange(stateRoot, origin, limit,
snapRequestBytesLimit) snapRequestBytesLimit)
except CatchableError as e: except CatchableError as e:
traceNetworkError "<< Error waiting for reply to snap.GetAccountRange (0x00)", traceRecvError "waiting for reply to GetAccountRange",
peer=sp, error=e.msg peer=sp, error=e.msg
inc sp.stats.major.networkErrors inc sp.stats.major.networkErrors
sp.stopped = true sp.stopped = true
@ -66,7 +65,7 @@ proc snapFetch*(sp: SyncPeer, stateRoot: TrieHash,
return return
if reply.isNone: if reply.isNone:
traceTimeout "<< Timeout waiting for reply to snap.GetAccountRange (0x00)", traceTimeoutWaiting "for reply to GetAccountRange",
peer=sp peer=sp
sp.putSlice(leafRange) sp.putSlice(leafRange)
return return
@ -89,14 +88,14 @@ proc snapFetch*(sp: SyncPeer, stateRoot: TrieHash,
# This makes all the difference to terminating the fetch. For now we'll # This makes all the difference to terminating the fetch. For now we'll
# trust the mere existence of the proof rather than verifying it. # trust the mere existence of the proof rather than verifying it.
if proof.len == 0: if proof.len == 0:
tracePacket "<< Got EMPTY reply snap.AccountRange (0x01)", peer=sp, traceGot "EMPTY reply AccountRange", peer=sp,
got=len, proofLen=proof.len, gotRange="-", got=len, proofLen=proof.len, gotRange="-",
requestedRange=pathRange(origin, limit), stateRoot=($stateRoot) requestedRange=pathRange(origin, limit), stateRoot=($stateRoot)
sp.putSlice(leafRange) sp.putSlice(leafRange)
# Don't keep retrying snap for this state. # Don't keep retrying snap for this state.
sp.stopThisState = true sp.stopThisState = true
else: else:
tracePacket "<< Got END reply snap.AccountRange (0x01)", peer=sp, traceGot "END reply AccountRange", peer=sp,
got=len, proofLen=proof.len, gotRange=pathRange(origin, high(LeafPath)), got=len, proofLen=proof.len, gotRange=pathRange(origin, high(LeafPath)),
requestedRange=pathRange(origin, limit), stateRoot=($stateRoot) requestedRange=pathRange(origin, limit), stateRoot=($stateRoot)
# Current slicer can't accept more result data than was requested, so # Current slicer can't accept more result data than was requested, so
@ -105,14 +104,14 @@ proc snapFetch*(sp: SyncPeer, stateRoot: TrieHash,
return return
var lastPath = accounts[len-1].accHash var lastPath = accounts[len-1].accHash
tracePacket "<< Got reply snap.AccountRange (0x01)", peer=sp, traceGot "reply AccountRange", peer=sp,
got=len, proofLen=proof.len, gotRange=pathRange(origin, lastPath), got=len, proofLen=proof.len, gotRange=pathRange(origin, lastPath),
requestedRange=pathRange(origin, limit), stateRoot=($stateRoot) requestedRange=pathRange(origin, limit), stateRoot=($stateRoot)
# Missing proof isn't allowed, unless `origin` is min path in which case # Missing proof isn't allowed, unless `origin` is min path in which case
# there might be no proof if the result spans the entire range. # there might be no proof if the result spans the entire range.
if proof.len == 0 and origin != low(LeafPath): if proof.len == 0 and origin != low(LeafPath):
tracePacketError "<< Protocol violation, missing proof in snap.AccountRange (0x01)", traceProtocolViolation "missing proof in AccountRange",
peer=sp, got=len, proofLen=proof.len, gotRange=pathRange(origin,lastPath), peer=sp, got=len, proofLen=proof.len, gotRange=pathRange(origin,lastPath),
requestedRange=pathRange(origin, limit), stateRoot=($stateRoot) requestedRange=pathRange(origin, limit), stateRoot=($stateRoot)
sp.putSlice(leafRange) sp.putSlice(leafRange)
@ -135,4 +134,4 @@ proc snapFetch*(sp: SyncPeer, stateRoot: TrieHash,
sp.countAccounts(keepAccounts) sp.countAccounts(keepAccounts)
proc peerSupportsSnap*(sp: SyncPeer): bool = proc peerSupportsSnap*(sp: SyncPeer): bool =
not sp.stopped and sp.peer.supports(snap1) not sp.stopped and sp.peer.supports(snap)

View File

@ -29,7 +29,8 @@ import
chronos, chronos,
eth/[common/eth_types, rlp, p2p], eth/[common/eth_types, rlp, p2p],
stint, stint,
".."/[protocol/get_nodedata, sync_types, validate_trienode], "../.."/[sync_types, trace_helper],
".."/[get_nodedata, validate_trienode],
./common ./common
type type

View File

@ -12,12 +12,12 @@
{.push raises: [Defect].} {.push raises: [Defect].}
import import
std/[sets, tables, random], std/[sets, random],
chronos, chronos,
nimcrypto/keccak, nimcrypto/keccak,
stint, stint,
eth/[common/eth_types, p2p, rlp], eth/[common/eth_types, p2p, rlp],
../sync_types, ../../sync_types,
"."/[common, fetch_trie, fetch_snap] "."/[common, fetch_trie, fetch_snap]
# Note: To test disabling snap (or trie), modify `peerSupportsGetNodeData` or # Note: To test disabling snap (or trie), modify `peerSupportsGetNodeData` or

View File

@ -31,7 +31,7 @@
import import
eth/[common/eth_types, rlp, p2p], eth/[common/eth_types, rlp, p2p],
"."/[sync_types] ".."/[sync_types, trace_helper]
type type
TrieNodeParseContext* = object TrieNodeParseContext* = object

View File

@ -17,42 +17,15 @@ import
stint, stew/byteutils, chronicles, chronos, stint, stew/byteutils, chronicles, chronos,
eth/[common/eth_types, p2p] eth/[common/eth_types, p2p]
const
tracePackets* = true
## Whether to `trace` log each sync network message.
traceGossips* = false
## Whether to `trace` log each gossip network message.
traceHandshakes* = true
## Whether to `trace` log each network handshake message.
traceTimeouts* = true
## Whether to `trace` log each network request timeout.
traceNetworkErrors* = true
## Whether to `trace` log each network request error.
tracePacketErrors* = true
## Whether to `trace` log each messages with invalid data.
traceIndividualNodes* = false
## Whether to `trace` log each trie node, account, storage, receipt, etc.
template tracePacket*(msg: static[string], args: varargs[untyped]) =
if tracePackets: trace `msg`, `args`
template traceGossip*(msg: static[string], args: varargs[untyped]) =
if traceGossips: trace `msg`, `args`
template traceTimeout*(msg: static[string], args: varargs[untyped]) =
if traceTimeouts: trace `msg`, `args`
template traceNetworkError*(msg: static[string], args: varargs[untyped]) =
if traceNetworkErrors: trace `msg`, `args`
template tracePacketError*(msg: static[string], args: varargs[untyped]) =
if tracePacketErrors: trace `msg`, `args`
type type
NewSync* = ref object SnapSync* = ref object of RootObj
## Shared state among all peers of a syncing node. ## Shared state among all peers of a syncing node.
syncPeers*: seq[SyncPeer] syncPeers*: seq[SyncPeer]
sharedFetch: SharedFetchState # Exported via templates. sharedFetch: SharedFetchState # Exported via templates.
SyncPeer* = ref object SyncPeer* = ref object
## Peer state tracking. ## Peer state tracking.
ns*: NewSync ns*: SnapSync
peer*: Peer # p2pProtocol(eth65). peer*: Peer # p2pProtocol(eth65).
stopped*: bool stopped*: bool
pendingGetBlockHeaders*:bool pendingGetBlockHeaders*:bool
@ -135,13 +108,13 @@ type
## numerical properties: ordering, intervals and meaningful difference. ## numerical properties: ordering, intervals and meaningful difference.
number: UInt256 number: UInt256
# Use `import protocol/get_nodedata` to access the real type's methods. # Use `import snap/get_nodedata` to access the real type's methods.
NodeDataRequestQueue {.inheritable, pure.} = ref object NodeDataRequestQueue {.inheritable, pure.} = ref object
# Use `import pie/trie_fetch` to access the real type's methods. # Use `import snap/pie/trie_fetch` to access the real type's methods.
SharedFetchState {.inheritable, pure.} = ref object SharedFetchState {.inheritable, pure.} = ref object
# Use `import pie/trie_fetch` to access the real type's methods. # Use `import snap/pie/trie_fetch` to access the real type's methods.
FetchState {.inheritable, pure.} = ref object FetchState {.inheritable, pure.} = ref object
proc inc(stat: var Stat) {.borrow.} proc inc(stat: var Stat) {.borrow.}

View File

@ -0,0 +1,54 @@
# Nimbus - Types, data structures and shared utilities used in network sync
#
# Copyright (c) 2018-2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or
# distributed except according to those terms.
import
chronicles,
eth/common/eth_types,
stew/byteutils
const
tracePackets* = true
## Whether to `trace` log each sync network message.
traceGossips* = false
## Whether to `trace` log each gossip network message.
traceHandshakes* = true
## Whether to `trace` log each network handshake message.
traceTimeouts* = true
## Whether to `trace` log each network request timeout.
traceNetworkErrors* = true
## Whether to `trace` log each network request error.
tracePacketErrors* = true
## Whether to `trace` log each messages with invalid data.
traceIndividualNodes* = false
## Whether to `trace` log each trie node, account, storage, receipt, etc.
template tracePacket*(msg: static[string], args: varargs[untyped]) =
if tracePackets: trace `msg`, `args`
template traceGossip*(msg: static[string], args: varargs[untyped]) =
if traceGossips: trace `msg`, `args`
template traceTimeout*(msg: static[string], args: varargs[untyped]) =
if traceTimeouts: trace `msg`, `args`
template traceNetworkError*(msg: static[string], args: varargs[untyped]) =
if traceNetworkErrors: trace `msg`, `args`
template tracePacketError*(msg: static[string], args: varargs[untyped]) =
if tracePacketErrors: trace `msg`, `args`
func toHex*(hash: Hash256): string =
## Shortcut for buteutils.toHex(hash.data)
hash.data.toHex
func traceStep*(request: BlocksRequest): string =
var str = if request.reverse: "-" else: "+"
if request.skip < high(typeof(request.skip)):
return str & $(request.skip + 1)
return static($(high(typeof(request.skip)).u256 + 1))
# End

View File

@ -12,7 +12,7 @@ import
stew/byteutils, unittest2, stew/byteutils, unittest2,
eth/[p2p, common, trie/db, rlp], eth/[p2p, common, trie/db, rlp],
graphql, ../nimbus/graphql/ethapi, graphql/test_common, graphql, ../nimbus/graphql/ethapi, graphql/test_common,
../nimbus/sync/protocol_ethxx, ../nimbus/sync/protocol,
../nimbus/[genesis, config, chain_config, context], ../nimbus/[genesis, config, chain_config, context],
../nimbus/db/[db_chain], ../nimbus/db/[db_chain],
../nimbus/p2p/chain, ./test_helpers, ../nimbus/p2p/chain, ./test_helpers,

View File

@ -12,7 +12,7 @@ import
unittest2, nimcrypto, eth/common as eth_common, unittest2, nimcrypto, eth/common as eth_common,
json_rpc/[rpcserver, rpcclient], web3/[conversions, engine_api_types], json_rpc/[rpcserver, rpcclient], web3/[conversions, engine_api_types],
eth/[trie/db, p2p/private/p2p_types], eth/[trie/db, p2p/private/p2p_types],
../nimbus/sync/protocol_ethxx, ../nimbus/sync/protocol,
../nimbus/rpc/[common, p2p, hexstrings, rpc_types, rpc_utils, engine_api], ../nimbus/rpc/[common, p2p, hexstrings, rpc_types, rpc_utils, engine_api],
../nimbus/db/[db_chain], ../nimbus/db/[db_chain],
../nimbus/[chain_config, config, context, genesis, sealer], ../nimbus/[chain_config, config, context, genesis, sealer],

View File

@ -14,7 +14,7 @@ import
../nimbus/[constants, config, genesis, utils, transaction, ../nimbus/[constants, config, genesis, utils, transaction,
vm_state, vm_types, version], vm_state, vm_types, version],
../nimbus/db/[accounts_cache, db_chain], ../nimbus/db/[accounts_cache, db_chain],
../nimbus/sync/protocol_ethxx, ../nimbus/sync/protocol,
../nimbus/p2p/[chain, executor, executor/executor_helpers], ../nimbus/p2p/[chain, executor, executor/executor_helpers],
../nimbus/utils/[difficulty, tx_pool], ../nimbus/utils/[difficulty, tx_pool],
../nimbus/[context, chain_config], ../nimbus/[context, chain_config],