diff --git a/nimbus/db/aristo/README.md b/nimbus/db/aristo/README.md index 6a1c6a6b7..7f92b57de 100644 --- a/nimbus/db/aristo/README.md +++ b/nimbus/db/aristo/README.md @@ -233,7 +233,7 @@ stored in the right byte of the serialised bitmap. ### 4.2 Extension record serialisation 0 +--+--+--+--+--+--+--+--+--+ - | | -- vertexID + | | -- vertex ID 8 +--+--+--+--+--+--+--+--+--+ | | ... -- path segment +--+ @@ -338,17 +338,19 @@ assumed, i.e. the list with the single vertex ID *1*. ### 4.8 Backend filter record serialisation - 0 +--+--+--+--+--+ .. --+--+ .. --+ + 0 +--+--+--+--+--+ .. --+ + | | -- filter ID + 8 +--+--+--+--+--+ .. --+--+ .. --+ | | -- 32 bytes filter source hash - 32 +--+--+--+--+--+ .. --+--+ .. --+ + 40 +--+--+--+--+--+ .. --+--+ .. --+ | | -- 32 bytes filter target hash - 64 +--+--+--+--+--+ .. --+--+ .. --+ + 72 +--+--+--+--+--+ .. --+--+ .. --+ | | -- number of unused vertex IDs - 68 +--+--+--+--+ + 76 +--+--+--+--+ | | -- number of structural triplets - 72 +--+--+--+--+--+ .. --+ - | | -- first unused vertex ID 80 +--+--+--+--+--+ .. --+ + | | -- first unused vertex ID + 88 +--+--+--+--+--+ .. --+ ... -- more unused vertex ID N1 +--+--+--+--+ || | -- flg(3) + vtxLen(29), 1st triplet diff --git a/nimbus/db/aristo/aristo_check.nim b/nimbus/db/aristo/aristo_check.nim index 185acdf85..653ecfa1f 100644 --- a/nimbus/db/aristo/aristo_check.nim +++ b/nimbus/db/aristo/aristo_check.nim @@ -76,15 +76,13 @@ proc checkBE*( ## Moreover, the union of both sets is equivalent to the set of positive ## `uint64` numbers. ## - if not db.backend.isNil: - let be = db.to(TypedBackendRef) - case be.kind: - of BackendMemory: - return MemBackendRef.checkBE(db, cache=cache, relax=relax) - of BackendRocksDB: - return RdbBackendRef.checkBE(db, cache=cache, relax=relax) - of BackendVoid: - return VoidBackendRef.checkBE(db, cache=cache, relax=relax) + case db.backend.kind: + of BackendMemory: + return MemBackendRef.checkBE(db, cache=cache, relax=relax) + of BackendRocksDB: + return RdbBackendRef.checkBE(db, cache=cache, relax=relax) + of BackendVoid: + return VoidBackendRef.checkBE(db, cache=cache, relax=relax) ok() # ------------------------------------------------------------------------------ diff --git a/nimbus/db/aristo/aristo_debug.nim b/nimbus/db/aristo/aristo_debug.nim index 70aa43b9b..52a0f6305 100644 --- a/nimbus/db/aristo/aristo_debug.nim +++ b/nimbus/db/aristo/aristo_debug.nim @@ -16,14 +16,12 @@ import results, stew/byteutils, "."/[aristo_constants, aristo_desc, aristo_hike, aristo_init], + ./aristo_desc/desc_backend, ./aristo_init/[memory_db, rocks_db], ./aristo_filter/filter_desc -export - TypedBackendRef, aristo_init.to - # ------------------------------------------------------------------------------ -# Ptivate functions +# Private functions # ------------------------------------------------------------------------------ proc sortedKeys(lTab: Table[LeafTie,VertexID]): seq[LeafTie] = @@ -357,6 +355,7 @@ proc ppFilter(fl: FilterRef; db: AristoDbRef; indent: int): string = if fl.isNil: result &= " n/a" return + result &= pfx & "fid=" & fl.fid.ppFid result &= pfx & "trg(" & fl.trg.ppKey & ")" result &= pfx & "src(" & fl.src.ppKey & ")" result &= pfx & "vGen" & pfx1 & "[" & @@ -600,13 +599,6 @@ proc pp*(kMap: Table[VertexID,Hashlabel]; db: AristoDbRef; indent = 4): string = proc pp*(pAmk: Table[Hashlabel,VertexID]; db: AristoDbRef; indent = 4): string = db.ppXMap(db.top.kMap, pAmk, indent) -proc pp*( - be: MemBackendRef|RdbBackendRef; - db: AristoDbRef; - indent = 4; - ): string = - be.ppBe(db, indent) - # --------------------- proc pp*( @@ -668,12 +660,11 @@ proc pp*( filter.ppFilter(db, indent) proc pp*( - be: TypedBackendRef; + be: BackendRef; db: AristoDbRef; indent = 4; ): string = - ## May be called as `db.to(TypedBackendRef).pp(db)` - case (if be.isNil: BackendVoid else: be.kind) + case be.kind: of BackendMemory: be.MemBackendRef.ppBe(db, indent) diff --git a/nimbus/db/aristo/aristo_desc/desc_backend.nim b/nimbus/db/aristo/aristo_desc/desc_backend.nim index 5fd25df9f..34bae8473 100644 --- a/nimbus/db/aristo/aristo_desc/desc_backend.nim +++ b/nimbus/db/aristo/aristo_desc/desc_backend.nim @@ -102,6 +102,8 @@ type BackendRef* = ref object of RootRef ## Backend interface. + filters*: QidSchedRef ## Filter slot queue state + getVtxFn*: GetVtxFn ## Read vertex record getKeyFn*: GetKeyFn ## Read Merkle hash/key getFilFn*: GetFilFn ## Read back log filter diff --git a/nimbus/db/aristo/aristo_desc/desc_error.nim b/nimbus/db/aristo/aristo_desc/desc_error.nim index 871b829d9..a8008b33b 100644 --- a/nimbus/db/aristo/aristo_desc/desc_error.nim +++ b/nimbus/db/aristo/aristo_desc/desc_error.nim @@ -24,6 +24,8 @@ type RlpOtherException # Data record transcoders, `deblobify()` and `blobify()` + BlobifyNilFilter + BlobifyNilVertex BlobifyBranchMissingRefs BlobifyExtMissingRefs BlobifyExtPathOverflow @@ -181,6 +183,7 @@ type FilPrettyPointlessLayer FilDudeFilterUpdateError FilNotReadOnlyDude + FilQuSchedDisabled # Get functions form `aristo_get.nim` GetLeafNotFound diff --git a/nimbus/db/aristo/aristo_desc/desc_identifiers.nim b/nimbus/db/aristo/aristo_desc/desc_identifiers.nim index 9cf3d32b0..43ef2deee 100644 --- a/nimbus/db/aristo/aristo_desc/desc_identifiers.nim +++ b/nimbus/db/aristo/aristo_desc/desc_identifiers.nim @@ -27,8 +27,8 @@ type ## Identifier used to tag filter logs stored on the backend. FilterID* = distinct uint64 - ## Identifier used to identify a particular filter. It is generatied with the - ## filter. + ## Identifier used to identify a particular filter. It is generatied with + ## the filter when stored to database. VertexID* = distinct uint64 ## Unique identifier for a vertex of the `Aristo Trie`. The vertex is the diff --git a/nimbus/db/aristo/aristo_desc/desc_structural.nim b/nimbus/db/aristo/aristo_desc/desc_structural.nim index 49b138d0e..710c9b860 100644 --- a/nimbus/db/aristo/aristo_desc/desc_structural.nim +++ b/nimbus/db/aristo/aristo_desc/desc_structural.nim @@ -82,6 +82,7 @@ type FilterRef* = ref object ## Delta layer with expanded sequences for quick access + fid*: FilterID ## Filter identifier src*: HashKey ## Applicable to this state root trg*: HashKey ## Resulting state root (i.e. `kMap[1]`) sTab*: Table[VertexID,VertexRef] ## Filter structural vertex table @@ -100,6 +101,40 @@ type txUid*: uint ## Transaction identifier if positive dirty*: bool ## Needs to be hashified if `true` + # ---------------------- + + QidLayoutRef* = ref object + ## Layout of cascaded list of filter ID slot queues where a slot queue + ## with index `N+1` serves as an overflow queue of slot queue `N`. + q*: array[4,QidSpec] + + QidSpec* = tuple + ## Layout of a filter ID slot queue + size: uint ## Capacity of queue, length within `1..wrap` + width: uint ## Instance gaps (relative to prev. item) + wrap: QueueID ## Range `1..wrap` for round-robin queue + + QidSchedRef* = ref object of RootRef + ## Current state of the filter queues + ctx*: QidLayoutRef ## Organisation of the FIFO + state*: seq[(QueueID,QueueID)] ## Current fill state + +const + DefaultQidWrap = QueueID(0x3fff_ffff_ffff_ffffu64) + + QidSpecSizeMax* = high(uint32).uint + ## Maximum value allowed for a `size` value of a `QidSpec` object + + QidSpecWidthMax* = high(uint32).uint + ## Maximum value allowed for a `width` value of a `QidSpec` object + +# ------------------------------------------------------------------------------ +# Private helpers +# ------------------------------------------------------------------------------ + +func max(a, b, c: int): int = + max(max(a,b),c) + # ------------------------------------------------------------------------------ # Public helpers: `NodeRef` and `PayloadRef` # ------------------------------------------------------------------------------ @@ -242,20 +277,55 @@ proc dup*(layer: LayerRef): LayerRef = for (k,v) in layer.sTab.pairs: result.sTab[k] = v.dup -proc dup*(filter: FilterRef): FilterRef = - ## Duplicate layer. - result = FilterRef( - src: filter.src, - kMap: filter.kMap, - vGen: filter.vGen, - trg: filter.trg) - for (k,v) in filter.sTab.pairs: - result.sTab[k] = v.dup +# --------------- -proc to*(node: NodeRef; T: type VertexRef): T = +func to*(node: NodeRef; T: type VertexRef): T = ## Extract a copy of the `VertexRef` part from a `NodeRef`. node.VertexRef.dup +func to*(a: array[4,tuple[size, width: int]]; T: type QidLayoutRef): T = + ## Convert a size-width array to a `QidLayoutRef` layout. Over large + ## array field values are adjusted to its maximal size. + var q: array[4,QidSpec] + for n in 0..3: + q[n] = (min(a[n].size.uint, QidSpecSizeMax), + min(a[n].width.uint, QidSpecWidthMax), + DefaultQidWrap) + q[0].width = 0 + T(q: q) + +func to*(a: array[4,tuple[size, width, wrap: int]]; T: type QidLayoutRef): T = + ## Convert a size-width-wrap array to a `QidLayoutRef` layout. Over large + ## array field values are adjusted to its maximal size. Too small `wrap` + ## values are adjusted to its minimal size. + var q: array[4,QidSpec] + for n in 0..2: + q[n] = (min(a[n].size.uint, QidSpecSizeMax), + min(a[n].width.uint, QidSpecWidthMax), + QueueID(max(a[n].size + a[n+1].width, a[n].width+1, a[n].wrap))) + q[0].width = 0 + q[3] = (min(a[3].size.uint, QidSpecSizeMax), + min(a[3].width.uint, QidSpecWidthMax), + QueueID(max(a[3].size, a[3].width, a[3].wrap))) + T(q: q) + +# ------------------------------------------------------------------------------ +# Public constructors for filter slot scheduler state +# ------------------------------------------------------------------------------ + +func init*(T: type QidSchedRef; a: array[4,(int,int)]): T = + ## Constructor, see comments at the coverter function `to()` for adjustments + ## of the layout argument `a`. + T(ctx: a.to(QidLayoutRef)) + +func init*(T: type QidSchedRef; a: array[4,(int,int,int)]): T = + ## Constructor, see comments at the coverter function `to()` for adjustments + ## of the layout argument `a`. + T(ctx: a.to(QidLayoutRef)) + +func init*(T: type QidSchedRef; ctx: QidLayoutRef): T = + T(ctx: ctx) + # ------------------------------------------------------------------------------ # End # ------------------------------------------------------------------------------ diff --git a/nimbus/db/aristo/aristo_filter/filter_desc.nim b/nimbus/db/aristo/aristo_filter/filter_desc.nim index 4857e0aaf..a7e2ab1ae 100644 --- a/nimbus/db/aristo/aristo_filter/filter_desc.nim +++ b/nimbus/db/aristo/aristo_filter/filter_desc.nim @@ -34,85 +34,6 @@ type DequQid ## Store merged local queue items DelQid ## Delete entry from last overflow queue - QidLayoutRef* = ref object - ## Layout of cascaded list of filter ID slot queues where a slot queue - ## with index `N+1` serves as an overflow queue of slot queue `N`. - q*: array[4,QidSpec] - - QidSpec* = tuple - ## Layout of a filter ID slot queue - size: uint ## Capacity of queue, length within `1..wrap` - width: uint ## Instance gaps (relative to prev. item) - wrap: QueueID ## Range `1..wrap` for round-robin queue - - QidSchedRef* = ref object of RootRef - ## Current state of the filter queues - ctx*: QidLayoutRef ## Organisation of the FIFO - state*: seq[(QueueID,QueueID)] ## Current fill state - -const - DefaultQidWrap = QueueID(0x3fff_ffff_ffff_ffffu64) - - QidSpecSizeMax* = high(uint32).uint - ## Maximum value allowed for a `size` value of a `QidSpec` object - - QidSpecWidthMax* = high(uint32).uint - ## Maximum value allowed for a `width` value of a `QidSpec` object - -# ------------------------------------------------------------------------------ -# Private helpers -# ------------------------------------------------------------------------------ - -func max(a, b, c: int): int = - max(max(a,b),c) - -# ------------------------------------------------------------------------------ -# Public helpers -# ------------------------------------------------------------------------------ - -func to*(a: array[4,tuple[size, width: int]]; T: type QidLayoutRef): T = - ## Convert a size-width array to a `QidLayoutRef` layout. Over large - ## array field values are adjusted to its maximal size. - var q: array[4,QidSpec] - for n in 0..3: - q[n] = (min(a[n].size.uint, QidSpecSizeMax), - min(a[n].width.uint, QidSpecWidthMax), - DefaultQidWrap) - q[0].width = 0 - T(q: q) - -func to*(a: array[4,tuple[size, width, wrap: int]]; T: type QidLayoutRef): T = - ## Convert a size-width-wrap array to a `QidLayoutRef` layout. Over large - ## array field values are adjusted to its maximal size. Too small `wrap` - ## values are adjusted to its minimal size. - var q: array[4,QidSpec] - for n in 0..2: - q[n] = (min(a[n].size.uint, QidSpecSizeMax), - min(a[n].width.uint, QidSpecWidthMax), - QueueID(max(a[n].size + a[n+1].width, a[n].width+1, a[n].wrap))) - q[0].width = 0 - q[3] = (min(a[3].size.uint, QidSpecSizeMax), - min(a[3].width.uint, QidSpecWidthMax), - QueueID(max(a[3].size, a[3].width, a[3].wrap))) - T(q: q) - -# ------------------------------------------------------------------------------ -# Public constructors -# ------------------------------------------------------------------------------ - -func init*(T: type QidSchedRef; a: array[4,(int,int)]): T = - ## Constructor, see comments at the coverter function `to()` for adjustments - ## of the layout argument `a`. - T(ctx: a.to(QidLayoutRef)) - -func init*(T: type QidSchedRef; a: array[4,(int,int,int)]): T = - ## Constructor, see comments at the coverter function `to()` for adjustments - ## of the layout argument `a`. - T(ctx: a.to(QidLayoutRef)) - -func init*(T: type QidSchedRef; ctx: QidLayoutRef): T = - T(ctx: ctx) - # ------------------------------------------------------------------------------ # End # ------------------------------------------------------------------------------ diff --git a/nimbus/db/aristo/aristo_filter/filter_scheduler.nim b/nimbus/db/aristo/aristo_filter/filter_scheduler.nim index 301157811..8dc2853c1 100644 --- a/nimbus/db/aristo/aristo_filter/filter_scheduler.nim +++ b/nimbus/db/aristo/aristo_filter/filter_scheduler.nim @@ -26,6 +26,8 @@ type ## This *decreasing* requirement can be seen as a generalisation of a ## block chain scenario with `i`, `j` backward steps into the past and ## the `FilterID` as the block number. + ## + ## In order to flag an error, `FilterID(0)` must be returned. const ZeroQidPair = (QueueID(0),QueueID(0)) @@ -34,9 +36,6 @@ const # Private helpers # ------------------------------------------------------------------------------ -func `+`*(a: QueueID; b: uint): QueueID = (a.uint64+b.uint64).QueueID -func `-`*(a: QueueID; b: uint): QueueID = (a.uint64-b.uint64).QueueID - func `<`(a: static[uint]; b: QueueID): bool = QueueID(a) < b func globalQid(queue: int, qid: QueueID): QueueID = @@ -229,7 +228,7 @@ func fifoDel( # Public functions # ------------------------------------------------------------------------------ -proc stats*( +func stats*( ctx: openArray[tuple[size, width: int]]; # Schedule layout ): tuple[maxQueue: int, minCovered: int, maxCovered: int] = ## Number of maximally stored and covered queued entries for the argument @@ -245,20 +244,20 @@ proc stats*( result.minCovered += (ctx[n].size * step).int result.maxCovered += (size * step).int -proc stats*( +func stats*( ctx: openArray[tuple[size, width, wrap: int]]; # Schedule layout ): tuple[maxQueue: int, minCovered: int, maxCovered: int] = ## Variant of `stats()` ctx.toSeq.mapIt((it[0],it[1])).stats -proc stats*( +func stats*( ctx: QidLayoutRef; # Cascaded fifos descriptor ): tuple[maxQueue: int, minCovered: int, maxCovered: int] = ## Variant of `stats()` ctx.q.toSeq.mapIt((it[0].int,it[1].int)).stats -proc addItem*( +func addItem*( fifo: QidSchedRef; # Cascaded fifos descriptor ): tuple[exec: seq[QidAction], fifo: QidSchedRef] = ## Get the instructions for adding a new slot to the cascades queues. The @@ -352,7 +351,7 @@ proc addItem*( (revActions.reversed, QidSchedRef(ctx: fifo.ctx, state: state)) -proc fetchItems*( +func fetchItems*( fifo: QidSchedRef; # Cascaded fifos descriptor size: int; # Leading items to merge ): tuple[exec: seq[QidAction], fifo: QidSchedRef] = @@ -472,21 +471,21 @@ proc fetchItems*( (actions, QidSchedRef(ctx: fifo.ctx, state: state)) -proc lengths*( +func lengths*( fifo: QidSchedRef; # Cascaded fifos descriptor ): seq[int] = ## Return the list of lengths for all cascaded sub-fifos. for n in 0 ..< fifo.state.len: result.add fifo.state[n].fifoLen(fifo.ctx.q[n].wrap).int -proc len*( +func len*( fifo: QidSchedRef; # Cascaded fifos descriptor ): int = ## Size of the fifo fifo.lengths.foldl(a + b, 0) -proc `[]`*( +func `[]`*( fifo: QidSchedRef; # Cascaded fifos descriptor inx: int; # Index into latest items ): QueueID = @@ -536,6 +535,54 @@ proc `[]`*( return n.globalQid(wrap - inx) inx -= qInxMax0 + 1 # Otherwise continue + +func `[]`*( + fifo: QidSchedRef; # Cascaded fifos descriptor + qid: QueueID; # Index into latest items + ): int = + ## .. + if QueueID(0) < qid: + let + chn = (qid.uint64 shr 62).int + qid = (qid.uint64 and 0x3fff_ffff_ffff_ffffu64).QueueID + + if chn < fifo.state.len: + var offs = 0 + for n in 0 ..< chn: + offs += fifo.state[n].fifoLen(fifo.ctx.q[n].wrap).int + + let q = fifo.state[chn] + if q[0] <= q[1]: + # Single file + # :: + # | : + # | q[0]--> 3 + # | 4 + # | 5 <--q[1] + # | : + # + if q[0] <= qid and qid <= q[1]: + return offs + (q[1] - qid).int + else: + # Wrap aound, double files + # :: + # | : + # | 3 <--q[1] + # | 4 + # | q[0]--> 5 + # | : + # | wrap + # + if QueueID(1) <= qid and qid <= q[1]: + return offs + (q[1] - qid).int + + if q[0] <= qid: + let wrap = fifo.ctx.q[chn].wrap + if qid <= wrap: + return offs + (q[1] - QueueID(0)).int + (wrap - qid).int + -1 + + proc le*( fifo: QidSchedRef; # Cascaded fifos descriptor fid: FilterID; # Upper bound @@ -545,19 +592,27 @@ proc le*( ## * `fn(qid) <= fid` ## * for all `qid1` with `fn(qid1) <= fid` one has `fn(qid1) <= fn(qid)` ## + ## If `fn()` returns `FilterID(0)`, then this function returns `QueueID(0)` + ## ## The argument type `QuFilMap` of map `fn()` has been commented on earlier. ## var left = 0 right = fifo.len - 1 + template getFid(qid: QueueID): FilterID = + let fid = fn(qid) + if not fid.isValid: + return QueueID(0) + fid + if 0 <= right: let maxQid = fifo[left] - if maxQid.fn <= fid: + if maxQid.getFid <= fid: return maxQid # Bisection - if fifo[right].fn <= fid: + if fifo[right].getFid <= fid: while 1 < right - left: let half = (left + right) div 2 # @@ -567,9 +622,9 @@ proc le*( # # with `fifo[left].fn > fid >= fifo[right].fn` # - if fid >= fifo[half].fn: + if fid >= fifo[half].getFid: right = half - else: # fifo[half].fn > fid + else: # fifo[half].getFid > fid left = half # Now: `fifo[right].fn <= fid < fifo[left].fn` (and `right == left+1`) diff --git a/nimbus/db/aristo/aristo_init/init_common.nim b/nimbus/db/aristo/aristo_init/init_common.nim index 7d35b781b..04c53d660 100644 --- a/nimbus/db/aristo/aristo_init/init_common.nim +++ b/nimbus/db/aristo/aristo_init/init_common.nim @@ -36,7 +36,7 @@ type ## over the tables), this data type is to be used. TypedBackendRef* = ref object of BackendRef - kind*: BackendType ## Backend type identifier + beKind*: BackendType ## Backend type identifier when verifyIxId: txGen: uint ## Transaction ID generator (for debugging) txId: uint ## Active transaction ID (for debugging) diff --git a/nimbus/db/aristo/aristo_init/memory_db.nim b/nimbus/db/aristo/aristo_init/memory_db.nim index 17726ba81..6610d3640 100644 --- a/nimbus/db/aristo/aristo_init/memory_db.nim +++ b/nimbus/db/aristo/aristo_init/memory_db.nim @@ -42,9 +42,10 @@ type ## Inheriting table so access can be extended for debugging purposes sTab: Table[VertexID,Blob] ## Structural vertex table making up a trie kMap: Table[VertexID,HashKey] ## Merkle hash key mapping - rFil: Table[QueueID,Blob] ## Backend filters + rFil: Table[QueueID,Blob] ## Backend filters vGen: Option[seq[VertexID]] vFqs: Option[seq[(QueueID,QueueID)]] + noFq: bool ## No filter queues available MemPutHdlRef = ref object of TypedPutHdlRef sTab: Table[VertexID,Blob] @@ -98,12 +99,17 @@ proc getKeyFn(db: MemBackendRef): GetKeyFn = err(GetKeyNotFound) proc getFilFn(db: MemBackendRef): GetFilFn = - result = - proc(qid: QueueID): Result[FilterRef,AristoError] = - let data = db.rFil.getOrDefault(qid, EmptyBlob) - if 0 < data.len: - return data.deblobify FilterRef - err(GetFilNotFound) + if db.noFq: + result = + proc(qid: QueueID): Result[FilterRef,AristoError] = + err(FilQuSchedDisabled) + else: + result = + proc(qid: QueueID): Result[FilterRef,AristoError] = + let data = db.rFil.getOrDefault(qid, EmptyBlob) + if 0 < data.len: + return data.deblobify FilterRef + err(GetFilNotFound) proc getIdgFn(db: MemBackendRef): GetIdgFn = result = @@ -113,11 +119,16 @@ proc getIdgFn(db: MemBackendRef): GetIdgFn = err(GetIdgNotFound) proc getFqsFn(db: MemBackendRef): GetFqsFn = - result = - proc(): Result[seq[(QueueID,QueueID)],AristoError]= - if db.vFqs.isSome: - return ok db.vFqs.unsafeGet - err(GetFqsNotFound) + if db.noFq: + result = + proc(): Result[seq[(QueueID,QueueID)],AristoError] = + err(FilQuSchedDisabled) + else: + result = + proc(): Result[seq[(QueueID,QueueID)],AristoError] = + if db.vFqs.isSome: + return ok db.vFqs.unsafeGet + err(GetFqsNotFound) # ------------- @@ -154,19 +165,32 @@ proc putKeyFn(db: MemBackendRef): PutKeyFn = hdl.kMap[vid] = key proc putFilFn(db: MemBackendRef): PutFilFn = - result = - proc(hdl: PutHdlRef; vf: openArray[(QueueID,FilterRef)]) = - let hdl = hdl.getSession db - if hdl.error.isNil: - for (qid,filter) in vf: - let rc = filter.blobify() - if rc.isErr: - hdl.error = TypedPutHdlErrRef( - pfx: FilPfx, - qid: qid, - code: rc.error) - return - hdl.rFil[qid] = rc.value + if db.noFq: + result = + proc(hdl: PutHdlRef; vf: openArray[(QueueID,FilterRef)]) = + let hdl = hdl.getSession db + if hdl.error.isNil: + hdl.error = TypedPutHdlErrRef( + pfx: FilPfx, + qid: (if 0 < vf.len: vf[0][0] else: QueueID(0)), + code: FilQuSchedDisabled) + else: + result = + proc(hdl: PutHdlRef; vf: openArray[(QueueID,FilterRef)]) = + let hdl = hdl.getSession db + if hdl.error.isNil: + for (qid,filter) in vf: + if filter.isValid: + let rc = filter.blobify() + if rc.isErr: + hdl.error = TypedPutHdlErrRef( + pfx: FilPfx, + qid: qid, + code: rc.error) + return + hdl.rFil[qid] = rc.value + else: + hdl.rFil[qid] = EmptyBlob proc putIdgFn(db: MemBackendRef): PutIdgFn = result = @@ -176,11 +200,20 @@ proc putIdgFn(db: MemBackendRef): PutIdgFn = hdl.vGen = some(vs.toSeq) proc putFqsFn(db: MemBackendRef): PutFqsFn = - result = - proc(hdl: PutHdlRef; fs: openArray[(QueueID,QueueID)]) = - let hdl = hdl.getSession db - if hdl.error.isNil: - hdl.vFqs = some(fs.toSeq) + if db.noFq: + result = + proc(hdl: PutHdlRef; fs: openArray[(QueueID,QueueID)]) = + let hdl = hdl.getSession db + if hdl.error.isNil: + hdl.error = TypedPutHdlErrRef( + pfx: AdmPfx, + code: FilQuSchedDisabled) + else: + result = + proc(hdl: PutHdlRef; fs: openArray[(QueueID,QueueID)]) = + let hdl = hdl.getSession db + if hdl.error.isNil: + hdl.vFqs = some(fs.toSeq) proc putEndFn(db: MemBackendRef): PutEndFn = @@ -213,7 +246,7 @@ proc putEndFn(db: MemBackendRef): PutEndFn = db.kMap.del vid for (qid,data) in hdl.rFil.pairs: - if qid.isValid: + if 0 < data.len: db.rFil[qid] = data else: db.rFil.del qid @@ -245,8 +278,10 @@ proc closeFn(db: MemBackendRef): CloseFn = # Public functions # ------------------------------------------------------------------------------ -proc memoryBackend*(): BackendRef = - let db = MemBackendRef(kind: BackendMemory) +proc memoryBackend*(qidLayout: QidLayoutRef): BackendRef = + let db = MemBackendRef( + beKind: BackendMemory, + noFq: qidLayout.isNil) db.getVtxFn = getVtxFn db db.getKeyFn = getKeyFn db @@ -264,6 +299,10 @@ proc memoryBackend*(): BackendRef = db.closeFn = closeFn db + # Set up filter management table + if not db.noFq: + db.filters = QidSchedRef(ctx: qidLayout) + db # ------------------------------------------------------------------------------ @@ -296,14 +335,15 @@ iterator walkFil*( be: MemBackendRef; ): tuple[n: int, qid: QueueID, filter: FilterRef] = ## Iteration over the vertex sub-table. - for n,qid in be.rFil.keys.toSeq.mapIt(it.uint64).sorted.mapIt(it.QueueID): - let data = be.rFil.getOrDefault(qid, EmptyBlob) - if 0 < data.len: - let rc = data.deblobify FilterRef - if rc.isErr: - debug logTxt "walkFilFn() skip", n,qid, error=rc.error - else: - yield (n, qid, rc.value) + if not be.noFq: + for n,qid in be.rFil.keys.toSeq.mapIt(it.uint64).sorted.mapIt(it.QueueID): + let data = be.rFil.getOrDefault(qid, EmptyBlob) + if 0 < data.len: + let rc = data.deblobify FilterRef + if rc.isErr: + debug logTxt "walkFilFn() skip", n,qid, error=rc.error + else: + yield (n, qid, rc.value) iterator walk*( @@ -319,9 +359,10 @@ iterator walk*( yield(0, AdmPfx, AdmTabIdIdg.uint64, be.vGen.unsafeGet.blobify) n.inc - if be.vFqs.isSome: - yield(0, AdmPfx, AdmTabIdFqs.uint64, be.vFqs.unsafeGet.blobify) - n.inc + if not be.noFq: + if be.vFqs.isSome: + yield(0, AdmPfx, AdmTabIdFqs.uint64, be.vFqs.unsafeGet.blobify) + n.inc for vid in be.sTab.keys.toSeq.mapIt(it.uint64).sorted.mapIt(it.VertexID): let data = be.sTab.getOrDefault(vid, EmptyBlob) @@ -333,11 +374,12 @@ iterator walk*( yield (n, KeyPfx, vid.uint64, key.to(Blob)) n.inc - for lid in be.rFil.keys.toSeq.mapIt(it.uint64).sorted.mapIt(it.QueueID): - let data = be.rFil.getOrDefault(lid, EmptyBlob) - if 0 < data.len: - yield (n, FilPfx, lid.uint64, data) - n.inc + if not be.noFq: + for lid in be.rFil.keys.toSeq.mapIt(it.uint64).sorted.mapIt(it.QueueID): + let data = be.rFil.getOrDefault(lid, EmptyBlob) + if 0 < data.len: + yield (n, FilPfx, lid.uint64, data) + n.inc # ------------------------------------------------------------------------------ # End diff --git a/nimbus/db/aristo/aristo_init/memory_only.nim b/nimbus/db/aristo/aristo_init/memory_only.nim index a63995283..af9cc9455 100644 --- a/nimbus/db/aristo/aristo_init/memory_only.nim +++ b/nimbus/db/aristo/aristo_init/memory_only.nim @@ -26,9 +26,10 @@ type export BackendType, - VoidBackendRef, - MemBackendRef, - TypedBackendRef + MemBackendRef + +let + DefaultQidLayoutRef* = DEFAULT_QID_QUEUES.to(QidLayoutRef) # ------------------------------------------------------------------------------ # Public database constuctors, destructor @@ -36,13 +37,20 @@ export proc newAristoDbRef*( backend: static[BackendType]; + qidLayout = DefaultQidLayoutRef; ): AristoDbRef = ## Simplified prototype for `BackendNone` and `BackendMemory` type backend. + ## + ## If the `qidLayout` argument is set `QidLayoutRef(nil)`, the a backend + ## database will not provide filter history management. Providing a different + ## scheduler layout shoud be used with care as table access with different + ## layouts might render the filter history data unmanageable. + ## when backend == BackendVoid: AristoDbRef(top: LayerRef()) elif backend == BackendMemory: - AristoDbRef(top: LayerRef(), backend: memoryBackend()) + AristoDbRef(top: LayerRef(), backend: memoryBackend(qidLayout)) elif backend == BackendRocksDB: {.error: "Aristo DB backend \"BackendRocksDB\" needs basePath argument".} @@ -84,6 +92,16 @@ proc to*[W: TypedBackendRef|MemBackendRef|VoidBackendRef]( ## Handy helper for lew-level access to some backend functionality db.backend.T +proc kind*( + be: BackendRef; + ): BackendType = + ## Retrieves the backend type symbol for a `TypedBackendRef` argument where + ## `BackendVoid` is returned for the`nil` backend. + if be.isNil: + BackendVoid + else: + be.TypedBackendRef.beKind + # ------------------------------------------------------------------------------ # End # ------------------------------------------------------------------------------ diff --git a/nimbus/db/aristo/aristo_init/persistent.nim b/nimbus/db/aristo/aristo_init/persistent.nim index 7861cb484..5fcb11acd 100644 --- a/nimbus/db/aristo/aristo_init/persistent.nim +++ b/nimbus/db/aristo/aristo_init/persistent.nim @@ -33,13 +33,20 @@ export proc newAristoDbRef*( backend: static[BackendType]; basePath: string; + qidLayout = DefaultQidLayoutRef; ): Result[AristoDbRef, AristoError] = ## Generic constructor, `basePath` argument is ignored for `BackendNone` and ## `BackendMemory` type backend database. Also, both of these backends ## aways succeed initialising. + ## + ## If the `qidLayout` argument is set `QidLayoutRef(nil)`, the a backend + ## database will not provide filter history management. Providing a different + ## scheduler layout shoud be used with care as table access with different + ## layouts might render the filter history data unmanageable. + ## when backend == BackendRocksDB: let be = block: - let rc = rocksDbBackend basePath + let rc = rocksDbBackend(basePath, qidLayout) if rc.isErr: return err(rc.error) rc.value diff --git a/nimbus/db/aristo/aristo_init/rocks_db.nim b/nimbus/db/aristo/aristo_init/rocks_db.nim index 36c8cbf71..87e066c23 100644 --- a/nimbus/db/aristo/aristo_init/rocks_db.nim +++ b/nimbus/db/aristo/aristo_init/rocks_db.nim @@ -44,6 +44,7 @@ logScope: type RdbBackendRef* = ref object of TypedBackendRef rdb: RdbInst ## Allows low level access to database + noFq: bool ## No filter queues available RdbPutHdlRef = ref object of TypedPutHdlRef cache: RdbTabs ## Transaction cache @@ -130,21 +131,26 @@ proc getKeyFn(db: RdbBackendRef): GetKeyFn = err(GetKeyNotFound) proc getFilFn(db: RdbBackendRef): GetFilFn = - result = - proc(qid: QueueID): Result[FilterRef,AristoError] = + if db.noFq: + result = + proc(qid: QueueID): Result[FilterRef,AristoError] = + err(FilQuSchedDisabled) + else: + result = + proc(qid: QueueID): Result[FilterRef,AristoError] = - # Fetch serialised data record - let rc = db.rdb.get qid.toOpenArray() - if rc.isErr: - debug logTxt "getFilFn: failed", qid, - error=rc.error[0], info=rc.error[1] - return err(rc.error[0]) + # Fetch serialised data record + let rc = db.rdb.get qid.toOpenArray() + if rc.isErr: + debug logTxt "getFilFn: failed", qid, + error=rc.error[0], info=rc.error[1] + return err(rc.error[0]) - # Decode data record - if 0 < rc.value.len: - return rc.value.deblobify FilterRef + # Decode data record + if 0 < rc.value.len: + return rc.value.deblobify FilterRef - err(GetFilNotFound) + err(GetFilNotFound) proc getIdgFn(db: RdbBackendRef): GetIdgFn = result = @@ -164,21 +170,26 @@ proc getIdgFn(db: RdbBackendRef): GetIdgFn = rc.value.deblobify seq[VertexID] proc getFqsFn(db: RdbBackendRef): GetFqsFn = - result = - proc(): Result[seq[(QueueID,QueueID)],AristoError]= + if db.noFq: + result = + proc(): Result[seq[(QueueID,QueueID)],AristoError] = + err(FilQuSchedDisabled) + else: + result = + proc(): Result[seq[(QueueID,QueueID)],AristoError]= - # Fetch serialised data record - let rc = db.rdb.get AdmTabIdFqs.toOpenArray() - if rc.isErr: - debug logTxt "getFosFn: failed", error=rc.error[1] - return err(rc.error[0]) + # Fetch serialised data record + let rc = db.rdb.get AdmTabIdFqs.toOpenArray() + if rc.isErr: + debug logTxt "getFqsFn: failed", error=rc.error[1] + return err(rc.error[0]) - if rc.value.len == 0: - let w = EmptyQidPairSeq - return ok w + if rc.value.len == 0: + let w = EmptyQidPairSeq + return ok w - # Decode data record - rc.value.deblobify seq[(QueueID,QueueID)] + # Decode data record + rc.value.deblobify seq[(QueueID,QueueID)] # ------------- @@ -218,22 +229,32 @@ proc putKeyFn(db: RdbBackendRef): PutKeyFn = hdl.keyCache = (vid, EmptyBlob) proc putFilFn(db: RdbBackendRef): PutFilFn = - result = - proc(hdl: PutHdlRef; vrps: openArray[(QueueID,FilterRef)]) = - let hdl = hdl.getSession db - if hdl.error.isNil: - for (qid,filter) in vrps: - if filter.isValid: - let rc = filter.blobify() - if rc.isErr: - hdl.error = TypedPutHdlErrRef( - pfx: FilPfx, - qid: qid, - code: rc.error) - return - hdl.filCache = (qid, rc.value) - else: - hdl.filCache = (qid, EmptyBlob) + if db.noFq: + result = + proc(hdl: PutHdlRef; vf: openArray[(QueueID,FilterRef)]) = + let hdl = hdl.getSession db + if hdl.error.isNil: + hdl.error = TypedPutHdlErrRef( + pfx: FilPfx, + qid: (if 0 < vf.len: vf[0][0] else: QueueID(0)), + code: FilQuSchedDisabled) + else: + result = + proc(hdl: PutHdlRef; vrps: openArray[(QueueID,FilterRef)]) = + let hdl = hdl.getSession db + if hdl.error.isNil: + for (qid,filter) in vrps: + if filter.isValid: + let rc = filter.blobify() + if rc.isErr: + hdl.error = TypedPutHdlErrRef( + pfx: FilPfx, + qid: qid, + code: rc.error) + return + hdl.filCache = (qid, rc.value) + else: + hdl.filCache = (qid, EmptyBlob) proc putIdgFn(db: RdbBackendRef): PutIdgFn = result = @@ -246,14 +267,23 @@ proc putIdgFn(db: RdbBackendRef): PutIdgFn = hdl.admCache = (AdmTabIdIdg, EmptyBlob) proc putFqsFn(db: RdbBackendRef): PutFqsFn = - result = - proc(hdl: PutHdlRef; vs: openArray[(QueueID,QueueID)]) = - let hdl = hdl.getSession db - if hdl.error.isNil: - if 0 < vs.len: - hdl.admCache = (AdmTabIdFqs, vs.blobify) - else: - hdl.admCache = (AdmTabIdFqs, EmptyBlob) + if db.noFq: + result = + proc(hdl: PutHdlRef; fs: openArray[(QueueID,QueueID)]) = + let hdl = hdl.getSession db + if hdl.error.isNil: + hdl.error = TypedPutHdlErrRef( + pfx: AdmPfx, + code: FilQuSchedDisabled) + else: + result = + proc(hdl: PutHdlRef; vs: openArray[(QueueID,QueueID)]) = + let hdl = hdl.getSession db + if hdl.error.isNil: + if 0 < vs.len: + hdl.admCache = (AdmTabIdFqs, vs.blobify) + else: + hdl.admCache = (AdmTabIdFqs, EmptyBlob) proc putEndFn(db: RdbBackendRef): PutEndFn = @@ -287,15 +317,22 @@ proc closeFn(db: RdbBackendRef): CloseFn = # Public functions # ------------------------------------------------------------------------------ -proc rocksDbBackend*(path: string): Result[BackendRef,AristoError] = - let - db = RdbBackendRef(kind: BackendRocksDB) - rc = db.rdb.init(path, maxOpenFiles) - if rc.isErr: - when extraTraceMessages: - trace logTxt "constructor failed", - error=rc.error[0], info=rc.error[1] - return err(rc.error[0]) +proc rocksDbBackend*( + path: string; + qidLayout: QidLayoutRef; + ): Result[BackendRef,AristoError] = + let db = RdbBackendRef( + beKind: BackendRocksDB, + noFq: qidLayout.isNil) + + # Initialise RocksDB + block: + let rc = db.rdb.init(path, maxOpenFiles) + if rc.isErr: + when extraTraceMessages: + trace logTxt "constructor failed", + error=rc.error[0], info=rc.error[1] + return err(rc.error[0]) db.getVtxFn = getVtxFn db db.getKeyFn = getKeyFn db @@ -313,6 +350,16 @@ proc rocksDbBackend*(path: string): Result[BackendRef,AristoError] = db.closeFn = closeFn db + # Set up filter management table + if not db.noFq: + db.filters = QidSchedRef(ctx: qidLayout) + db.filters.state = block: + let rc = db.getFqsFn() + if rc.isErr: + db.closeFn(flush = false) + return err(rc.error) + rc.value + ok db # ------------------------------------------------------------------------------ @@ -326,8 +373,20 @@ iterator walk*( ## ## Non-decodable entries are stepped over while the counter `n` of the ## yield record is still incremented. - for w in be.rdb.walk: - yield w + if be.noFq: + for w in be.rdb.walk: + case w.pfx: + of AdmPfx: + if w.xid == AdmTabIdFqs.uint64: + continue + of FilPfx: + break # last sub-table + else: + discard + yield w + else: + for w in be.rdb.walk: + yield w iterator walkVtx*( be: RdbBackendRef; @@ -351,10 +410,11 @@ iterator walkFil*( be: RdbBackendRef; ): tuple[n: int, qid: QueueID, filter: FilterRef] = ## Variant of `walk()` iteration over the filter sub-table. - for (n, xid, data) in be.rdb.walk FilPfx: - let rc = data.deblobify FilterRef - if rc.isOk: - yield (n, QueueID(xid), rc.value) + if not be.noFq: + for (n, xid, data) in be.rdb.walk FilPfx: + let rc = data.deblobify FilterRef + if rc.isOk: + yield (n, QueueID(xid), rc.value) # ------------------------------------------------------------------------------ # End diff --git a/nimbus/db/aristo/aristo_transcode.nim b/nimbus/db/aristo/aristo_transcode.nim index 7b54662b8..308befc07 100644 --- a/nimbus/db/aristo/aristo_transcode.nim +++ b/nimbus/db/aristo/aristo_transcode.nim @@ -220,6 +220,8 @@ proc blobify*(vtx: VertexRef; data: var Blob): AristoError = ## :: ## 8 * n * ((access shr (n * 4)) and 15) ## + if not vtx.isValid: + return BlobifyNilVertex case vtx.vType: of Branch: var @@ -280,6 +282,7 @@ proc blobify*(vGen: openArray[VertexID]): Blob = proc blobify*(filter: FilterRef; data: var Blob): AristoError = ## This function serialises an Aristo DB filter object ## :: + ## uint64 -- filter ID ## Uint256 -- source key ## Uint256 -- target key ## uint32 -- number of vertex IDs (vertex ID generator state) @@ -295,8 +298,10 @@ proc blobify*(filter: FilterRef; data: var Blob): AristoError = ## ... -- more triplets ## 0x7d -- marker(8) ## - ## + if not filter.isValid: + return BlobifyNilFilter data.setLen(0) + data &= filter.fid.uint64.toBytesBE.toSeq data &= filter.src.ByteArray32.toSeq data &= filter.trg.ByteArray32.toSeq @@ -366,7 +371,7 @@ proc blobify*(filter: FilterRef; data: var Blob): AristoError = vid.uint64.toBytesBE.toSeq & keyBlob - data[68 ..< 72] = n.uint32.toBytesBE.toSeq + data[76 ..< 80] = n.uint32.toBytesBE.toSeq data.add 0x7Du8 proc blobify*(filter: FilterRef): Result[Blob, AristoError] = @@ -564,27 +569,27 @@ proc deblobify*(data: Blob; T: type seq[VertexID]): Result[T,AristoError] = return err(info) ok vGen - proc deblobify*(data: Blob; filter: var FilterRef): AristoError = ## De-serialise an Aristo DB filter object - if data.len < 72: # minumum length 72 for an empty filter + if data.len < 80: # minumum length 80 for an empty filter return DeblobFilterTooShort if data[^1] != 0x7d: return DeblobWrongType let f = FilterRef() - (addr f.src.ByteArray32[0]).copyMem(unsafeAddr data[0], 32) - (addr f.trg.ByteArray32[0]).copyMem(unsafeAddr data[32], 32) + f.fid = (uint64.fromBytesBE data[0 ..< 8]).FilterID + (addr f.src.ByteArray32[0]).copyMem(unsafeAddr data[8], 32) + (addr f.trg.ByteArray32[0]).copyMem(unsafeAddr data[40], 32) let - nVids = uint32.fromBytesBE data[64 ..< 68] - nTriplets = uint32.fromBytesBE data[68 ..< 72] - nTrplStart = (72 + nVids * 8).int + nVids = uint32.fromBytesBE data[72 ..< 76] + nTriplets = uint32.fromBytesBE data[76 ..< 80] + nTrplStart = (80 + nVids * 8).int if data.len < nTrplStart: return DeblobFilterGenTooShort for n in 0 ..< nVids: - let w = 72 + n * 8 + let w = 80 + n * 8 f.vGen.add (uint64.fromBytesBE data[w ..< w + 8]).VertexID var offs = nTrplStart diff --git a/nimbus/db/aristo/aristo_walk/memory_only.nim b/nimbus/db/aristo/aristo_walk/memory_only.nim index 1294bf33a..4b5c5f891 100644 --- a/nimbus/db/aristo/aristo_walk/memory_only.nim +++ b/nimbus/db/aristo/aristo_walk/memory_only.nim @@ -42,14 +42,20 @@ iterator walkKeyBe*[T: MemBackendRef|VoidBackendRef]( for (n,vid,key) in db.to(T).walkKeyBeImpl db: yield (n,vid,key) -iterator walkFilBe*[T: MemBackendRef|VoidBackendRef]( - _: type T; - db: AristoDbRef; +iterator walkFilBe*( + be: MemBackendRef|VoidBackendRef; ): tuple[n: int, qid: QueueID, filter: FilterRef] = - ## Similar to `walkVtxBe()` but for filters. - for (n,qid,filter) in db.to(T).walkFilBeImpl db: + ## Iterate over backend filters. + for (n,qid,filter) in be.walkFilBeImpl: yield (n,qid,filter) +iterator walkFifoBe*( + be: MemBackendRef|VoidBackendRef; + ): (QueueID,FilterRef) = + ## Walk filter slots in fifo order. + for (qid,filter) in be.walkFifoBeImpl: + yield (qid,filter) + # ------------------------------------------------------------------------------ # End # ------------------------------------------------------------------------------ diff --git a/nimbus/db/aristo/aristo_walk/persistent.nim b/nimbus/db/aristo/aristo_walk/persistent.nim index 09ae3c441..1ff110c58 100644 --- a/nimbus/db/aristo/aristo_walk/persistent.nim +++ b/nimbus/db/aristo/aristo_walk/persistent.nim @@ -48,13 +48,19 @@ iterator walkKeyBe*( yield (n,vid,key) iterator walkFilBe*( - T: type RdbBackendRef; - db: AristoDbRef; + be: RdbBackendRef; ): tuple[n: int, qid: QueueID, filter: FilterRef] = - ## Similar to `walkVtxBe()` but for filters. - for (n,qid,filter) in db.to(T).walkFilBeImpl db: + ## Iterate over backend filters. + for (n,qid,filter) in be.walkFilBeImpl: yield (n,qid,filter) +iterator walkFifoBe*( + be: RdbBackendRef; + ): (QueueID,FilterRef) = + ## Walk filter slots in fifo order. + for (qid,filter) in be.walkFifoBeImpl: + yield (qid,filter) + # ------------------------------------------------------------------------------ # End # ------------------------------------------------------------------------------ diff --git a/nimbus/db/aristo/aristo_walk/walk_private.nim b/nimbus/db/aristo/aristo_walk/walk_private.nim index 91591693a..700cfdc47 100644 --- a/nimbus/db/aristo/aristo_walk/walk_private.nim +++ b/nimbus/db/aristo/aristo_walk/walk_private.nim @@ -11,6 +11,7 @@ import std/[algorithm, sequtils, tables], + results, ".."/[aristo_desc, aristo_init] # ------------------------------------------------------------------------------ @@ -89,7 +90,6 @@ iterator walkKeyBeImpl*[T]( iterator walkFilBeImpl*[T]( be: T; # Backend descriptor - db: AristoDbRef; # Database with optional backend filter ): tuple[n: int, qid: QueueID, filter: FilterRef] = ## Generic filter iterator when be isnot VoidBackendRef: @@ -98,6 +98,33 @@ iterator walkFilBeImpl*[T]( for (n,qid,filter) in be.walkFil: yield (n,qid,filter) + +iterator walkFifoBeImpl*[T]( + be: T; # Backend descriptor + ): (QueueID,FilterRef) = + ## Generic filter iterator walking slots in fifo order. This iterator does + ## not depend on the backend type but may be type restricted nevertheless. + when be isnot VoidBackendRef: + proc kvp(chn: int, qid: QueueID): (QueueID,FilterRef) = + let cid = QueueID((chn.uint64 shl 62) or qid.uint64) + (cid, be.getFilFn(cid).get(otherwise = FilterRef(nil))) + + if not be.isNil: + let scd = be.filters + if not scd.isNil: + for i in 0 ..< scd.state.len: + let (left, right) = scd.state[i] + if left == 0: + discard + elif left <= right: + for j in right.countDown left: + yield kvp(i, j) + else: + for j in right.countDown QueueID(1): + yield kvp(i, j) + for j in scd.ctx.q[i].wrap.countDown left: + yield kvp(i, j) + # ------------------------------------------------------------------------------ # End # ------------------------------------------------------------------------------ diff --git a/tests/test_aristo/test_backend.nim b/tests/test_aristo/test_backend.nim index 08738ef31..395e1771c 100644 --- a/tests/test_aristo/test_backend.nim +++ b/tests/test_aristo/test_backend.nim @@ -74,30 +74,21 @@ proc mergeData( ## Simplified loop body of `test_mergeProofAndKvpList()` if 0 < proof.len: let rc = db.merge(rootKey, rootVid) - if rc.isErr: - check rc.error == AristoError(0) - return + xCheckRc rc.error == 0 let proved = db.merge(proof, rc.value) - if proved.error notin {AristoError(0),MergeHashKeyCachedAlready}: - check proved.error in {AristoError(0),MergeHashKeyCachedAlready} - return + xCheck proved.error in {AristoError(0),MergeHashKeyCachedAlready} let merged = db.merge leafs - if merged.error notin {AristoError(0), MergeLeafPathCachedAlready}: - check merged.error in {AristoError(0), MergeLeafPathCachedAlready} - return + xCheck merged.error in {AristoError(0), MergeLeafPathCachedAlready} block: let rc = db.hashify # (noisy, true) - if rc.isErr: - when true: # and false: - noisy.say "***", "dataMerge(9)", - " nLeafs=", leafs.len, - "\n cache dump\n ", db.pp, - "\n backend dump\n ", db.to(TypedBackendRef).pp(db) - check rc.error == (VertexID(0),AristoError(0)) - return + xCheckRc rc.error == (0,0): + noisy.say "***", "dataMerge(9)", + " nLeafs=", leafs.len, + "\n cache dump\n ", db.pp, + "\n backend dump\n ", db.backend.pp(db) true @@ -116,24 +107,18 @@ proc verify( let nVtx = ly.sTab.getOrVoid vid mVtx = beSTab.getOrVoid vid - if not nVtx.isValid and not mVtx.isValid: - check nVtx != VertexRef(nil) - check mVtx != VertexRef(nil) - return - if nVtx != mVtx: + + xCheck (nVtx != VertexRef(nil)) + xCheck (mVtx != VertexRef(nil)) + xCheck nVtx == mVtx: noisy.say "***", "verify", " beType=", be.typeof, " vid=", vid.pp, " nVtx=", nVtx.pp, " mVtx=", mVtx.pp - check nVtx == mVtx - return - if beSTab.len != ly.sTab.len or - beKMap.len != ly.kMap.len: - check beSTab.len == ly.sTab.len - check beKMap.len == ly.kMap.len - return + xCheck beSTab.len == ly.sTab.len + xCheck beKMap.len == ly.kMap.len true @@ -154,43 +139,35 @@ proc collectFilter( be.putFilFn(tx, @[(fid,filter)]) let endOk = be.putEndFn tx - if endOk != AristoError(0): - check endOk == AristoError(0) - return + xCheck endOk == AristoError(0) tab[fid] = filter.hash true -proc verifyFiltersImpl[T: MemBackendRef|RdbBackendRef]( - _: type T; - db: AristoDbRef; +proc verifyFiltersImpl[T]( + be: T; tab: Table[QueueID,Hash]; noisy: bool; ): bool = ## Compare stored filters against registered ones var n = 0 - for (_,fid,filter) in T.walkFilBe db: + for (_,fid,filter) in be.walkFilBe: let filterHash = filter.hash registered = tab.getOrDefault(fid, BlindHash) - if registered == BlindHash: - check (fid,registered) != (0,BlindHash) - return - if filterHash != registered: + + xCheck (registered != BlindHash) + xCheck registered == filterHash: noisy.say "***", "verifyFiltersImpl", " n=", n+1, " fid=", fid.pp, " filterHash=", filterHash.int.toHex, " registered=", registered.int.toHex - check (fid,filterHash) == (fid,registered) - return + n.inc - if n != tab.len: - check n == tab.len - return - + xCheck n == tab.len true proc verifyFilters( @@ -199,23 +176,20 @@ proc verifyFilters( noisy: bool; ): bool = ## Wrapper - let - be = db.to(TypedBackendRef) - kind = (if be.isNil: BackendVoid else: be.kind) - case kind: + case db.backend.kind: of BackendMemory: - return MemBackendRef.verifyFiltersImpl(db, tab, noisy) + return db.to(MemBackendRef).verifyFiltersImpl(tab, noisy) of BackendRocksDB: - return RdbBackendRef.verifyFiltersImpl(db, tab, noisy) + return db.to(RdbBackendRef).verifyFiltersImpl(tab, noisy) else: discard - check kind == BackendMemory or kind == BackendRocksDB + check db.backend.kind == BackendMemory or db.backend.kind == BackendRocksDB # ------------------------------------------------------------------------------ # Public test function # ------------------------------------------------------------------------------ -proc test_backendConsistency*( +proc testBackendConsistency*( noisy: bool; list: openArray[ProofTrieData]; # Test data rdbPath: string; # Rocks DB storage directory @@ -240,24 +214,25 @@ proc test_backendConsistency*( count = 0 ndb = newAristoDbRef BackendVoid mdb = newAristoDbRef BackendMemory + if doRdbOk: if not rdb.backend.isNil: # ignore bootstrap let verifyFiltersOk = rdb.verifyFilters(filTab, noisy) - if not verifyFiltersOk: - check verifyFiltersOk - return + xCheck verifyFiltersOk filTab.clear rdb.finish(flush=true) - let rc = newAristoDbRef(BackendRocksDB,rdbPath) - if rc.isErr: - check rc.error == 0 - return + let rc = newAristoDbRef(BackendRocksDB, rdbPath) + xCheckRc rc.error == 0 rdb = rc.value + + # Disable automated filter management, still allow filter table access + # for low level read/write testing. + rdb.backend.filters = QidSchedRef(nil) count.inc - check ndb.backend.isNil - check not mdb.backend.isNil - check doRdbOk or not rdb.backend.isNil + xCheck ndb.backend.isNil + xCheck not mdb.backend.isNil + xCheck doRdbOk or not rdb.backend.isNil when true and false: noisy.say "***", "beCon(1) <", n, "/", list.len-1, ">", " groups=", count @@ -270,33 +245,27 @@ proc test_backendConsistency*( block: let ndbOk = ndb.mergeData( rootKey, rootVid, w.proof, leafs, noisy=false) - if not ndbOk: - check ndbOk - return + xCheck ndbOk block: let mdbOk = mdb.mergeData( rootKey, rootVid, w.proof, leafs, noisy=false) - if not mdbOk: - check mdbOk - return + xCheck mdbOk if doRdbOk: # optional let rdbOk = rdb.mergeData( rootKey, rootVid, w.proof, leafs, noisy=false) - if not rdbOk: - check rdbOk - return + xCheck rdbOk when true and false: noisy.say "***", "beCon(2) <", n, "/", list.len-1, ">", " groups=", count, "\n cache dump\n ", ndb.pp, - "\n backend dump\n ", ndb.to(TypedBackendRef).pp(ndb), + "\n backend dump\n ", ndb.backend.pp(ndb), "\n -------------", "\n mdb cache\n ", mdb.pp, - "\n mdb backend\n ", mdb.to(TypedBackendRef).pp(ndb), + "\n mdb backend\n ", mdb.backend.pp(ndb), "\n -------------", "\n rdb cache\n ", rdb.pp, - "\n rdb backend\n ", rdb.to(TypedBackendRef).pp(ndb), + "\n rdb backend\n ", rdb.backend.pp(ndb), "\n -------------" when true and false: @@ -315,45 +284,39 @@ proc test_backendConsistency*( # Provide filter, store filter on permanent BE, and register filter digest block: let rc = mdb.stow(persistent=false, dontHashify=true, chunkedMpt=true) - if rc.isErr: - check rc.error == (0,0) - return + xCheckRc rc.error == (0,0) let collectFilterOk = rdb.collectFilter(mdb.roFilter, filTab, noisy) - if not collectFilterOk: - check collectFilterOk - return + xCheck collectFilterOk # Store onto backend database block: #noisy.say "***", "db-dump\n ", mdb.pp let rc = mdb.stow(persistent=true, dontHashify=true, chunkedMpt=true) - if rc.isErr: - check rc.error == (0,0) - return + xCheckRc rc.error == (0,0) if doRdbOk: let rc = rdb.stow(persistent=true, dontHashify=true, chunkedMpt=true) - if rc.isErr: - check rc.error == (0,0) - return + xCheckRc rc.error == (0,0) - if not ndb.top.verify(mdb.to(MemBackendRef), noisy): - when true and false: - noisy.say "***", "beCon(4) <", n, "/", list.len-1, ">", - " groups=", count, - "\n ndb cache\n ", ndb.pp, - "\n ndb backend=", ndb.backend.isNil.not, - #"\n -------------", - #"\n mdb pre-save cache\n ", mdbPreSaveCache, - #"\n mdb pre-save backend\n ", mdbPreSaveBackend, - "\n -------------", - "\n mdb cache\n ", mdb.pp, - "\n mdb backend\n ", mdb.to(TypedBackendRef).pp(ndb), - "\n -------------" - return + block: + let mdbVerifyOk = ndb.top.verify(mdb.to(MemBackendRef), noisy) + xCheck mdbVerifyOk: + when true and false: + noisy.say "***", "beCon(4) <", n, "/", list.len-1, ">", + " groups=", count, + "\n ndb cache\n ", ndb.pp, + "\n ndb backend=", ndb.backend.isNil.not, + #"\n -------------", + #"\n mdb pre-save cache\n ", mdbPreSaveCache, + #"\n mdb pre-save backend\n ", mdbPreSaveBackend, + "\n -------------", + "\n mdb cache\n ", mdb.pp, + "\n mdb backend\n ", mdb.backend.pp(ndb), + "\n -------------" if doRdbOk: - if not ndb.top.verify(rdb.to(RdbBackendRef), noisy): + let rdbVerifyOk = ndb.top.verify(rdb.to(RdbBackendRef), noisy) + xCheck rdbVerifyOk: when true and false: noisy.say "***", "beCon(4) <", n, "/", list.len-1, ">", " groups=", count, @@ -364,12 +327,11 @@ proc test_backendConsistency*( "\n rdb pre-save backend\n ", rdbPreSaveBackend, "\n -------------", "\n rdb cache\n ", rdb.pp, - "\n rdb backend\n ", rdb.to(TypedBackendRef).pp(ndb), + "\n rdb backend\n ", rdb.backend.pp(ndb), #"\n -------------", #"\n mdb cache\n ", mdb.pp, - #"\n mdb backend\n ", mdb.to(TypedBackendRef).pp(ndb), + #"\n mdb backend\n ", mdb.backend.pp(ndb), "\n -------------" - return when true and false: noisy.say "***", "beCon(9) <", n, "/", list.len-1, ">", " groups=", count @@ -377,9 +339,7 @@ proc test_backendConsistency*( # Finally ... block: let verifyFiltersOk = rdb.verifyFilters(filTab, noisy) - if not verifyFiltersOk: - check verifyFiltersOk - return + xCheck verifyFiltersOk true diff --git a/tests/test_aristo/test_filter.nim b/tests/test_aristo/test_filter.nim index bccec8dab..32b39e248 100644 --- a/tests/test_aristo/test_filter.nim +++ b/tests/test_aristo/test_filter.nim @@ -19,7 +19,8 @@ import ../../nimbus/db/aristo/[ aristo_check, aristo_debug, aristo_desc, aristo_filter, aristo_get, aristo_merge, aristo_transcode], - ../../nimbus/db/[aristo, aristo/aristo_init/persistent], + ../../nimbus/db/aristo, + ../../nimbus/db/aristo/aristo_init/persistent, ./test_helpers type @@ -35,7 +36,7 @@ type proc dump(pfx: string; dx: varargs[AristoDbRef]): string = proc dump(db: AristoDbRef): string = - db.pp & "\n " & db.to(TypedBackendRef).pp(db) & "\n" + db.pp & "\n " & db.backend.pp(db) & "\n" if 0 < dx.len: result = "\n " var @@ -95,6 +96,7 @@ iterator quadripartite(td: openArray[ProofTrieData]): LeafQuartet = proc dbTriplet(w: LeafQuartet; rdbPath: string): Result[DbTriplet,AristoError] = let db = block: let rc = newAristoDbRef(BackendRocksDB,rdbPath) + xCheckRc rc.error == 0 if rc.isErr: check rc.error == 0 return @@ -271,10 +273,9 @@ proc checkBeOk( let cache = if forceCache: true else: not dx[n].top.dirty rc = dx[n].checkBE(relax=relax, cache=cache) - if rc.isErr: + xCheckRc rc.error == (0,0): noisy.say "***", "db check failed", " n=", n, " cache=", cache - check (n, rc.error[0], rc.error[1]) == (n, 0, 0) - return + true proc checkFilterTrancoderOk( @@ -286,26 +287,23 @@ proc checkFilterTrancoderOk( if dx[n].roFilter.isValid: let data = block: let rc = dx[n].roFilter.blobify() - if rc.isErr: + xCheckRc rc.error == 0: noisy.say "***", "db serialisation failed", " n=", n, " error=", rc.error - check rc.error == 0 - return rc.value + let dcdRoundTrip = block: let rc = data.deblobify FilterRef - if rc.isErr: + xCheckRc rc.error == 0: noisy.say "***", "db de-serialisation failed", " n=", n, " error=", rc.error - check rc.error == 0 - return rc.value - if not dx[n].roFilter.isEq(dcdRoundTrip, dx[n], noisy): - #noisy.say "***", "checkFilterTrancoderOk failed", - # "\n roFilter=", dx[n].roFilter.pp(dx[n]), - # "\n dcdRoundTrip=", dcdRoundTrip.pp(dx[n]) - check (n,dx[n].roFilter) == (n,dcdRoundTrip) - return + + let roFilterExRoundTrip = dx[n].roFilter.isEq(dcdRoundTrip, dx[n], noisy) + xCheck roFilterExRoundTrip: + noisy.say "***", "checkFilterTrancoderOk failed", + "\n roFilter=", dx[n].roFilter.pp(dx[n]), + "\n dcdRoundTrip=", dcdRoundTrip.pp(dx[n]) true @@ -335,8 +333,7 @@ proc testDistributedAccess*( let dx = block: let rc = dbTriplet(w, rdbPath) - if rc.isErr: - return + xCheckRc rc.error == 0 rc.value (db1, db2, db3) = (dx[0], dx[1], dx[2]) defer: @@ -348,57 +345,34 @@ proc testDistributedAccess*( # Clause (9) from `aristo/README.md` example block: let rc = db1.stow(persistent=true) - if rc.isErr: - # noisy.say "*** testDistributedAccess (2) n=", n, dx.dump - check rc.error == (0,0) - return - if db1.roFilter.isValid: - check db1.roFilter == FilterRef(nil) - return - if db2.roFilter != db3.roFilter: - check db2.roFilter == db3.roFilter - return + xCheckRc rc.error == (0,0) + xCheck db1.roFilter == FilterRef(nil) + xCheck db2.roFilter == db3.roFilter block: let rc = db2.stow(persistent=false) - if rc.isErr: + xCheckRc rc.error == (0,0): noisy.say "*** testDistributedAccess (3)", "n=", n, "db2".dump db2 - check rc.error == (0,0) - return - if db1.roFilter.isValid: - check db1.roFilter == FilterRef(nil) - return - if db2.roFilter == db3.roFilter: - check db2.roFilter != db3.roFilter - return + xCheck db1.roFilter == FilterRef(nil) + xCheck db2.roFilter != db3.roFilter # Clause (11) from `aristo/README.md` example block: let rc = db2.ackqRwMode() - if rc.isErr: - check rc.error == 0 - return + xCheckRc rc.error == 0 block: let rc = db2.stow(persistent=true) - if rc.isErr: - check rc.error == (0,0) - return - if db2.roFilter.isValid: - check db2.roFilter == FilterRef(nil) - return + xCheckRc rc.error == (0,0) + xCheck db2.roFilter == FilterRef(nil) # Check/verify backends block: let ok = dx.checkBeOk(noisy=noisy) - if not ok: + xCheck ok: noisy.say "*** testDistributedAccess (4)", "n=", n, "db3".dump db3 - check ok - return block: let ok = dx.checkFilterTrancoderOk(noisy=noisy) - if not ok: - check ok - return + xCheck ok # Capture filters from clause (11) c11Filter1 = db1.roFilter @@ -414,8 +388,7 @@ proc testDistributedAccess*( let dy = block: let rc = dbTriplet(w, rdbPath) - if rc.isErr: - return + xCheckRc rc.error == 0 rc.value (db1, db2, db3) = (dy[0], dy[1], dy[2]) defer: @@ -424,59 +397,39 @@ proc testDistributedAccess*( # Build clause (12) from `aristo/README.md` example block: let rc = db2.ackqRwMode() - if rc.isErr: - check rc.error == 0 - return + xCheckRc rc.error == 0 block: let rc = db2.stow(persistent=true) - if rc.isErr: - check rc.error == (0,0) - return - if db2.roFilter.isValid: - check db1.roFilter == FilterRef(nil) - return - if db1.roFilter != db3.roFilter: - check db1.roFilter == db3.roFilter - return + xCheckRc rc.error == (0,0) + xCheck db2.roFilter == FilterRef(nil) + xCheck db1.roFilter == db3.roFilter # Clause (13) from `aristo/README.md` example block: let rc = db1.stow(persistent=false) - if rc.isErr: - check rc.error == (0,0) - return + xCheckRc rc.error == (0,0) # Clause (14) from `aristo/README.md` check - block: - let c11Fil1_eq_db1RoFilter = c11Filter1.isDbEq(db1.roFilter, db1, noisy) - if not c11Fil1_eq_db1RoFilter: - noisy.say "*** testDistributedAccess (7)", "n=", n, - "\n c11Filter1=", c11Filter3.pp(db1), - "db1".dump(db1) - check c11Fil1_eq_db1RoFilter - return + let c11Fil1_eq_db1RoFilter = c11Filter1.isDbEq(db1.roFilter, db1, noisy) + xCheck c11Fil1_eq_db1RoFilter: + noisy.say "*** testDistributedAccess (7)", "n=", n, + "\n c11Filter1=", c11Filter3.pp(db1), + "db1".dump(db1) # Clause (15) from `aristo/README.md` check - block: - let c11Fil3_eq_db3RoFilter = c11Filter3.isDbEq(db3.roFilter, db3, noisy) - if not c11Fil3_eq_db3RoFilter: - noisy.say "*** testDistributedAccess (8)", "n=", n, - "\n c11Filter3=", c11Filter3.pp(db3), - "db3".dump(db3) - check c11Fil3_eq_db3RoFilter - return + let c11Fil3_eq_db3RoFilter = c11Filter3.isDbEq(db3.roFilter, db3, noisy) + xCheck c11Fil3_eq_db3RoFilter: + noisy.say "*** testDistributedAccess (8)", "n=", n, + "\n c11Filter3=", c11Filter3.pp(db3), + "db3".dump(db3) # Check/verify backends block: let ok = dy.checkBeOk(noisy=noisy) - if not ok: - check ok - return + xCheck ok block: let ok = dy.checkFilterTrancoderOk(noisy=noisy) - if not ok: - check ok - return + xCheck ok when false: # or true: noisy.say "*** testDistributedAccess (9)", "n=", n, dy.dump diff --git a/tests/test_aristo/test_helpers.nim b/tests/test_aristo/test_helpers.nim index 5c38fe380..524a8c089 100644 --- a/tests/test_aristo/test_helpers.nim +++ b/tests/test_aristo/test_helpers.nim @@ -14,7 +14,8 @@ import eth/common, rocksdb, ../../nimbus/db/aristo/[ - aristo_constants, aristo_debug, aristo_desc, aristo_merge], + aristo_constants, aristo_debug, aristo_desc, + aristo_filter/filter_scheduler, aristo_merge], ../../nimbus/db/kvstore_rocksdb, ../../nimbus/sync/protocol/snap/snap_types, ../test_sync_snap/test_types, @@ -30,6 +31,10 @@ type proof*: seq[SnapProof] kvpLst*: seq[LeafTiePayload] +const + QidSlotLyo* = [(4,0,10),(3,3,10),(3,4,10),(3,5,10)] + QidSample* = (3 * QidSlotLyo.stats.minCovered) div 2 + # ------------------------------------------------------------------------------ # Private helpers # ------------------------------------------------------------------------------ @@ -187,6 +192,39 @@ proc mapRootVid*( leafTie: LeafTie(root: toVid, path: it.leafTie.path), payload: it.payload)) +# ------------------------------------------------------------------------------ +# Public workflow helpers +# ------------------------------------------------------------------------------ + +template xCheck*(expr: untyped): untyped = + ## Note: this check will invoke `expr` twice + if not (expr): + check expr + return + +template xCheck*(expr: untyped; ifFalse: untyped): untyped = + ## Note: this check will invoke `expr` twice + if not (expr): + ifFalse + check expr + return + +template xCheckRc*(expr: untyped): untyped = + if rc.isErr: + xCheck(expr) + +template xCheckRc*(expr: untyped; ifFalse: untyped): untyped = + if rc.isErr: + xCheck(expr, ifFalse) + +template xCheckErr*(expr: untyped): untyped = + if rc.isOk: + xCheck(expr) + +template xCheckErr*(expr: untyped; ifFalse: untyped): untyped = + if rc.isOk: + xCheck(expr, ifFalse) + # ------------------------------------------------------------------------------ # Public iterators # ------------------------------------------------------------------------------ diff --git a/tests/test_aristo/test_misc.nim b/tests/test_aristo/test_misc.nim index a1f423dfc..fdcca0f7e 100644 --- a/tests/test_aristo/test_misc.nim +++ b/tests/test_aristo/test_misc.nim @@ -33,31 +33,10 @@ type QTabRef = TableRef[QueueID,QValRef] -const - QidSlotLyo = [(4,0,10),(3,3,10),(3,4,10),(3,5,10)] - QidSlotLy1 = [(4,0,0),(3,3,0),(3,4,0),(3,5,0)] - - QidSample* = (3 * QidSlotLyo.stats.minCovered) div 2 - # ------------------------------------------------------------------------------ # Private helpers # ------------------------------------------------------------------------------ -template xCheck(expr: untyped): untyped = - ## Note: this check will invoke `expr` twice - if not (expr): - check expr - return - -template xCheck(expr: untyped; ifFalse: untyped): untyped = - ## Note: this check will invoke `expr` twice - if not (expr): - ifFalse - check expr - return - -# --------------------- - proc posixPrngRand(state: var uint32): byte = ## POSIX.1-2001 example of a rand() implementation, see manual page rand(3). state = state * 1103515245 + 12345; @@ -100,32 +79,39 @@ proc `+`(a: VertexID, b: int): VertexID = # --------------------- +iterator walkFifo(qt: QTabRef;scd: QidSchedRef): (QueueID,QValRef) = + ## ... + proc kvp(chn: int, qid: QueueID): (QueueID,QValRef) = + let cid = QueueID((chn.uint64 shl 62) or qid.uint64) + (cid, qt.getOrDefault(cid, QValRef(nil))) + + if not scd.isNil: + for i in 0 ..< scd.state.len: + let (left, right) = scd.state[i] + if left == 0: + discard + elif left <= right: + for j in right.countDown left: + yield kvp(i, j) + else: + for j in right.countDown QueueID(1): + yield kvp(i, j) + for j in scd.ctx.q[i].wrap.countDown left: + yield kvp(i, j) + +proc fifos(qt: QTabRef; scd: QidSchedRef): seq[seq[(QueueID,QValRef)]] = + ## .. + var lastChn = -1 + for (qid,val) in qt.walkFifo scd: + let chn = (qid.uint64 shr 62).int + while lastChn < chn: + lastChn.inc + result.add newSeq[(QueueID,QValRef)](0) + result[^1].add (qid,val) + func sortedPairs(qt: QTabRef): seq[(QueueID,QValRef)] = qt.keys.toSeq.mapIt(it.uint64).sorted.mapIt(it.QueueID).mapIt((it,qt[it])) -func fifos(qt: QTabRef; scd: QidSchedRef): seq[seq[(QueueID,QValRef)]] = - proc kvp(chn: int, qid: QueueID): (QueueID,QValRef) = - let - cid = QueueID((chn.uint64 shl 62) or qid.uint64) - val = qt.getOrDefault(cid, QValRef(nil)) - (cid, val) - - for i in 0 ..< scd.state.len: - let - left = scd.state[i][0] - right = scd.state[i][1] - result.add newSeq[(QueueID,QValRef)](0) - if left == 0: - discard - elif left <= right: - for j in right.countDown left: - result[i].add kvp(i, j) - else: - for j in right.countDown QueueID(1): - result[i].add kvp(i, j) - for j in scd.ctx.q[i].wrap.countDown left: - result[i].add kvp(i, j) - func flatten(a: seq[seq[(QueueID,QValRef)]]): seq[(QueueID,QValRef)] = for w in a: result &= w @@ -296,10 +282,8 @@ proc testVidRecycleLists*(noisy = true; seed = 42): bool = let db1 = newAristoDbRef BackendVoid rc = dbBlob.deblobify seq[VertexID] - if rc.isErr: - xCheck rc.error == AristoError(0) - else: - db1.top.vGen = rc.value + xCheckRc rc.error == 0 + db1.top.vGen = rc.value xCheck db.top.vGen == db1.top.vGen @@ -407,11 +391,15 @@ proc testQidScheduler*( let fifoID = list.fifos(scd).flatten.mapIt(it[0]) for j in 0 ..< list.len: + # Check fifo order xCheck fifoID[j] == scd[j]: noisy.say "***", "n=", n, " exec=", w.exec.pp, " fifoID[", j, "]=", fifoID[j].pp, " scd[", j, "]=", scd[j].pp, "\n fifo=", list.pp scd + # Check random access and reverse + let qid = scd[j] + xCheck j == scd[qid] if debug: show(exec=w.exec) @@ -459,7 +447,7 @@ proc testQidScheduler*( let validateOk = list.validate(scd, serial=n, relax=false) xCheck validateOk - if debug: # or true: + if debug: show() true diff --git a/tests/test_aristo/test_tx.nim b/tests/test_aristo/test_tx.nim index b03a02028..67fe38dbd 100644 --- a/tests/test_aristo/test_tx.nim +++ b/tests/test_aristo/test_tx.nim @@ -32,9 +32,13 @@ const MaxFilterBulk = 150_000 ## Policy settig for `pack()` - WalkStopRc = + WalkStopErr = Result[LeafTie,(VertexID,AristoError)].err((VertexID(0),NearbyBeyondRange)) +let + TxQidLyo = QidSlotLyo.to(QidLayoutRef) + ## Cascaded filter slots layout for testing + # ------------------------------------------------------------------------------ # Private helpers # ------------------------------------------------------------------------------ @@ -89,13 +93,12 @@ proc randomisedLeafs( let r = n + td.rand(result.len - n) result[n].swap result[r] -proc innerCleanUp(db: AristoDbRef) = +proc innerCleanUp(db: AristoDbRef): bool {.discardable.} = ## Defer action - let rc = db.txTop() - if rc.isOk: - let rx = rc.value.collapse(commit=false) - if rx.isErr: - check rx.error == (0,0) + let rx = db.txTop() + if rx.isOk: + let rc = rx.value.collapse(commit=false) + xCheckRc rc.error == (0,0) db.finish(flush=true) proc saveToBackend( @@ -107,74 +110,52 @@ proc saveToBackend( ): bool = var db = tx.to(AristoDbRef) - # Verify context: nesting level must be 1 (i.e. two transactions) + # Verify context: nesting level must be 2 (i.e. two transactions) + xCheck tx.level == 2 + block: - block: - let level = tx.level - if level != 2: - check level == 2 - return - block: - let rc = db.checkCache(relax=true) - if rc.isErr: - check rc.error == (0,0) - return + let rc = db.checkCache(relax=true) + xCheckRc rc.error == (0,0) # Commit and hashify the current layer block: - block: - let rc = tx.commit() - if rc.isErr: - check rc.error == (0,0) - return - block: - # Make sure MPT hashes are OK - if db.top.dirty: - check db.top.dirty == false - return - block: - let rc = db.txTop() - if rc.isErr: - check rc.error == 0 - return - tx = rc.value - let level = tx.level - if level != 1: - check level == 1 - return - block: - let rc = db.checkBE(relax=true) - if rc.isErr: - check rc.error == (0,0) - return + let rc = tx.commit() + xCheckRc rc.error == (0,0) + + # Make sure MPT hashes are OK + xCheck db.top.dirty == false + + block: + let rc = db.txTop() + xCheckRc rc.error == 0 + tx = rc.value + + # Verify context: nesting level must be 1 (i.e. one transaction) + xCheck tx.level == 1 + + block: + let rc = db.checkBE(relax=true) + xCheckRc rc.error == (0,0) # Commit and save to backend block: - block: - let rc = tx.commit() - if rc.isErr: - check rc.error == (0,0) - return - block: - # Make sure MPT hashes are OK - if db.top.dirty: - check db.top.dirty == false - return - block: - let rc = db.txTop() - if rc.isOk: - check rc.value.level < 0 # force error - return - block: - let rc = db.stow(stageLimit=MaxFilterBulk, chunkedMpt=chunkedMpt) - if rc.isErr: - check rc.error == (0,0) - return - block: - let rc = db.checkBE(relax=relax) - if rc.isErr: - check rc.error == (0,0) - return + let rc = tx.commit() + xCheckRc rc.error == (0,0) + + # Make sure MPT hashes are OK + xCheck db.top.dirty == false + + block: + let rc = db.txTop() + xCheckErr rc.value.level < 0 # force error + + block: + let rc = db.stow(stageLimit=MaxFilterBulk, chunkedMpt=chunkedMpt) + xCheckRc rc.error == (0,0) + + block: + let rc = db.checkBE(relax=relax) + xCheckRc rc.error == (0,0) # Update layers to original level tx = db.txBegin().value.to(AristoDbRef).txBegin().value @@ -190,59 +171,40 @@ proc saveToBackendWithOops( ): bool = var db = tx.to(AristoDbRef) - # Verify context: nesting level must be 1 (i.e. two transactions) - block: - block: - let level = tx.level - if level != 2: - check level == 2 - return + # Verify context: nesting level must be 2 (i.e. two transactions) + xCheck tx.level == 2 # Commit and hashify the current layer block: - block: - let rc = tx.commit() - if rc.isErr: - check rc.error == (0,0) - return - block: - # Make sure MPT hashes are OK - if db.top.dirty: - check db.top.dirty == false - return - block: - let rc = db.txTop() - if rc.isErr: - check rc.error == 0 - return - tx = rc.value - let level = tx.level - if level != 1: - check level == 1 - return + let rc = tx.commit() + xCheckRc rc.error == (0,0) + + # Make sure MPT hashes are OK + xCheck db.top.dirty == false + + block: + let rc = db.txTop() + xCheckRc rc.error == 0 + tx = rc.value + + # Verify context: nesting level must be 1 (i.e. one transaction) + xCheck tx.level == 1 # Commit and save to backend block: - block: - let rc = tx.commit() - if rc.isErr: - check rc.error == (0,0) - return - block: - # Make sure MPT hashes are OK - if db.top.dirty: - check db.top.dirty == false - return - block: - let rc = db.txTop() - if rc.isOk: - check rc.value.level < 0 - return - block: - let rc = db.stow(stageLimit=MaxFilterBulk, chunkedMpt=chunkedMpt) - if rc.isErr: - check rc.error == (0,0) - return + let rc = tx.commit() + xCheckRc rc.error == (0,0) + + # Make sure MPT hashes are OK + xCheck db.top.dirty == false + + block: + let rc = db.txTop() + xCheckErr rc.value.level < 0 # force error + + block: + let rc = db.stow(stageLimit=MaxFilterBulk, chunkedMpt=chunkedMpt) + xCheckRc rc.error == (0,0) # Update layers to original level tx = db.txBegin().value.to(AristoDbRef).txBegin().value @@ -264,10 +226,8 @@ proc fwdWalkVerify( last = LeafTie() n = 0 for (key,_) in db.right low(LeafTie,root): - if key notin leftOver: + xCheck key in leftOver: noisy.say "*** fwdWalkVerify", " id=", n + (nLeafs + 1) * debugID - check key in leftOver - return leftOver.excl key last = key n.inc @@ -278,13 +238,12 @@ proc fwdWalkVerify( elif last != high(LeafTie,root): last = last + 1 let rc = last.right db - if rc.isOk or rc.error[1] != NearbyBeyondRange: - check rc == WalkStopRc - return + if rc.isOk: + xCheck rc == WalkStopErr + else: + xCheck rc.error[1] == NearbyBeyondRange - if n != nLeafs: - check n == nLeafs - return + xCheck n == nLeafs true @@ -302,10 +261,8 @@ proc revWalkVerify( last = LeafTie() n = 0 for (key,_) in db.left high(LeafTie,root): - if key notin leftOver: + xCheck key in leftOver: noisy.say "*** revWalkVerify", " id=", n + (nLeafs + 1) * debugID - check key in leftOver - return leftOver.excl key last = key n.inc @@ -316,13 +273,12 @@ proc revWalkVerify( elif last != low(LeafTie,root): last = last - 1 let rc = last.left db - if rc.isOk or rc.error[1] != NearbyBeyondRange: - check rc == WalkStopRc - return + if rc.isOk: + xCheck rc == WalkStopErr + else: + xCheck rc.error[1] == NearbyBeyondRange - if n != nLeafs: - check n == nLeafs - return + xCheck n == nLeafs true @@ -345,17 +301,15 @@ proc testTxMergeAndDelete*( for n,w in list: # Start with brand new persistent database. db = block: - let rc = newAristoDbRef(BackendRocksDB,rdbPath) - if rc.isErr: - check rc.error == 0 - return + let rc = newAristoDbRef(BackendRocksDB, rdbPath, qidLayout=TxQidLyo) + xCheckRc rc.error == 0 rc.value # Start transaction (double frame for testing) - check db.txTop.isErr + xCheck db.txTop.isErr var tx = db.txBegin().value.to(AristoDbRef).txBegin().value - check tx.isTop() - check tx.level == 2 + xCheck tx.isTop() + xCheck tx.level == 2 # Reset database so that the next round has a clean setup defer: db.innerCleanUp @@ -364,18 +318,14 @@ proc testTxMergeAndDelete*( let kvpLeafs = w.kvpLst.mapRootVid VertexID(1) for leaf in kvpLeafs: let rc = db.merge leaf - if rc.isErr: - check rc.error == 0 - return + xCheckRc rc.error == 0 # List of all leaf entries that should be on the database var leafsLeft = kvpLeafs.mapIt(it.leafTie).toHashSet # Provide a (reproducible) peudo-random copy of the leafs list let leafVidPairs = db.randomisedLeafs prng - if leafVidPairs.len != leafsLeft.len: - check leafVidPairs.len == leafsLeft.len - return + xCheck leafVidPairs.len == leafsLeft.len # Trigger subsequent saving tasks in loop below let (saveMod, saveRest, relax) = block: @@ -398,17 +348,13 @@ proc testTxMergeAndDelete*( # Delete leaf let rc = db.delete leaf - if rc.isErr: - check rc.error == (0,0) - return + xCheckRc rc.error == (0,0) # Update list of remaininf leafs leafsLeft.excl leaf let deletedVtx = tx.db.getVtx lid - if deletedVtx.isValid: - check deletedVtx.isValid == false - return + xCheck deletedVtx.isValid == false # Walking the database is too slow for large tables. So the hope is that # potential errors will not go away and rather pop up later, as well. @@ -453,15 +399,14 @@ proc testTxMergeProofAndKvpList*( if resetDb or w.root != rootKey or w.proof.len == 0: db.innerCleanUp db = block: - let rc = newAristoDbRef(BackendRocksDB,rdbPath) - if rc.isErr: - check rc.error == 0 - return + # New DB with disabled filter slots management + let rc = newAristoDbRef(BackendRocksDB,rdbPath,QidLayoutRef(nil)) + xCheckRc rc.error == 0 rc.value # Start transaction (double frame for testing) tx = db.txBegin().value.to(AristoDbRef).txBegin().value - check tx.isTop() + xCheck tx.isTop() # Update root rootKey = w.root @@ -480,28 +425,22 @@ proc testTxMergeProofAndKvpList*( proved: tuple[merged: int, dups: int, error: AristoError] if 0 < w.proof.len: let rc = db.merge(rootKey, VertexID(1)) - if rc.isErr: - check rc.error == 0 - return + xCheckRc rc.error == 0 proved = db.merge(w.proof, rc.value) # , noisy) - check proved.error in {AristoError(0),MergeHashKeyCachedAlready} - check w.proof.len == proved.merged + proved.dups - check db.top.lTab.len == lTabLen - check db.top.sTab.len <= proved.merged + sTabLen - check proved.merged < db.top.pAmk.len + xCheck proved.error in {AristoError(0),MergeHashKeyCachedAlready} + xCheck w.proof.len == proved.merged + proved.dups + xCheck db.top.lTab.len == lTabLen + xCheck db.top.sTab.len <= proved.merged + sTabLen + xCheck proved.merged < db.top.pAmk.len let merged = db.merge leafs - check db.top.lTab.len == lTabLen + merged.merged - check merged.merged + merged.dups == leafs.len - - block: - if merged.error notin {AristoError(0), MergeLeafPathCachedAlready}: - check merged.error in {AristoError(0), MergeLeafPathCachedAlready} - return + xCheck db.top.lTab.len == lTabLen + merged.merged + xCheck merged.merged + merged.dups == leafs.len + xCheck merged.error in {AristoError(0), MergeLeafPathCachedAlready} block: let oops = oopsTab.getOrDefault(testId,(0,AristoError(0)))