Sync: Move blockchain_sync code and use it with eth/65
Move `blockchain_sync.nim` from `nim-eth` to `nimbus-eth1`. This lets `blockchain_sync` use the `eth/65` protocol to synchronise with more modern peers than before.

Practically, the effect is that the sync process runs more quickly and reliably than before: it finds usable peers, and they are up to date.

Note: this is mostly old code, and it mostly performs "classic sync", the original Ethereum method. Here's a summary of this code:

- It decides on a blockchain canonical head by sampling a few peers.
- Starting from block 0 (genesis), it downloads each block header and block, mostly in order.
- After it downloads each block, it executes the EVM transactions in that block and updates the state trie from that, before going to the next block.
- This way the database state is updated by EVM executions in block order, and new state is persisted to the trie database after each block.

Even though it mentions Geth "fast sync" (comments near the end of the file), and has some elements of it, it isn't really fast sync. The most obvious missing part is that this code _doesn't download a state trie_; it calculates all state from block 0.

Geth "fast sync" has several parts:

1. Find an agreed common chain among several peers to treat as probably secure, and a sufficiently long suffix to provide "statistical economic consensus" when it is validated.
2. Perform a subset of the PoW calculations, skipping forward over a segment to verify some of the PoWs according to a pattern in the relevant paper.
3. Download the state trie from the block at the start of that last segment.
4. Execute only the blocks/transactions in that last segment, using the downloaded state trie, to fill out the later states and properly validate the blocks in the last segment.

(A sketch of this pipeline appears after this message.)

Some other issues with the `blockchain_sync` code:

- If it ever reaches the head of the chain, it doesn't follow new blocks with increasing block numbers, at least not rapidly.
- If the chain undergoes a reorg, this code won't fetch a block number it has already fetched, so it can't accept the reorg. It will end up conflicted with peers. This hasn't mattered because the development focus has been on the bulk of the catching-up process, not the real-time head and reorgs.
- So it probably doesn't work correctly when it gets close to the head, due to many small reorgs, though it might for subtle reasons.
- Some of the network message handling isn't sufficiently robust, and it discards some replies that contain valid data according to the specification.
- On rare occasions the initial query mapping block hash to block number can fail (because the peer's state changes).
- It makes some assumptions about the state of peers based on their responses which may not be valid (I'm not convinced they are).

The method for working out "trusted" peers that agree on a common chain prefix is clever. It compares peers by asking each peer whether it has the header matching another peer's canonical head block, by hash. But it's not clear that merely knowing about a block constitutes agreement about the canonical chain. (If it did, querying by block number would give the same answer more authoritatively.)

Nonetheless, being able to run this sync process over `eth/65` is useful.

Signed-off-by: Jamie Lokier <jamie@shareable.org>
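To make the contrast concrete, here is a minimal sketch of that four-step fast-sync pipeline. All helper names (`findCommonChain`, `verifyPoWSample`, `downloadStateTrie`, `executeAndValidate`) are hypothetical labels for the steps above; none of them exist in this codebase:

    # Hypothetical outline only: these helpers are not part of nimbus-eth1,
    # they just name the four fast-sync steps described above.
    proc fastSyncSketch(peers: seq[Peer]): Future[bool] {.async.} =
      # 1. Agree a common chain prefix among peers, plus a suffix long
      #    enough to give "statistical economic consensus" once validated.
      let (prefix, suffix) = await findCommonChain(peers)
      # 2. Spot-check a sample of the PoWs over the skipped-ahead segment.
      if not verifyPoWSample(prefix, suffix):
        return false
      # 3. Fetch the state trie as of the block starting the final segment.
      await downloadStateTrie(suffix[0].stateRoot)
      # 4. Execute only the final segment's blocks against that state to
      #    validate them and produce the head state.
      for header in suffix:
        if not executeAndValidate(header):
          return false
      return true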
This commit is contained in:
parent 936a18b4f4
commit c435409292
@@ -16,7 +16,7 @@ import
   eth/common as eth_common, eth/p2p as eth_p2p,
   chronos, json_rpc/rpcserver, chronicles,
-  eth/p2p/rlpx_protocols/les_protocol,
-  eth/p2p/blockchain_sync, eth/net/nat, eth/p2p/peer_pool,
+  ./p2p/blockchain_sync, eth/net/nat, eth/p2p/peer_pool,
+  ./sync/protocol_eth65,
   config, genesis, rpc/[common, p2p, debug, key_storage], p2p/chain,
   eth/trie/db, metrics, metrics/[chronos_httpserver, chronicles_support],
nimbus/p2p/blockchain_sync.nim (new file, 413 lines)
@@ -0,0 +1,413 @@
# nim-eth
# Copyright (c) 2018-2021 Status Research & Development GmbH
# Licensed and distributed under either of
#   * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
#   * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.

import
  std/[sets, options, random, hashes],
  chronos, chronicles,
  eth/common/eth_types,
  eth/[p2p, p2p/private/p2p_types, p2p/rlpx, p2p/peer_pool],
  ../sync/protocol_eth65

const
  minPeersToStartSync* = 2 # Wait for consensus of at least this
                           # number of peers before syncing
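# A `WantedBlocks` work item tracks one contiguous range of blocks through the
# Initial -> Requested -> Received -> Persisted lifecycle; `SyncContext` holds
# the shared work queue, the trusted peer set and the sync end point.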
type
  SyncStatus* = enum
    syncSuccess
    syncNotEnoughPeers
    syncTimeOut

  WantedBlocksState = enum
    Initial,
    Requested,
    Received,
    Persisted

  WantedBlocks = object
    startIndex: BlockNumber
    numBlocks: uint
    state: WantedBlocksState
    headers: seq[BlockHeader]
    bodies: seq[BlockBody]

  SyncContext = ref object
    workQueue: seq[WantedBlocks]
    endBlockNumber: BlockNumber
    finalizedBlock: BlockNumber # Block which was downloaded and verified
    chain: AbstractChainDB
    peerPool: PeerPool
    trustedPeers: HashSet[Peer]
    hasOutOfOrderBlocks: bool
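# Identity-based hash so that `Peer` objects can be stored in a `HashSet`.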
proc hash*(p: Peer): Hash = hash(cast[pointer](p))

proc endIndex(b: WantedBlocks): BlockNumber =
  result = b.startIndex
  result += (b.numBlocks - 1).toBlockNumber
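# Pick the index of a work item to (re)use for the next request: an item in
# Initial state is taken first (a previously failed item to retry), otherwise
# a Persisted slot is recycled, otherwise the queue grows. Returns -1 when the
# next block to request is beyond the best block number reported by our peers.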
proc availableWorkItem(ctx: SyncContext): int =
  var maxPendingBlock = ctx.finalizedBlock # the last downloaded & processed
  trace "queue len", length = ctx.workQueue.len
  result = -1
  for i in 0 .. ctx.workQueue.high:
    case ctx.workQueue[i].state
    of Initial:
      # When there is a work item in Initial state, immediately use this one.
      # This usually means a previous work item that failed somewhere in the
      # process, and thus can be reused to work on.
      return i
    of Persisted:
      # In case of Persisted, we can reset this work item to a new one.
      result = i
      # No break here to give work items in Initial state priority and to
      # calculate endBlock.
    else:
      discard

    # Check all endBlocks of all workqueue items to decide on the next range
    # of blocks to collect & process.
    let endBlock = ctx.workQueue[i].endIndex
    if endBlock > maxPendingBlock:
      maxPendingBlock = endBlock

  let nextRequestedBlock = maxPendingBlock + 1
  # If this next block doesn't exist yet according to any of our peers, don't
  # return a work item (and sync will be stopped).
  if nextRequestedBlock >= ctx.endBlockNumber:
    return -1

  # Increase the queue when there are no free (Initial / Persisted) work items
  # in the queue. At start, the queue will be empty.
  if result == -1:
    result = ctx.workQueue.len
    ctx.workQueue.setLen(result + 1)

  # Create a new work item when the queue was increased; reset the selected
  # work item when it is in Persisted state.
  var numBlocks = (ctx.endBlockNumber - nextRequestedBlock).toInt
  if numBlocks > maxHeadersFetch:
    numBlocks = maxHeadersFetch
  ctx.workQueue[result] = WantedBlocks(startIndex: nextRequestedBlock, numBlocks: numBlocks.uint, state: Initial)

proc persistWorkItem(ctx: SyncContext, wi: var WantedBlocks): ValidationResult =
  result = ctx.chain.persistBlocks(wi.headers, wi.bodies)
  case result
  of ValidationResult.OK:
    ctx.finalizedBlock = wi.endIndex
    wi.state = Persisted
  of ValidationResult.Error:
    wi.state = Initial
  # successful or not, we're done with these blocks
  wi.headers = @[]
  wi.bodies = @[]
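# Walk the queue looking for Received items that start exactly at the next
# block to persist, persisting them in block order until none is left; record
# in the context whether out-of-order items remain, and return the index and
# validation result of the last item persisted.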
proc persistPendingWorkItems(ctx: SyncContext): (int, ValidationResult) =
  var nextStartIndex = ctx.finalizedBlock + 1
  var keepRunning = true
  var hasOutOfOrderBlocks = false
  trace "Looking for out of order blocks"
  while keepRunning:
    keepRunning = false
    hasOutOfOrderBlocks = false
    # Go over the full work queue and check for every work item if it is in
    # Received state and has the next blocks in line to be processed.
    for i in 0 ..< ctx.workQueue.len:
      let start = ctx.workQueue[i].startIndex
      # There should be at least one like this, namely the just received work
      # item that initiated this call.
      if ctx.workQueue[i].state == Received:
        if start == nextStartIndex:
          trace "Processing pending work item", number = start
          result = (i, ctx.persistWorkItem(ctx.workQueue[i]))
          # TODO: We can stop here on failure, but have to set
          # `hasOutOfOrderBlocks`. Is this always valid?
          nextStartIndex = ctx.finalizedBlock + 1
          keepRunning = true
          break
        else:
          hasOutOfOrderBlocks = true

  ctx.hasOutOfOrderBlocks = hasOutOfOrderBlocks
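# Take back a work item filled in by a peer: persist it directly when it is
# next in line, flush pending items when blocks have arrived out of order, and
# return Error (so the caller can disconnect the peer) when the response was
# short or validation failed.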
proc returnWorkItem(ctx: SyncContext, workItem: int): ValidationResult =
  let wi = addr ctx.workQueue[workItem]
  let askedBlocks = wi.numBlocks.int
  let receivedBlocks = wi.headers.len
  let start = wi.startIndex

  if askedBlocks == receivedBlocks:
    trace "Work item complete",
      start,
      askedBlocks,
      receivedBlocks

    if wi.startIndex != ctx.finalizedBlock + 1:
      trace "Blocks out of order", start, final = ctx.finalizedBlock
      ctx.hasOutOfOrderBlocks = true

    if ctx.hasOutOfOrderBlocks:
      let (index, validation) = ctx.persistPendingWorkItems()
      # Only report an error if it was this peer's work item that failed
      if validation == ValidationResult.Error and index == workItem:
        result = ValidationResult.Error
      # TODO: What about failures on other peers' work items?
      # In that case the peer will probably get disconnected on future
      # erroneous work items, but before this occurs, several more blocks
      # (that will fail) might get downloaded from this peer. This will delay
      # the sync and should be improved.
    else:
      trace "Processing work item", number = wi.startIndex
      # The validation result needs to be returned so that the caller can
      # decide whether to disconnect from this peer in case of error.
      result = ctx.persistWorkItem(wi[])
  else:
    trace "Work item complete but we got fewer blocks than requested, so we're ditching the whole thing.",
      start,
      askedBlocks,
      receivedBlocks
    return ValidationResult.Error

proc newSyncContext(chain: AbstractChainDB, peerPool: PeerPool): SyncContext =
  new result
  result.chain = chain
  result.peerPool = peerPool
  result.trustedPeers = initHashSet[Peer]()
  result.finalizedBlock = chain.getBestBlockHeader().blockNumber

proc handleLostPeer(ctx: SyncContext) =
  # TODO: ask the PeerPool for new connections and then call
  # `obtainBlocksFromPeer`
  discard
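# Ask the peer for the header matching its advertised best block hash; the
# header's number tells us how far the peer's chain extends.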
proc getBestBlockNumber(p: Peer): Future[BlockNumber] {.async.} =
  let request = BlocksRequest(
    startBlock: HashOrNum(isHash: true,
                          hash: p.state(eth).bestBlockHash),
    maxResults: 1,
    skip: 0,
    reverse: true)

  let latestBlock = await p.getBlockHeaders(request)

  if latestBlock.isSome and latestBlock.get.headers.len > 0:
    result = latestBlock.get.headers[0].blockNumber
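# Per-peer download loop: claim a work item, request its headers, fetch the
# matching bodies in batches of `maxBodiesFetch`, then hand the completed item
# back for persisting. On failure the item is reset to Initial state and the
# peer is disconnected.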
proc obtainBlocksFromPeer(syncCtx: SyncContext, peer: Peer) {.async.} =
  # Update our best block number
  try:
    let bestBlockNumber = await peer.getBestBlockNumber()
    if bestBlockNumber > syncCtx.endBlockNumber:
      trace "New sync end block number", number = bestBlockNumber
      syncCtx.endBlockNumber = bestBlockNumber
  except TransportError:
    debug "Transport got closed during obtainBlocksFromPeer"
  except CatchableError as e:
    debug "Exception in getBestBlockNumber()", exc = e.name, err = e.msg
    # no need to exit here, because the context might still have blocks to
    # fetch from this peer

  while (let workItemIdx = syncCtx.availableWorkItem(); workItemIdx != -1 and
         peer.connectionState notin {Disconnecting, Disconnected}):
    template workItem: auto = syncCtx.workQueue[workItemIdx]
    workItem.state = Requested
    trace "Requesting block headers", start = workItem.startIndex,
      count = workItem.numBlocks, peer = peer.remote.node
    let request = BlocksRequest(
      startBlock: HashOrNum(isHash: false, number: workItem.startIndex),
      maxResults: workItem.numBlocks,
      skip: 0,
      reverse: false)

    var dataReceived = false
    try:
      let results = await peer.getBlockHeaders(request)
      if results.isSome:
        shallowCopy(workItem.headers, results.get.headers)

        var bodies = newSeq[BlockBody]()
        var hashes = newSeq[KeccakHash]()
        var nextIndex = workItem.startIndex
        for i in workItem.headers:
          if i.blockNumber != nextIndex:
            raise newException(CatchableError, "The block numbers are not in sequence. Not processing this workItem.")
          else:
            nextIndex = nextIndex + 1
          hashes.add(blockHash(i))
          if hashes.len == maxBodiesFetch:
            let b = await peer.getBlockBodies(hashes)
            if b.isNone:
              raise newException(CatchableError, "Was not able to get the block bodies.")
            hashes.setLen(0)
            bodies.add(b.get.blocks)

        if hashes.len != 0:
          let b = await peer.getBlockBodies(hashes)
          if b.isNone:
            raise newException(CatchableError, "Was not able to get the block bodies.")
          bodies.add(b.get.blocks)

        if bodies.len == workItem.headers.len:
          shallowCopy(workItem.bodies, bodies)
          dataReceived = true
        else:
          warn "Bodies len != headers.len", bodies = bodies.len, headers = workItem.headers.len
    except TransportError:
      debug "Transport got closed during obtainBlocksFromPeer"
    except CatchableError as e:
      # the success case sets `dataReceived`, so we can just fall back to the
      # failure path below. If we signal time-outs with exceptions, such
      # failures will be easier to handle.
      debug "Exception in obtainBlocksFromPeer()", exc = e.name, err = e.msg

    var giveUpOnPeer = false

    if dataReceived:
      workItem.state = Received
      if syncCtx.returnWorkItem(workItemIdx) != ValidationResult.OK:
        giveUpOnPeer = true
    else:
      giveUpOnPeer = true

    if giveUpOnPeer:
      workItem.state = Initial
      try:
        await peer.disconnect(SubprotocolReason)
      except CatchableError:
        discard
      syncCtx.handleLostPeer()
      break

  trace "Finished obtaining blocks", peer

proc peersAgreeOnChain(a, b: Peer): Future[bool] {.async.} =
  # Returns true if one of the peers acknowledges existence of the best block
  # of the other peer.
  var
    a = a
    b = b

  if a.state(eth).bestDifficulty < b.state(eth).bestDifficulty:
    swap(a, b)

  let request = BlocksRequest(
    startBlock: HashOrNum(isHash: true,
                          hash: b.state(eth).bestBlockHash),
    maxResults: 1,
    skip: 0,
    reverse: true)

  let latestBlock = await a.getBlockHeaders(request)
  result = latestBlock.isSome and latestBlock.get.headers.len > 0
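# Pick a uniformly random member of the trusted set; `HashSet` has no random
# access, so iterate up to a randomly chosen position.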
proc randomTrustedPeer(ctx: SyncContext): Peer =
  var k = rand(ctx.trustedPeers.len - 1)
  var i = 0
  for p in ctx.trustedPeers:
    result = p
    if i == k: return
    inc i

proc startSyncWithPeer(ctx: SyncContext, peer: Peer) {.async.} =
  trace "start sync", peer, trustedPeers = ctx.trustedPeers.len
  if ctx.trustedPeers.len >= minPeersToStartSync:
    # We have enough trusted peers. Validate new peer against trusted
    if await peersAgreeOnChain(peer, ctx.randomTrustedPeer()):
      ctx.trustedPeers.incl(peer)
      asyncCheck ctx.obtainBlocksFromPeer(peer)
  elif ctx.trustedPeers.len == 0:
    # Assume the peer is trusted, but don't start sync until we reevaluate
    # it with more peers
    trace "Assume trusted peer", peer
    ctx.trustedPeers.incl(peer)
  else:
    # At this point we have some "trusted" candidates, but they are not
    # "trusted" enough. We evaluate `peer` against all other candidates.
    # If one of the candidates disagrees, we swap it for `peer`. If all
    # candidates agree, we add `peer` to trusted set. The peers in the set
    # will become "fully trusted" (and sync will start) when the set is big
    # enough
    var
      agreeScore = 0
      disagreedPeer: Peer

    for tp in ctx.trustedPeers:
      if await peersAgreeOnChain(peer, tp):
        inc agreeScore
      else:
        disagreedPeer = tp

    let disagreeScore = ctx.trustedPeers.len - agreeScore

    if agreeScore == ctx.trustedPeers.len:
      ctx.trustedPeers.incl(peer) # The best possible outcome
    elif disagreeScore == 1:
      trace "Peer is no longer trusted for sync", peer
      ctx.trustedPeers.excl(disagreedPeer)
      ctx.trustedPeers.incl(peer)
    else:
      trace "Peer not trusted for sync", peer

    if ctx.trustedPeers.len == minPeersToStartSync:
      for p in ctx.trustedPeers:
        asyncCheck ctx.obtainBlocksFromPeer(p)


proc onPeerConnected(ctx: SyncContext, peer: Peer) =
  trace "New candidate for sync", peer
  try:
    let f = ctx.startSyncWithPeer(peer)
    f.callback = proc(data: pointer) {.gcsafe.} =
      if f.failed:
        if f.error of TransportError:
          debug "Transport got closed during startSyncWithPeer"
        else:
          error "startSyncWithPeer failed", msg = f.readError.msg, peer
  except TransportError:
    debug "Transport got closed during startSyncWithPeer"
  except CatchableError as e:
    debug "Exception in startSyncWithPeer()", exc = e.name, err = e.msg


proc onPeerDisconnected(ctx: SyncContext, p: Peer) =
  trace "peer disconnected ", peer = p
  ctx.trustedPeers.excl(p)
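# Register peer-pool callbacks: every newly connected eth-capable peer becomes
# a sync candidate, and disconnecting peers leave the trusted set.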
proc startSync(ctx: SyncContext) =
  var po: PeerObserver
  po.onPeerConnected = proc(p: Peer) {.gcsafe.} =
    ctx.onPeerConnected(p)

  po.onPeerDisconnected = proc(p: Peer) {.gcsafe.} =
    ctx.onPeerDisconnected(p)

  po.setProtocol eth
  ctx.peerPool.addObserver(ctx, po)
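# Return the connected peer reporting the highest total difficulty, together
# with that difficulty.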
proc findBestPeer(node: EthereumNode): (Peer, DifficultyInt) =
  var
    bestBlockDifficulty: DifficultyInt = 0.stuint(256)
    bestPeer: Peer = nil

  for peer in node.peers(eth):
    let peerEthState = peer.state(eth)
    if peerEthState.initialized:
      if peerEthState.bestDifficulty > bestBlockDifficulty:
        bestBlockDifficulty = peerEthState.bestDifficulty
        bestPeer = peer

  result = (bestPeer, bestBlockDifficulty)

proc fastBlockchainSync*(node: EthereumNode): Future[SyncStatus] {.async.} =
  ## Code for the fast blockchain sync procedure:
  ## https://github.com/ethereum/wiki/wiki/Parallel-Block-Downloads
  ## https://github.com/ethereum/go-ethereum/pull/1889
  # TODO: This needs a better interface. Consider removing this function and
  # exposing SyncCtx
  var syncCtx = newSyncContext(node.chain, node.peerPool)
  syncCtx.startSync()