Working prototype of beacon downloader (#1780)

* Working prototype of beacon downloader

* Use KeyedQueue for setSyncTarget queue

* Convert MergeTracker and PayloadQueue to non ref object
This commit is contained in:
andri lim 2023-09-28 13:20:12 +07:00 committed by GitHub
parent 6d132811ba
commit e8d59bc7a7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 747 additions and 150 deletions

View File

@ -121,7 +121,10 @@ proc newEngineEnv*(conf: var NimbusConf, chainFile: string, enableAuth: bool): E
sealer = SealingEngineRef.new(
chain, ctx, conf.engineSigner,
txPool, EngineStopped)
sync = BeaconSyncRef.init(node, chain, ctx.rng, conf.maxPeers, id=conf.tcpPort.int)
sync = if com.ttd().isSome:
BeaconSyncRef.init(node, chain, ctx.rng, conf.maxPeers, id=conf.tcpPort.int)
else:
BeaconSyncRef(nil)
beaconEngine = BeaconEngineRef.new(txPool, chain)
setupEthRpc(node, ctx, com, txPool, server)
@ -140,7 +143,9 @@ proc newEngineEnv*(conf: var NimbusConf, chainFile: string, enableAuth: bool): E
let client = newRpcHttpClient()
waitFor client.connect("127.0.0.1", conf.rpcPort, false)
sync.start()
if com.ttd().isSome:
sync.start()
node.startListening()
EngineEnv(
@ -155,7 +160,8 @@ proc newEngineEnv*(conf: var NimbusConf, chainFile: string, enableAuth: bool): E
proc close*(env: EngineEnv) =
waitFor env.node.closeWait()
env.sync.stop()
if not env.sync.isNil:
env.sync.stop()
waitFor env.client.close()
waitFor env.sealer.stop()
waitFor env.server.closeWait()

View File

@ -16,6 +16,10 @@ type
run* : proc(spec: BaseSpec): bool
spec* : BaseSpec
const
DefaultTimeout* = 60 # seconds
DefaultSleep* = 1
template testCond*(expr: untyped) =
if not (expr):
return false

View File

@ -256,7 +256,7 @@ let wdTestList* = [
"- Wait for sync, which include syncing a pre-Withdrawals block, and verify withdrawn account's balance\n",
run: specExecute[SyncSpec],
spec: SyncSpec(
timeoutSeconds: 50,
timeoutSeconds: 100,
wdForkHeight: 2,
wdBlockCount: 128,
wdPerBlock: MAINNET_MAX_WITHDRAWAL_COUNT_PER_BLOCK,

View File

@ -15,10 +15,16 @@ type
syncSteps*: int # Sync block chunks that will be passed as head through FCUs to the syncing client
syncShouldFail*: bool
timeoutSeconds*: int
sleep*: int
proc doSync(ws: SyncSpec, client: RpcClient, clMock: CLMocker): Future[bool] {.async.} =
let period = chronos.seconds(1)
if ws.sleep == 0:
ws.sleep = DefaultSleep
let period = chronos.seconds(ws.sleep)
var loop = 0
if ws.timeoutSeconds == 0:
ws.timeoutSeconds = DefaultTimeout
while loop < ws.timeoutSeconds:
let res = client.newPayloadV2(clMock.latestExecutedPayload.V1V2)
discard res
@ -36,7 +42,7 @@ proc doSync(ws: SyncSpec, client: RpcClient, clMock: CLMocker): Future[bool] {.a
error "Syncing client rejected valid chain"
await sleepAsync(period)
inc loop
loop += ws.sleep
return false

View File

@ -9,6 +9,7 @@
import
std/[tables, strutils, times],
../../nimbus/utils/utils,
unittest2
export
@ -45,6 +46,6 @@ proc print*(stat: SimStat, dur: Duration, name: string) =
f.write("\n")
f.write(" - " & $stat)
f.write("\n")
f.write(" - " & $dur)
f.write(" - " & dur.short)
f.write("\n")
f.close()

View File

@ -78,7 +78,7 @@ proc forkchoiceUpdated*(ben: BeaconEngineRef,
info "Forkchoice requested sync to new head",
number = header.blockNumber,
hash = blockHash
hash = blockHash.short
# Update sync header (if any)
com.syncReqNewHead(header)
@ -117,7 +117,7 @@ proc forkchoiceUpdated*(ben: BeaconEngineRef,
if db.getBlockHash(header.blockNumber, canonHash) and canonHash == blockHash:
# TODO should this be possible?
# If we allow these types of reorgs, we will do lots and lots of reorgs during sync
warn "Reorg to previous block"
debug "Reorg to previous block"
if chain.setCanonical(header) != ValidationResult.OK:
return invalidFCU(com, header)
elif chain.setCanonical(header) != ValidationResult.OK:
@ -184,7 +184,7 @@ proc forkchoiceUpdated*(ben: BeaconEngineRef,
info "Created payload for sealing",
id = id.toHex,
hash = payload.blockHash,
hash = payload.blockHash.short,
number = payload.blockNumber
return validFCU(some(id), blockHash)

View File

@ -66,7 +66,7 @@ proc newPayload*(ben: BeaconEngineRef,
# return a fake success.
if db.getBlockHeader(blockHash, header):
warn "Ignoring already known beacon payload",
number = header.blockNumber, hash = blockHash
number = header.blockNumber, hash = blockHash.short
return validStatus(blockHash)
# If the parent is missing, we - in theory - could trigger a sync, but that
@ -94,8 +94,8 @@ proc newPayload*(ben: BeaconEngineRef,
# a forkchoice update request.
warn "Ignoring payload with missing parent",
number = header.blockNumber,
hash = blockHash,
parent = header.parentHash
hash = blockHash.short,
parent = header.parentHash.short
return acceptedStatus()
# We have an existing parent, do some sanity checks to avoid the beacon client

View File

@ -24,7 +24,7 @@ export
type
BeaconEngineRef* = ref object
txPool: TxPoolRef
merge : MergeTrackerRef
merge : MergeTracker
queue : PayloadQueue
chain : ChainRef
@ -56,7 +56,7 @@ proc new*(_: type BeaconEngineRef,
chain: ChainRef): BeaconEngineRef =
BeaconEngineRef(
txPool: txPool,
merge : MergeTrackerRef.new(txPool.com.db),
merge : MergeTracker.init(txPool.com.db),
queue : PayloadQueue(),
chain : chain,
)

View File

@ -28,7 +28,7 @@ type
# Merger is an internal help structure used to track the eth1/2
# transition status. It's a common structure can be used in both full node
# and light client.
MergeTrackerRef* = ref object
MergeTracker* = object
db : CoreDbRef
status: TransitionStatus
@ -51,8 +51,8 @@ proc readStatus(db: CoreDbRef): TransitionStatus =
# Constructors
# ------------------------------------------------------------------------------
proc new*(_: type MergeTrackerRef, db: CoreDbRef): MergeTrackerRef =
MergeTrackerRef(
proc init*(_: type MergeTracker, db: CoreDbRef): MergeTracker =
MergeTracker(
db: db,
status: db.readStatus()
)
@ -61,7 +61,7 @@ proc new*(_: type MergeTrackerRef, db: CoreDbRef): MergeTrackerRef =
# Public functions, setters
# ------------------------------------------------------------------------------
proc reachTTD*(m: MergeTrackerRef) =
proc reachTTD*(m: var MergeTracker) =
## ReachTTD is called whenever the first NewHead message received
## from the consensus-layer.
if m.status.leftPoW:
@ -72,7 +72,7 @@ proc reachTTD*(m: MergeTrackerRef) =
info "Left PoW stage"
proc finalizePoS*(m: MergeTrackerRef) =
proc finalizePoS*(m: var MergeTracker) =
## FinalizePoS is called whenever the first FinalisedBlock message received
## from the consensus-layer.
@ -88,10 +88,10 @@ proc finalizePoS*(m: MergeTrackerRef) =
# Public functions, getters
# ------------------------------------------------------------------------------
func ttdReached*(m: MergeTrackerRef): bool =
func ttdReached*(m: MergeTracker): bool =
## TTDReached reports whether the chain has left the PoW stage.
m.status.leftPoW
func posFinalized*(m: MergeTrackerRef): bool =
func posFinalized*(m: MergeTracker): bool =
## PoSFinalized reports whether the chain has entered the PoS stage.
m.status.enteredPoS

View File

@ -40,7 +40,7 @@ type
hash: common.Hash256
header: common.BlockHeader
PayloadQueue* = ref object
PayloadQueue* = object
payloadQueue: SimpleQueue[MaxTrackedPayloads, PayloadItem]
headerQueue: SimpleQueue[MaxTrackedHeaders, HeaderItem]

View File

@ -12,7 +12,8 @@ import
web3/ethtypes,
web3/engine_api_types,
eth/common/eth_types_rlp,
stew/byteutils
stew/byteutils,
../utils/utils
from web3/ethtypes as web3types import nil
import eth/common/eth_types as common
@ -55,6 +56,10 @@ proc `$`*(x: Web3Quantity): string =
proc `$`*(x: Web3Address): string =
distinctBase(x).toHex
proc short*(x: Web3Hash): string =
let z = common.Hash256(data: distinctBase x)
short(z)
# ------------------------------------------------------------------------------
# Web3 defaults

View File

@ -15,7 +15,7 @@ import
chronicles,
chronos,
stew/[interval_set, sorted_set],
./beacon/[worker, worker_desc],
./beacon/[worker, worker_desc, beacon_impl],
"."/[sync_desc, sync_sched, protocol]
logScope:
@ -110,16 +110,17 @@ proc updateBeaconHeaderCB(ctx: BeaconSyncRef): SyncReqNewHeadCB =
## for the RPC module.
result = proc(h: BlockHeader) {.gcsafe, raises: [].} =
try:
debugEcho "REQUEST SYNC TO: ", h.blockNumber
debugEcho "REQUEST SYNC TO: ", h.blockHash
debug "REQUEST SYNC", number=h.blockNumber, hash=h.blockHash.short
waitFor ctx.ctx.appendSyncTarget(h)
except CatchableError as ex:
debugEcho ex.msg
error "updateBeconHeaderCB error", msg=ex.msg
proc enableRpcMagic(ctx: BeaconSyncRef) =
## Helper for `setup()`: Enable external pivot update via RPC
let com = ctx.ctx.chain.com
com.syncReqNewHead = ctx.updateBeaconHeaderCB
com.syncReqRelaxV2 = true
# We need engine_newPayload to be strict
com.syncReqRelaxV2 = false
# ------------------------------------------------------------------------------
# Public functions

View File

@ -0,0 +1,408 @@
# Nimbus
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
import
std/tables,
chronicles,
chronos,
chronos/timer,
./worker_desc,
./skeleton_main,
./skeleton_utils,
./skeleton_db,
../../utils/utils,
../protocol,
../types
logScope:
topics = "beacon-impl"
{.push gcsafe, raises: [].}
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
func makeGetBodyJob(header: BlockHeader, setHead: bool): BeaconJob =
BeaconJob(
mode: bjmGetBody,
getBodyJob: BeaconJobGetBody(
headerHash: header.blockHash,
sumHash: header.sumHash,
header: header,
setHead: setHead
)
)
func makeGetBlocksJob(number, maxResults: uint64) : BeaconJob =
BeaconJob(
mode: bjmGetBlocks,
getBlocksJob: BeaconJobGetBlocks(
number: number,
maxResults: maxResults,
)
)
func makeHeaderRequest(number: uint64, maxResults: uint64): BlocksRequest =
BlocksRequest(
startBlock: HashOrNum(isHash: false, number: number.u256),
maxResults: maxResults.uint,
skip: 0,
reverse: true
)
func makeGetBodiesJob(headers: sink seq[BlockHeader],
headerHash: sink seq[Hash256],
reqBodies: sink seq[bool]): BeaconJob =
BeaconJob(
mode: bjmGetBodies,
getBodiesJob: BeaconJobGetBodies(
headers : system.move(headers),
headerHash: system.move(headerHash),
reqBodies : system.move(reqBodies)
)
)
proc requeue(buddy: BeaconBuddyRef, job: BeaconJob) =
buddy.ctx.poolMode = true
buddy.only.requeue.add job
proc jobDone(buddy: BeaconBuddyRef) =
buddy.only.job = nil
proc mapBodiesToHeader(buddy: BeaconBuddyRef,
job: BeaconJob,
bodies: openArray[BlockBody],
reqBodies: openArray[bool]) {.raises: [CatchableError].} =
var
headers = system.move(job.getBlocksJob.headers)
map = initTable[Hash256, int]()
for i, x in bodies:
let bodyHash = sumHash(x)
map[bodyHash] = i
for i, req in reqBodies:
if not req:
if job.mode == bjmGetBlocks:
job.getBlocksJob.headers.add headers[i]
job.getBlocksJob.bodies.add BlockBody()
else:
job.getBodiesJob.headers.add headers[i]
job.getBodiesJob.bodies.add BlockBody()
continue
let bodyHash = sumHash(headers[i])
let z = map.getOrDefault(bodyHash, MissingBody)
if z == MissingBody:
# missing or invalid body, request again
buddy.requeue makeGetBodyJob(headers[i], setHead = false)
continue
if job.mode == bjmGetBlocks:
job.getBlocksJob.headers.add headers[i]
job.getBlocksJob.bodies.add bodies[z]
else:
job.getBodiesJob.headers.add headers[i]
job.getBodiesJob.bodies.add bodies[z]
proc putBlocks(ctx: BeaconCtxRef,
skel: SkeletonRef,
headers: openArray[BlockHeader],
bodies: openArray[BlockBody]) =
for i, body in bodies:
let r = skel.putBody(headers[i], body)
doAssert(r.isOk)
let res = skel.putBlocks(headers)
if res.isErr:
error "putBlocks->putBlocks", msg=res.error
return
let z = res.get
if FillCanonical in z.status:
let rr = skel.fillCanonicalChain()
if rr.isErr:
error "putBlocks->fillCanonicalChain", msg=rr.error
return
proc setupTally*(ctx: BeaconCtxRef) =
let
skel = ctx.pool.skeleton
last = skel.last
discard ctx.pool.mask.merge(1'u64, last.head)
for x in skel.subchains:
discard ctx.pool.mask.reduce(x.tail, x.head)
discard ctx.pool.pulled.merge(x.tail, x.head)
proc mergeTally*(ctx: BeaconCtxRef, least: uint64, last: uint64) =
discard ctx.pool.mask.merge(least, last)
proc reduceTally*(ctx: BeaconCtxRef, least: uint64, last: uint64) =
discard ctx.pool.mask.reduce(least, last)
discard ctx.pool.pulled.merge(least, last)
proc downloaded*(ctx: BeaconCtxRef, head: uint64): bool =
ctx.pool.pulled.covered(head, head) > 0'u64
proc headTally(ctx: BeaconCtxRef, head: uint64) =
discard ctx.pool.pulled.merge(head, head)
let rc = ctx.pool.mask.le()
if rc.isSome:
let maxPt = rc.get().maxPt
if head > maxPt:
# new head
discard ctx.pool.mask.merge(maxPt+1, head-1)
else:
# initialize
discard ctx.pool.mask.merge(1'u64, head)
discard ctx.pool.mask.reduce(head, head)
proc popFirst(x: var TargetQueue): BlockHeader =
# assume we already check len > 0
x.shift().get().data
proc addLast(x: var TargetQueue, h: BlockHeader) =
discard x.prepend(h.blockHash, h)
# ------------------------------------------------------------------------------
# Synchronizer will produce jobs for workers
# ------------------------------------------------------------------------------
proc resumeSync*(ctx: BeaconCtxRef): Future[bool] {.async.} =
let skel = ctx.pool.skeleton
if skel.len == 0:
return true
let last = skel.last
let res = skel.getHeader(last.head)
if res.isErr:
error "resumeSync->getHeader", msg=res.error
return false
let maybeHeader = res.get
if maybeHeader.isNone:
return true
let header = maybeHeader.get
let r = skel.initSync(header)
if r.isErr:
error "resumeSync->initSync", msg=r.error
return false
let z = r.get
if FillCanonical in z.status:
let rr = skel.fillCanonicalChain()
if rr.isErr:
error "resumeSync->fillCanonicalChain", msg=rr.error
return false
# collect gaps of skeleton, excluding genesis
ctx.setupTally()
return true
proc appendSyncTarget*(ctx: BeaconCtxRef, h: BlockHeader): Future[void] {.async.} =
while bmShiftTarget in ctx.pool.mode:
await sleepAsync timer.milliseconds(10)
let number = h.u64
ctx.pool.mode.incl bmAppendTarget
if not ctx.downloaded(number):
ctx.headTally(number)
ctx.pool.target.addLast(h)
ctx.pool.mode.excl bmAppendTarget
ctx.daemon = true
proc shiftSyncTarget*(ctx: BeaconCtxRef): Future[BlockHeader] {.async.} =
doAssert(ctx.pool.target.len > 0)
while bmAppendTarget in ctx.pool.mode:
await sleepAsync timer.milliseconds(10)
ctx.pool.mode.incl bmShiftTarget
let h = ctx.pool.target.popFirst()
ctx.pool.mode.excl bmShiftTarget
return h
proc setSyncTarget*(ctx: BeaconCtxRef): Future[void] {.async.} =
let header = await ctx.shiftSyncTarget()
let job = makeGetBodyJob(header, setHead = true)
ctx.pool.jobs.addLast(job)
proc fillBlocksGaps*(ctx: BeaconCtxRef, least: uint64, last: uint64) =
if last - least < MaxGetBlocks:
ctx.reduceTally(last-least, last)
let job = makeGetBlocksJob(last, last-least+1)
ctx.pool.jobs.addLast(job)
return
var
max = last
while true:
ctx.reduceTally(max-MaxGetBlocks, max)
let job = makeGetBlocksJob(max, MaxGetBlocks)
ctx.pool.jobs.addLast(job)
if ctx.pool.jobs.len > MaxJobsQueue:
return
max = max-MaxGetBlocks
if max <= MaxGetBlocks:
break
if max > 1:
ctx.reduceTally(1, max)
let job = makeGetBlocksJob(max, max)
ctx.pool.jobs.addLast(job)
# ------------------------------------------------------------------------------
# Worker will consume available jobs
# ------------------------------------------------------------------------------
proc executeGetBodyJob*(buddy: BeaconBuddyRef, job: BeaconJob): Future[void] {.async.} =
let
ctx = buddy.ctx
peer = buddy.peer
skel = ctx.pool.skeleton
let b = await peer.getBlockBodies([job.getBodyJob.headerHash])
if b.isNone:
debug "executeGetBodyJob->getBodies none"
# retry with other peer
buddy.requeue job
return
let bodies = b.get
if bodies.blocks.len == 0:
debug "executeGetBodyJob->getBodies isZero"
# retry with other peer
buddy.requeue job
return
job.getBodyJob.body = bodies.blocks[0]
let bodySumHash = sumHash(job.getBodyJob.body)
if bodySumHash != job.getBodyJob.sumHash:
# retry with other peer
debug "executeGetBodyJob->sumHash",
expect=job.getBodyJob.sumHash.short,
get=bodySumHash.short
buddy.requeue job
return
var status: set[SkeletonStatus]
if job.getBodyJob.setHead:
let res = skel.setHead(job.getBodyJob.header)
if res.isErr:
error "executeGetBodyJob->setHead", msg=res.error
# something wrong
return
status = res.get().status
else:
let res = skel.putBlocks([job.getBodyJob.header])
if res.isErr:
error "executeGetBodyJob->putBlocks", msg=res.error
return
status = res.get().status
let r = skel.putBody(job.getBodyJob.header, job.getBodyJob.body)
doAssert(r.isOk)
if FillCanonical in status:
let rr = skel.fillCanonicalChain()
if rr.isErr:
error "executeGetBodyJob->fillCanonicalChain", msg=rr.error
return
buddy.jobDone()
proc executeGetBlocksJob*(buddy: BeaconBuddyRef, job: BeaconJob): Future[void] {.async.} =
let
ctx = buddy.ctx
peer = buddy.peer
skel = ctx.pool.skeleton
request = makeHeaderRequest(job.getBlocksJob.number, job.getBlocksJob.maxResults)
let res = await peer.getBlockHeaders(request)
if res.isNone:
# retry with other peer
error "executeGetBlocksJob->getBlockHeaders none"
buddy.requeue job
return
job.getBlocksJob.headers = res.get().headers
let numHeaders = job.getBlocksJob.headers.len
var
headerHashes = newSeqOfCap[Hash256](numHeaders)
reqBodies = newSeqOfCap[bool](numHeaders)
numRequest = 0
for i, x in job.getBlocksJob.headers:
if not x.hasBody:
reqBodies.add false
continue
reqBodies.add true
headerHashes.add x.blockHash
inc numRequest
if numRequest == 0:
# all bodies are empty
for _ in 0..<numHeaders:
job.getBlocksJob.bodies.add BlockBody()
else:
let b = await peer.getBlockBodies(headerHashes)
if b.isNone:
debug "executeGetBlocksJob->getBodies none"
# retry with other peer
buddy.requeue makeGetBodiesJob(job.getBlocksJob.headers,
headerHashes, reqBodies)
return
buddy.mapBodiesToHeader(job, b.get().blocks, reqBodies)
ctx.putBlocks(skel, job.getBlocksJob.headers, job.getBlocksJob.bodies)
buddy.jobDone()
proc executeGetBodiesJob*(buddy: BeaconBuddyRef, job: BeaconJob): Future[void] {.async.} =
let
ctx = buddy.ctx
peer = buddy.peer
skel = ctx.pool.skeleton
let b = await peer.getBlockBodies(job.getBodiesJob.headerHash)
if b.isNone:
debug "executeGetBodiesJob->getBodies none"
# retry with other peer
buddy.requeue job
return
buddy.mapBodiesToHeader(job, b.get().blocks, job.getBodiesJob.reqBodies)
ctx.putBlocks(skel, job.getBodiesJob.headers, job.getBodiesJob.bodies)
buddy.jobDone()
proc executeJob*(buddy: BeaconBuddyRef, job: BeaconJob): Future[void] {.async.} =
let
ctx = buddy.ctx
try:
case job.mode
of bjmGetBody:
await executeGetBodyJob(buddy, job)
of bjmGetBlocks:
await executeGetBlocksJob(buddy, job)
of bjmGetBodies:
await executeGetBodiesJob(buddy, job)
except TransportError as ex:
error "executeJob->TransportError", msg=ex.msg
except CatchableError as ex:
error "executeJob->OtherError", msg=ex.msg
# retry with other peer
buddy.requeue job

View File

@ -11,44 +11,17 @@
import
./skeleton_desc,
./skeleton_utils,
./skeleton_db
./skeleton_db,
../../utils/utils
{.push gcsafe, raises: [].}
logScope:
topics = "skeleton"
proc isLinked*(sk: SkeletonRef): Result[bool, string] =
## Returns true if the skeleton chain is linked to canonical
if sk.isEmpty:
return ok(false)
let sc = sk.last
# if its genesis we are linked
if sc.tail == 0:
return ok(true)
let head = sk.blockHeight
if sc.tail > head + 1:
return ok(false)
let number = sc.tail - 1
let maybeHeader = sk.getHeader(number).valueOr:
return err("isLinked: " & error)
# The above sc.tail > head - 1
# assure maybeHeader.isSome
doAssert maybeHeader.isSome
let nextHeader = maybeHeader.get
let linked = sc.next == nextHeader.blockHash
if linked and sk.len > 1:
# Remove all other subchains as no more relevant
sk.removeAllButLast()
sk.writeProgress()
return ok(linked)
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
proc fastForwardHead(sk: SkeletonRef, last: Segment, target: uint64): Result[void, string] =
# Try fast forwarding the chain head to the number
@ -84,6 +57,76 @@ proc fastForwardHead(sk: SkeletonRef, last: Segment, target: uint64): Result[voi
`from`=head, to=last.head, tail=last.tail
ok()
proc backStep(sk: SkeletonRef): Result[uint64, string] =
if sk.conf.fillCanonicalBackStep <= 0:
return ok(0)
let sc = sk.last
var
newTail = sc.tail
maybeTailHeader: Opt[BlockHeader]
while true:
newTail = newTail + sk.conf.fillCanonicalBackStep
maybeTailHeader = sk.getHeader(newTail, true).valueOr:
return err(error)
if maybeTailHeader.isSome or newTail > sc.head: break
if newTail > sc.head:
newTail = sc.head
maybeTailHeader = sk.getHeader(newTail, true).valueOr:
return err(error)
if maybeTailHeader.isSome and newTail > 0:
debug "Backstepped skeleton", head=sc.head, tail=newTail
let tailHeader = maybeTailHeader.get
sk.last.tail = tailHeader.u64
sk.last.next = tailHeader.parentHash
sk.writeProgress()
return ok(newTail)
# we need a new head, emptying the subchains
sk.clear()
sk.writeProgress()
debug "Couldn't backStep subchain 0, dropping subchains for new head signal"
return ok(0)
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc isLinked*(sk: SkeletonRef): Result[bool, string] =
## Returns true if the skeleton chain is linked to canonical
if sk.isEmpty:
return ok(false)
let sc = sk.last
# if its genesis we are linked
if sc.tail == 0:
return ok(true)
let head = sk.blockHeight
if sc.tail > head + 1:
return ok(false)
let number = sc.tail - 1
let maybeHeader = sk.getHeader(number).valueOr:
return err("isLinked: " & error)
# The above sc.tail > head - 1
# assure maybeHeader.isSome
doAssert maybeHeader.isSome
let nextHeader = maybeHeader.get
let linked = sc.next == nextHeader.blockHash
if linked and sk.len > 1:
# Remove all other subchains as no more relevant
sk.removeAllButLast()
sk.writeProgress()
return ok(linked)
proc trySubChainsMerge*(sk: SkeletonRef): Result[bool, string] =
var
merged = false
@ -146,6 +189,13 @@ proc putBlocks*(sk: SkeletonRef, headers: openArray[BlockHeader]):
if sk.len == 0:
return err("no subchain set")
# best place to debug beacon downloader
when false:
var numbers: seq[uint64]
for header in headers:
numbers.add header.u64
debugEcho numbers
for header in headers:
let
number = header.u64
@ -193,18 +243,18 @@ proc putBlocks*(sk: SkeletonRef, headers: openArray[BlockHeader]):
sk.writeProgress()
# Print a progress report making the UX a bit nicer
if getTime() - sk.logged > STATUS_LOG_INTERVAL:
var left = sk.last.tail - 1 - sk.blockHeight
if sk.progress.linked: left = 0
if left > 0:
sk.logged = getTime()
if sk.pulled == 0:
info "Beacon sync starting", left=left
else:
let sinceStarted = getTime() - sk.started
let eta = (sinceStarted div sk.pulled.int64) * left.int64
info "Syncing beacon headers",
downloaded=sk.pulled, left=left, eta=eta
#if getTime() - sk.logged > STATUS_LOG_INTERVAL:
# var left = sk.last.tail - 1 - sk.blockHeight
# if sk.progress.linked: left = 0
# if left > 0:
# sk.logged = getTime()
# if sk.pulled == 0:
# info "Beacon sync starting", left=left
# else:
# let sinceStarted = getTime() - sk.started
# let eta = (sinceStarted div sk.pulled.int64) * left.int64
# info "Syncing beacon headers",
# downloaded=sk.pulled, left=left, eta=eta.short
sk.progress.linked = sk.isLinked().valueOr:
return err(error)
@ -218,40 +268,6 @@ proc putBlocks*(sk: SkeletonRef, headers: openArray[BlockHeader]):
res.status.incl SyncMerged
ok(res)
proc backStep(sk: SkeletonRef): Result[uint64, string] =
if sk.conf.fillCanonicalBackStep <= 0:
return ok(0)
let sc = sk.last
var
newTail = sc.tail
maybeTailHeader: Opt[BlockHeader]
while true:
newTail = newTail + sk.conf.fillCanonicalBackStep
maybeTailHeader = sk.getHeader(newTail, true).valueOr:
return err(error)
if maybeTailHeader.isSome or newTail > sc.head: break
if newTail > sc.head:
newTail = sc.head
maybeTailHeader = sk.getHeader(newTail, true).valueOr:
return err(error)
if maybeTailHeader.isSome and newTail > 0:
debug "Backstepped skeleton", head=sc.head, tail=newTail
let tailHeader = maybeTailHeader.get
sk.last.tail = tailHeader.u64
sk.last.next = tailHeader.parentHash
sk.writeProgress()
return ok(newTail)
# we need a new head, emptying the subchains
sk.clear()
sk.writeProgress()
debug "Couldn't backStep subchain 0, dropping subchains for new head signal"
return ok(0)
# Inserts skeleton blocks into canonical chain and runs execution.
proc fillCanonicalChain*(sk: SkeletonRef): Result[void, string] =
if sk.filling: return ok()
@ -324,11 +340,11 @@ proc fillCanonicalChain*(sk: SkeletonRef): Result[void, string] =
parentHash=header.parentHash.short
# Lets log some parent by number and parent by hash, that may help to understand whats going on
let parent = sk.getHeader(number - 1).valueOr:
let parent {.used.} = sk.getHeader(number - 1).valueOr:
return err(error)
debug "ParentByNumber", number=parent.numberStr, hash=parent.blockHashStr
let parentWithHash = sk.getHeader(header.parentHash).valueOr:
let parentWithHash {.used.} = sk.getHeader(header.parentHash).valueOr:
return err(error)
debug "parentByHash",
@ -346,7 +362,7 @@ proc fillCanonicalChain*(sk: SkeletonRef): Result[void, string] =
# it will be fetched from the chain without any issues
sk.deleteHeaderAndBody(header)
if sk.fillLogIndex >= 20:
info "Skeleton canonical chain fill status",
debug "Skeleton canonical chain fill status",
canonicalHead,
chainHead=sk.blockHeight,
subchainHead=subchain.head

View File

@ -13,9 +13,13 @@
import
chronicles,
chronos,
chronos/timer,
eth/p2p,
".."/[protocol, sync_desc],
./worker_desc
./worker_desc,
./skeleton_main,
./skeleton_utils,
./beacon_impl
logScope:
topics = "beacon-buddy"
@ -24,41 +28,27 @@ const
extraTraceMessages = false # or true
## Enabled additional logging noise
FirstPivotSeenTimeout = 3.minutes
## Turn on relaxed pivot negotiation after some waiting time when there
## was a `peer` seen but was rejected. This covers a rare event. Typically
## useless peers do not appear ready for negotiation.
FirstPivotAcceptedTimeout = 50.seconds
## Turn on relaxed pivot negotiation after some waiting time when there
## was a `peer` accepted but no second one yet.
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
proc pp(n: BlockNumber): string =
## Dedicated pretty printer (`$` is defined elsewhere using `UInt256`)
if n == high(BlockNumber): "high" else:"#" & $n
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
# ------------------------------------------------------------------------------
# Public start/stop and admin functions
# ------------------------------------------------------------------------------
proc setup*(ctx: BeaconCtxRef): bool =
## Global set up
#ctx.pool.pivot = BestPivotCtxRef.init(ctx.pool.rng)
ctx.pool.target.init()
ctx.pool.mask = HeaderInterval.init()
ctx.pool.pulled = HeaderInterval.init()
ctx.pool.skeleton = SkeletonRef.new(ctx.chain)
let res = ctx.pool.skeleton.open()
if res.isErr:
error "Cannot open beacon skeleton", msg=res.error
return false
ctx.pool.mode.incl bmResumeSync
true
proc release*(ctx: BeaconCtxRef) =
## Global clean up
#ctx.pool.pivot = nil
discard
proc start*(buddy: BeaconBuddyRef): bool =
## Initialise worker peer
@ -88,12 +78,37 @@ proc runDaemon*(ctx: BeaconCtxRef) {.async.} =
## `runSingle()`, or `runMulti()` functions.
##
debugEcho "RUNDAEMON: ", ctx.pool.id
ctx.daemon = false
debug "RUNDAEMON", id=ctx.pool.id
# Just wake up after long sleep (e.g. client terminated)
if bmResumeSync in ctx.pool.mode:
let ok = await ctx.resumeSync()
ctx.pool.mode.excl bmResumeSync
# We get order from engine API
if ctx.pool.target.len > 0:
await ctx.setSyncTarget()
# Distributing jobs of filling gaps to peers
let mask = ctx.pool.mask
for x in mask.decreasing:
ctx.fillBlocksGaps(x.minPt, x.maxPt)
# Tell the `runPool` to grab job for each peer
if ctx.pool.jobs.len > 0:
ctx.poolMode = true
# Rerun this function next iteration
# if there are more new sync target
ctx.daemon = ctx.pool.target.len > 0
# Without waiting, this function repeats every 50ms (as set with the constant
# `sync_sched.execLoopTimeElapsedMin`.) Larger waiting time cleans up logging.
await sleepAsync 300.milliseconds
var sleepDuration = timer.milliseconds(300)
if ctx.pool.jobs.len == 0 and ctx.pool.target.len == 0:
sleepDuration = timer.seconds(5)
await sleepAsync sleepDuration
proc runSingle*(buddy: BeaconBuddyRef) {.async.} =
@ -110,16 +125,15 @@ proc runSingle*(buddy: BeaconBuddyRef) {.async.} =
##
let
ctx = buddy.ctx
peer {.used.} = buddy.peer
debugEcho "RUNSINGLE: ", ctx.pool.id
debug "RUNSINGLE", id=ctx.pool.id
if buddy.ctrl.stopped:
when extraTraceMessages:
trace "Single mode stopped", peer, pivotState=ctx.pool.pivotState
return # done with this buddy
var napping = 2.seconds
var napping = timer.seconds(2)
when extraTraceMessages:
trace "Single mode end", peer, napping
@ -127,6 +141,10 @@ proc runSingle*(buddy: BeaconBuddyRef) {.async.} =
# `sync_sched.execLoopTimeElapsedMin`.)
await sleepAsync napping
# request new jobs, if available
if ctx.pool.jobs.len == 0:
ctx.daemon = true
proc runPool*(buddy: BeaconBuddyRef; last: bool; laps: int): bool =
## Once started, the function `runPool()` is called for all worker peers in
@ -147,9 +165,25 @@ proc runPool*(buddy: BeaconBuddyRef; last: bool; laps: int): bool =
let
ctx = buddy.ctx
debugEcho "RUNPOOL: ", ctx.pool.id
debug "RUNPOOL", id=ctx.pool.id
# If a peer cannot finish it's job,
# we will put it back into circulation.
# A peer can also spawn more jobs.
if buddy.only.requeue.len > 0:
for job in buddy.only.requeue:
ctx.pool.jobs.addLast(job)
buddy.only.requeue.setLen(0)
buddy.only.job = nil
# Take distributed jobs for each peer
if ctx.pool.jobs.len > 0 and buddy.only.job.isNil:
buddy.only.job = ctx.pool.jobs.popFirst()
buddy.ctrl.multiOk = true
# If there is no more jobs, stop
ctx.pool.jobs.len == 0
true # Stop after running once regardless of peer
proc runMulti*(buddy: BeaconBuddyRef) {.async.} =
## This peer worker is invoked if the `buddy.ctrl.multiOk` flag is set
@ -159,12 +193,22 @@ proc runMulti*(buddy: BeaconBuddyRef) {.async.} =
let
ctx = buddy.ctx
debugEcho "RUNMULTI: ", ctx.pool.id
debug "RUNMULTI", id=ctx.pool.id
# If each of peers get their job,
# execute it until failure or success
# It is also possible to spawn more jobs
if buddy.only.job.isNil.not:
await buddy.executeJob(buddy.only.job)
# Update persistent database
#while not buddy.ctrl.stopped:
# Allow thread switch as `persistBlocks()` might be slow
await sleepAsync(10.milliseconds)
await sleepAsync timer.milliseconds(10)
# request new jobs, if available
if ctx.pool.jobs.len == 0:
ctx.daemon = true
# ------------------------------------------------------------------------------
# End

View File

@ -11,18 +11,77 @@
{.push raises:[].}
import
std/deques,
stew/interval_set,
stew/keyed_queue,
eth/p2p,
chronos,
../sync_desc
../sync_desc,
./skeleton_desc
export
deques,
interval_set,
keyed_queue
type
BeaconMode* = enum
bmNone ## Do nothing
bmResumeSync ## Resume sync if the client stopped
bmAppendTarget ## Put new sync target into queue
bmShiftTarget ## Get sync target from queue
BeaconJobGetBody* = object
headerHash*: Hash256 ## request body using this hash
sumHash* : Hash256 ## compare downloaded body to this hash
header* : BlockHeader ## downloaded header
body* : BlockBody ## downloaded body
setHead* : bool ## true: setHead, false: putBlocks
BeaconJobGetBlocks* = object
number* : uint64 ## starting number of blocks
maxResults*: uint64 ## number of blocks we want to download
headers* : seq[BlockHeader] ## downloaded headers
bodies* : seq[BlockBody] ## downloaded bodies
BeaconJobGetBodies* = object
headers* : seq[BlockHeader] ## downloaded headers
bodies* : seq[BlockBody] ## downloaded bodies
headerHash*: seq[Hash256] ## request to download bodies using this hashes
reqBodies* : seq[bool] ## skip downloading body if header has no body
BeaconJobMode* = enum
bjmGetBody ## when setSyncTarget done, download the body
bjmGetBlocks ## download blocks to fill skeleton gaps
bjmGetBodies ## if bjmGetBlocks failed to download bodies, give it to other peer
BeaconJob* = ref object
case mode*: BeaconJobMode
of bjmGetBody:
getBodyJob*: BeaconJobGetBody
of bjmGetBlocks:
getBlocksJob*: BeaconJobGetBlocks
of bjmGetBodies:
getBodiesJob*: BeaconJobGetBodies
BeaconBuddyData* = object
## Local descriptor data extension
job* : BeaconJob
requeue*: seq[BeaconJob]
TargetQueue* = KeyedQueue[Hash256, BlockHeader]
HeaderInterval* = IntervalSetRef[uint64, uint64]
BeaconCtxData* = object
## Globally shared data extension
rng*: ref HmacDrbgContext ## Random generator, pre-initialised
id*: int
rng* : ref HmacDrbgContext ## Random generator, pre-initialised
id* : int ## Instance id, for debugging purpose
skeleton*: SkeletonRef ## Core algorithm, tracking both canonical and side chain
mode* : set[BeaconMode] ## Do one thing at a time
target* : TargetQueue ## Sync target list
jobs* : Deque[BeaconJob] ## Each buddy can get a job from here
mask* : HeaderInterval ## Skeleton gaps need to be downloaded
pulled* : HeaderInterval ## Downloaded skeleton blocks
BeaconBuddyRef* = BuddyRef[BeaconCtxData,BeaconBuddyData]
## Extended worker peer descriptor
@ -30,4 +89,9 @@ type
BeaconCtxRef* = CtxRef[BeaconCtxData]
## Extended global descriptor
const
MaxGetBlocks* = 64
MaxJobsQueue* = 32
MissingBody* = -1
# End

View File

@ -14,7 +14,8 @@ import
stew/byteutils,
../vm_state,
../vm_types,
../db/accounts_cache
../db/accounts_cache,
./utils
proc `$`(hash: Hash256): string =
hash.data.toHex
@ -143,3 +144,29 @@ proc debug*(tx: Transaction): string =
result.add "V : " & $tx.V & "\n"
result.add "R : " & $tx.R & "\n"
result.add "S : " & $tx.S & "\n"
proc debugSum*(h: BlockHeader): string =
result.add "txRoot : " & $h.txRoot & "\n"
result.add "ommersHash : " & $h.ommersHash & "\n"
if h.withdrawalsRoot.isSome:
result.add "withdrawalsRoot: " & $h.withdrawalsRoot.get() & "\n"
result.add "sumHash : " & $sumHash(h) & "\n"
proc debugSum*(body: BlockBody): string =
let ommersHash = keccakHash(rlp.encode(body.uncles))
let txRoot = calcTxRoot(body.transactions)
let wdRoot = if body.withdrawals.isSome:
calcWithdrawalsRoot(body.withdrawals.get)
else: EMPTY_ROOT_HASH
let numwd = if body.withdrawals.isSome:
$body.withdrawals.get().len
else:
"none"
result.add "txRoot : " & $txRoot & "\n"
result.add "ommersHash : " & $ommersHash & "\n"
if body.withdrawals.isSome:
result.add "wdRoot : " & $wdRoot & "\n"
result.add "num tx : " & $body.transactions.len & "\n"
result.add "num uncles : " & $body.uncles.len & "\n"
result.add "num wd : " & numwd & "\n"
result.add "sumHash : " & $sumHash(body) & "\n"

View File

@ -1,5 +1,5 @@
import
std/math,
std/[math, times, strutils],
eth/[rlp, common/eth_types_rlp],
stew/byteutils,
nimcrypto,
@ -47,6 +47,11 @@ proc sumHash*(header: BlockHeader): Hash256 =
else: EMPTY_ROOT_HASH
sumHash(header.txRoot, header.ommersHash, wdRoot)
func hasBody*(h: BlockHeader): bool =
h.txRoot != EMPTY_ROOT_HASH or
h.ommersHash != EMPTY_UNCLE_HASH or
h.withdrawalsRoot.get(EMPTY_ROOT_HASH) != EMPTY_ROOT_HASH
func generateAddress*(address: EthAddress, nonce: AccountNonce): EthAddress =
result[0..19] = keccakHash(rlp.encodeList(address, nonce)).data.toOpenArray(12, 31)
@ -89,6 +94,16 @@ proc short*(h: Hash256): string =
bytes[^3..^1] = h.data[^3..^1]
bytes.toHex
func short*(x: Duration): string =
let parts = x.toParts
if parts[Hours] > 0:
result.add $parts[Hours]
result.add ':'
result.add intToStr(parts[Minutes].int, 2)
result.add ':'
result.add intToStr(parts[Seconds].int, 2)
proc decompose*(rlp: var Rlp,
header: var BlockHeader,
body: var BlockBody) {.gcsafe, raises: [RlpError].} =