nimbus-eth1/tests/test_sync_snap.nim
Jordan Hrycaj b793f0de8d
Snap sync extractor and sub range proofs cont1 (#1468)
* Redefine `seq[Blob]` => `seq[SnapProof]` for `snap/1` protocol

why:
  Proof nodes are traded as `Blob` type items rather than Nim objects. So
  the RLP transcoder must not extra wrap proofs which are of type
  seq[Blob]. Without custom encoding one would produce a
  `list(blob(item1), blob(item2) ..)` instead of `list(item1, item2 ..)`.

* Limit leaf extractor by RLP size rather than number of items

why:
  To be used serving `snap/1` requests, the result of function
  `hexaryRangeLeafsProof()` is limited by the maximal space
  needed to serialise the result which will be part of the
  `snap/1` repsonse.

* Let the range extractor `hexaryRangeLeafsProof()` return RLP list sizes

why:
  When collecting accounts, the size oft the accounts list when encoded
  as RLP is continually updated. So the summed up value is available
  anyway. For the proof nodes list, there are not many (~ 10) so summing
  up is not expensive here.
2023-02-15 10:14:40 +00:00

564 lines
19 KiB
Nim

# Nimbus - Types, data structures and shared utilities used in network sync
#
# Copyright (c) 2018-2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or
# distributed except according to those terms.
## Snap sync components tester and TDD environment
import
std/[distros, os, sets, sequtils, strformat, strutils, tables],
chronicles,
eth/[common, p2p],
rocksdb,
unittest2,
../nimbus/db/select_backend,
../nimbus/core/chain,
../nimbus/sync/types,
../nimbus/sync/snap/range_desc,
../nimbus/sync/snap/worker/db/[
hexary_desc, hexary_envelope, hexary_error, hexary_inspect, hexary_nearby,
hexary_paths, rocky_bulk_load, snapdb_accounts, snapdb_desc],
./replay/[pp, undump_accounts, undump_storages],
./test_sync_snap/[
bulk_test_xx, snap_test_xx,
test_accounts, test_calc, test_helpers, test_node_range, test_inspect,
test_pivot, test_storage, test_db_timing, test_types]
const
baseDir = [".", "..", ".."/"..", $DirSep]
repoDir = [".", "tests"/"replay", "tests"/"test_sync_snap",
"nimbus-eth1-blobs"/"replay"]
# Reference file for finding the database directory
sampleDirRefFile = "sample0.txt.gz"
# Standard test samples
bChainCapture = bulkTest0
accSample = snapTest0
storSample = snapTest4
# Number of database slots (needed for timing tests)
nTestDbInstances = 9
type
TestDbs = object
## Provide enough spare empty databases
persistent: bool
dbDir: string
baseDir: string # for cleanup
subDir: string # for cleanup
cdb: array[nTestDbInstances,ChainDb]
when defined(linux):
# The `detectOs(Ubuntu)` directive is not Windows compatible, causes an
# error when running the system command `lsb_release -d` in the background.
let isUbuntu32bit = detectOs(Ubuntu) and int.sizeof == 4
else:
const isUbuntu32bit = false
let
# There was a problem with the Github/CI which results in spurious crashes
# when leaving the `runner()` if the persistent ChainDBRef initialisation
# was present, see `test_custom_network` for more details.
disablePersistentDB = isUbuntu32bit
var
xTmpDir: string
xDbs: TestDbs # for repeated storage/overwrite tests
xTab32: Table[ByteArray32,Blob] # extracted data
xTab33: Table[ByteArray33,Blob]
# ------------------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------------------
proc findFilePath(file: string;
baseDir, repoDir: openArray[string]): Result[string,void] =
for dir in baseDir:
for repo in repoDir:
let path = dir / repo / file
if path.fileExists:
return ok(path)
echo "*** File not found \"", file, "\"."
err()
proc getTmpDir(sampleDir = sampleDirRefFile): string =
sampleDir.findFilePath(baseDir,repoDir).value.splitFile.dir
proc setTraceLevel =
discard
when defined(chronicles_runtime_filtering) and loggingEnabled:
setLogLevel(LogLevel.TRACE)
proc setErrorLevel =
discard
when defined(chronicles_runtime_filtering) and loggingEnabled:
setLogLevel(LogLevel.ERROR)
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
proc to(sample: AccountsSample; T: type seq[UndumpAccounts]): T =
## Convert test data into usable in-memory format
let file = sample.file.findFilePath(baseDir,repoDir).value
var root: Hash256
for w in file.undumpNextAccount:
let n = w.seenAccounts - 1
if n < sample.firstItem:
continue
if sample.lastItem < n:
break
if sample.firstItem == n:
root = w.root
elif w.root != root:
break
result.add w
proc to(sample: AccountsSample; T: type seq[UndumpStorages]): T =
## Convert test data into usable in-memory format
let file = sample.file.findFilePath(baseDir,repoDir).value
var root: Hash256
for w in file.undumpNextStorages:
let n = w.seenAccounts - 1 # storages selector based on accounts
if n < sample.firstItem:
continue
if sample.lastItem < n:
break
if sample.firstItem == n:
root = w.root
elif w.root != root:
break
result.add w
proc flushDbDir(s: string; subDir = "") =
if s != "":
let baseDir = s / "tmp"
for n in 0 ..< nTestDbInstances:
let instDir = if subDir == "": baseDir / $n else: baseDir / subDir / $n
if (instDir / "nimbus" / "data").dirExists:
# Typically under Windows: there might be stale file locks.
try: instDir.removeDir except: discard
try: (baseDir / subDir).removeDir except: discard
block dontClearUnlessEmpty:
for w in baseDir.walkDir:
break dontClearUnlessEmpty
try: baseDir.removeDir except: discard
proc flushDbs(db: TestDbs) =
if db.persistent:
for n in 0 ..< nTestDbInstances:
if db.cdb[n].rocksStoreRef.isNil:
break
db.cdb[n].rocksStoreRef.store.db.rocksdb_close
db.baseDir.flushDbDir(db.subDir)
proc testDbs(
workDir: string;
subDir: string;
instances: int;
persistent: bool;
): TestDbs =
if disablePersistentDB or workDir == "" or not persistent:
result.persistent = false
result.dbDir = "*notused*"
else:
result.persistent = true
result.baseDir = workDir
result.subDir = subDir
if subDir != "":
result.dbDir = workDir / "tmp" / subDir
else:
result.dbDir = workDir / "tmp"
if result.persistent:
result.dbDir.flushDbDir
for n in 0 ..< min(result.cdb.len, instances):
result.cdb[n] = (result.dbDir / $n).newChainDB
proc snapDbRef(cdb: ChainDb; pers: bool): SnapDbRef =
if pers: SnapDbRef.init(cdb) else: SnapDbRef.init(newMemoryDB())
proc snapDbAccountsRef(cdb:ChainDb; root:Hash256; pers:bool):SnapDbAccountsRef =
SnapDbAccountsRef.init(cdb.snapDbRef(pers), root, Peer())
# ------------------------------------------------------------------------------
# Test Runners: accounts and accounts storages
# ------------------------------------------------------------------------------
proc miscRunner(noisy = true) =
suite "SyncSnap: Verify setup, constants, limits":
test "RLP accounts list sizes":
test_calcAccountsListSizes()
test "RLP proofs list sizes":
test_calcProofsListSizes()
proc accountsRunner(noisy = true; persistent = true; sample = accSample) =
let
accLst = sample.to(seq[UndumpAccounts])
root = accLst[0].root
tmpDir = getTmpDir()
db = tmpDir.testDbs(sample.name & "-accounts", instances=3, persistent)
info = if db.persistent: &"persistent db on \"{db.baseDir}\""
else: "in-memory db"
fileInfo = sample.file.splitPath.tail.replace(".txt.gz","")
defer:
db.flushDbs
suite &"SyncSnap: {fileInfo} accounts and proofs for {info}":
block:
# New common descriptor for this sub-group of tests
let
desc = db.cdb[0].snapDbAccountsRef(root, db.persistent)
hexaDb = desc.hexaDb
getFn = desc.getAccountFn
dbg = if noisy: hexaDb else: nil
test &"Proofing {accLst.len} list items for state root ..{root.pp}":
accLst.test_accountsImport(desc, db.persistent)
# debugging, make sure that state root ~ "$0"
desc.assignPrettyKeys()
# Beware: dumping a large database is not recommended
# true.say "***", "database dump\n ", desc.dumpHexaDB()
test &"Retrieve accounts & proofs for previous account ranges":
if db.persistent:
accLst.test_NodeRangeProof(getFn, dbg)
else:
accLst.test_NodeRangeProof(hexaDB, dbg)
test &"Verify left boundary checks":
if db.persistent:
accLst.test_NodeRangeLeftBoundary(getFn, dbg)
else:
accLst.test_NodeRangeLeftBoundary(hexaDB, dbg)
block:
# List of keys to be shared by sub-group
var accKeys: seq[NodeKey]
# New common descriptor for this sub-group of tests
let desc = db.cdb[1].snapDbAccountsRef(root, db.persistent)
test &"Merging {accLst.len} accounts/proofs lists into single list":
accLst.test_accountsMergeProofs(desc, accKeys) # set up `accKeys`
test &"Revisiting {accKeys.len} stored items on ChainDBRef":
accKeys.test_accountsRevisitStoredItems(desc, noisy)
test &"Decompose path prefix envelopes on {info}":
let hexaDb = desc.hexaDb
if db.persistent:
accKeys.test_NodeRangeDecompose(root, desc.getAccountFn, hexaDb)
else:
accKeys.test_NodeRangeDecompose(root, hexaDb, hexaDb)
# This one works with a new clean database in order to avoid some
# problems on observed qemu/Win7.
test &"Storing/retrieving {accKeys.len} stored items " &
"on persistent pivot/checkpoint registry":
if db.persistent:
accKeys.test_pivotStoreRead(db.cdb[2])
else:
skip()
proc storagesRunner(
noisy = true;
persistent = true;
sample = storSample;
knownFailures: seq[(string,seq[(int,HexaryError)])] = @[]) =
let
accLst = sample.to(seq[UndumpAccounts])
stoLst = sample.to(seq[UndumpStorages])
tmpDir = getTmpDir()
db = tmpDir.testDbs(sample.name & "-storages", instances=1, persistent)
info = if db.persistent: &"persistent db" else: "in-memory db"
idPfx = sample.file.splitPath.tail.replace(".txt.gz","")
defer:
db.flushDbs
suite &"SyncSnap: {idPfx} accounts storage for {info}":
let xdb = db.cdb[0].snapDbRef(db.persistent)
test &"Merging {accLst.len} accounts for state root ..{accLst[0].root.pp}":
accLst.test_storageAccountsImport(xdb, db.persistent)
test &"Merging {stoLst.len} storages lists":
stoLst.test_storageSlotsImport(xdb, db.persistent, knownFailures,idPfx)
test &"Inspecting {stoLst.len} imported storages lists sub-tries":
stoLst.test_storageSlotsTries(xdb, db.persistent, knownFailures,idPfx)
proc inspectionRunner(
noisy = true;
persistent = true;
cascaded = true;
sample: openArray[AccountsSample] = snapTestList) =
let
inspectList = sample.mapIt(it.to(seq[UndumpAccounts]))
tmpDir = getTmpDir()
db = tmpDir.testDbs(
sample[0].name & "-inspection", instances=nTestDbInstances, persistent)
info = if db.persistent: &"persistent db" else: "in-memory db"
fileInfo = "[" & sample[0].file.splitPath.tail.replace(".txt.gz","") & "..]"
defer:
db.flushDbs
suite &"SyncSnap: inspect {fileInfo} lists for {info} for healing":
var
singleStats: seq[(int,TrieNodeStat)]
accuStats: seq[(int,TrieNodeStat)]
let
ingerprinting = &"ingerprinting {inspectList.len}"
singleAcc = &"F{ingerprinting} single accounts lists"
accumAcc = &"F{ingerprinting} accumulated accounts"
cascAcc = &"Cascaded f{ingerprinting} accumulated accounts lists"
memBase = SnapDbRef.init(newMemoryDB())
dbSlot = proc(n: int): SnapDbRef =
if 2+n < nTestDbInstances and not db.cdb[2+n].rocksStoreRef.isNil:
return SnapDbRef.init(db.cdb[2+n])
test &"{singleAcc} for in-memory-db":
inspectList.test_inspectSingleAccountsMemDb(memBase, singleStats)
test &"{singleAcc} for persistent db":
if persistent:
inspectList.test_inspectSingleAccountsPersistent(dbSlot, singleStats)
else:
skip()
test &"{accumAcc} for in-memory-db":
inspectList.test_inspectAccountsInMemDb(memBase, accuStats)
test &"{accumAcc} for persistent db":
if persistent:
inspectList.test_inspectAccountsPersistent(db.cdb[0], accuStats)
else:
skip()
test &"{cascAcc} for in-memory-db":
if cascaded:
inspectList.test_inspectCascadedMemDb()
else:
skip()
test &"{cascAcc} for persistent db":
if cascaded and persistent:
inspectList.test_inspectCascadedPersistent(db.cdb[1])
else:
skip()
# ------------------------------------------------------------------------------
# Test Runners: database timing tests
# ------------------------------------------------------------------------------
proc importRunner(noisy = true; persistent = true; capture = bChainCapture) =
let
fileInfo = capture.file.splitFile.name.split(".")[0]
filePath = capture.file.findFilePath(baseDir,repoDir).value
tmpDir = getTmpDir()
db = tmpDir.testDbs(capture.name & "-import", instances=1, persistent)
numBlocks = capture.numBlocks
numBlocksInfo = if numBlocks == high(int): "" else: $numBlocks & " "
loadNoise = noisy
defer:
db.flushDbs
suite &"SyncSnap: using {fileInfo} capture for testing db timings":
var ddb: CommonRef # perstent DB on disk
test &"Create persistent ChainDBRef on {tmpDir}":
ddb = CommonRef.new(
db = if db.persistent: db.cdb[0].trieDB else: newMemoryDB(),
networkId = capture.network,
pruneTrie = true,
params = capture.network.networkParams)
ddb.initializeEmptyDb
test &"Storing {numBlocksInfo}persistent blocks from dump":
noisy.test_dbTimingUndumpBlocks(filePath, ddb, numBlocks, loadNoise)
test "Extract key-value records into memory tables via rocksdb iterator":
if db.cdb[0].rocksStoreRef.isNil:
skip() # not persistent => db.cdb[0] is nil
else:
noisy.test_dbTimingRockySetup(xTab32, xTab33, db.cdb[0])
proc dbTimingRunner(noisy = true; persistent = true; cleanUp = true) =
let
fullNoise = false
var
emptyDb = "empty"
# Allows to repeat storing on existing data
if not xDbs.cdb[0].isNil:
emptyDb = "pre-loaded"
else:
xTmpDir = getTmpDir()
xDbs = xTmpDir.testDbs(
"timing-runner", instances=nTestDbInstances, persistent)
defer:
if cleanUp:
xDbs.flushDbs
xDbs.reset
suite &"SyncSnap: storage tests on {emptyDb} databases":
#
# `xDbs` instance slots layout:
#
# * cdb[0] -- direct db, key length 32, no transaction
# * cdb[1] -- direct db, key length 32 as 33, no transaction
#
# * cdb[2] -- direct db, key length 32, transaction based
# * cdb[3] -- direct db, key length 32 as 33, transaction based
#
# * cdb[4] -- direct db, key length 33, no transaction
# * cdb[5] -- direct db, key length 33, transaction based
#
# * cdb[6] -- rocksdb, key length 32
# * cdb[7] -- rocksdb, key length 32 as 33
# * cdb[8] -- rocksdb, key length 33
#
doAssert 9 <= nTestDbInstances
doAssert not xDbs.cdb[8].isNil
let
storeDir32 = &"Directly store {xTab32.len} records"
storeDir33 = &"Directly store {xTab33.len} records"
storeTx32 = &"Transactionally store directly {xTab32.len} records"
storeTx33 = &"Transactionally store directly {xTab33.len} records"
intoTrieDb = &"into {emptyDb} trie db"
storeRks32 = &"Store {xTab32.len} records"
storeRks33 = &"Store {xTab33.len} records"
intoRksDb = &"into {emptyDb} rocksdb table"
if xTab32.len == 0 or xTab33.len == 0:
test &"Both tables with 32 byte keys(size={xTab32.len}), " &
&"33 byte keys(size={xTab32.len}) must be non-empty":
skip()
else:
test &"{storeDir32} (key length 32) {intoTrieDb}":
noisy.test_dbTimingStoreDirect32(xTab32, xDbs.cdb[0])
test &"{storeDir32} (key length 33) {intoTrieDb}":
noisy.test_dbTimingStoreDirectly32as33(xTab32, xDbs.cdb[1])
test &"{storeTx32} (key length 32) {intoTrieDb}":
noisy.test_dbTimingStoreTx32(xTab32, xDbs.cdb[2])
test &"{storeTx32} (key length 33) {intoTrieDb}":
noisy.test_dbTimingStoreTx32as33(xTab32, xDbs.cdb[3])
test &"{storeDir33} (key length 33) {intoTrieDb}":
noisy.test_dbTimingDirect33(xTab33, xDbs.cdb[4])
test &"{storeTx33} (key length 33) {intoTrieDb}":
noisy.test_dbTimingTx33(xTab33, xDbs.cdb[5])
if xDbs.cdb[0].rocksStoreRef.isNil:
test "The rocksdb interface must be available": skip()
else:
test &"{storeRks32} (key length 32) {intoRksDb}":
noisy.test_dbTimingRocky32(xTab32, xDbs.cdb[6], fullNoise)
test &"{storeRks32} (key length 33) {intoRksDb}":
noisy.test_dbTimingRocky32as33(xTab32, xDbs.cdb[7], fullNoise)
test &"{storeRks33} (key length 33) {intoRksDb}":
noisy.test_dbTimingRocky33(xTab33, xDbs.cdb[8], fullNoise)
# ------------------------------------------------------------------------------
# Main function(s)
# ------------------------------------------------------------------------------
proc syncSnapMain*(noisy = defined(debug)) =
noisy.miscRunner()
noisy.accountsRunner(persistent=true)
noisy.accountsRunner(persistent=false)
noisy.importRunner() # small sample, just verify functionality
noisy.inspectionRunner()
noisy.dbTimingRunner()
when isMainModule:
const
noisy = defined(debug) or true
#setTraceLevel()
setErrorLevel()
# Test constant, calculations etc.
when true: # and false:
noisy.miscRunner()
# This one uses dumps from the external `nimbus-eth1-blob` repo
when true and false:
import ./test_sync_snap/snap_other_xx
noisy.showElapsed("accountsRunner()"):
for n,sam in snapOtherList:
false.accountsRunner(persistent=true, sam)
noisy.showElapsed("inspectRunner()"):
for n,sam in snapOtherHealingList:
false.inspectionRunner(persistent=true, cascaded=false, sam)
# This one usues dumps from the external `nimbus-eth1-blob` repo
when true and false:
import ./test_sync_snap/snap_storage_xx
let knownFailures: KnownStorageFailure = @[
("storages5__34__41_dump#10", @[( 508, RootNodeMismatch)]),
]
noisy.showElapsed("storageRunner()"):
for n,sam in snapStorageList:
false.storagesRunner(persistent=true, sam, knownFailures)
# This one uses readily available dumps
when true: # and false:
false.inspectionRunner()
for n,sam in snapTestList:
false.accountsRunner(persistent=false, sam)
false.accountsRunner(persistent=true, sam)
for n,sam in snapTestStorageList:
false.accountsRunner(persistent=false, sam)
false.accountsRunner(persistent=true, sam)
false.storagesRunner(persistent=true, sam)
# This one uses the readily available dump: `bulkTest0` and some huge replay
# dumps `bulkTest1`, `bulkTest2`, .. from the `nimbus-eth1-blobs` package
when true and false:
# ---- database storage timings -------
for test in @[bulkTest0] & @[bulkTest1, bulkTest2, bulkTest3]:
noisy.showElapsed("importRunner()"):
noisy.importRunner(capture = test)
noisy.showElapsed("dbTimingRunner()"):
true.dbTimingRunner(cleanUp = false)
true.dbTimingRunner()
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------