Update snap client storage slots download and healing (#1529)

* Fix fringe condition for `GetStorageRanges` message handler

why:
  Receiving a proved empty range was not considered at all. This led to
  inconsistencies in the return value, which caused subsequent errors.

* Update storage range bulk download

details:
  Mainly re-org of storage queue processing in `storage_queue_helper.nim`

* Update logging variables/messages

* Update storage slots healing

details:
  Mainly clean up after improved helper functions from the sources
  `find_missing_nodes.nim` and `storage_queue_helper.nim`.

* Simplify account fetch

why:
  Too much fuss was made tolerating some errors. An overall strategy will
  be implemented where the concert of download and healing functions is
  orchestrated.

* Add error resilience to the concert of download and healing.

why:
  The idea is that a peer might stop serving snap/1 account and storage
  slot range downloads while still being able to serve trie nodes for
  healing.
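The intended interplay is spelled out in the updated `execSnapSyncAction()` further down in this patch. Condensed, with the `archived` checks and early returns trimmed, the control flow amounts to roughly the following sketch (illustrative only, not verbatim from the patch; `syncRoundSketch` is a made-up name):

proc syncRoundSketch(buddy: SnapBuddyRef; env: SnapPivotRef) {.async.} =
  ## Condensed sketch of the download/healing interplay -- see
  ## `execSnapSyncAction()` in this commit for the real thing.
  let ctx = buddy.ctx
  var rangeFetchOk = true
  if not env.fetchAccounts.processed.isFull:
    await buddy.rangeFetchAccounts(env)
  if buddy.ctrl.running:
    await buddy.rangeFetchStorageSlots(env)
  else:
    rangeFetchOk = false            # peer stopped serving account/slot ranges
  if env.accountsHealingOk(ctx):
    buddy.ctrl.forceRun = true      # keep the peer, it may still serve trie nodes
    await buddy.healAccounts(env)
  if rangeFetchOk:                  # some more storage slots may have popped up
    await buddy.rangeFetchStorageSlots(env)
  if env.accountsHealingOk(ctx):
    buddy.ctrl.forceRun = true
    await buddy.healStorageSlots(env)
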
Jordan Hrycaj 2023-04-04 14:36:18 +01:00 committed by GitHub
parent 9453d5bb3c
commit 5e865edec0
15 changed files with 959 additions and 590 deletions

View File

@ -123,7 +123,7 @@ proc getSlotsSpecs(
# Ignore missing account entry
if accData.len == 0:
when extraTraceMessages:
trace logTxt "getSlotsSpecs: no such account", accKey
trace logTxt "getSlotsSpecs: no such account", accKey, rootKey
return err()
# Ignore empty storage list
@ -169,7 +169,8 @@ iterator doTrieNodeSpecs(
# Fail on this group
when extraTraceMessages:
trace logTxt "doTrieNodeSpecs (blind)", nBlind=w.slotPaths.len
trace logTxt "doTrieNodeSpecs (blind)", accPath=w.accPath.toHex,
nBlind=w.slotPaths.len, nBlind0=w.slotPaths[0].toHex
yield (NodeKey.default, nil, EmptyBlob, w.slotPaths.len)
@ -416,14 +417,9 @@ method getStorageRanges*(
dataAllocated += rangeProof.leafsSize
when extraTraceMessages:
if accounts.len == 1:
trace logTxt "getStorageRanges: single account", iv,
accKey=accHash.to(NodeKey), stoRoot=sp.stoRoot
#when extraTraceMessages:
# trace logTxt "getStorageRanges: data slots", iv, sizeMax, dataAllocated,
# accKey, stoRoot, nSlots=rangeProof.leafs.len,
# nProof=rangeProof.proof.len
trace logTxt "getStorageRanges: data slots", iv, sizeMax, dataAllocated,
nAccounts=accounts.len, accKey=accHash.to(NodeKey), stoRoot=sp.stoRoot,
nSlots=rangeProof.leafs.len, nProof=rangeProof.proof.len
slotLists.add rangeProof.leafs.mapIt(it.to(SnapStorage))
if 0 < rangeProof.proof.len:
@ -494,8 +490,7 @@ method getTrieNodes*(
let steps = partPath.hexPrefixDecode[1].hexaryPath(stateKey, getFn)
if 0 < steps.path.len and
steps.tail.len == 0 and steps.path[^1].nibble < 0:
let data = steps.path[^1].node.convertTo(Blob)
data
steps.path[^1].node.convertTo(Blob)
else:
EmptyBlob

View File

@ -11,12 +11,16 @@
{.push raises: [].}
import
std/sets,
eth/[common, trie/nibbles]
const
EmptyBlob* = seq[byte].default
## Useful shortcut
EmptyBlobSet* = HashSet[Blob].default
## Useful shortcut
EmptyBlobSeq* = seq[Blob].default
## Useful shortcut
@ -63,11 +67,6 @@ const
# --------------
accountsFetchRetryMax* = 2
## The request intervals will be slightly re-arranged after failure.
## So re-trying to fetch another range might be successful (set to 0
## for disabling retries.)
accountsSaveProcessedChunksMax* = 1000
## Recovery data are stored if the processed ranges list contains no more
## than this many range *chunks*.
@ -82,6 +81,14 @@ const
## If there are too many dangling nodes, no data will be saved and restart
## has to perform from scratch or an earlier checkpoint.
# --------------
storageSlotsFetchFailedFullMax* = fetchRequestStorageSlotsMax + 100
## Maximal number of failures when fetching full range storage slots.
## Failed slot ranges are only requested once within the same cycle.
storageSlotsFetchFailedPartialMax* = 300
## Ditto for partial range storage slots.
storageSlotsTrieInheritPerusalMax* = 30_000
## Maximal number of nodes to visit in order to find out whether this
@ -108,26 +115,33 @@ const
healAccountsInspectionPlanBLevel* = 4
## Search this level deep for missing nodes if `hexaryEnvelopeDecompose()`
## only produces existing nodes.
##
## The maximal number of nodes visited at level 3 is *4Ki* and at
## level 4 is *64Ki*.
healAccountsInspectionPlanBRetryMax* = 2
## Retry inspection if this may times unless there is at least one dangling
## node found.
## Retry inspection with a depth level argument starting at
## `healAccountsInspectionPlanBLevel-1` and counting down at most this
## many times, or until at least one dangling node is found, as long as
## the depth level argument stays positive. The cumulative depth of the
## iterated search is
## ::
##   b        1
##   Σ  ν =  --- (b - a + 1) (a + b)
##   a        2
## for
## ::
##   b = healAccountsInspectionPlanBLevel
##   a = b - healAccountsInspectionPlanBRetryMax
##
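A quick numeric cross-check of the sum above (illustrative only, not part of the patch): with the values from this file, `b = 4` and `a = 4 - 2 = 2`, so the iterated search visits depths 2, 3 and 4.

static:
  const b = healAccountsInspectionPlanBLevel        # 4
  const a = b - healAccountsInspectionPlanBRetryMax # 4 - 2 = 2
  # depths 2 + 3 + 4 = 9  ==  (b - a + 1) * (a + b) / 2  ==  3 * 6 / 2
  doAssert (b - a + 1) * (a + b) div 2 == 2 + 3 + 4
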
healAccountsInspectionPlanBRetryNapMSecs* = 2
## Sleep between inspection retries to allow a thread switch. If this
## constant is set to `0`, a `1`ns wait is used.
healSlorageSlotsTrigger* = 0.70
## Consider per-account storage slots healing if a per-account hexary
## sub-trie has reached this factor of completeness.
# --------------
healStorageSlotsInspectionPlanBLevel* = 4
healStorageSlotsInspectionPlanBLevel* = 5
## Similar to `healAccountsInspectionPlanBLevel`
healStorageSlotsInspectionPlanBRetryMax* = 2
healStorageSlotsInspectionPlanBRetryMax* = 99 # 5 + 4 + .. + 1 => 15
## Similar to `healAccountsInspectionPlanBRetryMax`
healStorageSlotsInspectionPlanBRetryNapMSecs* = 2
@ -138,6 +152,9 @@ const
## this many items will be removed from the batch queue. These items will
## then be processed one by one.
healStorageSlotsFailedMax* = 300
## Maximal number of failed nodes tolerated while healing storage slots.
# --------------
comErrorsTimeoutMax* = 3
@ -167,17 +184,8 @@ const
static:
doAssert storageSlotsQuPrioThresh < accountsSaveStorageSlotsMax
# Deprecated, to be expired
const
healInspectionBatch* = 10_000
## Number of nodes to inspect in a single batch. In between batches, a
## task/thread switch is allowed.
healInspectionBatchWaitNanoSecs* = 500
## Wait some time asynchroneously after processing `healInspectionBatch`
## nodes to allow for a pseudo -task switch.
doAssert 0 <= storageSlotsFetchFailedFullMax
doAssert 0 <= storageSlotsFetchFailedPartialMax
# ------------------------------------------------------------------------------
# End

View File

@ -13,9 +13,10 @@
import
std/[math, sequtils, strutils, hashes],
eth/common,
stew/[byteutils, interval_set],
stew/interval_set,
stint,
../../constants,
../../utils/prettify,
../protocol,
../types
@ -71,6 +72,11 @@ type
storageRoot*: Hash256 ## Start of storage tree
subRange*: Option[NodeTagRange] ## Sub-range of slot range covered
AccountSlotsChanged* = object
## Variant of `AccountSlotsHeader` representing some transition
account*: AccountSlotsHeader ## Account header
newRange*: Option[NodeTagRange] ## New sub-range (if-any)
AccountStorageRange* = object
## List of storage descriptors, the last `AccountSlots` storage data might
## be incomplete and the `proof` is needed for proving validity.
@ -83,6 +89,8 @@ type
account*: AccountSlotsHeader
data*: seq[SnapStorage]
# See below for definition of constant `FullNodeTagRange`
# ------------------------------------------------------------------------------
# Public helpers
# ------------------------------------------------------------------------------
@ -189,6 +197,10 @@ proc digestTo*(data: Blob; T: type NodeTag): T =
## Hash the `data` argument
keccakHash(data).to(T)
const
# Cannot be defined earlier: `NodeTag` operations needed
FullNodeTagRange* = NodeTagRange.new(low(NodeTag),high(NodeTag))
# ------------------------------------------------------------------------------
# Public functions: `NodeTagRange` helpers
# ------------------------------------------------------------------------------
@ -205,11 +217,34 @@ proc isEmpty*(lrs: openArray[NodeTagRangeSet]): bool =
return false
true
proc isEmpty*(iv: NodeTagRange): bool =
## Ditto for an interval range.
false # trivially by definition
proc isFull*(lrs: NodeTagRangeSet): bool =
## Returns `true` if the argument set `lrs` consists of the single
## interval [low(NodeTag),high(NodeTag)].
lrs.total == 0 and 0 < lrs.chunks
proc isFull*(lrs: openArray[NodeTagRangeSet]): bool =
## Variant of `isFull()` where intervals are distributed across several
## sets. This function makes sense only if the interval sets are mutually
## disjunct.
var accu: NodeTag
for ivSet in lrs:
if 0 < ivSet.total:
if high(NodeTag) - ivSet.total < accu:
return true
accu = accu + ivSet.total
elif 0 < ivSet.chunks:
# number of points in `ivSet` is `2^256 + 1`
return true
proc isFull*(iv: NodeTagRange): bool =
## Ditto for an interval range.
iv == FullNodeTagRange
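A minimal usage sketch of the new `isFull()` variants (illustrative only, not part of the patch; imports as in this module). The `openArray` variant cannot simply sum the point counts because the full space holds `2^256` points, one more than `UInt256` can represent, hence the `high(NodeTag) - ivSet.total < accu` test above.

let
  loHalf = NodeTagRangeSet.init()
  hiHalf = NodeTagRangeSet.init()
  pivot = 1234.u256.NodeTag
discard loHalf.merge(low(NodeTag), pivot)
discard hiHalf.merge(pivot + 1.u256, high(NodeTag))
doAssert not loHalf.isFull          # a single partial set is not full
doAssert [loHalf, hiHalf].isFull    # disjoint sets jointly covering everything
doAssert FullNodeTagRange.isFull    # single interval variant
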
proc emptyFactor*(lrs: NodeTagRangeSet): float =
## Relative uncovered total, i.e. `#points-not-covered / 2^256` to be used
@ -235,9 +270,11 @@ proc emptyFactor*(lrs: openArray[NodeTagRangeSet]): float =
discard
else: # number of points in `ivSet` is `2^256 + 1`
return 0.0
# Calculate: (2^256 - accu) / 2^256
if accu == 0.to(NodeTag):
return 1.0
((high(NodeTag) - accu).u256 + 1).to(float) / (2.0^256)
1.0
else:
((high(NodeTag) - accu).u256 + 1).to(float) / (2.0^256)
proc fullFactor*(lrs: NodeTagRangeSet): float =
@ -250,6 +287,22 @@ proc fullFactor*(lrs: NodeTagRangeSet): float =
else:
1.0 # number of points in `lrs` is `2^256 + 1`
proc fullFactor*(lrs: openArray[NodeTagRangeSet]): float =
## Variant of `fullFactor()` where intervals are distributed across several
## sets. This function makes sense only if the interval sets are mutually
## disjunct.
var accu: NodeTag
for ivSet in lrs:
if 0 < ivSet.total:
if high(NodeTag) - ivSet.total < accu:
return 1.0
accu = accu + ivSet.total
elif ivSet.chunks == 0:
discard
else: # number of points in `ivSet` is `2^256 + 1`
return 1.0
accu.u256.to(float) / (2.0^256)
proc fullFactor*(iv: NodeTagRange): float =
## Relative covered length of an interval, i.e. `#points-covered / 2^256`
if 0 < iv.len:
@ -266,8 +319,16 @@ proc `$`*(nodeTag: NodeTag): string =
"2^256-1"
elif nodeTag == 0.u256.NodeTag:
"0"
elif nodeTag == 2.u256.pow(255).NodeTag:
"2^255" # 800...
elif nodeTag == 2.u256.pow(254).NodeTag:
"2^254" # 400..
elif nodeTag == 2.u256.pow(253).NodeTag:
"2^253" # 200...
elif nodeTag == 2.u256.pow(252).NodeTag:
"2^252" # 100...
else:
nodeTag.to(Hash256).data.toHex
nodeTag.UInt256.toHex
proc `$`*(nodeKey: NodeKey): string =
$nodeKey.to(NodeTag)
@ -293,6 +354,37 @@ proc `$`*(iv: NodeTagRange): string =
leafRangePp iv
proc fullPC3*(w: NodeTagRangeSet|NodeTagRange): string =
## Pretty print fill state of range sets.
if w.isEmpty:
"0%"
elif w.isFull:
"100%"
else:
let ff = w.fullFactor
if ff <= 0.99999:
ff.toPC(3)
else:
"99.999"
proc fullPC3*(w: openArray[NodeTagRangeSet]): string =
## Variant of `fullPC3()` where intervals are distributed across several
## sets. This function makes sense only if the interval sets are mutually
## disjunct.
if w.isEmpty:
"0%"
else:
let partition = "~" & $w.mapIt(it.chunks).foldl(a+b)
if w.isFull:
"100%" & partition
else:
let ff = w.fullFactor
if ff <= 0.99999:
ff.toPC(3) & partition
else:
"99.999" & partition
proc dump*(
ranges: openArray[NodeTagRangeSet];
moan: proc(overlap: UInt256; iv: NodeTagRange) {.gcsafe.};

View File

@ -9,14 +9,13 @@
# except according to those terms.
import
std/[options, sets, strutils],
std/[options, sets],
chronicles,
chronos,
eth/[common, p2p],
stew/[interval_set, keyed_queue],
../../common as nimcom,
../../db/select_backend,
../../utils/prettify,
".."/[handlers, protocol, sync_desc],
./worker/[pivot, ticker],
./worker/com/com_error,
@ -251,11 +250,9 @@ proc runMulti*(buddy: SnapBuddyRef) {.async.} =
when extraTraceMessages:
block:
let
nAccounts {.used.} = env.nAccounts
nSlotLists {.used.} = env.nSlotLists
processed {.used.} = env.fetchAccounts.processed.fullFactor.toPC(2)
trace "Multi sync runner", peer, pivot, nAccounts, nSlotLists, processed,
trace "Multi sync runner", peer, pivot, nAccounts=env.nAccounts,
nSlotLists=env.nSlotLists,
processed=env.fetchAccounts.processed.fullPC3,
nStoQu=nStorQuAtStart
# This one is the syncing work horse which downloads the database
@ -263,10 +260,10 @@ proc runMulti*(buddy: SnapBuddyRef) {.async.} =
# Various logging entries (after accounts and storage slots download)
let
nAccounts = env.nAccounts
nSlotLists = env.nSlotLists
processed = env.fetchAccounts.processed.fullFactor.toPC(2)
nStoQuLater = env.fetchStorageFull.len + env.fetchStoragePart.len
nAccounts {.used.} = env.nAccounts
nSlotLists {.used.} = env.nSlotLists
processed {.used.} = env.fetchAccounts.processed.fullPC3
nStoQuLater {.used.} = env.fetchStorageFull.len + env.fetchStoragePart.len
if env.archived:
# Archive pivot if it became stale

View File

@ -33,9 +33,12 @@ type
# proof*: seq[SnapProof]
GetStorageRanges* = object
leftOver*: seq[AccountSlotsHeader]
leftOver*: seq[AccountSlotsChanged]
data*: AccountStorageRange
const
extraTraceMessages = false or true
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
@ -68,9 +71,8 @@ proc getStorageRangesReq(
return ok(reply)
except CatchableError as e:
let error {.used.} = e.msg
trace trSnapRecvError & "waiting for GetStorageRanges reply", peer, pivot,
error
name=($e.name), error=(e.msg)
return err()
# ------------------------------------------------------------------------------
@ -84,25 +86,28 @@ proc getStorageRanges*(
pivot: string; ## For logging, instead of `stateRoot`
): Future[Result[GetStorageRanges,ComError]]
{.async.} =
## Fetch data using the `snap#` protocol, returns the range covered.
## Fetch data using the `snap/1` protocol, returns the range covered.
##
## If the first `accounts` argument sequence item has the `firstSlot` field
## set non-zero, only this account is fetched with a range. Otherwise all
## accounts are asked for without a range (non-zero `firstSlot` fields are
## ignored of later sequence items.)
let
peer {.used.} = buddy.peer
var
nAccounts = accounts.len
## If the first `accounts` argument sequence item has the optional `subRange`
## field set, only this account is fetched for the range `subRange`.
## Otherwise all accounts are asked for without a range (`subRange` fields
## of later list items are ignored.)
var nAccounts = accounts.len
if nAccounts == 0:
return err(ComEmptyAccountsArguments)
if trSnapTracePacketsOk:
trace trSnapSendSending & "GetStorageRanges", peer, pivot, nAccounts
let
peer {.used.} = buddy.peer
iv = accounts[0].subRange
when trSnapTracePacketsOk:
when extraTraceMessages:
trace trSnapSendSending & "GetStorageRanges", peer, pivot, nAccounts,
iv=iv.get(otherwise=FullNodeTagRange)
else:
trace trSnapSendSending & "GetStorageRanges", peer, pivot, nAccounts
let
iv = accounts[0].subRange
snStoRanges = block:
let rc = await buddy.getStorageRangesReq(stateRoot,
accounts.mapIt(it.accKey.to(Hash256)), iv, pivot)
@ -119,7 +124,6 @@ proc getStorageRanges*(
return err(ComTooManyStorageSlots)
rc.value.get
let
nSlotLists = snStoRanges.slotLists.len
nProof = snStoRanges.proof.nodes.len
@ -148,40 +152,52 @@ proc getStorageRanges*(
# Filter remaining `slots` responses:
# * Accounts for empty ones go back to the `leftOver` list.
for n in 0 ..< nSlotLists:
# Empty data for a slot indicates missing data
if snStoRanges.slotLists[n].len == 0:
dd.leftOver.add accounts[n]
else:
if 0 < snStoRanges.slotLists[n].len or (n == nSlotLists-1 and 0 < nProof):
# Storage slot data available. The last storage slots list may
# be a proved empty sub-range.
dd.data.storages.add AccountSlots(
account: accounts[n], # known to be no fewer accounts than slots
data: snStoRanges.slotLists[n])
# Complete the part that was not answered by the peer
if nProof == 0:
# assigning empty slice is ok
dd.leftOver = dd.leftOver & accounts[nSlotLists ..< nAccounts]
else: # if n < nSlotLists-1 or nProof == 0:
# Empty data here indicates missing data
dd.leftOver.add AccountSlotsChanged(
account: accounts[n])
else:
# Ok, we have a proof now
if 0 < snStoRanges.slotLists[^1].len:
# If the storage data for the last account comes with a proof, then the
# data set is incomplete. So record the missing part on the `dd.leftOver`
# list.
if 0 < nProof:
# Ok, we have a proof now. In that case, there is always a duplicate
# of the proved entry on the `dd.leftOver` list.
#
# Note that `storages[^1]` exists due to the clause
# `(n==nSlotLists-1 and 0<nProof)` in the above `for` loop.
let topAcc = dd.data.storages[^1].account
dd.leftOver.add AccountSlotsChanged(account: topAcc)
if 0 < dd.data.storages[^1].data.len:
let
reqTop = if accounts[0].subRange.isNone: high(NodeTag)
else: accounts[0].subRange.unsafeGet.maxPt
respTop = dd.data.storages[^1].data[^1].slotHash.to(NodeTag)
if respTop < reqTop:
dd.leftOver.add AccountSlotsHeader(
subRange: some(NodeTagRange.new(respTop + 1.u256, reqTop)),
accKey: accounts[nSlotLists-1].accKey,
storageRoot: accounts[nSlotLists-1].storageRoot)
reqMaxPt = topAcc.subRange.get(otherwise = FullNodeTagRange).maxPt
respMaxPt = dd.data.storages[^1].data[^1].slotHash.to(NodeTag)
if respMaxPt < reqMaxPt:
dd.leftOver[^1].newRange = some(
NodeTagRange.new(respMaxPt + 1.u256, reqMaxPt))
elif 0 < dd.data.storages.len:
let topAcc = dd.data.storages[^1].account
if topAcc.subRange.isSome:
#
# Fringe case when a partial request was answered without a proof.
# This means that the requested interval covers the complete trie.
#
# Copying the request to the `leftOver`, the ranges reflect the new
# state: `topAcc.subRange.isSome` and `newRange.isNone`.
dd.leftOver.add AccountSlotsChanged(account: topAcc)
# Do the rest (assigning an empty slice is ok)
dd.leftOver = dd.leftOver & accounts[nSlotLists ..< nAccounts]
# Complete the part that was not answered by the peer.
dd.leftOver = dd.leftOver & accounts[nSlotLists ..< nAccounts].mapIt(
AccountSlotsChanged(account: it))
trace trSnapRecvReceived & "StorageRanges", peer, pivot, nAccounts,
nSlotLists, nProof, nLeftOver=dd.leftOver.len
when trSnapTracePacketsOk:
trace trSnapRecvReceived & "StorageRanges", peer, pivot, nAccounts,
nSlotLists, nProof, nSlotLstRc=dd.data.storages.len,
nLeftOver=dd.leftOver.len
return ok(dd)
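For orientation (illustrative only, not part of the patch; module imports and concrete key/root values omitted): after a proved but incomplete answer, the `leftOver` entry keeps the originally requested range in `account.subRange` while `newRange` holds the still missing tail.

let
  reqIv = FullNodeTagRange                   # range originally asked for
  respTop = 1234.u256.NodeTag                # highest slot tag actually served
  entry = AccountSlotsChanged(
    account: AccountSlotsHeader(subRange: some(reqIv)),
    newRange: some(NodeTagRange.new(respTop + 1.u256, reqIv.maxPt)))
doAssert entry.newRange.isSome               # the next fetch resumes at respTop + 1
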

View File

@ -226,6 +226,7 @@ proc execSnapSyncAction*(
if buddy.ctrl.stopped or env.archived:
return
var rangeFetchOk = true
if not env.fetchAccounts.processed.isFull:
await buddy.rangeFetchAccounts(env)
@ -234,26 +235,35 @@ proc execSnapSyncAction*(
# Run at least one round fetching storage slots even if the `archived`
# flag is set in order to keep the batch queue small.
if not buddy.ctrl.stopped:
if buddy.ctrl.running:
await buddy.rangeFetchStorageSlots(env)
if buddy.ctrl.stopped or env.archived:
else:
rangeFetchOk = false
if env.archived:
return
# Unconditionally try healing if enabled.
if env.accountsHealingOk(ctx):
# Let this procedure decide whether to ditch this peer (if any). The idea
# is that the healing process might address different peer resources
# than the fetch procedure. So that peer might still be useful unless
# physically disconnected.
buddy.ctrl.forceRun = true
await buddy.healAccounts(env)
if buddy.ctrl.stopped or env.archived:
if env.archived:
return
# Some additional storage slots might have been popped up
await buddy.rangeFetchStorageSlots(env)
if buddy.ctrl.stopped or env.archived:
return
if rangeFetchOk:
await buddy.rangeFetchStorageSlots(env)
if env.archived:
return
# Don't bother with storage slots healing before accounts healing takes
# place. This saves communication bandwidth. The pivot might change soon,
# anyway.
if env.accountsHealingOk(ctx):
buddy.ctrl.forceRun = true
await buddy.healStorageSlots(env)

View File

@ -114,6 +114,7 @@ proc findMissingNodes*(
planBLevelMax: uint8;
planBRetryMax: int;
planBRetrySleepMs: int;
forcePlanBOk = false;
): Future[MissingNodesSpecs]
{.async.} =
## Find some missing nodes in the hexary trie database.
@ -138,7 +139,7 @@ proc findMissingNodes*(
# Plan B, carefully employ `hexaryInspect()`
var nRetryCount = 0
if 0 < nodes.len:
if 0 < nodes.len or forcePlanBOk:
ignExceptionOops("compileMissingNodesList"):
let
paths = nodes.mapIt it.partialPath
@ -152,11 +153,13 @@ proc findMissingNodes*(
while stats.dangling.len == 0 and
nRetryCount < planBRetryMax and
1 < maxLevel and
not stats.resumeCtx.isNil:
await sleepAsync suspend
nRetryCount.inc
maxLevel = (120 * maxLevel + 99) div 100 # ~20% increase
trace logTxt "plan B retry", nRetryCount, maxLevel
maxLevel.dec
when extraTraceMessages:
trace logTxt "plan B retry", forcePlanBOk, nRetryCount, maxLevel
stats = getFn.hexaryInspectTrie(rootKey,
resumeCtx = stats.resumeCtx,
stopAtLevel = maxLevel,
@ -169,19 +172,20 @@ proc findMissingNodes*(
if 0 < result.missing.len:
when extraTraceMessages:
trace logTxt "plan B", nNodes=nodes.len, nDangling=result.missing.len,
level=result.level, nVisited=result.visited, nRetryCount
trace logTxt "plan B", forcePlanBOk, nNodes=nodes.len,
nDangling=result.missing.len, level=result.level,
nVisited=result.visited, nRetryCount
return
when extraTraceMessages:
trace logTxt "plan B not applicable", nNodes=nodes.len,
trace logTxt "plan B not applicable", forcePlanBOk, nNodes=nodes.len,
level=result.level, nVisited=result.visited, nRetryCount
# Plan C, clean up intervals
# Calculate `gaps` as the complement of the `processed` set of intervals
let gaps = NodeTagRangeSet.init()
discard gaps.merge(low(NodeTag),high(NodeTag))
discard gaps.merge FullNodeTagRange
for w in ranges.processed.increasing: discard gaps.reduce w
# Clean up empty gaps in the processed range
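Stand-alone illustration of the plan C complement (hypothetical values, not part of the patch): `gaps` ends up holding exactly the ranges missing from `processed`.

let
  processed = NodeTagRangeSet.init()
  gaps = NodeTagRangeSet.init()
discard processed.merge(low(NodeTag), 999.u256.NodeTag)  # pretend this part is done
discard gaps.merge FullNodeTagRange
for w in processed.increasing:
  discard gaps.reduce w
doAssert gaps.chunks == 1    # a single gap [1000, high(NodeTag)] remains
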

View File

@ -53,14 +53,12 @@ import
"."/[find_missing_nodes, storage_queue_helper, swap_in]
logScope:
topics = "snap-heal"
topics = "snap-acc"
const
extraTraceMessages = false or true
## Enabled additional logging noise
EmptyBlobSet = HashSet[Blob].default
# ------------------------------------------------------------------------------
# Private logging helpers
# ------------------------------------------------------------------------------
@ -72,11 +70,10 @@ proc `$`(node: NodeSpecs): string =
node.partialPath.toHex
proc `$`(rs: NodeTagRangeSet): string =
let ff = rs.fullFactor
if 0.99 <= ff and ff < 1.0: "99%" else: ff.toPC(0)
rs.fullPC3
proc `$`(iv: NodeTagRange): string =
iv.fullFactor.toPC(3)
iv.fullPC3
proc toPC(w: openArray[NodeSpecs]; n: static[int] = 3): string =
let sumUp = w.mapIt(it.hexaryEnvelope.len).foldl(a+b, 0.u256)
@ -88,7 +85,8 @@ proc healingCtx(
): string =
let ctx = buddy.ctx
"{" &
"pivot=" & "#" & $env.stateHeader.blockNumber & "," &
"piv=" & "#" & $env.stateHeader.blockNumber & "," &
"ctl=" & $buddy.ctrl.state & "," &
"nAccounts=" & $env.nAccounts & "," &
("covered=" & $env.fetchAccounts.processed & "/" &
$ctx.pool.coveredAccounts ) & "}"
@ -146,7 +144,7 @@ proc compileMissingNodesList(
return mlv.missing
proc fetchMissingNodes(
proc getNodesFromNetwork(
buddy: SnapBuddyRef;
missingNodes: seq[NodeSpecs]; # Nodes to fetch from the network
ignore: HashSet[Blob]; # Except for these partial paths listed
@ -156,7 +154,6 @@ proc fetchMissingNodes(
## Extract from `nodes.missing` the next batch of nodes that need
## to be merged into the database
let
ctx {.used.} = buddy.ctx
peer {.used.} = buddy.peer
rootHash = env.stateHeader.stateRoot
pivot = "#" & $env.stateHeader.blockNumber # for logging
@ -205,11 +202,7 @@ proc kvAccountLeaf(
env: SnapPivotRef;
): (bool,NodeKey,Account) =
## Re-read leaf node from persistent database (if any)
let
peer {.used.} = buddy.peer
var
nNibbles = -1
var nNibbles = -1
discardRlpError("kvAccountLeaf"):
let
nodeRlp = rlpFromBytes node.data
@ -226,7 +219,7 @@ proc kvAccountLeaf(
return (true, nodeKey, accData)
when extraTraceMessages:
trace logTxt "non-leaf node path or corrupt data", peer,
trace logTxt "non-leaf node path or corrupt data", peer=buddy.peer,
ctx=buddy.healingCtx(env), nNibbles
@ -297,7 +290,7 @@ proc accountsHealingImpl(
return (0,EmptyBlobSet) # nothing to do
# Get next batch of nodes that need to be merged it into the database
let fetchedNodes = await buddy.fetchMissingNodes(missingNodes, ignore, env)
let fetchedNodes = await buddy.getNodesFromNetwork(missingNodes, ignore, env)
if fetchedNodes.len == 0:
return (0,EmptyBlobSet)
@ -308,8 +301,8 @@ proc accountsHealingImpl(
if 0 < report.len and report[^1].slot.isNone:
# Storage error, just run the next lap (not much else that can be done)
error logTxt "error updating persistent database", peer,
ctx=buddy.healingCtx(env), nFetchedNodes, error=report[^1].error
error logTxt "databse error", peer, ctx=buddy.healingCtx(env),
nFetchedNodes, error=report[^1].error
return (-1,EmptyBlobSet)
# Filter out error and leaf nodes
@ -349,9 +342,7 @@ proc healAccounts*(
) {.async.} =
## Fetching and merging missing account trie database nodes.
when extraTraceMessages:
let
ctx {.used.} = buddy.ctx
peer {.used.} = buddy.peer
let peer {.used.} = buddy.peer
trace logTxt "started", peer, ctx=buddy.healingCtx(env)
let
@ -364,7 +355,7 @@ proc healAccounts*(
while not fa.processed.isFull() and
buddy.ctrl.running and
not env.archived:
var (nNodes, rejected) = await buddy.accountsHealingImpl(ignore, env)
let (nNodes, rejected) = await buddy.accountsHealingImpl(ignore, env)
if nNodes <= 0:
break
ignore = ignore + rejected
@ -372,8 +363,8 @@ proc healAccounts*(
nFetchLoop.inc
when extraTraceMessages:
trace logTxt "job done", peer, ctx=buddy.healingCtx(env),
nNodesFetched, nFetchLoop, nIgnore=ignore.len, runState=buddy.ctrl.state
trace logTxt "done", peer, ctx=buddy.healingCtx(env),
nNodesFetched, nFetchLoop, nIgnore=ignore.len
# ------------------------------------------------------------------------------
# End

View File

@ -38,12 +38,10 @@
## healing algorithm again.
##
# ###### --- CHECK FOR DEADLOCK ---- ####
{.push raises: [].}
import
std/[math, sequtils, tables],
std/[math, sequtils, sets, tables],
chronicles,
chronos,
eth/[common, p2p, trie/nibbles],
@ -52,11 +50,12 @@ import
"../../.."/[sync_desc, protocol, types],
"../.."/[constants, range_desc, worker_desc],
../com/[com_error, get_trie_nodes],
../db/[hexary_desc, hexary_envelope, snapdb_storage_slots],
../db/[hexary_desc, hexary_envelope, hexary_error, hexary_range,
snapdb_storage_slots],
"."/[find_missing_nodes, storage_queue_helper]
logScope:
topics = "snap-heal"
topics = "snap-slot"
const
extraTraceMessages = false or true
@ -73,10 +72,10 @@ proc `$`(node: NodeSpecs): string =
node.partialPath.toHex
proc `$`(rs: NodeTagRangeSet): string =
rs.fullFactor.toPC(0)
rs.fullPC3
proc `$`(iv: NodeTagRange): string =
iv.fullFactor.toPC(3)
iv.fullPC3
proc toPC(w: openArray[NodeSpecs]; n: static[int] = 3): string =
let sumUp = w.mapIt(it.hexaryEnvelope.len).foldl(a+b, 0.u256)
@ -87,33 +86,36 @@ proc healingCtx(
env: SnapPivotRef;
): string {.used.} =
"{" &
"pivot=" & "#" & $env.stateHeader.blockNumber & "," &
"runState=" & $buddy.ctrl.state & "," &
"piv=" & "#" & $env.stateHeader.blockNumber & "," &
"ctl=" & $buddy.ctrl.state & "," &
"nStoQu=" & $env.storageQueueTotal() & "," &
"nQuPart=" & $env.fetchStoragePart.len & "," &
"nParked=" & $env.parkedStorage.len & "," &
"nSlotLists=" & $env.nSlotLists & "}"
proc healingCtx(
buddy: SnapBuddyRef;
kvp: SnapSlotsQueuePair;
kvp: StoQuSlotsKVP;
env: SnapPivotRef;
): string =
"{" &
"pivot=" & "#" & $env.stateHeader.blockNumber & "," &
"runState=" & $buddy.ctrl.state & "," &
"covered=" & $kvp.data.slots.processed & "," &
"piv=" & "#" & $env.stateHeader.blockNumber & "," &
"ctl=" & $buddy.ctrl.state & "," &
"processed=" & $kvp.data.slots.processed & "," &
"nStoQu=" & $env.storageQueueTotal() & "," &
"nQuPart=" & $env.fetchStoragePart.len & "," &
"nParked=" & $env.parkedStorage.len & "," &
"nSlotLists=" & $env.nSlotLists & "}"
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
template noExceptionOops(info: static[string]; code: untyped) =
template discardRlpError(info: static[string]; code: untyped) =
try:
code
except CatchableError as e:
raiseAssert "Inconveivable (" &
info & "): name=" & $e.name & " msg=" & e.msg
except RlpError:
discard
# ------------------------------------------------------------------------------
# Private functions
@ -121,7 +123,7 @@ template noExceptionOops(info: static[string]; code: untyped) =
proc compileMissingNodesList(
buddy: SnapBuddyRef;
kvp: SnapSlotsQueuePair;
kvp: StoQuSlotsKVP;
env: SnapPivotRef;
): Future[seq[NodeSpecs]]
{.async.} =
@ -138,7 +140,14 @@ proc compileMissingNodesList(
rootKey, getFn,
healStorageSlotsInspectionPlanBLevel,
healStorageSlotsInspectionPlanBRetryMax,
healStorageSlotsInspectionPlanBRetryNapMSecs)
healStorageSlotsInspectionPlanBRetryNapMSecs,
forcePlanBOk = true)
# Clean up empty account ranges found while looking for nodes
if not mlv.emptyGaps.isNil:
for w in mlv.emptyGaps.increasing:
discard slots.processed.merge w
slots.unprocessed.reduce w
when extraTraceMessages:
trace logTxt "missing nodes", peer,
@ -150,73 +159,124 @@ proc compileMissingNodesList(
proc getNodesFromNetwork(
buddy: SnapBuddyRef;
kvp: SnapSlotsQueuePair;
missing: seq[NodeSpecs];
env: SnapPivotRef;
missingNodes: seq[NodeSpecs]; # Nodes to fetch from the network
ignore: HashSet[Blob]; # Except for these partial paths listed
kvp: StoQuSlotsKVP; # Storage slots context
env: SnapPivotRef; # For logging
): Future[seq[NodeSpecs]]
{.async.} =
## Extract from `missing` the next batch of nodes that need
## to be merged into the database
let
ctx {.used.} = buddy.ctx
peer {.used.} = buddy.peer
accPath = kvp.data.accKey.to(Blob)
storageRoot = kvp.key
fetchNodes = missing[0 ..< fetchRequestTrieNodesMax]
if fetchNodes.len == 0:
return # Nothing to do
# Initalise for `getTrieNodes()` for fetching nodes from the network
var
nodeKey: Table[Blob,NodeKey] # Temporary `path -> key` mapping
req = SnapTriePaths(accPath: accpath)
for w in fetchNodes:
req.slotPaths.add w.partialPath
nodeKey[w.partialPath] = w.nodeKey
# Fetch nodes from the network.
let
rootHash = env.stateHeader.stateRoot
pivot = "#" & $env.stateHeader.blockNumber # for logging
rc = await buddy.getTrieNodes(storageRoot, @[req], pivot)
if rc.isOk:
# Reset error counts for detecting repeated timeouts, network errors, etc.
buddy.only.errors.resetComError()
return rc.value.nodes.mapIt(NodeSpecs(
partialPath: it.partialPath,
nodeKey: nodeKey[it.partialPath],
data: it.data))
# Initialise for fetching nodes from the network via `getTrieNodes()`
var
nodeKey: Table[Blob,NodeKey] # Temporary `path -> key` mapping
req = SnapTriePaths(accPath: accPath) # Argument for `getTrieNodes()`
let error = rc.error
if await buddy.ctrl.stopAfterSeriousComError(error, buddy.only.errors):
# There is no point in fetching too many nodes as it will be rejected. So
# the rest of the `missingNodes` list is ignored, to be picked up later.
for w in missingNodes:
if w.partialPath notin ignore and not nodeKey.hasKey(w.partialPath):
req.slotPaths.add w.partialPath
nodeKey[w.partialPath] = w.nodeKey
if fetchRequestTrieNodesMax <= req.slotPaths.len:
break
if 0 < req.slotPaths.len:
# Fetch nodes from the network.
let rc = await buddy.getTrieNodes(rootHash, @[req], pivot)
if rc.isOk:
# Reset error counts for detecting repeated timeouts, network errors, etc.
buddy.only.errors.resetComError()
return rc.value.nodes.mapIt(NodeSpecs(
partialPath: it.partialPath,
nodeKey: nodeKey[it.partialPath],
data: it.data))
# Process error ...
let
error = rc.error
ok = await buddy.ctrl.stopAfterSeriousComError(error, buddy.only.errors)
when extraTraceMessages:
trace logTxt "fetch nodes error => stop", peer,
ctx=buddy.healingCtx(kvp,env), error
trace logTxt "reply error", peer, ctx=buddy.healingCtx(kvp,env),
error, stop=ok
return @[]
proc slotKey(node: NodeSpecs): (bool,NodeKey) =
## Read leaf node from persistent database (if any)
try:
proc kvStoSlotsLeaf(
buddy: SnapBuddyRef;
node: NodeSpecs; # Node data fetched from network
kvp: StoQuSlotsKVP; # For logging
env: SnapPivotRef; # For logging
): (bool,NodeKey) =
## Re-read leaf node from persistent database (if any)
var nNibbles = -1
discardRlpError("kvStorageSlotsLeaf"):
let
nodeRlp = rlpFromBytes node.data
(_,prefix) = hexPrefixDecode node.partialPath
(_,segment) = hexPrefixDecode nodeRlp.listElem(0).toBytes
prefix = (hexPrefixDecode node.partialPath)[1]
segment = (hexPrefixDecode nodeRlp.listElem(0).toBytes)[1]
nibbles = prefix & segment
if nibbles.len == 64:
nNibbles = nibbles.len
if nNibbles == 64:
return (true, nibbles.getBytes.convertTo(NodeKey))
except CatchableError:
discard
when extraTraceMessages:
trace logTxt "non-leaf node path or corrupt data", peer=buddy.peer,
ctx=buddy.healingCtx(kvp,env), nNibbles
proc registerStoSlotsLeaf(
buddy: SnapBuddyRef;
slotKey: NodeKey;
kvp: StoQuSlotsKVP;
env: SnapPivotRef;
) =
## Process a single storage slot leaf as would be done with an interval
## by the storage slots range fetch function
let
ctx = buddy.ctx
peer = buddy.peer
rootKey = kvp.key.to(NodeKey)
getSlotFn = ctx.pool.snapDb.getStorageSlotsFn kvp.data.accKey
pt = slotKey.to(NodeTag)
# Extend interval [pt,pt] if possible
var iv: NodeTagRange
try:
iv = getSlotFn.hexaryRangeInflate(rootKey, pt)
except CatchableError as e:
error logTxt "inflating interval oops", peer, ctx=buddy.healingCtx(kvp,env),
accKey=kvp.data.accKey, slotKey, name=($e.name), msg=e.msg
iv = NodeTagRange.new(pt,pt)
# Register isolated leaf node
if 0 < kvp.data.slots.processed.merge iv:
kvp.data.slots.unprocessed.reduce iv
when extraTraceMessages:
trace logTxt "registered single slot", peer, ctx=buddy.healingCtx(env),
leftSlack=(iv.minPt < pt), rightSlack=(pt < iv.maxPt)
# ------------------------------------------------------------------------------
# Private functions: do the healing for one work item (sub-trie)
# ------------------------------------------------------------------------------
proc storageSlotsHealing(
proc stoSlotsHealingImpl(
buddy: SnapBuddyRef;
kvp: SnapSlotsQueuePair;
ignore: HashSet[Blob]; # Except for these partial paths listed
kvp: StoQuSlotsKVP;
env: SnapPivotRef;
) {.async.} =
): Future[(int,HashSet[Blob])]
{.async.} =
## Returns the number of nodes merged into the database (-1 on database
## error) together with the set of rejected node partial paths.
let
@ -227,51 +287,53 @@ proc storageSlotsHealing(
if missing.len == 0:
trace logTxt "nothing to do", peer, ctx=buddy.healingCtx(kvp,env)
return
when extraTraceMessages:
trace logTxt "started", peer, ctx=buddy.healingCtx(kvp,env)
return (0,EmptyBlobSet) # nothing to do
# Get next batch of nodes that need to be merged it into the database
let nodeSpecs = await buddy.getNodesFromNetwork(kvp, missing, env)
if nodeSpecs.len == 0:
return
let fetchedNodes = await buddy.getNodesFromNetwork(missing, ignore, kvp, env)
if fetchedNodes.len == 0:
when extraTraceMessages:
trace logTxt "node set unavailable", nMissing=missing.len
return (0,EmptyBlobSet)
# Store nodes onto disk
let report = db.importRawStorageSlotsNodes(peer, kvp.data.accKey, nodeSpecs)
let
nFetchedNodes = fetchedNodes.len
report = db.importRawStorageSlotsNodes(peer, kvp.data.accKey, fetchedNodes)
if 0 < report.len and report[^1].slot.isNone:
# Storage error, just run the next lap (not much else that can be done)
error logTxt "database error", peer, ctx=buddy.healingCtx(kvp,env),
nNodes=nodeSpecs.len, error=report[^1].error
return
when extraTraceMessages:
trace logTxt "nodes merged into database", peer,
ctx=buddy.healingCtx(kvp,env), nNodes=nodeSpecs.len
nFetchedNodes, error=report[^1].error
return (-1,EmptyBlobSet)
# Filter out leaf nodes
var nLeafNodes = 0 # for logging
var
nLeafNodes = 0 # for logging
rejected: HashSet[Blob]
trace logTxt "importRawStorageSlotsNodes", nReport=report.len #########
for w in report:
if w.slot.isSome and w.kind.get(otherwise = Branch) == Leaf:
if w.slot.isSome: # non-indexed entries appear typically at the end, though
let inx = w.slot.unsafeGet
# Leaf Node has been stored, so register it
let
inx = w.slot.unsafeGet
(isLeaf, slotKey) = nodeSpecs[inx].slotKey
if isLeaf:
let
slotTag = slotKey.to(NodeTag)
iv = NodeTagRange.new(slotTag,slotTag)
kvp.data.slots.unprocessed.reduce iv
discard kvp.data.slots.processed.merge iv
nLeafNodes.inc
# Node error, will need to be picked up and downloaded again later. Note
# that there need not be an explicit node spec (so `kind` is opted out.)
if w.kind.isNone or w.error != HexaryError(0):
rejected.incl fetchedNodes[inx].partialPath
when extraTraceMessages:
trace logTxt "stored slot", peer,
ctx=buddy.healingCtx(kvp,env), slotKey=slotTag
elif w.kind.unsafeGet == Leaf:
# Leaf node has been stored, double check
let (isLeaf, key) = buddy.kvStoSlotsLeaf(fetchedNodes[inx], kvp, env)
if isLeaf:
# Update `unprocessed` registry, collect storage roots (if any)
buddy.registerStoSlotsLeaf(key, kvp, env)
nLeafNodes.inc
when extraTraceMessages:
trace logTxt "job done", peer, ctx=buddy.healingCtx(kvp,env), nLeafNodes
trace logTxt "merged into database", peer, ctx=buddy.healingCtx(kvp,env),
nLeafNodes
return (nFetchedNodes - rejected.len, rejected)
# ------------------------------------------------------------------------------
# Public functions
@ -282,48 +344,48 @@ proc healStorageSlots*(
env: SnapPivotRef;
) {.async.} =
## Fetching and merging missing storage slots trie database nodes.
let
ctx {.used.} = buddy.ctx
peer {.used.} = buddy.peer
when extraTraceMessages:
let peer {.used.} = buddy.peer
trace logTxt "started", peer, ctx=buddy.healingCtx(env)
# Extract healing slot items from partial slots list
var toBeHealed: seq[SnapSlotsQueuePair]
for kvp in env.fetchStoragePart.nextPairs:
# Delete from queue and process this entry
env.fetchStoragePart.del kvp.key
var
nNodesFetched = 0
nFetchLoop = 0
ignore: HashSet[Blob]
visited: HashSet[NodeKey]
# Move to returned list unless duplicated in full slots list
if env.fetchStorageFull.eq(kvp.key).isErr:
toBeHealed.add kvp
env.parkedStorage.incl kvp.data.accKey # temporarily parked
if healStorageSlotsBatchMax <= toBeHealed.len:
while buddy.ctrl.running and
visited.len <= healStorageSlotsBatchMax and
ignore.len <= healStorageSlotsFailedMax and
not env.archived:
# Pull out the next request list from the queue
let kvp = block:
let rc = env.storageQueueUnlinkPartialItem visited
if rc.isErr:
when extraTraceMessages:
trace logTxt "queue exhausted", peer, ctx=buddy.healingCtx(env),
nIgnore=ignore.len, nVisited=visited.len
break
rc.value
# Run against local batch
let nHealerQueue = toBeHealed.len
if 0 < nHealerQueue:
when extraTraceMessages:
trace logTxt "processing", peer, ctx=buddy.healingCtx(env), nHealerQueue
nFetchLoop.inc
for n in 0 ..< toBeHealed.len:
# Stop processing, hand back the rest
if buddy.ctrl.stopped:
for m in n ..< toBeHealed.len:
let kvp = toBeHealed[n]
discard env.fetchStoragePart.append(kvp.key, kvp.data)
env.parkedStorage.excl kvp.data.accKey
break
# Process request range for healing
let (nNodes, rejected) = await buddy.stoSlotsHealingImpl(ignore, kvp, env)
if kvp.data.slots.processed.isFull:
env.nSlotLists.inc
env.parkedStorage.excl kvp.data.accKey
else:
# Re-queue again, to be re-processed in another cycle
visited.incl kvp.data.accKey
env.storageQueueAppend kvp
let kvp = toBeHealed[n]
await buddy.storageSlotsHealing(kvp, env)
# Re-queue again unless ready
env.parkedStorage.excl kvp.data.accKey # un-register
if not kvp.data.slots.processed.isFull:
discard env.fetchStoragePart.append(kvp.key, kvp.data)
ignore = ignore + rejected
nNodesFetched.inc(nNodes)
when extraTraceMessages:
trace logTxt "done", peer, ctx=buddy.healingCtx(env), nHealerQueue
trace logTxt "done", peer, ctx=buddy.healingCtx(env),
nNodesFetched, nFetchLoop, nIgnore=ignore.len, nVisited=visited.len
# ------------------------------------------------------------------------------
# End

View File

@ -57,7 +57,7 @@ import
"."/[storage_queue_helper, swap_in]
logScope:
topics = "snap-range"
topics = "snap-acc"
const
extraTraceMessages = false or true
@ -70,19 +70,19 @@ const
template logTxt(info: static[string]): static[string] =
"Accounts range " & info
#proc `$`(rs: NodeTagRangeSet): string =
# rs.fullFactor.toPC(0)
proc `$`(rs: NodeTagRangeSet): string =
rs.fullPC3
proc `$`(iv: NodeTagRange): string =
iv.fullFactor.toPC(3)
iv.fullPC3
proc fetchCtx(
buddy: SnapBuddyRef;
env: SnapPivotRef;
): string {.used.} =
"{" &
"pivot=" & "#" & $env.stateHeader.blockNumber & "," &
"runState=" & $buddy.ctrl.state & "," &
"piv=" & "#" & $env.stateHeader.blockNumber & "," &
"ctl=" & $buddy.ctrl.state & "," &
"nStoQu=" & $env.storageQueueTotal() & "," &
"nSlotLists=" & $env.nSlotLists & "}"
@ -133,12 +133,10 @@ proc accountsRangefetchImpl(
rc = await buddy.getAccountRange(stateRoot, iv, pivot)
if rc.isErr:
fa.unprocessed.mergeSplit iv # fail => interval back to pool
let error = rc.error
if await buddy.ctrl.stopAfterSeriousComError(error, buddy.only.errors):
if await buddy.ctrl.stopAfterSeriousComError(rc.error, buddy.only.errors):
when extraTraceMessages:
let reqLen {.used.} = $iv
trace logTxt "fetch error", peer, ctx=buddy.fetchCtx(env),
reqLen, error
reqLen=iv, error=rc.error
return
rc.value
@ -169,9 +167,8 @@ proc accountsRangefetchImpl(
# Bad data, just try another peer
buddy.ctrl.zombie = true
when extraTraceMessages:
let reqLen {.used.} = $iv
trace logTxt "import failed", peer, ctx=buddy.fetchCtx(env),
gotAccounts, gotStorage, reqLen, covered, error=rc.error
gotAccounts, gotStorage, reqLen=iv, covered, error=rc.error
return
rc.value
@ -221,32 +218,21 @@ proc rangeFetchAccounts*(
env: SnapPivotRef;
) {.async.} =
## Fetch accounts and store them in the database.
let
fa = env.fetchAccounts
let fa = env.fetchAccounts
if not fa.processed.isFull():
let
ctx {.used.} = buddy.ctx
peer {.used.} = buddy.peer
when extraTraceMessages:
trace logTxt "start", peer, ctx=buddy.fetchCtx(env)
trace logTxt "start", peer=buddy.peer, ctx=buddy.fetchCtx(env)
static:
doAssert 0 <= accountsFetchRetryMax
var
nFetchAccounts = 0 # for logging
nRetry = 0
var nFetchAccounts = 0 # for logging
while not fa.processed.isFull() and
buddy.ctrl.running and
not env.archived and
nRetry <= accountsFetchRetryMax:
not env.archived:
# May repeat fetching with re-arranged request intervals
if await buddy.accountsRangefetchImpl(env):
nFetchAccounts.inc
nRetry = 0
else:
nRetry.inc
if not await buddy.accountsRangefetchImpl(env):
break
nFetchAccounts.inc
# Clean up storage slots queue first if it becomes too large
let nStoQu = env.fetchStorageFull.len + env.fetchStoragePart.len
@ -254,7 +240,8 @@ proc rangeFetchAccounts*(
break
when extraTraceMessages:
trace logTxt "done", peer, ctx=buddy.fetchCtx(env), nFetchAccounts
trace logTxt "done", peer=buddy.peer, ctx=buddy.fetchCtx(env),
nFetchAccounts
# ------------------------------------------------------------------------------
# End

View File

@ -65,19 +65,20 @@
{.push raises: [].}
import
std/sets,
chronicles,
chronos,
eth/[common, p2p],
stew/[interval_set, keyed_queue],
stint,
../../../sync_desc,
"../.."/[range_desc, worker_desc],
"../.."/[constants, range_desc, worker_desc],
../com/[com_error, get_storage_ranges],
../db/[hexary_error, snapdb_storage_slots],
./storage_queue_helper
logScope:
topics = "snap-range"
topics = "snap-slot"
const
extraTraceMessages = false or true
@ -93,27 +94,26 @@ proc fetchCtx(
buddy: SnapBuddyRef;
env: SnapPivotRef;
): string =
let
nStoQu = (env.fetchStorageFull.len +
env.fetchStoragePart.len +
env.parkedStorage.len)
"{" &
"pivot=" & "#" & $env.stateHeader.blockNumber & "," &
"runState=" & $buddy.ctrl.state & "," &
"nStoQu=" & $nStoQu & "," &
"piv=" & "#" & $env.stateHeader.blockNumber & "," &
"ctl=" & $buddy.ctrl.state & "," &
"nQuFull=" & $env.fetchStorageFull.len & "," &
"nQuPart=" & $env.fetchStoragePart.len & "," &
"nParked=" & $env.parkedStorage.len & "," &
"nSlotLists=" & $env.nSlotLists & "}"
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
proc storeStoragesSingleBatch(
proc fetchStorageSlotsImpl(
buddy: SnapBuddyRef;
req: seq[AccountSlotsHeader];
env: SnapPivotRef;
): Future[bool]
): Future[Result[HashSet[NodeKey],void]]
{.async.} =
## Fetch account storage slots and store them in the database.
## Fetch account storage slots and store them in the database. Returns the
## set of rejected account keys, or an error if the whole request failed.
let
ctx = buddy.ctx
peer = buddy.peer
@ -124,29 +124,28 @@ proc storeStoragesSingleBatch(
var stoRange = block:
let rc = await buddy.getStorageRanges(stateRoot, req, pivot)
if rc.isErr:
let error = rc.error
if await buddy.ctrl.stopAfterSeriousComError(error, buddy.only.errors):
trace logTxt "fetch error => stop", peer, ctx=buddy.fetchCtx(env),
nReq=req.len, error
return false # all of `req` failed
if await buddy.ctrl.stopAfterSeriousComError(rc.error, buddy.only.errors):
trace logTxt "fetch error", peer, ctx=buddy.fetchCtx(env),
nReq=req.len, error=rc.error
return err() # all of `req` failed
rc.value
# Reset error counts for detecting repeated timeouts, network errors, etc.
buddy.only.errors.resetComError()
var gotSlotLists = stoRange.data.storages.len
if 0 < gotSlotLists:
var
nSlotLists = stoRange.data.storages.len
reject: HashSet[NodeKey]
if 0 < nSlotLists:
# Verify/process storages data and save it to disk
let report = ctx.pool.snapDb.importStorageSlots(peer, stoRange.data)
if 0 < report.len:
if report[^1].slot.isNone:
# Failed to store on database, not much that can be done here
gotSlotLists.dec(report.len - 1) # for logging only
error logTxt "import failed", peer, ctx=buddy.fetchCtx(env),
nSlotLists=gotSlotLists, nReq=req.len, error=report[^1].error
return false # all of `req` failed
nSlotLists=0, nReq=req.len, error=report[^1].error
return err() # all of `req` failed
# Push back error entries to be processed later
for w in report:
@ -155,17 +154,15 @@ proc storeStoragesSingleBatch(
let
inx = w.slot.get
acc = stoRange.data.storages[inx].account
splitOk = w.error in {RootNodeMismatch,RightBoundaryProofFailed}
if w.error == RootNodeMismatch:
# Some pathological case, needs further investigation. For the
# moment, provide partial fetches.
env.storageQueueAppendPartialBisect acc
reject.incl acc.accKey
elif w.error == RightBoundaryProofFailed and
acc.subRange.isSome and 1 < acc.subRange.unsafeGet.len:
# Some pathological case, needs further investigation. For the
# moment, provide a partial fetches.
env.storageQueueAppendPartialBisect acc
if splitOk:
# Some pathological cases need further investigation. For the
# moment, do a partial split re-queue so that a different range
# will be dequeued and processed next time.
env.storageQueueAppendPartialSplit acc
else:
# Reset any partial result (which would be the last entry) to
@ -173,33 +170,24 @@ proc storeStoragesSingleBatch(
# re-fetched completely for this account.
env.storageQueueAppendFull acc
# Last entry might be partial (if any)
#
# Forget about partial result processing if the last partial entry
# was reported because
# * either there was an error processing it
# * or there were some gaps reprored as dangling links
stoRange.data.proof = @[]
# Update local statistics counter for `nSlotLists` counter update
gotSlotLists.dec
error logTxt "processing error", peer, ctx=buddy.fetchCtx(env),
nSlotLists=gotSlotLists, nReqInx=inx, nReq=req.len,
error logTxt "import error", peer, ctx=buddy.fetchCtx(env), splitOk,
nSlotLists, nRejected=reject.len, nReqInx=inx, nReq=req.len,
nDangling=w.dangling.len, error=w.error
# Update statistics
if gotSlotLists == 1 and
req[0].subRange.isSome and
env.fetchStoragePart.hasKey req[0].storageRoot:
# Successful partial request, but not completely done with yet.
gotSlotLists = 0
# Return unprocessed left overs to batch queue. The `req[^1].subRange` is
# the original range requested for the last item (if any.)
let (_,removed) = env.storageQueueUpdate(stoRange.leftOver, reject)
env.nSlotLists.inc(gotSlotLists)
# Update statistics. The variable removed is set if the queue for a partial
# slot range was logically removed. A partial slot range list has one entry.
# So the correction factor for the slot lists statistics is `removed - 1`.
env.nSlotLists.inc(nSlotLists - reject.len + (removed - 1))
# Return unprocessed left overs to batch queue
env.storageQueueAppend(stoRange.leftOver, req[^1].subRange)
return true
# Clean up, un-park successful slots (if any)
for w in stoRange.data.storages:
env.parkedStorage.excl w.account.accKey
return ok(reject)
# ------------------------------------------------------------------------------
# Public functions
@ -214,63 +202,58 @@ proc rangeFetchStorageSlots*(
## each work item on the queue at least once. For partial slot range items
## this means, in case of success, that the outstanding range has become
## at least smaller.
when extraTraceMessages:
trace logTxt "start", peer=buddy.peer, ctx=buddy.fetchCtx(env)
# Fetch storage data and save it on disk. Storage requests are managed by
# request queues for handling full/partial replies and re-fetch issues. For
# all practical purposes, this request queue should mostly be empty.
if 0 < env.fetchStorageFull.len or 0 < env.fetchStoragePart.len:
let
ctx = buddy.ctx
peer {.used.} = buddy.peer
when extraTraceMessages:
trace logTxt "start", peer, ctx=buddy.fetchCtx(env)
for (fetchFn, failMax) in [
(storageQueueFetchFull, storageSlotsFetchFailedFullMax),
(storageQueueFetchPartial, storageSlotsFetchFailedPartialMax)]:
var
ignored: HashSet[NodeKey]
rc = Result[HashSet[NodeKey],void].ok(ignored) # set ok() start value
# Run batch even if `archived` flag is set in order to shrink the queues.
var delayed: seq[AccountSlotsHeader]
while buddy.ctrl.running:
while buddy.ctrl.running and
rc.isOk and
ignored.len <= failMax:
# Pull out the next request list from the queue
let (req, nComplete {.used.}, nPartial {.used.}) =
ctx.storageQueueFetchFull(env)
if req.len == 0:
let reqList = buddy.ctx.fetchFn(env, ignored)
if reqList.len == 0:
when extraTraceMessages:
trace logTxt "queue exhausted", peer=buddy.peer,
ctx=buddy.fetchCtx(env),
isPartQueue=(fetchFn==storageQueueFetchPartial)
break
when extraTraceMessages:
trace logTxt "fetch full", peer, ctx=buddy.fetchCtx(env),
nStorageQuFull=env.fetchStorageFull.len, nReq=req.len,
nPartial, nComplete
if await buddy.storeStoragesSingleBatch(req, env):
for w in req:
env.parkedStorage.excl w.accKey # Done with these items
# Process list, store in database. The `reqList` is re-queued accordingly
# in the `fetchStorageSlotsImpl()` function unless there is an error. In
# the error case, the whole argument list `reqList` is left untouched.
rc = await buddy.fetchStorageSlotsImpl(reqList, env)
if rc.isOk:
for w in rc.value:
ignored.incl w # Ignoring bogus response items
else:
delayed &= req
env.storageQueueAppend delayed
# Ditto for partial queue
delayed.setLen(0)
while buddy.ctrl.running:
# Pull out the next request item from the queue
let rc = env.storageQueueFetchPartial()
if rc.isErr:
break
# Push back unprocessed jobs after error
env.storageQueueAppendPartialSplit reqList
when extraTraceMessages:
let
subRange {.used.} = rc.value.subRange.get
account {.used.} = rc.value.accKey
trace logTxt "fetch partial", peer, ctx=buddy.fetchCtx(env),
nStorageQuPart=env.fetchStoragePart.len, subRange, account
trace logTxt "processed", peer=buddy.peer, ctx=buddy.fetchCtx(env),
isPartQueue=(fetchFn==storageQueueFetchPartial),
nReqList=reqList.len,
nIgnored=ignored.len,
subRange0=reqList[0].subRange.get(otherwise=FullNodeTagRange),
account0=reqList[0].accKey,
rc=(if rc.isOk: rc.value.len else: -1)
# End `while`
# End `for`
if await buddy.storeStoragesSingleBatch(@[rc.value], env):
env.parkedStorage.excl rc.value.accKey # Done with this item
else:
delayed.add rc.value
env.storageQueueAppend delayed
when extraTraceMessages:
trace logTxt "done", peer, ctx=buddy.fetchCtx(env)
when extraTraceMessages:
trace logTxt "done", peer=buddy.peer, ctx=buddy.fetchCtx(env)
# ------------------------------------------------------------------------------
# End

View File

@ -8,19 +8,46 @@
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
{.push raises: [].}
import
std/sets,
chronicles,
eth/[common, p2p],
stew/[interval_set, keyed_queue],
../../../sync_desc,
"../.."/[constants, range_desc, worker_desc],
../db/[hexary_inspect, snapdb_storage_slots]
{.push raises: [].}
logScope:
topics = "snap-slots"
type
StoQuSlotsKVP* = KeyedQueuePair[Hash256,SnapSlotsQueueItemRef]
## Key-value return code from `SnapSlotsQueue` handler
StoQuPartialSlotsQueue = object
## Return type for `getOrMakePartial()`
stoQu: SnapSlotsQueueItemRef
isCompleted: bool
const
extraTraceMessages = false # or true
## Enabled additional logging noise
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
template logTxt(info: static[string]): static[string] =
"Storage queue " & info
proc `$`(rs: NodeTagRangeSet): string =
rs.fullPC3
proc `$`(tr: SnapTodoRanges): string =
tr.fullPC3
template noExceptionOops(info: static[string]; code: untyped) =
try:
code
@ -32,23 +59,151 @@ template noExceptionOops(info: static[string]; code: untyped) =
# Private functions
# ------------------------------------------------------------------------------
proc getOrMakePartial(
env: SnapPivotRef;
stoRoot: Hash256;
accKey: NodeKey;
): (SnapSlotsQueueItemRef, bool) =
## Create record on `fetchStoragePart` or return existing one
let rc = env.fetchStoragePart.lruFetch stoRoot
if rc.isOk:
result = (rc.value, true) # Value exists
else:
result = (SnapSlotsQueueItemRef(accKey: accKey), false) # New value
env.parkedStorage.excl accKey # Un-park
discard env.fetchStoragePart.append(stoRoot, result[0])
proc updatePartial(
env: SnapPivotRef; # Current pivot environment
req: AccountSlotsChanged; # Left over account data
): bool = # List entry was added
## Update the range of account argument `req` to the partial slot ranges
## queue.
##
## The function returns `true` if a new list entry was added.
let
accKey = req.account.accKey
stoRoot = req.account.storageRoot
noFullEntry = env.fetchStorageFull.delete(stoRoot).isErr
iv = req.account.subRange.get(otherwise = FullNodeTagRange)
jv = req.newRange.get(otherwise = FullNodeTagRange)
(slots, newEntry, newPartEntry) = block:
let rc = env.fetchStoragePart.lruFetch stoRoot
if rc.isOk:
(rc.value.slots, false, false)
else:
# New entry
let
stoSlo = SnapRangeBatchRef(processed: NodeTagRangeSet.init())
stoItem = SnapSlotsQueueItemRef(accKey: accKey, slots: stoSlo)
discard env.fetchStoragePart.append(stoRoot, stoItem)
stoSlo.unprocessed.init(clear = true)
if result[0].slots.isNil:
result[0].slots = SnapRangeBatchRef(processed: NodeTagRangeSet.init())
result[0].slots.unprocessed.init()
# Initialise ranges
var newItem = false
if iv == FullNodeTagRange:
# New record (probably was a full range, before)
stoSlo.unprocessed.mergeSplit FullNodeTagRange
newItem = noFullEntry
else:
# Restore `processed` range, `iv` was the left over.
discard stoSlo.processed.merge FullNodeTagRange
discard stoSlo.processed.reduce iv
(stoSlo, newItem, true)
# Remove delta state relative to original state
if iv != jv:
# Calculate `iv - jv`
let ivSet = NodeTagRangeSet.init()
discard ivSet.merge iv # Previous range
discard ivSet.reduce jv # Left over range
# Update `processed` by delta range
for w in ivSet.increasing:
discard slots.processed.merge w
# Update left over
slots.unprocessed.merge jv # Left over range
when extraTraceMessages:
trace logTxt "updated partially", accKey, iv, jv,
processed=slots.processed, unprocessed=slots.unprocessed,
noFullEntry, newEntry, newPartEntry
env.parkedStorage.excl accKey # Un-park (if any)
newEntry
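To make the `iv`/`jv` bookkeeping in `updatePartial()` concrete (hypothetical numbers, not part of the patch): if `iv` is what was originally requested and `jv` is what is still outstanding, then `iv` minus `jv` is merged into `processed` while `jv` goes back to `unprocessed`.

let
  iv = NodeTagRange.new(100.u256.NodeTag, 200.u256.NodeTag)  # originally requested
  jv = NodeTagRange.new(150.u256.NodeTag, 200.u256.NodeTag)  # still left over
  delta = NodeTagRangeSet.init()
discard delta.merge iv       # previous range ...
discard delta.reduce jv      # ... minus the left over part
doAssert delta.total == 50.u256   # [100,149], the part that counts as processed
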
proc appendPartial(
env: SnapPivotRef; # Current pivot environment
acc: AccountSlotsHeader; # Left over account data
splitMerge: bool; # Bisect or straight merge
): bool = # List entry was added
## Append to partial queue. The argument range of `acc` is split so that
## the next request of this range will result in the right most half size
## of this very range.
##
## The function returns `true` if a new list entry was added.
let
accKey = acc.accKey
stoRoot = acc.storageRoot
notFull = env.fetchStorageFull.delete(stoRoot).isErr
iv = acc.subRange.get(otherwise = FullNodeTagRange)
rc = env.fetchStoragePart.lruFetch acc.storageRoot
(slots,newEntry) = block:
if rc.isOk:
(rc.value.slots, false)
else:
# Restore missing range
let
stoSlo = SnapRangeBatchRef(processed: NodeTagRangeSet.init())
stoItem = SnapSlotsQueueItemRef(accKey: accKey, slots: stoSlo)
discard env.fetchStoragePart.append(stoRoot, stoItem)
stoSlo.unprocessed.init(clear = true)
discard stoSlo.processed.merge FullNodeTagRange
discard stoSlo.processed.reduce iv
(stoSlo, notFull)
if splitMerge:
slots.unprocessed.mergeSplit iv
else:
slots.unprocessed.merge iv
when extraTraceMessages:
trace logTxt "merged partial", splitMerge, accKey, iv,
processed=slots.processed, unprocessed=slots.unprocessed, newEntry
env.parkedStorage.excl accKey # Un-park (if any)
newEntry
proc reducePartial(
env: SnapPivotRef; # Current pivot environment
acc: AccountSlotsHeader; # Left over account data
): bool = # List entry was removed
## Reduce range from partial ranges list.
##
## The function returns `true` if a list entry was removed.
# So `iv` was not the full range in which case all of `iv` was fully
# processed and there is nothing left.
let
accKey = acc.accKey
stoRoot = acc.storageRoot
notFull = env.fetchStorageFull.delete(stoRoot).isErr
iv = acc.subRange.get(otherwise = FullNodeTagRange)
rc = env.fetchStoragePart.lruFetch stoRoot
var entryRemoved = false
if rc.isErr:
# This was the last missing range anyway. So there is no need to
# re-insert this entry.
entryRemoved = true # Virtually deleted
when extraTraceMessages:
trace logTxt "reduced partial, discarded", accKey, iv, entryRemoved
else:
let slots = rc.value.slots
discard slots.processed.merge iv
if slots.processed.isFull:
env.fetchStoragePart.del stoRoot
entryRemoved = true # List entry was removed
when extraTraceMessages:
trace logTxt "reduced partial, deleted", accKey, iv, entryRemoved
else:
slots.unprocessed.reduce iv
when extraTraceMessages:
trace logTxt "reduced partial, completed", accKey, iv,
processed=slots.processed, unprocessed=slots.unprocessed,
entryRemoved
env.parkedStorage.excl accKey # Un-park (if any)
entryRemoved
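# Summary of the three outcomes of `reducePartial()` above (descriptive):
#   * no queue entry found       -> nothing to re-insert, counts as removed
#   * `processed` becomes full   -> the entry is deleted from `fetchStoragePart`
#   * otherwise                  -> `iv` moves from `unprocessed` to `processed`
#                                   and the entry stays on the queue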
# ------------------------------------------------------------------------------
# Public helpers
@ -66,130 +221,150 @@ proc storageQueueAppendFull*(
env: SnapPivotRef;
stoRoot: Hash256;
accKey: NodeKey;
) =
## Append item to `fetchStorageFull` queue
env.fetchStoragePart.del stoRoot # Not a partial item anymore (if any)
env.parkedStorage.excl accKey # Un-park
discard env.fetchStorageFull.append(
stoRoot, SnapSlotsQueueItemRef(accKey: accKey))
): bool
{.discardable.} =
## Append item to `fetchStorageFull` queue. This undoes the effect of the
## function `storageQueueFetchFull()`. The function returns `true` if
## a new entry was added.
let
notPart = env.fetchStoragePart.delete(stoRoot).isErr
stoItem = SnapSlotsQueueItemRef(accKey: accKey)
env.parkedStorage.excl accKey # Un-park (if any)
env.fetchStorageFull.append(stoRoot, stoItem) and notPart
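# Usage sketch (with a hypothetical `accItem` variable): when a peer stops
# serving the storage slots for an item previously taken out via
# `storageQueueFetchFull()`, the item can simply be pushed back, e.g.
#
#   env.storageQueueAppendFull(accItem.storageRoot, accItem.accKey)
#
# where the `{.discardable.}` pragma allows ignoring the `bool` result.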
proc storageQueueAppendFull*(
env: SnapPivotRef;
acc: AccountSlotsHeader;
) =
## variant of `storageQueueAppendFull()`
): bool
{.discardable.} =
## Variant of `storageQueueAppendFull()`
env.storageQueueAppendFull(acc.storageRoot, acc.accKey)
proc storageQueueAppendFull*(
env: SnapPivotRef;
kvp: SnapSlotsQueuePair;
proc storageQueueAppendPartialSplit*(
env: SnapPivotRef; # Current pivot environment
acc: AccountSlotsHeader; # Left over account data
): bool
{.discardable.} =
## Merge a slot range back into the partial queue. This undoes the effect
## of the function `storageQueueFetchPartial()`, with the additional feature
## that the argument range of `acc` is split. So the next range request for
## this account will cover only the right-most half of the range just
## inserted.
##
## The function returns `true` if a new entry was added.
env.appendPartial(acc, splitMerge=true)
proc storageQueueAppendPartialSplit*(
env: SnapPivotRef; # Current pivot environment
req: openArray[AccountSlotsHeader]; # List of entries to push back
) =
## variant of `storageQueueAppendFull()`
env.storageQueueAppendFull(kvp.key, kvp.data.accKey)
proc storageQueueAppendPartialBisect*(
env: SnapPivotRef;
acc: AccountSlotsHeader;
) =
## Append to partial queue so that the next fetch range is half the size of
## the current next range.
# Fetch/rotate queue item
let data = env.getOrMakePartial(acc.storageRoot, acc.accKey)[0]
# Derive unprocessed ranges => into lower priority queue
data.slots.unprocessed.clear()
discard data.slots.unprocessed[1].merge(low(NodeTag),high(NodeTag))
for iv in data.slots.processed.increasing:
discard data.slots.unprocessed[1].reduce iv # complements processed ranges
# Prioritise half of first unprocessed range
let rc = data.slots.unprocessed[1].ge()
if rc.isErr:
env.fetchStoragePart.del acc.storageRoot # Oops, nothing to do
return # Done
let halfTag = rc.value.minPt + ((rc.value.maxPt - rc.value.minPt) div 2)
data.slots.unprocessed.merge NodeTagRange.new(rc.value.minPt, halfTag)
## Variant of `storageQueueAppendPartialSplit()`
for w in req:
discard env.appendPartial(w, splitMerge=true)
proc storageQueueAppend*(
env: SnapPivotRef;
reqList: openArray[AccountSlotsHeader];
subRange = none(NodeTagRange); # For a partially fetched slot
env: SnapPivotRef; # Current pivot environment
req: openArray[AccountSlotsHeader]; # List of entries to push back
) =
for n,w in reqList:
env.parkedStorage.excl w.accKey # Un-park
# Only last item (when `n+1 == reqList.len`) may be registered partial
if w.subRange.isNone or n + 1 < reqList.len:
## Append a job list of ranges. This undoes the effect of either function
## `storageQueueFetchFull()` or `storageQueueFetchPartial()`.
for w in req:
let iv = w.subRange.get(otherwise = FullNodeTagRange)
if iv == FullNodeTagRange:
env.storageQueueAppendFull w
else:
env.fetchStorageFull.del w.storageRoot
discard env.appendPartial(w, splitMerge=false)
proc storageQueueAppend*(
env: SnapPivotRef; # Current pivot environment
kvp: StoQuSlotsKVP; # List of entries to push back
) =
## Insert back a full administrative queue record. This function is typically
## used after a record was unlinked via `storageQueueUnlinkPartialItem()`.
let accKey = kvp.data.accKey
env.parkedStorage.excl accKey # Un-park (if any)
if kvp.data.slots.isNil:
env.fetchStoragePart.del kvp.key # Sanitise data
discard env.fetchStorageFull.append(kvp.key, kvp.data)
when extraTraceMessages:
trace logTxt "re-queued full", accKey
else:
env.fetchStorageFull.del kvp.key # Sanitise data
let rc = env.fetchStoragePart.eq kvp.key
if rc.isErr:
discard env.fetchStoragePart.append(kvp.key, kvp.data)
when extraTraceMessages:
trace logTxt "re-queued partial",
processed=kvp.data.slots.processed,
unprocessed=kvp.data.slots.unprocessed, accKey
else:
# Merge `processed` ranges
for w in kvp.data.slots.processed.increasing:
discard rc.value.slots.processed.merge w
# Intersect `unprocessed` ranges
for w in kvp.data.slots.unprocessed.ivItems:
rc.value.slots.unprocessed.reduce w
when extraTraceMessages:
trace logTxt "re-merged partial",
processed=kvp.data.slots.processed,
unprocessed=kvp.data.slots.unprocessed, accKey
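# Note on the re-merge branch above (descriptive): if another entry for the
# same storage root appeared while this record was unlinked, the `processed`
# ranges of both copies are merged (set union) and the surviving entry's
# `unprocessed` set is reduced by the re-inserted copy's intervals.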
# ------------------------------------------------------------------------------
# Public functions, modify/update/remove queue items
# ------------------------------------------------------------------------------
proc storageQueueUpdate*(
env: SnapPivotRef; # Current pivot environment
req: openArray[AccountSlotsChanged]; # List of entries to push back
ignore: HashSet[NodeKey]; # Ignore accounts with these keys
): (int,int) = # Added, removed
## Similar to `storageQueueAppend()`, this function appends account header
## entries back onto the storage queues. Unlike `storageQueueAppend()`,
## this function is aware of changes after partial downloads from the network.
##
## The function returns the tuple `(added, removed)` reflecting the numbers
## of changed list items (accumulated for partial and full range lists.)
for w in req:
if w.account.accKey notin ignore:
let
(data, hasItem) = env.getOrMakePartial(w.storageRoot, w.accKey)
iv = w.subRange.unsafeGet
# Register partial range
if subRange.isSome:
# The `subRange` is the original request, `iv` the uncompleted part
let reqRange = subRange.unsafeGet
if not hasItem:
# Re-initialise book keeping
discard data.slots.processed.merge(low(NodeTag),high(NodeTag))
discard data.slots.processed.reduce reqRange
data.slots.unprocessed.clear()
# Calculate `reqRange - iv` which are the completed ranges
let temp = NodeTagRangeSet.init()
discard temp.merge reqRange
discard temp.reduce iv
# Update `processed` ranges by adding `reqRange - iv`
for w in temp.increasing:
discard data.slots.processed.merge w
# Update `unprocessed` ranges
data.slots.unprocessed.merge reqRange
data.slots.unprocessed.reduce iv
elif hasItem:
# Restore unfetched request
data.slots.unprocessed.merge iv
iv = w.account.subRange.get(otherwise = FullNodeTagRange)
jv = w.newRange.get(otherwise = FullNodeTagRange)
if jv != FullNodeTagRange:
# So `jv` is some rest after processing. Typically this entry is
# related to a partial range response message that came with a proof.
if env.updatePartial w:
result[0].inc
when extraTraceMessages:
trace logTxt "update/append partial", accKey=w.account.accKey,
iv, jv, nAdded=result[0], nRemoved=result[1]
elif jv == iv:
if env.storageQueueAppendFull w.account:
result[0].inc
#when extraTraceMessages:
# trace logTxt "update/append full", accKey=w.account.accKey,
# nAdded=result[0], nRemoved=result[1]
else:
# Makes no sense with a `leftOver` item
env.storageQueueAppendFull w
if env.reducePartial w.account:
result[1].inc
when extraTraceMessages:
trace logTxt "update/reduce partial", accKey=w.account.accKey,
iv, jv, nAdded=result[0], nRemoved=result[1]
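# Decision summary for the loop above (descriptive):
#   * jv != full range         -> updatePartial(): book `iv - jv` as processed,
#                                 keep `jv` as still to do
#   * jv == full and iv == jv  -> storageQueueAppendFull(): the account goes
#                                 back onto the full range queue
#   * otherwise (iv != full)   -> reducePartial(): the sub-range `iv` is booked
#                                 as processed, possibly deleting the entry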
# ------------------------------------------------------------------------------
# Public functions, make/create queue items
# ------------------------------------------------------------------------------
proc storageQueueGetOrMakePartial*(
env: SnapPivotRef;
stoRoot: Hash256;
accKey: NodeKey;
): SnapSlotsQueueItemRef =
## Create record on `fetchStoragePart` or return existing one
env.getOrMakePartial(stoRoot, accKey)[0]
proc storageQueueGetOrMakePartial*(
env: SnapPivotRef;
acc: AccountSlotsHeader;
): SnapSlotsQueueItemRef =
## Variant of `storageQueueGetOrMakePartial()`
env.getOrMakePartial(acc.storageRoot, acc.accKey)[0]
# ------------------------------------------------------------------------------
# Public functions, fetch and remove queue items
# Public functions, fetch/remove queue items
# ------------------------------------------------------------------------------
proc storageQueueFetchFull*(
ctx: SnapCtxRef; # Global context
env: SnapPivotRef; # Current pivot environment
): (seq[AccountSlotsHeader],int,int) =
ignore: HashSet[NodeKey]; # Ignore accounts with these keys
): seq[AccountSlotsHeader] =
## Fetch a list of at most `fetchRequestStorageSlotsMax` full work items
## from the batch queue.
##
@ -207,84 +382,118 @@ proc storageQueueFetchFull*(
## number of items moved to the partial queue is returned as third item of
## the return code tuple.
##
var
rcList: seq[AccountSlotsHeader]
nComplete = 0
nPartial = 0
noExceptionOops("getNextSlotItemsFull"):
for kvp in env.fetchStorageFull.nextPairs:
let
getFn = ctx.pool.snapDb.getStorageSlotsFn kvp.data.accKey
rootKey = kvp.key.to(NodeKey)
accItem = AccountSlotsHeader(
accKey: kvp.data.accKey,
storageRoot: kvp.key)
if kvp.data.accKey notin ignore:
let
getFn = ctx.pool.snapDb.getStorageSlotsFn kvp.data.accKey
rootKey = kvp.key.to(NodeKey)
accItem = AccountSlotsHeader(
accKey: kvp.data.accKey,
storageRoot: kvp.key)
# This item will either be returned, discarded, or moved to the partial
# queue subject for healing. So it will be removed from this queue.
env.fetchStorageFull.del kvp.key # OK to delete current link
# This item will eventually be returned, discarded, or moved to the
# partial queue (also subject for healing.) So it will be removed from
# the full range lists queue.
env.fetchStorageFull.del kvp.key # OK to delete current link
# Check whether the tree is fully empty
if rootKey.ByteArray32.getFn.len == 0:
# Collect for return
rcList.add accItem
env.parkedStorage.incl accItem.accKey # Registered as absent
# Check whether the database trie is empty. Otherwise the sub-trie is
# at least partially allocated.
if rootKey.ByteArray32.getFn.len == 0:
# Collect for return
result.add accItem
env.parkedStorage.incl accItem.accKey # Registered as absent
# Maximal number of items to fetch
if fetchRequestStorageSlotsMax <= rcList.len:
break
else:
# Check how much there is below the top level storage slots node. For
# a small storage trie, this check will be exhaustive.
let stats = getFn.hexaryInspectTrie(rootKey,
suspendAfter = storageSlotsTrieInheritPerusalMax,
maxDangling = 1)
if stats.dangling.len == 0 and stats.resumeCtx.isNil:
# This storage trie could be fully searched and there was no dangling
# node. So it is complete and can be fully removed from the batch.
nComplete.inc # Update for logging
# Maximal number of items to fetch
if fetchRequestStorageSlotsMax <= result.len:
break # stop here
else:
# This item becomes a partially available slot
#let data = env.storageQueueGetOrMakePartial accItem -- notused
nPartial.inc # Update for logging
# Check how much there is below the top level storage slots node. For
# a small storage trie, this check will be exhaustive.
let stats = getFn.hexaryInspectTrie(rootKey,
suspendAfter = storageSlotsTrieInheritPerusalMax,
maxDangling = 1)
(rcList, nComplete, nPartial)
if stats.dangling.len == 0 and stats.resumeCtx.isNil:
# This storage trie could be fully searched and there was no
# dangling node. So it is complete and can be considered done.
# It can be left removed from the batch queue.
env.nSlotLists.inc # Update for logging
else:
# This item must be treated as a partially available slot
env.storageQueueAppendPartialSplit accItem
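# Usage sketch (with a hypothetical `ignore` set collected by the caller from
# parked or otherwise unavailable accounts):
#
#   let work = ctx.storageQueueFetchFull(env, ignore)
#
# `work` holds at most `fetchRequestStorageSlotsMax` accounts whose storage
# tries are still completely missing from the database; anything partially
# present has been re-routed to the partial queue or counted as done.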
proc storageQueueFetchPartial*(
env: SnapPivotRef;
): Result[AccountSlotsHeader,void] =
ctx: SnapCtxRef; # Global context (unused here)
env: SnapPivotRef; # Current pivot environment
ignore: HashSet[NodeKey]; # Ignore accounts with these keys
): seq[AccountSlotsHeader] = # At most one item
## Get a work item from the batch queue. This will typically return the full
## work item and remove it from the queue unless the partially completed
## range is fragmented.
block findItem:
for kvp in env.fetchStoragePart.nextPairs:
# Extract range and return single item request queue
let rc = kvp.data.slots.unprocessed.fetch(maxLen = high(UInt256))
for kvp in env.fetchStoragePart.nextPairs:
# Extract range and return single item request queue
let
slots = kvp.data.slots
accKey = kvp.data.accKey
accepted = accKey notin ignore
if accepted:
let rc = slots.unprocessed.fetch()
if rc.isOk:
result = ok(AccountSlotsHeader(
accKey: kvp.data.accKey,
let reqItem = AccountSlotsHeader(
accKey: accKey,
storageRoot: kvp.key,
subRange: some rc.value))
subRange: some rc.value)
# Delete from batch queue if the `unprocessed` range set becomes empty
# and the `processed` set is the complement of `rc.value`.
if kvp.data.slots.unprocessed.isEmpty and
high(UInt256) - rc.value.len <= kvp.data.slots.processed.total:
env.fetchStoragePart.del kvp.key
env.parkedStorage.incl kvp.data.accKey # Temporarily parked
return
# Delete from batch queue if the `unprocessed` range has become empty.
if slots.unprocessed.isEmpty and
high(UInt256) - rc.value.len <= slots.processed.total:
# If this is all the rest, the record can be deleted from the todo
# list. If not fully downloaded at a later stage, a new record will
# be created on-the-fly.
env.parkedStorage.incl accKey # Temporarily parked
env.fetchStoragePart.del kvp.key # Last one not needed
else:
# Otherwise rotate queue
break findItem
# End for()
# Otherwise accept and update/rotate queue. Note that `lruFetch`
# does leave the item on the queue.
discard env.fetchStoragePart.lruFetch reqItem.storageRoot
return err()
when extraTraceMessages:
trace logTxt "fetched partial",
processed=slots.processed, unprocessed=slots.unprocessed,
accKey, iv=rc.value
return @[reqItem] # done
# Rotate queue item
discard env.fetchStoragePart.lruFetch result.value.storageRoot
when extraTraceMessages:
trace logTxt "rejected partial", accepted,
processed=slots.processed, unprocessed=slots.unprocessed, accKey
# End for()
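# Note on the deletion criterion above (descriptive, assuming the modulo 2^256
# encoding of interval lengths where a full interval has `len == 0`): the
# entry is dropped only when nothing is left in `unprocessed` and `processed`
# together with the interval just handed out spans essentially the whole
# `NodeTag` range, so completing this request finishes the account.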
proc storageQueueUnlinkPartialItem*(
env: SnapPivotRef; # Current pivot environment
ignore: HashSet[NodeKey]; # Ignore accounts with these keys
): Result[StoQuSlotsKVP,void] =
## Fetch an item from the partial list. This item will be removed from the
## list and can be re-queued via `storageQueueAppend()`.
for kvp in env.fetchStoragePart.nextPairs:
# Extract range and return single item request queue
let
accKey = kvp.data.accKey
accepted = accKey notin ignore
if accepted:
env.parkedStorage.incl accKey # Temporarily parked
env.fetchStoragePart.del kvp.key # Last one not needed
when extraTraceMessages:
trace logTxt "unlink partial item", processed=kvp.data.slots.processed,
unprocessed=kvp.data.slots.unprocessed, accKey
return ok(kvp) # done
when extraTraceMessages:
trace logTxt "unlink partial skip", accepted,
processed=kvp.data.slots.processed,
unprocessed=kvp.data.slots.unprocessed, accKey
# End for()
# ------------------------------------------------------------------------------
# End

View File

@ -72,10 +72,10 @@ proc `$`(node: NodeSpecs): string =
node.partialPath.toHex
proc `$`(rs: NodeTagRangeSet): string =
rs.fullFactor.toPC(3)
rs.fullPC3
proc `$`(iv: NodeTagRange): string =
iv.fullFactor.toPC(3)
iv.fullPC3
proc toPC(w: openArray[NodeSpecs]; n: static[int] = 3): string =
let sumUp = w.mapIt(it.hexaryEnvelope.len).foldl(a+b, 0.u256)

View File

@ -32,9 +32,6 @@ type
## there is only a partial list of slots to fetch, the queue entry is
## stored left-most for easy access.
SnapSlotsQueuePair* = KeyedQueuePair[Hash256,SnapSlotsQueueItemRef]
## Key-value return code from `SnapSlotsQueue` handler
SnapSlotsQueueItemRef* = ref object
## Storage slots request data. This entry is similar to `AccountSlotsHeader`
## where the optional `subRange` interval has been replaced by an interval
@ -71,7 +68,7 @@ type
nSlotLists*: uint64 ## Imported # of account storage tries
# Mothballing, ready to be swapped into newer pivot record
storageAccounts*: SnapAccountsList ## Accounts with missing stortage slots
storageAccounts*: SnapAccountsList ## Accounts with missing storage slots
archived*: bool ## Not latest pivot, anymore
SnapPivotTable* = KeyedQueue[Hash256,SnapPivotRef]
@ -142,7 +139,7 @@ proc pivotAccountsCoverage100PcRollOver*(ctx: SnapCtxRef) =
# Public helpers: SnapTodoRanges
# ------------------------------------------------------------------------------
proc init*(q: var SnapTodoRanges) =
proc init*(q: var SnapTodoRanges; clear = false) =
## Populate node range sets with maximal range in the first range set. This
## kind of pair or interval sets is managed as follows:
## * As long as possible, fetch and merge back intervals on the first set.
@ -152,7 +149,8 @@ proc init*(q: var SnapTodoRanges) =
## is considered after the prioritised intervals are exhausted.
q[0] = NodeTagRangeSet.init()
q[1] = NodeTagRangeSet.init()
discard q[0].merge(low(NodeTag),high(NodeTag))
if not clear:
discard q[0].merge FullNodeTagRange
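# Usage sketch: `init(clear = true)` leaves both range sets empty, which is
# what e.g. `updatePartial()`/`appendPartial()` in `storage_queue_helper.nim`
# rely on when rebuilding `unprocessed` from a known `processed` set, as in
# (hypothetical `batch` and `iv` variables)
#
#   batch.unprocessed.init(clear = true)
#   discard batch.processed.merge FullNodeTagRange
#   discard batch.processed.reduce iv    # `iv` is the part still to be fetched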
proc clear*(q: var SnapTodoRanges) =
## Reset argument range sets empty.
@ -167,8 +165,12 @@ proc merge*(q: var SnapTodoRanges; iv: NodeTagRange) =
proc mergeSplit*(q: var SnapTodoRanges; iv: NodeTagRange) =
## Ditto w/priorities partially reversed
if 1 < iv.len:
if iv.len == 1:
discard q[0].reduce iv
discard q[1].merge iv
else:
let
# note that (`iv.len` == 0) => (`iv` == `FullNodeTagRange`)
midPt = iv.minPt + ((iv.maxPt - iv.minPt) shr 1)
iv1 = NodeTagRange.new(iv.minPt, midPt)
iv2 = NodeTagRange.new(midPt + 1.u256, iv.maxPt)
@ -176,9 +178,6 @@ proc mergeSplit*(q: var SnapTodoRanges; iv: NodeTagRange) =
discard q[1].merge iv1
discard q[0].merge iv2
discard q[1].reduce iv2
else:
discard q[0].reduce iv
discard q[1].merge iv
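# Worked example for `mergeSplit` (hypothetical bounds, interval notation):
#   iv    = [0x1000, 0x1fff]
#   midPt = 0x1000 + ((0x1fff - 0x1000) shr 1) = 0x17ff
#   iv1   = [0x1000, 0x17ff]  -> ends up only in q[1] (lower priority)
#   iv2   = [0x1800, 0x1fff]  -> ends up only in q[0] (higher priority)
# so the next `fetch()` serves the right-most half of `iv` first.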
proc reduce*(q: var SnapTodoRanges; iv: NodeTagRange) =
@ -194,8 +193,9 @@ iterator ivItems*(q: var SnapTodoRanges): NodeTagRange =
yield iv
proc fetch*(q: var SnapTodoRanges; maxLen: UInt256): Result[NodeTagRange,void] =
## Fetch interval from node ranges with maximal size `maxLen`
proc fetch*(q: var SnapTodoRanges; maxLen = 0.u256): Result[NodeTagRange,void] =
## Fetch interval from node ranges with maximal size `maxLen`, where
## `0.u256` is interpreted as `2^256`.
# Swap batch queues if the first one is empty
if q[0].isEmpty:
@ -207,9 +207,17 @@ proc fetch*(q: var SnapTodoRanges; maxLen: UInt256): Result[NodeTagRange,void] =
return err()
let
val = rc.value
iv = if 0 < val.len and val.len <= maxLen: val # val.len==0 => 2^256
else: NodeTagRange.new(val.minPt, val.minPt + (maxLen - 1.u256))
jv = rc.value
iv = block:
if maxLen == 0 or (0 < jv.len and jv.len <= maxLen):
jv
else:
# Note that either:
# (`jv.len` == 0) => (`jv` == `FullNodeTagRange`) => `jv.minPt` == 0
# or
# (`maxLen` < `jv.len`) => (`jv.minPt`+`maxLen` <= `jv.maxPt`)
NodeTagRange.new(jv.minPt, jv.minPt + maxLen)
discard q[0].reduce(iv)
ok(iv)
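# Usage sketch for the updated `fetch()` (hypothetical values):
#   * `q.fetch()` or `q.fetch(maxLen = 0.u256)` returns the first interval
#     found, whatever its size (0 is read as 2^256, i.e. no limit),
#   * `q.fetch(maxLen = someLimit)` clips a longer interval so that it starts
#     at the interval's lower bound, leaving the remainder queued for later.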

View File

@ -13,19 +13,19 @@
##
## Public descriptors
{.push raises: [].}
import
#std/options,
eth/[common, p2p],
../core/chain,
../db/db_chain,
./handlers
./handlers/eth
export
chain,
db_chain
{.push raises: [].}
type
BuddyRunState* = enum
Running = 0 ## Running, default state
@ -121,6 +121,13 @@ proc `stopped=`*(ctrl: BuddyCtrlRef; value: bool) =
else:
discard
proc `forceRun=`*(ctrl: BuddyCtrlRef; value: bool) =
## Setter, gets out of the `Zombie` jail/locked state with a `true` argument.
if value:
ctrl.runState = Running
else:
ctrl.stopped = true
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------