diff --git a/nimbus/db/aristo/aristo_api.nim b/nimbus/db/aristo/aristo_api.nim index 870089530..88385d837 100644 --- a/nimbus/db/aristo/aristo_api.nim +++ b/nimbus/db/aristo/aristo_api.nim @@ -405,6 +405,14 @@ type AristoApiProfBeGetVtxFn = "be/getVtx" AristoApiProfBeGetKeyFn = "be/getKey" + AristoApiProfBeGetFilFn = "be/getFil" + AristoApiProfBeGetIdgFn = "be/getIfg" + AristoApiProfBeGetFqsFn = "be/getFqs" + AristoApiProfBePutVtxFn = "be/putVtx" + AristoApiProfBePutKeyFn = "be/putKey" + AristoApiProfBePutFilFn = "be/putFil" + AristoApiProfBePutIdgFn = "be/putIdg" + AristoApiProfBePutFqsFn = "be/putFqs" AristoApiProfBePutEndFn = "be/putEnd" AristoApiProfRef* = ref object of AristoApiRef @@ -709,6 +717,54 @@ func init*( result = be.getKeyFn(a) data.list[AristoApiProfBeGetKeyFn.ord].masked = true + beDup.getFilFn = + proc(a: QueueID): auto = + AristoApiProfBeGetFilFn.profileRunner: + result = be.getFilFn(a) + data.list[AristoApiProfBeGetFilFn.ord].masked = true + + beDup.getIdgFn = + proc(): auto = + AristoApiProfBeGetIdgFn.profileRunner: + result = be.getIdgFn() + data.list[AristoApiProfBeGetIdgFn.ord].masked = true + + beDup.getFqsFn = + proc(): auto = + AristoApiProfBeGetFqsFn.profileRunner: + result = be.getFqsFn() + data.list[AristoApiProfBeGetFqsFn.ord].masked = true + + beDup.putVtxFn = + proc(a: PutHdlRef; b: openArray[(VertexID,VertexRef)]) = + AristoApiProfBePutVtxFn.profileRunner: + be.putVtxFn(a,b) + data.list[AristoApiProfBePutVtxFn.ord].masked = true + + beDup.putKeyFn = + proc(a: PutHdlRef; b: openArray[(VertexID,HashKey)]) = + AristoApiProfBePutKeyFn.profileRunner: + be.putKeyFn(a,b) + data.list[AristoApiProfBePutKeyFn.ord].masked = true + + beDup.putFilFn = + proc(a: PutHdlRef; b: openArray[(QueueID,FilterRef)]) = + AristoApiProfBePutFilFn.profileRunner: + be.putFilFn(a,b) + data.list[AristoApiProfBePutFilFn.ord].masked = true + + beDup.putIdgFn = + proc(a: PutHdlRef; b: openArray[VertexID]) = + AristoApiProfBePutIdgFn.profileRunner: + be.putIdgFn(a,b) + data.list[AristoApiProfBePutIdgFn.ord].masked = true + + beDup.putFqsFn = + proc(a: PutHdlRef; b: openArray[(QueueID,QueueID)]) = + AristoApiProfBePutFqsFn.profileRunner: + be.putFqsFn(a,b) + data.list[AristoApiProfBePutFqsFn.ord].masked = true + beDup.putEndFn = proc(a: PutHdlRef): auto = AristoApiProfBePutEndFn.profileRunner: diff --git a/nimbus/db/aristo/aristo_filter/filter_scheduler.nim b/nimbus/db/aristo/aristo_filter/filter_scheduler.nim index e7b5d0ed9..6f6fcab23 100644 --- a/nimbus/db/aristo/aristo_filter/filter_scheduler.nim +++ b/nimbus/db/aristo/aristo_filter/filter_scheduler.nim @@ -1,5 +1,5 @@ # nimbus-eth1 -# Copyright (c) 2023 Status Research & Development GmbH +# Copyright (c) 2023-2024 Status Research & Development GmbH # Licensed under either of # * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or # http://www.apache.org/licenses/LICENSE-2.0) @@ -9,7 +9,7 @@ # except according to those terms. import - std/[algorithm, sequtils], + std/[algorithm, sequtils, typetraits], ".."/[aristo_constants, aristo_desc] type @@ -238,11 +238,7 @@ func fifoDel( # Delete all available return (@[(QueueID(1), fifo[1]), (fifo[0], wrap)], ZeroQidPair) -# ------------------------------------------------------------------------------ -# Public functions -# ------------------------------------------------------------------------------ - -func stats*( +func capacity( ctx: openArray[tuple[size, width: int]]; # Schedule layout ): tuple[maxQueue: int, minCovered: int, maxCovered: int] = ## Number of maximally stored and covered queued entries for the argument @@ -258,17 +254,24 @@ func stats*( result.minCovered += (ctx[n].size * step).int result.maxCovered += (size * step).int -func stats*( +# ------------------------------------------------------------------------------ +# Public functions +# ------------------------------------------------------------------------------ + +func capacity*( ctx: openArray[tuple[size, width, wrap: int]]; # Schedule layout ): tuple[maxQueue: int, minCovered: int, maxCovered: int] = - ## Variant of `stats()` - ctx.toSeq.mapIt((it[0],it[1])).stats + ## Variant of `capacity()` below. + ctx.toSeq.mapIt((it[0],it[1])).capacity -func stats*( - ctx: QidLayoutRef; # Cascaded fifos descriptor +func capacity*( + journal: QidSchedRef; # Cascaded fifos descriptor ): tuple[maxQueue: int, minCovered: int, maxCovered: int] = - ## Variant of `stats()` - ctx.q.toSeq.mapIt((it[0].int,it[1].int)).stats + ## Number of maximally stored and covered queued entries for the layout of + ## argument `journal`. The resulting value of `maxQueue` entry is the maximal + ## number of database slots needed, the `minCovered` and `maxCovered` entry + ## indicate the rancge of the backlog foa a fully populated database. + journal.ctx.q.toSeq.mapIt((it[0].int,it[1].int)).capacity() func addItem*( @@ -549,6 +552,13 @@ func `[]`*( return n.globalQid(wrap - inx) inx -= qInxMax0 + 1 # Otherwise continue +func `[]`*( + fifo: QidSchedRef; # Cascaded fifos descriptor + bix: BackwardsIndex; # Index into latest items + ): QueueID = + ## Variant of `[]` for provifing `[^bix]`. + fifo[fifo.state.len - bix.distinctBase] + func `[]`*( fifo: QidSchedRef; # Cascaded fifos descriptor diff --git a/nimbus/db/aristo/aristo_init/rocks_db.nim b/nimbus/db/aristo/aristo_init/rocks_db.nim index 32c99514d..afaad8f5a 100644 --- a/nimbus/db/aristo/aristo_init/rocks_db.nim +++ b/nimbus/db/aristo/aristo_init/rocks_db.nim @@ -81,7 +81,7 @@ proc getVtxFn(db: RdbBackendRef): GetVtxFn = proc(vid: VertexID): Result[VertexRef,AristoError] = # Fetch serialised data record - let data = db.rdb.get(VtxPfx, vid.uint64).valueOr: + let data = db.rdb.getVtx(vid.uint64).valueOr: when extraTraceMessages: trace logTxt "getVtxFn() failed", vid, error=error[0], info=error[1] return err(error[0]) @@ -97,7 +97,7 @@ proc getKeyFn(db: RdbBackendRef): GetKeyFn = proc(vid: VertexID): Result[HashKey,AristoError] = # Fetch serialised data record - let data = db.rdb.get(KeyPfx, vid.uint64).valueOr: + let data = db.rdb.getKey(vid.uint64).valueOr: when extraTraceMessages: trace logTxt "getKeyFn: failed", vid, error=error[0], info=error[1] return err(error[0]) @@ -119,8 +119,8 @@ proc getFilFn(db: RdbBackendRef): GetFilFn = result = proc(qid: QueueID): Result[FilterRef,AristoError] = - # Fetch serialised data record - let data = db.rdb.get(FilPfx, qid.uint64).valueOr: + # Fetch serialised data record. + let data = db.rdb.getByPfx(FilPfx, qid.uint64).valueOr: when extraTraceMessages: trace logTxt "getFilFn: failed", qid, error=error[0], info=error[1] return err(error[0]) @@ -135,8 +135,8 @@ proc getIdgFn(db: RdbBackendRef): GetIdgFn = result = proc(): Result[seq[VertexID],AristoError]= - # Fetch serialised data record - let data = db.rdb.get(AdmPfx, AdmTabIdIdg.uint64).valueOr: + # Fetch serialised data record. + let data = db.rdb.getByPfx(AdmPfx, AdmTabIdIdg.uint64).valueOr: when extraTraceMessages: trace logTxt "getIdgFn: failed", error=error[0], info=error[1] return err(error[0]) @@ -158,8 +158,8 @@ proc getFqsFn(db: RdbBackendRef): GetFqsFn = result = proc(): Result[seq[(QueueID,QueueID)],AristoError]= - # Fetch serialised data record - let data = db.rdb.get(AdmPfx, AdmTabIdFqs.uint64).valueOr: + # Fetch serialised data record. + let data = db.rdb.getByPfx(AdmPfx, AdmTabIdFqs.uint64).valueOr: when extraTraceMessages: trace logTxt "getFqsFn: failed", error=error[0], info=error[1] return err(error[0]) @@ -179,7 +179,6 @@ proc putBegFn(db: RdbBackendRef): PutBegFn = db.rdb.begin() db.newSession() - proc putVtxFn(db: RdbBackendRef): PutVtxFn = result = proc(hdl: PutHdlRef; vrps: openArray[(VertexID,VertexRef)]) = @@ -201,8 +200,8 @@ proc putVtxFn(db: RdbBackendRef): PutVtxFn = else: batch.add (vid.uint64, EmptyBlob) - # Stash batch session data - db.rdb.put(VtxPfx, batch).isOkOr: + # Stash batch session data via LRU cache + db.rdb.putVtx(batch).isOkOr: hdl.error = TypedPutHdlErrRef( pfx: VtxPfx, vid: VertexID(error[0]), @@ -223,8 +222,8 @@ proc putKeyFn(db: RdbBackendRef): PutKeyFn = else: batch.add (vid.uint64, EmptyBlob) - # Stash batch session data - db.rdb.put(KeyPfx, batch).isOkOr: + # Stash batch session data via LRU cache + db.rdb.putKey(batch).isOkOr: hdl.error = TypedPutHdlErrRef( pfx: KeyPfx, vid: VertexID(error[0]), @@ -263,7 +262,7 @@ proc putFilFn(db: RdbBackendRef): PutFilFn = batch.add (qid.uint64, EmptyBlob) # Stash batch session data - db.rdb.put(FilPfx, batch).isOkOr: + db.rdb.putByPfx(FilPfx, batch).isOkOr: hdl.error = TypedPutHdlErrRef( pfx: FilPfx, qid: QueueID(error[0]), @@ -276,7 +275,7 @@ proc putIdgFn(db: RdbBackendRef): PutIdgFn = let hdl = hdl.getSession db if hdl.error.isNil: let idg = if 0 < vs.len: vs.blobify else: EmptyBlob - db.rdb.put(AdmPfx, @[(AdmTabIdIdg.uint64, idg)]).isOkOr: + db.rdb.putByPfx(AdmPfx, @[(AdmTabIdIdg.uint64, idg)]).isOkOr: hdl.error = TypedPutHdlErrRef( pfx: AdmPfx, aid: AdmTabIdIdg, @@ -300,7 +299,7 @@ proc putFqsFn(db: RdbBackendRef): PutFqsFn = # Stash batch session data let fqs = if 0 < vs.len: vs.blobify else: EmptyBlob - db.rdb.put(AdmPfx, @[(AdmTabIdFqs.uint64, fqs)]).isOkOr: + db.rdb.putByPfx(AdmPfx, @[(AdmTabIdFqs.uint64, fqs)]).isOkOr: hdl.error = TypedPutHdlErrRef( pfx: AdmPfx, aid: AdmTabIdFqs, diff --git a/nimbus/db/aristo/aristo_init/rocks_db/rdb_desc.nim b/nimbus/db/aristo/aristo_init/rocks_db/rdb_desc.nim index 3d219cea1..c3ff5aead 100644 --- a/nimbus/db/aristo/aristo_init/rocks_db/rdb_desc.nim +++ b/nimbus/db/aristo/aristo_init/rocks_db/rdb_desc.nim @@ -15,29 +15,34 @@ import std/os, + eth/common, rocksdb, - stew/endians2, + stew/[endians2, keyed_queue], ../../aristo_desc, ../init_common type RdbInst* = object - store*: ColFamilyReadWrite ## Rocks DB database handler - session*: WriteBatchRef ## For batched `put()` - basePath*: string ## Database directory - noFq*: bool ## No filter queues available + store*: ColFamilyReadWrite ## Rocks DB database handler + session*: WriteBatchRef ## For batched `put()` + rdKeyLru*: KeyedQueue[RdbKey,Blob] ## Read cache + rdVtxLru*: KeyedQueue[RdbKey,Blob] ## Read cache + basePath*: string ## Database directory + noFq*: bool ## No filter queues available RdbGuestDbRef* = ref object of GuestDbRef - guestDb*: ColFamilyReadWrite ## Pigiback feature reference + guestDb*: ColFamilyReadWrite ## Pigiback feature reference RdbKey* = array[1 + sizeof VertexID, byte] ## Sub-table key, + VertexID const - GuestFamily* = "Guest" ## Guest family (e.g. for Kvt) - AristoFamily* = "Aristo" ## RocksDB column family - BaseFolder* = "nimbus" ## Same as for Legacy DB - DataFolder* = "aristo" ## Legacy DB has "data" + GuestFamily* = "Guest" ## Guest family (e.g. for Kvt) + AristoFamily* = "Aristo" ## RocksDB column family + BaseFolder* = "nimbus" ## Same as for Legacy DB + DataFolder* = "aristo" ## Legacy DB has "data" + RdKeyLruMaxSize* = 4096 ## Max size of read cache for keys + RdVtxLruMaxSize* = 2048 ## Max size of read cache for vertex IDs # ------------------------------------------------------------------------------ # Public functions diff --git a/nimbus/db/aristo/aristo_init/rocks_db/rdb_get.nim b/nimbus/db/aristo/aristo_init/rocks_db/rdb_get.nim index 6182d8f9f..d60595491 100644 --- a/nimbus/db/aristo/aristo_init/rocks_db/rdb_get.nim +++ b/nimbus/db/aristo/aristo_init/rocks_db/rdb_get.nim @@ -17,6 +17,7 @@ import eth/common, rocksdb, results, + stew/keyed_queue, ../../aristo_desc, ../init_common, ./rdb_desc @@ -32,29 +33,61 @@ when extraTraceMessages: logScope: topics = "aristo-rocksdb" -# ------------------------------------------------------------------------------ -# Public functions -# ------------------------------------------------------------------------------ - -proc get*( - rdb: RdbInst; - pfx: StorageType; - xid: uint64, - ): Result[Blob,(AristoError,string)] = +proc getImpl(rdb: RdbInst; key: RdbKey): Result[Blob,(AristoError,string)] = var res: Blob let onData = proc(data: openArray[byte]) = - res = @data + res = @data - let gotData = rdb.store.get(xid.toRdbKey pfx, onData).valueOr: - const errSym = RdbBeDriverGetError - when extraTraceMessages: - trace logTxt "get", error=errSym, info=error - return err((errSym,error)) + let gotData = rdb.store.get(key, onData).valueOr: + const errSym = RdbBeDriverGetError + when extraTraceMessages: + trace logTxt "get", pfx=key[0], error=errSym, info=error + return err((errSym,error)) + # Correct result if needed if not gotData: res = EmptyBlob ok res +# ------------------------------------------------------------------------------ +# Public functions +# ------------------------------------------------------------------------------ + +proc getByPfx*( + rdb: RdbInst; + pfx: StorageType; + xid: uint64, + ): Result[Blob,(AristoError,string)] = + rdb.getImpl(xid.toRdbKey pfx) + +proc getKey*(rdb: var RdbInst; xid: uint64): Result[Blob,(AristoError,string)] = + # Try LRU cache first + let + key = xid.toRdbKey KeyPfx + rc = rdb.rdKeyLru.lruFetch(key) + if rc.isOK: + return ok(rc.value) + + # Otherwise fetch from backend database + let res = ? rdb.getImpl(key) + + # Update cache and return + ok rdb.rdKeyLru.lruAppend(key, res, RdKeyLruMaxSize) + +proc getVtx*(rdb: var RdbInst; xid: uint64): Result[Blob,(AristoError,string)] = + # Try LRU cache first + let + key = xid.toRdbKey VtxPfx + rc = rdb.rdVtxLru.lruFetch(key) + if rc.isOK: + return ok(rc.value) + + # Otherwise fetch from backend database + let res = ? rdb.getImpl(key) + + # Update cache and return + ok rdb.rdVtxLru.lruAppend(key, res, RdVtxLruMaxSize) + # ------------------------------------------------------------------------------ # End # ------------------------------------------------------------------------------ diff --git a/nimbus/db/aristo/aristo_init/rocks_db/rdb_put.nim b/nimbus/db/aristo/aristo_init/rocks_db/rdb_put.nim index d5f7467a2..d142938ea 100644 --- a/nimbus/db/aristo/aristo_init/rocks_db/rdb_put.nim +++ b/nimbus/db/aristo/aristo_init/rocks_db/rdb_put.nim @@ -17,6 +17,7 @@ import eth/common, rocksdb, results, + stew/[endians2, keyed_queue], ../../aristo_desc, ../init_common, ./rdb_desc @@ -39,6 +40,30 @@ proc disposeSession(rdb: var RdbInst) = rdb.session.close() rdb.session = WriteBatchRef(nil) +proc putImpl( + dsc: WriteBatchRef; + name: string; + key: RdbKey; + val: Blob; + ): Result[void,(uint64,AristoError,string)] = + if val.len == 0: + dsc.delete(key, name).isOkOr: + const errSym = RdbBeDriverDelError + let xid = uint64.fromBytesBE key[1 .. 8] + when extraTraceMessages: + trace logTxt "del", + pfx=StorageType(key[0]), xid, error=errSym, info=error + return err((xid,errSym,error)) + else: + dsc.put(key, val, name).isOkOr: + const errSym = RdbBeDriverPutError + let xid = uint64.fromBytesBE key[1 .. 8] + when extraTraceMessages: + trace logTxt "put", + pfx=StorageType(key[0]), xid, error=errSym, info=error + return err((xid,errSym,error)) + ok() + # ------------------------------------------------------------------------------ # Public functions # ------------------------------------------------------------------------------ @@ -49,6 +74,8 @@ proc begin*(rdb: var RdbInst) = proc rollback*(rdb: var RdbInst) = if not rdb.session.isClosed(): + rdb.rdKeyLru.clear() # Flush caches + rdb.rdVtxLru.clear() # Flush caches rdb.disposeSession() proc commit*(rdb: var RdbInst): Result[void,(AristoError,string)] = @@ -61,26 +88,55 @@ proc commit*(rdb: var RdbInst): Result[void,(AristoError,string)] = return err((errSym,error)) ok() -proc put*( - rdb: RdbInst; +proc putByPfx*( + rdb: var RdbInst; pfx: StorageType; data: openArray[(uint64,Blob)]; ): Result[void,(uint64,AristoError,string)] = - let dsc = rdb.session + let + dsc = rdb.session + name = rdb.store.name for (xid,val) in data: - let key = xid.toRdbKey pfx - if val.len == 0: - dsc.delete(key, rdb.store.name).isOkOr: - const errSym = RdbBeDriverDelError - when extraTraceMessages: - trace logTxt "del", pfx, xid, error=errSym, info=error - return err((xid,errSym,error)) - else: - dsc.put(key, val, rdb.store.name).isOkOr: - const errSym = RdbBeDriverPutError - when extraTraceMessages: - trace logTxt "put", pfx, xid, error=errSym, info=error - return err((xid,errSym,error)) + dsc.putImpl(name, xid.toRdbKey pfx, val).isOkOr: + return err(error) + ok() + +proc putKey*( + rdb: var RdbInst; + data: openArray[(uint64,Blob)]; + ): Result[void,(uint64,AristoError,string)] = + let + dsc = rdb.session + name = rdb.store.name + for (xid,val) in data: + let key = xid.toRdbKey KeyPfx + + # Update cache + if not rdb.rdKeyLru.lruUpdate(key, val): + discard rdb.rdKeyLru.lruAppend(key, val, RdKeyLruMaxSize) + + # Store on write batch queue + dsc.putImpl(name, key, val).isOkOr: + return err(error) + ok() + +proc putVtx*( + rdb: var RdbInst; + data: openArray[(uint64,Blob)]; + ): Result[void,(uint64,AristoError,string)] = + let + dsc = rdb.session + name = rdb.store.name + for (xid,val) in data: + let key = xid.toRdbKey VtxPfx + + # Update cache + if not rdb.rdVtxLru.lruUpdate(key, val): + discard rdb.rdVtxLru.lruAppend(key, val, RdVtxLruMaxSize) + + # Store on write batch queue + dsc.putImpl(name, key, val).isOkOr: + return err(error) ok() # ------------------------------------------------------------------------------ diff --git a/nimbus/db/aristo/aristo_profile.nim b/nimbus/db/aristo/aristo_profile.nim index 7645aee7b..10ec9acd9 100644 --- a/nimbus/db/aristo/aristo_profile.nim +++ b/nimbus/db/aristo/aristo_profile.nim @@ -53,10 +53,8 @@ func toFloat(ela: Duration): float = proc updateTotal(t: AristoDbProfListRef; fnInx: uint) = ## Summary update helper if fnInx == 0: - t.list[0].sum = 0.0 - t.list[0].sqSum = 0.0 - t.list[0].count = 0 - elif t.list[0].masked == false: + t.list[0].reset + elif not t.list[fnInx].masked: t.list[0].sum += t.list[fnInx].sum t.list[0].sqSum += t.list[fnInx].sqSum t.list[0].count += t.list[fnInx].count diff --git a/nimbus/db/kvt/kvt_api.nim b/nimbus/db/kvt/kvt_api.nim index 78034d9f9..fb86ab4a7 100644 --- a/nimbus/db/kvt/kvt_api.nim +++ b/nimbus/db/kvt/kvt_api.nim @@ -116,6 +116,7 @@ type KvtApiProfTxTopFn = "txTop" KvtApiProfBeGetKvpFn = "be/getKvp" + KvtApiProfBePutKvpFn = "be/putKvp" KvtApiProfBePutEndFn = "be/putEnd" KvtApiProfRef* = ref object of KvtApiRef @@ -355,6 +356,11 @@ func init*( result = be.getKvpFn(a) data.list[KvtApiProfBeGetKvpFn.ord].masked = true + beDup.putKvpFn = + proc(a: PutHdlRef; b: openArray[(Blob,Blob)]) = + be.putKvpFn(a,b) + data.list[KvtApiProfBePutKvpFn.ord].masked = true + beDup.putEndFn = proc(a: PutHdlRef): auto = KvtApiProfBePutEndFn.profileRunner: diff --git a/tests/test_aristo/test_helpers.nim b/tests/test_aristo/test_helpers.nim index fb400ee00..dccb45b6c 100644 --- a/tests/test_aristo/test_helpers.nim +++ b/tests/test_aristo/test_helpers.nim @@ -32,7 +32,7 @@ type const QidSlotLyo* = [(4,0,10),(3,3,10),(3,4,10),(3,5,10)] - QidSample* = (3 * QidSlotLyo.stats.minCovered) div 2 + QidSample* = (3 * QidSlotLyo.capacity.minCovered) div 2 # ------------------------------------------------------------------------------ # Private helpers diff --git a/tests/test_aristo/test_misc.nim b/tests/test_aristo/test_misc.nim index f7bab98af..09eb1b2fe 100644 --- a/tests/test_aristo/test_misc.nim +++ b/tests/test_aristo/test_misc.nim @@ -389,7 +389,7 @@ proc testQidScheduler*( if debug: noisy.say "***", "sampleSize=", sampleSize, - " ctx=", ctx, " stats=", scd.ctx.stats + " ctx=", ctx, " stats=", scd.capacity() for n in 1 .. sampleSize: let w = scd.addItem() diff --git a/vendor/nim-stew b/vendor/nim-stew index 1662762c0..104132fd0 160000 --- a/vendor/nim-stew +++ b/vendor/nim-stew @@ -1 +1 @@ -Subproject commit 1662762c0144854db60632e4115fe596ffa67fca +Subproject commit 104132fd0217e846b04dd26a5fbe3e43a4929a05