nim-blockstore/tests/bench_dataset.nim
Chrysostomos Nanakos 7b23545c27
initial commit
Signed-off-by: Chrysostomos Nanakos <chris@include.gr>
2026-01-05 03:05:14 +02:00

499 lines
16 KiB
Nim

import std/[os, times, strformat, random, options, strutils]
import chronos
import taskpools
import results
import ../blockstore/errors
import ../blockstore/blocks
import ../blockstore/chunker
import ../blockstore/dataset
import ../blockstore/cid
import ../blockstore/merkle
import ../blockstore/ioutils
import ../blockstore/blockmap
when defined(posix):
  import std/posix
elif defined(windows):
  import std/winlean
const
  DefaultSize = 4'u64 shl 30         ## 4 GiB of data unless --size is given
  DefaultChunkSize = 64 shl 10       ## 64 KiB per block unless --chunk is given
  DefaultPoolSize = 4                ## worker threads unless --pool is given
  TestDir = "nim_blockstore_bench"   ## scratch root for every benchmark artefact
  TestFile = TestDir / "testfile.bin"
  DbPath = TestDir / "bench_db"
  BlocksDir = TestDir / "blocks"
type
  BenchConfig = object
    ## Settings for a single benchmark run; `main` fills it from the
    ## command line and `runBenchmark` consumes it.
    totalSize: uint64                 ## bytes of data to ingest
    chunkSize: int                    ## bytes per block
    merkleBackend: MerkleBackend      ## forwarded to newDatasetStore
    blockBackend: BlockBackend        ## forwarded to newDatasetStore
    blockmapBackend: BlockmapBackend  ## forwarded to newDatasetStore
    ioMode: IOMode                    ## forwarded to newDatasetStore
    syncBatchSize: int                ## forwarded; main maps "none"->0, "every"->1
    synthetic: bool                   ## generate blocks in memory, skip the file
    reportInterval: float             ## seconds between progress lines
    poolSize: int                     ## Taskpool threads for file chunking
    blockHashConfig: BlockHashConfig  ## hashing used for blocks and verification
proc formatSize(bytes: uint64): string =
  ## Render a byte count using binary units: TB, GB, MB or KB with two
  ## decimals. Anything below 1 KiB is printed as an exact byte count.
  ##
  ## The KB tier matches `formatRate` (KB/s) and `parseSize` (accepts KB).
  ## Without it, sizes such as 512 KiB were shown as raw byte counts.
  const
    KiB = 1024'u64
    MiB = KiB * 1024
    GiB = MiB * 1024
    TiB = GiB * 1024
  let b = bytes.float
  if bytes >= TiB: &"{b / TiB.float:.2f} TB"
  elif bytes >= GiB: &"{b / GiB.float:.2f} GB"
  elif bytes >= MiB: &"{b / MiB.float:.2f} MB"
  elif bytes >= KiB: &"{b / KiB.float:.2f} KB"
  else: &"{bytes} bytes"
proc formatRate(bytesPerSec: float): string =
  ## Render a throughput (bytes per second) as GB/s, MB/s or KB/s with two
  ## decimals. Anything under 1 MiB/s, including zero, is shown in KB/s.
  const
    kb = 1024.0
    mb = kb * 1024.0
    gb = mb * 1024.0
  let (divisor, unit) =
    if bytesPerSec >= gb: (gb, "GB/s")
    elif bytesPerSec >= mb: (mb, "MB/s")
    else: (kb, "KB/s")
  result = &"{bytesPerSec / divisor:.2f} {unit}"
proc parseSize(s: string): uint64 =
  ## Convert a size string such as "4GB", "512mb" or "1TB" into bytes.
  ##
  ## Recognised suffixes are KB, MB, GB and TB in any letter case. All are
  ## binary multiples (powers of 1024). A bare number is a byte count, and
  ## surrounding whitespace is ignored.
  ##
  ## Input that is not a non-negative integer, or whose byte count would
  ## not fit in a uint64, yields DefaultSize.
  const suffixes = [("TB", 1'u64 shl 40), ("GB", 1'u64 shl 30),
                    ("MB", 1'u64 shl 20), ("KB", 1'u64 shl 10)]
  let text = s.strip()
  let upper = text.toUpperAscii()
  var digits = text
  var multiplier = 1'u64
  for (suffix, factor) in suffixes:
    if upper.endsWith(suffix):
      digits = text[0 ..< text.len - suffix.len]
      multiplier = factor
      break
  var count: int
  try:
    count = parseInt(digits.strip())
  except ValueError:
    return DefaultSize
  # Without these guards a negative count wraps to an enormous uint64 on
  # conversion, and a huge count overflows silently when multiplied.
  if count < 0 or count.uint64 > high(uint64) div multiplier:
    return DefaultSize
  result = count.uint64 * multiplier
proc syncFile(f: File) =
  ## Flush Nim's user-space buffer for `f`, then ask the OS to write the
  ## file's data to stable storage (fsync / FlushFileBuffers).
  ##
  ## Best effort: a failed OS sync prints a warning on stderr instead of
  ## raising.
  flushFile(f)
  when defined(posix):
    if fsync(f.getFileHandle().cint) != 0:
      stderr.writeLine "warning: fsync failed"
  elif defined(windows):
    # FlushFileBuffers takes a Win32 HANDLE. getFileHandle returns the C
    # runtime descriptor, which makes the call fail, so use the OS handle.
    if flushFileBuffers(Handle(f.getOsFileHandle())) == 0:
      stderr.writeLine "warning: FlushFileBuffers failed"
proc createTestFile(path: string, size: uint64) =
  ## Write `size` bytes of pseudo-random data to `path` in 1 MiB writes,
  ## sync the file, and report the elapsed time and write throughput.
  ##
  ## Raises IOError if `path` cannot be opened or a write comes up short
  ## (e.g. disk full). The file is closed on every path.
  echo &"Creating {formatSize(size)} test file..."
  let startTime = epochTime()
  randomize()
  const bufSize = 1024 * 1024
  var buf = newSeq[byte](bufSize)
  let f = open(path, fmWrite)
  try:
    var remaining = size
    while remaining > 0:
      # Fresh random bytes each round so the file is not one repeated MiB.
      for b in buf.mitems:
        b = byte(rand(255))
      let writeSize = int(min(remaining, bufSize.uint64))
      if f.writeBytes(buf, 0, writeSize) != writeSize:
        raise newException(IOError, &"short write while creating {path}")
      remaining -= writeSize.uint64
    syncFile(f)
  finally:
    f.close()
  let elapsed = epochTime() - startTime
  let rate = size.float / elapsed
  echo &" Created in {elapsed:.2f}s ({formatRate(rate)})"
proc cleanup() =
  ## Recursively delete the benchmark scratch directory, if it exists.
  if not dirExists(TestDir):
    return
  removeDir(TestDir)
proc reportIngestProgress(config: BenchConfig,
                          blockCount, totalBlocks, totalBytes: uint64,
                          ingestStart: float,
                          lastReport: var float, lastBytes: var uint64) =
  ## Print one ingestion progress line if at least `config.reportInterval`
  ## seconds have passed since `lastReport`, then advance the
  ## `lastReport` / `lastBytes` checkpoints. Otherwise it does nothing.
  let now = epochTime()
  if now - lastReport < config.reportInterval:
    return
  let intervalRate = (totalBytes - lastBytes).float / (now - lastReport)
  let overallRate = totalBytes.float / (now - ingestStart)
  let progress =
    if totalBlocks > 0: (blockCount.float / totalBlocks.float) * 100
    else: 100.0
  # Saturate instead of subtracting directly: uint64 `totalSize - totalBytes`
  # would wrap to a huge value if more bytes than requested were ingested.
  let remaining =
    if totalBytes < config.totalSize: config.totalSize - totalBytes
    else: 0'u64
  let eta = if overallRate > 0: remaining.float / overallRate else: 0.0
  echo &" Progress: {progress:.1f}% | Blocks: {blockCount}/{totalBlocks} | Rate: {formatRate(intervalRate)} (avg: {formatRate(overallRate)}) | ETA: {eta:.0f}s"
  lastReport = now
  lastBytes = totalBytes

proc runBenchmark(config: BenchConfig) {.async.} =
  ## Run the whole benchmark against a fresh scratch directory:
  ## ingest `config.totalSize` bytes, finalize the dataset, then time a
  ## plain read pass and a pass that checks every block's merkle proof
  ## against the tree root.
  ##
  ## Data comes from in-memory synthetic blocks or from a generated test
  ## file read through `chunkFile`. Any failure stops the remaining phases.
  ## Once the store is open, it is closed and the scratch directory removed
  ## on every exit path.
  let chunkBytes = config.chunkSize.uint64
  # Round up: a trailing partial chunk still becomes a block (synthetic mode
  # truncates its last block; file mode presumably does the same — confirm
  # against the chunker).
  let totalBlocks =
    config.totalSize div chunkBytes +
    (if config.totalSize mod chunkBytes == 0: 0'u64 else: 1'u64)
  let dataMode = if config.synthetic: "synthetic" else: "file-based (async)"

  echo "=== Dataset Ingestion Benchmark ==="
  echo &"Size: {formatSize(config.totalSize)}"
  echo &"Chunk size: {config.chunkSize div 1024} KB"
  echo &"Expected blocks: {totalBlocks}"
  echo &"Merkle backend: {config.merkleBackend}"
  echo &"Block backend: {config.blockBackend}"
  echo &"Blockmap backend: {config.blockmapBackend}"
  echo &"IO mode: {config.ioMode}"
  echo &"Sync batch size: {config.syncBatchSize}"
  echo &"Thread pool size: {config.poolSize}"
  echo &"Data mode: {dataMode}"
  echo ""

  cleanup()
  createDir(TestDir)
  createDir(BlocksDir)
  if not config.synthetic:
    createTestFile(TestFile, config.totalSize)
    echo ""

  echo "Initializing dataset store..."
  let storeResult = newDatasetStore(DbPath, BlocksDir,
    blockHashConfig = config.blockHashConfig,
    merkleBackend = config.merkleBackend,
    blockBackend = config.blockBackend,
    blockmapBackend = config.blockmapBackend,
    ioMode = config.ioMode,
    syncBatchSize = config.syncBatchSize)
  if storeResult.isErr:
    echo &"Failed to create store: {storeResult.error}"
    cleanup()
    return
  let store = storeResult.value

  # Every failure below does `break phases`, not `return`, so the store is
  # always closed and the scratch data removed afterwards.
  var succeeded = false
  block phases:
    let filename = if config.synthetic: some("benchmark") else: some(TestFile)
    let builderResult = store.startDataset(config.chunkSize.uint32, filename)
    if builderResult.isErr:
      echo &"Failed to start dataset: {builderResult.error}"
      break phases
    var builder = builderResult.value

    # ---- Ingestion ----
    echo "Ingesting blocks..."
    let ingestStart = epochTime()
    var blockCount: uint64 = 0
    var totalBytes: uint64 = 0
    var lastReport = ingestStart
    var lastBytes: uint64 = 0
    var ingestOk = true
    if config.synthetic:
      var chunk = newSeq[byte](config.chunkSize)
      randomize()
      for b in chunk.mitems:
        b = byte(rand(255))
      while totalBytes < config.totalSize:
        # Stamp the low 32 bits of the block index into the first four bytes
        # so successive blocks have different contents.
        chunk[0] = byte(blockCount and 0xFF)
        chunk[1] = byte((blockCount shr 8) and 0xFF)
        chunk[2] = byte((blockCount shr 16) and 0xFF)
        chunk[3] = byte((blockCount shr 24) and 0xFF)
        # Truncate the final block so exactly totalSize bytes are ingested.
        let remaining = config.totalSize - totalBytes
        let blkResult =
          if remaining >= chunkBytes:
            newBlock(chunk, config.blockHashConfig)
          else:
            newBlock(chunk[0 ..< remaining.int], config.blockHashConfig)
        if blkResult.isErr:
          echo &"Failed to create block: {blkResult.error}"
          ingestOk = false
          break
        let blk = blkResult.value
        totalBytes += blk.data.len.uint64
        let addResult = await builder.addBlock(blk)
        if addResult.isErr:
          echo &"Failed to add block: {addResult.error}"
          ingestOk = false
          break
        blockCount += 1
        reportIngestProgress(config, blockCount, totalBlocks, totalBytes,
                             ingestStart, lastReport, lastBytes)
    else:
      var pool = Taskpool.new(numThreads = config.poolSize)
      defer: pool.shutdown()
      let streamResult = await builder.chunkFile(pool)
      if streamResult.isErr:
        echo &"Failed to open file: {streamResult.error}"
        ingestOk = false
      else:
        var stream = streamResult.value
        while true:
          let blockOpt = await stream.nextBlock()
          if blockOpt.isNone:
            break
          let blockResult = blockOpt.get()
          if blockResult.isErr:
            echo &"Block read error: {blockResult.error}"
            ingestOk = false
            break
          let blk = blockResult.value
          totalBytes += blk.data.len.uint64
          let addResult = await builder.addBlock(blk)
          if addResult.isErr:
            echo &"Failed to add block: {addResult.error}"
            ingestOk = false
            break
          blockCount += 1
          reportIngestProgress(config, blockCount, totalBlocks, totalBytes,
                               ingestStart, lastReport, lastBytes)
        stream.close()
    # Do not finalize or benchmark a dataset that is missing blocks.
    if not ingestOk:
      break phases

    let ingestTime = epochTime() - ingestStart
    let ingestRate = totalBytes.float / ingestTime
    echo ""
    echo "Ingestion complete:"
    echo &" Blocks: {blockCount}"
    echo &" Bytes: {formatSize(totalBytes)}"
    echo &" Time: {ingestTime:.2f}s"
    echo &" Rate: {formatRate(ingestRate)}"
    echo ""

    # ---- Finalize ----
    echo "Finalizing dataset (building merkle tree)..."
    let finalizeStart = epochTime()
    let datasetResult = await builder.finalize()
    if datasetResult.isErr:
      echo &"Failed to finalize: {datasetResult.error}"
      break phases
    let dataset = datasetResult.value
    let finalizeTime = epochTime() - finalizeStart
    echo &" Finalize time: {finalizeTime:.2f}s"
    echo ""

    let totalTime = ingestTime + finalizeTime
    let overallRate = totalBytes.float / totalTime
    echo "=== Write Summary ==="
    echo &" Dataset manifest CID: {dataset.manifestCid}"
    echo &" Dataset tree CID: {dataset.treeCid}"
    echo &" Total blocks: {dataset.blockCount}"
    echo &" Total time: {totalTime:.2f}s"
    echo &" Overall rate: {formatRate(overallRate)}"
    echo &" Storage used: {formatSize(store.used())}"
    echo ""

    # ---- Read pass (no verification) ----
    echo "=== Read Benchmark (without verification) ==="
    echo "Reading all blocks..."
    let readStart = epochTime()
    var readBytes: uint64 = 0
    var readBlocks = 0
    var lastReadReport = readStart
    var lastReadBytes: uint64 = 0
    for i in 0 ..< dataset.blockCount:
      let blockResult = await dataset.getBlock(i)
      if blockResult.isErr:
        echo &"Failed to read block {i}: {blockResult.error}"
        break
      let blockOpt = blockResult.value
      if blockOpt.isNone:
        echo &"Block {i} not found"
        break
      let (blk, _) = blockOpt.get()
      readBytes += blk.data.len.uint64
      readBlocks += 1
      let now = epochTime()
      if now - lastReadReport >= config.reportInterval:
        let intervalBytes = readBytes - lastReadBytes
        let intervalRate = intervalBytes.float / (now - lastReadReport)
        let overallReadRate = readBytes.float / (now - readStart)
        let progress = (readBytes.float / totalBytes.float) * 100
        echo &" Progress: {progress:.1f}% | Blocks: {readBlocks} | Rate: {formatRate(intervalRate)} (avg: {formatRate(overallReadRate)})"
        lastReadReport = now
        lastReadBytes = readBytes
    let readTime = epochTime() - readStart
    let readRate = readBytes.float / readTime
    echo ""
    echo "Read complete (no verification):"
    echo &" Blocks read: {readBlocks}"
    echo &" Bytes read: {formatSize(readBytes)}"
    echo &" Time: {readTime:.2f}s"
    echo &" Rate: {formatRate(readRate)}"
    echo ""

    # ---- Read pass with merkle-proof verification ----
    echo "=== Read Benchmark (with verification) ==="
    echo "Reading and verifying all blocks..."
    let mhashResult = dataset.treeCid.mhash()
    if mhashResult.isErr:
      echo &"Failed to get multihash from treeCid: {mhashResult.error}"
      break phases
    let mhash = mhashResult.value
    var rootHash: MerkleHash
    let digestBytes = mhash.data.buffer
    # NOTE(review): assumes a 2-byte multihash prefix (codec + length) before
    # the digest; confirm this holds for every hash codec in use.
    if digestBytes.len >= HashSize + 2:
      copyMem(addr rootHash[0], unsafeAddr digestBytes[2], HashSize)
    else:
      echo "Invalid multihash length"
      break phases

    let verifyStart = epochTime()
    var verifiedBlocks = 0
    var verifiedBytes: uint64 = 0
    var verifyFailed = 0
    var lastVerifyReport = verifyStart
    var lastVerifyBytes: uint64 = 0
    for i in 0 ..< dataset.blockCount:
      let blockResult = await dataset.getBlock(i)
      if blockResult.isErr:
        echo &"Failed to read block {i}: {blockResult.error}"
        break
      let blockOpt = blockResult.value
      if blockOpt.isNone:
        echo &"Block {i} not found"
        break
      let (blk, proof) = blockOpt.get()
      let leafHash = config.blockHashConfig.hashFunc(blk.data)
      if not verify(proof, rootHash, leafHash):
        verifyFailed += 1
        # Only print the first few failures to keep the output readable.
        if verifyFailed <= 5:
          echo &" WARNING: Block {i} verification failed!"
      verifiedBlocks += 1
      verifiedBytes += blk.data.len.uint64
      let now = epochTime()
      if now - lastVerifyReport >= config.reportInterval:
        let intervalBytes = verifiedBytes - lastVerifyBytes
        let intervalRate = intervalBytes.float / (now - lastVerifyReport)
        let overallVerifyRate = verifiedBytes.float / (now - verifyStart)
        let progress = (verifiedBytes.float / totalBytes.float) * 100
        echo &" Progress: {progress:.1f}% | Verified: {verifiedBlocks} | Failed: {verifyFailed} | Rate: {formatRate(intervalRate)} (avg: {formatRate(overallVerifyRate)})"
        lastVerifyReport = now
        lastVerifyBytes = verifiedBytes
    let verifyTime = epochTime() - verifyStart
    let verifyRate = verifiedBytes.float / verifyTime
    echo ""
    echo "Read with verification complete:"
    echo &" Blocks verified: {verifiedBlocks}"
    echo &" Verification failures: {verifyFailed}"
    echo &" Bytes verified: {formatSize(verifiedBytes)}"
    echo &" Time: {verifyTime:.2f}s"
    echo &" Rate: {formatRate(verifyRate)}"
    echo ""
    succeeded = true

  echo "Closing store..."
  await store.closeAsync()
  echo "Cleaning up..."
  cleanup()
  if succeeded:
    echo "Done!"
proc printUsage() =
  ## Print the command-line synopsis followed by one line per option.
  const usageLines = [
    "Usage: bench_dataset [options]",
    "",
    "Options:",
    " --size=<size> Dataset size (e.g., 1GB, 4GB, 100GB, 1TB)",
    " --chunk=<size> Chunk size in KB (default: 64)",
    " --merkle=<type> Merkle backend: embedded, leveldb, packed (default: packed)",
    " --blocks=<type> Block backend: sharded, packed (default: sharded)",
    " --blockmap=<type> Blockmap backend: leveldb, file (default: leveldb)",
    " --io=<mode> I/O mode: direct, buffered (default: direct)",
    " --sync=<value> Sync batch: none, every, or N (default: none)",
    " --pool=<size> Thread pool size for async I/O (default: 4, min: 2)",
    " --synthetic Use synthetic in-memory data (no file I/O)",
    " --help Show this help"
  ]
  for line in usageLines:
    echo line
proc main() =
  ## Build a BenchConfig from the command-line options and run the
  ## benchmark to completion.
  ##
  ## A malformed value, an unknown option or a zero dataset size prints a
  ## diagnostic and returns without running. This way a typo cannot silently
  ## start a multi-gigabyte run with default settings.
  var config = BenchConfig(
    totalSize: DefaultSize,
    chunkSize: DefaultChunkSize,
    merkleBackend: mbPacked,
    blockBackend: bbSharded,
    blockmapBackend: bmLevelDb,
    ioMode: ioDirect,
    syncBatchSize: 0,
    synthetic: false,
    reportInterval: 1.0,
    poolSize: DefaultPoolSize,
    blockHashConfig: defaultBlockHashConfig()
  )
  for arg in commandLineParams():
    if arg.startsWith("--size="):
      config.totalSize = parseSize(arg[7..^1])
    elif arg.startsWith("--chunk="):
      let value = arg[8..^1]
      var kb = 0
      try:
        kb = parseInt(value)
      except ValueError:
        discard  # kb stays 0 and is rejected just below
      # Zero/negative chunks would divide by zero later. The upper bound
      # keeps kb * 1024 inside int32, well within the uint32 block size.
      if kb <= 0 or kb > int(high(int32)) div 1024:
        echo &"Invalid chunk size: {value}"
        return
      config.chunkSize = kb * 1024
    elif arg.startsWith("--merkle="):
      let backend = arg[9..^1]
      case backend
      of "embedded", "embeddedproofs": config.merkleBackend = mbEmbeddedProofs
      of "leveldb": config.merkleBackend = mbLevelDb
      of "packed": config.merkleBackend = mbPacked
      else:
        echo &"Unknown merkle backend: {backend}"
        return
    elif arg.startsWith("--blocks="):
      let backend = arg[9..^1]
      case backend
      of "sharded": config.blockBackend = bbSharded
      of "packed": config.blockBackend = bbPacked
      else:
        echo &"Unknown block backend: {backend}"
        return
    elif arg.startsWith("--blockmap="):
      let backend = arg[11..^1]
      case backend
      of "leveldb": config.blockmapBackend = bmLevelDb
      of "file": config.blockmapBackend = bmFile
      else:
        echo &"Unknown blockmap backend: {backend}"
        return
    elif arg.startsWith("--io="):
      let mode = arg[5..^1]
      case mode
      of "direct": config.ioMode = ioDirect
      of "buffered": config.ioMode = ioBuffered
      else:
        echo &"Unknown IO mode: {mode}"
        return
    elif arg.startsWith("--sync="):
      let value = arg[7..^1]
      if value == "none":
        config.syncBatchSize = 0
      elif value == "every":
        config.syncBatchSize = 1
      else:
        var batch = -1
        try:
          batch = parseInt(value)
        except ValueError:
          discard  # batch stays -1 and is rejected just below
        if batch < 0:
          echo &"Invalid sync batch size: {value}"
          return
        config.syncBatchSize = batch
    elif arg.startsWith("--pool="):
      let value = arg[7..^1]
      try:
        config.poolSize = max(2, parseInt(value))
      except ValueError:
        echo &"Invalid pool size: {value}"
        return
    elif arg == "--synthetic":
      config.synthetic = true
    elif arg == "--help":
      printUsage()
      return
    else:
      echo &"Unknown option: {arg}"
      printUsage()
      return
  if config.totalSize == 0:
    echo "Dataset size must be greater than zero"
    return
  waitFor runBenchmark(config)
when isMainModule:
  # Run only when this file is the program entry point, not when imported.
  main()