nimbus-eth1/nimbus/sync/snap/worker/db/snapdb_persistent.nim

280 lines
8.8 KiB
Nim
Raw Normal View History

# nimbus-eth1
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
import
std/[algorithm, tables],
chronicles,
eth/[common, rlp, trie/db],
../../../../db/kvstore_rocksdb,
../../range_desc,
"."/[hexary_desc, hexary_error, rocky_bulk_load, snapdb_desc]
{.push raises: [].}
logScope:
topics = "snap-db"
type
  AccountsGetFn* = proc(key: openArray[byte]): Blob {.gcsafe, raises:[Defect].}
    ## Signature of the `get()` lookup function used for the accounts trie.
    ## It maps a raw key to the stored node blob.

  StorageSlotsGetFn* = proc(acc: NodeKey; key: openArray[byte]): Blob {.gcsafe, raises: [Defect].}
    ## Signature of the `get()` lookup function used for a storage trie.
    ## Besides the raw key it is handed the key of the account the storage
    ## trie belongs to.

  StateRootRegistry* = object
    ## Record type for the state root registry. The records form a singly
    ## linked chain: the entry stored under the zero key refers to the most
    ## recently added record, and every other record refers back to the one
    ## that was on top before it.
    ## ::
    ##    zero -> (n/a) -------+
    ##                         |
    ##             ...         |
    ##              ^          |
    ##              |          |
    ##            (data)       |
    ##              ^          |
    ##              |          |
    ##            (data)       |
    ##              ^          |
    ##              |          |
    ##            (data) <-----+
    ##
    key*: NodeKey    ## Top reference for the zero/base entry, otherwise
                     ## the back reference to the previous record
    data*: Blob      ## Opaque payload attached to the state root
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
proc convertTo(key: RepairKey; T: type NodeKey): T =
  ## Builds a `NodeKey` from bytes 1..32 of the 33 byte repair key. The
  ## leading byte 0 is dropped, so the conversion might be lossy -- check
  ## the key (e.g. with `isNodeKey`) before relying on the result.
  let payload = key.ByteArray33[1 .. 32]
  # The success flag of `init()` is deliberately ignored here.
  discard result.init(payload)
proc convertTo(key: RepairKey; T: type NodeTag): T =
  ## Interprets bytes 1..32 of the 33 byte repair key as a big-endian
  ## 256 bit number and returns it as `NodeTag`. The leading byte 0 is
  ## dropped, so the conversion might be lossy -- check before use.
  let payload = key.ByteArray33[1 .. 32]
  T(UInt256.fromBytesBE(payload))
Prep for full sync after snap make 6 (#1291) * Update log ticker, using time interval rather than ticker count why: Counting and logging ticker occurrences is inherently imprecise. So time intervals are used. * Use separate storage tables for snap sync data * Left boundary proof update why: Was not properly implemented, yet. * Capture pivot in peer worker (aka buddy) tasks why: The pivot environment is linked to the `buddy` descriptor. While there is a task switch, the pivot may change. So it is passed on as function argument `env` rather than retrieved from the buddy at the start of a sub-function. * Split queues `fetchStorage` into `fetchStorageFull` and `fetchStoragePart` * Remove obsolete account range returned from `GetAccountRange` message why: Handler returned the wrong right value of the range. This range was for convenience, only. * Prioritise storage slots if the queue becomes large why: Currently, accounts processing is prioritised up until all accounts are downloaded. The new prioritisation has two thresholds for + start processing storage slots with a new worker + stop account processing and switch to storage processing also: Provide api for `SnapTodoRanges` pair of range sets in `worker_desc.nim` * Generalise left boundary proof for accounts or storage slots. why: Detailed explanation how this works is documented with `snapdb_accounts.importAccounts()`. Instead of enforcing a left boundary proof (which is still the default), the importer functions return a list of `holes` (aka node paths) found in the argument ranges of leaf nodes. This in turn is used by the book keeping software for data download. * Forgot to pass on variable in function wrapper also: + Start healing not before 99% accounts covered (previously 95%) + Logging updated/prettified
2022-11-08 18:56:04 +00:00
proc toAccountsKey(a: RepairKey): ByteArray33 =
  ## Database key for an accounts trie node, derived from the repair key
  ## `a` by way of its (possibly lossy) `NodeKey` conversion.
  let nodeKey = a.convertTo(NodeKey)
  toAccountsKey(nodeKey)
proc toStorageSlotsKey(a: RepairKey): ByteArray33 =
  ## Database key for a storage slots trie node, derived from the repair
  ## key `a` by way of its (possibly lossy) `NodeKey` conversion.
  let nodeKey = a.convertTo(NodeKey)
  toStorageSlotsKey(nodeKey)
proc stateRootGet*(db: TrieDatabaseRef; nodeKey: NodeKey): Blob =
  ## Fetches the raw blob stored in `db` under the state root registry key
  ## derived from `nodeKey`. The result of the underlying `get()` is passed
  ## on unchanged; callers in this module treat a zero length blob as
  ## "no such entry".
  db.get(nodeKey.toStateRootKey.toOpenArray)
# ------------------------------------------------------------------------------
# Public functions: get
# ------------------------------------------------------------------------------
proc persistentAccountsGetFn*(db: TrieDatabaseRef): AccountsGetFn =
  ## Creates the `get()` closure for looking up accounts trie nodes in `db`.
  ##
  ## The closure returns an empty `Blob` if its `key` argument cannot be
  ## turned into a `NodeKey`.
  result = proc(key: openArray[byte]): Blob =
    var lookupKey: NodeKey
    if lookupKey.init(key):
      result = db.get(lookupKey.toAccountsKey.toOpenArray)
proc persistentStorageSlotsGetFn*(db: TrieDatabaseRef): StorageSlotsGetFn =
  ## Creates the `get()` closure for looking up storage slots trie nodes
  ## in `db`.
  ##
  ## The closure returns an empty `Blob` if its `key` argument cannot be
  ## turned into a `NodeKey`. The account key argument `accKey` is part of
  ## the closure signature but is not consulted for the lookup.
  result = proc(accKey: NodeKey; key: openArray[byte]): Blob =
    var lookupKey: NodeKey
    if lookupKey.init(key):
      result = db.get(lookupKey.toStorageSlotsKey.toOpenArray)
proc persistentStateRootGet*(
    db: TrieDatabaseRef;
    root: NodeKey;
      ): Result[StateRootRegistry,HexaryError] =
  ## Retrieves the state root registry record stored for `root`.
  ##
  ## Returns
  ## * the decoded `StateRootRegistry` record on success,
  ## * `StateRootNotFound` if nothing (or a zero length blob) is stored,
  ## * `RlpEncoding` if the stored blob cannot be RLP decoded.
  let rlpBlob = db.stateRootGet(root)
  if rlpBlob.len == 0:
    return err(StateRootNotFound)

  try:
    return ok(rlp.decode(rlpBlob, StateRootRegistry))
  except RlpError:
    return err(RlpEncoding)
# ------------------------------------------------------------------------------
# Public functions: store/put
# ------------------------------------------------------------------------------
proc persistentAccountsPut*(
    db: HexaryTreeDbRef;
    base: TrieDatabaseRef
      ): Result[void,HexaryError] =
  ## Bulk store using transactional `put()`
  ##
  ## Copies all entries of the repair table `db.tab` into the accounts
  ## column of `base`. The function returns `UnresolvedRepairNode` if the
  ## table holds a key that is not a proper node key; in that case nothing
  ## is written to `base`.

  # Check all keys before opening the transaction. Bailing out from within
  # the write loop would still run the deferred `commit` and so persist an
  # arbitrary (table iteration order dependent) part of the table while
  # reporting an error. Validating first also matches the `rocksdb` bulk
  # load variant of this function.
  for key in db.tab.keys:
    if not key.isNodeKey:
      let error = UnresolvedRepairNode
      trace "Unresolved node in repair table", error
      return err(error)

  let dbTx = base.beginTransaction
  defer: dbTx.commit

  for (key,value) in db.tab.pairs:
    base.put(key.toAccountsKey.toOpenArray, value.convertTo(Blob))
  ok()
proc persistentStorageSlotsPut*(
    db: HexaryTreeDbRef;
    base: TrieDatabaseRef
      ): Result[void,HexaryError] =
  ## Bulk store using transactional `put()`
  ##
  ## Copies all entries of the repair table `db.tab` into the storage slots
  ## column of `base`. The function returns `UnresolvedRepairNode` if the
  ## table holds a key that is not a proper node key; in that case nothing
  ## is written to `base`.

  # Check all keys before opening the transaction. Bailing out from within
  # the write loop would still run the deferred `commit` and so persist an
  # arbitrary (table iteration order dependent) part of the table while
  # reporting an error. Validating first also matches the `rocksdb` bulk
  # load variant of this function.
  for key in db.tab.keys:
    if not key.isNodeKey:
      let error = UnresolvedRepairNode
      trace "Unresolved node in repair table", error
      return err(error)

  let dbTx = base.beginTransaction
  defer: dbTx.commit

  for (key,value) in db.tab.pairs:
    base.put(key.toStorageSlotsKey.toOpenArray, value.convertTo(Blob))
  ok()
proc persistentStateRootPut*(
    db: TrieDatabaseRef;
    root: NodeKey;
    data: Blob;
      ) {.gcsafe, raises: [RlpError].} =
  ## Stores `data` in the state root registry record for `root`.
  ##
  ## * If there is a record for `root` already, it is rewritten only when
  ##   its payload differs from `data`. The back reference is kept.
  ## * Otherwise a new record is created which refers back to the record
  ##   the zero key entry pointed to so far, and the zero key entry is
  ##   redirected to `root`.
  ##
  ## An `RlpError` is raised if an existing record cannot be decoded.
  const zeroKey = NodeKey.default
  let rlpData = db.stateRootGet(root)

  if 0 < rlpData.len:
    # Known state root: update the payload if it has changed
    let record = rlp.decode(rlpData, StateRootRegistry)
    if record.data != data:
      let rootEntryData =
        rlp.encode StateRootRegistry(key: record.key, data: data)
      db.put(root.toStateRootKey.toOpenArray, rootEntryData)
    return

  # New state root: find out what the base record currently refers to. The
  # back reference stays zero if there is no base record, yet.
  var backKey: NodeKey
  let baseBlob = db.stateRootGet(zeroKey)
  if 0 < baseBlob.len:
    backKey = rlp.decode(baseBlob, StateRootRegistry).key

  # No need for a transaction frame. If the system crashes in between,
  # so be it :). All that can happen is storing redundant top entries.
  let
    rootEntryData = rlp.encode StateRootRegistry(key: backKey, data: data)
    zeroEntryData = rlp.encode StateRootRegistry(key: root)

  # The new top entry goes first, then the base record pointing to it
  db.put(root.toStateRootKey.toOpenArray, rootEntryData)
  db.put(zeroKey.toStateRootKey.toOpenArray, zeroEntryData)
proc persistentAccountsPut*(
    db: HexaryTreeDbRef;
    rocky: RocksStoreRef
      ): Result[void,HexaryError]
      {.gcsafe, raises: [OSError,KeyError].} =
  ## SST based bulk load on `rocksdb`.
  ##
  ## Writes all entries of the repair table `db.tab` to the accounts column
  ## by means of a `RockyBulkLoadRef` session. Error returns:
  ## * `NoRocksDbBackend` -- the `rocky` argument is `nil`
  ## * `CannotOpenRocksDbBulkSession` -- the bulk session did not start
  ## * `UnresolvedRepairNode` -- the table holds a key that is not a node key
  ## * `AddBulkItemFailed` -- adding an entry to the session failed
  ## * `CommitBulkItemsFailed` -- finalising the session failed
  if rocky.isNil:
    return err(NoRocksDbBackend)

  let bulker = RockyBulkLoadRef.init(rocky)
  defer: bulker.destroy()
  if not bulker.begin(RockyBulkCache):
    let error = CannotOpenRocksDbBulkSession
    trace "Rocky hexary session initiation failed",
      error, info=bulker.lastError()
    return err(error)

  # Gather the table keys as node tags. Nothing has been handed over to the
  # bulk session at this stage, so bailing out on the first unresolved key
  # leaves no trace.
  var sortedTags = newSeqOfCap[NodeTag](db.tab.len)
  for repairKey in db.tab.keys:
    if not repairKey.isNodeKey:
      return err(UnresolvedRepairNode)
    sortedTags.add repairKey.convertTo(NodeTag)

  # Entries are passed on in ascending key order (presumably a requirement
  # of the SST bulk loader -- to be confirmed against `rocky_bulk_load`.)
  sortedTags.sort(cmp)

  for n,nodeTag in sortedTags:
    let
      nodeKey = nodeTag.to(NodeKey)
      data = db.tab[nodeKey.to(RepairKey)].convertTo(Blob)
    if not bulker.add(nodeKey.toAccountsKey.toOpenArray, data):
      let error = AddBulkItemFailed
      trace "Rocky hexary bulk load failure",
        n, len=db.tab.len, error, info=bulker.lastError()
      return err(error)

  if bulker.finish().isErr:
    let error = CommitBulkItemsFailed
    trace "Rocky hexary commit failure",
      len=db.tab.len, error, info=bulker.lastError()
    return err(error)
  ok()
proc persistentStorageSlotsPut*(
    db: HexaryTreeDbRef;
    rocky: RocksStoreRef
      ): Result[void,HexaryError]
      {.gcsafe, raises: [OSError,KeyError].} =
  ## SST based bulk load on `rocksdb`.
  ##
  ## Writes all entries of the repair table `db.tab` to the storage slots
  ## column by means of a `RockyBulkLoadRef` session. Error returns:
  ## * `NoRocksDbBackend` -- the `rocky` argument is `nil`
  ## * `CannotOpenRocksDbBulkSession` -- the bulk session did not start
  ## * `UnresolvedRepairNode` -- the table holds a key that is not a node key
  ## * `AddBulkItemFailed` -- adding an entry to the session failed
  ## * `CommitBulkItemsFailed` -- finalising the session failed
  if rocky.isNil:
    return err(NoRocksDbBackend)

  let bulker = RockyBulkLoadRef.init(rocky)
  defer: bulker.destroy()
  if not bulker.begin(RockyBulkCache):
    let error = CannotOpenRocksDbBulkSession
    trace "Rocky hexary session initiation failed",
      error, info=bulker.lastError()
    return err(error)

  # Gather the table keys as node tags. Nothing has been handed over to the
  # bulk session at this stage, so bailing out on the first unresolved key
  # leaves no trace.
  var sortedTags = newSeqOfCap[NodeTag](db.tab.len)
  for repairKey in db.tab.keys:
    if not repairKey.isNodeKey:
      return err(UnresolvedRepairNode)
    sortedTags.add repairKey.convertTo(NodeTag)

  # Entries are passed on in ascending key order (presumably a requirement
  # of the SST bulk loader -- to be confirmed against `rocky_bulk_load`.)
  sortedTags.sort(cmp)

  for n,nodeTag in sortedTags:
    let
      nodeKey = nodeTag.to(NodeKey)
      data = db.tab[nodeKey.to(RepairKey)].convertTo(Blob)
    if not bulker.add(nodeKey.toStorageSlotsKey.toOpenArray, data):
      let error = AddBulkItemFailed
      trace "Rocky hexary bulk load failure",
        n, len=db.tab.len, error, info=bulker.lastError()
      return err(error)

  if bulker.finish().isErr:
    let error = CommitBulkItemsFailed
    trace "Rocky hexary commit failure",
      len=db.tab.len, error, info=bulker.lastError()
    return err(error)
  ok()
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------