Simplify beacon stream pivot update (#1435)
* Simplify pivot update why: No need to fetch the pivot header from the network when it can be be made available in the ivot cache also: Keep `txPool` update disabled while syncing * Cosmetics, tune down some logging noise * Support `snap/1` without `eth/6?` why: Eth is not needed here. * Snap is an (optional) extension of `eth` so: It it must be supported somehow. Nevertheless it will be currently unused in the snap syncer.
This commit is contained in:
parent
a1163b4ade
commit
30135ab1ef
|
@ -34,7 +34,7 @@ type
|
||||||
current: BlockNumber
|
current: BlockNumber
|
||||||
highest: BlockNumber
|
highest: BlockNumber
|
||||||
|
|
||||||
SyncReqNewHeadCB* = proc(number: BlockNumber; hash: Hash256) {.gcsafe.}
|
SyncReqNewHeadCB* = proc(header: BlockHeader) {.gcsafe.}
|
||||||
## Update head for syncing
|
## Update head for syncing
|
||||||
|
|
||||||
CommonRef* = ref object
|
CommonRef* = ref object
|
||||||
|
@ -362,10 +362,10 @@ proc initializeEmptyDb*(com: CommonRef)
|
||||||
com.consensusType == ConsensusType.POS)
|
com.consensusType == ConsensusType.POS)
|
||||||
doAssert(canonicalHeadHashKey().toOpenArray in trieDB)
|
doAssert(canonicalHeadHashKey().toOpenArray in trieDB)
|
||||||
|
|
||||||
proc syncReqNewHead*(com: CommonRef; number: BlockNumber; hash: Hash256) =
|
proc syncReqNewHead*(com: CommonRef; header: BlockHeader) =
|
||||||
## Used by RPC to update the beacon head for snap sync
|
## Used by RPC to update the beacon head for snap sync
|
||||||
if not com.syncReqNewHead.isNil:
|
if not com.syncReqNewHead.isNil:
|
||||||
com.syncReqNewHead(number, hash)
|
com.syncReqNewHead(header)
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
# ------------------------------------------------------------------------------
|
||||||
# Getters
|
# Getters
|
||||||
|
|
|
@ -250,7 +250,7 @@ proc setupEngineApi*(
|
||||||
hash = blockHash
|
hash = blockHash
|
||||||
|
|
||||||
# Update sync header (if any)
|
# Update sync header (if any)
|
||||||
com.syncReqNewHead(header.blockNumber, blockHash)
|
com.syncReqNewHead(header)
|
||||||
|
|
||||||
return simpleFCU(PayloadExecutionStatus.syncing)
|
return simpleFCU(PayloadExecutionStatus.syncing)
|
||||||
|
|
||||||
|
|
|
@ -17,7 +17,7 @@ import
|
||||||
../../common as nimcom,
|
../../common as nimcom,
|
||||||
../../db/select_backend,
|
../../db/select_backend,
|
||||||
../../utils/prettify,
|
../../utils/prettify,
|
||||||
".."/[protocol, sync_desc],
|
".."/[handlers, protocol, sync_desc],
|
||||||
./worker/[pivot, ticker],
|
./worker/[pivot, ticker],
|
||||||
./worker/com/com_error,
|
./worker/com/com_error,
|
||||||
./worker/db/[hexary_desc, snapdb_desc, snapdb_pivot],
|
./worker/db/[hexary_desc, snapdb_desc, snapdb_pivot],
|
||||||
|
@ -106,6 +106,7 @@ proc setup*(ctx: SnapCtxRef; tickerOK: bool): bool =
|
||||||
## Global set up
|
## Global set up
|
||||||
ctx.data.coveredAccounts = NodeTagRangeSet.init()
|
ctx.data.coveredAccounts = NodeTagRangeSet.init()
|
||||||
noExceptionOops("worker.setup()"):
|
noExceptionOops("worker.setup()"):
|
||||||
|
ctx.ethWireCtx.txPoolEnabled false
|
||||||
ctx.chain.com.syncReqNewHead = ctx.pivotUpdateBeaconHeaderCB
|
ctx.chain.com.syncReqNewHead = ctx.pivotUpdateBeaconHeaderCB
|
||||||
ctx.data.snapDb =
|
ctx.data.snapDb =
|
||||||
if ctx.data.dbBackend.isNil: SnapDbRef.init(ctx.chain.db.db)
|
if ctx.data.dbBackend.isNil: SnapDbRef.init(ctx.chain.db.db)
|
||||||
|
@ -135,6 +136,8 @@ proc release*(ctx: SnapCtxRef) =
|
||||||
if not ctx.data.ticker.isNil:
|
if not ctx.data.ticker.isNil:
|
||||||
ctx.data.ticker.stop()
|
ctx.data.ticker.stop()
|
||||||
ctx.data.ticker = nil
|
ctx.data.ticker = nil
|
||||||
|
noExceptionOops("worker.release()"):
|
||||||
|
ctx.ethWireCtx.txPoolEnabled true
|
||||||
ctx.chain.com.syncReqNewHead = nil
|
ctx.chain.com.syncReqNewHead = nil
|
||||||
|
|
||||||
proc start*(buddy: SnapBuddyRef): bool =
|
proc start*(buddy: SnapBuddyRef): bool =
|
||||||
|
|
|
@ -16,7 +16,6 @@ import
|
||||||
stew/[interval_set, keyed_queue, sorted_set],
|
stew/[interval_set, keyed_queue, sorted_set],
|
||||||
../../sync_desc,
|
../../sync_desc,
|
||||||
".."/[constants, range_desc, worker_desc],
|
".."/[constants, range_desc, worker_desc],
|
||||||
./com/get_block_header,
|
|
||||||
./db/[hexary_error, snapdb_accounts, snapdb_pivot],
|
./db/[hexary_error, snapdb_accounts, snapdb_pivot],
|
||||||
./pivot/[heal_accounts, heal_storage_slots,
|
./pivot/[heal_accounts, heal_storage_slots,
|
||||||
range_fetch_accounts, range_fetch_storage_slots,
|
range_fetch_accounts, range_fetch_storage_slots,
|
||||||
|
@ -166,8 +165,8 @@ proc tickerStats*(
|
||||||
pivotBlock = some(env.stateHeader.blockNumber)
|
pivotBlock = some(env.stateHeader.blockNumber)
|
||||||
procChunks = env.fetchAccounts.processed.chunks
|
procChunks = env.fetchAccounts.processed.chunks
|
||||||
stoQuLen = some(env.storageQueueTotal())
|
stoQuLen = some(env.storageQueueTotal())
|
||||||
if 0 < ctx.data.beaconNumber:
|
if 0 < ctx.data.beaconHeader.blockNumber:
|
||||||
beaconBlock = some(ctx.data.beaconNumber)
|
beaconBlock = some(ctx.data.beaconHeader.blockNumber)
|
||||||
|
|
||||||
TickerStats(
|
TickerStats(
|
||||||
beaconBlock: beaconBlock,
|
beaconBlock: beaconBlock,
|
||||||
|
@ -343,102 +342,48 @@ proc pivotApprovePeer*(buddy: SnapBuddyRef) {.async.} =
|
||||||
let
|
let
|
||||||
ctx = buddy.ctx
|
ctx = buddy.ctx
|
||||||
peer = buddy.peer
|
peer = buddy.peer
|
||||||
|
beaconHeader = ctx.data.beaconHeader
|
||||||
# Cache beacon header data, iyt might change while waiting for a peer
|
|
||||||
# reply message
|
|
||||||
beaconNumber = ctx.data.beaconNumber
|
|
||||||
beaconHash = ctx.data.beaconHash
|
|
||||||
|
|
||||||
var
|
var
|
||||||
pivotHeader: BlockHeader
|
pivotHeader: BlockHeader
|
||||||
pivotNumber: BlockNumber
|
|
||||||
peerNumber: BlockNumber # for error message
|
|
||||||
|
|
||||||
block:
|
block:
|
||||||
let rc = ctx.data.pivotTable.lastValue
|
let rc = ctx.data.pivotTable.lastValue
|
||||||
if rc.isOk:
|
if rc.isOk:
|
||||||
pivotHeader = rc.value.stateHeader
|
pivotHeader = rc.value.stateHeader
|
||||||
pivotNumber = pivotHeader.blockNumber
|
|
||||||
|
|
||||||
# Check whether the pivot needs to be updated
|
# Check whether the pivot needs to be updated
|
||||||
if pivotNumber + pivotBlockDistanceMin < beaconNumber:
|
if pivotHeader.blockNumber + pivotBlockDistanceMin < beaconHeader.blockNumber:
|
||||||
let rc = await buddy.getBlockHeader(beaconHash)
|
|
||||||
if rc.isErr:
|
|
||||||
buddy.ctrl.zombie = true
|
|
||||||
return
|
|
||||||
|
|
||||||
# Sanity check => append a new pivot environment
|
|
||||||
if rc.value.blockNumber == beaconNumber:
|
|
||||||
|
|
||||||
# If the entry before the previous entry is unused, then run a pool mode
|
# If the entry before the previous entry is unused, then run a pool mode
|
||||||
# based session (which should enable a pivot table purge).
|
# based session (which should enable a pivot table purge).
|
||||||
block:
|
block:
|
||||||
let rx = ctx.data.pivotTable.beforeLast
|
let rc = ctx.data.pivotTable.beforeLast
|
||||||
if rx.isOk and rx.value.data.fetchAccounts.processed.isEmpty:
|
if rc.isOk and rc.value.data.fetchAccounts.processed.isEmpty:
|
||||||
ctx.poolMode = true
|
ctx.poolMode = true
|
||||||
|
|
||||||
when extraTraceMessages:
|
when extraTraceMessages:
|
||||||
trace "New pivot from beacon chein", peer, pivotNumber,
|
trace "New pivot from beacon chain", peer,
|
||||||
beaconNumber, beaconHash, poolMode=ctx.poolMode
|
pivot=("#" & $pivotHeader.blockNumber),
|
||||||
|
beacon=("#" & $beaconHeader.blockNumber), poolMode=ctx.poolMode
|
||||||
|
|
||||||
discard ctx.data.pivotTable.lruAppend(
|
discard ctx.data.pivotTable.lruAppend(
|
||||||
rc.value.stateRoot, SnapPivotRef.init(ctx, rc.value),
|
beaconHeader.stateRoot, SnapPivotRef.init(ctx, beaconHeader),
|
||||||
pivotTableLruEntriesMax)
|
pivotTableLruEntriesMax)
|
||||||
|
|
||||||
# Done, buddy runs on updated environment. It has proved already that it
|
pivotHeader = beaconHeader
|
||||||
# supports the current header.
|
|
||||||
return
|
|
||||||
|
|
||||||
# Header/block number mismatch => process at the end of function
|
# Not ready yet?
|
||||||
peerNumber = rc.value.blockNumber
|
if pivotHeader.blockNumber == 0:
|
||||||
|
|
||||||
elif 0 < pivotNumber:
|
|
||||||
# So there was no reason to update update then pivot. Verify that
|
|
||||||
# the pivot header can be fetched from peer.
|
|
||||||
let
|
|
||||||
pivotHash = pivotHeader.hash
|
|
||||||
rc = await buddy.getBlockHeader(pivotHash)
|
|
||||||
if rc.isErr:
|
|
||||||
when extraTraceMessages:
|
|
||||||
trace "Header unsupported by peer", peer, pivotNumber, pivotHash
|
|
||||||
buddy.ctrl.zombie = true
|
|
||||||
return
|
|
||||||
|
|
||||||
# Sanity check
|
|
||||||
if rc.value.blockNumber == beaconNumber:
|
|
||||||
return
|
|
||||||
|
|
||||||
# Header/block number mismatch => process at the end of function
|
|
||||||
peerNumber = rc.value.blockNumber
|
|
||||||
|
|
||||||
else:
|
|
||||||
# Not ready yet for initialising. Currently there is no pivot, at all
|
|
||||||
when extraTraceMessages:
|
|
||||||
trace "Beacon chain header not ready yet", peer
|
|
||||||
buddy.ctrl.stopped = true
|
|
||||||
return
|
|
||||||
|
|
||||||
# Header/block number mismatch
|
|
||||||
trace "Oops, cannot approve peer due to header mismatch", peer, peerNumber,
|
|
||||||
beaconNumber, hash=beaconHash
|
|
||||||
|
|
||||||
# Reset cache
|
|
||||||
ctx.data.beaconNumber = 0.u256
|
|
||||||
ctx.data.beaconHash = Hash256.default
|
|
||||||
|
|
||||||
# See you later :)
|
|
||||||
buddy.ctrl.stopped = true
|
buddy.ctrl.stopped = true
|
||||||
|
|
||||||
|
|
||||||
proc pivotUpdateBeaconHeaderCB*(ctx: SnapCtxRef): SyncReqNewHeadCB =
|
proc pivotUpdateBeaconHeaderCB*(ctx: SnapCtxRef): SyncReqNewHeadCB =
|
||||||
## Update beacon header. This function is intended as a call back function
|
## Update beacon header. This function is intended as a call back function
|
||||||
## for the RPC module.
|
## for the RPC module.
|
||||||
result = proc(number: BlockNumber; hash: Hash256) {.gcsafe.} =
|
result = proc(h: BlockHeader) {.gcsafe.} =
|
||||||
if ctx.data.beaconNumber < number:
|
if ctx.data.beaconHeader.blockNumber < h.blockNumber:
|
||||||
when extraTraceMessages:
|
# when extraTraceMessages:
|
||||||
trace "External beacon info update", number, hash
|
# trace "External beacon info update", header=("#" & $h.blockNumber)
|
||||||
ctx.data.beaconNumber = number
|
ctx.data.beaconHeader = h
|
||||||
ctx.data.beaconHash = hash
|
|
||||||
|
|
||||||
# ------------------------------------------------------------------------------
|
# ------------------------------------------------------------------------------
|
||||||
# End
|
# End
|
||||||
|
|
|
@ -123,8 +123,8 @@ proc runLogTicker(t: TickerRef) {.gcsafe.} =
|
||||||
tickerLogSuppressMax < (now - t.visited):
|
tickerLogSuppressMax < (now - t.visited):
|
||||||
var
|
var
|
||||||
nAcc, nSto, bulk: string
|
nAcc, nSto, bulk: string
|
||||||
pivot = "n/a"
|
pv = "n/a"
|
||||||
beacon = "n/a"
|
bc = "n/a"
|
||||||
nStoQue = "n/a"
|
nStoQue = "n/a"
|
||||||
let
|
let
|
||||||
recoveryDone = t.lastRecov
|
recoveryDone = t.lastRecov
|
||||||
|
@ -132,7 +132,7 @@ proc runLogTicker(t: TickerRef) {.gcsafe.} =
|
||||||
"(" & data.accountsFill[1].pc99 & ")" &
|
"(" & data.accountsFill[1].pc99 & ")" &
|
||||||
"/" & data.accountsFill[2].pc99 &
|
"/" & data.accountsFill[2].pc99 &
|
||||||
"~" & data.nAccountStats.uint.toSI
|
"~" & data.nAccountStats.uint.toSI
|
||||||
buddies = t.nBuddies
|
nInst = t.nBuddies
|
||||||
|
|
||||||
# With `int64`, there are more than 29*10^10 years range for seconds
|
# With `int64`, there are more than 29*10^10 years range for seconds
|
||||||
up = (now - t.started).seconds.uint64.toSI
|
up = (now - t.started).seconds.uint64.toSI
|
||||||
|
@ -144,9 +144,9 @@ proc runLogTicker(t: TickerRef) {.gcsafe.} =
|
||||||
|
|
||||||
noFmtError("runLogTicker"):
|
noFmtError("runLogTicker"):
|
||||||
if data.pivotBlock.isSome:
|
if data.pivotBlock.isSome:
|
||||||
pivot = &"#{data.pivotBlock.get}/{data.nQueues}"
|
pv = &"#{data.pivotBlock.get}/{data.nQueues}"
|
||||||
if data.beaconBlock.isSome:
|
if data.beaconBlock.isSome:
|
||||||
beacon = &"#{data.beaconBlock.get}"
|
bc = &"#{data.beaconBlock.get}"
|
||||||
nAcc = (&"{(data.nAccounts[0]+0.5).int64}" &
|
nAcc = (&"{(data.nAccounts[0]+0.5).int64}" &
|
||||||
&"({(data.nAccounts[1]+0.5).int64})")
|
&"({(data.nAccounts[1]+0.5).int64})")
|
||||||
nSto = (&"{(data.nSlotLists[0]+0.5).int64}" &
|
nSto = (&"{(data.nSlotLists[0]+0.5).int64}" &
|
||||||
|
@ -157,13 +157,13 @@ proc runLogTicker(t: TickerRef) {.gcsafe.} =
|
||||||
|
|
||||||
if t.recovery:
|
if t.recovery:
|
||||||
info "Snap sync statistics (recovery)",
|
info "Snap sync statistics (recovery)",
|
||||||
up, buddies, beacon, pivot, nAcc, accCov, nSto, nStoQue, mem
|
up, nInst, bc, pv, nAcc, accCov, nSto, nStoQue, mem
|
||||||
elif recoveryDone:
|
elif recoveryDone:
|
||||||
info "Snap sync statistics (recovery done)",
|
info "Snap sync statistics (recovery done)",
|
||||||
up, buddies, beacon, pivot, nAcc, accCov, nSto, nStoQue, mem
|
up, nInst, bc, pv, nAcc, accCov, nSto, nStoQue, mem
|
||||||
else:
|
else:
|
||||||
info "Snap sync statistics",
|
info "Snap sync statistics",
|
||||||
up, buddies, beacon, pivot, nAcc, accCov, nSto, nStoQue, mem
|
up, nInst, bc, pv, nAcc, accCov, nSto, nStoQue, mem
|
||||||
|
|
||||||
t.setLogTicker(Moment.fromNow(tickerLogInterval))
|
t.setLogTicker(Moment.fromNow(tickerLogInterval))
|
||||||
|
|
||||||
|
|
|
@ -96,8 +96,7 @@ type
|
||||||
|
|
||||||
# Pivot table
|
# Pivot table
|
||||||
pivotTable*: SnapPivotTable ## Per state root environment
|
pivotTable*: SnapPivotTable ## Per state root environment
|
||||||
beaconNumber*: BlockNumber ## Running on beacon chain
|
beaconHeader*: BlockHeader ## Running on beacon chain
|
||||||
beaconHash*: Hash256 ## Ditto
|
|
||||||
coveredAccounts*: NodeTagRangeSet ## Derived from all available accounts
|
coveredAccounts*: NodeTagRangeSet ## Derived from all available accounts
|
||||||
covAccTimesFull*: uint ## # of 100% coverages
|
covAccTimesFull*: uint ## # of 100% coverages
|
||||||
recovery*: SnapRecoveryRef ## Current recovery checkpoint/context
|
recovery*: SnapRecoveryRef ## Current recovery checkpoint/context
|
||||||
|
|
Loading…
Reference in New Issue