Aristo db extend filter storage scheduler api (#1725)

* Add backwards index `[]` operator into fifo

also:
  Need another maintenance instruction: The last overflow queue must
  irrevocably delete some item in order to make space for a new one.

* Add re-org scheduler

details:
  Generates instructions how to extract and merge some leading entries

* Add filter ID selector

details:
  This allows to find the next filter now newer that a given filter ID

* Message update
This commit is contained in:
Jordan Hrycaj 2023-08-30 18:08:39 +01:00 committed by GitHub
parent 96fb355efe
commit f177f5bf11
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 519 additions and 173 deletions

View File

@ -86,6 +86,11 @@ proc ppVid(vid: VertexID; pfx = true): string =
else: else:
result &= "ø" result &= "ø"
proc ppFid(fid: FilterID): string =
if not fid.isValid:
return "ø"
"@" & $fid
proc ppQid(qid: QueueID): string = proc ppQid(qid: QueueID): string =
if not qid.isValid: if not qid.isValid:
return "ø" return "ø"
@ -471,6 +476,9 @@ proc pp*(vid: VertexID): string =
proc pp*(qid: QueueID): string = proc pp*(qid: QueueID): string =
qid.ppQid qid.ppQid
proc pp*(fid: FilterID): string =
fid.ppFid
proc pp*(a: openArray[(QueueID,QueueID)]): string = proc pp*(a: openArray[(QueueID,QueueID)]): string =
"[" & a.toSeq.mapIt("(" & it[0].pp & "," & it[1].pp & ")").join(",") & "]" "[" & a.toSeq.mapIt("(" & it[0].pp & "," & it[1].pp & ")").join(",") & "]"

View File

@ -109,6 +109,9 @@ func isValid*(vid: VertexID): bool =
func isValid*(qid: QueueID): bool = func isValid*(qid: QueueID): bool =
qid != QueueID(0) qid != QueueID(0)
func isValid*(fid: FilterID): bool =
fid != FilterID(0)
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# Public functions, miscellaneous # Public functions, miscellaneous
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------

View File

@ -32,6 +32,7 @@ type
SaveQid ## Store new item SaveQid ## Store new item
HoldQid ## Move/append range items to local queue HoldQid ## Move/append range items to local queue
DequQid ## Store merged local queue items DequQid ## Store merged local queue items
DelQid ## Delete entry from last overflow queue
QidLayoutRef* = ref object QidLayoutRef* = ref object
## Layout of cascaded list of filter ID slot queues where a slot queue ## Layout of cascaded list of filter ID slot queues where a slot queue

View File

@ -13,6 +13,20 @@ import
".."/[aristo_constants, aristo_desc], ".."/[aristo_constants, aristo_desc],
./filter_desc ./filter_desc
type
QuFilMap* = proc(qid: QueueID): FilterID {.gcsafe, raises: [].}
## The map `fn: QueueID -> FilterID` can be augmented to a strictly
## *decreasing* map `g: {0 .. N} -> FilterID`, with `g = fn([])`
##
## * `i < j` => `fn(fifo[j]) < fn(fifo[i])`
##
## for a `fifo` of type `QidSchedRef`, `N = fifo.len` and the function
## `[]: {0 .. N} -> QueueID` as defined below.
##
## This *decreasing* requirement can be seen as a generalisation of a
## block chain scenario with `i`, `j` backward steps into the past and
## the `FilterID` as the block number.
const const
ZeroQidPair = (QueueID(0),QueueID(0)) ZeroQidPair = (QueueID(0),QueueID(0))
@ -216,7 +230,7 @@ func fifoDel(
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
proc stats*( proc stats*(
ctx: openArray[tuple[size, width: int]]; ctx: openArray[tuple[size, width: int]]; # Schedule layout
): tuple[maxQueue: int, minCovered: int, maxCovered: int] = ): tuple[maxQueue: int, minCovered: int, maxCovered: int] =
## Number of maximally stored and covered queued entries for the argument ## Number of maximally stored and covered queued entries for the argument
## layout `ctx`. The resulting value of `maxQueue` entry is the maximal ## layout `ctx`. The resulting value of `maxQueue` entry is the maximal
@ -232,30 +246,29 @@ proc stats*(
result.maxCovered += (size * step).int result.maxCovered += (size * step).int
proc stats*( proc stats*(
ctx: openArray[tuple[size, width, wrap: int]]; ctx: openArray[tuple[size, width, wrap: int]]; # Schedule layout
): tuple[maxQueue: int, minCovered: int, maxCovered: int] = ): tuple[maxQueue: int, minCovered: int, maxCovered: int] =
## Variant of `stats()` ## Variant of `stats()`
ctx.toSeq.mapIt((it[0],it[1])).stats ctx.toSeq.mapIt((it[0],it[1])).stats
proc stats*( proc stats*(
ctx: QidLayoutRef; ctx: QidLayoutRef; # Cascaded fifos descriptor
): tuple[maxQueue: int, minCovered: int, maxCovered: int] = ): tuple[maxQueue: int, minCovered: int, maxCovered: int] =
## Variant of `stats()` ## Variant of `stats()`
ctx.q.toSeq.mapIt((it[0].int,it[1].int)).stats ctx.q.toSeq.mapIt((it[0].int,it[1].int)).stats
proc addItem*( proc addItem*(
fifo: QidSchedRef; fifo: QidSchedRef; # Cascaded fifos descriptor
): tuple[exec: seq[QidAction], fifo: QidSchedRef] = ): tuple[exec: seq[QidAction], fifo: QidSchedRef] =
## Get the instructions for adding a new slot to the cascades queues. The ## Get the instructions for adding a new slot to the cascades queues. The
## argument `fifo` is a complete state of the addresses a cascaded *FIFO* ## argument `fifo` is a complete state of the addresses of a cascaded *FIFO*
## when applied to a database. Only the *FIFO* queue addresses are needed ## when applied to a database. Only the *FIFO* queue addresses are needed
## in order to describe how to add another item. ## in order to describe how to add another item.
## ##
## Return value is a list of instructions what to do when adding a new item ## The function returns a list of instructions what to do when adding a new
## and the new state of the cascaded *FIFO*. ## item and the new state of the cascaded *FIFO*. The following instructions
## ## may be returned:
## The following instructions may be returned:
## :: ## ::
## SaveQid <queue-id> -- Store a new item under the address ## SaveQid <queue-id> -- Store a new item under the address
## -- <queue-id> on the database. ## -- <queue-id> on the database.
@ -271,6 +284,10 @@ proc addItem*(
## -- <queue-id> on the database. Clear the ## -- <queue-id> on the database. Clear the
## -- the hold queue. ## -- the hold queue.
## ##
## DelQid <queue-id> -- Delete item. This happens if the last
## -- oberflow queue needs to make space for
## -- another item.
##
let let
ctx = fifo.ctx.q ctx = fifo.ctx.q
var var
@ -280,7 +297,7 @@ proc addItem*(
for n in 0 ..< ctx.len: for n in 0 ..< ctx.len:
if state.len < n + 1: if state.len < n + 1:
state.setlen(n + 1) state.setLen(n + 1)
let let
overlapWidth = ctx[(n+1) mod ctx.len].width overlapWidth = ctx[(n+1) mod ctx.len].width
@ -296,6 +313,7 @@ proc addItem*(
revActions.add QidAction(op: SaveQid, qid: qQidAdded) revActions.add QidAction(op: SaveQid, qid: qQidAdded)
if 0 < deferred.len: if 0 < deferred.len:
revActions &= deferred revActions &= deferred
deferred.setLen(0)
break break
else: else:
@ -324,8 +342,241 @@ proc addItem*(
# End loop # End loop
# Delete item from final overflow queue. There is only one as `overlapWidth`
# is `ctx[0]` which is `0`
if 0 < deferred.len:
revActions.add QidAction(
op: DelQid,
qid: deferred[0].qid)
(revActions.reversed, QidSchedRef(ctx: fifo.ctx, state: state)) (revActions.reversed, QidSchedRef(ctx: fifo.ctx, state: state))
proc fetchItems*(
fifo: QidSchedRef; # Cascaded fifos descriptor
size: int; # Leading items to merge
): tuple[exec: seq[QidAction], fifo: QidSchedRef] =
## Get the instructions for extracting the latest `size` items from the
## cascaded queues. argument `fifo` is a complete state of the addresses of
## a cascaded *FIFO* when applied to a database. Only the *FIFO* queue
## addresses are used in order to describe how to add another item.
##
## The function returns a list of instructions what to do when adding a new
## item and the new state of the cascaded *FIFO*. The following instructions
## may be returned:
## ::
## HoldQid <from-id>..<to-id> -- Move the records accessed by the argument
## -- addresses from the database to the right
## -- end of the local hold queue. The age of
## -- the items on the hold queue increases
## -- left to right.
##
## The extracted items will then be available from the hold queue.
var
actions: seq[QidAction]
state = fifo.state
if 0 < size:
var size = size.uint64
for n in 0 ..< fifo.state.len:
let q = fifo.state[n]
if q[0] == 0:
discard
elif q[0] <= q[1]:
# Single file
# ::
# | :
# | q[0]--> 3
# | 4
# | 5 <--q[1]
# | :
#
let qSize = q[1] - q[0] + 1
if size <= qSize:
if size < qSize:
state[n][1] = q[1] - size
elif state.len == n + 1:
state.setLen(n)
else:
state[n] = (QueueID(0), QueueID(0))
actions.add QidAction(
op: HoldQid,
qid: n.globalQid(q[1] - size + 1),
xid: n.globalQid q[1])
break
actions.add QidAction(
op: HoldQid,
qid: n.globalQid q[0],
xid: n.globalQid q[1])
state[n] = (QueueID(0), QueueID(0))
size -= qSize # Otherwise continue
else:
# Wrap aound, double files
# ::
# | :
# | 3 <--q[1]
# | 4
# | q[0]--> 5
# | :
# | wrap
let
wrap = fifo.ctx.q[n].wrap
qSize1 = q[1] - QueueID(0)
if size <= qSize1:
if size == qSize1:
state[n][1] = wrap
else:
state[n][1] = q[1] - size
actions.add QidAction(
op: HoldQid,
qid: n.globalQid(q[1] - size + 1),
xid: n.globalQid q[1])
break
actions.add QidAction(
op: HoldQid,
qid: n.globalQid QueueID(1),
xid: n.globalQid q[1])
size -= qSize1 # Otherwise continue
let qSize0 = wrap - q[0] + 1
if size <= qSize0:
if size < qSize0:
state[n][1] = wrap - size
elif state.len == n + 1:
state.setLen(n)
else:
state[n] = (QueueID(0), QueueID(0))
actions.add QidAction(
op: HoldQid,
qid: n.globalQid wrap - size + 1,
xid: n.globalQid wrap)
break
actions.add QidAction(
op: HoldQid,
qid: n.globalQid q[0],
xid: n.globalQid wrap)
size -= qSize0
state[n] = (QueueID(0), QueueID(0))
(actions, QidSchedRef(ctx: fifo.ctx, state: state))
proc lengths*(
fifo: QidSchedRef; # Cascaded fifos descriptor
): seq[int] =
## Return the list of lengths for all cascaded sub-fifos.
for n in 0 ..< fifo.state.len:
result.add fifo.state[n].fifoLen(fifo.ctx.q[n].wrap).int
proc len*(
fifo: QidSchedRef; # Cascaded fifos descriptor
): int =
## Size of the fifo
fifo.lengths.foldl(a + b, 0)
proc `[]`*(
fifo: QidSchedRef; # Cascaded fifos descriptor
inx: int; # Index into latest items
): QueueID =
## Get the queue ID of the `inx`-th `fifo` entry where index `0` refers to
## the entry most recently added, `1` the one before, etc. If there is no
## such entry `QueueID(0)` is returned.
if 0 <= inx:
var inx = inx.uint64
for n in 0 ..< fifo.state.len:
let q = fifo.state[n]
if q[0] == 0:
discard
elif q[0] <= q[1]:
# Single file
# ::
# | :
# | q[0]--> 3
# | 4
# | 5 <--q[1]
# | :
#
let qInxMax = q[1] - q[0]
if inx <= qInxMax:
return n.globalQid(q[1] - inx)
inx -= qInxMax + 1 # Otherwise continue
else:
# Wrap aound, double files
# ::
# | :
# | 3 <--q[1]
# | 4
# | q[0]--> 5
# | :
# | wrap
let qInxMax1 = q[1] - QueueID(1)
if inx <= qInxMax1:
return n.globalQid(q[1] - inx)
inx -= qInxMax1 + 1 # Otherwise continue
let
wrap = fifo.ctx.q[n].wrap
qInxMax0 = wrap - q[0]
if inx <= qInxMax0:
return n.globalQid(wrap - inx)
inx -= qInxMax0 + 1 # Otherwise continue
proc le*(
fifo: QidSchedRef; # Cascaded fifos descriptor
fid: FilterID; # Upper bound
fn: QuFilMap; # QueueID/FilterID mapping
): QueueID =
## Find the `qid` address of type `QueueID` with
## * `fn(qid) <= fid`
## * for all `qid1` with `fn(qid1) <= fid` one has `fn(qid1) <= fn(qid)`
##
## The argument type `QuFilMap` of map `fn()` has been commented on earlier.
##
var
left = 0
right = fifo.len - 1
if 0 <= right:
let maxQid = fifo[left]
if maxQid.fn <= fid:
return maxQid
# Bisection
if fifo[right].fn <= fid:
while 1 < right - left:
let half = (left + right) div 2
#
# FilterID: 100 70 33
# inx: left ... half ... right
# fid: 77
#
# with `fifo[left].fn > fid >= fifo[right].fn`
#
if fid >= fifo[half].fn:
right = half
else: # fifo[half].fn > fid
left = half
# Now: `fifo[right].fn <= fid < fifo[left].fn` (and `right == left+1`)
return fifo[right]
# otherwise QueueID(0)
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# End # End
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------

View File

@ -83,7 +83,7 @@ proc miscRunner(
test "VertexID recyling lists": test "VertexID recyling lists":
check noisy.testVidRecycleLists() check noisy.testVidRecycleLists()
test &"QueueID slot management (sample size: {qidSampleSize})": test &"QueueID cascaded fifos API (sample size: {qidSampleSize})":
check noisy.testQidScheduler(sampleSize = qidSampleSize) check noisy.testQidScheduler(sampleSize = qidSampleSize)

View File

@ -12,7 +12,7 @@
## Aristo (aka Patricia) DB trancoder test ## Aristo (aka Patricia) DB trancoder test
import import
std/[algorithm, sequtils, strutils], std/[algorithm, sequtils, sets, strutils],
eth/common, eth/common,
results, results,
stew/byteutils, stew/byteutils,
@ -31,7 +31,7 @@ type
fid: FilterID fid: FilterID
width: uint32 width: uint32
QTab = Table[QueueID,QValRef] QTabRef = TableRef[QueueID,QValRef]
const const
QidSlotLyo = [(4,0,10),(3,3,10),(3,4,10),(3,5,10)] QidSlotLyo = [(4,0,10),(3,3,10),(3,4,10),(3,5,10)]
@ -43,11 +43,19 @@ const
# Private helpers # Private helpers
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
template trueOrReturn(expr: untyped): untyped = template xCheck(expr: untyped): untyped =
## Note: this check will invoke `expr` twice
if not (expr): if not (expr):
check expr check expr
return return
template xCheck(expr: untyped; ifFalse: untyped): untyped =
## Note: this check will invoke `expr` twice
if not (expr):
ifFalse
check expr
return
# --------------------- # ---------------------
proc posixPrngRand(state: var uint32): byte = proc posixPrngRand(state: var uint32): byte =
@ -92,28 +100,30 @@ proc `+`(a: VertexID, b: int): VertexID =
# --------------------- # ---------------------
func sortedPairs(t: QTab): seq[(QueueID,QValRef)] = func sortedPairs(qt: QTabRef): seq[(QueueID,QValRef)] =
t.keys.toSeq.mapIt(it.uint64).sorted.mapIt(it.QueueID).mapIt((it,t[it])) qt.keys.toSeq.mapIt(it.uint64).sorted.mapIt(it.QueueID).mapIt((it,qt[it]))
func fifos(qt: QTab; scd: QidSchedRef): seq[seq[(QueueID,QValRef)]] = func fifos(qt: QTabRef; scd: QidSchedRef): seq[seq[(QueueID,QValRef)]] =
proc kvp(chn: int, qid: QueueID): (QueueID,QValRef) = proc kvp(chn: int, qid: QueueID): (QueueID,QValRef) =
let let
cid = QueueID((chn.uint64 shl 62) or qid.uint64) cid = QueueID((chn.uint64 shl 62) or qid.uint64)
val = qt.getOrDefault(cid, QValRef(nil)) val = qt.getOrDefault(cid, QValRef(nil))
(qid, val) (cid, val)
for i in 0 ..< scd.state.len: for i in 0 ..< scd.state.len:
let let
left = scd.state[i][0] left = scd.state[i][0]
right = scd.state[i][1] right = scd.state[i][1]
result.add newSeq[(QueueID,QValRef)](0) result.add newSeq[(QueueID,QValRef)](0)
if left <= right: if left == 0:
for j in left .. right: discard
elif left <= right:
for j in right.countDown left:
result[i].add kvp(i, j) result[i].add kvp(i, j)
else: else:
for j in left .. scd.ctx.q[i].wrap: for j in right.countDown QueueID(1):
result[i].add kvp(i, j) result[i].add kvp(i, j)
for j in QueueID(1) .. right: for j in scd.ctx.q[i].wrap.countDown left:
result[i].add kvp(i, j) result[i].add kvp(i, j)
func flatten(a: seq[seq[(QueueID,QValRef)]]): seq[(QueueID,QValRef)] = func flatten(a: seq[seq[(QueueID,QValRef)]]): seq[(QueueID,QValRef)] =
@ -130,11 +140,125 @@ func pp(val: QValRef): string =
func pp(kvp: (QueueID,QValRef)): string = func pp(kvp: (QueueID,QValRef)): string =
kvp[0].pp & "=" & kvp[1].pp kvp[0].pp & "=" & kvp[1].pp
func pp(t: QTab): string = func pp(qt: QTabRef): string =
"{" & t.sortedPairs.mapIt(it.pp).join(",") & "}" "{" & qt.sortedPairs.mapIt(it.pp).join(",") & "}"
func pp(t: QTab; scd: QidSchedRef): string = func pp(qt: QTabRef; scd: QidSchedRef): string =
"[" & t.fifos(scd).flatten.mapIt(it.pp).join(",") & "]" result = "["
for w in qt.fifos scd:
if w.len == 0:
result &= "ø"
else:
result &= w.mapIt(it.pp).join(",")
result &= ","
if result[^1] == ',':
result[^1] = ']'
else:
result &= "]"
# ------------------
proc exec(db: QTabRef; serial: int; instr: seq[QidAction]; relax: bool): bool =
## ..
var
saved: bool
hold: seq[(QueueID,QueueID)]
for act in instr:
case act.op:
of Oops:
xCheck act.op != Oops
of SaveQid:
xCheck not saved
db[act.qid] = QValRef(fid: FilterID(serial))
saved = true
of DelQid:
let val = db.getOrDefault(act.qid, QValRef(nil))
xCheck not val.isNil
db.del act.qid
of HoldQid:
hold.add (act.qid, act.xid)
of DequQid:
var merged = QValRef(nil)
for w in hold:
for qid in w[0] .. w[1]:
let val = db.getOrDefault(qid, QValRef(nil))
if not relax:
xCheck not val.isNil
if not val.isNil:
if merged.isNil:
merged = val
else:
if relax:
xCheck merged.fid + merged.width + 1 <= val.fid
else:
xCheck merged.fid + merged.width + 1 == val.fid
merged.width += val.width + 1
db.del qid
if not relax:
xCheck not merged.isNil
if not merged.isNil:
db[act.qid] = merged
hold.setLen(0)
xCheck saved
xCheck hold.len == 0
true
proc validate(db: QTabRef; scd: QidSchedRef; serial: int; relax: bool): bool =
## Verify that the round-robin queues in `db` are consecutive and in the
## right order.
var
step = 1u
lastVal = FilterID(serial+1)
for chn,queue in db.fifos scd:
step *= scd.ctx.q[chn].width + 1 # defined by schedule layout
for kvp in queue:
let (qid,val) = (kvp[0], kvp[1])
if not relax:
xCheck not val.isNil # Entries must exist
xCheck val.fid + step == lastVal # Item distances must match
if not val.isNil:
xCheck val.fid + step <= lastVal # Item distances must decrease
xCheck val.width + 1 == step # Must correspond to `step` size
lastVal = val.fid
# Compare database against expected fill state
if relax:
xCheck db.len <= scd.len
else:
xCheck db.len == scd.len
proc qFn(qid: QueueID): FilterID =
let val = db.getOrDefault(qid, QValRef(nil))
if not val.isNil:
return val.fid
# Test filter ID selection
var lastFid = FilterID(serial + 1)
xCheck scd.le(lastFid + 0, qFn) == scd[0] # Test fringe condition
xCheck scd.le(lastFid + 1, qFn) == scd[0] # Test fringe condition
for (qid,val) in db.fifos(scd).flatten:
for w in (lastFid-1).countDown val.fid:
xCheck scd.le(w, qFn) == qid
lastFid = val.fid
if FilterID(1) < lastFid: # Test fringe condition
xCheck scd.le(lastFid - 1, qFn) == QueueID(0)
if FilterID(2) < lastFid: # Test fringe condition
xCheck scd.le(lastFid - 2, qFn) == QueueID(0)
true
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# Public test function # Public test function
@ -161,7 +285,7 @@ proc testVidRecycleLists*(noisy = true; seed = 42): bool =
expectedVids += (vid < first).ord expectedVids += (vid < first).ord
db.vidDispose vid db.vidDispose vid
trueOrReturn db.top.vGen.len == expectedVids xCheck db.top.vGen.len == expectedVids
noisy.say "***", "vids=", db.top.vGen.len, " discarded=", count-expectedVids noisy.say "***", "vids=", db.top.vGen.len, " discarded=", count-expectedVids
# Serialise/deserialise # Serialise/deserialise
@ -173,43 +297,43 @@ proc testVidRecycleLists*(noisy = true; seed = 42): bool =
db1 = newAristoDbRef BackendVoid db1 = newAristoDbRef BackendVoid
rc = dbBlob.deblobify seq[VertexID] rc = dbBlob.deblobify seq[VertexID]
if rc.isErr: if rc.isErr:
trueOrReturn rc.error == AristoError(0) xCheck rc.error == AristoError(0)
else: else:
db1.top.vGen = rc.value db1.top.vGen = rc.value
trueOrReturn db.top.vGen == db1.top.vGen xCheck db.top.vGen == db1.top.vGen
# Make sure that recycled numbers are fetched first # Make sure that recycled numbers are fetched first
let topVid = db.top.vGen[^1] let topVid = db.top.vGen[^1]
while 1 < db.top.vGen.len: while 1 < db.top.vGen.len:
let w = db.vidFetch() let w = db.vidFetch()
trueOrReturn w < topVid xCheck w < topVid
trueOrReturn db.top.vGen.len == 1 and db.top.vGen[0] == topVid xCheck db.top.vGen.len == 1 and db.top.vGen[0] == topVid
# Get some consecutive vertex IDs # Get some consecutive vertex IDs
for n in 0 .. 5: for n in 0 .. 5:
let w = db.vidFetch() let w = db.vidFetch()
trueOrReturn w == topVid + n xCheck w == topVid + n
trueOrReturn db.top.vGen.len == 1 xCheck db.top.vGen.len == 1
# Repeat last test after clearing the cache # Repeat last test after clearing the cache
db.top.vGen.setLen(0) db.top.vGen.setLen(0)
for n in 0 .. 5: for n in 0 .. 5:
let w = db.vidFetch() let w = db.vidFetch()
trueOrReturn w == VertexID(2) + n # VertexID(1) is default root ID xCheck w == VertexID(2) + n # VertexID(1) is default root ID
trueOrReturn db.top.vGen.len == 1 xCheck db.top.vGen.len == 1
# Recycling and re-org tests # Recycling and re-org tests
func toVQ(a: seq[int]): seq[VertexID] = a.mapIt(VertexID(it)) func toVQ(a: seq[int]): seq[VertexID] = a.mapIt(VertexID(it))
trueOrReturn @[8, 7, 3, 4, 5, 9] .toVQ.vidReorg == @[3, 4, 5, 7] .toVQ xCheck @[8, 7, 3, 4, 5, 9] .toVQ.vidReorg == @[3, 4, 5, 7] .toVQ
trueOrReturn @[8, 7, 6, 3, 4, 5, 9] .toVQ.vidReorg == @[3] .toVQ xCheck @[8, 7, 6, 3, 4, 5, 9] .toVQ.vidReorg == @[3] .toVQ
trueOrReturn @[5, 4, 3, 7] .toVQ.vidReorg == @[5, 4, 3, 7] .toVQ xCheck @[5, 4, 3, 7] .toVQ.vidReorg == @[5, 4, 3, 7] .toVQ
trueOrReturn @[5] .toVQ.vidReorg == @[5] .toVQ xCheck @[5] .toVQ.vidReorg == @[5] .toVQ
trueOrReturn @[3, 5] .toVQ.vidReorg == @[3, 5] .toVQ xCheck @[3, 5] .toVQ.vidReorg == @[3, 5] .toVQ
trueOrReturn @[4, 5] .toVQ.vidReorg == @[4] .toVQ xCheck @[4, 5] .toVQ.vidReorg == @[4] .toVQ
trueOrReturn newSeq[VertexID](0).vidReorg().len == 0 xCheck newSeq[VertexID](0).vidReorg().len == 0
true true
@ -218,6 +342,7 @@ proc testQidScheduler*(
noisy = true; noisy = true;
layout = QidSlotLyo; layout = QidSlotLyo;
sampleSize = QidSample; sampleSize = QidSample;
reorgPercent = 40
): bool = ): bool =
## ##
## Example table for `QidSlotLyo` layout after 10_000 cycles ## Example table for `QidSlotLyo` layout after 10_000 cycles
@ -225,159 +350,117 @@ proc testQidScheduler*(
## QueueID | QValRef | ## QueueID | QValRef |
## | FilterID | width | comment ## | FilterID | width | comment
## --------+----------+-------+---------------------------------- ## --------+----------+-------+----------------------------------
## %7 | 9997 | 0 | %7 stands for QueueID(7) ## %a | 10000 | 0 | %a stands for QueueID(10)
## %8 | 9998 | 0 |
## %9 | 9999 | 0 | ## %9 | 9999 | 0 |
## %a | 10000 | 0 | ## %8 | 9998 | 0 |
## %7 | 9997 | 0 |
## | | | ## | | |
## %1:6 | 9981 | 3 | %1:6 stands for QueueID((1 shl 62) + 6)
## %1:7 | 9985 | 3 |
## %1:8 | 9989 | 3 |
## %1:9 | 9993 | 3 | 9993 + 3 + 1 => 9997, see %7 ## %1:9 | 9993 | 3 | 9993 + 3 + 1 => 9997, see %7
## %1:8 | 9989 | 3 |
## %1:7 | 9985 | 3 |
## %1:6 | 9981 | 3 | %1:6 stands for QueueID((1 shl 62) + 6)
## | | | ## | | |
## %2:3 | 9841 | 19 |
## %2:4 | 9861 | 19 |
## %2:5 | 9881 | 19 |
## %2:6 | 9901 | 19 |
## %2:7 | 9921 | 19 |
## %2:8 | 9941 | 19 |
## %2:9 | 9961 | 19 | 9961 + 19 + 1 => 9981, see %1:6 ## %2:9 | 9961 | 19 | 9961 + 19 + 1 => 9981, see %1:6
## %2:8 | 9941 | 19 |
## %2:7 | 9921 | 19 |
## %2:6 | 9901 | 19 |
## %2:5 | 9881 | 19 |
## %2:4 | 9861 | 19 |
## %2:3 | 9841 | 19 |
## | | | ## | | |
## %3:a | 9481 | 119 |
## %3:1 | 9601 | 119 |
## %3:2 | 9721 | 119 | 9721 + 119 + 1 => 9871, see %2:3 ## %3:2 | 9721 | 119 | 9721 + 119 + 1 => 9871, see %2:3
## %3:1 | 9601 | 119 |
## %3:a | 9481 | 119 |
## ##
var var
list: Qtab
debug = false # or true debug = false # or true
let let
list = newTable[QueueID,QValRef]()
scd = QidSchedRef.init layout scd = QidSchedRef.init layout
ctx = scd.ctx.q ctx = scd.ctx.q
proc show(serial = 0; exec: seq[QidAction] = @[]) =
var s = ""
if 0 < serial:
s &= "n=" & $serial
if 0 < exec.len:
s &= " exec=" & exec.pp
s &= "" &
"\n state=" & scd.state.pp &
"\n list=" & list.pp &
"\n fifo=" & list.pp(scd) &
"\n"
noisy.say "***", s
if debug: if debug:
noisy.say "***", "testFilterSchedule", noisy.say "***", "sampleSize=", sampleSize,
" ctx=", ctx, " ctx=", ctx, " stats=", scd.ctx.stats
" stats=", scd.ctx.stats
for n in 1 .. sampleSize: for n in 1 .. sampleSize:
let w = scd.addItem() let w = scd.addItem()
let execOk = list.exec(serial=n, instr=w.exec, relax=false)
if debug and false: xCheck execOk
noisy.say "***", "testFilterSchedule",
" n=", n,
" => ", w.exec.pp,
" / ", w.fifo.state.pp
var
saved = false
hold: seq[(QueueID,QueueID)]
for act in w.exec:
case act.op:
of Oops:
noisy.say "***", "testFilterSchedule", " n=", n, " act=", act.pp
of SaveQid:
if saved:
noisy.say "***", "testFilterSchedule", " n=", n, " act=", act.pp,
" hold=", hold.pp, " state=", scd.state.pp, " fifo=", list.pp scd
check not saved
return
list[act.qid] = QValRef(fid: FilterID(n))
saved = true
of HoldQid:
hold.add (act.qid, act.xid)
of DequQid:
var merged = QValRef(nil)
for w in hold:
for qid in w[0] .. w[1]:
let val = list.getOrDefault(qid, QValRef(nil))
if val.isNil:
noisy.say "***", "testFilterSchedule", " n=", n, " act=", act.pp,
" hold=", hold.pp, " state=", scd.state.pp, " fifo=", list.pp scd
check not val.isNil
return
if merged.isNil:
merged = val
elif merged.fid + merged.width + 1 == val.fid:
merged.width += val.width + 1
else:
noisy.say "***", "testFilterSchedule", " n=", n, " act=", act.pp,
" hold=", hold.pp, " state=", scd.state.pp, " fifo=", list.pp scd
check merged.fid + merged.width + 1 == val.fid
return
list.del qid
if merged.isNil:
noisy.say "***", "testFilterSchedule", " n=", n, " act=", act.pp,
" hold=", hold.pp, " state=", scd.state.pp, " fifo=", list.pp scd
check not merged.isNil
return
list[act.qid] = merged
hold.setLen(0)
scd[] = w.fifo[] scd[] = w.fifo[]
let validateOk = list.validate(scd, serial=n, relax=false)
xCheck validateOk:
show(serial=n, exec=w.exec)
# Verify that the round-robin queues in `list` are consecutive and in the let fifoID = list.fifos(scd).flatten.mapIt(it[0])
# right order. for j in 0 ..< list.len:
var xCheck fifoID[j] == scd[j]:
botVal = FilterID(0) noisy.say "***", "n=", n, " exec=", w.exec.pp,
step = 1u " fifoID[", j, "]=", fifoID[j].pp,
for chn,queue in list.fifos scd: " scd[", j, "]=", scd[j].pp,
var lastVal = FilterID(0) "\n fifo=", list.pp scd
step *= ctx[chn].width + 1 # defined by schedule layout
for kvp in queue:
let (qid,val) = (kvp[0], kvp[1])
# Entries must exist
if val.isNil:
noisy.say "***", "testFilterSchedule", " n=", n, " chn=", chn,
" exec=", w.exec.pp, " kvp=", kvp.pp, " fifo=", list.pp scd
check not val.isNil
return
# Fid value fields must increase witin a sub-queue.
if val.fid <= lastVal:
noisy.say "***", "testFilterSchedule", " n=", n, " chn=", chn,
" exec=", w.exec.pp, " kvp=", kvp.pp, " fifo=", list.pp scd
check lastVal < val.fid
return
# Width value must correspond to `step` size
if val.width + 1 != step:
noisy.say "***", "testFilterSchedule", " n=", n, " chn=", chn,
" exec=", w.exec.pp, " kvp=", kvp.pp, " fifo=", list.pp scd
check val.width + 1 == step
return
# Item distances must match the step width
if lastVal != 0:
let dist = val.fid - lastVal
if dist != step.uint64:
noisy.say "***", "testFilterSchedule", " n=", n, " chn=", chn,
" exec=", w.exec.pp, " kvp=", kvp.pp, " fifo=", list.pp scd
check dist == step.uint64
return
lastVal = val.fid
# The top value of the current queue must be smaller than the
# bottom value of the previous one
if 0 < chn and botVal != queue[^1][1].fid + step.uint64:
noisy.say "***", "testFilterSchedule", " n=", n, " chn=", chn,
" exec=", w.exec.pp, " step=", step, " fifo=", list.pp scd
check botVal == queue[^1][1].fid + step.uint64
return
botVal = queue[0][1].fid
if debug: if debug:
noisy.say "***", "testFilterSchedule", show(exec=w.exec)
" n=", n,
"\n exec=", w.exec.pp, # -------------------
"\n state=", scd.state.pp,
"\n list=", list.pp, # Mark deleted some entries from database
"\n fifo=", list.pp scd, var
"\n" nDel = (list.len * reorgPercent) div 100
delIDs: HashSet[QueueID]
for n in 0 ..< nDel:
delIDs.incl scd[n]
# Delete these entries
let fetch = scd.fetchItems nDel
for act in fetch.exec:
xCheck act.op == HoldQid
for qid in act.qid .. act.xid:
xCheck qid in delIDs
xCheck list.hasKey qid
delIDs.excl qid
list.del qid
xCheck delIDs.len == 0
scd[] = fetch.fifo[]
# -------------------
# Continue adding items
for n in sampleSize + 1 .. 2 * sampleSize:
let w = scd.addItem()
let execOk = list.exec(serial=n, instr=w.exec, relax=true)
xCheck execOk
scd[] = w.fifo[]
let validateOk = list.validate(scd, serial=n, relax=true)
xCheck validateOk:
show(serial=n, exec=w.exec)
# Continue adding items, now strictly
for n in 2 * sampleSize + 1 .. 3 * sampleSize:
let w = scd.addItem()
let execOk = list.exec(serial=n, instr=w.exec, relax=false)
xCheck execOk
scd[] = w.fifo[]
let validateOk = list.validate(scd, serial=n, relax=false)
xCheck validateOk
if debug: # or true:
show()
true true