Experimental bulk loader tests (#1187)
why: RocksDB bulk loading might provide a slight advantage when loading larger data sets into the system.
This commit is contained in:
parent
f0cd340163
commit
7d7e26d45f
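Before the diff itself, a note on what the experiment actually measures: `importAccounts(..., storeData = true)` (added in `fetch_accounts.nim` below) calls the new `dbImports()` pass, which stores the collected accounts and hexary nodes several ways — via the trie `ChainDB` layer and via RocksDB SST bulk load — and records wall-clock times in an `AccountLoadStats` record that the ticker then prints. The snippet below is a self-contained illustration of that timing pattern only, not code from this commit; `LoadStats` and the loop bodies are stand-ins.

```nim
# Minimal sketch (assumed names) of the timing pattern used by dbImports():
# run each store pass under a timer and accumulate the durations per slot.
import std/times

type LoadStats = object
  dura: array[4, Duration]       # one slot per import mode, like AccountLoadStats

template elapsed(duration: Duration; code: untyped) =
  # Same shape as the `elapsed` template added in accounts_db.nim
  block:
    let start = getTime()
    block:
      code
    duration = getTime() - start

var stats: LoadStats
stats.dura[0].elapsed:
  discard                        # e.g. storeAccountPathsOnChainDb()
stats.dura[3].elapsed:
  discard                        # e.g. storeHexaryNodesOnRockyDb()
echo "pass 0 took ", stats.dura[0], ", pass 3 took ", stats.dura[3]
```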
@@ -11,7 +11,8 @@ const maxOpenFiles = 512

type
  RocksStoreRef* = ref object of RootObj
    store: RocksDBInstance
    store*: RocksDBInstance
    tmpDir*: string

proc get*(db: RocksStoreRef, key: openArray[byte], onData: kvstore.DataProc): KvResult[bool] =
  db.store.get(key, onData)

@@ -36,6 +37,7 @@ proc init*(
    readOnly = false): KvResult[T] =
  let
    dataDir = basePath / name / "data"
    tmpDir = basePath / name / "tmp"
    backupsDir = basePath / name / "backups"

  try:
@@ -6,7 +6,7 @@ export kvstore
# code that uses it isn't equipped to handle errors of that kind - this should
# be reconsidered when making more changes here.

type DbBackend = enum
type DbBackend* = enum
  none,
  sqlite,
  rocksdb,

@@ -14,11 +14,18 @@ type DbBackend = enum

const
  nimbus_db_backend* {.strdefine.} = "rocksdb"
  dbBackend = parseEnum[DbBackend](nimbus_db_backend)
  dbBackend* = parseEnum[DbBackend](nimbus_db_backend)

when dbBackend == sqlite:
  import eth/db/kvstore_sqlite3 as database_backend
elif dbBackend == rocksdb:
  import ./kvstore_rocksdb as database_backend

type
  ChainDB* = ref object of RootObj
    kv: KvStoreRef
    kv*: KvStoreRef
    when dbBackend == rocksdb:
      rdb*: RocksStoreRef

# TODO KvStore is a virtual interface and TrieDB is a virtual interface - one
# will be enough eventually - unless the TrieDB interface gains operations

@@ -39,14 +46,13 @@ proc del*(db: ChainDB, key: openArray[byte]) =
  db.kv.del(key).expect("working database")

when dbBackend == sqlite:
  import eth/db/kvstore_sqlite3 as database_backend
  proc newChainDB*(path: string): ChainDB =
    let db = SqStoreRef.init(path, "nimbus").expect("working database")
    ChainDB(kv: kvStore db.openKvStore().expect("working database"))
elif dbBackend == rocksdb:
  import ./kvstore_rocksdb as database_backend
  proc newChainDB*(path: string): ChainDB =
    ChainDB(kv: kvStore RocksStoreRef.init(path, "nimbus").tryGet())
    let rdb = RocksStoreRef.init(path, "nimbus").tryGet()
    ChainDB(kv: kvStore rdb, rdb: rdb)
elif dbBackend == lmdb:
  # TODO This implementation has several issues on restricted platforms, possibly
  # due to mmap restrictions - see:
@@ -56,6 +56,7 @@ type
    chainRef: Chain
    txPool: TxPoolRef
    networkLoop: Future[void]
    dbBackend: ChainDB

proc importBlocks(conf: NimbusConf, chainDB: BaseChainDB) =
  if string(conf.blocksFile).len > 0:

@@ -149,7 +150,7 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf,
    FullSyncRef.init(nimbus.ethNode, conf.maxPeers, tickerOK).start
  of SyncMode.Snap:
    SnapSyncRef.init(nimbus.ethNode, nimbus.chainRef, nimbus.ctx.rng,
      conf.maxPeers, tickerOK).start
      conf.maxPeers, nimbus.dbBackend, tickerOK).start
  of SyncMode.Default:
    discard

@@ -361,7 +362,8 @@ proc start(nimbus: NimbusNode, conf: NimbusConf) =
    evmcSetLibraryPath(conf.evm)

  createDir(string conf.dataDir)
  let trieDB = trieDB newChainDB(string conf.dataDir)
  nimbus.dbBackend = newChainDB(string conf.dataDir)
  let trieDB = trieDB nimbus.dbBackend
  var chainDB = newBaseChainDB(trieDB,
    conf.pruneMode == PruneMode.Full,
    conf.networkId,
@@ -12,6 +12,7 @@ import
  eth/[common/eth_types, p2p],
  chronicles,
  chronos,
  ../db/select_backend,
  ../p2p/chain,
  ./snap/[worker, worker_desc],
  "."/[sync_desc, sync_sched, protocol]

@@ -59,11 +60,13 @@ proc init*(
    chain: Chain;
    rng: ref HmacDrbgContext;
    maxPeers: int;
    dbBackend: ChainDb,
    enableTicker = false): T =
  new result
  result.initSync(ethNode, maxPeers, enableTicker)
  result.ctx.chain = chain # explicitly override
  result.ctx.data.rng = rng
  result.ctx.data.dbBackend = dbBackend

proc start*(ctx: SnapSyncRef) =
  doAssert ctx.startSync()
@@ -107,6 +107,8 @@ proc `==`*(a, b: NodeTag): bool = a.u256 == b.u256
proc `<=`*(a, b: NodeTag): bool = a.u256 <= b.u256
proc `<`*(a, b: NodeTag): bool = a.u256 < b.u256

proc cmp*(x, y: NodeTag): int = cmp(x.UInt256, y.UInt256)

proc hash*(a: NodeTag): Hash =
  ## Mixin for `Table` or `keyedQueue`
  a.to(Hash256).data.hash
@@ -15,6 +15,7 @@ import
  chronos,
  eth/[common/eth_types, p2p],
  stew/[interval_set, keyed_queue],
  ../../db/select_backend,
  ".."/[protocol, sync_desc],
  ./worker/[accounts_db, fetch_accounts, pivot, ticker],
  "."/[range_desc, worker_desc]

@@ -121,7 +122,8 @@ proc tickerUpdate*(ctx: SnapCtxRef): TickerStatsUpdater =
        activeQueues: tabLen,
        flushedQueues: ctx.data.pivotCount.int64 - tabLen,
        accounts: meanStdDev(aSum, aSqSum, count),
        fillFactor: meanStdDev(uSum, uSqSum, count))
        fillFactor: meanStdDev(uSum, uSqSum, count),
        bulkStore: ctx.data.accountsDb.dbImportStats)

# ------------------------------------------------------------------------------
# Public start/stop and admin functions

@@ -130,7 +132,9 @@ proc tickerUpdate*(ctx: SnapCtxRef): TickerStatsUpdater =
proc setup*(ctx: SnapCtxRef; tickerOK: bool): bool =
  ## Global set up
  ctx.data.accountRangeMax = high(UInt256) div ctx.buddiesMax.u256
  ctx.data.accountsDb = AccountsDbRef.init(ctx.chain.getTrieDB)
  ctx.data.accountsDb =
    if ctx.data.dbBackend.isNil: AccountsDbRef.init(ctx.chain.getTrieDB)
    else: AccountsDbRef.init(ctx.data.dbBackend)
  if tickerOK:
    ctx.data.ticker = TickerRef.init(ctx.tickerUpdate)
  else:
@@ -9,16 +9,20 @@
# except according to those terms.

import
  std/[algorithm, hashes, options, sequtils, sets, strutils, strformat, tables],
  std/[algorithm, hashes, options, sequtils, sets, strutils, strformat,
    tables, times],
  chronos,
  eth/[common/eth_types, p2p, rlp],
  eth/trie/[db, nibbles, trie_defs],
  nimcrypto/keccak,
  stew/byteutils,
  stint,
  "../../.."/[db/storage_types, constants],
  rocksdb,
  ../../../constants,
  ../../../db/[kvstore_rocksdb, select_backend, storage_types],
  "../.."/[protocol, types],
  ../range_desc
  ../range_desc,
  ./rocky_bulk_load

{.push raises: [Defect].}

@@ -26,30 +30,53 @@ logScope:
  topics = "snap-proof"

const
  BasicChainTrieEnabled = false # proof-of-concept code, currently unused
  BasicChainTrieDebugging = false

  RepairTreeDebugging = false

type
  AccountsDbError* = enum
    NothingSerious = 0
    AccountSmallerThanBase
    AccountsNotSrictlyIncreasing
    AccountRangesOverlap
    AccountRepairBlocked
    BoundaryProofFailed
    Rlp2Or17ListEntries
    RlpBlobExpected
    RlpBranchLinkExpected
    RlpEncoding
    RlpExtPathEncoding
    RlpNonEmptyBlobExpected
    BoundaryProofFailed

    UnresolvedRepairNode
    NoRocksDbBackend
    CannotOpenRocksDbBulkSession
    AddBulkItemFailed
    CommitBulkItemsFailed
    AccountNotFound

  HexaryGetFn = proc(key: Blob): Blob {.gcsafe.}
    ## For testing/debugging

  AccountsDbXKeyKind = enum
    ## Extends `storage_types.DbDBKeyKind` for testing/debugging
    ChainDbStateRootPfx = 200 # <state-root> <hash-key> on trie db layer
    RockyBulkStateRootPfx     # <state-root> <hash-key> for rocksdb bulk load
    RockyBulkHexary           # <hash-key> for rocksdb bulk load

  AccountsDbXKey = object
    ## Extends `storage_types.DbDBKey` for testing/debugging
    data*: array[65, byte]
    dataEndPos*: uint8 # the last populated position in the data

  AccountLoadStats* = object
    dura*: array[4,times.Duration] ## Accumulated time statistics
    size*: array[2,uint64]         ## Accumulated size statistics

  ByteArray32* =
    array[32,byte]

  ByteArray33 =
  ByteArray33* =
    array[33,byte]

  NodeKey = ## Internal DB record reference type

@@ -101,8 +128,8 @@ type
  RAccount = object
    ## Temporarily stashed account data. Proper account records have non-empty
    ## payload. Records with empty payload are lower boundary records.
    tag: NodeTag       ## Equivalent to account hash
    key: RepairKey     ## Leaf hash into hexary repair table
    pathTag: NodeTag   ## Equivalent to account hash
    nodeKey: RepairKey ## Leaf hash into hexary repair table
    payload: Blob      ## Data payload

  RepairTreeDB = object

@@ -112,23 +139,30 @@ type

  AccountsDbRef* = ref object
    db: TrieDatabaseRef      ## General database
    rocky: RocksStoreRef     ## Set if rocksdb is available
    aStats: AccountLoadStats ## Accumulated time and statistics

  AccountsDbSessionRef* = ref object
    #dbTx: DbTransaction     ## TBD
    keyMap: Table[RepairKey,uint] ## For debugging only (will go away)
    base: AccountsDbRef      ## Back reference to common parameters
    rootKey: NodeKey         ## Current root node
    peer: Peer               ## For log messages
    rnDB: RepairTreeDB       ## Repair database
    rpDB: RepairTreeDB       ## Repair database
    dStats: AccountLoadStats ## Time and size statistics

const
  EmptyBlob = seq[byte].default
  EmptyNibbleRange = EmptyBlob.initNibbleRange

  RockyBulkCache = "accounts.sst"

static:
  # Not that there is no doubt about this ...
  doAssert NodeKey.default.ByteArray32.initNibbleRange.len == 64

  # Make sure that `DBKeyKind` extension does not overlap
  doAssert high(DBKeyKind).int < low(AccountsDbXKeyKind).int

# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
@@ -145,6 +179,9 @@ proc to(key: NodeKey; T: type NodeTag): T =
proc to(h: Hash256; T: type NodeKey): T =
  h.data.T

proc to(key: NodeKey; T: type Hash256): T =
  result.Hash256.data = key.ByteArray32

proc to(key: NodeKey; T: type NibblesSeq): T =
  key.ByteArray32.initNibbleRange

@@ -176,8 +213,8 @@ proc isNodeKey(a: RepairKey): bool =
  a.ByteArray33[0] == 0

proc newRepairKey(ps: AccountsDbSessionRef): RepairKey =
  ps.rnDB.repairKeyGen.inc
  var src = ps.rnDB.repairKeyGen.toBytesBE
  ps.rpDB.repairKeyGen.inc
  var src = ps.rpDB.repairKeyGen.toBytesBE
  (addr result.ByteArray33[25]).copyMem(addr src[0], 8)
  result.ByteArray33[0] = 1

@@ -200,6 +237,10 @@ proc convertTo(key: RepairKey; T: type NodeKey): T =
  if key.isNodeKey:
    discard result.init(key.ByteArray33[1 .. 32])

proc convertTo(key: RepairKey; T: type NodeTag): T =
  if key.isNodeKey:
    result = UInt256.fromBytesBE(key.ByteArray33[1 .. 32]).T

proc convertTo(node: RNodeRef; T: type Blob): T =
  var writer = initRlpWriter()

@@ -233,13 +274,19 @@ proc convertTo(node: RNodeRef; T: type Blob): T =

  writer.finish()


template noKeyError(info: static[string]; code: untyped) =
  try:
    code
  except KeyError as e:
    raiseAssert "Not possible (" & info & "): " & e.msg

template elapsed(duration: times.Duration; code: untyped) =
  block:
    let start = getTime()
    block:
      code
    duration = getTime() - start

# ------------------------------------------------------------------------------
# Private getters & setters
# ------------------------------------------------------------------------------

@@ -291,6 +338,8 @@ template noPpError(info: static[string]; code: untyped) =
    raiseAssert "Inconveivable (" & info & "): " & e.msg
  except KeyError as e:
    raiseAssert "Not possible (" & info & "): " & e.msg
  except OsError as e:
    raiseAssert "Ooops (" & info & "): " & e.msg

proc pp(s: string; hex = false): string =
  if hex:

@@ -443,6 +492,143 @@ proc pp(w: seq[RPathXStep]; ps: AccountsDbSessionRef; indent = 4): string =
  noPpError("pp(seq[RPathXStep])"):
    result = w.mapIt(it.pp(ps)).join(pfx)

proc pp(a: DbKey|AccountsDbXKey): string =
  a.data.toSeq.mapIt(it.toHex(2)).join

# ------------------------------------------------------------------------------
# Private helpers for bulk load testing
# ------------------------------------------------------------------------------

proc chainDbHexaryKey*(a: RepairKey): ByteArray32 =
  a.convertTo(NodeKey).ByteArray32

proc chainDbStateRootPfxKey*(a: NodeKey; b: NodeTag): AccountsDbXKey =
  result.data[0] = byte ord(ChainDbStateRootPfx)
  result.data[1 .. 32] = a.ByteArray32
  result.data[33 .. 64] = b.to(NodeKey).ByteArray32
  result.dataEndPos = uint8 64

proc rockyBulkHexaryKey(a: NodeTag): DbKey =
  result.data[0] = byte ord(RockyBulkHexary)
  result.data[1 .. 32] = a.to(NodeKey).ByteArray32
  result.dataEndPos = uint8 32

proc rockyBulkStateRootPfxKey*(a: NodeKey; b: NodeTag): AccountsDbXKey =
  result.data[0] = byte ord(RockyBulkStateRootPfx)
  result.data[1 .. 32] = a.ByteArray32
  result.data[33 .. 64] = b.to(NodeKey).ByteArray32
  result.dataEndPos = uint8 64

template toOpenArray*(k: AccountsDbXKey): openArray[byte] =
  k.data.toOpenArray(0, int(k.dataEndPos))

template toOpenArray*(k: ByteArray32): openArray[byte] =
  k.toOpenArray(0, 31)
proc storeAccountPathsOnChainDb(
    ps: AccountsDbSessionRef
      ): Result[void,AccountsDbError] =
  let dbTx = ps.base.db.beginTransaction
  defer: dbTx.commit

  for a in ps.rpDB.acc:
    if a.payload.len != 0:
      ps.base.db.put(
        ps.rootKey.chainDbStateRootPfxKey(a.pathTag).toOpenArray, a.payload)
  ok()

proc storeHexaryNodesOnChainDb(
    ps: AccountsDbSessionRef
      ): Result[void,AccountsDbError] =
  let dbTx = ps.base.db.beginTransaction
  defer: dbTx.commit

  for (key,value) in ps.rpDB.tab.pairs:
    if not key.isNodeKey:
      let error = UnresolvedRepairNode
      trace "Unresolved node in repair table", error
      return err(error)
    ps.base.db.put(key.chainDbHexaryKey.toOpenArray, value.convertTo(Blob))
  ok()

proc storeAccountPathsOnRockyDb(
    ps: AccountsDbSessionRef
      ): Result[void,AccountsDbError]
      {.gcsafe, raises: [Defect,OSError,ValueError].} =
  if ps.base.rocky.isNil:
    return err(NoRocksDbBackend)
  let bulker = RockyBulkLoadRef.init(ps.base.rocky)
  defer: bulker.destroy()
  if not bulker.begin(RockyBulkCache):
    let error = CannotOpenRocksDbBulkSession
    trace "Rocky accounts session initiation failed",
      error, info=bulker.lastError()
    return err(error)

  for n,a in ps.rpDB.acc:
    if a.payload.len != 0:
      let key = ps.rootKey.rockyBulkStateRootPfxKey(a.pathTag)
      if not bulker.add(key.toOpenArray, a.payload):
        let error = AddBulkItemFailed
        trace "Rocky accounts bulk load failure",
          n, len=ps.rpDB.acc.len, error, info=bulker.lastError()
        return err(error)

  if bulker.finish().isErr:
    let error = CommitBulkItemsFailed
    trace "Rocky accounts commit failure",
      len=ps.rpDB.acc.len, error, info=bulker.lastError()
    return err(error)
  ok()


proc storeHexaryNodesOnRockyDb(
    ps: AccountsDbSessionRef
      ): Result[void,AccountsDbError]
      {.gcsafe, raises: [Defect,OSError,KeyError,ValueError].} =
  if ps.base.rocky.isNil:
    return err(NoRocksDbBackend)
  let bulker = RockyBulkLoadRef.init(ps.base.rocky)
  defer: bulker.destroy()
  if not bulker.begin(RockyBulkCache):
    let error = CannotOpenRocksDbBulkSession
    trace "Rocky hexary session initiation failed",
      error, info=bulker.lastError()
    return err(error)

  #let keyList = toSeq(ps.rpDB.tab.keys)
  #  .filterIt(it.isNodeKey)
  #  .mapIt(it.convertTo(NodeTag))
  #  .sorted(cmp)
  var
    keyList = newSeq[NodeTag](ps.rpDB.tab.len)
    inx = 0
  for repairKey in ps.rpDB.tab.keys:
    if repairKey.isNodeKey:
      keyList[inx] = repairKey.convertTo(NodeTag)
      inx.inc
  if inx < ps.rpDB.tab.len:
    return err(UnresolvedRepairNode)
  keyList.sort(cmp)

  for n,nodeTag in keyList:
    let
      key = nodeTag.rockyBulkHexaryKey()
      data = ps.rpDB.tab[nodeTag.to(RepairKey)].convertTo(Blob)
    if not bulker.add(key.toOpenArray, data):
      let error = AddBulkItemFailed
      trace "Rocky hexary bulk load failure",
        n, len=ps.rpDB.tab.len, error, info=bulker.lastError()
      return err(error)

  if bulker.finish().isErr:
    let error = CommitBulkItemsFailed
    trace "Rocky hexary commit failure",
      len=ps.rpDB.acc.len, error, info=bulker.lastError()
    return err(error)
  ok()

# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
@@ -551,7 +737,7 @@ proc hexaryImport(
      discard

  # Add to repair database
  ps.rnDB.tab[repairKey] = rNode
  ps.rpDB.tab[repairKey] = rNode

  # Add to hexary trie database -- disabled, using bulk import later
  #ps.base.db.put(nodeKey.ByteArray32, recData)

@@ -586,7 +772,7 @@ proc rTreeExtendLeaf(
      state: Mutable,
      kind: Leaf,
      lPfx: rPath.tail)
    ps.rnDB.tab[key] = leaf
    ps.rpDB.tab[key] = leaf
    if not key.isNodeKey:
      rPath.path[^1].node.bLink[nibble] = key
    return RPath(

@@ -645,7 +831,7 @@ proc rTreeSplitNode(
      kind: Extension,
      ePfx: result.tail.slice(0,lLen),
      eLink: ps.newRepairKey())
    ps.rnDB.tab[key] = lNode
    ps.rpDB.tab[key] = lNode
    result.path.add RPathStep(key: key, node: lNode, nibble: -1)
    result.tail = result.tail.slice(lLen)
    mKey = lNode.eLink

@@ -654,7 +840,7 @@ proc rTreeSplitNode(
    let mNode = RNodeRef(
      state: Mutable,
      kind: Branch)
    ps.rnDB.tab[mKey] = mNode
    ps.rpDB.tab[mKey] = mNode
    result.path.add RPathStep(key: mKey, node: mNode, nibble: -1) # no nibble yet

  # Insert node (if any): right(Extension) -- not to be registered in `rPath`

@@ -662,7 +848,7 @@ proc rTreeSplitNode(
    let rKey = ps.newRepairKey()
    # Re-use argument node
    mNode.bLink[mNibble] = rKey
    ps.rnDB.tab[rKey] = node
    ps.rpDB.tab[rKey] = node
    node.xPfx = rPfx
  # Otherwise merge argument node
  elif node.kind == Extension:

@@ -680,8 +866,8 @@ proc rTreeFollow(nodeKey: NodeKey; ps: AccountsDbSessionRef): RPath =
  result.tail = nodeKey.to(NibblesSeq)
  noKeyError("rTreeFollow"):
    var key = ps.rootKey.to(RepairKey)
    while ps.rnDB.tab.hasKey(key) and 0 < result.tail.len:
      let node = ps.rnDB.tab[key]
    while ps.rpDB.tab.hasKey(key) and 0 < result.tail.len:
      let node = ps.rpDB.tab[key]
      case node.kind:
      of Leaf:
        if result.tail.len == result.tail.sharedPrefixLen(node.lPfx):

@@ -729,12 +915,12 @@ proc rTreeInterpolate(rPath: RPath; ps: AccountsDbSessionRef): RPath =
        return # sanitary check failed

      # Case: unused slot => add leaf record
      if not ps.rnDB.tab.hasKey(key):
      if not ps.rpDB.tab.hasKey(key):
        return ps.rTreeExtendLeaf(rPath, key)

      # So a `child` node exits but it is something that could not be used to
      # extend the argument `path` which is assumed the longest possible one.
      let child = ps.rnDB.tab[key]
      let child = ps.rpDB.tab[key]
      case child.kind:
      of Branch:
        # So a `Leaf` node can be linked into the `child` branch

@@ -762,8 +948,8 @@ proc rTreeInterpolate(rPath: RPath; ps: AccountsDbSessionRef): RPath =
      let key = step.node.eLink

      var child: RNodeRef
      if ps.rnDB.tab.hasKey(key):
        child = ps.rnDB.tab[key]
      if ps.rpDB.tab.hasKey(key):
        child = ps.rpDB.tab[key]
        # `Extension` can only be followed by a `Branch` node
        if child.kind != Branch:
          return

@@ -772,7 +958,7 @@ proc rTreeInterpolate(rPath: RPath; ps: AccountsDbSessionRef): RPath =
        child = RNodeRef(
          state: Mutable,
          kind: Branch)
        ps.rnDB.tab[key] = child
        ps.rpDB.tab[key] = child

      # So a `Leaf` node can be linked into the `child` branch
      return ps.rTreeExtendLeaf(rPath, key, child)

@@ -856,8 +1042,8 @@ proc rTreeUpdateKeys(rPath: RPath; ps: AccountsDbSessionRef): Result[void,int] =
  var lockOk = true
  for n in countDown(stack.len-1,0):
    let item = stack[n]
    ps.rnDB.tab.del(rPath.path[item.pos].key)
    ps.rnDB.tab[item.step.key] = item.step.node
    ps.rpDB.tab.del(rPath.path[item.pos].key)
    ps.rpDB.tab[item.step.key] = item.step.node
    if lockOk:
      if item.canLock:
        item.step.node.state = Locked

@@ -900,116 +1086,117 @@ proc rTreeUpdateKeys(rPath: RPath; ps: AccountsDbSessionRef): Result[void,int] =
# Private walk along hexary trie records
# ------------------------------------------------------------------------------
when BasicChainTrieEnabled:
  proc hexaryFollow(
      ps: AccountsDbSessionRef;
      root: NodeKey;
      path: NibblesSeq
        ): (int, bool, Blob)
        {.gcsafe, raises: [Defect,RlpError]} =
    ## Returns the number of matching digits/nibbles from the argument `path`
    ## found in the proofs trie.
    let
      nNibbles = path.len
    var
      inPath = path
      recKey = root.ByteArray32.toSeq
      leafBlob: Blob
      emptyRef = false
proc hexaryFollow(
    ps: AccountsDbSessionRef;
    root: NodeKey;
    path: NibblesSeq;
    getFn: HexaryGetFn
      ): (int, bool, Blob)
      {.gcsafe, raises: [Defect,RlpError]} =
  ## Returns the number of matching digits/nibbles from the argument `path`
  ## found in the proofs trie.
  let
    nNibbles = path.len
  var
    inPath = path
    recKey = root.ByteArray32.toSeq
    leafBlob: Blob
    emptyRef = false

  when BasicChainTrieDebugging:
    trace "follow", rootKey=root.pp(ps), path
  when BasicChainTrieDebugging:
    trace "follow", rootKey=root.pp(ps), path

  while true:
    let value = ps.base.db.get(recKey)
    if value.len == 0:
  while true:
    let value = getFn(recKey)
    if value.len == 0:
      break

    var nodeRlp = rlpFromBytes value
    case nodeRlp.listLen:
    of 2:
      let
        (isLeaf, pathSegment) = hexPrefixDecode nodeRlp.listElem(0).toBytes
        sharedNibbles = inPath.sharedPrefixLen(pathSegment)
        fullPath = sharedNibbles == pathSegment.len
        inPathLen = inPath.len
      inPath = inPath.slice(sharedNibbles)

      # Leaf node
      if isLeaf:
        let leafMode = sharedNibbles == inPathLen
        if fullPath and leafMode:
          leafBlob = nodeRlp.listElem(1).toBytes
        when BasicChainTrieDebugging:
          let nibblesLeft = inPathLen - sharedNibbles
          trace "follow leaf",
            fullPath, leafMode, sharedNibbles, nibblesLeft,
            pathSegment, newPath=inPath
        break

    var nodeRlp = rlpFromBytes value
    case nodeRlp.listLen:
    of 2:
      let
        (isLeaf, pathSegment) = hexPrefixDecode nodeRlp.listElem(0).toBytes
        sharedNibbles = inPath.sharedPrefixLen(pathSegment)
        fullPath = sharedNibbles == pathSegment.len
        inPathLen = inPath.len
      inPath = inPath.slice(sharedNibbles)

      # Leaf node
      if isLeaf:
        let leafMode = sharedNibbles == inPathLen
        if fullPath and leafMode:
          leafBlob = nodeRlp.listElem(1).toBytes
        when BasicChainTrieDebugging:
          let nibblesLeft = inPathLen - sharedNibbles
          trace "follow leaf",
            fullPath, leafMode, sharedNibbles, nibblesLeft,
            pathSegment, newPath=inPath
        break

      # Extension node
      if fullPath:
        let branch = nodeRlp.listElem(1)
        if branch.isEmpty:
          when BasicChainTrieDebugging:
            trace "follow extension", newKey="n/a"
          emptyRef = true
          break
        recKey = branch.toBytes
        when BasicChainTrieDebugging:
          trace "follow extension",
            newKey=recKey.convertTo(NodeKey).pp(ps), newPath=inPath
      else:
        when BasicChainTrieDebugging:
          trace "follow extension",
            fullPath, sharedNibbles, pathSegment,
            inPathLen, newPath=inPath
        break

    of 17:
      # Branch node
      if inPath.len == 0:
        leafBlob = nodeRlp.listElem(1).toBytes
        break
      let
        inx = inPath[0].int
        branch = nodeRlp.listElem(inx)
      # Extension node
      if fullPath:
        let branch = nodeRlp.listElem(1)
        if branch.isEmpty:
          when BasicChainTrieDebugging:
            trace "follow branch", newKey="n/a"
            trace "follow extension", newKey="n/a"
          emptyRef = true
          break
        inPath = inPath.slice(1)
        recKey = branch.toBytes
        when BasicChainTrieDebugging:
          trace "follow branch",
            newKey=recKey.convertTo(NodeKey).pp(ps), inx, newPath=inPath

          trace "follow extension",
            newKey=recKey.convertTo(NodeKey).pp(ps), newPath=inPath
      else:
        when BasicChainTrieDebugging:
          trace "follow oops",
            nColumns = nodeRlp.listLen
          trace "follow extension",
            fullPath, sharedNibbles, pathSegment,
            inPathLen, newPath=inPath
        break

    # end while
    of 17:
      # Branch node
      if inPath.len == 0:
        leafBlob = nodeRlp.listElem(1).toBytes
        break
      let
        inx = inPath[0].int
        branch = nodeRlp.listElem(inx)
      if branch.isEmpty:
        when BasicChainTrieDebugging:
          trace "follow branch", newKey="n/a"
        emptyRef = true
        break
      inPath = inPath.slice(1)
      recKey = branch.toBytes
      when BasicChainTrieDebugging:
        trace "follow branch",
          newKey=recKey.convertTo(NodeKey).pp(ps), inx, newPath=inPath

  let pathLen = nNibbles - inPath.len
    else:
      when BasicChainTrieDebugging:
        trace "follow oops",
          nColumns = nodeRlp.listLen
      break

  when BasicChainTrieDebugging:
    trace "follow done",
      recKey, emptyRef, pathLen, leafSize=leafBlob.len
  # end while

  (pathLen, emptyRef, leafBlob)
  let pathLen = nNibbles - inPath.len

  when BasicChainTrieDebugging:
    trace "follow done",
      recKey, emptyRef, pathLen, leafSize=leafBlob.len

  (pathLen, emptyRef, leafBlob)


  proc hexaryFollow(
      ps: AccountsDbSessionRef;
      root: NodeKey;
      path: NodeKey
        ): (int, bool, Blob)
        {.gcsafe, raises: [Defect,RlpError]} =
    ## Variant of `hexaryFollow()`
    ps.hexaryFollow(root, path.to(NibblesSeq))
proc hexaryFollow(
    ps: AccountsDbSessionRef;
    root: NodeKey;
    path: NodeKey;
    getFn: HexaryGetFn
      ): (int, bool, Blob)
      {.gcsafe, raises: [Defect,RlpError]} =
  ## Variant of `hexaryFollow()`
  ps.hexaryFollow(root, path.to(NibblesSeq), getFn)

# ------------------------------------------------------------------------------
# Public constructor
@@ -1022,6 +1209,21 @@ proc init*(
  ## Main object constructor
  T(db: db)

proc init*(
    T: type AccountsDbRef;
    db: ChainDb
      ): T =
  ## Variant of `init()`
  result = T(db: db.trieDB, rocky: db.rocksStoreRef)
  if not result.rocky.isNil:
    # A cache file might hang about from a previous crash
    try:
      discard result.rocky.clearCacheFile(RockyBulkCache)
    except OSError as e:
      result.rocky = nil
      error "Cannot clear rocksdb cache, bulkload disabled",
        exception=($e.name), msg=e.msg

proc init*(
    T: type AccountsDbSessionRef;
    pv: AccountsDbRef;

@@ -1049,7 +1251,7 @@ proc merge*(
    let rc = ps.hexaryImport(rlpRec)
    if rc.isErr:
      trace "merge(SnapAccountProof)", peer=ps.peer,
        proofs=ps.rnDB.tab.len, accounts=ps.rnDB.acc.len, error=rc.error
        proofs=ps.rpDB.tab.len, accounts=ps.rpDB.acc.len, error=rc.error
      return err(rc.error)

  ok()

@@ -1067,66 +1269,72 @@ proc merge*(
  ## the argument account data.
  ##
  if acc.len != 0:
    #if ps.rnDB.acc.len == 0 or ps.rnDB.acc[^1].tag <= base:
    #  return ps.mergeImpl(base, acc)

    let
      prependOk = 0 < ps.rnDB.acc.len and base < ps.rnDB.acc[^1].tag
      saveLen = ps.rnDB.acc.len
      accTag0 = acc[0].accHash.to(NodeTag)
      pathTag0 = acc[0].accHash.to(NodeTag)
      pathTagTop = acc[^1].accHash.to(NodeTag)
      saveLen = ps.rpDB.acc.len

      # For error logging
      (peer, proofs, accounts) = (ps.peer, ps.rnDB.tab.len, ps.rnDB.acc.len)
      (peer, proofs, accounts) = (ps.peer, ps.rpDB.tab.len, ps.rpDB.acc.len)

    var
      error = NothingSerious
      saveQ: seq[RAccount]
    if prependOk:
      # Prepend `acc` argument before `ps.rnDB.acc`
      saveQ = ps.rnDB.acc
      prependOk = false
    if 0 < ps.rpDB.acc.len:
      if pathTagTop <= ps.rpDB.acc[0].pathTag:
        # Prepend `acc` argument before `ps.rpDB.acc`
        saveQ = ps.rpDB.acc
        prependOk = true

      # Append, verify that there is no overlap
      elif pathTag0 <= ps.rpDB.acc[^1].pathTag:
        return err(AccountRangesOverlap)

    block collectAccounts:
      # Verify lower bound
      if acc[0].accHash.to(NodeTag) < base:
      if pathTag0 < base:
        error = AccountSmallerThanBase
        trace "merge(seq[SnapAccount])", peer, proofs, base, accounts, error
        break collectAccounts

      # Add base for the records (no payload). Note that the assumption
      # holds: `ps.rnDB.acc[^1].tag <= base`
      if base < accTag0:
        ps.rnDB.acc.add RAccount(tag: base)
      # holds: `ps.rpDB.acc[^1].tag <= base`
      if base < pathTag0:
        ps.rpDB.acc.add RAccount(pathTag: base)

      # Check for the case that accounts are appended
      elif 0 < ps.rnDB.acc.len and accTag0 <= ps.rnDB.acc[^1].tag:
      elif 0 < ps.rpDB.acc.len and pathTag0 <= ps.rpDB.acc[^1].pathTag:
        error = AccountsNotSrictlyIncreasing
        trace "merge(seq[SnapAccount])", peer, proofs, base, accounts, error
        break collectAccounts

      # Add first account
      ps.rnDB.acc.add RAccount(tag: accTag0, payload: acc[0].accBody.encode)
      ps.rpDB.acc.add RAccount(
        pathTag: pathTag0, payload: acc[0].accBody.encode)

      # Veify & add other accounts
      for n in 1 ..< acc.len:
        let nodeTag = acc[n].accHash.to(NodeTag)

        if nodeTag <= ps.rnDB.acc[^1].tag:
        if nodeTag <= ps.rpDB.acc[^1].pathTag:
          # Recover accounts list and return error
          ps.rnDB.acc.setLen(saveLen)
          ps.rpDB.acc.setLen(saveLen)

          error = AccountsNotSrictlyIncreasing
          trace "merge(seq[SnapAccount])", peer, proofs, base, accounts, error
          break collectAccounts

        ps.rnDB.acc.add RAccount(tag: nodeTag, payload: acc[n].accBody.encode)
        ps.rpDB.acc.add RAccount(
          pathTag: nodeTag, payload: acc[n].accBody.encode)

      # End block `collectAccounts`

    if prependOk:
      if error == NothingSerious:
        ps.rnDB.acc = ps.rnDB.acc & saveQ
        ps.rpDB.acc = ps.rpDB.acc & saveQ
      else:
        ps.rnDB.acc = saveQ
        ps.rpDB.acc = saveQ

    if error != NothingSerious:
      return err(error)
@@ -1145,32 +1353,32 @@ proc interpolate*(ps: AccountsDbSessionRef): Result[void,AccountsDbError] =
  ## database layer.
  ##
  # Walk top down and insert/complete missing account access nodes
  for n in countDown(ps.rnDB.acc.len-1,0):
    let acc = ps.rnDB.acc[n]
  for n in countDown(ps.rpDB.acc.len-1,0):
    let acc = ps.rpDB.acc[n]
    if acc.payload.len != 0:
      let rPath = acc.tag.rTreeFollow(ps)
      var repairKey = acc.key
      let rPath = acc.pathTag.rTreeFollow(ps)
      var repairKey = acc.nodeKey
      if repairKey.isZero and 0 < rPath.path.len and rPath.tail.len == 0:
        repairKey = rPath.path[^1].key
        ps.rnDB.acc[n].key = repairKey
        ps.rpDB.acc[n].nodeKey = repairKey
      if repairKey.isZero:
        let
          update = rPath.rTreeInterpolate(ps, acc.payload)
          final = acc.tag.rTreeFollow(ps)
          final = acc.pathTag.rTreeFollow(ps)
        if update != final:
          return err(AccountRepairBlocked)
        ps.rnDB.acc[n].key = rPath.path[^1].key
        ps.rpDB.acc[n].nodeKey = rPath.path[^1].key

  # Replace temporary repair keys by proper hash based node keys.
  var reVisit: seq[NodeTag]
  for n in countDown(ps.rnDB.acc.len-1,0):
    let acc = ps.rnDB.acc[n]
    if not acc.key.isZero:
      let rPath = acc.tag.rTreeFollow(ps)
  for n in countDown(ps.rpDB.acc.len-1,0):
    let acc = ps.rpDB.acc[n]
    if not acc.nodeKey.isZero:
      let rPath = acc.pathTag.rTreeFollow(ps)
      if rPath.path[^1].node.state == Mutable:
        let rc = rPath.rTreeUpdateKeys(ps)
        if rc.isErr:
          reVisit.add acc.tag
          reVisit.add acc.pathTag

  while 0 < reVisit.len:
    var again: seq[NodeTag]

@@ -1184,25 +1392,83 @@ proc interpolate*(ps: AccountsDbSessionRef): Result[void,AccountsDbError] =

  ok()


proc dbImports*(ps: AccountsDbSessionRef): Result[void,AccountsDbError] =
  ## Experimental: try several db-import modes and record statistics
  var als: AccountLoadStats
  noPpError("dbImports"):
    als.dura[0].elapsed:
      let rc = ps.storeAccountPathsOnChainDb()
      if rc.isErr: return rc
    als.dura[1].elapsed:
      let rc = ps.storeHexaryNodesOnChainDb()
      if rc.isErr: return rc
    als.dura[2].elapsed:
      let rc = ps.storeAccountPathsOnRockyDb()
      if rc.isErr: return rc
    als.dura[3].elapsed:
      let rc = ps.storeHexaryNodesOnRockyDb()
      if rc.isErr: return rc

  for n in 0 ..< 4:
    ps.dStats.dura[n] += als.dura[n]
    ps.base.aStats.dura[n] += als.dura[n]

  ps.dStats.size[0] += ps.rpDB.acc.len.uint64
  ps.base.aStats.size[0] += ps.rpDB.acc.len.uint64

  ps.dStats.size[1] += ps.rpDB.tab.len.uint64
  ps.base.aStats.size[1] += ps.rpDB.tab.len.uint64

  ok()


proc sortMerge*(base: openArray[NodeTag]): NodeTag =
  ## Helper for merging several `(NodeTag,seq[SnapAccount])` data sets
  ## so that there are no overlaps which would be rejected by `merge()`.
  ##
  ## This function selects a `NodeTag` from a list.
  result = high(NodeTag)
  for w in base:
    if w < result:
      result = w

proc sortMerge*(acc: openArray[seq[SnapAccount]]): seq[SnapAccount] =
  ## Helper for merging several `(NodeTag,seq[SnapAccount])` data sets
  ## so that there are no overlaps which would be rejected by `merge()`.
  ##
  ## This function flattens and sorts the argument account lists.
  noPpError("sortMergeAccounts"):
    var accounts: Table[NodeTag,SnapAccount]
    for accList in acc:
      for item in accList:
        accounts[item.accHash.to(NodeTag)] = item
    result = toSeq(accounts.keys).sorted(cmp).mapIt(accounts[it])

proc nHexaryRecords*(ps: AccountsDbSessionRef): int =
  ## Number of hexary record entries in the session database.
  ps.rnDB.tab.len
  ps.rpDB.tab.len

proc nAccountRecords*(ps: AccountsDbSessionRef): int =
  ## Number of account records in the session database. This number includes
  ## lower bound entries (which are not accounts, strictly speaking.)
  ps.rnDB.acc.len
  ps.rpDB.acc.len

# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc dbBackendRocksDb*(pv: AccountsDbRef): bool =
  ## Returns `true` if rocksdb features are available
  not pv.rocky.isNil

proc importAccounts*(
    pv: AccountsDbRef;
    peer: Peer,             ## for log messages
    root: Hash256;          ## state root
    base: NodeTag;          ## before or at first account entry in `data`
    data: SnapAccountRange
    peer: Peer,             ## for log messages
    root: Hash256;          ## state root
    base: NodeTag;          ## before or at first account entry in `data`
    data: SnapAccountRange; ## `snap/1 ` reply data
    storeData = false
      ): Result[void,AccountsDbError] =
  ## Validate accounts and proofs (as received with the snap message
  ## `AccountRange`). This function combines the functionality of the `merge()`

@@ -1226,16 +1492,19 @@ proc importAccounts*(
    return err(RlpEncoding)

  block:
    ## Note:
    ## `interpolate()` is a temporary proof-of-concept function. For
    ## production purposes, it must be replaced by the new facility of
    ## the upcoming re-factored database layer.
    # Note:
    #   `interpolate()` is a temporary proof-of-concept function. For
    #   production purposes, it must be replaced by the new facility of
    #   the upcoming re-factored database layer.
    let rc = ps.interpolate()
    if rc.isErr:
      return err(rc.error)

  # TODO: bulk import
  # ...
  if storeData:
    # Experimental
    let rc = ps.dbImports()
    if rc.isErr:
      return err(rc.error)

  trace "Accounts and proofs ok", peer, root=root.data.toHex,
    proof=data.proof.len, base, accounts=data.accounts.len

@@ -1245,18 +1514,72 @@ proc importAccounts*(
# Debugging
# ------------------------------------------------------------------------------

proc getChainDbAccount*(
    ps: AccountsDbSessionRef;
    accHash: Hash256
      ): Result[Account,AccountsDbError] =
  ## Fetch account via `BaseChainDB`
  try:
    let
      getFn: HexaryGetFn = proc(key: Blob): Blob = ps.base.db.get(key)
      (_, _, leafBlob) = ps.hexaryFollow(ps.rootKey, accHash.to(NodeKey), getFn)
    if 0 < leafBlob.len:
      let acc = rlp.decode(leafBlob,Account)
      return ok(acc)
  except RlpError:
    return err(RlpEncoding)
  except Defect as e:
    raise e
  except Exception as e:
    raiseAssert "Ooops getChainDbAccount(): name=" & $e.name & " msg=" & e.msg

  err(AccountNotFound)

proc getRockyAccount*(
    ps: AccountsDbSessionRef;
    accHash: Hash256
      ): Result[Account,AccountsDbError] =
  ## Fetch account via rocksdb table (parallel to `BaseChainDB`)
  try:
    let
      getFn: HexaryGetFn = proc(key: Blob): Blob =
        var tag: NodeTag
        discard tag.init(key)
        ps.base.db.get(tag.rockyBulkHexaryKey().toOpenArray)
      (_, _, leafBlob) = ps.hexaryFollow(ps.rootKey, accHash.to(NodeKey), getFn)
    if 0 < leafBlob.len:
      let acc = rlp.decode(leafBlob,Account)
      return ok(acc)
  except RlpError:
    return err(RlpEncoding)
  except Defect as e:
    raise e
  except Exception as e:
    raiseAssert "Ooops getChainDbAccount(): name=" & $e.name & " msg=" & e.msg

  err(AccountNotFound)


proc dbImportStats*(ps: AccountsDbSessionRef): AccountLoadStats =
  ## Session data load statistics
  ps.dStats

proc dbImportStats*(pv: AccountsDbRef): AccountLoadStats =
  ## Accumulated data load statistics
  pv.aStats

proc assignPrettyKeys*(ps: AccountsDbSessionRef) =
  ## Prepare for pretty printing/debugging. Run early enough this function
  ## sets the root key to `"$"`, for instance.
  noPpError("validate(1)"):
    # Make keys assigned in pretty order for printing
    var keysList = toSeq(ps.rnDB.tab.keys)
    var keysList = toSeq(ps.rpDB.tab.keys)
    let rootKey = ps.rootKey.to(RepairKey)
    discard rootKey.toKey(ps)
    if ps.rnDB.tab.hasKey(rootKey):
    if ps.rpDB.tab.hasKey(rootKey):
      keysList = @[rootKey] & keysList
    for key in keysList:
      let node = ps.rnDB.tab[key]
      let node = ps.rpDB.tab[key]
      discard key.toKey(ps)
      case node.kind:
      of Branch: (for w in node.bLink: discard w.toKey(ps))

@@ -1272,7 +1595,7 @@ proc dumpPath*(ps: AccountsDbSessionRef; key: NodeTag): seq[string] =
proc dumpProofsDB*(ps: AccountsDbSessionRef): seq[string] =
  ## Dump the entries from the repair tree.
  var accu = @[(0u, "($0" & "," & ps.rootKey.pp(ps) & ")")]
  for key,node in ps.rnDB.tab.pairs:
  for key,node in ps.rpDB.tab.pairs:
    accu.add (key.toKey(ps), "(" & key.pp(ps) & "," & node.pp(ps) & ")")
  proc cmpIt(x, y: (uint,string)): int =
    cmp(x[0],y[0])
@@ -146,7 +146,8 @@ proc fetchAccounts*(buddy: SnapBuddyRef): Future[bool] {.async.} =
  # Process accounts data
  let
    nAccounts = dd.data.accounts.len
    rc = ctx.data.accountsDb.importAccounts(peer, stateRoot, iv.minPt, dd.data)
    rc = ctx.data.accountsDb.importAccounts(
      peer, stateRoot, iv.minPt, dd.data, storeData = true)
  if rc.isErr:
    # TODO: Prevent deadlock in case there is a problem with the approval
    # data which is not in production state, yet.
nimbus/sync/snap/worker/rocky_bulk_load.nim (new file, 198 lines)
@@ -0,0 +1,198 @@
# Nimbus - Types, data structures and shared utilities used in network sync
#
# Copyright (c) 2018-2021 Status Research & Development GmbH
# Licensed under either of
#  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
#    http://www.apache.org/licenses/LICENSE-2.0)
#  * MIT license ([LICENSE-MIT](LICENSE-MIT) or
#    http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or
# distributed except according to those terms.

## Bulk import loader for rocksdb

import
  std/os, # std/[sequtils, strutils],
  eth/common/eth_types,
  rocksdb,
  ../../../db/[kvstore_rocksdb, select_backend]

{.push raises: [Defect].}

type
  RockyBulkLoadRef* = ref object of RootObj
    when select_backend.dbBackend == select_backend.rocksdb:
      db: RocksStoreRef
      envOption: rocksdb_envoptions_t
      importOption: rocksdb_ingestexternalfileoptions_t
      writer: rocksdb_sstfilewriter_t
      filePath: string
    csError: string

# ------------------------------------------------------------------------------
# Public constructor
# ------------------------------------------------------------------------------

proc init*(
    T: type RockyBulkLoadRef;
    db: RocksStoreRef;
    envOption: rocksdb_envoptions_t
      ): T =
  ## Create a new bulk load descriptor.
  when select_backend.dbBackend == select_backend.rocksdb:
    result = T(
      db: db,
      envOption: envOption,
      importOption: rocksdb_ingestexternalfileoptions_create())

    doAssert not result.importOption.isNil
    doAssert not envOption.isNil
  else:
    T(csError: "rocksdb is unsupported")

proc init*(T: type RockyBulkLoadRef; db: RocksStoreRef): T =
  ## Variant of `init()`
  RockyBulkLoadRef.init(db, rocksdb_envoptions_create())

proc clearCacheFile*(db: RocksStoreRef; fileName: string): bool
    {.gcsafe, raises: [Defect,OSError].} =
  ## Remove left-over cache file from an incomplete previous session. The
  ## return value `true` indicates that a cache file was detected.
  discard
  when select_backend.dbBackend == select_backend.rocksdb:
    let filePath = db.tmpDir / fileName
    if filePath.fileExists:
      filePath.removeFile
      return true

proc destroy*(rbl: RockyBulkLoadRef) {.gcsafe, raises: [Defect,OSError].} =
  ## Destructor, free memory resources and delete temporary file. This function
  ## can always be called even though `finish()` will call `destroy()`
  ## automatically if successful.
  ##
  ## Note that after calling `destroy()`, the `RockyBulkLoadRef` descriptor is
  ## reset and must not be used anymore with any function (different from
  ## `destroy()`.)
  ##
  discard
  when select_backend.dbBackend == select_backend.rocksdb:
    if not rbl.writer.isNil:
      rbl.writer.rocksdb_sstfilewriter_destroy()
    if not rbl.envOption.isNil:
      rbl.envOption.rocksdb_envoptions_destroy()
    if not rbl.importOption.isNil:
      rbl.importOption.rocksdb_ingestexternalfileoptions_destroy()
    if 0 < rbl.filePath.len:
      rbl.filePath.removeFile
    rbl[].reset
# ------------------------------------------------------------------------------
# Public functions, getters
# ------------------------------------------------------------------------------

proc lastError*(rbl: RockyBulkLoadRef): string =
  ## Get last error explainer
  rbl.csError

proc store*(rbl: RockyBulkLoadRef): RocksDBInstance =
  ## Provide the descriptor for backend functions as defined in `rocksdb`.
  discard
  when select_backend.dbBackend == select_backend.rocksdb:
    rbl.db.store

proc rocksStoreRef*(db: ChainDb): RocksStoreRef =
  ## Pull out underlying rocksdb backend descriptor (if any)
  # Current architecture allows only one globally defined persistent type
  discard
  when select_backend.dbBackend == select_backend.rocksdb:
    if not db.isNil:
      return db.rdb

# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------

proc begin*(rbl: RockyBulkLoadRef; fileName: string): bool =
  ## Begin a new bulk load session storing data into a temporary cache file
  ## `fileName`. When finished, this file will be directly imported into the
  ## database.
  discard
  when select_backend.dbBackend == select_backend.rocksdb:
    rbl.writer = rocksdb_sstfilewriter_create(
      rbl.envOption, rbl.db.store.options)
    if rbl.writer.isNil:
      rbl.csError = "Cannot create sst writer session"
      return false

    rbl.csError = ""
    let filePath = rbl.db.tmpDir / fileName
    var csError: cstring
    rbl.writer.rocksdb_sstfilewriter_open(fileName, addr csError)
    if not csError.isNil:
      rbl.csError = $csError
      return false

    rbl.filePath = filePath
    return true

proc add*(
    rbl: RockyBulkLoadRef;
    key: openArray[byte];
    val: openArray[byte]
      ): bool =
  ## Append a record to the SST file. Note that consecutive records must be
  ## strictly increasing.
  ##
  ## This function is a wrapper around `rocksdb_sstfilewriter_add()` or
  ## `rocksdb_sstfilewriter_put()` (strangely enough, there are two functions
  ## with exactly the same implementation code.)
  discard
  when select_backend.dbBackend == select_backend.rocksdb:
    var csError: cstring
    rbl.writer.rocksdb_sstfilewriter_add(
      cast[cstring](unsafeAddr key[0]), csize_t(key.len),
      cast[cstring](unsafeAddr val[0]), csize_t(val.len),
      addr csError)
    if csError.isNil:
      return true
    rbl.csError = $csError

proc finish*(
    rbl: RockyBulkLoadRef
      ): Result[int64,void]
      {.gcsafe, raises: [Defect,OSError].} =
  ## Commit collected and cached data to the database. This function implies
  ## `destroy()` if successful. Otherwise `destroy()` must be called
  ## explicitly, e.g. after error analysis.
  ##
  ## If successful, the return value is the size of the SST file used if
  ## that value is available. Otherwise, `0` is returned.
  when select_backend.dbBackend == select_backend.rocksdb:
    var csError: cstring
    rbl.writer.rocksdb_sstfilewriter_finish(addr csError)

    if csError.isNil:
      rbl.db.store.db.rocksdb_ingest_external_file(
        [rbl.filePath].allocCStringArray, 1,
        rbl.importOption,
        addr csError)

    if csError.isNil:
      var size: int64
      try:
        var f: File
        if f.open(rbl.filePath):
          size = f.getFileSize
          f.close
      except:
        discard
      rbl.destroy()
      return ok(size)

    rbl.csError = $csError

  err()

# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------
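The following usage sketch is not part of the commit; it shows how the `RockyBulkLoadRef` API defined above is meant to be driven, using the same begin/add/finish sequence that `accounts_db.nim` uses. The store path, the `example.sst` cache name and the key/value data are made up, the import paths assume the snippet lives under `tests/`, and the example only applies when the build selects the RocksDB backend.

```nim
# Hypothetical driver for the bulk loader added in this commit.
import stew/results
import ../nimbus/db/kvstore_rocksdb
import ../nimbus/sync/snap/worker/rocky_bulk_load

proc bulkLoadExample(baseDir: string) =
  let store = RocksStoreRef.init(baseDir, "nimbus").tryGet()
  let bulker = RockyBulkLoadRef.init(store)
  defer: bulker.destroy()                  # also removes the temporary SST file

  if not bulker.begin("example.sst"):      # cache file lives in the store's tmpDir
    echo "begin failed: ", bulker.lastError()
    return

  # Records must be added in strictly increasing key order.
  for n in 0u8 .. 9u8:
    if not bulker.add(@[n], @[n, n]):
      echo "add failed: ", bulker.lastError()
      return

  let rc = bulker.finish()                 # ingests the SST file into rocksdb
  if rc.isErr:
    echo "commit failed: ", bulker.lastError()
  else:
    echo "ingested SST file of ", rc.value, " bytes"
```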
@@ -10,13 +10,14 @@
# except according to those terms.

import
  std/[strformat, strutils],
  std/[sequtils, strformat, strutils, times],
  chronos,
  chronicles,
  eth/[common/eth_types, p2p],
  stint,
  ../../../utils/prettify,
  ../../timer_helper
  ../../timer_helper,
  ./accounts_db

{.push raises: [Defect].}

@@ -30,6 +31,7 @@ type
    fillFactor*: (float,float) ## mean and standard deviation
    activeQueues*: int
    flushedQueues*: int64
    bulkStore*: AccountLoadStats

  TickerStatsUpdater* =
    proc: TickerStats {.gcsafe, raises: [Defect].}

@@ -44,10 +46,53 @@ type
    tick: uint64 # more than 5*10^11y before wrap when ticking every sec

const
  tickerStartDelay = 100.milliseconds
  tickerLogInterval = 1.seconds
  tickerStartDelay = chronos.milliseconds(100)
  tickerLogInterval = chronos.seconds(1)
  tickerLogSuppressMax = 100

# ------------------------------------------------------------------------------
# Private functions: pretty printing
# ------------------------------------------------------------------------------

proc ppMs*(elapsed: times.Duration): string
    {.gcsafe, raises: [Defect, ValueError]} =
  result = $elapsed.inMilliseconds
  let ns = elapsed.inNanoseconds mod 1_000_000 # fraction of a milli second
  if ns != 0:
    # to rounded deca milli seconds
    let dm = (ns + 5_000i64) div 10_000i64
    result &= &".{dm:02}"
  result &= "ms"

proc ppSecs*(elapsed: times.Duration): string
    {.gcsafe, raises: [Defect, ValueError]} =
  result = $elapsed.inSeconds
  let ns = elapsed.inNanoseconds mod 1_000_000_000 # fraction of a second
  if ns != 0:
    # round up
    let ds = (ns + 5_000_000i64) div 10_000_000i64
    result &= &".{ds:02}"
  result &= "s"

proc ppMins*(elapsed: times.Duration): string
    {.gcsafe, raises: [Defect, ValueError]} =
  result = $elapsed.inMinutes
  let ns = elapsed.inNanoseconds mod 60_000_000_000 # fraction of a minute
  if ns != 0:
    # round up
    let dm = (ns + 500_000_000i64) div 1_000_000_000i64
    result &= &":{dm:02}"
  result &= "m"

proc pp(d: times.Duration): string
    {.gcsafe, raises: [Defect, ValueError]} =
  if 40 < d.inSeconds:
    d.ppMins
  elif 200 < d.inMilliseconds:
    d.ppSecs
  else:
    d.ppMs

# ------------------------------------------------------------------------------
# Private functions: ticking log messages
# ------------------------------------------------------------------------------

@@ -71,6 +116,7 @@ proc runLogTicker(t: TickerRef) {.gcsafe.} =
    avAccounts = ""
    avUtilisation = ""
    pivot = "n/a"
    bulker = ""
  let
    flushed = data.flushedQueues

@@ -85,9 +131,12 @@ proc runLogTicker(t: TickerRef) {.gcsafe.} =
      &"{(data.accounts[0]+0.5).int64}({(data.accounts[1]+0.5).int64})"
    avUtilisation =
      &"{data.fillFactor[0]*100.0:.2f}%({data.fillFactor[1]*100.0:.2f}%)"
    bulker =
      "[" & data.bulkStore.size.toSeq.mapIt(it.toSI).join(",") & "," &
      data.bulkStore.dura.toSeq.mapIt(it.pp).join(",") & "]"

  info "Snap sync statistics",
    tick, buddies, pivot, avAccounts, avUtilisation, flushed, mem
    tick, buddies, pivot, avAccounts, avUtilisation, flushed, bulker, mem

  t.tick.inc
  t.setLogTicker(Moment.fromNow(tickerLogInterval))
@@ -14,6 +14,7 @@ import
  eth/[common/eth_types, p2p],
  nimcrypto,
  stew/[byteutils, keyed_queue],
  ../../db/select_backend,
  ../../constants,
  ".."/[sync_desc, types],
  ./worker/[accounts_db, ticker],

@@ -112,6 +113,7 @@ type
    ## Globally shared data extension
    seenBlock: WorkerSeenBlocks  ## Temporary, debugging, pretty logs
    rng*: ref HmacDrbgContext    ## Random generator
    dbBackend*: ChainDB          ## Low level DB driver access (if any)
    ticker*: TickerRef           ## Ticker, logger
    pivotTable*: SnapPivotTable  ## Per state root environment
    pivotCount*: uint64          ## Total of all created tab entries
@@ -89,7 +89,7 @@ proc `==`*[T: SomeDistinctHash256](a,b: T): bool =
  a.Hash256 == b.Hash256

proc hash*(root: SomeDistinctHash256): Hash =
  ## Mixin for `Table` or `keyedQueue`
  ## Mixin for `Table` or `KeyedQueue`
  root.Hash256.data.hash

# ------------------------------------------------------------------------------
@ -36,9 +36,18 @@ proc reGroup(q: openArray[int]; itemsPerSegment = 16): seq[seq[int]] =
|
||||
# Public functions, units pretty printer
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc ppUs*(elapsed: Duration): string =
|
||||
result = $elapsed.inMicroseconds
|
||||
let ns = elapsed.inNanoseconds mod 1_000 # fraction of a micro second
|
||||
if ns != 0:
|
||||
# to rounded deca milli seconds
|
||||
let du = (ns + 5i64) div 10i64
|
||||
result &= &".{du:02}"
|
||||
result &= "us"
|
||||
|
||||
proc ppMs*(elapsed: Duration): string =
|
||||
result = $elapsed.inMilliseconds
|
||||
let ns = elapsed.inNanoseconds mod 1_000_000
|
||||
let ns = elapsed.inNanoseconds mod 1_000_000 # fraction of a milli second
|
||||
if ns != 0:
|
||||
# to rounded deca milli seconds
|
||||
let dm = (ns + 5_000i64) div 10_000i64
|
||||
@ -47,13 +56,22 @@ proc ppMs*(elapsed: Duration): string =
|
||||
|
||||
proc ppSecs*(elapsed: Duration): string =
|
||||
result = $elapsed.inSeconds
|
||||
let ns = elapsed.inNanoseconds mod 1_000_000_000
|
||||
let ns = elapsed.inNanoseconds mod 1_000_000_000 # fraction of a second
|
||||
if ns != 0:
|
||||
# to rounded decs seconds
|
||||
# round up
|
||||
let ds = (ns + 5_000_000i64) div 10_000_000i64
|
||||
result &= &".{ds:02}"
|
||||
result &= "s"
|
||||
|
||||
proc ppMins*(elapsed: Duration): string =
|
||||
result = $elapsed.inMinutes
|
||||
let ns = elapsed.inNanoseconds mod 60_000_000_000 # fraction of a minute
|
||||
if ns != 0:
|
||||
# round up
|
||||
let dm = (ns + 500_000_000i64) div 1_000_000_000i64
|
||||
result &= &":{dm:02}"
|
||||
result &= "m"

proc toKMG*[T](s: T): string =
proc subst(s: var string; tag, new: string): bool =
if tag.len < s.len and s[s.len - tag.len ..< s.len] == tag:
@ -123,7 +141,8 @@ proc pp*(q: openArray[byte]; noHash = false): string =
template showElapsed*(noisy: bool; info: string; code: untyped) =
block:
let start = getTime()
code
block:
code
if noisy:
let elpd {.inject.} = getTime() - start
if 0 < times.inSeconds(elpd):
@ -131,6 +150,25 @@ template showElapsed*(noisy: bool; info: string; code: untyped) =
else:
echo "*** ", info, &": {elpd.ppMs:>4}"

template showElapsed*(
noisy: bool;
info: string;
elapsed: Duration;
code: untyped) =
block:
let start = getTime()
block:
code
block:
let now = getTime()
elapsed = now - start
if noisy:
let elpd {.inject.} = elapsed
if 0 < times.inSeconds(elpd):
echo "*** ", info, &": {elpd.ppSecs:>4}"
else:
echo "*** ", info, &": {elpd.ppMs:>4}"

template catchException*(info: string; trace: bool; code: untyped) =
block:
try:

@ -12,41 +12,59 @@
## Snap sync components tester

import
std/[distros, os, sequtils, strformat, strutils],
std/[algorithm, distros, hashes, math, os,
sequtils, strformat, strutils, tables, times],
chronicles,
eth/[common/eth_types, p2p, rlp, trie/db],
rocksdb,
stint,
stew/results,
stew/[byteutils, results],
unittest2,
../nimbus/db/select_backend,
../nimbus/[chain_config, config, genesis],
../nimbus/db/[db_chain, select_backend, storage_types],
../nimbus/p2p/chain,
../nimbus/sync/[types, protocol],
../nimbus/sync/snap/range_desc,
../nimbus/sync/snap/worker/accounts_db,
./replay/pp,
#./test_sync_snap/sample1,
../nimbus/sync/snap/worker/[accounts_db, rocky_bulk_load],
../nimbus/utils/prettify,
./replay/[pp, undump],
./test_sync_snap/sample0

const
baseDir = [".", "..", ".."/"..", $DirSep]
repoDir = ["tests"/"replay", "tests"/"test_sync_snap"]
repoDir = [".", "tests"/"replay", "tests"/"test_sync_snap",
"nimbus-eth1-blobs"/"replay"]

nTestDbInstances = 9

type
TestSample = tuple ## sample format from `accounts_and_proofs`
CaptureSpecs = tuple
name: string ## sample name, also used as sub-directory for db separation
network: NetworkId
file: string ## name of capture file
numBlocks: int ## Number of blocks to load

AccountsProofSample = object
name: string ## sample name, also used as sub-directory for db separation
root: Hash256
data: seq[TestSample]

TestSample = tuple
## Data layout provided by the data dump `sample0.nim`
base: Hash256
accounts: seq[(Hash256,uint64,UInt256,Hash256,Hash256)]
proofs: seq[Blob]

TestItem = object ## palatable input format for tests
TestItem = object
## Palatable input format for test function
base: NodeTag
data: SnapAccountRange

TestDbInstances =
array[3,TrieDatabaseRef]

TestDbs = object
## Provide enough spare empty databases
persistent: bool
dbDir: string
inst: TestDbInstances
cdb: array[nTestDbInstances,ChainDb]

when defined(linux):
# The `detectOs(Ubuntu)` directive is not Windows compatible, causes an
@ -55,6 +73,18 @@ when defined(linux):
else:
const isUbuntu32bit = false

const
goerliCapture: CaptureSpecs = (
name: "goerli",
network: GoerliNet,
file: "goerli68161.txt.gz",
numBlocks: 1_000)

accSample0 = AccountsProofSample(
name: "sample0",
root: sample0.snapRoot,
data: sample0.snapProofData)

let
# Forces `check()` to print the error (as opposed to when using `isOk()`)
OkAccDb = Result[void,AccountsDbError].ok()
@ -64,11 +94,23 @@ let
# was present, see `test_custom_network` for more details.
disablePersistentDB = isUbuntu32bit

var
xTmpDir: string
xDbs: TestDbs # for repeated storage/overwrite tests
xTab32: Table[ByteArray32,Blob] # extracted data
xTab33: Table[ByteArray33,Blob]
xVal32Sum, xVal32SqSum: float # statistics
xVal33Sum, xVal33SqSum: float

# ------------------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------------------

proc findFilePath(file: string): Result[string,void] =
proc isOk(rc: ValidationResult): bool =
rc == ValidationResult.OK

proc findFilePath(file: string;
baseDir, repoDir: openArray[string]): Result[string,void] =
for dir in baseDir:
for repo in repoDir:
let path = dir / repo / file
@ -76,14 +118,37 @@ proc findFilePath(file: string): Result[string,void] =
return ok(path)
err()

proc pp(w: Hash256): string =
pp.pp(w) # `pp()` also available from `worker_desc`
proc getTmpDir(sampleDir = "sample0.nim"): string =
sampleDir.findFilePath(baseDir,repoDir).value.splitFile.dir

proc pp(w: NodeTag; collapse = true): string =
pp.pp(w.to(Hash256),collapse)
proc pp(d: Duration): string =
if 40 < d.inSeconds:
d.ppMins
elif 200 < d.inMilliseconds:
d.ppSecs
elif 200 < d.inMicroseconds:
d.ppMs
else:
d.ppUs

proc pp(w: seq[(string,string)]; indent = 4): string =
w.mapIt(&"({it[0]},{it[1]})").join("\n" & " ".repeat(indent))
proc pp(d: AccountLoadStats): string =
"[" & d.size.toSeq.mapIt(it.toSI).join(",") & "," &
d.dura.toSeq.mapIt(it.pp).join(",") & "]"

proc pp(rc: Result[Account,AccountsDbError]): string =
if rc.isErr: $rc.error else: rc.value.pp

proc ppKvPc(w: openArray[(string,int)]): string =
w.mapIt(&"{it[0]}={it[1]}%").join(", ")

proc say*(noisy = false; pfx = "***"; args: varargs[string, `$`]) =
if noisy:
if args.len == 0:
echo "*** ", pfx
elif 0 < pfx.len and pfx[^1] != ' ':
echo pfx, " ", args.toSeq.join
else:
echo pfx, args.toSeq.join

proc setTraceLevel =
discard
@ -115,109 +180,640 @@ proc to(data: seq[TestSample]; T: type seq[TestItem]): T =
storageRoot: it[3],
codeHash: it[4])))))

#proc permute(r: var Rand; qLen: int): seq[int] =
# result = (0 ..< qLen).toSeq
# let
# halfLen = result.len shr 1
# randMax = result.len - halfLen - 1
# for left in 0 ..< halfLen:
# let right = halfLen + r.rand(randMax)
# result[left].swap(result[right])
proc to(b: openArray[byte]; T: type ByteArray32): T =
## Convert to other representation (or exception)
if b.len == 32:
(addr result[0]).copyMem(unsafeAddr b[0], 32)
else:
doAssert b.len == 32

proc flushDbDir(s: string) =
proc to(b: openArray[byte]; T: type ByteArray33): T =
## Convert to other representation (or exception)
if b.len == 33:
(addr result[0]).copyMem(unsafeAddr b[0], 33)
else:
doAssert b.len == 33

proc to(b: ByteArray32|ByteArray33; T: type Blob): T =
b.toSeq

proc to(b: openArray[byte]; T: type NodeTag): T =
## Convert from serialised equivalent
UInt256.fromBytesBE(b).T

proc to(w: (byte, NodeTag); T: type Blob): T =
let (b,t) = w
@[b] & toSeq(t.UInt256.toBytesBE)

proc to(t: NodeTag; T: type Blob): T =
toSeq(t.UInt256.toBytesBE)

proc flushDbDir(s: string; subDir = "") =
if s != "":
let baseDir = s / "tmp"
for n in 0 ..< TestDbInstances.len:
let instDir = baseDir / $n
for n in 0 ..< nTestDbInstances:
let instDir = if subDir == "": baseDir / $n else: baseDir / subDir / $n
if (instDir / "nimbus" / "data").dirExists:
# Typically under Windows: there might be stale file locks.
try: instDir.removeDir except: discard
try: (baseDir / subDir).removeDir except: discard
block dontClearUnlessEmpty:
for w in baseDir.walkDir:
break dontClearUnlessEmpty
try: baseDir.removeDir except: discard

proc testDbs(workDir = ""): TestDbs =
proc testDbs(workDir = ""; subDir = ""): TestDbs =
if disablePersistentDB or workDir == "":
result.persistent = false
result.dbDir = "*notused*"
else:
result.persistent = true
result.dbDir = workDir / "tmp"
if subDir != "":
result.dbDir = workDir / "tmp" / subDir
else:
result.dbDir = workDir / "tmp"
if result.persistent:
result.dbDir.flushDbDir
for n in 0 ..< result.inst.len:
if not result.persistent:
result.inst[n] = newMemoryDB()
else:
result.inst[n] = (result.dbDir / $n).newChainDB.trieDB
for n in 0 ..< result.cdb.len:
result.cdb[n] = (result.dbDir / $n).newChainDB

proc lastTwo(a: openArray[string]): seq[string] =
if 1 < a.len: @[a[^2],a[^1]] else: a.toSeq

proc thisRecord(r: rocksdb_iterator_t): (Blob,Blob) =
var kLen, vLen: csize_t
let
kData = r.rocksdb_iter_key(addr kLen)
vData = r.rocksdb_iter_value(addr vLen)
if not kData.isNil and not vData.isNil:
let
key = string.fromBytes(toOpenArrayByte(kData,0,int(kLen)-1))
value = string.fromBytes(toOpenArrayByte(vData,0,int(vLen)-1))
return (key.mapIt(it.byte),value.mapIt(it.byte))

proc meanStdDev(sum, sqSum: float; length: int): (float,float) =
if 0 < length:
result[0] = sum / length.float
result[1] = sqrt(sqSum / length.float - result[0] * result[0])
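`meanStdDev` relies on the running-sums shortcut: accumulate the sum of values and the sum of squared values while scanning, then mean = sum/n and the standard deviation is sqrt(sqSum/n - mean*mean). A small self-checking example with made-up value lengths:

import std/math

proc meanStdDev(sum, sqSum: float; length: int): (float, float) =
  if 0 < length:
    result[0] = sum / length.float
    result[1] = sqrt(sqSum / length.float - result[0] * result[0])

when isMainModule:
  var sum, sqSum: float
  for valLen in [70, 80, 90]:              # pretend record value lengths
    sum += valLen.float
    sqSum += valLen.float * valLen.float
  let (mean, stdev) = meanStdDev(sum, sqSum, 3)
  doAssert mean == 80.0                    # (70 + 80 + 90) / 3
  doAssert abs(stdev - 8.165) < 0.001      # sqrt(19400/3 - 6400)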

# ------------------------------------------------------------------------------
# Test Runners
# ------------------------------------------------------------------------------

proc accountsRunner(
noisy = true; persistent: bool; root: Hash256; data: seq[TestSample]) =
proc accountsRunner(noisy = true; persistent = true; sample = accSample0) =
let
peer = Peer.new
testItemLst = data.to(seq[TestItem])
tmpDir = "sample0.nim".findFilePath.value.splitFile.dir
db = if persistent: tmpDir.testDbs() else: testDbs()
root = sample.root
testItemLst = sample.data.to(seq[TestItem])
tmpDir = getTmpDir()
db = if persistent: tmpDir.testDbs(sample.name) else: testDbs()
dbDir = db.dbDir.split($DirSep).lastTwo.join($DirSep)
info = if db.persistent: &"persistent db on \"{dbDir}\""
else: "in-memory db"
info = if db.persistent: &"persistent db on \"{dbDir}\"" else: "in-memory db"

defer:
if db.persistent:
tmpDir.flushDbDir
tmpDir.flushDbDir(sample.name)

suite &"SyncSnap: accounts and proofs for {info}":
var
base: AccountsDbRef
desc: AccountsDbSessionRef
suite &"SyncSnap: {sample.name} accounts and proofs for {info}":

test &"Verifying {testItemLst.len} snap items for state root ..{root.pp}":
base = AccountsDbRef.init(db.inst[0])
for n,w in testItemLst:
check base.importAccounts(peer, root, w.base, w.data) == OkAccDb
let dbBase = if persistent: AccountsDbRef.init(db.cdb[0])
else: AccountsDbRef.init(newMemoryDB())
if not dbBase.dbBackendRocksDb():
skip()
else:
for n,w in testItemLst:
check dbBase.importAccounts(
peer, root, w.base, w.data, storeData = persistent) == OkAccDb
noisy.say "***", "import stats=", dbBase.dbImportStats.pp

test &"Merging {testItemLst.len} proofs for state root ..{root.pp}":
base = AccountsDbRef.init(db.inst[1])
desc = AccountsDbSessionRef.init(base, root, peer)
for n,w in testItemLst:
let
dbBase = if persistent: AccountsDbRef.init(db.cdb[1])
else: AccountsDbRef.init(newMemoryDB())
desc = AccountsDbSessionRef.init(dbBase, root, peer)
for w in testItemLst:
check desc.merge(w.data.proof) == OkAccDb
check desc.merge(w.base, w.data.accounts) == OkAccDb
desc.assignPrettyKeys() # for debugging (if any)
check desc.interpolate() == OkAccDb
let
base = testItemLst.mapIt(it.base).sortMerge
accounts = testItemLst.mapIt(it.data.accounts).sortMerge
check desc.merge(base, accounts) == OkAccDb
desc.assignPrettyKeys() # for debugging (if any)
check desc.interpolate() == OkAccDb

# echo ">>> ", desc.dumpProofsDB.join("\n ")
if dbBase.dbBackendRocksDb():
check desc.dbImports() == OkAccDb
noisy.say "***", "import stats=", desc.dbImportStats.pp

for acc in accounts:
let
byChainDB = desc.getChainDbAccount(acc.accHash)
byRockyBulker = desc.getRockyAccount(acc.accHash)
noisy.say "*** find",
"byChainDb=", byChainDB.pp, " inBulker=", byRockyBulker.pp
check byChainDB.isOk
check byRockyBulker.isOk
check byChainDB == byRockyBulker

#noisy.say "***", "database dump\n ", desc.dumpProofsDB.join("\n ")


proc importRunner(noisy = true; persistent = true; capture = goerliCapture) =

let
fileInfo = capture.file.splitFile.name.split(".")[0]
filePath = capture.file.findFilePath(baseDir,repoDir).value
tmpDir = getTmpDir()
db = if persistent: tmpDir.testDbs(capture.name) else: testDbs()
numBlocksInfo = if capture.numBlocks == high(int): "" else: $capture.numBlocks & " "
loadNoise = noisy

defer:
if db.persistent:
tmpDir.flushDbDir(capture.name)

suite &"SyncSnap: using {fileInfo} capture for testing db timings":
var
ddb: BaseChainDB # persistent DB on disk
chn: Chain

test &"Create persistent BaseChainDB on {tmpDir}":
let chainDb = if db.persistent: db.cdb[0].trieDB
else: newMemoryDB()

# Constructor ...
ddb = newBaseChainDB(
chainDb,
id = capture.network,
pruneTrie = true,
params = capture.network.networkParams)

ddb.initializeEmptyDb
chn = ddb.newChain

test &"Storing {numBlocksInfo}persistent blocks from dump":
for w in filePath.undumpNextGroup:
let (fromBlock, toBlock) = (w[0][0].blockNumber, w[0][^1].blockNumber)
if fromBlock == 0.u256:
doAssert w[0][0] == ddb.getBlockHeader(0.u256)
continue
# Message if [fromBlock,toBlock] contains a multiple of 900
if fromBlock + (toBlock mod 900) <= toBlock:
loadNoise.say "***", &"processing ...[#{fromBlock},#{toBlock}]..."
check chn.persistBlocks(w[0], w[1]).isOk
if capture.numBlocks.toBlockNumber <= w[0][^1].blockNumber:
break
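The progress trigger above relies on a small arithmetic trick: the interval [fromBlock, toBlock] contains a multiple of 900 exactly when fromBlock + (toBlock mod 900) <= toBlock, i.e. when fromBlock does not exceed the largest multiple of 900 that is not greater than toBlock. A throwaway check of that predicate (names invented for the sketch):

proc containsMultiple(a, b, step: int): bool =
  ## True iff some multiple of `step` lies in the closed interval [a, b].
  a + (b mod step) <= b

when isMainModule:
  doAssert containsMultiple(899, 901, 900)        # 900 lies inside
  doAssert not containsMultiple(901, 1799, 900)   # 900 and 1800 both outside
  doAssert containsMultiple(1800, 1800, 900)      # end point is a multiple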

test "Extract key-value records into memory tables via rocksdb iterator":
# Implicit test: if not persistent => db.cdb[0] is nil
if db.cdb[0].rocksStoreRef.isNil:
skip()
else:
let
rdb = db.cdb[0].rocksStoreRef
rop = rdb.store.readOptions
rit = rdb.store.db.rocksdb_create_iterator(rop)
check not rit.isNil

xTab32.clear
xTab33.clear

rit.rocksdb_iter_seek_to_first()
while rit.rocksdb_iter_valid() != 0:
let (key,val) = rit.thisRecord()
rit.rocksdb_iter_next()
if key.len == 32:
xTab32[key.to(ByteArray32)] = val
xVal32Sum += val.len.float
xVal32SqSum += val.len.float * val.len.float
check key.to(ByteArray32).to(Blob) == key
elif key.len == 33:
xTab33[key.to(ByteArray33)] = val
xVal33Sum += val.len.float
xVal33SqSum += val.len.float * val.len.float
check key.to(ByteArray33).to(Blob) == key
else:
noisy.say "***", "ignoring key=", key.toHex

rit.rocksdb_iter_destroy()

var
(mean32, stdv32) = meanStdDev(xVal32Sum, xVal32SqSum, xTab32.len)
(mean33, stdv33) = meanStdDev(xVal33Sum, xVal33SqSum, xTab33.len)
noisy.say "***",
"key 32 table: ",
&"size={xTab32.len} valLen={(mean32+0.5).int}({(stdv32+0.5).int})",
", key 33 table: ",
&"size={xTab33.len} valLen={(mean33+0.5).int}({(stdv33+0.5).int})"


proc storeRunner(noisy = true; persistent = true; cleanUp = true) =
let
fullNoise = false
var
emptyDb = "empty"

# Allows repeating the store tests on existing data
if not xDbs.cdb[0].isNil:
emptyDb = "pre-loaded"
elif persistent:
xTmpDir = getTmpDir()
xDbs = xTmpDir.testDbs("store-runner")
else:
xDbs = testDbs()

defer:
if xDbs.persistent and cleanUp:
xTmpDir.flushDbDir("store-runner")
xDbs.reset

suite &"SyncSnap: storage tests on {emptyDb} databases":
#
# `xDbs` instance slots layout:
#
# * cdb[0] -- direct db, key length 32, no transaction
# * cdb[1] -- direct db, key length 32 as 33, no transaction
#
# * cdb[2] -- direct db, key length 32, transaction based
# * cdb[3] -- direct db, key length 32 as 33, transaction based
#
# * cdb[4] -- direct db, key length 33, no transaction
# * cdb[5] -- direct db, key length 33, transaction based
#
# * cdb[6] -- rocksdb, key length 32
# * cdb[7] -- rocksdb, key length 32 as 33
# * cdb[8] -- rocksdb, key length 33
#
doAssert 9 <= nTestDbInstances

if xTab32.len == 0 or xTab33.len == 0:
test &"Both tables with 32 byte keys(size={xTab32.len}), " &
&"33 byte keys(size={xTab32.len}) must be non-empty":
skip()
else:
# cdb[0] -- direct db, key length 32, no transaction
test &"Directly store {xTab32.len} records " &
&"(key length 32) into {emptyDb} trie database":
var ela: Duration
let tdb = xDbs.cdb[0].trieDB

if noisy: echo ""
noisy.showElapsed("Standard db loader(keyLen 32)", ela):
for (key,val) in xTab32.pairs:
tdb.put(key, val)

if ela.inNanoseconds != 0:
let
elaNs = ela.inNanoseconds.float
perRec = ((elaNs / xTab32.len.float) + 0.5).int.initDuration
noisy.say "***",
"nRecords=", xTab32.len, ", ",
"perRecord=", perRec.pp

# cdb[1] -- direct db, key length 32 as 33, no transaction
test &"Directly store {xTab32.len} records " &
&"(key length 33) into {emptyDb} trie database":
var ela = initDuration()
let tdb = xDbs.cdb[1].trieDB

if noisy: echo ""
noisy.showElapsed("Standard db loader(keyLen 32 as 33)", ela):
for (key,val) in xTab32.pairs:
tdb.put(@[99.byte] & key.toSeq, val)

if ela.inNanoseconds != 0:
let
elaNs = ela.inNanoseconds.float
perRec = ((elaNs / xTab32.len.float) + 0.5).int.initDuration
noisy.say "***",
"nRecords=", xTab32.len, ", ",
"perRecord=", perRec.pp

# cdb[2] -- direct db, key length 32, transaction based
test &"Transactionally store {xTab32.len} records " &
&"(key length 32) into {emptyDb} trie database":
var ela: Duration
let tdb = xDbs.cdb[2].trieDB

if noisy: echo ""
noisy.showElapsed("Standard db loader(tx,keyLen 32)", ela):
let dbTx = tdb.beginTransaction
defer: dbTx.commit

for (key,val) in xTab32.pairs:
tdb.put(key, val)

if ela.inNanoseconds != 0:
let
elaNs = ela.inNanoseconds.float
perRec = ((elaNs / xTab32.len.float) + 0.5).int.initDuration
noisy.say "***",
"nRecords=", xTab32.len, ", ",
"perRecord=", perRec.pp

# cdb[3] -- direct db, key length 32 as 33, transaction based
test &"Transactionally store {xTab32.len} records " &
&"(key length 33) into {emptyDb} trie database":
var ela: Duration
let tdb = xDbs.cdb[3].trieDB

if noisy: echo ""
noisy.showElapsed("Standard db loader(tx,keyLen 32 as 33)", ela):
let dbTx = tdb.beginTransaction
defer: dbTx.commit

for (key,val) in xTab32.pairs:
tdb.put(@[99.byte] & key.toSeq, val)

if ela.inNanoseconds != 0:
let
elaNs = ela.inNanoseconds.float
perRec = ((elaNs / xTab32.len.float) + 0.5).int.initDuration
noisy.say "***",
"nRecords=", xTab32.len, ", ",
"perRecord=", perRec.pp

# cdb[4] -- direct db, key length 33, no transaction
test &"Directly store {xTab33.len} records " &
&"(key length 33) into {emptyDb} trie database":
var ela: Duration
let tdb = xDbs.cdb[4].trieDB

if noisy: echo ""
noisy.showElapsed("Standard db loader(keyLen 33)", ela):
for (key,val) in xTab33.pairs:
tdb.put(key, val)

if ela.inNanoseconds != 0:
let
elaNs = ela.inNanoseconds.float
perRec = ((elaNs / xTab33.len.float) + 0.5).int.initDuration
noisy.say "***",
"nRecords=", xTab33.len, ", ",
"perRecord=", perRec.pp

# cdb[5] -- direct db, key length 33, transaction based
test &"Transactionally store {xTab33.len} records " &
&"(key length 33) into {emptyDb} trie database":
var ela: Duration
let tdb = xDbs.cdb[5].trieDB

if noisy: echo ""
noisy.showElapsed("Standard db loader(tx,keyLen 33)", ela):
let dbTx = tdb.beginTransaction
defer: dbTx.commit

for (key,val) in xTab33.pairs:
tdb.put(key, val)

if ela.inNanoseconds != 0:
let
elaNs = ela.inNanoseconds.float
perRec = ((elaNs / xTab33.len.float) + 0.5).int.initDuration
noisy.say "***",
"nRecords=", xTab33.len, ", ",
"perRecord=", perRec.pp

if xDbs.cdb[0].rocksStoreRef.isNil:
test "The rocksdb interface must be available": skip()
else:
# cdb[6] -- rocksdb, key length 32
test &"Store {xTab32.len} records " &
"(key length 32) into empty rocksdb table":
var
ela: array[4,Duration]
size: int64
let
rdb = xDbs.cdb[6].rocksStoreRef

# Note that 32 and 33 size keys cannot be usefully merged into the
# same SST file. The keys must be added in a sorted mode. So playing
# safe, key sizes should be of equal length.

if noisy: echo ""
noisy.showElapsed("Rocky bulk loader(keyLen 32)", ela[0]):
let bulker = RockyBulkLoadRef.init(rdb)
defer: bulker.destroy()
check bulker.begin("rocky-bulk-cache")

var
keyList = newSeq[NodeTag](xTab32.len)

fullNoise.showElapsed("Rocky bulk loader/32, sorter", ela[1]):
var inx = 0
for key in xTab32.keys:
keyList[inx] = key.to(NodeTag)
inx.inc
keyList.sort(cmp)

fullNoise.showElapsed("Rocky bulk loader/32, append", ela[2]):
for n,nodeTag in keyList:
let key = nodeTag.to(Blob)
check bulker.add(key, xTab32[key.to(ByteArray32)])

fullNoise.showElapsed("Rocky bulk loader/32, slurp", ela[3]):
let rc = bulker.finish()
if rc.isOk:
size = rc.value
else:
check bulker.lastError == "" # force printing error

fullNoise.say "***", " ela[]=", $ela.toSeq.mapIt(it.pp)
if ela[0].inNanoseconds != 0:
let
elaNs = ela.toSeq.mapIt(it.inNanoseconds.float)
elaPc = elaNs.mapIt(((it / elaNs[0]) * 100 + 0.5).int)
perRec = ((elaNs[0] / xTab32.len.float) + 0.5).int.initDuration
noisy.say "***",
"nRecords=", xTab32.len, ", ",
"perRecord=", perRec.pp, ", ",
"sstSize=", size.uint64.toSI, ", ",
"perRecord=", ((size.float / xTab32.len.float) + 0.5).int, ", ",
["Total","Sorter","Append","Ingest"].zip(elaPc).ppKvPc

# cdb[7] -- rocksdb, key length 32 as 33
test &"Store {xTab32.len} records " &
"(key length 33) into empty rocksdb table":
var
ela: array[4,Duration]
size: int64
let
rdb = xDbs.cdb[7].rocksStoreRef

# Note that 32 and 33 size keys cannot be usefully merged into the
# same SST file. The keys must be added in a sorted mode. So playing
# safe, key sizes should be of equal length.

if noisy: echo ""
noisy.showElapsed("Rocky bulk loader(keyLen 32 as 33)", ela[0]):
let bulker = RockyBulkLoadRef.init(rdb)
defer: bulker.destroy()
check bulker.begin("rocky-bulk-cache")

var
keyList = newSeq[NodeTag](xTab32.len)

fullNoise.showElapsed("Rocky bulk loader/32 as 33, sorter", ela[1]):
var inx = 0
for key in xTab32.keys:
keyList[inx] = key.to(NodeTag)
inx.inc
keyList.sort(cmp)

fullNoise.showElapsed("Rocky bulk loader/32 as 33, append", ela[2]):
for n,nodeTag in keyList:
let key = nodeTag.to(Blob)
check bulker.add(@[99.byte] & key, xTab32[key.to(ByteArray32)])

fullNoise.showElapsed("Rocky bulk loader/32 as 33, slurp", ela[3]):
let rc = bulker.finish()
if rc.isOk:
size = rc.value
else:
check bulker.lastError == "" # force printing error

fullNoise.say "***", " ela[]=", $ela.toSeq.mapIt(it.pp)
if ela[0].inNanoseconds != 0:
let
elaNs = ela.toSeq.mapIt(it.inNanoseconds.float)
elaPc = elaNs.mapIt(((it / elaNs[0]) * 100 + 0.5).int)
perRec = ((elaNs[0] / xTab32.len.float) + 0.5).int.initDuration
noisy.say "***",
"nRecords=", xTab32.len, ", ",
"perRecord=", perRec.pp, ", ",
"sstSize=", size.uint64.toSI, ", ",
"perRecord=", ((size.float / xTab32.len.float) + 0.5).int, ", ",
["Total","Sorter","Append","Ingest"].zip(elaPc).ppKvPc


# cdb[8] -- rocksdb, key length 33
test &"Store {xTab33.len} records " &
&"(key length 33) into {emptyDb} rocksdb table":
var
ela: array[4,Duration]
size: int64
let rdb = xDbs.cdb[8].rocksStoreRef

# Note that 32 and 33 size keys cannot be usefully merged into the
# same SST file. The keys must be added in a sorted mode. So playing
# safe, key sizes should be of equal length.

if noisy: echo ""
noisy.showElapsed("Rocky bulk loader(keyLen 33)", ela[0]):
let bulker = RockyBulkLoadRef.init(rdb)
defer: bulker.destroy()
check bulker.begin("rocky-bulk-cache")

var
kKeys: seq[byte] # need to cascade
kTab: Table[byte,seq[NodeTag]]

fullNoise.showElapsed("Rocky bulk loader/33, sorter", ela[1]):
for key in xTab33.keys:
if kTab.hasKey(key[0]):
kTab[key[0]].add key.toOpenArray(1,32).to(NodeTag)
else:
kTab[key[0]] = @[key.toOpenArray(1,32).to(NodeTag)]

kKeys = toSeq(kTab.keys).sorted
for w in kKeys:
kTab[w].sort(cmp)

fullNoise.showElapsed("Rocky bulk loader/33, append", ela[2]):
for w in kKeys:
fullNoise.say "***", " prefix=", w, " entries=", kTab[w].len
for n,nodeTag in kTab[w]:
let key = (w,nodeTag).to(Blob)
check bulker.add(key, xTab33[key.to(ByteArray33)])

fullNoise.showElapsed("Rocky bulk loader/33, slurp", ela[3]):
let rc = bulker.finish()
if rc.isOk:
size = rc.value
else:
check bulker.lastError == "" # force printing error

fullNoise.say "***", " ela[]=", $ela.toSeq.mapIt(it.pp)
if ela[0].inNanoseconds != 0:
let
elaNs = ela.toSeq.mapIt(it.inNanoseconds.float)
elaPc = elaNs.mapIt(((it / elaNs[0]) * 100 + 0.5).int)
perRec = ((elaNs[0] / xTab33.len.float) + 0.5).int.initDuration
noisy.say "***",
"nRecords=", xTab33.len, ", ",
"perRecord=", perRec.pp, ", ",
"sstSize=", size.uint64.toSI, ", ",
"perRecord=", ((size.float / xTab33.len.float) + 0.5).int, ", ",
["Total","Cascaded-Sorter","Append","Ingest"].zip(elaPc).ppKvPc

# ------------------------------------------------------------------------------
# Main function(s)
# ------------------------------------------------------------------------------

proc syncSnapMain*(noisy = defined(debug)) =
noisy.accountsRunner(
persistent = false, sample0.snapRoot, sample0.snapProofData)
noisy.accountsRunner()
noisy.importRunner() # small sample, just verify functionality
noisy.storeRunner()

when isMainModule:
const
noisy = defined(debug) or true
test00 = (sample0.snapRoot, @[sample0.snapProofData0])
test01 = (sample0.snapRoot, sample0.snapProofData)
#test10 = (sample1.snapRoot, @[sample1.snapProofData1])
#test11 = (sample1.snapRoot, sample1.snapProofData)
snapTest0 = accSample0
snapTest1 = AccountsProofSample(
name: "test1",
root: snapTest0.root,
data: snapTest0.data[0..0])

setTraceLevel()
bulkTest0 = goerliCapture
bulkTest1: CaptureSpecs = (
name: "full-goerli",
network: goerliCapture.network,
file: goerliCapture.file,
numBlocks: high(int))
bulkTest2: CaptureSpecs = (
name: "more-goerli",
network: GoerliNet,
file: "goerli482304.txt.gz",
numBlocks: high(int))
bulkTest3: CaptureSpecs = (
name: "mainnet",
network: MainNet,
file: "mainnet332160.txt.gz",
numBlocks: high(int))

when false: # or true:
import ../../nimbus-eth1-blobs/replay/sync_sample1 as sample1
const
snapTest2 = AccountsProofSample(
name: "test2",
root: sample1.snapRoot,
data: sample1.snapProofData)
snapTest3 = AccountsProofSample(
name: "test3",
root: snapTest2.root,
data: snapTest2.data[0..0])

#setTraceLevel()
setErrorLevel()

noisy.accountsRunner(persistent=false, test00[0], test00[1])
noisy.accountsRunner(persistent=false, test01[0], test01[1])
#noisy.accountsRunner(persistent=false, test10[0], test10[1])
#noisy.accountsRunner(persistent=false, test11[0], test11[1])
#noisy.accountsRunner(persistent=true, snapTest0)
#noisy.accountsRunner(persistent=true, snapTest1)

when defined(snapTest2):
discard
#noisy.accountsRunner(persistent=true, snapTest2)
#noisy.accountsRunner(persistent=true, snapTest3)

# ---- database storage timings -------

noisy.showElapsed("importRunner()"):
noisy.importRunner(capture = bulkTest0)

noisy.showElapsed("storeRunner()"):
true.storeRunner(cleanUp = false)
true.storeRunner()

# ------------------------------------------------------------------------------
# End

@ -213,6 +213,7 @@ proc findFilePath*(file: string;
let path = dir / repo / file
if path.fileExists:
return ok(path)
err()

# ------------------------------------------------------------------------------
# End
vendor/nim-rocksdb
@ -1 +1 @@
Subproject commit fb3f2c30b0224466932ee132e3f853e6c35cff7c
Subproject commit b7f07282251b043fa07fc071405727dd17c61654