nimbus-eth1/nimbus/sync/skeleton.nim
2022-12-02 13:51:42 +07:00

589 lines
21 KiB
Nim

import
std/[times],
eth/[common, rlp],
eth/trie/db,
#stew/results,
stint, chronicles,
../db/[db_chain, storage_types],
".."/[utils, chain_config],
../p2p/chain
{.push raises: [Defect].}
logScope:
topics = "skeleton"
type
# Contiguous header chain segment that is backed by the database,
# but may not be linked to the live chain. The skeleton downloader may produce
# a new one of these every time it is restarted until the subchain grows large
# enough to connect with a previous subchain.
SkeletonSubchain* = object
head*: UInt256 # Block number of the newest header in the subchain
tail*: UInt256 # Block number of the oldest header in the subchain
next*: Hash256 # Block hash of the next oldest header in the subchain
# Database entry to allow suspending and resuming a chain
# sync. As the skeleton header chain is downloaded backwards, restarts can and
# will produce temporarily disjoint subchains. There is no way to restart a
# suspended skeleton sync without prior knowledge of all prior suspension points.
SkeletonProgress = seq[SkeletonSubchain]
# The Skeleton chain class helps support beacon sync by accepting head blocks
# while backfill syncing the rest of the chain.
SkeletonRef* = ref object
subchains: SkeletonProgress
started : Time # Timestamp when the skeleton syncer was created
logged : Time # Timestamp when progress was last logged to user
pulled : int64 # Number of headers downloaded in this run
filling : bool # Whether we are actively filling the canonical chain
chainTTD : DifficultyInt
chainDB : ChainDBRef
chain : Chain
# config
skeletonFillCanonicalBackStep: int
skeletonSubchainMergeMinimum: int
syncTargetHeight: int
ignoreTxs: bool
SkeletonError* = object of CatchableError
# SyncReorged is an internal helper error to signal that the head chain of
# the current sync cycle was (partially) reorged, thus the skeleton syncer
# should abort and restart with the new state.
ErrSyncReorged* = object of SkeletonError
# ReorgDenied is returned if an attempt is made to extend the beacon chain
# with a new header, but it does not link up to the existing sync.
ErrReorgDenied* = object of SkeletonError
# SyncMerged is an internal helper error to signal that the current sync
# cycle merged with a previously aborted subchain, thus the skeleton syncer
# should abort and restart with the new state.
ErrSyncMerged* = object of SkeletonError
ErrHeaderNotFound* = object of SkeletonError
const
# How often to log sync status (in ms)
STATUS_LOG_INTERVAL = initDuration(microseconds = 8000)
proc new*(_: type SkeletonRef, chain: Chain): SkeletonRef =
new(result)
result.chain = chain
result.chainDB = chain.db
result.started = getTime()
result.logged = getTime()
result.pulled = 0'i64
result.filling = false
result.chainTTD = chain.db.ttd()
result.skeletonFillCanonicalBackStep = 100
result.skeletonSubchainMergeMinimum = 1000
#result.syncTargetHeight = ?
result.ignoreTxs = false
template get(sk: SkeletonRef, key: untyped): untyped =
get(sk.chainDB.db, key.toOpenArray)
template put(sk: SkeletonRef, key, val: untyped): untyped =
put(sk.chainDB.db, key.toOpenArray, val)
template del(sk: SkeletonRef, key: untyped): untyped =
del(sk.chainDB.db, key.toOpenArray)
template toFork(sk: SkeletonRef, number: untyped): untyped =
toFork(sk.chainDB.config, number)
template blockHeight(sk: SkeletonRef): untyped =
sk.chainDB.currentBlock
# Reads the SkeletonProgress from db
proc readSyncProgress(sk: SkeletonRef) {.raises: [Defect, RlpError].} =
let rawProgress = sk.get(skeletonProgressKey())
if rawProgress.len == 0: return
sk.subchains = rlp.decode(rawProgress, SkeletonProgress)
# Writes the SkeletonProgress to db
proc writeSyncProgress(sk: SkeletonRef) =
for x in sk.subchains:
debug "Writing sync progress subchains",
head=x.head, tail=x.tail, next=short(x.next)
let encodedProgress = rlp.encode(sk.subchains)
sk.put(skeletonProgressKey(), encodedProgress)
proc open*(sk: SkeletonRef){.raises: [Defect, RlpError].} =
sk.readSyncProgress()
sk.started = getTime()
# Gets a block from the skeleton or canonical db by number.
proc getHeader*(sk: SkeletonRef,
number: BlockNumber,
output: var BlockHeader,
onlySkeleton: bool = false): bool {.raises: [Defect, RlpError].} =
let rawHeader = sk.get(skeletonBlockKey(number))
if rawHeader.len != 0:
output = rlp.decode(rawHeader, BlockHeader)
return true
else:
if onlySkeleton: return false
# As a fallback, try to get the block from the canonical chain in case it is available there
return sk.chainDB.getBlockHeader(number, output)
# Gets a skeleton block from the db by hash
proc getHeaderByHash*(sk: SkeletonRef,
hash: Hash256,
output: var BlockHeader): bool {.raises: [Defect, RlpError].} =
let rawNumber = sk.get(skeletonBlockHashToNumberKey(hash))
if rawNumber.len == 0:
return false
return sk.getHeader(rlp.decode(rawNumber, BlockNumber), output)
# Deletes a skeleton block from the db by number
proc deleteBlock(sk: SkeletonRef, header: BlockHeader) =
sk.del(skeletonBlockKey(header.blockNumber))
sk.del(skeletonBlockHashToNumberKey(header.blockHash))
sk.del(skeletonTransactionKey(header.blockNumber))
# Writes a skeeton block to the db by number
proc putHeader*(sk: SkeletonRef, header: BlockHeader) =
let encodedHeader = rlp.encode(header)
sk.put(skeletonBlockKey(header.blockNumber), encodedHeader)
sk.put(
skeletonBlockHashToNumberKey(header.blockHash),
rlp.encode(header.blockNumber)
)
proc putBlock(sk: SkeletonRef, header: BlockHeader, txs: openArray[Transaction]) =
let encodedHeader = rlp.encode(header)
sk.put(skeletonBlockKey(header.blockNumber), encodedHeader)
sk.put(
skeletonBlockHashToNumberKey(header.blockHash),
rlp.encode(header.blockNumber)
)
sk.put(skeletonTransactionKey(header.blockNumber), rlp.encode(txs))
proc getTxs(sk: SkeletonRef, number: BlockNumber,
output: var seq[Transaction]){.raises: [
Defect, CatchableError].} =
let rawTxs = sk.get(skeletonTransactionKey(number))
if rawTxs.len > 0:
output = rlp.decode(rawTxs, seq[Transaction])
else:
raise newException(SkeletonError,
"getTxs: no transactions from block number " & $number)
# Bounds returns the current head and tail tracked by the skeleton syncer.
proc bounds*(sk: SkeletonRef): SkeletonSubchain =
sk.subchains[0]
# Returns true if the skeleton chain is linked to canonical
proc isLinked*(sk: SkeletonRef): bool {.raises: [Defect, CatchableError].} =
if sk.subchains.len == 0: return false
let sc = sk.bounds()
# make check for genesis if tail is 1?
let head = sk.blockHeight
if sc.tail > head + 1.toBlockNumber:
return false
var nextHeader: BlockHeader
let number = sc.tail - 1.toBlockNumber
if sk.getHeader(number, nextHeader):
return sc.next == nextHeader.blockHash
else:
raise newException(ErrHeaderNotFound, "isLinked: No header with number=" & $number)
proc trySubChainsMerge(sk: SkeletonRef): bool {.raises: [Defect, RlpError].} =
var
merged = false
edited = false
head: BlockHeader
let subchainMergeMinimum = sk.skeletonSubchainMergeMinimum.u256
# If the subchain extended into the next subchain, we need to handle
# the overlap. Since there could be many overlaps, do this in a loop.
while sk.subchains.len > 1 and
sk.subchains[1].head >= sk.subchains[0].tail:
# Extract some stats from the second subchain
let sc = sk.subchains[1]
# Since we just overwrote part of the next subchain, we need to trim
# its head independent of matching or mismatching content
if sc.tail >= sk.subchains[0].tail:
# Fully overwritten, get rid of the subchain as a whole
debug "Previous subchain fully overwritten",
head=sc.head, tail=sc.tail, next=short(sc.next)
sk.subchains.delete(1)
edited = true
continue
else:
# Partially overwritten, trim the head to the overwritten size
debug "Previous subchain partially overwritten",
head=sc.head, tail=sc.tail, next=short(sc.next)
sk.subchains[1].head = sk.subchains[0].tail - 1.toBlockNumber
edited = true
# If the old subchain is an extension of the new one, merge the two
# and let the skeleton syncer restart (to clean internal state)
if sk.getHeader(sk.subchains[1].head, head) and
head.blockHash == sk.subchains[0].next:
# only merge is we can integrate a big progress, as each merge leads
# to disruption of the block fetcher to start a fresh
if (sc.head - sc.tail) > subchainMergeMinimum:
debug "Previous subchain merged head",
head=sc.head, tail=sc.tail, next=short(sc.next)
sk.subchains[0].tail = sc.tail
sk.subchains[0].next = sc.next
sk.subchains.delete(1)
# If subchains were merged, all further available headers
# are invalid since we skipped ahead.
merged = true
else:
debug "Subchain ignored for merge",
head=sc.head, tail=sc.tail, next=short(sc.next)
sk.subchains.delete(1)
edited = true
if edited: sk.writeSyncProgress()
return merged
proc backStep(sk: SkeletonRef) {.raises: [Defect, RlpError].}=
if sk.skeletonFillCanonicalBackStep <= 0:
return
let sc = sk.bounds()
var
hasTail: bool
tailHeader: BlockHeader
newTail = sc.tail
while true:
newTail = newTail + sk.skeletonFillCanonicalBackStep.u256
hasTail = sk.getHeader(newTail, tailHeader, true)
if hasTail or newTail > sc.head: break
if newTail > sc.head:
newTail = sc.head
hasTail = sk.getHeader(newTail, tailHeader, true)
if hasTail and newTail > 0.toBlockNumber:
trace "Backstepped skeleton", head=sc.head, tail=newTail
sk.subchains[0].tail = newTail
sk.subchains[0].next = tailHeader.parentHash
sk.writeSyncProgress()
else:
# we need a new head, emptying the subchains
sk.subchains = @[]
sk.writeSyncProgress()
warn "Couldn't backStep subchain 0, dropping subchains for new head signal"
# processNewHead does the internal shuffling for a new head marker and either
# accepts and integrates it into the skeleton or requests a reorg. Upon reorg,
# the syncer will tear itself down and restart with a fresh head. It is simpler
# to reconstruct the sync state than to mutate it.
#
# @returns true if the chain was reorged
proc processNewHead(sk: SkeletonRef,
head: BlockHeader,
force = false): bool {.raises: [Defect, RlpError].} =
# If the header cannot be inserted without interruption, return an error for
# the outer loop to tear down the skeleton sync and restart it
let number = head.blockNumber
if sk.subchains.len == 0:
warn "Skeleton reorged and cleaned, no current subchain", newHead=number
return true
let lastchain = sk.subchains[0]
if lastchain.tail >= number:
# If the chain is down to a single beacon header, and it is re-announced
# once more, ignore it instead of tearing down sync for a noop.
if lastchain.head == lastchain.tail:
var header: BlockHeader
let hasHeader = sk.getHeader(number, header)
# TODO: what should we do when hasHeader == false?
if hasHeader and header.blockHash == head.blockHash:
return false
# Not a noop / double head announce, abort with a reorg
if force:
warn "Beacon chain reorged",
tail=lastchain.tail, head=lastchain.head, newHead=number
return true
if lastchain.head + 1.toBlockNumber < number:
if force:
warn "Beacon chain gapped",
head=lastchain.head, newHead=number
return true
var parent: BlockHeader
let hasParent = sk.getHeader(number - 1.toBlockNumber, parent)
if hasParent and parent.blockHash != head.parentHash:
if force:
warn "Beacon chain forked",
ancestor=parent.blockNumber, hash=short(parent.blockHash),
want=short(head.parentHash)
return true
# Update the database with the new sync stats and insert the new
# head header. We won't delete any trimmed skeleton headers since
# those will be outside the index space of the many subchains and
# the database space will be reclaimed eventually when processing
# blocks above the current head.
sk.putHeader(head)
sk.subchains[0].head = number
sk.writeSyncProgress()
return false
# Inserts skeleton blocks into canonical chain and runs execution.
proc fillCanonicalChain*(sk: SkeletonRef) {.raises: [Defect, CatchableError].} =
if sk.filling: return
sk.filling = true
var canonicalHead = sk.blockHeight
let start = canonicalHead
let sc = sk.bounds()
debug "Starting canonical chain fill",
canonicalHead=canonicalHead, subchainHead=sc.head
var fillLogIndex = 0
while sk.filling and canonicalHead < sc.head:
# Get next block
let number = canonicalHead + 1.toBlockNumber
var header: BlockHeader
let hasHeader = sk.getHeader(number, header)
if not hasHeader:
# This shouldn't happen, but if it does because of some issues, we should back step
# and fetch again
debug "fillCanonicalChain block number not found, backStepping",
number=number
sk.backStep()
break
# Insert into chain
var body: BlockBody
if not sk.ignoreTxs:
sk.getTxs(header.blockNumber, body.transactions)
let res = sk.chain.persistBlocks([header], [body])
if res != ValidationResult.OK:
let hardFork = sk.toFork(number)
error "Failed to put block from skeleton chain to canonical",
number=number,
fork=hardFork,
hash=short(header.blockHash)
sk.backStep()
break
# Delete skeleton block to clean up as we go
sk.deleteBlock(header)
canonicalHead += 1.toBlockNumber
inc fillLogIndex # num block inserted
if fillLogIndex > 50:
trace "Skeleton canonical chain fill status",
canonicalHead=canonicalHead,
chainHead=sk.blockHeight,
subchainHead=sc.head
fillLogIndex = 0
sk.filling = false
trace "Successfully put blocks from skeleton chain to canonical target",
start=start, stop=canonicalHead, skeletonHead=sc.head,
syncTargetHeight=sk.syncTargetHeight
# Announce and integrate a new head.
# throws if the new head causes a reorg.
proc setHead*(sk: SkeletonRef, head: BlockHeader,
force = false) {.raises: [Defect, CatchableError].} =
debug "New skeleton head announced",
number=head.blockNumber,
hash=short(head.blockHash),
force=force
let reorged = sk.processNewHead(head, force)
# If linked, fill the canonical chain.
if force and sk.isLinked():
sk.fillCanonicalChain()
if reorged:
if force:
raise newException(ErrSyncReorged, "setHead: sync reorg")
else:
raise newException(ErrReorgDenied, "setHead: reorg denied")
# Attempts to get the skeleton sync into a consistent state wrt any
# past state on disk and the newly requested head to sync to.
proc initSync*(sk: SkeletonRef, head: BlockHeader) {.raises: [
Defect, CatchableError].} =
let number = head.blockNumber
if sk.subchains.len == 0:
# Start a fresh sync with a single subchain represented by the currently sent
# chain head.
sk.subchains.add(SkeletonSubchain(
head: number,
tail: number,
next: head.parentHash
))
debug "Created initial skeleton subchain",
head=number, tail=number
else:
# Print some continuation logs
for x in sk.subchains:
debug "Restarting skeleton subchain",
head=x.head, tail=x.tail, next=short(x.next)
# Create a new subchain for the head (unless the last can be extended),
# trimming anything it would overwrite
let headchain = SkeletonSubchain(
head: number,
tail: number,
next: head.parentHash
)
while sk.subchains.len > 0:
# If the last chain is above the new head, delete altogether
let lastchain = addr sk.subchains[0]
if lastchain.tail >= headchain.tail:
debug "Dropping skeleton subchain",
head=lastchain.head, tail=lastchain.tail
sk.subchains.delete(0) # remove `lastchain`
continue
# Otherwise truncate the last chain if needed and abort trimming
if lastchain.head >= headchain.tail:
debug "Trimming skeleton subchain",
oldHead=lastchain.head, newHead=headchain.tail - 1.toBlockNumber,
tail=lastchain.tail
lastchain.head = headchain.tail - 1.toBlockNumber
break
# If the last subchain can be extended, we're lucky. Otherwise create
# a new subchain sync task.
var extended = false
if sk.subchains.len > 0:
let lastchain = addr sk.subchains[0]
if lastchain.head == headchain.tail - 1.toBlockNumber:
var header: BlockHeader
let lasthead = sk.getHeader(lastchain.head, header)
if lasthead and header.blockHash == head.parentHash:
debug "Extended skeleton subchain with new",
head=headchain.tail, tail=lastchain.tail
lastchain.head = headchain.tail
extended = true
if not extended:
debug "Created new skeleton subchain",
head=number, tail=number
sk.subchains.insert(headchain)
sk.putHeader(head)
sk.writeSyncProgress()
# If the sync is finished, start filling the canonical chain.
if sk.isLinked():
sk.fillCanonicalChain()
# Writes skeleton blocks to the db by number
# @returns number of blocks saved
proc putBlocks*(sk: SkeletonRef, headers: openArray[BlockHeader]): int {.raises: [
Defect, CatchableError].}=
var merged = false
if headers.len > 0:
let first {.used.} = headers[0]
let last {.used.} = headers[^1]
let sc {.used.} = if sk.subchains.len > 0:
sk.subchains[0]
else:
SkeletonSubchain()
debug "Skeleton putBlocks start",
count = headers.len,
first = first.blockNumber,
hash = short(first.blockHash),
fork = sk.toFork(first.blockNumber),
last = last.blockNumber,
hash = short(last.blockHash),
fork = sk.toFork(last.blockNumber),
head = sc.head,
tail = sc.tail,
next = short(sc.next)
for header in headers:
let number = header.blockNumber
if number >= sk.subchains[0].tail:
# These blocks should already be in skeleton, and might be coming in
# from previous events especially if the previous subchains merge
continue
# Extend subchain or create new segment if necessary
if sk.subchains[0].next == header.blockHash:
sk.putHeader(header)
sk.pulled += 1'i64
sk.subchains[0].tail -= 1.toBlockNumber
sk.subchains[0].next = header.parentHash
else:
# Critical error, we expect new incoming blocks to extend the canonical
# subchain which is the [0]'th
let fork = sk.toFork(number)
warn "Blocks don't extend canonical subchain",
head=sk.subchains[0].head,
tail=sk.subchains[0].tail,
next=short(sk.subchains[0].next),
number=number,
hash=short(header.blockHash),
fork=fork
raise newException(SkeletonError, "Blocks don't extend canonical subchain")
merged = sk.trySubChainsMerge()
# If its merged, we need to break as the new tail could be quite ahead
# so we need to clear out and run the reverse block fetcher again
if merged: break
sk.writeSyncProgress()
# Print a progress report making the UX a bit nicer
if getTime() - sk.logged > STATUS_LOG_INTERVAL:
var left = sk.bounds().tail - 1.toBlockNumber - sk.blockHeight
if sk.isLinked(): left = 0.toBlockNumber
if left > 0.toBlockNumber:
sk.logged = getTime()
if sk.pulled == 0:
info "Beacon sync starting", left=left
else:
let sinceStarted = getTime() - sk.started
let eta = (sinceStarted div sk.pulled) * left.truncate(int64)
info "Syncing beacon headers",
downloaded=sk.pulled, left=left, eta=eta
# If the sync is finished, start filling the canonical chain.
if sk.isLinked():
sk.fillCanonicalChain()
if merged:
raise newException(ErrSyncMerged, "putBlocks: sync merged")
return headers.len
proc `subchains=`*(sk: SkeletonRef, subchains: openArray[SkeletonSubchain]) =
sk.subchains = @subchains
proc len*(sk: SkeletonRef): int =
sk.subchains.len
iterator items*(sk: SkeletonRef): SkeletonSubChain =
for x in sk.subchains:
yield x
iterator pairs*(sk: SkeletonRef): tuple[key: int, val: SkeletonSubChain] =
for i, x in sk.subchains:
yield (i, x)
proc ignoreTxs*(sk: SkeletonRef): bool =
sk.ignoreTxs
proc `ignoreTxs=`*(sk: SkeletonRef, val: bool) =
sk.ignoreTxs = val