Core db+aristo update recovery journal management (#2156)
* Aristo: Allow to define/set `FilterID` for journal filter records

  why: After some changes, the `FilterID` is isomorphic to the `BlockNumber`
    scalar (well, to the first 2^64 entries of a `BlockNumber`.) The needed
    change for `FilterID` is that the `FilterID(0)` value is now a valid part
    of the `FilterID` scalar. A non-valid `FilterID` entry is represented by
    `none(FilterID)`.

* Aristo: Split off function `persist()` as persistent version of `stow()`

  why: In production, `stow(persistent=false,..)` is currently unused. So,
    using `persist()` rather than `stow(persistent=true,..)` improves
    readability and is easier to maintain.

* CoreDb+Aristo: Store block numbers in journal records

  why: This makes journal records searchable by block number.

* Aristo: Rename some journal related functions

  why: The name *journal* is more appropriate for api functions than
    something with *fifo* or *filter*.

* CoreDb+Aristo: Update last/oldest journal state retrieval

* CoreDb+Aristo: Register block number with state root in journal

  why: There is no need anymore for the extra lookup table
    `stRootToBlockNum` which mapped a storage root -> block number.

* Aristo: Remove unused function `getFilUbe()` from api

* CoreDb: Remove now unused virtual table `stRootToBlockNum`

  why: It was used to map a state root to a block number. This functionality
    is now embedded into the recovery journal backend.

* Turn off API tracking (will fail on `fluffy`)
parent 0d4ef023ed
commit 961f63358e
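In a nutshell, the API change this commit introduces can be sketched as follows (a hypothetical snippet, not part of the diff; it assumes an open `AristoDbRef` handle `db` and a `uint64` block number `bn`, with error handling elided):

    import std/options

    # Before: saving to disk was a mode of `stow()`
    #   discard db.stow(persistent = true)
    # After: `persist()` saves to disk and may tag the new recovery
    # journal record with a `FilterID` -- here derived from the block
    # number -- so the record becomes searchable by block number.
    let rc = db.persist(nxtFid = some(FilterID(bn)))
    if rc.isErr:
      echo "persist error: ", rc.error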
@ -192,8 +192,13 @@ proc persistBlocksImpl(c: ChainRef; headers: openArray[BlockHeader];
    dbTx.commit()

    # The `c.db.persistent()` call is ignored by the legacy DB which
-   # automatically saves persistently when reaching the zero level transaction
-   c.db.persistent()
+   # automatically saves persistently when reaching the zero level transaction.
+   #
+   # For the `Aristo` database, this code position is only reached if the
+   # parent state of the first block (as registered in `headers[0]`) was
+   # the canonical state before updating. So this state will be saved with
+   # `persistent()` together with the respective block number.
+   c.db.persistent(headers[0].blockNumber - 1)

    # For a single state ledger, there is only a limited backlog. So clean up
    # regularly (the `CleanUpEpoch` should not be too small as each lookup pulls
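The `headers[0].blockNumber - 1` argument deserves a note: the journal record is tagged with the block number of the state the batch starts from, not of the last block executed. A toy calculation (hypothetical numbers):

    # If the batch covers blocks 101..110, the canonical state before
    # the update is the one produced by block 100, so:
    #   headers[0].blockNumber - 1  ==  101 - 1  ==  100
    # and the journal record for the pre-batch state root is tagged 100.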
@ -13,10 +13,11 @@

import
- std/times,
+ std/[options, times],
  eth/[common, trie/nibbles],
  results,
  ./aristo_desc/desc_backend,
  ./aristo_filter/filter_helpers,
  ./aristo_init/memory_db,
  "."/[aristo_delete, aristo_desc, aristo_fetch, aristo_get, aristo_hashify,
       aristo_hike, aristo_init, aristo_merge, aristo_path, aristo_profile,
@ -148,12 +149,22 @@ type
      ## pair was found on the filter or the backend, this transaction is
      ## empty.

-  AristoApiGetFilUbeFn* =
-    proc(db: AristoDbRef;
-         qid: QueueID;
-        ): Result[FilterRef,AristoError]
-      {.noRaise.}
-      ## Get the filter from the unfiltered backend if available.
+  AristoApiGetFromJournalFn* =
+    proc(be: BackendRef;
+         fid: Option[FilterID];
+         earlierOK = false;
+        ): Result[FilterIndexPair,AristoError]
+      {.noRaise.}
+      ## For a positive argument `fid`, find the filter on the journal with
+      ## ID not larger than `fid` (i.e. the resulting filter might be older.)
+      ##
+      ## If the argument `earlierOK` is passed `false`, the function succeeds
+      ## only if the filter ID of the returned filter is equal to the
+      ## argument `fid`.
+      ##
+      ## In case the argument `fid` is zero (i.e. `FilterID(0)`), the filter
+      ## with the smallest filter ID (i.e. the oldest filter) is returned.
+      ## In that case, the argument `earlierOK` is ignored.

  AristoApiGetKeyRcFn* =
    proc(db: AristoDbRef;
@ -252,6 +263,33 @@ type
      ## paths used to index database leaf values can be represented as
      ## `Blob`, i.e. `PathID` type paths with an even number of nibbles.

+  AristoApiPersistFn* =
+    proc(db: AristoDbRef;
+         nxtFid = none(FilterID);
+         chunkedMpt = false;
+        ): Result[void,AristoError]
+      {.noRaise.}
+      ## Persistently store data onto backend database. If the system is
+      ## running without a database backend, the function returns immediately
+      ## with an error. The same happens if there is a pending transaction.
+      ##
+      ## The function merges all staged data from the top layer cache onto
+      ## the backend stage area. After that, the top layer cache is cleared.
+      ##
+      ## Finally, the staged data are merged into the physical backend
+      ## database and the staged data area is cleared. While performing this
+      ## last step, the recovery journal is updated (if available.)
+      ##
+      ## If the argument `nxtFid` is passed non-zero, it will be the ID for
+      ## the next recovery journal record. If non-zero, this ID must be
+      ## greater than all previous IDs (e.g. block number when storing after
+      ## block execution.)
+      ##
+      ## Staging the top layer cache might fail with a partial MPT when it is
+      ## set up from partial MPT chunks as it happens with `snap` sync
+      ## processing. In this case, the `chunkedMpt` argument must be set
+      ## `true` (see also `fwdFilter()`.)

  AristoApiReCentreFn* =
    proc(db: AristoDbRef;
        ) {.noRaise.}
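A short sketch of how the new hook might be driven (assumptions: `api` is an `AristoApiRef` as set up by `init()` further below, `db` is an open `AristoDbRef`, and the numeric ID is hypothetical):

    # Default: the journal record ID increases stepwise
    discard api.persist(db)
    # Bespoke ID, e.g. a block number; must exceed all previous IDs
    discard api.persist(db, some(FilterID(42u64)))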
@ -284,28 +322,6 @@ type
      ## Encode the data payload of the argument `pyl` as RLP `Blob` if
      ## it is of account type, otherwise pass the data as is.

-  AristoApiStowFn* =
-    proc(db: AristoDbRef;
-         persistent = false;
-         chunkedMpt = false;
-        ): Result[void,AristoError]
-      {.noRaise.}
-      ## If there is no backend while the `persistent` argument is set `true`,
-      ## the function returns immediately with an error. The same happens if
-      ## there is a pending transaction.
-      ##
-      ## The function then merges the data from the top layer cache into the
-      ## backend stage area. After that, the top layer cache is cleared.
-      ##
-      ## Staging the top layer cache might fail withh a partial MPT when it
-      ## is set up from partial MPT chunks as it happens with `snap` sync
-      ## processing. In this case, the `chunkedMpt` argument must be set
-      ## `true` (see alse `fwdFilter`.)
-      ##
-      ## If the argument `persistent` is set `true`, all the staged data are
-      ## merged into the physical backend database and the staged data area
-      ## is cleared.

  AristoApiTxBeginFn* =
    proc(db: AristoDbRef
        ): Result[AristoTxRef,AristoError]
@ -359,7 +375,7 @@ type
    forget*: AristoApiForgetFn
    forkTop*: AristoApiForkTopFn
    forkWith*: AristoApiForkWithFn
-   getFilUbe*: AristoApiGetFilUbeFn
+   getFromJournal*: AristoApiGetFromJournalFn
    getKeyRc*: AristoApiGetKeyRcFn
    hashify*: AristoApiHashifyFn
    hasPath*: AristoApiHasPathFn
@ -370,10 +386,10 @@ type
    merge*: AristoApiMergeFn
    mergePayload*: AristoApiMergePayloadFn
    pathAsBlob*: AristoApiPathAsBlobFn
+   persist*: AristoApiPersistFn
    reCentre*: AristoApiReCentreFn
    rollback*: AristoApiRollbackFn
    serialise*: AristoApiSerialiseFn
-   stow*: AristoApiStowFn
    txBegin*: AristoApiTxBeginFn
    txTop*: AristoApiTxTopFn
    vidFetch*: AristoApiVidFetchFn
@ -384,45 +400,45 @@
    ## Index/name mapping for profile slots
    AristoApiProfTotal = "total"

-   AristoApiProfCommitFn = "commit"
-   AristoApiProfDeleteFn = "delete"
-   AristoApiProfDelTreeFn = "delTree"
-   AristoApiProfFetchPayloadFn = "fetchPayload"
-   AristoApiProfFinishFn = "finish"
-   AristoApiProfForgetFn = "forget"
-   AristoApiProfForkTopFn = "forkTop"
-   AristoApiProfForkWithFn = "forkWith"
-   AristoApiProfGetFilUbeFn = "getFilUBE"
-   AristoApiProfGetKeyRcFn = "getKeyRc"
-   AristoApiProfHashifyFn = "hashify"
-   AristoApiProfHasPathFn = "hasPath"
-   AristoApiProfHikeUpFn = "hikeUp"
-   AristoApiProfIsTopFn = "isTop"
-   AristoApiProfLevelFn = "level"
-   AristoApiProfNForkedFn = "nForked"
-   AristoApiProfMergeFn = "merge"
-   AristoApiProfMergePayloadFn = "mergePayload"
-   AristoApiProfPathAsBlobFn = "pathAsBlob"
-   AristoApiProfReCentreFn = "reCentre"
-   AristoApiProfRollbackFn = "rollback"
-   AristoApiProfSerialiseFn = "serialise"
-   AristoApiProfStowFn = "stow"
-   AristoApiProfTxBeginFn = "txBegin"
-   AristoApiProfTxTopFn = "txTop"
-   AristoApiProfVidFetchFn = "vidFetch"
-   AristoApiProfVidDisposeFn = "vidDispose"
+   AristoApiProfCommitFn = "commit"
+   AristoApiProfDeleteFn = "delete"
+   AristoApiProfDelTreeFn = "delTree"
+   AristoApiProfFetchPayloadFn = "fetchPayload"
+   AristoApiProfFinishFn = "finish"
+   AristoApiProfForgetFn = "forget"
+   AristoApiProfForkTopFn = "forkTop"
+   AristoApiProfForkWithFn = "forkWith"
+   AristoApiProfGetFromJournalFn = "getFromJournal"
+   AristoApiProfGetKeyRcFn = "getKeyRc"
+   AristoApiProfHashifyFn = "hashify"
+   AristoApiProfHasPathFn = "hasPath"
+   AristoApiProfHikeUpFn = "hikeUp"
+   AristoApiProfIsTopFn = "isTop"
+   AristoApiProfLevelFn = "level"
+   AristoApiProfNForkedFn = "nForked"
+   AristoApiProfMergeFn = "merge"
+   AristoApiProfMergePayloadFn = "mergePayload"
+   AristoApiProfPathAsBlobFn = "pathAsBlob"
+   AristoApiProfPersistFn = "persist"
+   AristoApiProfReCentreFn = "reCentre"
+   AristoApiProfRollbackFn = "rollback"
+   AristoApiProfSerialiseFn = "serialise"
+   AristoApiProfTxBeginFn = "txBegin"
+   AristoApiProfTxTopFn = "txTop"
+   AristoApiProfVidFetchFn = "vidFetch"
+   AristoApiProfVidDisposeFn = "vidDispose"

-   AristoApiProfBeGetVtxFn = "be/getVtx"
-   AristoApiProfBeGetKeyFn = "be/getKey"
-   AristoApiProfBeGetFilFn = "be/getFil"
-   AristoApiProfBeGetIdgFn = "be/getIfg"
-   AristoApiProfBeGetFqsFn = "be/getFqs"
-   AristoApiProfBePutVtxFn = "be/putVtx"
-   AristoApiProfBePutKeyFn = "be/putKey"
-   AristoApiProfBePutFilFn = "be/putFil"
-   AristoApiProfBePutIdgFn = "be/putIdg"
-   AristoApiProfBePutFqsFn = "be/putFqs"
-   AristoApiProfBePutEndFn = "be/putEnd"
+   AristoApiProfBeGetVtxFn = "be/getVtx"
+   AristoApiProfBeGetKeyFn = "be/getKey"
+   AristoApiProfBeGetFilFn = "be/getFil"
+   AristoApiProfBeGetIdgFn = "be/getIfg"
+   AristoApiProfBeGetFqsFn = "be/getFqs"
+   AristoApiProfBePutVtxFn = "be/putVtx"
+   AristoApiProfBePutKeyFn = "be/putKey"
+   AristoApiProfBePutFilFn = "be/putFil"
+   AristoApiProfBePutIdgFn = "be/putIdg"
+   AristoApiProfBePutFqsFn = "be/putFqs"
+   AristoApiProfBePutEndFn = "be/putEnd"

  AristoApiProfRef* = ref object of AristoApiRef
    ## Profiling API extension of `AristoApiObj`
@ -443,7 +459,7 @@ when AutoValidateApiHooks:
    doAssert not api.forget.isNil
    doAssert not api.forkTop.isNil
    doAssert not api.forkWith.isNil
-   doAssert not api.getFilUbe.isNil
+   doAssert not api.getFromJournal.isNil
    doAssert not api.getKeyRc.isNil
    doAssert not api.hashify.isNil
    doAssert not api.hasPath.isNil
@ -454,10 +470,10 @@ when AutoValidateApiHooks:
    doAssert not api.merge.isNil
    doAssert not api.mergePayload.isNil
    doAssert not api.pathAsBlob.isNil
+   doAssert not api.persist.isNil
    doAssert not api.reCentre.isNil
    doAssert not api.rollback.isNil
    doAssert not api.serialise.isNil
-   doAssert not api.stow.isNil
    doAssert not api.txBegin.isNil
    doAssert not api.txTop.isNil
    doAssert not api.vidFetch.isNil
@ -496,7 +512,7 @@ func init*(api: var AristoApiObj) =
    api.forget = forget
    api.forkTop = forkTop
    api.forkWith = forkWith
-   api.getFilUbe = getFilUbe
+   api.getFromJournal = getFromJournal
    api.getKeyRc = getKeyRc
    api.hashify = hashify
    api.hasPath = hasPath
@ -507,10 +523,10 @@ func init*(api: var AristoApiObj) =
    api.merge = merge
    api.mergePayload = mergePayload
    api.pathAsBlob = pathAsBlob
+   api.persist = persist
    api.reCentre = reCentre
    api.rollback = rollback
    api.serialise = serialise
-   api.stow = stow
    api.txBegin = txBegin
    api.txTop = txTop
    api.vidFetch = vidFetch
@ -524,33 +540,33 @@ func init*(T: type AristoApiRef): T =

func dup*(api: AristoApiRef): AristoApiRef =
  result = AristoApiRef(
-   commit: api.commit,
-   delete: api.delete,
-   delTree: api.delTree,
-   fetchPayload: api.fetchPayload,
-   finish: api.finish,
-   forget: api.forget,
-   forkTop: api.forkTop,
-   forkWith: api.forkWith,
-   getFilUbe: api.getFilUbe,
-   getKeyRc: api.getKeyRc,
-   hashify: api.hashify,
-   hasPath: api.hasPath,
-   hikeUp: api.hikeUp,
-   isTop: api.isTop,
-   level: api.level,
-   nForked: api.nForked,
-   merge: api.merge,
-   mergePayload: api.mergePayload,
-   pathAsBlob: api.pathAsBlob,
-   reCentre: api.reCentre,
-   rollback: api.rollback,
-   serialise: api.serialise,
-   stow: api.stow,
-   txBegin: api.txBegin,
-   txTop: api.txTop,
-   vidFetch: api.vidFetch,
-   vidDispose: api.vidDispose)
+   commit: api.commit,
+   delete: api.delete,
+   delTree: api.delTree,
+   fetchPayload: api.fetchPayload,
+   finish: api.finish,
+   forget: api.forget,
+   forkTop: api.forkTop,
+   forkWith: api.forkWith,
+   getFromJournal: api.getFromJournal,
+   getKeyRc: api.getKeyRc,
+   hashify: api.hashify,
+   hasPath: api.hasPath,
+   hikeUp: api.hikeUp,
+   isTop: api.isTop,
+   level: api.level,
+   nForked: api.nForked,
+   merge: api.merge,
+   mergePayload: api.mergePayload,
+   pathAsBlob: api.pathAsBlob,
+   persist: api.persist,
+   reCentre: api.reCentre,
+   rollback: api.rollback,
+   serialise: api.serialise,
+   txBegin: api.txBegin,
+   txTop: api.txTop,
+   vidFetch: api.vidFetch,
+   vidDispose: api.vidDispose)
  when AutoValidateApiHooks:
    api.validate
@ -621,10 +637,10 @@ func init*(
      AristoApiProfForkWithFn.profileRunner:
        result = api.forkWith(a, b, c, d)

-  profApi.getFilUbe =
-    proc(a: AristoDbRef; b: QueueID): auto =
-      AristoApiProfGetFilUbeFn.profileRunner:
-        result = api.getFilUbe(a, b)
+  profApi.getFromJournal =
+    proc(a: BackendRef; b: Option[FilterID]; c = false): auto =
+      AristoApiProfGetFromJournalFn.profileRunner:
+        result = api.getFromJournal(a, b, c)

  profApi.getKeyRc =
    proc(a: AristoDbRef; b: VertexID): auto =
@ -677,6 +693,11 @@ func init*(
      AristoApiProfPathAsBlobFn.profileRunner:
        result = api.pathAsBlob(a)

+  profApi.persist =
+    proc(a: AristoDbRef; b = none(FilterID); c = false): auto =
+      AristoApiProfPersistFn.profileRunner:
+        result = api.persist(a, b, c)
+
  profApi.reCentre =
    proc(a: AristoDbRef) =
      AristoApiProfReCentreFn.profileRunner:
@ -692,11 +713,6 @@ func init*(
      AristoApiProfSerialiseFn.profileRunner:
        result = api.serialise(a, b)

-  profApi.stow =
-    proc(a: AristoDbRef; b = false; c = false): auto =
-      AristoApiProfStowFn.profileRunner:
-        result = api.stow(a, b, c)
-
  profApi.txBegin =
    proc(a: AristoDbRef): auto =
      AristoApiProfTxBeginFn.profileRunner:
@ -147,8 +147,6 @@ func ppCodeHash(h: Hash256): string =
  result &= h.data.toHex.squeeze(hex=true,ignLen=true)

proc ppFid(fid: FilterID): string =
- if not fid.isValid:
-   return "ø"
  "@" & $fid

proc ppQid(qid: QueueID): string =
@ -128,9 +128,6 @@ func isValid*(sqv: HashSet[VertexID]): bool =
func isValid*(qid: QueueID): bool =
  qid != QueueID(0)

-func isValid*(fid: FilterID): bool =
-  fid != FilterID(0)
-
# ------------------------------------------------------------------------------
# Public functions, miscellaneous
# ------------------------------------------------------------------------------
@ -205,9 +205,9 @@ type
    DelVidStaleVtx

    # Functions from `aristo_filter.nim`
-   FilBackStepsExpected
    FilBackendMissing
    FilBackendRoMode
+   FilBackStepsExpected
    FilDudeFilterUpdateError
    FilExecDublicateSave
    FilExecHoldExpected
@ -222,13 +222,14 @@ type
    FilNoMatchOnFifo
    FilPrettyPointlessLayer
    FilQidByLeFidFailed
+   FilQuBespokeFidTooSmall
    FilQuSchedDisabled
+   FilSiblingsCommitUnfinshed
+   FilSrcTrgInconsistent
    FilStateRootMismatch
    FilStateRootMissing
    FilTrgSrcMismatch
    FilTrgTopSrcMismatch
-   FilSiblingsCommitUnfinshed
-   FilSrcTrgInconsistent

    # Get functions from `aristo_get.nim`
    GetLeafMissing
@ -128,6 +128,11 @@ type
    final*: LayerFinalRef              ## Stored as latest version
    txUid*: uint                       ## Transaction identifier if positive

+  FilterIndexPair* = object
+    ## Helper structure for fetching filters from journal.
+    inx*: int                         ## Non-negative journal index. latest=`0`
+    fil*: FilterRef                   ## Valid filter
+
  # ----------------------

  QidLayoutRef* = ref object
@ -137,14 +142,14 @@ type

  QidSpec* = tuple
    ## Layout of a filter ID slot queue
-   size: uint                         ## Capacity of queue, length within `1..wrap`
-   width: uint                        ## Instance gaps (relative to prev. item)
-   wrap: QueueID                      ## Range `1..wrap` for round-robin queue
+   size: uint                         ## Queue capacity, length within `1..wrap`
+   width: uint                        ## Instance gaps (relative to prev. item)
+   wrap: QueueID                      ## Range `1..wrap` for round-robin queue

  QidSchedRef* = ref object of RootRef
    ## Current state of the filter queues
-   ctx*: QidLayoutRef                 ## Organisation of the FIFO
-   state*: seq[(QueueID,QueueID)]     ## Current fill state
+   ctx*: QidLayoutRef                 ## Organisation of the FIFO
+   state*: seq[(QueueID,QueueID)]     ## Current fill state

const
  DefaultQidWrap = QueueID(0x3fff_ffff_ffff_ffffu64)
@ -13,7 +13,7 @@
##

import
- std/[sequtils, sets, tables],
+ std/[options, sequtils, sets, tables],
  eth/common,
  results,
  "."/[aristo_desc, aristo_get, aristo_vid],
@ -99,7 +99,8 @@ proc canResolveBackendFilter*(db: AristoDbRef): bool =

proc resolveBackendFilter*(
-   db: AristoDbRef;
+   db: AristoDbRef;                   # Database
+   nxtFid = none(FilterID);           # Next filter ID (if any)
    reCentreOk = false;
     ): Result[void,AristoError] =
  ## Resolve the backend filter into the physical backend database.
@ -143,15 +144,17 @@ proc resolveBackendFilter*(

  # Figure out how to save the reverse filter on a cascades slots queue
  var instr: FifoInstr
- if not be.journal.isNil:             # Otherwise ignore
+ if not be.journal.isNil:             # Otherwise ignore
    block getInstr:
      # Compile instruction for updating filters on the cascaded fifos
      if db.roFilter.isValid:
-       let ovLap = be.getFilterOverlap db.roFilter
+       let ovLap = be.getJournalOverlap db.roFilter
        if 0 < ovLap:
          instr = ? be.fifosDelete ovLap       # Revert redundant entries
          break getInstr
-     instr = ? be.fifosStore updateSiblings.rev # Store reverse filter
+     instr = ? be.fifosStore(
+       updateSiblings.rev,            # Store reverse filter
+       nxtFid)                        # Set filter ID (if any)

  # Store structural single trie entries
  let writeBatch = be.putBegFn()
@ -175,7 +178,7 @@ proc resolveBackendFilter*(
  ok()


-proc forkBackLog*(
+proc forkByJournal*(
    db: AristoDbRef;
    episode: int;
     ): Result[AristoDbRef,AristoError] =
@ -199,18 +202,24 @@ proc forkBackLog*(
    clone.roFilter = instr.fil
  ok clone

-proc forkBackLog*(
+proc forkByJournal*(
    db: AristoDbRef;
-   fid: FilterID;
+   fid: Option[FilterID];
    earlierOK = false;
     ): Result[AristoDbRef,AristoError] =
- ## ..
+ ## Variant of `forkByJournal()` for forking to a particular filter ID (or
+ ## the nearest predecessor if `earlierOK` is passed `true`) if there is
+ ## some filter ID `fid`.
+ ##
+ ## Otherwise, the oldest filter is forked to (regardless of the value of
+ ## `earlierOK`.)
+ ##
  let be = db.backend
  if be.isNil:
    return err(FilBackendMissing)

- let fip = ? be.getFilterFromFifo(fid, earlierOK)
- db.forkBackLog fip.inx
+ let fip = ? be.getFromJournal(fid, earlierOK)
+ db.forkByJournal fip.inx

# ------------------------------------------------------------------------------
# End
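A sketch of re-creating a read-only descriptor for an earlier state via the renamed helper (hypothetical filter ID; error handling abbreviated, `db` assumed to be an open `AristoDbRef`):

    let past = db.forkByJournal(some(FilterID(100u64)), earlierOK=true).valueOr:
      return
    # ... inspect the earlier state via `past`, then drop the descriptor
    discard past.forget()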
@ -9,7 +9,7 @@
# except according to those terms.

import
- std/tables,
+ std/[options, tables],
  results,
  ".."/[aristo_desc, aristo_desc/desc_backend],
  "."/[filter_merge, filter_scheduler]
@ -40,18 +40,24 @@ template joinFiltersOrReturn(upper, lower: FilterRef): FilterRef =
    return err(rc.error[1])
  rc.value

-template nextFidOrReturn(be: BackendRef): FilterID =
+template getNextFidOrReturn(be: BackendRef; fid: Option[FilterID]): FilterID =
  ## Get next free filter ID, or exit function using this wrapper
- var fid = FilterID(1)
- block:
-   let qid = be.journal[0]
-   if qid.isValid:
-     let rc = be.getFilFn qid
-     if rc.isOK:
-       fid = rc.value.fid + 1
-     elif rc.error != GetFilNotFound:
-       return err(rc.error)
- fid
+ var nxtFid = fid.get(otherwise = FilterID(1))
+
+ let qid = be.journal[0]
+ if qid.isValid:
+   let rc = be.getFilFn qid
+   if rc.isErr:
+     # Must exist when `qid` exists
+     return err(rc.error)
+   elif fid.isNone:
+     # Stepwise increase is the default
+     nxtFid = rc.value.fid + 1
+   elif nxtFid <= rc.value.fid:
+     # The bespoke filter IDs must be greater than the existing ones
+     return err(FilQuBespokeFidTooSmall)
+
+ nxtFid

# ------------------------------------------------------------------------------
# Public functions
@ -60,6 +66,7 @@ template getNextFidOrReturn(be: BackendRef; fid: Option[FilterID]): FilterID =
proc fifosStore*(
    be: BackendRef;                    # Database backend
    filter: FilterRef;                 # Filter to save
+   fid: Option[FilterID];             # Next filter ID (if any)
     ): Result[FifoInstr,AristoError] =
  ## Calculate backend instructions for storing the argument `filter` on the
  ## argument backend `be`.
@ -126,7 +133,7 @@ proc fifosStore*(
    return err(FilExecSaveMissing)

  # Set next filter ID
- filter.fid = be.nextFidOrReturn
+ filter.fid = be.getNextFidOrReturn fid

  ok instr
@ -9,7 +9,7 @@
# except according to those terms.

import
- std/tables,
+ std/[options, tables],
  eth/common,
  results,
  ".."/[aristo_desc, aristo_desc/desc_backend, aristo_get],
@ -21,11 +21,6 @@ type
    be*: Hash256                       ## Backend state root
    fg*: Hash256                       ## Layer or filter implied state root

-  FilterIndexPair* = object
-    ## Helper structure for fetching journal filters from cascaded fifo
-    inx*: int                         ## Non negative fifo index
-    fil*: FilterRef                   ## Valid filter
-
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
@ -79,27 +74,41 @@ proc getLayerStateRoots*(
  err(FilStateRootMismatch)


-proc getFilterFromFifo*(
+proc getFromJournal*(
    be: BackendRef;
-   fid: FilterID;
+   fid = none(FilterID);
    earlierOK = false;
     ): Result[FilterIndexPair,AristoError] =
- ## Find filter on cascaded fifos and return its index and filter ID.
- var cache = (QueueID(0),FilterRef(nil)) # Avoids double lookup for last entry
- proc qid2fid(qid: QueueID): FilterID =
-   if qid == cache[0]:                # Avoids double lookup for last entry
-     return cache[1].fid
-   let rc = be.getFilFn qid
-   if rc.isErr:
-     return FilterID(0)
-   cache = (qid,rc.value)
-   rc.value.fid
+ ## If there is some argument `fid`, find the filter on the journal with ID
+ ## not larger than `fid` (i.e. the resulting filter must not be more
+ ## recent.)
+ ##
+ ## If the argument `earlierOK` is passed `false`, the function succeeds only
+ ## if the filter ID of the returned filter is equal to the argument `fid`.
+ ##
+ ## In case there is no argument `fid`, the filter with the smallest filter
+ ## ID (i.e. the oldest filter) is returned. Here, the argument `earlierOK`
+ ## is ignored.
+ ##
  if be.journal.isNil:
    return err(FilQuSchedDisabled)

- let qid = be.journal.le(fid, qid2fid, forceEQ = not earlierOK)
+ var cache = (QueueID(0),FilterRef(nil)) # Avoids double lookup for last entry
+ proc qid2fid(qid: QueueID): Result[FilterID,void] =
+   if qid == cache[0]:                # Avoids double lookup for last entry
+     return ok cache[1].fid
+   let fil = be.getFilFn(qid).valueOr:
+     return err()
+   cache = (qid,fil)
+   ok fil.fid
+
+ let qid = block:
+   if fid.isNone:
+     # Get oldest filter
+     be.journal[^1]
+   else:
+     # Find filter with ID not smaller than `fid`
+     be.journal.le(fid.unsafeGet, qid2fid, forceEQ = not earlierOK)

  if not qid.isValid:
    return err(FilFilterNotFound)
@ -108,10 +117,8 @@ proc getFromJournal*(
    if cache[0] == qid:
      cache[1]
    else:
-     let rc = be.getFilFn qid
-     if rc.isErr:
-       return err(rc.error)
-     rc.value
+     be.getFilFn(qid).valueOr:
+       return err(error)

  fip.inx = be.journal[qid]
  if fip.inx < 0:
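The lookup semantics of `getFromJournal()` in brief (a sketch only; `be` is assumed to be a `BackendRef` with an enabled journal):

    # none(FilterID)              -> oldest journal record
    # some(fid), earlierOK=false  -> record with exactly ID `fid`, else error
    # some(fid), earlierOK=true   -> ID `fid`, or the nearest smaller one
    let fip = be.getFromJournal(none(FilterID)).valueOr:
      return
    echo "oldest filter ID ", fip.fil.fid, " at journal index ", fip.inx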
@ -120,7 +127,7 @@ proc getFromJournal*(
  ok fip


-proc getFilterOverlap*(
+proc getJournalOverlap*(
    be: BackendRef;
    filter: FilterRef;
     ): int =
|
@ -130,15 +137,13 @@ proc getFilterOverlap*(
|
|||
## longer than one items. Only single step filter overlaps are guaranteed
|
||||
## to be found.
|
||||
##
|
||||
# Check against the top-fifo entry
|
||||
# Check against the top-fifo entry.
|
||||
let qid = be.journal[0]
|
||||
if not qid.isValid:
|
||||
return 0
|
||||
let top = block:
|
||||
let rc = be.getFilFn qid
|
||||
if rc.isErr:
|
||||
return 0
|
||||
rc.value
|
||||
|
||||
let top = be.getFilFn(qid).valueOr:
|
||||
return 0
|
||||
|
||||
# The `filter` must match the `top`
|
||||
if filter.src != top.src:
|
||||
|
@ -150,10 +155,10 @@ proc getFilterOverlap*(
|
|||
|
||||
# Check against some stored filter IDs
|
||||
if filter.isValid:
|
||||
let rc = be.getFilterFromFifo(filter.fid, earlierOK=true)
|
||||
if rc.isOk:
|
||||
if filter.trg == rc.value.fil.trg:
|
||||
return 1 + rc.value.inx
|
||||
let fp = be.getFromJournal(some(filter.fid), earlierOK=true).valueOr:
|
||||
return 0
|
||||
if filter.trg == fp.fil.trg:
|
||||
return 1 + fp.inx
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# End
|
||||
|
|
|
@ -10,6 +10,7 @@

import
  std/[algorithm, sequtils, typetraits],
+ results,
  ".."/[aristo_constants, aristo_desc]

type
@ -28,20 +29,23 @@ type
    DequQid                            ## Store merged local queue items
    DelQid                             ## Delete entry from last overflow queue

- QuFilMap* = proc(qid: QueueID): FilterID {.gcsafe, raises: [].}
-   ## The map `fn: QueueID -> FilterID` can be augmented to a strictly
-   ## *decreasing* map `g: {0 .. N} -> FilterID`, with `g = fn([])`
-   ##
-   ## * `i < j` => `fn(fifo[j]) < fn(fifo[i])`
-   ##
-   ## for a `fifo` of type `QidSchedRef`, `N = fifo.len` and the function
-   ## `[]: {0 .. N} -> QueueID` as defined below.
-   ##
-   ## This *decreasing* requirement can be seen as a generalisation of a
-   ## block chain scenario with `i`, `j` backward steps into the past and
-   ## the `FilterID` as the block number.
-   ##
-   ## In order to flag an error, `FilterID(0)` must be returned.
+ QuFilMap* = proc(qid: QueueID): Result[FilterID,void] {.gcsafe, raises: [].}
+   ## A map `fn: QueueID -> FilterID` of type `QuFilMap` must preserve the
+   ## order relation on the image of `fn()` defined as
+   ##
+   ## * `fn(fifo[j]) < fn(fifo[i])` <=> `i < j`
+   ##
+   ## where `[]` is defined as the index function `[]: {0 .. N-1} -> QueueID`,
+   ## `N = fifo.len`.
+   ##
+   ## Any injective function `fn()` (aka monomorphism) will do.
+   ##
+   ## This definition decouples access to ordered journal records from the
+   ## storage of these records on the database. The records are accessed via
+   ## `QueueID` type keys while the order is defined by a `FilterID` type
+   ## scalar.
+   ##
+   ## In order to flag an error, `err()` must be returned.

const
  ZeroQidPair = (QueueID(0),QueueID(0))
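An example of a conforming map, essentially what the test support code further below does: resolve the queue ID on the backend and flag a lookup failure as `err()` (a sketch; the helper name `toFidMap` is hypothetical, `be` is assumed valid):

    proc toFidMap(be: BackendRef): QuFilMap =
      result = proc(qid: QueueID): Result[FilterID,void] =
        let fil = be.getFilFn(qid).valueOr:
          return err()                 # lookup failed => flag an error
        ok fil.fid                     # IDs decrease along the fifo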
@ -610,31 +614,29 @@ func `[]`*(

proc le*(
    fifo: QidSchedRef;                 # Cascaded fifos descriptor
-   fid: FilterID;                     # Upper bound
+   fid: FilterID;                     # Upper (or right) bound
    fn: QuFilMap;                      # QueueID/FilterID mapping
    forceEQ = false;                   # Check for strict equality
     ): QueueID =
- ## Find the `qid` address of type `QueueID` with `fn(qid) <= fid` and
- ## `fid < fn(qid+1)`.
+ ## Find the `qid` address of type `QueueID` with `fn(qid) <= fid` with
+ ## maximal `fn(qid)`. The requirements on the argument map `fn()` of type
+ ## `QuFilMap` have been commented on at the type definition.
  ##
- ## If `fn()` returns `FilterID(0)`, then this function returns `QueueID(0)`
- ##
- ## The argument type `QuFilMap` of map `fn()` has been commented on earlier.
+ ## This function returns `QueueID(0)` if `fn()` returns `err()` at some
+ ## stage of the algorithm applied here.
  ##
  var
    left = 0
    right = fifo.len - 1

  template toFid(qid: QueueID): FilterID =
-   let w = fn(qid)
-   if not w.isValid:
+   fn(qid).valueOr:
      return QueueID(0)                # exit hosting function environment
-   w

- # The algorithm below tryes to avoid `toFid()` as much as possible because
- # it might invoke an extra database lookup.
+ # The algorithm below tries to avoid `toFid()` as much as possible because
+ # it might invoke some extra database lookup.

- if fid.isValid and 0 <= right:
+ if 0 <= right:
    # Check left fringe
    let
      maxQid = fifo[left]
@ -656,7 +658,7 @@ proc le*(
  # So `fifo[right] < fid`

  # Bisection
- var rightQid = minQid                # Might be used as end result
+ var rightQid = minQid                # Might be used as end result
  while 1 < right - left:
    let
      pivot = (left + right) div 2
@ -671,10 +673,10 @@ proc le*(
      #
      # with `fifo[left].toFid > fid > fifo[right].toFid`
      #
-     if pivFid < fid:                 # fid >= fifo[half].toFid:
+     if pivFid < fid:                 # fid >= fifo[half].toFid:
        right = pivot
        rightQid = pivQid
-     elif fid < pivFid:               # fifo[half].toFid > fid
+     elif fid < pivFid:               # fifo[half].toFid > fid
        left = pivot
      else:
        return pivQid
@ -682,11 +684,12 @@ proc le*(
  # Now: `fifo[right].toFid < fid < fifo[left].toFid` (and `right == left+1`).
  if not forceEQ:
    # Make sure that `fifo[right].toFid` exists
-   if rightQid.fn.isValid:
+   if fn(rightQid).isOk:
      return rightQid

  # Otherwise QueueID(0)


proc eq*(
    fifo: QidSchedRef;                 # Cascaded fifos descriptor
    fid: FilterID;                     # Filter ID to search for
@ -14,7 +14,7 @@
{.push raises: [].}

import
- std/tables,
+ std/[options, tables],
  results,
  "."/[aristo_desc, aristo_filter, aristo_get, aristo_layers, aristo_hashify]
@ -69,6 +69,72 @@ iterator txWalk(tx: AristoTxRef): (AristoTxRef,LayerRef,AristoError) =

  yield (tx,layer,AristoError(0))

+# ---------
+
+proc stowImpl(
+    db: AristoDbRef;                  # Database
+    nxtFid: Option[FilterID];         # Next filter ID (zero is OK)
+    persistent: bool;                 # Stage only unless `true`
+    chunkedMpt: bool;                 # Partial data (e.g. from `snap`)
+     ): Result[void,AristoError] =
+  ## Worker for `stow()` variants.
+  ##
+  if not db.txRef.isNil:
+    return err(TxPendingTx)
+  if 0 < db.stack.len:
+    return err(TxStackGarbled)
+  if persistent and not db.canResolveBackendFilter():
+    return err(TxBackendNotWritable)
+
+  # Update Merkle hashes (unless disabled)
+  db.hashify().isOkOr:
+    return err(error[1])
+
+  let fwd = db.fwdFilter(db.top, chunkedMpt).valueOr:
+    return err(error[1])
+
+  if fwd.isValid:
+    # Merge `top` layer into `roFilter`
+    db.merge(fwd).isOkOr:
+      return err(error[1])
+
+    # Special treatment for `snap` proofs (aka `chunkedMpt`)
+    let final =
+      if chunkedMpt: LayerFinalRef(fRpp: db.top.final.fRpp)
+      else: LayerFinalRef()
+
+    # New empty top layer (probably with `snap` proofs and `vGen` carry over)
+    db.top = LayerRef(
+      delta: LayerDeltaRef(),
+      final: final)
+    if db.roFilter.isValid:
+      db.top.final.vGen = db.roFilter.vGen
+    else:
+      let rc = db.getIdgUbe()
+      if rc.isOk:
+        db.top.final.vGen = rc.value
+      else:
+        # It is OK if there was no `Idg`. Otherwise something serious happened
+        # and there is no way to recover easily.
+        doAssert rc.error == GetIdgNotFound
+
+  if persistent:
+    # Merge `roFilter` into persistent tables
+    ? db.resolveBackendFilter nxtFid
+    db.roFilter = FilterRef(nil)
+
+    # Special treatment for `snap` proofs (aka `chunkedMpt`)
+    let final =
+      if chunkedMpt: LayerFinalRef(vGen: db.vGen, fRpp: db.top.final.fRpp)
+      else: LayerFinalRef(vGen: db.vGen)
+
+    # New empty top layer (probably with `snap` proofs carry over)
+    db.top = LayerRef(
+      delta: LayerDeltaRef(),
+      final: final,
+      txUid: db.top.txUid)
+  ok()
+
# ------------------------------------------------------------------------------
# Public functions, getters
# ------------------------------------------------------------------------------
@ -397,84 +463,54 @@ proc collapse*(
  ok()

# ------------------------------------------------------------------------------
-# Public functions: save database
+# Public functions: save to database
# ------------------------------------------------------------------------------

+proc persist*(
+    db: AristoDbRef;                  # Database
+    nxtFid = none(FilterID);          # Next filter ID (zero is OK)
+    chunkedMpt = false;               # Partial data (e.g. from `snap`)
+     ): Result[void,AristoError] =
+  ## Persistently store data onto backend database. If the system is running
+  ## without a database backend, the function returns immediately with an
+  ## error. The same happens if there is a pending transaction.
+  ##
+  ## The function merges all staged data from the top layer cache onto the
+  ## backend stage area. After that, the top layer cache is cleared.
+  ##
+  ## Finally, the staged data are merged into the physical backend database
+  ## and the staged data area is cleared. While performing this last step,
+  ## the recovery journal is updated (if available.)
+  ##
+  ## If the argument `nxtFid` is passed non-zero, it will be the ID for the
+  ## next recovery journal record. If non-zero, this ID must be greater than
+  ## all previous IDs (e.g. block number when stowing after block execution.)
+  ##
+  ## Staging the top layer cache might fail with a partial MPT when it is
+  ## set up from partial MPT chunks as it happens with `snap` sync processing.
+  ## In this case, the `chunkedMpt` argument must be set `true` (see also
+  ## `fwdFilter()`.)
+  ##
+  db.stowImpl(nxtFid, persistent=true, chunkedMpt=chunkedMpt)
+
proc stow*(
    db: AristoDbRef;                   # Database
-   persistent = false;                # Stage only unless `true`
    chunkedMpt = false;                # Partial data (e.g. from `snap`)
     ): Result[void,AristoError] =
- ## If there is no backend while the `persistent` argument is set `true`,
- ## the function returns immediately with an error. The same happens if there
- ## is a pending transaction.
+ ## This function is similar to `persist()` stopping short of performing the
+ ## final step of storing on the persistent database. It fails if there is a
+ ## pending transaction.
  ##
- ## The function then merges the data from the top layer cache into the
- ## backend stage area. After that, the top layer cache is cleared.
+ ## The function merges all staged data from the top layer cache onto the
+ ## backend stage area and leaves it there. This function can be seen as
+ ## a sort of a bottom level transaction `commit()`.
  ##
- ## Staging the top layer cache might fail withh a partial MPT when it is
+ ## Staging the top layer cache might fail with a partial MPT when it is
  ## set up from partial MPT chunks as it happens with `snap` sync processing.
  ## In this case, the `chunkedMpt` argument must be set `true` (see also
- ## `fwdFilter`.)
+ ## `fwdFilter()`.)
  ##
- ## If the argument `persistent` is set `true`, all the staged data are merged
- ## into the physical backend database and the staged data area is cleared.
- ##
- if not db.txRef.isNil:
-   return err(TxPendingTx)
- if 0 < db.stack.len:
-   return err(TxStackGarbled)
- if persistent and not db.canResolveBackendFilter():
-   return err(TxBackendNotWritable)
-
- # Update Merkle hashes (unless disabled)
- db.hashify().isOkOr:
-   return err(error[1])
-
- let fwd = db.fwdFilter(db.top, chunkedMpt).valueOr:
-   return err(error[1])
-
- if fwd.isValid:
-   # Merge `top` layer into `roFilter`
-   db.merge(fwd).isOkOr:
-     return err(error[1])
-
-   # Special treatment for `snap` proofs (aka `chunkedMpt`)
-   let final =
-     if chunkedMpt: LayerFinalRef(fRpp: db.top.final.fRpp)
-     else: LayerFinalRef()
-
-   # New empty top layer (probably with `snap` proofs and `vGen` carry over)
-   db.top = LayerRef(
-     delta: LayerDeltaRef(),
-     final: final)
-   if db.roFilter.isValid:
-     db.top.final.vGen = db.roFilter.vGen
-   else:
-     let rc = db.getIdgUbe()
-     if rc.isOk:
-       db.top.final.vGen = rc.value
-     else:
-       # It is OK if there was no `Idg`. Otherwise something serious happened
-       # and there is no way to recover easily.
-       doAssert rc.error == GetIdgNotFound
-
- if persistent:
-   # Merge `roFiler` into persistent tables
-   ? db.resolveBackendFilter()
-   db.roFilter = FilterRef(nil)
-
-   # Special treatment for `snap` proofs (aka `chunkedMpt`)
-   let final =
-     if chunkedMpt: LayerFinalRef(vGen: db.vGen, fRpp: db.top.final.fRpp)
-     else: LayerFinalRef(vGen: db.vGen)
-
-   # New empty top layer (probably with `snap` proofs carry over)
-   db.top = LayerRef(
-     delta: LayerDeltaRef(),
-     final: final,
-     txUid: db.top.txUid)
- ok()
+ db.stowImpl(nxtFid=none(FilterID), persistent=false, chunkedMpt=chunkedMpt)

# ------------------------------------------------------------------------------
# End
@ -11,7 +11,7 @@
{.push raises: [].}

import
- std/tables,
+ std/[tables, typetraits],
  eth/common,
  results,
  ../../aristo as use_ari,
@ -132,10 +135,13 @@ proc baseMethods(db: AristoCoreDbRef): CoreDbBaseFns =
      db.tracer.push(flags)
    CoreDxCaptRef(methods: db.tracer.cptMethods)

- proc persistent(): CoreDbRc[void] =
+ proc persistent(bn: Option[BlockNumber]): CoreDbRc[void] =
    const info = "persistentFn()"
-   ? aBase.persistent info
+   let fid =
+     if bn.isNone: none(FilterID)
+     else: some(bn.unsafeGet.truncate(uint64).FilterID)
    ? kBase.persistent info
+   ? aBase.persistent(fid, info)
    ok()

  CoreDbBaseFns(
@ -179,8 +182,8 @@ proc baseMethods(db: AristoCoreDbRef): CoreDbBaseFns =
    newCaptureFn: proc(flags: set[CoreDbCaptFlags]): CoreDbRc[CoreDxCaptRef] =
      ok(db.bless flags.tracerSetup()),

-   persistentFn: proc(): CoreDbRc[void] =
-     persistent())
+   persistentFn: proc(bn: Option[BlockNumber]): CoreDbRc[void] =
+     persistent(bn))

# ------------------------------------------------------------------------------
# Public constructor and helper
@ -242,10 +245,14 @@ func toAristo*(mBe: CoreDbMptBackendRef): AristoDbRef =
  if not mBe.isNil and mBe.parent.isAristo:
    return mBe.AristoCoreDbMptBE.adb

-proc toAristoOldestStateRoot*(mBe: CoreDbMptBackendRef): Hash256 =
+proc toAristoOldestState*(
+    mBe: CoreDbMptBackendRef;
+     ): tuple[stateRoot: Hash256, blockNumber: BlockNumber] =
  if not mBe.isNil and mBe.parent.isAristo:
-   return mBe.parent.AristoCoreDbRef.adbBase.toJournalOldestStateRoot()
- EMPTY_ROOT_HASH
+   let fil = mBe.parent.AristoCoreDbRef.adbBase.getFromJournal none(FilterID)
+   if not fil.isNil:
+     return (fil.trg, fil.fid.distinctBase.toBlockNumber)
+ (EMPTY_ROOT_HASH, 0.toBlockNumber)

# ------------------------------------------------------------------------------
# Public aristo iterators
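A sketch of reading back the oldest recoverable state from an `Aristo`-backed `CoreDbRef` (the handle `db` is hypothetical; zero values are returned if there is no journal):

    let st = db.ctx.getMpt(CtGeneric).backend.toAristoOldestState
    if st.blockNumber != 0.toBlockNumber:
      echo "can recover back to block ", st.blockNumber,
        " with state root ", st.stateRoot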
@ -11,13 +11,13 @@
{.push raises: [].}

import
- std/[strutils, typetraits],
+ std/[options, strutils, typetraits],
  chronicles,
  eth/[common, trie/nibbles],
  stew/byteutils,
  results,
  ../../../aristo,
  ../../../aristo/aristo_filter/filter_scheduler,
  ../../../aristo/aristo_desc,
  ../../base,
  ../../base/base_desc,
  ./common_desc
@ -567,19 +567,12 @@ func toVoidRc*[T](
    return ok()
  err((VoidVID,rc.error).toError(base, info, error))

-proc toJournalOldestStateRoot*(base: AristoBaseRef): Hash256 =
- let
-   adb = base.ctx.mpt
-   be = adb.backend
+proc getFromJournal*(base: AristoBaseRef; fid: Option[FilterID]): FilterRef =
+ let be = base.ctx.mpt.backend
  if not be.isNil:
-   let jrn = be.journal
-   if not jrn.isNil:
-     let qid = jrn[^1]
-     if qid.isValid:
-       let rc = base.api.getFilUbe(adb, qid)
-       if rc.isOk:
-         return rc.value.trg
- EMPTY_ROOT_HASH
+   let fp = base.api.getFromJournal(be, fid, earlierOK=true).valueOr:
+     return FilterRef(nil)
+   return fp.fil

# ---------------------
@ -682,12 +675,13 @@ proc swapCtx*(base: AristoBaseRef; ctx: CoreDbCtxRef): CoreDbCtxRef =

proc persistent*(
    base: AristoBaseRef;
+   fid: Option[FilterID];
    info: static[string];
     ): CoreDbRc[void] =
  let
    api = base.api
    mpt = base.ctx.mpt
-   rc = api.stow(mpt, persistent = true)
+   rc = api.persist(mpt, fid)
  if rc.isOk:
    ok()
  elif api.level(mpt) == 0:
@ -499,7 +499,7 @@ proc baseMethods(
    let fns = db.newRecorderRef(flgs).cptMethods(db)
    ok(db.bless CoreDxCaptRef(methods: fns)),

-   persistentFn: proc(): CoreDbRc[void] =
+   persistentFn: proc(bn: Option[BlockNumber]): CoreDbRc[void] =
      # Emulate `Aristo` behaviour
      if 0 < db.txLevel():
        const info = "persistentFn()"
@ -11,7 +11,7 @@
{.push raises: [].}

import
- std/typetraits,
+ std/[options, typetraits],
  chronicles,
  eth/common,
  results,
@ -882,7 +882,9 @@ proc level*(db: CoreDbRef): int =
  result = db.methods.levelFn()
  db.ifTrackNewApi: debug newApiTxt, api, elapsed, result

-proc persistent*(db: CoreDbRef): CoreDbRc[void] {.discardable.} =
+proc persistent*(
+    db: CoreDbRef;
+     ): CoreDbRc[void] {.discardable.} =
  ## For the legacy database, this function has no effect and succeeds always.
  ## It will nevertheless return a discardable error if there is a pending
  ## transaction (i.e. `db.level() == 0`.)
@ -896,9 +898,38 @@ proc persistent*(db: CoreDbRef): CoreDbRc[void] {.discardable.} =
  ## treated separately (see `saveOffSite()`.)
  ##
  db.setTrackNewApi BasePersistentFn
- result = db.methods.persistentFn()
+ result = db.methods.persistentFn none(BlockNumber)
  db.ifTrackNewApi: debug newApiTxt, api, elapsed, result

+proc persistent*(
+    db: CoreDbRef;
+    blockNumber: BlockNumber;
+     ): CoreDbRc[void] {.discardable.} =
+  ## Variant of `persistent()` which stores a block number within the recovery
+  ## journal record. This record will be addressable by the `blockNumber`
+  ## (e.g. for recovery.) The argument block number `blockNumber` must be
+  ## greater than all previously stored block numbers.
+  ##
+  ## The function is intended to be used in a way so that the argument block
+  ## number `blockNumber` is associated with the state root to be recovered
+  ## from a particular journal entry. This means that the correct block number
+  ## will be the one of the state *before* a state change takes place. Using
+  ## it that way, `persistent()` must only be run after some blocks were fully
+  ## executed.
+  ##
+  ## Example:
+  ## ::
+  ##   # Save block number for the current state
+  ##   let stateBlockNumber = db.getCanonicalHead().blockNumber
+  ##   ..
+  ##   # Process blocks
+  ##   ..
+  ##   db.persistent(stateBlockNumber)
+  ##
+  db.setTrackNewApi BasePersistentFn
+  result = db.methods.persistentFn some(blockNumber)
+  db.ifTrackNewApi: debug newApiTxt, api, elapsed, blockNumber, result
+
proc newTransaction*(db: CoreDbRef): CoreDbRc[CoreDxTxRef] =
  ## Constructor
  ##
@ -115,7 +115,8 @@ type
  CoreDbBaseNewCaptFn* =
    proc(flgs: set[CoreDbCaptFlags]): CoreDbRc[CoreDxCaptRef] {.noRaise.}
  CoreDbBaseGetCaptFn* = proc(): CoreDbRc[CoreDxCaptRef] {.noRaise.}
- CoreDbBasePersistentFn* = proc(): CoreDbRc[void] {.noRaise.}
+ CoreDbBasePersistentFn* =
+   proc(bn: Option[BlockNumber]): CoreDbRc[void] {.noRaise.}

  CoreDbBaseFns* = object
    destroyFn*: CoreDbBaseDestroyFn
@ -227,7 +228,6 @@ type
  CoreDbAccHasPathFn* = proc(k: EthAddress): CoreDbRc[bool] {.noRaise.}
  CoreDbAccGetColFn* = proc(): CoreDbColRef {.noRaise.}
  CoreDbAccIsPruningFn* = proc(): bool {.noRaise.}
- CoreDbAccPersistentFn* = proc(): CoreDbRc[void] {.noRaise.}
  CoreDbAccForgetFn* = proc(): CoreDbRc[void] {.noRaise.}

  CoreDbAccFns* = object
@ -240,7 +240,6 @@ type
    hasPathFn*: CoreDbAccHasPathFn
    getColFn*: CoreDbAccGetColFn
    isPruningFn*: CoreDbAccIsPruningFn
-   persistentFn*: CoreDbAccPersistentFn

# --------------------------------------------------
@ -18,7 +18,7 @@ import
  chronicles,
  eth/[common, rlp],
  results,
- stew/[byteutils, endians2],
+ stew/byteutils,
  "../.."/[errors, constants],
  ".."/[aristo, storage_types],
  ./backend/aristo_db,
@ -89,23 +89,6 @@ template discardRlpException(info: static[string]; code: untyped) =
  except RlpError as e:
    warn logTxt info, error=($e.name), msg=e.msg

-# ---------
-
-func to(bn: BlockNumber; T: type Blob): T =
- if bn <= high(uint64).toBlockNumber:
-   bn.truncate(uint64).toBytesBE.toSeq
- else:
-   bn.toBytesBE.toSeq
-
-func to(data: openArray[byte]; T: type BlockNumber): T =
- case data.len:
- of 8:
-   return uint64.fromBytesBE(data).toBlockNumber
- of 32:
-   return UInt256.fromBytesBE(data).toBlockNumber
- else:
-   discard
-
# ------------------------------------------------------------------------------
# Private iterators
# ------------------------------------------------------------------------------
@ -347,25 +330,19 @@ proc exists*(db: CoreDbRef, hash: Hash256): bool =
    warn logTxt "exisis()", hash, action="hasKey()", error=($$error)
    return false

-proc getBlockNumber*(db: CoreDbRef; stateRoot: Hash256): BlockNumber =
- const info = "getBlockNumber()"
- if stateRoot != EMPTY_ROOT_HASH:
-   let
-     kvt = db.newKvt()
-     data = kvt.get(stRootToBlockNumKey(stateRoot).toOpenArray).valueOr:
-       if error.error != KvtNotFound:
-         warn logTxt info, stateRoot, action="get()", error=($$error)
-       return
-   return data.to(BlockNumber)
-
-proc getOldestJournalBlockNumber*(db: CoreDbRef): BlockNumber =
+proc getOldestJournalBlockNumber*(
+    db: CoreDbRef;
+     ): BlockNumber
+     {.gcsafe, raises: [RlpError].} =
  ## Returns the block number implied by the database journal if there is any,
  ## or `BlockNumber(0)`. At the moment, only the `Aristo` database has a
  ## journal.
  ##
- let be = db.ctx.getMpt(CtGeneric).backend
- if be.parent.isAristo:
-   return db.getBlockNumber be.toAristoOldestStateRoot()
+ let st = db.ctx.getMpt(CtGeneric).backend.toAristoOldestState
+ var header: BlockHeader
+ if db.getBlockHeader(st.blockNumber, header):
+   doAssert header.stateRoot == st.stateRoot or st.blockNumber == 0
+   return st.blockNumber


proc getBlockHeader*(
@ -561,18 +538,8 @@ proc getAncestorsHashes*(
    dec ancestorCount

proc addBlockNumberToHashLookup*(db: CoreDbRef; header: BlockHeader) =
- ## The function stores lookup for
- ## ::
- ##   header.stateRoot -> header.blockNumber -> header.hash()
- ##
- let
-   blockNumberKey = blockNumberToHashKey(header.blockNumber)
-   stRootKey = stRootToBlockNumKey(header.stateRoot)
-   kvt = db.newKvt()
- kvt.put(stRootKey.toOpenArray, header.blockNumber.to(Blob)).isOkOr:
-   warn logTxt "addBlockNumberToHashLookup()",
-     stRootKey, action="put()", `error`=($$error)
- kvt.put(blockNumberKey.toOpenArray, rlp.encode(header.hash)).isOkOr:
+ let blockNumberKey = blockNumberToHashKey(header.blockNumber)
+ db.newKvt.put(blockNumberKey.toOpenArray, rlp.encode(header.hash)).isOkOr:
    warn logTxt "addBlockNumberToHashLookup()",
      blockNumberKey, action="put()", error=($$error)
|
@ -599,7 +566,7 @@ proc persistTransactions*(
|
|||
warn logTxt info, idx, action="merge()", error=($$error)
|
||||
return EMPTY_ROOT_HASH
|
||||
kvt.put(blockKey.toOpenArray, rlp.encode(txKey)).isOkOr:
|
||||
warn logTxt info, blockKey, action="put()", error=($$error)
|
||||
trace logTxt info, blockKey, action="put()", error=($$error)
|
||||
return EMPTY_ROOT_HASH
|
||||
mpt.getColumn.state.valueOr:
|
||||
when extraTraceMessages:
|
||||
|
@ -624,7 +591,6 @@ proc forgetHistory*(
  if db.getBlockHeader(blockHash, header):
    # delete blockHash->header, stateRoot->blockNum
    discard kvt.del(genericHashKey(blockHash).toOpenArray)
-   discard kvt.del(stRootToBlockNumKey(header.stateRoot).toOpenArray)

proc getTransaction*(
    db: CoreDbRef;
@ -926,12 +892,6 @@ proc persistHeaderToDbWithoutSetHead*(
  let
    kvt = db.newKvt()
    scoreKey = blockHashToScoreKey(headerHash)

- # This extra call `addBlockNumberToHashLookup()` has been added in order
- # to access the burrent block by the state root. So it can be deleted
- # if not needed, anymore.
- db.addBlockNumberToHashLookup(header)
-
  kvt.put(scoreKey.toOpenArray, rlp.encode(score)).isOkOr:
    warn logTxt "persistHeaderToDbWithoutSetHead()",
      scoreKey, action="put()", `error`=($$error)
@ -34,7 +34,7 @@ export
  isAristo,
  toAristo,
  toAristoProfData,
- toAristoOldestStateRoot,
+ toAristoOldestState,

  # see `legacy_db`
  isLegacy,
@ -32,7 +32,6 @@ type
    snapSyncStorageSlot
    snapSyncStateRoot
    blockHashToBlockWitness
-   stRootToBlockNum

  DbKey* = object
    # The first byte stores the key type. The rest are key-specific values
@ -136,11 +135,6 @@ proc blockHashToBlockWitnessKey*(h: Hash256): DbKey {.inline.} =
  result.data[1 .. 32] = h.data
  result.dataEndPos = uint8 32

-proc stRootToBlockNumKey*(h: Hash256): DbKey =
- result.data[0] = byte ord(stRootToBlockNum)
- result.data[1 .. 32] = h.data
- result.dataEndPos = uint8 32
-
template toOpenArray*(k: DbKey): openArray[byte] =
  k.data.toOpenArray(0, int(k.dataEndPos))
@ -338,7 +338,7 @@ proc testBackendConsistency*(

  # Provide filter, store filter on permanent BE, and register filter digest
  block:
-   let rc = mdb.stow(persistent=false, chunkedMpt=true)
+   let rc = mdb.persist(chunkedMpt=true)
    xCheckRc rc.error == 0
    let collectFilterOk = rdb.collectFilter(mdb.roFilter, filTab, noisy)
    xCheck collectFilterOk
@ -346,10 +346,10 @@ proc testBackendConsistency*(
  # Store onto backend database
  block:
    #noisy.say "***", "db-dump\n    ", mdb.pp
-   let rc = mdb.stow(persistent=true, chunkedMpt=true)
+   let rc = mdb.persist(chunkedMpt=true)
    xCheckRc rc.error == 0
  block:
-   let rc = rdb.stow(persistent=true, chunkedMpt=true)
+   let rc = rdb.persist(chunkedMpt=true)
    xCheckRc rc.error == 0

  xCheck ndb.vGen == mdb.vGen
@ -186,7 +186,7 @@ proc dbTriplet(w: LeafQuartet; rdbPath: string): Result[DbTriplet,AristoError] =
    db.finish(flush=true)
    check report.error == 0
    return err(report.error)
- let rc = db.stow(persistent=true)
+ let rc = db.persist()
  if rc.isErr:
    check rc.error == 0
    return
@ -406,11 +406,10 @@ proc checkFilterTrancoderOk(
# -------------------------

proc qid2fidFn(be: BackendRef): QuFilMap =
- result = proc(qid: QueueID): FilterID =
-   let rc = be.getFilFn qid
-   if rc.isErr:
-     return FilterID(0)
-   rc.value.fid
+ result = proc(qid: QueueID): Result[FilterID,void] =
+   let fil = be.getFilFn(qid).valueOr:
+     return err()
+   ok fil.fid

proc storeFilter(
    be: BackendRef;
@ -418,7 +417,7 @@ proc storeFilter(
     ): bool =
  ## ..
  let instr = block:
-   let rc = be.fifosStore filter
+   let rc = be.fifosStore(filter, none(FilterID))
    xCheckRc rc.error == 0
    rc.value
@ -590,13 +589,13 @@ proc testDistributedAccess*(

  # Clause (9) from `aristo/README.md` example
  block:
-   let rc = db1.stow(persistent=true)
+   let rc = db1.persist()
    xCheckRc rc.error == 0
  xCheck db1.roFilter == FilterRef(nil)
  xCheck db2.roFilter == db3.roFilter

  block:
-   let rc = db2.stow(persistent=false)
+   let rc = db2.stow() # non-persistent
    xCheckRc rc.error == 0:
      noisy.say "*** testDistributedAccess (3)", "n=", n, "db2".dump db2
  xCheck db1.roFilter == FilterRef(nil)
|
|||
# Clause (11) from `aristo/README.md` example
|
||||
db2.reCentre()
|
||||
block:
|
||||
let rc = db2.stow(persistent=true)
|
||||
let rc = db2.persist()
|
||||
xCheckRc rc.error == 0
|
||||
xCheck db2.roFilter == FilterRef(nil)
|
||||
|
||||
|
@ -641,7 +640,7 @@ proc testDistributedAccess*(
|
|||
# Build clause (12) from `aristo/README.md` example
|
||||
db2.reCentre()
|
||||
block:
|
||||
let rc = db2.stow(persistent=true)
|
||||
let rc = db2.persist()
|
||||
xCheckRc rc.error == 0
|
||||
xCheck db2.roFilter == FilterRef(nil)
|
||||
xCheck db1.roFilter == db3.roFilter
|
||||
|
@ -649,7 +648,7 @@ proc testDistributedAccess*(
  # Clause (13) from `aristo/README.md` example
  xCheck not db1.isCentre()
  block:
-   let rc = db1.stow(persistent=false)
+   let rc = db1.stow() # non-persistent
    xCheckRc rc.error == 0

  # Clause (14) from `aristo/README.md` check
@ -808,7 +807,7 @@ proc testFilterBacklog*(
    let rc = db.mergeLeaf w
    xCheckRc rc.error == 0
  block:
-   let rc = db.stow(persistent=true)
+   let rc = db.persist()
    xCheckRc rc.error == 0
  block:
    let rc = db.checkJournal()
@ -843,7 +842,7 @@ proc testFilterBacklog*(

  # Realign to earlier state
  xb = block:
-   let rc = db.forkBackLog(episode = episode)
+   let rc = db.forkByJournal(episode = episode)
    xCheckRc rc.error == 0
    rc.value
  block:
@ -224,10 +224,11 @@ proc validate(db: QTabRef; scd: QidSchedRef; serial: int; relax: bool): bool =
  else:
    xCheck db.len == scd.len

- proc qFn(qid: QueueID): FilterID =
+ proc qFn(qid: QueueID): Result[FilterID,void] =
    let val = db.getOrDefault(qid, QValRef(nil))
-   if not val.isNil:
-     return val.fid
+   if val.isNil:
+     return err()
+   ok val.fid

  # Test filter ID selection
  var lastFid = FilterID(serial + 1)
@ -128,7 +128,10 @@ proc schedStow(
    filterMeter = if db.roFilter.isNil: 0
                  else: db.roFilter.sTab.len + db.roFilter.kMap.len
    persistent = MaxFilterBulk < max(layersMeter, filterMeter)
- db.stow(persistent = persistent, chunkedMpt = chunkedMpt)
+ if persistent:
+   db.persist(chunkedMpt=chunkedMpt)
+ else:
+   db.stow(chunkedMpt=chunkedMpt)

proc saveToBackend(
    tx: var AristoTxRef;