initial beacon sync skeleton implementation
This commit is contained in:
parent
513f44d7d4
commit
86f6d284aa
|
@ -15,6 +15,10 @@ type
|
|||
terminalHash
|
||||
safeHash
|
||||
finalizedHash
|
||||
skeletonProgress
|
||||
skeletonBlockHashToNumber
|
||||
skeletonBlock
|
||||
skeletonTransaction
|
||||
|
||||
DbKey* = object
|
||||
# The first byte stores the key type. The rest are key-specific values
|
||||
|
@ -79,6 +83,27 @@ proc finalizedHashKey*(): DbKey {.inline.} =
|
|||
result.data[0] = byte ord(finalizedHash)
|
||||
result.dataEndPos = uint8 1
|
||||
|
||||
proc skeletonProgressKey*(): DbKey {.inline.} =
|
||||
result.data[0] = byte ord(skeletonProgress)
|
||||
result.dataEndPos = 1
|
||||
|
||||
proc skeletonBlockHashToNumberKey*(h: Hash256): DbKey {.inline.} =
|
||||
result.data[0] = byte ord(skeletonBlockHashToNumber)
|
||||
result.data[1 .. 32] = h.data
|
||||
result.dataEndPos = uint8 32
|
||||
|
||||
proc skeletonBlockKey*(u: BlockNumber): DbKey {.inline.} =
|
||||
result.data[0] = byte ord(skeletonBlock)
|
||||
doAssert sizeof(u) <= 32
|
||||
copyMem(addr result.data[1], unsafeAddr u, sizeof(u))
|
||||
result.dataEndPos = uint8 sizeof(u)
|
||||
|
||||
proc skeletonTransactionKey*(u: BlockNumber): DbKey {.inline.} =
|
||||
result.data[0] = byte ord(skeletonTransaction)
|
||||
doAssert sizeof(u) <= 32
|
||||
copyMem(addr result.data[1], unsafeAddr u, sizeof(u))
|
||||
result.dataEndPos = uint8 sizeof(u)
|
||||
|
||||
template toOpenArray*(k: DbKey): openArray[byte] =
|
||||
k.data.toOpenArray(0, int(k.dataEndPos))
|
||||
|
||||
|
|
|
@ -0,0 +1,588 @@
|
|||
import
|
||||
std/[times],
|
||||
eth/[common, rlp],
|
||||
eth/trie/db,
|
||||
#stew/results,
|
||||
stint, chronicles,
|
||||
../db/[db_chain, storage_types],
|
||||
".."/[utils, chain_config],
|
||||
../p2p/chain
|
||||
|
||||
{.push raises: [Defect].}
|
||||
|
||||
logScope:
|
||||
topics = "skeleton"
|
||||
|
||||
type
|
||||
# Contiguous header chain segment that is backed by the database,
|
||||
# but may not be linked to the live chain. The skeleton downloader may produce
|
||||
# a new one of these every time it is restarted until the subchain grows large
|
||||
# enough to connect with a previous subchain.
|
||||
SkeletonSubchain* = object
|
||||
head*: UInt256 # Block number of the newest header in the subchain
|
||||
tail*: UInt256 # Block number of the oldest header in the subchain
|
||||
next*: Hash256 # Block hash of the next oldest header in the subchain
|
||||
|
||||
# Database entry to allow suspending and resuming a chain
|
||||
# sync. As the skeleton header chain is downloaded backwards, restarts can and
|
||||
# will produce temporarily disjoint subchains. There is no way to restart a
|
||||
# suspended skeleton sync without prior knowledge of all prior suspension points.
|
||||
SkeletonProgress = seq[SkeletonSubchain]
|
||||
|
||||
# The Skeleton chain class helps support beacon sync by accepting head blocks
|
||||
# while backfill syncing the rest of the chain.
|
||||
SkeletonRef* = ref object
|
||||
subchains: SkeletonProgress
|
||||
started : Time # Timestamp when the skeleton syncer was created
|
||||
logged : Time # Timestamp when progress was last logged to user
|
||||
pulled : int64 # Number of headers downloaded in this run
|
||||
filling : bool # Whether we are actively filling the canonical chain
|
||||
chainTTD : DifficultyInt
|
||||
chainDB : BaseChainDB
|
||||
chain : Chain
|
||||
|
||||
# config
|
||||
skeletonFillCanonicalBackStep: int
|
||||
skeletonSubchainMergeMinimum: int
|
||||
syncTargetHeight: int
|
||||
ignoreTxs: bool
|
||||
|
||||
SkeletonError* = object of CatchableError
|
||||
|
||||
# SyncReorged is an internal helper error to signal that the head chain of
|
||||
# the current sync cycle was (partially) reorged, thus the skeleton syncer
|
||||
# should abort and restart with the new state.
|
||||
ErrSyncReorged* = object of SkeletonError
|
||||
|
||||
# ReorgDenied is returned if an attempt is made to extend the beacon chain
|
||||
# with a new header, but it does not link up to the existing sync.
|
||||
ErrReorgDenied* = object of SkeletonError
|
||||
|
||||
# SyncMerged is an internal helper error to signal that the current sync
|
||||
# cycle merged with a previously aborted subchain, thus the skeleton syncer
|
||||
# should abort and restart with the new state.
|
||||
ErrSyncMerged* = object of SkeletonError
|
||||
|
||||
ErrHeaderNotFound* = object of SkeletonError
|
||||
|
||||
const
|
||||
# How often to log sync status (in ms)
|
||||
STATUS_LOG_INTERVAL = initDuration(microseconds = 8000)
|
||||
|
||||
proc new*(_: type SkeletonRef, chain: Chain): SkeletonRef =
|
||||
new(result)
|
||||
result.chain = chain
|
||||
result.chainDB = chain.db
|
||||
result.started = getTime()
|
||||
result.logged = getTime()
|
||||
result.pulled = 0'i64
|
||||
result.filling = false
|
||||
result.chainTTD = chain.db.ttd()
|
||||
result.skeletonFillCanonicalBackStep = 100
|
||||
result.skeletonSubchainMergeMinimum = 1000
|
||||
#result.syncTargetHeight = ?
|
||||
result.ignoreTxs = false
|
||||
|
||||
template get(sk: SkeletonRef, key: untyped): untyped =
|
||||
get(sk.chainDB.db, key.toOpenArray)
|
||||
|
||||
template put(sk: SkeletonRef, key, val: untyped): untyped =
|
||||
put(sk.chainDB.db, key.toOpenArray, val)
|
||||
|
||||
template del(sk: SkeletonRef, key: untyped): untyped =
|
||||
del(sk.chainDB.db, key.toOpenArray)
|
||||
|
||||
template toFork(sk: SkeletonRef, number: untyped): untyped =
|
||||
toFork(sk.chainDB.config, number)
|
||||
|
||||
template blockHeight(sk: SkeletonRef): untyped =
|
||||
sk.chainDB.currentBlock
|
||||
|
||||
# Reads the SkeletonProgress from db
|
||||
proc readSyncProgress(sk: SkeletonRef) {.raises: [Defect, RlpError].} =
|
||||
let rawProgress = sk.get(skeletonProgressKey())
|
||||
if rawProgress.len == 0: return
|
||||
sk.subchains = rlp.decode(rawProgress, SkeletonProgress)
|
||||
|
||||
# Writes the SkeletonProgress to db
|
||||
proc writeSyncProgress(sk: SkeletonRef) =
|
||||
for x in sk.subchains:
|
||||
debug "Writing sync progress subchains",
|
||||
head=x.head, tail=x.tail, next=short(x.next)
|
||||
|
||||
let encodedProgress = rlp.encode(sk.subchains)
|
||||
sk.put(skeletonProgressKey(), encodedProgress)
|
||||
|
||||
proc open*(sk: SkeletonRef){.raises: [Defect, RlpError].} =
|
||||
sk.readSyncProgress()
|
||||
sk.started = getTime()
|
||||
|
||||
# Gets a block from the skeleton or canonical db by number.
|
||||
proc getHeader*(sk: SkeletonRef,
|
||||
number: BlockNumber,
|
||||
output: var BlockHeader,
|
||||
onlySkeleton: bool = false): bool {.raises: [Defect, RlpError].} =
|
||||
let rawHeader = sk.get(skeletonBlockKey(number))
|
||||
if rawHeader.len != 0:
|
||||
output = rlp.decode(rawHeader, BlockHeader)
|
||||
return true
|
||||
else:
|
||||
if onlySkeleton: return false
|
||||
# As a fallback, try to get the block from the canonical chain in case it is available there
|
||||
return sk.chainDB.getBlockHeader(number, output)
|
||||
|
||||
# Gets a skeleton block from the db by hash
|
||||
proc getHeaderByHash*(sk: SkeletonRef,
|
||||
hash: Hash256,
|
||||
output: var BlockHeader): bool {.raises: [Defect, RlpError].} =
|
||||
let rawNumber = sk.get(skeletonBlockHashToNumberKey(hash))
|
||||
if rawNumber.len == 0:
|
||||
return false
|
||||
return sk.getHeader(rlp.decode(rawNumber, BlockNumber), output)
|
||||
|
||||
# Deletes a skeleton block from the db by number
|
||||
proc deleteBlock(sk: SkeletonRef, header: BlockHeader) =
|
||||
sk.del(skeletonBlockKey(header.blockNumber))
|
||||
sk.del(skeletonBlockHashToNumberKey(header.blockHash))
|
||||
sk.del(skeletonTransactionKey(header.blockNumber))
|
||||
|
||||
# Writes a skeeton block to the db by number
|
||||
proc putHeader*(sk: SkeletonRef, header: BlockHeader) =
|
||||
let encodedHeader = rlp.encode(header)
|
||||
sk.put(skeletonBlockKey(header.blockNumber), encodedHeader)
|
||||
sk.put(
|
||||
skeletonBlockHashToNumberKey(header.blockHash),
|
||||
rlp.encode(header.blockNumber)
|
||||
)
|
||||
|
||||
proc putBlock(sk: SkeletonRef, header: BlockHeader, txs: openArray[Transaction]) =
|
||||
let encodedHeader = rlp.encode(header)
|
||||
sk.put(skeletonBlockKey(header.blockNumber), encodedHeader)
|
||||
sk.put(
|
||||
skeletonBlockHashToNumberKey(header.blockHash),
|
||||
rlp.encode(header.blockNumber)
|
||||
)
|
||||
sk.put(skeletonTransactionKey(header.blockNumber), rlp.encode(txs))
|
||||
|
||||
proc getTxs(sk: SkeletonRef, number: BlockNumber,
|
||||
output: var seq[Transaction]){.raises: [
|
||||
Defect, CatchableError].} =
|
||||
let rawTxs = sk.get(skeletonTransactionKey(number))
|
||||
if rawTxs.len > 0:
|
||||
output = rlp.decode(rawTxs, seq[Transaction])
|
||||
else:
|
||||
raise newException(SkeletonError,
|
||||
"getTxs: no transactions from block number " & $number)
|
||||
|
||||
# Bounds returns the current head and tail tracked by the skeleton syncer.
|
||||
proc bounds*(sk: SkeletonRef): SkeletonSubchain =
|
||||
sk.subchains[0]
|
||||
|
||||
# Returns true if the skeleton chain is linked to canonical
|
||||
proc isLinked*(sk: SkeletonRef): bool {.raises: [Defect, CatchableError].} =
|
||||
if sk.subchains.len == 0: return false
|
||||
let sc = sk.bounds()
|
||||
|
||||
# make check for genesis if tail is 1?
|
||||
let head = sk.blockHeight
|
||||
if sc.tail > head + 1.toBlockNumber:
|
||||
return false
|
||||
|
||||
var nextHeader: BlockHeader
|
||||
let number = sc.tail - 1.toBlockNumber
|
||||
if sk.getHeader(number, nextHeader):
|
||||
return sc.next == nextHeader.blockHash
|
||||
else:
|
||||
raise newException(ErrHeaderNotFound, "isLinked: No header with number=" & $number)
|
||||
|
||||
proc trySubChainsMerge(sk: SkeletonRef): bool {.raises: [Defect, RlpError].} =
|
||||
var
|
||||
merged = false
|
||||
edited = false
|
||||
head: BlockHeader
|
||||
|
||||
let subchainMergeMinimum = sk.skeletonSubchainMergeMinimum.u256
|
||||
# If the subchain extended into the next subchain, we need to handle
|
||||
# the overlap. Since there could be many overlaps, do this in a loop.
|
||||
while sk.subchains.len > 1 and
|
||||
sk.subchains[1].head >= sk.subchains[0].tail:
|
||||
# Extract some stats from the second subchain
|
||||
let sc = sk.subchains[1]
|
||||
|
||||
# Since we just overwrote part of the next subchain, we need to trim
|
||||
# its head independent of matching or mismatching content
|
||||
if sc.tail >= sk.subchains[0].tail:
|
||||
# Fully overwritten, get rid of the subchain as a whole
|
||||
debug "Previous subchain fully overwritten",
|
||||
head=sc.head, tail=sc.tail, next=short(sc.next)
|
||||
sk.subchains.delete(1)
|
||||
edited = true
|
||||
continue
|
||||
else:
|
||||
# Partially overwritten, trim the head to the overwritten size
|
||||
debug "Previous subchain partially overwritten",
|
||||
head=sc.head, tail=sc.tail, next=short(sc.next)
|
||||
sk.subchains[1].head = sk.subchains[0].tail - 1.toBlockNumber
|
||||
edited = true
|
||||
|
||||
# If the old subchain is an extension of the new one, merge the two
|
||||
# and let the skeleton syncer restart (to clean internal state)
|
||||
if sk.getHeader(sk.subchains[1].head, head) and
|
||||
head.blockHash == sk.subchains[0].next:
|
||||
# only merge is we can integrate a big progress, as each merge leads
|
||||
# to disruption of the block fetcher to start a fresh
|
||||
if (sc.head - sc.tail) > subchainMergeMinimum:
|
||||
debug "Previous subchain merged head",
|
||||
head=sc.head, tail=sc.tail, next=short(sc.next)
|
||||
sk.subchains[0].tail = sc.tail
|
||||
sk.subchains[0].next = sc.next
|
||||
sk.subchains.delete(1)
|
||||
# If subchains were merged, all further available headers
|
||||
# are invalid since we skipped ahead.
|
||||
merged = true
|
||||
else:
|
||||
debug "Subchain ignored for merge",
|
||||
head=sc.head, tail=sc.tail, next=short(sc.next)
|
||||
sk.subchains.delete(1)
|
||||
edited = true
|
||||
|
||||
if edited: sk.writeSyncProgress()
|
||||
return merged
|
||||
|
||||
proc backStep(sk: SkeletonRef) {.raises: [Defect, RlpError].}=
|
||||
if sk.skeletonFillCanonicalBackStep <= 0:
|
||||
return
|
||||
|
||||
let sc = sk.bounds()
|
||||
var
|
||||
hasTail: bool
|
||||
tailHeader: BlockHeader
|
||||
newTail = sc.tail
|
||||
|
||||
while true:
|
||||
newTail = newTail + sk.skeletonFillCanonicalBackStep.u256
|
||||
hasTail = sk.getHeader(newTail, tailHeader, true)
|
||||
if hasTail or newTail > sc.head: break
|
||||
|
||||
if newTail > sc.head:
|
||||
newTail = sc.head
|
||||
hasTail = sk.getHeader(newTail, tailHeader, true)
|
||||
|
||||
if hasTail and newTail > 0.toBlockNumber:
|
||||
trace "Backstepped skeleton", head=sc.head, tail=newTail
|
||||
sk.subchains[0].tail = newTail
|
||||
sk.subchains[0].next = tailHeader.parentHash
|
||||
sk.writeSyncProgress()
|
||||
else:
|
||||
# we need a new head, emptying the subchains
|
||||
sk.subchains = @[]
|
||||
sk.writeSyncProgress()
|
||||
warn "Couldn't backStep subchain 0, dropping subchains for new head signal"
|
||||
|
||||
# processNewHead does the internal shuffling for a new head marker and either
|
||||
# accepts and integrates it into the skeleton or requests a reorg. Upon reorg,
|
||||
# the syncer will tear itself down and restart with a fresh head. It is simpler
|
||||
# to reconstruct the sync state than to mutate it.
|
||||
#
|
||||
# @returns true if the chain was reorged
|
||||
proc processNewHead(sk: SkeletonRef,
|
||||
head: BlockHeader,
|
||||
force = false): bool {.raises: [Defect, RlpError].} =
|
||||
# If the header cannot be inserted without interruption, return an error for
|
||||
# the outer loop to tear down the skeleton sync and restart it
|
||||
let number = head.blockNumber
|
||||
|
||||
if sk.subchains.len == 0:
|
||||
warn "Skeleton reorged and cleaned, no current subchain", newHead=number
|
||||
return true
|
||||
|
||||
let lastchain = sk.subchains[0]
|
||||
if lastchain.tail >= number:
|
||||
# If the chain is down to a single beacon header, and it is re-announced
|
||||
# once more, ignore it instead of tearing down sync for a noop.
|
||||
if lastchain.head == lastchain.tail:
|
||||
var header: BlockHeader
|
||||
let hasHeader = sk.getHeader(number, header)
|
||||
# TODO: what should we do when hasHeader == false?
|
||||
if hasHeader and header.blockHash == head.blockHash:
|
||||
return false
|
||||
|
||||
# Not a noop / double head announce, abort with a reorg
|
||||
if force:
|
||||
warn "Beacon chain reorged",
|
||||
tail=lastchain.tail, head=lastchain.head, newHead=number
|
||||
return true
|
||||
|
||||
if lastchain.head + 1.toBlockNumber < number:
|
||||
if force:
|
||||
warn "Beacon chain gapped",
|
||||
head=lastchain.head, newHead=number
|
||||
return true
|
||||
|
||||
var parent: BlockHeader
|
||||
let hasParent = sk.getHeader(number - 1.toBlockNumber, parent)
|
||||
if hasParent and parent.blockHash != head.parentHash:
|
||||
if force:
|
||||
warn "Beacon chain forked",
|
||||
ancestor=parent.blockNumber, hash=short(parent.blockHash),
|
||||
want=short(head.parentHash)
|
||||
return true
|
||||
|
||||
# Update the database with the new sync stats and insert the new
|
||||
# head header. We won't delete any trimmed skeleton headers since
|
||||
# those will be outside the index space of the many subchains and
|
||||
# the database space will be reclaimed eventually when processing
|
||||
# blocks above the current head.
|
||||
sk.putHeader(head)
|
||||
sk.subchains[0].head = number
|
||||
sk.writeSyncProgress()
|
||||
return false
|
||||
|
||||
# Inserts skeleton blocks into canonical chain and runs execution.
|
||||
proc fillCanonicalChain*(sk: SkeletonRef) {.raises: [Defect, CatchableError].} =
|
||||
if sk.filling: return
|
||||
sk.filling = true
|
||||
|
||||
var canonicalHead = sk.blockHeight
|
||||
let start = canonicalHead
|
||||
let sc = sk.bounds()
|
||||
debug "Starting canonical chain fill",
|
||||
canonicalHead=canonicalHead, subchainHead=sc.head
|
||||
|
||||
var fillLogIndex = 0
|
||||
while sk.filling and canonicalHead < sc.head:
|
||||
# Get next block
|
||||
let number = canonicalHead + 1.toBlockNumber
|
||||
var header: BlockHeader
|
||||
let hasHeader = sk.getHeader(number, header)
|
||||
if not hasHeader:
|
||||
# This shouldn't happen, but if it does because of some issues, we should back step
|
||||
# and fetch again
|
||||
debug "fillCanonicalChain block number not found, backStepping",
|
||||
number=number
|
||||
sk.backStep()
|
||||
break
|
||||
|
||||
# Insert into chain
|
||||
var body: BlockBody
|
||||
if not sk.ignoreTxs:
|
||||
sk.getTxs(header.blockNumber, body.transactions)
|
||||
let res = sk.chain.persistBlocks([header], [body])
|
||||
if res != ValidationResult.OK:
|
||||
let hardFork = sk.toFork(number)
|
||||
error "Failed to put block from skeleton chain to canonical",
|
||||
number=number,
|
||||
fork=hardFork,
|
||||
hash=short(header.blockHash)
|
||||
|
||||
sk.backStep()
|
||||
break
|
||||
|
||||
# Delete skeleton block to clean up as we go
|
||||
sk.deleteBlock(header)
|
||||
canonicalHead += 1.toBlockNumber
|
||||
inc fillLogIndex # num block inserted
|
||||
if fillLogIndex > 50:
|
||||
trace "Skeleton canonical chain fill status",
|
||||
canonicalHead=canonicalHead,
|
||||
chainHead=sk.blockHeight,
|
||||
subchainHead=sc.head
|
||||
fillLogIndex = 0
|
||||
|
||||
sk.filling = false
|
||||
trace "Successfully put blocks from skeleton chain to canonical target",
|
||||
start=start, stop=canonicalHead, skeletonHead=sc.head,
|
||||
syncTargetHeight=sk.syncTargetHeight
|
||||
|
||||
# Announce and integrate a new head.
|
||||
# throws if the new head causes a reorg.
|
||||
proc setHead*(sk: SkeletonRef, head: BlockHeader,
|
||||
force = false) {.raises: [Defect, CatchableError].} =
|
||||
debug "New skeleton head announced",
|
||||
number=head.blockNumber,
|
||||
hash=short(head.blockHash),
|
||||
force=force
|
||||
|
||||
let reorged = sk.processNewHead(head, force)
|
||||
|
||||
# If linked, fill the canonical chain.
|
||||
if force and sk.isLinked():
|
||||
sk.fillCanonicalChain()
|
||||
|
||||
if reorged:
|
||||
if force:
|
||||
raise newException(ErrSyncReorged, "setHead: sync reorg")
|
||||
else:
|
||||
raise newException(ErrReorgDenied, "setHead: reorg denied")
|
||||
|
||||
# Attempts to get the skeleton sync into a consistent state wrt any
|
||||
# past state on disk and the newly requested head to sync to.
|
||||
proc initSync*(sk: SkeletonRef, head: BlockHeader) {.raises: [
|
||||
Defect, CatchableError].} =
|
||||
let number = head.blockNumber
|
||||
|
||||
if sk.subchains.len == 0:
|
||||
# Start a fresh sync with a single subchain represented by the currently sent
|
||||
# chain head.
|
||||
sk.subchains.add(SkeletonSubchain(
|
||||
head: number,
|
||||
tail: number,
|
||||
next: head.parentHash
|
||||
))
|
||||
debug "Created initial skeleton subchain",
|
||||
head=number, tail=number
|
||||
else:
|
||||
# Print some continuation logs
|
||||
for x in sk.subchains:
|
||||
debug "Restarting skeleton subchain",
|
||||
head=x.head, tail=x.tail, next=short(x.next)
|
||||
|
||||
# Create a new subchain for the head (unless the last can be extended),
|
||||
# trimming anything it would overwrite
|
||||
let headchain = SkeletonSubchain(
|
||||
head: number,
|
||||
tail: number,
|
||||
next: head.parentHash
|
||||
)
|
||||
|
||||
while sk.subchains.len > 0:
|
||||
# If the last chain is above the new head, delete altogether
|
||||
let lastchain = addr sk.subchains[0]
|
||||
if lastchain.tail >= headchain.tail:
|
||||
debug "Dropping skeleton subchain",
|
||||
head=lastchain.head, tail=lastchain.tail
|
||||
sk.subchains.delete(0) # remove `lastchain`
|
||||
continue
|
||||
# Otherwise truncate the last chain if needed and abort trimming
|
||||
if lastchain.head >= headchain.tail:
|
||||
debug "Trimming skeleton subchain",
|
||||
oldHead=lastchain.head, newHead=headchain.tail - 1.toBlockNumber,
|
||||
tail=lastchain.tail
|
||||
lastchain.head = headchain.tail - 1.toBlockNumber
|
||||
break
|
||||
|
||||
# If the last subchain can be extended, we're lucky. Otherwise create
|
||||
# a new subchain sync task.
|
||||
var extended = false
|
||||
if sk.subchains.len > 0:
|
||||
let lastchain = addr sk.subchains[0]
|
||||
if lastchain.head == headchain.tail - 1.toBlockNumber:
|
||||
var header: BlockHeader
|
||||
let lasthead = sk.getHeader(lastchain.head, header)
|
||||
if lasthead and header.blockHash == head.parentHash:
|
||||
debug "Extended skeleton subchain with new",
|
||||
head=headchain.tail, tail=lastchain.tail
|
||||
lastchain.head = headchain.tail
|
||||
extended = true
|
||||
if not extended:
|
||||
debug "Created new skeleton subchain",
|
||||
head=number, tail=number
|
||||
sk.subchains.insert(headchain)
|
||||
|
||||
sk.putHeader(head)
|
||||
sk.writeSyncProgress()
|
||||
|
||||
# If the sync is finished, start filling the canonical chain.
|
||||
if sk.isLinked():
|
||||
sk.fillCanonicalChain()
|
||||
|
||||
# Writes skeleton blocks to the db by number
|
||||
# @returns number of blocks saved
|
||||
proc putBlocks*(sk: SkeletonRef, headers: openArray[BlockHeader]): int {.raises: [
|
||||
Defect, CatchableError].}=
|
||||
var merged = false
|
||||
|
||||
if headers.len > 0:
|
||||
let first {.used.} = headers[0]
|
||||
let last {.used.} = headers[^1]
|
||||
let sc {.used.} = if sk.subchains.len > 0:
|
||||
sk.subchains[0]
|
||||
else:
|
||||
SkeletonSubchain()
|
||||
debug "Skeleton putBlocks start",
|
||||
count = headers.len,
|
||||
first = first.blockNumber,
|
||||
hash = short(first.blockHash),
|
||||
fork = sk.toFork(first.blockNumber),
|
||||
last = last.blockNumber,
|
||||
hash = short(last.blockHash),
|
||||
fork = sk.toFork(last.blockNumber),
|
||||
head = sc.head,
|
||||
tail = sc.tail,
|
||||
next = short(sc.next)
|
||||
|
||||
for header in headers:
|
||||
let number = header.blockNumber
|
||||
if number >= sk.subchains[0].tail:
|
||||
# These blocks should already be in skeleton, and might be coming in
|
||||
# from previous events especially if the previous subchains merge
|
||||
continue
|
||||
|
||||
# Extend subchain or create new segment if necessary
|
||||
if sk.subchains[0].next == header.blockHash:
|
||||
sk.putHeader(header)
|
||||
sk.pulled += 1'i64
|
||||
sk.subchains[0].tail -= 1.toBlockNumber
|
||||
sk.subchains[0].next = header.parentHash
|
||||
else:
|
||||
# Critical error, we expect new incoming blocks to extend the canonical
|
||||
# subchain which is the [0]'th
|
||||
let fork = sk.toFork(number)
|
||||
warn "Blocks don't extend canonical subchain",
|
||||
head=sk.subchains[0].head,
|
||||
tail=sk.subchains[0].tail,
|
||||
next=short(sk.subchains[0].next),
|
||||
number=number,
|
||||
hash=short(header.blockHash),
|
||||
fork=fork
|
||||
raise newException(SkeletonError, "Blocks don't extend canonical subchain")
|
||||
|
||||
merged = sk.trySubChainsMerge()
|
||||
# If its merged, we need to break as the new tail could be quite ahead
|
||||
# so we need to clear out and run the reverse block fetcher again
|
||||
if merged: break
|
||||
|
||||
sk.writeSyncProgress()
|
||||
|
||||
# Print a progress report making the UX a bit nicer
|
||||
if getTime() - sk.logged > STATUS_LOG_INTERVAL:
|
||||
var left = sk.bounds().tail - 1.toBlockNumber - sk.blockHeight
|
||||
if sk.isLinked(): left = 0.toBlockNumber
|
||||
if left > 0.toBlockNumber:
|
||||
sk.logged = getTime()
|
||||
if sk.pulled == 0:
|
||||
info "Beacon sync starting", left=left
|
||||
else:
|
||||
let sinceStarted = getTime() - sk.started
|
||||
let eta = (sinceStarted div sk.pulled) * left.truncate(int64)
|
||||
info "Syncing beacon headers",
|
||||
downloaded=sk.pulled, left=left, eta=eta
|
||||
|
||||
# If the sync is finished, start filling the canonical chain.
|
||||
if sk.isLinked():
|
||||
sk.fillCanonicalChain()
|
||||
|
||||
if merged:
|
||||
raise newException(ErrSyncMerged, "putBlocks: sync merged")
|
||||
|
||||
return headers.len
|
||||
|
||||
proc `subchains=`*(sk: SkeletonRef, subchains: openArray[SkeletonSubchain]) =
|
||||
sk.subchains = @subchains
|
||||
|
||||
proc len*(sk: SkeletonRef): int =
|
||||
sk.subchains.len
|
||||
|
||||
iterator items*(sk: SkeletonRef): SkeletonSubChain =
|
||||
for x in sk.subchains:
|
||||
yield x
|
||||
|
||||
iterator pairs*(sk: SkeletonRef): tuple[key: int, val: SkeletonSubChain] =
|
||||
for i, x in sk.subchains:
|
||||
yield (i, x)
|
||||
|
||||
proc ignoreTxs*(sk: SkeletonRef): bool =
|
||||
sk.ignoreTxs
|
||||
|
||||
proc `ignoreTxs=`*(sk: SkeletonRef, val: bool) =
|
||||
sk.ignoreTxs = val
|
|
@ -1,5 +1,6 @@
|
|||
import
|
||||
eth/[trie, rlp, common/eth_types_rlp, trie/db]
|
||||
eth/[trie, rlp, common/eth_types_rlp, trie/db],
|
||||
stew/byteutils
|
||||
|
||||
export eth_types_rlp
|
||||
|
||||
|
@ -50,3 +51,9 @@ proc crc32*(crc: uint32, buf: openArray[byte]): uint32 =
|
|||
crcu32 = (crcu32 shr 4) xor kcrc32[int((crcu32 and 0xF) xor (uint32(b) shr 4'u32))]
|
||||
|
||||
result = not crcu32
|
||||
|
||||
proc short*(h: Hash256): string =
|
||||
var bytes: array[6, byte]
|
||||
bytes[0..2] = h.data[0..2]
|
||||
bytes[^3..^1] = h.data[^3..^1]
|
||||
bytes.toHex
|
||||
|
|
Loading…
Reference in New Issue