Fix memory usage spikes during sync, give memory to rocksdb (#2413)

* creating a seq from a table that holds lots of changes means copying
all data from the table into a fresh seq - this can amount to several GB
of data while syncing blocks (see the sketch below)
* Nim fails to optimize away the move of the `WidthFirstForest` - the
real solution is to not construct a `wff` to begin with, but this PR
provides relief while that is being worked on

This spike fix allows us to bump the rocksdb cache by another 2 GB and
still end up with significantly lower peak memory usage during sync.
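
As an illustration of the first bullet, a minimal standalone Nim sketch
(toy table contents, not code from this commit) contrasting the seq copy
with direct iteration:

import std/[tables, sequtils]

var changes: Table[int, string]
for i in 0 ..< 1_000_000:
  changes[i] = "payload"

# `toSeq` materialises every (key, value) pair into a fresh seq,
# roughly doubling the memory held while the copy is alive
let copied = changes.pairs.toSeq
doAssert copied.len == changes.len

# iterating the table directly streams one pair at a time and
# needs no extra allocation
for k, v in changes:
  discard (k, v)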
Jacek Sieka 2024-06-25 13:39:53 +02:00 committed by GitHub
parent b5e059a234
commit 3e001e322c
7 changed files with 81 additions and 82 deletions


@@ -75,8 +75,10 @@ proc deltaPersistent*(
# Store structural single trie entries
let writeBatch = ? be.putBegFn()
-be.putVtxFn(writeBatch, db.balancer.sTab.pairs.toSeq)
-be.putKeyFn(writeBatch, db.balancer.kMap.pairs.toSeq)
+for vid, vtx in db.balancer.sTab:
+  be.putVtxFn(writeBatch, vid, vtx)
+for vid, key in db.balancer.kMap:
+  be.putKeyFn(writeBatch, vid, key)
be.putTuvFn(writeBatch, db.balancer.vTop)
be.putLstFn(writeBatch, lSst)
? be.putEndFn writeBatch # Finalise write batch


@@ -52,13 +52,13 @@ type
## Generic transaction initialisation function
PutVtxFn* =
-proc(hdl: PutHdlRef; vrps: openArray[(VertexID,VertexRef)])
+proc(hdl: PutHdlRef; vid: VertexID; vtx: VertexRef)
{.gcsafe, raises: [].}
## Generic backend database bulk storage function, `VertexRef(nil)`
## values indicate that records should be deleted.
PutKeyFn* =
-proc(hdl: PutHdlRef; vkps: openArray[(VertexID,HashKey)])
+proc(hdl: PutHdlRef; vid: VertexID, key: HashKey)
{.gcsafe, raises: [].}
## Generic backend database bulk storage function, `VOID_HASH_KEY`
## values indicate that records should be deleted.


@@ -45,7 +45,7 @@ type
root: HashSet[VertexID] ## Top level, root targets
pool: Table[VertexID,VertexID] ## Upper links pool
base: Table[VertexID,VertexID] ## Width-first leaf level links
-leaf: HashSet[VertexID] ## Stand-alone leaf to process
+leaf: seq[VertexID] ## Stand-alone leaf to process
rev: Table[VertexID,HashSet[VertexID]] ## Reverse look up table
logScope:
@@ -82,15 +82,15 @@ func hasValue(
proc pedigree(
db: AristoDbRef; # Database, top layer
+wff: var WidthFirstForest;
ancestors: HashSet[VertexID]; # Vertex IDs to start connecting from
proofs: HashSet[VertexID]; # Additional proof nodes to start from
-): Result[WidthFirstForest,(VertexID,AristoError)] =
+): Result[void, (VertexID,AristoError)] =
## For each vertex ID from the argument set `ancestors` find all un-labelled
## grand child vertices and build a forest (of trees) starting from the
## grand child vertices.
##
var
-  wff: WidthFirstForest
  leafs: HashSet[VertexID]
proc register(wff: var WidthFirstForest; fromVid, toVid: VertexID) =
@@ -134,7 +134,7 @@ proc pedigree(
let children = vtx.subVids
if children.len == 0:
# This is an isolated leaf node
-wff.leaf.incl root
+wff.leaf.add root
else:
wff.root.incl root
for child in vtx.subVids:
@@ -187,18 +187,19 @@ proc pedigree(
redo.swap leafs
-ok move(wff)
+ok()
# ------------------------------------------------------------------------------
# Private functions, tree traversal
# ------------------------------------------------------------------------------
proc createSched(
+wff: var WidthFirstForest; # Search tree to create
db: AristoDbRef; # Database, top layer
-): Result[WidthFirstForest,(VertexID,AristoError)] =
+): Result[void,(VertexID,AristoError)] =
## Create width-first search schedule (aka forest)
##
-var wff = ? db.pedigree(db.dirty, db.pPrf)
+? db.pedigree(wff, db.dirty, db.pPrf)
if 0 < wff.leaf.len:
for vid in wff.leaf:
@@ -210,8 +211,9 @@ proc createSched(
return err((needed,HashifyVtxUnresolved))
continue
db.layersPutKey(VertexID(1), vid, node.digestTo(HashKey))
+wff.leaf.reset() # No longer needed
-ok move(wff)
+ok()
proc processSched(
@@ -306,7 +308,8 @@ proc hashify*(
##
if 0 < db.dirty.len:
# Set up width-first traversal schedule
-var wff = ? db.createSched()
+var wff: WidthFirstForest
+? wff.createSched db
# Traverse tree spanned by `wff` and label remaining vertices.
? wff.processSched db
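
The refactoring above threads a caller-supplied `var wff` through
`pedigree` and `createSched` instead of returning the forest, so no
return-value move/copy of the `WidthFirstForest` is needed. A minimal
standalone sketch of the same pattern, using a hypothetical `Forest`
type (not this codebase):

import std/sets

type Forest = object
  roots: HashSet[int]
  nodes: seq[int]

# returning by value: if the compiler cannot prove the result can be
# moved, the whole object is copied on return
proc buildByValue(): Forest =
  result.roots.incl 1
  result.nodes.add 42

# filling a caller-supplied `var` parameter mutates the object in
# place, so no return-value copy can occur
proc build(f: var Forest) =
  f.roots.incl 1
  f.nodes.add 42

var f: Forest
f.build()
discard buildByValue()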


@@ -132,29 +132,27 @@ proc putBegFn(db: MemBackendRef): PutBegFn =
proc putVtxFn(db: MemBackendRef): PutVtxFn =
result =
-proc(hdl: PutHdlRef; vrps: openArray[(VertexID,VertexRef)]) =
+proc(hdl: PutHdlRef; vid: VertexID; vtx: VertexRef) =
let hdl = hdl.getSession db
if hdl.error.isNil:
-for (vid,vtx) in vrps:
-  if vtx.isValid:
-    let rc = vtx.blobify()
-    if rc.isErr:
-      hdl.error = TypedPutHdlErrRef(
-        pfx: VtxPfx,
-        vid: vid,
-        code: rc.error)
-      return
-    hdl.sTab[vid] = rc.value
-  else:
-    hdl.sTab[vid] = EmptyBlob
+if vtx.isValid:
+  let rc = vtx.blobify()
+  if rc.isErr:
+    hdl.error = TypedPutHdlErrRef(
+      pfx: VtxPfx,
+      vid: vid,
+      code: rc.error)
+    return
+  hdl.sTab[vid] = rc.value
+else:
+  hdl.sTab[vid] = EmptyBlob
proc putKeyFn(db: MemBackendRef): PutKeyFn =
result =
-proc(hdl: PutHdlRef; vkps: openArray[(VertexID,HashKey)]) =
+proc(hdl: PutHdlRef; vid: VertexID, key: HashKey) =
let hdl = hdl.getSession db
if hdl.error.isNil:
-for (vid,key) in vkps:
-  hdl.kMap[vid] = key
+hdl.kMap[vid] = key
proc putTuvFn(db: MemBackendRef): PutTuvFn =
result =


@@ -143,10 +143,10 @@ proc putBegFn(db: RdbBackendRef): PutBegFn =
proc putVtxFn(db: RdbBackendRef): PutVtxFn =
result =
-proc(hdl: PutHdlRef; vrps: openArray[(VertexID,VertexRef)]) =
+proc(hdl: PutHdlRef; vid: VertexID; vtx: VertexRef) =
let hdl = hdl.getSession db
if hdl.error.isNil:
-db.rdb.putVtx(vrps).isOkOr:
+db.rdb.putVtx(vid, vtx).isOkOr:
hdl.error = TypedPutHdlErrRef(
pfx: VtxPfx,
vid: error[0],
@@ -155,10 +155,10 @@ proc putVtxFn(db: RdbBackendRef): PutVtxFn =
proc putKeyFn(db: RdbBackendRef): PutKeyFn =
result =
-proc(hdl: PutHdlRef; vkps: openArray[(VertexID,HashKey)]) =
+proc(hdl: PutHdlRef; vid: VertexID, key: HashKey) =
let hdl = hdl.getSession db
if hdl.error.isNil:
-db.rdb.putKey(vkps).isOkOr:
+db.rdb.putKey(vid, key).isOkOr:
hdl.error = TypedPutHdlErrRef(
pfx: KeyPfx,
vid: error[0],


@@ -88,71 +88,67 @@ proc putAdm*(
proc putKey*(
rdb: var RdbInst;
-data: openArray[(VertexID,HashKey)];
+vid: VertexID, key: HashKey;
): Result[void,(VertexID,AristoError,string)] =
let dsc = rdb.session
-for (vid,key) in data:
-  if key.isValid:
-    dsc.put(vid.toOpenArray, key.data, $KeyCF).isOkOr:
-      # Caller must `rollback()` which will flush the `rdKeyLru` cache
-      const errSym = RdbBeDriverPutKeyError
-      when extraTraceMessages:
-        trace logTxt "putKey()", vid, error=errSym, info=error
-      return err((vid,errSym,error))
+if key.isValid:
+  dsc.put(vid.toOpenArray, key.data, $KeyCF).isOkOr:
+    # Caller must `rollback()` which will flush the `rdKeyLru` cache
+    const errSym = RdbBeDriverPutKeyError
+    when extraTraceMessages:
+      trace logTxt "putKey()", vid, error=errSym, info=error
+    return err((vid,errSym,error))
-    # Update cache
-    if not rdb.rdKeyLru.lruUpdate(vid, key):
-      discard rdb.rdKeyLru.lruAppend(vid, key, RdKeyLruMaxSize)
+  # Update cache
+  if not rdb.rdKeyLru.lruUpdate(vid, key):
+    discard rdb.rdKeyLru.lruAppend(vid, key, RdKeyLruMaxSize)
-  else:
-    dsc.delete(vid.toOpenArray, $KeyCF).isOkOr:
-      # Caller must `rollback()` which will flush the `rdKeyLru` cache
-      const errSym = RdbBeDriverDelKeyError
-      when extraTraceMessages:
-        trace logTxt "putKey()", vid, error=errSym, info=error
-      return err((vid,errSym,error))
+else:
+  dsc.delete(vid.toOpenArray, $KeyCF).isOkOr:
+    # Caller must `rollback()` which will flush the `rdKeyLru` cache
+    const errSym = RdbBeDriverDelKeyError
+    when extraTraceMessages:
+      trace logTxt "putKey()", vid, error=errSym, info=error
+    return err((vid,errSym,error))
-    # Update cache, vertex will most probably never be visited anymore
-    rdb.rdKeyLru.del vid
+  # Update cache, vertex will most probably never be visited anymore
+  rdb.rdKeyLru.del vid
ok()
proc putVtx*(
rdb: var RdbInst;
-data: openArray[(VertexID,VertexRef)];
+vid: VertexID; vtx: VertexRef
): Result[void,(VertexID,AristoError,string)] =
let dsc = rdb.session
-for (vid,vtx) in data:
-  if vtx.isValid:
-    let rc = vtx.blobify()
-    if rc.isErr:
-      # Caller must `rollback()` which will flush the `rdVtxLru` cache
-      return err((vid,rc.error,""))
+if vtx.isValid:
+  let rc = vtx.blobify()
+  if rc.isErr:
+    # Caller must `rollback()` which will flush the `rdVtxLru` cache
+    return err((vid,rc.error,""))
-    dsc.put(vid.toOpenArray, rc.value, $VtxCF).isOkOr:
-      # Caller must `rollback()` which will flush the `rdVtxLru` cache
-      const errSym = RdbBeDriverPutVtxError
-      when extraTraceMessages:
-        trace logTxt "putVtx()", vid, error=errSym, info=error
-      return err((vid,errSym,error))
+  dsc.put(vid.toOpenArray, rc.value, $VtxCF).isOkOr:
+    # Caller must `rollback()` which will flush the `rdVtxLru` cache
+    const errSym = RdbBeDriverPutVtxError
+    when extraTraceMessages:
+      trace logTxt "putVtx()", vid, error=errSym, info=error
+    return err((vid,errSym,error))
-    # Update cache
-    if not rdb.rdVtxLru.lruUpdate(vid, vtx):
-      discard rdb.rdVtxLru.lruAppend(vid, vtx, RdVtxLruMaxSize)
+  # Update cache
+  if not rdb.rdVtxLru.lruUpdate(vid, vtx):
+    discard rdb.rdVtxLru.lruAppend(vid, vtx, RdVtxLruMaxSize)
-  else:
-    dsc.delete(vid.toOpenArray, $VtxCF).isOkOr:
-      # Caller must `rollback()` which will flush the `rdVtxLru` cache
-      const errSym = RdbBeDriverDelVtxError
-      when extraTraceMessages:
-        trace logTxt "putVtx()", vid, error=errSym, info=error
-      return err((vid,errSym,error))
+else:
+  dsc.delete(vid.toOpenArray, $VtxCF).isOkOr:
+    # Caller must `rollback()` which will flush the `rdVtxLru` cache
+    const errSym = RdbBeDriverDelVtxError
+    when extraTraceMessages:
+      trace logTxt "putVtx()", vid, error=errSym, info=error
+    return err((vid,errSym,error))
-    # Update cache, vertex will most probably never be visited anymore
-    rdb.rdVtxLru.del vid
+  # Update cache, vertex will most probably never be visited anymore
+  rdb.rdVtxLru.del vid
ok()
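
The write-through-then-cache-refresh idiom used by `putKey`/`putVtx`
above can be sketched standalone. This toy version approximates
`lruUpdate`/`lruAppend` with an OrderedTable (hypothetical helpers, not
the project's actual LRU implementation):

import std/tables

const maxSize = 4

# move `key` to the most-recently-used position if it is already cached
proc lruUpdate(c: var OrderedTable[int, string]; key: int; val: string): bool =
  if key in c:
    c.del key      # re-inserting moves the entry to the back
    c[key] = val
    true
  else:
    false

# append as most-recently-used, evicting the oldest entry when full
proc lruAppend(c: var OrderedTable[int, string]; key: int; val: string) =
  if c.len >= maxSize:
    var oldest: int
    for k in c.keys:
      oldest = k
      break
    c.del oldest
  c[key] = val

var cache: OrderedTable[int, string]

proc put(key: int; val: string) =
  # write-through to the backing store would happen here (elided),
  # then the cache is refreshed so later reads see the fresh value
  if not cache.lruUpdate(key, val):
    cache.lruAppend(key, val)

for i in 0 ..< 6:
  put(i, "v" & $i)
doAssert cache.len == maxSize   # only the 4 most recent keys remain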


@@ -18,7 +18,7 @@ const
# https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning
defaultMaxOpenFiles* = 512
defaultWriteBufferSize* = 64 * 1024 * 1024
-defaultRowCacheSize* = 2048 * 1024 * 1024
+defaultRowCacheSize* = 4096 * 1024 * 1024
defaultBlockCacheSize* = 256 * 1024 * 1024
type DbOptions* = object # Options that are transported to the database layer
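
For reference, the doubled row-cache default in plain numbers - the
2 GB the commit message refers to - as a standalone snippet:

const
  oldRowCacheSize = 2048 * 1024 * 1024  # previous default: 2 GiB
  newRowCacheSize = 4096 * 1024 * 1024  # new default: 4 GiB

# the extra memory handed to rocksdb while overall peak usage still drops
echo (newRowCacheSize - oldRowCacheSize) div (1024 * 1024 * 1024), " GiB"  # -> 2 GiB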