mirror of
https://github.com/status-im/nimbus-eth1.git
synced 2025-01-11 21:04:11 +00:00
Aristo db fixes n updates needed for filter fifo (#1728)
* Set scheduler state as part of the backend descriptor details: Moved type definitions `QidLayoutRef` and `QidSchedRef` to `desc_structural.nim` so that it shares the same folder as `desc_backend.nim` * Automatic filter queue table initialisation in backend details: Scheduler can be tweaked or completely disabled * Updated backend unit tests details: + some code clean up/beautification, reads better now + disabled persistent filters so that there is no automated filter management which will be implemented next * Prettify/update unit tests source code details: Mostly replacing the `check()` paradigm by `xCheck()` * Somewhat simplified backend type management why: Backend objects are labelled with a `BackendType` symbol where the `BackendVoid` label is implicitly assumed for a `nil` backend object reference. To make it easier, a `kind()` function is used now applicable to `nil` references as well. * Fix DB storage layout for filter objects why: Need to store the filter ID with the object * Implement reverse [] index on fifo why: An integer index argument on `[]` retrieves the QueueID (label) of the fifo item while a QueueID argument on `[]` retrieves the index (so it is inverse to the former variant). * Provide iterator over filters as fifo why: This iterator goes along the cascased fifo structure (i.e. in historical order)
This commit is contained in:
parent
69a1a988b0
commit
3936d4d0ad
@ -233,7 +233,7 @@ stored in the right byte of the serialised bitmap.
|
||||
### 4.2 Extension record serialisation
|
||||
|
||||
0 +--+--+--+--+--+--+--+--+--+
|
||||
| | -- vertexID
|
||||
| | -- vertex ID
|
||||
8 +--+--+--+--+--+--+--+--+--+
|
||||
| | ... -- path segment
|
||||
+--+
|
||||
@ -338,17 +338,19 @@ assumed, i.e. the list with the single vertex ID *1*.
|
||||
|
||||
### 4.8 Backend filter record serialisation
|
||||
|
||||
0 +--+--+--+--+--+ .. --+--+ .. --+
|
||||
0 +--+--+--+--+--+ .. --+
|
||||
| | -- filter ID
|
||||
8 +--+--+--+--+--+ .. --+--+ .. --+
|
||||
| | -- 32 bytes filter source hash
|
||||
32 +--+--+--+--+--+ .. --+--+ .. --+
|
||||
40 +--+--+--+--+--+ .. --+--+ .. --+
|
||||
| | -- 32 bytes filter target hash
|
||||
64 +--+--+--+--+--+ .. --+--+ .. --+
|
||||
72 +--+--+--+--+--+ .. --+--+ .. --+
|
||||
| | -- number of unused vertex IDs
|
||||
68 +--+--+--+--+
|
||||
76 +--+--+--+--+
|
||||
| | -- number of structural triplets
|
||||
72 +--+--+--+--+--+ .. --+
|
||||
| | -- first unused vertex ID
|
||||
80 +--+--+--+--+--+ .. --+
|
||||
| | -- first unused vertex ID
|
||||
88 +--+--+--+--+--+ .. --+
|
||||
... -- more unused vertex ID
|
||||
N1 +--+--+--+--+
|
||||
|| | -- flg(3) + vtxLen(29), 1st triplet
|
||||
|
@ -76,15 +76,13 @@ proc checkBE*(
|
||||
## Moreover, the union of both sets is equivalent to the set of positive
|
||||
## `uint64` numbers.
|
||||
##
|
||||
if not db.backend.isNil:
|
||||
let be = db.to(TypedBackendRef)
|
||||
case be.kind:
|
||||
of BackendMemory:
|
||||
return MemBackendRef.checkBE(db, cache=cache, relax=relax)
|
||||
of BackendRocksDB:
|
||||
return RdbBackendRef.checkBE(db, cache=cache, relax=relax)
|
||||
of BackendVoid:
|
||||
return VoidBackendRef.checkBE(db, cache=cache, relax=relax)
|
||||
case db.backend.kind:
|
||||
of BackendMemory:
|
||||
return MemBackendRef.checkBE(db, cache=cache, relax=relax)
|
||||
of BackendRocksDB:
|
||||
return RdbBackendRef.checkBE(db, cache=cache, relax=relax)
|
||||
of BackendVoid:
|
||||
return VoidBackendRef.checkBE(db, cache=cache, relax=relax)
|
||||
ok()
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
|
@ -16,14 +16,12 @@ import
|
||||
results,
|
||||
stew/byteutils,
|
||||
"."/[aristo_constants, aristo_desc, aristo_hike, aristo_init],
|
||||
./aristo_desc/desc_backend,
|
||||
./aristo_init/[memory_db, rocks_db],
|
||||
./aristo_filter/filter_desc
|
||||
|
||||
export
|
||||
TypedBackendRef, aristo_init.to
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Ptivate functions
|
||||
# Private functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc sortedKeys(lTab: Table[LeafTie,VertexID]): seq[LeafTie] =
|
||||
@ -357,6 +355,7 @@ proc ppFilter(fl: FilterRef; db: AristoDbRef; indent: int): string =
|
||||
if fl.isNil:
|
||||
result &= " n/a"
|
||||
return
|
||||
result &= pfx & "fid=" & fl.fid.ppFid
|
||||
result &= pfx & "trg(" & fl.trg.ppKey & ")"
|
||||
result &= pfx & "src(" & fl.src.ppKey & ")"
|
||||
result &= pfx & "vGen" & pfx1 & "[" &
|
||||
@ -600,13 +599,6 @@ proc pp*(kMap: Table[VertexID,Hashlabel]; db: AristoDbRef; indent = 4): string =
|
||||
proc pp*(pAmk: Table[Hashlabel,VertexID]; db: AristoDbRef; indent = 4): string =
|
||||
db.ppXMap(db.top.kMap, pAmk, indent)
|
||||
|
||||
proc pp*(
|
||||
be: MemBackendRef|RdbBackendRef;
|
||||
db: AristoDbRef;
|
||||
indent = 4;
|
||||
): string =
|
||||
be.ppBe(db, indent)
|
||||
|
||||
# ---------------------
|
||||
|
||||
proc pp*(
|
||||
@ -668,12 +660,11 @@ proc pp*(
|
||||
filter.ppFilter(db, indent)
|
||||
|
||||
proc pp*(
|
||||
be: TypedBackendRef;
|
||||
be: BackendRef;
|
||||
db: AristoDbRef;
|
||||
indent = 4;
|
||||
): string =
|
||||
## May be called as `db.to(TypedBackendRef).pp(db)`
|
||||
case (if be.isNil: BackendVoid else: be.kind)
|
||||
case be.kind:
|
||||
of BackendMemory:
|
||||
be.MemBackendRef.ppBe(db, indent)
|
||||
|
||||
|
@ -102,6 +102,8 @@ type
|
||||
|
||||
BackendRef* = ref object of RootRef
|
||||
## Backend interface.
|
||||
filters*: QidSchedRef ## Filter slot queue state
|
||||
|
||||
getVtxFn*: GetVtxFn ## Read vertex record
|
||||
getKeyFn*: GetKeyFn ## Read Merkle hash/key
|
||||
getFilFn*: GetFilFn ## Read back log filter
|
||||
|
@ -24,6 +24,8 @@ type
|
||||
RlpOtherException
|
||||
|
||||
# Data record transcoders, `deblobify()` and `blobify()`
|
||||
BlobifyNilFilter
|
||||
BlobifyNilVertex
|
||||
BlobifyBranchMissingRefs
|
||||
BlobifyExtMissingRefs
|
||||
BlobifyExtPathOverflow
|
||||
@ -181,6 +183,7 @@ type
|
||||
FilPrettyPointlessLayer
|
||||
FilDudeFilterUpdateError
|
||||
FilNotReadOnlyDude
|
||||
FilQuSchedDisabled
|
||||
|
||||
# Get functions form `aristo_get.nim`
|
||||
GetLeafNotFound
|
||||
|
@ -27,8 +27,8 @@ type
|
||||
## Identifier used to tag filter logs stored on the backend.
|
||||
|
||||
FilterID* = distinct uint64
|
||||
## Identifier used to identify a particular filter. It is generatied with the
|
||||
## filter.
|
||||
## Identifier used to identify a particular filter. It is generatied with
|
||||
## the filter when stored to database.
|
||||
|
||||
VertexID* = distinct uint64
|
||||
## Unique identifier for a vertex of the `Aristo Trie`. The vertex is the
|
||||
|
@ -82,6 +82,7 @@ type
|
||||
|
||||
FilterRef* = ref object
|
||||
## Delta layer with expanded sequences for quick access
|
||||
fid*: FilterID ## Filter identifier
|
||||
src*: HashKey ## Applicable to this state root
|
||||
trg*: HashKey ## Resulting state root (i.e. `kMap[1]`)
|
||||
sTab*: Table[VertexID,VertexRef] ## Filter structural vertex table
|
||||
@ -100,6 +101,40 @@ type
|
||||
txUid*: uint ## Transaction identifier if positive
|
||||
dirty*: bool ## Needs to be hashified if `true`
|
||||
|
||||
# ----------------------
|
||||
|
||||
QidLayoutRef* = ref object
|
||||
## Layout of cascaded list of filter ID slot queues where a slot queue
|
||||
## with index `N+1` serves as an overflow queue of slot queue `N`.
|
||||
q*: array[4,QidSpec]
|
||||
|
||||
QidSpec* = tuple
|
||||
## Layout of a filter ID slot queue
|
||||
size: uint ## Capacity of queue, length within `1..wrap`
|
||||
width: uint ## Instance gaps (relative to prev. item)
|
||||
wrap: QueueID ## Range `1..wrap` for round-robin queue
|
||||
|
||||
QidSchedRef* = ref object of RootRef
|
||||
## Current state of the filter queues
|
||||
ctx*: QidLayoutRef ## Organisation of the FIFO
|
||||
state*: seq[(QueueID,QueueID)] ## Current fill state
|
||||
|
||||
const
|
||||
DefaultQidWrap = QueueID(0x3fff_ffff_ffff_ffffu64)
|
||||
|
||||
QidSpecSizeMax* = high(uint32).uint
|
||||
## Maximum value allowed for a `size` value of a `QidSpec` object
|
||||
|
||||
QidSpecWidthMax* = high(uint32).uint
|
||||
## Maximum value allowed for a `width` value of a `QidSpec` object
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Private helpers
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
func max(a, b, c: int): int =
|
||||
max(max(a,b),c)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public helpers: `NodeRef` and `PayloadRef`
|
||||
# ------------------------------------------------------------------------------
|
||||
@ -242,20 +277,55 @@ proc dup*(layer: LayerRef): LayerRef =
|
||||
for (k,v) in layer.sTab.pairs:
|
||||
result.sTab[k] = v.dup
|
||||
|
||||
proc dup*(filter: FilterRef): FilterRef =
|
||||
## Duplicate layer.
|
||||
result = FilterRef(
|
||||
src: filter.src,
|
||||
kMap: filter.kMap,
|
||||
vGen: filter.vGen,
|
||||
trg: filter.trg)
|
||||
for (k,v) in filter.sTab.pairs:
|
||||
result.sTab[k] = v.dup
|
||||
# ---------------
|
||||
|
||||
proc to*(node: NodeRef; T: type VertexRef): T =
|
||||
func to*(node: NodeRef; T: type VertexRef): T =
|
||||
## Extract a copy of the `VertexRef` part from a `NodeRef`.
|
||||
node.VertexRef.dup
|
||||
|
||||
func to*(a: array[4,tuple[size, width: int]]; T: type QidLayoutRef): T =
|
||||
## Convert a size-width array to a `QidLayoutRef` layout. Over large
|
||||
## array field values are adjusted to its maximal size.
|
||||
var q: array[4,QidSpec]
|
||||
for n in 0..3:
|
||||
q[n] = (min(a[n].size.uint, QidSpecSizeMax),
|
||||
min(a[n].width.uint, QidSpecWidthMax),
|
||||
DefaultQidWrap)
|
||||
q[0].width = 0
|
||||
T(q: q)
|
||||
|
||||
func to*(a: array[4,tuple[size, width, wrap: int]]; T: type QidLayoutRef): T =
|
||||
## Convert a size-width-wrap array to a `QidLayoutRef` layout. Over large
|
||||
## array field values are adjusted to its maximal size. Too small `wrap`
|
||||
## values are adjusted to its minimal size.
|
||||
var q: array[4,QidSpec]
|
||||
for n in 0..2:
|
||||
q[n] = (min(a[n].size.uint, QidSpecSizeMax),
|
||||
min(a[n].width.uint, QidSpecWidthMax),
|
||||
QueueID(max(a[n].size + a[n+1].width, a[n].width+1, a[n].wrap)))
|
||||
q[0].width = 0
|
||||
q[3] = (min(a[3].size.uint, QidSpecSizeMax),
|
||||
min(a[3].width.uint, QidSpecWidthMax),
|
||||
QueueID(max(a[3].size, a[3].width, a[3].wrap)))
|
||||
T(q: q)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public constructors for filter slot scheduler state
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
func init*(T: type QidSchedRef; a: array[4,(int,int)]): T =
|
||||
## Constructor, see comments at the coverter function `to()` for adjustments
|
||||
## of the layout argument `a`.
|
||||
T(ctx: a.to(QidLayoutRef))
|
||||
|
||||
func init*(T: type QidSchedRef; a: array[4,(int,int,int)]): T =
|
||||
## Constructor, see comments at the coverter function `to()` for adjustments
|
||||
## of the layout argument `a`.
|
||||
T(ctx: a.to(QidLayoutRef))
|
||||
|
||||
func init*(T: type QidSchedRef; ctx: QidLayoutRef): T =
|
||||
T(ctx: ctx)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# End
|
||||
# ------------------------------------------------------------------------------
|
||||
|
@ -34,85 +34,6 @@ type
|
||||
DequQid ## Store merged local queue items
|
||||
DelQid ## Delete entry from last overflow queue
|
||||
|
||||
QidLayoutRef* = ref object
|
||||
## Layout of cascaded list of filter ID slot queues where a slot queue
|
||||
## with index `N+1` serves as an overflow queue of slot queue `N`.
|
||||
q*: array[4,QidSpec]
|
||||
|
||||
QidSpec* = tuple
|
||||
## Layout of a filter ID slot queue
|
||||
size: uint ## Capacity of queue, length within `1..wrap`
|
||||
width: uint ## Instance gaps (relative to prev. item)
|
||||
wrap: QueueID ## Range `1..wrap` for round-robin queue
|
||||
|
||||
QidSchedRef* = ref object of RootRef
|
||||
## Current state of the filter queues
|
||||
ctx*: QidLayoutRef ## Organisation of the FIFO
|
||||
state*: seq[(QueueID,QueueID)] ## Current fill state
|
||||
|
||||
const
|
||||
DefaultQidWrap = QueueID(0x3fff_ffff_ffff_ffffu64)
|
||||
|
||||
QidSpecSizeMax* = high(uint32).uint
|
||||
## Maximum value allowed for a `size` value of a `QidSpec` object
|
||||
|
||||
QidSpecWidthMax* = high(uint32).uint
|
||||
## Maximum value allowed for a `width` value of a `QidSpec` object
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Private helpers
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
func max(a, b, c: int): int =
|
||||
max(max(a,b),c)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public helpers
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
func to*(a: array[4,tuple[size, width: int]]; T: type QidLayoutRef): T =
|
||||
## Convert a size-width array to a `QidLayoutRef` layout. Over large
|
||||
## array field values are adjusted to its maximal size.
|
||||
var q: array[4,QidSpec]
|
||||
for n in 0..3:
|
||||
q[n] = (min(a[n].size.uint, QidSpecSizeMax),
|
||||
min(a[n].width.uint, QidSpecWidthMax),
|
||||
DefaultQidWrap)
|
||||
q[0].width = 0
|
||||
T(q: q)
|
||||
|
||||
func to*(a: array[4,tuple[size, width, wrap: int]]; T: type QidLayoutRef): T =
|
||||
## Convert a size-width-wrap array to a `QidLayoutRef` layout. Over large
|
||||
## array field values are adjusted to its maximal size. Too small `wrap`
|
||||
## values are adjusted to its minimal size.
|
||||
var q: array[4,QidSpec]
|
||||
for n in 0..2:
|
||||
q[n] = (min(a[n].size.uint, QidSpecSizeMax),
|
||||
min(a[n].width.uint, QidSpecWidthMax),
|
||||
QueueID(max(a[n].size + a[n+1].width, a[n].width+1, a[n].wrap)))
|
||||
q[0].width = 0
|
||||
q[3] = (min(a[3].size.uint, QidSpecSizeMax),
|
||||
min(a[3].width.uint, QidSpecWidthMax),
|
||||
QueueID(max(a[3].size, a[3].width, a[3].wrap)))
|
||||
T(q: q)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public constructors
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
func init*(T: type QidSchedRef; a: array[4,(int,int)]): T =
|
||||
## Constructor, see comments at the coverter function `to()` for adjustments
|
||||
## of the layout argument `a`.
|
||||
T(ctx: a.to(QidLayoutRef))
|
||||
|
||||
func init*(T: type QidSchedRef; a: array[4,(int,int,int)]): T =
|
||||
## Constructor, see comments at the coverter function `to()` for adjustments
|
||||
## of the layout argument `a`.
|
||||
T(ctx: a.to(QidLayoutRef))
|
||||
|
||||
func init*(T: type QidSchedRef; ctx: QidLayoutRef): T =
|
||||
T(ctx: ctx)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# End
|
||||
# ------------------------------------------------------------------------------
|
||||
|
@ -26,6 +26,8 @@ type
|
||||
## This *decreasing* requirement can be seen as a generalisation of a
|
||||
## block chain scenario with `i`, `j` backward steps into the past and
|
||||
## the `FilterID` as the block number.
|
||||
##
|
||||
## In order to flag an error, `FilterID(0)` must be returned.
|
||||
|
||||
const
|
||||
ZeroQidPair = (QueueID(0),QueueID(0))
|
||||
@ -34,9 +36,6 @@ const
|
||||
# Private helpers
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
func `+`*(a: QueueID; b: uint): QueueID = (a.uint64+b.uint64).QueueID
|
||||
func `-`*(a: QueueID; b: uint): QueueID = (a.uint64-b.uint64).QueueID
|
||||
|
||||
func `<`(a: static[uint]; b: QueueID): bool = QueueID(a) < b
|
||||
|
||||
func globalQid(queue: int, qid: QueueID): QueueID =
|
||||
@ -229,7 +228,7 @@ func fifoDel(
|
||||
# Public functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc stats*(
|
||||
func stats*(
|
||||
ctx: openArray[tuple[size, width: int]]; # Schedule layout
|
||||
): tuple[maxQueue: int, minCovered: int, maxCovered: int] =
|
||||
## Number of maximally stored and covered queued entries for the argument
|
||||
@ -245,20 +244,20 @@ proc stats*(
|
||||
result.minCovered += (ctx[n].size * step).int
|
||||
result.maxCovered += (size * step).int
|
||||
|
||||
proc stats*(
|
||||
func stats*(
|
||||
ctx: openArray[tuple[size, width, wrap: int]]; # Schedule layout
|
||||
): tuple[maxQueue: int, minCovered: int, maxCovered: int] =
|
||||
## Variant of `stats()`
|
||||
ctx.toSeq.mapIt((it[0],it[1])).stats
|
||||
|
||||
proc stats*(
|
||||
func stats*(
|
||||
ctx: QidLayoutRef; # Cascaded fifos descriptor
|
||||
): tuple[maxQueue: int, minCovered: int, maxCovered: int] =
|
||||
## Variant of `stats()`
|
||||
ctx.q.toSeq.mapIt((it[0].int,it[1].int)).stats
|
||||
|
||||
|
||||
proc addItem*(
|
||||
func addItem*(
|
||||
fifo: QidSchedRef; # Cascaded fifos descriptor
|
||||
): tuple[exec: seq[QidAction], fifo: QidSchedRef] =
|
||||
## Get the instructions for adding a new slot to the cascades queues. The
|
||||
@ -352,7 +351,7 @@ proc addItem*(
|
||||
(revActions.reversed, QidSchedRef(ctx: fifo.ctx, state: state))
|
||||
|
||||
|
||||
proc fetchItems*(
|
||||
func fetchItems*(
|
||||
fifo: QidSchedRef; # Cascaded fifos descriptor
|
||||
size: int; # Leading items to merge
|
||||
): tuple[exec: seq[QidAction], fifo: QidSchedRef] =
|
||||
@ -472,21 +471,21 @@ proc fetchItems*(
|
||||
(actions, QidSchedRef(ctx: fifo.ctx, state: state))
|
||||
|
||||
|
||||
proc lengths*(
|
||||
func lengths*(
|
||||
fifo: QidSchedRef; # Cascaded fifos descriptor
|
||||
): seq[int] =
|
||||
## Return the list of lengths for all cascaded sub-fifos.
|
||||
for n in 0 ..< fifo.state.len:
|
||||
result.add fifo.state[n].fifoLen(fifo.ctx.q[n].wrap).int
|
||||
|
||||
proc len*(
|
||||
func len*(
|
||||
fifo: QidSchedRef; # Cascaded fifos descriptor
|
||||
): int =
|
||||
## Size of the fifo
|
||||
fifo.lengths.foldl(a + b, 0)
|
||||
|
||||
|
||||
proc `[]`*(
|
||||
func `[]`*(
|
||||
fifo: QidSchedRef; # Cascaded fifos descriptor
|
||||
inx: int; # Index into latest items
|
||||
): QueueID =
|
||||
@ -536,6 +535,54 @@ proc `[]`*(
|
||||
return n.globalQid(wrap - inx)
|
||||
inx -= qInxMax0 + 1 # Otherwise continue
|
||||
|
||||
|
||||
func `[]`*(
|
||||
fifo: QidSchedRef; # Cascaded fifos descriptor
|
||||
qid: QueueID; # Index into latest items
|
||||
): int =
|
||||
## ..
|
||||
if QueueID(0) < qid:
|
||||
let
|
||||
chn = (qid.uint64 shr 62).int
|
||||
qid = (qid.uint64 and 0x3fff_ffff_ffff_ffffu64).QueueID
|
||||
|
||||
if chn < fifo.state.len:
|
||||
var offs = 0
|
||||
for n in 0 ..< chn:
|
||||
offs += fifo.state[n].fifoLen(fifo.ctx.q[n].wrap).int
|
||||
|
||||
let q = fifo.state[chn]
|
||||
if q[0] <= q[1]:
|
||||
# Single file
|
||||
# ::
|
||||
# | :
|
||||
# | q[0]--> 3
|
||||
# | 4
|
||||
# | 5 <--q[1]
|
||||
# | :
|
||||
#
|
||||
if q[0] <= qid and qid <= q[1]:
|
||||
return offs + (q[1] - qid).int
|
||||
else:
|
||||
# Wrap aound, double files
|
||||
# ::
|
||||
# | :
|
||||
# | 3 <--q[1]
|
||||
# | 4
|
||||
# | q[0]--> 5
|
||||
# | :
|
||||
# | wrap
|
||||
#
|
||||
if QueueID(1) <= qid and qid <= q[1]:
|
||||
return offs + (q[1] - qid).int
|
||||
|
||||
if q[0] <= qid:
|
||||
let wrap = fifo.ctx.q[chn].wrap
|
||||
if qid <= wrap:
|
||||
return offs + (q[1] - QueueID(0)).int + (wrap - qid).int
|
||||
-1
|
||||
|
||||
|
||||
proc le*(
|
||||
fifo: QidSchedRef; # Cascaded fifos descriptor
|
||||
fid: FilterID; # Upper bound
|
||||
@ -545,19 +592,27 @@ proc le*(
|
||||
## * `fn(qid) <= fid`
|
||||
## * for all `qid1` with `fn(qid1) <= fid` one has `fn(qid1) <= fn(qid)`
|
||||
##
|
||||
## If `fn()` returns `FilterID(0)`, then this function returns `QueueID(0)`
|
||||
##
|
||||
## The argument type `QuFilMap` of map `fn()` has been commented on earlier.
|
||||
##
|
||||
var
|
||||
left = 0
|
||||
right = fifo.len - 1
|
||||
|
||||
template getFid(qid: QueueID): FilterID =
|
||||
let fid = fn(qid)
|
||||
if not fid.isValid:
|
||||
return QueueID(0)
|
||||
fid
|
||||
|
||||
if 0 <= right:
|
||||
let maxQid = fifo[left]
|
||||
if maxQid.fn <= fid:
|
||||
if maxQid.getFid <= fid:
|
||||
return maxQid
|
||||
|
||||
# Bisection
|
||||
if fifo[right].fn <= fid:
|
||||
if fifo[right].getFid <= fid:
|
||||
while 1 < right - left:
|
||||
let half = (left + right) div 2
|
||||
#
|
||||
@ -567,9 +622,9 @@ proc le*(
|
||||
#
|
||||
# with `fifo[left].fn > fid >= fifo[right].fn`
|
||||
#
|
||||
if fid >= fifo[half].fn:
|
||||
if fid >= fifo[half].getFid:
|
||||
right = half
|
||||
else: # fifo[half].fn > fid
|
||||
else: # fifo[half].getFid > fid
|
||||
left = half
|
||||
|
||||
# Now: `fifo[right].fn <= fid < fifo[left].fn` (and `right == left+1`)
|
||||
|
@ -36,7 +36,7 @@ type
|
||||
## over the tables), this data type is to be used.
|
||||
|
||||
TypedBackendRef* = ref object of BackendRef
|
||||
kind*: BackendType ## Backend type identifier
|
||||
beKind*: BackendType ## Backend type identifier
|
||||
when verifyIxId:
|
||||
txGen: uint ## Transaction ID generator (for debugging)
|
||||
txId: uint ## Active transaction ID (for debugging)
|
||||
|
@ -42,9 +42,10 @@ type
|
||||
## Inheriting table so access can be extended for debugging purposes
|
||||
sTab: Table[VertexID,Blob] ## Structural vertex table making up a trie
|
||||
kMap: Table[VertexID,HashKey] ## Merkle hash key mapping
|
||||
rFil: Table[QueueID,Blob] ## Backend filters
|
||||
rFil: Table[QueueID,Blob] ## Backend filters
|
||||
vGen: Option[seq[VertexID]]
|
||||
vFqs: Option[seq[(QueueID,QueueID)]]
|
||||
noFq: bool ## No filter queues available
|
||||
|
||||
MemPutHdlRef = ref object of TypedPutHdlRef
|
||||
sTab: Table[VertexID,Blob]
|
||||
@ -98,12 +99,17 @@ proc getKeyFn(db: MemBackendRef): GetKeyFn =
|
||||
err(GetKeyNotFound)
|
||||
|
||||
proc getFilFn(db: MemBackendRef): GetFilFn =
|
||||
result =
|
||||
proc(qid: QueueID): Result[FilterRef,AristoError] =
|
||||
let data = db.rFil.getOrDefault(qid, EmptyBlob)
|
||||
if 0 < data.len:
|
||||
return data.deblobify FilterRef
|
||||
err(GetFilNotFound)
|
||||
if db.noFq:
|
||||
result =
|
||||
proc(qid: QueueID): Result[FilterRef,AristoError] =
|
||||
err(FilQuSchedDisabled)
|
||||
else:
|
||||
result =
|
||||
proc(qid: QueueID): Result[FilterRef,AristoError] =
|
||||
let data = db.rFil.getOrDefault(qid, EmptyBlob)
|
||||
if 0 < data.len:
|
||||
return data.deblobify FilterRef
|
||||
err(GetFilNotFound)
|
||||
|
||||
proc getIdgFn(db: MemBackendRef): GetIdgFn =
|
||||
result =
|
||||
@ -113,11 +119,16 @@ proc getIdgFn(db: MemBackendRef): GetIdgFn =
|
||||
err(GetIdgNotFound)
|
||||
|
||||
proc getFqsFn(db: MemBackendRef): GetFqsFn =
|
||||
result =
|
||||
proc(): Result[seq[(QueueID,QueueID)],AristoError]=
|
||||
if db.vFqs.isSome:
|
||||
return ok db.vFqs.unsafeGet
|
||||
err(GetFqsNotFound)
|
||||
if db.noFq:
|
||||
result =
|
||||
proc(): Result[seq[(QueueID,QueueID)],AristoError] =
|
||||
err(FilQuSchedDisabled)
|
||||
else:
|
||||
result =
|
||||
proc(): Result[seq[(QueueID,QueueID)],AristoError] =
|
||||
if db.vFqs.isSome:
|
||||
return ok db.vFqs.unsafeGet
|
||||
err(GetFqsNotFound)
|
||||
|
||||
# -------------
|
||||
|
||||
@ -154,19 +165,32 @@ proc putKeyFn(db: MemBackendRef): PutKeyFn =
|
||||
hdl.kMap[vid] = key
|
||||
|
||||
proc putFilFn(db: MemBackendRef): PutFilFn =
|
||||
result =
|
||||
proc(hdl: PutHdlRef; vf: openArray[(QueueID,FilterRef)]) =
|
||||
let hdl = hdl.getSession db
|
||||
if hdl.error.isNil:
|
||||
for (qid,filter) in vf:
|
||||
let rc = filter.blobify()
|
||||
if rc.isErr:
|
||||
hdl.error = TypedPutHdlErrRef(
|
||||
pfx: FilPfx,
|
||||
qid: qid,
|
||||
code: rc.error)
|
||||
return
|
||||
hdl.rFil[qid] = rc.value
|
||||
if db.noFq:
|
||||
result =
|
||||
proc(hdl: PutHdlRef; vf: openArray[(QueueID,FilterRef)]) =
|
||||
let hdl = hdl.getSession db
|
||||
if hdl.error.isNil:
|
||||
hdl.error = TypedPutHdlErrRef(
|
||||
pfx: FilPfx,
|
||||
qid: (if 0 < vf.len: vf[0][0] else: QueueID(0)),
|
||||
code: FilQuSchedDisabled)
|
||||
else:
|
||||
result =
|
||||
proc(hdl: PutHdlRef; vf: openArray[(QueueID,FilterRef)]) =
|
||||
let hdl = hdl.getSession db
|
||||
if hdl.error.isNil:
|
||||
for (qid,filter) in vf:
|
||||
if filter.isValid:
|
||||
let rc = filter.blobify()
|
||||
if rc.isErr:
|
||||
hdl.error = TypedPutHdlErrRef(
|
||||
pfx: FilPfx,
|
||||
qid: qid,
|
||||
code: rc.error)
|
||||
return
|
||||
hdl.rFil[qid] = rc.value
|
||||
else:
|
||||
hdl.rFil[qid] = EmptyBlob
|
||||
|
||||
proc putIdgFn(db: MemBackendRef): PutIdgFn =
|
||||
result =
|
||||
@ -176,11 +200,20 @@ proc putIdgFn(db: MemBackendRef): PutIdgFn =
|
||||
hdl.vGen = some(vs.toSeq)
|
||||
|
||||
proc putFqsFn(db: MemBackendRef): PutFqsFn =
|
||||
result =
|
||||
proc(hdl: PutHdlRef; fs: openArray[(QueueID,QueueID)]) =
|
||||
let hdl = hdl.getSession db
|
||||
if hdl.error.isNil:
|
||||
hdl.vFqs = some(fs.toSeq)
|
||||
if db.noFq:
|
||||
result =
|
||||
proc(hdl: PutHdlRef; fs: openArray[(QueueID,QueueID)]) =
|
||||
let hdl = hdl.getSession db
|
||||
if hdl.error.isNil:
|
||||
hdl.error = TypedPutHdlErrRef(
|
||||
pfx: AdmPfx,
|
||||
code: FilQuSchedDisabled)
|
||||
else:
|
||||
result =
|
||||
proc(hdl: PutHdlRef; fs: openArray[(QueueID,QueueID)]) =
|
||||
let hdl = hdl.getSession db
|
||||
if hdl.error.isNil:
|
||||
hdl.vFqs = some(fs.toSeq)
|
||||
|
||||
|
||||
proc putEndFn(db: MemBackendRef): PutEndFn =
|
||||
@ -213,7 +246,7 @@ proc putEndFn(db: MemBackendRef): PutEndFn =
|
||||
db.kMap.del vid
|
||||
|
||||
for (qid,data) in hdl.rFil.pairs:
|
||||
if qid.isValid:
|
||||
if 0 < data.len:
|
||||
db.rFil[qid] = data
|
||||
else:
|
||||
db.rFil.del qid
|
||||
@ -245,8 +278,10 @@ proc closeFn(db: MemBackendRef): CloseFn =
|
||||
# Public functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc memoryBackend*(): BackendRef =
|
||||
let db = MemBackendRef(kind: BackendMemory)
|
||||
proc memoryBackend*(qidLayout: QidLayoutRef): BackendRef =
|
||||
let db = MemBackendRef(
|
||||
beKind: BackendMemory,
|
||||
noFq: qidLayout.isNil)
|
||||
|
||||
db.getVtxFn = getVtxFn db
|
||||
db.getKeyFn = getKeyFn db
|
||||
@ -264,6 +299,10 @@ proc memoryBackend*(): BackendRef =
|
||||
|
||||
db.closeFn = closeFn db
|
||||
|
||||
# Set up filter management table
|
||||
if not db.noFq:
|
||||
db.filters = QidSchedRef(ctx: qidLayout)
|
||||
|
||||
db
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
@ -296,14 +335,15 @@ iterator walkFil*(
|
||||
be: MemBackendRef;
|
||||
): tuple[n: int, qid: QueueID, filter: FilterRef] =
|
||||
## Iteration over the vertex sub-table.
|
||||
for n,qid in be.rFil.keys.toSeq.mapIt(it.uint64).sorted.mapIt(it.QueueID):
|
||||
let data = be.rFil.getOrDefault(qid, EmptyBlob)
|
||||
if 0 < data.len:
|
||||
let rc = data.deblobify FilterRef
|
||||
if rc.isErr:
|
||||
debug logTxt "walkFilFn() skip", n,qid, error=rc.error
|
||||
else:
|
||||
yield (n, qid, rc.value)
|
||||
if not be.noFq:
|
||||
for n,qid in be.rFil.keys.toSeq.mapIt(it.uint64).sorted.mapIt(it.QueueID):
|
||||
let data = be.rFil.getOrDefault(qid, EmptyBlob)
|
||||
if 0 < data.len:
|
||||
let rc = data.deblobify FilterRef
|
||||
if rc.isErr:
|
||||
debug logTxt "walkFilFn() skip", n,qid, error=rc.error
|
||||
else:
|
||||
yield (n, qid, rc.value)
|
||||
|
||||
|
||||
iterator walk*(
|
||||
@ -319,9 +359,10 @@ iterator walk*(
|
||||
yield(0, AdmPfx, AdmTabIdIdg.uint64, be.vGen.unsafeGet.blobify)
|
||||
n.inc
|
||||
|
||||
if be.vFqs.isSome:
|
||||
yield(0, AdmPfx, AdmTabIdFqs.uint64, be.vFqs.unsafeGet.blobify)
|
||||
n.inc
|
||||
if not be.noFq:
|
||||
if be.vFqs.isSome:
|
||||
yield(0, AdmPfx, AdmTabIdFqs.uint64, be.vFqs.unsafeGet.blobify)
|
||||
n.inc
|
||||
|
||||
for vid in be.sTab.keys.toSeq.mapIt(it.uint64).sorted.mapIt(it.VertexID):
|
||||
let data = be.sTab.getOrDefault(vid, EmptyBlob)
|
||||
@ -333,11 +374,12 @@ iterator walk*(
|
||||
yield (n, KeyPfx, vid.uint64, key.to(Blob))
|
||||
n.inc
|
||||
|
||||
for lid in be.rFil.keys.toSeq.mapIt(it.uint64).sorted.mapIt(it.QueueID):
|
||||
let data = be.rFil.getOrDefault(lid, EmptyBlob)
|
||||
if 0 < data.len:
|
||||
yield (n, FilPfx, lid.uint64, data)
|
||||
n.inc
|
||||
if not be.noFq:
|
||||
for lid in be.rFil.keys.toSeq.mapIt(it.uint64).sorted.mapIt(it.QueueID):
|
||||
let data = be.rFil.getOrDefault(lid, EmptyBlob)
|
||||
if 0 < data.len:
|
||||
yield (n, FilPfx, lid.uint64, data)
|
||||
n.inc
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# End
|
||||
|
@ -26,9 +26,10 @@ type
|
||||
|
||||
export
|
||||
BackendType,
|
||||
VoidBackendRef,
|
||||
MemBackendRef,
|
||||
TypedBackendRef
|
||||
MemBackendRef
|
||||
|
||||
let
|
||||
DefaultQidLayoutRef* = DEFAULT_QID_QUEUES.to(QidLayoutRef)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public database constuctors, destructor
|
||||
@ -36,13 +37,20 @@ export
|
||||
|
||||
proc newAristoDbRef*(
|
||||
backend: static[BackendType];
|
||||
qidLayout = DefaultQidLayoutRef;
|
||||
): AristoDbRef =
|
||||
## Simplified prototype for `BackendNone` and `BackendMemory` type backend.
|
||||
##
|
||||
## If the `qidLayout` argument is set `QidLayoutRef(nil)`, the a backend
|
||||
## database will not provide filter history management. Providing a different
|
||||
## scheduler layout shoud be used with care as table access with different
|
||||
## layouts might render the filter history data unmanageable.
|
||||
##
|
||||
when backend == BackendVoid:
|
||||
AristoDbRef(top: LayerRef())
|
||||
|
||||
elif backend == BackendMemory:
|
||||
AristoDbRef(top: LayerRef(), backend: memoryBackend())
|
||||
AristoDbRef(top: LayerRef(), backend: memoryBackend(qidLayout))
|
||||
|
||||
elif backend == BackendRocksDB:
|
||||
{.error: "Aristo DB backend \"BackendRocksDB\" needs basePath argument".}
|
||||
@ -84,6 +92,16 @@ proc to*[W: TypedBackendRef|MemBackendRef|VoidBackendRef](
|
||||
## Handy helper for lew-level access to some backend functionality
|
||||
db.backend.T
|
||||
|
||||
proc kind*(
|
||||
be: BackendRef;
|
||||
): BackendType =
|
||||
## Retrieves the backend type symbol for a `TypedBackendRef` argument where
|
||||
## `BackendVoid` is returned for the`nil` backend.
|
||||
if be.isNil:
|
||||
BackendVoid
|
||||
else:
|
||||
be.TypedBackendRef.beKind
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# End
|
||||
# ------------------------------------------------------------------------------
|
||||
|
@ -33,13 +33,20 @@ export
|
||||
proc newAristoDbRef*(
|
||||
backend: static[BackendType];
|
||||
basePath: string;
|
||||
qidLayout = DefaultQidLayoutRef;
|
||||
): Result[AristoDbRef, AristoError] =
|
||||
## Generic constructor, `basePath` argument is ignored for `BackendNone` and
|
||||
## `BackendMemory` type backend database. Also, both of these backends
|
||||
## aways succeed initialising.
|
||||
##
|
||||
## If the `qidLayout` argument is set `QidLayoutRef(nil)`, the a backend
|
||||
## database will not provide filter history management. Providing a different
|
||||
## scheduler layout shoud be used with care as table access with different
|
||||
## layouts might render the filter history data unmanageable.
|
||||
##
|
||||
when backend == BackendRocksDB:
|
||||
let be = block:
|
||||
let rc = rocksDbBackend basePath
|
||||
let rc = rocksDbBackend(basePath, qidLayout)
|
||||
if rc.isErr:
|
||||
return err(rc.error)
|
||||
rc.value
|
||||
|
@ -44,6 +44,7 @@ logScope:
|
||||
type
|
||||
RdbBackendRef* = ref object of TypedBackendRef
|
||||
rdb: RdbInst ## Allows low level access to database
|
||||
noFq: bool ## No filter queues available
|
||||
|
||||
RdbPutHdlRef = ref object of TypedPutHdlRef
|
||||
cache: RdbTabs ## Transaction cache
|
||||
@ -130,21 +131,26 @@ proc getKeyFn(db: RdbBackendRef): GetKeyFn =
|
||||
err(GetKeyNotFound)
|
||||
|
||||
proc getFilFn(db: RdbBackendRef): GetFilFn =
|
||||
result =
|
||||
proc(qid: QueueID): Result[FilterRef,AristoError] =
|
||||
if db.noFq:
|
||||
result =
|
||||
proc(qid: QueueID): Result[FilterRef,AristoError] =
|
||||
err(FilQuSchedDisabled)
|
||||
else:
|
||||
result =
|
||||
proc(qid: QueueID): Result[FilterRef,AristoError] =
|
||||
|
||||
# Fetch serialised data record
|
||||
let rc = db.rdb.get qid.toOpenArray()
|
||||
if rc.isErr:
|
||||
debug logTxt "getFilFn: failed", qid,
|
||||
error=rc.error[0], info=rc.error[1]
|
||||
return err(rc.error[0])
|
||||
# Fetch serialised data record
|
||||
let rc = db.rdb.get qid.toOpenArray()
|
||||
if rc.isErr:
|
||||
debug logTxt "getFilFn: failed", qid,
|
||||
error=rc.error[0], info=rc.error[1]
|
||||
return err(rc.error[0])
|
||||
|
||||
# Decode data record
|
||||
if 0 < rc.value.len:
|
||||
return rc.value.deblobify FilterRef
|
||||
# Decode data record
|
||||
if 0 < rc.value.len:
|
||||
return rc.value.deblobify FilterRef
|
||||
|
||||
err(GetFilNotFound)
|
||||
err(GetFilNotFound)
|
||||
|
||||
proc getIdgFn(db: RdbBackendRef): GetIdgFn =
|
||||
result =
|
||||
@ -164,21 +170,26 @@ proc getIdgFn(db: RdbBackendRef): GetIdgFn =
|
||||
rc.value.deblobify seq[VertexID]
|
||||
|
||||
proc getFqsFn(db: RdbBackendRef): GetFqsFn =
|
||||
result =
|
||||
proc(): Result[seq[(QueueID,QueueID)],AristoError]=
|
||||
if db.noFq:
|
||||
result =
|
||||
proc(): Result[seq[(QueueID,QueueID)],AristoError] =
|
||||
err(FilQuSchedDisabled)
|
||||
else:
|
||||
result =
|
||||
proc(): Result[seq[(QueueID,QueueID)],AristoError]=
|
||||
|
||||
# Fetch serialised data record
|
||||
let rc = db.rdb.get AdmTabIdFqs.toOpenArray()
|
||||
if rc.isErr:
|
||||
debug logTxt "getFosFn: failed", error=rc.error[1]
|
||||
return err(rc.error[0])
|
||||
# Fetch serialised data record
|
||||
let rc = db.rdb.get AdmTabIdFqs.toOpenArray()
|
||||
if rc.isErr:
|
||||
debug logTxt "getFqsFn: failed", error=rc.error[1]
|
||||
return err(rc.error[0])
|
||||
|
||||
if rc.value.len == 0:
|
||||
let w = EmptyQidPairSeq
|
||||
return ok w
|
||||
if rc.value.len == 0:
|
||||
let w = EmptyQidPairSeq
|
||||
return ok w
|
||||
|
||||
# Decode data record
|
||||
rc.value.deblobify seq[(QueueID,QueueID)]
|
||||
# Decode data record
|
||||
rc.value.deblobify seq[(QueueID,QueueID)]
|
||||
|
||||
# -------------
|
||||
|
||||
@ -218,22 +229,32 @@ proc putKeyFn(db: RdbBackendRef): PutKeyFn =
|
||||
hdl.keyCache = (vid, EmptyBlob)
|
||||
|
||||
proc putFilFn(db: RdbBackendRef): PutFilFn =
|
||||
result =
|
||||
proc(hdl: PutHdlRef; vrps: openArray[(QueueID,FilterRef)]) =
|
||||
let hdl = hdl.getSession db
|
||||
if hdl.error.isNil:
|
||||
for (qid,filter) in vrps:
|
||||
if filter.isValid:
|
||||
let rc = filter.blobify()
|
||||
if rc.isErr:
|
||||
hdl.error = TypedPutHdlErrRef(
|
||||
pfx: FilPfx,
|
||||
qid: qid,
|
||||
code: rc.error)
|
||||
return
|
||||
hdl.filCache = (qid, rc.value)
|
||||
else:
|
||||
hdl.filCache = (qid, EmptyBlob)
|
||||
if db.noFq:
|
||||
result =
|
||||
proc(hdl: PutHdlRef; vf: openArray[(QueueID,FilterRef)]) =
|
||||
let hdl = hdl.getSession db
|
||||
if hdl.error.isNil:
|
||||
hdl.error = TypedPutHdlErrRef(
|
||||
pfx: FilPfx,
|
||||
qid: (if 0 < vf.len: vf[0][0] else: QueueID(0)),
|
||||
code: FilQuSchedDisabled)
|
||||
else:
|
||||
result =
|
||||
proc(hdl: PutHdlRef; vrps: openArray[(QueueID,FilterRef)]) =
|
||||
let hdl = hdl.getSession db
|
||||
if hdl.error.isNil:
|
||||
for (qid,filter) in vrps:
|
||||
if filter.isValid:
|
||||
let rc = filter.blobify()
|
||||
if rc.isErr:
|
||||
hdl.error = TypedPutHdlErrRef(
|
||||
pfx: FilPfx,
|
||||
qid: qid,
|
||||
code: rc.error)
|
||||
return
|
||||
hdl.filCache = (qid, rc.value)
|
||||
else:
|
||||
hdl.filCache = (qid, EmptyBlob)
|
||||
|
||||
proc putIdgFn(db: RdbBackendRef): PutIdgFn =
|
||||
result =
|
||||
@ -246,14 +267,23 @@ proc putIdgFn(db: RdbBackendRef): PutIdgFn =
|
||||
hdl.admCache = (AdmTabIdIdg, EmptyBlob)
|
||||
|
||||
proc putFqsFn(db: RdbBackendRef): PutFqsFn =
|
||||
result =
|
||||
proc(hdl: PutHdlRef; vs: openArray[(QueueID,QueueID)]) =
|
||||
let hdl = hdl.getSession db
|
||||
if hdl.error.isNil:
|
||||
if 0 < vs.len:
|
||||
hdl.admCache = (AdmTabIdFqs, vs.blobify)
|
||||
else:
|
||||
hdl.admCache = (AdmTabIdFqs, EmptyBlob)
|
||||
if db.noFq:
|
||||
result =
|
||||
proc(hdl: PutHdlRef; fs: openArray[(QueueID,QueueID)]) =
|
||||
let hdl = hdl.getSession db
|
||||
if hdl.error.isNil:
|
||||
hdl.error = TypedPutHdlErrRef(
|
||||
pfx: AdmPfx,
|
||||
code: FilQuSchedDisabled)
|
||||
else:
|
||||
result =
|
||||
proc(hdl: PutHdlRef; vs: openArray[(QueueID,QueueID)]) =
|
||||
let hdl = hdl.getSession db
|
||||
if hdl.error.isNil:
|
||||
if 0 < vs.len:
|
||||
hdl.admCache = (AdmTabIdFqs, vs.blobify)
|
||||
else:
|
||||
hdl.admCache = (AdmTabIdFqs, EmptyBlob)
|
||||
|
||||
|
||||
proc putEndFn(db: RdbBackendRef): PutEndFn =
|
||||
@ -287,15 +317,22 @@ proc closeFn(db: RdbBackendRef): CloseFn =
|
||||
# Public functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc rocksDbBackend*(path: string): Result[BackendRef,AristoError] =
|
||||
let
|
||||
db = RdbBackendRef(kind: BackendRocksDB)
|
||||
rc = db.rdb.init(path, maxOpenFiles)
|
||||
if rc.isErr:
|
||||
when extraTraceMessages:
|
||||
trace logTxt "constructor failed",
|
||||
error=rc.error[0], info=rc.error[1]
|
||||
return err(rc.error[0])
|
||||
proc rocksDbBackend*(
|
||||
path: string;
|
||||
qidLayout: QidLayoutRef;
|
||||
): Result[BackendRef,AristoError] =
|
||||
let db = RdbBackendRef(
|
||||
beKind: BackendRocksDB,
|
||||
noFq: qidLayout.isNil)
|
||||
|
||||
# Initialise RocksDB
|
||||
block:
|
||||
let rc = db.rdb.init(path, maxOpenFiles)
|
||||
if rc.isErr:
|
||||
when extraTraceMessages:
|
||||
trace logTxt "constructor failed",
|
||||
error=rc.error[0], info=rc.error[1]
|
||||
return err(rc.error[0])
|
||||
|
||||
db.getVtxFn = getVtxFn db
|
||||
db.getKeyFn = getKeyFn db
|
||||
@ -313,6 +350,16 @@ proc rocksDbBackend*(path: string): Result[BackendRef,AristoError] =
|
||||
|
||||
db.closeFn = closeFn db
|
||||
|
||||
# Set up filter management table
|
||||
if not db.noFq:
|
||||
db.filters = QidSchedRef(ctx: qidLayout)
|
||||
db.filters.state = block:
|
||||
let rc = db.getFqsFn()
|
||||
if rc.isErr:
|
||||
db.closeFn(flush = false)
|
||||
return err(rc.error)
|
||||
rc.value
|
||||
|
||||
ok db
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
@ -326,8 +373,20 @@ iterator walk*(
|
||||
##
|
||||
## Non-decodable entries are stepped over while the counter `n` of the
|
||||
## yield record is still incremented.
|
||||
for w in be.rdb.walk:
|
||||
yield w
|
||||
if be.noFq:
|
||||
for w in be.rdb.walk:
|
||||
case w.pfx:
|
||||
of AdmPfx:
|
||||
if w.xid == AdmTabIdFqs.uint64:
|
||||
continue
|
||||
of FilPfx:
|
||||
break # last sub-table
|
||||
else:
|
||||
discard
|
||||
yield w
|
||||
else:
|
||||
for w in be.rdb.walk:
|
||||
yield w
|
||||
|
||||
iterator walkVtx*(
|
||||
be: RdbBackendRef;
|
||||
@ -351,10 +410,11 @@ iterator walkFil*(
|
||||
be: RdbBackendRef;
|
||||
): tuple[n: int, qid: QueueID, filter: FilterRef] =
|
||||
## Variant of `walk()` iteration over the filter sub-table.
|
||||
for (n, xid, data) in be.rdb.walk FilPfx:
|
||||
let rc = data.deblobify FilterRef
|
||||
if rc.isOk:
|
||||
yield (n, QueueID(xid), rc.value)
|
||||
if not be.noFq:
|
||||
for (n, xid, data) in be.rdb.walk FilPfx:
|
||||
let rc = data.deblobify FilterRef
|
||||
if rc.isOk:
|
||||
yield (n, QueueID(xid), rc.value)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# End
|
||||
|
@ -220,6 +220,8 @@ proc blobify*(vtx: VertexRef; data: var Blob): AristoError =
|
||||
## ::
|
||||
## 8 * n * ((access shr (n * 4)) and 15)
|
||||
##
|
||||
if not vtx.isValid:
|
||||
return BlobifyNilVertex
|
||||
case vtx.vType:
|
||||
of Branch:
|
||||
var
|
||||
@ -280,6 +282,7 @@ proc blobify*(vGen: openArray[VertexID]): Blob =
|
||||
proc blobify*(filter: FilterRef; data: var Blob): AristoError =
|
||||
## This function serialises an Aristo DB filter object
|
||||
## ::
|
||||
## uint64 -- filter ID
|
||||
## Uint256 -- source key
|
||||
## Uint256 -- target key
|
||||
## uint32 -- number of vertex IDs (vertex ID generator state)
|
||||
@ -295,8 +298,10 @@ proc blobify*(filter: FilterRef; data: var Blob): AristoError =
|
||||
## ... -- more triplets
|
||||
## 0x7d -- marker(8)
|
||||
##
|
||||
##
|
||||
if not filter.isValid:
|
||||
return BlobifyNilFilter
|
||||
data.setLen(0)
|
||||
data &= filter.fid.uint64.toBytesBE.toSeq
|
||||
data &= filter.src.ByteArray32.toSeq
|
||||
data &= filter.trg.ByteArray32.toSeq
|
||||
|
||||
@ -366,7 +371,7 @@ proc blobify*(filter: FilterRef; data: var Blob): AristoError =
|
||||
vid.uint64.toBytesBE.toSeq &
|
||||
keyBlob
|
||||
|
||||
data[68 ..< 72] = n.uint32.toBytesBE.toSeq
|
||||
data[76 ..< 80] = n.uint32.toBytesBE.toSeq
|
||||
data.add 0x7Du8
|
||||
|
||||
proc blobify*(filter: FilterRef): Result[Blob, AristoError] =
|
||||
@ -564,27 +569,27 @@ proc deblobify*(data: Blob; T: type seq[VertexID]): Result[T,AristoError] =
|
||||
return err(info)
|
||||
ok vGen
|
||||
|
||||
|
||||
proc deblobify*(data: Blob; filter: var FilterRef): AristoError =
|
||||
## De-serialise an Aristo DB filter object
|
||||
if data.len < 72: # minumum length 72 for an empty filter
|
||||
if data.len < 80: # minumum length 80 for an empty filter
|
||||
return DeblobFilterTooShort
|
||||
if data[^1] != 0x7d:
|
||||
return DeblobWrongType
|
||||
|
||||
let f = FilterRef()
|
||||
(addr f.src.ByteArray32[0]).copyMem(unsafeAddr data[0], 32)
|
||||
(addr f.trg.ByteArray32[0]).copyMem(unsafeAddr data[32], 32)
|
||||
f.fid = (uint64.fromBytesBE data[0 ..< 8]).FilterID
|
||||
(addr f.src.ByteArray32[0]).copyMem(unsafeAddr data[8], 32)
|
||||
(addr f.trg.ByteArray32[0]).copyMem(unsafeAddr data[40], 32)
|
||||
|
||||
let
|
||||
nVids = uint32.fromBytesBE data[64 ..< 68]
|
||||
nTriplets = uint32.fromBytesBE data[68 ..< 72]
|
||||
nTrplStart = (72 + nVids * 8).int
|
||||
nVids = uint32.fromBytesBE data[72 ..< 76]
|
||||
nTriplets = uint32.fromBytesBE data[76 ..< 80]
|
||||
nTrplStart = (80 + nVids * 8).int
|
||||
|
||||
if data.len < nTrplStart:
|
||||
return DeblobFilterGenTooShort
|
||||
for n in 0 ..< nVids:
|
||||
let w = 72 + n * 8
|
||||
let w = 80 + n * 8
|
||||
f.vGen.add (uint64.fromBytesBE data[w ..< w + 8]).VertexID
|
||||
|
||||
var offs = nTrplStart
|
||||
|
@ -42,14 +42,20 @@ iterator walkKeyBe*[T: MemBackendRef|VoidBackendRef](
|
||||
for (n,vid,key) in db.to(T).walkKeyBeImpl db:
|
||||
yield (n,vid,key)
|
||||
|
||||
iterator walkFilBe*[T: MemBackendRef|VoidBackendRef](
|
||||
_: type T;
|
||||
db: AristoDbRef;
|
||||
iterator walkFilBe*(
|
||||
be: MemBackendRef|VoidBackendRef;
|
||||
): tuple[n: int, qid: QueueID, filter: FilterRef] =
|
||||
## Similar to `walkVtxBe()` but for filters.
|
||||
for (n,qid,filter) in db.to(T).walkFilBeImpl db:
|
||||
## Iterate over backend filters.
|
||||
for (n,qid,filter) in be.walkFilBeImpl:
|
||||
yield (n,qid,filter)
|
||||
|
||||
iterator walkFifoBe*(
|
||||
be: MemBackendRef|VoidBackendRef;
|
||||
): (QueueID,FilterRef) =
|
||||
## Walk filter slots in fifo order.
|
||||
for (qid,filter) in be.walkFifoBeImpl:
|
||||
yield (qid,filter)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# End
|
||||
# ------------------------------------------------------------------------------
|
||||
|
@ -48,13 +48,19 @@ iterator walkKeyBe*(
|
||||
yield (n,vid,key)
|
||||
|
||||
iterator walkFilBe*(
|
||||
T: type RdbBackendRef;
|
||||
db: AristoDbRef;
|
||||
be: RdbBackendRef;
|
||||
): tuple[n: int, qid: QueueID, filter: FilterRef] =
|
||||
## Similar to `walkVtxBe()` but for filters.
|
||||
for (n,qid,filter) in db.to(T).walkFilBeImpl db:
|
||||
## Iterate over backend filters.
|
||||
for (n,qid,filter) in be.walkFilBeImpl:
|
||||
yield (n,qid,filter)
|
||||
|
||||
iterator walkFifoBe*(
|
||||
be: RdbBackendRef;
|
||||
): (QueueID,FilterRef) =
|
||||
## Walk filter slots in fifo order.
|
||||
for (qid,filter) in be.walkFifoBeImpl:
|
||||
yield (qid,filter)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# End
|
||||
# ------------------------------------------------------------------------------
|
||||
|
@ -11,6 +11,7 @@
|
||||
|
||||
import
|
||||
std/[algorithm, sequtils, tables],
|
||||
results,
|
||||
".."/[aristo_desc, aristo_init]
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
@ -89,7 +90,6 @@ iterator walkKeyBeImpl*[T](
|
||||
|
||||
iterator walkFilBeImpl*[T](
|
||||
be: T; # Backend descriptor
|
||||
db: AristoDbRef; # Database with optional backend filter
|
||||
): tuple[n: int, qid: QueueID, filter: FilterRef] =
|
||||
## Generic filter iterator
|
||||
when be isnot VoidBackendRef:
|
||||
@ -98,6 +98,33 @@ iterator walkFilBeImpl*[T](
|
||||
for (n,qid,filter) in be.walkFil:
|
||||
yield (n,qid,filter)
|
||||
|
||||
|
||||
iterator walkFifoBeImpl*[T](
|
||||
be: T; # Backend descriptor
|
||||
): (QueueID,FilterRef) =
|
||||
## Generic filter iterator walking slots in fifo order. This iterator does
|
||||
## not depend on the backend type but may be type restricted nevertheless.
|
||||
when be isnot VoidBackendRef:
|
||||
proc kvp(chn: int, qid: QueueID): (QueueID,FilterRef) =
|
||||
let cid = QueueID((chn.uint64 shl 62) or qid.uint64)
|
||||
(cid, be.getFilFn(cid).get(otherwise = FilterRef(nil)))
|
||||
|
||||
if not be.isNil:
|
||||
let scd = be.filters
|
||||
if not scd.isNil:
|
||||
for i in 0 ..< scd.state.len:
|
||||
let (left, right) = scd.state[i]
|
||||
if left == 0:
|
||||
discard
|
||||
elif left <= right:
|
||||
for j in right.countDown left:
|
||||
yield kvp(i, j)
|
||||
else:
|
||||
for j in right.countDown QueueID(1):
|
||||
yield kvp(i, j)
|
||||
for j in scd.ctx.q[i].wrap.countDown left:
|
||||
yield kvp(i, j)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# End
|
||||
# ------------------------------------------------------------------------------
|
||||
|
@ -74,30 +74,21 @@ proc mergeData(
|
||||
## Simplified loop body of `test_mergeProofAndKvpList()`
|
||||
if 0 < proof.len:
|
||||
let rc = db.merge(rootKey, rootVid)
|
||||
if rc.isErr:
|
||||
check rc.error == AristoError(0)
|
||||
return
|
||||
xCheckRc rc.error == 0
|
||||
|
||||
let proved = db.merge(proof, rc.value)
|
||||
if proved.error notin {AristoError(0),MergeHashKeyCachedAlready}:
|
||||
check proved.error in {AristoError(0),MergeHashKeyCachedAlready}
|
||||
return
|
||||
xCheck proved.error in {AristoError(0),MergeHashKeyCachedAlready}
|
||||
|
||||
let merged = db.merge leafs
|
||||
if merged.error notin {AristoError(0), MergeLeafPathCachedAlready}:
|
||||
check merged.error in {AristoError(0), MergeLeafPathCachedAlready}
|
||||
return
|
||||
xCheck merged.error in {AristoError(0), MergeLeafPathCachedAlready}
|
||||
|
||||
block:
|
||||
let rc = db.hashify # (noisy, true)
|
||||
if rc.isErr:
|
||||
when true: # and false:
|
||||
noisy.say "***", "dataMerge(9)",
|
||||
" nLeafs=", leafs.len,
|
||||
"\n cache dump\n ", db.pp,
|
||||
"\n backend dump\n ", db.to(TypedBackendRef).pp(db)
|
||||
check rc.error == (VertexID(0),AristoError(0))
|
||||
return
|
||||
xCheckRc rc.error == (0,0):
|
||||
noisy.say "***", "dataMerge(9)",
|
||||
" nLeafs=", leafs.len,
|
||||
"\n cache dump\n ", db.pp,
|
||||
"\n backend dump\n ", db.backend.pp(db)
|
||||
|
||||
true
|
||||
|
||||
@ -116,24 +107,18 @@ proc verify(
|
||||
let
|
||||
nVtx = ly.sTab.getOrVoid vid
|
||||
mVtx = beSTab.getOrVoid vid
|
||||
if not nVtx.isValid and not mVtx.isValid:
|
||||
check nVtx != VertexRef(nil)
|
||||
check mVtx != VertexRef(nil)
|
||||
return
|
||||
if nVtx != mVtx:
|
||||
|
||||
xCheck (nVtx != VertexRef(nil))
|
||||
xCheck (mVtx != VertexRef(nil))
|
||||
xCheck nVtx == mVtx:
|
||||
noisy.say "***", "verify",
|
||||
" beType=", be.typeof,
|
||||
" vid=", vid.pp,
|
||||
" nVtx=", nVtx.pp,
|
||||
" mVtx=", mVtx.pp
|
||||
check nVtx == mVtx
|
||||
return
|
||||
|
||||
if beSTab.len != ly.sTab.len or
|
||||
beKMap.len != ly.kMap.len:
|
||||
check beSTab.len == ly.sTab.len
|
||||
check beKMap.len == ly.kMap.len
|
||||
return
|
||||
xCheck beSTab.len == ly.sTab.len
|
||||
xCheck beKMap.len == ly.kMap.len
|
||||
|
||||
true
|
||||
|
||||
@ -154,43 +139,35 @@ proc collectFilter(
|
||||
|
||||
be.putFilFn(tx, @[(fid,filter)])
|
||||
let endOk = be.putEndFn tx
|
||||
if endOk != AristoError(0):
|
||||
check endOk == AristoError(0)
|
||||
return
|
||||
xCheck endOk == AristoError(0)
|
||||
|
||||
tab[fid] = filter.hash
|
||||
|
||||
true
|
||||
|
||||
proc verifyFiltersImpl[T: MemBackendRef|RdbBackendRef](
|
||||
_: type T;
|
||||
db: AristoDbRef;
|
||||
proc verifyFiltersImpl[T](
|
||||
be: T;
|
||||
tab: Table[QueueID,Hash];
|
||||
noisy: bool;
|
||||
): bool =
|
||||
## Compare stored filters against registered ones
|
||||
var n = 0
|
||||
for (_,fid,filter) in T.walkFilBe db:
|
||||
for (_,fid,filter) in be.walkFilBe:
|
||||
let
|
||||
filterHash = filter.hash
|
||||
registered = tab.getOrDefault(fid, BlindHash)
|
||||
if registered == BlindHash:
|
||||
check (fid,registered) != (0,BlindHash)
|
||||
return
|
||||
if filterHash != registered:
|
||||
|
||||
xCheck (registered != BlindHash)
|
||||
xCheck registered == filterHash:
|
||||
noisy.say "***", "verifyFiltersImpl",
|
||||
" n=", n+1,
|
||||
" fid=", fid.pp,
|
||||
" filterHash=", filterHash.int.toHex,
|
||||
" registered=", registered.int.toHex
|
||||
check (fid,filterHash) == (fid,registered)
|
||||
return
|
||||
|
||||
n.inc
|
||||
|
||||
if n != tab.len:
|
||||
check n == tab.len
|
||||
return
|
||||
|
||||
xCheck n == tab.len
|
||||
true
|
||||
|
||||
proc verifyFilters(
|
||||
@ -199,23 +176,20 @@ proc verifyFilters(
|
||||
noisy: bool;
|
||||
): bool =
|
||||
## Wrapper
|
||||
let
|
||||
be = db.to(TypedBackendRef)
|
||||
kind = (if be.isNil: BackendVoid else: be.kind)
|
||||
case kind:
|
||||
case db.backend.kind:
|
||||
of BackendMemory:
|
||||
return MemBackendRef.verifyFiltersImpl(db, tab, noisy)
|
||||
return db.to(MemBackendRef).verifyFiltersImpl(tab, noisy)
|
||||
of BackendRocksDB:
|
||||
return RdbBackendRef.verifyFiltersImpl(db, tab, noisy)
|
||||
return db.to(RdbBackendRef).verifyFiltersImpl(tab, noisy)
|
||||
else:
|
||||
discard
|
||||
check kind == BackendMemory or kind == BackendRocksDB
|
||||
check db.backend.kind == BackendMemory or db.backend.kind == BackendRocksDB
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public test function
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc test_backendConsistency*(
|
||||
proc testBackendConsistency*(
|
||||
noisy: bool;
|
||||
list: openArray[ProofTrieData]; # Test data
|
||||
rdbPath: string; # Rocks DB storage directory
|
||||
@ -240,24 +214,25 @@ proc test_backendConsistency*(
|
||||
count = 0
|
||||
ndb = newAristoDbRef BackendVoid
|
||||
mdb = newAristoDbRef BackendMemory
|
||||
|
||||
if doRdbOk:
|
||||
if not rdb.backend.isNil: # ignore bootstrap
|
||||
let verifyFiltersOk = rdb.verifyFilters(filTab, noisy)
|
||||
if not verifyFiltersOk:
|
||||
check verifyFiltersOk
|
||||
return
|
||||
xCheck verifyFiltersOk
|
||||
filTab.clear
|
||||
rdb.finish(flush=true)
|
||||
let rc = newAristoDbRef(BackendRocksDB,rdbPath)
|
||||
if rc.isErr:
|
||||
check rc.error == 0
|
||||
return
|
||||
let rc = newAristoDbRef(BackendRocksDB, rdbPath)
|
||||
xCheckRc rc.error == 0
|
||||
rdb = rc.value
|
||||
|
||||
# Disable automated filter management, still allow filter table access
|
||||
# for low level read/write testing.
|
||||
rdb.backend.filters = QidSchedRef(nil)
|
||||
count.inc
|
||||
|
||||
check ndb.backend.isNil
|
||||
check not mdb.backend.isNil
|
||||
check doRdbOk or not rdb.backend.isNil
|
||||
xCheck ndb.backend.isNil
|
||||
xCheck not mdb.backend.isNil
|
||||
xCheck doRdbOk or not rdb.backend.isNil
|
||||
|
||||
when true and false:
|
||||
noisy.say "***", "beCon(1) <", n, "/", list.len-1, ">", " groups=", count
|
||||
@ -270,33 +245,27 @@ proc test_backendConsistency*(
|
||||
block:
|
||||
let ndbOk = ndb.mergeData(
|
||||
rootKey, rootVid, w.proof, leafs, noisy=false)
|
||||
if not ndbOk:
|
||||
check ndbOk
|
||||
return
|
||||
xCheck ndbOk
|
||||
block:
|
||||
let mdbOk = mdb.mergeData(
|
||||
rootKey, rootVid, w.proof, leafs, noisy=false)
|
||||
if not mdbOk:
|
||||
check mdbOk
|
||||
return
|
||||
xCheck mdbOk
|
||||
if doRdbOk: # optional
|
||||
let rdbOk = rdb.mergeData(
|
||||
rootKey, rootVid, w.proof, leafs, noisy=false)
|
||||
if not rdbOk:
|
||||
check rdbOk
|
||||
return
|
||||
xCheck rdbOk
|
||||
|
||||
when true and false:
|
||||
noisy.say "***", "beCon(2) <", n, "/", list.len-1, ">",
|
||||
" groups=", count,
|
||||
"\n cache dump\n ", ndb.pp,
|
||||
"\n backend dump\n ", ndb.to(TypedBackendRef).pp(ndb),
|
||||
"\n backend dump\n ", ndb.backend.pp(ndb),
|
||||
"\n -------------",
|
||||
"\n mdb cache\n ", mdb.pp,
|
||||
"\n mdb backend\n ", mdb.to(TypedBackendRef).pp(ndb),
|
||||
"\n mdb backend\n ", mdb.backend.pp(ndb),
|
||||
"\n -------------",
|
||||
"\n rdb cache\n ", rdb.pp,
|
||||
"\n rdb backend\n ", rdb.to(TypedBackendRef).pp(ndb),
|
||||
"\n rdb backend\n ", rdb.backend.pp(ndb),
|
||||
"\n -------------"
|
||||
|
||||
when true and false:
|
||||
@ -315,45 +284,39 @@ proc test_backendConsistency*(
|
||||
# Provide filter, store filter on permanent BE, and register filter digest
|
||||
block:
|
||||
let rc = mdb.stow(persistent=false, dontHashify=true, chunkedMpt=true)
|
||||
if rc.isErr:
|
||||
check rc.error == (0,0)
|
||||
return
|
||||
xCheckRc rc.error == (0,0)
|
||||
let collectFilterOk = rdb.collectFilter(mdb.roFilter, filTab, noisy)
|
||||
if not collectFilterOk:
|
||||
check collectFilterOk
|
||||
return
|
||||
xCheck collectFilterOk
|
||||
|
||||
# Store onto backend database
|
||||
block:
|
||||
#noisy.say "***", "db-dump\n ", mdb.pp
|
||||
let rc = mdb.stow(persistent=true, dontHashify=true, chunkedMpt=true)
|
||||
if rc.isErr:
|
||||
check rc.error == (0,0)
|
||||
return
|
||||
xCheckRc rc.error == (0,0)
|
||||
|
||||
if doRdbOk:
|
||||
let rc = rdb.stow(persistent=true, dontHashify=true, chunkedMpt=true)
|
||||
if rc.isErr:
|
||||
check rc.error == (0,0)
|
||||
return
|
||||
xCheckRc rc.error == (0,0)
|
||||
|
||||
if not ndb.top.verify(mdb.to(MemBackendRef), noisy):
|
||||
when true and false:
|
||||
noisy.say "***", "beCon(4) <", n, "/", list.len-1, ">",
|
||||
" groups=", count,
|
||||
"\n ndb cache\n ", ndb.pp,
|
||||
"\n ndb backend=", ndb.backend.isNil.not,
|
||||
#"\n -------------",
|
||||
#"\n mdb pre-save cache\n ", mdbPreSaveCache,
|
||||
#"\n mdb pre-save backend\n ", mdbPreSaveBackend,
|
||||
"\n -------------",
|
||||
"\n mdb cache\n ", mdb.pp,
|
||||
"\n mdb backend\n ", mdb.to(TypedBackendRef).pp(ndb),
|
||||
"\n -------------"
|
||||
return
|
||||
block:
|
||||
let mdbVerifyOk = ndb.top.verify(mdb.to(MemBackendRef), noisy)
|
||||
xCheck mdbVerifyOk:
|
||||
when true and false:
|
||||
noisy.say "***", "beCon(4) <", n, "/", list.len-1, ">",
|
||||
" groups=", count,
|
||||
"\n ndb cache\n ", ndb.pp,
|
||||
"\n ndb backend=", ndb.backend.isNil.not,
|
||||
#"\n -------------",
|
||||
#"\n mdb pre-save cache\n ", mdbPreSaveCache,
|
||||
#"\n mdb pre-save backend\n ", mdbPreSaveBackend,
|
||||
"\n -------------",
|
||||
"\n mdb cache\n ", mdb.pp,
|
||||
"\n mdb backend\n ", mdb.backend.pp(ndb),
|
||||
"\n -------------"
|
||||
|
||||
if doRdbOk:
|
||||
if not ndb.top.verify(rdb.to(RdbBackendRef), noisy):
|
||||
let rdbVerifyOk = ndb.top.verify(rdb.to(RdbBackendRef), noisy)
|
||||
xCheck rdbVerifyOk:
|
||||
when true and false:
|
||||
noisy.say "***", "beCon(4) <", n, "/", list.len-1, ">",
|
||||
" groups=", count,
|
||||
@ -364,12 +327,11 @@ proc test_backendConsistency*(
|
||||
"\n rdb pre-save backend\n ", rdbPreSaveBackend,
|
||||
"\n -------------",
|
||||
"\n rdb cache\n ", rdb.pp,
|
||||
"\n rdb backend\n ", rdb.to(TypedBackendRef).pp(ndb),
|
||||
"\n rdb backend\n ", rdb.backend.pp(ndb),
|
||||
#"\n -------------",
|
||||
#"\n mdb cache\n ", mdb.pp,
|
||||
#"\n mdb backend\n ", mdb.to(TypedBackendRef).pp(ndb),
|
||||
#"\n mdb backend\n ", mdb.backend.pp(ndb),
|
||||
"\n -------------"
|
||||
return
|
||||
|
||||
when true and false:
|
||||
noisy.say "***", "beCon(9) <", n, "/", list.len-1, ">", " groups=", count
|
||||
@ -377,9 +339,7 @@ proc test_backendConsistency*(
|
||||
# Finally ...
|
||||
block:
|
||||
let verifyFiltersOk = rdb.verifyFilters(filTab, noisy)
|
||||
if not verifyFiltersOk:
|
||||
check verifyFiltersOk
|
||||
return
|
||||
xCheck verifyFiltersOk
|
||||
|
||||
true
|
||||
|
||||
|
@ -19,7 +19,8 @@ import
|
||||
../../nimbus/db/aristo/[
|
||||
aristo_check, aristo_debug, aristo_desc, aristo_filter, aristo_get,
|
||||
aristo_merge, aristo_transcode],
|
||||
../../nimbus/db/[aristo, aristo/aristo_init/persistent],
|
||||
../../nimbus/db/aristo,
|
||||
../../nimbus/db/aristo/aristo_init/persistent,
|
||||
./test_helpers
|
||||
|
||||
type
|
||||
@ -35,7 +36,7 @@ type
|
||||
|
||||
proc dump(pfx: string; dx: varargs[AristoDbRef]): string =
|
||||
proc dump(db: AristoDbRef): string =
|
||||
db.pp & "\n " & db.to(TypedBackendRef).pp(db) & "\n"
|
||||
db.pp & "\n " & db.backend.pp(db) & "\n"
|
||||
if 0 < dx.len:
|
||||
result = "\n "
|
||||
var
|
||||
@ -95,6 +96,7 @@ iterator quadripartite(td: openArray[ProofTrieData]): LeafQuartet =
|
||||
proc dbTriplet(w: LeafQuartet; rdbPath: string): Result[DbTriplet,AristoError] =
|
||||
let db = block:
|
||||
let rc = newAristoDbRef(BackendRocksDB,rdbPath)
|
||||
xCheckRc rc.error == 0
|
||||
if rc.isErr:
|
||||
check rc.error == 0
|
||||
return
|
||||
@ -271,10 +273,9 @@ proc checkBeOk(
|
||||
let
|
||||
cache = if forceCache: true else: not dx[n].top.dirty
|
||||
rc = dx[n].checkBE(relax=relax, cache=cache)
|
||||
if rc.isErr:
|
||||
xCheckRc rc.error == (0,0):
|
||||
noisy.say "***", "db check failed", " n=", n, " cache=", cache
|
||||
check (n, rc.error[0], rc.error[1]) == (n, 0, 0)
|
||||
return
|
||||
|
||||
true
|
||||
|
||||
proc checkFilterTrancoderOk(
|
||||
@ -286,26 +287,23 @@ proc checkFilterTrancoderOk(
|
||||
if dx[n].roFilter.isValid:
|
||||
let data = block:
|
||||
let rc = dx[n].roFilter.blobify()
|
||||
if rc.isErr:
|
||||
xCheckRc rc.error == 0:
|
||||
noisy.say "***", "db serialisation failed",
|
||||
" n=", n, " error=", rc.error
|
||||
check rc.error == 0
|
||||
return
|
||||
rc.value
|
||||
|
||||
let dcdRoundTrip = block:
|
||||
let rc = data.deblobify FilterRef
|
||||
if rc.isErr:
|
||||
xCheckRc rc.error == 0:
|
||||
noisy.say "***", "db de-serialisation failed",
|
||||
" n=", n, " error=", rc.error
|
||||
check rc.error == 0
|
||||
return
|
||||
rc.value
|
||||
if not dx[n].roFilter.isEq(dcdRoundTrip, dx[n], noisy):
|
||||
#noisy.say "***", "checkFilterTrancoderOk failed",
|
||||
# "\n roFilter=", dx[n].roFilter.pp(dx[n]),
|
||||
# "\n dcdRoundTrip=", dcdRoundTrip.pp(dx[n])
|
||||
check (n,dx[n].roFilter) == (n,dcdRoundTrip)
|
||||
return
|
||||
|
||||
let roFilterExRoundTrip = dx[n].roFilter.isEq(dcdRoundTrip, dx[n], noisy)
|
||||
xCheck roFilterExRoundTrip:
|
||||
noisy.say "***", "checkFilterTrancoderOk failed",
|
||||
"\n roFilter=", dx[n].roFilter.pp(dx[n]),
|
||||
"\n dcdRoundTrip=", dcdRoundTrip.pp(dx[n])
|
||||
|
||||
true
|
||||
|
||||
@ -335,8 +333,7 @@ proc testDistributedAccess*(
|
||||
let
|
||||
dx = block:
|
||||
let rc = dbTriplet(w, rdbPath)
|
||||
if rc.isErr:
|
||||
return
|
||||
xCheckRc rc.error == 0
|
||||
rc.value
|
||||
(db1, db2, db3) = (dx[0], dx[1], dx[2])
|
||||
defer:
|
||||
@ -348,57 +345,34 @@ proc testDistributedAccess*(
|
||||
# Clause (9) from `aristo/README.md` example
|
||||
block:
|
||||
let rc = db1.stow(persistent=true)
|
||||
if rc.isErr:
|
||||
# noisy.say "*** testDistributedAccess (2) n=", n, dx.dump
|
||||
check rc.error == (0,0)
|
||||
return
|
||||
if db1.roFilter.isValid:
|
||||
check db1.roFilter == FilterRef(nil)
|
||||
return
|
||||
if db2.roFilter != db3.roFilter:
|
||||
check db2.roFilter == db3.roFilter
|
||||
return
|
||||
xCheckRc rc.error == (0,0)
|
||||
xCheck db1.roFilter == FilterRef(nil)
|
||||
xCheck db2.roFilter == db3.roFilter
|
||||
|
||||
block:
|
||||
let rc = db2.stow(persistent=false)
|
||||
if rc.isErr:
|
||||
xCheckRc rc.error == (0,0):
|
||||
noisy.say "*** testDistributedAccess (3)", "n=", n, "db2".dump db2
|
||||
check rc.error == (0,0)
|
||||
return
|
||||
if db1.roFilter.isValid:
|
||||
check db1.roFilter == FilterRef(nil)
|
||||
return
|
||||
if db2.roFilter == db3.roFilter:
|
||||
check db2.roFilter != db3.roFilter
|
||||
return
|
||||
xCheck db1.roFilter == FilterRef(nil)
|
||||
xCheck db2.roFilter != db3.roFilter
|
||||
|
||||
# Clause (11) from `aristo/README.md` example
|
||||
block:
|
||||
let rc = db2.ackqRwMode()
|
||||
if rc.isErr:
|
||||
check rc.error == 0
|
||||
return
|
||||
xCheckRc rc.error == 0
|
||||
block:
|
||||
let rc = db2.stow(persistent=true)
|
||||
if rc.isErr:
|
||||
check rc.error == (0,0)
|
||||
return
|
||||
if db2.roFilter.isValid:
|
||||
check db2.roFilter == FilterRef(nil)
|
||||
return
|
||||
xCheckRc rc.error == (0,0)
|
||||
xCheck db2.roFilter == FilterRef(nil)
|
||||
|
||||
# Check/verify backends
|
||||
block:
|
||||
let ok = dx.checkBeOk(noisy=noisy)
|
||||
if not ok:
|
||||
xCheck ok:
|
||||
noisy.say "*** testDistributedAccess (4)", "n=", n, "db3".dump db3
|
||||
check ok
|
||||
return
|
||||
block:
|
||||
let ok = dx.checkFilterTrancoderOk(noisy=noisy)
|
||||
if not ok:
|
||||
check ok
|
||||
return
|
||||
xCheck ok
|
||||
|
||||
# Capture filters from clause (11)
|
||||
c11Filter1 = db1.roFilter
|
||||
@ -414,8 +388,7 @@ proc testDistributedAccess*(
|
||||
let
|
||||
dy = block:
|
||||
let rc = dbTriplet(w, rdbPath)
|
||||
if rc.isErr:
|
||||
return
|
||||
xCheckRc rc.error == 0
|
||||
rc.value
|
||||
(db1, db2, db3) = (dy[0], dy[1], dy[2])
|
||||
defer:
|
||||
@ -424,59 +397,39 @@ proc testDistributedAccess*(
|
||||
# Build clause (12) from `aristo/README.md` example
|
||||
block:
|
||||
let rc = db2.ackqRwMode()
|
||||
if rc.isErr:
|
||||
check rc.error == 0
|
||||
return
|
||||
xCheckRc rc.error == 0
|
||||
block:
|
||||
let rc = db2.stow(persistent=true)
|
||||
if rc.isErr:
|
||||
check rc.error == (0,0)
|
||||
return
|
||||
if db2.roFilter.isValid:
|
||||
check db1.roFilter == FilterRef(nil)
|
||||
return
|
||||
if db1.roFilter != db3.roFilter:
|
||||
check db1.roFilter == db3.roFilter
|
||||
return
|
||||
xCheckRc rc.error == (0,0)
|
||||
xCheck db2.roFilter == FilterRef(nil)
|
||||
xCheck db1.roFilter == db3.roFilter
|
||||
|
||||
# Clause (13) from `aristo/README.md` example
|
||||
block:
|
||||
let rc = db1.stow(persistent=false)
|
||||
if rc.isErr:
|
||||
check rc.error == (0,0)
|
||||
return
|
||||
xCheckRc rc.error == (0,0)
|
||||
|
||||
# Clause (14) from `aristo/README.md` check
|
||||
block:
|
||||
let c11Fil1_eq_db1RoFilter = c11Filter1.isDbEq(db1.roFilter, db1, noisy)
|
||||
if not c11Fil1_eq_db1RoFilter:
|
||||
noisy.say "*** testDistributedAccess (7)", "n=", n,
|
||||
"\n c11Filter1=", c11Filter3.pp(db1),
|
||||
"db1".dump(db1)
|
||||
check c11Fil1_eq_db1RoFilter
|
||||
return
|
||||
let c11Fil1_eq_db1RoFilter = c11Filter1.isDbEq(db1.roFilter, db1, noisy)
|
||||
xCheck c11Fil1_eq_db1RoFilter:
|
||||
noisy.say "*** testDistributedAccess (7)", "n=", n,
|
||||
"\n c11Filter1=", c11Filter3.pp(db1),
|
||||
"db1".dump(db1)
|
||||
|
||||
# Clause (15) from `aristo/README.md` check
|
||||
block:
|
||||
let c11Fil3_eq_db3RoFilter = c11Filter3.isDbEq(db3.roFilter, db3, noisy)
|
||||
if not c11Fil3_eq_db3RoFilter:
|
||||
noisy.say "*** testDistributedAccess (8)", "n=", n,
|
||||
"\n c11Filter3=", c11Filter3.pp(db3),
|
||||
"db3".dump(db3)
|
||||
check c11Fil3_eq_db3RoFilter
|
||||
return
|
||||
let c11Fil3_eq_db3RoFilter = c11Filter3.isDbEq(db3.roFilter, db3, noisy)
|
||||
xCheck c11Fil3_eq_db3RoFilter:
|
||||
noisy.say "*** testDistributedAccess (8)", "n=", n,
|
||||
"\n c11Filter3=", c11Filter3.pp(db3),
|
||||
"db3".dump(db3)
|
||||
|
||||
# Check/verify backends
|
||||
block:
|
||||
let ok = dy.checkBeOk(noisy=noisy)
|
||||
if not ok:
|
||||
check ok
|
||||
return
|
||||
xCheck ok
|
||||
block:
|
||||
let ok = dy.checkFilterTrancoderOk(noisy=noisy)
|
||||
if not ok:
|
||||
check ok
|
||||
return
|
||||
xCheck ok
|
||||
|
||||
when false: # or true:
|
||||
noisy.say "*** testDistributedAccess (9)", "n=", n, dy.dump
|
||||
|
@ -14,7 +14,8 @@ import
|
||||
eth/common,
|
||||
rocksdb,
|
||||
../../nimbus/db/aristo/[
|
||||
aristo_constants, aristo_debug, aristo_desc, aristo_merge],
|
||||
aristo_constants, aristo_debug, aristo_desc,
|
||||
aristo_filter/filter_scheduler, aristo_merge],
|
||||
../../nimbus/db/kvstore_rocksdb,
|
||||
../../nimbus/sync/protocol/snap/snap_types,
|
||||
../test_sync_snap/test_types,
|
||||
@ -30,6 +31,10 @@ type
|
||||
proof*: seq[SnapProof]
|
||||
kvpLst*: seq[LeafTiePayload]
|
||||
|
||||
const
|
||||
QidSlotLyo* = [(4,0,10),(3,3,10),(3,4,10),(3,5,10)]
|
||||
QidSample* = (3 * QidSlotLyo.stats.minCovered) div 2
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Private helpers
|
||||
# ------------------------------------------------------------------------------
|
||||
@ -187,6 +192,39 @@ proc mapRootVid*(
|
||||
leafTie: LeafTie(root: toVid, path: it.leafTie.path),
|
||||
payload: it.payload))
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public workflow helpers
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
template xCheck*(expr: untyped): untyped =
|
||||
## Note: this check will invoke `expr` twice
|
||||
if not (expr):
|
||||
check expr
|
||||
return
|
||||
|
||||
template xCheck*(expr: untyped; ifFalse: untyped): untyped =
|
||||
## Note: this check will invoke `expr` twice
|
||||
if not (expr):
|
||||
ifFalse
|
||||
check expr
|
||||
return
|
||||
|
||||
template xCheckRc*(expr: untyped): untyped =
|
||||
if rc.isErr:
|
||||
xCheck(expr)
|
||||
|
||||
template xCheckRc*(expr: untyped; ifFalse: untyped): untyped =
|
||||
if rc.isErr:
|
||||
xCheck(expr, ifFalse)
|
||||
|
||||
template xCheckErr*(expr: untyped): untyped =
|
||||
if rc.isOk:
|
||||
xCheck(expr)
|
||||
|
||||
template xCheckErr*(expr: untyped; ifFalse: untyped): untyped =
|
||||
if rc.isOk:
|
||||
xCheck(expr, ifFalse)
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Public iterators
|
||||
# ------------------------------------------------------------------------------
|
||||
|
@ -33,31 +33,10 @@ type
|
||||
|
||||
QTabRef = TableRef[QueueID,QValRef]
|
||||
|
||||
const
|
||||
QidSlotLyo = [(4,0,10),(3,3,10),(3,4,10),(3,5,10)]
|
||||
QidSlotLy1 = [(4,0,0),(3,3,0),(3,4,0),(3,5,0)]
|
||||
|
||||
QidSample* = (3 * QidSlotLyo.stats.minCovered) div 2
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Private helpers
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
template xCheck(expr: untyped): untyped =
|
||||
## Note: this check will invoke `expr` twice
|
||||
if not (expr):
|
||||
check expr
|
||||
return
|
||||
|
||||
template xCheck(expr: untyped; ifFalse: untyped): untyped =
|
||||
## Note: this check will invoke `expr` twice
|
||||
if not (expr):
|
||||
ifFalse
|
||||
check expr
|
||||
return
|
||||
|
||||
# ---------------------
|
||||
|
||||
proc posixPrngRand(state: var uint32): byte =
|
||||
## POSIX.1-2001 example of a rand() implementation, see manual page rand(3).
|
||||
state = state * 1103515245 + 12345;
|
||||
@ -100,32 +79,39 @@ proc `+`(a: VertexID, b: int): VertexID =
|
||||
|
||||
# ---------------------
|
||||
|
||||
iterator walkFifo(qt: QTabRef;scd: QidSchedRef): (QueueID,QValRef) =
|
||||
## ...
|
||||
proc kvp(chn: int, qid: QueueID): (QueueID,QValRef) =
|
||||
let cid = QueueID((chn.uint64 shl 62) or qid.uint64)
|
||||
(cid, qt.getOrDefault(cid, QValRef(nil)))
|
||||
|
||||
if not scd.isNil:
|
||||
for i in 0 ..< scd.state.len:
|
||||
let (left, right) = scd.state[i]
|
||||
if left == 0:
|
||||
discard
|
||||
elif left <= right:
|
||||
for j in right.countDown left:
|
||||
yield kvp(i, j)
|
||||
else:
|
||||
for j in right.countDown QueueID(1):
|
||||
yield kvp(i, j)
|
||||
for j in scd.ctx.q[i].wrap.countDown left:
|
||||
yield kvp(i, j)
|
||||
|
||||
proc fifos(qt: QTabRef; scd: QidSchedRef): seq[seq[(QueueID,QValRef)]] =
|
||||
## ..
|
||||
var lastChn = -1
|
||||
for (qid,val) in qt.walkFifo scd:
|
||||
let chn = (qid.uint64 shr 62).int
|
||||
while lastChn < chn:
|
||||
lastChn.inc
|
||||
result.add newSeq[(QueueID,QValRef)](0)
|
||||
result[^1].add (qid,val)
|
||||
|
||||
func sortedPairs(qt: QTabRef): seq[(QueueID,QValRef)] =
|
||||
qt.keys.toSeq.mapIt(it.uint64).sorted.mapIt(it.QueueID).mapIt((it,qt[it]))
|
||||
|
||||
func fifos(qt: QTabRef; scd: QidSchedRef): seq[seq[(QueueID,QValRef)]] =
|
||||
proc kvp(chn: int, qid: QueueID): (QueueID,QValRef) =
|
||||
let
|
||||
cid = QueueID((chn.uint64 shl 62) or qid.uint64)
|
||||
val = qt.getOrDefault(cid, QValRef(nil))
|
||||
(cid, val)
|
||||
|
||||
for i in 0 ..< scd.state.len:
|
||||
let
|
||||
left = scd.state[i][0]
|
||||
right = scd.state[i][1]
|
||||
result.add newSeq[(QueueID,QValRef)](0)
|
||||
if left == 0:
|
||||
discard
|
||||
elif left <= right:
|
||||
for j in right.countDown left:
|
||||
result[i].add kvp(i, j)
|
||||
else:
|
||||
for j in right.countDown QueueID(1):
|
||||
result[i].add kvp(i, j)
|
||||
for j in scd.ctx.q[i].wrap.countDown left:
|
||||
result[i].add kvp(i, j)
|
||||
|
||||
func flatten(a: seq[seq[(QueueID,QValRef)]]): seq[(QueueID,QValRef)] =
|
||||
for w in a:
|
||||
result &= w
|
||||
@ -296,10 +282,8 @@ proc testVidRecycleLists*(noisy = true; seed = 42): bool =
|
||||
let
|
||||
db1 = newAristoDbRef BackendVoid
|
||||
rc = dbBlob.deblobify seq[VertexID]
|
||||
if rc.isErr:
|
||||
xCheck rc.error == AristoError(0)
|
||||
else:
|
||||
db1.top.vGen = rc.value
|
||||
xCheckRc rc.error == 0
|
||||
db1.top.vGen = rc.value
|
||||
|
||||
xCheck db.top.vGen == db1.top.vGen
|
||||
|
||||
@ -407,11 +391,15 @@ proc testQidScheduler*(
|
||||
|
||||
let fifoID = list.fifos(scd).flatten.mapIt(it[0])
|
||||
for j in 0 ..< list.len:
|
||||
# Check fifo order
|
||||
xCheck fifoID[j] == scd[j]:
|
||||
noisy.say "***", "n=", n, " exec=", w.exec.pp,
|
||||
" fifoID[", j, "]=", fifoID[j].pp,
|
||||
" scd[", j, "]=", scd[j].pp,
|
||||
"\n fifo=", list.pp scd
|
||||
# Check random access and reverse
|
||||
let qid = scd[j]
|
||||
xCheck j == scd[qid]
|
||||
|
||||
if debug:
|
||||
show(exec=w.exec)
|
||||
@ -459,7 +447,7 @@ proc testQidScheduler*(
|
||||
let validateOk = list.validate(scd, serial=n, relax=false)
|
||||
xCheck validateOk
|
||||
|
||||
if debug: # or true:
|
||||
if debug:
|
||||
show()
|
||||
|
||||
true
|
||||
|
@ -32,9 +32,13 @@ const
|
||||
MaxFilterBulk = 150_000
|
||||
## Policy settig for `pack()`
|
||||
|
||||
WalkStopRc =
|
||||
WalkStopErr =
|
||||
Result[LeafTie,(VertexID,AristoError)].err((VertexID(0),NearbyBeyondRange))
|
||||
|
||||
let
|
||||
TxQidLyo = QidSlotLyo.to(QidLayoutRef)
|
||||
## Cascaded filter slots layout for testing
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# Private helpers
|
||||
# ------------------------------------------------------------------------------
|
||||
@ -89,13 +93,12 @@ proc randomisedLeafs(
|
||||
let r = n + td.rand(result.len - n)
|
||||
result[n].swap result[r]
|
||||
|
||||
proc innerCleanUp(db: AristoDbRef) =
|
||||
proc innerCleanUp(db: AristoDbRef): bool {.discardable.} =
|
||||
## Defer action
|
||||
let rc = db.txTop()
|
||||
if rc.isOk:
|
||||
let rx = rc.value.collapse(commit=false)
|
||||
if rx.isErr:
|
||||
check rx.error == (0,0)
|
||||
let rx = db.txTop()
|
||||
if rx.isOk:
|
||||
let rc = rx.value.collapse(commit=false)
|
||||
xCheckRc rc.error == (0,0)
|
||||
db.finish(flush=true)
|
||||
|
||||
proc saveToBackend(
|
||||
@ -107,74 +110,52 @@ proc saveToBackend(
|
||||
): bool =
|
||||
var db = tx.to(AristoDbRef)
|
||||
|
||||
# Verify context: nesting level must be 1 (i.e. two transactions)
|
||||
# Verify context: nesting level must be 2 (i.e. two transactions)
|
||||
xCheck tx.level == 2
|
||||
|
||||
block:
|
||||
block:
|
||||
let level = tx.level
|
||||
if level != 2:
|
||||
check level == 2
|
||||
return
|
||||
block:
|
||||
let rc = db.checkCache(relax=true)
|
||||
if rc.isErr:
|
||||
check rc.error == (0,0)
|
||||
return
|
||||
let rc = db.checkCache(relax=true)
|
||||
xCheckRc rc.error == (0,0)
|
||||
|
||||
# Commit and hashify the current layer
|
||||
block:
|
||||
block:
|
||||
let rc = tx.commit()
|
||||
if rc.isErr:
|
||||
check rc.error == (0,0)
|
||||
return
|
||||
block:
|
||||
# Make sure MPT hashes are OK
|
||||
if db.top.dirty:
|
||||
check db.top.dirty == false
|
||||
return
|
||||
block:
|
||||
let rc = db.txTop()
|
||||
if rc.isErr:
|
||||
check rc.error == 0
|
||||
return
|
||||
tx = rc.value
|
||||
let level = tx.level
|
||||
if level != 1:
|
||||
check level == 1
|
||||
return
|
||||
block:
|
||||
let rc = db.checkBE(relax=true)
|
||||
if rc.isErr:
|
||||
check rc.error == (0,0)
|
||||
return
|
||||
let rc = tx.commit()
|
||||
xCheckRc rc.error == (0,0)
|
||||
|
||||
# Make sure MPT hashes are OK
|
||||
xCheck db.top.dirty == false
|
||||
|
||||
block:
|
||||
let rc = db.txTop()
|
||||
xCheckRc rc.error == 0
|
||||
tx = rc.value
|
||||
|
||||
# Verify context: nesting level must be 1 (i.e. one transaction)
|
||||
xCheck tx.level == 1
|
||||
|
||||
block:
|
||||
let rc = db.checkBE(relax=true)
|
||||
xCheckRc rc.error == (0,0)
|
||||
|
||||
# Commit and save to backend
|
||||
block:
|
||||
block:
|
||||
let rc = tx.commit()
|
||||
if rc.isErr:
|
||||
check rc.error == (0,0)
|
||||
return
|
||||
block:
|
||||
# Make sure MPT hashes are OK
|
||||
if db.top.dirty:
|
||||
check db.top.dirty == false
|
||||
return
|
||||
block:
|
||||
let rc = db.txTop()
|
||||
if rc.isOk:
|
||||
check rc.value.level < 0 # force error
|
||||
return
|
||||
block:
|
||||
let rc = db.stow(stageLimit=MaxFilterBulk, chunkedMpt=chunkedMpt)
|
||||
if rc.isErr:
|
||||
check rc.error == (0,0)
|
||||
return
|
||||
block:
|
||||
let rc = db.checkBE(relax=relax)
|
||||
if rc.isErr:
|
||||
check rc.error == (0,0)
|
||||
return
|
||||
let rc = tx.commit()
|
||||
xCheckRc rc.error == (0,0)
|
||||
|
||||
# Make sure MPT hashes are OK
|
||||
xCheck db.top.dirty == false
|
||||
|
||||
block:
|
||||
let rc = db.txTop()
|
||||
xCheckErr rc.value.level < 0 # force error
|
||||
|
||||
block:
|
||||
let rc = db.stow(stageLimit=MaxFilterBulk, chunkedMpt=chunkedMpt)
|
||||
xCheckRc rc.error == (0,0)
|
||||
|
||||
block:
|
||||
let rc = db.checkBE(relax=relax)
|
||||
xCheckRc rc.error == (0,0)
|
||||
|
||||
# Update layers to original level
|
||||
tx = db.txBegin().value.to(AristoDbRef).txBegin().value
|
||||
@ -190,59 +171,40 @@ proc saveToBackendWithOops(
|
||||
): bool =
|
||||
var db = tx.to(AristoDbRef)
|
||||
|
||||
# Verify context: nesting level must be 1 (i.e. two transactions)
|
||||
block:
|
||||
block:
|
||||
let level = tx.level
|
||||
if level != 2:
|
||||
check level == 2
|
||||
return
|
||||
# Verify context: nesting level must be 2 (i.e. two transactions)
|
||||
xCheck tx.level == 2
|
||||
|
||||
# Commit and hashify the current layer
|
||||
block:
|
||||
block:
|
||||
let rc = tx.commit()
|
||||
if rc.isErr:
|
||||
check rc.error == (0,0)
|
||||
return
|
||||
block:
|
||||
# Make sure MPT hashes are OK
|
||||
if db.top.dirty:
|
||||
check db.top.dirty == false
|
||||
return
|
||||
block:
|
||||
let rc = db.txTop()
|
||||
if rc.isErr:
|
||||
check rc.error == 0
|
||||
return
|
||||
tx = rc.value
|
||||
let level = tx.level
|
||||
if level != 1:
|
||||
check level == 1
|
||||
return
|
||||
let rc = tx.commit()
|
||||
xCheckRc rc.error == (0,0)
|
||||
|
||||
# Make sure MPT hashes are OK
|
||||
xCheck db.top.dirty == false
|
||||
|
||||
block:
|
||||
let rc = db.txTop()
|
||||
xCheckRc rc.error == 0
|
||||
tx = rc.value
|
||||
|
||||
# Verify context: nesting level must be 1 (i.e. one transaction)
|
||||
xCheck tx.level == 1
|
||||
|
||||
# Commit and save to backend
|
||||
block:
|
||||
block:
|
||||
let rc = tx.commit()
|
||||
if rc.isErr:
|
||||
check rc.error == (0,0)
|
||||
return
|
||||
block:
|
||||
# Make sure MPT hashes are OK
|
||||
if db.top.dirty:
|
||||
check db.top.dirty == false
|
||||
return
|
||||
block:
|
||||
let rc = db.txTop()
|
||||
if rc.isOk:
|
||||
check rc.value.level < 0
|
||||
return
|
||||
block:
|
||||
let rc = db.stow(stageLimit=MaxFilterBulk, chunkedMpt=chunkedMpt)
|
||||
if rc.isErr:
|
||||
check rc.error == (0,0)
|
||||
return
|
||||
let rc = tx.commit()
|
||||
xCheckRc rc.error == (0,0)
|
||||
|
||||
# Make sure MPT hashes are OK
|
||||
xCheck db.top.dirty == false
|
||||
|
||||
block:
|
||||
let rc = db.txTop()
|
||||
xCheckErr rc.value.level < 0 # force error
|
||||
|
||||
block:
|
||||
let rc = db.stow(stageLimit=MaxFilterBulk, chunkedMpt=chunkedMpt)
|
||||
xCheckRc rc.error == (0,0)
|
||||
|
||||
# Update layers to original level
|
||||
tx = db.txBegin().value.to(AristoDbRef).txBegin().value
|
||||
@ -264,10 +226,8 @@ proc fwdWalkVerify(
|
||||
last = LeafTie()
|
||||
n = 0
|
||||
for (key,_) in db.right low(LeafTie,root):
|
||||
if key notin leftOver:
|
||||
xCheck key in leftOver:
|
||||
noisy.say "*** fwdWalkVerify", " id=", n + (nLeafs + 1) * debugID
|
||||
check key in leftOver
|
||||
return
|
||||
leftOver.excl key
|
||||
last = key
|
||||
n.inc
|
||||
@ -278,13 +238,12 @@ proc fwdWalkVerify(
|
||||
elif last != high(LeafTie,root):
|
||||
last = last + 1
|
||||
let rc = last.right db
|
||||
if rc.isOk or rc.error[1] != NearbyBeyondRange:
|
||||
check rc == WalkStopRc
|
||||
return
|
||||
if rc.isOk:
|
||||
xCheck rc == WalkStopErr
|
||||
else:
|
||||
xCheck rc.error[1] == NearbyBeyondRange
|
||||
|
||||
if n != nLeafs:
|
||||
check n == nLeafs
|
||||
return
|
||||
xCheck n == nLeafs
|
||||
|
||||
true
|
||||
|
||||
@ -302,10 +261,8 @@ proc revWalkVerify(
|
||||
last = LeafTie()
|
||||
n = 0
|
||||
for (key,_) in db.left high(LeafTie,root):
|
||||
if key notin leftOver:
|
||||
xCheck key in leftOver:
|
||||
noisy.say "*** revWalkVerify", " id=", n + (nLeafs + 1) * debugID
|
||||
check key in leftOver
|
||||
return
|
||||
leftOver.excl key
|
||||
last = key
|
||||
n.inc
|
||||
@ -316,13 +273,12 @@ proc revWalkVerify(
|
||||
elif last != low(LeafTie,root):
|
||||
last = last - 1
|
||||
let rc = last.left db
|
||||
if rc.isOk or rc.error[1] != NearbyBeyondRange:
|
||||
check rc == WalkStopRc
|
||||
return
|
||||
if rc.isOk:
|
||||
xCheck rc == WalkStopErr
|
||||
else:
|
||||
xCheck rc.error[1] == NearbyBeyondRange
|
||||
|
||||
if n != nLeafs:
|
||||
check n == nLeafs
|
||||
return
|
||||
xCheck n == nLeafs
|
||||
|
||||
true
|
||||
|
||||
@ -345,17 +301,15 @@ proc testTxMergeAndDelete*(
|
||||
for n,w in list:
|
||||
# Start with brand new persistent database.
|
||||
db = block:
|
||||
let rc = newAristoDbRef(BackendRocksDB,rdbPath)
|
||||
if rc.isErr:
|
||||
check rc.error == 0
|
||||
return
|
||||
let rc = newAristoDbRef(BackendRocksDB, rdbPath, qidLayout=TxQidLyo)
|
||||
xCheckRc rc.error == 0
|
||||
rc.value
|
||||
|
||||
# Start transaction (double frame for testing)
|
||||
check db.txTop.isErr
|
||||
xCheck db.txTop.isErr
|
||||
var tx = db.txBegin().value.to(AristoDbRef).txBegin().value
|
||||
check tx.isTop()
|
||||
check tx.level == 2
|
||||
xCheck tx.isTop()
|
||||
xCheck tx.level == 2
|
||||
|
||||
# Reset database so that the next round has a clean setup
|
||||
defer: db.innerCleanUp
|
||||
@ -364,18 +318,14 @@ proc testTxMergeAndDelete*(
|
||||
let kvpLeafs = w.kvpLst.mapRootVid VertexID(1)
|
||||
for leaf in kvpLeafs:
|
||||
let rc = db.merge leaf
|
||||
if rc.isErr:
|
||||
check rc.error == 0
|
||||
return
|
||||
xCheckRc rc.error == 0
|
||||
|
||||
# List of all leaf entries that should be on the database
|
||||
var leafsLeft = kvpLeafs.mapIt(it.leafTie).toHashSet
|
||||
|
||||
# Provide a (reproducible) peudo-random copy of the leafs list
|
||||
let leafVidPairs = db.randomisedLeafs prng
|
||||
if leafVidPairs.len != leafsLeft.len:
|
||||
check leafVidPairs.len == leafsLeft.len
|
||||
return
|
||||
xCheck leafVidPairs.len == leafsLeft.len
|
||||
|
||||
# Trigger subsequent saving tasks in loop below
|
||||
let (saveMod, saveRest, relax) = block:
|
||||
@ -398,17 +348,13 @@ proc testTxMergeAndDelete*(
|
||||
|
||||
# Delete leaf
|
||||
let rc = db.delete leaf
|
||||
if rc.isErr:
|
||||
check rc.error == (0,0)
|
||||
return
|
||||
xCheckRc rc.error == (0,0)
|
||||
|
||||
# Update list of remaininf leafs
|
||||
leafsLeft.excl leaf
|
||||
|
||||
let deletedVtx = tx.db.getVtx lid
|
||||
if deletedVtx.isValid:
|
||||
check deletedVtx.isValid == false
|
||||
return
|
||||
xCheck deletedVtx.isValid == false
|
||||
|
||||
# Walking the database is too slow for large tables. So the hope is that
|
||||
# potential errors will not go away and rather pop up later, as well.
|
||||
@ -453,15 +399,14 @@ proc testTxMergeProofAndKvpList*(
|
||||
if resetDb or w.root != rootKey or w.proof.len == 0:
|
||||
db.innerCleanUp
|
||||
db = block:
|
||||
let rc = newAristoDbRef(BackendRocksDB,rdbPath)
|
||||
if rc.isErr:
|
||||
check rc.error == 0
|
||||
return
|
||||
# New DB with disabled filter slots management
|
||||
let rc = newAristoDbRef(BackendRocksDB,rdbPath,QidLayoutRef(nil))
|
||||
xCheckRc rc.error == 0
|
||||
rc.value
|
||||
|
||||
# Start transaction (double frame for testing)
|
||||
tx = db.txBegin().value.to(AristoDbRef).txBegin().value
|
||||
check tx.isTop()
|
||||
xCheck tx.isTop()
|
||||
|
||||
# Update root
|
||||
rootKey = w.root
|
||||
@ -480,28 +425,22 @@ proc testTxMergeProofAndKvpList*(
|
||||
proved: tuple[merged: int, dups: int, error: AristoError]
|
||||
if 0 < w.proof.len:
|
||||
let rc = db.merge(rootKey, VertexID(1))
|
||||
if rc.isErr:
|
||||
check rc.error == 0
|
||||
return
|
||||
xCheckRc rc.error == 0
|
||||
|
||||
proved = db.merge(w.proof, rc.value) # , noisy)
|
||||
|
||||
check proved.error in {AristoError(0),MergeHashKeyCachedAlready}
|
||||
check w.proof.len == proved.merged + proved.dups
|
||||
check db.top.lTab.len == lTabLen
|
||||
check db.top.sTab.len <= proved.merged + sTabLen
|
||||
check proved.merged < db.top.pAmk.len
|
||||
xCheck proved.error in {AristoError(0),MergeHashKeyCachedAlready}
|
||||
xCheck w.proof.len == proved.merged + proved.dups
|
||||
xCheck db.top.lTab.len == lTabLen
|
||||
xCheck db.top.sTab.len <= proved.merged + sTabLen
|
||||
xCheck proved.merged < db.top.pAmk.len
|
||||
|
||||
let
|
||||
merged = db.merge leafs
|
||||
|
||||
check db.top.lTab.len == lTabLen + merged.merged
|
||||
check merged.merged + merged.dups == leafs.len
|
||||
|
||||
block:
|
||||
if merged.error notin {AristoError(0), MergeLeafPathCachedAlready}:
|
||||
check merged.error in {AristoError(0), MergeLeafPathCachedAlready}
|
||||
return
|
||||
xCheck db.top.lTab.len == lTabLen + merged.merged
|
||||
xCheck merged.merged + merged.dups == leafs.len
|
||||
xCheck merged.error in {AristoError(0), MergeLeafPathCachedAlready}
|
||||
|
||||
block:
|
||||
let oops = oopsTab.getOrDefault(testId,(0,AristoError(0)))
|
||||
|
Loading…
x
Reference in New Issue
Block a user