From 5fb0fc65ba22e2de56d50f2ce4742687ab481fda Mon Sep 17 00:00:00 2001 From: jangko Date: Mon, 14 Aug 2023 17:26:38 +0700 Subject: [PATCH] Implement beacon sync stub - Prepare a test env for beacon sync in engine api simulator. - Wiring beacon sync to the rest of subsystems. --- .../nodocker/engine/engine/engine_spec.nim | 186 +++++++++--------- .../nodocker/engine/engine_env.nim | 43 +++- hive_integration/nodocker/engine/test_env.nim | 10 +- .../nodocker/engine/withdrawal_tests.nim | 4 +- nimbus/beacon/beacon_engine.nim | 2 +- nimbus/nimbus.nim | 31 ++- nimbus/sync/beacon.nim | 150 ++++++++++++++ nimbus/sync/beacon/worker.nim | 171 ++++++++++++++++ nimbus/sync/beacon/worker_desc.nim | 33 ++++ 9 files changed, 512 insertions(+), 118 deletions(-) create mode 100644 nimbus/sync/beacon.nim create mode 100644 nimbus/sync/beacon/worker.nim create mode 100644 nimbus/sync/beacon/worker_desc.nim diff --git a/hive_integration/nodocker/engine/engine/engine_spec.nim b/hive_integration/nodocker/engine/engine/engine_spec.nim index 037c52067..b9a60be85 100644 --- a/hive_integration/nodocker/engine/engine/engine_spec.nim +++ b/hive_integration/nodocker/engine/engine/engine_spec.nim @@ -51,7 +51,7 @@ template testLatestHeader(client: untyped, expectedHash: Web3Hash) = expect = expectedHash, get = lastHash -#proc sendTx(t: TestEnv, recipient: EthAddress, val: UInt256, data: openArray[byte] = []): bool = +#proc sendTx(env: TestEnv, recipient: EthAddress, val: UInt256, data: openArray[byte] = []): bool = # t.tx = t.makeTx(recipient, val, data) # let rr = env.client.sendTransaction(t.tx) # if rr.isErr: @@ -59,7 +59,7 @@ template testLatestHeader(client: untyped, expectedHash: Web3Hash) = # return false # return true # -#proc sendTx(t: TestEnv, val: UInt256): bool = +#proc sendTx(env: TestEnv, val: UInt256): bool = # t.sendTx(prevRandaoContractAddr, val) # Invalid Terminal Block in ForkchoiceUpdated: @@ -88,7 +88,7 @@ proc invalidTerminalBlockForkchoiceUpdated*(env: TestEnv): bool = #[ # Invalid GetPayload Under PoW: Client must reject GetPayload directives under PoW. -proc invalidGetPayloadUnderPoW(t: TestEnv): TestStatus = +proc invalidGetPayloadUnderPoW(env: TestEnv): TestStatus = result = TestStatus.OK # We start in PoW and try to get an invalid Payload, which should produce an error but nothing should be disrupted. @@ -101,7 +101,7 @@ proc invalidGetPayloadUnderPoW(t: TestEnv): TestStatus = # Invalid Terminal Block in NewPayload: # Client must reject NewPayload directives if the referenced ParentHash does not meet the TTD requirement. -proc invalidTerminalBlockNewPayload(t: TestEnv): TestStatus = +proc invalidTerminalBlockNewPayload(env: TestEnv): TestStatus = result = TestStatus.OK let gBlock = t.gHeader @@ -126,16 +126,16 @@ proc invalidTerminalBlockNewPayload(t: TestEnv): TestStatus = # Check that PoW chain progresses testCond t.verifyPoWProgress(t.gHeader.blockHash) -proc unknownHeadBlockHash(t: TestEnv): TestStatus = +proc unknownHeadBlockHash(env: TestEnv): TestStatus = result = TestStatus.OK - let ok = waitFor t.clMock.waitForTTD() + let ok = waitFor env.clMock.waitForTTD() testCond ok var randomHash: common.Hash256 testCond randomBytes(randomHash.data) == 32 - let clMock = t.clMock + let clMock = env.clMock let forkchoiceStateUnknownHeadHash = ForkchoiceStateV1( headBlockHash: BlockHash randomHash.data, safeBlockHash: clMock.latestForkchoice.finalizedBlockHash, @@ -163,17 +163,17 @@ proc unknownHeadBlockHash(t: TestEnv): TestStatus = testCond s.payloadStatus.status == PayloadExecutionStatus.syncing testCond s.payloadId.isNone -proc unknownSafeBlockHash(t: TestEnv): TestStatus = +proc unknownSafeBlockHash(env: TestEnv): TestStatus = result = TestStatus.OK - let ok = waitFor t.clMock.waitForTTD() + let ok = waitFor env.clMock.waitForTTD() testCond ok # Produce blocks before starting the test - let produce5BlockRes = t.clMock.produceBlocks(5, BlockProcessCallbacks()) + let produce5BlockRes = env.clMock.produceBlocks(5, BlockProcessCallbacks()) testCond produce5BlockRes - let clMock = t.clMock + let clMock = env.clMock let client = env.client let produceSingleBlockRes = clMock.produceSingleBlock(BlockProcessCallbacks( # Run test after a new payload has been broadcast @@ -196,17 +196,17 @@ proc unknownSafeBlockHash(t: TestEnv): TestStatus = testCond produceSingleBlockRes -proc unknownFinalizedBlockHash(t: TestEnv): TestStatus = +proc unknownFinalizedBlockHash(env: TestEnv): TestStatus = result = TestStatus.OK - let ok = waitFor t.clMock.waitForTTD() + let ok = waitFor env.clMock.waitForTTD() testCond ok # Produce blocks before starting the test - let produce5BlockRes = t.clMock.produceBlocks(5, BlockProcessCallbacks()) + let produce5BlockRes = env.clMock.produceBlocks(5, BlockProcessCallbacks()) testCond produce5BlockRes - let clMock = t.clMock + let clMock = env.clMock let client = env.client let produceSingleBlockRes = clMock.produceSingleBlock(BlockProcessCallbacks( # Run test after a new payload has been broadcast @@ -250,15 +250,15 @@ type alternativePayloads: seq[ExecutableData] template inconsistentForkchoiceStateGen(procname: untyped, inconsistency: Inconsistency) = - proc procName(t: TestEnv): TestStatus = + proc procName(env: TestEnv): TestStatus = result = TestStatus.OK # Wait until TTD is reached by this client - let ok = waitFor t.clMock.waitForTTD() + let ok = waitFor env.clMock.waitForTTD() testCond ok var pList = PayloadList() - let clMock = t.clMock + let clMock = env.clMock let client = env.client # Produce blocks before starting the test @@ -320,14 +320,14 @@ inconsistentForkchoiceStateGen(inconsistentForkchoiceState3, Inconsistency.Final # Verify behavior on a forkchoiceUpdated with invalid payload attributes template invalidPayloadAttributesGen(procname: untyped, syncingCond: bool) = - proc procName(t: TestEnv): TestStatus = + proc procName(env: TestEnv): TestStatus = result = TestStatus.OK # Wait until TTD is reached by this client - let ok = waitFor t.clMock.waitForTTD() + let ok = waitFor env.clMock.waitForTTD() testCond ok - let clMock = t.clMock + let clMock = env.clMock let client = env.client # Produce blocks before starting the test @@ -378,14 +378,14 @@ template invalidPayloadAttributesGen(procname: untyped, syncingCond: bool) = invalidPayloadAttributesGen(invalidPayloadAttributes1, false) invalidPayloadAttributesGen(invalidPayloadAttributes2, true) -proc preTTDFinalizedBlockHash(t: TestEnv): TestStatus = +proc preTTDFinalizedBlockHash(env: TestEnv): TestStatus = result = TestStatus.OK - let ok = waitFor t.clMock.waitForTTD() + let ok = waitFor env.clMock.waitForTTD() testCond ok # Produce blocks before starting the test - let produce5BlockRes = t.clMock.produceBlocks(5, BlockProcessCallbacks()) + let produce5BlockRes = env.clMock.produceBlocks(5, BlockProcessCallbacks()) testCond produce5BlockRes let @@ -396,7 +396,7 @@ proc preTTDFinalizedBlockHash(t: TestEnv): TestStatus = finalizedBlockHash: gHash, ) client = env.client - clMock = t.clMock + clMock = env.clMock var res = client.forkchoiceUpdatedV1(forkchoiceState) testFCU(res, invalid, some(common.Hash256())) @@ -435,17 +435,17 @@ type hash: common.Hash256 template badHashOnNewPayloadGen(procname: untyped, syncingCond: bool, sideChain: bool) = - proc procName(t: TestEnv): TestStatus = + proc procName(env: TestEnv): TestStatus = result = TestStatus.OK - let ok = waitFor t.clMock.waitForTTD() + let ok = waitFor env.clMock.waitForTTD() testCond ok # Produce blocks before starting the test - let produce5BlockRes = t.clMock.produceBlocks(5, BlockProcessCallbacks()) + let produce5BlockRes = env.clMock.produceBlocks(5, BlockProcessCallbacks()) testCond produce5BlockRes - let clMock = t.clMock + let clMock = env.clMock let client = env.client let shadow = Shadow() @@ -530,18 +530,18 @@ badHashOnNewPayloadGen(badHashOnNewPayload2, true, false) badHashOnNewPayloadGen(badHashOnNewPayload3, false, true) badHashOnNewPayloadGen(badHashOnNewPayload4, true, true) -proc parentHashOnExecPayload(t: TestEnv): TestStatus = +proc parentHashOnExecPayload(env: TestEnv): TestStatus = result = TestStatus.OK # Wait until TTD is reached by this client - let ok = waitFor t.clMock.waitForTTD() + let ok = waitFor env.clMock.waitForTTD() testCond ok # Produce blocks before starting the test - let produce5BlockRes = t.clMock.produceBlocks(5, BlockProcessCallbacks()) + let produce5BlockRes = env.clMock.produceBlocks(5, BlockProcessCallbacks()) testCond produce5BlockRes - let clMock = t.clMock + let clMock = env.clMock let client = env.client var produceSingleBlockRes = clMock.produceSingleBlock(BlockProcessCallbacks( # Run test after the new payload has been obtained @@ -560,14 +560,14 @@ proc parentHashOnExecPayload(t: TestEnv): TestStatus = testCond produceSingleBlockRes # Attempt to re-org to a chain containing an invalid transition payload -proc invalidTransitionPayload(t: TestEnv): TestStatus = +proc invalidTransitionPayload(env: TestEnv): TestStatus = result = TestStatus.OK # Wait until TTD is reached by main client - let ok = waitFor t.clMock.waitForTTD() + let ok = waitFor env.clMock.waitForTTD() testCond ok - let clMock = t.clMock + let clMock = env.clMock let client = env.client # Produce two blocks before trying to re-org @@ -603,14 +603,14 @@ proc invalidTransitionPayload(t: TestEnv): TestStatus = testCond pbRes template invalidPayloadTestCaseGen(procName: untyped, payloadField: InvalidPayloadField, emptyTxs: bool = false) = - proc procName(t: TestEnv): TestStatus = + proc procName(env: TestEnv): TestStatus = result = TestStatus.OK # Wait until TTD is reached by this client - let ok = waitFor t.clMock.waitForTTD() + let ok = waitFor env.clMock.waitForTTD() testCond ok - let clMock = t.clMock + let clMock = env.clMock let client = env.client template txProc(): bool = @@ -763,19 +763,19 @@ invalidPayloadTestCaseGen(invalidPayload15, InvalidTransactionValue) # Test to verify Block information available at the Eth RPC after NewPayload template blockStatusExecPayloadGen(procname: untyped, transitionBlock: bool) = - proc procName(t: TestEnv): TestStatus = + proc procName(env: TestEnv): TestStatus = result = TestStatus.OK # Wait until TTD is reached by this client - let ok = waitFor t.clMock.waitForTTD() + let ok = waitFor env.clMock.waitForTTD() testCond ok # Produce blocks before starting the test, only if we are not testing the transition block when not transitionBlock: - let produce5BlockRes = t.clMock.produceBlocks(5, BlockProcessCallbacks()) + let produce5BlockRes = env.clMock.produceBlocks(5, BlockProcessCallbacks()) testCond produce5BlockRes - let clMock = t.clMock + let clMock = env.clMock let client = env.client let shadow = Shadow() @@ -828,14 +828,14 @@ type template invalidMissingAncestorReOrgGen(procName: untyped, invalid_index: int, payloadField: InvalidPayloadField, p2psync: bool, emptyTxs: bool) = - proc procName(t: TestEnv): TestStatus = + proc procName(env: TestEnv): TestStatus = result = TestStatus.OK # Wait until TTD is reached by this client - let ok = waitFor t.clMock.waitForTTD() + let ok = waitFor env.clMock.waitForTTD() testCond ok - let clMock = t.clMock + let clMock = env.clMock let client = env.client # Produce blocks before starting the test @@ -942,19 +942,19 @@ invalidMissingAncestorReOrgGen(invalidMissingAncestor2, 9, InvalidStateRoot, fal invalidMissingAncestorReOrgGen(invalidMissingAncestor3, 10, InvalidStateRoot, false, true) template blockStatusHeadBlockGen(procname: untyped, transitionBlock: bool) = - proc procName(t: TestEnv): TestStatus = + proc procName(env: TestEnv): TestStatus = result = TestStatus.OK # Wait until TTD is reached by this client - let ok = waitFor t.clMock.waitForTTD() + let ok = waitFor env.clMock.waitForTTD() testCond ok # Produce blocks before starting the test, only if we are not testing the transition block when not transitionBlock: - let produce5BlockRes = t.clMock.produceBlocks(5, BlockProcessCallbacks()) + let produce5BlockRes = env.clMock.produceBlocks(5, BlockProcessCallbacks()) testCond produce5BlockRes - let clMock = t.clMock + let clMock = env.clMock let client = env.client let shadow = Shadow() @@ -981,10 +981,10 @@ template blockStatusHeadBlockGen(procname: untyped, transitionBlock: bool) = blockStatusHeadBlockGen(blockStatusHeadBlock1, false) blockStatusHeadBlockGen(blockStatusHeadBlock2, true) -proc blockStatusSafeBlock(t: TestEnv): TestStatus = +proc blockStatusSafeBlock(env: TestEnv): TestStatus = result = TestStatus.OK - let clMock = t.clMock + let clMock = env.clMock let client = env.client # On PoW mode, `safe` tag shall return error. @@ -993,7 +993,7 @@ proc blockStatusSafeBlock(t: TestEnv): TestStatus = testCond rr.isErr # Wait until this client catches up with latest PoS Block - let ok = waitFor t.clMock.waitForTTD() + let ok = waitFor env.clMock.waitForTTD() testCond ok # First ForkchoiceUpdated sent was equal to 0x00..00, `safe` should return error now @@ -1012,10 +1012,10 @@ proc blockStatusSafeBlock(t: TestEnv): TestStatus = testCond pbres -proc blockStatusFinalizedBlock(t: TestEnv): TestStatus = +proc blockStatusFinalizedBlock(env: TestEnv): TestStatus = result = TestStatus.OK - let clMock = t.clMock + let clMock = env.clMock let client = env.client # On PoW mode, `finalized` tag shall return error. @@ -1024,7 +1024,7 @@ proc blockStatusFinalizedBlock(t: TestEnv): TestStatus = testCond rr.isErr # Wait until this client catches up with latest PoS Block - let ok = waitFor t.clMock.waitForTTD() + let ok = waitFor env.clMock.waitForTTD() testCond ok # First ForkchoiceUpdated sent was equal to 0x00..00, `finalized` should return error now @@ -1043,18 +1043,18 @@ proc blockStatusFinalizedBlock(t: TestEnv): TestStatus = testCond pbres -proc blockStatusReorg(t: TestEnv): TestStatus = +proc blockStatusReorg(env: TestEnv): TestStatus = result = TestStatus.OK # Wait until TTD is reached by this client - let ok = waitFor t.clMock.waitForTTD() + let ok = waitFor env.clMock.waitForTTD() testCond ok # Produce blocks before starting the test - let produce5BlockRes = t.clMock.produceBlocks(5, BlockProcessCallbacks()) + let produce5BlockRes = env.clMock.produceBlocks(5, BlockProcessCallbacks()) testCond produce5BlockRes - let clMock = t.clMock + let clMock = env.clMock let client = env.client var produceSingleBlockRes = clMock.produceSingleBlock(BlockProcessCallbacks( # Run test after a forkchoice with new HeadBlockHash has been broadcasted @@ -1130,18 +1130,18 @@ proc blockStatusReorg(t: TestEnv): TestStatus = )) testCond produceSingleBlockRes -proc reExecPayloads(t: TestEnv): TestStatus = +proc reExecPayloads(env: TestEnv): TestStatus = result = TestStatus.OK # Wait until this client catches up with latest PoS - let ok = waitFor t.clMock.waitForTTD() + let ok = waitFor env.clMock.waitForTTD() testCond ok # How many Payloads we are going to re-execute var payloadReExecCount = 10 # Create those blocks - let produceBlockRes = t.clMock.produceBlocks(payloadReExecCount, BlockProcessCallbacks()) + let produceBlockRes = env.clMock.produceBlocks(payloadReExecCount, BlockProcessCallbacks()) testCond produceBlockRes # Re-execute the payloads @@ -1154,7 +1154,7 @@ proc reExecPayloads(t: TestEnv): TestStatus = info "Started re-executing payloads at block", number=lastBlock let - clMock = t.clMock + clMock = env.clMock start = lastBlock - payloadReExecCount + 1 for i in start..lastBlock: @@ -1171,18 +1171,18 @@ proc reExecPayloads(t: TestEnv): TestStatus = testCond true: error "(test issue) Payload does not exist", index=i -proc multipleNewCanonicalPayloads(t: TestEnv): TestStatus = +proc multipleNewCanonicalPayloads(env: TestEnv): TestStatus = result = TestStatus.OK # Wait until TTD is reached by this client - let ok = waitFor t.clMock.waitForTTD() + let ok = waitFor env.clMock.waitForTTD() testCond ok # Produce blocks before starting the test - let produce5BlockRes = t.clMock.produceBlocks(5, BlockProcessCallbacks()) + let produce5BlockRes = env.clMock.produceBlocks(5, BlockProcessCallbacks()) testCond produce5BlockRes - let clMock = t.clMock + let clMock = env.clMock let client = env.client var produceSingleBlockRes = clMock.produceSingleBlock(BlockProcessCallbacks( # Run test after a new payload has been obtained @@ -1213,11 +1213,11 @@ proc multipleNewCanonicalPayloads(t: TestEnv): TestStatus = # At the end the clMocker continues to try to execute fcU with the original payload, which should not fail testCond produceSingleBlockRes -proc outOfOrderPayloads(t: TestEnv): TestStatus = +proc outOfOrderPayloads(env: TestEnv): TestStatus = result = TestStatus.OK # Wait until TTD is reached by this client - let ok = waitFor t.clMock.waitForTTD() + let ok = waitFor env.clMock.waitForTTD() testCond ok # First prepare payloads on a first client, which will also contain multiple transactions @@ -1232,7 +1232,7 @@ proc outOfOrderPayloads(t: TestEnv): TestStatus = var recipient: EthAddress doAssert randomBytes(recipient) == 20 - let clMock = t.clMock + let clMock = env.clMock let client = env.client var produceBlockRes = clMock.produceBlocks(payloadCount, BlockProcessCallbacks( # We send the transactions after we got the Payload ID, before the clMocker gets the prepared Payload @@ -1257,14 +1257,14 @@ proc outOfOrderPayloads(t: TestEnv): TestStatus = # Test that performing a re-org back into a previous block of the canonical chain does not produce errors and the chain # is still capable of progressing. -proc reorgBack(t: TestEnv): TestStatus = +proc reorgBack(env: TestEnv): TestStatus = result = TestStatus.OK # Wait until TTD is reached by this client - let ok = waitFor t.clMock.waitForTTD() + let ok = waitFor env.clMock.waitForTTD() testCond ok - let clMock = t.clMock + let clMock = env.clMock let client = env.client testCond clMock.produceSingleBlock(BlockProcessCallbacks()) @@ -1298,16 +1298,16 @@ type SideChainList = ref object sidechainPayloads: seq[ExecutionPayloadV1] -proc reorgBackFromSyncing(t: TestEnv): TestStatus = +proc reorgBackFromSyncing(env: TestEnv): TestStatus = result = TestStatus.OK # Wait until TTD is reached by this client - let ok = waitFor t.clMock.waitForTTD() + let ok = waitFor env.clMock.waitForTTD() testCond ok # Produce an alternative chain let pList = SideChainList() - let clMock = t.clMock + let clMock = env.clMock let client = env.client let r1 = clMock.produceBlocks(10, BlockProcessCallbacks( @@ -1371,15 +1371,15 @@ type noTxnPayload: ExecutionPayloadV1 txHash: common.Hash256 -proc transactionReorg(t: TestEnv): TestStatus = +proc transactionReorg(env: TestEnv): TestStatus = result = TestStatus.OK # Wait until TTD is reached by this client - let ok = waitFor t.clMock.waitForTTD() + let ok = waitFor env.clMock.waitForTTD() testCond ok # Produce blocks before starting the test - testCond t.clMock.produceBlocks(5, BlockProcessCallbacks()) + testCond env.clMock.produceBlocks(5, BlockProcessCallbacks()) # Create transactions that modify the state in order to testCond after the reorg. const @@ -1388,7 +1388,7 @@ proc transactionReorg(t: TestEnv): TestStatus = let client = env.client - clMock = t.clMock + clMock = env.clMock shadow = TxReorgShadow() for i in 0.. 0: case conf.syncMode: of SyncMode.Default: - nimbus.legaSyncRef.start - nimbus.ethNode.setEthHandlerNewBlocksAndHashes( - legacy.newBlockHandler, - legacy.newBlockHashesHandler, - cast[pointer](nimbus.legaSyncRef)) + if com.forkGTE(MergeFork): + nimbus.beaconSyncRef.start + else: + nimbus.legaSyncRef.start + nimbus.ethNode.setEthHandlerNewBlocksAndHashes( + legacy.newBlockHandler, + legacy.newBlockHashesHandler, + cast[pointer](nimbus.legaSyncRef)) of SyncMode.Full: nimbus.fullSyncRef.start of SyncMode.Stateless: @@ -474,6 +483,8 @@ proc stop*(nimbus: NimbusNode, conf: NimbusConf) {.async, gcsafe.} = nimbus.snapSyncRef.stop() if nimbus.fullSyncRef.isNil.not: nimbus.fullSyncRef.stop() + if nimbus.beaconSyncRef.isNil.not: + nimbus.beaconSyncRef.stop() proc process*(nimbus: NimbusNode, conf: NimbusConf) = # Main event loop diff --git a/nimbus/sync/beacon.nim b/nimbus/sync/beacon.nim new file mode 100644 index 000000000..c60a4a5a4 --- /dev/null +++ b/nimbus/sync/beacon.nim @@ -0,0 +1,150 @@ +# Nimbus +# Copyright (c) 2023 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or +# http://www.apache.org/licenses/LICENSE-2.0) +# * MIT license ([LICENSE-MIT](LICENSE-MIT) or +# http://opensource.org/licenses/MIT) +# at your option. This file may not be copied, modified, or distributed +# except according to those terms. + +{.push raises: [].} + +import + eth/p2p, + chronicles, + chronos, + stew/[interval_set, sorted_set], + ./beacon/[worker, worker_desc], + "."/[sync_desc, sync_sched, protocol] + +logScope: + topics = "beacon-sync" + +type + BeaconSyncRef* = RunnerSyncRef[BeaconCtxData,BeaconBuddyData] + +const + extraTraceMessages = false # or true + ## Enable additional logging noise + +# ------------------------------------------------------------------------------ +# Private logging helpers +# ------------------------------------------------------------------------------ + +template traceMsg(f, info: static[string]; args: varargs[untyped]) = + trace "Snap scheduler " & f & "() " & info, args + +template traceMsgCtx(f, info: static[string]; c: BeaconCtxRef) = + when extraTraceMessages: + block: + let + poolMode {.inject.} = c.poolMode + daemon {.inject.} = c.daemon + f.traceMsg info, poolMode, daemon + +template traceMsgBuddy(f, info: static[string]; b: BeaconBuddyRef) = + when extraTraceMessages: + block: + let + peer {.inject.} = b.peer + runState {.inject.} = b.ctrl.state + multiOk {.inject.} = b.ctrl.multiOk + poolMode {.inject.} = b.ctx.poolMode + daemon {.inject.} = b.ctx.daemon + f.traceMsg info, peer, runState, multiOk, poolMode, daemon + + +template tracerFrameCtx(f: static[string]; c: BeaconCtxRef; code: untyped) = + f.traceMsgCtx "begin", c + code + f.traceMsgCtx "end", c + +template tracerFrameBuddy(f: static[string]; b: BeaconBuddyRef; code: untyped) = + f.traceMsgBuddy "begin", b + code + f.traceMsgBuddy "end", b + +# ------------------------------------------------------------------------------ +# Virtual methods/interface, `mixin` functions +# ------------------------------------------------------------------------------ + +proc runSetup(ctx: BeaconCtxRef): bool = + tracerFrameCtx("runSetup", ctx): + result = worker.setup(ctx) + +proc runRelease(ctx: BeaconCtxRef) = + tracerFrameCtx("runRelease", ctx): + worker.release(ctx) + +proc runDaemon(ctx: BeaconCtxRef) {.async.} = + tracerFrameCtx("runDaemon", ctx): + await worker.runDaemon(ctx) + +proc runStart(buddy: BeaconBuddyRef): bool = + tracerFrameBuddy("runStart", buddy): + result = worker.start(buddy) + +proc runStop(buddy: BeaconBuddyRef) = + tracerFrameBuddy("runStop", buddy): + worker.stop(buddy) + +proc runPool(buddy: BeaconBuddyRef; last: bool; laps: int): bool = + tracerFrameBuddy("runPool", buddy): + result = worker.runPool(buddy, last, laps) + +proc runSingle(buddy: BeaconBuddyRef) {.async.} = + tracerFrameBuddy("runSingle", buddy): + await worker.runSingle(buddy) + +proc runMulti(buddy: BeaconBuddyRef) {.async.} = + tracerFrameBuddy("runMulti", buddy): + await worker.runMulti(buddy) + +# ------------------------------------------------------------------------------ +# Private helpers +# ------------------------------------------------------------------------------ + +proc updateBeaconHeaderCB(ctx: BeaconSyncRef): SyncReqNewHeadCB = + ## Update beacon header. This function is intended as a call back function + ## for the RPC module. + result = proc(h: BlockHeader) {.gcsafe, raises: [].} = + try: + debugEcho "REQUEST SYNC TO: ", h.blockNumber + debugEcho "REQUEST SYNC TO: ", h.blockHash + except CatchableError as ex: + debugEcho ex.msg + +proc enableRpcMagic(ctx: BeaconSyncRef) = + ## Helper for `setup()`: Enable external pivot update via RPC + let com = ctx.ctx.chain.com + com.syncReqNewHead = ctx.updateBeaconHeaderCB + com.syncReqRelaxV2 = true + +# ------------------------------------------------------------------------------ +# Public functions +# ------------------------------------------------------------------------------ + +proc init*( + T: type BeaconSyncRef; + ethNode: EthereumNode; + chain: ChainRef; + rng: ref HmacDrbgContext; + maxPeers: int; + id: int = 0): T = + new result + result.initSync(ethNode, chain, maxPeers, none(string)) + result.ctx.pool.rng = rng + result.ctx.pool.id = id + +proc start*(ctx: BeaconSyncRef) = + ## Beacon Sync always begin with stop mode + doAssert ctx.startSync() # Initialize subsystems + ctx.enableRpcMagic() # Allow external pivot update via RPC + +proc stop*(ctx: BeaconSyncRef) = + ctx.stopSync() + +# ------------------------------------------------------------------------------ +# End +# ------------------------------------------------------------------------------ diff --git a/nimbus/sync/beacon/worker.nim b/nimbus/sync/beacon/worker.nim new file mode 100644 index 000000000..43f7ce64d --- /dev/null +++ b/nimbus/sync/beacon/worker.nim @@ -0,0 +1,171 @@ +# Nimbus +# Copyright (c) 2023 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at +# https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at +# https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed +# except according to those terms. + +{.push raises:[].} + +import + chronicles, + chronos, + eth/p2p, + ".."/[protocol, sync_desc], + ./worker_desc + +logScope: + topics = "beacon-buddy" + +const + extraTraceMessages = false # or true + ## Enabled additional logging noise + + FirstPivotSeenTimeout = 3.minutes + ## Turn on relaxed pivot negotiation after some waiting time when there + ## was a `peer` seen but was rejected. This covers a rare event. Typically + ## useless peers do not appear ready for negotiation. + + FirstPivotAcceptedTimeout = 50.seconds + ## Turn on relaxed pivot negotiation after some waiting time when there + ## was a `peer` accepted but no second one yet. + +# ------------------------------------------------------------------------------ +# Private helpers +# ------------------------------------------------------------------------------ + +proc pp(n: BlockNumber): string = + ## Dedicated pretty printer (`$` is defined elsewhere using `UInt256`) + if n == high(BlockNumber): "high" else:"#" & $n + +# ------------------------------------------------------------------------------ +# Private functions +# ------------------------------------------------------------------------------ + + +# ------------------------------------------------------------------------------ +# Public start/stop and admin functions +# ------------------------------------------------------------------------------ + +proc setup*(ctx: BeaconCtxRef): bool = + ## Global set up + #ctx.pool.pivot = BestPivotCtxRef.init(ctx.pool.rng) + + true + +proc release*(ctx: BeaconCtxRef) = + ## Global clean up + #ctx.pool.pivot = nil + +proc start*(buddy: BeaconBuddyRef): bool = + ## Initialise worker peer + let + ctx = buddy.ctx + peer = buddy.peer + if peer.supports(protocol.eth) and + peer.state(protocol.eth).initialized: + + ctx.daemon = true + return true + +proc stop*(buddy: BeaconBuddyRef) = + ## Clean up this peer + buddy.ctrl.stopped = true + + +# ------------------------------------------------------------------------------ +# Public functions +# ------------------------------------------------------------------------------ + +proc runDaemon*(ctx: BeaconCtxRef) {.async.} = + ## Global background job that will be re-started as long as the variable + ## `ctx.daemon` is set `true`. If that job was stopped due to re-setting + ## `ctx.daemon` to `false`, it will be restarted next after it was reset + ## as `true` not before there is some activity on the `runPool()`, + ## `runSingle()`, or `runMulti()` functions. + ## + + debugEcho "RUNDAEMON: ", ctx.pool.id + ctx.daemon = false + + # Without waiting, this function repeats every 50ms (as set with the constant + # `sync_sched.execLoopTimeElapsedMin`.) Larger waiting time cleans up logging. + await sleepAsync 300.milliseconds + + +proc runSingle*(buddy: BeaconBuddyRef) {.async.} = + ## This peer worker is invoked if the peer-local flag `buddy.ctrl.multiOk` + ## is set `false` which is the default mode. This flag is updated by the + ## worker when deemed appropriate. + ## * For all workers, there can be only one `runSingle()` function active + ## simultaneously for all worker peers. + ## * There will be no `runMulti()` function active for the same worker peer + ## simultaneously + ## * There will be no `runPool()` iterator active simultaneously. + ## + ## Note that this function runs in `async` mode. + ## + let + ctx = buddy.ctx + peer {.used.} = buddy.peer + + debugEcho "RUNSINGLE: ", ctx.pool.id + + if buddy.ctrl.stopped: + when extraTraceMessages: + trace "Single mode stopped", peer, pivotState=ctx.pool.pivotState + return # done with this buddy + + var napping = 2.seconds + when extraTraceMessages: + trace "Single mode end", peer, napping + + # Without waiting, this function repeats every 50ms (as set with the constant + # `sync_sched.execLoopTimeElapsedMin`.) + await sleepAsync napping + + +proc runPool*(buddy: BeaconBuddyRef; last: bool; laps: int): bool = + ## Once started, the function `runPool()` is called for all worker peers in + ## sequence as the body of an iteration as long as the function returns + ## `false`. There will be no other worker peer functions activated + ## simultaneously. + ## + ## This procedure is started if the global flag `buddy.ctx.poolMode` is set + ## `true` (default is `false`.) It will be automatically reset before the + ## the loop starts. Re-setting it again results in repeating the loop. The + ## argument `lap` (starting with `0`) indicated the currend lap of the + ## repeated loops. + ## + ## The argument `last` is set `true` if the last entry is reached. + ## + ## Note that this function does not run in `async` mode. + ## + let + ctx = buddy.ctx + + debugEcho "RUNPOOL: ", ctx.pool.id + + true # Stop after running once regardless of peer + +proc runMulti*(buddy: BeaconBuddyRef) {.async.} = + ## This peer worker is invoked if the `buddy.ctrl.multiOk` flag is set + ## `true` which is typically done after finishing `runSingle()`. This + ## instance can be simultaneously active for all peer workers. + ## + let + ctx = buddy.ctx + + debugEcho "RUNMULTI: ", ctx.pool.id + + # Update persistent database + #while not buddy.ctrl.stopped: + # Allow thread switch as `persistBlocks()` might be slow + await sleepAsync(10.milliseconds) + +# ------------------------------------------------------------------------------ +# End +# ------------------------------------------------------------------------------ diff --git a/nimbus/sync/beacon/worker_desc.nim b/nimbus/sync/beacon/worker_desc.nim new file mode 100644 index 000000000..81d34f683 --- /dev/null +++ b/nimbus/sync/beacon/worker_desc.nim @@ -0,0 +1,33 @@ +# Nimbus +# Copyright (c) 2023 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at +# https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at +# https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed +# except according to those terms. + +{.push raises:[].} + +import + eth/p2p, + chronos, + ../sync_desc + +type + BeaconBuddyData* = object + ## Local descriptor data extension + + BeaconCtxData* = object + ## Globally shared data extension + rng*: ref HmacDrbgContext ## Random generator, pre-initialised + id*: int + + BeaconBuddyRef* = BuddyRef[BeaconCtxData,BeaconBuddyData] + ## Extended worker peer descriptor + + BeaconCtxRef* = CtxRef[BeaconCtxData] + ## Extended global descriptor + +# End