Remove beacon sync (#2666)

andri lim 2024-09-29 09:13:50 +07:00 committed by GitHub
parent 438e183586
commit 5f1b945ebe
23 changed files with 3 additions and 3077 deletions

View File

@ -23,10 +23,6 @@ import
core/tx_pool/tx_item,
core/block_import,
rpc,
rpc/oracle,
sync/protocol,
sync/beacon,
sync/handlers,
beacon/beacon_engine,
beacon/web3_eth_conv,
common
@ -47,7 +43,6 @@ type
server : RpcHttpServer
ttd : DifficultyInt
client : RpcHttpClient
sync : BeaconSyncRef
txPool : TxPoolRef
chain : ForkedChainRef
@ -114,12 +109,7 @@ proc newEngineEnv*(conf: var NimbusConf, chainFile: string, enableAuth: bool): E
echo "Failed to create rpc server: ", error
quit(QuitFailure)
sync = if com.ttd().isSome:
BeaconSyncRef.init(node, chain, ctx.rng, conf.maxPeers, id=conf.tcpPort.int)
else:
BeaconSyncRef(nil)
beaconEngine = BeaconEngineRef.new(txPool, chain)
oracle = Oracle.new(com)
serverApi = newServerAPI(chain)
setupServerAPI(serverApi, server)
@ -137,9 +127,6 @@ proc newEngineEnv*(conf: var NimbusConf, chainFile: string, enableAuth: bool): E
let client = newRpcHttpClient()
waitFor client.connect("127.0.0.1", conf.httpPort, false)
if com.ttd().isSome:
sync.start()
node.startListening()
EngineEnv(
@ -148,15 +135,12 @@ proc newEngineEnv*(conf: var NimbusConf, chainFile: string, enableAuth: bool): E
node : node,
server : server,
client : client,
sync : sync,
txPool : txPool,
chain : chain
)
proc close*(env: EngineEnv) =
waitFor env.node.closeWait()
if not env.sync.isNil:
env.sync.stop()
waitFor env.client.close()
waitFor env.server.closeWait()
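
For context, the test-harness wiring removed above follows a simple guard-and-lifecycle pattern: the syncer is only constructed when a terminal total difficulty is configured, started once the node listens, and stopped on close if it exists. A minimal, self-contained sketch of that pattern (DemoSyncRef and newEnvSync are illustrative stand-ins, not the real BeaconSyncRef API):

type
  DemoSyncRef = ref object
    running: bool

proc start(s: DemoSyncRef) =
  s.running = true

proc stop(s: DemoSyncRef) =
  s.running = false

proc newEnvSync(ttdKnown: bool): DemoSyncRef =
  # mirror the `com.ttd().isSome` guard: no syncer without a known TTD
  if ttdKnown: DemoSyncRef()
  else: DemoSyncRef(nil)

when isMainModule:
  let sync = newEnvSync(ttdKnown = true)
  if not sync.isNil: sync.start()
  # ... exercise the engine environment ...
  if not sync.isNil: sync.stop()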

View File

@ -139,11 +139,7 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf,
# nimbus.snapSyncRef = SnapSyncRef.init(
# nimbus.ethNode, nimbus.chainRef, nimbus.ctx.rng, conf.maxPeers,
# tickerOK, exCtrlFile)
of SyncMode.Default:
nimbus.beaconSyncRef = BeaconSyncRef.init(
nimbus.ethNode, nimbus.chainRef, nimbus.ctx.rng, conf.maxPeers,
)
of SyncMode.Flare:
of SyncMode.Default, SyncMode.Flare:
nimbus.flareSyncRef = FlareSyncRef.init(
nimbus.ethNode, nimbus.chainRef, conf.maxPeers, conf.flareChunkSize)
@ -257,9 +253,7 @@ proc run(nimbus: NimbusNode, conf: NimbusConf) =
if conf.maxPeers > 0:
case conf.syncMode:
of SyncMode.Default:
nimbus.beaconSyncRef.start
of SyncMode.Flare:
of SyncMode.Default, SyncMode.Flare:
nimbus.flareSyncRef.start
#of SyncMode.Snap:
# nimbus.snapSyncRef.start
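
The net effect of the two hunks above is that the Default and Flare sync modes now share one branch that drives the flare syncer. A tiny sketch of the resulting dispatch shape, with placeholder types instead of the real NimbusNode/NimbusConf:

type
  SyncMode = enum
    Default, Flare

proc startSync(mode: SyncMode; maxPeers: int) =
  if maxPeers <= 0:
    return
  case mode
  of Default, Flare:
    # both modes start the flare syncer; the beacon branch is gone
    echo "starting flare sync"

when isMainModule:
  startSync(Default, maxPeers = 25)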

View File

@ -15,7 +15,7 @@ import
./core/chain,
./core/tx_pool,
./sync/peers,
./sync/[beacon, flare],
./sync/flare,
# ./sync/snap, # -- todo
./beacon/beacon_engine,
./common,
@ -29,7 +29,6 @@ export
chain,
tx_pool,
peers,
beacon,
flare,
#snap,
full,
@ -52,7 +51,6 @@ type
networkLoop*: Future[void]
peerManager*: PeerManagerRef
# snapSyncRef*: SnapSyncRef # -- todo
beaconSyncRef*: BeaconSyncRef
flareSyncRef*: FlareSyncRef
beaconEngine*: BeaconEngineRef
metricsServer*: MetricsHttpServerRef
@ -71,8 +69,6 @@ proc stop*(nimbus: NimbusNode, conf: NimbusConf) {.async, gcsafe.} =
await nimbus.peerManager.stop()
#if nimbus.snapSyncRef.isNil.not:
# nimbus.snapSyncRef.stop()
if nimbus.beaconSyncRef.isNil.not:
nimbus.beaconSyncRef.stop()
if nimbus.metricsServer.isNil.not:
await nimbus.metricsServer.stop()

View File

@ -1,149 +0,0 @@
# Nimbus
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
{.push raises: [].}
import
eth/p2p,
chronicles,
chronos,
stew/[interval_set, sorted_set],
./beacon/[worker, worker_desc, beacon_impl],
"."/[sync_desc, sync_sched, protocol]
logScope:
topics = "beacon-sync"
type
BeaconSyncRef* = RunnerSyncRef[BeaconCtxData,BeaconBuddyData]
const
extraTraceMessages = false # or true
## Enable additional logging noise
# ------------------------------------------------------------------------------
# Private logging helpers
# ------------------------------------------------------------------------------
template traceMsg(f, info: static[string]; args: varargs[untyped]) =
trace "Snap scheduler " & f & "() " & info, args
template traceMsgCtx(f, info: static[string]; c: BeaconCtxRef) =
when extraTraceMessages:
block:
let
poolMode {.inject.} = c.poolMode
daemon {.inject.} = c.daemon
f.traceMsg info, poolMode, daemon
template traceMsgBuddy(f, info: static[string]; b: BeaconBuddyRef) =
when extraTraceMessages:
block:
let
peer {.inject.} = b.peer
runState {.inject.} = b.ctrl.state
multiOk {.inject.} = b.ctrl.multiOk
poolMode {.inject.} = b.ctx.poolMode
daemon {.inject.} = b.ctx.daemon
f.traceMsg info, peer, runState, multiOk, poolMode, daemon
template tracerFrameCtx(f: static[string]; c: BeaconCtxRef; code: untyped) =
f.traceMsgCtx "begin", c
code
f.traceMsgCtx "end", c
template tracerFrameBuddy(f: static[string]; b: BeaconBuddyRef; code: untyped) =
f.traceMsgBuddy "begin", b
code
f.traceMsgBuddy "end", b
# ------------------------------------------------------------------------------
# Virtual methods/interface, `mixin` functions
# ------------------------------------------------------------------------------
proc runSetup(ctx: BeaconCtxRef): bool =
tracerFrameCtx("runSetup", ctx):
result = worker.setup(ctx)
proc runRelease(ctx: BeaconCtxRef) =
tracerFrameCtx("runRelease", ctx):
worker.release(ctx)
proc runDaemon(ctx: BeaconCtxRef) {.async.} =
tracerFrameCtx("runDaemon", ctx):
await worker.runDaemon(ctx)
proc runStart(buddy: BeaconBuddyRef): bool =
tracerFrameBuddy("runStart", buddy):
result = worker.start(buddy)
proc runStop(buddy: BeaconBuddyRef) =
tracerFrameBuddy("runStop", buddy):
worker.stop(buddy)
proc runPool(buddy: BeaconBuddyRef; last: bool; laps: int): bool =
tracerFrameBuddy("runPool", buddy):
result = worker.runPool(buddy, last, laps)
proc runSingle(buddy: BeaconBuddyRef) {.async.} =
tracerFrameBuddy("runSingle", buddy):
await worker.runSingle(buddy)
proc runMulti(buddy: BeaconBuddyRef) {.async.} =
tracerFrameBuddy("runMulti", buddy):
await worker.runMulti(buddy)
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
func updateBeaconHeaderCB(ctx: BeaconSyncRef): SyncReqNewHeadCB =
  ## Update beacon header. This function is intended as a callback function
## for the RPC module.
result = proc(h: BlockHeader) {.gcsafe, raises: [].} =
try:
debug "REQUEST SYNC", number=h.number, hash=h.blockHash.short
waitFor ctx.ctx.appendSyncTarget(h)
except CatchableError as ex:
error "updateBeconHeaderCB error", msg=ex.msg
proc enableRpcMagic(ctx: BeaconSyncRef) =
## Helper for `setup()`: Enable external pivot update via RPC
let com = ctx.ctx.chain.com
com.syncReqNewHead = ctx.updateBeaconHeaderCB
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc init*(
T: type BeaconSyncRef;
ethNode: EthereumNode;
chain: ForkedChainRef;
rng: ref HmacDrbgContext;
maxPeers: int;
id: int = 0): T =
new result
result.initSync(ethNode, chain, maxPeers)
result.ctx.pool.rng = rng
result.ctx.pool.id = id
proc start*(ctx: BeaconSyncRef) =
  ## Beacon Sync always begins in stop mode
doAssert ctx.startSync() # Initialize subsystems
ctx.enableRpcMagic() # Allow external pivot update via RPC
proc stop*(ctx: BeaconSyncRef) =
ctx.stopSync()
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------
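
The removed module above plugs beacon sync into the shared scheduler via the runSetup/runStart/runDaemon family and, through `updateBeaconHeaderCB`, hands the RPC/engine layer a closure that forwards new head announcements to the syncer. A self-contained sketch of that callback shape (DemoHeader, DemoSync and NewHeadCB are illustrative stand-ins; the real closure forwards into `appendSyncTarget`):

type
  DemoHeader = object
    number: uint64
  NewHeadCB = proc(h: DemoHeader) {.gcsafe, raises: [].}
  DemoSync = ref object
    targets: seq[DemoHeader]

proc newHeadCallback(sync: DemoSync): NewHeadCB =
  # the sync layer builds the closure; the RPC layer only sees NewHeadCB
  result = proc(h: DemoHeader) {.gcsafe, raises: [].} =
    sync.targets.add h

when isMainModule:
  let
    sync = DemoSync()
    onNewHead = sync.newHeadCallback()
  onNewHead(DemoHeader(number: 123))
  doAssert sync.targets.len == 1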

View File

@ -1,419 +0,0 @@
# Nimbus
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
import
std/[tables, options],
chronicles,
chronos,
chronos/timer,
./worker_desc,
./skeleton_main,
./skeleton_utils,
./skeleton_db,
../../utils/utils,
../protocol,
../types
logScope:
topics = "beacon-impl"
{.push gcsafe, raises: [].}
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
func makeGetBodyJob(header: BlockHeader, setHead: bool): BeaconJob =
BeaconJob(
mode: bjmGetBody,
getBodyJob: BeaconJobGetBody(
headerHash: header.blockHash,
sumHash: header.sumHash,
header: header,
setHead: setHead
)
)
func makeGetBlocksJob(number, maxResults: uint64) : BeaconJob =
BeaconJob(
mode: bjmGetBlocks,
getBlocksJob: BeaconJobGetBlocks(
number: number,
maxResults: maxResults,
)
)
func makeHeaderRequest(number: uint64, maxResults: uint64): BlocksRequest =
BlocksRequest(
startBlock: HashOrNum(isHash: false, number: number),
maxResults: maxResults.uint,
skip: 0,
reverse: true
)
func makeGetBodiesJob(headers: sink seq[BlockHeader],
headerHash: sink seq[Hash256],
reqBodies: sink seq[bool]): BeaconJob =
BeaconJob(
mode: bjmGetBodies,
getBodiesJob: BeaconJobGetBodies(
headers : system.move(headers),
headerHash: system.move(headerHash),
reqBodies : system.move(reqBodies)
)
)
proc requeue(buddy: BeaconBuddyRef, job: BeaconJob) =
buddy.ctx.poolMode = true
buddy.only.requeue.add job
proc jobDone(buddy: BeaconBuddyRef) =
buddy.only.job = nil
proc mapBodiesToHeader(buddy: BeaconBuddyRef,
job: BeaconJob,
bodies: openArray[BlockBody],
reqBodies: openArray[bool]) {.raises: [].} =
doAssert(job.mode == bjmGetBlocks or
job.mode == bjmGetBodies,
"mapBodiesToHeader doesn't allow this job: " & $job.mode)
var
headers = if job.mode == bjmGetBlocks:
system.move(job.getBlocksJob.headers)
else:
system.move(job.getBodiesJob.headers)
map = Table[Hash256, int]()
for i, x in bodies:
let bodyHash = sumHash(x)
map[bodyHash] = i
for i, req in reqBodies:
if not req:
if job.mode == bjmGetBlocks:
job.getBlocksJob.headers.add headers[i]
job.getBlocksJob.bodies.add BlockBody()
else:
job.getBodiesJob.headers.add headers[i]
job.getBodiesJob.bodies.add BlockBody()
continue
let bodyHash = sumHash(headers[i])
let z = map.getOrDefault(bodyHash, MissingBody)
if z == MissingBody:
# missing or invalid body, request again
buddy.requeue makeGetBodyJob(headers[i], setHead = false)
continue
if job.mode == bjmGetBlocks:
job.getBlocksJob.headers.add headers[i]
job.getBlocksJob.bodies.add bodies[z]
else:
job.getBodiesJob.headers.add headers[i]
job.getBodiesJob.bodies.add bodies[z]
proc putBlocks(ctx: BeaconCtxRef,
skel: SkeletonRef,
headers: openArray[BlockHeader],
bodies: openArray[BlockBody]) =
for i, body in bodies:
let r = skel.putBody(headers[i], body)
doAssert(r.isOk)
let res = skel.putBlocks(headers)
if res.isErr:
error "putBlocks->putBlocks", msg=res.error
return
let z = res.get
if FillCanonical in z.status:
let rr = skel.fillCanonicalChain()
if rr.isErr:
error "putBlocks->fillCanonicalChain", msg=rr.error
return
proc setupTally*(ctx: BeaconCtxRef) =
let
skel = ctx.pool.skeleton
last = skel.last
discard ctx.pool.mask.merge(1'u64, last.head)
for x in skel.subchains:
discard ctx.pool.mask.reduce(x.tail, x.head)
discard ctx.pool.pulled.merge(x.tail, x.head)
proc mergeTally*(ctx: BeaconCtxRef, least: uint64, last: uint64) =
discard ctx.pool.mask.merge(least, last)
proc reduceTally*(ctx: BeaconCtxRef, least: uint64, last: uint64) =
discard ctx.pool.mask.reduce(least, last)
discard ctx.pool.pulled.merge(least, last)
proc downloaded*(ctx: BeaconCtxRef, head: uint64): bool =
ctx.pool.pulled.covered(head, head) > 0'u64
proc headTally(ctx: BeaconCtxRef, head: uint64) =
discard ctx.pool.pulled.merge(head, head)
let rc = ctx.pool.mask.le()
if rc.isSome:
let maxPt = rc.get().maxPt
if head > maxPt:
# new head
discard ctx.pool.mask.merge(maxPt+1, head-1)
else:
# initialize
discard ctx.pool.mask.merge(1'u64, head)
discard ctx.pool.mask.reduce(head, head)
proc popFirst(x: var TargetQueue): BlockHeader =
# assume we already check len > 0
x.shift().get().data
proc addLast(x: var TargetQueue, h: BlockHeader) =
discard x.prepend(h.blockHash, h)
# ------------------------------------------------------------------------------
# Synchronizer will produce jobs for workers
# ------------------------------------------------------------------------------
proc resumeSync*(ctx: BeaconCtxRef): Future[bool] {.async.} =
let skel = ctx.pool.skeleton
if skel.len == 0:
return true
let last = skel.last
let res = skel.getHeader(last.head)
if res.isErr:
error "resumeSync->getHeader", msg=res.error
return false
let maybeHeader = res.get
if maybeHeader.isNone:
return true
let header = maybeHeader.get
let r = skel.initSync(header)
if r.isErr:
error "resumeSync->initSync", msg=r.error
return false
let z = r.get
if FillCanonical in z.status:
let rr = skel.fillCanonicalChain()
if rr.isErr:
error "resumeSync->fillCanonicalChain", msg=rr.error
return false
# collect gaps of skeleton, excluding genesis
ctx.setupTally()
return true
proc appendSyncTarget*(ctx: BeaconCtxRef, h: BlockHeader): Future[void] {.async.} =
while bmShiftTarget in ctx.pool.mode:
await sleepAsync timer.milliseconds(10)
let number = h.u64
ctx.pool.mode.incl bmAppendTarget
if not ctx.downloaded(number):
ctx.headTally(number)
ctx.pool.target.addLast(h)
ctx.pool.mode.excl bmAppendTarget
ctx.daemon = true
proc shiftSyncTarget*(ctx: BeaconCtxRef): Future[BlockHeader] {.async.} =
doAssert(ctx.pool.target.len > 0)
while bmAppendTarget in ctx.pool.mode:
await sleepAsync timer.milliseconds(10)
ctx.pool.mode.incl bmShiftTarget
let h = ctx.pool.target.popFirst()
ctx.pool.mode.excl bmShiftTarget
return h
proc setSyncTarget*(ctx: BeaconCtxRef): Future[void] {.async.} =
let header = await ctx.shiftSyncTarget()
let res = ctx.pool.skeleton.setHead(header, force = true)
if res.isOk:
let job = makeGetBodyJob(header, setHead = true)
ctx.pool.jobs.addLast(job)
else:
error "setSyncTarget.setHead", msg=res.error
proc fillBlocksGaps*(ctx: BeaconCtxRef, least: uint64, last: uint64) =
if last - least < MaxGetBlocks:
ctx.reduceTally(last-least, last)
let job = makeGetBlocksJob(last, last-least+1)
ctx.pool.jobs.addLast(job)
return
var
max = last
while true:
ctx.reduceTally(max-MaxGetBlocks, max)
let job = makeGetBlocksJob(max, MaxGetBlocks)
ctx.pool.jobs.addLast(job)
if ctx.pool.jobs.len > MaxJobsQueue:
return
max = max-MaxGetBlocks
if max <= MaxGetBlocks:
break
if max > 1:
ctx.reduceTally(1, max)
let job = makeGetBlocksJob(max, max)
ctx.pool.jobs.addLast(job)
# ------------------------------------------------------------------------------
# Worker will consume available jobs
# ------------------------------------------------------------------------------
proc executeGetBodyJob*(buddy: BeaconBuddyRef, job: BeaconJob): Future[void] {.async.} =
let
ctx = buddy.ctx
peer = buddy.peer
skel = ctx.pool.skeleton
let b = await peer.getBlockBodies([job.getBodyJob.headerHash])
if b.isNone:
debug "executeGetBodyJob->getBodies none",
hash=job.getBodyJob.headerHash.short,
number=job.getBodyJob.header.number
# retry with other peer
buddy.requeue job
return
let bodies = b.get
if bodies.blocks.len == 0:
debug "executeGetBodyJob->getBodies isZero",
hash=job.getBodyJob.headerHash.short,
number=job.getBodyJob.header.number
# retry with other peer
buddy.requeue job
return
job.getBodyJob.body = bodies.blocks[0]
let bodySumHash = sumHash(job.getBodyJob.body)
if bodySumHash != job.getBodyJob.sumHash:
# retry with other peer
debug "executeGetBodyJob->sumHash",
expect=job.getBodyJob.sumHash.short,
get=bodySumHash.short
buddy.requeue job
return
var status: set[SkeletonStatus]
if job.getBodyJob.setHead:
let res = skel.setHead(job.getBodyJob.header)
if res.isErr:
error "executeGetBodyJob->setHead", msg=res.error
# something wrong
return
status = res.get().status
else:
let res = skel.putBlocks([job.getBodyJob.header])
if res.isErr:
error "executeGetBodyJob->putBlocks", msg=res.error
return
status = res.get().status
let r = skel.putBody(job.getBodyJob.header, job.getBodyJob.body)
doAssert(r.isOk)
if FillCanonical in status:
let rr = skel.fillCanonicalChain()
if rr.isErr:
error "executeGetBodyJob->fillCanonicalChain", msg=rr.error
return
buddy.jobDone()
proc executeGetBlocksJob*(buddy: BeaconBuddyRef, job: BeaconJob): Future[void] {.async.} =
let
ctx = buddy.ctx
peer = buddy.peer
skel = ctx.pool.skeleton
request = makeHeaderRequest(job.getBlocksJob.number, job.getBlocksJob.maxResults)
let res = await peer.getBlockHeaders(request)
if res.isNone:
# retry with other peer
error "executeGetBlocksJob->getBlockHeaders none"
buddy.requeue job
return
job.getBlocksJob.headers = res.get().headers
let numHeaders = job.getBlocksJob.headers.len
var
headerHashes = newSeqOfCap[Hash256](numHeaders)
reqBodies = newSeqOfCap[bool](numHeaders)
numRequest = 0
for i, x in job.getBlocksJob.headers:
if not x.hasBody:
reqBodies.add false
continue
reqBodies.add true
headerHashes.add x.blockHash
inc numRequest
if numRequest == 0:
# all bodies are empty
for _ in 0..<numHeaders:
job.getBlocksJob.bodies.add BlockBody()
else:
let b = await peer.getBlockBodies(headerHashes)
if b.isNone:
debug "executeGetBlocksJob->getBodies none"
# retry with other peer
buddy.requeue makeGetBodiesJob(job.getBlocksJob.headers,
headerHashes, reqBodies)
return
buddy.mapBodiesToHeader(job, b.get().blocks, reqBodies)
ctx.putBlocks(skel, job.getBlocksJob.headers, job.getBlocksJob.bodies)
buddy.jobDone()
proc executeGetBodiesJob*(buddy: BeaconBuddyRef, job: BeaconJob): Future[void] {.async.} =
let
ctx = buddy.ctx
peer = buddy.peer
skel = ctx.pool.skeleton
let b = await peer.getBlockBodies(job.getBodiesJob.headerHash)
if b.isNone:
debug "executeGetBodiesJob->getBodies none"
# retry with other peer
buddy.requeue job
return
buddy.mapBodiesToHeader(job, b.get().blocks, job.getBodiesJob.reqBodies)
ctx.putBlocks(skel, job.getBodiesJob.headers, job.getBodiesJob.bodies)
buddy.jobDone()
proc executeJob*(buddy: BeaconBuddyRef, job: BeaconJob): Future[void] {.async.} =
try:
case job.mode
of bjmGetBody:
await executeGetBodyJob(buddy, job)
of bjmGetBlocks:
await executeGetBlocksJob(buddy, job)
of bjmGetBodies:
await executeGetBodiesJob(buddy, job)
except TransportError as ex:
error "executeJob->TransportError", msg=ex.msg
except CatchableError as ex:
error "executeJob->OtherError", msg=ex.msg
# retry with other peer
buddy.requeue job
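
The module above organizes the download as a queue of variant-object jobs; a worker that cannot complete its job requeues it so another peer can retry (cf. `buddy.requeue`). A reduced, self-contained sketch of that job/requeue loop using std/deques (Job and JobMode are simplified stand-ins for BeaconJob):

import std/deques

type
  JobMode = enum
    jmGetBody, jmGetBlocks
  Job = ref object
    case mode: JobMode
    of jmGetBody:
      headerNumber: uint64
    of jmGetBlocks:
      start, maxResults: uint64

proc execute(job: Job; jobs: var Deque[Job]; ok: bool) =
  case job.mode
  of jmGetBody:
    echo "fetch body for header ", job.headerNumber
  of jmGetBlocks:
    echo "fetch ", job.maxResults, " blocks from ", job.start
  if not ok:
    # retry with another peer, mirroring `buddy.requeue job`
    jobs.addLast job

when isMainModule:
  var jobs = initDeque[Job]()
  jobs.addLast(Job(mode: jmGetBlocks, start: 100, maxResults: 64))
  while jobs.len > 0:
    let job = jobs.popFirst()
    job.execute(jobs, ok = true)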

View File

@ -1,497 +0,0 @@
# Nimbus
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at
# https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at
# https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
import
./skeleton_desc,
./skeleton_utils,
./skeleton_db,
../../utils/utils
{.push gcsafe, raises: [].}
logScope:
topics = "skeleton"
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
proc fastForwardHead(sk: SkeletonRef, last: Segment, target: uint64): Result[void, string] =
# Try fast forwarding the chain head to the number
let
head = last.head
maybeHead = sk.getHeader(head, true).valueOr:
return err(error)
if maybeHead.isNone:
return ok()
var
headBlock = maybeHead.get
headBlockHash = headBlock.blockHash
for newHead in head + 1 .. target:
let maybeHead = sk.getHeader(newHead, true).valueOr:
return err(error)
if maybeHead.isNone:
break
let newBlock = maybeHead.get
if newBlock.parentHash != headBlockHash:
# Head can't be updated forward
break
headBlock = newBlock
headBlockHash = headBlock.blockHash
last.head = headBlock.u64
debug "lastchain head fast forwarded",
`from`=head, to=last.head, tail=last.tail
ok()
proc backStep(sk: SkeletonRef): Result[uint64, string] =
if sk.conf.fillCanonicalBackStep <= 0:
return ok(0)
let sc = sk.last
var
newTail = sc.tail
maybeTailHeader: Opt[BlockHeader]
while true:
newTail = newTail + sk.conf.fillCanonicalBackStep
maybeTailHeader = sk.getHeader(newTail, true).valueOr:
return err(error)
if maybeTailHeader.isSome or newTail > sc.head: break
if newTail > sc.head:
newTail = sc.head
maybeTailHeader = sk.getHeader(newTail, true).valueOr:
return err(error)
if maybeTailHeader.isSome and newTail > 0:
debug "Backstepped skeleton", head=sc.head, tail=newTail
let tailHeader = maybeTailHeader.get
sk.last.tail = tailHeader.u64
sk.last.next = tailHeader.parentHash
sk.writeProgress()
return ok(newTail)
# we need a new head, emptying the subchains
sk.clear()
sk.writeProgress()
debug "Couldn't backStep subchain 0, dropping subchains for new head signal"
return ok(0)
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc isLinked*(sk: SkeletonRef): Result[bool, string] =
## Returns true if the skeleton chain is linked to canonical
if sk.isEmpty:
return ok(false)
let sc = sk.last
# if its genesis we are linked
if sc.tail == 0:
return ok(true)
let head = sk.blockHeight
if sc.tail > head + 1:
return ok(false)
let number = sc.tail - 1
let maybeHeader = sk.getHeader(number).valueOr:
return err("isLinked: " & error)
  # The check above guarantees sc.tail <= head + 1, i.e. number <= head,
  # so the canonical chain lookup must have found a header
doAssert maybeHeader.isSome
let nextHeader = maybeHeader.get
let linked = sc.next == nextHeader.blockHash
if linked and sk.len > 1:
# Remove all other subchains as no more relevant
sk.removeAllButLast()
sk.writeProgress()
return ok(linked)
proc trySubChainsMerge*(sk: SkeletonRef): Result[bool, string] =
var
merged = false
edited = false
# If the subchain extended into the next subchain, we need to handle
# the overlap. Since there could be many overlaps, do this in a loop.
while sk.len > 1 and sk.second.head >= sk.last.tail:
# Extract some stats from the second subchain
let sc = sk.second
# Since we just overwrote part of the next subchain, we need to trim
# its head independent of matching or mismatching content
if sc.tail >= sk.last.tail:
# Fully overwritten, get rid of the subchain as a whole
debug "Previous subchain fully overwritten", sub=sc
sk.removeSecond()
edited = true
continue
else:
# Partially overwritten, trim the head to the overwritten size
debug "Previous subchain partially overwritten", sub=sc
sc.head = sk.last.tail - 1
edited = true
# If the old subchain is an extension of the new one, merge the two
# and let the skeleton syncer restart (to clean internal state)
let
maybeSecondHead = sk.getHeader(sk.second.head).valueOr:
return err(error)
secondHeadHash = maybeSecondHead.blockHash
if maybeSecondHead.isSome and secondHeadHash == sk.last.next:
# only merge if we can integrate a big progress, as each merge leads
# to disruption of the block fetcher to start a fresh
if (sc.head - sc.tail) > sk.conf.subchainMergeMinimum:
debug "Previous subchain merged head", sub=sc
sk.last.tail = sc.tail
sk.last.next = sc.next
sk.removeSecond()
# If subchains were merged, all further available headers
# are invalid since we skipped ahead.
merged = true
else:
debug "Subchain ignored for merge", sub=sc
sk.removeSecond()
edited = true
if edited: sk.writeProgress()
ok(merged)
proc putBlocks*(sk: SkeletonRef, headers: openArray[BlockHeader]):
Result[StatusAndNumber, string] =
## Writes skeleton blocks to the db by number
## @returns number of blocks saved
var
merged = false
tailUpdated = false
if sk.len == 0:
return err("no subchain set")
# best place to debug beacon downloader
when false:
var numbers: seq[uint64]
for header in headers:
numbers.add header.u64
debugEcho numbers
for header in headers:
let
number = header.u64
headerHash = header.blockHash
if number >= sk.last.tail:
# These blocks should already be in skeleton, and might be coming in
# from previous events especially if the previous subchains merge
continue
elif number == 0:
let genesisHash = sk.genesisHash
      if headerHash != genesisHash:
        return err("Skeleton putBlocks with invalid genesis block " &
"number=" & $number &
", hash=" & headerHash.short &
", genesisHash=" & genesisHash.short)
continue
# Extend subchain or create new segment if necessary
if sk.last.next == headerHash:
sk.putHeader(header)
sk.pulled += 1
sk.last.tail = number
sk.last.next = header.parentHash
tailUpdated = true
else:
# Critical error, we expect new incoming blocks to extend the canonical
# subchain which is the [0]'th
debug "Blocks don't extend canonical subchain",
sub=sk.last,
number,
hash=headerHash.short
return err("Blocks don't extend canonical subchain")
merged = sk.trySubChainsMerge().valueOr:
return err(error)
if tailUpdated or merged:
sk.progress.canonicalHeadReset = true
# If its merged, we need to break as the new tail could be quite ahead
# so we need to clear out and run the reverse block fetcher again
if merged: break
sk.writeProgress()
# Print a progress report making the UX a bit nicer
#if getTime() - sk.logged > STATUS_LOG_INTERVAL:
# var left = sk.last.tail - 1 - sk.blockHeight
# if sk.progress.linked: left = 0
# if left > 0:
# sk.logged = getTime()
# if sk.pulled == 0:
# info "Beacon sync starting", left=left
# else:
# let sinceStarted = getTime() - sk.started
# let eta = (sinceStarted div sk.pulled.int64) * left.int64
# info "Syncing beacon headers",
# downloaded=sk.pulled, left=left, eta=eta.short
sk.progress.linked = sk.isLinked().valueOr:
return err(error)
var res = StatusAndNumber(number: headers.len.uint64)
# If the sync is finished, start filling the canonical chain.
if sk.progress.linked:
res.status.incl FillCanonical
if merged:
res.status.incl SyncMerged
ok(res)
# Inserts skeleton blocks into canonical chain and runs execution.
proc fillCanonicalChain*(sk: SkeletonRef): Result[void, string] =
if sk.filling: return ok()
sk.filling = true
var
canonicalHead = sk.blockHeight
maybeOldHead = Opt.none BlockHeader
let subchain = sk.last
if sk.progress.canonicalHeadReset:
    # Grab previous head block in case of resetting canonical head
let oldHead = sk.canonicalHead()
maybeOldHead = Opt.some oldHead
if subchain.tail > canonicalHead + 1:
return err("Canonical head should already be on or " &
"ahead subchain tail canonicalHead=" &
$canonicalHead & ", tail=" & $subchain.tail)
let newHead = if subchain.tail > 0: subchain.tail - 1
else: 0
debug "Resetting canonicalHead for fillCanonicalChain",
`from`=canonicalHead, to=newHead
canonicalHead = newHead
sk.resetCanonicalHead(canonicalHead, oldHead.u64)
sk.progress.canonicalHeadReset = false
let start {.used.} = canonicalHead
# This subchain is a reference to update the tail for
# the very subchain we are filling the data for
debug "Starting canonical chain fill",
canonicalHead, subchainHead=subchain.head
while sk.filling and canonicalHead < subchain.head:
# Get next block
let
number = canonicalHead + 1
maybeHeader = sk.getHeader(number).valueOr:
return err(error)
if maybeHeader.isNone:
# This shouldn't happen, but if it does because of some issues,
# we should back step and fetch again
debug "fillCanonicalChain block not found, backStepping", number
sk.backStep().isOkOr:
return err(error)
break
# Insert into chain
let header = maybeHeader.get
let res = sk.insertBlock(header, true)
if res.isErr:
let maybeHead = sk.getHeader(subchain.head).valueOr:
return err(error)
# In post-merge, notify the engine API of encountered bad chains
if maybeHead.isSome:
sk.com.notifyBadBlock(header, maybeHead.get)
debug "fillCanonicalChain putBlock", msg=res.error
if maybeOldHead.isSome:
let oldHead = maybeOldHead.get
if oldHead.u64 >= number:
# Put original canonical head block back if reorg fails
sk.insertBlock(oldHead, true).isOkOr:
return err(error)
let numBlocksInserted = res.valueOr: 0
if numBlocksInserted != 1:
debug "Failed to put block from skeleton chain to canonical",
number=number,
hash=header.blockHashStr,
parentHash=header.parentHash.short
        # Let's log the parent by number and by hash; that may help to understand what's going on
let parent {.used.} = sk.getHeader(number - 1).valueOr:
return err(error)
debug "ParentByNumber", number=parent.numberStr, hash=parent.blockHashStr
let parentWithHash {.used.} = sk.getHeader(header.parentHash).valueOr:
return err(error)
debug "parentByHash",
number=parentWithHash.numberStr,
hash=parentWithHash.blockHashStr
sk.backStep().isOkOr:
return err(error)
break
canonicalHead += numBlocksInserted
sk.fillLogIndex += numBlocksInserted
# Delete skeleton block to clean up as we go, if block is fetched and chain is linked
# it will be fetched from the chain without any issues
sk.deleteHeaderAndBody(header)
if sk.fillLogIndex >= 20:
debug "Skeleton canonical chain fill status",
canonicalHead,
chainHead=sk.blockHeight,
subchainHead=subchain.head
sk.fillLogIndex = 0
sk.filling = false
debug "Successfully put blocks from skeleton chain to canonical",
start, `end`=canonicalHead,
skeletonHead=subchain.head
ok()
proc processNewHead*(sk: SkeletonRef, head: BlockHeader,
force = false): Result[bool, string] =
## processNewHead does the internal shuffling for a new head marker and either
## accepts and integrates it into the skeleton or requests a reorg. Upon reorg,
## the syncer will tear itself down and restart with a fresh head. It is simpler
## to reconstruct the sync state than to mutate it.
## @returns true if the chain was reorged
# If the header cannot be inserted without interruption, return an error for
# the outer loop to tear down the skeleton sync and restart it
let
number = head.u64
headHash = head.blockHash
genesisHash = sk.genesisHash
if number == 0:
if headHash != genesisHash:
return err("Invalid genesis setHead announcement " &
"number=" & $number &
", hash=" & headHash.short &
", genesisHash=" & genesisHash.short
)
# genesis announcement
return ok(false)
let last = if sk.isEmpty:
debug "Skeleton empty, comparing against genesis head=0 tail=0",
newHead=number
# set the lastchain to genesis for comparison in
# following conditions
segment(0, 0, zeroBlockHash)
else:
sk.last
if last.tail > number:
# Not a noop / double head announce, abort with a reorg
if force:
debug "Skeleton setHead before tail, resetting skeleton",
tail=last.tail, head=last.head, newHead=number
last.head = number
last.tail = number
last.next = head.parentHash
else:
debug "Skeleton announcement before tail, will reset skeleton",
tail=last.tail, head=last.head, newHead=number
return ok(true)
elif last.head >= number:
# Check if its duplicate announcement, if not trim the head and
# let the match run after this if block
let maybeDupBlock = sk.getHeader(number).valueOr:
return err(error)
let maybeDupHash = maybeDupBlock.blockHash
if maybeDupBlock.isSome and maybeDupHash == headHash:
debug "Skeleton duplicate announcement",
tail=last.tail, head=last.head, number, hash=headHash.short
return ok(false)
else:
      # Since it's not a duplicate block, there is a reorg in the chain, or at
      # least in the head, which will be addressed after this if/else block
if force:
debug "Skeleton differing announcement",
tail=last.tail,
head=last.head,
number=number,
expected=maybeDupHash.short,
actual=headHash.short
else:
debug "Skeleton stale announcement",
tail=last.tail,
head=last.head,
number
return ok(true)
elif last.head + 1 < number:
if force:
sk.fastForwardHead(last, number - 1).isOkOr:
return err(error)
# If its still less than number then its gapped head
if last.head + 1 < number:
debug "Beacon chain gapped setHead",
head=last.head, newHead=number
return ok(true)
else:
debug "Beacon chain gapped announcement",
head=last.head, newHead=number
return ok(true)
let maybeParent = sk.getHeader(number - 1).valueOr:
return err(error)
let parentHash = maybeParent.blockHash
if maybeParent.isNone or parentHash != head.parentHash:
if force:
debug "Beacon chain forked",
ancestor=maybeParent.numberStr,
hash=maybeParent.blockHashStr,
want=head.parentHash.short
return ok(true)
if force:
last.head = number
if sk.isEmpty:
      # If there was no subchain to begin with, i.e. initialized from genesis
      # and no reorg, then push the new subchain; otherwise the reorg handling
      # will push the new chain
sk.push(last)
sk.progress.linked = sk.isLinked.valueOr:
return err(error)
debug "Beacon chain extended new", last
return ok(false)
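
Much of the file above is bookkeeping over the list of subchain segments: the newest segment grows backwards (its tail decreases) and, once it reaches into an older segment, that older segment is either dropped or trimmed (`trySubChainsMerge`). A deliberately simplified, self-contained sketch of just that overlap handling (Seg is a stand-in; the real code additionally checks linking hashes and a merge threshold before merging subchains):

type
  Seg = ref object
    head, tail: uint64

proc mergeOverlaps(segs: var seq[Seg]) =
  # the newest segment is last; handle overlap with the segment behind it
  while segs.len > 1 and segs[^2].head >= segs[^1].tail:
    let second = segs[^2]
    if second.tail >= segs[^1].tail:
      # older segment fully overwritten, drop it
      segs.delete(segs.len - 2)
    else:
      # partially overwritten: trim its head to just below the newest tail
      second.head = segs[^1].tail - 1

when isMainModule:
  var segs = @[Seg(head: 50, tail: 10), Seg(head: 200, tail: 120)]
  segs[^1].tail = 40    # the newest segment grew backwards into the older one
  segs.mergeOverlaps()
  doAssert segs.len == 2 and segs[0].head == 39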

View File

@ -1,208 +0,0 @@
# Nimbus
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at
# https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at
# https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
import
eth/rlp,
eth/common/eth_types_rlp,
./skeleton_desc,
./skeleton_utils,
../../db/storage_types,
../../utils/utils,
../../core/chain
export
eth_types_rlp.blockHash
{.push gcsafe, raises: [].}
logScope:
topics = "skeleton"
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
template get(sk: SkeletonRef, key: untyped): untyped =
sk.db.ctx.getKvt().get(key.toOpenArray).valueOr: EmptyBlob
template put(sk: SkeletonRef, key, val: untyped): untyped =
let rc = sk.db.ctx.getKvt().put(key.toOpenArray, val)
if rc.isErr:
raiseAssert "put() failed: " & $$rc.error
template del(sk: SkeletonRef, key: untyped): untyped =
discard sk.db.ctx.getKvt().del(key.toOpenArray)
proc append(w: var RlpWriter, s: Segment) =
w.startList(3)
w.append(s.head)
w.append(s.tail)
w.append(s.next)
proc append(w: var RlpWriter, p: Progress) =
w.startList(3)
w.append(p.segments)
w.append(p.linked)
w.append(p.canonicalHeadReset)
proc readImpl(rlp: var Rlp, T: type Segment): Segment {.raises: [RlpError].} =
rlp.tryEnterList()
Segment(
head: rlp.read(uint64),
tail: rlp.read(uint64),
next: rlp.read(Hash256),
)
proc readImpl(rlp: var Rlp, T: type Progress): Progress {.raises: [RlpError].} =
rlp.tryEnterList()
Progress(
segments: rlp.read(seq[Segment]),
linked : rlp.read(bool),
canonicalHeadReset: rlp.read(bool),
)
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc getHeader*(sk: SkeletonRef,
number: uint64,
onlySkeleton: bool = false): Result[Opt[BlockHeader], string] =
## Gets a block from the skeleton or canonical db by number.
try:
let rawHeader = sk.get(skeletonHeaderKey(number.BlockNumber))
if rawHeader.len != 0:
let output = rlp.decode(rawHeader, BlockHeader)
return ok(Opt.some output)
if onlySkeleton:
return ok(Opt.none BlockHeader)
# As a fallback, try to get the block from the canonical chain
# in case it is available there
var output: BlockHeader
if sk.db.getBlockHeader(number.BlockNumber, output):
return ok(Opt.some output)
ok(Opt.none BlockHeader)
except RlpError as ex:
err(ex.msg)
proc getHeader*(sk: SkeletonRef,
blockHash: Hash256,
onlySkeleton: bool = false):
Result[Opt[BlockHeader], string] =
## Gets a skeleton block from the db by hash
try:
let rawNumber = sk.get(skeletonBlockHashToNumberKey(blockHash))
if rawNumber.len != 0:
var output: BlockHeader
let number = rlp.decode(rawNumber, BlockNumber)
if sk.db.getBlockHeader(number, output):
return ok(Opt.some output)
if onlySkeleton:
return ok(Opt.none BlockHeader)
# As a fallback, try to get the block from the canonical chain
# in case it is available there
var output: BlockHeader
if sk.db.getBlockHeader(blockHash, output):
return ok(Opt.some output)
ok(Opt.none BlockHeader)
except RlpError as ex:
err(ex.msg)
proc putHeader*(sk: SkeletonRef, header: BlockHeader) =
## Writes a skeleton block header to the db by number
let encodedHeader = rlp.encode(header)
sk.put(skeletonHeaderKey(header.number), encodedHeader)
sk.put(
skeletonBlockHashToNumberKey(header.blockHash),
rlp.encode(header.number)
)
proc putBody*(sk: SkeletonRef, header: BlockHeader, body: BlockBody): Result[void, string] =
## Writes block body to db
try:
let
encodedBody = rlp.encode(body)
bodyHash = sumHash(body)
headerHash = header.blockHash
keyHash = sumHash(headerHash, bodyHash)
sk.put(skeletonBodyKey(keyHash), encodedBody)
ok()
except CatchableError as ex:
err(ex.msg)
proc getBody*(sk: SkeletonRef, header: BlockHeader): Result[Opt[BlockBody], string] =
## Reads block body from db
## sumHash is the hash of [txRoot, ommersHash, wdRoot]
try:
let
bodyHash = header.sumHash
headerHash = header.blockHash
keyHash = sumHash(headerHash, bodyHash)
rawBody = sk.get(skeletonBodyKey(keyHash))
if rawBody.len > 0:
return ok(Opt.some rlp.decode(rawBody, BlockBody))
ok(Opt.none BlockBody)
except RlpError as ex:
err(ex.msg)
proc writeProgress*(sk: SkeletonRef) =
## Writes the progress to db
for sub in sk.subchains:
debug "Writing sync progress subchains", sub
let encodedProgress = rlp.encode(sk.progress)
sk.put(skeletonProgressKey(), encodedProgress)
proc readProgress*(sk: SkeletonRef): Result[void, string] =
## Reads the SkeletonProgress from db
try:
let rawProgress = sk.get(skeletonProgressKey())
if rawProgress.len == 0:
return ok()
sk.progress = rlp.decode(rawProgress, Progress)
ok()
except RlpError as ex:
err(ex.msg)
proc deleteHeaderAndBody*(sk: SkeletonRef, header: BlockHeader) =
## Deletes a skeleton block from the db by number
sk.del(skeletonHeaderKey(header.number))
sk.del(skeletonBlockHashToNumberKey(header.blockHash))
sk.del(skeletonBodyKey(header.sumHash))
proc canonicalHead*(sk: SkeletonRef): BlockHeader =
sk.chain.latestHeader
proc resetCanonicalHead*(sk: SkeletonRef, newHead, oldHead: uint64) =
debug "RESET CANONICAL", newHead, oldHead
sk.chain.com.syncCurrent = newHead.BlockNumber
proc insertBlocks*(sk: SkeletonRef,
blocks: openArray[EthBlock],
fromEngine: bool): Result[uint64, string] =
for blk in blocks:
? sk.chain.importBlock(blk)
ok(blocks.len.uint64)
proc insertBlock*(sk: SkeletonRef,
header: BlockHeader,
fromEngine: bool): Result[uint64, string] =
let maybeBody = sk.getBody(header).valueOr:
return err(error)
if maybeBody.isNone:
return err("insertBlock: Block body not found: " & $header.u64)
sk.insertBlocks([EthBlock.init(header, maybeBody.get)], fromEngine)
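
`getHeader` above is a lookup with fallback: consult the skeleton's own store first and, unless `onlySkeleton` is set, fall back to the canonical chain database. A small self-contained sketch of that pattern, with plain tables standing in for the key-value store:

import std/[options, tables]

type
  Header = object
    number: uint64

proc getHeader(skeleton, canonical: Table[uint64, Header];
               number: uint64; onlySkeleton = false): Option[Header] =
  if number in skeleton:
    return some(skeleton[number])
  if onlySkeleton:
    return none(Header)
  if number in canonical:
    # fallback: the canonical chain may already have the block
    return some(canonical[number])
  none(Header)

when isMainModule:
  var skel, canon: Table[uint64, Header]
  canon[7] = Header(number: 7)
  doAssert getHeader(skel, canon, 7).isSome
  doAssert getHeader(skel, canon, 7, onlySkeleton = true).isNone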

View File

@ -1,99 +0,0 @@
# Nimbus
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at
# https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at
# https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
import
std/times,
chronicles,
results,
eth/common/eth_types,
../../utils/utils,
../../db/core_db,
../../core/chain
export
eth_types,
core_db,
chain,
chronicles,
results,
times
{.push gcsafe, raises: [].}
logScope:
topics = "skeleton"
type
# Contiguous header chain segment that is backed by the database,
# but may not be linked to the live chain. The skeleton downloader may produce
# a new one of these every time it is restarted until the subchain grows large
# enough to connect with a previous subchain.
Segment* = ref object
head*: uint64 # Block number of the newest header in the subchain
tail*: uint64 # Block number of the oldest header in the subchain
next*: Hash256 # Block hash of the next oldest header in the subchain
# Database entry to allow suspending and resuming a chain sync.
# As the skeleton header chain is downloaded backwards, restarts can and
# will produce temporarily disjoint subchains. There is no way to restart a
# suspended skeleton sync without prior knowledge of all prior suspension points.
Progress* = ref object
segments*: seq[Segment]
linked* : bool
canonicalHeadReset*: bool
SkeletonConfig* = ref object
fillCanonicalBackStep*: uint64
subchainMergeMinimum* : uint64
# The Skeleton chain class helps support beacon sync by accepting head blocks
# while backfill syncing the rest of the chain.
SkeletonRef* = ref object
progress*: Progress
pulled* : uint64 # Number of headers downloaded in this run
filling* : bool # Whether we are actively filling the canonical chain
started* : Time # Timestamp when the skeleton syncer was created
logged* : Time # Timestamp when progress was last logged to user
db* : CoreDbRef
chain* : ForkedChainRef
conf* : SkeletonConfig
fillLogIndex*: uint64
SkeletonStatus* = enum
SkeletonOk
# SyncReorged is a signal that the head chain of
# the current sync cycle was (partially) reorged, thus the skeleton syncer
# should abort and restart with the new state.
SyncReorged
# ReorgDenied is returned if an attempt is made to extend the beacon chain
# with a new header, but it does not link up to the existing sync.
ReorgDenied
# SyncMerged is a signal that the current sync cycle merged with
# a previously aborted subchain, thus the skeleton syncer
# should abort and restart with the new state.
SyncMerged
# Request to do fillCanonicalChain
FillCanonical
StatusAndNumber* = object
status*: set[SkeletonStatus]
number*: uint64
StatusAndReorg* = object
status*: set[SkeletonStatus]
reorg* : bool
BodyRange* = object
min*: uint64
max*: uint64
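
Callers of the skeleton API inspect the `set[SkeletonStatus]` carried in results such as `StatusAndNumber` to decide what to do next. A short self-contained sketch of that consumption pattern (the enum values are copied from the file above; the handler itself is illustrative):

type
  SkeletonStatus = enum
    SkeletonOk, SyncReorged, ReorgDenied, SyncMerged, FillCanonical
  StatusAndNumber = object
    status: set[SkeletonStatus]
    number: uint64

proc handle(res: StatusAndNumber) =
  if FillCanonical in res.status:
    echo "sync linked: fill the canonical chain"
  if SyncMerged in res.status:
    echo "subchains merged: restart the reverse fetcher"

when isMainModule:
  handle(StatusAndNumber(status: {FillCanonical, SyncMerged}, number: 64))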

View File

@ -1,178 +0,0 @@
# Nimbus
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at
# https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at
# https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
import
./skeleton_desc,
./skeleton_utils,
./skeleton_db,
./skeleton_algo
export
skeleton_desc,
skeleton_algo.isLinked,
skeleton_algo.putBlocks,
skeleton_algo.fillCanonicalChain
{.push gcsafe, raises: [].}
logScope:
topics = "skeleton"
# ------------------------------------------------------------------------------
# Constructors
# ------------------------------------------------------------------------------
proc new*(_: type SkeletonRef, chain: ForkedChainRef): SkeletonRef =
SkeletonRef(
progress: Progress(),
pulled : 0,
filling : false,
chain : chain,
db : chain.db,
started : getTime(),
logged : getTime(),
conf : SkeletonConfig(
fillCanonicalBackStep: 100,
subchainMergeMinimum : 1000,
),
)
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc open*(sk: SkeletonRef): Result[void, string] =
if sk.chain.com.ttd.isNone and sk.chain.com.ttdPassed.not:
return err("Cannot create skeleton as ttd and ttdPassed not set")
sk.readProgress().isOkOr:
return err(error)
sk.started = getTime()
ok()
proc setHead*(sk: SkeletonRef, head: BlockHeader,
force = true, init = false,
reorgthrow = false): Result[StatusAndReorg, string] =
## Announce and integrate a new head.
## @params head - The block being attempted as a new head
  ## @params force - Flag to indicate if this is just a check of
  ##                 worthiness or an actual new head
## @params init - Flag this is the first time since the beacon
## sync start to perform additional tasks
## @params reorgthrow - Flag to indicate if we would actually like
## to throw if there is a reorg
## instead of just returning the boolean
##
## @returns True if the head (will) cause a reorg in the
## canonical skeleton subchain
let
number = head.u64
debug "New skeleton head announced",
number,
hash=head.blockHashStr,
force
let reorg = sk.processNewHead(head, force).valueOr:
return err(error)
if force and reorg:
# It could just be a reorg at this head with previous tail preserved
let
subchain = if sk.isEmpty: Segment(nil)
else: sk.last
maybeParent = sk.getHeader(number - 1).valueOr:
return err(error)
parentHash = maybeParent.blockHash
if subchain.isNil or maybeParent.isNone or parentHash != head.parentHash:
let sub = segment(number, number, head.parentHash)
sk.push(sub)
debug "Created new subchain", sub
else:
# Only the head differed, tail is preserved
subchain.head = number
# Reset the filling of canonical head from tail on reorg
sk.progress.canonicalHeadReset = true
# Put this block irrespective of the force
sk.putHeader(head)
if init:
sk.trySubChainsMerge().isOkOr:
return err(error)
if (force and reorg) or init:
sk.progress.linked = sk.isLinked.valueOr:
return err(error)
if force or init:
sk.writeProgress()
var res = StatusAndReorg(reorg: reorg)
if force and sk.progress.linked:
res.status.incl FillCanonical
# Earlier we were throwing on reorg, essentially for the purposes for
# killing the reverse fetcher
# but it can be handled properly in the calling fn without erroring
if reorg and reorgthrow:
if force:
res.status.incl SyncReorged
else:
res.status.incl ReorgDenied
ok(res)
proc initSync*(sk: SkeletonRef, head: BlockHeader,
reorgthrow = false): Result[StatusAndReorg, string] =
## Setup the skeleton to init sync with head
## @params head - The block with which we want to init the skeleton head
## @params reorgthrow - If we would like the function to throw instead of
## silently return if there is reorg of the skeleton head
##
## @returns True if the skeleton was reorged trying to init else false
sk.setHead(head, true, true, reorgthrow)
func bodyRange*(sk: SkeletonRef): Result[BodyRange, string] =
  ## Get the range of bodies that need to be downloaded by the synchronizer
var canonicalHead = sk.blockHeight
let subchain = sk.last
if sk.progress.canonicalHeadReset:
if subchain.tail > canonicalHead + 1:
return err("Canonical head should already be on or " &
"ahead subchain tail canonicalHead=" &
$canonicalHead & ", tail=" & $subchain.tail)
let newHead = if subchain.tail > 0: subchain.tail - 1
else: 0
canonicalHead = newHead
ok(BodyRange(
min: canonicalHead,
max: subchain.head,
))
# ------------------------------------------------------------------------------
# Getters and setters
# ------------------------------------------------------------------------------
func fillCanonicalBackStep*(sk: SkeletonRef): uint64 =
sk.conf.fillCanonicalBackStep
func subchainMergeMinimum*(sk: SkeletonRef): uint64 =
sk.conf.subchainMergeMinimum
proc `fillCanonicalBackStep=`*(sk: SkeletonRef, val: uint64) =
sk.conf.fillCanonicalBackStep = val
proc `subchainMergeMinimum=`*(sk: SkeletonRef, val: uint64) =
sk.conf.subchainMergeMinimum = val
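
The tail end of the file above exposes its two tuning knobs through Nim property-style accessors: a plain getter plus a backtick-quoted `=` setter. A tiny self-contained sketch of the same idiom with stand-in types:

type
  Config = ref object
    backStep: uint64
  Syncer = ref object
    conf: Config

func fillCanonicalBackStep(s: Syncer): uint64 =
  s.conf.backStep

proc `fillCanonicalBackStep=`(s: Syncer, val: uint64) =
  s.conf.backStep = val

when isMainModule:
  let s = Syncer(conf: Config(backStep: 100))
  s.fillCanonicalBackStep = 10    # resolves to the backtick-quoted setter
  doAssert s.fillCanonicalBackStep == 10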

View File

@ -1,126 +0,0 @@
# Nimbus
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at
# https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at
# https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
import
./skeleton_desc
{.push gcsafe, raises: [].}
logScope:
topics = "skeleton"
const
# How often to log sync status (in ms)
STATUS_LOG_INTERVAL* = initDuration(microseconds = 8000)
zeroBlockHash* = default(Hash256)
# ------------------------------------------------------------------------------
# Misc helpers
# ------------------------------------------------------------------------------
func u64*(h: BlockHeader): uint64 =
h.number
func blockHash*(x: Opt[BlockHeader]): Hash256 =
if x.isSome: x.get.blockHash
else: zeroBlockHash
func numberStr*(x: Opt[BlockHeader]): string =
if x.isSome: $(x.get.u64)
else: "N/A"
func blockHashStr*(x: Opt[BlockHeader]): string =
if x.isSome: x.get.blockHash.short
else: "N/A"
func blockHashStr*(x: BlockHeader): string =
x.blockHash.short
# ------------------------------------------------------------------------------
# Segment helpers
# ------------------------------------------------------------------------------
func segment*(head, tail: uint64, next: Hash256): Segment =
Segment(head: head, tail: tail, next: next)
func short(s: Segment): string =
s.next.short
func `$`*(s: Segment): string =
result = "head: " & $s.head &
", tail: " & $s.tail &
", next: " & s.short
# ------------------------------------------------------------------------------
# Progress helpers
# ------------------------------------------------------------------------------
proc add*(ss: Progress, s: Segment) =
ss.segments.add s
proc add*(ss: Progress, head, tail: uint64, next: Hash256) =
ss.add Segment(head: head, tail: tail, next: next)
# ------------------------------------------------------------------------------
# SkeletonRef helpers
# ------------------------------------------------------------------------------
func isEmpty*(sk: SkeletonRef): bool =
sk.progress.segments.len == 0
func notEmpty*(sk: SkeletonRef): bool =
sk.progress.segments.len > 0
func blockHeight*(sk: SkeletonRef): uint64 =
sk.chain.com.syncCurrent
func genesisHash*(sk: SkeletonRef): Hash256 =
sk.chain.com.genesisHash
func com*(sk: SkeletonRef): CommonRef =
sk.chain.com
func len*(sk: SkeletonRef): int =
sk.progress.segments.len
func last*(sk: SkeletonRef): Segment =
sk.progress.segments[^1]
func second*(sk: SkeletonRef): Segment =
sk.progress.segments[^2]
iterator subchains*(sk: SkeletonRef): Segment =
for sub in sk.progress.segments:
yield sub
iterator pairs*(sk: SkeletonRef): (int, Segment) =
for i, sub in sk.progress.segments:
yield (i, sub)
proc push*(sk: SkeletonRef, s: Segment) =
sk.progress.add s
proc push*(sk: SkeletonRef, head, tail: uint64, next: Hash256) =
sk.progress.add(head, tail, next)
proc removeLast*(sk: SkeletonRef) =
discard sk.progress.segments.pop
proc removeSecond*(sk: SkeletonRef) =
sk.progress.segments.delete(sk.len-2)
proc removeAllButLast*(sk: SkeletonRef) =
let last = sk.progress.segments.pop
for sub in sk.subchains:
debug "Canonical subchain linked with main, removing junked chains", sub
sk.progress.segments = @[last]
proc clear*(sk: SkeletonRef) =
sk.progress.segments.setLen(0)
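
The helpers above fix a convention: segments are stored oldest first, so `last` is the newest subchain and `second` the one right behind it, and once the newest subchain links up the older ones can be discarded (`removeAllButLast`). A tiny self-contained restatement of that convention (Segment here is a local stand-in):

type
  Segment = ref object
    head, tail: uint64

func last(segs: seq[Segment]): Segment = segs[^1]
func second(segs: seq[Segment]): Segment = segs[^2]

proc removeAllButLast(segs: var seq[Segment]) =
  # once the newest subchain links to canonical, the older ones are junk
  segs = @[segs[^1]]

when isMainModule:
  var segs = @[Segment(head: 10, tail: 1), Segment(head: 99, tail: 50)]
  doAssert segs.last.head == 99 and segs.second.head == 10
  segs.removeAllButLast()
  doAssert segs.len == 1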

View File

@ -1,214 +0,0 @@
# Nimbus
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at
# https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at
# https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
{.push raises:[].}
import
chronicles,
chronos,
chronos/timer,
eth/p2p,
".."/[protocol, sync_desc],
./worker_desc,
./skeleton_main,
./beacon_impl
logScope:
topics = "beacon-buddy"
const
extraTraceMessages = false # or true
    ## Enable additional logging noise
# ------------------------------------------------------------------------------
# Public start/stop and admin functions
# ------------------------------------------------------------------------------
proc setup*(ctx: BeaconCtxRef): bool =
## Global set up
ctx.pool.target.init()
ctx.pool.mask = HeaderInterval.init()
ctx.pool.pulled = HeaderInterval.init()
ctx.pool.skeleton = SkeletonRef.new(ctx.chain)
let res = ctx.pool.skeleton.open()
if res.isErr:
error "Cannot open beacon skeleton", msg=res.error
return false
ctx.pool.mode.incl bmResumeSync
true
proc release*(ctx: BeaconCtxRef) =
## Global clean up
discard
proc start*(buddy: BeaconBuddyRef): bool =
## Initialise worker peer
let
ctx = buddy.ctx
peer = buddy.peer
if peer.supports(protocol.eth) and
peer.state(protocol.eth).initialized:
ctx.daemon = true
return true
proc stop*(buddy: BeaconBuddyRef) =
## Clean up this peer
buddy.ctrl.stopped = true
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc runDaemon*(ctx: BeaconCtxRef) {.async.} =
## Global background job that will be re-started as long as the variable
## `ctx.daemon` is set `true`. If that job was stopped due to re-setting
## `ctx.daemon` to `false`, it will be restarted next after it was reset
## as `true` not before there is some activity on the `runPool()`,
## `runSingle()`, or `runMulti()` functions.
##
debug "RUNDAEMON", id=ctx.pool.id
# Just wake up after long sleep (e.g. client terminated)
if bmResumeSync in ctx.pool.mode:
discard await ctx.resumeSync()
ctx.pool.mode.excl bmResumeSync
# We get order from engine API
if ctx.pool.target.len > 0:
await ctx.setSyncTarget()
# Distributing jobs of filling gaps to peers
let mask = ctx.pool.mask
for x in mask.decreasing:
ctx.fillBlocksGaps(x.minPt, x.maxPt)
# Tell the `runPool` to grab job for each peer
if ctx.pool.jobs.len > 0:
ctx.poolMode = true
# Rerun this function next iteration
  # if there are more new sync targets
ctx.daemon = ctx.pool.target.len > 0
# Without waiting, this function repeats every 50ms (as set with the constant
# `sync_sched.execLoopTimeElapsedMin`.) Larger waiting time cleans up logging.
var sleepDuration = timer.milliseconds(300)
if ctx.pool.jobs.len == 0 and ctx.pool.target.len == 0:
sleepDuration = timer.seconds(5)
await sleepAsync sleepDuration
proc runSingle*(buddy: BeaconBuddyRef) {.async.} =
## This peer worker is invoked if the peer-local flag `buddy.ctrl.multiOk`
## is set `false` which is the default mode. This flag is updated by the
## worker when deemed appropriate.
## * For all workers, there can be only one `runSingle()` function active
## simultaneously for all worker peers.
## * There will be no `runMulti()` function active for the same worker peer
## simultaneously
## * There will be no `runPool()` iterator active simultaneously.
##
## Note that this function runs in `async` mode.
##
let
ctx = buddy.ctx
debug "RUNSINGLE", id=ctx.pool.id
if buddy.ctrl.stopped:
when extraTraceMessages:
trace "Single mode stopped", peer, pivotState=ctx.pool.pivotState
return # done with this buddy
var napping = timer.seconds(2)
when extraTraceMessages:
trace "Single mode end", peer, napping
# Without waiting, this function repeats every 50ms (as set with the constant
# `sync_sched.execLoopTimeElapsedMin`.)
await sleepAsync napping
# request new jobs, if available
if ctx.pool.jobs.len == 0:
ctx.daemon = true
proc runPool*(buddy: BeaconBuddyRef; last: bool; laps: int): bool =
## Once started, the function `runPool()` is called for all worker peers in
## sequence as the body of an iteration as long as the function returns
## `false`. There will be no other worker peer functions activated
## simultaneously.
##
## This procedure is started if the global flag `buddy.ctx.poolMode` is set
## `true` (default is `false`.) It will be automatically reset before the
## the loop starts. Re-setting it again results in repeating the loop. The
## argument `lap` (starting with `0`) indicated the currend lap of the
## repeated loops.
##
## The argument `last` is set `true` if the last entry is reached.
##
## Note that this function does not run in `async` mode.
##
let
ctx = buddy.ctx
debug "RUNPOOL", id=ctx.pool.id
  # If a peer cannot finish its job,
# we will put it back into circulation.
# A peer can also spawn more jobs.
if buddy.only.requeue.len > 0:
for job in buddy.only.requeue:
ctx.pool.jobs.addLast(job)
buddy.only.requeue.setLen(0)
buddy.only.job = nil
# Take distributed jobs for each peer
if ctx.pool.jobs.len > 0 and buddy.only.job.isNil:
buddy.only.job = ctx.pool.jobs.popFirst()
buddy.ctrl.multiOk = true
# If there is no more jobs, stop
ctx.pool.jobs.len == 0
proc runMulti*(buddy: BeaconBuddyRef) {.async.} =
## This peer worker is invoked if the `buddy.ctrl.multiOk` flag is set
## `true` which is typically done after finishing `runSingle()`. This
## instance can be simultaneously active for all peer workers.
##
let
ctx = buddy.ctx
debug "RUNMULTI", id=ctx.pool.id
  # If this peer got a job, execute it until failure or success.
# It is also possible to spawn more jobs
if buddy.only.job.isNil.not:
await buddy.executeJob(buddy.only.job)
# Update persistent database
#while not buddy.ctrl.stopped:
# Allow thread switch as `persistBlocks()` might be slow
await sleepAsync timer.milliseconds(10)
# request new jobs, if available
if ctx.pool.jobs.len == 0:
ctx.daemon = true
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------
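
The daemon above paces itself: roughly 300 ms between busy rounds and 5 s when neither jobs nor sync targets are pending. A minimal sketch of that pacing pattern, using chronos just like the removed code (the actual work is elided; only the sleep selection is shown):

import chronos, chronos/timer

proc runDaemonOnce(pendingJobs, pendingTargets: int): Future[void] {.async.} =
  # nap longer when idle, mirroring the 300 ms / 5 s split above
  var nap = timer.milliseconds(300)
  if pendingJobs == 0 and pendingTargets == 0:
    nap = timer.seconds(5)
  await sleepAsync(nap)

when isMainModule:
  waitFor runDaemonOnce(pendingJobs = 0, pendingTargets = 0)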

View File

@ -1,97 +0,0 @@
# Nimbus
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at
# https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at
# https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
{.push raises:[].}
import
std/deques,
stew/interval_set,
stew/keyed_queue,
eth/p2p,
chronos,
../sync_desc,
./skeleton_desc
export
deques,
interval_set,
keyed_queue
type
BeaconMode* = enum
bmNone ## Do nothing
bmResumeSync ## Resume sync if the client stopped
bmAppendTarget ## Put new sync target into queue
bmShiftTarget ## Get sync target from queue
BeaconJobGetBody* = object
headerHash*: Hash256 ## request body using this hash
sumHash* : Hash256 ## compare downloaded body to this hash
header* : BlockHeader ## downloaded header
body* : BlockBody ## downloaded body
setHead* : bool ## true: setHead, false: putBlocks
BeaconJobGetBlocks* = object
number* : uint64 ## starting number of blocks
maxResults*: uint64 ## number of blocks we want to download
headers* : seq[BlockHeader] ## downloaded headers
bodies* : seq[BlockBody] ## downloaded bodies
BeaconJobGetBodies* = object
headers* : seq[BlockHeader] ## downloaded headers
bodies* : seq[BlockBody] ## downloaded bodies
headerHash*: seq[Hash256] ## request to download bodies using these hashes
reqBodies* : seq[bool] ## skip downloading body if header has no body
BeaconJobMode* = enum
bjmGetBody ## after setSyncTarget is done, download the body
bjmGetBlocks ## download blocks to fill skeleton gaps
bjmGetBodies ## if bjmGetBlocks failed to download bodies, hand them to another peer
BeaconJob* = ref object
case mode*: BeaconJobMode
of bjmGetBody:
getBodyJob*: BeaconJobGetBody
of bjmGetBlocks:
getBlocksJob*: BeaconJobGetBlocks
of bjmGetBodies:
getBodiesJob*: BeaconJobGetBodies
BeaconBuddyData* = object
## Local descriptor data extension
job* : BeaconJob
requeue*: seq[BeaconJob]
TargetQueue* = KeyedQueue[Hash256, BlockHeader]
HeaderInterval* = IntervalSetRef[uint64, uint64]
BeaconCtxData* = object
## Globally shared data extension
rng* : ref HmacDrbgContext ## Random generator, pre-initialised
id* : int ## Instance id, for debugging purpose
skeleton*: SkeletonRef ## Core algorithm, tracking both canonical and side chain
mode* : set[BeaconMode] ## Do one thing at a time
target* : TargetQueue ## Sync target list
jobs* : Deque[BeaconJob] ## Each buddy can get a job from here
mask* : HeaderInterval ## Skeleton gaps need to be downloaded
pulled* : HeaderInterval ## Downloaded skeleton blocks
BeaconBuddyRef* = BuddyRef[BeaconCtxData,BeaconBuddyData]
## Extended worker peer descriptor
BeaconCtxRef* = CtxRef[BeaconCtxData]
## Extended global descriptor
const
MaxGetBlocks* = 64
MaxJobsQueue* = 32
MissingBody* = -1
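# A minimal sketch, using only std/deques, of the job hand-off pattern the
# descriptors above describe: the shared context owns `jobs`, each buddy holds
# at most one job and hands unfinished work back via `requeue`. `Job`, `Pool`
# and `Peer` are simplified stand-ins for BeaconCtxData/BeaconBuddyData, for
# illustration only.
import std/deques

type
  Job = ref object
    number: uint64                     # e.g. starting block number to fetch
  Pool = object
    jobs: Deque[Job]                   # shared queue the buddies draw from
  Peer = object
    job: Job                           # job currently assigned to this peer
    requeue: seq[Job]                  # jobs handed back after a failure

var pool = Pool(jobs: initDeque[Job]())
pool.jobs.addLast Job(number: 100)     # producer side: enqueue new work

var peer: Peer
if pool.jobs.len > 0 and peer.job.isNil:
  peer.job = pool.jobs.popFirst()      # a peer takes exactly one job

# if the peer cannot finish, the job goes back into circulation
peer.requeue.add peer.job
for job in peer.requeue:
  pool.jobs.addLast job
peer.requeue.setLen(0)
peer.job = nil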
# End


@ -38,7 +38,6 @@ cliBuilder:
./test_txpool2,
./test_engine_api,
./test_eip4844,
./test_beacon/test_skeleton,
./test_getproof_json,
./test_aristo,
./test_coredb


@ -1,193 +0,0 @@
# Nimbus
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at
# https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at
# https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
import
stew/byteutils,
../../nimbus/core/chain/forked_chain,
../../nimbus/core/pow/difficulty,
../../nimbus/config,
../../nimbus/common,
../../nimbus/sync/beacon/skeleton_desc
const
genesisFile = "tests/customgenesis/post-merge.json"
type
Subchain* = object
head*: uint64
tail*: uint64
TestEnv* = object
conf* : NimbusConf
chain*: ForkedChainRef
CCModify = proc(cc: NetworkParams)
let
block49* = BlockHeader(
number: 49.BlockNumber
)
block49B* = BlockHeader(
number: 49.BlockNumber,
extraData: @['B'.byte]
)
block50* = BlockHeader(
number: 50.BlockNumber,
parentHash: block49.blockHash
)
block50B* = BlockHeader(
number: 50.BlockNumber,
parentHash: block49.blockHash,
gasLimit: 999.GasInt,
)
block51* = BlockHeader(
number: 51.BlockNumber,
parentHash: block50.blockHash
)
proc setupEnv*(extraValidation: bool = false, ccm: CCModify = nil): TestEnv =
let
conf = makeConfig(@[
"--custom-network:" & genesisFile
])
if ccm.isNil.not:
ccm(conf.networkParams)
let
com = CommonRef.new(
newCoreDbRef DefaultDbMemory,
conf.networkId,
conf.networkParams
)
chain = newForkedChain(com, com.genesisHeader, extraValidation = extraValidation)
TestEnv(
conf : conf,
chain: chain,
)
func subchain*(head, tail: uint64): Subchain =
Subchain(head: head, tail: tail)
func header*(bn: uint64, temp, parent: BlockHeader, diff: uint64): BlockHeader =
BlockHeader(
number: bn.BlockNumber,
parentHash : parent.blockHash,
difficulty : diff.u256,
timestamp : parent.timestamp + 1,
gasLimit : temp.gasLimit,
stateRoot : temp.stateRoot,
txRoot : temp.txRoot,
baseFeePerGas : temp.baseFeePerGas,
receiptsRoot : temp.receiptsRoot,
ommersHash : temp.ommersHash,
withdrawalsRoot: temp.withdrawalsRoot,
blobGasUsed : temp.blobGasUsed,
excessBlobGas : temp.excessBlobGas,
parentBeaconBlockRoot: temp.parentBeaconBlockRoot,
)
func header*(com: CommonRef, bn: uint64, temp, parent: BlockHeader): BlockHeader =
result = header(bn, temp, parent, 0)
result.difficulty = com.calcDifficulty(result.timestamp, parent)
func header*(bn: uint64, temp, parent: BlockHeader,
diff: uint64, stateRoot: string): BlockHeader =
result = header(bn, temp, parent, diff)
result.stateRoot = Hash256(data: hextoByteArray[32](stateRoot))
func header*(com: CommonRef, bn: uint64, temp, parent: BlockHeader,
stateRoot: string): BlockHeader =
result = com.header(bn, temp, parent)
result.stateRoot = Hash256(data: hextoByteArray[32](stateRoot))
func emptyBody*(): BlockBody =
BlockBody(
transactions: @[],
uncles: @[],
withdrawals: Opt.none(seq[Withdrawal]),
)
template fillCanonical(skel, z, stat) =
if z.status == stat and FillCanonical in z.status:
let xx = skel.fillCanonicalChain()
check xx.isOk
if xx.isErr:
debugEcho "FillCanonicalChain: ", xx.error
break
template initSyncT*(skel, blk: untyped, r = false) =
let x = skel.initSync(blk)
check x.isOk
if x.isErr:
debugEcho "initSync:", x.error
break
let z = x.get
check z.reorg == r
template setHeadT*(skel, blk, frc, r) =
let x = skel.setHead(blk, frc)
check x.isOk
if x.isErr:
debugEcho "setHead:", x.error
break
let z = x.get
check z.reorg == r
template initSyncT*(skel, blk, r, stat) =
let x = skel.initSync(blk)
check x.isOk
if x.isErr:
debugEcho "initSync:", x.error
break
let z = x.get
check z.reorg == r
check z.status == stat
fillCanonical(skel, z, stat)
template setHeadT*(skel, blk, frc, r, stat) =
let x = skel.setHead(blk, frc)
check x.isOk
if x.isErr:
debugEcho "setHead:", x.error
break
let z = x.get
check z.reorg == r
check z.status == stat
fillCanonical(skel, z, stat)
template putBlocksT*(skel, blocks, numBlocks, stat) =
let x = skel.putBlocks(blocks)
check x.isOk
if x.isErr:
debugEcho "putBlocks: ", x.error
break
let z = x.get
check z.number == numBlocks
check z.status == stat
fillCanonical(skel, z, stat)
template isLinkedT*(skel, r) =
let x = skel.isLinked()
check x.isOk
if x.isErr:
debugEcho "isLinked: ", x.error
break
check x.get == r
template getHeaderClean*(skel, headers) =
for header in headers:
var r = skel.getHeader(header.u64, true)
check r.isOk
check r.get.isNone
r = skel.getHeader(header.blockHash, true)
check r.isOk
check r.get.isNone


@ -1,201 +0,0 @@
# Nimbus
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at
# https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at
# https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
import
unittest2,
./setup_env,
../../nimbus/sync/beacon/skeleton_main,
../../nimbus/sync/beacon/skeleton_utils,
../../nimbus/sync/beacon/skeleton_db
type
TestCase = object
name : string
blocks : seq[BlockHeader] # Database content (besides the genesis)
oldState: seq[Subchain] # Old sync state with various interrupted subchains
head : BlockHeader # New head header to announce to reorg to
newState: seq[Subchain] # Expected sync state after the reorg
reorg : bool
let testCases = [
# The sync is expected to create a single subchain with the requested head.
TestCase(
name: "Completely empty database with only the genesis set.",
head: block50,
newState: @[subchain(50, 50)],
reorg: true
),
# This is a synthetic case, just for the sake of covering things.
TestCase(
name: "Empty database with only the genesis set with a leftover empty sync progress",
head: block50,
newState: @[subchain(50, 50)],
reorg: true
),
# The old subchain should be left as is and a new one appended to the sync status.
TestCase(
name: "A single leftover subchain is present, older than the new head.",
oldState: @[subchain(10, 5)],
head: block50,
newState: @[
subchain(10, 5),
subchain(50, 50),
],
reorg: true
),
# The old subchains should be left as is and a new one appended to the sync status.
TestCase(
name: "Multiple leftover subchains are present, older than the new head.",
oldState: @[
subchain(10, 5),
subchain(20, 15),
],
head: block50,
newState: @[
subchain(10, 5),
subchain(20, 15),
subchain(50, 50),
],
reorg: true
),
# The newer subchain should be deleted and a fresh one created for the head.
TestCase(
name: "A single leftover subchain is present, newer than the new head.",
oldState: @[subchain(65, 60)],
head: block50,
newState: @[subchain(50, 50)],
reorg: true
),
# The newer subchains should be deleted and a fresh one created for the head.
TestCase(
name: "Multiple leftover subchain is present, newer than the new head.",
oldState: @[
subchain(65, 60),
subchain(75, 70),
],
head: block50,
newState: @[subchain(50, 50)],
reorg: true
),
# Two leftover subchains are present, one fully older and one fully newer
# than the announced head. The head should delete the newer one,
# keeping the older one.
TestCase(
name: "Two leftover subchains are present, one fully older and one fully newer",
oldState: @[
subchain(10, 5),
subchain(65, 60),
],
head: block50,
newState: @[
subchain(10, 5),
subchain(50, 50),
],
reorg: true
),
# Multiple leftover subchains are present, some fully older and some fully
# newer than the announced head. The head should delete the newer
# ones, keeping the older ones.
TestCase(
name: "Multiple leftover subchains are present, some fully older and some fully newer",
oldState: @[
subchain(10, 5),
subchain(20, 15),
subchain(65, 60),
subchain(75, 70),
],
head: block50,
newState: @[
subchain(10, 5),
subchain(20, 15),
subchain(50, 50),
],
reorg: true
),
# A single leftover subchain is present and the new head is extending
# it with one more header. We expect the subchain head to be pushed forward.
TestCase(
name: "A single leftover subchain is present and the new head is extending",
blocks: @[block49],
oldState: @[subchain(49, 5)],
head: block50,
newState: @[subchain(50, 5)],
reorg: false
),
# A single leftover subchain is present. A new head is announced that
# links into the middle of it and matches the header already stored there,
# so the duplicate announcement leaves the subchain unmodified.
TestCase(
name: "Duplicate announcement should not modify subchain",
blocks: @[block49, block50],
oldState: @[subchain(100, 5)],
head: block50,
newState: @[subchain(100, 5)],
reorg: false
),
# A single leftover subchain is present. A new head is announced that
# links into the middle of it, correctly anchoring into an existing
# header. We expect the old subchain to be truncated and extended with
# the new head.
TestCase(
name: "A new alternate head is announced in the middle should truncate subchain",
blocks: @[block49, block50],
oldState: @[subchain(100, 5)],
head: block50B,
newState: @[subchain(50, 5)],
reorg: true
),
# A single leftover subchain is present. A new head is announced that
# links into the middle of it, but does not anchor into an existing
# header. We expect the old subchain to be truncated and a new chain
# be created for the dangling head.
TestCase(
name: "The old subchain to be truncated and a new chain be created for the dangling head",
blocks: @[block49B],
oldState: @[subchain(100, 5)],
head: block50,
newState: @[
subchain(49, 5),
subchain(50, 50),
],
reorg: true
),
]
proc test1*() =
suite "Tests various sync initializations":
# based on previous leftovers in the database
# and announced heads.
for z in testCases:
test z.name:
let env = setupEnv()
let skel = SkeletonRef.new(env.chain)
let res = skel.open()
check res.isOk
if res.isErr:
debugEcho res.error
break
for header in z.blocks:
skel.putHeader(header)
for x in z.oldState:
skel.push(x.head, x.tail, default(Hash256))
let r = skel.initSync(z.head).valueOr:
debugEcho "initSync: ", error
check false
break
check r.status.card == 0
check r.reorg == z.reorg
check skel.len == z.newState.len
for i, sc in skel:
check sc.head == z.newState[i].head
check sc.tail == z.newState[i].tail


@ -1,125 +0,0 @@
# Nimbus
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at
# https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at
# https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
import
unittest2,
./setup_env,
../../nimbus/sync/beacon/skeleton_main,
../../nimbus/sync/beacon/skeleton_utils
# Tests that a running skeleton sync can be extended with properly linked up
# headers but not with side chains.
type
TestCase = object
name : string
blocks : seq[BlockHeader] # Database content (besides the genesis)
head : BlockHeader # New head header to announce to reorg to
extend : BlockHeader # New head header to announce to extend with
force : bool # To force set head not just to extend
newState: seq[Subchain] # Expected sync state after the reorg
err : Opt[SkeletonStatus] # Whether extension succeeds or not
let testCases = [
# Initialize a sync and try to extend it with a subsequent block.
TestCase(
name: "Initialize a sync and try to extend it with a subsequent block",
head: block49,
extend: block50,
force: true,
newState: @[subchain(50, 49)],
),
# Initialize a sync and try to extend it with the existing head block.
TestCase(
name: "Initialize a sync and try to extend it with the existing head block",
blocks: @[block49],
head: block49,
extend: block49,
newState: @[subchain(49, 49)],
),
# Initialize a sync and try to extend it with a sibling block.
TestCase(
name: "Initialize a sync and try to extend it with a sibling block",
head: block49,
extend: block49B,
newState: @[subchain(49, 49)],
err: Opt.some ReorgDenied,
),
# Initialize a sync and try to extend it with a number-wise sequential
# header, but a hash wise non-linking one.
TestCase(
name: "Initialize a sync and try to extend it with a number-wise sequential alternate block",
head: block49B,
extend: block50,
newState: @[subchain(49, 49)],
err: Opt.some ReorgDenied,
),
# Initialize a sync and try to extend it with a non-linking future block.
TestCase(
name: "Initialize a sync and try to extend it with a non-linking future block",
head: block49,
extend: block51,
newState: @[subchain(49, 49)],
err: Opt.some ReorgDenied,
),
# Initialize a sync and try to extend it with a past canonical block.
TestCase(
name: "Initialize a sync and try to extend it with a past canonical block",
head: block50,
extend: block49,
newState: @[subchain(50, 50)],
err: Opt.some ReorgDenied,
),
# Initialize a sync and try to extend it with a past sidechain block.
TestCase(
name: "Initialize a sync and try to extend it with a past sidechain block",
head: block50,
extend: block49B,
newState: @[subchain(50, 50)],
err: Opt.some ReorgDenied,
),
]
proc test2*() =
suite "Tests that a running skeleton sync can be extended":
for z in testCases:
test z.name:
let env = setupEnv()
let skel = SkeletonRef.new(env.chain)
let res = skel.open()
check res.isOk
if res.isErr:
debugEcho res.error
check false
break
let x = skel.initSync(z.head).valueOr:
debugEcho "initSync: ", error
check false
break
check x.status.card == 0
check x.reorg == true
let r = skel.setHead(z.extend, z.force, false, true).valueOr:
debugEcho "setHead: ", error
check false
break
if z.err.isSome:
check r.status.card == 1
check z.err.get in r.status
else:
check r.status.card == 0
check skel.len == z.newState.len
for i, sc in skel:
check sc.head == z.newState[i].head
check sc.tail == z.newState[i].tail


@ -1,109 +0,0 @@
# Nimbus
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at
# https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at
# https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
import
unittest2,
./setup_env,
../../nimbus/sync/beacon/skeleton_main,
../../nimbus/sync/beacon/skeleton_utils
proc test3*() =
suite "should init/setHead properly from genesis":
let env = setupEnv()
let skel = SkeletonRef.new(env.chain)
test "skel open ok":
let res = skel.open()
check res.isOk
if res.isErr:
debugEcho res.error
check false
break
let
genesis = env.chain.com.genesisHeader
block1 = header(1, genesis, genesis, 100)
block2 = header(2, genesis, block1, 100)
block3 = header(3, genesis, block1, 100)
test "should not reorg on genesis init":
skel.initSyncT(genesis, false)
test "should not reorg on genesis announcement":
skel.setHeadT(genesis, false, false)
test "should not reorg on genesis setHead":
skel.setHeadT(genesis, true, false)
test "no subchain should have been created":
check skel.len == 0
test "should not allow putBlocks since no subchain set":
let r = skel.putBlocks([block1])
check r.isErr
test "canonical height should be at genesis":
check skel.blockHeight == 0
test "should not reorg on valid first block":
skel.setHeadT(block1, false, false)
test "no subchain should have been created":
check skel.len == 0
test "should not reorg on valid first block":
skel.setHeadT(block1, true, false)
test "subchain should have been created":
check skel.len == 1
test "head should be set to first block":
check skel.last.head == 1
test "subchain status should be linked":
skel.isLinkedT(true)
test "should not reorg on valid second block":
skel.setHeadT(block2, true, false)
test "subchain should be same":
check skel.len == 1
test "head should be set to first block":
check skel.last.head == 2
test "subchain status should stay linked":
skel.isLinkedT(true)
test "should not extend on invalid third block":
skel.setHeadT(block3, false, true)
# since it's not a forced update, it shouldn't affect subchain status
test "subchain should be same":
check skel.len == 1
test "head should be set to second block":
check skel.last.head == 2
test "subchain status should stay linked":
skel.isLinkedT(true)
test "should not extend on invalid third block":
skel.setHeadT(block3, true, true)
# this is a forced update: even though block3 does not link, a new subchain is created
test "new subchain should be created":
check skel.len == 2
test "head should be set to third block":
check skel.last.head == 3
test "subchain status should not be linked anymore":
skel.isLinkedT(false)


@ -1,64 +0,0 @@
# Nimbus
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at
# https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at
# https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
import
unittest2,
./setup_env,
../../nimbus/sync/beacon/skeleton_main,
../../nimbus/sync/beacon/skeleton_utils,
../../nimbus/sync/beacon/skeleton_db
proc test4*() =
suite "should fill the canonical chain after being linked to genesis":
let env = setupEnv()
let skel = SkeletonRef.new(env.chain)
test "skel open ok":
let res = skel.open()
check res.isOk
if res.isErr:
debugEcho res.error
check false
break
let
genesis = env.chain.com.genesisHeader
block1 = header(1, genesis, genesis, 100)
block2 = header(2, genesis, block1, 100)
block3 = header(3, genesis, block2, 100)
block4 = header(4, genesis, block3, 100)
block5 = header(5, genesis, block4, 100)
emptyBody = emptyBody()
test "put body":
for header in [block1, block2, block3, block4, block5]:
let res = skel.putBody(header, emptyBody)
check res.isOk
test "canonical height should be at genesis":
skel.initSyncT(block4, true)
skel.putBlocksT([block3, block2], 2, {})
check skel.blockHeight == 0
test "canonical height should update after being linked":
skel.putBlocksT([block1], 1, {FillCanonical})
check skel.blockHeight == 4
test "canonical height should not change when setHead is set with force=false":
skel.setHeadT(block5, false, false, {})
check skel.blockHeight == 4
test "canonical height should change when setHead is set with force=true":
skel.setHeadT(block5, true, false, {FillCanonical})
check skel.blockHeight == 5
test "skel header should be cleaned up after filling canonical chain":
let headers = [block1, block2, block3, block4, block5]
skel.getHeaderClean(headers)


@ -1,68 +0,0 @@
# Nimbus
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at
# https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at
# https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
import
unittest2,
./setup_env,
../../nimbus/sync/beacon/skeleton_main,
../../nimbus/sync/beacon/skeleton_utils,
../../nimbus/sync/beacon/skeleton_db
proc test5*() =
suite "should fill the canonical chain after being linked to a canonical block past genesis":
let env = setupEnv()
let skel = SkeletonRef.new(env.chain)
test "skel open ok":
let res = skel.open()
check res.isOk
if res.isErr:
debugEcho res.error
check false
break
let
genesis = env.chain.com.genesisHeader
block1 = header(1, genesis, genesis, 100)
block2 = header(2, genesis, block1, 100)
block3 = header(3, genesis, block2, 100)
block4 = header(4, genesis, block3, 100)
block5 = header(5, genesis, block4, 100)
emptyBody = emptyBody()
test "put body":
for header in [block1, block2, block3, block4, block5]:
let res = skel.putBody(header, emptyBody)
check res.isOk
test "canonical height should be at block 2":
let r = skel.insertBlocks([
EthBlock.init(block1, emptyBody), EthBlock.init(block2, emptyBody)], false)
check r.isOk
check r.get == 2
skel.initSyncT(block4, true)
check skel.blockHeight == 2
test "canonical height should update after being linked":
skel.putBlocksT([block3], 1, {FillCanonical})
check skel.blockHeight == 4
test "canonical height should not change when setHead with force=false":
skel.setHeadT(block5, false, false, {})
check skel.blockHeight == 4
test "canonical height should change when setHead with force=true":
skel.setHeadT(block5, true, false, {FillCanonical})
check skel.blockHeight == 5
test "skel header should be cleaned up after filling canonical chain":
let headers = [block3, block4, block5]
skel.getHeaderClean(headers)


@ -1,82 +0,0 @@
# Nimbus
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at
# https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at
# https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
import
std/options,
stew/byteutils,
unittest2,
./setup_env,
../../nimbus/sync/beacon/skeleton_main,
../../nimbus/sync/beacon/skeleton_utils,
../../nimbus/sync/beacon/skeleton_db
proc ccm(cc: NetworkParams) =
cc.config.terminalTotalDifficulty = Opt.some(262000.u256)
cc.genesis.extraData = hexToSeqByte("0x000000000000000000")
cc.genesis.difficulty = 1.u256
proc test6*() =
suite "should abort filling the canonical chain if the terminal block is invalid":
let env = setupEnv(extraValidation = true, ccm)
let skel = SkeletonRef.new(env.chain)
test "skel open ok":
let res = skel.open()
check res.isOk
if res.isErr:
debugEcho res.error
check false
break
let
genesis = env.chain.com.genesisHeader
block1 = env.chain.com.header(1, genesis, genesis,
"6BD9564DD3F4028E3E56F62F1BE52EC8F893CC4FD7DB75DB6A1DC3EB2858998C")
block2 = env.chain.com.header(2, block1, block1,
"32DAA84E151F4C8C6BD4D9ADA4392488FFAFD42ACDE1E9C662B3268C911A5CCC")
block3PoW = env.chain.com.header(3, block2, block2)
block3PoS = header(3, block2, block2, 0)
block4InvalidPoS = header(4, block3PoS, block3PoW, 0)
block4PoS = header(4, block3PoS, block3PoS, 0)
block5 = header(5, block4PoS, block4PoS, 0)
emptyBody = emptyBody()
test "put body":
for header in [block1, block2, block3PoW, block3PoS, block4InvalidPoS, block4PoS, block5]:
let res = skel.putBody(header, emptyBody)
check res.isOk
test "canonical height should be at genesis":
skel.initSyncT(block4InvalidPoS, true)
skel.putBlocksT([block3PoW, block2], 2, {})
check skel.blockHeight == 0
test "canonical height should stop at block 2":
# (valid terminal block), since block 3 is invalid (past ttd)
skel.putBlocksT([block1], 1, {FillCanonical})
check skel.blockHeight == 2
test "canonical height should not change when setHead is set with force=false":
skel.setHeadT(block5, false, true, {})
check skel.blockHeight == 2
test "canonical height should now be at head with correct chain":
# Put correct chain
skel.initSyncT(block4PoS, true, {})
skel.putBlocksT([block3PoS], 1, {FillCanonical})
check skel.blockHeight == 4
test "canonical height should now be at head with correct chain":
check env.chain.latestHash == block4PoS.blockHash
test "should update to new height":
skel.setHeadT(block5, true, false)
check skel.last.head == 5


@ -1,63 +0,0 @@
# Nimbus
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at
# https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at
# https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
import
unittest2,
stew/byteutils,
./setup_env,
../../nimbus/sync/beacon/skeleton_main,
../../nimbus/sync/beacon/skeleton_utils,
../../nimbus/sync/beacon/skeleton_db
proc ccm(cc: NetworkParams) =
cc.config.terminalTotalDifficulty = Opt.some(262000.u256)
cc.genesis.extraData = hexToSeqByte("0x000000000000000000")
cc.genesis.difficulty = 1.u256
proc test7*() =
suite "should abort filling the canonical chain and backstep if the terminal block is invalid":
let env = setupEnv(extraValidation = true, ccm)
let skel = SkeletonRef.new(env.chain)
test "skel open ok":
let res = skel.open()
check res.isOk
if res.isErr:
debugEcho res.error
check false
break
let
genesis = env.chain.com.genesisHeader
block1 = env.chain.com.header(1, genesis, genesis,
"6BD9564DD3F4028E3E56F62F1BE52EC8F893CC4FD7DB75DB6A1DC3EB2858998C")
block2 = env.chain.com.header(2, block1, block1,
"32DAA84E151F4C8C6BD4D9ADA4392488FFAFD42ACDE1E9C662B3268C911A5CCC")
block3PoW = env.chain.com.header(3, block2, block2)
block4InvalidPoS = header(4, block3PoW, block3PoW, 0)
emptyBody = emptyBody()
test "put body":
for header in [block1, block2, block3PoW, block4InvalidPoS]:
let res = skel.putBody(header, emptyBody)
check res.isOk
test "canonical height should be at genesis":
skel.initSyncT(block4InvalidPoS, true)
skel.putBlocksT([block3PoW, block2], 2, {})
check skel.blockHeight == 0
test "canonical height should stop at block 2":
# (valid terminal block), since block 3 is invalid (past ttd)
skel.putBlocksT([block1], 1, {FillCanonical})
check skel.blockHeight == 2
test "Subchain should have been backstepped to 4":
check skel.last.tail == 4


@ -1,64 +0,0 @@
# Nimbus
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at
# https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at
# https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
import
unittest2,
stew/byteutils,
./setup_env,
../../nimbus/sync/beacon/skeleton_main,
../../nimbus/sync/beacon/skeleton_utils,
../../nimbus/sync/beacon/skeleton_db
proc ccm(cc: NetworkParams) =
cc.config.terminalTotalDifficulty = Opt.some(262000.u256)
cc.genesis.extraData = hexToSeqByte("0x000000000000000000")
cc.genesis.difficulty = 1.u256
proc test8*() =
suite "should abort filling the canonical chain if a PoS block comes too early without hitting ttd":
let env = setupEnv(extraValidation = true, ccm)
let skel = SkeletonRef.new(env.chain)
skel.fillCanonicalBackStep = 0
test "skel open ok":
let res = skel.open()
check res.isOk
if res.isErr:
debugEcho res.error
check false
break
let
genesis = env.chain.com.genesisHeader
block1 = env.chain.com.header(1, genesis, genesis,
"6BD9564DD3F4028E3E56F62F1BE52EC8F893CC4FD7DB75DB6A1DC3EB2858998C")
block2 = env.chain.com.header(2, block1, block1,
"32DAA84E151F4C8C6BD4D9ADA4392488FFAFD42ACDE1E9C662B3268C911A5CCC")
block2PoS = header(2, block1, block1, 0)
block3 = header(3, block2, block2, 0)
emptyBody = emptyBody()
test "put body":
for header in [block1, block2, block2PoS, block3]:
let res = skel.putBody(header, emptyBody)
check res.isOk
test "canonical height should stop at block 1":
# (valid PoW block), since block 2 is invalid (invalid PoS, not past ttd)
skel.initSyncT(block2PoS, true)
skel.putBlocksT([block1], 1, {FillCanonical})
check skel.blockHeight == 1
test "canonical height should now be at head with correct chain":
# Put correct chain
skel.initSyncT(block3, true)
skel.putBlocksT([block2], 1, {FillCanonical})
check skel.blockHeight == 3
check env.chain.latestHash == block3.blockHash


@ -1,91 +0,0 @@
# Nimbus
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at
# https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at
# https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
import
unittest2,
test_1_initsync,
test_2_extend,
test_3_sethead_genesis,
test_4_fill_canonical,
test_5_canonical_past_genesis,
test_6_abort_filling,
test_7_abort_and_backstep,
test_8_pos_too_early,
../../nimbus/sync/beacon/skeleton_main,
./setup_env
proc noTtdAndNoTtdPassed(cc: NetworkParams) =
cc.config.terminalTotalDifficultyPassed = Opt.none(bool)
cc.config.terminalTotalDifficulty = Opt.none(UInt256)
cc.genesis.difficulty = 1.u256
proc ttdPassedNoTtd(cc: NetworkParams) =
cc.config.terminalTotalDifficultyPassed = Opt.some(true)
cc.config.terminalTotalDifficulty = Opt.none(UInt256)
cc.genesis.difficulty = 1.u256
proc ttdNoTtdPassed(cc: NetworkParams) =
cc.config.terminalTotalDifficultyPassed = Opt.none(bool)
cc.config.terminalTotalDifficulty = Opt.some(0.u256)
cc.genesis.difficulty = 1.u256
proc ttdAndTtdPassed(cc: NetworkParams) =
cc.config.terminalTotalDifficultyPassed = Opt.some(true)
cc.config.terminalTotalDifficulty = Opt.some(0.u256)
cc.genesis.difficulty = 1.u256
proc ttdPassedFalseNoTtd(cc: NetworkParams) =
cc.config.terminalTotalDifficultyPassed = Opt.some(false)
cc.config.terminalTotalDifficulty = Opt.none(UInt256)
cc.genesis.difficulty = 1.u256
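# A minimal sketch of the acceptance rule exercised by the "test skeleton open"
# suite inside `skeletonMain()` below: opening the skeleton succeeds when a
# terminal total difficulty is configured, or when
# `terminalTotalDifficultyPassed` is explicitly `true`. `openAllowed` is a
# hypothetical helper used for illustration only, not the real `SkeletonRef.open()`.
func openAllowed(hasTtd, ttdPassedIsTrue: bool): bool =
  hasTtd or ttdPassedIsTrue

doAssert not openAllowed(false, false)  # noTtdAndNoTtdPassed / ttdPassedFalseNoTtd -> error
doAssert openAllowed(false, true)       # ttdPassedNoTtd  -> ok
doAssert openAllowed(true, false)       # ttdNoTtdPassed  -> ok
doAssert openAllowed(true, true)        # ttdAndTtdPassed -> ok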
proc skeletonMain*() =
test1()
test2()
test3()
test4()
test5()
test6()
test7()
test8()
suite "test skeleton open":
test "skeleton open should error if both ttd and ttdPassed not set":
let env = setupEnv(extraValidation = true, noTtdAndNoTtdPassed)
let skel = SkeletonRef.new(env.chain)
let res = skel.open()
check res.isErr
test "skeleton open should ok if ttdPassed is set":
let env = setupEnv(extraValidation = true, ttdPassedNoTtd)
let skel = SkeletonRef.new(env.chain)
let res = skel.open()
check res.isOk
test "skeleton open should ok if ttd is set":
let env = setupEnv(extraValidation = true, ttdNoTtdPassed)
let skel = SkeletonRef.new(env.chain)
let res = skel.open()
check res.isOk
test "skeleton open should ok if both ttd and ttdPassed are set":
let env = setupEnv(extraValidation = true, ttdAndTtdPassed)
let skel = SkeletonRef.new(env.chain)
let res = skel.open()
check res.isOk
test "skeleton open should error if ttd not set and ttdPassed are false":
let env = setupEnv(extraValidation = true, ttdPassedFalseNoTtd)
let skel = SkeletonRef.new(env.chain)
let res = skel.open()
check res.isErr
when isMainModule:
skeletonMain()