feat: improve dataset API with batch ops, abort, and memory optimization

Signed-off-by: Chrysostomos Nanakos <chris@include.gr>
Author: Chrysostomos Nanakos
Date: 2026-01-05 15:48:56 +02:00
Parent: 7b23545c27
Commit: 0bd6ee5da3
4 changed files with 326 additions and 4 deletions

--- File 1 of 4: block and CID helpers ---

@@ -45,6 +45,10 @@ proc computeCid*(data: openArray[byte], config: BlockHashConfig): BResult[Cid] =
 proc computeCid*(data: openArray[byte]): BResult[Cid] =
   computeCid(data, defaultBlockHashConfig())
 
+proc cidFromHash*(hash: array[32, byte], config: BlockHashConfig): BResult[Cid] =
+  let mh = ?wrap(config.hashCode, hash)
+  newCidV1(config.blockCodec, mh)
+
 proc newBlock*(data: seq[byte], config: BlockHashConfig): BResult[Block] =
   let c = ?computeCid(data, config)
   var blk = new(Block)
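
Usage sketch for the new cidFromHash helper (not part of the diff; `blockCidFor` is a hypothetical name, the rest comes from this file):

proc blockCidFor(data: seq[byte]): BResult[Cid] =
  # Derive a block's CIDv1 straight from its 32-byte digest, without
  # constructing a Block first.
  let cfg = defaultBlockHashConfig()
  cidFromHash(cfg.hashFunc(data), cfg)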

--- File 2 of 4: dataset store and builder ---

@@ -112,7 +112,6 @@ type
     mimetype: Option[string]
     blockHashConfig: BlockHashConfig
     merkleBuilder: MerkleTreeBuilder
-    blockCids: seq[Cid]
     streamingBuilder: StreamingMerkleBuilder
     merkleStorage: MerkleStorage
     treeId: string
@@ -124,6 +123,7 @@ type
     blockmapBackend: BlockmapBackend
     ioMode: IOMode
     writeHandle: WriteHandle
+    aborted: bool
 
   LruIterator* = ref object
     datasets: seq[Cid]
@@ -833,7 +833,6 @@ proc startDataset*(store: DatasetStore, chunkSize: uint32, filename: Option[string]
     ioMode: store.ioMode,
     treeId: treeId,
     blockIndex: 0,
-    blockCids: @[],
     totalSize: 0,
     store: store
   )
@@ -1051,6 +1050,29 @@ proc getBlock*(dataset: Dataset, index: int): Future[BResult[Option[(blk.Block, MerkleProof)]]] {.async.} =
   except CatchableError as e:
     return err(merkleTreeError("Failed to get proof: " & e.msg))
 
+proc blockIndices*(dataset: Dataset): Slice[int] =
+  0 ..< dataset.blockCount
+
+proc getBlocks*(dataset: Dataset, indices: seq[int]): Future[BResult[seq[Option[(blk.Block, MerkleProof)]]]] {.async.} =
+  var results = newSeq[Option[(blk.Block, MerkleProof)]](indices.len)
+  for i, index in indices:
+    let blockResult = await dataset.getBlock(index)
+    if blockResult.isErr:
+      return err(blockResult.error)
+    results[i] = blockResult.value
+  return ok(results)
+
+proc forEachBlock*(dataset: Dataset,
+                   cb: proc(index: int, blk: blk.Block, proof: MerkleProof): Future[void] {.async.}): Future[BResult[void]] {.async.} =
+  for i in dataset.blockIndices:
+    let blockResult = await dataset.getBlock(i)
+    if blockResult.isErr:
+      return err(blockResult.error)
+    if blockResult.value.isSome:
+      let (b, proof) = blockResult.value.get()
+      await cb(i, b, proof)
+  return ok()
+
 proc setFilename*(builder: DatasetBuilder, filename: string) =
   builder.filename = some(filename)
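
Usage sketch for the batch helpers (not part of the diff; assumes an already-opened `dataset: Dataset`):

proc scan(dataset: Dataset) {.async.} =
  # Fetch a fixed set of blocks, with their Merkle proofs, in one call.
  let batch = await dataset.getBlocks(@[0, 1, 2])
  if batch.isOk:
    for entry in batch.value:
      if entry.isSome:
        let (b, proof) = entry.get()
        echo "fetched ", b.data.len, " bytes"
  # Or stream every block through a callback; iteration is driven by blockIndices.
  proc visit(index: int, b: blk.Block, proof: MerkleProof): Future[void] {.async.} =
    echo index, ": ", b.data.len, " bytes"
  discard await dataset.forEachBlock(visit)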
@@ -1071,6 +1093,9 @@ proc chunkFile*(builder: DatasetBuilder, pool: Taskpool): Future[BResult[AsyncCh
   return await chunker.chunkFile(builder.filename.get())
 
 proc addBlock*(builder: DatasetBuilder, b: blk.Block): Future[BResult[int]] {.async.} =
+  if builder.aborted:
+    return err(invalidOperationError("Builder has been aborted"))
+
   let
     blockSize = b.data.len.uint64
     index = builder.blockIndex
@@ -1078,7 +1103,6 @@ proc addBlock*(builder: DatasetBuilder, b: blk.Block): Future[BResult[int]] {.async.} =
   case builder.merkleBackend
   of mbEmbeddedProofs:
     builder.merkleBuilder.addBlock(b.data)
-    builder.blockCids.add(b.cid)
   of mbLevelDb, mbPacked:
     let leafHash = builder.blockHashConfig.hashFunc(b.data)
     ?builder.streamingBuilder.addLeaf(leafHash)
@@ -1106,6 +1130,9 @@ proc addBlock*(builder: DatasetBuilder, b: blk.Block): Future[BResult[int]] {.async.} =
   return ok(index)
 
 proc finalize*(builder: DatasetBuilder): Future[BResult[Dataset]] {.async.} =
+  if builder.aborted:
+    return err(invalidOperationError("Builder has been aborted"))
+
   let blockCount = builder.blockIndex
   if blockCount == 0:
@@ -1210,9 +1237,14 @@ proc finalize*(builder: DatasetBuilder): Future[BResult[Dataset]] {.async.} =
   for index in chunkStart ..< chunkEnd:
     let
+      leafHashOpt = builder.merkleBuilder.getLeafHash(index)
+    if leafHashOpt.isNone:
+      return err(invalidBlockError())
+    let
+      blockCid = ?blk.cidFromHash(leafHashOpt.get(), builder.blockHashConfig)
       proof = ?builder.merkleBuilder.getProof(index)
       blockRef = BlockRef(
-        blockCid: $builder.blockCids[index],
+        blockCid: $blockCid,
         proof: proof
       )
       blockRefBytes = ?serializeBlockRef(blockRef)
@@ -1263,6 +1295,61 @@ proc finalize*(builder: DatasetBuilder): Future[BResult[Dataset]] {.async.} =
   except LevelDbException as e:
     return err(databaseError(e.msg))
 
+proc abort*(builder: DatasetBuilder): Future[BResult[void]] {.async.} =
+  if builder.aborted:
+    return ok()
+  builder.aborted = true
+
+  case builder.merkleBackend
+  of mbEmbeddedProofs:
+    discard
+  of mbLevelDb:
+    discard builder.merkleStorage.abort()
+  of mbPacked:
+    if not builder.merkleStorage.isNil:
+      discard builder.merkleStorage.abort()
+
+  if builder.blockBackend == bbPacked:
+    if not builder.writeHandle.isNil:
+      builder.writeHandle.close()
+    let tempDataPath = getTmpPath(builder.store.dataDir, builder.treeId, ".data")
+    if fileExists(tempDataPath):
+      try:
+        removeFile(tempDataPath)
+      except OSError:
+        discard
+
+  for i in 0 ..< builder.blockIndex:
+    let key = datasetBlockKey(builder.treeId, i)
+    if builder.blockBackend == bbSharded:
+      case builder.merkleBackend
+      of mbEmbeddedProofs:
+        let leafHashOpt = builder.merkleBuilder.getLeafHash(i)
+        if leafHashOpt.isSome:
+          let cidResult = blk.cidFromHash(leafHashOpt.get(), builder.blockHashConfig)
+          if cidResult.isOk:
+            discard builder.store.repo.releaseBlock(cidResult.value)
+      of mbLevelDb, mbPacked:
+        try:
+          let valueOpt = builder.store.db.get(key)
+          if valueOpt.isSome:
+            let blockRefResult = deserializeBlockRefSimple(cast[seq[byte]](valueOpt.get))
+            if blockRefResult.isOk:
+              let cidResult = cidFromString(blockRefResult.value.blockCid)
+              if cidResult.isOk:
+                discard builder.store.repo.releaseBlock(cidResult.value)
+        except LevelDbException:
+          discard
+    try:
+      builder.store.db.delete(key)
+    except LevelDbException:
+      discard
+
+  return ok()
+
 proc next*(iter: LruIterator): Option[Cid] =
   if iter.index < iter.datasets.len:
     result = some(iter.datasets[iter.index])
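
Typical call pattern for the new abort API (a sketch, not part of the diff; `ingest` is a hypothetical wrapper, everything else comes from this change):

proc ingest(store: DatasetStore, blocks: seq[blk.Block]): Future[BResult[Dataset]] {.async.} =
  # Roll back the partially built dataset if any block fails to land.
  let builderRes = store.startDataset(4096.uint32, some("upload"))
  if builderRes.isErr:
    return err(builderRes.error)
  var builder = builderRes.value
  for b in blocks:
    let added = await builder.addBlock(b)
    if added.isErr:
      # abort is idempotent and releases block refcounts, temp files,
      # and LevelDB keys written so far.
      discard await builder.abort()
      return err(added.error)
  return await builder.finalize()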

--- File 3 of 4: Merkle storage and tree builder ---

@@ -69,6 +69,7 @@ method getHash*(s: MerkleStorage, level: int, index: uint64): Option[MerkleHash]
 method setMetadata*(s: MerkleStorage, leafCount: uint64, numLevels: int): BResult[void] {.base, raises: [].}
 method getMetadata*(s: MerkleStorage): tuple[leafCount: uint64, numLevels: int] {.base, raises: [].}
 method close*(s: MerkleStorage): BResult[void] {.base, gcsafe, raises: [].}
+method abort*(s: MerkleStorage): BResult[void] {.base, gcsafe, raises: [].}
 method flush*(s: MerkleStorage) {.base, gcsafe, raises: [].}
 
 proc computeNumLevels*(leafCount: uint64): int =
@@ -115,6 +116,9 @@ method getMetadata*(s: MerkleStorage): tuple[leafCount: uint64, numLevels: int]
 method close*(s: MerkleStorage): BResult[void] {.base, gcsafe, raises: [].} =
   ok()
 
+method abort*(s: MerkleStorage): BResult[void] {.base, gcsafe, raises: [].} =
+  ok()
+
 method flush*(s: MerkleStorage) {.base, gcsafe, raises: [].} =
   discard
@@ -182,6 +186,20 @@ method getMetadata*(s: LevelDbMerkleStorage): tuple[leafCount: uint64, numLevels: int]
   except Exception:
     (0'u64, 0)
 
+method abort*(s: LevelDbMerkleStorage): BResult[void] {.raises: [].} =
+  try:
+    let (leafCount, numLevels) = s.getMetadata()
+    for level in 0 ..< numLevels:
+      let nodeCount = nodesAtLevel(leafCount, level)
+      for index in 0'u64 ..< nodeCount:
+        s.db.delete(levelDbKey(s.treeId, level, index))
+    s.db.delete(levelDbMetaKey(s.treeId))
+    ok()
+  except CatchableError as e:
+    err(databaseError(e.msg))
+  except Exception as e:
+    err(databaseError(e.msg))
+
 proc levelTempPath(basePath: string, level: int): string =
   basePath & ".L" & $level & ".tmp"
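
Backends with no persistent scratch state inherit the no-op base `abort` above; persistent backends override it, as LevelDbMerkleStorage does here. A custom backend would follow the same shape (sketch; `MyStorage` and `wipeTempState` are hypothetical):

method abort*(s: MyStorage): BResult[void] {.gcsafe, raises: [].} =
  try:
    s.wipeTempState()  # drop any partially written tree data
    ok()
  except CatchableError as e:
    err(databaseError(e.msg))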
@@ -358,6 +376,23 @@ method flush*(s: PackedMerkleStorage) {.gcsafe.} =
   for levelFile in s.levelFiles:
     flushFile(levelFile)
 
+method abort*(s: PackedMerkleStorage): BResult[void] {.gcsafe, raises: [].} =
+  try:
+    if s.readOnly:
+      s.memFile.close()
+    else:
+      for i, levelFile in s.levelFiles:
+        levelFile.close()
+        removeFile(levelTempPath(s.path, i + 1))
+      s.levelFiles = @[]
+      s.file.close()
+      removeFile(s.path)
+    ok()
+  except CatchableError as e:
+    err(ioError(e.msg))
+  except Exception as e:
+    err(ioError(e.msg))
+
 proc newStreamingMerkleBuilder*(storage: MerkleStorage): StreamingMerkleBuilder =
   StreamingMerkleBuilder(
     frontier: @[],
@@ -559,6 +594,11 @@ proc rootCid*(builder: MerkleTreeBuilder): BResult[Cid] =
 proc blockCount*(builder: MerkleTreeBuilder): int =
   builder.leaves.len
 
+proc getLeafHash*(builder: MerkleTreeBuilder, index: int): Option[array[32, byte]] =
+  if index < 0 or index >= builder.leaves.len:
+    return none(array[32, byte])
+  some(builder.leaves[index])
+
 proc getProof*(builder: MerkleTreeBuilder, index: int): BResult[MerkleProof] =
   if index < 0 or index >= builder.leaves.len:
     return err(invalidBlockError())
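
getLeafHash underpins the memory optimization in the commit title: DatasetBuilder no longer carries a parallel blockCids seq, and finalize/abort re-derive each block CID from the stored leaf hash on demand. A sketch (`leafCid` is a hypothetical helper):

proc leafCid(mb: MerkleTreeBuilder, i: int, cfg: BlockHashConfig): BResult[Cid] =
  # Recompute the CID of block i from its leaf hash; fails when i is out of range.
  let leafOpt = mb.getLeafHash(i)
  if leafOpt.isNone:
    return err(invalidBlockError())
  blk.cidFromHash(leafOpt.get(), cfg)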

--- File 4 of 4: dataset tests ---

@@ -6,6 +6,7 @@ import ../blockstore/cid
 import ../blockstore/blocks
 import ../blockstore/dataset
 import ../blockstore/blockmap
+import ../blockstore/merkle
 
 const
   TestDir = getTempDir() / "nim_blockstore_dataset_test"
@@ -330,3 +331,193 @@ suite "Mapped blockmap backend tests":
   test "mapped blockmap files deleted with dataset":
     waitFor runMappedBlockmapDeletion()
 
+proc runAbortBasic() {.async.} =
+  cleanup()
+  createDir(TestDir)
+  createDir(BlocksDir)
+  let storeResult = newDatasetStore(DbPath, BlocksDir)
+  doAssert storeResult.isOk
+  let store = storeResult.value
+  defer: store.close()
+
+  let builderResult = store.startDataset(4096.uint32, some("test"))
+  doAssert builderResult.isOk
+  var builder = builderResult.value
+
+  for i in 0 ..< 5:
+    var data = newSeq[byte](4096)
+    for j in 0 ..< 4096:
+      data[j] = byte((i * 4096 + j) mod 256)
+    let blkResult = newBlock(data)
+    doAssert blkResult.isOk
+    let addResult = await builder.addBlock(blkResult.value)
+    doAssert addResult.isOk
+
+  let usedBefore = store.used()
+  doAssert usedBefore > 0
+
+  let abortResult = await builder.abort()
+  doAssert abortResult.isOk
+  cleanup()
+
+proc runAbortPreventsAddBlock() {.async.} =
+  cleanup()
+  createDir(TestDir)
+  createDir(BlocksDir)
+  let storeResult = newDatasetStore(DbPath, BlocksDir)
+  doAssert storeResult.isOk
+  let store = storeResult.value
+  defer: store.close()
+
+  let builderResult = store.startDataset(4096.uint32, some("test"))
+  doAssert builderResult.isOk
+  var builder = builderResult.value
+
+  let abortResult = await builder.abort()
+  doAssert abortResult.isOk
+
+  var data = newSeq[byte](4096)
+  let blkResult = newBlock(data)
+  doAssert blkResult.isOk
+  let addResult = await builder.addBlock(blkResult.value)
+  doAssert addResult.isErr
+  doAssert addResult.error.kind == InvalidOperation
+  cleanup()
+
+proc runAbortPreventsFinalize() {.async.} =
+  cleanup()
+  createDir(TestDir)
+  createDir(BlocksDir)
+  let storeResult = newDatasetStore(DbPath, BlocksDir)
+  doAssert storeResult.isOk
+  let store = storeResult.value
+  defer: store.close()
+
+  let builderResult = store.startDataset(4096.uint32, some("test"))
+  doAssert builderResult.isOk
+  var builder = builderResult.value
+
+  var data = newSeq[byte](4096)
+  let blkResult = newBlock(data)
+  doAssert blkResult.isOk
+  let addResult = await builder.addBlock(blkResult.value)
+  doAssert addResult.isOk
+
+  let abortResult = await builder.abort()
+  doAssert abortResult.isOk
+
+  let finalizeResult = await builder.finalize()
+  doAssert finalizeResult.isErr
+  doAssert finalizeResult.error.kind == InvalidOperation
+  cleanup()
+
+proc runAbortWithPackedBackend() {.async.} =
+  cleanup()
+  createDir(TestDir)
+  createDir(BlocksDir)
+  let storeResult = newDatasetStore(DbPath, BlocksDir, merkleBackend = mbPacked, blockBackend = bbPacked)
+  doAssert storeResult.isOk
+  let store = storeResult.value
+  defer: store.close()
+
+  let builderResult = store.startDataset(4096.uint32, some("test"))
+  doAssert builderResult.isOk
+  var builder = builderResult.value
+
+  for i in 0 ..< 5:
+    var data = newSeq[byte](4096)
+    for j in 0 ..< 4096:
+      data[j] = byte((i * 4096 + j) mod 256)
+    let blkResult = newBlock(data)
+    doAssert blkResult.isOk
+    let addResult = await builder.addBlock(blkResult.value)
+    doAssert addResult.isOk
+
+  let abortResult = await builder.abort()
+  doAssert abortResult.isOk
+  cleanup()
+
+proc runAbortWithLevelDbBackend() {.async.} =
+  cleanup()
+  createDir(TestDir)
+  createDir(BlocksDir)
+  let storeResult = newDatasetStore(DbPath, BlocksDir, merkleBackend = mbLevelDb)
+  doAssert storeResult.isOk
+  let store = storeResult.value
+  defer: store.close()
+
+  let builderResult = store.startDataset(4096.uint32, some("test"))
+  doAssert builderResult.isOk
+  var builder = builderResult.value
+
+  for i in 0 ..< 5:
+    var data = newSeq[byte](4096)
+    for j in 0 ..< 4096:
+      data[j] = byte((i * 4096 + j) mod 256)
+    let blkResult = newBlock(data)
+    doAssert blkResult.isOk
+    let addResult = await builder.addBlock(blkResult.value)
+    doAssert addResult.isOk
+
+  let abortResult = await builder.abort()
+  doAssert abortResult.isOk
+  cleanup()
+
+proc runAbortIdempotent() {.async.} =
+  cleanup()
+  createDir(TestDir)
+  createDir(BlocksDir)
+  let storeResult = newDatasetStore(DbPath, BlocksDir)
+  doAssert storeResult.isOk
+  let store = storeResult.value
+  defer: store.close()
+
+  let builderResult = store.startDataset(4096.uint32, some("test"))
+  doAssert builderResult.isOk
+  var builder = builderResult.value
+
+  let abort1 = await builder.abort()
+  doAssert abort1.isOk
+  let abort2 = await builder.abort()
+  doAssert abort2.isOk
+  cleanup()
+
+suite "DatasetBuilder abort tests":
+  setup:
+    cleanup()
+
+  teardown:
+    cleanup()
+
+  test "abort cleans up builder":
+    waitFor runAbortBasic()
+
+  test "abort prevents further addBlock":
+    waitFor runAbortPreventsAddBlock()
+
+  test "abort prevents finalize":
+    waitFor runAbortPreventsFinalize()
+
+  test "abort with packed backend":
+    waitFor runAbortWithPackedBackend()
+
+  test "abort with leveldb backend":
+    waitFor runAbortWithLevelDbBackend()
+
+  test "abort is idempotent":
+    waitFor runAbortIdempotent()