Prettify logging for snap sync environment (#1278)

* Multiple storage batches at a time

why:
  Previously only a small portion of the storage slots queue was
  processed at a time, so the peer might have gone away by the time
  the work item was resumed later.

* Renamed a field of the snap/1 protocol response object

why:
  The field documented as `slots` is in reality a per-account list of
  slot lists, so the new name `slotLists` better reflects the nature of
  the beast.
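
  For illustration, a minimal Nim sketch of the renamed shape. The types
  below are simplified stand-ins, not the real wire codec:

type
  SnapStorage = object                  # stand-in for the wire type
    slotHash: array[32, byte]           # keccak of the slot key
    slotData: seq[byte]                 # rlp-encoded slot value

  SnapStorageRanges = object
    # one inner list per requested account, hence `slotLists`
    slotLists: seq[seq[SnapStorage]]
    proof: seq[seq[byte]]

when isMainModule:
  var rsp: SnapStorageRanges
  rsp.slotLists.add @[SnapStorage()]        # slots for account #0
  rsp.slotLists.add newSeq[SnapStorage]()   # account #1 answered empty
  echo "accounts answered: ", rsp.slotLists.len   # -> 2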

* Some minor healing re-arrangements for storage slot tries

why:
  Resolving all completely inherited slot tries first in sync mode
  keeps the worker queues smaller, which improves logging.

* Prettify logging, update comments, etc.
Jordan Hrycaj 2022-10-21 20:29:42 +01:00 committed by GitHub
parent 425b3ae322
commit 82ceec313d
11 changed files with 255 additions and 145 deletions


@ -312,7 +312,7 @@ p2pProtocol snap1(version = 1,
# User message 0x03: StorageRanges.
# Note: See comments in this file for a list of Geth quirks to expect.
proc storageRanges(peer: Peer, slots: openArray[seq[SnapStorage]],
proc storageRanges(peer: Peer, slotLists: openArray[seq[SnapStorage]],
proof: SnapStorageProof)
# User message 0x04: GetByteCodes.


@ -200,7 +200,7 @@ proc tickerUpdate*(ctx: SnapCtxRef): TickerStatsUpdater =
uSum += fill
uSqSum += fill * fill
let sLen = kvp.data.nStorage.float
let sLen = kvp.data.nSlotLists.float
sSum += sLen
sSqSum += sLen * sLen
@ -225,7 +225,7 @@ proc tickerUpdate*(ctx: SnapCtxRef): TickerStatsUpdater =
pivotBlock: pivotBlock,
nQueues: ctx.data.pivotTable.len,
nAccounts: meanStdDev(aSum, aSqSum, count),
nStorage: meanStdDev(sSum, sSqSum, count),
nSlotLists: meanStdDev(sSum, sSqSum, count),
accountsFill: (accFill[0], accFill[1], accCoverage),
storageQueue: meanStdDev(wSum, wSqSum, count))
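
The sums accumulated above feed a mean/standard-deviation reduction. A
self-contained sketch, assuming the conventional sum and sum-of-squares
formula for the `meanStdDev` helper (the real one lives elsewhere in the
sync sources):

import std/math

proc meanStdDev(sum, sqSum: float; length: int): (float, float) =
  # stand-in: mean and standard deviation from running sums
  if 0 < length:
    result[0] = sum / length.float
    result[1] = sqrt(sqSum / length.float - result[0] * result[0])

when isMainModule:
  var sSum, sSqSum = 0.0
  for sLen in [3.0, 5.0, 4.0]:        # sample nSlotLists values
    sSum += sLen
    sSqSum += sLen * sLen
  echo meanStdDev(sSum, sSqSum, 3)    # -> (4.0, 0.8164...)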
@ -342,6 +342,11 @@ proc runPool*(buddy: SnapBuddyRef, last: bool) =
if env.fetchStorage.len == 0:
env.serialSync = true
if extraTraceMessages:
trace "Checked for pivot DB completeness",
nAccounts=env.nAccounts, accountsDone=env.accountsDone,
nSlotLists=env.nSlotLists, storageDone=env.serialSync
proc runMulti*(buddy: SnapBuddyRef) {.async.} =
## This peer worker is invoked if the `buddy.ctrl.multiOk` flag is set


@ -29,7 +29,7 @@ type
# slotData*: Blob
#
# SnapStorageRanges* = object
# slots*: seq[seq[SnapStorage]]
# slotLists*: seq[seq[SnapStorage]]
# proof*: SnapStorageProof
GetStorageRanges* = object
@ -111,16 +111,16 @@ proc getStorageRanges*(
trace trSnapRecvTimeoutWaiting & "for reply to GetStorageRanges", peer,
nAccounts
return err(ComResponseTimeout)
if nAccounts < rc.value.get.slots.len:
if nAccounts < rc.value.get.slotLists.len:
# Ooops, makes no sense
return err(ComTooManyStorageSlots)
rc.value.get
let
nSlots = snStoRanges.slots.len
nSlotLists = snStoRanges.slotLists.len
nProof = snStoRanges.proof.len
if nSlots == 0:
if nSlotLists == 0:
# github.com/ethereum/devp2p/blob/master/caps/snap.md#getstorageranges-0x02:
#
# Notes:
@ -130,7 +130,7 @@ proc getStorageRanges*(
# the responsibility of the caller to query a state not older than 128
# blocks; and the caller is expected to only ever query existing accounts.
trace trSnapRecvReceived & "empty StorageRanges", peer,
nAccounts, nSlots, nProof, stateRoot, firstAccount=accounts[0].accHash
nAccounts, nSlotLists, nProof, stateRoot, firstAccount=accounts[0].accHash
return err(ComNoStorageForAccounts)
# Assemble return structure for given peer response
@ -138,21 +138,22 @@ proc getStorageRanges*(
# Filter remaining `slots` responses:
# * Accounts for empty ones go back to the `leftOver` list.
for n in 0 ..< nSlots:
for n in 0 ..< nSlotLists:
# Empty data for a slot indicates missing data
if snStoRanges.slots[n].len == 0:
if snStoRanges.slotLists[n].len == 0:
dd.leftOver.add accounts[n]
else:
dd.data.storages.add AccountSlots(
account: accounts[n], # known to be no fewer accounts than slots
data: snStoRanges.slots[n])
data: snStoRanges.slotLists[n])
# Complete the part that was not answered by the peer
if nProof == 0:
dd.leftOver = dd.leftOver & accounts[nSlots ..< nAccounts] # empty slice ok
# assigning empty slice is ok
dd.leftOver = dd.leftOver & accounts[nSlotLists ..< nAccounts]
# Ok, we have a proof now. What was it to be proved?
elif snStoRanges.slots[^1].len == 0:
elif snStoRanges.slotLists[^1].len == 0:
return err(ComNoDataForProof) # No way to prove an empty node set
else:
@ -165,12 +166,13 @@ proc getStorageRanges*(
if respTop < reqTop:
dd.leftOver.add AccountSlotsHeader(
subRange: some(NodeTagRange.new(respTop + 1.u256, reqTop)),
accHash: accounts[nSlots-1].accHash,
storageRoot: accounts[nSlots-1].storageRoot)
dd.leftOver = dd.leftOver & accounts[nSlots ..< nAccounts] # empty slice ok
accHash: accounts[nSlotLists-1].accHash,
storageRoot: accounts[nSlotLists-1].storageRoot)
# assigning empty slice is ok
dd.leftOver = dd.leftOver & accounts[nSlotLists ..< nAccounts]
trace trSnapRecvReceived & "StorageRanges", peer, nAccounts, nSlots, nProof,
nLeftOver=dd.leftOver.len, stateRoot
trace trSnapRecvReceived & "StorageRanges", peer, nAccounts, nSlotLists,
nProof, nLeftOver=dd.leftOver.len, stateRoot
return ok(dd)
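
Schematically, the left-over bookkeeping above splits the request three
ways: answered with data, answered empty (re-queued), and not answered
at all (re-queued via a possibly empty tail slice). A toy sketch with
hypothetical `Account` and `splitResponse` names:

type
  Account = object
    accHash: int                      # stand-in for the real hash type

proc splitResponse(accounts: seq[Account];
                   slotLists: seq[seq[int]]):
                     tuple[data, leftOver: seq[Account]] =
  let nSlotLists = slotLists.len
  for n in 0 ..< nSlotLists:
    if slotLists[n].len == 0:
      result.leftOver.add accounts[n]     # answered, but empty
    else:
      result.data.add accounts[n]         # usable slot data
  # tail of the request the peer did not answer; empty slice is ok
  result.leftOver = result.leftOver & accounts[nSlotLists ..< accounts.len]

when isMainModule:
  let accs = @[Account(accHash: 1), Account(accHash: 2), Account(accHash: 3)]
  let (data, leftOver) = splitResponse(accs, @[@[42], newSeq[int]()])
  echo data.len, " with data, ", leftOver.len, " left over"   # 1 and 2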


@ -332,22 +332,21 @@ proc inspectAccountsTrie*(
stats = ps.hexaDb.hexaryInspectTrie(ps.root, pathList)
block checkForError:
let error = block:
if stats.stopped:
TrieLoopAlert
elif stats.level == 0:
TrieIsEmpty
else:
break checkForError
trace "Inspect account trie failed", peer, nPathList=pathList.len,
nDangling=stats.dangling.len, stoppedAt=stats.level, error
var error = TrieIsEmpty
if stats.stopped:
error = TrieLoopAlert
trace "Inspect account trie failed", peer, nPathList=pathList.len,
nDangling=stats.dangling.len, stoppedAt=stats.level, error
elif 0 < stats.level:
break checkForError
if ignoreError:
return ok(stats)
return err(error)
when extraTraceMessages:
trace "Inspect account trie ok", peer, nPathList=pathList.len,
nDangling=stats.dangling.len, level=stats.level
#when extraTraceMessages:
# trace "Inspect account trie ok", peer, nPathList=pathList.len,
# nDangling=stats.dangling.len, level=stats.level
return ok(stats)
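
The rewritten error handling relies on Nim's labelled-block idiom:
`break checkForError` jumps past the error path once the stats turn out
to be fine. A stand-alone sketch with a stand-in error enum:

type
  TrieError = enum                 # stand-in subset of the real enum
    NothingSerious, TrieIsEmpty, TrieLoopAlert

proc classify(stopped: bool; level: int): TrieError =
  block checkForError:
    var error = TrieIsEmpty
    if stopped:
      error = TrieLoopAlert
    elif 0 < level:
      break checkForError          # all good, skip the error path
    return error
  NothingSerious

when isMainModule:
  assert classify(stopped = false, level = 3) == NothingSerious
  assert classify(stopped = true,  level = 3) == TrieLoopAlert
  assert classify(stopped = false, level = 0) == TrieIsEmpty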
proc inspectAccountsTrie*(


@ -95,28 +95,31 @@ proc persistentStorageSlots(
proc collectStorageSlots(
peer: Peer;
slots: seq[SnapStorage];
slotLists: seq[SnapStorage];
): Result[seq[RLeafSpecs],HexaryDbError]
{.gcsafe, raises: [Defect, RlpError].} =
## Similar to `collectAccounts()`
var rcSlots: seq[RLeafSpecs]
if slots.len != 0:
if slotLists.len != 0:
# Add initial account
rcSlots.add RLeafSpecs(
pathTag: slots[0].slotHash.to(NodeTag),
payload: slots[0].slotData)
pathTag: slotLists[0].slotHash.to(NodeTag),
payload: slotLists[0].slotData)
# Verify & add other accounts
for n in 1 ..< slots.len:
let nodeTag = slots[n].slotHash.to(NodeTag)
for n in 1 ..< slotLists.len:
let nodeTag = slotLists[n].slotHash.to(NodeTag)
if nodeTag <= rcSlots[^1].pathTag:
let error = SlotsNotSrictlyIncreasing
trace "collectStorageSlots()", peer, item=n, slots=slots.len, error
trace "collectStorageSlots()", peer, item=n,
nSlotLists=slotLists.len, error
return err(error)
rcSlots.add RLeafSpecs(pathTag: nodeTag, payload: slots[n].slotData)
rcSlots.add RLeafSpecs(
pathTag: nodeTag,
payload: slotLists[n].slotData)
ok(rcSlots)
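
The ordering check above rejects the whole list unless slot hashes
arrive strictly increasing. A reduced sketch with stand-in types:

type
  RLeafSpecs = object
    pathTag: uint64                # stand-in for the real `NodeTag`
    payload: seq[byte]

proc collect(slotList: seq[(uint64, seq[byte])]): (bool, seq[RLeafSpecs]) =
  var rcSlots: seq[RLeafSpecs]
  for n, w in slotList:
    if 0 < n and w[0] <= rcSlots[^1].pathTag:
      # `SlotsNotSrictlyIncreasing` in the original
      return (false, newSeq[RLeafSpecs]())
    rcSlots.add RLeafSpecs(pathTag: w[0], payload: w[1])
  (true, rcSlots)

when isMainModule:
  assert collect(@[(1'u64, @[1'u8]), (2'u64, @[2'u8])])[0]
  assert not collect(@[(2'u64, @[1'u8]), (2'u64, @[2'u8])])[0]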
@ -206,52 +209,52 @@ proc importStorageSlots*(
nItems = data.storages.len
sTop = nItems - 1
var
slot: Option[int]
itemInx: Option[int]
if 0 <= sTop:
try:
for n in 0 ..< sTop:
# These ones never come with proof data
slot = some(n)
itemInx = some(n)
let rc = ps.importStorageSlots(data.storages[n], @[])
if rc.isErr:
result.add HexaryNodeReport(slot: slot, error: rc.error)
trace "Storage slots item fails", peer, inx=n, nItems,
slots=data.storages[n].data.len, proofs=0,
result.add HexaryNodeReport(slot: itemInx, error: rc.error)
trace "Storage slots item fails", peer, itemInx=n, nItems,
nSlots=data.storages[n].data.len, proofs=0,
error=rc.error, nErrors=result.len
# Final one might come with proof data
block:
slot = some(sTop)
itemInx = some(sTop)
let rc = ps.importStorageSlots(data.storages[sTop], data.proof)
if rc.isErr:
result.add HexaryNodeReport(slot: slot, error: rc.error)
trace "Storage slots last item fails", peer, inx=sTop, nItems,
slots=data.storages[sTop].data.len, proofs=data.proof.len,
result.add HexaryNodeReport(slot: itemInx, error: rc.error)
trace "Storage slots last item fails", peer, itemInx=sTop, nItems,
nSlots=data.storages[sTop].data.len, proofs=data.proof.len,
error=rc.error, nErrors=result.len
# Store to disk
if persistent and 0 < ps.hexaDb.tab.len:
slot = none(int)
itemInx = none(int)
let rc = ps.hexaDb.persistentStorageSlots(ps)
if rc.isErr:
result.add HexaryNodeReport(slot: slot, error: rc.error)
result.add HexaryNodeReport(slot: itemInx, error: rc.error)
except RlpError:
result.add HexaryNodeReport(slot: slot, error: RlpEncoding)
trace "Storage slot node error", peer, slot, nItems,
slots=data.storages[sTop].data.len, proofs=data.proof.len,
result.add HexaryNodeReport(slot: itemInx, error: RlpEncoding)
trace "Storage slot node error", peer, itemInx, nItems,
nSlots=data.storages[sTop].data.len, proofs=data.proof.len,
error=RlpEncoding, nErrors=result.len
except KeyError as e:
raiseAssert "Not possible @ importStorages: " & e.msg
except OSError as e:
result.add HexaryNodeReport(slot: slot, error: OSErrorException)
trace "Import storage slots exception", peer, slot, nItems,
result.add HexaryNodeReport(slot: itemInx, error: OSErrorException)
trace "Import storage slots exception", peer, itemInx, nItems,
name=($e.name), msg=e.msg, nErrors=result.len
when extraTraceMessages:
if result.len == 0:
trace "Storage slots imported", peer, nItems,
slots=data.storages.len, proofs=data.proof.len
nSlotLists=data.storages.len, proofs=data.proof.len
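
The `slot` to `itemInx` rename is about failure reporting: an
`Option[int]` tags which work item a report refers to, while `none(int)`
marks the final bulk-store phase where no single item applies. A minimal
sketch with a stand-in report type:

import std/options

type
  NodeReport = object              # stand-in for `HexaryNodeReport`
    slot: Option[int]
    error: string

proc importAll(items: seq[bool]): seq[NodeReport] =
  var itemInx: Option[int]
  for n, ok in items:
    itemInx = some(n)              # tag reports with the item index
    if not ok:
      result.add NodeReport(slot: itemInx, error: "item failed")
  itemInx = none(int)              # bulk store phase, no single item
  # a persistent-store failure would be reported with `slot: itemInx`

when isMainModule:
  let report = importAll(@[true, false, true])
  assert report.len == 1 and report[0].slot.get == 1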
proc importStorageSlots*(
pv: SnapDbRef; ## Base descriptor on `BaseChainDB`
@ -363,22 +366,21 @@ proc inspectStorageSlotsTrie*(
stats = ps.hexaDb.hexaryInspectTrie(ps.root, pathList)
block checkForError:
let error = block:
if stats.stopped:
TrieLoopAlert
elif stats.level == 0:
TrieIsEmpty
else:
break checkForError
trace "Inspect storage slots trie failed", peer, nPathList=pathList.len,
nDangling=stats.dangling.len, stoppedAt=stats.level, error
var error = TrieIsEmpty
if stats.stopped:
error = TrieLoopAlert
trace "Inspect storage slots trie failed", peer, nPathList=pathList.len,
nDangling=stats.dangling.len, stoppedAt=stats.level
elif 0 < stats.level:
break checkForError
if ignoreError:
return ok(stats)
return err(error)
when extraTraceMessages:
trace "Inspect storage slots trie ok", peer, nPathList=pathList.len,
nDangling=stats.dangling.len, level=stats.level
#when extraTraceMessages:
# trace "Inspect storage slots trie ok", peer, nPathList=pathList.len,
# nDangling=stats.dangling.len, level=stats.level
return ok(stats)
proc inspectStorageSlotsTrie*(


@ -133,12 +133,12 @@ proc healingCtx(buddy: SnapBuddyRef): string =
let
ctx = buddy.ctx
env = buddy.data.pivotEnv
"[" &
"{" &
"nAccounts=" & $env.nAccounts & "," &
("covered=" & env.fetchAccounts.unprocessed.emptyFactor.toPC(0) & "/" &
ctx.data.coveredAccounts.fullFactor.toPC(0)) & "," &
"nCheckNodes=" & $env.fetchAccounts.checkNodes.len & "," &
"nMissingNodes=" & $env.fetchAccounts.missingNodes.len & "]"
"nMissingNodes=" & $env.fetchAccounts.missingNodes.len & "}"
# ------------------------------------------------------------------------------
# Private functions


@ -45,15 +45,40 @@ proc healingCtx(
): string =
let
slots = kvp.data.slots
"[" &
"{" &
"covered=" & slots.unprocessed.emptyFactor.toPC(0) &
"nCheckNodes=" & $slots.checkNodes.len & "," &
"nMissingNodes=" & $slots.missingNodes.len & "]"
"nMissingNodes=" & $slots.missingNodes.len & "}"
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
proc acceptWorkItemAsIs(
buddy: SnapBuddyRef;
kvp: SnapSlotsQueuePair;
): Result[bool, HexaryDbError] =
## Check whether this work item is done and the corresponding storage trie
## can be completely inherited.
if kvp.data.inherit:
let
ctx = buddy.ctx
peer = buddy.peer
db = ctx.data.snapDb
accHash = kvp.data.accHash
storageRoot = kvp.key.to(Hash256)
rc = db.inspectStorageSlotsTrie(peer, accHash, storageRoot)
# Check whether the hexary trie is complete
if rc.isOk:
return ok(rc.value.dangling.len == 0)
return err(rc.error)
ok(false)
proc updateMissingNodesList(
buddy: SnapBuddyRef;
kvp: SnapSlotsQueuePair) =
@ -64,6 +89,7 @@ proc updateMissingNodesList(
ctx = buddy.ctx
db = ctx.data.snapDb
peer = buddy.peer
env = buddy.data.pivotEnv
accHash = kvp.data.accHash
storageRoot = kvp.key.to(Hash256)
slots = kvp.data.slots
@ -71,7 +97,8 @@ proc updateMissingNodesList(
nodes: seq[Blob]
when extraTraceMessages:
trace "Start storage slots healing", peer, ctx=buddy.healingCtx(kvp)
trace "Start storage slots healing", peer, ctx=buddy.healingCtx(kvp),
nSlotLists=env.nSlotLists, nStorageQueue=env.fetchStorage.len
for slotKey in slots.missingNodes:
let rc = db.getStorageSlotsNodeKey(peer, accHash, storageRoot, slotKey)
@ -96,6 +123,7 @@ proc appendMoreDanglingNodesToMissingNodesList(
ctx = buddy.ctx
db = ctx.data.snapDb
peer = buddy.peer
env = buddy.data.pivotEnv
accHash = kvp.data.accHash
storageRoot = kvp.key.to(Hash256)
slots = kvp.data.slots
@ -106,7 +134,8 @@ proc appendMoreDanglingNodesToMissingNodesList(
if rc.isErr:
when extraTraceMessages:
error "Storage slots healing failed => stop", peer,
ctx=buddy.healingCtx(kvp), error=rc.error
ctx=buddy.healingCtx(kvp), nSlotLists=env.nSlotLists,
nStorageQueue=env.fetchStorage.len, error=rc.error
# Attempt to switch peers, there is not much else we can do here
buddy.ctrl.zombie = true
return false
@ -128,6 +157,7 @@ proc getMissingNodesFromNetwork(
let
ctx = buddy.ctx
peer = buddy.peer
env = buddy.data.pivotEnv
accHash = kvp.data.accHash
storageRoot = kvp.key.to(Hash256)
slots = kvp.data.slots
@ -159,12 +189,14 @@ proc getMissingNodesFromNetwork(
discard
when extraTraceMessages:
trace "Error fetching storage slots nodes for healing => stop", peer,
ctx=buddy.healingCtx(kvp), error
ctx=buddy.healingCtx(kvp), nSlotLists=env.nSlotLists,
nStorageQueue=env.fetchStorage.len, error
else:
discard
when extraTraceMessages:
trace "Error fetching storage slots nodes for healing", peer,
ctx=buddy.healingCtx(kvp), error
ctx=buddy.healingCtx(kvp), nSlotLists=env.nSlotLists,
nStorageQueue=env.fetchStorage.len, error
return @[]
@ -179,6 +211,7 @@ proc kvStorageSlotsLeaf(
## Read leaf node from persistent database (if any)
let
peer = buddy.peer
env = buddy.data.pivotEnv
nodeRlp = rlpFromBytes node
(_,prefix) = hexPrefixDecode partialPath
@ -189,7 +222,8 @@ proc kvStorageSlotsLeaf(
when extraTraceMessages:
trace "Isolated node path for healing => ignored", peer,
ctx=buddy.healingCtx(kvp)
ctx=buddy.healingCtx(kvp), nSlotLists=env.nSlotLists,
nStorageQueue=env.fetchStorage.len
proc registerStorageSlotsLeaf(
@ -217,8 +251,9 @@ proc registerStorageSlotsLeaf(
discard ivSet.reduce(pt,pt)
when extraTraceMessages:
trace "Isolated storage slot for healing",
peer, ctx=buddy.healingCtx(kvp), slotKey=pt
trace "Isolated storage slot for healing", peer,
ctx=buddy.healingCtx(kvp), nSlotLists=env.nSlotLists,
nStorageQueue=env.fetchStorage.len, slotKey=pt
# ------------------------------------------------------------------------------
# Private functions: do the healing for one work item (sub-trie)
@ -235,6 +270,7 @@ proc storageSlotsHealing(
ctx = buddy.ctx
db = ctx.data.snapDb
peer = buddy.peer
env = buddy.data.pivotEnv
accHash = kvp.data.accHash
slots = kvp.data.slots
@ -261,13 +297,16 @@ proc storageSlotsHealing(
if 0 < report.len and report[^1].slot.isNone:
# Storage error, just run the next lap (not much else that can be done)
error "Storage slots healing, error updating persistent database", peer,
ctx=buddy.healingCtx(kvp), nNodes=nodesData.len, error=report[^1].error
ctx=buddy.healingCtx(kvp), nSlotLists=env.nSlotLists,
nStorageQueue=env.fetchStorage.len, nNodes=nodesData.len,
error=report[^1].error
slots.missingNodes = slots.missingNodes & nodesData
return false
when extraTraceMessages:
trace "Storage slots healing, nodes merged into database", peer,
ctx=buddy.healingCtx(kvp), nNodes=nodesData.len
ctx=buddy.healingCtx(kvp), nSlotLists=env.nSlotLists,
nStorageQueue=env.fetchStorage.len, nNodes=nodesData.len
# Filter out error and leaf nodes
for w in report:
@ -295,7 +334,9 @@ proc storageSlotsHealing(
slots.checkNodes.add nodePath
when extraTraceMessages:
trace "Storage slots healing job done", peer, ctx=buddy.healingCtx(kvp)
trace "Storage slots healing job done", peer,
ctx=buddy.healingCtx(kvp), nSlotLists=env.nSlotLists,
nStorageQueue=env.fetchStorage.len
proc healingIsComplete(
@ -312,6 +353,7 @@ proc healingIsComplete(
ctx = buddy.ctx
db = ctx.data.snapDb
peer = buddy.peer
env = buddy.data.pivotEnv
accHash = kvp.data.accHash
storageRoot = kvp.key.to(Hash256)
@ -321,7 +363,9 @@ proc healingIsComplete(
if rc.isErr:
# Oops, not much we can do here (looping trie?)
error "Problem inspecting storage trie", peer, storageRoot, error=rc.error
error "Problem inspecting storage trie", peer,
nSlotLists=env.nSlotLists, nStorageQueue=env.fetchStorage.len,
storageRoot, error=rc.error
return false
# Check whether the hexary trie can be inherited as-is.
@ -355,6 +399,7 @@ proc healStoragesDb*(buddy: SnapBuddyRef) {.async.} =
env = buddy.data.pivotEnv
var
toBeHealed: seq[SnapSlotsQueuePair]
nAcceptedAsIs = 0
# Search the current slot item batch list for items to complete via healing
for kvp in env.fetchStorage.nextPairs:
@ -369,31 +414,52 @@ proc healStoragesDb*(buddy: SnapBuddyRef) {.async.} =
if slots.isNil or slots.unprocessed.emptyFactor < healSlorageSlotsTrigger:
continue
# Remove `kvp` work item from the queue object (which is allowed within a
# `for` loop over a `KeyedQueue` object type.)
env.fetchStorage.del(kvp.key)
# With some luck, the `kvp` work item refers to a complete storage trie
# that can be accepted as-is, in which case `kvp` can just be dropped.
block:
let rc = buddy.acceptWorkItemAsIs(kvp)
if rc.isOk and rc.value:
env.nSlotLists.inc
nAcceptedAsIs.inc # for logging
continue # dropping `kvp`
# Add to local batch to be processed, below
env.fetchStorage.del(kvp.key) # ok to delete this item from batch queue
toBeHealed.add kvp # to be held in local queue
toBeHealed.add kvp
if maxStoragesHeal <= toBeHealed.len:
break
when extraTraceMessages:
let nToBeHealed = toBeHealed.len
if 0 < nToBeHealed:
trace "Processing storage healing items", peer, nToBeHealed
# Run against local batch
for n in 0 ..< toBeHealed.len:
let
kvp = toBeHealed[n]
isComplete = await buddy.healingIsComplete(kvp)
if isComplete:
env.nStorage.inc
else:
env.fetchStorage.merge kvp
let nHealerQueue = toBeHealed.len
if 0 < nHealerQueue:
when extraTraceMessages:
trace "Processing storage healing items", peer,
nSlotLists=env.nSlotLists, nStorageQueue=env.fetchStorage.len,
nHealerQueue, nAcceptedAsIs
if buddy.ctrl.stopped:
# Oops, peer has gone
env.fetchStorage.merge toBeHealed[n+1 ..< toBeHealed.len]
break
for n in 0 ..< toBeHealed.len:
let
kvp = toBeHealed[n]
isComplete = await buddy.healingIsComplete(kvp)
if isComplete:
env.nSlotLists.inc
nAcceptedAsIs.inc
else:
env.fetchStorage.merge kvp
if buddy.ctrl.stopped:
# Oops, peer has gone
env.fetchStorage.merge toBeHealed[n+1 ..< toBeHealed.len]
break
when extraTraceMessages:
if 0 < nHealerQueue or 0 < nAcceptedAsIs:
trace "Done storage healing items", peer,
nSlotLists=env.nSlotLists, nStorageQueue=env.fetchStorage.len,
nHealerQueue, nAcceptedAsIs
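
The local `toBeHealed` batch follows a drain-and-re-merge pattern: pull
a bounded batch off the shared queue, process it, and push the
unprocessed tail back if the peer goes away mid-batch. A schematic
sketch with illustrative names, not the real `KeyedQueue` API:

proc processBatch(queue: var seq[int]; maxBatch: int;
                  peerAlive: proc(item: int): bool): seq[int] =
  var batch: seq[int]
  while 0 < queue.len and batch.len < maxBatch:
    batch.add queue[0]             # drain into the local batch
    queue.delete 0
  for n in 0 ..< batch.len:
    if not peerAlive(batch[n]):
      queue.add batch[n ..< batch.len]   # re-merge the unprocessed tail
      break
    result.add batch[n]            # processed (healed) items

when isMainModule:
  var q = @[1, 2, 3, 4, 5]
  let healed = processBatch(q, 3, proc(it: int): bool = it < 3)
  echo healed, " healed; ", q, " back on the queue"
  # -> @[1, 2] healed; @[4, 5, 3] back on the queue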
# ------------------------------------------------------------------------------
# End


@ -106,8 +106,8 @@ proc getNextSlotItems(buddy: SnapBuddyRef): seq[AccountSlotsHeader] =
when extraTraceMessages:
trace "Prepare fetching partial storage slots", peer,
nStorageQueue=env.fetchStorage.len, subRange=rc.value,
account=reqData.accHash.to(NodeTag)
nSlotLists=env.nSlotLists, nStorageQueue=env.fetchStorage.len,
nToProcess=1, subRange=rc.value, account=reqData.accHash.to(NodeTag)
return @[AccountSlotsHeader(
accHash: reqData.accHash,
@ -118,8 +118,8 @@ proc getNextSlotItems(buddy: SnapBuddyRef): seq[AccountSlotsHeader] =
reqData.slots = nil
discard env.fetchStorage.lruFetch(reqKey)
# So there are no partial ranges (aka `slots`) anymore. Assemble maximal
# request queue.
# So there are no partial slot ranges anymore. Assemble maximal request queue.
var nInherit = 0
for kvp in env.fetchStorage.nextPairs:
let it = AccountSlotsHeader(
accHash: kvp.data.accHash,
@ -129,9 +129,7 @@ proc getNextSlotItems(buddy: SnapBuddyRef): seq[AccountSlotsHeader] =
if kvp.data.inherit or
ctx.data.snapDb.haveStorageSlotsData(peer, it.accHash, it.storageRoot):
kvp.data.inherit = true
when extraTraceMessages:
trace "Inheriting storage slots", peer,
nStorageQueue=env.fetchStorage.len, account=it.accHash.to(NodeTag)
nInherit.inc # update for logging
continue
result.add it
@ -141,11 +139,12 @@ proc getNextSlotItems(buddy: SnapBuddyRef): seq[AccountSlotsHeader] =
if maxStoragesFetch <= result.len:
break
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
when extraTraceMessages:
trace "Fetch account storage slots", peer, nSlotLists=env.nSlotLists,
nStorageQueue=env.fetchStorage.len, nToProcess=result.len, nInherit
proc storeStorages*(buddy: SnapBuddyRef) {.async.} =
proc storeStoragesSingleBatch(buddy: SnapBuddyRef) {.async.} =
## Fetch account storage slots and store them in the database.
let
ctx = buddy.ctx
@ -172,21 +171,22 @@ proc storeStorages*(buddy: SnapBuddyRef) {.async.} =
if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors):
discard
trace "Error fetching storage slots => stop", peer,
nReq=req.len, nStorageQueue=env.fetchStorage.len, error
nSlotLists=env.nSlotLists, nReq=req.len,
nStorageQueue=env.fetchStorage.len, error
return
rc.value
# Reset error counts for detecting repeated timeouts
buddy.data.errors.nTimeouts = 0
var gotStorage = stoRange.data.storages.len
var gotSlotLists = stoRange.data.storages.len
when extraTraceMessages:
trace "Fetched storage slots", peer, gotStorage,
nLeftOvers=stoRange.leftOver.len, nReq=req.len,
nStorageQueue=env.fetchStorage.len
#when extraTraceMessages:
# trace "Fetched storage slots", peer,
# nSlotLists=env.nSlotLists, gotSlotLists, nReq=req.len,
# nStorageQueue=env.fetchStorage.len, nLeftOvers=stoRange.leftOver.len
if 0 < gotStorage:
if 0 < gotSlotLists:
# Verify/process storages data and save it to disk
let report = ctx.data.snapDb.importStorageSlots(peer, stoRange.data)
@ -196,11 +196,11 @@ proc storeStorages*(buddy: SnapBuddyRef) {.async.} =
if report[^1].slot.isNone:
# Failed to store on database, not much that can be done here
env.fetchStorage.merge req
gotStorage.dec(report.len - 1) # for logging only
gotSlotLists.dec(report.len - 1) # for logging only
error "Error writing storage slots to database", peer, gotStorage,
nReq=req.len, nStorageQueue=env.fetchStorage.len,
error=report[^1].error
error "Error writing storage slots to database", peer,
nSlotLists=env.nSlotLists, gotSlotLists, nReq=req.len,
nStorageQueue=env.fetchStorage.len, error=report[^1].error
return
# Push back error entries to be processed later
@ -224,28 +224,52 @@ proc storeStorages*(buddy: SnapBuddyRef) {.async.} =
# No partial result processing anymore to consider
stoRange.data.proof = @[]
# Update local statistics counter for `nStorage` counter update
gotStorage.dec
# Update local statistics counter for `nSlotLists` counter update
gotSlotLists.dec
trace "Error processing storage slots", peer, gotStorage,
nReqInx=inx, nReq=req.len, nStorageQueue=env.fetchStorage.len,
error=report[inx].error
trace "Error processing storage slots", peer, nSlotLists=env.nSlotLists,
gotSlotLists, nReqInx=inx, nReq=req.len,
nStorageQueue=env.fetchStorage.len, error=report[inx].error
# Update statistics
if gotStorage == 1 and
if gotSlotLists == 1 and
req[0].subRange.isSome and
env.fetchStorage.hasKey req[0].storageRoot.to(NodeKey):
# Successful partial request, but not completely done yet.
gotStorage = 0
gotSlotLists = 0
env.nStorage.inc(gotStorage)
env.nSlotLists.inc(gotSlotLists)
# Return unprocessed left overs to batch queue
env.fetchStorage.merge stoRange.leftOver
when extraTraceMessages:
trace "Done fetching storage slots", peer, gotStorage,
nStorageQueue=env.fetchStorage.len
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc storeStorages*(buddy: SnapBuddyRef) {.async.} =
## Fetch account storage slots and store them in the database.
let
env = buddy.data.pivotEnv
peer = buddy.peer
if 0 < env.fetchStorage.len:
# Run only as many batches as needed to clean up the queue.
var loopCount = 1 + (env.fetchStorage.len - 1) div maxStoragesFetch
when extraTraceMessages:
trace "Start fetching storage slots", peer, nSlotLists=env.nSlotLists,
nStorageQueue=env.fetchStorage.len, loopCount
while 0 < loopCount and
0 < env.fetchStorage.len and
not buddy.ctrl.stopped:
loopCount.dec
await buddy.storeStoragesSingleBatch()
when extraTraceMessages:
trace "Done fetching storage slots", peer, nSlotLists=env.nSlotLists,
nStorageQueue=env.fetchStorage.len, loopCount
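
The loop bound is a ceiling division: a queue of `fetchStorage.len`
items needs at most `1 + (len - 1) div maxStoragesFetch` batches. A
quick check (the constant's value here is made up):

const maxStoragesFetch = 128       # illustrative value only

func batchesNeeded(queueLen: int): int =
  1 + (queueLen - 1) div maxStoragesFetch

when isMainModule:
  assert batchesNeeded(1) == 1
  assert batchesNeeded(maxStoragesFetch) == 1
  assert batchesNeeded(maxStoragesFetch + 1) == 2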
# ------------------------------------------------------------------------------
# End


@ -28,7 +28,7 @@ type
pivotBlock*: Option[BlockNumber]
nAccounts*: (float,float) ## mean and standard deviation
accountsFill*: (float,float,float) ## mean, standard deviation, merged total
nStorage*: (float,float) ## mean and standard deviation
nSlotLists*: (float,float) ## mean and standard deviation
storageQueue*: (float,float) ## mean and standard deviation
nQueues*: int
@ -126,8 +126,10 @@ proc runLogTicker(t: TickerRef) {.gcsafe.} =
noFmtError("runLogTicker"):
if data.pivotBlock.isSome:
pivot = &"#{data.pivotBlock.get}/{data.nQueues}"
nAcc = &"{(data.nAccounts[0]+0.5).int64}({(data.nAccounts[1]+0.5).int64})"
nSto = &"{(data.nStorage[0]+0.5).int64}({(data.nStorage[1]+0.5).int64})"
nAcc = (&"{(data.nAccounts[0]+0.5).int64}" &
&"({(data.nAccounts[1]+0.5).int64})")
nSto = (&"{(data.nSlotLists[0]+0.5).int64}" &
&"({(data.nSlotLists[1]+0.5).int64})")
info "Snap sync statistics",
tick, buddies, pivot, nAcc, accCov, nSto, stoQue, mem
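
The `(x + 0.5).int64` idiom rounds the float statistics to the nearest
integer before printing them as "mean(stddev)". A small sketch of the
same formatting:

import std/strformat

proc fmtStat(stat: (float, float)): string =
  &"{(stat[0]+0.5).int64}({(stat[1]+0.5).int64})"

when isMainModule:
  echo fmtStat((1234.7, 12.2))     # -> 1235(12)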


@ -137,7 +137,7 @@ type
# Info
nAccounts*: uint64 ## Imported # of accounts
nStorage*: uint64 ## Imported # of account storage tries
nSlotLists*: uint64 ## Imported # of account storage tries
SnapPivotTable* = ##\
## LRU table, indexed by state root


@ -40,26 +40,32 @@
##
## The argument `last` is set `true` if the last entry is reached.
##
## Note that this function does not run in `async` mode.
## Note:
## + This function does not run in `async` mode.
## + The flag `buddy.ctx.poolMode` has priority over the flag
## `buddy.ctrl.multiOk` which controls `runSingle()` and `runMulti()`.
##
##
## *runSingle(buddy: BuddyRef[S,W]) {.async.}*
## This worker peer method is invoked if the peer-local flag
## `buddy.ctrl.multiOk` is set `false` which is the default mode. This flag
## is updated by the worker peer when deemed appropriate.
## * For all workers, there can be only one `runSingle()` function active
## + For all workers, there can be only one `runSingle()` function active
## simultaneously for all worker peers.
## * There will be no `runMulti()` function active for the same worker peer
## + There will be no `runMulti()` function active for the same worker peer
## simultaneously
## * There will be no `runPool()` iterator active simultaneously.
## + There will be no `runPool()` iterator active simultaneously.
##
## Note that this function runs in `async` mode.
##
##
## *runMulti(buddy: BuddyRef[S,W]) {.async.}*
## This worker peer method is invoked if the `buddy.ctrl.multiOk` flag is
## set `true` which is typically done after finishing `runSingle()`. This
## instance can be simultaneously active for all worker peers.
##
## Note that this function runs in `async` mode.
##
##
## Additional import files needed when using this template:
## * eth/[common, p2p]
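
A schematic dispatcher matching the contract described above. This is
illustrative only; the real scheduler applies these rules internally,
and `poolMode`/`multiOk` are plain fields here rather than the real
`buddy.ctx`/`buddy.ctrl` accessors:

type
  Buddy = object
    poolMode, multiOk: bool

proc dispatch(b: Buddy): string =
  if b.poolMode: "runPool"         # pool mode has priority
  elif b.multiOk: "runMulti"
  else: "runSingle"

when isMainModule:
  assert dispatch(Buddy(poolMode: true, multiOk: true)) == "runPool"
  assert dispatch(Buddy(multiOk: true)) == "runMulti"
  assert dispatch(Buddy()) == "runSingle"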
@ -78,6 +84,10 @@ import
{.push raises: [Defect].}
static:
# type `EthWireRef` is needed in `initSync()`
type silenceUnusedhandlerComplaint = EthWireRef # dummy directive
type
ActiveBuddies[S,W] = ##\
## List of active workers, using `Hash(Peer)` rather than `Peer`
@ -274,7 +284,7 @@ proc initSync*[S,W](
dsc.buddies.init(dsc.ctx.buddiesMax)
proc startSync*[S,W](dsc: RunnerSyncRef[S,W]): bool =
## Set up syncing. This call should come early.
## Set up `PeerObserver` handlers and start syncing.
mixin runSetup
# Initialise sub-systems
if dsc.ctx.runSetup(dsc.tickerOk):
@ -291,10 +301,10 @@ proc startSync*[S,W](dsc: RunnerSyncRef[S,W]): bool =
return true
proc stopSync*[S,W](dsc: RunnerSyncRef[S,W]) =
## Stop syncing
## Stop syncing and free peer handlers.
mixin runRelease
dsc.pool.delObserver(dsc)
dsc.ctx.runRelease()
dsc.pool.delObserver(dsc)
# ------------------------------------------------------------------------------
# End