From 82ceec313db87f00747e43bd4925bde701a4ccda Mon Sep 17 00:00:00 2001
From: Jordan Hrycaj
Date: Fri, 21 Oct 2022 20:29:42 +0100
Subject: [PATCH] Prettify logging for snap sync environment (#1278)

* Multiple storage batches at a time

  why:
    Previously only a small portion was processed at a time, so the peer
    might have gone away by the time processing was resumed later.

* Renamed a field of the snap/1 protocol response object

  why:
    What is documented as `slots` is in reality a per-account list of slot
    lists, so the new name `slotLists` better reflects the nature of the
    beast.

* Some minor healing re-arrangements for storage slot tries

  why:
    Resolving all complete inherited slot tries first in sync mode keeps the
    worker queues smaller, which improves logging.

* Prettify logging, update comments, etc.
---
 nimbus/sync/protocol/snap1.nim | 2 +-
 nimbus/sync/snap/worker.nim | 9 +-
 .../snap/worker/com/get_storage_ranges.nim | 32 +++--
 .../sync/snap/worker/db/snapdb_accounts.nim | 23 ++--
 .../snap/worker/db/snapdb_storage_slots.nim | 76 +++++-----
 nimbus/sync/snap/worker/heal_accounts.nim | 4 +-
 nimbus/sync/snap/worker/heal_storages.nim | 130 +++++++++++++-----
 nimbus/sync/snap/worker/store_storages.nim | 90 +++++++-----
 nimbus/sync/snap/worker/ticker.nim | 8 +-
 nimbus/sync/snap/worker_desc.nim | 2 +-
 nimbus/sync/sync_sched.nim | 24 +++-
 11 files changed, 255 insertions(+), 145 deletions(-)

diff --git a/nimbus/sync/protocol/snap1.nim b/nimbus/sync/protocol/snap1.nim
index c9178cabc..8a7d502ec 100644
--- a/nimbus/sync/protocol/snap1.nim
+++ b/nimbus/sync/protocol/snap1.nim
@@ -312,7 +312,7 @@ p2pProtocol snap1(version = 1,
 
   # User message 0x03: StorageRanges.
   # Note: See comments in this file for a list of Geth quirks to expect.
-  proc storageRanges(peer: Peer, slots: openArray[seq[SnapStorage]],
+  proc storageRanges(peer: Peer, slotLists: openArray[seq[SnapStorage]],
                      proof: SnapStorageProof)
 
   # User message 0x04: GetByteCodes.
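Illustration of the `slots` -> `slotLists` rename above (not part of the patch): the reply carries one storage slot list per requested account, i.e. a list of lists. The Nim sketch below mirrors the commented-out type reference in get_storage_ranges.nim further down; the `Blob`, `Hash256` and `SnapStorageProof` definitions used here are simplified stand-ins for illustration, not the real eth types.

type
  Blob = seq[byte]                    # simplified stand-in for eth's Blob
  Hash256 = array[32, byte]           # simplified stand-in for eth's Hash256
  SnapStorageProof = seq[Blob]        # proof nodes, assumed layout

  SnapStorage = object
    slotHash: Hash256                 # hashed storage slot key
    slotData: Blob                    # RLP encoded slot value

  SnapStorageRanges = object
    slotLists: seq[seq[SnapStorage]]  # one slot list per requested account
    proof: SnapStorageProof           # proof for the last, possibly partial list

when isMainModule:
  # slotLists[n] belongs to the n-th requested account, so a reply may have
  # fewer inner lists than accounts asked for, but never more.
  var reply: SnapStorageRanges
  reply.slotLists.add @[SnapStorage()]
  doAssert reply.slotLists.len == 1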
diff --git a/nimbus/sync/snap/worker.nim b/nimbus/sync/snap/worker.nim index 4092198a5..49bf2d5cf 100644 --- a/nimbus/sync/snap/worker.nim +++ b/nimbus/sync/snap/worker.nim @@ -200,7 +200,7 @@ proc tickerUpdate*(ctx: SnapCtxRef): TickerStatsUpdater = uSum += fill uSqSum += fill * fill - let sLen = kvp.data.nStorage.float + let sLen = kvp.data.nSlotLists.float sSum += sLen sSqSum += sLen * sLen @@ -225,7 +225,7 @@ proc tickerUpdate*(ctx: SnapCtxRef): TickerStatsUpdater = pivotBlock: pivotBlock, nQueues: ctx.data.pivotTable.len, nAccounts: meanStdDev(aSum, aSqSum, count), - nStorage: meanStdDev(sSum, sSqSum, count), + nSlotLists: meanStdDev(sSum, sSqSum, count), accountsFill: (accFill[0], accFill[1], accCoverage), storageQueue: meanStdDev(wSum, wSqSum, count)) @@ -342,6 +342,11 @@ proc runPool*(buddy: SnapBuddyRef, last: bool) = if env.fetchStorage.len == 0: env.serialSync = true + if extraTraceMessages: + trace "Checked for pivot DB completeness", + nAccounts=env.nAccounts, accountsDone=env.accountsDone, + nSlotLists=env.nSlotLists, storageDone=env.serialSync + proc runMulti*(buddy: SnapBuddyRef) {.async.} = ## This peer worker is invoked if the `buddy.ctrl.multiOk` flag is set diff --git a/nimbus/sync/snap/worker/com/get_storage_ranges.nim b/nimbus/sync/snap/worker/com/get_storage_ranges.nim index d5aa8d0b5..3c455c6f8 100644 --- a/nimbus/sync/snap/worker/com/get_storage_ranges.nim +++ b/nimbus/sync/snap/worker/com/get_storage_ranges.nim @@ -29,7 +29,7 @@ type # slotData*: Blob # # SnapStorageRanges* = object - # slots*: seq[seq[SnapStorage]] + # slotLists*: seq[seq[SnapStorage]] # proof*: SnapStorageProof GetStorageRanges* = object @@ -111,16 +111,16 @@ proc getStorageRanges*( trace trSnapRecvTimeoutWaiting & "for reply to GetStorageRanges", peer, nAccounts return err(ComResponseTimeout) - if nAccounts < rc.value.get.slots.len: + if nAccounts < rc.value.get.slotLists.len: # Ooops, makes no sense return err(ComTooManyStorageSlots) rc.value.get let - nSlots = snStoRanges.slots.len + nSlotLists = snStoRanges.slotLists.len nProof = snStoRanges.proof.len - if nSlots == 0: + if nSlotLists == 0: # github.com/ethereum/devp2p/blob/master/caps/snap.md#getstorageranges-0x02: # # Notes: @@ -130,7 +130,7 @@ proc getStorageRanges*( # the responsibility of the caller to query an state not older than 128 # blocks; and the caller is expected to only ever query existing accounts. trace trSnapRecvReceived & "empty StorageRanges", peer, - nAccounts, nSlots, nProof, stateRoot, firstAccount=accounts[0].accHash + nAccounts, nSlotLists, nProof, stateRoot, firstAccount=accounts[0].accHash return err(ComNoStorageForAccounts) # Assemble return structure for given peer response @@ -138,21 +138,22 @@ proc getStorageRanges*( # Filter remaining `slots` responses: # * Accounts for empty ones go back to the `leftOver` list. - for n in 0 ..< nSlots: + for n in 0 ..< nSlotLists: # Empty data for a slot indicates missing data - if snStoRanges.slots[n].len == 0: + if snStoRanges.slotLists[n].len == 0: dd.leftOver.add accounts[n] else: dd.data.storages.add AccountSlots( account: accounts[n], # known to be no fewer accounts than slots - data: snStoRanges.slots[n]) + data: snStoRanges.slotLists[n]) # Complete the part that was not answered by the peer if nProof == 0: - dd.leftOver = dd.leftOver & accounts[nSlots ..< nAccounts] # empty slice ok + # assigning empty slice is ok + dd.leftOver = dd.leftOver & accounts[nSlotLists ..< nAccounts] # Ok, we have a proof now. What was it to be proved? 
- elif snStoRanges.slots[^1].len == 0: + elif snStoRanges.slotLists[^1].len == 0: return err(ComNoDataForProof) # Now way to prove an empty node set else: @@ -165,12 +166,13 @@ proc getStorageRanges*( if respTop < reqTop: dd.leftOver.add AccountSlotsHeader( subRange: some(NodeTagRange.new(respTop + 1.u256, reqTop)), - accHash: accounts[nSlots-1].accHash, - storageRoot: accounts[nSlots-1].storageRoot) - dd.leftOver = dd.leftOver & accounts[nSlots ..< nAccounts] # empty slice ok + accHash: accounts[nSlotLists-1].accHash, + storageRoot: accounts[nSlotLists-1].storageRoot) + # assigning empty slice isa ok + dd.leftOver = dd.leftOver & accounts[nSlotLists ..< nAccounts] - trace trSnapRecvReceived & "StorageRanges", peer, nAccounts, nSlots, nProof, - nLeftOver=dd.leftOver.len, stateRoot + trace trSnapRecvReceived & "StorageRanges", peer, nAccounts, nSlotLists, + nProof, nLeftOver=dd.leftOver.len, stateRoot return ok(dd) diff --git a/nimbus/sync/snap/worker/db/snapdb_accounts.nim b/nimbus/sync/snap/worker/db/snapdb_accounts.nim index 1644f2216..f7d622966 100644 --- a/nimbus/sync/snap/worker/db/snapdb_accounts.nim +++ b/nimbus/sync/snap/worker/db/snapdb_accounts.nim @@ -332,22 +332,21 @@ proc inspectAccountsTrie*( stats = ps.hexaDb.hexaryInspectTrie(ps.root, pathList) block checkForError: - let error = block: - if stats.stopped: - TrieLoopAlert - elif stats.level == 0: - TrieIsEmpty - else: - break checkForError - trace "Inspect account trie failed", peer, nPathList=pathList.len, - nDangling=stats.dangling.len, stoppedAt=stats.level, error + var error = TrieIsEmpty + if stats.stopped: + error = TrieLoopAlert + trace "Inspect account trie failed", peer, nPathList=pathList.len, + nDangling=stats.dangling.len, stoppedAt=stats.level, error + elif 0 < stats.level: + break checkForError if ignoreError: return ok(stats) return err(error) - when extraTraceMessages: - trace "Inspect account trie ok", peer, nPathList=pathList.len, - nDangling=stats.dangling.len, level=stats.level + #when extraTraceMessages: + # trace "Inspect account trie ok", peer, nPathList=pathList.len, + # nDangling=stats.dangling.len, level=stats.level + return ok(stats) proc inspectAccountsTrie*( diff --git a/nimbus/sync/snap/worker/db/snapdb_storage_slots.nim b/nimbus/sync/snap/worker/db/snapdb_storage_slots.nim index 615f5aa69..88ca37355 100644 --- a/nimbus/sync/snap/worker/db/snapdb_storage_slots.nim +++ b/nimbus/sync/snap/worker/db/snapdb_storage_slots.nim @@ -95,28 +95,31 @@ proc persistentStorageSlots( proc collectStorageSlots( peer: Peer; - slots: seq[SnapStorage]; + slotLists: seq[SnapStorage]; ): Result[seq[RLeafSpecs],HexaryDbError] {.gcsafe, raises: [Defect, RlpError].} = ## Similar to `collectAccounts()` var rcSlots: seq[RLeafSpecs] - if slots.len != 0: + if slotLists.len != 0: # Add initial account rcSlots.add RLeafSpecs( - pathTag: slots[0].slotHash.to(NodeTag), - payload: slots[0].slotData) + pathTag: slotLists[0].slotHash.to(NodeTag), + payload: slotLists[0].slotData) # Veify & add other accounts - for n in 1 ..< slots.len: - let nodeTag = slots[n].slotHash.to(NodeTag) + for n in 1 ..< slotLists.len: + let nodeTag = slotLists[n].slotHash.to(NodeTag) if nodeTag <= rcSlots[^1].pathTag: let error = SlotsNotSrictlyIncreasing - trace "collectStorageSlots()", peer, item=n, slots=slots.len, error + trace "collectStorageSlots()", peer, item=n, + nSlotLists=slotLists.len, error return err(error) - rcSlots.add RLeafSpecs(pathTag: nodeTag, payload: slots[n].slotData) + rcSlots.add RLeafSpecs( + pathTag: nodeTag, + payload: 
slotLists[n].slotData) ok(rcSlots) @@ -206,52 +209,52 @@ proc importStorageSlots*( nItems = data.storages.len sTop = nItems - 1 var - slot: Option[int] + itemInx: Option[int] if 0 <= sTop: try: for n in 0 ..< sTop: # These ones never come with proof data - slot = some(n) + itemInx = some(n) let rc = ps.importStorageSlots(data.storages[n], @[]) if rc.isErr: - result.add HexaryNodeReport(slot: slot, error: rc.error) - trace "Storage slots item fails", peer, inx=n, nItems, - slots=data.storages[n].data.len, proofs=0, + result.add HexaryNodeReport(slot: itemInx, error: rc.error) + trace "Storage slots item fails", peer, itemInx=n, nItems, + nSlots=data.storages[n].data.len, proofs=0, error=rc.error, nErrors=result.len # Final one might come with proof data block: - slot = some(sTop) + itemInx = some(sTop) let rc = ps.importStorageSlots(data.storages[sTop], data.proof) if rc.isErr: - result.add HexaryNodeReport(slot: slot, error: rc.error) - trace "Storage slots last item fails", peer, inx=sTop, nItems, - slots=data.storages[sTop].data.len, proofs=data.proof.len, + result.add HexaryNodeReport(slot: itemInx, error: rc.error) + trace "Storage slots last item fails", peer, itemInx=sTop, nItems, + nSlots=data.storages[sTop].data.len, proofs=data.proof.len, error=rc.error, nErrors=result.len # Store to disk if persistent and 0 < ps.hexaDb.tab.len: - slot = none(int) + itemInx = none(int) let rc = ps.hexaDb.persistentStorageSlots(ps) if rc.isErr: - result.add HexaryNodeReport(slot: slot, error: rc.error) + result.add HexaryNodeReport(slot: itemInx, error: rc.error) except RlpError: - result.add HexaryNodeReport(slot: slot, error: RlpEncoding) - trace "Storage slot node error", peer, slot, nItems, - slots=data.storages[sTop].data.len, proofs=data.proof.len, + result.add HexaryNodeReport(slot: itemInx, error: RlpEncoding) + trace "Storage slot node error", peer, itemInx, nItems, + nSlots=data.storages[sTop].data.len, proofs=data.proof.len, error=RlpEncoding, nErrors=result.len except KeyError as e: raiseAssert "Not possible @ importStorages: " & e.msg except OSError as e: - result.add HexaryNodeReport(slot: slot, error: OSErrorException) - trace "Import storage slots exception", peer, slot, nItems, + result.add HexaryNodeReport(slot: itemInx, error: OSErrorException) + trace "Import storage slots exception", peer, itemInx, nItems, name=($e.name), msg=e.msg, nErrors=result.len when extraTraceMessages: if result.len == 0: trace "Storage slots imported", peer, nItems, - slots=data.storages.len, proofs=data.proof.len + nSlotLists=data.storages.len, proofs=data.proof.len proc importStorageSlots*( pv: SnapDbRef; ## Base descriptor on `BaseChainDB` @@ -363,22 +366,21 @@ proc inspectStorageSlotsTrie*( stats = ps.hexaDb.hexaryInspectTrie(ps.root, pathList) block checkForError: - let error = block: - if stats.stopped: - TrieLoopAlert - elif stats.level == 0: - TrieIsEmpty - else: - break checkForError - trace "Inspect storage slots trie failed", peer, nPathList=pathList.len, - nDangling=stats.dangling.len, stoppedAt=stats.level, error + var error = TrieIsEmpty + if stats.stopped: + error = TrieLoopAlert + trace "Inspect storage slots trie failed", peer, nPathList=pathList.len, + nDangling=stats.dangling.len, stoppedAt=stats.level + elif 0 < stats.level: + break checkForError if ignoreError: return ok(stats) return err(error) - when extraTraceMessages: - trace "Inspect storage slots trie ok", peer, nPathList=pathList.len, - nDangling=stats.dangling.len, level=stats.level + #when extraTraceMessages: + # trace 
"Inspect storage slots trie ok", peer, nPathList=pathList.len, + # nDangling=stats.dangling.len, level=stats.level + return ok(stats) proc inspectStorageSlotsTrie*( diff --git a/nimbus/sync/snap/worker/heal_accounts.nim b/nimbus/sync/snap/worker/heal_accounts.nim index a67cf808b..94fff613d 100644 --- a/nimbus/sync/snap/worker/heal_accounts.nim +++ b/nimbus/sync/snap/worker/heal_accounts.nim @@ -133,12 +133,12 @@ proc healingCtx(buddy: SnapBuddyRef): string = let ctx = buddy.ctx env = buddy.data.pivotEnv - "[" & + "{" & "nAccounts=" & $env.nAccounts & "," & ("covered=" & env.fetchAccounts.unprocessed.emptyFactor.toPC(0) & "/" & ctx.data.coveredAccounts.fullFactor.toPC(0)) & "," & "nCheckNodes=" & $env.fetchAccounts.checkNodes.len & "," & - "nMissingNodes=" & $env.fetchAccounts.missingNodes.len & "]" + "nMissingNodes=" & $env.fetchAccounts.missingNodes.len & "}" # ------------------------------------------------------------------------------ # Private functions diff --git a/nimbus/sync/snap/worker/heal_storages.nim b/nimbus/sync/snap/worker/heal_storages.nim index 445204016..de28a89e6 100644 --- a/nimbus/sync/snap/worker/heal_storages.nim +++ b/nimbus/sync/snap/worker/heal_storages.nim @@ -45,15 +45,40 @@ proc healingCtx( ): string = let slots = kvp.data.slots - "[" & + "{" & "covered=" & slots.unprocessed.emptyFactor.toPC(0) & "nCheckNodes=" & $slots.checkNodes.len & "," & - "nMissingNodes=" & $slots.missingNodes.len & "]" + "nMissingNodes=" & $slots.missingNodes.len & "}" # ------------------------------------------------------------------------------ # Private functions # ------------------------------------------------------------------------------ +proc acceptWorkItemAsIs( + buddy: SnapBuddyRef; + kvp: SnapSlotsQueuePair; + ): Result[bool, HexaryDbError] = + ## Check whether this work item is done and the corresponding storage trie + ## can be completely inherited. 
+ if kvp.data.inherit: + let + ctx = buddy.ctx + peer = buddy.peer + db = ctx.data.snapDb + accHash = kvp.data.accHash + storageRoot = kvp.key.to(Hash256) + + rc = db.inspectStorageSlotsTrie(peer, accHash, storageRoot) + + # Check whether the hexary trie is complete + if rc.isOk: + return ok(rc.value.dangling.len == 0) + + return err(rc.error) + + ok(false) + + proc updateMissingNodesList( buddy: SnapBuddyRef; kvp: SnapSlotsQueuePair) = @@ -64,6 +89,7 @@ proc updateMissingNodesList( ctx = buddy.ctx db = ctx.data.snapDb peer = buddy.peer + env = buddy.data.pivotEnv accHash = kvp.data.accHash storageRoot = kvp.key.to(Hash256) slots = kvp.data.slots @@ -71,7 +97,8 @@ proc updateMissingNodesList( nodes: seq[Blob] when extraTraceMessages: - trace "Start storage slots healing", peer, ctx=buddy.healingCtx(kvp) + trace "Start storage slots healing", peer, ctx=buddy.healingCtx(kvp), + nSlotLists=env.nSlotLists, nStorageQueue=env.fetchStorage.len for slotKey in slots.missingNodes: let rc = db.getStorageSlotsNodeKey(peer, accHash, storageRoot, slotKey) @@ -96,6 +123,7 @@ proc appendMoreDanglingNodesToMissingNodesList( ctx = buddy.ctx db = ctx.data.snapDb peer = buddy.peer + env = buddy.data.pivotEnv accHash = kvp.data.accHash storageRoot = kvp.key.to(Hash256) slots = kvp.data.slots @@ -106,7 +134,8 @@ proc appendMoreDanglingNodesToMissingNodesList( if rc.isErr: when extraTraceMessages: error "Storage slots healing failed => stop", peer, - ctx=buddy.healingCtx(kvp), error=rc.error + ctx=buddy.healingCtx(kvp), nSlotLists=env.nSlotLists, + nStorageQueue=env.fetchStorage.len, error=rc.error # Attempt to switch peers, there is not much else we can do here buddy.ctrl.zombie = true return false @@ -128,6 +157,7 @@ proc getMissingNodesFromNetwork( let ctx = buddy.ctx peer = buddy.peer + env = buddy.data.pivotEnv accHash = kvp.data.accHash storageRoot = kvp.key.to(Hash256) slots = kvp.data.slots @@ -159,12 +189,14 @@ proc getMissingNodesFromNetwork( discard when extraTraceMessages: trace "Error fetching storage slots nodes for healing => stop", peer, - ctx=buddy.healingCtx(kvp), error + ctx=buddy.healingCtx(kvp), nSlotLists=env.nSlotLists, + nStorageQueue=env.fetchStorage.len, error else: discard when extraTraceMessages: trace "Error fetching storage slots nodes for healing", peer, - ctx=buddy.healingCtx(kvp), error + ctx=buddy.healingCtx(kvp), nSlotLists=env.nSlotLists, + nStorageQueue=env.fetchStorage.len, error return @[] @@ -179,6 +211,7 @@ proc kvStorageSlotsLeaf( ## Read leaf node from persistent database (if any) let peer = buddy.peer + env = buddy.data.pivotEnv nodeRlp = rlpFromBytes node (_,prefix) = hexPrefixDecode partialPath @@ -189,7 +222,8 @@ proc kvStorageSlotsLeaf( when extraTraceMessages: trace "Isolated node path for healing => ignored", peer, - ctx=buddy.healingCtx(kvp) + ctx=buddy.healingCtx(kvp), nSlotLists=env.nSlotLists, + nStorageQueue=env.fetchStorage.len proc registerStorageSlotsLeaf( @@ -217,8 +251,9 @@ proc registerStorageSlotsLeaf( discard ivSet.reduce(pt,pt) when extraTraceMessages: - trace "Isolated storage slot for healing", - peer, ctx=buddy.healingCtx(kvp), slotKey=pt + trace "Isolated storage slot for healing", peer, + ctx=buddy.healingCtx(kvp), nSlotLists=env.nSlotLists, + nStorageQueue=env.fetchStorage.len, slotKey=pt # ------------------------------------------------------------------------------ # Private functions: do the healing for one work item (sub-trie) @@ -235,6 +270,7 @@ proc storageSlotsHealing( ctx = buddy.ctx db = ctx.data.snapDb peer = buddy.peer + env = 
buddy.data.pivotEnv accHash = kvp.data.accHash slots = kvp.data.slots @@ -261,13 +297,16 @@ proc storageSlotsHealing( if 0 < report.len and report[^1].slot.isNone: # Storage error, just run the next lap (not much else that can be done) error "Storage slots healing, error updating persistent database", peer, - ctx=buddy.healingCtx(kvp), nNodes=nodesData.len, error=report[^1].error + ctx=buddy.healingCtx(kvp), nSlotLists=env.nSlotLists, + nStorageQueue=env.fetchStorage.len, nNodes=nodesData.len, + error=report[^1].error slots.missingNodes = slots.missingNodes & nodesData return false when extraTraceMessages: trace "Storage slots healing, nodes merged into database", peer, - ctx=buddy.healingCtx(kvp), nNodes=nodesData.len + ctx=buddy.healingCtx(kvp), nSlotLists=env.nSlotLists, + nStorageQueue=env.fetchStorage.len, nNodes=nodesData.len # Filter out error and leaf nodes for w in report: @@ -295,7 +334,9 @@ proc storageSlotsHealing( slots.checkNodes.add nodePath when extraTraceMessages: - trace "Storage slots healing job done", peer, ctx=buddy.healingCtx(kvp) + trace "Storage slots healing job done", peer, + ctx=buddy.healingCtx(kvp), nSlotLists=env.nSlotLists, + nStorageQueue=env.fetchStorage.len proc healingIsComplete( @@ -312,6 +353,7 @@ proc healingIsComplete( ctx = buddy.ctx db = ctx.data.snapDb peer = buddy.peer + env = buddy.data.pivotEnv accHash = kvp.data.accHash storageRoot = kvp.key.to(Hash256) @@ -321,7 +363,9 @@ proc healingIsComplete( if rc.isErr: # Oops, not much we can do here (looping trie?) - error "Problem inspecting storage trie", peer, storageRoot, error=rc.error + error "Problem inspecting storage trie", peer, + nSlotLists=env.nSlotLists, nStorageQueue=env.fetchStorage.len, + storageRoot, error=rc.error return false # Check whether the hexary trie can be inherited as-is. @@ -355,6 +399,7 @@ proc healStoragesDb*(buddy: SnapBuddyRef) {.async.} = env = buddy.data.pivotEnv var toBeHealed: seq[SnapSlotsQueuePair] + nAcceptedAsIs = 0 # Search the current slot item batch list for items to complete via healing for kvp in env.fetchStorage.nextPairs: @@ -369,31 +414,52 @@ proc healStoragesDb*(buddy: SnapBuddyRef) {.async.} = if slots.isNil or slots.unprocessed.emptyFactor < healSlorageSlotsTrigger: continue + # Remove `kvp` work item from the queue object (which is allowed within a + # `for` loop over a `KeyedQueue` object type.) + env.fetchStorage.del(kvp.key) + + # With some luck, the `kvp` work item refers to a complete storage trie + # that can be be accepted as-is in wich case `kvp` can be just dropped. 
+ block: + let rc = buddy.acceptWorkItemAsIs(kvp) + if rc.isOk and rc.value: + env.nSlotLists.inc + nAcceptedAsIs.inc # for logging + continue # dropping `kvp` + # Add to local batch to be processed, below - env.fetchStorage.del(kvp.key) # ok to delete this item from batch queue - toBeHealed.add kvp # to be held in local queue + toBeHealed.add kvp if maxStoragesHeal <= toBeHealed.len: break - when extraTraceMessages: - let nToBeHealed = toBeHealed.len - if 0 < nToBeHealed: - trace "Processing storage healing items", peer, nToBeHealed - # Run against local batch - for n in 0 ..< toBeHealed.len: - let - kvp = toBeHealed[n] - isComplete = await buddy.healingIsComplete(kvp) - if isComplete: - env.nStorage.inc - else: - env.fetchStorage.merge kvp + let nHealerQueue = toBeHealed.len + if 0 < nHealerQueue: + when extraTraceMessages: + trace "Processing storage healing items", peer, + nSlotLists=env.nSlotLists, nStorageQueue=env.fetchStorage.len, + nHealerQueue, nAcceptedAsIs - if buddy.ctrl.stopped: - # Oops, peer has gone - env.fetchStorage.merge toBeHealed[n+1 ..< toBeHealed.len] - break + for n in 0 ..< toBeHealed.len: + let + kvp = toBeHealed[n] + isComplete = await buddy.healingIsComplete(kvp) + if isComplete: + env.nSlotLists.inc + nAcceptedAsIs.inc + else: + env.fetchStorage.merge kvp + + if buddy.ctrl.stopped: + # Oops, peer has gone + env.fetchStorage.merge toBeHealed[n+1 ..< toBeHealed.len] + break + + when extraTraceMessages: + if 0 < nHealerQueue or 0 < nAcceptedAsIs: + trace "Done storage healing items", peer, + nSlotLists=env.nSlotLists, nStorageQueue=env.fetchStorage.len, + nHealerQueue, nAcceptedAsIs # ------------------------------------------------------------------------------ # End diff --git a/nimbus/sync/snap/worker/store_storages.nim b/nimbus/sync/snap/worker/store_storages.nim index 352e57dc8..09f9a5e8c 100644 --- a/nimbus/sync/snap/worker/store_storages.nim +++ b/nimbus/sync/snap/worker/store_storages.nim @@ -106,8 +106,8 @@ proc getNextSlotItems(buddy: SnapBuddyRef): seq[AccountSlotsHeader] = when extraTraceMessages: trace "Prepare fetching partial storage slots", peer, - nStorageQueue=env.fetchStorage.len, subRange=rc.value, - account=reqData.accHash.to(NodeTag) + nSlotLists=env.nSlotLists, nStorageQueue=env.fetchStorage.len, + nToProcess=1, subRange=rc.value, account=reqData.accHash.to(NodeTag) return @[AccountSlotsHeader( accHash: reqData.accHash, @@ -118,8 +118,8 @@ proc getNextSlotItems(buddy: SnapBuddyRef): seq[AccountSlotsHeader] = reqData.slots = nil discard env.fetchStorage.lruFetch(reqKey) - # So there are no partial ranges (aka `slots`) anymore. Assemble maximal - # request queue. + # So there are no partial slot ranges anymore. Assemble maximal request queue. 
+ var nInherit = 0 for kvp in env.fetchStorage.nextPairs: let it = AccountSlotsHeader( accHash: kvp.data.accHash, @@ -129,9 +129,7 @@ proc getNextSlotItems(buddy: SnapBuddyRef): seq[AccountSlotsHeader] = if kvp.data.inherit or ctx.data.snapDb.haveStorageSlotsData(peer, it.accHash, it.storageRoot): kvp.data.inherit = true - when extraTraceMessages: - trace "Inheriting storage slots", peer, - nStorageQueue=env.fetchStorage.len, account=it.accHash.to(NodeTag) + nInherit.inc # update for logging continue result.add it @@ -141,11 +139,12 @@ proc getNextSlotItems(buddy: SnapBuddyRef): seq[AccountSlotsHeader] = if maxStoragesFetch <= result.len: break -# ------------------------------------------------------------------------------ -# Public functions -# ------------------------------------------------------------------------------ + when extraTraceMessages: + trace "Fetch account storage slots", peer, nSlotLists=env.nSlotLists, + nStorageQueue=env.fetchStorage.len, nToProcess=result.len, nInherit -proc storeStorages*(buddy: SnapBuddyRef) {.async.} = + +proc storeStoragesSingleBatch(buddy: SnapBuddyRef) {.async.} = ## Fetch account storage slots and store them in the database. let ctx = buddy.ctx @@ -172,21 +171,22 @@ proc storeStorages*(buddy: SnapBuddyRef) {.async.} = if await buddy.ctrl.stopAfterSeriousComError(error, buddy.data.errors): discard trace "Error fetching storage slots => stop", peer, - nReq=req.len, nStorageQueue=env.fetchStorage.len, error + nSlotLists=env.nSlotLists, nReq=req.len, + nStorageQueue=env.fetchStorage.len, error return rc.value # Reset error counts for detecting repeated timeouts buddy.data.errors.nTimeouts = 0 - var gotStorage = stoRange.data.storages.len + var gotSlotLists = stoRange.data.storages.len - when extraTraceMessages: - trace "Fetched storage slots", peer, gotStorage, - nLeftOvers=stoRange.leftOver.len, nReq=req.len, - nStorageQueue=env.fetchStorage.len + #when extraTraceMessages: + # trace "Fetched storage slots", peer, + # nSlotLists=env.nSlotLists, nSlotLists=gotSlotLists, nReq=req.len, + # nStorageQueue=env.fetchStorage.len, nLeftOvers=stoRange.leftOver.len - if 0 < gotStorage: + if 0 < gotSlotLists: # Verify/process storages data and save it to disk let report = ctx.data.snapDb.importStorageSlots(peer, stoRange.data) @@ -196,11 +196,11 @@ proc storeStorages*(buddy: SnapBuddyRef) {.async.} = if report[^1].slot.isNone: # Failed to store on database, not much that can be done here env.fetchStorage.merge req - gotStorage.dec(report.len - 1) # for logging only + gotSlotLists.dec(report.len - 1) # for logging only - error "Error writing storage slots to database", peer, gotStorage, - nReq=req.len, nStorageQueue=env.fetchStorage.len, - error=report[^1].error + error "Error writing storage slots to database", peer, + nSlotLists=env.nSlotLists, nSlotLists=gotSlotLists, nReq=req.len, + nStorageQueue=env.fetchStorage.len, error=report[^1].error return # Push back error entries to be processed later @@ -224,28 +224,52 @@ proc storeStorages*(buddy: SnapBuddyRef) {.async.} = # No partial result processing anymore to consider stoRange.data.proof = @[] - # Update local statistics counter for `nStorage` counter update - gotStorage.dec + # Update local statistics counter for `nSlotLists` counter update + gotSlotLists.dec - trace "Error processing storage slots", peer, gotStorage, - nReqInx=inx, nReq=req.len, nStorageQueue=env.fetchStorage.len, - error=report[inx].error + trace "Error processing storage slots", peer, nSlotLists=env.nSlotLists, + 
nSlotLists=gotSlotLists, nReqInx=inx, nReq=req.len, + nStorageQueue=env.fetchStorage.len, error=report[inx].error # Update statistics - if gotStorage == 1 and + if gotSlotLists == 1 and req[0].subRange.isSome and env.fetchStorage.hasKey req[0].storageRoot.to(NodeKey): # Successful partial request, but not completely done with yet. - gotStorage = 0 + gotSlotLists = 0 - env.nStorage.inc(gotStorage) + env.nSlotLists.inc(gotSlotLists) # Return unprocessed left overs to batch queue env.fetchStorage.merge stoRange.leftOver - when extraTraceMessages: - trace "Done fetching storage slots", peer, gotStorage, - nStorageQueue=env.fetchStorage.len +# ------------------------------------------------------------------------------ +# Public functions +# ------------------------------------------------------------------------------ + +proc storeStorages*(buddy: SnapBuddyRef) {.async.} = + ## Fetch account storage slots and store them in the database. + let + env = buddy.data.pivotEnv + peer = buddy.peer + + if 0 < env.fetchStorage.len: + # Run at most the minimum number of times to get the batch queue cleaned up. + var loopCount = 1 + (env.fetchStorage.len - 1) div maxStoragesFetch + + when extraTraceMessages: + trace "Start fetching storage slots", peer, nSlotLists=env.nSlotLists, + nStorageQueue=env.fetchStorage.len, loopCount + + while 0 < loopCount and + 0 < env.fetchStorage.len and + not buddy.ctrl.stopped: + loopCount.dec + await buddy.storeStoragesSingleBatch() + + when extraTraceMessages: + trace "Done fetching storage slots", peer, nSlotLists=env.nSlotLists, + nStorageQueue=env.fetchStorage.len, loopCount # ------------------------------------------------------------------------------ # End diff --git a/nimbus/sync/snap/worker/ticker.nim b/nimbus/sync/snap/worker/ticker.nim index 827f9189c..0988b6bfe 100644 --- a/nimbus/sync/snap/worker/ticker.nim +++ b/nimbus/sync/snap/worker/ticker.nim @@ -28,7 +28,7 @@ type pivotBlock*: Option[BlockNumber] nAccounts*: (float,float) ## mean and standard deviation accountsFill*: (float,float,float) ## mean, standard deviation, merged total - nStorage*: (float,float) ## mean and standard deviation + nSlotLists*: (float,float) ## mean and standard deviation storageQueue*: (float,float) ## mean and standard deviation nQueues*: int @@ -126,8 +126,10 @@ proc runLogTicker(t: TickerRef) {.gcsafe.} = noFmtError("runLogTicker"): if data.pivotBlock.isSome: pivot = &"#{data.pivotBlock.get}/{data.nQueues}" - nAcc = &"{(data.nAccounts[0]+0.5).int64}({(data.nAccounts[1]+0.5).int64})" - nSto = &"{(data.nStorage[0]+0.5).int64}({(data.nStorage[1]+0.5).int64})" + nAcc = (&"{(data.nAccounts[0]+0.5).int64}" & + &"({(data.nAccounts[1]+0.5).int64})") + nSto = (&"{(data.nSlotLists[0]+0.5).int64}" & + &"({(data.nSlotLists[1]+0.5).int64})") info "Snap sync statistics", tick, buddies, pivot, nAcc, accCov, nSto, stoQue, mem diff --git a/nimbus/sync/snap/worker_desc.nim b/nimbus/sync/snap/worker_desc.nim index 6d8d65d2a..88436fe79 100644 --- a/nimbus/sync/snap/worker_desc.nim +++ b/nimbus/sync/snap/worker_desc.nim @@ -137,7 +137,7 @@ type # Info nAccounts*: uint64 ## Imported # of accounts - nStorage*: uint64 ## Imported # of account storage tries + nSlotLists*: uint64 ## Imported # of account storage tries SnapPivotTable* = ##\ ## LRU table, indexed by state root diff --git a/nimbus/sync/sync_sched.nim b/nimbus/sync/sync_sched.nim index 2038dbf66..6ad0bd2d1 100644 --- a/nimbus/sync/sync_sched.nim +++ b/nimbus/sync/sync_sched.nim @@ -40,26 +40,32 @@ ## ## The argument `last` is set `true` 
if the last entry is reached. ## -## Note that this function does not run in `async` mode. +## Note: +## + This function does not run in `async` mode. +## + The flag `buddy.ctx.poolMode` has priority over the flag +## `buddy.ctrl.multiOk` which controls `runSingle()` and `runMulti()`. ## ## ## *runSingle(buddy: BuddyRef[S,W]) {.async.}* ## This worker peer method is invoked if the peer-local flag ## `buddy.ctrl.multiOk` is set `false` which is the default mode. This flag ## is updated by the worker peer when deemed appropriate. -## * For all workers, there can be only one `runSingle()` function active +## + For all workers, there can be only one `runSingle()` function active ## simultaneously for all worker peers. -## * There will be no `runMulti()` function active for the same worker peer +## + There will be no `runMulti()` function active for the same worker peer ## simultaneously -## * There will be no `runPool()` iterator active simultaneously. +## + There will be no `runPool()` iterator active simultaneously. ## ## Note that this function runs in `async` mode. ## +## ## *runMulti(buddy: BuddyRef[S,W]) {.async.}* ## This worker peer method is invoked if the `buddy.ctrl.multiOk` flag is ## set `true` which is typically done after finishing `runSingle()`. This ## instance can be simultaneously active for all worker peers. ## +## Note that this function runs in `async` mode. +## ## ## Additional import files needed when using this template: ## * eth/[common, p2p] @@ -78,6 +84,10 @@ import {.push raises: [Defect].} +static: + # type `EthWireRef` is needed in `initSync()` + type silenceUnusedhandlerComplaint = EthWireRef # dummy directive + type ActiveBuddies[S,W] = ##\ ## List of active workers, using `Hash(Peer)` rather than `Peer` @@ -274,7 +284,7 @@ proc initSync*[S,W]( dsc.buddies.init(dsc.ctx.buddiesMax) proc startSync*[S,W](dsc: RunnerSyncRef[S,W]): bool = - ## Set up syncing. This call should come early. + ## Set up `PeerObserver` handlers and start syncing. mixin runSetup # Initialise sub-systems if dsc.ctx.runSetup(dsc.tickerOk): @@ -291,10 +301,10 @@ proc startSync*[S,W](dsc: RunnerSyncRef[S,W]): bool = return true proc stopSync*[S,W](dsc: RunnerSyncRef[S,W]) = - ## Stop syncing + ## Stop syncing and free peer handlers . mixin runRelease - dsc.pool.delObserver(dsc) dsc.ctx.runRelease() + dsc.pool.delObserver(dsc) # ------------------------------------------------------------------------------ # End
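Illustration of the scheduler notes added to sync_sched.nim above (not part of the patch): a minimal Nim sketch of the documented dispatch priority, where `buddy.ctx.poolMode` is checked before `buddy.ctrl.multiOk`. The types, proc names and the omitted surrounding peer loop are assumptions made for this sketch; only the flag priority and the pool/single/multi split come from the comments above.

type
  BuddyCtrl = object
    multiOk: bool                     # illustrative stand-in for buddy.ctrl
  SchedCtx = object
    poolMode: bool                    # illustrative stand-in for buddy.ctx
  Buddy = object
    ctrl: BuddyCtrl
    ctx: SchedCtx

proc runPoolPass(buddy: Buddy) = discard    # stands in for the runPool() sweep
proc runSinglePass(buddy: Buddy) = discard  # stands in for runSingle()
proc runMultiPass(buddy: Buddy) = discard   # stands in for runMulti()

proc schedulePass(buddy: Buddy) =
  ## One simplified scheduling decision for a worker peer.
  if buddy.ctx.poolMode:
    # Pool mode has priority over everything else: the pool sweep runs
    # before single/multi processing resumes.
    runPoolPass(buddy)
  elif buddy.ctrl.multiOk:
    runMultiPass(buddy)               # concurrent mode, many peers at once
  else:
    runSinglePass(buddy)              # exclusive mode, one peer at a time

when isMainModule:
  schedulePass(Buddy())               # default flags => runSingle() branch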