mirror of
https://github.com/status-im/nimbus-eth1.git
synced 2025-01-10 12:26:02 +00:00
465d694834
* Rename FilterID => QueueID why: The current usage does not identify a particular filter but uses it as a storage tag to manage it on the database (to be organised in a set of FIFOs or queues.) * Split `aristo_filter` source into sub-files why: Make space for filter management API * Store filter queue IDs in pairs on the backend why: Any pair will describe a FIFO accessed by bottom/top IDs * Reorg some source file names why: The "aristo_" prefix for local/private files is tedious to use, so it was removed. * Implement filter slot scheduler details: Filters will be stored on the database on cascaded FIFOs. When a FIFO queue is full, some filter items are bundled together and stored on the next FIFO.
389 lines
12 KiB
Nim
389 lines
12 KiB
Nim
# Nimbus
|
|
# Copyright (c) 2018-2021 Status Research & Development GmbH
|
|
# Licensed under either of
|
|
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
|
|
# http://www.apache.org/licenses/LICENSE-2.0)
|
|
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
|
|
# http://opensource.org/licenses/MIT)
|
|
# at your option. This file may not be copied, modified, or
|
|
# distributed except according to those terms.
|
|
|
|
## Aristo (aka Patricia) DB records merge test
|
|
|
|
import
|
|
std/[algorithm, hashes, sequtils, sets, strutils, tables],
|
|
eth/common,
|
|
results,
|
|
unittest2,
|
|
../../nimbus/sync/protocol,
|
|
../../nimbus/db/aristo,
|
|
../../nimbus/db/aristo/[
|
|
aristo_debug,
|
|
aristo_desc,
|
|
aristo_desc/desc_backend,
|
|
aristo_hashify,
|
|
aristo_init/memory_db,
|
|
aristo_init/rocks_db,
|
|
aristo_persistent,
|
|
aristo_transcode,
|
|
aristo_vid],
|
|
./test_helpers
|
|
|
|
const
  # Hash of the empty blob. It serves two purposes in this file: it is the
  # seed value for the filter digest computed by `hash()`, and it is the
  # default returned by the filter register lookup in `verifyFiltersImpl()`,
  # i.e. it marks a queue ID as "not registered".
  BlindHash = EmptyBlob.hash
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Private helpers
|
|
# ------------------------------------------------------------------------------
|
|
|
|
func hash(filter: FilterRef): Hash =
  ## Digest of a filter record that does not depend on table iteration order.
  ##
  ## A serialise/de-serialise round trip cannot be used here: the expressions
  ## `filter.blobify` and `filter.blobify.value.deblobify.value.blobify` are
  ## not necessarily the same binaries due to unsorted tables. Instead, the
  ## `src`/`trg` keys, the re-organised `vGen` list, and the `sTab`/`kMap`
  ## entries (visited by ascending vertex ID) are mixed into the hash.
  ##
  ## A `nil` filter yields the finalised `BlindHash` seed.
  ##
  result = BlindHash
  if filter.isNil:
    return !$result

  result = result !& filter.src.ByteArray32.hash
  result = result !& filter.trg.ByteArray32.hash

  for vid in filter.vGen.vidReorg:
    result = result !& vid.uint64.hash

  # Vertex table entries: big-endian vertex ID followed by the vertex blob
  # (or the empty blob if the vertex cannot be serialised.)
  let vtxIds = filter.sTab.keys.toSeq.mapIt(it.uint64).sorted
  for u in vtxIds:
    let
      vtx = filter.sTab.getOrVoid(u.VertexID)
      blob = vtx.blobify.get(otherwise = EmptyBlob)
    result = result !& (u.toBytesBE.toSeq & blob).hash

  # Key table entries: big-endian vertex ID followed by the 32 byte key
  let keyIds = filter.kMap.keys.toSeq.mapIt(it.uint64).sorted
  for u in keyIds:
    let key32 = filter.kMap.getOrVoid(u.VertexID).ByteArray32.toSeq
    result = result !& (u.toBytesBE.toSeq & key32).hash

  result = !$result
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Private functions
|
|
# ------------------------------------------------------------------------------
|
|
|
|
proc mergeData(
    db: AristoDbRef;
    rootKey: HashKey;
    rootVid: VertexID;
    proof: openArray[SnapProof];
    leafs: openArray[LeafTiePayload];
    noisy: bool;
      ): bool =
  ## Simplified loop body of `test_mergeProofAndKvpList()`.
  ##
  ## Steps, each of which makes the function return `false` on failure
  ## (after recording the failure via `check`):
  ## * if `proof` is non-empty, register `rootKey` for `rootVid` and merge
  ##   the proof nodes (already cached hash keys are tolerated)
  ## * merge the `leafs` list (already cached leaf paths are tolerated)
  ## * run `hashify` on the database
  ##
  ## Returns `true` if all steps succeeded.
  ##
  if proof.len > 0:
    let rc = db.merge(rootKey, rootVid)
    if rc.isErr:
      check rc.error == AristoError(0)
      return false

    let proved = db.merge(proof, rc.value)
    if proved.error notin {AristoError(0),MergeHashKeyCachedAlready}:
      check proved.error in {AristoError(0),MergeHashKeyCachedAlready}
      return false

  let merged = db.merge leafs
  if merged.error notin {AristoError(0), MergeLeafPathCachedAlready}:
    check merged.error in {AristoError(0), MergeLeafPathCachedAlready}
    return false

  let rc = db.hashify # (noisy, true)
  if rc.isOk:
    return true

  # Hashify failed: dump cache and backend (subject to `noisy`) before
  # recording the error.
  noisy.say "***", "dataMerge(9)",
    " nLeafs=", leafs.len,
    "\n cache dump\n ", db.pp,
    "\n backend dump\n ", db.to(TypedBackendRef).pp(db)
  check rc.error == (VertexID(0),AristoError(0))
  false
|
|
|
|
proc verify(
    ly: LayerRef;                            # Database layer
    be: MemBackendRef|RdbBackendRef;         # Backend
    noisy: bool;
      ): bool =
  ## Check that the tables of the layer `ly` agree with what is stored on
  ## the backend `be`.
  ##
  ## Every vertex found on the backend (visited by ascending vertex ID) must
  ## be valid and equal to the vertex with the same ID in `ly.sTab`. In
  ## addition, the vertex and key tables of the backend must have the same
  ## number of entries as `ly.sTab` and `ly.kMap`. Note that key table
  ## entries are compared by count only.
  ##
  ## Returns `true` if all comparisons succeed, otherwise the failure is
  ## recorded via `check` and `false` is returned.
  ##
  let
    beSTab = be.walkVtx.toSeq.mapIt((it[1],it[2])).toTable
    beKMap = be.walkKey.toSeq.mapIt((it[1],it[2])).toTable
    vids = beSTab.keys.toSeq.mapIt(it.uint64).sorted.mapIt(it.VertexID)

  for vid in vids:
    let
      nVtx = ly.sTab.getOrVoid vid
      mVtx = beSTab.getOrVoid vid

    # Neither side has a usable vertex for this ID
    if not (nVtx.isValid or mVtx.isValid):
      check nVtx != VertexRef(nil)
      check mVtx != VertexRef(nil)
      return false

    if nVtx != mVtx:
      noisy.say "***", "verify",
        " beType=", be.typeof,
        " vid=", vid.pp,
        " nVtx=", nVtx.pp,
        " mVtx=", mVtx.pp
      check nVtx == mVtx
      return false

  let
    sameVtxCount = (beSTab.len == ly.sTab.len)
    sameKeyCount = (beKMap.len == ly.kMap.len)
  if not (sameVtxCount and sameKeyCount):
    check beSTab.len == ly.sTab.len
    check beKMap.len == ly.kMap.len
    return false

  true
|
|
|
|
# -----------
|
|
|
|
proc collectFilter(
    db: AristoDbRef;
    filter: FilterRef;
    tab: var Table[QueueID,Hash];
    noisy: bool;
      ): bool =
  ## Store `filter` on the backend of `db` and register its digest in `tab`
  ## under the queue ID used for storing.
  ##
  ## A `nil` filter is silently accepted (nothing is stored or registered.)
  ## Returns `false` only if the backend reports an error when the store
  ## session is closed.
  ##
  if filter.isNil:
    return true

  let
    fid = QueueID(7 * (tab.len + 1)) # just some number
    be = db.backend
    tx = be.putBegFn()

  # Single item store session
  be.putFilFn(tx, @[(fid,filter)])
  let endOk = be.putEndFn tx
  if endOk != AristoError(0):
    check endOk == AristoError(0)
    return false

  tab[fid] = filter.hash
  true
|
|
|
|
proc verifyFiltersImpl[T: MemBackendRef|RdbBackendRef](
    _: type T;
    db: AristoDbRef;
    tab: Table[QueueID,Hash];
    noisy: bool;
      ): bool =
  ## Compare the filters stored on the backend of `db` against the digests
  ## registered in `tab`.
  ##
  ## Each filter delivered by `T.walkFilBe` must have its queue ID registered
  ## in `tab` with a matching digest, and the number of filters visited must
  ## equal `tab.len`. The value `BlindHash` doubles as the marker for a
  ## queue ID that is not registered.
  ##
  ## Returns `true` if everything matches, otherwise `false`.
  ##
  var n = 0
  for (_,fid,filter) in T.walkFilBe db:
    let
      filterHash = filter.hash
      registered = tab.getOrDefault(fid, BlindHash)

    # Filter on the backend that was never registered
    if registered == BlindHash:
      check (fid,registered) != (0,BlindHash)
      return false

    # Registered, but the stored filter differs
    if filterHash != registered:
      noisy.say "***", "verifyFiltersImpl",
        " n=", n+1,
        " fid=", fid.pp,
        " filterHash=", filterHash.int.toHex,
        " registered=", registered.int.toHex
      check (fid,filterHash) == (fid,registered)
      return false

    n += 1

  # All registered filters must have been seen
  if n != tab.len:
    check n == tab.len
    return false

  true
|
|
|
|
proc verifyFilters(
    db: AristoDbRef;
    tab: Table[QueueID,Hash];
    noisy: bool;
      ): bool =
  ## Wrapper around `verifyFiltersImpl()` which selects the implementation
  ## matching the backend type of `db`. A database without a memory or
  ## Rocks DB backend is recorded as a failure and `false` is returned.
  ##
  let be = db.to(TypedBackendRef)
  let kind = if be.isNil: BackendVoid else: be.kind

  case kind
  of BackendMemory:
    result = MemBackendRef.verifyFiltersImpl(db, tab, noisy)
  of BackendRocksDB:
    result = RdbBackendRef.verifyFiltersImpl(db, tab, noisy)
  else:
    # Unsupported backend: this check always fails here, `result` stays false
    check kind == BackendMemory or kind == BackendRocksDB
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Public test function
|
|
# ------------------------------------------------------------------------------
|
|
|
|
proc test_backendConsistency*(
    noisy: bool;
    list: openArray[ProofTrieData];          # Test data
    rdbPath: string;                         # Rocks DB storage directory
    resetDb = false;
    doRdbOk = true;
      ): bool =
  ## Import accounts into three databases and verify that the backends end
  ## up with consistent contents.
  ##
  ## For every item of `list`, the proof and leaf data are merged into
  ## * `ndb`, a database without backend which serves as reference cache
  ## * `mdb`, a database with memory backend
  ## * `rdb`, a database with Rocks DB backend stored at `rdbPath`
  ##   (only if `doRdbOk` is set)
  ##
  ## Then `mdb` and `rdb` are stored persistently and their backend tables
  ## are compared against the top layer of `ndb`. Before that, the read-only
  ## filter produced by `mdb` is stored on the `rdb` backend and its digest
  ## is registered so that the stored filters can be verified later.
  ##
  ## The databases are re-created whenever the root key of an item differs
  ## from the previous one, or on every item if `resetDb` is set.
  ##
  ## With `doRdbOk` unset, all steps which need the Rocks DB backend are
  ## skipped (`rdb` has no backend in that case.)
  ##
  ## Returns `true` on success. Failures are recorded via `check` and make
  ## the function return `false`.
  ##
  var
    filTab: Table[QueueID,Hash]              # Filter register
    ndb = AristoDbRef()                      # Reference cache
    mdb = AristoDbRef()                      # Memory backend database
    rdb = AristoDbRef()                      # Rocks DB backend database
    rootKey = HashKey.default
    count = 0

  defer:
    rdb.finish(flush=true)

  for n,w in list:
    if w.root != rootKey or resetDb:
      rootKey = w.root
      count = 0
      ndb = newAristoDbRef BackendVoid
      mdb = newAristoDbRef BackendMemory

      if doRdbOk:
        if not rdb.backend.isNil: # ignore bootstrap
          # Verify the filters collected so far before the DB is replaced
          let verifyFiltersOk = rdb.verifyFilters(filTab, noisy)
          if not verifyFiltersOk:
            check verifyFiltersOk
            return
          filTab.clear
        rdb.finish(flush=true)
        let rc = newAristoDbRef(BackendRocksDB,rdbPath)
        if rc.isErr:
          check rc.error == 0
          return
        rdb = rc.value
    count.inc

    check ndb.backend.isNil
    check not mdb.backend.isNil
    # The Rocks DB backend must be available if (and only then it is needed)
    # `doRdbOk` is set.
    check (not doRdbOk) or (not rdb.backend.isNil)

    when true and false:
      noisy.say "***", "beCon(1) <", n, "/", list.len-1, ">", " groups=", count

    block:
      let
        rootVid = VertexID(1)
        leafs = w.kvpLst.mapRootVid rootVid # for merging it into main trie

      block:
        let ndbOk = ndb.mergeData(
          rootKey, rootVid, w.proof, leafs, noisy=false)
        if not ndbOk:
          check ndbOk
          return
      block:
        let mdbOk = mdb.mergeData(
          rootKey, rootVid, w.proof, leafs, noisy=false)
        if not mdbOk:
          check mdbOk
          return
      if doRdbOk: # optional
        let rdbOk = rdb.mergeData(
          rootKey, rootVid, w.proof, leafs, noisy=false)
        if not rdbOk:
          check rdbOk
          return

    when true and false:
      noisy.say "***", "beCon(2) <", n, "/", list.len-1, ">",
        " groups=", count,
        "\n cache dump\n ", ndb.pp,
        "\n backend dump\n ", ndb.to(TypedBackendRef).pp(ndb),
        "\n -------------",
        "\n mdb cache\n ", mdb.pp,
        "\n mdb backend\n ", mdb.to(TypedBackendRef).pp(ndb),
        "\n -------------",
        "\n rdb cache\n ", rdb.pp,
        "\n rdb backend\n ", rdb.to(TypedBackendRef).pp(ndb),
        "\n -------------"

    when true and false:
      noisy.say "***", "beCon(4) <", n, "/", list.len-1, ">", " groups=", count

    # Pre-save dumps, only referred to by the (disabled) debugging output
    # further down.
    var
      mdbPreSaveCache, mdbPreSaveBackend: string
      rdbPreSaveCache, rdbPreSaveBackend: string
    when true: # and false:
      #mdbPreSaveCache = mdb.pp
      #mdbPreSaveBackend = mdb.to(MemBackendRef).pp(ndb)
      if doRdbOk: # `rdb` is not initialised, otherwise
        rdbPreSaveCache = rdb.pp
        rdbPreSaveBackend = rdb.to(RdbBackendRef).pp(ndb)

    # Provide filter, store filter on permanent BE, and register filter digest
    block:
      let rc = mdb.stow(persistent=false, dontHashify=true, chunkedMpt=true)
      if rc.isErr:
        check rc.error == (0,0)
        return
      if doRdbOk: # needs the Rocks DB backend for storing the filter
        let collectFilterOk = rdb.collectFilter(mdb.roFilter, filTab, noisy)
        if not collectFilterOk:
          check collectFilterOk
          return

    # Store onto backend database
    block:
      #noisy.say "***", "db-dump\n ", mdb.pp
      let rc = mdb.stow(persistent=true, dontHashify=true, chunkedMpt=true)
      if rc.isErr:
        check rc.error == (0,0)
        return

    if doRdbOk:
      let rc = rdb.stow(persistent=true, dontHashify=true, chunkedMpt=true)
      if rc.isErr:
        check rc.error == (0,0)
        return

    if not ndb.top.verify(mdb.to(MemBackendRef), noisy):
      when true and false:
        noisy.say "***", "beCon(4) <", n, "/", list.len-1, ">",
          " groups=", count,
          "\n ndb cache\n ", ndb.pp,
          "\n ndb backend=", ndb.backend.isNil.not,
          #"\n -------------",
          #"\n mdb pre-save cache\n ", mdbPreSaveCache,
          #"\n mdb pre-save backend\n ", mdbPreSaveBackend,
          "\n -------------",
          "\n mdb cache\n ", mdb.pp,
          "\n mdb backend\n ", mdb.to(TypedBackendRef).pp(ndb),
          "\n -------------"
      return

    if doRdbOk:
      if not ndb.top.verify(rdb.to(RdbBackendRef), noisy):
        when true and false:
          noisy.say "***", "beCon(4) <", n, "/", list.len-1, ">",
            " groups=", count,
            "\n ndb cache\n ", ndb.pp,
            "\n ndb backend=", ndb.backend.isNil.not,
            "\n -------------",
            "\n rdb pre-save cache\n ", rdbPreSaveCache,
            "\n rdb pre-save backend\n ", rdbPreSaveBackend,
            "\n -------------",
            "\n rdb cache\n ", rdb.pp,
            "\n rdb backend\n ", rdb.to(TypedBackendRef).pp(ndb),
            #"\n -------------",
            #"\n mdb cache\n ", mdb.pp,
            #"\n mdb backend\n ", mdb.to(TypedBackendRef).pp(ndb),
            "\n -------------"
        return

    when true and false:
      noisy.say "***", "beCon(9) <", n, "/", list.len-1, ">", " groups=", count

  # Finally, verify the filters collected for the last group. Filters are
  # only stored if the Rocks DB backend is in use.
  if doRdbOk:
    let verifyFiltersOk = rdb.verifyFilters(filTab, noisy)
    if not verifyFiltersOk:
      check verifyFiltersOk
      return

  true
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# End
|
|
# ------------------------------------------------------------------------------
|