diff --git a/nimbus/db/aristo/aristo_debug.nim b/nimbus/db/aristo/aristo_debug.nim index dd097a315..70aa43b9b 100644 --- a/nimbus/db/aristo/aristo_debug.nim +++ b/nimbus/db/aristo/aristo_debug.nim @@ -86,6 +86,11 @@ proc ppVid(vid: VertexID; pfx = true): string = else: result &= "ø" +proc ppFid(fid: FilterID): string = + if not fid.isValid: + return "ø" + "@" & $fid + proc ppQid(qid: QueueID): string = if not qid.isValid: return "ø" @@ -471,6 +476,9 @@ proc pp*(vid: VertexID): string = proc pp*(qid: QueueID): string = qid.ppQid +proc pp*(fid: FilterID): string = + fid.ppFid + proc pp*(a: openArray[(QueueID,QueueID)]): string = "[" & a.toSeq.mapIt("(" & it[0].pp & "," & it[1].pp & ")").join(",") & "]" diff --git a/nimbus/db/aristo/aristo_desc.nim b/nimbus/db/aristo/aristo_desc.nim index a4805c905..4580a6441 100644 --- a/nimbus/db/aristo/aristo_desc.nim +++ b/nimbus/db/aristo/aristo_desc.nim @@ -109,6 +109,9 @@ func isValid*(vid: VertexID): bool = func isValid*(qid: QueueID): bool = qid != QueueID(0) +func isValid*(fid: FilterID): bool = + fid != FilterID(0) + # ------------------------------------------------------------------------------ # Public functions, miscellaneous # ------------------------------------------------------------------------------ diff --git a/nimbus/db/aristo/aristo_filter/filter_desc.nim b/nimbus/db/aristo/aristo_filter/filter_desc.nim index 525f520c6..4857e0aaf 100644 --- a/nimbus/db/aristo/aristo_filter/filter_desc.nim +++ b/nimbus/db/aristo/aristo_filter/filter_desc.nim @@ -32,6 +32,7 @@ type SaveQid ## Store new item HoldQid ## Move/append range items to local queue DequQid ## Store merged local queue items + DelQid ## Delete entry from last overflow queue QidLayoutRef* = ref object ## Layout of cascaded list of filter ID slot queues where a slot queue diff --git a/nimbus/db/aristo/aristo_filter/filter_scheduler.nim b/nimbus/db/aristo/aristo_filter/filter_scheduler.nim index a42cd8c29..301157811 100644 --- a/nimbus/db/aristo/aristo_filter/filter_scheduler.nim +++ b/nimbus/db/aristo/aristo_filter/filter_scheduler.nim @@ -13,6 +13,20 @@ import ".."/[aristo_constants, aristo_desc], ./filter_desc +type + QuFilMap* = proc(qid: QueueID): FilterID {.gcsafe, raises: [].} + ## The map `fn: QueueID -> FilterID` can be augmented to a strictly + ## *decreasing* map `g: {0 .. N} -> FilterID`, with `g = fn([])` + ## + ## * `i < j` => `fn(fifo[j]) < fn(fifo[i])` + ## + ## for a `fifo` of type `QidSchedRef`, `N = fifo.len` and the function + ## `[]: {0 .. N} -> QueueID` as defined below. + ## + ## This *decreasing* requirement can be seen as a generalisation of a + ## block chain scenario with `i`, `j` backward steps into the past and + ## the `FilterID` as the block number. + const ZeroQidPair = (QueueID(0),QueueID(0)) @@ -216,7 +230,7 @@ func fifoDel( # ------------------------------------------------------------------------------ proc stats*( - ctx: openArray[tuple[size, width: int]]; + ctx: openArray[tuple[size, width: int]]; # Schedule layout ): tuple[maxQueue: int, minCovered: int, maxCovered: int] = ## Number of maximally stored and covered queued entries for the argument ## layout `ctx`. The resulting value of `maxQueue` entry is the maximal @@ -232,30 +246,29 @@ proc stats*( result.maxCovered += (size * step).int proc stats*( - ctx: openArray[tuple[size, width, wrap: int]]; + ctx: openArray[tuple[size, width, wrap: int]]; # Schedule layout ): tuple[maxQueue: int, minCovered: int, maxCovered: int] = ## Variant of `stats()` ctx.toSeq.mapIt((it[0],it[1])).stats proc stats*( - ctx: QidLayoutRef; + ctx: QidLayoutRef; # Cascaded fifos descriptor ): tuple[maxQueue: int, minCovered: int, maxCovered: int] = ## Variant of `stats()` ctx.q.toSeq.mapIt((it[0].int,it[1].int)).stats proc addItem*( - fifo: QidSchedRef; + fifo: QidSchedRef; # Cascaded fifos descriptor ): tuple[exec: seq[QidAction], fifo: QidSchedRef] = ## Get the instructions for adding a new slot to the cascades queues. The - ## argument `fifo` is a complete state of the addresses a cascaded *FIFO* + ## argument `fifo` is a complete state of the addresses of a cascaded *FIFO* ## when applied to a database. Only the *FIFO* queue addresses are needed ## in order to describe how to add another item. ## - ## Return value is a list of instructions what to do when adding a new item - ## and the new state of the cascaded *FIFO*. - ## - ## The following instructions may be returned: + ## The function returns a list of instructions what to do when adding a new + ## item and the new state of the cascaded *FIFO*. The following instructions + ## may be returned: ## :: ## SaveQid -- Store a new item under the address ## -- on the database. @@ -271,6 +284,10 @@ proc addItem*( ## -- on the database. Clear the ## -- the hold queue. ## + ## DelQid -- Delete item. This happens if the last + ## -- oberflow queue needs to make space for + ## -- another item. + ## let ctx = fifo.ctx.q var @@ -280,7 +297,7 @@ proc addItem*( for n in 0 ..< ctx.len: if state.len < n + 1: - state.setlen(n + 1) + state.setLen(n + 1) let overlapWidth = ctx[(n+1) mod ctx.len].width @@ -296,6 +313,7 @@ proc addItem*( revActions.add QidAction(op: SaveQid, qid: qQidAdded) if 0 < deferred.len: revActions &= deferred + deferred.setLen(0) break else: @@ -324,8 +342,241 @@ proc addItem*( # End loop + # Delete item from final overflow queue. There is only one as `overlapWidth` + # is `ctx[0]` which is `0` + if 0 < deferred.len: + revActions.add QidAction( + op: DelQid, + qid: deferred[0].qid) + (revActions.reversed, QidSchedRef(ctx: fifo.ctx, state: state)) + +proc fetchItems*( + fifo: QidSchedRef; # Cascaded fifos descriptor + size: int; # Leading items to merge + ): tuple[exec: seq[QidAction], fifo: QidSchedRef] = + ## Get the instructions for extracting the latest `size` items from the + ## cascaded queues. argument `fifo` is a complete state of the addresses of + ## a cascaded *FIFO* when applied to a database. Only the *FIFO* queue + ## addresses are used in order to describe how to add another item. + ## + ## The function returns a list of instructions what to do when adding a new + ## item and the new state of the cascaded *FIFO*. The following instructions + ## may be returned: + ## :: + ## HoldQid .. -- Move the records accessed by the argument + ## -- addresses from the database to the right + ## -- end of the local hold queue. The age of + ## -- the items on the hold queue increases + ## -- left to right. + ## + ## The extracted items will then be available from the hold queue. + var + actions: seq[QidAction] + state = fifo.state + + if 0 < size: + var size = size.uint64 + + for n in 0 ..< fifo.state.len: + let q = fifo.state[n] + if q[0] == 0: + discard + + elif q[0] <= q[1]: + # Single file + # :: + # | : + # | q[0]--> 3 + # | 4 + # | 5 <--q[1] + # | : + # + let qSize = q[1] - q[0] + 1 + + if size <= qSize: + if size < qSize: + state[n][1] = q[1] - size + elif state.len == n + 1: + state.setLen(n) + else: + state[n] = (QueueID(0), QueueID(0)) + actions.add QidAction( + op: HoldQid, + qid: n.globalQid(q[1] - size + 1), + xid: n.globalQid q[1]) + break + + actions.add QidAction( + op: HoldQid, + qid: n.globalQid q[0], + xid: n.globalQid q[1]) + state[n] = (QueueID(0), QueueID(0)) + + size -= qSize # Otherwise continue + + else: + # Wrap aound, double files + # :: + # | : + # | 3 <--q[1] + # | 4 + # | q[0]--> 5 + # | : + # | wrap + let + wrap = fifo.ctx.q[n].wrap + qSize1 = q[1] - QueueID(0) + + if size <= qSize1: + if size == qSize1: + state[n][1] = wrap + else: + state[n][1] = q[1] - size + actions.add QidAction( + op: HoldQid, + qid: n.globalQid(q[1] - size + 1), + xid: n.globalQid q[1]) + break + + actions.add QidAction( + op: HoldQid, + qid: n.globalQid QueueID(1), + xid: n.globalQid q[1]) + size -= qSize1 # Otherwise continue + + let qSize0 = wrap - q[0] + 1 + + if size <= qSize0: + if size < qSize0: + state[n][1] = wrap - size + elif state.len == n + 1: + state.setLen(n) + else: + state[n] = (QueueID(0), QueueID(0)) + actions.add QidAction( + op: HoldQid, + qid: n.globalQid wrap - size + 1, + xid: n.globalQid wrap) + break + + actions.add QidAction( + op: HoldQid, + qid: n.globalQid q[0], + xid: n.globalQid wrap) + size -= qSize0 + + state[n] = (QueueID(0), QueueID(0)) + + (actions, QidSchedRef(ctx: fifo.ctx, state: state)) + + +proc lengths*( + fifo: QidSchedRef; # Cascaded fifos descriptor + ): seq[int] = + ## Return the list of lengths for all cascaded sub-fifos. + for n in 0 ..< fifo.state.len: + result.add fifo.state[n].fifoLen(fifo.ctx.q[n].wrap).int + +proc len*( + fifo: QidSchedRef; # Cascaded fifos descriptor + ): int = + ## Size of the fifo + fifo.lengths.foldl(a + b, 0) + + +proc `[]`*( + fifo: QidSchedRef; # Cascaded fifos descriptor + inx: int; # Index into latest items + ): QueueID = + ## Get the queue ID of the `inx`-th `fifo` entry where index `0` refers to + ## the entry most recently added, `1` the one before, etc. If there is no + ## such entry `QueueID(0)` is returned. + if 0 <= inx: + var inx = inx.uint64 + + for n in 0 ..< fifo.state.len: + let q = fifo.state[n] + if q[0] == 0: + discard + + elif q[0] <= q[1]: + # Single file + # :: + # | : + # | q[0]--> 3 + # | 4 + # | 5 <--q[1] + # | : + # + let qInxMax = q[1] - q[0] + if inx <= qInxMax: + return n.globalQid(q[1] - inx) + inx -= qInxMax + 1 # Otherwise continue + + else: + # Wrap aound, double files + # :: + # | : + # | 3 <--q[1] + # | 4 + # | q[0]--> 5 + # | : + # | wrap + let qInxMax1 = q[1] - QueueID(1) + if inx <= qInxMax1: + return n.globalQid(q[1] - inx) + inx -= qInxMax1 + 1 # Otherwise continue + + let + wrap = fifo.ctx.q[n].wrap + qInxMax0 = wrap - q[0] + if inx <= qInxMax0: + return n.globalQid(wrap - inx) + inx -= qInxMax0 + 1 # Otherwise continue + +proc le*( + fifo: QidSchedRef; # Cascaded fifos descriptor + fid: FilterID; # Upper bound + fn: QuFilMap; # QueueID/FilterID mapping + ): QueueID = + ## Find the `qid` address of type `QueueID` with + ## * `fn(qid) <= fid` + ## * for all `qid1` with `fn(qid1) <= fid` one has `fn(qid1) <= fn(qid)` + ## + ## The argument type `QuFilMap` of map `fn()` has been commented on earlier. + ## + var + left = 0 + right = fifo.len - 1 + + if 0 <= right: + let maxQid = fifo[left] + if maxQid.fn <= fid: + return maxQid + + # Bisection + if fifo[right].fn <= fid: + while 1 < right - left: + let half = (left + right) div 2 + # + # FilterID: 100 70 33 + # inx: left ... half ... right + # fid: 77 + # + # with `fifo[left].fn > fid >= fifo[right].fn` + # + if fid >= fifo[half].fn: + right = half + else: # fifo[half].fn > fid + left = half + + # Now: `fifo[right].fn <= fid < fifo[left].fn` (and `right == left+1`) + return fifo[right] + + # otherwise QueueID(0) + # ------------------------------------------------------------------------------ # End # ------------------------------------------------------------------------------ diff --git a/tests/test_aristo.nim b/tests/test_aristo.nim index 7f669e927..eaf843ec8 100644 --- a/tests/test_aristo.nim +++ b/tests/test_aristo.nim @@ -83,7 +83,7 @@ proc miscRunner( test "VertexID recyling lists": check noisy.testVidRecycleLists() - test &"QueueID slot management (sample size: {qidSampleSize})": + test &"QueueID cascaded fifos API (sample size: {qidSampleSize})": check noisy.testQidScheduler(sampleSize = qidSampleSize) diff --git a/tests/test_aristo/test_misc.nim b/tests/test_aristo/test_misc.nim index bb02a12ad..a1f423dfc 100644 --- a/tests/test_aristo/test_misc.nim +++ b/tests/test_aristo/test_misc.nim @@ -12,7 +12,7 @@ ## Aristo (aka Patricia) DB trancoder test import - std/[algorithm, sequtils, strutils], + std/[algorithm, sequtils, sets, strutils], eth/common, results, stew/byteutils, @@ -31,7 +31,7 @@ type fid: FilterID width: uint32 - QTab = Table[QueueID,QValRef] + QTabRef = TableRef[QueueID,QValRef] const QidSlotLyo = [(4,0,10),(3,3,10),(3,4,10),(3,5,10)] @@ -43,11 +43,19 @@ const # Private helpers # ------------------------------------------------------------------------------ -template trueOrReturn(expr: untyped): untyped = +template xCheck(expr: untyped): untyped = + ## Note: this check will invoke `expr` twice if not (expr): check expr return +template xCheck(expr: untyped; ifFalse: untyped): untyped = + ## Note: this check will invoke `expr` twice + if not (expr): + ifFalse + check expr + return + # --------------------- proc posixPrngRand(state: var uint32): byte = @@ -92,28 +100,30 @@ proc `+`(a: VertexID, b: int): VertexID = # --------------------- -func sortedPairs(t: QTab): seq[(QueueID,QValRef)] = - t.keys.toSeq.mapIt(it.uint64).sorted.mapIt(it.QueueID).mapIt((it,t[it])) +func sortedPairs(qt: QTabRef): seq[(QueueID,QValRef)] = + qt.keys.toSeq.mapIt(it.uint64).sorted.mapIt(it.QueueID).mapIt((it,qt[it])) -func fifos(qt: QTab; scd: QidSchedRef): seq[seq[(QueueID,QValRef)]] = +func fifos(qt: QTabRef; scd: QidSchedRef): seq[seq[(QueueID,QValRef)]] = proc kvp(chn: int, qid: QueueID): (QueueID,QValRef) = let cid = QueueID((chn.uint64 shl 62) or qid.uint64) val = qt.getOrDefault(cid, QValRef(nil)) - (qid, val) + (cid, val) for i in 0 ..< scd.state.len: let left = scd.state[i][0] right = scd.state[i][1] result.add newSeq[(QueueID,QValRef)](0) - if left <= right: - for j in left .. right: + if left == 0: + discard + elif left <= right: + for j in right.countDown left: result[i].add kvp(i, j) else: - for j in left .. scd.ctx.q[i].wrap: + for j in right.countDown QueueID(1): result[i].add kvp(i, j) - for j in QueueID(1) .. right: + for j in scd.ctx.q[i].wrap.countDown left: result[i].add kvp(i, j) func flatten(a: seq[seq[(QueueID,QValRef)]]): seq[(QueueID,QValRef)] = @@ -130,11 +140,125 @@ func pp(val: QValRef): string = func pp(kvp: (QueueID,QValRef)): string = kvp[0].pp & "=" & kvp[1].pp -func pp(t: QTab): string = - "{" & t.sortedPairs.mapIt(it.pp).join(",") & "}" +func pp(qt: QTabRef): string = + "{" & qt.sortedPairs.mapIt(it.pp).join(",") & "}" -func pp(t: QTab; scd: QidSchedRef): string = - "[" & t.fifos(scd).flatten.mapIt(it.pp).join(",") & "]" +func pp(qt: QTabRef; scd: QidSchedRef): string = + result = "[" + for w in qt.fifos scd: + if w.len == 0: + result &= "ø" + else: + result &= w.mapIt(it.pp).join(",") + result &= "," + if result[^1] == ',': + result[^1] = ']' + else: + result &= "]" + +# ------------------ + +proc exec(db: QTabRef; serial: int; instr: seq[QidAction]; relax: bool): bool = + ## .. + var + saved: bool + hold: seq[(QueueID,QueueID)] + + for act in instr: + case act.op: + of Oops: + xCheck act.op != Oops + + of SaveQid: + xCheck not saved + db[act.qid] = QValRef(fid: FilterID(serial)) + saved = true + + of DelQid: + let val = db.getOrDefault(act.qid, QValRef(nil)) + xCheck not val.isNil + db.del act.qid + + of HoldQid: + hold.add (act.qid, act.xid) + + of DequQid: + var merged = QValRef(nil) + for w in hold: + for qid in w[0] .. w[1]: + let val = db.getOrDefault(qid, QValRef(nil)) + if not relax: + xCheck not val.isNil + if not val.isNil: + if merged.isNil: + merged = val + else: + if relax: + xCheck merged.fid + merged.width + 1 <= val.fid + else: + xCheck merged.fid + merged.width + 1 == val.fid + merged.width += val.width + 1 + db.del qid + if not relax: + xCheck not merged.isNil + if not merged.isNil: + db[act.qid] = merged + hold.setLen(0) + + xCheck saved + xCheck hold.len == 0 + + true + + +proc validate(db: QTabRef; scd: QidSchedRef; serial: int; relax: bool): bool = + ## Verify that the round-robin queues in `db` are consecutive and in the + ## right order. + var + step = 1u + lastVal = FilterID(serial+1) + + for chn,queue in db.fifos scd: + step *= scd.ctx.q[chn].width + 1 # defined by schedule layout + for kvp in queue: + let (qid,val) = (kvp[0], kvp[1]) + if not relax: + xCheck not val.isNil # Entries must exist + xCheck val.fid + step == lastVal # Item distances must match + if not val.isNil: + xCheck val.fid + step <= lastVal # Item distances must decrease + xCheck val.width + 1 == step # Must correspond to `step` size + lastVal = val.fid + + # Compare database against expected fill state + if relax: + xCheck db.len <= scd.len + else: + xCheck db.len == scd.len + + proc qFn(qid: QueueID): FilterID = + let val = db.getOrDefault(qid, QValRef(nil)) + if not val.isNil: + return val.fid + + # Test filter ID selection + var lastFid = FilterID(serial + 1) + + xCheck scd.le(lastFid + 0, qFn) == scd[0] # Test fringe condition + xCheck scd.le(lastFid + 1, qFn) == scd[0] # Test fringe condition + + for (qid,val) in db.fifos(scd).flatten: + for w in (lastFid-1).countDown val.fid: + xCheck scd.le(w, qFn) == qid + lastFid = val.fid + + if FilterID(1) < lastFid: # Test fringe condition + xCheck scd.le(lastFid - 1, qFn) == QueueID(0) + + if FilterID(2) < lastFid: # Test fringe condition + xCheck scd.le(lastFid - 2, qFn) == QueueID(0) + + true # ------------------------------------------------------------------------------ # Public test function @@ -161,7 +285,7 @@ proc testVidRecycleLists*(noisy = true; seed = 42): bool = expectedVids += (vid < first).ord db.vidDispose vid - trueOrReturn db.top.vGen.len == expectedVids + xCheck db.top.vGen.len == expectedVids noisy.say "***", "vids=", db.top.vGen.len, " discarded=", count-expectedVids # Serialise/deserialise @@ -173,43 +297,43 @@ proc testVidRecycleLists*(noisy = true; seed = 42): bool = db1 = newAristoDbRef BackendVoid rc = dbBlob.deblobify seq[VertexID] if rc.isErr: - trueOrReturn rc.error == AristoError(0) + xCheck rc.error == AristoError(0) else: db1.top.vGen = rc.value - trueOrReturn db.top.vGen == db1.top.vGen + xCheck db.top.vGen == db1.top.vGen # Make sure that recycled numbers are fetched first let topVid = db.top.vGen[^1] while 1 < db.top.vGen.len: let w = db.vidFetch() - trueOrReturn w < topVid - trueOrReturn db.top.vGen.len == 1 and db.top.vGen[0] == topVid + xCheck w < topVid + xCheck db.top.vGen.len == 1 and db.top.vGen[0] == topVid # Get some consecutive vertex IDs for n in 0 .. 5: let w = db.vidFetch() - trueOrReturn w == topVid + n - trueOrReturn db.top.vGen.len == 1 + xCheck w == topVid + n + xCheck db.top.vGen.len == 1 # Repeat last test after clearing the cache db.top.vGen.setLen(0) for n in 0 .. 5: let w = db.vidFetch() - trueOrReturn w == VertexID(2) + n # VertexID(1) is default root ID - trueOrReturn db.top.vGen.len == 1 + xCheck w == VertexID(2) + n # VertexID(1) is default root ID + xCheck db.top.vGen.len == 1 # Recycling and re-org tests func toVQ(a: seq[int]): seq[VertexID] = a.mapIt(VertexID(it)) - trueOrReturn @[8, 7, 3, 4, 5, 9] .toVQ.vidReorg == @[3, 4, 5, 7] .toVQ - trueOrReturn @[8, 7, 6, 3, 4, 5, 9] .toVQ.vidReorg == @[3] .toVQ - trueOrReturn @[5, 4, 3, 7] .toVQ.vidReorg == @[5, 4, 3, 7] .toVQ - trueOrReturn @[5] .toVQ.vidReorg == @[5] .toVQ - trueOrReturn @[3, 5] .toVQ.vidReorg == @[3, 5] .toVQ - trueOrReturn @[4, 5] .toVQ.vidReorg == @[4] .toVQ + xCheck @[8, 7, 3, 4, 5, 9] .toVQ.vidReorg == @[3, 4, 5, 7] .toVQ + xCheck @[8, 7, 6, 3, 4, 5, 9] .toVQ.vidReorg == @[3] .toVQ + xCheck @[5, 4, 3, 7] .toVQ.vidReorg == @[5, 4, 3, 7] .toVQ + xCheck @[5] .toVQ.vidReorg == @[5] .toVQ + xCheck @[3, 5] .toVQ.vidReorg == @[3, 5] .toVQ + xCheck @[4, 5] .toVQ.vidReorg == @[4] .toVQ - trueOrReturn newSeq[VertexID](0).vidReorg().len == 0 + xCheck newSeq[VertexID](0).vidReorg().len == 0 true @@ -218,6 +342,7 @@ proc testQidScheduler*( noisy = true; layout = QidSlotLyo; sampleSize = QidSample; + reorgPercent = 40 ): bool = ## ## Example table for `QidSlotLyo` layout after 10_000 cycles @@ -225,159 +350,117 @@ proc testQidScheduler*( ## QueueID | QValRef | ## | FilterID | width | comment ## --------+----------+-------+---------------------------------- - ## %7 | 9997 | 0 | %7 stands for QueueID(7) - ## %8 | 9998 | 0 | + ## %a | 10000 | 0 | %a stands for QueueID(10) ## %9 | 9999 | 0 | - ## %a | 10000 | 0 | + ## %8 | 9998 | 0 | + ## %7 | 9997 | 0 | ## | | | - ## %1:6 | 9981 | 3 | %1:6 stands for QueueID((1 shl 62) + 6) - ## %1:7 | 9985 | 3 | - ## %1:8 | 9989 | 3 | ## %1:9 | 9993 | 3 | 9993 + 3 + 1 => 9997, see %7 + ## %1:8 | 9989 | 3 | + ## %1:7 | 9985 | 3 | + ## %1:6 | 9981 | 3 | %1:6 stands for QueueID((1 shl 62) + 6) ## | | | - ## %2:3 | 9841 | 19 | - ## %2:4 | 9861 | 19 | - ## %2:5 | 9881 | 19 | - ## %2:6 | 9901 | 19 | - ## %2:7 | 9921 | 19 | - ## %2:8 | 9941 | 19 | ## %2:9 | 9961 | 19 | 9961 + 19 + 1 => 9981, see %1:6 + ## %2:8 | 9941 | 19 | + ## %2:7 | 9921 | 19 | + ## %2:6 | 9901 | 19 | + ## %2:5 | 9881 | 19 | + ## %2:4 | 9861 | 19 | + ## %2:3 | 9841 | 19 | ## | | | - ## %3:a | 9481 | 119 | - ## %3:1 | 9601 | 119 | ## %3:2 | 9721 | 119 | 9721 + 119 + 1 => 9871, see %2:3 + ## %3:1 | 9601 | 119 | + ## %3:a | 9481 | 119 | ## var - list: Qtab debug = false # or true let + list = newTable[QueueID,QValRef]() scd = QidSchedRef.init layout ctx = scd.ctx.q + proc show(serial = 0; exec: seq[QidAction] = @[]) = + var s = "" + if 0 < serial: + s &= "n=" & $serial + if 0 < exec.len: + s &= " exec=" & exec.pp + s &= "" & + "\n state=" & scd.state.pp & + "\n list=" & list.pp & + "\n fifo=" & list.pp(scd) & + "\n" + noisy.say "***", s + if debug: - noisy.say "***", "testFilterSchedule", - " ctx=", ctx, - " stats=", scd.ctx.stats + noisy.say "***", "sampleSize=", sampleSize, + " ctx=", ctx, " stats=", scd.ctx.stats for n in 1 .. sampleSize: let w = scd.addItem() - - if debug and false: - noisy.say "***", "testFilterSchedule", - " n=", n, - " => ", w.exec.pp, - " / ", w.fifo.state.pp - - var - saved = false - hold: seq[(QueueID,QueueID)] - for act in w.exec: - case act.op: - of Oops: - noisy.say "***", "testFilterSchedule", " n=", n, " act=", act.pp - - of SaveQid: - if saved: - noisy.say "***", "testFilterSchedule", " n=", n, " act=", act.pp, - " hold=", hold.pp, " state=", scd.state.pp, " fifo=", list.pp scd - check not saved - return - list[act.qid] = QValRef(fid: FilterID(n)) - saved = true - - of HoldQid: - hold.add (act.qid, act.xid) - - of DequQid: - var merged = QValRef(nil) - for w in hold: - for qid in w[0] .. w[1]: - let val = list.getOrDefault(qid, QValRef(nil)) - if val.isNil: - noisy.say "***", "testFilterSchedule", " n=", n, " act=", act.pp, - " hold=", hold.pp, " state=", scd.state.pp, " fifo=", list.pp scd - check not val.isNil - return - if merged.isNil: - merged = val - elif merged.fid + merged.width + 1 == val.fid: - merged.width += val.width + 1 - else: - noisy.say "***", "testFilterSchedule", " n=", n, " act=", act.pp, - " hold=", hold.pp, " state=", scd.state.pp, " fifo=", list.pp scd - check merged.fid + merged.width + 1 == val.fid - return - list.del qid - if merged.isNil: - noisy.say "***", "testFilterSchedule", " n=", n, " act=", act.pp, - " hold=", hold.pp, " state=", scd.state.pp, " fifo=", list.pp scd - check not merged.isNil - return - list[act.qid] = merged - hold.setLen(0) - + let execOk = list.exec(serial=n, instr=w.exec, relax=false) + xCheck execOk scd[] = w.fifo[] + let validateOk = list.validate(scd, serial=n, relax=false) + xCheck validateOk: + show(serial=n, exec=w.exec) - # Verify that the round-robin queues in `list` are consecutive and in the - # right order. - var - botVal = FilterID(0) - step = 1u - for chn,queue in list.fifos scd: - var lastVal = FilterID(0) - step *= ctx[chn].width + 1 # defined by schedule layout - - for kvp in queue: - let (qid,val) = (kvp[0], kvp[1]) - - # Entries must exist - if val.isNil: - noisy.say "***", "testFilterSchedule", " n=", n, " chn=", chn, - " exec=", w.exec.pp, " kvp=", kvp.pp, " fifo=", list.pp scd - check not val.isNil - return - - # Fid value fields must increase witin a sub-queue. - if val.fid <= lastVal: - noisy.say "***", "testFilterSchedule", " n=", n, " chn=", chn, - " exec=", w.exec.pp, " kvp=", kvp.pp, " fifo=", list.pp scd - check lastVal < val.fid - return - - # Width value must correspond to `step` size - if val.width + 1 != step: - noisy.say "***", "testFilterSchedule", " n=", n, " chn=", chn, - " exec=", w.exec.pp, " kvp=", kvp.pp, " fifo=", list.pp scd - check val.width + 1 == step - return - - # Item distances must match the step width - if lastVal != 0: - let dist = val.fid - lastVal - if dist != step.uint64: - noisy.say "***", "testFilterSchedule", " n=", n, " chn=", chn, - " exec=", w.exec.pp, " kvp=", kvp.pp, " fifo=", list.pp scd - check dist == step.uint64 - return - lastVal = val.fid - - # The top value of the current queue must be smaller than the - # bottom value of the previous one - if 0 < chn and botVal != queue[^1][1].fid + step.uint64: - noisy.say "***", "testFilterSchedule", " n=", n, " chn=", chn, - " exec=", w.exec.pp, " step=", step, " fifo=", list.pp scd - check botVal == queue[^1][1].fid + step.uint64 - return - botVal = queue[0][1].fid + let fifoID = list.fifos(scd).flatten.mapIt(it[0]) + for j in 0 ..< list.len: + xCheck fifoID[j] == scd[j]: + noisy.say "***", "n=", n, " exec=", w.exec.pp, + " fifoID[", j, "]=", fifoID[j].pp, + " scd[", j, "]=", scd[j].pp, + "\n fifo=", list.pp scd if debug: - noisy.say "***", "testFilterSchedule", - " n=", n, - "\n exec=", w.exec.pp, - "\n state=", scd.state.pp, - "\n list=", list.pp, - "\n fifo=", list.pp scd, - "\n" + show(exec=w.exec) + + # ------------------- + + # Mark deleted some entries from database + var + nDel = (list.len * reorgPercent) div 100 + delIDs: HashSet[QueueID] + for n in 0 ..< nDel: + delIDs.incl scd[n] + + # Delete these entries + let fetch = scd.fetchItems nDel + for act in fetch.exec: + xCheck act.op == HoldQid + for qid in act.qid .. act.xid: + xCheck qid in delIDs + xCheck list.hasKey qid + delIDs.excl qid + list.del qid + + xCheck delIDs.len == 0 + scd[] = fetch.fifo[] + + # ------------------- + + # Continue adding items + for n in sampleSize + 1 .. 2 * sampleSize: + let w = scd.addItem() + let execOk = list.exec(serial=n, instr=w.exec, relax=true) + xCheck execOk + scd[] = w.fifo[] + let validateOk = list.validate(scd, serial=n, relax=true) + xCheck validateOk: + show(serial=n, exec=w.exec) + + # Continue adding items, now strictly + for n in 2 * sampleSize + 1 .. 3 * sampleSize: + let w = scd.addItem() + let execOk = list.exec(serial=n, instr=w.exec, relax=false) + xCheck execOk + scd[] = w.fifo[] + let validateOk = list.validate(scd, serial=n, relax=false) + xCheck validateOk + + if debug: # or true: + show() true