mirror of https://github.com/status-im/nimbus-eth1.git
synced 2025-02-02 15:24:01 +00:00

Aristo db store filters on backend (#1703)

* Simplify RocksDB sub-tables iterator
* Implement `filter` storage on backend db
  details: Unit tests working

This commit is contained in:
parent 38c0c34331
commit 124ac064c6
@@ -414,12 +414,12 @@ i.e. the last byte of a serialised record.

 | 0000 1000 | 0x08             | Branch record        | 4.1 |
 | 10xx xxxx | 0x80 + x(6)      | Extension record     | 4.2 |
 | 11xx xxxx | 0xC0 + x(6)      | Leaf record          | 4.3 |
-| 0xxx 0yyy | (x(3)<<4) + y(3) | account payload      | 4.4 |
+| 0xxx 0yyy | (x(3)<<4) + y(3) | Account payload      | 4.4 |
 | 0110 1010 | 0x6a             | RLP encoded payload  | 4.5 |
-| 0110 1011 | 0x6b             | unstructured payload | 4.6 |
-| 0111 1100 | 0x7c             | list of vertex IDs   | 4.7 |
+| 0110 1011 | 0x6b             | Unstructured payload | 4.6 |
+| 0111 1100 | 0x7c             | List of vertex IDs   | 4.7 |
+| 0111 1101 | 0x7d             | Filter record        | 4.8 |
-| 0111 1110 | 0x7e             | list of vertex IDs   | 4.9 |
+| 0111 1110 | 0x7e             | List of vertex IDs   | 4.9 |
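As a reading aid for the table, the marker byte (the last byte of a serialised record) can be classified with a small case analysis. This is a minimal sketch; the `RecordKind` names are invented for illustration and are not part of the commit:

```nim
type
  RecordKind = enum
    BranchRec, ExtensionRec, LeafRec, AccountPayload, RlpPayload,
    UnstructuredPayload, VidList, FilterRec, OtherVidList

func recordKind(marker: byte): RecordKind =
  ## Classify a serialised record by its marker byte (see table above.)
  case marker
  of 0x08: BranchRec
  of 0x6a: RlpPayload
  of 0x6b: UnstructuredPayload
  of 0x7c: VidList
  of 0x7d: FilterRec                             # new with this commit
  of 0x7e: OtherVidList
  else:
    if (marker and 0xC0) == 0xC0: LeafRec        # 11xx xxxx
    elif (marker and 0xC0) == 0x80: ExtensionRec # 10xx xxxx
    else: AccountPayload                         # 0xxx 0yyy

doAssert recordKind(0x7d) == FilterRec
```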

 5. *Patricia Trie* implementation notes
 ---------------------------------------
@@ -24,6 +24,9 @@ const
   EmptyVidSeq* = seq[VertexID].default
     ## Useful shortcut

+  EmptyFidSeq* = seq[FilterID].default
+    ## Useful shortcut
+
   VOID_CODE_HASH* = EMPTY_CODE_HASH
     ## Equivalent of `nil` for `Account` object code hash
@@ -85,6 +85,12 @@ proc ppVid(vid: VertexID; pfx = true): string =
   else:
     result &= "ø"

+proc ppFid(fid: FilterID): string =
+  if fid.isValid:
+    "%" & fid.uint64.toHex.stripZeros.toLowerAscii
+  else:
+    "ø"
+
 proc ppVidList(vGen: openArray[VertexID]): string =
   "[" & vGen.mapIt(it.ppVid).join(",") & "]"
@@ -444,6 +450,9 @@ proc pp*(lty: LeafTie, db = AristoDbRef()): string =
 proc pp*(vid: VertexID): string =
   vid.ppVid

+proc pp*(fid: FilterID): string =
+  fid.ppFid
+
 proc pp*(vGen: openArray[VertexID]): string =
   vGen.ppVidList
@@ -29,11 +29,21 @@ type
       ## Generic backend database retrieval function for a single
       ## `Aristo DB` hash lookup value.

+  GetFilFn* =
+    proc(fid: FilterID): Result[FilterRef,AristoError]
+      {.gcsafe, raises: [].}
+        ## Generic backend database retrieval function for a filter record.
+
   GetIdgFn* =
     proc(): Result[seq[VertexID],AristoError] {.gcsafe, raises: [].}
       ## Generic backend database retrieval function for the ID generator
       ## `Aristo DB` state record.

+  GetFasFn* =
+    proc(): Result[seq[FilterID],AristoError] {.gcsafe, raises: [].}
+      ## Generic backend database retrieval function for some administration
+      ## of the filters (e.g. the top ID.)
+
   # -------------

   PutHdlRef* = ref object of RootRef
@@ -58,12 +68,23 @@ type
       ## Generic backend database bulk storage function, `VOID_HASH_KEY`
       ## values indicate that records should be deleted.

+  PutFilFn* =
+    proc(hdl: PutHdlRef; vf: openArray[(FilterID,FilterRef)])
+      {.gcsafe, raises: [].}
+        ## Generic backend database storage function for filter records.
+
   PutIdgFn* =
     proc(hdl: PutHdlRef; vs: openArray[VertexID])
       {.gcsafe, raises: [].}
         ## Generic backend database ID generator state storage function. This
         ## function replaces the current generator state.

+  PutFasFn* =
+    proc(hdl: PutHdlRef; vs: openArray[FilterID])
+      {.gcsafe, raises: [].}
+        ## Generic backend database filter ID state storage function. This
+        ## function replaces the current filter ID state.
+
   PutEndFn* =
     proc(hdl: PutHdlRef): AristoError {.gcsafe, raises: [].}
       ## Generic transaction termination function
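For orientation, a backend implements each of these closure types over its own descriptor. Here is a hedged sketch of a trivial in-memory `GetFilFn`; the `myFilters` table and the constructor name are invented for the sketch (the real implementations follow further below):

```nim
import std/tables, results

# Hypothetical closure factory matching the GetFilFn signature; the
# FilterID/FilterRef/AristoError types and the GetFilNotFound code are
# assumed from the surrounding module.
var myFilters: Table[FilterID,FilterRef]

proc makeGetFilFn(): GetFilFn =
  result =
    proc(fid: FilterID): Result[FilterRef,AristoError] =
      if myFilters.hasKey fid:
        return ok myFilters[fid]
      err(GetFilNotFound)
```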
@@ -83,12 +104,18 @@ type
     ## Backend interface.
     getVtxFn*: GetVtxFn              ## Read vertex record
     getKeyFn*: GetKeyFn              ## Read Merkle hash/key
-    getIdgFn*: GetIdgFn              ## Read ID generator state
+    getFilFn*: GetFilFn              ## Read back log filter
+    getIdgFn*: GetIdgFn              ## Read vertex ID generator state
+    getFasFn*: GetFasFn              ## Read filter ID state

     putBegFn*: PutBegFn              ## Start bulk store session
     putVtxFn*: PutVtxFn              ## Bulk store vertex records
     putKeyFn*: PutKeyFn              ## Bulk store vertex hashes
+    putFilFn*: PutFilFn              ## Store back log filter
     putIdgFn*: PutIdgFn              ## Store ID generator state
+    putFasFn*: PutFasFn              ## Store filter ID state
     putEndFn*: PutEndFn              ## Commit bulk store session

     closeFn*: CloseFn                ## Generic destructor

 # ------------------------------------------------------------------------------
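The new fields keep the existing bulk-store protocol: open a session with `putBegFn`, call any number of writers, then commit with `putEndFn`. A usage sketch, assuming `be: BackendRef` plus a `fid`/`filter` pair at hand (this mirrors the `collectFilter()` test helper further below):

```nim
# Store a single filter record through the backend interface.
let tx = be.putBegFn()             # open bulk store session
be.putFilFn(tx, @[(fid, filter)])  # queue filter record(s)
let rc = be.putEndFn tx            # commit session
doAssert rc == AristoError(0)
```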
@@ -36,6 +36,15 @@ proc getIdgUBE*(
     return be.getIdgFn()
   err(GetIdgNotFound)

+proc getFasUBE*(
+    db: AristoDbRef;
+      ): Result[seq[FilterID],AristoError] =
+  ## Get the list of filter IDs from the unfiltered backend if available.
+  let be = db.backend
+  if not be.isNil:
+    return be.getFasFn()
+  err(GetFasNotFound)
+
 proc getVtxUBE*(
     db: AristoDbRef;
     vid: VertexID;

@@ -56,6 +65,16 @@ proc getKeyUBE*(
     return be.getKeyFn vid
   err GetKeyNotFound

+proc getFilUBE*(
+    db: AristoDbRef;
+    fid: FilterID;
+      ): Result[FilterRef,AristoError] =
+  ## Get the filter from the unfiltered backend if available.
+  let be = db.backend
+  if not be.isNil:
+    return be.getFilFn fid
+  err GetFilNotFound
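Taken together, the two new getters allow reading everything filter-related back from the unfiltered backend. A hedged usage sketch, assuming `db: AristoDbRef` with a non-nil backend:

```nim
# Enumerate registered filter IDs, then fetch each filter record.
let fasRc = db.getFasUBE()
if fasRc.isOk:
  for fid in fasRc.value:
    let filRc = db.getFilUBE fid
    if filRc.isOk:
      discard filRc.value          # FilterRef read back from the backend
```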
 # ------------------

 proc getIdgBE*(
@@ -29,6 +29,7 @@ type
     AdmPfx = 1                       ## Admin data, e.g. ID generator
     VtxPfx = 2                       ## Vertex data
     KeyPfx = 3                       ## Key/hash data
+    FilPfx = 4                       ## Filter logs (to revert to earlier state)

   AdminTabID* = distinct uint64
     ## Access keys for admin table records. When exposed (e.g. when iterating
@@ -44,6 +45,8 @@ type
     case pfx*: StorageType           ## Error sub-table
     of VtxPfx, KeyPfx:
       vid*: VertexID                 ## Vertex ID where the error occurred
+    of FilPfx:
+      fid*: FilterID                 ## Ditto
     of AdmPfx, Oops:
       discard
     code*: AristoError               ## Error code (if any)
@@ -55,6 +58,7 @@ type

 const
   AdmTabIdIdg* = AdminTabID(0)       ## Access key for vertex ID generator state
+  AdmTabIdFas* = AdminTabID(1)       ## Access key for filter state

 # ------------------------------------------------------------------------------
 # Public helpers
@@ -42,12 +42,16 @@ type
     ## Inheriting table so access can be extended for debugging purposes
     sTab: Table[VertexID,Blob]       ## Structural vertex table making up a trie
     kMap: Table[VertexID,HashKey]    ## Merkle hash key mapping
+    rFil: Table[FilterID,Blob]       ## Backend filters
     vGen: Option[seq[VertexID]]
+    vFas: Option[seq[FilterID]]

   MemPutHdlRef = ref object of TypedPutHdlRef
     sTab: Table[VertexID,Blob]
     kMap: Table[VertexID,HashKey]
+    rFil: Table[FilterID,Blob]
     vGen: Option[seq[VertexID]]
+    vFas: Option[seq[FilterID]]

 # ------------------------------------------------------------------------------
 # Private helpers
@@ -93,6 +97,14 @@ proc getKeyFn(db: MemBackendRef): GetKeyFn =
         return ok key
       err(GetKeyNotFound)

+proc getFilFn(db: MemBackendRef): GetFilFn =
+  result =
+    proc(fid: FilterID): Result[FilterRef,AristoError] =
+      let data = db.rFil.getOrDefault(fid, EmptyBlob)
+      if 0 < data.len:
+        return data.deblobify FilterRef
+      err(GetFilNotFound)
+
 proc getIdgFn(db: MemBackendRef): GetIdgFn =
   result =
     proc(): Result[seq[VertexID],AristoError]=

@@ -100,6 +112,13 @@ proc getIdgFn(db: MemBackendRef): GetIdgFn =
         return ok db.vGen.unsafeGet
       err(GetIdgNotFound)

+proc getFasFn(db: MemBackendRef): GetFasFn =
+  result =
+    proc(): Result[seq[FilterID],AristoError]=
+      if db.vFas.isSome:
+        return ok db.vFas.unsafeGet
+      err(GetFasNotFound)
+
 # -------------

 proc putBegFn(db: MemBackendRef): PutBegFn =
@@ -134,6 +153,21 @@ proc putKeyFn(db: MemBackendRef): PutKeyFn =
         for (vid,key) in vkps:
           hdl.kMap[vid] = key

+proc putFilFn(db: MemBackendRef): PutFilFn =
+  result =
+    proc(hdl: PutHdlRef; vf: openArray[(FilterID,FilterRef)]) =
+      let hdl = hdl.getSession db
+      if hdl.error.isNil:
+        for (fid,filter) in vf:
+          let rc = filter.blobify()
+          if rc.isErr:
+            hdl.error = TypedPutHdlErrRef(
+              pfx:  FilPfx,
+              fid:  fid,
+              code: rc.error)
+            return
+          hdl.rFil[fid] = rc.value
+
 proc putIdgFn(db: MemBackendRef): PutIdgFn =
   result =
     proc(hdl: PutHdlRef; vs: openArray[VertexID]) =

@@ -141,6 +175,13 @@ proc putIdgFn(db: MemBackendRef): PutIdgFn =
       let hdl = hdl.getSession db
       if hdl.error.isNil:
         hdl.vGen = some(vs.toSeq)

+proc putFasFn(db: MemBackendRef): PutFasFn =
+  result =
+    proc(hdl: PutHdlRef; fs: openArray[FilterID]) =
+      let hdl = hdl.getSession db
+      if hdl.error.isNil:
+        hdl.vFas = some(fs.toSeq)
+

 proc putEndFn(db: MemBackendRef): PutEndFn =
   result =
@@ -151,6 +192,9 @@ proc putEndFn(db: MemBackendRef): PutEndFn =
       of VtxPfx, KeyPfx:
         debug logTxt "putEndFn: vtx/key failed",
           pfx=hdl.error.pfx, vid=hdl.error.vid, error=hdl.error.code
+      of FilPfx:
+        debug logTxt "putEndFn: filter failed",
+          pfx=hdl.error.pfx, fid=hdl.error.fid, error=hdl.error.code
       else:
         debug logTxt "putEndFn: failed",
           pfx=hdl.error.pfx, error=hdl.error.code

@@ -168,12 +212,26 @@ proc putEndFn(db: MemBackendRef): PutEndFn =
         else:
           db.kMap.del vid

+      for (fid,data) in hdl.rFil.pairs:
+        if fid.isValid:
+          db.rFil[fid] = data
+        else:
+          db.rFil.del fid
+
       if hdl.vGen.isSome:
         let vGen = hdl.vGen.unsafeGet
         if vGen.len == 0:
           db.vGen = none(seq[VertexID])
         else:
           db.vGen = some(vGen)

+      if hdl.vFas.isSome:
+        let vFas = hdl.vFas.unsafeGet
+        if vFas.len == 0:
+          db.vFas = none(seq[FilterID])
+        else:
+          db.vFas = some(vFas)
+
       AristoError(0)

 # -------------
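Note the commit convention above: a record with an invalid (void) marker is deleted rather than stored, matching the `VOID_HASH_KEY` deletion convention documented for the other bulk-store functions. A self-contained illustration of the same upsert-or-delete pattern over a plain table:

```nim
import std/tables

# Upsert-or-delete, sketched: a void payload deletes the record,
# anything else replaces it.
var store = {1u64: "a", 2u64: "b"}.toTable

proc apply(store: var Table[uint64,string]; id: uint64; data: string) =
  if data.len > 0:
    store[id] = data               # upsert
  else:
    store.del id                   # void payload deletes the record

store.apply(2u64, "")
doAssert 2u64 notin store and store[1u64] == "a"
```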
@@ -192,12 +250,16 @@ proc memoryBackend*(): BackendRef =

   db.getVtxFn = getVtxFn db
   db.getKeyFn = getKeyFn db
+  db.getFilFn = getFilFn db
   db.getIdgFn = getIdgFn db
+  db.getFasFn = getFasFn db

   db.putBegFn = putBegFn db
   db.putVtxFn = putVtxFn db
   db.putKeyFn = putKeyFn db
+  db.putFilFn = putFilFn db
   db.putIdgFn = putIdgFn db
+  db.putFasFn = putFasFn db
   db.putEndFn = putEndFn db

   db.closeFn = closeFn db
@@ -230,6 +292,20 @@ iterator walkKey*(
     if key.isValid:
       yield (n, vid, key)

+iterator walkFil*(
+    be: MemBackendRef;
+      ): tuple[n: int, fid: FilterID, filter: FilterRef] =
+  ## Iteration over the filter sub-table.
+  for n,fid in be.rFil.keys.toSeq.mapIt(it.uint64).sorted.mapIt(it.FilterID):
+    let data = be.rFil.getOrDefault(fid, EmptyBlob)
+    if 0 < data.len:
+      let rc = data.deblobify FilterRef
+      if rc.isErr:
+        debug logTxt "walkFilFn() skip", n, fid, error=rc.error
+      else:
+        yield (n, fid, rc.value)
+
 iterator walk*(
     be: MemBackendRef;
       ): tuple[n: int, pfx: StorageType, xid: uint64, data: Blob] =

@@ -238,10 +314,15 @@ iterator walk*(
   ## Non-decodable entries are stepped over while the counter `n` of the
   ## yield record is still incremented.
   var n = 0

   if be.vGen.isSome:
     yield(0, AdmPfx, AdmTabIdIdg.uint64, be.vGen.unsafeGet.blobify)
     n.inc

+  if be.vFas.isSome:
+    yield(0, AdmPfx, AdmTabIdFas.uint64, be.vFas.unsafeGet.blobify)
+    n.inc
+
   for vid in be.sTab.keys.toSeq.mapIt(it.uint64).sorted.mapIt(it.VertexID):
     let data = be.sTab.getOrDefault(vid, EmptyBlob)
     if 0 < data.len:

@@ -252,6 +333,12 @@ iterator walk*(
         yield (n, KeyPfx, vid.uint64, key.to(Blob))
         n.inc

+  for lid in be.rFil.keys.toSeq.mapIt(it.uint64).sorted.mapIt(it.FilterID):
+    let data = be.rFil.getOrDefault(lid, EmptyBlob)
+    if 0 < data.len:
+      yield (n, FilPfx, lid.uint64, data)
+      n.inc
+
 # ------------------------------------------------------------------------------
 # End
 # ------------------------------------------------------------------------------
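A quick usage sketch of the new iterator, assuming `be: MemBackendRef` obtained from `memoryBackend()` and populated as above:

```nim
# Enumerate all decodable filters in ascending FilterID order.
for (n, fid, filter) in be.walkFil:
  echo "filter #", n, " id=", fid.uint64
```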
@@ -83,6 +83,9 @@ proc `vtxCache=`(hdl: RdbPutHdlRef; val: tuple[vid: VertexID; data: Blob]) =
 proc `keyCache=`(hdl: RdbPutHdlRef; val: tuple[vid: VertexID; data: Blob]) =
   hdl.cache[KeyPfx][val.vid.uint64] = val.data

+proc `filCache=`(hdl: RdbPutHdlRef; val: tuple[fid: FilterID; data: Blob]) =
+  hdl.cache[FilPfx][val.fid.uint64] = val.data
+
 proc `admCache=`(hdl: RdbPutHdlRef; val: tuple[id: AdminTabID; data: Blob]) =
   hdl.cache[AdmPfx][val.id.uint64] = val.data
@@ -126,12 +129,29 @@ proc getKeyFn(db: RdbBackendRef): GetKeyFn =

       err(GetKeyNotFound)

+proc getFilFn(db: RdbBackendRef): GetFilFn =
+  result =
+    proc(fid: FilterID): Result[FilterRef,AristoError] =
+
+      # Fetch serialised data record
+      let rc = db.rdb.get fid.toOpenArray()
+      if rc.isErr:
+        debug logTxt "getFilFn: failed", fid,
+          error=rc.error[0], info=rc.error[1]
+        return err(rc.error[0])
+
+      # Decode data record
+      if 0 < rc.value.len:
+        return rc.value.deblobify FilterRef
+
+      err(GetFilNotFound)
+
 proc getIdgFn(db: RdbBackendRef): GetIdgFn =
   result =
     proc(): Result[seq[VertexID],AristoError]=

       # Fetch serialised data record
-      let rc = db.rdb.get AdmTabId_Idg.toOpenArray()
+      let rc = db.rdb.get AdmTabIdIdg.toOpenArray()
       if rc.isErr:
         debug logTxt "getIdgFn: failed", error=rc.error[1]
         return err(rc.error[0])

@@ -143,6 +163,23 @@ proc getIdgFn(db: RdbBackendRef): GetIdgFn =
       # Decode data record
       rc.value.deblobify seq[VertexID]

+proc getFasFn(db: RdbBackendRef): GetFasFn =
+  result =
+    proc(): Result[seq[FilterID],AristoError]=
+
+      # Fetch serialised data record
+      let rc = db.rdb.get AdmTabIdFas.toOpenArray()
+      if rc.isErr:
+        debug logTxt "getFasFn: failed", error=rc.error[1]
+        return err(rc.error[0])
+
+      if rc.value.len == 0:
+        let w = EmptyFidSeq
+        return ok w
+
+      # Decode data record
+      rc.value.deblobify seq[FilterID]
+
 # -------------

 proc putBegFn(db: RdbBackendRef): PutBegFn =
@@ -180,15 +217,43 @@ proc putKeyFn(db: RdbBackendRef): PutKeyFn =
           else:
             hdl.keyCache = (vid, EmptyBlob)

+proc putFilFn(db: RdbBackendRef): PutFilFn =
+  result =
+    proc(hdl: PutHdlRef; vrps: openArray[(FilterID,FilterRef)]) =
+      let hdl = hdl.getSession db
+      if hdl.error.isNil:
+        for (fid,filter) in vrps:
+          if filter.isValid:
+            let rc = filter.blobify()
+            if rc.isErr:
+              hdl.error = TypedPutHdlErrRef(
+                pfx:  FilPfx,
+                fid:  fid,
+                code: rc.error)
+              return
+            hdl.filCache = (fid, rc.value)
+          else:
+            hdl.filCache = (fid, EmptyBlob)
+
 proc putIdgFn(db: RdbBackendRef): PutIdgFn =
   result =
     proc(hdl: PutHdlRef; vs: openArray[VertexID]) =
       let hdl = hdl.getSession db
       if hdl.error.isNil:
         if 0 < vs.len:
-          hdl.admCache = (AdmTabId_Idg, vs.blobify)
+          hdl.admCache = (AdmTabIdIdg, vs.blobify)
         else:
-          hdl.admCache = (AdmTabId_Idg, EmptyBlob)
+          hdl.admCache = (AdmTabIdIdg, EmptyBlob)

+proc putFasFn(db: RdbBackendRef): PutFasFn =
+  result =
+    proc(hdl: PutHdlRef; vs: openArray[FilterID]) =
+      let hdl = hdl.getSession db
+      if hdl.error.isNil:
+        if 0 < vs.len:
+          hdl.admCache = (AdmTabIdFas, vs.blobify)
+        else:
+          hdl.admCache = (AdmTabIdFas, EmptyBlob)
+

 proc putEndFn(db: RdbBackendRef): PutEndFn =
@@ -234,12 +299,16 @@ proc rocksDbBackend*(path: string): Result[BackendRef,AristoError] =

   db.getVtxFn = getVtxFn db
   db.getKeyFn = getKeyFn db
+  db.getFilFn = getFilFn db
   db.getIdgFn = getIdgFn db
+  db.getFasFn = getFasFn db

   db.putBegFn = putBegFn db
   db.putVtxFn = putVtxFn db
   db.putKeyFn = putKeyFn db
+  db.putFilFn = putFilFn db
   db.putIdgFn = putIdgFn db
+  db.putFasFn = putFasFn db
   db.putEndFn = putEndFn db

   db.closeFn = closeFn db
@@ -269,7 +338,7 @@ iterator walkVtx*(
     if rc.isOk:
       yield (n, VertexID(xid), rc.value)

-iterator walkkey*(
+iterator walkKey*(
     be: RdbBackendRef;
       ): tuple[n: int, vid: VertexID, key: HashKey] =
   ## Variant of `walk()` iteration over the Merkle hash sub-table.

@@ -278,6 +347,15 @@ iterator walkKey*(
     if hashKey.init data:
       yield (n, VertexID(xid), hashKey)

+iterator walkFil*(
+    be: RdbBackendRef;
+      ): tuple[n: int, fid: FilterID, filter: FilterRef] =
+  ## Variant of `walk()` iteration over the filter sub-table.
+  for (n, xid, data) in be.rdb.walk FilPfx:
+    let rc = data.deblobify FilterRef
+    if rc.isOk:
+      yield (n, FilterID(xid), rc.value)
+
 # ------------------------------------------------------------------------------
 # End
 # ------------------------------------------------------------------------------
@@ -53,10 +53,13 @@ proc toRdbKey*(id: uint64; pfx: StorageType): RdbKey =
   copyMem(addr result[1], unsafeAddr idKey, sizeof idKey)

 template toOpenArray*(vid: VertexID; pfx: StorageType): openArray[byte] =
-  vid.uint64.toRdbKey(pfx).toOpenArray(0, sizeof VertexID)
+  vid.uint64.toRdbKey(pfx).toOpenArray(0, sizeof uint64)

+template toOpenArray*(fid: FilterID): openArray[byte] =
+  fid.uint64.toRdbKey(FilPfx).toOpenArray(0, sizeof uint64)
+
 template toOpenArray*(aid: AdminTabID): openArray[byte] =
-  aid.uint64.toRdbKey(AdmPfx).toOpenArray(0, sizeof AdminTabID)
+  aid.uint64.toRdbKey(AdmPfx).toOpenArray(0, sizeof uint64)

 # ------------------------------------------------------------------------------
 # End
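All three templates expose the same physical layout: one discriminator byte followed by the eight big-endian ID bytes, i.e. nine bytes in total (`toOpenArray(0, sizeof uint64)` is inclusive.) A standalone sketch of that layout, assuming `stew/endians2` for the byte-order helper as elsewhere in this code; the helper name is invented:

```nim
import stew/endians2

# Illustrative RocksDB key layout: <1 byte sub-table prefix> &
# <8 byte big-endian ID>; big-endian makes lexicographic key order
# equal numeric ID order.
func sketchRdbKey(pfx: byte; id: uint64): array[9, byte] =
  result[0] = pfx                  # e.g. FilPfx = 4
  let idBytes = id.toBytesBE
  for i in 0 .. 7:
    result[1 + i] = idBytes[i]

doAssert sketchRdbKey(4'u8, 1u64) == [4'u8, 0, 0, 0, 0, 0, 0, 0, 1]
```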
@@ -17,31 +17,35 @@ import
   std/sequtils,
   eth/common,
   rocksdb,
   ../../aristo_desc,
   ../aristo_init_common,
   ./rdb_desc

 # ------------------------------------------------------------------------------
-# Private functions
+# Private helpers
 # ------------------------------------------------------------------------------

 func keyPfx(kData: cstring, kLen: csize_t): int =
-  if not kData.isNil and kLen == 1 + sizeof(VertexID):
+  if not kData.isNil and kLen == 1 + sizeof(uint64):
     kData.toOpenArrayByte(0,0)[0].int
   else:
     -1

 func keyXid(kData: cstring, kLen: csize_t): uint64 =
-  if not kData.isNil and kLen == 1 + sizeof(VertexID):
+  if not kData.isNil and kLen == 1 + sizeof(uint64):
     return uint64.fromBytesBE kData.toOpenArrayByte(1,int(kLen)-1).toSeq

 func to(xid: uint64; T: type Blob): T =
   xid.toBytesBE.toSeq

 func valBlob(vData: cstring, vLen: csize_t): Blob =
   if not vData.isNil and 0 < vLen:
     return vData.toOpenArrayByte(0,int(vLen)-1).toSeq

 # ------------------------------------------------------------------------------
 # Private debugging helpers
 # ------------------------------------------------------------------------------

 proc pp(kd: cstring, kl: csize_t): string =
   if kd.isNil: "n/a" else: $kd.keyXid(kl)
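The `keyPfx`/`keyXid` helpers invert the key layout built by `toRdbKey()`. An illustrative round-trip over a plain byte array instead of the `cstring` handed out by the RocksDB C API:

```nim
import stew/endians2

# Standalone mirror of keyPfx/keyXid: byte 0 holds the sub-table
# prefix, bytes 1..8 the big-endian ID.
func keyPfx(key: openArray[byte]): int =
  if key.len == 1 + sizeof(uint64): key[0].int else: -1

func keyXid(key: openArray[byte]): uint64 =
  if key.len == 1 + sizeof(uint64):
    result = uint64.fromBytesBE key.toOpenArray(1, key.len - 1)

let key = [4'u8, 0, 0, 0, 0, 0, 0, 0, 42]      # FilPfx & id=42
doAssert key.keyPfx == 4 and key.keyXid == 42u64
```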
 # ------------------------------------------------------------------------------
 # Public iterators
 # ------------------------------------------------------------------------------
@@ -82,80 +86,60 @@ iterator walk*(
     count.inc
     # End while


 iterator walk*(
     rdb: RdbInst;
     pfx: StorageType;
       ): tuple[n: int, xid: uint64, data: Blob] =
-  ## Walk over key-value pairs of the table referred to by the argument `pfx`.
+  ## Walk over key-value pairs of the table referred to by the argument `pfx`
+  ## which must be different from `Oops` and `AdmPfx`.
   ##
   ## Non-decodable entries are stepped over while the counter `n` of the
   ## yield record is still incremented.
-  let rit = rdb.store.db.rocksdb_create_iterator(rdb.store.readOptions)
-  defer: rit.rocksdb_iter_destroy()
-
-  rit.rocksdb_iter_seek_to_first()
-  var
-    count = 0
-    kLen: csize_t
-    kData: cstring
-
+  block walkBody:
+    # Admin records are not covered by this iterator
+    if pfx in {Oops, AdmPfx}:
+      # Unsupported
+      break walkBody
+
+    let rit = rdb.store.db.rocksdb_create_iterator(rdb.store.readOptions)
+    defer: rit.rocksdb_iter_destroy()
+
+    var
+      count = 0
+      kLen: csize_t
+      kData: cstring
+
+    # Seek to `VertexID(1)` and, if that fails, to subsequent entries. There
+    # should always be a `VertexID(1)` entry unless the sub-table is empty.
+    # There is no such control for the filter table in which case there is a
+    # blind guess (in case `rocksdb_iter_seek()` does not search `ge` for
+    # some reason.)
+    let keyOne = 1u64.toRdbKey pfx
+
+    # It is not clear what happens when the `key` does not exist. The guess
+    # is that the iteration will proceed at the next key position.
+    #
+    # Comment from GO port at
+    # //github.com/DanielMorsing/rocksdb/blob/master/iterator.go:
+    #
+    #   Seek moves the iterator to the position of the key given or, if the
+    #   key doesn't exist, the next key that does exist in the database. If
+    #   the key doesn't exist, and there is no next key, the Iterator
+    #   becomes invalid.
+    #
+    kData = cast[cstring](unsafeAddr keyOne[0])
+    kLen = sizeof(keyOne).csize_t
+    rit.rocksdb_iter_seek(kData, kLen)
+    if rit.rocksdb_iter_valid() == 0:
+      break walkBody
+    kData = rit.rocksdb_iter_key(addr kLen)
+
-  case pfx:
-  of Oops, AdmPfx:
-    discard
-  of VtxPfx, KeyPfx:
-    # Skip over admin records until vertex sub-table reached
-    while kData.keyPfx(kLen) < VtxPfx.ord:
-
-      # Update Iterator and fetch next item
-      rit.rocksdb_iter_next()
-      if rit.rocksdb_iter_valid() == 0:
-        break walkBody
-      kData = rit.rocksdb_iter_key(addr kLen)
-      # End while
-
-  case pfx:
-  of Oops, AdmPfx, VtxPfx:
-    discard
-  of KeyPfx:
-    # Reposition search head to key sub-table
-    while kData.keyPfx(kLen) < KeyPfx.ord:
-
-      # Move search head to the first Merkle hash entry by seeking the same
-      # vertex ID on the key table. This might skip over stale keys smaller
-      # than the current one.
-      let key = @[KeyPfx.ord.byte] & kData.keyXid(kLen).to(Blob)
-      rit.rocksdb_iter_seek(cast[cstring](unsafeAddr key[0]), csize_t(kLen))
-
-      # It is not clear what happens when the `key` does not exist. The
-      # guess is that nothing would happen and the iteration will proceed at
-      # the next vertex position.
-      kData = rit.rocksdb_iter_key(addr kLen)
-      if KeyPfx.ord <= kData.keyPfx(kLen):
-        # OK, reached Merkle hash table
-        break
-
-      # Update Iterator
-      rit.rocksdb_iter_next()
-      if rit.rocksdb_iter_valid() == 0:
-        break walkBody
-      kData = rit.rocksdb_iter_key(addr kLen)
-      # End while
-
+    # Fetch sub-table data
+    while true:
-      let kPfx = kData.keyPfx(kLen)
-      if pfx.ord < kPfx:
+      kData = rit.rocksdb_iter_key(addr kLen)
+      if pfx.ord != kData.keyPfx kLen:
+        break walkBody # done
+
       let xid = kData.keyXid(kLen)
-      if 0 < xid or pfx == AdmPfx:
+      if 0 < xid:
         # Fetch value data
         var vLen: csize_t
         let vData = rit.rocksdb_iter_value(addr vLen)

@@ -168,7 +152,7 @@ iterator walk*(
       rit.rocksdb_iter_next()
       if rit.rocksdb_iter_valid() == 0:
         break walkBody
       kData = rit.rocksdb_iter_key(addr kLen)

       count.inc
       # End while
nimbus/db/aristo/aristo_persistent.nim (new file, 18 lines)
@@ -0,0 +1,18 @@
+# nimbus-eth1
+# Copyright (c) 2021 Status Research & Development GmbH
+# Licensed under either of
+#  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
+#    http://www.apache.org/licenses/LICENSE-2.0)
+#  * MIT license ([LICENSE-MIT](LICENSE-MIT) or
+#    http://opensource.org/licenses/MIT)
+# at your option. This file may not be copied, modified, or distributed
+# except according to those terms.
+
+import
+  aristo_init/persistent as init_persistent,
+  aristo_walk/persistent as walk_persistent
+export
+  init_persistent,
+  walk_persistent
+
+# End
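The new module is a convenience bundle so that callers need a single import for persistent construction and backend iteration. A hedged sketch of its use (the database path is a placeholder, and the result handling follows the test code below):

```nim
import nimbus/db/aristo/aristo_persistent

# Re-exports aristo_init/persistent and aristo_walk/persistent, so the
# persistent constructor and the backend walk iterators come together.
let rc = newAristoDbRef(BackendRocksDB, "/tmp/aristo-demo") # placeholder path
if rc.isOk:
  let db = rc.value
  defer: db.finish(flush=true)
```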
@@ -86,6 +86,18 @@ iterator walkKeyBeImpl*[T](
     yield (n,vid,key)
     n.inc


+iterator walkFilBeImpl*[T](
+    be: T;                           # Backend descriptor
+    db: AristoDbRef;                 # Database with optional backend filter
+      ): tuple[n: int, fid: FilterID, filter: FilterRef] =
+  ## Generic filter iterator
+  when be isnot VoidBackendRef:
+    mixin walkFil
+
+    for (n,fid,filter) in be.walkFil:
+      yield (n,fid,filter)
+
 # ------------------------------------------------------------------------------
 # End
 # ------------------------------------------------------------------------------
@@ -42,6 +42,14 @@ iterator walkKeyBe*[T: MemBackendRef|VoidBackendRef](
   for (n,vid,key) in db.to(T).walkKeyBeImpl db:
     yield (n,vid,key)

+iterator walkFilBe*[T: MemBackendRef|VoidBackendRef](
+    _: type T;
+    db: AristoDbRef;
+      ): tuple[n: int, fid: FilterID, filter: FilterRef] =
+  ## Similar to `walkVtxBe()` but for filters.
+  for (n,fid,filter) in db.to(T).walkFilBeImpl db:
+    yield (n,fid,filter)
+
 # ------------------------------------------------------------------------------
 # End
 # ------------------------------------------------------------------------------
@@ -47,6 +47,14 @@ iterator walkKeyBe*(
   for (n,vid,key) in db.to(T).walkKeyBeImpl db:
     yield (n,vid,key)

+iterator walkFilBe*(
+    T: type RdbBackendRef;
+    db: AristoDbRef;
+      ): tuple[n: int, fid: FilterID, filter: FilterRef] =
+  ## Similar to `walkVtxBe()` but for filters.
+  for (n,fid,filter) in db.to(T).walkFilBeImpl db:
+    yield (n,fid,filter)
+
 # ------------------------------------------------------------------------------
 # End
 # ------------------------------------------------------------------------------
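Both variants resolve the backend type at compile time, so callers enumerate stored filters without runtime dispatch. A usage sketch against a memory-backed `db: AristoDbRef`:

```nim
# Walk all filters on the backend of `db`, typed by backend kind.
for (n, fid, filter) in MemBackendRef.walkFilBe db:
  echo "backend filter #", n, " fid=", fid.uint64
```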
@@ -11,21 +11,58 @@
 ## Aristo (aka Patricia) DB records merge test

 import
-  std/[algorithm, sequtils, sets, tables],
+  std/[algorithm, hashes, sequtils, sets, strutils, tables],
   eth/common,
-  stew/results,
+  results,
   unittest2,
   ../../nimbus/sync/protocol,
-  ../../nimbus/db/aristo,
-  ../../nimbus/db/aristo/[aristo_desc, aristo_debug, aristo_hashify],
-  ../../nimbus/db/aristo/aristo_init/[
-    aristo_memory, aristo_rocksdb, persistent],
+  ../../nimbus/db/aristo/[
+    aristo_debug,
+    aristo_desc,
+    aristo_desc/aristo_types_backend,
+    aristo_hashify,
+    aristo_init/aristo_memory,
+    aristo_init/aristo_rocksdb,
+    aristo_persistent,
+    aristo_transcode,
+    aristo_vid],
   ./test_helpers

+const
+  BlindHash = EmptyBlob.hash
+
+# ------------------------------------------------------------------------------
+# Private helpers
+# ------------------------------------------------------------------------------
+
+func hash(filter: FilterRef): Hash =
+  ## Unique hash/filter -- cannot use de/blobify as the expressions
+  ## `filter.blobify` and `filter.blobify.value.deblobify.value.blobify` are
+  ## not necessarily the same binaries due to unsorted tables.
+  ##
+  var h = BlindHash
+  if not filter.isNil:
+    h = h !& filter.src.ByteArray32.hash
+    h = h !& filter.trg.ByteArray32.hash
+
+    for w in filter.vGen.vidReorg:
+      h = h !& w.uint64.hash
+
+    for w in filter.sTab.keys.toSeq.mapIt(it.uint64).sorted.mapIt(it.VertexID):
+      let data = filter.sTab.getOrVoid(w).blobify.get(otherwise = EmptyBlob)
+      h = h !& (w.uint64.toBytesBE.toSeq & data).hash
+
+    for w in filter.kMap.keys.toSeq.mapIt(it.uint64).sorted.mapIt(it.VertexID):
+      let data = filter.kMap.getOrVoid(w).ByteArray32.toSeq
+      h = h !& (w.uint64.toBytesBE.toSeq & data).hash
+
+  !$h
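The `!&` / `!$` pair used above is the `std/hashes` idiom for incremental hash mixing: `!&` folds one more component into a running value, `!$` finishes (avalanches) it. A self-contained illustration:

```nim
import std/hashes

var h: Hash = 0
h = h !& @[1'u8, 2, 3].hash   # fold in first component
h = h !& @[4'u8, 5, 6].hash   # fold in second component
let digest = !$h              # finalise the accumulated hash
```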
 # ------------------------------------------------------------------------------
 # Private functions
 # ------------------------------------------------------------------------------

 proc mergeData(
     db: AristoDbRef;
     rootKey: HashKey;
@@ -100,6 +137,80 @@ proc verify(

   true

+# -----------
+
+proc collectFilter(
+    db: AristoDbRef;
+    filter: FilterRef;
+    tab: var Table[FilterID,Hash];
+    noisy: bool;
+      ): bool =
+  ## Store filter on permanent BE and register digest
+  if not filter.isNil:
+    let
+      fid = FilterID(7 * (tab.len + 1)) # just some number
+      be = db.backend
+      tx = be.putBegFn()
+
+    be.putFilFn(tx, @[(fid,filter)])
+    let endOk = be.putEndFn tx
+    if endOk != AristoError(0):
+      check endOk == AristoError(0)
+      return
+
+    tab[fid] = filter.hash
+
+  true
+
+proc verifyFiltersImpl[T: MemBackendRef|RdbBackendRef](
+    _: type T;
+    db: AristoDbRef;
+    tab: Table[FilterID,Hash];
+    noisy: bool;
+      ): bool =
+  ## Compare stored filters against registered ones
+  var n = 0
+  for (_,fid,filter) in T.walkFilBe db:
+    let
+      filterHash = filter.hash
+      registered = tab.getOrDefault(fid, BlindHash)
+
+    if registered == BlindHash:
+      check (fid,registered) != (0,BlindHash)
+      return
+    if filterHash != registered:
+      noisy.say "***", "verifyFiltersImpl",
+        " n=", n+1,
+        " fid=", fid.pp,
+        " filterHash=", filterHash.int.toHex,
+        " registered=", registered.int.toHex
+      check (fid,filterHash) == (fid,registered)
+      return
+    n.inc
+
+  if n != tab.len:
+    check n == tab.len
+    return
+
+  true
+
+proc verifyFilters(
+    db: AristoDbRef;
+    tab: Table[FilterID,Hash];
+    noisy: bool;
+      ): bool =
+  ## Wrapper
+  let
+    be = db.to(TypedBackendRef)
+    kind = (if be.isNil: BackendVoid else: be.kind)
+  case kind:
+  of BackendMemory:
+    return MemBackendRef.verifyFiltersImpl(db, tab, noisy)
+  of BackendRocksDB:
+    return RdbBackendRef.verifyFiltersImpl(db, tab, noisy)
+  else:
+    discard
+  check kind == BackendMemory or kind == BackendRocksDB
+
 # ------------------------------------------------------------------------------
 # Public test function
 # ------------------------------------------------------------------------------
@@ -113,6 +224,7 @@ proc test_backendConsistency*(
       ): bool =
   ## Import accounts
   var
+    filTab: Table[FilterID,Hash]     # Filter register
     ndb = AristoDbRef()              # Reference cache
     mdb = AristoDbRef()              # Memory backend database
     rdb = AristoDbRef()              # Rocks DB backend database
@@ -129,6 +241,12 @@
     ndb = newAristoDbRef BackendVoid
     mdb = newAristoDbRef BackendMemory
     if doRdbOk:
+      if not rdb.backend.isNil: # ignore bootstrap
+        let verifyFiltersOk = rdb.verifyFilters(filTab, noisy)
+        if not verifyFiltersOk:
+          check verifyFiltersOk
+          return
+        filTab.clear
       rdb.finish(flush=true)
       let rc = newAristoDbRef(BackendRocksDB,rdbPath)
       if rc.isErr:
@@ -188,11 +306,23 @@
       mdbPreSaveCache, mdbPreSaveBackend: string
       rdbPreSaveCache, rdbPreSaveBackend: string
     when true: # and false:
-      mdbPreSaveCache = mdb.pp
-      mdbPreSaveBackend = mdb.to(MemBackendRef).pp(ndb)
+      #mdbPreSaveCache = mdb.pp
+      #mdbPreSaveBackend = mdb.to(MemBackendRef).pp(ndb)
       rdbPreSaveCache = rdb.pp
       rdbPreSaveBackend = rdb.to(RdbBackendRef).pp(ndb)

+    # Provide filter, store filter on permanent BE, and register filter digest
+    block:
+      let rc = mdb.stow(persistent=false, dontHashify=true, chunkedMpt=true)
+      if rc.isErr:
+        check rc.error == (0,0)
+        return
+      let collectFilterOk = rdb.collectFilter(mdb.roFilter, filTab, noisy)
+      if not collectFilterOk:
+        check collectFilterOk
+        return
+
     # Store onto backend database
     block:
       #noisy.say "***", "db-dump\n    ", mdb.pp
@@ -244,6 +374,13 @@
     when true and false:
       noisy.say "***", "beCon(9) <", n, "/", list.len-1, ">", " groups=", count

+  # Finally ...
+  block:
+    let verifyFiltersOk = rdb.verifyFilters(filTab, noisy)
+    if not verifyFiltersOk:
+      check verifyFiltersOk
+      return
+
   true

 # ------------------------------------------------------------------------------
@@ -10,7 +10,7 @@
 # distributed except according to those terms.

 import
-  std/[os, sequtils],
+  std/[hashes, os, sequtils],
   eth/common,
   rocksdb,
   ../../nimbus/db/aristo/[

@@ -109,6 +109,9 @@ proc `==`*(a: (int,AristoError), b: (int,int)): bool =
 proc `==`*(a: (int,VertexID,AristoError), b: (int,int,int)): bool =
   (a[0], a[1].int, a[2].int) == b

+proc `==`*(a: (FilterID,Hash), b: (int,Hash)): bool =
+  (a[0].int,a[1]) == b
+
 proc to*(sample: AccountsSample; T: type seq[UndumpAccounts]): T =
   ## Convert test data into usable in-memory format
   let file = sample.file.findFilePath.value