diff --git a/nimbus/sync/snap/worker/db/snapdb_desc.nim b/nimbus/sync/snap/worker/db/snapdb_desc.nim
index 580a03744..1a8e0242a 100644
--- a/nimbus/sync/snap/worker/db/snapdb_desc.nim
+++ b/nimbus/sync/snap/worker/db/snapdb_desc.nim
@@ -323,7 +323,31 @@ proc dumpPath*(ps: SnapDbBaseRef; key: NodeTag): seq[string] =
     result = rPath.path.mapIt(it.pp(ps.hexaDb)) & @["(" & rPath.tail.pp & ")"]
 
 proc dumpHexaDB*(ps: SnapDbBaseRef; indent = 4): string =
-  ## Dump the entries from the a generic accounts trie.
+  ## Dump the entries from a generic accounts trie. These are
+  ## key value pairs for
+  ## ::
+  ##   Branch:    ($1,b(<$2,$3,..,$17>,))
+  ##   Extension: ($18,e(832b5e..06e697,$19))
+  ##   Leaf:      ($20,l(cc9b5d..1c3b4,f84401..f9e5129d[#70]))
+  ##
+  ## where keys are typically represented as `$` or `¶` or `ø`
+  ## depending on whether a key is final (`$`), temporary (`¶`)
+  ## or unset/missing (`ø`).
+  ##
+  ## The node types are indicated by a letter after the first key before
+  ## the round brackets
+  ## ::
+  ##   Branch:    'b', 'þ', or 'B'
+  ##   Extension: 'e', '€', or 'E'
+  ##   Leaf:      'l', 'ł', or 'L'
+  ##
+  ## Here a small letter indicates a `Static` node which was from the
+  ## original `proofs` list, a capital letter indicates a `Mutable` node
+  ## added on the fly which might need some change, and the decorated
+  ## letters stand for `Locked` nodes which are like `Static` ones but
+  ## added later (typically these nodes are updated `Mutable` nodes.)
+  ##
+  ## Beware: dumping a large database is not recommended
   ps.hexaDb.pp(ps.root,indent)
 
 proc hexaryPpFn*(ps: SnapDbBaseRef): HexaryPpFn =
diff --git a/tests/test_sync_snap.nim b/tests/test_sync_snap.nim
index 680faedad..72ed007f7 100644
--- a/tests/test_sync_snap.nim
+++ b/tests/test_sync_snap.nim
@@ -12,27 +12,23 @@
 ## Snap sync components tester and TDD environment
 
 import
-  std/[algorithm, distros, hashes, math, os, sets,
-    sequtils, strformat, strutils, tables, times],
+  std/[distros, os, sets, sequtils, strformat, strutils, tables],
   chronicles,
-  eth/[common, p2p, rlp],
-  eth/trie/[nibbles],
+  eth/[common, p2p],
   rocksdb,
-  stint,
-  stew/[byteutils, interval_set, results],
   unittest2,
-  ../nimbus/common/common as nimbus_common, # avoid name clash
-  ../nimbus/db/[select_backend, storage_types],
+  ../nimbus/db/select_backend,
   ../nimbus/core/chain,
   ../nimbus/sync/types,
   ../nimbus/sync/snap/range_desc,
   ../nimbus/sync/snap/worker/db/[
     hexary_desc, hexary_envelope, hexary_error, hexary_inspect, hexary_nearby,
-    hexary_paths, rocky_bulk_load, snapdb_accounts, snapdb_desc, snapdb_pivot,
-    snapdb_storage_slots],
-  ../nimbus/utils/prettify,
-  ./replay/[pp, undump_blocks, undump_accounts, undump_storages],
-  ./test_sync_snap/[bulk_test_xx, snap_test_xx, test_decompose, test_types]
+    hexary_paths, rocky_bulk_load, snapdb_accounts, snapdb_desc],
+  ./replay/[pp, undump_accounts, undump_storages],
+  ./test_sync_snap/[
+    bulk_test_xx, snap_test_xx,
+    test_accounts, test_node_range, test_inspect, test_pivot, test_storage,
+    test_db_timing, test_types]
 
 const
   baseDir = [".", "..", ".."/"..", $DirSep]
@@ -67,7 +63,6 @@ else:
 let
   # Forces `check()` to print the error (as opposed when using `isOk()`)
   OkHexDb = Result[void,HexaryError].ok()
-  OkStoDb = Result[void,seq[(int,HexaryError)]].ok()
 
 # There was a problem with the Github/CI which results in spurious crashes
 # when leaving the `runner()` if the persistent ChainDBRef initialisation
@@ -79,30 +74,11 @@ var
   xDbs: TestDbs                   # for repeated storage/overwrite tests
   xTab32: Table[ByteArray32,Blob] # extracted data
   xTab33: 
Table[ByteArray33,Blob] - xVal32Sum, xVal32SqSum: float # statistics - xVal33Sum, xVal33SqSum: float # ------------------------------------------------------------------------------ # Helpers # ------------------------------------------------------------------------------ -proc isOk(rc: ValidationResult): bool = - rc == ValidationResult.OK - -proc isImportOk(rc: Result[SnapAccountsGaps,HexaryError]): bool = - if rc.isErr: - check rc.error == NothingSerious # prints an error if different - elif 0 < rc.value.innerGaps.len: - check rc.value.innerGaps == seq[NodeSpecs].default - else: - return true - -proc toStoDbRc(r: seq[HexaryNodeReport]): Result[void,seq[(int,HexaryError)]]= - ## Kludge: map error report to (older version) return code - if r.len != 0: - return err(r.mapIt((it.slot.get(otherwise = -1),it.error))) - ok() - proc findFilePath(file: string; baseDir, repoDir: openArray[string]): Result[string,void] = for dir in baseDir: @@ -116,31 +92,6 @@ proc findFilePath(file: string; proc getTmpDir(sampleDir = sampleDirRefFile): string = sampleDir.findFilePath(baseDir,repoDir).value.splitFile.dir -proc pp(d: Duration): string = - if 40 < d.inSeconds: - d.ppMins - elif 200 < d.inMilliseconds: - d.ppSecs - elif 200 < d.inMicroseconds: - d.ppMs - else: - d.ppUs - -proc pp(rc: Result[Account,HexaryError]): string = - if rc.isErr: $rc.error else: rc.value.pp - -proc pp(rc: Result[Hash256,HexaryError]): string = - if rc.isErr: $rc.error else: $rc.value.to(NodeTag) - -proc pp(rc: Result[TrieNodeStat,HexaryError]; db: SnapDbBaseRef): string = - if rc.isErr: $rc.error else: rc.value.pp(db.hexaDb) - -proc pp(a: NodeKey; collapse = true): string = - a.to(Hash256).pp(collapse) - -proc ppKvPc(w: openArray[(string,int)]): string = - w.mapIt(&"{it[0]}={it[1]}%").join(", ") - proc say*(noisy = false; pfx = "***"; args: varargs[string, `$`]) = if noisy: if args.len == 0: @@ -196,34 +147,6 @@ proc to(sample: AccountsSample; T: type seq[UndumpStorages]): T = break result.add w -proc to(b: openArray[byte]; T: type ByteArray32): T = - ## Convert to other representation (or exception) - if b.len == 32: - (addr result[0]).copyMem(unsafeAddr b[0], 32) - else: - doAssert b.len == 32 - -proc to(b: openArray[byte]; T: type ByteArray33): T = - ## Convert to other representation (or exception) - if b.len == 33: - (addr result[0]).copyMem(unsafeAddr b[0], 33) - else: - doAssert b.len == 33 - -proc to(b: ByteArray32|ByteArray33; T: type Blob): T = - b.toSeq - -proc to(b: openArray[byte]; T: type NodeTag): T = - ## Convert from serialised equivalent - UInt256.fromBytesBE(b).T - -proc to(w: (byte, NodeTag); T: type Blob): T = - let (b,t) = w - @[b] & toSeq(t.UInt256.toBytesBE) - -proc to(t: NodeTag; T: type Blob): T = - toSeq(t.UInt256.toBytesBE) - proc flushDbDir(s: string; subDir = "") = if s != "": let baseDir = s / "tmp" @@ -256,25 +179,11 @@ proc testDbs(workDir = ""; subDir = ""; instances = nTestDbInstances): TestDbs = proc lastTwo(a: openArray[string]): seq[string] = if 1 < a.len: @[a[^2],a[^1]] else: a.toSeq -proc flatten(list: openArray[seq[Blob]]): seq[Blob] = - for w in list: - result.add w +proc snapDbRef(cdb: ChainDb; pers: bool): SnapDbRef = + if pers: SnapDbRef.init(cdb) else: SnapDbRef.init(newMemoryDB()) -proc thisRecord(r: rocksdb_iterator_t): (Blob,Blob) = - var kLen, vLen: csize_t - let - kData = r.rocksdb_iter_key(addr kLen) - vData = r.rocksdb_iter_value(addr vLen) - if not kData.isNil and not vData.isNil: - let - key = string.fromBytes(toOpenArrayByte(kData,0,int(kLen)-1)) - value = 
string.fromBytes(toOpenArrayByte(vData,0,int(vLen)-1))
-    return (key.mapIt(it.byte),value.mapIt(it.byte))
-
-proc meanStdDev(sum, sqSum: float; length: int): (float,float) =
-  if 0 < length:
-    result[0] = sum / length.float
-    result[1] = sqrt(sqSum / length.float - result[0] * result[0])
+proc snapDbAccountsRef(cdb:ChainDb; root:Hash256; pers:bool):SnapDbAccountsRef =
+  SnapDbAccountsRef.init(cdb.snapDbRef(pers), root, Peer())
 
 # ------------------------------------------------------------------------------
 # Test Runners: accounts and accounts storages
 # ------------------------------------------------------------------------------
@@ -283,8 +192,8 @@ proc meanStdDev(sum, sqSum: float; length: int): (float,float) =
 proc accountsRunner(noisy = true; persistent = true; sample = accSample) =
   let
     peer = Peer.new
-    accountsList = sample.to(seq[UndumpAccounts])
-    root = accountsList[0].root
+    accLst = sample.to(seq[UndumpAccounts])
+    root = accLst[0].root
     tmpDir = getTmpDir()
     db = if persistent: tmpDir.testDbs(sample.name, instances=2) else: testDbs()
     dbDir = db.dbDir.split($DirSep).lastTwo.join($DirSep)
@@ -300,170 +209,41 @@ proc accountsRunner(noisy = true; persistent = true; sample = accSample) =
       tmpDir.flushDbDir(sample.name)
 
   suite &"SyncSnap: {fileInfo} accounts and proofs for {info}":
-    var
-      desc: SnapDbAccountsRef
-      accKeys: seq[NodeKey]
+    test &"Proofing {accLst.len} items for state root ..{root.pp}":
+      let desc = db.cdb[0].snapDbAccountsRef(root, db.persistent)
+      accLst.test_accountsImport(desc, db.persistent)
 
-    test &"Snap-proofing {accountsList.len} items for state root ..{root.pp}":
+    var accKeys: seq[NodeKey]
+
+    block:
       let
-        dbBase = if persistent: SnapDbRef.init(db.cdb[0])
-                 else: SnapDbRef.init(newMemoryDB())
-        dbDesc = SnapDbAccountsRef.init(dbBase, root, peer)
-      for n,w in accountsList:
-        check dbDesc.importAccounts(w.base, w.data, persistent).isImportOk
+        # Common descriptor for this group of tests
+        desc = db.cdb[1].snapDbAccountsRef(root, db.persistent)
 
-    test &"Merging {accountsList.len} proofs for state root ..{root.pp}":
-      let dbBase = if persistent: SnapDbRef.init(db.cdb[1])
-                   else: SnapDbRef.init(newMemoryDB())
-        desc = SnapDbAccountsRef.init(dbBase, root, peer)
+        # Database abstractions
+        getFn = desc.getAccountFn # persistent case
+        hexaDB = desc.hexaDB      # in-memory and debugging setup
 
-      # Load/accumulate data from several samples (needs some particular sort)
-      let baseTag = accountsList.mapIt(it.base).sortMerge
-      let packed = PackedAccountRange(
-        accounts: accountsList.mapIt(it.data.accounts).sortMerge,
-        proof: accountsList.mapIt(it.data.proof).flatten)
-      # Merging intervals will produce gaps, so the result is expected OK but
-      # different from `.isImportOk`
-      check desc.importAccounts(baseTag, packed, true).isOk
+      test &"Merging {accLst.len} proofs for state root ..{root.pp}":
+        accLst.test_accountsMergeProofs(desc, accKeys) # set up `accKeys`
 
-      # check desc.merge(lowerBound, accounts) == OkHexDb
-      desc.assignPrettyKeys() # for debugging, make sure that state root ~ "$0"
+      test &"Revisiting {accKeys.len} stored items on ChainDBRef":
+        accKeys.test_accountsRevisitStoredItems(desc, noisy)
+        # Beware: dumping a large database is not recommended
+        # true.say "***", "database dump\n    ", desc.dumpHexaDB()
 
-      # Update list of accounts. There might be additional accounts in the set
-      # of proof nodes, typically before the `lowerBound` of each block. As
-      # there is a list of account ranges (that were merged for testing), one
-      # need to check for additional records only on either end of a range. 
- var keySet = packed.accounts.mapIt(it.accKey).toHashSet - for w in accountsList: - var key = desc.prevAccountsChainDbKey(w.data.accounts[0].accKey) - while key.isOk and key.value notin keySet: - keySet.incl key.value - let newKey = desc.prevAccountsChainDbKey(key.value) - check newKey != key - key = newKey - key = desc.nextAccountsChainDbKey(w.data.accounts[^1].accKey) - while key.isOk and key.value notin keySet: - keySet.incl key.value - let newKey = desc.nextAccountsChainDbKey(key.value) - check newKey != key - key = newKey - accKeys = toSeq(keySet).mapIt(it.to(NodeTag)).sorted(cmp) - .mapIt(it.to(NodeKey)) - check packed.accounts.len <= accKeys.len - - test &"Revisiting {accKeys.len} items stored items on ChainDBRef": - var - nextAccount = accKeys[0] - prevAccount: NodeKey - count = 0 - for accKey in accKeys: - count.inc - let - pfx = $count & "#" - byChainDB = desc.getAccountsChainDb(accKey) - byNextKey = desc.nextAccountsChainDbKey(accKey) - byPrevKey = desc.prevAccountsChainDbKey(accKey) - noisy.say "*** find", - "<", count, "> byChainDb=", byChainDB.pp - check byChainDB.isOk - - # Check `next` traversal funcionality. If `byNextKey.isOk` fails, the - # `nextAccount` value is still the old one and will be different from - # the account in the next for-loop cycle (if any.) - check pfx & accKey.pp(false) == pfx & nextAccount.pp(false) - if byNextKey.isOk: - nextAccount = byNextKey.get(otherwise = NodeKey.default) - - # Check `prev` traversal funcionality - if prevAccount != NodeKey.default: - check byPrevKey.isOk - if byPrevKey.isOk: - check pfx & byPrevKey.value.pp(false) == pfx & prevAccount.pp(false) - prevAccount = accKey - - # Hexary trie memory database dump. These are key value pairs for - # :: - # Branch: ($1,b(<$2,$3,..,$17>,)) - # Extension: ($18,e(832b5e..06e697,$19)) - # Leaf: ($20,l(cc9b5d..1c3b4,f84401..f9e5129d[#70])) - # - # where keys are typically represented as `$` or `¶` or `ø` - # depending on whether a key is final (`$`), temporary (`¶`) - # or unset/missing (`ø`). - # - # The node types are indicated by a letter after the first key before - # the round brackets - # :: - # Branch: 'b', 'þ', or 'B' - # Extension: 'e', '€', or 'E' - # Leaf: 'l', 'ł', or 'L' - # - # Here a small letter indicates a `Static` node which was from the - # original `proofs` list, a capital letter indicates a `Mutable` node - # added on the fly which might need some change, and the decorated - # letters stand for `Locked` nodes which are like `Static` ones but - # added later (typically these nodes are update `Mutable` nodes.) 
- # - # Beware: dumping a large database is not recommended - #true.say "***", "database dump\n ", desc.dumpHexaDB() - - test &"Decompose path prefix envelopes on {info}": - if db.persistent: - # Store accounts persistent accounts DB - accKeys.test_decompose(root.to(NodeKey), desc.getAccountFn, desc.hexaDB) - else: - accKeys.test_decompose(root.to(NodeKey), desc.hexaDB, desc.hexaDB) + test &"Decompose path prefix envelopes on {info}": + if db.persistent: + accKeys.test_NodeRangeDecompose(root, getFn, hexaDB) + else: + accKeys.test_NodeRangeDecompose(root, hexaDB, hexaDB) test &"Storing/retrieving {accKeys.len} items " & "on persistent pivot/checkpoint registry": - if not persistent: - skip() + if db.persistent: + accKeys.test_pivotStoreRead(db.cdb[0]) else: - let - dbBase = SnapDbRef.init(db.cdb[0]) - processed = @[(1.to(NodeTag),2.to(NodeTag)), - (4.to(NodeTag),5.to(NodeTag)), - (6.to(NodeTag),7.to(NodeTag))] - slotAccounts = seq[NodeKey].default - for n,w in accKeys: - check dbBase.savePivot( - SnapDbPivotRegistry( - header: BlockHeader(stateRoot: w.to(Hash256)), - nAccounts: n.uint64, - nSlotLists: n.uint64, - processed: processed, - slotAccounts: slotAccounts)).isOk - # verify latest state root - block: - let rc = dbBase.recoverPivot() - check rc.isOk - if rc.isOk: - check rc.value.nAccounts == n.uint64 - check rc.value.nSlotLists == n.uint64 - check rc.value.processed == processed - for n,w in accKeys: - block: - let rc = dbBase.recoverPivot(w) - check rc.isOk - if rc.isOk: - check rc.value.nAccounts == n.uint64 - check rc.value.nSlotLists == n.uint64 - # Update record in place - check dbBase.savePivot( - SnapDbPivotRegistry( - header: BlockHeader(stateRoot: w.to(Hash256)), - nAccounts: n.uint64, - nSlotLists: 0, - processed: @[], - slotAccounts: @[])).isOk - block: - let rc = dbBase.recoverPivot(w) - check rc.isOk - if rc.isOk: - check rc.value.nAccounts == n.uint64 - check rc.value.nSlotLists == 0 - check rc.value.processed == seq[(NodeTag,NodeTag)].default - + skip() proc storagesRunner( noisy = true; @@ -471,16 +251,14 @@ proc storagesRunner( sample = storSample; knownFailures: seq[(string,seq[(int,HexaryError)])] = @[]) = let - peer = Peer.new - accountsList = sample.to(seq[UndumpAccounts]) - storagesList = sample.to(seq[UndumpStorages]) - root = accountsList[0].root + accLst = sample.to(seq[UndumpAccounts]) + stoLst = sample.to(seq[UndumpStorages]) tmpDir = getTmpDir() db = if persistent: tmpDir.testDbs(sample.name, instances=1) else: testDbs() dbDir = db.dbDir.split($DirSep).lastTwo.join($DirSep) info = if db.persistent: &"persistent db on \"{dbDir}\"" else: "in-memory db" - fileInfo = sample.file.splitPath.tail.replace(".txt.gz","") + idPfx = sample.file.splitPath.tail.replace(".txt.gz","") defer: if db.persistent: @@ -488,48 +266,17 @@ proc storagesRunner( db.cdb[0].rocksStoreRef.store.db.rocksdb_close tmpDir.flushDbDir(sample.name) - suite &"SyncSnap: {fileInfo} accounts storage for {info}": - let - dbBase = if persistent: SnapDbRef.init(db.cdb[0]) - else: SnapDbRef.init(newMemoryDB()) + suite &"SyncSnap: {idPfx} accounts storage for {info}": + let xdb = db.cdb[0].snapDbRef(db.persistent) - test &"Merging {accountsList.len} accounts for state root ..{root.pp}": - for w in accountsList: - let desc = SnapDbAccountsRef.init(dbBase, root, peer) - check desc.importAccounts(w.base, w.data, persistent).isImportOk + test &"Merging {accLst.len} accounts for state root ..{accLst[0].root.pp}": + accLst.test_storageAccountsImport(xdb, db.persistent) - test &"Merging {storagesList.len} 
storages lists": - let - dbDesc = SnapDbStorageSlotsRef.init( - dbBase, Hash256().to(NodeKey), Hash256(), peer) - ignore = knownFailures.toTable - for n,w in storagesList: - let - testId = fileInfo & "#" & $n - expRc = if ignore.hasKey(testId): - Result[void,seq[(int,HexaryError)]].err(ignore[testId]) - else: - OkStoDb - check dbDesc.importStorageSlots(w.data, persistent).toStoDbRc == expRc - - test &"Inspecting {storagesList.len} imported storages lists sub-tries": - let ignore = knownFailures.toTable - for n,w in storagesList: - let - testId = fileInfo & "#" & $n - errInx = if ignore.hasKey(testId): ignore[testId][0][0] - else: high(int) - for m in 0 ..< w.data.storages.len: - let - accKey = w.data.storages[m].account.accKey - root = w.data.storages[m].account.storageRoot - dbDesc = SnapDbStorageSlotsRef.init(dbBase, accKey, root, peer) - rc = dbDesc.inspectStorageSlotsTrie(persistent=persistent) - if m == errInx: - check rc == Result[TrieNodeStat,HexaryError].err(TrieIsEmpty) - else: - check rc.isOk # ok => level > 0 and not stopped + test &"Merging {stoLst.len} storages lists": + stoLst.test_storageSlotsImport(xdb, db.persistent, knownFailures,idPfx) + test &"Inspecting {stoLst.len} imported storages lists sub-tries": + stoLst.test_storageSlotsTries(xdb, db.persistent, knownFailures,idPfx) proc inspectionRunner( noisy = true; @@ -537,7 +284,6 @@ proc inspectionRunner( cascaded = true; sample: openArray[AccountsSample] = snapTestList) = let - peer = Peer.new inspectList = sample.mapIt(it.to(seq[UndumpAccounts])) tmpDir = getTmpDir() db = if persistent: tmpDir.testDbs(sample[0].name) else: testDbs() @@ -555,203 +301,49 @@ proc inspectionRunner( tmpDir.flushDbDir(sample[0].name) suite &"SyncSnap: inspect {fileInfo} lists for {info} for healing": - let - memBase = SnapDbRef.init(newMemoryDB()) - memDesc = SnapDbAccountsRef.init(memBase, Hash256(), peer) var singleStats: seq[(int,TrieNodeStat)] accuStats: seq[(int,TrieNodeStat)] - perBase,altBase: SnapDbRef - perDesc,altDesc: SnapDbAccountsRef - if persistent: - perBase = SnapDbRef.init(db.cdb[0]) - perDesc = SnapDbAccountsRef.init(perBase, Hash256(), peer) - altBase = SnapDbRef.init(db.cdb[1]) - altDesc = SnapDbAccountsRef.init(altBase, Hash256(), peer) + let + ingerprinting = &"ingerprinting {inspectList.len}" + singleAcc = &"F{ingerprinting} single accounts lists" + accumAcc = &"F{ingerprinting} accumulated accounts" + cascAcc = &"Cascaded f{ingerprinting} accumulated accounts lists" - test &"Fingerprinting {inspectList.len} single accounts lists " & - "for in-memory-db": - for n,accList in inspectList: - # Separate storage - let - root = accList[0].root - rootKey = root.to(NodeKey) - desc = SnapDbAccountsRef.init(memBase, root, peer) - for w in accList: - check desc.importAccounts(w.base, w.data, persistent=false).isImportOk - let stats = desc.hexaDb.hexaryInspectTrie(rootKey) - check not stats.stopped - let - dangling = stats.dangling.mapIt(it.partialPath) - keys = dangling.hexaryPathNodeKeys( - rootKey, desc.hexaDb, missingOk=true) - check dangling.len == keys.len - singleStats.add (desc.hexaDb.tab.len,stats) + memBase = SnapDbRef.init(newMemoryDB()) + dbSlot = proc(n: int): SnapDbRef = + if 2+n < nTestDbInstances and not db.cdb[2+n].rocksStoreRef.isNil: + return SnapDbRef.init(db.cdb[2+n]) - # Verify piecemeal approach for `hexaryInspectTrie()` ... 
- var - ctx = TrieNodeStatCtxRef() - piecemeal: HashSet[Blob] - while not ctx.isNil: - let stat2 = desc.hexaDb.hexaryInspectTrie( - rootKey, resumeCtx=ctx, suspendAfter=128) - check not stat2.stopped - ctx = stat2.resumeCtx - piecemeal.incl stat2.dangling.mapIt(it.partialPath).toHashSet - # Must match earlier all-in-one result - check dangling.len == piecemeal.len - check dangling.toHashSet == piecemeal + test &"{singleAcc} for in-memory-db": + inspectList.test_inspectSingleAccountsMemDb(memBase, singleStats) - test &"Fingerprinting {inspectList.len} single accounts lists " & - "for persistent db": - if not persistent: - skip() + test &"{singleAcc} for persistent db": + if persistent: + inspectList.test_inspectSingleAccountsPersistent(dbSlot, singleStats) else: - for n,accList in inspectList: - if nTestDbInstances <= 2+n or db.cdb[2+n].rocksStoreRef.isNil: - continue - # Separate storage on persistent DB (leaving first db slot empty) - let - root = accList[0].root - rootKey = root.to(NodeKey) - dbBase = SnapDbRef.init(db.cdb[2+n]) - desc = SnapDbAccountsRef.init(dbBase, root, peer) - - for w in accList: - check desc.importAccounts(w.base,w.data, persistent=true).isImportOk - let stats = desc.getAccountFn.hexaryInspectTrie(rootKey) - check not stats.stopped - let - dangling = stats.dangling.mapIt(it.partialPath) - keys = dangling.hexaryPathNodeKeys( - rootKey, desc.hexaDb, missingOk=true) - check dangling.len == keys.len - # Must be the same as the in-memory fingerprint - let ssn1 = singleStats[n][1].dangling.mapIt(it.partialPath) - check ssn1.toHashSet == dangling.toHashSet - - # Verify piecemeal approach for `hexaryInspectTrie()` ... - var - ctx = TrieNodeStatCtxRef() - piecemeal: HashSet[Blob] - while not ctx.isNil: - let stat2 = desc.getAccountFn.hexaryInspectTrie( - rootKey, resumeCtx=ctx, suspendAfter=128) - check not stat2.stopped - ctx = stat2.resumeCtx - piecemeal.incl stat2.dangling.mapIt(it.partialPath).toHashSet - # Must match earlier all-in-one result - check dangling.len == piecemeal.len - check dangling.toHashSet == piecemeal - - test &"Fingerprinting {inspectList.len} accumulated accounts lists " & - "for in-memory-db": - for n,accList in inspectList: - # Accumulated storage - let - root = accList[0].root - rootKey = root.to(NodeKey) - desc = memDesc.dup(root,Peer()) - for w in accList: - check desc.importAccounts(w.base, w.data, persistent=false).isImportOk - let stats = desc.hexaDb.hexaryInspectTrie(rootKey) - check not stats.stopped - let - dangling = stats.dangling.mapIt(it.partialPath) - keys = dangling.hexaryPathNodeKeys( - rootKey, desc.hexaDb, missingOk=true) - check dangling.len == keys.len - accuStats.add (desc.hexaDb.tab.len, stats) - - test &"Fingerprinting {inspectList.len} accumulated accounts lists " & - "for persistent db": - if not persistent: skip() - else: - for n,accList in inspectList: - # Accumulated storage on persistent DB (using first db slot) - let - root = accList[0].root - rootKey = root.to(NodeKey) - rootSet = [rootKey].toHashSet - desc = perDesc.dup(root,Peer()) - for w in accList: - check desc.importAccounts(w.base, w.data, persistent).isImportOk - let stats = desc.getAccountFn.hexaryInspectTrie(rootKey) - check not stats.stopped - let - dangling = stats.dangling.mapIt(it.partialPath) - keys = dangling.hexaryPathNodeKeys( - rootKey, desc.hexaDb, missingOk=true) - check dangling.len == keys.len - check accuStats[n][1] == stats - test &"Cascaded fingerprinting {inspectList.len} accumulated accounts " & - "lists for in-memory-db": - if not 
cascaded:
-        skip()
-      else:
-        let
-          cscBase = SnapDbRef.init(newMemoryDB())
-          cscDesc = SnapDbAccountsRef.init(cscBase, Hash256(), peer)
-        var
-          cscStep: Table[NodeKey,(int,seq[Blob])]
-        for n,accList in inspectList:
-          # Accumulated storage
-          let
-            root = accList[0].root
-            rootKey = root.to(NodeKey)
-            desc = cscDesc.dup(root,Peer())
-          for w in accList:
-            check desc.importAccounts(w.base,w.data,persistent=false).isImportOk
-          if cscStep.hasKeyOrPut(rootKey,(1,seq[Blob].default)):
-            cscStep[rootKey][0].inc
-          let
-            stat0 = desc.hexaDb.hexaryInspectTrie(rootKey)
-            stats = desc.hexaDb.hexaryInspectTrie(rootKey,cscStep[rootKey][1])
-          check not stat0.stopped
-          check not stats.stopped
-          let
-            accumulated = stat0.dangling.mapIt(it.partialPath).toHashSet
-            cascaded = stats.dangling.mapIt(it.partialPath).toHashSet
-          check accumulated == cascaded
-        # Make sure that there are no trivial cases
-        let trivialCases = toSeq(cscStep.values).filterIt(it[0] <= 1).len
-        check trivialCases == 0
+    test &"{accumAcc} for in-memory-db":
+      inspectList.test_inspectAccountsInMemDb(memBase, accuStats)
 
-    test &"Cascaded fingerprinting {inspectList.len} accumulated accounts " &
-        "for persistent db":
-      if not cascaded or not persistent:
-        skip()
+    test &"{accumAcc} for persistent db":
+      if persistent:
+        inspectList.test_inspectAccountsPersistent(db.cdb[0], accuStats)
       else:
-        let
-          cscBase = altBase
-          cscDesc = altDesc
-        var
-          cscStep: Table[NodeKey,(int,seq[Blob])]
-        for n,accList in inspectList:
-          # Accumulated storage
-          let
-            root = accList[0].root
-            rootKey = root.to(NodeKey)
-            desc = cscDesc.dup(root,Peer())
-          for w in accList:
-            check desc.importAccounts(w.base,w.data, persistent).isImportOk
-          if cscStep.hasKeyOrPut(rootKey,(1,seq[Blob].default)):
-            cscStep[rootKey][0].inc
-          let
-            stat0 = desc.getAccountFn.hexaryInspectTrie(rootKey)
-            stats = desc.getAccountFn.hexaryInspectTrie(rootKey,
-              cscStep[rootKey][1])
-          check not stat0.stopped
-          check not stats.stopped
-          let
-            accumulated = stat0.dangling.mapIt(it.partialPath).toHashSet
-            cascaded = stats.dangling.mapIt(it.partialPath).toHashSet
-          check accumulated == cascaded
-        # Make sure that there are no trivial cases
-        let trivialCases = toSeq(cscStep.values).filterIt(it[0] <= 1).len
-        check trivialCases == 0
+        skip()
+
+    test &"{cascAcc} for in-memory-db":
+      if cascaded:
+        inspectList.test_inspectCascadedMemDb()
+      else:
+        skip()
+
+    test &"{cascAcc} for persistent db":
+      if cascaded and persistent:
+        inspectList.test_inspectCascadedPersistent(db.cdb[1])
+      else:
+        skip()
 
 # ------------------------------------------------------------------------------
 # Test Runners: database timing tests
 # ------------------------------------------------------------------------------
@@ -764,8 +356,8 @@ proc importRunner(noisy = true; persistent = true; capture = bChainCapture) =
     filePath = capture.file.findFilePath(baseDir,repoDir).value
     tmpDir = getTmpDir()
     db = if persistent: tmpDir.testDbs(capture.name) else: testDbs()
-    numBlocksInfo = if capture.numBlocks == high(int): ""
-                    else: $capture.numBlocks & " "
+    numBlocks = capture.numBlocks
+    numBlocksInfo = if numBlocks == high(int): "" else: $numBlocks & " "
     loadNoise = noisy
 
   defer:
     if db.persistent:
       tmpDir.flushDbDir(capture.name)
 
   suite &"SyncSnap: using {fileInfo} capture for testing db timings":
-    var
-      ddb: CommonRef         # perstent DB on disk
-      chn: ChainRef
+    var ddb: CommonRef       # persistent DB on disk
 
     test &"Create persistent ChainDBRef on {tmpDir}":
-      let chainDb = if db.persistent: db.cdb[0].trieDB
-                    else: newMemoryDB()
-
-      # 
Constructor ... ddb = CommonRef.new( - chainDb, + db = if db.persistent: db.cdb[0].trieDB else: newMemoryDB(), networkId = capture.network, pruneTrie = true, params = capture.network.networkParams) - ddb.initializeEmptyDb - chn = ddb.newChain test &"Storing {numBlocksInfo}persistent blocks from dump": - for w in filePath.undumpNextGroup: - let (fromBlock, toBlock) = (w[0][0].blockNumber, w[0][^1].blockNumber) - if fromBlock == 0.u256: - doAssert w[0][0] == ddb.db.getBlockHeader(0.u256) - continue - # Message if [fromBlock,toBlock] contains a multiple of 700 - if fromBlock + (toBlock mod 900) <= toBlock: - loadNoise.say "***", &"processing ...[#{fromBlock},#{toBlock}]..." - check chn.persistBlocks(w[0], w[1]).isOk - if capture.numBlocks.toBlockNumber <= w[0][^1].blockNumber: - break + noisy.test_dbTimingUndumpBlocks(filePath, ddb, numBlocks, loadNoise) test "Extract key-value records into memory tables via rocksdb iterator": - # Implicit test: if not persistent => db.cdb[0] is nil if db.cdb[0].rocksStoreRef.isNil: - skip() + skip() # not persistent => db.cdb[0] is nil else: - let - rdb = db.cdb[0].rocksStoreRef - rop = rdb.store.readOptions - rit = rdb.store.db.rocksdb_create_iterator(rop) - check not rit.isNil - - xTab32.clear - xTab33.clear - - rit.rocksdb_iter_seek_to_first() - while rit.rocksdb_iter_valid() != 0: - let (key,val) = rit.thisRecord() - rit.rocksdb_iter_next() - if key.len == 32: - xTab32[key.to(ByteArray32)] = val - xVal32Sum += val.len.float - xVal32SqSum += val.len.float * val.len.float - check key.to(ByteArray32).to(Blob) == key - elif key.len == 33: - xTab33[key.to(ByteArray33)] = val - xVal33Sum += val.len.float - xVal33SqSum += val.len.float * val.len.float - check key.to(ByteArray33).to(Blob) == key - else: - noisy.say "***", "ignoring key=", key.toHex - - rit.rocksdb_iter_destroy() - - var - (mean32, stdv32) = meanStdDev(xVal32Sum, xVal32SqSum, xTab32.len) - (mean33, stdv33) = meanStdDev(xVal33Sum, xVal33SqSum, xTab33.len) - noisy.say "***", - "key 32 table: ", - &"size={xTab32.len} valLen={(mean32+0.5).int}({(stdv32+0.5).int})", - ", key 33 table: ", - &"size={xTab33.len} valLen={(mean33+0.5).int}({(stdv33+0.5).int})" + noisy.test_dbTimingRockySetup(xTab32, xTab33, db.cdb[0]) -proc storeRunner(noisy = true; persistent = true; cleanUp = true) = +proc dbTimingRunner(noisy = true; persistent = true; cleanUp = true) = let fullNoise = false var @@ -891,307 +429,51 @@ proc storeRunner(noisy = true; persistent = true; cleanUp = true) = doAssert 9 <= nTestDbInstances doAssert not xDbs.cdb[8].isNil + let + storeDir32 = &"Directly store {xTab32.len} records" + storeDir33 = &"Directly store {xTab33.len} records" + storeTx32 = &"Transactionally store directly {xTab32.len} records" + storeTx33 = &"Transactionally store directly {xTab33.len} records" + intoTrieDb = &"into {emptyDb} trie db" + + storeRks32 = &"Store {xTab32.len} records" + storeRks33 = &"Store {xTab33.len} records" + intoRksDb = &"into {emptyDb} rocksdb table" + if xTab32.len == 0 or xTab33.len == 0: test &"Both tables with 32 byte keys(size={xTab32.len}), " & &"33 byte keys(size={xTab32.len}) must be non-empty": skip() else: - # cdb[0] -- direct db, key length 32, no transaction - test &"Directly store {xTab32.len} records " & - &"(key length 32) into {emptyDb} trie database": - var ela: Duration - let tdb = xDbs.cdb[0].trieDB + test &"{storeDir32} (key length 32) {intoTrieDb}": + noisy.test_dbTimingStoreDirect32(xTab32, xDbs.cdb[0]) - if noisy: echo "" - noisy.showElapsed("Standard db loader(keyLen 32)", ela): 
- for (key,val) in xTab32.pairs: - tdb.put(key, val) + test &"{storeDir32} (key length 33) {intoTrieDb}": + noisy.test_dbTimingStoreDirectly32as33(xTab32, xDbs.cdb[1]) - if ela.inNanoseconds != 0: - let - elaNs = ela.inNanoseconds.float - perRec = ((elaNs / xTab32.len.float) + 0.5).int.initDuration - noisy.say "***", - "nRecords=", xTab32.len, ", ", - "perRecord=", perRec.pp + test &"{storeTx32} (key length 32) {intoTrieDb}": + noisy.test_dbTimingStoreTx32(xTab32, xDbs.cdb[2]) - # cdb[1] -- direct db, key length 32 as 33, no transaction - test &"Directly store {xTab32.len} records " & - &"(key length 33) into {emptyDb} trie database": - var ela = initDuration() - let tdb = xDbs.cdb[1].trieDB + test &"{storeTx32} (key length 33) {intoTrieDb}": + noisy.test_dbTimingStoreTx32as33(xTab32, xDbs.cdb[3]) - if noisy: echo "" - noisy.showElapsed("Standard db loader(keyLen 32 as 33)", ela): - for (key,val) in xTab32.pairs: - tdb.put(@[99.byte] & key.toSeq, val) + test &"{storeDir33} (key length 33) {intoTrieDb}": + noisy.test_dbTimingDirect33(xTab33, xDbs.cdb[4]) - if ela.inNanoseconds != 0: - let - elaNs = ela.inNanoseconds.float - perRec = ((elaNs / xTab32.len.float) + 0.5).int.initDuration - noisy.say "***", - "nRecords=", xTab32.len, ", ", - "perRecord=", perRec.pp - - # cdb[2] -- direct db, key length 32, transaction based - test &"Transactionally store {xTab32.len} records " & - &"(key length 32) into {emptyDb} trie database": - var ela: Duration - let tdb = xDbs.cdb[2].trieDB - - if noisy: echo "" - noisy.showElapsed("Standard db loader(tx,keyLen 32)", ela): - let dbTx = tdb.beginTransaction - defer: dbTx.commit - - for (key,val) in xTab32.pairs: - tdb.put(key, val) - - if ela.inNanoseconds != 0: - let - elaNs = ela.inNanoseconds.float - perRec = ((elaNs / xTab32.len.float) + 0.5).int.initDuration - noisy.say "***", - "nRecords=", xTab32.len, ", ", - "perRecord=", perRec.pp - - # cdb[3] -- direct db, key length 32 as 33, transaction based - test &"Transactionally store {xTab32.len} records " & - &"(key length 33) into {emptyDb} trie database": - var ela: Duration - let tdb = xDbs.cdb[3].trieDB - - if noisy: echo "" - noisy.showElapsed("Standard db loader(tx,keyLen 32 as 33)", ela): - let dbTx = tdb.beginTransaction - defer: dbTx.commit - - for (key,val) in xTab32.pairs: - tdb.put(@[99.byte] & key.toSeq, val) - - if ela.inNanoseconds != 0: - let - elaNs = ela.inNanoseconds.float - perRec = ((elaNs / xTab32.len.float) + 0.5).int.initDuration - noisy.say "***", - "nRecords=", xTab32.len, ", ", - "perRecord=", perRec.pp - - # cdb[4] -- direct db, key length 33, no transaction - test &"Directly store {xTab33.len} records " & - &"(key length 33) into {emptyDb} trie database": - var ela: Duration - let tdb = xDbs.cdb[4].trieDB - - if noisy: echo "" - noisy.showElapsed("Standard db loader(keyLen 33)", ela): - for (key,val) in xTab33.pairs: - tdb.put(key, val) - - if ela.inNanoseconds != 0: - let - elaNs = ela.inNanoseconds.float - perRec = ((elaNs / xTab33.len.float) + 0.5).int.initDuration - noisy.say "***", - "nRecords=", xTab33.len, ", ", - "perRecord=", perRec.pp - - # cdb[5] -- direct db, key length 33, transaction based - test &"Transactionally store {xTab33.len} records " & - &"(key length 33) into {emptyDb} trie database": - var ela: Duration - let tdb = xDbs.cdb[5].trieDB - - if noisy: echo "" - noisy.showElapsed("Standard db loader(tx,keyLen 33)", ela): - let dbTx = tdb.beginTransaction - defer: dbTx.commit - - for (key,val) in xTab33.pairs: - tdb.put(key, val) - - if ela.inNanoseconds != 
0: - let - elaNs = ela.inNanoseconds.float - perRec = ((elaNs / xTab33.len.float) + 0.5).int.initDuration - noisy.say "***", - "nRecords=", xTab33.len, ", ", - "perRecord=", perRec.pp + test &"{storeTx33} (key length 33) {intoTrieDb}": + noisy.test_dbTimingTx33(xTab33, xDbs.cdb[5]) if xDbs.cdb[0].rocksStoreRef.isNil: test "The rocksdb interface must be available": skip() else: - # cdb[6] -- rocksdb, key length 32 - test &"Store {xTab32.len} records " & - "(key length 32) into empty rocksdb table": - var - ela: array[4,Duration] - size: int64 - let - rdb = xDbs.cdb[6].rocksStoreRef + test &"{storeRks32} (key length 32) {intoRksDb}": + noisy.test_dbTimingRocky32(xTab32, xDbs.cdb[6], fullNoise) - # Note that 32 and 33 size keys cannot be usefiully merged into the - # same SST file. The keys must be added in a sorted mode. So playing - # safe, key sizes should be of - # equal length. + test &"{storeRks32} (key length 33) {intoRksDb}": + noisy.test_dbTimingRocky32as33(xTab32, xDbs.cdb[7], fullNoise) - if noisy: echo "" - noisy.showElapsed("Rocky bulk loader(keyLen 32)", ela[0]): - let bulker = RockyBulkLoadRef.init(rdb) - defer: bulker.destroy() - check bulker.begin("rocky-bulk-cache") - - var - keyList = newSeq[NodeTag](xTab32.len) - - fullNoise.showElapsed("Rocky bulk loader/32, sorter", ela[1]): - var inx = 0 - for key in xTab32.keys: - keyList[inx] = key.to(NodeTag) - inx.inc - keyList.sort(cmp) - - fullNoise.showElapsed("Rocky bulk loader/32, append", ela[2]): - for n,nodeTag in keyList: - let key = nodeTag.to(Blob) - check bulker.add(key, xTab32[key.to(ByteArray32)]) - - fullNoise.showElapsed("Rocky bulk loader/32, slurp", ela[3]): - let rc = bulker.finish() - if rc.isOk: - size = rc.value - else: - check bulker.lastError == "" # force printing error - - fullNoise.say "***", " ela[]=", $ela.toSeq.mapIt(it.pp) - if ela[0].inNanoseconds != 0: - let - elaNs = ela.toSeq.mapIt(it.inNanoseconds.float) - elaPc = elaNs.mapIt(((it / elaNs[0]) * 100 + 0.5).int) - perRec = ((elaNs[0] / xTab32.len.float) + 0.5).int.initDuration - noisy.say "***", - "nRecords=", xTab32.len, ", ", - "perRecord=", perRec.pp, ", ", - "sstSize=", size.uint64.toSI, ", ", - "perRecord=", ((size.float / xTab32.len.float) + 0.5).int, ", ", - ["Total","Sorter","Append","Ingest"].zip(elaPc).ppKvPc - - # cdb[7] -- rocksdb, key length 32 as 33 - test &"Store {xTab32.len} records " & - "(key length 33) into empty rocksdb table": - var - ela: array[4,Duration] - size: int64 - let - rdb = xDbs.cdb[7].rocksStoreRef - - # Note that 32 and 33 size keys cannot be usefiully merged into the - # same SST file. The keys must be added in a sorted mode. So playing - # safe, key sizes should be of - # equal length. 
- - if noisy: echo "" - noisy.showElapsed("Rocky bulk loader(keyLen 32 as 33)", ela[0]): - let bulker = RockyBulkLoadRef.init(rdb) - defer: bulker.destroy() - check bulker.begin("rocky-bulk-cache") - - var - keyList = newSeq[NodeTag](xTab32.len) - - fullNoise.showElapsed("Rocky bulk loader/32 as 33, sorter", ela[1]): - var inx = 0 - for key in xTab32.keys: - keyList[inx] = key.to(NodeTag) - inx.inc - keyList.sort(cmp) - - fullNoise.showElapsed("Rocky bulk loader/32 as 33, append", ela[2]): - for n,nodeTag in keyList: - let key = nodeTag.to(Blob) - check bulker.add(@[99.byte] & key, xTab32[key.to(ByteArray32)]) - - fullNoise.showElapsed("Rocky bulk loader/32 as 33, slurp", ela[3]): - let rc = bulker.finish() - if rc.isOk: - size = rc.value - else: - check bulker.lastError == "" # force printing error - - fullNoise.say "***", " ela[]=", $ela.toSeq.mapIt(it.pp) - if ela[0].inNanoseconds != 0: - let - elaNs = ela.toSeq.mapIt(it.inNanoseconds.float) - elaPc = elaNs.mapIt(((it / elaNs[0]) * 100 + 0.5).int) - perRec = ((elaNs[0] / xTab32.len.float) + 0.5).int.initDuration - noisy.say "***", - "nRecords=", xTab32.len, ", ", - "perRecord=", perRec.pp, ", ", - "sstSize=", size.uint64.toSI, ", ", - "perRecord=", ((size.float / xTab32.len.float) + 0.5).int, ", ", - ["Total","Sorter","Append","Ingest"].zip(elaPc).ppKvPc - - - # cdb[8] -- rocksdb, key length 33 - test &"Store {xTab33.len} records " & - &"(key length 33) into {emptyDb} rocksdb table": - var - ela: array[4,Duration] - size: int64 - let rdb = xDbs.cdb[8].rocksStoreRef - - # Note that 32 and 33 size keys cannot be usefiully merged into the - # same SST file. The keys must be added in a sorted mode. So playing - # safe, key sizes should be of equal length. - - if noisy: echo "" - noisy.showElapsed("Rocky bulk loader(keyLen 33)", ela[0]): - let bulker = RockyBulkLoadRef.init(rdb) - defer: bulker.destroy() - check bulker.begin("rocky-bulk-cache") - - var - kKeys: seq[byte] # need to cacscade - kTab: Table[byte,seq[NodeTag]] - - fullNoise.showElapsed("Rocky bulk loader/33, sorter", ela[1]): - for key in xTab33.keys: - if kTab.hasKey(key[0]): - kTab[key[0]].add key.toOpenArray(1,32).to(NodeTag) - else: - kTab[key[0]] = @[key.toOpenArray(1,32).to(NodeTag)] - - kKeys = toSeq(kTab.keys).sorted - for w in kKeys: - kTab[w].sort(cmp) - - fullNoise.showElapsed("Rocky bulk loader/33, append", ela[2]): - for w in kKeys: - fullNoise.say "***", " prefix=", w, " entries=", kTab[w].len - for n,nodeTag in kTab[w]: - let key = (w,nodeTag).to(Blob) - check bulker.add(key, xTab33[key.to(ByteArray33)]) - - fullNoise.showElapsed("Rocky bulk loader/33, slurp", ela[3]): - let rc = bulker.finish() - if rc.isOk: - size = rc.value - else: - check bulker.lastError == "" # force printing error - - fullNoise.say "***", " ela[]=", $ela.toSeq.mapIt(it.pp) - if ela[0].inNanoseconds != 0: - let - elaNs = ela.toSeq.mapIt(it.inNanoseconds.float) - elaPc = elaNs.mapIt(((it / elaNs[0]) * 100 + 0.5).int) - perRec = ((elaNs[0] / xTab33.len.float) + 0.5).int.initDuration - noisy.say "***", - "nRecords=", xTab33.len, ", ", - "perRecord=", perRec.pp, ", ", - "sstSize=", size.uint64.toSI, ", ", - "perRecord=", ((size.float / xTab33.len.float) + 0.5).int, ", ", - ["Total","Cascaded-Sorter","Append","Ingest"].zip(elaPc).ppKvPc + test &"{storeRks33} (key length 33) {intoRksDb}": + noisy.test_dbTimingRocky33(xTab33, xDbs.cdb[8], fullNoise) # ------------------------------------------------------------------------------ # Main function(s) @@ -1202,7 +484,7 @@ proc syncSnapMain*(noisy = 
defined(debug)) =
   noisy.accountsRunner(persistent=false)
   noisy.importRunner() # small sample, just verify functionality
   noisy.inspectionRunner()
-  noisy.storeRunner()
+  noisy.dbTimingRunner()
 
 when isMainModule:
   const
@@ -1298,9 +580,9 @@ when isMainModule:
   noisy.showElapsed("importRunner()"):
     noisy.importRunner(capture = bulkTest0)
 
-  noisy.showElapsed("storeRunner()"):
-    true.storeRunner(cleanUp = false)
-    true.storeRunner()
+  noisy.showElapsed("dbTimingRunner()"):
+    true.dbTimingRunner(cleanUp = false)
+    true.dbTimingRunner()
 
 # ------------------------------------------------------------------------------
 # End
 # ------------------------------------------------------------------------------
diff --git a/tests/test_sync_snap/test_accounts.nim b/tests/test_sync_snap/test_accounts.nim
new file mode 100644
index 000000000..e0e8c1ad5
--- /dev/null
+++ b/tests/test_sync_snap/test_accounts.nim
@@ -0,0 +1,124 @@
+# Nimbus - Types, data structures and shared utilities used in network sync
+#
+# Copyright (c) 2018-2021 Status Research & Development GmbH
+# Licensed under either of
+#  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
+#    http://www.apache.org/licenses/LICENSE-2.0)
+#  * MIT license ([LICENSE-MIT](LICENSE-MIT) or
+#    http://opensource.org/licenses/MIT)
+#  at your option. This file may not be copied, modified, or
+#  distributed except according to those terms.
+
+## Snap sync components tester and TDD environment
+
+import
+  std/[algorithm, sequtils, sets, strformat, strutils, tables],
+  eth/[common, p2p, trie/db],
+  unittest2,
+  ../../nimbus/db/select_backend,
+  ../../nimbus/sync/snap/range_desc,
+  ../../nimbus/sync/snap/worker/db/[snapdb_accounts, snapdb_desc],
+  ../replay/[pp, undump_accounts],
+  ./test_helpers
+
+# ------------------------------------------------------------------------------
+# Private helpers
+# ------------------------------------------------------------------------------
+
+proc flatten(list: openArray[seq[Blob]]): seq[Blob] =
+  for w in list:
+    result.add w
+
+# ------------------------------------------------------------------------------
+# Public test function
+# ------------------------------------------------------------------------------
+
+proc test_accountsImport*(
+    inList: seq[UndumpAccounts];
+    desc: SnapDbAccountsRef;
+    persistent: bool
+      ) =
+  ## Import accounts
+  for n,w in inList:
+    check desc.importAccounts(w.base, w.data, persistent).isImportOk
+
+
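+# Note: importing one consecutive range is expected to leave no inner gaps,
+# which is what `isImportOk()` from `test_helpers` checks, whereas merging
+# several ranges as done below deliberately produces gaps. Hence the merge
+# check below only asserts a plain `isOk` result.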
+proc test_accountsMergeProofs*(
+    inList: seq[UndumpAccounts];
+    desc: SnapDbAccountsRef;
+    accKeys: var seq[NodeKey];
+      ) =
+  ## Merge account proofs
+  # Load/accumulate data from several samples (needs some particular sort)
+  let baseTag = inList.mapIt(it.base).sortMerge
+  let packed = PackedAccountRange(
+    accounts: inList.mapIt(it.data.accounts).sortMerge,
+    proof: inList.mapIt(it.data.proof).flatten)
+  # Merging intervals will produce gaps, so the result is expected OK but
+  # different from `.isImportOk`
+  check desc.importAccounts(baseTag, packed, true).isOk
+
+  # check desc.merge(lowerBound, accounts) == OkHexDb
+  desc.assignPrettyKeys() # for debugging, make sure that state root ~ "$0"
+
+  # Update list of accounts. There might be additional accounts in the set
+  # of proof nodes, typically before the `lowerBound` of each block. As
+  # there is a list of account ranges (that were merged for testing), one
+  # needs to check for additional records only on either end of a range.
+  var keySet = packed.accounts.mapIt(it.accKey).toHashSet
+  for w in inList:
+    var key = desc.prevAccountsChainDbKey(w.data.accounts[0].accKey)
+    while key.isOk and key.value notin keySet:
+      keySet.incl key.value
+      let newKey = desc.prevAccountsChainDbKey(key.value)
+      check newKey != key
+      key = newKey
+    key = desc.nextAccountsChainDbKey(w.data.accounts[^1].accKey)
+    while key.isOk and key.value notin keySet:
+      keySet.incl key.value
+      let newKey = desc.nextAccountsChainDbKey(key.value)
+      check newKey != key
+      key = newKey
+  accKeys = toSeq(keySet).mapIt(it.to(NodeTag)).sorted(cmp)
+                         .mapIt(it.to(NodeKey))
+  check packed.accounts.len <= accKeys.len
+
+
+proc test_accountsRevisitStoredItems*(
+    accKeys: seq[NodeKey];
+    desc: SnapDbAccountsRef;
+    noisy = false;
+      ) =
+  ## Revisit stored items on ChainDBRef
+  var
+    nextAccount = accKeys[0]
+    prevAccount: NodeKey
+    count = 0
+  for accKey in accKeys:
+    count.inc
+    let
+      pfx = $count & "#"
+      byChainDB = desc.getAccountsChainDb(accKey)
+      byNextKey = desc.nextAccountsChainDbKey(accKey)
+      byPrevKey = desc.prevAccountsChainDbKey(accKey)
+    noisy.say "*** find",
+      "<", count, "> byChainDb=", byChainDB.pp
+    check byChainDB.isOk
+
+    # Check `next` traversal functionality. If `byNextKey.isOk` fails, the
+    # `nextAccount` value is still the old one and will be different from
+    # the account in the next for-loop cycle (if any).
+    check pfx & accKey.pp(false) == pfx & nextAccount.pp(false)
+    if byNextKey.isOk:
+      nextAccount = byNextKey.get(otherwise = NodeKey.default)
+
+    # Check `prev` traversal functionality
+    if prevAccount != NodeKey.default:
+      check byPrevKey.isOk
+      if byPrevKey.isOk:
+        check pfx & byPrevKey.value.pp(false) == pfx & prevAccount.pp(false)
+    prevAccount = accKey
+
+# ------------------------------------------------------------------------------
+# End
+# ------------------------------------------------------------------------------
diff --git a/tests/test_sync_snap/test_db_timing.nim b/tests/test_sync_snap/test_db_timing.nim
new file mode 100644
index 000000000..ae8ef7c36
--- /dev/null
+++ b/tests/test_sync_snap/test_db_timing.nim
@@ -0,0 +1,448 @@
+# Nimbus - Types, data structures and shared utilities used in network sync
+#
+# Copyright (c) 2018-2021 Status Research & Development GmbH
+# Licensed under either of
+#  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
+#    http://www.apache.org/licenses/LICENSE-2.0)
+#  * MIT license ([LICENSE-MIT](LICENSE-MIT) or
+#    http://opensource.org/licenses/MIT)
+#  at your option. This file may not be copied, modified, or
+#  distributed except according to those terms.
+
+## Snap sync components tester and TDD environment
+
+import
+  std/[algorithm, math, sequtils, strformat, strutils, tables, times],
+  stew/byteutils,
+  rocksdb,
+  unittest2,
+  ../../nimbus/core/chain,
+  ../../nimbus/db/select_backend,
+  ../../nimbus/sync/snap/range_desc,
+  ../../nimbus/sync/snap/worker/db/[hexary_desc, rocky_bulk_load],
+  ../../nimbus/utils/prettify,
+  ../replay/[pp, undump_blocks],
+  ./test_helpers
+
+# ------------------------------------------------------------------------------
+# Private helpers
+# ------------------------------------------------------------------------------
+
+proc thisRecord(r: rocksdb_iterator_t): (Blob,Blob) =
+  var kLen, vLen: csize_t
+  let
+    kData = r.rocksdb_iter_key(addr kLen)
+    vData = r.rocksdb_iter_value(addr vLen)
+  if not kData.isNil and not vData.isNil:
+    let
+      key = string.fromBytes(toOpenArrayByte(kData,0,int(kLen)-1))
+      value = string.fromBytes(toOpenArrayByte(vData,0,int(vLen)-1))
+    return (key.mapIt(it.byte),value.mapIt(it.byte))
+
+proc meanStdDev(sum, sqSum: float; length: int): (float,float) =
+  # Mean and standard deviation from running sums: Var(x) = E[x^2] - E[x]^2
+  if 0 < length:
+    result[0] = sum / length.float
+    result[1] = sqrt(sqSum / length.float - result[0] * result[0])
+
+# ------------------------------------------------------------------------------
+# Public test function: setup
+# ------------------------------------------------------------------------------
+
+proc test_dbTimingUndumpBlocks*(
+    noisy: bool;
+    filePath: string;
+    com: CommonRef;
+    numBlocks: int;
+    loadNoise = false;
+      ) =
+  ## Store persistent blocks from dump into chain DB
+  let chain = com.newChain
+
+  for w in filePath.undumpNextGroup:
+    let (fromBlock, toBlock) = (w[0][0].blockNumber, w[0][^1].blockNumber)
+    if fromBlock == 0.u256:
+      doAssert w[0][0] == com.db.getBlockHeader(0.u256)
+      continue
+    # Message if [fromBlock,toBlock] contains a multiple of 900
+    if fromBlock + (toBlock mod 900) <= toBlock:
+      loadNoise.say "***", &"processing ...[#{fromBlock},#{toBlock}]..."
+ check chain.persistBlocks(w[0], w[1]) == ValidationResult.OK + if numBlocks.toBlockNumber <= w[0][^1].blockNumber: + break + +proc test_dbTimingRockySetup*( + noisy: bool; + t32: var Table[ByteArray32,Blob], + t33: var Table[ByteArray33,Blob], + cdb: ChainDb; + ) = + ## Extract key-value records into memory tables via rocksdb iterator + let + rdb = cdb.rocksStoreRef + rop = rdb.store.readOptions + rit = rdb.store.db.rocksdb_create_iterator(rop) + check not rit.isNil + + var + v32Sum, v32SqSum: float # statistics + v33Sum, v33SqSum: float + + t32.clear + t33.clear + + rit.rocksdb_iter_seek_to_first() + while rit.rocksdb_iter_valid() != 0: + let (key,val) = rit.thisRecord() + rit.rocksdb_iter_next() + if key.len == 32: + t32[key.to(ByteArray32)] = val + v32Sum += val.len.float + v32SqSum += val.len.float * val.len.float + check key.to(ByteArray32).to(Blob) == key + elif key.len == 33: + t33[key.to(ByteArray33)] = val + v33Sum += val.len.float + v33SqSum += val.len.float * val.len.float + check key.to(ByteArray33).to(Blob) == key + else: + noisy.say "***", "ignoring key=", key.toHex + + rit.rocksdb_iter_destroy() + + var + (mean32, stdv32) = meanStdDev(v32Sum, v32SqSum, t32.len) + (mean33, stdv33) = meanStdDev(v33Sum, v33SqSum, t33.len) + noisy.say "***", + "key 32 table: ", + &"size={t32.len} valLen={(mean32+0.5).int}({(stdv32+0.5).int})", + ", key 33 table: ", + &"size={t33.len} valLen={(mean33+0.5).int}({(stdv33+0.5).int})" + +# ------------------------------------------------------------------------------ +# Public test function: timing +# ------------------------------------------------------------------------------ + +proc test_dbTimingStoreDirect32*( + noisy: bool; + t32: Table[ByteArray32,Blob]; + cdb: ChainDb; + ) = + ## Direct db, key length 32, no transaction + var ela: Duration + let tdb = cdb.trieDB + + if noisy: echo "" + noisy.showElapsed("Standard db loader(keyLen 32)", ela): + for (key,val) in t32.pairs: + tdb.put(key, val) + + if ela.inNanoseconds != 0: + let + elaNs = ela.inNanoseconds.float + perRec = ((elaNs / t32.len.float) + 0.5).int.initDuration + noisy.say "***", + "nRecords=", t32.len, ", ", + "perRecord=", perRec.pp + +proc test_dbTimingStoreDirectly32as33*( + noisy: bool; + t32: Table[ByteArray32,Blob], + cdb: ChainDb; + ) = + ## Direct db, key length 32 as 33, no transaction + var ela = initDuration() + let tdb = cdb.trieDB + + if noisy: echo "" + noisy.showElapsed("Standard db loader(keyLen 32 as 33)", ela): + for (key,val) in t32.pairs: + tdb.put(@[99.byte] & key.toSeq, val) + + if ela.inNanoseconds != 0: + let + elaNs = ela.inNanoseconds.float + perRec = ((elaNs / t32.len.float) + 0.5).int.initDuration + noisy.say "***", + "nRecords=", t32.len, ", ", + "perRecord=", perRec.pp + +proc test_dbTimingStoreTx32*( + noisy: bool; + t32: Table[ByteArray32,Blob], + cdb: ChainDb; + ) = + ## Direct db, key length 32, transaction based + var ela: Duration + let tdb = cdb.trieDB + + if noisy: echo "" + noisy.showElapsed("Standard db loader(tx,keyLen 32)", ela): + let dbTx = tdb.beginTransaction + defer: dbTx.commit + + for (key,val) in t32.pairs: + tdb.put(key, val) + + if ela.inNanoseconds != 0: + let + elaNs = ela.inNanoseconds.float + perRec = ((elaNs / t32.len.float) + 0.5).int.initDuration + noisy.say "***", + "nRecords=", t32.len, ", ", + "perRecord=", perRec.pp + +proc test_dbTimingStoreTx32as33*( + noisy: bool; + t32: Table[ByteArray32,Blob], + cdb: ChainDb; + ) = + ## Direct db, key length 32 as 33, transaction based + var ela: Duration + let tdb = cdb.trieDB + + 
if noisy: echo ""
+  noisy.showElapsed("Standard db loader(tx,keyLen 32 as 33)", ela):
+    let dbTx = tdb.beginTransaction
+    defer: dbTx.commit
+
+    for (key,val) in t32.pairs:
+      tdb.put(@[99.byte] & key.toSeq, val)
+
+  if ela.inNanoseconds != 0:
+    let
+      elaNs = ela.inNanoseconds.float
+      perRec = ((elaNs / t32.len.float) + 0.5).int.initDuration
+    noisy.say "***",
+      "nRecords=", t32.len, ", ",
+      "perRecord=", perRec.pp
+
+proc test_dbTimingDirect33*(
+    noisy: bool;
+    t33: Table[ByteArray33,Blob],
+    cdb: ChainDb;
+      ) =
+  ## Direct db, key length 33, no transaction
+  var ela: Duration
+  let tdb = cdb.trieDB
+
+  if noisy: echo ""
+  noisy.showElapsed("Standard db loader(keyLen 33)", ela):
+    for (key,val) in t33.pairs:
+      tdb.put(key, val)
+
+  if ela.inNanoseconds != 0:
+    let
+      elaNs = ela.inNanoseconds.float
+      perRec = ((elaNs / t33.len.float) + 0.5).int.initDuration
+    noisy.say "***",
+      "nRecords=", t33.len, ", ",
+      "perRecord=", perRec.pp
+
+proc test_dbTimingTx33*(
+    noisy: bool;
+    t33: Table[ByteArray33,Blob],
+    cdb: ChainDb;
+      ) =
+  ## Direct db, key length 33, transaction based
+  var ela: Duration
+  let tdb = cdb.trieDB
+
+  if noisy: echo ""
+  noisy.showElapsed("Standard db loader(tx,keyLen 33)", ela):
+    let dbTx = tdb.beginTransaction
+    defer: dbTx.commit
+
+    for (key,val) in t33.pairs:
+      tdb.put(key, val)
+
+  if ela.inNanoseconds != 0:
+    let
+      elaNs = ela.inNanoseconds.float
+      perRec = ((elaNs / t33.len.float) + 0.5).int.initDuration
+    noisy.say "***",
+      "nRecords=", t33.len, ", ",
+      "perRecord=", perRec.pp
+
+proc test_dbTimingRocky32*(
+    noisy: bool;
+    t32: Table[ByteArray32,Blob],
+    cdb: ChainDb;
+    fullNoise = false;
+      ) =
+  ## Rocksdb, key length 32
+  var
+    ela: array[4,Duration]
+    size: int64
+  let
+    rdb = cdb.rocksStoreRef
+
+  # Note that 32 and 33 size keys cannot be usefully merged into the same SST
+  # file. The keys must be added in a sorted mode. So playing safe, key sizes
+  # should be of equal length.
+
+  if noisy: echo ""
+  noisy.showElapsed("Rocky bulk loader(keyLen 32)", ela[0]):
+    let bulker = RockyBulkLoadRef.init(rdb)
+    defer: bulker.destroy()
+    check bulker.begin("rocky-bulk-cache")
+
+    var
+      keyList = newSeq[NodeTag](t32.len)
+
+    fullNoise.showElapsed("Rocky bulk loader/32, sorter", ela[1]):
+      var inx = 0
+      for key in t32.keys:
+        keyList[inx] = key.to(NodeTag)
+        inx.inc
+      keyList.sort(cmp)
+
+    fullNoise.showElapsed("Rocky bulk loader/32, append", ela[2]):
+      for n,nodeTag in keyList:
+        let key = nodeTag.to(Blob)
+        check bulker.add(key, t32[key.to(ByteArray32)])
+
+    fullNoise.showElapsed("Rocky bulk loader/32, slurp", ela[3]):
+      let rc = bulker.finish()
+      if rc.isOk:
+        size = rc.value
+      else:
+        check bulker.lastError == "" # force printing error
+
+  fullNoise.say "***", " ela[]=", $ela.toSeq.mapIt(it.pp)
+  if ela[0].inNanoseconds != 0:
+    let
+      elaNs = ela.toSeq.mapIt(it.inNanoseconds.float)
+      elaPc = elaNs.mapIt(((it / elaNs[0]) * 100 + 0.5).int)
+      perRec = ((elaNs[0] / t32.len.float) + 0.5).int.initDuration
+    noisy.say "***",
+      "nRecords=", t32.len, ", ",
+      "perRecord=", perRec.pp, ", ",
+      "sstSize=", size.uint64.toSI, ", ",
+      "perRecord=", ((size.float / t32.len.float) + 0.5).int, ", ",
+      ["Total","Sorter","Append","Ingest"].zip(elaPc).ppKvPc
+
+proc test_dbTimingRocky32as33*(
+    noisy: bool;
+    t32: Table[ByteArray32,Blob],
+    cdb: ChainDb;
+    fullNoise = false;
+      ) =
+  ## Rocksdb, key length 32 as 33
+  var
+    ela: array[4,Duration]
+    size: int64
+  let
+    rdb = cdb.rocksStoreRef
+
+  # Note that 32 and 33 size keys cannot be usefully merged into the same SST
+  # file. The keys must be added in a sorted mode. So playing safe, key sizes
+  # should be of equal length.
+
+  if noisy: echo ""
+  noisy.showElapsed("Rocky bulk loader(keyLen 32 as 33)", ela[0]):
+    let bulker = RockyBulkLoadRef.init(rdb)
+    defer: bulker.destroy()
+    check bulker.begin("rocky-bulk-cache")
+
+    var
+      keyList = newSeq[NodeTag](t32.len)
+
+    fullNoise.showElapsed("Rocky bulk loader/32 as 33, sorter", ela[1]):
+      var inx = 0
+      for key in t32.keys:
+        keyList[inx] = key.to(NodeTag)
+        inx.inc
+      keyList.sort(cmp)
+
+    fullNoise.showElapsed("Rocky bulk loader/32 as 33, append", ela[2]):
+      for n,nodeTag in keyList:
+        let key = nodeTag.to(Blob)
+        check bulker.add(@[99.byte] & key, t32[key.to(ByteArray32)])
+
+    fullNoise.showElapsed("Rocky bulk loader/32 as 33, slurp", ela[3]):
+      let rc = bulker.finish()
+      if rc.isOk:
+        size = rc.value
+      else:
+        check bulker.lastError == "" # force printing error
+
+  fullNoise.say "***", " ela[]=", $ela.toSeq.mapIt(it.pp)
+  if ela[0].inNanoseconds != 0:
+    let
+      elaNs = ela.toSeq.mapIt(it.inNanoseconds.float)
+      elaPc = elaNs.mapIt(((it / elaNs[0]) * 100 + 0.5).int)
+      perRec = ((elaNs[0] / t32.len.float) + 0.5).int.initDuration
+    noisy.say "***",
+      "nRecords=", t32.len, ", ",
+      "perRecord=", perRec.pp, ", ",
+      "sstSize=", size.uint64.toSI, ", ",
+      "perRecord=", ((size.float / t32.len.float) + 0.5).int, ", ",
+      ["Total","Sorter","Append","Ingest"].zip(elaPc).ppKvPc
+
+proc test_dbTimingRocky33*(
+    noisy: bool;
+    t33: Table[ByteArray33,Blob],
+    cdb: ChainDb;
+    fullNoise = false;
+      ) =
+  ## Rocksdb, key length 33
+  var
+    ela: array[4,Duration]
+    size: int64
+  let rdb = cdb.rocksStoreRef
+
+  # Note that 32 and 33 size keys cannot be usefully merged into the same SST
+  # file. The keys must be added in a sorted mode. So playing safe, key sizes
+  # should be of equal length.
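+
+  # The 33 byte keys are bucketed by their leading byte and each bucket is
+  # sorted separately before appending, so the keys still arrive in ascending
+  # order (hence the `Cascaded-Sorter` label in the timing output below.)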
+
+  if noisy: echo ""
+  noisy.showElapsed("Rocky bulk loader(keyLen 33)", ela[0]):
+    let bulker = RockyBulkLoadRef.init(rdb)
+    defer: bulker.destroy()
+    check bulker.begin("rocky-bulk-cache")
+
+    var
+      kKeys: seq[byte] # sorted prefix bytes for the cascaded sort
+      kTab: Table[byte,seq[NodeTag]]
+
+    fullNoise.showElapsed("Rocky bulk loader/33, sorter", ela[1]):
+      for key in t33.keys:
+        if kTab.hasKey(key[0]):
+          kTab[key[0]].add key.toOpenArray(1,32).to(NodeTag)
+        else:
+          kTab[key[0]] = @[key.toOpenArray(1,32).to(NodeTag)]
+
+      kKeys = toSeq(kTab.keys).sorted
+      for w in kKeys:
+        kTab[w].sort(cmp)
+
+    fullNoise.showElapsed("Rocky bulk loader/33, append", ela[2]):
+      for w in kKeys:
+        fullNoise.say "***", " prefix=", w, " entries=", kTab[w].len
+        for n,nodeTag in kTab[w]:
+          let key = (w,nodeTag).to(Blob)
+          check bulker.add(key, t33[key.to(ByteArray33)])
+
+    fullNoise.showElapsed("Rocky bulk loader/33, slurp", ela[3]):
+      let rc = bulker.finish()
+      if rc.isOk:
+        size = rc.value
+      else:
+        check bulker.lastError == "" # force printing error
+
+  fullNoise.say "***", " ela[]=", $ela.toSeq.mapIt(it.pp)
+  if ela[0].inNanoseconds != 0:
+    let
+      elaNs = ela.toSeq.mapIt(it.inNanoseconds.float)
+      elaPc = elaNs.mapIt(((it / elaNs[0]) * 100 + 0.5).int)
+      perRec = ((elaNs[0] / t33.len.float) + 0.5).int.initDuration
+    noisy.say "***",
+      "nRecords=", t33.len, ", ",
+      "perRecord=", perRec.pp, ", ",
+      "sstSize=", size.uint64.toSI, ", ",
+      "sizePerRecord=", ((size.float / t33.len.float) + 0.5).int, ", ",
+      ["Total","Cascaded-Sorter","Append","Ingest"].zip(elaPc).ppKvPc
+
+# ------------------------------------------------------------------------------
+# End
+# ------------------------------------------------------------------------------
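The three key-length-33 loaders above differ only in their write path, which is what the timing comparison isolates. A hypothetical driver sketch (the names `cdb` and `xTab33` are assumptions here, standing for an open `ChainDb` handle and a pre-filled `Table[ByteArray33,Blob]`):

  true.test_dbTimingDirect33(xTab33, cdb)  # one put() per record
  true.test_dbTimingTx33(xTab33, cdb)      # all puts inside a single transaction
  true.test_dbTimingRocky33(xTab33, cdb)   # RocksDB bulk load via SST file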
diff --git a/tests/test_sync_snap/test_helpers.nim b/tests/test_sync_snap/test_helpers.nim
new file mode 100644
index 000000000..195c582f9
--- /dev/null
+++ b/tests/test_sync_snap/test_helpers.nim
@@ -0,0 +1,100 @@
+# Nimbus - Types, data structures and shared utilities used in network sync
+#
+# Copyright (c) 2018-2021 Status Research & Development GmbH
+# Licensed under either of
+#  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
+#    http://www.apache.org/licenses/LICENSE-2.0)
+#  * MIT license ([LICENSE-MIT](LICENSE-MIT) or
+#    http://opensource.org/licenses/MIT)
+# at your option. This file may not be copied, modified, or
+# distributed except according to those terms.
+
+import
+  std/[sequtils, strformat, strutils, times],
+  eth/common,
+  stew/results,
+  unittest2,
+  ../../nimbus/sync/snap/range_desc,
+  ../../nimbus/sync/snap/worker/db/hexary_error,
+  ../../nimbus/sync/snap/worker/db/[hexary_desc, snapdb_accounts],
+  ../replay/pp
+
+# ------------------------------------------------------------------------------
+# Public helpers
+# ------------------------------------------------------------------------------
+
+proc isImportOk*(rc: Result[SnapAccountsGaps,HexaryError]): bool =
+  if rc.isErr:
+    check rc.error == NothingSerious # prints an error if different
+  elif 0 < rc.value.innerGaps.len:
+    check rc.value.innerGaps == seq[NodeSpecs].default
+  else:
+    return true
+
+# ------------------------------------------------------------------------------
+# Public type conversions
+# ------------------------------------------------------------------------------
+
+proc to*(b: openArray[byte]; T: type ByteArray32): T =
+  ## Convert to other representation (or raise an assertion defect)
+  if b.len == 32:
+    (addr result[0]).copyMem(unsafeAddr b[0], 32)
+  else:
+    doAssert b.len == 32
+
+proc to*(b: openArray[byte]; T: type ByteArray33): T =
+  ## Convert to other representation (or raise an assertion defect)
+  if b.len == 33:
+    (addr result[0]).copyMem(unsafeAddr b[0], 33)
+  else:
+    doAssert b.len == 33
+
+proc to*(b: ByteArray32|ByteArray33; T: type Blob): T =
+  b.toSeq
+
+proc to*(b: openArray[byte]; T: type NodeTag): T =
+  ## Convert from serialised equivalent
+  UInt256.fromBytesBE(b).T
+
+proc to*(w: (byte, NodeTag); T: type Blob): T =
+  let (b,t) = w
+  @[b] & toSeq(t.UInt256.toBytesBE)
+
+proc to*(t: NodeTag; T: type Blob): T =
+  toSeq(t.UInt256.toBytesBE)
+
+# ------------------------------------------------------------------------------
+# Public functions, pretty printing
+# ------------------------------------------------------------------------------
+
+proc pp*(rc: Result[Account,HexaryError]): string =
+  if rc.isErr: $rc.error else: rc.value.pp
+
+proc pp*(a: NodeKey; collapse = true): string =
+  a.to(Hash256).pp(collapse)
+
+proc pp*(d: Duration): string =
+  if 40 < d.inSeconds:
+    d.ppMins
+  elif 200 < d.inMilliseconds:
+    d.ppSecs
+  elif 200 < d.inMicroseconds:
+    d.ppMs
+  else:
+    d.ppUs
+
+proc ppKvPc*(w: openArray[(string,int)]): string =
+  w.mapIt(&"{it[0]}={it[1]}%").join(", ")
+
+proc say*(noisy = false; pfx = "***"; args: varargs[string, `$`]) =
+  if noisy:
+    if args.len == 0:
+      echo "*** ", pfx
+    elif 0 < pfx.len and pfx[^1] != ' ':
+      echo pfx, " ", args.toSeq.join
+    else:
+      echo pfx, args.toSeq.join
+
+# ------------------------------------------------------------------------------
+# End
+# ------------------------------------------------------------------------------
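A round-trip sketch of the converters above (values are illustrative; the integer-to-`NodeTag` conversion is the one provided by `range_desc`):

  let
    tag   = 0x1234.to(NodeTag)       # an arbitrary node tag
    key32 = tag.to(Blob)             # 32 byte big-endian serialisation
    key33 = (99.byte, tag).to(Blob)  # one prefix byte + tag => 33 bytes
  doAssert key32.len == 32 and key33.len == 33
  doAssert key32.to(NodeTag) == tag  # parses back to the original tag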
diff --git a/tests/test_sync_snap/test_inspect.nim b/tests/test_sync_snap/test_inspect.nim
new file mode 100644
index 000000000..dc51a8e57
--- /dev/null
+++ b/tests/test_sync_snap/test_inspect.nim
@@ -0,0 +1,233 @@
+# Nimbus - Types, data structures and shared utilities used in network sync
+#
+# Copyright (c) 2018-2021 Status Research & Development GmbH
+# Licensed under either of
+#  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
+#    http://www.apache.org/licenses/LICENSE-2.0)
+#  * MIT license ([LICENSE-MIT](LICENSE-MIT) or
+#    http://opensource.org/licenses/MIT)
+# at your option. This file may not be copied, modified, or
+# distributed except according to those terms.
+
+## Snap sync components tester and TDD environment
+
+import
+  std/[sequtils, sets, strformat, strutils, tables],
+  eth/[common, p2p, trie/db],
+  unittest2,
+  ../../nimbus/db/select_backend,
+  ../../nimbus/sync/snap/range_desc,
+  ../../nimbus/sync/snap/worker/db/[
+    hexary_desc, hexary_inspect, hexary_paths,
+    rocky_bulk_load, snapdb_accounts, snapdb_desc],
+  ../replay/[pp, undump_accounts],
+  ./test_helpers
+
+# ------------------------------------------------------------------------------
+# Public test functions
+# ------------------------------------------------------------------------------
+
+proc test_inspectSingleAccountsMemDb*(
+    inList: seq[seq[UndumpAccounts]];
+    memBase: SnapDbRef;
+    singleStats: var seq[(int,TrieNodeStat)];
+      ) =
+  ## Fingerprinting single accounts lists for in-memory-db (modifies
+  ## `singleStats`)
+  for n,accList in inList:
+    # Separate storage
+    let
+      root = accList[0].root
+      rootKey = root.to(NodeKey)
+      desc = SnapDbAccountsRef.init(memBase, root, Peer())
+    for w in accList:
+      check desc.importAccounts(w.base, w.data, persistent=false).isImportOk
+    let stats = desc.hexaDb.hexaryInspectTrie(rootKey)
+    check not stats.stopped
+    let
+      dangling = stats.dangling.mapIt(it.partialPath)
+      keys = dangling.hexaryPathNodeKeys(
+        rootKey, desc.hexaDb, missingOk=true)
+    check dangling.len == keys.len
+    singleStats.add (desc.hexaDb.tab.len,stats)
+
+    # Verify piecemeal approach for `hexaryInspectTrie()` ...
+    var
+      ctx = TrieNodeStatCtxRef()
+      piecemeal: HashSet[Blob]
+    while not ctx.isNil:
+      let stat2 = desc.hexaDb.hexaryInspectTrie(
+        rootKey, resumeCtx=ctx, suspendAfter=128)
+      check not stat2.stopped
+      ctx = stat2.resumeCtx
+      piecemeal.incl stat2.dangling.mapIt(it.partialPath).toHashSet
+    # Must match earlier all-in-one result
+    check dangling.len == piecemeal.len
+    check dangling.toHashSet == piecemeal
+
+proc test_inspectSingleAccountsPersistent*(
+    inList: seq[seq[UndumpAccounts]];
+    dbSlotCb: proc(n: int): SnapDbRef;
+    singleStats: seq[(int,TrieNodeStat)];
+      ) =
+  ## Fingerprinting single accounts lists for persistent db
+  for n,accList in inList:
+    let
+      root = accList[0].root
+      rootKey = root.to(NodeKey)
+      dbBase = n.dbSlotCb
+    if dbBase.isNil:
+      break
+    # Separate storage on persistent DB (leaving first db slot empty)
+    let desc = SnapDbAccountsRef.init(dbBase, root, Peer())
+
+    for w in accList:
+      check desc.importAccounts(w.base,w.data, persistent=true).isImportOk
+    let stats = desc.getAccountFn.hexaryInspectTrie(rootKey)
+    check not stats.stopped
+    let
+      dangling = stats.dangling.mapIt(it.partialPath)
+      keys = dangling.hexaryPathNodeKeys(
+        rootKey, desc.hexaDb, missingOk=true)
+    check dangling.len == keys.len
+    # Must be the same as the in-memory fingerprint
+    let ssn1 = singleStats[n][1].dangling.mapIt(it.partialPath)
+    check ssn1.toHashSet == dangling.toHashSet
+
+    # Verify piecemeal approach for `hexaryInspectTrie()` ...
+    var
+      ctx = TrieNodeStatCtxRef()
+      piecemeal: HashSet[Blob]
+    while not ctx.isNil:
+      let stat2 = desc.getAccountFn.hexaryInspectTrie(
+        rootKey, resumeCtx=ctx, suspendAfter=128)
+      check not stat2.stopped
+      ctx = stat2.resumeCtx
+      piecemeal.incl stat2.dangling.mapIt(it.partialPath).toHashSet
+    # Must match earlier all-in-one result
+    check dangling.len == piecemeal.len
+    check dangling.toHashSet == piecemeal
+
+proc test_inspectAccountsInMemDb*(
+    inList: seq[seq[UndumpAccounts]];
+    memBase: SnapDbRef;
+    accuStats: var seq[(int,TrieNodeStat)];
+      ) =
+  ## Fingerprinting accumulated accounts for in-memory-db (updates `accuStats`)
+  let memDesc = SnapDbAccountsRef.init(memBase, Hash256(), Peer())
+
+  for n,accList in inList:
+    # Accumulated storage
+    let
+      root = accList[0].root
+      rootKey = root.to(NodeKey)
+      desc = memDesc.dup(root,Peer())
+    for w in accList:
+      check desc.importAccounts(w.base, w.data, persistent=false).isImportOk
+    let stats = desc.hexaDb.hexaryInspectTrie(rootKey)
+    check not stats.stopped
+    let
+      dangling = stats.dangling.mapIt(it.partialPath)
+      keys = dangling.hexaryPathNodeKeys(
+        rootKey, desc.hexaDb, missingOk=true)
+    check dangling.len == keys.len
+    accuStats.add (desc.hexaDb.tab.len, stats)
+
+proc test_inspectAccountsPersistent*(
+    inList: seq[seq[UndumpAccounts]];
+    cdb: ChainDb;
+    accuStats: seq[(int,TrieNodeStat)];
+      ) =
+  ## Fingerprinting accumulated accounts for persistent db
+  let
+    perBase = SnapDbRef.init(cdb)
+    perDesc = SnapDbAccountsRef.init(perBase, Hash256(), Peer())
+
+  for n,accList in inList:
+    # Accumulated storage on persistent DB (using first db slot)
+    let
+      root = accList[0].root
+      rootKey = root.to(NodeKey)
+      rootSet = [rootKey].toHashSet
+      desc = perDesc.dup(root,Peer())
+    for w in accList:
+      check desc.importAccounts(w.base, w.data, persistent=true).isImportOk
+    let stats = desc.getAccountFn.hexaryInspectTrie(rootKey)
+    check not stats.stopped
+    let
+      dangling = stats.dangling.mapIt(it.partialPath)
+      keys = dangling.hexaryPathNodeKeys(
+        rootKey, desc.hexaDb, missingOk=true)
+    check dangling.len == keys.len
+    check accuStats[n][1] == stats
+
+proc test_inspectCascadedMemDb*(
+    inList: seq[seq[UndumpAccounts]];
+      ) =
+  ## Cascaded fingerprinting accounts for in-memory-db
+  let
+    cscBase = SnapDbRef.init(newMemoryDB())
+    cscDesc = SnapDbAccountsRef.init(cscBase, Hash256(), Peer())
+  var
+    cscStep: Table[NodeKey,(int,seq[Blob])]
+
+  for n,accList in inList:
+    # Accumulated storage
+    let
+      root = accList[0].root
+      rootKey = root.to(NodeKey)
+      desc = cscDesc.dup(root,Peer())
+    for w in accList:
+      check desc.importAccounts(w.base, w.data, persistent=false).isImportOk
+    if cscStep.hasKeyOrPut(rootKey, (1, seq[Blob].default)):
+      cscStep[rootKey][0].inc
+    let
+      stat0 = desc.hexaDb.hexaryInspectTrie(rootKey)
+      stats = desc.hexaDb.hexaryInspectTrie(rootKey, cscStep[rootKey][1])
+    check not stat0.stopped
+    check not stats.stopped
+    let
+      accumulated = stat0.dangling.mapIt(it.partialPath).toHashSet
+      cascaded = stats.dangling.mapIt(it.partialPath).toHashSet
+    check accumulated == cascaded
+  # Make sure that there are no trivial cases
+  let trivialCases = toSeq(cscStep.values).filterIt(it[0] <= 1).len
+  check trivialCases == 0
+
+proc test_inspectCascadedPersistent*(
+    inList: seq[seq[UndumpAccounts]];
+    cdb: ChainDb;
+      ) =
+  ## Cascaded fingerprinting accounts for persistent db
+  let
+    cscBase = SnapDbRef.init(cdb)
+    cscDesc = SnapDbAccountsRef.init(cscBase, Hash256(), Peer())
+  var
+    cscStep: Table[NodeKey,(int,seq[Blob])]
+
+  for n,accList in inList:
+    # Accumulated storage
+    let
+      root = accList[0].root
+      rootKey = root.to(NodeKey)
+      desc = cscDesc.dup(root, Peer())
+    for w in accList:
+      check desc.importAccounts(w.base, w.data, persistent=true).isImportOk
+    if cscStep.hasKeyOrPut(rootKey, (1, seq[Blob].default)):
+      cscStep[rootKey][0].inc
+    let
+      stat0 = desc.getAccountFn.hexaryInspectTrie(rootKey)
+      stats = desc.getAccountFn.hexaryInspectTrie(rootKey, cscStep[rootKey][1])
+    check not stat0.stopped
+    check not stats.stopped
+    let
+      accumulated = stat0.dangling.mapIt(it.partialPath).toHashSet
+      cascaded = stats.dangling.mapIt(it.partialPath).toHashSet
+    check accumulated == cascaded
+  # Make sure that there are no trivial cases
+  let trivialCases = toSeq(cscStep.values).filterIt(it[0] <= 1).len
+  check trivialCases == 0
+
+# ------------------------------------------------------------------------------
+# End
+# ------------------------------------------------------------------------------
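Both cascaded tests rely on the `hasKeyOrPut` counting idiom: the call returns `true` when the key was already present, so the first tuple field ends up counting how often a state root has been seen. A toy illustration (self-contained, illustrative values):

  import std/tables
  var seen: Table[string,(int,seq[string])]
  for root in ["a", "b", "a"]:
    if seen.hasKeyOrPut(root, (1, newSeq[string]())):
      seen[root][0].inc  # key existed already => bump the counter
  doAssert seen["a"][0] == 2 and seen["b"][0] == 1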
diff --git a/tests/test_sync_snap/test_decompose.nim b/tests/test_sync_snap/test_node_range.nim
similarity index 98%
rename from tests/test_sync_snap/test_decompose.nim
rename to tests/test_sync_snap/test_node_range.nim
index fc2d482d5..8b6aac4a9 100644
--- a/tests/test_sync_snap/test_decompose.nim
+++ b/tests/test_sync_snap/test_node_range.nim
@@ -82,9 +82,9 @@ proc print_data(
 # Public test function
 # ------------------------------------------------------------------------------
 
-proc test_decompose*(
+proc test_NodeRangeDecompose*(
     accKeys: seq[NodeKey];            ## Accounts key range
-    rootKey: NodeKey;                 ## State root
+    root: Hash256;                    ## State root
     db: HexaryTreeDbRef|HexaryGetFn;  ## Database abstraction
     dbg: HexaryTreeDbRef;             ## Debugging env
       ) =
@@ -96,6 +96,7 @@ proc test_decompose*(
   const isPersistent = db.type is HexaryTreeDbRef
   let
+    rootKey = root.to(NodeKey)
     baseTag = accKeys[0].to(NodeTag) + 1.u256
     firstTag = baseTag.hexaryNearbyRight(rootKey, db).get(
       otherwise = low(Nodetag))
diff --git a/tests/test_sync_snap/test_pivot.nim b/tests/test_sync_snap/test_pivot.nim
new file mode 100644
index 000000000..ee58b7309
--- /dev/null
+++ b/tests/test_sync_snap/test_pivot.nim
@@ -0,0 +1,78 @@
+# Nimbus - Types, data structures and shared utilities used in network sync
+#
+# Copyright (c) 2018-2021 Status Research & Development GmbH
+# Licensed under either of
+#  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
+#    http://www.apache.org/licenses/LICENSE-2.0)
+#  * MIT license ([LICENSE-MIT](LICENSE-MIT) or
+#    http://opensource.org/licenses/MIT)
+# at your option. This file may not be copied, modified, or
+# distributed except according to those terms.
+
+## Snap sync components tester and TDD environment
+
+import
+  std/[sequtils, strformat, strutils],
+  eth/[common, p2p],
+  unittest2,
+  ../../nimbus/db/select_backend,
+  ../../nimbus/sync/snap/range_desc,
+  ../../nimbus/sync/snap/worker/db/[snapdb_desc, snapdb_pivot]
+
+# ------------------------------------------------------------------------------
+# Public test function
+# ------------------------------------------------------------------------------
+
+proc test_pivotStoreRead*(
+    accKeys: seq[NodeKey];
+    cdb: ChainDb;
+      ) =
+  ## Storing/retrieving items on persistent pivot/checkpoint registry
+  let
+    dbBase = SnapDbRef.init(cdb)
+    processed = @[(1.to(NodeTag),2.to(NodeTag)),
+                  (4.to(NodeTag),5.to(NodeTag)),
+                  (6.to(NodeTag),7.to(NodeTag))]
+    slotAccounts = seq[NodeKey].default
+  for n,w in accKeys:
+    check dbBase.savePivot(
+      SnapDbPivotRegistry(
+        header: BlockHeader(stateRoot: w.to(Hash256)),
+        nAccounts: n.uint64,
+        nSlotLists: n.uint64,
+        processed: processed,
+        slotAccounts: slotAccounts)).isOk
+    # Verify latest state root
+    block:
+      let rc = dbBase.recoverPivot()
+      check rc.isOk
+      if rc.isOk:
+        check rc.value.nAccounts == n.uint64
+        check rc.value.nSlotLists == n.uint64
+        check rc.value.processed == processed
+  for n,w in accKeys:
+    block:
+      let rc = dbBase.recoverPivot(w)
+      check rc.isOk
+      if rc.isOk:
+        check rc.value.nAccounts == n.uint64
+        check rc.value.nSlotLists == n.uint64
+    # Update record in place
+    check dbBase.savePivot(
+      SnapDbPivotRegistry(
+        header: BlockHeader(stateRoot: w.to(Hash256)),
+        nAccounts: n.uint64,
+        nSlotLists: 0,
+        processed: @[],
+        slotAccounts: @[])).isOk
+    block:
+      let rc = dbBase.recoverPivot(w)
+      check rc.isOk
+      if rc.isOk:
+        check rc.value.nAccounts == n.uint64
+        check rc.value.nSlotLists == 0
+        check rc.value.processed == seq[(NodeTag,NodeTag)].default
+
+# ------------------------------------------------------------------------------
+# End
+# ------------------------------------------------------------------------------
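The registry semantics exercised above: records are keyed by state root, saving under an existing root overwrites the record in place, and `recoverPivot()` without an argument yields the most recently saved record. A minimal sketch (assuming `dbBase: SnapDbRef` and some `key: NodeKey` are at hand; names illustrative):

  let reg = SnapDbPivotRegistry(
    header:    BlockHeader(stateRoot: key.to(Hash256)),
    nAccounts: 42)
  doAssert dbBase.savePivot(reg).isOk
  doAssert dbBase.recoverPivot().value.nAccounts == 42  # latest record wins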
diff --git a/tests/test_sync_snap/test_storage.nim b/tests/test_sync_snap/test_storage.nim
new file mode 100644
index 000000000..93321b81c
--- /dev/null
+++ b/tests/test_sync_snap/test_storage.nim
@@ -0,0 +1,108 @@
+# Nimbus - Types, data structures and shared utilities used in network sync
+#
+# Copyright (c) 2018-2021 Status Research & Development GmbH
+# Licensed under either of
+#  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
+#    http://www.apache.org/licenses/LICENSE-2.0)
+#  * MIT license ([LICENSE-MIT](LICENSE-MIT) or
+#    http://opensource.org/licenses/MIT)
+# at your option. This file may not be copied, modified, or
+# distributed except according to those terms.
+
+## Snap sync components tester and TDD environment
+
+import
+  std/[sequtils, strformat, strutils, tables],
+  eth/[common, p2p],
+  unittest2,
+  ../../nimbus/db/select_backend,
+  ../../nimbus/sync/snap/range_desc,
+  ../../nimbus/sync/snap/worker/db/[
+    hexary_desc, hexary_error, hexary_inspect,
+    snapdb_accounts, snapdb_desc, snapdb_storage_slots],
+  ../replay/[pp, undump_accounts, undump_storages],
+  ./test_helpers
+
+let
+  # Forces `check()` to print the error (as opposed to when using `isOk()`)
+  OkStoDb = Result[void,seq[(int,HexaryError)]].ok()
+
+# ------------------------------------------------------------------------------
+# Private helpers
+# ------------------------------------------------------------------------------
+
+proc toStoDbRc(r: seq[HexaryNodeReport]): Result[void,seq[(int,HexaryError)]] =
+  ## Kludge: map error report to (older version) return code
+  if r.len != 0:
+    return err(r.mapIt((it.slot.get(otherwise = -1),it.error)))
+  ok()
+
+# ------------------------------------------------------------------------------
+# Public test functions
+# ------------------------------------------------------------------------------
+
+proc test_storageAccountsImport*(
+    inList: seq[UndumpAccounts];
+    dbBase: SnapDbRef;
+    persistent: bool;
+      ) =
+  ## Import and merge accounts lists
+  let
+    root = inList[0].root
+
+  for w in inList:
+    let desc = SnapDbAccountsRef.init(dbBase, root, Peer())
+    check desc.importAccounts(w.base, w.data, persistent).isImportOk
+
+proc test_storageSlotsImport*(
+    inList: seq[UndumpStorages];
+    dbBase: SnapDbRef;
+    persistent: bool;
+    ignore: seq[(string,seq[(int,HexaryError)])];
+    idPfx: string;
+      ) =
+  ## Import and merge storages lists
+  let
+    skipEntry = ignore.toTable
+    dbDesc = SnapDbStorageSlotsRef.init(
+      dbBase, NodeKey.default, Hash256(), Peer())
+
+  for n,w in inList:
+    let
+      testId = idPfx & "#" & $n
+      expRc = if skipEntry.hasKey(testId):
+                Result[void,seq[(int,HexaryError)]].err(skipEntry[testId])
+              else:
+                OkStoDb
+    check dbDesc.importStorageSlots(w.data, persistent).toStoDbRc == expRc
+
+proc test_storageSlotsTries*(
+    inList: seq[UndumpStorages];
+    dbBase: SnapDbRef;
+    persistent: bool;
+    ignore: seq[(string,seq[(int,HexaryError)])];
+    idPfx: string;
+      ) =
+  ## Inspect the sub-tries of imported storages lists
+  let
+    skipEntry = ignore.toTable
+
+  for n,w in inList:
+    let
+      testId = idPfx & "#" & $n
+      errInx = if skipEntry.hasKey(testId): skipEntry[testId][0][0]
+               else: high(int)
+    for m in 0 ..< w.data.storages.len:
+      let
+        accKey = w.data.storages[m].account.accKey
+        root = w.data.storages[m].account.storageRoot
+        dbDesc = SnapDbStorageSlotsRef.init(dbBase, accKey, root, Peer())
+        rc = dbDesc.inspectStorageSlotsTrie(persistent=persistent)
+      if m == errInx:
+        check rc == Result[TrieNodeStat,HexaryError].err(TrieIsEmpty)
+      else:
+        check rc.isOk # ok => level > 0 and not stopped
+
+# ------------------------------------------------------------------------------
+# End
+# ------------------------------------------------------------------------------
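For both storage slot test functions, expected failures are passed in via `ignore`: each entry maps a test id of the form `idPfx & "#" & $n` (with `n` the position of the sample in `inList`) to the `(slot,error)` pairs anticipated for that sample. A hypothetical call sketch (the id and the choice of `TrieIsEmpty` are illustrative only):

  let knownFailures = @[
    ("storages#7", @[(0, TrieIsEmpty)]),  # sample 7, slot 0 expected empty
  ]
  inList.test_storageSlotsTries(dbBase, true, knownFailures, "storages")  # persistent=true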