Migrate `keyed_queue` to `minilru` (#2608)

Compared to `keyed_queue`, `minilru` uses significantly less memory, in
particular for the 32-byte hash keys where `kq` stores several copies of
the key redundantly.
This commit is contained in:
Jacek Sieka 2024-09-13 15:47:50 +02:00 committed by GitHub
parent d17ddacf39
commit 5c1e2e7d3b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 92 additions and 95 deletions

3
.gitmodules vendored
View File

@ -237,3 +237,6 @@
url = https://github.com/status-im/portal-mainnet.git
ignore = untracked
branch = master
[submodule "vendor/nim-minilru"]
path = vendor/nim-minilru
url = https://github.com/status-im/nim-minilru.git

View File

@ -30,7 +30,8 @@ requires "nim >= 1.6.0",
"ethash",
"blscurve",
"evmc",
"web3"
"web3",
"minilru"
binDir = "build"

View File

@ -16,7 +16,7 @@ import
stew/[arrayops, endians2],
./aristo_desc
export aristo_desc
export aristo_desc, results
# Allocation-free version of short big-endian encoding that skips the leading
# zeroes

View File

@ -12,7 +12,6 @@
import
std/sets,
eth/common,
./aristo_desc/desc_identifiers
const

View File

@ -219,7 +219,7 @@ proc deleteStorageData*(
?db.deleteImpl(stoHike)
db.layersPutStoLeaf(AccountKey.mixUp(accPath, stoPath), nil)
db.layersPutStoLeaf(mixUp(accPath, stoPath), nil)
# Make sure that an account leaf has no dangling sub-trie
if db.getVtx((stoID.vid, stoID.vid)).isValid:

View File

@ -62,7 +62,7 @@ proc delStoTreeNow(
of Leaf:
let stoPath = Hash256(data: (stoPath & vtx.lPfx).getBytes())
db.layersPutStoLeaf(AccountKey.mixUp(accPath, stoPath), nil)
db.layersPutStoLeaf(mixUp(accPath, stoPath), nil)
db.disposeOfVtx(rvid)

View File

@ -104,14 +104,10 @@ proc deltaPersistent*(
# Copy back updated payloads
for accPath, vtx in db.balancer.accLeaves:
let accKey = accPath.to(AccountKey)
if not db.accLeaves.lruUpdate(accKey, vtx):
discard db.accLeaves.lruAppend(accKey, vtx, ACC_LRU_SIZE)
db.accLeaves.put(accPath, vtx)
for mixPath, vtx in db.balancer.stoLeaves:
let mixKey = mixPath.to(AccountKey)
if not db.stoLeaves.lruUpdate(mixKey, vtx):
discard db.stoLeaves.lruAppend(mixKey, vtx, ACC_LRU_SIZE)
db.stoLeaves.put(mixPath, vtx)
# Done with balancer, all saved to backend
db.balancer = LayerRef(nil)

View File

@ -23,11 +23,12 @@
import
std/[hashes, sets, tables],
stew/keyed_queue,
eth/common,
results,
./aristo_constants,
./aristo_desc/[desc_error, desc_identifiers, desc_nibbles, desc_structural]
./aristo_desc/[desc_error, desc_identifiers, desc_nibbles, desc_structural],
minilru
from ./aristo_desc/desc_backend
import BackendRef
@ -35,7 +36,7 @@ from ./aristo_desc/desc_backend
# Not auto-exporting backend
export
tables, aristo_constants, desc_error, desc_identifiers, desc_nibbles,
desc_structural, keyed_queue
desc_structural, minilru, common
type
AristoTxRef* = ref object
@ -60,12 +61,6 @@ type
centre: AristoDbRef ## Link to peer with write permission
peers: HashSet[AristoDbRef] ## List of all peers
AccountKey* = distinct ref Hash256
# `ref` version of the account path / key
# `KeyedQueue` is inefficient for large keys, so we have to use this ref
# workaround to not experience a memory explosion in the account cache
# TODO rework KeyedQueue to deal with large keys and/or heterogenous lookup
AristoDbRef* = ref object
## Three tier database object supporting distributed instances.
top*: LayerRef ## Database working layer, mutable
@ -77,7 +72,7 @@ type
txUidGen*: uint ## Tx-relative unique number generator
dudes: DudesRef ## Related DB descriptors
accLeaves*: KeyedQueue[AccountKey, VertexRef]
accLeaves*: LruCache[Hash256, VertexRef]
## Account path to payload cache - accounts are frequently accessed by
## account path when contracts interact with them - this cache ensures
## that we don't have to re-traverse the storage trie for every such
@ -85,7 +80,7 @@ type
## TODO a better solution would probably be to cache this in a type
## exposed to the high-level API
stoLeaves*: KeyedQueue[AccountKey, VertexRef]
stoLeaves*: LruCache[Hash256, VertexRef]
## Mixed account/storage path to payload cache - same as above but caches
## the full lookup of storage slots
@ -96,18 +91,7 @@ type
# Public helpers
# ------------------------------------------------------------------------------
template hash*(a: AccountKey): Hash =
mixin hash
hash((ref Hash256)(a)[])
template `==`*(a, b: AccountKey): bool =
mixin `==`
(ref Hash256)(a)[] == (ref Hash256)(b)[]
template to*(a: Hash256, T: type AccountKey): T =
AccountKey((ref Hash256)(data: a.data))
template mixUp*(T: type AccountKey, accPath, stoPath: Hash256): Hash256 =
template mixUp*(accPath, stoPath: Hash256): Hash256 =
# Insecure but fast way of mixing the values of two hashes, for the purpose
# of quick lookups - this is certainly not a good idea for general Hash256
# values but account paths are generated from accounts which would be hard
@ -235,7 +219,10 @@ proc fork*(
let clone = AristoDbRef(
dudes: db.dudes,
backend: db.backend)
backend: db.backend,
accLeaves: db.accLeaves,
stoLeaves: db.stoLeaves,
)
if not noFilter:
clone.balancer = db.balancer # Ref is ok here (filters are immutable)

View File

@ -64,8 +64,7 @@ proc retrieveAccountPayload(
return err(FetchPathNotFound)
return ok leafVtx[].lData
let accKey = accPath.to(AccountKey)
if (let leafVtx = db.accLeaves.lruFetch(accKey); leafVtx.isSome()):
if (let leafVtx = db.accLeaves.get(accPath); leafVtx.isSome()):
if not leafVtx[].isValid():
return err(FetchPathNotFound)
return ok leafVtx[].lData
@ -78,7 +77,9 @@ proc retrieveAccountPayload(
return err(FetchPathNotFound)
return err(error)
ok db.accLeaves.lruAppend(accKey, leafVtx, ACC_LRU_SIZE).lData
db.accLeaves.put(accPath, leafVtx)
ok leafVtx.lData
proc retrieveMerkleHash(
db: AristoDbRef;
@ -182,14 +183,13 @@ proc retrieveStoragePayload(
accPath: Hash256;
stoPath: Hash256;
): Result[UInt256,AristoError] =
let mixPath = AccountKey.mixUp(accPath, stoPath)
let mixPath = mixUp(accPath, stoPath)
if (let leafVtx = db.layersGetStoLeaf(mixPath); leafVtx.isSome()):
if not leafVtx[].isValid():
return err(FetchPathNotFound)
return ok leafVtx[].lData.stoData
let mixKey = mixPath.to(AccountKey)
if (let leafVtx = db.stoLeaves.lruFetch(mixKey); leafVtx.isSome()):
if (let leafVtx = db.stoLeaves.get(mixPath); leafVtx.isSome()):
if not leafVtx[].isValid():
return err(FetchPathNotFound)
return ok leafVtx[].lData.stoData
@ -199,7 +199,9 @@ proc retrieveStoragePayload(
let leafVtx = db.retrieveLeaf(? db.fetchStorageIdImpl(accPath), stoPath.data).valueOr:
return err(error)
ok db.stoLeaves.lruAppend(mixKey, leafVtx, ACC_LRU_SIZE).lData.stoData
db.stoLeaves.put(mixPath, leafVtx)
ok leafVtx.lData.stoData
proc hasStoragePayload(
db: AristoDbRef;

View File

@ -52,8 +52,11 @@ proc newAristoRdbDbRef(
return err(rc.error)
rc.value
ok((AristoDbRef(
top: LayerRef(vTop: vTop),
backend: be), oCfs))
top: LayerRef(vTop: vTop),
backend: be,
accLeaves: LruCache[Hash256, VertexRef].init(ACC_LRU_SIZE),
stoLeaves: LruCache[Hash256, VertexRef].init(ACC_LRU_SIZE),
), oCfs))
# ------------------------------------------------------------------------------
# Public database constructors, destructor

View File

@ -18,9 +18,12 @@ import
std/concurrency/atomics,
eth/common,
rocksdb,
stew/[endians2, keyed_queue],
stew/endians2,
../../aristo_desc,
../init_common
../init_common,
minilru
export minilru
type
RdbWriteEventCb* =
@ -53,9 +56,9 @@ type
# is less memory and time efficient (the latter one due to internal LRU
# handling of the longer key.)
#
rdKeyLru*: KeyedQueue[VertexID,HashKey] ## Read cache
rdKeyLru*: LruCache[VertexID,HashKey] ## Read cache
rdKeySize*: int
rdVtxLru*: KeyedQueue[VertexID,VertexRef] ## Read cache
rdVtxLru*: LruCache[VertexID,VertexRef] ## Read cache
rdVtxSize*: int
basePath*: string ## Database directory

View File

@ -17,7 +17,6 @@ import
eth/common,
rocksdb,
results,
stew/keyed_queue,
../../[aristo_blobify, aristo_desc],
../init_common,
./rdb_desc,
@ -103,7 +102,7 @@ proc getKey*(
rvid: RootedVertexID;
): Result[HashKey,(AristoError,string)] =
# Try LRU cache first
var rc = rdb.rdKeyLru.lruFetch(rvid.vid)
var rc = rdb.rdKeyLru.get(rvid.vid)
if rc.isOK:
rdbKeyLruStats[rvid.to(RdbStateType)].inc(true)
return ok(move(rc.value))
@ -129,21 +128,19 @@ proc getKey*(
return err((RdbHashKeyExpected,"")) # Parsing failed
# Update cache and return
if rdb.rdKeySize > 0:
ok rdb.rdKeyLru.lruAppend(rvid.vid, res.value(), rdb.rdKeySize)
else:
ok res.value()
rdb.rdKeyLru.put(rvid.vid, res.value())
ok res.value()
proc getVtx*(
rdb: var RdbInst;
rvid: RootedVertexID;
): Result[VertexRef,(AristoError,string)] =
# Try LRU cache first
if rdb.rdVtxSize > 0:
var rc = rdb.rdVtxLru.lruFetch(rvid.vid)
if rc.isOK:
rdbVtxLruStats[rvid.to(RdbStateType)][rc.value().vType].inc(true)
return ok(move(rc.value))
var rc = rdb.rdVtxLru.get(rvid.vid)
if rc.isOK:
rdbVtxLruStats[rvid.to(RdbStateType)][rc.value().vType].inc(true)
return ok(move(rc.value))
# Otherwise fetch from backend database
# A threadvar is used to avoid allocating an environment for onData
@ -168,10 +165,9 @@ proc getVtx*(
rdbVtxLruStats[rvid.to(RdbStateType)][res.value().vType].inc(false)
# Update cache and return
if rdb.rdVtxSize > 0:
ok rdb.rdVtxLru.lruAppend(rvid.vid, res.value(), rdb.rdVtxSize)
else:
ok res.value()
rdb.rdVtxLru.put(rvid.vid, res.value())
ok res.value()
# ------------------------------------------------------------------------------
# End

View File

@ -26,10 +26,8 @@ import
# ------------------------------------------------------------------------------
const
lruOverhead = 32
# Approximate LRU cache overhead per entry - although `keyed_queue` which is
# currently used has a much larger overhead, 32 is an easily reachable
# number which likely can be reduced in the future
lruOverhead = 20
# Approximate LRU cache overhead per entry based on minilru sizes
proc dumpCacheStats(keySize, vtxSize: int) =
block vtx:
@ -88,6 +86,9 @@ proc initImpl(
rdb.rdVtxSize =
opts.rdbVtxCacheSize div (sizeof(VertexID) + sizeof(default(VertexRef)[]) + lruOverhead)
rdb.rdKeyLru = typeof(rdb.rdKeyLru).init(rdb.rdKeySize)
rdb.rdVtxLru = typeof(rdb.rdVtxLru).init(rdb.rdVtxSize)
if opts.rdbPrintStats:
let
ks = rdb.rdKeySize

View File

@ -17,7 +17,6 @@ import
eth/common,
rocksdb,
results,
stew/keyed_queue,
../../[aristo_blobify, aristo_desc],
../init_common,
./rdb_desc
@ -50,8 +49,8 @@ proc begin*(rdb: var RdbInst) =
proc rollback*(rdb: var RdbInst) =
if not rdb.session.isClosed():
rdb.rdKeyLru.clear() # Flush caches
rdb.rdVtxLru.clear() # Flush caches
rdb.rdKeyLru = typeof(rdb.rdKeyLru).init(rdb.rdKeySize)
rdb.rdVtxLru = typeof(rdb.rdVtxLru).init(rdb.rdVtxSize)
rdb.disposeSession()
proc commit*(rdb: var RdbInst): Result[void,(AristoError,string)] =
@ -98,11 +97,10 @@ proc putKey*(
trace logTxt "putKey()", vid, error=errSym, info=error
return err((rvid.vid,errSym,error))
if rdb.rdKeySize > 0:
# Update existing cached items but don't add new ones since doing so is
# likely to evict more useful items (when putting many items, we might even
# evict those that were just added)
discard rdb.rdKeyLru.lruUpdate(rvid.vid, key)
# Update existing cached items but don't add new ones since doing so is
# likely to evict more useful items (when putting many items, we might even
# evict those that were just added)
discard rdb.rdKeyLru.update(rvid.vid, key)
else:
dsc.delete(rvid.blobify().data(), rdb.keyCol.handle()).isOkOr:
@ -131,11 +129,10 @@ proc putVtx*(
trace logTxt "putVtx()", vid, error=errSym, info=error
return err((rvid.vid,errSym,error))
if rdb.rdVtxSize > 0:
# Update existing cached items but don't add new ones since doing so is
# likely to evict more useful items (when putting many items, we might even
# evict those that were just added)
discard rdb.rdVtxLru.lruUpdate(rvid.vid, vtx)
# Update existing cached items but don't add new ones since doing so is
# likely to evict more useful items (when putting many items, we might even
# evict those that were just added)
discard rdb.rdVtxLru.update(rvid.vid, vtx)
else:
dsc.delete(rvid.blobify().data(), rdb.vtxCol.handle()).isOkOr:

View File

@ -140,7 +140,7 @@ proc mergeStorageData*(
# Mark account path Merkle keys for update
resetKeys()
db.layersPutStoLeaf(AccountKey.mixUp(accPath, stoPath), rc.value)
db.layersPutStoLeaf(mixUp(accPath, stoPath), rc.value)
if not stoID.isValid:
# Make sure that there is an account that refers to that storage trie

View File

@ -17,6 +17,7 @@
import
std/[sequtils, tables, typetraits],
stew/keyed_queue,
eth/common,
results,
../../aristo as use_aristo,

View File

@ -15,7 +15,7 @@ import
chronicles,
eth/common,
results,
stew/keyed_queue,
minilru,
../../../utils/mergeutils,
../../../evm/code_bytes,
../../../stateless/multi_keys,
@ -70,7 +70,7 @@ type
cache: Table[EthAddress, AccountRef]
# Second-level cache for the ledger save point, which is cleared on every
# persist
code: KeyedQueue[Hash256, CodeBytesRef]
code: LruCache[Hash256, CodeBytesRef]
## The code cache provides two main benefits:
##
## * duplicate code is shared in memory between accounts
@ -81,7 +81,7 @@ type
## when underpriced code opcodes are being run en masse - both advantages
## help performance broadly as well.
slots: KeyedQueue[UInt256, Hash256]
slots: LruCache[UInt256, Hash256]
## Because the same slots often reappear, we want to avoid writing them
## over and over again to the database to avoid the WAL and compaction
## write amplification that ensues
@ -157,6 +157,8 @@ proc init*(x: typedesc[AccountsLedgerRef], db: CoreDbRef,
result.kvt = db.ctx.getKvt()
result.witnessCache = Table[EthAddress, WitnessData]()
result.storeSlotHash = storeSlotHash
result.code = typeof(result.code).init(codeLruSize)
result.slots = typeof(result.slots).init(slotsLruSize)
discard result.beginSavepoint
proc init*(x: typedesc[AccountsLedgerRef], db: CoreDbRef): AccountsLedgerRef =
@ -249,11 +251,13 @@ proc getAccount(
return
# not found in cache, look into state trie
let rc = ac.ledger.fetch address.toAccountKey
let
accPath = address.toAccountKey
rc = ac.ledger.fetch accPath
if rc.isOk:
result = AccountRef(
statement: rc.value,
accPath: address.keccakHash,
accPath: accPath,
flags: {Alive})
elif shouldCreate:
result = AccountRef(
@ -261,7 +265,7 @@ proc getAccount(
nonce: emptyEthAccount.nonce,
balance: emptyEthAccount.balance,
codeHash: emptyEthAccount.codeHash),
accPath: address.keccakHash,
accPath: accPath,
flags: {Alive, IsNew})
else:
return # ignore, don't cache
@ -305,7 +309,7 @@ proc originalStorageValue(
# Not in the original values cache - go to the DB.
let
slotKey = ac.slots.lruFetch(slot).valueOr:
slotKey = ac.slots.get(slot).valueOr:
slot.toBytesBE.keccakHash
rc = ac.ledger.slotFetch(acc.toAccountKey, slotKey)
if rc.isOk:
@ -381,9 +385,11 @@ proc persistStorage(acc: AccountRef, ac: AccountsLedgerRef) =
continue # Avoid writing A-B-A updates
var cached = true
let slotKey = ac.slots.lruFetch(slot).valueOr:
let slotKey = ac.slots.get(slot).valueOr:
cached = false
ac.slots.lruAppend(slot, slot.toBytesBE.keccakHash, slotsLruSize)
let hash = slot.toBytesBE.keccakHash
ac.slots.put(slot, hash)
hash
if value > 0:
ac.ledger.slotMerge(acc.toAccountKey, slotKey, value).isOkOr:
@ -449,14 +455,15 @@ proc getCode*(ac: AccountsLedgerRef, address: EthAddress): CodeBytesRef =
if acc.code == nil:
acc.code =
if acc.statement.codeHash != EMPTY_CODE_HASH:
ac.code.lruFetch(acc.statement.codeHash).valueOr:
ac.code.get(acc.statement.codeHash).valueOr:
var rc = ac.kvt.get(contractHashKey(acc.statement.codeHash).toOpenArray)
if rc.isErr:
warn logTxt "getCode()", codeHash=acc.statement.codeHash, error=($$rc.error)
CodeBytesRef()
else:
let newCode = CodeBytesRef.init(move(rc.value), persisted = true)
ac.code.lruAppend(acc.statement.codeHash, newCode, codeLruSize)
ac.code.put(acc.statement.codeHash, newCode)
newCode
else:
CodeBytesRef()
@ -470,7 +477,7 @@ proc getCodeSize*(ac: AccountsLedgerRef, address: EthAddress): int =
if acc.code == nil:
if acc.statement.codeHash == EMPTY_CODE_HASH:
return 0
acc.code = ac.code.lruFetch(acc.statement.codeHash).valueOr:
acc.code = ac.code.get(acc.statement.codeHash).valueOr:
# On a cache miss, we don't fetch the code - instead, we fetch just the
# length - should the code itself be needed, it will typically remain
# cached and easily accessible in the database layer - this is to prevent
@ -567,7 +574,7 @@ proc setCode*(ac: AccountsLedgerRef, address: EthAddress, code: seq[byte]) =
acc.statement.codeHash = codeHash
# Try to reuse cache entry if it exists, but don't save the code - it's not
# a given that it will be executed within LRU range
acc.code = ac.code.lruFetch(codeHash).valueOr(CodeBytesRef.init(code))
acc.code = ac.code.get(codeHash).valueOr(CodeBytesRef.init(code))
acc.flags.incl CodeChanged
proc setStorage*(ac: AccountsLedgerRef, address: EthAddress, slot, value: UInt256) =
@ -878,7 +885,7 @@ proc getStorageProof*(ac: AccountsLedgerRef, address: EthAddress, slots: openArr
continue
let
slotKey = ac.slots.lruFetch(slot).valueOr:
slotKey = ac.slots.get(slot).valueOr:
slot.toBytesBE.keccakHash
slotProof = ac.ledger.slotProof(addressHash, slotKey).valueOr:
if error.aErr == FetchPathNotFound:

1
vendor/nim-minilru vendored Submodule

@ -0,0 +1 @@
Subproject commit 2682cffa8733f3b61751c65a963941315e887bac