From 070b06f80931c71ae9007ae3132acbc9d144eac8 Mon Sep 17 00:00:00 2001 From: Jordan Hrycaj Date: Tue, 5 Sep 2023 19:00:40 +0100 Subject: [PATCH] Implement backend filter mechanics (#1730) details: * Tested features + Successively store filters with increasing filter ID (think block number) + Cascading through fifos, deeper fifos merge groups of filters + Fetch squash merged N top fifos + Delete N top fifos, push back merged fifo, continue storing + Fifo chain is verified by hashes and filter ID * Not tested yet + Real live scenario (using data dumps) + Real filter data (only shallow filters used so far) --- nimbus/db/aristo/aristo_desc/desc_error.nim | 20 +- nimbus/db/aristo/aristo_filter.nim | 66 ++-- .../db/aristo/aristo_filter/filter_fifos.nim | 176 +++++++++++ .../aristo/aristo_filter/filter_helpers.nim | 85 +----- .../db/aristo/aristo_filter/filter_merge.nim | 145 +++++++++ .../aristo/aristo_filter/filter_scheduler.nim | 17 +- tests/test_aristo.nim | 7 +- tests/test_aristo/test_filter.nim | 283 +++++++++++++++++- 8 files changed, 687 insertions(+), 112 deletions(-) create mode 100644 nimbus/db/aristo/aristo_filter/filter_fifos.nim create mode 100644 nimbus/db/aristo/aristo_filter/filter_merge.nim diff --git a/nimbus/db/aristo/aristo_desc/desc_error.nim b/nimbus/db/aristo/aristo_desc/desc_error.nim index a8008b33b..c52cdc9ad 100644 --- a/nimbus/db/aristo/aristo_desc/desc_error.nim +++ b/nimbus/db/aristo/aristo_desc/desc_error.nim @@ -177,13 +177,25 @@ type DelVidStaleVtx # Functions from `aristo_filter.nim` - FilRoBackendOrMissing - FilStateRootMissing - FilStateRootMismatch - FilPrettyPointlessLayer + FilBackendMissing + FilBackendRoMode FilDudeFilterUpdateError + FilExecDublicateSave + FilExecHoldExpected + FilExecOops + FilExecSaveMissing + FilExecStackUnderflow + FilInxByFidFailed + FilNilFilterRejected FilNotReadOnlyDude + FilPosArgExpected + FilPrettyPointlessLayer + FilQidByLeFidFailed FilQuSchedDisabled + FilStateRootMismatch + FilStateRootMissing + FilTrgSrcMismatch + FilTrgTopSrcMismatch # Get functions form `aristo_get.nim` GetLeafNotFound diff --git a/nimbus/db/aristo/aristo_filter.nim b/nimbus/db/aristo/aristo_filter.nim index 017a27597..ac255ff14 100644 --- a/nimbus/db/aristo/aristo_filter.nim +++ b/nimbus/db/aristo/aristo_filter.nim @@ -17,7 +17,7 @@ import results, "."/[aristo_desc, aristo_get, aristo_vid], ./aristo_desc/desc_backend, - ./aristo_filter/[filter_desc, filter_helpers] + ./aristo_filter/[filter_desc, filter_fifos, filter_helpers, filter_merge] # ------------------------------------------------------------------------------ # Public helpers @@ -43,9 +43,9 @@ func bulk*(layer: LayerRef): int = # ------------------------------------------------------------------------------ proc fwdFilter*( - db: AristoDbRef; - layer: LayerRef; - chunkedMpt = false; + db: AristoDbRef; # Database + layer: LayerRef; # Layer to derive filter from + chunkedMpt = false; # Relax for snap/proof scenario ): Result[FilterRef,(VertexID,AristoError)] = ## Assemble forward delta, i.e. changes to the backend equivalent to applying ## the current top layer. @@ -79,8 +79,8 @@ proc fwdFilter*( proc revFilter*( - db: AristoDbRef; - filter: FilterRef; + db: AristoDbRef; # Database + filter: FilterRef; # Filter to revert ): Result[FilterRef,(VertexID,AristoError)] = ## Assemble reverse filter for the `filter` argument, i.e. changes to the ## backend that reverse the effect of applying the this read-only filter. @@ -96,9 +96,10 @@ proc revFilter*( # Get vid generator state on backend block: let rc = db.getIdgUBE() - if rc.isErr: + if rc.isOk: + rev.vGen = rc.value + elif rc.error != GetIdgNotFound: return err((VertexID(0), rc.error)) - rev.vGen = rc.value # Calculate reverse changes for the `sTab[]` structural table for vid in filter.sTab.keys: @@ -127,8 +128,8 @@ proc revFilter*( # ------------------------------------------------------------------------------ proc merge*( - db: AristoDbRef; - filter: FilterRef; + db: AristoDbRef; # Database + filter: FilterRef; # Filter to apply to database ): Result[void,(VertexID,AristoError)] = ## Merge the argument `filter` into the read-only filter layer. Note that ## this function has no control of the filter source. Having merged the @@ -166,8 +167,15 @@ proc resolveBE*(db: AristoDbRef): Result[void,(VertexID,AristoError)] = ## For any associated descriptors working on the same backend, their backend ## filters will be updated so that the change of the backend DB remains ## unnoticed. - if not db.canResolveBE(): - return err((VertexID(1),FilRoBackendOrMissing)) + ## + ## Unless the disabled (see `newAristoDbRef()`, reverse filters are stored + ## on a cascaded fifo table so that recent database states can be reverted. + ## + if db.backend.isNil: + return err((VertexID(0),FilBackendMissing)) + if not db.dudes.isNil and + not db.dudes.rwOk: + return err((VertexID(0),FilBackendRoMode)) # Blind or missing filter if db.roFilter.isNil: @@ -188,15 +196,24 @@ proc resolveBE*(db: AristoDbRef): Result[void,(VertexID,AristoError)] = for (d,f) in roFilters: d.roFilter = f + # Calculate reverse filter from current filter + let rev = block: + let rc = db.revFilter db.roFilter + if rc.isErr: + return err(rc.error) + rc.value + + # Figure out how to save the rev filter on cascades slots queue + let be = db.backend + var instr: SaveInstr + if not be.filters.isNil: + let rc = be.store rev + if rc.isErr: + return err((VertexID(0),rc.error)) + instr = rc.value + # Update dudes if not db.dudes.isNil: - # Calculate reverse filter from current filter - let rev = block: - let rc = db.revFilter db.roFilter - if rc.isErr: - return err(rc.error) - rc.value - # Update distributed filters. Note that the physical backend database # has not been updated, yet. So the new root key for the backend will # be `db.roFilter.trg`. @@ -209,17 +226,22 @@ proc resolveBE*(db: AristoDbRef): Result[void,(VertexID,AristoError)] = dude.roFilter = rc.value # Save structural and other table entries - let - be = db.backend - txFrame = be.putBegFn() + let txFrame = be.putBegFn() be.putVtxFn(txFrame, db.roFilter.sTab.pairs.toSeq) be.putKeyFn(txFrame, db.roFilter.kMap.pairs.toSeq) be.putIdgFn(txFrame, db.roFilter.vGen) + if not be.filters.isNil: + be.putFilFn(txFrame, instr.put) + be.putFqsFn(txFrame, instr.scd.state) let w = be.putEndFn txFrame if w != AristoError(0): rollback() return err((VertexID(0),w)) + # Update slot queue scheduler state (as saved) + if not be.filters.isNil: + be.filters.state = instr.scd.state + ok() diff --git a/nimbus/db/aristo/aristo_filter/filter_fifos.nim b/nimbus/db/aristo/aristo_filter/filter_fifos.nim new file mode 100644 index 000000000..09296beab --- /dev/null +++ b/nimbus/db/aristo/aristo_filter/filter_fifos.nim @@ -0,0 +1,176 @@ +# nimbus-eth1 +# Copyright (c) 2021 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or +# http://www.apache.org/licenses/LICENSE-2.0) +# * MIT license ([LICENSE-MIT](LICENSE-MIT) or +# http://opensource.org/licenses/MIT) +# at your option. This file may not be copied, modified, or distributed +# except according to those terms. + +import + std/tables, + results, + ".."/[aristo_desc, aristo_desc/desc_backend], + "."/[filter_desc, filter_merge, filter_scheduler] + +type + SaveInstr* = object + put*: seq[(QueueID,FilterRef)] + scd*: QidSchedRef + + DeleteInstr* = object + fil*: FilterRef + put*: seq[(QueueID,FilterRef)] + scd*: QidSchedRef + +# ------------------------------------------------------------------------------ +# Private functions +# ------------------------------------------------------------------------------ + +template getFilterOrReturn(be: BackendRef; qid: QueueID): FilterRef = + let rc = be.getFilFn qid + if rc.isErr: + return err(rc.error) + rc.value + +template joinFiltersOrReturn(upper, lower: FilterRef): FilterRef = + let rc = upper.merge lower + if rc.isErr: + return err(rc.error[1]) + rc.value + +template nextFidOrReturn(be: BackendRef): FilterID = + ## Get next free filter ID, or exit function using this wrapper + var fid = FilterID(1) + block: + let qid = be.filters[0] + if qid.isValid: + let rc = be.getFilFn qid + if rc.isOK: + fid = rc.value.fid + 1 + elif rc.error != GetFilNotFound: + return err(rc.error) + fid + +# ------------------------------------------------------------------------------ +# Public functions +# ------------------------------------------------------------------------------ + +proc store*( + be: BackendRef; # Database backend + filter: FilterRef; # Filter to save + ): Result[SaveInstr,AristoError] = + ## Calculate backend instructions for storing the arguent `filter` on the + ## argument backend `be`. + ## + if be.filters.isNil: + return err(FilQuSchedDisabled) + + # Calculate filter table queue update by slot addresses + let + qTop = be.filters[0] + upd = be.filters.addItem + + # Update filters and calculate database update + var + instr = SaveInstr(scd: upd.fifo) + hold: seq[FilterRef] + saved = false + + # make sure that filter matches top entry (if any) + if qTop.isValid: + let top = be.getFilterOrReturn qTop + if filter.trg != top.src: + return err(FilTrgTopSrcMismatch) + + for act in upd.exec: + case act.op: + of Oops: + return err(FilExecOops) + + of SaveQid: + if saved: + return err(FilExecDublicateSave) + instr.put.add (act.qid, filter) + saved = true + + of DelQid: + instr.put.add (act.qid, FilterRef(nil)) + + of HoldQid: + # Push filter + hold.add be.getFilterOrReturn act.qid + + # Merge additional filters into top filter + for w in act.qid+1 .. act.xid: + let lower = be.getFilterOrReturn w + hold[^1] = hold[^1].joinFiltersOrReturn lower + + of DequQid: + if hold.len == 0: + return err(FilExecStackUnderflow) + var lower = hold.pop + while 0 < hold.len: + let upper = hold.pop + lower = upper.joinFiltersOrReturn lower + instr.put.add (act.qid, lower) + + if not saved: + return err(FilExecSaveMissing) + + # Set next filter ID + filter.fid = be.nextFidOrReturn + + ok instr + + +proc fetch*( + be: BackendRef; # Database backend + backStep: int; # Backstep this many filters + ): Result[DeleteInstr,AristoError] = + ## This function returns the single filter obtained by squash merging the + ## topmost `backStep` filters on the backend fifo. Also, backend instructions + ## are calculated and returned for deleting the merged filters on the fifo. + ## + if be.filters.isNil: + return err(FilQuSchedDisabled) + if backStep <= 0: + return err(FilPosArgExpected) + + # Get instructions + let fetch = be.filters.fetchItems backStep + var instr = DeleteInstr(scd: fetch.fifo) + + # Follow `HoldQid` instructions and combine filters for sub-queues and + # push intermediate results on the `hold` stack + var hold: seq[FilterRef] + for act in fetch.exec: + if act.op != HoldQid: + return err(FilExecHoldExpected) + + hold.add be.getFilterOrReturn act.qid + instr.put.add (act.qid,FilterRef(nil)) + + for qid in act.qid+1 .. act.xid: + let lower = be.getFilterOrReturn qid + instr.put.add (qid,FilterRef(nil)) + + hold[^1] = hold[^1].joinFiltersOrReturn lower + + # Resolve `hold` stack + if hold.len == 0: + return err(FilExecStackUnderflow) + + var upper = hold.pop + while 0 < hold.len: + let lower = hold.pop + + upper = upper.joinFiltersOrReturn lower + + instr.fil = upper + ok instr + +# ------------------------------------------------------------------------------ +# End +# ------------------------------------------------------------------------------ diff --git a/nimbus/db/aristo/aristo_filter/filter_helpers.nim b/nimbus/db/aristo/aristo_filter/filter_helpers.nim index c663ba15a..10c371de8 100644 --- a/nimbus/db/aristo/aristo_filter/filter_helpers.nim +++ b/nimbus/db/aristo/aristo_filter/filter_helpers.nim @@ -11,8 +11,8 @@ import std/tables, results, - ".."/[aristo_desc, aristo_get], - ./filter_desc + ".."/[aristo_desc, aristo_desc/desc_backend, aristo_get], + "."/[filter_desc, filter_scheduler] # ------------------------------------------------------------------------------ # Public functions @@ -55,78 +55,19 @@ proc getLayerStateRoots*( err(FilStateRootMismatch) -proc merge*( - db: AristoDbRef; - upper: FilterRef; # Src filter, `nil` is ok - lower: FilterRef; # Trg filter, `nil` is ok - beStateRoot: HashKey; # Merkle hash key - ): Result[FilterRef,(VertexID,AristoError)] = - ## Merge argument `upper` into the `lower` filter instance. +proc le*(be: BackendRef; fid: FilterID): QueueID = + ## This function returns the filter lookup label of type `QueueID` for + ## the filter item with maximal filter ID `<=` argument `fid`. ## - ## Comparing before and after merge - ## :: - ## current | merged - ## ----------------------------+-------------------------------- - ## trg2 --upper-- (src2==trg1) | - ## | trg2 --newFilter-- (src1==trg0) - ## trg1 --lower-- (src1==trg0) | - ## | - ## trg0 --beStateRoot | trg0 --beStateRoot - ## | - ## - # Degenerate case: `upper` is void - if lower.isNil: - if upper.isNil: - # Even more degenerate case when both filters are void - return ok FilterRef(nil) - if upper.src != beStateRoot: - return err((VertexID(1),FilStateRootMismatch)) - return ok(upper) + proc qid2fid(qid: QueueID): FilterID = + let rc = be.getFilFn qid + if rc.isErr: + return FilterID(0) + rc.value.fid - # Degenerate case: `upper` is non-trivial and `lower` is void - if upper.isNil: - if lower.src != beStateRoot: - return err((VertexID(0), FilStateRootMismatch)) - return ok(lower) - - # Verify stackability - if upper.src != lower.trg or - lower.src != beStateRoot: - return err((VertexID(0), FilStateRootMismatch)) - - # There is no need to deep copy table vertices as they will not be modified. - let newFilter = FilterRef( - src: lower.src, - sTab: lower.sTab, - kMap: lower.kMap, - vGen: upper.vGen, - trg: upper.trg) - - for (vid,vtx) in upper.sTab.pairs: - if vtx.isValid or not newFilter.sTab.hasKey vid: - newFilter.sTab[vid] = vtx - elif newFilter.sTab.getOrVoid(vid).isValid: - let rc = db.getVtxUBE vid - if rc.isOk: - newFilter.sTab[vid] = vtx # VertexRef(nil) - elif rc.error == GetVtxNotFound: - newFilter.sTab.del vid - else: - return err((vid,rc.error)) - - for (vid,key) in upper.kMap.pairs: - if key.isValid or not newFilter.kMap.hasKey vid: - newFilter.kMap[vid] = key - elif newFilter.kMap.getOrVoid(vid).isValid: - let rc = db.getKeyUBE vid - if rc.isOk: - newFilter.kMap[vid] = key # VOID_HASH_KEY - elif rc.error == GetKeyNotFound: - newFilter.kMap.del vid - else: - return err((vid,rc.error)) - - ok newFilter + if not be.isNil and + not be.filters.isNil: + return be.filters.le(fid, qid2fid) # ------------------------------------------------------------------------------ # End diff --git a/nimbus/db/aristo/aristo_filter/filter_merge.nim b/nimbus/db/aristo/aristo_filter/filter_merge.nim new file mode 100644 index 000000000..9e2e2ccee --- /dev/null +++ b/nimbus/db/aristo/aristo_filter/filter_merge.nim @@ -0,0 +1,145 @@ +# nimbus-eth1 +# Copyright (c) 2021 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or +# http://www.apache.org/licenses/LICENSE-2.0) +# * MIT license ([LICENSE-MIT](LICENSE-MIT) or +# http://opensource.org/licenses/MIT) +# at your option. This file may not be copied, modified, or distributed +# except according to those terms. + +import + std/tables, + results, + ".."/[aristo_desc, aristo_get] + +# ------------------------------------------------------------------------------ +# Public functions +# ------------------------------------------------------------------------------ + +proc merge*( + db: AristoDbRef; + upper: FilterRef; # Src filter, `nil` is ok + lower: FilterRef; # Trg filter, `nil` is ok + beStateRoot: HashKey; # Merkle hash key + ): Result[FilterRef,(VertexID,AristoError)] = + ## Merge argument `upper` into the `lower` filter instance. + ## + ## Note that the namimg `upper` and `lower` indicate that the filters are + ## stacked and the database access is `upper -> lower -> backend` whereas + ## the `src/trg` matching logic goes the other way round. + ## + ## The resuting filter has no `FilterID` set. + ## + ## Comparing before and after merge + ## :: + ## arguments | merged result + ## --------------------------------+------------------------------------ + ## (src2==trg1) --> upper --> trg2 | + ## | (src1==trg0) --> newFilter --> trg2 + ## (src1==trg0) --> lower --> trg1 | + ## | + ## beStateRoot --> trg0 | + ## + # Degenerate case: `upper` is void + if lower.isNil: + if upper.isNil: + # Even more degenerate case when both filters are void + return ok FilterRef(nil) + if upper.src != beStateRoot: + return err((VertexID(1),FilStateRootMismatch)) + return ok(upper) + + # Degenerate case: `upper` is non-trivial and `lower` is void + if upper.isNil: + if lower.src != beStateRoot: + return err((VertexID(0), FilStateRootMismatch)) + return ok(lower) + + # Verify stackability + if upper.src != lower.trg: + return err((VertexID(0), FilTrgSrcMismatch)) + if lower.src != beStateRoot: + return err((VertexID(0), FilStateRootMismatch)) + + # There is no need to deep copy table vertices as they will not be modified. + let newFilter = FilterRef( + src: lower.src, + sTab: lower.sTab, + kMap: lower.kMap, + vGen: upper.vGen, + trg: upper.trg) + + for (vid,vtx) in upper.sTab.pairs: + if vtx.isValid or not newFilter.sTab.hasKey vid: + newFilter.sTab[vid] = vtx + elif newFilter.sTab.getOrVoid(vid).isValid: + let rc = db.getVtxUBE vid + if rc.isOk: + newFilter.sTab[vid] = vtx # VertexRef(nil) + elif rc.error == GetVtxNotFound: + newFilter.sTab.del vid + else: + return err((vid,rc.error)) + + for (vid,key) in upper.kMap.pairs: + if key.isValid or not newFilter.kMap.hasKey vid: + newFilter.kMap[vid] = key + elif newFilter.kMap.getOrVoid(vid).isValid: + let rc = db.getKeyUBE vid + if rc.isOk: + newFilter.kMap[vid] = key # VOID_HASH_KEY + elif rc.error == GetKeyNotFound: + newFilter.kMap.del vid + else: + return err((vid,rc.error)) + + ok newFilter + + +proc merge*( + upper: FilterRef; # filter, not `nil` + lower: FilterRef; # filter, not `nil` + ): Result[FilterRef,(VertexID,AristoError)] = + ## Variant of `merge()` without optimising filters relative to the backend. + ## Also, filter arguments `upper` and `lower` are expected not`nil`. + ## Otherwise an error is returned. + ## + ## Comparing before and after merge + ## :: + ## arguments | merged result + ## --------------------------------+-------------------------------- + ## (src2==trg1) --> upper --> trg2 | + ## | (src1==trg0) --> newFilter --> trg2 + ## (src1==trg0) --> lower --> trg1 | + ## | + const + noisy = false + + if upper.isNil or lower.isNil: + return err((VertexID(0),FilNilFilterRejected)) + + # Verify stackability + if upper.src != lower.trg: + return err((VertexID(0), FilTrgSrcMismatch)) + + # There is no need to deep copy table vertices as they will not be modified. + let newFilter = FilterRef( + fid: upper.fid, + src: lower.src, + sTab: lower.sTab, + kMap: lower.kMap, + vGen: upper.vGen, + trg: upper.trg) + + for (vid,vtx) in upper.sTab.pairs: + newFilter.sTab[vid] = vtx + + for (vid,key) in upper.kMap.pairs: + newFilter.kMap[vid] = key + + ok newFilter + +# ------------------------------------------------------------------------------ +# End +# ------------------------------------------------------------------------------ diff --git a/nimbus/db/aristo/aristo_filter/filter_scheduler.nim b/nimbus/db/aristo/aristo_filter/filter_scheduler.nim index 8dc2853c1..e954cb103 100644 --- a/nimbus/db/aristo/aristo_filter/filter_scheduler.nim +++ b/nimbus/db/aristo/aristo_filter/filter_scheduler.nim @@ -588,9 +588,8 @@ proc le*( fid: FilterID; # Upper bound fn: QuFilMap; # QueueID/FilterID mapping ): QueueID = - ## Find the `qid` address of type `QueueID` with - ## * `fn(qid) <= fid` - ## * for all `qid1` with `fn(qid1) <= fid` one has `fn(qid1) <= fn(qid)` + ## Find the `qid` address of type `QueueID` with `fn(qid) <= fid` and + ## `fid < fn(qid+1)` ## ## If `fn()` returns `FilterID(0)`, then this function returns `QueueID(0)` ## @@ -613,6 +612,7 @@ proc le*( # Bisection if fifo[right].getFid <= fid: + var pivotLeFid = true while 1 < right - left: let half = (left + right) div 2 # @@ -622,13 +622,16 @@ proc le*( # # with `fifo[left].fn > fid >= fifo[right].fn` # - if fid >= fifo[half].getFid: + let pivotLeFid = fifo[half].getFid <= fid + if pivotLeFid: # fid >= fifo[half].getFid: right = half - else: # fifo[half].getFid > fid + else: # fifo[half].getFid > fid left = half - # Now: `fifo[right].fn <= fid < fifo[left].fn` (and `right == left+1`) - return fifo[right] + # Now: `fifo[right].fn <= fid < fifo[left].fn` (and `right == left+1`). + # So make sure that the entry exists. + if not pivotLeFid or fid < fifo[left].fn: + return fifo[right] # otherwise QueueID(0) diff --git a/tests/test_aristo.nim b/tests/test_aristo.nim index eaf843ec8..08ad90414 100644 --- a/tests/test_aristo.nim +++ b/tests/test_aristo.nim @@ -83,9 +83,12 @@ proc miscRunner( test "VertexID recyling lists": check noisy.testVidRecycleLists() - test &"QueueID cascaded fifos API (sample size: {qidSampleSize})": + test &"Low level cascaded fifos API (sample size: {qidSampleSize})": check noisy.testQidScheduler(sampleSize = qidSampleSize) + test &"High level cascaded fifos API (sample size: {qidSampleSize})": + check noisy.testFilterFifo(sampleSize = qidSampleSize) + proc accountsRunner( noisy = true; @@ -172,7 +175,7 @@ when isMainModule: setErrorLevel() when true: # and false: - noisy.miscRunner(qidSampleSize = 10_000) + noisy.miscRunner(qidSampleSize = 1_000) # This one uses dumps from the external `nimbus-eth1-blob` repo when true and false: diff --git a/tests/test_aristo/test_filter.nim b/tests/test_aristo/test_filter.nim index 32b39e248..fea300d6e 100644 --- a/tests/test_aristo/test_filter.nim +++ b/tests/test_aristo/test_filter.nim @@ -13,14 +13,17 @@ ## import + std/[sequtils, sets, strutils], eth/common, results, unittest2, ../../nimbus/db/aristo/[ aristo_check, aristo_debug, aristo_desc, aristo_filter, aristo_get, - aristo_merge, aristo_transcode], + aristo_merge, aristo_persistent, aristo_transcode], ../../nimbus/db/aristo, - ../../nimbus/db/aristo/aristo_init/persistent, + ../../nimbus/db/aristo/aristo_desc/desc_backend, + ../../nimbus/db/aristo/aristo_filter/[ + filter_desc, filter_fifos, filter_helpers, filter_scheduler], ./test_helpers type @@ -34,6 +37,67 @@ type # Private debugging helpers # ------------------------------------------------------------------------------ +proc fifosImpl[T](be: T): seq[seq[(QueueID,FilterRef)]] = + var lastChn = -1 + for (qid,val) in be.walkFifoBE: + let chn = (qid.uint64 shr 62).int + while lastChn < chn: + lastChn.inc + result.add newSeq[(QueueID,FilterRef)](0) + result[^1].add (qid,val) + +proc fifos(be: BackendRef): seq[seq[(QueueID,FilterRef)]] = + ## Wrapper + case be.kind: + of BackendMemory: + return be.MemBackendRef.fifosImpl + of BackendRocksDB: + return be.RdbBackendRef.fifosImpl + else: + discard + check be.kind == BackendMemory or be.kind == BackendRocksDB + +func flatten(a: seq[seq[(QueueID,FilterRef)]]): seq[(QueueID,FilterRef)] = + for w in a: + result &= w + +proc fList(be: BackendRef): seq[(QueueID,FilterRef)] = + case be.kind: + of BackendMemory: + return be.MemBackendRef.walkFilBe.toSeq.mapIt((it.qid, it.filter)) + of BackendRocksDB: + return be.RdbBackendRef.walkFilBe.toSeq.mapIt((it.qid, it.filter)) + else: + discard + check be.kind == BackendMemory or be.kind == BackendRocksDB + +func ppFil(w: FilterRef): string = + func pp(w: HashKey): string = + let n = w.to(HashID).UInt256 + if n == 0: "£ø" else: "£" & $n + "(" & w.fid.pp & "," & w.src.pp & "->" & w.trg.pp & ")" + +func pp(qf: (QueueID,FilterRef)): string = + "(" & qf[0].pp & "," & (if qf[1].isNil: "ø" else: qf[1].ppFil) & ")" + +proc pp(q: openArray[(QueueID,FilterRef)]): string = + "{" & q.mapIt(it.pp).join(",") & "}" + +proc pp(q: seq[seq[(QueueID,FilterRef)]]): string = + result = "[" + for w in q: + if w.len == 0: + result &= "ø" + else: + result &= w.mapIt(it.pp).join(",") + result &= "," + if result[^1] == ',': + result[^1] = ']' + else: + result &= "]" + +# ------------------------- + proc dump(pfx: string; dx: varargs[AristoDbRef]): string = proc dump(db: AristoDbRef): string = db.pp & "\n " & db.backend.pp(db) & "\n" @@ -97,9 +161,6 @@ proc dbTriplet(w: LeafQuartet; rdbPath: string): Result[DbTriplet,AristoError] = let db = block: let rc = newAristoDbRef(BackendRocksDB,rdbPath) xCheckRc rc.error == 0 - if rc.isErr: - check rc.error == 0 - return rc.value # Fill backend @@ -307,6 +368,138 @@ proc checkFilterTrancoderOk( true +# ------------------------- + +func to(fid: FilterID; T: type HashKey): T = + fid.uint64.to(HashID).to(T) + + +proc storeFilter( + be: BackendRef; + filter: FilterRef; + ): bool = + ## .. + let instr = block: + let rc = be.store filter + xCheckRc rc.error == 0 + rc.value + + # Update database + let txFrame = be.putBegFn() + be.putFilFn(txFrame, instr.put) + be.putFqsFn(txFrame, instr.scd.state) + let done = be.putEndFn txFrame + xCheck done == 0 + + be.filters.state = instr.scd.state + true + +proc storeFilter( + be: BackendRef; + serial: int; + ): bool = + ## Variant of `storeFilter()` + let fid = FilterID(serial) + be.storeFilter FilterRef( + fid: fid, + src: fid.to(HashKey), + trg: (fid-1).to(HashKey)) + + +proc fetchDelete( + be: BackendRef; + backStep: int; + filter: var FilterRef; + ): bool = + ## ... + # let filter = block: + + let + instr = block: + let rc = be.fetch(backStep = backStep) + xCheckRc rc.error == 0 + rc.value + qid = be.le instr.fil.fid + inx = be.filters[qid] + xCheck backStep == inx + 1 + + # Update database + let txFrame = be.putBegFn() + be.putFilFn(txFrame, instr.put) + be.putFqsFn(txFrame, instr.scd.state) + let done = be.putEndFn txFrame + xCheck done == 0 + + be.filters.state = instr.scd.state + filter = instr.fil + + # Verify that state was properly installed + let rc = be.getFqsFn() + xCheckRc rc.error == 0 + xCheck rc.value == be.filters.state + + true + + +proc validateFifo( + be: BackendRef; + serial: int; + ): bool = + func to(key: HashKey; T: type uint64): T = + key.to(HashID).UInt256.truncate(uint64) + + var lastTrg = serial.uint64 + ## Verify filter setup + ## + ## Example + ## :: + ## QueueID | FilterID | HashKey + ## qid | filter.fid | filter.src -> filter.trg + ## --------+------------+-------------------------- + ## %4 | @654 | £654 -> £653 + ## %3 | @653 | £653 -> £652 + ## %2 | @652 | £652 -> £651 + ## %1 | @651 | £651 -> £650 + ## %a | @650 | £650 -> £649 + ## %9 | @649 | £649 -> £648 + ## | | + ## %1:2 | @648 | £648 -> £644 + ## %1:1 | @644 | £644 -> £640 + ## %1:a | @640 | £640 -> £636 + ## %1:9 | @636 | £636 -> £632 + ## %1:8 | @632 | £632 -> £628 + ## %1:7 | @628 | £628 -> £624 + ## %1:6 | @624 | £624 -> £620 + ## | | + ## %2:1 | @620 | £620 -> £600 + ## %2:a | @600 | £600 -> £580 + ## .. | .. | .. + ## + var + inx = 0 + lastFid = FilterID(serial+1) + for chn,fifo in be.fifos: + for (qid,filter) in fifo: + + # Check filter objects + xCheck chn == (qid.uint64 shr 62).int + xCheck filter != FilterRef(nil) + xCheck filter.src.to(uint64) == lastTrg + lastTrg = filter.trg.to(uint64) + + # Check random access + xCheck qid == be.filters[inx] + xCheck inx == be.filters[qid] + + # Check access by queue ID (all end up at `qid`) + for fid in filter.fid ..< lastFid: + xCheck qid == be.le fid + + inx.inc + lastFid = filter.fid + + true + # ------------------------------------------------------------------------------ # Public test function # ------------------------------------------------------------------------------ @@ -436,6 +629,86 @@ proc testDistributedAccess*( true + +proc testFilterFifo*( + noisy = true; + layout = QidSlotLyo; + sampleSize = QidSample; + reorgPercent = 40; + rdbPath = ""; + ): bool = + var + debug = false # or true + let + db = if 0 < rdbPath.len: + let rc = newAristoDbRef(BackendRocksDB,rdbPath,layout.to(QidLayoutRef)) + xCheckRc rc.error == 0 + rc.value + else: + BackendMemory.newAristoDbRef(layout.to(QidLayoutRef)) + be = db.backend + + defer: db.finish(flush=true) + + proc show(serial = 0; exec: seq[QidAction] = @[]) = + var s = "" + if 0 < serial: + s &= " n=" & $serial + s &= " len=" & $be.filters.len + if 0 < exec.len: + s &= " exec=" & exec.pp + s &= "" & + "\n state=" & be.filters.state.pp & + #"\n list=" & be.fList.pp & + "\n fifo=" & be.fifos.pp & + "\n" + noisy.say "***", s + + if debug: + noisy.say "***", "sampleSize=", sampleSize, + " ctx=", be.filters.ctx.q, " stats=", be.filters.ctx.stats + + # ------------------- + + for n in 1 .. sampleSize: + let storeFilterOK = be.storeFilter(serial=n) + xCheck storeFilterOK + #show(n) + let validateFifoOk = be.validateFifo(serial=n) + xCheck validateFifoOk + + # ------------------- + + # Squash some entries on the fifo + block: + var + filtersLen = be.filters.len + nDel = (filtersLen * reorgPercent) div 100 + filter: FilterRef + + # Extract and delete leading filters, use squashed filters extract + let fetchDeleteOk = be.fetchDelete(nDel, filter) + xCheck fetchDeleteOk + xCheck be.filters.len + nDel == filtersLen + + # Push squashed filter + let storeFilterOK = be.storeFilter filter + xCheck storeFilterOK + + #show sampleSize + + # ------------------- + + # Continue adding items + for n in sampleSize + 1 .. 2 * sampleSize: + let storeFilterOK = be.storeFilter(serial=n) + xCheck storeFilterOK + #show(n) + let validateFifoOk = be.validateFifo(serial=n) + xCheck validateFifoOk + + true + # ------------------------------------------------------------------------------ # End # ------------------------------------------------------------------------------