diff --git a/blockstore/blocks.nim b/blockstore/blocks.nim
index faffb25..5b83fef 100644
--- a/blockstore/blocks.nim
+++ b/blockstore/blocks.nim
@@ -45,6 +45,10 @@ proc computeCid*(data: openArray[byte], config: BlockHashConfig): BResult[Cid] =
 proc computeCid*(data: openArray[byte]): BResult[Cid] =
   computeCid(data, defaultBlockHashConfig())
 
+proc cidFromHash*(hash: array[32, byte], config: BlockHashConfig): BResult[Cid] =
+  let mh = ?wrap(config.hashCode, hash)
+  newCidV1(config.blockCodec, mh)
+
 proc newBlock*(data: seq[byte], config: BlockHashConfig): BResult[Block] =
   let c = ?computeCid(data, config)
   var blk = new(Block)
diff --git a/blockstore/dataset.nim b/blockstore/dataset.nim
index e2d5534..f218056 100644
--- a/blockstore/dataset.nim
+++ b/blockstore/dataset.nim
@@ -112,7 +112,6 @@ type
     mimetype: Option[string]
    blockHashConfig: BlockHashConfig
     merkleBuilder: MerkleTreeBuilder
-    blockCids: seq[Cid]
     streamingBuilder: StreamingMerkleBuilder
     merkleStorage: MerkleStorage
     treeId: string
@@ -124,6 +123,7 @@
     blockmapBackend: BlockmapBackend
     ioMode: IOMode
     writeHandle: WriteHandle
+    aborted: bool
 
   LruIterator* = ref object
     datasets: seq[Cid]
@@ -833,7 +833,6 @@ proc startDataset*(store: DatasetStore, chunkSize: uint32, filename: Option[stri
     ioMode: store.ioMode,
     treeId: treeId,
     blockIndex: 0,
-    blockCids: @[],
     totalSize: 0,
     store: store
   )
@@ -1051,6 +1050,29 @@ proc getBlock*(dataset: Dataset, index: int): Future[BResult[Option[(blk.Block,
   except CatchableError as e:
     return err(merkleTreeError("Failed to get proof: " & e.msg))
 
+proc blockIndices*(dataset: Dataset): Slice[int] =
+  0 ..< dataset.blockCount
+
+proc getBlocks*(dataset: Dataset, indices: seq[int]): Future[BResult[seq[Option[(blk.Block, MerkleProof)]]]] {.async.} =
+  var results = newSeq[Option[(blk.Block, MerkleProof)]](indices.len)
+  for i, index in indices:
+    let blockResult = await dataset.getBlock(index)
+    if blockResult.isErr:
+      return err(blockResult.error)
+    results[i] = blockResult.value
+  return ok(results)
+
+proc forEachBlock*(dataset: Dataset,
+                   cb: proc(index: int, blk: blk.Block, proof: MerkleProof): Future[void] {.async.}): Future[BResult[void]] {.async.} =
+  for i in dataset.blockIndices:
+    let blockResult = await dataset.getBlock(i)
+    if blockResult.isErr:
+      return err(blockResult.error)
+    if blockResult.value.isSome:
+      let (b, proof) = blockResult.value.get()
+      await cb(i, b, proof)
+  return ok()
+
 proc setFilename*(builder: DatasetBuilder, filename: string) =
   builder.filename = some(filename)
 
@@ -1071,6 +1093,9 @@ proc chunkFile*(builder: DatasetBuilder, pool: Taskpool): Future[BResult[AsyncCh
   return await chunker.chunkFile(builder.filename.get())
 
 proc addBlock*(builder: DatasetBuilder, b: blk.Block): Future[BResult[int]] {.async.} =
+  if builder.aborted:
+    return err(invalidOperationError("Builder has been aborted"))
+
   let
     blockSize = b.data.len.uint64
     index = builder.blockIndex
@@ -1078,7 +1103,6 @@ proc addBlock*(builder: DatasetBuilder, b: blk.Block): Future[BResult[int]] {.as
   case builder.merkleBackend
   of mbEmbeddedProofs:
     builder.merkleBuilder.addBlock(b.data)
-    builder.blockCids.add(b.cid)
   of mbLevelDb, mbPacked:
     let leafHash = builder.blockHashConfig.hashFunc(b.data)
     ?builder.streamingBuilder.addLeaf(leafHash)
@@ -1106,6 +1130,9 @@ proc addBlock*(builder: DatasetBuilder, b: blk.Block): Future[BResult[int]] {.as
   return ok(index)
 
 proc finalize*(builder: DatasetBuilder): Future[BResult[Dataset]] {.async.} =
+  if builder.aborted:
+    return err(invalidOperationError("Builder has been aborted"))
+
   let
     blockCount = builder.blockIndex
 
   if blockCount == 0:
@@ -1210,9 +1237,14 @@ proc finalize*(builder: DatasetBuilder): Future[BResult[Dataset]] {.async.} =
       for index in chunkStart ..< chunkEnd:
         let
+          leafHashOpt = builder.merkleBuilder.getLeafHash(index)
+        if leafHashOpt.isNone:
+          return err(invalidBlockError())
+        let
+          blockCid = ?blk.cidFromHash(leafHashOpt.get(), builder.blockHashConfig)
           proof = ?builder.merkleBuilder.getProof(index)
           blockRef = BlockRef(
-            blockCid: $builder.blockCids[index],
+            blockCid: $blockCid,
             proof: proof
           )
           blockRefBytes = ?serializeBlockRef(blockRef)
@@ -1263,6 +1295,61 @@ proc finalize*(builder: DatasetBuilder): Future[BResult[Dataset]] {.async.} =
   except LevelDbException as e:
     return err(databaseError(e.msg))
 
+proc abort*(builder: DatasetBuilder): Future[BResult[void]] {.async.} =
+  if builder.aborted:
+    return ok()
+
+  builder.aborted = true
+
+  case builder.merkleBackend
+  of mbEmbeddedProofs:
+    discard
+  of mbLevelDb:
+    discard builder.merkleStorage.abort()
+  of mbPacked:
+    if not builder.merkleStorage.isNil:
+      discard builder.merkleStorage.abort()
+
+  if builder.blockBackend == bbPacked:
+    if not builder.writeHandle.isNil:
+      builder.writeHandle.close()
+    let tempDataPath = getTmpPath(builder.store.dataDir, builder.treeId, ".data")
+    if fileExists(tempDataPath):
+      try:
+        removeFile(tempDataPath)
+      except OSError:
+        discard
+
+  for i in 0 ..< builder.blockIndex:
+    let key = datasetBlockKey(builder.treeId, i)
+
+    if builder.blockBackend == bbSharded:
+      case builder.merkleBackend
+      of mbEmbeddedProofs:
+        let leafHashOpt = builder.merkleBuilder.getLeafHash(i)
+        if leafHashOpt.isSome:
+          let cidResult = blk.cidFromHash(leafHashOpt.get(), builder.blockHashConfig)
+          if cidResult.isOk:
+            discard builder.store.repo.releaseBlock(cidResult.value)
+      of mbLevelDb, mbPacked:
+        try:
+          let valueOpt = builder.store.db.get(key)
+          if valueOpt.isSome:
+            let blockRefResult = deserializeBlockRefSimple(cast[seq[byte]](valueOpt.get))
+            if blockRefResult.isOk:
+              let cidResult = cidFromString(blockRefResult.value.blockCid)
+              if cidResult.isOk:
+                discard builder.store.repo.releaseBlock(cidResult.value)
+        except LevelDbException:
+          discard
+
+    try:
+      builder.store.db.delete(key)
+    except LevelDbException:
+      discard
+
+  return ok()
+
 proc next*(iter: LruIterator): Option[Cid] =
   if iter.index < iter.datasets.len:
     result = some(iter.datasets[iter.index])
diff --git a/blockstore/merkle.nim b/blockstore/merkle.nim
index 8cb0bc8..80eab92 100644
--- a/blockstore/merkle.nim
+++ b/blockstore/merkle.nim
@@ -69,6 +69,7 @@ method getHash*(s: MerkleStorage, level: int, index: uint64): Option[MerkleHash]
 method setMetadata*(s: MerkleStorage, leafCount: uint64, numLevels: int): BResult[void] {.base, raises: [].}
 method getMetadata*(s: MerkleStorage): tuple[leafCount: uint64, numLevels: int] {.base, raises: [].}
 method close*(s: MerkleStorage): BResult[void] {.base, gcsafe, raises: [].}
+method abort*(s: MerkleStorage): BResult[void] {.base, gcsafe, raises: [].}
 method flush*(s: MerkleStorage) {.base, gcsafe, raises: [].}
 
 proc computeNumLevels*(leafCount: uint64): int =
@@ -115,6 +116,9 @@
 method close*(s: MerkleStorage): BResult[void] {.base, gcsafe, raises: [].} =
   ok()
 
+method abort*(s: MerkleStorage): BResult[void] {.base, gcsafe, raises: [].} =
+  ok()
+
 method flush*(s: MerkleStorage) {.base, gcsafe, raises: [].} =
   discard
 
@@ -182,6 +186,20 @@ method getMetadata*(s: LevelDbMerkleStorage): tuple[leafCount: uint64, numLevels
   except Exception:
     (0'u64, 0)
 
+method abort*(s: LevelDbMerkleStorage): BResult[void] {.raises: [].} =
+  try:
+    let (leafCount, numLevels) = s.getMetadata()
+    for level in 0 ..< numLevels:
+      let nodeCount = nodesAtLevel(leafCount, level)
+      for index in 0'u64 ..< nodeCount:
+        s.db.delete(levelDbKey(s.treeId, level, index))
+    s.db.delete(levelDbMetaKey(s.treeId))
+    ok()
+  except CatchableError as e:
+    err(databaseError(e.msg))
+  except Exception as e:
+    err(databaseError(e.msg))
+
 proc levelTempPath(basePath: string, level: int): string =
   basePath & ".L" & $level & ".tmp"
 
@@ -358,6 +376,23 @@ method flush*(s: PackedMerkleStorage) {.gcsafe.} =
   for levelFile in s.levelFiles:
     flushFile(levelFile)
 
+method abort*(s: PackedMerkleStorage): BResult[void] {.gcsafe, raises: [].} =
+  try:
+    if s.readOnly:
+      s.memFile.close()
+    else:
+      for i, levelFile in s.levelFiles:
+        levelFile.close()
+        removeFile(levelTempPath(s.path, i + 1))
+      s.levelFiles = @[]
+      s.file.close()
+      removeFile(s.path)
+    ok()
+  except CatchableError as e:
+    err(ioError(e.msg))
+  except Exception as e:
+    err(ioError(e.msg))
+
 proc newStreamingMerkleBuilder*(storage: MerkleStorage): StreamingMerkleBuilder =
   StreamingMerkleBuilder(
     frontier: @[],
@@ -559,6 +594,11 @@ proc rootCid*(builder: MerkleTreeBuilder): BResult[Cid] =
 proc blockCount*(builder: MerkleTreeBuilder): int =
   builder.leaves.len
 
+proc getLeafHash*(builder: MerkleTreeBuilder, index: int): Option[array[32, byte]] =
+  if index < 0 or index >= builder.leaves.len:
+    return none(array[32, byte])
+  some(builder.leaves[index])
+
 proc getProof*(builder: MerkleTreeBuilder, index: int): BResult[MerkleProof] =
   if index < 0 or index >= builder.leaves.len:
     return err(invalidBlockError())
diff --git a/tests/test_dataset.nim b/tests/test_dataset.nim
index ef6a2a3..40160f4 100644
--- a/tests/test_dataset.nim
+++ b/tests/test_dataset.nim
@@ -6,6 +6,7 @@ import ../blockstore/cid
 import ../blockstore/blocks
 import ../blockstore/dataset
 import ../blockstore/blockmap
+import ../blockstore/merkle
 
 const TestDir = getTempDir() / "nim_blockstore_dataset_test"
 
@@ -330,3 +331,193 @@ suite "Mapped blockmap backend tests":
 
   test "mapped blockmap files deleted with dataset":
     waitFor runMappedBlockmapDeletion()
+
+proc runAbortBasic() {.async.} =
+  cleanup()
+  createDir(TestDir)
+  createDir(BlocksDir)
+
+  let storeResult = newDatasetStore(DbPath, BlocksDir)
+  doAssert storeResult.isOk
+  let store = storeResult.value
+  defer: store.close()
+
+  let builderResult = store.startDataset(4096.uint32, some("test"))
+  doAssert builderResult.isOk
+  var builder = builderResult.value
+
+  for i in 0 ..< 5:
+    var data = newSeq[byte](4096)
+    for j in 0 ..< 4096:
+      data[j] = byte((i * 4096 + j) mod 256)
+    let blkResult = newBlock(data)
+    doAssert blkResult.isOk
+    let addResult = await builder.addBlock(blkResult.value)
+    doAssert addResult.isOk
+
+  let usedBefore = store.used()
+  doAssert usedBefore > 0
+
+  let abortResult = await builder.abort()
+  doAssert abortResult.isOk
+
+  cleanup()
+
+proc runAbortPreventsAddBlock() {.async.} =
+  cleanup()
+  createDir(TestDir)
+  createDir(BlocksDir)
+
+  let storeResult = newDatasetStore(DbPath, BlocksDir)
+  doAssert storeResult.isOk
+  let store = storeResult.value
+  defer: store.close()
+
+  let builderResult = store.startDataset(4096.uint32, some("test"))
+  doAssert builderResult.isOk
+  var builder = builderResult.value
+
+  let abortResult = await builder.abort()
+  doAssert abortResult.isOk
+
+  var data = newSeq[byte](4096)
+  let blkResult = newBlock(data)
+  doAssert blkResult.isOk
+
+  let addResult = await builder.addBlock(blkResult.value)
+  doAssert addResult.isErr
+  doAssert addResult.error.kind == InvalidOperation
+
+  cleanup()
+
+proc runAbortPreventsFinalize() {.async.} =
+  cleanup()
+  createDir(TestDir)
+  createDir(BlocksDir)
+
+  let storeResult = newDatasetStore(DbPath, BlocksDir)
+  doAssert storeResult.isOk
+  let store = storeResult.value
+  defer: store.close()
+
+  let builderResult = store.startDataset(4096.uint32, some("test"))
+  doAssert builderResult.isOk
+  var builder = builderResult.value
+
+  var data = newSeq[byte](4096)
+  let blkResult = newBlock(data)
+  doAssert blkResult.isOk
+  let addResult = await builder.addBlock(blkResult.value)
+  doAssert addResult.isOk
+
+  let abortResult = await builder.abort()
+  doAssert abortResult.isOk
+
+  let finalizeResult = await builder.finalize()
+  doAssert finalizeResult.isErr
+  doAssert finalizeResult.error.kind == InvalidOperation
+
+  cleanup()
+
+proc runAbortWithPackedBackend() {.async.} =
+  cleanup()
+  createDir(TestDir)
+  createDir(BlocksDir)
+
+  let storeResult = newDatasetStore(DbPath, BlocksDir, merkleBackend = mbPacked, blockBackend = bbPacked)
+  doAssert storeResult.isOk
+  let store = storeResult.value
+  defer: store.close()
+
+  let builderResult = store.startDataset(4096.uint32, some("test"))
+  doAssert builderResult.isOk
+  var builder = builderResult.value
+
+  for i in 0 ..< 5:
+    var data = newSeq[byte](4096)
+    for j in 0 ..< 4096:
+      data[j] = byte((i * 4096 + j) mod 256)
+    let blkResult = newBlock(data)
+    doAssert blkResult.isOk
+    let addResult = await builder.addBlock(blkResult.value)
+    doAssert addResult.isOk
+
+  let abortResult = await builder.abort()
+  doAssert abortResult.isOk
+
+  cleanup()
+
+proc runAbortWithLevelDbBackend() {.async.} =
+  cleanup()
+  createDir(TestDir)
+  createDir(BlocksDir)
+
+  let storeResult = newDatasetStore(DbPath, BlocksDir, merkleBackend = mbLevelDb)
+  doAssert storeResult.isOk
+  let store = storeResult.value
+  defer: store.close()
+
+  let builderResult = store.startDataset(4096.uint32, some("test"))
+  doAssert builderResult.isOk
+  var builder = builderResult.value
+
+  for i in 0 ..< 5:
+    var data = newSeq[byte](4096)
+    for j in 0 ..< 4096:
+      data[j] = byte((i * 4096 + j) mod 256)
+    let blkResult = newBlock(data)
+    doAssert blkResult.isOk
+    let addResult = await builder.addBlock(blkResult.value)
+    doAssert addResult.isOk
+
+  let abortResult = await builder.abort()
+  doAssert abortResult.isOk
+
+  cleanup()
+
+proc runAbortIdempotent() {.async.} =
+  cleanup()
+  createDir(TestDir)
+  createDir(BlocksDir)
+
+  let storeResult = newDatasetStore(DbPath, BlocksDir)
+  doAssert storeResult.isOk
+  let store = storeResult.value
+  defer: store.close()
+
+  let builderResult = store.startDataset(4096.uint32, some("test"))
+  doAssert builderResult.isOk
+  var builder = builderResult.value
+
+  let abort1 = await builder.abort()
+  doAssert abort1.isOk
+
+  let abort2 = await builder.abort()
+  doAssert abort2.isOk
+
+  cleanup()
+
+suite "DatasetBuilder abort tests":
+  setup:
+    cleanup()
+
+  teardown:
+    cleanup()
+
+  test "abort cleans up builder":
+    waitFor runAbortBasic()
+
+  test "abort prevents further addBlock":
+    waitFor runAbortPreventsAddBlock()
+
+  test "abort prevents finalize":
+    waitFor runAbortPreventsFinalize()
+
+  test "abort with packed backend":
+    waitFor runAbortWithPackedBackend()
+
+  test "abort with leveldb backend":
+    waitFor runAbortWithLevelDbBackend()
+
+  test "abort is idempotent":
+    waitFor runAbortIdempotent()
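For reviewers, a minimal caller-side sketch of how the new abort API is intended to be used, assuming the same store setup as the tests above. ingestOrAbort and its chunks parameter are illustrative names that are not part of this patch, and the import of the async runtime (chronos or asyncdispatch, whichever the repo already uses) is omitted.

    import std/options
    import ../blockstore/blocks
    import ../blockstore/dataset

    proc ingestOrAbort(store: DatasetStore, chunks: seq[seq[byte]]): Future[BResult[Dataset]] {.async.} =
      ## Illustrative only: build a dataset from pre-chunked data and call
      ## abort() on any failure so partially written blocks and merkle state
      ## are released instead of leaking into the store.
      let builderResult = store.startDataset(4096.uint32, some("example"))
      if builderResult.isErr:
        return err(builderResult.error)
      var builder = builderResult.value

      for chunk in chunks:
        let blkResult = newBlock(chunk)          # config-less overload, as used in the tests
        if blkResult.isErr:
          discard await builder.abort()          # roll back everything added so far
          return err(blkResult.error)
        let addResult = await builder.addBlock(blkResult.value)
        if addResult.isErr:
          discard await builder.abort()
          return err(addResult.error)

      return await builder.finalize()

Because abort() is idempotent and marks the builder as aborted, a later accidental addBlock or finalize on the same builder fails with InvalidOperation rather than writing partial state.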