diff --git a/nimbus/sync/full.nim b/nimbus/sync/full.nim index d939c7db8..a85f272fc 100644 --- a/nimbus/sync/full.nim +++ b/nimbus/sync/full.nim @@ -113,7 +113,7 @@ proc init*( enableTicker = false): T = new result result.initSync(ethNode, chain, maxPeers, enableTicker) - result.ctx.data.rng = rng + result.ctx.pool.rng = rng proc start*(ctx: FullSyncRef) = doAssert ctx.startSync() diff --git a/nimbus/sync/full/worker.nim b/nimbus/sync/full/worker.nim index 1917b0569..fe8667597 100644 --- a/nimbus/sync/full/worker.nim +++ b/nimbus/sync/full/worker.nim @@ -98,7 +98,7 @@ proc topUsedNumber( proc tickerUpdater(ctx: FullCtxRef): TickerStatsUpdater = result = proc: TickerStats = var stats: BlockQueueStats - ctx.data.bCtx.blockQueueStats(stats) + ctx.pool.bCtx.blockQueueStats(stats) TickerStats( topPersistent: stats.topAccepted, @@ -116,7 +116,7 @@ proc processStaged(buddy: FullBuddyRef): bool = peer = buddy.peer chainDb = buddy.ctx.chain.db chain = buddy.ctx.chain - bq = buddy.data.bQueue + bq = buddy.only.bQueue # Get a work item, a list of headers + bodies wi = block: @@ -135,16 +135,6 @@ proc processStaged(buddy: FullBuddyRef): bool = except CatchableError as e: error "Storing persistent blocks failed", peer, range=($wi.blocks), error = $e.name, msg = e.msg - #except Defect as e: - # # Pass through - # raise e - #except Exception as e: - # # Notorious case where the `Chain` reference applied to - # # `persistBlocks()` has the compiler traced a possible `Exception` - # # (i.e. `ctx.chain` could be uninitialised.) - # error "Exception while storing persistent blocks", peer, - # range=($wi.blocks), error=($e.name), msg=e.msg - # raise (ref Defect)(msg: $e.name & ": " & e.msg) # Something went wrong. Recycle work item (needs to be re-fetched, anyway) let @@ -185,23 +175,23 @@ proc processStaged(buddy: FullBuddyRef): bool = proc setup*(ctx: FullCtxRef; tickerOK: bool): bool = ## Global set up - ctx.data.pivot = BestPivotCtxRef.init(ctx.data.rng) - if tickerOK: - ctx.data.ticker = TickerRef.init(ctx.tickerUpdater) - else: - debug "Ticker is disabled" + ctx.pool.pivot = BestPivotCtxRef.init(ctx.pool.rng) let rc = ctx.topUsedNumber(backBlocks = 0) if rc.isErr: - ctx.data.bCtx = BlockQueueCtxRef.init() + ctx.pool.bCtx = BlockQueueCtxRef.init() return false - ctx.data.bCtx = BlockQueueCtxRef.init(rc.value + 1) + ctx.pool.bCtx = BlockQueueCtxRef.init(rc.value + 1) + if tickerOK: + ctx.pool.ticker = TickerRef.init(ctx.tickerUpdater) + else: + debug "Ticker is disabled" true proc release*(ctx: FullCtxRef) = ## Global clean up - ctx.data.pivot = nil - if not ctx.data.ticker.isNil: - ctx.data.ticker.stop() + ctx.pool.pivot = nil + if not ctx.pool.ticker.isNil: + ctx.pool.ticker.stop() proc start*(buddy: FullBuddyRef): bool = ## Initialise worker peer @@ -210,20 +200,20 @@ proc start*(buddy: FullBuddyRef): bool = peer = buddy.peer if peer.supports(protocol.eth) and peer.state(protocol.eth).initialized: - if not ctx.data.ticker.isNil: - ctx.data.ticker.startBuddy() - buddy.data.pivot = - BestPivotWorkerRef.init(ctx.data.pivot, buddy.ctrl, buddy.peer) - buddy.data.bQueue = BlockQueueWorkerRef.init( - ctx.data.bCtx, buddy.ctrl, peer) + if not ctx.pool.ticker.isNil: + ctx.pool.ticker.startBuddy() + buddy.only.pivot = + BestPivotWorkerRef.init(ctx.pool.pivot, buddy.ctrl, buddy.peer) + buddy.only.bQueue = BlockQueueWorkerRef.init( + ctx.pool.bCtx, buddy.ctrl, peer) return true proc stop*(buddy: FullBuddyRef) = ## Clean up this peer buddy.ctrl.stopped = true - buddy.data.pivot.clear() - if not buddy.ctx.data.ticker.isNil: - buddy.ctx.data.ticker.stopBuddy() + buddy.only.pivot.clear() + if not buddy.ctx.pool.ticker.isNil: + buddy.ctx.pool.ticker.stopBuddy() # ------------------------------------------------------------------------------ # Public functions @@ -236,39 +226,39 @@ proc runDaemon*(ctx: FullCtxRef) {.async.} = ## as `true` not before there is some activity on the `runPool()`, ## `runSingle()`, or `runMulti()` functions. ## - case ctx.data.pivotState: + case ctx.pool.pivotState: of FirstPivotSeen: - let elapsed = Moment.now() - ctx.data.pivotStamp + let elapsed = Moment.now() - ctx.pool.pivotStamp if FirstPivotSeenTimeout < elapsed: # Switch to single peer pivot negotiation - ctx.data.pivot.pivotRelaxedMode(enable = true) + ctx.pool.pivot.pivotRelaxedMode(enable = true) # Currently no need for other monitor tasks ctx.daemon = false when extraTraceMessages: trace "First seen pivot timeout", elapsed, - pivotState=ctx.data.pivotState + pivotState=ctx.pool.pivotState return # Otherwise delay for some time of FirstPivotAccepted: - let elapsed = Moment.now() - ctx.data.pivotStamp + let elapsed = Moment.now() - ctx.pool.pivotStamp if FirstPivotAcceptedTimeout < elapsed: # Switch to single peer pivot negotiation - ctx.data.pivot.pivotRelaxedMode(enable = true) + ctx.pool.pivot.pivotRelaxedMode(enable = true) # Use currents pivot next time `runSingle()` is visited. This bent is # necessary as there must be a peer initialising and syncing blocks. But # this daemon has no peer assigned. - ctx.data.pivotState = FirstPivotUseRegardless + ctx.pool.pivotState = FirstPivotUseRegardless # Currently no need for other monitor tasks ctx.daemon = false when extraTraceMessages: trace "First accepted pivot timeout", elapsed, - pivotState=ctx.data.pivotState + pivotState=ctx.pool.pivotState return # Otherwise delay for some time @@ -297,17 +287,17 @@ proc runSingle*(buddy: FullBuddyRef) {.async.} = let ctx = buddy.ctx peer {.used.} = buddy.peer - bq = buddy.data.bQueue - pv = buddy.data.pivot + bq = buddy.only.bQueue + pv = buddy.only.pivot when extraTraceMessages: - trace "Single mode begin", peer, pivotState=ctx.data.pivotState + trace "Single mode begin", peer, pivotState=ctx.pool.pivotState - case ctx.data.pivotState: + case ctx.pool.pivotState: of PivotStateInitial: # Set initial state on first encounter - ctx.data.pivotState = FirstPivotSeen - ctx.data.pivotStamp = Moment.now() + ctx.pool.pivotState = FirstPivotSeen + ctx.pool.pivotStamp = Moment.now() ctx.daemon = true # Start monitor of FirstPivotSeen, FirstPivotAccepted: @@ -319,13 +309,13 @@ proc runSingle*(buddy: FullBuddyRef) {.async.} = if rc.isOK: # Update/activate `bestNumber` from the pivot header bq.bestNumber = some(rc.value.blockNumber) - ctx.data.pivotState = PivotRunMode + ctx.pool.pivotState = PivotRunMode buddy.ctrl.multiOk = true trace "Single pivot accepted", peer, pivot=('#' & $bq.bestNumber.get) return # stop logging, otherwise unconditional return for this case when extraTraceMessages: - trace "Single mode stopped", peer, pivotState=ctx.data.pivotState + trace "Single mode stopped", peer, pivotState=ctx.pool.pivotState return # unconditional return for this case of PivotRunMode: @@ -352,26 +342,26 @@ proc runSingle*(buddy: FullBuddyRef) {.async.} = # Negotiate in order to derive the pivot header from this `peer`. This code # location here is reached when there was no compelling reason for the # `case()` handler to process and `return`. - if await pv.pivotNegotiate(buddy.data.bQueue.bestNumber): + if await pv.pivotNegotiate(buddy.only.bQueue.bestNumber): # Update/activate `bestNumber` from the pivot header bq.bestNumber = some(pv.pivotHeader.value.blockNumber) - ctx.data.pivotState = PivotRunMode + ctx.pool.pivotState = PivotRunMode buddy.ctrl.multiOk = true trace "Pivot accepted", peer, pivot=('#' & $bq.bestNumber.get) return if buddy.ctrl.stopped: when extraTraceMessages: - trace "Single mode stopped", peer, pivotState=ctx.data.pivotState + trace "Single mode stopped", peer, pivotState=ctx.pool.pivotState return # done with this buddy var napping = 2.seconds - case ctx.data.pivotState: + case ctx.pool.pivotState: of FirstPivotSeen: # Possible state transition if pv.pivotHeader(relaxedMode=true).isOk: - ctx.data.pivotState = FirstPivotAccepted - ctx.data.pivotStamp = Moment.now() + ctx.pool.pivotState = FirstPivotAccepted + ctx.pool.pivotStamp = Moment.now() napping = 300.milliseconds of FirstPivotAccepted: napping = 300.milliseconds @@ -379,7 +369,7 @@ proc runSingle*(buddy: FullBuddyRef) {.async.} = discard when extraTraceMessages: - trace "Single mode end", peer, pivotState=ctx.data.pivotState, napping + trace "Single mode end", peer, pivotState=ctx.pool.pivotState, napping # Without waiting, this function repeats every 50ms (as set with the constant # `sync_sched.execLoopTimeElapsedMin`.) @@ -402,7 +392,7 @@ proc runPool*(buddy: FullBuddyRef; last: bool): bool = ## Note that this function does not run in `async` mode. ## # Mind the gap, fill in if necessary (function is peer independent) - buddy.data.bQueue.blockQueueGrout() + buddy.only.bQueue.blockQueueGrout() # Stop after running once regardless of peer buddy.ctx.poolMode = false @@ -417,7 +407,7 @@ proc runMulti*(buddy: FullBuddyRef) {.async.} = # Fetch work item let ctx {.used.} = buddy.ctx - bq = buddy.data.bQueue + bq = buddy.only.bQueue rc = await bq.blockQueueWorker() if rc.isErr: if rc.error == StagedQueueOverflow: diff --git a/nimbus/sync/snap.nim b/nimbus/sync/snap.nim index 6c0bd8117..a30e22749 100644 --- a/nimbus/sync/snap.nim +++ b/nimbus/sync/snap.nim @@ -119,9 +119,9 @@ proc init*( new result result.initSync(ethNode, chain, maxPeers, enableTicker) result.ctx.chain = chain # explicitely override - result.ctx.data.rng = rng - result.ctx.data.dbBackend = dbBackend - result.ctx.data.noRecovery = noRecovery + result.ctx.pool.rng = rng + result.ctx.pool.dbBackend = dbBackend + result.ctx.pool.noRecovery = noRecovery # Required to have been initialised via `addEthHandlerCapability()` doAssert not result.ctx.ethWireCtx.isNil diff --git a/nimbus/sync/snap/worker.nim b/nimbus/sync/snap/worker.nim index 076e3b9b1..1e1db4204 100644 --- a/nimbus/sync/snap/worker.nim +++ b/nimbus/sync/snap/worker.nim @@ -48,7 +48,7 @@ template noExceptionOops(info: static[string]; code: untyped) = # ------------------------------------------------------------------------------ proc recoveryStepContinue(ctx: SnapCtxRef): Future[bool] {.async.} = - let recov = ctx.data.recovery + let recov = ctx.pool.recovery if recov.isNil: return false @@ -57,7 +57,7 @@ proc recoveryStepContinue(ctx: SnapCtxRef): Future[bool] {.async.} = "#" & $recov.state.header.blockNumber & "(" & $recov.level & ")" topLevel = recov.level == 0 env = block: - let rc = ctx.data.pivotTable.eq recov.state.header.stateRoot + let rc = ctx.pool.pivotTable.eq recov.state.header.stateRoot if rc.isErr: error "Recovery pivot context gone", checkpoint, topLevel return false @@ -79,19 +79,19 @@ proc recoveryStepContinue(ctx: SnapCtxRef): Future[bool] {.async.} = #when extraTraceMessages: # trace "Recovery done", checkpoint, topLevel return false - let rc = ctx.data.snapDb.recoverPivot(recov.state.predecessor) + let rc = ctx.pool.snapDb.recoverPivot(recov.state.predecessor) if rc.isErr: when extraTraceMessages: trace "Recovery stopped at pivot stale checkpoint", checkpoint, topLevel return false # Set up next level pivot checkpoint - ctx.data.recovery = SnapRecoveryRef( + ctx.pool.recovery = SnapRecoveryRef( state: rc.value, level: recov.level + 1) # Push onto pivot table and continue recovery (i.e. do not stop it yet) - ctx.data.pivotTable.reverseUpdate(ctx.data.recovery.state.header, ctx) + ctx.pool.pivotTable.reverseUpdate(ctx.pool.recovery.state.header, ctx) return true # continue recovery @@ -101,38 +101,38 @@ proc recoveryStepContinue(ctx: SnapCtxRef): Future[bool] {.async.} = proc setup*(ctx: SnapCtxRef; tickerOK: bool): bool = ## Global set up - ctx.data.coveredAccounts = NodeTagRangeSet.init() + ctx.pool.coveredAccounts = NodeTagRangeSet.init() noExceptionOops("worker.setup()"): ctx.ethWireCtx.txPoolEnabled false ctx.chain.com.syncReqNewHead = ctx.pivotUpdateBeaconHeaderCB - ctx.data.snapDb = - if ctx.data.dbBackend.isNil: SnapDbRef.init(ctx.chain.db.db) - else: SnapDbRef.init(ctx.data.dbBackend) + ctx.pool.snapDb = + if ctx.pool.dbBackend.isNil: SnapDbRef.init(ctx.chain.db.db) + else: SnapDbRef.init(ctx.pool.dbBackend) if tickerOK: - ctx.data.ticker = TickerRef.init(ctx.data.pivotTable.tickerStats(ctx)) + ctx.pool.ticker = TickerRef.init(ctx.pool.pivotTable.tickerStats(ctx)) else: trace "Ticker is disabled" # Check for recovery mode - if not ctx.data.noRecovery: - let rc = ctx.data.snapDb.recoverPivot() + if not ctx.pool.noRecovery: + let rc = ctx.pool.snapDb.recoverPivot() if rc.isOk: - ctx.data.recovery = SnapRecoveryRef(state: rc.value) + ctx.pool.recovery = SnapRecoveryRef(state: rc.value) ctx.daemon = true # Set up early initial pivot - ctx.data.pivotTable.reverseUpdate(ctx.data.recovery.state.header, ctx) + ctx.pool.pivotTable.reverseUpdate(ctx.pool.recovery.state.header, ctx) trace "Recovery started", - checkpoint=("#" & $ctx.data.pivotTable.topNumber() & "(0)") - if not ctx.data.ticker.isNil: - ctx.data.ticker.startRecovery() + checkpoint=("#" & $ctx.pool.pivotTable.topNumber() & "(0)") + if not ctx.pool.ticker.isNil: + ctx.pool.ticker.startRecovery() true proc release*(ctx: SnapCtxRef) = ## Global clean up - if not ctx.data.ticker.isNil: - ctx.data.ticker.stop() - ctx.data.ticker = nil + if not ctx.pool.ticker.isNil: + ctx.pool.ticker.stop() + ctx.pool.ticker = nil noExceptionOops("worker.release()"): ctx.ethWireCtx.txPoolEnabled true ctx.chain.com.syncReqNewHead = nil @@ -145,16 +145,16 @@ proc start*(buddy: SnapBuddyRef): bool = if peer.supports(protocol.snap) and peer.supports(protocol.eth) and peer.state(protocol.eth).initialized: - buddy.data.errors = ComErrorStatsRef() - if not ctx.data.ticker.isNil: - ctx.data.ticker.startBuddy() + buddy.only.errors = ComErrorStatsRef() + if not ctx.pool.ticker.isNil: + ctx.pool.ticker.startBuddy() return true proc stop*(buddy: SnapBuddyRef) = ## Clean up this peer let ctx = buddy.ctx - if not ctx.data.ticker.isNil: - ctx.data.ticker.stopBuddy() + if not ctx.pool.ticker.isNil: + ctx.pool.ticker.stopBuddy() # ------------------------------------------------------------------------------ # Public functions @@ -163,15 +163,15 @@ proc stop*(buddy: SnapBuddyRef) = proc runDaemon*(ctx: SnapCtxRef) {.async.} = ## Enabled while `ctx.daemon` is `true` ## - if not ctx.data.recovery.isNil: + if not ctx.pool.recovery.isNil: if not await ctx.recoveryStepContinue(): # Done, stop recovery - ctx.data.recovery = nil + ctx.pool.recovery = nil ctx.daemon = false # Update logging - if not ctx.data.ticker.isNil: - ctx.data.ticker.stopRecovery() + if not ctx.pool.ticker.isNil: + ctx.pool.ticker.stopRecovery() proc runSingle*(buddy: SnapBuddyRef) {.async.} = @@ -191,12 +191,12 @@ proc runPool*(buddy: SnapBuddyRef, last: bool): bool = result = true # Clean up empty pivot slots (never the top one) - var rc = ctx.data.pivotTable.beforeLast + var rc = ctx.pool.pivotTable.beforeLast while rc.isOK: let (key, env) = (rc.value.key, rc.value.data) if env.fetchAccounts.processed.isEmpty: - ctx.data.pivotTable.del key - rc = ctx.data.pivotTable.prev(key) + ctx.pool.pivotTable.del key + rc = ctx.pool.pivotTable.prev(key) proc runMulti*(buddy: SnapBuddyRef) {.async.} = @@ -211,13 +211,13 @@ proc runMulti*(buddy: SnapBuddyRef) {.async.} = # Set up current state root environment for accounts snapshot let env = block: - let rc = ctx.data.pivotTable.lastValue + let rc = ctx.pool.pivotTable.lastValue if rc.isErr: return # nothing to do rc.value pivot = "#" & $env.stateHeader.blockNumber # for logging - buddy.data.pivotEnv = env + buddy.only.pivotEnv = env # Full sync processsing based on current snapshot # ----------------------------------------------- @@ -231,7 +231,7 @@ proc runMulti*(buddy: SnapBuddyRef) {.async.} = # If this is a new pivot, the previous one can be cleaned up. There is no # point in keeping some older space consuming state data any longer. - ctx.data.pivotTable.beforeTopMostlyClean() + ctx.pool.pivotTable.beforeTopMostlyClean() when extraTraceMessages: block: diff --git a/nimbus/sync/snap/worker/pivot.nim b/nimbus/sync/snap/worker/pivot.nim index 5b6721a90..c6b0ed278 100644 --- a/nimbus/sync/snap/worker/pivot.nim +++ b/nimbus/sync/snap/worker/pivot.nim @@ -60,7 +60,7 @@ proc init( # Initialise accounts range fetch batch, the pair of `fetchAccounts[]` range # sets. Deprioritise already processed ranges by moving it to the second set. - for iv in ctx.data.coveredAccounts.increasing: + for iv in ctx.pool.coveredAccounts.increasing: discard result.unprocessed[0].reduce iv discard result.unprocessed[1].merge iv @@ -134,7 +134,7 @@ proc tickerStats*( var aSum, aSqSum, uSum, uSqSum, sSum, sSqSum: float count = 0 - for kvp in ctx.data.pivotTable.nextPairs: + for kvp in ctx.pool.pivotTable.nextPairs: # Accounts mean & variance let aLen = kvp.data.nAccounts.float @@ -152,9 +152,9 @@ proc tickerStats*( sSum += sLen sSqSum += sLen * sLen let - env = ctx.data.pivotTable.lastValue.get(otherwise = nil) - accCoverage = (ctx.data.coveredAccounts.fullFactor + - ctx.data.covAccTimesFull.float) + env = ctx.pool.pivotTable.lastValue.get(otherwise = nil) + accCoverage = (ctx.pool.coveredAccounts.fullFactor + + ctx.pool.covAccTimesFull.float) accFill = meanStdDev(uSum, uSqSum, count) var beaconBlock = none(BlockNumber) @@ -165,13 +165,13 @@ proc tickerStats*( pivotBlock = some(env.stateHeader.blockNumber) procChunks = env.fetchAccounts.processed.chunks stoQuLen = some(env.storageQueueTotal()) - if 0 < ctx.data.beaconHeader.blockNumber: - beaconBlock = some(ctx.data.beaconHeader.blockNumber) + if 0 < ctx.pool.beaconHeader.blockNumber: + beaconBlock = some(ctx.pool.beaconHeader.blockNumber) SnapTickerStats( beaconBlock: beaconBlock, pivotBlock: pivotBlock, - nQueues: ctx.data.pivotTable.len, + nQueues: ctx.pool.pivotTable.len, nAccounts: meanStdDev(aSum, aSqSum, count), nSlotLists: meanStdDev(sSum, sSqSum, count), accountsFill: (accFill[0], accFill[1], accCoverage), @@ -273,7 +273,7 @@ proc saveCheckpoint*( if accountsSaveStorageSlotsMax < nStoQu: return err(TooManySlotAccounts) - ctx.data.snapDb.savePivot SnapDbPivotRegistry( + ctx.pool.snapDb.savePivot SnapDbPivotRegistry( header: env.stateHeader, nAccounts: env.nAccounts, nSlotLists: env.nSlotLists, @@ -294,7 +294,7 @@ proc recoverPivotFromCheckpoint*( ## `processed`, `unprocessed`, and the `fetchStorageFull` lists are ## initialised. ## - let recov = ctx.data.recovery + let recov = ctx.pool.recovery if recov.isNil: return @@ -306,7 +306,7 @@ proc recoverPivotFromCheckpoint*( if topLevel: env.fetchAccounts.unprocessed.reduce(minPt, maxPt) discard env.fetchAccounts.processed.merge(minPt, maxPt) - discard ctx.data.coveredAccounts.merge(minPt, maxPt) + discard ctx.pool.coveredAccounts.merge(minPt, maxPt) ctx.pivotAccountsCoverage100PcRollOver() # update coverage level roll over # Handle storage slots @@ -316,7 +316,7 @@ proc recoverPivotFromCheckpoint*( if 0 < env.fetchAccounts.processed.covered(pt): # Ignoring slots that have accounts to be downloaded, anyway - let rc = ctx.data.snapDb.getAccountsData(stateRoot, w) + let rc = ctx.pool.snapDb.getAccountsData(stateRoot, w) if rc.isErr: # Oops, how did that account get lost? discard env.fetchAccounts.processed.reduce pt @@ -342,12 +342,12 @@ proc pivotApprovePeer*(buddy: SnapBuddyRef) {.async.} = let ctx = buddy.ctx peer {.used.} = buddy.peer - beaconHeader = ctx.data.beaconHeader + beaconHeader = ctx.pool.beaconHeader var pivotHeader: BlockHeader block: - let rc = ctx.data.pivotTable.lastValue + let rc = ctx.pool.pivotTable.lastValue if rc.isOk: pivotHeader = rc.value.stateHeader @@ -356,7 +356,7 @@ proc pivotApprovePeer*(buddy: SnapBuddyRef) {.async.} = # If the entry before the previous entry is unused, then run a pool mode # based session (which should enable a pivot table purge). block: - let rc = ctx.data.pivotTable.beforeLast + let rc = ctx.pool.pivotTable.beforeLast if rc.isOk and rc.value.data.fetchAccounts.processed.isEmpty: ctx.poolMode = true @@ -365,7 +365,7 @@ proc pivotApprovePeer*(buddy: SnapBuddyRef) {.async.} = pivot=("#" & $pivotHeader.blockNumber), beacon=("#" & $beaconHeader.blockNumber), poolMode=ctx.poolMode - discard ctx.data.pivotTable.lruAppend( + discard ctx.pool.pivotTable.lruAppend( beaconHeader.stateRoot, SnapPivotRef.init(ctx, beaconHeader), pivotTableLruEntriesMax) @@ -380,10 +380,10 @@ proc pivotUpdateBeaconHeaderCB*(ctx: SnapCtxRef): SyncReqNewHeadCB = ## Update beacon header. This function is intended as a call back function ## for the RPC module. result = proc(h: BlockHeader) {.gcsafe.} = - if ctx.data.beaconHeader.blockNumber < h.blockNumber: + if ctx.pool.beaconHeader.blockNumber < h.blockNumber: # when extraTraceMessages: # trace "External beacon info update", header=("#" & $h.blockNumber) - ctx.data.beaconHeader = h + ctx.pool.beaconHeader = h # ------------------------------------------------------------------------------ # End diff --git a/nimbus/sync/snap/worker/pivot/heal_accounts.nim b/nimbus/sync/snap/worker/pivot/heal_accounts.nim index 793b0cd7e..e30fbfb3c 100644 --- a/nimbus/sync/snap/worker/pivot/heal_accounts.nim +++ b/nimbus/sync/snap/worker/pivot/heal_accounts.nim @@ -86,7 +86,7 @@ proc healingCtx( "pivot=" & "#" & $env.stateHeader.blockNumber & "," & "nAccounts=" & $env.nAccounts & "," & ("covered=" & $env.fetchAccounts.processed & "/" & - $ctx.data.coveredAccounts ) & "}" + $ctx.pool.coveredAccounts ) & "}" # ------------------------------------------------------------------------------ # Private helpers @@ -118,7 +118,7 @@ proc compileMissingNodesList( ctx = buddy.ctx peer {.used.} = buddy.peer rootKey = env.stateHeader.stateRoot.to(NodeKey) - getFn = ctx.data.snapDb.getAccountFn + getFn = ctx.pool.snapDb.getAccountFn fa {.used.} = env.fetchAccounts # Import from earlier run @@ -171,7 +171,7 @@ proc fetchMissingNodes( let rc = await buddy.getTrieNodes(stateRoot, pathList, pivot) if rc.isOk: # Reset error counts for detecting repeated timeouts, network errors, etc. - buddy.data.errors.resetComError() + buddy.only.errors.resetComError() # Forget about unfetched missing nodes, will be picked up later return rc.value.nodes.mapIt(NodeSpecs( @@ -182,7 +182,7 @@ proc fetchMissingNodes( # Process error ... let error = rc.error - ok = await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors) + ok = await buddy.ctrl.stopAfterSeriousComError(error, buddy.only.errors) when extraTraceMessages: if ok: trace logTxt "fetch nodes error => stop", peer, @@ -241,7 +241,7 @@ proc registerAccountLeaf( if 0 < env.fetchAccounts.processed.merge(pt,pt) : env.nAccounts.inc env.fetchAccounts.unprocessed.reduce(pt,pt) - discard buddy.ctx.data.coveredAccounts.merge(pt,pt) + discard buddy.ctx.pool.coveredAccounts.merge(pt,pt) # Update storage slots batch if acc.storageRoot != emptyRlpHash: @@ -260,7 +260,7 @@ proc accountsHealingImpl( ## number of nodes fetched from the network, and -1 upon error. let ctx = buddy.ctx - db = ctx.data.snapDb + db = ctx.pool.snapDb peer = buddy.peer # Import from earlier runs (if any) diff --git a/nimbus/sync/snap/worker/pivot/heal_storage_slots.nim b/nimbus/sync/snap/worker/pivot/heal_storage_slots.nim index c73b45a5a..52277d2c1 100644 --- a/nimbus/sync/snap/worker/pivot/heal_storage_slots.nim +++ b/nimbus/sync/snap/worker/pivot/heal_storage_slots.nim @@ -130,7 +130,7 @@ proc compileMissingNodesList( peer {.used.} = buddy.peer slots = kvp.data.slots rootKey = kvp.key.to(NodeKey) - getFn = ctx.data.snapDb.getStorageSlotsFn(kvp.data.accKey) + getFn = ctx.pool.snapDb.getStorageSlotsFn(kvp.data.accKey) if not slots.processed.isFull: noExceptionOops("compileMissingNodesList"): @@ -177,7 +177,7 @@ proc getNodesFromNetwork( rc = await buddy.getTrieNodes(storageRoot, @[req], pivot) if rc.isOk: # Reset error counts for detecting repeated timeouts, network errors, etc. - buddy.data.errors.resetComError() + buddy.only.errors.resetComError() return rc.value.nodes.mapIt(NodeSpecs( partialPath: it.partialPath, @@ -185,7 +185,7 @@ proc getNodesFromNetwork( data: it.data)) let error = rc.error - if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors): + if await buddy.ctrl.stopAfterSeriousComError(error, buddy.only.errors): when extraTraceMessages: trace logTxt "fetch nodes error => stop", peer, ctx=buddy.healingCtx(kvp,env), error @@ -217,7 +217,7 @@ proc storageSlotsHealing( ## `false` if there are nodes left to be completed. let ctx = buddy.ctx - db = ctx.data.snapDb + db = ctx.pool.snapDb peer = buddy.peer missing = buddy.compileMissingNodesList(kvp, env) diff --git a/nimbus/sync/snap/worker/pivot/range_fetch_accounts.nim b/nimbus/sync/snap/worker/pivot/range_fetch_accounts.nim index b35b3f1b3..9a5efb2e5 100644 --- a/nimbus/sync/snap/worker/pivot/range_fetch_accounts.nim +++ b/nimbus/sync/snap/worker/pivot/range_fetch_accounts.nim @@ -113,7 +113,7 @@ proc accountsRangefetchImpl( let ctx = buddy.ctx peer = buddy.peer - db = ctx.data.snapDb + db = ctx.pool.snapDb fa = env.fetchAccounts stateRoot = env.stateHeader.stateRoot @@ -134,7 +134,7 @@ proc accountsRangefetchImpl( if rc.isErr: fa.unprocessed.merge iv # fail => interval back to pool let error = rc.error - if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors): + if await buddy.ctrl.stopAfterSeriousComError(error, buddy.only.errors): when extraTraceMessages: trace logTxt "fetch error", peer, ctx=buddy.fetchCtx(env), reqLen=iv.len, error @@ -142,7 +142,7 @@ proc accountsRangefetchImpl( rc.value # Reset error counts for detecting repeated timeouts, network errors, etc. - buddy.data.errors.resetComError() + buddy.only.errors.resetComError() let gotAccounts = dd.data.accounts.len # comprises `gotStorage` @@ -188,7 +188,7 @@ proc accountsRangefetchImpl( fa.unprocessed.reduce w # Register consumed intervals on the accumulators over all state roots. discard fa.processed.merge w - discard ctx.data.coveredAccounts.merge w + discard ctx.pool.coveredAccounts.merge w ctx.pivotAccountsCoverage100PcRollOver() # update coverage level roll over # Register accounts with storage slots on the storage TODO list. diff --git a/nimbus/sync/snap/worker/pivot/range_fetch_storage_slots.nim b/nimbus/sync/snap/worker/pivot/range_fetch_storage_slots.nim index 2616a065d..31d0f6023 100644 --- a/nimbus/sync/snap/worker/pivot/range_fetch_storage_slots.nim +++ b/nimbus/sync/snap/worker/pivot/range_fetch_storage_slots.nim @@ -124,20 +124,20 @@ proc storeStoragesSingleBatch( let rc = await buddy.getStorageRanges(stateRoot, req, pivot) if rc.isErr: let error = rc.error - if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors): + if await buddy.ctrl.stopAfterSeriousComError(error, buddy.only.errors): trace logTxt "fetch error => stop", peer, ctx=buddy.fetchCtx(env), nReq=req.len, error return false # all of `req` failed rc.value # Reset error counts for detecting repeated timeouts, network errors, etc. - buddy.data.errors.resetComError() + buddy.only.errors.resetComError() var gotSlotLists = stoRange.data.storages.len if 0 < gotSlotLists: # Verify/process storages data and save it to disk - let report = ctx.data.snapDb.importStorageSlots( + let report = ctx.pool.snapDb.importStorageSlots( peer, stoRange.data, noBaseBoundCheck = true) if 0 < report.len: diff --git a/nimbus/sync/snap/worker/pivot/storage_queue_helper.nim b/nimbus/sync/snap/worker/pivot/storage_queue_helper.nim index 272f175e8..bc44e3114 100644 --- a/nimbus/sync/snap/worker/pivot/storage_queue_helper.nim +++ b/nimbus/sync/snap/worker/pivot/storage_queue_helper.nim @@ -215,7 +215,7 @@ proc storageQueueFetchFull*( noExceptionOops("getNextSlotItemsFull"): for kvp in env.fetchStorageFull.nextPairs: let - getFn = ctx.data.snapDb.getStorageSlotsFn kvp.data.accKey + getFn = ctx.pool.snapDb.getStorageSlotsFn kvp.data.accKey rootKey = kvp.key.to(NodeKey) accItem = AccountSlotsHeader( accKey: kvp.data.accKey, diff --git a/nimbus/sync/snap/worker/pivot/swap_in.nim b/nimbus/sync/snap/worker/pivot/swap_in.nim index 85c02cbfb..ded88e374 100644 --- a/nimbus/sync/snap/worker/pivot/swap_in.nim +++ b/nimbus/sync/snap/worker/pivot/swap_in.nim @@ -259,9 +259,9 @@ proc swapInAccounts*( let pivot {.used.} = "#" & $env.stateHeader.blockNumber # Logging & debugging rootKey = env.stateHeader.stateRoot.to(NodeKey) - getFn = ctx.data.snapDb.getAccountFn + getFn = ctx.pool.snapDb.getAccountFn - others = toSeq(ctx.data.pivotTable.nextPairs) + others = toSeq(ctx.pool.pivotTable.nextPairs) # Swap in from mothballed pivots different from the current one .filterIt(it.data.archived and it.key.to(NodeKey) != rootKey) diff --git a/nimbus/sync/snap/worker_desc.nim b/nimbus/sync/snap/worker_desc.nim index 1d8e354ad..128c967f7 100644 --- a/nimbus/sync/snap/worker_desc.nim +++ b/nimbus/sync/snap/worker_desc.nim @@ -129,15 +129,15 @@ proc hash*(a: Hash256): Hash = proc pivotAccountsCoverage*(ctx: SnapCtxRef): float = ## Returns the accounts coverage factor - ctx.data.coveredAccounts.fullFactor + ctx.data.covAccTimesFull.float + ctx.pool.coveredAccounts.fullFactor + ctx.pool.covAccTimesFull.float proc pivotAccountsCoverage100PcRollOver*(ctx: SnapCtxRef) = ## Roll over `coveredAccounts` registry when it reaches 100%. - if ctx.data.coveredAccounts.isFull: + if ctx.pool.coveredAccounts.isFull: # All of accounts hashes are covered by completed range fetch processes # for all pivot environments. So reset covering and record full-ness level. - ctx.data.covAccTimesFull.inc - ctx.data.coveredAccounts.clear() + ctx.pool.covAccTimesFull.inc + ctx.pool.coveredAccounts.clear() # ------------------------------------------------------------------------------ # Public helpers: SnapTodoRanges diff --git a/nimbus/sync/sync_desc.nim b/nimbus/sync/sync_desc.nim index 205abf4b4..6c2dbdeb1 100644 --- a/nimbus/sync/sync_desc.nim +++ b/nimbus/sync/sync_desc.nim @@ -42,7 +42,7 @@ type ctx*: CtxRef[S] ## Shared data descriptor back reference peer*: Peer ## Reference to eth p2pProtocol entry ctrl*: BuddyCtrlRef ## Control and state settings - data*: W ## Worker peer specific data + only*: W ## Worker peer specific data CtxRef*[S] = ref object ## Shared state among all syncing peer workers (aka buddies.) @@ -51,7 +51,7 @@ type chain*: ChainRef ## Block chain database (no need for `Peer`) poolMode*: bool ## Activate `runPool()` workers if set `true` daemon*: bool ## Enable global background job - data*: S ## Shared context for all worker peers + pool*: S ## Shared context for all worker peers # ------------------------------------------------------------------------------ # Public functions diff --git a/tests/test_sync_snap.nim b/tests/test_sync_snap.nim index 2525bc3b6..d3a8727aa 100644 --- a/tests/test_sync_snap.nim +++ b/tests/test_sync_snap.nim @@ -178,7 +178,7 @@ proc testDbs( else: result.dbDir = workDir / "tmp" if result.persistent: - result.dbDir.flushDbDir + workDir.flushDbDir(subDir) for n in 0 ..< min(result.cdb.len, instances): result.cdb[n] = (result.dbDir / $n).newChainDB @@ -510,7 +510,7 @@ when isMainModule: #setTraceLevel() setErrorLevel() - # Test constant, calculations etc. + # Test constants, calculations etc. when true: # and false: noisy.miscRunner() @@ -546,7 +546,8 @@ when isMainModule: false.storagesRunner(persistent=true, sam) # This one uses the readily available dump: `bulkTest0` and some huge replay - # dumps `bulkTest1`, `bulkTest2`, .. from the `nimbus-eth1-blobs` package + # dumps `bulkTest1`, `bulkTest2`, .. from the `nimbus-eth1-blobs` package. + # For specs see `tests/test_sync_snap/bulk_test_xx.nim`. when true and false: # ---- database storage timings -------