Aristo db kvt maintenance update (#1952)

* Update KVT layers abstraction

details:
  modelled after Aristo layers

* Simplified KVT database iterators (removed item counters)

why:
  Not needed for production functions

* Simplify KVT merge function `layersCc()`

* Simplified Aristo database iterators (removed item counters)

why:
  Not needed for production functions

* Update failure condition for hash labels compiler `hashify()`

why:
  Node need not be rejected as long as links are on the schedule. In
  that case, `redo[]` is to become `wff.base[]` at a later stage.

* Update merging layers and label update functions

why:
+ Merging a stack of layers with `layersCc()` could be simplified
+ Merging layers will optimise the reverse `kMap[]` table maps
  `pAmk: label->{vid, ..}` by deleting empty mappings `label->{}` where
  they are redundant.
+ Updated `layersPutLabel()` for optimising `pAmk[]` tables
This commit is contained in:
Jordan Hrycaj 2023-12-20 16:19:00 +00:00 committed by GitHub
parent dded8643d9
commit 43e5f428af
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 313 additions and 236 deletions

View File

@ -92,7 +92,7 @@ proc checkBE*[T: RdbBackendRef|MemBackendRef|VoidBackendRef](
let vids = IntervalSetRef[VertexID,uint64].init()
discard vids.merge Interval[VertexID,uint64].new(VertexID(1),high(VertexID))
for (_,vid,vtx) in T.walkVtxBE db:
for (vid,vtx) in T.walkVtxBE db:
if not vtx.isValid:
return err((vid,CheckBeVtxInvalid))
let rc = db.getKeyBE vid
@ -114,7 +114,7 @@ proc checkBE*[T: RdbBackendRef|MemBackendRef|VoidBackendRef](
if vtx.ePfx.len == 0:
return err((vid,CheckBeVtxExtPfxMissing))
for (_,vid,key) in T.walkKeyBE db:
for (vid,key) in T.walkKeyBE db:
if not key.isvalid:
return err((vid,CheckBeKeyInvalid))
let vtx = db.getVtxBE(vid).valueOr:

View File

@ -483,12 +483,22 @@ proc ppBe[T](be: T; db: AristoDbRef; root: VertexID; indent: int): string =
result = "<" & $be.kind & ">"
result &= pfx & "vGen" & pfx1 & "[" &
be.getIdgFn().get(otherwise = EmptyVidSeq).mapIt(it.ppVid).join(",") & "]"
result &= pfx & "sTab" & pfx1 & "{" & be.walkVtx.toSeq.mapIt(
$(1+it[0]) & "(" & it[1].ppVid & "," & it[2].ppVtx(db,it[1]) & ")"
).join(pfx2) & "}"
result &= pfx & "kMap" & pfx1 & "{" & be.walkKey.toSeq.mapIt(
$(1+it[0]) & "(" & it[1].ppVid & "," & it[2].ppKey(db,root) & ")"
).join(pfx2) & "}"
block:
result &= pfx & "sTab" & pfx1 & "{"
var n = 0
for (vid,vtx) in be.walkVtx:
if 0 < n: result &= pfx2
n.inc
result &= $n & "(" & vid.ppVid & "," & vtx.ppVtx(db,vid) & ")"
result &= "}"
block:
result &= pfx & "kMap" & pfx1 & "{"
var n = 0
for (vid,key) in be.walkKey:
if 0 < n: result &= pfx2
n.inc
result &= $n & "(" & vid.ppVid & "," & key.ppKey(db,root) & ")"
result &= "}"
proc ppLayer(
layer: LayerRef;
@ -802,7 +812,24 @@ proc pp*(
): string =
result = db.layersCc.pp(db, indent=indent) & indent.toPfx
if 0 < db.stack.len:
result &= " level=" & $db.stack.len & indent.toPfx
result &= " level=" & $db.stack.len
when false: # or true:
let layers = @[db.top] & db.stack.reversed
var lStr = ""
for n,w in layers:
let
m = layers.len - n - 1
l = db.layersCc m
a = w.delta.kMap.values.toSeq.filterIt(not it.isValid).len
b = w.delta.pAmk.values.toSeq.filterIt(not it.isValid).len
c = l.delta.kMap.values.toSeq.filterIt(not it.isValid).len
d = l.delta.pAmk.values.toSeq.filterIt(not it.isValid).len
result &= " (" & $(w.delta.kMap.len - a) & "," & $a
result &= ";" & $(w.delta.pAmk.len - b) & "," & $b & ")"
lStr &= " " & $m & "=(" & $(l.delta.kMap.len - c) & "," & $c
lStr &= ";" & $(l.delta.pAmk.len - d) & "," & $d & ")"
result &= " --" & lStr
result &= indent.toPfx
if backendOk:
result &= db.backend.pp(db)
elif filterOk:

View File

@ -112,7 +112,6 @@ type
HashifyNodeUnresolved
HashifyRootHashMismatch
HashifyRootNodeUnresolved
HashifyLoop
# Cache checker `checkCache()`
CheckStkKeyStrayZeroEntry

View File

@ -119,7 +119,8 @@ type
vGen*: seq[VertexID] ## Unique vertex ID generator
dirty*: bool ## Needs to be hashified if `true`
LayerRef* = ref object
LayerRef* = ref LayerObj
LayerObj* = object
## Hexary trie database layer structures. Any layer holds the full
## change relative to the backend.
delta*: LayerDelta ## Most structural tables held as deltas

View File

@ -377,7 +377,7 @@ proc hashify*(
wff.pool[vid] = val
# Add the child vertices to `redo[]` for the schedule `base[]` list.
for w in error:
if w notin wff.base:
if w notin wff.base and w notin redo:
if db.layersGetVtx(w).isErr:
# Ooops, should have been marked for update
return err((w,HashifyNodeUnresolved))

View File

@ -311,7 +311,7 @@ proc memoryBackend*(qidLayout: QidLayoutRef): BackendRef =
iterator walkVtx*(
be: MemBackendRef;
): tuple[n: int, vid: VertexID, vtx: VertexRef] =
): tuple[vid: VertexID, vtx: VertexRef] =
## Iteration over the vertex sub-table.
for n,vid in be.sTab.keys.toSeq.mapIt(it.uint64).sorted.mapIt(it.VertexID):
let data = be.sTab.getOrDefault(vid, EmptyBlob)
@ -320,20 +320,20 @@ iterator walkVtx*(
if rc.isErr:
debug logTxt "walkVtxFn() skip", n, vid, error=rc.error
else:
yield (n, vid, rc.value)
yield (vid, rc.value)
iterator walkKey*(
be: MemBackendRef;
): tuple[n: int, vid: VertexID, key: HashKey] =
): tuple[vid: VertexID, key: HashKey] =
## Iteration over the Markle hash sub-table.
for n,vid in be.kMap.keys.toSeq.mapIt(it.uint64).sorted.mapIt(it.VertexID):
for vid in be.kMap.keys.toSeq.mapIt(it.uint64).sorted.mapIt(it.VertexID):
let key = be.kMap.getOrVoid vid
if key.isValid:
yield (n, vid, key)
yield (vid, key)
iterator walkFil*(
be: MemBackendRef;
): tuple[n: int, qid: QueueID, filter: FilterRef] =
): tuple[qid: QueueID, filter: FilterRef] =
## Iteration over the vertex sub-table.
if not be.noFq:
for n,qid in be.rFil.keys.toSeq.mapIt(it.uint64).sorted.mapIt(it.QueueID):
@ -341,45 +341,38 @@ iterator walkFil*(
if 0 < data.len:
let rc = data.deblobify FilterRef
if rc.isErr:
debug logTxt "walkFilFn() skip", n,qid, error=rc.error
debug logTxt "walkFilFn() skip", n, qid, error=rc.error
else:
yield (n, qid, rc.value)
yield (qid, rc.value)
iterator walk*(
be: MemBackendRef;
): tuple[n: int, pfx: StorageType, xid: uint64, data: Blob] =
): tuple[pfx: StorageType, xid: uint64, data: Blob] =
## Walk over all key-value pairs of the database.
##
## Non-decodable entries are stepped over while the counter `n` of the
## yield record is still incremented.
var n = 0
if be.vGen.isSome:
yield(0, AdmPfx, AdmTabIdIdg.uint64, be.vGen.unsafeGet.blobify)
n.inc
yield(AdmPfx, AdmTabIdIdg.uint64, be.vGen.unsafeGet.blobify)
if not be.noFq:
if be.vFqs.isSome:
yield(0, AdmPfx, AdmTabIdFqs.uint64, be.vFqs.unsafeGet.blobify)
n.inc
yield(AdmPfx, AdmTabIdFqs.uint64, be.vFqs.unsafeGet.blobify)
for vid in be.sTab.keys.toSeq.mapIt(it.uint64).sorted.mapIt(it.VertexID):
let data = be.sTab.getOrDefault(vid, EmptyBlob)
if 0 < data.len:
yield (n, VtxPfx, vid.uint64, data)
n.inc
yield (VtxPfx, vid.uint64, data)
for (_,vid,key) in be.walkKey:
yield (n, KeyPfx, vid.uint64, @key)
n.inc
for (vid,key) in be.walkKey:
yield (KeyPfx, vid.uint64, @key)
if not be.noFq:
for lid in be.rFil.keys.toSeq.mapIt(it.uint64).sorted.mapIt(it.QueueID):
let data = be.rFil.getOrDefault(lid, EmptyBlob)
if 0 < data.len:
yield (n, FilPfx, lid.uint64, data)
n.inc
yield (FilPfx, lid.uint64, data)
# ------------------------------------------------------------------------------
# End

View File

@ -368,7 +368,7 @@ proc rocksDbBackend*(
iterator walk*(
be: RdbBackendRef;
): tuple[n: int, pfx: StorageType, xid: uint64, data: Blob] =
): tuple[pfx: StorageType, xid: uint64, data: Blob] =
## Walk over all key-value pairs of the database.
##
## Non-decodable entries are stepped over while the counter `n` of the
@ -390,31 +390,31 @@ iterator walk*(
iterator walkVtx*(
be: RdbBackendRef;
): tuple[n: int, vid: VertexID, vtx: VertexRef] =
): tuple[vid: VertexID, vtx: VertexRef] =
## Variant of `walk()` iteration over the vertex sub-table.
for (n, xid, data) in be.rdb.walk VtxPfx:
for (xid, data) in be.rdb.walk VtxPfx:
let rc = data.deblobify VertexRef
if rc.isOk:
yield (n, VertexID(xid), rc.value)
yield (VertexID(xid), rc.value)
iterator walkKey*(
be: RdbBackendRef;
): tuple[n: int, vid: VertexID, key: HashKey] =
): tuple[vid: VertexID, key: HashKey] =
## Variant of `walk()` iteration over the Markle hash sub-table.
for (n, xid, data) in be.rdb.walk KeyPfx:
for (xid, data) in be.rdb.walk KeyPfx:
let lid = HashKey.fromBytes(data).valueOr:
continue
yield (n, VertexID(xid), lid)
yield (VertexID(xid), lid)
iterator walkFil*(
be: RdbBackendRef;
): tuple[n: int, qid: QueueID, filter: FilterRef] =
): tuple[qid: QueueID, filter: FilterRef] =
## Variant of `walk()` iteration over the filter sub-table.
if not be.noFq:
for (n, xid, data) in be.rdb.walk FilPfx:
for (xid, data) in be.rdb.walk FilPfx:
let rc = data.deblobify FilterRef
if rc.isOk:
yield (n, QueueID(xid), rc.value)
yield (QueueID(xid), rc.value)
# ------------------------------------------------------------------------------
# End

View File

@ -45,16 +45,14 @@ func valBlob(vData: cstring, vLen: csize_t): Blob =
iterator walk*(
rdb: RdbInst;
): tuple[n: int, pfx: StorageType, xid: uint64, data: Blob] =
): tuple[pfx: StorageType, xid: uint64, data: Blob] =
## Walk over all key-value pairs of the database.
##
## Non-decodable entries are stepped over while the counter `n` of the
## yield record is still incremented.
## Non-decodable entries are stepped over and ignored.
let rit = rdb.store.db.rocksdb_create_iterator(rdb.store.readOptions)
defer: rit.rocksdb_iter_destroy()
rit.rocksdb_iter_seek_to_first()
var count = 0
while rit.rocksdb_iter_valid() != 0:
var kLen: csize_t
@ -72,23 +70,21 @@ iterator walk*(
let val = vData.valBlob(vLen)
if 0 < val.len:
yield (count, pfx.StorageType, xid, val)
yield (pfx.StorageType, xid, val)
# Update Iterator (might overwrite kData/vdata)
rit.rocksdb_iter_next()
count.inc
# End while
iterator walk*(
rdb: RdbInst;
pfx: StorageType;
): tuple[n: int, xid: uint64, data: Blob] =
): tuple[xid: uint64, data: Blob] =
## Walk over key-value pairs of the table referted to by the argument `pfx`
## whic must be different from `Oops` and `AdmPfx`.
##
## Non-decodable entries are stepped over while the counter `n` of the
## yield record is still incremented.
## Non-decodable entries are stepped over and ignored.
##
block walkBody:
if pfx in {Oops, AdmPfx}:
@ -99,7 +95,6 @@ iterator walk*(
defer: rit.rocksdb_iter_destroy()
var
count = 0
kLen: csize_t
kData: cstring
@ -139,14 +134,13 @@ iterator walk*(
let val = vData.valBlob(vLen)
if 0 < val.len:
yield (count, xid, val)
yield (xid, val)
# Update Iterator
rit.rocksdb_iter_next()
if rit.rocksdb_iter_valid() == 0:
break walkBody
count.inc
# End while
# ------------------------------------------------------------------------------

View File

@ -25,18 +25,22 @@ func dup(sTab: Table[VertexID,VertexRef]): Table[VertexID,VertexRef] =
for (k,v) in sTab.pairs:
result[k] = v.dup
func dup(delta: LayerDelta): LayerDelta =
result = LayerDelta(
sTab: delta.sTab.dup, # explicit dup for ref values
kMap: delta.kMap,
pAmk: delta.pAmk)
func stackGetLebalOrVoid(db: AristoDbRef; lbl: HashLabel): HashSet[VertexID] =
func getLebalOrVoid(stack: seq[LayerRef]; lbl: HashLabel): HashSet[VertexID] =
# Helper: get next set of vertex IDs from stack.
for w in db.stack.reversed:
for w in stack.reversed:
w.delta.pAmk.withValue(lbl,value):
return value[]
proc recalcLebal(layer: var LayerObj) =
## Calculate reverse `kMap[]` for final (aka zero) layer
layer.delta.pAmk.clear
for (vid,lbl) in layer.delta.kMap.pairs:
if lbl.isValid:
layer.delta.pAmk.withValue(lbl, value):
value[].incl vid
do:
layer.delta.pAmk[lbl] = @[vid].toHashSet
# ------------------------------------------------------------------------------
# Public getters: lazy value lookup for read only versions
# ------------------------------------------------------------------------------
@ -58,15 +62,24 @@ func dirty*(db: AristoDbRef): bool =
# ------------------------------------------------------------------------------
func nLayersVtx*(db: AristoDbRef): int =
## Number of vertex entries on the cache layers
## Number of vertex ID/vertex entries on the cache layers. This is an upper
## bound for the number of effective vertex ID mappings held on the cache
## layers as there might be duplicate entries for the same vertex ID on
## different layers.
db.stack.mapIt(it.delta.sTab.len).foldl(a + b, db.top.delta.sTab.len)
func nLayersLabel*(db: AristoDbRef): int =
## Number of key/label entries on the cache layers
## Number of vertex ID/label entries on the cache layers. This is an upper
## bound for the number of effective vertex ID mappingss held on the cache
## layers as there might be duplicate entries for the same vertex ID on
## different layers.
db.stack.mapIt(it.delta.kMap.len).foldl(a + b, db.top.delta.kMap.len)
func nLayersLebal*(db: AristoDbRef): int =
## Number of key/label reverse lookup entries on the cache layers
## Number of label/vertex IDs reverse lookup entries on the cache layers.
## This is an upper bound for the number of effective label mappingss held
## on the cache layers as there might be duplicate entries for the same label
## on different layers.
db.stack.mapIt(it.delta.pAmk.len).foldl(a + b, db.top.delta.pAmk.len)
# ------------------------------------------------------------------------------
@ -170,17 +183,25 @@ proc layersPutLabel*(db: AristoDbRef; vid: VertexID; lbl: HashLabel) =
# Clear previous value on reverse table if it has changed
if blb.isValid and blb != lbl:
var vidsLen = -1
db.top.delta.pAmk.withValue(blb, value):
value[].excl vid
vidsLen = value[].len
do: # provide empty lookup
db.top.delta.pAmk[blb] = db.stackGetLebalOrVoid(blb) - @[vid].toHashSet
let vids = db.stack.getLebalOrVoid(blb)
if vids.isValid and vid in vids:
# This entry supersedes non-emtpty changed ones from lower levels
db.top.delta.pAmk[blb] = vids - @[vid].toHashSet
if vidsLen == 0 and not db.stack.getLebalOrVoid(blb).isValid:
# There is no non-emtpty entry on lower levels, so ledete this one
db.top.delta.pAmk.del blb
# Add updated value on reverse table if non-zero
if lbl.isValid:
db.top.delta.pAmk.withValue(lbl, value):
value[].incl vid
do: # else if not found: need to merge with value set from lower layer
db.top.delta.pAmk[lbl] = db.stackGetLebalOrVoid(lbl) + @[vid].toHashSet
db.top.delta.pAmk[lbl] = db.stack.getLebalOrVoid(lbl) + @[vid].toHashSet
proc layersResLabel*(db: AristoDbRef; vid: VertexID) =
@ -192,48 +213,53 @@ proc layersResLabel*(db: AristoDbRef; vid: VertexID) =
# Public functions
# ------------------------------------------------------------------------------
proc layersMergeOnto*(src: LayerRef; trg: LayerRef): LayerRef {.discardable.} =
proc layersMergeOnto*(src: LayerRef; trg: var LayerObj; stack: seq[LayerRef]) =
## Merges the argument `src` into the argument `trg` and returns `trg`. For
## the result layer, the `txUid` value set to `0`.
##
trg.final = src.final
trg.txUid = 0
for (vid,vtx) in src.delta.sTab.pairs:
trg.delta.sTab[vid] = vtx
for (vid,lbl) in src.delta.kMap.pairs:
trg.delta.kMap[vid] = lbl
if stack.len == 0:
# Re-calculate `pAmk[]`
trg.recalcLebal()
else:
# Merge reverse `kMap[]` layers. Empty label image sets are ignored unless
# they supersede non-empty values on the argument `stack[]`.
for (lbl,vids) in src.delta.pAmk.pairs:
trg.delta.pAmk.withValue(lbl, value):
value[] = value[] + vids
do:
if 0 < vids.len or stack.getLebalOrVoid(lbl).isValid:
trg.delta.pAmk[lbl] = vids
trg
proc layersCc*(db: AristoDbRef; level = high(int)): LayerRef =
func layersCc*(db: AristoDbRef; level = high(int)): LayerRef =
## Provide a collapsed copy of layers up to a particular transaction level.
## If the `level` argument is too large, the maximum transaction level is
## returned. For the result layer, the `txUid` value set to `0`.
let level = min(level, db.stack.len)
##
let layers = if db.stack.len <= level: db.stack & @[db.top]
else: db.stack[0 .. level]
result = LayerRef(final: db.top.final) # Pre-merged/final values
# Set up initial layer (bottom layer)
result = LayerRef(
final: layers[^1].final, # Pre-merged/final values
delta: LayerDelta(
sTab: layers[0].delta.sTab.dup, # explicit dup for ref values
kMap: layers[0].delta.kMap))
# Merge stack into its bottom layer
if level <= 0 and db.stack.len == 0:
result.delta = db.top.delta.dup # Explicit dup for ref values
else:
# now: 0 < level <= db.stack.len
result.delta = db.stack[0].delta.dup # Explicit dup for ref values
# Consecutively merge other layers on top
for n in 1 ..< layers.len:
for (vid,vtx) in layers[n].delta.sTab.pairs:
result.delta.sTab[vid] = vtx
for (vid,lbl) in layers[n].delta.kMap.pairs:
result.delta.kMap[vid] = lbl
# Merge stack: structural vertex table and hash key mapping
for w in db.stack.reversed:
w.layersMergeOnto result
# Merge top layer
db.top.layersMergeOnto result
# Re-calculate `pAmk[]`
result[].recalcLebal()
# ------------------------------------------------------------------------------
# Public iterators

View File

@ -14,6 +14,7 @@
{.push raises: [].}
import
std/tables,
results,
"."/[aristo_desc, aristo_filter, aristo_get, aristo_layers, aristo_hashify]
@ -248,12 +249,23 @@ proc commit*(
discard db.hashify().valueOr:
return err(error[1])
# Replace the top two layers by its merged version
let merged = db.top.layersMergeOnto db.stack[^1]
# Install `merged` layer
db.top = merged
# Pop layer from stack and merge database top layer onto it
let merged = block:
if db.top.delta.sTab.len == 0 and
db.top.delta.kMap.len == 0 and
db.top.delta.pAmk.len == 0:
# Avoid `layersMergeOnto()`
db.top.delta.shallowCopy db.stack[^1].delta
db.stack.setLen(db.stack.len-1)
db.top
else:
let layer = db.stack[^1]
db.stack.setLen(db.stack.len-1)
db.top.layersMergeOnto(layer[], db.stack)
layer
# Install `merged` stack top layer and update stack
db.top = merged
db.txRef = tx.parent
if 0 < db.stack.len:
db.txRef.txUid = db.getTxUid

View File

@ -28,27 +28,27 @@ export
iterator walkVtxBe*[T: MemBackendRef|VoidBackendRef](
_: type T;
db: AristoDbRef;
): tuple[n: int, vid: VertexID, vtx: VertexRef] =
): tuple[vid: VertexID, vtx: VertexRef] =
## Iterate over filtered memory backend or backend-less vertices. This
## function depends on the particular backend type name which must match
## the backend descriptor.
for (n,vid,vtx) in walkVtxBeImpl[T](db):
yield (n,vid,vtx)
for (vid,vtx) in walkVtxBeImpl[T](db):
yield (vid,vtx)
iterator walkKeyBe*[T: MemBackendRef|VoidBackendRef](
_: type T;
db: AristoDbRef;
): tuple[n: int, vid: VertexID, key: HashKey] =
): tuple[vid: VertexID, key: HashKey] =
## Similar to `walkVtxBe()` but for keys.
for (n,vid,key) in walkKeyBeImpl[T](db):
yield (n,vid,key)
for (vid,key) in walkKeyBeImpl[T](db):
yield (vid,key)
iterator walkFilBe*[T: MemBackendRef|VoidBackendRef](
be: T;
): tuple[n: int, qid: QueueID, filter: FilterRef] =
): tuple[qid: QueueID, filter: FilterRef] =
## Iterate over backend filters.
for (n,qid,filter) in walkFilBeImpl[T](be):
yield (n,qid,filter)
for (qid,filter) in walkFilBeImpl[T](be):
yield (qid,filter)
iterator walkFifoBe*[T: MemBackendRef|VoidBackendRef](
be: T;

View File

@ -34,26 +34,26 @@ export
iterator walkVtxBe*(
T: type RdbBackendRef;
db: AristoDbRef;
): tuple[n: int, vid: VertexID, vtx: VertexRef] =
): tuple[vid: VertexID, vtx: VertexRef] =
## Iterate over filtered RocksDB backend vertices. This function depends on
## the particular backend type name which must match the backend descriptor.
for (n,vid,vtx) in walkVtxBeImpl[T](db):
yield (n,vid,vtx)
for (vid,vtx) in walkVtxBeImpl[T](db):
yield (vid,vtx)
iterator walkKeyBe*(
T: type RdbBackendRef;
db: AristoDbRef;
): tuple[n: int, vid: VertexID, key: HashKey] =
): tuple[vid: VertexID, key: HashKey] =
## Similar to `walkVtxBe()` but for keys.
for (n,vid,key) in walkKeyBeImpl[T](db):
yield (n,vid,key)
for (vid,key) in walkKeyBeImpl[T](db):
yield (vid,key)
iterator walkFilBe*(
be: RdbBackendRef;
): tuple[n: int, qid: QueueID, filter: FilterRef] =
): tuple[qid: QueueID, filter: FilterRef] =
## Iterate over backend filters.
for (n,qid,filter) in be.walkFilBeImpl:
yield (n,qid,filter)
for (qid,filter) in be.walkFilBeImpl:
yield (qid,filter)
iterator walkFifoBe*(
be: RdbBackendRef;

View File

@ -12,7 +12,7 @@
import
std/[sequtils, sets, tables],
results,
".."/[aristo_desc, aristo_get, aristo_layers, aristo_init, aristo_utils]
".."/[aristo_desc, aristo_get, aristo_init, aristo_layers, aristo_utils]
# ------------------------------------------------------------------------------
# Public generic iterators
@ -20,10 +20,8 @@ import
iterator walkVtxBeImpl*[T](
db: AristoDbRef; # Database with optional backend filter
): tuple[n: int, vid: VertexID, vtx: VertexRef] =
): tuple[vid: VertexID, vtx: VertexRef] =
## Generic iterator
var n = 0
when T is VoidBackendRef:
let filter = if db.roFilter.isNil: FilterRef() else: db.roFilter
@ -34,30 +32,25 @@ iterator walkVtxBeImpl*[T](
if not db.roFilter.isNil:
filter.sTab = db.roFilter.sTab # copy table
for (_,vid,vtx) in db.backend.T.walkVtx:
for (vid,vtx) in db.backend.T.walkVtx:
if filter.sTab.hasKey vid:
let fVtx = filter.sTab.getOrVoid vid
if fVtx.isValid:
yield (n,vid,fVtx)
n.inc
yield (vid,fVtx)
filter.sTab.del vid
else:
yield (n,vid,vtx)
n.inc
yield (vid,vtx)
for vid in filter.sTab.keys.toSeq.mapIt(it.uint64).sorted.mapIt(it.VertexID):
let vtx = filter.sTab.getOrVoid vid
if vtx.isValid:
yield (n,vid,vtx)
n.inc
yield (vid,vtx)
iterator walkKeyBeImpl*[T](
db: AristoDbRef; # Database with optional backend filter
): tuple[n: int, vid: VertexID, key: HashKey] =
): tuple[vid: VertexID, key: HashKey] =
## Generic iterator
var n = 0
when T is VoidBackendRef:
let filter = if db.roFilter.isNil: FilterRef() else: db.roFilter
@ -68,33 +61,30 @@ iterator walkKeyBeImpl*[T](
if not db.roFilter.isNil:
filter.kMap = db.roFilter.kMap # copy table
for (_,vid,key) in db.backend.T.walkKey:
for (vid,key) in db.backend.T.walkKey:
if filter.kMap.hasKey vid:
let fKey = filter.kMap.getOrVoid vid
if fKey.isValid:
yield (n,vid,fKey)
n.inc
yield (vid,fKey)
filter.kMap.del vid
else:
yield (n,vid,key)
n.inc
yield (vid,key)
for vid in filter.kMap.keys.toSeq.mapIt(it.uint64).sorted.mapIt(it.VertexID):
let key = filter.kMap.getOrVoid vid
if key.isValid:
yield (n,vid,key)
n.inc
yield (vid,key)
iterator walkFilBeImpl*[T](
be: T; # Backend descriptor
): tuple[n: int, qid: QueueID, filter: FilterRef] =
): tuple[qid: QueueID, filter: FilterRef] =
## Generic filter iterator
when T isnot VoidBackendRef:
mixin walkFil
for (n,qid,filter) in be.walkFil:
yield (n,qid,filter)
for (qid,filter) in be.walkFil:
yield (qid,filter)
iterator walkFifoBeImpl*[T](

View File

@ -121,19 +121,24 @@ proc ppBe[T](be: T; db: KvtDbRef; indent: int): string =
pfx1 = indent.toPfx(1)
pfx2 = indent.toPfx(2)
pfx3 = indent.toPfx(3)
data = be.walk.toSeq.mapIt(
$(1+it[0]) & "(" & it[1].ppKey(db) & "," & it[2].ppValue & ")"
).join(pfx3)
spc = if 0 < data.len: pfx2 else: " "
var
data = ""
n = 0
for (key,val) in be.walk:
if 0 < n: data &= pfx3
n.inc
data &= $n & "(" & key.ppKey(db) & "," & val.ppValue & ")"
var
spc = if 0 < n: pfx2 else: " "
"<" & $be.kind & ">" & pfx1 & "tab" & spc & "{" & data & "}"
proc ppLayer(layer: LayerRef; db: KvtDbRef; indent = 4): string =
let
tLen = layer.dTab.len
tLen = layer.delta.sTab.len
info = "tab(" & $tLen & ")"
pfx1 = indent.toPfx(1)
pfx2 = if 0 < tLen: indent.toPfx(2) else: " "
"<layer>" & pfx1 & info & pfx2 & layer.dTab.ppTab(db,indent+2)
"<layer>" & pfx1 & info & pfx2 & layer.delta.sTab.ppTab(db,indent+2)
# ------------------------------------------------------------------------------
# Public functions

View File

@ -27,7 +27,7 @@
{.push raises: [].}
import
std/[algorithm, sequtils, tables],
std/tables,
chronicles,
eth/common,
results,
@ -140,14 +140,13 @@ proc memoryBackend*: BackendRef =
iterator walk*(
be: MemBackendRef;
): tuple[n: int, key: Blob, data: Blob] =
): tuple[key: Blob, data: Blob] =
## Walk over all key-value pairs of the database.
for n,key in be.tab.keys.toSeq.sorted:
let data = be.tab.getOrVoid key
if data.len == 0:
debug logTxt "walk() skip empty", n, key
for (key,data) in be.tab.pairs:
if data.isValid:
yield (key, data)
else:
yield (n, key, data)
debug logTxt "walk() skip empty", key
# ------------------------------------------------------------------------------
# End

View File

@ -169,13 +169,11 @@ proc rocksDbBackend*(
iterator walk*(
be: RdbBackendRef;
): tuple[n: int, key: Blob, data: Blob] =
): tuple[key: Blob, data: Blob] =
## Walk over all key-value pairs of the database.
##
var n = 0
for (k,v) in be.rdb.walk:
yield (n, k,v)
n.inc
yield (k,v)
# ------------------------------------------------------------------------------
# End

View File

@ -11,10 +11,58 @@
{.push raises: [].}
import
std/tables,
std/[algorithm, sequtils, sets, tables],
eth/common,
results,
./kvt_desc
# ------------------------------------------------------------------------------
# Public getters/helpers
# ------------------------------------------------------------------------------
func nLayersKeys*(db: KvtDbRef): int =
## Maximum number of ley/value entries on the cache layers. This is an upper
## bound for the number of effective key/value mappings held on the cache
## layers as there might be duplicate entries for the same key on different
## layers.
db.stack.mapIt(it.delta.sTab.len).foldl(a + b, db.top.delta.sTab.len)
# ------------------------------------------------------------------------------
# Public functions: get function
# ------------------------------------------------------------------------------
proc layersHasKey*(db: KvtDbRef; key: openArray[byte]): bool =
## Return `true` id the argument key is cached.
##
if db.top.delta.sTab.hasKey @key:
return true
for w in db.stack.reversed:
if w.delta.sTab.hasKey @key:
return true
proc layersGet*(db: KvtDbRef; key: openArray[byte]): Result[Blob,void] =
## Find an item on the cache layers. An `ok()` result might contain an
## empty value if it is stored on the cache that way.
##
if db.top.delta.sTab.hasKey @key:
return ok(db.top.delta.sTab.getOrVoid @key)
for w in db.stack.reversed:
if w.delta.sTab.hasKey @key:
return ok(w.delta.sTab.getOrVoid @key)
err()
# ------------------------------------------------------------------------------
# Public functions: put function
# ------------------------------------------------------------------------------
proc layersPut*(db: KvtDbRef; key: openArray[byte]; data: openArray[byte]) =
## Store a (potentally empty) value on the top layer
db.top.delta.sTab[@key] = @data
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
@ -23,23 +71,49 @@ proc layersCc*(db: KvtDbRef; level = high(int)): LayerRef =
## Provide a collapsed copy of layers up to a particular transaction level.
## If the `level` argument is too large, the maximum transaction level is
## returned. For the result layer, the `txUid` value set to `0`.
let level = min(level, db.stack.len)
let layers = if db.stack.len <= level: db.stack & @[db.top]
else: db.stack[0 .. level]
# Merge stack into its bottom layer
if level <= 0 and db.stack.len == 0:
result = LayerRef(delta: LayerDelta(sTab: db.top.delta.sTab))
else:
# now: 0 < level <= db.stack.len
result = LayerRef(delta: LayerDelta(sTab: db.stack[0].delta.sTab))
# Set up initial layer (bottom layer)
result = LayerRef(delta: LayerDelta(sTab: layers[0].delta.sTab))
for n in 1 ..< level:
for (key,val) in db.stack[n].delta.sTab.pairs:
# Consecutively merge other layers on top
for n in 1 ..< layers.len:
for (key,val) in layers[n].delta.sTab.pairs:
result.delta.sTab[key] = val
# Merge top layer if needed
if level == db.stack.len:
# ------------------------------------------------------------------------------
# Public iterators
# ------------------------------------------------------------------------------
iterator layersWalk*(
db: KvtDbRef;
seen: var HashSet[Blob];
): tuple[key: Blob, data: Blob] =
## Walk over all key-value pairs on the cache layers. Note that
## entries are unsorted.
##
## The argument `seen` collects a set of all visited vertex IDs including
## the one with a zero vertex which are othewise skipped by the iterator.
## The `seen` argument must not be modified while the iterator is active.
##
for (key,val) in db.top.delta.sTab.pairs:
result.delta.sTab[key] = val
yield (key,val)
seen.incl key
for w in db.stack.reversed:
for (key,val) in w.delta.sTab.pairs:
if key notin seen:
yield (key,val)
seen.incl key
iterator layersWalk*(
db: KvtDbRef;
): tuple[key: Blob, data: Blob] =
## Variant of `layersWalk()`.
var seen: HashSet[Blob]
for (key,val) in db.layersWalk seen:
yield (key,val)
# ------------------------------------------------------------------------------
# End

View File

@ -14,11 +14,10 @@
{.push raises: [].}
import
std/algorithm,
eth/common,
results,
./kvt_desc/desc_backend,
./kvt_desc
"."/[kvt_desc, kvt_layers]
# ------------------------------------------------------------------------------
# Private helpers
@ -52,7 +51,7 @@ proc put*(
if data.len == 0:
return err(DataInvalid)
db.top.delta.sTab[@key] = @data
db.layersPut(key, data)
ok()
@ -65,22 +64,7 @@ proc del*(
if key.len == 0:
return err(KeyInvalid)
block haveKey:
for w in db.stack.reversed:
if w.delta.sTab.hasKey @key:
break haveKey
# Do this one last as it is the most expensive lookup
let rc = db.getBE key
if rc.isOk:
break haveKey
if rc.error != GetNotFound:
return err(rc.error)
db.top.delta.sTab.del @key # No such key anywhere => delete now
return ok()
db.top.delta.sTab[@key] = EmptyBlob # Mark for deletion
db.layersPut(key, EmptyBlob)
ok()
# ------------
@ -95,19 +79,11 @@ proc get*(
if key.len == 0:
return err(KeyInvalid)
block:
let data = db.top.delta.sTab.getOrVoid @key
if data.isValid:
return ok(data)
let data = db.layersGet(key).valueOr:
return db.getBE key
block:
for w in db.stack.reversed:
let data = w.delta.sTab.getOrVoid @key
if data.isValid:
return ok(data)
db.getBE key
proc hasKey*(
db: KvtDbRef; # Database
@ -119,11 +95,7 @@ proc hasKey*(
if key.len == 0:
return err(KeyInvalid)
if db.top.delta.sTab.hasKey @key:
return ok(true)
for w in db.stack.reversed:
if w.delta.sTab.haskey @key:
if db.layersHasKey @key:
return ok(true)
let rc = db.getBE key

View File

@ -28,10 +28,10 @@ export
iterator walkPairs*[T: MemBackendRef|VoidBackendRef](
_: type T;
db: KvtDbRef;
): tuple[n: int; key: Blob, data: Blob] =
): tuple[key: Blob, data: Blob] =
## Iterate over backend filters.
for (n, vid,vtx) in walkPairsImpl[T](db):
yield (n, vid,vtx)
for (key,data) in walkPairsImpl[T](db):
yield (key,data)
# ------------------------------------------------------------------------------
# End

View File

@ -34,10 +34,10 @@ export
iterator walkPairs*(
T: type RdbBackendRef;
db: KvtDbRef;
): tuple[n: int, key: Blob, data: Blob] =
): tuple[key: Blob, data: Blob] =
## Iterate over backend filters.
for (n, vid,vtx) in walkPairsImpl[T](db):
yield (n, vid,vtx)
for (key,data) in walkPairsImpl[T](db):
yield (key,data)
# ------------------------------------------------------------------------------
# End

View File

@ -9,9 +9,9 @@
# distributed except according to those terms.
import
std/[algorithm, sets, tables],
std/[sets, tables],
eth/common,
".."/[kvt_desc, kvt_init]
".."/[kvt_desc, kvt_init, kvt_layers]
# ------------------------------------------------------------------------------
# Public generic iterators
@ -19,33 +19,20 @@ import
iterator walkPairsImpl*[T](
db: KvtDbRef; # Database with top layer & backend filter
): tuple[n: int, key: Blob, data: Blob] =
): tuple[key: Blob, data: Blob] =
## Walk over all `(VertexID,VertexRef)` in the database. Note that entries
## are unsorted.
var
seen: HashSet[Blob]
i = 0
for (key,data) in db.top.delta.sTab.pairs:
var seen: HashSet[Blob]
for (key,data) in db.layersWalk seen:
if data.isValid:
yield (i,key,data)
i.inc
seen.incl key
for w in db.stack.reversed:
for (key,data) in w.delta.sTab.pairs:
if key notin seen:
if data.isValid:
yield (i,key,data)
i.inc
seen.incl key
yield (key,data)
when T isnot VoidBackendRef:
mixin walk
for (n,key,data) in db.backend.T.walk:
for (_,key,data) in db.backend.T.walk:
if key notin seen and data.isValid:
yield (n+i,key,data)
yield (key,data)
# ------------------------------------------------------------------------------
# End

View File

@ -102,8 +102,8 @@ proc verify(
## ..
let
beSTab = be.walkVtx.toSeq.mapIt((it[1],it[2])).toTable
beKMap = be.walkKey.toSeq.mapIt((it[1],it[2])).toTable
beSTab = be.walkVtx.toSeq.mapIt((it[0],it[1])).toTable
beKMap = be.walkKey.toSeq.mapIt((it[0],it[1])).toTable
for vid in beSTab.keys.toSeq.mapIt(it.uint64).sorted.mapIt(it.VertexID):
let
@ -154,7 +154,7 @@ proc verifyFiltersImpl[T](
): bool =
## Compare stored filters against registered ones
var n = 0
for (_,fid,filter) in be.walkFilBe:
for (fid,filter) in be.walkFilBe:
let
filterHash = filter.hash
registered = tab.getOrDefault(fid, BlindHash)