From e8d59bc7a7a8ce43d890bbcc52c8f96f7049bbb2 Mon Sep 17 00:00:00 2001 From: andri lim Date: Thu, 28 Sep 2023 13:20:12 +0700 Subject: [PATCH] Working prototype of beacon downloader (#1780) * Working prototype of beacon downloader * Use KeyedQueue for setSyncTarget queue * Convert MergeTracker and PayloadQueue to non ref object --- .../nodocker/engine/engine_env.nim | 12 +- hive_integration/nodocker/engine/types.nim | 4 + .../nodocker/engine/withdrawal_tests.nim | 2 +- .../engine/withdrawals/wd_sync_spec.nim | 10 +- hive_integration/nodocker/sim_utils.nim | 3 +- nimbus/beacon/api_handler/api_forkchoice.nim | 6 +- nimbus/beacon/api_handler/api_newpayload.nim | 6 +- nimbus/beacon/beacon_engine.nim | 4 +- nimbus/beacon/merge_tracker.nim | 14 +- nimbus/beacon/payload_queue.nim | 2 +- nimbus/beacon/web3_eth_conv.nim | 7 +- nimbus/sync/beacon.nim | 11 +- nimbus/sync/beacon/beacon_impl.nim | 408 ++++++++++++++++++ nimbus/sync/beacon/skeleton_algo.nim | 178 ++++---- nimbus/sync/beacon/worker.nim | 114 +++-- nimbus/sync/beacon/worker_desc.nim | 70 ++- nimbus/utils/debug.nim | 29 +- nimbus/utils/utils.nim | 17 +- 18 files changed, 747 insertions(+), 150 deletions(-) create mode 100644 nimbus/sync/beacon/beacon_impl.nim diff --git a/hive_integration/nodocker/engine/engine_env.nim b/hive_integration/nodocker/engine/engine_env.nim index 89660bcfc..827232cd7 100644 --- a/hive_integration/nodocker/engine/engine_env.nim +++ b/hive_integration/nodocker/engine/engine_env.nim @@ -121,7 +121,10 @@ proc newEngineEnv*(conf: var NimbusConf, chainFile: string, enableAuth: bool): E sealer = SealingEngineRef.new( chain, ctx, conf.engineSigner, txPool, EngineStopped) - sync = BeaconSyncRef.init(node, chain, ctx.rng, conf.maxPeers, id=conf.tcpPort.int) + sync = if com.ttd().isSome: + BeaconSyncRef.init(node, chain, ctx.rng, conf.maxPeers, id=conf.tcpPort.int) + else: + BeaconSyncRef(nil) beaconEngine = BeaconEngineRef.new(txPool, chain) setupEthRpc(node, ctx, com, txPool, server) @@ -140,7 +143,9 @@ proc newEngineEnv*(conf: var NimbusConf, chainFile: string, enableAuth: bool): E let client = newRpcHttpClient() waitFor client.connect("127.0.0.1", conf.rpcPort, false) - sync.start() + if com.ttd().isSome: + sync.start() + node.startListening() EngineEnv( @@ -155,7 +160,8 @@ proc newEngineEnv*(conf: var NimbusConf, chainFile: string, enableAuth: bool): E proc close*(env: EngineEnv) = waitFor env.node.closeWait() - env.sync.stop() + if not env.sync.isNil: + env.sync.stop() waitFor env.client.close() waitFor env.sealer.stop() waitFor env.server.closeWait() diff --git a/hive_integration/nodocker/engine/types.nim b/hive_integration/nodocker/engine/types.nim index 3e55445e5..62500ea4d 100644 --- a/hive_integration/nodocker/engine/types.nim +++ b/hive_integration/nodocker/engine/types.nim @@ -16,6 +16,10 @@ type run* : proc(spec: BaseSpec): bool spec* : BaseSpec +const + DefaultTimeout* = 60 # seconds + DefaultSleep* = 1 + template testCond*(expr: untyped) = if not (expr): return false diff --git a/hive_integration/nodocker/engine/withdrawal_tests.nim b/hive_integration/nodocker/engine/withdrawal_tests.nim index 5ba479515..7a3ffdf30 100644 --- a/hive_integration/nodocker/engine/withdrawal_tests.nim +++ b/hive_integration/nodocker/engine/withdrawal_tests.nim @@ -256,7 +256,7 @@ let wdTestList* = [ "- Wait for sync, which include syncing a pre-Withdrawals block, and verify withdrawn account's balance\n", run: specExecute[SyncSpec], spec: SyncSpec( - timeoutSeconds: 50, + timeoutSeconds: 100, wdForkHeight: 2, wdBlockCount: 128, 
wdPerBlock: MAINNET_MAX_WITHDRAWAL_COUNT_PER_BLOCK, diff --git a/hive_integration/nodocker/engine/withdrawals/wd_sync_spec.nim b/hive_integration/nodocker/engine/withdrawals/wd_sync_spec.nim index 67c57b5f4..d5f2c9204 100644 --- a/hive_integration/nodocker/engine/withdrawals/wd_sync_spec.nim +++ b/hive_integration/nodocker/engine/withdrawals/wd_sync_spec.nim @@ -15,10 +15,16 @@ type syncSteps*: int # Sync block chunks that will be passed as head through FCUs to the syncing client syncShouldFail*: bool timeoutSeconds*: int + sleep*: int proc doSync(ws: SyncSpec, client: RpcClient, clMock: CLMocker): Future[bool] {.async.} = - let period = chronos.seconds(1) + if ws.sleep == 0: + ws.sleep = DefaultSleep + let period = chronos.seconds(ws.sleep) var loop = 0 + if ws.timeoutSeconds == 0: + ws.timeoutSeconds = DefaultTimeout + while loop < ws.timeoutSeconds: let res = client.newPayloadV2(clMock.latestExecutedPayload.V1V2) discard res @@ -36,7 +42,7 @@ proc doSync(ws: SyncSpec, client: RpcClient, clMock: CLMocker): Future[bool] {.a error "Syncing client rejected valid chain" await sleepAsync(period) - inc loop + loop += ws.sleep return false diff --git a/hive_integration/nodocker/sim_utils.nim b/hive_integration/nodocker/sim_utils.nim index cf05157ff..971651850 100644 --- a/hive_integration/nodocker/sim_utils.nim +++ b/hive_integration/nodocker/sim_utils.nim @@ -9,6 +9,7 @@ import std/[tables, strutils, times], + ../../nimbus/utils/utils, unittest2 export @@ -45,6 +46,6 @@ proc print*(stat: SimStat, dur: Duration, name: string) = f.write("\n") f.write(" - " & $stat) f.write("\n") - f.write(" - " & $dur) + f.write(" - " & dur.short) f.write("\n") f.close() diff --git a/nimbus/beacon/api_handler/api_forkchoice.nim b/nimbus/beacon/api_handler/api_forkchoice.nim index 032c88b12..3b521b1e0 100644 --- a/nimbus/beacon/api_handler/api_forkchoice.nim +++ b/nimbus/beacon/api_handler/api_forkchoice.nim @@ -78,7 +78,7 @@ proc forkchoiceUpdated*(ben: BeaconEngineRef, info "Forkchoice requested sync to new head", number = header.blockNumber, - hash = blockHash + hash = blockHash.short # Update sync header (if any) com.syncReqNewHead(header) @@ -117,7 +117,7 @@ proc forkchoiceUpdated*(ben: BeaconEngineRef, if db.getBlockHash(header.blockNumber, canonHash) and canonHash == blockHash: # TODO should this be possible? # If we allow these types of reorgs, we will do lots and lots of reorgs during sync - warn "Reorg to previous block" + debug "Reorg to previous block" if chain.setCanonical(header) != ValidationResult.OK: return invalidFCU(com, header) elif chain.setCanonical(header) != ValidationResult.OK: @@ -184,7 +184,7 @@ proc forkchoiceUpdated*(ben: BeaconEngineRef, info "Created payload for sealing", id = id.toHex, - hash = payload.blockHash, + hash = payload.blockHash.short, number = payload.blockNumber return validFCU(some(id), blockHash) diff --git a/nimbus/beacon/api_handler/api_newpayload.nim b/nimbus/beacon/api_handler/api_newpayload.nim index b5afa3f27..b16905056 100644 --- a/nimbus/beacon/api_handler/api_newpayload.nim +++ b/nimbus/beacon/api_handler/api_newpayload.nim @@ -66,7 +66,7 @@ proc newPayload*(ben: BeaconEngineRef, # return a fake success. 
if db.getBlockHeader(blockHash, header): warn "Ignoring already known beacon payload", - number = header.blockNumber, hash = blockHash + number = header.blockNumber, hash = blockHash.short return validStatus(blockHash) # If the parent is missing, we - in theory - could trigger a sync, but that @@ -94,8 +94,8 @@ proc newPayload*(ben: BeaconEngineRef, # a forkchoice update request. warn "Ignoring payload with missing parent", number = header.blockNumber, - hash = blockHash, - parent = header.parentHash + hash = blockHash.short, + parent = header.parentHash.short return acceptedStatus() # We have an existing parent, do some sanity checks to avoid the beacon client diff --git a/nimbus/beacon/beacon_engine.nim b/nimbus/beacon/beacon_engine.nim index 07a981dbe..b9f4ca04c 100644 --- a/nimbus/beacon/beacon_engine.nim +++ b/nimbus/beacon/beacon_engine.nim @@ -24,7 +24,7 @@ export type BeaconEngineRef* = ref object txPool: TxPoolRef - merge : MergeTrackerRef + merge : MergeTracker queue : PayloadQueue chain : ChainRef @@ -56,7 +56,7 @@ proc new*(_: type BeaconEngineRef, chain: ChainRef): BeaconEngineRef = BeaconEngineRef( txPool: txPool, - merge : MergeTrackerRef.new(txPool.com.db), + merge : MergeTracker.init(txPool.com.db), queue : PayloadQueue(), chain : chain, ) diff --git a/nimbus/beacon/merge_tracker.nim b/nimbus/beacon/merge_tracker.nim index cadbf214e..8a1a891a6 100644 --- a/nimbus/beacon/merge_tracker.nim +++ b/nimbus/beacon/merge_tracker.nim @@ -28,7 +28,7 @@ type # Merger is an internal help structure used to track the eth1/2 # transition status. It's a common structure can be used in both full node # and light client. - MergeTrackerRef* = ref object + MergeTracker* = object db : CoreDbRef status: TransitionStatus @@ -51,8 +51,8 @@ proc readStatus(db: CoreDbRef): TransitionStatus = # Constructors # ------------------------------------------------------------------------------ -proc new*(_: type MergeTrackerRef, db: CoreDbRef): MergeTrackerRef = - MergeTrackerRef( +proc init*(_: type MergeTracker, db: CoreDbRef): MergeTracker = + MergeTracker( db: db, status: db.readStatus() ) @@ -61,7 +61,7 @@ proc new*(_: type MergeTrackerRef, db: CoreDbRef): MergeTrackerRef = # Public functions, setters # ------------------------------------------------------------------------------ -proc reachTTD*(m: MergeTrackerRef) = +proc reachTTD*(m: var MergeTracker) = ## ReachTTD is called whenever the first NewHead message received ## from the consensus-layer. if m.status.leftPoW: @@ -72,7 +72,7 @@ proc reachTTD*(m: MergeTrackerRef) = info "Left PoW stage" -proc finalizePoS*(m: MergeTrackerRef) = +proc finalizePoS*(m: var MergeTracker) = ## FinalizePoS is called whenever the first FinalisedBlock message received ## from the consensus-layer. @@ -88,10 +88,10 @@ proc finalizePoS*(m: MergeTrackerRef) = # Public functions, getters # ------------------------------------------------------------------------------ -func ttdReached*(m: MergeTrackerRef): bool = +func ttdReached*(m: MergeTracker): bool = ## TTDReached reports whether the chain has left the PoW stage. m.status.leftPoW -func posFinalized*(m: MergeTrackerRef): bool = +func posFinalized*(m: MergeTracker): bool = ## PoSFinalized reports whether the chain has entered the PoS stage. 
m.status.enteredPoS
diff --git a/nimbus/beacon/payload_queue.nim b/nimbus/beacon/payload_queue.nim
index 02c1b5d18..2f08c1809 100644
--- a/nimbus/beacon/payload_queue.nim
+++ b/nimbus/beacon/payload_queue.nim
@@ -40,7 +40,7 @@ type
     hash: common.Hash256
     header: common.BlockHeader
 
-  PayloadQueue* = ref object
+  PayloadQueue* = object
     payloadQueue: SimpleQueue[MaxTrackedPayloads, PayloadItem]
     headerQueue: SimpleQueue[MaxTrackedHeaders, HeaderItem]
 
diff --git a/nimbus/beacon/web3_eth_conv.nim b/nimbus/beacon/web3_eth_conv.nim
index e3bd20fd0..104f4dcc6 100644
--- a/nimbus/beacon/web3_eth_conv.nim
+++ b/nimbus/beacon/web3_eth_conv.nim
@@ -12,7 +12,8 @@ import
   web3/ethtypes,
   web3/engine_api_types,
   eth/common/eth_types_rlp,
-  stew/byteutils
+  stew/byteutils,
+  ../utils/utils
 
 from web3/ethtypes as web3types import nil
 import eth/common/eth_types as common
@@ -55,6 +56,10 @@ proc `$`*(x: Web3Quantity): string =
 
 proc `$`*(x: Web3Address): string =
   distinctBase(x).toHex
+
+proc short*(x: Web3Hash): string =
+  let z = common.Hash256(data: distinctBase x)
+  short(z)
 
 # ------------------------------------------------------------------------------
 # Web3 defaults
diff --git a/nimbus/sync/beacon.nim b/nimbus/sync/beacon.nim
index c60a4a5a4..5d14f9058 100644
--- a/nimbus/sync/beacon.nim
+++ b/nimbus/sync/beacon.nim
@@ -15,7 +15,7 @@ import
   chronicles,
   chronos,
   stew/[interval_set, sorted_set],
-  ./beacon/[worker, worker_desc],
+  ./beacon/[worker, worker_desc, beacon_impl],
   "."/[sync_desc, sync_sched, protocol]
 
 logScope:
   topics = "beacon"
 
@@ -110,16 +110,17 @@ proc updateBeaconHeaderCB(ctx: BeaconSyncRef): SyncReqNewHeadCB =
   ## for the RPC module.
   result = proc(h: BlockHeader) {.gcsafe, raises: [].} =
     try:
-      debugEcho "REQUEST SYNC TO: ", h.blockNumber
-      debugEcho "REQUEST SYNC TO: ", h.blockHash
+      debug "REQUEST SYNC", number=h.blockNumber, hash=h.blockHash.short
+      waitFor ctx.ctx.appendSyncTarget(h)
     except CatchableError as ex:
-      debugEcho ex.msg
+      error "updateBeaconHeaderCB error", msg=ex.msg
 
 proc enableRpcMagic(ctx: BeaconSyncRef) =
   ## Helper for `setup()`: Enable external pivot update via RPC
   let com = ctx.ctx.chain.com
   com.syncReqNewHead = ctx.updateBeaconHeaderCB
-  com.syncReqRelaxV2 = true
+  # We need engine_newPayload to be strict
+  com.syncReqRelaxV2 = false
 
 # ------------------------------------------------------------------------------
 # Public functions
# ------------------------------------------------------------------------------
diff --git a/nimbus/sync/beacon/beacon_impl.nim b/nimbus/sync/beacon/beacon_impl.nim
new file mode 100644
index 000000000..5eddb574a
--- /dev/null
+++ b/nimbus/sync/beacon/beacon_impl.nim
@@ -0,0 +1,408 @@
+# Nimbus
+# Copyright (c) 2023 Status Research & Development GmbH
+# Licensed under either of
+#  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
+#    http://www.apache.org/licenses/LICENSE-2.0)
+#  * MIT license ([LICENSE-MIT](LICENSE-MIT) or
+#    http://opensource.org/licenses/MIT)
+# at your option. This file may not be copied, modified, or distributed
+# except according to those terms.
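The new `beacon_impl.nim` below drives the download: it splits the work into the three job kinds defined in `worker_desc.nim` and tracks which block ranges are still missing with two `stew/interval_set` sets, `mask` (gaps still to fetch) and `pulled` (ranges already fetched). A minimal standalone sketch of that bookkeeping idiom, using only the `init`/`merge`/`reduce`/`covered`/`decreasing` calls that appear in this patch (an illustration only, not part of the diff):

  import stew/interval_set

  let
    mask   = IntervalSetRef[uint64, uint64].init() # ranges still missing
    pulled = IntervalSetRef[uint64, uint64].init() # ranges already fetched

  discard mask.merge(1'u64, 100'u64)   # start out: blocks 1..100 all missing
  discard mask.reduce(40'u64, 60'u64)  # blocks 40..60 arrived ...
  discard pulled.merge(40'u64, 60'u64) # ... so record them as pulled

  assert pulled.covered(50'u64, 50'u64) > 0'u64 # block 50 is downloaded
  for iv in mask.decreasing:           # walk remaining gaps, highest first
    echo iv.minPt, "..", iv.maxPt      # prints 61..100, then 1..39

This mirrors what `setupTally`, `mergeTally`, `reduceTally` and `downloaded` do further down.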
+ +import + std/tables, + chronicles, + chronos, + chronos/timer, + ./worker_desc, + ./skeleton_main, + ./skeleton_utils, + ./skeleton_db, + ../../utils/utils, + ../protocol, + ../types + +logScope: + topics = "beacon-impl" + +{.push gcsafe, raises: [].} + +# ------------------------------------------------------------------------------ +# Private helpers +# ------------------------------------------------------------------------------ + +func makeGetBodyJob(header: BlockHeader, setHead: bool): BeaconJob = + BeaconJob( + mode: bjmGetBody, + getBodyJob: BeaconJobGetBody( + headerHash: header.blockHash, + sumHash: header.sumHash, + header: header, + setHead: setHead + ) + ) + +func makeGetBlocksJob(number, maxResults: uint64) : BeaconJob = + BeaconJob( + mode: bjmGetBlocks, + getBlocksJob: BeaconJobGetBlocks( + number: number, + maxResults: maxResults, + ) + ) + +func makeHeaderRequest(number: uint64, maxResults: uint64): BlocksRequest = + BlocksRequest( + startBlock: HashOrNum(isHash: false, number: number.u256), + maxResults: maxResults.uint, + skip: 0, + reverse: true + ) + +func makeGetBodiesJob(headers: sink seq[BlockHeader], + headerHash: sink seq[Hash256], + reqBodies: sink seq[bool]): BeaconJob = + BeaconJob( + mode: bjmGetBodies, + getBodiesJob: BeaconJobGetBodies( + headers : system.move(headers), + headerHash: system.move(headerHash), + reqBodies : system.move(reqBodies) + ) + ) + +proc requeue(buddy: BeaconBuddyRef, job: BeaconJob) = + buddy.ctx.poolMode = true + buddy.only.requeue.add job + +proc jobDone(buddy: BeaconBuddyRef) = + buddy.only.job = nil + +proc mapBodiesToHeader(buddy: BeaconBuddyRef, + job: BeaconJob, + bodies: openArray[BlockBody], + reqBodies: openArray[bool]) {.raises: [CatchableError].} = + var + headers = system.move(job.getBlocksJob.headers) + map = initTable[Hash256, int]() + + for i, x in bodies: + let bodyHash = sumHash(x) + map[bodyHash] = i + + for i, req in reqBodies: + if not req: + if job.mode == bjmGetBlocks: + job.getBlocksJob.headers.add headers[i] + job.getBlocksJob.bodies.add BlockBody() + else: + job.getBodiesJob.headers.add headers[i] + job.getBodiesJob.bodies.add BlockBody() + continue + + let bodyHash = sumHash(headers[i]) + let z = map.getOrDefault(bodyHash, MissingBody) + if z == MissingBody: + # missing or invalid body, request again + buddy.requeue makeGetBodyJob(headers[i], setHead = false) + continue + + if job.mode == bjmGetBlocks: + job.getBlocksJob.headers.add headers[i] + job.getBlocksJob.bodies.add bodies[z] + else: + job.getBodiesJob.headers.add headers[i] + job.getBodiesJob.bodies.add bodies[z] + +proc putBlocks(ctx: BeaconCtxRef, + skel: SkeletonRef, + headers: openArray[BlockHeader], + bodies: openArray[BlockBody]) = + + for i, body in bodies: + let r = skel.putBody(headers[i], body) + doAssert(r.isOk) + + let res = skel.putBlocks(headers) + if res.isErr: + error "putBlocks->putBlocks", msg=res.error + return + + let z = res.get + if FillCanonical in z.status: + let rr = skel.fillCanonicalChain() + if rr.isErr: + error "putBlocks->fillCanonicalChain", msg=rr.error + return + +proc setupTally*(ctx: BeaconCtxRef) = + let + skel = ctx.pool.skeleton + last = skel.last + + discard ctx.pool.mask.merge(1'u64, last.head) + for x in skel.subchains: + discard ctx.pool.mask.reduce(x.tail, x.head) + discard ctx.pool.pulled.merge(x.tail, x.head) + +proc mergeTally*(ctx: BeaconCtxRef, least: uint64, last: uint64) = + discard ctx.pool.mask.merge(least, last) + +proc reduceTally*(ctx: BeaconCtxRef, least: uint64, last: uint64) = + discard 
ctx.pool.mask.reduce(least, last) + discard ctx.pool.pulled.merge(least, last) + +proc downloaded*(ctx: BeaconCtxRef, head: uint64): bool = + ctx.pool.pulled.covered(head, head) > 0'u64 + +proc headTally(ctx: BeaconCtxRef, head: uint64) = + discard ctx.pool.pulled.merge(head, head) + let rc = ctx.pool.mask.le() + if rc.isSome: + let maxPt = rc.get().maxPt + if head > maxPt: + # new head + discard ctx.pool.mask.merge(maxPt+1, head-1) + else: + # initialize + discard ctx.pool.mask.merge(1'u64, head) + discard ctx.pool.mask.reduce(head, head) + +proc popFirst(x: var TargetQueue): BlockHeader = + # assume we already check len > 0 + x.shift().get().data + +proc addLast(x: var TargetQueue, h: BlockHeader) = + discard x.prepend(h.blockHash, h) + +# ------------------------------------------------------------------------------ +# Synchronizer will produce jobs for workers +# ------------------------------------------------------------------------------ + +proc resumeSync*(ctx: BeaconCtxRef): Future[bool] {.async.} = + let skel = ctx.pool.skeleton + if skel.len == 0: + return true + + let last = skel.last + let res = skel.getHeader(last.head) + if res.isErr: + error "resumeSync->getHeader", msg=res.error + return false + + let maybeHeader = res.get + if maybeHeader.isNone: + return true + + let header = maybeHeader.get + let r = skel.initSync(header) + if r.isErr: + error "resumeSync->initSync", msg=r.error + return false + + let z = r.get + if FillCanonical in z.status: + let rr = skel.fillCanonicalChain() + if rr.isErr: + error "resumeSync->fillCanonicalChain", msg=rr.error + return false + + # collect gaps of skeleton, excluding genesis + ctx.setupTally() + + return true + +proc appendSyncTarget*(ctx: BeaconCtxRef, h: BlockHeader): Future[void] {.async.} = + while bmShiftTarget in ctx.pool.mode: + await sleepAsync timer.milliseconds(10) + + let number = h.u64 + ctx.pool.mode.incl bmAppendTarget + + if not ctx.downloaded(number): + ctx.headTally(number) + ctx.pool.target.addLast(h) + + ctx.pool.mode.excl bmAppendTarget + ctx.daemon = true + +proc shiftSyncTarget*(ctx: BeaconCtxRef): Future[BlockHeader] {.async.} = + doAssert(ctx.pool.target.len > 0) + while bmAppendTarget in ctx.pool.mode: + await sleepAsync timer.milliseconds(10) + + ctx.pool.mode.incl bmShiftTarget + let h = ctx.pool.target.popFirst() + ctx.pool.mode.excl bmShiftTarget + return h + +proc setSyncTarget*(ctx: BeaconCtxRef): Future[void] {.async.} = + let header = await ctx.shiftSyncTarget() + let job = makeGetBodyJob(header, setHead = true) + ctx.pool.jobs.addLast(job) + +proc fillBlocksGaps*(ctx: BeaconCtxRef, least: uint64, last: uint64) = + if last - least < MaxGetBlocks: + ctx.reduceTally(last-least, last) + let job = makeGetBlocksJob(last, last-least+1) + ctx.pool.jobs.addLast(job) + return + + var + max = last + + while true: + ctx.reduceTally(max-MaxGetBlocks, max) + let job = makeGetBlocksJob(max, MaxGetBlocks) + ctx.pool.jobs.addLast(job) + if ctx.pool.jobs.len > MaxJobsQueue: + return + max = max-MaxGetBlocks + if max <= MaxGetBlocks: + break + + if max > 1: + ctx.reduceTally(1, max) + let job = makeGetBlocksJob(max, max) + ctx.pool.jobs.addLast(job) + +# ------------------------------------------------------------------------------ +# Worker will consume available jobs +# ------------------------------------------------------------------------------ + +proc executeGetBodyJob*(buddy: BeaconBuddyRef, job: BeaconJob): Future[void] {.async.} = + let + ctx = buddy.ctx + peer = buddy.peer + skel = ctx.pool.skeleton + + let b 
= await peer.getBlockBodies([job.getBodyJob.headerHash])
+  if b.isNone:
+    debug "executeGetBodyJob->getBodies none"
+    # retry with other peer
+    buddy.requeue job
+    return
+
+  let bodies = b.get
+  if bodies.blocks.len == 0:
+    debug "executeGetBodyJob->getBodies isZero"
+    # retry with other peer
+    buddy.requeue job
+    return
+
+  job.getBodyJob.body = bodies.blocks[0]
+  let bodySumHash = sumHash(job.getBodyJob.body)
+  if bodySumHash != job.getBodyJob.sumHash:
+    # retry with other peer
+    debug "executeGetBodyJob->sumHash",
+      expect=job.getBodyJob.sumHash.short,
+      get=bodySumHash.short
+    buddy.requeue job
+    return
+
+  var status: set[SkeletonStatus]
+
+  if job.getBodyJob.setHead:
+    let res = skel.setHead(job.getBodyJob.header)
+    if res.isErr:
+      error "executeGetBodyJob->setHead", msg=res.error
+      # something wrong
+      return
+    status = res.get().status
+  else:
+    let res = skel.putBlocks([job.getBodyJob.header])
+    if res.isErr:
+      error "executeGetBodyJob->putBlocks", msg=res.error
+      return
+    status = res.get().status
+
+  let r = skel.putBody(job.getBodyJob.header, job.getBodyJob.body)
+  doAssert(r.isOk)
+  if FillCanonical in status:
+    let rr = skel.fillCanonicalChain()
+    if rr.isErr:
+      error "executeGetBodyJob->fillCanonicalChain", msg=rr.error
+      return
+
+  buddy.jobDone()
+
+proc executeGetBlocksJob*(buddy: BeaconBuddyRef, job: BeaconJob): Future[void] {.async.} =
+  let
+    ctx = buddy.ctx
+    peer = buddy.peer
+    skel = ctx.pool.skeleton
+    request = makeHeaderRequest(job.getBlocksJob.number, job.getBlocksJob.maxResults)
+
+  let res = await peer.getBlockHeaders(request)
+  if res.isNone:
+    # retry with other peer
+    error "executeGetBlocksJob->getBlockHeaders none"
+    buddy.requeue job
+    return
+
+  job.getBlocksJob.headers = res.get().headers
+  let numHeaders = job.getBlocksJob.headers.len
+
+  var
+    headerHashes = newSeqOfCap[Hash256](numHeaders)
+    reqBodies = newSeqOfCap[bool](numHeaders)
+    numRequest = 0
+
+  for i, x in job.getBlocksJob.headers:
+    if not x.hasBody:
+      reqBodies.add false
+      continue
+    reqBodies.add true
+    headerHashes.add x.blockHash
+    inc numRequest
+
+  if numRequest == 0:
+    # all bodies are empty
+    for _ in 0..<numHeaders:
+      job.getBlocksJob.bodies.add BlockBody()
+  else:
+    let b = await peer.getBlockBodies(headerHashes)
+    if b.isNone:
+      error "executeGetBlocksJob->getBodies none"
+      # retry with other peer
+      buddy.requeue makeGetBodiesJob(job.getBlocksJob.headers,
+        headerHashes, reqBodies)
+      return
+    buddy.mapBodiesToHeader(job, b.get().blocks, reqBodies)
+
+  ctx.putBlocks(skel, job.getBlocksJob.headers, job.getBlocksJob.bodies)
+  buddy.jobDone()
+
+proc executeGetBodiesJob*(buddy: BeaconBuddyRef, job: BeaconJob): Future[void] {.async.} =
+  let
+    ctx = buddy.ctx
+    peer = buddy.peer
+    skel = ctx.pool.skeleton
+
+  let b = await peer.getBlockBodies(job.getBodiesJob.headerHash)
+  if b.isNone:
+    debug "executeGetBodiesJob->getBodies none"
+    # retry with other peer
+    buddy.requeue job
+    return
+  buddy.mapBodiesToHeader(job, b.get().blocks, job.getBodiesJob.reqBodies)
+  ctx.putBlocks(skel, job.getBodiesJob.headers, job.getBodiesJob.bodies)
+  buddy.jobDone()
+
+proc executeJob*(buddy: BeaconBuddyRef, job: BeaconJob): Future[void] {.async.} =
+  let
+    ctx = buddy.ctx
+
+  try:
+    case job.mode
+    of bjmGetBody:
+      await executeGetBodyJob(buddy, job)
+    of bjmGetBlocks:
+      await executeGetBlocksJob(buddy, job)
+    of bjmGetBodies:
+      await executeGetBodiesJob(buddy, job)
+  except TransportError as ex:
+    error "executeJob->TransportError", msg=ex.msg
+  except CatchableError as ex:
+    error "executeJob->OtherError", msg=ex.msg
+    # retry with other peer
+    buddy.requeue job
diff --git a/nimbus/sync/beacon/skeleton_algo.nim b/nimbus/sync/beacon/skeleton_algo.nim
index e26e5d17f..3ae58cffb 100644 --- a/nimbus/sync/beacon/skeleton_algo.nim +++ b/nimbus/sync/beacon/skeleton_algo.nim @@ -11,44 +11,17 @@ import ./skeleton_desc, ./skeleton_utils, - ./skeleton_db + ./skeleton_db, + ../../utils/utils {.push gcsafe, raises: [].} logScope: topics = "skeleton" -proc isLinked*(sk: SkeletonRef): Result[bool, string] = - ## Returns true if the skeleton chain is linked to canonical - if sk.isEmpty: - return ok(false) - - let sc = sk.last - - # if its genesis we are linked - if sc.tail == 0: - return ok(true) - - let head = sk.blockHeight - if sc.tail > head + 1: - return ok(false) - - let number = sc.tail - 1 - let maybeHeader = sk.getHeader(number).valueOr: - return err("isLinked: " & error) - - # The above sc.tail > head - 1 - # assure maybeHeader.isSome - doAssert maybeHeader.isSome - - let nextHeader = maybeHeader.get - let linked = sc.next == nextHeader.blockHash - if linked and sk.len > 1: - # Remove all other subchains as no more relevant - sk.removeAllButLast() - sk.writeProgress() - - return ok(linked) +# ------------------------------------------------------------------------------ +# Private helpers +# ------------------------------------------------------------------------------ proc fastForwardHead(sk: SkeletonRef, last: Segment, target: uint64): Result[void, string] = # Try fast forwarding the chain head to the number @@ -84,6 +57,76 @@ proc fastForwardHead(sk: SkeletonRef, last: Segment, target: uint64): Result[voi `from`=head, to=last.head, tail=last.tail ok() +proc backStep(sk: SkeletonRef): Result[uint64, string] = + if sk.conf.fillCanonicalBackStep <= 0: + return ok(0) + + let sc = sk.last + var + newTail = sc.tail + maybeTailHeader: Opt[BlockHeader] + + while true: + newTail = newTail + sk.conf.fillCanonicalBackStep + maybeTailHeader = sk.getHeader(newTail, true).valueOr: + return err(error) + if maybeTailHeader.isSome or newTail > sc.head: break + + if newTail > sc.head: + newTail = sc.head + maybeTailHeader = sk.getHeader(newTail, true).valueOr: + return err(error) + + if maybeTailHeader.isSome and newTail > 0: + debug "Backstepped skeleton", head=sc.head, tail=newTail + let tailHeader = maybeTailHeader.get + sk.last.tail = tailHeader.u64 + sk.last.next = tailHeader.parentHash + sk.writeProgress() + return ok(newTail) + + # we need a new head, emptying the subchains + sk.clear() + sk.writeProgress() + debug "Couldn't backStep subchain 0, dropping subchains for new head signal" + return ok(0) + +# ------------------------------------------------------------------------------ +# Public functions +# ------------------------------------------------------------------------------ + +proc isLinked*(sk: SkeletonRef): Result[bool, string] = + ## Returns true if the skeleton chain is linked to canonical + if sk.isEmpty: + return ok(false) + + let sc = sk.last + + # if its genesis we are linked + if sc.tail == 0: + return ok(true) + + let head = sk.blockHeight + if sc.tail > head + 1: + return ok(false) + + let number = sc.tail - 1 + let maybeHeader = sk.getHeader(number).valueOr: + return err("isLinked: " & error) + + # The above sc.tail > head - 1 + # assure maybeHeader.isSome + doAssert maybeHeader.isSome + + let nextHeader = maybeHeader.get + let linked = sc.next == nextHeader.blockHash + if linked and sk.len > 1: + # Remove all other subchains as no more relevant + sk.removeAllButLast() + sk.writeProgress() + + return ok(linked) + proc trySubChainsMerge*(sk: SkeletonRef): Result[bool, string] = var merged = false @@ -146,6 +189,13 @@ proc 
putBlocks*(sk: SkeletonRef, headers: openArray[BlockHeader]): if sk.len == 0: return err("no subchain set") + # best place to debug beacon downloader + when false: + var numbers: seq[uint64] + for header in headers: + numbers.add header.u64 + debugEcho numbers + for header in headers: let number = header.u64 @@ -193,18 +243,18 @@ proc putBlocks*(sk: SkeletonRef, headers: openArray[BlockHeader]): sk.writeProgress() # Print a progress report making the UX a bit nicer - if getTime() - sk.logged > STATUS_LOG_INTERVAL: - var left = sk.last.tail - 1 - sk.blockHeight - if sk.progress.linked: left = 0 - if left > 0: - sk.logged = getTime() - if sk.pulled == 0: - info "Beacon sync starting", left=left - else: - let sinceStarted = getTime() - sk.started - let eta = (sinceStarted div sk.pulled.int64) * left.int64 - info "Syncing beacon headers", - downloaded=sk.pulled, left=left, eta=eta + #if getTime() - sk.logged > STATUS_LOG_INTERVAL: + # var left = sk.last.tail - 1 - sk.blockHeight + # if sk.progress.linked: left = 0 + # if left > 0: + # sk.logged = getTime() + # if sk.pulled == 0: + # info "Beacon sync starting", left=left + # else: + # let sinceStarted = getTime() - sk.started + # let eta = (sinceStarted div sk.pulled.int64) * left.int64 + # info "Syncing beacon headers", + # downloaded=sk.pulled, left=left, eta=eta.short sk.progress.linked = sk.isLinked().valueOr: return err(error) @@ -218,40 +268,6 @@ proc putBlocks*(sk: SkeletonRef, headers: openArray[BlockHeader]): res.status.incl SyncMerged ok(res) -proc backStep(sk: SkeletonRef): Result[uint64, string] = - if sk.conf.fillCanonicalBackStep <= 0: - return ok(0) - - let sc = sk.last - var - newTail = sc.tail - maybeTailHeader: Opt[BlockHeader] - - while true: - newTail = newTail + sk.conf.fillCanonicalBackStep - maybeTailHeader = sk.getHeader(newTail, true).valueOr: - return err(error) - if maybeTailHeader.isSome or newTail > sc.head: break - - if newTail > sc.head: - newTail = sc.head - maybeTailHeader = sk.getHeader(newTail, true).valueOr: - return err(error) - - if maybeTailHeader.isSome and newTail > 0: - debug "Backstepped skeleton", head=sc.head, tail=newTail - let tailHeader = maybeTailHeader.get - sk.last.tail = tailHeader.u64 - sk.last.next = tailHeader.parentHash - sk.writeProgress() - return ok(newTail) - - # we need a new head, emptying the subchains - sk.clear() - sk.writeProgress() - debug "Couldn't backStep subchain 0, dropping subchains for new head signal" - return ok(0) - # Inserts skeleton blocks into canonical chain and runs execution. 
proc fillCanonicalChain*(sk: SkeletonRef): Result[void, string] = if sk.filling: return ok() @@ -324,11 +340,11 @@ proc fillCanonicalChain*(sk: SkeletonRef): Result[void, string] = parentHash=header.parentHash.short # Lets log some parent by number and parent by hash, that may help to understand whats going on - let parent = sk.getHeader(number - 1).valueOr: + let parent {.used.} = sk.getHeader(number - 1).valueOr: return err(error) debug "ParentByNumber", number=parent.numberStr, hash=parent.blockHashStr - let parentWithHash = sk.getHeader(header.parentHash).valueOr: + let parentWithHash {.used.} = sk.getHeader(header.parentHash).valueOr: return err(error) debug "parentByHash", @@ -346,7 +362,7 @@ proc fillCanonicalChain*(sk: SkeletonRef): Result[void, string] = # it will be fetched from the chain without any issues sk.deleteHeaderAndBody(header) if sk.fillLogIndex >= 20: - info "Skeleton canonical chain fill status", + debug "Skeleton canonical chain fill status", canonicalHead, chainHead=sk.blockHeight, subchainHead=subchain.head diff --git a/nimbus/sync/beacon/worker.nim b/nimbus/sync/beacon/worker.nim index 43f7ce64d..1f2a1f884 100644 --- a/nimbus/sync/beacon/worker.nim +++ b/nimbus/sync/beacon/worker.nim @@ -13,9 +13,13 @@ import chronicles, chronos, + chronos/timer, eth/p2p, ".."/[protocol, sync_desc], - ./worker_desc + ./worker_desc, + ./skeleton_main, + ./skeleton_utils, + ./beacon_impl logScope: topics = "beacon-buddy" @@ -24,41 +28,27 @@ const extraTraceMessages = false # or true ## Enabled additional logging noise - FirstPivotSeenTimeout = 3.minutes - ## Turn on relaxed pivot negotiation after some waiting time when there - ## was a `peer` seen but was rejected. This covers a rare event. Typically - ## useless peers do not appear ready for negotiation. - - FirstPivotAcceptedTimeout = 50.seconds - ## Turn on relaxed pivot negotiation after some waiting time when there - ## was a `peer` accepted but no second one yet. - -# ------------------------------------------------------------------------------ -# Private helpers -# ------------------------------------------------------------------------------ - -proc pp(n: BlockNumber): string = - ## Dedicated pretty printer (`$` is defined elsewhere using `UInt256`) - if n == high(BlockNumber): "high" else:"#" & $n - -# ------------------------------------------------------------------------------ -# Private functions -# ------------------------------------------------------------------------------ - - # ------------------------------------------------------------------------------ # Public start/stop and admin functions # ------------------------------------------------------------------------------ proc setup*(ctx: BeaconCtxRef): bool = ## Global set up - #ctx.pool.pivot = BestPivotCtxRef.init(ctx.pool.rng) + ctx.pool.target.init() + ctx.pool.mask = HeaderInterval.init() + ctx.pool.pulled = HeaderInterval.init() + ctx.pool.skeleton = SkeletonRef.new(ctx.chain) + let res = ctx.pool.skeleton.open() + if res.isErr: + error "Cannot open beacon skeleton", msg=res.error + return false + ctx.pool.mode.incl bmResumeSync true proc release*(ctx: BeaconCtxRef) = ## Global clean up - #ctx.pool.pivot = nil + discard proc start*(buddy: BeaconBuddyRef): bool = ## Initialise worker peer @@ -88,12 +78,37 @@ proc runDaemon*(ctx: BeaconCtxRef) {.async.} = ## `runSingle()`, or `runMulti()` functions. ## - debugEcho "RUNDAEMON: ", ctx.pool.id - ctx.daemon = false + debug "RUNDAEMON", id=ctx.pool.id + + # Just wake up after long sleep (e.g. 
client terminated)
+  if bmResumeSync in ctx.pool.mode:
+    let ok = await ctx.resumeSync()
+    ctx.pool.mode.excl bmResumeSync
+
+  # We get orders from the engine API
+  if ctx.pool.target.len > 0:
+    await ctx.setSyncTarget()
+
+  # Distribute gap-filling jobs to peers
+  let mask = ctx.pool.mask
+  for x in mask.decreasing:
+    ctx.fillBlocksGaps(x.minPt, x.maxPt)
+
+  # Tell `runPool` to grab a job for each peer
+  if ctx.pool.jobs.len > 0:
+    ctx.poolMode = true
+
+  # Rerun this function next iteration
+  # if there are more new sync targets
+  ctx.daemon = ctx.pool.target.len > 0
 
   # Without waiting, this function repeats every 50ms (as set with the constant
   # `sync_sched.execLoopTimeElapsedMin`.) Larger waiting time cleans up logging.
-  await sleepAsync 300.milliseconds
+  var sleepDuration = timer.milliseconds(300)
+  if ctx.pool.jobs.len == 0 and ctx.pool.target.len == 0:
+    sleepDuration = timer.seconds(5)
+
+  await sleepAsync sleepDuration
 
 
 proc runSingle*(buddy: BeaconBuddyRef) {.async.} =
@@ -110,16 +125,15 @@ proc runSingle*(buddy: BeaconBuddyRef) {.async.} =
   ##
   let
     ctx = buddy.ctx
-    peer {.used.} = buddy.peer
 
-  debugEcho "RUNSINGLE: ", ctx.pool.id
+  debug "RUNSINGLE", id=ctx.pool.id
 
   if buddy.ctrl.stopped:
     when extraTraceMessages:
       trace "Single mode stopped", peer, pivotState=ctx.pool.pivotState
     return # done with this buddy
 
-  var napping = 2.seconds
+  var napping = timer.seconds(2)
   when extraTraceMessages:
     trace "Single mode end", peer, napping
 
   # `sync_sched.execLoopTimeElapsedMin`.)
   await sleepAsync napping
 
+  # request new jobs, if available
+  if ctx.pool.jobs.len == 0:
+    ctx.daemon = true
+
 
 proc runPool*(buddy: BeaconBuddyRef; last: bool; laps: int): bool =
   ## Once started, the function `runPool()` is called for all worker peers in
@@ -147,9 +165,25 @@ proc runPool*(buddy: BeaconBuddyRef; last: bool; laps: int): bool =
   let
     ctx = buddy.ctx
 
-  debugEcho "RUNPOOL: ", ctx.pool.id
+  debug "RUNPOOL", id=ctx.pool.id
+
+  # If a peer cannot finish its job,
+  # we will put it back into circulation.
+  # A peer can also spawn more jobs.
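The comment above is the heart of the scheduler: jobs a peer could not finish land in `buddy.only.requeue` and are pushed back into the shared `ctx.pool.jobs` deque, from which idle peers take the next job. A compilable toy version of that cycle, with a hypothetical `Job` type standing in for `BeaconJob` (names are illustrative, not from the patch):

  import std/deques

  type Job = ref object
    id: int

  var
    jobs = initDeque[Job]() # plays ctx.pool.jobs, filled by the daemon
    requeue: seq[Job]       # plays buddy.only.requeue, one per peer

  for i in 1 .. 3:          # the daemon distributes work
    jobs.addLast Job(id: i)

  let job = jobs.popFirst() # a peer grabs a job (runPool)
  requeue.add job           # the peer failed, so park the job

  for j in requeue:         # next runPool round: back into circulation
    jobs.addLast j
  requeue.setLen(0)

The actual implementation follows: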
+  if buddy.only.requeue.len > 0:
+    for job in buddy.only.requeue:
+      ctx.pool.jobs.addLast(job)
+    buddy.only.requeue.setLen(0)
+    buddy.only.job = nil
+
+  # Take distributed jobs for each peer
+  if ctx.pool.jobs.len > 0 and buddy.only.job.isNil:
+    buddy.only.job = ctx.pool.jobs.popFirst()
+    buddy.ctrl.multiOk = true
+
+  # If there are no more jobs, stop
+  ctx.pool.jobs.len == 0
 
-  true # Stop after running once regardless of peer
 
 proc runMulti*(buddy: BeaconBuddyRef) {.async.} =
   ## This peer worker is invoked if the `buddy.ctrl.multiOk` flag is set
@@ -159,12 +193,22 @@ proc runMulti*(buddy: BeaconBuddyRef) {.async.} =
   let
     ctx = buddy.ctx
 
-  debugEcho "RUNMULTI: ", ctx.pool.id
+  debug "RUNMULTI", id=ctx.pool.id
+
+  # Each peer that got a job executes it until failure or success.
+  # It is also possible to spawn more jobs
+  if buddy.only.job.isNil.not:
+    await buddy.executeJob(buddy.only.job)
 
   # Update persistent database
   #while not buddy.ctrl.stopped:
 
   # Allow thread switch as `persistBlocks()` might be slow
-  await sleepAsync(10.milliseconds)
+  await sleepAsync timer.milliseconds(10)
+
+  # request new jobs, if available
+  if ctx.pool.jobs.len == 0:
+    ctx.daemon = true
 
 # ------------------------------------------------------------------------------
 # End
diff --git a/nimbus/sync/beacon/worker_desc.nim b/nimbus/sync/beacon/worker_desc.nim
index 81d34f683..7433eda58 100644
--- a/nimbus/sync/beacon/worker_desc.nim
+++ b/nimbus/sync/beacon/worker_desc.nim
@@ -11,18 +11,77 @@
 {.push raises:[].}
 
 import
+  std/deques,
+  stew/interval_set,
+  stew/keyed_queue,
   eth/p2p,
   chronos,
-  ../sync_desc
+  ../sync_desc,
+  ./skeleton_desc
+
+export
+  deques,
+  interval_set,
+  keyed_queue
 
 type
+  BeaconMode* = enum
+    bmNone         ## Do nothing
+    bmResumeSync   ## Resume sync if the client stopped
+    bmAppendTarget ## Put new sync target into queue
+    bmShiftTarget  ## Get sync target from queue
+
+  BeaconJobGetBody* = object
+    headerHash*: Hash256     ## request body using this hash
+    sumHash* : Hash256       ## compare downloaded body to this hash
+    header* : BlockHeader    ## downloaded header
+    body* : BlockBody        ## downloaded body
+    setHead* : bool          ## true: setHead, false: putBlocks
+
+  BeaconJobGetBlocks* = object
+    number* : uint64              ## starting number of blocks
+    maxResults*: uint64           ## number of blocks we want to download
+    headers* : seq[BlockHeader]   ## downloaded headers
+    bodies* : seq[BlockBody]      ## downloaded bodies
+
+  BeaconJobGetBodies* = object
+    headers* : seq[BlockHeader]   ## downloaded headers
+    bodies* : seq[BlockBody]      ## downloaded bodies
+    headerHash*: seq[Hash256]     ## request to download bodies using these hashes
+    reqBodies* : seq[bool]        ## skip downloading body if header has no body
+
+  BeaconJobMode* = enum
+    bjmGetBody   ## when setSyncTarget is done, download the body
+    bjmGetBlocks ## download blocks to fill skeleton gaps
+    bjmGetBodies ## if bjmGetBlocks failed to download bodies, give it to another peer
+
+  BeaconJob* = ref object
+    case mode*: BeaconJobMode
+    of bjmGetBody:
+      getBodyJob*: BeaconJobGetBody
+    of bjmGetBlocks:
+      getBlocksJob*: BeaconJobGetBlocks
+    of bjmGetBodies:
+      getBodiesJob*: BeaconJobGetBodies
+
   BeaconBuddyData* = object
     ## Local descriptor data extension
+    job* : BeaconJob
+    requeue*: seq[BeaconJob]
+
+  TargetQueue* = KeyedQueue[Hash256, BlockHeader]
+  HeaderInterval* = IntervalSetRef[uint64, uint64]
 
   BeaconCtxData* = object
     ## Globally shared data extension
-    rng*: ref HmacDrbgContext ## Random generator, pre-initialised
-    id*: int
+    rng* : ref HmacDrbgContext ## Random generator, pre-initialised
+
id* : int ## Instance id, for debugging purpose + skeleton*: SkeletonRef ## Core algorithm, tracking both canonical and side chain + mode* : set[BeaconMode] ## Do one thing at a time + target* : TargetQueue ## Sync target list + jobs* : Deque[BeaconJob] ## Each buddy can get a job from here + mask* : HeaderInterval ## Skeleton gaps need to be downloaded + pulled* : HeaderInterval ## Downloaded skeleton blocks BeaconBuddyRef* = BuddyRef[BeaconCtxData,BeaconBuddyData] ## Extended worker peer descriptor @@ -30,4 +89,9 @@ type BeaconCtxRef* = CtxRef[BeaconCtxData] ## Extended global descriptor +const + MaxGetBlocks* = 64 + MaxJobsQueue* = 32 + MissingBody* = -1 + # End diff --git a/nimbus/utils/debug.nim b/nimbus/utils/debug.nim index 8b1b753a8..f4b1faf2e 100644 --- a/nimbus/utils/debug.nim +++ b/nimbus/utils/debug.nim @@ -14,7 +14,8 @@ import stew/byteutils, ../vm_state, ../vm_types, - ../db/accounts_cache + ../db/accounts_cache, + ./utils proc `$`(hash: Hash256): string = hash.data.toHex @@ -143,3 +144,29 @@ proc debug*(tx: Transaction): string = result.add "V : " & $tx.V & "\n" result.add "R : " & $tx.R & "\n" result.add "S : " & $tx.S & "\n" + +proc debugSum*(h: BlockHeader): string = + result.add "txRoot : " & $h.txRoot & "\n" + result.add "ommersHash : " & $h.ommersHash & "\n" + if h.withdrawalsRoot.isSome: + result.add "withdrawalsRoot: " & $h.withdrawalsRoot.get() & "\n" + result.add "sumHash : " & $sumHash(h) & "\n" + +proc debugSum*(body: BlockBody): string = + let ommersHash = keccakHash(rlp.encode(body.uncles)) + let txRoot = calcTxRoot(body.transactions) + let wdRoot = if body.withdrawals.isSome: + calcWithdrawalsRoot(body.withdrawals.get) + else: EMPTY_ROOT_HASH + let numwd = if body.withdrawals.isSome: + $body.withdrawals.get().len + else: + "none" + result.add "txRoot : " & $txRoot & "\n" + result.add "ommersHash : " & $ommersHash & "\n" + if body.withdrawals.isSome: + result.add "wdRoot : " & $wdRoot & "\n" + result.add "num tx : " & $body.transactions.len & "\n" + result.add "num uncles : " & $body.uncles.len & "\n" + result.add "num wd : " & numwd & "\n" + result.add "sumHash : " & $sumHash(body) & "\n" diff --git a/nimbus/utils/utils.nim b/nimbus/utils/utils.nim index 2dd0b36de..7004d458a 100644 --- a/nimbus/utils/utils.nim +++ b/nimbus/utils/utils.nim @@ -1,5 +1,5 @@ import - std/math, + std/[math, times, strutils], eth/[rlp, common/eth_types_rlp], stew/byteutils, nimcrypto, @@ -47,6 +47,11 @@ proc sumHash*(header: BlockHeader): Hash256 = else: EMPTY_ROOT_HASH sumHash(header.txRoot, header.ommersHash, wdRoot) +func hasBody*(h: BlockHeader): bool = + h.txRoot != EMPTY_ROOT_HASH or + h.ommersHash != EMPTY_UNCLE_HASH or + h.withdrawalsRoot.get(EMPTY_ROOT_HASH) != EMPTY_ROOT_HASH + func generateAddress*(address: EthAddress, nonce: AccountNonce): EthAddress = result[0..19] = keccakHash(rlp.encodeList(address, nonce)).data.toOpenArray(12, 31) @@ -89,6 +94,16 @@ proc short*(h: Hash256): string = bytes[^3..^1] = h.data[^3..^1] bytes.toHex +func short*(x: Duration): string = + let parts = x.toParts + if parts[Hours] > 0: + result.add $parts[Hours] + result.add ':' + + result.add intToStr(parts[Minutes].int, 2) + result.add ':' + result.add intToStr(parts[Seconds].int, 2) + proc decompose*(rlp: var Rlp, header: var BlockHeader, body: var BlockBody) {.gcsafe, raises: [RlpError].} =
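The `short(Duration)` helper added above renders a duration as `H:MM:SS`, or `MM:SS` when it is under an hour. A small self-contained usage sketch (the function body is repeated here purely for illustration; only the std/times and strutils calls already used above are assumed):

  import std/[times, strutils]

  func short(x: Duration): string =
    let parts = x.toParts
    if parts[Hours] > 0:
      result.add $parts[Hours]
      result.add ':'
    result.add intToStr(parts[Minutes].int, 2)
    result.add ':'
    result.add intToStr(parts[Seconds].int, 2)

  echo short(initDuration(seconds = 42))             # -> 00:42
  echo short(initDuration(minutes = 7, seconds = 5)) # -> 07:05
  echo short(initDuration(hours = 3, minutes = 2))   # -> 3:02:00

Note that `toParts` normalises days and weeks into their own slots, so a duration of a day or more would also need the `Days` part to print fully.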