From 86f6d284aa961273088edaf178ddb055f44ec45b Mon Sep 17 00:00:00 2001
From: jangko
Date: Thu, 15 Sep 2022 14:50:16 +0700
Subject: [PATCH] initial beacon sync skeleton implementation

---
 nimbus/db/storage_types.nim |  25 ++
 nimbus/sync/skeleton.nim    | 588 ++++++++++++++++++++++++++++++++++++
 nimbus/utils.nim            |   9 +-
 3 files changed, 621 insertions(+), 1 deletion(-)
 create mode 100644 nimbus/sync/skeleton.nim

diff --git a/nimbus/db/storage_types.nim b/nimbus/db/storage_types.nim
index 06f7a66e1..1484bdbda 100644
--- a/nimbus/db/storage_types.nim
+++ b/nimbus/db/storage_types.nim
@@ -15,6 +15,10 @@ type
     terminalHash
     safeHash
     finalizedHash
+    skeletonProgress
+    skeletonBlockHashToNumber
+    skeletonBlock
+    skeletonTransaction
 
   DbKey* = object
     # The first byte stores the key type. The rest are key-specific values
@@ -79,6 +83,31 @@ proc finalizedHashKey*(): DbKey {.inline.} =
   result.data[0] = byte ord(finalizedHash)
   result.dataEndPos = uint8 1
 
+proc skeletonProgressKey*(): DbKey {.inline.} =
+  ## Key of the skeleton sync progress record, built like `finalizedHashKey`.
+  result.data[0] = byte ord(skeletonProgress)
+  result.dataEndPos = uint8 1
+
+proc skeletonBlockHashToNumberKey*(h: Hash256): DbKey {.inline.} =
+  ## Key made of the `skeletonBlockHashToNumber` tag followed by the hash bytes.
+  result.data[0] = byte ord(skeletonBlockHashToNumber)
+  result.data[1 .. 32] = h.data
+  result.dataEndPos = uint8 32
+
+proc skeletonBlockKey*(u: BlockNumber): DbKey {.inline.} =
+  ## Key made of the `skeletonBlock` tag followed by the raw bytes of `u`.
+  result.data[0] = byte ord(skeletonBlock)
+  doAssert sizeof(u) <= 32
+  copyMem(addr result.data[1], unsafeAddr u, sizeof(u))
+  result.dataEndPos = uint8 sizeof(u)
+
+proc skeletonTransactionKey*(u: BlockNumber): DbKey {.inline.} =
+  ## Key made of the `skeletonTransaction` tag followed by the raw bytes of `u`.
+  result.data[0] = byte ord(skeletonTransaction)
+  doAssert sizeof(u) <= 32
+  copyMem(addr result.data[1], unsafeAddr u, sizeof(u))
+  result.dataEndPos = uint8 sizeof(u)
+
 template toOpenArray*(k: DbKey): openArray[byte] =
   k.data.toOpenArray(0, int(k.dataEndPos))
 
diff --git a/nimbus/sync/skeleton.nim b/nimbus/sync/skeleton.nim
new file mode 100644
index 000000000..8809ad248
--- /dev/null
+++ b/nimbus/sync/skeleton.nim
@@ -0,0 +1,588 @@
+import
+  std/[times],
+  eth/[common, rlp],
+  eth/trie/db,
+  #stew/results,
+  stint, chronicles,
+  ../db/[db_chain, storage_types],
+  ".."/[utils, chain_config],
+  ../p2p/chain
+
+{.push raises: [Defect].}
+
+logScope:
+  topics = "skeleton"
+
+type
+  # Contiguous header chain segment that is backed by the database,
+  # but may not be linked to the live chain. The skeleton downloader may produce
+  # a new one of these every time it is restarted until the subchain grows large
+  # enough to connect with a previous subchain.
+  SkeletonSubchain* = object
+    head*: UInt256 # Block number of the newest header in the subchain
+    tail*: UInt256 # Block number of the oldest header in the subchain
+    next*: Hash256 # Block hash of the next oldest header in the subchain
+
+  # Database entry to allow suspending and resuming a chain
+  # sync. As the skeleton header chain is downloaded backwards, restarts can and
+  # will produce temporarily disjoint subchains. There is no way to restart a
+  # suspended skeleton sync without prior knowledge of all prior suspension points.
+  SkeletonProgress = seq[SkeletonSubchain]
+
+  # The Skeleton chain class helps support beacon sync by accepting head blocks
+  # while backfill syncing the rest of the chain.
+  SkeletonRef* = ref object
+    subchains: SkeletonProgress
+    started  : Time  # Timestamp when the skeleton syncer was created
+    logged   : Time  # Timestamp when progress was last logged to user
+    pulled   : int64 # Number of headers downloaded in this run
+    filling  : bool  # Whether we are actively filling the canonical chain
+    chainTTD : DifficultyInt
+    chainDB  : BaseChainDB
+    chain    : Chain
+
+    # config
+    skeletonFillCanonicalBackStep: int
+    skeletonSubchainMergeMinimum: int
+    syncTargetHeight: int
+    ignoreTxs: bool
+
+  SkeletonError* = object of CatchableError
+
+  # SyncReorged is an internal helper error to signal that the head chain of
+  # the current sync cycle was (partially) reorged, thus the skeleton syncer
+  # should abort and restart with the new state.
+  ErrSyncReorged* = object of SkeletonError
+
+  # ReorgDenied is returned if an attempt is made to extend the beacon chain
+  # with a new header, but it does not link up to the existing sync.
+  ErrReorgDenied* = object of SkeletonError
+
+  # SyncMerged is an internal helper error to signal that the current sync
+  # cycle merged with a previously aborted subchain, thus the skeleton syncer
+  # should abort and restart with the new state.
+  ErrSyncMerged* = object of SkeletonError
+
+  ErrHeaderNotFound* = object of SkeletonError
+
+const
+  # Minimum time between two sync status log lines: 8000 ms (8 seconds)
+  STATUS_LOG_INTERVAL = initDuration(milliseconds = 8000)
+
+proc new*(_: type SkeletonRef, chain: Chain): SkeletonRef =
+  ## Creates a skeleton syncer working on `chain` and on `chain.db`.
+  ## The timestamps are set to the current time, the back step to 100 and
+  ## the subchain merge minimum to 1000; `syncTargetHeight` is left at zero.
+  new(result)
+  result.chain = chain
+  result.chainDB = chain.db
+  result.started = getTime()
+  result.logged = getTime()
+  result.pulled = 0'i64
+  result.filling = false
+  result.chainTTD = chain.db.ttd()
+  result.skeletonFillCanonicalBackStep = 100
+  result.skeletonSubchainMergeMinimum = 1000
+  #result.syncTargetHeight = ?
+  result.ignoreTxs = false
+
+# The templates below forward to the key-value db, the chain config and the
+# current block number reachable through `sk.chainDB`.
+template get(sk: SkeletonRef, key: untyped): untyped =
+  get(sk.chainDB.db, key.toOpenArray)
+
+template put(sk: SkeletonRef, key, val: untyped): untyped =
+  put(sk.chainDB.db, key.toOpenArray, val)
+
+template del(sk: SkeletonRef, key: untyped): untyped =
+  del(sk.chainDB.db, key.toOpenArray)
+
+template toFork(sk: SkeletonRef, number: untyped): untyped =
+  toFork(sk.chainDB.config, number)
+
+template blockHeight(sk: SkeletonRef): untyped =
+  sk.chainDB.currentBlock
+
+# Reads the SkeletonProgress from db.
+# Leaves `sk.subchains` untouched when nothing is stored under the key.
+proc readSyncProgress(sk: SkeletonRef) {.raises: [Defect, RlpError].} =
+  let rawProgress = sk.get(skeletonProgressKey())
+  if rawProgress.len == 0: return
+  sk.subchains = rlp.decode(rawProgress, SkeletonProgress)
+
+# Writes the SkeletonProgress to db (RLP encoded), logging every subchain
+proc writeSyncProgress(sk: SkeletonRef) =
+  for x in sk.subchains:
+    debug "Writing sync progress subchains",
+      head=x.head, tail=x.tail, next=short(x.next)
+
+  let encodedProgress = rlp.encode(sk.subchains)
+  sk.put(skeletonProgressKey(), encodedProgress)
+
+# Loads the stored sync progress and restarts the `started` timestamp
+proc open*(sk: SkeletonRef){.raises: [Defect, RlpError].} =
+  sk.readSyncProgress()
+  sk.started = getTime()
+
+# Gets a block from the skeleton or canonical db by number.
+proc getHeader*(sk: SkeletonRef,
+                number: BlockNumber,
+                output: var BlockHeader,
+                onlySkeleton: bool = false): bool {.raises: [Defect, RlpError].} =
+  ## Decodes the header stored in the skeleton table under `number` into
+  ## `output` and returns true. Without a skeleton entry the result is false
+  ## when `onlySkeleton` is set, otherwise whatever the canonical lookup
+  ## `chainDB.getBlockHeader` returns.
+  let rawHeader = sk.get(skeletonBlockKey(number))
+  if rawHeader.len != 0:
+    output = rlp.decode(rawHeader, BlockHeader)
+    return true
+  else:
+    if onlySkeleton: return false
+    # As a fallback, try to get the block from the canonical chain in case it is available there
+    return sk.chainDB.getBlockHeader(number, output)
+
+# Gets a skeleton block from the db by hash.
+# The hash is first mapped to a block number; the header lookup itself goes
+# through `getHeader` and therefore may fall back to the canonical chain.
+proc getHeaderByHash*(sk: SkeletonRef,
+                      hash: Hash256,
+                      output: var BlockHeader): bool {.raises: [Defect, RlpError].} =
+  let rawNumber = sk.get(skeletonBlockHashToNumberKey(hash))
+  if rawNumber.len == 0:
+    return false
+  return sk.getHeader(rlp.decode(rawNumber, BlockNumber), output)
+
+# Deletes the header, the hash-to-number mapping and the transactions
+# stored in the skeleton for the block described by `header`
+proc deleteBlock(sk: SkeletonRef, header: BlockHeader) =
+  sk.del(skeletonBlockKey(header.blockNumber))
+  sk.del(skeletonBlockHashToNumberKey(header.blockHash))
+  sk.del(skeletonTransactionKey(header.blockNumber))
+
+# Writes a skeleton block header to the db by number, together with the
+# hash-to-number mapping used by `getHeaderByHash`
+proc putHeader*(sk: SkeletonRef, header: BlockHeader) =
+  let encodedHeader = rlp.encode(header)
+  sk.put(skeletonBlockKey(header.blockNumber), encodedHeader)
+  sk.put(
+    skeletonBlockHashToNumberKey(header.blockHash),
+    rlp.encode(header.blockNumber)
+  )
+
+# Writes a skeleton block (header and transactions) to the db by number
+proc putBlock(sk: SkeletonRef, header: BlockHeader, txs: openArray[Transaction]) =
+  # The header part is exactly what `putHeader` stores
+  sk.putHeader(header)
+  sk.put(skeletonTransactionKey(header.blockNumber), rlp.encode(txs))
+
+# Reads the transactions stored for block `number` into `output`.
+# Raises SkeletonError when nothing is stored under the transaction key.
+proc getTxs(sk: SkeletonRef, number: BlockNumber,
+            output: var seq[Transaction]){.raises: [
+              Defect, CatchableError].} =
+  let rawTxs = sk.get(skeletonTransactionKey(number))
+  if rawTxs.len > 0:
+    output = rlp.decode(rawTxs, seq[Transaction])
+  else:
+    raise newException(SkeletonError,
+      "getTxs: no transactions from block number " & $number)
+
+# Bounds returns the current head and tail tracked by the skeleton syncer.
+# It indexes subchain 0 without a check, so the caller has to make sure that
+# at least one subchain exists.
+proc bounds*(sk: SkeletonRef): SkeletonSubchain =
+  sk.subchains[0]
+
+# Returns true if the skeleton chain is linked to canonical.
+# Raises ErrHeaderNotFound when the header right below the tail cannot be
+# found although the tail is at most one block above the canonical head.
+proc isLinked*(sk: SkeletonRef): bool {.raises: [Defect, CatchableError].} =
+  if sk.subchains.len == 0: return false
+  let sc = sk.bounds()
+
+  # make check for genesis if tail is 1?
+  let head = sk.blockHeight
+  if sc.tail > head + 1.toBlockNumber:
+    return false
+
+  var nextHeader: BlockHeader
+  # NOTE(review): this subtraction wraps around when `sc.tail` is 0 - confirm
+  # whether a tail at genesis can occur here
+  let number = sc.tail - 1.toBlockNumber
+  if sk.getHeader(number, nextHeader):
+    return sc.next == nextHeader.blockHash
+  else:
+    raise newException(ErrHeaderNotFound, "isLinked: No header with number=" & $number)
+
+# Resolves overlaps between subchain 0 and the subchains that follow it.
+# Returns true if a following subchain was merged into subchain 0; the sync
+# progress is written back whenever the subchain list was changed.
+proc trySubChainsMerge(sk: SkeletonRef): bool {.raises: [Defect, RlpError].} =
+  var
+    merged = false
+    edited = false
+    head: BlockHeader
+
+  let subchainMergeMinimum = sk.skeletonSubchainMergeMinimum.u256
+  # If the subchain extended into the next subchain, we need to handle
+  # the overlap. Since there could be many overlaps, do this in a loop.
+  while sk.subchains.len > 1 and
+        sk.subchains[1].head >= sk.subchains[0].tail:
+    # Extract some stats from the second subchain
+    let sc = sk.subchains[1]
+
+    # Since we just overwrote part of the next subchain, we need to trim
+    # its head independent of matching or mismatching content
+    if sc.tail >= sk.subchains[0].tail:
+      # Fully overwritten, get rid of the subchain as a whole
+      debug "Previous subchain fully overwritten",
+        head=sc.head, tail=sc.tail, next=short(sc.next)
+      sk.subchains.delete(1)
+      edited = true
+      continue
+    else:
+      # Partially overwritten, trim the head to the overwritten size
+      debug "Previous subchain partially overwritten",
+        head=sc.head, tail=sc.tail, next=short(sc.next)
+      sk.subchains[1].head = sk.subchains[0].tail - 1.toBlockNumber
+      edited = true
+
+    # If the old subchain is an extension of the new one, merge the two
+    # and let the skeleton syncer restart (to clean internal state)
+    if sk.getHeader(sk.subchains[1].head, head) and
+       head.blockHash == sk.subchains[0].next:
+      # only merge if we can integrate a big progress, as each merge leads
+      # to disruption of the block fetcher to start afresh
+      if (sc.head - sc.tail) > subchainMergeMinimum:
+        debug "Previous subchain merged head",
+          head=sc.head, tail=sc.tail, next=short(sc.next)
+        sk.subchains[0].tail = sc.tail
+        sk.subchains[0].next = sc.next
+        sk.subchains.delete(1)
+        # If subchains were merged, all further available headers
+        # are invalid since we skipped ahead.
+        merged = true
+      else:
+        debug "Subchain ignored for merge",
+          head=sc.head, tail=sc.tail, next=short(sc.next)
+        sk.subchains.delete(1)
+      edited = true
+
+  if edited: sk.writeSyncProgress()
+  return merged
+
+# Moves the tail of subchain 0 towards its head in steps of
+# `skeletonFillCanonicalBackStep` until a header stored in the skeleton itself
+# is found, trying the head as the last candidate. When no such header exists
+# (or the new tail would be 0) all subchains are dropped. Does nothing when
+# the configured step is not positive.
+proc backStep(sk: SkeletonRef) {.raises: [Defect, RlpError].}=
+  if sk.skeletonFillCanonicalBackStep <= 0:
+    return
+
+  let sc = sk.bounds()
+  var
+    hasTail: bool
+    tailHeader: BlockHeader
+    newTail = sc.tail
+
+  while true:
+    newTail = newTail + sk.skeletonFillCanonicalBackStep.u256
+    hasTail = sk.getHeader(newTail, tailHeader, true)
+    if hasTail or newTail > sc.head: break
+
+  if newTail > sc.head:
+    newTail = sc.head
+    hasTail = sk.getHeader(newTail, tailHeader, true)
+
+  if hasTail and newTail > 0.toBlockNumber:
+    trace "Backstepped skeleton", head=sc.head, tail=newTail
+    sk.subchains[0].tail = newTail
+    sk.subchains[0].next = tailHeader.parentHash
+    sk.writeSyncProgress()
+  else:
+    # we need a new head, emptying the subchains
+    sk.subchains = @[]
+    sk.writeSyncProgress()
+    warn "Couldn't backStep subchain 0, dropping subchains for new head signal"
+
+# processNewHead does the internal shuffling for a new head marker and either
+# accepts and integrates it into the skeleton or requests a reorg. Upon reorg,
+# the syncer will tear itself down and restart with a fresh head. It is simpler
+# to reconstruct the sync state than to mutate it.
+#
+# @returns true if the chain was reorged
+proc processNewHead(sk: SkeletonRef,
+                    head: BlockHeader,
+                    force = false): bool {.raises: [Defect, RlpError].} =
+  ## Checks whether `head` can be appended to subchain 0. On success the
+  ## header is stored, the subchain head is advanced and the progress is
+  ## written; every rejection is reported as a reorg (warnings for an existing
+  ## subchain are only logged when `force` is set).
+  let newNumber = head.blockNumber
+
+  # Without a subchain there is nothing to extend, the outer loop has to
+  # tear down the skeleton sync and restart it
+  if sk.subchains.len == 0:
+    warn "Skeleton reorged and cleaned, no current subchain", newHead=newNumber
+    return true
+
+  let current = sk.subchains[0]
+
+  if newNumber <= current.tail:
+    # A chain that is down to a single beacon header which gets re-announced
+    # once more is ignored instead of tearing down sync for a noop.
+    if current.tail == current.head:
+      var stored: BlockHeader
+      # TODO: what should we do when the header lookup fails?
+      if sk.getHeader(newNumber, stored) and
+         stored.blockHash == head.blockHash:
+        return false
+
+    # Not a noop / double head announce, abort with a reorg
+    if force:
+      warn "Beacon chain reorged",
+        tail=current.tail, head=current.head, newHead=newNumber
+    return true
+
+  # The announced head must sit directly on top of the current head
+  if newNumber > current.head + 1.toBlockNumber:
+    if force:
+      warn "Beacon chain gapped",
+        head=current.head, newHead=newNumber
+    return true
+
+  # A known header below the announced one has to be its parent
+  var prev: BlockHeader
+  if sk.getHeader(newNumber - 1.toBlockNumber, prev) and
+     head.parentHash != prev.blockHash:
+    if force:
+      warn "Beacon chain forked",
+        ancestor=prev.blockNumber, hash=short(prev.blockHash),
+        want=short(head.parentHash)
+    return true
+
+  # Update the database with the new sync stats and insert the new
+  # head header. We won't delete any trimmed skeleton headers since
+  # those will be outside the index space of the many subchains and
+  # the database space will be reclaimed eventually when processing
+  # blocks above the current head.
+  sk.putHeader(head)
+  sk.subchains[0].head = newNumber
+  sk.writeSyncProgress()
+  false
+
+# Inserts skeleton blocks into canonical chain and runs execution.
+proc fillCanonicalChain*(sk: SkeletonRef) {.raises: [Defect, CatchableError].} =
+  ## Moves blocks from the skeleton store into the canonical chain one at a
+  ## time, starting right above the current canonical head and stopping at
+  ## the head of subchain 0. Every block that was persisted is deleted from
+  ## the skeleton store.
+  ##
+  ## Returns immediately when a fill is already running (`sk.filling`). A
+  ## missing header or a failed `persistBlocks` triggers `backStep` and ends
+  ## the run. Subchain 0 is read through `bounds`, so at least one subchain
+  ## has to exist.
+  if sk.filling: return
+
+  var canonicalHead = sk.blockHeight
+  let start = canonicalHead
+  let sc = sk.bounds()
+  debug "Starting canonical chain fill",
+    canonicalHead=canonicalHead, subchainHead=sc.head
+
+  sk.filling = true
+  try:
+    var fillLogIndex = 0
+    while sk.filling and canonicalHead < sc.head:
+      # Get next block
+      let number = canonicalHead + 1.toBlockNumber
+      var header: BlockHeader
+      let hasHeader = sk.getHeader(number, header)
+      if not hasHeader:
+        # This shouldn't happen, but if it does because of some issues, we should back step
+        # and fetch again
+        debug "fillCanonicalChain block number not found, backStepping",
+          number=number
+        sk.backStep()
+        break
+
+      # Insert into chain
+      var body: BlockBody
+      if not sk.ignoreTxs:
+        sk.getTxs(header.blockNumber, body.transactions)
+      let res = sk.chain.persistBlocks([header], [body])
+      if res != ValidationResult.OK:
+        let hardFork = sk.toFork(number)
+        error "Failed to put block from skeleton chain to canonical",
+          number=number,
+          fork=hardFork,
+          hash=short(header.blockHash)
+
+        sk.backStep()
+        break
+
+      # Delete skeleton block to clean up as we go
+      sk.deleteBlock(header)
+      canonicalHead += 1.toBlockNumber
+      inc fillLogIndex # num block inserted
+      if fillLogIndex > 50:
+        trace "Skeleton canonical chain fill status",
+          canonicalHead=canonicalHead,
+          chainHead=sk.blockHeight,
+          subchainHead=sc.head
+        fillLogIndex = 0
+  finally:
+    # Release the guard on every exit path. `getTxs` raises when a block has
+    # no stored transactions, and a flag left at true would make every later
+    # call return at the guard above without filling anything.
+    sk.filling = false
+
+  trace "Successfully put blocks from skeleton chain to canonical target",
+    start=start, stop=canonicalHead, skeletonHead=sc.head,
+    syncTargetHeight=sk.syncTargetHeight
+
+# Announce and integrate a new head.
+# throws if the new head causes a reorg.
+proc setHead*(sk: SkeletonRef, head: BlockHeader,
+              force = false) {.raises: [Defect, CatchableError].} =
+  ## Hands `head` to `processNewHead`. With `force` set and the skeleton
+  ## linked, the canonical chain is filled afterwards. A reported reorg is
+  ## raised as ErrSyncReorged when `force` is set and as ErrReorgDenied
+  ## otherwise.
+  debug "New skeleton head announced",
+    number=head.blockNumber,
+    hash=short(head.blockHash),
+    force=force
+
+  let reorged = sk.processNewHead(head, force)
+
+  # If linked, fill the canonical chain.
+  if force and sk.isLinked():
+    sk.fillCanonicalChain()
+
+  if reorged:
+    if force:
+      raise newException(ErrSyncReorged, "setHead: sync reorg")
+    else:
+      raise newException(ErrReorgDenied, "setHead: reorg denied")
+
+# Attempts to get the skeleton sync into a consistent state wrt any
+# past state on disk and the newly requested head to sync to.
+# The head header and the resulting subchain list are written to the db, and
+# the canonical chain is filled right away if the skeleton is already linked.
+proc initSync*(sk: SkeletonRef, head: BlockHeader) {.raises: [
+               Defect, CatchableError].} =
+  let number = head.blockNumber
+
+  if sk.subchains.len == 0:
+    # Start a fresh sync with a single subchain represented by the currently sent
+    # chain head.
+    sk.subchains.add(SkeletonSubchain(
+      head: number,
+      tail: number,
+      next: head.parentHash
+    ))
+    debug "Created initial skeleton subchain",
+      head=number, tail=number
+  else:
+    # Print some continuation logs
+    for x in sk.subchains:
+      debug "Restarting skeleton subchain",
+        head=x.head, tail=x.tail, next=short(x.next)
+
+    # Create a new subchain for the head (unless the last can be extended),
+    # trimming anything it would overwrite
+    let headchain = SkeletonSubchain(
+      head: number,
+      tail: number,
+      next: head.parentHash
+    )
+
+    while sk.subchains.len > 0:
+      # If the last chain is above the new head, delete altogether
+      let lastchain = addr sk.subchains[0]
+      if lastchain.tail >= headchain.tail:
+        debug "Dropping skeleton subchain",
+          head=lastchain.head, tail=lastchain.tail
+        sk.subchains.delete(0) # remove `lastchain`
+        continue
+      # Otherwise truncate the last chain if needed and abort trimming
+      if lastchain.head >= headchain.tail:
+        debug "Trimming skeleton subchain",
+          oldHead=lastchain.head, newHead=headchain.tail - 1.toBlockNumber,
+          tail=lastchain.tail
+        lastchain.head = headchain.tail - 1.toBlockNumber
+      break
+
+    # If the last subchain can be extended, we're lucky. Otherwise create
+    # a new subchain sync task.
+    var extended = false
+    if sk.subchains.len > 0:
+      let lastchain = addr sk.subchains[0]
+      if lastchain.head == headchain.tail - 1.toBlockNumber:
+        var header: BlockHeader
+        let lasthead = sk.getHeader(lastchain.head, header)
+        if lasthead and header.blockHash == head.parentHash:
+          debug "Extended skeleton subchain with new",
+            head=headchain.tail, tail=lastchain.tail
+          lastchain.head = headchain.tail
+          extended = true
+    if not extended:
+      debug "Created new skeleton subchain",
+        head=number, tail=number
+      sk.subchains.insert(headchain)
+
+  sk.putHeader(head)
+  sk.writeSyncProgress()
+
+  # If the sync is finished, start filling the canonical chain.
+  if sk.isLinked():
+    sk.fillCanonicalChain()
+
+# Writes skeleton blocks to the db by number, extending subchain 0 downwards
+# one header at a time. Headers at or above the current tail are skipped.
+# Raises SkeletonError when no subchain exists or when a header does not
+# extend subchain 0, and ErrSyncMerged after subchain 0 was merged with a
+# following subchain.
+# @returns the number of headers passed in
+proc putBlocks*(sk: SkeletonRef, headers: openArray[BlockHeader]): int {.raises: [
+                Defect, CatchableError].}=
+  var merged = false
+
+  if headers.len > 0:
+    let first {.used.} = headers[0]
+    let last {.used.} = headers[^1]
+    let sc {.used.} = if sk.subchains.len > 0:
+                        sk.subchains[0]
+                      else:
+                        SkeletonSubchain()
+    debug "Skeleton putBlocks start",
+      count = headers.len,
+      first = first.blockNumber,
+      hash = short(first.blockHash),
+      fork = sk.toFork(first.blockNumber),
+      last = last.blockNumber,
+      hash = short(last.blockHash),
+      fork = sk.toFork(last.blockNumber),
+      head = sc.head,
+      tail = sc.tail,
+      next = short(sc.next)
+
+  # Everything below reads subchain 0. Report a missing subchain as a
+  # catchable error instead of running into an index defect.
+  if sk.subchains.len == 0:
+    raise newException(SkeletonError, "putBlocks: no subchain set for sync")
+
+  for header in headers:
+    let number = header.blockNumber
+    if number >= sk.subchains[0].tail:
+      # These blocks should already be in skeleton, and might be coming in
+      # from previous events especially if the previous subchains merge
+      continue
+
+    # Extend subchain or create new segment if necessary
+    if sk.subchains[0].next == header.blockHash:
+      sk.putHeader(header)
+      sk.pulled += 1'i64
+      sk.subchains[0].tail -= 1.toBlockNumber
+      sk.subchains[0].next = header.parentHash
+    else:
+      # Critical error, we expect new incoming blocks to extend the canonical
+      # subchain which is the [0]'th
+      let fork = sk.toFork(number)
+      warn "Blocks don't extend canonical subchain",
+        head=sk.subchains[0].head,
+        tail=sk.subchains[0].tail,
+        next=short(sk.subchains[0].next),
+        number=number,
+        hash=short(header.blockHash),
+        fork=fork
+      raise newException(SkeletonError, "Blocks don't extend canonical subchain")
+
+    merged = sk.trySubChainsMerge()
+    # If its merged, we need to break as the new tail could be quite ahead
+    # so we need to clear out and run the reverse block fetcher again
+    if merged: break
+
+  sk.writeSyncProgress()
+
+  # Print a progress report making the UX a bit nicer
+  if getTime() - sk.logged > STATUS_LOG_INTERVAL:
+    var left = sk.bounds().tail - 1.toBlockNumber - sk.blockHeight
+    if sk.isLinked(): left = 0.toBlockNumber
+    if left > 0.toBlockNumber:
+      sk.logged = getTime()
+      if sk.pulled == 0:
+        info "Beacon sync starting", left=left
+      else:
+        let sinceStarted = getTime() - sk.started
+        let eta = (sinceStarted div sk.pulled) * left.truncate(int64)
+        info "Syncing beacon headers",
+          downloaded=sk.pulled, left=left, eta=eta
+
+  # If the sync is finished, start filling the canonical chain.
+  if sk.isLinked():
+    sk.fillCanonicalChain()
+
+  if merged:
+    raise newException(ErrSyncMerged, "putBlocks: sync merged")
+
+  return headers.len
+
+# Replaces the tracked subchains with a copy of `subchains`
+proc `subchains=`*(sk: SkeletonRef, subchains: openArray[SkeletonSubchain]) =
+  sk.subchains = @subchains
+
+# Number of subchains currently tracked
+proc len*(sk: SkeletonRef): int =
+  sk.subchains.len
+
+# Iterates over the tracked subchains, starting with subchain 0
+iterator items*(sk: SkeletonRef): SkeletonSubchain =
+  for x in sk.subchains:
+    yield x
+
+# Iterates over (index, subchain) pairs, starting with subchain 0
+iterator pairs*(sk: SkeletonRef): tuple[key: int, val: SkeletonSubchain] =
+  for i, x in sk.subchains:
+    yield (i, x)
+
+# Whether `fillCanonicalChain` skips loading the stored transactions
+proc ignoreTxs*(sk: SkeletonRef): bool =
+  sk.ignoreTxs
+
+proc `ignoreTxs=`*(sk: SkeletonRef, val: bool) =
+  sk.ignoreTxs = val
diff --git a/nimbus/utils.nim b/nimbus/utils.nim
index 2ad51db4b..ccb1f95af 100644
--- a/nimbus/utils.nim
+++ b/nimbus/utils.nim
@@ -1,5 +1,6 @@
 import
-  eth/[trie, rlp, common/eth_types_rlp, trie/db]
+  eth/[trie, rlp, common/eth_types_rlp, trie/db],
+  stew/byteutils
 
 export eth_types_rlp
 
@@ -50,3 +51,10 @@ proc crc32*(crc: uint32, buf: openArray[byte]): uint32 =
     crcu32 = (crcu32 shr 4) xor kcrc32[int((crcu32 and 0xF) xor (uint32(b) shr 4'u32))]
 
   result = not crcu32
+
+proc short*(h: Hash256): string =
+  ## Hex string of the first three and the last three bytes of `h`.
+  var bytes: array[6, byte]
+  bytes[0..2] = h.data[0..2]
+  bytes[^3..^1] = h.data[^3..^1]
+  bytes.toHex