Implement backend filter mechanics (#1730)

details:
* Tested features
  + Successively store filters with increasing filter ID (think block number)
  + Cascading through fifos, deeper fifos merge groups of filters
  + Fetch squash merged N top fifos
  + Delete N top fifos, push back merged fifo, continue storing
  + Fifo chain is verified by hashes and filter ID
* Not tested yet
  + Real live scenario (using data dumps)
  + Real filter data (only shallow filters used so far)
This commit is contained in:
Jordan Hrycaj 2023-09-05 19:00:40 +01:00 committed by GitHub
parent 3936d4d0ad
commit 070b06f809
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 687 additions and 112 deletions

View File

@ -177,13 +177,25 @@ type
DelVidStaleVtx DelVidStaleVtx
# Functions from `aristo_filter.nim` # Functions from `aristo_filter.nim`
FilRoBackendOrMissing FilBackendMissing
FilStateRootMissing FilBackendRoMode
FilStateRootMismatch
FilPrettyPointlessLayer
FilDudeFilterUpdateError FilDudeFilterUpdateError
FilExecDublicateSave
FilExecHoldExpected
FilExecOops
FilExecSaveMissing
FilExecStackUnderflow
FilInxByFidFailed
FilNilFilterRejected
FilNotReadOnlyDude FilNotReadOnlyDude
FilPosArgExpected
FilPrettyPointlessLayer
FilQidByLeFidFailed
FilQuSchedDisabled FilQuSchedDisabled
FilStateRootMismatch
FilStateRootMissing
FilTrgSrcMismatch
FilTrgTopSrcMismatch
# Get functions form `aristo_get.nim` # Get functions form `aristo_get.nim`
GetLeafNotFound GetLeafNotFound

View File

@ -17,7 +17,7 @@ import
results, results,
"."/[aristo_desc, aristo_get, aristo_vid], "."/[aristo_desc, aristo_get, aristo_vid],
./aristo_desc/desc_backend, ./aristo_desc/desc_backend,
./aristo_filter/[filter_desc, filter_helpers] ./aristo_filter/[filter_desc, filter_fifos, filter_helpers, filter_merge]
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# Public helpers # Public helpers
@ -43,9 +43,9 @@ func bulk*(layer: LayerRef): int =
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
proc fwdFilter*( proc fwdFilter*(
db: AristoDbRef; db: AristoDbRef; # Database
layer: LayerRef; layer: LayerRef; # Layer to derive filter from
chunkedMpt = false; chunkedMpt = false; # Relax for snap/proof scenario
): Result[FilterRef,(VertexID,AristoError)] = ): Result[FilterRef,(VertexID,AristoError)] =
## Assemble forward delta, i.e. changes to the backend equivalent to applying ## Assemble forward delta, i.e. changes to the backend equivalent to applying
## the current top layer. ## the current top layer.
@ -79,8 +79,8 @@ proc fwdFilter*(
proc revFilter*( proc revFilter*(
db: AristoDbRef; db: AristoDbRef; # Database
filter: FilterRef; filter: FilterRef; # Filter to revert
): Result[FilterRef,(VertexID,AristoError)] = ): Result[FilterRef,(VertexID,AristoError)] =
## Assemble reverse filter for the `filter` argument, i.e. changes to the ## Assemble reverse filter for the `filter` argument, i.e. changes to the
## backend that reverse the effect of applying the this read-only filter. ## backend that reverse the effect of applying the this read-only filter.
@ -96,9 +96,10 @@ proc revFilter*(
# Get vid generator state on backend # Get vid generator state on backend
block: block:
let rc = db.getIdgUBE() let rc = db.getIdgUBE()
if rc.isErr: if rc.isOk:
return err((VertexID(0), rc.error))
rev.vGen = rc.value rev.vGen = rc.value
elif rc.error != GetIdgNotFound:
return err((VertexID(0), rc.error))
# Calculate reverse changes for the `sTab[]` structural table # Calculate reverse changes for the `sTab[]` structural table
for vid in filter.sTab.keys: for vid in filter.sTab.keys:
@ -127,8 +128,8 @@ proc revFilter*(
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
proc merge*( proc merge*(
db: AristoDbRef; db: AristoDbRef; # Database
filter: FilterRef; filter: FilterRef; # Filter to apply to database
): Result[void,(VertexID,AristoError)] = ): Result[void,(VertexID,AristoError)] =
## Merge the argument `filter` into the read-only filter layer. Note that ## Merge the argument `filter` into the read-only filter layer. Note that
## this function has no control of the filter source. Having merged the ## this function has no control of the filter source. Having merged the
@ -166,8 +167,15 @@ proc resolveBE*(db: AristoDbRef): Result[void,(VertexID,AristoError)] =
## For any associated descriptors working on the same backend, their backend ## For any associated descriptors working on the same backend, their backend
## filters will be updated so that the change of the backend DB remains ## filters will be updated so that the change of the backend DB remains
## unnoticed. ## unnoticed.
if not db.canResolveBE(): ##
return err((VertexID(1),FilRoBackendOrMissing)) ## Unless the disabled (see `newAristoDbRef()`, reverse filters are stored
## on a cascaded fifo table so that recent database states can be reverted.
##
if db.backend.isNil:
return err((VertexID(0),FilBackendMissing))
if not db.dudes.isNil and
not db.dudes.rwOk:
return err((VertexID(0),FilBackendRoMode))
# Blind or missing filter # Blind or missing filter
if db.roFilter.isNil: if db.roFilter.isNil:
@ -188,8 +196,6 @@ proc resolveBE*(db: AristoDbRef): Result[void,(VertexID,AristoError)] =
for (d,f) in roFilters: for (d,f) in roFilters:
d.roFilter = f d.roFilter = f
# Update dudes
if not db.dudes.isNil:
# Calculate reverse filter from current filter # Calculate reverse filter from current filter
let rev = block: let rev = block:
let rc = db.revFilter db.roFilter let rc = db.revFilter db.roFilter
@ -197,6 +203,17 @@ proc resolveBE*(db: AristoDbRef): Result[void,(VertexID,AristoError)] =
return err(rc.error) return err(rc.error)
rc.value rc.value
# Figure out how to save the rev filter on cascades slots queue
let be = db.backend
var instr: SaveInstr
if not be.filters.isNil:
let rc = be.store rev
if rc.isErr:
return err((VertexID(0),rc.error))
instr = rc.value
# Update dudes
if not db.dudes.isNil:
# Update distributed filters. Note that the physical backend database # Update distributed filters. Note that the physical backend database
# has not been updated, yet. So the new root key for the backend will # has not been updated, yet. So the new root key for the backend will
# be `db.roFilter.trg`. # be `db.roFilter.trg`.
@ -209,17 +226,22 @@ proc resolveBE*(db: AristoDbRef): Result[void,(VertexID,AristoError)] =
dude.roFilter = rc.value dude.roFilter = rc.value
# Save structural and other table entries # Save structural and other table entries
let let txFrame = be.putBegFn()
be = db.backend
txFrame = be.putBegFn()
be.putVtxFn(txFrame, db.roFilter.sTab.pairs.toSeq) be.putVtxFn(txFrame, db.roFilter.sTab.pairs.toSeq)
be.putKeyFn(txFrame, db.roFilter.kMap.pairs.toSeq) be.putKeyFn(txFrame, db.roFilter.kMap.pairs.toSeq)
be.putIdgFn(txFrame, db.roFilter.vGen) be.putIdgFn(txFrame, db.roFilter.vGen)
if not be.filters.isNil:
be.putFilFn(txFrame, instr.put)
be.putFqsFn(txFrame, instr.scd.state)
let w = be.putEndFn txFrame let w = be.putEndFn txFrame
if w != AristoError(0): if w != AristoError(0):
rollback() rollback()
return err((VertexID(0),w)) return err((VertexID(0),w))
# Update slot queue scheduler state (as saved)
if not be.filters.isNil:
be.filters.state = instr.scd.state
ok() ok()

View File

@ -0,0 +1,176 @@
# nimbus-eth1
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
import
std/tables,
results,
".."/[aristo_desc, aristo_desc/desc_backend],
"."/[filter_desc, filter_merge, filter_scheduler]
type
SaveInstr* = object
put*: seq[(QueueID,FilterRef)]
scd*: QidSchedRef
DeleteInstr* = object
fil*: FilterRef
put*: seq[(QueueID,FilterRef)]
scd*: QidSchedRef
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
template getFilterOrReturn(be: BackendRef; qid: QueueID): FilterRef =
let rc = be.getFilFn qid
if rc.isErr:
return err(rc.error)
rc.value
template joinFiltersOrReturn(upper, lower: FilterRef): FilterRef =
let rc = upper.merge lower
if rc.isErr:
return err(rc.error[1])
rc.value
template nextFidOrReturn(be: BackendRef): FilterID =
## Get next free filter ID, or exit function using this wrapper
var fid = FilterID(1)
block:
let qid = be.filters[0]
if qid.isValid:
let rc = be.getFilFn qid
if rc.isOK:
fid = rc.value.fid + 1
elif rc.error != GetFilNotFound:
return err(rc.error)
fid
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc store*(
be: BackendRef; # Database backend
filter: FilterRef; # Filter to save
): Result[SaveInstr,AristoError] =
## Calculate backend instructions for storing the arguent `filter` on the
## argument backend `be`.
##
if be.filters.isNil:
return err(FilQuSchedDisabled)
# Calculate filter table queue update by slot addresses
let
qTop = be.filters[0]
upd = be.filters.addItem
# Update filters and calculate database update
var
instr = SaveInstr(scd: upd.fifo)
hold: seq[FilterRef]
saved = false
# make sure that filter matches top entry (if any)
if qTop.isValid:
let top = be.getFilterOrReturn qTop
if filter.trg != top.src:
return err(FilTrgTopSrcMismatch)
for act in upd.exec:
case act.op:
of Oops:
return err(FilExecOops)
of SaveQid:
if saved:
return err(FilExecDublicateSave)
instr.put.add (act.qid, filter)
saved = true
of DelQid:
instr.put.add (act.qid, FilterRef(nil))
of HoldQid:
# Push filter
hold.add be.getFilterOrReturn act.qid
# Merge additional filters into top filter
for w in act.qid+1 .. act.xid:
let lower = be.getFilterOrReturn w
hold[^1] = hold[^1].joinFiltersOrReturn lower
of DequQid:
if hold.len == 0:
return err(FilExecStackUnderflow)
var lower = hold.pop
while 0 < hold.len:
let upper = hold.pop
lower = upper.joinFiltersOrReturn lower
instr.put.add (act.qid, lower)
if not saved:
return err(FilExecSaveMissing)
# Set next filter ID
filter.fid = be.nextFidOrReturn
ok instr
proc fetch*(
be: BackendRef; # Database backend
backStep: int; # Backstep this many filters
): Result[DeleteInstr,AristoError] =
## This function returns the single filter obtained by squash merging the
## topmost `backStep` filters on the backend fifo. Also, backend instructions
## are calculated and returned for deleting the merged filters on the fifo.
##
if be.filters.isNil:
return err(FilQuSchedDisabled)
if backStep <= 0:
return err(FilPosArgExpected)
# Get instructions
let fetch = be.filters.fetchItems backStep
var instr = DeleteInstr(scd: fetch.fifo)
# Follow `HoldQid` instructions and combine filters for sub-queues and
# push intermediate results on the `hold` stack
var hold: seq[FilterRef]
for act in fetch.exec:
if act.op != HoldQid:
return err(FilExecHoldExpected)
hold.add be.getFilterOrReturn act.qid
instr.put.add (act.qid,FilterRef(nil))
for qid in act.qid+1 .. act.xid:
let lower = be.getFilterOrReturn qid
instr.put.add (qid,FilterRef(nil))
hold[^1] = hold[^1].joinFiltersOrReturn lower
# Resolve `hold` stack
if hold.len == 0:
return err(FilExecStackUnderflow)
var upper = hold.pop
while 0 < hold.len:
let lower = hold.pop
upper = upper.joinFiltersOrReturn lower
instr.fil = upper
ok instr
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -11,8 +11,8 @@
import import
std/tables, std/tables,
results, results,
".."/[aristo_desc, aristo_get], ".."/[aristo_desc, aristo_desc/desc_backend, aristo_get],
./filter_desc "."/[filter_desc, filter_scheduler]
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# Public functions # Public functions
@ -55,78 +55,19 @@ proc getLayerStateRoots*(
err(FilStateRootMismatch) err(FilStateRootMismatch)
proc merge*( proc le*(be: BackendRef; fid: FilterID): QueueID =
db: AristoDbRef; ## This function returns the filter lookup label of type `QueueID` for
upper: FilterRef; # Src filter, `nil` is ok ## the filter item with maximal filter ID `<=` argument `fid`.
lower: FilterRef; # Trg filter, `nil` is ok
beStateRoot: HashKey; # Merkle hash key
): Result[FilterRef,(VertexID,AristoError)] =
## Merge argument `upper` into the `lower` filter instance.
## ##
## Comparing before and after merge proc qid2fid(qid: QueueID): FilterID =
## :: let rc = be.getFilFn qid
## current | merged if rc.isErr:
## ----------------------------+-------------------------------- return FilterID(0)
## trg2 --upper-- (src2==trg1) | rc.value.fid
## | trg2 --newFilter-- (src1==trg0)
## trg1 --lower-- (src1==trg0) |
## |
## trg0 --beStateRoot | trg0 --beStateRoot
## |
##
# Degenerate case: `upper` is void
if lower.isNil:
if upper.isNil:
# Even more degenerate case when both filters are void
return ok FilterRef(nil)
if upper.src != beStateRoot:
return err((VertexID(1),FilStateRootMismatch))
return ok(upper)
# Degenerate case: `upper` is non-trivial and `lower` is void if not be.isNil and
if upper.isNil: not be.filters.isNil:
if lower.src != beStateRoot: return be.filters.le(fid, qid2fid)
return err((VertexID(0), FilStateRootMismatch))
return ok(lower)
# Verify stackability
if upper.src != lower.trg or
lower.src != beStateRoot:
return err((VertexID(0), FilStateRootMismatch))
# There is no need to deep copy table vertices as they will not be modified.
let newFilter = FilterRef(
src: lower.src,
sTab: lower.sTab,
kMap: lower.kMap,
vGen: upper.vGen,
trg: upper.trg)
for (vid,vtx) in upper.sTab.pairs:
if vtx.isValid or not newFilter.sTab.hasKey vid:
newFilter.sTab[vid] = vtx
elif newFilter.sTab.getOrVoid(vid).isValid:
let rc = db.getVtxUBE vid
if rc.isOk:
newFilter.sTab[vid] = vtx # VertexRef(nil)
elif rc.error == GetVtxNotFound:
newFilter.sTab.del vid
else:
return err((vid,rc.error))
for (vid,key) in upper.kMap.pairs:
if key.isValid or not newFilter.kMap.hasKey vid:
newFilter.kMap[vid] = key
elif newFilter.kMap.getOrVoid(vid).isValid:
let rc = db.getKeyUBE vid
if rc.isOk:
newFilter.kMap[vid] = key # VOID_HASH_KEY
elif rc.error == GetKeyNotFound:
newFilter.kMap.del vid
else:
return err((vid,rc.error))
ok newFilter
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# End # End

View File

@ -0,0 +1,145 @@
# nimbus-eth1
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
import
std/tables,
results,
".."/[aristo_desc, aristo_get]
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc merge*(
db: AristoDbRef;
upper: FilterRef; # Src filter, `nil` is ok
lower: FilterRef; # Trg filter, `nil` is ok
beStateRoot: HashKey; # Merkle hash key
): Result[FilterRef,(VertexID,AristoError)] =
## Merge argument `upper` into the `lower` filter instance.
##
## Note that the namimg `upper` and `lower` indicate that the filters are
## stacked and the database access is `upper -> lower -> backend` whereas
## the `src/trg` matching logic goes the other way round.
##
## The resuting filter has no `FilterID` set.
##
## Comparing before and after merge
## ::
## arguments | merged result
## --------------------------------+------------------------------------
## (src2==trg1) --> upper --> trg2 |
## | (src1==trg0) --> newFilter --> trg2
## (src1==trg0) --> lower --> trg1 |
## |
## beStateRoot --> trg0 |
##
# Degenerate case: `upper` is void
if lower.isNil:
if upper.isNil:
# Even more degenerate case when both filters are void
return ok FilterRef(nil)
if upper.src != beStateRoot:
return err((VertexID(1),FilStateRootMismatch))
return ok(upper)
# Degenerate case: `upper` is non-trivial and `lower` is void
if upper.isNil:
if lower.src != beStateRoot:
return err((VertexID(0), FilStateRootMismatch))
return ok(lower)
# Verify stackability
if upper.src != lower.trg:
return err((VertexID(0), FilTrgSrcMismatch))
if lower.src != beStateRoot:
return err((VertexID(0), FilStateRootMismatch))
# There is no need to deep copy table vertices as they will not be modified.
let newFilter = FilterRef(
src: lower.src,
sTab: lower.sTab,
kMap: lower.kMap,
vGen: upper.vGen,
trg: upper.trg)
for (vid,vtx) in upper.sTab.pairs:
if vtx.isValid or not newFilter.sTab.hasKey vid:
newFilter.sTab[vid] = vtx
elif newFilter.sTab.getOrVoid(vid).isValid:
let rc = db.getVtxUBE vid
if rc.isOk:
newFilter.sTab[vid] = vtx # VertexRef(nil)
elif rc.error == GetVtxNotFound:
newFilter.sTab.del vid
else:
return err((vid,rc.error))
for (vid,key) in upper.kMap.pairs:
if key.isValid or not newFilter.kMap.hasKey vid:
newFilter.kMap[vid] = key
elif newFilter.kMap.getOrVoid(vid).isValid:
let rc = db.getKeyUBE vid
if rc.isOk:
newFilter.kMap[vid] = key # VOID_HASH_KEY
elif rc.error == GetKeyNotFound:
newFilter.kMap.del vid
else:
return err((vid,rc.error))
ok newFilter
proc merge*(
upper: FilterRef; # filter, not `nil`
lower: FilterRef; # filter, not `nil`
): Result[FilterRef,(VertexID,AristoError)] =
## Variant of `merge()` without optimising filters relative to the backend.
## Also, filter arguments `upper` and `lower` are expected not`nil`.
## Otherwise an error is returned.
##
## Comparing before and after merge
## ::
## arguments | merged result
## --------------------------------+--------------------------------
## (src2==trg1) --> upper --> trg2 |
## | (src1==trg0) --> newFilter --> trg2
## (src1==trg0) --> lower --> trg1 |
## |
const
noisy = false
if upper.isNil or lower.isNil:
return err((VertexID(0),FilNilFilterRejected))
# Verify stackability
if upper.src != lower.trg:
return err((VertexID(0), FilTrgSrcMismatch))
# There is no need to deep copy table vertices as they will not be modified.
let newFilter = FilterRef(
fid: upper.fid,
src: lower.src,
sTab: lower.sTab,
kMap: lower.kMap,
vGen: upper.vGen,
trg: upper.trg)
for (vid,vtx) in upper.sTab.pairs:
newFilter.sTab[vid] = vtx
for (vid,key) in upper.kMap.pairs:
newFilter.kMap[vid] = key
ok newFilter
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -588,9 +588,8 @@ proc le*(
fid: FilterID; # Upper bound fid: FilterID; # Upper bound
fn: QuFilMap; # QueueID/FilterID mapping fn: QuFilMap; # QueueID/FilterID mapping
): QueueID = ): QueueID =
## Find the `qid` address of type `QueueID` with ## Find the `qid` address of type `QueueID` with `fn(qid) <= fid` and
## * `fn(qid) <= fid` ## `fid < fn(qid+1)`
## * for all `qid1` with `fn(qid1) <= fid` one has `fn(qid1) <= fn(qid)`
## ##
## If `fn()` returns `FilterID(0)`, then this function returns `QueueID(0)` ## If `fn()` returns `FilterID(0)`, then this function returns `QueueID(0)`
## ##
@ -613,6 +612,7 @@ proc le*(
# Bisection # Bisection
if fifo[right].getFid <= fid: if fifo[right].getFid <= fid:
var pivotLeFid = true
while 1 < right - left: while 1 < right - left:
let half = (left + right) div 2 let half = (left + right) div 2
# #
@ -622,12 +622,15 @@ proc le*(
# #
# with `fifo[left].fn > fid >= fifo[right].fn` # with `fifo[left].fn > fid >= fifo[right].fn`
# #
if fid >= fifo[half].getFid: let pivotLeFid = fifo[half].getFid <= fid
if pivotLeFid: # fid >= fifo[half].getFid:
right = half right = half
else: # fifo[half].getFid > fid else: # fifo[half].getFid > fid
left = half left = half
# Now: `fifo[right].fn <= fid < fifo[left].fn` (and `right == left+1`) # Now: `fifo[right].fn <= fid < fifo[left].fn` (and `right == left+1`).
# So make sure that the entry exists.
if not pivotLeFid or fid < fifo[left].fn:
return fifo[right] return fifo[right]
# otherwise QueueID(0) # otherwise QueueID(0)

View File

@ -83,9 +83,12 @@ proc miscRunner(
test "VertexID recyling lists": test "VertexID recyling lists":
check noisy.testVidRecycleLists() check noisy.testVidRecycleLists()
test &"QueueID cascaded fifos API (sample size: {qidSampleSize})": test &"Low level cascaded fifos API (sample size: {qidSampleSize})":
check noisy.testQidScheduler(sampleSize = qidSampleSize) check noisy.testQidScheduler(sampleSize = qidSampleSize)
test &"High level cascaded fifos API (sample size: {qidSampleSize})":
check noisy.testFilterFifo(sampleSize = qidSampleSize)
proc accountsRunner( proc accountsRunner(
noisy = true; noisy = true;
@ -172,7 +175,7 @@ when isMainModule:
setErrorLevel() setErrorLevel()
when true: # and false: when true: # and false:
noisy.miscRunner(qidSampleSize = 10_000) noisy.miscRunner(qidSampleSize = 1_000)
# This one uses dumps from the external `nimbus-eth1-blob` repo # This one uses dumps from the external `nimbus-eth1-blob` repo
when true and false: when true and false:

View File

@ -13,14 +13,17 @@
## ##
import import
std/[sequtils, sets, strutils],
eth/common, eth/common,
results, results,
unittest2, unittest2,
../../nimbus/db/aristo/[ ../../nimbus/db/aristo/[
aristo_check, aristo_debug, aristo_desc, aristo_filter, aristo_get, aristo_check, aristo_debug, aristo_desc, aristo_filter, aristo_get,
aristo_merge, aristo_transcode], aristo_merge, aristo_persistent, aristo_transcode],
../../nimbus/db/aristo, ../../nimbus/db/aristo,
../../nimbus/db/aristo/aristo_init/persistent, ../../nimbus/db/aristo/aristo_desc/desc_backend,
../../nimbus/db/aristo/aristo_filter/[
filter_desc, filter_fifos, filter_helpers, filter_scheduler],
./test_helpers ./test_helpers
type type
@ -34,6 +37,67 @@ type
# Private debugging helpers # Private debugging helpers
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
proc fifosImpl[T](be: T): seq[seq[(QueueID,FilterRef)]] =
var lastChn = -1
for (qid,val) in be.walkFifoBE:
let chn = (qid.uint64 shr 62).int
while lastChn < chn:
lastChn.inc
result.add newSeq[(QueueID,FilterRef)](0)
result[^1].add (qid,val)
proc fifos(be: BackendRef): seq[seq[(QueueID,FilterRef)]] =
## Wrapper
case be.kind:
of BackendMemory:
return be.MemBackendRef.fifosImpl
of BackendRocksDB:
return be.RdbBackendRef.fifosImpl
else:
discard
check be.kind == BackendMemory or be.kind == BackendRocksDB
func flatten(a: seq[seq[(QueueID,FilterRef)]]): seq[(QueueID,FilterRef)] =
for w in a:
result &= w
proc fList(be: BackendRef): seq[(QueueID,FilterRef)] =
case be.kind:
of BackendMemory:
return be.MemBackendRef.walkFilBe.toSeq.mapIt((it.qid, it.filter))
of BackendRocksDB:
return be.RdbBackendRef.walkFilBe.toSeq.mapIt((it.qid, it.filter))
else:
discard
check be.kind == BackendMemory or be.kind == BackendRocksDB
func ppFil(w: FilterRef): string =
func pp(w: HashKey): string =
let n = w.to(HashID).UInt256
if n == 0: "£ø" else: "£" & $n
"(" & w.fid.pp & "," & w.src.pp & "->" & w.trg.pp & ")"
func pp(qf: (QueueID,FilterRef)): string =
"(" & qf[0].pp & "," & (if qf[1].isNil: "ø" else: qf[1].ppFil) & ")"
proc pp(q: openArray[(QueueID,FilterRef)]): string =
"{" & q.mapIt(it.pp).join(",") & "}"
proc pp(q: seq[seq[(QueueID,FilterRef)]]): string =
result = "["
for w in q:
if w.len == 0:
result &= "ø"
else:
result &= w.mapIt(it.pp).join(",")
result &= ","
if result[^1] == ',':
result[^1] = ']'
else:
result &= "]"
# -------------------------
proc dump(pfx: string; dx: varargs[AristoDbRef]): string = proc dump(pfx: string; dx: varargs[AristoDbRef]): string =
proc dump(db: AristoDbRef): string = proc dump(db: AristoDbRef): string =
db.pp & "\n " & db.backend.pp(db) & "\n" db.pp & "\n " & db.backend.pp(db) & "\n"
@ -97,9 +161,6 @@ proc dbTriplet(w: LeafQuartet; rdbPath: string): Result[DbTriplet,AristoError] =
let db = block: let db = block:
let rc = newAristoDbRef(BackendRocksDB,rdbPath) let rc = newAristoDbRef(BackendRocksDB,rdbPath)
xCheckRc rc.error == 0 xCheckRc rc.error == 0
if rc.isErr:
check rc.error == 0
return
rc.value rc.value
# Fill backend # Fill backend
@ -307,6 +368,138 @@ proc checkFilterTrancoderOk(
true true
# -------------------------
func to(fid: FilterID; T: type HashKey): T =
fid.uint64.to(HashID).to(T)
proc storeFilter(
be: BackendRef;
filter: FilterRef;
): bool =
## ..
let instr = block:
let rc = be.store filter
xCheckRc rc.error == 0
rc.value
# Update database
let txFrame = be.putBegFn()
be.putFilFn(txFrame, instr.put)
be.putFqsFn(txFrame, instr.scd.state)
let done = be.putEndFn txFrame
xCheck done == 0
be.filters.state = instr.scd.state
true
proc storeFilter(
be: BackendRef;
serial: int;
): bool =
## Variant of `storeFilter()`
let fid = FilterID(serial)
be.storeFilter FilterRef(
fid: fid,
src: fid.to(HashKey),
trg: (fid-1).to(HashKey))
proc fetchDelete(
be: BackendRef;
backStep: int;
filter: var FilterRef;
): bool =
## ...
# let filter = block:
let
instr = block:
let rc = be.fetch(backStep = backStep)
xCheckRc rc.error == 0
rc.value
qid = be.le instr.fil.fid
inx = be.filters[qid]
xCheck backStep == inx + 1
# Update database
let txFrame = be.putBegFn()
be.putFilFn(txFrame, instr.put)
be.putFqsFn(txFrame, instr.scd.state)
let done = be.putEndFn txFrame
xCheck done == 0
be.filters.state = instr.scd.state
filter = instr.fil
# Verify that state was properly installed
let rc = be.getFqsFn()
xCheckRc rc.error == 0
xCheck rc.value == be.filters.state
true
proc validateFifo(
be: BackendRef;
serial: int;
): bool =
func to(key: HashKey; T: type uint64): T =
key.to(HashID).UInt256.truncate(uint64)
var lastTrg = serial.uint64
## Verify filter setup
##
## Example
## ::
## QueueID | FilterID | HashKey
## qid | filter.fid | filter.src -> filter.trg
## --------+------------+--------------------------
## %4 | @654 | £654 -> £653
## %3 | @653 | £653 -> £652
## %2 | @652 | £652 -> £651
## %1 | @651 | £651 -> £650
## %a | @650 | £650 -> £649
## %9 | @649 | £649 -> £648
## | |
## %1:2 | @648 | £648 -> £644
## %1:1 | @644 | £644 -> £640
## %1:a | @640 | £640 -> £636
## %1:9 | @636 | £636 -> £632
## %1:8 | @632 | £632 -> £628
## %1:7 | @628 | £628 -> £624
## %1:6 | @624 | £624 -> £620
## | |
## %2:1 | @620 | £620 -> £600
## %2:a | @600 | £600 -> £580
## .. | .. | ..
##
var
inx = 0
lastFid = FilterID(serial+1)
for chn,fifo in be.fifos:
for (qid,filter) in fifo:
# Check filter objects
xCheck chn == (qid.uint64 shr 62).int
xCheck filter != FilterRef(nil)
xCheck filter.src.to(uint64) == lastTrg
lastTrg = filter.trg.to(uint64)
# Check random access
xCheck qid == be.filters[inx]
xCheck inx == be.filters[qid]
# Check access by queue ID (all end up at `qid`)
for fid in filter.fid ..< lastFid:
xCheck qid == be.le fid
inx.inc
lastFid = filter.fid
true
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# Public test function # Public test function
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
@ -436,6 +629,86 @@ proc testDistributedAccess*(
true true
proc testFilterFifo*(
noisy = true;
layout = QidSlotLyo;
sampleSize = QidSample;
reorgPercent = 40;
rdbPath = "";
): bool =
var
debug = false # or true
let
db = if 0 < rdbPath.len:
let rc = newAristoDbRef(BackendRocksDB,rdbPath,layout.to(QidLayoutRef))
xCheckRc rc.error == 0
rc.value
else:
BackendMemory.newAristoDbRef(layout.to(QidLayoutRef))
be = db.backend
defer: db.finish(flush=true)
proc show(serial = 0; exec: seq[QidAction] = @[]) =
var s = ""
if 0 < serial:
s &= " n=" & $serial
s &= " len=" & $be.filters.len
if 0 < exec.len:
s &= " exec=" & exec.pp
s &= "" &
"\n state=" & be.filters.state.pp &
#"\n list=" & be.fList.pp &
"\n fifo=" & be.fifos.pp &
"\n"
noisy.say "***", s
if debug:
noisy.say "***", "sampleSize=", sampleSize,
" ctx=", be.filters.ctx.q, " stats=", be.filters.ctx.stats
# -------------------
for n in 1 .. sampleSize:
let storeFilterOK = be.storeFilter(serial=n)
xCheck storeFilterOK
#show(n)
let validateFifoOk = be.validateFifo(serial=n)
xCheck validateFifoOk
# -------------------
# Squash some entries on the fifo
block:
var
filtersLen = be.filters.len
nDel = (filtersLen * reorgPercent) div 100
filter: FilterRef
# Extract and delete leading filters, use squashed filters extract
let fetchDeleteOk = be.fetchDelete(nDel, filter)
xCheck fetchDeleteOk
xCheck be.filters.len + nDel == filtersLen
# Push squashed filter
let storeFilterOK = be.storeFilter filter
xCheck storeFilterOK
#show sampleSize
# -------------------
# Continue adding items
for n in sampleSize + 1 .. 2 * sampleSize:
let storeFilterOK = be.storeFilter(serial=n)
xCheck storeFilterOK
#show(n)
let validateFifoOk = be.validateFifo(serial=n)
xCheck validateFifoOk
true
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# End # End
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------