web3-developer 799acf301d
Added support for namespaces to RocksDb kvstore. (#2066)
* Add new RocksNamespaceRef type and remove backups and readonly support from RocksDb KvStore.

* Bump nim-rocksdb to fc2ba4a836b6b47ae1b17d1c45801c7e06585e19

* Fix tests.

* Fix copyright notice.
2024-03-12 11:04:46 +08:00

507 lines
15 KiB
Nim

# Nimbus
# Copyright (c) 2022-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or
# distributed except according to those terms.
## Snap sync components tester and TDD environment
import
std/[algorithm, math, sequtils, strformat, times],
stew/byteutils,
rocksdb/lib/librocksdb,
rocksdb,
unittest2,
../../nimbus/core/chain,
../../nimbus/db/kvstore_rocksdb,
../../nimbus/db/core_db,
../../nimbus/db/core_db/persistent,
../../nimbus/sync/snap/range_desc,
../../nimbus/sync/snap/worker/db/[hexary_desc, rocky_bulk_load],
../../nimbus/utils/prettify,
../replay/[pp, undump_blocks]
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
proc to*(b: openArray[byte]; T: type ByteArray32): T =
## Convert to other representation (or exception)
if b.len == 32:
(addr result[0]).copyMem(unsafeAddr b[0], 32)
else:
doAssert b.len == 32
proc to*(b: openArray[byte]; T: type ByteArray33): T =
## Convert to other representation (or exception)
if b.len == 33:
(addr result[0]).copyMem(unsafeAddr b[0], 33)
else:
doAssert b.len == 33
proc to*(b: ByteArray32|ByteArray33; T: type Blob): T =
b.toSeq
proc to*(b: openArray[byte]; T: type NodeTag): T =
## Convert from serialised equivalent
UInt256.fromBytesBE(b).T
proc to*(w: (byte, NodeTag); T: type Blob): T =
let (b,t) = w
@[b] & toSeq(t.UInt256.toBytesBE)
proc to*(t: NodeTag; T: type Blob): T =
toSeq(t.UInt256.toBytesBE)
# ----------------
proc thisRecord(r: ptr rocksdb_iterator_t): (Blob,Blob) =
var kLen, vLen: csize_t
let
kData = r.rocksdb_iter_key(addr kLen)
vData = r.rocksdb_iter_value(addr vLen)
if not kData.isNil and not vData.isNil:
let
key = string.fromBytes(toOpenArrayByte(kData,0,int(kLen)-1))
value = string.fromBytes(toOpenArrayByte(vData,0,int(vLen)-1))
return (key.mapIt(it.byte),value.mapIt(it.byte))
proc meanStdDev(sum, sqSum: float; length: int): (float,float) =
if 0 < length:
result[0] = sum / length.float
result[1] = sqrt(sqSum / length.float - result[0] * result[0])
# ------------------------------------------------------------------------------
# Public functions, pretty printing
# ------------------------------------------------------------------------------
proc pp*(d: Duration): string =
if 40 < d.inSeconds:
d.ppMins
elif 200 < d.inMilliseconds:
d.ppSecs
elif 200 < d.inMicroseconds:
d.ppMs
else:
d.ppUs
proc ppKvPc*(w: openArray[(string,int)]): string =
w.mapIt(&"{it[0]}={it[1]}%").join(", ")
proc say*(noisy = false; pfx = "***"; args: varargs[string, `$`]) =
if noisy:
if args.len == 0:
echo "*** ", pfx
elif 0 < pfx.len and pfx[^1] != ' ':
echo pfx, " ", args.toSeq.join
else:
echo pfx, args.toSeq.join
# ------------------------------------------------------------------------------
# Public test function: setup
# ------------------------------------------------------------------------------
proc test_dbTimingUndumpBlocks*(
noisy: bool;
filePath: string;
com: CommonRef;
numBlocks: int;
loadNoise = false;
) =
## Store persistent blocks from dump into chain DB
let chain = com.newChain
for w in filePath.undumpBlocks:
let (fromBlock, toBlock) = (w[0][0].blockNumber, w[0][^1].blockNumber)
if fromBlock == 0.u256:
doAssert w[0][0] == com.db.getBlockHeader(0.u256)
continue
# Message if [fromBlock,toBlock] contains a multiple of 700
if fromBlock + (toBlock mod 900) <= toBlock:
loadNoise.say "***", &"processing ...[#{fromBlock},#{toBlock}]..."
check chain.persistBlocks(w[0], w[1]) == ValidationResult.OK
if numBlocks.toBlockNumber <= w[0][^1].blockNumber:
break
proc test_dbTimingRockySetup*(
noisy: bool;
t32: var Table[ByteArray32,Blob],
t33: var Table[ByteArray33,Blob],
cdb: CoreDbRef;
) =
## Extract key-value records into memory tables via rocksdb iterator
let
rdb = cdb.backend.toRocksStoreRef
rop = rocksdb_readoptions_create()
rit = rdb.rocksDb.cPtr.rocksdb_create_iterator(rop)
check not rit.isNil
var
v32Sum, v32SqSum: float # statistics
v33Sum, v33SqSum: float
t32.clear
t33.clear
rit.rocksdb_iter_seek_to_first()
while rit.rocksdb_iter_valid() != 0:
let (key,val) = rit.thisRecord()
rit.rocksdb_iter_next()
if key.len == 32:
t32[key.to(ByteArray32)] = val
v32Sum += val.len.float
v32SqSum += val.len.float * val.len.float
check key.to(ByteArray32).to(Blob) == key
elif key.len == 33:
t33[key.to(ByteArray33)] = val
v33Sum += val.len.float
v33SqSum += val.len.float * val.len.float
check key.to(ByteArray33).to(Blob) == key
else:
noisy.say "***", "ignoring key=", key.toHex
rit.rocksdb_iter_destroy()
rop.rocksdb_readoptions_destroy()
var
(mean32, stdv32) = meanStdDev(v32Sum, v32SqSum, t32.len)
(mean33, stdv33) = meanStdDev(v33Sum, v33SqSum, t33.len)
noisy.say "***",
"key 32 table: ",
&"size={t32.len} valLen={(mean32+0.5).int}({(stdv32+0.5).int})",
", key 33 table: ",
&"size={t33.len} valLen={(mean33+0.5).int}({(stdv33+0.5).int})"
# ------------------------------------------------------------------------------
# Public test function: timing
# ------------------------------------------------------------------------------
proc test_dbTimingStoreDirect32*(
noisy: bool;
t32: Table[ByteArray32,Blob];
cdb: CoreDbRef;
) =
## Direct db, key length 32, no transaction
var ela: Duration
let tdb = cdb.kvt
if noisy: echo ""
noisy.showElapsed("Standard db loader(keyLen 32)", ela):
for (key,val) in t32.pairs:
tdb.put(key, val)
if ela.inNanoseconds != 0:
let
elaNs = ela.inNanoseconds.float
perRec = ((elaNs / t32.len.float) + 0.5).int.initDuration
noisy.say "***",
"nRecords=", t32.len, ", ",
"perRecord=", perRec.pp
proc test_dbTimingStoreDirectly32as33*(
noisy: bool;
t32: Table[ByteArray32,Blob],
cdb: CoreDbRef;
) =
## Direct db, key length 32 as 33, no transaction
var ela = initDuration()
let tdb = cdb.kvt
if noisy: echo ""
noisy.showElapsed("Standard db loader(keyLen 32 as 33)", ela):
for (key,val) in t32.pairs:
tdb.put(@[99.byte] & key.toSeq, val)
if ela.inNanoseconds != 0:
let
elaNs = ela.inNanoseconds.float
perRec = ((elaNs / t32.len.float) + 0.5).int.initDuration
noisy.say "***",
"nRecords=", t32.len, ", ",
"perRecord=", perRec.pp
proc test_dbTimingStoreTx32*(
noisy: bool;
t32: Table[ByteArray32,Blob],
cdb: CoreDbRef;
) =
## Direct db, key length 32, transaction based
var ela: Duration
let tdb = cdb.kvt
if noisy: echo ""
noisy.showElapsed("Standard db loader(tx,keyLen 32)", ela):
let dbTx = cdb.beginTransaction
defer: dbTx.commit
for (key,val) in t32.pairs:
tdb.put(key, val)
if ela.inNanoseconds != 0:
let
elaNs = ela.inNanoseconds.float
perRec = ((elaNs / t32.len.float) + 0.5).int.initDuration
noisy.say "***",
"nRecords=", t32.len, ", ",
"perRecord=", perRec.pp
proc test_dbTimingStoreTx32as33*(
noisy: bool;
t32: Table[ByteArray32,Blob],
cdb: CoreDbRef;
) =
## Direct db, key length 32 as 33, transaction based
var ela: Duration
let tdb = cdb.kvt
if noisy: echo ""
noisy.showElapsed("Standard db loader(tx,keyLen 32 as 33)", ela):
let dbTx = cdb.beginTransaction
defer: dbTx.commit
for (key,val) in t32.pairs:
tdb.put(@[99.byte] & key.toSeq, val)
if ela.inNanoseconds != 0:
let
elaNs = ela.inNanoseconds.float
perRec = ((elaNs / t32.len.float) + 0.5).int.initDuration
noisy.say "***",
"nRecords=", t32.len, ", ",
"perRecord=", perRec.pp
proc test_dbTimingDirect33*(
noisy: bool;
t33: Table[ByteArray33,Blob],
cdb: CoreDbRef;
) =
## Direct db, key length 33, no transaction
var ela: Duration
let tdb = cdb.kvt
if noisy: echo ""
noisy.showElapsed("Standard db loader(keyLen 33)", ela):
for (key,val) in t33.pairs:
tdb.put(key, val)
if ela.inNanoseconds != 0:
let
elaNs = ela.inNanoseconds.float
perRec = ((elaNs / t33.len.float) + 0.5).int.initDuration
noisy.say "***",
"nRecords=", t33.len, ", ",
"perRecord=", perRec.pp
proc test_dbTimingTx33*(
noisy: bool;
t33: Table[ByteArray33,Blob],
cdb: CoreDbRef;
) =
## Direct db, key length 33, transaction based
var ela: Duration
let tdb = cdb.kvt
if noisy: echo ""
noisy.showElapsed("Standard db loader(tx,keyLen 33)", ela):
let dbTx = cdb.beginTransaction
defer: dbTx.commit
for (key,val) in t33.pairs:
tdb.put(key, val)
if ela.inNanoseconds != 0:
let
elaNs = ela.inNanoseconds.float
perRec = ((elaNs / t33.len.float) + 0.5).int.initDuration
noisy.say "***",
"nRecords=", t33.len, ", ",
"perRecord=", perRec.pp
proc test_dbTimingRocky32*(
noisy: bool;
t32: Table[ByteArray32,Blob],
cdb: CoreDbRef;
fullNoise = false;
) =
## Rocksdb, key length 32
var
ela: array[4,Duration]
size: int64
let
rdb = cdb.backend.toRocksStoreRef
# Note that 32 and 33 size keys cannot be usefully merged into the same SST
# file. The keys must be added in a sorted mode. So playing safe, key sizes
# should be of equal length.
if noisy: echo ""
noisy.showElapsed("Rocky bulk loader(keyLen 32)", ela[0]):
let bulker = RockyBulkLoadRef.init(rdb)
defer: bulker.destroy()
check bulker.begin("rocky-bulk-cache")
var
keyList = newSeq[NodeTag](t32.len)
fullNoise.showElapsed("Rocky bulk loader/32, sorter", ela[1]):
var inx = 0
for key in t32.keys:
keyList[inx] = key.to(NodeTag)
inx.inc
keyList.sort(cmp)
fullNoise.showElapsed("Rocky bulk loader/32, append", ela[2]):
for n,nodeTag in keyList:
let key = nodeTag.to(Blob)
check bulker.add(key, t32[key.to(ByteArray32)])
fullNoise.showElapsed("Rocky bulk loader/32, slurp", ela[3]):
let rc = bulker.finish()
if rc.isOk:
size = rc.value
else:
check bulker.lastError == "" # force printing error
fullNoise.say "***", " ela[]=", $ela.toSeq.mapIt(it.pp)
if ela[0].inNanoseconds != 0:
let
elaNs = ela.toSeq.mapIt(it.inNanoseconds.float)
elaPc = elaNs.mapIt(((it / elaNs[0]) * 100 + 0.5).int)
perRec = ((elaNs[0] / t32.len.float) + 0.5).int.initDuration
noisy.say "***",
"nRecords=", t32.len, ", ",
"perRecord=", perRec.pp, ", ",
"sstSize=", size.uint64.toSI, ", ",
"perRecord=", ((size.float / t32.len.float) + 0.5).int, ", ",
["Total","Sorter","Append","Ingest"].zip(elaPc).ppKvPc
proc test_dbTimingRocky32as33*(
noisy: bool;
t32: Table[ByteArray32,Blob],
cdb: CoreDbRef;
fullNoise = false;
) =
## Rocksdb, key length 32 as 33
var
ela: array[4,Duration]
size: int64
let
rdb = cdb.backend.toRocksStoreRef
# Note that 32 and 33 size keys cannot be usefiully merged into the same SST
# file. The keys must be added in a sorted mode. So playing safe, key sizes
# should be of equal length.
if noisy: echo ""
noisy.showElapsed("Rocky bulk loader(keyLen 32 as 33)", ela[0]):
let bulker = RockyBulkLoadRef.init(rdb)
defer: bulker.destroy()
check bulker.begin("rocky-bulk-cache")
var
keyList = newSeq[NodeTag](t32.len)
fullNoise.showElapsed("Rocky bulk loader/32 as 33, sorter", ela[1]):
var inx = 0
for key in t32.keys:
keyList[inx] = key.to(NodeTag)
inx.inc
keyList.sort(cmp)
fullNoise.showElapsed("Rocky bulk loader/32 as 33, append", ela[2]):
for n,nodeTag in keyList:
let key = nodeTag.to(Blob)
check bulker.add(@[99.byte] & key, t32[key.to(ByteArray32)])
fullNoise.showElapsed("Rocky bulk loader/32 as 33, slurp", ela[3]):
let rc = bulker.finish()
if rc.isOk:
size = rc.value
else:
check bulker.lastError == "" # force printing error
fullNoise.say "***", " ela[]=", $ela.toSeq.mapIt(it.pp)
if ela[0].inNanoseconds != 0:
let
elaNs = ela.toSeq.mapIt(it.inNanoseconds.float)
elaPc = elaNs.mapIt(((it / elaNs[0]) * 100 + 0.5).int)
perRec = ((elaNs[0] / t32.len.float) + 0.5).int.initDuration
noisy.say "***",
"nRecords=", t32.len, ", ",
"perRecord=", perRec.pp, ", ",
"sstSize=", size.uint64.toSI, ", ",
"perRecord=", ((size.float / t32.len.float) + 0.5).int, ", ",
["Total","Sorter","Append","Ingest"].zip(elaPc).ppKvPc
proc test_dbTimingRocky33*(
noisy: bool;
t33: Table[ByteArray33,Blob],
cdb: CoreDbRef;
fullNoise = false;
) =
## Rocksdb, key length 33
var
ela: array[4,Duration]
size: int64
let rdb = cdb.backend.toRocksStoreRef
# Note that 32 and 33 size keys cannot be usefiully merged into the same SST
# file. The keys must be added in a sorted mode. So playing safe, key sizes
# should be of equal length.
if noisy: echo ""
noisy.showElapsed("Rocky bulk loader(keyLen 33)", ela[0]):
let bulker = RockyBulkLoadRef.init(rdb)
defer: bulker.destroy()
check bulker.begin("rocky-bulk-cache")
var
kKeys: seq[byte] # need to cacscade
kTab: Table[byte,seq[NodeTag]]
fullNoise.showElapsed("Rocky bulk loader/33, sorter", ela[1]):
for key in t33.keys:
if kTab.hasKey(key[0]):
kTab[key[0]].add key.toOpenArray(1,32).to(NodeTag)
else:
kTab[key[0]] = @[key.toOpenArray(1,32).to(NodeTag)]
kKeys = toSeq(kTab.keys).sorted
for w in kKeys:
kTab[w].sort(cmp)
fullNoise.showElapsed("Rocky bulk loader/33, append", ela[2]):
for w in kKeys:
fullNoise.say "***", " prefix=", w, " entries=", kTab[w].len
for n,nodeTag in kTab[w]:
let key = (w,nodeTag).to(Blob)
check bulker.add(key, t33[key.to(ByteArray33)])
fullNoise.showElapsed("Rocky bulk loader/33, slurp", ela[3]):
let rc = bulker.finish()
if rc.isOk:
size = rc.value
else:
check bulker.lastError == "" # force printing error
fullNoise.say "***", " ela[]=", $ela.toSeq.mapIt(it.pp)
if ela[0].inNanoseconds != 0:
let
elaNs = ela.toSeq.mapIt(it.inNanoseconds.float)
elaPc = elaNs.mapIt(((it / elaNs[0]) * 100 + 0.5).int)
perRec = ((elaNs[0] / t33.len.float) + 0.5).int.initDuration
noisy.say "***",
"nRecords=", t33.len, ", ",
"perRecord=", perRec.pp, ", ",
"sstSize=", size.uint64.toSI, ", ",
"perRecord=", ((size.float / t33.len.float) + 0.5).int, ", ",
["Total","Cascaded-Sorter","Append","Ingest"].zip(elaPc).ppKvPc
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------