Update Nimbus codebase to use the new nim-rocksdb API. (#2054)
* Bump nim-rocksdb.
* Update codebase to use latest nim-rocksdb API.
* Update copyright notices.
* Fix memory leak due to allocCStringArray without deallocCStringArray.
* Improve kvstore_rocksdb code.
* Refactor and cleanup RocksStoreRef.
* Update nim-rocksdb submodule to latest.
Parent: 587ca3abbe
Commit: 11691c33e9
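For orientation, a minimal sketch of the open-and-configure pattern this commit switches to, mirroring the backend hunks below (the directory path and the max-open-files value are placeholders, not taken from the repository):

import rocksdb

let dbOpts = defaultDbOptions()
dbOpts.setMaxOpenFiles(512)              # e.g. the backend's openMax setting

let rc = openRocksDb("/tmp/nimbus-test-db", dbOpts)
if rc.isErr:
  echo "rocksdb open failed: ", rc.error
else:
  let store = rc.get()                   # RocksDbReadWriteRef
  # ... read/write via `store`, then release the handle
  store.close()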
@@ -1,5 +1,5 @@
 # nimbus-eth1
-# Copyright (c) 2023 Status Research & Development GmbH
+# Copyright (c) 2023-2024 Status Research & Development GmbH
 # Licensed under either of
 #  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
 #    http://www.apache.org/licenses/LICENSE-2.0)
@@ -16,6 +16,7 @@
 import
   std/[tables, os],
   eth/common,
   rocksdb/lib/librocksdb,
+  rocksdb,
   stew/endians2,
   ../../aristo_desc,
@@ -23,17 +24,18 @@ import

 type
   RdbInst* = object
-    store*: RocksDBInstance            ## Rocks DB database handler
+    dbOpts*: DbOptionsRef
+    store*: RocksDbReadWriteRef        ## Rocks DB database handler
     basePath*: string                  ## Database directory

     # Low level Rocks DB access for bulk store
-    envOpt*: rocksdb_envoptions_t
-    impOpt*: rocksdb_ingestexternalfileoptions_t
+    envOpt*: ptr rocksdb_envoptions_t
+    impOpt*: ptr rocksdb_ingestexternalfileoptions_t

   RdbKey* = array[1 + sizeof VertexID, byte]
     ## Sub-table key, <pfx> + VertexID

-  RdbTabs* = array[StorageType,Table[uint64,Blob]]
+  RdbTabs* = array[StorageType, Table[uint64,Blob]]
     ## Combined table for caching data to be stored/updated

 const
@@ -1,5 +1,5 @@
 # nimbus-eth1
-# Copyright (c) 2023 Status Research & Development GmbH
+# Copyright (c) 2023-2024 Status Research & Development GmbH
 # Licensed under either of
 #  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
 #    http://www.apache.org/licenses/LICENSE-2.0)
@@ -16,6 +16,7 @@
 import
   std/os,
   chronicles,
   rocksdb/lib/librocksdb,
+  rocksdb,
   results,
   ../../aristo_desc,
@@ -61,15 +62,19 @@ proc init*(
   except OSError, IOError:
     return err((RdbBeCantCreateTmpDir, ""))

-  let rc = rdb.store.init(
-    dbPath=dataDir, dbBackuppath=backupsDir, readOnly=false,
-    maxOpenFiles=openMax)
+  let dbOpts = defaultDbOptions()
+  dbOpts.setMaxOpenFiles(openMax)
+
+  let rc = openRocksDb(dataDir, dbOpts)
   if rc.isErr:
     let error = RdbBeDriverInitError
     debug logTxt "driver failed", dataDir, backupsDir, openMax,
       error, info=rc.error
     return err((RdbBeDriverInitError, rc.error))

+  rdb.dbOpts = dbOpts
+  rdb.store = rc.get()
+
   # The following is a default setup (subject to change)
   rdb.impOpt = rocksdb_ingestexternalfileoptions_create()
   rdb.envOpt = rocksdb_envoptions_create()
@@ -1,5 +1,5 @@
 # nimbus-eth1
-# Copyright (c) 2023 Status Research & Development GmbH
+# Copyright (c) 2023-2024 Status Research & Development GmbH
 # Licensed under either of
 #  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
 #    http://www.apache.org/licenses/LICENSE-2.0)
@@ -17,6 +17,7 @@ import
   std/[algorithm, os, sequtils, strutils, sets, tables],
   chronicles,
   eth/common,
   rocksdb/lib/librocksdb,
+  rocksdb,
   results,
   "../.."/[aristo_constants, aristo_desc],
@@ -28,7 +29,7 @@ logScope:

 type
   RdbPutSession = object
-    writer: rocksdb_sstfilewriter_t
+    writer: ptr rocksdb_sstfilewriter_t
     sstPath: string
     nRecords: int

@@ -75,7 +76,7 @@ proc begin(
   var csError: cstring

   var session = RdbPutSession(
-    writer: rocksdb_sstfilewriter_create(rdb.envOpt, rdb.store.options),
+    writer: rocksdb_sstfilewriter_create(rdb.envOpt, rdb.dbOpts.cPtr),
     sstPath: rdb.sstFilePath)

   if session.writer.isNil:
@@ -88,7 +89,7 @@ proc begin(
     session.sstPath.rmFileIgnExpt

   session.writer.rocksdb_sstfilewriter_open(
-    session.sstPath.cstring, addr csError)
+    session.sstPath.cstring, cast[cstringArray](csError.addr))
   if not csError.isNil:
     session.destroy()
     return err((RdbBeOpenSstWriter, $csError))
@@ -111,7 +112,8 @@ proc add(

   session.writer.rocksdb_sstfilewriter_add(
     cast[cstring](unsafeAddr key[0]), csize_t(key.len),
-    cast[cstring](unsafeAddr val[0]), csize_t(val.len), addr csError)
+    cast[cstring](unsafeAddr val[0]), csize_t(val.len),
+    cast[cstringArray](csError.addr))
   if not csError.isNil:
     return err((RdbBeAddSstWriter, $csError))

@@ -129,12 +131,13 @@ proc commit(
   var csError: cstring

   if 0 < session.nRecords:
-    session.writer.rocksdb_sstfilewriter_finish(addr csError)
+    session.writer.rocksdb_sstfilewriter_finish(cast[cstringArray](csError.addr))
     if not csError.isNil:
       return err((RdbBeFinishSstWriter, $csError))

-  rdb.store.db.rocksdb_ingest_external_file(
-    [session.sstPath].allocCStringArray, 1, rdb.impOpt, addr csError)
+  var sstPath = session.sstPath.cstring
+  rdb.store.cPtr.rocksdb_ingest_external_file(
+    cast[cstringArray](sstPath.addr), 1, rdb.impOpt, cast[cstringArray](csError.addr))
   if not csError.isNil:
     return err((RdbBeIngestSstWriter, $csError))

@@ -186,7 +189,7 @@ proc put*(

   # Delete vertices after successfully updating vertices with non-zero values.
   for key in delKey:
-    let rc = rdb.store.del key
+    let rc = rdb.store.delete key
     if rc.isErr:
       return err((RdbBeDriverDelError,rc.error))
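The memory-leak fix named in the commit message is visible in the `commit` hunk above: the old code built a cstringArray via allocCStringArray and never called deallocCStringArray, while the new code takes the address of a local cstring instead. A small illustrative sketch of the two patterns (the file path is a placeholder, not from the repository):

# Old, leaky pattern: allocCStringArray heap-allocates; without a matching
# deallocCStringArray the memory is never released.
let files = allocCStringArray(["/placeholder/bulk.sst"])
# ... hand `files` to a C API expecting a cstringArray ...
deallocCStringArray(files)      # the call the old code was missing

# New pattern from the diff: no allocation, just reinterpret the address
# of a single local cstring as a one-element cstringArray.
var sstPath = "/placeholder/bulk.sst".cstring
let filesArg = cast[cstringArray](sstPath.addr)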
@@ -1,5 +1,5 @@
 # nimbus-eth1
-# Copyright (c) 2023 Status Research & Development GmbH
+# Copyright (c) 2023-2024 Status Research & Development GmbH
 # Licensed under either of
 #  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
 #    http://www.apache.org/licenses/LICENSE-2.0)
@@ -17,6 +17,7 @@ import
   std/sequtils,
   eth/common,
   stew/endians2,
   rocksdb/lib/librocksdb,
+  rocksdb,
   ../init_common,
   ./rdb_desc
@@ -49,8 +50,13 @@ iterator walk*(
   ## Walk over all key-value pairs of the database.
   ##
   ## Non-decodable entries are stepped over and ignored.
-  let rit = rdb.store.db.rocksdb_create_iterator(rdb.store.readOptions)
-  defer: rit.rocksdb_iter_destroy()
+  let
+    readOptions = rocksdb_readoptions_create()
+    rit = rdb.store.cPtr.rocksdb_create_iterator(readOptions)
+  defer:
+    rit.rocksdb_iter_destroy()
+    readOptions.rocksdb_readoptions_destroy()

   rit.rocksdb_iter_seek_to_first()

@@ -91,8 +97,12 @@ iterator walk*(
       # Unsupported
       break walkBody

-    let rit = rdb.store.db.rocksdb_create_iterator(rdb.store.readOptions)
-    defer: rit.rocksdb_iter_destroy()
+    let
+      readOptions = rocksdb_readoptions_create()
+      rit = rdb.store.cPtr.rocksdb_create_iterator(readOptions)
+    defer:
+      rit.rocksdb_iter_destroy()
+      readOptions.rocksdb_readoptions_destroy()

     var
       kLen: csize_t
@@ -60,7 +60,7 @@ proc newLegacyPersistentCoreDbRef*(path: string): CoreDbRef =
     raise (ref ResultDefect)(msg: msg)

   proc done() =
-    backend.rdb.store.close()
+    backend.rdb.close()

   LegaPersDbRef(rdb: backend.rdb).init(LegacyDbPersistent, backend.trieDB, done)
@@ -1,5 +1,5 @@
 # Nimbus
-# Copyright (c) 2023 Status Research & Development GmbH
+# Copyright (c) 2023-2024 Status Research & Development GmbH
 # Licensed under either of
 #  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
 #    http://www.apache.org/licenses/LICENSE-2.0)
@@ -12,45 +12,76 @@

 import
   std/os,
-  rocksdb, stew/results,
+  stew/results,
+  rocksdb,
   eth/db/kvstore

-export results, kvstore
+export kvstore

 const maxOpenFiles = 512

 type
   RocksStoreRef* = ref object of RootObj
-    store*: RocksDBInstance
-    tmpDir*: string
+    db: RocksDbRef
+    backupEngine: BackupEngineRef
+    readOnly: bool

-proc get*(db: RocksStoreRef, key: openArray[byte], onData: kvstore.DataProc): KvResult[bool] =
-  db.store.get(key, onData)
+proc readOnly*(store: RocksStoreRef): bool =
+  store.readOnly

-proc find*(db: RocksStoreRef, prefix: openArray[byte], onFind: kvstore.KeyValueProc): KvResult[int] =
+proc readOnlyDb*(store: RocksStoreRef): RocksDbReadOnlyRef =
+  doAssert store.readOnly
+  store.db.RocksDbReadOnlyRef
+
+proc readWriteDb*(store: RocksStoreRef): RocksDbReadWriteRef =
+  doAssert not store.readOnly
+  store.db.RocksDbReadWriteRef
+
+template validateCanWriteAndGet(store: RocksStoreRef): RocksDbReadWriteRef =
+  if store.readOnly:
+    raiseAssert "Unimplemented"
+  store.db.RocksDbReadWriteRef
+
+proc get*(store: RocksStoreRef, key: openArray[byte], onData: kvstore.DataProc): KvResult[bool] =
+  store.db.get(key, onData)
+
+proc find*(store: RocksStoreRef, prefix: openArray[byte], onFind: kvstore.KeyValueProc): KvResult[int] =
   raiseAssert "Unimplemented"

-proc put*(db: RocksStoreRef, key, value: openArray[byte]): KvResult[void] =
-  db.store.put(key, value)
+proc put*(store: RocksStoreRef, key, value: openArray[byte]): KvResult[void] =
+  store.validateCanWriteAndGet().put(key, value)

-proc contains*(db: RocksStoreRef, key: openArray[byte]): KvResult[bool] =
-  db.store.contains(key)
+proc contains*(store: RocksStoreRef, key: openArray[byte]): KvResult[bool] =
+  store.db.keyExists(key)

-proc del*(db: RocksStoreRef, key: openArray[byte]): KvResult[bool] =
-  db.store.del(key)
+proc del*(store: RocksStoreRef, key: openArray[byte]): KvResult[bool] =
+  let db = store.validateCanWriteAndGet()

-proc clear*(db: RocksStoreRef): KvResult[bool] =
-  db.store.clear()
+  let exists = ? db.keyExists(key)
+  if not exists:
+    return ok(false)

-proc close*(db: RocksStoreRef) =
-  db.store.close
+  let res = db.delete(key)
+  if res.isErr():
+    return err(res.error())
+
+  ok(true)
+
+proc clear*(store: RocksStoreRef): KvResult[bool] =
+  raiseAssert "Unimplemented"
+
+proc close*(store: RocksStoreRef) =
+  store.db.close()
+  store.backupEngine.close()

 proc init*(
-    T: type RocksStoreRef, basePath: string, name: string,
+    T: type RocksStoreRef,
+    basePath: string,
+    name: string,
     readOnly = false): KvResult[T] =

   let
     dataDir = basePath / name / "data"
-    # tmpDir = basePath / name / "tmp" -- notused
     backupsDir = basePath / name / "backups"

   try:
@@ -59,9 +90,14 @@ proc init*(
   except OSError, IOError:
     return err("rocksdb: cannot create database directory")

-  var store: RocksDBInstance
-  if (let v = store.init(
-      dataDir, backupsDir, readOnly, maxOpenFiles = maxOpenFiles); v.isErr):
-    return err(v.error)
+  let backupEngine = ? openBackupEngine(backupsDir)

-  ok(T(store: store))
+  let dbOpts = defaultDbOptions()
+  dbOpts.setMaxOpenFiles(maxOpenFiles)
+
+  if readOnly:
+    let readOnlyDb = ? openRocksDbReadOnly(dataDir, dbOpts)
+    ok(T(db: readOnlyDb, backupEngine: backupEngine, readOnly: true))
+  else:
+    let readWriteDb = ? openRocksDb(dataDir, dbOpts)
+    ok(T(db: readWriteDb, backupEngine: backupEngine, readOnly: false))
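For context, a hedged usage sketch of the refactored RocksStoreRef wrapper defined above (the base path, store name, byte values, and the relative import path are invented for illustration):

import stew/results
import kvstore_rocksdb            # hypothetical module path; depends on the caller

let res = RocksStoreRef.init("/tmp/nimbus-kvstore", "test")  # readOnly defaults to false
doAssert res.isOk

let store = res.get()
discard store.put(@[1'u8, 2, 3], @[4'u8, 5, 6])   # KvResult[void]
let deleted = store.del(@[1'u8, 2, 3])            # KvResult[bool]
doAssert deleted.isOk

store.close()                     # closes both the database and the backup engine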
@@ -1,5 +1,5 @@
 # nimbus-eth1
-# Copyright (c) 2023 Status Research & Development GmbH
+# Copyright (c) 2023-2024 Status Research & Development GmbH
 # Licensed under either of
 #  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
 #    http://www.apache.org/licenses/LICENSE-2.0)
@@ -15,16 +15,18 @@

 import
   std/os,
   rocksdb/lib/librocksdb,
+  rocksdb

 type
   RdbInst* = object
-    store*: RocksDBInstance            ## Rocks DB database handler
+    dbOpts*: DbOptionsRef
+    store*: RocksDbReadWriteRef        ## Rocks DB database handler
     basePath*: string                  ## Database directory

     # Low level Rocks DB access for bulk store
-    envOpt*: rocksdb_envoptions_t
-    impOpt*: rocksdb_ingestexternalfileoptions_t
+    envOpt*: ptr rocksdb_envoptions_t
+    impOpt*: ptr rocksdb_ingestexternalfileoptions_t

 const
   BaseFolder* = "nimbus"               # Same as for Legacy DB
@@ -1,5 +1,5 @@
 # nimbus-eth1
-# Copyright (c) 2023 Status Research & Development GmbH
+# Copyright (c) 2023-2024 Status Research & Development GmbH
 # Licensed under either of
 #  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
 #    http://www.apache.org/licenses/LICENSE-2.0)
@@ -16,6 +16,7 @@
 import
   std/os,
   chronicles,
   rocksdb/lib/librocksdb,
+  rocksdb,
   results,
   ../../kvt_desc,
@@ -61,15 +62,19 @@ proc init*(
   except OSError, IOError:
     return err((RdbBeCantCreateTmpDir, ""))

-  let rc = rdb.store.init(
-    dbPath=dataDir, dbBackuppath=backupsDir, readOnly=false,
-    maxOpenFiles=openMax)
+  let dbOpts = defaultDbOptions()
+  dbOpts.setMaxOpenFiles(openMax)
+
+  let rc = openRocksDb(dataDir, dbOpts)
   if rc.isErr:
     let error = RdbBeDriverInitError
     debug logTxt "driver failed", dataDir, backupsDir, openMax,
       error, info=rc.error
     return err((RdbBeDriverInitError, rc.error))

+  rdb.dbOpts = dbOpts
+  rdb.store = rc.get()
+
   # The following is a default setup (subject to change)
   rdb.impOpt = rocksdb_ingestexternalfileoptions_create()
   rdb.envOpt = rocksdb_envoptions_create()
@@ -1,5 +1,5 @@
 # nimbus-eth1
-# Copyright (c) 2023 Status Research & Development GmbH
+# Copyright (c) 2023-2024 Status Research & Development GmbH
 # Licensed under either of
 #  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
 #    http://www.apache.org/licenses/LICENSE-2.0)
@@ -17,6 +17,7 @@ import
   std/[algorithm, os, sequtils, strutils, sets, tables],
   chronicles,
   eth/common,
   rocksdb/lib/librocksdb,
+  rocksdb,
   results,
   ../../kvt_desc,
@@ -27,7 +28,7 @@ logScope:

 type
   RdbPutSession = object
-    writer: rocksdb_sstfilewriter_t
+    writer: ptr rocksdb_sstfilewriter_t
     sstPath: string
     nRecords: int

@@ -74,7 +75,7 @@ proc begin(
   var csError: cstring

   var session = RdbPutSession(
-    writer: rocksdb_sstfilewriter_create(rdb.envOpt, rdb.store.options),
+    writer: rocksdb_sstfilewriter_create(rdb.envOpt, rdb.dbOpts.cPtr),
     sstPath: rdb.sstFilePath)

   if session.writer.isNil:
@@ -82,7 +83,7 @@ proc begin(
     session.sstPath.rmFileIgnExpt

   session.writer.rocksdb_sstfilewriter_open(
-    session.sstPath.cstring, addr csError)
+    session.sstPath.cstring, cast[cstringArray](csError.addr))
   if not csError.isNil:
     session.destroy()
     let info = $csError
@@ -109,7 +110,8 @@ proc add(

   session.writer.rocksdb_sstfilewriter_add(
     cast[cstring](unsafeAddr key[0]), csize_t(key.len),
-    cast[cstring](unsafeAddr val[0]), csize_t(val.len), addr csError)
+    cast[cstring](unsafeAddr val[0]), csize_t(val.len),
+    cast[cstringArray](csError.addr))
   if not csError.isNil:
     return err((RdbBeAddSstWriter, $csError))

@@ -127,12 +129,14 @@ proc commit(
   var csError: cstring

   if 0 < session.nRecords:
-    session.writer.rocksdb_sstfilewriter_finish(addr csError)
+    session.writer.rocksdb_sstfilewriter_finish(cast[cstringArray](csError.addr))
     if not csError.isNil:
       return err((RdbBeFinishSstWriter, $csError))

-  rdb.store.db.rocksdb_ingest_external_file(
-    [session.sstPath].allocCStringArray, 1, rdb.impOpt, addr csError)
+  var sstPath = session.sstPath.cstring
+  rdb.store.cPtr.rocksdb_ingest_external_file(
+    cast[cstringArray](sstPath.addr),
+    1, rdb.impOpt, cast[cstringArray](csError.addr))
   if not csError.isNil:
     return err((RdbBeIngestSstWriter, $csError))

@@ -170,7 +174,7 @@ proc put*(
       return -1
     if b.len < a.len:
       return 1

   for key in tab.keys.toSeq.sorted cmpBlobs:
     let val = tab.getOrVoid key
     if val.isValid:
@@ -189,7 +193,7 @@ proc put*(

   # Delete vertices after successfully updating vertices with non-zero values.
   for key in delKey:
-    let rc = rdb.store.del key
+    let rc = rdb.store.delete key
     if rc.isErr:
       return err((RdbBeDriverDelError,rc.error))
@@ -1,5 +1,5 @@
 # nimbus-eth1
-# Copyright (c) 2023 Status Research & Development GmbH
+# Copyright (c) 2023-2024 Status Research & Development GmbH
 # Licensed under either of
 #  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
 #    http://www.apache.org/licenses/LICENSE-2.0)
@@ -16,6 +16,7 @@
 import
   std/sequtils,
   eth/common,
   rocksdb/lib/librocksdb,
+  rocksdb,
   ./rdb_desc

@@ -26,8 +27,12 @@ import
 iterator walk*(rdb: RdbInst): tuple[key: Blob, data: Blob] =
   ## Walk over all key-value pairs of the database.
   ##
-  let rit = rdb.store.db.rocksdb_create_iterator(rdb.store.readOptions)
-  defer: rit.rocksdb_iter_destroy()
+  let
+    readOptions = rocksdb_readoptions_create()
+    rit = rdb.store.cPtr.rocksdb_create_iterator(readOptions)
+  defer:
+    rit.rocksdb_iter_destroy()
+    readOptions.rocksdb_readoptions_destroy()

   rit.rocksdb_iter_seek_to_first()
   while rit.rocksdb_iter_valid() != 0:
@@ -13,6 +13,7 @@
 import
   std/os, # std/[sequtils, strutils],
   eth/common/eth_types,
   rocksdb/lib/librocksdb,
+  rocksdb,
   ../../../../db/kvstore_rocksdb

@@ -21,9 +22,10 @@ import
 type
   RockyBulkLoadRef* = ref object of RootObj
     db: RocksStoreRef
-    envOption: rocksdb_envoptions_t
-    importOption: rocksdb_ingestexternalfileoptions_t
-    writer: rocksdb_sstfilewriter_t
+    dbOption: ptr rocksdb_options_t
+    envOption: ptr rocksdb_envoptions_t
+    importOption: ptr rocksdb_ingestexternalfileoptions_t
+    writer: ptr rocksdb_sstfilewriter_t
     filePath: string
     csError: string

@@ -34,7 +36,7 @@ type
 proc init*(
     T: type RockyBulkLoadRef;
     db: RocksStoreRef;
-    envOption: rocksdb_envoptions_t
+    envOption: ptr rocksdb_envoptions_t
      ): T =
   ## Create a new bulk load descriptor.
   result = T(
@@ -53,7 +55,7 @@ proc clearCacheFile*(db: RocksStoreRef; fileName: string): bool
     {.gcsafe, raises: [OSError].} =
   ## Remove left-over cache file from an imcomplete previous session. The
   ## return value `true` indicated that a cache file was detected.
-  let filePath = db.tmpDir / fileName
+  let filePath = fileName
   if filePath.fileExists:
     filePath.removeFile
     return true
@@ -69,6 +71,8 @@ proc destroy*(rbl: RockyBulkLoadRef) {.gcsafe, raises: [OSError].} =
   ##
   if not rbl.writer.isNil:
     rbl.writer.rocksdb_sstfilewriter_destroy()
+  if not rbl.dbOption.isNil:
+    rbl.dbOption.rocksdb_options_destroy()
   if not rbl.envOption.isNil:
     rbl.envOption.rocksdb_envoptions_destroy()
   if not rbl.importOption.isNil:
@@ -85,9 +89,9 @@ proc lastError*(rbl: RockyBulkLoadRef): string =
   ## Get last error explainer
   rbl.csError

-proc store*(rbl: RockyBulkLoadRef): RocksDBInstance =
+proc store*(rbl: RockyBulkLoadRef): RocksDbReadWriteRef =
   ## Provide the diecriptor for backend functions as defined in `rocksdb`.
-  rbl.db.store
+  rbl.db.readWriteDb()

 # ------------------------------------------------------------------------------
 # Public functions
@@ -97,22 +101,23 @@ proc begin*(rbl: RockyBulkLoadRef; fileName: string): bool =
   ## Begin a new bulk load session storing data into a temporary cache file
   ## `fileName`. When finished, this file will bi direcly imported into the
   ## database.
-  rbl.writer = rocksdb_sstfilewriter_create(
-    rbl.envOption, rbl.db.store.options)
-
+  rbl.dbOption = rocksdb_options_create()
+  rbl.writer = rocksdb_sstfilewriter_create(rbl.envOption, rbl.dbOption)
   if rbl.writer.isNil:
     rbl.csError = "Cannot create sst writer session"
     return false

   rbl.csError = ""
-  let filePath = rbl.db.tmpDir / fileName
+  let filePath = fileName
   var csError: cstring
-  rbl.writer.rocksdb_sstfilewriter_open(fileName, addr csError)
+  rbl.writer.rocksdb_sstfilewriter_open(fileName, cast[cstringArray](csError.addr))
   if not csError.isNil:
     rbl.csError = $csError
     return false

   rbl.filePath = filePath
   return true

 proc add*(
     rbl: RockyBulkLoadRef;
@@ -129,7 +134,7 @@ proc add*(
   rbl.writer.rocksdb_sstfilewriter_add(
     cast[cstring](unsafeAddr key[0]), csize_t(key.len),
     cast[cstring](unsafeAddr val[0]), csize_t(val.len),
-    addr csError)
+    cast[cstringArray](csError.addr))
   if csError.isNil:
     return true
   rbl.csError = $csError
@@ -145,13 +150,14 @@ proc finish*(
   ## If successful, the return value is the size of the SST file used if
   ## that value is available. Otherwise, `0` is returned.
   var csError: cstring
-  rbl.writer.rocksdb_sstfilewriter_finish(addr csError)
+  rbl.writer.rocksdb_sstfilewriter_finish(cast[cstringArray](csError.addr))

+  var filePath = rbl.filePath.cstring
   if csError.isNil:
-    rbl.db.store.db.rocksdb_ingest_external_file(
-      [rbl.filePath].allocCStringArray, 1,
+    rbl.db.readWriteDb().cPtr.rocksdb_ingest_external_file(
+      cast[cstringArray](filePath.addr), 1,
       rbl.importOption,
-      addr csError)
+      cast[cstringArray](csError.addr))

   if csError.isNil:
     var
@@ -1,5 +1,5 @@
 # Nimbus
-# Copyright (c) 2023 Status Research & Development GmbH
+# Copyright (c) 2023-2024 Status Research & Development GmbH
 # Licensed under either of
 #  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
 #    http://www.apache.org/licenses/LICENSE-2.0)
@@ -1,5 +1,5 @@
 # Nimbus
-# Copyright (c) 2021-2023 Status Research & Development GmbH
+# Copyright (c) 2021-2024 Status Research & Development GmbH
 # Licensed under either of
 #  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
 #    http://www.apache.org/licenses/LICENSE-2.0)
@@ -12,6 +12,7 @@ import
   std/[os, sequtils, strformat, strutils],
   chronicles,
   eth/common,
   rocksdb/lib/librocksdb,
+  rocksdb,
   stew/byteutils,
   ../../nimbus/db/kvstore_rocksdb,
@@ -55,8 +56,8 @@ proc walkAllDb(
     ) =
   ## Walk over all key-value pairs of the database (`RocksDB` only.)
   let
-    rop = rocky.store.readOptions
-    rit = rocky.store.db.rocksdb_create_iterator(rop)
+    rop = rocksdb_readoptions_create()
+    rit = rocky.readWriteDb.cPtr.rocksdb_create_iterator(rop)

   rit.rocksdb_iter_seek_to_first()
   while rit.rocksdb_iter_valid() != 0:
@@ -83,6 +84,7 @@ proc walkAllDb(
     # End while

   rit.rocksdb_iter_destroy()
+  rop.rocksdb_readoptions_destroy()

 proc dumpAllDbImpl(
     rocky: RocksStoreRef;          # Persistent database handle
@@ -16,6 +16,7 @@ import
   eth/[common, p2p],
+  rocksdb,
   unittest2,
   ../nimbus/db/kvstore_rocksdb,
   ../nimbus/db/core_db/persistent,
   ../nimbus/core/chain,
   ../nimbus/sync/snap/range_desc,
@@ -119,7 +120,7 @@ proc flushDbs(db: TestDbs) =
   for n in 0 ..< nTestDbInstances:
     if db.cdb[n].isNil or db.cdb[n].dbType != LegacyDbPersistent:
       break
-    db.cdb[n].backend.toRocksStoreRef.store.db.rocksdb_close
+    db.cdb[n].backend.toRocksStoreRef.close()
   db.baseDir.flushDbDir(db.subDir)

 proc testDbs(
@@ -1,5 +1,5 @@
 # Nimbus
-# Copyright (c) 2022-2023 Status Research & Development GmbH
+# Copyright (c) 2022-2024 Status Research & Development GmbH
 # Licensed under either of
 #  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
 #    http://www.apache.org/licenses/LICENSE-2.0)
@@ -13,9 +13,11 @@
 import
   std/[algorithm, math, sequtils, strformat, times],
   stew/byteutils,
   rocksdb/lib/librocksdb,
+  rocksdb,
   unittest2,
   ../../nimbus/core/chain,
+  ../../nimbus/db/kvstore_rocksdb,
   ../../nimbus/db/core_db,
   ../../nimbus/db/core_db/persistent,
   ../../nimbus/sync/snap/range_desc,
@@ -57,7 +59,7 @@ proc to*(t: NodeTag; T: type Blob): T =

 # ----------------

-proc thisRecord(r: rocksdb_iterator_t): (Blob,Blob) =
+proc thisRecord(r: ptr rocksdb_iterator_t): (Blob,Blob) =
   var kLen, vLen: csize_t
   let
     kData = r.rocksdb_iter_key(addr kLen)
@@ -134,8 +136,8 @@ proc test_dbTimingRockySetup*(
   ## Extract key-value records into memory tables via rocksdb iterator
   let
     rdb = cdb.backend.toRocksStoreRef
-    rop = rdb.store.readOptions
-    rit = rdb.store.db.rocksdb_create_iterator(rop)
+    rop = rocksdb_readoptions_create()
+    rit = rdb.readWriteDb().cPtr.rocksdb_create_iterator(rop)
   check not rit.isNil

   var
@@ -163,6 +165,7 @@ proc test_dbTimingRockySetup*(
         noisy.say "***", "ignoring key=", key.toHex

     rit.rocksdb_iter_destroy()
+    rop.rocksdb_readoptions_destroy()

   var
     (mean32, stdv32) = meanStdDev(v32Sum, v32SqSum, t32.len)
@@ -151,7 +151,7 @@ proc flushDbs(db: TestDbs) =
   for n in 0 ..< nTestDbInstances:
     if db.cdb[n].isNil or db.cdb[n].dbType != LegacyDbPersistent:
       break
-    db.cdb[n].backend.toRocksStoreRef.store.db.rocksdb_close
+    db.cdb[n].backend.toRocksStoreRef.close()
   db.baseDir.flushDbDir(db.subDir)

 proc testDbs(
@@ -1,5 +1,5 @@
 # Nimbus
-# Copyright (c) 2022-2023 Status Research & Development GmbH
+# Copyright (c) 2022-2024 Status Research & Development GmbH
 # Licensed under either of
 #  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
 #    http://www.apache.org/licenses/LICENSE-2.0)
vendor/nim-rocksdb (vendored submodule)
@@ -1 +1 @@
-Subproject commit 5e2b026f841fa57b10e669a2ab1d88a0e40f6bbe
+Subproject commit 5f6282e8d43ae27c5b46542c271a5776d26daaf2