nimbus-eth1/tests/test_sync_snap.nim
Jordan Hrycaj 33023aaf39
Update snap server client test scenario (#1518)
* Redesign snap1 message GetTrieNodes argument prototypes

why:
  A list of sub-objects `seq[SnapTriePath]` is more intuitive to work with
  than an opaque definition `seq[seq[Blob]]` because the inner
  `SnapTriePath` object has a dedicated inner structure (for how to
  interpret `seq[Blob]`.)

* Collect some public constants into `constants.nim` file

* Reorg `hexary_paths.nim`

why:
+ Collecting nodes following a partial path properly ending at an
  extension node failed to collect this last node.
+ Merged the nodes collecting algorithm for persistent and in-memory
  into a single generic function `hexary_paths.rootPathExtend()`

info:
  Extracted common tasks to `hexary_nodes_helper.nim`

* Implement `StorageRanges` message handler for snap/1 protocol
2023-03-22 20:11:49 +00:00

568 lines
19 KiB
Nim

# Nimbus - Types, data structures and shared utilities used in network sync
#
# Copyright (c) 2018-2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or
# distributed except according to those terms.
## Snap sync components tester and TDD environment
import
std/[distros, os, sets, sequtils, strformat, strutils, tables],
chronicles,
eth/[common, p2p],
rocksdb,
unittest2,
../nimbus/db/select_backend,
../nimbus/core/chain,
../nimbus/sync/types,
../nimbus/sync/snap/range_desc,
../nimbus/sync/snap/worker/db/[
hexary_desc, hexary_envelope, hexary_error, hexary_inspect, hexary_nearby,
hexary_paths, rocky_bulk_load, snapdb_accounts, snapdb_debug, snapdb_desc],
./replay/[pp, undump_accounts, undump_storages],
./test_sync_snap/[
bulk_test_xx, snap_test_xx,
test_accounts, test_calc, test_helpers, test_node_range, test_inspect,
test_pivot, test_storage, test_db_timing, test_types]
const
  # Candidate root directories, probed in this order by `findFilePath()`
  baseDir = [".", "..", ".." / "..", $DirSep]

  # Candidate sub-directories, tried below each of the `baseDir` entries
  repoDir = [
    ".",
    "tests" / "replay",
    "tests" / "test_sync_snap",
    "nimbus-eth1-blobs" / "replay"]

  # Reference file for finding the database directory
  sampleDirRefFile = "sample0.txt.gz"

  # Standard test samples
  bChainCapture = bulkTest0
  accSample = snapTest0
  storSample = snapTest4

  # Number of database slots (needed for timing tests)
  nTestDbInstances = 9
type
  TestDbs = object
    ## Provide enough spare empty databases
    persistent: bool
      ## Set `true` only if on-disk databases are really in use
    dbDir: string
      ## Folder holding the numbered database instances (or a placeholder
      ## text if no on-disk databases are used)
    baseDir: string
      ## Work directory, kept for cleanup
    subDir: string
      ## Optional folder name below `<baseDir>/tmp`, kept for cleanup
    cdb: array[nTestDbInstances, ChainDb]
      ## Database slots, unused slots are left `nil`
when defined(linux):
  # The OS detection is restricted to Linux because the `detectOs(Ubuntu)`
  # directive is not Windows compatible: it causes an error when running
  # the system command `lsb_release -d` in the background.
  let isUbuntu32bit = detectOs(Ubuntu) and sizeof(int) == 4
else:
  const isUbuntu32bit = false
let
  # Persistent databases are switched off on 32 bit Ubuntu. There was a
  # problem with the Github/CI which results in spurious crashes when
  # leaving the `runner()` if the persistent ChainDBRef initialisation
  # was present, see `test_custom_network` for more details.
  disablePersistentDB = isUbuntu32bit
var
  # State shared between successive `importRunner()`/`dbTimingRunner()` calls
  xTmpDir: string                   # work directory used for `xDbs`
  xDbs: TestDbs                     # for repeated storage/overwrite tests
  xTab32: Table[ByteArray32, Blob]  # extracted data
  xTab33: Table[ByteArray33, Blob]  # ditto, for the 33 byte keys tests
# ------------------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------------------
proc findFilePath(file: string;
                  baseDir, repoDir: openArray[string]): Result[string,void] =
  ## Search for `file` in every `<base>/<repo>` folder combination. All
  ## `repoDir` entries are tried for a `baseDir` entry before moving on to
  ## the next `baseDir` entry. The first path that names an existing file
  ## is returned. If there is none, a notice is printed on the console and
  ## an error is returned.
  for base in baseDir:
    for sub in repoDir:
      let candidate = base / sub / file
      if fileExists(candidate):
        return ok(candidate)

  echo "*** File not found \"", file, "\"."
  err()
proc getTmpDir(sampleDir = sampleDirRefFile): string =
  ## Directory part of the path where the reference file `sampleDir` was
  ## found by `findFilePath()`. Note that the search result is unwrapped
  ## unconditionally, so the reference file is expected to exist.
  let refPath = findFilePath(sampleDir, baseDir, repoDir).value
  result = splitFile(refPath).dir
proc setTraceLevel {.used.} =
  ## Set the log level to `TRACE`. This does nothing unless the program was
  ## compiled with `chronicles_runtime_filtering` and logging is enabled.
  when defined(chronicles_runtime_filtering) and loggingEnabled:
    setLogLevel(LogLevel.TRACE)
  else:
    discard
proc setErrorLevel {.used.} =
  ## Set the log level to `ERROR`. This does nothing unless the program was
  ## compiled with `chronicles_runtime_filtering` and logging is enabled.
  when defined(chronicles_runtime_filtering) and loggingEnabled:
    setLogLevel(LogLevel.ERROR)
  else:
    discard
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
proc to(sample: AccountsSample; T: type seq[UndumpAccounts]): T =
  ## Convert test data into usable in-memory format
  ##
  ## The dump file of `sample` is read record by record. A record has the
  ## index `seenAccounts - 1`. Only records with an index in the range
  ## `sample.firstItem .. sample.lastItem` are collected, and collecting
  ## stops early at the first record with a state root different from the
  ## one of the record with index `sample.firstItem`.
  let dumpFile = findFilePath(sample.file, baseDir, repoDir).value
  var stateRoot: Hash256

  for rec in undumpNextAccount(dumpFile):
    let inx = rec.seenAccounts - 1

    if inx < sample.firstItem:
      continue                      # not in range, yet
    if sample.lastItem < inx:
      break                         # beyond the range

    if inx == sample.firstItem:
      stateRoot = rec.root          # reference root for all that follows
    elif rec.root != stateRoot:
      break                         # different state root => done

    result.add rec
proc to(sample: AccountsSample; T: type seq[UndumpStorages]): T =
  ## Convert test data into usable in-memory format
  ##
  ## This works the same way as the accounts variant of `to()`: storages
  ## records are selected by the accounts based index `seenAccounts - 1`
  ## which must be in the range `sample.firstItem .. sample.lastItem`.
  ## Collecting stops early at the first record with a state root different
  ## from the one of the record with index `sample.firstItem`.
  let dumpFile = findFilePath(sample.file, baseDir, repoDir).value
  var stateRoot: Hash256

  for rec in undumpNextStorages(dumpFile):
    let inx = rec.seenAccounts - 1  # storages selector based on accounts

    if inx < sample.firstItem:
      continue                      # not in range, yet
    if sample.lastItem < inx:
      break                         # beyond the range

    if inx == sample.firstItem:
      stateRoot = rec.root          # reference root for all that follows
    elif rec.root != stateRoot:
      break                         # different state root => done

    result.add rec
proc flushDbDir(s: string; subDir = "") =
  ## Best effort removal of the database instance folders located at
  ## `<s>/tmp/<n>` or, with non-empty `subDir`, at `<s>/tmp/<subDir>/<n>`.
  ## An instance folder is only removed if it contains a `nimbus/data`
  ## sub-folder. The `<s>/tmp` folder itself is removed last provided it
  ## has no entries left. Removal errors are ignored. Nothing is done at
  ## all if `s` is empty.
  if s == "":
    return

  let tmpRoot = s / "tmp"

  for n in 0 ..< nTestDbInstances:
    let instDir =
      if subDir == "": tmpRoot / $n
      else: tmpRoot / subDir / $n
    if dirExists(instDir / "nimbus" / "data"):
      # Typically under Windows: there might be stale file locks.
      try:
        removeDir(instDir)
      except CatchableError:
        discard

  try:
    removeDir(tmpRoot / subDir)
  except CatchableError:
    discard

  # Do not clear the `tmp` folder unless it is empty
  var isEmpty = true
  for entry in walkDir(tmpRoot):
    isEmpty = false
    break
  if isEmpty:
    try:
      removeDir(tmpRoot)
    except CatchableError:
      discard
proc flushDbs(db: TestDbs) =
  ## Close the rocksdb handles of the database slots and remove the
  ## database folders. Closing stops at the first slot without a rocksdb
  ## store. Nothing is done for a non-persistent descriptor.
  if not db.persistent:
    return

  for slot in db.cdb:
    if slot.rocksStoreRef.isNil:
      break
    slot.rocksStoreRef.store.db.rocksdb_close

  flushDbDir(db.baseDir, db.subDir)
proc testDbs(
    workDir: string;
    subDir: string;
    instances: int;
    persistent: bool;
      ): TestDbs =
  ## Set up a test database descriptor.
  ##
  ## On-disk databases are used only if `persistent` was requested, the
  ## `workDir` is non-empty, and persistent databases are not disabled
  ## globally (see `disablePersistentDB`.) In that case, left-over folders
  ## from a previous run are removed first, and up to `instances` new
  ## databases are created in the folders `<workDir>/tmp[/<subDir>]/<n>`.
  ##
  ## Otherwise, all the database slots are left uninitialised.
  let onDisk = persistent and workDir != "" and not disablePersistentDB

  result.persistent = onDisk
  if not onDisk:
    result.dbDir = "*notused*"
    return

  result.baseDir = workDir
  result.subDir = subDir
  result.dbDir =
    if subDir == "": workDir / "tmp"
    else: workDir / "tmp" / subDir

  # Start from scratch
  flushDbDir(workDir, subDir)
  for n in 0 ..< min(result.cdb.len, instances):
    result.cdb[n] = newChainDB(result.dbDir / $n)
proc snapDbRef(cdb: ChainDb; pers: bool): SnapDbRef =
  ## Create a snap database descriptor on top of `cdb` if `pers` is set,
  ## otherwise on top of a new in-memory database (`cdb` is ignored then.)
  if pers:
    result = SnapDbRef.init(cdb)
  else:
    result = SnapDbRef.init(newMemoryDB())
proc snapDbAccountsRef(cdb:ChainDb; root:Hash256; pers:bool):SnapDbAccountsRef =
  ## Create an accounts database descriptor for the state root `root`. The
  ## underlying database is selected by `snapDbRef()`, and a default
  ## initialised `Peer()` object serves as peer argument.
  let base = snapDbRef(cdb, pers)
  SnapDbAccountsRef.init(base, root, Peer())
# ------------------------------------------------------------------------------
# Test Runners: accounts and accounts storages
# ------------------------------------------------------------------------------
proc miscRunner(noisy = true) =
  ## Run the tests on setup, constants, and limits. These tests need no
  ## sample data. The argument `noisy` is not used here.
  suite "SyncSnap: Verify setup, constants, limits":

    test "RLP accounts list sizes":
      test_calcAccountsListSizes()

    test "RLP proofs list sizes":
      test_calcProofsListSizes()

    test "RLP en/decode GetTrieNodes argument list":
      test_calcTrieNodeTranscode()
proc accountsRunner(noisy = true; persistent = true; sample = accSample) =
  ## Run the accounts and proofs tests on the data from `sample`.
  ##
  ## Three database slots are requested: slot 0 for the import and node
  ## range tests, slot 1 for the merge/revisit/decompose tests, and slot 2
  ## for the pivot registry test. Whether the databases are really on disk
  ## is decided by `testDbs()`. The databases are flushed when leaving.
  ##
  ## With `noisy` set, the hexary database is passed on to the node range
  ## tests as debugging argument.
  let
    accLst = sample.to(seq[UndumpAccounts])
    root = accLst[0].root
    tmpDir = getTmpDir()
    db = testDbs(tmpDir, sample.name & "-accounts", instances=3, persistent)
    info =
      if db.persistent: &"persistent db on \"{db.baseDir}\""
      else: "in-memory db"
    fileInfo = sample.file.splitPath.tail.replace(".txt.gz","")

  defer:
    db.flushDbs

  suite &"SyncSnap: {fileInfo} accounts and proofs for {info}":

    block:
      # New common descriptor for this sub-group of tests
      let
        desc = db.cdb[0].snapDbAccountsRef(root, db.persistent)
        hexaDb = desc.hexaDb
        getFn = desc.getAccountFn
        dbg = if noisy: hexaDb else: nil

      test &"Proofing {accLst.len} list items for state root ..{root.pp}":
        test_accountsImport(accLst, desc, db.persistent)

        # debugging, make sure that state root ~ "$0"
        hexaDb.assignPrettyKeys(root.to(NodeKey))

        # Beware: dumping a large database is not recommended
        # true.say "***", "database dump\n ", hexaDb.pp(root.to(NodeKey))

      test &"Retrieve accounts & proofs for previous account ranges":
        # Persistent data are accessed via getter function
        if db.persistent:
          test_NodeRangeProof(accLst, getFn, dbg)
        else:
          test_NodeRangeProof(accLst, hexaDb, dbg)

      test &"Verify left boundary checks":
        if db.persistent:
          test_NodeRangeLeftBoundary(accLst, getFn, dbg)
        else:
          test_NodeRangeLeftBoundary(accLst, hexaDb, dbg)

    block:
      # List of keys to be shared by sub-group
      var accKeys: seq[NodeKey]

      # New common descriptor for this sub-group of tests
      let desc = db.cdb[1].snapDbAccountsRef(root, db.persistent)

      test &"Merging {accLst.len} accounts/proofs lists into single list":
        test_accountsMergeProofs(accLst, desc, accKeys) # set up `accKeys`

      test &"Revisiting {accKeys.len} stored items on ChainDBRef":
        test_accountsRevisitStoredItems(accKeys, desc, noisy)

      test &"Decompose path prefix envelopes on {info}":
        let hexaDb = desc.hexaDb
        if db.persistent:
          test_NodeRangeDecompose(accKeys, root, desc.getAccountFn, hexaDb)
        else:
          test_NodeRangeDecompose(accKeys, root, hexaDb, hexaDb)

      # This one works with a new clean database in order to avoid some
      # problems on observed qemu/Win7.
      test &"Storing/retrieving {accKeys.len} stored items " &
          "on persistent pivot/checkpoint registry":
        if db.persistent:
          test_pivotStoreRead(accKeys, db.cdb[2])
        else:
          skip()
proc storagesRunner(
    noisy = true;
    persistent = true;
    sample = storSample;
    knownFailures: seq[(string,seq[(int,HexaryError)])] = @[]) =
  ## Run the accounts storage tests on the data from `sample`: import the
  ## accounts, import the storage slots lists, and inspect the resulting
  ## storage sub-tries. All tests share the single database slot 0.
  ##
  ## The list `knownFailures` is passed on to the storage tests together
  ## with the sample file name (less the `.txt.gz` extension) as
  ## identifier. The argument `noisy` is not used here.
  let
    accLst = sample.to(seq[UndumpAccounts])
    stoLst = sample.to(seq[UndumpStorages])
    tmpDir = getTmpDir()
    db = testDbs(tmpDir, sample.name & "-storages", instances=1, persistent)
    info = if db.persistent: &"persistent db" else: "in-memory db"
    idPfx = sample.file.splitPath.tail.replace(".txt.gz","")

  defer:
    db.flushDbs

  suite &"SyncSnap: {idPfx} accounts storage for {info}":
    # Common database descriptor shared by all tests of this suite
    let xdb = db.cdb[0].snapDbRef(db.persistent)

    test &"Merging {accLst.len} accounts for state root ..{accLst[0].root.pp}":
      test_storageAccountsImport(accLst, xdb, db.persistent)

    test &"Merging {stoLst.len} storages lists":
      test_storageSlotsImport(stoLst, xdb, db.persistent, knownFailures, idPfx)

    test &"Inspecting {stoLst.len} imported storages lists sub-tries":
      test_storageSlotsTries(stoLst, xdb, db.persistent, knownFailures, idPfx)
proc inspectionRunner(
    noisy = true;
    persistent = true;
    cascaded = true;
    sample: openArray[AccountsSample] = snapTestList) =
  ## Run the trie inspection (fingerprinting) tests on the accounts lists
  ## from `sample`, for in-memory and for persistent databases.
  ##
  ## Database slots layout: slot 0 is used for the accumulated accounts
  ## test, slot 1 for the cascaded test, and the slots 2, 3, .. are handed
  ## out by `dbSlot()` for the single accounts lists test.
  ##
  ## The persistent database tests are skipped unless on-disk databases
  ## are really available, i.e. `db.persistent` is set. Testing against
  ## the requested `persistent` argument is not sufficient, as `testDbs()`
  ## leaves all database slots uninitialised when persistent databases
  ## are disabled globally. The cascaded tests are skipped unless
  ## `cascaded` is set. The argument `noisy` is not used here.
  let
    inspectList = sample.mapIt(it.to(seq[UndumpAccounts]))
    tmpDir = getTmpDir()
    db = tmpDir.testDbs(
      sample[0].name & "-inspection", instances=nTestDbInstances, persistent)
    info = if db.persistent: &"persistent db" else: "in-memory db"
    fileInfo = "[" & sample[0].file.splitPath.tail.replace(".txt.gz","") & "..]"

  defer:
    db.flushDbs

  suite &"SyncSnap: inspect {fileInfo} lists for {info} for healing":
    var
      # Statistics shared between in-memory and persistent test variants
      singleStats: seq[(int,TrieNodeStat)]
      accuStats: seq[(int,TrieNodeStat)]
    let
      # Common text fragment, the leading `F` or `f` is added when used
      ingerprinting = &"ingerprinting {inspectList.len}"
      singleAcc = &"F{ingerprinting} single accounts lists"
      accumAcc = &"F{ingerprinting} accumulated accounts"
      cascAcc = &"Cascaded f{ingerprinting} accumulated accounts lists"

      memBase = SnapDbRef.init(newMemoryDB())

      # Descriptor for database slot `2+n`, or `nil` if not available
      dbSlot = proc(n: int): SnapDbRef =
        if 2+n < nTestDbInstances and not db.cdb[2+n].rocksStoreRef.isNil:
          return SnapDbRef.init(db.cdb[2+n])

    test &"{singleAcc} for in-memory-db":
      inspectList.test_inspectSingleAccountsMemDb(memBase, singleStats)

    test &"{singleAcc} for persistent db":
      if db.persistent:
        inspectList.test_inspectSingleAccountsPersistent(dbSlot, singleStats)
      else:
        skip()

    test &"{accumAcc} for in-memory-db":
      inspectList.test_inspectAccountsInMemDb(memBase, accuStats)

    test &"{accumAcc} for persistent db":
      if db.persistent:
        inspectList.test_inspectAccountsPersistent(db.cdb[0], accuStats)
      else:
        skip()

    test &"{cascAcc} for in-memory-db":
      if cascaded:
        inspectList.test_inspectCascadedMemDb()
      else:
        skip()

    test &"{cascAcc} for persistent db":
      if cascaded and db.persistent:
        inspectList.test_inspectCascadedPersistent(db.cdb[1])
      else:
        skip()
# ------------------------------------------------------------------------------
# Test Runners: database timing tests
# ------------------------------------------------------------------------------
proc importRunner(noisy = true; persistent = true; capture = bChainCapture) =
  ## Import the blocks from the `capture` dump file into a new database,
  ## then extract the key-value records of that database into the global
  ## tables `xTab32` and `xTab33` for use by `dbTimingRunner()`.
  ##
  ## The extraction step needs the rocksdb backend. It is skipped if
  ## database slot 0 has no rocksdb store.
  let
    fileInfo = capture.file.splitFile.name.split(".")[0]
    filePath = findFilePath(capture.file, baseDir, repoDir).value
    tmpDir = getTmpDir()
    db = testDbs(tmpDir, capture.name & "-import", instances=1, persistent)
    numBlocks = capture.numBlocks
    numBlocksInfo = if numBlocks == high(int): "" else: $numBlocks & " "
    loadNoise = noisy

  defer:
    db.flushDbs

  suite &"SyncSnap: using {fileInfo} capture for testing db timings":
    var ddb: CommonRef         # persistent DB on disk

    test &"Create persistent ChainDBRef on {tmpDir}":
      ddb = CommonRef.new(
        db = if db.persistent: db.cdb[0].trieDB else: newMemoryDB(),
        networkId = capture.network,
        pruneTrie = true,
        params = capture.network.networkParams)
      ddb.initializeEmptyDb

    test &"Storing {numBlocksInfo}persistent blocks from dump":
      test_dbTimingUndumpBlocks(noisy, filePath, ddb, numBlocks, loadNoise)

    test "Extract key-value records into memory tables via rocksdb iterator":
      if db.cdb[0].rocksStoreRef.isNil:
        skip()                 # not persistent => db.cdb[0] is nil
      else:
        test_dbTimingRockySetup(noisy, xTab32, xTab33, db.cdb[0])
proc dbTimingRunner(noisy = true; persistent = true; cleanUp = true) =
  ## Run the database storage timing tests using the records from the
  ## global tables `xTab32` and `xTab33` (as set up by `importRunner()`.)
  ##
  ## The databases are kept in the global descriptor `xDbs`. If slot 0 of
  ## that descriptor is initialised already, the databases are re-used as
  ## they are ("pre-loaded"), which allows to repeat storing on existing
  ## data. Otherwise a new set of databases is created. With `cleanUp`
  ## set, the databases are flushed and `xDbs` is reset when leaving,
  ## otherwise they are left over for the next run.
  ##
  ## Note that the suite asserts that slot 8 is initialised, which is the
  ## case only if `testDbs()` really provides on-disk databases.
  let
    fullNoise = false
    # Allows to repeat storing on existing data
    preLoaded = not xDbs.cdb[0].isNil

  if not preLoaded:
    xTmpDir = getTmpDir()
    xDbs = xTmpDir.testDbs(
      "timing-runner", instances=nTestDbInstances, persistent)

  let emptyDb = if preLoaded: "pre-loaded" else: "empty"

  defer:
    if cleanUp:
      xDbs.flushDbs
      xDbs.reset

  suite &"SyncSnap: storage tests on {emptyDb} databases":
    #
    # `xDbs` instance slots layout:
    #
    # * cdb[0] -- direct db, key length 32, no transaction
    # * cdb[1] -- direct db, key length 32 as 33, no transaction
    #
    # * cdb[2] -- direct db, key length 32, transaction based
    # * cdb[3] -- direct db, key length 32 as 33, transaction based
    #
    # * cdb[4] -- direct db, key length 33, no transaction
    # * cdb[5] -- direct db, key length 33, transaction based
    #
    # * cdb[6] -- rocksdb, key length 32
    # * cdb[7] -- rocksdb, key length 32 as 33
    # * cdb[8] -- rocksdb, key length 33
    #
    doAssert 9 <= nTestDbInstances
    doAssert not xDbs.cdb[8].isNil

    let
      storeDir32 = &"Directly store {xTab32.len} records"
      storeDir33 = &"Directly store {xTab33.len} records"
      storeTx32 = &"Transactionally store directly {xTab32.len} records"
      storeTx33 = &"Transactionally store directly {xTab33.len} records"
      intoTrieDb = &"into {emptyDb} trie db"

      storeRks32 = &"Store {xTab32.len} records"
      storeRks33 = &"Store {xTab33.len} records"
      intoRksDb = &"into {emptyDb} rocksdb table"

    if xTab32.len == 0 or xTab33.len == 0:
      # Nothing to store: report the size of each table in the test title
      test &"Both tables with 32 byte keys(size={xTab32.len}), " &
          &"33 byte keys(size={xTab33.len}) must be non-empty":
        skip()
    else:
      test &"{storeDir32} (key length 32) {intoTrieDb}":
        noisy.test_dbTimingStoreDirect32(xTab32, xDbs.cdb[0])

      test &"{storeDir32} (key length 33) {intoTrieDb}":
        noisy.test_dbTimingStoreDirectly32as33(xTab32, xDbs.cdb[1])

      test &"{storeTx32} (key length 32) {intoTrieDb}":
        noisy.test_dbTimingStoreTx32(xTab32, xDbs.cdb[2])

      test &"{storeTx32} (key length 33) {intoTrieDb}":
        noisy.test_dbTimingStoreTx32as33(xTab32, xDbs.cdb[3])

      test &"{storeDir33} (key length 33) {intoTrieDb}":
        noisy.test_dbTimingDirect33(xTab33, xDbs.cdb[4])

      test &"{storeTx33} (key length 33) {intoTrieDb}":
        noisy.test_dbTimingTx33(xTab33, xDbs.cdb[5])

      if xDbs.cdb[0].rocksStoreRef.isNil:
        test "The rocksdb interface must be available": skip()
      else:
        test &"{storeRks32} (key length 32) {intoRksDb}":
          noisy.test_dbTimingRocky32(xTab32, xDbs.cdb[6], fullNoise)

        test &"{storeRks32} (key length 33) {intoRksDb}":
          noisy.test_dbTimingRocky32as33(xTab32, xDbs.cdb[7], fullNoise)

        test &"{storeRks33} (key length 33) {intoRksDb}":
          noisy.test_dbTimingRocky33(xTab33, xDbs.cdb[8], fullNoise)
# ------------------------------------------------------------------------------
# Main function(s)
# ------------------------------------------------------------------------------
proc syncSnapMain*(noisy = defined(debug)) =
  ## Run the standard selection of test runners with their default
  ## samples. The accounts runner is run twice, for persistent and for
  ## in-memory databases.
  miscRunner(noisy)
  accountsRunner(noisy, persistent=true)
  accountsRunner(noisy, persistent=false)
  importRunner(noisy)            # small sample, just verify functionality
  inspectionRunner(noisy)
  dbTimingRunner(noisy)
when isMainModule:
  # Stand-alone test driver. The sections below are switched on or off at
  # compile time by editing the `when` conditions (e.g. by commenting out
  # the `and false` part.)
  const
    noisy = defined(debug) or true

  #setTraceLevel()
  setErrorLevel()

  # Test constants, calculations etc.
  when true: # and false:
    miscRunner(noisy)

  # This one uses dumps from the external `nimbus-eth1-blob` repo
  when true and false:
    import ./test_sync_snap/snap_other_xx
    noisy.showElapsed("accountsRunner()"):
      for n, sam in snapOtherList:
        accountsRunner(false, persistent=true, sam)
    noisy.showElapsed("inspectRunner()"):
      for n, sam in snapOtherHealingList:
        inspectionRunner(false, persistent=true, cascaded=false, sam)

  # This one uses dumps from the external `nimbus-eth1-blob` repo
  when true and false:
    import ./test_sync_snap/snap_storage_xx
    let knownFailures: KnownStorageFailure = @[
      ("storages5__34__41_dump#10", @[(508, RootNodeMismatch)]),
    ]
    noisy.showElapsed("storageRunner()"):
      for n, sam in snapStorageList:
        storagesRunner(false, persistent=true, sam, knownFailures)

  # This one uses readily available dumps
  when true: # and false:
    inspectionRunner(false)
    for n, sam in snapTestList:
      accountsRunner(false, persistent=false, sam)
      accountsRunner(false, persistent=true, sam)
    for n, sam in snapTestStorageList:
      accountsRunner(false, persistent=false, sam)
      accountsRunner(false, persistent=true, sam)
      storagesRunner(false, persistent=true, sam)

  # This one uses the readily available dump: `bulkTest0` and some huge replay
  # dumps `bulkTest1`, `bulkTest2`, .. from the `nimbus-eth1-blobs` package.
  # For specs see `tests/test_sync_snap/bulk_test_xx.nim`.
  when true and false:
    # ---- database storage timings -------

    for test in @[bulkTest0] & @[bulkTest1, bulkTest2, bulkTest3]:
      noisy.showElapsed("importRunner()"):
        importRunner(noisy, capture = test)

      # First run on empty databases, second run on the pre-loaded ones
      noisy.showElapsed("dbTimingRunner()"):
        dbTimingRunner(true, cleanUp = false)
        dbTimingRunner(true)
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------